【源码分析】zeebe actor模型源码解读
zeebe actor 模型🙋♂️
如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。
环境⛅
zeebe release-8.1.14
actor.run() 是怎么开始的🌈

LongPollingActivateJobsHandler.java
以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。
private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}
ActorControl
可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。
final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}
job 是怎么被执行的⚡
并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。
Gateway.java
gateway 注册task
private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}
ActorThreadGroup.java
就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup
protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}
ActorThread.java
ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task
// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}
ActorTask.java
ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!
找到job 后开始执行当前job
public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}
ActorJob.java
ActorJob 的执行逻辑
还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束
void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}
总结📝
本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~
zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅
相关文章:
【源码分析】zeebe actor模型源码解读
zeebe actor 模型🙋♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的🌈 Lon…...
python3实现类似expect shell的交互式与SFTP的脚本
前面写过一篇关于python实现类似expect shell的交互式能力的文章,现在补全一下加上sftp的能力脚本。 例子在代码中__example()方法。 依赖paramiko库,所以需要执行pip install paramiko来安装。 import os import queue import re import threading im…...
java游戏制作-飞翔的鸟游戏
一.准备工作 首先创建一个新的Java项目命名为“飞翔的鸟”,并在src中创建一个包命名为“com.qiku.bird",在这个包内分别创建4个类命名为“Bird”、“BirdGame”、“Column”、“Ground”,并向需要的图片素材导入到包内。 二.代码呈现 pa…...
NodeMCU ESP8266构建Web Server网页端控制设备
NodeMCU ESP8266构建Web Server网页端控制设备 前言 NodeMCU ESP8266 内部集成了TCP/IP协议栈,可以快速构建网络功能,搭建联网应用的硬件平台; ESP8266可以作为WiFi接入点(Station),这样可以方便连接互联…...
搭建区块链
参考B站FISCO BCOS(十八) java SDK与区块链交互_哔哩哔哩_bilibili 林中有神君 一、搭建第一个区块链网络 根据官方文档搭建:搭建第一个区块链网络 — FISCO BCOS v2.9.0 文档 (fisco-bcos-documentation.readthedocs.io) 使用javajdk 控制台2.6之后 本处是2.9.2…...
Python通过selenium调用IE11浏览器报错解决方法
前提 正常安装Python 工具,selenium 包可以正常导入。IE浏览器驱动 IEDriverServer.exe 已经正确放置到已经添加path目录的文件下。 报错现象: 解决方法 打开浏览器进入 internet 选项 切换到安全页签 ,去除“应用保护模式” 再次调用验证…...
Ubuntu 1.84.2Visual Studio Code 下载配置与vscode查看内存Hex Editor插件,简单易懂
目录 前言 一 首先我为啥要重装Vs Code呢? 二 下载1.84.2Visual Studio Code 三 配置Vscode终端字体 四 安装插件 前言 这是一篇将老版本的VsCode下载至最新版的博文,从下载到调试全篇 一 首先我为啥要重装Vs Code呢? 因为我想安装这个…...
opencv-图像金字塔
图像金字塔是一种图像处理技术,它通过不断降低图像的分辨率,形成一系列图像。金字塔分为两种类型:高斯金字塔和拉普拉斯金字塔。 高斯金字塔(Gaussian Pyramid): 高斯金字塔是通过使用高斯滤波和降采样&a…...
字符串匹配算法——KMP
有文本串aabaabaaf,模式串aabaaf问文本串中是否出现过模式串 暴力解法 最不用动脑子的,直接两层for循环,逐个匹配,匹配到不相等的值时把文本串后移一位,再重新比较。这种方法的复杂度是O(mn),该方法低效的…...
电子学会C/C++编程等级考试2023年03月(一级)真题解析
C/C++等级考试(1~8级)全部真题・点这里 第1题:字符长方形 给定一个字符,用它构造一个长为4个字符,宽为3个字符的长方形,可以参考样例输出。 时间限制:1000 内存限制:65536输入 输入只有一行, 包含一个字符。输出 该字符构成的长方形,长4个字符,宽3个字符。样例输入…...
微信小程序汽车租赁系统
微信小程序汽车租赁系统 本系统包含了3类用户,分别为客户、员工以及管理员,客户主要是满足自身的租车需求,员工主要负责车辆的调度问题和维修状况,管理员则是主要对人员、车辆和订单的管理。以下是对各自功能的详细介绍: 客户可…...
docker部署微服务
目录 docker操作命令 镜像操作命令 拉取镜像 导出镜像 删除镜像 加载镜像 推送镜像 部署 pom文件加上 在每个模块根目录加上DockerFile文件 项目根目录加上docker-compose.yml文件 打包,clean,package 服务器上新建文件夹 测试docker-compo…...
统计voc格式数据中的xml标签、bndbox到excel表格中
有这么个需求是将xml的内容: 1,filename 2.label 3.bndbox:xmin,xmax,ymin,ymax。 … 将这些东西写入excel表格中,方便我统计标签数量和框的分布! 于是撰写了脚本:xml2csv.py 我的xml文件形式如下。大家的目标检测格式大同小异! <annotation><folder>UAV_d…...
51单片机利用I/O口高阻状态实现触摸控制LED灯
51单片机利用I/O口高阻状态实现触摸控制LED灯 1.概述 这篇文章介绍使用I/O口的高阻状态实现一个触摸控制LED灯亮灭的实验。该实验通过手触摸P3.7引脚,改变电平信号控制灯的亮灭。 2.实验过程 2.1.实验材料 名称型号数量单片机STC12C20521LED彩灯无1晶振12MHZ1电…...
自动驾驶术语汇总
目录 智驾级别芯片相关自动驾驶相关辅助驾驶相关预警相关传感器相关泊车相关安全相关车灯相关 智驾级别 L0-L2属于辅助驾驶,L4-L5才算自动驾驶 L0(Level 0):无自动化。这是大多数传统汽车的级别,所有的驾驶任务都需要…...
Jsonpath - 数据中快速查找和提取的强大工具
JSON(JavaScript Object Notation)在现代应用程序中广泛使用,但是如何在复杂的JSON数据中 查找和提取所需的信息呢? JSONPath是一种功能强大的查询语言,可以通过简单的表达式来快速准确地定位和提取JSON数据。本文将介…...
java中,通过替换word模板中的关键字后输出一个新文档
一、要用到的jar包 我已上传了相关的jar包,需要的可以通过以下链接直接下载: https://download.csdn.net/download/qq_27387133/88558034 具体jar包截图: 二、实现的代码 注意:文件要用docx格式!!! word变量替换的方法&#…...
MySQL数据库约束你真的懂吗?
✏️✏️✏️今天给各位带来的是关于数据库约束方面的知识 清风的CSDN博客 😛😛😛希望我的文章能对你有所帮助,有不足的地方还请各位看官多多指教,大家一起学习交流! 动动你们发财的小手,点点关…...
YOCTO 下载repo工具失败解决办法
curl https://mirrors.tuna.tsinghua.edu.cn/git/git-repo -o repocp repo ~/binchmod ax ~/bin/repo如果使用时报错, 切换ubuntu 到 python3 版本。gedit repo 修改repo默认链接地址:REPO_URL "https://gerrit.googlesource.com/git-repo"…...
github连接失败Host key verification failed.解决方案
问题描述 之前一直用的gitee协同协作,然后再最近一次云计算项目中团队使用的是github进行协作,但是按照常规步骤再GitHub上配置了ssh密钥后,却依然显示连接失败,无法推送和拉取代码,克隆仓库也是报错拒绝。具体报错信…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
