三 JMS发布 订阅模型--ActiveMQ简单应用
[TOC]
上一篇文章 《二、JMS 点对点模型 – ActiveMQ简单实现》 我们实现了JMS点对点模型的实例,本章对第二种 发布/订阅 模型来做一个简单的实例。
其实发布/订阅 模型与点对点模型的实现方式基本一致,因此这里就不写完整的过程了。
一、开发环境
与上篇文章相同
二、java项目
与上篇文章相同
三、具体实现
1、编写发布者
发布者的代码与上篇文章基本相同,不同的是 使用session 创建是的主题,而不是队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){ ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer messageProducer;
try { connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("HelloWorld.Topic"); messageProducer = session.createProducer(destination); sendMessage(session,messageProducer); session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
public static void sendMessage(Session session,MessageProducer messageProducer){ try { TextMessage textMessage = session.createTextMessage("你好,世界! by Topic"); messageProducer.send(textMessage); System.out.println("已发送主题消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }
}
|
2、编写订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args){ ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer;
try { connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("HelloWorld.Topic"); messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { String text = ((TextMessage)message).getText(); System.out.println("收到的消息 :" +text ); } catch (JMSException e) { e.printStackTrace(); } } });
} catch (JMSException e) { e.printStackTrace(); } }
}
|
3、关于持久订阅模式
在第一篇文章 《一、JMS概述》中我们提到:
发布/订阅模型还支持持久订阅的概念,在消息发布时,注册了主题的消费者不需要处于活动状态; 当消费者随后变得活跃时,它将收到消息。如果没有活动使用者注册主题,则该主题不会持有它收到的消息,除非它具有持久订阅的不活动消费者。
主要是业务场景如下:
A系统通过MQ推送数据到B系统。通过发布订阅的消息传送模型。由于涉及到的数据比较重要:比如是关于资金、交易、股票价格的信息。要保证B系统一定收到A系统发送的消息,考虑B系统会断电重启之类异常,故设置持久订阅模式。可以保证在B订阅A主题后,因为断电,订阅者状态变为不活动的。在B系统重启后,依然可以收到消息。
实现持久订阅模式与普通的发布订阅模式一样,主要的不同是必须设置唯一的客户端ID和订阅者ID。
1、在连接启动前设置设置客户端ID
2、使用createDurableSubscriber 创建订阅者并指定订阅者ID
1 2 3 4
| connection.setClientID("client1");
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"sub1");
|
四、运行
1、启动ActiveMQ
与上一篇文章相同
2、运行程序
首先我们运行一下发布者。发布成功后我们可以在ActiveMQ 的主题中看到我们创建出来的主题消息
我们再运行一下订阅者,会发现订阅者一直在待接收消息,并没有输出我们刚刚发布的主题消息。这是因为 发布/订阅 模型 的特点:发布端在发布消息时,如果没有订阅端在线,则不会保留消息,将会认为消息已经发送。
因此我们可以先运行订阅者的代码,启动一个订阅者。为了体现发布/订阅模式一对多的特点,我们再启动第二个订阅者。可以在ActiveMQ中看到在两个订阅者在线了。
我们再启动发布者,发布一个消息。在线的两个订阅者就可以接收到我们刚刚发布的消息了。
我们再设想一下,其中一个订阅者断电下线了,如果再有消息发布,则待它再次上线时已经接收不到第二次发布的消息了。为了解决一个问题,我们可以使用持久订阅模式。
按照上面 持久订阅修改代码后重新启动两个订阅者,注意的时这两个订阅者的客户端ID与订阅者ID都必须唯一。
启动发布者发布一个消息 ,可以看到两个订阅者分别都接收到了发布的消息。
这时我们关闭 订阅者 Client1 ,再发布一个消息。这时Client2接收到了。当我们启动订阅者 Client1后,它也能够收到第二次发布的消息
至此,我们实现了JMS发布/订阅模型,并使用了持久订阅模式关于持久订阅我们需要注意的是:
很多情况下,持久化订阅非常有用,但有的时候并非如此。虽然使用持久还是非持久通常由业务决定。但是,我们还必须考虑消息消耗的存储容量。比如有一个持久订阅者长期处于不活动的状态,那么jms服务器就必须为这个订阅者存储数以千计、万计的无用信息,浪费JMS数据仓库的宝贵空间。因为,我们必须得考虑这个问题。