当前位置: 首页 > news >正文

仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

@TOC

介绍        

        std::future 是C++11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。

应用场景

  • 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行操作,从而提高程序的执行效率
  • 并发操作:在多线程编程中,我们可能需要等待某些任务完成后才能执行其他操作,通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续结果
  • 结果获取:std::future提供了一种安全的方式来获取异步任务的结果。我们可以使用std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()函数时,我们可以确保已经获取到了所需的结果

使用方法

std::async

        std::async是一种将任务与std::future关联的简单方法,它创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。默认情况下,std::async是否启动了一个新的线程,或者在等待future时,任务是否同步运行取决于你给的参数,这个参数为std::launch类型:

  1. std::launch::deferred 表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
  2. std::launch::async表明函数会在自己创建的线程上运行
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <thread>
#include <chrono>using namespace std;int async_task()
{std::this_thread::sleep_for(std::chrono::seconds(2));return 2;
}int main()
{// 关联异步任务async_task 和 futurestd::future<int> result = std::async(std::launch::deferred | std::launch::async, async_task);// 此时可以执行其他操作cout << "干其他事情" << endl;// 获取异步任务结果int ret = result.get();cout << ret << endl;return 0;
}

std::packaged_task

        std::packaged_task就是将任务和std::future绑定在一起的模板,是一种对任务的封装,我们可以通过std::packaged_task对象获取任务相关联的std::future对象,通过调用get_future()方法获取。std::packaged_task的模板参数是函数签名。

所谓函数签名就是一个函数头去掉函数名

下面介绍一下std::packaged_task,首先这个类型的对象是不可复制的

可以看到拷贝构造函数被delete了。

        std::packaged_task是用来包装异步任务的工具,它的本质是将一个可调用对象封装起来,和std::future结合起来,这个不能被直接调用,因为这样的实质是同步调用任务,而不是异步调用,并且std::packaged_task对象是没有返回值的,因为是不可拷贝的, 所以std::packaged_task对象在使用的时候,需要创建一个线程,然后使用智能指针或者move函数来进行传递。注意因为创建了一个新的线程,并且需要获取到这个新的线程执行任务的结果,所以我们就需要进行等待或者分离,即join和detach()。

使用move进行移动

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{std::packaged_task<int(int, int)> task(Add);std::future<int> fu = task.get_future();std::thread th(std::move(task), 11, 22);int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}

使用智能指针

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);std::future<int> fu = ptask->get_future();std::thread th([ptask](){(*ptask)(11, 22);});int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}

std::promise

        std::promise提供了一种设置值的方式,它可以在设置之后通过相关联的std::future对象进行读取,换种说法来说就是之前说过的std::future可以读取一个异步函数的返回值了,但是要等待就绪,而std::promise就提供了一个方式手动让std::future就绪

#include <iostream>
#include <future>
#include <chrono>void task(std::promise<int> result_promise)
{int result = 2;result_promise.set_value(result);std::cout << result << std::endl;
}int main()
{std::promise<int> result;std::future<int> reuslt_future = result.get_future();std::thread th(task, std::move(result));int ret = reuslt_future.get();std::cout << ret << std::endl;th.join();return 0;
}

线程池设计

#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>class threadPool
{
public:using ptr = std::shared_ptr<threadPool>;using Functor = std::function<void(void)>;threadPool(int thr_count = 1): _stop(false){for (int i = 0; i < thr_count; i++){_threads.emplace_back(&threadPool::entry, this);}}~threadPool(){stop();}// push传入的是首先有一个函数--用户要执行的函数,接下来是不定参,标识要处理的数据就是要传入到函数中的参数// push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),// 使用 lambda 生成一个可调用对象(内部执行异步程序)抛入到任务池中,由工作线程取出进行执行template <typename F, typename ...Args>auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>{// 1. 将传入的函数封装成一个packaged_task任务using return_type = decltype(func(args...));auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);//auto task = std::make_shared<std::packaged_task<return_type(std::forward<Args>(args)...)>>(std::forward(func));auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);std::future<return_type> fu = task->get_future();// 2. 构造一个lambda 匿名函数(捕获任务对象),函数内执行任务对象{std::unique_lock<std::mutex> lock(_mutex);// 3. 将构造出来的匿名函数,抛入到任务池中_taskpool.push_back([task](){ (*task)(); });_cv.notify_one();}return fu;}void stop(){if(_stop == true) return ;_stop = true;_cv.notify_all();for (auto &thread : _threads){thread.join();}}private:// 线程入口函数---内部不断的从任务池中取出任务进行执行void entry(){while (!_stop){std::vector<Functor> tmp_taskpool;{// 加锁std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空,或者_stop 被置位返回true_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });// 取出任务进行执行tmp_taskpool.swap(_taskpool);   // 这里是将全局任务池里面的任务全部给一个线程里面的任务池    }for (auto &task : tmp_taskpool){task();}}}private:std::atomic<bool> _stop;std::vector<Functor> _taskpool;   // 任务池std::mutex _mutex;              // 互斥锁std::condition_variable _cv;    // 条件变量std::vector<std::thread> _threads;
};

main函数

#include "threadpool.hpp"int Add(int num1, int num2)
{return num1 + num2;
}int main()
{threadPool pool;for(int i = 0; i < 10; i++){std::future<int> fu = pool.push(Add, 11, i);std::cout << fu.get() << std::endl;}pool.stop();return 0;
}

相关文章:

仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

TOC 介绍 std::future 是C11标准库中的一个模板类&#xff0c;他表示一个异步操作的结果&#xff0c;当我们在多线程编程中使用异步任务时&#xff0c;std::future可以帮助我们在需要的时候&#xff0c;获取任务的执行结果&#xff0c;std::future 的一个重要特性是能…...

经典算法题总结:数组常用技巧(双指针,二分查找和位运算)篇

双指针 在处理数组和链表相关问题时&#xff0c;双指针技巧是经常用到的&#xff0c;双指针技巧主要分为两类&#xff1a;左右指针和快慢指针。所谓左右指针&#xff0c;就是两个指针相向而行或者相背而行&#xff1b;而所谓快慢指针&#xff0c;就是两个指针同向而行&#xf…...

版本控制基础理论

一、本地版本控制 在本地记录文件每次的更新&#xff0c;可以对每个版本做一个快照&#xff0c;或是记录补丁文件&#xff0c;适合个人使用&#xff0c;如RCS. 二、集中式版本控制&#xff08;代表SVN&#xff09; 所有的版本数据都保存在服务器上&#xff0c;协同开发者从…...

微分方程(Blanchard Differential Equations 4th)中文版Section1.4

1.4 NUMERICAL TECHNIQUE: EULER’S METHOD 上一节中讨论的斜率场的几何概念与近似微分方程解的基本数值方法密切相关。给定一个初值问题 d y d t = f ( t , y ) , y ( t 0 ) = y 0 , \frac{dy}{dt}=f(t,y), \quad y(t_0) = y_0, dtdy​=f(t,y),y(t0​)=y0​, 我们可以通过首…...

求职Leetcode算法题(7)

1.搜索旋转排序数组 这道题要求时间复杂度为o&#xff08;log n&#xff09;&#xff0c;那么第一时间想到的就是二分法&#xff0c;二分法有个前提条件是在有序数组下&#xff0c;我们发现在这个数组中存在两部分是有序的&#xff0c;所以我们只需要对前半部分和后半部分分别…...

ActiveMQ、RabbitMQ、Kafka、RocketMQ在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略的区别

ActiveMQ、RabbitMQ、Kafka、RocketMQ这四种消息队列在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略等方面各有其特点和差异。以下是对这些方面的详细比较&#xff1a; 1. 事务性消息 ActiveMQ&#xff1a;支持事务性消息。ActiveMQ可以基于JMS&#xff08…...

HanLP分词的使用与注意事项

1 概述 HanLP是一个自然语言处理工具包&#xff0c;它提供的主要功能如下&#xff1a; 分词转化为拼音繁转简、简转繁提取关键词提取短语提取词语自动摘要依存文法分析 下面将介绍其分词功能的使用。 2 依赖 下面是依赖的jar包。 <dependency><groupId>com.ha…...

Python 的进程、线程、协程的区别和联系是什么?

一、区别 1. 进程 • 定义&#xff1a;进程是操作系统分配资源的基本单位。 • 资源独立性&#xff1a;每个进程都有独立的内存空间&#xff0c;包括代码、数据和运行时的环境。 • 并发性&#xff1a;可以同时运行多个进程&#xff0c;操作系统通过时间片轮转等方式在不同…...

实时数据推送:Spring Boot 中两种 SSE 实战方案

在 Web 开发中&#xff0c;实时数据交互变得越来越普遍。无论是股票价格的波动、比赛比分的更新&#xff0c;还是聊天消息的传递&#xff0c;都需要服务器能够及时地将数据推送给客户端。传统的 HTTP 请求-响应模式在处理这类需求时显得力不从心&#xff0c;而服务器推送事件&a…...

数据守护者:SQL一致性检查的艺术与实践

标题&#xff1a;数据守护者&#xff1a;SQL一致性检查的艺术与实践 在数据驱动的商业世界中&#xff0c;数据的一致性是确保决策准确性和业务流程顺畅的关键。SQL作为数据查询和操作的基石&#xff0c;提供了多种工具来维护数据的一致性。本文将深入探讨如何使用SQL进行数据一…...

jenkins配置+vue打包多环境切换

jenkins配置流水线过程 1.新建item 加入相关的参数就行了。 流水线脚本设置 后端脚本 node {stage checkoutsh"""#每次打包清空工作空间目录rm -rf $workspace/*cd $workspace#到工作空间下从远端svn服务端拉取代码svn co svn://10.1.19.21/repo/技术中台/低…...

idea和jdk的安装教程

1.JDK的安装 下载 进入官网&#xff0c;找到你需要的JDK版本 Java Downloads | Oracle 中国 我这里是windows的jdk17&#xff0c;选择以下 安装 点击下一步&#xff0c;安装完成 配置环境变量 打开查看高级系统设置 在系统变量中添加两个配置 一个变量名是 JAVA_HOME …...

HTML静态网页成品作业(HTML+CSS)——电影网首页网页设计制作(1个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有1个页面。 二、作品演示 三、代…...

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

大数据系列之&#xff1a;Flink Doris Connector&#xff0c;实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...

LabVIEW VI 多语言动态加载与运行的实现

在多语言应用程序开发中&#xff0c;确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图&#xff0c;详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI&#xff08;虚拟仪器&#xff09;界面语言的动态加载与运行。此程序允…...

Unity引擎基础知识

目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能&#xff1f; Unity C#脚本语言的高级编…...

练习题- 探索正则表达式对象和对象匹配

正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...

Java集合提升

1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组&#xff08;顺序表&#xff09;transient Object[] elementData; 特点&#xff1a;在内存中分配连续的空间&#xff0c;只存储数据&#xff0c;不存储地址信息。位置就隐含着地址。优点 节…...

uniapp 微信小程序生成水印图片

效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...

ElasticSearch相关知识点

ElasticSearch中的倒排索引是如何工作的&#xff1f; 倒排索引是ElasticSearch中用于全文检索的一种数据结构&#xff0c;与正排索引不同的是&#xff0c;正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中&#xff0c;倒排索…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发&#xff0c;实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构&#xff0c;服务器端使用Java Servlet处理请求&#xff0c;数据库采用MySQL存储信息&#xff0…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...

WEB3全栈开发——面试专业技能点P7前端与链上集成

一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染&#xff08;SSR&#xff09;与静态网站生成&#xff08;SSG&#xff09; 框架&#xff0c;由 Vercel 开发。它简化了构建生产级 React 应用的过程&#xff0c;并内置了很多特性&#xff1a; ✅ 文件系…...

河北对口计算机高考MySQL笔记(完结版)(2026高考)持续更新~~~~

MySQL 基础概念 数据&#xff08;Data&#xff09;&#xff1a;文本&#xff0c;数字&#xff0c;图片&#xff0c;视频&#xff0c;音频等多种表现形式&#xff0c;能够被计算机存储和处理。 **数据库&#xff08;Data Base—简称DB&#xff09;&#xff1a;**存储数据的仓库…...

C++课设:实现本地留言板系统(支持留言、搜索、标签、加密等)

名人说&#xff1a;路漫漫其修远兮&#xff0c;吾将上下而求索。—— 屈原《离骚》 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 专栏介绍&#xff1a;《编程项目实战》 目录 一、项目功能概览与亮点分析1. 核心功能…...

达梦使用存储过程实现删除重复记录、判断并添加主键和自增列的逻辑

在达梦数据库中&#xff0c;要确保主键的唯一性约束&#xff0c;可以在存储过程的最前面添加删除重复记录的逻辑。以下是一个完整的存储过程&#xff0c;包含删除重复记录、判断并添加主键和自增列的逻辑&#xff1a; 存储过程示例 -- 切换到指定模式;schema_name 是目标模…...