Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;
Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。
Java工具类
此基于kafka客户端的工具类,提供基础的消息发送与监听功能。
pom.xml
<!-- 集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.2</version></dependency>
KafkaUtils.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;/*** @Description kafka工具类,提供消息发送与监听*/
public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* @return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* @return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducer<String, String> kafkaProducer = null;private KafkaStreamServer(){}/*** 创建配置属性* @param host* @param port* @return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaProducer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer = new KafkaProducer<>(properties);return this;}/*** 向kafka服务发送生产者消息* @param topic* @param msg* @return*/public Future<RecordMetadata> sendMsg(String topic, String msg){ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);Future<RecordMetadata> future = kafkaProducer.send(record);System.out.println("消息发送成功:" + msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer != null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer = null;}}}public static class KafkaStreamClient {KafkaConsumer<String, String> kafkaConsumer = null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* @param host* @param port* @return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaConsumer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer = new KafkaConsumer<String, String>(properties);return this;}/*** 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息* @param topic* @param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer != null){kafkaConsumer.close();kafkaConsumer = null;}}}@FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecord<String, String> record);}/*** 测试示例* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息
// KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
// int i=0;
// while (i<10) {
// String msg = "Hello," + new Random().nextInt(100);
// kafkaStreamServer.sendMsg("test", msg);
// i++;
// Thread.sleep(100);
// }
// kafkaStreamServer.close();
// System.out.println("发送结束");System.out.println("接收消息");KafkaStreamClient kafkaStreamClient = KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");kafkaStreamClient.pollMsg("test", new HeaderInterface() {@Overridepublic void execute(ConsumerRecord<String, String> record) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));}});}
}相关文章:
Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN; Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关…...
迪威模型网:免费畅享 3D 打印盛宴,科技魅力与趣味创意并存
还在为寻找优质3D打印模型而发愁?快来迪威模型网(https://www.3dwhere.com/),一个集前沿科技与无限趣味于一体的免费3D打印宝藏平台! 踏入迪威模型网,仿佛开启一场未来科技之旅。其“3D打印”专区ÿ…...
ECharts极简入门
ECharts 是一个基于 JavaScript的开源可视化图表库,广泛应用于数据可视化的场景中,支持多种图表类型,如柱状图、折线图、饼图、散点图、雷达图等,且具有强大的自定义功能。 1. ECharts 基本使用 首先需要引入 ECharts 库…...
PHP培训机构教务管理系统小程序源码
🔑 培训机构教务管理系统——智慧教育,高效管理新典范 🚀 这款教务管理系统,是基于前沿的ThinkPHP框架与Uniapp技术深度融合,匠心打造的培训机构管理神器。它犹如一把开启高效运营与精细管理的金钥匙,专为…...
JAVA学习第五天
接口的变量定义固定为静态变量 接口里面只能有抽象方法,且不能有构造方法 如果不重写tostring方法,会打印没有价值的信息...
pnpm和npm安装TailwindCss
npm下载及初始化来自Tailwind官方文档 npm下载: npm install -D tailwindcss npm初始化Tailwind: npx tailwindcss init pnpm下载: pnpm add -D tailwindcss3.4.1 postcss autoprefixer pnpm初始化Tailwind: pnpm exec tailwindc…...
【云安全】云原生-K8S(四)安全问题分析
Kubernetes(K8S)因其强大的容器编排能力成为了云计算和微服务架构的首选,但同时也带来了复杂的安全挑战。本文将概述K8S的主要安全问题,帮助安全工程师理解潜在威胁,并采取相应的防护措施。 K8S 攻击面概览 下面两张…...
Cloud之快照存储(Cloud Snapshot Storage)
Cloud之快照存储 一、什么是快照 1. 快照的定义 快照(Snapshot)是一种记录某一时刻数据状态的技术。在计算机存储和虚拟化环境中,快照能够将文件系统或虚拟机的状态保存下来,以便以后能够回溯到某一特定时间点。快照通常用于备…...
cs106x-lecture11(Autumn 2017)-SPL实现
打卡cs106x(Autumn 2017)-lecture11 (以下皆使用SPL实现,非STL库,后续课程结束会使用STL实现) 1、diceRolls Write a recursive function named diceRolls accepts an integer representing a number of 6-sided dice to roll, and output all possibl…...
负载均衡集群( LVS 相关原理与集群构建 )
目录 1、LVS 相关原理 1.1、LVS集群的体系结构以及特点 1.1.1 LVS简介 1.1.2 LVS体系结构 1.1.3 LVS相关术语 1.1.4 LVS工作模式 1.1.5 LVS调度算法 1.2 LVS-DR集群介绍 1.2.1 LVS-DR模式工作原理 1.2.2 LVS-DR模式应用特点 1.2.3 LVS-DR模式ARP抑制 1.3 LVS – NA…...
【分布式】Hadoop完全分布式的搭建(零基础)
Hadoop完全分布式的搭建 环境准备: (1)VMware Workstation Pro17(其他也可) (2)Centos7 (3)FinalShell (一)模型机配置 0****)安…...
基于Java+Swing+Mysql实现人事管理信息系统
基于JavaSwingMysql实现人事管理信息系统 一、系统介绍二、功能展示1.用户登陆2.用户注册3.员工信息添加、删除4.员工信息查询、修改5.部门管理6、员工考核 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 系统功能:用户登陆、用户注册、员工信息添加、…...
DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命
云边有个稻草人-CSDN博客 在众多创新技术中,DeepSeek和ChatGPT无疑是最为引人注目的。它们通过强大的搜索和对话生成能力,能够改变我们与计算机交互的方式,帮助我们高效地获取信息,增强智能服务。本文将深入探讨这两项技术如何结合…...
企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow
企业级 RAG GitHub 开源项目深度分享:Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟,检索增强生成(RAG)已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型ÿ…...
js基础知识总结
1、js数据类型有哪些?存储区别 js基础类型及引用类型存储区别代码示例如下: // 基本数据类型 let a 10; let b a; // b 是 a 的一个副本 b 20; // 修改 b 不会影响 …...
LearnOpenGL——高级OpenGL(下)
教程地址:简介 - LearnOpenGL CN 高级数据 原文链接:高级数据 - LearnOpenGL CN 在OpenGL中,我们长期以来一直依赖缓冲来存储数据。本节将深入探讨一些操作缓冲的高级方法。 OpenGL中的缓冲本质上是一个管理特定内存块的对象,它…...
vue脚手架开发打地鼠游戏
游戏设计: 规划游戏的核心功能,如场景、随机出现的地鼠、计分系统、游戏时间限制等。简单设计游戏流程,包括开始界面、游戏进行中、关卡设置(如不同关卡地鼠出现数量、游戏时间等)、关卡闯关成功|失败、游戏结束闯关成…...
uniapp 连接mqtt
1:下载插件 npm install mqtt 2:创建 mqtt.js /* main.js 项目主入口注入实例 */ // import mqttTool from ./lib/mqttTool.js // Vue.prototype.$mqttTool mqttTool/* 使用范例见 /pages/index/index.vue */ // mqtt协议:H5使用ws/wss APP-…...
EX_25/2/19
1. 封装一个 File 类,用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节,并将读取到的数据返回 析构函数 …...
Breakout Tool
思科 CML 使用起来还是很麻烦的,很多操作对于习惯了 secure crt 或者 putty 等工具的网络工程师都不友好。 Breakout Tool 提供对远程实验室中虚拟机控制台与图形界面的本地化接入能力,其核心特性如下: Console 访问:基于 Telnet…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会
在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...
通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器
拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件: 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...
