## 連接池的意義
對于基于php-fpm的傳統php-web應用,包括且不限于Mysql,Redis,RabbitMq,每次請求到來都需要為其新建一套獨享的的連接,這直接帶來了一些典型問題:
1. **連接開銷** : 連接隨著http請求到來而新建,隨著請求返回而銷毀,大量連接新建銷毀是對系統資源的浪費。
2. **連接數量過高** :每一個請求都需要一套自己的連接,系統連接數和并發數會成一個近線性的關系。如果系統并發量達到了1w,那么就需要建立1w個對應的連接,這對于Mysql之類的后端服務而言,是一個大的負荷。
3. **空閑連接**:假設我們有一個接口使用了一個Mysql連接。該接口在一開始進行一次sql查詢后,后面的操作都是sql無關的,那么該請求占據的空閑連接完全就是一種資源的浪費。
對于異步系統而言,這個問題變得更加的嚴峻。一個請求處理進程要對同一個服務進行并發的操作,意味著這個請求要持有1個以上同類的連接,這對于系統壓力而言,無疑是雪上加霜了,所以連接池對于基于Swoole的Web框架而言已經是一個必需實現的機制了。事實上基本所有的框架都內置了連接池管理機制,比如 [easyswoole Pool管理器](http://www.easyswoole.com/Manual/3.x/Cn/_book/Components/CoroutinePool/pool.html),[swoft 連接池](https://doc.swoft.org/master/zh-CN/core/connection-pool.html) 等等。
## 實現Mysql連接池
一個基本連接池,大致要實現以下功能:
1. 創建連接:連接池啟動后,初始化一定的空閑連接,指定為最少的連接min。當連接池為空,不夠用時,創建新的連接放到池里,但不能超過指定的最大連接max數量。
2. 連接釋放:每次使用完連接,一定要調用釋放方法,把連接放回池中,給其他程序或請求使用。
3. 連接分配:連接池中用pop和push的方式對等入隊和出隊分配與回收。能實現阻塞分配,也就是在池空并且已創建數量大于max,阻塞一定時間等待其他請求的連接釋放,超時則返回null。
4. 連接管理:對連接池中的連接,定時檢活和釋放空閑連接等。
**連接池管理器**
~~~
<?php
/**
* 連接池抽象類
*/
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少連接數
private $max;//最大連接數
private $count;//當前連接數
private $connections;//連接池組
protected $spareTime;//用于空閑連接回收判斷
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time() ,
'db' => $db ,
];
}
return $obj;
}
/**
* 初始化最小數量連接池
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0;$i < $this->min;$i ++) {
$obj = $this->createObject();
$this->count ++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if(!$this->connections->isEmpty()){
return $this->connections->pop($timeOut);
}
// 大量并發請求過多,連接池connections為空
if ($this->count < $this->max) {
// 連接數沒達到最大,新建連接返回
$this->count ++;
$obj = $this->createObject();
return $obj;
}
// timeout為出隊的最大的等待時間
// 如果超過最大等待時間后會返回false,客戶端要判斷一下
// 此處還起到一個限流作用
return $this->connections->pop($timeOut);
}
/*
* 鏈接使用完進行回收
*/
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 處理空閑連接
*/
public function gcSpareObject()
{
//大約2分鐘檢測一次連接
swoole_timer_tick(
120000 , function (){
$list = [];
while (true) {
if (!$this->connections->isEmpty()) {
// 等待的時間要快,免得鏈接被用掉
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
// 超過$this->spareTime的認為是空閑連接,pop掉
if (time() - $last_used_time > $this->spareTime) {
$this->count --;
} else {
// 沒超過就繼續push回去
array_push($list , $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
// keepMin(); 處理完之后就要保證最低連接數
$this->keepMin();
}
);
}
private function keepMin()
{
if ($this->count >= $this->min) {
return $this->count;
} else {
$num = $this->min - $this->count;
}
for ($i = 0;$i < $num;$i ++) {
$obj = $this->createObject();
$this->count ++;
$this->connections->push($obj);
}
return $this->count;
}
}
~~~
**pdo同步客戶端鏈接**
~~~
<?php
require "AbstractPool.php";
class MysqlPoolPdo extends AbstractPool
{
//數據庫配置
protected $dbConfig = [
'host' => 'mysql:host=127.0.0.1:3306;dbname=test' ,
'port' => 3306 ,
'user' => 'root' ,
'password' => '123456' ,
'database' => 'test' ,
'charset' => 'utf8' ,
'timeout' => 2 ,
];
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'] , $this->dbConfig['user'] , $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0' , 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on(
"WorkerStart" , function (){
// 初始化最少數量(min指定)的連接對象,放進類型為Channel的connections對象中。
MysqlPoolPdo::getInstance()->init();
}
);
$httpServer->on(
"request" , function ($request , $response){
$db = null;
// 從channle中pop數據庫鏈接對象出來
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj['db'];
}
if ($db) {
// 此時如果并發了10個請求,server因為配置了1個worker,所以再pop到一個對象返回時,遇到sleep()的查詢,
// 因為用的連接對象是pdo的查詢,是同步阻塞的,所以此時的woker進程只能等待,完成后才能進入下一個請求。
// 因此,池中的其余連接其實是多余的,同步客戶端的請求速度只能和woker的數量有關。
// ab -c 10 -n 10 http://127.0.0.1:9501/
$db->query("select sleep(2)");
$ret = $db->query("select * from tb_game limit 1");
// 使用完鏈接對象就回收
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
}
);
$httpServer->start();
~~~
**協程異步客戶端鏈接**
~~~
<?php
require "AbstractPool.php";
class MysqlPoolCoroutine extends AbstractPool
{
protected $dbConfig = [
'host' => '127.0.0.1' ,
'port' => 3306 ,
'user' => 'root' ,
'password' => '123456' ,
'database' => 'test' ,
'charset' => 'utf8' ,
'timeout' => 10 ,
];
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolCoroutine();
}
return self::$instance;
}
protected function createDb()
{
$db = new Swoole\Coroutine\Mysql();
$db->connect(
$this->dbConfig
);
return $db;
}
}
$httpServer = new swoole_http_server('0.0.0.0' , 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on(
"WorkerStart" , function (){
MysqlPoolCoroutine::getInstance()->init();
}
);
$httpServer->on(
"request" , function ($request , $response){
$db = null;
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj['db'];
}
if ($db) {
// 遇上sleep阻塞時,woker進程不是在等待select的完成,而是切換到另外的協程去處理下一個請求。
$db->query("select sleep(2)");
$ret = $db->query("select * from tb_game limit 1");
// 完成后同樣釋放對象到池中
MysqlPoolCoroutine::getInstance()->free($obj);
$response->end(json_encode($ret));
}
}
);
$httpServer->start();
~~~
可以通過以下語句來觀察兩者之間的差異:
ab -c 10 -n 100 http://127.0.0.1:9501/
# mysql console內執行
SHOW STATUS LIKE 'Threads%';
show processlist;
網站之所以慢絕大多數原因是發生了阻塞。而發生阻塞的原因多半是mysql阻塞。
>[info] 更完整的連接池代碼請參考 easyswoole /vendor/easyswoole/component/src/Pool/AbstractPool.php