互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用
1.1 SpringBoot集成Kafka
引入对应的依赖。
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>
添加配置文件:
spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
启动服务:
@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}
启动信息如下:
1.2 消息发送
1.2.1 异步发送
KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。
@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
效果如下:
同步发送代码如下:
@GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}
1.2.2 序列化
序列化详解:
- 前面我们用到的是Kafka自带的字符串序列化器,
org.apache.kafka.common.serialization.StringDeserializer
- 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
- 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer
自定义序列化,自己实现序列化对应的接口即可,如下:
public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}
然后在yaml配置自己的编辑器:
value-serializer: com.itheima.demo.config.MySerializer
对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:
package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}
1.2.3 分区策略
分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。
- 给定了分区号,直接将数据发送到指定分区里面去;
- 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
- 既没有给定分区号,也没有给定key值,直接轮询进行分区;
- 自定义分区策略,按照自定义需求选择分区号;
生产者代码如下:
//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;// 指定分区发送
// 不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}// 指定key发送,不指定分区
// 根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}
消费者代码如下:
//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}
1)测试默认分区策略,发送什么也不指定的消息
可以发现发送的是同一条的数据,他是跟partition轮询发送的;
2)测试指定分区
3)按照key的hashcode来分区
1.3 消息消费
1.3.1 消费者分组
public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}
启动:
需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!
1.3.2 位移提交
1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:
2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:
@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式:// AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种://// RECORD// 每处理一条commit一次//// BATCH(默认)// 每次poll的时候批量提交一次,频率取决于每次poll的调用频率//// TIME// 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)//// COUNT// 累积达到ackCount次的ack去commit//// COUNT_TIME// ackTime或ackCount哪个条件先满足,就commit//// MANUAL// listener负责ack,但是背后也是批量上去//// MANUAL_IMMEDIATE// listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}
相关文章:

互联网全景消息(10)之Kafka深度剖析(中)
一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…...
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步
Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步 目录 Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动…...
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)
pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)/ x y 代码代码 1:torch.matmul(x, y)输入张量:计算逻辑:输出结果: 代码 2:y y.view(4,1)…...

PyTorch环境配置常见报错的解决办法
目标 小白在最基础的环境配置里一般都会出现许多问题。 这里把一些常见的问题分享出来。希望可以节省大家一些时间。 最终目标是可以在cmd虚拟环境里进入jupyter notebook,new的时候有对应的环境,并且可以跑通所有的import code。 第一步:…...

罗永浩再创业,这次盯上了 AI?
罗永浩,1972年7月9日生于中国延边朝鲜族自治州的一个军人家庭,是一名朝鲜族人;早年在新东方授课,2004年当选 “网络十大红人” ;2006年8月1日,罗永浩创办牛博网;2008年5月,罗永浩注册…...
VUE3 provide 和 inject,跨越多层级组件传递数据
provide 和 inject 是 Vue 3 提供的 API,主要用于实现祖先组件与后代组件之间的依赖注入。它们可以让你在组件树中,跨越多层组件传递数据,而不需要通过 props 或事件的方式逐层传递。这个机制主要用于状态共享、插件系统或某些跨层级的功能。…...

git打补丁
1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞,作者进行了修复,我们可以通过使用git补丁的方式,将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量,生成…...

机械燃油车知识图谱、知识大纲、知识结构(持续更新...)
一、发动机 曲柄连杆机构 配气机构 点火系统 起动系统 燃油供给系统 润滑系统 冷却系统 二、底盘 (一)传动系统 1、离合器 2、变速器 3、万向传动装置 4、驱动桥 (二)行驶系统 1、车架 2、车桥 3、悬架 4、车轮 &a…...
Vue3学习总结
一、Vue 3 基础搭建与核心语法 1.创建 Vue 3 应用 在项目的入口文件 main.js 中,通过以下代码创建 Vue 3 应用实例: import { createApp } from vue; import App from ./App.vue;const app createApp(App); app.mount(#app); 这几行代码的作用是引入…...

Type-C双屏显示器方案
在数字化时代,高效的信息处理和视觉体验已成为我们日常生活和工作的关键需求。随着科技的进步,一款结合了便携性和高效视觉输出的设备——双屏便携屏,逐渐崭露头角,成为追求高效工作和娱乐体验人群的新宠。本文将深入探讨双屏便携…...

【读书与思考】焦虑与内耗
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 今天一个朋友和我说,最近比较焦虑和内耗,无心工作和学习,我问他你焦虑内耗的时候,时间主要花在哪了,他告诉我说主要花在看有关移…...
基于python的网页表格数据下载--转excel
基于 Python 的网页表格数据爬取与下载:以维基百科为例 目录 基于 Python 的网页表格数据爬取与下载:以维基百科为例1. 背景介绍2. 工具与环境3. 操作步骤1. 获取网页内容2. 定位表格元素3. 表格变身 Pandas DataFrame4. 检查数据,收工!5. 进阶玩法与优化6. 完整代码4. 结果…...
Vue.js开发入门:从零开始搭建你的第一个项目
前言 嘿,小伙伴们!今天咱们来聊聊 Vue.js,一个超火的前端框架。如果你是编程小白,别怕,跟着我一步步来,保证你能轻松上手,搭建起属于自己的第一个 Vue 项目。Vue.js 可能听起来有点高大上&#…...

LS1046+XILINX XDMA PCIE调通
欢迎点赞收藏,欢迎私下讨论技术,分享技术 硬件平台 :NXP LS1046 XILINX FPGA 软件平台:LINUX 4.19.68 buildroot LS1046 PEX3 接 XILINX FPGA,linux使用designware的PCI主控制器。下载XILINX DMA驱动,解…...

HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系
一、前言 当开发者使用Builder做引用数据传递时,会考虑组件的父子关系,使用了bind(this)之后,组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题,引入LocalBuilder装饰器。…...
YOLOv10-1.1部分代码阅读笔记-downloads.py
downloads.py ultralytics\utils\downloads.py 目录 downloads.py 1.所需的库和模块 2.def is_url(url, checkFalse): 3.def delete_dsstore(path, files_to_delete(".DS_Store", "__MACOSX")): 4.def zip_directory(directory, compressTrue, ex…...

计算机图形学【绘制立方体和正六边形】
工具介绍 OpenGL:一个跨语言的图形API,用于渲染2D和3D图形。它提供了绘制图形所需的底层功能。 GLUT:OpenGL的一个工具库,简化了窗口创建、输入处理和其他与图形环境相关的任务。 使用的函数 1. glClear(GL_COLOR_BUFFER_BIT |…...

基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!
摘要 时代在飞速进步,每个行业都在努力发展现在先进技术,通过这些先进的技术来提高自己的水平和优势,中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上,运用Python语言、ECharts技术、…...

kafka消费堆积问题探索
背景 我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目…...
Vue.js 使用插槽(Slots)优化组件结构
Vue.js 使用插槽(Slots)优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽(Slots)。插槽是 Vue 组件开发中的神器,用好它,你可以让组件变得更灵活、更可复用,还能写出优雅的代码结构…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...

简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...

蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...