当前位置: 首页 > news >正文

Kafka消息服务之Java工具类

注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;

Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

Java工具类

此基于kafka客户端的工具类,提供基础的消息发送与监听功能。

pom.xml

       <!-- 集成kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.2</version></dependency>

KafkaUtils.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;/*** @Description kafka工具类,提供消息发送与监听*/
public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* @return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* @return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducer<String, String> kafkaProducer = null;private KafkaStreamServer(){}/*** 创建配置属性* @param host* @param port* @return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaProducer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer = new KafkaProducer<>(properties);return this;}/*** 向kafka服务发送生产者消息* @param topic* @param msg* @return*/public Future<RecordMetadata> sendMsg(String topic, String msg){ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);Future<RecordMetadata> future = kafkaProducer.send(record);System.out.println("消息发送成功:" + msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer != null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer = null;}}}public static class KafkaStreamClient {KafkaConsumer<String, String> kafkaConsumer = null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* @param host* @param port* @return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers = String.format("%s:%d", host, port);if (kafkaConsumer != null){return this;}Properties properties = new Properties();//kafka地址,多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer = new KafkaConsumer<String, String>(properties);return this;}/*** 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息* @param topic* @param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer != null){kafkaConsumer.close();kafkaConsumer = null;}}}@FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecord<String, String> record);}/*** 测试示例* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息
//        KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
//        int i=0;
//        while (i<10) {
//            String msg = "Hello," + new Random().nextInt(100);
//            kafkaStreamServer.sendMsg("test", msg);
//            i++;
//            Thread.sleep(100);
//        }
//        kafkaStreamServer.close();
//        System.out.println("发送结束");System.out.println("接收消息");KafkaStreamClient kafkaStreamClient =  KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");kafkaStreamClient.pollMsg("test", new HeaderInterface() {@Overridepublic void execute(ConsumerRecord<String, String> record) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));}});}
}

相关文章:

Kafka消息服务之Java工具类

注&#xff1a;此内容是本人在另一个技术平台发布的历史文章&#xff0c;转载发布到CSDN&#xff1b; Apache Kafka是一个开源分布式事件流平台&#xff0c;也是当前系统开发中流行的高性能消息队列服务&#xff0c;数千家公司使用它来实现高性能数据管道、流分析、数据集成和关…...

迪威模型网:免费畅享 3D 打印盛宴,科技魅力与趣味创意并存

还在为寻找优质3D打印模型而发愁&#xff1f;快来迪威模型网&#xff08;https://www.3dwhere.com/&#xff09;&#xff0c;一个集前沿科技与无限趣味于一体的免费3D打印宝藏平台&#xff01; 踏入迪威模型网&#xff0c;仿佛开启一场未来科技之旅。其“3D打印”专区&#xff…...

ECharts极简入门

ECharts 是一个基于 JavaScript的开源可视化图表库&#xff0c;广泛应用于数据可视化的场景中&#xff0c;支持多种图表类型&#xff0c;如柱状图、折线图、饼图、散点图、雷达图等&#xff0c;且具有强大的自定义功能。 1. ECharts 基本使用 首先需要引入 ECharts 库&#xf…...

PHP培训机构教务管理系统小程序源码

&#x1f511; 培训机构教务管理系统——智慧教育&#xff0c;高效管理新典范 &#x1f680; 这款教务管理系统&#xff0c;是基于前沿的ThinkPHP框架与Uniapp技术深度融合&#xff0c;匠心打造的培训机构管理神器。它犹如一把开启高效运营与精细管理的金钥匙&#xff0c;专为…...

JAVA学习第五天

接口的变量定义固定为静态变量 接口里面只能有抽象方法&#xff0c;且不能有构造方法 如果不重写tostring方法&#xff0c;会打印没有价值的信息...

pnpm和npm安装TailwindCss

npm下载及初始化来自Tailwind官方文档 npm下载&#xff1a; npm install -D tailwindcss npm初始化Tailwind&#xff1a; npx tailwindcss init pnpm下载&#xff1a; pnpm add -D tailwindcss3.4.1 postcss autoprefixer pnpm初始化Tailwind&#xff1a; pnpm exec tailwindc…...

【云安全】云原生-K8S(四)安全问题分析

Kubernetes&#xff08;K8S&#xff09;因其强大的容器编排能力成为了云计算和微服务架构的首选&#xff0c;但同时也带来了复杂的安全挑战。本文将概述K8S的主要安全问题&#xff0c;帮助安全工程师理解潜在威胁&#xff0c;并采取相应的防护措施。 K8S 攻击面概览 下面两张…...

Cloud之快照存储(Cloud Snapshot Storage)

Cloud之快照存储 一、什么是快照 1. 快照的定义 快照&#xff08;Snapshot&#xff09;是一种记录某一时刻数据状态的技术。在计算机存储和虚拟化环境中&#xff0c;快照能够将文件系统或虚拟机的状态保存下来&#xff0c;以便以后能够回溯到某一特定时间点。快照通常用于备…...

cs106x-lecture11(Autumn 2017)-SPL实现

打卡cs106x(Autumn 2017)-lecture11 (以下皆使用SPL实现&#xff0c;非STL库&#xff0c;后续课程结束会使用STL实现) 1、diceRolls Write a recursive function named diceRolls accepts an integer representing a number of 6-sided dice to roll, and output all possibl…...

负载均衡集群( LVS 相关原理与集群构建 )

目录 1、LVS 相关原理 1.1、LVS集群的体系结构以及特点 1.1.1 LVS简介 1.1.2 LVS体系结构 1.1.3 LVS相关术语 1.1.4 LVS工作模式 1.1.5 LVS调度算法 1.2 LVS-DR集群介绍 1.2.1 LVS-DR模式工作原理 1.2.2 LVS-DR模式应用特点 1.2.3 LVS-DR模式ARP抑制 1.3 LVS – NA…...

【分布式】Hadoop完全分布式的搭建(零基础)

Hadoop完全分布式的搭建 环境准备&#xff1a; &#xff08;1&#xff09;VMware Workstation Pro17&#xff08;其他也可&#xff09; &#xff08;2&#xff09;Centos7 &#xff08;3&#xff09;FinalShell &#xff08;一&#xff09;模型机配置 0****&#xff09;安…...

基于Java+Swing+Mysql实现人事管理信息系统

基于JavaSwingMysql实现人事管理信息系统 一、系统介绍二、功能展示1.用户登陆2.用户注册3.员工信息添加、删除4.员工信息查询、修改5.部门管理6、员工考核 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 系统功能&#xff1a;用户登陆、用户注册、员工信息添加、…...

DeepSeek与ChatGPT:会取代搜索引擎和人工客服的人工智能革命

云边有个稻草人-CSDN博客 在众多创新技术中&#xff0c;DeepSeek和ChatGPT无疑是最为引人注目的。它们通过强大的搜索和对话生成能力&#xff0c;能够改变我们与计算机交互的方式&#xff0c;帮助我们高效地获取信息&#xff0c;增强智能服务。本文将深入探讨这两项技术如何结合…...

企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow

企业级 RAG GitHub 开源项目深度分享&#xff1a;Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟&#xff0c;检索增强生成&#xff08;RAG&#xff09;已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型&#xff…...

js基础知识总结

1、js数据类型有哪些&#xff1f;存储区别 js基础类型及引用类型存储区别代码示例如下&#xff1a; // 基本数据类型 let a 10; let b a; // b 是 a 的一个副本 b 20; // 修改 b 不会影响 …...

LearnOpenGL——高级OpenGL(下)

教程地址&#xff1a;简介 - LearnOpenGL CN 高级数据 原文链接&#xff1a;高级数据 - LearnOpenGL CN 在OpenGL中&#xff0c;我们长期以来一直依赖缓冲来存储数据。本节将深入探讨一些操作缓冲的高级方法。 OpenGL中的缓冲本质上是一个管理特定内存块的对象&#xff0c;它…...

vue脚手架开发打地鼠游戏

游戏设计&#xff1a; 规划游戏的核心功能&#xff0c;如场景、随机出现的地鼠、计分系统、游戏时间限制等。简单设计游戏流程&#xff0c;包括开始界面、游戏进行中、关卡设置&#xff08;如不同关卡地鼠出现数量、游戏时间等&#xff09;、关卡闯关成功|失败、游戏结束闯关成…...

uniapp 连接mqtt

1&#xff1a;下载插件 npm install mqtt 2&#xff1a;创建 mqtt.js /* main.js 项目主入口注入实例 */ // import mqttTool from ./lib/mqttTool.js // Vue.prototype.$mqttTool mqttTool/* 使用范例见 /pages/index/index.vue */ // mqtt协议&#xff1a;H5使用ws/wss APP-…...

EX_25/2/19

1. 封装一个 File 类&#xff0c;用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节&#xff0c;并将读取到的数据返回 析构函数 …...

Breakout Tool

思科 CML 使用起来还是很麻烦的&#xff0c;很多操作对于习惯了 secure crt 或者 putty 等工具的网络工程师都不友好。 Breakout Tool 提供对远程实验室中虚拟机控制台与图形界面的本地化接入能力&#xff0c;其核心特性如下&#xff1a; Console 访问&#xff1a;基于 Telnet…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

SQL Server 触发器调用存储过程实现发送 HTTP 请求

文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...