# 消费消息
RabbitMQ 消费模式分两种:
Push:推模式
采用 Basic.Consume 进行消费
Pull:拉模式
则使用 Basic.Get 进行消费
# 推模式
在推模式中,可以通过持续订阅的方式来消费消息,相关消费类如下:
com.rabbitmq.client.DefaultConsumer
com.rabbitmq.client.Consumer
接收消息,一般实现 Consumer 接口,或则继承 DefaultConsumer 来实现。
推模式的关键代码如下
boolean autoAck = false;
// 设置客户端最多接收未被 ack 的消息个数
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
final String routingKey = envelope.getRoutingKey();
final String contentType = properties.getContentType();
final long deliveryTag = envelope.getDeliveryTag();
// 处理消息
channel.basicAck(deliveryTag, false);
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- consumerTag:不同的订阅有不同的标签
- autoAck:在订阅队列时,设置了 false,在接收处理消息后,显示的 ack 操作。可以防止消息不必要的丢失
basicConsume 的全参数说明如下
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue:订阅的队列名称
- autoAck:是否自动确认,建议设置成 false
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置为 true,则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
- exclusive:是否排他
- arguments:消费者的其他参数
- callback:回调函数。用来处理 RabbitMQ 推送过来的消息。
实现回调函数时,重新 handleDelivery 方法,对客户端消费者来说很方便。更复杂的消费者客户端会重新更多的方法。
void handleConsumeOk(String consumerTag);
会在其他方法之前调用,返回消费者标签
void handleCancelOk(String consumerTag);
显示的取消一个消费者订阅时被调用,比如调用
channel.basicCancel(consumerTag)
,该 basicCancel 方法,触发顺序是: handleConsumeOk、handleDelivery 、handleCancelOkvoid handleCancel(String consumerTag) throws IOException;
隐式的取消消费者订阅时调用。
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
当 Channel 或则 Connection 关闭时调用
void handleRecoverOk(String consumerTag);
和生产者一样,建议每个线程拥有自己的 Channel ,不要线程共享。业务是线程不安全的。
# 拉模式
拉模式是通过以下方式主动获取消息,消费消息
// 同样还是 channel 类
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
2
- queue:队列名称
- autoAck:如果设置为 false,则需要调用 channel.basicAck 来确认消息被消费
如下面的消费示例:
final GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
2
3
注意是:basicGet 是一次获取一条消息,而推模式可以通过 channel.basicQos(3);
设置一次让服务器发送多少条消息。所以不要在循环中使用拉模式,这严重影响性能。