[TOC]
# 什么是JMS
JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。
Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity):
這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。
JMS 使您能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。
# JMS規范
1. 專業技術規范
JMS(Java Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。
2. 體系架構
JMS由以下元素組成。
JMS提供者:連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。
JMS客戶:生產或消費基于消息的Java的應用程序或對象。
JMS生產者:創建并發送消息的JMS客戶。
JMS消費者:接收消息的JMS客戶。
JMS消息:包括可以在JMS客戶之間傳遞的數據的對象
JMS隊列:一個容納那些被發送的等待閱讀的消息的區域。一旦一個消息被閱讀,該消息將被從隊列中移走。
JMS主題:一種支持發送消息給多個訂閱者的機制。
# Java消息服務應用程序結構支持兩種模型
1. 點對點或隊列模型
在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列

這種模式被概括為:
只有一個消費者將獲得消息
生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。
每一個成功處理的消息都由接收者簽收
2. 發布者/訂閱者模型
發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。

這種模式被概括為:
多個消費者可以獲得消息
在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。
# activemq
## 安裝
1. 下載ActiveMQ
去官方網站下載:http://activemq.apache.org/
2. 運行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip,
修改配置文件activeMQ.xml,將0.0.0.0修改為localhost
~~~
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
~~~
然后雙擊`apache-activemq-5.5.1\bin\win64\activemq.bat`運行ActiveMQ程序。
啟動ActiveMQ以后,登陸:`http://localhost:8161/admin/`,創建一個Queue,命名為FirstQueue。
3. 運行代碼
~~~
package cn.itcast_03_mq.queue
package cn.itcast_03_mq.topic
~~~
賬號密碼都是admin
4. 配置文件activemq.xml

## 代碼
### 生產者
~~~
package mq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// private String url = "failover://tcp://localhost:61616";
//主題
private String subject = "myqueue";
//目標
private Queue destination = null;
//連接
private Connection connection = null;
//會話
private Session session = null;
private MessageProducer producer = null;
//初始化
private void initialize() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
//發送消息
public void produceMessage(String message) throws JMSException {
initialize();
//創建文本消息,把消息變成他的格式
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
//關閉連接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
~~~
生產者內部是開啟多個線程在生產的
**測試**
~~~
package mq;
import javax.jms.JMSException;
public class ProducerTest {
public static void main(String[] args) throws JMSException {
ProducerTool producerTool = new ProducerTool();
for (int i=0; i<10; i++) {
producerTool.produceMessage("hello,world"+i);
}
producerTool.close();
}
}
~~~
**網頁**

### 消費者
~~~
package mq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ConsumerTool implements MessageListener, ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
// private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String url = "failover://tcp://localhost:41414";
private String queue = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private ActiveMQConnectionFactory connectionFactory = null;
public static Boolean isconnection = false;
// 初始化
private void initialize() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queue);
consumer = session.createConsumer(destination);
}
public void consumeMessage() throws JMSException {
initialize();
connection.start();
//activemq采用推送機制來消耗消息
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("Consumer " + Thread.currentThread().getName() + " :=>local listening...");
isconnection = true;
//開始監聽
Message message = consumer.receive();
System.out.println(message.getJMSMessageID());
}
//如果是異常的話
@Override
public void onException(JMSException e) {
isconnection = false;
}
// 消息處理函數
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + msg);
} else {
System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 關閉連接
public void close() throws JMSException {
System.out.println("Consumer" + Thread.currentThread().getName() + ":->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
~~~
**測試**
~~~
package mq;
public class ConsumerTest implements Runnable{
static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {
//創建消費者
t1 = new Thread(new ConsumerTest());
t1.start();
t2 = new Thread(new ConsumerTest());
t2.start();
/*
* while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) {
* t1 = new Thread(new ConsumerTest()); t1.start();
* System.out.println("重新啟動"); } Thread.sleep(5000); } 延時500毫秒之后停止接受消息
* Thread.sleep(500); consumer.close();
*/
}
@Override
public void run() {
try{
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
//如果斷開連接,主線程就不走
while (ConsumerTool.isconnection) {
}
}catch (Exception e) {
}
}
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介