当前位置: 首页 > news >正文

springboot整合kafka入门

kafka基本概念

producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。

consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,要指定消费者接受的消息的topic,该消费者只会接受该topic的消息。

topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。

broker: kafka集群包含一个或多个服务器,这些服务器就叫做broker。

本机安装kafka测试

安装kafka(mac下)

kafka下载: 从官网下载 kafka_2.13-2.7.0.tgz,直接解压即可。

本机测试kafka

1、进入到kafka的解压目录,输入命令启动zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

复制

打开另一个终端输入命令启动kafka:

./bin/kafka-server-start.sh config/server.properties 

复制

2、服务启起来后,可以创建生产者和消费者了。 再打开另一个终端输入命令创建生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

复制

broker-list: 参数指定生产者所使用的broker localhost: 9092 参数表示broker,这个broker为本机(127.0.0.1),且使用的端口是kafka的默认端口号是9092 topic: 参数表示生产者生产的消息的topic 为 “test_topic”

最后再打开另一个终端创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

复制

bootstrap-server: 是指定consumer从哪里(broker)取出消息 topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。 from-beginning: Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的,因为只能订阅最新发送的数据。通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可

3、都创建完了可以通过生产者输入消息,消费者来接收并显示消息,效果图如下:

springboot整合kafka(IDEA)

注意: kafka要是部署在服务器的话,本机就 要和服务器之间能ping通。

1、创建springboot项目:

2、创建两个类,分别为生产者和消费者 项目目录结构:

配置文件application.yml:(一般项目自动生成的是applicaiton.properties,但为了书写简便,改成yml)

spring:kafka:bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔producer: #生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者配置group-id: test #设置消费者的组idenable-auto-commit: true
# auto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

复制

springboot启动类入口,KafkaStudyApplication.java:

package com.study.kafka.kafka_study;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaStudyApplication { public static void main(String[] args) { SpringApplication.run(KafkaStudyApplication.class, args);}}

复制

TestKafkaProducerController.java:(生产者)

package com.study.kafka.kafka_study;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController     //定义这是一个控制器,可以通过浏览器访问
@RequestMapping("/kafka")
public class TestKafkaProducerController { @Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//当在浏览器上输入http://localhost:8080/kafka/send?msg=abc,就会发送abc到服务器上去让消费者接收,msg对应下面的String msg
@RequestMapping("/producerSend")
public String send(String msg){ kafkaTemplate.send("test_topic", msg); //使用kafka模板发送信息
return "success";
}
}

复制

TestConsumer.java:(消费者)

package com.study.kafka.kafka_study;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TestConsumer { /** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * @param record record变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 * */
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

复制

测试

1、运行KafkaStudyApplication.java之后,终端上输入消息时,不仅终端上(服务器)运行的测试消费者能收到,IDEA上的程序也能收到。

2、在浏览器上输入http://localhost:8080/kafka/producerSend?msg=web world31231,不仅IDEA上的消费者能收到,在终端(服务器)上运行的测试消费者也能收到:(其中8080是tomcat服务器的端口,springboot默认下带的是tomcat)

相关文章:

springboot整合kafka入门

kafka基本概念 producer&#xff1a; 生产者&#xff0c;负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view&#xff0c;或者是服务器日志&#xff0c;系统CPU、memory等。 consumer&#xff1a; 消费者&#xff0c;每个consumer属于一个特定的c…...

Rust 笔记:Rust 语言中的字符串

Rust 笔记 Rust 语言中的字符串 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263?spm1001.2101.3001.5343 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_28550263/article/detail…...

华为OD机试真题 Java 实现【将真分数分解为埃及分数】【牛客练习题】

一、题目描述 分子为1的分数称为埃及分数。现输入一个真分数(分子比分母小的分数,叫做真分数),请将该分数分解为埃及分数。如:8/11 = 1/2+1/5+1/55+1/110。 注:真分数指分子小于分母的分数,分子和分母有可能gcd不为1! 如有多个解,请输出任意一个。 二、输入描述 输…...

Zemax Lumerical | 二维光栅出瞳扩展系统优化

简介 本文提出并演示了一种以二维光栅耦出的光瞳扩展&#xff08;EPE&#xff09;系统优化和公差分析的仿真方法。 在这个工作流程中&#xff0c;我们将使用3个软件进行不同的工作 &#xff0c;以实现优化系统的大目标。首先&#xff0c;我们使用 Lumerical 构建光栅模型并使用…...

Linux-0.11 文件系统read_write.c详解

Linux-0.11 文件系统read_write.c详解 模块简介 该模块实现了文件系统通用的读写的方法read/write/lseek。 根据文件类型的不同&#xff0c;在内部将调用不同的方法。如果是管道文件&#xff0c;则调用pipe.c中的读写方法&#xff0c;如果是字符设备&#xff0c;则会调用cha…...

什么是用户态和内核态?用户态切换内核态会有什么影响?

一、什么是用户态和内核态&#xff1f; 简单来讲&#xff0c;像使用java开发时&#xff0c;调用java中封装的普通方法程序时属于用户态&#xff0c;而操作内存或者cpu比如 new Thread()创建一个线程&#xff0c;Class.forName(xxx.class)这种属于内核态 用户态和内核态是操作系…...

探索iOS之CoreImage框架

CoreImage提供图像处理、人脸识别、图像增强、图像滤镜、图像转场。它操作的数据来自Core Graphics、Core Video、Image IO&#xff0c;使用CPU或GPU进行渲染。CoreImage对底层实现进行封装&#xff0c;为上层提供简单易用的API。 一、CoreImage框架 CoreImage框架分为&#…...

qml 使用Shape 画图形

最近在做项目的时候想这实现一个能够根据相对位置动态改变大小的进度条提示框&#xff0c;偶尔发现了一个很有用的组件Shape这个控件里面可以画各种线条,实线虚线矩形三角形圆角的三角形或者各种自定义形状。下面提供一个2条虚线加上一个矩形的小栗子。更多的自定义形状还是请自…...

MySQL数据库修改root账户密码

博主今天登录数据库遇到了一个问题&#xff0c;通过这篇文章&#xff08;http://t.csdn.cn/58ECT&#xff09;解决了。文中关于修改root账户密码的部分&#xff0c;博主觉得有必要写一篇文章总结下。 第一步&#xff1a;用管理员账户打开CMD 第二步&#xff1a;开启mysql服务 …...

基于springboot+Vue+ Element-Plus+mysql实现学生宿舍管理系统

基于springbootVue Element-Plusmysql实现学生宿舍管理系统 一、系统介绍二、功能展示1.登陆2、主页--学生3、主页--宿舍管理员4.学生管理--管理员5.宿管信息--管理员6.宿舍管理--管理员7.信息管理--管理员8.申请管理--管理员9.访客管理--管理员10.水电费管理--管理员11.卫生管…...

中国人才选拔制度演变

1、世官制 是西周时人们仍保持着牢固的宗族血缘联系&#xff0c;人群基本以族区分&#xff0c;并得到宗法封建制的制度上的保证&#xff0c;从而自然形成了各级宗族长同时也就是各级官长&#xff0c;家国一体、家国同构的统治模式、格局。换句话来讲就是我们所说的世袭制。 其…...

【JavaSE】Java基础语法(十六):抽象类

文章目录 1. 抽象类的概述2. 抽象类的特点3. 抽象类的实用价值4. 抽象类的案例 1. 抽象类的概述 当我们在做子类共性功能抽取时&#xff0c;有些方法在父类中并没有具体的体现&#xff0c;这个时候就需要抽象类了&#xff01; 在Java中&#xff0c;一个没有方法体的方法应该定义…...

【Kafka】超详细介绍

文章目录 概念部署方案磁盘网络CPUpartition的数量 命令查看版本找kafka和zookeeper的ip/porttopic创建 topic查看get topic 列表get topic 详情 修改topic修改分区级别参数(如增加partition) 删除topic设置消息大小上限 生产查看生产生产消息 查看消费server 查看 offset查看积…...

2023 华为 Datacom-HCIE 真题题库 07/12--含解析

多项选择题 1.[试题编号&#xff1a;190187] &#xff08;多选题&#xff09;如图所示的拓扑采用了VXLAN分布式网关&#xff0c;SW1上的VBDIF10配置了&#xff1a;arp-proxy local enable命令&#xff0c;则以下描述中正确的有哪些项&#xff1f; A、SW1收到PC1发往PC2的报文&…...

Spring的作用域和生命周期

目录 1.Bean的作用域 2.Bean的作用域的分类 3.设置作用域 4.Spring的执行流程&#xff08;生命周期&#xff09; 5.Bean的生命周期 1.Bean的作用域 lombok &#xff08;dependency依赖&#xff09; 是为了解决代码的冗余&#xff08;比如说get和set方法&#xff09;那些构造…...

岭回归有看点:正则化参数解密,显著性不再成问题!

一、概述 「L2正则化&#xff08;也称为岭回归&#xff09;」 是一种用于线性回归模型的正则化方法&#xff0c;它通过在模型的损失函数中添加一个惩罚项来防止过拟合。L2正则化的惩罚项是模型参数的平方和&#xff0c;乘以一个正则化参数λ&#xff0c;即&#xff1a; L2正则化…...

Android 12.0修改recovery 菜单项字体大小

1.概述 在Android 12.0进入recovery模式后,界面会g_menu_actions 菜单选项和 提示文字,而这些文字的大小不像上层一样是通过设置属性来表示大小的 而它确是通过字体png图片的大小来计算文字的宽和高的,然后可以修改字体大小 2. 修改recovery 菜单项字体大小的核心类 buil…...

【计算机网络】 7、websocket 概念、sdk、实现

文章目录 一、背景二、简介三、client3.1 ws 构造函数3.2 ws.readyState3.3 ws.onopen3.4 ws.onclose3.5 ws.onmessage3.6 ws.send3.7 ws.bufferedAmount3.8 ws.onerror 四、server4.1 go4.1.1 apifox client4.1.2 js client 五、范式 一、背景 已经有了 http 协议&#xff0c…...

python中的常见运算符

文章目录 算数运算符赋值运算关系运算符逻辑运算符非布尔值的与或非运算条件运算符(也叫三元运算符)运算符的优先级 算数运算符 加法运算符&#xff08;如果两个字符串之间进行加法运算&#xff0c;则会进行拼串操作&#xff09; - 减法运算符 * 乘法运算符&#xff08;如果将字…...

TypeScript类型

TypeScript 是什么&#xff1f; 是以avaScript为基础构建的语言个一JavaScript的超集。可以在任何支持JavaScript的平台中执行。TypeScript扩展了JavaScript,并添加了类型。TS不能被JS解析器直接执行&#xff0c;需要编译成js。 基本类型 声明完变量直赴进行赋值 let c: boo…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

React Native 导航系统实战(React Navigation)

导航系统实战&#xff08;React Navigation&#xff09; React Navigation 是 React Native 应用中最常用的导航库之一&#xff0c;它提供了多种导航模式&#xff0c;如堆栈导航&#xff08;Stack Navigator&#xff09;、标签导航&#xff08;Tab Navigator&#xff09;和抽屉…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...