当前位置: 首页 > news >正文

3.阿里云flinkselectdb-py作业

1.概述

Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;

  • python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
  • python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
  • 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;

2.目标

把阿里云sls日志中的数据准实时同步到云服务selectdb;

源表flink结果表
阿里云sls实时计算flink云服务selectdb

3.步骤

3.1.搭建环境

#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.创建作业

作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;

  • 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
  • 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
  • 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()

相关文章:

3.阿里云flinkselectdb-py作业

1.概述 Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发; 先说结论: 在实际开发中遇到了很多问题,导致python作业基本基本无法…...

MATLAB语言的网络编程

标题:MATLAB中的网络编程:深入探索与实践 一、引言 在现代科学和工程领域中,网络编程已经成为了数据处理、信号分析、模型构建等众多任务中不可或缺的一环。MATLAB作为一款强大的数学计算软件,不仅提供了丰富的数值计算功能&…...

深入浅出 Linux 操作系统

深入浅出 Linux 操作系统 引言 在当今数字化的时代,Linux 操作系统无处不在。从支撑互联网巨头庞大的数据中心,到嵌入智能家居设备的微型芯片,Linux 都发挥着关键作用。然而,对于许多人来说,Linux 仍笼罩着一层神秘的…...

golang实现生产者消费者模式

在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。 生产者消费者模式的基本思路 生产者:负责生成数据并将其放入一个共享的缓冲区&#xf…...

自动化测试-Pytest测试

目录 pytest简介 基本测试实例 编写测试文件 执行测试 pytest运行时参数 mark标记 Fixture pytest插件 Allure测试报告 测试步骤 pytest简介 Pytest‌是一个非常流行的Python测试框架,它支持简单的单元测试和复杂的功能测试,具有易于上手、功…...

Ingress-Nginx Annotations 指南:配置要点全方面解读(下)

文章目录 1.HTTP2 Push Preload2.Server Alias3.Server snippet4.Client Body Buffer Size5.External Authentication6.Global External Authentication7.Rate Limiting8.Global Rate Limiting9.Permanent Redirect10.Permanent Redirect Code11.Temporal Redirect12.SSL Passt…...

【QED】等式构造

文章目录 题目题目描述输入输出格式数据范围测试样例 思路代码复杂度分析时间复杂度空间复杂度 题目 题目链接🔗 题目描述 有关 「上述等式为何正确」 的问题解决了,然而 「如何构造出上述那种让人啼笑皆非的正确等式」 成为了一个新的问题。 我们认…...

Kafka数据迁移全解析:同集群和跨集群

文章目录 一、同集群迁移二、跨集群迁移 Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。 一、同集群迁移 应用场景: broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作。 实践: 将需要新添加…...

Debian安装配置RocketMQ

安装配置 本次安装在/tools/rocket目录下 下载 wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip 解压缩 unzip rocketmq-all-5.3.1-bin-release.zip 如果出现以下报错 -bash: unzip: command not found可安装unzip工具后执…...

vue之axios基本使用

文章目录 1. axios 网络请求库2. axiosvue 1. axios 网络请求库 <body> <input type"button" value"get请求" class"get"> <input type"button" value"post请求" class"post"> <!-- 官网提供…...

三只脚的电感是什么东西?

最近在做加湿器&#xff0c;把水雾化的陶瓷片需要有专门的驱动电路。 我参考了某宝卖家的驱动板以及网上的开源项目&#xff0c;发现了驱动电路的核心就是一个三脚电感。 在此之前我都没注意过这玩意&#xff0c;三脚电感不也还是电感嘛&#xff1f; 今天我们就来看看三脚电…...

【数据库学习笔记】SQL触发器(例题+代码)

数据库SQL 1、触发器概念 &#xff08;1&#xff09;触发器&#xff08;trigger&#xff09;是用户定义在关系表上的一类由事件驱动的存储过程&#xff0c;由服务器自动激活。 &#xff08;2&#xff09;触发器可进行更为复杂的检查和操作&#xff0c;具有更精细和更强大的数…...

Unittest02|TestSuite、TestRunner、HTMLTestRunner、处理excel表数据、邮件接收测试结果

目录 八、测试套件TestSuite和测试运行器TestRunner 1、基本概念 2、创建和使用测试套件 3、 自动发现测试用例、创建测试套件、运行测试 4、生成html的测试报告&#xff1a;HTMLTestRunner 1️⃣导入HTMLTestRunner模块 2️⃣运行测试用例并生成html文件 九、unittest…...

BAPI_BATCH_CHANGE在更新后不自动更新批次特征

1、问题介绍 在CL03中看到分类特性配置了制造日期字段&#xff0c;并绑定了生产日期字段MCH1~HSDAT MSC2N修改批次的生产日期字段时&#xff0c;自动修改了对应的批次特性 但是通过BAPI&#xff1a;BAPI_BATCH_CHANGE修改生产日期时&#xff0c;并没有更新到批次特性中 2、BAPI…...

顶会评测集解读-AlignBench: 大语言模型中文对齐基准

评测集社区 CompssHub 作为司南 OpenCompass大模型评测体系的重要组成部分&#xff0c;致力于简化并加快研究人员和行业人士搜索和使用评测集的过程。评测集社区 CompssHub 目前已收录了学科、推理、知识、代码等12个方向的评测集&#xff0c;欢迎大家探索。 为了将评测集社区…...

MySQL外键类型与应用场景总结:优缺点一目了然

前言&#xff1a; MySQL的外键简介&#xff1a;在 MySQL 中&#xff0c;外键 (Foreign Key) 用于建立和强制表之间的关联&#xff0c;确保数据的一致性和完整性。外键的作用主要是限制和维护引用完整性 (Referential Integrity)。 主要体现在引用操作发生变化时的处理方式&…...

【含开题报告+文档+PPT+源码】基于SpringBoot+Vue的网上书店管理系统的设计与实现

开题报告 本研究论文主要介绍了基于Spring Boot框架开发的全面网上书店管理系统的构建与实现。该系统以用户为核心&#xff0c;提供了丰富的个性化服务功能。首先&#xff0c;系统支持用户进行便捷的登录注册操作&#xff0c;并具备安全可靠的密码修改机制&#xff0c;同时允许…...

力扣面试题 - 40 迷路的机器人 C语言解法

题目&#xff1a; 设想有个机器人坐在一个网格的左上角&#xff0c;网格 r 行 c 列。机器人只能向下或向右移动&#xff0c;但不能走到一些被禁止的网格&#xff08;有障碍物&#xff09;。设计一种算法&#xff0c;寻找机器人从左上角移动到右下角的路径。 网格中的障碍物和空…...

ElementPlus 自定义封装 el-date-picker 的快捷功能

文章目录 需求分析 需求 分析 我们看到官网上给出的案例如下&#xff0c;但是不太满足我们用户想要的快捷功能&#xff0c;因为不太多&#xff0c;因此需要我们自己封装一些&#xff0c;方法如下 外部自定义该组件的快捷内容 export const getPickerOptions () > {cons…...

二百八十二、ClickHouse——删除Linux中的ClickHouse

一、目的 由于ClickHosue的库表发生变化&#xff0c;需要删除原有的表结构数据&#xff0c;才能直接把脚本里文件重新安装 二、删除步骤 1、关闭ClickHouse服务 systemctl stop clickhouse-server 2、卸载ClickHouse软件包 sudo yum remove clickhouse-server clickhouse…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

push [特殊字符] present

push &#x1f19a; present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中&#xff0c;push 和 present 是两种不同的视图控制器切换方式&#xff0c;它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...