# Celery介紹和基本使用
**需求場景**
1. 對100臺命令執行一條批量命令,命令執行需要很長時間,但是不想讓主程序等著結果返回,而是給主程序返回一個任務ID,task_id
主程序過一段時間根據task_id,獲取執行結果即可,再命令執行期間,主程序 可以繼續做其他事情
2. 定時任務,比如每天檢測一下所有的客戶資料,發現是客戶的生日,發個祝福短信
**解決方案**
1. 邏輯view 中啟一個進程
父進程結束,子進程跟著結束,子進程任務沒有完成,不符合需求
父進程結束,等著子進程結束,父進程需等著結果返回,不符合需求
小結:該方案解決不了阻塞問題,即需要等待
2. 啟動 subprocess,任務托管給操作系統執行
實現task_id,實現異步,解決阻塞
小結:大批量高并發,主服務器會出現問題,解決不了并發
3. celery
celery提供多子節點,解決并發問題
## celery介紹
celery是一個基于python開發的分布式異步消息隊列,輕松實現任務的異步處理
celery在執行任務時需要一個消息中間件來接收和發送任務消息,以及存儲任務結果,一般使用RabbitMQ 或 Redis
## celery優點
簡單:熟悉celery的工作流程后,配置使用簡單
高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務
快速:一個單進程的celery每分鐘可處理上百萬個任務
靈活:幾乎celery的各個組件都可以被擴展及自定制
## celery基本工作流程

其中中間隊列用于分配任務以及存儲執行結果
## celery安裝及使用
1. 安裝python模塊
```
pip3 install celery
pip3 install redis
```
2. 安裝redis服務
```
wget http://download.redis.io/releases/redis-3.2.8.tar.gz
tar -zxvf redis-3.2.8.tar.gz
cd redis-3.2.8
make
src/redis-server # 啟動redis 服務
```
3. 創建一個celery application 用來定義任務列表
創建一個任務 tasks.py
```
from celery import Celery
app = Celery('TASK',
broker='amqp://guest:guest@localhost:5672//',
backend='redis://localhost')
@app.task
def add(x,y):
print("running...",x,y)
return x+y
```
4. 啟動celery worker 來開始監聽并執行任務
```
celery -A tasks worker --loglevel=info
```
tasks 任務文件名,worker 任務角色,--loglevel=info 任務日志級別
5. 調用任務
打開另外終端,進入命令行模式,調用任務

6. celery常用接口
* tasks.add(4,6) ---> 本地執行
* tasks.add.delay(3,4) --> worker執行
* t=tasks.add.delay(3,4) --> t.get() 獲取結果,或卡住,阻塞
* t.ready()---> False:未執行完,True:已執行完
* t.get(propagate=False) 拋出簡單異常,但程序不會停止
* t.traceback 追蹤完整異常
# 項目中使用Celery
## 1. 項目目錄結構
```
project
|-- __init__.py
|-- celery.py # 配置文檔
|-- tasks.py # 任務函數
|-- tasks2.py # 任務函數
```
## 2. 項目文件
project/celery.py
```
# from celery import Celery 默認當前路徑,更改為絕對路徑(當前路徑有個celery.py文件啦)
from celery import Celery
app = Celery('project',
broker='redis://localhost',
backend='redis://localhost',
include=['project.tasks','project.tasks2']) # 配置文件和任務文件分開了,可以寫多個任務文件
# app 擴展配置
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
```
celery.py作用相當于配置文件
project/tasks.py
```
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
```
project/tasks.py
```
from .celery import app
@app.task
def hello():
return 'Hello World'
```
3. 啟動項目worker
```
celery -A project worker -l info
```
另啟終端,與project同目錄進入python3

4. 實現分布式
當啟動多個時 celery -A project worker -l info,去broker去相應任務,實現分布式
5. 后臺啟動woker
```
celery multi start w1 -A project -l info
celery multi start w2 -A project -l info
celery multi start w3 -A project -l info
celery multi restart w1 -A project -l info
celery multi stop w1 w2 w3 # 任務立刻停止
celery multi stopwait w1 w2 w3 # 任務執行完,停止
```
# Celery定時任務
celery支持定時任務,設定好任務的執行時間,celery就會定時幫你執行,這個定時任務模塊叫 celery beat
項目目錄結構
```
project
|-- __init__.py
|-- celery.py # 配置文件
|-- periodic_task.py # 定時任務文件
```
腳本celery.py
```
from celery import Celery
app = Celery('project',
broker='redis://localhost',
backend='redis://localhost',
include=['project.periodic_task',])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
```
腳本periodic_task.py
```
from .celery import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10s調用 test('hello')
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每20s調用 test('world')
sender.add_periodic_task(20.0, test.s('world'), expires=10)
# 每周一早上7:30 執行 test('Happy Mondays!')
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
```
啟動角色 worker 執行任務
```
celery -A project worker -l info
```
啟動角色 beat 將定時任務放到隊列中
```
celery -A project.periodic_task beat -l debug
```
也可以在配置文件celery.py 里添加定時任務
```
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'project.tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
```
# Celery與Django結合
```
LearnCelery
|-- app1
|-- tasks.py
|-- models.py
|-- app2
|-- tasks.py
|-- models.py
|-- LearnCelery
|-- __init__.py
|-- celery.py
|-- settings.py
```
腳本代碼
LearnCelery/app/tasks.py # 必須叫這個名字
```
from celery import shared_task
import time
# 所有的app都可以調用
@shared_task
def add(x, y):
time.sleep(10)
return x + y
@shared_task
def mul(x, y):
time.sleep(10)
return x * y
```
LearnCelery/LearnCelery/__init__.py
```
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
```
LearnCelery/LearnCelery/celery.py
```
import os
from celery import Celery
# 單獨腳本調用Django內容時,需配置腳本的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
# CELERY_ 作為前綴,在settings中寫配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 到Django各個app下,自動發現tasks.py 任務腳本
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
```
LearnCelery/LearnCelery/settings.py
```
# For celery
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'
```
3. 啟動celery
```
celery -A LearnCelery worker -l debug
```
4. urls.py 視圖處理
```
urlpatterns = [
url(r'^celery_call/$', views.celery_call),
url(r'^celery_res/$', views.celery_res),
]
```
# django中使用計劃任務
1. 安裝插件
```
pip3 install django-celery-beat
```
2. 修改配置 settings.py
```
INSTALLED_APPS = [
'django_celery_beat',
]
```
3.數據庫遷移
```
python manage.py migrate
```
4. 啟動 celery beat
```
celery -A LearnCelery beat -l info -S django
```
定時任務存到數據庫里,啟動beat定時取任務放到隊列里執行
5. admin管理


此時啟動你的celery beat 和worker,會發現每隔2分鐘,beat會發起一個任務消息讓worker執行scp_task任務
注意,經測試,每添加或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat進程讀到