四、单线程多路IO复用+多线程业务工作池
文章目录
- 一、前言
- 1 编译方法
- 二、单线程多路IO复用+多线程业务工作池结构
- 三、重写`Client_Context`类
- 四、编写`Server`类
一、前言
我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。
由于项目变大,尝试解耦项目,使用
CMake,可以看这篇文章现代CMake使用,使C++代码解耦
本节代码均可在仓库TinyWebServer 中找到
1 编译方法
# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .
二、单线程多路IO复用+多线程业务工作池结构
简单来说就是把,读写任务交给线程池。

三、重写Client_Context类
上一节的Client_Context类并不能做到线程安全,以及管理客户端状态。所以做以下改变。
// client_context.hclass ClientContext {
public:ClientContext() : active(true) {}void pushMessage(const string &msg);bool hasMessages() const;string popMessage();void setWriteReady(bool ready);bool isWriteReady() const;bool isActive() const;void deactivate();private:queue<string> send_queue; // 消息队列bool write_ready = false; // 是否可写mutable mutex mtx; // const下也可锁atomic<bool> active; // 活跃检测
};
// client_context.cpp#include "client_context.h"// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {lock_guard<mutex> lock(mtx);send_queue.push(msg);
}bool ClientContext::hasMessages() const {lock_guard<mutex> lock(mtx);return !send_queue.empty();
}
string ClientContext::popMessage() {lock_guard<mutex> lock(mtx);string msg = send_queue.front();send_queue.pop();return msg;
}
void ClientContext::setWriteReady(bool ready) {lock_guard<mutex> lock(mtx);write_ready = ready;
}
bool ClientContext::isWriteReady() const {lock_guard<mutex> lock(mtx);return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }
四、编写Server类
封装成类,隐藏细节。
#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <memory>
#include <unordered_map>#include "client_context.h"
#include "thread_pool.h"const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;class Server {
public:Server(int port);void run();private:void handleNewConnection();void handleClientEvent(epoll_event &event);void handleRead(int client_fd);void handleWrite(int client_fd);void removeClient(int client_fd);void modifyEpollEvent(int fd, uint32_t events);int server_fd;int epoll_fd;ThreadPool pool;unordered_map<int, shared_ptr<ClientContext>> clients;mutex clients_mutex;
};
#include "server.h"
#include <cstdint>// Server implementation
Server::Server(int port) : pool(4) {server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (server_fd < 0) {throw runtime_error("Socket creation failed");}sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {close(server_fd);throw runtime_error("Bind failed");}if (listen(server_fd, SOMAXCONN) < 0) {close(server_fd);throw runtime_error("Listen failed");}epoll_fd = epoll_create1(0);if (epoll_fd < 0) {close(server_fd);throw runtime_error("epoll_create1 failed");}epoll_event event;event.data.fd = server_fd;event.events = EPOLLIN | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {close(server_fd);close(epoll_fd);throw runtime_error("epoll_ctl failed");}
}void Server::run() {vector<epoll_event> events(MAX_EVENTS);while (true) {int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (event_count < 0) {cerr << "epoll_wait failed: " << endl;break;}for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {handleNewConnection();} else {handleClientEvent(events[i]);}}}
}void Server::handleNewConnection() {while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);if (client_fd < 0) {if (errno == EAGAIN || EWOULDBLOCK) {cout << "No more new connections to accept" << endl;break;} else {cerr << "Accept failed: " << endl;break;}}epoll_event event;event.data.fd = client_fd;event.events = EPOLLIN | EPOLLET;int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);if (epoll_ctl_result < 0) {cerr << "epoll_ctl failed for client socket: " << endl;close(client_fd);} else {{lock_guard<mutex> lock(clients_mutex);clients[client_fd] = make_shared<ClientContext>();}}}
}void Server::handleClientEvent(epoll_event &event) {int client_fd = event.data.fd;if (event.events & (EPOLLERR | EPOLLHUP)) {cout << "Error or hangup event for client: " << client_fd << endl;removeClient(client_fd);} else {if (event.events & EPOLLIN) handleRead(client_fd);if (event.events & EPOLLOUT) handleWrite(client_fd); }
}void Server::handleRead(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;return;}client = it->second;}string buffer(BUFFER_SIZE, 0);while (true) {int read_len = read(client_fd, buffer.data(), buffer.size());if (read_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK)break;else {cerr << "Read failed on socket " << client_fd << endl;removeClient(client_fd);break;}} else if (read_len == 0) {cout << "Client disconnected: " << client_fd << endl;removeClient(client_fd);break;} else {cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;string message = "Echo: " + buffer.substr(0, read_len);client->pushMessage(message);client->setWriteReady(true);modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);}}});
}void Server::handleWrite(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;return;}client = it->second;}if (!client->isWriteReady()) return;bool keep_writing = true;while (keep_writing && client->hasMessages()) {string message = client->popMessage();size_t total_sent = 0;while (total_sent < message.size()) {int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);if (write_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {client->pushMessage(message.substr(total_sent));keep_writing = false;break;} else {cerr << "Write error on socket " << client_fd << endl;removeClient(client_fd);return;}} else total_sent += write_len;}if (total_sent == message.size()) cout << "Sent to client " << client_fd << ": " << message << endl;}if (!client->hasMessages()) {client->setWriteReady(false);modifyEpollEvent(client_fd, EPOLLIN);}});
}void Server::removeClient(int client_fd) {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it != clients.end()) {client = it->second;clients.erase(it);}}if (client) {client->deactivate();int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);if (epoll_ctl_result < 0) cerr << "Failed to remove client from epoll: " << endl;close(client_fd);}
}void Server::modifyEpollEvent(int fd, uint32_t events) {epoll_event event;event.data.fd = fd;event.events = events | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) cerr << "Failed to modify epoll event for fd " << fd << endl;
}
相关文章:
四、单线程多路IO复用+多线程业务工作池
文章目录 一、前言1 编译方法 二、单线程多路IO复用多线程业务工作池结构三、重写Client_Context类四、编写Server类 一、前言 我们以及讲完单线程多路IO复用 以及任务调度与执行的C线程池,接下来我们就给他结合起来。 由于项目变大,尝试解耦项目&#…...
单元测试--Junit
Junit是Java的单元测试框架提供了一些注解方便我们进行单元测试 1. 常用注解 常用注解: TestBeforeAll,AfterAllBeforeEach,AfterEach 使用这些注解需要先引入依赖: <dependency><groupId>org.junit.jupiter<…...
达梦数据库系列—30. DTS迁移Mysql到DM
目录 1.MySQL 源端信息 2.DM 目的端信息 3.迁移评估 4.数据库迁移 4.1源端 MySQL 准备 4.2目的端达梦准备 初始化参数设置 兼容性参数设置 创建迁移用户和表空间 4.3迁移步骤 创建迁移 配置迁移对象及策略 开始迁移 对象补迁 5.数据校验 统计 MySQL 端对象及数…...
随记0000——从0、1 到 C语言
C语言的发展历程是计算机科学史上的一个重要里程碑。 下面是从最早的机器语言到汇编语言,再到高级语言如 C 语言的简化演进过程: 1. 机器语言 定义与特点 机器语言是最底层的编程语言,由一系列二进制代码组成。直接被CPU执行,…...
C++ | Leetcode C++题解之第264题丑数II
题目: 题解: class Solution { public:int nthUglyNumber(int n) {vector<int> dp(n 1);dp[1] 1;int p2 1, p3 1, p5 1;for (int i 2; i < n; i) {int num2 dp[p2] * 2, num3 dp[p3] * 3, num5 dp[p5] * 5;dp[i] min(min(num2, num3…...
前端系列-8 集中式状态管理工具pinia
集中式状态管理工具—pinia vue3中使用pinia作为集中式状态管理工具,替代vue2中的vuex。 pinia文档可参考: https://pinia.web3doc.top/introduction.html 1.项目集成pinia 安装pinia依赖: npm install pinia在main.ts中引入pinia import { createApp } from vu…...
pytest使用
主要技术内容 1.pytest设计 接口测试 框架设想 common—公共的东西封装 1.request请求 2.Session 3.断言 4.Log 5.全局变量 6.shell命令 ❖ config---配置文件及读取 ❖ Log— ❖ payload—请求参数—*.yaml及读取 ❖ testcases—conftest.py; testcase1.py…….可…...
单表查询总结与多表查询概述
1. 单表查询总结 执行顺序: 从一张表,过滤数据,进行分组,对分组后的数据再过滤,查询出来所需数据,排序之后输出; from > where > group by > having > select > order by 2. …...
redis的使用场景和持久化方式
redis的使用场景 热点数据的缓存。热点:频繁读取的数据。限时任务的操作:短信验证码。完成session共享的问题完成分布式锁。 redis的持久化方式 什么是持久化:把内存中的数据存储到磁盘的过程,同时也可以把磁盘中的数据加载到内存…...
嵌入式Linux学习: 设备树实验
设备树(DeviceTree)是一种硬件描述机制,用于在嵌入式系统和操作系统中描述硬件设备的特性、连接关系和配置信息。它提供了一种与平台无关的方式来描述硬件,使得内核与硬件之间的耦合度降低,提高了系统的可移植性和可维…...
eqmx上读取数据处理以后添加到数据库中
目录 定义一些静态变量 定时器事件的处理器 订阅数据的执行器 处理json格式数据和将处理好的数据添加到数据库中 要求和最终效果 总结一下 定义一些静态变量 // 在这里都定义成全局的 一般都定义成静态的private static MqttClient mqttClient; // mqtt客户端 private s…...
【中项】系统集成项目管理工程师-第5章 软件工程-5.3软件设计
前言:系统集成项目管理工程师专业,现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试,全称为“全国计算机与软件专业技术资格(水平)考试”&…...
C++学习笔记-内联函数使用和含义
引言 内联函数是C为了优化在函数的调用带来的性能开销而设计的,特别是当函数体很小且频繁调用时,内联函数可以让编译器在调用点直接展开函数体,从而避免了函数调用的开销。 一、内联函数的定义与含义 1.1 定义 内联函数是通过在函数声明或…...
数据库(MySQL)-视图、存储过程、触发器
一、视图 视图的定义、作用 视图是从一个或者几个基本表(或视图)导出的表。它与基本表不同,是一个虚表。但是视图只能用来查看表,不能做增删改查。 视图的作用:①简化查询 ②重写格式化数据 ③频繁访问数据库 ④过…...
js 优雅的实现模板方法设计模式
在JavaScript中,优雅地实现模板方法设计模式通常意味着我们要遵循一些最佳实践,如清晰地定义算法的骨架(模板方法),并确保子类能够灵活地扩展或修改这些算法中的特定步骤。由于JavaScript是一种动态语言,我…...
C语言——输入输出
C语言——输入输出 输入输出函数的类型getcharputcharprintf占位符的分类 scanf 什么是输入输出呢? 所谓输入输出是以计算机为主机而言的,往内存中输入数据为输入,反之从内存中输出数据为输出。 输入输出的功能 C语言本身是不提供输入输出功能…...
【微软蓝屏】微软Windows蓝屏问题汇总与应对解决策略
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...
OpenCV图像滤波(2)均值平滑处理函数blur()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在OpenCV中,blur()函数用于对图像应用简单的均值模糊(mean blur)。这种模糊效果可以通过将图像中的每个像素替…...
Android lmkd机制详解
目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…...
linux shell(中)
结构化命令 if语句 if-then 最基本的结构化命令是 if-then 语句。if-then 语句的格式如下: if command thencommands ifif command; then # 通过把分号(;)放在待求值的命令尾部,可以将 then 语句写在同一行commands ifbash sh…...
合宙 MCP 工具:TRAE AI 自然语言控制 Luatools 实操
合宙MCP工具基于 MCP 协议,实现 AI 大模型与 Luatools 的无缝连接,开发者通过简单 JSON 配置,就能在 TRAE 编辑器用自然语言操控 Luatools 完成固件下载、日志获取等操作,告别手动烧录的繁琐。 核心能力: 固件自动烧录…...
通用GUI编程技术——Win32 原生编程实战(十八)——GDI 设备上下文(HDC)完全指南
通用GUI编程技术——Win32 原生编程实战(十八)——GDI 设备上下文(HDC)完全指南 前面一系列文章我们聊了对话框、控件、资源这些内容,我们的窗口已经能够显示各种控件了。但你可能已经发现了一个问题:我们所…...
财务银行对账费时间?RPA自动对接流水,10分钟对完1个月账
RPA自动化银行对账的优势传统手工对账通常需要财务人员逐笔核对银行流水和企业账目,耗时费力且易出错。RPA(机器人流程自动化)技术可实现银行流水与企业账务系统的自动对接,大幅提升效率。10分钟完成1个月账目核对已成为现实。RPA…...
【大模型工程实践③】RAG 基础架构与完整实现
【大模型工程实践③】RAG 基础架构与完整实现:从0到1跑通 作者:AI学习者 | 来源:大模型工程实践学习系列 | 更新:2026年3月 【理论要点速览】 学习本篇前,建议先掌握以下核心理论(点击跳转): ① 为什么需要RAG? ② RAG vs Fine-tuning vs Long Context的决策框架 ③ …...
从GTS-800到GTS-400:手把手教你移植C#点胶机程序到不同固高控制卡
从GTS-800到GTS-400:工业点胶系统迁移实战指南 当生产线上的点胶机控制卡需要从GTS-800更换为GTS-400时,许多工程师会发现"使用方法类似"这个说法背后隐藏着大量细节差异。去年我们团队完成了一个医疗设备点胶系统的迁移项目,原计划…...
力扣原题《有效的数独游戏》,纯手搓,已验证
请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 ,验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。 数字 1-9 在每一列只能出现一次。 数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。(请参考示例图) 注…...
4个步骤掌握高频交易策略:High-Frequency-Trading-Model-with-IB实战指南
4个步骤掌握高频交易策略:High-Frequency-Trading-Model-with-IB实战指南 【免费下载链接】High-Frequency-Trading-Model-with-IB A high-frequency trading model using Interactive Brokers API with pairs and mean-reversion in Python 项目地址: https://gi…...
IntelliJ IDEA突然无法启动的快速修复指南
1. IntelliJ IDEA突然无法启动的常见原因 作为一名常年与IntelliJ IDEA打交道的开发者,我遇到过无数次IDE突然罢工的情况。最让人头疼的是,明明昨天还用得好好的,今天双击图标却毫无反应。这种情况通常由以下几个原因导致: 首先是…...
Hutool CronUtil实战:5分钟搞定Spring Boot定时任务(含动态任务配置)
Hutool CronUtil实战:5分钟搞定Spring Boot定时任务(含动态任务配置) 在Java开发领域,定时任务几乎是每个项目都绕不开的基础需求。传统方案如Spring Scheduler虽然简单易用,但在动态任务管理和细粒度控制方面往往力不…...
白城腾讯广告服务商
在白城,有不少企业想借助腾讯广告拓展业务,这就离不开靠谱的腾讯广告服务商。今天就和大家聊聊白城腾讯广告服务商的那些事儿,长春中网互联技术在这一领域表现就相当不错。白城腾讯广告服务商现状行业报告显示,近几年白城地区对腾…...
