当前位置: 首页 > news >正文

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType,  StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession = SparkSession.builder().appName("kafkaConsumer").master("local[3]").getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType = new StructType().add("id", dataType = IntegerType).add("name", dataType = StringType).add("sorce", dataType = IntegerType)val message: DataFrame = spark.readStream // message为从kafka读到的原数据.format("kafka").option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092").option("subscribe", "xxxx").option("startingOffsets", "latest").load()// 将json字符串转化为结构化数据val streamData: DataFrame = message.selectExpr("cast(value as String) as message") .select(from_json($"message", Jsonschmea).alias("data"))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"val sqluser = "xxxx"val sqlpass = "xxxxx"Class.forName("com.mysql.cj.jdbc.Driver")  // mysql 8.0后得驱动,旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords =>val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"val preparedStatement = connection.prepareStatement(insertsql)partitionOfRecords.foreach {row =>
//              val id = row.getAs[Int]("data.id")
//              val name = row.getAs[String]("data.name")
//              val score = row.getAs[Int]("data.sorce")val id = row.getAs[Row]("data").getAs[Int]("id")val name = row.getAs[Row]("data").getAs[String]("name")val sorce = row.getAs[Row]("data").getAs[Int]("sorce")// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close()  // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch.start().awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

相关文章:

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

这是仿真过程某图: 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收 数据落地情况: 全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.function…...

IDEA如何删除git最新一次远程提交

IDEA如何删除git最新一次远程提交 选择应用 -> Git -> Show History 选择最新提交上一次提交 -> Reset Current Branch to Here… Reset 提示框选择 Hard push到远程分支 -> 选择Force Push 结果验证 (最新分支已被删除)...

什么是单向数据流

单向数据流是一种数据流动的模式,通常用于前端框架(如 React、Vue 等)中。在单向数据流中,数据只能从一个方向流向另一个方向,不会出现数据的双向流动。这种模式有助于简化数据的管理和状态的维护,提高代码…...

Qt 线程池 QThreadPool

一.Qt 线程池 QThreadPool介绍 Qt线程池是一种管理多个线程的并发编程模型,通过使用线程池可以提高性能、控制并发度、提供任务队列和简化线程管理。 在Qt中,线程池的使用主要涉及以下几个步骤: 创建任务类:需要定义一个任务类&am…...

【兔子机器人】实现从初始状态到站立

一、遥想星空up主的方法 由于我有卡位结构,无法做到劈腿,而且底盘也不一样,无法使用此方法 但是其代码思想是可以借鉴的。 参考视频: 【【开源啦!】无刷轮腿平衡机器人】 【精准空降到 01:16】 https://www.bilibili…...

ImportError: cannot import name ‘open_filename‘ from ‘pdfminer.utils‘已搞定

报错内容 ImportError: cannot import name ‘open_filename’ from ‘pdfminer.utils’ 第一步:pip uninstall pdfminer 解决办法 pip3 install pdfminer.six注意不要 pip install pdfminer.six是安装不了的...

一文解决Word中公式插入问题(全免费/latex公式输入/texsWord)

分文不花,搞定你的word公式输入/texsWord完全使用指南 背景 碎碎念:折折腾腾至少装了几个小时,遇到了若干大坑。遇到的问题网上都搜索不到答案!!!就让我来当指路的小火柴吧。 本篇适用于在word中输入la…...

C语言实战——扫雷游戏

目录 1. 扫雷游戏分析和设计2.扫雷游戏的代码实现 1. 扫雷游戏分析和设计 1.1扫雷游戏的功能说明 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现继续玩或者退出游戏扫雷的棋盘是9*9的格子默认随机布置10个雷可以排查雷 如果位置不是雷,就显示周围有几个雷 如果…...

.Net使用ElasticSearch

文章目录 前言主体内容一.Kibana中ElasticSearch的基础操作1.GET(查询)1.POST(新增)1.PUT(修改)1.DELET(删除) 二.在.Net中,对ElasticSearch进行基础操作1.DotNet连接Ela…...

HTML5、CSS3面试题(二)

上一章:HTML5、CSS3面试题(一) 哪些是块级元素那些是行内元素,各有什么特点 ?(必会) 行内元素: a、span、b、img、strong、input、select、lable、em、button、textarea 、selecting 块级元素&#xff1…...

sqllab第十一关通关笔记

知识点: 发现登录框就可以尝试注入登录框一般都是字符型注入通过注入可以获取其他表的信息绕过手段 单引号闭合联合注入也可以进行错误注入 首先看界面是一个登录框;通过admin admin登录进去,发现页面会把用户名和密码的登录信息打印出来&am…...

机械女生,双非本985硕,目前学了C 基础知识,转嵌入式还是java更好?

作为单片机项目开发的卖课佬,个人建议,先转嵌入式单片机开发方向,哈哈。 java我也学过,还学过oracle、mysql数据库,只是当时没做笔记,找不好充分的装逼证据了。 从实习通过业余时间,学到快正式毕…...

Python之字符串操作大全(29种方法)

本章详细介绍了常用的29种字符串操作方法及代码示例。 1. 重复输出字符串 print(x * 20) 输出:xxxxxxxxxxxxxxxxxxxx 2. 通过索引获取字符串 print(hello world[2:5]) 输出:llo 3. in 判断字符是否在字符串内 print(e in hello world) 输出&…...

ArcGIS学习(十五)用地适宜性评价

ArcGIS学习(十五)用地适宜性评价 本任务给大家带来的内容是用地适宜性评价。 用地适宜性评价是大家在平时工作中最常接触到的分析场景之一。尤其是在国土空间规划的大背景下,用地适宜性评价变得越来越重要。 此外,我们之前的任务主要是使用矢量数据进行分析。本案例是主讲…...

【matlab】如何将.mat文件与.nii文件互转

【matlab】如何将.mat文件与.nii文件互转 .mat转为.nii文件 有时候代码需要读取的是.nii文件,但是如何现有的数据是.mat格式,需要将.mata转化为.nii文件 1、先加载.mat文件 % 加载.mat文件 load(your_mat_file.mat); % 请将your_mat_file.mat替换为实…...

Uni-app开发Canvas当子组件示例,点点绘制图形

前言 使用Uni-app 实现封装一个Canvas渲染的子组件,实现通过传入两点绘制一条完整的路程 具体逻辑看我发的后端和数据库设计 C# 根据两点名称,寻找两短路程的最优解,【有数据库设计,完整代码】 即使不了解具体逻辑,该…...

从金蝶云星空到钉钉通过接口配置打通数据

从金蝶云星空到钉钉通过接口配置打通数据 对接系统金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在帮助企业打造面…...

Unreal发布Android在刘海屏手机上不能全屏显示问题

Unreal 4.27发布Android在刘海屏手机上不能全屏显示问题 Android设置全屏刘海屏全屏设置4.27设置刘海屏在部分手机不能显示问题 Android设置全屏 AndroidManifest.xml文件配置 ...<activity android:name"com.epicgames.ue4.GameActivity" android:label"st…...

hive库表占用空间大小的命令

1、查每个hive表占用的空间大小 hdfs dfs -du -h /user/hive/warehouse 2、按占用空间大小降序排列 hdfs dfs -du /user/hive/warehouse/ipms.db | sort -nr 3、查某一个分区占用空间大小&#xff08;单位G) hadoop fs -ls /user/hive/warehouse/ipms.db/dw_ft_se_nt_u_gen…...

关于go中的select

笔记仓库&#xff1a;gitee.com/xiaoyinhui 代码中的解释纯个人理解&#xff0c;有不对的望指出 package testsimport ("fmt""testing" )var uCnt int 0func TestSelece(t *testing.T) {// 对于 select 语句&#xff0c;在进入该语句时&#xff0c;会按源…...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

es6+和css3新增的特性有哪些

一&#xff1a;ECMAScript 新特性&#xff08;ES6&#xff09; ES6 (2015) - 革命性更新 1&#xff0c;记住的方法&#xff0c;从一个方法里面用到了哪些技术 1&#xff0c;let /const块级作用域声明2&#xff0c;**默认参数**&#xff1a;函数参数可以设置默认值。3&#x…...

从零开始了解数据采集(二十八)——制造业数字孪生

近年来&#xff0c;我国的工业领域正经历一场前所未有的数字化变革&#xff0c;从“双碳目标”到工业互联网平台的推广&#xff0c;国家政策和市场需求共同推动了制造业的升级。在这场变革中&#xff0c;数字孪生技术成为备受关注的关键工具&#xff0c;它不仅让企业“看见”设…...

DeepSeek越强,Kimi越慌?

被DeepSeek吊打的Kimi&#xff0c;还有多少人在用&#xff1f; 去年&#xff0c;月之暗面创始人杨植麟别提有多风光了。90后清华学霸&#xff0c;国产大模型六小虎之一&#xff0c;手握十几亿美金的融资。旗下的AI助手Kimi烧钱如流水&#xff0c;单月光是投流就花费2个亿。 疯…...

Linux操作系统共享Windows操作系统的文件

目录 一、共享文件 二、挂载 一、共享文件 点击虚拟机选项-设置 点击选项&#xff0c;设置文件夹共享为总是启用&#xff0c;点击添加&#xff0c;可添加需要共享的文件夹 查询是否共享成功 ls /mnt/hgfs 如果显示Download&#xff08;这是我共享的文件夹&#xff09;&…...

CMS内容管理系统的设计与实现:多站点模式的实现

在一套内容管理系统中&#xff0c;其实有很多站点&#xff0c;比如企业门户网站&#xff0c;产品手册&#xff0c;知识帮助手册等&#xff0c;因此会需要多个站点&#xff0c;甚至PC、mobile、ipad各有一个站点。 每个站点关联的有站点所在目录及所属的域名。 一、站点表设计…...

k8s从入门到放弃之Pod的容器探针检测

k8s从入门到放弃之Pod的容器探针检测 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;容器探测是指kubelet对容器执行定期诊断的过程&#xff0c;以确保容器中的应用程序处于预期的状态。这些探测是保障应用健康和高可用性的重要机制。Kubernetes提供了两种种类型…...