仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)
@TOC
介绍
std::future 是C++11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
应用场景
- 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行操作,从而提高程序的执行效率
- 并发操作:在多线程编程中,我们可能需要等待某些任务完成后才能执行其他操作,通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续结果
- 结果获取:std::future提供了一种安全的方式来获取异步任务的结果。我们可以使用std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()函数时,我们可以确保已经获取到了所需的结果
使用方法
std::async
std::async是一种将任务与std::future关联的简单方法,它创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。默认情况下,std::async是否启动了一个新的线程,或者在等待future时,任务是否同步运行取决于你给的参数,这个参数为std::launch类型:
- std::launch::deferred 表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
- std::launch::async表明函数会在自己创建的线程上运行
- std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <thread>
#include <chrono>using namespace std;int async_task()
{std::this_thread::sleep_for(std::chrono::seconds(2));return 2;
}int main()
{// 关联异步任务async_task 和 futurestd::future<int> result = std::async(std::launch::deferred | std::launch::async, async_task);// 此时可以执行其他操作cout << "干其他事情" << endl;// 获取异步任务结果int ret = result.get();cout << ret << endl;return 0;
}
std::packaged_task
std::packaged_task就是将任务和std::future绑定在一起的模板,是一种对任务的封装,我们可以通过std::packaged_task对象获取任务相关联的std::future对象,通过调用get_future()方法获取。std::packaged_task的模板参数是函数签名。
所谓函数签名就是一个函数头去掉函数名
下面介绍一下std::packaged_task,首先这个类型的对象是不可复制的

可以看到拷贝构造函数被delete了。
std::packaged_task是用来包装异步任务的工具,它的本质是将一个可调用对象封装起来,和std::future结合起来,这个不能被直接调用,因为这样的实质是同步调用任务,而不是异步调用,并且std::packaged_task对象是没有返回值的,因为是不可拷贝的, 所以std::packaged_task对象在使用的时候,需要创建一个线程,然后使用智能指针或者move函数来进行传递。注意因为创建了一个新的线程,并且需要获取到这个新的线程执行任务的结果,所以我们就需要进行等待或者分离,即join和detach()。
使用move进行移动
#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{std::packaged_task<int(int, int)> task(Add);std::future<int> fu = task.get_future();std::thread th(std::move(task), 11, 22);int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}
使用智能指针
#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);std::future<int> fu = ptask->get_future();std::thread th([ptask](){(*ptask)(11, 22);});int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}
std::promise
std::promise提供了一种设置值的方式,它可以在设置之后通过相关联的std::future对象进行读取,换种说法来说就是之前说过的std::future可以读取一个异步函数的返回值了,但是要等待就绪,而std::promise就提供了一个方式手动让std::future就绪
#include <iostream>
#include <future>
#include <chrono>void task(std::promise<int> result_promise)
{int result = 2;result_promise.set_value(result);std::cout << result << std::endl;
}int main()
{std::promise<int> result;std::future<int> reuslt_future = result.get_future();std::thread th(task, std::move(result));int ret = reuslt_future.get();std::cout << ret << std::endl;th.join();return 0;
}
线程池设计
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>class threadPool
{
public:using ptr = std::shared_ptr<threadPool>;using Functor = std::function<void(void)>;threadPool(int thr_count = 1): _stop(false){for (int i = 0; i < thr_count; i++){_threads.emplace_back(&threadPool::entry, this);}}~threadPool(){stop();}// push传入的是首先有一个函数--用户要执行的函数,接下来是不定参,标识要处理的数据就是要传入到函数中的参数// push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),// 使用 lambda 生成一个可调用对象(内部执行异步程序)抛入到任务池中,由工作线程取出进行执行template <typename F, typename ...Args>auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>{// 1. 将传入的函数封装成一个packaged_task任务using return_type = decltype(func(args...));auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);//auto task = std::make_shared<std::packaged_task<return_type(std::forward<Args>(args)...)>>(std::forward(func));auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);std::future<return_type> fu = task->get_future();// 2. 构造一个lambda 匿名函数(捕获任务对象),函数内执行任务对象{std::unique_lock<std::mutex> lock(_mutex);// 3. 将构造出来的匿名函数,抛入到任务池中_taskpool.push_back([task](){ (*task)(); });_cv.notify_one();}return fu;}void stop(){if(_stop == true) return ;_stop = true;_cv.notify_all();for (auto &thread : _threads){thread.join();}}private:// 线程入口函数---内部不断的从任务池中取出任务进行执行void entry(){while (!_stop){std::vector<Functor> tmp_taskpool;{// 加锁std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空,或者_stop 被置位返回true_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });// 取出任务进行执行tmp_taskpool.swap(_taskpool); // 这里是将全局任务池里面的任务全部给一个线程里面的任务池 }for (auto &task : tmp_taskpool){task();}}}private:std::atomic<bool> _stop;std::vector<Functor> _taskpool; // 任务池std::mutex _mutex; // 互斥锁std::condition_variable _cv; // 条件变量std::vector<std::thread> _threads;
};
main函数
#include "threadpool.hpp"int Add(int num1, int num2)
{return num1 + num2;
}int main()
{threadPool pool;for(int i = 0; i < 10; i++){std::future<int> fu = pool.push(Add, 11, i);std::cout << fu.get() << std::endl;}pool.stop();return 0;
}
相关文章:
仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)
TOC 介绍 std::future 是C11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能…...
经典算法题总结:数组常用技巧(双指针,二分查找和位运算)篇
双指针 在处理数组和链表相关问题时,双指针技巧是经常用到的,双指针技巧主要分为两类:左右指针和快慢指针。所谓左右指针,就是两个指针相向而行或者相背而行;而所谓快慢指针,就是两个指针同向而行…...
版本控制基础理论
一、本地版本控制 在本地记录文件每次的更新,可以对每个版本做一个快照,或是记录补丁文件,适合个人使用,如RCS. 二、集中式版本控制(代表SVN) 所有的版本数据都保存在服务器上,协同开发者从…...
微分方程(Blanchard Differential Equations 4th)中文版Section1.4
1.4 NUMERICAL TECHNIQUE: EULER’S METHOD 上一节中讨论的斜率场的几何概念与近似微分方程解的基本数值方法密切相关。给定一个初值问题 d y d t = f ( t , y ) , y ( t 0 ) = y 0 , \frac{dy}{dt}=f(t,y), \quad y(t_0) = y_0, dtdy=f(t,y),y(t0)=y0, 我们可以通过首…...
求职Leetcode算法题(7)
1.搜索旋转排序数组 这道题要求时间复杂度为o(log n),那么第一时间想到的就是二分法,二分法有个前提条件是在有序数组下,我们发现在这个数组中存在两部分是有序的,所以我们只需要对前半部分和后半部分分别…...
ActiveMQ、RabbitMQ、Kafka、RocketMQ在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略的区别
ActiveMQ、RabbitMQ、Kafka、RocketMQ这四种消息队列在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略等方面各有其特点和差异。以下是对这些方面的详细比较: 1. 事务性消息 ActiveMQ:支持事务性消息。ActiveMQ可以基于JMS(…...
HanLP分词的使用与注意事项
1 概述 HanLP是一个自然语言处理工具包,它提供的主要功能如下: 分词转化为拼音繁转简、简转繁提取关键词提取短语提取词语自动摘要依存文法分析 下面将介绍其分词功能的使用。 2 依赖 下面是依赖的jar包。 <dependency><groupId>com.ha…...
Python 的进程、线程、协程的区别和联系是什么?
一、区别 1. 进程 • 定义:进程是操作系统分配资源的基本单位。 • 资源独立性:每个进程都有独立的内存空间,包括代码、数据和运行时的环境。 • 并发性:可以同时运行多个进程,操作系统通过时间片轮转等方式在不同…...
实时数据推送:Spring Boot 中两种 SSE 实战方案
在 Web 开发中,实时数据交互变得越来越普遍。无论是股票价格的波动、比赛比分的更新,还是聊天消息的传递,都需要服务器能够及时地将数据推送给客户端。传统的 HTTP 请求-响应模式在处理这类需求时显得力不从心,而服务器推送事件&a…...
数据守护者:SQL一致性检查的艺术与实践
标题:数据守护者:SQL一致性检查的艺术与实践 在数据驱动的商业世界中,数据的一致性是确保决策准确性和业务流程顺畅的关键。SQL作为数据查询和操作的基石,提供了多种工具来维护数据的一致性。本文将深入探讨如何使用SQL进行数据一…...
jenkins配置+vue打包多环境切换
jenkins配置流水线过程 1.新建item 加入相关的参数就行了。 流水线脚本设置 后端脚本 node {stage checkoutsh"""#每次打包清空工作空间目录rm -rf $workspace/*cd $workspace#到工作空间下从远端svn服务端拉取代码svn co svn://10.1.19.21/repo/技术中台/低…...
idea和jdk的安装教程
1.JDK的安装 下载 进入官网,找到你需要的JDK版本 Java Downloads | Oracle 中国 我这里是windows的jdk17,选择以下 安装 点击下一步,安装完成 配置环境变量 打开查看高级系统设置 在系统变量中添加两个配置 一个变量名是 JAVA_HOME …...
HTML静态网页成品作业(HTML+CSS)——电影网首页网页设计制作(1个页面)
🎉不定期分享源码,关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 🏷️本套采用HTMLCSS,未使用Javacsript代码,共有1个页面。 二、作品演示 三、代…...
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...
LabVIEW VI 多语言动态加载与运行的实现
在多语言应用程序开发中,确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图,详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI(虚拟仪器)界面语言的动态加载与运行。此程序允…...
Unity引擎基础知识
目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能? Unity C#脚本语言的高级编…...
练习题- 探索正则表达式对象和对象匹配
正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...
Java集合提升
1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组(顺序表)transient Object[] elementData; 特点:在内存中分配连续的空间,只存储数据,不存储地址信息。位置就隐含着地址。优点 节…...
uniapp 微信小程序生成水印图片
效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...
ElasticSearch相关知识点
ElasticSearch中的倒排索引是如何工作的? 倒排索引是ElasticSearch中用于全文检索的一种数据结构,与正排索引不同的是,正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中,倒排索…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
