<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # 前言 今年接觸了一個策略類手游相關的項目,后端本身計劃是使用skynet進行開發的,后來結合項目的時間緊急程度和客戶端開發組討論后決定使用PHP進行快速開發,后期再使用其他語言框架進行拆分業務;綜合考慮最后選用了webman作為主要開發框架。 整體項目分為配置服務、http-api服務、websocket服務三大部分,其中配置管理主要是兼容客戶端生成的配置數據進行導入導出轉換加載,底層使用MySQL進行儲存,多服務間使用Redis進行一級緩存,服務進程間使用了基于APCu的共享緩存,后期我將該共享緩存組件化也貢獻給了社區。 [https://www.workerman.net/plugin/133](https://www.workerman.net/plugin/133) # Redis 在游戲開發界實際上使用Redis的情況還是比較多的,我們使用Redis主要還是為了將一些數據緩存共享給各個服務器實例: ~~~ ┌─────┐ ┌─────┐ | A | ────────────> service <──────────── | B | └─────┘ └─────┘ / | \ / | \ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ | a | | b | | c | ───────> instance <─────── | a | | b | | c | └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ | | | | | | 1|2 1|2 1|2 ────────> process <──────── 1|2 1|2 1|2 3|4 3|4 3|4 3|4 3|4 3|4 ~~~ 如圖所示,我們A/B為區服,每個區服下可能存在abc不同的服務器實例,他們需要共享相同的區服配置;每個區服各自管理自己的數據庫數據區域/數據庫實例;每個區服下的服務器實例對于數據庫數據的要求是強需求,且為變動較為頻繁的數據內容,與web的微服務有區別,所以我們沒有使用類似Nacos或者其他配置中心進行處理,從而用更適配當前場景的Redis作為緩存服務。 同時Redis也可以作為用戶登錄鑒權相關中的一環,也可以為運營相關功能提供一些輔助,比如使用Redis-Stream作為消息隊列,處理一些事件通知等。 # 共享內存 在游戲開發中,許多業務都是在內存中進行的計算處理,而我們上述的模式是多進程模式,進程間通訊是一個比較頻繁出現的點;一開始解決這個問題是粗暴的將一些固定業務固定在對應的進程上執行,盡可能避免進程間的通訊問題,后來隨著業務逐步的擴大,單純限制業務是沒辦法完全實現的,這時候有考慮過使用webman的channel;但實際上channel基于socket涉及系統內核態用戶態的拷貝等問題,同時受網絡影響受限,在一些業務的計算處理上會帶來比較高的延遲,包括Redis也同樣是這樣的問題,我們需要實現數據的零拷貝。 后續我們的目標鎖定在了共享內存上,因為共享內存可以輕易的在進程間進行通訊交換,而且不存在深拷貝和網絡等問題,效率、性能非常的高,整體微秒級別的響應滿足我們的需求;于是我基于PHP的拓展APCu封裝了適合我們業務場景的插件包進行使用。 # webman-shared-cache 我們的基礎應用實現了定時器來從MySQL數據庫讀取配置信息,定時器的處理器也在讀取數據刷入Redis的同時觸發共享內存的更新事件,上層業務通過更新事件的回調出發會將Redis的數據刷入共享內存中,以便當前區服實例的各個進程能夠使用。 我們使用緩存的場景很多都是MAP數據,所以我在實現插件的時候特別實現了類似Redis-Hash相關的功能: * HSet/HGet/HDel/HKeys/HExists 由于我們需要一些自增自減的運算,所以也實現了以下功能點: * HIncr/HDecr,支持浮點運算 由于APCu的特性所以儲存的數據也是支持儲存對象數據的; # webman-shared-cache為何使用鎖? 之前我有和社區的同學們聊過,他們不是很理解為什么我在實現插件的時候自己使用了鎖,這是因為APCu本身的自行實現了對它自身函數的原子性操作,但我們使用它的時候是在多進程的環境下,每一個進程內存在多次APCu的操作,為了業務的原子性,我們希望這多次的操作要在一個原子性內完成,所以需要一個鎖來進行隔離,以免在多進程的環境下被其他進程的操作污染,整體是類似MySQl的事務的: ~~~php protected static function _HIncr(string $key, string|int $hashKey, int|float $hashValue = 1): bool|int|float { $func = __FUNCTION__; $result = false; $params = func_get_args(); self::_Atomic($key, function () use ( $key, $hashKey, $hashValue, $func, $params, &$result ) { $hash = self::_Get($key, []); if (is_numeric($v = ($hash[$hashKey] ?? 0))) { $hash[$hashKey] = $result = $v + $hashValue; self::_Set($key, $hash); } return [ 'timestamp' => microtime(true), 'method' => $func, 'params' => $params, 'result' => null ]; }, true); return $result; } ~~~ 比如上述代碼,就是一個Hash key的自增操作,我們需要在讀取Hash后在寫入,讀取和寫入應為一體的; 原子性執行函數Atomic的實現如下: ~~~php /** * 原子操作 * - 無法對鎖本身進行原子性操作 * - 只保證handler是否被原子性觸發,對其邏輯是否拋出異常不負責 * - handler盡可能避免超長阻塞 * - lockKey會被自動設置特殊前綴#lock#,可以通過Cache::LockInfo進行查詢 * * @param string $lockKey * @param Closure $handler * @param bool $blocking * @return bool */ protected static function _Atomic(string $lockKey, Closure $handler, bool $blocking = false): bool { $func = __FUNCTION__; $result = false; if ($blocking) { $startTime = time(); while ($blocking) { // 阻塞保險 if (time() >= $startTime + self::$fuse) {return false;} // 創建鎖 apcu_entry($lock = self::GetLockKey($lockKey), function () use ( $lockKey, $handler, $func, &$result, &$blocking ) { $res = call_user_func($handler); $result = true; $blocking = false; return [ 'timestamp' => microtime(true), 'method' => $func, 'params' => [$lockKey, '\Closure'], 'result' => $res ]; }); } } else { // 創建鎖 apcu_entry($lock = self::GetLockKey($lockKey), function () use ( $lockKey, $handler, $func, &$result ) { $res = call_user_func($handler); $result = true; return [ 'timestamp' => microtime(true), 'method' => $func, 'params' => [$lockKey, '\Closure'], 'result' => $res ]; }); } if ($result) { apcu_delete($lock); } return $result; } ~~~ 當使用阻塞模式的時候,我們會在當前進程內使用一個while循環來進行阻塞搶占,為了不將當前進程阻塞死,我們還加入了一個保險,由`self::$fuse`提供; ## 注意 這里在實踐過程中需要注意的是,Atomic在傳入回調函數時切勿再使用匿名函數作為參數值或者是通過use傳入一個匿名函數,如: ~~~ $fuc = function() { // do something } Cache::Atomic('test', function () use ($fuc) { // do anything }) ~~~ APCu底層會對函數參數值或引用參數進行序列化儲存,但匿名函數不可以被序列化,所以會拋出一個異常;但你可以通過當前對象的屬性值或者靜態屬性來保存一個匿名函數,然后在Atomic的回調內調用使用。 # 0.4.x版本 由于目前我使用Webman基于SQLite和共享內存在自行實現一個具備RAFT的輕調度服務插件和服務注冊與發現插件,所以特此為其完善增加了Channel特性; Channel可以輔助實現類似Redis-List、Redis-stream、Redis-Pub/Sub的功能。 ## Channel Channel是個特殊的數據格式,他的格式是固定如下的: ~~~ [ '--default--' => [ 'futureId' => null, 'value' => [] ], workerId_1 => [ 'futureId' => 1, 'value' => [] ], workerId_2 => [ 'futureId' => 1, 'value' => [] ], ...... ] ~~~ 它在共享內存中的鍵默認以**#Channel#**開頭。 * **\--default--**是默認儲存空間,**workerId\_1/workerId\_2**等是子通道儲存空間,命名是由用戶代碼傳入的,這里**建議使用workerman自帶的workerId**即可。 * 默認儲存空間和子通道儲存空間是互斥的,也就是說當存在子通道儲存空間時,是不存在--default--的,反之亦然;子通道儲存空間是當當前通道存在監聽器時生成的,而在監聽器產生前,消息會暫存在--default--空間,當監聽器創建時,--default--的數據value會被同步到子通道儲存空間內,**加入value的隊頭**。 * 每一個子通道儲存空間的value都是拷貝的,存在相同的數據,各自監聽器監聽各自的子通道儲存空間;消息的發布支持向所有子通道發布,也可以指定子通道進行發布。 * 監聽器的底層使用了workerman的定時器,區別與workerman的timer,在event驅動下定時器的間隔是0,也就是一個future,而其他的事件驅動是0.001s為間隔。 ## 實現一個List 由于監聽器創建消費是基于workerId的,我們可以通過不同進程創建相同的workerId的監聽器來對同一個子通道進行監聽: 1. A進程使用list作為workerId: ~~~ Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) { // TODO 你的業務邏輯 }); ~~~ 2. B進程也同樣創建list的workerId監聽器: ~~~ Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) { // TODO 你的業務邏輯 }); ~~~ 3. 此時Channel test的數據如下: ~~~ [ 'list' => [ 'futureId' => 1, 'value' => [] ], ...... ] ~~~ **注意:共享內存中儲存的futureId為最后一個監聽器創建的futureId;當當前進程需要對監聽器進行移除時,請勿使用該數據,對應進程內可以通過`Cache::ChCreateListener()`的返回值獲取到當前進程創建的futureId用于移除監聽器,不使用共享內存中儲存的futureId即可** 4. 這時任意進程通過`Cache::ChPublish('test', '這是一個測試消息', true);`發送消息,或者指定workerId`Cache::ChPublish('test', '這是一個測試消息', true, 'list');`。 ## 實現一個Pub/Sub 1. A進程使用workerman的workerId作為workerId: ~~~ Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) { // TODO 你的業務邏輯 }); ~~~ 2. B進程使用workerman的workerId作為workerId: ~~~ Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) { // TODO 你的業務邏輯 }); ~~~ 3. 此時Channel test的數據可能如下: ~~~ [ 1 => [ 'futureId' => 1, 'value' => [] ], 2 => [ 'futureId' => 1, 'value' => [] ] ] ~~~ 4. 這時,任意進程通過`Cache::ChPublish('test', '這是一個測試消息', false);`發送消息即可。 **注:發送消息第三個參數使用false時,如發送時還未創建監聽器,消息則不會儲存至Channel,即監聽后才可存在消息** ## 實現類似Redis-stream 與Pub/Sub相同,只不過發布消息使用`Cache::ChPublish('test', '這是一個測試消息', true);`, 當發布消息指定workerId時,可以實現類似Redis-Stream Group的功能。 **注:這里更復雜的功能可能需要對workerId進行變通,不能簡單使用workerman自帶的workerId,只需要自行規劃好即可**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看