[TOC]

上一篇文章[[RabbitMQ学习系列 二 “Hello World”]]记录了一个简单的rabbitmq 发布接收队列消息,但没有使用路由。本篇写一写rabbitmq的路由的使用。

介绍

有几个概念介绍一下
1、生产者

​ 生产者是发送消息的用户的应用程序

2、路由

​ 处理生产者消息发到哪个队列

3、队列

队列是存储消息的缓冲器

4、消费者

消费者是接收消息的用户的应用程序

消息模型

RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道消息是否会被传送到任何队列中。我们的消息实际上是从生产者传递到路由,路由会绑定队列并指定绑定的routingKey。根据routingkey匹配到这个路由上绑定的队列,并向队列发送消息,不能匹配上的队列则不会收到该消息。所以我们上一篇文章中虽然没有明确定义路由,实际上是使用是默认的路由。我们可以根据需求自己声明相应的路由。

路由(Exchange)

声明方式

exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

exchange :路由名
type : 路由类型
durable: 是否持久化
autoDelete:是否自动删除
arguments: 其它参数

类型

1、Fanout
​ > 广播。这个类型的路由会忽略routingKey。收到生产者消息后会直接发送给绑定在该路由上的所有队列

2、Direct

单播。该类型的路由会根据routingKey去匹配队列,将消息发送给该路由上绑定的并且routingKey完全匹配上的队列

3、Topic

多播。该类型路由的routingKey可以使用通配符进行匹配。即 * 代表一个单词,# 代表多个单词。routingKey 的定义不能是任意字符,只能是由点号分隔的字符串,如: “ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。
如下图
Q1队列的与路由绑定的routingKey 是****.orange.****
Q2队列的与路由绑定的routingKey 是 ..rabbit 与 lazy.#
如果我们发送消息时指定的routingKey为:quick.orange.rabbit ,则消息会被路由发送到Q1与Q2两个队列中。
如果我们发送消息时指定的routingKey为:lazy.brown.fox ,则消息会被路由发送到Q2队列
如果指定的routingKey为 lazy.pink.rabbit,也会被发送到Q2,但只会发送一次,即使它匹配到了两个绑定的routingKey
示例

4、Headers

这种类型的路由不处理路由键,而是根据发送消息的Headers属性进行匹配,在队列绑定交换机的时候会指定一组键对值;

绑定队列

queueBind(String queue, String exchange, String routingKey)

前面我们说过 RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。所以我们每个队列都需要绑定在一个路由上。从生产者发送到该路由的消息只会被传递到与路由绑定的队列上。绑定时还需要指定一个routingKey,路由根据发布消息时传递过来的routintKey来匹配到相应队列并传送消息到该队列中。
queue: 队列名
exchange: 路由名
routingKey: 队列与路由绑定的key

编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class EmitLogTopic {
private final static String EXCHANGE_NAME="topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message ="Hello World! This is error info!";
/* 我们使用的是topic类型的路由,第二个参数为routingKey*/
channel.basicPublish(EXCHANGE_NAME,"rabbit.log.error",null,message.getBytes());

channel.close();
connection.close();
}
}

发送消息时我们指定的routingKey为 rabbit.log.error 因为是topic类型的路由,所以需要用点分隔的形式写routingKey ,如果是Direct类型则不需要。

编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReceivedLogsTopic {
private final static String EXCHANGE_NAME="topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queuesName = channel.queueDeclare().getQueue();
/*将队列绑定路由并定义topic类型路由的匹配规则*/
channel.queueBind(queuesName,EXCHANGE_NAME,"*.log.*");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("[x] recv:" + message);
}
};
channel.basicConsume(queuesName,true,consumer);
}
}

这里队列绑定topic 类型路由时指定的匹配规则为 .log.
前面生产者发送时的routingKey 为 rabbit.log.error 所以这条消息会被路由监听到。 如果我们将绑定的匹配规则修改为 log.# 并重新启动一个消费者B,通过生产者再次发送消息,则消费者B不会监听到这条消息,因为routingKey 无法匹配上,路由不会把这条消息传递给消费者B

*** 注意在测试时是需要先启动消费者,再启动生产者。因为如果没有消费者在线,消息会被rabbitMq丢弃处理 **