# Storm HDFS Integration

Storm components for interacting with the HDFS file system.

## Usage

The following example writes pipe ("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every 1,000 tuples it syncs the filesystem, making the data visible to other HDFS clients. It rotates files when they reach 5 MB in size.

```
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```

### Packaging a Topology

When packaging your topology code, use the maven-shade-plugin, not the maven-assembly-plugin. The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client relies on for URL scheme resolution.

If you experience errors such as the following:

```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```

it is an indication that your topology jar was not packaged properly.

If you use maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to create it:

```
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```

### Specifying a Hadoop Version

By default, storm-hdfs uses the following Hadoop dependencies:

```
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

If you are using a different version of Hadoop, you can exclude the Hadoop dependencies from storm-hdfs and add your own dependencies to your pom.

Incompatible Hadoop client versions typically show up as errors such as:

```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```

## Customization

### Record Formats

Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat` interface:

```
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```

The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` implementation can produce formats such as CSV and tab-delimited files.

### File Naming

File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat` interface:

```
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}
```

The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` creates file names with the following format:

```
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
```

For example:

```
MyBolt-5-7-1390579837830.txt
```

By default, the prefix is empty and the extension is ".txt".
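Both can be overridden via the builder methods used in the examples in this document. A minimal sketch, assuming the `withPath`/`withPrefix`/`withExtension` setters shown in the usage and Trident examples; the values are illustrative only:

```
// customize generated file names; path, prefix, and extension values here are just examples
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/logs/")          // HDFS directory to write into
        .withPrefix("events-")       // prepended to each generated file name
        .withExtension(".csv");      // replaces the default ".txt" extension
```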
### Sync Policies

Sync policies allow you to control when buffered data is flushed to the underlying filesystem (making it available to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:

```
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```

The `HdfsBolt` calls the `mark()` method for every tuple it processes. Returning `true` triggers the `HdfsBolt` to perform a sync/flush, after which it calls the `reset()` method.

The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after a set number of tuples have been processed.

### File Rotation Policies

Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an implementation of the `org.apache.storm.hdfs.rotation.FileRotation` interface:

```
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```

The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation triggers file rotation when data files reach a specified size:

```
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```

### File Rotation Actions

Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s. A `RotationAction` provides a hook for performing some action right after a file is rotated, for example moving the file to a different location or renaming it.

```
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```

Storm-HDFS includes a simple action that moves a file after rotation:

```
public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        return;
    }
}
```

If you are using Trident and sequence files, you can do something like this:

```
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```

## Support for HDFS Sequence Files

The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:

```
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withExtension(".seq")
        .withPath("/data/");

// create sequence format instance.
DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

SequenceFileBolt bolt = new SequenceFileBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .withCompressionType(SequenceFile.CompressionType.RECORD)
        .withCompressionCodec("deflate");
```

The `SequenceFileBolt` requires that you provide an `org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to key/value pairs:

```
public interface SequenceFormat extends Serializable {
    Class keyClass();
    Class valueClass();

    Writable key(Tuple tuple);
    Writable value(Tuple tuple);
}
```
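As a rough illustration, a custom `SequenceFormat` might map one tuple field to a `LongWritable` key and another to a `Text` value. This is a minimal sketch, not part of storm-hdfs; the class name and field names ("timestamp", "sentence") are assumptions for the example:

```
// Hypothetical SequenceFormat: "timestamp" field becomes the key, "sentence" the value.
public class TimestampedTextFormat implements SequenceFormat {
    @Override
    public Class keyClass() {
        return LongWritable.class;
    }

    @Override
    public Class valueClass() {
        return Text.class;
    }

    @Override
    public Writable key(Tuple tuple) {
        // assumes the tuple has a long "timestamp" field
        return new LongWritable(tuple.getLongByField("timestamp"));
    }

    @Override
    public Writable value(Tuple tuple) {
        // assumes the tuple has a string "sentence" field
        return new Text(tuple.getStringByField("sentence"));
    }
}
```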
## Trident API

storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors that of the bolts:

```
Fields hdfsFields = new Fields("field1", "field2");

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/trident")
        .withPrefix("trident")
        .withExtension(".txt");

RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310");

StateFactory factory = new HdfsStateFactory().withOptions(options);

TridentState state = stream
        .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
```

To use the sequence file `State` implementation, use `HdfsState.SequenceFileOptions`:

```
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().toDestination("/dest2/"));
```

## Working with Secure HDFS

If your topology is going to interact with secure HDFS, your bolts/states need to authenticate with the NameNode. There are currently two supported options:

### Using HDFS delegation tokens

Your administrator can configure nimbus to automatically obtain delegation tokens on behalf of the topology submitter user. Nimbus needs to be started with the following configuration:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
hdfs.keytab.file: "/path/to/keytab/on/nimbus" (must be an hdfs superuser that can impersonate other users)
hdfs.kerberos.principal: "superuser@EXAMPLE.com"
nimbus.credential.renewers.freq.secs : 82800 (23 hours; hdfs tokens need to be renewed every 24 hours)
topology.hdfs.uri: "hdfs://host:port" (optional; by default the "fs.defaultFS" property from core-site.xml is used)

Your topology configuration should include:

topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]

If nimbus does not already have the above configuration, you need to add it and restart nimbus. Make sure the hadoop configuration files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in nimbus's classpath.

Nimbus will use the keytab and principal specified in the configuration to authenticate with the NameNode. From then on, for every topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on that user's behalf. If the topology is started with topology.auto-credentials set to AutoHDFS, nimbus will push the delegation tokens to all the workers for your topology, and the hdfs bolt/state will use those tokens to authenticate with the NameNode.

Since nimbus impersonates the topology submitter user, you need to make sure the user specified in hdfs.kerberos.principal has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions listed here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html

You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html

### Using keytabs on all worker hosts

If you have distributed the keytab files for the hdfs user to all potential worker hosts, you can use this method. Specify an hdfs config key using the HdfsBolt/State withConfigKey("somekey") method; the value map for that key should have the following two properties:

hdfs.keytab.file: "/path/to/keytab/"
hdfs.kerberos.principal: "user@EXAMPLE.com"

On the worker hosts, the bolt/trident-state code will use the keytab file and the principal provided in the config to authenticate with the NameNode. This approach is risky because you need to ensure the keytab file is at the same location on every worker, and you need to remember this whenever you bring up new hosts in the cluster.
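For illustration, a minimal sketch of wiring these properties into a bolt via `withConfigKey`. The config key name "hdfs.credentials" and the fs URL are arbitrary examples, not defaults; the file name format, record format, and policy objects are assumed to be built as shown earlier:

```
// Hypothetical example: key name and values below are placeholders, not storm-hdfs defaults.
Map<String, Object> hdfsCredentials = new HashMap<>();
hdfsCredentials.put("hdfs.keytab.file", "/path/to/keytab/");
hdfsCredentials.put("hdfs.kerberos.principal", "user@EXAMPLE.com");

Config conf = new Config();
conf.put("hdfs.credentials", hdfsCredentials);

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://namenode:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .withConfigKey("hdfs.credentials"); // tells the bolt where to find the keytab settings

// Submit the topology with the same conf so the bolt can read the credentials map, e.g.:
// StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
```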