JMS-AMQP
JMS vs AMQP
JMS: 全称是 Java Message Service,是 J2EE 的一部分,由 JSR 914 定义,是和面向消息的中间件(Message Ortiented Middleware, MOM)通信的标准 API。两个 Java 应用使用 JMS client 来进行通信,以实现解耦。常用的 JMS 消息中间件有 ActiveMQ,HornetQ 等。
AMQP:Advanced Message Queuing Protocol,是一种用于传送消息的标准应用层协议。支持消息队列、路由,支持P2P、pub-sub 等模式。常用的 AMQP 中间件有 RabbitMQ,StormMQ,RocketMQ 等。
两者的区别: JMS 是 API,JMS 的客户端实现了这套 API 来和 JMS 的服务端交互,至于交互使用的协议是什么,并没有限制。 而 AMQP 是一种协议,消息的客户端和服务端必须遵循这套协议。 所以,JMS 的客户端可以使用 AMQP 作为和服务端通信的协议。 相比于 JMS,AMQP 没有限制语言、平台、中间件类型,更为灵活。
参考
AMQP 0-9-1 协议
AMQP 0-9-1 协议是一种客户端应用和消息中间件之间通信遵循的协议。
注:
- 翻译自: amqp-concepts
- 0-9-1 是指的 AMQP 的版本号。
消息中间件
Message broker (broker:释义为中间人,经济商),从 publisher/producer 应用接收消息,将消息路由给 consumer。
AMQP 0-9-1 模型简介
- 消息从生产者发送给 exchanges
- exchange 将消息的副本分发给 queues,分发的过程中使用的规则叫做 bindings
- 中间件将消息发送给订阅了 queue 的消费者,或者消费者从 queue 中“拉取”消息
生产者发布消息时,可以指定消息的属性(message meta-data)。这些属性中部分可以被中间件使用,例如用作 binding。
考虑到网络的不可靠性、消费者应用可能会处理消息失败,AMQP 模型提出了 message acknowledgements 的概念:当消息传送到消费者端时,消费者会通知中间件已经收到消息。通知的时机可以是马上,或者由消费者应用的开发者来决定。中间件只有当收到该通知时,才会从队列中移除消息。
在某些情况下,消息无法被路由,可能会被返还给生产者,或者丢弃,或者进入死信队列(dead letter queue)。
Exchanges,queues,bindings 被统称为 AMQP entities。
AMQP 0-9-1 是一个可编程协议
这句话的意思是,AMQP 0-9-1 的 entities 和路由规则是由应用定义的,而不是中间件管理员。
Exchanges 和 Exchange 类型
Exchange 的职责是:获取消息,然后路由给0个或者多个队列。路由的算法由 exchange 类型和 bindings 规则决定。 共有四种 exchange 类型:
- Direct exchange,默认预定义名称是
(Empty string) and amq.direct
- Fanout exchange,默认预定义名称是
amq.fanout
- Topic exchange,默认预定义名称是
amq.topic
- Headers exchange,默认预定义名称是
amq.match
,在 RabbitMQ 中又叫amq.headers
exchage 重要的属性:
- Name
- Durability (exchanges survive broker restart)
- Auto-delete (exchange is deleted when last queue is unbound from it)
- Arguments (optional, used by plugins and broker-specific features)
Default Exchange
Default exchange 是一种特殊的 Direct exchange,没有名称,或者说名称是空字符串。 特性:所有的队列自动会和这个 exchange 绑定,使用的 routing key 就是队列的名字。 例如,生产者往一个名为 default exchange 发送消息,带上的路由键是”abc”,消息就会被路由至名为”abc”的队列中。这样看起来就像是直接往 “abc” 队列发送消息一样。
Direct Exchange
特性:Direct Exchange 根据消息路由键将消息发送给相应的队列。
- 队列使用路由键 K 和 exchange 绑定
- 当带有路由键 R 的消息发送到该 exchange 时,如果R=K,exchange 会将这个消息路由至队列
Fanout Exchange
特性:Fanout exchange 会将消息路由给所有和它绑定的队列,忽略路由键。 如果有N个队列绑定,那么一条消息会复制N份,分别发送给这 N 个队列。 Fanout exchange 适合广播的场景。
Topic Exchange
特性:Topic exchange 根据消息的路由键 和将队列与 exchange 绑定的 pattern 之间的匹配关系来将消息路由至一个或者多个队列。
使用场景:组播路由(multicasting route of messages) 多个消费者,每个消费者可以选择自己接收哪种消息。
Headers Exchange
特性:根据消息头而不是路由键来路由消息。
相比于路由键,消息头可以更容易地表达多个属性。
消息头可以有一个或者多个。当有多个时,可以用 x-match
这个header 来指明是要全匹配(all)还是任意匹配(any)。
Queues
队列的重要属性有:
- Name
- Durable (the queue will survive a broker restart)
- Exclusive (used by only one connection and the queue will be deleted when that connection closes)
- Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
- Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)
名称: 上限是 255 字节 UTF8 字符。
持久化:Durable 的队列会持久化到磁盘,当中间件重启时会恢复。但是注意:队列里的消息并不会持久化。
bindings:是 exchange 用来将消息路由到队列的规则。bindings 有一个叫做路由键的可选属性。路由键的功能类似于过滤器,选择适合过滤条件的消息发给绑定的队列。
Consumers
消费有两种模式:
- Have messages delivered to them (“push API”)
- Fetch messages as needed (“pull API”)
Message Acknowledgements
消费者可能会处理消息失败,或者消费者自身挂了,或者网络故障。 这些都会引发问题。 那么什么时候中间件才能安全地将消息移除出队列呢? AMQP 协议将决定权交给消费者,即消费者声明一个 message acknowledgement 表示接收到了消息。
有两种 acknowledgement 模式:
- After broker sends a message to an application (using either
basic.deliver
orbasic.get-ok
method). - After the application sends back an acknowledgement (using the
basic.ack
method).
前者叫做 “automatic acknowledgement model”,后者叫做 “explicit acknowledgement model”。 使用显式模型时,由应用来选择什么时候发送 acknowledgement,例如马上、存储到数据库等。
Rejecting Messages
当处理消息失败时,消费者要通知中间件处理失败了或是该时间段没法处理,通知的方式就是拒绝消息。应用可以请求中间件丢弃消息或者将其重新入队。 注意:要避免拒绝-重入队的死循环。
Negative Acknowledgements
拒绝消息是通过 basic.reject
方法实现的。
该方法有一个缺陷:无法拒绝多个消息。
如果是使用 RabbitMQ,有一个解决方法:AMQP 0-9-1 extension known as negative acknowledgements or nacks.
Message Attributes and Payload
AMQP 协议定义了一系列常用的消息属性,包括:
- Content type
- Content encoding
- Routing key
- Delivery mode (persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
这些属性中有一部分就是前面提到的 headers。
payload 指的是消息体,一般是 byte array。中间件不会对消息体做任何改变。 常用的序列化格式有:Json,Thrift,Protocol Buffer,MessagePack 等。 按照惯例,使用 “content-type” 和 “content-encoding” 属性来指明序列化格式。
AMQP 0-9-1 Methods
AMQP 协议规定了一系列的方法,这些方法类似于 HTTP 方法,代表一种操作。 方法使用一系列的 classes 做归类。
以 exchange 这个 class 为例,有以下方法:
exchange.declare
exchange.declare-ok
exchange.delete
exchange.delete-ok
declare
和 delete
是客户端发送的请求,declare-ok
和 delete-ok
是中间件发送的响应。
所有方法参见:amqp-reference
Connections
AMQP 的连接是长连接(long-lived)。 基于 TCP 做可靠地消息发送。
Channels
当应用需要和中间件建立多个连接时,为了减少资源消耗,使用 channel 做连接复用。
Comments