当前位置: 首页 > news >正文

19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4

26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
    • 5、聚合函数
      • 1)、示例
    • 6、表值聚合函数
      • 1)、示例1- 计算topN
      • 2)、示例2 - emitUpdateWithRetract 方法使用(老版本Planner可用)


本文介绍了标量聚合函数和表值聚合函数的自定义实现,分别以具体的示例进行展示。特别需要提醒的是表值聚合函数自定义实现时针对emitValue和emitUpdateWithRetract方法的不同版本实现要求,该处在其官网上没有特别的说明,会导致运行异常,具体原因及解决办法在示例2emitUpdateWithRetract中有说明。
本文依赖flink集群能正常使用。
本文分为2个部分,即标量聚合函数以及表值聚合函数的自定义实现。
本文的示例如无特殊说明则是在Flink 1.17版本中运行。

5、聚合函数

自定义聚合函数(UDAGG)是把一个表(一行或者多行,每行可以有一列或者多列)聚合成一个标量值。
在这里插入图片描述
上面的图片展示了一个聚合的例子。
假设有一个关于饮料的表。表里面有三个字段,分别是 id、name、price,表里有 5 行数据。
假设需要找到所有饮料里最贵的饮料的价格,即执行一个 max() 聚合。
需要遍历所有 5 行数据,而结果就只有一个数值。

自定义聚合函数是通过扩展 AggregateFunction 来实现的。AggregateFunction 的工作过程如下。
首先,它需要一个 accumulator,它是一个数据结构,存储了聚合的中间结果。通过调用 AggregateFunction 的 createAccumulator() 方法创建一个空的 accumulator。
接下来,对于每一行数据,会调用 accumulate() 方法来更新 accumulator。
当所有的数据都处理完了之后,通过调用 getValue 方法来计算和返回最终的结果。

下面几个方法是每个 AggregateFunction 必须要实现的:

  • createAccumulator()
  • accumulate()
  • getValue()

Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以跟 ScalarFunction 和 TableFunction 一样,AggregateFunction 也提供了 AggregateFunction#getResultType() 和 AggregateFunction#getAccumulatorType() 来分别指定返回值类型和 accumulator 的类型,两个函数的返回值类型也都是 TypeInformation。

除了上面的方法,还有几个方法可以选择实现。这些方法有些可以让查询更加高效,而有些是在某些特定场景下必须要实现的。
例如,如果聚合函数用在会话窗口(当两个会话窗口合并的时候需要 merge 他们的 accumulator)的话,merge() 方法就是必须要实现的。

AggregateFunction 的以下方法在某些场景下是必须实现的:

  • retract() 在 bounded OVER 窗口中是必须实现的。
  • merge() 在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外,这个方法对于优化也很多帮助。例如,两阶段聚合优化就需要所有的 AggregateFunction 都实现 merge 方法。
  • resetAccumulator() 在许多批式聚合中是必须实现的。

AggregateFunction 的所有方法都必须是 public 的,不能是 static 的,而且名字必须跟上面写的一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个函数是在抽象类 AggregateFunction 中定义的,而其他函数都是约定的方法。如果要定义一个聚合函数,你需要扩展 org.apache.flink.table.functions.AggregateFunction,并且实现一个(或者多个)accumulate 方法。accumulate 方法可以重载,每个方法的参数类型不同,并且支持变长参数。

AggregateFunction 的所有方法的详细文档说明如下。


/*** Base class for user-defined aggregates and table aggregates.* 用户定义聚合和表聚合的基类。** @param <T>   the type of the aggregation result.* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the*             aggregated values which are needed to compute an aggregation result.* 聚合累加器的类型。累加器用于保存计算聚合结果所需的聚合值。*/
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.* 创建和初始化aggregate function 的Accumulator 方法** @return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate function's result.* 返回aggregate function的结果类型TypeInformation ** @return The TypeInformation of the (table)aggregate function's result or null if the result*         type should be automatically inferred.* 返回aggregate function的结果类型TypeInformation ,如果结果为null,则会自动推导类型*/public TypeInformation<T> getResultType = null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate function's accumulator.* 返回aggregate function's accumulator 的类型TypeInformation ** @return The TypeInformation of the (table)aggregate function's accumulator or null if the*         accumulator type should be automatically inferred.*/public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}/*** Base class for aggregation functions.** @param <T>   the type of the aggregation result* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the*             aggregated values which are needed to compute an aggregation result.*             AggregateFunction represents its state using accumulator, thereby the state of the*             AggregateFunction must be put into the accumulator.* acc aggregation accumulator的类型,accumulator 用来保存计算聚合结果所需的聚合值。* AggregateFunction使用累加器表示其状态,从而表示AggregateFunction必须放入累加器*/
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. An AggregateFunction* requires at least one accumulate() method.* 处理输入值并更新提供的累加器实例。方法accumulate 可以用不同的自定义类型和参数重载。* 聚合函数至少需要一个accumulate()方法。** @param accumulator  the accumulator which contains the current aggregated results* @param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.* 收回累加器实例中的输入值。当前设计假设输入是先前累积的值。收回方法可以是重载了不同的自定义类型和参数。* 此功能在datastream的有界流基于over aggregate必须被实现。** @param accumulator the accumulator which contains the current aggregated results* @param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.* 将一组accumulator 实例合并为一个accumulator 实例。* 该函数在datastream session window的分组聚合 和 有界流的分组聚合必须实现。* @param accumulator  the accumulator which will keep the merged aggregate results. It should*                     be noted that the accumulator may contain the previous aggregated*                     results. Therefore user should not replace or clean this instance in the*                     custom merge method. 累加器,用于保存合并后的聚合结果。* 					  应该注意的是,累加器可以包含先前的聚合结果。* 					  因此,用户不应在自定义合并方法中替换或清除此实例。* @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be*                     merged.*/public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL/*** Called every time when an aggregation result should be materialized.* The returned value could be either an early and incomplete result* (periodically emitted as data arrive) or the final result of the aggregation.* 每次应该具体化(materialized)聚合结果时调用。* 返回的值可能是早期且不完整的结果(随着数据的到达而定期发出),也可能是聚合的最终结果。** @param accumulator the accumulator which contains the current aggregated results* @return the aggregation result*/public T getValue(ACC accumulator); // MANDATORY/*** Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for* bounded grouping aggregate.* 重置此[[AggregateFunction]]的累加器。必须为有界分组聚合实现此函数。** @param accumulator  the accumulator which needs to be reset*/public void resetAccumulator(ACC accumulator); // OPTIONAL/*** Returns true if this AggregateFunction can only be applied in an OVER window.* 如果此AggregateFunction只能在OVER窗口中应用,则返回true。** @return true if the AggregateFunction requires an OVER window, false otherwise.*/public Boolean requiresOver = false; // PRE-DEFINED
}

1)、示例

该示例包含以下三个功能:

  • 定义一个聚合函数来计算某一列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使用函数

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数。
在例子里,定义了一个类 Aalan_WeightedAvgAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator,在失败时进行恢复,以此来保证精确一次的语义。

例子的WeightedAvgAggregateFunction(自定义聚合函数)的 accumulate 方法有三个输入参数。
第一个是 Aalan_WeightedAvgAccum accumulator
另外两个是用户自定义的输入:输入的值 ivalue(balance) 和 输入的权重 iweight(age)。

尽管 retract()、merge()、resetAccumulator() 这几个方法在大多数聚合类型中都不是必须实现的,样例中提供了他们的实现。

在 Scala 样例中也是用的是 Java 的基础类型,并且定义了 getResultType() 和 getAccumulatorType(),因为 Flink 的类型推导对于 Scala 的类型推导做的不是很好。

import static org.apache.flink.table.api.Expressions.$;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDAGGDemo {// 加权平均累加器bean,加上名称,以示区别,避免混淆public static class Aalan_WeightedAvgAccum {public long sum = 0;public int count = 0;}// 聚合函数的自定义实现,计算加权平均public static class WeightedAvgAggregateFunction extends AggregateFunction<Long, Aalan_WeightedAvgAccum> {/*** 创建和初始化aggregate function 的Accumulator 方法*/@Overridepublic Aalan_WeightedAvgAccum createAccumulator() {return new Aalan_WeightedAvgAccum();}/*** 每次应该具体化(materialized)聚合结果时调用。 返回的值可能是早期且不完整的结果(随着数据的到达而定期发出),也可能是聚合的最终结果。*/@Overridepublic Long getValue(Aalan_WeightedAvgAccum acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}/*** 处理输入值并更新提供的累加器实例。方法accumulate 可以用不同的自定义类型和参数重载。聚合函数至少需要一个accumulate()方法。* * @param acc     累加器bean,包含当前汇总结果的累加器* @param iValue  输入的需要的累加值* @param iWeight 输入的需要累加的值的权重*/public void accumulate(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}/*** 收回累加器实例中的输入值。当前设计假设输入是先前累积的值。收回方法可以是重载了不同的自定义类型和参数。 此功能在datastream的有界流基于over* aggregate必须被实现。* * @param acc     累加器bean,包含当前汇总结果的累加器* @param iValue  输入的需要的累加值* @param iWeight 输入的需要累加的值的权重*/public void retract(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}/*** 将一组accumulator 实例合并为一个accumulator 实例。 该函数在datastream session window的分组聚合 和* 有界流的分组聚合必须实现。* * @param acc 累加器,用于保存合并后的聚合结果。 应该注意的是,累加器可以包含先前的聚合结果。 因此,用户不应在自定义合并方法中替换或清除此实例。* @param it  指向将被合并的一组累加器的Iterable。*/public void merge(Aalan_WeightedAvgAccum acc, Iterable<Aalan_WeightedAvgAccum> it) {Iterator<Aalan_WeightedAvgAccum> iter = it.iterator();while (iter.hasNext()) {Aalan_WeightedAvgAccum a = iter.next();acc.count += a.count;acc.sum += a.sum;}}/*** 重置此[[AggregateFunction]]的累加器。必须为有界分组聚合实现此函数。* * @param acc*/public void resetAccumulator(Aalan_WeightedAvgAccum acc) {acc.count = 0;acc.sum = 0L;}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private long balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 20, 1698742358391L), new User(2L, "alan", 19, 25, 1698742359396L),new User(3L, "alan", 25, 30, 1698742360407L), new User(4L, "alanchan", 28, 35, 1698742361409L), new User(5L, "alanchan", 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction("alan_weightavgAF", new WeightedAvgAggregateFunction());DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("balance"), $("rowtime"));tenv.createTemporaryView("user_view", users);// 使用函数String sql = "SELECT name, alan_weightavgAF(balance, age) AS avgPoints FROM user_view GROUP BY name";Table result = tenv.sqlQuery(sql);DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);resultDS.print();
//		16> (true,+I[alanchan, 35])
//		2> (true,+I[alan, 20])
//		2> (false,-U[alan, 20])
//		2> (true,+U[alan, 22])
//		2> (false,-U[alan, 22])
//		2> (true,+U[alan, 25])env.execute();}}

6、表值聚合函数

自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
在这里插入图片描述
上图展示了一个表值聚合函数的例子。
假设有一个饮料的表,这个表有 3 列,分别是 id、name 和 price,一共有 5 行。
假设需要找到价格最高的两个饮料,类似于 top2() 表值聚合函数。
需要遍历所有 5 行数据,结果是有 2 行数据的一个表。

用户自定义表值聚合函数是通过扩展 TableAggregateFunction 类来实现的。
一个 TableAggregateFunction 的工作过程如下。
首先,它需要一个 accumulator,这个 accumulator 负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。
接下来,对于每一行数据,会调用 accumulate 方法来更新 accumulator。
当所有数据都处理完之后,调用 emitValue 方法来计算和返回最终的结果。

下面几个 TableAggregateFunction 的方法是必须要实现的:

  • createAccumulator()
  • accumulate()

Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以类似于 ScalarFunction 和 TableFunction,TableAggregateFunction 也提供了 TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 方法来指定返回值类型和 accumulator 的类型,这两个方法都需要返回 TypeInformation。

除了上面的方法,还有几个其他的方法可以选择性的实现。有些方法可以让查询更加高效,而有些方法对于某些特定场景是必须要实现的。比如,在会话窗口(当两个会话窗口合并时会合并两个 accumulator)中使用聚合函数时,必须要实现merge() 方法。

下面几个 TableAggregateFunction 的方法在某些特定场景下是必须要实现的:

  • retract() 在 bounded OVER 窗口中的聚合函数必须要实现。
  • merge() 在许多批式聚合和以及流式会话和滑动窗口聚合中是必须要实现的。
  • resetAccumulator() 在许多批式聚合中是必须要实现的。
  • emitValue() 在批式聚合以及窗口聚合中是必须要实现的。

下面的 TableAggregateFunction 的方法可以提升流式任务的效率:

  • emitUpdateWithRetract() 在 retract 模式下,该方法负责发送被更新的值。

emitValue 方法会发送所有 accumulator 给出的结果。拿 TopN 来说,emitValue 每次都会发送所有的最大的 n 个值。这在流式任务中可能会有一些性能问题。为了提升性能,用户可以实现 emitUpdateWithRetract 方法。这个方法在 retract 模式下会增量的输出结果,比如有数据更新了,我们必须要撤回老的数据,然后再发送新的数据。如果定义了 emitUpdateWithRetract 方法,那它会优先于 emitValue 方法被使用,因为一般认为 emitUpdateWithRetract 会更加高效,因为它的输出是增量的。

TableAggregateFunction 的所有方法都必须是 public 的、非 static 的,而且名字必须跟上面提到的一样。createAccumulator、getResultType 和 getAccumulatorType 这三个方法是在抽象父类 TableAggregateFunction 中定义的,而其他的方法都是约定的方法。要实现一个表值聚合函数,你必须扩展 org.apache.flink.table.functions.TableAggregateFunction,并且实现一个(或者多个)accumulate 方法。accumulate 方法可以有多个重载的方法,也可以支持变长参数。

TableAggregateFunction 的所有方法的详细文档说明如下,其中部分与AggregateFunction 类似的方法不再赘述。

/*** Base class for user-defined aggregates and table aggregates.** @param <T>   the type of the aggregation result.* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the*             aggregated values which are needed to compute an aggregation result.*/
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** @return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate function's result.** @return The TypeInformation of the (table)aggregate function's result or null if the result*         type should be automatically inferred.*/public TypeInformation<T> getResultType = null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate function's accumulator.** @return The TypeInformation of the (table)aggregate function's accumulator or null if the*         accumulator type should be automatically inferred.*/public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}/*** Base class for table aggregation functions.** @param <T>   the type of the aggregation result* @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the*             aggregated values which are needed to compute a table aggregation result.*             TableAggregateFunction represents its state using accumulator, thereby the state of*             the TableAggregateFunction must be put into the accumulator.*/
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction* requires at least one accumulate() method.** @param accumulator           the accumulator which contains the current aggregated results* @param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** @param accumulator           the accumulator which contains the current aggregated results* @param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** @param accumulator  the accumulator which will keep the merged aggregate results. It should*                     be noted that the accumulator may contain the previous aggregated*                     results. Therefore user should not replace or clean this instance in the*                     custom merge method.* @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be*                     merged.*/public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result  (periodically emitted as data arrive) or* the final result of the  aggregation.* 每次应该具体化聚合结果时调用。返回的值可能是早期且不完整的结果(随着数据的到达而定期发出),也可能是聚合的最终结果。** @param accumulator the accumulator which contains the current*                    aggregated results* @param out         the collector used to output data*/public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.* This method outputs data incrementally in retract mode, i.e., once there is an update, we* have to retract old records before sending new updated ones. The emitUpdateWithRetract* method will be used in preference to the emitValue method if both methods are defined in the* table aggregate function, because the method is treated to be more efficient than emitValue* as it can outputvalues incrementally.* 每次应该具体化聚合结果时调用。返回的值可能是早期且不完整的结果(随着数据的到达而定期发出),也可能是聚合的最终结果。* 与emitValue不同,emitUpdateWithRetract用于发出已更新的值。* 该方法以收回模式递增地输出数据,在发送新的更新记录之前,我们必须收回旧记录。* 如果表聚合函数中定义了两个方法,则emitUpdateWithRetract方法将优先于emitValue方法,* 因为该方法被认为比emitValue更有效,因为它可以增量输出值。* * @param accumulator the accumulator which contains the current*                    aggregated results* @param out         the retractable collector used to output data. Use collect method*                    to output(add) records and use retract method to retract(delete)*                    records.*/public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL/*** Collects a record and forwards it. The collector can output retract messages with the retract* method. Note: only use it in {@code emitRetractValueIncrementally}.*/public interface RetractableCollector<T> extends Collector<T> {/*** Retract a record.** @param record The record to retract.*/void retract(T record);}
}

1)、示例1- 计算topN

下面的例子展示了如何

  • 定义一个 TableAggregateFunction 来计算给定列的最大的 3 个值
  • 在 TableEnvironment 中注册函数
  • 在 Table API 查询中使用函数(当前只在 Table API 中支持 TableAggregateFunction)

为了计算最大的 3 个值,accumulator 需要保存当前看到的最大的 3 个值。
在例子中,定义了类 TopAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator,并且在失败时进行恢复,来保证精确一次的语义。

TopTableAggregateFunction 表值聚合函数(TableAggregateFunction)的 accumulate() 方法有两个输入,
第一个是 TopAccum accumulator,
另一个是用户定义的输入:输入的值 v。

尽管 merge() 方法在大多数聚合类型中不是必须的,也在样例中提供了它的实现。

在 Scala 样例中也使用的是 Java 的基础类型,并且定义了 getResultType() 和 getAccumulatorType() 方法,因为 Flink 的类型推导对于 Scala 的类型推导支持的不是很好。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTAGGDemo {/*** Accumulator for Top3**/@Datapublic static class TopAccum {public Integer first;public Integer second;public Integer third;}public static class TopTableAggregateFunction extends TableAggregateFunction<Tuple2<Integer, Integer>, TopAccum> {@Overridepublic TopAccum createAccumulator() {TopAccum acc = new TopAccum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;acc.third = Integer.MIN_VALUE;return acc;}public void accumulate(TopAccum acc, Integer v) {if (v > acc.first) {acc.third = acc.second;acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.third = acc.second;acc.second = v;} else if (v > acc.third) {acc.third = v;}}public void merge(TopAccum acc, java.lang.Iterable<TopAccum> iterable) {for (TopAccum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);accumulate(acc, otherAcc.third);}}public void emitValue(TopAccum acc, Collector<Tuple2<Integer, Integer>> out) {
//			System.out.println("acc:"+acc);// emit the value and rankif (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}if (acc.third != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.third, 3));}}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 20, 1698742358391L), new User(2L, "alan", 19, 25, 1698742359396L),new User(3L, "alan", 25, 30, 1698742360407L), new User(4L, "alanchan", 28, 35, 1698742361409L), new User(5L, "alanchan", 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction("top", new TopTableAggregateFunction());DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("balance"), $("rowtime"));// 使用函数Table result = usersTable.groupBy($("name")).flatAggregate(call("top", $("balance"))).select($("name"), $("f0").as("balance"), $("f1").as("rank"));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);resultDS.print();
//		2> (true,+I[alan, 20, 1])
//		16> (true,+I[alanchan, 35, 1])
//		2> (false,-D[alan, 20, 1])
//		16> (false,-D[alanchan, 35, 1])
//		2> (true,+I[alan, 25, 1])
//		16> (true,+I[alanchan, 35, 1])
//		2> (true,+I[alan, 20, 2])
//		16> (true,+I[alanchan, 35, 2])
//		2> (false,-D[alan, 25, 1])
//		2> (false,-D[alan, 20, 2])
//		2> (true,+I[alan, 30, 1])
//		2> (true,+I[alan, 25, 2])
//		2> (true,+I[alan, 20, 3])env.execute();}}

2)、示例2 - emitUpdateWithRetract 方法使用(老版本Planner可用)

下面的例子展示了如何使用 emitUpdateWithRetract 方法来只发送更新的数据。
为了只发送更新的结果,accumulator 保存了上一次的最大的3个值,也保存了当前最大的3个值。

如果 TopN 中的 n 非常大,这种既保存上次的结果,也保存当前的结果的方式不太高效。

一种解决这种问题的方式是把输入数据直接存储到 accumulator 中,然后在调用 emitUpdateWithRetract 方法时再进行计算。

需要特别说明的是

下面的示例需要使用到useOldPlanner,对应的planner的maven依赖见下文

<!-- flink执行计划,这是1.9版本之前的--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.13.6</version></dependency>

如果flink的版本比较高的话,下面的示例将不能运行,因为新版本的Builder没有useOldPlanner()方法了,已经移除。不能构造EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();

		//新版本该方法已经被移除@Deprecatedpublic Builder useOldPlanner() {this.plannerClass = OLD_PLANNER_FACTORY;this.executorClass = OLD_EXECUTOR_FACTORY;return this;}

如果使用OldPlanner的话,emitValue和emitUpdateWithRetract仅需定义一个就可以了,并且emitUpdateWithRetract的优先级大于emitValue。但是在Blink Planner里,只看有没有定义emitValue。

也即在Blink Planner中,只能使用emitValue,不能使用emitUpdateWithRetract。
否则会报如下异常
Exception in thread “main” org.apache.flink.table.api.ValidationException: Could not find an implementation method ‘emitValue’ in class ‘org.tablesql.udf.TestUDTAGGDemo2$TopNTableAggregateFunction’ for function ‘TopNTableAggregateFunction’ that matches the following signature:

void emitValue(org.tablesql.udf.TestUDTAGGDemo2.TopNAccum, org.apache.flink.util.Collector)

具体示例如下


import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTAGGDemo2 {@Datapublic static class TopNAccum {public Integer first;public Integer second;public Integer third;public Integer oldFirst;public Integer oldSecond;public Integer oldThird;}/*** 自定义聚合函数实现* * @author alanchan**/public static class TopNTableAggregateFunction extends TableAggregateFunction<Tuple2<Integer, Integer>, TopNAccum> {@Overridepublic TopNAccum createAccumulator() {TopNAccum topNAccum = new TopNAccum();topNAccum.first = Integer.MIN_VALUE;topNAccum.second = Integer.MIN_VALUE;topNAccum.third = Integer.MIN_VALUE;topNAccum.oldFirst = Integer.MIN_VALUE;topNAccum.oldSecond = Integer.MIN_VALUE;topNAccum.oldThird = Integer.MIN_VALUE;return topNAccum;}public void accumulate(TopNAccum acc, Integer v) {if (v > acc.first) {acc.third = acc.second;acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.third = acc.second;acc.second = v;} else if (v > acc.third) {acc.third = v;}}public void emitUpdateWithRetract(TopNAccum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {System.out.println("emitUpdateWithRetract----acc:" + acc);if (!acc.first.equals(acc.oldFirst)) {// if there is an update, retract old value then emit new value.if (acc.oldFirst != Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldFirst, 1));}out.collect(Tuple2.of(acc.first, 1));acc.oldFirst = acc.first;}if (!acc.second.equals(acc.oldSecond)) {// if there is an update, retract old value then emit new value.if (acc.oldSecond != Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldSecond, 2));}out.collect(Tuple2.of(acc.second, 2));acc.oldSecond = acc.second;}if (!acc.third.equals(acc.oldThird)) {// if there is an update, retract old value then emit new value.if (acc.oldThird != Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldThird, 3));}out.collect(Tuple2.of(acc.third, 3));acc.oldThird = acc.third;}}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 20, 1698742358391L), new User(2L, "alan", 19, 25, 1698742359396L),new User(3L, "alan", 25, 30, 1698742360407L), new User(11L, "alan", 28, 31, 1698742358391L), new User(12L, "alan", 29, 32, 1698742359396L),new User(13L, "alan", 35, 35, 1698742360407L), new User(23L, "alan", 45, 36, 1698742360407L), new User(14L, "alanchan", 28, 15, 1698742361409L), new User(15L, "alanchan", 29, 16, 1698742362424L), new User(24L, "alanchan", 30, 20, 1698742361409L),new User(25L, "alanchan", 31, 22, 1698742362424L), new User(34L, "alanchan", 32, 24, 1698742361409L), new User(35L, "alanchan", 33, 26, 1698742362424L),new User(44L, "alanchan", 34, 28, 1698742361409L), new User(55L, "alanchan", 35, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setParallelism(1);EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);// 将聚合函数注册为函数tenv.registerFunction("topN", new TopNTableAggregateFunction());DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("balance"), $("rowtime"));// 使用函数Table result = usersTable.groupBy($("name")).flatAggregate(call("topN", $("balance"))).select($("name"), $("f0").as("balance"), $("f1").as("rank"));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);resultDS.print();env.execute();}}

运行结果如下:

emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=20, second=-2147483648, third=-2147483648, oldFirst=-2147483648, oldSecond=-2147483648, oldThird=-2147483648)
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=15, second=-2147483648, third=-2147483648, oldFirst=-2147483648, oldSecond=-2147483648, oldThird=-2147483648)
14> (true,+I[alan, 20, 1])
9> (true,+I[alanchan, 15, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=25, second=20, third=-2147483648, oldFirst=20, oldSecond=-2147483648, oldThird=-2147483648)
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=16, second=15, third=-2147483648, oldFirst=15, oldSecond=-2147483648, oldThird=-2147483648)
14> (false,+I[alan, 20, 1])
14> (true,+I[alan, 25, 1])
14> (true,+I[alan, 20, 2])
9> (false,+I[alanchan, 15, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=30, second=25, third=20, oldFirst=25, oldSecond=20, oldThird=-2147483648)
14> (false,+I[alan, 25, 1])
9> (true,+I[alanchan, 16, 1])
14> (true,+I[alan, 30, 1])
14> (false,+I[alan, 20, 2])
9> (true,+I[alanchan, 15, 2])
14> (true,+I[alan, 25, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=20, second=16, third=15, oldFirst=16, oldSecond=15, oldThird=-2147483648)
14> (true,+I[alan, 20, 3])
9> (false,+I[alanchan, 16, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=31, second=30, third=25, oldFirst=30, oldSecond=25, oldThird=20)
9> (true,+I[alanchan, 20, 1])
9> (false,+I[alanchan, 15, 2])
14> (false,+I[alan, 30, 1])
9> (true,+I[alanchan, 16, 2])
14> (true,+I[alan, 31, 1])
9> (true,+I[alanchan, 15, 3])
14> (false,+I[alan, 25, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=22, second=20, third=16, oldFirst=20, oldSecond=16, oldThird=15)
14> (true,+I[alan, 30, 2])
9> (false,+I[alanchan, 20, 1])
14> (false,+I[alan, 20, 3])
9> (true,+I[alanchan, 22, 1])
14> (true,+I[alan, 25, 3])
9> (false,+I[alanchan, 16, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=32, second=31, third=30, oldFirst=31, oldSecond=30, oldThird=25)
9> (true,+I[alanchan, 20, 2])
9> (false,+I[alanchan, 15, 3])
9> (true,+I[alanchan, 16, 3])
14> (false,+I[alan, 31, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=24, second=22, third=20, oldFirst=22, oldSecond=20, oldThird=16)
14> (true,+I[alan, 32, 1])
9> (false,+I[alanchan, 22, 1])
14> (false,+I[alan, 30, 2])
9> (true,+I[alanchan, 24, 1])
9> (false,+I[alanchan, 20, 2])
14> (true,+I[alan, 31, 2])
9> (true,+I[alanchan, 22, 2])
14> (false,+I[alan, 25, 3])
9> (false,+I[alanchan, 16, 3])
14> (true,+I[alan, 30, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=35, second=32, third=31, oldFirst=32, oldSecond=31, oldThird=30)
9> (true,+I[alanchan, 20, 3])
14> (false,+I[alan, 32, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=26, second=24, third=22, oldFirst=24, oldSecond=22, oldThird=20)
14> (true,+I[alan, 35, 1])
9> (false,+I[alanchan, 24, 1])
14> (false,+I[alan, 31, 2])
9> (true,+I[alanchan, 26, 1])
14> (true,+I[alan, 32, 2])
9> (false,+I[alanchan, 22, 2])
14> (false,+I[alan, 30, 3])
9> (true,+I[alanchan, 24, 2])
14> (true,+I[alan, 31, 3])
9> (false,+I[alanchan, 20, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=36, second=35, third=32, oldFirst=35, oldSecond=32, oldThird=31)
9> (true,+I[alanchan, 22, 3])
14> (false,+I[alan, 35, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=28, second=26, third=24, oldFirst=26, oldSecond=24, oldThird=22)
14> (true,+I[alan, 36, 1])
9> (false,+I[alanchan, 26, 1])
14> (false,+I[alan, 32, 2])
9> (true,+I[alanchan, 28, 1])
14> (true,+I[alan, 35, 2])
9> (false,+I[alanchan, 24, 2])
9> (true,+I[alanchan, 26, 2])
14> (false,+I[alan, 31, 3])
9> (false,+I[alanchan, 22, 3])
14> (true,+I[alan, 32, 3])
9> (true,+I[alanchan, 24, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first=35, second=28, third=26, oldFirst=28, oldSecond=26, oldThird=24)
9> (false,+I[alanchan, 28, 1])
9> (true,+I[alanchan, 35, 1])
9> (false,+I[alanchan, 26, 2])
9> (true,+I[alanchan, 28, 2])
9> (false,+I[alanchan, 24, 3])
9> (true,+I[alanchan, 26, 3])

以上,介绍了标量聚合函数和表值聚合函数的自定义实现,分别以具体的示例进行展示。特别需要提醒的是表值聚合函数自定义实现时针对emitValue和emitUpdateWithRetract方法的不同版本实现要求,该处在其官网上没有特别的说明,会导致运行异常,具体原因及解决办法在示例2emitUpdateWithRetract中有说明。

相关文章:

19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...

Flutter IOS 前后台切换主题自动变化的问题

BUG 触发条件 设备 IOS 15 模拟器GetX 实现换肤GetMaterialApp 里面配置好 theme和darkTheme使用GetView和GetController进行开发 此时如果把App前后台切换&#xff0c;使用Obx包括起来的内容会跟谁异常主题变换&#xff0c;未使用Obx的颜色不会变化。 解决路径 首先在获取 …...

rabbitmq入门学习

写在前面 本文看下rabbit mq的基础概念以及使用。 1&#xff1a;简单介绍 为了不同进程间通信的解耦&#xff0c;出现了消息队列&#xff0c;为了规范消息队列的具体实现&#xff0c;Java制定了jms规范&#xff0c;这是一套基于接口的规范&#xff0c;因此是绑定语言的&…...

说说对Fiber架构的理解?解决了什么问题?

一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的&#xff0c;当其中一个线程执行时&#xff0c;另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程&#xff0c;那么渲染层面的更新就不得不长时间地等待&#xff0c;界面长时间不更新&#xff0c;会导…...

Spring Security笔记

Spring Security 是 Spring家族中的一个安全管理框架。 一般来说中大型的项目都是使用 SpringSecurity 来做安全框架&#xff0c;小项目用相对简单的Shiro。认证、授权是 SpringSecurity 作为安全框架的核心功能。 认证&#xff1a;通过用户名密码验证当前访问系统的是不是本…...

快速教程|如何在 AWS EC2上使用 Walrus 部署 GitLab

Walrus 是一款基于平台工程理念的开源应用管理平台&#xff0c;致力于解决应用交付领域的深切痛点。借助 Walrus 将云原生的能力和最佳实践扩展到非容器化环境&#xff0c;并支持任意应用形态统一编排部署&#xff0c;降低使用基础设施的复杂度&#xff0c;为研发和运维团队提供…...

[vmware]vmware虚拟机压缩空间清理空间

vmware中的ubuntu使用如果拷贝文件进去在删除&#xff0c;vmare镜像文件并不会减少日积月累会不断是的真实物理磁盘空间大幅度减少&#xff0c;比如我以前windows操作系统本来只有30GB最后居然占道硬盘200GB&#xff0c;清理方法有2种。 第一种&#xff1a;vmware界面操作 第二…...

一篇文章带你使用(MMKV--基于 mmap 的高性能通用 key-value 组件)

一、MMKV是什么&#xff1f; MMKV 是基于 mmap 内存映射的 key-value 组件&#xff0c;底层序列化/反序列化使用 protobuf 实现&#xff0c;性能高&#xff0c;稳定性强。也是腾讯微信团队使用的技术。 支持的数据类型 支持以下 Java 语言基础类型&#xff1a; boolean、int…...

Pytorch 里面torch.no_grad 和model.eval(), model.train() 的作用

torch.no_grad: 影响模型的自微分器&#xff0c;使得其停止工作&#xff1b;这样的话&#xff0c;数据计算的数据就会变快&#xff0c;内存占用也会变小&#xff0c;因为没有了反向梯度计算&#xff0c;当然&#xff0c;我哦们也无法做反向传播。 model.eval() 和model.train()…...

Ozon产品内容评级功能上线,妙手ERP实力助力Ozon卖家全方位打造爆款产品!

产品内容评级&#xff0c;可以直接反映产品质量的高低&#xff0c;也是影响产品排名的关键。具有较高内容评级的产品&#xff0c;将有更大机会显示在搜索结果和类目的前几页中&#xff0c;从而引起买家的关注&#xff0c;促进销售。 为帮助卖家打造高质量产品&#xff0c;妙手…...

Linux 下最主流的文件系统格式——ext

硬盘分成相同大小的单元&#xff0c;我们称为块&#xff08;Block&#xff09;。一块的大小是扇区大小的整数倍&#xff0c;默认是 4K。在格式化的时候&#xff0c;这个值是可以设定的。 一大块硬盘被分成了一个个小的块&#xff0c;用来存放文件的数据部分。这样一来&#xf…...

变量环境、变量提升和暂时性死区

JavaScript中的提升 在JavaScript中&#xff0c;“Hoisting”&#xff08;提升&#xff09;是一种特性&#xff0c;它将变量和函数的声明移动到作用域的顶部。这意味着可以在声明之前使用这些变量和函数&#xff0c;而不会报错。 当JavaScript代码执行时&#xff0c;会经过两个…...

yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)

多目标追踪实例分割目标检测 YOLO (You Only Look Once) 是一个流行的目标检测算法&#xff0c;它能够在图像中准确地定位和识别多个物体。 本项目是基于 YOLO 算法的目标跟踪系统&#xff0c;它将 YOLO 的目标检测功能与目标跟踪技术相结合&#xff0c;实现了实时的多目标跟…...

【神经网络】【GoogleNet】

1、引言 卷积神经网络是当前最热门的技术&#xff0c;我想深入地学习这门技术&#xff0c;从他的发展历史开始&#xff0c;了解神经网络算法的兴衰起伏&#xff1b;同时了解他在发展过程中的**里程碑式算法**&#xff0c;能更好的把握神经网络发展的未来趋势&#xff0c;了解神…...

网络安全深入学习第八课——正向代理(工具:ReGeorg)

文章目录 一、环境配置二、开始模拟1、拿下跳板机的Webshell权限&#xff0c;并上传shell文件1.1、查看跳板机网络环境1.2、查看arp表 2、使用ReGeorg来建立连接2.1、生产ReGeorg隧道文件2.2、上传ReGeorg隧道的PHP脚本到跳板机2.3、连接隧道2.4、尝试浏览器连接 3、使用Proxif…...

Jmeter全流程性能测试实战

项目背景&#xff1a; 我们的平台为全国某行业监控平台&#xff0c;经过3轮功能测试、接口测试后&#xff0c;98%的问题已经关闭&#xff0c;决定对省平台向全国平台上传数据的接口进行性能测试。 01、测试步骤 1、编写性能测试方案 由于我是刚进入此项目组不久&#xff0c…...

Python算法例8 将整数A转换为B

1. 问题描述 给定整数A和B&#xff0c;求出将整数A转换为B&#xff0c;需要改变bit的位数。 2. 问题示例 把31转换为14&#xff0c;需要改变2个bit位&#xff0c;即&#xff1a;&#xff08;31&#xff09;10&#xff08;11111&#xff09;2&#xff0c;&#xff08;14&…...

一个基于百度飞桨封装的.NET版本OCR工具类库 - PaddleOCRSharp

前言 大家有使用过.NET开发过OCR工具吗&#xff1f;今天给大家推荐一个基于百度飞桨封装的.NET版本OCR工具类库&#xff1a;PaddleOCRSharp。 OCR工具有什么用&#xff1f; OCR&#xff08;Optical Character Recognition&#xff09;工具可以将图像或扫描文件中的文本内容转…...

在 CelebA 数据集上训练的 PyTorch 中的基本变分自动编码器

摩西西珀博士 一、说明 我最近发现自己需要一种方法将图像编码到潜在嵌入中&#xff0c;调整嵌入&#xff0c;然后生成新图像。有一些强大的方法可以创建嵌入或从嵌入生成。如果你想同时做到这两点&#xff0c;一种自然且相当简单的方法是使用变分自动编码器。 这样的深度网络不…...

利用Ansible实现批量Linux服务器安全配置

1.摘要 在上一篇<<初步利用Ansible实现批量服务器自动化管理>>文章中, 我初步实现了通过编写清单和剧本来实现多台服务器的自动化管理,在本章节中, 我将利用Ansible的剧本来实现更实用、更复杂一点的功能, 主要功能包括三个:1.同时在三台服务器中增加IP访问控制,只…...

读书笔记:彼得·德鲁克《认识管理》第8章 战略规划:企业家技能

一、章节内容概述 战略规划帮助做好当前的业务以迎接未来。战略规划需要思考业务应该是什么&#xff0c;当前必须做什么才能赢得未来。战略规划需要进行风险决策&#xff0c;需要有组织地抛弃过去的业务&#xff0c;要求清晰界定和明确安排为实现理想的未来而开展的工作。战略…...

HarmonyOS应用开发-视频播放器与弹窗

Viedo组件 在手机、平板或是智慧屏这些终端设备上&#xff0c;媒体功能可以算作是我们最常用的场景之一。无论是实现音频的播放、录制、采集&#xff0c;还是视频的播放、切换、循环&#xff0c;亦或是相机的预览、拍照等功能&#xff0c;媒体组件都是必不可少的。以视频功能为…...

java中对象的引用是什么?

引用和指向 例如&#xff1a; new Student(); 代表创建了一个Student对象&#xff0c;但是也仅仅是创建了一个对象&#xff0c;没有办法访问它。 为了访问这个对象&#xff0c;会使用引用来代表这个对象 Student s new Student(); s这个变量是Student类型&#xff0c;又叫做引…...

jenkins插件迁移

将Jenkins插件迁移至不同的Jenkins实例或更新插件版本是一项常见的任务。以下是迁移Jenkins插件的一般步骤&#xff1a; 备份现有插件&#xff1a; 在开始迁移之前&#xff0c;首先备份你当前的Jenkins实例以及所有相关的插件。这可以通过复制Jenkins的JENKINS_HOME目录来实现…...

RK356X Android13.0 HDMI和喇叭同时出声音

补丁适用范围:RK356X Android13.0 Android默认音频输出逻辑,不接HDMI默认喇叭音频输出,若检测到HDMI接入后,关闭喇叭输出,开启HDMI音频输出,但是BOX产品的使用场景需要插入HDMI后,喇叭仍然输出,可加入此补丁 $ vim frameworks/base/services/core/java/com/android/s…...

vue sass-loader,webpack安装卸载操作命令

检查 node-sass 的可用版本&#xff1a;运行下面的命令&#xff0c;查看 node-sass 的可用版本列表。 查看 npm view node-sass versions卸载 npm uninstall node-sass安装指定版本 npm install node-sass4.14.1安装最新版本 npm install sass-loaderlatest如果没有指定特定…...

nacos应用——占用内存过多问题解决(JVM调优初步)

问题描述 最近搞了一台1年的阿里云服务器&#xff0c;安装了一下常用的MySQL&#xff0c;Redis&#xff0c;rabbitmq&#xff0c;minio&#xff0c;然后有安装了一下nacos&#xff0c;结果一启动nacos内存占用就很高&#xff0c;就比较限制我继续安装其他镜像或者启动别的服务…...

大漠插件(二、Qt使用插件时注意事项)

本章目的 在上篇已经注册完毕大漠&#xff0c;那么怎么使用大漠来制作脚本&#xff0c;我选择了我最熟悉的Qt来开发&#xff0c;毕竟只是小软件&#xff0c;用脚本或者c都差不了多少。本章就是开发途中的一些坑。 本人开发环境是 win11 64、Qt 5.15.2安装了5.10.0的msvc2015 32…...

CSS 浮动

目标target✓ 能够说出来为什么需要浮动能够说出来浮动的排列特性能够说出来三种最常见的布局方式能够说出来为什么需要清除浮动,能够至少写出两种清楚浮动的方法能够利用Photoshop实现基本的切图能够利用Photoshop插件实现切图能够完成学成在线的页面布 传统网页布局的三种模…...

基于STM32+华为云IOT设计的火灾感知系统

一、设计需求 【1】 项目背景 随着城市化进程的加快和人们生活水平的提高,火灾事故频繁发生,给人们的生命财产安全带来巨大威胁。因此,开发一种可靠的火灾感知系统对于预防和减少火灾事故具有重要意义。近年来,随着物联网技术的发展,基于物联网的火灾感知系统逐渐成为研…...