Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql
这是仿真过程某图:
kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收
数据落地情况:

全部接收到并all存入mysql
下面就简单分享一下StructuredStreaming代码吧
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession = SparkSession.builder().appName("kafkaConsumer").master("local[3]").getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType = new StructType().add("id", dataType = IntegerType).add("name", dataType = StringType).add("sorce", dataType = IntegerType)val message: DataFrame = spark.readStream // message为从kafka读到的原数据.format("kafka").option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092").option("subscribe", "xxxx").option("startingOffsets", "latest").load()// 将json字符串转化为结构化数据val streamData: DataFrame = message.selectExpr("cast(value as String) as message") .select(from_json($"message", Jsonschmea).alias("data"))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"val sqluser = "xxxx"val sqlpass = "xxxxx"Class.forName("com.mysql.cj.jdbc.Driver") // mysql 8.0后得驱动,旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords =>val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"val preparedStatement = connection.prepareStatement(insertsql)partitionOfRecords.foreach {row =>
// val id = row.getAs[Int]("data.id")
// val name = row.getAs[String]("data.name")
// val score = row.getAs[Int]("data.sorce")val id = row.getAs[Row]("data").getAs[Int]("id")val name = row.getAs[Row]("data").getAs[String]("name")val sorce = row.getAs[Row]("data").getAs[Int]("sorce")// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close() // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch.start().awaitTermination()
存储按照一定批次量做存储
友情提示 : 上述程序是经过脱敏处理的哦
----彩蛋----
如果你看到者你会知道scala在11更新之后也就是12版本如下:
batchDF.foreachPartition {partitionOfRecords => ... 这个位置
Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理
所以更改后为
batchDF.rdd.foreachPartition { partitionOfRecords => ...
而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储
ok -----
相关文章:
Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql
这是仿真过程某图: 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收 数据落地情况: 全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.function…...
IDEA如何删除git最新一次远程提交
IDEA如何删除git最新一次远程提交 选择应用 -> Git -> Show History 选择最新提交上一次提交 -> Reset Current Branch to Here… Reset 提示框选择 Hard push到远程分支 -> 选择Force Push 结果验证 (最新分支已被删除)...
什么是单向数据流
单向数据流是一种数据流动的模式,通常用于前端框架(如 React、Vue 等)中。在单向数据流中,数据只能从一个方向流向另一个方向,不会出现数据的双向流动。这种模式有助于简化数据的管理和状态的维护,提高代码…...
Qt 线程池 QThreadPool
一.Qt 线程池 QThreadPool介绍 Qt线程池是一种管理多个线程的并发编程模型,通过使用线程池可以提高性能、控制并发度、提供任务队列和简化线程管理。 在Qt中,线程池的使用主要涉及以下几个步骤: 创建任务类:需要定义一个任务类&am…...
【兔子机器人】实现从初始状态到站立
一、遥想星空up主的方法 由于我有卡位结构,无法做到劈腿,而且底盘也不一样,无法使用此方法 但是其代码思想是可以借鉴的。 参考视频: 【【开源啦!】无刷轮腿平衡机器人】 【精准空降到 01:16】 https://www.bilibili…...
ImportError: cannot import name ‘open_filename‘ from ‘pdfminer.utils‘已搞定
报错内容 ImportError: cannot import name ‘open_filename’ from ‘pdfminer.utils’ 第一步:pip uninstall pdfminer 解决办法 pip3 install pdfminer.six注意不要 pip install pdfminer.six是安装不了的...
一文解决Word中公式插入问题(全免费/latex公式输入/texsWord)
分文不花,搞定你的word公式输入/texsWord完全使用指南 背景 碎碎念:折折腾腾至少装了几个小时,遇到了若干大坑。遇到的问题网上都搜索不到答案!!!就让我来当指路的小火柴吧。 本篇适用于在word中输入la…...
C语言实战——扫雷游戏
目录 1. 扫雷游戏分析和设计2.扫雷游戏的代码实现 1. 扫雷游戏分析和设计 1.1扫雷游戏的功能说明 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现继续玩或者退出游戏扫雷的棋盘是9*9的格子默认随机布置10个雷可以排查雷 如果位置不是雷,就显示周围有几个雷 如果…...
.Net使用ElasticSearch
文章目录 前言主体内容一.Kibana中ElasticSearch的基础操作1.GET(查询)1.POST(新增)1.PUT(修改)1.DELET(删除) 二.在.Net中,对ElasticSearch进行基础操作1.DotNet连接Ela…...
HTML5、CSS3面试题(二)
上一章:HTML5、CSS3面试题(一) 哪些是块级元素那些是行内元素,各有什么特点 ?(必会) 行内元素: a、span、b、img、strong、input、select、lable、em、button、textarea 、selecting 块级元素࿱…...
sqllab第十一关通关笔记
知识点: 发现登录框就可以尝试注入登录框一般都是字符型注入通过注入可以获取其他表的信息绕过手段 单引号闭合联合注入也可以进行错误注入 首先看界面是一个登录框;通过admin admin登录进去,发现页面会把用户名和密码的登录信息打印出来&am…...
机械女生,双非本985硕,目前学了C 基础知识,转嵌入式还是java更好?
作为单片机项目开发的卖课佬,个人建议,先转嵌入式单片机开发方向,哈哈。 java我也学过,还学过oracle、mysql数据库,只是当时没做笔记,找不好充分的装逼证据了。 从实习通过业余时间,学到快正式毕…...
Python之字符串操作大全(29种方法)
本章详细介绍了常用的29种字符串操作方法及代码示例。 1. 重复输出字符串 print(x * 20) 输出:xxxxxxxxxxxxxxxxxxxx 2. 通过索引获取字符串 print(hello world[2:5]) 输出:llo 3. in 判断字符是否在字符串内 print(e in hello world) 输出&…...
ArcGIS学习(十五)用地适宜性评价
ArcGIS学习(十五)用地适宜性评价 本任务给大家带来的内容是用地适宜性评价。 用地适宜性评价是大家在平时工作中最常接触到的分析场景之一。尤其是在国土空间规划的大背景下,用地适宜性评价变得越来越重要。 此外,我们之前的任务主要是使用矢量数据进行分析。本案例是主讲…...
【matlab】如何将.mat文件与.nii文件互转
【matlab】如何将.mat文件与.nii文件互转 .mat转为.nii文件 有时候代码需要读取的是.nii文件,但是如何现有的数据是.mat格式,需要将.mata转化为.nii文件 1、先加载.mat文件 % 加载.mat文件 load(your_mat_file.mat); % 请将your_mat_file.mat替换为实…...
Uni-app开发Canvas当子组件示例,点点绘制图形
前言 使用Uni-app 实现封装一个Canvas渲染的子组件,实现通过传入两点绘制一条完整的路程 具体逻辑看我发的后端和数据库设计 C# 根据两点名称,寻找两短路程的最优解,【有数据库设计,完整代码】 即使不了解具体逻辑,该…...
从金蝶云星空到钉钉通过接口配置打通数据
从金蝶云星空到钉钉通过接口配置打通数据 对接系统金蝶云星空 金蝶K/3Cloud(金蝶云星空)是移动互联网时代的新型ERP,是基于WEB2.0与云技术的新时代企业管理服务平台。金蝶K/3Cloud围绕着“生态、人人、体验”,旨在帮助企业打造面…...
Unreal发布Android在刘海屏手机上不能全屏显示问题
Unreal 4.27发布Android在刘海屏手机上不能全屏显示问题 Android设置全屏刘海屏全屏设置4.27设置刘海屏在部分手机不能显示问题 Android设置全屏 AndroidManifest.xml文件配置 ...<activity android:name"com.epicgames.ue4.GameActivity" android:label"st…...
hive库表占用空间大小的命令
1、查每个hive表占用的空间大小 hdfs dfs -du -h /user/hive/warehouse 2、按占用空间大小降序排列 hdfs dfs -du /user/hive/warehouse/ipms.db | sort -nr 3、查某一个分区占用空间大小(单位G) hadoop fs -ls /user/hive/warehouse/ipms.db/dw_ft_se_nt_u_gen…...
关于go中的select
笔记仓库:gitee.com/xiaoyinhui 代码中的解释纯个人理解,有不对的望指出 package testsimport ("fmt""testing" )var uCnt int 0func TestSelece(t *testing.T) {// 对于 select 语句,在进入该语句时,会按源…...
wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...
倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...
