Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;
Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。
Java工具类
此基于kafka客户端的工具类,提供基础的消息发送与监听功能。
pom.xml
<!-- 集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.2</version></dependency>
KafkaUtils.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;/*** @Description kafka工具类,提供消息发送与监听*/
public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* @return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* @return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducer<String, String> kafkaProducer = null;private KafkaStreamServer(){}/*** 创建配置属性* @param host* @param port* @return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaProducer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer = new KafkaProducer<>(properties);return this;}/*** 向kafka服务发送生产者消息* @param topic* @param msg* @return*/public Future<RecordMetadata> sendMsg(String topic, String msg){ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);Future<RecordMetadata> future = kafkaProducer.send(record);System.out.println("消息发送成功:" + msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer != null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer = null;}}}public static class KafkaStreamClient {KafkaConsumer<String, String> kafkaConsumer = null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* @param host* @param port* @return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaConsumer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer = new KafkaConsumer<String, String>(properties);return this;}/*** 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息* @param topic* @param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer != null){kafkaConsumer.close();kafkaConsumer = null;}}}@FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecord<String, String> record);}/*** 测试示例* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息
// KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
// int i=0;
// while (i<10) {
// String msg = "Hello," + new Random().nextInt(100);
// kafkaStreamServer.sendMsg("test", msg);
// i++;
// Thread.sleep(100);
// }
// kafkaStreamServer.close();
// System.out.println("发送结束");System.out.println("接收消息");KafkaStreamClient kafkaStreamClient = KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");kafkaStreamClient.pollMsg("test", new HeaderInterface() {@Overridepublic void execute(ConsumerRecord<String, String> record) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));}});}
}
相关文章:
Kafka消息服务之Java工具类
注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN; Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关…...
迪威模型网:免费畅享 3D 打印盛宴,科技魅力与趣味创意并存
还在为寻找优质3D打印模型而发愁?快来迪威模型网(https://www.3dwhere.com/),一个集前沿科技与无限趣味于一体的免费3D打印宝藏平台! 踏入迪威模型网,仿佛开启一场未来科技之旅。其“3D打印”专区ÿ…...
ECharts极简入门
ECharts 是一个基于 JavaScript的开源可视化图表库,广泛应用于数据可视化的场景中,支持多种图表类型,如柱状图、折线图、饼图、散点图、雷达图等,且具有强大的自定义功能。 1. ECharts 基本使用 首先需要引入 ECharts 库…...

PHP培训机构教务管理系统小程序源码
🔑 培训机构教务管理系统——智慧教育,高效管理新典范 🚀 这款教务管理系统,是基于前沿的ThinkPHP框架与Uniapp技术深度融合,匠心打造的培训机构管理神器。它犹如一把开启高效运营与精细管理的金钥匙,专为…...

JAVA学习第五天
接口的变量定义固定为静态变量 接口里面只能有抽象方法,且不能有构造方法 如果不重写tostring方法,会打印没有价值的信息...
pnpm和npm安装TailwindCss
npm下载及初始化来自Tailwind官方文档 npm下载: npm install -D tailwindcss npm初始化Tailwind: npx tailwindcss init pnpm下载: pnpm add -D tailwindcss3.4.1 postcss autoprefixer pnpm初始化Tailwind: pnpm exec tailwindc…...

【云安全】云原生-K8S(四)安全问题分析
Kubernetes(K8S)因其强大的容器编排能力成为了云计算和微服务架构的首选,但同时也带来了复杂的安全挑战。本文将概述K8S的主要安全问题,帮助安全工程师理解潜在威胁,并采取相应的防护措施。 K8S 攻击面概览 下面两张…...

Cloud之快照存储(Cloud Snapshot Storage)
Cloud之快照存储 一、什么是快照 1. 快照的定义 快照(Snapshot)是一种记录某一时刻数据状态的技术。在计算机存储和虚拟化环境中,快照能够将文件系统或虚拟机的状态保存下来,以便以后能够回溯到某一特定时间点。快照通常用于备…...
cs106x-lecture11(Autumn 2017)-SPL实现
打卡cs106x(Autumn 2017)-lecture11 (以下皆使用SPL实现,非STL库,后续课程结束会使用STL实现) 1、diceRolls Write a recursive function named diceRolls accepts an integer representing a number of 6-sided dice to roll, and output all possibl…...

负载均衡集群( LVS 相关原理与集群构建 )
目录 1、LVS 相关原理 1.1、LVS集群的体系结构以及特点 1.1.1 LVS简介 1.1.2 LVS体系结构 1.1.3 LVS相关术语 1.1.4 LVS工作模式 1.1.5 LVS调度算法 1.2 LVS-DR集群介绍 1.2.1 LVS-DR模式工作原理 1.2.2 LVS-DR模式应用特点 1.2.3 LVS-DR模式ARP抑制 1.3 LVS – NA…...

【分布式】Hadoop完全分布式的搭建(零基础)
Hadoop完全分布式的搭建 环境准备: (1)VMware Workstation Pro17(其他也可) (2)Centos7 (3)FinalShell (一)模型机配置 0****)安…...

基于Java+Swing+Mysql实现人事管理信息系统
基于JavaSwingMysql实现人事管理信息系统 一、系统介绍二、功能展示1.用户登陆2.用户注册3.员工信息添加、删除4.员工信息查询、修改5.部门管理6、员工考核 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 系统功能:用户登陆、用户注册、员工信息添加、…...
DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命
云边有个稻草人-CSDN博客 在众多创新技术中,DeepSeek和ChatGPT无疑是最为引人注目的。它们通过强大的搜索和对话生成能力,能够改变我们与计算机交互的方式,帮助我们高效地获取信息,增强智能服务。本文将深入探讨这两项技术如何结合…...

企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow
企业级 RAG GitHub 开源项目深度分享:Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟,检索增强生成(RAG)已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型ÿ…...

js基础知识总结
1、js数据类型有哪些?存储区别 js基础类型及引用类型存储区别代码示例如下: // 基本数据类型 let a 10; let b a; // b 是 a 的一个副本 b 20; // 修改 b 不会影响 …...

LearnOpenGL——高级OpenGL(下)
教程地址:简介 - LearnOpenGL CN 高级数据 原文链接:高级数据 - LearnOpenGL CN 在OpenGL中,我们长期以来一直依赖缓冲来存储数据。本节将深入探讨一些操作缓冲的高级方法。 OpenGL中的缓冲本质上是一个管理特定内存块的对象,它…...

vue脚手架开发打地鼠游戏
游戏设计: 规划游戏的核心功能,如场景、随机出现的地鼠、计分系统、游戏时间限制等。简单设计游戏流程,包括开始界面、游戏进行中、关卡设置(如不同关卡地鼠出现数量、游戏时间等)、关卡闯关成功|失败、游戏结束闯关成…...
uniapp 连接mqtt
1:下载插件 npm install mqtt 2:创建 mqtt.js /* main.js 项目主入口注入实例 */ // import mqttTool from ./lib/mqttTool.js // Vue.prototype.$mqttTool mqttTool/* 使用范例见 /pages/index/index.vue */ // mqtt协议:H5使用ws/wss APP-…...
EX_25/2/19
1. 封装一个 File 类,用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节,并将读取到的数据返回 析构函数 …...

Breakout Tool
思科 CML 使用起来还是很麻烦的,很多操作对于习惯了 secure crt 或者 putty 等工具的网络工程师都不友好。 Breakout Tool 提供对远程实验室中虚拟机控制台与图形界面的本地化接入能力,其核心特性如下: Console 访问:基于 Telnet…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...

基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...

面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...