<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                # Mysql 驅動 ## 數據庫表結構 *Driver/queue.sql* ``` CREATE TABLE `queues` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `tube` varchar(30) NOT NULL DEFAULT 'default', `status` enum('ready','reserved') DEFAULT 'ready', `job_data` text NOT NULL, `attempts` int(11) NOT NULL DEFAULT '0', `sort` int(10) NOT NULL DEFAULT '100', `reserved_at` int(11) DEFAULT NULL, `available_at` int(11) DEFAULT NULL, `created_at` int(11) DEFAULT NULL, PRIMARY KEY (`id`), KEY `queues_index` (`tube`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ``` > 字段意義可以參考《實現框架》一節中的 Job.php 注釋 > 為tube加上索引,可以提升根據 tube 接收 job 的效率 ## 驅動邏輯 *Driver/MysqlDriver.php* ``` <?php // +---------------------------------------------------------------------- // | MysqlDriver.php // +---------------------------------------------------------------------- // | Description: mysql驅動 // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:17 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐貍<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; class MysqlDriver implements QueueI { private $conn; // 數據庫連接 private $config; // 配置 private $table; // 表 private $select_suffix; // 查詢的前綴 private $delete_suffix; // 刪除的前綴 private $update_suffix; // 更新的前綴 private $insert_suffix; // 插入的前綴 public function __construct($options = []) { $this->config = $options; $this->conn = new \PDO( $this->config['dsn'], $this->config['username'], $this->config['password'] ); $field_string = Job::$field_string; $this->table = $this->config['table']; $this->select_suffix = "SELECT {$field_string} FROM {$this->table}"; $this->delete_suffix = "DELETE FROM {$this->table}"; $this->update_suffix = "UPDATE {$this->table}"; $this->insert_suffix = "INSERT INTO {$this->table}"; } public function tubes(): array { $sql = "SELECT `tube` FROM {$this->table} GROUP BY `tube`"; $res = $this->conn->query($sql); if (!$res) { throw new \PDOException('查詢錯誤:' . $sql . '-錯誤提示:' . json_encode($statement->errorInfo())); } return $res->fetchAll(\PDO::FETCH_ASSOC); } public function delete(Job $job): bool { if (!$job->id) { throw new \Exception('job id 不能為空'); } $sql = "{$this->delete_suffix} WHERE id = :id"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':id' => $job->id]); return $res; } public function jobs(string $tube): array { $sql = "{$this->select_suffix} WHERE tube = :tube"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':tube' => $tube]); if (!$res) { throw new \PDOException('查詢錯誤:' . $sql . '-錯誤提示:' . json_encode($statement->errorInfo())); } return Job::arr2job($statement->fetchAll(\PDO::FETCH_ASSOC)); } public function put(Job $job): Job { // 組裝sql $sql = "{$this->insert_suffix}"; $field = ''; $prepare = ''; $value = []; foreach (Job::$field as $v) { if ($job->$v) { $field .= "{$v},"; $prepare .= ":{$v},"; $value[":{$v}"] = $job->$v; } } $field = '(' . trim($field, ',') . ')'; $prepare = '(' . trim($prepare, ',') . ')'; $sql = "{$sql} {$field} VALUES {$prepare}"; // 執行sql $statement = $this->conn->prepare($sql); $res = $statement->execute($value); // 結果 if (!$res) { throw new \PDOException("插入錯誤:" . $sql . '-錯誤提示:' . json_encode($statement->errorInfo())); } $job->id = $this->conn->lastInsertId(); return $job; } public function reserve(string $tube): Job { $time = time(); $over_time = $time - $this->config['ttr']; $sql = "{$this->select_suffix} WHERE (status = 'ready' OR (status = 'reserved' AND reserved_at <= {$over_time})) AND available_at <= {$time} AND tube = :tube ORDER BY sort limit 1"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':tube' => $tube]); if (!$res) { throw new \PDOException('查詢錯誤:', $sql); } if ($data = $statement->fetch()) { $job = new Job($data); $attempts = $job->attempts + 1; $time = time(); $sql = "{$this->update_suffix} SET status='reserved',attempts = {$attempts},reserved_at = {$time} WHERE id = {$job->id}"; $rows = $this->conn->exec($sql); if ($rows <= 0) { throw new \PDOException('更新出錯:' . $sql . '-錯誤提示:' . json_encode($statement->errorInfo())); } return $job; } return new Job(); } } ``` > 關于mysql的實現,主要觀察其實現接口每個方法的sql與流程,特別是reserve方法,當接收成功之后,還需更新該條消息的狀態。 # Redis 驅動 在這里,我們使用 redis 的 list 和 sorted-set 來實現數據的存儲,因為它本身具備順序和去重的特性。 > redis 服務的搭建與操作,此處不做介紹,這里主要模擬實現消息隊列 另外,你的 PHP 環境還需開啟 Redis 擴展。 接下來,是驅動的實現。 *Driver/RedisDriver.php* ``` <?php // +---------------------------------------------------------------------- // | RedisDriver.php // +---------------------------------------------------------------------- // | Description: redis驅動 // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:17 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐貍<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; class RedisDriver implements QueueI { private $conn; private $config; private $tubes_key; public function __construct($options = []) { $this->conn = new \Redis(); $this->conn->connect($options['ip'], $options['port']); if (isset($options['password'])) { $this->conn->auth($options['password']); } $this->tubes_key = $options['tubes']; } public function tubes(): array { // 使用 sorted-set 存儲當前擁有的隊列,比如你 default、test、sms 隊列 return $this->conn->zRange($this->tubes_key, 0, -1); } public function jobs(string $tube): array { return Job::arr2job($this->conn->lRange($tube, 0, -1)); } public function put(Job $job): Job { // 維護 tube 集合,可實現不重復 $this->conn->zAdd($this->tubes_key, 1, $job->tube); // 用 list 存儲隊列內容,返回的隊列長度,就是這個 job 在 list 中的下標 if ($id = $this->conn->lPush($job->tube, json_encode($job))) { $job->id = $id; } else { throw new \RedisException('插入失敗'); } return $job; } public function delete(Job $job): bool { // 在 redis 的 list 中不可使用 lRem 來刪除具體項,具體原因,在后面測試一節描述 return true; } public function reserve(string $tube): Job { // redis 的rPop在接收時就會將 job 從 list 中刪除,所以,沒有 reserve 狀態 if ($data = $this->conn->rPop($tube)) { $job = json_decode($data, true); } return new Job($job ?? []); } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看