## 1.安裝zookeeper
* 解壓zookeeper
```
tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
```
* 配置進環境變量
```
export ZK_HOME=/home/bizzbee/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
#source 一下
```
* 修改配置:復制conf目錄下的zoo_sample.cfg
* 里面的dataDir位置要修改掉。因為默認存儲在/tmp下面,重啟會清空。
* 所以修改到自己新建的目錄。`/home/bizzbee/app/zk_tmp`
```
dataDir=/home/bizzbee/app/zk_tmp
```
* 啟動zk
`./zkServer.sh start`
## 2.單節點單broker
* 解壓kafka
```
tar -zxvf kafka_2.11-2.1.1.tgz -C ~/app/
```
* 修改配置`config/server.properties`
```
listeners = PLAINTEXT://spark:9092
log.dirs=/home/bizzbee/app/kafka-logs
#log地址同樣是自己創建目錄,不能在/tmp下面
zookeeper.connect=spark:2181
```
* 啟動kafka,啟動之前配置kafka環境到配置文件,并source生效。
```
kafka-server-start.sh $KAFKA_HOME/config/server.properties
```
* 創建一個topic(之前啟動的zookeeper不要關掉)
```
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic bizzbee-topic
```
* 查看所有topic
```
[bizzbee@spark config]$ kafka-topics.sh --list --zookeeper spark:2181
bizzbee-topic
```
* 發送消息(生產消息)
```
[bizzbee@spark config]$ kafka-console-producer.sh --broker-list spark:9092 --topic bizzbee-topic
>
```
* 接受消息(消費)
```
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic bizzbee-topic --from-beginning
```

對于**消費者**,kafka中有兩個設置的地方:對于老的消費者,由**\--zookeeper參數**設置;對于新的消費者,由**\--bootstrap-server參數**設置
如果使用了--zookeeper參數,那么consumer的信息將會存放在zk之中
查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id\]/offsets/\[topic\]/\[broker\_id-part\_id\],這個是查看某個group\_id的某個topic的offset
如果使用了--bootstrap-server參數,那么consumer的信息將會存放在kafka之中。
> 所以注意,新的消費者應該是kafka所在的9092端口。老的消費者應該從zk的2181端口拿消息。
* 在生產端輸入消息

* from-beginning的意思是,消費者啟動時之前已經消費的消息也會接收到。就是從最開始接收。
* 查看所有topic詳細信息。
```
kafka-topics.sh --describe --zookeer spark:2181
```
## 3.單節點多broker部署
* 復制三個之前的配置文件。
```
[bizzbee@spark config]$ cp server.properties server1.properties
[bizzbee@spark config]$ cp server.properties server2.properties
[bizzbee@spark config]$ cp server.properties server3.properties
```
* 三個都進行修改(一下是需要修改的)
```
# id:1,2,3
broker.id=1
#端口9093,94,95
listeners = PLAINTEXT://spark:9093
# 日志也新建三個文件夾
log.dirs=/home/bizzbee/app/tmp/kafka-logs1
```
* 這次以后臺daemon的方式啟動kafka(不占用終端)
```
kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties &
```
* 創建三個副本的topic
```
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 3 --partitions 1 --topic bizzbee-replicated-topic
```
* 查看剛創建的topic
```
[bizzbee@spark config]$ kafka-topics.sh --describe --zookeeper spark:2181 --topic bizzbee-replicated-topic
Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: bizzbee-replicated-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
```
* 創建生產者:
```
kafka-console-producer.sh --broker-list spark:9093,spark:9094,spark:9095 --topic bizzbee-replicated-topic
```
* 創建消費者:
```
kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic
```
> 這里從9093,94,95 都可以取到消息。