当前位置: 首页 > news >正文

Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);/*//订阅主题//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);*///订阅分区//创建一个数组列表变量接收主题分区值ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//指定要订阅的分区topicPartitions.add(new TopicPartition("customers",2));//订阅分区kafkaConsumer.assign(topicPartitions);//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}
}

相关文章:

Kafka消费者api编写教程

1.基本属性配置 输入new Properties().var 回车 //创建属性Properties properties new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CL…...

什么情况下要配置DNS服务

什么是DNS 一、DNS就是域名解析 我们上网的方式通常都由ip地址组成&#xff0c;但是为了有个规范&#xff0c;而且我们也不可能去记住那么多一串Ip数字&#xff0c;首先域名就会比ip好记很多&#xff0c;其次固定性&#xff0c;一旦服务器换了&#xff0c;只要重新绑定域名对…...

华为端云一体化开发 (起步1.0)(HarmonyOS学习第七课)

官方文献&#xff1a; 为丰富HarmonyOS对云端开发的支持、实现端云联动&#xff0c;DevEco Studio推出了云开发功能&#xff0c;开发者在创建工程时选择云开发模板&#xff0c;即可在DevEco Studio内同时完成HarmonyOS应用/元服务的端侧与云侧开发&#xff0c;体验端云一体化协…...

数据结构之ArrayList与顺序表(上)

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;数据结构&#xff08;Java版&#xff09; 顺序表的学习&#xff0c;点我 上面这篇博文是关于顺序表的基础知识&#xff0c;以及顺序表的实现。…...

Java 8 中的 Stream API,用于处理集合数据

Java 8 引入了 Stream API&#xff0c;使得处理集合数据变得更加简洁和高效。Stream API 允许开发者以声明式编程风格操作数据集合&#xff0c;而不是使用传统的迭代和条件语句。 一、基本概念 1.1 什么是 Stream Stream 是 Java 8 中的一个新抽象&#xff0c;它允许对集合数…...

106、python-第四阶段-3-设计模式-单例模式

不是单例类&#xff0c;如下&#xff1a; class StrTools():pass str1StrTools() str2StrTools() print(str1) print(str2) 运用单例&#xff0c;先创建一个test.py class StrTools():pass str1StrTools()然后创建一个hello.py&#xff0c;在这个文件中引用test.py中的对象&a…...

【猫狗识别系统】图像识别Python+TensorFlow+卷积神经网络算法+人工智能深度学习

猫狗识别系统。通过TensorFlow搭建MobileNetV2轻量级卷积神经算法网络模型&#xff0c;通过对猫狗的图片数据集进行训练&#xff0c;得到一个进度较高的H5格式的模型文件。然后使用Django框架搭建了一个Web网页端可视化操作界面。实现用户上传一张图片识别其名称。 一、前言 …...

记录汇川:红绿灯与HMI-ST

项目要求&#xff1a; 子程序&#xff1a; 子程序&#xff1a; 实际动作如下&#xff1a; 红绿灯与HMI-ST...

已解决java.nio.charset.CoderMalfunctionError: 编码器故障错误的正确解决方法,亲测有效!!!

已解决java.nio.charset.CoderMalfunctionError: 编码器故障错误的正确解决方法&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 亲测有效 报错问题解决思路解决方法1. 检查和清理输入数据2. 选择正确的字符集3. 处理异常情况4. 更新Java版本或库5. 检查第三方库的依…...

Linux 中常用的设置、工具和操作

1.设置固定的ip地址步骤 1.1 添加IPADDR“所设置的固定ip地址” TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no" BOOTPROTO"static" DEFROUTE"yes" IPV4_FAILURE_FATAL"no" IPV6INIT"yes" IPV6…...

[论文笔记]AIOS: LLM Agent Operating System

引言 这是一篇有意思的论文AIOS: LLM Agent Operating System&#xff0c;把LLM智能体(代理)看成是操作系统。 基于大语言模型(LLMs)的智能代理的集成和部署过程中存在着许多挑战&#xff0c;其中问题包括代理请求在LLM上的次优调度和资源分配&#xff0c;代理和LLM之间在交互…...

2024全国高考作文题解读(文心一言 4.0版本)

新课标I卷 阅读下面的材料&#xff0c;根据要求写作。&#xff08;60分&#xff09; 随着互联网的普及、人工智能的应用&#xff0c;越来越多的问题能很快得到答案。那么&#xff0c;我们的问题是否会越来越少&#xff1f; 以上材料引发了你怎样的联想和思考&#xff1f;请写…...

【功能超全】基于OpenCV车牌识别停车场管理系统软件开发【含python源码+PyqtUI界面+功能详解】-车牌识别python 深度学习实战项目

车牌识别基础功能演示 摘要&#xff1a;车牌识别系统(Vehicle License Plate Recognition&#xff0c;VLPR) 是指能够检测到受监控路面的车辆并自动提取车辆牌照信息&#xff08;含汉字字符、英文字母、阿拉伯数字及号牌颜色&#xff09;进行处理的技术。车牌识别是现代智能交通…...

TESSENT2024.1安装

一、安装过程参考Calibre安装过程&#xff08;此处省略&#xff0c;不再赘述&#xff09; 二、安装license管理器&#xff1a; SiemensLicenseServer_v2.2.1.0_Lnx64_x86-64.bin 三、Patch补丁&#xff1a; tessent安装目录和license管理安装目录&#xff0c;执行FlexNetLic…...

【机器学习】原理与应用场景 Python代码展现

机器学习&#xff1a;原理、应用与实例深度解析 引言一、机器学习的基本原理二、机器学习的应用范围三、机器学习实例解析四、机器学习部分讲解五、机器学习的挑战与未来 引言 随着大数据和计算能力的飞速发展&#xff0c;机器学习&#xff08;Machine Learning, ML&#xff0…...

Python怎么循环计数:深入解析与实践

Python怎么循环计数&#xff1a;深入解析与实践 在Python编程中&#xff0c;循环计数是一项基础且重要的技能。无论是处理列表、遍历文件&#xff0c;还是执行重复任务&#xff0c;循环计数都发挥着不可或缺的作用。本文将从四个方面、五个方面、六个方面和七个方面详细阐述Py…...

Facebook企业户 | Facebook公共主页经营

Facebook作为社交媒体巨头&#xff0c;拥有庞大的用户基数&#xff0c;因此&#xff0c;有效经营公共主页是获取持续流量、提升客户信任度和粘性、促进产品或服务销售与转化的关键。要优化Facebook主页&#xff0c;关注以下几点&#xff1a; 1、参与度是关键指标&#xff1a;因…...

排序数组 ---- 分治-归并

题目链接 题目: 分析: 用这道题来回顾一下归并排序的思想找到中间结点, 将数组分成两半, 运用递归的思想, 继续对一半进行分半, 分到最后剩一个元素, 再将左右数组合并, 合并两个有序数组, 是先分解, 再合并的过程在合并两个有序数组时, 需要一个额外的数组来记录, 为了避免每…...

【红黑树变色+旋转】

文章目录 一. 红黑树规则二. 情况一叔叔存在且为红情况二.变色旋旋 一. 红黑树规则 对于红黑树&#xff0c;进行变色旋转处理&#xff0c;终究都是为了维持颜色以下几条规则&#xff0c;只有颜色和规则维持住了&#xff0c;红黑树就维持住了最长路径的长度不超过最短路径的两倍…...

pytorch 使用tensor混合:进行index操作

(Pdb) tmp torch.randn(3,5) (Pdb) indx torch.tensor([1,0]).long() (Pdb) temp(indx) *** NameError: name ‘temp’ is not defined (Pdb) tmp(indx) *** TypeError: ‘Tensor’ object is not callable (Pdb) tmp[indx] tensor([[ 0.1633, 0.9389, 1.2806, -0.2525, 0.28…...

Canvas Quest人像修复与增强实战:老照片修复与画质提升

Canvas Quest人像修复与增强实战&#xff1a;老照片修复与画质提升 1. 老照片修复的痛点与解决方案 翻开家里的老相册&#xff0c;总能看到一些泛黄、破损或模糊的照片。这些承载着珍贵记忆的画面&#xff0c;往往因为年代久远而变得难以辨认。传统的手工修复不仅耗时费力&am…...

中文医疗大模型避坑指南:从MedBench评测看5大常见训练误区

中文医疗大模型实战避坑手册&#xff1a;从MedBench看模型训练的5个致命盲区 当ChatGPT掀起通用大模型的热潮时&#xff0c;医疗领域正在经历一场更为严谨的技术革命。不同于开放域的对话生成&#xff0c;医疗大模型的每个输出都可能直接影响临床决策——这要求开发者必须跨越专…...

UE5.3与Colosseum集成配置指南及常见问题解析

1. 环境准备&#xff1a;Windows系统下的基础配置 在开始Colosseum与UE5.3的集成之前&#xff0c;我们需要确保开发环境满足基本要求。我最近在Windows 11系统上完成了一次完整配置&#xff0c;实测下来这几个关键组件版本组合最稳定&#xff1a; 操作系统&#xff1a;Windows …...

ReAct Agent:新手程序员必看!收藏这款融合推理与行动的AI智能体框架,轻松入门大模型应用开发

ReAct框架通过结合推理与行动&#xff0c;解决了传统提示工程的局限性&#xff0c;构建出能主动思考、决策并执行复杂任务的智能体。本文详细介绍了ReAct的核心设计思想&#xff0c;包括推理模块的动态思考链和错误回溯机制&#xff0c;以及行动模块的工具集成和环境状态感知。…...

【独家首发】Polars 2.0 vs Pandas 2.2清洗基准测试:10亿行CSV清洗仅耗11.3秒?真相在此

第一章&#xff1a;Polars 2.0大规模数据清洗的范式跃迁Polars 2.0 不再是 Pandas 的轻量替代品&#xff0c;而是一次面向现代硬件与真实业务场景的数据处理范式重构。其核心跃迁体现在零拷贝内存布局、全链路惰性执行引擎&#xff08;LazyFrame&#xff09;与原生支持的并行流…...

WWW-万维网

万维网的概念与组成结构万维网&#xff08;World Wide Web&#xff0c;WWW&#xff09;是一个分布式的信息存储空间&#xff0c;在这个空间中&#xff1a;一个事物被称为一样 “资源”&#xff0c;并由一个全域 “统一资源定位符”&#xff08;URL&#xff09;标识。这些资源通…...

保姆级避坑指南:Ubuntu系统下Hadoop HA集群搭建,我踩过的那些SSH和配置文件的“坑”

Ubuntu下Hadoop HA集群搭建&#xff1a;那些教科书不会告诉你的实战陷阱 第一次在Ubuntu上搭建Hadoop HA集群时&#xff0c;我天真地以为照着官方文档就能顺利跑起来。直到SSH连接莫名其妙失败、JournalNode权限报错刷屏、ZKFC死活不启动时&#xff0c;才明白为什么有人说大数据…...

MRM-MOT4X3.6CAN电机驱动库:工业级CAN总线电机控制抽象层

1. 项目概述mrm-mot4x3.6can是一款面向工业级电机控制场景的专用 CAN 总线驱动库&#xff0c;专为 MRMS&#xff08;Modular Robotic Motor Systems&#xff09;公司推出的MRM-MOT4X3.6CAN 四通道直流电机控制器设计。该控制器集成 4 路独立 H 桥驱动单元&#xff0c;每路持续输…...

告别Python环境依赖!用PyInstaller打包Tkinter/Selenium程序的最佳实践

告别Python环境依赖&#xff01;用PyInstaller打包Tkinter/Selenium程序的最佳实践 你是否遇到过这样的尴尬场景&#xff1f;精心开发的Python程序在本地运行完美&#xff0c;但分享给同事或客户时&#xff0c;对方却因为缺少Python环境或依赖库而无法使用。尤其当程序涉及图形…...

2025年开源工具jable-download:视频下载工具高效解决方案

2025年开源工具jable-download&#xff1a;视频下载工具高效解决方案 【免费下载链接】jable-download 方便下载jable的小工具 项目地址: https://gitcode.com/gh_mirrors/ja/jable-download 在数字化内容消费日益增长的今天&#xff0c;视频资源的获取与保存成为许多用…...