# Storm HDFS Integration
Storm組件和 HDFS 文件系統交互.
## Usage
以下示例將pipe(“|”)分隔的文件寫入HDFS路徑hdfs://localhost:54310/foo。 每1000個 tuple 之后,它將同步文件系統,使該數據對其他HDFS客戶端可見。當它們達到5MB大小時,它將旋轉文件。
```
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:54310")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
```
### Packaging a Topology
當打包你的 topology(拓撲)代碼的時候,要使用 插件,不要使用插件.
shade 插件提供了合并 Jar manifest entries 的功能,hadoop client 可以用來做URL scheme 方案.
如果你經歷了類似于下面的錯誤:
```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```
這表明你的 topology jar沒有正確的打包.
如果你使用maven來創建你的topology jar,你應該使用下面 `maven-shade-plugin` 配置來創建你的 topology jar:
```
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
```
### Specifying a Hadoop Version
默認情況下,storm-hdfs使用下面的Hadoop依賴.
```
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
```
如果你使用的Hadoop版本不同,你可以移除storm-hdfs中 Hadoop依賴,并添加你自己的依賴到你的 pom中.
Hadoop客戶端版本不兼容,錯誤如:
```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```
## Customization
### Record Formats(記錄格式化)
記錄格式化可以通過提供的`org.apache.storm.hdfs.format.RecordFormat`接口來控制:
```
public interface RecordFormat extends Serializable {
byte[] format(Tuple tuple);
}
```
提供的`org.apache.storm.hdfs.format.DelimitedRecordFormat`實現可以生成如 CSV 和 制表符分隔 的文件. T
### File Naming
文件名稱可以通過提供的`org.apache.storm.hdfs.format.FileNameFormat`接口來控制:
```
public interface FileNameFormat extends Serializable {
void prepare(Map conf, TopologyContext topologyContext);
String getName(long rotation, long timeStamp);
String getPath();
}
```
提供的 `org.apache.storm.hdfs.format.DefaultFileNameFormat` 創建的文件名稱格式如下:
```
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
```
例如:
```
MyBolt-5-7-1390579837830.txt
```
默認情況下,前綴是空的,擴展標識是".txt".
### Sync Policies
同步策略允許你將 buffered data 緩沖到底層文件系統(從而client可以讀取數據),通過實現`org.apache.storm.hdfs.sync.SyncPolicy` 接口:
```
public interface SyncPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
```
`HdfsBolt` 會為每個要處理的 tuple 調用 `mark()`方法.返回 `true` 會觸發 `HdfsBolt`執行同步/刷新,之后會調用`reset()`方法.
`org.apache.storm.hdfs.sync.CountSyncPolicy`類可以簡單的觸發同步,當一定數量的tuple執行完成后.
### File Rotation Policies
類似于同步策略,文件反轉策略允許你通過 `org.apache.storm.hdfs.rotation.FileRotation` 接口來控制數據文件反轉.
```
public interface FileRotationPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
```
`org.apache.storm.hdfs.rotation.FileSizeRotationPolicy`實現允許數據文件達到指定的文件大小后,觸發文件反轉.
```
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
### File Rotation Actions
HDFS bolt 和 Trident State實現允許你注冊任意數量的`RotationAction`s. `RotationAction`s要做的就是提供一個hook,當文件反轉后執行一些操作。例如,移動一個文件到不同的路徑下,或者重命名.
```
public interface RotationAction extends Serializable {
void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```
Storm-HDFS 包括一個簡單的操作,反轉后移動一個文件:
```
public class MoveFileAction implements RotationAction {
private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
private String destination;
public MoveFileAction withDestination(String destDir){
destination = destDir;
return this;
}
@Override
public void execute(FileSystem fileSystem, Path filePath) throws IOException {
Path destPath = new Path(destination, filePath.getName());
LOG.info("Moving file {} to {}", filePath, destPath);
boolean success = fileSystem.rename(filePath, destPath);
return;
}
}
```
如果你使用 Trident,并且是有序的文件,你可以像下面這樣使用:
```
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
.withFileNameFormat(fileNameFormat)
.withSequenceFormat(new DefaultSequenceFormat("key", "data"))
.withRotationPolicy(rotationPolicy)
.withFsUrl("hdfs://localhost:54310")
.addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```
## Support for HDFS Sequence Files
`org.apache.storm.hdfs.bolt.SequenceFileBolt`類允許你寫入storm data 到連續的HDFS文件中:
```
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withExtension(".seq")
.withPath("/data/");
// create sequence format instance.
DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
SequenceFileBolt bolt = new SequenceFileBolt()
.withFsUrl("hdfs://localhost:54310")
.withFileNameFormat(fileNameFormat)
.withSequenceFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.withCompressionType(SequenceFile.CompressionType.RECORD)
.withCompressionCodec("deflate");
```
`SequenceFileBolt` 需要你提供一個 `org.apache.storm.hdfs.bolt.format.SequenceFormat`,用來映射 tuples到 key/value pairs。
```
public interface SequenceFormat extends Serializable {
Class keyClass();
Class valueClass();
Writable key(Tuple tuple);
Writable value(Tuple tuple);
}
```
## Trident API
storm-hdfs 還包括一個 Trident `state` 實現,用于寫入數據到HDFS,API類似于 bolts.
```
Fields hdfsFields = new Fields("field1", "field2");
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/trident")
.withPrefix("trident")
.withExtension(".txt");
RecordFormat recordFormat = new DelimitedRecordFormat()
.withFields(hdfsFields);
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
HdfsState.Options options = new HdfsState.HdfsFileOptions()
.withFileNameFormat(fileNameFormat)
.withRecordFormat(recordFormat)
.withRotationPolicy(rotationPolicy)
.withFsUrl("hdfs://localhost:54310");
StateFactory factory = new HdfsStateFactory().withOptions(options);
TridentState state = stream
.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
```
要使用序列文件`State`實現,請使用`HdfsState.SequenceFileOptions`:
```
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
.withFileNameFormat(fileNameFormat)
.withSequenceFormat(new DefaultSequenceFormat("key", "data"))
.withRotationPolicy(rotationPolicy)
.withFsUrl("hdfs://localhost:54310")
.addRotationAction(new MoveFileAction().toDestination("/dest2/"));
```
## Working with Secure HDFS
如果您的 topology(拓撲)將與安全的HDFS進行交互,則您的 bolts/states 需要通過NameNode進行身份驗證。我們 目前有2個選項支持:
### Using HDFS delegation tokens
您的管理員可以配置nimbus來代表拓撲提交者用戶自動獲取授權令牌。 nimbus需要從以下配置開始:
nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] hdfs.keytab.file: "/path/to/keytab/on/nimbus" (hdfs 超級管理員可以代理其他用戶.) hdfs.kerberos.principal: "[superuser@EXAMPLE.com](mailto:superuser@EXAMPLE.com)" nimbus.credential.renewers.freq.secs : 82800 (23 小時, hdfs tokens 需要每24個小時更新一次.) topology.hdfs.uri:"hdfs://host:port" (可選的配置, 默認情況下,我們會在core-site.xml 文件中指定 "fs.defaultFS" 屬性)
你的topology 配置應該包括: topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]
如果nimbus沒有上述配置,您需要添加它,然后重新啟動它。確保hadoop配置 文件(core-site.xml和hdfs-site.xml)以及具有所有依賴項的storm-hdfs jar都存在于nimbus的類路徑中。 Nimbus將使用配置文件中指定的 keytab 和主體對 Namenode 進行身份驗證。從那時起每一個 topology 提交,nimbus將模擬拓撲提交者用戶并代表代理令牌 topology 提交者用戶。如果通過將topology.auto-credentials設置為AutoHDFS啟動 topology(拓撲),nimbus將推送 將所有的工作人員的代理令牌用于您的 topology(拓撲),并且hdfs bolt / state將使用namenode進行身份驗證 這些令牌。
由于nimbus模擬topology(拓撲)提交者用戶,您需要確保hdfs.kerberos.principal中指定的用戶 具有代表其他用戶獲取令牌的權限。要實現這一點,您需要遵循配置指導 列在此鏈接上: [http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html)
你可以看這里如何配置安全的HDFS: [http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html).
### Using keytabs on all worker hosts
如果您已將hdfs用戶的 keytab 文件分發給所有潛在的worker ,那么可以使用此方法。你應該指定一個 使用HdfsBolt / State.withconfigKey(“somekey”)方法的hdfs配置密鑰,該密鑰的值映射應具有以下2個屬性:
hdfs.keytab.file: "/path/to/keytab/" hdfs.kerberos.principal: "[user@EXAMPLE.com](mailto:user@EXAMPLE.com)"
在workers 上,bolt/Ttrident-staet code 將使用配置中提供的主體的keytab文件進行認證 Namenode。這種方法很危險,因為您需要確保所有 worker 的keytab文件位于同一位置,您需要 在集群中啟動新主機時記住這一點.
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度