当前位置: 首页 > news >正文

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka

  • 1 安装 Kafka
  • 2 创建 Topic
  • 3 Java 创建 Topic
  • 4 SpringBoot 项目
    • 4.1 pom.xml
    • 4.2 application.yml
    • 4.3 KafkaApplication.java
    • 4.4 CustomizePartitioner.java
    • 4.5 KafkaInitialConfig.java
    • 4.6 SendMessageController.java
  • 5 测试

1 安装 Kafka

Docker 安装 Kafka

2 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator> docker exec -it kafka /bin/sh$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

3 Java 创建 Topic

package com.xu.mq.demo.test.service;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4 SpringBoot 项目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.2 application.yml

server:port: 8001spring:application:name: hello-kafkakafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: allconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: true# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;/*** @author Administrator*/
@EnableKafka
@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}}

4.4 CustomizePartitioner.java

package com.xu.kafka.config;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;/*** @author Administrator*/
public class CustomizePartitioner implements Partitioner {/*** 自定义分区规则** @param topic      The topic name* @param key        The key to partition on (or null if no key)* @param keyBytes   The serialized key to partition on( or null if no key)* @param value      The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster    The current cluster metadata* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4.6 SendMessageController.java

package com.xu.kafka.message.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.xu.kafka.config.KafkaInitialConfig;import cn.hutool.json.JSONUtil;/*** @author Administrator*/
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {@Autowiredprivate KafkaTemplate template;/*** KafkaTemplate 发送消息** @param message*/@GetMapping("/test1/{message}")public void test1(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);}/*** KafkaTemplate 发送消息 有回调** @param message*/@GetMapping("/test2/{message}")public void test2(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(success -> {System.out.println("发送成功\t" + success);}, fail -> {System.out.println("发送失败\t" + fail);});}
}

5 测试

在这里插入图片描述

发送成功	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

相关文章:

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka1 安装 Kafka2 创建 Topic3 Java 创建 Topic4 SpringBoot 项目4.1 pom.xml4.2 application.yml4.3 KafkaApplication.java4.4 CustomizePartitioner.java4.5 KafkaInitialConfig.java4.6 SendMessageController.java5 测试1 安装 Kafka Docker 安装 Kafk…...

OpenCV 图像金字塔算子

本文是OpenCV图像视觉入门之路的第14篇文章&#xff0c;本文详细的介绍了图像金字塔算子的各种操作&#xff0c;例如&#xff1a;高斯金字塔算子 、拉普拉斯金字塔算子等操作。 高斯金字塔中的较高级别&#xff08;低分辨率&#xff09;是通过先用高斯核对图像进行卷积再删除偶…...

【自学Linux】Linux一切皆文件

Linux一切皆文件 Linux一切皆文件教程 Linux 中所有内容都是以文件的形式保存和管理的&#xff0c;即一切皆文件&#xff0c;普通文件是文件&#xff0c;目录是文件&#xff0c;硬件设备&#xff08;键盘、监视器、硬盘、打印机&#xff09;是文件&#xff0c;就连套接字&…...

CUDA C++扩展的详细描述

CUDA C扩展的详细描述 文章目录CUDA C扩展的详细描述CUDA函数执行空间说明符B.1.1 \_\_global\_\_B.1.2 \_\_device\_\_B.1.3 \_\_host\_\_B.1.4 Undefined behaviorB.1.5 __noinline__ and __forceinline__B.2 Variable Memory Space SpecifiersB.2.1 \_\_device\_\_B.2.2. \_…...

为什么重写equals必须重写hashCode

关于这个问题&#xff0c;看了网上很多答案&#xff0c;感觉都参差不齐&#xff0c;没有答到要点&#xff0c;这次就记录一下&#xff01; 首先我们为什么要重写equals&#xff1f;这个方法是用来干嘛的&#xff1f; public boolean equals &#xff08;Object object&#x…...

< 每日小技巧:N个很棒的 Vue 开发技巧, 持续记录ing >

每日小技巧&#xff1a;6 个很棒的 Vue 开发技巧&#x1f449; ① Watch 妙用> watch的高级使用> 一个监听器触发多个方法> watch 监听多个变量&#x1f449; ② 自定义事件 $emit() 和 事件参数 $event&#x1f449; ③ 监听组件生命周期常规写法hook写法&#x1f44…...

数据结构与算法之二分查找分而治之思想

决定我们成为什么样人的&#xff0c;不是我们的能力&#xff0c;而是我们的选择。——《哈利波特与密室》二分查找是查找算法里面是很优秀的一个算法&#xff0c;特别是在有序的数组中&#xff0c;这种算法思想体现的淋漓尽致。一.题目描述及其要求请实现无重复数字的升序数组的…...

训练自己的中文word2vec(词向量)--skip-gram方法

训练自己的中文word2vec&#xff08;词向量&#xff09;–skip-gram方法 什么是词向量 ​ 将单词映射/嵌入&#xff08;Embedding&#xff09;到一个新的空间&#xff0c;形成词向量&#xff0c;以此来表示词的语义信息&#xff0c;在这个新的空间中&#xff0c;语义相同的单…...

ubuntu系统环境配置和常用软件安装

系统环境 修改文件夹名称为英文 参考链接 export LANGen_US xdg-user-dirs-gtk-update 常用软件安装 常用工具 ping 和ifconfig工具 sudo apt install -y net-tools inetutils-ping 截图软件 sudo apt install -y net-tools inetutils-ping flameshot 录屏 sudo apt-get i…...

【1139. 最大的以 1 为边界的正方形】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个由若干 0 和 1 组成的二维网格 grid&#xff0c;请你找出边界全部由 1 组成的最大 正方形 子网格&#xff0c;并返回该子网格中的元素数量。如果不存在&#xff0c;则返回 0。 示例 1&#…...

windows11安装sqlserver2022报错

window11安装SQL Server 2022 报错 糟糕… 无法安装SQL Server (setup.exe)。此 SQL Server安装程序介质不支持此OS的语言&#xff0c;或没有SQL Server英语版本的安装文件。请使用匹配的特定语言SQL Server介质;或安装两个特定语言MUI&#xff0c;然后通过控制面板的区域设置…...

Python快速上手系列--日志模块--详解篇

前言本篇主要说说日志模块&#xff0c;在写自动化测试框架的时候我们就需要用到这个模块了&#xff0c;方便我们快速的定位错误&#xff0c;了解软件的运行情况&#xff0c;更加顺畅的调试程序。为什么要用到日志模块&#xff0c;直接print不就好了&#xff01;那得写多少print…...

【THREE.JS学习(1)】绘制一个可以旋转、放缩的立方体

学习新技能&#xff0c;做一下笔记。在使用ThreeJS的时候&#xff0c;首先创建一个场景const scene new THREE.Scene();接着&#xff0c;创建一个相机其中&#xff0c;THREE.PerspectiveCamera&#xff08;&#xff09;四个参数分别为&#xff1a;1.fov 相机视锥体竖直方向视野…...

数仓实战 - 滴滴出行

项目大致流程&#xff1a; 1、项目业务背景 1.1 目的 本案例将某出行打车的日志数据来进行数据分析&#xff0c;例如&#xff1a;我们需要统计某一天订单量是多少、预约订单与非预约订单的占比是多少、不同时段订单占比等 数据海量 – 大数据 hive比MySQL慢很多 1.2 项目架…...

python虚拟环境与环境变量

一、环境变量 1.环境变量 在命令行下&#xff0c;使用可执行文件&#xff0c;需要来到可执行文件的路径下执行 如果在任意路径下执行可执行文件&#xff0c;能够有响应&#xff0c;就需要在环境变量配置 2.设置环境变量 用户变量&#xff1a;当前用户登录到系统&#xff0c;…...

BeautifulSoup文档4-详细方法 | 用什么方法对文档树进行搜索?

4-详细方法 | 用什么方法对文档树进行搜索&#xff1f;1 过滤器1.1 字符串1.2 正则表达式1.3 列表1.4 True1.5 可以自定义方法2 find_all()2.1 参数原型2.2 name参数2.3 keyword 参数2.4 string 参数2.5 limit 参数2.6 recursive 参数3 find()4 find_parents()和find_parent()5…...

初识Tkinter界面设计

目录 前言 一、初识Tkinter 二、Label控件 三、Button控件 四、Entry控件 前言 本文简单介绍如何使用Python创建一个界面。 一、初识Tk...

软件测试面试题中的sql题目你会做吗?

目录 1.学生表 2.一道SQL语句面试题&#xff0c;关于group by表内容&#xff1a; 3.表中有A B C三列,用SQL语句实现&#xff1a;当A列大于B列时选择A列否则选择B列&#xff0c;当B列大于C列时选择B列否则选择C列 4. 5.姓名&#xff1a;name 课程&#xff1a;subject 分数&…...

VS实用调试技巧

一.什么是BUG&#x1f41b;Bug一词的原意是虫子&#xff0c;而在电脑系统或程序中隐藏着的一些未被发现的缺陷或问题&#xff0c;人们也叫它"bug"。这是为什么呢&#xff1f;这就要追溯到一个程序员与飞蛾的故事了。Bug的创始人格蕾丝赫柏&#xff08;Grace Murray H…...

通俗易懂理解三次握手、四次挥手(TCP)

文章目录1、通俗语言理解1.1 三次握手1.2 四次挥手2、进一步理解三次握手和四次挥手2.1 三次握手2.2 四次挥手1、通俗语言理解 1.1 三次握手 C:客户端 S&#xff1a;服务器端 第一次握手&#xff1a; C&#xff1a;在吗&#xff1f;我要和你建立连接。 第二次握手&#xff…...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

从零实现富文本编辑器#5-编辑器选区模型的状态结构表达

先前我们总结了浏览器选区模型的交互策略&#xff0c;并且实现了基本的选区操作&#xff0c;还调研了自绘选区的实现。那么相对的&#xff0c;我们还需要设计编辑器的选区表达&#xff0c;也可以称为模型选区。编辑器中应用变更时的操作范围&#xff0c;就是以模型选区为基准来…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...