当前位置: 首页 > news >正文

flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTriggerimport com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Secondsobject flinkEventWindowAndProcessTriggerBUGLearn {def main(args: Array[String]): Unit = {// flink 启动本地webuival conf = new Configurationconf.setInteger(RestOptions.PORT, 28080)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.configure(conf)/*kafka输入:a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)a,1,7200000               // 1970-01-1 10:00:00*/val brokers = "172.18.105.147:9092"val source = KafkaSource.builder[String].setBootstrapServers(brokers).setTopics("t1").setGroupId("my-group-23asdf46").setStartingOffsets(OffsetsInitializer.latest())// .setDeserializer() // 参数: KafkaRecordDeserializationSchema.setDeserializer(new M1()).build()val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")val s1 = ds1.map(_.split(",")).map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0))).keyBy(_.f1).window(TumblingEventTimeWindows.of(seconds(10))).trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L))).reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))s1.print()env.execute("KafkaNewSourceAPi")}// 乱序流class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {override def extractTimestamp(element: C1): Long = {element.f3}}// key num timestampcase class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkLocalDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FlinkLocalDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.8</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.33</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
<!--            <version>1.2.17</version>--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 引入flink1.13.0 scala2.12.12   --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- Either... --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- or... --><!--        下面几个是代码中写sql需要的包 四个中一个都不能少 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version>
<!--            <scope>test</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--><!--            <version>${flink.version}</version>--><!--            <scope>provided</scope>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.5</version><configuration><!--这部分可有可无,加上的话则直接生成可运行jar包--><!--<archive>--><!--<manifest>--><!--<mainClass>${exec.mainClass}</mainClass>--><!--</manifest>--><!--</archive>--><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

 

相关文章:

flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出. flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃 bug复现操作 idea运行代码后 往source kafka发送一条数据 a,1,1690304400000 可以看到无限输出…...

Python导出SqlServerl数据字典为excel

sql代码 SELECTtableName D.name ,tableIntroduce isnull(F.value, ),sort A.colorder,fieldName A.name,catogary B.name,bytes A.Length,lengths COLUMNPROPERTY(A.id, A.name, PRECISION),scales isnull(COLUMNPROPERTY(A.id, A.name, Scale), 0),isOrNotNull Cas…...

PB:DDE服务器函数

1、GetCommandDDE() 功 能:得到DDE客户应用发送的命令。 语 法:GetCommandDDE ( string ) 参 数:string:string类型的变量,用于保存DDE客户应用发送的命令。 返回值:Integer。函数执行成功时返回1,发生错误时返回-1。如果string参数的值为NULL, GetCommandDDE()…...

awk经典实战、正则表达式

目录 1.筛选给定时间范围内的日志 2.统计独立IP 案列 需求 代码 运行结果 3.根据某字段去重 案例 运行结果 4.正则表达式 1&#xff09;认识正则 2&#xff09;匹配字符 3&#xff09;匹配次数 4&#xff09;位置锚定&#xff1a;定位出现的位置 5&#xff09;分组…...

Python脚本-时间盲注

BlindBool_get import requests from optparse import OptionParser import threading#存放变量 DBName "" DBTables [] DBColumns [] DBData {} flag You are in #设置重连次数以及将连接改为短连接 #防止因为HTTP连接数过多导致的MAX retries exceeded with …...

面试总结-Redis篇章(十)——Redis哨兵模式、集群脑裂

Redis哨兵模式、集群脑裂 哨兵模式哨兵的作用服务状态监控 Redis集群&#xff08;哨兵模式&#xff09;脑裂解决办法 哨兵模式 为了保证Redis的高可用&#xff0c;Redis提供了哨兵模式 哨兵的作用 服务状态监控 Redis集群&#xff08;哨兵模式&#xff09;脑裂 假设由于网络原…...

el-table那些事

el-table那些事 获取el-table所有勾选的行数据 用于记录工作和日常学习遇到的坑&#xff0c;需求。 vue3element-plusts 获取el-table所有勾选的行数据 1、需要先声明一个ref变量&#xff0c;并赋值给el-table 2、通过el-table提供的getSelectionRows()函数获取选中的"行…...

kubernetes(一)

文章目录 1. k8s架构2. k8s集群搭建 1. k8s架构 2. k8s集群搭建...

计算机网络(6) --- https协议

计算机网络&#xff08;5&#xff09; --- http协议_哈里沃克的博客-CSDN博客http协议https://blog.csdn.net/m0_63488627/article/details/132089130?spm1001.2014.3001.5501 目录 1.HTTPS的出现 1.HTTPS协议介绍 2.补充概念 1.加密 1.解释 2.原因 3.加密方式 对称加…...

(三)Node.js - 模块化

1. Node.js中的模块化 Node.js中根据模块来源不同&#xff0c;将模块分为了3大类&#xff0c;分别是&#xff1a; 内置模块&#xff1a;内置模块由Node.js官方提供的&#xff0c;例如fs、path、http等自定义模块&#xff1a;用户创建的每个.js文件&#xff0c;都是自定义模块…...

502 bad gateway报错

代码在本地运行可以正常访问后端接口&#xff0c;部署服务器报错502。直接检查防火墙状态是否开启&#xff0c;先关闭防火墙试一下。如果是防火墙的原因在打开防火墙&#xff0c;开放需要的端口即可。 1、先查看防火墙状态&#xff1a; systemctl status firewalld2、停止防火…...

Flink学习教程

最近因为用到了Flink&#xff0c;所以博主开了《Flink教程》专栏来记录Flink的学习笔记。 【Apache Flink v1.16 中文文档】 【官网 - Apache Flink v1.3 中文文档】 一、基础 参考链接如下&#xff1a; Flink教程&#xff08;01&#xff09;- Flink知识图谱Flink教程&…...

flutter开发实战-实现音效soundpool播放音频及控制播放暂停停止设置音量

flutter开发实战-实现音效soundpool播放音频 最近开发过程中遇到低配置设备时候&#xff0c;在Media播放音频时候出现音轨限制问题。所以将部分音频采用音效sound来播放。 一、音效类似iOS中的Sound 在iOS中使用sound来播放mp3音频示例如下 // 通过通知的Sound设置为voip_c…...

Sequence 2023牛客暑期多校训练营6 E

登录—专业IT笔试面试备考平台_牛客网 题目大意&#xff1a;有一长度为n的数组a&#xff0c;有q次询问&#xff0c;每次要求将[l,r]的区间分成k个连续区间&#xff0c;满足每个区间和都是偶数&#xff0c;能满足要求就输出YES 1<n,q<1e5;0<ai<1e10;1<l<r&l…...

【ASP.NET MVC】使用动软(二)(10)

一、添加动软生成工程 按前文添加动态到工程 双击动软 完成新建数据库服务器后 &#xff0c;需要关闭重新打开 选择简单三层&#xff0c;注意保存位置 注意切换数据库&#xff1a; 生成后拷贝五个文件夹到工程目录 注意目录结构&#xff1a; 添加四个项目到原来的工程&…...

STM32入门学习之独立看门狗(IWDG)

1.STM32的独立看门狗是一个具有独立时钟的片上外设。通常&#xff0c;为了防止程序卡死&#xff0c;可以设置看门狗定时复位。当看看门狗被使能之后&#xff0c;会按初始化时设置的计数值进行计数。当根据计数值计数的倒数时间为0时&#xff0c;便会自动复位程序&#xff0c;即…...

抖音seo矩阵系统源码搭建开发详解

抖音SEO矩阵系统是一个用于提高抖音视频在搜索引擎排名的工具。如果你想开发自己的抖音SEO矩阵系统&#xff0c;以下是详细的步骤&#xff1a; 开发步骤详解&#xff1a; 确定你需要的功能和算法 抖音SEO矩阵系统包含很多功能&#xff0c;比如关键词研究、内容优化、链接建设、…...

2685. 统计完全连通分量的数量;2718. 查询后矩阵的和;1600. 王位继承顺序

2685. 统计完全连通分量的数量 核心思想&#xff1a;枚举所有的连通分量&#xff0c;然后判断这些连通分量是不是完全连通分量&#xff0c;完全连通分量满足边数2e 点数v(v-1)。 2718. 查询后矩阵的和 核心思想&#xff1a;后面的改变更重要&#xff0c;所以我们直接逆向思维…...

SpringBoot统一功能处理(AOP思想实现)(统一用户登录权限验证 / 异常处理 / 数据格式返回)

主要是三个处理&#xff1a; 1、统一用户登录权限验证&#xff1b; 2、统一异常处理&#xff1b; 3、统一数据格式返回。 目录 一、用户登录权限校验 &#x1f345; 1、使用拦截器 &#x1f388; 1.1自定义拦截器 &#x1f388; 1.2 设置自定义拦截器 &#x1f388;创建cont…...

git stash 用法

起始 今天在看一个bug&#xff0c;之前一个分支的版本是正常的&#xff0c;在新的分支上上加了很多日志没找到原因&#xff0c;希望回溯到之前的版本&#xff0c;确定下从哪个提交引入的问题&#xff0c;但是还不想把现在的修改提交&#xff0c;也不希望在Git上看到当前修改的…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...