flink1.17 eventWindow不要配置processTrigger
理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.
flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃
bug复现操作
idea运行代码后 往source kafka发送一条数据
a,1,1690304400000
可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用
问题复现代码
package com.yy.flinkWindowAndTriggerimport com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Secondsobject flinkEventWindowAndProcessTriggerBUGLearn {def main(args: Array[String]): Unit = {// flink 启动本地webuival conf = new Configurationconf.setInteger(RestOptions.PORT, 28080)// val env = StreamExecutionEnvironment.getExecutionEnvironmentval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)// val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.configure(conf)/*kafka输入:a,1,1690304400000 //对应 2023-07-26 01:00:00 (无限输出) //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)a,1,7200000 // 1970-01-1 10:00:00*/val brokers = "172.18.105.147:9092"val source = KafkaSource.builder[String].setBootstrapServers(brokers).setTopics("t1").setGroupId("my-group-23asdf46").setStartingOffsets(OffsetsInitializer.latest())// .setDeserializer() // 参数: KafkaRecordDeserializationSchema.setDeserializer(new M1()).build()val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")val s1 = ds1.map(_.split(",")).map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0))).keyBy(_.f1).window(TumblingEventTimeWindows.of(seconds(10))).trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L))).reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))s1.print()env.execute("KafkaNewSourceAPi")}// 乱序流class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {override def extractTimestamp(element: C1): Long = {element.f3}}// key num timestampcase class C1(f1: String, f2: Int, f3: Long)
}
-

-
maven pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkLocalDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FlinkLocalDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.8</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.33</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
<!-- <version>1.2.17</version>--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 引入flink1.13.0 scala2.12.12 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- Either... --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- or... --><!-- 下面几个是代码中写sql需要的包 四个中一个都不能少 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version>
<!-- <scope>test</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- 注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--><!-- <version>${flink.version}</version>--><!-- <scope>provided</scope>--><!-- </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.5</version><configuration><!--这部分可有可无,加上的话则直接生成可运行jar包--><!--<archive>--><!--<manifest>--><!--<mainClass>${exec.mainClass}</mainClass>--><!--</manifest>--><!--</archive>--><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>
相关文章:
flink1.17 eventWindow不要配置processTrigger
理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出. flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃 bug复现操作 idea运行代码后 往source kafka发送一条数据 a,1,1690304400000 可以看到无限输出…...
Python导出SqlServerl数据字典为excel
sql代码 SELECTtableName D.name ,tableIntroduce isnull(F.value, ),sort A.colorder,fieldName A.name,catogary B.name,bytes A.Length,lengths COLUMNPROPERTY(A.id, A.name, PRECISION),scales isnull(COLUMNPROPERTY(A.id, A.name, Scale), 0),isOrNotNull Cas…...
PB:DDE服务器函数
1、GetCommandDDE() 功 能:得到DDE客户应用发送的命令。 语 法:GetCommandDDE ( string ) 参 数:string:string类型的变量,用于保存DDE客户应用发送的命令。 返回值:Integer。函数执行成功时返回1,发生错误时返回-1。如果string参数的值为NULL, GetCommandDDE()…...
awk经典实战、正则表达式
目录 1.筛选给定时间范围内的日志 2.统计独立IP 案列 需求 代码 运行结果 3.根据某字段去重 案例 运行结果 4.正则表达式 1)认识正则 2)匹配字符 3)匹配次数 4)位置锚定:定位出现的位置 5)分组…...
Python脚本-时间盲注
BlindBool_get import requests from optparse import OptionParser import threading#存放变量 DBName "" DBTables [] DBColumns [] DBData {} flag You are in #设置重连次数以及将连接改为短连接 #防止因为HTTP连接数过多导致的MAX retries exceeded with …...
面试总结-Redis篇章(十)——Redis哨兵模式、集群脑裂
Redis哨兵模式、集群脑裂 哨兵模式哨兵的作用服务状态监控 Redis集群(哨兵模式)脑裂解决办法 哨兵模式 为了保证Redis的高可用,Redis提供了哨兵模式 哨兵的作用 服务状态监控 Redis集群(哨兵模式)脑裂 假设由于网络原…...
el-table那些事
el-table那些事 获取el-table所有勾选的行数据 用于记录工作和日常学习遇到的坑,需求。 vue3element-plusts 获取el-table所有勾选的行数据 1、需要先声明一个ref变量,并赋值给el-table 2、通过el-table提供的getSelectionRows()函数获取选中的"行…...
kubernetes(一)
文章目录 1. k8s架构2. k8s集群搭建 1. k8s架构 2. k8s集群搭建...
计算机网络(6) --- https协议
计算机网络(5) --- http协议_哈里沃克的博客-CSDN博客http协议https://blog.csdn.net/m0_63488627/article/details/132089130?spm1001.2014.3001.5501 目录 1.HTTPS的出现 1.HTTPS协议介绍 2.补充概念 1.加密 1.解释 2.原因 3.加密方式 对称加…...
(三)Node.js - 模块化
1. Node.js中的模块化 Node.js中根据模块来源不同,将模块分为了3大类,分别是: 内置模块:内置模块由Node.js官方提供的,例如fs、path、http等自定义模块:用户创建的每个.js文件,都是自定义模块…...
502 bad gateway报错
代码在本地运行可以正常访问后端接口,部署服务器报错502。直接检查防火墙状态是否开启,先关闭防火墙试一下。如果是防火墙的原因在打开防火墙,开放需要的端口即可。 1、先查看防火墙状态: systemctl status firewalld2、停止防火…...
Flink学习教程
最近因为用到了Flink,所以博主开了《Flink教程》专栏来记录Flink的学习笔记。 【Apache Flink v1.16 中文文档】 【官网 - Apache Flink v1.3 中文文档】 一、基础 参考链接如下: Flink教程(01)- Flink知识图谱Flink教程&…...
flutter开发实战-实现音效soundpool播放音频及控制播放暂停停止设置音量
flutter开发实战-实现音效soundpool播放音频 最近开发过程中遇到低配置设备时候,在Media播放音频时候出现音轨限制问题。所以将部分音频采用音效sound来播放。 一、音效类似iOS中的Sound 在iOS中使用sound来播放mp3音频示例如下 // 通过通知的Sound设置为voip_c…...
Sequence 2023牛客暑期多校训练营6 E
登录—专业IT笔试面试备考平台_牛客网 题目大意:有一长度为n的数组a,有q次询问,每次要求将[l,r]的区间分成k个连续区间,满足每个区间和都是偶数,能满足要求就输出YES 1<n,q<1e5;0<ai<1e10;1<l<r&l…...
【ASP.NET MVC】使用动软(二)(10)
一、添加动软生成工程 按前文添加动态到工程 双击动软 完成新建数据库服务器后 ,需要关闭重新打开 选择简单三层,注意保存位置 注意切换数据库: 生成后拷贝五个文件夹到工程目录 注意目录结构: 添加四个项目到原来的工程&…...
STM32入门学习之独立看门狗(IWDG)
1.STM32的独立看门狗是一个具有独立时钟的片上外设。通常,为了防止程序卡死,可以设置看门狗定时复位。当看看门狗被使能之后,会按初始化时设置的计数值进行计数。当根据计数值计数的倒数时间为0时,便会自动复位程序,即…...
抖音seo矩阵系统源码搭建开发详解
抖音SEO矩阵系统是一个用于提高抖音视频在搜索引擎排名的工具。如果你想开发自己的抖音SEO矩阵系统,以下是详细的步骤: 开发步骤详解: 确定你需要的功能和算法 抖音SEO矩阵系统包含很多功能,比如关键词研究、内容优化、链接建设、…...
2685. 统计完全连通分量的数量;2718. 查询后矩阵的和;1600. 王位继承顺序
2685. 统计完全连通分量的数量 核心思想:枚举所有的连通分量,然后判断这些连通分量是不是完全连通分量,完全连通分量满足边数2e 点数v(v-1)。 2718. 查询后矩阵的和 核心思想:后面的改变更重要,所以我们直接逆向思维…...
SpringBoot统一功能处理(AOP思想实现)(统一用户登录权限验证 / 异常处理 / 数据格式返回)
主要是三个处理: 1、统一用户登录权限验证; 2、统一异常处理; 3、统一数据格式返回。 目录 一、用户登录权限校验 🍅 1、使用拦截器 🎈 1.1自定义拦截器 🎈 1.2 设置自定义拦截器 🎈创建cont…...
git stash 用法
起始 今天在看一个bug,之前一个分支的版本是正常的,在新的分支上上加了很多日志没找到原因,希望回溯到之前的版本,确定下从哪个提交引入的问题,但是还不想把现在的修改提交,也不希望在Git上看到当前修改的…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
