当前位置: 首页 > news >正文

互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用

1.1 SpringBoot集成Kafka

        引入对应的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html  --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>

        添加配置文件:

spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

        启动服务:

@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}

        启动信息如下:

1.2 消息发送 

 1.2.1 异步发送

        KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。 

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}

        效果如下:

        同步发送代码如下:

    @GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}

1.2.2 序列化 

        序列化详解:

  • 前面我们用到的是Kafka自带的字符串序列化器,
    org.apache.kafka.common.serialization.StringDeserializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
  • 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer 

        自定义序列化,自己实现序列化对应的接口即可,如下:

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}

        然后在yaml配置自己的编辑器:

value-serializer: com.itheima.demo.config.MySerializer

         对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:

package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

 1.2.3 分区策略

         分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定分区里面去;
  • 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
  • 既没有给定分区号,也没有给定key值,直接轮询进行分区;
  • 自定义分区策略,按照自定义需求选择分区号;

        生产者代码如下:

//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//    指定分区发送
//    不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

        消费者代码如下:

//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}

        1)测试默认分区策略,发送什么也不指定的消息

        可以发现发送的是同一条的数据,他是跟partition轮询发送的;

         2)测试指定分区

         3)按照key的hashcode来分区

1.3 消息消费 

 1.3.1 消费者分组

public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}

         启动:

        需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!

1.3.2 位移提交

        1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

        2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:

@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式://          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:////          RECORD//          每处理一条commit一次////          BATCH(默认)//          每次poll的时候批量提交一次,频率取决于每次poll的调用频率////          TIME//          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)////          COUNT//          累积达到ackCount次的ack去commit////          COUNT_TIME//          ackTime或ackCount哪个条件先满足,就commit////          MANUAL//          listener负责ack,但是背后也是批量上去////          MANUAL_IMMEDIATE//          listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

相关文章:

互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…...

Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步

Oracle Dataguard&#xff08;主库为双节点集群&#xff09;配置详解&#xff08;5&#xff09;&#xff1a;将主库复制到备库并启动同步 目录 Oracle Dataguard&#xff08;主库为双节点集群&#xff09;配置详解&#xff08;5&#xff09;&#xff1a;将主库复制到备库并启动…...

pytorch小记(一):pytorch矩阵乘法:torch.matmul(x, y)

pytorch小记&#xff08;一&#xff09;&#xff1a;pytorch矩阵乘法&#xff1a;torch.matmul&#xff08;x, y&#xff09;/ x y 代码代码 1&#xff1a;torch.matmul(x, y)输入张量&#xff1a;计算逻辑&#xff1a;输出结果&#xff1a; 代码 2&#xff1a;y y.view(4,1)…...

PyTorch环境配置常见报错的解决办法

目标 小白在最基础的环境配置里一般都会出现许多问题。 这里把一些常见的问题分享出来。希望可以节省大家一些时间。 最终目标是可以在cmd虚拟环境里进入jupyter notebook&#xff0c;new的时候有对应的环境&#xff0c;并且可以跑通所有的import code。 第一步&#xff1a;…...

罗永浩再创业,这次盯上了 AI?

罗永浩&#xff0c;1972年7月9日生于中国延边朝鲜族自治州的一个军人家庭&#xff0c;是一名朝鲜族人&#xff1b;早年在新东方授课&#xff0c;2004年当选 “网络十大红人” &#xff1b;2006年8月1日&#xff0c;罗永浩创办牛博网&#xff1b;2008年5月&#xff0c;罗永浩注册…...

VUE3 provide 和 inject,跨越多层级组件传递数据

provide 和 inject 是 Vue 3 提供的 API&#xff0c;主要用于实现祖先组件与后代组件之间的依赖注入。它们可以让你在组件树中&#xff0c;跨越多层组件传递数据&#xff0c;而不需要通过 props 或事件的方式逐层传递。这个机制主要用于状态共享、插件系统或某些跨层级的功能。…...

git打补丁

1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞&#xff0c;作者进行了修复&#xff0c;我们可以通过使用git补丁的方式&#xff0c;将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量&#xff0c;生成…...

机械燃油车知识图谱、知识大纲、知识结构(持续更新...)

一、发动机 曲柄连杆机构 配气机构 点火系统 起动系统 燃油供给系统 润滑系统 冷却系统 二、底盘 &#xff08;一&#xff09;传动系统 1、离合器 2、变速器 3、万向传动装置 4、驱动桥 &#xff08;二&#xff09;行驶系统 1、车架 2、车桥 3、悬架 4、车轮 &a…...

Vue3学习总结

一、Vue 3 基础搭建与核心语法 1.创建 Vue 3 应用 在项目的入口文件 main.js 中&#xff0c;通过以下代码创建 Vue 3 应用实例&#xff1a; import { createApp } from vue; import App from ./App.vue;const app createApp(App); app.mount(#app); 这几行代码的作用是引入…...

Type-C双屏显示器方案

在数字化时代&#xff0c;高效的信息处理和视觉体验已成为我们日常生活和工作的关键需求。随着科技的进步&#xff0c;一款结合了便携性和高效视觉输出的设备——双屏便携屏&#xff0c;逐渐崭露头角&#xff0c;成为追求高效工作和娱乐体验人群的新宠。本文将深入探讨双屏便携…...

【读书与思考】焦虑与内耗

【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】【读书与思考】 导言 今天一个朋友和我说&#xff0c;最近比较焦虑和内耗&#xff0c;无心工作和学习&#xff0c;我问他你焦虑内耗的时候&#xff0c;时间主要花在哪了&#xff0c;他告诉我说主要花在看有关移…...

基于python的网页表格数据下载--转excel

基于 Python 的网页表格数据爬取与下载:以维基百科为例 目录 基于 Python 的网页表格数据爬取与下载:以维基百科为例1. 背景介绍2. 工具与环境3. 操作步骤1. 获取网页内容2. 定位表格元素3. 表格变身 Pandas DataFrame4. 检查数据,收工!5. 进阶玩法与优化6. 完整代码4. 结果…...

Vue.js开发入门:从零开始搭建你的第一个项目

前言 嘿&#xff0c;小伙伴们&#xff01;今天咱们来聊聊 Vue.js&#xff0c;一个超火的前端框架。如果你是编程小白&#xff0c;别怕&#xff0c;跟着我一步步来&#xff0c;保证你能轻松上手&#xff0c;搭建起属于自己的第一个 Vue 项目。Vue.js 可能听起来有点高大上&#…...

LS1046+XILINX XDMA PCIE调通

欢迎点赞收藏&#xff0c;欢迎私下讨论技术&#xff0c;分享技术 硬件平台 &#xff1a;NXP LS1046 XILINX FPGA 软件平台&#xff1a;LINUX 4.19.68 buildroot LS1046 PEX3 接 XILINX FPGA&#xff0c;linux使用designware的PCI主控制器。下载XILINX DMA驱动&#xff0c;解…...

HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系

一、前言 当开发者使用Builder做引用数据传递时&#xff0c;会考虑组件的父子关系&#xff0c;使用了bind(this)之后&#xff0c;组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题&#xff0c;引入LocalBuilder装饰器。…...

YOLOv10-1.1部分代码阅读笔记-downloads.py

downloads.py ultralytics\utils\downloads.py 目录 downloads.py 1.所需的库和模块 2.def is_url(url, checkFalse): 3.def delete_dsstore(path, files_to_delete(".DS_Store", "__MACOSX")): 4.def zip_directory(directory, compressTrue, ex…...

计算机图形学【绘制立方体和正六边形】

工具介绍 OpenGL&#xff1a;一个跨语言的图形API&#xff0c;用于渲染2D和3D图形。它提供了绘制图形所需的底层功能。 GLUT&#xff1a;OpenGL的一个工具库&#xff0c;简化了窗口创建、输入处理和其他与图形环境相关的任务。 使用的函数 1. glClear(GL_COLOR_BUFFER_BIT |…...

基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上&#xff0c;运用Python语言、ECharts技术、…...

kafka消费堆积问题探索

背景 我们的商城项目用PHP写的&#xff0c;原本写日志方案用的是PHP的方案&#xff0c;但是&#xff0c;这个方案导致资源消耗一直降不下来&#xff0c;使用了20个CPU。后面考虑使用通过kafka的方案写日志&#xff0c;商城中把产生的日志丢到kafka中&#xff0c;在以go写的项目…...

Vue.js 使用插槽(Slots)优化组件结构

Vue.js 使用插槽&#xff08;Slots&#xff09;优化组件结构 今天我们聊聊 Vue.js 的一个超实用功能——插槽&#xff08;Slots&#xff09;。插槽是 Vue 组件开发中的神器&#xff0c;用好它&#xff0c;你可以让组件变得更灵活、更可复用&#xff0c;还能写出优雅的代码结构…...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题&#xff0c;说是客户的导入文件模版想支持部分导入内容的下拉选&#xff0c;于是我就找了easyexcel官网寻找解决方案&#xff0c;并没有找到合适的方案&#xff0c;没办法只能自己动手并分享出来&#xff0c;针对Java生成Excel下拉菜单时因选项过多导…...

绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化

iOS 应用的发布流程一直是开发链路中最“苹果味”的环节&#xff1a;强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说&#xff0c;这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发&#xff08;例如 Flutter、React Na…...

WEB3全栈开发——面试专业技能点P7前端与链上集成

一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染&#xff08;SSR&#xff09;与静态网站生成&#xff08;SSG&#xff09; 框架&#xff0c;由 Vercel 开发。它简化了构建生产级 React 应用的过程&#xff0c;并内置了很多特性&#xff1a; ✅ 文件系…...

前端调试HTTP状态码

1xx&#xff08;信息类状态码&#xff09; 这类状态码表示临时响应&#xff0c;需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分&#xff0c;客户端应继续发送剩余部分。 2xx&#xff08;成功类状态码&#xff09; 表示请求已成功被服务器接收、理解并处…...

对象回调初步研究

_OBJECT_TYPE结构分析 在介绍什么是对象回调前&#xff0c;首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例&#xff0c;用_OBJECT_TYPE这个结构来解析它&#xff0c;0x80处就是今天要介绍的回调链表&#xff0c;但是先不着急&#xff0c;先把目光…...

拟合问题处理

在机器学习中&#xff0c;核心任务通常围绕模型训练和性能提升展开&#xff0c;但你提到的 “优化训练数据解决过拟合” 和 “提升泛化性能解决欠拟合” 需要结合更准确的概念进行梳理。以下是对机器学习核心任务的系统复习和修正&#xff1a; 一、机器学习的核心任务框架 机…...

统计学(第8版)——统计抽样学习笔记(考试用)

一、统计抽样的核心内容与问题 研究内容 从总体中科学抽取样本的方法利用样本数据推断总体特征&#xff08;均值、比率、总量&#xff09;控制抽样误差与非抽样误差 解决的核心问题 在成本约束下&#xff0c;用少量样本准确推断总体特征量化估计结果的可靠性&#xff08;置…...

Python_day48随机函数与广播机制

在继续讲解模块消融前&#xff0c;先补充几个之前没提的基础概念 尤其需要搞懂张量的维度、以及计算后的维度&#xff0c;这对于你未来理解复杂的网络至关重要 一、 随机张量的生成 在深度学习中经常需要随机生成一些张量&#xff0c;比如权重的初始化&#xff0c;或者计算输入…...