当前位置: 首页 > news >正文

flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

对应官网

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

测试数据

 * 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则

完整scala版本代码

package com.yy.state.operatorStateDemoimport org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.datastream.KeyedStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.util.Collectorimport java.time.ZoneId
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._/*** 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,4side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则*/
object BroadcastStateV1 {case class Item(color:Color,shape: Shape){def getShape()={shape}}case class Rule(name:String,first:Shape,second:Shape)case class Color(color:String)case class Shape(shape:String)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)// 指定国内时区tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))val itemStream = env.socketTextStream("localhost", 9999).map(_.split(",")).map(arr => Item(Color(arr(0)), Shape(arr(1))))val ruleStream = env.socketTextStream("localhost", 9998).broadcast().map(s => Rule(s.split(",")(0), Shape(s.split(",")(1)), Shape(s.split(",")(2))))val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule](){}));val ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor)val colorPartitionedStream: KeyedStream[Item, Color] = itemStream.keyBy(new KeySelector[Item, Color] {override def getKey(value: Item): Color = value.color})colorPartitionedStream.connect(ruleBroadcastStream).process(// type arguments in our KeyedBroadcastProcessFunction represent://   1. the key of the keyed stream//   2. the type of elements in the non-broadcast side//   3. the type of elements in the broadcast side//   4. the type of the result, here a stringnew KeyedBroadcastProcessFunction[Color, Item, Rule, String]() {val  mapStateDesc =new MapStateDescriptor("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo(classOf[Item]))val ruleStateDescriptor =new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule]() {}))override def processElement(value: Item, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {val state = getRuntimeContext().getMapState(mapStateDesc)val shape = value.getShape()// 遍历广播的 rulectx.getBroadcastState(ruleStateDescriptor).immutableEntries().asScala.foreach{entry =>val ruleName = entry.getKey()val rule = entry.getValue()val stored: ListBuffer[Item] = {if (state.contains(ruleName)) {state.get(ruleName).asScala.to[ListBuffer]} else {new ListBuffer[Item]()}}//if (shape == rule.second && stored.nonEmpty) {stored.foreach { i =>out.collect("MATCH: " + i + " - " + value);}stored.clear();}// there is no else{} to cover if rule.first == rule.secondif (shape.equals(rule.first)) {stored.append(value);}if (stored.isEmpty) {// 规则已经匹配输出 清理状态state.remove(ruleName)} else {// 没输出则更新状态state.put(ruleName, stored.asJava)}}}override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#Context, out: Collector[String]): Unit = {ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);}}).print("sink --> ")env.execute("flink-broadcast-state")}
}

相关文章:

flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

对应官网 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ 测试数据 * 广播流 官方案例 scala版本* 广播状态* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance…...

vueRouter中scrollBehavior实现滚动固定位置

使用前端路由,当切换到新路由时,想要页面滚到顶部,或者是保持原先的滚动位置,就像重新加载页面那样。 vue-router 能做到,而且更好,它让你可以自定义路由切换时页面如何滚动。 注意: 这个功能只在 HTML5 h…...

解决WinForms跨线程操作控件的问题

解决WinForms跨线程操作控件的问题 介绍 在构建Windows窗体应用程序时,我们通常会遇到需要从非UI线程更新UI元素的场景。由于WinForms控件并不是线程安全的,直接这样做会抛出一个异常:“控件’control name’是从其他线程创建的,…...

从零开始:Git 上传与使用指南

Git 是一种非常强大的版本控制系统,它可以帮助您在多人协作开发项目中更好地管理代码版本,并确保每个团队成员都能及时地获取最新的代码更改。在使用 Git 进行版本控制之前,您需要先进行一些设置,以确保您的代码能够顺利地与远程仓…...

Docker compose部署Golang服务

Docker Compose 部署 在使用docker部署时,除了使用--link的方式来关联容器之外,还可以使用 docker compose 运行多个容器。 本文以项目:https://github.com/johncxf/go-api 为例。 定义 Dockerfile 我这里用于区分默认 Dockerfile 文件&a…...

Day36 435无重叠区间 763划分字母区间

435 无重叠区间 给定一个区间的集合,找到需要移除区间的最小数量,使剩余区间互不重叠。 注意: 可以认为区间的终点总是大于它的起点。 区间 [1,2] 和 [2,3] 的边界相互“接触”,但没有相互重叠。 本题与上一题类似: 如果按照左…...

【Servlet】如何编写第一个Servlet程序

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Servlet】 本专栏旨在分享学习Servlet的一点学习心得,欢迎大家在评论区交流讨论💌 Servlet是Java编写的服务器端…...

读懂比特币—bitcoin代码分析(五)

今天的代码分析主要是 bitcoin/src/init.cpp 文件中的三个函数:AppInitSanityChecks、AppInitLockDataDirectory、AppInitInterfaces,下面我们来说明这三个函数是用来干什么的,并逐行解读函数代码,先贴出源代码如下: …...

uniapp使用uQRCode插件生成二维码的简单使用

最近在找移动端绘制二维码的问题 &#xff0c;直接上代码 下载 weapp-qrcode.js(可以通过npm install weapp-qrcode --save 下载,之后把它父子到untils目录下&#xff09; npm install weapp-qrcode --save在组件页面使用 <canvas id"couponQrcode" canvas-id&qu…...

【寒假每日一题·2024】AcWing 4965. 三国游戏(补)

文章目录 一、题目1、原题链接2、题目描述 二、解题报告1、思路分析2、时间复杂度3、代码详解 一、题目 1、原题链接 4965. 三国游戏 2、题目描述 二、解题报告 1、思路分析 思路参考y总&#xff1a;y总讲解视频 &#xff08;1&#xff09;题目中的获胜情况分为三种&#xff…...

docker 安装mongodb 数据库

1.拉取mongodb镜像 docker pull mongo2.创建文件夹 mkdir -p /home/mongo/conf/ mkdir -p /home/mongo/data/ mkdir -p /home/mongo/logs/3.新增mongod.conf文件 cd /home/mongo/conf && vi mongod.confmongod.conf文件内容&#xff1a; # 数据库文件存储位置 dbpa…...

整数反转算法(leetcode第7题)

题目描述&#xff1a; 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。如果反转后整数超过 32 位的有符号整数的范围 [−231, 231 − 1] &#xff0c;就返回 0。假设环境不允许存储 64 位整数&#xff08;有符号或无符号&#xff09;。示例 1…...

微信小程序(十)表单组件(入门)

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.type 属性指定表单类型 2.placeholder 属性指定输入框为空时的占位文字 源码&#xff1a; form.wxml <!-- 提前准备好的布局结构代码 --> <view class"register"><view class"…...

xcode 设置 ios苹果图标,为Flutter应用程序配置iOS图标

图标设置 1,根据图片构建各类尺寸的图标2.xcode打开ios文件3.xcode设置图标4.打包提交审核,即可(打包教程可通过我的主页查找) 1,根据图片构建各类尺寸的图标 工具网址:https://icon.wuruihong.com/ 下载之后文件目录如下 拷贝到项目的ios\Runner\Assets.xcassets\AppIcon.ap…...

什么是IDE?新手用哪个IDE比较好?

哈喽大家好&#xff0c;我是咕噜美乐蒂&#xff0c;很高兴又见面啦&#xff01;今天我们来了解一下什么是IDE以及新手应该如何选择IDE比较合适。 一、什么是IDE&#xff1f; IDE&#xff08;Integrated Development Environment&#xff0c;集成开发环境&#xff09;是一种软…...

【数据库学习】pg安装与运维

1&#xff0c;安装与配置 #安装 yum install https:....rpm1&#xff09;安装目录 bin目录&#xff1a;二进制可执行文件目录&#xff0c;此目录下有postgres、psql等可执行程序&#xff1b;pg_ctl工具在此目录&#xff0c;可以通过pg_ctl --help查看具体使用。 conf目录&…...

第二篇【传奇开心果短博文系列】Python的OpenCV库技术点案例示例:图像处理

传奇开心果短博文系列 系列短博文目录Python的OpenCV库技术点案例示例短博文系列 博文目录一、项目目标二、第一个示例代码三、第二个示例代码四、第三个示例代码五、第四个示例代码六、第五个示例代码七、知识点归纳总结 系列短博文目录 Python的OpenCV库技术点案例示例短博文…...

【vue oidc-client】invalid_requestRequest Id: 0HN0OOPFRLSF2:00000002

需求&#xff1a;完成统一登录&#xff0c;需要从三方平台跳到我们的平台。 oidc-client报错记录。这个一般是配置信息出错&#xff0c;需要和三方平台进行沟通&#xff0c;一定要把client_id&#xff0c;密钥进行对应&#xff1b; 同时关于此次出错还修改了以下代码&#xff…...

什么是中间人攻击? ssh 连接出现 Host key verification failed 解决方法

文章目录 前言known_hosts 文件是什么文件路径示例 连接出现 Host key verification failedssh-keygen -R [hostname or ip address]删除整个 known_hosts 文件 其它聊聊中间人攻击ssh 如何保证安全&#xff1f;加密流程漏洞在哪里如何避免中间人攻击 个人简介 前言 最近服务器…...

数据结构系统刷题

本文为系统刷leetcode的记录&#xff0c;会记录自己根据代码随想录刷过的leetcode&#xff0c;方便直接点开刷题&#xff0c;时常更新 时间复杂度简记为s 空间复杂度简记为k 数组 704 二分查找 一维二分查找 &#xff08;1&#xff09;[left, right] class Solution { publi…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程

STM32F1 本教程使用零知标准板&#xff08;STM32F103RBT6&#xff09;通过I2C驱动ICM20948九轴传感器&#xff0c;实现姿态解算&#xff0c;并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化&#xff0c;适合嵌入式及物联网开发者。在基础驱动上新增…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...

沙箱虚拟化技术虚拟机容器之间的关系详解

问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西&#xff0c;但是如果把三者放在一起&#xff0c;它们之间到底什么关系&#xff1f;又有什么联系呢&#xff1f;我不是很明白&#xff01;&#xff01;&#xff01; 就比如说&#xff1a; 沙箱&#…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用

前言&#xff1a;我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM&#xff08;Java Virtual Machine&#xff09;让"一次编写&#xff0c;到处运行"成为可能。这个软件层面的虚拟化让我着迷&#xff0c;但直到后来接触VMware和Doc…...

Axure 下拉框联动

实现选省、选完省之后选对应省份下的市区...

JS红宝书笔记 - 3.3 变量

要定义变量&#xff0c;可以使用var操作符&#xff0c;后跟变量名 ES实现变量初始化&#xff0c;因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符&#xff0c;可以创建一个全局变量 如果需要定义…...