当前位置: 首页 > news >正文

workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

相关文章:

workflow源码解析:ThreadTask

1、使用程序&#xff0c;一个简单的加法运算程序 #include <iostream> #include <workflow/WFTaskFactory.h> #include <errno.h>// 直接定义thread_task三要素 // 一个典型的后端程序由三个部分组成&#xff0c;并且完全独立开发。即&#xff1a;程序协议算…...

为何谷歌强制要求安装ssl证书?

在当今数字化的世界中&#xff0c;网络安全已成为至关重要的议题之一。作为全球最大的搜索引擎之一&#xff0c;谷歌一直在推动网络安全标准的提升。其强制要求网站安装SSL证书的决策引起了广泛关注。本文将深入探讨谷歌为何强制要求安装SSL证书&#xff0c;以及这一举措对互联…...

【刷题】 leetcode 2 .两数相加

两数相加 两数相加1 思路一 &#xff08;暴毙版&#xff09;2 思路二 &#xff08;本质出发&#xff09; 谢谢阅读Thanks♪(&#xff65;ω&#xff65;)&#xff89;下一篇文章见&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 两数相加 我们来看…...

Webpack5入门到原理2:基本使用

Webpack 是一个静态资源打包工具。 它会以一个或多个文件作为打包的入口&#xff0c;将我们整个项目所有文件编译组合成一个或多个文件输出出去。 输出的文件就是编译好的文件&#xff0c;就可以在浏览器段运行了。 我们将 Webpack 输出的文件叫做 bundle。 功能介绍 Webp…...

企业微信上传临时素材errcode:44001,errmsg:empty media data

企业微信&#xff0c;上传临时素材&#xff0c;报错&#xff1a; {“errcode”:44001,“errmsg”:“empty media data [logid:]”}&#xff0c; 开发语言C# 重点代码&#xff1a; formData.Headers.ContentType new MediaTypeHeaderValue(“application/octet-stream”); 解…...

Docker技巧汇总

Docker技巧汇总 前言使用流程安装配置镜像管理创建并运行容器使用容器/常用命令导出和导入查看元数据挂载数据卷端口映射/转发VS Code连接Docker 前言 Docker 是一个开源的应用容器引擎&#xff0c;可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xf…...

学习使用微信小程序实现智能名片电子名片功能代码

学习使用微信小程序实现智能名片电子名片功能代码 拨打手机号功能一键复制信息功能定位导航功能存入手机通讯录功能转发分享功能 拨打手机号功能 wx.makePhoneCall({phoneNumber: qipa250 //仅为示例&#xff0c;并非真实的电话号码 })一键复制信息功能 wx.getClipboardData(…...

学习响应式编程中遇到的奇奇怪怪的问题

spring项目无法启动 Description: Web application could not be started as there was no org.springframework.boot.web.reactive.server.ReactiveWebServerFactory bean defined in the context. Action: Check your application’s dependencies for a supported react…...

前端常用js、css效果

前端常用js效果 效果参考代码文本横向滚动文本无限滚动无缝轮播无缝滚动盒子上下移动樱花飘落 效果 主要整理了几个常用的&#xff0c;方便平时做项目的时候参考 文本横向滚动 文本无限滚动 无缝轮播 无缝滚动 盒子上下滚动 樱花飘落效果 参考代码 文本横向滚动 <!DOCTYP…...

Modern C++ 条件变量

今天无意中看到一篇帖子&#xff0c;关于条件变量的&#xff0c;不过仔细看看发现它并达不到原本的目的。 程序如下&#xff0c;读者可以先想想他的本意&#xff0c;以及有没有问题&#xff1a; #include <iostream> #include <thread> #include <condition_v…...

免费chartGPT网站汇总--

https://s.suolj.com - &#xff08;支持文心、科大讯飞、智谱等国内大语言模型&#xff0c;Midjourney绘画、语音对讲、聊天插件&#xff09;国内可以直连&#xff0c;响应速度很快 很稳定 https://seboai.github.io - 国内可以直连&#xff0c;响应速度很快 很稳定 http://gp…...

关于C#中的async/await的理解

1. 使用async标记的方法被认为是一个异步方法&#xff0c;如果不使用await关键字&#xff0c;调用跟普通方法没有区别 static async Task Main(string[] args){Console.WriteLine("主线程id&#xff1a;" Thread.CurrentThread.ManagedThreadId);TestAwait();Consol…...

docker硬件交互 _ROS2

docker硬件交互 _ROS2 将自己需要挂载的设备接到主板上&#xff0c;在宿主机中建立udev规则&#xff08;/etc/udev/rules.d/&#xff09;然后在开启容器时&#xff0c;将设置了规则的devices 通过 --device/dev/myserial --device/dev/rplidar 等 参数挂载到docker容器中 doc…...

JS的数据类型和运算符

typeof()方法&#xff1a;检测数据类型 JS中的基本数据类型 基本数据类型 1.number 数字 2.string 字符串 3.boolean 布尔 4.null 代表空值&#xff08;typeof方法检测出来的数据类型是object类型&#xff09; 5.underfined 未定义&#xff1b;变量已声明但是未赋值 6.…...

CSS实现平行四边形

1、为什么实现平行四边形 在日常开发过程中&#xff0c;有些时候我们可以会遇到一种情况&#xff0c;如可视化大屏中要求我们横线实现对应的进度条&#xff0c;但进度条的内容是由无数个平行四边形组装类似于进度条的形式&#xff0c;那么我们就需要使用CSS来进行对应的实现。 …...

第11章 GUI Page500~504 步骤三十二:打开画板文件02

各个图元类新增GetTypeName_Static()&#xff0c;并将原来的GetTypeName()改为调用静态方法实现&#xff1a; 直线&#xff1a; 圆&#xff1a; 十字&#xff1a; 矩形&#xff1a; 文字&#xff1a; tool_4_save_load.hpp添加两行 tool_4_save_load.cpp增加&#xff1a; 增加…...

【ROS2】ROS2使用C++实现简单服务端

使用ROS2实现简单的服务端,功能为将客户端提供的两个数相加后返回给客户端。 代码如下: #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" #include "base_interfaces_demo/msg/student.hpp" #include "base_interfac…...

WAF攻防相关知识点总结1--信息收集中的WAF触发及解决方案

什么是WAF WAF可以通过对Web应用程序的流量进行过滤和监控&#xff0c;识别并阻止潜在的安全威胁。WAF可以检测Web应用程序中的各种攻击&#xff0c;例如SQL注入、跨站点脚本攻击&#xff08;XSS&#xff09;、跨站请求伪造&#xff08;CSRF&#xff09;等&#xff0c;并采取相…...

行云部署前端架构解析-前言 | 京东云技术团队

一个简单的自我介绍 项目规模 截止目前上万次代码提交&#xff0c;总代码行数1超过21万行&#xff0c;其中人工维护的代码超过 13万行&#xff0c;近千个文件。 前端线上服务直接对接的后端服务&#xff0c;达十多个。 跟很多应用一样, 它有行云的入口, 也有独立的服务, 还…...

git提交代码到远端仓库的方法详解

一、何为git git就是版本控制器&#xff0c;就比如说你新建了一个git文件夹&#xff0c;里面用于存放你的C语言实习报告&#xff0c;现在要用git对该文件夹进行接管。当你修改了你的C语言实习报告点击保存之后&#xff0c;就用git的相关命令&#xff0c;提交给git&#xff0c;让…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...