<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                &emsp;&emsp;Node.js 官方提供了[Cluster](https://nodejs.org/dist/latest-v16.x/docs/api/cluster.html)和[Child process](https://nodejs.org/dist/latest-v16.x/docs/api/child_process.html)創建子進程,通過[Worker threads](https://nodejs.org/dist/latest-v16.x/docs/api/worker_threads.html)模塊創建子線程。但前者無法共享內存,通信必須使用 JSON 格式,有一定的局限性和性能問題。后者更輕量,并且可以共享內存,通過傳輸 ArrayBuffer 實例或共享 SharedArrayBuffer 實例來做到這一點,即數據格式沒有太多要求。但是要注意,數據中不能包含函數。 &emsp;&emsp;Worker threads 從 Node V12 開始成為正式標準,其對于執行 CPU 密集型的操作很有用,而對 I/O 密集型工作沒有多大幫助。 Node.js 內置的異步 I/O 操作要比它效率更高。注意,Worker threads 是基于 Node.js 架構的多工作線程,如下圖所示。在每個工作線程中,都會包含 V8 和 libuv,即都包含Event Loop。 :-: ![](https://img.kancloud.cn/e8/36/e8366cc7d9932bcd4fd41edfb891726b_2400x1280.jpeg =800x) ## 一、線程池 &emsp;&emsp;創建、執行、銷毀一個 Worker 的開銷是很大的,所以需要實現一個線程池(Worker Pool),在初始化時創建有限數量的 Worker 并加載單一的 worker.js,主線程和 Worker 可進行進程間通信,當所有任務完成后,這些 Worker 將會被統一銷毀。 &emsp;&emsp;在 Worker 中通過 parentPort.postMessage() 向主線程發送消息,而在主線程中可以通過 worker.on('message') 接收發送過來的消息,worker 是一個 Worker 實例,例如 new Worker(filePath)。 &emsp;&emsp;下面是一個官方示例,isMainThread 可判斷當前是否是主線程,workerData 是傳遞給 Worker 的數據。 ~~~ const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { module.exports = function parseJSAsync(script) { return new Promise((resolve, reject) => { const worker = new Worker(__filename, { workerData: script }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`)); }); }); }; } else { const script = workerData; parentPort.postMessage(script); } ~~~ &emsp;&emsp;下面是一個線程池示例,參考自《[worker\_threads 初體驗](https://blog.skk.moe/post/say-hello-to-nodejs-worker-thread/)》一文,做了微調,具體在此不在贅述,可閱讀原文或注釋。 ~~~ // 獲取當前設備的 CPU 線程數目,作為 numberOfThreads 的默認值。 const { length: cpusLength } = require('os').cpus(); const { Worker } = require('worker_threads'); class WorkerPool { constructor(workerPath, options = {}, numberOfThreads = cpusLength) { if (numberOfThreads < 1) { throw new Error('Number of threads should be greater or equal than 1!'); } this.workerPath = workerPath; this.numberOfThreads = numberOfThreads; // 任務隊列 this._queue = []; // Worker 索引 this._workersById = {}; // Worker 激活狀態索引 this._activeWorkersById = {}; // 創建 Workers for (let i = 0; i < this.numberOfThreads; i++) { const worker = new Worker(workerPath, options); this._workersById[i] = worker; // 將這些 Worker 設置為未激活狀態 this._activeWorkersById[i] = false; } } /** * 檢查空閑的 Worker */ getInactiveWorkerId() { for (let i = 0; i < this.numberOfThreads; i++) { if (!this._activeWorkersById[i]) return i; } return -1; } /** * 調用 Worker 執行,目的是在指定的 Worker 里執行指定的任務 */ runWorker(workerId, taskObj) { const worker = this._workersById[workerId]; // 當任務執行完畢后執行 const doAfterTaskIsFinished = () => { // 去除所有的 Listener,不然一次次添加不同的 Listener 會內存溢出(OOM) worker.removeAllListeners('message'); worker.removeAllListeners('error'); // 將這個 Worker 設為未激活狀態 this._activeWorkersById[workerId] = false; if (this._queue.length) { // 任務隊列非空,使用該 Worker 執行任務隊列中第一個任務 this.runWorker(workerId, this._queue.shift()); } }; // 將這個 Worker 設置為激活狀態 this._activeWorkersById[workerId] = true; // 設置兩個回調,用于 Worker 的監聽器 const messageCallback = result => { taskObj.cb(null, result); doAfterTaskIsFinished(); }; const errorCallback = error => { taskObj.cb(error); doAfterTaskIsFinished(); }; // 為 Worker 添加 'message' 和 'error' 兩個 Listener worker.once('message', messageCallback); worker.once('error', errorCallback); // 將數據傳給 Worker 供其獲取和執行 worker.postMessage(taskObj.data); } /** * 運行線程 */ run(data) { // Promise 是個好東西 return new Promise((resolve, reject) => { // 調用 getInactiveWorkerId() 獲取一個空閑的 Worker const availableWorkerId = this.getInactiveWorkerId(); const taskObj = { data, cb: (error, result) => { // 雖然 Workers 需要使用 Listener 和 Callback,但這不能阻止我們使用 Promise,對吧? // 不,你不能 util.promisify(taskObj) 。人不能,至少不應該。 if (error) reject(error); return resolve(result); } }; if (availableWorkerId === -1) { // 當前沒有空閑的 Workers 了,把任務丟進隊列里,這樣一旦有 Workers 空閑時就會開始執行。 this._queue.push(taskObj); return null; } // 有一個空閑的 Worker,用它執行任務 this.runWorker(availableWorkerId, taskObj); }) } /** * 銷毀 */ destroy(force = false) { for (let i = 0; i < this.numberOfThreads; i++) { if (this._activeWorkersById[i] && !force) { // 通常情況下,不應該在還有 Worker 在執行的時候就銷毀它,這一定是什么地方出了問題,所以還是拋個 Error 比較好 // 不過保留一個 force 參數,總有人用得到的 throw new Error(`The worker ${i} is still runing!`); } // 銷毀這個 Worker this._workersById[i].terminate(); } } } module.exports = WorkerPool; ~~~ ## 二、實踐 &emsp;&emsp;之所以需要多線程,是為了解決一個優化需求。就是有一個接口,里面有很多查詢數據庫(MySQL和MongoDB)的操作,單條語句并不會慢,但累加后整體的響應速度就會變慢,那么就想通過多線程,同時處理一些查詢語句,然后整合結果。 &emsp;&emsp;先對線程池做最簡單的處理,創建 worker.js,接收 userId。 ~~~ const { isMainThread, parentPort } = require('worker_threads'); // 不是主線程時執行 if (!isMainThread) { parentPort.on('message', async ({userId }) => { console.log('postMessage', userId); parentPort.postMessage(userId); }); } ~~~ &emsp;&emsp;然后初始化線程池,將數組中的 userId 傳遞給 Worker,pool.run({ userId: item })。 ~~~ const WorkerPool = require('./workerPool'); const { join } = require('path'); async function workerMain(services) { const workerPath = join(__dirname + '/worker.js'); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); Promise.all([4,12,13,15].map(async item => { await pool.run({ userId: item }); })).then(json => { // 銷毀線程池 pool.destroy(); }); } ~~~ &emsp;&emsp;輸出順序沒有按照數組的順序,并且每次的輸出順序還都是不同的,由此可知,代碼是并發運行的。 ~~~ postMessage 12 postMessage 4 postMessage 15 postMessage 13 ~~~ &emsp;&emsp;那么接下來就引入數據庫查詢的代碼,公司項目基于[sequelize.js](https://sequelize.org/)封裝了增刪改查的邏輯,通過 services 變量可以調用相關的操作。在主線程中,計劃將 services 傳遞到 Worker 中。 ~~~ async function workerMain(services) { // Worker Threads 不能共享實例以及帶函數的對象 const workerPath = join(__dirname + '/worker.js', { workerData: services }); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); // 省略代碼...... } ~~~ &emsp;&emsp;然而報錯了,大致是下面這個意思,無法克隆,因為對象中包含函數,就會引發錯誤。 ~~~ node:internal/worker:349 ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args); could not be cloned. ~~~ &emsp;&emsp;想以通信的方式實現數據庫的并發查詢,目前看來不能完成。 &emsp;&emsp;其實可以在 worker.js 中單獨引入 services, 不過由于我們在腳本文件中采用了 import 語法,因此在執行時會報錯,SyntaxError: Cannot use import statement outside a module。 ~~~ const { isMainThread, parentPort, workerData } = require('worker_threads'); const services = require('../services'); // 不是主線程時執行 if (!isMainThread) { // 省略代碼...... } ~~~ &emsp;&emsp;還有一種解決方案,其成本就比較高,就是單獨再實現一套服務層,也就是說再封裝一層符合Node.js 模塊化語法的數據庫操作集合。 ***** > 原文出處: [博客園-Node.js躬行記](https://www.cnblogs.com/strick/category/1688575.html) [知乎專欄-Node.js躬行記](https://zhuanlan.zhihu.com/pwnode) 已建立一個微信前端交流群,如要進群,請先加微信號freedom20180706或掃描下面的二維碼,請求中需注明“看云加群”,在通過請求后就會把你拉進來。還搜集整理了一套[面試資料](https://github.com/pwstrick/daily),歡迎閱讀。 ![](https://box.kancloud.cn/2e1f8ecf9512ecdd2fcaae8250e7d48a_430x430.jpg =200x200) 推薦一款前端監控腳本:[shin-monitor](https://github.com/pwstrick/shin-monitor),不僅能監控前端的錯誤、通信、打印等行為,還能計算各類性能參數,包括 FMP、LCP、FP 等。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看