当前位置: 首页 > news >正文

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例:

用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。

apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)。

代码演示:

kafka发送消息端: 

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端: 

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

当执行kafka接收消息端时,会报如下错误: 

 错误原因:在对kafka中数据进行KeyBy分组处理时,使用了lambda表达式

 

解决方法:

在使用KeyBy时,将函数的各种参数类型都写清楚,修改后的代码如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

相关文章:

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例&#xff1a; 用flink中的窗口函数&#xff08;apply&#xff09;读取kafka中数据&#xff0c;并对热词进行统计。 apply:全量聚合函数&#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算&#xff08;等窗口的数据到齐&#xff0c;才开始进行聚合…...

技能之发布自己的依赖到npm上

目录 开始 解决 步骤一&#xff1a; 步骤二&#xff1a; 步骤三&#xff1a; 运用 一直以为自己的项目在github上有了&#xff08;之传了github&#xff09;就可以进行npm install下载&#xff0c;有没有和我一样萌萌的同学。没事&#xff0c;萌萌乎乎的不犯罪。 偶然的机…...

COMSOL工作站:配置指南与性能优化

COMSOL Multiphysics 求解的问题类型相当广泛&#xff0c;提供了仿真单一物理场以及灵活耦合多个物理场的功能&#xff0c;供工程师和科研人员来精确分析各个工程领域的设备、工艺和流程。 软件内置的#模型开发器#包含完整的建模工作流程&#xff0c;可实现从几何建模、材料参数…...

Qt导出Excel图表

目的 就是利用Qt导出Excel图表,如果直接画Excel 图表&#xff0c;比较麻烦些&#xff0c;代码写得也复杂了&#xff1b;而直接利用Excel模块就简单了&#xff0c;图表在模块当中已经是现成的了&#xff0c;Qt程序只更改数据就可以了&#xff0c;这篇文章就是记录一下利用模块上…...

分布式协同 - 分布式系统的特性与互斥问题

文章目录 导图概述分布式系统的特性与挑战分布式互斥算法的目标分布式互斥算法集中互斥算法集中互斥算法示意图集中互斥算法流程 基于许可的互斥算法Lamport 算法示意图Lamport 流程 令牌环互斥算法令牌环互斥算法示意图 1. 集中互斥算法&#xff08;Centralized Mutual Exclus…...

windows安装itop

本文介绍 win10 安装 itop 安装WAMP集成环境前 先安装visual c 安装itop前需要安装WAMP集成环境(windowsApacheMysqlPHP) 所需文件百度云盘 通过网盘分享的文件&#xff1a;itop.zip 链接: https://pan.baidu.com/s/1D5HrKdbyEaYBZ8_IebDQxQ 提取码: m9fh 步骤一&#xff1…...

LAMP环境的部署

一、软件安装介绍 在Linux系统中安装软件有rpm安装、yum安装、源码安装等方法&#xff0c;在这里主要给大家介绍 yum 安装&#xff0c;这是一种最简单方便的一种安装方法。 YUM&#xff08;Yellow dog Upadate Modifie&#xff09;是改进版的 RPM 管理器&#xff0c;很好地解…...

Go语言压缩文件处理

目录 Go 语言压缩文件处理1. 压缩文件&#xff1a;Zip函数2. 解压文件&#xff1a;UnZip 函数3. 小结 Go 语言压缩文件处理 在现代的应用开发中&#xff0c;处理压缩文件&#xff08;如 .zip 格式&#xff09;是常见的需求。Go 语言提供了内置的 archive/zip 包来处理 .zip 文…...

rocylinux9.4安装prometheus监控

一.上传软件包 具体的软件包如下&#xff0c;其中kubernetes-mixin是下载的监控kubernetes的一些监控规则、dashbaordd等。 二.Prometheus配置 1.promethes软件安装 #解压上传后的软件包 [rootlocalhost ] cd /opt [rootlocalhost opt]# tar xf prometheus-2.35.3.linux-amd…...

屏幕分辨率|尺寸|颜色深度指纹

一、前端通过window.screen接口获取屏幕分辨率 尺寸 颜色深度&#xff0c;横屏竖屏信息。 二、window.screen c接口实现&#xff1a; 1、third_party\blink\renderer\core\frame\screen.idl // https://drafts.csswg.org/cssom-view/#the-screen-interface[ExposedWindow ] …...

docker-elasticsearch-kibana-logstash

一、安装 Elasticsearch 尝试直接拉取 Elasticsearch 镜像&#xff1a; 执行 docker pull docker.elastic.co/elasticsearch/elasticsearch&#xff0c;拉取失败&#xff0c;错误提示为 “Error response from daemon: manifest for docker.elastic.co/elasticsearch/elasticse…...

C#设计模式——抽象工厂模式(重点)

文章目录 项目地址一、抽象工厂模式1.1 特性1.2 使用反射获取特性标记的类1.3 完整代码 项目地址 教程作者&#xff1a;教程地址&#xff1a; 代码仓库地址&#xff1a; 所用到的框架和插件&#xff1a; dbt airflow一、抽象工厂模式 工厂方法模式依然存在一个问题就是&…...

全新AI模型家族登场:完全可复现的开源语言模型OLMo 2

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...

用Matlab和SIMULINK实现DPCM仿真和双边带调幅系统仿真

1、使用SIMULINK或Matlab实现DPCM仿真 1.1 DPCM原理 差分脉冲编码调制&#xff0c;简称DPCM&#xff0c;主要用于将模拟信号转换为数字信号&#xff0c;同时减少数据的冗余度以实现数据压缩。在DPCM中&#xff0c;信号的每个抽样值不是独立编码的&#xff0c;而是通过预测前一…...

RabbitMQ的交换机总结

1.direct交换机 2.fanout交换机...

Android so库的编译

在没弄明白so库编译的关系前,直接看网上博主的博文,常常会觉得云里雾里的,为什么一会儿通过Android工程cmake编译,一会儿又通过NDK命令去编译。两者编译的so库有什么区别? android版第三方库编译总体思路: 对于新手小白来说搞明白上面的总体思路图很有必…...

2024年底-Arch linux或转为0BSD许可证!

原文&#xff1a;https://archlinux.org/news/providing-a-license-for-package-sources/ 解读&#xff1a;Arch Linux社区通过RFC 40达成共识&#xff0c;决定将所有软件包源代码更改为0BSD许可证。 0BSD许可证是什么&#xff1f;&#xff1a;这是一个非常自由的开源许可证&a…...

深入解析音视频流媒体SIP协议交互过程

一、引言 在音视频流媒体传输过程中&#xff0c;SIP&#xff08;Session Initiation Protocol&#xff09;协议发挥着举足轻重的作用。本文将详细全面地介绍音视频流媒体传输中的SIP协议&#xff0c;包括其基本概念、交互过程、关键信令以及应用场景 二、SIP协议基本概念 1.…...

linux安装mysql8.0.40

一、下载MySQL安装包 1.查看glibc版本 rpm -qa | grep glibc 2.到mysql官网下载安装包 ​ 二、解压安装 1.上传压缩包纸/usr/local 目录下&#xff0c;解压&#xff1a; tar -xvf mysql-8.0.40-linux-glibc2.17-x86_64.tar.xz 2.重命名&#xff1a; mv mysql-8.0.40-linux-…...

Java基础之控制语句:开启编程逻辑之门

一、Java控制语句概述 Java 中的控制语句主要分为选择结构、循环结构和跳转语句三大类&#xff0c;它们在程序中起着至关重要的作用&#xff0c;能够决定程序的执行流程。 选择结构用于根据不同的条件执行不同的代码路径&#xff0c;主要包括 if 语句和 switch 语句。if 语句有…...

终极指南:Flair如何引领NLP技术未来发展趋势

终极指南&#xff1a;Flair如何引领NLP技术未来发展趋势 【免费下载链接】flair A very simple framework for state-of-the-art Natural Language Processing (NLP) 项目地址: https://gitcode.com/gh_mirrors/fl/flair Flair是一个由柏林洪堡大学开发的简单而强大的自…...

Free List Allocator实现原理:memory-allocators中的通用内存分配器

Free List Allocator实现原理&#xff1a;memory-allocators中的通用内存分配器 【免费下载链接】memory-allocators Custom memory allocators in C to improve the performance of dynamic memory allocation 项目地址: https://gitcode.com/gh_mirrors/me/memory-allocato…...

避开BUUCTF《Life on Mars》的思维陷阱:当information_schema查询结果‘不对劲’时,你的排查清单应该有哪些?

破解BUUCTF《Life on Mars》的数据库迷局&#xff1a;当information_schema说谎时的七种侦查策略 在CTF赛场上&#xff0c;SQL注入类题目往往不会按教科书上的剧本发展。当你在BUUCTF《Life on Mars》这道题中执行group_concat(database()) from information_schema.schemata却…...

3步快速上手RobotHelper:安卓自动化脚本框架新手指南

3步快速上手RobotHelper&#xff1a;安卓自动化脚本框架新手指南 【免费下载链接】RobotHelper 安卓游戏自动化脚本框架|Automated script for Android games 项目地址: https://gitcode.com/gh_mirrors/ro/RobotHelper 你是否想要开发安卓游戏自动化脚本&#xff0c;却…...

MCP Loom:快速构建AI工具与数据连接器的开发框架

1. 项目概述&#xff1a;MCP Loom&#xff0c;一个连接AI与真实世界的“织布机”如果你最近在折腾AI应用开发&#xff0c;特别是想让你的AI助手&#xff08;比如Claude、Cursor等&#xff09;能直接操作你电脑上的文件、数据库&#xff0c;甚至调用外部API&#xff0c;那么你很…...

避坑指南:LabVIEW做3D模型旋转动画时,90%的人会忽略的‘添加对象及引用’模式

LabVIEW 3D模型旋转动画深度解析&#xff1a;从"乱跑"到精准控制的进阶指南 在LabVIEW中创建3D模型旋转动画时&#xff0c;许多开发者都会遇到一个令人困惑的现象&#xff1a;明明只想让模型旋转&#xff0c;结果整个坐标系也跟着"翩翩起舞"。这种看似简单…...

3PEAK思瑞浦 TPA2772-VS1R MSOP8 运算放大器

特性 供电电压:3V至36V 偏移电压:在25C时最大3.5mV 轨到轨输入和输出 带宽:4.6 MHz 噪声容限:-良好&#xff0c;THD0.0008% 低噪声:1kHz时53nV/vHz 零交叉输入: -优异的总谐波失真加噪声:0.0008%...

稀疏记忆微调技术:解决LLM持续学习中的灾难性遗忘

1. 稀疏记忆微调技术解析 1.1 持续学习的核心挑战 在大型语言模型&#xff08;LLM&#xff09;的实际应用中&#xff0c;灾难性遗忘&#xff08;Catastrophic Forgetting&#xff09;是持续学习面临的最大障碍。想象一下&#xff0c;当你教会一个学生新知识时&#xff0c;他却…...

工程师如何运用专业技能参与人道主义项目:从思维转变到实践落地

1. 项目概述&#xff1a;工程师的人道主义行动倡议每年8月19日&#xff0c;世界人道主义日都会提醒我们关注那些在全球最艰苦、最危险地区默默奉献的人们。这个日子最初是为了纪念在履职中牺牲的人道主义工作者&#xff0c;如今已演变为一个更广泛的号召——庆祝那种激励全球人…...

GD32F303硬件I2C实战:手把手教你用AT24C02 EEPROM存储和读取设备配置参数

GD32F303硬件I2C实战&#xff1a;构建工业级参数存储系统 在嵌入式设备开发中&#xff0c;系统参数的持久化存储是个看似简单却暗藏玄机的需求。想象一下&#xff0c;当你的智能温控器经历突然断电后&#xff0c;所有用户设置的日程和偏好全部归零——这种体验足以让产品口碑崩…...