当前位置: 首页 > news >正文

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例:

用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。

apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)。

代码演示:

kafka发送消息端: 

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端: 

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

当执行kafka接收消息端时,会报如下错误: 

 错误原因:在对kafka中数据进行KeyBy分组处理时,使用了lambda表达式

 

解决方法:

在使用KeyBy时,将函数的各种参数类型都写清楚,修改后的代码如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

相关文章:

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例&#xff1a; 用flink中的窗口函数&#xff08;apply&#xff09;读取kafka中数据&#xff0c;并对热词进行统计。 apply:全量聚合函数&#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算&#xff08;等窗口的数据到齐&#xff0c;才开始进行聚合…...

技能之发布自己的依赖到npm上

目录 开始 解决 步骤一&#xff1a; 步骤二&#xff1a; 步骤三&#xff1a; 运用 一直以为自己的项目在github上有了&#xff08;之传了github&#xff09;就可以进行npm install下载&#xff0c;有没有和我一样萌萌的同学。没事&#xff0c;萌萌乎乎的不犯罪。 偶然的机…...

COMSOL工作站:配置指南与性能优化

COMSOL Multiphysics 求解的问题类型相当广泛&#xff0c;提供了仿真单一物理场以及灵活耦合多个物理场的功能&#xff0c;供工程师和科研人员来精确分析各个工程领域的设备、工艺和流程。 软件内置的#模型开发器#包含完整的建模工作流程&#xff0c;可实现从几何建模、材料参数…...

Qt导出Excel图表

目的 就是利用Qt导出Excel图表,如果直接画Excel 图表&#xff0c;比较麻烦些&#xff0c;代码写得也复杂了&#xff1b;而直接利用Excel模块就简单了&#xff0c;图表在模块当中已经是现成的了&#xff0c;Qt程序只更改数据就可以了&#xff0c;这篇文章就是记录一下利用模块上…...

分布式协同 - 分布式系统的特性与互斥问题

文章目录 导图概述分布式系统的特性与挑战分布式互斥算法的目标分布式互斥算法集中互斥算法集中互斥算法示意图集中互斥算法流程 基于许可的互斥算法Lamport 算法示意图Lamport 流程 令牌环互斥算法令牌环互斥算法示意图 1. 集中互斥算法&#xff08;Centralized Mutual Exclus…...

windows安装itop

本文介绍 win10 安装 itop 安装WAMP集成环境前 先安装visual c 安装itop前需要安装WAMP集成环境(windowsApacheMysqlPHP) 所需文件百度云盘 通过网盘分享的文件&#xff1a;itop.zip 链接: https://pan.baidu.com/s/1D5HrKdbyEaYBZ8_IebDQxQ 提取码: m9fh 步骤一&#xff1…...

LAMP环境的部署

一、软件安装介绍 在Linux系统中安装软件有rpm安装、yum安装、源码安装等方法&#xff0c;在这里主要给大家介绍 yum 安装&#xff0c;这是一种最简单方便的一种安装方法。 YUM&#xff08;Yellow dog Upadate Modifie&#xff09;是改进版的 RPM 管理器&#xff0c;很好地解…...

Go语言压缩文件处理

目录 Go 语言压缩文件处理1. 压缩文件&#xff1a;Zip函数2. 解压文件&#xff1a;UnZip 函数3. 小结 Go 语言压缩文件处理 在现代的应用开发中&#xff0c;处理压缩文件&#xff08;如 .zip 格式&#xff09;是常见的需求。Go 语言提供了内置的 archive/zip 包来处理 .zip 文…...

rocylinux9.4安装prometheus监控

一.上传软件包 具体的软件包如下&#xff0c;其中kubernetes-mixin是下载的监控kubernetes的一些监控规则、dashbaordd等。 二.Prometheus配置 1.promethes软件安装 #解压上传后的软件包 [rootlocalhost ] cd /opt [rootlocalhost opt]# tar xf prometheus-2.35.3.linux-amd…...

屏幕分辨率|尺寸|颜色深度指纹

一、前端通过window.screen接口获取屏幕分辨率 尺寸 颜色深度&#xff0c;横屏竖屏信息。 二、window.screen c接口实现&#xff1a; 1、third_party\blink\renderer\core\frame\screen.idl // https://drafts.csswg.org/cssom-view/#the-screen-interface[ExposedWindow ] …...

docker-elasticsearch-kibana-logstash

一、安装 Elasticsearch 尝试直接拉取 Elasticsearch 镜像&#xff1a; 执行 docker pull docker.elastic.co/elasticsearch/elasticsearch&#xff0c;拉取失败&#xff0c;错误提示为 “Error response from daemon: manifest for docker.elastic.co/elasticsearch/elasticse…...

C#设计模式——抽象工厂模式(重点)

文章目录 项目地址一、抽象工厂模式1.1 特性1.2 使用反射获取特性标记的类1.3 完整代码 项目地址 教程作者&#xff1a;教程地址&#xff1a; 代码仓库地址&#xff1a; 所用到的框架和插件&#xff1a; dbt airflow一、抽象工厂模式 工厂方法模式依然存在一个问题就是&…...

全新AI模型家族登场:完全可复现的开源语言模型OLMo 2

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...

用Matlab和SIMULINK实现DPCM仿真和双边带调幅系统仿真

1、使用SIMULINK或Matlab实现DPCM仿真 1.1 DPCM原理 差分脉冲编码调制&#xff0c;简称DPCM&#xff0c;主要用于将模拟信号转换为数字信号&#xff0c;同时减少数据的冗余度以实现数据压缩。在DPCM中&#xff0c;信号的每个抽样值不是独立编码的&#xff0c;而是通过预测前一…...

RabbitMQ的交换机总结

1.direct交换机 2.fanout交换机...

Android so库的编译

在没弄明白so库编译的关系前,直接看网上博主的博文,常常会觉得云里雾里的,为什么一会儿通过Android工程cmake编译,一会儿又通过NDK命令去编译。两者编译的so库有什么区别? android版第三方库编译总体思路: 对于新手小白来说搞明白上面的总体思路图很有必…...

2024年底-Arch linux或转为0BSD许可证!

原文&#xff1a;https://archlinux.org/news/providing-a-license-for-package-sources/ 解读&#xff1a;Arch Linux社区通过RFC 40达成共识&#xff0c;决定将所有软件包源代码更改为0BSD许可证。 0BSD许可证是什么&#xff1f;&#xff1a;这是一个非常自由的开源许可证&a…...

深入解析音视频流媒体SIP协议交互过程

一、引言 在音视频流媒体传输过程中&#xff0c;SIP&#xff08;Session Initiation Protocol&#xff09;协议发挥着举足轻重的作用。本文将详细全面地介绍音视频流媒体传输中的SIP协议&#xff0c;包括其基本概念、交互过程、关键信令以及应用场景 二、SIP协议基本概念 1.…...

linux安装mysql8.0.40

一、下载MySQL安装包 1.查看glibc版本 rpm -qa | grep glibc 2.到mysql官网下载安装包 ​ 二、解压安装 1.上传压缩包纸/usr/local 目录下&#xff0c;解压&#xff1a; tar -xvf mysql-8.0.40-linux-glibc2.17-x86_64.tar.xz 2.重命名&#xff1a; mv mysql-8.0.40-linux-…...

Java基础之控制语句:开启编程逻辑之门

一、Java控制语句概述 Java 中的控制语句主要分为选择结构、循环结构和跳转语句三大类&#xff0c;它们在程序中起着至关重要的作用&#xff0c;能够决定程序的执行流程。 选择结构用于根据不同的条件执行不同的代码路径&#xff0c;主要包括 if 语句和 switch 语句。if 语句有…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

简易版抽奖活动的设计技术方案

1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...