# 5.5 連接池
連接池的重要性,這時就不贅述了,下面具體介紹框架中實現的哪些連接池。
## Redis連接池
### 主要特性
- 支持異步+協程
- 支持斷線重連
- 支持自動提取和歸還連接
- 統一同步和異步調用方式
### 配置
```php
<?php
/**
* 本地環境
*/
$config['redis']['p1']['ip'] = '127.0.0.1';
$config['redis']['p1']['port'] = 6379;
//$config['redis']['p1']['password'] = 'xxxx';
//$config['redis']['p1']['select'] = 1;
// Redis序列化選項等同于phpredis序列化的各個選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化選項,為了兼容yii遷移項目的set,get,mset,mget,選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE;
// 是否將key md5后儲存,默認為0,開啟為1
//$config['redis']['p1']['hashKey'] = 1;
// 設置key的前綴
//$config['redis']['p1']['keyPrefix'] = 'demo_';
return $config;
```
示例配置代碼:
[./php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf/pinguo/config/docker/redis.php)
- $config['redis']
代表Redis連接池相關配置
- p1,p2,p3,p4,p5,p6
這里的p僅代表一臺或者一組Redis服務器,在使用連接池時會用到,如果Redis服務器端分片(比如twemproxy)就填寫為集群導出的地址與端口等信息。
- ip
Redis服務器地址
- port
Redis服務器端口
- password
Redis認證密鑰
- select
Redis DB
- redisSerialize
Redis序列化選項等同于phpredis序列化的各個選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
- phpSerialize
PHP序列化選項,為了兼容yii遷移項目的set,get,mset,mget,選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
- hashKey
是否將key md5后儲存,默認為0,開啟為1
- keyPrefix
設置key的前綴
### Redis連接池的使用
```php
/**
* Redis示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;
class Redis extends Controller
{
// Redis連接池讀寫示例
public function actionPoolSetGet()
{
yield $this->getRedisPool('p1')->set('key1', 'val1');
$val = yield $this->getRedisPool('p1')->get('key1');
$this->outputJson($val);
}
}
```
1. $this->getRedisPool($name)
獲取連接池對象,并選擇名為$name的連接池,$name由配置文件中聲明,比如上述配置中的tw
2. 連接池對象的所有方法映射為標準的Redis操作指令
如:`SETEX key seconds value`映射為`$this->getRedisPool($name)->setex($key, $seconds, $value)`
3. string類型的簡化操作
`$this->getRedisPool($name)->cache($key, $value = '', $expire = 0)`,`$key`為redis key,`$value`為緩存的值,`$expire`為過期時間,默認不會過期。
4. 執行lua腳本
`$this->getRedisPool($name)->evalMock($script, $args = array(), $numKeys = 0)`
如:
```php
<?php
function luaExample()
{
$num = 100;
$lua = "
local allWorks = {}
local recWorks = {}
local random = nil
for k, v in pairs(KEYS) do
local works = redis.call('sRandMember', v, '" . $num . "')
if works ~= nil then
for key, val in pairs(works) do
table.insert(allWorks, val)
end
end
end
while #recWorks < " . $num . " and #allWorks > 0 do
random = math.random(#allWorks)
table.insert(recWorks, allWorks[random])
table.remove(allWorks, random)
end
return cjson.encode(recWorks)
";
$keys = ['feedId1', 'feedId2', 'feedId3'];
$this->getRedisPool('tw')->evalMock($lua, $keys, count($keys));
}
```
## Redis代理
在Redis連接池的基本上,MSF框架還實現了Redis代理的基本功能,主要特性有:
- 支持分布式自動分片
- 支持master-slave讀寫分離
- 支持故障自動failover
### 配置
```php
<?php
/**
* 本地環境
*/
$config['redis']['p1']['ip'] = '127.0.0.1';
$config['redis']['p1']['port'] = 6379;
//$config['redis']['p1']['password'] = 'xxxx';
//$config['redis']['p1']['select'] = 1;
// Redis序列化選項等同于phpredis序列化的各個選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化選項,為了兼容yii遷移項目的set,get,mset,mget,選項如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize'] = \Redis::SERIALIZER_NONE;
// 是否將key md5后儲存,默認為0,開啟為1
//$config['redis']['p1']['hashKey'] = 1;
// 設置key的前綴
//$config['redis']['p1']['keyPrefix'] = 'demo_';
$config['redis']['p2']['ip'] = '127.0.0.1';
$config['redis']['p2']['port'] = 6380;
$config['redis']['p3']['ip'] = '127.0.0.1';
$config['redis']['p3']['port'] = 6381;
$config['redis']['p4']['ip'] = '127.0.0.1';
$config['redis']['p4']['port'] = 7379;
$config['redis']['p5']['ip'] = '127.0.0.1';
$config['redis']['p5']['port'] = 7380;
$config['redis']['p6']['ip'] = '127.0.0.1';
$config['redis']['p6']['port'] = 7381;
$config['redis_proxy']['master_slave'] = [
'pools' => ['p1', 'p2', 'p3'],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
$config['redis_proxy']['cluster'] = [
'pools' => [
'p4' => 1,
'p5' => 1,
'p6' => 1
],
'mode' => \PG\MSF\Marco::CLUSTER,
];
return $config;
```
示例配置代碼:
[https://github.com/pinguo/php-msf-demo/app/config/docker/redis.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/redis.php)
- $config['redis_proxy']
代表Redis代理相關配置
- cluster
這里的cluster僅代表一組Redis服務器集群,是一個標識
- mode
Redis集群類型,\PG\MSF\Marco::CLUSTER代表分布式的Redis集群;\PG\MSF\Marco::MASTER_SLAVE代表主從結構的Redis集群
- pools
當mode設置為\PG\MSF\Marco::CLUSTER時,pools為array,他的key表示Redis連接池名稱,value表示Redis連接池權重;當mode設置為\PG\MSF\Marco::MASTER_SLAVE,pools為英文逗號分隔的Redis連接池名稱列表。
### Redis代理的使用
```php
<?php
/**
* Redis示例控制器
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;
class Redis extends Controller
{
// Redis代理使用示例(分布式)
public function actionProxySetGet()
{
for ($i = 0; $i <= 100; $i++) {
yield $this->getRedisProxy('cluster')->set('proxy' . $i, $i);
}
$val = yield $this->getRedisProxy('cluster')->get('proxy22');
$this->outputJson($val);
}
// Redis代理使用示例(主從)
public function actionMaserSlaveSetGet()
{
for ($i = 0; $i <= 100; $i++) {
yield $this->getRedisProxy('master_slave')->set('M' . $i, $i);
}
$val = yield $this->getRedisProxy('master_slave')->get('M66');
$this->outputJson($val);
}
}
```
## Redis連接池與代理的關系

## MySQL連接池
### 配置
```php
<?php
/**
* Docker環境
*/
$config['mysql']['master']['host'] = '127.0.0.1';
$config['mysql']['master']['port'] = 3306;
$config['mysql']['master']['user'] = 'root';
$config['mysql']['master']['password'] = '123456';
$config['mysql']['master']['charset'] = 'utf8';
$config['mysql']['master']['database'] = 'demo';
$config['mysql']['slave1']['host'] = '127.0.0.1';
$config['mysql']['slave1']['port'] = 3306;
$config['mysql']['slave1']['user'] = 'root';
$config['mysql']['slave1']['password'] = '123456';
$config['mysql']['slave1']['charset'] = 'utf8';
$config['mysql']['slave1']['database'] = 'demo';
$config['mysql']['slave2']['host'] = '127.0.0.1';
$config['mysql']['slave2']['port'] = 3306;
$config['mysql']['slave2']['user'] = 'root';
$config['mysql']['slave2']['password'] = '123456';
$config['mysql']['slave2']['charset'] = 'utf8';
$config['mysql']['slave2']['database'] = 'demo';
$config['mysql_proxy']['master_slave'] = [
'pools' => [
'master' => 'master',
'slaves' => ['slave1', 'slave2'],
],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
return $config;
```
示例配置代碼:
[https://github.com/pinguo/php-msf-demo/app/config/docker/mysql.php](https://github.com/pinguo/php-msf-demo/blob/master/config/docker/mysql.php)
### 執行SQL
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以導入到mysql再運行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
class MySQL extends Controller
{
// MySQL連接池示例
public function actionBizLists()
{
// SQL DBBuilder更多參考 https://github.com/jstayton/Miner
$bizLists = yield $this->getMysqlPool('master')->select("*")->from('biz')->go();
$this->outputJson($bizLists);
}
// 直接執行sql
public function actionShowDB()
{
/**
* @var \PG\MSF\Pools\Miner $DBBuilder
*/
$dbs = yield $this->getMysqlPool('master')->go(null, 'show databases');
$this->outputJson($dbs);
}
// 事務示例
public function actionTransaction()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlPool
*/
$mysqlPool = $this->getMysqlPool('master');
// 開啟一個事務,并返回事務ID
$id = yield $mysqlPool->goBegin();
$up = yield $mysqlPool->update('user')->set('name', '徐典陽-1')->where('id', 3)->go($id);
$ex = yield $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
yield $mysqlPool->goCommit($id);
$this->outputJson('commit');
} else {
yield $mysqlPool->goRollback($id);
$this->outputJson('rollback');
}
}
}
```
示例代碼:
[https://github.com/pinguo/php-msf-demo/app/Controllers/MySQL.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Controllers/MySQL.php)
### DBQueryBuilder
目前php-msf整合的是DB Query Builder是[Miner](https://github.com/jstayton/Miner),更多SQL的拼裝請參考它。
另外,$this->getMysqlPool('連接池配置名'),獲取的連接池對象,可以在上面直接調用Miner的相關方法,進行sql拼裝。
### 關于 go($id = null, $sql = null)
`go($id = null, $sql = null)`是以協程方法執行SQL,它會創建一個MySQL協程,其中`$id`為事務ID,如果未啟用事務,默認為null。`$sql`為手工書寫待執行的SQL。
### 事務
事務操作的一般流程為:
1. 開啟一個事務,并返回事務ID
2. 執行一個SQL,設置事務ID,執行一個SQL,設置事務ID,...
3. 提交(回滾)事務
用代碼實現即:
```
try {
$id = yield $mysqlPool->goBegin();
$res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
$res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
} catch (\Exception $e) {
yield $mysqlPool->goRollback($id);
throw $e;
}
yield $mysqlPool->goCommit($id);
```
## MySQL代理
在MySQL連接池的基本上,MSF框架還實現了MySQL代理的基本功能,主要特性有:
* 支持master-slave讀寫分離
* 支持事務
### 配置代理
如上述配置代碼
```php
$config['mysql_proxy']['master_slave'] = [
'pools' => [
'master' => 'master',
'slaves' => ['slave1', 'slave2'],
],
'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
```
- $config['mysql_proxy']
代表MySQL代理相關配置
- master_slave
這里的master_slave僅代表一組MySQL服務器集群,是一個標識
- mode
MySQL集群類型\PG\MSF\Marco::MASTER_SLAVE代表主從結構的MySQL集群
- pools
當mode設置為\PG\MSF\Marco::MASTER_SLAVE, `pools.master`表示MySQL主節點對應的連接池標識;`pools.slaves`為數字索引MySQL從節點對應的連接池標識列表
### MySQL代理的使用
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以導入到mysql再運行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
class MySQL extends Controller
{
// MySQL代理使用示例
public function actionProxy()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
*/
$mysqlProxy = $this->getMysqlProxy('master_slave');
$bizLists = yield $mysqlProxy->select("*")->from('user')->go();
$up = yield $mysqlProxy->update('user')->set('name', '徐典陽-6')->where('id', 3)->go();
$this->outputJson($bizLists);
}
// MySQL代理事務,事務只會在主節點上執行
public function actionProxyTransaction()
{
/**
* @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
*/
$mysqlProxy = $this->getMysqlProxy('master_slave');
// 開啟一個事務,并返回事務ID
$id = yield $mysqlProxy->goBegin();
$up = yield $mysqlProxy->update('user')->set('name', '徐典陽-2')->where('id', 3)->go($id);
$ex = yield $mysqlProxy->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
yield $mysqlProxy->goCommit($id);
$this->outputJson('commit');
} else {
yield $mysqlProxy->goRollback($id);
$this->outputJson('rollback');
}
}
}
```
MySQL代理基于連接池,它和連接池的使用唯一區別在于從`$this->getMysqlPool`切換為`$this->getMysqlProxy`,所有的調用方式和連接池保持一致,就是這么簡單。
## MySQL同步模式
有一些場景,需要用到MySQL同步查詢數據,比如Task在Tasker進程中執行,由于Tasker是同步阻塞的進程模型,在處理數據過程中又需要查詢數據庫中的數據,然后再計算相關數據,這個時候就需要使用MySQL同步模式。
php-msf框架內部已經將異步和同步查詢MySQL數據的差異屏蔽,同步模式下采用長連接,如果連接斷開,驅動會自動重連,唯一的區別在于同步模式不需要添加yield關鍵字,如:
### MySQL同步Task
```php
<?php
/**
* Demo Task
*
* 注意理論上本文件代碼應該在Tasker進程中執行
*/
namespace App\Tasks;
use \PG\MSF\Tasks\Task;
/**
* Class Demo
* @package App\Tasks
*/
class Demo extends Task
{
/**
* 連接池執行同步查詢
*
* @return array
*/
public function syncMySQLPool()
{
$user = $this->getMysqlPool('master')->select("*")->from("user")->go();
return $user;
}
/**
* 代理執行同步查詢
*
* @return array
*/
public function syncMySQLProxy()
{
$user = $this->getMysqlProxy('master_slave')->select("*")->from("user")->go();
return $user;
}
/**
* 連接池執行同步事務
*
* @return boolean
*/
public function syncMySQLPoolTransaction()
{
$mysqlPool = $this->getMysqlPool('master');
$id = $mysqlPool->begin();
// 開啟一個事務,并返回事務ID
$up = $mysqlPool->update('user')->set('name', '徐典陽-1')->where('id', 3)->go($id);
$ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
$mysqlPool->commit();
return true;
} else {
$mysqlPool->rollback();
return false;
}
}
/**
* 代理執行同步事務
*
* @return boolean
*/
public function syncMySQLProxyTransaction()
{
$mysqlPool = $this->getMysqlProxy('master_slave');
$id = $mysqlPool->begin();
// 開啟一個事務,并返回事務ID
$up = $mysqlPool->update('user')->set('name', '徐典陽-1')->where('id', 3)->go($id);
$ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
if ($ex['result']) {
$mysqlPool->commit();
return true;
} else {
$mysqlPool->rollback();
return false;
}
}
}
```
示例代碼:
[https://github.com/pinguo/php-msf-demo/app/Tasks/Demo.php](https://github.com/pinguo/php-msf-demo/blob/master/app/Tasks/Demo.php)
### 調用MySQL同步查詢數據
```php
<?php
/**
* MySQL示例控制器
*
* app/data/demo.sql可以導入到mysql再運行示例方法
*
* @author camera360_server@camera360.com
* @copyright Chengdu pinguo Technology Co.,Ltd.
*/
namespace App\Controllers;
use PG\MSF\Controllers\Controller;
use App\Tasks\Demo as DemoTask;
class MySQL extends Controller
{
// 通過Task,同步執行MySQL查詢(連接池)
public function actionSyncMySQLPoolTask()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLPool();
$this->outputJson($user);
}
// 通過Task,同步執行MySQL查詢(代理)
public function actionSyncMySQLProxyTask()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLProxy();
$this->outputJson($user);
}
// 通過Task,同步執行MySQL事務查詢(連接池)
public function actionSyncMySQLPoolTaskTransaction()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLPoolTransaction();
$this->outputJson($user);
}
// 通過Task,同步執行MySQL事務查詢(代理)
public function actionSyncMySQLProxyTaskTransaction()
{
/**
* @var DemoTask $demoTask
*/
$demoTask = $this->getObject(DemoTask::class);
$user = yield $demoTask->syncMySQLProxyTransaction();
$this->outputJson($user);
}
}
```
- 0 文檔說明
- 1 為什么研發新框架
- 1.1 傳統php-fpm工作模式的問題
- 1.2 壓測數據對比
- 1.3 小結
- 2 微服務框架研發概覽
- 2.1 通信框架技術選型
- 2.2 swoole
- 2.3 協程原理
- 2.4 異步、并發
- 2.5 小結
- 3 框架運行環境
- 3.1 環境變量
- 3.2 運行代碼
- 3.3 docker
- 3.4 小結
- 4 框架結構
- 4.1 結構概述
- 4.2 控制器
- 4.3 模型
- 4.4 視圖
- 4.5 同步任務
- 4.6 配置
- 4.7 路由
- 4.8 小結
- 5 框架組件
- 5.1 協程
- 5.2 類的加載
- 5.3 異步Http Client
- 5.4 請求上下文
- 5.5 連接池
- 5.6 對象池
- 5.7 RPC
- 5.8 公共庫
- 5.9 RESTful
- 5.10 多語言
- 5.11 雜項
- 5.12 小結
- 6 常見問題
- 7 附錄