3.阿里云flinkselectdb-py作业
1.概述
Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;
- python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
- python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
- 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;
2.目标
把阿里云sls日志中的数据准实时同步到云服务selectdb;
| 源表 | flink | 结果表 |
|---|---|---|
| 阿里云sls | 实时计算flink | 云服务selectdb |
3.步骤
3.1.搭建环境
#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
3.2.创建作业
作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;
- 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
- 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
- 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()
相关文章:
3.阿里云flinkselectdb-py作业
1.概述 Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发; 先说结论: 在实际开发中遇到了很多问题,导致python作业基本基本无法…...
MATLAB语言的网络编程
标题:MATLAB中的网络编程:深入探索与实践 一、引言 在现代科学和工程领域中,网络编程已经成为了数据处理、信号分析、模型构建等众多任务中不可或缺的一环。MATLAB作为一款强大的数学计算软件,不仅提供了丰富的数值计算功能&…...
深入浅出 Linux 操作系统
深入浅出 Linux 操作系统 引言 在当今数字化的时代,Linux 操作系统无处不在。从支撑互联网巨头庞大的数据中心,到嵌入智能家居设备的微型芯片,Linux 都发挥着关键作用。然而,对于许多人来说,Linux 仍笼罩着一层神秘的…...
golang实现生产者消费者模式
在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。 生产者消费者模式的基本思路 生产者:负责生成数据并将其放入一个共享的缓冲区…...
自动化测试-Pytest测试
目录 pytest简介 基本测试实例 编写测试文件 执行测试 pytest运行时参数 mark标记 Fixture pytest插件 Allure测试报告 测试步骤 pytest简介 Pytest是一个非常流行的Python测试框架,它支持简单的单元测试和复杂的功能测试,具有易于上手、功…...
Ingress-Nginx Annotations 指南:配置要点全方面解读(下)
文章目录 1.HTTP2 Push Preload2.Server Alias3.Server snippet4.Client Body Buffer Size5.External Authentication6.Global External Authentication7.Rate Limiting8.Global Rate Limiting9.Permanent Redirect10.Permanent Redirect Code11.Temporal Redirect12.SSL Passt…...
【QED】等式构造
文章目录 题目题目描述输入输出格式数据范围测试样例 思路代码复杂度分析时间复杂度空间复杂度 题目 题目链接🔗 题目描述 有关 「上述等式为何正确」 的问题解决了,然而 「如何构造出上述那种让人啼笑皆非的正确等式」 成为了一个新的问题。 我们认…...
Kafka数据迁移全解析:同集群和跨集群
文章目录 一、同集群迁移二、跨集群迁移 Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。 一、同集群迁移 应用场景: broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作。 实践: 将需要新添加…...
Debian安装配置RocketMQ
安装配置 本次安装在/tools/rocket目录下 下载 wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip 解压缩 unzip rocketmq-all-5.3.1-bin-release.zip 如果出现以下报错 -bash: unzip: command not found可安装unzip工具后执…...
vue之axios基本使用
文章目录 1. axios 网络请求库2. axiosvue 1. axios 网络请求库 <body> <input type"button" value"get请求" class"get"> <input type"button" value"post请求" class"post"> <!-- 官网提供…...
三只脚的电感是什么东西?
最近在做加湿器,把水雾化的陶瓷片需要有专门的驱动电路。 我参考了某宝卖家的驱动板以及网上的开源项目,发现了驱动电路的核心就是一个三脚电感。 在此之前我都没注意过这玩意,三脚电感不也还是电感嘛? 今天我们就来看看三脚电…...
【数据库学习笔记】SQL触发器(例题+代码)
数据库SQL 1、触发器概念 (1)触发器(trigger)是用户定义在关系表上的一类由事件驱动的存储过程,由服务器自动激活。 (2)触发器可进行更为复杂的检查和操作,具有更精细和更强大的数…...
Unittest02|TestSuite、TestRunner、HTMLTestRunner、处理excel表数据、邮件接收测试结果
目录 八、测试套件TestSuite和测试运行器TestRunner 1、基本概念 2、创建和使用测试套件 3、 自动发现测试用例、创建测试套件、运行测试 4、生成html的测试报告:HTMLTestRunner 1️⃣导入HTMLTestRunner模块 2️⃣运行测试用例并生成html文件 九、unittest…...
BAPI_BATCH_CHANGE在更新后不自动更新批次特征
1、问题介绍 在CL03中看到分类特性配置了制造日期字段,并绑定了生产日期字段MCH1~HSDAT MSC2N修改批次的生产日期字段时,自动修改了对应的批次特性 但是通过BAPI:BAPI_BATCH_CHANGE修改生产日期时,并没有更新到批次特性中 2、BAPI…...
顶会评测集解读-AlignBench: 大语言模型中文对齐基准
评测集社区 CompssHub 作为司南 OpenCompass大模型评测体系的重要组成部分,致力于简化并加快研究人员和行业人士搜索和使用评测集的过程。评测集社区 CompssHub 目前已收录了学科、推理、知识、代码等12个方向的评测集,欢迎大家探索。 为了将评测集社区…...
MySQL外键类型与应用场景总结:优缺点一目了然
前言: MySQL的外键简介:在 MySQL 中,外键 (Foreign Key) 用于建立和强制表之间的关联,确保数据的一致性和完整性。外键的作用主要是限制和维护引用完整性 (Referential Integrity)。 主要体现在引用操作发生变化时的处理方式&…...
【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的网上书店管理系统的设计与实现
开题报告 本研究论文主要介绍了基于Spring Boot框架开发的全面网上书店管理系统的构建与实现。该系统以用户为核心,提供了丰富的个性化服务功能。首先,系统支持用户进行便捷的登录注册操作,并具备安全可靠的密码修改机制,同时允许…...
力扣面试题 - 40 迷路的机器人 C语言解法
题目: 设想有个机器人坐在一个网格的左上角,网格 r 行 c 列。机器人只能向下或向右移动,但不能走到一些被禁止的网格(有障碍物)。设计一种算法,寻找机器人从左上角移动到右下角的路径。 网格中的障碍物和空…...
ElementPlus 自定义封装 el-date-picker 的快捷功能
文章目录 需求分析 需求 分析 我们看到官网上给出的案例如下,但是不太满足我们用户想要的快捷功能,因为不太多,因此需要我们自己封装一些,方法如下 外部自定义该组件的快捷内容 export const getPickerOptions () > {cons…...
二百八十二、ClickHouse——删除Linux中的ClickHouse
一、目的 由于ClickHosue的库表发生变化,需要删除原有的表结构数据,才能直接把脚本里文件重新安装 二、删除步骤 1、关闭ClickHouse服务 systemctl stop clickhouse-server 2、卸载ClickHouse软件包 sudo yum remove clickhouse-server clickhouse…...
OpenProject:构建高效团队协作的终极开源项目管理平台
OpenProject:构建高效团队协作的终极开源项目管理平台 【免费下载链接】openproject OpenProject is the leading open source project management software. 项目地址: https://gitcode.com/GitHub_Trending/op/openproject OpenProject 是一款领先的开源项…...
CosyVoice2-0.5B效果实测:背景噪音音频对克隆效果影响量化
CosyVoice2-0.5B效果实测:背景噪音音频对克隆效果影响量化 1. 测试背景与目的 声音克隆技术近年来发展迅猛,阿里开源的CosyVoice2-0.5B作为一款强大的零样本语音合成系统,能够在短短3秒内复刻任意说话人的声音。但在实际应用中,…...
10个C语言开源项目解析与学习指南
1. 10个值得学习的C语言开源项目解析 作为一名在嵌入式领域摸爬滚打多年的开发者,我深知阅读优秀开源代码对提升编程能力的重要性。今天要分享的这10个C语言项目,每一个都是精炼而实用的典范,特别适合想要深入理解系统编程、网络协议和底层实…...
YOLOv8显存溢出?CPU轻量版部署教程让资源占用降低80%
YOLOv8显存溢出?CPU轻量版部署教程让资源占用降低80% 1. 项目背景与价值 你是不是遇到过这样的情况:想用YOLOv8做目标检测,结果一运行就显存溢出,或者GPU资源被占满导致其他程序卡顿?这种情况在资源有限的开发环境中…...
Llama-3.2V-11B-cotGPU算力优化:双卡4090自动拆分模型实测报告
Llama-3.2V-11B-cot GPU算力优化:双卡4090自动拆分模型实测报告 1. 项目概述 Llama-3.2V-11B-cot是基于Meta最新多模态大模型开发的高性能视觉推理工具,专为双卡RTX 4090环境深度优化。作为一款11B参数规模的视觉推理工具,它解决了传统大模…...
告别系统管理困境:WinUtil让Windows优化效率提升300%
告别系统管理困境:WinUtil让Windows优化效率提升300% 【免费下载链接】winutil Chris Titus Techs Windows Utility - Install Programs, Tweaks, Fixes, and Updates 项目地址: https://gitcode.com/GitHub_Trending/wi/winutil 作为Windows用户,…...
微信聊天记录永久保存终极指南:WeChatMsg免费工具完整解决方案
微信聊天记录永久保存终极指南:WeChatMsg免费工具完整解决方案 【免费下载链接】WeChatMsg 提取微信聊天记录,将其导出成HTML、Word、CSV文档永久保存,对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitHub_Trending/…...
SPI Flash性能翻倍秘籍:RT-Thread下W25Q的QSPI模式实战
SPI Flash性能翻倍秘籍:RT-Thread下W25Q的QSPI模式实战 在IoT设备开发中,存储性能往往是系统瓶颈之一。传统SPI接口的Flash存储器虽然成本低廉,但在高速数据读写场景下显得力不从心。本文将深入探讨如何通过QSPI模式充分释放W25Q系列Flash的潜…...
intv_ai_mk11GPU利用率提升:Llama中型模型批处理与并发请求调优方案
intv_ai_mk11 GPU利用率提升:Llama中型模型批处理与并发请求调优方案 1. 背景与挑战 intv_ai_mk11 是基于 Llama 架构的中等规模文本生成模型,在实际部署中我们发现单请求处理时GPU利用率往往不足30%。这种低效的资源使用导致两个主要问题:…...
windows java jar 包后台运行
使用 javaw 实现后台运行(简单场景)这是最简单的方法。Java 自带的 javaw.exe 是 java.exe 的变体,它运行程序时不会打开任何控制台窗口。操作步骤:创建一个新的文本文件,命名为 start.bat。在文件中写入以下内容&…...
