当前位置: 首页 > news >正文

Flink+Kafka消费

引入jar

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){//定义消费者信息Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.131.147:9092");properties.put("group.id", "flink-consumer-kafka-group");properties.put("auto.offset.reset", "latest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

相关文章:

Flink+Kafka消费

引入jar <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version> </dependency> <dependency><groupId>org.apache.flink</groupId><artifactI…...

Seconds_Behind_Master越来越大,主从同步延迟

问题现象 发现从库mysql_slave的参数Seconds_Behind_Master越来越大。已排除主从服务器时间不一致&#xff1b;那么主要就判断两点&#xff1a;是io thread慢还是 sql thread慢&#xff1f;先观察show slave status\G 。 判断3个参数&#xff08;参数后面的值是默认空闲时候的…...

除法求值[中等]

一、题目 给你一个变量对数组equations和一个实数值数组values作为已知条件&#xff0c;其中equations[i] [Ai, Bi]和values[i]共同表示等式Ai / Bi values[i]。每个Ai或Bi是一个表示单个变量的字符串。另有一些以数组queries表示的问题&#xff0c;其中queries[j] [Cj, Dj…...

新时代商业市场:AR技术的挑战与机遇并存

随着科技的不断发展&#xff0c;增强现实&#xff08;AR&#xff09;技术逐渐成为当今社会的一个重要组成部分。AR技术能够将虚拟世界与现实世界相结合&#xff0c;为人们提供更加丰富、多样化的体验。在新时代的社会商业市场中&#xff0c;AR技术也正逐渐被应用于各种商业活动…...

RHEL8中ansible的使用

编写ansible.cfg和清单文件ansible的基本用法 本章实验三台RHEL8系统&#xff08;rhel801&#xff0c;rhel802&#xff0c;rhel803&#xff09;&#xff0c;其中rhel801是ansible主机 这里要确保ansible主机能够解析所有被管理的机器&#xff0c;这里通过配置/etc/hosts来实现…...

【1.6计算机组成与体系结构】存储系统

目录 1.层次化存储结构2.Cache2.1 Cache的介绍2.2 局部性原理2.3 Cache应用 1.层次化存储结构 由 ⬆ CPU&#xff1a;寄存器。 快 ⬆ Cache&#xff1a;按内容存取(相联存储器)。 到 ⬆内存&#xff08;主存&#xff09;&#xff1a;DRAM。 慢 ⬆ 外存&#xff08;辅存&#…...

TCP/UDP 协议

目录 一.TCP协议 1.介绍 2.报文格式 ​编辑 确认号 控制位 窗口大小 3.TCP特性 二.TCP协议的三次握手 1.tcp 三次握手的过程 三.四次挥手 2.有限状态机 四.tcp协议和udp协议的区别 五.udp协议 UDP特性 六.telnet协议 一.TCP协议 1.介绍 TCP&#xff08;Transm…...

如何正确理解和使用 Golang 中 nil ?

目录 指针中的 nil 切片中的 nil map 中的 nil 通道中的 nil 函数中的 nil 接口中的 nil 避免 nil 相关问题的最佳实践 小结 在 Golang 中&#xff0c;nil 是一个预定义的标识符&#xff0c;在不同的上下文环境中有不同的含义&#xff0c;但通常表示“无”、“空”或“…...

IDEA新建jdk8 spring boot项目

今天新建spring boot项目发现JDK版本最低可选17。 但是目前用的最多的还是JDK8啊。 解决办法 Server URL中设置&#xff1a; https://start.aliyun.com/设置完成后&#xff0c;又可以愉快的用jdk8创建项目了。 参考 https://blog.csdn.net/imbzz/article/details/13469117…...

Qt/C++音视频开发59-使用mdk-sdk组件/原qtav作者力作/性能凶残/超级跨平台

一、前言 最近一个月一直在研究mdk-sdk音视频组件&#xff0c;这个组件是原qtav作者的最新力作&#xff0c;提供了各种各样的示例demo&#xff0c;不仅限于支持C&#xff0c;其他各种比如java/flutter/web/android等全部支持&#xff0c;性能上也是杠杠的&#xff0c;目前大概…...

智安网络|企业网络安全工具对比:云桌面与堡垒机,哪个更适合您的需求

随着云计算技术的快速发展&#xff0c;越来越多的企业开始采用云计算解决方案来提高效率和灵活性。在云计算环境下&#xff0c;云桌面和堡垒机被广泛应用于企业网络安全和办公环境中。尽管它们都有助于提升企业的安全和效率&#xff0c;但云桌面和堡垒机在功能和应用方面存在着…...

Git忽略已经提交的文件

原理类似于 Android修改submodule的lib包名...

MVVM和MVC以及MVP的原理以及它们的区别

MVVM、MVC 和 MVP 都是前端架构模式&#xff0c;它们各自有不同的原理和特点。 MVC&#xff08;Model-View-Controller&#xff09; 原理&#xff1a;MVC 将应用程序分为三个部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&a…...

WeChatMsg: 导出微信聊天记录 | 开源日报 No.108

Mozilla-Ocho/llamafile Stars: 3.5k License: NOASSERTION llamafile 是一个开源项目&#xff0c;旨在通过将 lama.cpp 与 Cosmopolitan Libc 结合成一个框架&#xff0c;将 LLM (Large Language Models) 的复杂性折叠到单个文件可执行程序中&#xff0c;并使其能够在大多数…...

Python学习之复习MySQL-Day3(DQL)

目录 文章声明⭐⭐⭐让我们开始今天的学习吧&#xff01;DQL简介基本查询查询多个/全部字段设置别名去除重复记录 条件查询条件查询介绍实例演示 聚合函数什么是聚合函数&#xff1f;常见的聚合函数实例演示 分组查询分组查询语法where 和 having 的区别实例演示 排序查询语法实…...

AI超级个体:ChatGPT与AIGC实战指南

目录 前言 一、ChatGPT在日常工作中的应用场景 1. 客户服务与支持 2. 内部沟通与协作 3. 创新与问题解决 二、巧用ChatGPT提升工作效率 1. 自动化工作流程 2. 信息整合与共享 3. 提高决策效率 三、巧用ChatGPT创造价值 1. 优化产品和服务 2. 提高员工满意度和留任率…...

SpringBoot集成websocket(5)|(使用OkHttpClient实现websocket以及详细介绍)

SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09; 文章目录 SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09;[TOC] 前言一、初始…...

Kafka-Kafka基本原理与集群快速搭建(实践)

Kafka单机搭建 下载Kafka Apache Download Mirrors 解压 tar -zxvf kafka_2.12-3.4.0.tgz -C /usr/local/src/software/kafkakafka内部bin目录下有个内置的zookeeper(用于单机) 启动zookeeper&#xff08;在后台启动&#xff09; nohup bin/zookeeper-server-start.sh conf…...

Elasticsearch 进阶(索引、类型、字段、分片、副本、集群等详细说明)-06

笔记来源&#xff1a;Elasticsearch Elasticsearch进阶 进阶-核心概念 索引Index 一个索引就是一个拥有几分相似特征的文档的集合。比如说&#xff0c;你可以有一个客户数据的索引&#xff0c;另一个产品目录的索引&#xff0c;还有一个订单数据的索引。一个索引由一个名字…...

hive的分区表和分桶表详解

分区表 Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录&#xff0c;每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区&#xff0c;这样的查询效率会提高很多。 静态分区表基本语法 创建分区表 create table dept_p…...

日志分散难管理?用Visual Syslog Server实现企业级日志集中监控的5个实战方案

日志分散难管理&#xff1f;用Visual Syslog Server实现企业级日志集中监控的5个实战方案 【免费下载链接】visualsyslog Syslog Server for Windows with a graphical user interface 项目地址: https://gitcode.com/gh_mirrors/vi/visualsyslog 痛点诊断&#xff1a;日…...

别再死记硬背了!用Python脚本+Modbus Poll工具,5分钟搞懂Modbus功能码怎么用

用PythonModbus Poll实战&#xff1a;5分钟解锁功能码核心逻辑 第一次接触Modbus协议时&#xff0c;那些晦涩的功能码总让我头疼——01H、03H、05H这些十六进制代码就像天书&#xff0c;文档里的理论描述看完就忘。直到我发现用Python脚本配合Modbus Poll工具进行实操测试&…...

从串口通信到内存总线:手把手拆解‘波特率’、‘比特率’与‘总线带宽’的异同与实战计算

从串口通信到内存总线&#xff1a;深度解析波特率、比特率与总线带宽的实战差异 在嵌入式开发和计算机体系结构领域&#xff0c;数据传输速率的计算是工程师日常工作中无法绕开的基础技能。但令人困惑的是&#xff0c;同样的"速率"概念在不同场景下却有着完全不同的…...

Solidity 智能合约入门:从 0 到 1 编写第一个区块链合约

一、什么是 Solidity&#xff1f; Solidity 是一门面向以太坊虚拟机&#xff08;EVM&#xff09;、静态类型的高级编程语言&#xff0c;专门用于编写区块链上的智能合约。 简单来说&#xff1a; 智能合约 运行在区块链上的自动执行代码&#xff08;无需第三方&#xff0c;代…...

无人机控制中的模糊控制:一维与二维模糊控制及其实现要点

无人机 控制方面 模糊控制 有一维模糊和二维模糊两种&#xff0c;文字说明资料已遗失&#xff0c;数学模型可以根据仿真图推导&#xff0c;直接运维simulink会报错&#xff0c;是因为没有导入模糊规则&#xff0c;在运行simulink之前需要在命令窗口输入workreadfis work.fis ,这…...

AI智能体工作完整源码大公开!企业级多Agent框架,一键私有化部署

温馨提示&#xff1a;文末有资源获取方式最近“龙虾AI”的热度席卷技术圈&#xff0c;大家都在讨论如何“养殖”自己的智能体。但真正落地时&#xff0c;技术门槛、Token消耗与复杂的协同问题&#xff0c;往往让普通用户和企业望而却步。今天我们不谈概念&#xff0c;直接分享一…...

Hutool CronUtil实战:5分钟搞定Spring Boot定时任务(含动态任务配置)

Hutool CronUtil实战&#xff1a;5分钟搞定Spring Boot定时任务&#xff08;含动态任务配置&#xff09; 在Java开发领域&#xff0c;定时任务几乎是每个项目都绕不开的基础需求。传统方案如Spring Scheduler虽然简单易用&#xff0c;但在动态任务管理和细粒度控制方面往往力不…...

国产数据库新选择:SpringBoot集成KingbaseES的性能优化全攻略

SpringBoot集成KingbaseES性能调优实战指南 当企业级应用遇到国产数据库新贵KingbaseES&#xff0c;性能优化便成为开发者最关心的核心议题。作为一款兼容PostgreSQL协议的高性能国产数据库&#xff0c;KingbaseES在金融、政务等关键领域展现出越来越强的竞争力。但要让SpringB…...

Stable Diffusion像素艺术工作站:Pixel Fashion Atelier支持LoRA在线热切换

Stable Diffusion像素艺术工作站&#xff1a;Pixel Fashion Atelier支持LoRA在线热切换 1. 像素时装锻造坊简介 Pixel Fashion Atelier是一款基于Stable Diffusion与Anything-v5的图像生成工作站&#xff0c;专为像素艺术创作而设计。与传统AI工具不同&#xff0c;它采用了复…...

用 OpenAI Codex 打造你的 AI 结对编程助手

用 OpenAI Codex 打造你的 AI 结对编程助手 告别重复劳动&#xff0c;让 AI 直接帮你写代码、修 Bug、跑测试 在 AI 编程工具层出不穷的今天&#xff0c;OpenAI Codex 依然是许多开发者心目中的“神器”。与普通的代码补全工具不同&#xff0c;Codex 是一款终端原生的 AI 编程助…...