pyfink1.20版本下实现消费kafka中数据并实时计算
1、环境
JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20
2、执行python-flink脚本
从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值
from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait() # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py
3、启动kafka生产者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
可以看到sum(b)值已输出
4、启动kafka消费者
查看往test_kafka_topic插入的a字段数据已被消费
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092 --from-beginning --topic test_kafka_topic
相关文章:

pyfink1.20版本下实现消费kafka中数据并实时计算
1、环境 JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.202、执行python-flink脚本 从kafka的demo获取消…...
国产Tomcat东方通(TongWeb):Linux系统部署全攻略
国产Tomcat东方通(TongWeb):Linux系统部署全攻略 国产tomcat东方通tongweb在linux系统上部署全流程 [这里是图片001] 项目地址: https://gitcode.com/Resource-Bundle-Collection/fca68 项目介绍 在当前的软件开发和运维环境中,…...
bilstm -crf 的torch官方的教程 不支持 batchsize 。。。。有更效率更高的方案吗
根据您的需求,您正在寻找一个支持批量处理(batch size)的BiLSTM-CRF模型的PyTorch教程。以下是一些资源,它们提供了更高效的解决方案,包括批量处理的支持: 1. **a-PyTorch-Tutorial-to-Sequence-Labeling*…...
Python面试常见问题及答案6
一、基础部分 问题1: 在Python中,如何将字符串转换为整数?如果字符串不是合法的数字字符串会怎样? 答案: 在Python中,可以使用int()函数将字符串转换为整数。如果字符串是合法的数字字符串,转换…...
代码随想录算法训练营第三天 | 链表理论基础 | 203.移除链表元素
感觉上是可以轻松完成的,因为对链接的结构,元素的删除过程心里明镜似的 实际上四处跑气 结构体的初始化好像完全忘掉了,用malloc折腾半天,忘记了用new,真想扇自己嘴巴子到飞起删除后写一个函数,把链表打印…...
1. 机器学习基本知识(5)——练习题(1)
1.7 🐦🔥练习题(本章重点回顾与总结) 0.回答格式约定: 对于书本内容的回答,将优先寻找书本内容作为答案进行回答。 书本内容回答完毕后,将对问题进行补充回答,上面分割线作为两个…...

vue 自定义组件image 和 input
本章主要是介绍自定义的组件:WInput:这是一个验证码输入框,自动校验,输入完成回调等;WImage:这是一个图片展示组件,集成了缩放,移动等操作。 目录 一、安装 二、引入组件 三、使用…...
系列3:基于Centos-8.6 Kubernetes使用nfs挂载pod的应用日志文件
每日禅语 古代,一位官员被革职遣返,心中苦闷无处排解,便来到一位禅师的法堂。禅师静静地听完了此人的倾诉,将他带入自己的禅房之中。禅师指着桌上的一瓶水,微笑着对官员说:“你看这瓶水,它已经…...
Jfinal项目整合Redis
1、引入相关依赖 <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency><depen…...

在Ubuntu服务器上备份文件到自己的百度网盘
文章目录 概述安装bypy同步文件定时任务脚本 概述 之前自购了一台阿里云服务器,系统镜像为Ubuntu 22.04, 并且搭建了LNMP开发环境(可以参考:《Ubuntu搭建PHP开发环境操作步骤(保姆级教程)》)。由于项目运行中会产生附…...

Unity 模板测试透视效果(URP)
可以实现笼中窥梦和PicoVR中通过VST局部透视效果。 使用到的Shader: Shader "Unlit/StencilShader" {Properties{[IntRange]_Index("Stencil Index",Range(0,255))0}SubShader{Tags{"RenderType""Opaque""Queue""Geo…...

《计算机视觉证书:开启职业发展新航道》
一、引言 在当今科技飞速发展的时代,计算机视觉技术正以惊人的速度改变着我们的生活和工作方式。从智能手机的人脸识别解锁到自动驾驶汽车的环境感知,计算机视觉技术的应用无处不在。而计算机视觉证书作为这一领域的专业认证,其作用愈发凸显…...

.NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】
一、使用VSCode开发.NET项目 1、创建文件夹,使用VSCode打开 2、安装扩展工具 1>C# 2>安装NuGet包管理工具,外部dll包依靠它来加载 法1》:NuGet Gallery,注意要启动科学的工具 法2》NuGet Package Manager GUl,…...

Git-分布式版本控制工具
目录 1. 概述 1. 1集中式版本控制工具 1.2分布式版本控制工具 2.Git 2.1 git 工作流程 1. 概述 在开发活动中,我们经常会遇到以下几个场景:备份、代码回滚、协同开发、追溯问题代码编写人和编写时间(追责)等。备份的话是为了…...
C++ 第10章 对文件的输入输出
https://www.bilibili.com/video/BV1cx4y1d7Ut/?p147&spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourcee8984989cddeb3ef7b7e9fd89098dbe8 🍁🍁🍁本篇为贺宏宏老师C语言视频教程文件输入输出部分笔记整理…...

【机器学习】手写数字识别的最优解:CNN+Softmax、Sigmoid与SVM的对比实战
一、基于CNNSoftmax函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 二、 基于CNNsigmoid函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 三、 基于CNNSVM进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分…...
android 聊天界面键盘、表情切换丝滑
1、我们在聊天页面时候,往往会遇到,键盘、表情、其他选择切换时候页面会出现掉下来再弹起问题,这是因为,我们切换时候,键盘异步导致内容View高度变化,页面掉下来后,又被其他内容顶起这种很差视觉…...

Web项目图片视频加载缓慢/首屏加载白屏
Web项目图片视频加载缓慢/首屏加载白屏 文章目录 Web项目图片视频加载缓慢/首屏加载白屏一、原因二、 解决方案2.1、 图片和视频的优化2.1.1、压缩图片或视频2.1.2、 选择合适的图片或视频格式2.1.3、 使用图片或视频 CDN 加速2.1.4、Nginx中开启gzip 三、压缩工具推荐 一、原因…...

关于Git分支合并,跨仓库合并方式
关于Git合并代码的方式说明 文章目录 关于Git合并代码的方式说明前情提要开始合并方式一:git merge方式二:git cherry-pick方式三:git checkout Git跨仓库合并的准备事项前提拉取源仓库代码 前情提要 同仓库不同分支代码的合并可直接往下看文…...

[网络] UDP协议16位校验和
16位校验和是udp报头中的一个字段,绝大多数的教材和网课都会忽略这个字段,不去细究,我闲的蛋疼问了问ai,得到了一个答案,故作此文,以证明我爱学习之心惊天地泣鬼神(狗头 ai的回答 仅从作用来说,它会根据整个应用层报文进行运算,生成一个准确的数字,这个数字不能保证唯一性,但根…...

【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...

抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...