当前位置: 首页 > news >正文

KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

2、在application.yml中配置上面配置类需要的参数

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

相关文章:

KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类 配置kafka参数 package com.heima.kafkademo.config;import lombok.Data; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.context.properties.Configu…...

包管理工具 nvm npm nrm yarn cnpm npx pnpm详解

包管理工具 nvm npm yarn cnpm npx pnpm npm、cnpm、yarn、pnpm、npx、nvm的区别&#xff1a;https://blog.csdn.net/weixin_53791978/article/details/122533843 npm、cnpm、yarn、pnpm、npx、nvm的区别&#xff1a;https://blog.csdn.net/weixin_53791978/article/details/1…...

【java】mybatis-plus代码生成

正常的代码生成这里就不介绍了。旨在记录实现如下功能&#xff1a; 分布式微服务环境下&#xff0c;生成的entity、dto、vo、feignClient等等api模块&#xff0c;需要和mapper、service、controller等等分在不同的目录生成。 为什么会出现这个需求&#xff1f; mybatis-plus&am…...

小样本UIE 信息抽取微调快速上手(不含doccona标注)

文章目录 1.安装环境&#xff08;可略过&#xff09;2.模型简介&#xff08;略读&#xff09;抽取任务输入输出示例&#xff1a;1.实体识别2.关系抽取 3.快速上手(主菜)&#xff08;1&#xff09;转换数据标注数据样例 &#xff08;2&#xff09;生成训练数据训练数据样例 &…...

Vue项目(购物车)

目录 购物车效果展示&#xff1a; 购物车代码&#xff1a; 购物车效果展示&#xff1a; 此项目添加、修改、删除数据的地方都写了浏览器都会把它存储起来 下次运行项目时会把浏览器数据拿出来并在页面展示 Video_20230816145047 购物车代码&#xff1a; 复制完代码&#xff0…...

23.08.16驱动点灯

#include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include <linux/uaccess.h> #include <linux/io.h> #include <linux/device.h> #include "head.h"int major; char kbuf[128] {0};//定义指针接收映…...

数据结构——堆

数据结构——堆 堆堆简介堆的分类 二叉堆过程插入操作 删除操作向下调整&#xff1a; 增加某个点的权值实现参考代码&#xff1a;建堆方法一&#xff1a;使用 decreasekey&#xff08;即&#xff0c;向上调整&#xff09;方法二&#xff1a;使用向下调整 应用对顶堆 其他&#…...

重复学习1:NLP

目录 1. 自然语言处理与知识图谱1.1 RNN 循环神经网络初探 2. 吴恩达深度学习 1. 自然语言处理与知识图谱 1.1 RNN 循环神经网络初探 1.1.2 回顾数据维度与神经网络(1) 2. 吴恩达深度学习 P151 1.1 为什么选择序列模型&#xff08;1,2&#xff09; P152 1.2 数学符号(1,)...

做海外游戏推广有哪些条件?

做海外游戏推广需要充分准备和一系列条件的支持。以下是一些关键条件&#xff1a; 市场调研和策略制定&#xff1a;了解目标市场的文化、玩家偏好、竞争格局等是必要的。根据调研结果制定适合的推广策略。 本地化&#xff1a;将游戏内容、界面、语言、货币等进行本地化&#…...

JavaFx基础学习【五】:FXML布局文件使用

目录 前言 一、介绍 二、简单体验 三、FXML标签元素 四、fx属性介绍 五、重写initialize&#xff08;名字需要保持一致&#xff09;方法 六、Scene Builder快速布局 前言 如果你还没有看过前面的文章&#xff0c;可以通过以下链接快速前往学习&#xff1a; JavaFx基础学…...

通过Python爬虫提升网站搜索排名

目录 怎么使用Python爬虫提升排名 1. 抓取竞争对手数据&#xff1a; 2. 关键词研究&#xff1a; 3. 网页内容优化&#xff1a; 4. 内部链接建设&#xff1a; 5. 外部链接建设&#xff1a; 6. 监测和调整&#xff1a; 需要注意哪些方面 1. 合法性和道德性&#xff1a; …...

【博客698】为什么当linux作为router使用时,安装docker后流量转发失败

为什么当linux作为router使用时&#xff0c;安装docker后流量转发失败 场景 当一台linux机器作为其它服务器的router&#xff0c;负责转发流量的时候&#xff0c;让你在linux上安装docker之后&#xff0c;就会出现流量都被drop掉了 原因 没装docker之前&#xff1a; [root~]…...

el-dialog嵌套,修改内层el-dialog样式(自定义样式)

el-dialog嵌套使用时,内层的el-dialog要添加append-to-body属性 给内层的el-dialog添加custom-class属性,添加自定义类名 <el-dialog:visible.sync"dialogVisible"append-to-bodycustom-class"tree-cesium-container"><span>这是一段信息<…...

B树和B+树区别

B树和B树的区别 B树 B树被称为平衡树&#xff0c;在B树中&#xff0c;一个节点可以有两个以上的子节点。B树的高度为log M N。在B树中&#xff0c;数据按照特定的顺序排序&#xff0c;最小值在左侧&#xff0c;最大值在右侧。 B树是一种平衡的多分树&#xff0c;通常我们说m阶…...

intelJ IDEA\PHPStorm \WebStorm\PyCharm 通过ssh连接远程Mysql\Postgresql等数据库

最容易出错的地方是在general面板下的host&#xff0c;不应该填真实的host地址&#xff0c;而应该填localhost或者127.0.0.1 具体操作步骤见下图...

vfuhyuuy

Sublime Text is an awesome text editor. If you’ve never heard of it, you shouldcheck it out right now. I’ve made this tutorial because there’s no installer for the Linux versions of Sublime Text. While that’s not a real problem, I feel there is a clean…...

CSS自学框架之表单

首先我们看一下表单样式&#xff0c;下面共有5张截图 一、CSS代码 /*表单*/fieldset{border: none;margin-bottom: 2em;}fieldset > *{ margin-bottom: 1em }fieldset:last-child{ margin-bottom: 0 }fieldset legend{ margin: 0 0 1em }/* legend标签是CSS中用于定义…...

使用Spring Boot和Redis实现用户IP接口限流的详细指南

系列文章目录 文章目录 系列文章目录前言一、准备工作二、编写限流过滤器三、配置Redis四、测试接口限流总结 前言 在高并发场景下&#xff0c;为了保护系统免受恶意请求的影响&#xff0c;接口限流是一项重要的安全措施。本文将介绍如何使用Spring Boot和Redis来实现用户IP的…...

前端性能优化——包体积压缩插件,打包速度提升插件,提升浏览器响应的速率模式

前端代码优化 –其他的优化可以具体在网上搜索 压缩项目打包后的体积大小、提升打包速度&#xff0c;是前端性能优化中非常重要的环节&#xff0c;结合工作中的实践总结&#xff0c;梳理出一些 常规且有效 的性能优化建议 ue 项目可以通过添加–report命令&#xff1a; "…...

配置vscode

配置vscode 设置相关 网址&#xff1a;https://code.visualstudio.com/ 搜索不要用百度用这个&#xff1a;cn.bing.com 1.安装中文包 Chinese (Simplified) (简体中文) 2.安装 open in browser 3.安装主题 Atom One Dark Theme 4. 安装图标样式 VSCode Great Icons 5.安装 L…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

springboot整合VUE之在线教育管理系统简介

可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生&#xff0c;小白用户&#xff0c;想学习知识的 有点基础&#xff0c;想要通过项…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

API网关Kong的鉴权与限流:高并发场景下的核心实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中&#xff0c;API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关&#xff0c;Kong凭借其插件化架构…...

《信号与系统》第 6 章 信号与系统的时域和频域特性

目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...