作者 閻志濤
[TOC]
## 前言
隨著移動互聯網時代的到來,越來越多的與人、與物、與環境有關的數據產生,大數據技術也變得越來越重要。在國內,大數據也由幾年前的概念階段逐漸的在不同的企業和行業落地,并且對企業的運營、發展起到了越來越重要的作用。從2011年創業之初,TalkingData就堅信一句話:”In God we trust, everyone else must bring data”. 期望通過數據去改變人們做決策的方式,通過數據讓人們更好的了解自己。創業這4年,對于TalkingData技術團隊來講,也是對大數據技術架構的認識逐步深入的過程。經過4年的發展,我們每天處理的新增數據由幾個GB逐漸的增加到如今的數個TB。而數據計算類型也從最初的統計分析的類型到支持多維交叉、即席查詢、機器學習、廣告歸因等等多種計算類型。一路跌跌撞撞走來,TalkingData技術團隊在大數據技術上踩過不少坑,有過許多個不眠之夜,也逐漸總結了一些自己的經驗,并在這里分享給大家。
## 大數據平臺之初試
提到大數據平臺,首先需要考慮的是采用什么樣的技術架構。TalkingData在成立之初,主要業務是移動統計分析業務,主要幫助我們的客戶分析移動應用的新增、活躍、留存、流失等等指標。包括這些指標在渠道、版本等不同維度的分布。在2011年,大部分的分析系統還都是純粹離線的分析系統,所有的指標都是T+1才能獲取。但是考慮到用戶的體驗,所有的數據都是T+1顯然是不能夠滿足用戶的需求的,用戶需要知道當天的實時數據的分析結果。為了解決這個問題,大數據平臺的高層架構如下:

數據通過數據收集器進行接收,接收后的數據會發送到消息隊列中。ETL負責將數據進行規范化和邏輯組織、抽取,然后發送給實時計算部分和離線批量計算部分。實時計算部分主要負責計算當天的實時數據,而離線批量處理部分則主要計算當天以前的數據。查詢API則提供統計分析業務對計算結果的查詢。
具體到采用的技術,平臺的組件架構圖如下:

在這個架構當中,不同的組件采用的技術分別為:
## Data Collector(收據收集器)
Data Collector主要負責收集從SDK發送來的各種數據,以日志的形式保留在本地,然后再將數據發送到消息隊列中。 整個Data Collector組件包括Nginx作為負載均衡器,接收所有從SDK發送來的請求數據并發送到后方的真正處理數據的Data Collector中。Data Collector是運行在Jetty容器中的java servlet,利用容器提供的多線程的支持,接收并處理SDK發送上來的數據。數據會先以日志的形式存儲在本地磁盤,然后再將數據發送到Message Queue中。Data Collector由于承擔了數據接收的工作,在設計實現中不承擔任何的計算邏輯,主要承擔的是存儲和轉發的邏輯,從而能夠高效的接收數據。
## Message Queue(消息隊列)
消息隊列主要是為了解耦數據接收和數據計算的邏輯,在第一個版本中,采用的是輕量級的消息隊列Kestrel。作為一個輕量級的消息隊列,Kestrel非常的輕巧和方便使用,并且能夠支持消息的存儲,對消息的訪問支持memcached協議,并且有非常不錯的讀寫性能。對于快速構建一個支持異步處理的分布式系統來講,Kestrel無疑是一個非常簡單方便的選擇。
## Batch Computing(離線處理)
Batch Computing主要是對非實時要求的數據做批量處理,在2011年,想做離線批量計算,能夠選擇必然是hadoop生態系統中的某種技術。可以選擇自己寫MapReduce,也可以選擇Pig或者Hive來完成對應的工作。考慮到開發的方便性,Hive因為其支持類似于標準SQL的HQL最終被我們選擇為離線處理的計算平臺。批量計算的結果,會存儲到Result Store中。為了解決多維交叉的問題,在批量處理過程中,我們會對每個維度生成對應設備的bitmap索引,同時也會將索引存儲到Result Store中。
## Real-time Computing(實時計算)
Real-time Computing主要是為了解決客戶需要看到實時分析數據的結果需求而引入的組件。在離線處理部分,我們通過Hive來計算一天以前的數據,包括各種時間跨度比較長的指標。不過對于用戶當天當時的各種指標,Hadoop生態系統中的各種技術,因為其設計就是為離線計算而生,就不能夠滿足實時計算的要求了。在2011年,還沒有非常好的開源的實時處理框架。我們能夠選擇的只能是自己去根據業務的需求開發自己的實時計算的組件。整個實時計算組件是采用Redis內存數據庫為基礎實現的。利用Redis提供的高速的訪問能力,以及對能夠對key的值進行增加計數,可以設置key過期等能力,我們將實時的計算指標通過組織Redis key來完成。
## Qeury API(查詢接口)
數據在離線計算和實時計算完成之后,會被Query API進行查詢。Qeury API會分別從Redis當中查詢當日的實時結果,同時會從Result DB查詢離線計算的結果。為了方便查詢,我們將Query API進行了封裝,封裝成了一個支持類似于SQL的查詢引擎。分析業務會通過類SQL的表達將查詢發送給查詢API,查詢API會根據時間切片的不同,決定是從Redis還是從Result DB中查詢數據,并且將結果拼裝后返回給分析業務系統應用。現在回頭看我們的架構,實際上像極了后來Storm的開發者Nathan Marz提出的Lambda架構,其架構如下:

從上圖可以看到,我們在2011年采用的架構和Lambda架構非常的像,只是采用的技術實現不盡相同。
## 大數據平臺之改良
隨著業務的發展和數據的增加,2011年我們那套技術架構也逐漸出現各種各樣的問題,對架構的重構也就變得越來越重要。這套架構主要存在的問題主要包括:
## 數據一致性問題
由于數據計算存在實時和離線兩個部分,實時計算采用的Redis,而離線部分采用的是Hive,由于SDK上傳的數據可能出現延遲,這樣實時計算的時候可能當天沒有上傳的數據,在以后會上傳,這樣實時計算的當天的數據和當天過后通過Hive進行批量計算的結果就會產生偏差,有時會造成用戶的困惑。
## 數據處理能力不足問題
隨著數據量的逐漸增加,這套技術架構也開始面臨數據處理能力的考驗。Kestrel作為一個輕量級的隊列,能夠滿足初期的要求。可是數據量增多后,kestrel的平行擴展能力的不足開始體現,另外隨著數據業務的增加,消息隊列模型需要更靈活的支持多消費者的消息隊列。而kestrel在這方面也很難滿足業務的需求。
為了解決這些問題,我們決定對架構進行重構,于是一套自行研發的計算框架Torch(火炬)系統應運而生。這套系統采用微批次的概念,主要解決大數據場景下統計分析業務的需求。整體的技術架構如下:

這個架構中,大家可以看到,消息隊列從Kestrel變為Kafka。采用Kafka,在數據量每天都在增加的時候,更方便的進行平行擴展。另外,業務可靠性的要求也越來越高,而Kafka本身的高可靠性的特點也更適合業務的需求。
在這個改良的架構中,不再存在實時和離線處理兩個數據計算路徑,所有的數據計算都是通過Torch的計算引擎來完成。整個計算引擎分為Counter引擎和Bitmap引擎兩個部分。計算是以分鐘為單位的微批次的計算,Counter引擎主要進行匯總類型的計算,而Bitmap引擎則負責生成數據的Bitmap索引,并將結果存儲在存儲當中。計算過程是基于預先定義好維度和度量的事實表來進行的。而某些不能預先進行索引的數據,則存儲在列式數據庫當中,從而可以在沒有預先計算的情況下,高效的執行分析型的計算。
## 大數據平臺之進階
隨著公司業務的進一步發展和擴充,對于數據計算的需求已經不僅僅是統計分析類型的業務,對數據價值的探索變得越來越重要。面向統計分析業務的平臺已經不能夠滿足業務發展的需求,我們需要對平臺進行進一步重構,使得大數據平臺能夠滿足:
- 統計分析業務
- 交互式分析
- 機器學習
- 數據可視化
基于這些需求,TalkingData新的π系統應運而生。整個平臺的架構如下圖:

在新的π系統架構中,整個大數據平臺除了能夠支持統計分析業務,還增加了對機器學習、以及交互式分析的支持。不同的技術組件描述如下:
## Data Collector
在新的價格中,為了提高數據收集的效率,Data Collector在新的架構中從基于java servlet改為基于actor模型的node.js進行實現。另外Data Collector本身分為前置節點和中心節點兩級,從而可以實現數據收集的分布式部署。前置節點分布式部署在多個區域,使得SDK可以選擇網絡連接更快的節點發送數據。而前置節點和中心節點采用高壓縮比的數據傳輸,從而更好的利用中心機房的帶寬資源。
## Message Bus
在新的架構中,消息隊列還是采用基于Kafka的消息總線,從而保證平行擴展、高可靠性,另外支持多消費者。
## Storage Service
在新的架構中,我們將使用到的存儲做為服務進行了封裝。整個存儲部分根據數據的冷熱時間不同,進行分區。熱數據存儲在分布式緩存Tachyon中,而冷數據則以Parquet格式存儲在HDFS當中。為了更好的支持多維交叉的分析型業務,TalkingData開發了針對bitmap的bitmap存儲。所有的存儲可以通過封裝好的API進行統一的訪問。另外,引入了基于HCatalog進行封裝的元數據管理,從而方便對數據的管理和訪問。
## Compute Service
計算服務基于分布式計算框架Spark,其中融合了Torch系統中的bitmap引擎,從而可以對流式數據生成bitmap索引,并將索引存儲在bitmap存儲中。另外將流式消費的數據轉化為列式存儲結構,存儲在Tachyon中。Tachyon中存儲的數據有有效期,過期的數據會遷移到HDFS當中,并且在Tachyon中做清除。即時數據請求會根據請求類型和時間,決定是從bitmap存儲、Tachyon、還是HDFS中讀取數據。所有的數據計算封裝為統一的數據計算API。
## Machine Learning Service
為了更好的發揮數據的價值,我們的架構中引入了機器學習服務。機器學習服務包括了Spark提供的MLLib,另外也包括公司自己開發的一些高效的機器學習算法,比如隨機決策森林、LR等等算法。所有的算法都封裝為算法庫,通過API的方式提供調用。
## Query Engine
查詢引擎則是對存儲API、計算API和機器學習API進行封裝,上層業務可以通過類似于SQL的語句進行數據計算,查詢引擎會對查詢進行解析,然后轉化為對應的下層API調用和執行。
## Data Service API
數據服務API則是各數據業務系統對數據進行業務化封裝的API,這些API一般都是Restful API。數據可視化層可以通過這些Restful API獲取數據,進行數據展現。
## Data Visualization
數據可視化服務包含標準的數據可視化組件,通過對數據可視化組件化封裝,業務系統的開發變得更為高效。數據可視化組件通過與數據服務API交互,獲取需要的數據,完成數據的可視化展現。
## 后記
新的π系統的架構是TalkingData技術團隊第一次以更為面向全局的視角進行的一次架構重構。整個架構的設計和實現也融合了公司不同技術團隊的集體力量,整個架構目前還在逐步完善中,期望我們能夠將這個架構變得更加成熟,實現的更加靈活,變成一個真正的可平行擴展的支持多種大數據計算能力的大數據平臺。