Flink实现实时数据处理
代码如下:
#!/usr/bin/python
# -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(4)
# 必须开启 checkpoint,时间间隔为毫秒,否则不能输出数据
s_env.enable_checkpointing(600000) # 600000 十分钟env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(s_env, environment_settings=env_settings)# 显式注册 Kafka 连接器
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink/lib/flink-connector-kafka-1.16.0.jar")# 创建 source 表
t_env.execute_sql("""
CREATE TABLE source_blog_kafka (ip_address STRING,post_id INT,event_type STRING,proctime AS PROCTIME() -- 添加处理时间列
) WITH ('connector' = 'kafka','topic' = 'blog_views','properties.bootstrap.servers' = '110.40.130.231:9092','properties.group.id' = 'flink_kafka_hive_blog_consumers','scan.startup.mode' = 'earliest-offset','format' = 'json'
)
""")# 创建 sink 表
t_env.execute_sql("""
CREATE TABLE sink_blog_hive (post_id INT,view_count INT
) WITH ('connector' = 'kafka','topic' = 'blog_view_stats','properties.bootstrap.servers' = '110.40.130.231:9092','format' = 'json'
)
""")# 定义 SQL 查询
t_env.execute_sql("""
INSERT INTO sink_blog_hive
SELECT post_id, CAST(COUNT(*) AS INT) AS view_count
FROM source_blog_kafka
GROUP BY post_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
""")# 启动执行环境
t_env.execute("Blog View Stats")print("Data processing job started.")
展示了一个使用 Apache Flink 进行实时数据处理的例子,该例子从 Kafka 消费数据,对博客文章的浏览量进行统计,并将结果写回到另一个 Kafka 主题。以下是对你提供的代码的一些解释和建议:
解释
-
初始化执行环境:
StreamExecutionEnvironment和StreamTableEnvironment被用来创建流处理环境。set_parallelism(4)设置了并行度为4,意味着任务可以在四个线程上并行执行。enable_checkpointing(600000)开启了检查点机制,确保在发生故障时能够恢复状态。这里设置的检查点间隔是十分钟。
-
配置 Kafka 连接器:
- 通过
pipeline.jars参数指定了 Kafka 连接器的 JAR 文件路径,这是为了让 Flink 能够与 Kafka 集群通信。
- 通过
-
定义 Source 和 Sink 表:
source_blog_kafka表用于从 Kafka 的blog_views主题读取数据,数据格式为 JSON。sink_blog_hive表用于将处理后的数据写入 Kafka 的blog_view_stats主题,同样以 JSON 格式存储。
-
SQL 查询:
- 使用 SQL 对
source_blog_kafka表中的数据按post_id分组,并计算每分钟内的点击次数,然后将结果插入到sink_blog_hive表中。
- 使用 SQL 对
-
启动任务:
- 最后调用
t_env.execute("Blog View Stats")启动任务执行。
- 最后调用
建议
- 检查点配置:确保 Kafka 连接器版本支持 Flink 的检查点功能,这对于实现精确一次(exactly-once)语义非常重要。
- 资源管理:确保 Flink 集群有足够的资源来运行这个任务,特别是当并行度设置较高时。
- 错误处理:考虑添加适当的错误处理逻辑,比如设置重启策略,以便在遇到失败时能够自动恢复。
- 监控和调试:部署后,利用 Flink 提供的监控工具来跟踪作业的状态和性能,及时发现并解决问题。
相关文章:
Flink实现实时数据处理
代码如下: #!/usr/bin/python # -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境 s_env StreamExecutionEnvironment.get_…...
11.9.2024刷华为
文章目录 HJ31 单词倒排HJ32 密码提取语法知识记录 傻逼OD题目又不全又要收费,看毛线,莫名奇妙 HW这叼机构别搁这儿害人得不得? 我觉得我刷完原来的题目 过一遍华为机考的ED卷出处,就行了 HJ31 单词倒排 游戏本做过了好像 HJ3…...
Chromium 中chrome.system.storage扩展接口定义c++
一、chrome.system.storage 您可以使用 chrome.system.storage API 查询存储设备信息,并在连接和分离可移动存储设备时收到通知。 权限 system.storage 类型 EjectDeviceResultCode 枚举 "success" 移除命令成功执行 - 应用可以提示用户移除设备。…...
【Qt聊天室客户端】登录窗口
1. 验证码 具体实现 登录界面中创建验证码图片空间,并添加到布局管理器中 主要功能概述(创建一个verifycodewidget类专门实现验证码操作) 详细代码 // 头文件#ifndef VERIFYCODEWIDGET_H #define VERIFYCODEWIDGET_H#include <QWidget>…...
如何显示模型特征权重占比图【数据分析】
可视化模型的特征权重 1、流程 1、导入库: numpy:用于处理数组和矩阵。 matplotlib.pyplot:用于绘图。 sklearn.datasets:用于加载数据集。 sklearn.ensemble.RandomForestClassifier:用于训练随机森林模型。2、加载数据集: 使用load_iris函数加载Iris数据集。3、训练模…...
Ubuntu24安装MySQL
下载deb包: 先更新系统包: sudo apt update sudo apt update -y下载mysql: wget https://dev.mysql.com/get/mysql-apt-config_0.8.17-1_all.deb 安装deb包: sudo dpkg -i mysql-apt-config_0.8.17-1_all.deb目前mysql还没有正式支持Ubun…...
微服务架构面试内容整理-Eureka
Spring Cloud Netflix 是一个为构建基于 Spring Cloud 的微服务应用提供的解决方案,利用 Netflix 的开源组件来实现常见的分布式系统功能。以下是 Spring Cloud Netflix 的一些主要组件和特点: 服务注册与发现:Eureka 是一个 RESTful 服务,用于注册和发现微服务。服务实例在…...
qt QErrorMessage详解
1、概述 QErrorMessage是Qt框架中用于显示错误消息的一个对话框类。它提供了一个简单的模态对话框,用于向用户显示错误或警告消息。QErrorMessage通常用于应用程序中,当需要向用户报告错误但不希望中断当前操作时。它提供了一个标准的错误消息界面&…...
SpringBoot 将多个Excel打包下载
在Spring Boot应用中,如果你需要将多个Excel文件打包成一个ZIP文件并提供下载,你可以使用一些Java库来帮助完成这个任务。这里我将展示如何使用Apache POI来生成Excel文件,以及使用Java.util.zip来创建ZIP文件,并通过Spring Boot的…...
分页存储小总结
知识点: 什么是分页存储? 将内存空间分为一个个大小相等的分区(比如:每个分区4KB),每个分区就是一个“页框”(页框页帧内存块物理块物理页面)。每个页框有一个编号,即“页框号”(…...
Star-CCM+应用篇之动力电池温度场仿真操作流程与方法
1 动力电池温度场仿真项目 电池包内模组温度分布、电芯温度分布、温升速率、充电时间等。 2 动力电池温度场仿真分析流程图 图1 电池包热流场分析流程 3 动力电池温度场仿真参数需求 类别...
Spring Boot应用开发:从入门到精通
Spring Boot应用开发:从入门到精通 Spring Boot是Spring框架的一个子项目,旨在简化Spring应用的初始搭建和开发过程。通过自动配置和约定大于配置的原则,Spring Boot使开发者能够快速构建独立的、生产级别的Spring应用。本文将深入探讨Sprin…...
【JAVA项目】基于jspm的【医院病历管理系统】
技术简介:采用jsp技术、MySQL等技术实现。 系统简介:通过标签分类管理等方式,实现管理员;个人中心、医院公告管理、用户管理、科室信息管理、医生管理、出诊信息管理、预约时间段管理、预约挂号管理、门诊病历管理、就诊评价管理、…...
Python中的常见配置文件写法
在软件开发过程中,开发者常常需要利用一些固定的参数或常量。对于这些相对恒定且频繁使用的元素,一种常见的做法是将它们集中存储在一个特定的文件中,以避免在多个模块代码中重复定义,从而维护核心代码的清晰度和整洁性。 具体而…...
语义分割实战——基于PSPnet神经网络动物马分割系统源码
第一步:准备数据 动物马分割数据,总共有328张图片,里面的像素值为0和1,所以看起来全部是黑的,不影响使用 第二步:搭建模型 psp模块的样式如下,其psp的核心重点是采用了步长不同,po…...
Python+Appium编写脚本
一、环境配置 1、安装JDK,版本1.8以上 2、安装Python,版本3.x以上,用来解释python 3、安装node.js,版本^14.17.0 || ^16.13.0 || >18.0.0,用来安装Appimu Server 4、安装npm,版本>8,用…...
RK3288 android7.1 适配 ilitek i2c接口TP
一,Ilitek 触摸屏简介 Ilitek 提供多种型号的触控屏控制器,如 ILI6480、ILI9341 等,采用 I2C 接口。 这些控制器能够支持多点触控,并具有优秀的灵敏度和响应速度。 Ilitek 的触摸屏控制器监测屏幕上的触摸事件。 当触摸发生时&am…...
C++ 越来越像函数式编程了!
C 越来越像函数式编程了 大家好,欢迎来到今天的博客话题。今天我们要聊的是 C 这门老牌的强类型语言是如何一步一步向函数式编程靠拢的。从最早的函数指针,到函数对象(Functor),再到 std::function 和 std::bind&…...
maven工程结构说明
1、maven工程文件目录 |-- pom.xml # Maven 项目管理文件 |-- src # 放项目源文件|-- main # 项目主要代码| |-- java # Java 源代码目录| | -- com/example/myapp…...
【GESP】C++一级真题练习(202312)luogu-B3921,小杨的考试
GESP一级真题练习。为2023年12月一级认证真题。逻辑计算问题。 题目题解详见:【GESP】C一级真题练习(202312)luogu-B3921,小杨的考试 | OneCoder 【GESP】C一级真题练习(202312)luogu-B3921,小杨的考试 | OneCoderGESP一级真题练习。为2023…...
像素史诗落地企业知识库:用Pixel Epic构建内部行业情报自动摘要系统
像素史诗落地企业知识库:用Pixel Epic构建内部行业情报自动摘要系统 1. 企业知识管理的新挑战 在信息爆炸的时代,企业面临的最大挑战不是获取信息,而是如何从海量数据中提取有价值的知识。传统知识管理系统存在几个关键痛点: 信…...
Qwen3智能字幕对齐系统与Dify工作流集成:打造自动化视频内容生产线
Qwen3智能字幕对齐系统与Dify工作流集成:打造自动化视频内容生产线 1. 引言 你有没有算过,一个视频剪辑师一天要花多少时间在字幕上?从听写、校对、再到调整时间轴,一个十分钟的视频,光是字幕可能就要耗掉一两个小时…...
《C语言学习:判断语句if-else》5
写在前面:本笔记为个人学习各平台C语言系列课程所作,仅供交流学习,不得作他用。1. if基本用法if(/*条件*/){/*做法*/ } //如果满足条件,则做大括号中的事情圆括号中是条件,或者说一个表达式。当它是0,则不执…...
STM32CubeIDE工程复制粘贴保姆级教程:告别重复配置,5分钟搞定新项目
STM32CubeIDE工程复制粘贴保姆级教程:告别重复配置,5分钟搞定新项目 每次启动新项目时,你是否还在重复那些繁琐的初始化步骤?从零开始配置时钟树、外设参数、中断优先级,不仅耗时费力,还容易出错。对于经验…...
Zephyr与MCUBoot的深度整合:从构建到安全启动的完整指南
1. 为什么需要安全启动? 在嵌入式开发中,设备固件的安全性往往是最容易被忽视的一环。想象一下,如果你的智能门锁固件被恶意篡改,或者医疗设备的程序被非法替换,后果会有多严重?这就是为什么我们需要MCUBoo…...
Qt桌面应用集成PaddleOCR:从环境搭建到精准识别的实践指南
1. 环境准备:搭建PaddleOCR的Qt开发环境 第一次在Qt里折腾PaddleOCR时,我对着官方文档折腾了半天还是报错,后来发现是第三方库的路径没配好。这里分享下我踩坑后总结的可靠方案。 核心依赖三件套:PaddlePaddle推理库、PaddleOCR C…...
Qwen3.5-9B-AWQ-4bit参数调优实战:温度=0.7时中文回答质量与响应速度平衡点
Qwen3.5-9B-AWQ-4bit参数调优实战:温度0.7时中文回答质量与响应速度平衡点 1. 模型概述与参数调优背景 Qwen3.5-9B-AWQ-4bit是一个支持图像理解的多模态模型,能够结合上传图片与文字提示词输出中文分析结果。在实际应用中,我们发现温度参数…...
[Python3高阶编程] - 异步编程深度学习指南二: 同步原语
概述在 Python 异步编程中,虽然协程(coroutine)天然避免了线程切换开销,但多个协程仍可能同时访问共享资源(如全局变量、文件、数据库连接),从而引发竞态条件(Race Condition&#x…...
单片机调试:问题复现与定位的实战技巧
1. 单片机开发中的问题复现方法论在单片机项目开发过程中,遇到问题是不可避免的。作为一名从业多年的嵌入式工程师,我认为问题复现是整个调试过程中最关键的第一步。很多新手开发者常常急于解决问题,却忽略了问题复现的重要性,结果…...
软件实施交付转运维学习第三天:Linux系统命令基础(部分)
从实施到运维的蜕变之路,掌握命令就是掌握Linux的灵魂写在前面作为一名从软件实施交付转向运维的工程师,我深刻体会到:Linux命令不仅仅是简单的指令,更是与操作系统对话的语言。当我们站在实施和运维的交界处,掌握Linu…...
