libevent高并发网络编程 - 06_基于libevent的C++线程池实现
文章目录
- 1 功能简介
- 线程池的初始化
- 线程池执行流程
- 2 线程池类的设计
- 线程类XThread
- XThread.h
- XThread.cpp
- 线程池类XThreadPool
- XThreadPool.h
- XThreadPool.cpp
- 任务基类task
- XTask.h
- 3 自定义任务的例子
- 自定义任务类ServerCMD
- ServerCMD.h
- ServerCMD.cpp
- 测试程序
- 运行效果
1 功能简介
本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。
-
工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。
-
任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。
线程池的初始化

线程池执行流程

2 线程池类的设计
线程类XThread
线程类的接口功能
Start() -> 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() -> 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() -> 此函数只进入事件循环,等待事件循环退出Notify() -> 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() -> 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() -> 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。调用多次Activate表示加入多个任务,任务顺序被执行。
XThread.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>using namespace std;XThread::XThread()
{}XThread::~XThread()
{}//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用: 管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{XThread *th = (XThread*)arg;th->Notify(fd, which);
}/*
* 函数名: XThread::Start
* 作用: 启动线程
* 解释: 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{//安装线程,初始化event_base和管道监听事件用于激活Setup();//启动线程thread th(&XThread::Main, this);//线程分离th.detach();
}/*
* 函数名: XThread::Main
* 作用: 线程入口函数
* 解释: 此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{cout << id << " XThread::Main() begin" << endl;event_base_dispatch(base); //进入事件循环event_base_free(base);cout << id << " XThread::Main() end" << endl;
}/*
* 函数名: XThread::Setup
* 作用: 安装线程
* 解释: 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{//windows用配对socket linux用管道//创建的管道int fds[2];if(pipe(fds)){cerr << "pipe failed!" << endl;return false; }//读取绑定到event事件中,写入要保存//保存管道的写fdnotify_send_fd = fds[1];//创建一个新的事件处理器对象this->base = event_base_new();//创建一个新的事件对象//添加管道监听事件读fd,用于激活线程执行任务event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);//将事件对象(struct event)添加到指定的事件处理器(event_base)中event_add(ev, 0);return true;
}/*
* 函数名: XThread::Notify
* 作用: 线程激活执行任务
* 解释: 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{//水平触发 只要没有接受完成,会再次进来char buf[2] = {0};int len = read(fd, buf, 1);if (len <= 0)return;cout << id << " thread " << buf << endl;//获取任务,并初始化任务XTask* task = NULL;tasks_mutex.lock();if(tasks.empty()){ //队列为空tasks_mutex.unlock();return;}task = tasks.front(); //先进先出tasks.pop_front();tasks_mutex.unlock();task->Init();
}/*
* 函数名: XThread::Activate
* 作用: 激活线程
* 解释: 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
* 调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{char act[10] = {0};int len = write(this->notify_send_fd, "c", 1);if (len <= 0){cerr << "XThread::Activate() failed!" << endl;}cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}/*
* 函数名: XThread::AddTask
* 作用: 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{if(!task)return;task->base = this->base;tasks_mutex.lock();tasks.push_back(task);tasks_mutex.unlock();
}
线程池类XThreadPool
线程类的接口功能
GetInstance() -> 单例模式创建返回唯一对象
Init() -> 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
Dispatch() -> 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
XThreadPool.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>using namespace std;//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;/*
* 函数名: XThreadPool::GetInstance
* 作用: 单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{//当需要使用对象时,访问instance 的值//空值:创建对象,并用instance 标记//非空值: 返回instance 标记的对象if( pInstance == NULL ){pInstance = new XThreadPool();}return pInstance;
}/*
* 函数名: XThreadPool::Init
* 作用: 初始化所有线程并启动线程
* 解释: 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
*/
void XThreadPool::Init(int threadCount)
{this->threadCount = threadCount;this->lastThread = -1;for (int i = 0; i < threadCount; i++){XThread *t = new XThread();t->id = i + 1;cout << "Create thread " << i << endl;//启动线程t->Start();threads.push_back(t);this_thread::sleep_for(std::chrono::microseconds(10));}
}/*
* 函数名: XThreadPool::Dispatch
* 作用: 分发线程
* 解释: 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。
*/
void XThreadPool::Dispatch(XTask* task)
{//轮询if(!task)return;int tid = (lastThread + 1) % threadCount;lastThread = tid;cout << "lastThread:" << lastThread << endl;XThread *XTh = threads[tid];//添加任务XTh->AddTask(task);//线程激活XTh->Activate();
}
任务基类task
XTask.h
#pragma once
#include <iostream>class XTask
{
public://事件处理器对象struct event_base* base = NULL;//客户端连接的socketint sock = 0;//初始化任务 纯虚函数virtual bool Init() = 0;
};
3 自定义任务的例子
自定义任务类ServerCMD
线程类的接口功能
Init() -> 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() -> 读事件回调函数
EventCB() -> 客户端超时未发请求,断开连接退出任务
ServerCMD.h
#pragma once#include "XTask.h"class XFtpServerCMD : public XTask
{
public://初始化任务virtual bool Init();XFtpServerCMD();~XFtpServerCMD();
};
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>using namespace std;/*
* 函数名: EventCB
* 作用: 超时事件回调函数
* 解释: 客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;//如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)){cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;bufferevent_free(bev);delete cmd;}
}/*
* 函数名: ReadCB
* 作用: 读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;char data[1024] = {0};for (;;){int len = bufferevent_read(bev, data, sizeof(data)-1);if(len <= 0)break;data[len] = '\0';cout << data << endl << flush;//测试代码,要清理掉if(strstr(data, "quit")){bufferevent_free(bev);delete cmd;break;}}
}/*
* 函数名: XFtpServerCMD::Init
* 作用: 初始化任务
* 解释: 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{cout << "XFtpServerCMD::Init() sock:" << sock << endl;//监听socket bufferevent// base socketbufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);bufferevent_enable(bev, EV_READ | EV_WRITE);//添加超时timeval rt = {10, 0}; //10秒bufferevent_set_timeouts(bev, &rt, 0); //设置读超时回调函数return true;
}XFtpServerCMD::XFtpServerCMD()
{}XFtpServerCMD::~XFtpServerCMD()
{}
测试程序
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>#include "XFtpServerCMD.h"using namespace std;
#define SPORT 5001/*
* 函数名: listen_cb
* 作用: 接收到连接的回调函数
* 解释: 通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{cout << "listen_cb" << endl;XTask* task = new XFtpServerCMD();task->sock = s;XThreadPool::GetInstance()->Dispatch(task);
}int main()
{//忽略管道信号,发送数据给已关闭的socketif (signal(SIGPIPE, SIG_IGN) == SIG_ERR)return 1;//1 初始化线程池XThreadPool::GetInstance()->Init(5);std::cout << "test thread pool!\n"; //创建libevent的上下文event_base* base = event_base_new();if (base){cout << "event_base_new success!" << endl;}//监听端口//socket ,bind,listen 绑定事件sockaddr_in sin;memset(&sin, 0, sizeof(sin));sin.sin_family = AF_INET;sin.sin_port = htons(SPORT);evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文listen_cb, //接收到连接的回调函数base, //回调函数获取的参数 argLEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket10, //连接队列大小,对应listen函数(sockaddr*)&sin, //绑定的地址和端口sizeof(sin));//事件分发处理if(base)event_base_dispatch(base);if(ev)evconnlistener_free(ev);if(base)event_base_free(base);return 0;
}
运行效果
初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。

相关文章:
libevent高并发网络编程 - 06_基于libevent的C++线程池实现
文章目录 1 功能简介线程池的初始化线程池执行流程 2 线程池类的设计线程类XThreadXThread.hXThread.cpp 线程池类XThreadPoolXThreadPool.hXThreadPool.cpp 任务基类taskXTask.h 3 自定义任务的例子自定义任务类ServerCMDServerCMD.hServerCMD.cpp 测试程序运行效果 1 功能简介…...
【Java EE 初阶】线程安全及死锁解决方案
目录 1.多线程下线程不安全的问题 1.使用多个线程对Array List集合进行添加操作并打印,查看结果 2.如何在多线程环境下使用线程安全的集合类 CopyOnWriteArrayList 3.多线程环境下使用队列 4.多线程环境下使用哈希表 1.HashTable线程安全 2.Concurrent Hash M…...
C语言函数大全-- _w 开头的函数(5)
C语言函数大全 本篇介绍C语言函数大全-- _w 开头的函数 1. _wspawnl 1.1 函数说明 函数声明函数功能int _wspawnl(int mode, const wchar_t* cmdname, const wchar_t* arglist, ...);启动一个新的进程并运行指定的可执行文件 参数: mode : 启动命令的…...
机械大专生能学会云计算吗,完全零基础的
机械大专生能学会云计算吗,完全零基础的 正常来说,大专及以上学历都能学会云计算,但是会和满足就业需求是两回事哈。如果你想通过学习就业,就需要根据当下相关岗位的普遍技术需求以及其他方面的要求,来针对性的学习和提…...
腾讯云EdgeOne为什么能让客户降本增效?
随着数字化时代的来临,各类线上互动场景不断出现,并成为人们日常工作生活中的一部分。然而,基于互联网提供线上娱乐、线上办公、线上购物等服务的企业,在复杂的全球网络环境下会遇到网络延迟不稳定的情况,海外环境更多…...
基于粒子群算法的微网经济优化调度——附Matalb代码
目录 摘要: 代码主要内容: 研究背景: 微电网模型: 粒子群算法: 运行结果: Matlab代码分享: 摘要: 提出了一种经济与环保相协调的微电网优化调度模型,针对光伏电池…...
Flink入门
目录 一、Flink简介 二、为什么选择Flink 三、与传统数据处理架构相比 四、Flinik批处理数据基础代码 五、Flink流处理基础代码 一、Flink简介 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数 据流进行状态计算。 二、为什么选择Flink 流数据更…...
【Go微服务开发】gin+grpc+etcd 重构 grpc-todolist 项目
写在前面 最近稍微重构了之前写的 grpc-todolist 模块 项目地址:https://github.com/CocaineCong/grpc-todoList 1. 项目结构改变 与之前的目录有很大的区别 1.1 grpc_todolist 项目总体 1.1.1 改变前 grpc-todolist/ ├── api-gatway // 网关模块 ├── ta…...
单板硬件设计:存储器SD卡( NAND FLASH)
在单板设计中,无论是涉及到一个简易的CPU、MCU小系统或者是复杂的单板设计,都离不开存储器设计: 1、存储器介绍 存储器的分类大致可以划分如下: ROM和RAM指的都是半导体存储器,ROM在系统停止供电的时候仍然可以保持数…...
C++实现日期类Date(超详细)
个人主页:平行线也会相交💪 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 平行线也会相交 原创 收录于专栏【C之路】💌 本专栏旨在记录C的学习路线,望对大家有所帮助🙇 希望我们一起努力、成长&…...
实验室检验系统源码,集检验业务、质量控制、报告、统计分析、两癌等模块于一体
云 LIS 系统针对区域化 LIS 而设计,依托底层云架构,将传统的 LIS 功能模块进行“云化”。 该系统是集检验业务、科室管理、质量控制、报告、统计分析、两癌等模块于一体的数据检验信息平台。通过计算机联网,实现各类仪器数据结果的实时自动接…...
学习RHCSA的day.03
目录 2.6 Linux系统的目录结构 2.7 目录操作命令 2.8 文件操作命令 2.6 Linux系统的目录结构 1、Linux目录结构的特点 分区加载于目录结构: 使用树形目录结构来组织和管理文件。整个系统只有一个位于根分区的一个根目录(树根)、一棵树。…...
电子邮件协议(SMTP,MIME,POP3,IMAP)
SMTP 关键词: 电子邮件协议:SMTP简单邮件传输协议,负责将邮件上传到服务器,采用TCP的25端口,C/S工作。仅传送ASCII码文本 详细介绍: SMTP是一种提供可靠且有效的电子邮件传输的协议。SMTP是建立在FTP文件传输服务上…...
Golang笔记:使用embed包将静态资源嵌入到程序中
文章目录 目的使用演示//go:embed 指令在WebServer中应用总结 目的 Golang编译程序默认是静态编译,会把相关的库都打包到一起,这在分发部署使用时非常方便。不过如果项目中用到的外部的静态资源文件,通常就需要将这些资源和程序一起拷贝分发…...
ImportError: cannot import name ‘OldCsv‘ from ‘pyflink.table.descriptors‘
我最近开始使用flink用于数据处理。 当我尝试执行table api 用于计数时 我不能导入OldCsv and FileSystem from pyflink.table.descriptors. I have also downloaded apache-flink using: pip install apache-flink [rootmaster flink]# pip3 list | grep flink apache-fli…...
YouCompleteMe(YCM)安装
vim在各个linux版本中是个比较好编辑器,反正nano我是用不惯。但这个ycm的安装也是不断的在变,现在的安装比之前要简单的多,基本个几命令就搞定了,而且 也不用关心系统里有没有vim,ycm已经可以自动安装。具体安装步骤如下ÿ…...
day33_css
今日内容 零、 复习昨日 一、CSS 零、 复习昨日 见代码 一 、引言 1.1CSS概念 层叠样式表(英文全称:Cascading Style Sheets)是一种用来表现HTML(标准通用标记语言的一个应用)或XML(标准通用标记语言的一个子集)等文…...
10个最流行的向量数据库【AI】
矢量数据库是一种将数据存储为高维向量的数据库,高维向量是特征或属性的数学表示。 每个向量都有一定数量的维度,范围从几十到几千不等,具体取决于数据的复杂性和粒度。 推荐:用 NSDT场景设计器 快速搭建3D场景。 矢量数据库&…...
vite3+vue3 项目打包优化二 —— 依赖分包策略
在没有配置构建工具的分包功能时,构建出来的文件将无比巨大且是独立的一个 js 和 css 文件(如下图),这样本地加载文件时会存在巨大的压力。 默认情况下,浏览器重复请求相同名称的静态资源时,会直接使用缓存…...
中国社科院与美国杜兰大学金融管理硕士——与时间赛跑,充分利用每一分钟
不管你愿不愿意,时间总是在不经意间流去。林清玄在《和时间赛跑》中写道:“虽然我知道人永远跑不过时间,但是可以比原来快跑几步。那几步虽然很小很小,但作用却很大很大”。是的,我们需要与时间赛跑,充分利…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...
6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙
Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...
Vue 3 + WebSocket 实战:公司通知实时推送功能详解
📢 Vue 3 WebSocket 实战:公司通知实时推送功能详解 📌 收藏 点赞 关注,项目中要用到推送功能时就不怕找不到了! 实时通知是企业系统中常见的功能,比如:管理员发布通知后,所有用户…...
C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
欢乐熊大话蓝牙知识17:多连接 BLE 怎么设计服务不会乱?分层思维来救场!
多连接 BLE 怎么设计服务不会乱?分层思维来救场! 作者按: 你是不是也遇到过 BLE 多连接时,调试现场像网吧“掉线风暴”? 温度传感器连上了,心率带丢了;一边 OTA 更新,一边通知卡壳。…...
