Flink RoaringBitmap去重
1、RoaringBitmap的依赖
<!-- 去重大哥-->
<dependency><groupId>org.roaringbitmap</groupId><artifactId>RoaringBitmap</artifactId><version>0.9.21</version>
</dependency>
2、Demo去重
package com.gwm.driver;import com.alibaba.fastjson.JSON;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import com.gwm.pojo.EventSuccessInfo;
import com.gwm.utils.TimeToStampUtil;
import com.gwm.utils.getString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import scala.Tuple4;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;/*** @author yangyingchun* @version 1.0* @date 2022/11/14 16:26*/
public class EventOrderSuccessRoaringBitmap {private static String endPoint = "endPoint ";//private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。private static String projectName = "projectName ";private static String topicSourceName = "topicSourceName ";
// private static String topicSourceName = "topicSourceName ";private static String accessId = "accessId ";private static String accessKey = "accessKey ";//设置消费的启动位点对应的时间。TimeToStampUtil.timeToStamp("2021-12-21") 此时间至少为当前时间
// private static Long datahubStartInMs = TimeToStampUtil.timeToStamp("2023-02-23");private static Long datahubStartInMs = System.currentTimeMillis();private static Long datahubEndInMs=Long.MAX_VALUE;private static SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static SimpleDateFormat sd1 = new SimpleDateFormat("yyyy-MM-dd");private static Date startDate;static {try {startDate = sd1.parse(sd.format(new Date()));} catch (ParseException e) {e.printStackTrace();}};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.enableCheckpointing(3600000L);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));env.setParallelism(1);DataStreamSource<List<RecordEntry>> aedata = env.addSource(new DatahubSourceFunction(endPoint,projectName,topicSourceName,accessId,accessKey,datahubStartInMs,datahubEndInMs,20L,1000L,1000));DataStream<Tuple4<String, EventSuccessInfo, String, Long>> aecollectordataDataStream = aedata.flatMap(new FlatMapFunction<List<RecordEntry>, Tuple4<String, EventSuccessInfo, String, Long>>() {@Overridepublic void flatMap(List<RecordEntry> value, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {for (RecordEntry recordEntry : value) {String phone = getString.getString(recordEntry, "customer_phone");Long order_sn = Long.parseLong(getString.getString(recordEntry, "order_no"));String brand = getString.getString(recordEntry, "brand");String car_model = getString.getString(recordEntry, "car_model");String action_time = "null".equals(getString.getString(recordEntry, "paid_at"))||"".equals(getString.getString(recordEntry, "paid_at"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "paid_at"))/1000));Double paid_amount = "null".equals(getString.getString(recordEntry, "paid_amount"))?null:Double.parseDouble(getString.getString(recordEntry, "paid_amount"));String name = getString.getString(recordEntry, "customer_name");String operation_flag = getString.getString(recordEntry, "new_dts_sync_dts_after_flag");String order_time = "null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "order_time"))/1000));String order_state = getString.getString(recordEntry, "order_state"); //'订购成功'Date add_time ="null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:new Date(Long.parseLong(getString.getString(recordEntry, "order_time")) / 1000);
// startDate = sd1.parse(sd.format(new Date()));System.out.println(order_state+"====startDate:"+startDate+"====paid_at:"+order_time+"=====phone+order_sn:"+phone+"--"+order_sn);//这里有三个问题,// 1、技术+业务:因为获取的是数据库操作日志,所以数据是重复的,(已经做了重复校验,确保不会重复发且无时效性)// 2、技术:如果操作了历史数据,且用户的订单状态恰好还是订购成功时,也会触达,是不是要加限制,加的话加什么合适,// 新增且当天(很多数据是获取不到时间的)?还是所有时间都推,再ma测加一个时间的控制条件// 结论:空的也要,// 3、业务:需要明确订购成功的规则,否则极易造成异常, order_state=12当前是订购成功 能复用吗if (
// "12".equals(order_state)&&"Y".equals(operation_flag)
// &&!StringUtils.isNullOrWhitespaceOnly(order_time)
// &&add_time.after(startDate)){EventSuccessInfo eventSuccessInfo = new EventSuccessInfo(phone, order_sn, brand, car_model, action_time, paid_amount, name, operation_flag,order_time,order_state);// System.out.println(eventSuccessInfo);Tuple4<String, EventSuccessInfo, String, Long> tuple4= new Tuple4<String, EventSuccessInfo, String, Long>("test_event_order_success",eventSuccessInfo,UUID.randomUUID().toString().replace("-",""),System.currentTimeMillis());out.collect(tuple4);}}}});KeyedStream<Tuple4<String, EventSuccessInfo, String, Long>, String> tuple4StringKeyedStream= aecollectordataDataStream.keyBy(x -> x._2().getPhone());// StateTtlConfig ttlConfig = StateTtlConfig
// .newBuilder(Time.days(2))
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// .build();//create StateDescriptor//这里进行状态注册通过bitmap高效存储实现去重,当然bitmap去重只适合bigint场景ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor("Roaring64Bitmap",TypeInformation.of(new TypeHint<Roaring64Bitmap>() {}));//手机号去重逻辑 通过Roaring64BitmapSingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> map = tuple4StringKeyedStream.filter(new RichFilterFunction<Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private transient ValueState<Roaring64Bitmap> bitmapState;@Overridepublic void open(Configuration parameters) throws Exception {// 设置状态生命周期
// StateTtlConfig stateTtlConfig = new StateTtlConfig
// .Builder(Time.days(1)) // 周期为1天
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或者更新状态时重新刷新生命周期
// .build();bitmapState = getRuntimeContext().getState(bitmapDescriptor);;}@Overridepublic boolean filter(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {//由于本程序只筛选订购成功的,所以每个手机号+每个订单唯一确认一条数据(订单状态已经在上游过滤过了)Roaring64Bitmap bitmap = bitmapState.value();if (bitmap == null) {bitmap = new Roaring64Bitmap();}if (!bitmap.contains(value._2().getOrder_sn())) {bitmap.addLong(value._2().getOrder_sn());bitmapState.update(bitmap);return true;}return false;}});//因为是binlog,但需求只要数据时间是当天的 :通过flink定时器 定义每天零晨更新比较时间SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> process = map.keyBy(x -> x._2().getPhone()).process(new KeyedProcessFunction<String, Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private ValueState<String> timeSate;@Overridepublic void processElement(Tuple4<String, EventSuccessInfo, String, Long> value, Context ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//获取格林威治标准时间的第二天00:00:00即获取北京时间的第二天08:00:00
// long ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24);//获取北京时间的第二天00:00:00long ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)- 8 * 60 * 60 * 1000;// long ts = 1677054000000L;//如果注册相同数据的TimeTimer,后面的会将前面的覆盖,即相同的timeTimer只会触发一次ctx.timerService().registerProcessingTimeTimer(ts);out.collect(value);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//定时器质性,每天凌晨更新开始时间
// System.out.println(timestamp);System.out.println("定时器执行了:" + timestamp);//状态初始化timeSate.clear();startDate = sd1.parse(sd.format(new Date()));System.out.println(startDate);
// startDate = sd1.parse("2023-02-01");}});SingleOutputStreamOperator<Tuple4<String, String, String, Long>> jsonString = process.map(new MapFunction<Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, String, String, Long>>() {@Overridepublic Tuple4<String, String, String, Long> map(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {return new Tuple4<String, String, String, Long>(value._1(),JSON.toJSONString(value._2()),value._3(),value._4());}});jsonString.print();
// jsonString.addSink(new EventOmsSuccessSink());env.execute("EventOrderSuccess===>");}
}
3、注意:Roaring64Bitmap 去重只适合去重整形情况
相关文章:
Flink RoaringBitmap去重
1、RoaringBitmap的依赖 <!-- 去重大哥--> <dependency><groupId>org.roaringbitmap</groupId><artifactId>RoaringBitmap</artifactId><version>0.9.21</version> </dependency> 2、Demo去重 package com.gwm.driver…...
Elasticsearch—(MacOs)
1⃣️环境准备 准备 Java 环境:终端输入 java -version 命令来确认版本是否符合 Elasticsearch 要求下载并解压 Elasticsearch:前往(https://www.elastic.co/downloads/elasticsearch)选择适合你的 Mac 系统的 Elasticsearch 版本…...
插入排序与希尔排序
个人主页:Lei宝啊 愿所有美好如期而遇 前言: 这两个排序在思路上有些相似,所以有人觉得插入排序和希尔排序差别不大,事实上,他们之间的差别不小,插入排序只是希尔排序的最后一步。 目录 前言:…...
C# OpenCvSharp 基于直线检测的文本图像倾斜校正
效果 项目 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using OpenCvSharp;namespace OpenCvSharp_基于直线检测的文…...
“智慧时代的引领者:探索人工智能的无限可能性“
目录 一.背景 二.应用 2.1金融领域 2.2医疗领域 2.3教育领域 三.发展 四.总结: 一.背景 人工智能(Artificial Intelligence,简称AI),是指通过计算机程序模拟人类智能的一种技术。它是计算机科学、工程学、语言学、哲学等多…...
PMSM——转子位置估算基于QPLL
文章目录 前言仿真模型观测器速度观测位置观测转矩波形电流波形 前言 今后是电机控制方向的研究生的啦,期待有同行互相交流。 仿真模型 观测器 速度观测 位置观测 转矩波形 电流波形...
Android Studio之Gradle和Gradle插件的区别
解释的很详细 Android Studio之Gradle和Gradle插件的区别...
DataExcel控件读取和保存excel xlsx 格式文件
需要引用NPOI库 https://github.com/dotnetcore/NPOI 调用Read 函数将excel读取到dataexcel控件 调用Save 函数将dataexcel控件文件保存为excel文件 using NPOI.HSSF.UserModel; using NPOI.HSSF.Util; using NPOI.SS.UserModel; using NPOI.SS.Util; using System; using …...
【JavaEE】CAS(Compare And Swap)操作
文章目录 什么是 CASCAS 的应用如何使用 CAS 操作实现自旋锁CAS 的 ABA 问题CAS 相关面试题 什么是 CAS CAS(Compare and Swap)是一种原子操作,用于在无锁情况下保证数据一致性的问题。它包含三个操作数——内存位置、预期原值及更新值。在执…...
第三章:最新版零基础学习 PYTHON 教程(第三节 - Python 运算符—Python 中的关系运算符)
关系运算符用于比较值。它根据条件返回 True 或 False。这些运算符也称为比较运算符。 操作员描述 句法> 大于:如果左操作数大于右操作数,则为 Truex > y...
【GDB】使用 GDB 自动画红黑树
阅读本文前需要的基础知识 用 python 扩展 gdb python 绘制 graphviz 使用 GDB 画红黑树 前面几节中介绍了 gdb 的 python 扩展,参考 用 python 扩展 gdb 并且 python 有 graphviz 模块,那么可以用 gdb 调用 python,在 python 中使用 grap…...
使用Vue3+elementPlus的Tree组件实现一个拖拽文件夹管理
文章目录 1、前言2、分析3、实现4、踩坑4.1、拖拽辅助线的坑4.2、数据的坑4.3、限制拖拽4.4、样式调整 1、前言 最近在做一个文件夹管理的功能,要实现一个树状的文件夹面板。里面包含两种元素,文件夹以及文件。交互要求如下: 创建、删除&am…...
小谈设计模式(7)—装饰模式
小谈设计模式(7)—装饰模式 专栏介绍专栏地址专栏介绍 装饰模式装饰模式角色Component(抽象组件)ConcreteComponent(具体组件)Decorator(抽象装饰器)ConcreteDecorator(具…...
nginx 多层代理 + k8s ingress 后端服务获取客户真实ip 配置
1.nginx http 七层代理 修改命令空间: namespace: nginx-ingress : configmap:nginx-configuration kubectl get cm nginx-configuration -n ingress-nginx -o yaml添加如上配置 compute-full-forwarded-for: “true” forwarded-for-header: X-Forwa…...
6种最常用的3D点云语义分割AI模型对比
由于增强现实/虚拟现实的发展及其在计算机视觉、自动驾驶和机器人领域的广泛应用,点云学习最近引起了人们的关注。 深度学习已成功用于解决 2D 视觉问题,然而,由于其处理面临独特的挑战,深度学习技术在点云上的使用仍处于起步阶段…...
UG NX二次开发(C#)-获取UI中选择对象的handle值
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、前言2、设计一个简单的UI界面3、创建工程项目4、测试结果1、前言 我在哔哩哔哩的视频中看到有人问我如何获取UI选择对象的Handle,本来想把Tag、Taggedobject、Handle三者的关系讲一下,然后看…...
win10,WSL的Ubuntu配python3.7手记
1.装linux 先在windows上安装WSL版本的Ubuntu Windows10系统安装Ubuntu子系统_哔哩哔哩_bilibili (WSL2什么的一直没搞清楚) 图形界面会出一些问题,注意勾选ccsm出的界面设置 win10安装Ubuntu16.04子系统,并开启桌面环境_win…...
02-Zookeeper实战
上一篇:01-Zookeeper特性与节点数据类型详解 1. zookeeper安装 Step1: 配置JAVA环境,检验环境: java -versionStep2: 下载解压 zookeeper wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeepe…...
【C语言深入理解指针(1)】
1.内存和地址 1.1内存 在讲内存和地址之前,我们想有个⽣活中的案例: 假设有⼀栋宿舍楼,把你放在楼⾥,楼上有100个房间,但是房间没有编号,你的⼀个朋友来找你玩,如果想找到你,就得挨…...
模拟实现简单的通讯录
前言:生活中处处都会看到或是用到通讯录,今天我们就通过C语言来简单的模拟实现一下通讯录。 鸡汤:跨越山海,终见曙光! 链接:gitee仓库:代码链接 目录 主函数声明部分初始化通讯录实现扩容的函数增加通讯录所…...
brpc并发编程模型性能对比:基准测试结果
brpc并发编程模型性能对比:基准测试结果 【免费下载链接】brpc brpc is an Industrial-grade RPC framework using C Language, which is often used in high performance system such as Search, Storage, Machine learning, Advertisement, Recommendation etc. &…...
OpenClaw+nanobot技能开发:从零编写自定义文件处理器
OpenClawnanobot技能开发:从零编写自定义文件处理器 1. 为什么需要自定义文件处理技能 上周我整理项目文档时,遇到了一个典型问题:需要将数百个Markdown文件按照"日期-标题"格式批量重命名。手动操作不仅耗时,还容易出…...
OneAPI 百度文心一言ERNIE-Bot接入:千帆平台Key对接指南
OneAPI 百度文心一言ERNIE-Bot接入:千帆平台Key对接指南 安全提示:使用 root 用户初次登录系统后,务必修改默认密码 123456! 1. 引言:为什么需要统一的API管理平台 在当今AI技术快速发展的时代,企业和开发…...
当神经网络遇上麻雀:转向架构架可靠性优化实战
基于CSSA -BR的转向架构架可靠性优化可靠性分析 静强度分析 稳健优化 仿真分析 问题定义: 研究的是包含区间变量和概率变量的混合结构可靠性分析问题。 提出方法: 提出了一种基于混沌麻雀搜索算法(CSSA)和贝叶斯正则化…...
Postiz消息队列:任务优先级与重试机制的终极指南
Postiz消息队列:任务优先级与重试机制的终极指南 【免费下载链接】clickvote Add upvotes, likes, and reviews to any context ⭐️ 项目地址: https://gitcode.com/GitHub_Trending/cl/clickvote Postiz是一款功能强大的开源项目,专注于为开发者…...
避坑指南:HuggingFace本地数据集加载常见的5个报错及解决方法
HuggingFace本地数据集加载实战:5类典型报错深度解析与解决方案 当你第一次尝试将本地数据集加载到HuggingFace生态系统中时,可能会遇到各种令人困惑的错误信息。这些报错往往隐藏着数据格式、特征定义或路径处理等关键问题。本文将剖析开发者最常遇到的…...
致远OA任意文件上传漏洞的深度利用与防御策略
致远OA文件上传漏洞的攻防全景解析与企业级防护指南 1. 漏洞背景与影响范围 致远OA作为国内广泛使用的协同办公系统,其安全性直接影响数百万企业的数据资产。近年来曝光的任意文件上传漏洞因其高危害性成为攻击者重点利用目标。该漏洞允许攻击者在未授权情况下上传恶…...
Spatial Audio(空间音频)与多声道环绕声:从5.1到7.1的沉浸式体验升级
1. 从立体声到环绕声:音频技术的进化之路 记得我第一次在朋友家体验5.1声道家庭影院时,那种子弹从耳边呼啸而过的感觉让我彻底震撼了。这完全颠覆了我对"好音质"的认知——原来声音可以如此立体、如此真实。要理解现代的空间音频技术…...
计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现 基于SpringBoot框架的高校实训室资源预约与信息化管理平台的设计与实现 实验室智能调度与实训过程管理系统
计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现k8svdqb1 (配套有源码 程序 mysql数据库 论文) 本套源码可以在文本联xi,先看具体系统功能演示视频领取,可分享源码参考。随着高校信息化建设的深入推进,传…...
Windows 内网 Web 服务穿透方案推荐
Windows 内网 Web 服务穿透方案推荐 面向场景:内网机器为 Windows,需从公网或外网访问内网 HTTP/HTTPS Web 服务;优先选择相对不易被误报、来源清晰、可审计的方案。 关于「报毒」的说明 穿透类软件常被启发式引擎标为「风险/可疑」…...
