当前位置: 首页 > news >正文

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}import java.time.Duration
import java.util.Propertiesobject Test {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//需要状态开启下面的配置//env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端//env.enableCheckpointing(10 * 60 * 1000L)//env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间val props = new Propertiesprops.setProperty("bootstrap.servers", "host:6667") //有些是9092端口props.setProperty("group.id", "groupId")props.setProperty("retries", "10")props.setProperty("retries.backoff.ms", "100")props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")//是否配置了权限,有的话加上下面的配置// props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")//props.setProperty("security.protocol", "SASL_PLAINTEXT");// props.setProperty("sasl.mechanism", "PLAIN")val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("esIPOne", 9200, "http"))httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))httpHosts.add(new HttpHost("esIPThree", 9200, "http"))val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {val json = new java.util.HashMap[String, Any]json.put("@timestamp", element.ts)json.put("data", element.data)json.put("sum", element.sum)val rqst = Requests.indexRequest().index("indexName").id(element.id).source(json).opType(DocWriteRequest.OpType.INDEX)indexer.add(rqst)}})setESConf(esSinkBuilder, 5000)val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props).setStartFromEarliest() //从什么时间开始读val source = env.addSource(myConsumer).uid("source-data").name("数据源").assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts}).withIdleness(Duration.ofSeconds(5))).uid("water-marks").name("注册水位线")source.keyBy(k => k.id).process(new DemoProcess()).uid("demo-process").name("process 示例").addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")env.execute("任务名")}private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {override def isEndOfStream(t: DemoBean): Boolean = falseoverride def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {val value = new String(consumerRecord.value())val list = value.split("\t")DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)}override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])}private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {private var hisSumState: ValueState[Int] = _override def open(parameters: Configuration): Unit = {hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))}override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {val his = if (hisSumState.value() == null) 0 else hisSumState.value()val now = data.valuehisSumState.update(now)out.collect(ResultBean(data.id, data.data, his + now, data.value))}}def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {esSinkBuilder.setBulkFlushMaxActions(numMaxActions)esSinkBuilder.setBulkFlushMaxSizeMb(10)esSinkBuilder.setBulkFlushInterval(10000)esSinkBuilder.setBulkFlushBackoff(true)esSinkBuilder.setBulkFlushBackoffDelay(2)esSinkBuilder.setBulkFlushBackoffRetries(3)esSinkBuilder.setRestClientFactory(new RestClientFactory {override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {requestConfigBuilder.setConnectTimeout(12000)requestConfigBuilder.setSocketTimeout(90000)}})}})}private case class DemoBean(id: String, data: String, value: Int, ts: Long)private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

相关文章:

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.11.1</version> </dependency> import org.apache.flink.api.common.eve…...

股指期货理论价格计算公式是什么?

股指期货&#xff0c;作为金融衍生品的一种&#xff0c;其价格与现货市场的股指价格紧密相关&#xff0c;但又受到多种因素的影响。了解股指期货理论价格的计算公式&#xff0c;对于投资者进行套利交易、风险管理等具有重要意义。本文将详细解读股指期货理论价格的计算公式&…...

解决R包依赖版本不兼容问题

ERROR: dependency ‘Matrix’ is not available for package ‘irlba’ removing ‘/root/anaconda3/envs/myview/lib/R/library/irlba’ ERROR: dependency ‘Matrix’ is not available for package ‘N2R’ removing ‘/root/anaconda3/envs/myview/lib/R/library/N2R’ ER…...

HarmonyOS开发者基础认证考试试题

文章目录 一、判断题二、单选题三、多选题 因考试只有91分&#xff0c;所以下方答案有部分错误&#xff0c;如果有发现错误&#xff0c;欢迎提出 一、判断题 1. HarmonyOS提供了基础的应用加固安全能力&#xff0c;包括混淆、加密和代码签名能力 正确 2. 用户首选项是关系型数…...

如何使用 React、TypeScript、TailwindCSS 和 Vite 创建 Chrome 插件

创建一个 Chrome 插件是一个有趣的项目&#xff0c;特别是当结合使用强大的工具如 React、TypeScript、TailwindCSS 和 Vite 时 在这篇文章中&#xff0c;我们将逐步引导完成整个过程&#xff0c;了解如何在 2024 年构建自己的 Chrome 插件。无论是经验丰富的开发者还是刚刚起…...

机器学习——Stacking

Stacking&#xff1a; 方法&#xff1a;训练多个模型(可以是强模型)&#xff0c;然后将这些模型的预测结果作为新的特征&#xff0c;输入到下一层新的模型&#xff08;可以是多个&#xff09;中进行训练&#xff0c;从而得到最终的预测结果。 代表&#xff1a;Stacking本身并没…...

在HTML中添加图片

在HTML中添加图片&#xff0c;你需要使用<img>标签。这个标签用于在网页上嵌入图像。<img>是一个空元素&#xff0c;它只包含属性&#xff0c;并且没有闭合标签。要在<img>标签中指定要显示的图像&#xff0c;你需要使用src&#xff08;source的缩写&#xf…...

R语言机器学习算法实战系列(二) SVM算法(Support Vector Machine)

文章目录 介绍原理应用方向下载数据加载R包导入数据数据预处理数据描述数据切割标准化数据设置参数训练模型预测测试数据评估模型模型准确性混淆矩阵模型评估指标ROC CurvePRC Curve特征的重要性保存模型总结系统信息介绍 支持向量机(Support Vector Machine,简称SVM)是一种…...

gdb调试使用记录

使用 GDB&#xff08;GNU Debugger&#xff09;进行问题排查是非常有效的。且可以通过core文件进行排查bug&#xff0c;core文件是程序异常崩溃的时候(段错误&#xff0c;非法指令等)&#xff0c;系统自动生成的core文件。用户可以通过core文件配合gdb调试命令&#xff0c;调试…...

ESXi安装【真机和虚拟机】(超详细)

项目简介&#xff1a; ESXi&#xff08;Elastic Sky X Integrated&#xff09;是VMware公司开发的一种裸机虚拟化管理程序&#xff0c;允许用户在单一物理服务器上运行多个虚拟机&#xff08;VM&#xff09;。它直接安装在服务器硬件上&#xff0c;而不是操作系统之上&#xff…...

基于SpringBoot+Vue的高校门禁管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目源码、Python精…...

【Linux-基础IO】C语言文件接口回顾 系统文件概念及接口

目录 一、C语言文件接口回顾 C语言基础知识 C中文件操作示例 二、系统文件概念及接口 重定向基本理解的回顾 文件的基本概念 系统调用接口 open read write close lseek 什么是当前路径 一、C语言文件接口回顾 引言&#xff1a;我们并不理解文件&#xff01;从语…...

系统架构笔记-3-信息系统基础知识

知识要点 结构化方法&#xff1a;结构是指系统内各个组成要素之间的相互联系、相互作用的框架。结构化方法也称为生命周期法&#xff0c;是一种传统的信息系统开发方法&#xff0c;由结构化分析、结构化设计、结构化程序设计三部分有机组合而成&#xff0c;精髓是自顶向下、逐…...

Linux下编程实现网络传送文件

本程序是在Linux下开发的,使用的是C语言,再结合Socket进行编程,分为客户端和服务器两个程序,即采用的是C/S架构,相应的源代码如下: 服务器端: #include <stdio.h> //#include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h&g…...

【速成Redis】04 Redis 概念扫盲:事务、持久化、主从复制、哨兵模式

前言&#xff1a; 前三篇如下&#xff1a; 【速成Redis】01 Redis简介及windows上如何安装redis-CSDN博客 【速成Redis】02 Redis 五大基本数据类型常用命令-CSDN博客 【速成Redis】03 Redis 五大高级数据结构介绍及其常用命令 | 消息队列、地理空间、HyperLogLog、BitMap、…...

SQL Server 2022的数据类型

新书速览|SQL Server 2022从入门到精通&#xff1a;视频教学超值版_sql server 2022 出版社-CSDN博客 《SQL Server 2022从入门到精通&#xff08;视频教学超值版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) 数据类…...

Linux基础3-基础工具4(git),冯诺依曼计算机体系结构

上篇文章&#xff1a;Linux基础3-基础工具3&#xff08;make,makefile,gdb详解&#xff09;-CSDN博客 本章重点&#xff1a; 1. git简易使用 2. 冯诺依曼计算机体系结构介绍 目录 一. git使用 1.1 什么是git? 1.2 git发展史 1.3 git创建仓库 1.4 git命令操作 二. 冯诺依…...

后台数据管理系统 - 项目架构设计-Vue3+axios+Element-plus(0916)

接口文档: https://apifox.com/apidoc/shared-26c67aee-0233-4d23-aab7-08448fdf95ff/api-93850835 接口根路径&#xff1a; http://big-event-vue-api-t.itheima.net 本项目的技术栈 本项目技术栈基于 ES6、vue3、pinia、vue-router 、vite 、axios 和 element-plus http:/…...

MySQL基础篇(黑马程序员2022-01-18)

1 MySQL数据库概述 1.1 MySQL数据库的下载,安装,启动停止 1.2 数据模型 (1)关系型数据库(RDBMS) 概念&#xff1a;建立在关系模型基础上&#xff0c;由多张相互连接的二维表组成的数据库。 特点&#xff1a; A. 使用表存储数据&#xff0c;格式统一&#xff0c;便于维护。…...

nodejs 013:Prect 样式复用(multiple classes)例子

Prect 简单示例 Prect 为使用相同的现代 API 的快速 3kB React 替代方案。代码形式与 React 基本相同。部分语法区别可见 prect-differences-to-react。以下是一个 Prect 简单示例。 Button目录Button.css&#xff1a; .this {display: inline-block;padding: 3px 8px;margi…...

从怀疑到真香!2026我日常办公离不开的这款在线文字转换器太好用了

刚入职那半年我踩过太多坑&#xff1a;一周三次新人培训&#xff0c;怕漏记知识点全程录音&#xff0c;下课手动整理1小时录音要熬3小时&#xff0c;知识点散得根本没法复习&#xff1b;部门周会做完记录&#xff0c;散会就要我出整理好的纪要&#xff0c;赶工赶得饭都吃不上&a…...

AI大模型应用开发全攻略:从入门到精通,掌握LLM、RAG、Agent核心技能!“

本文全面介绍了AI大模型应用开发的核心技术和实践。从大模型API交互基础&#xff0c;到关键参数Messages和Tools的作用&#xff0c;深入解析了RAG、ReAct、Agent等应用范式。文章还探讨了Fine-tuning微调和Prompt提示词工程的重要性&#xff0c;强调工程实践与业务需求相结合。…...

BetterJoy完整配置指南:5分钟让Switch手柄在PC上完美运行

BetterJoy完整配置指南&#xff1a;5分钟让Switch手柄在PC上完美运行 【免费下载链接】BetterJoy Allows the Nintendo Switch Pro Controller, Joycons and SNES controller to be used with CEMU, Citra, Dolphin, Yuzu and as generic XInput 项目地址: https://gitcode.c…...

第3篇:系统透视——信息部门如何构建“税务友好型”IT架构

本篇导读&#xff1a;如果你是信息总监或IT负责人&#xff0c;请通读全文&#xff0c;尤其是“系统合规设计的三必须”和“现场检查SOP”&#xff1b;如果你是财税人员&#xff0c;请重点阅读“研产供销全链条的系统对接要求”和“与IT部门的协作要点”&#xff1b;如果你是老板…...

基于声卡与电流互感器的安全交流功率测量系统设计与实践

1. 项目概述&#xff1a;用声卡安全测量交流功率我一直对各种测量技术抱有浓厚的兴趣&#xff0c;毕竟“测量即认知”这句老话在今天依然适用。对于电力消耗和产出&#xff0c;没有什么比直接测量更能说明问题了。交流功率的测量&#xff0c;核心在于同时获取电压和电流的瞬时值…...

告别鼠标手!5分钟上手开源鼠标连点器MouseClick,轻松实现自动化点击

告别鼠标手&#xff01;5分钟上手开源鼠标连点器MouseClick&#xff0c;轻松实现自动化点击 【免费下载链接】MouseClick &#x1f5b1;️ MouseClick &#x1f5b1;️ 是一款功能强大的鼠标连点器和管理工具&#xff0c;采用 QT Widget 开发 &#xff0c;具备跨平台兼容性 。软…...

【MySQL数据库 | 第一篇】 概述

数据库相关概念&#xff1a; 数据库(Database)&#xff1a;数据库是指一组有组织的数据的集合&#xff0c;通过计算机程序进行管理和访问。数据库管理系统&#xff1a;操纵和管理数据库的大型软件SQL&#xff1a;操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数…...

HDI 高密度互连板阶数的深度理解

一、概述高密度互连板&#xff08;High Density Interconnector, HDI&#xff09;是通过激光微孔技术和逐层积层工艺实现高密度布线的印制电路板。其阶数划分是行业内统一的技术标准&#xff0c;核心依据为独立积层压合次数与配套激光盲孔制程次数&#xff0c;而非单面层数或钻…...

Allegro PCB设计小技巧:如何让Route Keepout区域既能走线又能打过孔(附详细步骤图)

Allegro PCB设计实战&#xff1a;Route Keepout区域的灵活控制技巧 在高速PCB设计中&#xff0c;Route Keepout区域的管理常常让工程师陷入两难境地——元件封装自带的限制区域与实际布线需求产生冲突。特别是处理PCIE等高速信号时&#xff0c;这种矛盾尤为突出。传统做法要么完…...

用Azure Kinect DK和Body Tracking SDK,5分钟实现一个实时人体骨骼点检测Demo(C++版)

5分钟实战&#xff1a;用Azure Kinect DK实现实时人体骨骼点追踪&#xff08;C版&#xff09; 当你第一次拿到Azure Kinect DK时&#xff0c;最令人兴奋的莫过于它强大的人体追踪能力。这款深度相机不仅能捕捉高清彩色图像&#xff0c;更能通过AI算法实时重建人体骨骼关节点。本…...