当前位置: 首页 > news >正文

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例:

用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。

apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)。

代码演示:

kafka发送消息端: 

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端: 

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

当执行kafka接收消息端时,会报如下错误: 

 错误原因:在对kafka中数据进行KeyBy分组处理时,使用了lambda表达式

 

解决方法:

在使用KeyBy时,将函数的各种参数类型都写清楚,修改后的代码如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

相关文章:

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例&#xff1a; 用flink中的窗口函数&#xff08;apply&#xff09;读取kafka中数据&#xff0c;并对热词进行统计。 apply:全量聚合函数&#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算&#xff08;等窗口的数据到齐&#xff0c;才开始进行聚合…...

技能之发布自己的依赖到npm上

目录 开始 解决 步骤一&#xff1a; 步骤二&#xff1a; 步骤三&#xff1a; 运用 一直以为自己的项目在github上有了&#xff08;之传了github&#xff09;就可以进行npm install下载&#xff0c;有没有和我一样萌萌的同学。没事&#xff0c;萌萌乎乎的不犯罪。 偶然的机…...

COMSOL工作站:配置指南与性能优化

COMSOL Multiphysics 求解的问题类型相当广泛&#xff0c;提供了仿真单一物理场以及灵活耦合多个物理场的功能&#xff0c;供工程师和科研人员来精确分析各个工程领域的设备、工艺和流程。 软件内置的#模型开发器#包含完整的建模工作流程&#xff0c;可实现从几何建模、材料参数…...

Qt导出Excel图表

目的 就是利用Qt导出Excel图表,如果直接画Excel 图表&#xff0c;比较麻烦些&#xff0c;代码写得也复杂了&#xff1b;而直接利用Excel模块就简单了&#xff0c;图表在模块当中已经是现成的了&#xff0c;Qt程序只更改数据就可以了&#xff0c;这篇文章就是记录一下利用模块上…...

分布式协同 - 分布式系统的特性与互斥问题

文章目录 导图概述分布式系统的特性与挑战分布式互斥算法的目标分布式互斥算法集中互斥算法集中互斥算法示意图集中互斥算法流程 基于许可的互斥算法Lamport 算法示意图Lamport 流程 令牌环互斥算法令牌环互斥算法示意图 1. 集中互斥算法&#xff08;Centralized Mutual Exclus…...

windows安装itop

本文介绍 win10 安装 itop 安装WAMP集成环境前 先安装visual c 安装itop前需要安装WAMP集成环境(windowsApacheMysqlPHP) 所需文件百度云盘 通过网盘分享的文件&#xff1a;itop.zip 链接: https://pan.baidu.com/s/1D5HrKdbyEaYBZ8_IebDQxQ 提取码: m9fh 步骤一&#xff1…...

LAMP环境的部署

一、软件安装介绍 在Linux系统中安装软件有rpm安装、yum安装、源码安装等方法&#xff0c;在这里主要给大家介绍 yum 安装&#xff0c;这是一种最简单方便的一种安装方法。 YUM&#xff08;Yellow dog Upadate Modifie&#xff09;是改进版的 RPM 管理器&#xff0c;很好地解…...

Go语言压缩文件处理

目录 Go 语言压缩文件处理1. 压缩文件&#xff1a;Zip函数2. 解压文件&#xff1a;UnZip 函数3. 小结 Go 语言压缩文件处理 在现代的应用开发中&#xff0c;处理压缩文件&#xff08;如 .zip 格式&#xff09;是常见的需求。Go 语言提供了内置的 archive/zip 包来处理 .zip 文…...

rocylinux9.4安装prometheus监控

一.上传软件包 具体的软件包如下&#xff0c;其中kubernetes-mixin是下载的监控kubernetes的一些监控规则、dashbaordd等。 二.Prometheus配置 1.promethes软件安装 #解压上传后的软件包 [rootlocalhost ] cd /opt [rootlocalhost opt]# tar xf prometheus-2.35.3.linux-amd…...

屏幕分辨率|尺寸|颜色深度指纹

一、前端通过window.screen接口获取屏幕分辨率 尺寸 颜色深度&#xff0c;横屏竖屏信息。 二、window.screen c接口实现&#xff1a; 1、third_party\blink\renderer\core\frame\screen.idl // https://drafts.csswg.org/cssom-view/#the-screen-interface[ExposedWindow ] …...

docker-elasticsearch-kibana-logstash

一、安装 Elasticsearch 尝试直接拉取 Elasticsearch 镜像&#xff1a; 执行 docker pull docker.elastic.co/elasticsearch/elasticsearch&#xff0c;拉取失败&#xff0c;错误提示为 “Error response from daemon: manifest for docker.elastic.co/elasticsearch/elasticse…...

C#设计模式——抽象工厂模式(重点)

文章目录 项目地址一、抽象工厂模式1.1 特性1.2 使用反射获取特性标记的类1.3 完整代码 项目地址 教程作者&#xff1a;教程地址&#xff1a; 代码仓库地址&#xff1a; 所用到的框架和插件&#xff1a; dbt airflow一、抽象工厂模式 工厂方法模式依然存在一个问题就是&…...

全新AI模型家族登场:完全可复现的开源语言模型OLMo 2

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...

用Matlab和SIMULINK实现DPCM仿真和双边带调幅系统仿真

1、使用SIMULINK或Matlab实现DPCM仿真 1.1 DPCM原理 差分脉冲编码调制&#xff0c;简称DPCM&#xff0c;主要用于将模拟信号转换为数字信号&#xff0c;同时减少数据的冗余度以实现数据压缩。在DPCM中&#xff0c;信号的每个抽样值不是独立编码的&#xff0c;而是通过预测前一…...

RabbitMQ的交换机总结

1.direct交换机 2.fanout交换机...

Android so库的编译

在没弄明白so库编译的关系前,直接看网上博主的博文,常常会觉得云里雾里的,为什么一会儿通过Android工程cmake编译,一会儿又通过NDK命令去编译。两者编译的so库有什么区别? android版第三方库编译总体思路: 对于新手小白来说搞明白上面的总体思路图很有必…...

2024年底-Arch linux或转为0BSD许可证!

原文&#xff1a;https://archlinux.org/news/providing-a-license-for-package-sources/ 解读&#xff1a;Arch Linux社区通过RFC 40达成共识&#xff0c;决定将所有软件包源代码更改为0BSD许可证。 0BSD许可证是什么&#xff1f;&#xff1a;这是一个非常自由的开源许可证&a…...

深入解析音视频流媒体SIP协议交互过程

一、引言 在音视频流媒体传输过程中&#xff0c;SIP&#xff08;Session Initiation Protocol&#xff09;协议发挥着举足轻重的作用。本文将详细全面地介绍音视频流媒体传输中的SIP协议&#xff0c;包括其基本概念、交互过程、关键信令以及应用场景 二、SIP协议基本概念 1.…...

linux安装mysql8.0.40

一、下载MySQL安装包 1.查看glibc版本 rpm -qa | grep glibc 2.到mysql官网下载安装包 ​ 二、解压安装 1.上传压缩包纸/usr/local 目录下&#xff0c;解压&#xff1a; tar -xvf mysql-8.0.40-linux-glibc2.17-x86_64.tar.xz 2.重命名&#xff1a; mv mysql-8.0.40-linux-…...

Java基础之控制语句:开启编程逻辑之门

一、Java控制语句概述 Java 中的控制语句主要分为选择结构、循环结构和跳转语句三大类&#xff0c;它们在程序中起着至关重要的作用&#xff0c;能够决定程序的执行流程。 选择结构用于根据不同的条件执行不同的代码路径&#xff0c;主要包括 if 语句和 switch 语句。if 语句有…...

保姆级教程:手把手教你将若依(RuoYi)项目从Java 8迁移到Java 17(含Spring Boot 3升级)

保姆级教程&#xff1a;手把手教你将若依(RuoYi)项目从Java 8迁移到Java 17&#xff08;含Spring Boot 3升级&#xff09; 最近几年Java生态发生了翻天覆地的变化&#xff0c;从Java 8到Java 17不仅仅是版本号的跳跃&#xff0c;更是一次技术栈的全面革新。作为国内广泛使用的…...

如何快速实现Font Awesome图标字体文件格式转换:终极在线工具指南

如何快速实现Font Awesome图标字体文件格式转换&#xff1a;终极在线工具指南 【免费下载链接】Font-Awesome The iconic SVG, font, and CSS toolkit 项目地址: https://gitcode.com/GitHub_Trending/fo/Font-Awesome Font Awesome作为一款标志性的SVG、字体和CSS工具包…...

别再让AI芯片‘睡大觉’了:手把手教你用华为昇腾+CANN搞定异构算力调度

华为昇腾CANN实战&#xff1a;破解AI芯片利用率困局的5个关键策略 推开实验室玻璃门&#xff0c;迎面是十几台Atlas 800服务器闪烁的指示灯&#xff0c;而工程师小王正对着监控大屏上30%的平均利用率皱眉——这场景在采用国产AI芯片的团队中太常见了。当我们谈论异构算力调度时…...

破解企业AI应用开发困境:Dify工作流架构的颠覆性价值

破解企业AI应用开发困境&#xff1a;Dify工作流架构的颠覆性价值 【免费下载链接】Awesome-Dify-Workflow 分享一些好用的 Dify DSL 工作流程&#xff0c;自用、学习两相宜。 Sharing some Dify workflows. 项目地址: https://gitcode.com/GitHub_Trending/aw/Awesome-Dify-W…...

DeOldify图像上色服务技术解析:其背后的卷积神经网络架构

DeOldify图像上色服务技术解析&#xff1a;其背后的卷积神经网络架构 老照片上色&#xff0c;听起来像是个魔法。你可能见过一些黑白照片瞬间变得色彩鲜艳的对比图&#xff0c;感觉既神奇又有点不可思议。DeOldify就是这样一个能把“魔法”变成现实的开源工具&#xff0c;它能…...

160+实用功能:OneMore插件如何让OneNote笔记管理效率翻倍?[特殊字符]

160实用功能&#xff1a;OneMore插件如何让OneNote笔记管理效率翻倍&#xff1f;&#x1f680; 【免费下载链接】OneMore A OneNote add-in with simple, yet powerful and useful features 项目地址: https://gitcode.com/gh_mirrors/on/OneMore 还在为OneNote单调的功…...

粒子追踪模拟单透镜聚焦comsol ansys Fluent 二维三维模型 仿真模型,文献复现

粒子追踪模拟单透镜聚焦comsol ansys Fluent 二维三维模型 仿真模型&#xff0c;文献复现&#xff0c;热湿传递在实验室折腾粒子追踪仿真的时候&#xff0c;最让人上头的莫过于单透镜聚焦的场景搭建。COMSOL和ANSYS这对冤家各有各的脾气——前者把物理场耦合玩出花&#xff0…...

Pixel Fashion Atelier实战教程:如何导出带元数据的PNG并适配Unity像素精灵管线

Pixel Fashion Atelier实战教程&#xff1a;如何导出带元数据的PNG并适配Unity像素精灵管线 1. 教程概述 Pixel Fashion Atelier作为一款专为像素艺术设计的AI生成工具&#xff0c;其输出结果需要经过特殊处理才能完美适配Unity的像素精灵管线。本教程将手把手教你如何导出带…...

FigmaCN:5分钟快速实现Figma中文界面的终极解决方案

FigmaCN&#xff1a;5分钟快速实现Figma中文界面的终极解决方案 【免费下载链接】figmaCN 中文 Figma 插件&#xff0c;设计师人工翻译校验 项目地址: https://gitcode.com/gh_mirrors/fi/figmaCN 还在为Figma英文界面而烦恼吗&#xff1f;figmaCN是一款专为中文用户打造…...

个人开发者如何高效率APP上架安卓应用市场?软著、备案、资质、审核详解大全,一篇文章讲透流程规则!

一、上架前的资质准备 1. 软件著作权登记证书&#xff08;软著&#xff09; 软著是证明APP拥有自主知识产权的重要文件&#xff0c;多数应用商店要求上架时提供。申请周期通常为1-2个月&#xff0c;建议提前规划。 2. APP备案 根据工信部要求&#xff0c;APP主办者需要在接…...