# 消费消息

RabbitMQ 消费模式分两种:

  • Push:推模式

    采用 Basic.Consume 进行消费

  • Pull:拉模式

    则使用 Basic.Get 进行消费

# 推模式

在推模式中,可以通过持续订阅的方式来消费消息,相关消费类如下:

  • com.rabbitmq.client.DefaultConsumer
  • com.rabbitmq.client.Consumer

接收消息,一般实现 Consumer 接口,或则继承 DefaultConsumer 来实现。

推模式的关键代码如下

boolean autoAck = false;
// 设置客户端最多接收未被 ack 的消息个数
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        final String routingKey = envelope.getRoutingKey();
        final String contentType = properties.getContentType();
        final long deliveryTag = envelope.getDeliveryTag();
        // 处理消息
        channel.basicAck(deliveryTag, false);
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  • consumerTag:不同的订阅有不同的标签
  • autoAck:在订阅队列时,设置了 false,在接收处理消息后,显示的 ack 操作。可以防止消息不必要的丢失

basicConsume 的全参数说明如下

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
1
  • queue:订阅的队列名称
  • autoAck:是否自动确认,建议设置成 false
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为 true,则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
  • exclusive:是否排他
  • arguments:消费者的其他参数
  • callback:回调函数。用来处理 RabbitMQ 推送过来的消息。

实现回调函数时,重新 handleDelivery 方法,对客户端消费者来说很方便。更复杂的消费者客户端会重新更多的方法。

  • void handleConsumeOk(String consumerTag);

    会在其他方法之前调用,返回消费者标签

  • void handleCancelOk(String consumerTag);

    显示的取消一个消费者订阅时被调用,比如调用 channel.basicCancel(consumerTag),该 basicCancel 方法,触发顺序是: handleConsumeOk、handleDelivery 、handleCancelOk

  • void handleCancel(String consumerTag) throws IOException;

    隐式的取消消费者订阅时调用。

  • void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    当 Channel 或则 Connection 关闭时调用

  • void handleRecoverOk(String consumerTag);

和生产者一样,建议每个线程拥有自己的 Channel ,不要线程共享。业务是线程不安全的。

# 拉模式

拉模式是通过以下方式主动获取消息,消费消息

// 同样还是 channel 类
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
1
2
  • queue:队列名称
  • autoAck:如果设置为 false,则需要调用 channel.basicAck 来确认消息被消费

如下面的消费示例:

 final GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
1
2
3

注意是:basicGet 是一次获取一条消息,而推模式可以通过 channel.basicQos(3); 设置一次让服务器发送多少条消息。所以不要在循环中使用拉模式,这严重影响性能。