flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景:
广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
1.首先看主流程,主流程中使用了两个Broadcast广播的状态,这两个Broadcast广播的状态是独立的
// 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的,也就是需要分别维度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");
2.BroadcastProcessFunction的处理,这里面会维护这个算子本身的广播状态,并把所有的事件扩散发送到下一个算子
public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录,流转到下一个算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}
3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态,此外还有键值分区状态,特别注意的是在处理广播元素时,可以用applyToKeyedState方法对所有的键值分区状态应用某个方法,对于ontimer方法,依然可以访问键值分区状态和广播状态
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 键值分区状态和广播状态联合处理,在这个方法中可以更新键值分区状态,然后广播状态只能读取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态,新增/删除或者整个清空,值得注意的是,处理广播元素时可以对所有的键值分区状态应用某个函数,比如这里当收到某个属于控制消息的广播消息时,使用applyToKeyedState方法把所有的键值分区状态都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态,读取广播状态,此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考虑并发访问的问题@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}
ps: ontimer方法和processElement方法是同步访问的,没有并发的问题,所以不需要考虑同时更新键值分区状态的线程安全问题
参考文献:
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
相关文章:
flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景: 广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…...
数据中心系统解决方案
设计思路 系统设计过程中充分考虑各个子系统的信息共享要求,对各子系统进行结构化和标准化设计,通过系统间的各种联动方式将其整合成一个有机的整体,使之成为一套整体的、全方位的数据中心大楼综合管理系统,达到人防、物防和技防…...
服务器开设新账户,创建账号并设置密码
实验室又进新同学了,服务器开设新账号搞起来 1、创建用户: 在root权限下,输入命令useradd -m 用户名,如下 sudo useradd -m yonghuming 2、设置密码: 输入命令passwd 用户名 回车,接着输入密码操作&…...
【C++】关于构造函数后面冒号“:“的故事------初始化列表(超详细解析,小白一看就懂)
目录 一、前言 二、 初始化的概念区分 三、初始化列表 (重点) 💦初始化列表的概念理解 💦初始化列表的注意事项 四、共勉 一、前言 在之前的博客学习中,我们已经学习了【C】的六大默认成员函数 ,想必大…...
【Shell 系列教程】shell基本运算符(四)
文章目录 往期回顾关系运算符布尔运算符逻辑运算符字符串运算符文件测试运算符其他检查符: 往期回顾 【Shell 系列教程】shell介绍(一)【Shell 系列教程】shell变量(二)【Shell 系列教程】shell数组(三&am…...
MongoDB安装及开发系例全教程
一、系列文章目录 一、MongoDB安装教程—官方原版 二、MongoDB 使用教程(配置、管理、监控)_linux mongodb 监控 三、MongoDB 基于角色的访问控制 四、MongoDB用户管理 五、MongoDB基础知识详解 六、MongoDB—Indexs 七、MongoDB事务详解 八、MongoDB分片教程 九、Mo…...
ffmpeg命令帮助文档
一:帮助文档的命令格式 ffmpeg -h帮助的基本信息ffmpeg -h long帮助的高级信息ffmpeg -h full帮助的全部信息 ffmpeg的命令使用方式:ffmpeg [options] [[infile options] -i infile] [[outfile options] outfile] 二:将帮助文档输出到文件 …...
回归预测 | Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测
Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测 目录 Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量…...
【原创】java+swing+mysql校园共享单车管理系统设计与实现
摘要: 校园共享单车作为一种绿色、便捷的出行方式,在校园内得到了广泛的应用。然而,随着单车数量的增加,管理难度也不断加大。如何提高单车的利用率和管理效率,成为校园共享单车发展面临的重要问题。本文针对这一问题…...
(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载 带后台系统PbootCMS内核开发的网站模板,该模板适用于新闻博客网站、自媒体运营网站等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可&#…...
SystemC入门完整编写示例:全加器测试平台
导读: 本文将完整演示基于systemC编写一个全加器的测试平台。具体内容包括:激励平台,监控平台,待测单元的编写,波形文件读取。 1,main函数模块 搭建一个测试平台主要由:Driver, Monitor, DUT(design under …...
动手学深度学习:2.线性回归pytorch实现
动手学深度学习:2.线性回归pytorch实现 1.手动构造数据集2.小批量读取数据集3.定义模型和损失函数4.初始化模型参数5.小批量随机梯度下降优化算法6.训练完整代码Q&A 1.手动构造数据集 import torch from torch.utils import data from d2l import torch as d2l…...
重要的linux指令
系统管理命令 切换用户 su 用户名管理员身份运行 sudo 命令实时显示进程信息(linux下任务管理器) top查看进程信息(ps) ps -efps -ef | grep 进程名 ps -aux | grep 进程名参数说明e 显示所有进程f 全格式a 显示所有程序u 以用户为主的格式来显示程序状况x 显示无控制终端…...
delphi7安装并使用皮肤控件
1、下载控件 我已经上传到云盘,存储位置 2、下载后并解压。 3、打开dephi7,File-Open,打开路径D:\LC\Desktop\vclskin2_XiaZaiBa\d7, 然后将 D:\LC\Desktop\vclskin2_XiaZaiBa\d7文件夹中所有后缀.dcu的文件复制粘贴到delphi安装路…...
安徽省黄山景区免9天门票为哪般?
今日浑浑噩噩地睡了大半天,强撑起身子写网文......可是,题材不好选,本“人民体验官”只得推广人民日报官方微博文化产品《这两个月“黄山每周三免门票”》。 图:来源“人民体验官”推广平台 因年事渐高,又有未愈的呼吸…...
MFC 窗体插入图片
1.制作BMP图像1.bmp 放到res文件夹下,资源视图界面导入res文件夹下的1.bmp 2.添加控件 控件类型修改为Bitmap 图像,选择IDB_BITMAP1 3.效果...
关于中间件技术
中间件是一种独立的系统软件或服务程序,可以帮助分布式应用软件在不同的技术之间共享资源。中间件可以: 1、负责客户机与服务器之间的连接和通信,以及客户机与应用层之间的高效率通信机制。 2、提供应用的负载均衡和高可用性、安全机制与管…...
机器学习中的嵌入:释放表征的威力
简介 机器学习通过使计算机能够从数据学习和做出预测来彻底改变了人工智能领域。机器学习的一个关键方面是数据的表示,因为表示形式的选择极大地影响了算法的性能和有效性。嵌入已成为机器学习中的一种强大技术,提供了一种捕获和编码数据点之间复杂关系的…...
【Midjourney入门教程3】写好prompt常用的参数
文章目录 1、图片描述词(图片链接)文字描述词后缀参数2、权重划分3、后缀参数版本选择:--v版本风格:--style长宽比:--ar多样性: --c二次元化:--niji排除内容:--no--stylize--seed--tile、--q 4、…...
01-单节点部署clickhouse及简单使用
1、下载rpm安装包: 官网:https://packages.clickhouse.com/rpm/stable/ clickhouse19.4版本之后只需下载3个rpm安装包,上传到节点目录即可 2、rpm包安装: 安装顺序为conmon->server->client 执行 rpm -ivh ./clickhouse-…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
