# Beam 解釋器
原文鏈接 : [http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html](http://zeppelin.apache.org/docs/0.7.2/interpreter/beam.html)
譯文鏈接 : [http://www.apache.wiki/pages/viewpage.action?pageId=10030766](http://www.apache.wiki/pages/viewpage.action?pageId=10030766)
貢獻者 : [片刻](/display/~jiangzhonglian) [ApacheCN](/display/~apachecn) [Apache中文網](/display/~apachechina)
## 概觀
[Apache Beam](http://beam.incubator.apache.org/)是數據處理流水線的開源統一平臺。可以使用其中一個Beam SDK構建管道。管道的執行由不同的跑步者完成。目前,Beam支持Apache Flink Runner,Apache Spark Runner和Google Dataflow Runner。
## 如何使用
基本上,您可以編寫正常的Beam java代碼,您可以在其中確定Runner。您應該在類中寫入main方法,因為解釋器調用此main來執行管道。與Zeppelin正常模式不同,每段都被視為單獨的工作,與任何其他段落沒有任何關系。
下面是一個字的一個示范與字符串數組表示的數據計數例如但它可以通過更換從文件中讀取數據`Create.of(SENTENCES).withCoder(StringUtf8Coder.of())`用`TextIO.Read.from("path/to/filename.txt")`
```
%beam
// most used imports
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;
public class MinimalWordCount {
static List<String> s = new ArrayList<>();
static final String[] SENTENCES_ARRAY = new String[] {
"Hadoop is the Elephant King!",
"A yellow and elegant thing.",
"He never forgets",
"Useful data, or lets",
"An extraneous element cling!",
"A wonderful king is Hadoop.",
"The elephant plays well with Sqoop.",
"But what helps him to thrive",
"Are Impala, and Hive,",
"And HDFS in the group.",
"Hadoop is an elegant fellow.",
"An elephant gentle and mellow.",
"He never gets mad,",
"Or does anything bad,",
"Because, at his core, he is yellow",
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
Options options = PipelineOptionsFactory.create().as(Options.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
}
}));
p.run();
System.out.println("%table word\tcount");
for (int i = 0; i < s.size(); i++) {
System.out.print(s.get(i));
}
}
}?
```
- 快速入門
- 什么是Apache Zeppelin?
- 安裝
- 配置
- 探索Apache Zeppelin UI
- 教程
- 動態表單
- 發表你的段落
- 自定義Zeppelin主頁
- 升級Zeppelin版本
- 從源碼編譯
- 使用Flink和Spark Clusters安裝Zeppelin教程
- 解釋器
- 概述
- 解釋器安裝
- 解釋器依賴管理
- 解釋器的模擬用戶
- 解釋員執行Hook(實驗)
- Alluxio 解釋器
- Beam 解釋器
- BigQuery 解釋器
- Cassandra CQL 解釋器
- Elasticsearch 解釋器
- Flink 解釋器
- Geode/Gemfire OQL 解釋器
- HBase Shell 解釋器
- HDFS文件系統 解釋器
- Hive 解釋器
- Ignite 解釋器
- JDBC通用 解釋器
- Kylin 解釋器
- Lens 解釋器
- Livy 解釋器
- Markdown 解釋器
- Pig 解釋器
- PostgreSQL, HAWQ 解釋器
- Python 2&3解釋器
- R 解釋器
- Scalding 解釋器
- Scio 解釋器
- Shell 解釋器
- Spark 解釋器
- 系統顯示
- 系統基本顯示
- 后端Angular API
- 前端Angular API
- 更多
- 筆記本存儲
- REST API
- 解釋器 API
- 筆記本 API
- 筆記本資源 API
- 配置 API
- 憑據 API
- Helium API
- Security ( 安全 )
- Shiro 授權
- 筆記本 授權
- 數據源 授權
- Helium 授權
- Advanced ( 高級 )
- Zeppelin on Vagrant VM ( Zeppelin 在 Vagrant 虛擬機上 )
- Zeppelin on Spark Cluster Mode( Spark 集群模式下的 Zeppelin )
- Zeppelin on CDH ( Zeppelin 在 CDH 上 )
- Contibute ( 貢獻 )
- Writing a New Interpreter ( 寫一個新的解釋器 )
- Writing a new Visualization (Experimental) ( 編寫新的可視化(實驗) )
- Writing a new Application (Experimental) ( 寫一個新的應用程序( 實驗 ) )
- Contributing to Apache Zeppelin ( Code ) ( 向 Apache Zeppelin 貢獻( 代碼 ) )
- Contributing to Apache Zeppelin ( Website ) ( 向 Apache Zeppelin 貢獻(website) )