# Task Worker
---
[TOC=2,4]
* * * * *
## 簡介
Task Worker是Swoole中一種特殊的工作進程,該進程的作用是處理一些耗時較長的任務,以達到釋放Worker進程的目的。Worker進程可以通過`swoole_server`對象的task方法投遞一個任務到Task Worker進程.
Worker進程通過Unix Sock管道將數據發送給Task Worker,這樣Worker進程就可以繼續處理新的邏輯,無需等待耗時任務的執行。
需要注意的是,由于Task Worker是獨立進程,因此無法直接在兩個進程之間共享全局變量,需要使用Redis、MySQL或者swoole_table來實現進程間共享數據。
## Task/Finish特性的用途
task模塊用來做一些異步的慢速任務,比如webim中發廣播,發送郵件等。
* task進程必須是同步阻塞的
* task進程支持定時器
node.js 假如有10萬個連接,要發廣播時,那會循環10萬次,這時候程序不能做任何事情,不能接受新的連接,也不能收包發包。
而swoole不同,丟給task進程之后,worker進程可以繼續處理新的數據請求。任務完成后會異步地通知worker進程告訴它此任務已經完成。
當然task模塊的作用還不僅如此,實現PHP的數據庫連接池,異步隊列等,還需要進一步挖掘。
## 實例
要使用Task Worker,需要進行一些必要的操作。
首先,需要設置swoole_server的配置參數:
```php
$serv->set(array(
'task_worker_num' => 2, // 設置啟動2個task進程
));
```
接著,綁定必要的回調函數:
```php
$serv->on('Task', 'onTask');
$serv->on('Finish','onFinish');
```
其中兩個回調函數的原型如下所示:
```php
/**
* @param $serv swoole_server swoole_server對象
* @param $task_id int 任務id
* @param $from_id int 投遞任務的worker_id
* @param $data string 投遞的數據
*/
function onTask(swoole_server $serv, $task_id, $from_id, $data);
/**
* @param $serv swoole_server swoole_server對象
* @param $task_id int 任務id
* @param $data string 任務返回的數據
*/
function onFinish(swoole_server $serv, $task_id, $data);
```
在實際邏輯中,當需要發起一個任務請求時,可以使用如下方法調用:
```php
$data = "task data";
$serv->task($data , -1 ); // -1代表不指定task進程
// 在1.8.6+的版本中,可以動態指定onFinish函數
$serv->task($data, -1, function (swoole_server $serv, $task_id, $data) {
echo "Task Finish Callback\n";
});
```
### 1.Task簡介
Swoole的業務邏輯部分是同步阻塞運行的,如果遇到一些耗時較大的操作,例如訪問數據庫、廣播消息等,就會影響服務器的響應速度。因此Swoole提供了Task功能,將這些耗時操作放到另外的進程去處理,當前進程繼續執行后面的邏輯。
### 2.開啟Task功能
開啟Task功能只需要在swoole_server的配置項中添加[task_worker_num](server/set.md)一項即可,如下:
```php
$serv->set(array(
'task_worker_num' => 8
));
```
即可開啟task功能。此外,必須給swoole_server綁定兩個回調函數:[onTask](server/02.swoole_server事件回調函數.md)和[onFinish](server/02.swoole_server事件回調函數.md)。這兩個回調函數分別用于執行Task任務和處理Task任務的返回結果。
### 3.使用Task
首先是發起一個Task,代碼如下:
```php
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "Get Message From Client {$fd}:{$data}\n";
// send a task to task worker.
$param = array(
'fd' => $fd
);
// start a task
$serv->task( json_encode( $param ) );
echo "Continue Handle Worker\n";
}
```
可以看到,發起一個任務時,只需通過swoole_server對象調用task函數即可發起一個任務。swoole內部會將這個請求投遞給task_worker,而當前Worker進程會繼續執行。
當一個任務發起后,task_worker進程會響應[onTask](server/02.swoole_server事件回調函數.md)回調函數,如下:
```php
public function onTask($serv,$task_id,$from_id, $data) {
echo "This Task {$task_id} from Worker {$from_id}\n";
echo "Data: {$data}\n";
for($i = 0 ; $i < 10 ; $i ++ ) {
sleep(1);
echo "Task {$task_id} Handle {$i} times...\n";
}
$fd = json_decode( $data , true )['fd'];
$serv->send( $fd , "Data in Task {$task_id}");
return "Task {$task_id}'s result";
}
```
這里我用sleep函數和循環模擬了一個長耗時任務。在onTask回調中,我們通過task_id和from_id(也就是worker_id)來區分不同進程投遞的不同task。當一個task執行結束后,通過return一個字符串將執行結果返回給Worker進程。Worker進程將通過[onFinish](server/02.swoole_server事件回調函數.md)回調函數接收這個處理結果。
下面來看onFinish回調:
```php
public function onFinish($serv,$task_id, $data) {
echo "Task {$task_id} finish\n";
echo "Result: {$data}\n";
}
```
在[onFinish](server/02.swoole_server事件回調函數.md)回調中,會接收到Task任務的處理結果$data。在這里處理這個返回結果即可。
(**Tip:** 可以通過在傳遞的data中存放fd、buff等數據,來延續投遞Task之前的工作)
[點此查看完整示例](https://github.com/LinkedDestiny/swoole-doc/blob/master/src/02/swoole_task_server.php)
## 3.Task進階:MySQL連接池
上一章中我簡單講解了如何開啟和使用Task功能。這一節,我將提供一個Task的高級用法。<br>
在PHP中,訪問MySQL數據庫往往是性能提升的瓶頸。而MySQL連接池我想大家都不陌生,這是一個很好的提升數據庫訪問性能的方式。傳統的MySQL連接池,是預先申請一定數量的連接,每一個新的請求都會占用其中一個連接,請求結束后再將連接放回池中,如果所有連接都被占用,新來的連接則會進入等待狀態。<br>
知道了MySQL連接池的實現原理,那我們來看如何使用Swoole實現一個連接池。<br>
首先,Swoole允許開啟一定量的Task Worker進程,我們可以讓每個進程都擁有一個MySQL連接,并保持這個連接,這樣,我們就創建了一個連接池。<br>
其次,設置swoole的[dispatch_mode](server/set.md)為搶占模式(主進程會根據Worker的忙閑狀態選擇投遞,只會投遞給處于閑置狀態的Worker)。這樣,每個task都會被投遞給閑置的Task Worker。這樣,我們保證了每個新的task都會被閑置的Task Worker處理,如果全部Task Worker都被占用,則會進入等待隊列。<br>
下面直接上關鍵代碼:<br>
```php
public function onWorkerStart( $serv , $worker_id) {
echo "onWorkerStart\n";
// 判定是否為Task Worker進程
if( $worker_id >= $serv->setting['worker_num'] ) {
$this->pdo = new PDO(
"mysql:host=localhost;port=3306;dbname=Test",
"root",
"123456",
array(
PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT => true
)
);
}
}
```
首先,在每個Task Worker進程中,創建一個MySQL連接。這里我選用了PDO擴展。<br>
```php
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
$sql = array(
'sql'=>'select * from Test where pid > ?',
'param' => array(
0
),
'fd' => $fd
);
$serv->task( json_encode($sql) );
}
```
其次,在需要的時候,通過[task]()函數投遞一個任務(也就是發起一次SQL請求)<br>
```php
public function onTask($serv,$task_id,$from_id, $data) {
$sql = json_decode( $data , true );
$statement = $this->pdo->prepare($sql['sql']);
$statement->execute($sql['param']);
$result = $statement->fetchAll(PDO::FETCH_ASSOC);
$serv->send( $sql['fd'],json_encode($result));
return true;
}
```
最后,在onTask回調中,根據請求過來的SQL語句以及相應的參數,發起一次MySQL請求,并將獲取到的結果通過send發送給客戶端(或者通過return返回給Worker進程)。而且,這樣的一次MySQL請求還不會阻塞Worker進程,Worker進程可以繼續處理其他的邏輯。<br>
可以看到,簡單十幾行代碼,就實現了一個高效的異步MySQL連接池。<br>
通過測試,單個客戶端一共發起1W次select請求,共耗時9s;<br> 1W次insert請求,共耗時21s。<br>
(客戶端會在每次收到前一個請求的結果后才會發起下一次請求,而不是并發)。
[點此查看完整服務端代碼](https://github.com/LinkedDestiny/swoole-doc/blob/master/src/03/swoole_mysql_pool_server.php)<br>
[點此查看完整客戶端代碼](https://github.com/LinkedDestiny/swoole-doc/blob/master/src/03/swoole_mysql_pool_client.php)<br>
>可參考[swoole_framework中的代碼](http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Async/MySQL.php)
>redis連接池可參考[swoole_framework中的代碼](http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Async/Redis.php)
## 4.Task實戰:yii中應用task
在YII框架中結合了swoole 的task 做了異步處理。
本例中 主要用到
1、protected/commands/ServerCommand.php 用來做server。
2、protected/event/下的文件 這里是在異步中的具體實現。
客戶端調用參照 TestController
```php
<?php
class TestController extends Controller{
public function actionTT(){
$message['uid'] = 2;
$message['email'] = '83212019@qq.com';
$message['title'] = '接口報警郵件';
$message['contents'] = "'EmailEvent'接口請求過程出錯! 錯誤信息如下:err_no:'00000' err_msg:'測試隊列' 請求參數為:'[]'";
$message['type'] = 2;
$data['param'] = $message;
$data['class'] = 'Email';
$client = new EventClient();
$data = $client->send($data);
}
}
```
有個task表是用來記錄異步任務的。如果失敗重試3次。sql在protected/data/sql.sql里。
[點此查看完整客戶端代碼](https://github.com/LinkedDestiny/swoole-doc/blob/master/src/03/swoole_mysql_pool_client.php)
- swoole簡介
- swoole功能概述
- 序章
- 開發必讀
- 1 環境搭建
- 1.1 環境搭建
- 1.2 搭建Echo服務器
- 2 初識Swoole
- 2.1 Worker進程
- 2.2 TaskWorker進程
- 2.3 Timer定時器
- 2.4 Process進程
- 2.5 Table內存表
- 2.6 多端口監聽
- 2.7 sendfile文件支持
- 2.8 SSL支持
- 2.9 熱重啟
- 2.10 http_server
- 附錄*server配置
- 附錄*server函數
- 附錄*server屬性
- 附錄*server回調函數
- 附錄*server高級特性
- 心跳檢測
- 3 Swoole協議
- 3.1 EOF協議
- 3.2 固定包頭協議
- 3.3 Http協議
- 3.4 WebSocket協議
- 3.5 MTQQ協議
- 內置http_server
- 內置websocket_server
- Swoole\Redis\Server
- 4 Swoole異步IO
- 4.1 AsyncIO
- 異步文件系統IO
- swoole_async_readfile
- swoole_async_writefile
- swoole_async_read
- swoole_async_write
- 5 swoole異步客戶端
- ws_client
- http_client
- mysql_client
- redis_client
- tcp_client
- http2_client
- 6 swoole協程
- Swoole\Coroutine\Http\Client
- Swoole\Coroutine\MySQL
- Swoole\Coroutine\Redis
- Coroutine\PostgreSQL
- Swoole\Coroutine\Client
- Swoole\Coroutine\Socket
- Swoole\Coroutine\Channel
- Coroutine
- Swoole\Coroutine::create
- Swoole\Coroutine::resume
- Swoole\Coroutine::suspend
- Swoole\Coroutine::sleep
- Coroutine::getaddrinfo
- Coroutine::gethostbyname
- swoole_async_dns_lookup_coro
- Swoole\Coroutine::getuid
- getDefer
- setDefer
- recv
- Coroutine::stats
- Coroutine::fread
- Coroutine::fget
- Coroutine::fwrite
- Coroutine::readFIle
- Coroutine::writeFIle
- Coroutine::exec
- 7 swoole_process
- process::construct
- process::start
- process::name
- process::signal
- process::setaffinity
- process::exit
- process::kill
- process::daemon
- process->exec
- process::wait
- process::alarm
- 8 swoole定時器
- swoole_timer_tick
- swoole_timer_after
- swoole_timer_clear
- 9 swoole_event
- swoole_event_add
- swoole_event_set
- swoole_event_del
- swoole_event_wait
- swoole_event_defer
- swoole_event_write
- swoole_event_exit
- swoole提供的function
- 常見問題
- 客戶端鏈接失敗原因
- 如何設置進程數
- 如何實現異步任務
- 如何選擇swoole三種模式
- php中哪些函數是阻塞的
- 是否可以共用1個redis或mysql連接
- 如何在回調函數中訪問外部的變量
- 為什么不要send完后立即close
- 不同的Server程序實例間如何通信
- MySQL的連接池、異步、斷線重連
- 在php-fpm或apache中使用swoole
- 學習Swoole需要掌握哪些基礎知識
- 在phpinfo中有在php-m中沒有
- 同步阻塞與異步非阻塞選擇
- CURL發送POST請求服務器端超時
- 附錄
- 預定義常量
- 內核參數調優
- php四種回調寫法
- 守護進程程序常用數據結構
- swoole生命周期
- swoole_server中內存管理機制
- 使用jemalloc優化swoole內存分配性能
- Reactor、Worker、Task的關系
- Manager進程
- Swoole的實現
- Reactor線程
- 安裝擴展
- swoole-worker手冊
- swoole相關開源項目
- 寫在后面的話
- 版本更新記錄
- 4.0