当前位置: 首页 > news >正文

【kafka实战】06 kafkaTemplate java代码使用示例

在 Spring Boot 中使用 KafkaTemplate 可以方便地向 Kafka 发送消息。下面为你详细介绍使用步骤和示例代码。

1. 创建 Spring Boot 项目

你可以使用 Spring Initializr(https://start.spring.io/ )来创建一个新的 Spring Boot 项目,添加以下依赖:

  • Spring for Apache Kafka

或者在 pom.xml 中手动添加依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

2. 配置 Kafka

application.propertiesapplication.yml 中配置 Kafka 的相关信息,示例如下:

application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
application.yml
spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 创建消息生产者服务

创建一个服务类,使用 KafkaTemplate 发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC_NAME = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC_NAME, message);System.out.println("Message sent: " + message);}public void sendMessageWithKey(String key, String message) {kafkaTemplate.send(TOPIC_NAME, key, message);System.out.println("Message sent with key " + key + ": " + message);}
}

4. 创建控制器(可选)

如果你想通过 RESTful API 触发消息发送,可以创建一个控制器。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/kafka")
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaProducerService.sendMessage(message);return "Message sent successfully";}@GetMapping("/send/{key}/{message}")public String sendMessageWithKey(@PathVariable String key, @PathVariable String message) {kafkaProducerService.sendMessageWithKey(key, message);return "Message with key sent successfully";}
}

5. 异步发送消息并处理结果

KafkaTemplatesend 方法返回一个 ListenableFuture,可以用来异步处理消息发送的结果。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Service
public class AsyncKafkaProducerService {private static final String TOPIC_NAME = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendAsyncMessage(String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {System.out.println("Message sent successfully: " + message + ", Offset: " + result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {System.err.println("Failed to send message: " + message + ", Error: " + ex.getMessage());}});}
}

6. 运行 Spring Boot 应用

启动 Spring Boot 应用程序,你可以通过以下方式测试消息发送:

  • 如果使用了控制器,可以通过访问 http://localhost:8080/kafka/send/your_messagehttp://localhost:8080/kafka/send/your_key/your_message 来发送消息。
  • 也可以在代码中直接调用 KafkaProducerServiceAsyncKafkaProducerService 的方法来发送消息。

7. 注意事项

  • 确保你的 Kafka 服务正在运行,并且 bootstrap-servers 配置正确。
  • 对于不同的数据类型,你可能需要调整 key-serializervalue-serializer 的配置。例如,如果要发送 JSON 数据,可以使用 org.springframework.kafka.support.serializer.JsonSerializer

通过以上步骤,你可以在 Spring Boot 项目中使用 KafkaTemplate 方便地向 Kafka 发送消息。

相关文章:

【kafka实战】06 kafkaTemplate java代码使用示例

在 Spring Boot 中使用 KafkaTemplate 可以方便地向 Kafka 发送消息。下面为你详细介绍使用步骤和示例代码。 1. 创建 Spring Boot 项目 你可以使用 Spring Initializr&#xff08;https://start.spring.io/ &#xff09;来创建一个新的 Spring Boot 项目&#xff0c;添加以下…...

Java 23新特性

文章目录 Java 23新特性一、引言二、Markdown文档注释&#xff08;JEP 467&#xff09;示例 三、ZGC&#xff1a;默认的分代模式&#xff08;JEP 474&#xff09;1. 为什么要引入分代模式2. 使用分代模式的优势3. 如何启用分代模式 四、隐式声明的类和实例主方法&#xff08;JE…...

bat脚本实现自动化漏洞挖掘

bat脚本 BAT脚本是一种批处理文件&#xff0c;可以在Windows操作系统中自动执行一系列命令。它们可以简化许多日常任务&#xff0c;如文件操作、系统配置等。 bat脚本执行命令 echo off#下面写要执行的命令 httpx 自动存活探测 echo off httpx.exe -l url.txt -o 0.txt nuc…...

[创业之路-285]:《产品开发管理-方法.流程.工具 》-1- IPD的功能列表以及导入步骤

一、概述&#xff1a; 对于没有IPD&#xff08;集成产品开发&#xff09;流程的公司来说&#xff0c;导入IPD需要循序渐进、有序进行&#xff0c;而不是一步到位。这是因为IPD不仅仅是一种新的产品开发流程&#xff0c;它还涉及到公司文化、组织结构、团队协作方式以及思维方式…...

Redis命令:列表模糊删除详解

前言 在Redis中&#xff0c;列表&#xff08;List&#xff09;是一种非常常用的数据结构&#xff0c;允许存储多个有序的元素。然而&#xff0c;在实际应用中&#xff0c;可能会遇到需要删除列表中符合某种模式的元素的需求。本文将详细介绍如何在Redis中实现列表的模糊删除。…...

Day36-【13003】短文,数组的行主序方式,矩阵的压缩存储,对称、三角、稀疏矩阵和三元组线性表,广义表求长度、深度、表头、表尾等

文章目录 本次课程内容第四章 数组、广义表和串第一节 数组及广义表数组的基本操作数组的顺序存储方式-借用矩阵行列式概念二维数组C语言对应的函数-通常行主序方式 矩阵的压缩存储对称矩阵和三角矩阵压缩存储后&#xff0c;采用不同的映射函数稀疏矩阵-可以构成三元组线性表三…...

大数据sql查询速度慢有哪些原因

1.索引问题 可能缺少索引&#xff0c;也有可能是索引不生效 2.连接数配置&#xff1a;连接数过少/连接池比较小 连接数过 3.sql本身有问题&#xff0c;响应比较慢&#xff0c;比如多表 4.数据量比较大 -这种最好采用分表设计 或分批查询 5.缓存池大小 可能是缓存问题&#xff…...

文件 I/O 和序列化

文件I/O C#提供了多种方式来读写文件&#xff0c;主要通过System.IO命名空间中的类来实现&#xff0c;下方会列一些常用的类型&#xff1a; StreamReader/StreamWriter&#xff1a;用于以字符为单位读取或写入文本文件。 BinaryReader/BinaryWriter&#xff1a;用于以二进制格…...

机器学习中的关键概念:通过SKlearn的MNIST实验深入理解

欢迎来到我的主页&#xff1a;【Echo-Nie】 本篇文章收录于专栏【机器学习】 1 sklearn相关介绍 Scikit-learn 是一个广泛使用的开源机器学习库&#xff0c;提供了简单而高效的数据挖掘和数据分析工具。它建立在 NumPy、SciPy 和 matplotlib 等科学计算库之上&#xff0c;支持…...

HELLOCTF反序列化靶场全解

level 2 <?php/* --- HelloCTF - 反序列化靶场 关卡 2 : 类值的传递 --- HINT&#xff1a;尝试将flag传递出来~# -*- coding: utf-8 -*- # Author: 探姬 # Date: 2024-07-01 20:30 # Repo: github.com/ProbiusOfficial/PHPSerialize-labs # email: adminhello-ctf.com…...

十二、Docker Compose 部署 SpringCloudAlibaba 微服务

一、部署基础服务 0、项目部署结构 项目目录结构如下: /home/zhzl_hebei/ ├── docker-compose.yml └── geochance-auth/└── Dockerfile└── geochance-auth.jar └── geochance-system/└── Dockerfile└── geochance-system.jar └── geochance-gateway/…...

VUE之插槽

1、默认插槽 <template><div class"father"></div><h3>父组件</h3><div class"content"><Category title"热门游戏列表"><ul><li v-for"g in games" :key"g.id">{{…...

4. Go结构体使用

1、结构体的简介 结构体&#xff08;Struct&#xff09;是编程语言中常见的一种复合数据类型&#xff0c;它将不同类型的数据元素&#xff08;成员&#xff09;组合成一个单一的实体。通过结构体&#xff0c;程序员可以将具有不同类型和性质的信息绑定到一个对象中&#xff0c…...

版本控制的重要性及 Git 入门

版本控制&#xff1a;软件开发的基石 在软件开发的浩瀚宇宙中&#xff0c;版本控制无疑是那颗最为闪耀的恒星&#xff0c;照亮了整个开发过程&#xff0c;成为现代软件开发不可或缺的基石。 历史追溯&#xff0c;定位问题根源 版本控制就像是一位不知疲倦的史官&#xff0c;…...

[NKU]C++安装环境 VScode

bilibili安装教程 vscode 关于C/C的环境配置全站最简单易懂&#xff01;&#xff01;大学生及初学初学C/C进&#xff01;&#xff01;&#xff01;_哔哩哔哩_bilibili 1安装vscode和插件 汉化插件 ​ 2安装插件 2.1 C/C 2.2 C/C Compile run ​ 2.3 better C Syntax ​ 查看已…...

deepseek本地部署

DeepSeek本地部署详细指南 DeepSeek作为一款开源且性能强大的大语言模型&#xff0c;提供了灵活的本地部署方案&#xff0c;让用户能够在本地环境中高效运行模型&#xff0c;同时保护数据隐私&#xff0c;这里记录自己DeepSeek本地部署流程。 主机环境 cpu:amd 7500Fgpu:406…...

网络编程day1

实例&#xff1a; struct sockaddr_in addr {0};//初始化 addr.sin_family AF_INET;//设置地址族 addr.sin_port htons(8888);//设置端口号 addr.sin_addr.s_addr inet_addr("192.168.1.1"); //设置ip地址 bind(sock,(struct sockaddr *)&addr,sizeof(ad…...

QFileDialog::getOpenFileName(this,“文件对话框“,“.“,“c++ files(*.cpp);;“); 文件对话框显示乱码

在使用 QFileDialog::getOpenFileName 时&#xff0c;如果文件对话框显示乱码&#xff0c;通常是因为编码问题。Qt 默认使用 UTF-8 编码&#xff0c;但如果你的系统或源代码文件的编码不一致&#xff0c;可能会导致乱码。 以下是几种可能的解决方法&#xff1a; 1. 确保源代码…...

绿联NAS安装cpolar内网穿透工具实现无公网IP远程访问教程

文章目录 前言1. 开启ssh服务2. ssh连接3. 安装cpolar内网穿透4. 配置绿联NAS公网地址 前言 本文主要介绍如何在绿联NAS中使用ssh远程连接后&#xff0c;使用一行代码快速安装cpolar内网穿透工具&#xff0c;轻松实现随时随地远程访问本地内网中的绿联NAS&#xff0c;无需公网…...

C++学习——缺省参数、重载函数、引用

目录 前言 一、缺省参数 1.1概念 1.2写法 1.3半缺省 1.4使用 二、重载函数 2.1.概念 2.2类型 2.3参数 2.4顺序 2.5问题 2.6原理 三、引用 1、引用是什么&#xff1f; 2、引用的使用方法 3、引用特性 1、引用在定义的时候必须要初始化 2、一个变量会有多个引用…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...