一個集群包含若干成員,要對這些成員進行管理就必須要有一張包含所有成員的列表,當要對某個節點做操作時通過這個列表可以準確找到該節點的地址進而對該節點發送操作消息。如何維護這張包含所有成員的列表是本節要討論的主題。
成員維護是集群的基礎功能,一般劃分一個獨立模塊或層完成此功能,它提供成員列表查詢、成員維護、成員列表改變事件通知等能力。由于tribes定位于基于同等節點之間的通信,所以并不存在主節點選舉的問題,它所要具備的功能是自動發現節點,即新節點加入要通知集群其他成員更新成員列表,讓每個節點都能及時更新成員列表,每個節點都維護一份集群成員表。如圖,節點1、節點2、節點3使用組播通過交換機各自已經維護一份成員列表,且他們隔一段時間向交換機組播自己節點消息,即心跳操作。當第四個節點加入集群組,節點四向交換機組播自己的節點消息,原理三個節點接收到后各自把節點四加入到各自的成員列表中,而原來三個節點也不斷向交換機發送節點消息,節點四接收到后依次更新成員列表信息,最終達到四個節點都擁有四個節點成員信息。
?
看下tribes的集群是如何設計實現以上功能的,其成員列表的創建維護是基于經典的組播方式實現,每個節點都創建一個節點信息發射器和節點信息接收器,讓他們運行于獨立的線程中。發射器用于向組內發送自己節點的消息,而接收器則用于接收其他節點發送過來的節點消息并進行處理。要使節點之間通信能被識別就需要定義一個語義,即約定報文協議的結構,tribes的成員報文是這樣定義的,兩個固定值用于表示報文的開始和結束,開始標識TRIBES_MBR_BEGIN?的值為字節數組84,?82,?73,?66,?69,?83,?45,?66,?1,?0,結束標識TRIBES_MBR_END的值為字節數組84,?82,?73,?66,?69,?83,?45,?69,?1,?0,整個協議包結構為:開始標識(10bytes)+包長度(4bytes)+存活時間(8bytes)+tcp端口(4bytes)+安全端口(4bytes)+udp端口(4bytes)+host長度(1byte)+host(nbytes)+命令長度(4bytes)+命令(nbytes)+域名長度(4bytes)+域名(nbytes)+唯一會話id(16bytes)+有效負載長度(4bytes)+有效負載(nbytes)+結束標識(10bytes)。成員發射器按照協議組織成包結構并組播,接收器接收包并按照協議進行解包,根據包信息維護成員表。
下面用一段代碼簡單展示實現過程,由于篇幅問題包的處理省略:
~~~
public?class?McastService?{
private?MulticastSocket?socket;
private?String?address?=?"228.0.0.4";
private?int?port?=?8000;
private?InetAddress?addr;
private?byte[]?buffer?=?new?byte[2048];
private?DatagramPacket?receivePacket;
private?final?Object?sendLock?=?new?Object();
public?void?start()?{
try?{
addr?=?InetAddress.getByName(address);
receivePacket?=?new?DatagramPacket(buffer,?buffer.length,?addr,port);
socket.joinGroup(addr);
new?ReceiverThread().start();
new?SenderThread().start();
}?catch?(IOException?e)?{
}
}
public?class?ReceiverThread?extends?Thread?{
public?void?run()?{
while?(true)?{
try?{
receive();
}?catch?(ArrayIndexOutOfBoundsException?ax)?{
}
}
}
}
public?class?SenderThread?extends?Thread?{
public?void?run()?{
while?(true)?{
try?{
send();
}?catch?(Exception?x)?{
}
try?{
Thread.sleep(1000);
}?catch?(Exception?ignore)?{
}
}
}
}
public?void?send()?{
byte[]?data?=?按照成員協議組織包結構;
DatagramPacket?packet?=?new?DatagramPacket(data,?data.length,?addr,?port);
try?{
socket.send(packet);
}?catch?(IOException?e)?{
}
}
public?void?receive()?{
try?{
socket.receive(receivePacket);
解析處理成員報文。
}?catch?(IOException?e)?{
}
}
}
~~~
第一步要先執行加入組播成員操作,接著分別啟動接收器線程、發射器線程,一般接收器要優先啟動。發射器每隔1秒組織協議包發送心跳,組播組內成員的接收器對接收到的協議報文進行解析,按照一定的邏輯更新各自節點本地成員列表,如果成員表已包含協議包的成員則只更新存活時間等消息。
Tribes利用上述原理維護集群成員,并且由獨立模塊MembershipService提供成員的相關服務,例如獲取集群所有成員相關信息等。
喜歡java的同學可以交個朋友:
