当前位置: 首页 > news >正文

flink1.17.0 集成kafka,并且计算


前言

flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。


一、kafka环境准备

1.1 启动kafka

这里我使用的kafka版本是3.2.0,部署的方法可以参考,
kafka部署

cd kafka_2.13-3.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

启动后查看java进程是否存在,存在后执行下一步。

1.2 新建topic

新建一个专门用于flink消费topic

bin/kafka-topics.sh --create --topic flinkTest --bootstrap-server 192.168.184.129:9092

1.3 测试生产消费是否正常

生产端:

bin/kafka-console-producer.sh --topic flinkTest --bootstrap-server 192.168.184.129:9092

客户端:

bin/kafka-console-consumer.sh --topic flinkTest --from-beginning --bootstrap-server 192.168.184.129:9092

1.4 测试生产消费

在生产端输入aaa
在这里插入图片描述
查看客户端是否能消费到
在这里插入图片描述
可以看到客户端已经消费成功了,kafka环境准备好了。

二、flink集成kafka

2.1 pom文件修改

pom文件修改之前,先看看官网的指导依赖是什么样的,
这里我们使用的是datastream api去做,
flink1.17.0官方文档

在这里插入图片描述
这里说明了相关的依赖需要引入的依赖包的版本,还有使用kafka消费的时候需要引入的连接包版本
在这里插入图片描述
完整的pom引入依赖如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wh.flink</groupId><artifactId>flink</artifactId><version>1.0-SNAPSHOT</version><name>flink</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!-- Flink Kafka连接器的依赖 -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><!-- Flink 开发Scala需要导入以下依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!--<dependency>--><!--<groupId>org.scala-lang</groupId>--><!--<artifactId>scala-library</artifactId>--><!--<version>2.11.12</version>--><!--</dependency>--><!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-log4j12</artifactId>--><!--<version>1.7.25</version>--><!--<scope>test</scope>--><!--</dependency>--><!--<dependency>--><!--<groupId>log4j</groupId>--><!--<artifactId>log4j</artifactId>--><!--<version>1.2.17</version>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-api</artifactId>--><!--<version>1.7.25</version>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-nop</artifactId>--><!--<version>1.7.25</version>--><!--<scope>test</scope>--><!--</dependency>--><!--<dependency>--><!--<groupId>org.slf4j</groupId>--><!--<artifactId>slf4j-simple</artifactId>--><!--<version>1.7.5</version>--><!--</dependency>--></dependencies><build><plugins><!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
<!--            <plugin>-->
<!--                <groupId>org.scala-tools</groupId>-->
<!--                <artifactId>maven-scala-plugin</artifactId>-->
<!--                <version>2.15.2</version>-->
<!--                <executions>-->
<!--                    <execution>-->
<!--                        <goals>-->
<!--                            <goal>compile</goal>-->
<!--                            <goal>testCompile</goal>-->
<!--                        </goals>-->
<!--                    </execution>-->
<!--                </executions>-->
<!--            </plugin>--><plugin><artifactId>maven-assembly-plugin</artifactId><version>2.4</version><configuration><!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --><!--<appendAssemblyId>false</appendAssemblyId>--><archive><manifest><mainClass>com.hadoop.demo.service.flinkDemo.FlinkDemo</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>assembly</goal></goals></execution></executions></plugin></plugins></build>
</project>

项目结构如图
在这里插入图片描述

2.2 代码编写

package com.hadoop.demo.service.flinkDemo;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.Iterator;public class FlinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//新建kafka连接KafkaSource<String> kfkSource = KafkaSource.<String>builder().setBootstrapServers("192.168.184.129:9092").setGroupId("flink").setTopics("flinkTest").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();//添加到flink环境DataStreamSource<String> lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");//根据逗号分组SingleOutputStreamOperator<Tuple2<String, Integer>> map = lines.flatMap(new FlatMapIterator<String, String>() {@Overridepublic Iterator<String> flatMap(String s) throws Exception {return Arrays.asList(s.split(",")).iterator();}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {return new Tuple2<>(s, 1);}});//统计每个单词的数量SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy(0).sum(1);sum.print();//System.out.println(sum.get);env.execute();}}

2.3 maven打包在这里插入图片描述

点击打包按钮,这里注意要选择带依赖的jar包,否则会出现以下错误。

NoClassDefFoundError: org/apache/flink/connector/kafka/source/KafkaSource

三、测试

3.1启动 hadoop集群,启动flink集群

这里如果不知道怎么搭建这两个集群可以看我其他文章
hadoop集成flink

./hadoop.sh start
./bin/yarn-session.sh --detached

3.2 上传jar包到flink集群

在这里插入图片描述
上传后填写主类类名,点击提交
在这里插入图片描述

3.3 测试

点击后,可以看到执行job这里能看到在运行的job
在这里插入图片描述
点击运行的task
在这里插入图片描述
点击输出
在这里插入图片描述
这里可以看到输出内容,
在kafka消费端输入内容,
在这里插入图片描述
这里的jbs出现了4次,看下输出控制台,
在这里插入图片描述
可以看到这里依次累加了四次,说明统计生效了。


总结

这里只是做了一个简单的消费kafka的flink例子,消费成功后还可以通过sink发送出去,还可以用transform进行转换,这里后面再演示,如果不对的可以指出。

相关文章:

flink1.17.0 集成kafka,并且计算

前言 flink是实时计算的重要集成组件&#xff0c;这里演示如何集成&#xff0c;并且使用一个小例子。例子是kafka输入消息&#xff0c;用逗号隔开&#xff0c;统计每个相同单词出现的次数&#xff0c;这么一个功能。 一、kafka环境准备 1.1 启动kafka 这里我使用的kafka版本…...

【华为OD机试】数组组成的最小数字【2023 B卷|100分】

【华为OD机试】-真题 !!点这里!! 【华为OD机试】真题考点分类 !!点这里 !! 题目描述: 给定一个整型数组,请从该数组中选择3个元素组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述: 一行用半角逗号分割的字符串记录的整型数…...

Exponential Loss 中的关于indicator 函数的一个恒等式

− x y 2 I ( x ≠ y ) − 1 -xy2\mathbf{ I}(x \ne y)-1 −xy2I(xy)−1 其中 I \mathbf{ I} I 是 indicator 函数&#xff0c; 定义域 为True &#xff0c;函数值为 1 反之为 0 x,y 都 可以取值 {-1,1} 证明过程见下表&#xff1a; xy左式右式-1-1-1-111-1-1-11111-111...

【机器学习】浅析过拟合

过度拟合 我们来想象如下一个场景&#xff1a;我们准备了10000张西瓜的照片让算法训练识别西瓜图像&#xff0c;但是这 10000张西瓜的图片都是有瓜梗的&#xff0c;算法在拟合西瓜的特征的时候&#xff0c;将西瓜带瓜梗当作了一个一般性的特征。此时出现一张没有瓜梗的西瓜照片…...

尝试在UNet的不同位置添加SE模块

目录 &#xff08;1&#xff09;se-unet01&#xff08;在卷积后&#xff0c;下采样前&#xff0c;添加SE模块&#xff09; &#xff08;2&#xff09;se-unet02&#xff08;在卷积后&#xff0c;上采样前&#xff0c;添加SE模块&#xff09; &#xff08;3&#xff09;se-un…...

JVM垃圾回收篇之相关概念和算法

垃圾回收相关概念 什么是垃圾 垃圾就是指在运行程序中没有任何指针指向的对象,这个对象就是需要被回收掉的垃圾,如果不及时进行清理,越积越多就会导致内存溢出. 为什么需要GC 不进行回收,早晚会导致内存溢出,Java自动管理垃圾回收,不需要开发人员手动干预,这就有可能导致开…...

(学习日记)2023.04.27

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…...

亚马逊CPC广告每日该怎么调整?

01 CPC广告需要每日调整吗&#xff1f; 其实&#xff0c;亚马逊广告是不建议每天都做过多调整的。 为什么呢&#xff1f;调整太频繁了&#xff0c;看不到每天调整的结果是不是&#xff1f; 什么时候需要调整呢&#xff1f; 就是广告指标&#xff0c;比如说曝光、点击、转化率情…...

ffmpeg下载及ffmpy3安装使用

ffmpeg下载及ffmpy3安装使用 1.下载ffmpeg 进入网址&#xff1a;https://www.gyan.dev/ffmpeg/builds/ 在release builds中下载ffmpeg-release-full.7z 下载好后解压到自己想存放的目录&#xff0c;例如&#xff1a;D:\Tool\ffmpeg-6.0-full_build 2.配置环境变量 右键此电…...

设计模式之~原型模式

定义&#xff1a;用原型实例指导创建对象的种类&#xff0c;并且通过拷贝这些原型创建新的对象。原型模式其实就是从一个对象再创建另外一个可定制的对象&#xff0c;而且不需知道任何创建的细节。 优点&#xff1a; 一般在初始化的信息不发生变化的情况下&#xff0c;克隆是最…...

多传感器融合SLAM --- 8.LIO-SAM基础知识解读

目录 1 惯性测量单元简介及预积分 1.1 IMU 器件介绍及选型建议 1.2 IMU状态传递方程...

多模态大模型时代下的文档图像智能分析与处理

多模态大模型时代下的文档图像智能分析与处理 0. 前言1. 人工智能发展历程1.1 传统机器学习1.2 深度学习1.3 多模态大模型时代 2. CCIG 文档图像智能分析与处理论坛2.1 文档图像智能分析与处理的重要性和挑战2.2 文档图像智能分析与处理高峰论坛2.3 走进合合信息 3. 文档图像智…...

SAP-MM-内向外向交货单

1、内向&外向交货单概念 外向交货&#xff08;outbound delivery&#xff09;是用在客户与企业之间的交货单&#xff0c;而内向交货&#xff08;inbound delivery&#xff09;则是用在供应商与企业之间的交货单&#xff1b;换言之&#xff0c;外向交货多用于SD 模块&#…...

Mysql - date、datetime、timestamp 的区别

date、datetime 的区别 顾名思义&#xff0c;date 日期&#xff0c;datetime 日期时间&#xff0c;所以 date 是 datetime 的日期部分MySQL 以 格式检索和显示 datetime 值 YYYY-MM-DD hh:mm:ss datetime 支持的日期时间范围 1000-01-01 00:00:00 ~ 9999-12-31 23:59:59 d…...

离散数学_十章-图 ( 4 ):图的表示和图的同构

&#x1f4f7;10.4 图的表示和图的同构 1. 图的表示1.1 邻接表1.1.1 简单图的邻接表1.1.2 有向图的邻接表 1.2 邻接矩阵❗在邻接表和邻接矩阵之间取舍1.3 关联矩阵 2. 图同构3. ⚡判断两个简单图是否同构 图的表示方式有很多种&#xff0c;选择最方便的表示有助于对图的处理~ …...

MySQL锁的分类

MySQL锁的分类 全局锁 表级锁 ● 表锁 ● 元数据锁&#xff0c;Meta Data Lock&#xff0c;MDL锁 ● 意向锁 ● AUTO_INC 锁 行级锁(Innodb引擎牛比的地方) ● record lock&#xff0c;记录锁&#xff0c;也就是仅仅把一条记录给锁上了 ● gap lock&#xff0c;间隙锁&#xff…...

程序员如何给变量起名字

程序员如何给变量起名字 在编写代码时&#xff0c;为变量命名是非常重要的。良好的命名习惯可以提高代码的可读性和可维护性&#xff0c;使得其他开发者能够更容易地理解你的代码。在这篇文章中&#xff0c;我们将讨论程序员如何为变量选择合适的名称。 规范 首先&#xff0…...

隔板法(求解的组数)

文章目录 隔板法&#xff08;求解的组数&#xff09;隔板法扩展 例题 隔板法&#xff08;求解的组数&#xff09; 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 隔板法 隔板法能够解决的问题&#xff1a; 求线性不定方程的解的组数求相同元素分组的方案数 给我们 …...

智能文档处理黑科技,拥抱更高效的数字世界

目录 0 写在前面1 为何要关注智慧文档&#xff1f;2 图像弯曲矫正3 手写板反光擦除4 版面元素检测5 文档篡改检测总结 0 写在前面 近期&#xff0c;中国图象图形学学会文档图像分析与识别专业委员会与上海合合信息科技有限公司联合打造了《文档图像智能分析与处理》高峰论坛。…...

vue ts写法

Vue.js 和 TypeScript 结合使用可以让你的项目更加健壮和易于维护。在 Vue 3 中&#xff0c;你可以使用 Vue.js 的 Composition API 和 TypeScript 一起使用。以下是一个简单的 Vue.js 和 TypeScript 结合使用的例子&#xff1a; 首先&#xff0c;确保你已经安装了 Vue.js 和 T…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞&#xff0c;需构建多层防御体系&#xff0c;结合技术验证、存储隔离与权限控制&#xff1a; &#x1f512; 一、基础防护层 前端校验&#xff08;仅辅助&#xff09; 通过JavaScript限制文件后缀名&#xff08;白名单&#xff09;和大小&#xff0c;提…...

虚幻基础:角色旋转

能帮到你的话&#xff0c;就给个赞吧 &#x1f618; 文章目录 移动组件使用控制器所需旋转&#xff1a;组件 使用 控制器旋转将旋转朝向运动&#xff1a;组件 使用 移动方向旋转 控制器旋转和移动旋转 缺点移动旋转&#xff1a;必须移动才能旋转&#xff0c;不移动不旋转控制器…...

Selenium 查找页面元素的方式

Selenium 查找页面元素的方式 Selenium 提供了多种方法来查找网页中的元素&#xff0c;以下是主要的定位方式&#xff1a; 基本定位方式 通过ID定位 driver.find_element(By.ID, "element_id")通过Name定位 driver.find_element(By.NAME, "element_name"…...

【大厂机试题解法笔记】矩阵匹配

题目 从一个 N * M&#xff08;N ≤ M&#xff09;的矩阵中选出 N 个数&#xff0c;任意两个数字不能在同一行或同一列&#xff0c;求选出来的 N 个数中第 K 大的数字的最小值是多少。 输入描述 输入矩阵要求&#xff1a;1 ≤ K ≤ N ≤ M ≤ 150 输入格式 N M K N*M矩阵 输…...

SQLSERVER-DB操作记录

在SQL Server中&#xff0c;将查询结果放入一张新表可以通过几种方法实现。 方法1&#xff1a;使用SELECT INTO语句 SELECT INTO 语句可以直接将查询结果作为一个新表创建出来。这个新表的结构&#xff08;包括列名和数据类型&#xff09;将与查询结果匹配。 SELECT * INTO 新…...

汇编语言学习(三)——DoxBox中debug的使用

目录 一、安装DoxBox&#xff0c;并下载汇编工具&#xff08;MASM文件&#xff09; 二、debug是什么 三、debug中的命令 一、安装DoxBox&#xff0c;并下载汇编工具&#xff08;MASM文件&#xff09; 链接&#xff1a; https://pan.baidu.com/s/1IbyJj-JIkl_oMOJmkKiaGQ?pw…...