当前位置: 首页 > news >正文

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件
columns_in = [
...
]columns_out = [
...
]
filter_condition = "name = '蒋介石' and sex = '男'"# 创建执行环境t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///work/flink-sql-connector-kafka-3.2.0-1.19.jar")source_topic = "foo"
sink_topic = "baa"
kafka_servers = "kafka:9092"
kafka_consumer_group_id = "flink consumer"columnstr = ','.join([f"`{col}` VARCHAR"  for col in columns_in])
source_ddl = f"""
CREATE TABLE kafka_source({columnstr}) WITH ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""columnstr2 = ','.join([f"`{col}` VARCHAR"  for col in columns_out])
sink_ddl = f"""
CREATE TABLE kafka_sink ({columnstr2}) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""
# 过滤字段
filtersql = f"""
insert into kafka_sink
select {
','.join([f"`{col}`"  for col in columns_out])
}
from kafka_source
where {filter_condition}
"""
t_env.execute_sql(filtersql)
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

相关文章:

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件 columns_in [ ... ]columns_out [ ... ] filter_condition "name 蒋介石 and sex 男"# 创建执行环境t_env TableEnvironment.create(EnvironmentSettings.in_stream…...

Webpack 完整指南

​🌈个人主页:前端青山 🔥系列专栏:Webpack篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack介绍 目录 介绍 一、webpack 1.1、webpack是什么 1.2 webpack五个核心配置 1.…...

如何在 Ubuntu20.04 安装FTP Server vsftpd

1.安装: sudo apt-get install vsftpd 2.启动 sudo service vsftpd start //启动 sudo service vsftpd stop //停止 sudo service vsftpd restart //重新启动 3.打开配置文件 sudo nano /etc/vsftpd.conf 4.配置:限制在指定目录&…...

基于FPGA的DDS信号发生器(图文并茂+深度原理解析)

篇幅有限,本文详细源文件已打包 至个人主页资源,需要自取...... 前言 DDS(直接数字合成)技术是先进的频率合成手段,在数字信号处理与硬件实现领域作用关键。它因低成本、低功耗、高分辨率以及快速转换时间等优点备受认可。 本文着重探究基于 FPGA 的简易 DDS 信号发生器设…...

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…...

【算法——递归回溯】

这个东西还是很重要的&#xff0c;直接决定了你的动态规划章节的学习深度 78. 子集 方法1&#xff1a; vector<vector<int>>V; void dfs(vector<int> v,vector<int> nums,int index) {if(indexnums.size()) V.push_back(v);else{v.push_back(nums[i…...

手机在网状态接口的使用和注意事项

手机在网状态接口是用于查询手机号码在运营商数据库中的实时状态的工具&#xff0c;这种接口在互联网金融、贷款、租赁、保险等相关行业中尤为重要&#xff0c;因为它可以帮助这些行业进行更有效的风控审核。以下是对手机在网状态接口的详细介绍&#xff1a; 一、手机在网状态…...

WebGl 使用uniform变量动态修改点的颜色

在WebGL中&#xff0c;uniform变量用于在顶点着色器和片元着色器之间传递全局状态信息&#xff0c;这些信息在渲染过程中不会随着顶点的变化而变化。uniform变量可以用来设置变换矩阵、光照参数、材料属性等。由于它们在整个渲染过程中共享&#xff0c;因此可以被所有使用该着色…...

Leetcode 划分字母区间

题目要求&#xff1a; 将字符串 s 划分成尽量多的片段&#xff0c;保证每个片段中出现的字母不会出现在其他片段中。 具体解释如下&#xff1a; 尽量多的片段&#xff1a;题目要求的是在划分过程中&#xff0c;我们要尽量让划分的片段数量最大化&#xff0c;而不是最少化。每…...

可编辑div遇到的那些事

在日常开发中有时可能会遇到input 或 textarea 不能满足的开发场景&#xff0c;比如多行输入的情况下&#xff0c;textarea 的右下角icon 无法去除, 所以此时可以使用div 设置可编辑状态&#xff0c;完成功能开发&#xff0c;在开发的过程中仍会遇到一下问题。 1&#xff0c;如…...

什麼是高速HTTP代理?

高速HTTP代理是一種用於加速和優化互聯網連接的技術。它通過在用戶和目標網站之間充當仲介伺服器&#xff0c;幫助用戶快速訪問網路資源。HTTP代理不僅可以提高訪問速度&#xff0c;還能提供一定程度的隱私保護和安全性。 高速HTTP代理的工作原理 HTTP代理伺服器位於用戶設備…...

三子棋(C 语言)

目录 一、游戏设计的整体思路二、各个步骤的代码实现1. 菜单及循环选择的实现2. 棋盘的初始化和显示3. 轮流下棋及结果判断实现4. 结果判断实现 三、所有代码四、总结 一、游戏设计的整体思路 &#xff08;1&#xff09;提供一个菜单让玩家选择人机对战、玩家对战或者退出游戏…...

HWS赛题 入门 MIPS Pwn-Mplogin(MIPS_shellcode)

解题所涉知识点&#xff1a; 泄露或修改内存数据&#xff1a; 堆地址&#xff1a;栈地址&#xff1a;栈上数据的连带输出(Stack Leak) && Stack溢出覆盖内存libc地址&#xff1a;BSS段地址&#xff1a; 劫持程序执行流程&#xff1a;[[MIPS_ROP]] 获得shell或flag&am…...

纯血鸿蒙启动公测,爱加密鸿蒙加固平台发布,助力鸿蒙应用安全运营!

鸿蒙系统打破了移动操作系统两极格局&#xff0c;实现操作系统核心技术的自主可控、安全可靠&#xff0c;在神州大地上掀起一波科技革新的浪潮&#xff0c;HarmonyOS NEXT成为大型企业必须要布局的应用系统之一。 HarmonyOS NEXT于10月8日正式开启公测&#xff0c;距离面向全体…...

MySQL中 truncate、drop和delete的区别

MySQL中 truncate、drop和delete区别 truncate 执行速度快&#xff0c;删除所有数据&#xff0c;但是保留表结构不记录日志事务不安全&#xff0c;不能回滚可重置自增主键计数器 drop 执行速度较快&#xff0c;删除整张表数据和结构不记录日志事务不安全&#xff0c;不能回…...

什么开放式耳机值得买?开放式耳机推荐排行榜!

长时间佩戴传统入耳式耳机有时可能会影响耳道健康&#xff0c;鉴于此&#xff0c;转而选择不入耳设计的开放式耳机就成了不少人的新倾向&#xff0c;它们有助于减少细菌滋生和耳道闷热的烦恼。为了帮助大家找到合适的选项&#xff0c;下面我将列举一些市面上口碑不错的开放式耳…...

Apache Doris的分区与分桶详解

目录 第一章 Doris介绍和分区分桶作用 1.1 Doris背景介绍 1.2 分区与分桶的意义 第二章 原理解析 2.1 分区机制 2.1.1 定义 2.1.2 类型 2.1.3 工作原理 2.2 分桶机制 2.2.1 概念 2.2.2 实现方式 2.2.3 与分区的关系 第三章 手动分区与自动分区对比 3.1 手动分区 …...

docker详解介绍+基础操作 (二)info详解

1 docker相关信息和优化配置 1&#xff09;查看docker版本详解 rootzz:~# docker version Client: Docker Engine - CommunityVersion: 27.3.1API version: 1.47Go version: go1.22.7Git commit: ce12230Built: Fri Sep 20 11:40:…...

C0023.在Clion中创建控件,对控件进行提升为自定义控件的步骤

新建Ui界面文件 修改新生成的ui文件头文件 关闭之前打开的ui文件&#xff0c;如上图Qt Designer中打开的&#xff0c;然后修改新生成的ui文件对应的头文件&#xff0c;改成自己需要的控件类即可。 提升控件为自定义类 将如下头文件中的类名和头文件名输入到提升窗口中&#…...

探索 C# 常用第三方库与框架

在 C# 开发中&#xff0c;第三方库和框架极大地提高了开发效率和代码质量。通过这些库&#xff0c;开发者可以快速处理 JSON 数据、简化对象映射、记录日志、以及高效地与数据库交互。本文将介绍四个常用的 C# 第三方库&#xff1a;Newtonsoft.Json、AutoMapper、NLog/Serilog …...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...