[TOC]

本文我们使用ActiveMQ实现简单的点对点的消息模型。

一、开发环境

这里我使用的是 apache-activemq-5.11.1 可以去官网下载

  • jdk1.8
  • idea

二、新建java项目

这里写图片描述

其中activemq-all-5.11.1.jar在下载下来的 apache-activemq-5.11.1 中就有,直接复制过来导入项目即可。

三、具体实现

JMSProducer:消息的生产者
JMSConsumer:消息的消费者

大致步骤
(1)创建连接工厂
(2)使用连接工厂创建一个连接
(3)启动连接
(4)使用连接创建一个会话
(5)使用会话创建一个队列/主题
(6)使用会话创建一个生产者/消费者
(7)使用会话创建一个消息/对象/集合/文件/字节
(8)使用生产者/消费者 发送/获取 消息

1、编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author
* @version 1.0
* @name JMS生产者
* @description 消息的生产者类
* @date 2018/4/11 0011.
*/
public class JMSProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//生产者
MessageProducer messageProducer;

/**
* 编写生产者的步骤
*/
try {
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);
//使用连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接创建获取会话
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//使用会话连接一个队列作为目的地,如果这个队列不存在将会被创建
destination = session.createQueue("HelloWorld");
//使用会话创建消息生产者
messageProducer = session.createProducer(destination);

//发送消息
sendMessage(session,messageProducer);
//支持事务则必须提交
session.commit();

} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null){
try {
//关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

/**
* 生产者发送消息
* @param session 会话
* @param messageProducer 生产者
*/
public static void sendMessage(Session session,MessageProducer messageProducer){
try {
//使用会话创建一条文本消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
TextMessage textMessage = session.createTextMessage("你好,世界!");
//通过消息生产者发出消息
messageProducer.send(textMessage);
System.out.println("已发送消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

}

2、编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author .
* @version 1.0
* @name
* @description
* @date 2018/4/11 0011.
*/
public class JMSConsumer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args){
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话
Session session;
//目的地
Destination destination;
//消息的消费者
MessageConsumer messageConsumer;

/**
* 消息的消费者编写
*/

try {
//实例化工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);
//使用实例工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//使用连接获取会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//使用会话连接一个队列作为目的地,如果这个队列不存在将会被创建
destination = session.createQueue("HelloWorld");
//使用会话获取消费者
messageConsumer = session.createConsumer(destination);


/**
*获取消息
*/
/*同步实现*/
//设置接收者接收消息的时间,为了便于测试,这里定为50s,接收到消息之前(或超时之前)将一直阻塞
/*TextMessage textMessage = (TextMessage) messageConsumer.receive(50000);
if (textMessage!=null){
System.out.println("收到的消息是:" + textMessage.getText());
}else {
System.out.println("没有收到消息");
}*/
/*异步实现*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
String text = ((TextMessage)message).getText();
System.out.println("收到的消息是:" +text );
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}

3、session与事务处理

session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

第一个参数

要使用事务处理,必须通过作为第一个参数设置为true来创建一个事务处理会话

事务处理允许您将整个系列的传入和传出消息分组在一起,并将它们视为原子单元。消息代理跟踪事务的各个消息的状态,但在您提交事务之前不会完成它们的传送。在发生故障时,您可以回滚事务,取消其所有消息并从头开始重新启动整个系列。
事务处理会话总是只有一个打开的事务,包含自会话创建或前一个事务完成以来发送或接收的所有消息。提交或回滚事务会结束该事务并自动开始另一个事务。


当交易中的所有消息都已成功交付时,您可以调用会话的commit方法来提交交易

session.commit();

所有会话的传入消息都会被确认,并且所有传出的消息都将被发送。交易被视为完成,并开始新的交易。

发送或接收操作失败时,会引发异常。虽然可以通过忽略或重试操作来处理异常,但建议您使用会话的rollback方法回退事务:

session.rollback();

第二个参数

值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。

4、连接工厂 ConnectionFactory

connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.URL);

activemq默认是不需要密码,生产消费者就可以连接的
实例化连接工厂时我们可以去掉用户名与密码,同样可以连接到ActiveMQ
如果ActiveMQ是部署在你本地的,则默认的用户名为admin,默认密码也为admin,地址为:tcp://localhost:61616 我们可以直接从ActiveMQConnection 中 取得默认的用户名,密码与地址

5、关于同步与异步

  • 同步

    订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞

  • 异步

    订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。上面消费者的代码中使用的是这种方法

四、运行

1、启动ActiveMQ

ActiveMQ运行

如果你的64bit机器,则在这个目录下打开Activemq.bat 
32bit机器可以执行 bin\win32\activemq.bat

打开后出现以下窗口后,表示启动成功,这个窗口不能关闭
ActiveMQ打开窗口

打开ActiveMQ的管理界面:http://127.0.0.1:8161/admin/
ActiveMQ管理页面

分别点击队列与主题可以查看发送到ActiveMQ的队列消息或主题

2、运行生产者

ActiveMQ 启动好后,我们就可以运行生产者向ActiveMQ发送消息了
运行生产者

运行结束后,我们查看一下ActiveMQ的队列
查看队列

表示ActiveMQ自动创建一个名为”HelloWorld“的队列,队列已经收到了一条消息,暂时没有消费者,我们也可以在ActiveMQ中对消息进行浏览删除等操作

3、运行消费者

这里写图片描述
这里我们使用异步接收,生产者再次发送消息时同样可以接收到。
如果使用同步接收,在规定的时间超时后会程序停止,关闭连接。

看看ActiveMQ中的队列发生的变化:

接收消息后

可以看到待处理消息已为0,出队消息加1,有一个消费者在线。

官方文档参考: https://docs.oracle.com/cd/E26576_01/doc.312/e24945/toc.htm