当前位置: 首页 > news >正文

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result  = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述

相关文章:

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境 JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.202、执行python-flink脚本 从kafka的demo获取消…...

国产Tomcat东方通(TongWeb):Linux系统部署全攻略

国产Tomcat东方通(TongWeb):Linux系统部署全攻略 国产tomcat东方通tongweb在linux系统上部署全流程 [这里是图片001] 项目地址: https://gitcode.com/Resource-Bundle-Collection/fca68 项目介绍 在当前的软件开发和运维环境中&#xff0c…...

bilstm -crf 的torch官方的教程 不支持 batchsize 。。。。有更效率更高的方案吗

根据您的需求,您正在寻找一个支持批量处理(batch size)的BiLSTM-CRF模型的PyTorch教程。以下是一些资源,它们提供了更高效的解决方案,包括批量处理的支持: 1. **a-PyTorch-Tutorial-to-Sequence-Labeling*…...

Python面试常见问题及答案6

一、基础部分 问题1: 在Python中,如何将字符串转换为整数?如果字符串不是合法的数字字符串会怎样? 答案: 在Python中,可以使用int()函数将字符串转换为整数。如果字符串是合法的数字字符串,转换…...

代码随想录算法训练营第三天 | 链表理论基础 | 203.移除链表元素

感觉上是可以轻松完成的,因为对链接的结构,元素的删除过程心里明镜似的 实际上四处跑气 结构体的初始化好像完全忘掉了,用malloc折腾半天,忘记了用new,真想扇自己嘴巴子到飞起删除后写一个函数,把链表打印…...

1. 机器学习基本知识(5)——练习题(1)

1.7 🐦‍🔥练习题(本章重点回顾与总结) 0.回答格式约定: 对于书本内容的回答,将优先寻找书本内容作为答案进行回答。 书本内容回答完毕后,将对问题进行补充回答,上面分割线作为两个…...

vue 自定义组件image 和 input

本章主要是介绍自定义的组件:WInput:这是一个验证码输入框,自动校验,输入完成回调等;WImage:这是一个图片展示组件,集成了缩放,移动等操作。 目录 一、安装 二、引入组件 三、使用…...

系列3:基于Centos-8.6 Kubernetes使用nfs挂载pod的应用日志文件

每日禅语 古代,一位官员被革职遣返,心中苦闷无处排解,便来到一位禅师的法堂。禅师静静地听完了此人的倾诉,将他带入自己的禅房之中。禅师指着桌上的一瓶水,微笑着对官员说:​“你看这瓶水,它已经…...

Jfinal项目整合Redis

1、引入相关依赖 <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency><depen…...

在Ubuntu服务器上备份文件到自己的百度网盘

文章目录 概述安装bypy同步文件定时任务脚本 概述 之前自购了一台阿里云服务器&#xff0c;系统镜像为Ubuntu 22.04&#xff0c; 并且搭建了LNMP开发环境&#xff08;可以参考&#xff1a;《Ubuntu搭建PHP开发环境操作步骤(保姆级教程)》&#xff09;。由于项目运行中会产生附…...

Unity 模板测试透视效果(URP)

可以实现笼中窥梦和PicoVR中通过VST局部透视效果。 使用到的Shader: Shader "Unlit/StencilShader" {Properties{[IntRange]_Index("Stencil Index",Range(0,255))0}SubShader{Tags{"RenderType""Opaque""Queue""Geo…...

《计算机视觉证书:开启职业发展新航道》

一、引言 在当今科技飞速发展的时代&#xff0c;计算机视觉技术正以惊人的速度改变着我们的生活和工作方式。从智能手机的人脸识别解锁到自动驾驶汽车的环境感知&#xff0c;计算机视觉技术的应用无处不在。而计算机视觉证书作为这一领域的专业认证&#xff0c;其作用愈发凸显…...

.NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】

一、使用VSCode开发.NET项目 1、创建文件夹&#xff0c;使用VSCode打开 2、安装扩展工具 1>C# 2>安装NuGet包管理工具&#xff0c;外部dll包依靠它来加载 法1》&#xff1a;NuGet Gallery&#xff0c;注意要启动科学的工具 法2》NuGet Package Manager GUl&#xff0c…...

Git-分布式版本控制工具

目录 1. 概述 1. 1集中式版本控制工具 1.2分布式版本控制工具 2.Git 2.1 git 工作流程 1. 概述 在开发活动中&#xff0c;我们经常会遇到以下几个场景&#xff1a;备份、代码回滚、协同开发、追溯问题代码编写人和编写时间&#xff08;追责&#xff09;等。备份的话是为了…...

C++ 第10章 对文件的输入输出

https://www.bilibili.com/video/BV1cx4y1d7Ut/?p147&spm_id_from333.1007.top_right_bar_window_history.content.click&vd_sourcee8984989cddeb3ef7b7e9fd89098dbe8 &#x1f341;&#x1f341;&#x1f341;本篇为贺宏宏老师C语言视频教程文件输入输出部分笔记整理…...

【机器学习】手写数字识别的最优解:CNN+Softmax、Sigmoid与SVM的对比实战

一、基于CNNSoftmax函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 二、 基于CNNsigmoid函数进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分析 三、 基于CNNSVM进行分类 1数据集准备 2模型设计 3模型训练 4模型评估 5结果分…...

android 聊天界面键盘、表情切换丝滑

1、我们在聊天页面时候&#xff0c;往往会遇到&#xff0c;键盘、表情、其他选择切换时候页面会出现掉下来再弹起问题&#xff0c;这是因为&#xff0c;我们切换时候&#xff0c;键盘异步导致内容View高度变化&#xff0c;页面掉下来后&#xff0c;又被其他内容顶起这种很差视觉…...

Web项目图片视频加载缓慢/首屏加载白屏

Web项目图片视频加载缓慢/首屏加载白屏 文章目录 Web项目图片视频加载缓慢/首屏加载白屏一、原因二、 解决方案2.1、 图片和视频的优化2.1.1、压缩图片或视频2.1.2、 选择合适的图片或视频格式2.1.3、 使用图片或视频 CDN 加速2.1.4、Nginx中开启gzip 三、压缩工具推荐 一、原因…...

关于Git分支合并,跨仓库合并方式

关于Git合并代码的方式说明 文章目录 关于Git合并代码的方式说明前情提要开始合并方式一&#xff1a;git merge方式二&#xff1a;git cherry-pick方式三&#xff1a;git checkout Git跨仓库合并的准备事项前提拉取源仓库代码 前情提要 同仓库不同分支代码的合并可直接往下看文…...

[网络] UDP协议16位校验和

16位校验和是udp报头中的一个字段,绝大多数的教材和网课都会忽略这个字段,不去细究,我闲的蛋疼问了问ai,得到了一个答案,故作此文,以证明我爱学习之心惊天地泣鬼神(狗头 ai的回答 仅从作用来说,它会根据整个应用层报文进行运算,生成一个准确的数字,这个数字不能保证唯一性,但根…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

边缘计算医疗风险自查APP开发方案

核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

蓝桥杯 冶炼金属

原题目链接 &#x1f527; 冶炼金属转换率推测题解 &#x1f4dc; 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V&#xff0c;是一个正整数&#xff0c;表示每 V V V 个普通金属 O O O 可以冶炼出 …...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

Git常用命令完全指南:从入门到精通

Git常用命令完全指南&#xff1a;从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...

js 设置3秒后执行

如何在JavaScript中延迟3秒执行操作 在JavaScript中&#xff0c;要设置一个操作在指定延迟后&#xff08;例如3秒&#xff09;执行&#xff0c;可以使用 setTimeout 函数。setTimeout 是JavaScript的核心计时器方法&#xff0c;它接受两个参数&#xff1a; 要执行的函数&…...