当前位置: 首页 > news >正文

Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

目录

0. 相关文章链接

1. 输出的选项

2. 输出模式(output mode)

2.1. Append 模式(默认)

2.2. Complete 模式

2.3. Update 模式

2.4. 输出模式总结

3. 输出接收器(output sink)

3.1. file sink

3.2. kafka sink

3.2.1. 以 Streaming 方式输出数据

3.2.2. 以 batch 方式输出数据

3.3. console sink

3.4. memory sink

3.5. foreach sink

3.6. ForeachBatch Sink


0. 相关文章链接

 Spark文章汇总 

1. 输出的选项

一旦定义了最终结果DataFrame / Dataset,剩下的就是开始流式计算。为此,必须使用返回的 DataStreamWriter Dataset.writeStream()。

需要指定一下选项:

  • 输出接收器的详细信息:数据格式,位置等。
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:可选,指定查询的唯一名称以进行标识。
  • 触发间隔:可选择指定触发间隔。如果未指定,则系统将在前一处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过了触发时间,则系统将立即触发处理。
  • 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的目录。

2. 输出模式(output mode)

2.1. Append 模式(默认)

        默认输出模式, 仅仅添加到结果表的新行才会输出。采用这种输出模式, 可以保证每行数据仅输出一次。在查询过程中, 如果没有使用 watermask 机制, 则不能使用聚合操作。 如果使用了 watermask 机制, 则只能使用基于 event-time 的聚合操作。watermask 用于高速 append 模式如何输出不会再发生变动的数据。 即只有过期的聚合结果才会在 Append 模式中被“有且仅有一次”的输出。

2.2. Complete 模式

每次触发, 整个结果表的数据都会被输出。 仅仅聚合操作才支持。同时该模式使用 watermask 无效。

2.3. Update 模式

        该模式在 从 spark 2.1.1 可用. 在处理完数据之后, 该模式只输出相比上个批次变动的内容(新增或修改)。如果没有聚合操作, 则该模式与 append 模式一样。如果有聚合操作, 则可以基于 watermast 清理过期的状态。

2.4. 输出模式总结

不同的查询支持不同的输出模式

3. 输出接收器(output sink)

spark 提供了几个内置的 output-sink,不同 output sink 所适用的 output mode 不尽相同:

SinkSupported Output ModesOptionsFault-tolerantNotes
File SinkAppendpath: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for “parquet” format options see DataFrameWriter.parquet()Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
ForeachBatch SinkAppend, Update, CompleteNoneDepends on the implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) truncate: Whether to truncate the output if too long (default: true)No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.

3.1. file sink

存储输出到目录中 仅仅支持 append 模式

需求: 把单词和单词的反转组成 json 格式写入到目录中。

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 数据计算val words: DataFrame = lines.as[String].flatMap((line: String) => {line.split("\\W+").map((word: String) => {(word, word.reverse)})}).toDF("原单词", "反转单词")// 结果输出words.writeStream.outputMode("append").format("json") // 支持 "orc", "json", "csv".option("path", "./filesink") // 输出目录.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录.start.awaitTermination()// 关闭执行环境spark.stop()}
}

输出的数据:

{"原单词":"abc","反转单词":"cba"}

3.2. kafka sink

将 wordcount 结果写入到 kafka

写入到 kafka 的时候应该包含如下列:

ColumnType
key (optional)string or binary
value (required)string or binary
topic (optional)string

注意:

  • 如果没有添加 topic option 则 topic 列必须有.
  • kafka sink 三种输出模式都支持

3.2.1. 以 Streaming 方式输出数据

这种方式使用流的方式源源不断的向 kafka 写入数据:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 数据计算val words = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count().map((row: Row) => row.getString(0) + "," + row.getLong(1)).toDF("value") // 写入数据时候, 必须有一列 "value"words.writeStream.outputMode("update").format("kafka").trigger(Trigger.ProcessingTime(0)).option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置.option("topic", "update") // kafka 主题.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录.start.awaitTermination()// 关闭执行环境spark.stop()}
}

3.2.2. 以 batch 方式输出数据

这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._val wordCount: DataFrame = spark.sparkContext.parallelize(Array("hello hello abc", "abc, hello")).toDF("word").groupBy("word").count().map(row => row.getString(0) + "," + row.getLong(1)).toDF("value") // 写入数据时候, 必须有一列 "value"wordCount.write // batch 方式.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置.option("topic", "update") // kafka 主题.save()// 关闭执行环境spark.stop()}
}

3.3. console sink

主要用于测试数据输出

3.4. memory sink

该 sink 也是用于测试, 将其统计结果全部输入内中指定的表中, 然后可以通过 sql 与从表中查询数据。

如果数据量非常大, 可能会发送内存溢出:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestamp
import java.util.{Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval words: DataFrame = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count()val query: StreamingQuery = words.writeStream.outputMode("complete").format("memory") // memory sink.queryName("word_count") // 内存临时表名.start// 测试使用定时器执行查询表val timer: Timer = new Timer(true)val task: TimerTask = new TimerTask {override def run(): Unit = spark.sql("select * from word_count").show}timer.scheduleAtFixedRate(task, 0, 2000)query.awaitTermination()// 关闭执行环境spark.stop()}
}

3.5. foreach sink

foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出;把 wordcount 数据写入到 mysql。

注意(需要在依赖中添加MySQL的驱动依赖):

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>

建表语句如下所示:

create database ss;
use ss;
create table word_count
(word  varchar(255) primary key not null,count bigint                   not null
);

代码示例如下:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval wordCount: DataFrame = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count()val query: StreamingQuery = wordCount.writeStream.outputMode("update")// 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现. 每个批次的所有分区都会创建 ForeeachWriter 实例.foreach(new ForeachWriter[Row] {var conn: Connection = _var ps: PreparedStatement = _var batchCount = 0// 一般用于 打开链接. 返回 false 表示跳过该分区的数据,override def open(partitionId: Long, epochId: Long): Boolean = {println("open ..." + partitionId + "  " + epochId)Class.forName("com.mysql.jdbc.Driver")conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")// 插入数据, 当有重复的 key 的时候更新val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"ps = conn.prepareStatement(sql)conn != null && !conn.isClosed && ps != null}// 把数据写入到连接override def process(value: Row): Unit = {println("process ...." + value)val word: String = value.getString(0)val count: Long = value.getLong(1)ps.setString(1, word)ps.setLong(2, count)ps.setString(3, word)ps.setLong(4, count)ps.execute()}// 用户关闭连接override def close(errorOrNull: Throwable): Unit = {println("close...")ps.close()conn.close()}}).startquery.awaitTermination()// 关闭执行环境spark.stop()}
}

3.6. ForeachBatch Sink

ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。将统计结果同时输出到本地文件和 mysql 中。

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Properties, Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval wordCount: DataFrame = lines.as[String].flatMap(_.split("\\W+")).groupBy("value").count()val props: Properties = new Properties()props.setProperty("user", "root")props.setProperty("password", "aaa")val query: StreamingQuery = wordCount.writeStream.outputMode("complete").foreachBatch((df: Dataset[Row], batchId: Long) => { // 当前分区id, 当前批次idif (df.count() != 0) {df.cache()df.write.json(s"./$batchId")df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop201:3306/ss", "word_count", props)}}).start()query.awaitTermination()// 关闭执行环境spark.stop()}
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


相关文章:

Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

目录 0. 相关文章链接 1. 输出的选项 2. 输出模式(output mode) 2.1. Append 模式(默认) 2.2. Complete 模式 2.3. Update 模式 2.4. 输出模式总结 3. 输出接收器(output sink) 3.1. file sink 3.2. kafka sink 3.2.1. 以 Streaming 方式输出数据 3.2.2. 以 batch …...

【云原生】Docker 详解(一):从虚拟机到容器

Docker 详解&#xff08;一&#xff09;&#xff1a;从虚拟机到容器 1.虚拟化 要解释清楚 Docker&#xff0c;首先要解释清楚 容器&#xff08;Container&#xff09;的概念。要解释容器的话&#xff0c;就需要从操作系统说起。操作系统太底层&#xff0c;细说的话一两本书都说…...

代码随想录第48天 | 198. 打家劫舍、213. 打家劫舍II、337. 打家劫舍III

198. 打家劫舍 当前房屋偷与不偷取决于 前一个房屋和前两个房屋是否被偷了。 递归五部曲&#xff1a; dp[i]&#xff1a;考虑下标i&#xff08;包括i&#xff09;以内的房屋&#xff0c;最多可以偷窃的金额为dp[i]。决定dp[i]的因素就是第i房间偷还是不偷。 如果偷第i房间&…...

【LeetCode】按摩师

按摩师 题目描述算法分析编程代码 链接: 按摩师 题目描述 算法分析 编程代码 class Solution { public:int massage(vector<int>& nums) {int n nums.size();if(n 0) return 0;vector<int> f(n);auto g f;f[0] nums[0];for(int i 1;i<n;i){f[i] g[i…...

国际腾讯云账号云核算概述!!

云核算概述 维基百科界说&#xff1a;云核算是一种依据互联网的新型核算方法&#xff0c;经过互联网上异构、自治的服务为个人和企业供给按需即取的核算。 云核算描绘的一起特征&#xff1a;云是一种按需运用的服务&#xff0c;运用者只重视服务本身。 云核算作为IT服务形式&am…...

.NET 6.0 重启 IIS 进程池

在 .NET 6.0 中&#xff0c;你可以使用 Microsoft.Web.Administration 命名空间提供的 API 来管理 IIS 进程池并实现重启操作。以下是一个示例代码&#xff0c;展示如何使用 .NET 6.0 中的 Microsoft.Web.Administration 来重启 IIS 进程池&#xff1a; using Microsoft.Web.A…...

一位心理学教师对ChatGPT的看法,提到了正确地使用它的几个要点

在没有自主学习能力和有自主学习能力的两类学生中&#xff0c;ChatGPT的出现&#xff0c;会加大他们在知识学习及思维发展上的鸿沟。爱学习的人会因为AI变得更好…… 从2022年年底起&#xff0c;ChatGPT的技术突破使人类终于进入了一个AI被广泛应用在工作、学习、生活的时代。…...

认识Node.js及三个模块

文章目录 1.初识 Node.js1.1 什么是 Node.js1.2 Node.js 中的 JavaScript 运行环境1.3 Node.js 可以做什么1.4 Node.js 环境的安装1.4.1 区分 LTS 版本和 Current 版本的不同1.4.2 查看已安装的 Node.js 的版本号1.4.3 什么是终端1.4.4 终端中的快捷键 1.5 在 Node.js 环境中执…...

49 | 公司销售数据分析

公司销售数据分析报告 本数据是2012~2014年间一家生产体育类产品的全球销售订单数据,分别按时间、产品类别、销售国家统计产品销售情况,分析销售额和利润额统计各产品市场占有份额,为下一步生产计划提供有价值的建议。 数据大小:88475 行, 11 列 Retailer country销售国…...

Android 项目导入高德SDK初次上手

文章目录 一、前置知识&#xff1a;二、学习目标三、学习资料四、操作过程1、创建空项目2、高德 SDK 环境接入2.1 获取高德 key2.2下载 SDK 并导入2.2.1、下载SDK 文件2.2.2、SDK 导入项目2.2.3、清单文件配置2.2.4、隐私权限 3、显示地图 一、前置知识&#xff1a; 1、Java 基…...

生成树协议用来解决网络风暴的问题?(第三十二课)

生成树协议用来解决网络风暴的问题?(第三十二课) 一 STP RSTP MSTP 介绍 STP(Spanning Tree Protocol)、RSTP(Rapid Spanning Tree Protocol)和MSTP(Multiple Spanning Tree Protocol)都是用于网络中避免环路的协议。 STP是最初的协议,它通过将某些端口阻塞来防止…...

git分支操作

Git分支的操作 1.1 Git分支简介 Git分支是由指针管理起来的&#xff0c;所以创建、切换、合并、删除分支都非常快&#xff0c;非常适合大型项目的开发。 在分支上做开发&#xff0c;调试好了后再合并到主分支。那么每个人开发模块式都不会影响到别人。 分支使用策略&#xf…...

【基础学习笔记 enum】TypeScript 中的 enum 枚举类型介绍

因为之前网上查好多博客都是只说最基础的&#xff0c;所以这里记录一下&#xff0c;最基础的放在最后面。 这里重点要记录的是枚举成员的值可以是字符串&#xff08;字符串枚举&#xff0c;因为网上大部分只介绍常数枚举&#xff09;&#xff0c;需要注意的一点是&#xff0c;…...

SpringBoot中间件使用之EventBus、Metric、CommandLineRunner

1、EventBus 使用EventBus 事件总线的方式可以实现消息的发布/订阅功能&#xff0c;EventBus是一个轻量级的消息服务组件&#xff0c;适用于Android和Java。 // 1.注册事件通过 EventBus.getDefault().register(); // 2.发布事件 EventBus.getDefault().post(“事件内容”); …...

ffmpeg命令行是如何打开vf_scale滤镜的

前言 在ffmpeg命令行中&#xff0c;ffmpeg -i test -pix_fmt rgb24 test.rgb&#xff0c;会自动打开ff_vf_scale滤镜&#xff0c;本章主要追踪这个流程。 通过gdb可以发现其基本调用栈如下&#xff1a; 可以看到&#xff0c;query_formats&#xff08;&#xff09;中创建的v…...

【Vue3】自动引入插件-`unplugin-auto-import`

Vue3自动引入插件-unplugin-auto-import&#xff0c;不必再手动 import 。 自动导入 api 按需为 Vite, Webpack, Rspack, Rollup 和 esbuild 。支持TypeScript。由unplugin驱动。 插件安装&#xff1a;unplugin-auto-import 配置vite.config.ts&#xff08;配置完后需要重启…...

每日温度(力扣)单调栈 JAVA

给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 示例 1: 输入: temperatur…...

博客项目(Spring Boot)

1.需求分析 注册功能&#xff08;添加用户操纵&#xff09;登录功能&#xff08;查询操作)我的文章列表页&#xff08;查询我的文章|文章修改|文章详情|文章删除&#xff09;博客编辑页&#xff08;添加文章操作&#xff09;所有人博客列表&#xff08;带分页功能&#xff09;…...

修改Jenkins存储目录

注意&#xff1a;在Jenkins运行时是不能更改的. 请先将Jenkins停止运行。 1、windows环境下更改JENKINS的主目录 Windows环境中&#xff0c;Jenkins主目录默认在C:Documents and SettingsAAA.jenkins 。可以通过设置环境变量来修改&#xff0c;例如&#xff1a; JENKINS_HOME…...

数据结构【第4章】——栈与队列

队列是只允许在一端进行插入操作、而在另-端进行删除操作的线性表。 栈 栈与队列&#xff1a;栈是限定仅在表尾进行插入和删除操作的线性表。 我们把允许插入和删除的一端称为栈顶&#xff08;top&#xff09;&#xff0c;另一端称为栈底&#xff08;bottom&#xff09;&…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...