KafkaStream:Springboot中集成
1、在kafka-demo中创建配置类
配置kafka参数
package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}
2、在application.yml中配置上面配置类需要的参数
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
3、新增配置类,创建KStream对象,进行聚合
package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}
4、启动kafka-demo服务测试
使用生产者发送消息可以看到控制台接收成功

相关文章:
KafkaStream:Springboot中集成
1、在kafka-demo中创建配置类 配置kafka参数 package com.heima.kafkademo.config;import lombok.Data; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.context.properties.Configu…...
包管理工具 nvm npm nrm yarn cnpm npx pnpm详解
包管理工具 nvm npm yarn cnpm npx pnpm npm、cnpm、yarn、pnpm、npx、nvm的区别:https://blog.csdn.net/weixin_53791978/article/details/122533843 npm、cnpm、yarn、pnpm、npx、nvm的区别:https://blog.csdn.net/weixin_53791978/article/details/1…...
【java】mybatis-plus代码生成
正常的代码生成这里就不介绍了。旨在记录实现如下功能: 分布式微服务环境下,生成的entity、dto、vo、feignClient等等api模块,需要和mapper、service、controller等等分在不同的目录生成。 为什么会出现这个需求? mybatis-plus&am…...
小样本UIE 信息抽取微调快速上手(不含doccona标注)
文章目录 1.安装环境(可略过)2.模型简介(略读)抽取任务输入输出示例:1.实体识别2.关系抽取 3.快速上手(主菜)(1)转换数据标注数据样例 (2)生成训练数据训练数据样例 &…...
Vue项目(购物车)
目录 购物车效果展示: 购物车代码: 购物车效果展示: 此项目添加、修改、删除数据的地方都写了浏览器都会把它存储起来 下次运行项目时会把浏览器数据拿出来并在页面展示 Video_20230816145047 购物车代码: 复制完代码࿰…...
23.08.16驱动点灯
#include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include <linux/uaccess.h> #include <linux/io.h> #include <linux/device.h> #include "head.h"int major; char kbuf[128] {0};//定义指针接收映…...
数据结构——堆
数据结构——堆 堆堆简介堆的分类 二叉堆过程插入操作 删除操作向下调整: 增加某个点的权值实现参考代码:建堆方法一:使用 decreasekey(即,向上调整)方法二:使用向下调整 应用对顶堆 其他&#…...
重复学习1:NLP
目录 1. 自然语言处理与知识图谱1.1 RNN 循环神经网络初探 2. 吴恩达深度学习 1. 自然语言处理与知识图谱 1.1 RNN 循环神经网络初探 1.1.2 回顾数据维度与神经网络(1) 2. 吴恩达深度学习 P151 1.1 为什么选择序列模型(1,2) P152 1.2 数学符号(1,)...
做海外游戏推广有哪些条件?
做海外游戏推广需要充分准备和一系列条件的支持。以下是一些关键条件: 市场调研和策略制定:了解目标市场的文化、玩家偏好、竞争格局等是必要的。根据调研结果制定适合的推广策略。 本地化:将游戏内容、界面、语言、货币等进行本地化&#…...
JavaFx基础学习【五】:FXML布局文件使用
目录 前言 一、介绍 二、简单体验 三、FXML标签元素 四、fx属性介绍 五、重写initialize(名字需要保持一致)方法 六、Scene Builder快速布局 前言 如果你还没有看过前面的文章,可以通过以下链接快速前往学习: JavaFx基础学…...
通过Python爬虫提升网站搜索排名
目录 怎么使用Python爬虫提升排名 1. 抓取竞争对手数据: 2. 关键词研究: 3. 网页内容优化: 4. 内部链接建设: 5. 外部链接建设: 6. 监测和调整: 需要注意哪些方面 1. 合法性和道德性: …...
【博客698】为什么当linux作为router使用时,安装docker后流量转发失败
为什么当linux作为router使用时,安装docker后流量转发失败 场景 当一台linux机器作为其它服务器的router,负责转发流量的时候,让你在linux上安装docker之后,就会出现流量都被drop掉了 原因 没装docker之前: [root~]…...
el-dialog嵌套,修改内层el-dialog样式(自定义样式)
el-dialog嵌套使用时,内层的el-dialog要添加append-to-body属性 给内层的el-dialog添加custom-class属性,添加自定义类名 <el-dialog:visible.sync"dialogVisible"append-to-bodycustom-class"tree-cesium-container"><span>这是一段信息<…...
B树和B+树区别
B树和B树的区别 B树 B树被称为平衡树,在B树中,一个节点可以有两个以上的子节点。B树的高度为log M N。在B树中,数据按照特定的顺序排序,最小值在左侧,最大值在右侧。 B树是一种平衡的多分树,通常我们说m阶…...
intelJ IDEA\PHPStorm \WebStorm\PyCharm 通过ssh连接远程Mysql\Postgresql等数据库
最容易出错的地方是在general面板下的host,不应该填真实的host地址,而应该填localhost或者127.0.0.1 具体操作步骤见下图...
vfuhyuuy
Sublime Text is an awesome text editor. If you’ve never heard of it, you shouldcheck it out right now. I’ve made this tutorial because there’s no installer for the Linux versions of Sublime Text. While that’s not a real problem, I feel there is a clean…...
CSS自学框架之表单
首先我们看一下表单样式,下面共有5张截图 一、CSS代码 /*表单*/fieldset{border: none;margin-bottom: 2em;}fieldset > *{ margin-bottom: 1em }fieldset:last-child{ margin-bottom: 0 }fieldset legend{ margin: 0 0 1em }/* legend标签是CSS中用于定义…...
使用Spring Boot和Redis实现用户IP接口限流的详细指南
系列文章目录 文章目录 系列文章目录前言一、准备工作二、编写限流过滤器三、配置Redis四、测试接口限流总结 前言 在高并发场景下,为了保护系统免受恶意请求的影响,接口限流是一项重要的安全措施。本文将介绍如何使用Spring Boot和Redis来实现用户IP的…...
前端性能优化——包体积压缩插件,打包速度提升插件,提升浏览器响应的速率模式
前端代码优化 –其他的优化可以具体在网上搜索 压缩项目打包后的体积大小、提升打包速度,是前端性能优化中非常重要的环节,结合工作中的实践总结,梳理出一些 常规且有效 的性能优化建议 ue 项目可以通过添加–report命令: "…...
配置vscode
配置vscode 设置相关 网址:https://code.visualstudio.com/ 搜索不要用百度用这个:cn.bing.com 1.安装中文包 Chinese (Simplified) (简体中文) 2.安装 open in browser 3.安装主题 Atom One Dark Theme 4. 安装图标样式 VSCode Great Icons 5.安装 L…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
Visual Studio Code 扩展
Visual Studio Code 扩展 change-case 大小写转换EmmyLua for VSCode 调试插件Bookmarks 书签 change-case 大小写转换 https://marketplace.visualstudio.com/items?itemNamewmaurer.change-case 选中单词后,命令 changeCase.commands 可预览转换效果 EmmyLua…...
