[JMS簡介與ActiveMQ實戰](http://boy00fly.iteye.com/blog/1103586)
**1\. JMS架構**
Java?消息服務(Java Message Service,簡稱JMS)是用于訪問企業消息系統的開發商中立的API。企業消息系統可以協助應用軟件通過網絡進行消息交互。JMS?在其中扮演的角色與JDBC?很相似,正如JDBC?提供了一套用于訪問各種不同關系數據庫的公共API,JMS?也提供了獨立于特定廠商的企業消息系統訪問方式。
使用JMS?的應用程序被稱為JMS?客戶端,處理消息路由與傳遞的消息系統被稱為JMS Provider,而JMS?應用則是由多個JMS?客戶端和一個JMS Provider?構成的業務系統。發送消息的JMS?客戶端被稱為生產者(producer),而接收消息的JMS?客戶端則被稱為消費者(consumer)。同一JMS?客戶端既可以是生產者也可以是消費者。
JMS?的編程過程很簡單,概括為:應用程序A?發送一條消息到消息服務器(也就是JMS Provider)的某個目得地(Destination),然后消息服務器把消息轉發給應用程序B。因為應用程序A?和應用程序B?沒有直接的代碼關連,所以兩者實現了解偶。如下圖:

消息傳遞系統的中心就是消息。一條Message?由三個部分組成:
**消息的組成**
1.?頭(head)
每條JMS 消息都必須具有消息頭。頭字段包含用于路由和識別消息的值。可以通過多種方式來設置消息頭的值:
a. 由JMS 提供者在生成或傳送消息的過程中自動設置
b. 由生產者客戶機通過在創建消息生產者時指定的設置進行設置
c. 由生產者客戶機逐一對各條消息進行設置
2.?屬性(property)
消息可以包含稱作屬性的可選頭字段。他們是以屬性名和屬性值對的形式制定的。可以將屬性是為消息頭得擴展,其中可以包括如下信息:創建數據的進程、數據的創建時間以及每條數據的結構。JMS提供者也可以添加影響消息處理的屬性,如是否應壓縮消息或如何在消息生命周期結束時廢棄消息。
3.?主體(body)
包含要發送給接收應用程序的內容。每個消息接口特定于它所支持的內容類型。JMS為不同類型的內容提供了他們各自的消息類型,但是所有消息都派生自Message接口。
StreamMessage ? 一種主體中包含Java基元值流的消息。其填充和讀取均按順序進行。
MapMessage ?? ?一種主體中包含一組鍵--值對的消息。沒有定義條目順序。
TextMessage ? ? ? 一種主體中包含Java字符串的消息(例如,XML消息)。
ObjectMessage ? ?一種主體中包含序列化Java對象的消息。
BytesMessage ? ? 一種主體中包含連續字節流的消息。
例如:MapMessage 消息格式
Json代碼??
1. MapMessage={??
2. ????Header={??
3. ????????...?standard?headers?...??
4. ????????CorrelationID={123-00001}??
5. ????}??
6. ????Properties={??
7. ????????AccountID={Integer:1234}??
8. ????}??
9. ????Fields={??
10. ????????Name={String:Mark}??
11. ????????Age={Integer:47}??
12. ????}???
13. }??
**消息的傳遞模型**
JMS支持兩種消息傳遞模型:點對點(point-to-point,簡稱PTP)和發布/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區別:
a. PTP消息傳遞模型規定了一條消息之恩能夠傳遞費一個接收方。
b. Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方
每個模型都通過擴展公用基類來實現。例如:javax.jms.Queue和Javax.jms.Topic都擴展自javax.jms.Destination類。
1.?點對點消息傳遞
通過點對點的消息傳遞模型,一個應用程序可以向另外一個應用程序發送消息。在此傳遞模型中,目標類型時隊列。消息首先被傳送至隊列目標,然后從改對壘將消息傳送至對此隊列進行監聽的某個消費者,如下圖:

一個隊列可以關聯多個隊列發送方和接收方,但一條消息僅傳遞給一個接收方。如果多個接收方正在監聽隊列上的消息,JMS Provider將根據“先來者優先”的原則確定由哪個價售房接受下一條消息。如果沒有接收方在監聽隊列,消息將保留在隊列中,直至接收方連接到隊列為止。這種消息傳遞模型是傳統意義上的拉模型或輪詢模型。在此列模型中,消息不時自動推動給客戶端的,而是要由客戶端從隊列中請求獲得。
2.?發布/訂閱消息傳遞
通過發布/訂閱消息傳遞模型,應用程序能夠將一條消息發送到多個接收方。在此傳送模型中,目標類型是主題。消息首先被傳送至主題目標,然后傳送至所有已訂閱此主題的或送消費者。如下圖:

主題目標也支持長期訂閱。長期訂閱表示消費者已注冊了主題目標,但在消息到達目標時改消費者可以處于非活動狀態。當消費者再次處于活動狀態時,將會接收該消息。如果消費者均沒有注冊某個主題目標,該主題只保留注冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不同,pub/sub消息傳遞模型允許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至所有主題訂閱者都接收到消息為止。pub/sub消息傳遞模型基本上時一個推模型。在該模型中,消息會自動廣播,消費者無須通過主動請求或輪詢主題的方法來獲得新的消息。
上面兩種消息傳遞模型里,我們都需要定義消息生產者和消費者,生產者吧消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者可以同步或異步接收消息,一般而言,異步消息消費者的執行和伸縮性都優于同步消息接收者,體現在:
1\. 異步消息接收者創建的網絡流量比較小。單向對東消息,并使之通過管道進入消息監聽器。管道操作支持將多條消息聚合為一個網絡調用。
2\. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閑,尤其是如果該調用中指定了阻塞超時。
3.對于服務器上運行的應用程序代碼,使用異步消息接收者幾乎總是最佳選擇,尤其是通過消息驅動Bean。使用異步消息接收者可以防止應用程序代碼在服務器上執行阻塞操作。而阻塞操作會是服務器端線程空閑,甚至會導致死鎖。阻塞操作使用所有線程時則發生死鎖。如果沒有空余的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。
**2\. JMS?Provider(ActiveMQ)**
**特性及優勢**
1\. 實現JMS1.1規范,支持J2EE1.4以上。
2\. 可運行與任何JVM和大部分web容器(ActiveMQ works great in any JVM)
3\. 支持多種語言客戶端(java, C, C++, Ajax, ActionScript等等)
4\. 支持多種協議(stomp, openwire, REST)
5\. 良好的Spring支持(ActiveMQ has great Spring Support)
6\. 速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ)
7\. 與OpenJMS、JBossMQ等開源jms provider相比,ActiveMQ有apache的支持,持續發展的優勢明顯
**Queue與Topic的比較**
1.?JMS Queue執行load balancer語義
?? ?一條消息僅能被一個consumer收到。如果在message發送的時候沒有可用的consumer,那么它講被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另外一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡。
2.?Topic實現publish和subscribe語義
?? ?一條消息被publish時,他將發送給所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。
3.?分別對應兩種消息模式
?? ?Point-to-Point(點對點),Publisher/Subscriber Model(發布/訂閱者)
?? ?其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化訂閱)和durable subscription(持久化訂閱)兩種消息處理方式。
**Point-to-Point(點對點)消息模式開發流程**
1.?生產者(producer)開發流程
?? ?1.1 創建 Connection
Java代碼??
1. //?根據url,user和password創建一個jms?Connection。??
2. ActiveMQConnectionFactory?connectionFactory???=???new?ActiveMQConnectionFactory?(user,?password,?url);??
3. connection?=?connectionFactory.createConnection();??
4. connection.start();??
?? ?1.2 創建Session
Java代碼??
1. /**在connection的基礎上創建一個session,同時設置是否支持事務ACKNOWLEDGE標識。?
2. ??AUTO_ACKNOWLEDGE:自動確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收。??
3. ??CLIENT_ACKNOWLEDGE:客戶端確認模式。會話對象依賴于應用程序對被接收的消息調用一個acknowledge()方法。一旦這個方法被調用,會話會確認最后一次確認之后所有接收到的消息。這種模式允許應用程序以一個調用來接收,處理并確認一批消息。注意:在管理控制臺中,如果連接工廠的Acknowledge?Policy(確認方針)屬性被設置為"Previous"(提前),但是你希望為一個給定的會話確認所有接收到的消息,那么就用最后一條消息來調用acknowledge()方法。??
4. ??DUPS_OK_ACKNOWLEDGE:允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。注意:如果你的應用程序無法處理重復的消息的話,你應該避免使用這種模式。如果發送消息的初始化嘗試失敗,那么重復的消息可以被重新發送。?
5. ???SESSION_TRANSACTED**/??
6. Session?session?=?connection.createSession(??
7. transacted,?Session.AUTO_ACKNOWLEDGE);??
?? ?1.3 創建Destination對象
Java代碼??
1. //需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的消息??
2. if?(topic)?{??
3. ????destination?=?session.createTopic(subject);??
4. }?else?{??
5. ????destination?=?session.createQueue(subject);??
6. }??
?? ?1.4 創建MessageProducer
Java代碼??
1. //根據Destination創建MessageProducer對象,同時設置其持久模式。???
2. MessageProducer?producer?=?session.createProducer(destination);??
3. if?(persistent)?{??
4. ??????producer.setDeliveryMode(DeliveryMode.PERSISTENT);??
5. }?else?{??
6. ??????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??
7. }??
8. if?(timeToLive?!=?0)?{??
9. ???????producer.setTimeToLive(timeToLive);??
10. }???
?? ?1.5 發送消息到隊列(Queue)
Java代碼??
1. //封裝TextMessage消息,使用MessageProducer的send方法將消息發送出去。??
2. TextMessage?message?=?session.createTextMessage("createMessageText");??
3. producer.send(message);??
2.?消費者(consumer)開發流程
?? ?2.1 實現MessageListener接口
Java代碼??
1. //消費者類必須實現MessageListener接口,然后在onMessage方法中監聽消息到達處理。??
?? ?2.2 創建Connection
Java代碼??
1. //根據url,user和password創建一個jms?Connection,如果是durable模式,還需要給connection設置一個clientId。??
2. ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(user,?password,?url);??
3. Connection?connection?=?connectionFactory.createConnection();??
4. if?(durable?&&?clientId?!=?null?&&?clientId.length()?>?0?&&?!"null".equals(clientId))?{??
5. ??????connection.setClientID(clientId);??
6. }??
7. connection.setExceptionListener(this);??
8. connection.start();??
????2.3 創建Session和Destination
Java代碼??
1. //與產品類似??
?? ?2.4 創建replayProducer【可選】
Java代碼??
1. //可以用來將消息處理結果發送給producer。???
2. replyProducer?=?session.createProducer(null);??
3. replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);"white-space:?normal;">???
?? ?2.5 創建MessageConsumer
Java代碼??
1. //根據Destination創建MessageConsumer對象。??
2. MessageConsumer?consumer?=?null;??
3. if?(durable?&&?topic)?{??
4. ?????consumer?=?session.createDurableSubscriber((Topic)destination,?consumerName);??
5. }?else?{??
6. ?????consumer?=?session.createConsumer(destination);??
7. }"white-space:?normal;">???
????2.6 消費message
Java代碼??
1. //在onMessage()方法中接收producer發送過來的消息進行處理,并可以通過replyProducer反饋信息給producer???
2. if?(message.getJMSReplyTo()?!=?null)?{???
3. ????replyProducer.send(message.getJMSReplyTo(),???
4. ????session.createTextMessage("Reply:?"?+?message.getJMSMessageID()));???
5. }????
**Publish/Subscriber(發布/訂閱者)消息開發模式**
1\. 訂閱者(Subscriber)開發流程
?? ?1.1 實現MessageListener接口
??在onMessage()方法中監聽發布者發出的消息隊列,并做相應處理。
?? ?1.2 創建Connection
?? ? ? ? ?根據url, user和password創建一個jms connection
?? ?1.3 創建Session
??在connection的基礎上創建一個session,同時設置是否支持和ACKNOWLEDGE標志。
?? ?1.4 創建 Topic
?? ? ? ? ?創建兩個Topic,topictest.message用于接收發布者發出的消息,topictest.control用于向發布者發送消息,實現雙方的交互。
????1.5 創建consumer和producer對象
??根據topictest.message創建consumer,根據topictest.control創建producer
?? ?1.6 接收處理消息
??在onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消息,或者根據消息內容不同處理對應的業務邏輯(比如:數據庫更新、文件操作等等),并且可以使用 ??producer對象處理結果返回給發布者。
2\. 發布者(Publisher)開發流程
?? ??2.1 實現MessageListener接口?
?? 在onMessage()方法中接收訂閱者的反饋消息。
?? ??2.2 創建Connection
?? 根據url, user和password 創建一個 jms Connection。
?? ??2.3 創建session
?? 在connection的基礎上創建一個session,同時設置是否支持事務和ACKNOWLEDGE標志。
?? ??2.4 創建Topic
?? 創建兩個Topic,topictest.messages用于向訂閱者發布消息,topictest.control用于接收訂閱者反饋的消息。這兩個Topic與訂閱者開發流程中的topic是一一對應的。
?? ??2.5 創建consumer和producer對象
?? 根據topictest.message創建publisher;
?? 根據topictest.control穿件consumer,同時監聽訂閱者反饋的消息。
**3\. JMS For Spring**
?? ? ?Spring提供了用于簡化JMS API使用的抽象框架,并且對用戶屏蔽了JMS API中1.0.2和1.1版本的差異。
?? ? ?JMS的功能大致上分為兩塊,叫做消息制造和消息消耗。JmsTemplate用于制造消息和同步消息接收。和J2EE的事件驅動Bean風格類似,對于異步接收消息,Spring提供了一些消息監聽容器來創建消息驅動的POJO(MDP)。
1\. Spring 配置 connectionFactory
Xml代碼??
1.
2. ????此處需加以注意的是Listener端的borkerURL使用了failover傳輸方式:???
3. ????failover:(tcp://localhost:61616)?initialReconnectDelay=100&??
4. maxReconnectAttempts=5???
5. ????failover?transport是一種重新連接機制,用于建立可靠的傳輸。此處配置的是一旦ActiveMQ?broker中斷,Listener端將每隔100ms自動嘗試連接,直至成功連接或重試5次連接失敗為止。??
6. ????failover還支持多個borker同時提供服務,實現負載均衡的同時可增加系統容錯性,格式:?failover:(uri1,...,uriN)?transportOptions??
7. -->??
8. bean?id="jmsFactory"?destroy-method="stop"?class="org.apache.activemq.pool.PooledConnectionFactory">??
9. ????????property?name="connectionFactory">??
10. ????????????bean?class="org.apache.activemq.ActiveMQConnectionFactory">??
11. ????????????????property?name="brokerURL"?value="tcp://localhost:61616"?/>??
12. ?????????????????property?name="userName"?value="${activemq.username}"?/>??
13. ????????????????property?name="password"?value="${activemq.password}"?/>??
14. ????????????bean>??
15. ????????property>??
16. bean>??
??
2\. Spring JmsTemplate
Xml代碼??
1.
2. ????????JmsTemplate?類的實例?一經配置便是線程安全?的。?要清楚一點,JmsTemplate???
3. ????????是有狀態的,因為它維護了?ConnectionFactory?的引用,但這個狀態時不是會話狀態。??
4. -->??
5. ????bean?id="myJmsTemplate"??
6. ????????class="org.springframework.jms.core.JmsTemplate">??
7. ????????property?name="connectionFactory"?ref="jmsFactory"?/>??
8. ????????property?name="defaultDestinationName"?value="MySubject"?/>??
9. ????????
10. 優先級和存活時間作為服務質量(QOS)參數,??
11. ????????????默認{deliveryMode:2(1),priority:4,timeToLive:0}??
12. ????????????另一種使用無需QOS參數的缺省值方法。??
13. ????????????property?name="explicitQosEnabled"?value="true"/>??
14. ????????????property?name="deliveryMode"?value="2"/>??
15. ????????????property?name="priority"?value="4"/>??
16. ????????????????????????property?name="timeToLive"?value="1000"/>??
17. ?????????-->??
18. ?????????
19. ?????????????
20. ?????????-->??
21. ????bean>??
??
3\. 發送的接收消息
Xml代碼??
1. bean?id="destinationTopic"??
2. ????????class="org.apache.activemq.command.ActiveMQTopic">??
3. ????????constructor-arg?index="0"?value="HelloWorldTopic"?/>??
4. ????bean>??
5. ??????
6. ????bean?id="consumer"?class="com.d1xn.jms.demo.Consumer">??
7. ????????property?name="jmsTemplate"?ref="myJmsTemplate"?/>??
8. ????bean>??
9. ??
10. ??????
11. ????bean?id="producerTopic"?class="com.d1xn.jms.demo.ProducerTopic">??
12. ????????property?name="jmsTemplet"?ref="myJmsTemplate"?/>??
13. ????????property?name="destination"?ref="destinationTopic"?/>??
14. ????bean>??
15. ??????
16. ??
17. ????bean?id="listenerContainerTopic"??
18. class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
19. ????????property?name="connectionFactory"?ref="jmsFactory"?/>??
20. ????????property?name="destination"?ref="destinationTopic"?/>??
21. ????????property?name="messageListener"?ref="consumer"?/>??
22. ?????????>??
23. ????????property?name="clientId"?value="clientId_001"?/>??
24. ????????property?name="subscriptionDurable"?value="true"?/>??
25. ????????property?name="durableSubscriptionName"?value="My_001"?/>??
26. bean>??
說明(基于ActiveMQ5.4.2版本):
?? ? ? ?1、Web Console 的安全配置可參考 ? ? ?
? ? 將conf/jetty.xml下面一段xml配置:
Xml代碼??
1. bean?id="securityConstraint"?class="org.eclipse.jetty.http.security.Constraint">??
2. ????property?name="name"?value="BASIC"?/>?????????????????????????????????????????
3. ????property?name="roles"?value="admin"?/>????????????????????????????????????????
4. ????property?name="authenticate"?value="true"?/>??????????????????????????????????
5. bean>????????????????????????????????????????????????????????????????????????????
? ?authenticate值設置為true即可!那用戶名/密碼的配置是在conf/jetty-realm.properties!
? ?詳細可參考[http://activemq.apache.org/web-console.html](http://activemq.apache.org/web-console.html)
2、連接安全配置可參考
? ? ?將conf/activemq-security.xml中如下的配置
Xml代碼??
1. plugins>??????????????????????????????????????????????????????????????????????????????????????????
2. ????????????????????????????????
3. ?simpleAuthenticationPlugin?anonymousAccessAllowed="false">?????????????????????????????????????
4. ????users>??????????????????????????????????????????????????????????????????????????????????????
5. ????????authenticationUser?username="system"?password="${activemq.password}"????????????????????
6. ????????????groups="admins"/>????????????????????????????????????????????????????????????????????
7. ????????authenticationUser?username="user"?password="${guest.password}"?????????????????????
8. ????????????groups="users"/>?????????????????????????????????????????????????????????????????????
9. ????????authenticationUser?username="guest"?password="${guest.password}"?groups="guests"/>-->???
10. ????users>?????????????????????????????????????????????????????????????????????????????????????
11. ?simpleAuthenticationPlugin>???????????????????????????????????????????????????????????????????
12. plugins>???????????????????????????????????????????????????????????????????????????????????????
? ? ? ? ? ??copy至conf/activemq.xml中
Xml代碼??
1. persistenceAdapter>????????????????????????????????????
2. ????kahaDB?directory="${activemq.base}/data/kahadb"/>??
3. persistenceAdapter>???????????????????????????????????
? ? ? 的下面(這是簡單的用戶名、密碼的認證方式)!
? ?用戶名、密碼的可在conf/credentials.properties配置!
? ?詳細可參考:
?? ??[http://activemq.apache.org/security.html](http://activemq.apache.org/security.html)
?? ??[http://activemq.apache.org/xml-reference.html](http://activemq.apache.org/xml-reference.html)
- 誰能舉個通俗易懂的例子告訴我IAAS,SAAS,PAAS的區別?
- 服務器與容器
- 常見NIO框架
- Nginx/Apache 和Apache Tomcat 的區別
- tomcat結合nginx使用小結
- java nio框架netty 與tomcat的關系
- Nginx、Lighttpd與Apache的區別
- Apache vs Lighttpd vs Nginx對比
- 數據庫
- mybatis
- MyBatis傳入多個參數的問題
- MS
- JMS(Java消息服務)入門教程
- ActiveMQ
- JMS簡介與ActiveMQ實戰
- JMS-使用消息隊列優化網站性能
- 深入淺出JMS(一)--JMS基本概念
- 深入淺出JMS(二)--ActiveMQ簡單介紹以及安裝
- 深入淺出JMS(三)--ActiveMQ簡單的HelloWorld實例
- RabbitMq、ActiveMq、ZeroMq、kafka之間的比較,資料匯總
- kafka
- zookeeper
- 集群與負載
- 單機到分布式集群
- 日志
- 從Log4j遷移到LogBack的理由
- 角色權限
- shiro
- Shiro的認證和權限控制
- Spring 整合 Apache Shiro 實現各等級的權限管理
- 安全
- basic
- Servlet、Filter、Listener深入理解
- filter與servlet的比較
- Servlet Filter