# 生产者确认
小生产者将消息发送出去之后,消息到底有没有正确到达服务器?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的。也就是说默认情况下,生产者不知道消息是否正确到达服务器。
RabbitMQ 针对这个问题,提供了两种解决方式:
- 事物机制
- 发送方确认(publisher confirm)机制
# 事物机制
RabbitMQ 客户端中与事物机制相关的方法有三个:
channel.txSelect
:将当前的信道设置为 事物模式channel.txCommit
:提交事物channel.txRollback
:回滚事物
开启事物(设置为事物模式)后,可以发送消息,然后提交事物,如果在提交事物前 RabbitMQ 异常崩溃或则其他原因抛出异常,可以将其捕获,然后再执行回滚事物。
这里事物与数据库中的事物概念并不同。
// 开启事物
channel.txSelect();
// 发送消息
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
// 提交事物
channel.txCommit();
2
3
4
5
6
AMQP 协议流转过程如下图所示
- 客户端发送 Tx.Select 开启事物
- Broker 回复 Tx.Select-Ok,确认事物开启
- 发送消息后,客户端 Tx.Commit 提交事物
- Broker 回复 Tx.Commit-Ok ,确认事物提交
事物回滚的代码则如下所示
try {
channel.txSelect();
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
// 故意制造异常
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
// 回滚事物1
channel.txRollback();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
交互流程如下
如果要发送多条消息,则如下做
channel.txSelect();
for (int i = 0; i < 10; i++) {
try {
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
}
2
3
4
5
6
7
8
9
10
对于这里笔者其实有点懵,事物模式什么时候退出?这里的 select 并不是和 commit 是一一对应的。
事物机制解决了 消息发送方 和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事物才能提交成功。但是事物机制对 RabbitMQ 的性能影响很大。所以提供了一个改进机制:发送方确认机制
# 发送方确认机制
在 AMQP 协议层面提供了事物机制解决消息是否有正确到达 RabbitMQ 这个问题,但是事物机制会严重降低 RabbitMQ 的消息吞吐量。
发送方确认机制(publisher confirm)是一种轻量级的方式。
生产者将信道设置为 confirm(确认)模式,所有再该信道上 发布的消息 都会被 指派一个唯一的 ID(从 1 开始),当消息被匹配到队列后,RabbitMQ 会 发送一个确认(Basic.ack)给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达了目的地了。如果消息和队列是 可持久化的,那么确认消息会在 消息写入磁盘后 发出。
RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外还可以设置 channel.basicAck 方法中的 multple 参数
,表示在这个序号之前的所有消息都已经得到了处理。
如上图所示:生产者发送了 3 条消息,开启了 mutilple,那么 RabbitMQ 回调确认时,deliveryTag = 3,表示前面的 2 条消息,包含自己都已经正确到达 RabbitMQ 了。
事物机制:发送方是同步的
发送一条消息后,发送端阻塞,等待 RabbitMQ 回应后,才能继续下一条消息的发送
发送方确认机制:可以是异步的
发送方确认有两种方式:
- 同步方式:发送消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回
- 异步方式:提供一个回调方法,服务端确认了一条或多条消息后客户端会收到回调事件
# 同步方式
与事物机制使用类似,不过可以发布多条消息,再等待确认
// 将信道设置为生产者确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
// 发布消息
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
try {
if (channel.waitForConfirms()) {
// 发送的 5 条消息成功
} else {
// 发送失败,这一批消息都需要重发?
// 不清楚他的机制,全部失败?
}
} catch (InterruptedException e) {
e.printStackTrace();
// 发送失败,这一批消息都需要重发?
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
可以看到如上的代码,批量发送后再等待并确认发送成功。但是有一个问题,当失败时(Basic.Nack 或 超时)时,就需要将这一批消息重新发送;这种情况过多的时候,性能不升返降
# 异步方式
通过添加 ConfirmListener 的方式,来知晓是否成功还是失败
final Channel channel = connection.createChannel();
// 将信道设置为生产者确认模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
/**
* 处理 RabbitMQ 的 Basic.Ack 指令
* @param deliveryTag 那一条消息
* @param multiple
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("发送成功");
}
/**
* 处理 RabbitMQ 的 Basic.Nack 指令
* @param deliveryTag
* @param multiple
* @throws IOException
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("发送失败");
// 可进行消息的重发处理
}
});
for (int i = 0; i < 10; i++) {
// 发布消息
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
channel.basicPublish("", reqestQueue, properties, "message".getBytes());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
在性能方面,QPS 对比如下
普通 confirm 就是发送一条就 waitForConfirms 一次。可见异步方式效率是最高的