当前位置: 首页 > news >正文

flink如何写入es

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、写入到Elasticsearch5
  • 二、写入到Elasticsearch7
  • 总结


前言

Flink sink 流数据写入到es5和es7的简单示例。


一、写入到Elasticsearch5

  • pom maven依赖
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch5_2.11</artifactId><version>${flink.version}</version></dependency>
  • 代码如下(示例):
public class Es5SinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"));Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"));Row row3=Row.of("张三","002",getTimestamp("2016-10-24 21:51:06"));Row row4=Row.of("李四","003",getTimestamp("2016-10-24 21:50:56"));Row row5=Row.of("李四","004",getTimestamp("2016-10-24 00:48:36"));Row row6=Row.of("王五","005",getTimestamp("2016-10-24 00:48:36"));DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6);Map<String, String> config = new HashMap<>();
//        config.put("cluster.name", "my-cluster-name");
//        config.put("bulk.flush.max.actions", "1");List<InetSocketAddress> transportAddresses = new ArrayList<>();transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.68.8.60"), 9300));//Sink操作DataStreamSink<Row> rowDataStreamSink = source.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<Row>() {public IndexRequest createIndexRequest(Row element) {Map<String, Object> json = new HashMap<>();json.put("name22", element.getField(0).toString());json.put("no22", element.getField(1));json.put("age", 34);json.put("create_time", element.getField(2));return Requests.indexRequest().index("cc").type("mtype").id(element.getField(1).toString()).source(json);}@Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求,写入数据indexer.add(createIndexRequest(element));}}));env.execute("es demo");}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
//		String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}

二、写入到Elasticsearch7

  • pom maven依赖
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
  • 代码如下(示例):
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class EsSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Row row=Row.of("张三","001",getTimestamp("2016-10-24 21:59:06"));Row row2=Row.of("张三","002",getTimestamp("2016-10-24 21:50:06"));Row row3=Row.of("张三","002",getTimestamp("2016-10-24 21:51:06"));Row row4=Row.of("李四","003",getTimestamp("2016-10-24 21:50:56"));Row row5=Row.of("李四","004",getTimestamp("2016-10-24 00:48:36"));Row row6=Row.of("王五","005",getTimestamp("2016-10-24 00:48:36"));DataStreamSource<Row> source =env.fromElements(row,row2,row3,row4,row5,row6);Map<String, String> config = new HashMap<>();
//        config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
//        config.put("bulk.flush.max.actions", "1");List<HttpHost> hosts = new ArrayList<>();hosts.add(new HttpHost("10.68.8.69",9200,"http"));ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<Row>(hosts,new ElasticsearchSinkFunction<Row>() {public IndexRequest createIndexRequest(Row element) {Map<String, Object> json = new HashMap<>();json.put("name22", element.getField(0).toString());json.put("no22", element.getField(1));json.put("age", 34);
//                json.put("create_time", element.getField(2));return Requests.indexRequest().index("cc").id(element.getField(1).toString()).source(json);}@Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求,写入数据indexer.add(createIndexRequest(element));}});esSinkBuilder.setBulkFlushMaxActions(100);//Sink操作source.addSink(esSinkBuilder.build());env.execute("es demo");}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
//		String string = "2016-10-24 21:59:06";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");java.util.Date date=sdf.parse(str);java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());return s;}
}

总结

flink写入es5和es7 的区别是引入不同的flink-connector-elasticsearch,es7已没有type的概念故无需再设置type。

相关文章:

flink如何写入es

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、写入到Elasticsearch5二、写入到Elasticsearch7总结 前言 Flink sink 流数据写入到es5和es7的简单示例。 一、写入到Elasticsearch5 pom maven依赖 <d…...

Java、Python、C++和C#的界面开发框架和工具的重新介绍

好的&#xff0c;以下是Java、Python、C和C#的界面开发框架和工具的重新介绍&#xff1a; Java界面开发&#xff1a; Swing: 是Java提供的一个基于组件的GUI工具包&#xff0c;可以创建跨平台的图形用户界面。它提供了丰富的组件和布局管理器&#xff0c;使得界面开发相对简单。…...

Java二叉树的遍历以及最大深度问题

Java学习面试指南&#xff1a;https://javaxiaobear.cn 1、树的相关概念 1、树的基本定义 树是我们计算机中非常重要的一种数据结构&#xff0c;同时使用树这种数据结构&#xff0c;可以描述现实生活中的很多事物&#xff0c;例如家谱、单位的组织架构、等等。 树是由n&#…...

Apollo 9.0搭建问题记录

虚拟机安装 可以看这个&#xff1a;https://blog.csdn.net/qq_45138078/article/details/129815408 写的很详细 内存 为了学习 Apollo &#xff0c;所以只是使用了虚拟机&#xff0c;内存得大一点&#xff08;128G&#xff09;&#xff0c;第一次&#xff0c;就是因为分配内…...

【心得】PHP文件包含高级利用攻击面个人笔记

目录 一、nginx日志文件包含 二、临时文件包含 三、php的session文件包含 四、pear文件包含 五 、远程文件包含 文件包含 include "/var/www/html/flag.php"; 一 文件名可控 $file$_GET[file]; include $file.".php"; //用php伪协议 &#xff0…...

[scala] 列表常见用法

文章目录 不可变列表 List可变列表 ListBuffer 不可变列表 List 在 Scala 中&#xff0c;列表是一种不可变的数据结构&#xff0c;用于存储一系列元素。列表使用 List 类来表示&#xff0c;它提供了许多方法来操作和处理列表。 下面是一些常见的使用列表的示例&#xff1a; 创…...

python 使用urllib3发起post请求,携带json参数

当通过python脚本&#xff0c;发起http post请求&#xff0c;网络上大多是通过fields传递数据&#xff0c;然而这样&#xff0c;服务器收到的请求&#xff0c;但无法解析json数据。类似这些链接&#xff1a; Python urllib3库使用指南 软件测试|Python urllib3库使用指南 p…...

深入理解堆(Heap):一个强大的数据结构

. 个人主页&#xff1a;晓风飞 专栏&#xff1a;数据结构|Linux|C语言 路漫漫其修远兮&#xff0c;吾将上下而求索 文章目录 前言堆的实现基本操作结构体定义初始化堆&#xff08;HeapInit&#xff09;销毁堆&#xff08;HeapDestroy&#xff09; 重要函数交换函数&#xff08;…...

抖音在线查权重系统源码,附带查询接口

抖音权重在线查询只需输入抖音主页链接&#xff0c;即可查询作品情况。 搭建教程 上传源码并解压 修改数据库“bygoukai.sql” 修改“config.php” 如需修改水印请修改第40行 如需修改限制次数&#xff0c;请修改第156行 访问域名user.php即可查看访问用户&#xff0c;停…...

Spring Framework和SpringBoot的区别

目录 一、前言 二、什么是Spring 三、什么是Spring Framework 四、什么是SpringBoot 五、使用Spring Framework构建工程 六、使用SpringBoot构建工程 七、总结 一、前言 作为Java程序员&#xff0c;我们都听说过Spring&#xff0c;也都使用过Spring的相关产品&#xff0…...

2024--Django平台开发-Django知识点(三)

day03 django知识点 项目相关路由相关 urls.py视图相关 views.py模版相关 templates资源相关 static/media 1.项目相关 新项目 开发时&#xff0c;可能遇到使用其他的版本。虚拟环境 老项目 打开项目虚拟环境 1.1 关于新项目 1.系统解释器命令行【学习】 C:/python38- p…...

Github 2024-01-08开源项目周报 Top14

根据Github Trendings的统计&#xff0c;本周(2024-01-08统计)共有14个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目5TypeScript项目3C项目2Dart项目1QML项目1Go项目1Shell项目1Rust项目1JavaScript项目1C#项目1 免费…...

vue3 的内置组件汇总

官方给出的说明&#xff1a; Fragment: Vue 3 组件不再要求有一个唯一的根节点&#xff0c;清除了很多无用的占位 div。Teleport: 允许组件渲染在别的元素内&#xff0c;主要开发弹窗组件的时候特别有用。Suspense: 异步组件&#xff0c;更方便开发有异步请求的组件。 一、fr…...

ARM工控机Node-red使用教程

嵌入式ARM工控机Node-red安装教程 从前车马很慢书信很远&#xff0c;而现在人们不停探索“科技改变生活”。 智能终端的出现改变了我们的生活方式&#xff0c;钡铼技术嵌入式工控机协助您灵活布建能源管理、大楼自动化、工业自动化、电动车充电站等各种多元性IoT应用&#xff…...

Visual Studio 发布程序自动更新 ClickOnce和AutoUpdater测试

文章目录 前言运行环境ClickOnce&#xff08;Visual Studio 程序发布&#xff09;IIS新建文件夹C# 控制台测试安装测试更新测试卸载 AutoUpdaterDotNET实现原理简单使用新建一个WPF项目 代码封装自动更新代码封装简单使用 总结 前言 虽然写的大部分都是不联网项目&#xff0c;…...

Codeforces Round 761 (Div. 2) E. Christmas Chocolates(思维题 树的直径 二进制性质 lca)

题目 n(n<2e5)个值&#xff0c;第i个值ai(0<ai<1e9)&#xff0c;所有ai两两不同 初始时&#xff0c;选择两个位置x,y(x≠y)&#xff0c;代表需要对这两个位置进行操作&#xff0c;要把其中一个值变成另一个 你可以执行若干次操作&#xff0c;每一次&#xff0c;你可…...

知识图谱之汽车实战案例综述与前瞻分析

知识图谱的前置介绍 什么是知识图谱 知识图谱本质(Knowledge Graph&#xff09;上是一种叫做语义网络(semantic network &#xff09; 的知识库&#xff0c;即具有有向图结构的一个知识库&#xff1b;图的结点代表实体&#xff08;entity&#xff09;或者概念&#xff08;con…...

网关Gateway

什么是网关? 网关实质上是一个网络通向其他网络的 IP 地址&#xff0c;是当前微服务项目的"统一入口"。 网关能做什么&#xff1f; 反向代理 、鉴权、 流量控制、 熔断、 日志监控等 图片原文&#xff1a;http://t.csdnimg.cn/SvUJh 核心概念 Router&#xff08;…...

java 生成一个当前时间的时间搓

开发过程中 用时间搓数值格式存储 会更加精准 那么 我们在一些日常增删查改中就可以用时间搓来记录操作时间 就一行代码 long timestamp System.currentTimeMillis();他就能生成当前时间的时间搓 运行结果如下 然后 我们可以在 http://shijianchuo.wiicha.com/ 上进行转换查…...

金融中IC和IR的定义

当谈到金融领域时&#xff0c;IC&#xff08;Information Coefficient&#xff09;和IR&#xff08;Information Ratio&#xff09;通常是用来评估投资组合管理绩效的指标。它们都涉及到投资者对信息的利用和管理的效果。 信息系数&#xff08;IC - Information Coefficient&a…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...