flink自定义process,使用状态求历史总和(scala)
es idea maven 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}import java.time.Duration
import java.util.Propertiesobject Test {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//需要状态开启下面的配置//env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端//env.enableCheckpointing(10 * 60 * 1000L)//env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间val props = new Propertiesprops.setProperty("bootstrap.servers", "host:6667") //有些是9092端口props.setProperty("group.id", "groupId")props.setProperty("retries", "10")props.setProperty("retries.backoff.ms", "100")props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")//是否配置了权限,有的话加上下面的配置// props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")//props.setProperty("security.protocol", "SASL_PLAINTEXT");// props.setProperty("sasl.mechanism", "PLAIN")val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("esIPOne", 9200, "http"))httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))httpHosts.add(new HttpHost("esIPThree", 9200, "http"))val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {val json = new java.util.HashMap[String, Any]json.put("@timestamp", element.ts)json.put("data", element.data)json.put("sum", element.sum)val rqst = Requests.indexRequest().index("indexName").id(element.id).source(json).opType(DocWriteRequest.OpType.INDEX)indexer.add(rqst)}})setESConf(esSinkBuilder, 5000)val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props).setStartFromEarliest() //从什么时间开始读val source = env.addSource(myConsumer).uid("source-data").name("数据源").assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts}).withIdleness(Duration.ofSeconds(5))).uid("water-marks").name("注册水位线")source.keyBy(k => k.id).process(new DemoProcess()).uid("demo-process").name("process 示例").addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")env.execute("任务名")}private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {override def isEndOfStream(t: DemoBean): Boolean = falseoverride def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {val value = new String(consumerRecord.value())val list = value.split("\t")DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)}override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])}private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {private var hisSumState: ValueState[Int] = _override def open(parameters: Configuration): Unit = {hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))}override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {val his = if (hisSumState.value() == null) 0 else hisSumState.value()val now = data.valuehisSumState.update(now)out.collect(ResultBean(data.id, data.data, his + now, data.value))}}def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {esSinkBuilder.setBulkFlushMaxActions(numMaxActions)esSinkBuilder.setBulkFlushMaxSizeMb(10)esSinkBuilder.setBulkFlushInterval(10000)esSinkBuilder.setBulkFlushBackoff(true)esSinkBuilder.setBulkFlushBackoffDelay(2)esSinkBuilder.setBulkFlushBackoffRetries(3)esSinkBuilder.setRestClientFactory(new RestClientFactory {override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {requestConfigBuilder.setConnectTimeout(12000)requestConfigBuilder.setSocketTimeout(90000)}})}})}private case class DemoBean(id: String, data: String, value: Int, ts: Long)private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}
相关文章:
flink自定义process,使用状态求历史总和(scala)
es idea maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.11.1</version> </dependency> import org.apache.flink.api.common.eve…...
股指期货理论价格计算公式是什么?
股指期货,作为金融衍生品的一种,其价格与现货市场的股指价格紧密相关,但又受到多种因素的影响。了解股指期货理论价格的计算公式,对于投资者进行套利交易、风险管理等具有重要意义。本文将详细解读股指期货理论价格的计算公式&…...
解决R包依赖版本不兼容问题
ERROR: dependency ‘Matrix’ is not available for package ‘irlba’ removing ‘/root/anaconda3/envs/myview/lib/R/library/irlba’ ERROR: dependency ‘Matrix’ is not available for package ‘N2R’ removing ‘/root/anaconda3/envs/myview/lib/R/library/N2R’ ER…...
HarmonyOS开发者基础认证考试试题
文章目录 一、判断题二、单选题三、多选题 因考试只有91分,所以下方答案有部分错误,如果有发现错误,欢迎提出 一、判断题 1. HarmonyOS提供了基础的应用加固安全能力,包括混淆、加密和代码签名能力 正确 2. 用户首选项是关系型数…...
如何使用 React、TypeScript、TailwindCSS 和 Vite 创建 Chrome 插件
创建一个 Chrome 插件是一个有趣的项目,特别是当结合使用强大的工具如 React、TypeScript、TailwindCSS 和 Vite 时 在这篇文章中,我们将逐步引导完成整个过程,了解如何在 2024 年构建自己的 Chrome 插件。无论是经验丰富的开发者还是刚刚起…...
机器学习——Stacking
Stacking: 方法:训练多个模型(可以是强模型),然后将这些模型的预测结果作为新的特征,输入到下一层新的模型(可以是多个)中进行训练,从而得到最终的预测结果。 代表:Stacking本身并没…...
在HTML中添加图片
在HTML中添加图片,你需要使用<img>标签。这个标签用于在网页上嵌入图像。<img>是一个空元素,它只包含属性,并且没有闭合标签。要在<img>标签中指定要显示的图像,你需要使用src(source的缩写…...
R语言机器学习算法实战系列(二) SVM算法(Support Vector Machine)
文章目录 介绍原理应用方向下载数据加载R包导入数据数据预处理数据描述数据切割标准化数据设置参数训练模型预测测试数据评估模型模型准确性混淆矩阵模型评估指标ROC CurvePRC Curve特征的重要性保存模型总结系统信息介绍 支持向量机(Support Vector Machine,简称SVM)是一种…...
gdb调试使用记录
使用 GDB(GNU Debugger)进行问题排查是非常有效的。且可以通过core文件进行排查bug,core文件是程序异常崩溃的时候(段错误,非法指令等),系统自动生成的core文件。用户可以通过core文件配合gdb调试命令,调试…...
ESXi安装【真机和虚拟机】(超详细)
项目简介: ESXi(Elastic Sky X Integrated)是VMware公司开发的一种裸机虚拟化管理程序,允许用户在单一物理服务器上运行多个虚拟机(VM)。它直接安装在服务器硬件上,而不是操作系统之上ÿ…...
基于SpringBoot+Vue的高校门禁管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 精品专栏:Java精选实战项目源码、Python精…...
【Linux-基础IO】C语言文件接口回顾 系统文件概念及接口
目录 一、C语言文件接口回顾 C语言基础知识 C中文件操作示例 二、系统文件概念及接口 重定向基本理解的回顾 文件的基本概念 系统调用接口 open read write close lseek 什么是当前路径 一、C语言文件接口回顾 引言:我们并不理解文件!从语…...
系统架构笔记-3-信息系统基础知识
知识要点 结构化方法:结构是指系统内各个组成要素之间的相互联系、相互作用的框架。结构化方法也称为生命周期法,是一种传统的信息系统开发方法,由结构化分析、结构化设计、结构化程序设计三部分有机组合而成,精髓是自顶向下、逐…...
Linux下编程实现网络传送文件
本程序是在Linux下开发的,使用的是C语言,再结合Socket进行编程,分为客户端和服务器两个程序,即采用的是C/S架构,相应的源代码如下: 服务器端: #include <stdio.h> //#include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h&g…...
【速成Redis】04 Redis 概念扫盲:事务、持久化、主从复制、哨兵模式
前言: 前三篇如下: 【速成Redis】01 Redis简介及windows上如何安装redis-CSDN博客 【速成Redis】02 Redis 五大基本数据类型常用命令-CSDN博客 【速成Redis】03 Redis 五大高级数据结构介绍及其常用命令 | 消息队列、地理空间、HyperLogLog、BitMap、…...
SQL Server 2022的数据类型
新书速览|SQL Server 2022从入门到精通:视频教学超值版_sql server 2022 出版社-CSDN博客 《SQL Server 2022从入门到精通(视频教学超值版)(数据库技术丛书)》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) 数据类…...
Linux基础3-基础工具4(git),冯诺依曼计算机体系结构
上篇文章:Linux基础3-基础工具3(make,makefile,gdb详解)-CSDN博客 本章重点: 1. git简易使用 2. 冯诺依曼计算机体系结构介绍 目录 一. git使用 1.1 什么是git? 1.2 git发展史 1.3 git创建仓库 1.4 git命令操作 二. 冯诺依…...
后台数据管理系统 - 项目架构设计-Vue3+axios+Element-plus(0916)
接口文档: https://apifox.com/apidoc/shared-26c67aee-0233-4d23-aab7-08448fdf95ff/api-93850835 接口根路径: http://big-event-vue-api-t.itheima.net 本项目的技术栈 本项目技术栈基于 ES6、vue3、pinia、vue-router 、vite 、axios 和 element-plus http:/…...
MySQL基础篇(黑马程序员2022-01-18)
1 MySQL数据库概述 1.1 MySQL数据库的下载,安装,启动停止 1.2 数据模型 (1)关系型数据库(RDBMS) 概念:建立在关系模型基础上,由多张相互连接的二维表组成的数据库。 特点: A. 使用表存储数据,格式统一,便于维护。…...
nodejs 013:Prect 样式复用(multiple classes)例子
Prect 简单示例 Prect 为使用相同的现代 API 的快速 3kB React 替代方案。代码形式与 React 基本相同。部分语法区别可见 prect-differences-to-react。以下是一个 Prect 简单示例。 Button目录Button.css: .this {display: inline-block;padding: 3px 8px;margi…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...
