Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数?
3个关键词:闭包、Box与FnOnce;请细品。
use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{a+b
}
fn doit(amount: f32,code:String){println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){println!("amount:{:?}",amount);
}
fn test(){process(2.0, "b".into(), doit);let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));fn1();let fn2: Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| doit(amount,code));fn2(2.0,"hello john!".into());let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));fn3(2.0,"hello rose!".into());let fn4: Box<dyn FnOnce()> = Box::new(move ||workit(3.0));fn4();let fn5: Box<dyn FnOnce()> = Box::new(move ||println!("hello world!"));fn5();// 带类型返还的结构let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){//test();let (sender, receiver) = mpsc::channel::<box_fn>();let receiver = Arc::new(Mutex::new(receiver));// 注意:||中均没有参数,故是FnOnce(); 具体参考test()let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),Box::new(move ||workit(3.0)),Box::new(move || doit(2.0,"hello cat!".into()))];for f in vec_fn {sender.send(f).unwrap();}let rx = receiver.clone();let handle = thread::spawn(move ||{loop{let box_fn = rx.lock().unwrap().recv().unwrap() ;println!("from son thread!");box_fn();}});println!("main thread sended all box_fn!");handle.join().unwrap();}
二、线程池的应用:发送函数有什么用处?
如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。
在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。
// 这个线程池是一个比较粗的框架;可以执行不同策略类型:多周期、趋势、套利、市场中性等各类策略组的运行。
use std::sync::atomic::AtomicU32;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
// 简易的threadpool,没有用condvar.
// send_num可以不要,这里只是记录一下发送的情况
pub struct ThreadPool {threads_num: usize,workers: Vec<Worker>,sender: mpsc::Sender<Message>,send_num: Arc<Mutex<usize>>,
}
// 定义执行各类函数的参数结构类型,带不带参数,带参数的类型,多少
// 其中参数只是举例。比如某类策略只需要输入vec<bar>或vec<tick>; 某类策略需要输入hashmap<string,vec<bar>>;
// 或者套利策略中:两组或多组pair: (vec<bar>,vec<bar>)......
// 一组1min vec<bar>,3min vec<bar>,5min vec<bar>等......
// 不同的策略类型,都可以在这里进行抽象;
// 最后线程池可以执行各类不同的策略;
enum StrategyFn
{T(Box<dyn FnOnce() + Send +'static>),U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}// 定义发送内容
enum Message{Task(Task),Shutdown,
}
struct Task{job :Job,task_id :usize,
}type Job = Box<dyn FnOnce() + Send +'static>;impl ThreadPool {pub fn new(threads_num: usize) -> ThreadPool {assert!(threads_num > 0);let (sender, receiver) = mpsc::channel::<Message>();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(threads_num);let send_num = Arc::new(Mutex::new(0_usize));let execute_num = Arc::new(Mutex::new(0_usize));for id in 0..threads_num {let mut worker = Worker::new(id);worker.run(&receiver, &execute_num);workers.push(worker);}ThreadPool { threads_num,workers, sender,send_num}}pub fn execute(&mut self, task_id:usize,strategy_fn: StrategyFn){let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});match strategy_fn{StrategyFn::U(f, input1, input2) => {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},StrategyFn::T(f) =>{job = Box::new(move || f());}StrategyFn::W(f,input1,input2)=> {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},}let task = Task{job:job,task_id:task_id};let mut send_num = self.send_num.lock().unwrap();*send_num = *send_num +1;self.sender.send(Message::Task(task)).unwrap();}}pub struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,}impl Worker {pub fn new(id: usize)-> Self{Worker {id:id,thread: None,}}pub fn run(&mut self, receiver: &Arc<Mutex<mpsc::Receiver<Message>>>,execute_num: &Arc<Mutex<usize>>){let id = self.id;let receiver = receiver.clone();let execute_num = execute_num.clone();let thread = thread::spawn(move || loop {let msg = receiver.lock().unwrap().recv().unwrap();match msg{Message::Task(task) => {println!("Worker {} got a task; executing task {}.", id,task.task_id);let mut execute_num = execute_num.lock().unwrap();*execute_num = *execute_num + 1;println!("work {} = >execute_num: {}", id,*execute_num);(task.job)();},Message::Shutdown => {println!("Worker {} received shutdown message.", id);break;//很关键}}});self.thread = Some(thread);}
}impl Drop for ThreadPool {fn drop(&mut self) {for _ in 0..self.threads_num{self.sender.send(Message::Shutdown).unwrap();}println!("drop worker :{:?}",self.workers.len());for (i,worker) in (&mut self.workers).into_iter().enumerate() {println!("------Shutting down worker {} i:{} -----------", worker.id,i);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}
// 这里只是通过宏生成一类:U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>)策略数据类型;
//
macro_rules! create_strategy_U{($($s:ident),*) => ($(pub fn $s(_a:f32,_b:f32){println!("run 【U】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
//这里只是通过宏生成一类:T(Box<dyn FnOnce() + Send +'static>)策略数据类型;
macro_rules! create_strategy_T{($($s:ident),*) => ($(pub fn $s(){println!("run 【T】 strategy {:?}",stringify!($s));//thread::sleep(Duration::from_millis(1));})* );
}
// 生成U策略组若干
create_strategy_U!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
// 生成T策略组若干
create_strategy_T!(AA,BB,CC,DD,EE,FF,GG);
pub fn main() {let mut pool = ThreadPool::new(2);let input1 = Arc::new(1.0);let input2 = Arc::new(2.0);let u_strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];let t_strategies = vec![AA,BB,CC,DD,EE,FF,GG];println!("u_strategies num : {:?}",u_strategies.len());for (task_id,u_strategy) in u_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::U(Box::new(u_strategy),input1.clone(), input2.clone());pool.execute(task_id as usize,strategy_fn);}println!("t_strategies num : {:?}",t_strategies.len());for (task_id,t_strategy) in t_strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::T(Box::new(t_strategy));pool.execute(task_id as usize,strategy_fn);}println!("toal send_num : {:?}",*pool.send_num.lock().unwrap());
}
输出:
能过U和T类型策略运行实例,说明只要事先进行枚举定义,可以看出,这个线程池可以执行不同类型的策略。
这里有一个问题:这些线程池中线程中如何抢到这个任务,并完整执行完这些任务?这里依靠的是Arc<Mutex<mpsc::Receiver>>结构来保证。
具体如下:
u_strategies num : 17
t_strategies num : 7
toal send_num : 24
drop worker :4
------Shutting down worker 0 i:0 -----------
Worker 2 got a task; executing task 3.
work 2 = >execute_num: 1
run 【U】 strategy "D"
Worker 2 got a task; executing task 4.
work 2 = >execute_num: 2
run 【U】 strategy "E"
Worker 2 got a task; executing task 5.
work 2 = >execute_num: 3
Worker 1 got a task; executing task 1.
Worker 3 got a task; executing task 2.
Worker 0 got a task; executing task 0.
run 【U】 strategy "F"
Worker 2 got a task; executing task 6.
work 1 = >execute_num: 4
run 【U】 strategy "B"
Worker 1 got a task; executing task 7.
work 3 = >execute_num: 5
run 【U】 strategy "C"
work 0 = >execute_num: 6
run 【U】 strategy "A"
Worker 0 got a task; executing task 9.
Worker 3 got a task; executing task 8.
work 2 = >execute_num: 7
run 【U】 strategy "G"
Worker 2 got a task; executing task 10.
work 1 = >execute_num: 8
run 【U】 strategy "H"
Worker 1 got a task; executing task 11.
work 0 = >execute_num: 9
run 【U】 strategy "J"
Worker 0 got a task; executing task 12.
work 3 = >execute_num: 10
run 【U】 strategy "I"
Worker 3 got a task; executing task 13.
work 2 = >execute_num: 11
run 【U】 strategy "K"
Worker 2 got a task; executing task 14.
work 1 = >execute_num: 12
run 【U】 strategy "L"
Worker 1 got a task; executing task 15.
work 0 = >execute_num: 13
run 【U】 strategy "M"
Worker 0 got a task; executing task 16.
work 3 = >execute_num: 14
run 【U】 strategy "N"
Worker 3 got a task; executing task 0.
work 2 = >execute_num: 15
run 【U】 strategy "O"
Worker 2 got a task; executing task 1.
work 1 = >execute_num: 16
run 【U】 strategy "P"
Worker 1 got a task; executing task 2.
work 0 = >execute_num: 17
run 【U】 strategy "Q"
Worker 0 got a task; executing task 3.
work 3 = >execute_num: 18
run 【T】 strategy "AA"
Worker 3 got a task; executing task 4.
work 2 = >execute_num: 19
run 【T】 strategy "BB"
Worker 2 got a task; executing task 5.
work 1 = >execute_num: 20
run 【T】 strategy "CC"
Worker 1 got a task; executing task 6.
work 0 = >execute_num: 21
run 【T】 strategy "DD"
Worker 0 received shutdown message.
work 3 = >execute_num: 22
run 【T】 strategy "EE"
Worker 3 received shutdown message.
work 2 = >execute_num: 23
run 【T】 strategy "FF"
------Shutting down worker 1 i:1 -----------
Worker 2 received shutdown message.
work 1 = >execute_num: 24
run 【T】 strategy "GG"
Worker 1 received shutdown message.
------Shutting down worker 2 i:2 -----------
------Shutting down worker 3 i:3 -----------
相关文章:
Rust : FnOnce、线程池与多策略执行
一、问题:mpsc如何发送各类不同的函数? 3个关键词:闭包、Box与FnOnce;请细品。 use std::sync::{mpsc,Arc,Mutex}; use std::thread; fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);…...
一个汉字占几个字节、JS中如何获得一个字符串占用多少字节?
浅浅记录 一个汉字占几个字节?JS中如何获得一个字符串占用多少字节? 一个汉字占几个字节? GBK编码:一个汉字、中文字符都是占2个字节,英文字符占1个字节 UTF-8编码:一个汉字、中文字符都是占3个字节&#…...
CommonJS 和 ES modules
CommonJS 和 ES modules (ESM) 是两种不同的模块系统,它们用于组织 JavaScript 代码,并允许不同文件之间共享代码。 CommonJS (CJS) CommonJS 是最早的 JavaScript 模块化规范之一,主要用于 Node.js 环境中。CommonJS 规定每个文件都是一个…...
计算机网络——CDN
空间编码例子:不是发送N个相同颜色值,而是仅发送2个值,颜色和重复个数 时间编码例子:不是发送i1帧的全部编码,而是仅发送帧i差别的地方 视频播放时,先下载manifest file文件——>解析(不…...
大数据治理:挑战与策略
随着信息技术的飞速发展,大数据已成为当今社会的重要资源。大数据治理作为管理和利用大数据的关键手段,对于提升数据质量、保障数据安全、实现数据价值具有重要意义。本文首先阐述了大数据治理的概念和目标,接着分析了大数据治理面临的挑战&a…...
屋面通风器安装方案及流程
屋面通风器的安装方案及流程是一个系统性工作,需要仔细规划和执行,以确保安装质量和通风器的正常运行。昱合昇天窗厂家为大家整理了详细的安装方案及流程,供您参考。一、安装前准备 1、确定安装位置 根据建筑物屋顶结构和通风需求,…...
ComfyUI一键更换服装:IP-Adapter V2 + FaceDetailer(DeepFashion)
在这篇文章中,我们将探索如何使用新版的IP-Adapter和ComfyUI软件为人物进行换装。 整个过程非常简单,仅需要两张图片:一张服装图片和一张人物图片。 通过一系列节点的操作,ComfyUI就会把这个服装换到人物身上,并利用…...
AWS账号与亚马逊账号的关系解析
在当今数字化时代,云计算已成为企业和个人用户不可或缺的一部分。亚马逊网络服务(AWS)是全球领先的云计算平台,而亚马逊(Amazon)则是全球最大的在线零售商之一。许多人在使用这两个平台时,常常会…...
Java八大基本数据类型详解
引言 一、整数类型 二、 浮点类型 三、.字符类型 四、布尔类型 示例代码 注意事项 引言 在Java编程语言中,基本数据类型是构建程序的基础。了解这些数据类型的特性和使用方法对于编写高效且正确的代码至关重要。本文将详细介绍Java的八大基本数据类型ÿ…...
ChatGPT的终极指南概要
ChatGPT的终极指南概要 [ Prompt Format(提示格式) 是一种用于指导ChatGPT生成特定类型回答的模板。它通常包括以下几个部分: 角色(Role):定义AI模型在Prompt中所扮演的角色,例如专家、顾问、…...
Android应用性能优化的方法
Android应用性能优化是一个复杂而关键的过程,涉及多个方面,包括布局优化、网络优化、安装包优化、内存优化、卡顿优化、启动优化等。以下是对这些优化方法的详细解析: 一、布局优化 布局优化是Android性能优化的基础,主要目标是…...
『网络游戏』客户端发送消息到服务器【17】
将上一章服务器的协议PEProtocol的.dll文件重新生成导入unity客户端中 命名为Net 点击生成 另一种导入.dll文件方式 在客户端粘贴即可 此时Net文件夹的.dll文件就导入进来了 创建脚本:NetSvc.cs 编写脚本:NetSvc.cs 修改脚本:GameRoot.cs 在…...
【系统架构设计师】专题:数据库系统考点梳理
更多内容请见: 备考系统架构设计师-核心总结目录 文章目录 一、数据库基本概念1、数据库技术的发展2、数据模型3、数据库管理系统4、数据库三级模式二、关系数据库1、关系数据库基本概念2、关系运算3、关系数据库设计基本理论三、数据库设计1、数据库设计的基本步骤2、数据需求…...
Java传递对象是值传递还是引用传递?
🎉 前言 之前一直以为Java传对象是引用传递,直到最近用Java写数据结构链表时遇到一些问题,这才让我重新思考这个问题,经过我的一番研究,发现不能一棒子打死,其实这其中既有值传递,又有引用传递…...
解锁C++多态的魔力:灵活与高效的编码艺术(上)
文章目录 前言🌸一、多态的定义与概念🌻1.1 多态的核心思想:🌻1.2 多态的两种主要形式: 🌸二、多态的使用条件🌻2.1 基类指针或引用2.1.1 为什么需要基类指针或引用 🌻2.2 虚函数&am…...
k8s系列-Rancher 上操作的k8s容器网络配置总结
Rancher 上操作的k8s容器网络配置总结 要在 Rancher 中配置Spring Boot 应用 ykhd-zhjgyw-xpwfxfjfl 服务,正确的配置方式如下: 1. 应用程序监听端口 在 application.yaml 文件中,配置的应用监听端口是 10001,并且应用的上下文…...
2024年【氯化工艺】考试题库及氯化工艺考试内容
题库来源:安全生产模拟考试一点通公众号小程序 氯化工艺考试题库根据新氯化工艺考试大纲要求,安全生产模拟考试一点通将氯化工艺模拟考试试题进行汇编,组成一套氯化工艺全真模拟考试试题,学员可通过氯化工艺考试内容全真模拟&…...
从commit校验失效问题探究husky原理
一、背景 之前创建的项目,发现代码 commit 提交的时候没有了任何校验,具体表现: 一是 feat fix 等主题格式校验没有了二是代码 lint 不通过也能提交 尝试解决这个问题,并深入了解husky的实现原理,将相关的一些知识点…...
Azure OpenAI 服务上线具有音频和语音功能的 GPT-4o-Realtime-Preview,免费申请试用
微软宣布 GPT-4o-Realtime-Preview 音频和语音公开预览版的推出,这是对Microsoft Azure OpenAI 服务的重大增强,增加了高级语音功能并扩展了 GPT-4o 的多模式产品。 这一里程碑进一步巩固了 Azure 在人工智能领域的领导地位,尤其是在语音技术…...
基于IMX6UL的EPIT的定时器实验
定时器是最常用的外设,常常需要使用定时器来完成精准的定时功能,I.MX6U 提供了多 种硬件定时器,有些定时器功能非常强大。本章我们从最基本的 EPIT 定时器开始,学习如何配置EPIT 定时器,使其按照给定的时间,…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
什么是库存周转?如何用进销存系统提高库存周转率?
你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
