Flink RoaringBitmap去重
1、RoaringBitmap的依赖
<!-- 去重大哥-->
<dependency><groupId>org.roaringbitmap</groupId><artifactId>RoaringBitmap</artifactId><version>0.9.21</version>
</dependency>
2、Demo去重
package com.gwm.driver;import com.alibaba.fastjson.JSON;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import com.gwm.pojo.EventSuccessInfo;
import com.gwm.utils.TimeToStampUtil;
import com.gwm.utils.getString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import scala.Tuple4;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;/*** @author yangyingchun* @version 1.0* @date 2022/11/14 16:26*/
public class EventOrderSuccessRoaringBitmap {private static String endPoint = "endPoint ";//private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。private static String projectName = "projectName ";private static String topicSourceName = "topicSourceName ";
// private static String topicSourceName = "topicSourceName ";private static String accessId = "accessId ";private static String accessKey = "accessKey ";//设置消费的启动位点对应的时间。TimeToStampUtil.timeToStamp("2021-12-21") 此时间至少为当前时间
// private static Long datahubStartInMs = TimeToStampUtil.timeToStamp("2023-02-23");private static Long datahubStartInMs = System.currentTimeMillis();private static Long datahubEndInMs=Long.MAX_VALUE;private static SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static SimpleDateFormat sd1 = new SimpleDateFormat("yyyy-MM-dd");private static Date startDate;static {try {startDate = sd1.parse(sd.format(new Date()));} catch (ParseException e) {e.printStackTrace();}};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.enableCheckpointing(3600000L);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));env.setParallelism(1);DataStreamSource<List<RecordEntry>> aedata = env.addSource(new DatahubSourceFunction(endPoint,projectName,topicSourceName,accessId,accessKey,datahubStartInMs,datahubEndInMs,20L,1000L,1000));DataStream<Tuple4<String, EventSuccessInfo, String, Long>> aecollectordataDataStream = aedata.flatMap(new FlatMapFunction<List<RecordEntry>, Tuple4<String, EventSuccessInfo, String, Long>>() {@Overridepublic void flatMap(List<RecordEntry> value, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {for (RecordEntry recordEntry : value) {String phone = getString.getString(recordEntry, "customer_phone");Long order_sn = Long.parseLong(getString.getString(recordEntry, "order_no"));String brand = getString.getString(recordEntry, "brand");String car_model = getString.getString(recordEntry, "car_model");String action_time = "null".equals(getString.getString(recordEntry, "paid_at"))||"".equals(getString.getString(recordEntry, "paid_at"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "paid_at"))/1000));Double paid_amount = "null".equals(getString.getString(recordEntry, "paid_amount"))?null:Double.parseDouble(getString.getString(recordEntry, "paid_amount"));String name = getString.getString(recordEntry, "customer_name");String operation_flag = getString.getString(recordEntry, "new_dts_sync_dts_after_flag");String order_time = "null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "order_time"))/1000));String order_state = getString.getString(recordEntry, "order_state"); //'订购成功'Date add_time ="null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:new Date(Long.parseLong(getString.getString(recordEntry, "order_time")) / 1000);
// startDate = sd1.parse(sd.format(new Date()));System.out.println(order_state+"====startDate:"+startDate+"====paid_at:"+order_time+"=====phone+order_sn:"+phone+"--"+order_sn);//这里有三个问题,// 1、技术+业务:因为获取的是数据库操作日志,所以数据是重复的,(已经做了重复校验,确保不会重复发且无时效性)// 2、技术:如果操作了历史数据,且用户的订单状态恰好还是订购成功时,也会触达,是不是要加限制,加的话加什么合适,// 新增且当天(很多数据是获取不到时间的)?还是所有时间都推,再ma测加一个时间的控制条件// 结论:空的也要,// 3、业务:需要明确订购成功的规则,否则极易造成异常, order_state=12当前是订购成功 能复用吗if (
// "12".equals(order_state)&&"Y".equals(operation_flag)
// &&!StringUtils.isNullOrWhitespaceOnly(order_time)
// &&add_time.after(startDate)){EventSuccessInfo eventSuccessInfo = new EventSuccessInfo(phone, order_sn, brand, car_model, action_time, paid_amount, name, operation_flag,order_time,order_state);// System.out.println(eventSuccessInfo);Tuple4<String, EventSuccessInfo, String, Long> tuple4= new Tuple4<String, EventSuccessInfo, String, Long>("test_event_order_success",eventSuccessInfo,UUID.randomUUID().toString().replace("-",""),System.currentTimeMillis());out.collect(tuple4);}}}});KeyedStream<Tuple4<String, EventSuccessInfo, String, Long>, String> tuple4StringKeyedStream= aecollectordataDataStream.keyBy(x -> x._2().getPhone());// StateTtlConfig ttlConfig = StateTtlConfig
// .newBuilder(Time.days(2))
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// .build();//create StateDescriptor//这里进行状态注册通过bitmap高效存储实现去重,当然bitmap去重只适合bigint场景ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor("Roaring64Bitmap",TypeInformation.of(new TypeHint<Roaring64Bitmap>() {}));//手机号去重逻辑 通过Roaring64BitmapSingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> map = tuple4StringKeyedStream.filter(new RichFilterFunction<Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private transient ValueState<Roaring64Bitmap> bitmapState;@Overridepublic void open(Configuration parameters) throws Exception {// 设置状态生命周期
// StateTtlConfig stateTtlConfig = new StateTtlConfig
// .Builder(Time.days(1)) // 周期为1天
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或者更新状态时重新刷新生命周期
// .build();bitmapState = getRuntimeContext().getState(bitmapDescriptor);;}@Overridepublic boolean filter(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {//由于本程序只筛选订购成功的,所以每个手机号+每个订单唯一确认一条数据(订单状态已经在上游过滤过了)Roaring64Bitmap bitmap = bitmapState.value();if (bitmap == null) {bitmap = new Roaring64Bitmap();}if (!bitmap.contains(value._2().getOrder_sn())) {bitmap.addLong(value._2().getOrder_sn());bitmapState.update(bitmap);return true;}return false;}});//因为是binlog,但需求只要数据时间是当天的 :通过flink定时器 定义每天零晨更新比较时间SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> process = map.keyBy(x -> x._2().getPhone()).process(new KeyedProcessFunction<String, Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private ValueState<String> timeSate;@Overridepublic void processElement(Tuple4<String, EventSuccessInfo, String, Long> value, Context ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//获取格林威治标准时间的第二天00:00:00即获取北京时间的第二天08:00:00
// long ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24);//获取北京时间的第二天00:00:00long ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)- 8 * 60 * 60 * 1000;// long ts = 1677054000000L;//如果注册相同数据的TimeTimer,后面的会将前面的覆盖,即相同的timeTimer只会触发一次ctx.timerService().registerProcessingTimeTimer(ts);out.collect(value);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//定时器质性,每天凌晨更新开始时间
// System.out.println(timestamp);System.out.println("定时器执行了:" + timestamp);//状态初始化timeSate.clear();startDate = sd1.parse(sd.format(new Date()));System.out.println(startDate);
// startDate = sd1.parse("2023-02-01");}});SingleOutputStreamOperator<Tuple4<String, String, String, Long>> jsonString = process.map(new MapFunction<Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, String, String, Long>>() {@Overridepublic Tuple4<String, String, String, Long> map(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {return new Tuple4<String, String, String, Long>(value._1(),JSON.toJSONString(value._2()),value._3(),value._4());}});jsonString.print();
// jsonString.addSink(new EventOmsSuccessSink());env.execute("EventOrderSuccess===>");}
}
3、注意:Roaring64Bitmap 去重只适合去重整形情况
相关文章:
Flink RoaringBitmap去重
1、RoaringBitmap的依赖 <!-- 去重大哥--> <dependency><groupId>org.roaringbitmap</groupId><artifactId>RoaringBitmap</artifactId><version>0.9.21</version> </dependency> 2、Demo去重 package com.gwm.driver…...
Elasticsearch—(MacOs)
1⃣️环境准备 准备 Java 环境:终端输入 java -version 命令来确认版本是否符合 Elasticsearch 要求下载并解压 Elasticsearch:前往(https://www.elastic.co/downloads/elasticsearch)选择适合你的 Mac 系统的 Elasticsearch 版本…...
插入排序与希尔排序
个人主页:Lei宝啊 愿所有美好如期而遇 前言: 这两个排序在思路上有些相似,所以有人觉得插入排序和希尔排序差别不大,事实上,他们之间的差别不小,插入排序只是希尔排序的最后一步。 目录 前言:…...
C# OpenCvSharp 基于直线检测的文本图像倾斜校正
效果 项目 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using OpenCvSharp;namespace OpenCvSharp_基于直线检测的文…...
“智慧时代的引领者:探索人工智能的无限可能性“
目录 一.背景 二.应用 2.1金融领域 2.2医疗领域 2.3教育领域 三.发展 四.总结: 一.背景 人工智能(Artificial Intelligence,简称AI),是指通过计算机程序模拟人类智能的一种技术。它是计算机科学、工程学、语言学、哲学等多…...
PMSM——转子位置估算基于QPLL
文章目录 前言仿真模型观测器速度观测位置观测转矩波形电流波形 前言 今后是电机控制方向的研究生的啦,期待有同行互相交流。 仿真模型 观测器 速度观测 位置观测 转矩波形 电流波形...
Android Studio之Gradle和Gradle插件的区别
解释的很详细 Android Studio之Gradle和Gradle插件的区别...
DataExcel控件读取和保存excel xlsx 格式文件
需要引用NPOI库 https://github.com/dotnetcore/NPOI 调用Read 函数将excel读取到dataexcel控件 调用Save 函数将dataexcel控件文件保存为excel文件 using NPOI.HSSF.UserModel; using NPOI.HSSF.Util; using NPOI.SS.UserModel; using NPOI.SS.Util; using System; using …...
【JavaEE】CAS(Compare And Swap)操作
文章目录 什么是 CASCAS 的应用如何使用 CAS 操作实现自旋锁CAS 的 ABA 问题CAS 相关面试题 什么是 CAS CAS(Compare and Swap)是一种原子操作,用于在无锁情况下保证数据一致性的问题。它包含三个操作数——内存位置、预期原值及更新值。在执…...
第三章:最新版零基础学习 PYTHON 教程(第三节 - Python 运算符—Python 中的关系运算符)
关系运算符用于比较值。它根据条件返回 True 或 False。这些运算符也称为比较运算符。 操作员描述 句法> 大于:如果左操作数大于右操作数,则为 Truex > y...
【GDB】使用 GDB 自动画红黑树
阅读本文前需要的基础知识 用 python 扩展 gdb python 绘制 graphviz 使用 GDB 画红黑树 前面几节中介绍了 gdb 的 python 扩展,参考 用 python 扩展 gdb 并且 python 有 graphviz 模块,那么可以用 gdb 调用 python,在 python 中使用 grap…...
使用Vue3+elementPlus的Tree组件实现一个拖拽文件夹管理
文章目录 1、前言2、分析3、实现4、踩坑4.1、拖拽辅助线的坑4.2、数据的坑4.3、限制拖拽4.4、样式调整 1、前言 最近在做一个文件夹管理的功能,要实现一个树状的文件夹面板。里面包含两种元素,文件夹以及文件。交互要求如下: 创建、删除&am…...
小谈设计模式(7)—装饰模式
小谈设计模式(7)—装饰模式 专栏介绍专栏地址专栏介绍 装饰模式装饰模式角色Component(抽象组件)ConcreteComponent(具体组件)Decorator(抽象装饰器)ConcreteDecorator(具…...
nginx 多层代理 + k8s ingress 后端服务获取客户真实ip 配置
1.nginx http 七层代理 修改命令空间: namespace: nginx-ingress : configmap:nginx-configuration kubectl get cm nginx-configuration -n ingress-nginx -o yaml添加如上配置 compute-full-forwarded-for: “true” forwarded-for-header: X-Forwa…...
6种最常用的3D点云语义分割AI模型对比
由于增强现实/虚拟现实的发展及其在计算机视觉、自动驾驶和机器人领域的广泛应用,点云学习最近引起了人们的关注。 深度学习已成功用于解决 2D 视觉问题,然而,由于其处理面临独特的挑战,深度学习技术在点云上的使用仍处于起步阶段…...
UG NX二次开发(C#)-获取UI中选择对象的handle值
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、前言2、设计一个简单的UI界面3、创建工程项目4、测试结果1、前言 我在哔哩哔哩的视频中看到有人问我如何获取UI选择对象的Handle,本来想把Tag、Taggedobject、Handle三者的关系讲一下,然后看…...
win10,WSL的Ubuntu配python3.7手记
1.装linux 先在windows上安装WSL版本的Ubuntu Windows10系统安装Ubuntu子系统_哔哩哔哩_bilibili (WSL2什么的一直没搞清楚) 图形界面会出一些问题,注意勾选ccsm出的界面设置 win10安装Ubuntu16.04子系统,并开启桌面环境_win…...
02-Zookeeper实战
上一篇:01-Zookeeper特性与节点数据类型详解 1. zookeeper安装 Step1: 配置JAVA环境,检验环境: java -versionStep2: 下载解压 zookeeper wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeepe…...
【C语言深入理解指针(1)】
1.内存和地址 1.1内存 在讲内存和地址之前,我们想有个⽣活中的案例: 假设有⼀栋宿舍楼,把你放在楼⾥,楼上有100个房间,但是房间没有编号,你的⼀个朋友来找你玩,如果想找到你,就得挨…...
模拟实现简单的通讯录
前言:生活中处处都会看到或是用到通讯录,今天我们就通过C语言来简单的模拟实现一下通讯录。 鸡汤:跨越山海,终见曙光! 链接:gitee仓库:代码链接 目录 主函数声明部分初始化通讯录实现扩容的函数增加通讯录所…...
Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...
goreplay
1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具,可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长,测试它所需的工作量也会呈指数级增长。GoRepl…...
【题解-洛谷】P10480 可达性统计
题目:P10480 可达性统计 题目描述 给定一张 N N N 个点 M M M 条边的有向无环图,分别统计从每个点出发能够到达的点的数量。 输入格式 第一行两个整数 N , M N,M N,M,接下来 M M M 行每行两个整数 x , y x,y x,y,表示从 …...
第2课 SiC MOSFET与 Si IGBT 静态特性对比
2.1 输出特性对比 2.2 转移特性对比 2.1 输出特性对比 器件的输出特性描述了当温度和栅源电压(栅射电压)为某一具体数值时,漏极电流(集电极电流...
