当前位置: 首页 > news >正文

实战Flink Java api消费kafka实时数据落盘HDFS

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><hbase.version>2.4.0</hbase.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.6</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase.version}</version><exclusions><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@hadoop10:/opt/app</url></configuration></plugin></plugins></build></project>
  • 依赖参考
    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 添加Sink,将Kafka数据直接写入HDFSds1.addSink(sink);ds1.print();env.execute("Flink Kafka HDFS");}
}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。
在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。
我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。
在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。
    在这里插入图片描述

在这里插入图片描述

package day2;import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop10:9092");properties.setProperty("group.id", "test");// 使用FlinkKafkaConsumer作为数据源DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));String outputPath = "hdfs://hadoop10:8020/out240102";// 使用StreamingFileSink将数据写入HDFSStreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).build();// 在一个时间窗口内将数据写入HDFSds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction.addSink(sink);// 执行程序env.execute("Flink Kafka HDFS");}
}
package day2;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class CustomProcessFunction extends ProcessFunction<String, String> {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 在这里可以添加具体的逻辑,例如将数据写入HDFSSystem.out.println(value);  // 打印结果到屏幕out.collect(value);}
}

相关文章:

实战Flink Java api消费kafka实时数据落盘HDFS

文章目录 1 需求分析2 实验过程2.1 启动服务程序2.2 启动kafka生产 3 Java API 开发3.1 依赖3.2 代码部分 4 实验验证STEP1STEP2STEP3 5 时间窗口 1 需求分析 在Java api中&#xff0c;使用flink本地模式&#xff0c;消费kafka主题&#xff0c;并直接将数据存入hdfs中。 flin…...

爬虫与反爬-localStorage指纹(某易某盾滑块指纹检测)(Hook案例)

概述&#xff1a;本文将用于了解爬虫中localStorage的检测原理以及讲述一个用于检测localStorage的反爬虫案例&#xff0c;最后对该参数进行Hook断点定位 目录&#xff1a; 一、LocalStorage 二、爬虫中localStorage的案例&#xff08;以某盾滑块为例&#xff09; 三、如何…...

聊一聊 webpack 和 vite 的开发服务代理的问题

webpack 和 vite webpackVite重新编辑的问题 changOrigin: true如何定义 /api ? webPack And Vite 都是两个比较好用的打包工具&#xff0c;尤其是 Vite, 几几年流行忘记了&#xff0c;特色就是服务启动极快&#xff0c;实现预加载&#xff0c;感觉 webPack 要比 Vite 要复杂一…...

【鸿蒙4.0】安装DevEcoStudio

1.下载安装包 HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者华为鸿蒙DevEco Studio是面向全场景的一站式集成开发环境,&#xff0c;在鸿蒙官网下载或升级操作系统开发工具DevEco Studio最新版本&#xff0c;SDK配置和下载&#xff0c;2.1支持Mac、Windows操作系统。…...

[概率论]四小时不挂猴博士

贝叶斯公式是什么 贝叶斯公式是概率论中的一个重要定理&#xff0c;用于计算在已知一些先验信息的情况下&#xff0c;更新对事件发生概率的估计。贝叶斯公式的表达式如下&#xff1a; P(A|B) P(B|A) * P(A) / P(B) 其中&#xff0c;P(A|B)表示在事件B发生的条件下事件A发生的概…...

算法通关村第二十关-黄金挑战图的常见算法

大家好我是苏麟 , 今天聊聊图的常见算法 . 图里的算法是很多的&#xff0c;这里我们介绍一些常见的图算法。这些算法一般都比较复杂&#xff0c;我们这里介绍这些算法的基本含义&#xff0c;适合面试的时候装*&#xff0c;如果手写&#xff0c;那就不用啦。 图分析算法&#xf…...

服务器内存不足怎么办?会有什么影响?

服务器内存&#xff0c;也被称为RAM&#xff08;Random Access Memory&#xff09;&#xff0c;是一种临时存储设备&#xff0c;用于临时存放正在运行的程序和数据。它是服务器上的超高速存储介质&#xff0c;可以快速读取和写入数据&#xff0c;提供给CPU进行实时计算和操作。…...

GPT实战系列-简单聊聊LangChain

GPT实战系列-简单聊聊LangChain LLM大模型相关文章&#xff1a; GPT实战系列-ChatGLM3本地部署CUDA111080Ti显卡24G实战方案 GPT实战系列-Baichuan2本地化部署实战方案 GPT实战系列-大话LLM大模型训练 GPT实战系列-探究GPT等大模型的文本生成 GPT实战系列-Baichuan2等大模…...

【读书笔记】《白帽子讲web安全》浏览器安全

目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展&#xff0c;人们发现浏览器才是互联网最大的入口&#xff0c;绝大多数用户使用互联…...

海外服务器2核2G/4G/8G和4核8G配置16M公网带宽优惠价格表

腾讯云海外服务器租用优惠价格表&#xff0c;2核2G10M带宽、2核4G12M、2核8G14M、4核8G16M配置可选&#xff0c;可以选择Linux操作系统或Linux系统&#xff0c;相比较Linux服务器价格要更优惠一些&#xff0c;腾讯云服务器网txyfwq.com分享腾讯云国外服务器租用配置报价&#x…...

Linux 编译安装 Nginx

目录 一、前言二、四种安装方式介绍三、本文安装方式&#xff1a;源码安装3.1、安装依赖库3.2、开始安装 Nginx3.3、Nginx 相关操作3.4、把 Nginx 注册成系统服务 四、结尾 一、前言 Nginx 是一款轻量级的 Web 服务器、[反向代理]服务器&#xff0c;由于它的内存占用少&#xf…...

Oracle文件自动“减肥”记

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…...

【csharp】抽象类与接口有哪些不同?什么时候应该使用抽象类?

抽象类与接口有哪些不同&#xff1f; 抽象类和接口是在面向对象编程中两个不同的概念&#xff0c;它们有一些重要的区别。以下是抽象类和接口的主要不同点&#xff1a; 抽象类&#xff08;Abstract Class&#xff09;&#xff1a; 成员类型&#xff1a; 抽象类可以包含抽象方…...

最新-mybatis-plus 3.5分页插件配置

mybatis-plus 3.5分页插件配置 前提 1.项目不是springboot, 是以前的常规spring项目 2.mp 从3.2升级到3.5&#xff0c;升级后发现原本的分页竟然不起作用了&#xff0c;每次查询都是查出所有 前后配置对比 jar包对比 jsqlparser我这里单独引了包&#xff0c;因为版本太低…...

案例098:基于微信小程序的电子购物系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…...

亚信安慧AntDB数据库:数字化时代的数据库创新引领者

AntDB数据库以其卓越的创新能力&#xff0c;集中体现在融合统一与实时处理两大关键领域。作为一款服务全国超过10亿用户的分布式数据库&#xff0c;其独特之处在于长期积累的经验、多样性的支持能力、快速响应的数据处理速度以及卓越的系统稳定性。AntDB不仅仅是一个数据库系统…...

【MySQL】关于日期转换的方法

力扣题 1、题目地址 1853. 转换日期格式 2、模拟表 表: Days Column NameTypedaydate day 是这个表的主键。 3、要求 给定一个Days表&#xff0c;请你编写SQL查询语句&#xff0c;将Days表中的每一个日期转化为"day_name, month_name day, year"格式的字符串…...

Ubuntu 虚拟机挂接 Windows 目录

Windows 共享目录 首先 Windows 下共享目录 我这里偷懒直接直接 Everyone &#xff0c;也可以指定用户啥的 Ubuntu 挂接 挂接命令&#xff0c;类似如下&#xff1a; sudo mount -o usernamefananchong,passwordxxxx,uid1000,gid1000,file_mode0644,dir_mode0755,dynperm //…...

机器学习模型可解释性的结果分析

模型的可解释性是机器学习领域的一个重要分支&#xff0c;随着 AI 应用范围的不断扩大&#xff0c;人们越来越不满足于模型的黑盒特性&#xff0c;与此同时&#xff0c;金融、自动驾驶等领域的法律法规也对模型的可解释性提出了更高的要求&#xff0c;在可解释 AI 一文中我们已…...

静态网页设计——环保网(HTML+CSS+JavaScript)(dw、sublime Text、webstorm、HBuilder X)

前言 声明&#xff1a;该文章只是做技术分享&#xff0c;若侵权请联系我删除。&#xff01;&#xff01; 感谢大佬的视频&#xff1a; https://www.bilibili.com/video/BV1BC4y1v7ZY/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术&#xff1a;HTMLCSSJS&#xff08;…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

QT开发技术【ffmpeg + QAudioOutput】音乐播放器

一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下&#xff0c;音视频内容犹如璀璨繁星&#xff0c;点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频&#xff0c;到在线课堂中知识渊博的专家授课&#xff0c;再到影视平台上扣人心弦的高清大片&#xff0c;音…...

python打卡第47天

昨天代码中注意力热图的部分顺移至今天 知识点回顾&#xff1a; 热力图 作业&#xff1a;对比不同卷积层热图可视化的结果 def visualize_attention_map(model, test_loader, device, class_names, num_samples3):"""可视化模型的注意力热力图&#xff0c;展示模…...

云原生时代的系统设计:架构转型的战略支点

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、云原生的崛起&#xff1a;技术趋势与现实需求的交汇 随着企业业务的互联网化、全球化、智能化持续加深&#xff0c;传统的 I…...