Skip to content
ithewei edited this page Dec 11, 2021 · 1 revision

libhv教程00--目录

libhv是一个比libevent、libev、libuv更易用的跨平台国产网络库,用来开发TCP/UDP/SSL/HTTP/WebSocket 客户端/服务端。

项目地址:https://github.com/ithewei/libhv.git

码云镜像:https://gitee.com/libhv/libhv.git

QQ技术交流群:739352073

libhv博客专栏:https://hewei.blog.csdn.net/category_9866493.html

libhv源码分析:https://blog.csdn.net/qu1993/category_10637982.html

📚 教程目录

🍭 示例代码

c版本

c++版本

模拟实现著名的命令行工具

创作不易,如果你觉得不错,请在 githubstar下吧。


libhv教程01--介绍与体验

名称由来

libhv是一个类似于libevent、libev、libuv的跨平台网络库,提供了带非阻塞IO和定时器的事件循环。 libhv的名称也正是继承此派,寓意高性能的事件循环High-performance event loop library

libhv能干什么

  • 编写跨平台c/c++程序;
  • 基于TCP/UDP/SSL开发自定义协议网络程序;
  • 编写HTTP客户端/服务端程序;
  • 编写WebSocket客户端/服务端程序;
  • 学习实践网络编程;

libhv和libevent、libev、libuv有什么不同

  • libevent最为古老、有历史包袱,bufferevent虽为精妙,却也难以上手;
  • libev可以说是libevent的简化版,代码极为精简,但宏定义用的过多,代码可读性不强,且在Windows上实现不佳;
  • libuv是nodejs的c底层库,最先也是由libevent+对Windows IOCP支持,后来才改写自成一体,同时实现了管道、文件的异步读写,很强大,但结构体比较多,封装比较深;
  • libhv本身是参考了libevent、libev、libuv的实现思路,它们的核心都是事件循环(即在一个事件循环中处理IO、定时器等事件),但提供的接口最为精简,API接近原生系统调用,最容易上手;
  • 具体这几个库的写法比较见echo-servers,可见libhv是最简单的;
  • 此外libhv支持心跳、转发、拆包、多线程安全write和close等特性,提供了HTTP、WebSocket等协议实现;
  • 当然这几个库的性能是接近的,都将非阻塞IO多路复用用到了极致;

体验

linux与mac下的用户可直接执行./getting_started.sh脚本,即可体验使用libhv编写的http服务端httpd与http客户端curl的便利之处。

运行效果如下:

$ ./getting_started.sh
Welcome to libhv!
Press any key to run ...

bin/httpd -c etc/httpd.conf -s restart -d

httpd start/running
hw               83110   0.0  0.0  5100160    588   ??  S     4:14下午   0:00.00 httpd: worker process
hw               83109   0.0  0.0  4951680    580   ??  S     4:14下午   0:00.00 httpd: worker process
hw               83108   0.0  0.0  4820608    572   ??  S     4:14下午   0:00.00 httpd: worker process
hw               83107   0.0  0.0  4689536    600   ??  S     4:14下午   0:00.00 httpd: worker process
hw               83103   0.0  0.0  4950656    808   ??  Ss    4:14下午   0:00.00 httpd: master process

bin/curl -v localhost:8080

GET / HTTP/1.1
Accept: */*
Connection: keep-alive
Content-Length: 0
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36


HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 130
Content-Type: text/html
Date: Fri, 05 Feb 2021 08:14:14 GMT
Etag: "5fc1057e-82"
Last-Modified: Fri, 27 Nov 2020 13:56:14 GMT
Server: httpd/1.21.2.5

<!DOCTYPE html>
<html>
<head>
  <title>httpd</title>
</head>
<body>
  <center><h1>Welcome to httpd!</h1></center>
</body>
</html>

httpd与curl代码均可在examples目录下找到,是完整的命令行程序。

$ bin/httpd -h
Usage: httpd [hvc:ts:dp:]
Options:

  -h|--help                 Print this information
  -v|--version              Print version
  -c|--confile <confile>    Set configure file, default etc/{program}.conf
  -t|--test                 Test configure file and exit
  -s|--signal <signal>      Send <signal> to process,
                            <signal>=[start,stop,restart,status,reload]
  -d|--daemon               Daemonize
  -p|--port <port>          Set listen port
$ bin/curl -h
Usage: curl [hVvX:H:d:F:n:] url
Options:
    -h|--help           Print this message.
    -V|--version        Print version.
    -v|--verbose        Show verbose infomation.
    -X|--method         Set http method.
    -H|--header         Add http headers, -H "Content-Type:application/json Accept:*/*"
    -d|--data           Set http body.
    -F|--form           Set http form, -F "name1=content;name2=@filename"
    -n|--count          Send request count, used for test keep-alive
       --http2          Use http2
       --grpc           Use grpc over http2
Examples:
    curl -v localhost:8080
    curl -v localhost:8080/hello
    curl -v localhost:8080/query?page_no=1&page_size=10
    curl -v localhost:8080/echo  -d 'hello,world!'
    curl -v localhost:8080/kv    -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
    curl -v localhost:8080/json  -H "Content-Type:application/json"                  -d '{"user":"admin","pswd":"123456"}'
    curl -v localhost:8080/form  -F 'file=@filename'

当然你也可以通过浏览器访问,地址栏输入:

  • 127.0.0.1:8080/
  • 127.0.0.1:8080/downloads/
  • 127.0.0.1:8080/ping /downloads/

libhv教程02--编译与安装

libhv提供了原生Makefile(这里仅指适用于类unix系统的Makefile)和cmake两种构建方式。

Makefile命令行

CLICommand Line Interface命令行界面。鄙人强烈推荐使用的一种,特别是对于服务端开发人员,必备技能。

对于类Unix系统平台来说,推荐使用Makefile三部曲

./configure
make
sudo make install

Windows平台编译libhv请使用cmake先生成VS工程,各平台具体编译步骤见BUILD.md

cmake命令行

cmake -B build
cmake --build build

cmake图形界面(照顾下小白)

libhv-cmake 一般使用默认配置即可,如需勾选WITH_OPENSSL,请先自行安装openssl

vcpkg

也可通过vcpkg安装:

vcpkg install libhv

注:vcpkg上的版本可能更新较慢,如需体验最新版还是推荐下载源码编译。

编译产物

头文件

类unix系统默认安装在/usr/local/include/hv目录下

.
├── Buffer.h                缓存类
├── Callback.h              回调定义
├── Channel.h               IO通道类
├── Event.h                 事件类
├── EventLoop.h             事件循环类
├── EventLoopThread.h       事件循环线程类
├── EventLoopThreadPool.h   事件循环线程池类
├── HttpMessage.h           HTTP消息类
├── HttpParser.h            HTTP解析类
├── HttpServer.h            HTTP服务类
├── HttpService.h           HTTP业务类
├── Status.h                状态类
├── TcpClient.h             TCP客户端类
├── TcpServer.h             TCP服务端类
├── ThreadLocalStorage.h    线程本地存储类
├── UdpClient.h             UDP客户端类
├── UdpServer.h             UDP服务端类
├── WebSocketChannel.h      WebSocket通道类
├── WebSocketClient.h       WebSocket客户端类
├── WebSocketParser.h       WebSocket解析类
├── WebSocketServer.h       WebSocket服务端类
├── base64.h                BASE64编解码
├── grpcdef.h               grpc定义
├── hatomic.h               原子操作
├── hbase.h                 基本函数
├── hbuf.h                  缓存buffer
├── hconfig.h               configure生成配置
├── hdef.h                  常见宏定义
├── hdir.h                  目录(ls实现)
├── hendian.h               大小端
├── herr.h                  错误码定义
├── hexport.h               DLL导出宏
├── hfile.h                 文件类
├── hlog.h                  日志
├── hloop.h                 事件循环
├── hmain.h                 命令行解析
├── hmath.h                 数学函数
├── hmutex.h                互斥锁
├── hobjectpool.h           对象池
├── hplatform.h             平台相关宏
├── hproc.h                 进程
├── hscope.h                作用域
├── hsocket.h               套接字
├── hssl.h                  SSL/TLS加密
├── hstring.h               字符串操作
├── hsysinfo.h              系统信息
├── hthread.h               线程操作
├── hthreadpool.h           线程池类
├── htime.h                 日期时间
├── http2def.h              http2定义
├── http_client.h           HTTP客户端
├── http_content.h          HTTP Content-Type
├── httpdef.h               http定义
├── hurl.h                  URL操作
├── hv.h                    hv总头文件
├── hversion.h              版本
├── ifconfig.h              ifconfig实现
├── iniparser.h             INI解析类
├── json.hpp                JSON解析
├── md5.h                   MD5数字摘要
├── nlog.h                  网络日志
├── nmap.h                  主机发现
├── requests.h              模拟python requests api
├── sha1.h                  SHA1安全散列算法
└── singleton.h             单例模式宏

库文件

  • 静态库libhv.alibhv_static.a
  • windows动态库hv.dll
  • linux动态库libhv.so
  • macosx动态库libhv.dylib

示例程序

├── hmain_test          命令行解析测试程序
├── hloop_test          事件循环测试程序
├── htimer_test         定时器测试程序
├── http_client_test    HTTP客户端测试程序
├── http_server_test    HTTP服务端测试程序
├── websocket_client_test    WebSocket客户端测试程序
├── websocket_server_test    WebSocket服务端测试程序
├── curl                HTTP客户端
├── httpd               HTTP服务端
├── nc                  网络客户端
├── nmap                主机发现
├── tcp_chat_server     TCP聊天服务
├── tcp_echo_server     TCP回显服务
├── tcp_proxy_server    TCP代理服务
└── udp_echo_server     UDP回显服务

另外,仓库通过 Github Actions 确保master分支在linux、windows、macosx三个平台编译通过,大家再也不用担心编译不过了。


libhv教程03--链库与使用

在上一篇中,我们已经生成了头文件与库文件,接下来我们写个测试程序链库验证下。

测试代码如下:

#include "hv/hv.h"

int main() {
    char exe_filepath[MAX_PATH] = {0};
    char run_dir[MAX_PATH] = {0};

    // 获取hv编译版本
    const char* version = hv_compile_version();

    // 获取可执行文件路径
    get_executable_path(exe_filepath, sizeof(exe_filepath));
    // 获取运行目录
    get_run_dir(run_dir, sizeof(run_dir));

    printf("exe_filepath=%s\n", exe_filepath);
    printf("run_dir=%s\n", run_dir);

    // 写日志
    LOGI("libhv version: %s", version);

    return 0;
}

编译运行:

$ cc -std=c99 test.c -o test -lhv
$ ./test
exe_filepath=/home/hw/github/libhv/test
run_dir=/home/hw/github/libhv
$ cat libhv*.log
2021-02-06 00:16:40.989 INFO  libhv version: 1.21.1.31 [test.c:19:main]

windows链库说明

cmake生成vs工程,打开hv.sln编译后会生成头文件include/hv、静态库lib/hv_static.lib和动态库lib/hv.dll,所以有动态库和静态库两种链库方式。

动态导入库hv.lib + 动态库hv.dll

工程 => 属性 => Linker => Input => Addtional Dependencies 加hv.lib 或代码里添加

#pragma comment(lib, "hv.lib")

静态库声明宏HV_STATICLIB + 静态库hv_static.lib

工程 => 属性 => c/c++ => 预处理器 => 预处理器定义中添加HV_STATICLIB预编译宏,以屏蔽hexport.h头文件中动态库导入宏

#define HV_EXPORT __declspec(dllimport)

工程 => 属性 => Linker => Input => Addtional Dependencies 加 hv_static.lib 或代码里添加

#pragma comment(lib, "hv_static.lib")

libhv教程04--编写一个完整的命令行程序

首先,一个完整的命令行程序应该包含哪些功能?

  • 命令行参数解析
  • 配置文件解析
  • 打印帮助信息和版本信息
  • 信号处理
  • 日志、pid文件
  • 如果是服务端长时间运行后台程序,还需要看门狗(崩溃自动重启)

看看libhv是如何提供这些功能的,参考示例代码见examples/hmain_test.cpp

编译运行:

$ c++ -std=c++11 examples/hmain_test.cpp -o bin/hmain_test -I/usr/local/include/hv -lhv

$ bin/hmain_test -h
Usage: hmain_test [hvc:ts:dp:]
Options:

  -h|--help                 Print this information
  -v|--version              Print version
  -c|--confile <confile>    Set configure file, default etc/{program}.conf
  -t|--test                 Test configure file and exit
  -s|--signal <signal>      Send <signal> to process,
                            <signal>=[start,stop,restart,status,reload]
  -d|--daemon               Daemonize
  -p|--port <port>          Set listen port

$ bin/hmain_test -v
hmain_test version 1.21.1.31

$ bin/hmain_test -c etc/hmain_test.conf -t
Test confile [etc/hmain_test.conf] OK!

$ bin/hmain_test -d

$ bin/hmain_test -s restart -d
hmain_test stop/waiting
hmain_test start/running

$ bin/hmain_test -s status
hmain_test start/running, pid=27766

$ cat logs/hmain_test.pid
27776

$ cat logs/hmain_test*.log
2021-02-06 12:18:53.509 INFO  hmain_test version: 1.21.1.31 [hmain_test.cpp:94:parse_confile]
2021-02-06 12:18:53.509 DEBUG worker_processes=ncpu=2 [hmain_test.cpp:103:parse_confile]
2021-02-06 12:18:53.509 INFO  parse_confile('/home/hw/github/libhv/etc/hmain_test.conf') OK [hmain_test.cpp:129:parse_confile]
2021-02-06 12:18:53.509 INFO  create_pidfile('/home/hw/github/libhv/logs/hmain_test.pid') pid=27766 [hmain.cpp:317:create_pidfile]
2021-02-06 12:18:53.509 INFO  workers[0] start/running, pid=27767 [hmain.cpp:611:master_workers_run]
2021-02-06 12:18:53.509 INFO  workers[1] start/running, pid=27768 [hmain.cpp:611:master_workers_run]
2021-02-06 12:18:53.509 INFO  master start/running, pid=27766 [hmain.cpp:614:master_workers_run]
2021-02-06 12:18:53.509 INFO  worker_thread pid=27767 tid=27767 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO  worker_thread pid=27768 tid=27768 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO  worker_thread pid=27767 tid=27769 [hmain.cpp:539:worker_thread]
2021-02-06 12:18:53.510 INFO  worker_thread pid=27768 tid=27770 [hmain.cpp:539:worker_thread]

$ ps aux | grep hmain_test
hw       27776  0.0  0.0  18000  2084 ?        Ss   12:20   0:00 hmain_test: master process
hw       27777  0.0  0.0  91732   240 ?        Sl   12:20   0:00 hmain_test: worker process
hw       27778  0.0  0.0  91732   240 ?        Sl   12:20   0:00 hmain_test: worker process

$ sudo kill -9 27778
$ ps aux | grep hmain_test
hw       27776  0.0  0.0  18000  2084 ?        Ss   12:20   0:00 hmain_test: master process
hw       27777  0.0  0.0  91732   240 ?        Sl   12:20   0:00 hmain_test: worker process
hw       27796  0.0  0.0  91732   244 ?        Sl   12:27   0:00 hmain_test: worker process

可以看到,hmain_test提供了打印帮助信息、打印版本信息、测试配置文件、后台运行、创建pid文件、查看进程状态、开始|停止|重启进程、master-workers多进程模式、崩溃自动重启等功能。

流程图:

st=>start: main
e=>end: 结束

main_ctx_init=>operation: main_ctx_init
main入口初始化
parse_opt=>operation: parse_opt
解析命令行参数
parse_confile=>operation: parse_confile
解析配置文件
hlog_set_xxx=>operation: hlog_set_xxx
日志设置
signal_init=>operation: signal_init
信号初始化
signal_handle=>operation: signal_handle
信号处理
daemon=>operation: daemon
后台运行
create_pidfile=>operation: create_pidfile
创建pid文件
master_workers_run=>operation: master_workers_run
扩展多进程|多线程模式
run=>operation: worker_fn
长时间运行...

st->main_ctx_init->parse_opt->parse_confile->hlog_set_xxx->signal_init->signal_handle->daemon->create_pidfile->master_workers_run->run

libhv教程05--事件循环以及定时器的简单使用

事件循环简介

很多同学不理解事件循环的概念,所以这里有必要前置说明一下。 对于大多数长时间运行程序来说,都会有主循环的存在。

如窗口界面程序,就是等待键盘、鼠标等外设的输入,界面做出相应的变化。 我们以windows窗口消息机制举例说明:

// windows窗口消息循环
MSG msg;
while (GetMessage(&msg, NULL, 0, 0)) {
	TranslateMessage(&msg);
	DispatchMessage(&msg);
}

此循环所在的线程我们称之为GUI线程(即窗口所在线程),MFC、WPF、Qt等界面框架不过是将此过程给封装了。

理解了窗口消息循环的存在,其实就不难理解windows下老生常谈的问题:SendMessagePostMessage的区别。 SendMessagePostMessage都是windows提供的用来向窗口线程发送消息的API。 不同之处是SendMessage是同步的,如果SendMessage调用线程和窗口线程位于同一线程,则直接调用窗口过程处理此消息;如果不是同一线程,则会阻塞等待窗口线程处理完此消息再返回。 PostMessage是异步的,将消息投递到窗口消息队列中就返回了,所以使用PostMessage传递参数时需要注意不能使用栈上的局部变量。

IO多路复用简介

GUI线程具有主循环,网络IO线程亦是如此。

我们都知道IO可分为阻塞BIO非阻塞NIO。 libhv的头文件hsocket.h中提供了跨平台的设置阻塞与非阻塞的方法:

#ifdef OS_WIN
static inline int blocking(int sockfd) {
    unsigned long nb = 0;
    return ioctlsocket(sockfd, FIONBIO, &nb);
}
static inline int nonblocking(int sockfd) {
    unsigned long nb = 1;
    return ioctlsocket(sockfd, FIONBIO, &nb);
}
#else
#define blocking(s)     fcntl(s, F_SETFL, fcntl(s, F_GETFL) & ~O_NONBLOCK)
#define nonblocking(s)  fcntl(s, F_SETFL, fcntl(s, F_GETFL) |  O_NONBLOCK)
#endif

对于BIO,伪代码如下:

while (1) {
    readbytes = read(fd, buf, len);
    if (readbytes <= 0) {
        close(fd);
        break;
    }
    ...
    writebytes = write(fd, buf, len);
    if (writebytes <= 0) {
        close(fd);
        break;
    }
}

因为读写都是阻塞的,所以一个IO线程只能处理一个fd,对于客户端尚可接受,对于服务端来说,每accept一个连接,就创建一个IO线程去读写这个套接字,并发达到几千就需要创建几千个线程,线程上下文的切换开销都会把系统占满。

所以IO多路复用机制应运而生,如最早期的select、后来的polllinuxepollwindowsiocpbsdkqueuesolarisport等,都属于IO多路复用机制。非阻塞NIO搭配IO多路复用机制就是高并发的钥匙

关于select、poll、epoll的区别,可自行百度,这里就不展开说了。仅以select为例,写下伪代码:

while (1) {
    int nselect = select(max_fd+1, &readfds, &writefds, &exceptfds, timeout);
    if (nselect == 0) continue;
    for (int fd = 0; fd <= max_fd; ++fd) {
    	// 可读
    	if (FD_ISSET(fd, &readfds)) {
    		...
    		read(fd, buf, len);
    	}
    	// 可写
    	if (FD_ISSET(fd, &writefds)) {
    		...
    		write(fd, buf, len);
    	}
    }
}

通过IO多路复用机制,一个IO线程就可以同时监听多个fd了,以现代计算机的性能,一个IO线程即可处理几十万数量级别的IO读写。

libhv下的event模块正是封装了多种平台的IO多路复用机制,提供了统一的事件接口,是libhv的核心模块。

libhv中的事件包括IO事件timer定时器事件idle空闲事件自定义事件(见hloop_post_event接口,作用类似于windows窗口消息机制的PostMessage)。

源码分析推荐群友qu1993的博客

使用libhv创建一个事件循环

c版本

#include "hv/hloop.h"

// 定时器回调函数
static void on_timer(htimer_t* timer) {
    printf("time=%lus\n", (unsigned long)time(NULL));
}

int main() {
    // 新建一个事件循环结构体
    hloop_t* loop = hloop_new(0);

    // 添加一个定时器
    htimer_add(loop, on_timer, 1000, INFINITE);

    // 运行事件循环
    hloop_run(loop);

    // 释放事件循环结构体
    hloop_free(&loop);
    return 0;
}

事件循环测试代码examples/hloop_test.c 定时器测试代码见examples/htimer_test.c

c++版本

#include "hv/EventLoop.h"

using namespace hv;

int main() {
    // 新建一个事件循环对象
    EventLoopPtr loop(new EventLoop);

    // 设置一个定时器
    loop->setInterval(1000, [](TimerID timerID){
        printf("time=%lus\n", (unsigned long)time(NULL));
    });

    // 运行事件循环
    loop->run();

    return 0;
}

evpp模块被设计成只包含头文件,不参与编译。 hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。 类设计如下:

├── Buffer.h                缓存类
├── Channel.h               通道类,封装了hio_t
├── Event.h                 事件类,封装了hevent_t、htimer_t
├── EventLoop.h             事件循环类,封装了hloop_t
├── EventLoopThread.h       事件循环线程类,组合了EventLoop和thread
├── EventLoopThreadPool.h   事件循环线程池类,组合了EventLoop和ThreadPool
├── TcpClient.h             TCP客户端类
├── TcpServer.h             TCP服务端类
├── UdpClient.h             UDP客户端类
└── UdpServer.h             UDP服务端类

示例代码位于evpp目录下

多说两句:

  • EventLoop中实现了muduo有的两个接口,runInLoopqueueInLoop,我觉得命名不错,也直接采用了。runInLoop对应SendMessagequeueInLoop对应PostMessage,这么解释大家是不是更理解文章开头的铺垫了;
  • EventLoopThreadPool的核心思想就是one loop per thread;

libhv教程06--创建一个简单的TCP服务端

下文以TCP echo server为例,使用libhv创建TCP服务端。

c版本

#include "hv/hloop.h"

void on_close(hio_t* io) {
}

void on_recv(hio_t* io, void* buf, int readbytes) {
	// 回显数据
    hio_write(io, buf, readbytes);
}

void on_accept(hio_t* io) {
	// 设置close回调
    hio_setcb_close(io, on_close);
    // 设置read回调
    hio_setcb_read(io, on_recv);
    // 开始读
    hio_read(io);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    int port = atoi(argv[1]);

    // 创建事件循环
    hloop_t* loop = hloop_new(0);
    // 创建TCP服务
    hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
    if (listenio == NULL) {
        return -20;
    }
    // 运行事件循环
    hloop_run(loop);
    // 释放事件循环
    hloop_free(&loop);
    return 0;
}

编译运行:

$ cc examples/tcp_echo_server.c -o bin/tcp_echo_server -I/usr/local/include/hv -lhv
$ bin/tcp_echo_server 1234

类unix系统可使用nc作为客户端测试:

$ nc 127.0.0.1 1234
< hello
> hello

windows端可使用telnet作为客户端测试:

$ telent 127.0.0.1 1234

更多TCP服务端示例参考:

c++版本

代码示例参考evpp/TcpServer_test.cpp

#include "hv/TcpServer.h"

using namespace hv;

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    TcpServer srv;
    int listenfd = srv.createsocket(port);
    if (listenfd < 0) {
        return -20;
    }
    printf("server listen on port %d, listenfd=%d ...\n", port, listenfd);
    srv.onConnection = [](const SocketChannelPtr& channel) {
        std::string peeraddr = channel->peeraddr();
        if (channel->isConnected()) {
            printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
        } else {
            printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
        }
    };
    srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
        // echo
        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
        channel->write(buf);
    };
    srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    srv.setThreadNum(4);
    srv.start();

    while (1) sleep(1);
    return 0;
}

编译运行:

$ c++ -std=c++11 evpp/TcpServer_test.cpp -o bin/TcpServer_test -I/usr/local/include/hv -lhv
$ bin/TcpServer_test 5678

TcpServer更多实用接口

  • setThreadNum:设置IO线程数
  • setMaxConnectionNum:设置最大连接数
  • setUnpack:设置拆包
  • withTLS:SSL/TLS加密通信

libhv教程07--创建一个简单的TCP客户端

c版本

#include "hv/hloop.h"
#include "hv/htime.h"

void on_timer(htimer_t* timer) {
    char str[DATETIME_FMT_BUFLEN] = {0};
    datetime_t dt = datetime_now();
    datetime_fmt(&dt, str);

    printf("> %s\n", str);
    // 获取userdata
    hio_t* io = (hio_t*)hevent_userdata(timer);
    // 发送当前时间字符串
    hio_write(io, str, strlen(str));
}

void on_close(hio_t* io) {
}

void on_recv(hio_t* io, void* buf, int readbytes) {
    printf("< %.*s\n", readbytes, (char*)buf);
}

void on_connect(hio_t* io) {
	// 设置close回调
    hio_setcb_close(io, on_close);
    // 设置read回调
    hio_setcb_read(io, on_recv);
    // 开始读
    hio_read(io);

	// 添加一个定时器
    htimer_t* timer = htimer_add(hevent_loop(io), on_timer, 1000, INFINITE);
    // 设置userdata
    hevent_set_userdata(timer, io);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    int port = atoi(argv[1]);

	// 创建事件循环
    hloop_t* loop = hloop_new(0);
    // 创建TCP客户端
    hio_t* listenio = hloop_create_tcp_client(loop, "127.0.0.1", port, on_connect);
    if (listenio == NULL) {
        return -20;
    }
    // 运行事件循环
    hloop_run(loop);
    // 释放事件循环
    hloop_free(&loop);
    return 0;
}

完整TCP/UDP客户端程序可参考examples/nc.c

c++版本

示例代码见:evpp/TcpClient_test.cpp

#include "hv/TcpClient.h"
#include "hv/htime.h"

using namespace hv;

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    TcpClient cli;
    int connfd = cli.createsocket(port);
    if (connfd < 0) {
        return -20;
    }
    printf("client connect to port %d, connfd=%d ...\n", port, connfd);
    cli.onConnection = [](const SocketChannelPtr& channel) {
        std::string peeraddr = channel->peeraddr();
        if (channel->isConnected()) {
            printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
            // send(time) every 3s
            setInterval(3000, [channel](TimerID timerID){
                if (channel->isConnected()) {
                    char str[DATETIME_FMT_BUFLEN] = {0};
                    datetime_t dt = datetime_now();
                    datetime_fmt(&dt, str);
                    channel->write(str);
                } else {
                    killTimer(timerID);
                }
            });
        } else {
            printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
        }
    };
    cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    // reconnect: 1,2,4,8,10,10,10...
    ReconnectInfo reconn;
    reconn.min_delay = 1000;
    reconn.max_delay = 10000;
    reconn.delay_policy = 2;
    cli.setReconnect(&reconn);
    cli.start();

    while (1) sleep(1);
    return 0;
}

TcpClient更多实用接口

  • setConnectTimeout:设置连接超时
  • setReconnect:设置重连
  • setUnpack: 设置拆包
  • withTLS:SSL/TLS加密通信

libhv教程08--创建一个简单的UDP服务端

下文以UDP echo server为例,使用libhv创建UDP服务端。

c版本

代码示例参考examples/udp_echo_server.c

#include "hv/hloop.h"
#include "hv/hsocket.h"

static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("[%s] <=> [%s]\n",
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    printf("< %.*s", readbytes, (char*)buf);
    // 回显数据
    printf("> %.*s", readbytes, (char*)buf);
    hio_write(io, buf, readbytes);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

	// 创建事件循环
    hloop_t* loop = hloop_new(0);
    // 创建UDP服务
    hio_t* io = hloop_create_udp_server(loop, "0.0.0.0", port);
    if (io == NULL) {
        return -20;
    }
    // 设置read回调
    hio_setcb_read(io, on_recvfrom);
    // 开始读
    hio_read(io);
    // 运行事件循环
    hloop_run(loop);
    // 释放事件循环
    hloop_free(&loop);
    return 0;
}

编译运行:

$ cc examples/udp_echo_server.c -o bin/udp_echo_server -I/usr/local/include/hv -lhv
$ bin/udp_echo_server 1234

可使用nc作为客户端测试:

$ nc -u 127.0.0.1 1234
< hello
> hello

c++版本

代码示例参考evpp/UdpServer_test.cpp

#include "hv/UdpServer.h"

using namespace hv;

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    UdpServer srv;
    int bindfd = srv.createsocket(port);
    if (bindfd < 0) {
        return -20;
    }
    printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
    srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
        // echo
        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
        channel->write(buf);
    };
    srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    srv.start();

    while (1) sleep(1);
    return 0;
}

编译运行:

$ c++ -std=c++11 evpp/UdpServer_test.cpp -o bin/UdpServer_test -I/usr/local/include/hv -lhv
$ bin/UdpServer_test 5678

libhv教程09--创建一个简单的UDP客户端

c版本

#include "hv/hloop.h"
#include "hv/htime.h"

void on_timer(htimer_t* timer) {
    char str[DATETIME_FMT_BUFLEN] = {0};
    datetime_t dt = datetime_now();
    datetime_fmt(&dt, str);

    printf("> %s\n", str);
    // 获取userdata
    hio_t* io = (hio_t*)hevent_userdata(timer);
    // 发送时间字符串
    hio_write(io, str, strlen(str));
}

void on_recvfrom(hio_t* io, void* buf, int readbytes) {
    printf("< %.*s\n", readbytes, (char*)buf);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    int port = atoi(argv[1]);

	// 创建事件循环
    hloop_t* loop = hloop_new(0);
    // 创建UDP客户端
    hio_t* io = hloop_create_udp_client(loop, "127.0.0.1", port);
    if (io == NULL) {
        return -20;
    }
    // 设置read回调
    hio_setcb_read(io, on_recvfrom);
    // 开始读
    hio_read(io);
    // 添加一个定时器
    htimer_t* timer = htimer_add(hevent_loop(io), on_timer, 1000, INFINITE);
    // 设置userdata
    hevent_set_userdata(timer, io);
	// 运行事件循环
    hloop_run(loop);
    // 释放事件循环
    hloop_free(&loop);
    return 0;
}

完整TCP/UDP客户端程序可参考examples/nc.c

c++版本

示例代码见:evpp/UdpClient_test.cpp

#include "hv/UdpClient.h"
#include "hv/htime.h"

using namespace hv;

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    UdpClient cli;
    int sockfd = cli.createsocket(port);
    if (sockfd < 0) {
        return -20;
    }
    printf("client sendto port %d, sockfd=%d ...\n", port, sockfd);
    cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
    };
    cli.start();

    // sendto(time) every 3s
    cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
        char str[DATETIME_FMT_BUFLEN] = {0};
        datetime_t dt = datetime_now();
        datetime_fmt(&dt, str);
        cli.sendto(str);
    });

    while (1) sleep(1);
    return 0;
}

libhv教程10--创建一个简单的HTTP服务端

HTTP协议作为本世纪最通用的应用层协议,本文就不加以介绍了,不熟悉的自行阅读awesome-http

简单的HTTP服务端示例

示例代码参考examples/http_server_test.cpp

#include "hv/HttpServer.h"

int main() {
    HttpService router;
    router.GET("/ping", [](HttpRequest* req, HttpResponse* resp) {
        return resp->String("pong");
    });

    router.GET("/data", [](HttpRequest* req, HttpResponse* resp) {
        static char data[] = "0123456789";
        return resp->Data(data, 10);
    });

    router.GET("/paths", [&router](HttpRequest* req, HttpResponse* resp) {
        return resp->Json(router.Paths());
    });

	router.POST("/echo", [](const HttpContextPtr& ctx) {
		return ctx->send(ctx->body(), ctx->type());
	});

    http_server_t server;
    server.port = 8080;
    server.service = &router;
    http_server_run(&server);
    return 0;
}

编译运行:

c++ -std=c++11 examples/http_server_test.cpp -o bin/http_server_test -lhv
bin/http_server_test

测试使用curl或浏览器输入以下url

curl -v http://127.0.0.1:8080/ping
curl -v http://127.0.0.1:8080/data
curl -v http://127.0.0.1:8080/paths
curl -v http://127.0.0.1:8080/echo -d "hello,world"

完整的HTTP服务端示例

完整的http服务端示例代码参考examples/httpd 测试步骤:

git clone https://github.com/ithewei/libhv.git
cd libhv
make httpd curl

bin/httpd -h
bin/httpd -d
#bin/httpd -c etc/httpd.conf -s restart -d
ps aux | grep httpd

# http web service
bin/curl -v localhost:8080

# http indexof service
bin/curl -v localhost:8080/downloads/

# http api service
bin/curl -v localhost:8080/ping
bin/curl -v localhost:8080/echo -d "hello,world!"
bin/curl -v localhost:8080/query?page_no=1\&page_size=10
bin/curl -v localhost:8080/kv   -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
bin/curl -v localhost:8080/json -H "Content-Type:application/json" -d '{"user":"admin","pswd":"123456"}'
bin/curl -v localhost:8080/form -F "user=admin pswd=123456"
bin/curl -v localhost:8080/upload -F "file=@LICENSE"

bin/curl -v localhost:8080/test -H "Content-Type:application/x-www-form-urlencoded" -d 'bool=1&int=123&float=3.14&string=hello'
bin/curl -v localhost:8080/test -H "Content-Type:application/json" -d '{"bool":true,"int":123,"float":3.14,"string":"hello"}'
bin/curl -v localhost:8080/test -F 'bool=1 int=123 float=3.14 string=hello'
# RESTful API: /group/:group_name/user/:user_id
bin/curl -v -X DELETE localhost:8080/group/test/user/123

压力测试

使用apacheab、或者wrk都可以用来做压力测试,一般服务器单线程QPS可轻松达到3w

# sudo apt install apache2-utils
ab -c 100 -n 100000 http://127.0.0.1:8080/

# sudo apt install wrk
wrk -c 100 -t 4 -d 10s http://127.0.0.1:8080/

更多HTTP压力测试工具参考awesome-http-benchmark


libhv教程11--创建一个简单的HTTP客户端

简单的同步HTTP客户端示例

同步http客户端接口模拟实现了pythonrequests

#include "requests.h"

int main() {
    auto resp = requests::get("http://www.example.com");
    if (resp == NULL) {
        printf("request failed!\n");
    } else {
        printf("%d %s\r\n", resp->status_code, resp->status_message());
        printf("%s\n", resp->body.c_str());
    }

    resp = requests::post("127.0.0.1:8080/echo", "hello,world!");
    if (resp == NULL) {
        printf("request failed!\n");
    } else {
        printf("%d %s\r\n", resp->status_code, resp->status_message());
        printf("%s\n", resp->body.c_str());
    }

    return 0;
}

简单的异步HTTP客户端示例

示例代码参考examples/http_client_test.cpp

#include "requests.h"

#include "hthread.h" // import hv_gettid

static void test_http_async_client(int* finished) {
    printf("test_http_async_client request thread tid=%ld\n", hv_gettid());
    HttpRequestPtr req(new HttpRequest);
    req->method = HTTP_POST;
    req->url = "127.0.0.1:8080/echo";
    req->headers["Connection"] = "keep-alive";
    req->body = "this is an async request.";
    req->timeout = 10;
    http_client_send_async(req, [finished](const HttpResponsePtr& resp) {
        printf("test_http_async_client response thread tid=%ld\n", hv_gettid());
        if (resp == NULL) {
            printf("request failed!\n");
        } else {
            printf("%d %s\r\n", resp->status_code, resp->status_message());
            printf("%s\n", resp->body.c_str());
        }
        *finished = 1;
    });
}

int main() {
    int finished = 0;
    test_http_async_client(&finished);

    // demo wait async ResponseCallback
    while (!finished) {
        hv_delay(100);
    }
    printf("finished!\n");

    return 0;
}

完整的HTTP客户端示例

完整的http客户端示例代码参考examples/curl.cpp,模拟实现了curl命令行程序。

测试步骤:

git clone https://github.com/ithewei/libhv.git
cd libhv
make httpd curl

bin/httpd -h
bin/httpd -d
#bin/httpd -c etc/httpd.conf -s restart -d
ps aux | grep httpd

# http web service
bin/curl -v localhost:8080

# http indexof service
bin/curl -v localhost:8080/downloads/

# http api service
bin/curl -v localhost:8080/ping
bin/curl -v localhost:8080/echo -d "hello,world!"
bin/curl -v localhost:8080/query?page_no=1\&page_size=10
bin/curl -v localhost:8080/kv   -H "Content-Type:application/x-www-form-urlencoded" -d 'user=admin&pswd=123456'
bin/curl -v localhost:8080/json -H "Content-Type:application/json" -d '{"user":"admin","pswd":"123456"}'
bin/curl -v localhost:8080/form -F "user=admin pswd=123456"
bin/curl -v localhost:8080/upload -F "file=@LICENSE"

bin/curl -v localhost:8080/test -H "Content-Type:application/x-www-form-urlencoded" -d 'bool=1&int=123&float=3.14&string=hello'
bin/curl -v localhost:8080/test -H "Content-Type:application/json" -d '{"bool":true,"int":123,"float":3.14,"string":"hello"}'
bin/curl -v localhost:8080/test -F 'bool=1 int=123 float=3.14 string=hello'
# RESTful API: /group/:group_name/user/:user_id
bin/curl -v -X DELETE localhost:8080/group/test/user/123

更多HTTP消息使用方式请阅读头文件HttpMessage.h


libhv教程12--创建一个简单的WebSocket服务端

示例代码参考examples/websocket_server_test.cpp

#include "WebSocketServer.h"
#include "EventLoop.h"
#include "htime.h"

using namespace hv;

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    WebSocketServerCallbacks ws;
    ws.onopen = [](const WebSocketChannelPtr& channel, const std::string& url) {
        printf("onopen: GET %s\n", url.c_str());
        // send(time) every 1s
        setInterval(1000, [channel](TimerID id) {
            if (channel->isConnected()) {
                char str[DATETIME_FMT_BUFLEN] = {0};
                datetime_t dt = datetime_now();
                datetime_fmt(&dt, str);
                channel->send(str);
            } else {
                killTimer(id);
            }
        });
    };
    ws.onmessage = [](const WebSocketChannelPtr& channel, const std::string& msg) {
        printf("onmessage: %s\n", msg.c_str());
    };
    ws.onclose = [](const WebSocketChannelPtr& channel) {
        printf("onclose\n");
    };

    websocket_server_t server;
    server.port = port;
    server.ws = &ws;
    websocket_server_run(&server);
    return 0;
}

编译运行:

c++ -std=c++11 examples/websocket_server_test.cpp -o bin/websocket_server_test -I/usr/local/include/hv -lhv
bin/websocket_server_test 8888

测试客户端可使用html/websocket_client.html 或者websocket在线测试


libhv教程13--创建一个简单的WebSocket客户端

WebSocket简介

WebSocket 产生背景

在 WebSocket 协议出现以前,创建一个和服务端进行双通道通信的 web 应用,需要依赖HTTP协议进行不停的轮询,这会导致一些问题:

  • 服务端被迫维持来自每个客户端的大量不同的连接
  • 大量的轮询请求会造成高开销,比如会带上多余的header,造成了无用的数据传输

所以,为了解决这些问题,WebSocket 协议应运而生。

WebSocket 的定义

WebSocket 是一种在单个TCP连接上进行全双工通信的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接, 并进行双向数据传输。

WebSocket 握手过程

客户端请求

GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13

服务器回应

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/

WebSocket 通信协议

WebSocket 通信协议本文居于篇幅,就不展开说明,感兴趣的推荐阅读下面这篇博文:

WebSocket协议:5分钟从入门到精通

示例代码

js示例代码

html/websocket_client.html

function WebSocketTest(url) {
    var ws = new WebSocket(url);
    
    ws.onopen = function() {
        alert("连接已建立");
        ws.send("hello");
    };

    ws.onmessage = function(ev) {
        var received_msg = ev.data;
        console.log("received websocket message: " + received_msg);
    };

    ws.onclose = function() {
        alert("连接已关闭");
    };
}

c++示例代码

libhv提供的WebSocketClient类使用起来与JS的WebSocket一样简单。

示例代码见 examples/websocket_client_test.cpp

#include "WebSocketClient.h"

using namespace hv;

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s url\n", argv[0]);
        return -10;
    }
    const char* url = argv[1];

    WebSocketClient ws;
    ws.onopen = [&ws]() {
        printf("onopen\n");
        ws.send("hello");
    };
    ws.onclose = []() {
        printf("onclose\n");
    };
    ws.onmessage = [](const std::string& msg) {
        printf("onmessage: %s\n", msg.c_str());
    };

    // reconnect: 1,2,4,8,10,10,10...
    ReconnectInfo reconn;
    reconn.min_delay = 1000;
    reconn.max_delay = 10000;
    reconn.delay_policy = 2;
    ws.setReconnect(&reconn);

    ws.open(url);

    while (1) hv_delay(1000);
    return 0;
}

编译运行:

c++ -std=c++11 examples/websocket_client_test.cpp -o bin/websocket_client_test -I/usr/local/include/hv -lhv
bin/websocket_client_test ws://127.0.0.1:8888/

libhv教程14--200行实现一个纯C版jsonrpc框架

使用libhv可以在200行内实现一个完整的jsonrpc框架,这得益于libhv新提供的一个接口 hio_set_unpack设置拆包规则,支持固定包长、分隔符、头部长度字段三种常见的拆包方式,调用该接口设置拆包规则后,内部会根据拆包规则处理粘包与分包,保证回调上来的是完整的一包数据,大大节省了上层处理粘包与分包的成本,该接口具体定义如下:

typedef enum {
    UNPACK_BY_FIXED_LENGTH  = 1,    // 根据固定长度拆包
    UNPACK_BY_DELIMITER     = 2,	// 根据分隔符拆包,如常见的“\r\n”
    UNPACK_BY_LENGTH_FIELD  = 3,    // 根据头部长度字段拆包
} unpack_mode_e;

#define DEFAULT_PACKAGE_MAX_LENGTH  (1 << 21)   // 2M

// UNPACK_BY_DELIMITER
#define PACKAGE_MAX_DELIMITER_BYTES 8

// UNPACK_BY_LENGTH_FIELD
typedef enum {
    ENCODE_BY_VARINT        = 1,				// varint编码
    ENCODE_BY_LITTEL_ENDIAN = LITTLE_ENDIAN,    // 小端编码
    ENCODE_BY_BIG_ENDIAN    = BIG_ENDIAN,       // 大端编码
} unpack_coding_e;

typedef struct unpack_setting_s {
    unpack_mode_e   mode; // 拆包模式
    unsigned int    package_max_length; // 最大包长度限制
    // UNPACK_BY_FIXED_LENGTH
    unsigned int    fixed_length; // 固定包长度
    // UNPACK_BY_DELIMITER
    unsigned char   delimiter[PACKAGE_MAX_DELIMITER_BYTES]; // 分隔符
    unsigned short  delimiter_bytes; // 分隔符长度
    // UNPACK_BY_LENGTH_FIELD
    unsigned short  body_offset; // body偏移量(即头部长度)real_body_offset = body_offset + varint_bytes - length_field_bytes
    unsigned short  length_field_offset; // 头部长度字段偏移量
    unsigned short  length_field_bytes; // 头部长度字段所占字节数
    unpack_coding_e length_field_coding; // 头部长度字段编码方式,支持varint、大小端三种编码方式,通常使用大端字节序(即网络字节序)
#ifdef __cplusplus
    unpack_setting_s() {
        // Recommended setting:
        // head = flags:1byte + length:4bytes = 5bytes
        mode = UNPACK_BY_LENGTH_FIELD;
        package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
        fixed_length = 0;
        delimiter_bytes = 0;
        body_offset = 5;
        length_field_offset = 1;
        length_field_bytes = 4;
        length_field_coding = ENCODE_BY_BIG_ENDIAN;
    }
#endif
} unpack_setting_t;

HV_EXPORT void hio_set_unpack(hio_t* io, unpack_setting_t* setting);

ftp为例(分隔符方式)可以这样设置:

unpack_setting_t ftp_unpack_setting;
memset(&ftp_unpack_setting, 0, sizeof(unpack_setting_t));
ftp_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
ftp_unpack_setting.mode = UNPACK_BY_DELIMITER;
ftp_unpack_setting.delimiter[0] = '\r';
ftp_unpack_setting.delimiter[1] = '\n';
ftp_unpack_setting.delimiter_bytes = 2;

mqtt为例(头部长度字段方式)可以这样设置:

unpack_setting_t mqtt_unpack_setting = {
    .mode = UNPACK_BY_LENGTH_FIELD,
    .package_max_length = DEFAULT_PACKAGE_MAX_LENGTH,
    .body_offset = 2,
    .length_field_offset = 1,
    .length_field_bytes = 1,
    .length_field_coding = ENCODE_BY_VARINT,
};

具体实现代码在event/unpack.c中,在内部readbuf的基础上直接原地拆包与组包,基本做到零拷贝,比抛给上层处理更高效,感兴趣的可以研究一下。

示例代码

examples/jsonrpc

#include "hloop.h"
#include "hbase.h"
#include "hsocket.h"

#include "jsonrpc.h"
#include "cJSON.h"
#include "router.h"
#include "handler.h"

// hloop_create_tcp_server -> on_accept -> hio_read -> on_recv -> hio_write

static unpack_setting_t jsonrpc_unpack_setting;

jsonrpc_router router[] = {
    {"add", do_add},
    {"sub", do_sub},
    {"mul", do_mul},
    {"div", do_div},
};
#define JSONRPC_ROUTER_NUM  (sizeof(router)/sizeof(router[0]))

static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}

static void on_recv(hio_t* io, void* readbuf, int readbytes) {
    // unpack -> cJSON_Parse -> router -> cJSON_Print -> pack -> hio_write
    // unpack
    jsonrpc_message msg;
    memset(&msg, 0, sizeof(msg));
    int packlen = jsonrpc_unpack(&msg, readbuf, readbytes);
    if (packlen < 0) {
        printf("jsonrpc_unpack failed!\n");
        return;
    }
    assert(packlen == readbytes);

    // cJSON_Parse
    printf("> %.*s\n", msg.head.length, msg.body);
    cJSON* jreq = cJSON_ParseWithLength(msg.body, msg.head.length);
    cJSON* jres = cJSON_CreateObject();
    cJSON* jmethod = cJSON_GetObjectItem(jreq, "method");
    if (!jmethod || !cJSON_IsString(jmethod)) {
        bad_request(jreq, jres);
    } else {
        // router
        char* method = cJSON_GetStringValue(jmethod);
        bool found = false;
        for (int i = 0; i < JSONRPC_ROUTER_NUM; ++i) {
            if (strcmp(method, router[i].method) == 0) {
                found = true;
                router[i].handler(jreq, jres);
                break;
            }
        }
        if (!found) {
            not_found(jreq, jres);
        }
    }

    // cJSON_Print
    memset(&msg, 0, sizeof(msg));
    msg.body = cJSON_PrintUnformatted(jres);
    msg.head.length = strlen(msg.body);
    printf("< %.*s\n", msg.head.length, msg.body);

    // pack
    packlen = jsonrpc_package_length(&msg.head);
    unsigned char* writebuf = NULL;
    HV_ALLOC(writebuf, packlen);
    packlen = jsonrpc_pack(&msg, writebuf, packlen);
    if (packlen > 0) {
        hio_write(io, writebuf, packlen);
    }

    cJSON_Delete(jreq);
    cJSON_Delete(jres);
    cJSON_free((void*)msg.body);
    HV_FREE(writebuf);
}

static void on_accept(hio_t* io) {
    printf("on_accept connfd=%d\n", hio_fd(io));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    hio_set_unpack(io, &jsonrpc_unpack_setting);
    hio_read(io);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    // init jsonrpc_unpack_setting
    memset(&jsonrpc_unpack_setting, 0, sizeof(unpack_setting_t));
    jsonrpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
    jsonrpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
    jsonrpc_unpack_setting.body_offset = JSONRPC_HEAD_LENGTH;
    jsonrpc_unpack_setting.length_field_offset = 1;
    jsonrpc_unpack_setting.length_field_bytes = 4;
    jsonrpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;

    hloop_t* loop = hloop_new(0);
    hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
    if (listenio == NULL) {
        return -20;
    }
    printf("listenfd=%d\n", hio_fd(listenio));
    hloop_run(loop);
    hloop_free(&loop);
    return 0;
}

关键函数

  • hloop_new:创建事件循环
  • hloop_run: 运行事件循环
  • hloop_create_tcp_server:创建TCP服务
  • hio_set_unpack:设置拆包规则
  • hio_read:开始接收数据
  • hio_write: 发送数据
  • jsonrpc_unpack:拆包
  • jsonrpc_pack:组包
  • cJSON_xxx:json编解码

测试步骤

git clone https://github.com/ithewei/libhv
cd libhv
make jsonrpc
# mkdir build && cd build && cmake .. && cmake --build . --target jsonrpc
bin/jsonrpc_server 1234
bin/jsonrpc_client 127.0.0.1 1234 add 1 2
bin/jsonrpc_client 127.0.0.1 1234 div 1 0
bin/jsonrpc_client 127.0.0.1 1234 xyz 1 2

结果如下: 服务端:

$ bin/jsonrpc_server 1234
listenfd=4
on_accept connfd=7
> {"id":1,"method":"add","params":[1,2]}
< {"id":1,"result":3}
on_close fd=7 error=0

客户端:

$ bin/jsonrpc_client 127.0.0.1 1234 add 1 2
on_connect fd=4
> {"id":1,"method":"add","params":[1,2]}
< {"id":1,"result":3}
on_close fd=4 error=0
$ bin/jsonrpc_client 127.0.0.1 1234 div 1 0
on_connect fd=4
> {"id":1,"method":"div","params":[1,0]}
< {"id":1,"error":{"code":400,"message":"Bad Request"}}
on_close fd=4 error=0
$ bin/jsonrpc_client 127.0.0.1 1234 xyz 1 2
on_connect fd=4
> {"id":1,"method":"xyz","params":[1,2]}
< {"id":1,"error":{"code":404,"message":"Not Found"}}
on_close fd=4 error=0

libhv教程15--200行实现一个C++版protorpc框架

在上篇教程中,我们200行实现了一个纯C版的jsonrpc框架,使用的event模块+cJSON实现,本篇中我们将介绍200行实现一个C++版的protorpc框架,使用evpp模块+protobuf实现。

evpp模块是event模块的c++封装,具体介绍见evpp/README.md

protobuf是google出品的序列化/反序列化结构化数据存储格式,具体介绍可参考我的另一篇博客protobuf,也可参考protobuf官方文档

protobuf安装

git clone https://github.com/protocolbuffers/protobuf
cd protobuf
./autogen.sh
./configure
make
sudo make install
sudo ldconfig

which protoc
protoc -h

protorpc代码

exmaples/protorpc

#include "TcpServer.h"

using namespace hv;

#include "protorpc.h"
#include "router.h"
#include "handler/handler.h"
#include "handler/calc.h"
#include "handler/login.h"

protorpc_router router[] = {
    {"add", calc_add},
    {"sub", calc_sub},
    {"mul", calc_mul},
    {"div", calc_div},
    {"login", login},
};
#define PROTORPC_ROUTER_NUM  (sizeof(router)/sizeof(router[0]))

class ProtoRpcServer : public TcpServer {
public:
    ProtoRpcServer() : TcpServer()
    {
        onConnection = [](const SocketChannelPtr& channel) {
            std::string peeraddr = channel->peeraddr();
            if (channel->isConnected()) {
                printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
            } else {
                printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
            }
        };
        onMessage = handleMessage;
        // init protorpc_unpack_setting
        unpack_setting_t protorpc_unpack_setting;
        memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
        protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
        protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
        protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
        protorpc_unpack_setting.length_field_offset = 1;
        protorpc_unpack_setting.length_field_bytes = 4;
        protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
        setUnpack(&protorpc_unpack_setting);
    }

    int listen(int port) { return createsocket(port); }

private:
    static void handleMessage(const SocketChannelPtr& channel, Buffer* buf) {
        // unpack -> Request::ParseFromArray -> router -> Response::SerializeToArray -> pack -> Channel::write
        // protorpc_unpack
        protorpc_message msg;
        memset(&msg, 0, sizeof(msg));
        int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
        if (packlen < 0) {
            printf("protorpc_unpack failed!\n");
            return;
        }
        assert(packlen == buf->size());

        // Request::ParseFromArray
        protorpc::Request req;
        protorpc::Response res;
        if (req.ParseFromArray(msg.body, msg.head.length)) {
            printf("> %s\n", req.DebugString().c_str());
            res.set_id(req.id());
            // router
            const char* method = req.method().c_str();
            bool found = false;
            for (int i = 0; i < PROTORPC_ROUTER_NUM; ++i) {
                if (strcmp(method, router[i].method) == 0) {
                    found = true;
                    router[i].handler(req, &res);
                    break;
                }
            }
            if (!found) {
                not_found(req, &res);
            }
        } else {
            bad_request(req, &res);
        }

        // Response::SerializeToArray + protorpc_pack
        memset(&msg, 0, sizeof(msg));
        msg.head.length = res.ByteSizeLong();
        packlen = protorpc_package_length(&msg.head);
        unsigned char* writebuf = NULL;
        HV_ALLOC(writebuf, packlen);
        packlen = protorpc_pack(&msg, writebuf, packlen);
        if (packlen > 0) {
            printf("< %s\n", res.DebugString().c_str());
            res.SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
            channel->write(writebuf, packlen);
        }
        HV_FREE(writebuf);
    }
};

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);

    ProtoRpcServer srv;
    int listenfd = srv.listen(port);
    if (listenfd < 0) {
        return -20;
    }
    printf("protorpc_server listen on port %d, listenfd=%d ...\n", port, listenfd);
    srv.setThreadNum(4);
    srv.start();

    while (1) hv_sleep(1);
    return 0;
}

流程很清晰,启动一个TcpServer,监听指定端口,通过setUnpack接口设置拆包规则,onMessage回调上来就是完整的一包数据,回调里调用protorpc_unpack拆包、Request::ParseFromArray反序列化得到结构化的请求,通过请求里的method字段查找注册好的router路由表,调用对应的handler处理请求、填充响应,然后Response::SerializeToArray 序列化响应+protorpc_pack 加上头部封包后,最后调用Channel::write发送出去。

base.proto定义如下:

syntax = "proto3";

package protorpc;

message Error {
    int32   code    = 1;
    string  message = 2;
}

message Request {
    uint64  id      = 1;
    string  method  = 2;
    repeated bytes params  = 3;
}

message Response {
    uint64  id      = 1;
    optional bytes  result  = 2;
    optional Error  error   = 3;
}

执行该目录下的protoc.sh会调用protoc根据proto定义文件自动生成对应代码。

测试步骤

git clone https://github.com/ithewei/libhv
cd libhv
make protorpc
bin/protorpc_server 1234
bin/protorpc_client 127.0.0.1 1234 add 1 2
bin/protorpc_client 127.0.0.1 1234 div 1 0
bin/protorpc_client 127.0.0.1 1234 xyz 1 2

结果如下: 服务端:

$ bin/protorpc_server 1234
protorpc_server listen on port 1234, listenfd=3 ...

客户端:

$ bin/protorpc_client 127.0.0.1 1234 add 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"

login success!
user_id: 123456
token: "admin:123456"

id: 2
method: "add"
params: "\010\001"
params: "\010\002"

calc success!
1 add 2 = 3
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 div 1 0
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"

login success!
user_id: 123456
token: "admin:123456"

id: 2
method: "div"
params: "\010\001"
params: ""

RPC error:
code: 400
message: "Bad Request"

calc failed!
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 xyz 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"

login success!
user_id: 123456
token: "admin:123456"

id: 2
method: "xyz"
params: "\010\001"
params: "\010\002"

RPC error:
code: 404
message: "Not Found"

calc failed!
disconnected to 127.0.0.1:1234! connfd=4

libhv教程16--多线程/多进程服务端编程

本篇介绍服务端编程的多线程/多进程模式以及使用libhv如何实现。

one thread per connection:每个连接一个线程

早期的apache就是采用这种模式,用于学习服务端编程的Tinyhttpd也是这种模式。

伪代码如下:

void* worker_thread(void* userdata) {
	int fd = (intptr_t)userdata;
	while (1) {
	    readbytes = read(fd, buf, len);
	    if (readbytes <= 0) {
	        close(fd);
	        break;
	    }
	    ...
	    writebytes = write(fd, buf, len);
	    if (writebytes <= 0) {
	        close(fd);
	        break;
	    }
	}
}

void* accept_thread(void* userdata) {
	int listenfd = (intptr_t)userdata;
	while (1) {
		int connfd = accept(listenfd, &sockaddr, &socklen);
		// 创建一个线程为这个连接服务
		pthread_create(worker_thread, (void *)(intptr_t)connfd);
	}
}

这种模式的缺点显而易见,因为读写都是阻塞的,所以一个IO线程只能处理一个fd,对于客户端尚可接受,对于服务端来说,每accept一个连接,就创建一个IO线程去读写这个套接字,并发达到几千就需要创建几千个线程,线程上下文的切换开销都会把系统占满。

one loop per thread:每个线程一个事件循环

因为BIO(阻塞IO)的局限性,所以IO多路复用机制应运而生,如最早期的select、后来的polllinux的epollwindows的iocpbsd的kqueuesolaris的port等,都属于IO多路复用机制。非阻塞NIO搭配IO多路复用机制就是高并发的钥匙

每个线程里运行一个事件循环,每个事件循环里通过IO多路复用机制(即select、poll、epoll、kqueue等)监听读写事件,这正是libevent、libev、libuv、libhv这类事件循环库的核心思想。

select为例,伪代码如下:

void event_loop_run() {
	while (1) {
	    int nselect = select(max_fd+1, &readfds, &writefds, &exceptfds, timeout);
	    if (nselect == 0) continue;
	    for (int fd = 0; fd <= max_fd; ++fd) {
	    	// 可读
	    	if (FD_ISSET(fd, &readfds)) {
	    		...
	    		read(fd, buf, len);
	    	}
	    	// 可写
	    	if (FD_ISSET(fd, &writefds)) {
	    		...
	    		write(fd, buf, len);
	    	}
	    }
	}
}

通过IO多路复用机制,一个IO线程就可以同时监听多个fd了,以现代计算机的性能,一个IO线程即可处理几十万数量级别的IO读写。redis就是单线程的,但可轻松达到几万QPS

为了充分利用现代计算机的多核处理器,掌握多线程服务端编程也就必不可少了。 libhvexamples/multi-thread 目录下给出了几种常见的多线程/多进程模式的具体写法。

multi-acceptor-processes:多accept进程模式

/*
 *
 * @build   make examples
 * @server  bin/multi-acceptor-processes 1234
 * @client  bin/nc 127.0.0.1 1234
 *          nc     127.0.0.1 1234
 *          telnet 127.0.0.1 1234
 */

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"
#include "hproc.h"

static const char* host = "0.0.0.0";
static int port = 1234;
static int process_num = 4;
static int listenfd = INVALID_SOCKET;

static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}

static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}

static void on_accept(hio_t* io) {
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("pid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_getpid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    hio_read(io);
}

static void loop_proc(void* userdata) {
    hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    haccept(loop, listenfd, on_accept);
    hloop_run(loop);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    port = atoi(argv[1]);

    listenfd = Listen(port, host);
    if (listenfd < 0) {
        exit(1);
    }

    proc_ctx_t ctx;
    memset(&ctx, 0, sizeof(ctx));
    ctx.proc = loop_proc;
    for (int i = 0; i < process_num; ++i) {
        hproc_spawn(&ctx);
    }

    while(1) hv_sleep(1);

    return 0;
}

关键之处就是通过hproc_spawn(linux下就是调用fork)衍生子进程,每个进程里运行一个事件循环(hloop_run),accept请求,将连接上来的fd加入到IO多路复用中,监听读写事件。

多进程模式的好处就是父进程可以通过捕获SIGCHLD信号,即可知道子进程退出了(通常是异常崩溃了),然后重新fork一个子进程,也即是崩溃自动重启功能。而且因为进程空间的隔离,一个子进程的崩溃不会影响其它的子进程,导致所有服务进程都不可用,所以鲁棒性比较强,nginx就是采用的多进程模式。libhv中提供的httpd示例也是如此。httpd 注:libhv中提供了一个接口master_workers_run来实现这种带崩溃自动重启的master-workers多进程模式,具体示例可参考 examples/hmain_test.cpp

multi-acceptor-threads:多accept线程模式

/*
 *
 * @build   make examples
 * @server  bin/multi-acceptor-threads 1234
 * @client  bin/nc 127.0.0.1 1234
 *          nc     127.0.0.1 1234
 *          telnet 127.0.0.1 1234
 */

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"

static const char* host = "0.0.0.0";
static int port = 1234;
static int thread_num = 4;
static int listenfd = INVALID_SOCKET;

static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}

static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}

static void on_accept(hio_t* io) {
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    hio_read(io);
}

static HTHREAD_RETTYPE loop_thread(void* userdata) {
    hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    haccept(loop, listenfd, on_accept);
    hloop_run(loop);
    return 0;
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    port = atoi(argv[1]);

    listenfd = Listen(port, host);
    if (listenfd < 0) {
        exit(1);
    }

    for (int i = 0; i < thread_num; ++i) {
        hthread_create(loop_thread, NULL);
    }

    while(1) hv_sleep(1);

    return 0;
}

和上面多进程类似,只不过是调用了hthread_create(linux下就是pthread_create)创建线程而不是进程,然后每个线程里运行一个事件循环(hloop_run)。

多线程模式相对于多进程模式的好处就是共享进程空间,也即是共享数据,而不是每个进程一份资源,当然这也带来了多线程同步的麻烦。

one-acceptor-multi-workers:一个accept线程+多worker线程

/*
 *
 * @build   make examples
 * @server  bin/one-acceptor-multi-workers 1234
 * @client  bin/nc 127.0.0.1 1234
 *          nc     127.0.0.1 1234
 *          telnet 127.0.0.1 1234
 */

#include "hloop.h"
#include "hsocket.h"
#include "hthread.h"

static const char* host = "0.0.0.0";
static int port = 1234;
static int thread_num = 4;
static hloop_t*  accept_loop = NULL;
static hloop_t** worker_loops = NULL;

static hloop_t* get_next_loop() {
    static int s_cur_index = 0;
    if (s_cur_index == thread_num) {
        s_cur_index = 0;
    }
    return worker_loops[s_cur_index++];
}

static void on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
}

static void on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
}

static void new_conn_event(hevent_t* ev) {
    hloop_t* loop = ev->loop;
    hio_t* io = (hio_t*)hevent_userdata(ev);
    hio_attach(loop, io);

    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    hio_read(io);
}

static void on_accept(hio_t* io) {
    hio_detach(io);

    hloop_t* worker_loop = get_next_loop();
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    ev.loop = worker_loop;
    ev.cb = new_conn_event;
    ev.userdata = io;
    hloop_post_event(worker_loop, &ev);
}

static HTHREAD_RETTYPE worker_thread(void* userdata) {
    hloop_t* loop = (hloop_t*)userdata;
    hloop_run(loop);
    return 0;
}

static HTHREAD_RETTYPE accept_thread(void* userdata) {
    hloop_t* loop = (hloop_t*)userdata;
    hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);
    if (listenio == NULL) {
        exit(1);
    }
    hloop_run(loop);
    return 0;
}

int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: cmd port\n");
        return -10;
    }
    port = atoi(argv[1]);

    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
    for (int i = 0; i < thread_num; ++i) {
        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
        hthread_create(worker_thread, worker_loops[i]);
    }

    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
    accept_thread(accept_loop);

    return 0;
}

这种模式相对于上面accept(listenfd, ...)read/write(connfd, ...)都在一个线程里,多了一个步骤,当accept线程接收到一个连接时,需要挑选一个worker_loop(示例里就是简单的轮询策略,实际应用里可能还有根据最少连接数IP hashURL hash负载均衡策略),然后通知该worker_loop有新的连接到来,libhv里是通过hloop_post_event这个接口来进行事件循环间通信的,这个接口是多线程安全的,内部实现是预先创建一对socketpair,向一个fd写入,监听另外一个fd可读,感兴趣的可以读下 event/hloop.c 源码。

这种连接分发模式其实是当下比较流行的模式,因为比较方便控制负载均衡,不会出现一部分线程饱和,一部分线程饥饿的现象,memcached即是使用libevent实现的这种模式,但是因为libevent并没有提供hloop_post_event类似接口,所以需要用户自己实现,而且libevent也没有集成openssl、也没有提供拆包组包等相关功能,所以memcached里网络编程的代码并不清晰好读,如果使用libhv,我想事情会变得超级简单。

last

以上只是展示了网络IO相关的多线程模式,实际掺合业务所涉及的多线程编程比这个更加复杂,IO线程里是不允许做太多耗时操作的(一般只做接收/发送、轻量级的拆包/组包、序列化/反序列化操作),否则会影响线程里其他连接的读写,所以如果涉及CPU密集型计算,如音视频编解码、人脸识别、运动追踪等算法检测,则需要配合队列(根据业务可能叫消息队列、请求队列、任务队列、帧缓存等)+ 消费者线程/线程池 (如请求处理线程、任务执行线程、编解码线程等)使用。好在libhvhio_writehio_close都是多线程安全的,这可以让网络IO事件循环线程里接收数据、拆包组包、反序列化后放入队列,消费者线程从队列里取出数据、处理后发送响应和关闭连接,变得更加简单。

更多后续教程敬请期待

Clone this wiki locally