当前位置: 首页 > news >正文

Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

        关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回alice,4。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
       <!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。


/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

package com.neu.article.stream;import com.alibaba.fastjson.JSON;import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434(文章id)   和  value: likes:1 当前文章点赞一次//mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value->aggValue,聚合之后的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {/**** @param key 消息的key :mess.getArticleId().toString()* @param value  消息的value likes:1* @param aggValue   初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0* @return*/@Overridepublic String apply(String key, String value, String aggValue) {System.out.println(value);if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {//agg遍历第一次的时候最开始为 COLLECTION:0String[] split = agg.split(":");//split[0] = COLLECTION  split[1] = 0/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作   likes:1*/String[] valAry = value.split(":");//valAry[0] = likes  valAry[1] = 1switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}//返回值是有要求的,必须与初始化apply方法的返回值形式一致String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}//Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{//key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}

相关文章:

Kafka系列(四)

本文接kafka三&#xff0c;代码实践kafkaStream的应用&#xff0c;用来完成流式计算。 kafkastream 关于流式计算也就是实时处理&#xff0c;无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合&#xff0c;因此了解了kafka。但是本人阅读了kafka地官网&#…...

【Linux学习】进程信号

目录 十七.进程信号 导言 17.1 linux中的信号列表 17.2 标准信号与实时信号 17.3 信号的产生 17.3.1 通过终端按键产生信号 17.3.2 调用系统函数产生信号 17.3.3 软件条件产生信号 17.3.4 硬件异常产生信号 17.3.5 【补充】核心转储 Core Dump 17.4 信号的阻塞 17.4.1 信号相关…...

机器学习没那么难,Azure AutoML帮你简单3步实现自动化模型训练

在Machine Learning 这个领域&#xff0c;通常训练一个业务模型的难点并不在于算法的选择&#xff0c;而在于前期的数据清理和特征工程这些纷繁复杂的工作&#xff0c;训练过程中的问题在于参数的反复迭代优化。 AutoML 是 Azure Databricks 的一项功能&#xff0c;它自动的对…...

数学建模实战Matlab绘图

二维曲线、散点图 绘图命令&#xff1a;plot(x,y,’line specifiers’,’PropertyName’,PropertyValue) 例子&#xff1a;绘图表示年收入与年份的关系 ‘--r*’:--设置线型&#xff1b;r:设置颜色为红色&#xff1b;*节点型号 ‘linewidth’&#xff1a;设置线宽&#xff1…...

TypeError the JSON object must be str, bytes or bytearray, not ‘list‘

在使用python的jason库时&#xff0c;偶然碰到以下问题 TypeError: the JSON object must be str, bytes or bytearray, not ‘list’ 通过如下代码可复现问题 >>> a [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> import json >>> ra json.loads(a) Trac…...

数字IC后端设计实现 | PR工具中到底应该如何控制density和congestion?(ICC2Innovus)

吾爱IC社区星友提问&#xff1a;请教星主和各位大佬&#xff0c;对于一个模块如果不加干预工具会让inst挤成一团&#xff0c;后面eco修时序就没有空间了。如果全都加instPadding会导致面积不够overlap&#xff0c;大家一般怎么处理这种问题&#xff1f; 在数字IC后端设计实现中…...

产品经理与产品运营的区别和联系

一、两者的职责区别 产品经理的目的&#xff1a;是创造有价值的产品 产品运营的目的&#xff1a;是让产品能有效的发挥出它应有的价值 二、两者的工作内容区别产品经理的工作内容 产品的经理的目的是创造有价值的产品&#xff0c;因此产品经理的所有工作都是围绕着&#xf…...

CMU15-445-Spring-2023-分布式DBMS初探(lec21-24)

Lecture #21_ Introduction to Distributed Databases Distributed DBMSs 分布式 DBMS 将单个逻辑数据库划分为多个物理资源。应用程序&#xff08;通常&#xff09;并不知道数据被分割在不同的硬件上。系统依靠单节点 DBMS 的技术和算法来支持分布式环境中的事务处理和查询执…...

Arch linux 安装

Arch linux 安装 介绍下载制作iSO启动盘安装arch linux设置字体连接互联网 安装过程磁盘分区设置设置镜像源设置引导文件挂载点安装base等基础软件生成fatab文件更改时区更改编码、语言更改编码更改语言 用户管理设置root密码新建普通用户 安装grub启动网络服务/GDM查看系统网络…...

最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作

详情点击链接&#xff1a;最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Clau…...

【leetcode】移除元素

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家刷题&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一.暴力求解法二.使用额外数组三.原地修改数组 点击查看题目 一.暴力求解法 若我们不考虑时间复杂度…...

Spring Boot整合Redis的高效数据缓存实践

引言 在现代Web应用开发中&#xff0c;数据缓存是提高系统性能和响应速度的关键。Redis作为一种高性能的缓存和数据存储解决方案&#xff0c;被广泛应用于各种场景。本文将研究如何使用Spring Boot整合Redis&#xff0c;通过这个强大的缓存工具提高应用的性能和可伸缩性。 整合…...

FastApi-参数接收的正确使用(2)

前言 本文是该专栏的第2篇,后面会持续分享FastApi以及项目实战的各种干货知识,值得关注。 本文重点介绍,在使用FastApi使用“参数接收”时遇到的三种类型“路径参数”,“查询参数”,“请求体”的相关问题以及相应的解决方案。 具体详细知识点,跟着笔者直接往下看正文。…...

三、需求规格说明书(软件工程示例)

1&#xff0e;引言 1.1编写目的 1.2项目背景 1.3定义 1.4参考资料 2&#xff0e;任务概述 2.1目标 2.2运行环境 2.3条件与限制 3&#xff0e;数据描述 3.1静态数据 3.2动态数据 3.3数据库介绍 3.4数据词典 3.5数据采集 4&#xff0e;功能需求 …...

Elasticsearch 查询语句概述

目录 1. Match Query 2. Term Query 3. Terms Query 4. Range Query 5. Bool Query 6. Wildcard Query 7. Fuzzy Query 8. Prefix Query 9. Aggregation Query Elasticsearch 是一个基于 Lucene 的搜索引擎&#xff0c;提供了丰富的查询DSL&#xff08;Domain Specifi…...

kafka简单介绍和代码示例

“这是一篇理论文章&#xff0c;给大家讲一讲kafka” 简介 在大数据领域开发者常常会听到MQ这个术语&#xff0c;该术语便是消息队列的意思&#xff0c; Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布&#xff0c;使用Scala语言编写&#xff0c;与2010年…...

一次解决ForkJoinPool日志追踪的辛酸经历

本文主要分享了一次解决ForkJoinPool日志追踪的辛酸经历。历时3个月终于找到通用的解决方案&#xff0c;以此文分享给有需要的你。 一、需求背景 1.某日&#xff0c;某同事根据日志ID排查生产环境问题过程中&#xff0c;发现日志不全 2.经排查发现中间有很多线程为ForkJoinP…...

VM使用教程--SDK取图 视频笔记

本笔记均由海康机器人官网的V学院视频中记录所得&#xff0c;属于省流大师了[doge] 图像采集 图像采集包括1图像源&#xff0c;2多图采集&#xff0c;3输出图像&#xff0c;4缓存图像&#xff0c;5光源 1图像源 图像源包括本地图像&#xff0c;相机采图&#xff0c;SDK 本…...

11.spring boot 启动源码(一)

目录 概述SpringApplication静态方法构造方法run 实例方法配置文件Actuator 工作原理*EndpointAutoConfigurationBeansEndpointAutoConfigurationShutdownEndpointAutoConfiguration结束概述 spring boot 版本 2.6.13 spring boot 启动源码(一) 涉及 SpringApplication 中静态…...

【微服务】springcloud集成sleuth与zipkin实现链路追踪

目录 一、前言 二、分布式链路调用问题 三、链路追踪中的几个概念 3.1 什么是链路追踪 3.2 常用的链路追踪技术 3.3 链路追踪的几个术语 3.3.1 span ​编辑 3.3.2 trace 3.3.3 Annotation 四、sluth与zipkin概述 4.1 sluth介绍 4.1.1 sluth是什么 4.1.2 sluth核心…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...