当前位置: 首页 > news >正文

实战Flink Java api消费kafka实时数据落盘HDFS

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><hbase.version>2.4.0</hbase.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.6</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@hadoop10:/opt/app</url></configuration></plugin></plugins></build></project>
  • 依赖参考
    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 添加Sink,将Kafka数据直接写入HDFSds1.addSink(sink);ds1.print();env.execute("Flink Kafka HDFS");}
}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。
在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。
我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。
在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。
    在这里插入图片描述

在这里插入图片描述

package day2;import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 在一个时间窗口内将数据写入HDFSds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction.addSink(sink);// 执行程序env.execute("Flink Kafka HDFS");}
}
package day2;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class CustomProcessFunction extends ProcessFunction<String, String> {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 在这里可以添加具体的逻辑,例如将数据写入HDFSSystem.out.println(value);  // 打印结果到屏幕out.collect(value);}
}

相关文章:

实战Flink Java api消费kafka实时数据落盘HDFS

文章目录 1 需求分析2 实验过程2.1 启动服务程序2.2 启动kafka生产 3 Java API 开发3.1 依赖3.2 代码部分 4 实验验证STEP1STEP2STEP3 5 时间窗口 1 需求分析 在Java api中&#xff0c;使用flink本地模式&#xff0c;消费kafka主题&#xff0c;并直接将数据存入hdfs中。 flin…...

爬虫与反爬-localStorage指纹(某易某盾滑块指纹检测)(Hook案例)

概述&#xff1a;本文将用于了解爬虫中localStorage的检测原理以及讲述一个用于检测localStorage的反爬虫案例&#xff0c;最后对该参数进行Hook断点定位 目录&#xff1a; 一、LocalStorage 二、爬虫中localStorage的案例&#xff08;以某盾滑块为例&#xff09; 三、如何…...

聊一聊 webpack 和 vite 的开发服务代理的问题

webpack 和 vite webpackVite重新编辑的问题 changOrigin: true如何定义 /api ? webPack And Vite 都是两个比较好用的打包工具&#xff0c;尤其是 Vite, 几几年流行忘记了&#xff0c;特色就是服务启动极快&#xff0c;实现预加载&#xff0c;感觉 webPack 要比 Vite 要复杂一…...

【鸿蒙4.0】安装DevEcoStudio

1.下载安装包 HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者华为鸿蒙DevEco Studio是面向全场景的一站式集成开发环境,&#xff0c;在鸿蒙官网下载或升级操作系统开发工具DevEco Studio最新版本&#xff0c;SDK配置和下载&#xff0c;2.1支持Mac、Windows操作系统。…...

[概率论]四小时不挂猴博士

贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理&#xff0c;用于计算在已知一些先验信息的情况下&#xff0c;更新对事件发生概率的估计。贝叶斯公式的表达式如下&#xff1a; P(A|B) P(B|A) * P(A) / P(B) 其中&#xff0c;P(A|B)表示在事件B发生的条件下事件A发生的概…...

算法通关村第二十关-黄金挑战图的常见算法

大家好我是苏麟 , 今天聊聊图的常见算法 . 图里的算法是很多的&#xff0c;这里我们介绍一些常见的图算法。这些算法一般都比较复杂&#xff0c;我们这里介绍这些算法的基本含义&#xff0c;适合面试的时候装*&#xff0c;如果手写&#xff0c;那就不用啦。 图分析算法&#xf…...

服务器内存不足怎么办?会有什么影响?

服务器内存&#xff0c;也被称为RAM&#xff08;Random Access Memory&#xff09;&#xff0c;是一种临时存储设备&#xff0c;用于临时存放正在运行的程序和数据。它是服务器上的超高速存储介质&#xff0c;可以快速读取和写入数据&#xff0c;提供给CPU进行实时计算和操作。…...

GPT实战系列-简单聊聊LangChain

GPT实战系列-简单聊聊LangChain LLM大模型相关文章&#xff1a; GPT实战系列-ChatGLM3本地部署CUDA111080Ti显卡24G实战方案 GPT实战系列-Baichuan2本地化部署实战方案 GPT实战系列-大话LLM大模型训练 GPT实战系列-探究GPT等大模型的文本生成 GPT实战系列-Baichuan2等大模…...

【读书笔记】《白帽子讲web安全》浏览器安全

目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展&#xff0c;人们发现浏览器才是互联网最大的入口&#xff0c;绝大多数用户使用互联…...

海外服务器2核2G/4G/8G和4核8G配置16M公网带宽优惠价格表

腾讯云海外服务器租用优惠价格表&#xff0c;2核2G10M带宽、2核4G12M、2核8G14M、4核8G16M配置可选&#xff0c;可以选择Linux操作系统或Linux系统&#xff0c;相比较Linux服务器价格要更优惠一些&#xff0c;腾讯云服务器网txyfwq.com分享腾讯云国外服务器租用配置报价&#x…...

Linux 编译安装 Nginx

目录 一、前言二、四种安装方式介绍三、本文安装方式&#xff1a;源码安装3.1、安装依赖库3.2、开始安装 Nginx3.3、Nginx 相关操作3.4、把 Nginx 注册成系统服务 四、结尾 一、前言 Nginx 是一款轻量级的 Web 服务器、[反向代理]服务器&#xff0c;由于它的内存占用少&#xf…...

Oracle文件自动“减肥”记

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…...

【csharp】抽象类与接口有哪些不同?什么时候应该使用抽象类?

抽象类与接口有哪些不同&#xff1f; 抽象类和接口是在面向对象编程中两个不同的概念&#xff0c;它们有一些重要的区别。以下是抽象类和接口的主要不同点&#xff1a; 抽象类&#xff08;Abstract Class&#xff09;&#xff1a; 成员类型&#xff1a; 抽象类可以包含抽象方…...

最新-mybatis-plus 3.5分页插件配置

mybatis-plus 3.5分页插件配置 前提 1.项目不是springboot, 是以前的常规spring项目 2.mp 从3.2升级到3.5&#xff0c;升级后发现原本的分页竟然不起作用了&#xff0c;每次查询都是查出所有 前后配置对比 jar包对比 jsqlparser我这里单独引了包&#xff0c;因为版本太低…...

案例098:基于微信小程序的电子购物系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…...

亚信安慧AntDB数据库:数字化时代的数据库创新引领者

AntDB数据库以其卓越的创新能力&#xff0c;集中体现在融合统一与实时处理两大关键领域。作为一款服务全国超过10亿用户的分布式数据库&#xff0c;其独特之处在于长期积累的经验、多样性的支持能力、快速响应的数据处理速度以及卓越的系统稳定性。AntDB不仅仅是一个数据库系统…...

【MySQL】关于日期转换的方法

力扣题 1、题目地址 1853. 转换日期格式 2、模拟表 表: Days Column NameTypedaydate day 是这个表的主键。 3、要求 给定一个Days表&#xff0c;请你编写SQL查询语句&#xff0c;将Days表中的每一个日期转化为"day_name, month_name day, year"格式的字符串…...

Ubuntu 虚拟机挂接 Windows 目录

Windows 共享目录 首先 Windows 下共享目录 我这里偷懒直接直接 Everyone &#xff0c;也可以指定用户啥的 Ubuntu 挂接 挂接命令&#xff0c;类似如下&#xff1a; sudo mount -o usernamefananchong,passwordxxxx,uid1000,gid1000,file_mode0644,dir_mode0755,dynperm //…...

机器学习模型可解释性的结果分析

模型的可解释性是机器学习领域的一个重要分支&#xff0c;随着 AI 应用范围的不断扩大&#xff0c;人们越来越不满足于模型的黑盒特性&#xff0c;与此同时&#xff0c;金融、自动驾驶等领域的法律法规也对模型的可解释性提出了更高的要求&#xff0c;在可解释 AI 一文中我们已…...

静态网页设计——环保网(HTML+CSS+JavaScript)(dw、sublime Text、webstorm、HBuilder X)

前言 声明&#xff1a;该文章只是做技术分享&#xff0c;若侵权请联系我删除。&#xff01;&#xff01; 感谢大佬的视频&#xff1a; https://www.bilibili.com/video/BV1BC4y1v7ZY/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术&#xff1a;HTMLCSSJS&#xff08;…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...