JMS-AMQP

2 minute read

JMS vs AMQP

JMS: 全称是 Java Message Service,是 J2EE 的一部分,由 JSR 914 定义,是和面向消息的中间件(Message Ortiented Middleware, MOM)通信的标准 API。两个 Java 应用使用 JMS client 来进行通信,以实现解耦。常用的 JMS 消息中间件有 ActiveMQ,HornetQ 等。

AMQP:Advanced Message Queuing Protocol,是一种用于传送消息的标准应用层协议。支持消息队列、路由,支持P2P、pub-sub 等模式。常用的 AMQP 中间件有 RabbitMQ,StormMQ,RocketMQ 等。

两者的区别: JMS 是 API,JMS 的客户端实现了这套 API 来和 JMS 的服务端交互,至于交互使用的协议是什么,并没有限制。 而 AMQP 是一种协议,消息的客户端和服务端必须遵循这套协议。 所以,JMS 的客户端可以使用 AMQP 作为和服务端通信的协议。 相比于 JMS,AMQP 没有限制语言、平台、中间件类型,更为灵活。

参考

AMQP 0-9-1 协议

AMQP 0-9-1 协议是一种客户端应用和消息中间件之间通信遵循的协议。

注:

  • 翻译自: amqp-concepts
  • 0-9-1 是指的 AMQP 的版本号。

消息中间件

Message broker (broker:释义为中间人,经济商),从 publisher/producer 应用接收消息,将消息路由给 consumer。

AMQP 0-9-1 模型简介

amqp simple model

  • 消息从生产者发送给 exchanges
  • exchange 将消息的副本分发给 queues,分发的过程中使用的规则叫做 bindings
  • 中间件将消息发送给订阅了 queue 的消费者,或者消费者从 queue 中“拉取”消息

生产者发布消息时,可以指定消息的属性(message meta-data)。这些属性中部分可以被中间件使用,例如用作 binding。

考虑到网络的不可靠性、消费者应用可能会处理消息失败,AMQP 模型提出了 message acknowledgements 的概念:当消息传送到消费者端时,消费者会通知中间件已经收到消息。通知的时机可以是马上,或者由消费者应用的开发者来决定。中间件只有当收到该通知时,才会从队列中移除消息。

在某些情况下,消息无法被路由,可能会被返还给生产者,或者丢弃,或者进入死信队列(dead letter queue)

Exchanges,queues,bindings 被统称为 AMQP entities

AMQP 0-9-1 是一个可编程协议

这句话的意思是,AMQP 0-9-1 的 entities 和路由规则是由应用定义的,而不是中间件管理员。

Exchanges 和 Exchange 类型

Exchange 的职责是:获取消息,然后路由给0个或者多个队列。路由的算法由 exchange 类型bindings 规则决定。 共有四种 exchange 类型:

  1. Direct exchange,默认预定义名称是 (Empty string) and amq.direct
  2. Fanout exchange,默认预定义名称是 amq.fanout
  3. Topic exchange,默认预定义名称是 amq.topic
  4. Headers exchange,默认预定义名称是 amq.match,在 RabbitMQ 中又叫 amq.headers

exchage 重要的属性:

  • Name
  • Durability (exchanges survive broker restart)
  • Auto-delete (exchange is deleted when last queue is unbound from it)
  • Arguments (optional, used by plugins and broker-specific features)

Default Exchange

Default exchange 是一种特殊的 Direct exchange,没有名称,或者说名称是空字符串。 特性:所有的队列自动会和这个 exchange 绑定,使用的 routing key 就是队列的名字。 例如,生产者往一个名为 default exchange 发送消息,带上的路由键是”abc”,消息就会被路由至名为”abc”的队列中。这样看起来就像是直接往 “abc” 队列发送消息一样。

Direct Exchange

特性:Direct Exchange 根据消息路由键将消息发送给相应的队列。

  • 队列使用路由键 K 和 exchange 绑定
  • 当带有路由键 R 的消息发送到该 exchange 时,如果R=K,exchange 会将这个消息路由至队列

direct exchange routing

Fanout Exchange

特性:Fanout exchange 会将消息路由给所有和它绑定的队列,忽略路由键。 如果有N个队列绑定,那么一条消息会复制N份,分别发送给这 N 个队列。 Fanout exchange 适合广播的场景。

fanout exchange routing

Topic Exchange

特性:Topic exchange 根据消息的路由键将队列与 exchange 绑定的 pattern 之间的匹配关系来将消息路由至一个或者多个队列。

使用场景:组播路由(multicasting route of messages) 多个消费者,每个消费者可以选择自己接收哪种消息。

Headers Exchange

特性:根据消息头而不是路由键来路由消息。 相比于路由键,消息头可以更容易地表达多个属性。 消息头可以有一个或者多个。当有多个时,可以用 x-match 这个header 来指明是要全匹配(all)还是任意匹配(any)。

Queues

队列的重要属性有:

  • Name
  • Durable (the queue will survive a broker restart)
  • Exclusive (used by only one connection and the queue will be deleted when that connection closes)
  • Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
  • Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)

名称: 上限是 255 字节 UTF8 字符。

持久化:Durable 的队列会持久化到磁盘,当中间件重启时会恢复。但是注意:队列里的消息并不会持久化。

bindings:是 exchange 用来将消息路由到队列的规则。bindings 有一个叫做路由键的可选属性。路由键的功能类似于过滤器,选择适合过滤条件的消息发给绑定的队列。

Consumers

消费有两种模式:

  • Have messages delivered to them (“push API”)
  • Fetch messages as needed (“pull API”)

Message Acknowledgements

消费者可能会处理消息失败,或者消费者自身挂了,或者网络故障。 这些都会引发问题。 那么什么时候中间件才能安全地将消息移除出队列呢? AMQP 协议将决定权交给消费者,即消费者声明一个 message acknowledgement 表示接收到了消息。

有两种 acknowledgement 模式

  • After broker sends a message to an application (using either basic.deliver or basic.get-ok method).
  • After the application sends back an acknowledgement (using the basic.ack method).

前者叫做 “automatic acknowledgement model”,后者叫做 “explicit acknowledgement model”。 使用显式模型时,由应用来选择什么时候发送 acknowledgement,例如马上、存储到数据库等。

Rejecting Messages

当处理消息失败时,消费者要通知中间件处理失败了或是该时间段没法处理,通知的方式就是拒绝消息。应用可以请求中间件丢弃消息或者将其重新入队。 注意:要避免拒绝-重入队的死循环。

Negative Acknowledgements

拒绝消息是通过 basic.reject 方法实现的。 该方法有一个缺陷:无法拒绝多个消息。 如果是使用 RabbitMQ,有一个解决方法:AMQP 0-9-1 extension known as negative acknowledgements or nacks.

Message Attributes and Payload

AMQP 协议定义了一系列常用的消息属性,包括:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode (persistent or not)
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

这些属性中有一部分就是前面提到的 headers。

payload 指的是消息体,一般是 byte array。中间件不会对消息体做任何改变。 常用的序列化格式有:Json,Thrift,Protocol Buffer,MessagePack 等。 按照惯例,使用 “content-type” 和 “content-encoding” 属性来指明序列化格式。

AMQP 0-9-1 Methods

AMQP 协议规定了一系列的方法,这些方法类似于 HTTP 方法,代表一种操作。 方法使用一系列的 classes 做归类。

以 exchange 这个 class 为例,有以下方法:

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

declaredelete 是客户端发送的请求,declare-okdelete-ok 是中间件发送的响应。

所有方法参见:amqp-reference

Connections

AMQP 的连接是长连接(long-lived)。 基于 TCP 做可靠地消息发送。

Channels

当应用需要和中间件建立多个连接时,为了减少资源消耗,使用 channel 做连接复用。

Comments