当前位置: 首页 > news >正文

flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTriggerimport com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Secondsobject flinkEventWindowAndProcessTriggerBUGLearn {def main(args: Array[String]): Unit = {// flink 启动本地webuival conf = new Configurationconf.setInteger(RestOptions.PORT, 28080)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.configure(conf)/*kafka输入:a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)a,1,7200000               // 1970-01-1 10:00:00*/val brokers = "172.18.105.147:9092"val source = KafkaSource.builder[String].setBootstrapServers(brokers).setTopics("t1").setGroupId("my-group-23asdf46").setStartingOffsets(OffsetsInitializer.latest())// .setDeserializer() // 参数: KafkaRecordDeserializationSchema.setDeserializer(new M1()).build()val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")val s1 = ds1.map(_.split(",")).map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0))).keyBy(_.f1).window(TumblingEventTimeWindows.of(seconds(10))).trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L))).reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))s1.print()env.execute("KafkaNewSourceAPi")}// 乱序流class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {override def extractTimestamp(element: C1): Long = {element.f3}}// key num timestampcase class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkLocalDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FlinkLocalDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.8</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.33</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
<!--            <version>1.2.17</version>--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 引入flink1.13.0 scala2.12.12   --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- Either... --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- or... --><!--        下面几个是代码中写sql需要的包 四个中一个都不能少 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version>
<!--            <scope>test</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--><!--            <version>${flink.version}</version>--><!--            <scope>provided</scope>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.5</version><configuration><!--这部分可有可无,加上的话则直接生成可运行jar包--><!--<archive>--><!--<manifest>--><!--<mainClass>${exec.mainClass}</mainClass>--><!--</manifest>--><!--</archive>--><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

 

相关文章:

flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出. flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃 bug复现操作 idea运行代码后 往source kafka发送一条数据 a,1,1690304400000 可以看到无限输出…...

Python导出SqlServerl数据字典为excel

sql代码 SELECTtableName D.name ,tableIntroduce isnull(F.value, ),sort A.colorder,fieldName A.name,catogary B.name,bytes A.Length,lengths COLUMNPROPERTY(A.id, A.name, PRECISION),scales isnull(COLUMNPROPERTY(A.id, A.name, Scale), 0),isOrNotNull Cas…...

PB:DDE服务器函数

1、GetCommandDDE() 功 能:得到DDE客户应用发送的命令。 语 法:GetCommandDDE ( string ) 参 数:string:string类型的变量,用于保存DDE客户应用发送的命令。 返回值:Integer。函数执行成功时返回1,发生错误时返回-1。如果string参数的值为NULL, GetCommandDDE()…...

awk经典实战、正则表达式

目录 1.筛选给定时间范围内的日志 2.统计独立IP 案列 需求 代码 运行结果 3.根据某字段去重 案例 运行结果 4.正则表达式 1&#xff09;认识正则 2&#xff09;匹配字符 3&#xff09;匹配次数 4&#xff09;位置锚定&#xff1a;定位出现的位置 5&#xff09;分组…...

Python脚本-时间盲注

BlindBool_get import requests from optparse import OptionParser import threading#存放变量 DBName "" DBTables [] DBColumns [] DBData {} flag You are in #设置重连次数以及将连接改为短连接 #防止因为HTTP连接数过多导致的MAX retries exceeded with …...

面试总结-Redis篇章(十)——Redis哨兵模式、集群脑裂

Redis哨兵模式、集群脑裂 哨兵模式哨兵的作用服务状态监控 Redis集群&#xff08;哨兵模式&#xff09;脑裂解决办法 哨兵模式 为了保证Redis的高可用&#xff0c;Redis提供了哨兵模式 哨兵的作用 服务状态监控 Redis集群&#xff08;哨兵模式&#xff09;脑裂 假设由于网络原…...

el-table那些事

el-table那些事 获取el-table所有勾选的行数据 用于记录工作和日常学习遇到的坑&#xff0c;需求。 vue3element-plusts 获取el-table所有勾选的行数据 1、需要先声明一个ref变量&#xff0c;并赋值给el-table 2、通过el-table提供的getSelectionRows()函数获取选中的"行…...

kubernetes(一)

文章目录 1. k8s架构2. k8s集群搭建 1. k8s架构 2. k8s集群搭建...

计算机网络(6) --- https协议

计算机网络&#xff08;5&#xff09; --- http协议_哈里沃克的博客-CSDN博客http协议https://blog.csdn.net/m0_63488627/article/details/132089130?spm1001.2014.3001.5501 目录 1.HTTPS的出现 1.HTTPS协议介绍 2.补充概念 1.加密 1.解释 2.原因 3.加密方式 对称加…...

(三)Node.js - 模块化

1. Node.js中的模块化 Node.js中根据模块来源不同&#xff0c;将模块分为了3大类&#xff0c;分别是&#xff1a; 内置模块&#xff1a;内置模块由Node.js官方提供的&#xff0c;例如fs、path、http等自定义模块&#xff1a;用户创建的每个.js文件&#xff0c;都是自定义模块…...

502 bad gateway报错

代码在本地运行可以正常访问后端接口&#xff0c;部署服务器报错502。直接检查防火墙状态是否开启&#xff0c;先关闭防火墙试一下。如果是防火墙的原因在打开防火墙&#xff0c;开放需要的端口即可。 1、先查看防火墙状态&#xff1a; systemctl status firewalld2、停止防火…...

Flink学习教程

最近因为用到了Flink&#xff0c;所以博主开了《Flink教程》专栏来记录Flink的学习笔记。 【Apache Flink v1.16 中文文档】 【官网 - Apache Flink v1.3 中文文档】 一、基础 参考链接如下&#xff1a; Flink教程&#xff08;01&#xff09;- Flink知识图谱Flink教程&…...

flutter开发实战-实现音效soundpool播放音频及控制播放暂停停止设置音量

flutter开发实战-实现音效soundpool播放音频 最近开发过程中遇到低配置设备时候&#xff0c;在Media播放音频时候出现音轨限制问题。所以将部分音频采用音效sound来播放。 一、音效类似iOS中的Sound 在iOS中使用sound来播放mp3音频示例如下 // 通过通知的Sound设置为voip_c…...

Sequence 2023牛客暑期多校训练营6 E

登录—专业IT笔试面试备考平台_牛客网 题目大意&#xff1a;有一长度为n的数组a&#xff0c;有q次询问&#xff0c;每次要求将[l,r]的区间分成k个连续区间&#xff0c;满足每个区间和都是偶数&#xff0c;能满足要求就输出YES 1<n,q<1e5;0<ai<1e10;1<l<r&l…...

【ASP.NET MVC】使用动软(二)(10)

一、添加动软生成工程 按前文添加动态到工程 双击动软 完成新建数据库服务器后 &#xff0c;需要关闭重新打开 选择简单三层&#xff0c;注意保存位置 注意切换数据库&#xff1a; 生成后拷贝五个文件夹到工程目录 注意目录结构&#xff1a; 添加四个项目到原来的工程&…...

STM32入门学习之独立看门狗(IWDG)

1.STM32的独立看门狗是一个具有独立时钟的片上外设。通常&#xff0c;为了防止程序卡死&#xff0c;可以设置看门狗定时复位。当看看门狗被使能之后&#xff0c;会按初始化时设置的计数值进行计数。当根据计数值计数的倒数时间为0时&#xff0c;便会自动复位程序&#xff0c;即…...

抖音seo矩阵系统源码搭建开发详解

抖音SEO矩阵系统是一个用于提高抖音视频在搜索引擎排名的工具。如果你想开发自己的抖音SEO矩阵系统&#xff0c;以下是详细的步骤&#xff1a; 开发步骤详解&#xff1a; 确定你需要的功能和算法 抖音SEO矩阵系统包含很多功能&#xff0c;比如关键词研究、内容优化、链接建设、…...

2685. 统计完全连通分量的数量;2718. 查询后矩阵的和;1600. 王位继承顺序

2685. 统计完全连通分量的数量 核心思想&#xff1a;枚举所有的连通分量&#xff0c;然后判断这些连通分量是不是完全连通分量&#xff0c;完全连通分量满足边数2e 点数v(v-1)。 2718. 查询后矩阵的和 核心思想&#xff1a;后面的改变更重要&#xff0c;所以我们直接逆向思维…...

SpringBoot统一功能处理(AOP思想实现)(统一用户登录权限验证 / 异常处理 / 数据格式返回)

主要是三个处理&#xff1a; 1、统一用户登录权限验证&#xff1b; 2、统一异常处理&#xff1b; 3、统一数据格式返回。 目录 一、用户登录权限校验 &#x1f345; 1、使用拦截器 &#x1f388; 1.1自定义拦截器 &#x1f388; 1.2 设置自定义拦截器 &#x1f388;创建cont…...

git stash 用法

起始 今天在看一个bug&#xff0c;之前一个分支的版本是正常的&#xff0c;在新的分支上上加了很多日志没找到原因&#xff0c;希望回溯到之前的版本&#xff0c;确定下从哪个提交引入的问题&#xff0c;但是还不想把现在的修改提交&#xff0c;也不希望在Git上看到当前修改的…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...