Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数?
3个关键词:闭包、Box与FnOnce;请细品。
use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{a+b
}
fn doit(amount: f32,code:String){println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){println!("amount:{:?}",amount);
}
fn test(){process(2.0, "b".into(), doit);let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));fn1();let fn2: Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| doit(amount,code));fn2(2.0,"hello john!".into());let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));fn3(2.0,"hello rose!".into());let fn4: Box<dyn FnOnce()> = Box::new(move ||workit(3.0));fn4();let fn5: Box<dyn FnOnce()> = Box::new(move ||println!("hello world!"));fn5();// 带类型返还的结构let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){//test();let (sender, receiver) = mpsc::channel::<box_fn>();let receiver = Arc::new(Mutex::new(receiver));// 注意:||中均没有参数,故是FnOnce(); 具体参考test()let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),Box::new(move ||workit(3.0)),Box::new(move || doit(2.0,"hello cat!".into()))];for f in vec_fn {sender.send(f).unwrap();}let rx = receiver.clone();let handle = thread::spawn(move ||{loop{let box_fn = rx.lock().unwrap().recv().unwrap() ;println!("from son thread!");box_fn();}});println!("main thread sended all box_fn!");handle.join().unwrap();}
二、线程池的应用:发送函数有什么用处?
如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。
在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。
// 这个线程池是一个比较粗的框架;可以执行不同策略类型:多周期、趋势、套利、市场中性等各类策略组的运行。
use std::sync::atomic::AtomicU32;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
// 简易的threadpool,没有用condvar.
// send_num可以不要,这里只是记录一下发送的情况
pub struct ThreadPool {threads_num: usize,workers: Vec<Worker>,sender: mpsc::Sender<Message>,send_num: Arc<Mutex<usize>>,
}
// 定义执行各类函数的参数结构类型,带不带参数,带参数的类型,多少
// 其中参数只是举例。比如某类策略只需要输入vec<bar>或vec<tick>; 某类策略需要输入hashmap<string,vec<bar>>;
// 或者套利策略中:两组或多组pair: (vec<bar>,vec<bar>)......
// 一组1min vec<bar>,3min vec<bar>,5min vec<bar>等......
// 不同的策略类型,都可以在这里进行抽象;
// 最后线程池可以执行各类不同的策略;
enum StrategyFn
{T(Box<dyn FnOnce() + Send +'static>),U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}// 定义发送内容
enum Message{Task(Task),Shutdown,
}
struct Task{job :Job,task_id :usize,
}type Job = Box<dyn FnOnce() + Send +'static>;impl ThreadPool {pub fn new(threads_num: usize) -> ThreadPool {assert!(threads_num > 0);let (sender, receiver) = mpsc::channel::<Message>();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(threads_num);let send_num = Arc::new(Mutex::new(0_usize));let execute_num = Arc::new(Mutex::new(0_usize));for id in 0..threads_num {let mut worker = Worker::new(id);worker.run(&receiver, &execute_num);workers.push(worker);}ThreadPool { threads_num,workers, sender,send_num}}pub fn execute(&mut self, task_id:usize,strategy_fn: StrategyFn){let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});match strategy_fn{StrategyFn::U(f, input1, input2) => {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},StrategyFn::T(f) =>{job = Box::new(move || f());}StrategyFn::W(f,input1,input2)=> {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},}let task = Task{job:job,task_id:task_id};let mut send_num = self.send_num.lock().unwrap();*send_num = *send_num +1;self.sender.send(Message::Task(task)).unwrap();}}pub struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,}impl Worker {pub fn new(id: usize)-> Self{Worker {id:id,thread: None,}}pub fn run(&mut self, receiver: &Arc<Mutex<mpsc::Receiver<Message>>>,execute_num: &Arc<Mutex<usize>>){let id = self.id;let receiver = receiver.clone();let execute_num = execute_num.clone();let thread = thread::spawn(move || loop {let msg = receiver.lock().unwrap().recv().unwrap();match msg{Message::Task(task) => {println!("Worker {} got a task; executing task {}.", id,task.task_id);let mut execute_num = execute_num.lock().unwrap();*execute_num = *execute_num + 1;println!("work {} = >execute_num: {}", id,*execute_num);(task.job)();},Message::Shutdown => {println!("Worker {} received shutdown message.", id);break;//很关键}}});self.thread = Some(thread);}
}impl Drop for ThreadPool {fn drop(&mut self) {for _ in 0..self.threads_num{self.sender.send(Message::Shutdown).unwrap();}println!("drop worker :{:?}",self.workers.len());for (i,worker) in (&mut self.workers).into_iter().enumerate() {println!("------Shutting down worker {} i:{} -----------", worker.id,i);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}
// 这里只是通过宏生成一类:U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>)策略数据类型;
//
macro_rules! create_strategy_U{($($s:ident),*) => ($(pub fn $s(_a:f32,_b:f32){println!("run 【U】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
//这里只是通过宏生成一类:T(Box<dyn FnOnce() + Send +'static>)策略数据类型;
macro_rules! create_strategy_T{($($s:ident),*) => ($(pub fn $s(){println!("run 【T】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
// 生成U策略组若干
create_strategy_U!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
// 生成T策略组若干
create_strategy_T!(AA,BB,CC,DD,EE,FF,GG);
pub fn main() {let mut pool = ThreadPool::new(2);let input1 = Arc::new(1.0);let input2 = Arc::new(2.0);let u_strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];let t_strategies = vec![AA,BB,CC,DD,EE,FF,GG];println!("u_strategies num : {:?}",u_strategies.len());for (task_id,u_strategy) in u_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::U(Box::new(u_strategy),input1.clone(), input2.clone());pool.execute(task_id as usize,strategy_fn);}println!("t_strategies num : {:?}",t_strategies.len());for (task_id,t_strategy) in t_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::T(Box::new(t_strategy));pool.execute(task_id as usize,strategy_fn);}println!("toal send_num : {:?}",*pool.send_num.lock().unwrap());
}
输出:
能过U和T类型策略运行实例,说明只要事先进行枚举定义,可以看出,这个线程池可以执行不同类型的策略。
这里有一个问题:这些线程池中线程中如何抢到这个任务,并完整执行完这些任务?这里依靠的是Arc<Mutex<mpsc::Receiver>>结构来保证。
具体如下:
u_strategies num : 17
t_strategies num : 7
toal send_num : 24
drop worker :4
------Shutting down worker 0 i:0 -----------
Worker 2 got a task; executing task 3.
work 2 = >execute_num: 1
run 【U】 strategy "D"
Worker 2 got a task; executing task 4.
work 2 = >execute_num: 2
run 【U】 strategy "E"
Worker 2 got a task; executing task 5.
work 2 = >execute_num: 3
Worker 1 got a task; executing task 1.
Worker 3 got a task; executing task 2.
Worker 0 got a task; executing task 0.
run 【U】 strategy "F"
Worker 2 got a task; executing task 6.
work 1 = >execute_num: 4
run 【U】 strategy "B"
Worker 1 got a task; executing task 7.
work 3 = >execute_num: 5
run 【U】 strategy "C"
work 0 = >execute_num: 6
run 【U】 strategy "A"
Worker 0 got a task; executing task 9.
Worker 3 got a task; executing task 8.
work 2 = >execute_num: 7
run 【U】 strategy "G"
Worker 2 got a task; executing task 10.
work 1 = >execute_num: 8
run 【U】 strategy "H"
Worker 1 got a task; executing task 11.
work 0 = >execute_num: 9
run 【U】 strategy "J"
Worker 0 got a task; executing task 12.
work 3 = >execute_num: 10
run 【U】 strategy "I"
Worker 3 got a task; executing task 13.
work 2 = >execute_num: 11
run 【U】 strategy "K"
Worker 2 got a task; executing task 14.
work 1 = >execute_num: 12
run 【U】 strategy "L"
Worker 1 got a task; executing task 15.
work 0 = >execute_num: 13
run 【U】 strategy "M"
Worker 0 got a task; executing task 16.
work 3 = >execute_num: 14
run 【U】 strategy "N"
Worker 3 got a task; executing task 0.
work 2 = >execute_num: 15
run 【U】 strategy "O"
Worker 2 got a task; executing task 1.
work 1 = >execute_num: 16
run 【U】 strategy "P"
Worker 1 got a task; executing task 2.
work 0 = >execute_num: 17
run 【U】 strategy "Q"
Worker 0 got a task; executing task 3.
work 3 = >execute_num: 18
run 【T】 strategy "AA"
Worker 3 got a task; executing task 4.
work 2 = >execute_num: 19
run 【T】 strategy "BB"
Worker 2 got a task; executing task 5.
work 1 = >execute_num: 20
run 【T】 strategy "CC"
Worker 1 got a task; executing task 6.
work 0 = >execute_num: 21
run 【T】 strategy "DD"
Worker 0 received shutdown message.
work 3 = >execute_num: 22
run 【T】 strategy "EE"
Worker 3 received shutdown message.
work 2 = >execute_num: 23
run 【T】 strategy "FF"
------Shutting down worker 1 i:1 -----------
Worker 2 received shutdown message.
work 1 = >execute_num: 24
run 【T】 strategy "GG"
Worker 1 received shutdown message.
------Shutting down worker 2 i:2 -----------
------Shutting down worker 3 i:3 -----------
相关文章:
Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数? 3个关键词:闭包、Box与FnOnce;请细品。 use std::sync::{mpsc,Arc,Mutex}; use std::thread; fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);…...
一个汉字占几个字节、JS中如何获得一个字符串占用多少字节?
浅浅记录 一个汉字占几个字节?JS中如何获得一个字符串占用多少字节? 一个汉字占几个字节? GBK编码:一个汉字、中文字符都是占2个字节,英文字符占1个字节 UTF-8编码:一个汉字、中文字符都是占3个字节&#…...
CommonJS 和 ES modules
CommonJS 和 ES modules (ESM) 是两种不同的模块系统,它们用于组织 JavaScript 代码,并允许不同文件之间共享代码。 CommonJS (CJS) CommonJS 是最早的 JavaScript 模块化规范之一,主要用于 Node.js 环境中。CommonJS 规定每个文件都是一个…...
计算机网络——CDN
空间编码例子:不是发送N个相同颜色值,而是仅发送2个值,颜色和重复个数 时间编码例子:不是发送i1帧的全部编码,而是仅发送帧i差别的地方 视频播放时,先下载manifest file文件——>解析(不…...
大数据治理:挑战与策略
随着信息技术的飞速发展,大数据已成为当今社会的重要资源。大数据治理作为管理和利用大数据的关键手段,对于提升数据质量、保障数据安全、实现数据价值具有重要意义。本文首先阐述了大数据治理的概念和目标,接着分析了大数据治理面临的挑战&a…...
屋面通风器安装方案及流程
屋面通风器的安装方案及流程是一个系统性工作,需要仔细规划和执行,以确保安装质量和通风器的正常运行。昱合昇天窗厂家为大家整理了详细的安装方案及流程,供您参考。一、安装前准备 1、确定安装位置 根据建筑物屋顶结构和通风需求,…...
ComfyUI一键更换服装:IP-Adapter V2 + FaceDetailer(DeepFashion)
在这篇文章中,我们将探索如何使用新版的IP-Adapter和ComfyUI软件为人物进行换装。 整个过程非常简单,仅需要两张图片:一张服装图片和一张人物图片。 通过一系列节点的操作,ComfyUI就会把这个服装换到人物身上,并利用…...
AWS账号与亚马逊账号的关系解析
在当今数字化时代,云计算已成为企业和个人用户不可或缺的一部分。亚马逊网络服务(AWS)是全球领先的云计算平台,而亚马逊(Amazon)则是全球最大的在线零售商之一。许多人在使用这两个平台时,常常会…...
Java八大基本数据类型详解
引言 一、整数类型 二、 浮点类型 三、.字符类型 四、布尔类型 示例代码 注意事项 引言 在Java编程语言中,基本数据类型是构建程序的基础。了解这些数据类型的特性和使用方法对于编写高效且正确的代码至关重要。本文将详细介绍Java的八大基本数据类型ÿ…...
ChatGPT的终极指南概要
ChatGPT的终极指南概要 [ Prompt Format(提示格式) 是一种用于指导ChatGPT生成特定类型回答的模板。它通常包括以下几个部分: 角色(Role):定义AI模型在Prompt中所扮演的角色,例如专家、顾问、…...
Android应用性能优化的方法
Android应用性能优化是一个复杂而关键的过程,涉及多个方面,包括布局优化、网络优化、安装包优化、内存优化、卡顿优化、启动优化等。以下是对这些优化方法的详细解析: 一、布局优化 布局优化是Android性能优化的基础,主要目标是…...
『网络游戏』客户端发送消息到服务器【17】
将上一章服务器的协议PEProtocol的.dll文件重新生成导入unity客户端中 命名为Net 点击生成 另一种导入.dll文件方式 在客户端粘贴即可 此时Net文件夹的.dll文件就导入进来了 创建脚本:NetSvc.cs 编写脚本:NetSvc.cs 修改脚本:GameRoot.cs 在…...
【系统架构设计师】专题:数据库系统考点梳理
更多内容请见: 备考系统架构设计师-核心总结目录 文章目录 一、数据库基本概念1、数据库技术的发展2、数据模型3、数据库管理系统4、数据库三级模式二、关系数据库1、关系数据库基本概念2、关系运算3、关系数据库设计基本理论三、数据库设计1、数据库设计的基本步骤2、数据需求…...
Java传递对象是值传递还是引用传递?
🎉 前言 之前一直以为Java传对象是引用传递,直到最近用Java写数据结构链表时遇到一些问题,这才让我重新思考这个问题,经过我的一番研究,发现不能一棒子打死,其实这其中既有值传递,又有引用传递…...
解锁C++多态的魔力:灵活与高效的编码艺术(上)
文章目录 前言🌸一、多态的定义与概念🌻1.1 多态的核心思想:🌻1.2 多态的两种主要形式: 🌸二、多态的使用条件🌻2.1 基类指针或引用2.1.1 为什么需要基类指针或引用 🌻2.2 虚函数&am…...
k8s系列-Rancher 上操作的k8s容器网络配置总结
Rancher 上操作的k8s容器网络配置总结 要在 Rancher 中配置Spring Boot 应用 ykhd-zhjgyw-xpwfxfjfl 服务,正确的配置方式如下: 1. 应用程序监听端口 在 application.yaml 文件中,配置的应用监听端口是 10001,并且应用的上下文…...
2024年【氯化工艺】考试题库及氯化工艺考试内容
题库来源:安全生产模拟考试一点通公众号小程序 氯化工艺考试题库根据新氯化工艺考试大纲要求,安全生产模拟考试一点通将氯化工艺模拟考试试题进行汇编,组成一套氯化工艺全真模拟考试试题,学员可通过氯化工艺考试内容全真模拟&…...
从commit校验失效问题探究husky原理
一、背景 之前创建的项目,发现代码 commit 提交的时候没有了任何校验,具体表现: 一是 feat fix 等主题格式校验没有了二是代码 lint 不通过也能提交 尝试解决这个问题,并深入了解husky的实现原理,将相关的一些知识点…...
Azure OpenAI 服务上线具有音频和语音功能的 GPT-4o-Realtime-Preview,免费申请试用
微软宣布 GPT-4o-Realtime-Preview 音频和语音公开预览版的推出,这是对Microsoft Azure OpenAI 服务的重大增强,增加了高级语音功能并扩展了 GPT-4o 的多模式产品。 这一里程碑进一步巩固了 Azure 在人工智能领域的领导地位,尤其是在语音技术…...
基于IMX6UL的EPIT的定时器实验
定时器是最常用的外设,常常需要使用定时器来完成精准的定时功能,I.MX6U 提供了多 种硬件定时器,有些定时器功能非常强大。本章我们从最基本的 EPIT 定时器开始,学习如何配置EPIT 定时器,使其按照给定的时间,…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...
【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
【SpringBoot自动化部署】
SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一,能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时,需要添加Git仓库地址和凭证,设置构建触发器(如GitHub…...
