当前位置: 首页 > news >正文

【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_1.12</artifactId><version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 22:25:58*/
public class ElasticsearchSinkStreamJobQuickDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);public static void main(String[] args) throws Exception {// 创建elasticsearch集群的httpHost连接HttpHost httpHost = new HttpHost("localhost", 9200, "http");List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(httpHost);// 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = "flink_" + tradeTime;logger.info("交易流水={},数据写入索引{}成功", transId, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}};// 构建elasticsearchSink算子BuilderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);// 每个请求最多发送的文档数量esSinkBuilder.setBulkFlushMaxActions(1);// 每次发送请求的时间间隔esSinkBuilder.setBulkFlushInterval(1000);//构建elasticsearchSink算子ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(1000);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 数据源写入数据算子,进行输出到elasticsearchdataStreamSource.addSink(sink);// 执行任务env.execute();}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {//时间戳加工timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_elasticsearch_connector</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依赖管理--><dependencies><!-- fastJson工具类依赖 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具类依赖 end --><!-- log4j日志框架依赖 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依赖 end --><!-- Flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基础依赖 end --><!-- Flink Elasticsearch 连接器依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Elasticsearch 连接器依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;import java.io.Serializable;/*** 描述:elasticsearch实体类** @author 浅夏的猫* @version 1.0.0* @date 2024-02-10 20:08:20*/
public class ElasticsearchEntity implements Serializable {private static final long serialVersionUID = 1L;/*** 集群地址* */private String hosts;/*** 集群端口* */private Integer port;/***执行计划* */private String scheme;/*** 文档类型,es7一般都是_doc* */private String type;/*** 索引前缀* */private String indexPrefix;/*** 内部批量处理器,刷新前最大缓存的操作数* */private Integer bulkFlushMaxActions=1;/*** 刷新前最大缓存的数据量(以兆字节为单位)* */private Integer bulkFlushMaxSizeMb=10;/*** 刷新的时间间隔(不论缓存操作的数量或大小如何)* */private Integer bulkFlushInterval=10000;/*** 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。* 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。* */private Boolean bulkFlushBackoff=false;/*** 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试* */private Integer bulkFlushBackoffDelay=10000;/*** 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试* */private Integer bulkFlushBackoffRetries=3;/*** 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常* */private Integer connectTimeout=10000;/*** 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。* */private Integer socketTimeout=10000;/*** 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。* */private Integer connectionRequestTimeout=10000;/*** 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。* */private Boolean redirectsEnabled=false;/*** 客户端允许的最大重定向次数* */private Integer maxRedirects=3;/*** 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。* */private Boolean authenticationEnabled=true;/*** 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。* */private Boolean circularRedirectsAllowed=false;/*** 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。* */private Boolean contentCompressionEnabled=false;/*** 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。* 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。* */private Boolean expectContinueEnabled=false;/*** 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。* */private Boolean normalizeUri=false;/*** 用于设置 HTTP 请求的路径前缀。* 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接* */private String pathPrefix;public String getHosts() {return hosts;}public void setHosts(String hosts) {this.hosts = hosts;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getScheme() {return scheme;}public void setScheme(String scheme) {this.scheme = scheme;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getIndexPrefix() {return indexPrefix;}public void setIndexPrefix(String indexPrefix) {this.indexPrefix = indexPrefix;}public Integer getBulkFlushMaxActions() {return bulkFlushMaxActions;}public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {this.bulkFlushMaxActions = bulkFlushMaxActions;}public Integer getBulkFlushMaxSizeMb() {return bulkFlushMaxSizeMb;}public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;}public Integer getBulkFlushInterval() {return bulkFlushInterval;}public void setBulkFlushInterval(Integer bulkFlushInterval) {this.bulkFlushInterval = bulkFlushInterval;}public Boolean getBulkFlushBackoff() {return bulkFlushBackoff;}public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {this.bulkFlushBackoff = bulkFlushBackoff;}public Integer getBulkFlushBackoffDelay() {return bulkFlushBackoffDelay;}public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;}public Integer getBulkFlushBackoffRetries() {return bulkFlushBackoffRetries;}public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;}public Integer getConnectTimeout() {return connectTimeout;}public void setConnectTimeout(Integer connectTimeout) {this.connectTimeout = connectTimeout;}public Integer getSocketTimeout() {return socketTimeout;}public void setSocketTimeout(Integer socketTimeout) {this.socketTimeout = socketTimeout;}public Integer getConnectionRequestTimeout() {return connectionRequestTimeout;}public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {this.connectionRequestTimeout = connectionRequestTimeout;}public Boolean getRedirectsEnabled() {return redirectsEnabled;}public void setRedirectsEnabled(Boolean redirectsEnabled) {this.redirectsEnabled = redirectsEnabled;}public Integer getMaxRedirects() {return maxRedirects;}public void setMaxRedirects(Integer maxRedirects) {this.maxRedirects = maxRedirects;}public Boolean getAuthenticationEnabled() {return authenticationEnabled;}public void setAuthenticationEnabled(Boolean authenticationEnabled) {this.authenticationEnabled = authenticationEnabled;}public Boolean getCircularRedirectsAllowed() {return circularRedirectsAllowed;}public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {this.circularRedirectsAllowed = circularRedirectsAllowed;}public Boolean getContentCompressionEnabled() {return contentCompressionEnabled;}public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {this.contentCompressionEnabled = contentCompressionEnabled;}public Boolean getExpectContinueEnabled() {return expectContinueEnabled;}public void setExpectContinueEnabled(Boolean expectContinueEnabled) {this.expectContinueEnabled = expectContinueEnabled;}public Boolean getNormalizeUri() {return normalizeUri;}public void setNormalizeUri(Boolean normalizeUri) {this.normalizeUri = normalizeUri;}public String getPathPrefix() {return pathPrefix;}public void setPathPrefix(String pathPrefix) {this.pathPrefix = pathPrefix;}
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:设置用于创建 Elasticsearch REST 客户端的工厂* 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:12:15*/
public class CustomRestClientFactory implements RestClientFactory {private ElasticsearchEntity elasticsearchEntity;public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void configureRestClientBuilder(RestClientBuilder restClientBuilder) {//设置默认的 HTTP 头部信息,这些信息将在每个请求中包含Header contentType = new BasicHeader("Content-Type", "application/json");Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");Header[] headers = {contentType, authorization};restClientBuilder.setDefaultHeaders(headers);//设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作restClientBuilder.setFailureListener(new RestClient.FailureListener());//配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。restClientBuilder.setNodeSelector(NodeSelector.ANY);//为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());}//允许在创建每个请求的时候进行额外的请求配置。restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));//允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));//设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。restClientBuilder.setStrictDeprecationMode(false);}
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:* 允许在创建每个请求的时候进行额外的请求配置* @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:24:42*/
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {private ElasticsearchEntity elasticsearchEntity;public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return custom;}
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:客户端配置* 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:28:15*/
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {private ElasticsearchEntity elasticsearchEntity;CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {RequestConfig.Builder custom = RequestConfig.custom();// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());}
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:自定义elasticsearch sink 算子函数* ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。* 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引** @author 浅夏的猫* @version 1.0.0* @date 2024-02-12 23:49:22*/
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);private ElasticsearchEntity elasticsearchEntity;public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = elasticsearchEntity.getIndexPrefix() + tradeTime;logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}
}

7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:es写入异常处理** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:04:24*/
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);@Overridepublic void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {// 处理不同类型的异常if (throwable instanceof EsRejectedExecutionException) {// 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");// 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储// 例如: indexer.add(createAnotherRequest(action));} else {// 对于其他类型的异常,默认返回放弃策略logger.error("Unhandled failure, abandoning request: {}", action.toString());}}
}

7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器进阶Demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-11 22:06:45*/
public class ElasticsearchSinkStreamJobAdvancedDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);public static void main(String[] args) {try {// 读取配置参数ElasticsearchEntity elasticsearchEntity = paramsInit();// 设置elasticsearch节点List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);// 创建esSinkFunction函数ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);// 构建ElasticsearchSink算子builderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);// es参数配置esBuilderHandler(esSinkBuilder, elasticsearchEntity);// 构建sink算子ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(100);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("交易接入,原始报文={}", dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 构建sink算子dataStreamSource.addSink(esSink);// 运行作业env.execute();} catch (Exception e) {e.printStackTrace();}}/*** 描述:Flink参数配置读取** @return {@code ElasticsearchEntity }* @throws IOException*/private static ElasticsearchEntity paramsInit() throws IOException {// 通过flink内置工具类获取命令行参数String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();String hosts = paramsMap.get("es.cluster.hosts");int port = paramsMap.getInt("es.cluster.port");String scheme = paramsMap.get("es.cluster.scheme");String type = paramsMap.get("es.cluster.type");String indexPrefix = paramsMap.get("es.cluster.indexPrefix");int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");elasticsearchEntity.setHosts(hosts);elasticsearchEntity.setPort(port);elasticsearchEntity.setScheme(scheme);elasticsearchEntity.setType(type);elasticsearchEntity.setIndexPrefix(indexPrefix);elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);elasticsearchEntity.setConnectTimeout(connectTimeout);elasticsearchEntity.setSocketTimeout(socketTimeout);elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);elasticsearchEntity.setMaxRedirects(maxRedirects);elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);elasticsearchEntity.setNormalizeUri(normalizeUri);return elasticsearchEntity;}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}/*** 描述:es参数配置** @param esSinkBuilder       esSinkBuilder建造器* @param elasticsearchEntity es实体类*/private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {// 设置触发批量写入的最大动作数,// 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());// 设置触发批量写入的最大数据量// 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MBesSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());// 设置批量写入的时间间隔// 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());// 启用批量写入的退避策略// 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());// 设置批量写入的退避延迟时间// 解释:在发生写入失败后,等待指定的延迟时间后再进行重试esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());// 设置批量写入的最大重试次数// 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());// 设置写入失败时的处理策略// 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());// 设置用于创建 Elasticsearch REST 客户端的工厂// 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));}/*** 描述:* elasticsearch 节点配置** @param elasticsearchEntity es实体类* @return {@code List<HttpHost> }*/private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {List<HttpHost> httpHosts = new ArrayList<>();String[] clusterArray = elasticsearchEntity.getHosts().split(",");for (String node : clusterArray) {httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));}return httpHosts;}
}

相关文章:

【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录 01 Elasticsearch Sink 基础概念02 Elasticsearch Sink 工作原理03 Elasticsearch Sink 核心组件04 Elasticsearch Sink 配置参数05 Elasticsearch Sink 依赖管理06 Elasticsearch Sink 初阶实战07 Elasticsearch Sink 进阶实战7.1 包结构 & 项目配置项目配置appl…...

一、ActiveMQ介绍

ActiveMQ介绍 一、JMS1.jms介绍2.jms消息传递模式3.JMS编码总体架构 二、消息中间件三、ActiveMQ介绍1.引入的原因1.1 原因1.2 遇到的问题1.3 解决思路 2.定义3.特点3.1 异步处理3.2 应用系统之间解耦3.3 实际-整体架构 4.作用 一、JMS 1.jms介绍 jms是java消息服务接口规范&…...

【牛客】寒假训练营1 I-It‘s bertrand paradox. Again! 题解

传送门&#xff1a;It’s bertrand paradox. Again! 标签&#xff1a;随机 题目大意 有两个人分别用两种方式在二维平面上随机生成1e5个圆&#xff0c;每个圆上的每一个点(x,y)都满足-100<x<100且-100<y<100&#xff0c;现在将某个人生成的1e5个圆的圆心和半径告…...

各种手型都合适,功能高度可定制,雷柏VT9PRO mini和VT9PRO游戏鼠标上手

去年雷柏推出了一系列支持4KHz回报率的鼠标&#xff0c;有着非常敏捷的反应速度&#xff0c;在游戏中操作体验十分出色。尤其是这系列4K鼠标不仅型号丰富&#xff0c;而且对玩家的操作习惯、手型适应也很好&#xff0c;像是VT9系列就主打轻巧&#xff0c;还有专门针对小手用户的…...

sql建库,建表基础操作

当涉及到SQL建库和建表操作时&#xff0c;以下是一个简单的示例&#xff1a; 1. 建库&#xff08;创建数据库&#xff09; sql复制代码 CREATE DATABASE mydatabase; 上述语句将创建一个名为mydatabase的数据库。 2. 选择数据库 在创建表之前&#xff0c;需要选择要在其中…...

算法训练营day32,贪心算法6

import "strconv" //738. 单调递增的数字 func monotoneIncreasingDigits(n int) int { str : strconv.Itoa(n) nums : []byte(str) length : len(nums) if length < 1 { return n } for i : length - 1; i > 0; i-- { //如果前一个数字比当前值大&#xff0…...

CTR之行为序列建模用户兴趣:DIN

在前面的文章中&#xff0c;已经介绍了很多关于推荐系统中CTR预估的相关技术&#xff0c;今天这篇文章也是延续这个主题。但不同的&#xff0c;重点是关于用户行为序列建模&#xff0c;阿里出品。 概要 论文&#xff1a;Deep Interest Network for Click-Through Rate Predict…...

Java使用Redis实现分页功能

分页功能实现应该是比较常见的&#xff0c;对于redis来说&#xff0c;近期刷题就发现了lrange、zrange这些指令&#xff0c;这个指令怎么使用呢&#xff1f; 我们接下来就来讲解下。 目录 指令简介lrangezrange Java实现Redis实现分页功能 指令简介 lrange lrange 是 Redis 中…...

Qt标准对话框设置

Qt标准对话框设置&#xff0c;设置字体、调色板、进度条等。 #include "mainwindow.h" #include "ui_mainwindow.h"MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this); }MainWindow::~MainWi…...

如何让Obsidian实现电脑端和安卓端同步

Obsidian是一款知名的笔记软件&#xff0c;支持Markdown语法&#xff0c;它允许用户在多个设备之间同步文件。要在安卓设备上实现同步&#xff0c;可以使用remote save插件&#xff0c;以下是具体操作步骤&#xff1a; 首先是安装电脑端的obsidian&#xff0c;然后依次下载obs…...

windows系统中jenkins构建报错提示“拒绝访问”

一.背景 之前徒弟在windows中安装的jenkins,运行的时候用的是java -jar jenkins.war来运行的。服务器只有1个盘符C盘。今天说构建错误了&#xff0c;问我修改了啥&#xff0c;我年前是修改过构建思路的。 二.问题分析 先看jenkins构建任务的日志&#xff0c;大概是xcopy命令执…...

服务器防火墙的应用技术有哪些?

随着互联网的发展&#xff0c;网络安全问题更加严峻。服务器防火墙技术作为一种基础的网络安全技术&#xff0c;对于保障我们的网络安全至关重要。本文将介绍服务器防火墙的概念和作用&#xff0c;以及主要的服务器防火墙技术&#xff0c;包括数据包过滤、状态检测、代理服务、…...

力扣:40. 组合总和 II

回溯&#xff1a; 1.先声明好大集合和小集合&#xff0c;在调用回溯函数&#xff0c;终止条件为sumtarget&#xff0c;要进行剪枝操作减少遍历的次数&#xff0c;去重操作防止数组中有两个相同的值来组成的集合相同。 class Solution {List<List<Integer>> li1ne…...

Java设计模式——责任链模式

当一个请求需要在多个对象之间传递&#xff0c;每个对象都可能处理该请求或将其传递给下一个对象。在这种情况下&#xff0c;需要避免将发送者与接收者之间的耦合&#xff0c;以及确定请求的处理方式。此时可使用责任链模式&#xff0c;它的优点有降低耦合度&#xff08;无需关…...

c++面试

c基础 面试题 1&#xff1a;变量的声明和定义有什么区别 1.定义:为变量分配地址和存储空间&#xff0c;声明:不分配地址。 2.一个变量可以在多个地方声明&#xff0c;但是只在一个地方定义。 3.加入 extern 修饰的是变量的声明&#xff0c;说明此变量将在文件以外或在文件后…...

[ansible] playbook运用

一、复习playbook剧本 --- - name: first play for install nginx #设置play的名称gather_facts: false #设置不收集facts信息hosts: webservers:dbservers #指定执行此play的远程主机组remote_user: root #指定执行此play的用…...

MSSQL运用

做过的事情&#xff0c;隔几年又再做相同的事情&#xff0c;做一下记录。 角色与权限 创建账号与设定执行存储过程权限 Use testDB CREATE LOGIN acct WITH PASSWORDp1 CREATE USER acct FOR LOGIN acct GO GRANT EXECUTE ON SP_Test TO acct; GO 存储过程 调用写好的SQL语…...

linux命令--pidof

文章目录 linux命令--pidof linux命令–pidof pidof 是Linux系统中用来查找正在运行进程的进程号&#xff08;pid)的工具&#xff0c;功能类似pgrep和ps。 pidof命令用于查找指定名称的进程的进程号id号。 语法 pidof(选项)(参数) 选项 -s&#xff1a;仅返回一个进程号&…...

计算机视觉发展的方向和潜在机会

计算机视觉发展的方向 文章目录 计算机视觉发展的方向计算机视觉发展的方向潜在机会 计算机视觉发展的方向 未来计算机视觉发展的方向可能包括以下几个方面&#xff1a; 深度学习和神经网络&#xff1a;深度学习已经成为计算机视觉领域的重要技术&#xff0c;未来将继续深入研…...

Java Web(六)--XML

介绍 官网&#xff1a;XML 教程 为什么需要&#xff1a; 需求 1 : 两个程序间进行数据通信&#xff1f;需求 2 : 给一台服务器&#xff0c;做一个配置文件&#xff0c;当服务器程序启动时&#xff0c;去读取它应当监听的端口号、还有连接数据库的用户名和密码。spring 中的…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解

进来是需要留言的&#xff0c;先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码&#xff0c;输入的<>当成字符串处理回显到页面中&#xff0c;看来只是把用户输…...

Python环境安装与虚拟环境配置详解

本文档旨在为Python开发者提供一站式的环境安装与虚拟环境配置指南&#xff0c;适用于Windows、macOS和Linux系统。无论你是初学者还是有经验的开发者&#xff0c;都能在此找到适合自己的环境搭建方法和常见问题的解决方案。 快速开始 一分钟快速安装与虚拟环境配置 # macOS/…...