flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景:
广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用
1.首先看主流程,主流程中使用了两个Broadcast广播的状态,这两个Broadcast广播的状态是独立的
// 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的,也就是需要分别维度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");
2.BroadcastProcessFunction的处理,这里面会维护这个算子本身的广播状态,并把所有的事件扩散发送到下一个算子
public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录,流转到下一个算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}
3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态,此外还有键值分区状态,特别注意的是在处理广播元素时,可以用applyToKeyedState方法对所有的键值分区状态应用某个方法,对于ontimer方法,依然可以访问键值分区状态和广播状态
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 键值分区状态和广播状态联合处理,在这个方法中可以更新键值分区状态,然后广播状态只能读取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态,新增/删除或者整个清空,值得注意的是,处理广播元素时可以对所有的键值分区状态应用某个函数,比如这里当收到某个属于控制消息的广播消息时,使用applyToKeyedState方法把所有的键值分区状态都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态,读取广播状态,此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考虑并发访问的问题@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}
ps: ontimer方法和processElement方法是同步访问的,没有并发的问题,所以不需要考虑同时更新键值分区状态的线程安全问题
参考文献:
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
相关文章:

flink job同时使用BroadcastProcessFunction和KeyedBroadcastProcessFunction例子
背景: 广播状态可以用于规则表或者配置表的实时更新,本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…...

数据中心系统解决方案
设计思路 系统设计过程中充分考虑各个子系统的信息共享要求,对各子系统进行结构化和标准化设计,通过系统间的各种联动方式将其整合成一个有机的整体,使之成为一套整体的、全方位的数据中心大楼综合管理系统,达到人防、物防和技防…...

服务器开设新账户,创建账号并设置密码
实验室又进新同学了,服务器开设新账号搞起来 1、创建用户: 在root权限下,输入命令useradd -m 用户名,如下 sudo useradd -m yonghuming 2、设置密码: 输入命令passwd 用户名 回车,接着输入密码操作&…...

【C++】关于构造函数后面冒号“:“的故事------初始化列表(超详细解析,小白一看就懂)
目录 一、前言 二、 初始化的概念区分 三、初始化列表 (重点) 💦初始化列表的概念理解 💦初始化列表的注意事项 四、共勉 一、前言 在之前的博客学习中,我们已经学习了【C】的六大默认成员函数 ,想必大…...

【Shell 系列教程】shell基本运算符(四)
文章目录 往期回顾关系运算符布尔运算符逻辑运算符字符串运算符文件测试运算符其他检查符: 往期回顾 【Shell 系列教程】shell介绍(一)【Shell 系列教程】shell变量(二)【Shell 系列教程】shell数组(三&am…...

MongoDB安装及开发系例全教程
一、系列文章目录 一、MongoDB安装教程—官方原版 二、MongoDB 使用教程(配置、管理、监控)_linux mongodb 监控 三、MongoDB 基于角色的访问控制 四、MongoDB用户管理 五、MongoDB基础知识详解 六、MongoDB—Indexs 七、MongoDB事务详解 八、MongoDB分片教程 九、Mo…...

ffmpeg命令帮助文档
一:帮助文档的命令格式 ffmpeg -h帮助的基本信息ffmpeg -h long帮助的高级信息ffmpeg -h full帮助的全部信息 ffmpeg的命令使用方式:ffmpeg [options] [[infile options] -i infile] [[outfile options] outfile] 二:将帮助文档输出到文件 …...

回归预测 | Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测
Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测 目录 Matlab实现SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量机的多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.SO-CNN-SVM蛇群算法优化卷积神经网络-支持向量…...

【原创】java+swing+mysql校园共享单车管理系统设计与实现
摘要: 校园共享单车作为一种绿色、便捷的出行方式,在校园内得到了广泛的应用。然而,随着单车数量的增加,管理难度也不断加大。如何提高单车的利用率和管理效率,成为校园共享单车发展面临的重要问题。本文针对这一问题…...

(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
(自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载 带后台系统PbootCMS内核开发的网站模板,该模板适用于新闻博客网站、自媒体运营网站等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可&#…...

SystemC入门完整编写示例:全加器测试平台
导读: 本文将完整演示基于systemC编写一个全加器的测试平台。具体内容包括:激励平台,监控平台,待测单元的编写,波形文件读取。 1,main函数模块 搭建一个测试平台主要由:Driver, Monitor, DUT(design under …...

动手学深度学习:2.线性回归pytorch实现
动手学深度学习:2.线性回归pytorch实现 1.手动构造数据集2.小批量读取数据集3.定义模型和损失函数4.初始化模型参数5.小批量随机梯度下降优化算法6.训练完整代码Q&A 1.手动构造数据集 import torch from torch.utils import data from d2l import torch as d2l…...

重要的linux指令
系统管理命令 切换用户 su 用户名管理员身份运行 sudo 命令实时显示进程信息(linux下任务管理器) top查看进程信息(ps) ps -efps -ef | grep 进程名 ps -aux | grep 进程名参数说明e 显示所有进程f 全格式a 显示所有程序u 以用户为主的格式来显示程序状况x 显示无控制终端…...

delphi7安装并使用皮肤控件
1、下载控件 我已经上传到云盘,存储位置 2、下载后并解压。 3、打开dephi7,File-Open,打开路径D:\LC\Desktop\vclskin2_XiaZaiBa\d7, 然后将 D:\LC\Desktop\vclskin2_XiaZaiBa\d7文件夹中所有后缀.dcu的文件复制粘贴到delphi安装路…...

安徽省黄山景区免9天门票为哪般?
今日浑浑噩噩地睡了大半天,强撑起身子写网文......可是,题材不好选,本“人民体验官”只得推广人民日报官方微博文化产品《这两个月“黄山每周三免门票”》。 图:来源“人民体验官”推广平台 因年事渐高,又有未愈的呼吸…...

MFC 窗体插入图片
1.制作BMP图像1.bmp 放到res文件夹下,资源视图界面导入res文件夹下的1.bmp 2.添加控件 控件类型修改为Bitmap 图像,选择IDB_BITMAP1 3.效果...

关于中间件技术
中间件是一种独立的系统软件或服务程序,可以帮助分布式应用软件在不同的技术之间共享资源。中间件可以: 1、负责客户机与服务器之间的连接和通信,以及客户机与应用层之间的高效率通信机制。 2、提供应用的负载均衡和高可用性、安全机制与管…...

机器学习中的嵌入:释放表征的威力
简介 机器学习通过使计算机能够从数据学习和做出预测来彻底改变了人工智能领域。机器学习的一个关键方面是数据的表示,因为表示形式的选择极大地影响了算法的性能和有效性。嵌入已成为机器学习中的一种强大技术,提供了一种捕获和编码数据点之间复杂关系的…...

【Midjourney入门教程3】写好prompt常用的参数
文章目录 1、图片描述词(图片链接)文字描述词后缀参数2、权重划分3、后缀参数版本选择:--v版本风格:--style长宽比:--ar多样性: --c二次元化:--niji排除内容:--no--stylize--seed--tile、--q 4、…...

01-单节点部署clickhouse及简单使用
1、下载rpm安装包: 官网:https://packages.clickhouse.com/rpm/stable/ clickhouse19.4版本之后只需下载3个rpm安装包,上传到节点目录即可 2、rpm包安装: 安装顺序为conmon->server->client 执行 rpm -ivh ./clickhouse-…...

项目实战:展示第一页数据
1、在FruitDao接口中添加查询第一页数据和查询总记录条数 package com.csdn.fruit.dao; import com.csdn.fruit.pojo.Fruit; import java.util.List; //dao :Data Access Object 数据访问对象 //接口设计 public interface FruitDao {void addFruit(Fruit fruit);vo…...

c#中使用METest单元测试
METest是一个用于测试C#代码的单元测试框架。单元测试是一种软件测试方法,用于验证代码的各个单元(函数、方法、类等)是否按照预期工作。METest提供了一种简单而强大的方式来编写和运行单元测试。 TestMethod:这是一个特性&#…...

七月论文审稿GPT第二版:从Meta Nougat、GPT4审稿到Mistral、LLaMA LongLora
前言 如此前这篇文章《学术论文GPT的源码解读与微调:从chatpaper、gpt_academic到七月论文审稿GPT》中的第三部分所述,对于论文的摘要/总结、对话、翻译、语法检查而言,市面上的学术论文GPT的效果虽暂未有多好,可至少还过得去&am…...

社群团购对接合作,你有研究过社群团购平台的选品吗?
社群团购对接合作,你有研究过社群团购平台的选品吗? 社群团购选品是非常重要的一项工作,一个好的社群团购平台选品逻辑包含了:用户定位,时节性,产品性价比,售后率。用户定位在选品过程中非常重要…...

VSCode 如何设置背景图片
VSCode 设置背景图片 1.打开应用商店,搜索 background ,选择第一个,点击安装。 2. 安装完成后点击设置,点击扩展设置。 3.点击在 settings.json 中编辑。 4.将原代码注释后,加入以下代码。 // { // "workben…...

【数据结构】单向链表的增删查改以及指定pos位置的插入删除
目录 单向链表的概念及结构 尾插 头插 尾删 编辑 头删 查找 在pos位置前插 在pos位置后插 删除pos位置 删除pos的后一个位置 总结 代码 单向链表的概念及结构 概念:链表是一种 物理存储结构上非连续 、非顺序的存储结构,数据元素的 逻辑顺序 是…...

PageRank算法c++实现
首先用邻接矩阵A表示从页面j到页面i的概率,然后根据公式生成转移概率矩阵 M(1-d)*Qd*A 常量矩阵Q(qi,j),qi,j1/n 给定点击概率d,等级值初始向量R0,迭代终止条件e; 计算Ri1M*R…...

超低价:阿里云双11服务器优惠价格表_87元一年起
2023阿里云双十一优惠活动已经开启了,轻量2核2G服务器3M带宽优惠价87元一年、2核4G4M带宽优惠价165元一年,云服务器ECS经济型e实例2核2G3M固定带宽优惠价格99元一年,还有2核4G、2核8G、4核8G、4核16G、8核32G等配置报价,云服务器e…...

docker的安装Centos8
在CentOS 7中,可以使用yum安装Docker。Docker官方提供了一个yum源,可以用于安装Docker。以下是安装Docker的步骤: 卸载旧版本的Docker(如果有) 如果你之前安装过Docker,需要先卸载旧版本的Docker。执行以…...

Android.mk文件制定了链接库,但是出现ld Error
问题描述 Android.mk文件中,指定了库: LOCAL_LDLIBS : -lmylib LOCAL_LDFLAGS -L$(MYLIB_DIR)/lib出现ld: error: undefined symbol: my_function,于是查看so里面是否有my_function函数: nm -D libmylib.so | grep my_functio…...