<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Celery介紹和基本使用 **需求場景** 1. 對100臺命令執行一條批量命令,命令執行需要很長時間,但是不想讓主程序等著結果返回,而是給主程序返回一個任務ID,task_id 主程序過一段時間根據task_id,獲取執行結果即可,再命令執行期間,主程序 可以繼續做其他事情 2. 定時任務,比如每天檢測一下所有的客戶資料,發現是客戶的生日,發個祝福短信 **解決方案** 1. 邏輯view 中啟一個進程 父進程結束,子進程跟著結束,子進程任務沒有完成,不符合需求 父進程結束,等著子進程結束,父進程需等著結果返回,不符合需求 小結:該方案解決不了阻塞問題,即需要等待 2. 啟動 subprocess,任務托管給操作系統執行 實現task_id,實現異步,解決阻塞 小結:大批量高并發,主服務器會出現問題,解決不了并發 3. celery celery提供多子節點,解決并發問題 ## celery介紹 celery是一個基于python開發的分布式異步消息隊列,輕松實現任務的異步處理 celery在執行任務時需要一個消息中間件來接收和發送任務消息,以及存儲任務結果,一般使用RabbitMQ 或 Redis ## celery優點 簡單:熟悉celery的工作流程后,配置使用簡單 高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務 快速:一個單進程的celery每分鐘可處理上百萬個任務 靈活:幾乎celery的各個組件都可以被擴展及自定制 ## celery基本工作流程 ![](https://box.kancloud.cn/32d19c7420eb9c8c072f1b38ff0f8e41_670x319.png) 其中中間隊列用于分配任務以及存儲執行結果 ## celery安裝及使用 1. 安裝python模塊 ``` pip3 install celery pip3 install redis ``` 2. 安裝redis服務 ``` wget http://download.redis.io/releases/redis-3.2.8.tar.gz tar -zxvf redis-3.2.8.tar.gz cd redis-3.2.8 make src/redis-server # 啟動redis 服務 ``` 3. 創建一個celery application 用來定義任務列表 創建一個任務 tasks.py ``` from celery import Celery app = Celery('TASK', broker='amqp://guest:guest@localhost:5672//', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y ``` 4. 啟動celery worker 來開始監聽并執行任務 ``` celery -A tasks worker --loglevel=info ``` tasks 任務文件名,worker 任務角色,--loglevel=info 任務日志級別 5. 調用任務 打開另外終端,進入命令行模式,調用任務 ![](https://box.kancloud.cn/ccf1e9529cc2e20728b3e0ab1c6baa08_573x167.png) 6. celery常用接口 * tasks.add(4,6) ---> 本地執行 * tasks.add.delay(3,4) --> worker執行 * t=tasks.add.delay(3,4) --> t.get() 獲取結果,或卡住,阻塞 * t.ready()---> False:未執行完,True:已執行完 * t.get(propagate=False) 拋出簡單異常,但程序不會停止 * t.traceback 追蹤完整異常 # 項目中使用Celery ## 1. 項目目錄結構 ``` project |-- __init__.py |-- celery.py # 配置文檔 |-- tasks.py # 任務函數 |-- tasks2.py # 任務函數 ``` ## 2. 項目文件 project/celery.py ``` # from celery import Celery 默認當前路徑,更改為絕對路徑(當前路徑有個celery.py文件啦) from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.tasks','project.tasks2']) # 配置文件和任務文件分開了,可以寫多個任務文件 # app 擴展配置 app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start() ``` celery.py作用相當于配置文件 project/tasks.py ``` from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y ``` project/tasks.py ``` from .celery import app @app.task def hello(): return 'Hello World' ``` 3. 啟動項目worker ``` celery -A project worker -l info ``` 另啟終端,與project同目錄進入python3 ![](https://box.kancloud.cn/4abe47350dd74f51919d94120f8a98b4_585x245.png) 4. 實現分布式 當啟動多個時 celery -A project worker -l info,去broker去相應任務,實現分布式 5. 后臺啟動woker ``` celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info celery multi restart w1 -A project -l info celery multi stop w1 w2 w3 # 任務立刻停止 celery multi stopwait w1 w2 w3 # 任務執行完,停止 ``` # Celery定時任務 celery支持定時任務,設定好任務的執行時間,celery就會定時幫你執行,這個定時任務模塊叫 celery beat 項目目錄結構 ``` project |-- __init__.py |-- celery.py # 配置文件 |-- periodic_task.py # 定時任務文件 ``` 腳本celery.py ``` from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.periodic_task',]) app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start() ``` 腳本periodic_task.py ``` from .celery import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 每10s調用 test('hello') sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每20s調用 test('world') sender.add_periodic_task(20.0, test.s('world'), expires=10) # 每周一早上7:30 執行 test('Happy Mondays!') sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改 test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg) ``` 啟動角色 worker 執行任務 ``` celery -A project worker -l info ``` 啟動角色 beat 將定時任務放到隊列中 ``` celery -A project.periodic_task beat -l debug ``` 也可以在配置文件celery.py 里添加定時任務 ``` app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'project.tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC' ``` # Celery與Django結合 ``` LearnCelery |-- app1 |-- tasks.py |-- models.py |-- app2 |-- tasks.py |-- models.py |-- LearnCelery |-- __init__.py |-- celery.py |-- settings.py ``` 腳本代碼 LearnCelery/app/tasks.py # 必須叫這個名字 ``` from celery import shared_task import time # 所有的app都可以調用 @shared_task def add(x, y): time.sleep(10) return x + y @shared_task def mul(x, y): time.sleep(10) return x * y ``` LearnCelery/LearnCelery/__init__.py ``` # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app'] ``` LearnCelery/LearnCelery/celery.py ``` import os from celery import Celery # 單獨腳本調用Django內容時,需配置腳本的環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') app = Celery('mysite') # CELERY_ 作為前綴,在settings中寫配置 app.config_from_object('django.conf:settings', namespace='CELERY') # 到Django各個app下,自動發現tasks.py 任務腳本 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) ``` LearnCelery/LearnCelery/settings.py ``` # For celery CELERY_BROKER_URL = 'redis://localhost' CELERY_RESULT_BACKEND = 'redis://localhost' ``` 3. 啟動celery ``` celery -A LearnCelery worker -l debug ``` 4. urls.py 視圖處理 ``` urlpatterns = [ url(r'^celery_call/$', views.celery_call), url(r'^celery_res/$', views.celery_res), ] ``` # django中使用計劃任務 1. 安裝插件 ``` pip3 install django-celery-beat ``` 2. 修改配置 settings.py ``` INSTALLED_APPS = [ 'django_celery_beat', ] ``` 3.數據庫遷移 ``` python manage.py migrate ``` 4. 啟動 celery beat ``` celery -A LearnCelery beat -l info -S django ``` 定時任務存到數據庫里,啟動beat定時取任務放到隊列里執行 5. admin管理 ![](https://box.kancloud.cn/33d360707ed8e3019c407402435e8eac_875x323.png) ![](https://box.kancloud.cn/b884ec26e2cd1c98d8bf5c480e91a478_1680x1594.png) 此時啟動你的celery beat 和worker,會發現每隔2分鐘,beat會發起一個任務消息讓worker執行scp_task任務 注意,經測試,每添加或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat進程讀到
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看