flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景:
广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
1.首先看主流程,主流程中使用了两个Broadcast广播的状态,这两个Broadcast广播的状态是独立的
// 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的,也就是需要分别维度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");
2.BroadcastProcessFunction的处理,这里面会维护这个算子本身的广播状态,并把所有的事件扩散发送到下一个算子
public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录,流转到下一个算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}
3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态,此外还有键值分区状态,特别注意的是在处理广播元素时,可以用applyToKeyedState方法对所有的键值分区状态应用某个方法,对于ontimer方法,依然可以访问键值分区状态和广播状态
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 键值分区状态和广播状态联合处理,在这个方法中可以更新键值分区状态,然后广播状态只能读取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态,新增/删除或者整个清空,值得注意的是,处理广播元素时可以对所有的键值分区状态应用某个函数,比如这里当收到某个属于控制消息的广播消息时,使用applyToKeyedState方法把所有的键值分区状态都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态,读取广播状态,此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考虑并发访问的问题@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}
ps: ontimer方法和processElement方法是同步访问的,没有并发的问题,所以不需要考虑同时更新键值分区状态的线程安全问题
参考文献:
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
相关文章:
flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景: 广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…...
数据中心系统解决方案
设计思路 系统设计过程中充分考虑各个子系统的信息共享要求,对各子系统进行结构化和标准化设计,通过系统间的各种联动方式将其整合成一个有机的整体,使之成为一套整体的、全方位的数据中心大楼综合管理系统,达到人防、物防和技防…...
服务器开设新账户,创建账号并设置密码
实验室又进新同学了,服务器开设新账号搞起来 1、创建用户: 在root权限下,输入命令useradd -m 用户名,如下 sudo useradd -m yonghuming 2、设置密码: 输入命令passwd 用户名 回车,接着输入密码操作&…...
【C++】关于构造函数后面冒号“:“的故事------初始化列表(超详细解析,小白一看就懂)
目录 一、前言 二、 初始化的概念区分 三、初始化列表 (重点) 💦初始化列表的概念理解 💦初始化列表的注意事项 四、共勉 一、前言 在之前的博客学习中,我们已经学习了【C】的六大默认成员函数 ,想必大…...
【Shell 系列教程】shell基本运算符(四)
文章目录 往期回顾关系运算符布尔运算符逻辑运算符字符串运算符文件测试运算符其他检查符: 往期回顾 【Shell 系列教程】shell介绍(一)【Shell 系列教程】shell变量(二)【Shell 系列教程】shell数组(三&am…...
MongoDB安装及开发系例全教程
一、系列文章目录 一、MongoDB安装教程—官方原版 二、MongoDB 使用教程(配置、管理、监控)_linux mongodb 监控 三、MongoDB 基于角色的访问控制 四、MongoDB用户管理 五、MongoDB基础知识详解 六、MongoDB—Indexs 七、MongoDB事务详解 八、MongoDB分片教程 九、Mo…...
ffmpeg命令帮助文档
一:帮助文档的命令格式 ffmpeg -h帮助的基本信息ffmpeg -h long帮助的高级信息ffmpeg -h full帮助的全部信息 ffmpeg的命令使用方式:ffmpeg [options] [[infile options] -i infile] [[outfile options] outfile] 二:将帮助文档输出到文件 …...
回归预测 | Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测
Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测 目录 Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量…...
【原创】java+swing+mysql校园共享单车管理系统设计与实现
摘要: 校园共享单车作为一种绿色、便捷的出行方式,在校园内得到了广泛的应用。然而,随着单车数量的增加,管理难度也不断加大。如何提高单车的利用率和管理效率,成为校园共享单车发展面临的重要问题。本文针对这一问题…...
(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载 带后台系统PbootCMS内核开发的网站模板,该模板适用于新闻博客网站、自媒体运营网站等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可&#…...
SystemC入门完整编写示例:全加器测试平台
导读: 本文将完整演示基于systemC编写一个全加器的测试平台。具体内容包括:激励平台,监控平台,待测单元的编写,波形文件读取。 1,main函数模块 搭建一个测试平台主要由:Driver, Monitor, DUT(design under …...
动手学深度学习:2.线性回归pytorch实现
动手学深度学习:2.线性回归pytorch实现 1.手动构造数据集2.小批量读取数据集3.定义模型和损失函数4.初始化模型参数5.小批量随机梯度下降优化算法6.训练完整代码Q&A 1.手动构造数据集 import torch from torch.utils import data from d2l import torch as d2l…...
重要的linux指令
系统管理命令 切换用户 su 用户名管理员身份运行 sudo 命令实时显示进程信息(linux下任务管理器) top查看进程信息(ps) ps -efps -ef | grep 进程名 ps -aux | grep 进程名参数说明e 显示所有进程f 全格式a 显示所有程序u 以用户为主的格式来显示程序状况x 显示无控制终端…...
delphi7安装并使用皮肤控件
1、下载控件 我已经上传到云盘,存储位置 2、下载后并解压。 3、打开dephi7,File-Open,打开路径D:\LC\Desktop\vclskin2_XiaZaiBa\d7, 然后将 D:\LC\Desktop\vclskin2_XiaZaiBa\d7文件夹中所有后缀.dcu的文件复制粘贴到delphi安装路…...
安徽省黄山景区免9天门票为哪般?
今日浑浑噩噩地睡了大半天,强撑起身子写网文......可是,题材不好选,本“人民体验官”只得推广人民日报官方微博文化产品《这两个月“黄山每周三免门票”》。 图:来源“人民体验官”推广平台 因年事渐高,又有未愈的呼吸…...
MFC 窗体插入图片
1.制作BMP图像1.bmp 放到res文件夹下,资源视图界面导入res文件夹下的1.bmp 2.添加控件 控件类型修改为Bitmap 图像,选择IDB_BITMAP1 3.效果...
关于中间件技术
中间件是一种独立的系统软件或服务程序,可以帮助分布式应用软件在不同的技术之间共享资源。中间件可以: 1、负责客户机与服务器之间的连接和通信,以及客户机与应用层之间的高效率通信机制。 2、提供应用的负载均衡和高可用性、安全机制与管…...
机器学习中的嵌入:释放表征的威力
简介 机器学习通过使计算机能够从数据学习和做出预测来彻底改变了人工智能领域。机器学习的一个关键方面是数据的表示,因为表示形式的选择极大地影响了算法的性能和有效性。嵌入已成为机器学习中的一种强大技术,提供了一种捕获和编码数据点之间复杂关系的…...
【Midjourney入门教程3】写好prompt常用的参数
文章目录 1、图片描述词(图片链接)文字描述词后缀参数2、权重划分3、后缀参数版本选择:--v版本风格:--style长宽比:--ar多样性: --c二次元化:--niji排除内容:--no--stylize--seed--tile、--q 4、…...
01-单节点部署clickhouse及简单使用
1、下载rpm安装包: 官网:https://packages.clickhouse.com/rpm/stable/ clickhouse19.4版本之后只需下载3个rpm安装包,上传到节点目录即可 2、rpm包安装: 安装顺序为conmon->server->client 执行 rpm -ivh ./clickhouse-…...
FreeRTOS和RT-Thread的内存管理实战:如何正确使用pvPortMalloc与rt_malloc替代C库malloc
FreeRTOS与RT-Thread内存管理实战:从标准库陷阱到RTOS最佳实践 在嵌入式实时操作系统开发中,动态内存分配就像高空走钢丝——一步失误可能导致系统崩溃。传统C库的malloc/free在RTOS环境中如同穿着拖鞋走钢丝,而pvPortMalloc和rt_malloc则是专…...
别再手撸流程图了!用Vue-super-flow + Element UI 10分钟搞定审批流原型
用Vue-super-flow Element UI快速构建企业级审批流原型 在企业内部管理系统中,审批流程是最常见的功能需求之一。传统的手工绘制流程图方式不仅效率低下,而且难以与业务系统无缝集成。现在,借助Vue-super-flow这一强大的Vue流程图组件&#…...
怀旧服WLK:10人NAXX教官拉苏维奥斯保姆级攻略,暗牧控制与学员轮换时间轴详解
怀旧服WLK:10人NAXX教官拉苏维奥斯保姆级攻略,暗牧控制与学员轮换时间轴详解 在《魔兽世界》怀旧服巫妖王之怒版本中,纳克萨玛斯军事区的教官拉苏维奥斯堪称团队配合的"试金石"。这个看似机制简单的BOSS,却因学员控制与…...
理想汽车AI组织架构重组
把公司拆成心脏、大脑和手脚——理想汽车这波AI组织架构重组到底在赌什么? 导读:李想用一场2小时的全员会,把一家年营收千亿的公司按人体器官逻辑重新组装。这不是比喻,这是组织结构图上的真实节点。从造车到"造人"&…...
ARM架构自托管调试与追踪技术详解
1. ARM架构自托管调试与追踪技术概述在嵌入式系统开发领域,调试技术始终是开发者面临的核心挑战之一。传统JTAG调试方式虽然功能强大,但在生产环境或安全敏感场景中存在明显局限。ARM架构提供的自托管调试(Self-hosted Debug)和追踪(Trace)机制ÿ…...
SAP ABAP OData 接口开发核心知识点梳理(含详图)
在SAP S/4HANA项目开发与前后端对接场景中,OData接口几乎是目前企业最主流、最核心的数据交互方案。无论是SAP Fiori前端页面开发、第三方系统对接、移动端集成,还是外部系统读写SAP业务数据,基本都依赖OData服务实现标准化、轻量化的数据通信…...
github拆分小批量上传文件
Windows端1.把项目重置干净Remove-Item -Recurse -Force tool/.git2.打开文件夹3.把里面所有东西 全部剪切移到桌面只留 1 个小小的文件 就行4.回到终端,依次运行git initPS D:\soft\github\tool> git init Initialized empty Git repository in D:/soft/github/…...
ToDesk、向日葵、UU远程横评:谁才是2026国产远控首
ToDesk、向日葵、UU远程横评:谁才是2026国产远控首选一、前言:国产远控崛起,2026 怎么选?远程控制早已从 “小众工具” 变成个人、办公、游戏、运维的刚需。2026 年国产远控阵营已全面崛起,ToDesk、向日葵、UU 远程成为…...
【YOLO26实战全攻略】21——YOLO26工业质检实战:PCB缺陷检测+划痕分割全流程落地指南
摘要:工业质检中,PCB板微小缺陷漏检、缺陷无法量化、小样本过拟合等问题长期困扰产线效率。本文基于YOLO26的STAL小目标感知机制与实例分割能力,打造从缺陷检测到量化分析的全流程解决方案。涵盖PCB六类缺陷数据集构建、YOLO26模型训练优化、实例分割掩码提取、缺陷尺寸精准…...
Steel:专为AI智能体设计的浏览器自动化API与部署实战
1. 项目概述:为AI应用赋能的浏览器自动化引擎 如果你正在构建一个需要与真实网页交互的AI智能体,或者开发一个复杂的浏览器自动化工具,那么你大概率会遇到一个共同的难题:如何稳定、高效地管理浏览器实例?从处理无头Ch…...
