3.阿里云flinkselectdb-py作业
1.概述
Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;
- python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
- python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
- 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;
2.目标
把阿里云sls日志中的数据准实时同步到云服务selectdb;
源表 | flink | 结果表 |
---|---|---|
阿里云sls | 实时计算flink | 云服务selectdb |
3.步骤
3.1.搭建环境
#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
3.2.创建作业
作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;
- 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
- 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
- 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()
相关文章:
3.阿里云flinkselectdb-py作业
1.概述 Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发; 先说结论: 在实际开发中遇到了很多问题,导致python作业基本基本无法…...
MATLAB语言的网络编程
标题:MATLAB中的网络编程:深入探索与实践 一、引言 在现代科学和工程领域中,网络编程已经成为了数据处理、信号分析、模型构建等众多任务中不可或缺的一环。MATLAB作为一款强大的数学计算软件,不仅提供了丰富的数值计算功能&…...

深入浅出 Linux 操作系统
深入浅出 Linux 操作系统 引言 在当今数字化的时代,Linux 操作系统无处不在。从支撑互联网巨头庞大的数据中心,到嵌入智能家居设备的微型芯片,Linux 都发挥着关键作用。然而,对于许多人来说,Linux 仍笼罩着一层神秘的…...
golang实现生产者消费者模式
在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。 生产者消费者模式的基本思路 生产者:负责生成数据并将其放入一个共享的缓冲区…...

自动化测试-Pytest测试
目录 pytest简介 基本测试实例 编写测试文件 执行测试 pytest运行时参数 mark标记 Fixture pytest插件 Allure测试报告 测试步骤 pytest简介 Pytest是一个非常流行的Python测试框架,它支持简单的单元测试和复杂的功能测试,具有易于上手、功…...
Ingress-Nginx Annotations 指南:配置要点全方面解读(下)
文章目录 1.HTTP2 Push Preload2.Server Alias3.Server snippet4.Client Body Buffer Size5.External Authentication6.Global External Authentication7.Rate Limiting8.Global Rate Limiting9.Permanent Redirect10.Permanent Redirect Code11.Temporal Redirect12.SSL Passt…...
【QED】等式构造
文章目录 题目题目描述输入输出格式数据范围测试样例 思路代码复杂度分析时间复杂度空间复杂度 题目 题目链接🔗 题目描述 有关 「上述等式为何正确」 的问题解决了,然而 「如何构造出上述那种让人啼笑皆非的正确等式」 成为了一个新的问题。 我们认…...

Kafka数据迁移全解析:同集群和跨集群
文章目录 一、同集群迁移二、跨集群迁移 Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。 一、同集群迁移 应用场景: broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作。 实践: 将需要新添加…...

Debian安装配置RocketMQ
安装配置 本次安装在/tools/rocket目录下 下载 wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip 解压缩 unzip rocketmq-all-5.3.1-bin-release.zip 如果出现以下报错 -bash: unzip: command not found可安装unzip工具后执…...

vue之axios基本使用
文章目录 1. axios 网络请求库2. axiosvue 1. axios 网络请求库 <body> <input type"button" value"get请求" class"get"> <input type"button" value"post请求" class"post"> <!-- 官网提供…...

三只脚的电感是什么东西?
最近在做加湿器,把水雾化的陶瓷片需要有专门的驱动电路。 我参考了某宝卖家的驱动板以及网上的开源项目,发现了驱动电路的核心就是一个三脚电感。 在此之前我都没注意过这玩意,三脚电感不也还是电感嘛? 今天我们就来看看三脚电…...
【数据库学习笔记】SQL触发器(例题+代码)
数据库SQL 1、触发器概念 (1)触发器(trigger)是用户定义在关系表上的一类由事件驱动的存储过程,由服务器自动激活。 (2)触发器可进行更为复杂的检查和操作,具有更精细和更强大的数…...

Unittest02|TestSuite、TestRunner、HTMLTestRunner、处理excel表数据、邮件接收测试结果
目录 八、测试套件TestSuite和测试运行器TestRunner 1、基本概念 2、创建和使用测试套件 3、 自动发现测试用例、创建测试套件、运行测试 4、生成html的测试报告:HTMLTestRunner 1️⃣导入HTMLTestRunner模块 2️⃣运行测试用例并生成html文件 九、unittest…...

BAPI_BATCH_CHANGE在更新后不自动更新批次特征
1、问题介绍 在CL03中看到分类特性配置了制造日期字段,并绑定了生产日期字段MCH1~HSDAT MSC2N修改批次的生产日期字段时,自动修改了对应的批次特性 但是通过BAPI:BAPI_BATCH_CHANGE修改生产日期时,并没有更新到批次特性中 2、BAPI…...

顶会评测集解读-AlignBench: 大语言模型中文对齐基准
评测集社区 CompssHub 作为司南 OpenCompass大模型评测体系的重要组成部分,致力于简化并加快研究人员和行业人士搜索和使用评测集的过程。评测集社区 CompssHub 目前已收录了学科、推理、知识、代码等12个方向的评测集,欢迎大家探索。 为了将评测集社区…...

MySQL外键类型与应用场景总结:优缺点一目了然
前言: MySQL的外键简介:在 MySQL 中,外键 (Foreign Key) 用于建立和强制表之间的关联,确保数据的一致性和完整性。外键的作用主要是限制和维护引用完整性 (Referential Integrity)。 主要体现在引用操作发生变化时的处理方式&…...

【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的网上书店管理系统的设计与实现
开题报告 本研究论文主要介绍了基于Spring Boot框架开发的全面网上书店管理系统的构建与实现。该系统以用户为核心,提供了丰富的个性化服务功能。首先,系统支持用户进行便捷的登录注册操作,并具备安全可靠的密码修改机制,同时允许…...

力扣面试题 - 40 迷路的机器人 C语言解法
题目: 设想有个机器人坐在一个网格的左上角,网格 r 行 c 列。机器人只能向下或向右移动,但不能走到一些被禁止的网格(有障碍物)。设计一种算法,寻找机器人从左上角移动到右下角的路径。 网格中的障碍物和空…...

ElementPlus 自定义封装 el-date-picker 的快捷功能
文章目录 需求分析 需求 分析 我们看到官网上给出的案例如下,但是不太满足我们用户想要的快捷功能,因为不太多,因此需要我们自己封装一些,方法如下 外部自定义该组件的快捷内容 export const getPickerOptions () > {cons…...

二百八十二、ClickHouse——删除Linux中的ClickHouse
一、目的 由于ClickHosue的库表发生变化,需要删除原有的表结构数据,才能直接把脚本里文件重新安装 二、删除步骤 1、关闭ClickHouse服务 systemctl stop clickhouse-server 2、卸载ClickHouse软件包 sudo yum remove clickhouse-server clickhouse…...

VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
Python网页自动化Selenium中文文档
1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API,让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API,你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...
从零手写Java版本的LSM Tree (一):LSM Tree 概述
🔥 推荐一个高质量的Java LSM Tree开源项目! https://github.com/brianxiadong/java-lsm-tree java-lsm-tree 是一个从零实现的Log-Structured Merge Tree,专为高并发写入场景设计。 核心亮点: ⚡ 极致性能:写入速度超…...

ubuntu中安装conda的后遗症
缘由: 在编译rk3588的sdk时,遇到编译buildroot失败,提示如下: 提示缺失expect,但是实测相关工具是在的,如下显示: 然后查找借助各个ai工具,重新安装相关的工具,依然无解。 解决&am…...