当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

目录

    • 一、独立消费者消费某一个主题中某个分区数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题中某个分区数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示:
    在这里插入图片描述

1.2、案例代码

  • 生产者往firstTopic主题 0 号分区发送数据代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
  • 消费者消费firstTopic主题 0 分区数据代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题对应的分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("firstTopic",0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

1.3、测试

  • 在 IDEA 中执行消费者程序,如下图:
    在这里插入图片描述
  • 在 IDEA 中执行生产者程序 ,在控制台观察生成几个 0号分区的数据,如下图:
    在这里插入图片描述
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
    在这里插入图片描述

相关文章:

Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

目录 一、独立消费者消费某一个主题中某个分区数据案例1.1、案例需求1.2、案例代码1.3、测试 一、独立消费者消费某一个主题中某个分区数据案例 1.1、案例需求 创建一个独立消费者&#xff0c;消费firstTopic主题 0 号分区的数据&#xff0c;所下图所示&#xff1a; 1.2、案…...

基于Simulink的用于电力系统动态分析

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

日200亿次调用,喜马拉雅网关的架构设计

说在前面 在40岁老架构师 尼恩的读者社区(50)中&#xff0c;很多小伙伴拿到一线互联网企业如阿里、网易、有赞、希音、百度、滴滴的面试资格。 最近&#xff0c;尼恩指导一个小伙伴简历&#xff0c;写了一个《API网关项目》&#xff0c;此项目帮这个小伙拿到 字节/阿里/微博/…...

构造函数和析构函数(个人学习笔记黑马学习)

构造函数:主要作用在于创建对象时为对象的成员属性赋值&#xff0c;构造函数由编译器自动调用&#xff0c;无须手动调用。析构函数:主要作用在于对象销毁前系统自动调用&#xff0c;执行一些清理工作。 #include <iostream> using namespace std;//对象初始化和清理class…...

GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图教程

详情点击链接&#xff1a;GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图教程 前沿 GPT对于每个科研人员已经成为不可或缺的辅助工具&#xff0c;不同的研究领域和项目具有不同的需求。 如在科研编程、绘图领域&#xff1a; 1、编程建议和示例代码: 无论你使用的编程语言是…...

Git上传新项目

第一步&#xff1a;初始化 Git 仓库 首先&#xff0c;打开终端或命令行界面&#xff0c;然后导航到项目目录。运行下面的命令来初始化一个新的 Git 仓库&#xff1a; git init这将创建一个新的 .git 子目录&#xff0c;其中包含了初始化的 Git 仓库。 第二步&#xff1a;添加…...

C语言文件操作总结

目录 字符方式读入文件 数据块方式读写文件 文件定位与随机读写 文件中数据的修改 字符方式读入文件 1.向文件中写入&#xff08;输入字符&#xff09; 用 fputc 函数或 puts 函数可以把一个字符写到磁盘文件中去。 int fputc(int ch,FILE * fp) ch 是要输出的字符&#…...

原生js之dom如何进行事件监听(事件捕获/冒泡)

那么好,这次主要讲解的就是dom是如何进行事件监听和事件取消监听的,我们知道vue中主要用watch来进行监听. js监听与取消监听 那么原生js主要用到的就是addListenEvent事件来进行监听,可以监听文档dom对象也可以监听浏览器bom对象,监听事件的语法结构如下 Dom/Bom监听 eleme…...

使用SimPowerSystems并网光伏阵列研究(Simulink实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

BUUCTF-WEB-[ACTF2020 新生赛]Includel

打开靶机 点击tips 利用Burp抓包&#xff0c;未见异常 但发现了响应头是 PHP/7.3.13 想到了"php://input"伪协议POST发送PHP代码 构建Payload&#xff1a;?filephp://filter/readconvert.base64-encode/resourceflag.php 这里需要注意的是使用php://filter伪协议…...

算法通关村十四关:白银挑战-堆能高效解决的经典问题

白银挑战-堆能高效解决的经典问题 1.在数组中找第K大的元素 LeetCode215 https://leetcode.cn/problems/kth-largest-element-in-an-array/ 思路分析 主要解决方法有3个&#xff0c;选择法&#xff0c;堆查找法和快速排序法 方法1&#xff1a;选择法 先遍历一遍找到最大的…...

跨站请求伪造(CSRF)攻击与防御原理

跨站请求伪造&#xff08;CSRF&#xff09; 1.1 CSRF原理 1.1.1 基本概念 跨站请求伪造&#xff08;Cross Site Request Forgery&#xff0c;CSRF&#xff09;是一种攻击&#xff0c;它强制浏览器客户端用户在当前对其进行身份验证后的Web 应用程序上执行非本意操作的攻击&a…...

从0到1实现播放控制器

这系列文章主要讲诉如何从0到1使用QT实现带时间显示、滚动字幕等的自定义配置视频播放控制器。平时我们乘坐地铁经常看到各条线的播放控制器都大同小异。其实都是通过QT等界面开发软件来实现的。 在具体开发之前&#xff0c;需要明确我们需要做什么&#xff1f; 1. 开发一个可…...

【Vue-Element-Admin】导出el-table全部数据

背景 因为el-table实现了分页查询&#xff0c;所以想要实现el-table需要重新编写一个查询全部数据的方法 查询全部数据 listQuery: export default{return{listQuery:{//page:1,//limit:20,//如果想兼容按条件导出&#xff0c;可以定义查询条件age:undefined,sex:undefined…...

MFC 更改控件的大小和位置

获取当前主窗体的位置rect CRect dlgNow;GetWindowRect(&dlgNow);获取某一个控件当前的位置 CRect rect;CButton* pBtn (CButton*)GetDlgItem(IDC_BUTTONXXX);//获取按钮控件pBtn->GetWindowRect(rect);CWnd* pWnd(CWnd*)GetDlgItem(IDC_EDITXXX);//其它控件&#xff0…...

【向量数据库】相似向量检索Faiss数据库的安装及余弦相似度计算(C++)

目录 简介安装方法安装OpenBLAS安装lapack编译Faiss 代码示例余弦相似度计算输出ID号而非索引的改进版 简介 Faiss 是一个强大的向量相似度搜索库&#xff0c;具有以下优点&#xff1a; 高效的搜索性能&#xff1a;Faiss 在处理大规模向量数据时表现出色。它利用了高度优化的索…...

教育培训小程序的设计与功能解析

随着互联网的发展&#xff0c;线上教育逐渐成为一种趋势&#xff0c;越来越多的人开始选择在线学习。而搭建一个适合自己的线上教育小程序&#xff0c;可以为教育机构或个人提供更好的教学和学习体验。在本文中&#xff0c;我们将介绍如何通过一个第三方制作平台来搭建在线教育…...

【ES】illegal_argument_exception“,“reason“:“Result window is too large

查询ES数据返回错误&#xff1a; {"root_cause":[{"type":"illegal_argument_exception","reason":"Result window is too large, from size must be less than or equal to: [10000] but was [999999]. See the scroll api for…...

SpringBoot实现登录拦截

如果我们不进行登录拦截的话&#xff0c;即使我们跳过登录页面直接去访问任意一个页面也能访问成功&#xff0c;那么登录功能就没有意义&#xff0c;同时也会存在安全问题&#xff0c;因为有些操作是要用户登录后才能执行的&#xff0c;如果用户没有登录&#xff0c;该接口就获…...

浅谈泛在电力物联网、能源互联网与虚拟电厂

导读&#xff1a;从能源互联网推进受阻&#xff0c;到泛在电力物联网名噪一时&#xff0c;到虚拟电厂再次走向火爆&#xff0c;能源领域亟需更进一步的数智化发展。如今&#xff0c;随着新型电力系统建设推进&#xff0c;虚拟电厂有望迎来快速发展。除了国网和南网公司下属的电…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...