<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 流程圖 ![](https://box.kancloud.cn/663ab23ca3b63ea2e6ffcf0aaafd9a47_1107x681.png) ![](https://box.kancloud.cn/4b901fc8e8db4d176704f18826a988ba_700x882.png) # 啟動流程分析 ------------程序員client------------------ 1. 客戶端運行storm nimbus時,會調用storm的python腳本,該腳本中為每個命令編寫一個方法,每個方法都可以生成一條相應的java命令。 命令格式如下:`java -server xxxx.ClassName -args` ~~~ nimbus---> Running: /export/servers/jdk/bin/java -server backtype.storm.daemon.nimbus supervisor---> Running: /export/servers/jdk/bin/java -server backtype.storm.daemon.supervisor ~~~ --------------nimbus--------------------- 2. nibums啟動之后,接受客戶端提交任務 命令格式:`storm jar xxx.jar xxx驅動類 參數` ~~~ Running: /export/servers/jdk/bin/java -client -Dstorm.jar=/export/servers/storm/examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount-28 ~~~ 該命令會執行 storm-starter-topologies-0.9.6.jar 中的storm-starter-topologies-0.9.6.jar的main方法,main方法中會執行以下代碼 ~~~ StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology()); ~~~ topologyBuilder.createTopology(),會將編寫的spout對象和bolt對象進行序列化。 會將用戶的jar上傳到 nimbus物理節點的 /export/data/storm/workdir/nimbus/inbox目錄下。并且改名,改名的規則是添加了一個UUID字符串。 在nimbus物理節點的 /export/data/storm/workdir/nimbus/stormdist目錄下。有當前正在運行的topology的jar包和配置文件,序列化對象文件。 3. nimbus接受到任務之后,會將任務進行分配,分配會產生一個assignment對象,該對象會保存到zk中,目錄是/storm/assignments ,該目錄只保存正在運行的topology任務。 --------supervisor------------------ 4. supervisor通過watch機制,感知到nimbus在zk上的任務分配信息,從zk上拉取任務信息,分辨出屬于自己任務。 ~~~ ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900] ~~~ 5. supervisor 根據自己的任務信息,啟動自己的worker,并分配一個端口 ~~~ '/export/servers/jdk/bin/java' '-server' '-Xmx768m' export/data/storm/workdir/supervisor/stormdist/wordcount1-3-1461683066/stormjar.jar' 'backtype.storm.daemon.worker' 'wordcount1-3-1461683066' 'a69bb8fc-e08e-4d55-b51f-e539b066f90b' '6701' '9fac2805-7d2b-4e40-aabc-1c85c9856d64' ~~~ ---------worker---------------------- 6. worker啟動之后,連接zk,拉取任務 ~~~ ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900] ~~~ 假設任務信息: 1--->spout---type:spout 2--->bolt ---type:bolt 3--->acker---type:bolt 得到對象有幾種方式? new ClassName 創建對象、 class.forName 反射對象、 clone 克隆對象、 序列化反序列化 worker通過反序列化,得到程序員自己定義的spout和bolt對象。 7. worker根據任務類型,分別執行spout任務或者bolt任務。 spout的生命周期是:open、nextTuple、outPutFiled bolt的生命周期是:prepare、execute(tuple)、outPutFiled # 代碼流程 jstorm supervisor如何啟動worker,worker如何啟動task 1. 下載Jstorm源碼,在源碼包下找到 daemon包,在這個包下有三個子包,分別是nimbus,supervisor,worker。 2. 通過架構圖,我們已知nimbus分配任務,并將任務信息寫入到zk上,supervisor讀取zk上的任務后啟動自己的worker。所以我們分析supervisor如何啟動worker,worker如何啟動task。 3. supervisor如何啟動worker。打開 com.alibaba.jstorm.daemon.supervisor.Supervisor 發現supervisor有幾個方法,方法中有個mkSupervisor方法。 4. 進去Supervisor中的mkSupervisor方法,在第144行有以下的代碼,該代碼創建了SyncSupervisorEvent 對象。 ~~~ SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb); ~~~ 5. SyncSupervisorEvent對象實現了RunnableCallback接口,該接口有個run方法會被定時執行。在run方法的191行,有代碼如下,主要是要supervisor獲取到任務信息,要開始準備啟動worker了。 ~~~ syncProcesses.run(zkAssignment, downloadFailedTopologyIds); ~~~ 6. syncProcesses是com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent的引用變量,該類中有個自定義的run方法中有段代碼如下,調用的startNewWorkers方法 ~~~ startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); ~~~ 7. SyncProcessEvent的startNewWorkers方法有代碼片段如下,主要是根據集群模式啟動不同模式下的worker。我們跟蹤分布式集群模式下的worker啟動。 ~~~ for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) { if (clusterMode.equals(“distributed”)) { launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment); } else if (clusterMode.equals(“local”)) { launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, workerThreadPids); } } ~~~ 8. 在分布式模式下worker啟動最終會調用一個類似于java -server xxx.worker 啟動worker。由于第7步中,有個for循環,該for循環會迭代出屬于當前supervisor的所有worker任務并啟動。 ~~~ JStormUtils.launchProcess(cmd, environment, true); ~~~ 9. java -server xxx.worker,命令執行之后,會執行Worker的mian方法。worker的main方法有代碼如下,其實調用了worker自己內部的靜態方法,叫做mk_worker方法。 ~~~ WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path); sd.join(); ~~~ 10. mk_worker靜態方法,會執行以下代碼,創建一個worker的實例,并立即執行execute方法。 ~~~ Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path); return w.execute(); ~~~ 11. execute方法會執行以下代碼創建一個RefreshConnections 的實例。 ~~~ RefreshConnections refreshConn = makeRefreshConnections(); ~~~ 12. makeRefreshConnections 方法會執行以下代碼創建一個RefreshConnections 實例。 ~~~ RefreshConnections refresh_connections = new RefreshConnections(workerData); ~~~ 13. RefreshConnections 是繼承了 RunnableCallback,該實例的會有一個run方法會被定時執行。run方法中有以下代碼,其中createTasks(addedTasks)方法用來創建Task任務。 ~~~ shutdownTasks(removedTasks); createTasks(addedTasks); updateTasks(updatedTasks); ~~~ 14. createTasks方法有代碼如下,循環啟動屬于該worker的Task任務,啟動Task任務主要調用Task.mk_task(workerData, taskId); ~~~ for (Integer taskId : tasks) { try { TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId); workerData.addShutdownTask(shutdown); } catch (Exception e) { LOG.error(“Failed to create task-” + taskId, e); throw new RuntimeException(e); } } ~~~ 15. Task.mk_task(workerData, taskId)方法實現如下,創建一個Task對象并立即調用execute方法。 ~~~ Task t = new Task(workerData, taskId); return t.execute(); ~~~ 16. execute方法實現如下,用來初始化一個Executor,我們知道在默認情況下一個task等于一個executor。 ~~~ RunnableCallback baseExecutor = prepareExecutor(); ~~~ 17. 進入prepareExecutor()方法,代碼如下,發現代碼調用了mkExecutor方法。 ~~~ final BaseExecutors baseExecutor = mkExecutor(); ~~~ 18. mkExecutor方法,代碼如下,如果當前taskObj是Bolt就創建Bolt的executor,如果當前taskObj是Spout就創建相應的Spout executor。 ~~~ public BaseExecutors mkExecutor() { BaseExecutors baseExecutor = null; if (taskObj instanceof IBolt) { baseExecutor = new BoltExecutors(this); } else if (taskObj instanceof ISpout) { if (isSingleThread(stormConf) == true) { baseExecutor = new SingleThreadSpoutExecutors(this); } else { baseExecutor = new MultipleThreadSpoutExecutors(this); } } return baseExecutor; } ~~~ 19. 創建完了executor,現在有兩條線,分別是bolt executor和spout executor。以bolt executor 為例,這個executor會實現Disruptor的EventHandler接口。 接口onevent方法需要實現,實現代碼中會調用processTupleEvent()方法。下面節選onevent中的部分代碼。 ~~~ if (event instanceof Tuple) { processControlEvent(); processTupleEvent((Tuple) event); } else if (event instanceof BatchTuple) { for (Tuple tuple : ((BatchTuple) event).getTuples()) { processControlEvent(); processTupleEvent((Tuple) tuple); } } ~~~ 20. 進入processTupleEvent方法,發現有代碼如下,其實最終是調用了bolt.execute()方法。 ~~~ private void processTupleEvent(Tuple tuple) { try { if (xxx) { backpressureTrigger.handle(tuple); } else { bolt.execute(tuple); } } catch (Throwable e) { error = e; LOG.error(“bolt execute error “, e); report_error.report(e); } } ~~~ # 序列化 當topology發布的時候,所有的bolt和spout組件首先會進行序列化,然后通過網絡發送到集群中. 如果spout或者bolt在序列化之前(比如說在構造函數中生成)實例化了任何無法序列化的實例變量,在進行序列化時會拋出NotSerializableException異常,topology就會部署失敗
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看