当前位置: 首页 > news >正文

基于多线程的Reactor模式的 回声服务器 EchoServer

记录下  

一个线程专门用来接受accept获取客户端的fd 

获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程

然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd

线程之间通过eventfd来通信  将客户端的fd传到 对应的线程中  

参考了MediaServer   引入EventPollerPoll 和 EventPoller的 概念  

最少两个两个线程 设置为1的话 会改成2

cpp代码:

#include "durian.h"#include <sys/epoll.h>namespace DURIAN
{EventPoller::EventPoller(int id){m_id = id;}EventPoller::~EventPoller(){printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd = epoll_create1(0);if(m_poll_fd == -1){return false;}m_event_fd = eventfd(0,0);if(m_event_fd == -1){printf("new fd failed\n");close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS = 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count == -1){if(errno != EINTR){//exit(1);}//ready_count = 0;}else if(ready_count == 0){if(m_run_flag == false){//printf("time out and runflag = false exit thread\n");//break;}}for(int i = 0;i<ready_count;i++){const struct epoll_event &ev = events[i];int fd = events[i].data.fd;if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler = m_accept_handlers[fd];handler(fd);}else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it = m_buffer_pool.find(fd);if(it!= m_buffer_pool.end()){auto &buf = it->second;if(buf.WriteData(fd) == false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag = false;}void EventPoller::Start(){//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);m_run_flag = true;m_thread_id = std::thread(&EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) != 0){return false;}int flags = 1;if(ioctl(fd,FIONBIO,&flags) == -1){return false;}struct epoll_event ev;ev.events = EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd = fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf("DeliverConn fd = %d\n",conn_fd);uint64_t count = conn_fd;if(write(m_event_fd,&count,sizeof(count)) == -1){printf("Deliverconn write failed\n");}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) == false){return false;}std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;m_accept_handlers[fd] = [this,on_accept]( int server_fd){for(;;){int new_fd = accept(server_fd,nullptr,nullptr);std::cout<<"accept client fd = "<<new_fd<<std::endl;	if(new_fd == -1){if(errno!= EAGAIN){Close(server_fd);}return 0;}int enable = 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,&count,sizeof(count)) == -1){if(errno != EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){	if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_read](int cfd){for(;;){char buf[4096] = {0};ssize_t ret = read(cfd,buf,sizeof(buf));if(ret == -1){if(errno != EAGAIN){Close(cfd);}return -1;}if(ret == 0){Close(cfd);printf("客户端关闭了连接 %d\n",cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb = nullptr;auto it = m_buffer_pool.find(fd);if(it == m_buffer_pool.end()){while(len >0){ssize_t ret = write(fd,buf,len);if(ret == -1){if(errno != EAGAIN){Close(fd);return false;}wb = &m_buffer_pool[fd];break;}buf+= ret;len-=ret;}if(len == 0){//Successreturn true;}}else{wb = &it->second;}wb->Add2Buffer(buf,len);return true;}static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); static EventPollerPool &s_instance_ref = *s_instance; return s_instance_ref; 
}EventPollerPool::EventPollerPool()
{auto size = g_pool_size;auto cpus = std::thread::hardware_concurrency();size = size > 0 ? size : cpus;std::cout<<"Thread size:"<<size<<std::endl;if(size <2)size = 2;for (int i = 0; i < size; ++i) {std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);m_pollers.emplace_back(poller);}
}std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{if(m_pollers.size()>1){int min_clients = 10000;int target_index = 0;for(int i = 1;i<m_pollers.size();i++){if(m_pollers[i]-> GetClients() < min_clients){min_clients = m_pollers[i]->GetClients();target_index = i;}}//printf("target index = %d min_clients = %d\n",target_index,min_clients);return m_pollers[target_index];}return m_pollers[0];}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{return m_pollers[0];
}void EventPollerPool::StartPollers()
{for(int i = 1;i<m_pollers.size();i++){m_pollers[i]->Init();int event_fd = m_pollers[i]->GetEventFD();m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){READER reader = [&,i](int fd,const char*data,size_t len){printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);m_pollers[i]->FlushData(fd,data,len);return 0;};m_pollers[i]->AddReader(cfd,reader);return 0;});m_pollers[i]->Start();	}
}void EventPollerPool::Stop()
{for(int i = 0;i<m_pollers.size();i++){m_pollers[i]->Stop();}}}

头文件

#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>#include <sys/eventfd.h>
#include <signal.h>#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>#include <unordered_map>namespace DURIAN
{class WriteBuffer{private:std::list<std::string> buf_items;size_t offset = 0;public:bool IsEmpty() const{return buf_items.empty();}void Add2Buffer(const char* data,size_t len){if(buf_items.empty() || buf_items.back().size()+len >4096){buf_items.emplace_back(data,len);}else{buf_items.back().append(data,len);}}bool WriteData(int fd){while (IsEmpty() == false){auto const &item = buf_items.front();const char *p = item.data() + offset;size_t len = item.size() -offset;while(len >0){ssize_t ret = write(fd,p,len);if(ret == -1){if(errno == EAGAIN){return true;}return false;}offset += ret;p+=ret;len-= ret;}buf_items.pop_front();}return true;}};using ACCEPTER = std::function<int(int)>;using WRITER = std::function<int(int)>;using EVENTER = std::function<int(int)>;using READER = std::function<int(int,const char *data,size_t)>;//static thread_local std::unordered_map<int fd,READER>g_th_handlers;class EventPoller{private:int m_poll_fd = -1;int m_id;bool m_run_flag = false;std::unordered_map<int,ACCEPTER> m_accept_handlers;std::unordered_map<int,WriteBuffer> m_buffer_pool;std::mutex m_connction_lock;int m_event_fd;std::thread m_thread_id ;std::vector<int>m_connections;void RunLoop();public:EventPoller(int i);~EventPoller();int GetEventFD();int GetClients();std::vector<int> & GetConnections();bool Init();void Start();void Stop();void Wait();	void DeliverConn(int conn_fd);bool AddListener(int fd,ACCEPTER on_listen);bool AddEventer(int fd,EVENTER on_event);bool AddReader(int fd,READER on_read);void Close(int fd);bool Add2Epoll(int fd);bool FlushData(int fd,const char *buf,size_t len);};class EventPollerPool{public:static EventPollerPool &Instance();static void SetPoolSize(size_t size = 0);std::shared_ptr<EventPoller>GetPoller(); std::shared_ptr<EventPoller>GetFirstPoller(); 	void StartPollers();void Stop(); private:int m_size;std::vector<std::shared_ptr<EventPoller>> m_pollers;EventPollerPool();		  };}

main文件

#include "durian.h"static bool g_run_flag = true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag = false;printf("Get exit flag\n");if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo){g_run_flag = false;printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");}}bool StartServer()
{int listen_fd = socket(AF_INET,SOCK_STREAM,0);if(listen_fd == -1){printf("Create socket failed\n");return false;}else{printf("Server listen fd is:%d\n",listen_fd);}int reuseaddr = 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1){return false;}struct sockaddr_in listen_addr = {0};listen_addr.sin_family  = AF_INET;listen_addr.sin_addr.s_addr = INADDR_ANY;listen_addr.sin_port = htons(8888);if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){printf("bind failed\n");return false;}if(listen(listen_fd,100) == -1){printf("listen failed\n");return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller = pool.GetFirstPoller(); if(poller->Init()){if(poller->AddListener(listen_fd,[&](int conn_fd){printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);//Deliver client fd to other pollerspool.GetPoller()->DeliverConn(conn_fd);return 0;}) == false){return false;}poller->Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf(" cpp version :%d\n",__cplusplus);int thread_size = 1;bool run_flag = true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
}

性能测试

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html
 

相关文章:

基于多线程的Reactor模式的 回声服务器 EchoServer

记录下 一个线程专门用来接受accept获取客户端的fd 获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程 然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd 线程之间通过eventfd来通信 将客户端的fd传到 对应的线程中 参考了MediaServer 引入…...

《TWS蓝牙耳机通信原理与接口技术》

+他V hezkz17进数字音频系统研究开发交流答疑群(课题组) 耳机BT与手机BT通信 主耳与从耳通信 耳机BLE盒手机BLE通信 充电盒与耳机通信 上位机与耳机通信 上位机与充电盒通信 1 耳机BT与手机BT通信 传输音频数据传递控制信息 (3) 耳机BLE与手机BLE通信 安卓/苹果app-耳机…...

敏捷开发使用

1.敏捷开发 敏捷开发以用户的需求进化为核心&#xff0c;采用迭代、循序渐进的方法进行软件开发。在敏捷开发中&#xff0c;软件项目在构建初期被切分成多个子项目&#xff0c;各个子项目的成果都经过测试&#xff0c;具备可视、可集成和可运行使用的特征。换言之&#xff0c;就…...

cdsn目录处理:```,```# 目录校正

原标题 <small> cdsn目录处理&#xff1a; &#xff0c;中间添加 # 空格 空行后 遇到的底部空行出错&#xff0c;书接上回&#xff0c;处理空行【python查找替换&#xff1a;查找空行&#xff0c;空行前后添加&#xff0c;中间添加 # 空格 空行后遇到的第1行文字&am…...

前端TypeScript学习day03-TS高级类型

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 TypeScript 高级类型 class 类 class继承 extends implements 类成员可见性 public protected …...

LeetCode-101-对称二叉树

题目描述&#xff1a; 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 题目链接&#xff1a;LeetCode-101-对称二叉树 解题思路&#xff1a;判断2个二叉树是否可以相互翻转&#xff0c;考察同时处理2个二叉树的遍历情况。 代码实现&#xff1a; class Solution …...

9-AJAX-上-原理详解

一、定义 1、什么是Ajax Ajax&#xff1a;即异步 JavaScript 和XML。Ajax是一种用于创建快速动态网页的技术。通过在后台与进行少量数据交换&#xff0c;Ajax可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下&#xff0c;对网页的某部分进行更新。而传统的…...

Python3操作Redis最新版|CRUD基本操作(保姆级)

Python3中类的高级语法及实战 Python3(基础|高级)语法实战(|多线程|多进程|线程池|进程池技术)|多线程安全问题解决方案 Python3数据科学包系列(一):数据分析实战 Python3数据科学包系列(二):数据分析实战 Python3数据科学包系列(三):数据分析实战 Win11查看安装的Python路…...

微信又被吐槽了,委屈啊

昨天的时候&#xff0c;一打开微博热搜榜&#xff0c;一看&#xff0c;微信又被吐槽了&#xff0c;微信占用存储这件事几乎年年会被骂&#xff0c;几乎也会年年被吐槽。 这次的起因呢&#xff0c;是一个人整理了一个方法&#xff1a;「微信内存从 126G 清理到 75G 我是怎么做到…...

刷题笔记27——并查集

很长一段时间&#xff0c;我的生活看似马上就要开始了。但是总有一些障碍阻挡着&#xff0c;有些事得先解决&#xff0c;有些工作还有待完成&#xff0c;时间貌似不够用&#xff0c;还有一笔债务8要去付清&#xff0c;然后生活就会开始。最后我终于明白&#xff0c;这些障碍&am…...

Python 模拟类属性

文章目录 模拟类属性的原因模拟类属性的可能解决方案使用 PropertyMock 模拟类属性在不使用 PropertyMock 的情况下模拟类属性Python 模拟类构造函数使用 patch.object 装饰器来修补构造函数本文的主要目的是介绍如何使用 python 单元测试模块 unittest 操作类属性以进行测试和…...

面试算法24:反转链表

题目 定义一个函数&#xff0c;输入一个链表的头节点&#xff0c;反转该链表并输出反转后链表的头节点。例如&#xff0c;把图4.8&#xff08;a&#xff09;中的链表反转之后得到的链表如图4.8&#xff08;b&#xff09;所示。 分析 由于节点j的next指针指向了它的前一个节…...

【论文阅读】面向抽取和理解基于Transformer的自动作文评分模型的隐式评价标准(实验结果部分)

方法 结果 在这一部分&#xff0c;我们展示对于每个模型比较的聚合的统计分析当涉及到计算特征和独立的特征组&#xff08;表格1&#xff09;&#xff0c;抽取功能组和对齐重要功能组&#xff08;表格2&#xff09;&#xff0c;并且最后&#xff0c;我们提供从模型比较&#x…...

VueRouter与expres/koa中间件的关联

ueRouter: runQueue 路由守卫都是有三个参数to,from,next。其中next就是下方的fn执行时候传入的第二个参数(回调函数)&#xff0c;只有该回调执行后才会挨个遍历queue内的守卫。 中间件的作用 隔离基础设施与业务逻辑之间的细节。详细的内容位于《深入浅出Node.js》P210 另外一…...

二十、SpringCloud Alibaba Seata处理分布式事务

目录 一、分布式事务问题1、分布式之前2、分布式之后 二、Seata简介1、Seata是什么&#xff1f;2、Seata能干嘛&#xff1f;3、去拿下&#xff1f;4、怎么玩 三、Seata-server安装四、订单、库存、账户业务数据库准备五、订单、库存、账户业务微服务准备六、Seata原理介绍 一、…...

标准误与聚类稳健标准误的理解

1 标准误 1.1 定义 标准误&#xff08;Standard Error&#xff09;是用来衡量统计样本估计量&#xff08;如均值、回归系数等&#xff09;与总体参数之间的差异的一种统计量。标准误衡量了样本估计量的变异程度&#xff0c;提供了对总体参数的估计的不确定性的度量。标准误越…...

【Github】将本地仓库同步到github上

许久没有用GitHub了&#xff0c;怎么传仓库都忘记了。在这里记录一下 If you have a local folder on your machine and you want to transform it into a GitHub repository, follow the steps below: 1. Install Git (if not already installed) Make sure you have Git in…...

c++视觉--通道分离,合并处理,在分离的通道中的ROI感兴趣区域里添加logo图片

c视觉–通道分离&#xff0c;合并处理 通道分离: split()函数 #include <opencv2/opencv.hpp>int main() {// 读取图像cv::Mat image cv::imread("1.jpg");// 检查图像是否成功加载if (image.empty()) {std::cerr << "Error: Could not read the…...

python爬虫:多线程收集/验证IP从而搭建有效IP代理池

目录 一、前言 二、IP池的实现 1. 收集代理IP 2. 验证代理IP可用性 3. 搭建IP代理池 三、多线程实现 四、代理IP的使用 五、总结 一、前言 在网络爬虫中&#xff0c;IP代理池的作用非常重要。网络爬虫需要大量的IP地址来发送请求&#xff0c;同时为了降低被封禁的风险…...

阻塞队列以及阻塞队列的一个使用

阻塞队列以及阻塞队列的一个使用 阻塞队列简介 阻塞队列&#xff08;Blocking Queue&#xff09;是一种常见的队列数据结构&#xff0c;它具有特殊的行为&#xff0c;可以用于多线程编程中&#xff0c;以协调不同线程之间的任务执行和数据传递。阻塞队列在多线程环境中非常有…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令&#xff0c;把数据流转换成Message&#xff0c;状态转变流程是&#xff1a;State::Created 》 St…...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...