flink学习(14)—— 双流join
概述
Join:内连接
CoGroup:内连接,左连接,右连接
Interval Join:点对面
Join
1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
2、Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
3、Join 通用用法如下:stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
滚动窗口
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接* 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2*/
public class _01_双流join_join_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 双流joinDataStream<Tuple3<String, Integer, Integer>> rsSource = greenSource.join(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}// 滚动窗口}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
滑动窗口
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @基本功能: 演示join的滑动窗口* @program:FlinkDemo* @author: 闫哥* @create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream).where(tuple3 -> tuple3.f0).equalTo(tuple3 -> tuple3.f0)// 滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print("绿色的流:");orangeDataStream.print("橘色的流:");resultStream.print("最终的结果:");//5. execute-执行env.execute();}
}
CoGroup
1、优势:可以实现内连接,左连接,右连接
2、劣势:内存压力大
3、和上面的写法区别:将join换成coGroup,apply中实现的具体方法有区别
4、流程
stream.coGroup(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<CoGroupFunction>);
内连接
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接*/
public class _02_双流join_CoGroup_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 连接DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {for (Tuple3<String, Integer, String> firesTuple3 : first) {for (Tuple3<String, Integer, String> secondTuple3 : second) {out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
外连接
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 外连接*/
public class _03_双流join_CoGroup_外连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {// 内连接,左连接,右连接的区别只在这里面存在,两层循环for (Tuple3<String, Integer, String> firesTuple3 : first) {boolean isExist = false;for (Tuple3<String, Integer, String> secondTuple3 : second) {isExist = true;out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));}if (!isExist){out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red null"));}}}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();}
}
Interval Join

1、Join以及CoGroup 原因是 Join和CoGroup是窗口Join,必须给定窗口
2、Interval Join不需要给窗口。Interval Join 必须先分组才能使用。
3、先对数据源进行keyBy
4、 外流.intervalJoin(内流).between(-2,2).processbetween 左不包,右包
内部的流为下面的流(取单个值)
代码实现
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;public class _04_双流join_Interval_Join {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//绿色的流DataStreamSource<String> source = env.socketTextStream("localhost", 7777);KeyedStream<Tuple3<String, Integer, String>, String> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// keyBy})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 红色的流DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);KeyedStream<Tuple3<String, Integer, String>, String> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] split = line.split(",");return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {String timeStr = tuple3.f2;try {Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// 分组})).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {return tuple3.f0;}});// 实现SingleOutputStreamOperator<String> rsSource = greenSource.intervalJoin(redSource).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);}});redSource.print("红色的流:");greenSource.print("绿色的流:");rsSource.print("合并后的流:");env.execute();
/*** 红色的为下面的流* 范围:* 假如现在是10* 9 10 11 12*/}
}
相关文章:
flink学习(14)—— 双流join
概述 Join:内连接 CoGroup:内连接,左连接,右连接 Interval Join:点对面 Join 1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 2、Join 可以支持处理时间(processing time)和事件时…...
HTTP协议详解:从HTTP/1.0到HTTP/3的演变与优化
深入浅出:从头到尾全面解析HTTP协议 一、HTTP协议概述 1.1 HTTP协议简介 HTTP(HyperText Transfer Protocol,超文本传输协议)是互联网上应用最广泛的通信协议之一。它用于客户端与服务器之间的数据传输,尤其是在Web…...
张量并行和流水线并行在Transformer中的具体部位
目录 张量并行和流水线并行在Transformer中的具体部位 一、张量并行 二、流水线并行 张量并行和流水线并行在Transformer中的具体部位 张量并行和流水线并行是Transformer模型中用于提高训练效率的两种并行策略。它们分别作用于模型的不同部位,以下是对这两种并行的具体说…...
WEB开发: 丢掉包袱,拥抱ASP.NET CORE!
今天的 Web 开发可以说进入了一个全新的时代,前后端分离、云原生、微服务等等一系列现代技术架构应运而生。在这个背景下,作为开发者,你一定希望找到一个高效、灵活、易于扩展且具有良好性能的框架。那么,ASP.NET Core 显然是一个…...
【论文阅读】Federated learning backdoor attack detection with persistence diagram
目的:检测联邦学习环境下,上传上来的模型是不是恶意的。 1、将一个模型转换为|L|个PD,(其中|L|为层数) 如何将每一层转换成一个PD? 为了评估第𝑗层的激活值,我们需要𝑐个输入来获…...
Gooxi Eagle Stream 2U双路通用服务器:性能强劲 灵活扩展 稳定易用
人工智能的高速发展开启了飞轮效应,实施数字化变革成为了企业的一道“抢答题”和“必答题”,而数据已成为现代企业的命脉。以HPC和AI为代表的新业务就像节节攀高的树梢,象征着业务创新和企业成长。但在树梢之下,真正让企业保持成长…...
【计算机网络】实验2:总线型以太网的特性
实验 2:总线型以太网的特性 一、 实验目的 加深对MAC地址,IP地址,ARP协议的理解。 了解总线型以太网的特性(广播,竞争总线,冲突)。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实…...
如何在Spark中使用gbdt模型分布式预测
这目录 1 训练gbdt模型2 第三方包python环境打包3 Spark中使用gbdt模型3.1 spark配置文件3.2 主函数main.py 4 spark任务提交 1 训练gbdt模型 我们可以基于lightgbm快速的训练一个gbdt模型,训练相对比较简单,只要把训练样本处理好,几行代码可…...
Qt-5.14.2 example
官方历程很丰富,modbus、串口、chart图表、3D、视频 共享方便使用 Building and Running an Example You can test that your Qt installation is successful by opening an existing example application project. To run an example application on an Android …...
virtualbox给Ubuntu22创建共享文件夹
1.在windows上的操作,创建共享文件夹Share 2.Ubuntu22上的操作,创建共享文件夹LinuxShare 3.在virtualbox虚拟机设置里,设置共享文件夹 共享文件夹路径:选择Windows系统中你需要共享的文件夹 共享文件夹名称:挂载至wi…...
GPT打字机效果—— fetchEventSouce进行sse流式请求
EventStream基本用法 与 WebSocket 不同的是,服务器发送事件是单向的。数据消息只能从服务端到发送到客户端(如用户的浏览器)。这使其成为不需要从客户端往服务器发送消息的情况下的最佳选择。 const evtSource new EventSource(“/api/v1/…...
SpringBoot 在线家具商城:设计考量与实现细节聚焦
第4章 系统设计 市面上设计比较好的系统都有一个共同特征,就是主题鲜明突出。通过对页面简洁清晰的布局,让页面的内容,包括文字语言,或者视频图片等元素可以清晰表达出系统的主题。让来访用户无需花费过多精力和时间找寻需要的内容…...
每日速记10道java面试题07
其他资料: 每日速记10道java面试题01-CSDN博客 每日速记10道java面试题02-CSDN博客 每日速记10道java面试题03-CSDN博客 每日速记10道java面试题04-CSDN博客 每日速记10道java面试题05-CSDN博客 每日速记10道java面试题06-CSDN博客 目录 1.线程的生命周期在j…...
前端面试热门题(二)[html\css\js\node\vue)
Vue 性能优化的方法 Vue 性能优化的方法多种多样,以下是一些常用的策略: 使用v-show替换v-if:v-show是通过CSS控制元素的显示与隐藏,而v-if是通过操作DOM来控制元素的显示与隐藏,频繁操作DOM会导致性能下降。因此&am…...
mvc基础及搭建一个静态网站
mvc asp.net core mvc环境 .net8vscode * Asp.Net Core 基础* .net8* 前辈* .net 4.9 非跨平台版本 VC* 跨平台版本* 1.0* 2.0* 2.1* 3.1* 5* 语言* C#* F# * Visual Basic* 框架* web应用* asp应用* WebFrom* mvc应用* 桌面应用* Winform* WPF* Web Api api应用或者叫服务* …...
AOSP的同步问题
repo sync同步时提示出错: error: .repo/manifests/: contains uncommitted changesRepo command failed due to the following UpdateManifestError errors: contains uncommitted changes解决方法: 1、cd 进入.repo/manifests cd .repo/manifests2、执行如下三…...
HarmonyOS4+NEXT星河版入门与项目实战(23)------实现手机游戏摇杆功能
文章目录 1、案例效果2、案例实现1、代码实现2、代码解释4、总结1、案例效果 2、案例实现 1、代码实现 代码如下(示例): import router from @ohos.router import {ResizeDirection } from @ohos.UiTest import curves...
Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计)
Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计) 逻辑回归(Logistic Regression,LR)逻辑回归的基本思想逻辑回归模型逻辑回归的目标最大似然估计优化方法 逻辑回归…...
Vue文字转语音实现
在开发流程中,面对语音支持的需求,小规模语音内容或许可以通过预处理后播放来轻松应对,但当涉及大量语音时,这一方法就显得繁琐低效了。为此,智慧的开发者们总能找到便捷的解决方案——利用Web技术实现语音播放&#x…...
Docker快速部署RabbitMq
在外网服务器拉取镜像 docker pull arm64v8/rabbitmq:3.8.9-management或者拉去我的服务器的 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/linux_arm64_rabbitmq:3.8.9-management重新命名 docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/lin…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
