当前位置: 首页 > news >正文

pyflink 环境测试以及测试案例

1. py 的 环境以来采用Anaconda环境包

安装版本:https://www.anaconda.com/distribution/#download-section
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
下载地址
https://repo.anaconda.com/archive/

2. 安装

bash Anaconda3-2021.05-Linux-x86_64.sh

2.1 如图

在这里插入图片描述

在这里插入图片描述

3. 配置配置anaconda的环境变量:

vim /etc/profile
##增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
重新加载环境变量: source /etc/profile

4. 修改bashrc文件

sudo vim ~/.bashrc
添加如下内容:
export PATH=~/anaconda3/bin:$PATH

说明:

profile
其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
bashrc
bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

5. 启动anaconda并测试

注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
如图:
在这里插入图片描述

如果没有可以重启服务器。

如果大家发现命令行最前面出现了 (base) 信息, 可以通过以下方式, 退出Base环境

vim ~/.bashrc
拉到文件的最后面: 输入 i 进入插入模式
将以下内容添加:
conda deactivate

6. Anaconda相关组件命令

地址:https://www.continuum.io/downloads

安装包:pip install xxx,conda install xxx
卸载包:pip uninstall xxx,conda uninstall xxx
升级包:pip install upgrade xxx,conda update xxx

6.1 功能:

Anaconda自带,无需单独安装
实时查看运行过程
基本的web编辑器(本地)
ipynb 文件分享
可交互式
记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。

IPython:

命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法

7. Anaconda中的conda命令做详细介绍和配置。

** 7.1. conda命令及pip命令**

conda install  包名    pip install 包名
conda uninstall 包名   pip uninstall 包名
conda install -U 包名   pip install -U 包名

7.2 Anaconda设置为国内下载镜像

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

7.3 conda创建虚拟环境

conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境#现在使用以下命令激活新创建的环境:
source activate py_env   
# 或者
conda activate py_env   
deactivate py_env #退出环境

----------------------------------------------- Pyflink 环境安装-------------------------------------

8. pyflink 环境安装

激活虚拟环境

source ~/Documents/install/miniconda/bin/activate

创建 pyflink 虚拟环境

conda create --name py310_pyflink171_venv -y -q python=3.10.8
conda activate py310_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.17.1 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

或者 flink 14.5

# 创建 pyflink 虚拟环境
conda create --name py314_pyflink171_venv -y -q python=3.8.8
conda activate py314_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

9. 官网测试例子

地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html

vi word_count2.py

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import argparse
import logging
import sysfrom pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema,\DataTypes, FormatDescriptor
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udfwords = ["flink", "window", "timer", "event_time", "processing_time", "state","connector", "pyflink", "checkpoint", "watermark", "sideoutput", "sql","datastream", "broadcast", "asyncio", "catalog", "batch", "streaming"]max_word_id = len(words) - 1def streaming_word_count(output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# define the source# randomly select 5 words per second from a predefined listt_env.create_temporary_table('source',TableDescriptor.for_connector('datagen').schema(Schema.new_builder().column('word_id', DataTypes.INT()).build()).option('fields.word_id.kind', 'random').option('fields.word_id.min', '0').option('fields.word_id.max', str(max_word_id)).option('rows-per-second', '5').build())tab = t_env.from_path('source')# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udf(result_type=DataTypes.STRING())def id_to_word(word_id):return words[word_id]# compute word counttab.select(id_to_word(col('word_id'))).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)streaming_word_count(known_args.output)

10.执行命令

 python word_count2.py

11. 执行结果

在这里插入图片描述

12 实例2

12.1 安装mysql docker 安装
https://blog.csdn.net/wudonglianga/article/details/133927305
12.2 创建表语句

CREATE TABLE `bigdatauser` (`id` int(32) NOT NULL AUTO_INCREMENT,`name` varchar(64) DEFAULT NULL,`age` int(32) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4

12.3 python 脚本

from pyflink.table import EnvironmentSettings, TableEnvironmentprint('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar;file:/opt/flink/lib/mysql-connector-java-5.1.49.jar")
print('step 03')# 2. create source Table=
table_env.execute_sql("""CREATE TABLE products (id INT,name STRING,age INT,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc',  'url' = 'jdbc:mysql://192.168.43.185:3306/wudl','username'='root','password'='123456','table-name' = 'bigdatauser')
""")# 3. create sink Table
table_env.execute_sql("""CREATE TABLE dproducts (id int,name STRING,age int) WITH ('connector' = 'print')
""")print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT  p.id, p.name, p.age FROM products AS p").wait()
print('step 06')

12.4 执行结果
在这里插入图片描述

13. 任务提交服务器运行

打包运行环境

# 找到 minconda(安装路径 envs目录下) 或者对应虚拟环境安装目录
# 打包 py310_pyflink171_venv 虚拟环境
cd ~/Documents/install/miniconda/env
zip -r py310_pyflink171_venv.zip py310_pyflink171_venv

** 1.提交至 jobmanager**

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py /workplace/src/word_count.py

2. 带目录,指定入口模块提交

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs /workplace/src \
-pym word_count

3. 提交至 yarn 集群管理
提交运行
3.1.本地 py虚拟环境

./flink run -m yarn-cluster \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py word_count.py

3.2. hdfs py虚拟环境

./flink run  -m yarn-cluster \
-pyarch hdfs://dae-ns/py_env/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv \
-py word_count.py

3.3.带目录 src

./bin/flink run-application -t yarn-application \
-Dyarn.application.name=wordcount \
-Dyarn.ship-files=/workplace/src \
-pyarch shipfiles/py310_pyflink171_venv.zip \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs src \
-pym word_count

注意:

虚拟环境打包,该虚拟环境创建方式建议使用 conda,或者virtualenv --always-copy 方式创建,这样打的虚拟环境更全
提交虚拟环境地址:py310_pyflink171_venv.zip/py310_pyflink171_venv 注意这个地址是双层

相关文章:

pyflink 环境测试以及测试案例

1. py 的 环境以来采用Anaconda环境包 安装版本:https://www.anaconda.com/distribution/#download-section Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh 下载地址 https://repo.anaconda.com/archive/ 2. 安装 bash Anaconda3-2021.05-Linux-x…...

EtherNet/IP转Modbus TCP协议网关的接口

远创智控的YC-EIPM-TCP网关产品,它有什么作用呢?一起来了解一下吧! 远创智控YC-EIPM-TCP网关产品可以通过各种数据接口和工业领域的仪表、PLC、计量设备等产品连接,实时采集这些设备中的运行数据、状态数据等信息,并把…...

视频集中存储/视频监控管理平台EasyCVR如何免密登录系统?详细操作如下

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同,支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。音视频流媒体视频平台EasyCVR拓展性强,视频能力丰富,具体可实现视频监控直播、视频轮播、视频录像、…...

京东商品详情API接口(标题|主图|SKU|价格|库存..)

京东商品详情接口的应用场景有很多,以下为您推荐几种: 电商平台集成:如果想要实现商品查询、购买、支付等功能,提高自身平台的电商能力,可以将京东API接口集成到自己的电商网站或应用程序中。第三方开发者插件&#x…...

Istio Service Entry介绍

目录 ServiceEntry.Resolution 解析模式 STATC模式 场景一:将http地址:httpbin.org:80 解析到192.168.1.1:8080 场景二:将TCP地址:httpbin.org:8080 解析到192.168.1.1:8080 DNS模式 场景一:服务网格内部访问外部…...

设备巡检管理系统有什么用?企业如何提高生产效率和生产安全?

在当今工业生产领域,设备巡检的重要性不言而喻。然而,传统巡检方式存在的诸多问题,如数据不规范、漏检误检等,严重制约了企业生产效率和产品质量。为解决这一问题,我们推出了一款设备巡检管理系统——“的修”工单管理…...

浅谈单例模式

饿汉式懒汉式/Double check(双重检索)静态内部类枚举单例 饿汉式 private static final DispatchSingleton instence new DispatchSingleton();public static DispatchSingleton getInstence() {return instence;} 饿汉式是在jvm加载这个单例类的时候&…...

【非root用户、CentOS系统】中使用源码安装gcc/g++的教程

1.引言 系统:CentOS-7.9 显卡驱动版本:460 CUDA Version: 11.2 🌼基于本地环境选择安装gcc-10.1.0 👉 gcc下载网址 gcc与cuda版本的对应关系: 2.安装说明 下载好对应的gcc的安装包并解压: 打开gcc-10.1.0/…...

Qemu镜像安全加密测试

文章目录 简介1.已经过时的qemu自带的加密方式介绍1.1.创建secret uuid的xml1.2.产生uuid1.3.给secret赋值1.4.创建一个存储池1.5.在存储池中创建一个镜像1.6.在虚拟机中使用该镜像 2.弃用以上加密方式2.1.原作者Daniel Berrange的观点2.2.Markus Armbruster更深入的操作 3. LU…...

Ubuntu 18.04 LTS中cmake-gui编译opencv-3.4.16并供Qt Creator调用

一、安装opencv 1.下载opencv-3.4.16的源码并解压 2.在解压后的文件夹内新建文件夹build以及opencv_install 3.启动cmake-gui并设置 sudo cmake-gui(1)设置界面中source及build路径 (2)点击configure,选择第一个def…...

SpringBoot (2) yaml,整合项目

目录 1 YAML配置文件 1.1 书写规则 1.2 代码示例 1.3 用yaml进行复杂数据绑定 2 整合日志 2.1 日志配置 3 整合web 3.1 默认配置 3.2 web应用开发方式 3.2.1 全自动 3.2.2 全手动 3.2.3 手自一体(推荐) 4 整合mybatis 4.1 导包 4.2 application.yaml 4.3 dao接…...

django建站过程(2)创建第一个应用程序页面

创建第一个应用程序页面 设置第一个页面【settings.py,urls.py,views.py】settings.pyurls.pyviews.py django是由一系列应用程序组成,协同工作,让项目成为一个整体。前面已创建了一个应用程序baseapp,使用的命令 python manage.py startapp baseapps这…...

竞赛 深度学习人体语义分割在弹幕防遮挡上的实现 - python

文章目录 1 前言1 课题背景2 技术原理和方法2.1基本原理2.2 技术选型和方法 3 实例分割4 实现效果5 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 深度学习人体语义分割在弹幕防遮挡上的应用 该项目较为新颖,适合作为竞…...

网络编程开发及实战(下)

一、IO模型 一、基本概念 (一)I/O基本概念 1、基本概念 1)一个完整I/O分为两个阶段: 用户进程空间->内核空间 内核空间->设备空间(磁盘、网卡) 2)内存I/O(无名管道&…...

(H5轮播)vue一个轮播里显示多个内容/一屏展示两个半内容

效果图 : html: <div class"content"><van-swipeclass"my-swipe com-long-swipe-indicator":autoplay"2500"indicator-color"#00C4FF"><van-swipe-itemclass"flex-row-wrap"v-for"(items, index) in M…...

【Proteus仿真】【Arduino单片机】蜂鸣器

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用蜂鸣器等。 主要功能&#xff1a; 系统运行后&#xff0c;蜂鸣器播放音乐。 二、软件设计 /* 作者&#xff1a;嗨小易&#xff08;QQ&#x…...

seatunnel web ui 构建时报错

报错内容如下 Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.29.0:check (default) on project seatunnel-web: The following files had format violations:Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.29.0:check (defa…...

Js使用ffmpeg在视频中添加png或gif

Js使用ffmpeg在视频中添加png或gif ffmpeg 使用场景是需要在web端对视频进行编辑 添加图片和gif。 注意: 以下所有的使用案例均基于vue3 setup。 同时由于ffmpeg版本不同会导致使用的api不同&#xff0c;使用案例前需要注意ffmpeg版本问题。 如果使用的是0.12需要使用新的…...

多线程 Leetcode 打印零与奇偶数

现有函数 printNumber 可以用一个整数参数调用&#xff0c;并输出该整数到控制台。 例如&#xff0c;调用 printNumber(7) 将会输出 7 到控制台。 给你类 ZeroEvenOdd 的一个实例&#xff0c;该类中有三个函数&#xff1a;zero、even 和 odd 。ZeroEvenOdd 的相同实例将会传递…...

杭电oj--数列有序

有n(n<100)个整数&#xff0c;已经按照从小到大顺序排列好&#xff0c;现在另外给一个整数x&#xff0c;请将该数插入到序列中&#xff0c;并使新的序列仍然有序。 输入数据包含多个测试实例&#xff0c;每组数据由两行组成&#xff0c;第一行是n和m&#xff0c;第二行是已…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...