基于maven的Spring+ActiveMQ整合Demo

本文主要是示范基于Maven的ActiveMQ+Spring的简单使用,基于ActiveMQ消息代理的Spring JMS消息配置,以及定时任务的使用。

JMS简介

JMS提供了应用之间的异步通信机制,当异步发送消息时,客户端不需要等待服务端处理消息结果。

同步通信与异步通信

构建JMS

两个主要概念:消息代理(message broker)和目的地(destination)。

当一个应用发送消息时,会把消息交给一个消息代理。消息代理实际上是JMS版的邮局。消息代理可以确保消息被投递到指定的目的地,同时释放发送者,使其能够继续其他的业务。

目的地就像一个邮箱,可以将消息放入邮箱,直至有人将其取走。JMS中有两种类型的目的地:队列主题。,分别应用于队列的点对点模型和主题的发布/订阅模型。

点对点消息模型

每一个消息都有一个发送者和一个接收者。
点对点消息模型
当消息代理得到消息后,会将消息放入到消息队列中,接收者请求队列中的下一条消息时,该消息就会从队列中取出,投递给接收者。因为消息投递后会从队列中删除,因此可以保证消息只投递给一个接收者。

可以使用多个接收者来处理队列中的消息,不过每个接收者都会处理自己接收到的消息,需要多个接收者监听队列。

发布-订阅消息模型

消息会发送给一个主题,多个接收者可以监听一个主题。与队列不同的是,消息不再是只投递给一个接收者,所有主题的订阅者都可以接收到此消息。
发布-订阅消息模型

JMS的优点

  • 无需等待
  • 面向消息和解耦
  • 位置独立
  • 确保投递

在Spring中搭建消息代理

安装ActiveMQ

ActiveMQ是一个开源的消息代理, 也是使用JMS进行异步消息传递的最佳选择。在官方网站下载后,解压缩安装包,点击apache-activemq-5.12.1\bin\activemq.bat运行即可(64位操作系统可能需要点击apache-activemq-5.12.1\bin\win64\activemq.bat运行)。运行后进入http://localhost:8161/表明安装成功, 这时就可以使用它进行消息代理了。

maven配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples.service.service</groupId>
<artifactId>activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>

<properties>

<!-- Generic properties -->
<java.version>1.6</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Web -->
<jsp.version>2.2</jsp.version>
<jstl.version>1.2</jstl.version>
<servlet.version>2.5</servlet.version>


<!-- Spring -->
<spring-framework.version>3.2.3.RELEASE</spring-framework.version>

<!-- Logging -->
<logback.version>1.0.13</logback.version>
<slf4j.version>1.7.5</slf4j.version>

<!-- Test -->
<junit.version>4.11</junit.version>

</properties>

<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>1.8.5</version>
</dependency>
<!-- Spring MVC -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring-framework.version}</version>
</dependency>

<!-- Other Web dependencies -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>${jstl.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>${jsp.version}</version>
<scope>provided</scope>
</dependency>

<!-- Spring and Transactions -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring-framework.version}</version>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-framework.version}</version>
</dependency>

</dependencies>
</project>

Spring配置

使用JMS连接工厂通过消息代理发送消息,因为选择了ActiveMQ作为消息代理,因此需要配置JMS连接工厂,让它知道如何连接到ActiveMQ。ActiveMQConnectionFactory是ActiveMQ自带的连接工厂,可以在Spring中进行配置。

使用JmsTemplate可以创建连接、获得会话以及发送和接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

<!-- Activemq 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg value="system1" />
<constructor-arg value="manager1" />
<constructor-arg value="failover:(tcp://localhost:61616)?timeout=2000" />
</bean>

<!-- ConnectionFactory Definition -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="activeMQConnectionFactory" />
</bean>

<!-- Default Destination Queue Definition -->
<!-- 测试配置多个Destination -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="test.activemq.queue" />
</bean>

<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>

<!-- Message Sender Definition -->
<bean id="messageSender" class="activemq.publisher.MessageSender">
<constructor-arg index="0" ref="jmsTemplate" />
<constructor-arg index="1" ref="destination" />
</bean>

<!-- 消息监听器,主要监听的目的地址 Message Receiver Definition -->
<bean id="messageReceiver" class="activemq.consumer.MessageReceiver">
</bean>
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="test.activemq.queue" />
<property name="messageListener" ref="messageReceiver" />
</bean>

</beans>

发送消息

当调用JmsTemplatesend()方法,JmsTemplate将负责JMS连接、会话并代表发送者发送消息。这里使用了默认的消息目的地。

JmsTemplate发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package activemq.publisher;

import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

private final JmsTemplate jmsTemplate;
private final Destination destination;

public MessageSender(final JmsTemplate jmsTemplate, final Destination destination) {
this.jmsTemplate = jmsTemplate;
this.destination = destination;
}

public void send(final String text) {
try {
jmsTemplate.setDefaultDestination(destination);
jmsTemplate.convertAndSend(text);
System.out.println("发送消息 : " + text);
} catch (Exception e) {
e.printStackTrace();
}
}
}

接收消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MessageReceiver implements MessageListener {

public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println("接收到消息: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

测试

使用Spring的定时任务定时发送消息。
定时任务配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">

<context:annotation-config />
<bean id="QuartzFactoryBean"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="capacityDataPublisherJobTrigger" />
</list>
</property>
</bean>

<bean id="capacityDataPublisherJob" class="activemq.TestSenderService"
init-method="run">
</bean>

<bean id="capacityDataPublisherJobTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="capacityDataPublisherJobDetail" />
<property name="cronExpression">
<value>0 0/5 * * * ?</value>
</property>
</bean>
<bean id="capacityDataPublisherJobDetail"
class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject" ref="capacityDataPublisherJob" />
<property name="targetMethod" value="run" />
<property name="concurrent" value="false" />
</bean>

</beans>

定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package activemq;

import org.springframework.beans.factory.annotation.Autowired;
import activemq.publisher.MessageSender;

public class TestSenderService {

@Autowired
private MessageSender messageSender;

public void run() {
messageSender.send("message");
}

}

测试结果

在tomcat中运行项目。
运行后发送了两条消息,消息队列中显示:
此处输入图片的描述

重启项目时,接收消息监听器会处理队列中所有的消息,项目运行时,每次发送消息成功后都会触发接收消息监听器:
此处输入图片的描述

入列和出列:
此处输入图片的描述


代码获取地址:
https://github.com/hoxis/JavaWeb/tree/master/activemq

hoxis wechat
一个脱离了高级趣味的程序员,关注回复1024有惊喜~
赞赏一杯咖啡
0%