当前位置: 首页 > news >正文

flink消费kafka数据,按照指定时间开始消费

kafka中根据时间戳开始消费数据

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.nodes.CollectionNode;import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;/*** 支持按topic指定开始消费时间戳** @author */
public class KafkaOffsetsInitializer implements OffsetsInitializer  {private Logger logger = LoggerFactory.getLogger(KafkaOffsetsInitializer.class);private static final long serialVersionUID = 1L;/*** key:topic,value:开始消费时间戳*/private Map<String, Long> topicStartingTimestamps;private ParameterTool parameters;/*** @param topicStartingTimestamps* @param parameters*/public KafkaOffsetsInitializer(Map<String, Long> topicStartingTimestamps, ParameterTool parameters) {this.topicStartingTimestamps = topicStartingTimestamps;this.parameters = parameters;}@Overridepublic Map<TopicPartition, Long> getPartitionOffsets(Collection<TopicPartition> partitions,PartitionOffsetsRetriever partitionOffsetsRetriever) {//定义起始时间,初始offsetMap<TopicPartition, Long> startingTimestamps = new HashMap<>();Map<TopicPartition, Long> initialOffsets = new HashMap<>();//commited offsetMap<TopicPartition, Long> committedOffsets = partitionOffsetsRetriever.committedOffsets(partitions);//beginningOffsets the first offset for the given partitions.Map<TopicPartition, Long> beginningOffsets = partitionOffsetsRetriever.beginningOffsets(partitions);//endOffsets the for the given partitions.Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);final long now = System.currentTimeMillis();partitions.forEach(tp -> {//起始时间赋值为从redis中获取到相对应topic的时间Long startingTimestamp = topicStartingTimestamps.get(tp.topic());if (startingTimestamp == null) {//redis里没有取到消费开始时间从启动时间消费startingTimestamp = now;logger.info("从redis没有取到时间戳,topic:{},partition:{},使用当前时间:{},{}", tp.topic(), tp.partition(), now, new Date(now));}logger.info("读取时间戳,topic:{},partition:{},时间戳:{},{}", tp.topic(), tp.partition(), now, new Date(now));startingTimestamps.put(tp, startingTimestamp);});partitionOffsetsRetriever.offsetsForTimes(startingTimestamps).forEach((tp, offsetMetadata) -> {long offsetForTime = beginningOffsets.get(tp);long offsetForCommit = beginningOffsets.get(tp);if (offsetMetadata != null) {offsetForTime = offsetMetadata.offset();logger.info("根据时间戳取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForTime);}Long commitedOffset = committedOffsets.get(tp);if (commitedOffset != null) {offsetForCommit = commitedOffset.longValue();logger.info("根据已提交offset取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForCommit);}logger.info("设置读取offset,topic:{},partition:{},offset:{},endOffset:{}", tp.topic(), tp.partition(), Math.max(offsetForTime, offsetForCommit), endOffsets.get(tp));//对比时间戳对应的offset和checkpoint保存的offset,取较大值//initialOffsets.put(tp, Math.max(offsetForTime, offsetForCommit));initialOffsets.put(tp, offsetForCommit);});return initialOffsets;}@Overridepublic OffsetResetStrategy getAutoOffsetResetStrategy() {return OffsetResetStrategy.NONE;}
}

相关文章:

flink消费kafka数据,按照指定时间开始消费

kafka中根据时间戳开始消费数据 import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetRese…...

【SpringCloud】Feign使用

文章目录 配置maven启动类添加yml 使用添加Feign服务Controller 其他设置超时设置YML开启OpenFeign客户端超时控制&#xff08;Ribbon Timeout&#xff09;OpenFeign日志打印功能日志级别YML开启日志 配置 maven <dependencies><!--openfeign--><dependency&g…...

WebApIs 第五天

window对象 BOM&#xff08;浏览器对象模型&#xff09;定时器-延时函数JS执行机制location对象navigator对象histroy对象 本地存储 一.BOM&#xff08;浏览器对象模型&#xff09; ① BOM是浏览器对象模型 window 对象是一个全局对象&#xff0c;也可以说是JavaScript中的…...

按斤称的C++散知识

一、多线程 std::thread()、join() 的用法&#xff1a;使用std::thread()可以创建一个线程&#xff0c;同时指定线程执行函数以及参数&#xff0c;同时也可使用lamda表达式。 #include <iostream> #include <thread>void threadFunction(int num) {std::cout <…...

C++策略模式

1 简介&#xff1a; 策略模式是一种行为型设计模式&#xff0c;用于在运行时根据不同的情况选择不同的算法或行为。它将算法封装成一个个具体的策略类&#xff0c;并使这些策略类可以相互替换&#xff0c;以达到动态改变对象的行为的目的。 2 实现步骤&#xff1a; 以下是使用…...

如何在网页下载腾讯视频为本地MP4格式

1.打开腾讯视频官网地址 腾讯视频 2.搜索你想要下载的视频 3. 点击分享,选择复制通用代码 <iframe frameborder="0" src="ht...

opencv-yolov8-目标检测

import cv2 from ultralytics import YOLO# 模型加载权重model YOLO(yolov8n.pt)# 视频路径cap cv2.VideoCapture(0)# 对视频中检测到目标画框标出来 while cap.isOpened():# Read a frame from the videosuccess, frame cap.read()if success:# Run YOLOv8 inference on th…...

CRYPTO 密码学-笔记

一、古典密码学 1.替换法&#xff1a;用固定的信息&#xff0c;将原文替换成密文 替换法的加密方式&#xff1a;一种是单表替换&#xff0c;另一种是多表替换 单表替换&#xff1a;原文和密文使用同一张表 abcde---》sfdgh 多表替换&#xff1a;有多涨表&#xff0c;原文和密文…...

基于YOLOv8模型的五类动物目标检测系统(PyTorch+Pyside6+YOLOv8模型)

摘要&#xff1a;基于YOLOv8模型的五类动物目标检测系统可用于日常生活中检测与定位动物目标&#xff08;狼、鹿、猪、兔和浣熊&#xff09;&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的目标检测&#xff0c;另外本系统还支持图片、视频等格式的结果可视化与…...

Java课题笔记~ SpringBoot基础配置

二、基础配置 1. 配置文件格式 问题导入 框架常见的配置文件有哪几种形式&#xff1f; 1.1 修改服务器端口 http://localhost:8080/books/1 >>> http://localhost/books/1 SpringBoot提供了多种属性配置方式 application.properties server.port80 applicati…...

vue实现文件上传,前后端

前端封装el-upload组件&#xff0c;父组件传值dialogVisible&#xff08;用于显示el-dialog&#xff09;&#xff0c;子组件接收&#xff0c;并且关闭的时候返回一个值&#xff08;用于隐藏el-dialog&#xff09;,最多上传五个文件&#xff0c;文件格式为.jpg\pdf\png <tem…...

OJ练习第151题——克隆图

克隆图 力扣链接&#xff1a;133. 克隆图 题目描述 给你无向 连通 图中一个节点的引用&#xff0c;请你返回该图的 深拷贝&#xff08;克隆&#xff09;。 示例 分析 对于一张图而言&#xff0c;它的深拷贝即构建一张与原图结构&#xff0c;值均一样的图&#xff0c;但是…...

keepalived+lvs实现高可用

目录 环境 1.配置real-server服务器 2.配置keepalived和lvs 3.测试&#xff1a; 概述 keepalivedlvs实现高可用&#xff1a; lvs可以监控后端服务器&#xff0c;当服务器宕机之后可以对其进行故障切换。 keepalived是对VIP进行检测&#xff0c;当某一个主机的vip错误&…...

【Let‘s make it big】英语合集61~70

61(82) top-of-range it doesn’t get any better than this There seems to be a problem with my account What seems to be the problem withdraw money from my saving account charged an overdraft fee we don’t give loans to customers whose accounts are overdrawn…...

python实现图像的二分类

要实现图像的二分类&#xff0c;可以使用深度学习中的卷积神经网络&#xff08;Convolutional Neural Network, CNN&#xff09;模型。下面是一个使用Keras库实现的简单CNN模型示例&#xff1a; from keras.models import Sequential from keras.layers import Conv2D, MaxPoo…...

8.深浅拷贝和异常处理

开发中我们经常需要复制一个对象。如果直接用赋值会有下面问题: 8.1 浅拷贝 首先浅拷贝和深拷贝只针对引用类型 浅拷贝&#xff1a;拷贝的是地址 常见方法: 1.拷贝对象&#xff1a;Object.assgin() / 展开运算符{…obj} 拷贝对象 2.拷贝数组&#xff1a;Array.prototype.con…...

Element Plus el-table 数据为空时自定义内容【默认为 No Data】

1. 通过 Table 属性设置 <div class"el-plus-table"><el-table empty-text"暂无数据" :data"tableData" style"width: 100%"><el-table-column prop"date" label"Date" width"180" /&g…...

使用nginx和frp实现高效内网穿透:简单配置,畅通无阻

I. 引言 A. 介绍内网穿透的概念和用途 内网穿透是一种网络技术&#xff0c;它允许用户通过公共网络访问位于私有网络&#xff08;内网&#xff09;中的资源和服务。在传统的网络环境中&#xff0c;内网通常是由路由器或防火墙保护的&#xff0c;无法直接从外部网络访问内部资…...

Python土力学与基础工程计算.PDF-螺旋板载荷试验

python 求解代码如下&#xff1a; 1. import numpy as np 2. 3. # 已知参数 4. p_a 100 # 标准压力&#xff0c; kPa 5. p np.array([25, 50, 100, 200) # 荷载&#xff0c; kPa 6. s np.array([2.88, 5.28, 9.50, 15.00) / 10 # 沉降量&#xff0c; cm 7. D 10 # 螺旋板直…...

低代码开发ERP:精打细算,聚焦核心投入

企业数字化转型已经成为现代商业环境中的一项关键任务。如今&#xff0c;企业面临着日益激烈的竞争和不断变化的市场需求。在这样的背景下&#xff0c;数字化转型不仅是企业生存的必然选择&#xff0c;也是取得竞争优势和实现可持续发展的关键因素。 在数字化转型的过程中&…...

Arm Compiler 6.16LTS功能安全认证语言扩展解析

1. Arm Compiler for Embedded FuSa 6.16LTS语言扩展支持现状解析在功能安全关键型嵌入式系统开发中&#xff0c;编译器工具链的认证状态直接关系到最终产品的合规性。Arm Compiler for Embedded FuSa 6.16LTS作为经过功能安全认证的工具链&#xff0c;其语言扩展支持情况需要开…...

<数据集>yolo 易拉罐识别<目标检测>

数据集下载链接https://download.csdn.net/download/qq_53332949/92882375数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;3253张 标注数量(xml文件个数)&#xff1a;3253 标注数量(txt文件个数)&#xff1a;3253 标注类别数&#xff1a;1 标注类别名称&#xff…...

支付系统架构设计:从交易核心到资金核算的稳定性实践

1. 支付系统总览&#xff1a;从业务到资金的桥梁但凡涉及在线交易的公司&#xff0c;支付系统都是其技术架构中当之无愧的“心脏”。它远不止是调用一个第三方支付接口那么简单&#xff0c;而是一套连接用户、业务、资金渠道和内部账务的复杂工程体系。一个设计得当的支付系统&…...

ENVI实战:从直方图拉伸到图像变换,解锁遥感影像增强核心技巧

1. 遥感影像增强入门&#xff1a;为什么需要处理&#xff1f; 第一次接触遥感影像时&#xff0c;很多人会疑惑&#xff1a;为什么卫星拍回来的原始图像总是灰蒙蒙的&#xff1f;这就像用手机在雾天拍照&#xff0c;所有景物都像蒙了一层纱。我在处理长江流域水体监测项目时就遇…...

基于ESP32与NeoPixel的智能灯光控制系统:从硬件选型到Web控制全解析

1. 项目概述&#xff1a;打造你的专属智能光效中心几年前&#xff0c;我为了给家里的节日装饰增添点科技感&#xff0c;琢磨着怎么让一串普通的LED灯带变得“听话”——能从手机或电脑上随意切换颜色和动画。当时市面上成品的智能灯带要么价格不菲&#xff0c;要么功能受限&…...

用Java+GDAL+OpenCV玩转遥感图像:手把手教你实现Landsat标准假彩色合成(附完整代码)

JavaGDALOpenCV遥感图像处理实战&#xff1a;Landsat标准假彩色合成全流程解析 遥感图像处理正逐渐从专业软件向通用编程语言生态迁移。对于熟悉Java的开发者而言&#xff0c;利用GDAL和OpenCV这两个强大的库&#xff0c;完全可以构建自主可控的遥感处理流程。本文将完整展示如…...

【Qt串口实战】硬件升级后readyRead信号丢失的排查与修复

1. 问题现象&#xff1a;硬件升级后readyRead信号神秘消失 那天早上刚到公司&#xff0c;硬件组的同事兴冲冲地跑过来告诉我&#xff1a;"老王&#xff0c;我们给设备升级了最新固件&#xff0c;性能提升30%&#xff01;"我心想这是好事啊&#xff0c;结果打开调试软…...

AI黑魔法实战:LLM应用性能优化与成本控制高级技巧

1. 项目概述&#xff1a;当AI遇上“黑魔法”最近在GitHub上闲逛&#xff0c;发现了一个名为“lvcn/ai-black-magic”的项目&#xff0c;这个名字本身就充满了吸引力。对于任何在AI领域摸爬滚打过的开发者来说&#xff0c;“黑魔法”这个词往往意味着那些不按常理出牌、却能解决…...

j | 禁忌 | n |孩

通过网盘分享的文件&#xff1a;禁 | 忌女 | 孩(日版) 链接: https://pan.baidu.com/s/1bjsnnvP2f1EiA8ySTbCAOg?pwdtqp2 提取码: tqp2...

别再搞混了!Docker export和save到底啥区别?用busybox实战带你分清

深入解析Docker镜像与容器快照&#xff1a;从busybox实战看export与save的本质差异 在Docker的日常使用中&#xff0c;许多开发者经常对docker export和docker save这两个命令感到困惑。它们都能生成.tar文件&#xff0c;看似功能相似&#xff0c;实则针对完全不同的场景和对象…...