当前位置: 首页 > news >正文

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}import java.time.Duration
import java.util.Propertiesobject Test {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//需要状态开启下面的配置//env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端//env.enableCheckpointing(10 * 60 * 1000L)//env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间val props = new Propertiesprops.setProperty("bootstrap.servers", "host:6667") //有些是9092端口props.setProperty("group.id", "groupId")props.setProperty("retries", "10")props.setProperty("retries.backoff.ms", "100")props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")//是否配置了权限,有的话加上下面的配置// props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")//props.setProperty("security.protocol", "SASL_PLAINTEXT");// props.setProperty("sasl.mechanism", "PLAIN")val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("esIPOne", 9200, "http"))httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))httpHosts.add(new HttpHost("esIPThree", 9200, "http"))val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {val json = new java.util.HashMap[String, Any]json.put("@timestamp", element.ts)json.put("data", element.data)json.put("sum", element.sum)val rqst = Requests.indexRequest().index("indexName").id(element.id).source(json).opType(DocWriteRequest.OpType.INDEX)indexer.add(rqst)}})setESConf(esSinkBuilder, 5000)val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props).setStartFromEarliest() //从什么时间开始读val source = env.addSource(myConsumer).uid("source-data").name("数据源").assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts}).withIdleness(Duration.ofSeconds(5))).uid("water-marks").name("注册水位线")source.keyBy(k => k.id).process(new DemoProcess()).uid("demo-process").name("process 示例").addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")env.execute("任务名")}private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {override def isEndOfStream(t: DemoBean): Boolean = falseoverride def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {val value = new String(consumerRecord.value())val list = value.split("\t")DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)}override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])}private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {private var hisSumState: ValueState[Int] = _override def open(parameters: Configuration): Unit = {hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))}override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {val his = if (hisSumState.value() == null) 0 else hisSumState.value()val now = data.valuehisSumState.update(now)out.collect(ResultBean(data.id, data.data, his + now, data.value))}}def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {esSinkBuilder.setBulkFlushMaxActions(numMaxActions)esSinkBuilder.setBulkFlushMaxSizeMb(10)esSinkBuilder.setBulkFlushInterval(10000)esSinkBuilder.setBulkFlushBackoff(true)esSinkBuilder.setBulkFlushBackoffDelay(2)esSinkBuilder.setBulkFlushBackoffRetries(3)esSinkBuilder.setRestClientFactory(new RestClientFactory {override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {requestConfigBuilder.setConnectTimeout(12000)requestConfigBuilder.setSocketTimeout(90000)}})}})}private case class DemoBean(id: String, data: String, value: Int, ts: Long)private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

相关文章:

flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.11.1</version> </dependency> import org.apache.flink.api.common.eve…...

股指期货理论价格计算公式是什么?

股指期货&#xff0c;作为金融衍生品的一种&#xff0c;其价格与现货市场的股指价格紧密相关&#xff0c;但又受到多种因素的影响。了解股指期货理论价格的计算公式&#xff0c;对于投资者进行套利交易、风险管理等具有重要意义。本文将详细解读股指期货理论价格的计算公式&…...

解决R包依赖版本不兼容问题

ERROR: dependency ‘Matrix’ is not available for package ‘irlba’ removing ‘/root/anaconda3/envs/myview/lib/R/library/irlba’ ERROR: dependency ‘Matrix’ is not available for package ‘N2R’ removing ‘/root/anaconda3/envs/myview/lib/R/library/N2R’ ER…...

HarmonyOS开发者基础认证考试试题

文章目录 一、判断题二、单选题三、多选题 因考试只有91分&#xff0c;所以下方答案有部分错误&#xff0c;如果有发现错误&#xff0c;欢迎提出 一、判断题 1. HarmonyOS提供了基础的应用加固安全能力&#xff0c;包括混淆、加密和代码签名能力 正确 2. 用户首选项是关系型数…...

如何使用 React、TypeScript、TailwindCSS 和 Vite 创建 Chrome 插件

创建一个 Chrome 插件是一个有趣的项目&#xff0c;特别是当结合使用强大的工具如 React、TypeScript、TailwindCSS 和 Vite 时 在这篇文章中&#xff0c;我们将逐步引导完成整个过程&#xff0c;了解如何在 2024 年构建自己的 Chrome 插件。无论是经验丰富的开发者还是刚刚起…...

机器学习——Stacking

Stacking&#xff1a; 方法&#xff1a;训练多个模型(可以是强模型)&#xff0c;然后将这些模型的预测结果作为新的特征&#xff0c;输入到下一层新的模型&#xff08;可以是多个&#xff09;中进行训练&#xff0c;从而得到最终的预测结果。 代表&#xff1a;Stacking本身并没…...

在HTML中添加图片

在HTML中添加图片&#xff0c;你需要使用<img>标签。这个标签用于在网页上嵌入图像。<img>是一个空元素&#xff0c;它只包含属性&#xff0c;并且没有闭合标签。要在<img>标签中指定要显示的图像&#xff0c;你需要使用src&#xff08;source的缩写&#xf…...

R语言机器学习算法实战系列(二) SVM算法(Support Vector Machine)

文章目录 介绍原理应用方向下载数据加载R包导入数据数据预处理数据描述数据切割标准化数据设置参数训练模型预测测试数据评估模型模型准确性混淆矩阵模型评估指标ROC CurvePRC Curve特征的重要性保存模型总结系统信息介绍 支持向量机(Support Vector Machine,简称SVM)是一种…...

gdb调试使用记录

使用 GDB&#xff08;GNU Debugger&#xff09;进行问题排查是非常有效的。且可以通过core文件进行排查bug&#xff0c;core文件是程序异常崩溃的时候(段错误&#xff0c;非法指令等)&#xff0c;系统自动生成的core文件。用户可以通过core文件配合gdb调试命令&#xff0c;调试…...

ESXi安装【真机和虚拟机】(超详细)

项目简介&#xff1a; ESXi&#xff08;Elastic Sky X Integrated&#xff09;是VMware公司开发的一种裸机虚拟化管理程序&#xff0c;允许用户在单一物理服务器上运行多个虚拟机&#xff08;VM&#xff09;。它直接安装在服务器硬件上&#xff0c;而不是操作系统之上&#xff…...

基于SpringBoot+Vue的高校门禁管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目源码、Python精…...

【Linux-基础IO】C语言文件接口回顾 系统文件概念及接口

目录 一、C语言文件接口回顾 C语言基础知识 C中文件操作示例 二、系统文件概念及接口 重定向基本理解的回顾 文件的基本概念 系统调用接口 open read write close lseek 什么是当前路径 一、C语言文件接口回顾 引言&#xff1a;我们并不理解文件&#xff01;从语…...

系统架构笔记-3-信息系统基础知识

知识要点 结构化方法&#xff1a;结构是指系统内各个组成要素之间的相互联系、相互作用的框架。结构化方法也称为生命周期法&#xff0c;是一种传统的信息系统开发方法&#xff0c;由结构化分析、结构化设计、结构化程序设计三部分有机组合而成&#xff0c;精髓是自顶向下、逐…...

Linux下编程实现网络传送文件

本程序是在Linux下开发的,使用的是C语言,再结合Socket进行编程,分为客户端和服务器两个程序,即采用的是C/S架构,相应的源代码如下: 服务器端: #include <stdio.h> //#include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h&g…...

【速成Redis】04 Redis 概念扫盲:事务、持久化、主从复制、哨兵模式

前言&#xff1a; 前三篇如下&#xff1a; 【速成Redis】01 Redis简介及windows上如何安装redis-CSDN博客 【速成Redis】02 Redis 五大基本数据类型常用命令-CSDN博客 【速成Redis】03 Redis 五大高级数据结构介绍及其常用命令 | 消息队列、地理空间、HyperLogLog、BitMap、…...

SQL Server 2022的数据类型

新书速览|SQL Server 2022从入门到精通&#xff1a;视频教学超值版_sql server 2022 出版社-CSDN博客 《SQL Server 2022从入门到精通&#xff08;视频教学超值版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) 数据类…...

Linux基础3-基础工具4(git),冯诺依曼计算机体系结构

上篇文章&#xff1a;Linux基础3-基础工具3&#xff08;make,makefile,gdb详解&#xff09;-CSDN博客 本章重点&#xff1a; 1. git简易使用 2. 冯诺依曼计算机体系结构介绍 目录 一. git使用 1.1 什么是git? 1.2 git发展史 1.3 git创建仓库 1.4 git命令操作 二. 冯诺依…...

后台数据管理系统 - 项目架构设计-Vue3+axios+Element-plus(0916)

接口文档: https://apifox.com/apidoc/shared-26c67aee-0233-4d23-aab7-08448fdf95ff/api-93850835 接口根路径&#xff1a; http://big-event-vue-api-t.itheima.net 本项目的技术栈 本项目技术栈基于 ES6、vue3、pinia、vue-router 、vite 、axios 和 element-plus http:/…...

MySQL基础篇(黑马程序员2022-01-18)

1 MySQL数据库概述 1.1 MySQL数据库的下载,安装,启动停止 1.2 数据模型 (1)关系型数据库(RDBMS) 概念&#xff1a;建立在关系模型基础上&#xff0c;由多张相互连接的二维表组成的数据库。 特点&#xff1a; A. 使用表存储数据&#xff0c;格式统一&#xff0c;便于维护。…...

nodejs 013:Prect 样式复用(multiple classes)例子

Prect 简单示例 Prect 为使用相同的现代 API 的快速 3kB React 替代方案。代码形式与 React 基本相同。部分语法区别可见 prect-differences-to-react。以下是一个 Prect 简单示例。 Button目录Button.css&#xff1a; .this {display: inline-block;padding: 3px 8px;margi…...

嵌入式NTP客户端高精度时间同步实现

1. NTP客户端库深度解析&#xff1a;嵌入式系统中的高精度时间同步实现1.1 项目背景与工程痛点NTP&#xff08;Network Time Protocol&#xff09;是嵌入式设备实现网络时间同步的核心协议。在工业控制、数据采集、日志记录等场景中&#xff0c;毫秒级甚至亚毫秒级的时间精度直…...

避坑指南:STM32CubeIDE按键消抖到底怎么做?HAL库延时函数调用详解

STM32按键消抖实战&#xff1a;从HAL_Delay到定时器的进阶方案 按键消抖是嵌入式开发中最基础却又最容易被忽视的技术细节之一。许多开发者在初次实现按键功能时&#xff0c;往往直接读取GPIO状态就认为完成了任务&#xff0c;直到产品进入现场测试阶段才发现按键响应不稳定、误…...

从零搭建你的数字工作室:一套搞定Ps、Pr、Ae、C4D、达芬奇的电脑配置与软件协同方案

从零搭建你的数字工作室&#xff1a;一套搞定Ps、Pr、Ae、C4D、达芬奇的电脑配置与软件协同方案 当你决定投身数字内容创作——无论是成为UP主、独立导演&#xff0c;还是开设小型广告工作室&#xff0c;一套能流畅运行主流创意软件的工作站是必不可少的。但面对Adobe全家桶、…...

BYD Battery Emulator:让电动汽车电池成为家庭储能的智能桥梁

BYD Battery Emulator&#xff1a;让电动汽车电池成为家庭储能的智能桥梁 【免费下载链接】BYD-Battery-Emulator-For-Gen24 This software enables EV battery packs to be used for stationary storage in combination with solar inverters. 项目地址: https://gitcode.co…...

Nunchaku-FLUX.1-dev镜像安全加固:非root运行/最小权限/网络策略限制

Nunchaku-FLUX.1-dev镜像安全加固&#xff1a;非root运行/最小权限/网络策略限制 1. 为什么需要安全加固&#xff1f; 当你把Nunchaku-FLUX.1-dev这个强大的文生图模型部署在自己的服务器上时&#xff0c;可能更多关注的是它能生成多么精美的图片&#xff0c;或者处理中文提示…...

BLIP-Diffusion实战解析:如何通过预训练主题表示实现高效可控的图像生成

1. BLIP-Diffusion的核心创新点解析 第一次看到BLIP-Diffusion这个模型时&#xff0c;最让我惊讶的是它解决了一个困扰行业多年的难题&#xff1a;如何在不需要反复微调的情况下&#xff0c;让AI生成的图像既保持输入主题的特征&#xff0c;又能灵活响应文本指令。这就像教一个…...

OpenClaw备份与迁移:GLM-4.7-Flash项目完整转移指南

OpenClaw备份与迁移&#xff1a;GLM-4.7-Flash项目完整转移指南 1. 为什么需要完整的迁移方案 上周我的主力开发机突然硬盘故障&#xff0c;导致所有数据丢失。虽然OpenClaw本身是开源工具可以重装&#xff0c;但那些精心调试的配置文件、自定义技能和对接好的GLM-4.7-Flash模…...

有关数组的学习

数组的概念简介数组是编程中最基础也最常用的数据结构之一&#xff0c;理解它能帮你高效管理一组同类型的数据。1. 什么是数组&#xff1f;核心概念同类型&#xff1a;数组里的所有元素必须是相同的数据类型&#xff08;如全是 int 或全是 float&#xff09;。连续内存&#xf…...

避坑指南:解决多Livox雷达在ROS中TF树报错‘extrapolation into the past’的完整流程

避坑指南&#xff1a;解决多Livox雷达在ROS中TF树报错‘extrapolation into the past’的完整流程 当你在ROS系统中整合多个Livox雷达时&#xff0c;突然遭遇[WARN] Lookup would require extrapolation into the past的警告信息&#xff0c;Rviz中点云显示异常或TF树断裂&…...

PHP反序列化实战:手把手教你绕过CTF题中的字符检查与属性保护

PHP反序列化漏洞实战&#xff1a;从CTF解题到真实场景防御 在网络安全竞赛中&#xff0c;PHP反序列化漏洞一直是高频考点。这类漏洞不仅存在于CTF比赛中&#xff0c;也广泛存在于真实世界的Web应用中。本文将从一个典型CTF题目入手&#xff0c;深入剖析PHP反序列化的攻击手法与…...