当前位置: 首页 > news >正文

Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

目录

StreamExecutionEnvironment

Watermark

watermark策略简介

使用 Watermark 策略

内置水印生成器

处理空闲数据源

算子处理 Watermark 的方式

创建DataStream的方式

通过list对象创建

​​​​​​使用DataStream connectors创建

使用Table & SQL connectors创建


StreamExecutionEnvironment

编写一个 Flink Python DataStream API 程序,首先需要声明一个执行环境StreamExecutionEnvironment,这是流式程序执行的上下文。

你将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

创建了 StreamExecutionEnvironment 之后,你可以使用它来声明数据源。数据源从外部系统(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到 Flink 作业里。

为了简单起见,本教程读取文件作为数据源。

ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source"
)

Watermark

大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

为了解决乱序数据,flink引入watermark。引入watermark机制则会等待晚到的数据一段时间,等待时间到则触发计算,如果数据延迟很大,通常也会被丢弃或者另外处理。

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

watermark策略简介

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner WatermarkGenerator WatermarkStrategyWatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

使用 Watermark 策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy

内置水印生成器

水印策略定义了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配记录内部时间戳的TimestampAssigner的生成器/工厂。

BoundedOutOfOrdernessDuration),为创建WatermarkStrategy常见的内置策略。

for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)为记录无序的情况创建水印策略,但可以设置事件无序程度的上限。

无序绑定B意味着一旦遇到时间戳为T的事件,就不会再出现早于(T-B)的事件。

for_bound_out_of_ordernness(5)

for_mononous_timestamps()为时间戳单调递增的情况创建水印策略。

水印是定期生成的,并严格遵循数据中的最新时间戳。该策略引入的延迟主要是生成水印的周期间隔。

WatermarkStrategy.for_monotonous_timestamps()

with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner)

创建一个新的WatermarkStrategy,该策略通过实现TimestampAssigner接口使用给定的TimestampAssigner。

参数: timestamp_assigner 给定的TimestampAssigner。

Return: 包装TimestampAssigner的WaterMarkStrategy。

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    with_timestamp_assigner(MyTimestampAssigner())

处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口withIdleness(Duration.ofMinutes(1))

with_idleness(idle_timeout:pyfrink.common.time.Duration)

创建一个新的丰富的WatermarkStrategy,它也在创建的WatermarkGenerator中执行空闲检测。

参数:idle_timeout–空闲超时。

Return:配置了空闲检测的新水印策略。

算子处理 Watermark 的方式

一般情况下,在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理。例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游。换句话说,由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。

相同的规则也适用于 TwoInputstreamOperator。但是,在这种情况下,算子当前的 watermark 会取其两个输入的最小值。

创建DataStream的方式

通过list对象创建

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))

​​​​​​使用DataStream connectors创建

使用add_source函数,此函数仅支持FlinkKafkaConsumer,仅在streaming执行模式下使用

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumerenv = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)

使用from_source函数,此函数仅支持NumberSequenceSource和FileSource自定义数据源,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSourceenv = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())

​​​​​​​使用Table & SQL connectors创建

首先用Table & SQL connectors创建表,再转换为DataStream.

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))

相关文章:

Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table & SQL connectors…...

javeee spring cglib动态代理

cglib动态代理 依赖 <dependency><groupId>cglib</groupId><artifactId>cglib-nodep</artifactId><version>3.2.4</version></dependency>代理类 package com.test.cglibProxy;import net.sf.cglib.proxy.Enhancer; import …...

【Docker】Dockerfile介绍

Dockerfile是一个文本文件&#xff0c;其中包含了一系列的指令&#xff0c;用于构建Docker镜像。这些指令可以用来自动化镜像的构建过程&#xff0c;并创建自定义镜像。 以下是一些常用的Dockerfile指令及其功能&#xff1a; FROM&#xff1a;指定基础镜像。这是Dockerfile中…...

两个hdfs之间迁移传输数据

本文参考其他大数据大牛的博文做了整理和实际验证&#xff0c;主要解决hdfs跨集群复制/迁移问题。 在hdfs数据迁移时总会涉及到两个hdfs版本版本问题&#xff0c;致力解决hdfs版本相同和不同两种情况的处理方式&#xff0c;长话短说&#xff0c;进正文。 distcp: hadoop自带的…...

C++ 缺失的数字

有n个数字&#xff0c;值就是1~n&#xff0c;现发现丢失了2个数字&#xff0c;请你根据剩余的n-2个数字&#xff0c;编程计算一下&#xff0c;缺失的是哪两个数字呢&#xff1f; &#xff08;使用桶排&#xff0c;标记输入过的数字&#xff09; #include<bits/stdc.h> us…...

JVM,JRE和JDK的区别

JVM&#xff0c;JRE和JDK的区别 JVM(Java Virtual Machine&#xff0c;Java虚拟机)JREJRE目录结构 JDK JVM(Java Virtual Machine&#xff0c;Java虚拟机) Java程序的跨平台特性主要是指字节码文件可以在任何具有Java虚拟机的计算机或者电子设备上运行&#xff0c;Java虚拟机中…...

合宙Air724UG LuatOS-Air LVGL API控件--日历 (Calendar)

日历 (Calendar) LVGL 提供了一个用来选择和显示当前日期的日历控件。 示例代码 – 高亮显示的日期 highlightDate lvgl.calendar_date_t() – 日历点击的回调函数 – 将点击日期设置高亮 function event_handler(obj, event) if event lvgl.EVENT_VALUE_CHANGED then da…...

[python]问题:pandas处理excel里的多个sheet

Pandas 可以很容易地处理 Excel 文件中的多个工作表。首先,你需要安装 pandas 和 openpyxl(用于读取 .xlsx 文件)库。你可以使用以下命令安装这两个库: pip install pandas openpyxl接下来,你可以使用以下代码来处理 Excel 文件中的多个工作表: import pandas as pd# 读…...

[MySQL] MySQL基础操作汇总

文章目录 前言1.数据库概述1.1 数据库相关概念1.2登录MySQL&#xff1a;1.3 MySQL常用命令1.4表&#xff1a;1.5SQL语句分类&#xff1a; 2.CRUD操作2.1 DQL1.基础查询基础查询&#xff08;简单查询&#xff09;条件查询&#xff1a;排序查询&#xff1a;分组查询&#xff1a;分…...

C语言每日一题 ---- 打印从1到最大的n位数(Day 1)

本专栏为c语言练习专栏&#xff0c;适合刚刚学完c语言的初学者。本专栏每天会不定时更新&#xff0c;通过每天练习&#xff0c;进一步对c语言的重难点知识进行更深入的学习。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C语言天天练 &#x…...

2023-08-23 LeetCode每日一题(统计点对的数目)

2023-08-23每日一题 一、题目编号 1782. 统计点对的数目二、题目链接 点击跳转到题目位置 三、题目描述 给你一个无向图&#xff0c;无向图由整数 n &#xff0c;表示图中节点的数目&#xff0c;和 edges 组成&#xff0c;其中 edges[i] [ui, vi] 表示 ui 和 vi 之间有一…...

LLMs之Code:SQLCoder的简介、安装、使用方法之详细攻略

LLMs之Code&#xff1a;SQLCoder的简介、安装、使用方法之详细攻略 目录 SQLCoder的简介 1、结果 2、按问题类别的结果 SQLCoder的安装 1、硬件要求 2、下载模型权重 3、使用SQLCoder 4、Colab中运行SQLCoder 第一步&#xff0c;配置环境 第二步&#xff0c;测试 第…...

数学建模(四)整数规划—匈牙利算法

目录 一、0-1型整数规划问题 1.1 案例 1.2 指派问题的标准形式 2.2 非标准形式的指派问题 二、指派问题的匈牙利解法 2.1 匈牙利解法的一般步骤 2.2 匈牙利解法的实例 2.3 代码实现 一、0-1型整数规划问题 1.1 案例 投资问题&#xff1a; 有600万元投资5个项目&…...

openGauss学习笔记-47 openGauss 高级数据管理-权限

文章目录 openGauss学习笔记-47 openGauss 高级数据管理-权限47.1 语法格式47.2 参数说明47.3 示例 openGauss学习笔记-47 openGauss 高级数据管理-权限 数据库对象创建后&#xff0c;进行对象创建的用户就是该对象的所有者。数据库安装后的默认情况下&#xff0c;未开启三权分…...

开始MySQL之路——MySQL 事务(详解分析)

MySQL 事务概述 MySQL 事务主要用于处理操作量大&#xff0c;复杂度高的数据。比如说&#xff0c;在人员管理系统中&#xff0c;你删除一个人员&#xff0c;你即需要删除人员的基本资料&#xff0c;也要删除和该人员相关的信息&#xff0c;如信箱&#xff0c;文章等等&#xf…...

注解和class对象和mysql

注解 override 通常是用在方法上的注解表示该方法是有重写的 interface 表示一个注解类 比如 public interface override{} 这就表示是override是一个注解类 target 修饰注解的注解表示元注解 deprecated 修饰某个元素表示该元素已经过时了 1.不代表该元素不能用了&…...

【桌面小屏幕项目】ESP32开发环境搭建

视频教程链接&#xff1a; 【【有手就行系列】嵌入式单片机教程-桌面小屏幕实战教学 从设计、硬件、焊接到代码编写、调试 ESP32 持续更新2022】 https://www.bilibili.com/video/BV1wV4y1G7Vk/?share_sourcecopy_web&vd_source4fa5fad39452b08a8f4aa46532e890a7 一、esp…...

CSS 滚动容器与固定 Tabbar 自适应的几种方式

问题 容器高度使用 px 定高时&#xff0c;随着页面高度发生变化&#xff0c;组件展示的数量不能最大化的铺满&#xff0c;导致出现底部留白。容器高度使用 vw 定高时&#xff0c;随着页面宽度发生变化&#xff0c;组件展示的数量不能最大化的铺满&#xff0c;导致出现底部留白…...

IP 地址追踪工具

IP 地址跟踪工具是一种网络实用程序&#xff0c;允许您扫描、跟踪和获取详细信息&#xff0c;例如 IP 地址的 MAC 和接口 ID。IP 跟踪解决方案通过使用不同的网络扫描协议来检查网络地址空间来收集这些详细信息。一些高级 IP 地址跟踪器软件&#xff08;如 OpUtils&#xff09;…...

最新企业网盘产品推荐榜发布

随着数字化发展&#xff0c;传统的文化存储方式已无法跟上企业发展的步伐。云存储的出现为企业提供了新的文件管理存储模式。企业网盘作为云存储的代表性工具&#xff0c;被越来越多的企业所青睐。那么在众多企业网盘产品中&#xff0c;企业该如何找到合适的企业网盘呢&#xf…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向

在人工智能技术呈指数级发展的当下&#xff0c;大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性&#xff0c;吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型&#xff0c;成为释放其巨大潜力的关键所在&…...

多元隐函数 偏导公式

我们来推导隐函数 z z ( x , y ) z z(x, y) zz(x,y) 的偏导公式&#xff0c;给定一个隐函数关系&#xff1a; F ( x , y , z ( x , y ) ) 0 F(x, y, z(x, y)) 0 F(x,y,z(x,y))0 &#x1f9e0; 目标&#xff1a; 求 ∂ z ∂ x \frac{\partial z}{\partial x} ∂x∂z​、 …...