当前位置: 首页 > news >正文

C/C++|基于回调函数实现异步操作

首先,要搞懂一点,异步操作本质上也是并发,我们想要在线程级别实现异步并发基本就靠三种方式:

  • 多线程并发
  • 回调函数
  • 协程

今天我们讨论的是回调函数,我们如何通过回调函数来实现异步操作呢?

  1. 非阻塞I/O操作+回调函数实现异步IO
  2. 基于定时器+回调函数实现异步任务调度
  3. 事件队列+回调函数

我们就分别来实现一下他们吧,代码比较长,请耐心阅读。

非阻塞I/O+回调

这种方式允许程序在等待 I/O 操作完成时继续执行其他任务,从而提高并发性和性能。

我们可以使用标准 C++ 库和 C++11 中的线程库来实现一个基于 epoll 的非阻塞 I/O 和回调函数的示例。epoll 是 Linux 提供的高效 I/O 多路复用机制,适用于处理大量并发连接。我们将使用 epoll 来实现异步 I/O,并使用回调函数处理完成的 I/O 操作。

实例代码:使用 epoll 实现非阻塞 I/O 和回调函数

#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include <cstring>
#include <functional>
#include <unordered_map>
#include <vector>
#include <thread>
#include <chrono>void set_non_blocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);if (flage == -1) {throw std::runtime_error("fcntl get error");}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {throw std::runtime_error("fcntl set error");}
}//事件处理器类
class EventLoop {
public:EventLoop() {epollFd_ = epoll_create1(0);if (epoll_fd == -1) {throw std::runtime_error("epoll_creat1 error");}}~EpollLoop() {close(epoll_fd);}//添加文件描述符及其对应的回调函数void add_fd(int fd, std::function<void(int)> cb) {set_non_blocking(fd);epoll_event event;event.events = EPOLLIN | EPOLLET;event.data.fd = fd;if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {throw std::runtime_error("epoll_ctl add error");}callbacks_[fd] = cb;}//运行时间循环void run() {while (running_) {std::vector<epoll_event> events(10);int numActives = epoll_wait(epollFd_, events.data(), events.size(), -1);if (numActives == -1) {throw std::runtime_error("epoll_wait error");}for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (callbacks_.find(fd) != callbacks_.end()) callbacks[fd](fd);}}}//停止事件循环void stop() {return = false;}private:int epollFd_; //epoll实例std::unordered_map<int, std::function<void(int)>> callbacks_;bool running_ = true;
};// 示例回调函数,处理标准输入的读取
void handle_stdin(int fd) {char buffer[128];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) {buffer[n] = '\0';std::cout << "Read from stdin: " << buffer << std::endl;} else if (n == 0) {std::cout << "EOF from stdin" << std::endl;} else {if (errno != EAGAIN && errno != EWOULDBLOCK) {std::ceer << "Read error: " <<  strerror(errno) << std::endl;}	}
}int main () {try {EventLoop loop;//注册标准输入的文件描述符和回调函数loop.add_fd(STDIN_FILENO, handle_stdin);//运行事件循环std::thread loop_thread([&loop]() {loop.run();});//模拟只线程的其他工作std::this_thread::sleep_for(std::chrono::seconds(10));//停止事件循环loop.stop();loop_thread.join();} catch (const std::exception &e) {std::cerr << "Exception: " << e.what() << std::endl;return 1}return 0;
}

在该示例中,由于是非阻塞IO,所以如果有两个 fd 对应的事件被激活,那么他们会循环执行各自的回调操作,因为我们的EventLoop里面是又一个 while 循环的,如果这两个 fd 如果都是读一个很大很大的文件,那么他们在while循环中会交替执行各自的回调任务,实现异步操作。

基于定时器+回调函数实现异步任务调度

通过定时器和回调函数,可以在单线程环境中实现异步任务调度。在这个示例中,我们实现了一个简单的 Web 服务器定时清理过期会话的功能。类似的技术可以应用于其他需要定时任务调度的场景,如定时备份、日志轮换、定时数据采集等。

1. SessionManager 类

这个类负责管理用户会话,提供添加、删除和清理过期会话的方法。

class SessionManager {
#include <iostream>
#include <unordered_map>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>
#include <algorithm>class SessionManager {
public:void add_session(const std::string& session_id) {sessions[session_id] = std::chrono::steady_clock::now();}void remove_session(const std::string& session_id) {sessions.erase(session_id);}void clean_expired_sessions(std::chrono::seconds timeout) {auto now = std::chrono::steady_clock::now();for (auto it = sessions.begin(); it != sessions.end(); ) {if (now - it->second > timeout) {std::cout << "Removing expired session: " << it->first << std::endl;it = sessions.erase(it);} else {++it;}}}private:std::unordered_map<std::string, std::chrono::steady_clock::time_point> sessions;
};
  • add_session:添加一个新会话,记录会话 ID 和当前时间。
  • remove_session:删除指定的会话。
  • clean_expired_sessions:清理过期会话,检查所有会话,如果会话时间超过指定的超时时间,则删除会话。

2. EventLoop类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {auto next_run = std::chrono::steady_clock::now() + interval;events.emplace_back(next_run, interval, callback);}void run() {while (running) {auto now = std::chrono::steady_clock::now();for (auto& event : events) {if (now >= event.next_run) {event.callback();event.next_run = now + event.interval;}}std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用}}void stop() {running = false;}private:struct Event {std::chrono::steady_clock::time_point next_run;std::chrono::milliseconds interval;std::function<void()> callback;Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb): next_run(nr), interval(i), callback(cb) {}};std::vector<Event> events;bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
  • stop:停止事件循环。

3. 回调函数

定义一个回调函数,用于清理过期会话。
如果我们定义两个不同任务的回调函数,那么是不是就可以在某段时间实现多个任务了呢?

void clean_sessions_task(SessionManager& session_manager, std::chrono::seconds timeout) {session_manager.clean_expired_sessions(timeout);
}

这个函数调用 SessionManager 的 clean_expired_sessions 方法来清理过期会话。

4. main 函数

int main() {SessionManager session_manager;EventLoop event_loop;// 添加一些示例会话session_manager.add_session("session1");std::this_thread::sleep_for(std::chrono::seconds(1));session_manager.add_session("session2");// 每2秒清理一次过期会话(假设过期时间为3秒)event_loop.add_event(std::bind(clean_sessions_task, std::ref(session_manager), std::chrono::seconds(3)), std::chrono::seconds(2));// 运行事件循环std::thread event_loop_thread([&event_loop]() {event_loop.run();});// 模拟服务器运行std::this_thread::sleep_for(std::chrono::seconds(10));// 停止事件循环event_loop.stop();event_loop_thread.join();return 0;
}
  1. 创建 SessionManager 和 EventLoop 对象。
  2. 添加两个示例会话,分别在 0 秒和 1 秒时添加。
  3. 添加一个定时事件,每 2 秒调用一次 clean_sessions_task 来清理过期会话,过期时间为 3 秒。
  4. 启动一个线程运行事件循环。
  5. 主线程模拟服务器运行 10 秒钟。
  6. 停止事件循环并等待事件循环线程结束。

如果我们添加两个定时事件,并且打开循环,那么不是就在某段时间实现了异步任务调度呢?

基于事件队列+回调函数实现异步操作

这个示例展示了如何使用标准 C++ 库和 C++11 线程库,通过事件队列和回调函数在单线程中实现异步任务调度。具体来说,我们实现了一个简单的系统,可以调度并执行延迟任务。

1.EventLoop 类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:// 添加一个新的定时事件void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {auto next_run = std::chrono::steady_clock::now() + interval;events.emplace_back(next_run, interval, callback);}// 运行事件循环void run() {while (running) {auto now = std::chrono::steady_clock::now();for (auto& event : events) {if (now >= event.next_run) {event.callback(); // 执行回调函数event.next_run = now + event.interval; // 更新下一次执行时间}}std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用}}// 停止事件循环void stop() {running = false;}private:struct Event {std::chrono::steady_clock::time_point next_run;std::chrono::milliseconds interval;std::function<void()> callback;Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb): next_run(nr), interval(i), callback(cb) {}};std::vector<Event> events;bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
    • 参数 callback 是一个回调函数,当事件触发时调用。
    • 参数 interval 是一个时间间隔,表示事件应该在多长时间后执行。
    • next_run 记录了事件的下一次执行时间。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
    • while (running):事件循环在 running 为 true 时持续运行。
    • event.callback():执行回调函数,表示事件触发。
    • event.next_run = now + event.interval:更新事件的下一次执行时间。
    • std::this_thread::sleep_for(std::chrono::milliseconds(10)):通过短暂休眠来减少 CPU 占用。
  • stop:停止事件循环,将 running 设置为 false。

2. 回调函数

定义两个简单的回调函数,用于示例任务。

void say_hello() {std::cout << "Hello, World!" << std::endl;
}void say_goodbye() {std::cout << "Goodbye, World!" << std::endl;
}

3.main函数

设置并运行事件循环,添加一些示例任务,并定期执行这些任务。

int main() {EventLoop loop;// 添加定时事件loop.add_event(say_hello, std::chrono::seconds(1)); // 每1秒执行一次loop.add_event(say_goodbye, std::chrono::seconds(2)); // 每2秒执行一次// 运行事件循环loop.run();return 0;
}

异步任务的实现

异步任务的实现核心在于 EventLoop 类中的 add_event 和 run 方法:

  • add_event 方法将任务(回调函数)和执行时间(间隔)添加到事件队列中。
  • run 方法启动事件循环,定期检查事件队列,触发到期的任务,并通过回调函数执行这些任务。

具体来说,异步任务的执行发生在以下代码行:

if (now >= event.next_run) {event.callback(); // 执行回调函数event.next_run = now + event.interval; // 更新下一次执行时间
}
  • event.callback():当当前时间 now 大于等于 event.next_run 时,执行回调函数,表示异步任务触发并执行。

这个机制允许在单线程环境中,通过事件队列和回调函数实现异步任务调度,无需多线程或协程。每个任务在指定的时间间隔后触发,保持事件循环的运行和任务的调度。

事件队列\定时器 + 回调真的能实现异步任务吗

当回调任务是一个非常耗时的操作时,如果在单线程中执行,确实会导致事件循环被阻塞,从而影响其他任务的执行。这违背了异步执行的初衷,即不阻塞程序的其他部分。

为了解决这个问题,主要还是以下方法:

  • 使用线程池
    • 我们可以使用一个线程池来处理耗时的任务,从而避免阻塞事件循环。线程池可以并发执行多个任务,确保事件循环保持响应。

相关文章:

C/C++|基于回调函数实现异步操作

首先&#xff0c;要搞懂一点&#xff0c;异步操作本质上也是并发&#xff0c;我们想要在线程级别实现异步并发基本就靠三种方式&#xff1a; 多线程并发回调函数协程 今天我们讨论的是回调函数&#xff0c;我们如何通过回调函数来实现异步操作呢&#xff1f; 非阻塞I/O操作回…...

Mac上搭建Python环境:深入探索与高效实践

Mac上搭建Python环境&#xff1a;深入探索与高效实践 在Mac上搭建Python环境&#xff0c;对于开发者来说是一个既具挑战性又充满乐趣的过程。本文将从四个方面、五个方面、六个方面和七个方面详细阐述如何在Mac上成功搭建Python环境&#xff0c;并帮助您更好地理解和应用Pytho…...

数据标准的制定落地

目录 什么是数据标准 基本定义 目的 数据标准体系分类 从内容层面分类 从管理视角分类 从面向的对象分类 从数据结构的角度分类 数据标准价值 业务价值 技术价值 管理价值 数据标准和数据治理的关系 数据标准在数据治理各项任务中的作用 数据标准与主数据 数据…...

微信小程序基础 -- 小程序UI组件(5)

小程序UI组件 1.小程序UI组件概述 开发文档&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/framework/view/component.html 什么是组件&#xff1a; 组件是视图层的基本组成单元。 组件自带一些功能与微信风格一致的样式。 一个组件通常包括 开始标签 和 结…...

Linux shell编程学习笔记55:hostname命令——获取或设置主机名,显示IP地址和DNS、NIS

0 前言 2024年的网络安全检查又开始了&#xff0c;对于使用基于Linux的国产电脑&#xff0c;我们可以编写一个脚本来收集系统的有关信息。其中主机名也是我们要收集的信息之一。 1. hostname命令 的功能、格式和选项说明 我们可以使用命令 hostname --help 来查看hostname命令…...

【鸟哥】Linux笔记-硬件搭配

在Linux这个系统当中&#xff0c;几乎所有的硬件设备文件都在/dev这个目录内。打印机与软盘呢&#xff1f;分别是/dev/lp0, /dev/fd0。 几个常见的设备与其在Linux当中的文件名&#xff1a; 如果你的机器使用的是跟网际网络供应商 &#xff08;ISP&#xff09; 申请使用的云端…...

代码随想三刷数组篇

代码随想三刷数组篇1 704. 二分查找题目代码27. 移除元素题目代码977.有序数组的平方题目代码209.长度最小的子数组题目代码59.螺旋矩阵II题目代码704. 二分查找 题目...

windows环境下重建oracle监听

由于某种原因导致数据库监听启动失败&#xff0c;需要重新创建监听。 过程如下&#xff1a; 第一步&#xff1a;修改 listenr.ora 文件 &#xff0c;增加新的监听配置 LISTENER4 (DESCRIPTION_LIST (DESCRIPTION (ADDRESS (PROTOCOL TCP)(HOST DESKTOP-BE6GDNT)(PORT 152…...

单元测试框架Pytest的基本操作

Pytest基本操作 1. 详解1.1 命名规则:1.2 自定义查找规则:1.3 3种运行方式1.4 执行顺序2. 断言2.1 定义2.2 断言的规则3. mark3.1 mark的作用3.2 mark的标记方式3.3 注册标签名3.4 skip跳过标记4. pytest的参数化5. pytest的夹具(fixture测试夹具)5.1. 作用5.2. 夹具应用场…...

Java web应用性能分析之【java进程问题分析工具】

Java web应用性能分析之【java进程问题分析概叙】-CSDN博客 前面大概讲了java进程问题分析流程&#xff0c;这里再小结一下分析工具&#xff0c;后面也会小结一下java进程问题分析定位。 1.分析工具 1.1.linux命令工具 参考&#xff1a;Java web应用性能分析之【Linux服务器性…...

02-2.3.2_2 单链表的查找

喜欢《数据结构》部分笔记的小伙伴可以订阅专栏&#xff0c;今后还会不断更新。 此外&#xff0c;《程序员必备技能》专栏和《程序员必备工具》专栏&#xff08;该专栏暂未开设&#xff09;日后会逐步更新&#xff0c;感兴趣的小伙伴可以点一下订阅、收藏、关注&#xff01; 谢…...

设计模式(十四)行为型模式---访问者模式(visitor)

文章目录 访问者模式简介分派的分类什么是双分派&#xff1f;结构UML图具体实现UML图代码实现 优缺点 访问者模式简介 访问者模式&#xff08;visitor pattern&#xff09;是封装一些作用于某种数据结构中的元素的操作&#xff0c;它可以在不改变这个数据结构&#xff08;实现…...

【Matplotlib作图-3.Ranking】50 Matplotlib Visualizations, Python实现,源码可复现

目录 03 Ranking 3.0 Prerequisite 3.1 有序条形图(Ordered Bar Chart) 3.2 棒棒糖图(Lollipop Chart) 3.3 点图(Dot Plot) 3.4 斜率图(Slope Chart) 3.5 杠铃图(Dumbbell Plot) References 03 Ranking 3.0 Prerequisite Setup.py # !pip install brewer2mpl import n…...

加入不正确的位置编码会破坏掉原本的信息吗?

会 位置编码的作用 在Transformer中&#xff0c;位置编码的主要作用是让模型感知输入序列中各个词的位置。因为Transformer完全依赖自注意力机制&#xff0c;它本身并没有序列信息&#xff0c;位置编码的引入就是为了补充这一点。 加法操作的合理性 位置编码通过加法操作与…...

区块链合约开发流程

区块链合约开发&#xff0c;尤其是以太坊智能合约开发&#xff0c;是一个多步骤的过程&#xff0c;从需求分析到部署和维护&#xff0c;每一步都需要仔细规划和执行。以下是详细的开发流程。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合…...

建筑企业有闲置资质怎么办?

如果建筑企业拥有闲置资质&#xff0c;可以考虑以下几种方式来充分利用这些资质&#xff1a; 1. 租赁或转让资质&#xff1a; 将闲置的建筑资质租赁给其他企业或个人使用&#xff0c;或者通过转让的方式将资质出售给有需要的企业或个人。 2. 提供咨询服务&#xff1a; 利用建…...

Java开发-特殊文本文件,日志技术

目录 01.特殊文件,日志技术概述 02.特殊文件:Properties属性文件 ​编辑Properties案例 特殊文件:XML文件 XML的作用和应用场景 读取XML文件中的数据 XML的生成 约束XML文件的编写[了解] 日志技术 日志技术的体系 ​编辑 ​编辑 Logback日志框架的概述 Logback快…...

Django ORM深度游:探索多对一、一对一与多对多数据关系的奥秘与实践

系列文章目录 Django入门全攻略&#xff1a;从零搭建你的第一个Web项目Django ORM入门指南&#xff1a;从概念到实践&#xff0c;掌握模型创建、迁移与视图操作Django ORM实战&#xff1a;模型字段与元选项配置&#xff0c;以及链式过滤与QF查询详解Django ORM深度游&#xff…...

无人机路径规划:基于鸽群优化算法PIO的无人机三维路径规划MATLAB代码

一、无人机模型介绍 无人机三维航迹规划_无人机航迹规划-CSDN博客 二、部分代码 close all clear clc warning (off) global model global gca1 gca2 gca3 gca4 model CreateModel(); % Create search map and parameters load(BestPosition5.mat); load(ConvergenceCurve5…...

ArcGIS属性域和子类型

01 属性域 道路的车道数值是小于10的。在编辑道路的此属性时&#xff0c;为了限制其值在10以内&#xff0c;可以使用属性域。当输入数据超过10时&#xff0c;就会限制输入。 限制输入这个功能是Pro特有的&#xff0c;在ArcMap中输入超出限制的值也是合法的&#xff0c;需要手动…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

基于Springboot+Vue的办公管理系统

角色&#xff1a; 管理员、员工 技术&#xff1a; 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能&#xff1a; 该办公管理系统是一个综合性的企业内部管理平台&#xff0c;旨在提升企业运营效率和员工管理水…...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...