# Celery
Celery是一個基于Python開發的分布式異步消息隊列,可以輕松實現任務的異步處理。它的基本工作就是管理分配任務到不同的服務器,并且取得結果。至于說服務器之間是如何進行通信的?這個Celery本身不能解決。
Celery在執行任務時需要一個消息中間件來接收和發送任務消息,以及存儲任務結果,一般使用RabbitMQ 或 Redis,我們這里只討論Celery+RabbitMQ,其他的組合方式可以查閱更多資料。
RabbitMQ是一個由Erlang語言開發的AMQP的開源實現。AMQP即Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,為面向消息的中間件設計,基于此協議的客戶端與消息中間件可傳遞消息,并不受產品、開發語言等條件的限制。
在Celery+RabbitMQ組合中,RabbitMQ作為一個消息隊列管理工具被引入到和Celery集成,負責處理服務器之間的通信任務。
那么有一個疑問:RabbitMQ作為消息管理系統已經可以實現異步的發送消息,為什么還要使用Celery?
Celery相當于包裝了一個現成的系統,可以方便的在項目中操作RabbitMQ這個消息隊列介質,減少在RabbitMQ上編寫腳本的任務。最直接的例子就是在Celery Python里,只需要config一下settings,然后就可以用decorator輕松使用消息隊列,而不用在RabbitMQ上編寫復雜的腳本。
當然,Celery也支持和Redis、MongoDB之類的組合,原因是RabbitMQ盡管足夠強大,但對于一些相對簡單的業務環境來說可能太多(復雜)了一些。
# Celery+RabbitMQ是如何工作的?
關于Celery和RabbitMQ的協作方式,可以通過工作上的一些案例來說明:
假設A公司最近在開下年度工作會議,會議上要確定下一年的工作內容和計劃,參會人員有老板(下發任務者)、部門主管(Celery分配任務者)、部門員工(工作者)、老板秘書(溝通協調者RabbitMQ)。
那么這場會議首先需要確定的是下一年的具體工作內容,這里就稱之為“任務內容”。比如老板說我們下一年要開發出一款社交類APP產品,部門主管表示贊同,于是便愉快地定下了具體的工作任務(task),當然開發一款社交類APP產品是這個項目的總任務,其中可以細分成很多小的任務,比如業務流程是怎么樣的?界面怎么設計等。
在確定了具體工作任務后,老板便把這個項目交給了部門主管(Celery),部門主管確定部門員工中誰去完成這項任務,于是指定某個人(Worker),也可以多個人。?
發布工作任務的人是老板(下發任務者),他指定了部門主管(Celery)什么時候去完成哪些任務,并要求獲取反饋信息。但有一點需要注意,老板只管布置任務,不參與具體的任務分配,這個任務分配的工作是交給部門主管(Celery)去執行。?
項目之初,老板(下發任務者)通過公司會議將任務傳遞給部門主管(Celery),部門主管通過部門會議將任務分配給員工(Worker),過段時間再將任務結果反饋給老板。然而隨著任務越來越多,部門主管發現任務太多,每個任務都要反饋結果,記不住,也容易弄亂,導致效率下降。?
在召開會議商量了一番后,老板秘書(溝通協調者RabbitMQ)站起來說:“我有個提議,老板每天將布置的任務寫成一張紙條放到我這,然后部門主管每天早上來取并交給員工,至于紙條上的任務如何分配,部門主管決定就行,但是要將結果同樣寫一張紙條反饋給我,我再交給老板。這樣老板只負責下發任務,我只負責保管任務紙條,部門主管只負責分配任務并獲取反饋,員工只負責按任務工作。大家職責都很明確,效率肯定會更高。”至此,老板與員工的溝通問題也解決了。

# Celery介紹和基本使用
Celery是一個基于Python開發的分布式異步消息任務隊列,它簡單、靈活、可靠,是一個專注于實時處理的任務隊列,同時也支持任務調度。通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用Celery。舉幾個適用場景:
1)可以在 Request-Response 循環之外執行的操作:發送郵件、推送消息。
2)耗時的操作:調用第三方 API、視頻處理(前端通過 AJAX 展示進度和結果)。
3)周期性任務:取代 crontab。
## Celery有以下幾個優點
簡單:一旦熟悉了Celery的工作流程后,配置和使用是比較簡單的。
高可用:當任務執行失敗或執行過程中發生連接中斷,Celery 會自動嘗試重新執行任務。
快速:一個單進程的Celery每分鐘可處理上百萬個任務。
靈活: Celery的大部分組件都可以被擴展及自定制。
# 選擇Broker
Celery的基本架構和工作流程如下圖

常用的Broker有RabbitMQ、Redis、數據庫等,我們這里使用的是RabbitMQ

# Celery安裝使用
Celery是一個Python的應用,而且已經上傳到了PyPi,所以可以使用pip或easy_install安裝:
```
pip install celery
pip install eventlet
```
# 創建Application和Task
Celery的默認broker是RabbitMQ,僅需配置一行就可以:
```
broker_url = 'amqp://guest:guest@localhost:5672//'
```
創建一個Celery Application用來定義任務列表。
實例化一個Celery對象app,然后通過@app.task 裝飾器注冊一個 task。任務文件就叫tasks.py:
```
from celery import Celery
app = Celery(__name__, broker='amqp://guest:guest@localhost:5672//')
@app.task
def add(x, y):? ?
????????return x + y
```
# 運行 worker,啟動Celery Worker來開始監聽并執行任務
在 tasks.py 文件所在目錄運行
```
celery worker -A tasks.app -l INFO -P eventlet
```
這個命令會開啟一個在前臺運行的 worker,解釋這個命令的意義:
worker: 運行 worker 模塊。
-A: –app=APP, 指定使用的 Celery 實例。
-l: –loglevel=INFO, 指定日志級別,可選:DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
其它常用的選項:
-P: –pool=prefork, 并發模型,可選:prefork (默認,multiprocessing), eventlet, gevent, threads.
-c: –concurrency=10, 并發級別,prefork 模型下就是子進程數量,默認等于 CPU 核心數
完整的命令行選項可以這樣查看:
```
celery worker --help
```
# 調用Task
再打開一個終端, 進行命令行模式,調用任務
```
from tasks import add
add.delay(1,2)
add.apply_async(args=(1,2))
```
上面兩種調用方式等價,delay()?方法是?apply_async()?方法的簡寫。這個調用會把?add?操作放入到隊列里,然后立即返回一個?AsyncResult?對象。如果關心處理結果,需要給?app?配置?CELERY_RESULT_BACKEND,指定一個存儲后端保存任務的返回值。
# 在項目中的簡單使用流程
1)RabbitMQ所在服務器,啟動crontab設置 crontable -user user -e設置定時執行celery application應用。
```
python tasks.py day
```
在task.py文件里面啟動一個叫做app的Celery Application,編寫一個app.task函數來produce 任務到rabbitmq。
```
app = Celery()
app.config_from_object(celeryconfig)
```
在每個worker里面通過命令啟動worker消費任務
```
celery worker -A tasks.app -l INFO -P eventlet
```