## 12) udp服務與客戶端
? 接下來為了讓Reactor框架功能更加豐富,結合之前的功能,再加上udpserver的服務接口。udp我們暫時不考慮加線程池實現,只是單線程的處理方式。
### 12.1 udp_server服務端功能實現
> lars_reactor/include/udp_server.h
```c
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
#include "net_connection.h"
#include "message.h"
class udp_server :public net_connection
{
public:
udp_server(event_loop *loop, const char *ip, uint16_t port);
virtual int send_message(const char *data, int msglen, int msgid);
//注冊消息路由回調函數
void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);
~udp_server();
//處理消息業務
void do_read();
private:
int _sockfd;
char _read_buf[MESSAGE_LENGTH_LIMIT];
char _write_buf[MESSAGE_LENGTH_LIMIT];
//事件觸發
event_loop* _loop;
//服務端ip
struct sockaddr_in _client_addr;
socklen_t _client_addrlen;
//消息路由分發
msg_router _router;
};
```
? 對應的方法實現方式如下:
> lars_reactor/src/udp_server.cpp
```c
#include <signal.h>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include "udp_server.h"
void read_callback(event_loop *loop, int fd, void *args)
{
udp_server *server = (udp_server*)args;
//處理業務函數
server->do_read();
}
void udp_server::do_read()
{
while (true) {
int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);
if (pkg_len == -1) {
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN) {
break;
}
else {
perror("recvfrom\n");
break;
}
}
//處理數據
msg_head head;
memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
//報文格式有問題
fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
continue;
}
//調用注冊的路由業務
_router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
}
}
udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
{
//1 忽略一些信號
if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
perror("signal ignore SIGHUB");
exit(1);
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
perror("signal ignore SIGPIPE");
exit(1);
}
//2 創建套接字
//SOCK_CLOEXEC在execl中使用該socket則關閉,在fork中使用該socket不關閉
_sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
if (_sockfd == -1) {
perror("create udp socket");
exit(1);
}
//3 設置服務ip+port
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_aton(ip, &servaddr.sin_addr);//設置ip
servaddr.sin_port = htons(port);//設置端口
//4 綁定
bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
//3 添加讀業務事件
_loop = loop;
bzero(&_client_addr, sizeof(_client_addr));
_client_addrlen = sizeof(_client_addr);
printf("server on %s:%u is running...\n", ip, port);
_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}
int udp_server::send_message(const char *data, int msglen, int msgid)
{
if (msglen > MESSAGE_LENGTH_LIMIT) {
fprintf(stderr, "too large message to send\n");
return -1;
}
msg_head head;
head.msglen = msglen;
head.msgid = msgid;
memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
if (ret == -1) {
perror("sendto()..");
return -1;
}
return ret;
}
//注冊消息路由回調函數
void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
{
_router.register_msg_router(msgid, cb, user_data);
}
udp_server::~udp_server()
{
_loop->del_io_event(_sockfd);
close(_sockfd);
}
```
? 這里面實現的方式和tcp_server的實現方式幾乎一樣,需要注意的是,udp的socket編程是不需要listen的,而且也不需要accept。所以recvfrom就能夠得知每個包的對應客戶端是誰,然后回執消息給對應的客戶端就可以。因為沒有連接,所以都是以包為單位來處理的,一個包一個包處理。可能相鄰的兩個包來自不同的客戶端。
### 12.2 udp_client客戶端功能實現
> lars_reactor/include/udp_client.h
```h
#pragma once
#include "net_connection.h"
#include "message.h"
#include "event_loop.h"
class udp_client: public net_connection
{
public:
udp_client(event_loop *loop, const char *ip, uint16_t port);
~udp_client();
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);
virtual int send_message(const char *data, int msglen, int msgid);
//處理消息
void do_read();
private:
int _sockfd;
char _read_buf[MESSAGE_LENGTH_LIMIT];
char _write_buf[MESSAGE_LENGTH_LIMIT];
//事件觸發
event_loop *_loop;
//消息路由分發
msg_router _router;
};
```
> lars_reactor/src/udp_client.cpp
```c
#include "udp_client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <string.h>
#include <stdio.h>
void read_callback(event_loop *loop, int fd, void *args)
{
udp_client *client = (udp_client*)args;
client->do_read();
}
udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
{
//1 創建套接字
_sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
if (_sockfd == -1) {
perror("create socket error");
exit(1);
}
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_aton(ip, &servaddr.sin_addr);
servaddr.sin_port = htons(port);
//2 鏈接
int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
if (ret == -1) {
perror("connect");
exit(1);
}
//3 添加讀事件
_loop = loop;
_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}
udp_client::~udp_client()
{
_loop->del_io_event(_sockfd);
close(_sockfd);
}
//處理消息
void udp_client::do_read()
{
while (true) {
int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
if (pkt_len == -1) {
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN) {
break;
}
else {
perror("recvfrom()");
break;
}
}
//處理客戶端包
msg_head head;
memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
//報文格式有問題
fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
continue;
}
//調用注冊的路由業務
_router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
}
}
void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
{
_router.register_msg_router(msgid, cb, user_data);
}
int udp_client::send_message(const char *data, int msglen, int msgid)
{
if (msglen > MESSAGE_LENGTH_LIMIT) {
fprintf(stderr, "too large message to send\n");
return -1;
}
msg_head head;
head.msglen = msglen;
head.msgid = msgid;
memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
if (ret == -1) {
perror("sendto()..");
return -1;
}
return ret;
}
```
? 客戶端和服務端代碼除了構造函數不同,其他基本差不多。接下來我們可以測試一下udp的通信功能
### 12.3 完成Lars Reactor V0.10開發
服務端
> server.cpp
```c
#include <string>
#include <string.h>
#include "config_file.h"
#include "udp_server.h"
//回顯業務的回調函數
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回顯
conn->send_message(data, len, msgid);
}
int main()
{
event_loop loop;
//加載配置文件
config_file::setPath("./serv.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 8888);
printf("ip = %s, port = %d\n", ip.c_str(), port);
udp_server server(&loop, ip.c_str(), port);
//注冊消息業務路由
server.add_msg_router(1, callback_busi);
loop.event_process();
return 0;
}
```
客戶端
> client.cpp
```c
#include <stdio.h>
#include <string.h>
#include "udp_client.h"
//客戶端業務
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服務端回執的數據
char *str = NULL;
str = (char*)malloc(len+1);
memset(str, 0, len+1);
memcpy(str, data, len);
printf("recv server: [%s]\n", str);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//創建udp客戶端
udp_client client(&loop, "127.0.0.1", 7777);
//注冊消息路由業務
client.add_msg_router(1, busi);
//發消息
int msgid = 1;
const char *msg = "Hello Lars!";
client.send_message(msg, strlen(msg), msgid);
//開啟事件監聽
loop.event_process();
return 0;
}
```
啟動服務端和客戶端并允許,結果如下:
server
```bash
$ ./server
ip = 127.0.0.1, port = 7777
msg_router init...
server on 127.0.0.1:7777 is running...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
```
client
```bash
$ ./client
msg_router init...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
```
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本