当前位置: 首页 > news >正文

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka

  • 1 安装 Kafka
  • 2 创建 Topic
  • 3 Java 创建 Topic
  • 4 SpringBoot 项目
    • 4.1 pom.xml
    • 4.2 application.yml
    • 4.3 KafkaApplication.java
    • 4.4 CustomizePartitioner.java
    • 4.5 KafkaInitialConfig.java
    • 4.6 SendMessageController.java
  • 5 测试

1 安装 Kafka

Docker 安装 Kafka

2 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator> docker exec -it kafka /bin/sh$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

3 Java 创建 Topic

package com.xu.mq.demo.test.service;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4 SpringBoot 项目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.2 application.yml

server:port: 8001spring:application:name: hello-kafkakafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: allconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: true# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;/*** @author Administrator*/
@EnableKafka
@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}}

4.4 CustomizePartitioner.java

package com.xu.kafka.config;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;/*** @author Administrator*/
public class CustomizePartitioner implements Partitioner {/*** 自定义分区规则** @param topic      The topic name* @param key        The key to partition on (or null if no key)* @param keyBytes   The serialized key to partition on( or null if no key)* @param value      The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster    The current cluster metadata* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4.6 SendMessageController.java

package com.xu.kafka.message.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.xu.kafka.config.KafkaInitialConfig;import cn.hutool.json.JSONUtil;/*** @author Administrator*/
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {@Autowiredprivate KafkaTemplate template;/*** KafkaTemplate 发送消息** @param message*/@GetMapping("/test1/{message}")public void test1(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);}/*** KafkaTemplate 发送消息 有回调** @param message*/@GetMapping("/test2/{message}")public void test2(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(success -> {System.out.println("发送成功\t" + success);}, fail -> {System.out.println("发送失败\t" + fail);});}
}

5 测试

在这里插入图片描述

发送成功	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

相关文章:

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka1 安装 Kafka2 创建 Topic3 Java 创建 Topic4 SpringBoot 项目4.1 pom.xml4.2 application.yml4.3 KafkaApplication.java4.4 CustomizePartitioner.java4.5 KafkaInitialConfig.java4.6 SendMessageController.java5 测试1 安装 Kafka Docker 安装 Kafk…...

OpenCV 图像金字塔算子

本文是OpenCV图像视觉入门之路的第14篇文章&#xff0c;本文详细的介绍了图像金字塔算子的各种操作&#xff0c;例如&#xff1a;高斯金字塔算子 、拉普拉斯金字塔算子等操作。 高斯金字塔中的较高级别&#xff08;低分辨率&#xff09;是通过先用高斯核对图像进行卷积再删除偶…...

【自学Linux】Linux一切皆文件

Linux一切皆文件 Linux一切皆文件教程 Linux 中所有内容都是以文件的形式保存和管理的&#xff0c;即一切皆文件&#xff0c;普通文件是文件&#xff0c;目录是文件&#xff0c;硬件设备&#xff08;键盘、监视器、硬盘、打印机&#xff09;是文件&#xff0c;就连套接字&…...

CUDA C++扩展的详细描述

CUDA C扩展的详细描述 文章目录CUDA C扩展的详细描述CUDA函数执行空间说明符B.1.1 \_\_global\_\_B.1.2 \_\_device\_\_B.1.3 \_\_host\_\_B.1.4 Undefined behaviorB.1.5 __noinline__ and __forceinline__B.2 Variable Memory Space SpecifiersB.2.1 \_\_device\_\_B.2.2. \_…...

为什么重写equals必须重写hashCode

关于这个问题&#xff0c;看了网上很多答案&#xff0c;感觉都参差不齐&#xff0c;没有答到要点&#xff0c;这次就记录一下&#xff01; 首先我们为什么要重写equals&#xff1f;这个方法是用来干嘛的&#xff1f; public boolean equals &#xff08;Object object&#x…...

< 每日小技巧:N个很棒的 Vue 开发技巧, 持续记录ing >

每日小技巧&#xff1a;6 个很棒的 Vue 开发技巧&#x1f449; ① Watch 妙用> watch的高级使用> 一个监听器触发多个方法> watch 监听多个变量&#x1f449; ② 自定义事件 $emit() 和 事件参数 $event&#x1f449; ③ 监听组件生命周期常规写法hook写法&#x1f44…...

数据结构与算法之二分查找分而治之思想

决定我们成为什么样人的&#xff0c;不是我们的能力&#xff0c;而是我们的选择。——《哈利波特与密室》二分查找是查找算法里面是很优秀的一个算法&#xff0c;特别是在有序的数组中&#xff0c;这种算法思想体现的淋漓尽致。一.题目描述及其要求请实现无重复数字的升序数组的…...

训练自己的中文word2vec(词向量)--skip-gram方法

训练自己的中文word2vec&#xff08;词向量&#xff09;–skip-gram方法 什么是词向量 ​ 将单词映射/嵌入&#xff08;Embedding&#xff09;到一个新的空间&#xff0c;形成词向量&#xff0c;以此来表示词的语义信息&#xff0c;在这个新的空间中&#xff0c;语义相同的单…...

ubuntu系统环境配置和常用软件安装

系统环境 修改文件夹名称为英文 参考链接 export LANGen_US xdg-user-dirs-gtk-update 常用软件安装 常用工具 ping 和ifconfig工具 sudo apt install -y net-tools inetutils-ping 截图软件 sudo apt install -y net-tools inetutils-ping flameshot 录屏 sudo apt-get i…...

【1139. 最大的以 1 为边界的正方形】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个由若干 0 和 1 组成的二维网格 grid&#xff0c;请你找出边界全部由 1 组成的最大 正方形 子网格&#xff0c;并返回该子网格中的元素数量。如果不存在&#xff0c;则返回 0。 示例 1&#…...

windows11安装sqlserver2022报错

window11安装SQL Server 2022 报错 糟糕… 无法安装SQL Server (setup.exe)。此 SQL Server安装程序介质不支持此OS的语言&#xff0c;或没有SQL Server英语版本的安装文件。请使用匹配的特定语言SQL Server介质;或安装两个特定语言MUI&#xff0c;然后通过控制面板的区域设置…...

Python快速上手系列--日志模块--详解篇

前言本篇主要说说日志模块&#xff0c;在写自动化测试框架的时候我们就需要用到这个模块了&#xff0c;方便我们快速的定位错误&#xff0c;了解软件的运行情况&#xff0c;更加顺畅的调试程序。为什么要用到日志模块&#xff0c;直接print不就好了&#xff01;那得写多少print…...

【THREE.JS学习(1)】绘制一个可以旋转、放缩的立方体

学习新技能&#xff0c;做一下笔记。在使用ThreeJS的时候&#xff0c;首先创建一个场景const scene new THREE.Scene();接着&#xff0c;创建一个相机其中&#xff0c;THREE.PerspectiveCamera&#xff08;&#xff09;四个参数分别为&#xff1a;1.fov 相机视锥体竖直方向视野…...

数仓实战 - 滴滴出行

项目大致流程&#xff1a; 1、项目业务背景 1.1 目的 本案例将某出行打车的日志数据来进行数据分析&#xff0c;例如&#xff1a;我们需要统计某一天订单量是多少、预约订单与非预约订单的占比是多少、不同时段订单占比等 数据海量 – 大数据 hive比MySQL慢很多 1.2 项目架…...

python虚拟环境与环境变量

一、环境变量 1.环境变量 在命令行下&#xff0c;使用可执行文件&#xff0c;需要来到可执行文件的路径下执行 如果在任意路径下执行可执行文件&#xff0c;能够有响应&#xff0c;就需要在环境变量配置 2.设置环境变量 用户变量&#xff1a;当前用户登录到系统&#xff0c;…...

BeautifulSoup文档4-详细方法 | 用什么方法对文档树进行搜索?

4-详细方法 | 用什么方法对文档树进行搜索&#xff1f;1 过滤器1.1 字符串1.2 正则表达式1.3 列表1.4 True1.5 可以自定义方法2 find_all()2.1 参数原型2.2 name参数2.3 keyword 参数2.4 string 参数2.5 limit 参数2.6 recursive 参数3 find()4 find_parents()和find_parent()5…...

初识Tkinter界面设计

目录 前言 一、初识Tkinter 二、Label控件 三、Button控件 四、Entry控件 前言 本文简单介绍如何使用Python创建一个界面。 一、初识Tk...

软件测试面试题中的sql题目你会做吗?

目录 1.学生表 2.一道SQL语句面试题&#xff0c;关于group by表内容&#xff1a; 3.表中有A B C三列,用SQL语句实现&#xff1a;当A列大于B列时选择A列否则选择B列&#xff0c;当B列大于C列时选择B列否则选择C列 4. 5.姓名&#xff1a;name 课程&#xff1a;subject 分数&…...

VS实用调试技巧

一.什么是BUG&#x1f41b;Bug一词的原意是虫子&#xff0c;而在电脑系统或程序中隐藏着的一些未被发现的缺陷或问题&#xff0c;人们也叫它"bug"。这是为什么呢&#xff1f;这就要追溯到一个程序员与飞蛾的故事了。Bug的创始人格蕾丝赫柏&#xff08;Grace Murray H…...

通俗易懂理解三次握手、四次挥手(TCP)

文章目录1、通俗语言理解1.1 三次握手1.2 四次挥手2、进一步理解三次握手和四次挥手2.1 三次握手2.2 四次挥手1、通俗语言理解 1.1 三次握手 C:客户端 S&#xff1a;服务器端 第一次握手&#xff1a; C&#xff1a;在吗&#xff1f;我要和你建立连接。 第二次握手&#xff…...

基于COMSOL 5.5的精确非局部损伤模型:模拟脆性材料压缩、摩擦和剪切条件下的破坏行为研究

开发了一种基于COMSOL 5.5的损伤模型&#xff0c;专门用于模拟脆性材料在压缩、摩擦和剪切条件下的破坏行为。 该模型采用非局部本构关系&#xff0c;通过考虑材料内部微观结构的影响&#xff0c;精确捕捉脆性材料在受力过程中的应力分布和破坏机理。脆性材料的破坏模拟一直是工…...

技术日报|字节DeerFlow今日强势登顶日增3787星总量破4.6万,3D建筑编辑器黑马杀入前二

&#x1f31f; TrendForge 每日精选 - 发现最具潜力的开源项目 &#x1f4ca; 今日共收录 12 个热门项目&#x1f310; 智能中文翻译版 - 项目描述已自动翻译&#xff0c;便于理解&#x1f3c6; 今日最热项目 Top 10 &#x1f947; bytedance/deer-flow 项目简介: DeerFlow是一…...

基于RAG的智能客服系统实战:从架构设计到生产环境优化

最近在做一个智能客服系统的升级项目&#xff0c;之前用规则引擎维护起来太痛苦了&#xff0c;纯用大模型又贵又不准。经过一番折腾&#xff0c;最终用RAG&#xff08;检索增强生成&#xff09;技术搞定了&#xff0c;效果提升非常明显。今天就来分享一下从架构设计到上线优化的…...

Frida安装后别急着‘玩’!这5个必做的环境验证与排错步骤你做了吗?

Frida安装后必做的5个环境验证与排错步骤 当你兴冲冲地按照教程安装完Frida和Server&#xff0c;准备开始"玩耍"时&#xff0c;却发现frida-ps -U毫无反应&#xff0c;或者遇到各种连接失败的问题。这种"安装成功却用不了"的尴尬&#xff0c;往往源于环境…...

Java 17 新特性实战:现代 Java 开发的优雅实践

Java 17 新特性实战&#xff1a;现代 Java 开发的优雅实践 前言 大家好。最近很多读者朋友询问 Java 17 的新特性以及如何在项目中应用这些特性。作为一个长期使用 Java 的架构师&#xff0c;今天我想分享一下 Java 17 的新特性以及在实际项目中的应用经验。 Java 17 的核心新特…...

BoneAnimCopy: 跨模型骨骼动画复用解决方案,提升10倍效率的动画师实践指南

BoneAnimCopy: 跨模型骨骼动画复用解决方案&#xff0c;提升10倍效率的动画师实践指南 【免费下载链接】blender_BoneAnimCopy 用于在blender中桥接骨骼动画的插件 项目地址: https://gitcode.com/gh_mirrors/bl/blender_BoneAnimCopy 在3D动画制作领域&#xff0c;动画…...

如何高效使用英雄联盟智能助手:5分钟快速上手指南

如何高效使用英雄联盟智能助手&#xff1a;5分钟快速上手指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是否经常因为错过…...

引入电转气协同的含碳捕集与垃圾焚烧虚拟电厂优化调度

【文章复现 可】计及电转气协同的含碳捕集与垃圾焚烧 虚拟电厂优化调度 引入碳捕集电厂–电转气–燃气机组协同利用框架&#xff0c;碳捕集的 CO2可作为电转气原料&#xff0c;生成的天然气则供应给燃气机组&#xff1b;并通过联合调度将碳捕集能耗和烟气处理能耗进行负荷转移…...

ThinkPad 4G/5G 连网不支持 IPv6?一文教你判断与设置

很多用 ThinkPad 内置 4G/5G 模块上网的用户&#xff0c;在使用 IPv6 相关服务、测速或网络诊断时&#xff0c;会发现自己明明设备很新&#xff0c;却始终无法获取 IPv6 地址&#xff0c;甚至误以为电脑硬件不支持。尤其在办公、远程、云服务等场景&#xff0c;IPv6 支持与否直…...

联合仿真模型验证:Carsim + 车辆动力学模型(十四自由度)实践

联合仿真模型验证Carsim车辆动力学模型(十四自由度)软件使用:Carsim2019.0Matlab/Simulink 适用场景:采用模块化建模方法&#xff0c;搭建14自由度整车模型&#xff0c;将此模型与carsim进行联合仿真模型验证。 (模型和 carsim存在一定误差) 产品 simulink源码包含如下模块:工况…...