当前位置: 首页 > news >正文

四、单线程多路IO复用+多线程业务工作池

文章目录

  • 一、前言
    • 1 编译方法
  • 二、单线程多路IO复用+多线程业务工作池结构
  • 三、重写`Client_Context`类
  • 四、编写`Server`类

一、前言

我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。

由于项目变大,尝试解耦项目,使用CMake ,可以看这篇文章现代CMake使用,使C++代码解耦

本节代码均可在仓库TinyWebServer 中找到

1 编译方法

# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .

二、单线程多路IO复用+多线程业务工作池结构

简单来说就是把,读写任务交给线程池。

单线程多路IO复用+多线程业务工作池

三、重写Client_Context

上一节的Client_Context类并不能做到线程安全,以及管理客户端状态。所以做以下改变。

// client_context.hclass ClientContext {
public:ClientContext() : active(true) {}void pushMessage(const string &msg);bool hasMessages() const;string popMessage();void setWriteReady(bool ready);bool isWriteReady() const;bool isActive() const;void deactivate();private:queue<string> send_queue;	// 消息队列bool write_ready = false;	// 是否可写mutable mutex mtx;			// const下也可锁atomic<bool> active;		// 活跃检测
};
// client_context.cpp#include "client_context.h"// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {lock_guard<mutex> lock(mtx);send_queue.push(msg);
}bool ClientContext::hasMessages() const {lock_guard<mutex> lock(mtx);return !send_queue.empty();
}
string ClientContext::popMessage() {lock_guard<mutex> lock(mtx);string msg = send_queue.front();send_queue.pop();return msg;
}
void ClientContext::setWriteReady(bool ready) {lock_guard<mutex> lock(mtx);write_ready = ready;
}
bool ClientContext::isWriteReady() const {lock_guard<mutex> lock(mtx);return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }

四、编写Server

封装成类,隐藏细节。

#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <memory>
#include <unordered_map>#include "client_context.h"
#include "thread_pool.h"const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;class Server {
public:Server(int port);void run();private:void handleNewConnection();void handleClientEvent(epoll_event &event);void handleRead(int client_fd);void handleWrite(int client_fd);void removeClient(int client_fd);void modifyEpollEvent(int fd, uint32_t events);int server_fd;int epoll_fd;ThreadPool pool;unordered_map<int, shared_ptr<ClientContext>> clients;mutex clients_mutex;
};
#include "server.h"
#include <cstdint>// Server implementation
Server::Server(int port) : pool(4) {server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (server_fd < 0) {throw runtime_error("Socket creation failed");}sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {close(server_fd);throw runtime_error("Bind failed");}if (listen(server_fd, SOMAXCONN) < 0) {close(server_fd);throw runtime_error("Listen failed");}epoll_fd = epoll_create1(0);if (epoll_fd < 0) {close(server_fd);throw runtime_error("epoll_create1 failed");}epoll_event event;event.data.fd = server_fd;event.events = EPOLLIN | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {close(server_fd);close(epoll_fd);throw runtime_error("epoll_ctl failed");}
}void Server::run() {vector<epoll_event> events(MAX_EVENTS);while (true) {int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (event_count < 0) {cerr << "epoll_wait failed: " << endl;break;}for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {handleNewConnection();} else {handleClientEvent(events[i]);}}}
}void Server::handleNewConnection() {while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);if (client_fd < 0) {if (errno == EAGAIN || EWOULDBLOCK) {cout << "No more new connections to accept" << endl;break;} else {cerr << "Accept failed: " << endl;break;}}epoll_event event;event.data.fd = client_fd;event.events = EPOLLIN | EPOLLET;int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);if (epoll_ctl_result < 0) {cerr << "epoll_ctl failed for client socket: " << endl;close(client_fd);} else {{lock_guard<mutex> lock(clients_mutex);clients[client_fd] = make_shared<ClientContext>();}}}
}void Server::handleClientEvent(epoll_event &event) {int client_fd = event.data.fd;if (event.events & (EPOLLERR | EPOLLHUP)) {cout << "Error or hangup event for client: " << client_fd << endl;removeClient(client_fd);} else {if (event.events & EPOLLIN) handleRead(client_fd);if (event.events & EPOLLOUT) handleWrite(client_fd);   }
}void Server::handleRead(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;return;}client = it->second;}string buffer(BUFFER_SIZE, 0);while (true) {int read_len = read(client_fd, buffer.data(), buffer.size());if (read_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK)break;else {cerr << "Read failed on socket " << client_fd << endl;removeClient(client_fd);break;}} else if (read_len == 0) {cout << "Client disconnected: " << client_fd << endl;removeClient(client_fd);break;} else {cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;string message = "Echo: " + buffer.substr(0, read_len);client->pushMessage(message);client->setWriteReady(true);modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);}}});
}void Server::handleWrite(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;return;}client = it->second;}if (!client->isWriteReady()) return;bool keep_writing = true;while (keep_writing && client->hasMessages()) {string message = client->popMessage();size_t total_sent = 0;while (total_sent < message.size()) {int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);if (write_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {client->pushMessage(message.substr(total_sent));keep_writing = false;break;} else {cerr << "Write error on socket " << client_fd << endl;removeClient(client_fd);return;}} else total_sent += write_len;}if (total_sent == message.size()) cout << "Sent to client " << client_fd << ": " << message << endl;}if (!client->hasMessages()) {client->setWriteReady(false);modifyEpollEvent(client_fd, EPOLLIN);}});
}void Server::removeClient(int client_fd) {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it != clients.end()) {client = it->second;clients.erase(it);}}if (client) {client->deactivate();int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);if (epoll_ctl_result < 0) cerr << "Failed to remove client from epoll: " << endl;close(client_fd);}
}void Server::modifyEpollEvent(int fd, uint32_t events) {epoll_event event;event.data.fd = fd;event.events = events | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) cerr << "Failed to modify epoll event for fd " << fd << endl;
}

相关文章:

四、单线程多路IO复用+多线程业务工作池

文章目录 一、前言1 编译方法 二、单线程多路IO复用多线程业务工作池结构三、重写Client_Context类四、编写Server类 一、前言 我们以及讲完单线程多路IO复用 以及任务调度与执行的C线程池&#xff0c;接下来我们就给他结合起来。 由于项目变大&#xff0c;尝试解耦项目&#…...

单元测试--Junit

Junit是Java的单元测试框架提供了一些注解方便我们进行单元测试 1. 常用注解 常用注解&#xff1a; TestBeforeAll&#xff0c;AfterAllBeforeEach&#xff0c;AfterEach 使用这些注解需要先引入依赖&#xff1a; <dependency><groupId>org.junit.jupiter<…...

达梦数据库系列—30. DTS迁移Mysql到DM

目录 1.MySQL 源端信息 2.DM 目的端信息 3.迁移评估 4.数据库迁移 4.1源端 MySQL 准备 4.2目的端达梦准备 初始化参数设置 兼容性参数设置 创建迁移用户和表空间 4.3迁移步骤 创建迁移 配置迁移对象及策略 开始迁移 对象补迁 5.数据校验 统计 MySQL 端对象及数…...

随记0000——从0、1 到 C语言

C语言的发展历程是计算机科学史上的一个重要里程碑。 下面是从最早的机器语言到汇编语言&#xff0c;再到高级语言如 C 语言的简化演进过程&#xff1a; 1. 机器语言 定义与特点 机器语言是最底层的编程语言&#xff0c;由一系列二进制代码组成。直接被CPU执行&#xff0c;…...

C++ | Leetcode C++题解之第264题丑数II

题目&#xff1a; 题解&#xff1a; class Solution { public:int nthUglyNumber(int n) {vector<int> dp(n 1);dp[1] 1;int p2 1, p3 1, p5 1;for (int i 2; i < n; i) {int num2 dp[p2] * 2, num3 dp[p3] * 3, num5 dp[p5] * 5;dp[i] min(min(num2, num3…...

前端系列-8 集中式状态管理工具pinia

集中式状态管理工具—pinia vue3中使用pinia作为集中式状态管理工具&#xff0c;替代vue2中的vuex。 pinia文档可参考: https://pinia.web3doc.top/introduction.html 1.项目集成pinia 安装pinia依赖: npm install pinia在main.ts中引入pinia import { createApp } from vu…...

pytest使用

主要技术内容 1.pytest设计 接口测试 框架设想 common—公共的东西封装 1.request请求 2.Session 3.断言 4.Log 5.全局变量 6.shell命令 ❖ config---配置文件及读取 ❖ Log— ❖ payload—请求参数—*.yaml及读取 ❖ testcases—conftest.py; testcase1.py…….可…...

单表查询总结与多表查询概述

1. 单表查询总结 执行顺序&#xff1a; 从一张表&#xff0c;过滤数据&#xff0c;进行分组&#xff0c;对分组后的数据再过滤&#xff0c;查询出来所需数据&#xff0c;排序之后输出&#xff1b; from > where > group by > having > select > order by 2. …...

redis的使用场景和持久化方式

redis的使用场景 热点数据的缓存。热点&#xff1a;频繁读取的数据。限时任务的操作&#xff1a;短信验证码。完成session共享的问题完成分布式锁。 redis的持久化方式 什么是持久化&#xff1a;把内存中的数据存储到磁盘的过程&#xff0c;同时也可以把磁盘中的数据加载到内存…...

嵌入式Linux学习: 设备树实验

设备树&#xff08;DeviceTree&#xff09;是一种硬件描述机制&#xff0c;用于在嵌入式系统和操作系统中描述硬件设备的特性、连接关系和配置信息。它提供了一种与平台无关的方式来描述硬件&#xff0c;使得内核与硬件之间的耦合度降低&#xff0c;提高了系统的可移植性和可维…...

eqmx上读取数据处理以后添加到数据库中

目录 定义一些静态变量 定时器事件的处理器 订阅数据的执行器 处理json格式数据和将处理好的数据添加到数据库中 要求和最终效果 总结一下 定义一些静态变量 // 在这里都定义成全局的 一般都定义成静态的private static MqttClient mqttClient; // mqtt客户端 private s…...

【中项】系统集成项目管理工程师-第5章 软件工程-5.3软件设计

前言&#xff1a;系统集成项目管理工程师专业&#xff0c;现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试&#xff0c;全称为“全国计算机与软件专业技术资格&#xff08;水平&#xff09;考试”&…...

C++学习笔记-内联函数使用和含义

引言 内联函数是C为了优化在函数的调用带来的性能开销而设计的&#xff0c;特别是当函数体很小且频繁调用时&#xff0c;内联函数可以让编译器在调用点直接展开函数体&#xff0c;从而避免了函数调用的开销。 一、内联函数的定义与含义 1.1 定义 内联函数是通过在函数声明或…...

数据库(MySQL)-视图、存储过程、触发器

一、视图 视图的定义、作用 视图是从一个或者几个基本表&#xff08;或视图&#xff09;导出的表。它与基本表不同&#xff0c;是一个虚表。但是视图只能用来查看表&#xff0c;不能做增删改查。 视图的作用&#xff1a;①简化查询 ②重写格式化数据 ③频繁访问数据库 ④过…...

js 优雅的实现模板方法设计模式

在JavaScript中&#xff0c;优雅地实现模板方法设计模式通常意味着我们要遵循一些最佳实践&#xff0c;如清晰地定义算法的骨架&#xff08;模板方法&#xff09;&#xff0c;并确保子类能够灵活地扩展或修改这些算法中的特定步骤。由于JavaScript是一种动态语言&#xff0c;我…...

C语言——输入输出

C语言——输入输出 输入输出函数的类型getcharputcharprintf占位符的分类 scanf 什么是输入输出呢&#xff1f; 所谓输入输出是以计算机为主机而言的&#xff0c;往内存中输入数据为输入&#xff0c;反之从内存中输出数据为输出。 输入输出的功能 C语言本身是不提供输入输出功能…...

【微软蓝屏】微软Windows蓝屏问题汇总与应对解决策略

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…...

OpenCV图像滤波(2)均值平滑处理函数blur()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在OpenCV中&#xff0c;blur()函数用于对图像应用简单的均值模糊&#xff08;mean blur&#xff09;。这种模糊效果可以通过将图像中的每个像素替…...

Android lmkd机制详解

目录 一、lmkd介绍 二、lmkd实现原理 2.1 工作原理图 2.2 初始化 2.3 oom_adj获取 2.4 监听psi事件及处理 2.5 进程选取与查杀 2.5.1 进程选取 2.5.2 进程查杀 三、关键系统属性 四、核心数据结构 五、代码时序 一、lmkd介绍 Android lmkd采用epoll方式监听linux内…...

linux shell(中)

结构化命令 if语句 if-then 最基本的结构化命令是 if-then 语句。if-then 语句的格式如下&#xff1a; if command thencommands ifif command; then # 通过把分号&#xff08;;&#xff09;放在待求值的命令尾部&#xff0c;可以将 then 语句写在同一行commands ifbash sh…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令&#xff0c;把数据流转换成Message&#xff0c;状态转变流程是&#xff1a;State::Created 》 St…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

Chrome 浏览器前端与客户端双向通信实战

Chrome 前端&#xff08;即页面 JS / Web UI&#xff09;与客户端&#xff08;C 后端&#xff09;的交互机制&#xff0c;是 Chromium 架构中非常核心的一环。下面我将按常见场景&#xff0c;从通道、流程、技术栈几个角度做一套完整的分析&#xff0c;特别适合你这种在分析和改…...

Vue3中的computer和watch

computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...