Kafka系列(四)
本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。
kafkastream
关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。
简介
简介一下kafkaStream
Kafka Stream的特点
-
Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
-
除了Kafka外,无任何外部依赖
-
充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)
-
通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
-
支持正好一次处理语义
-
提供记录级的处理能力,从而实现毫秒级的低延迟
-
支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
-
同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
关键概念
-
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
-
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
Kstream
(1)数据结构类似于map,key-value键值对
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果流处理应用是要总结每个用户的价值,它将返回alice,4
。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。
代码实现
依赖
<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
kafkaStream配置类
需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。
/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}
这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。
stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。
streamhandler代码
具体的每一行代码的含义结合个人理解都在注释里面。
package com.neu.article.stream;import com.alibaba.fastjson.JSON;import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434(文章id) 和 value: likes:1 当前文章点赞一次//mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value->aggValue,聚合之后的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {/**** @param key 消息的key :mess.getArticleId().toString()* @param value 消息的value likes:1* @param aggValue 初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0* @return*/@Overridepublic String apply(String key, String value, String aggValue) {System.out.println(value);if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {//agg遍历第一次的时候最开始为 COLLECTION:0String[] split = agg.split(":");//split[0] = COLLECTION split[1] = 0/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作 likes:1*/String[] valAry = value.split(":");//valAry[0] = likes valAry[1] = 1switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}//返回值是有要求的,必须与初始化apply方法的返回值形式一致String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}//Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{//key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}
相关文章:

Kafka系列(四)
本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。 kafkastream 关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网&#…...

【Linux学习】进程信号
目录 十七.进程信号 导言 17.1 linux中的信号列表 17.2 标准信号与实时信号 17.3 信号的产生 17.3.1 通过终端按键产生信号 17.3.2 调用系统函数产生信号 17.3.3 软件条件产生信号 17.3.4 硬件异常产生信号 17.3.5 【补充】核心转储 Core Dump 17.4 信号的阻塞 17.4.1 信号相关…...

机器学习没那么难,Azure AutoML帮你简单3步实现自动化模型训练
在Machine Learning 这个领域,通常训练一个业务模型的难点并不在于算法的选择,而在于前期的数据清理和特征工程这些纷繁复杂的工作,训练过程中的问题在于参数的反复迭代优化。 AutoML 是 Azure Databricks 的一项功能,它自动的对…...

数学建模实战Matlab绘图
二维曲线、散点图 绘图命令:plot(x,y,’line specifiers’,’PropertyName’,PropertyValue) 例子:绘图表示年收入与年份的关系 ‘--r*’:--设置线型;r:设置颜色为红色;*节点型号 ‘linewidth’:设置线宽࿱…...
TypeError the JSON object must be str, bytes or bytearray, not ‘list‘
在使用python的jason库时,偶然碰到以下问题 TypeError: the JSON object must be str, bytes or bytearray, not ‘list’ 通过如下代码可复现问题 >>> a [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> import json >>> ra json.loads(a) Trac…...

数字IC后端设计实现 | PR工具中到底应该如何控制density和congestion?(ICC2Innovus)
吾爱IC社区星友提问:请教星主和各位大佬,对于一个模块如果不加干预工具会让inst挤成一团,后面eco修时序就没有空间了。如果全都加instPadding会导致面积不够overlap,大家一般怎么处理这种问题? 在数字IC后端设计实现中…...
产品经理与产品运营的区别和联系
一、两者的职责区别 产品经理的目的:是创造有价值的产品 产品运营的目的:是让产品能有效的发挥出它应有的价值 二、两者的工作内容区别产品经理的工作内容 产品的经理的目的是创造有价值的产品,因此产品经理的所有工作都是围绕着…...

CMU15-445-Spring-2023-分布式DBMS初探(lec21-24)
Lecture #21_ Introduction to Distributed Databases Distributed DBMSs 分布式 DBMS 将单个逻辑数据库划分为多个物理资源。应用程序(通常)并不知道数据被分割在不同的硬件上。系统依靠单节点 DBMS 的技术和算法来支持分布式环境中的事务处理和查询执…...
Arch linux 安装
Arch linux 安装 介绍下载制作iSO启动盘安装arch linux设置字体连接互联网 安装过程磁盘分区设置设置镜像源设置引导文件挂载点安装base等基础软件生成fatab文件更改时区更改编码、语言更改编码更改语言 用户管理设置root密码新建普通用户 安装grub启动网络服务/GDM查看系统网络…...

最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作
详情点击链接:最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析,AI画图,图像识别,文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Clau…...

【leetcode】移除元素
大家好,我是苏貝,本篇博客带大家刷题,如果你觉得我写的还不错的话,可以给我一个赞👍吗,感谢❤️ 目录 一.暴力求解法二.使用额外数组三.原地修改数组 点击查看题目 一.暴力求解法 若我们不考虑时间复杂度…...

Spring Boot整合Redis的高效数据缓存实践
引言 在现代Web应用开发中,数据缓存是提高系统性能和响应速度的关键。Redis作为一种高性能的缓存和数据存储解决方案,被广泛应用于各种场景。本文将研究如何使用Spring Boot整合Redis,通过这个强大的缓存工具提高应用的性能和可伸缩性。 整合…...
FastApi-参数接收的正确使用(2)
前言 本文是该专栏的第2篇,后面会持续分享FastApi以及项目实战的各种干货知识,值得关注。 本文重点介绍,在使用FastApi使用“参数接收”时遇到的三种类型“路径参数”,“查询参数”,“请求体”的相关问题以及相应的解决方案。 具体详细知识点,跟着笔者直接往下看正文。…...
三、需求规格说明书(软件工程示例)
1.引言 1.1编写目的 1.2项目背景 1.3定义 1.4参考资料 2.任务概述 2.1目标 2.2运行环境 2.3条件与限制 3.数据描述 3.1静态数据 3.2动态数据 3.3数据库介绍 3.4数据词典 3.5数据采集 4.功能需求 …...
Elasticsearch 查询语句概述
目录 1. Match Query 2. Term Query 3. Terms Query 4. Range Query 5. Bool Query 6. Wildcard Query 7. Fuzzy Query 8. Prefix Query 9. Aggregation Query Elasticsearch 是一个基于 Lucene 的搜索引擎,提供了丰富的查询DSL(Domain Specifi…...

kafka简单介绍和代码示例
“这是一篇理论文章,给大家讲一讲kafka” 简介 在大数据领域开发者常常会听到MQ这个术语,该术语便是消息队列的意思, Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年…...

一次解决ForkJoinPool日志追踪的辛酸经历
本文主要分享了一次解决ForkJoinPool日志追踪的辛酸经历。历时3个月终于找到通用的解决方案,以此文分享给有需要的你。 一、需求背景 1.某日,某同事根据日志ID排查生产环境问题过程中,发现日志不全 2.经排查发现中间有很多线程为ForkJoinP…...

VM使用教程--SDK取图 视频笔记
本笔记均由海康机器人官网的V学院视频中记录所得,属于省流大师了[doge] 图像采集 图像采集包括1图像源,2多图采集,3输出图像,4缓存图像,5光源 1图像源 图像源包括本地图像,相机采图,SDK 本…...
11.spring boot 启动源码(一)
目录 概述SpringApplication静态方法构造方法run 实例方法配置文件Actuator 工作原理*EndpointAutoConfigurationBeansEndpointAutoConfigurationShutdownEndpointAutoConfiguration结束概述 spring boot 版本 2.6.13 spring boot 启动源码(一) 涉及 SpringApplication 中静态…...

【微服务】springcloud集成sleuth与zipkin实现链路追踪
目录 一、前言 二、分布式链路调用问题 三、链路追踪中的几个概念 3.1 什么是链路追踪 3.2 常用的链路追踪技术 3.3 链路追踪的几个术语 3.3.1 span 编辑 3.3.2 trace 3.3.3 Annotation 四、sluth与zipkin概述 4.1 sluth介绍 4.1.1 sluth是什么 4.1.2 sluth核心…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...

《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
深入理解 React 样式方案
React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...

CVE-2023-25194源码分析与漏洞复现(Kafka JNDI注入)
漏洞概述 漏洞名称:Apache Kafka Connect JNDI注入导致的远程代码执行漏洞 CVE编号:CVE-2023-25194 CVSS评分:8.8 影响版本:Apache Kafka 2.3.0 - 3.3.2 修复版本:≥ 3.4.0 漏洞类型:反序列化导致的远程代…...