当前位置: 首页 > news >正文

【源码分析】zeebe actor模型源码解读

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

相关文章:

【源码分析】zeebe actor模型源码解读

zeebe actor 模型&#x1f64b;‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法&#xff0c;那么这篇文章就围绕actor.run 方法&#xff0c;说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的&#x1f308; Lon…...

python3实现类似expect shell的交互式与SFTP的脚本

前面写过一篇关于python实现类似expect shell的交互式能力的文章&#xff0c;现在补全一下加上sftp的能力脚本。 例子在代码中__example()方法。 依赖paramiko库&#xff0c;所以需要执行pip install paramiko来安装。 import os import queue import re import threading im…...

java游戏制作-飞翔的鸟游戏

一.准备工作 首先创建一个新的Java项目命名为“飞翔的鸟”&#xff0c;并在src中创建一个包命名为“com.qiku.bird"&#xff0c;在这个包内分别创建4个类命名为“Bird”、“BirdGame”、“Column”、“Ground”&#xff0c;并向需要的图片素材导入到包内。 二.代码呈现 pa…...

NodeMCU ESP8266构建Web Server网页端控制设备

NodeMCU ESP8266构建Web Server网页端控制设备 前言 NodeMCU ESP8266 内部集成了TCP/IP协议栈&#xff0c;可以快速构建网络功能&#xff0c;搭建联网应用的硬件平台&#xff1b; ESP8266可以作为WiFi接入点&#xff08;Station&#xff09;&#xff0c;这样可以方便连接互联…...

搭建区块链

参考B站FISCO BCOS(十八) java SDK与区块链交互_哔哩哔哩_bilibili 林中有神君 一、搭建第一个区块链网络 根据官方文档搭建&#xff1a;搭建第一个区块链网络 — FISCO BCOS v2.9.0 文档 (fisco-bcos-documentation.readthedocs.io) 使用javajdk 控制台2.6之后 本处是2.9.2…...

Python通过selenium调用IE11浏览器报错解决方法

前提 正常安装Python 工具&#xff0c;selenium 包可以正常导入。IE浏览器驱动 IEDriverServer.exe 已经正确放置到已经添加path目录的文件下。 报错现象&#xff1a; 解决方法 打开浏览器进入 internet 选项 切换到安全页签 &#xff0c;去除“应用保护模式” 再次调用验证…...

Ubuntu 1.84.2Visual Studio Code 下载配置与vscode查看内存Hex Editor插件,简单易懂

目录 前言 一 首先我为啥要重装Vs Code呢&#xff1f; 二 下载1.84.2Visual Studio Code 三 配置Vscode终端字体 四 安装插件 前言 这是一篇将老版本的VsCode下载至最新版的博文&#xff0c;从下载到调试全篇 一 首先我为啥要重装Vs Code呢&#xff1f; 因为我想安装这个…...

opencv-图像金字塔

图像金字塔是一种图像处理技术&#xff0c;它通过不断降低图像的分辨率&#xff0c;形成一系列图像。金字塔分为两种类型&#xff1a;高斯金字塔和拉普拉斯金字塔。 高斯金字塔&#xff08;Gaussian Pyramid&#xff09;&#xff1a; 高斯金字塔是通过使用高斯滤波和降采样&a…...

字符串匹配算法——KMP

有文本串aabaabaaf&#xff0c;模式串aabaaf问文本串中是否出现过模式串 暴力解法 最不用动脑子的&#xff0c;直接两层for循环&#xff0c;逐个匹配&#xff0c;匹配到不相等的值时把文本串后移一位&#xff0c;再重新比较。这种方法的复杂度是O(mn)&#xff0c;该方法低效的…...

电子学会C/C++编程等级考试2023年03月(一级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:字符长方形 给定一个字符,用它构造一个长为4个字符,宽为3个字符的长方形,可以参考样例输出。 时间限制:1000 内存限制:65536输入 输入只有一行, 包含一个字符。输出 该字符构成的长方形,长4个字符,宽3个字符。样例输入…...

微信小程序汽车租赁系统

微信小程序汽车租赁系统 本系统包含了3类用户&#xff0c;分别为客户、员工以及管理员&#xff0c;客户主要是满足自身的租车需求&#xff0c;员工主要负责车辆的调度问题和维修状况&#xff0c;管理员则是主要对人员、车辆和订单的管理。以下是对各自功能的详细介绍: 客户可…...

docker部署微服务

目录 docker操作命令 镜像操作命令 拉取镜像 导出镜像 删除镜像 加载镜像 推送镜像 部署 pom文件加上 在每个模块根目录加上DockerFile文件 项目根目录加上docker-compose.yml文件 打包&#xff0c;clean&#xff0c;package 服务器上新建文件夹 测试docker-compo…...

统计voc格式数据中的xml标签、bndbox到excel表格中

有这么个需求是将xml的内容: 1,filename 2.label 3.bndbox:xmin,xmax,ymin,ymax。 … 将这些东西写入excel表格中,方便我统计标签数量和框的分布! 于是撰写了脚本:xml2csv.py 我的xml文件形式如下。大家的目标检测格式大同小异! <annotation><folder>UAV_d…...

51单片机利用I/O口高阻状态实现触摸控制LED灯

51单片机利用I/O口高阻状态实现触摸控制LED灯 1.概述 这篇文章介绍使用I/O口的高阻状态实现一个触摸控制LED灯亮灭的实验。该实验通过手触摸P3.7引脚&#xff0c;改变电平信号控制灯的亮灭。 2.实验过程 2.1.实验材料 名称型号数量单片机STC12C20521LED彩灯无1晶振12MHZ1电…...

自动驾驶术语汇总

目录 智驾级别芯片相关自动驾驶相关辅助驾驶相关预警相关传感器相关泊车相关安全相关车灯相关 智驾级别 L0-L2属于辅助驾驶&#xff0c;L4-L5才算自动驾驶 L0&#xff08;Level 0&#xff09;&#xff1a;无自动化。这是大多数传统汽车的级别&#xff0c;所有的驾驶任务都需要…...

Jsonpath - 数据中快速查找和提取的强大工具

JSON&#xff08;JavaScript Object Notation&#xff09;在现代应用程序中广泛使用&#xff0c;但是如何在复杂的JSON数据中 查找和提取所需的信息呢&#xff1f; JSONPath是一种功能强大的查询语言&#xff0c;可以通过简单的表达式来快速准确地定位和提取JSON数据。本文将介…...

java中,通过替换word模板中的关键字后输出一个新文档

一、要用到的jar包 我已上传了相关的jar包&#xff0c;需要的可以通过以下链接直接下载&#xff1a; https://download.csdn.net/download/qq_27387133/88558034 具体jar包截图&#xff1a; 二、实现的代码 注意&#xff1a;文件要用docx格式!!! word变量替换的方法&#…...

MySQL数据库约束你真的懂吗?

✏️✏️✏️今天给各位带来的是关于数据库约束方面的知识 清风的CSDN博客 &#x1f61b;&#x1f61b;&#x1f61b;希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff01; 动动你们发财的小手&#xff0c;点点关…...

YOCTO 下载repo工具失败解决办法

curl https://mirrors.tuna.tsinghua.edu.cn/git/git-repo -o repocp repo ~/binchmod ax ~/bin/repo如果使用时报错&#xff0c; 切换ubuntu 到 python3 版本。gedit repo 修改repo默认链接地址&#xff1a;REPO_URL "https://gerrit.googlesource.com/git-repo"…...

github连接失败Host key verification failed.解决方案

问题描述 之前一直用的gitee协同协作&#xff0c;然后再最近一次云计算项目中团队使用的是github进行协作&#xff0c;但是按照常规步骤再GitHub上配置了ssh密钥后&#xff0c;却依然显示连接失败&#xff0c;无法推送和拉取代码&#xff0c;克隆仓库也是报错拒绝。具体报错信…...

KeyPass密码管理架构解析:如何在本地构建企业级安全防线

KeyPass密码管理架构解析&#xff1a;如何在本地构建企业级安全防线 【免费下载链接】KeyPass KeyPass: Open Source Project & An Offline Password Manager. Store, manage, and take control securely. 项目地址: https://gitcode.com/gh_mirrors/ke/KeyPass 在云…...

Python 开发中“相对导入超出包范围” 问题详解

文章目录 Python 开发中“相对导入超出包范围” 问题详解 一、相对导入基础语法 二、错误复现:直接执行一个使用相对导入的模块 三、根因分析:`__name__` 与 `__package__` 的魔法 1. 当模块被直接运行时 2. 当模块被作为包的一部分导入时 四、常见触发场景 场景 1:直接在 I…...

3步安装Mitsuba-Blender插件:免费实现Blender物理级渲染效果

3步安装Mitsuba-Blender插件&#xff1a;免费实现Blender物理级渲染效果 【免费下载链接】mitsuba-blender Mitsuba integration add-on for Blender 项目地址: https://gitcode.com/gh_mirrors/mi/mitsuba-blender 想要在Blender中体验专业级的物理渲染效果吗&#xff…...

【2024 Laravel AI生产环境故障白皮书】:基于172个真实项目日志分析的TOP 5致命报错及Hotfix补丁包

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Laravel 12 AI集成故障的底层归因模型与防御范式演进 Laravel 12 引入了原生异步任务调度、更严格的类型约束及基于 PHP 8.3 的 JIT 兼容性增强&#xff0c;但其与外部 AI 服务&#xff08;如 LLM API…...

将Claude Code编程助手配置为使用Taotoken通道的具体方法

将Claude Code编程助手配置为使用Taotoken通道的具体方法 1. 准备工作 在开始配置之前&#xff0c;请确保您已经拥有有效的Taotoken API Key。该Key可以在Taotoken控制台的API密钥管理页面创建。同时&#xff0c;您需要确定要使用的模型ID&#xff0c;该信息可以在Taotoken模…...

电网电压畸变也不怕:5分钟看懂SOGI-PLL如何让你的PWM整流器更稳定

电网电压畸变下的稳定之道&#xff1a;SOGI-PLL在PWM整流器中的实战解析 当电网电压出现谐波污染、频率波动或三相不平衡时&#xff0c;传统锁相环就像在暴风雨中航行的船只&#xff0c;难以保持稳定。而双二阶广义积分锁相环(DSOGI-PLL)则如同装备了先进稳定系统的现代舰艇&am…...

逆向爬虫时,那些VM开头的JS文件到底是什么?从原理到实战绕过动态Debugger

逆向爬虫中VM脚本的奥秘&#xff1a;从动态代码注入到Debugger绕过实战 打开Chrome开发者工具时&#xff0c;你是否注意过那些以"VM"开头的神秘脚本文件&#xff1f;这些看似随机的数字编号背后&#xff0c;隐藏着现代JavaScript引擎的核心机制。对于从事逆向工程和…...

DoVer框架:多智能体系统调试的高效解决方案

1. 项目背景与核心价值 去年在构建一个基于大语言模型&#xff08;LLM&#xff09;的客服系统时&#xff0c;我遇到了一个典型问题&#xff1a;当多个AI智能体协同工作时&#xff0c;系统经常出现难以追踪的异常行为。某个对话流程突然中断&#xff0c;或是智能体之间传递了错误…...

Polyscope与Python集成:打造高效科学计算可视化工作流

Polyscope与Python集成&#xff1a;打造高效科学计算可视化工作流 【免费下载链接】polyscope A C & Python viewer for 3D data like meshes and point clouds 项目地址: https://gitcode.com/gh_mirrors/po/polyscope Polyscope是一款强大的C/Python 3D数据可视化…...

保姆级教程:用Monocle2和ggplot2搞定单细胞拟时分析的可视化(附代码)

单细胞拟时分析可视化实战&#xff1a;从Monocle2基础到ggplot2高级定制 在单细胞转录组研究中&#xff0c;拟时分析&#xff08;Pseudotime Analysis&#xff09;已经成为解析细胞动态变化过程的重要工具。不同于传统的静态细胞分类&#xff0c;拟时分析能够揭示细胞状态转变的…...