[TOC]
# 簡介
首先加入RabbitMQ java client依賴:
~~~xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
</dependencies>
~~~
RabbitMQ的java client使用`com.rabbitmq.client`作為其頂級包。關鍵的類和接口是:
~~~
com.rabbitmq.client.Channel
com.rabbitmq.client.Connection
com.rabbitmq.client.ConnectionFactory
com.rabbitmq.client.Consumer
~~~
通過Channel可以進行一系列的api操作。 Connection(連接)用于打開通道,注冊連接生命周期事件處理程序,并關閉不再需要的連接。
Connection(連接)通過ConnectionFactory實例化,ConnectionFactory可以設置一些Collection(連接)的一些配置,比如說vhost或者說username等等。
## Connections(連接)和Channels(管道)
核心的類是Connections(連接)和Channels(管道),分別代表著AMQP 0-9-1協議中的Connections(連接)和Channels(管道),一般被導入
~~~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
~~~
## 連接服務器
下面的代碼時使用給定的參數(host name,端口等等)連接AMQP的服務器。
~~~
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
~~~
所有的這些參數RabbitMQ服務器都設置了默認值,可以在ConnectionFactory類中查看這些默認值。
另外,URI可以以下面的方法進行連接都有默認值。
~~~
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
~~~
Connection(連接)接口可以被用作創建一個channel(管道):
~~~
Channel channel = conn.createChannel();
~~~
可以使用channel(管道)發送和接收消息,下面會有講到。
關閉連接,只需要關閉channel(管道)和connection(連接):
~~~
channel.close();
conn.close();
~~~
注意,關閉管道是被認為是最佳實踐,但是卻不是嚴格意義的必要的。當底層的連接關閉時候,channel(管道)也就自動的被關閉了。
## 使用Exchanges和Queues
客戶端應用必須應用在exchanges和queues,這些都是AMQP協議定義的。使用這些(exchanges和queues)首先必須“聲明”它(就是創建的意思)。
下面的代碼就是怎樣去"聲明"一個exchange和隊列,并且將它們綁定在一起。
~~~
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
~~~
可以通過參數去設置exchange和queue的一些屬性,使用這些方法的一些重載方法進行相關設置。
~~~
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
~~~
## 發送消息(Publishing messages)
使用Channel.basicPublish方法將消息發送給一個exchange:
~~~
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
~~~
為了更好的控制,你可以使用重載的參數來設置消息的一些屬性(比如說mandatory標志,關于mandatory標志,下面會講到),或者在發送消息前設定一些消息屬性。
~~~
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
~~~
可以自己構建BasicProperties的對象,如下面的代碼:
~~~
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
~~~
發送消息指定頭信息:
~~~
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
~~~
發送一個有過期時間的消息,下面的博客也會講到:
~~~
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
~~~
## 訂閱消息("Push API")
~~~
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
~~~
最有效的接收消息的方法是使用Consumer接口去訂閱。當消息到達消費端的時候會自動的傳遞消費(delivered),而不需要去請求。
當我們調用Consumers(消費者)有關的api的時候,會生成一個消費者標識符(consumer tag)。
不同的Consumer實例必須有不同的消費者標簽。 強烈建議不要在連接上重復使用消費者標簽,不然在監視消費者時可能導致自動連接恢復和混淆監控數據的問題。
實現Consumer的最簡單的方法是將便利(convenience)類DefaultConsumer子類化。 該子類的對象可以在basicConsume方法調用中傳遞以設置訂閱:
~~~java
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
~~~
在這里,因為我們設置了自動確認(`autoAck`)的值為false,所以有必要在傳遞給消費者的方法中進行自動確認(`handleDelivery`方法中)。
更復雜的消費者將會重寫更多的方法。事實上,`handleShutdownSignal`方法被調用當Channel(通道)和連接關閉的時候。并且在調用該消費者的任何回調方法之前將`consumer tag`傳遞給`handleConsumeOk`(com.rabbitmq.client.Consumer接口中定義的方法)方法
消費者還可以分別實現`handleCancelOk`(com.rabbitmq.client.Consumer接口中定義的方法)和`handleCancel`(com.rabbitmq.client.Consumer接口中定義的方法)方法來通知顯式和隱式取消。
你也可以使用Channel.basicCancel方法明確的取消一個特定的消費,傳遞consumer tag,
~~~
channel.basicCancel(consumerTag);
~~~
和生產者一樣,對于消費者來說并發處理消息也要慎重考慮。
回調給消費者是在與實例化其`Channel`(管道)的線程分開的線程池中調度的。 這意味著消費者可以安全地在`Connection`或`Channel`上調用阻塞方法,例如`Channel#queueDeclare`或`Channel#basicCancel`。
每一個`Channel`(管道)都有自己的調度線程。對于最常用的使用方式就是一個消費者一個`Channel`(管道),意味著一個消費者不會阻塞其他的消費。如果是一個`Channel`(管道)多消費者必須明白一個長時間的消費調用可能會阻塞其他消費者的回調調度。
# 通道和并發注意事項(線程安全)
根據經驗,**在線程間共享Channel(通道)是要避免的**。應用應該優先使用每個線程自己的Channel(通道)實例,而不是多個線程共享這個Channel(通道)實例。
雖然有些在Channel(通道)上的操作是可以并發安全的調用,但是一些操作不行會導致一些邊界交錯,雙重確認等等。
在共享(多線程)Channel(通道)上進行并發發布會導致一些邊界交錯,觸發連接協議異常和連接關閉。因此需要嚴格在應用中同步調用(Channel#basicPublish必須在正確關鍵的地方調用)。線程之間的共享也會干擾生產者的消息確認。我們強烈的推薦不應該在通道上進行并發的發布消息。
在共享的Channel(通道)上一個線程生產(publish)消息,一個線程消費(consume)消息是線程安全的。
服務器推送可以同時發送,保證每通道的訂閱被保留。 調度機制使用`java.util.concurrent.ExecutorService`。 可以使用單列的`ConnectionFactory`調用`ConnectionFactory#setSharedExecutor`去設置所有連接共用的`executor`。
當我們手動確認[manual acknowledgements](https://link.jianshu.com?t=http://www.rabbitmq.com/confirms.html) 的時候,很重要的是考慮什么線程去做這個ack確認。如果接收傳遞的線程(例如,Consumer#handleDelivery委托給不同線程的傳遞處理)不同于手動確認的線程,則將多個線程參數設置為true是線程不安全的并導致雙重確認,因此導致通道協議異常導致Channel關閉。一次確認一條消息可以確保安全的。
# 簡單例子
通過ConnectionFactory獲得Connection,Connection得到Channel
**Exchange**
~~~
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class ExchangeTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//創建exchange,類型是direct類型
channel.exchangeDeclare("zhihao.miao", "direct");
//創建exchange, 類型是direct類型
channel.exchangeDeclare("zhihao.miao.info", BuiltinExchangeType.DIRECT);
//第三個參數表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("zhihao.miao.debug", BuiltinExchangeType.DIRECT, true);
System.out.println("---ok---" + ok);
//設置屬性
Map<String, Object> argument = new HashMap<>();
argument.put("alternate-exchange", "log");
//第三個是持久化,第四個是是否自動刪除
channel.exchangeDeclare("zhihao.miao.warn", BuiltinExchangeType.TOPIC, true, false, argument);
//異步創建exchange,沒有返回值,
channel.exchangeDeclareNoWait("zhihao.miao.log", BuiltinExchangeType.TOPIC, true, false, false, argument);
//判斷是否存在, 不存在就報錯
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("zhihao.miao.info");
System.out.println("---declareOk---" + declareOk);
//判斷是否存在, 不存在就報錯
declareOk = channel.exchangeDeclarePassive("zhihao.miao.debug");
System.out.println("---declareOk2---" + declareOk);
//刪除exchange(可重復執行), 刪除一個不存在的也不會報錯
channel.exchangeDelete("zhihao.miao");
channel.exchangeDelete("zhihao.miao.debug");
channel.exchangeDelete("zhihao.miao.info");
channel.exchangeDelete("zhihao.miao.warn");
channel.exchangeDelete("zhihao.miao.log");
//刪除exchange
channel.exchangeDelete("zhihao.miao.info");
channel.close();
connection.close();
}
}
~~~
隊列的api操作
**queues**
~~~
public class QueueTest {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二個參數表示是否持久化,第三個參數是判斷這個隊列是否在連接是否生效,為true表示連接關閉隊列刪除。
AMQP.Queue.DeclareOk ok = channel.queueDeclare("zhihao.info",true,false,false,null);
System.out.println(ok);
//異步沒有返回值的方法api
channel.queueDeclareNoWait("zhihao.info.miao",true,false,false,null);
//判斷queue是否存在,不存在會拋出異常
//channel.exchangeDeclarePassive("zhihao.info");
//拋出錯誤
//channel.exchangeDeclarePassive("zhihao.info.miao2");
//exchange和queue進行綁定(可重復執行,不會重復創建)
channel.queueBind("zhihao.info","zhihao.miao.order","info");
//異步進行綁定
channel.queueBindNoWait("zhihao.info.miao","zhihao.miao.pay","info",null);
//exchange與exchange進行綁定(可重復執行,不會重復創建)
channel.exchangeBind("zhihao.miao.email","zhihao.miao.weixin","debug");
//exchange和queue進行解綁(可重復執行)
channel.queueUnbind("zhihao.info","zhihao.miao.order","info");
//exchange和exchange進行解綁(可重復執行)
channel.exchangeUnbind("zhihao.info.miao","zhihao.miao.pay","debug");
//刪除隊列
channel.queueDelete("zhihao.info");
channel.close();
connection.close();
}
}
~~~
**消息的發送**
~~~java
public class Sender {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
//第一個參數是exchange參數,如果是為空字符串,那么就會發送到(AMQP default)默認的exchange,而且routingKey
//便是所要發送到的隊列名
channel.basicPublish("","zhihao.info.miao",properties,"忘記密碼,驗證碼是1234".getBytes());
channel.basicPublish("","zhihao.miao",properties,"忘記密碼,六位驗證密碼是343sdf".getBytes());
//direct類型的exchange類型的exchange,zhihao.miao.order綁定zhihao.info.miao隊列,route key是order
channel.basicPublish("zhihao.miao.order","order",properties,"愛奇藝會員到期了".getBytes());
//zhihao.miao.pay綁定zhihao.info.miao隊列,route key是order
channel.basicPublish("zhihao.miao.pay","pay",properties,"優酷會員到期了".getBytes());
//topic類型的exchange
channel.basicPublish("log","user.log",properties,"你的外賣已經送達".getBytes());
channel.basicPublish("log","user.log.info",properties,"你的外賣正在配送中".getBytes());
channel.basicPublish("log","user",properties,"你的投訴已經采納".getBytes());
channel.close();
connection.close();
}
}
~~~
**消息消費**
~~~
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
//客戶端的消費消息
Map<String,Object> clientProperties = new HashMap<>();
clientProperties.put("desc","支付系統2.0");
clientProperties.put("author","zhihao.miao");
clientProperties.put("user","zhihao.miao@xxx.com");
connectionFactory.setClientProperties(clientProperties);
//給客戶端的connetction命名
Connection connection = connectionFactory.newConnection("log隊列的消費者");
//給channel起個編號
Channel channel = connection.createChannel(10);
//返回consumerTag,也可以通過重載方法進行設置consumerTag
String consumerTag = channel.basicConsume("user_log_queue",true,new SimpleConsumer(channel));
System.out.println(consumerTag);
TimeUnit.SECONDS.sleep(30);
channel.close();
connection.close();
}
}
~~~
具體的消息邏輯,繼承DefaultConsumer類重寫handleDelivery方法,如果是手工確認消息,會在handleDelivery方法中進行相關的確認(調用相關api)
~~~java
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內容為:"+new String(body));
}
}
~~~
- 基礎
- 編譯和安裝
- classpath到底是什么?
- 編譯運行
- 安裝
- sdkman多版本
- jabba多版本
- java字節碼查看
- 數據類型
- 簡介
- 整形
- char和int
- 變量和常量
- 大數值運算
- 基本類型包裝類
- Math類
- 內存劃分
- 位運算符
- 方法相關
- 方法重載
- 可變參數
- 方法引用
- 面向對象
- 定義
- 繼承和覆蓋
- 接口和抽象類
- 接口定義增強
- 內建函數式接口
- 多態
- 泛型
- final和static
- 內部類
- 包
- 修飾符
- 異常
- 枚舉類
- 代碼塊
- 對象克隆
- BeanUtils
- java基礎類
- scanner類
- Random類
- System類
- Runtime類
- Comparable接口
- Comparator接口
- MessageFormat類
- NumberFormat
- 數組相關
- 數組
- Arrays
- string相關
- String
- StringBuffer
- StringBuilder
- 正則
- 日期類
- Locale類
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新時間日期API
- 簡介
- LocalDate,LocalTime,LocalDateTime
- Instant時間點
- 帶時區的日期,時間處理
- 時間間隔
- 日期時間校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判斷集合唯一
- Map和Entry
- stack類
- Collections集合工具類
- Stream數據流
- foreach不能修改內部元素
- of方法
- IO
- File類
- 字節流stream
- 字符流Reader
- IO流分類
- 轉換流
- 緩沖流
- 流的操作規律
- properties
- 序列化流與反序列化流
- 打印流
- System類對IO支持
- commons-IO
- IO流總結
- NIO
- 異步與非阻塞
- IO通信
- Unix的IO模型
- epoll對于文件描述符操作模式
- 用戶空間和內核空間
- NIO與普通IO的主要區別
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代碼
- 多線程
- 創建線程
- 線程常用方法
- 線程池相關
- 線程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的幾種線程池
- 線程安全
- 線程同步的幾種方法
- synchronized
- 死鎖
- lock接口
- ThreadLoad
- ReentrantLock
- 讀寫鎖
- 鎖的相關概念
- volatile
- 釋放鎖和不釋放鎖的操作
- 等待喚醒機制
- 線程狀態
- 守護線程和普通線程
- Lamda表達式
- 反射相關
- 類加載器
- 反射
- 注解
- junit注解
- 動態代理
- 網絡編程相關
- 簡介
- UDP
- TCP
- 多線程socket上傳圖片
- NIO
- JDBC相關
- JDBC
- 預處理
- 批處理
- 事務
- properties配置文件
- DBUtils
- DBCP連接池
- C3P0連接池
- 獲得MySQL自動生成的主鍵
- Optional類
- Jigsaw模塊化
- 日志相關
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 簡介
- 倉庫
- 目錄結構
- 常用命令
- 生命周期
- idea配置
- jar包沖突
- 依賴范圍
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多環境
- 自定義插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 亂碼
- session和cookie
- cookie
- session
- jsp
- 簡介
- 注釋
- 方法,成員變量
- 指令
- 動作標簽
- 隱式對象
- EL
- JSTL
- javaBean
- listener監聽器
- Filter過濾器
- 圖片驗證碼
- HttpUrlConnection
- 國際化
- 文件上傳
- 文件下載
- spring
- 簡介
- Bean
- 獲取和實例化
- 屬性注入
- 自動裝配
- 繼承和依賴
- 作用域
- 使用外部屬性文件
- spel
- 前后置處理器
- 生命周期
- 掃描規則
- 整合多個配置文件
- 注解
- 簡介
- 注解分層
- 類注入
- 分層和作用域
- 初始化方法和銷毀方法
- 屬性
- 泛型注入
- Configuration配置文件
- aop
- aop的實現
- 動態代理實現
- cglib代理實現
- aop名詞
- 簡介
- aop-xml
- aop-注解
- 代理方式選擇
- jdbc
- 簡介
- JDBCTemplate
- 事務
- 整合
- junit整合
- hibernate
- 簡介
- hibernate.properties
- 實體對象三種狀態
- 檢索方式
- 簡介
- 導航對象圖檢索
- OID檢索
- HQL
- Criteria(QBC)
- Query
- 緩存
- 事務管理
- 關系映射
- 注解
- 優化
- MyBatis
- 簡介
- 入門程序
- Mapper動態代理開發
- 原始Dao開發
- Mapper接口開發
- SqlMapConfig.xml
- map映射文件
- 輸出返回map
- 輸入參數
- pojo包裝類
- 多個輸入參數
- resultMap
- 動態sql
- 關聯
- 一對一
- 一對多
- 多對多
- 整合spring
- CURD
- 占位符和sql拼接以及參數處理
- 緩存
- 延遲加載
- 注解開發
- springMVC
- 簡介
- RequestMapping
- 參數綁定
- 常用注解
- 響應
- 文件上傳
- 異常處理
- 攔截器
- springBoot
- 配置
- 熱更新
- java配置
- springboot配置
- yaml語法
- 運行
- Actuator 監控
- 多環境配置切換
- 日志
- 日志簡介
- logback和access
- 日志文件配置屬性
- 開機自啟
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查詢
- 復雜查詢
- 多數據源的支持
- Repository分析
- JpaSpeci?cationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 動態sql
- 關聯映射
- 使用xml
- spring容器
- 整合druid
- 整合郵件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 請求
- restful
- 攔截器
- 常用注解
- 參數校驗
- 自定義filter
- websocket
- 響應
- 異常錯誤處理
- 文件下載
- 常用注解
- 頁面
- Thymeleaf組件
- 基本對象
- 內嵌對象
- 上傳文件
- 單元測試
- 模擬請求測試
- 集成測試
- 源碼解析
- 自動配置原理
- 啟動流程分析
- 源碼相關鏈接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 創建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本數據類型
- 函數
- 區間
- 區塊鏈
- 簡介
- linux
- ulimit修改
- 防止syn攻擊
- centos7部署bbr
- debain9開啟bbr
- mysql
- 隔離性
- sql執行加載順序
- 7種join
- explain
- 索引失效和優化
- 表連接優化
- orderby的filesort問題
- 慢查詢
- show profile
- 全局查詢日志
- 死鎖解決
- sql
- 主從
- IDEA
- mac快捷鍵
- 美化界面
- 斷點調試
- 重構
- springboot-devtools熱部署
- IDEA進行JAR打包
- 導入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文檔顯示
- sftp文件同步
- 書簽
- 代碼查看和搜索
- postfix
- live template
- git
- 文件頭注釋
- JRebel
- 離線模式
- xRebel
- github
- 連接mysql
- 選項沒有Java class的解決方法
- 擴展
- 項目配置和web部署
- 前端開發
- json和Inject language
- idea內存和cpu變高
- 相關設置
- 設計模式
- 單例模式
- 簡介
- 責任鏈
- JUC
- 原子類
- 原子類簡介
- 基本類型原子類
- 數組類型原子類
- 引用類型原子類
- JVM
- JVM規范內存解析
- 對象的創建和結構
- 垃圾回收
- 內存分配策略
- 備注
- 虛擬機工具
- 內存模型
- 同步八種操作
- 內存區域大小參數設置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 變量
- 運算符
- 模塊
- Rewrite規則
- Netty
- netty為什么沒用AIO
- 基本組件
- 源碼解讀
- 簡單的socket例子
- 準備netty
- netty服務端啟動
- 案例一:發送字符串
- 案例二:發送對象
- websocket
- ActiveMQ
- JMS
- 安裝
- 生產者-消費者代碼
- 整合springboot
- kafka
- 簡介
- 安裝
- 圖形化界面
- 生產過程分析
- 保存消息分析
- 消費過程分析
- 命令行
- 生產者
- 消費者
- 攔截器interceptor
- partition
- kafka為什么快
- kafka streams
- kafka與flume整合
- RabbitMQ
- AMQP
- 整體架構
- RabbitMQ安裝
- rpm方式安裝
- 命令行和管控頁面
- 消息生產與消費
- 整合springboot
- 依賴和配置
- 簡單測試
- 多方測試
- 對象支持
- Topic Exchange模式
- Fanout Exchange訂閱
- 消息確認
- java client
- RabbitAdmin和RabbitTemplate
- 兩者簡介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 詳解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 簡介
- 入門程序
- luke查看索引
- 分析器
- 索引庫維護
- elasticsearch
- 配置
- 插件
- head插件
- ik分詞插件
- 常用術語
- Mapping映射
- 數據類型
- 屬性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 單模式下CURD
- mget多個文檔
- 批量操作
- 版本控制
- 基本查詢
- Filter過濾
- 組合查詢
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 發布訂閱
- 事務
- 連接池
- 管道
- 分布式可重入鎖
- 配置文件翻譯
- 持久化
- RDB
- AOF
- 總結
- Lettuce
- zookeeper
- zookeeper簡介
- 集群部署
- Observer模式
- 核心工作機制
- zk命令行操作
- zk客戶端API
- 感知服務動態上下線
- 分布式共享鎖
- 原理
- zab協議
- 兩階段提交協議
- 三階段提交協議
- Paxos協議
- ZAB協議
- hadoop
- 簡介
- hadoop安裝
- 集群安裝
- 單機安裝
- linux編譯hadoop
- 添加新節點
- 退役舊節點
- 集群間數據拷貝
- 歸檔
- 快照管理
- 回收站
- 檢查hdfs健康狀態
- 安全模式
- hdfs簡介
- hdfs命令行操作
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案例-單詞統計
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定義排序
- 自定義Bean對象
- 排序的分類
- 案例-按總量排序需求
- 一次性完成統計和排序
- 分區
- 分區簡介
- 案例-結果分區
- 多表合并
- reducer端合并
- map端合并(分布式緩存)
- 分組
- groupingComparator
- 案例-求topN
- 全局計數器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat機制
- 自定義InputFormat
- 自定義outputFormat
- 多job串聯
- 倒排索引
- 共同好友
- 串聯
- 數據壓縮
- InputFormat接口實現類
- yarn簡介
- 推測執行算法
- 本地提交到yarn
- 框架運算全流程
- 數據傾斜問題
- mapreduce的優化方案
- HA機制
- 優化
- Hive
- 安裝
- shell參數
- 數據類型
- 集合類型
- 數據庫
- DDL操作
- 創建表
- 修改表
- 分區表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 嚴格模式
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transfrom實現
- having和where不同
- 壓縮
- 存儲
- 存儲和壓縮結合使用
- explain詳解
- 調優
- Fetch抓取
- 本地模式
- 表的優化
- GroupBy
- count(Distinct)去重統計
- 行列過濾
- 動態分區調整
- 數據傾斜
- 并行執行
- JVM重用
- 推測執行
- reduce內存和個數
- sql查詢結果作為變量(shell)
- youtube
- flume
- 簡介
- 安裝
- 常用組件
- 攔截器
- 案例
- 監聽端口到控制臺
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 單flume多channel,sink
- 自定義攔截器
- 高可用配置
- 使用注意
- 監控Ganglia
- sqoop
- 安裝
- 常用命令
- 數據導入
- 準備數據
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 打包腳本
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- oozie
- 安裝
- hbase
- 簡介
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- 安裝
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 過濾器查詢
- 建表高級屬性
- 與mapreduce結合
- 與sqoop結合
- 協處理器
- 參數配置優化
- 數據備份和恢復
- 節點管理
- 案例-點擊流
- 簡介
- HUE
- 安裝
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 單詞統計(接入kafka)
- 并行度和分組
- 啟動流程分析
- ACK容錯機制
- ACK簡介
- BaseRichBolt簡單使用
- BaseBasicBolt簡單使用
- Ack工作機制
- 本地目錄樹
- zookeeper目錄樹
- 通信機制
- 案例
- 日志告警
- 工具
- YAPI
- chrome無法手動拖動安裝插件
- 時間和空間復雜度
- jenkins
- 定位cpu 100%
- 常用腳本工具
- OOM問題定位
- scala
- 編譯
- 基本語法
- 函數
- 數組常用方法
- 集合
- 并行集合
- 類
- 模式匹配
- 異常
- tuple元祖
- actor并發編程
- 柯里化
- 隱式轉換
- 泛型
- 迭代器
- 流stream
- 視圖view
- 控制抽象
- 注解
- spark
- 企業架構
- 安裝
- api開發
- mycat
- Groovy
- 基礎