当前位置: 首页 > news >正文

Kafka消息服务之Java工具类

注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;

Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

Java工具类

此基于kafka客户端的工具类,提供基础的消息发送与监听功能。

pom.xml

       <!-- 集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.2</version></dependency>

KafkaUtils.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;/*** @Description kafka工具类,提供消息发送与监听*/
public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* @return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* @return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducer<String, String> kafkaProducer = null;private KafkaStreamServer(){}/*** 创建配置属性* @param host* @param port* @return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaProducer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer = new KafkaProducer<>(properties);return this;}/*** 向kafka服务发送生产者消息* @param topic* @param msg* @return*/public Future<RecordMetadata> sendMsg(String topic, String msg){ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);Future<RecordMetadata> future = kafkaProducer.send(record);System.out.println("消息发送成功:" + msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer != null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer = null;}}}public static class KafkaStreamClient {KafkaConsumer<String, String> kafkaConsumer = null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* @param host* @param port* @return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaConsumer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer = new KafkaConsumer<String, String>(properties);return this;}/*** 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息* @param topic* @param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer != null){kafkaConsumer.close();kafkaConsumer = null;}}}@FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecord<String, String> record);}/*** 测试示例* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息
//        KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
//        int i=0;
//        while (i<10) {
//            String msg = "Hello," + new Random().nextInt(100);
//            kafkaStreamServer.sendMsg("test", msg);
//            i++;
//            Thread.sleep(100);
//        }
//        kafkaStreamServer.close();
//        System.out.println("发送结束");System.out.println("接收消息");KafkaStreamClient kafkaStreamClient =  KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");kafkaStreamClient.pollMsg("test", new HeaderInterface() {@Overridepublic void execute(ConsumerRecord<String, String> record) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));}});}
}

相关文章:

Kafka消息服务之Java工具类

注&#xff1a;此内容是本人在另一个技术平台发布的历史文章&#xff0c;转载发布到CSDN&#xff1b; Apache Kafka是一个开源分布式事件流平台&#xff0c;也是当前系统开发中流行的高性能消息队列服务&#xff0c;数千家公司使用它来实现高性能数据管道、流分析、数据集成和关…...

迪威模型网:免费畅享 3D 打印盛宴,科技魅力与趣味创意并存

还在为寻找优质3D打印模型而发愁&#xff1f;快来迪威模型网&#xff08;https://www.3dwhere.com/&#xff09;&#xff0c;一个集前沿科技与无限趣味于一体的免费3D打印宝藏平台&#xff01; 踏入迪威模型网&#xff0c;仿佛开启一场未来科技之旅。其“3D打印”专区&#xff…...

ECharts极简入门

ECharts 是一个基于 JavaScript的开源可视化图表库&#xff0c;广泛应用于数据可视化的场景中&#xff0c;支持多种图表类型&#xff0c;如柱状图、折线图、饼图、散点图、雷达图等&#xff0c;且具有强大的自定义功能。 1. ECharts 基本使用 首先需要引入 ECharts 库&#xf…...

PHP培训机构教务管理系统小程序源码

&#x1f511; 培训机构教务管理系统——智慧教育&#xff0c;高效管理新典范 &#x1f680; 这款教务管理系统&#xff0c;是基于前沿的ThinkPHP框架与Uniapp技术深度融合&#xff0c;匠心打造的培训机构管理神器。它犹如一把开启高效运营与精细管理的金钥匙&#xff0c;专为…...

JAVA学习第五天

接口的变量定义固定为静态变量 接口里面只能有抽象方法&#xff0c;且不能有构造方法 如果不重写tostring方法&#xff0c;会打印没有价值的信息...

pnpm和npm安装TailwindCss

npm下载及初始化来自Tailwind官方文档 npm下载&#xff1a; npm install -D tailwindcss npm初始化Tailwind&#xff1a; npx tailwindcss init pnpm下载&#xff1a; pnpm add -D tailwindcss3.4.1 postcss autoprefixer pnpm初始化Tailwind&#xff1a; pnpm exec tailwindc…...

【云安全】云原生-K8S(四)安全问题分析

Kubernetes&#xff08;K8S&#xff09;因其强大的容器编排能力成为了云计算和微服务架构的首选&#xff0c;但同时也带来了复杂的安全挑战。本文将概述K8S的主要安全问题&#xff0c;帮助安全工程师理解潜在威胁&#xff0c;并采取相应的防护措施。 K8S 攻击面概览 下面两张…...

Cloud之快照存储(Cloud Snapshot Storage)

Cloud之快照存储 一、什么是快照 1. 快照的定义 快照&#xff08;Snapshot&#xff09;是一种记录某一时刻数据状态的技术。在计算机存储和虚拟化环境中&#xff0c;快照能够将文件系统或虚拟机的状态保存下来&#xff0c;以便以后能够回溯到某一特定时间点。快照通常用于备…...

cs106x-lecture11(Autumn 2017)-SPL实现

打卡cs106x(Autumn 2017)-lecture11 (以下皆使用SPL实现&#xff0c;非STL库&#xff0c;后续课程结束会使用STL实现) 1、diceRolls Write a recursive function named diceRolls accepts an integer representing a number of 6-sided dice to roll, and output all possibl…...

负载均衡集群( LVS 相关原理与集群构建 )

目录 1、LVS 相关原理 1.1、LVS集群的体系结构以及特点 1.1.1 LVS简介 1.1.2 LVS体系结构 1.1.3 LVS相关术语 1.1.4 LVS工作模式 1.1.5 LVS调度算法 1.2 LVS-DR集群介绍 1.2.1 LVS-DR模式工作原理 1.2.2 LVS-DR模式应用特点 1.2.3 LVS-DR模式ARP抑制 1.3 LVS – NA…...

【分布式】Hadoop完全分布式的搭建(零基础)

Hadoop完全分布式的搭建 环境准备&#xff1a; &#xff08;1&#xff09;VMware Workstation Pro17&#xff08;其他也可&#xff09; &#xff08;2&#xff09;Centos7 &#xff08;3&#xff09;FinalShell &#xff08;一&#xff09;模型机配置 0****&#xff09;安…...

基于Java+Swing+Mysql实现人事管理信息系统

基于JavaSwingMysql实现人事管理信息系统 一、系统介绍二、功能展示1.用户登陆2.用户注册3.员工信息添加、删除4.员工信息查询、修改5.部门管理6、员工考核 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 系统功能&#xff1a;用户登陆、用户注册、员工信息添加、…...

DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命

云边有个稻草人-CSDN博客 在众多创新技术中&#xff0c;DeepSeek和ChatGPT无疑是最为引人注目的。它们通过强大的搜索和对话生成能力&#xff0c;能够改变我们与计算机交互的方式&#xff0c;帮助我们高效地获取信息&#xff0c;增强智能服务。本文将深入探讨这两项技术如何结合…...

企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow

企业级 RAG GitHub 开源项目深度分享&#xff1a;Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟&#xff0c;检索增强生成&#xff08;RAG&#xff09;已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型&#xff…...

js基础知识总结

1、js数据类型有哪些&#xff1f;存储区别 js基础类型及引用类型存储区别代码示例如下&#xff1a; // 基本数据类型 let a 10; let b a; // b 是 a 的一个副本 b 20; // 修改 b 不会影响 …...

LearnOpenGL——高级OpenGL(下)

教程地址&#xff1a;简介 - LearnOpenGL CN 高级数据 原文链接&#xff1a;高级数据 - LearnOpenGL CN 在OpenGL中&#xff0c;我们长期以来一直依赖缓冲来存储数据。本节将深入探讨一些操作缓冲的高级方法。 OpenGL中的缓冲本质上是一个管理特定内存块的对象&#xff0c;它…...

vue脚手架开发打地鼠游戏

游戏设计&#xff1a; 规划游戏的核心功能&#xff0c;如场景、随机出现的地鼠、计分系统、游戏时间限制等。简单设计游戏流程&#xff0c;包括开始界面、游戏进行中、关卡设置&#xff08;如不同关卡地鼠出现数量、游戏时间等&#xff09;、关卡闯关成功|失败、游戏结束闯关成…...

uniapp 连接mqtt

1&#xff1a;下载插件 npm install mqtt 2&#xff1a;创建 mqtt.js /* main.js 项目主入口注入实例 */ // import mqttTool from ./lib/mqttTool.js // Vue.prototype.$mqttTool mqttTool/* 使用范例见 /pages/index/index.vue */ // mqtt协议&#xff1a;H5使用ws/wss APP-…...

EX_25/2/19

1. 封装一个 File 类&#xff0c;用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节&#xff0c;并将读取到的数据返回 析构函数 …...

Breakout Tool

思科 CML 使用起来还是很麻烦的&#xff0c;很多操作对于习惯了 secure crt 或者 putty 等工具的网络工程师都不友好。 Breakout Tool 提供对远程实验室中虚拟机控制台与图形界面的本地化接入能力&#xff0c;其核心特性如下&#xff1a; Console 访问&#xff1a;基于 Telnet…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行

项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战&#xff0c;克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测

uniapp 中配置 配置manifest 文档&#xff1a;manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号&#xff1a;4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...

软件工程 期末复习

瀑布模型&#xff1a;计划 螺旋模型&#xff1a;风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合&#xff1a;模块内部功能紧密 模块之间依赖程度小 高内聚&#xff1a;指的是一个模块内部的功能应该紧密相关。换句话说&#xff0c;一个模块应当只实现单一的功能…...

数据库正常,但后端收不到数据原因及解决

从代码和日志来看&#xff0c;后端SQL查询确实返回了数据&#xff0c;但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离&#xff0c;并且ai辅助开发的时候&#xff0c;很容易出现前后端变量名不一致情况&#xff0c;还不报错&#xff0c;只是单…...

ArcPy扩展模块的使用(3)

管理工程项目 arcpy.mp模块允许用户管理布局、地图、报表、文件夹连接、视图等工程项目。例如&#xff0c;可以更新、修复或替换图层数据源&#xff0c;修改图层的符号系统&#xff0c;甚至自动在线执行共享要托管在组织中的工程项。 以下代码展示了如何更新图层的数据源&…...