<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                # 使用`ThreadPoolExecutor`和`Semaphore`限制任務提交率 > 原文: [https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/](https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/) 如果您知道在 Web 服務器中,則可以配置到服務器的最大并發連接數。 如果有更多連接超出此限制,則它們必須等待直到釋放或關閉某些其他連接。 此限制可以視為節流。 節流是為輸出速率比輸入速率慢的系統調節輸入速率的能力。 必須停止系統崩潰或資源耗盡。 在與`BlockingQueue`和`ThreadPoolExecutor`相關的上一篇文章中,我們了解了如何創建具有以下能力的`CustomThreadPoolExecutor`: 1)提交到阻塞隊列 的任務,2)一個執行器,從隊列中拾取任務并執行它們,3)已在`ExecuteGate`之后覆蓋了`Execute()`方法以執行一些必要的額外活動,4)附加了一個`RejectedExecutionHandler`,用于處理由于隊列已滿而被拒絕的任務 我們的方法已經足夠好,并且能夠處理大多數實際情況。 現在,我們再添加一個概念,在某些情況下可能會證明是有益的。 這個概念是圍繞隊列中任務提交的限制。 在此示例中,節流將有助于使隊列中的任務數保持在限制范圍內,從而使任何任務都不會被拒絕。 它從本質上也消除了`RejectedExecutionHandler`的必要性。 ## 使用`CustomThreadPoolExecutor`和`RejectedExecutionHandler`的先前解決方案 在此解決方案中,我們有以下類: **`DemoTask.java`** ```java public class DemoTask implements Runnable { private String name = null; public DemoTask(String name) { this.name = name; } public String getName() { return this.name; } @Override public void run(){ try { Thread.sleep(1000); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println("Executing : " + name); } } ``` **`CustomThreadPoolExecutor.java`** ```java import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CustomThreadPoolExecutor extends ThreadPoolExecutor { public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { t.printStackTrace(); } } } ``` **`DemoExecutor.java`** ```java import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor { public static void main(String[] args) { Integer threadCounter = 0; BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50); CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Lets add another time : " + ((DemoTask) r).getName()); executor.execute(r); } }); // Let start all core threads initially executor.prestartAllCoreThreads(); while (true) { threadCounter++; // Adding threads one by one //System.out.println("Adding DemoTask : " + threadCounter); executor.execute(new DemoTask(threadCounter.toString())); if (threadCounter == 1000) break; } } } ``` 如果運行上述程序,則將獲得**輸出**,如下所示: ```java DemoTask Rejected : 71 Executing : 3 Executing : 5 ... ... ``` 將出現多次“`DemoTask Rejected`”。 在下一個解決方案中,我們將使用節流技術,以使任何任務都不會被拒絕。 ## 使用`ThreadPoolExecutor`和`Semaphore`限制任務的提交率 在此解決方案中,我們將創建一個[`Semaphore`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html "Semaphore"),其編號必須等于在任何給定時間點阻塞隊列中的最大任務數。 因此該方法如下所示: 1)在執行任務之前,要求鎖定信號量 2)如果獲取了鎖定,則執行正常。 否則,將重試直到獲得鎖 3)任務完成后; 鎖被釋放到信號量 我們啟用節流的新`BlockingThreadPoolExecutor`如下所示: ```java package threadpoolDemo; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final Semaphore semaphore; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); semaphore = new Semaphore(corePoolSize + 50); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override public void execute(final Runnable task) { boolean acquired = false; do { try { semaphore.acquire(); acquired = true; } catch (final InterruptedException e) { //LOGGER.warn("InterruptedException whilst aquiring semaphore", e); } } while (!acquired); try { super.execute(task); } catch (final RejectedExecutionException e) { System.out.println("Task Rejected"); semaphore.release(); throw e; } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { t.printStackTrace(); } semaphore.release(); } } ``` 現在,如下測試代碼。 ```java package threadpoolDemo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor { public static void main(String[] args) { Integer threadCounter = 0; BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50); BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Lets add another time : " + ((DemoTask) r).getName()); executor.execute(r); } }); // Let start all core threads initially executor.prestartAllCoreThreads(); while (true) { threadCounter++; // Adding threads one by one System.out.println("Adding DemoTask : " + threadCounter); executor.execute(new DemoTask(threadCounter.toString())); if (threadCounter == 1000) break; } } } ``` 當使用`BlockingThreadPoolExecutor`代替`CustomThreadPoolExecutor`運行`DemoExecutor`程序時,您不會看到任何任務被拒絕,并且所有任務都將成功執行。 您可以控制在任何時候通過`Semaphore`構造函數傳遞參數的任務數量。 這就是這篇文章的全部內容。 您應該閱讀有關[**并發**](//howtodoinjava.com/category/java/multi-threading/ "multi-threading")的更多信息,以提高信心。 學習愉快!
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看