pyfink1.20版本下实现消费kafka中数据并实时计算
1、环境
JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20
2、执行python-flink脚本
从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值
from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait() # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py
3、启动kafka生产者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
可以看到sum(b)值已输出
4、启动kafka消费者
查看往test_kafka_topic插入的a字段数据已被消费
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092 --from-beginning --topic test_kafka_topic
相关文章:

pyfink1.20版本下实现消费kafka中数据并实时计算
1、环境 JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.202、执行python-flink脚本 从kafka的demo获取消…...
国产Tomcat东方通(TongWeb):Linux系统部署全攻略
国产Tomcat东方通(TongWeb):Linux系统部署全攻略 国产tomcat东方通tongweb在linux系统上部署全流程 [这里是图片001] 项目地址: https://gitcode.com/Resource-Bundle-Collection/fca68 项目介绍 在当前的软件开发和运维环境中,…...
bilstm -crf 的torch官方的教程 不支持 batchsize 。。。。有更效率更高的方案吗
根据您的需求,您正在寻找一个支持批量处理(batch size)的BiLSTM-CRF模型的PyTorch教程。以下是一些资源,它们提供了更高效的解决方案,包括批量处理的支持: 1. **a-PyTorch-Tutorial-to-Sequence-Labeling*…...
Python面试常见问题及答案6
一、基础部分 问题1: 在Python中,如何将字符串转换为整数?如果字符串不是合法的数字字符串会怎样? 答案: 在Python中,可以使用int()函数将字符串转换为整数。如果字符串是合法的数字字符串,转换…...
代码随想录算法训练营第三天 | 链表理论基础 | 203.移除链表元素
感觉上是可以轻松完成的,因为对链接的结构,元素的删除过程心里明镜似的 实际上四处跑气 结构体的初始化好像完全忘掉了,用malloc折腾半天,忘记了用new,真想扇自己嘴巴子到飞起删除后写一个函数,把链表打印…...
1. 机器学习基本知识(5)——练习题(1)
1.7 🐦🔥练习题(本章重点回顾与总结) 0.回答格式约定: 对于书本内容的回答,将优先寻找书本内容作为答案进行回答。 书本内容回答完毕后,将对问题进行补充回答,上面分割线作为两个…...

vue 自定义组件image 和 input
本章主要是介绍自定义的组件:WInput:这是一个验证码输入框,自动校验,输入完成回调等;WImage:这是一个图片展示组件,集成了缩放,移动等操作。 目录 一、安装 二、引入组件 三、使用…...
系列3:基于Centos-8.6 Kubernetes使用nfs挂载pod的应用日志文件
每日禅语 古代,一位官员被革职遣返,心中苦闷无处排解,便来到一位禅师的法堂。禅师静静地听完了此人的倾诉,将他带入自己的禅房之中。禅师指着桌上的一瓶水,微笑着对官员说:“你看这瓶水,它已经…...
Jfinal项目整合Redis
1、引入相关依赖 <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency><depen…...

在Ubuntu服务器上备份文件到自己的百度网盘
文章目录 概述安装bypy同步文件定时任务脚本 概述 之前自购了一台阿里云服务器,系统镜像为Ubuntu 22.04, 并且搭建了LNMP开发环境(可以参考:《Ubuntu搭建PHP开发环境操作步骤(保姆级教程)》)。由于项目运行中会产生附…...

Unity 模板测试透视效果(URP)
可以实现笼中窥梦和PicoVR中通过VST局部透视效果。 使用到的Shader: Shader "Unlit/StencilShader" {Properties{[IntRange]_Index("Stencil Index",Range(0,255))0}SubShader{Tags{"RenderType""Opaque""Queue""Geo…...

《计算机视觉证书:开启职业发展新航道》
一、引言 在当今科技飞速发展的时代,计算机视觉技术正以惊人的速度改变着我们的生活和工作方式。从智能手机的人脸识别解锁到自动驾驶汽车的环境感知,计算机视觉技术的应用无处不在。而计算机视觉证书作为这一领域的专业认证,其作用愈发凸显…...

.NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】
一、使用VSCode开发.NET项目 1、创建文件夹,使用VSCode打开 2、安装扩展工具 1>C# 2>安装NuGet包管理工具,外部dll包依靠它来加载 法1》:NuGet Gallery,注意要启动科学的工具 法2》NuGet Package Manager GUl,…...

Git-分布式版本控制工具
目录 1. 概述 1. 1集中式版本控制工具 1.2分布式版本控制工具 2.Git 2.1 git 工作流程 1. 概述 在开发活动中,我们经常会遇到以下几个场景:备份、代码回滚、协同开发、追溯问题代码编写人和编写时间(追责)等。备份的话是为了…...
C++ 第10章 对文件的输入输出
https://www.bilibili.com/video/BV1cx4y1d7Ut/?p147&spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourcee8984989cddeb3ef7b7e9fd89098dbe8 🍁🍁🍁本篇为贺宏宏老师C语言视频教程文件输入输出部分笔记整理…...

【机器学习】手写数字识别的最优解:CNN+Softmax、Sigmoid与SVM的对比实战
一、基于CNNSoftmax函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 二、 基于CNNsigmoid函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 三、 基于CNNSVM进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分…...
android 聊天界面键盘、表情切换丝滑
1、我们在聊天页面时候,往往会遇到,键盘、表情、其他选择切换时候页面会出现掉下来再弹起问题,这是因为,我们切换时候,键盘异步导致内容View高度变化,页面掉下来后,又被其他内容顶起这种很差视觉…...

Web项目图片视频加载缓慢/首屏加载白屏
Web项目图片视频加载缓慢/首屏加载白屏 文章目录 Web项目图片视频加载缓慢/首屏加载白屏一、原因二、 解决方案2.1、 图片和视频的优化2.1.1、压缩图片或视频2.1.2、 选择合适的图片或视频格式2.1.3、 使用图片或视频 CDN 加速2.1.4、Nginx中开启gzip 三、压缩工具推荐 一、原因…...

关于Git分支合并,跨仓库合并方式
关于Git合并代码的方式说明 文章目录 关于Git合并代码的方式说明前情提要开始合并方式一:git merge方式二:git cherry-pick方式三:git checkout Git跨仓库合并的准备事项前提拉取源仓库代码 前情提要 同仓库不同分支代码的合并可直接往下看文…...

[网络] UDP协议16位校验和
16位校验和是udp报头中的一个字段,绝大多数的教材和网课都会忽略这个字段,不去细究,我闲的蛋疼问了问ai,得到了一个答案,故作此文,以证明我爱学习之心惊天地泣鬼神(狗头 ai的回答 仅从作用来说,它会根据整个应用层报文进行运算,生成一个准确的数字,这个数字不能保证唯一性,但根…...

利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
go 里面的指针
指针 在 Go 中,指针(pointer)是一个变量的内存地址,就像 C 语言那样: a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10,通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...
FTXUI::Dom 模块
DOM 模块定义了分层的 FTXUI::Element 树,可用于构建复杂的终端界面,支持响应终端尺寸变化。 namespace ftxui {...// 定义文档 定义布局盒子 Element document vbox({// 设置文本 设置加粗 设置文本颜色text("The window") | bold | color(…...
算法刷题-回溯
今天给大家分享的还是一道关于dfs回溯的问题,对于这类问题大家还是要多刷和总结,总体难度还是偏大。 对于回溯问题有几个关键点: 1.首先对于这类回溯可以节点可以随机选择的问题,要做mian函数中循环调用dfs(i&#x…...
ABB馈线保护 REJ601 BD446NN1XG
配电网基本量程数字继电器 REJ601是一种专用馈线保护继电器,用于保护一次和二次配电网络中的公用事业和工业电力系统。该继电器在一个单元中提供了保护和监控功能的优化组合,具有同类产品中最佳的性能和可用性。 REJ601是一种专用馈线保护继电器…...