<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ZMQ三種基本模型 ~~~ 1、Request-Reply 模型(請求與應答模式) 2、Publisher-Subscriber模型(發布與訂閱模式) 3、Parallel Pipeline模型(并行管道模式) ~~~ ## **Request-Reply 模型(請求與應答模式)** 客戶端和服務端都可以是 1:N 的模型。通常把 1 認為是服務端,N 認為是客戶端 。ZMQ可以很好的支持路由功能(實現路由功能的組件叫作 Device),把 1:N 擴展為N:M(只需要加入若干路由節點)。 server.php代碼如下: ~~~ <?php //創建一個新的套接字上下文 $context = new ZMQContext(); //創建一個ZMQ響應套接字 $rep = new ZMQSocket($context, ZMQ::SOCKET_REP); //綁定端口 $rep->bind("tcp://127.0.0.1:6666"); while(true) { //循環處理消息 //獲取消息 $req = $rep->recv(); echo "Received Message: {$req} \r\n"; sleep(1); //向客戶端發送消息 $rep->send('World'); } ~~~ client.php代碼如下: ~~~ <?php //創建一個新的套接字上下文 $context = new ZMQContext(); //創建一個ZMQ請求套接字 $req = new ZMQSocket($context, ZMQ::SOCKET_REQ); //連接到端口 $req->connect("tcp://127.0.0.1:6666"); for($ix = 0; $ix < 5; ++$ix) { //發送請求 $req->send('Hello'); $reply = $req->recv(); echo "Received Reply: {$reply} \r\n"; } ~~~ >[danger]需要注意如下幾點: 1、服務端和客戶端無論誰先啟動,效果是相同的,這點不同于 Socket。 2、在服務端收到信息以前,程序是阻塞的,會一直等待客戶端連接上來。 3、服務端收到信息以后,會send一個"World"給客戶端。 值得注意的是一定是client連接上來以后, send 消息給 Server,然后 Server 再 recv 然后響應 client,這種一問一答式的。 如果 Server 先 send,client 先 recv 是會報錯的。 4、ZMQ通信通信單元是消息,他除了知道 Bytes 的大小,他并不關心的消息格式。 因此,你可以使用任何你覺得好用的數據格式。Xml、Protocol Buffers、Thrift、json等。 5、雖然可以使用 ZMQ 實現 HTTP 協議,但是,這絕不是它所擅長的。 ## **Publisher-Subscriber模型(發布與訂閱模式)** server.php代碼如下: ``` <?php //創建一個新的套接字上下文 $context = new ZMQContext(); //創建一個ZMQ發布套接字 $pub = new ZMQSocket($context, ZMQ::SOCKET_PUB); //綁定端口 $pub->bind("tcp://127.0.0.1:7777"); while(true) { $typeArr = array('A', 'B', 'C', 'D'); $tmp = mt_rand(0, 3); $type = $typeArr[$tmp]; $num = mt_rand(1, 9999); $update = sprintf('%s %04d', $type, $num); echo "Send Message: {$update} \r\n"; sleep(1); //向客戶端發送消息 $pub->send($update); } ``` client.php代碼如下: ``` <?php //創建一個新的套接字上下文 $context = new ZMQContext(); //創建一個ZMQ訂閱套接字 $sub = new ZMQSocket($context, ZMQ::SOCKET_SUB); //連接到端口 $sub->connect("tcp://127.0.0.1:7777"); $filter = 'B'; //設置消息過濾器,只接收類型為'B'的消息 $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); for($ix = 0; $ix < 50; ++$ix) { $update = $sub->recv(); sscanf($update, '%s %d', $type, $num); echo "type: {$type} num: {$num} \r\n"; } ``` 上述代碼表示,Pub端不斷的發送類型在(A,B,C,D)中的隨機數$num,而Sub端通過設置消息過濾,只接收類型為B的消息,接收50次后就退出。 >[danger]注意點如下: 1、與Hello World不同的是,Socket的類型變成SOCKET_PUB和SOCKET_SUB類型。 2、客戶端需要$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); 設置一個過濾值,相當于設定一個訂閱頻道,否則什么信息也收不到。 3、服務器端一直不斷的廣播中,如果中途有 Sub 端退出,并不影響他繼續的廣播, 當 Sub 再連接上來的時候,收到的就是后來發送的新的信息了。 這對比較晚加入的,或者是中途離開的訂閱者,必然會丟失掉一部分信息,這是這個模式的一個問題。 4、但是,如果 Pub 中途離開,所有的 Sub 會阻塞住,等待 Pub 再上線的時候,會繼續接受信息。 ## **Parallel Pipeline模型(并行管道模式)** send.php代碼如下: ``` <?php //創建一個新的套接字上下文 $context = new ZMQContext(); //創建一個ZMQ分發套接字 $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH); //綁定端口 $sed->bind("tcp://127.0.0.1:8881"); //統計從0到1000000累加數 $num = 1000000; //分50步 $step = 50; //每步大小 $size = $num / $step; //開始大小 $start = 0; //結束大小 $end = 0; //將任務分發給worker節點 for($ix = 1; $ix <= $step; ++$ix) { $end = $start + $size; $data = sprintf('%d %d', $start, $end); echo "start: {$start} end: {$end} \r\n"; $sed->send($data); $start = $end; } ``` worker.php代碼如下: ``` <?php //創建一個新的套接字上下文 $context = new ZMQContext(); $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); $rev->connect("tcp://127.0.0.1:8881"); $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sed->connect("tcp://127.0.0.1:8882"); while(true) { //獲取分發過來的數據 $data = $rev->recv(); sscanf($data, '%d %d', $start, $end); //計算累加和 $total = 0; for($ix = $start; $ix < $end; ++$ix) { $total += $ix; } echo "worker start: {$start} end: {$end} total: {$total} \r\n"; //把計算的結果發送給result $sed->send($total); } ``` result.php代碼如下: ``` <?php //創建一個新的套接字上下文 $context = new ZMQContext(); $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); $rev->bind("tcp://127.0.0.1:8882"); $step = 50; $total = 0; for($ix = 1; $ix <= $step; ++$ix) { //接收worker計算后的結果 $result = $rev->recv();   //累加結果 $total += $result; } //輸出最后的計算結果 echo "result: {$total} \r\n"; ``` send.php通過把累加任務分發給50個worker節點計算,然后worker節點計算完成后,把結果發送給result.php進行統一的匯總。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看