当前位置: 首页 > news >正文

四、单线程多路IO复用+多线程业务工作池

文章目录

  • 一、前言
    • 1 编译方法
  • 二、单线程多路IO复用+多线程业务工作池结构
  • 三、重写`Client_Context`类
  • 四、编写`Server`类

一、前言

我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。

由于项目变大,尝试解耦项目,使用CMake ,可以看这篇文章现代CMake使用,使C++代码解耦

本节代码均可在仓库TinyWebServer 中找到

1 编译方法

# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .

二、单线程多路IO复用+多线程业务工作池结构

简单来说就是把,读写任务交给线程池。

单线程多路IO复用+多线程业务工作池

三、重写Client_Context

上一节的Client_Context类并不能做到线程安全,以及管理客户端状态。所以做以下改变。

// client_context.hclass ClientContext {
public:ClientContext() : active(true) {}void pushMessage(const string &msg);bool hasMessages() const;string popMessage();void setWriteReady(bool ready);bool isWriteReady() const;bool isActive() const;void deactivate();private:queue<string> send_queue;	// 消息队列bool write_ready = false;	// 是否可写mutable mutex mtx;			// const下也可锁atomic<bool> active;		// 活跃检测
};
// client_context.cpp#include "client_context.h"// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {lock_guard<mutex> lock(mtx);send_queue.push(msg);
}bool ClientContext::hasMessages() const {lock_guard<mutex> lock(mtx);return !send_queue.empty();
}
string ClientContext::popMessage() {lock_guard<mutex> lock(mtx);string msg = send_queue.front();send_queue.pop();return msg;
}
void ClientContext::setWriteReady(bool ready) {lock_guard<mutex> lock(mtx);write_ready = ready;
}
bool ClientContext::isWriteReady() const {lock_guard<mutex> lock(mtx);return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }

四、编写Server

封装成类,隐藏细节。

#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <memory>
#include <unordered_map>#include "client_context.h"
#include "thread_pool.h"const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;class Server {
public:Server(int port);void run();private:void handleNewConnection();void handleClientEvent(epoll_event &event);void handleRead(int client_fd);void handleWrite(int client_fd);void removeClient(int client_fd);void modifyEpollEvent(int fd, uint32_t events);int server_fd;int epoll_fd;ThreadPool pool;unordered_map<int, shared_ptr<ClientContext>> clients;mutex clients_mutex;
};
#include "server.h"
#include <cstdint>// Server implementation
Server::Server(int port) : pool(4) {server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (server_fd < 0) {throw runtime_error("Socket creation failed");}sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {close(server_fd);throw runtime_error("Bind failed");}if (listen(server_fd, SOMAXCONN) < 0) {close(server_fd);throw runtime_error("Listen failed");}epoll_fd = epoll_create1(0);if (epoll_fd < 0) {close(server_fd);throw runtime_error("epoll_create1 failed");}epoll_event event;event.data.fd = server_fd;event.events = EPOLLIN | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {close(server_fd);close(epoll_fd);throw runtime_error("epoll_ctl failed");}
}void Server::run() {vector<epoll_event> events(MAX_EVENTS);while (true) {int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (event_count < 0) {cerr << "epoll_wait failed: " << endl;break;}for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {handleNewConnection();} else {handleClientEvent(events[i]);}}}
}void Server::handleNewConnection() {while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);if (client_fd < 0) {if (errno == EAGAIN || EWOULDBLOCK) {cout << "No more new connections to accept" << endl;break;} else {cerr << "Accept failed: " << endl;break;}}epoll_event event;event.data.fd = client_fd;event.events = EPOLLIN | EPOLLET;int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);if (epoll_ctl_result < 0) {cerr << "epoll_ctl failed for client socket: " << endl;close(client_fd);} else {{lock_guard<mutex> lock(clients_mutex);clients[client_fd] = make_shared<ClientContext>();}}}
}void Server::handleClientEvent(epoll_event &event) {int client_fd = event.data.fd;if (event.events & (EPOLLERR | EPOLLHUP)) {cout << "Error or hangup event for client: " << client_fd << endl;removeClient(client_fd);} else {if (event.events & EPOLLIN) handleRead(client_fd);if (event.events & EPOLLOUT) handleWrite(client_fd);   }
}void Server::handleRead(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;return;}client = it->second;}string buffer(BUFFER_SIZE, 0);while (true) {int read_len = read(client_fd, buffer.data(), buffer.size());if (read_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK)break;else {cerr << "Read failed on socket " << client_fd << endl;removeClient(client_fd);break;}} else if (read_len == 0) {cout << "Client disconnected: " << client_fd << endl;removeClient(client_fd);break;} else {cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;string message = "Echo: " + buffer.substr(0, read_len);client->pushMessage(message);client->setWriteReady(true);modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);}}});
}void Server::handleWrite(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;return;}client = it->second;}if (!client->isWriteReady()) return;bool keep_writing = true;while (keep_writing && client->hasMessages()) {string message = client->popMessage();size_t total_sent = 0;while (total_sent < message.size()) {int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);if (write_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {client->pushMessage(message.substr(total_sent));keep_writing = false;break;} else {cerr << "Write error on socket " << client_fd << endl;removeClient(client_fd);return;}} else total_sent += write_len;}if (total_sent == message.size()) cout << "Sent to client " << client_fd << ": " << message << endl;}if (!client->hasMessages()) {client->setWriteReady(false);modifyEpollEvent(client_fd, EPOLLIN);}});
}void Server::removeClient(int client_fd) {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it != clients.end()) {client = it->second;clients.erase(it);}}if (client) {client->deactivate();int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);if (epoll_ctl_result < 0) cerr << "Failed to remove client from epoll: " << endl;close(client_fd);}
}void Server::modifyEpollEvent(int fd, uint32_t events) {epoll_event event;event.data.fd = fd;event.events = events | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) cerr << "Failed to modify epoll event for fd " << fd << endl;
}

相关文章:

四、单线程多路IO复用+多线程业务工作池

文章目录 一、前言1 编译方法 二、单线程多路IO复用多线程业务工作池结构三、重写Client_Context类四、编写Server类 一、前言 我们以及讲完单线程多路IO复用 以及任务调度与执行的C线程池&#xff0c;接下来我们就给他结合起来。 由于项目变大&#xff0c;尝试解耦项目&#…...

单元测试--Junit

Junit是Java的单元测试框架提供了一些注解方便我们进行单元测试 1. 常用注解 常用注解&#xff1a; TestBeforeAll&#xff0c;AfterAllBeforeEach&#xff0c;AfterEach 使用这些注解需要先引入依赖&#xff1a; <dependency><groupId>org.junit.jupiter<…...

达梦数据库系列—30. DTS迁移Mysql到DM

目录 1.MySQL 源端信息 2.DM 目的端信息 3.迁移评估 4.数据库迁移 4.1源端 MySQL 准备 4.2目的端达梦准备 初始化参数设置 兼容性参数设置 创建迁移用户和表空间 4.3迁移步骤 创建迁移 配置迁移对象及策略 开始迁移 对象补迁 5.数据校验 统计 MySQL 端对象及数…...

随记0000——从0、1 到 C语言

C语言的发展历程是计算机科学史上的一个重要里程碑。 下面是从最早的机器语言到汇编语言&#xff0c;再到高级语言如 C 语言的简化演进过程&#xff1a; 1. 机器语言 定义与特点 机器语言是最底层的编程语言&#xff0c;由一系列二进制代码组成。直接被CPU执行&#xff0c;…...

C++ | Leetcode C++题解之第264题丑数II

题目&#xff1a; 题解&#xff1a; class Solution { public:int nthUglyNumber(int n) {vector<int> dp(n 1);dp[1] 1;int p2 1, p3 1, p5 1;for (int i 2; i < n; i) {int num2 dp[p2] * 2, num3 dp[p3] * 3, num5 dp[p5] * 5;dp[i] min(min(num2, num3…...

前端系列-8 集中式状态管理工具pinia

集中式状态管理工具—pinia vue3中使用pinia作为集中式状态管理工具&#xff0c;替代vue2中的vuex。 pinia文档可参考: https://pinia.web3doc.top/introduction.html 1.项目集成pinia 安装pinia依赖: npm install pinia在main.ts中引入pinia import { createApp } from vu…...

pytest使用

主要技术内容 1.pytest设计 接口测试 框架设想 common—公共的东西封装 1.request请求 2.Session 3.断言 4.Log 5.全局变量 6.shell命令 ❖ config---配置文件及读取 ❖ Log— ❖ payload—请求参数—*.yaml及读取 ❖ testcases—conftest.py; testcase1.py…….可…...

单表查询总结与多表查询概述

1. 单表查询总结 执行顺序&#xff1a; 从一张表&#xff0c;过滤数据&#xff0c;进行分组&#xff0c;对分组后的数据再过滤&#xff0c;查询出来所需数据&#xff0c;排序之后输出&#xff1b; from > where > group by > having > select > order by 2. …...

redis的使用场景和持久化方式

redis的使用场景 热点数据的缓存。热点&#xff1a;频繁读取的数据。限时任务的操作&#xff1a;短信验证码。完成session共享的问题完成分布式锁。 redis的持久化方式 什么是持久化&#xff1a;把内存中的数据存储到磁盘的过程&#xff0c;同时也可以把磁盘中的数据加载到内存…...

嵌入式Linux学习: 设备树实验

设备树&#xff08;DeviceTree&#xff09;是一种硬件描述机制&#xff0c;用于在嵌入式系统和操作系统中描述硬件设备的特性、连接关系和配置信息。它提供了一种与平台无关的方式来描述硬件&#xff0c;使得内核与硬件之间的耦合度降低&#xff0c;提高了系统的可移植性和可维…...

eqmx上读取数据处理以后添加到数据库中

目录 定义一些静态变量 定时器事件的处理器 订阅数据的执行器 处理json格式数据和将处理好的数据添加到数据库中 要求和最终效果 总结一下 定义一些静态变量 // 在这里都定义成全局的 一般都定义成静态的private static MqttClient mqttClient; // mqtt客户端 private s…...

【中项】系统集成项目管理工程师-第5章 软件工程-5.3软件设计

前言&#xff1a;系统集成项目管理工程师专业&#xff0c;现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试&#xff0c;全称为“全国计算机与软件专业技术资格&#xff08;水平&#xff09;考试”&…...

C++学习笔记-内联函数使用和含义

引言 内联函数是C为了优化在函数的调用带来的性能开销而设计的&#xff0c;特别是当函数体很小且频繁调用时&#xff0c;内联函数可以让编译器在调用点直接展开函数体&#xff0c;从而避免了函数调用的开销。 一、内联函数的定义与含义 1.1 定义 内联函数是通过在函数声明或…...

数据库(MySQL)-视图、存储过程、触发器

一、视图 视图的定义、作用 视图是从一个或者几个基本表&#xff08;或视图&#xff09;导出的表。它与基本表不同&#xff0c;是一个虚表。但是视图只能用来查看表&#xff0c;不能做增删改查。 视图的作用&#xff1a;①简化查询 ②重写格式化数据 ③频繁访问数据库 ④过…...

js 优雅的实现模板方法设计模式

在JavaScript中&#xff0c;优雅地实现模板方法设计模式通常意味着我们要遵循一些最佳实践&#xff0c;如清晰地定义算法的骨架&#xff08;模板方法&#xff09;&#xff0c;并确保子类能够灵活地扩展或修改这些算法中的特定步骤。由于JavaScript是一种动态语言&#xff0c;我…...

C语言——输入输出

C语言——输入输出 输入输出函数的类型getcharputcharprintf占位符的分类 scanf 什么是输入输出呢&#xff1f; 所谓输入输出是以计算机为主机而言的&#xff0c;往内存中输入数据为输入&#xff0c;反之从内存中输出数据为输出。 输入输出的功能 C语言本身是不提供输入输出功能…...

【微软蓝屏】微软Windows蓝屏问题汇总与应对解决策略

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…...

OpenCV图像滤波(2)均值平滑处理函数blur()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在OpenCV中&#xff0c;blur()函数用于对图像应用简单的均值模糊&#xff08;mean blur&#xff09;。这种模糊效果可以通过将图像中的每个像素替…...

Android lmkd机制详解

目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…...

linux shell(中)

结构化命令 if语句 if-then 最基本的结构化命令是 if-then 语句。if-then 语句的格式如下&#xff1a; if command thencommands ifif command; then # 通过把分号&#xff08;;&#xff09;放在待求值的命令尾部&#xff0c;可以将 then 语句写在同一行commands ifbash sh…...

VMware三种网络模式---巨细

文章目录 目录 ‘一.网络模式概述 二.桥接模式 二.NAT模式 三.仅主机模式 四.案例演示 防火墙配置&#xff1a; 虚拟电脑配置 前言 本文主要介绍VMware的三种网络模式 ‘一.网络模式概述 VMware中分为三种网络模式&#xff1a; 桥接模式&#xff1a;默认与宿主机VMnet0绑…...

力扣高频SQL 50 题(基础版)第一题

文章目录 力扣高频SQL 50 题&#xff08;基础版&#xff09;第一题1757.可回收且低脂的产品题目说明思路分析实现过程准备数据&#xff1a;实现方式&#xff1a;结果截图&#xff1a; 力扣高频SQL 50 题&#xff08;基础版&#xff09;第一题 1757.可回收且低脂的产品 题目说…...

2.1.卷积层

卷积 ​ 用MLP处理图片的问题&#xff1a;假设一张图片有12M像素&#xff0c;那么RGB图片就有36M元素&#xff0c;使用大小为100的单隐藏层&#xff0c;模型有3.6B元素&#xff0c;这个数量非常大。 识别模式的两个原则&#xff1a; 平移不变性&#xff08;translation inva…...

网易《永劫无间》手游上线,掀起游戏界狂潮

原标题&#xff1a;网易《永劫无间》手游上线&#xff0c;网友&#xff1a;发烧严重 易采游戏网7月26日消息&#xff1a;自网易宣布《永劫无间》手游即将上线以来&#xff0c;广大游戏玩家的期待值就不断攀升。作为一款拥有丰富内容和极高自由度的游戏&#xff0c;《永劫无间》…...

RNN(一)——循环神经网络的实现

文章目录 一、循环神经网络RNN1.RNN是什么2.RNN的语言模型3.RNN的结构形式 二、完整代码三、代码解读1.参数return_sequences2.调参过程 一、循环神经网络RNN 1.RNN是什么 循环神经网络RNN主要体现在上下文对理解的重要性&#xff0c;他比传统的神经网络&#xff08;传统的神…...

php 根据位置的经纬度计算距离

在开发中,我们要经常和位置打交道,要计算附近的位置、距离什么的。如下: 一.sql语句 SELECT houseID,title,location,chamber,room,toward,area,rent,is_verify,look_type,look_time, traffic,block_name,images,tag,create_time,update_time, location->&g…...

17 Python常用内置函数——基本输入输出

input() 和 print() 是 Python 的基本输入输出函数&#xff0c;前者用来接收用户的键盘输入&#xff0c;后者用来把数据以指定的格式输出到标准控制台或指定的文件对象。无论用户输入什么内容&#xff0c;input() 一律作为字符串对待&#xff0c;必要时可以使用内置函数 int()、…...

【Web】LitCTF 2024 题解(全)

目录 浏览器也能套娃&#xff1f; 一个....池子&#xff1f; 高亮主题(划掉)背景查看器 百万美元的诱惑 SAS - Serializing Authentication exx 浏览器也能套娃&#xff1f; 随便试一试&#xff0c;一眼ssrf file:///flag直接读本地文件 一个....池子&#xff1f; {…...

家政项目小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;家政人员管理&#xff0c;家政服务管理&#xff0c;咨询信息管理&#xff0c;咨询服务管理&#xff0c;家政预约管理&#xff0c;留言板管理&#xff0c;系统管理 微信端账号功能…...

electron TodoList网页应用打包成linux deb、AppImage应用

这里用的是windows的wsl的ubuntu环境 electron应用打包linux应用需要linux下打包&#xff0c;这里用windows的wsl的ubuntu环境进行操作 1&#xff09;linux ubuntu安装nodejs、electron 安装nodejs&#xff1a; sudo apt update sudo apt upgrade ##快捷安装 curl -fsSL http…...