当前位置: 首页 > news >正文

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result  = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述

相关文章:

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境 JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.202、执行python-flink脚本 从kafka的demo获取消…...

国产Tomcat东方通(TongWeb):Linux系统部署全攻略

国产Tomcat东方通(TongWeb):Linux系统部署全攻略 国产tomcat东方通tongweb在linux系统上部署全流程 [这里是图片001] 项目地址: https://gitcode.com/Resource-Bundle-Collection/fca68 项目介绍 在当前的软件开发和运维环境中&#xff0c…...

bilstm -crf 的torch官方的教程 不支持 batchsize 。。。。有更效率更高的方案吗

根据您的需求,您正在寻找一个支持批量处理(batch size)的BiLSTM-CRF模型的PyTorch教程。以下是一些资源,它们提供了更高效的解决方案,包括批量处理的支持: 1. **a-PyTorch-Tutorial-to-Sequence-Labeling*…...

Python面试常见问题及答案6

一、基础部分 问题1: 在Python中,如何将字符串转换为整数?如果字符串不是合法的数字字符串会怎样? 答案: 在Python中,可以使用int()函数将字符串转换为整数。如果字符串是合法的数字字符串,转换…...

代码随想录算法训练营第三天 | 链表理论基础 | 203.移除链表元素

感觉上是可以轻松完成的,因为对链接的结构,元素的删除过程心里明镜似的 实际上四处跑气 结构体的初始化好像完全忘掉了,用malloc折腾半天,忘记了用new,真想扇自己嘴巴子到飞起删除后写一个函数,把链表打印…...

1. 机器学习基本知识(5)——练习题(1)

1.7 🐦‍🔥练习题(本章重点回顾与总结) 0.回答格式约定: 对于书本内容的回答,将优先寻找书本内容作为答案进行回答。 书本内容回答完毕后,将对问题进行补充回答,上面分割线作为两个…...

vue 自定义组件image 和 input

本章主要是介绍自定义的组件:WInput:这是一个验证码输入框,自动校验,输入完成回调等;WImage:这是一个图片展示组件,集成了缩放,移动等操作。 目录 一、安装 二、引入组件 三、使用…...

系列3:基于Centos-8.6 Kubernetes使用nfs挂载pod的应用日志文件

每日禅语 古代,一位官员被革职遣返,心中苦闷无处排解,便来到一位禅师的法堂。禅师静静地听完了此人的倾诉,将他带入自己的禅房之中。禅师指着桌上的一瓶水,微笑着对官员说:​“你看这瓶水,它已经…...

Jfinal项目整合Redis

1、引入相关依赖 <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency><depen…...

在Ubuntu服务器上备份文件到自己的百度网盘

文章目录 概述安装bypy同步文件定时任务脚本 概述 之前自购了一台阿里云服务器&#xff0c;系统镜像为Ubuntu 22.04&#xff0c; 并且搭建了LNMP开发环境&#xff08;可以参考&#xff1a;《Ubuntu搭建PHP开发环境操作步骤(保姆级教程)》&#xff09;。由于项目运行中会产生附…...

Unity 模板测试透视效果(URP)

可以实现笼中窥梦和PicoVR中通过VST局部透视效果。 使用到的Shader: Shader "Unlit/StencilShader" {Properties{[IntRange]_Index("Stencil Index",Range(0,255))0}SubShader{Tags{"RenderType""Opaque""Queue""Geo…...

《计算机视觉证书:开启职业发展新航道》

一、引言 在当今科技飞速发展的时代&#xff0c;计算机视觉技术正以惊人的速度改变着我们的生活和工作方式。从智能手机的人脸识别解锁到自动驾驶汽车的环境感知&#xff0c;计算机视觉技术的应用无处不在。而计算机视觉证书作为这一领域的专业认证&#xff0c;其作用愈发凸显…...

.NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】

一、使用VSCode开发.NET项目 1、创建文件夹&#xff0c;使用VSCode打开 2、安装扩展工具 1>C# 2>安装NuGet包管理工具&#xff0c;外部dll包依靠它来加载 法1》&#xff1a;NuGet Gallery&#xff0c;注意要启动科学的工具 法2》NuGet Package Manager GUl&#xff0c…...

Git-分布式版本控制工具

目录 1. 概述 1. 1集中式版本控制工具 1.2分布式版本控制工具 2.Git 2.1 git 工作流程 1. 概述 在开发活动中&#xff0c;我们经常会遇到以下几个场景&#xff1a;备份、代码回滚、协同开发、追溯问题代码编写人和编写时间&#xff08;追责&#xff09;等。备份的话是为了…...

C++ 第10章 对文件的输入输出

https://www.bilibili.com/video/BV1cx4y1d7Ut/?p147&spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourcee8984989cddeb3ef7b7e9fd89098dbe8 &#x1f341;&#x1f341;&#x1f341;本篇为贺宏宏老师C语言视频教程文件输入输出部分笔记整理…...

【机器学习】手写数字识别的最优解:CNN+Softmax、Sigmoid与SVM的对比实战

一、基于CNNSoftmax函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 二、 基于CNNsigmoid函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 三、 基于CNNSVM进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分…...

android 聊天界面键盘、表情切换丝滑

1、我们在聊天页面时候&#xff0c;往往会遇到&#xff0c;键盘、表情、其他选择切换时候页面会出现掉下来再弹起问题&#xff0c;这是因为&#xff0c;我们切换时候&#xff0c;键盘异步导致内容View高度变化&#xff0c;页面掉下来后&#xff0c;又被其他内容顶起这种很差视觉…...

Web项目图片视频加载缓慢/首屏加载白屏

Web项目图片视频加载缓慢/首屏加载白屏 文章目录 Web项目图片视频加载缓慢/首屏加载白屏一、原因二、 解决方案2.1、 图片和视频的优化2.1.1、压缩图片或视频2.1.2、 选择合适的图片或视频格式2.1.3、 使用图片或视频 CDN 加速2.1.4、Nginx中开启gzip 三、压缩工具推荐 一、原因…...

关于Git分支合并,跨仓库合并方式

关于Git合并代码的方式说明 文章目录 关于Git合并代码的方式说明前情提要开始合并方式一&#xff1a;git merge方式二&#xff1a;git cherry-pick方式三&#xff1a;git checkout Git跨仓库合并的准备事项前提拉取源仓库代码 前情提要 同仓库不同分支代码的合并可直接往下看文…...

[网络] UDP协议16位校验和

16位校验和是udp报头中的一个字段,绝大多数的教材和网课都会忽略这个字段,不去细究,我闲的蛋疼问了问ai,得到了一个答案,故作此文,以证明我爱学习之心惊天地泣鬼神(狗头 ai的回答 仅从作用来说,它会根据整个应用层报文进行运算,生成一个准确的数字,这个数字不能保证唯一性,但根…...

ARM BRBE技术:硬件级控制流分析与优化

1. ARM分支记录缓冲区扩展(BRBE)技术概述在现代处理器架构中&#xff0c;控制流信息的捕获对于性能分析和代码优化至关重要。ARM分支记录缓冲区扩展(Branch Record Buffer Extension, BRBE)是ARMv8/v9架构中引入的一项硬件特性&#xff0c;它通过专用硬件机制记录程序执行过程中…...

数环通iPaaS流程引擎中断恢复机制设计:快照 + 消息驱动实现无缝续跑

一个无法回避的问题 做iPaaS自动化引擎开发的同学迟早会遇到这个问题&#xff1a;流程跑到一半断了&#xff0c;怎么办&#xff1f; 不是那种代码bug导致的异常退出——那种靠异常处理就行。我说的是更真实、更棘手的场景&#xff1a; 服务发版需要滚动重启&#xff0c;机器上还…...

PixelFlasher:5分钟掌握Pixel设备刷机与Root管理的终极指南

PixelFlasher&#xff1a;5分钟掌握Pixel设备刷机与Root管理的终极指南 【免费下载链接】PixelFlasher Pixel™ phone flashing GUI utility with features. 项目地址: https://gitcode.com/gh_mirrors/pi/PixelFlasher PixelFlasher是一款专为Google Pixel设备设计的图…...

Audio Slicer:智能音频切片工具终极指南,告别手动剪辑烦恼

Audio Slicer&#xff1a;智能音频切片工具终极指南&#xff0c;告别手动剪辑烦恼 【免费下载链接】audio-slicer A simple GUI application that slices audio with silence detection 项目地址: https://gitcode.com/gh_mirrors/aud/audio-slicer 还在为繁琐的音频剪辑…...

在珠宝首饰加工中,遨博协作机器人配合微力控技术,实现宝石的自动化镶嵌

在珠宝首饰的高端制造领域&#xff0c;宝石镶嵌是决定产品最终价值与艺术表现力的灵魂工序。这一过程要求近乎苛刻的精度、无可挑剔的稳定性&#xff0c;以及对脆性材料的极致呵护。长期以来&#xff0c;这依赖于镶嵌师多年练就的“手感”与专注力&#xff0c;属于劳动力高度密…...

Perplexity语法查询功能深度解析(官方未公开的7个语法边界场景)

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;Perplexity语法查询功能的核心定位与设计哲学 Perplexity语法查询功能并非通用搜索引擎的简单变体&#xff0c;而是面向技术深度用户的语义化推理引擎。其核心定位在于将自然语言提问转化为可执行、可验证、可…...

告别闪烁!用STM32CubeMX快速配置PWM+DMA驱动WS2812彩灯(F4系列实测)

告别闪烁&#xff01;用STM32CubeMX快速配置PWMDMA驱动WS2812彩灯&#xff08;F4系列实测&#xff09; 在嵌入式开发中&#xff0c;驱动WS2812彩灯往往需要精确的时序控制&#xff0c;传统软件延时方式不仅占用CPU资源&#xff0c;还容易因中断干扰导致灯光闪烁。本文将展示如何…...

告别复制粘贴!手把手教你封装可复用的Echarts-for-weixin图表组件

微信小程序Echarts组件化实战&#xff1a;打造高复用图表解决方案 在数据驱动的产品设计中&#xff0c;图表可视化已成为微信小程序不可或缺的组成部分。面对多页面复用、动态数据更新等实际需求&#xff0c;直接使用原生ec-canvas组件往往会导致代码冗余和维护困难。本文将分享…...

别再被‘模糊’搞晕了!用Python模拟SAR距离模糊与方位模糊的直观对比(附代码)

用Python实战解析SAR成像中的距离模糊与方位模糊现象 当你第一次看到SAR图像上那些神秘的条纹和重影时&#xff0c;是否好奇这些"视觉噪音"从何而来&#xff1f;作为雷达成像领域的经典问题&#xff0c;距离模糊和方位模糊直接影响着图像质量。今天&#xff0c;我们不…...

碧蓝航线自动化助手:3小时解放你的游戏时间

碧蓝航线自动化助手&#xff1a;3小时解放你的游戏时间 【免费下载链接】AzurLaneAutoScript Azur Lane bot (CN/EN/JP/TW) 碧蓝航线脚本 | 无缝委托科研&#xff0c;全自动大世界 项目地址: https://gitcode.com/gh_mirrors/az/AzurLaneAutoScript 还在为碧蓝航线中重复…...