当前位置: 首页 > news >正文

Flink+Kafka消费

引入jar

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){//定义消费者信息Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.131.147:9092");properties.put("group.id", "flink-consumer-kafka-group");properties.put("auto.offset.reset", "latest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

相关文章:

Flink+Kafka消费

引入jar <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version> </dependency> <dependency><groupId>org.apache.flink</groupId><artifactI…...

Seconds_Behind_Master越来越大,主从同步延迟

问题现象 发现从库mysql_slave的参数Seconds_Behind_Master越来越大。已排除主从服务器时间不一致&#xff1b;那么主要就判断两点&#xff1a;是io thread慢还是 sql thread慢&#xff1f;先观察show slave status\G 。 判断3个参数&#xff08;参数后面的值是默认空闲时候的…...

除法求值[中等]

一、题目 给你一个变量对数组equations和一个实数值数组values作为已知条件&#xff0c;其中equations[i] [Ai, Bi]和values[i]共同表示等式Ai / Bi values[i]。每个Ai或Bi是一个表示单个变量的字符串。另有一些以数组queries表示的问题&#xff0c;其中queries[j] [Cj, Dj…...

新时代商业市场:AR技术的挑战与机遇并存

随着科技的不断发展&#xff0c;增强现实&#xff08;AR&#xff09;技术逐渐成为当今社会的一个重要组成部分。AR技术能够将虚拟世界与现实世界相结合&#xff0c;为人们提供更加丰富、多样化的体验。在新时代的社会商业市场中&#xff0c;AR技术也正逐渐被应用于各种商业活动…...

RHEL8中ansible的使用

编写ansible.cfg和清单文件ansible的基本用法 本章实验三台RHEL8系统&#xff08;rhel801&#xff0c;rhel802&#xff0c;rhel803&#xff09;&#xff0c;其中rhel801是ansible主机 这里要确保ansible主机能够解析所有被管理的机器&#xff0c;这里通过配置/etc/hosts来实现…...

【1.6计算机组成与体系结构】存储系统

目录 1.层次化存储结构2.Cache2.1 Cache的介绍2.2 局部性原理2.3 Cache应用 1.层次化存储结构 由 ⬆ CPU&#xff1a;寄存器。 快 ⬆ Cache&#xff1a;按内容存取(相联存储器)。 到 ⬆内存&#xff08;主存&#xff09;&#xff1a;DRAM。 慢 ⬆ 外存&#xff08;辅存&#…...

TCP/UDP 协议

目录 一.TCP协议 1.介绍 2.报文格式 ​编辑 确认号 控制位 窗口大小 3.TCP特性 二.TCP协议的三次握手 1.tcp 三次握手的过程 三.四次挥手 2.有限状态机 四.tcp协议和udp协议的区别 五.udp协议 UDP特性 六.telnet协议 一.TCP协议 1.介绍 TCP&#xff08;Transm…...

如何正确理解和使用 Golang 中 nil ?

目录 指针中的 nil 切片中的 nil map 中的 nil 通道中的 nil 函数中的 nil 接口中的 nil 避免 nil 相关问题的最佳实践 小结 在 Golang 中&#xff0c;nil 是一个预定义的标识符&#xff0c;在不同的上下文环境中有不同的含义&#xff0c;但通常表示“无”、“空”或“…...

IDEA新建jdk8 spring boot项目

今天新建spring boot项目发现JDK版本最低可选17。 但是目前用的最多的还是JDK8啊。 解决办法 Server URL中设置&#xff1a; https://start.aliyun.com/设置完成后&#xff0c;又可以愉快的用jdk8创建项目了。 参考 https://blog.csdn.net/imbzz/article/details/13469117…...

Qt/C++音视频开发59-使用mdk-sdk组件/原qtav作者力作/性能凶残/超级跨平台

一、前言 最近一个月一直在研究mdk-sdk音视频组件&#xff0c;这个组件是原qtav作者的最新力作&#xff0c;提供了各种各样的示例demo&#xff0c;不仅限于支持C&#xff0c;其他各种比如java/flutter/web/android等全部支持&#xff0c;性能上也是杠杠的&#xff0c;目前大概…...

智安网络|企业网络安全工具对比:云桌面与堡垒机,哪个更适合您的需求

随着云计算技术的快速发展&#xff0c;越来越多的企业开始采用云计算解决方案来提高效率和灵活性。在云计算环境下&#xff0c;云桌面和堡垒机被广泛应用于企业网络安全和办公环境中。尽管它们都有助于提升企业的安全和效率&#xff0c;但云桌面和堡垒机在功能和应用方面存在着…...

Git忽略已经提交的文件

原理类似于 Android修改submodule的lib包名...

MVVM和MVC以及MVP的原理以及它们的区别

MVVM、MVC 和 MVP 都是前端架构模式&#xff0c;它们各自有不同的原理和特点。 MVC&#xff08;Model-View-Controller&#xff09; 原理&#xff1a;MVC 将应用程序分为三个部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&a…...

WeChatMsg: 导出微信聊天记录 | 开源日报 No.108

Mozilla-Ocho/llamafile Stars: 3.5k License: NOASSERTION llamafile 是一个开源项目&#xff0c;旨在通过将 lama.cpp 与 Cosmopolitan Libc 结合成一个框架&#xff0c;将 LLM (Large Language Models) 的复杂性折叠到单个文件可执行程序中&#xff0c;并使其能够在大多数…...

Python学习之复习MySQL-Day3(DQL)

目录 文章声明⭐⭐⭐让我们开始今天的学习吧&#xff01;DQL简介基本查询查询多个/全部字段设置别名去除重复记录 条件查询条件查询介绍实例演示 聚合函数什么是聚合函数&#xff1f;常见的聚合函数实例演示 分组查询分组查询语法where 和 having 的区别实例演示 排序查询语法实…...

AI超级个体:ChatGPT与AIGC实战指南

目录 前言 一、ChatGPT在日常工作中的应用场景 1. 客户服务与支持 2. 内部沟通与协作 3. 创新与问题解决 二、巧用ChatGPT提升工作效率 1. 自动化工作流程 2. 信息整合与共享 3. 提高决策效率 三、巧用ChatGPT创造价值 1. 优化产品和服务 2. 提高员工满意度和留任率…...

SpringBoot集成websocket(5)|(使用OkHttpClient实现websocket以及详细介绍)

SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09; 文章目录 SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09;[TOC] 前言一、初始…...

Kafka-Kafka基本原理与集群快速搭建(实践)

Kafka单机搭建 下载Kafka Apache Download Mirrors 解压 tar -zxvf kafka_2.12-3.4.0.tgz -C /usr/local/src/software/kafkakafka内部bin目录下有个内置的zookeeper(用于单机) 启动zookeeper&#xff08;在后台启动&#xff09; nohup bin/zookeeper-server-start.sh conf…...

Elasticsearch 进阶(索引、类型、字段、分片、副本、集群等详细说明)-06

笔记来源&#xff1a;Elasticsearch Elasticsearch进阶 进阶-核心概念 索引Index 一个索引就是一个拥有几分相似特征的文档的集合。比如说&#xff0c;你可以有一个客户数据的索引&#xff0c;另一个产品目录的索引&#xff0c;还有一个订单数据的索引。一个索引由一个名字…...

hive的分区表和分桶表详解

分区表 Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录&#xff0c;每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区&#xff0c;这样的查询效率会提高很多。 静态分区表基本语法 创建分区表 create table dept_p…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

Webpack性能优化:构建速度与体积优化策略

一、构建速度优化 1、​​升级Webpack和Node.js​​ ​​优化效果​​&#xff1a;Webpack 4比Webpack 3构建时间降低60%-98%。​​原因​​&#xff1a; V8引擎优化&#xff08;for of替代forEach、Map/Set替代Object&#xff09;。默认使用更快的md4哈希算法。AST直接从Loa…...

C++ 设计模式 《小明的奶茶加料风波》

&#x1f468;‍&#x1f393; 模式名称&#xff1a;装饰器模式&#xff08;Decorator Pattern&#xff09; &#x1f466; 小明最近上线了校园奶茶配送功能&#xff0c;业务火爆&#xff0c;大家都在加料&#xff1a; 有的同学要加波霸 &#x1f7e4;&#xff0c;有的要加椰果…...