> 原文出處:http://www.infoq.com/cn/articles/database-timestamp-03
> 作者: 陶文
## 加載
如何利用索引和主存儲,是一種兩難的選擇。
* 選擇不使用索引,只使用主存儲:除非查詢的字段就是主存儲的排序字段,否則就需要順序掃描整個主存儲。
* 選擇使用索引,然后用找到的row id去主存儲加載數據:這樣會導致很多碎片化的隨機讀操作。
沒有所謂完美的解決方案。MySQL支持索引,一般索引檢索出來的行數也就是在1~100條之間。如果索引檢索出來很多行,很有可能MySQL會選擇不使用索引而直接掃描主存儲,這就是因為用row id去主存儲里讀取行的內容是碎片化的隨機讀操作,這在普通磁盤上很慢。
Opentsdb是另外一個極端,它完全沒有索引,只有主存儲。使用Opentsdb可以按照主存儲的排序順序快速地掃描很多條記錄。但是訪問的不是按主存儲的排序順序仍然要面對隨機讀的問題。
Elasticsearch/Lucene的解決辦法是讓主存儲的隨機讀操作變得很快,從而可以充分利用索引,而不用懼怕從主存儲里隨機讀加載幾百萬行帶來的代價。
## Opentsdb 的弱點
Opentsdb沒有索引,主存儲是Hbase。所有的數據點按照時間順序排列存儲在Hbase中。Hbase是一種支持排序的存儲引擎,其排序的方式是根據每個row的rowkey(就是關系數據庫里的主鍵的概念)。MySQL存儲時間序列的最佳實踐是利用MySQL的Innodb的clustered index特性,使用它去模仿類似Hbase按rowkey排序的效果。所以Opentsdb的弱點也基本適用于MySQL。Opentsdb的rowkey的設計大致如下:
~~~
[metric_name][timestamp][tags]
~~~
舉例而言:
~~~
Proc.load_avg.1m 12:05:00 ip=10.0.0.1
Proc.load_avg.1m 12:05:00 ip=10.0.0.2
Proc.load_avg.1m 12:05:01 ip=10.0.0.1
Proc.load_avg.1m 12:05:01 ip=10.0.0.2
Proc.load_avg.5m 12:05:00 ip=10.0.0.1
Proc.load_avg:5m 12:05:00 ip=10.0.0.2
~~~
也就是行是先按照metric_name排序,再按照timestamp排序,再按照tags來排序。
對于這樣的rowkey設計,獲取一個metric在一個時間范圍內的所有數據是很快的,比如Proc.load_avg.1m在12:05到12:10之間的所有數據。先找到Proc.load_avg.1m 12:05:00的行號,然后按順序掃描就可以了。
但是以下兩種情況就麻煩了。
* 獲取12:05 到 12:10 所有 Proc.load_avg.* 的數據,如果預先知道所有的metric name包括Proc.load_avg.1m,Proc.load_avg.5m,Proc.load_avg.15m。這樣會導致很多的隨機讀。如果不預先知道所有的metric name,就無法知道Proc.load_avg.*代表了什么。
* 獲取指定ip的數據。因為ip是做為tags保存的。即便是訪問一個ip的數據,也要把所有其他的ip數據讀取出來再過濾掉。如果ip總數有十多萬個,那么查詢的效率也會非常低。為了讓這樣的查詢變得更快,需要把ip編碼到metric_name里去。比如ip.10.0.0.1.Proc.load_avg.1m 這樣。
所以結論是,不用索引是不行的。如果希望支持任意條件的組合查詢,只有主存儲的排序是無法對所有查詢條件進行優化的。但是如果查詢條件是固定的一種,那么可以像Opentsdb這樣只有一個主存儲,做針對性的優化。
## DocValues為什么快?
DocValues是一種按列組織的存儲格式,這種存儲方式降低了隨機讀的成本。傳統的按行存儲是這樣的:

1和2代表的是docid。顏色代表的是不同的字段。
改成按列存儲是這樣的:

按列存儲的話會把一個文件分成多個文件,每個列一個。對于每個文件,都是按照docid排序的。這樣一來,只要知道docid,就可以計算出這個docid在這個文件里的偏移量。也就是對于每個docid需要一次隨機讀操作。
那么這種排列是如何讓隨機讀更快的呢?秘密在于Lucene底層讀取文件的方式是基于memory mapped byte buffer的,也就是mmap。這種文件訪問的方式是由操作系統去緩存這個文件到內存里。這樣在內存足夠的情況下,訪問文件就相當于訪問內存。那么隨機讀操作也就不再是磁盤操作了,而是對內存的隨機讀。
那么為什么按行存儲不能用mmap的方式呢?因為按行存儲的方式一個文件里包含了很多列的數據,這個文件尺寸往往很大,超過了操作系統的文件緩存的大小。而按列存儲的方式把不同列分成了很多文件,可以只緩存用到的那些列,而不讓很少使用的列數據浪費內存。
按列存儲之后,一個列的數據和前面的posting list就差不多了。很多應用在posting list上的壓縮技術也可以應用到DocValues上。這不但減少了文件尺寸,而且提高數據加載的速度。因為我們知道從磁盤到內存的帶寬是很小的,普通磁盤也就每秒100MB的讀速度。利用壓縮,我們可以把數據以壓縮的方式讀取出來,然后在內存里再進行解壓,從而獲得比讀取原始數據更高的效率。
如果內存不夠是不是會使得隨機讀的速度變慢?肯定會的。但是mmap是操作系統實現的API,其內部有預讀取機制。如果讀取offset為100的文件位置,默認會把后面16k的文件內容都預讀取出來都緩存在內存里。因為DocValues是只讀,而且順序排序存儲的。相比b-tree等存儲結構,在磁盤上沒有空洞和碎片。而隨機讀的時候也是按照DocId排序的。所以如果讀取的DocId是緊密相連的,實際上也相當于把隨機讀變成了順序讀了。Random_read(100), Random_read(101), Random_read(102)就相當于Scan(100~102)了。
## 分布式計算
### 分布式聚合如何做得快
Elasticsearch/Lucene從最底層就支持數據分片,查詢的時候可以自動把不同分片的查詢結果合并起來。Elasticsearch的document都有一個uid,默認策略是按照uid 的 hash把文檔進行分片。

一個Elasticsearch Index相當于一個MySQL里的表,不同Index的數據是物理上隔離開來的。Elasticsearch的Index會分成多個Shard存儲,一部分Shard是Replica備份。一個Shard是一份本地的存儲(一個本地磁盤上的目錄),也就是一個Lucene的Index。不同的Shard可能會被分配到不同的主機節點上。一個Lucene Index會存儲很多的doc,為了好管理,Lucene把Index再拆成了Segment存儲(子目錄)。Segment內的doc數量上限是1的31次方,這樣doc id就只需要一個int就可以存儲。Segment對應了一些列文件存儲索引(倒排表等)和主存儲(DocValues等),這些文件內部又分為小的Block進行壓縮。

時間序列數據一般按照日期分成多個Elasticsearch Index來存儲,比如logstash-2014.08.02。查詢的時候可以指定多個Elasticsearch Index作為查找的范圍,也可以用logstash-*做模糊匹配。
美妙之處在于,雖然數據被拆得七零八落的,在查詢聚合的時候甚至需要分為兩個階段完成。但是對于最終用戶來說,使用起來就好像是一個數據庫表一樣。所有的合并查詢的細節都是隱藏起來的。
對于聚合查詢,其處理是分兩階段完成的:
* Shard本地的Lucene Index并行計算出局部的聚合結果;
* 收到所有的Shard的局部聚合結果,聚合出最終的聚合結果。
這種兩階段聚合的架構使得每個shard不用把原數據返回,而只用返回數據量小得多的聚合結果。相比Opentsdb這樣的數據庫設計更合理。Opentsdb其聚合只在最終節點處完成,所有的分片數據要匯聚到一個地方進行計算,這樣帶來大量的網絡帶寬消耗。所以Influxdb等更新的時間序列數據庫選擇把分布式計算模塊和存儲引擎進行同機部署,以減少網絡帶寬的影響。
除此之外Elasticsearch還有另外一個減少聚合過程中網絡傳輸量的優化,那就是[Hyperloglog](https://www.elastic.co/blog/count-elasticsearch)算法。在計算unique visitor(uv)這樣的場景下,經常需要按用戶id去重之后統計人數。最簡單的實現是用一個hashset保存這些用戶id。但是用set保存所有的用戶id做去重需要消耗大量的內存,同時分布式聚合的時候也要消耗大量的網絡帶寬。Hyperloglog算法以一定的誤差做為代價,可以用很小的數據量保存這個set,從而減少網絡傳輸消耗。
##### 為什么時間序列需要更復雜的聚合?
關系型數據庫支持一些很復雜的聚合查詢邏輯,比如:
* Join兩張表;
* Group by之后用Having再對聚合結果進行過濾;
* 用子查詢對聚合結果進行二次聚合。
在使用時間序列數據庫的時候,我們經常會懷念這些SQL的查詢能力。在時間序列里有一個特別常見的需求就是降頻和降維。舉例如下:
~~~
12:05:05 湖南 81
12:05:07 江西 30
12:05:11 湖南 80
12:05:12 江西 32
12:05:16 湖南 80
12:05:16 江西 30
~~~
按1分鐘頻率進行max的降頻操作得出的結果是:
~~~
12:05 湖南 81
12:05 江西 32
~~~
這種按max進行降頻的最常見的場景是采樣點的歸一化。不同的采集器采樣的時間點是不同的,為了避免漏點也會加大采樣率。這樣就可能導致一分鐘內采樣多次,而且采樣點的時間都不對齊。在查詢的時候按max進行降頻可以得出一個統一時間點的數據。
按sum進行降維的結果是:
~~~
12:05 113
~~~
經常我們需要舍棄掉某些維度進行一個加和的統計。這個統計需要在時間點對齊之后再進行計算。這就導致一個查詢需要做兩次,上面的例子里:
* 先按1分鐘,用max做降頻;
* 再去掉省份維度,用sum做降維。
如果僅僅能做一次聚合,要么用sum做聚合,要么用max做聚合。無法滿足業務邏輯的需求。為了避免在一個查詢里做兩次聚合,大部分的時間序列數據庫都要求數據在入庫的時候已經是整點整分的。這就要求數據不能直接從采集點直接入庫,而要經過一個實時計算管道進行處理。如果能夠在查詢的時候同時完成降頻和降維,那就可以帶來一些使用上的便利。
這個功能看似簡單,其實非常難以實現。很多所謂的支持大數據的數據庫都只支持簡單的一次聚合操作。Elasticsearch 將要發布的 2.0 版本的最重量級的新特性是Pipeline Aggregation,它支持數據在聚合之后再做聚合。類似SQL的子查詢和Having等功能都將被支持。
## 總結
時間序列隨著Internet of Things等潮流的興起正變得越來越常見。希望本文可以幫助你了解到那些號稱自己非常海量,查詢非常快的時間序列數據庫背后的秘密。沒有完美的數據庫,Elasticsearch也不例外。如果你的用例根本不包括聚合的需求,也許Opentsdb甚至MySQL就是你最好的選擇。但是如果你需要聚合海量的時間序列數據,一定要嘗試一下Elasticsearch!
## 作者簡介
**陶文**,曾就職于騰訊IEG的藍鯨產品中心,負責過告警平臺的架構設計與實現。2006年從ThoughtWorks開始職業生涯,在大型遺留系統的重構,持續交付能力建設,高可用分布式系統構建方面積累了豐富的經驗。