AMQP 0-9-1模型解释

本指南提供了AMQP 0-9-1协议的概述,该协议是RabbitMQ支持的协议之一。

AMQP 0-9-1

AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,它使符合标准的客户端应用程序能够与符合标准的消息传递中间件代理进行通信。

代理和他们的规则

消息传递代理接收来自发布者的消息(发布它们的应用程序,也称为生产者),并将它们发送给消费者(处理它们的应用程序)。

由于它是一种网络协议,发布者、消费者和代理都可以在不同的机器上。

AMQP 0-9-1模型简述

AMQP 0-9-1模型具有以下的世界观:信息被发布到交换(exchange),经常被比作邮局或邮箱。交换使用称为绑定的规则将消息副本分发到队列。然后AMQP代理交付消息到订阅队列的消费者,或消费者根据需要从队列拉取消息。

当发布消息时,发布者可以指定各种消息属性(消息元数据)。一些元数据可能会被代理使用,但其余的对于代理来说是完全不透明的,并且仅接收消息的应用程序使用。

网络是不可靠的,应用程序可能无法处理消息,因此AMQP模型有一个消息确认的概念。当消息被交付到消费者时,消费者既可以自动,同样应用开发者也可以选择手动通知代理。当使用消息确认时,代理只会在接收到该消息(或一组消息)的通知时,才会完全删除队列中的消息。

在某些情况下,例如,当一个消息不能被路由(routed),消息可能会被返回给发布者,放弃,或如果代理实现了扩展,则将其放入所谓的“dead letter queue”中。发布者通过发布消息使用某些参数来选择如何处理这样的情况。

队列(queue)、交换(exchange)和绑定(binding)统称为AMQP实体。

AMQP是一种可编程的协议

AMQP 0-9-1是一种可编程的协议,从某种意义上说,AMQP 0-9-1实体和路由方案主要由应用程序自己定义,而不是代理管理员。因此,为声明队列、交换,定义他们之间的绑定,订阅队列等协议操作制定了条款。

这给了应用程序开发人员很大的自由,但同时也要求他们注意潜在的定义冲突。在实践中,定义冲突是罕见的,而通常表明错误配置。

应用程序声明它们需要的AMQP 0-9-1实体,定义必要的路由方案,并在不再使用它们时,可以选择删除AMQP 0-9-1实体。

交换和交换类型

交换是消息发送的AMQP实体。交换会获取到一个消息,路由到零个或多个队列。使用的路由算法取决于交换类型和称为绑定的规则。AMQP 0-9-1代理提供了四种交换类型:

类型 默认预先声明的名字
Direct exchange 空字符串和 amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match 和amq.headers

处理交换类型,交换可以声明许多属性,其中最重要的是:

  • Name
  • Durability - 代理重启时交换存活下来
  • Auto-delete - 当最后的一个队列解绑交换将会删除
  • Arguments - 可选的,被插件和代理特定的功能使用

交换可以是持久的也可以是瞬态的。持久的交换会在代理重启存活下来,然而瞬态的交换则不会(当代理重新上线时必须重新声明)。并非所有的场景和用例都要求交换是持久的。

默认的交换

默认的交换是由代理预先声明没有名字(空字符串)的direct交换。它有一个特殊的属性使它对简单的应用程序非常有用:创建的每个队列都将使用与队列名称相同的路由键自动绑定到它。

例如,当你声明一个search-indexing-online的队列,AMQP 0-9-1代理将会使用search-indexing-online作为路由键绑定到默认的交换。因此,一个消息使用路由键search-indexing-online发布到默认交换,将会被路由到队列search-indexing-online。换句话说,换句话说,默认的交换使它看起来可以直接向队列发送消息,尽管从技术上讲这并不是正在发生的事情。

可以在rabbitmqdefault-exchange中找到对应的示例。

Direct交换

Direct交换是基于消息的routing key交付消息到队列。Direct交换是对消息单播路由的理想选择(尽管它们也可以用于多播路由)。下面是它的工作原理:

  • 队列使用路由键K绑定到交换
  • 当使用路由键R的新消息到达直接交换时,如果K = R,交换器会将其路由到队列中。

Direct交换通常用来以一种循环的方式在多个woker之间分配任务(同一个应用的实例)。在这样做的时候,重要的一点是,在AMQP 0-9-1中,消息是在消费者之间,而不是在队列之间进行负载均衡。

Direct交换可以用图形方式表示如下:

Fanout交换

一个fanout交换路由消息到所有绑定到它的队列,而路由键被忽略。如果N个队列被绑定到一个fanout交换,当一个新的消息被发布到该交换时,一个消息的副本将被发送到所有的N队列。Fanout交换是消息的广播路由的理想选择。

因为一个fanout交换器将一个消息的副本传递给每个队列,所以它的用例非常相似:

  • 大型多人在线游戏(MMO)可以使用它来进行排行榜更新或其他全局事件
  • 体育新闻网站可以使用fanout交换,近实时地向移动客户分发比分更新
  • 分布式系统可以广播各种状态和配置更新
  • 群组聊天可以在使用fanout交换的参与者之间分发消息(尽管AMQP没有一个内置的概念存在,所以XMPP可能是一个更好的选择)。

一个fanout交换可以用图形方式表示如下:

Topic交换

主题交换基于消息路由键和用于将队列绑定到交换的模式之间的匹配,将消息路由到一个或多个队列。主题交换类型通常用于实现各种发布/订阅模式的变化。主题交换通常用于消息的多播路由。

Topic交换有一个非常广泛的用例。当一个问题涉及多个消费者/应用程序,它们有选择性地选择它们想要什么类型的消息时,应该考虑使用主题交换。

使用示例:

  • 发布与特定地理位置相关的数据,例如销售点
  • 由多个woker完成的后台任务处理,每一个都能够处理特定的任务集
  • 股票价格更新(以及其他金融数据的更新)
  • 包含分类或标记的新闻更新(例如,只针对特定的体育或团队)
  • 云计算服务的编制
  • 分布式架构/特定于OS的软件构建或打包,每个构建器只能处理一个架构或操作系统

Headers交换

headers交换被设计用于在多个属性上进行路由,这些属性更容易表示为消息头,而不是路由键。Headers交换忽略路由键属性。相反,用于路由的属性是从header属性中获取的。如果消息头的值等于绑定上指定的值,那么消息就被认为是匹配的。

可以使用多个头进行匹配将一个队列绑定到headers交换。在这种情况下,代理需要从应用程序开发人员获得更多信息,也就是说,它是否应该考虑消息与任何头匹配,或者所有的?这就是“x-match”绑定参数的用途。当“x-match”参数被设置为“any”时,只有一个匹配的头值就足够了。或者,将“x-match”设置为“all”的命令,所有的值必须匹配。

Headers交换可以被看作是“Direct交换的”。因为它们是基于header值进行路由的,它们可以被用作直接的交换,路由键不必是字符串;例如,它可以是一个整数或哈希(字典)。

队列

AMQP 0-9-1模型中的队列与其他消息和任务排队系统中的队列非常相似;他们存储被应用程序消费的消息。队列可以与交换共享一些属性,但也有一些附加属性:

  • Name
  • Durable - 代理重启存活
  • Exclusive - 仅能被一个连接使用,当连接关闭队列将被删除
  • Auto-delete - 队列至少有一个消费者如果最后一个消费者取消订阅将被删除
  • Arguments - 可选的;被插件和代理特定的功能使用,例如新消息TTL,队列长度限制等。

在使用队列之前必须要声明。队列的声明会在队列不存在的情况下会创建。如果队列已经存在,并且其属性与声明中的属性相同,则声明将没有影响。当现有队列属性与声明中不同,一个通道级异常(PRECONDITION_FAILED)代码406将抛出。

队列名称

应用程序可以选择一个队列名称或者要求代理为它们声明一个。队列名称最多可以达到255字节的utf-8字符。AMQP 0-9-1代理可以为应用程序生成一个惟一的队列名称。要使用此功能,请将空字符串作为队列名称参数传递。生成的名称和队列声明的响应返回给客户端。

以”amq.”开头的队列名称留作代理内部使用。尝试使用违反该规则的名称声明队列将会导致一个channel级别的异常回复代码403(ACCESS_REFUSED)。

队列持久性

持久队列被持久化到磁盘中,因此可以在代理重新启动时存活。非持久的队列我们成为瞬态。并不是所有的场景和用例都要求队列是持久的。

队列的持久性不会使被路由到该队列的消息持久。如果代理被取下,然后重新启动,在代理启动时将重新声明持久队列,但是只会恢复持久性消息。

绑定(Binding)

绑定是交换用来路由消息到队列的规则。要指示交换E将消息发送到队列Q,Q必须绑定到E。绑定可能有一些交换类型使用的可选的路由键属性。路由键的目的是选择发布到交换的某些消息,以被路由到绑定队列。换句话说,路由键就像过滤器一样。

描述一个类比:

  • 队列就像你在纽约的目的地
  • 交换箱JFK机场
  • 绑定是从JFK到你的目的地的路线。可以有零或多种途径

有了这个间接层允许不能实现或非常难实现的直接发送到队列的路由场景 ,也可以消除一些应用程序开发人员必须要做,重复的工作。

如果AMQP消息不能被路由到任何队列(例如,因为没有对其发布的交换的绑定),那么它要么被丢弃,要么返回给发布者,这取决于发布者所设置的消息属性。

消费者

在队列中存储消息是没有用的,除非应用程序消费它们。在AMQP 0-9-1模型中,应用程序有两种方法:

  • 将消息交付给他们(Push API)
  • 根据需要获取信息(Pull API)

使用“push API”,应用程序必须表明对来自特定队列的消息的兴趣。当他们这样做的时候,我们说他们注册了一个消费者,或者简单地说,订阅了一个队列。每个队列有一个以上的消费者,或者注册一个独占的消费者(将所有其他的消费者排除在队列之外)。

每个消费者(订阅)都有一个称为消费者标签(consumer tag)的标识符。它可以用于从消息中取消订阅。消费者标签只是字符串。

消息确认

消费者应用程序—接收和处理消息的应用程序—可能偶尔会处理个别的消息失败,或者有时会崩溃。网络问题也有可能造成问题。这就提出了一个问题:AMQP代理何时应该从队列中删除消息?amqp 0-9-1规范提出了两种选择:

  • 代理向应用程序发送一条消息后(使用basic.deliver或basic.get-ok AMQP方法)。
  • 应用程序发送回确认后(使用basic.ack AMQP方法)

前者被称为自动确认模型,后者被称为显式的确认模型。使用显式模型,应用程序选择什么时候发送一个确认。它可能是在收到一条信息后,或者在处理之前将它持久化到一个数据仓库中,或者在完全处理消息之后(例如,成功地获取一个Web页面,处理并将其存储到某个持久的数据存储中)。

如果一个消费者在没有发送消息的情况下死亡,AMQP代理将把它重新交付给另一个消费者,或者,如果当时没有可用的服务,在尝试重新提交之前,代理将等待至少一个用户注册到相同的队列中。

拒绝消息

当使用者应用程序接收到消息时,该消息的处理可能会成功,也可能不会成功。一个应用程序可以通过拒绝一个消息,向代理表明消息处理已经失败(或者不能在当时完成)。当拒绝消息时,应用程序可以要求代理放弃或重新请求它。当队列中只有一个消费者时,请确保您不会通过拒绝并重新排队(requeue)使用同一消费者的消息来创建无限的消息传递循环。

否定的(Negative )确认

使用AMQP basic.reject方法拒绝消息。basic.reject有一个限制:没有办法拒绝多个消息,虽然您可以使用确认这么做。但是,如果使用RabbitMQ,则有一个解决方案。RabbitMQ提供了一个AMQP 0-9-1扩展,称为确否定确认或nacks。更多的信息,请参考帮助页

预取消息

对于多个消费者共享一个队列的情况,在发送下一个确认消息之前,可以指定每个消费者可以同时发送多少条消息是有用的。这可以作为一种简单的负载平衡技术,或者如果消息倾向于批量发布,则可以提高吞吐量。例如,如果一个生产应用程序每分钟都发送消息,这是因为它所做的工作的性质。

请注意,RabbitMQ只支持Channel级别的预取计数,而不是基于Connection或基于大小的预取。

消息属性和有效载荷(Payload)

AMQP模型中的消息具有属性。有些属性非常常见,AMQP 0-9-1规范定义了它们,应用程序开发人员不必考虑确切的属性名。一些示例:

  • 内容类型
  • 内容编码
  • 路由键
  • 交付模式(持续的或不)
  • 消息优先级
  • 消息发布时间戳
  • 过期期限
  • 发布者应用id

AMQP代理使用了一些属性,但大多数都是对接受它们的应用程序开放的。有些属性是可选的,并且被认为是header。他们类似于HTTP中的X-Headers。消息属性是在消息发布时设置的。

AMQP消息也有一个有效负载(它们携带的数据),AMQP代理将其视为一个不透明的字节数组。代理将不检查或修改有效负载。消息只包含属性和没有有效负载是可能的。常见的是使用序列化格式,如JSON、Thrift、Protocol Buffer和MessagePack来序列化结构化数据,以便将其作为消息有效负载发布。AMQP对等点通常使用“内容类型”和“内容编码”字段来传达这些信息,但这仅仅是惯例。

消息可能以持久的形式发布,这使得AMQP代理将它们持久化到磁盘中。如果服务器重新启动,系统将确保接收到的持久消息不会丢失。简单地将消息发布到持久的交换中,或者将其路由到持久的队列(s)的事实并不会使消息持久:这完全取决于其本身的持久性模式。将消息发布为持久的影响性能(就像数据存储一样,持久性在性能上是有代价的)。

消息确认

由于网络是不可靠的,应用程序也会失败,所以通常需要有某种类型的处理确认。有时只需要承认已经收到消息的事实。有时确认意味着消息经过消费者的验证和处理,例如,被验证为具有强制数据,并持久化到一个数据存储或索引。

这种情况非常常见,所以AMQP 0-9-1有一个称为消息确认的内置功能(有时称为“ack”),用户可以使用它来确认消息的传递和/或处理。如果应用程序崩溃(AMQP代理在连接关闭时注意到这一点),如果对一条消息的确认是预期的,而AMQP代理没有接收到,则消息将重新排队(如果存在,则可能立即传递给另一个使用者)。

在协议中内置了确认信息,可以帮助开发人员构建更健壮的软件。

AMQP 0-9-1 方法

AMQP 0-9-1是由许多方法构成的。方法是操作(比如HTTP方法),与面向对象编程语言中的方法没有什么共同之处。AMQP方法被集合到类中。类只是AMQP方法的逻辑分组。AMQP 0-9-1参考资料提供了所有AMQP方法的详细信息。

让我们来看看交换类,这是一组与交换操作有关的方法。它包括以下操作:

  • exchange.declare
  • exchange.declare_ok
  • exchange.delete
  • exchange.delete_ok

(请注意,RabbitMQ站点参考还包括对exchange类的特定扩展,这些扩展是我们在本指南中不会讨论的)。

上面的操作是逻辑对的;exchange.declareexchange.declare-okexchange.deleteexchange.delete-ok。这些操作是“请求”(由客户端发送)和“响应”(由代理响应上述的“请求”)。

例如,客户端请求代理使用exchange.declare声明一个新的交换。声明方法:

如上图所示,exchange.declare携带了几个参数。可以使客户端能够指定交换名称、类型、持久性标志等等。

如果操作成功,代理使用exchange.declare-ok方法来响应:

exchange.declare-ok除了通道号(channel number),不会携带任何参数。

事件的发生顺序与AMQP queue类的另一个方法对,queue-declarequeue-declare-ok非常相似。

并非所有的AMQP方法都有对应的方法。一些(basic.publish作为最广泛使用的一种)没有对应的的“响应”方法和其他(例如,basic.get)有不止一种可能的“响应”。

连接

AMQP连接通常是长期存在的。AMQP是一种应用程序级协议,它使用TCP进行可靠的传递。AMQP连接使用身份验证,可以使用TLS(SSL)保护。当应用程序不再需要连接到AMQP代理时,它应该优雅地关闭AMQP连接,而不是突然关闭底层的tcp连接。

通道

一些应用程序需要多个AMQP代理连接。但是,同时保持许多TCP连接是不可取的,因为这样做会消耗系统资源,并且使配置防火墙变得更加困难。AMQP 0-9-1连接是多路复用的通道,可以被认为是“共享一个TCP连接的轻重量连接”。

对于使用多个线程/进程进行处理的应用程序,在每个线程/进程中打开一个新的通道,而不是在它们之间共享通道是很常见的。

特定通道上的通信完全独立于另一个通道上的通信,因此每个AMQP方法也都包含一个通道号,客户端使用该通道来确定该方法的哪个通道(例如,需要调用哪个事件处理程序)。

虚拟主机(virtual host)

为了使单个代理能够承载多个隔离的“环境”(用户、交换、队列等),AMQP包含了虚拟主机(vhost)的概念。它们类似于许多受欢迎的Web服务器使用的虚拟主机,并提供了完全隔离的环境,在这些环境中AMQP实体可以生存。AMQP客户机在AMQP连接协商中指定它们想要使用的vhost。

AMQP是可扩展的

AMQP 0-9-1有几个扩展点:

  • 自定义交换类型允许开发人员实现路由方案,交换类型提供的开箱即用的类型不能很好地覆盖,例如基于地理数据的路由。
  • 交换和队列的声明可以包括代理可以使用的附加属性。例如,在RabbitMQ中,每个队列的消息TTL都是这样实现的。
  • 对协议的特定于代理的扩展。例如扩展RabbitMQ实现
  • 新的AMQP 0-9-1方法类被引入
  • 可以使用扩展的插件扩展代理,例如,RabbitMQ管理前端和HTTP API是作为一个插件实现的。

这些特性使AMQP 0-9-1模型更加灵活,适用于非常广泛的问题。

PS: 以上所展示AMQP中相应概念的代码已在我的github

坚持原创技术分享,更多深度分析、实践代码,您的支持将鼓励我继续创作!