Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;
Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。
Java工具类
此基于kafka客户端的工具类,提供基础的消息发送与监听功能。
pom.xml
<!-- 集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.2</version></dependency>
KafkaUtils.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;/*** @Description kafka工具类,提供消息发送与监听*/
public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* @return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* @return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducer<String, String> kafkaProducer = null;private KafkaStreamServer(){}/*** 创建配置属性* @param host* @param port* @return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaProducer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer = new KafkaProducer<>(properties);return this;}/*** 向kafka服务发送生产者消息* @param topic* @param msg* @return*/public Future<RecordMetadata> sendMsg(String topic, String msg){ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);Future<RecordMetadata> future = kafkaProducer.send(record);System.out.println("消息发送成功:" + msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer != null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer = null;}}}public static class KafkaStreamClient {KafkaConsumer<String, String> kafkaConsumer = null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* @param host* @param port* @return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaConsumer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer = new KafkaConsumer<String, String>(properties);return this;}/*** 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息* @param topic* @param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer != null){kafkaConsumer.close();kafkaConsumer = null;}}}@FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecord<String, String> record);}/*** 测试示例* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息
// KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
// int i=0;
// while (i<10) {
// String msg = "Hello," + new Random().nextInt(100);
// kafkaStreamServer.sendMsg("test", msg);
// i++;
// Thread.sleep(100);
// }
// kafkaStreamServer.close();
// System.out.println("发送结束");System.out.println("接收消息");KafkaStreamClient kafkaStreamClient = KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");kafkaStreamClient.pollMsg("test", new HeaderInterface() {@Overridepublic void execute(ConsumerRecord<String, String> record) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));}});}
}相关文章:
Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN; Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关…...
迪威模型网:免费畅享 3D 打印盛宴,科技魅力与趣味创意并存
还在为寻找优质3D打印模型而发愁?快来迪威模型网(https://www.3dwhere.com/),一个集前沿科技与无限趣味于一体的免费3D打印宝藏平台! 踏入迪威模型网,仿佛开启一场未来科技之旅。其“3D打印”专区ÿ…...
ECharts极简入门
ECharts 是一个基于 JavaScript的开源可视化图表库,广泛应用于数据可视化的场景中,支持多种图表类型,如柱状图、折线图、饼图、散点图、雷达图等,且具有强大的自定义功能。 1. ECharts 基本使用 首先需要引入 ECharts 库…...
PHP培训机构教务管理系统小程序源码
🔑 培训机构教务管理系统——智慧教育,高效管理新典范 🚀 这款教务管理系统,是基于前沿的ThinkPHP框架与Uniapp技术深度融合,匠心打造的培训机构管理神器。它犹如一把开启高效运营与精细管理的金钥匙,专为…...
JAVA学习第五天
接口的变量定义固定为静态变量 接口里面只能有抽象方法,且不能有构造方法 如果不重写tostring方法,会打印没有价值的信息...
pnpm和npm安装TailwindCss
npm下载及初始化来自Tailwind官方文档 npm下载: npm install -D tailwindcss npm初始化Tailwind: npx tailwindcss init pnpm下载: pnpm add -D tailwindcss3.4.1 postcss autoprefixer pnpm初始化Tailwind: pnpm exec tailwindc…...
【云安全】云原生-K8S(四)安全问题分析
Kubernetes(K8S)因其强大的容器编排能力成为了云计算和微服务架构的首选,但同时也带来了复杂的安全挑战。本文将概述K8S的主要安全问题,帮助安全工程师理解潜在威胁,并采取相应的防护措施。 K8S 攻击面概览 下面两张…...
Cloud之快照存储(Cloud Snapshot Storage)
Cloud之快照存储 一、什么是快照 1. 快照的定义 快照(Snapshot)是一种记录某一时刻数据状态的技术。在计算机存储和虚拟化环境中,快照能够将文件系统或虚拟机的状态保存下来,以便以后能够回溯到某一特定时间点。快照通常用于备…...
cs106x-lecture11(Autumn 2017)-SPL实现
打卡cs106x(Autumn 2017)-lecture11 (以下皆使用SPL实现,非STL库,后续课程结束会使用STL实现) 1、diceRolls Write a recursive function named diceRolls accepts an integer representing a number of 6-sided dice to roll, and output all possibl…...
负载均衡集群( LVS 相关原理与集群构建 )
目录 1、LVS 相关原理 1.1、LVS集群的体系结构以及特点 1.1.1 LVS简介 1.1.2 LVS体系结构 1.1.3 LVS相关术语 1.1.4 LVS工作模式 1.1.5 LVS调度算法 1.2 LVS-DR集群介绍 1.2.1 LVS-DR模式工作原理 1.2.2 LVS-DR模式应用特点 1.2.3 LVS-DR模式ARP抑制 1.3 LVS – NA…...
【分布式】Hadoop完全分布式的搭建(零基础)
Hadoop完全分布式的搭建 环境准备: (1)VMware Workstation Pro17(其他也可) (2)Centos7 (3)FinalShell (一)模型机配置 0****)安…...
基于Java+Swing+Mysql实现人事管理信息系统
基于JavaSwingMysql实现人事管理信息系统 一、系统介绍二、功能展示1.用户登陆2.用户注册3.员工信息添加、删除4.员工信息查询、修改5.部门管理6、员工考核 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 系统功能:用户登陆、用户注册、员工信息添加、…...
DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命
云边有个稻草人-CSDN博客 在众多创新技术中,DeepSeek和ChatGPT无疑是最为引人注目的。它们通过强大的搜索和对话生成能力,能够改变我们与计算机交互的方式,帮助我们高效地获取信息,增强智能服务。本文将深入探讨这两项技术如何结合…...
企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow
企业级 RAG GitHub 开源项目深度分享:Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟,检索增强生成(RAG)已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型ÿ…...
js基础知识总结
1、js数据类型有哪些?存储区别 js基础类型及引用类型存储区别代码示例如下: // 基本数据类型 let a 10; let b a; // b 是 a 的一个副本 b 20; // 修改 b 不会影响 …...
LearnOpenGL——高级OpenGL(下)
教程地址:简介 - LearnOpenGL CN 高级数据 原文链接:高级数据 - LearnOpenGL CN 在OpenGL中,我们长期以来一直依赖缓冲来存储数据。本节将深入探讨一些操作缓冲的高级方法。 OpenGL中的缓冲本质上是一个管理特定内存块的对象,它…...
vue脚手架开发打地鼠游戏
游戏设计: 规划游戏的核心功能,如场景、随机出现的地鼠、计分系统、游戏时间限制等。简单设计游戏流程,包括开始界面、游戏进行中、关卡设置(如不同关卡地鼠出现数量、游戏时间等)、关卡闯关成功|失败、游戏结束闯关成…...
uniapp 连接mqtt
1:下载插件 npm install mqtt 2:创建 mqtt.js /* main.js 项目主入口注入实例 */ // import mqttTool from ./lib/mqttTool.js // Vue.prototype.$mqttTool mqttTool/* 使用范例见 /pages/index/index.vue */ // mqtt协议:H5使用ws/wss APP-…...
EX_25/2/19
1. 封装一个 File 类,用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节,并将读取到的数据返回 析构函数 …...
Breakout Tool
思科 CML 使用起来还是很麻烦的,很多操作对于习惯了 secure crt 或者 putty 等工具的网络工程师都不友好。 Breakout Tool 提供对远程实验室中虚拟机控制台与图形界面的本地化接入能力,其核心特性如下: Console 访问:基于 Telnet…...
实时视频翻译系统架构优化与工程实践
1. 实时视频翻译系统的技术挑战与架构演进在全球化协作日益频繁的今天,视频会议已成为跨国商务、学术交流和远程办公的核心工具。然而语言障碍始终是阻碍沟通效率的关键瓶颈。传统字幕翻译方案存在明显缺陷:文字信息无法传递说话者的语气情感,…...
PAT乙级2024春B-1题解:用Python验证‘偶数个奇数’与‘最小和’的数学直觉
PAT乙级数学思维突破:从奇偶性到最小和的解题艺术 当键盘敲击声在考场此起彼伏,真正的高手早已看透题目背后的数学本质。PAT乙级考试中那些看似复杂的组合问题,往往只需要几个关键洞察就能迎刃而解。今天我们要探讨的"合成2024"问…...
LinkSwift:八大网盘直链解析工具,突破下载限制的智能解决方案
LinkSwift:八大网盘直链解析工具,突破下载限制的智能解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中…...
OpenBMB/IoA框架:构建多智能体协作系统的核心原理与工程实践
1. 项目概述:当大模型学会“开会”,一场关于智能协作的范式革命 最近在折腾大模型应用落地的朋友,可能都遇到过这样一个头疼的问题:单个模型能力再强,面对复杂任务时也常常力不从心。比如,你需要它帮你写一…...
OpCore-Simplify:5分钟完成黑苹果OpenCore自动化配置终极指南
OpCore-Simplify:5分钟完成黑苹果OpenCore自动化配置终极指南 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify OpCore-Simplify是一款专为简…...
【YOLOv11】062、YOLOv11模型硬件感知优化:针对特定硬件架构的优化
上周在部署YOLOv11到边缘设备时遇到了一个典型问题:在服务器上推理速度能达到30FPS的模型,搬到Jetson Orin上直接掉到了8FPS。更诡异的是,GPU利用率始终上不去,CPU倒是忙得不行。盯着nvidia-smi看了半天才反应过来——这模型压根没跟硬件对上话。 硬件不是黑盒子 很多人把…...
计科毕业设计简单的题目怎么选
0 选题推荐 - 云计算篇 毕业设计是大家学习生涯的最重要的里程碑,它不仅是对四年所学知识的综合运用,更是展示个人技术能力和创新思维的重要过程。选择一个合适的毕业设计题目至关重要,它应该既能体现你的专业能力,又能满足实际应…...
Dev Container启动慢、调试卡顿、扩展失效,深度诊断与7步精准修复全流程
更多请点击: https://intelliparadigm.com 第一章:Dev Container性能问题的典型现象与影响面分析 Dev Container 在现代云原生开发中广泛用于环境一致性保障,但其性能瓶颈常被低估。当容器启动缓慢、代码补全延迟显著、或调试会话频繁中断时…...
BiliTools终极指南:如何用一款工具搞定B站视频下载与弹幕处理
BiliTools终极指南:如何用一款工具搞定B站视频下载与弹幕处理 【免费下载链接】BiliTools A cross-platform bilibili toolbox. 跨平台哔哩哔哩工具箱,支持下载视频、番剧等等各类资源 项目地址: https://gitcode.com/GitHub_Trending/bilit/BiliTools…...
ArcGIS新手必看:别再手动量了!用‘计算几何’批量搞定线要素长度(附坐标系避坑指南)
ArcGIS高效计算线要素长度的完整指南:从基础操作到坐标系深度解析 引言:为什么你的线要素长度计算总出错? 刚接触ArcGIS的用户在处理线要素长度时,往往会遇到各种令人困惑的问题——计算结果与实际不符、功能按钮变灰禁用、弹出…...
