workflow源码解析:ThreadTask
1、使用程序,一个简单的加法运算程序
#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}
2、类继承关系
WFThreadTaskFactory代码
// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}
__WFThreadTask代码
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute() //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};
WFThreadTask代码
// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};
ExecRequest代码
// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch() // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};
SubTask代码
class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};
ExecSession代码
/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};
继承关系图
__WFThreadTask__目前还未用到,暂不清楚

3、两个重要成员: ExecQueue, Executor
ExecQueue代码
/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};
Executor代码
/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};
request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine
// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine = Executor::executor_thread_routine,.context = queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine = Executor::executor_thread_routine,.context = queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}
4、参考链接
https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502
相关文章:
workflow源码解析:ThreadTask
1、使用程序,一个简单的加法运算程序 #include <iostream> #include <workflow/WFTaskFactory.h> #include <errno.h>// 直接定义thread_task三要素 // 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序协议算…...
为何谷歌强制要求安装ssl证书?
在当今数字化的世界中,网络安全已成为至关重要的议题之一。作为全球最大的搜索引擎之一,谷歌一直在推动网络安全标准的提升。其强制要求网站安装SSL证书的决策引起了广泛关注。本文将深入探讨谷歌为何强制要求安装SSL证书,以及这一举措对互联…...
【刷题】 leetcode 2 .两数相加
两数相加 两数相加1 思路一 (暴毙版)2 思路二 (本质出发) 谢谢阅读Thanks♪(・ω・)ノ下一篇文章见!!!!!! 两数相加 我们来看…...
Webpack5入门到原理2:基本使用
Webpack 是一个静态资源打包工具。 它会以一个或多个文件作为打包的入口,将我们整个项目所有文件编译组合成一个或多个文件输出出去。 输出的文件就是编译好的文件,就可以在浏览器段运行了。 我们将 Webpack 输出的文件叫做 bundle。 功能介绍 Webp…...
企业微信上传临时素材errcode:44001,errmsg:empty media data
企业微信,上传临时素材,报错: {“errcode”:44001,“errmsg”:“empty media data [logid:]”}, 开发语言C# 重点代码: formData.Headers.ContentType new MediaTypeHeaderValue(“application/octet-stream”); 解…...
Docker技巧汇总
Docker技巧汇总 前言使用流程安装配置镜像管理创建并运行容器使用容器/常用命令导出和导入查看元数据挂载数据卷端口映射/转发VS Code连接Docker 前言 Docker 是一个开源的应用容器引擎,可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中…...
学习使用微信小程序实现智能名片电子名片功能代码
学习使用微信小程序实现智能名片电子名片功能代码 拨打手机号功能一键复制信息功能定位导航功能存入手机通讯录功能转发分享功能 拨打手机号功能 wx.makePhoneCall({phoneNumber: qipa250 //仅为示例,并非真实的电话号码 })一键复制信息功能 wx.getClipboardData(…...
学习响应式编程中遇到的奇奇怪怪的问题
spring项目无法启动 Description: Web application could not be started as there was no org.springframework.boot.web.reactive.server.ReactiveWebServerFactory bean defined in the context. Action: Check your application’s dependencies for a supported react…...
前端常用js、css效果
前端常用js效果 效果参考代码文本横向滚动文本无限滚动无缝轮播无缝滚动盒子上下移动樱花飘落 效果 主要整理了几个常用的,方便平时做项目的时候参考 文本横向滚动 文本无限滚动 无缝轮播 无缝滚动 盒子上下滚动 樱花飘落效果 参考代码 文本横向滚动 <!DOCTYP…...
Modern C++ 条件变量
今天无意中看到一篇帖子,关于条件变量的,不过仔细看看发现它并达不到原本的目的。 程序如下,读者可以先想想他的本意,以及有没有问题: #include <iostream> #include <thread> #include <condition_v…...
免费chartGPT网站汇总--
https://s.suolj.com - (支持文心、科大讯飞、智谱等国内大语言模型,Midjourney绘画、语音对讲、聊天插件)国内可以直连,响应速度很快 很稳定 https://seboai.github.io - 国内可以直连,响应速度很快 很稳定 http://gp…...
关于C#中的async/await的理解
1. 使用async标记的方法被认为是一个异步方法,如果不使用await关键字,调用跟普通方法没有区别 static async Task Main(string[] args){Console.WriteLine("主线程id:" Thread.CurrentThread.ManagedThreadId);TestAwait();Consol…...
docker硬件交互 _ROS2
docker硬件交互 _ROS2 将自己需要挂载的设备接到主板上,在宿主机中建立udev规则(/etc/udev/rules.d/)然后在开启容器时,将设置了规则的devices 通过 --device/dev/myserial --device/dev/rplidar 等 参数挂载到docker容器中 doc…...
JS的数据类型和运算符
typeof()方法:检测数据类型 JS中的基本数据类型 基本数据类型 1.number 数字 2.string 字符串 3.boolean 布尔 4.null 代表空值(typeof方法检测出来的数据类型是object类型) 5.underfined 未定义;变量已声明但是未赋值 6.…...
CSS实现平行四边形
1、为什么实现平行四边形 在日常开发过程中,有些时候我们可以会遇到一种情况,如可视化大屏中要求我们横线实现对应的进度条,但进度条的内容是由无数个平行四边形组装类似于进度条的形式,那么我们就需要使用CSS来进行对应的实现。 …...
第11章 GUI Page500~504 步骤三十二:打开画板文件02
各个图元类新增GetTypeName_Static(),并将原来的GetTypeName()改为调用静态方法实现: 直线: 圆: 十字: 矩形: 文字: tool_4_save_load.hpp添加两行 tool_4_save_load.cpp增加: 增加…...
【ROS2】ROS2使用C++实现简单服务端
使用ROS2实现简单的服务端,功能为将客户端提供的两个数相加后返回给客户端。 代码如下: #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" #include "base_interfaces_demo/msg/student.hpp" #include "base_interfac…...
WAF攻防相关知识点总结1--信息收集中的WAF触发及解决方案
什么是WAF WAF可以通过对Web应用程序的流量进行过滤和监控,识别并阻止潜在的安全威胁。WAF可以检测Web应用程序中的各种攻击,例如SQL注入、跨站点脚本攻击(XSS)、跨站请求伪造(CSRF)等,并采取相…...
行云部署前端架构解析-前言 | 京东云技术团队
一个简单的自我介绍 项目规模 截止目前上万次代码提交,总代码行数1超过21万行,其中人工维护的代码超过 13万行,近千个文件。 前端线上服务直接对接的后端服务,达十多个。 跟很多应用一样, 它有行云的入口, 也有独立的服务, 还…...
git提交代码到远端仓库的方法详解
一、何为git git就是版本控制器,就比如说你新建了一个git文件夹,里面用于存放你的C语言实习报告,现在要用git对该文件夹进行接管。当你修改了你的C语言实习报告点击保存之后,就用git的相关命令,提交给git,让…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
怎么开发一个网络协议模块(C语言框架)之(六) ——通用对象池总结(核心)
+---------------------------+ | operEntryTbl[] | ← 操作对象池 (对象数组) +---------------------------+ | 0 | 1 | 2 | ... | N-1 | +---------------------------+↓ 初始化时全部加入 +------------------------+ +-------------------------+ | …...
Python学习(8) ----- Python的类与对象
Python 中的类(Class)与对象(Object)是面向对象编程(OOP)的核心。我们可以通过“类是模板,对象是实例”来理解它们的关系。 🧱 一句话理解: 类就像“图纸”,对…...
