当前位置: 首页 > news >正文

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka

  • 1 安装 Kafka
  • 2 创建 Topic
  • 3 Java 创建 Topic
  • 4 SpringBoot 项目
    • 4.1 pom.xml
    • 4.2 application.yml
    • 4.3 KafkaApplication.java
    • 4.4 CustomizePartitioner.java
    • 4.5 KafkaInitialConfig.java
    • 4.6 SendMessageController.java
  • 5 测试

1 安装 Kafka

Docker 安装 Kafka

2 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator> docker exec -it kafka /bin/sh$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

3 Java 创建 Topic

package com.xu.mq.demo.test.service;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4 SpringBoot 项目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.2 application.yml

server:port: 8001spring:application:name: hello-kafkakafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: allconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: true# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;/*** @author Administrator*/
@EnableKafka
@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}}

4.4 CustomizePartitioner.java

package com.xu.kafka.config;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;/*** @author Administrator*/
public class CustomizePartitioner implements Partitioner {/*** 自定义分区规则** @param topic      The topic name* @param key        The key to partition on (or null if no key)* @param keyBytes   The serialized key to partition on( or null if no key)* @param value      The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster    The current cluster metadata* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4.6 SendMessageController.java

package com.xu.kafka.message.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.xu.kafka.config.KafkaInitialConfig;import cn.hutool.json.JSONUtil;/*** @author Administrator*/
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {@Autowiredprivate KafkaTemplate template;/*** KafkaTemplate 发送消息** @param message*/@GetMapping("/test1/{message}")public void test1(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);}/*** KafkaTemplate 发送消息 有回调** @param message*/@GetMapping("/test2/{message}")public void test2(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(success -> {System.out.println("发送成功\t" + success);}, fail -> {System.out.println("发送失败\t" + fail);});}
}

5 测试

在这里插入图片描述

发送成功	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

相关文章:

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka1 安装 Kafka2 创建 Topic3 Java 创建 Topic4 SpringBoot 项目4.1 pom.xml4.2 application.yml4.3 KafkaApplication.java4.4 CustomizePartitioner.java4.5 KafkaInitialConfig.java4.6 SendMessageController.java5 测试1 安装 Kafka Docker 安装 Kafk…...

OpenCV 图像金字塔算子

本文是OpenCV图像视觉入门之路的第14篇文章&#xff0c;本文详细的介绍了图像金字塔算子的各种操作&#xff0c;例如&#xff1a;高斯金字塔算子 、拉普拉斯金字塔算子等操作。 高斯金字塔中的较高级别&#xff08;低分辨率&#xff09;是通过先用高斯核对图像进行卷积再删除偶…...

【自学Linux】Linux一切皆文件

Linux一切皆文件 Linux一切皆文件教程 Linux 中所有内容都是以文件的形式保存和管理的&#xff0c;即一切皆文件&#xff0c;普通文件是文件&#xff0c;目录是文件&#xff0c;硬件设备&#xff08;键盘、监视器、硬盘、打印机&#xff09;是文件&#xff0c;就连套接字&…...

CUDA C++扩展的详细描述

CUDA C扩展的详细描述 文章目录CUDA C扩展的详细描述CUDA函数执行空间说明符B.1.1 \_\_global\_\_B.1.2 \_\_device\_\_B.1.3 \_\_host\_\_B.1.4 Undefined behaviorB.1.5 __noinline__ and __forceinline__B.2 Variable Memory Space SpecifiersB.2.1 \_\_device\_\_B.2.2. \_…...

为什么重写equals必须重写hashCode

关于这个问题&#xff0c;看了网上很多答案&#xff0c;感觉都参差不齐&#xff0c;没有答到要点&#xff0c;这次就记录一下&#xff01; 首先我们为什么要重写equals&#xff1f;这个方法是用来干嘛的&#xff1f; public boolean equals &#xff08;Object object&#x…...

< 每日小技巧:N个很棒的 Vue 开发技巧, 持续记录ing >

每日小技巧&#xff1a;6 个很棒的 Vue 开发技巧&#x1f449; ① Watch 妙用> watch的高级使用> 一个监听器触发多个方法> watch 监听多个变量&#x1f449; ② 自定义事件 $emit() 和 事件参数 $event&#x1f449; ③ 监听组件生命周期常规写法hook写法&#x1f44…...

数据结构与算法之二分查找分而治之思想

决定我们成为什么样人的&#xff0c;不是我们的能力&#xff0c;而是我们的选择。——《哈利波特与密室》二分查找是查找算法里面是很优秀的一个算法&#xff0c;特别是在有序的数组中&#xff0c;这种算法思想体现的淋漓尽致。一.题目描述及其要求请实现无重复数字的升序数组的…...

训练自己的中文word2vec(词向量)--skip-gram方法

训练自己的中文word2vec&#xff08;词向量&#xff09;–skip-gram方法 什么是词向量 ​ 将单词映射/嵌入&#xff08;Embedding&#xff09;到一个新的空间&#xff0c;形成词向量&#xff0c;以此来表示词的语义信息&#xff0c;在这个新的空间中&#xff0c;语义相同的单…...

ubuntu系统环境配置和常用软件安装

系统环境 修改文件夹名称为英文 参考链接 export LANGen_US xdg-user-dirs-gtk-update 常用软件安装 常用工具 ping 和ifconfig工具 sudo apt install -y net-tools inetutils-ping 截图软件 sudo apt install -y net-tools inetutils-ping flameshot 录屏 sudo apt-get i…...

【1139. 最大的以 1 为边界的正方形】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个由若干 0 和 1 组成的二维网格 grid&#xff0c;请你找出边界全部由 1 组成的最大 正方形 子网格&#xff0c;并返回该子网格中的元素数量。如果不存在&#xff0c;则返回 0。 示例 1&#…...

windows11安装sqlserver2022报错

window11安装SQL Server 2022 报错 糟糕… 无法安装SQL Server (setup.exe)。此 SQL Server安装程序介质不支持此OS的语言&#xff0c;或没有SQL Server英语版本的安装文件。请使用匹配的特定语言SQL Server介质;或安装两个特定语言MUI&#xff0c;然后通过控制面板的区域设置…...

Python快速上手系列--日志模块--详解篇

前言本篇主要说说日志模块&#xff0c;在写自动化测试框架的时候我们就需要用到这个模块了&#xff0c;方便我们快速的定位错误&#xff0c;了解软件的运行情况&#xff0c;更加顺畅的调试程序。为什么要用到日志模块&#xff0c;直接print不就好了&#xff01;那得写多少print…...

【THREE.JS学习(1)】绘制一个可以旋转、放缩的立方体

学习新技能&#xff0c;做一下笔记。在使用ThreeJS的时候&#xff0c;首先创建一个场景const scene new THREE.Scene();接着&#xff0c;创建一个相机其中&#xff0c;THREE.PerspectiveCamera&#xff08;&#xff09;四个参数分别为&#xff1a;1.fov 相机视锥体竖直方向视野…...

数仓实战 - 滴滴出行

项目大致流程&#xff1a; 1、项目业务背景 1.1 目的 本案例将某出行打车的日志数据来进行数据分析&#xff0c;例如&#xff1a;我们需要统计某一天订单量是多少、预约订单与非预约订单的占比是多少、不同时段订单占比等 数据海量 – 大数据 hive比MySQL慢很多 1.2 项目架…...

python虚拟环境与环境变量

一、环境变量 1.环境变量 在命令行下&#xff0c;使用可执行文件&#xff0c;需要来到可执行文件的路径下执行 如果在任意路径下执行可执行文件&#xff0c;能够有响应&#xff0c;就需要在环境变量配置 2.设置环境变量 用户变量&#xff1a;当前用户登录到系统&#xff0c;…...

BeautifulSoup文档4-详细方法 | 用什么方法对文档树进行搜索?

4-详细方法 | 用什么方法对文档树进行搜索&#xff1f;1 过滤器1.1 字符串1.2 正则表达式1.3 列表1.4 True1.5 可以自定义方法2 find_all()2.1 参数原型2.2 name参数2.3 keyword 参数2.4 string 参数2.5 limit 参数2.6 recursive 参数3 find()4 find_parents()和find_parent()5…...

初识Tkinter界面设计

目录 前言 一、初识Tkinter 二、Label控件 三、Button控件 四、Entry控件 前言 本文简单介绍如何使用Python创建一个界面。 一、初识Tk...

软件测试面试题中的sql题目你会做吗?

目录 1.学生表 2.一道SQL语句面试题&#xff0c;关于group by表内容&#xff1a; 3.表中有A B C三列,用SQL语句实现&#xff1a;当A列大于B列时选择A列否则选择B列&#xff0c;当B列大于C列时选择B列否则选择C列 4. 5.姓名&#xff1a;name 课程&#xff1a;subject 分数&…...

VS实用调试技巧

一.什么是BUG&#x1f41b;Bug一词的原意是虫子&#xff0c;而在电脑系统或程序中隐藏着的一些未被发现的缺陷或问题&#xff0c;人们也叫它"bug"。这是为什么呢&#xff1f;这就要追溯到一个程序员与飞蛾的故事了。Bug的创始人格蕾丝赫柏&#xff08;Grace Murray H…...

通俗易懂理解三次握手、四次挥手(TCP)

文章目录1、通俗语言理解1.1 三次握手1.2 四次挥手2、进一步理解三次握手和四次挥手2.1 三次握手2.2 四次挥手1、通俗语言理解 1.1 三次握手 C:客户端 S&#xff1a;服务器端 第一次握手&#xff1a; C&#xff1a;在吗&#xff1f;我要和你建立连接。 第二次握手&#xff…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...