## 6) tcp客戶端觸發模型
? 我們可以給客戶端添加觸發模型。同時也提供一系列的接口供開發者寫客戶端應用程序來使用。
### 6.1 tcp_client類設計
> lars_reactor/include/tcp_client.h
```c
#pragma once
#include "io_buf.h"
#include "event_loop.h"
#include "message.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class tcp_client
{
public:
//初始化客戶端套接字
tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
//發送message方法
int send_message(const char *data, int msglen, int msgid);
//創建鏈接
void do_connect();
//處理讀業務
int do_read();
//處理寫業務
int do_write();
//釋放鏈接資源
void clean_conn();
~tcp_client();
//設置業務處理回調函數
void set_msg_callback(msg_callback *msg_cb)
{
this->_msg_callback = msg_cb;
}
bool connected; //鏈接是否創建成功
//server端地址
struct sockaddr_in _server_addr;
io_buf _obuf;
io_buf _ibuf;
private:
int _sockfd;
socklen_t _addrlen;
//客戶端的事件處理機制
event_loop* _loop;
//當前客戶端的名稱 用戶記錄日志
const char *_name;
msg_callback *_msg_callback;
};
```
? 這里注意的是,tcp_client并不是tcp_server的一部分,而是單純為寫客戶端提供的接口。所以這里也需要實現一套對讀寫事件處理的業務。 這里使用的讀寫緩沖是原始的`io_buf`,并不是服務器封裝好的`reactor_buf`原因是后者是轉為server做了一層封裝,io_buf的基本方法比較全。
**關鍵成員**:
`_sockfd`:當前客戶端套接字。
`_server_addr`: 鏈接的服務端的IP地址。
`_loop`: 客戶端異步觸發事件機制event_loop句柄。
`_msg_callback`: 當前客戶端處理服務端的回調業務。
`connected`:是否已經成功connect服務端的標致。
**方法**:
`tcp_client()`:構造函數,主要是在里面完成基本的套接字初始化及connect操作.
`do_connect()`:創建鏈接
`do_read()`:處理鏈接的讀業務。
`do_write()`:處理鏈接的寫業務。
`clean_conn()`:清空鏈接資源。
### 6.2 創建鏈接
> lars_reactor/src/tcp_client.cpp
```c
tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):
_ibuf(4194304),
_obuf(4194304)
{
_sockfd = -1;
_msg_callback = NULL;
_name = name;
_loop = loop;
bzero(&_server_addr, sizeof(_server_addr));
_server_addr.sin_family = AF_INET;
inet_aton(ip, &_server_addr.sin_addr);
_server_addr.sin_port = htons(port);
_addrlen = sizeof(_server_addr);
this->do_connect();
}
```
? 這里初始化tcp_client鏈接信息,然后調用`do_connect()`創建鏈接.
> lars_reactor/src/tcp_client.cpp
```c
//創建鏈接
void tcp_client::do_connect()
{
if (_sockfd != -1) {
close(_sockfd);
}
//創建套接字
_sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "create tcp client socket error\n");
exit(1);
}
int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
if (ret == 0) {
//鏈接創建成功
connected = true;
//注冊讀回調
_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
//如果寫緩沖去有數據,那么也需要觸發寫回調
if (this->_obuf.length != 0) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));
}
else {
if(errno == EINPROGRESS) {
//fd是非阻塞的,可能會出現這個錯誤,但是并不表示鏈接創建失敗
//如果fd是可寫狀態,則為鏈接是創建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//讓event_loop去觸發一個創建判斷鏈接業務 用EPOLLOUT事件立刻觸發
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
else {
fprintf(stderr, "connection error\n");
exit(1);
}
}
}
```
### 6.3 有關非阻塞客戶端socket創建鏈接問題
? 這里轉載一篇文章,是有關非阻塞套接字,connect返回-1,并且errno是`EINPROGRESS`的情況。因為我們的client是采用event_loop形式,socket需要被設置為非阻塞。所以需要針對這個情況做處理。下面是說明。
? 客戶端測試程序時,由于出現很多客戶端,經過connect成功后,代碼卡在recv系統調用中,后來發現可能是由于socket默認是阻塞模式,所以會令很多客戶端鏈接處于鏈接卻不能傳輸數據狀態。
? 后來修改socket為非阻塞模式,但在connect的時候,發現返回值為-1,剛開始以為是connect出現錯誤,但在服務器上看到了鏈接是ESTABLISED狀態。證明鏈接是成功的
? 但為什么會出現返回值是-1呢? 經過查詢資料,以及看stevens的APUE,也發現有這么一說。
? 當connect在非阻塞模式下,會出現返回`-1`值,錯誤碼是`EINPROGRESS`,但如何判斷connect是聯通的呢?stevens書中說明要在connect后,繼續判斷該socket是否可寫?
? **若可寫,則證明鏈接成功。**
? 如何判斷可寫,有2種方案,一種是select判斷是否可寫,二用poll模型。
select:
```c
int CheckConnect(int iSocket)
{
fd_set rset;
FD_ZERO(&rset);
FD_SET(iSocket, &rset);
timeval tm;
tm. tv_sec = 0;
tm.tv_usec = 0;
if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0)
{
close(iSocket);
return -1;
}
if (FD_ISSET(iSocket, &rset))
{
int err = -1;
socklen_t len = sizeof(int);
if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 )
{
close(iSocket);
printf("errno:%d %s\n", errno, strerror(errno));
return -2;
}
if (err)
{
errno = err;
close(iSocket);
return -3;
}
}
return 0;
}
```
poll:
```c
int CheckConnect(int iSocket) {
struct pollfd fd;
int ret = 0;
socklen_t len = 0;
fd.fd = iSocket;
fd.events = POLLOUT;
while ( poll (&fd, 1, -1) == -1 ) {
if( errno != EINTR ){
perror("poll");
return -1;
}
}
len = sizeof(ret);
if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {
perror("getsockopt");
return -1;
}
if(ret != 0) {
fprintf (stderr, "socket %d connect failed: %s\n",
iSocket, strerror (ret));
return -1;
}
return 0;
}
```
### 6.3 針對EINPROGRESS的連接創建處理
? 看上面`do_connect()`的代碼其中一部分:
```c
if(errno == EINPROGRESS) {
//fd是非阻塞的,可能會出現這個錯誤,但是并不表示鏈接創建失敗
//如果fd是可寫狀態,則為鏈接是創建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//讓event_loop去觸發一個創建判斷鏈接業務 用EPOLLOUT事件立刻觸發
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
```
這里是又觸發一個寫事件,直接讓程序流程跳轉到`connection_delay()`方法.那么我們需要在里面判斷鏈接是否已經判斷成功,并且做出一定的創建成功之后的業務動作。
> lars_reactor/src/tcp_client.cpp
```c
//判斷鏈接是否是創建鏈接,主要是針對非阻塞socket 返回EINPROGRESS錯誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//鏈接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
//建立連接成功之后,主動發送send_message
const char *msg = "hello lars!";
int msgid = 1;
cli->send_message(msg, strlen(msg), msgid);
loop->add_io_event(fd, read_callback, EPOLLIN, cli);
if (cli->_obuf.length != 0) {
//輸出緩沖有數據可寫
loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
}
}
else {
//鏈接創建失敗
fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
}
}
```
? 這是一個事件回調,所以用的是static方法而不是成員方法。首先是利用`getsockopt`判斷鏈接是否創建成功,如果成功,那么 我們當前這個版本的客戶端是直接寫死主動調用`send_message()`方法發送給服務端一個`hello lars!`字符串。然后直接交給我們的`read_callback()`方法處理,當然如果寫緩沖有數據,我們也會觸發寫的`write_callback()`方法。
? 接下來,看看這兩個callback以及send_message是怎么實現的。
**callback**
> lars_reactor/src/tcp_client.cpp
```c
static void write_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_write();
}
static void read_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_read();
}
//處理讀業務
int tcp_client::do_read()
{
//確定已經成功建立連接
assert(connected == true);
// 1. 一次性全部讀取出來
//得到緩沖區里有多少字節要被讀取,然后將字節數放入b里面。
int need_read = 0;
if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD error");
return -1;
}
//確保_buf可以容納可讀數據
assert(need_read <= _ibuf.capacity - _ibuf.length);
int ret;
do {
ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
} while(ret == -1 && errno == EINTR);
if (ret == 0) {
//對端關閉
if (_name != NULL) {
printf("%s client: connection close by peer!\n", _name);
}
else {
printf("client: connection close by peer!\n");
}
clean_conn();
return -1;
}
else if (ret == -1) {
fprintf(stderr, "client: do_read() , error\n");
clean_conn();
return -1;
}
assert(ret == need_read);
_ibuf.length += ret;
//2. 解包
msg_head head;
int msgid, length;
while (_ibuf.length >= MESSAGE_HEAD_LEN) {
memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
msgid = head.msgid;
length = head.msglen;
/*
if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
break;
}
*/
//頭部讀取完畢
_ibuf.pop(MESSAGE_HEAD_LEN);
//3. 交給業務函數處理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
//數據區域處理完畢
_ibuf.pop(length);
}
//重置head指針
_ibuf.adjust();
return 0;
}
//處理寫業務
int tcp_client::do_write()
{
//數據有長度,切頭部索引是起始位置
assert(_obuf.head == 0 && _obuf.length);
int ret;
while (_obuf.length) {
//寫數據
do {
ret = write(_sockfd, _obuf.data, _obuf.length);
} while(ret == -1 && errno == EINTR);//非阻塞異常繼續重寫
if (ret > 0) {
_obuf.pop(ret);
_obuf.adjust();
}
else if (ret == -1 && errno != EAGAIN) {
fprintf(stderr, "tcp client write \n");
this->clean_conn();
}
else {
//出錯,不能再繼續寫
break;
}
}
if (_obuf.length == 0) {
//已經寫完,刪除寫事件
printf("do write over, del EPOLLOUT\n");
this->_loop->del_io_event(_sockfd, EPOLLOUT);
}
return 0;
}
//釋放鏈接資源,重置連接
void tcp_client::clean_conn()
{
if (_sockfd != -1) {
printf("clean conn, del socket!\n");
_loop->del_io_event(_sockfd);
close(_sockfd);
}
connected = false;
//重新連接
this->do_connect();
}
tcp_client::~tcp_client()
{
close(_sockfd);
}
```
? 這里是基本的讀數據和寫數據的處理業務實現。我們重點看`do_read()`方法,里面有段代碼:
```c
//3. 交給業務函數處理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
```
? 是將我們從服務端讀取到的代碼,交給了`_msg_callback()`方法來處理的,這個實際上是用戶開發者自己在業務上注冊的回調業務函數。在tcp_client.h中我們已經提供了`set_msg_callback`暴露給開發者注冊使用。
------
**send_message**
> lars_reactor/src/tcp_client.cpp
```c
//主動發送message方法
int tcp_client::send_message(const char *data, int msglen, int msgid)
{
if (connected == false) {
fprintf(stderr, "no connected , send message stop!\n");
return -1;
}
//是否需要添加寫事件觸發
//如果obuf中有數據,沒必要添加,如果沒有數據,添加完數據需要觸發
bool need_add_event = (_obuf.length == 0) ? true:false;
if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {
fprintf(stderr, "No more space to Write socket!\n");
return -1;
}
//封裝消息頭
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);
_obuf.length += MESSAGE_HEAD_LEN;
memcpy(_obuf.data + _obuf.length, data, msglen);
_obuf.length += msglen;
if (need_add_event) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
return 0;
}
```
? 將發送的數據寫給obuf,然后出發write_callback將obuf的數據傳遞給對方服務端。
### 6.4 完成Lars Reactor V0.4開發
? 好了,現在我們框架部分已經完成,接下來我們就要實現一個serverapp 和 一個clientapp來進行測試.
我們創建`example/lars_reactor_0.4`文件夾。
**Makefile**
```makefile
CXX=g++
CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
INC=-I../../include
LIB=-L../../lib -llreactor -lpthread
OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
all:
$(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB)
$(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB)
clean:
-rm -f *.o server client
```
服務端代碼:
> server.cpp
```c
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
```
客戶端代碼:
> client.cpp
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客戶端業務
void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data)
{
//得到服務端回執的數據
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//創建tcp客戶端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");
//注冊回調業務
client.set_msg_callback(busi);
//開啟事件監聽
loop.event_process();
return 0;
}
```
編譯并分別啟動server 和client
服務端輸出:
```bash
$ ./server
begin accept
get new connection succ!
read data: hello lars!
server send_message: hello lars!:11, msgid = 1
```
客戶端輸出:
```bash
$ ./client
do_connect EINPROGRESS
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
recv server: [hello lars!]
msgid: [1]
len: [11]
```
? 現在客戶端已經成功的發送數據給服務端,并且回顯的數據也直接被客戶端解析,我們的框架到現在就可以做一個基本的客戶端和服務端的完成了,但是還差很多,接下來我們繼續優化。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本