[Spring整合JMS(四)——事務管理](http://elim.iteye.com/blog/1983532)
[TOC=1,3]
Spring提供了一個JmsTransactionManager用于對JMS ConnectionFactory做事務管理。這將允許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個ConnectionFactory/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,并對它們進行相應操作。
在Java EE環境中,ConnectionFactory會池化Connection和Session,這樣這些資源將會在整個事務中被有效地重復利用。在一個獨立的環境中,使用Spring的SingleConnectionFactory時所有的事務將公用一個Connection,但是每個事務將保留自己獨立的Session。
JmsTemplate可以利用JtaTransactionManager和能夠進行分布式的?JMS ConnectionFactory處理分布式事務。
???????在Spring整合JMS的應用中,如果我們要進行本地的事務管理的話非常簡單,只需要在定義對應的消息監聽容器時指定其sessionTransacted屬性為true,如:
Xml代碼??
1. bean?id="jmsContainer"??
2. ????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
3. ????property?name="connectionFactory"?ref="connectionFactory"?/>??
4. ????property?name="destination"?ref="queueDestination"?/>??
5. ????property?name="messageListener"?ref="consumerMessageListener"?/>??
6. ????property?name="sessionTransacted"?value="true"/>??
7. bean>??
???????該屬性值默認為false,這樣JMS在進行消息監聽的時候就會進行事務控制,當在接收消息時監聽器執行失敗時JMS就會對接收到的消息進行回滾,對于SessionAwareMessageListener在接收到消息后發送一個返回消息時也處于同一事務下,但是對于其他操作如數據庫訪問等將不屬于該事務控制。
這里我們可以來做一個這樣的測試:我們如上配置監聽在queueDestination的消息監聽容器的sessionTransacted屬性為true,然后把我們前面提到的消息監聽器ConsumerMessageListener改成這樣:
Java代碼??
1. public?class?ConsumerMessageListener?implements?MessageListener?{??
2. ???
3. ????public?void?onMessage(Message?message)?{??
4. ????????????//這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage??
5. ????????????TextMessage?textMsg?=?(TextMessage)?message;??
6. ????????????System.out.println("接收到一個純文本消息。");??
7. ????????????try?{??
8. ????????????????System.out.println("消息內容是:"?+?textMsg.getText());??
9. ????????????????if?(1?==?1)?{??
10. ????????????????????throw?new?RuntimeException("Error");??
11. ????????????????}??
12. ????????????}?catch?(JMSException?e)?{??
13. ????????????????e.printStackTrace();??
14. ????????????}??
15. ????}??
16. ???
17. }??
我們可以看到在上述代碼中我們的ConsumerMessageListener在進行消息接收的時候拋出了一個RuntimeException,根據我們上面說的,因為我們已經在對應的監聽容器上定義了其sessionTransacted屬性為true,所以當這里拋出異常的時候JMS將對接收到的消息進行回滾,即下次進行消息接收的時候該消息仍然能夠被接收到。為了驗證這一點,我們先執行一遍測試代碼,往queueDestination發送一個文本消息,這個時候ConsumerMessageListener在進行接收的時候將會拋出一個RuntimeException,已經接收到的純文本消息將進行回滾;接著我們去掉上面代碼中拋出異常的語句,即ConsumerMessageListener能夠正常的進行消息接收,這個時候我們再運行一次測試代碼,往ConsumerMessageListener監聽的queueDestination發送一條消息。如果之前在接手時拋出了異常的那條消息已經回滾了的話,那么這個時候將能夠接收到兩條消息,控制臺將輸出接收到的兩條消息的內容。具體結果有興趣的朋友可以自己驗證一下。
???????如果想接收消息和數據庫訪問處于同一事務中,那么我們就可以配置一個外部的事務管理同時配置一個支持外部事務管理的消息監聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分布式事務管理的消息監聽容器,我們可以配置一個JtaTransactionManager,當然底層的JMS ConnectionFactory需要能夠支持分布式事務管理,并正確地注冊我們的JtaTransactionManager。這樣消息監聽器進行消息接收和對應的數據庫訪問就會處于同一數據庫控制下,當消息接收失敗或數據庫訪問失敗都會進行事務回滾操作。
Xml代碼??
1. bean?id="jmsContainer"??
2. ????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
3. ????property?name="connectionFactory"?ref="connectionFactory"?/>??
4. ????property?name="destination"?ref="queueDestination"?/>??
5. ????property?name="messageListener"?ref="consumerMessageListener"?/>??
6. ????property?name="transactionManager"?ref="jtaTransactionManager"/>??
7. bean>??
8. ??
9. bean?id="jtaTransactionManager"?class="org.springframework.transaction.jta.JtaTransactionManager"/>??
當給消息監聽容器指定了transactionManager時,消息監聽容器將忽略sessionTransacted的值。?
???????關于使用JtaTransactionManager來管理上述分布式事務,我們這里也可以來做一個試驗。
???????首先:往Spring配置文件applicationContext.xml中添加如下配置:
???
Xml代碼??
1. bean?id="jdbcTemplate"?class="org.springframework.jdbc.core.JdbcTemplate">??
2. ????property?name="dataSource"?ref="dataSource"/>??
3. bean>??
4. ??
5. jee:jndi-lookup?jndi-name="jdbc/mysql"?id="dataSource"/>??
6. bean?id="jtaTransactionManager"?class="org.springframework.transaction.jta.JtaTransactionManager"/>??
7. ??
8. tx:annotation-driven?transaction-manager="jtaTransactionManager"/>??
???????我們可以看到,在這里我們引入了一個jndi數據源,定義了一個JtaTransactionManager,定義了Spring基于注解的聲明式事務管理,定義了一個Spring提供的進行Jdbc操作的工具類jdbcTemplate。
???????接下來把我們的ConsumerMessageListener改為如下形式:
Java代碼??
1. public?class?ConsumerMessageListener?implements?MessageListener?{??
2. ???
3. ????@Autowired??
4. ????private?TestDao?testDao;??
5. ??????
6. ????private?int?count?=?0;??
7. ??????
8. ????public?void?onMessage(Message?message)?{??
9. ????????????//這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage??
10. ????????????TextMessage?textMsg?=?(TextMessage)?message;??
11. ????????????System.out.println(new?Date().toLocaleString()?+?"接收到一個純文本消息。");??
12. ????????????try?{??
13. ????????????????String?text?=?textMsg.getText();??
14. ????????????????System.out.println("消息內容是:"?+?text);??
15. ????????????????System.out.println("當前count的值是:"?+?count);??
16. ????????????????testDao.insert(text?+?count);??
17. ????????????????if?(count?==?0)?{??
18. ????????????????????count?++;??
19. ????????????????????throw?new?RuntimeException("Error!?出錯啦!");??
20. ????????????????}??
21. ????????????}?catch?(JMSException?e)?{??
22. ????????????????e.printStackTrace();??
23. ????????????}??
24. ????}??
25. ???
26. }??
???????我們可以看到,在ConsumerMessageListener中我們定義了一個實例變量count,其初始值為0;在onMessage里面,我們可以看到我們把接收到的消息內容作為參數調用了testDao的insert方法;當count值為0,也就是進行第一次消息接收的時候會將count的值加1,同時拋出一個運行時異常。那么我們這里要測試的就是進行第一次接收的時候testDao已經把相關內容插入數據庫了,接著在onMessage里面拋出了一個異常同時count加1,我們預期的結果應該是此時數據庫進行回滾,同時JMS也回滾,這樣JMS將繼續嘗試接收該消息,此時同樣會調用testDao的insert方法將內容插入數據庫,再接著count已經不為0了,所以此時將不再拋出異常,JMS成功進行消息的接收,testDao也成功的將消息內容插入到了數據庫。要證明這個預期我們除了看數據庫中插入的數據外,還可以看控制臺的輸出,正常情況控制臺將輸出兩次消息接收的內容,且第一次時count為0,第二次count為1。
???????TestDao是一個接口,其TestDaoImpl對insert的方法實現如下:?
Java代碼??
1. @Transactional(readOnly=false)??
2. public?void?insert(final?String?name)?{??
3. ??????
4. ????jdbcTemplate.update("insert?into?test(name)?values(?)",?name);??
5. ??
6. }??
這里我們使用支持JtaTransactionManager的Weblogic來進行測試,因為是Web容器,所以我們這里定義了一個Controller來進行消息的發送,具體代碼如下:
Java代碼??
1. @Controller??
2. @RequestMapping("test")??
3. public?class?TestController?{??
4. ???
5. ????@Autowired??
6. ????@Qualifier("queueDestination")??
7. ????private?Destination?destination;??
8. ??????
9. ????@Autowired??
10. ????private?ProducerService?producerService;??
11. ??????
12. ????@RequestMapping("first")??
13. ????public?String?first()?{??
14. ????????producerService.sendMessage(destination,?"你好,現在是:"?+?new?Date().toLocaleString());??
15. ????????return?"/test/first";??
16. ????}??
17. ??????
18. }??
接下來就是啟用Weblogic服務器,進入其控制臺,定義一個名叫“jdbc/mysql”的JNDI數據源,然后把該項目部署到Weblogic服務器上并進行啟動。接下來我們就可以訪問/test/first.do訪問到上述first方法。之后控制臺會輸出如下信息:

????????我們可以看到當count為0時接收了一次,并隨后拋出了異常,之后count為1又接收了一次,這說明在count為0時拋出異常后我們的JMS進行回滾了,那么我們的數據庫是否有進行回滾呢?接著我們來看數據庫中的內容:

????????我們可以看到數據庫表中只有一條記錄,而且最后一位表示count的值的為1,這說明在JMS進行消息接收拋出異常時我們的數據庫也回滾了。關于使用JtaTransactionManager進行分布式事務管理的問題就說到這里了,有興趣的朋友可以自己試驗一下。
* [Spring整合JMS——基于ActiveMQ實現(一)](http://elim.iteye.com/blog/1893038)
* [Spring整合JMS(二)——三種消息監聽器](http://elim.iteye.com/blog/1893676)
* [Spring整合JMS(三)——MessageConverter介紹](http://elim.iteye.com/blog/1900937)