Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数?
3个关键词:闭包、Box与FnOnce;请细品。
use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{a+b
}
fn doit(amount: f32,code:String){println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){println!("amount:{:?}",amount);
}
fn test(){process(2.0, "b".into(), doit);let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));fn1();let fn2: Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| doit(amount,code));fn2(2.0,"hello john!".into());let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));fn3(2.0,"hello rose!".into());let fn4: Box<dyn FnOnce()> = Box::new(move ||workit(3.0));fn4();let fn5: Box<dyn FnOnce()> = Box::new(move ||println!("hello world!"));fn5();// 带类型返还的结构let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){//test();let (sender, receiver) = mpsc::channel::<box_fn>();let receiver = Arc::new(Mutex::new(receiver));// 注意:||中均没有参数,故是FnOnce(); 具体参考test()let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),Box::new(move ||workit(3.0)),Box::new(move || doit(2.0,"hello cat!".into()))];for f in vec_fn {sender.send(f).unwrap();}let rx = receiver.clone();let handle = thread::spawn(move ||{loop{let box_fn = rx.lock().unwrap().recv().unwrap() ;println!("from son thread!");box_fn();}});println!("main thread sended all box_fn!");handle.join().unwrap();}
二、线程池的应用:发送函数有什么用处?
如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。
在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。
// 这个线程池是一个比较粗的框架;可以执行不同策略类型:多周期、趋势、套利、市场中性等各类策略组的运行。
use std::sync::atomic::AtomicU32;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
// 简易的threadpool,没有用condvar.
// send_num可以不要,这里只是记录一下发送的情况
pub struct ThreadPool {threads_num: usize,workers: Vec<Worker>,sender: mpsc::Sender<Message>,send_num: Arc<Mutex<usize>>,
}
// 定义执行各类函数的参数结构类型,带不带参数,带参数的类型,多少
// 其中参数只是举例。比如某类策略只需要输入vec<bar>或vec<tick>; 某类策略需要输入hashmap<string,vec<bar>>;
// 或者套利策略中:两组或多组pair: (vec<bar>,vec<bar>)......
// 一组1min vec<bar>,3min vec<bar>,5min vec<bar>等......
// 不同的策略类型,都可以在这里进行抽象;
// 最后线程池可以执行各类不同的策略;
enum StrategyFn
{T(Box<dyn FnOnce() + Send +'static>),U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}// 定义发送内容
enum Message{Task(Task),Shutdown,
}
struct Task{job :Job,task_id :usize,
}type Job = Box<dyn FnOnce() + Send +'static>;impl ThreadPool {pub fn new(threads_num: usize) -> ThreadPool {assert!(threads_num > 0);let (sender, receiver) = mpsc::channel::<Message>();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(threads_num);let send_num = Arc::new(Mutex::new(0_usize));let execute_num = Arc::new(Mutex::new(0_usize));for id in 0..threads_num {let mut worker = Worker::new(id);worker.run(&receiver, &execute_num);workers.push(worker);}ThreadPool { threads_num,workers, sender,send_num}}pub fn execute(&mut self, task_id:usize,strategy_fn: StrategyFn){let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});match strategy_fn{StrategyFn::U(f, input1, input2) => {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},StrategyFn::T(f) =>{job = Box::new(move || f());}StrategyFn::W(f,input1,input2)=> {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},}let task = Task{job:job,task_id:task_id};let mut send_num = self.send_num.lock().unwrap();*send_num = *send_num +1;self.sender.send(Message::Task(task)).unwrap();}}pub struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,}impl Worker {pub fn new(id: usize)-> Self{Worker {id:id,thread: None,}}pub fn run(&mut self, receiver: &Arc<Mutex<mpsc::Receiver<Message>>>,execute_num: &Arc<Mutex<usize>>){let id = self.id;let receiver = receiver.clone();let execute_num = execute_num.clone();let thread = thread::spawn(move || loop {let msg = receiver.lock().unwrap().recv().unwrap();match msg{Message::Task(task) => {println!("Worker {} got a task; executing task {}.", id,task.task_id);let mut execute_num = execute_num.lock().unwrap();*execute_num = *execute_num + 1;println!("work {} = >execute_num: {}", id,*execute_num);(task.job)();},Message::Shutdown => {println!("Worker {} received shutdown message.", id);break;//很关键}}});self.thread = Some(thread);}
}impl Drop for ThreadPool {fn drop(&mut self) {for _ in 0..self.threads_num{self.sender.send(Message::Shutdown).unwrap();}println!("drop worker :{:?}",self.workers.len());for (i,worker) in (&mut self.workers).into_iter().enumerate() {println!("------Shutting down worker {} i:{} -----------", worker.id,i);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}
// 这里只是通过宏生成一类:U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>)策略数据类型;
//
macro_rules! create_strategy_U{($($s:ident),*) => ($(pub fn $s(_a:f32,_b:f32){println!("run 【U】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
//这里只是通过宏生成一类:T(Box<dyn FnOnce() + Send +'static>)策略数据类型;
macro_rules! create_strategy_T{($($s:ident),*) => ($(pub fn $s(){println!("run 【T】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
// 生成U策略组若干
create_strategy_U!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
// 生成T策略组若干
create_strategy_T!(AA,BB,CC,DD,EE,FF,GG);
pub fn main() {let mut pool = ThreadPool::new(2);let input1 = Arc::new(1.0);let input2 = Arc::new(2.0);let u_strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];let t_strategies = vec![AA,BB,CC,DD,EE,FF,GG];println!("u_strategies num : {:?}",u_strategies.len());for (task_id,u_strategy) in u_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::U(Box::new(u_strategy),input1.clone(), input2.clone());pool.execute(task_id as usize,strategy_fn);}println!("t_strategies num : {:?}",t_strategies.len());for (task_id,t_strategy) in t_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::T(Box::new(t_strategy));pool.execute(task_id as usize,strategy_fn);}println!("toal send_num : {:?}",*pool.send_num.lock().unwrap());
}
输出:
能过U和T类型策略运行实例,说明只要事先进行枚举定义,可以看出,这个线程池可以执行不同类型的策略。
这里有一个问题:这些线程池中线程中如何抢到这个任务,并完整执行完这些任务?这里依靠的是Arc<Mutex<mpsc::Receiver>>结构来保证。
具体如下:
u_strategies num : 17
t_strategies num : 7
toal send_num : 24
drop worker :4
------Shutting down worker 0 i:0 -----------
Worker 2 got a task; executing task 3.
work 2 = >execute_num: 1
run 【U】 strategy "D"
Worker 2 got a task; executing task 4.
work 2 = >execute_num: 2
run 【U】 strategy "E"
Worker 2 got a task; executing task 5.
work 2 = >execute_num: 3
Worker 1 got a task; executing task 1.
Worker 3 got a task; executing task 2.
Worker 0 got a task; executing task 0.
run 【U】 strategy "F"
Worker 2 got a task; executing task 6.
work 1 = >execute_num: 4
run 【U】 strategy "B"
Worker 1 got a task; executing task 7.
work 3 = >execute_num: 5
run 【U】 strategy "C"
work 0 = >execute_num: 6
run 【U】 strategy "A"
Worker 0 got a task; executing task 9.
Worker 3 got a task; executing task 8.
work 2 = >execute_num: 7
run 【U】 strategy "G"
Worker 2 got a task; executing task 10.
work 1 = >execute_num: 8
run 【U】 strategy "H"
Worker 1 got a task; executing task 11.
work 0 = >execute_num: 9
run 【U】 strategy "J"
Worker 0 got a task; executing task 12.
work 3 = >execute_num: 10
run 【U】 strategy "I"
Worker 3 got a task; executing task 13.
work 2 = >execute_num: 11
run 【U】 strategy "K"
Worker 2 got a task; executing task 14.
work 1 = >execute_num: 12
run 【U】 strategy "L"
Worker 1 got a task; executing task 15.
work 0 = >execute_num: 13
run 【U】 strategy "M"
Worker 0 got a task; executing task 16.
work 3 = >execute_num: 14
run 【U】 strategy "N"
Worker 3 got a task; executing task 0.
work 2 = >execute_num: 15
run 【U】 strategy "O"
Worker 2 got a task; executing task 1.
work 1 = >execute_num: 16
run 【U】 strategy "P"
Worker 1 got a task; executing task 2.
work 0 = >execute_num: 17
run 【U】 strategy "Q"
Worker 0 got a task; executing task 3.
work 3 = >execute_num: 18
run 【T】 strategy "AA"
Worker 3 got a task; executing task 4.
work 2 = >execute_num: 19
run 【T】 strategy "BB"
Worker 2 got a task; executing task 5.
work 1 = >execute_num: 20
run 【T】 strategy "CC"
Worker 1 got a task; executing task 6.
work 0 = >execute_num: 21
run 【T】 strategy "DD"
Worker 0 received shutdown message.
work 3 = >execute_num: 22
run 【T】 strategy "EE"
Worker 3 received shutdown message.
work 2 = >execute_num: 23
run 【T】 strategy "FF"
------Shutting down worker 1 i:1 -----------
Worker 2 received shutdown message.
work 1 = >execute_num: 24
run 【T】 strategy "GG"
Worker 1 received shutdown message.
------Shutting down worker 2 i:2 -----------
------Shutting down worker 3 i:3 -----------
相关文章:
Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数? 3个关键词:闭包、Box与FnOnce;请细品。 use std::sync::{mpsc,Arc,Mutex}; use std::thread; fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);…...
一个汉字占几个字节、JS中如何获得一个字符串占用多少字节?
浅浅记录 一个汉字占几个字节?JS中如何获得一个字符串占用多少字节? 一个汉字占几个字节? GBK编码:一个汉字、中文字符都是占2个字节,英文字符占1个字节 UTF-8编码:一个汉字、中文字符都是占3个字节&#…...
CommonJS 和 ES modules
CommonJS 和 ES modules (ESM) 是两种不同的模块系统,它们用于组织 JavaScript 代码,并允许不同文件之间共享代码。 CommonJS (CJS) CommonJS 是最早的 JavaScript 模块化规范之一,主要用于 Node.js 环境中。CommonJS 规定每个文件都是一个…...
计算机网络——CDN
空间编码例子:不是发送N个相同颜色值,而是仅发送2个值,颜色和重复个数 时间编码例子:不是发送i1帧的全部编码,而是仅发送帧i差别的地方 视频播放时,先下载manifest file文件——>解析(不…...
大数据治理:挑战与策略
随着信息技术的飞速发展,大数据已成为当今社会的重要资源。大数据治理作为管理和利用大数据的关键手段,对于提升数据质量、保障数据安全、实现数据价值具有重要意义。本文首先阐述了大数据治理的概念和目标,接着分析了大数据治理面临的挑战&a…...
屋面通风器安装方案及流程
屋面通风器的安装方案及流程是一个系统性工作,需要仔细规划和执行,以确保安装质量和通风器的正常运行。昱合昇天窗厂家为大家整理了详细的安装方案及流程,供您参考。一、安装前准备 1、确定安装位置 根据建筑物屋顶结构和通风需求,…...
ComfyUI一键更换服装:IP-Adapter V2 + FaceDetailer(DeepFashion)
在这篇文章中,我们将探索如何使用新版的IP-Adapter和ComfyUI软件为人物进行换装。 整个过程非常简单,仅需要两张图片:一张服装图片和一张人物图片。 通过一系列节点的操作,ComfyUI就会把这个服装换到人物身上,并利用…...
AWS账号与亚马逊账号的关系解析
在当今数字化时代,云计算已成为企业和个人用户不可或缺的一部分。亚马逊网络服务(AWS)是全球领先的云计算平台,而亚马逊(Amazon)则是全球最大的在线零售商之一。许多人在使用这两个平台时,常常会…...
Java八大基本数据类型详解
引言 一、整数类型 二、 浮点类型 三、.字符类型 四、布尔类型 示例代码 注意事项 引言 在Java编程语言中,基本数据类型是构建程序的基础。了解这些数据类型的特性和使用方法对于编写高效且正确的代码至关重要。本文将详细介绍Java的八大基本数据类型ÿ…...
ChatGPT的终极指南概要
ChatGPT的终极指南概要 [ Prompt Format(提示格式) 是一种用于指导ChatGPT生成特定类型回答的模板。它通常包括以下几个部分: 角色(Role):定义AI模型在Prompt中所扮演的角色,例如专家、顾问、…...
Android应用性能优化的方法
Android应用性能优化是一个复杂而关键的过程,涉及多个方面,包括布局优化、网络优化、安装包优化、内存优化、卡顿优化、启动优化等。以下是对这些优化方法的详细解析: 一、布局优化 布局优化是Android性能优化的基础,主要目标是…...
『网络游戏』客户端发送消息到服务器【17】
将上一章服务器的协议PEProtocol的.dll文件重新生成导入unity客户端中 命名为Net 点击生成 另一种导入.dll文件方式 在客户端粘贴即可 此时Net文件夹的.dll文件就导入进来了 创建脚本:NetSvc.cs 编写脚本:NetSvc.cs 修改脚本:GameRoot.cs 在…...
【系统架构设计师】专题:数据库系统考点梳理
更多内容请见: 备考系统架构设计师-核心总结目录 文章目录 一、数据库基本概念1、数据库技术的发展2、数据模型3、数据库管理系统4、数据库三级模式二、关系数据库1、关系数据库基本概念2、关系运算3、关系数据库设计基本理论三、数据库设计1、数据库设计的基本步骤2、数据需求…...
Java传递对象是值传递还是引用传递?
🎉 前言 之前一直以为Java传对象是引用传递,直到最近用Java写数据结构链表时遇到一些问题,这才让我重新思考这个问题,经过我的一番研究,发现不能一棒子打死,其实这其中既有值传递,又有引用传递…...
解锁C++多态的魔力:灵活与高效的编码艺术(上)
文章目录 前言🌸一、多态的定义与概念🌻1.1 多态的核心思想:🌻1.2 多态的两种主要形式: 🌸二、多态的使用条件🌻2.1 基类指针或引用2.1.1 为什么需要基类指针或引用 🌻2.2 虚函数&am…...
k8s系列-Rancher 上操作的k8s容器网络配置总结
Rancher 上操作的k8s容器网络配置总结 要在 Rancher 中配置Spring Boot 应用 ykhd-zhjgyw-xpwfxfjfl 服务,正确的配置方式如下: 1. 应用程序监听端口 在 application.yaml 文件中,配置的应用监听端口是 10001,并且应用的上下文…...
2024年【氯化工艺】考试题库及氯化工艺考试内容
题库来源:安全生产模拟考试一点通公众号小程序 氯化工艺考试题库根据新氯化工艺考试大纲要求,安全生产模拟考试一点通将氯化工艺模拟考试试题进行汇编,组成一套氯化工艺全真模拟考试试题,学员可通过氯化工艺考试内容全真模拟&…...
从commit校验失效问题探究husky原理
一、背景 之前创建的项目,发现代码 commit 提交的时候没有了任何校验,具体表现: 一是 feat fix 等主题格式校验没有了二是代码 lint 不通过也能提交 尝试解决这个问题,并深入了解husky的实现原理,将相关的一些知识点…...
Azure OpenAI 服务上线具有音频和语音功能的 GPT-4o-Realtime-Preview,免费申请试用
微软宣布 GPT-4o-Realtime-Preview 音频和语音公开预览版的推出,这是对Microsoft Azure OpenAI 服务的重大增强,增加了高级语音功能并扩展了 GPT-4o 的多模式产品。 这一里程碑进一步巩固了 Azure 在人工智能领域的领导地位,尤其是在语音技术…...
基于IMX6UL的EPIT的定时器实验
定时器是最常用的外设,常常需要使用定时器来完成精准的定时功能,I.MX6U 提供了多 种硬件定时器,有些定时器功能非常强大。本章我们从最基本的 EPIT 定时器开始,学习如何配置EPIT 定时器,使其按照给定的时间,…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
