当前位置: 首页 > news >正文

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType,  StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession = SparkSession.builder().appName("kafkaConsumer").master("local[3]").getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType = new StructType().add("id", dataType = IntegerType).add("name", dataType = StringType).add("sorce", dataType = IntegerType)val message: DataFrame = spark.readStream // message为从kafka读到的原数据.format("kafka").option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092").option("subscribe", "xxxx").option("startingOffsets", "latest").load()// 将json字符串转化为结构化数据val streamData: DataFrame = message.selectExpr("cast(value as String) as message") .select(from_json($"message", Jsonschmea).alias("data"))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"val sqluser = "xxxx"val sqlpass = "xxxxx"Class.forName("com.mysql.cj.jdbc.Driver")  // mysql 8.0后得驱动,旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords =>val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"val preparedStatement = connection.prepareStatement(insertsql)partitionOfRecords.foreach {row =>
//              val id = row.getAs[Int]("data.id")
//              val name = row.getAs[String]("data.name")
//              val score = row.getAs[Int]("data.sorce")val id = row.getAs[Row]("data").getAs[Int]("id")val name = row.getAs[Row]("data").getAs[String]("name")val sorce = row.getAs[Row]("data").getAs[Int]("sorce")// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close()  // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch.start().awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

相关文章:

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

这是仿真过程某图: 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收 数据落地情况: 全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.function…...

IDEA如何删除git最新一次远程提交

IDEA如何删除git最新一次远程提交 选择应用 -> Git -> Show History 选择最新提交上一次提交 -> Reset Current Branch to Here… Reset 提示框选择 Hard push到远程分支 -> 选择Force Push 结果验证 (最新分支已被删除)...

什么是单向数据流

单向数据流是一种数据流动的模式,通常用于前端框架(如 React、Vue 等)中。在单向数据流中,数据只能从一个方向流向另一个方向,不会出现数据的双向流动。这种模式有助于简化数据的管理和状态的维护,提高代码…...

Qt 线程池 QThreadPool

一.Qt 线程池 QThreadPool介绍 Qt线程池是一种管理多个线程的并发编程模型,通过使用线程池可以提高性能、控制并发度、提供任务队列和简化线程管理。 在Qt中,线程池的使用主要涉及以下几个步骤: 创建任务类:需要定义一个任务类&am…...

【兔子机器人】实现从初始状态到站立

一、遥想星空up主的方法 由于我有卡位结构,无法做到劈腿,而且底盘也不一样,无法使用此方法 但是其代码思想是可以借鉴的。 参考视频: 【【开源啦!】无刷轮腿平衡机器人】 【精准空降到 01:16】 https://www.bilibili…...

ImportError: cannot import name ‘open_filename‘ from ‘pdfminer.utils‘已搞定

报错内容 ImportError: cannot import name ‘open_filename’ from ‘pdfminer.utils’ 第一步:pip uninstall pdfminer 解决办法 pip3 install pdfminer.six注意不要 pip install pdfminer.six是安装不了的...

一文解决Word中公式插入问题(全免费/latex公式输入/texsWord)

分文不花,搞定你的word公式输入/texsWord完全使用指南 背景 碎碎念:折折腾腾至少装了几个小时,遇到了若干大坑。遇到的问题网上都搜索不到答案!!!就让我来当指路的小火柴吧。 本篇适用于在word中输入la…...

C语言实战——扫雷游戏

目录 1. 扫雷游戏分析和设计2.扫雷游戏的代码实现 1. 扫雷游戏分析和设计 1.1扫雷游戏的功能说明 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现继续玩或者退出游戏扫雷的棋盘是9*9的格子默认随机布置10个雷可以排查雷 如果位置不是雷,就显示周围有几个雷 如果…...

.Net使用ElasticSearch

文章目录 前言主体内容一.Kibana中ElasticSearch的基础操作1.GET(查询)1.POST(新增)1.PUT(修改)1.DELET(删除) 二.在.Net中,对ElasticSearch进行基础操作1.DotNet连接Ela…...

HTML5、CSS3面试题(二)

上一章:HTML5、CSS3面试题(一) 哪些是块级元素那些是行内元素,各有什么特点 ?(必会) 行内元素: a、span、b、img、strong、input、select、lable、em、button、textarea 、selecting 块级元素&#xff1…...

sqllab第十一关通关笔记

知识点: 发现登录框就可以尝试注入登录框一般都是字符型注入通过注入可以获取其他表的信息绕过手段 单引号闭合联合注入也可以进行错误注入 首先看界面是一个登录框;通过admin admin登录进去,发现页面会把用户名和密码的登录信息打印出来&am…...

机械女生,双非本985硕,目前学了C 基础知识,转嵌入式还是java更好?

作为单片机项目开发的卖课佬,个人建议,先转嵌入式单片机开发方向,哈哈。 java我也学过,还学过oracle、mysql数据库,只是当时没做笔记,找不好充分的装逼证据了。 从实习通过业余时间,学到快正式毕…...

Python之字符串操作大全(29种方法)

本章详细介绍了常用的29种字符串操作方法及代码示例。 1. 重复输出字符串 print(x * 20) 输出:xxxxxxxxxxxxxxxxxxxx 2. 通过索引获取字符串 print(hello world[2:5]) 输出:llo 3. in 判断字符是否在字符串内 print(e in hello world) 输出&…...

ArcGIS学习(十五)用地适宜性评价

ArcGIS学习(十五)用地适宜性评价 本任务给大家带来的内容是用地适宜性评价。 用地适宜性评价是大家在平时工作中最常接触到的分析场景之一。尤其是在国土空间规划的大背景下,用地适宜性评价变得越来越重要。 此外,我们之前的任务主要是使用矢量数据进行分析。本案例是主讲…...

【matlab】如何将.mat文件与.nii文件互转

【matlab】如何将.mat文件与.nii文件互转 .mat转为.nii文件 有时候代码需要读取的是.nii文件,但是如何现有的数据是.mat格式,需要将.mata转化为.nii文件 1、先加载.mat文件 % 加载.mat文件 load(your_mat_file.mat); % 请将your_mat_file.mat替换为实…...

Uni-app开发Canvas当子组件示例,点点绘制图形

前言 使用Uni-app 实现封装一个Canvas渲染的子组件,实现通过传入两点绘制一条完整的路程 具体逻辑看我发的后端和数据库设计 C# 根据两点名称,寻找两短路程的最优解,【有数据库设计,完整代码】 即使不了解具体逻辑,该…...

从金蝶云星空到钉钉通过接口配置打通数据

从金蝶云星空到钉钉通过接口配置打通数据 对接系统金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在帮助企业打造面…...

Unreal发布Android在刘海屏手机上不能全屏显示问题

Unreal 4.27发布Android在刘海屏手机上不能全屏显示问题 Android设置全屏刘海屏全屏设置4.27设置刘海屏在部分手机不能显示问题 Android设置全屏 AndroidManifest.xml文件配置 ...<activity android:name"com.epicgames.ue4.GameActivity" android:label"st…...

hive库表占用空间大小的命令

1、查每个hive表占用的空间大小 hdfs dfs -du -h /user/hive/warehouse 2、按占用空间大小降序排列 hdfs dfs -du /user/hive/warehouse/ipms.db | sort -nr 3、查某一个分区占用空间大小&#xff08;单位G) hadoop fs -ls /user/hive/warehouse/ipms.db/dw_ft_se_nt_u_gen…...

关于go中的select

笔记仓库&#xff1a;gitee.com/xiaoyinhui 代码中的解释纯个人理解&#xff0c;有不对的望指出 package testsimport ("fmt""testing" )var uCnt int 0func TestSelece(t *testing.T) {// 对于 select 语句&#xff0c;在进入该语句时&#xff0c;会按源…...

【Node.js从基础到高级运用】十一、构建RESTful API

在本篇博客中&#xff0c;我们将综合之前讨论的内容&#xff0c;深入探索如何使用Node.js构建一个RESTful API。我们将重点讨论设计合理的API端点&#xff0c;展示如何通过代码实现这些端点&#xff0c;并指导如何使用Postman测试我们的API&#xff0c;确保其按预期工作。 前提…...

Python和MATLAB数字信号波形和模型模拟

要点 Python和MATLAB实现以下波形和模型模拟 以给定采样率模拟正弦信号&#xff0c;生成给定参数的方波信号&#xff0c;生成给定参数隔离矩形脉冲&#xff0c;生成并绘制线性调频信号。快速傅里叶变换结果释义&#xff1a;复数离散傅里叶变换、频率仓和快速傅里叶变换移位&am…...

华为OD技术C卷“测试用例执行计划”Java解答

描述 示例 算法思路1 整体思路是&#xff0c;先读取特性的优先级和测试用例覆盖的特性列表&#xff0c;然后计算每个测试用例的优先级&#xff0c;并将其与测试用例的索引存储到二维数组中。最后按照优先级和索引排序&#xff0c;输出测试用例的索引&#xff0c;即为执行顺序。…...

solana 入门 1

solana-co-learn Solana 开发学习笔记(一)——从 Hello World 出发 安装开发环境 windows下环境配置 wsl First start with installing WSL on your system. wsl --install wsl安装Ubuntu 列出可用的分发版 wsl.exe --list --online显示&#xff1a; 以下是可安装的有效…...

JavaEE之多线程(创建线程的五种写法)详解

&#x1f63d;博主CSDN主页: 小源_&#x1f63d; &#x1f58b;️个人专栏: JavaEE &#x1f600;努力追逐大佬们的步伐~ 目录 1. 前言 2. 操作系统"内核" 3. 创建线程的五种写法 (我们重点要掌握最后一种写法!!) 3.1 继承 Thread, 重写 run 3. 2 实现 Runnabl…...

ChatGPT国内能用吗?中国用户怎么才能使用ChatGPT?

与ChatGPT类似的国内网站&#xff0c;他们都能提供和ChatGPT相似的能力&#xff0c;而且可以在国内直接使用。 点击直达方式 百科GPT官网&#xff1a;baikegpt.cn ChatGPT是基于GPT-3.5架构的语言模型的一个实例&#xff0c;由OpenAI开发。以下是ChatGPT的发展历史&#xff1…...

集群保持集群负载均衡和hash一致性

集群保持负载均衡和一致性哈希是在构建分布式系统时经常涉及的两个重要概念。 负载均衡&#xff1a; - 在集群中&#xff0c;负载均衡是指将传入的请求有效地分发到不同的服务器上&#xff0c;以确保每台服务器都能够处理适量的流量&#xff0c;避免某些服务器过载而造成性能问…...

吴恩达深度学习笔记:神经网络的编程基础2.9-2.14

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第二周&#xff1a;神经网络的编程基础 (Basics of Neural Network programming)2.9 逻辑回归中的梯度下降&#xff08;Logistic Regression Gradient Descent&#xff09; 第一门课&#xff…...

在C++项目中使用python脚本(四种)常见报错解决

上一期我们讲了如何在C中使用python脚本&#xff0c;这期讲讲过程中常会遇到的一些错误。 一、c代码未设置python路径 Py_SetPythonHome(L"D:\\anaconda3\\envs\\envpython3.7");这一句很重要&#xff0c;切记加上并且换成自己的路径 Py_SetPythonHome(L"D:\\a…...

微前端框架 qiankun 配置使用【基于 vue/react脚手架创建项目 】

qiankun官方文档&#xff1a;qiankun - qiankun 一、创建主应用&#xff1a; 这里以 vue 为主应用&#xff0c;vue版本&#xff1a;2.x // 全局安装vue脚手架 npm install -g vue/clivue create main-app 省略 vue 创建项目过程&#xff0c;若不会可以自行百度查阅教程 …...