当前位置: 首页 > news >正文

spring boot 使用 Kafka

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId>  <version>2.8.1</version>  
</dependency>  
<dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaProducer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void sendMessage(String message) {  Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  try {  producer.send(new ProducerRecord<>(topic, message));  } catch (Exception e) {  e.printStackTrace();  } finally {  producer.close();  }  }  
}
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaConsumer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.group}")  private String groupId;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void consume() {  Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  consumer.subscribe(Collections.singletonList(topic));  while (true) {  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  for (ConsumerRecord<String, String> record : records) {  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  }  }  }  private Properties consumerConfigs() {  Properties props = new Properties();  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  return props;  }  
}

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

相关文章:

spring boot 使用 Kafka

一、Kafka作为消息队列的好处 高吞吐量&#xff1a;Kafka能够处理大规模的数据流&#xff0c;并支持高吞吐量的消息传输。 持久性&#xff1a;Kafka将消息持久化到磁盘上&#xff0c;保证了消息不会因为系统故障而丢失。 分布式&#xff1a;Kafka是一个分布式系统&#xff0c…...

LFU缓存(Leetcode460)

例题&#xff1a; 分析&#xff1a; 这道题可以用两个哈希表来实现&#xff0c;一个hash表&#xff08;kvMap&#xff09;用来存储节点&#xff0c;另一个hash表&#xff08;freqMap&#xff09;用来存储双向链表&#xff0c;链表的头节点代表最近使用的元素&#xff0c;离头节…...

Vue学习笔记:计算属性

计算属性 入门进阶二次进阶三次进阶四次进阶结界五次进阶六次进阶七次进阶八次进阶九次进阶终章彩蛋 入门 Vue.js中&#xff0c;计算属性示例&#xff1a; export default {data() {return {firstName: John,lastName: Doe};},computed: {// 计算属性&#xff1a;全名fullNam…...

深度学习本科课程 实验2 前馈神经网络

任务 3.3 课程实验要求 &#xff08;1&#xff09;手动实现前馈神经网络解决上述回归、二分类、多分类任务 l 从训练时间、预测精度、Loss变化等角度分析实验结果&#xff08;最好使用图表展示&#xff09; &#xff08;2&#xff09;利用torch.nn实现前馈神经网络解决上述回归…...

【python】python爱心代码【附源码】

一、实现效果&#xff1a; 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 二、完整代码&#xff1a; import math import random import threading import time from math import sin, cos, pi, log from tkinter import * import re# 烟花相关设置 Fireworks [] m…...

Linux---信号

前言 到饭点了&#xff0c;我点了一份外卖&#xff0c;然后又开了一把网游&#xff0c;这个时候&#xff0c;我在打游戏的过程中&#xff0c;我始终记得外卖小哥会随时给我打电话&#xff0c;通知我我去取外卖&#xff0c;这个时候游戏还没有结束。我在打游戏的过程中需要把外…...

24种设计模式之行为型模式(下)-Java版

软件设计模式是前辈们代码设计经验的总结&#xff0c;可以反复使用。设计模式共分为3大类&#xff0c;创建者模式(6种)、结构型模式(7种)、行为型模式(11种)&#xff0c;一共24种设计模式&#xff0c;软件设计一般需要满足7大基本原则。下面通过5章的学习一起来看看设计模式的魅…...

基于微信小程序的校园水电费管理小程序的研究与实现

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

python二维高斯热力图绘制简单的思路代码

import numpy as np import matplotlib.pyplot as plt from scipy.ndimage import gaussian_filter import cv2# 生成一个示例图像 image_size 100 image np.zeros((image_size, image_size))# 在图像中心创建一个高亮区域 center_x, center_y image_size // 2, image_size …...

k8s 部署 nocas 同时部署mysql

使用 ygqygq2 的 helm 模板部署 官方地址&#xff1a;https://artifacthub.io/packages/helm/ygqygq2/nacos 添加 helm 仓库 helm repo add ygqygq2 https://ygqygq2.github.io/charts/下载 helm 安装文件 helm pull ygqygq2/nacos解压 tar -zxvf nacos-2.1.6.tgz执行 hel…...

GolangCI-Lint配置变更实践

GolangCI-Lint配置变更实践 Golang编程中&#xff0c;为了便于调试和代码质量和安全性检查。利用该方法可以在开发周期的早期捕获错误&#xff0c;并且检查团队编程风格&#xff0c;提高一致性。这对团队协作开发特别有用&#xff0c;可以提高开发的效率&#xff0c;保持代码质…...

UE中对象创建方法示例和类的理解

对象创建方法示例集 创建Actor示例 //创建一个护甲道具 AProp* armor GetWorld()->SpawnActor<AProp>(pos, rotator); 创建Component示例 UCapsuleComponent* CapsuleComponent CreateDefaultSubobject<UCapsuleComponent>(TEXT("CapsuleComponent&qu…...

ElementUI鼠标拖动没列宽度

其实 element ui 表格Table有提供给我们一个resizable属性 按官方文档上描述 它就是控制是否允许拖拽表格列大小的属性 而且 它的默认值就是 true 但是依旧很多人会反应拖拽不了 首先 表格要有边框 如果没有变宽 确实是拖拽不了 给 el-table加上 border属性 运行结果如下 但…...

Flutter canvas 画一条会动的波浪线 进度条

之前用 Flutter Canvas 画过一个三角三角形&#xff0c;html 的 Canvas 也画过一次类似的&#xff0c; 今天用 Flutter Canvas 试了下 感觉差不多&#xff1a; html 版本 大致效果如下&#xff1a; 思路和 html 实现的类似&#xff1a; 也就是找出点的位置&#xff0c;使用二阶…...

算法训练营day22, 回溯2

216. 组合总和 III func combinationSum3(k int, n int) [][]int { //存储全部集合 result : make([][]int, 0) //存储单次集合 path : make([]int, 0) var backtrace func(k int, n int, sum int, startIndex int) backtrace func(k int, n int, sum int, startIndex int) {…...

undefined symbol: avio_protocol_get_class, version LIBAVFORMAT_58

rv1126上进行编译和在虚拟机里面进行交叉编译ffmpeg都不行 解决办法查看 查看安装的ffmpeg链接的文件 ldd ./ffmpeg rootEASY-EAI-NANO:/home/nano/ffmpeg-4.3.6# ldd ffmpeg linux-vdso.so.1 (0xaeebd000)libavdevice.so.58 > /lib/arm-linux-gnueabihf/libavde…...

Android简单支持项目符号的EditText

一、背景及样式效果 因项目需要&#xff0c;需要文本编辑时&#xff0c;支持项目符号&#xff08;无序列表&#xff09;尝试了BulletSpan&#xff0c;但不是很理想&#xff0c;并且考虑到影响老版本回显等因素&#xff0c;最终决定自定义一个BulletEditText。 先看效果&…...

【axios报错异常】: Uncaught ReferenceError: axios is not defined

问题描述: 当前代码在vivo手机和小米手机运行是正常的,点击分享按钮调出相关弹框,发送接口进行分享,但是现在oppo手机出现了问题: 点击分享按钮没有反应. 问题解析: 安卓同事经过查询后,发现打印了错误: 但是不清楚这个问题是安卓端造成的还是前端造成的,大家都不清楚. 问题…...

Docker基础与持续集成

docker 基础知识&#xff1a; docker与虚拟机 !左边为虚拟机&#xff0c;右边为docker环境 – Server :物理机服务器Host OS &#xff1a;构建的操作系统Hypervisor &#xff1a;一种虚拟机软件&#xff0c;装了之后才能虚拟化操作系统Guest OS &#xff1a;虚拟化的操作系统…...

flutter开发实战-ijkplayer视频播放器功能

flutter开发实战-ijkplayer视频播放器功能 使用better_player播放器进行播放视频时候&#xff0c;在Android上会出现解码失败的问题&#xff0c;better_player使用的是video_player&#xff0c;video_player很多视频无法解码。最终采用ijkplayer播放器插件&#xff0c;在flutt…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...