<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ## 生產者 - 消費者模式 生產者消費者模式是一個經典的多線程設計模式。 ![](https://img.kancloud.cn/41/43/4143ae884c19ccfe6de85972767bbb8a_1410x782.png) 總結: 1. 生產者線程將任務提交到內存緩沖區,消費者線程從內存緩沖區獲取任務并執行。 2. 通過內存緩沖區,避免了生成者和消費者直接通信,從而將生產者和消費者解耦。 3. 通過內存緩沖區,允許生產者和消費者的性能差異。 在`JDK`中提供的線程池(`ThreadPoolExecutor`)就是典型的生產者消費者模式(其中任務是線程),其中內存緩沖區的實現使用的是`BlockingQueue`阻塞隊列。 ## 生產者 - 消費者模式(無鎖實現) 在`ThreadPoolExecutor`中使用了`BlockingQueue`阻塞隊列來做內存緩沖區,但是由于使用了鎖和阻塞等待來實現線程間的同步,所以新能不高。 而LMAX公司開發了一套無鎖實現的高性能生產者消費者模式的框架,叫做`Disruptor`。 ![](https://img.kancloud.cn/a2/02/a202315ee123d28a2ed43caaccd2019f_1394x788.png) 例子:(生產者生成數據,消費者計算數據平方) ![](https://img.kancloud.cn/21/c0/21c0f822d14c169c53460167f38f5594_1106x798.png) 引入依賴: ~~~ <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency> ~~~ 數據實體: ~~~ public class PCData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } } ~~~ 數據工廠: ~~~ public class PCDataFactory implements EventFactory<PCData> { @Override public PCData newInstance() { return new PCData(); } } ~~~ 生產者: ~~~ import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; public class Producer { private final RingBuffer<PCData> ringBuffer; public Producer(RingBuffer<PCData> ringBuffer){ this.ringBuffer = ringBuffer; } public void pushData(ByteBuffer bb){ // 獲取環上的下一個序列 long sequence = ringBuffer.next(); PCData data = ringBuffer.get(sequence); // 設置數據 data.setValue(bb.getLong(0)); // 發布序列 ringBuffer.publish(sequence); } } ~~~ 消費者: ~~~ import com.lmax.disruptor.WorkHandler; public class Consumer implements WorkHandler<PCData> { @Override public void onEvent(PCData pcData) throws Exception { // 打印平方值 System.out.println(Thread.currentThread().getName() + " -- value="+pcData.getValue() + " -- 平方="+Math.pow(pcData.getValue(),2)); } } ~~~ 客戶端: ~~~ public class Main { public static void main(String[] args) throws InterruptedException { // 大小需要是2的冪 int bufferSize = 1024; Disruptor<PCData> disruptor = new Disruptor<>( new PCDataFactory(), bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, // 選擇合適的策略,提高消費者的響應時間 new BlockingWaitStrategy() // 阻塞等待策略 // new SleepingWaitStrategy() // 休眠等待策略 // new YieldingWaitStrategy() // 謙讓等待策略 // new BusySpinWaitStrategy() // 忙自旋等待策略,死循環 ); // 4個消費者 disruptor.handleEventsWithWorkerPool( new Consumer(), new Consumer(), new Consumer(), new Consumer() ); disruptor.start(); // 生成數據 RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer(); long size = 1000L; // 2個生產者 new Thread(()->{ Producer producer = new Producer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for(long i = 0L;i<size;i++){ bb.putLong(0,i); producer.pushData(bb); System.out.println(Thread.currentThread().getName() + " - 產生數據:"+i); } }).start(); new Thread(()->{ Producer producer = new Producer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for(long i = size;i<2*size;i++){ bb.putLong(0,i); producer.pushData(bb); System.out.println(Thread.currentThread().getName() + " - 產生數據:"+i); } }).start(); } } ~~~ 總結: 1. 選擇合適的策略,提高消費者的響應時間 ``` new BlockingWaitStrategy() // 阻塞等待策略,省CPU new SleepingWaitStrategy() // 休眠等待策略,中等延遲,自旋等待失敗后休眠,不占用太多CPU new YieldingWaitStrategy() // 謙讓等待策略,低延遲,CPU物理核大于線程數 new BusySpinWaitStrategy() // 忙自旋等待策略,死循環,吃掉所有CPU資源 ``` 2. `Disruptor`對`Sequence`使用對齊填充的方式解決CPU緩存偽共享問題。 ## CPU緩存偽共享 看下圖,能知道`CPU緩存偽共享`的問題 ![](https://img.kancloud.cn/63/19/631947b40dfe1946558f5c5378146ee0_1114x676.png) 可以通過將存儲的數據使用填充對齊到緩存行(64字節)大小,使得每個緩存行只存一個數據。 ![](https://img.kancloud.cn/a0/fa/a0fa7022ad434165d8c66d1a0b44f748_1122x684.png) 如下代碼片段是`Disruptor`中`Sequence`繼承的`RhsPadding`類,里面填充了7個`long`類型的值(一個`long`類型64位即8字節,補7個加上自己的一個工8個,合計64字節,剛好占一個緩存行大小) ~~~ class RhsPadding extends Value { protected long p9; protected long p10; protected long p11; protected long p12; protected long p13; protected long p14; protected long p15; RhsPadding() { } } ~~~ ## 參考資料 * 書籍 葛一鳴 * 《Java高并發程序設計》
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看