四、单线程多路IO复用+多线程业务工作池
文章目录
- 一、前言
- 1 编译方法
- 二、单线程多路IO复用+多线程业务工作池结构
- 三、重写`Client_Context`类
- 四、编写`Server`类
一、前言
我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。
由于项目变大,尝试解耦项目,使用
CMake
,可以看这篇文章现代CMake使用,使C++代码解耦
本节代码均可在仓库TinyWebServer 中找到
1 编译方法
# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .
二、单线程多路IO复用+多线程业务工作池结构
简单来说就是把,读写任务交给线程池。
三、重写Client_Context
类
上一节的Client_Context
类并不能做到线程安全,以及管理客户端状态。所以做以下改变。
// client_context.hclass ClientContext {
public:ClientContext() : active(true) {}void pushMessage(const string &msg);bool hasMessages() const;string popMessage();void setWriteReady(bool ready);bool isWriteReady() const;bool isActive() const;void deactivate();private:queue<string> send_queue; // 消息队列bool write_ready = false; // 是否可写mutable mutex mtx; // const下也可锁atomic<bool> active; // 活跃检测
};
// client_context.cpp#include "client_context.h"// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {lock_guard<mutex> lock(mtx);send_queue.push(msg);
}bool ClientContext::hasMessages() const {lock_guard<mutex> lock(mtx);return !send_queue.empty();
}
string ClientContext::popMessage() {lock_guard<mutex> lock(mtx);string msg = send_queue.front();send_queue.pop();return msg;
}
void ClientContext::setWriteReady(bool ready) {lock_guard<mutex> lock(mtx);write_ready = ready;
}
bool ClientContext::isWriteReady() const {lock_guard<mutex> lock(mtx);return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }
四、编写Server
类
封装成类,隐藏细节。
#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <memory>
#include <unordered_map>#include "client_context.h"
#include "thread_pool.h"const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;class Server {
public:Server(int port);void run();private:void handleNewConnection();void handleClientEvent(epoll_event &event);void handleRead(int client_fd);void handleWrite(int client_fd);void removeClient(int client_fd);void modifyEpollEvent(int fd, uint32_t events);int server_fd;int epoll_fd;ThreadPool pool;unordered_map<int, shared_ptr<ClientContext>> clients;mutex clients_mutex;
};
#include "server.h"
#include <cstdint>// Server implementation
Server::Server(int port) : pool(4) {server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (server_fd < 0) {throw runtime_error("Socket creation failed");}sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {close(server_fd);throw runtime_error("Bind failed");}if (listen(server_fd, SOMAXCONN) < 0) {close(server_fd);throw runtime_error("Listen failed");}epoll_fd = epoll_create1(0);if (epoll_fd < 0) {close(server_fd);throw runtime_error("epoll_create1 failed");}epoll_event event;event.data.fd = server_fd;event.events = EPOLLIN | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {close(server_fd);close(epoll_fd);throw runtime_error("epoll_ctl failed");}
}void Server::run() {vector<epoll_event> events(MAX_EVENTS);while (true) {int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (event_count < 0) {cerr << "epoll_wait failed: " << endl;break;}for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {handleNewConnection();} else {handleClientEvent(events[i]);}}}
}void Server::handleNewConnection() {while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);if (client_fd < 0) {if (errno == EAGAIN || EWOULDBLOCK) {cout << "No more new connections to accept" << endl;break;} else {cerr << "Accept failed: " << endl;break;}}epoll_event event;event.data.fd = client_fd;event.events = EPOLLIN | EPOLLET;int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);if (epoll_ctl_result < 0) {cerr << "epoll_ctl failed for client socket: " << endl;close(client_fd);} else {{lock_guard<mutex> lock(clients_mutex);clients[client_fd] = make_shared<ClientContext>();}}}
}void Server::handleClientEvent(epoll_event &event) {int client_fd = event.data.fd;if (event.events & (EPOLLERR | EPOLLHUP)) {cout << "Error or hangup event for client: " << client_fd << endl;removeClient(client_fd);} else {if (event.events & EPOLLIN) handleRead(client_fd);if (event.events & EPOLLOUT) handleWrite(client_fd); }
}void Server::handleRead(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;return;}client = it->second;}string buffer(BUFFER_SIZE, 0);while (true) {int read_len = read(client_fd, buffer.data(), buffer.size());if (read_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK)break;else {cerr << "Read failed on socket " << client_fd << endl;removeClient(client_fd);break;}} else if (read_len == 0) {cout << "Client disconnected: " << client_fd << endl;removeClient(client_fd);break;} else {cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;string message = "Echo: " + buffer.substr(0, read_len);client->pushMessage(message);client->setWriteReady(true);modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);}}});
}void Server::handleWrite(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;return;}client = it->second;}if (!client->isWriteReady()) return;bool keep_writing = true;while (keep_writing && client->hasMessages()) {string message = client->popMessage();size_t total_sent = 0;while (total_sent < message.size()) {int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);if (write_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {client->pushMessage(message.substr(total_sent));keep_writing = false;break;} else {cerr << "Write error on socket " << client_fd << endl;removeClient(client_fd);return;}} else total_sent += write_len;}if (total_sent == message.size()) cout << "Sent to client " << client_fd << ": " << message << endl;}if (!client->hasMessages()) {client->setWriteReady(false);modifyEpollEvent(client_fd, EPOLLIN);}});
}void Server::removeClient(int client_fd) {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it != clients.end()) {client = it->second;clients.erase(it);}}if (client) {client->deactivate();int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);if (epoll_ctl_result < 0) cerr << "Failed to remove client from epoll: " << endl;close(client_fd);}
}void Server::modifyEpollEvent(int fd, uint32_t events) {epoll_event event;event.data.fd = fd;event.events = events | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) cerr << "Failed to modify epoll event for fd " << fd << endl;
}
相关文章:

四、单线程多路IO复用+多线程业务工作池
文章目录 一、前言1 编译方法 二、单线程多路IO复用多线程业务工作池结构三、重写Client_Context类四、编写Server类 一、前言 我们以及讲完单线程多路IO复用 以及任务调度与执行的C线程池,接下来我们就给他结合起来。 由于项目变大,尝试解耦项目&#…...

单元测试--Junit
Junit是Java的单元测试框架提供了一些注解方便我们进行单元测试 1. 常用注解 常用注解: TestBeforeAll,AfterAllBeforeEach,AfterEach 使用这些注解需要先引入依赖: <dependency><groupId>org.junit.jupiter<…...

达梦数据库系列—30. DTS迁移Mysql到DM
目录 1.MySQL 源端信息 2.DM 目的端信息 3.迁移评估 4.数据库迁移 4.1源端 MySQL 准备 4.2目的端达梦准备 初始化参数设置 兼容性参数设置 创建迁移用户和表空间 4.3迁移步骤 创建迁移 配置迁移对象及策略 开始迁移 对象补迁 5.数据校验 统计 MySQL 端对象及数…...

随记0000——从0、1 到 C语言
C语言的发展历程是计算机科学史上的一个重要里程碑。 下面是从最早的机器语言到汇编语言,再到高级语言如 C 语言的简化演进过程: 1. 机器语言 定义与特点 机器语言是最底层的编程语言,由一系列二进制代码组成。直接被CPU执行,…...

C++ | Leetcode C++题解之第264题丑数II
题目: 题解: class Solution { public:int nthUglyNumber(int n) {vector<int> dp(n 1);dp[1] 1;int p2 1, p3 1, p5 1;for (int i 2; i < n; i) {int num2 dp[p2] * 2, num3 dp[p3] * 3, num5 dp[p5] * 5;dp[i] min(min(num2, num3…...

前端系列-8 集中式状态管理工具pinia
集中式状态管理工具—pinia vue3中使用pinia作为集中式状态管理工具,替代vue2中的vuex。 pinia文档可参考: https://pinia.web3doc.top/introduction.html 1.项目集成pinia 安装pinia依赖: npm install pinia在main.ts中引入pinia import { createApp } from vu…...

pytest使用
主要技术内容 1.pytest设计 接口测试 框架设想 common—公共的东西封装 1.request请求 2.Session 3.断言 4.Log 5.全局变量 6.shell命令 ❖ config---配置文件及读取 ❖ Log— ❖ payload—请求参数—*.yaml及读取 ❖ testcases—conftest.py; testcase1.py…….可…...

单表查询总结与多表查询概述
1. 单表查询总结 执行顺序: 从一张表,过滤数据,进行分组,对分组后的数据再过滤,查询出来所需数据,排序之后输出; from > where > group by > having > select > order by 2. …...

redis的使用场景和持久化方式
redis的使用场景 热点数据的缓存。热点:频繁读取的数据。限时任务的操作:短信验证码。完成session共享的问题完成分布式锁。 redis的持久化方式 什么是持久化:把内存中的数据存储到磁盘的过程,同时也可以把磁盘中的数据加载到内存…...

嵌入式Linux学习: 设备树实验
设备树(DeviceTree)是一种硬件描述机制,用于在嵌入式系统和操作系统中描述硬件设备的特性、连接关系和配置信息。它提供了一种与平台无关的方式来描述硬件,使得内核与硬件之间的耦合度降低,提高了系统的可移植性和可维…...

eqmx上读取数据处理以后添加到数据库中
目录 定义一些静态变量 定时器事件的处理器 订阅数据的执行器 处理json格式数据和将处理好的数据添加到数据库中 要求和最终效果 总结一下 定义一些静态变量 // 在这里都定义成全局的 一般都定义成静态的private static MqttClient mqttClient; // mqtt客户端 private s…...

【中项】系统集成项目管理工程师-第5章 软件工程-5.3软件设计
前言:系统集成项目管理工程师专业,现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试,全称为“全国计算机与软件专业技术资格(水平)考试”&…...

C++学习笔记-内联函数使用和含义
引言 内联函数是C为了优化在函数的调用带来的性能开销而设计的,特别是当函数体很小且频繁调用时,内联函数可以让编译器在调用点直接展开函数体,从而避免了函数调用的开销。 一、内联函数的定义与含义 1.1 定义 内联函数是通过在函数声明或…...

数据库(MySQL)-视图、存储过程、触发器
一、视图 视图的定义、作用 视图是从一个或者几个基本表(或视图)导出的表。它与基本表不同,是一个虚表。但是视图只能用来查看表,不能做增删改查。 视图的作用:①简化查询 ②重写格式化数据 ③频繁访问数据库 ④过…...

js 优雅的实现模板方法设计模式
在JavaScript中,优雅地实现模板方法设计模式通常意味着我们要遵循一些最佳实践,如清晰地定义算法的骨架(模板方法),并确保子类能够灵活地扩展或修改这些算法中的特定步骤。由于JavaScript是一种动态语言,我…...

C语言——输入输出
C语言——输入输出 输入输出函数的类型getcharputcharprintf占位符的分类 scanf 什么是输入输出呢? 所谓输入输出是以计算机为主机而言的,往内存中输入数据为输入,反之从内存中输出数据为输出。 输入输出的功能 C语言本身是不提供输入输出功能…...

【微软蓝屏】微软Windows蓝屏问题汇总与应对解决策略
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...

OpenCV图像滤波(2)均值平滑处理函数blur()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在OpenCV中,blur()函数用于对图像应用简单的均值模糊(mean blur)。这种模糊效果可以通过将图像中的每个像素替…...

Android lmkd机制详解
目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…...

linux shell(中)
结构化命令 if语句 if-then 最基本的结构化命令是 if-then 语句。if-then 语句的格式如下: if command thencommands ifif command; then # 通过把分号(;)放在待求值的命令尾部,可以将 then 语句写在同一行commands ifbash sh…...

VMware三种网络模式---巨细
文章目录 目录 ‘一.网络模式概述 二.桥接模式 二.NAT模式 三.仅主机模式 四.案例演示 防火墙配置: 虚拟电脑配置 前言 本文主要介绍VMware的三种网络模式 ‘一.网络模式概述 VMware中分为三种网络模式: 桥接模式:默认与宿主机VMnet0绑…...

力扣高频SQL 50 题(基础版)第一题
文章目录 力扣高频SQL 50 题(基础版)第一题1757.可回收且低脂的产品题目说明思路分析实现过程准备数据:实现方式:结果截图: 力扣高频SQL 50 题(基础版)第一题 1757.可回收且低脂的产品 题目说…...

2.1.卷积层
卷积 用MLP处理图片的问题:假设一张图片有12M像素,那么RGB图片就有36M元素,使用大小为100的单隐藏层,模型有3.6B元素,这个数量非常大。 识别模式的两个原则: 平移不变性(translation inva…...

网易《永劫无间》手游上线,掀起游戏界狂潮
原标题:网易《永劫无间》手游上线,网友:发烧严重 易采游戏网7月26日消息:自网易宣布《永劫无间》手游即将上线以来,广大游戏玩家的期待值就不断攀升。作为一款拥有丰富内容和极高自由度的游戏,《永劫无间》…...

RNN(一)——循环神经网络的实现
文章目录 一、循环神经网络RNN1.RNN是什么2.RNN的语言模型3.RNN的结构形式 二、完整代码三、代码解读1.参数return_sequences2.调参过程 一、循环神经网络RNN 1.RNN是什么 循环神经网络RNN主要体现在上下文对理解的重要性,他比传统的神经网络(传统的神…...

php 根据位置的经纬度计算距离
在开发中,我们要经常和位置打交道,要计算附近的位置、距离什么的。如下: 一.sql语句 SELECT houseID,title,location,chamber,room,toward,area,rent,is_verify,look_type,look_time, traffic,block_name,images,tag,create_time,update_time, location->&g…...

17 Python常用内置函数——基本输入输出
input() 和 print() 是 Python 的基本输入输出函数,前者用来接收用户的键盘输入,后者用来把数据以指定的格式输出到标准控制台或指定的文件对象。无论用户输入什么内容,input() 一律作为字符串对待,必要时可以使用内置函数 int()、…...

【Web】LitCTF 2024 题解(全)
目录 浏览器也能套娃? 一个....池子? 高亮主题(划掉)背景查看器 百万美元的诱惑 SAS - Serializing Authentication exx 浏览器也能套娃? 随便试一试,一眼ssrf file:///flag直接读本地文件 一个....池子? {…...

家政项目小程序的设计
管理员账户功能包括:系统首页,个人中心,用户管理,家政人员管理,家政服务管理,咨询信息管理,咨询服务管理,家政预约管理,留言板管理,系统管理 微信端账号功能…...

electron TodoList网页应用打包成linux deb、AppImage应用
这里用的是windows的wsl的ubuntu环境 electron应用打包linux应用需要linux下打包,这里用windows的wsl的ubuntu环境进行操作 1)linux ubuntu安装nodejs、electron 安装nodejs: sudo apt update sudo apt upgrade ##快捷安装 curl -fsSL http…...