Kafka消费者api编写教程
1.基本属性配置
输入new Properties().var 回车
//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
2.创建消费者
输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称
//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
3.订阅主题/分区
3.1订阅主题
输入new ArrayList<String,String>().var 回车修改变量名为topics
//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);
3.2订阅分区
输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions
4.消费数据
//消费数据while (true){//if (flag == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}
5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息
6.完整代码
package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);/*//订阅主题//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);*///订阅分区//创建一个数组列表变量接收主题分区值ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//指定要订阅的分区topicPartitions.add(new TopicPartition("customers",2));//订阅分区kafkaConsumer.assign(topicPartitions);//消费数据while (true){//if (flag == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}
}
相关文章:
Kafka消费者api编写教程
1.基本属性配置 输入new Properties().var 回车 //创建属性Properties properties new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CL…...
什么情况下要配置DNS服务
什么是DNS 一、DNS就是域名解析 我们上网的方式通常都由ip地址组成,但是为了有个规范,而且我们也不可能去记住那么多一串Ip数字,首先域名就会比ip好记很多,其次固定性,一旦服务器换了,只要重新绑定域名对…...
华为端云一体化开发 (起步1.0)(HarmonyOS学习第七课)
官方文献: 为丰富HarmonyOS对云端开发的支持、实现端云联动,DevEco Studio推出了云开发功能,开发者在创建工程时选择云开发模板,即可在DevEco Studio内同时完成HarmonyOS应用/元服务的端侧与云侧开发,体验端云一体化协…...
数据结构之ArrayList与顺序表(上)
找往期文章包括但不限于本期文章中不懂的知识点: 个人主页:我要学编程(ಥ_ಥ)-CSDN博客 所属专栏:数据结构(Java版) 顺序表的学习,点我 上面这篇博文是关于顺序表的基础知识,以及顺序表的实现。…...
Java 8 中的 Stream API,用于处理集合数据
Java 8 引入了 Stream API,使得处理集合数据变得更加简洁和高效。Stream API 允许开发者以声明式编程风格操作数据集合,而不是使用传统的迭代和条件语句。 一、基本概念 1.1 什么是 Stream Stream 是 Java 8 中的一个新抽象,它允许对集合数…...
106、python-第四阶段-3-设计模式-单例模式
不是单例类,如下: class StrTools():pass str1StrTools() str2StrTools() print(str1) print(str2) 运用单例,先创建一个test.py class StrTools():pass str1StrTools()然后创建一个hello.py,在这个文件中引用test.py中的对象&a…...
【猫狗识别系统】图像识别Python+TensorFlow+卷积神经网络算法+人工智能深度学习
猫狗识别系统。通过TensorFlow搭建MobileNetV2轻量级卷积神经算法网络模型,通过对猫狗的图片数据集进行训练,得到一个进度较高的H5格式的模型文件。然后使用Django框架搭建了一个Web网页端可视化操作界面。实现用户上传一张图片识别其名称。 一、前言 …...
记录汇川:红绿灯与HMI-ST
项目要求: 子程序: 子程序: 实际动作如下: 红绿灯与HMI-ST...
已解决java.nio.charset.CoderMalfunctionError: 编码器故障错误的正确解决方法,亲测有效!!!
已解决java.nio.charset.CoderMalfunctionError: 编码器故障错误的正确解决方法,亲测有效!!! 亲测有效 报错问题解决思路解决方法1. 检查和清理输入数据2. 选择正确的字符集3. 处理异常情况4. 更新Java版本或库5. 检查第三方库的依…...
Linux 中常用的设置、工具和操作
1.设置固定的ip地址步骤 1.1 添加IPADDR“所设置的固定ip地址” TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no" BOOTPROTO"static" DEFROUTE"yes" IPV4_FAILURE_FATAL"no" IPV6INIT"yes" IPV6…...
[论文笔记]AIOS: LLM Agent Operating System
引言 这是一篇有意思的论文AIOS: LLM Agent Operating System,把LLM智能体(代理)看成是操作系统。 基于大语言模型(LLMs)的智能代理的集成和部署过程中存在着许多挑战,其中问题包括代理请求在LLM上的次优调度和资源分配,代理和LLM之间在交互…...
2024全国高考作文题解读(文心一言 4.0版本)
新课标I卷 阅读下面的材料,根据要求写作。(60分) 随着互联网的普及、人工智能的应用,越来越多的问题能很快得到答案。那么,我们的问题是否会越来越少? 以上材料引发了你怎样的联想和思考?请写…...
【功能超全】基于OpenCV车牌识别停车场管理系统软件开发【含python源码+PyqtUI界面+功能详解】-车牌识别python 深度学习实战项目
车牌识别基础功能演示 摘要:车牌识别系统(Vehicle License Plate Recognition,VLPR) 是指能够检测到受监控路面的车辆并自动提取车辆牌照信息(含汉字字符、英文字母、阿拉伯数字及号牌颜色)进行处理的技术。车牌识别是现代智能交通…...
TESSENT2024.1安装
一、安装过程参考Calibre安装过程(此处省略,不再赘述) 二、安装license管理器: SiemensLicenseServer_v2.2.1.0_Lnx64_x86-64.bin 三、Patch补丁: tessent安装目录和license管理安装目录,执行FlexNetLic…...
【机器学习】原理与应用场景 Python代码展现
机器学习:原理、应用与实例深度解析 引言一、机器学习的基本原理二、机器学习的应用范围三、机器学习实例解析四、机器学习部分讲解五、机器学习的挑战与未来 引言 随着大数据和计算能力的飞速发展,机器学习(Machine Learning, ML࿰…...
Python怎么循环计数:深入解析与实践
Python怎么循环计数:深入解析与实践 在Python编程中,循环计数是一项基础且重要的技能。无论是处理列表、遍历文件,还是执行重复任务,循环计数都发挥着不可或缺的作用。本文将从四个方面、五个方面、六个方面和七个方面详细阐述Py…...
Facebook企业户 | Facebook公共主页经营
Facebook作为社交媒体巨头,拥有庞大的用户基数,因此,有效经营公共主页是获取持续流量、提升客户信任度和粘性、促进产品或服务销售与转化的关键。要优化Facebook主页,关注以下几点: 1、参与度是关键指标:因…...
排序数组 ---- 分治-归并
题目链接 题目: 分析: 用这道题来回顾一下归并排序的思想找到中间结点, 将数组分成两半, 运用递归的思想, 继续对一半进行分半, 分到最后剩一个元素, 再将左右数组合并, 合并两个有序数组, 是先分解, 再合并的过程在合并两个有序数组时, 需要一个额外的数组来记录, 为了避免每…...
【红黑树变色+旋转】
文章目录 一. 红黑树规则二. 情况一叔叔存在且为红情况二.变色旋旋 一. 红黑树规则 对于红黑树,进行变色旋转处理,终究都是为了维持颜色以下几条规则,只有颜色和规则维持住了,红黑树就维持住了最长路径的长度不超过最短路径的两倍…...
pytorch 使用tensor混合:进行index操作
(Pdb) tmp torch.randn(3,5) (Pdb) indx torch.tensor([1,0]).long() (Pdb) temp(indx) *** NameError: name ‘temp’ is not defined (Pdb) tmp(indx) *** TypeError: ‘Tensor’ object is not callable (Pdb) tmp[indx] tensor([[ 0.1633, 0.9389, 1.2806, -0.2525, 0.28…...
IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
【C++】纯虚函数类外可以写实现吗?
1. 答案 先说答案,可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...
DBLP数据库是什么?
DBLP(Digital Bibliography & Library Project)Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高,数据库文献更新速度很快,很好地反映了国际计算机科学学术研…...
