[Spring整合JMS(一)——基于ActiveMQ實現](http://elim.iteye.com/blog/1893038)
[TOC=1,3]
## 1.1?????JMS簡介
???????JMS的全稱是Java Message Service,即Java消息服務。它主要用于在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,并進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。對于消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另一種是發布/訂閱模式,即一個生產者產生消息并進行發送后,可以由多個消費者進行接收。
## 1.2?????Spring整合JMS
???????對JMS做了一個簡要介紹之后,接下來就講一下Spring整合JMS的具體過程。JMS只是一個標準,真正在使用它的時候我們需要有它的具體實現,這里我們就使用Apache的activeMQ來作為它的實現。所使用的依賴利用Maven來進行管理,具體依賴如下:
Xml代碼??
1. dependencies>??
2. ????????dependency>??
3. ????????????groupId>junitgroupId>??
4. ????????????artifactId>junitartifactId>??
5. ????????????version>4.10version>??
6. ????????????scope>testscope>??
7. ????????dependency>??
8. ????????dependency>??
9. ????????????groupId>org.springframeworkgroupId>??
10. ????????????artifactId>spring-contextartifactId>??
11. ????????????version>${spring-version}version>??
12. ????????dependency>??
13. ????????dependency>??
14. ????????????groupId>org.springframeworkgroupId>??
15. ????????????artifactId>spring-jmsartifactId>??
16. ????????????version>${spring-version}version>??
17. ????????dependency>??
18. ????????dependency>??
19. ????????????groupId>org.springframeworkgroupId>??
20. ????????????artifactId>spring-testartifactId>??
21. ????????????version>${spring-version}version>??
22. ????????dependency>??
23. ????????dependency>??
24. ????????????groupId>javax.annotationgroupId>??
25. ????????????artifactId>jsr250-apiartifactId>??
26. ????????????version>1.0version>??
27. ????????dependency>??
28. ????????dependency>??
29. ????????????groupId>org.apache.activemqgroupId>??
30. ????????????artifactId>activemq-coreartifactId>??
31. ????????????version>5.7.0version>??
32. ????????dependency>??
33. dependencies>??
### 1.2.1??activeMQ準備
???????既然是使用的apache的activeMQ作為JMS的實現,那么首先我們應該到apache官網上下載activeMQ([http://activemq.apache.org/download.html](http://activemq.apache.org/download.html)),進行解壓后運行其bin目錄下面的activemq.bat文件啟動activeMQ。
### 1.2.2配置ConnectionFactory
???????ConnectionFactory是用于產生到JMS服務器的鏈接的,Spring為我們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對于建立JMS服務器鏈接的請求會一直返回同一個鏈接,并且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。這里我們使用SingleConnectionFactory來作為示例。
Xml代碼??
1. bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory"/>??
???????這樣就定義好產生JMS服務器鏈接的ConnectionFactory了嗎?答案是非也。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正產生到JMS服務器鏈接的ConnectionFactory還得是由JMS服務廠商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我們這里使用的是ActiveMQ實現的JMS,所以在我們這里真正的可以產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。所以定義一個ConnectionFactory的完整代碼應該如下所示:
Xml代碼??
1. ??
2. bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
3. ????property?name="brokerURL"?value="tcp://localhost:61616"/>??
4. bean>??
5. ??
6. ??
7. bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
8. ??????
9. ????property?name="targetConnectionFactory"?ref="targetConnectionFactory"/>??
10. bean>??
???????ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。當使用PooledConnectionFactory時,我們在定義一個ConnectionFactory時應該是如下定義:
Xml代碼??
1. ??
2. bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
3. ????property?name="brokerURL"?value="tcp://localhost:61616"/>??
4. bean>??
5. ??
6. bean?id="pooledConnectionFactory"?class="org.apache.activemq.pool.PooledConnectionFactory">??
7. ????property?name="connectionFactory"?ref="targetConnectionFactory"/>??
8. ????property?name="maxConnections"?value="10"/>??
9. bean>??
10. ??
11. bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
12. ????property?name="targetConnectionFactory"?ref="pooledConnectionFactory"/>??
13. bean>??
### 1.2.3配置生產者
配置好ConnectionFactory之后我們就需要配置生產者。生產者負責產生消息并發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對于消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。
Xml代碼??
1. ??
2. bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
3. ??????
4. ????property?name="connectionFactory"?ref="connectionFactory"/>??
5. bean>??
???????在真正利用JmsTemplate進行消息發送的時候,我們需要知道消息發送的目的地,即destination。在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入,defaultDestinationName對應的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另一個就是支持訂閱/發布模式的ActiveMQTopic。在定義這兩種類型的Destination時我們都可以通過一個name屬性來進行構造,如:
Xml代碼??
1. ??
2. bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
3. ????constructor-arg>??
4. ????????value>queuevalue>??
5. ????constructor-arg>??
6. bean>??
7. ??
8. bean?id="topicDestination"?class="org.apache.activemq.command.ActiveMQTopic">??
9. ????constructor-arg?value="topic"/>??
10. bean>??
???????假設我們定義了一個ProducerService,里面有一個向Destination發送純文本消息的方法sendMessage,那么我們的代碼就大概是這個樣子:
Java代碼??
1. package?com.tiantian.springintejms.service.impl;??
2. ???
3. import?javax.annotation.Resource;??
4. import?javax.jms.Destination;??
5. import?javax.jms.JMSException;??
6. import?javax.jms.Message;??
7. import?javax.jms.Session;??
8. ???
9. import?org.springframework.jms.core.JmsTemplate;??
10. import?org.springframework.jms.core.MessageCreator;??
11. import?org.springframework.stereotype.Component;??
12. ???
13. import?com.tiantian.springintejms.service.ProducerService;??
14. ???
15. @Component??
16. public?class?ProducerServiceImpl?implements?ProducerService?{??
17. ???
18. ????private?JmsTemplate?jmsTemplate;??
19. ??????
20. ????public?void?sendMessage(Destination?destination,?final?String?message)?{??
21. ????????System.out.println("---------------生產者發送消息-----------------");??
22. ????????System.out.println("---------------生產者發了一個消息:"?+?message);??
23. ????????jmsTemplate.send(destination,?new?MessageCreator()?{??
24. ????????????public?Message?createMessage(Session?session)?throws?JMSException?{??
25. ????????????????return?session.createTextMessage(message);??
26. ????????????}??
27. ????????});??
28. ????}???
29. ??
30. ????public?JmsTemplate?getJmsTemplate()?{??
31. ????????returnjmsTemplate;??
32. ????}???
33. ??
34. ????@Resource??
35. ????public?void?setJmsTemplate(JmsTemplate?jmsTemplate)?{??
36. ????????this.jmsTemplate?=?jmsTemplate;??
37. ????}??
38. ???
39. }??
???????我們可以看到在sendMessage方法體里面我們是通過jmsTemplate來發送消息到對應的Destination的。到此,我們生成一個簡單的文本消息并把它發送到指定目的地Destination的生產者就配置好了。
### 1.2.4配置消費者
生產者往指定目的地Destination發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,并把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對于消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。所以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪里監聽的ConnectionFactory;一個是表示監聽什么的Destination;一個是接收到消息以后進行消息處理的MessageListener。Spring一共為我們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer會在一開始的時候就創建一個會話session和消費者Consumer,并且會使用標準的JMS MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近于獨立的JMS規范,但一般不兼容Java EE的JMS限制。
大多數情況下我們還是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時需要,并且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。
**定義處理消息的****MessageListener**
???????要定義處理消息的MessageListener我們只需要實現JMS規范中的MessageListener接口就可以了。MessageListener接口中只有一個方法onMessage方法,當接收到消息的時候會自動調用該方法。
Java代碼??
1. package?com.tiantian.springintejms.listener;??
2. ???
3. import?javax.jms.JMSException;??
4. import?javax.jms.Message;??
5. import?javax.jms.MessageListener;??
6. import?javax.jms.TextMessage;??
7. ???
8. public?class?ConsumerMessageListener?implements?MessageListener?{??
9. ???
10. ????public?void?onMessage(Message?message)?{??
11. ????????//這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換??
12. ????????TextMessage?textMsg?=?(TextMessage)?message;??
13. ????????System.out.println("接收到一個純文本消息。");??
14. ????????try?{??
15. ????????????System.out.println("消息內容是:"?+?textMsg.getText());??
16. ????????}?catch?(JMSException?e)?{??
17. ????????????e.printStackTrace();??
18. ????????}??
19. ????}??
20. ???
21. }??
???????有了MessageListener之后我們就可以在Spring的配置文件中配置一個消息監聽容器了。
Xml代碼??
1. ??
2. bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
3. ????constructor-arg>??
4. ????????value>queuevalue>??
5. ????constructor-arg>??
6. bean>??
7. ??
8. bean?id="consumerMessageListener"?class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>??????
9. ??
10. ??
11. bean?id="jmsContainer"????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
12. ????property?name="connectionFactory"?ref="connectionFactory"?/>??
13. ????property?name="destination"?ref="queueDestination"?/>??
14. ????property?name="messageListener"?ref="consumerMessageListener"?/>??
15. bean>??
???????我們可以看到我們定義了一個名叫queue的ActiveMQQueue目的地,我們的監聽器就是監聽了發送到這個目的地的消息。
???????至此我們的生成者和消費者都配置完成了,這也就意味著我們的整合已經完成了。這個時候完整的Spring的配置文件應該是這樣的:
Xml代碼??
1. xml?version="1.0"?encoding="UTF-8"?>??
2. beans?xmlns="http://www.springframework.org/schema/beans"??
3. ????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?xmlns:context="http://www.springframework.org/schema/context"??
4. ????xmlns:jms="http://www.springframework.org/schema/jms"??
5. ????xsi:schemaLocation="http://www.springframework.org/schema/beans??
6. ?????http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
7. ?????http://www.springframework.org/schema/context??
8. ?????http://www.springframework.org/schema/context/spring-context-3.0.xsd??
9. ????http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
10. ????http://www.springframework.org/schema/jms?http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">??
11. ???
12. ????context:component-scan?base-package="com.tiantian"?/>??
13. ???
14. ??????
15. ????bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
16. ??????????
17. ????????property?name="connectionFactory"?ref="connectionFactory"/>??
18. ????bean>??
19. ??????
20. ??????
21. ????bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
22. ????????property?name="brokerURL"?value="tcp://localhost:61616"/>??
23. ????bean>??
24. ??????
25. ??????
26. ????bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
27. ??????????
28. ????????property?name="targetConnectionFactory"?ref="targetConnectionFactory"/>??
29. ????bean>??
30. ??????
31. ??????
32. ????bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
33. ????????constructor-arg>??
34. ????????????value>queuevalue>??
35. ????????constructor-arg>??
36. ????bean>??
37. ??????
38. ????bean?id="consumerMessageListener"?class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>??
39. ??????
40. ????bean?id="jmsContainer"??
41. ????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
42. ????????property?name="connectionFactory"?ref="connectionFactory"?/>??
43. ????????property?name="destination"?ref="queueDestination"?/>??
44. ????????property?name="messageListener"?ref="consumerMessageListener"?/>??
45. ????bean>??
46. beans>??
???????接著我們來測試一下,看看我們的整合是否真的成功了,測試代碼如下:
Java代碼??
1. package?com.tiantian.springintejms.test;??
2. ???
3. import?javax.jms.Destination;??
4. ???
5. import?org.junit.Test;??
6. import?org.junit.runner.RunWith;??
7. import?org.springframework.beans.factory.annotation.Autowired;??
8. import?org.springframework.beans.factory.annotation.Qualifier;??
9. import?org.springframework.test.context.ContextConfiguration;??
10. import?org.springframework.test.context.junit4.SpringJUnit4ClassRunner;??
11. import?com.tiantian.springintejms.service.ProducerService;??
12. ???
13. @RunWith(SpringJUnit4ClassRunner.class)??
14. @ContextConfiguration("/applicationContext.xml")??
15. public?class?ProducerConsumerTest?{??
16. ???
17. ????@Autowired??
18. ????private?ProducerService?producerService;??
19. ????@Autowired??
20. ????@Qualifier("queueDestination")??
21. ????private?Destination?destination;??
22. ??????
23. ????@Test??
24. ????public?void?testSend()?{??
25. ????????for?(int?i=0;?i2;?i++)?{??
26. ????????????producerService.sendMessage(destination,?"你好,生產者!這是消息:"?+?(i+1));??
27. ????????}??
28. ????}??
29. ??????
30. }??
???????在上面的測試代碼中我們利用生產者發送了兩個消息,正常來說,消費者應該可以接收到這兩個消息。運行測試代碼后控制臺輸出如下:

???????看,控制臺已經進行了正確的輸出,這說明我們的整合確實是已經成功了。