kafka:消费者从指定时间的偏移开始消费(二)
我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。
但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取消息时,因为得不到消费者的消费偏移,最后的结果,就是从0偏移开始拉取所有消息。并不能真正实现忽略上线之前所有消息的目的。
所以我又优化了方案。基本的原理就是使用KafkaConsumer.offsetsForTimes方法获取消费者的所有主题分区的指定时间的偏移,并将这个偏移作为消费开始的偏移(KafkaConsumer.seek方法) 。
@Testpublic void test3SeekToTime() {// 配置Kafka消费者的属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my_consumer_group");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());// 创建Kafka消费者实例try(Consumer<String, String> consumer = new KafkaConsumer<>(props)){ boolean seek = false;/** * 循环开始的时间,* 忽略该时间之前的消息*/long startMills = System.currentTimeMillis();while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(4000));if(!seek) {if(!records.isEmpty()) {/** * 获取第一批消息时更新消息偏移到循环开始的时间*/consumer.offsetsForTimes(Maps.asMap(consumer.assignment(),t->startMills)).forEach((k,v)->{if(null != v) {System.out.println("seek %s to %s",k,v.offset());consumer.seek(k,v.offset());}});seek = true;}/** 跳过第一批获取到的消息,继续循环 */continue;}records.forEach(record -> {String value = record.value();System.out.println("Received message: " + value);});}catch (Exception e) {e.printStackTrace();}}}}
相关文章:
kafka:消费者从指定时间的偏移开始消费(二)
我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。 但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第…...
Spring的加载配置文件、容器和获取bean的方式
🐌个人主页: 🐌 叶落闲庭 💨我的专栏:💨 c语言 数据结构 javaweb 石可破也,而不可夺坚;丹可磨也,而不可夺赤。 Spring配置文件和容器相关 一、加载properties文件1.1加载…...
(二)利用Streamlit创建第一个app——单页面、多页面
1 单页面app Step1:创建一个新的Python脚本。我们称之为uber_pickups.py。 Step2:在您喜爱的IDE或文本编辑器中打开uber_pickups.py,然后添加以下行: import streamlit as st import pandas as pd import numpy as npStep3&…...
一条sql查询语句在mysql中的执行过程是什么
mysql的连接器 我们想要在mysql中执行一条sql查询语句,首先需要连接到mysql服务,那么客户端首先要向mysql服务端发起连接请求,我们可以在客户端用mysql -h [ip] -P [port] -u 用户名 -p 密码 命令向服务端发起连接请求,这个连接请…...
网络互联究竟是需要什么协议相同,什么协议不同?
混淆概念的几个说法: 说法1: “以太网交换机不可以实现采用不同网络层协议的互联” 原因:以太网交换机是数据链路层的设备,不懂网络层的知识 说法2: “网桥可互联不同的物理层、不同的MAC子层以及不同速率的以太网”…...
ajax axios json
目录 一、ajax概述 1. 概念 2. 实现方式 (1)原生的JS实现方式(了解) (2) JQeury实现方式 二、axios 介绍 三、axios使用 1. axios 发送get/post请求 2. axios验证用户名称是否存在 四、json 1. …...
外观模式——提供统一入口
1、简介 1.1、概述 在软件开发中,有时候为了完成一项较为复杂的功能,一个类需要和多个其他业务类交互,而这些需要交互的业务类经常会作为一个完整的整体出现,由于涉及的类比较多,导致使用时代码较为复杂。此时&#…...
Vue中导入并读取Excel数据
在工作中遇到需要前端上传excel文件获取到相应数据处理之后传给后端并且展示上传文件的数据. 一、引入依赖 npm install -S file-saver xlsxnpm install -D script-loadernpm install xlsx二、在main.js中引入 import XLSX from xlsx三、创建vue文件 <div><el-uplo…...
CUDA常用函数
cudaDeviceSynchronize cudaDeviceSynchronize是一个CUDA函数,用于同步当前设备上的所有CUDA流。它会阻塞调用它的线程,直到所有设备上的CUDA流都执行完为止。这可以确保在进行后续的CUDA操作时,先前的操作已经完成。 在CUDA程序中࿰…...
72. ElasticSearch常用命令
索引管理 1新建索引 curl -XPUT http://10.42.172.35:9200/index012 读写权限 curl -XPUT -d {"blocks.read":false} http://10.42.172.35:9200/index01/_settings3 查看索引 单个 curl -XGET http://10.42.172.35:9200/index01/_settings多个 curl -XGET http…...
2023.7.26(同余方程的通解与特解)
Water(扩欧求特解与通解) 题意:给容量分别为A与B的水杯,问确切喝到C水的最小操作次数 有4种操作:选一杯全喝,选一杯全部倒掉,选一杯装满,将一杯的水尽量倒到另一杯中 思路:只有AxByC有解时才能确…...
Diffusion扩散模型学习3——Stable Diffusion结构解析-以图像生成图像(图生图,img2img)为例
Diffusion扩散模型学习3——Stable Diffusion结构解析-以图像生成图像(图生图,img2img)为例 学习前言源码下载地址网络构建一、什么是Stable Diffusion(SD)二、Stable Diffusion的组成三、img2img生成流程1、输入图片编…...
LangChain||什么是LangChain? LangChain有什么用?
从Auto-GPT说起: Auto-GPT可以调用本地电脑工具处理复杂信息;Auto-GPT可以围绕目标查阅资 料、“独立思考”、及时反馈、并 及时调整下一步操作…Auto-GPT的诞生,创造了大家 对“将LLM作为智慧大脑来高效 处理综合复杂任务”的想象;首次尝试串联大语言模…...
秋招算法备战第28天 | 93.复原IP地址、78.子集、90.子集II
93. 复原 IP 地址 - 力扣(LeetCode) 这个问题可以通过深度优先搜索(DFS)的方法来解决。我们要做的就是在字符串的每个可能位置插入点,然后检查生成的每一部分是否在 0-255 的范围内,以及是否没有前导零(除非这一部分本…...
Mongodb空间索引的使用以及与Django的对接
Mongodb的空间索引 Mongodb数据库大家都非常熟悉,是一个基于分布式文件存储的开源数据库系统,在高负载的情况下,添加更多的节点,可以保证服务器性能,数据结构由键值(key>value)对组成。MongoDB 文档类似于 JSON 对…...
Windows安装MySQL数据库
MySQL数据库安装 MySQL下载 下载地址:https://dev.mysql.com/downloads/mysql/ 可以选择下载msi或zip,以下为zip模式安装步骤 下载了mysql的zip安装包之后解压即可; Windows安装步骤 初始化MySQL,并记录生成的用户密码root的随机…...
聊聊函数式编程中的“式”
当谈到函数式编程的“式”时,通常指的是函数的组合、转换和应用,以及处理数据的方式和风格。在函数式编程中,式是用来构建程序逻辑的基本单元。 下面更详细解释函数式编程中的几个关键式: 函数的组合: 函数式编程中…...
ubuntu目录分析
在Ubuntu根目录下,以下是一些常见文件夹的含义: /bin:存放可执行文件,包含一些基本的命令和工具。 /boot:存放启动时所需的文件,如内核和引导加载程序。 /dev:包含设备文件,用于与硬…...
Python 进阶(三):正则表达式(re 模块)
❤️ 博客主页:水滴技术 🌸 订阅专栏:Python 入门核心技术 🚀 支持水滴:点赞👍 收藏⭐ 留言💬 文章目录 1. 导入re模块2. re模块中的常用函数2.1 re.search()2.2 re.findall()2.3 re.sub()2.4…...
Vue2 第六节 key的作用与原理
(1)虚拟DOM (2)v-for中的key的作用 一.虚拟DOM 1.虚拟DOM就是内存中的数据 2.原生的JS没有虚拟DOM: 如果新的数据和原来的数据有重复数据,不会在原来的基础上新加数据,而是重新生成一份 3. Vue会有虚拟…...
Excel VBA实战:打造高精度自定义计时器
1. 为什么需要自定义计时器? 在实验室数据采集、运动训练计时、工业生产监控等场景中,我们经常需要精确记录时间间隔。虽然Excel自带的时间函数能解决部分需求,但遇到以下情况时,原生功能就显得力不从心: 毫秒级精度要…...
实战指南:基于快马平台与Touchgal,从零开发移动端手写绘图应用
今天想和大家分享一个实战项目:基于Touchgal开发移动端手写绘图应用。这个项目特别适合需要复杂手势交互的场景,比如绘图软件、地图导航等。下面我会详细介绍整个开发流程和关键实现点。 项目初始化与环境搭建 首先需要创建一个基础的HTML5项目结构。画…...
Qwen3-ASR-0.6B作品集:Qwen3-ForcedAligner-0.6B时间戳精度图谱
Qwen3-ASR-0.6B作品集:Qwen3-ForcedAligner-0.6B时间戳精度图谱 你有没有想过,一段语音里的每个字、每个词,甚至每个音节,是在哪个精确的时间点被说出来的?这听起来像是电影后期制作里的黑科技,但现在&…...
MIKE URBAN中污水处理厂如何进行概化
01 前言应用厂网一体化耦合模型研究水厂间调度和厂前溢流入河污染量等内容时,由于不需要关注污水处理厂内部的具体处理工艺,需要对污水处理厂的关键设施进行概化处理。02 水厂资料收集收集污水处理的工艺流程图和设施设计参数。依据厂网一体化模型的研究…...
为什么92%的团队在MCP项目中期被迫重构?Python 4大模板的抽象泄漏、协议耦合与测试盲区深度拆解
第一章:MCP服务器开发模板的行业现状与重构困局当前,MCP(Model Control Protocol)服务器作为AI智能体协同调度与协议网关的核心组件,在金融风控、工业边缘控制、多模态Agent编排等场景中加速落地。然而,主流…...
音乐版权侵权避坑指南:明星翻唱踩的红线,这些行为也在踩
短视频/直播/门店公播全场景合规方案 正版商用音乐授权平台推荐央广网北京3月30日消息(记者费权)近日,歌手单依纯在深圳演唱会上未经授权演唱李荣浩原创作品《李白》,而此前李荣浩方已明确婉拒其版权授权申请,中国音乐…...
ESP-IDF嵌入式类型工具:轻量级字节与位操作库
1. 项目概述 esp_type_utils 是面向 ESP-IDF 生态的轻量级类型工具组件,专为嵌入式底层开发中高频出现的字节级数据操作与字符串格式化需求而设计。它并非 ESP-IDF 官方 SDK 的一部分,而是由开发者 Eric Gionet(K0I05)维护的开源…...
【中文文献管理效率提升90%】茉莉花插件:科研工作者的智能文献处理解决方案
【中文文献管理效率提升90%】茉莉花插件:科研工作者的智能文献处理解决方案 【免费下载链接】jasminum A Zotero add-on to retrive CNKI meta data. 一个简单的Zotero 插件,用于识别中文元数据 项目地址: https://gitcode.com/gh_mirrors/ja/jasminum…...
Qwen-Image-Edit-F2P在Vue前端项目中的可视化应用
Qwen-Image-Edit-F2P在Vue前端项目中的可视化应用 1. 引言 想象一下这样的场景:用户上传一张简单的人脸照片,几秒钟后就能看到自己穿着优雅礼服站在巴黎街头,或是化身古风侠客执剑而立。这种曾经只存在于科幻电影中的体验,现在通…...
Lychee Rerank MM GPU算力:Qwen2.5-VL 7B模型在A10上16GB显存高效运行
Lychee Rerank MM GPU算力:Qwen2.5-VL 7B模型在A10上16GB显存高效运行 1. 引言:当多模态检索遇到“选择困难症” 想象一下,你正在一个庞大的多媒体资料库里搜索。你输入“一只在草地上玩耍的棕色小狗”,系统返回了100个结果&…...
