<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                Work Queues(工作隊列) 大體結構如下所示: ![](http://www.rabbitmq.com/img/tutorials/python-two.png) 在第一節的教程里,我們創建了一個程序,發送和接收消息,從一個named queue(命名隊列 )。 本節,我們會創建一個 Work Queue(工作隊列),用來分發耗時任務給多個Workers(工人)。 使用Work Queues(別名:Task Queue)是為了避免立即做一個資源密集型任務,而不得不等待它完成。我們可以把這個耗時的任務封裝提取起來作為message,發送給一個queue。一個Worker 后臺進程會獲取task,然后執行他。當有多個Workers 時,他們平分這些task。 ##Preparation 準備 上一節的教程,我們發送已一條包含“Hello World”的消息。這一節我們要發送一個耗時的任務。為了簡單起見,我們使用 `Thread.Sleep()`方法, ###發送端 為了簡單起見,我們修改上一節的`Send.cs`的代碼,從命令行獲取message的參數。重命名文件為“NewTask.cs” ``` var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); ``` 其中 `GetMessage(string[] args)`方法定義如下: ``` private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } ``` ###接收端 接下來修改上衣節的`Receive.cs`文件,我們重命名為“Worker.cs” ``` var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); ``` 然后編譯代碼。 ##Round-robin dispatching 循環調度 使用Task Queue的一個優點就是可以很容易的平均分配任務。如果queue里有堆積過多的任務,我們可以添加更多的Worker就行了。 接下來,我們運行兩個Worker.cs的實例,分別為C1和C2 C1 ``` shell1$ Worker.exe Worker [*] Waiting for messages. To exit press CTRL+C ``` C2 ``` shell2$ Worker.exe Worker [*] Waiting for messages. To exit press CTRL+C ``` 在接下來我們運行 NewTask.exe 來發布任務 ``` shell3$ NewTask.exe First message. shell3$ NewTask.exe Second message.. shell3$ NewTask.exe Third message... shell3$ NewTask.exe Fourth message.... shell3$ NewTask.exe Fifth message..... ``` 我們會看到任務的分配情況 C1 ``` shell1$ Worker.exe [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' ``` C2 ``` shell2$ Worker.exe [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' ``` 默認,RabbitMQ會順序的,平均的把任務發給每個consumer,到最后每個Consumer會得到相同數量的任務。 ##Message acknowledgment 執行一個耗時的任務,你可能會想知道任務的執行情況。是否有Consumer開始執行任務了?是否任務執行到一半死機了? 當前我們上面的代碼,一旦RabbitMQ分發message給Custoerm,它就會立刻從內存刪除。這種情況下,如果你關閉一個Worker,我們就會丟失他正在執行的消息。同樣,我們也會丟失之前分發給他,還沒有來的及執行的消息。 但是我們不想丟失任何 task。如果一個Worker死了,我們想把任務分發給其他的Worker。 為了確保message不丟失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 發送給RabbitMQ的,告訴它,task 已經接受,并處理了,RabbitMQ 可以刪除它了。 如果一個consumer死機了(channel closed,connection closed or Tcp connection lost),沒有返回ack,RabbitMQ就會知道task 沒有處理完,該task就會重新排隊。如果這時候有另外一個Consumer在線,RabbitMQ 就會把它分發給他。 默認Message acknowLedgments 是打開的,之前的例子,我們是顯式的關閉了(設置 noAck=true)。 下面修改代碼,打開Message acknowLedgments。 ``` var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); ``` 運行上面的代碼,如果我們kill 一個Worker,message 不會丟失,他會被分發給其他Worker。 > **Forgotten acknowledgment 遺失acknowledgment** 丟失BasicAck是很常見的錯誤,盡管這個錯很小,但后果很嚴重。當Client quit,Messages 會重新分發,但是RabbitMQ 由于不能釋放掉那些unacked message ,所以會消耗越來越多的內存。 為了 調試這種錯誤, 你可以使用`rabbitmqctl`來打印出 `messages_unacknowledged` 的message信息 > ``` $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done. >``` ##Message durability 持久化 通過上面的ACK配置,當consumer 死亡的時候,task 不會丟失。但是如果RabbitMQ服務停了,task 仍然會丟失。 這里我們就要持久化 task的信息了。 首先,我們確保RabbitMQ不會丟失我們的queue ``` channel.QueueDeclare(queue: "hello", durable: true, //持久化 exclusive: false, autoDelete: false, arguments: null); ``` 盡管我們定義名字叫hello 隊列要持久化,但是仍然不會生效。這是因為我們已經定義了一個沒有持久化的名字叫hello 隊列。RabbitMQ 不允許重新定義(用不同的參數)一個已經存在的隊列,會報錯。因此這里我們應該另外定義一個隊列, 例如 task_queue ``` channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); ``` >NOTE:queue 持久化的修改,producer 和consumer的代碼都要修改。 通過上面的代碼設置我們的queue,即使RabbitMQ重啟也不會丟失。 接下來,我們來持久化message。 ``` var properties = channel.CreateBasicProperties(); properties.Persistent=true; ``` >NOTE:盡管我們設置message持久化了,但是這也不能完全保證message不會丟失。 >這是由于RabbitMQ保存message到硬盤是需要時間的,如果再此期間RabbitMQ服務掛了,message就丟失了。不過對于一般的程序已經足夠了。如果要一個更強壯的方案,你可是使用[publisher confirms](https://www.rabbitmq.com/confirms.html) ##Fair dispatch 公平調度 也許你已經主要到,上面代碼實現的message的調度不是你想要的。例如,假設有兩個Worker,所有的奇數的message都是耗時的操作,而偶數的message都是很簡單的。你會發現一個Worker很空閑,而另一個Woker累死累活的。然而RabbitMQ不知道,還是不停的給他發任務。 這個情況的發生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message進入隊列就分發message。他只是盲目的分發message。 ![](http://www.rabbitmq.com/img/tutorials/prefetch-count.png) 為了解決上面的問題,我們可以使用 `basicQos`方法 設置 `prefetchCount=1`。這個設置會告訴RabbitMQ 每次給Workder只分配一個task,只有當task執行完了,才分發下一個任務。 ``` channel.BasicQos(0, 1, false); ``` >NOTE:注意queue的size > 如果所有的Worker都busy,你的queue會填滿,因此你需要監測queue的情況,添加更多的worker 或者采用其他的策略 ##演示 全部代碼 NewTask.cs ``` using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } ``` Worker.cs ``` using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看