当前位置: 首页 > news >正文

3.阿里云flinkselectdb-py作业

1.概述

Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;

  • python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
  • python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
  • 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;

2.目标

把阿里云sls日志中的数据准实时同步到云服务selectdb;

源表flink结果表
阿里云sls实时计算flink云服务selectdb

3.步骤

3.1.搭建环境

#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.创建作业

作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;

  • 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
  • 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
  • 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()

相关文章:

3.阿里云flinkselectdb-py作业

1.概述 Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发; 先说结论: 在实际开发中遇到了很多问题,导致python作业基本基本无法…...

MATLAB语言的网络编程

标题:MATLAB中的网络编程:深入探索与实践 一、引言 在现代科学和工程领域中,网络编程已经成为了数据处理、信号分析、模型构建等众多任务中不可或缺的一环。MATLAB作为一款强大的数学计算软件,不仅提供了丰富的数值计算功能&…...

深入浅出 Linux 操作系统

深入浅出 Linux 操作系统 引言 在当今数字化的时代,Linux 操作系统无处不在。从支撑互联网巨头庞大的数据中心,到嵌入智能家居设备的微型芯片,Linux 都发挥着关键作用。然而,对于许多人来说,Linux 仍笼罩着一层神秘的…...

golang实现生产者消费者模式

在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。 生产者消费者模式的基本思路 生产者:负责生成数据并将其放入一个共享的缓冲区&#xf…...

自动化测试-Pytest测试

目录 pytest简介 基本测试实例 编写测试文件 执行测试 pytest运行时参数 mark标记 Fixture pytest插件 Allure测试报告 测试步骤 pytest简介 Pytest‌是一个非常流行的Python测试框架,它支持简单的单元测试和复杂的功能测试,具有易于上手、功…...

Ingress-Nginx Annotations 指南:配置要点全方面解读(下)

文章目录 1.HTTP2 Push Preload2.Server Alias3.Server snippet4.Client Body Buffer Size5.External Authentication6.Global External Authentication7.Rate Limiting8.Global Rate Limiting9.Permanent Redirect10.Permanent Redirect Code11.Temporal Redirect12.SSL Passt…...

【QED】等式构造

文章目录 题目题目描述输入输出格式数据范围测试样例 思路代码复杂度分析时间复杂度空间复杂度 题目 题目链接🔗 题目描述 有关 「上述等式为何正确」 的问题解决了,然而 「如何构造出上述那种让人啼笑皆非的正确等式」 成为了一个新的问题。 我们认…...

Kafka数据迁移全解析:同集群和跨集群

文章目录 一、同集群迁移二、跨集群迁移 Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。 一、同集群迁移 应用场景: broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作。 实践: 将需要新添加…...

Debian安装配置RocketMQ

安装配置 本次安装在/tools/rocket目录下 下载 wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip 解压缩 unzip rocketmq-all-5.3.1-bin-release.zip 如果出现以下报错 -bash: unzip: command not found可安装unzip工具后执…...

vue之axios基本使用

文章目录 1. axios 网络请求库2. axiosvue 1. axios 网络请求库 <body> <input type"button" value"get请求" class"get"> <input type"button" value"post请求" class"post"> <!-- 官网提供…...

三只脚的电感是什么东西?

最近在做加湿器&#xff0c;把水雾化的陶瓷片需要有专门的驱动电路。 我参考了某宝卖家的驱动板以及网上的开源项目&#xff0c;发现了驱动电路的核心就是一个三脚电感。 在此之前我都没注意过这玩意&#xff0c;三脚电感不也还是电感嘛&#xff1f; 今天我们就来看看三脚电…...

【数据库学习笔记】SQL触发器(例题+代码)

数据库SQL 1、触发器概念 &#xff08;1&#xff09;触发器&#xff08;trigger&#xff09;是用户定义在关系表上的一类由事件驱动的存储过程&#xff0c;由服务器自动激活。 &#xff08;2&#xff09;触发器可进行更为复杂的检查和操作&#xff0c;具有更精细和更强大的数…...

Unittest02|TestSuite、TestRunner、HTMLTestRunner、处理excel表数据、邮件接收测试结果

目录 八、测试套件TestSuite和测试运行器TestRunner 1、基本概念 2、创建和使用测试套件 3、 自动发现测试用例、创建测试套件、运行测试 4、生成html的测试报告&#xff1a;HTMLTestRunner 1️⃣导入HTMLTestRunner模块 2️⃣运行测试用例并生成html文件 九、unittest…...

BAPI_BATCH_CHANGE在更新后不自动更新批次特征

1、问题介绍 在CL03中看到分类特性配置了制造日期字段&#xff0c;并绑定了生产日期字段MCH1~HSDAT MSC2N修改批次的生产日期字段时&#xff0c;自动修改了对应的批次特性 但是通过BAPI&#xff1a;BAPI_BATCH_CHANGE修改生产日期时&#xff0c;并没有更新到批次特性中 2、BAPI…...

顶会评测集解读-AlignBench: 大语言模型中文对齐基准

评测集社区 CompssHub 作为司南 OpenCompass大模型评测体系的重要组成部分&#xff0c;致力于简化并加快研究人员和行业人士搜索和使用评测集的过程。评测集社区 CompssHub 目前已收录了学科、推理、知识、代码等12个方向的评测集&#xff0c;欢迎大家探索。 为了将评测集社区…...

MySQL外键类型与应用场景总结:优缺点一目了然

前言&#xff1a; MySQL的外键简介&#xff1a;在 MySQL 中&#xff0c;外键 (Foreign Key) 用于建立和强制表之间的关联&#xff0c;确保数据的一致性和完整性。外键的作用主要是限制和维护引用完整性 (Referential Integrity)。 主要体现在引用操作发生变化时的处理方式&…...

【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的网上书店管理系统的设计与实现

开题报告 本研究论文主要介绍了基于Spring Boot框架开发的全面网上书店管理系统的构建与实现。该系统以用户为核心&#xff0c;提供了丰富的个性化服务功能。首先&#xff0c;系统支持用户进行便捷的登录注册操作&#xff0c;并具备安全可靠的密码修改机制&#xff0c;同时允许…...

力扣面试题 - 40 迷路的机器人 C语言解法

题目&#xff1a; 设想有个机器人坐在一个网格的左上角&#xff0c;网格 r 行 c 列。机器人只能向下或向右移动&#xff0c;但不能走到一些被禁止的网格&#xff08;有障碍物&#xff09;。设计一种算法&#xff0c;寻找机器人从左上角移动到右下角的路径。 网格中的障碍物和空…...

ElementPlus 自定义封装 el-date-picker 的快捷功能

文章目录 需求分析 需求 分析 我们看到官网上给出的案例如下&#xff0c;但是不太满足我们用户想要的快捷功能&#xff0c;因为不太多&#xff0c;因此需要我们自己封装一些&#xff0c;方法如下 外部自定义该组件的快捷内容 export const getPickerOptions () > {cons…...

二百八十二、ClickHouse——删除Linux中的ClickHouse

一、目的 由于ClickHosue的库表发生变化&#xff0c;需要删除原有的表结构数据&#xff0c;才能直接把脚本里文件重新安装 二、删除步骤 1、关闭ClickHouse服务 systemctl stop clickhouse-server 2、卸载ClickHouse软件包 sudo yum remove clickhouse-server clickhouse…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...

Python网页自动化Selenium中文文档

1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API&#xff0c;让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API&#xff0c;你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...

从零手写Java版本的LSM Tree (一):LSM Tree 概述

&#x1f525; 推荐一个高质量的Java LSM Tree开源项目&#xff01; https://github.com/brianxiadong/java-lsm-tree java-lsm-tree 是一个从零实现的Log-Structured Merge Tree&#xff0c;专为高并发写入场景设计。 核心亮点&#xff1a; ⚡ 极致性能&#xff1a;写入速度超…...

ubuntu中安装conda的后遗症

缘由: 在编译rk3588的sdk时&#xff0c;遇到编译buildroot失败&#xff0c;提示如下&#xff1a; 提示缺失expect&#xff0c;但是实测相关工具是在的&#xff0c;如下显示&#xff1a; 然后查找借助各个ai工具&#xff0c;重新安装相关的工具&#xff0c;依然无解。 解决&am…...