[TOC]

上一篇文章 《二、JMS 点对点模型 – ActiveMQ简单实现》 我们实现了JMS点对点模型的实例,本章对第二种 发布/订阅 模型来做一个简单的实例。

其实发布/订阅 模型与点对点模型的实现方式基本一致,因此这里就不写完整的过程了。

一、开发环境

与上篇文章相同

二、java项目

与上篇文章相同

三、具体实现

1、编写发布者

发布者的代码与上篇文章基本相同,不同的是 使用session 创建是的主题,而不是队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author .
* @version 1.0
* @name JMS生产者
* @description 消息的生产者类
* @date 2018/4/11 0011.
*/
public class JMSProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//生产者
MessageProducer messageProducer;

/**
* 编写生产者的步骤
*/
try {
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.URL);
//使用连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接创建获取会话
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//与点对点唯一不同的地方
//使用会话连接一个主题作为目的地,如果这个主题不存在将会被创建
destination = session.createTopic("HelloWorld.Topic");
//使用会话创建消息发布者
messageProducer = session.createProducer(destination);
//发布主题
sendMessage(session,messageProducer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

/**
* 发布者发布主题
* @param session 会话
* @param messageProducer 发布者
*/
public static void sendMessage(Session session,MessageProducer messageProducer){
try {
//使用会话创建一条文本消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
TextMessage textMessage = session.createTextMessage("你好,世界! by Topic");
//通过消息发布者发出主题
messageProducer.send(textMessage);
System.out.println("已发送主题消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

}

2、编写订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author .
* @version 1.0
* @name
* @description
* @date 2018/4/11 0011.
*/
public class JMSConsumer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//消息的消费者
MessageConsumer messageConsumer;

/**
* 消息的消费者编写
*/

try {
//实例化工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);
//使用实例工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接获取会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//使用会话连接一个队列作为目的地,如果这个队列不存在将会被创建
// destination = session.createQueue("HelloWorld");
//使用会话创建一个主题,如果这个主题不存在将会被创建
Topic topic = session.createTopic("HelloWorld.Topic");
//使用会话创建一个订阅者
messageConsumer = session.createConsumer(topic);

/**
*获取消息
*/
/*同步实现*/
//设置接收者接收消息的时间,为了便于测试,这里定为50s,接收到消息之前(或超时之前)将一直阻塞
/*TextMessage textMessage = (TextMessage) messageConsumer.receive(50000);
if (textMessage!=null){
System.out.println("收到的消息是:" + textMessage.getText());
}else {
System.out.println("没有收到消息");
}*/
/*异步实现*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
String text = ((TextMessage)message).getText();
System.out.println("收到的消息 :" +text );
} catch (JMSException e) {
e.printStackTrace();
}
}
});

} catch (JMSException e) {
e.printStackTrace();
}
/*需要异步,则不关闭连接*/
}

}

3、关于持久订阅模式

在第一篇文章 《一、JMS概述》中我们提到:

发布/订阅模型还支持持久订阅的概念,在消息发布时,注册了主题的消费者不需要处于活动状态; 当消费者随后变得活跃时,它将收到消息。如果没有活动使用者注册主题,则该主题不会持有它收到的消息,除非它具有持久订阅的不活动消费者。

主要是业务场景如下:

A系统通过MQ推送数据到B系统。通过发布订阅的消息传送模型。由于涉及到的数据比较重要:比如是关于资金、交易、股票价格的信息。要保证B系统一定收到A系统发送的消息,考虑B系统会断电重启之类异常,故设置持久订阅模式。可以保证在B订阅A主题后,因为断电,订阅者状态变为不活动的。在B系统重启后,依然可以收到消息。

实现持久订阅模式与普通的发布订阅模式一样,主要的不同是必须设置唯一的客户端ID和订阅者ID。

1、在连接启动前设置设置客户端ID
2、使用createDurableSubscriber 创建订阅者并指定订阅者ID
1
2
3
4
//设置客户端ID
connection.setClientID("client1");
//使用会话创建一个订阅者,并指定订阅者ID为 sub1
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"sub1");

四、运行

1、启动ActiveMQ

与上一篇文章相同

2、运行程序

首先我们运行一下发布者。发布成功后我们可以在ActiveMQ 的主题中看到我们创建出来的主题消息
这里写图片描述

我们再运行一下订阅者,会发现订阅者一直在待接收消息,并没有输出我们刚刚发布的主题消息。这是因为 发布/订阅 模型 的特点:发布端在发布消息时,如果没有订阅端在线,则不会保留消息,将会认为消息已经发送。

因此我们可以先运行订阅者的代码,启动一个订阅者。为了体现发布/订阅模式一对多的特点,我们再启动第二个订阅者。可以在ActiveMQ中看到在两个订阅者在线了。
这里写图片描述

我们再启动发布者,发布一个消息。在线的两个订阅者就可以接收到我们刚刚发布的消息了。
我们再设想一下,其中一个订阅者断电下线了,如果再有消息发布,则待它再次上线时已经接收不到第二次发布的消息了。为了解决一个问题,我们可以使用持久订阅模式。
按照上面 持久订阅修改代码后重新启动两个订阅者,注意的时这两个订阅者的客户端ID与订阅者ID都必须唯一。
这里写图片描述
这里写图片描述

启动发布者发布一个消息 ,可以看到两个订阅者分别都接收到了发布的消息。
这时我们关闭 订阅者 Client1 ,再发布一个消息。这时Client2接收到了。当我们启动订阅者 Client1后,它也能够收到第二次发布的消息
这里写图片描述

至此,我们实现了JMS发布/订阅模型,并使用了持久订阅模式关于持久订阅我们需要注意的是:

很多情况下,持久化订阅非常有用,但有的时候并非如此。虽然使用持久还是非持久通常由业务决定。但是,我们还必须考虑消息消耗的存储容量。比如有一个持久订阅者长期处于不活动的状态,那么jms服务器就必须为这个订阅者存储数以千计、万计的无用信息,浪费JMS数据仓库的宝贵空间。因为,我们必须得考虑这个问题。