libevent高并发网络编程 - 06_基于libevent的C++线程池实现
文章目录
- 1 功能简介
- 线程池的初始化
- 线程池执行流程
- 2 线程池类的设计
- 线程类XThread
- XThread.h
- XThread.cpp
- 线程池类XThreadPool
- XThreadPool.h
- XThreadPool.cpp
- 任务基类task
- XTask.h
- 3 自定义任务的例子
- 自定义任务类ServerCMD
- ServerCMD.h
- ServerCMD.cpp
- 测试程序
- 运行效果
1 功能简介
本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。
-
工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。
-
任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。
线程池的初始化
线程池执行流程
2 线程池类的设计
线程类XThread
线程类的接口功能
Start() -> 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() -> 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() -> 此函数只进入事件循环,等待事件循环退出Notify() -> 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() -> 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() -> 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。调用多次Activate表示加入多个任务,任务顺序被执行。
XThread.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>using namespace std;XThread::XThread()
{}XThread::~XThread()
{}//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用: 管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{XThread *th = (XThread*)arg;th->Notify(fd, which);
}/*
* 函数名: XThread::Start
* 作用: 启动线程
* 解释: 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{//安装线程,初始化event_base和管道监听事件用于激活Setup();//启动线程thread th(&XThread::Main, this);//线程分离th.detach();
}/*
* 函数名: XThread::Main
* 作用: 线程入口函数
* 解释: 此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{cout << id << " XThread::Main() begin" << endl;event_base_dispatch(base); //进入事件循环event_base_free(base);cout << id << " XThread::Main() end" << endl;
}/*
* 函数名: XThread::Setup
* 作用: 安装线程
* 解释: 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{//windows用配对socket linux用管道//创建的管道int fds[2];if(pipe(fds)){cerr << "pipe failed!" << endl;return false; }//读取绑定到event事件中,写入要保存//保存管道的写fdnotify_send_fd = fds[1];//创建一个新的事件处理器对象this->base = event_base_new();//创建一个新的事件对象//添加管道监听事件读fd,用于激活线程执行任务event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);//将事件对象(struct event)添加到指定的事件处理器(event_base)中event_add(ev, 0);return true;
}/*
* 函数名: XThread::Notify
* 作用: 线程激活执行任务
* 解释: 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{//水平触发 只要没有接受完成,会再次进来char buf[2] = {0};int len = read(fd, buf, 1);if (len <= 0)return;cout << id << " thread " << buf << endl;//获取任务,并初始化任务XTask* task = NULL;tasks_mutex.lock();if(tasks.empty()){ //队列为空tasks_mutex.unlock();return;}task = tasks.front(); //先进先出tasks.pop_front();tasks_mutex.unlock();task->Init();
}/*
* 函数名: XThread::Activate
* 作用: 激活线程
* 解释: 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
* 调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{char act[10] = {0};int len = write(this->notify_send_fd, "c", 1);if (len <= 0){cerr << "XThread::Activate() failed!" << endl;}cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}/*
* 函数名: XThread::AddTask
* 作用: 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{if(!task)return;task->base = this->base;tasks_mutex.lock();tasks.push_back(task);tasks_mutex.unlock();
}
线程池类XThreadPool
线程类的接口功能
GetInstance() -> 单例模式创建返回唯一对象
Init() -> 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
Dispatch() -> 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
XThreadPool.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>using namespace std;//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;/*
* 函数名: XThreadPool::GetInstance
* 作用: 单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{//当需要使用对象时,访问instance 的值//空值:创建对象,并用instance 标记//非空值: 返回instance 标记的对象if( pInstance == NULL ){pInstance = new XThreadPool();}return pInstance;
}/*
* 函数名: XThreadPool::Init
* 作用: 初始化所有线程并启动线程
* 解释: 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
*/
void XThreadPool::Init(int threadCount)
{this->threadCount = threadCount;this->lastThread = -1;for (int i = 0; i < threadCount; i++){XThread *t = new XThread();t->id = i + 1;cout << "Create thread " << i << endl;//启动线程t->Start();threads.push_back(t);this_thread::sleep_for(std::chrono::microseconds(10));}
}/*
* 函数名: XThreadPool::Dispatch
* 作用: 分发线程
* 解释: 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。
*/
void XThreadPool::Dispatch(XTask* task)
{//轮询if(!task)return;int tid = (lastThread + 1) % threadCount;lastThread = tid;cout << "lastThread:" << lastThread << endl;XThread *XTh = threads[tid];//添加任务XTh->AddTask(task);//线程激活XTh->Activate();
}
任务基类task
XTask.h
#pragma once
#include <iostream>class XTask
{
public://事件处理器对象struct event_base* base = NULL;//客户端连接的socketint sock = 0;//初始化任务 纯虚函数virtual bool Init() = 0;
};
3 自定义任务的例子
自定义任务类ServerCMD
线程类的接口功能
Init() -> 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() -> 读事件回调函数
EventCB() -> 客户端超时未发请求,断开连接退出任务
ServerCMD.h
#pragma once#include "XTask.h"class XFtpServerCMD : public XTask
{
public://初始化任务virtual bool Init();XFtpServerCMD();~XFtpServerCMD();
};
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>using namespace std;/*
* 函数名: EventCB
* 作用: 超时事件回调函数
* 解释: 客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;//如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)){cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;bufferevent_free(bev);delete cmd;}
}/*
* 函数名: ReadCB
* 作用: 读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;char data[1024] = {0};for (;;){int len = bufferevent_read(bev, data, sizeof(data)-1);if(len <= 0)break;data[len] = '\0';cout << data << endl << flush;//测试代码,要清理掉if(strstr(data, "quit")){bufferevent_free(bev);delete cmd;break;}}
}/*
* 函数名: XFtpServerCMD::Init
* 作用: 初始化任务
* 解释: 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{cout << "XFtpServerCMD::Init() sock:" << sock << endl;//监听socket bufferevent// base socketbufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);bufferevent_enable(bev, EV_READ | EV_WRITE);//添加超时timeval rt = {10, 0}; //10秒bufferevent_set_timeouts(bev, &rt, 0); //设置读超时回调函数return true;
}XFtpServerCMD::XFtpServerCMD()
{}XFtpServerCMD::~XFtpServerCMD()
{}
测试程序
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>#include "XFtpServerCMD.h"using namespace std;
#define SPORT 5001/*
* 函数名: listen_cb
* 作用: 接收到连接的回调函数
* 解释: 通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{cout << "listen_cb" << endl;XTask* task = new XFtpServerCMD();task->sock = s;XThreadPool::GetInstance()->Dispatch(task);
}int main()
{//忽略管道信号,发送数据给已关闭的socketif (signal(SIGPIPE, SIG_IGN) == SIG_ERR)return 1;//1 初始化线程池XThreadPool::GetInstance()->Init(5);std::cout << "test thread pool!\n"; //创建libevent的上下文event_base* base = event_base_new();if (base){cout << "event_base_new success!" << endl;}//监听端口//socket ,bind,listen 绑定事件sockaddr_in sin;memset(&sin, 0, sizeof(sin));sin.sin_family = AF_INET;sin.sin_port = htons(SPORT);evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文listen_cb, //接收到连接的回调函数base, //回调函数获取的参数 argLEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket10, //连接队列大小,对应listen函数(sockaddr*)&sin, //绑定的地址和端口sizeof(sin));//事件分发处理if(base)event_base_dispatch(base);if(ev)evconnlistener_free(ev);if(base)event_base_free(base);return 0;
}
运行效果
初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。
相关文章:

libevent高并发网络编程 - 06_基于libevent的C++线程池实现
文章目录 1 功能简介线程池的初始化线程池执行流程 2 线程池类的设计线程类XThreadXThread.hXThread.cpp 线程池类XThreadPoolXThreadPool.hXThreadPool.cpp 任务基类taskXTask.h 3 自定义任务的例子自定义任务类ServerCMDServerCMD.hServerCMD.cpp 测试程序运行效果 1 功能简介…...

【Java EE 初阶】线程安全及死锁解决方案
目录 1.多线程下线程不安全的问题 1.使用多个线程对Array List集合进行添加操作并打印,查看结果 2.如何在多线程环境下使用线程安全的集合类 CopyOnWriteArrayList 3.多线程环境下使用队列 4.多线程环境下使用哈希表 1.HashTable线程安全 2.Concurrent Hash M…...

C语言函数大全-- _w 开头的函数(5)
C语言函数大全 本篇介绍C语言函数大全-- _w 开头的函数 1. _wspawnl 1.1 函数说明 函数声明函数功能int _wspawnl(int mode, const wchar_t* cmdname, const wchar_t* arglist, ...);启动一个新的进程并运行指定的可执行文件 参数: mode : 启动命令的…...

机械大专生能学会云计算吗,完全零基础的
机械大专生能学会云计算吗,完全零基础的 正常来说,大专及以上学历都能学会云计算,但是会和满足就业需求是两回事哈。如果你想通过学习就业,就需要根据当下相关岗位的普遍技术需求以及其他方面的要求,来针对性的学习和提…...

腾讯云EdgeOne为什么能让客户降本增效?
随着数字化时代的来临,各类线上互动场景不断出现,并成为人们日常工作生活中的一部分。然而,基于互联网提供线上娱乐、线上办公、线上购物等服务的企业,在复杂的全球网络环境下会遇到网络延迟不稳定的情况,海外环境更多…...

基于粒子群算法的微网经济优化调度——附Matalb代码
目录 摘要: 代码主要内容: 研究背景: 微电网模型: 粒子群算法: 运行结果: Matlab代码分享: 摘要: 提出了一种经济与环保相协调的微电网优化调度模型,针对光伏电池…...

Flink入门
目录 一、Flink简介 二、为什么选择Flink 三、与传统数据处理架构相比 四、Flinik批处理数据基础代码 五、Flink流处理基础代码 一、Flink简介 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数 据流进行状态计算。 二、为什么选择Flink 流数据更…...

【Go微服务开发】gin+grpc+etcd 重构 grpc-todolist 项目
写在前面 最近稍微重构了之前写的 grpc-todolist 模块 项目地址:https://github.com/CocaineCong/grpc-todoList 1. 项目结构改变 与之前的目录有很大的区别 1.1 grpc_todolist 项目总体 1.1.1 改变前 grpc-todolist/ ├── api-gatway // 网关模块 ├── ta…...

单板硬件设计:存储器SD卡( NAND FLASH)
在单板设计中,无论是涉及到一个简易的CPU、MCU小系统或者是复杂的单板设计,都离不开存储器设计: 1、存储器介绍 存储器的分类大致可以划分如下: ROM和RAM指的都是半导体存储器,ROM在系统停止供电的时候仍然可以保持数…...

C++实现日期类Date(超详细)
个人主页:平行线也会相交💪 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 平行线也会相交 原创 收录于专栏【C之路】💌 本专栏旨在记录C的学习路线,望对大家有所帮助🙇 希望我们一起努力、成长&…...

实验室检验系统源码,集检验业务、质量控制、报告、统计分析、两癌等模块于一体
云 LIS 系统针对区域化 LIS 而设计,依托底层云架构,将传统的 LIS 功能模块进行“云化”。 该系统是集检验业务、科室管理、质量控制、报告、统计分析、两癌等模块于一体的数据检验信息平台。通过计算机联网,实现各类仪器数据结果的实时自动接…...

学习RHCSA的day.03
目录 2.6 Linux系统的目录结构 2.7 目录操作命令 2.8 文件操作命令 2.6 Linux系统的目录结构 1、Linux目录结构的特点 分区加载于目录结构: 使用树形目录结构来组织和管理文件。整个系统只有一个位于根分区的一个根目录(树根)、一棵树。…...

电子邮件协议(SMTP,MIME,POP3,IMAP)
SMTP 关键词: 电子邮件协议:SMTP简单邮件传输协议,负责将邮件上传到服务器,采用TCP的25端口,C/S工作。仅传送ASCII码文本 详细介绍: SMTP是一种提供可靠且有效的电子邮件传输的协议。SMTP是建立在FTP文件传输服务上…...

Golang笔记:使用embed包将静态资源嵌入到程序中
文章目录 目的使用演示//go:embed 指令在WebServer中应用总结 目的 Golang编译程序默认是静态编译,会把相关的库都打包到一起,这在分发部署使用时非常方便。不过如果项目中用到的外部的静态资源文件,通常就需要将这些资源和程序一起拷贝分发…...
ImportError: cannot import name ‘OldCsv‘ from ‘pyflink.table.descriptors‘
我最近开始使用flink用于数据处理。 当我尝试执行table api 用于计数时 我不能导入OldCsv and FileSystem from pyflink.table.descriptors. I have also downloaded apache-flink using: pip install apache-flink [rootmaster flink]# pip3 list | grep flink apache-fli…...

YouCompleteMe(YCM)安装
vim在各个linux版本中是个比较好编辑器,反正nano我是用不惯。但这个ycm的安装也是不断的在变,现在的安装比之前要简单的多,基本个几命令就搞定了,而且 也不用关心系统里有没有vim,ycm已经可以自动安装。具体安装步骤如下ÿ…...

day33_css
今日内容 零、 复习昨日 一、CSS 零、 复习昨日 见代码 一 、引言 1.1CSS概念 层叠样式表(英文全称:Cascading Style Sheets)是一种用来表现HTML(标准通用标记语言的一个应用)或XML(标准通用标记语言的一个子集)等文…...

10个最流行的向量数据库【AI】
矢量数据库是一种将数据存储为高维向量的数据库,高维向量是特征或属性的数学表示。 每个向量都有一定数量的维度,范围从几十到几千不等,具体取决于数据的复杂性和粒度。 推荐:用 NSDT场景设计器 快速搭建3D场景。 矢量数据库&…...

vite3+vue3 项目打包优化二 —— 依赖分包策略
在没有配置构建工具的分包功能时,构建出来的文件将无比巨大且是独立的一个 js 和 css 文件(如下图),这样本地加载文件时会存在巨大的压力。 默认情况下,浏览器重复请求相同名称的静态资源时,会直接使用缓存…...

中国社科院与美国杜兰大学金融管理硕士——与时间赛跑,充分利用每一分钟
不管你愿不愿意,时间总是在不经意间流去。林清玄在《和时间赛跑》中写道:“虽然我知道人永远跑不过时间,但是可以比原来快跑几步。那几步虽然很小很小,但作用却很大很大”。是的,我们需要与时间赛跑,充分利…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
是否存在路径(FIFOBB算法)
题目描述 一个具有 n 个顶点e条边的无向图,该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序,确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数,分别表示n 和 e 的值(1…...

企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...
React从基础入门到高级实战:React 实战项目 - 项目五:微前端与模块化架构
React 实战项目:微前端与模块化架构 欢迎来到 React 开发教程专栏 的第 30 篇!在前 29 篇文章中,我们从 React 的基础概念逐步深入到高级技巧,涵盖了组件设计、状态管理、路由配置、性能优化和企业级应用等核心内容。这一次&…...

链式法则中 复合函数的推导路径 多变量“信息传递路径”
非常好,我们将之前关于偏导数链式法则中不能“约掉”偏导符号的问题,统一使用 二重复合函数: z f ( u ( x , y ) , v ( x , y ) ) \boxed{z f(u(x,y),\ v(x,y))} zf(u(x,y), v(x,y)) 来全面说明。我们会展示其全微分形式(偏导…...

leetcode73-矩阵置零
leetcode 73 思路 记录 0 元素的位置:遍历整个矩阵,找出所有值为 0 的元素,并将它们的坐标记录在数组zeroPosition中置零操作:遍历记录的所有 0 元素位置,将每个位置对应的行和列的所有元素置为 0 具体步骤 初始化…...