## 5) tcp鏈接與Message消息封裝
? 好了,現在我們來將服務器的連接做一個簡單的封裝,在這之前,我們要將我我們所發的數據做一個規定,采用TLV的格式,來進行封裝。目的是解決TCP傳輸的粘包問題。
### 5.1 Message消息封裝

? 先創建一個message.h頭文件
> lars_reactor/include/message.h
```h
#pragma once
//解決tcp粘包問題的消息頭
struct msg_head
{
int msgid;
int msglen;
};
//消息頭的二進制長度,固定數
#define MESSAGE_HEAD_LEN 8
//消息頭+消息體的最大長度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
```
? 接下來我們每次在server和 client之間傳遞數據的時候,都發送這種數據格式的頭再加上后面的數據內容即可。
### 5.2 創建一個tcp_conn連接類
> lars_reactor/include/tcp_conn.h
```h
#pragma once
#include "reactor_buf.h"
#include "event_loop.h"
//一個tcp的連接信息
class tcp_conn
{
public:
//初始化tcp_conn
tcp_conn(int connfd, event_loop *loop);
//處理讀業務
void do_read();
//處理寫業務
void do_write();
//銷毀tcp_conn
void clean_conn();
//發送消息的方法
int send_message(const char *data, int msglen, int msgid);
private:
//當前鏈接的fd
int _connfd;
//該連接歸屬的event_poll
event_loop *_loop;
//輸出buf
output_buf obuf;
//輸入buf
input_buf ibuf;
};
```
簡單說明一下里面的成員和方法:
**成員**:
`_connfd`:server剛剛accept成功的套接字
`_loop`:當前鏈接所綁定的事件觸發句柄.
`obuf`:鏈接輸出緩沖,向對端寫數據
`ibuf`:鏈接輸入緩沖,從對端讀數據
**方法**:
`tcp_client()`:構造,主要在里面實現初始化及創建鏈接鏈接的connect過程。
`do_read()`:讀數據處理業務,主要是EPOLLIN事件觸發。
`do_write()`:寫數據處理業務,主要是EPOLLOUT事件觸發。
`clean_conn()`:清空鏈接資源。
`send_message()`:將消息打包成TLV格式發送給對端。
? 接下來,實現以下`tcp_conn`類.
> lars_reactor/src/tcp_conn.cpp
```c
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include "tcp_conn.h"
#include "message.h"
//回顯業務
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
conn->send_message(data, len, msgid);
}
//連接的讀事件回調
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_read();
}
//連接的寫事件回調
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_write();
}
//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
_connfd = connfd;
_loop = loop;
//1. 將connfd設置成非阻塞狀態
int flag = fcntl(_connfd, F_GETFL, 0);
fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
//2. 設置TCP_NODELAY禁止做讀寫緩存,降低小包延遲
int op = 1;
setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
//3. 將該鏈接的讀事件讓event_loop監控
_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
//4 將該鏈接集成到對應的tcp_server中
//TODO
}
//處理讀業務
void tcp_conn::do_read()
{
//1. 從套接字讀取數據
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//對端正常關閉
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head數據
msg_head head;
//[這里用while,可能一次性讀取多個完整包過來]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 讀取msg_head頭部,固定長度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//緩存buf中剩余的數據,小于實際上應該接受的數據
//說明是一個不完整的包,應該拋棄
break;
}
//2.2 再根據頭長度讀取數據體,然后針對數據體處理 業務
//TODO 添加包路由模式
//頭部處理完了,往后偏移MESSAGE_HEAD_LEN長度
ibuf.pop(MESSAGE_HEAD_LEN);
//處理ibuf.data()業務數據
printf("read data: %s\n", ibuf.data());
//回顯業務
callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息體處理完了,往后便宜msglen長度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}
//處理寫業務
void tcp_conn::do_write()
{
//do_write是觸發玩event事件要處理的事情,
//應該是直接將out_buf力度數據io寫會對方客戶端
//而不是在這里組裝一個message再發
//組裝message的過程應該是主動調用
//只要obuf中有數據就寫
while (obuf.length()) {
int ret = obuf.write2fd(_connfd);
if (ret == -1) {
fprintf(stderr, "write2fd error, close conn!\n");
this->clean_conn();
return ;
}
if (ret == 0) {
//不是錯誤,僅返回0表示不可繼續寫
break;
}
}
if (obuf.length() == 0) {
//數據已經全部寫完,將_connfd的寫事件取消掉
_loop->del_io_event(_connfd, EPOLLOUT);
}
return ;
}
//發送消息的方法
int tcp_conn::send_message(const char *data, int msglen, int msgid)
{
printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
bool active_epollout = false;
if(obuf.length() == 0) {
//如果現在已經數據都發送完了,那么是一定要激活寫事件的
//如果有數據,說明數據還沒有完全寫完到對端,那么沒必要再激活等寫完再激活
active_epollout = true;
}
//1 先封裝message消息頭
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
//1.1 寫消息頭
int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
if (ret != 0) {
fprintf(stderr, "send head error\n");
return -1;
}
//1.2 寫消息體
ret = obuf.send_data(data, msglen);
if (ret != 0) {
//如果寫消息體失敗,那就回滾將消息頭的發送也取消
obuf.pop(MESSAGE_HEAD_LEN);
return -1;
}
if (active_epollout == true) {
//2. 激活EPOLLOUT寫事件
_loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
}
return 0;
}
//銷毀tcp_conn
void tcp_conn::clean_conn()
{
//鏈接清理工作
//1 將該鏈接從tcp_server摘除掉
//TODO
//2 將該鏈接從event_loop中摘除
_loop->del_io_event(_connfd);
//3 buf清空
ibuf.clear();
obuf.clear();
//4 關閉原始套接字
int fd = _connfd;
_connfd = -1;
close(fd);
}
```
? 具體每個方法的實現,都很清晰。其中`conn_rd_callback()`和`conn_wt_callback()`是注冊讀寫事件的回調函數,設置為static是因為函數類型沒有this指針。在里面分別再調用`do_read()`和`do_write()`方法。
### 5.3 修正tcp_server對accept之后的處理方法
> lars_reactor/src/tcp_server.cpp
```c
//...
//開始提供創建鏈接服務
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過多,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 將之前的觸發回調的刪掉,改成如下====
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
// ============================================
printf("get new connection succ!\n");
break;
}
}
}
//...
```
? 這樣,每次accept成功之后,創建一個與當前客戶端套接字綁定的tcp_conn對象。在構造里就完成了基本的對于EPOLLIN事件的監聽和回調動作.
? 現在可以先編譯一下,保證沒有語法錯誤,但是如果想測試,就不能夠使用`nc`指令測試了,因為現在服務端只能夠接收我們自定義的TLV格式的報文。那么我們需要自己寫一個客戶端來完成基本的測試。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本