当前位置: 首页 > news >正文

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件
columns_in = [
...
]columns_out = [
...
]
filter_condition = "name = '蒋介石' and sex = '男'"# 创建执行环境t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///work/flink-sql-connector-kafka-3.2.0-1.19.jar")source_topic = "foo"
sink_topic = "baa"
kafka_servers = "kafka:9092"
kafka_consumer_group_id = "flink consumer"columnstr = ','.join([f"`{col}` VARCHAR"  for col in columns_in])
source_ddl = f"""
CREATE TABLE kafka_source({columnstr}) WITH ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""columnstr2 = ','.join([f"`{col}` VARCHAR"  for col in columns_out])
sink_ddl = f"""
CREATE TABLE kafka_sink ({columnstr2}) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""
# 过滤字段
filtersql = f"""
insert into kafka_sink
select {
','.join([f"`{col}`"  for col in columns_out])
}
from kafka_source
where {filter_condition}
"""
t_env.execute_sql(filtersql)
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

相关文章:

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件 columns_in [ ... ]columns_out [ ... ] filter_condition "name 蒋介石 and sex 男"# 创建执行环境t_env TableEnvironment.create(EnvironmentSettings.in_stream…...

Webpack 完整指南

​🌈个人主页:前端青山 🔥系列专栏:Webpack篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack介绍 目录 介绍 一、webpack 1.1、webpack是什么 1.2 webpack五个核心配置 1.…...

如何在 Ubuntu20.04 安装FTP Server vsftpd

1.安装: sudo apt-get install vsftpd 2.启动 sudo service vsftpd start //启动 sudo service vsftpd stop //停止 sudo service vsftpd restart //重新启动 3.打开配置文件 sudo nano /etc/vsftpd.conf 4.配置:限制在指定目录&…...

基于FPGA的DDS信号发生器(图文并茂+深度原理解析)

篇幅有限,本文详细源文件已打包 至个人主页资源,需要自取...... 前言 DDS(直接数字合成)技术是先进的频率合成手段,在数字信号处理与硬件实现领域作用关键。它因低成本、低功耗、高分辨率以及快速转换时间等优点备受认可。 本文着重探究基于 FPGA 的简易 DDS 信号发生器设…...

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…...

【算法——递归回溯】

这个东西还是很重要的&#xff0c;直接决定了你的动态规划章节的学习深度 78. 子集 方法1&#xff1a; vector<vector<int>>V; void dfs(vector<int> v,vector<int> nums,int index) {if(indexnums.size()) V.push_back(v);else{v.push_back(nums[i…...

手机在网状态接口的使用和注意事项

手机在网状态接口是用于查询手机号码在运营商数据库中的实时状态的工具&#xff0c;这种接口在互联网金融、贷款、租赁、保险等相关行业中尤为重要&#xff0c;因为它可以帮助这些行业进行更有效的风控审核。以下是对手机在网状态接口的详细介绍&#xff1a; 一、手机在网状态…...

WebGl 使用uniform变量动态修改点的颜色

在WebGL中&#xff0c;uniform变量用于在顶点着色器和片元着色器之间传递全局状态信息&#xff0c;这些信息在渲染过程中不会随着顶点的变化而变化。uniform变量可以用来设置变换矩阵、光照参数、材料属性等。由于它们在整个渲染过程中共享&#xff0c;因此可以被所有使用该着色…...

Leetcode 划分字母区间

题目要求&#xff1a; 将字符串 s 划分成尽量多的片段&#xff0c;保证每个片段中出现的字母不会出现在其他片段中。 具体解释如下&#xff1a; 尽量多的片段&#xff1a;题目要求的是在划分过程中&#xff0c;我们要尽量让划分的片段数量最大化&#xff0c;而不是最少化。每…...

可编辑div遇到的那些事

在日常开发中有时可能会遇到input 或 textarea 不能满足的开发场景&#xff0c;比如多行输入的情况下&#xff0c;textarea 的右下角icon 无法去除, 所以此时可以使用div 设置可编辑状态&#xff0c;完成功能开发&#xff0c;在开发的过程中仍会遇到一下问题。 1&#xff0c;如…...

什麼是高速HTTP代理?

高速HTTP代理是一種用於加速和優化互聯網連接的技術。它通過在用戶和目標網站之間充當仲介伺服器&#xff0c;幫助用戶快速訪問網路資源。HTTP代理不僅可以提高訪問速度&#xff0c;還能提供一定程度的隱私保護和安全性。 高速HTTP代理的工作原理 HTTP代理伺服器位於用戶設備…...

三子棋(C 语言)

目录 一、游戏设计的整体思路二、各个步骤的代码实现1. 菜单及循环选择的实现2. 棋盘的初始化和显示3. 轮流下棋及结果判断实现4. 结果判断实现 三、所有代码四、总结 一、游戏设计的整体思路 &#xff08;1&#xff09;提供一个菜单让玩家选择人机对战、玩家对战或者退出游戏…...

HWS赛题 入门 MIPS Pwn-Mplogin(MIPS_shellcode)

解题所涉知识点&#xff1a; 泄露或修改内存数据&#xff1a; 堆地址&#xff1a;栈地址&#xff1a;栈上数据的连带输出(Stack Leak) && Stack溢出覆盖内存libc地址&#xff1a;BSS段地址&#xff1a; 劫持程序执行流程&#xff1a;[[MIPS_ROP]] 获得shell或flag&am…...

纯血鸿蒙启动公测,爱加密鸿蒙加固平台发布,助力鸿蒙应用安全运营!

鸿蒙系统打破了移动操作系统两极格局&#xff0c;实现操作系统核心技术的自主可控、安全可靠&#xff0c;在神州大地上掀起一波科技革新的浪潮&#xff0c;HarmonyOS NEXT成为大型企业必须要布局的应用系统之一。 HarmonyOS NEXT于10月8日正式开启公测&#xff0c;距离面向全体…...

MySQL中 truncate、drop和delete的区别

MySQL中 truncate、drop和delete区别 truncate 执行速度快&#xff0c;删除所有数据&#xff0c;但是保留表结构不记录日志事务不安全&#xff0c;不能回滚可重置自增主键计数器 drop 执行速度较快&#xff0c;删除整张表数据和结构不记录日志事务不安全&#xff0c;不能回…...

什么开放式耳机值得买?开放式耳机推荐排行榜!

长时间佩戴传统入耳式耳机有时可能会影响耳道健康&#xff0c;鉴于此&#xff0c;转而选择不入耳设计的开放式耳机就成了不少人的新倾向&#xff0c;它们有助于减少细菌滋生和耳道闷热的烦恼。为了帮助大家找到合适的选项&#xff0c;下面我将列举一些市面上口碑不错的开放式耳…...

Apache Doris的分区与分桶详解

目录 第一章 Doris介绍和分区分桶作用 1.1 Doris背景介绍 1.2 分区与分桶的意义 第二章 原理解析 2.1 分区机制 2.1.1 定义 2.1.2 类型 2.1.3 工作原理 2.2 分桶机制 2.2.1 概念 2.2.2 实现方式 2.2.3 与分区的关系 第三章 手动分区与自动分区对比 3.1 手动分区 …...

docker详解介绍+基础操作 (二)info详解

1 docker相关信息和优化配置 1&#xff09;查看docker版本详解 rootzz:~# docker version Client: Docker Engine - CommunityVersion: 27.3.1API version: 1.47Go version: go1.22.7Git commit: ce12230Built: Fri Sep 20 11:40:…...

C0023.在Clion中创建控件,对控件进行提升为自定义控件的步骤

新建Ui界面文件 修改新生成的ui文件头文件 关闭之前打开的ui文件&#xff0c;如上图Qt Designer中打开的&#xff0c;然后修改新生成的ui文件对应的头文件&#xff0c;改成自己需要的控件类即可。 提升控件为自定义类 将如下头文件中的类名和头文件名输入到提升窗口中&#…...

探索 C# 常用第三方库与框架

在 C# 开发中&#xff0c;第三方库和框架极大地提高了开发效率和代码质量。通过这些库&#xff0c;开发者可以快速处理 JSON 数据、简化对象映射、记录日志、以及高效地与数据库交互。本文将介绍四个常用的 C# 第三方库&#xff1a;Newtonsoft.Json、AutoMapper、NLog/Serilog …...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...