当前位置: 首页 > news >正文

Flink系列之:Elasticsearch SQL 连接器

Flink系列之:Elasticsearch SQL 连接器

  • 一、Elasticsearch SQL 连接器
  • 二、创建 Elasticsearch表
  • 三、连接器参数
  • 四、Key 处理
  • 五、动态索引
  • 六、数据类型映射

一、Elasticsearch SQL 连接器

  • Sink: Batch
  • Sink: Streaming Append & Upsert Mode
  • Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
  • 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
  • 如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。

二、创建 Elasticsearch表

以下示例展示了如何创建 Elasticsearch sink 表:

CREATE TABLE myUserTable (user_id STRING,user_name STRING,uv BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'users'
);

三、连接器参数

参数是否必选默认值数据类型描述
connector必选(none)String指定要使用的连接器,有效值为:elasticsearch-6:连接到 Elasticsearch 6.x 的集群。elasticsearch-7:连接到 Elasticsearch 7.x 及更高版本的集群。
hosts必选(none)String要连接到的一台或多台 Elasticsearch 主机,例如 ‘http://host_name:9092;http://host_name:9093’。
index必选(none)StringElasticsearch 中每条记录的索引。可以是一个静态索引(例如 ‘myIndex’)或一个动态索引(例如 'index-{log_ts
document-type6.x 版本中必选(none)StringElasticsearch 文档类型。在 elasticsearch-7 中不再需要。
document-id.key-delimiter可选-String复合键的分隔符(默认为"_“),例如,指定为” " 将导致文档 I D 为 " K E Y 1 "将导致文档 ID 为"KEY1 "将导致文档ID"KEY1KEY2$KEY3"。
username可选(none)String用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。
password可选(none)String用于连接 Elasticsearch 实例的密码。如果配置了username,则此选项也必须配置为非空字符串。
failure-handler可选failString对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:fail:如果请求失败并因此导致作业失败,则抛出异常。ignore:忽略失败并放弃请求。retry-rejected:重新添加由于队列容量饱和而失败的请求。自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。
sink.delivery-guarantee可选AT_LEAST_ONCEString承诺时可选择交付保证。有效值为:EXACTLY_ONCE:在故障转移情况下,记录也仅传递一次。AT_LEAST_ONCE:确保记录被传递,但可能会发生同一条记录被传递多次的情况。NONE:尽力提供记录。
sink.flush-on-checkpoint可选trueBoolean在进行 checkpoint 时是否保证刷出缓冲区中的数据。如果关闭这一选项,在进行checkpoint时 sink 将不再为所有进行 中的请求等待 Elasticsearch 的执行完成确认。因此,在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。
sink.bulk-flush.max-actions可选1000Integer每个批量请求的最大缓冲操作数。 可以设置为’0’来禁用它。
sink.bulk-flush.max-size可选2mbMemorySize每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为’0’来禁用它。
sink.bulk-flush.interval可选1sDurationflush 缓冲操作的间隔。 可以设置为’0’来禁用它。注意,'sink.bulk-flush.max-size’和’sink.bulk-flush.max-actions’都设置为’0’的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
sink.bulk-flush.backoff.strategy可选DISABLEDString指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:DISABLED:不执行重试,即第一次请求错误后失败。CONSTANT:等待重试之间的回退延迟。EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增。
sink.bulk-flush.backoff.max-retries可选(none)Integer最大回退重试次数。
sink.bulk-flush.backoff.delay可选(none)Duration每次退避尝试之间的延迟。对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。
connection.path-prefix可选(none)String添加到每个 REST 通信中的前缀字符串,例如,‘/v1’。
connection.request-timeout可选(none)Duration从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。
connection.timeout可选(none)Duration建立请求的超时时间 。超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。
socket.timeout可选(none)Duration等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。
format可选jsonStringElasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 ‘json’ 格式。

四、Key 处理

  • Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
  • 在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。 Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES,ROW,ARRAY,MAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 id。

五、动态索引

  • Elasticsearch sink 同时支持静态索引和动态索引。
  • 如果你想使用静态索引,则 index 选项值应为纯字符串,例如 ‘myusers’,所有记录都将被写入到 “myusers” 索引中。
  • 如果你想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。 你也可以使用 ‘{field_name|date_format_string}’ 将 TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 ‘myusers-{log_ts|yyyy-MM-dd}’,则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。
  • 你也可以使用 ‘{now()|date_format_string}’ 将当前的系统时间转换为 date_format_string 指定的格式。now() 对应的时间类型是 TIMESTAMP_WITH_LTZ 。 在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone 中配置的时区。 使用 NOW(), now(), CURRENT_TIMESTAMP, current_timestamp 均可以。
  • 注意: 使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。

六、数据类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 ‘json’ 格式。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL类型JSON类型
CHAR/VARCHAR/STRINGstring
BOOLEANboolean
BINARY/VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONEstring with format: date-time (with UTC time zone)
INTERVALnumber
ARRAYarray
MAP / MULTISETobject
ROWobject

相关文章:

Flink系列之:Elasticsearch SQL 连接器

Flink系列之:Elasticsearch SQL 连接器 一、Elasticsearch SQL 连接器二、创建 Elasticsearch表三、连接器参数四、Key 处理五、动态索引六、数据类型映射 一、Elasticsearch SQL 连接器 Sink: BatchSink: Streaming Append & Upsert ModeElasticsearch 连接器…...

java中将Map集合、对象、字符串转换为JSON对象

1、Map集合转JSON对象 创建一个Map集合&#xff1b; 新建json对象&#xff0c;并将Map引入json中。 public void demo1(){ //创建一个Map集合Map<String, String> map new HashMap<>();map.put("1729210001","zhangsan");map.put("17292…...

理解Spring中bean的作用域

singleton:Spring Ioc容器中只会存在一个共享的Bean实例&#xff0c;无论有多少个Bean引用它&#xff0c;始终指向同一个对象&#xff0c;作用域为Spring中的缺省&#xff08;同一package&#xff09;作用域 prototype:每次通过Spring容器获取prototype定义的bean时&#xff0c…...

edge中以右键“打印”的方式“保存”当前页面的pdf形式,下载过程中卡进度的问题

目录 问题描述&#xff1a; 可能的问题&#xff1a; 解决&#xff1a; 问题描述&#xff1a; 特殊情况下需要保存网页的pdf形式&#xff0c;但页面没有类似“导出pdf”的功能按钮&#xff0c;可以通过页面右键“打印”的方式“保存”当前页面的pdf形式。在pdf文件下载过程中出…...

c# 使用OpenCV

C#和OpenCV的结合主要通过一个名为OpenCVSharp的库实现。OpenCVSharp是一个C#包装器&#xff0c;它提供了对OpenCV&#xff08;一个开源的计算机视觉和机器学习库&#xff09;功能的访问。 安装OpenCVSharp NuGet包&#xff1a; 在Visual Studio中&#xff0c;右键点击你的项目…...

数据库连接问题 - ChatGPT对自身的定位

1.一段关于数据库连接的技术性对话 sweetie&#xff0c;连接数据库的时候&#xff0c;需要在每次读写数据后就把连接释放吗&#xff1f; 亲爱的&#xff0c;连接数据库后&#xff0c;通常会在每次读写数据后将连接释放。这是为了确保数据库连接的及时释放和有效管理。如果不及…...

常见可视化大屏编辑器有哪些?

前言&#xff1a; 在当今数字化时代&#xff0c;可视化大屏编辑器成为了数据展示和决策支持的重要工具。大屏编辑器不仅仅是数据的呈现&#xff0c;更是数据背后的故事的讲述者。它通过图表、图形和实时数据的呈现&#xff0c;为用户提供了全面的信息视图&#xff0c;帮助用户更…...

利用ffmpeg cv2取h265码流视频(转换图片灰屏问题解决)

利用海康威视相机拍出来的视频是H265格式的&#xff0c;相比于常规的H264编码&#xff0c;压缩率更高&#xff0c;但因此如果直接用正常取流方法读取&#xff0c;会出现无法读取的情况 1. 如图h265码流取出图片为灰屏 2 、解决灰屏问题 import subprocess import cv2# 将h265流…...

Android Uri scheme协议file转content

一、Uri的介绍 在Android开发中&#xff0c;Uri&#xff08;Uniform Resource Identifier&#xff09;是用于标识和访问各种资源的核心概念。这些资源可能包括文件、网络URL、数据库记录等。在处理这些资源时&#xff0c;我们可能会遇到不同的Uri协议&#xff0c;如file和conte…...

【Jenkins】远程API接口:Java 包装接口使用示例

jenkins-rest 库是一个面向对象的 Java 项目&#xff0c;它通过编程方式提供对 Jenkins REST API 的访问&#xff0c;以访问 Jenkins 提供的一些远程 API。它使用 jclouds 工具包构建&#xff0c;可以轻松扩展以支持更多 REST 端点。其功能集不断发展&#xff0c;用户可以通过拉…...

未能加载工具箱项问题的解决

解决办法是项目属性要设置成any cpu 在解决方案里的所有项目上右键&#xff0c;属性&#xff0c;生成&#xff0c;看目标平台是不是都设置成了any cpu...

算法模板之栈图文详解

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;算法模板、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. ⛳️模拟栈1.1 &#x1f514;用数组模拟实现栈1.1.1 &#x1f47b;栈的定义1.1.…...

Ajax Search Pro Live WordPress网站内容实时搜索插件

点击阅读Ajax Search Pro Live WordPress网站内容实时搜索插件原文 Ajax Search Pro Live WordPress网站内容实时搜索插件是 WordPress 最好的实时搜索引擎插件。高度可定制&#xff0c;具有许多功能和选项&#xff0c;可提供最佳结果&#xff01;用更美观、更高效的搜索引擎替…...

mysql SQL执行超时问题

show variables like max_execution_time 使用这个命令查看了&#xff0c;没有设置sql执行超时时间&#xff0c;那么大概率问题就出在阿里的Druid数据库连接池出了问题 尝试着socketTimeout由60000毫秒改成10000毫秒&#xff0c;果然执行了十几秒就超时报错了 socketTime…...

51单片机基于时间片轮转的简单rtos

早就想写写这个了&#xff0c;正好赶上有点时间&#xff0c;写了一下基于51单片机的时间片轮转调度系统&#xff0c;简单的rtos&#xff0c;呵呵。直接上代码。 //基于51单片机时间片轮转的简单rtos。 #include"reg52.h" sbit led1 P2^7; sbit led2 P2^0; sbit key…...

python pycurl 安装使用

python pycurl 安装使用 本文主要讲下pycurl 安装使用. 1.安装 首先使用 pip 命令安装. pip install pycurl 输出如下: Collecting pycurlUsing cached pycurl-7.45.2.tar.gz (234 kB)ERROR: Command errored out with exit status 1:command: /usr/bin/python3 -c impor…...

C语言数据结构-排序

文章目录 1 排序的概念及运用1.1 排序的概念1.2 排序的应用 2 插入排序2.1 直接插入排序2.2 希尔排序2.3 直接排序和希尔排序对比 3 选择排序3.1 堆排序3.2 直接选择排序 4 交换排序4.1 冒泡排序4.2 快速排序4.2.1 挖坑法14.2.2 挖坑法24.2.3 挖坑法3 5 并归排序6 十万级别数据…...

Spring AOP入门指南:轻松掌握面向切面编程的基础知识

面向切面编程 1&#xff0c;AOP简介1.1 什么是AOP?1.2 AOP作用1.3 AOP核心概念 2&#xff0c;AOP入门案例2.1 需求分析2.2 思路分析2.3 环境准备2.4 AOP实现步骤步骤1:添加依赖步骤2:定义接口与实现类步骤3:定义通知类和通知步骤4:定义切入点步骤5:制作切面步骤6:将通知类配给…...

【顶级快刊】IEEE(Trans),审稿快仅2个月录用,入选CCF-B,现在投最快!

计算机类 • 好刊解读 今天小编带来IEEE旗下计算机领域顶刊&#xff0c;顶级快刊&#xff0c;CCF-B类推荐&#xff0c;如您有投稿需求&#xff0c;可作为重点关注&#xff01;后文有相关领域真实发表案例&#xff0c;供您投稿参考~ 01 期刊简介 IEEE Transactions on Affect…...

深入浅出堆排序: 高效算法背后的原理与性能

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《linux深造日志》 《高效算法》 ⛺️生活的理想&#xff0c;就是为了理想的生活! &#x1f4cb; 前言 &#x1f308;堆排序一个基于二叉堆数据结构的排序算法&#xff0c;其稳定性和排序效率在八大排序中也…...

COMSOL激光与电火花高斯热源作用下5.6版本两相流水平集仿真模型:流体传热-层流耦合研究

comsol激光、电火花&#xff08;高斯热源&#xff09;加工的水平集两相流仿真模型&#xff0c;5.6版本的&#xff0c;是流体传热—层流—两相流水平集耦合。在COMSOL Multiphysics 5.6中&#xff0c;模拟激光或电火花加工过程中的热源分布和流体行为&#xff0c;是一个相当有趣…...

超级AI数字员工源码系统,支持贴牌OEM,独立部署交付

温馨提示&#xff1a;文末有资源获取方式最近“龙虾AI”概念很火&#xff0c;到处都在讨论。但说实话&#xff0c;这类技术对普通用户而言存在明显门槛&#xff0c;部署要代码、配置要工程师、日常运行的Token成本也不低——轻度使用每月100-200元&#xff0c;重度甚至单日上千…...

PTA编程题‘Person抽象类’避坑指南:变量命名冲突、多态指针数组与输出格式化的那些坑

PTA编程题‘Person抽象类’避坑指南&#xff1a;变量命名冲突、多态指针数组与输出格式化的那些坑 在C面向对象编程的实战中&#xff0c;抽象类和派生类的设计看似简单&#xff0c;却暗藏诸多陷阱。许多初学者在完成PTA/LeetCode这类编程题时&#xff0c;往往因为一些看似微不足…...

UReport2实战:如何优雅地导出多Sheet页报表(动态/静态分页全解析)

UReport2实战&#xff1a;如何优雅地导出多Sheet页报表&#xff08;动态/静态分页全解析&#xff09; 在数据驱动的商业环境中&#xff0c;报表导出功能已成为企业级应用的标配需求。当面对海量数据时&#xff0c;传统的单Sheet页Excel导出方案往往导致文件臃肿、查阅困难。URe…...

Web AR技术深度探秘:7个创新案例重构浏览器增强现实体验

Web AR技术深度探秘&#xff1a;7个创新案例重构浏览器增强现实体验 【免费下载链接】AR.js Image tracking, Location Based AR, Marker tracking. All on the Web. 项目地址: https://gitcode.com/gh_mirrors/arj/AR.js 你是一个文章写手&#xff0c;你负责为开源项目…...

全桥LLC变换器死区时间优化实战:从IGBT硬开通到完美ZVS的调试记录

全桥LLC变换器死区时间优化实战&#xff1a;从IGBT硬开通到完美ZVS的调试记录 在电力电子领域&#xff0c;LLC谐振变换器因其高效率、高功率密度和良好的EMI特性&#xff0c;已成为中高功率应用的理想选择。然而&#xff0c;实际调试过程中&#xff0c;死区时间与励磁电感的匹配…...

STM32模拟Linux内核自动初始化机制实现

STM32模拟Linux内核自动初始化机制实现1. 项目概述1.1 技术背景在传统嵌入式开发中&#xff0c;程序通常按照顺序逻辑执行&#xff0c;当系统复杂度增加时会导致代码臃肿、模块耦合紧密。Linux内核通过initcall机制实现了模块化初始化&#xff0c;本项目在STM32平台上模拟实现了…...

深度技术解析:IDM激活脚本(IAS)的注册表锁定机制与长期试用方案

深度技术解析&#xff1a;IDM激活脚本&#xff08;IAS&#xff09;的注册表锁定机制与长期试用方案 【免费下载链接】IDM-Activation-Script IDM Activation & Trail Reset Script 项目地址: https://gitcode.com/gh_mirrors/id/IDM-Activation-Script Internet Dow…...

实战应用:使用autoclaw在快马平台快速开发销售数据监控看板

最近在做一个销售数据监控看板的需求&#xff0c;发现用autoclaw配合InsCode(快马)平台可以快速实现从开发到部署的全流程。整个过程比想象中顺畅很多&#xff0c;特别适合需要快速验证业务场景的情况。这里记录下具体实现思路和关键点&#xff1a; 数据准备与连接 首先用autoc…...

CasRel开源镜像部署教程:适配低显存(12GB)GPU的轻量级方案

CasRel开源镜像部署教程&#xff1a;适配低显存&#xff08;12GB&#xff09;GPU的轻量级方案 1. 前言&#xff1a;为什么选择这个方案 如果你正在处理文本数据&#xff0c;想要自动提取人物、地点、事件之间的关系&#xff0c;那么关系抽取技术就是你需要的工具。CasRel作为…...