## 一、文檔批量操作
### 1、批量查詢
~~~
1、批量查詢的好處
就是一條一條的查詢,比如說要查詢100條數據,那么就要發送100次網絡請求,這個開銷還是很大的
如果進行批量查詢的話,查詢100條數據,就只要發送1次網絡請求,網絡請求的性能開銷縮減100倍
2、mget的語法
(1)一條一條的查詢
GET /test_index/test_type/1
GET /test_index/test_type/2
(2)mget批量查詢
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 1
},
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 2
}
]
}
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field1": "test field1",
"test_field2": "test field2"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"test_content": "my test"
}
}
]
}
(3)如果查詢的document是一個index下的不同type種的話
GET /test_index/_mget
{
"docs" : [
{
"_type" : "test_type",
"_id" : 1
},
{
"_type" : "test_type",
"_id" : 2
}
]
}
(4)如果查詢的數據都在同一個index下的同一個type下,最簡單了
GET /test_index/test_type/_mget
{
"ids": [1, 2]
}
3、mget的重要性
可以說mget是很重要的,一般來說,在進行查詢的時候,如果一次性要查詢多條數據的話,那么一定要用batch批量操作的api
盡可能減少網絡開銷次數,可能可以將性能提升數倍,甚至數十倍,非常非常之重要
復制代碼
~~~
### 2、批量增刪改
~~~
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
每一個操作要兩個json串,語法如下:
{"action": {"metadata"}}
{"data"}
舉例,比如你現在要創建一個文檔,放bulk里面,看起來會是這樣子的:
{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}}
{"test_field1": "test1", "test_field2": "test2"}
有哪些類型的操作可以執行呢?
(1)delete:刪除一個文檔,只要1個json串就可以了
(2)create:PUT /index/type/id/_create,強制創建
(3)index:普通的put操作,可以是創建文檔,也可以是全量替換文檔
(4)update:執行的partial update操作
bulk api對json的語法,有嚴格的要求,每個json串不能換行,只能放一行,同時一個json串和一個json串之間,必須有一個換行
{
"error": {
"root_cause": [
{
"type": "json_e_o_f_exception",
"reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
}
],
"type": "json_e_o_f_exception",
"reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
},
"status": 500
}
{
"took": 41,
"errors": true,
"items": [
{
"delete": {
"found": true,
"_index": "test_index",
"_type": "test_type",
"_id": "10",
"_version": 3,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][2]: version conflict, document already exists (current version [1])",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "2",
"index": "test_index"
}
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
}
]
}
bulk操作中,任意一個操作失敗,是不會影響其他的操作的,但是在返回結果里,會告訴你異常日志
POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }}
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_type": "test_type" }}
{ "test_field": "auto-generate id test" }
{ "index": { "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }}
{ "create": { "_id": "12" }}
{ "test_field": "test12" }
{ "index": { }}
{ "test_field": "auto-generate id test" }
{ "index": { "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
2、bulk size最佳大小
bulk request會加載到內存里,如果太大的話,性能反而會下降,因此需要反復嘗試一個最佳的bulk size。一般從1000~5000條數據開始,嘗試逐漸增加。另外,如果看大小的話,最好是在5~15MB之間。
復制代碼
~~~
## 二、數據路由
### 1.document路由到shard上是什么意思?
~~~
一個index的數據會被分為多片,每片都在一個shard中,所以每個document只能存在一個shard中。
當在客戶端創建document的時候,es就要決定這個document是要放在哪個index的shard上。這個過程就叫做document數據路由
復制代碼
~~~
### 2.路由算法
~~~
shard = hash(routing) % number_of_primary_shards
舉個例子,一個index有3個primary shard,P0,P1,P2
每次增刪改查一個document的時候,都會帶過來一個routing number,默認就是這個document的_id(可能是手動指定,也可能是自動生成)
routing = _id,假設_id=1
會將這個routing值,傳入一個hash函數中,產出一個routing值的hash值,hash(routing) = 21
然后將hash函數產出的值對這個index的primary shard的數量求余數,21 % 3 = 0
就決定了,這個document就放在P0上。
決定一個document在哪個shard上,最重要的一個值就是routing值,默認是_id,也可以手動指定,相同的routing值,每次過來,從hash函數中,產出的hash值一定是相同的
無論hash值是幾,無論是什么數字,對number_of_primary_shards求余數,結果一定是在0~number_of_primary_shards-1之間這個范圍內的。0,1,2。
復制代碼
~~~
### 3.\_id or custom routing value
~~~
默認的routing就是_id
也可以在發送請求的時候,手動指定一個routing value,比如說put /index/type/id?routing=user_id
手動指定routing value是很有用的,可以保證說,某一類document一定被路由到一個shard上去,那么在后續進行應用級別的負載均衡,以及提升批量讀取的性能的時候,是很有幫助的
復制代碼
~~~
### 4\. primary shard數量為什么不可變?
~~~
就是為了計算document 的路由。
復制代碼
~~~
## 三、document內部增刪改原理
~~~
(1)客戶端選擇一個node發送請求過去,這個node就是coordinating node(協調節點)
(2)coordinating node,對document進行路由,將請求轉發給對應的node(有primary shard)
(3)實際的node上的primary shard處理請求,然后將數據同步到replica node
(4)coordinating node,如果發現primary node和所有replica node都搞定之后,就返回響應結果給客戶端
復制代碼
~~~

## 四、document 讀請求內部原理(與上面大致差不多)
~~~
1、客戶端發送請求到任意一個node,成為coordinate node
2、coordinate node對document進行路由,將請求轉發到對應的node,此時會使用round-robin隨機輪詢算法,在primary shard以及其所有replica中隨機選擇一個,讓讀請求負載均衡
3、接收請求的node返回document給coordinate node
4、coordinate node返回document給客戶端
5、特殊情況:document如果還在建立索引過程中,可能只有primary shard有,任何一個replica shard都沒有,此時可能會導致無法讀取到document,但是document完成索引建立之后,primary shard和replica shard就都有了
復制代碼
~~~
## 五、寫一致性及 quorum機制
#### 1.寫一致性
~~~
如果有3個primary shard 必須primary shard都是活躍狀態 才能執行寫入操作。
復制代碼
~~~
#### 2.quorum機制
~~~
(1)consistency,one(primary shard),all(all shard),quorum(default)
我們在發送任何一個增刪改操作的時候,比如說put /index/type/id,都可以帶上一個consistency參數,指明我們想要的寫一致性是什么?
put /index/type/id?consistency=quorum
one:要求我們這個寫操作,只要有一個primary shard是active活躍可用的,就可以執行
all:要求我們這個寫操作,必須所有的primary shard和replica shard都是活躍的,才可以執行這個寫操作
quorum:默認的值,要求所有的shard中,必須是大部分的shard都是活躍的,可用的,才可以執行這個寫操作
(2)quorum機制,寫之前必須確保大多數shard都可用,int( (primary + number_of_replicas) / 2 ) + 1,當number_of_replicas>1時才生效
quroum = int( (primary + number_of_replicas) / 2 ) + 1
舉個例子,3個primary shard,number_of_replicas=1,總共有3 + 3 * 1 = 6個shard
quorum = int( (3 + 1) / 2 ) + 1 = 3
所以,要求6個shard中至少有3個shard是active狀態的,才可以執行這個寫操作
(3)如果節點數少于quorum數量,可能導致quorum不齊全,進而導致無法執行任何寫操作
3個primary shard,replica=1,要求至少3個shard是active,3個shard按照之前學習的shard&replica機制,必須在不同的節點上,如果說只有1臺機器的話,是不是有可能出現說,3個shard都沒法分配齊全,此時就可能會出現寫操作無法執行的情況
1個primary shard,replica=3,quorum=((1 + 3) / 2) + 1 = 3,要求1個primary shard + 3個replica shard = 4個shard,其中必須有3個shard是要處于active狀態的。如果這個時候只有2臺機器的話,會出現什么情況呢?
es提供了一種特殊的處理場景,就是說當number_of_replicas>1時才生效,因為假如說,你就一個primary shard,replica=1,此時就2個shard
(1 + 1 / 2) + 1 = 2,要求必須有2個shard是活躍的,但是可能就1個node,此時就1個shard是活躍的,如果你不特殊處理的話,導致我們的單節點集群就無法工作
(4)quorum不齊全時,wait,默認1分鐘,timeout,100,30s
等待期間,期望活躍的shard數量可以增加,最后實在不行,就會timeout
我們其實可以在寫操作的時候,加一個timeout參數,比如說put /index/type/id?timeout=30,這個就是說自己去設定quorum不齊全的時候,es的timeout時長,可以縮短,也可以增長
復制代碼
~~~
## 六、bulk api 奇特格式與底層性能優化
#### 1.前面批量增刪改 大致提了下關于 bulk 的用法
~~~
bulk api奇特的json格式
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
[{
"action": {
},
"data": {
}
}]
1、bulk中的每個操作都可能要轉發到不同的node的shard去執行
2、如果采用比較良好的json數組格式
允許任意的換行,整個可讀性非常棒,讀起來很爽,es拿到那種標準格式的json串以后,要按照下述流程去進行處理
(1)將json數組解析為JSONArray對象,這個時候,整個數據,就會在內存中出現一份一模一樣的拷貝,一份數據是json文本,一份數據是JSONArray對象
(2)解析json數組里的每個json,對每個請求中的document進行路由
(3)為路由到同一個shard上的多個請求,創建一個請求數組
(4)將這個請求數組序列化
(5)將序列化后的請求數組發送到對應的節點上去
3、耗費更多內存,更多的jvm gc開銷
我們之前提到過bulk size最佳大小的那個問題,一般建議說在幾千條那樣,然后大小在10MB左右,所以說,可怕的事情來了。假設說現在100個bulk請求發送到了一個節點上去,然后每個請求是10MB,100個請求,就是1000MB = 1GB,然后每個請求的json都copy一份為jsonarray對象,此時內存中的占用就會翻倍,就會占用2GB的內存,甚至還不止。因為弄成jsonarray之后,還可能會多搞一些其他的數據結構,2GB+的內存占用。
占用更多的內存可能就會積壓其他請求的內存使用量,比如說最重要的搜索請求,分析請求,等等,此時就可能會導致其他請求的性能急速下降
另外的話,占用內存更多,就會導致java虛擬機的垃圾回收次數更多,跟頻繁,每次要回收的垃圾對象更多,耗費的時間更多,導致es的java虛擬機停止工作線程的時間更多
4、現在的奇特格式
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
(1)不用將其轉換為json對象,不會出現內存中的相同數據的拷貝,直接按照換行符切割json
(2)對每兩個一組的json,讀取meta,進行document路由
(3)直接將對應的json發送到node上去
5、最大的優勢在于,不需要將json數組解析為一個JSONArray對象,形成一份大數據的拷貝,浪費內存空間,盡可能地保證性能
~~~
作者:Leo\_CX330
鏈接:https://juejin.cn/post/6926344149796913166
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊