当前位置: 首页 > news >正文

浅析Kafka Streams消息流式处理流程及原理

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手动提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 异步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并处理流数据。* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。** @param streamsBuilder 用于构建KStream对象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 从"kafka-stream-topic-input"主题中读取数据流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值进行分组,为后续的窗口化计数操作做准备.groupBy((key, value) -> value)// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 将计数结果转换为流,以便进行进一步的处理和转换.toStream()// 显示键值对的内容,并将键和值转换为字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 将处理后的流数据发送到"kafka-stream-topic-output"主题.to("kafka-stream-topic-output");}}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

在这里插入图片描述
在这里插入图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

相关文章:

浅析Kafka Streams消息流式处理流程及原理

以下结合案例&#xff1a;统计消息中单词出现次数&#xff0c;来测试并说明kafka消息流式处理的执行流程 Maven依赖 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusio…...

QGroundControl的总体架构,模块化设计和主要组件的功能。

QGroundControl 总体架构详细描述 QGroundControl (QGC) 作为一个开源地面控制站软件&#xff0c;其设计原则是模块化、高扩展性和高可维护性。 总体架构 QGroundControl 由多个层次构成&#xff0c;每个层次负责不同的功能。这种分层结构确保了系统的高内聚性和低耦合性。 …...

oracle 表空间文件迁移

表空间文件迁移 背景 由于各种原因&#xff0c;在实际工作中可能会出现oracle服务器数据盘空间被占满的情况&#xff0c;这个时候单纯的添加新磁盘&#xff0c;后续表空间文件放新盘的方案已经不适用了&#xff0c;因为源盘已经占用满了&#xff0c;数据库服务会异常&#xf…...

JVM学习(day1)

JVM 运行时数据区 线程共享&#xff1a;方法区、堆 线程独享&#xff08;与个体“同生共死”&#xff09;&#xff1a;虚拟机栈、本地方法栈、程序计数器 程序计数器 作用&#xff1a;记录下次要执行的代码行的行号 特点&#xff1a;为一个没有OOM&#xff08;内存溢出&a…...

js项目生产环境中移除 console

1、terser-webpack-plugin webpack 构建的项目中安装使用 安装&#xff1a; npm install terser-webpack-plugin --save-dev 配置 在webpack.config.js文件中 new TerserPlugin({terserOptions: {output: {comments: false, // 去除注释},warnings: false, // 去除黄色警告,co…...

ROS2 + 科大讯飞 初步实现机器人语音控制

环境配置&#xff1a; 电脑端&#xff1a; ubuntu22.04实体机作为上位机 ROS版本&#xff1a;ros2-humble 实体机器人&#xff1a; STM32 思岚A1激光雷达 科大讯飞语音SDK 讯飞开放平台-以语音交互为核心的人工智能开放平台 实现步骤&#xff1a; 1. 下载和处理科大讯飞语音模…...

HTML5新增的input元素属性:placeholder、required、autofocus、min、max等

HTML5 大幅度地增加与改良了 input 元素的属性&#xff0c;可以简单地使用这些属性来实现 HTML5 之前需要使用 JavaScript 才能实现的许多功能。 下面将详细介绍这些新增的 input 元素的属性。 属性说明属性说明placeholder在输入框显示描述性或提示性文本autocomplete是否保…...

Cornerstone3D导致浏览器崩溃的踩坑记录

WebGL: CONTEXT_LOST_WEBGL: loseContext: context lost ⛳️ 问题描述 在使用vue3vite重构Cornerstone相关项目后&#xff0c;在Mac本地运行良好&#xff0c;但是部署测试环境后&#xff0c;在window系统的Chrome浏览器中切换页面会导致页面崩溃。查看Chrome的任务管理器&am…...

【鸿蒙学习笔记】Stage模型

官方文档&#xff1a;Stage模型开发概述 目录标题 Stage模型好处Stage模型概念图ContextAbilityStageUIAbility组件和ExtensionAbility组件WindowStage Stage模型-组件模型Stage模型-进程模型Stage模型-ArkTS线程模型和任务模型关于任务模型&#xff0c;我们先来了解一下什么是…...

Docker进入MongoDB

先是命令行开启docker镜像&#xff0c;然后进入docker镜像&#xff0c;这是两步 进入之后&#xff0c;开头会变成root&#xff0c;我的理解是进入了另一个linux系统了&#xff0c;直接执行相应的软件 这里直接use databse就是进入了&#xff0c;据说MongoDB是慢启动&#xff0c…...

APP与API:魔法世界的咒语与念咒者

1. 什么是API&#xff1f; API&#xff0c;即应用程序编程接口&#xff08;Application Programming Interface&#xff09;&#xff0c;就像是魔法世界中的咒语。API是两个独立软件系统之间进行通信和数据交换的桥梁。通过API&#xff0c;一个软件系统可以调用另一个软件系统中…...

云计算安全需求分析与安全保护工程

云计算基本概念 云计算&#xff08;Cloud Computing&#xff09;是一种通过互联网提供计算资源和服务的技术。它允许用户按需访问和使用计算资源&#xff0c;如服务器、存储、数据库、网络、安全、分析和软件应用等&#xff0c;而无需管理底层基础设施。以下是云计算的基本概念…...

七天.NET 8操作SQLite入门到实战 - 第二天 在 Windows 上配置 SQLite环境

前言 SQLite的一个重要的特性是零配置的、无需服务器&#xff0c;这意味着不需要复杂的安装或管理。它跟微软的Access差不多&#xff0c;只是一个.db格式的文件。但是与Access不同的是&#xff0c;它不需要安装任何软件&#xff0c;非常轻巧。 七天.NET 8操作SQLite入门到实战…...

操作系统——进程的状态与转换

...

80. UE5 RPG 实现UI显示技能冷却进度功能

在上一篇文章里&#xff0c;我们实现了通过GE给技能增加资源消耗和技能冷却功能。UI也能够显示角色能够使用的技能的UI&#xff0c;现在还有一个问题&#xff0c;我们希望在技能释放进去冷却时&#xff0c;技能变成灰色&#xff0c;并在技能冷却完成&#xff0c;技能可以再次使…...

Vue2-集成路由Vue Router介绍与使用

文章目录 路由&#xff08;Vue2&#xff09;1. SPA 与前端路由2. vue-router基本使用创建路由组件声明路由链接和占位标签创建路由模块挂载路由模块 3. vue-router进阶路由重定向嵌套路由动态路由编程式导航导航守卫 本篇小结 更多相关内容可查看 路由&#xff08;Vue2&#xf…...

TemuAPI接口:获取商品详情功能

temu作为拼多多海外的跨境电商平台&#xff0c;已经在海外电商领域崭露头角&#xff0c;越来越多的外贸人选择temu作为发展平台。今天的接口可以用于获取temu平台的商品详情&#xff0c;包括价格、商品图片、规格、评论等内容&#xff0c;如有需要&#xff0c;请点击文末链接或…...

deepstream读取mp4文件及不同类型视频输入bug解决

在deepstream中使用mp4文件&#xff0c;与rtsp类似&#xff0c;使用uridecodebin即可&#xff0c;&#xff08;可见官方test.py文件&#xff09; def create_source_bin(index, uri):print("Creating source bin")# Create a source GstBin to abstract this bins c…...

Redis服务器统计和配置信息简介

Redis服务器统计和配置信息简介 首先使用INFO命令在Redis中用于获取Redis服务器的各种统计和配置信息;执行上述命令后&#xff0c;返回的信息分为多个部分&#xff0c;包括服务器信息、客户端信息、内存信息、持久化信息、统计信息、复制信息、CPU信息和键空间信息&#xff1b;…...

Linux Mac 安装Higress 平替 Spring Cloud Gateway

Linux Mac 安装Higress 平替 Spring Cloud Gateway Higress是什么?传统网关分类Higress定位下载安装包执行安装命令执行脚本 安装成功打开管理界面使用方法configure.shreset.shstartup.shshutdown.shstatus.shlogs.sh Higress官网 Higress是什么? Higress是基于阿里内部的…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?

大家好&#xff0c;欢迎来到《云原生核心技术》系列的第七篇&#xff01; 在上一篇&#xff0c;我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在&#xff0c;我们就像一个拥有了一块崭新数字土地的农场主&#xff0c;是时…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...