<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 什么是JMS JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。 Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。 JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity): 這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。 # JMS規范 1. 專業技術規范 JMS(Java Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。 2. 體系架構 JMS由以下元素組成。 JMS提供者:連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。 JMS客戶:生產或消費基于消息的Java的應用程序或對象。 JMS生產者:創建并發送消息的JMS客戶。 JMS消費者:接收消息的JMS客戶。 JMS消息:包括可以在JMS客戶之間傳遞的數據的對象 JMS隊列:一個容納那些被發送的等待閱讀的消息的區域。一旦一個消息被閱讀,該消息將被從隊列中移走。 JMS主題:一種支持發送消息給多個訂閱者的機制。 # Java消息服務應用程序結構支持兩種模型 1. 點對點或隊列模型 在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列 ![](https://box.kancloud.cn/a76f56e71eea6602f63c33bc4cd8c061_793x145.png) 這種模式被概括為: 只有一個消費者將獲得消息 生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。 每一個成功處理的消息都由接收者簽收 2. 發布者/訂閱者模型 發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。 ![](https://box.kancloud.cn/16600f938c0732f2c1607842eb51c57a_789x375.png) 這種模式被概括為: 多個消費者可以獲得消息 在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。 # activemq ## 安裝 1. 下載ActiveMQ 去官方網站下載:http://activemq.apache.org/ 2. 運行ActiveMQ 解壓縮apache-activemq-5.5.1-bin.zip, 修改配置文件activeMQ.xml,將0.0.0.0修改為localhost ~~~ <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> <transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector uri="http://localhost:8081"/> <transportConnector uri="udp://localhost:61618"/> ~~~ 然后雙擊`apache-activemq-5.5.1\bin\win64\activemq.bat`運行ActiveMQ程序。 啟動ActiveMQ以后,登陸:`http://localhost:8161/admin/`,創建一個Queue,命名為FirstQueue。 3. 運行代碼 ~~~ package cn.itcast_03_mq.queue package cn.itcast_03_mq.topic ~~~ 賬號密碼都是admin 4. 配置文件activemq.xml ![](https://box.kancloud.cn/bfdadfb488b4ca2806f3792dfeef4215_2354x516.png) ## 代碼 ### 生產者 ~~~ package mq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; // private String url = "failover://tcp://localhost:61616"; //主題 private String subject = "myqueue"; //目標 private Queue destination = null; //連接 private Connection connection = null; //會話 private Session session = null; private MessageProducer producer = null; //初始化 private void initialize() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } //發送消息 public void produceMessage(String message) throws JMSException { initialize(); //創建文本消息,把消息變成他的格式 TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } //關閉連接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } ~~~ 生產者內部是開啟多個線程在生產的 **測試** ~~~ package mq; import javax.jms.JMSException; public class ProducerTest { public static void main(String[] args) throws JMSException { ProducerTool producerTool = new ProducerTool(); for (int i=0; i<10; i++) { producerTool.produceMessage("hello,world"+i); } producerTool.close(); } } ~~~ **網頁** ![](https://box.kancloud.cn/a1cc8f227d923e767a8747d77535237f_2702x546.png) ### 消費者 ~~~ package mq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ConsumerTool implements MessageListener, ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; // private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String url = "failover://tcp://localhost:41414"; private String queue = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; private ActiveMQConnectionFactory connectionFactory = null; public static Boolean isconnection = false; // 初始化 private void initialize() throws JMSException { connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queue); consumer = session.createConsumer(destination); } public void consumeMessage() throws JMSException { initialize(); connection.start(); //activemq采用推送機制來消耗消息 consumer.setMessageListener(this); connection.setExceptionListener(this); System.out.println("Consumer " + Thread.currentThread().getName() + " :=>local listening..."); isconnection = true; //開始監聽 Message message = consumer.receive(); System.out.println(message.getJMSMessageID()); } //如果是異常的話 @Override public void onException(JMSException e) { isconnection = false; } // 消息處理函數 @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + msg); } else { System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + message); } } catch (Exception e) { e.printStackTrace(); } } // 關閉連接 public void close() throws JMSException { System.out.println("Consumer" + Thread.currentThread().getName() + ":->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } } ~~~ **測試** ~~~ package mq; public class ConsumerTest implements Runnable{ static Thread t1 = null; static Thread t2 = null; public static void main(String[] args) { //創建消費者 t1 = new Thread(new ConsumerTest()); t1.start(); t2 = new Thread(new ConsumerTest()); t2.start(); /* * while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) { * t1 = new Thread(new ConsumerTest()); t1.start(); * System.out.println("重新啟動"); } Thread.sleep(5000); } 延時500毫秒之后停止接受消息 * Thread.sleep(500); consumer.close(); */ } @Override public void run() { try{ ConsumerTool consumer = new ConsumerTool(); consumer.consumeMessage(); //如果斷開連接,主線程就不走 while (ConsumerTool.isconnection) { } }catch (Exception e) { } } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看