# Kafka實戰
實驗環境為Ubuntu。
### 安裝
Kafka的安裝非常簡單,只需要下載解壓就可以了。需要注意的是Kafka依賴于Java環境,所以確保你的系統中裝有JDK。
~~~
//安裝sun默認JDK
drfish@kafka-5934:~# sudo apt-get install default-jdk
//下載Kafka并解壓
drfish@kafka-5934:~$ wget http://apache.mirrors.lucidnetworks.net/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
jshen4@kafka-5934:~$ tar -xzf kafka_2.11-0.9.0.0.tgz
jshen4@kafka-5934:~$ cd kafka_2.11-0.9.0.0/
~~~
### 啟動
Kafka的運行是依賴于Zookeeper的,Zookeeper是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、名字服務、分布式同步、組服務等。在這里我們直接用Kafka自帶的Zookeeper即可。
~~~
//啟動Zookeeper
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties &
//啟動Kafka
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server.
properties &
~~~
### 創建話題
所有消息要發布到相應的話題下,我們來創建一個測試話題。
~~~
//創建一個名為test的topic
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看現有的topic,正常情況應輸出test和一些日志
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --list --zookeeper l
ocalhost:2181
~~~
### 發送消息
現在可以通過生產者腳本來發布消息了,運行腳本后就可在命令行輸入消息。
~~~
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message.
~~~
### 接受消息
創建一個消費者來接收消息,通過自帶的消費者腳本可以簡單的把接受的消息輸出,在命令行中可以看見剛剛輸入的那條信息“This is a message.”。
~~~
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message.
~~~
到這里一條最基本的流程就走通了,下面來嘗試一下Kafka的多節點集群。
### 多節點集群嘗試
在啟動新節點之前,首先要修改配置文件,因為已經有一個Kafka進程在運行中,我們要保證新的Kafka節點不與它沖突。運行下面的命令,對相應文件做如下修改:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
~~~
//配置節點啟動文件
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/server-1.properties
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/serve
r-2.properties
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-1.properties
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-2.properties
//啟動新節點
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-1.properties &
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-2.properties &
~~~
接下去要創建能夠被多個節點接收的話題。從返回的結果可以看出,主節點是節點0,節點1和2是從節點。而Isr后面的節點是指與主節點同步的節點。
~~~
//創建一個叫做my-replicated-topic的話題,只有一個分區,但備份因子為3
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
//查看新創建話題的結果
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
~~~
測試一下消息的發布與接收。
~~~
//向新的話題發布消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
new message
//消費者讀取消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
new message
~~~
既然有了備份節點,我們就來看一下它們的實際作用,我們通過關閉主節點進程來模擬主節點異常的情況,測試從節點能否繼續正常完成主節點應該做的工作。
~~~
//查看我們的主節點運行在哪一個后臺命令中,由于主節點是0,即2號命令
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ jobs
[1] Running bin/zookeeper-server-start.sh config/zookeeper.properties &
[2] Running bin/kafka-server-start.sh config/server.properties &
[3]- Running bin/kafka-server-start.sh config/server-1.properties &
[4]+ Running bin/kafka-server-start.sh config/server-2.properties &
//將對應的命令調到前臺運行,并通過^C終止命令
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ fg 2
//在終止了上面一條命令后發現如下日志,新的主節點是原來的從節點1
[2015-12-07 23:55:53,118] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
[2015-12-07 23:55:53,158] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
//查看話題消息來驗證我們的發現
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
//看新的主節點能否處理消息
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
new message
~~~
從上面的實驗結果可以看出,當主節點0發生異常時,從節點1變為主節點,此時節點0仍在這個備份組里,但它已經不與其它節點同步(通過Isr屬性看出)。接下去做最后一個實驗
,如果現在我們重啟節點一會怎么樣呢?
~~~
//重啟節點1
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server.
properties &
//查看話題
drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0
~~~
我們可以看出來節點1又處在同步狀態了,我們可以看出Kafka節點的可用性是非常好的,如果節點出現異常,它會臨時把該節點廢棄,一旦當節點恢復正常,它又使節點進行正常的備份工作。