当前位置: 首页 > news >正文

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}import java.time.Duration
import java.util.Propertiesobject Test {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//需要状态开启下面的配置//env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端//env.enableCheckpointing(10 * 60 * 1000L)//env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间val props = new Propertiesprops.setProperty("bootstrap.servers", "host:6667") //有些是9092端口props.setProperty("group.id", "groupId")props.setProperty("retries", "10")props.setProperty("retries.backoff.ms", "100")props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")//是否配置了权限,有的话加上下面的配置// props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")//props.setProperty("security.protocol", "SASL_PLAINTEXT");// props.setProperty("sasl.mechanism", "PLAIN")val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("esIPOne", 9200, "http"))httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))httpHosts.add(new HttpHost("esIPThree", 9200, "http"))val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {val json = new java.util.HashMap[String, Any]json.put("@timestamp", element.ts)json.put("data", element.data)json.put("sum", element.sum)val rqst = Requests.indexRequest().index("indexName").id(element.id).source(json).opType(DocWriteRequest.OpType.INDEX)indexer.add(rqst)}})setESConf(esSinkBuilder, 5000)val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props).setStartFromEarliest() //从什么时间开始读val source = env.addSource(myConsumer).uid("source-data").name("数据源").assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts}).withIdleness(Duration.ofSeconds(5))).uid("water-marks").name("注册水位线")source.keyBy(k => k.id).process(new DemoProcess()).uid("demo-process").name("process 示例").addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")env.execute("任务名")}private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {override def isEndOfStream(t: DemoBean): Boolean = falseoverride def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {val value = new String(consumerRecord.value())val list = value.split("\t")DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)}override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])}private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {private var hisSumState: ValueState[Int] = _override def open(parameters: Configuration): Unit = {hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))}override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {val his = if (hisSumState.value() == null) 0 else hisSumState.value()val now = data.valuehisSumState.update(now)out.collect(ResultBean(data.id, data.data, his + now, data.value))}}def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {esSinkBuilder.setBulkFlushMaxActions(numMaxActions)esSinkBuilder.setBulkFlushMaxSizeMb(10)esSinkBuilder.setBulkFlushInterval(10000)esSinkBuilder.setBulkFlushBackoff(true)esSinkBuilder.setBulkFlushBackoffDelay(2)esSinkBuilder.setBulkFlushBackoffRetries(3)esSinkBuilder.setRestClientFactory(new RestClientFactory {override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {requestConfigBuilder.setConnectTimeout(12000)requestConfigBuilder.setSocketTimeout(90000)}})}})}private case class DemoBean(id: String, data: String, value: Int, ts: Long)private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

相关文章:

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.11.1</version> </dependency> import org.apache.flink.api.common.eve…...

股指期货理论价格计算公式是什么?

股指期货&#xff0c;作为金融衍生品的一种&#xff0c;其价格与现货市场的股指价格紧密相关&#xff0c;但又受到多种因素的影响。了解股指期货理论价格的计算公式&#xff0c;对于投资者进行套利交易、风险管理等具有重要意义。本文将详细解读股指期货理论价格的计算公式&…...

解决R包依赖版本不兼容问题

ERROR: dependency ‘Matrix’ is not available for package ‘irlba’ removing ‘/root/anaconda3/envs/myview/lib/R/library/irlba’ ERROR: dependency ‘Matrix’ is not available for package ‘N2R’ removing ‘/root/anaconda3/envs/myview/lib/R/library/N2R’ ER…...

HarmonyOS开发者基础认证考试试题

文章目录 一、判断题二、单选题三、多选题 因考试只有91分&#xff0c;所以下方答案有部分错误&#xff0c;如果有发现错误&#xff0c;欢迎提出 一、判断题 1. HarmonyOS提供了基础的应用加固安全能力&#xff0c;包括混淆、加密和代码签名能力 正确 2. 用户首选项是关系型数…...

如何使用 React、TypeScript、TailwindCSS 和 Vite 创建 Chrome 插件

创建一个 Chrome 插件是一个有趣的项目&#xff0c;特别是当结合使用强大的工具如 React、TypeScript、TailwindCSS 和 Vite 时 在这篇文章中&#xff0c;我们将逐步引导完成整个过程&#xff0c;了解如何在 2024 年构建自己的 Chrome 插件。无论是经验丰富的开发者还是刚刚起…...

机器学习——Stacking

Stacking&#xff1a; 方法&#xff1a;训练多个模型(可以是强模型)&#xff0c;然后将这些模型的预测结果作为新的特征&#xff0c;输入到下一层新的模型&#xff08;可以是多个&#xff09;中进行训练&#xff0c;从而得到最终的预测结果。 代表&#xff1a;Stacking本身并没…...

在HTML中添加图片

在HTML中添加图片&#xff0c;你需要使用<img>标签。这个标签用于在网页上嵌入图像。<img>是一个空元素&#xff0c;它只包含属性&#xff0c;并且没有闭合标签。要在<img>标签中指定要显示的图像&#xff0c;你需要使用src&#xff08;source的缩写&#xf…...

R语言机器学习算法实战系列(二) SVM算法(Support Vector Machine)

文章目录 介绍原理应用方向下载数据加载R包导入数据数据预处理数据描述数据切割标准化数据设置参数训练模型预测测试数据评估模型模型准确性混淆矩阵模型评估指标ROC CurvePRC Curve特征的重要性保存模型总结系统信息介绍 支持向量机(Support Vector Machine,简称SVM)是一种…...

gdb调试使用记录

使用 GDB&#xff08;GNU Debugger&#xff09;进行问题排查是非常有效的。且可以通过core文件进行排查bug&#xff0c;core文件是程序异常崩溃的时候(段错误&#xff0c;非法指令等)&#xff0c;系统自动生成的core文件。用户可以通过core文件配合gdb调试命令&#xff0c;调试…...

ESXi安装【真机和虚拟机】(超详细)

项目简介&#xff1a; ESXi&#xff08;Elastic Sky X Integrated&#xff09;是VMware公司开发的一种裸机虚拟化管理程序&#xff0c;允许用户在单一物理服务器上运行多个虚拟机&#xff08;VM&#xff09;。它直接安装在服务器硬件上&#xff0c;而不是操作系统之上&#xff…...

基于SpringBoot+Vue的高校门禁管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目源码、Python精…...

【Linux-基础IO】C语言文件接口回顾 系统文件概念及接口

目录 一、C语言文件接口回顾 C语言基础知识 C中文件操作示例 二、系统文件概念及接口 重定向基本理解的回顾 文件的基本概念 系统调用接口 open read write close lseek 什么是当前路径 一、C语言文件接口回顾 引言&#xff1a;我们并不理解文件&#xff01;从语…...

系统架构笔记-3-信息系统基础知识

知识要点 结构化方法&#xff1a;结构是指系统内各个组成要素之间的相互联系、相互作用的框架。结构化方法也称为生命周期法&#xff0c;是一种传统的信息系统开发方法&#xff0c;由结构化分析、结构化设计、结构化程序设计三部分有机组合而成&#xff0c;精髓是自顶向下、逐…...

Linux下编程实现网络传送文件

本程序是在Linux下开发的,使用的是C语言,再结合Socket进行编程,分为客户端和服务器两个程序,即采用的是C/S架构,相应的源代码如下: 服务器端: #include <stdio.h> //#include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h&g…...

【速成Redis】04 Redis 概念扫盲:事务、持久化、主从复制、哨兵模式

前言&#xff1a; 前三篇如下&#xff1a; 【速成Redis】01 Redis简介及windows上如何安装redis-CSDN博客 【速成Redis】02 Redis 五大基本数据类型常用命令-CSDN博客 【速成Redis】03 Redis 五大高级数据结构介绍及其常用命令 | 消息队列、地理空间、HyperLogLog、BitMap、…...

SQL Server 2022的数据类型

新书速览|SQL Server 2022从入门到精通&#xff1a;视频教学超值版_sql server 2022 出版社-CSDN博客 《SQL Server 2022从入门到精通&#xff08;视频教学超值版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) 数据类…...

Linux基础3-基础工具4(git),冯诺依曼计算机体系结构

上篇文章&#xff1a;Linux基础3-基础工具3&#xff08;make,makefile,gdb详解&#xff09;-CSDN博客 本章重点&#xff1a; 1. git简易使用 2. 冯诺依曼计算机体系结构介绍 目录 一. git使用 1.1 什么是git? 1.2 git发展史 1.3 git创建仓库 1.4 git命令操作 二. 冯诺依…...

后台数据管理系统 - 项目架构设计-Vue3+axios+Element-plus(0916)

接口文档: https://apifox.com/apidoc/shared-26c67aee-0233-4d23-aab7-08448fdf95ff/api-93850835 接口根路径&#xff1a; http://big-event-vue-api-t.itheima.net 本项目的技术栈 本项目技术栈基于 ES6、vue3、pinia、vue-router 、vite 、axios 和 element-plus http:/…...

MySQL基础篇(黑马程序员2022-01-18)

1 MySQL数据库概述 1.1 MySQL数据库的下载,安装,启动停止 1.2 数据模型 (1)关系型数据库(RDBMS) 概念&#xff1a;建立在关系模型基础上&#xff0c;由多张相互连接的二维表组成的数据库。 特点&#xff1a; A. 使用表存储数据&#xff0c;格式统一&#xff0c;便于维护。…...

nodejs 013:Prect 样式复用(multiple classes)例子

Prect 简单示例 Prect 为使用相同的现代 API 的快速 3kB React 替代方案。代码形式与 React 基本相同。部分语法区别可见 prect-differences-to-react。以下是一个 Prect 简单示例。 Button目录Button.css&#xff1a; .this {display: inline-block;padding: 3px 8px;margi…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

在 Spring Boot 中使用 JSP

jsp&#xff1f; 好多年没用了。重新整一下 还费了点时间&#xff0c;记录一下。 项目结构&#xff1a; pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...