<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # Storm 狀態管理 ## 核心 Storm 中的狀態支持 Storm 核心為 Bolt 提供用于保存和重新獲取其操作狀態的抽象. 提供一個基于內存的默認狀態實現,同時還提供了一個使用 Redis 做狀態保持的實現. ## 狀態管理 若 Bolt 需要通過框架來管理和保持其狀態, 應該實現接口 `IStatefulBolt`,或者繼承類 `BaseStatefulBolt`,然后實現方法 `void initState(T state)`. 方法 `initState` 在 Bolt 使用保存的歷史狀態進行初始化期間通過框架執行. 執行時機在 `prepare` 方法之后,在 Bolt 開始處理 Tuple 數據之前. 當前支持的唯一一種 `State` 實現是提供 key-value 映射的 `KeyValueState`. 例如, 一個單詞計數 bolt 可以使用 key-value 狀態抽象實現單詞計數, 步驟如下. 1. 繼承 `BaseStatefulBolt` 類, 添加一個 `KeyValueState` 實例變量, 用于存儲單詞到單詞數量的映射. 2. 在 init 方法中用之前保存的狀態來初始化 Bolt. 這里面含有上次程序運行的時候框架最后一次提交的單詞計數. 3. 在 `execute` 方法中, 更新單詞計數. ``` public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> { private KeyValueState<String, Long> wordCounts; private OutputCollector collector; ... @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void initState(KeyValueState<String, Long> state) { wordCounts = state; } @Override public void execute(Tuple tuple) { String word = tuple.getString(0); Integer count = wordCounts.get(word, 0); count++; wordCounts.put(word, count); collector.emit(tuple, new Values(word, count)); collector.ack(tuple); } ... } ``` 1. 框架周期性的檢查并保存 Bolt 的狀態 (默認每秒一次). 頻率可以通過設置 storm config 的 `topology.state.checkpoint.interval.ms`來自己定義。 2. 對于狀態持久化, 可以設置 storm config 中的 `topology.state.provider` 來使用支持持久化的 state provider. 例如, 若使用基于 Redis 的 key-value 狀態實現, 需要在 storm.yaml 文件中設置 `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider`. provider 實現代碼的 jar 包需要放在 class path 下, 在這個例子中, 需要把 `storm-redis-*.jar` 置于 extlib 目錄下. 3. state provider 的屬性可以通過設置 `topology.state.provider.config` 來進行覆蓋. 對于 Redis state, 是一個具有下列屬性的 JSON 字符串. ``` { "keyClass": "Optional fully qualified class name of the Key type.", "valueClass": "Optional fully qualified class name of the Value type.", "keySerializerClass": "Optional Key serializer implementation class.", "valueSerializerClass": "Optional Value Serializer implementation class.", "jedisPoolConfig": { "host": "localhost", "port": 6379, "timeout": 2000, "database": 0, "password": "xyz" } } ``` ## 檢查點機制 檢查點通過一個內部的 checkpoint spout 來觸發,觸發周期在 `topology.state.checkpoint.interval.ms` 指定. 如果在拓撲中至少有一個 `IStatefulBolt`, topology builder 會自動添加 checkpoint spout. 對于有狀態的拓撲, topology builder 使用 `StatefulBoltExecutor` 包裝 `IStatefulBolt`, 負責在收到 checkpoint tuple 的時候來執行狀態提交. 無狀態的 Bolt 被包裝在 `CheckpointTupleForwarder`, 僅會轉發 checkpoint tuple 以確保其可以貫穿整個拓撲DAG(有向無環圖). checkpoint tuple 在一個名為 `$checkpoint` 的內部 stream 中流動. topology builder 組織 checkpoint spout 源流出的 checkpoint stream 穿過整個拓撲. ``` default default default [spout1] ---------------> [statefulbolt1] ----------> [bolt1] --------------> [statefulbolt2] | ----------> --------------> | ($chpt) ($chpt) | [$checkpointspout] _______| ($chpt) ``` 當到了檢查周期, checkpoint tuples 被 checkpoint spout 發射出來. 一旦接收到 state tuple, Bolt 的狀態就會被保存, 然后 checkpoint tuple 會轉發到下一個組件. 每一個 Bolt 在保存狀態之前, 會在所有的輸入流上等待 checkpoint 到達, 使得狀態表現為一個跨整個拓撲的持續的狀態. 一旦 checkpoint spout 從所有的 Bolt 中接收到ACK消息, 狀態提交就完成了, 事務會被 checkpoint spout 記錄為已提交. checkpoint 當前不會檢查 Spout 的狀態. 目前, 一旦所有的 Bolt 被檢查完畢, 并且一旦 checkpoint tuple 被 ack, Spout 發射的 tuples 也會被 ack. 這也意味著, `topology.state.checkpoint.interval.ms` 要小于 `topology.message.timeout.secs`. 狀態提交的工作方式就像一個具有 `準備` 和 `提交` 階段的三段式提交協議, 以達到跨整個拓撲的狀態的保存操作具有一致性和原子性. ### 恢復 恢復階段會在拓撲首次啟動的時候觸發. 如果前置事務沒有成功裝備好, 會向拓撲中發送一個 `rollback` 消息, Bolt 會丟棄已經就緒的事務. 如果前置事務成功準備好但是未提交, 會向拓撲中發送一個 `commit` 消息讓所有已經就緒的事務可以被提交. 當這些步驟完成后, Bolt 狀態初始化完成. 恢復也會在其中一個 Bolt 未成功確認 checkpoint 消息或者 worker 在這中間掛了的時候觸發. 因此, 當 supervisor 重啟一個 worker, checkpoint 機制會確保 Bolt 使用之前的狀態初始化, 同時檢查操作會從上次離開的點繼續執行. ### 可靠性 Storm 使用 acking 機制在 tuples 處理失敗的時候進行重新發送. 有可能狀態已經提交但是 worker 在確認(ack) tuple 之前掛掉. 在這種情況下重新發送的 tuple 會導致狀態重復更新. 當前, `StatefulBoltExecutor` 在接收到一個流中的 checkpoint tuple 以后繼續從一個流中獲取并處理 tuple, 同時等待 checkpoint 到達其他輸入流以保存狀態. 這也可能導致恢復期間造成重復的狀態更新. 狀態抽象并不能消除重復, 當前僅提供'至少一次'的保障. 為了提供'至少一次'的保障, 有狀態拓撲中的所有 Bolt 都會對 Tuple 進行標記, 同時在處理完成后發射并確認輸入 Tuple. 對于無狀態的 Bolt, 繼承 `BaseBasicBolt` 可以自動管理"標記/確認"操作. 有狀態的 Bolt 標記 Tuple同時在處理完成后發射和確認tuple, 就像上面"狀態管理"一節中的 `WordCountBolt`. ### IStateful bolt 鉤子 IStateful 接口提供鉤子方法用以在有狀態 Bolt 中可以實現一些自定義的動作 ``` /** * This is a hook for the component to perform some actions just before the * framework commits its state. */ void preCommit(long txid); /** * This is a hook for the component to perform some actions just before the * framework prepares its state. */ void prePrepare(long txid); /** * This is a hook for the component to perform some actions just before the * framework rolls back the prepared state. */ void preRollback(); ``` 這個功能是可選的, 并且有狀態 Bolt 未提供任何實現. 提供這個功能是為了可以在狀態抽象的頂層(我們可能想在有狀態 Bolt 的狀態準備好之前做一些其他動作如提交或者回滾的地方)建立其他系統級組件. ## 提供自定義狀態實現 當前唯一支持的 `State` 實現是提供 key-value 的映射的 `KeyValueState`. 自定義狀態實現應當為接口 `org.apache.storm.State` 的方法提供實現. 這些方法是`void prepareCommit(long txid)`, `void commit(long txid)`, `rollback()`. `commit()` 方法是可選的且在 Bolt 管理自己的狀態的時候非常有用. 這些當前僅用于內部系統 Bolt, 例如 CheckpointSpout 在保存自己狀態的時候. `KeyValueState` 的實現也應當實現定義在接口 `org.apache.storm.state.KeyValueState` 中的方法. ### State provider 框架通過對應的 `StateProvider` 來實例化狀態. 一個自定義的狀態應當也提供一個可以加載和返回基于命名空間的狀態的 `StateProvider` 實現. 每一個狀態屬于一個獨有的命名空間. 命名空間通常是每個 Task 唯一的, 因此每個任務可以有自己的狀態. StateProvider 和相應的 State 實現應該位于 Storm 的 class path 下(一般放在 extlib 目錄中).
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看