当前位置: 首页 > news >正文

Spark实时(六):Output Sinks案例演示

文章目录

Output Sinks案例演示

一、​​​​​​​File sink

二、​​​​​​​​​​​​​​Memory Sink

三、​​​​​​​​​​​​​​Foreach Sink

1、​​​​​​​foreachBatch

2、​​​​​​​​​​​​​​foreach


Output Sinks案例演示

当我们对流式数据处理完成之后,可以将数据写出到Flie、Kafka、console控制台、memory内存,或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。

对于一些可以保证端到端容错的sink输出,需要指定checkpoint目录来写入数据信息,指定的checkpoint目录可以是HDFS中的某个路径,设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置,设置方式如下:

//通过SparkSession设置checkpoint
spark.conf.set("spark.sql.streaming.checkpointLocation","hdfs://mycluster/checkpintdir")或者//通过DataStreamWriter设置checkpoint
df.writeStream.format("xxx").option("checkpointLocation","./checkpointdir").start()

checkpoint目录中会有以下目录及数据:

  • offsets:记录偏移量目录,记录了每个批次的偏移量。
  • commits:记录已经完成的批次,方便重启任务检查完成的批次与offset批次做对比,继续offset消费数据,运行批次。
  • metadata:metadata元数据保存jobid信息。
  • sources:数据源各个批次读取详情。
  • sinks:数据sink写出批次情况。
  • state:记录状态值,例如:聚合、去重等场景会记录相应状态,会周期性的生成snapshot文件记录状态。

下面对File、memoery、foreach output Sink进行演示。

一、​​​​​​​​​​​​​​File sink

Flie Sink就是数据结果实时写入到执行目录下的文件中,每次写出都会形成一个新的文件,文件格式可以是parquet、orc、json、csv格式。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}/***  读取Socket数据,将数据写入到csv文件*/
object FileSink {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("File Sink").config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()val query: StreamingQuery = result.writeStream.format("csv").option("path", "./dataresult/csvdir").option("checkpointLocation","./checkpint/dir3").start()query.awaitTermination()}
}

 ​​​​​​​

在socket中输入数据之后,每批次数据写入到一个csv文件中。 

二、​​​​​​​​​​​​​​Memory Sink

memory Sink是将结果作为内存表存储在内存中,支持Append和Complete输出模式,这种结果写出到内存表方式多用于测试,如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery/***  读取scoket 数据写入memory 内存,再读取*/
object MemorySink {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("Memory Sink").config("spark.sql.shuffle.partitions", 1).getOrCreate()spark.sparkContext.setLogLevel("Error")val result: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()val query: StreamingQuery = result.writeStream.format("memory").queryName("mytable").start()//查询内存中表数据while(true){Thread.sleep(2000)spark.sql("""|select * from mytable""".stripMargin).show()}query.awaitTermination()}}

三、​​​​​​​​​​​​​​Foreach Sink

foreach 可以对输出的结果数据进行自定义处理逻辑,针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch,两者区别是foreach是针对一条条的数据进行自定义处理,foreachbatch是针对当前小批次数据进行自定义处理。

1、​​​​​​​foreachBatch

foreachBatch可以针对每个批次数据进行自定义处理,该方法需要传入一个函数,函数有2个参数,分别为当前批次数据对应的DataFrame和当前batchId。

案例:实时读取socket数据,将结果批量写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/***  读取Socket 数据,将数据写出到mysql中*/
object ForeachBatchTest {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import spark.implicits._val df: DataFrame = spark.readStream.format("socket").option("host", "node2").option("port", 9999).load()val personDF: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = personDF.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) => {println("batchID : " + batchId)batchDF.write.mode(SaveMode.Append).format("jdbc").option("url","jdbc:mysql://node3:3306/testdb?useSSL=false").option("user","root").option("password","123456").option("dbtable","person").save()}).start()query.awaitTermination();}}

运行结果: 

Java代码如下:

 

package com.lanson.structuredStreaming.sink;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachBatchTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark = SparkSession.builder().master("local").appName("ForeachBatchTest01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("socket").option("host", "node2").option("port", 9999).load().as(Encoders.STRING()).map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF("id", "name", "age");result.writeStream().foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {@Overridepublic void call(Dataset<Row> df, Long batchId) throws Exception {System.out.println("batchID : "+batchId);//将df 保存到mysqldf.write().format("jdbc").mode(SaveMode.Append).option("url","jdbc:mysql://node3:3306/testdb?useSSL=false" ).option("user","root" ).option("password","123456" ).option("dbtable","person" ).save();}}).start().awaitTermination();}
}

运行结果:

 

在mysql中创建testdb库,并创建person表,这里也可以不创建表:

create database testdb;
create table person(id int(10),name varchar(255),age int(2));
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:

2、​​​​​​​​​​​​​​foreach

foreach可以针对数据结果每条数据进行处理。

案例:实时读取socket数据,将结果一条条写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.execution.streaming.sources.ForeachWrite
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}object ForeachSinkTest {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()spark.sparkContext.setLogLevel("Error")import spark.implicits._val df: DataFrame = spark.readStream.format("socket").option("host", "node2").option("port", 9999).load()val personDF: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")personDF.writeStream.foreach(new ForeachWriter[Row]() {var  conn: Connection  = _var pst: PreparedStatement = _//打开资源override def open(partitionId: Long, epochId: Long): Boolean = {conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false","root","123456")pst = conn.prepareStatement("insert into person values (?,?,?)")true}//一条条处理数据override def process(row: Row): Unit = {val id: Int = row.getInt(0)val name: String = row.getString(1)val age: Int = row.getInt(2)pst.setInt(1,id)pst.setString(2,name)pst.setInt(3,age)pst.executeUpdate()}//关闭释放资源override def close(errorOrNull: Throwable): Unit = {pst.close()conn.close()}}).start().awaitTermination()}}

运行结果:

Java代码如下:

package com.lanson.structuredStreaming.sink;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachSinkTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("socket").option("host", "node2").option("port", 9999).load().as(Encoders.STRING()).map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF("id", "name", "age");result.writeStream().foreach(new ForeachWriter<Row>() {Connection conn;PreparedStatement pst ;@Overridepublic boolean open(long partitionId, long epochId) {try {conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false", "root", "123456");pst = conn.prepareStatement("insert into person values (?,?,?)");} catch (SQLException e) {e.printStackTrace();}return true;}@Overridepublic void process(Row row) {int id = row.getInt(0);String name = row.getString(1);int age = row.getInt(2);try {pst.setInt(1,id );pst.setString(2,name );pst.setInt(3,age );pst.executeUpdate();} catch (SQLException e) {e.printStackTrace();}}@Overridepublic void close(Throwable errorOrNull) {try {pst.close();conn.close();} catch (SQLException e) {e.printStackTrace();}}}).start().awaitTermination();}
}

运行

以上代码编写完成后,清空mysql person表数据,然后输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关文章:

Spark实时(六):Output Sinks案例演示

文章目录 Output Sinks案例演示 一、​​​​​​​File sink 二、​​​​​​​​​​​​​​Memory Sink 三、​​​​​​​​​​​​​​Foreach Sink 1、​​​​​​​foreachBatch 2、​​​​​​​​​​​​​​foreach Output Sinks案例演示 当我们对流式…...

在SQL编程中DROP、DELETE和TRUNCATE的区别

在SQL编程中&#xff0c;DROP、DELETE和TRUNCATE都是用于删除数据的命令&#xff0c;但它们之间有着显著的区别&#xff0c;主要体现在它们删除数据的范围、操作的不可逆性、对表结构的影响、性能以及事务日志的影响上。 DROP: 作用&#xff1a;DROP命令用于删除整个表及其所有…...

【AI大模型】Prompt 提示词工程使用详解

目录 一、前言 二、Prompt 提示词工程介绍 2.1 Prompt提示词工程是什么 2.1.1 Prompt 构成要素 2.2 Prompt 提示词工程有什么作用 2.2.1 Prompt 提示词工程使用场景 2.3 为什么要学习Prompt 提示词工程 三、Prompt 提示词工程元素构成与操作实践 3.1 前置准备 3.2 Pro…...

学习记录day18——数据结构 算法

算法的相关概念 程序 数据结构 算法 算法是程序设计的灵魂&#xff0c;结构式程序设计的肉体 算法&#xff1a;计算机解决问题的方法护额步骤 算法的特性 1、确定性&#xff1a;算法中每一条语句都有确定的含义&#xff0c;不能模棱两可 2、有穷性&#xff1a;程序执行一…...

一篇文章带你学完Java所有的时间与日期类

目录 一、传统时间与日期类 1.Date类 构造方法 获取日期和时间信息的方法 设置日期和时间信息的方法 2.Calendar类 主要特点和功能 常用方法 1. 获取当前日历对象 2. 获取日历中的某个信息 3. 获取日期对象 4. 获取时间毫秒值 5. 修改日历的某个信息 6. 为某个信息增…...

利用GPT4o Captcha工具和AI技术全面识别验证码

利用GPT4o Captcha工具和AI技术全面识别验证码 &#x1f9e0;&#x1f680; 摘要 GPT4o Captcha工具是一款命令行工具&#xff0c;通过Python和Selenium测试各种类型的验证码&#xff0c;包括拼图、文本、复杂文本和reCAPTCHA&#xff0c;并使用OpenAI GPT-4帮助解决验证码问…...

大学生算法高等数学学习平台设计方案 (第一版)

目录 目标用户群体的精准定位 初阶探索者 进阶学习者 资深研究者 功能需求的深度拓展 个性化学习路径定制 概念图谱构建 公式推导展示 交互式问题解决系统 新功能和创新点的引入 虚拟教室环境 数学建模工具集成 算法可视化平台 学术论文资源库 技术实现的前瞻性…...

机器学习算法与Python实战 | 两行代码即可应用 40 个机器学习模型--lazypredict 库!

本文来源公众号“机器学习算法与Python实战”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;两行代码即可应用 40 个机器学习模型 今天和大家一起学习使用 lazypredict 库&#xff0c;我们可以用一行代码在我们的数据集上实现许多…...

使用WebSocket协议调用群发方法将消息返回客户端页面

目录 一.C/S架构&#xff1a; 二.Http协议与WebSocket协议的区别&#xff1a; 1.Http协议与WebSocket协议的区别&#xff1a; 2.WebSocket协议的使用场景&#xff1a; 三.项目实际操作&#xff1a; 1.导入依赖&#xff1a; 2.通过WebSocket实现页面与服务端保持长连接&a…...

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第五十七章 Linux中断实验

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…...

每日一题~961div2A+B+C(阅读题,思维,数学log)

A 题意&#xff1a;给你 n*n 的表格和k 个筹码。每个格子上至多放一个 问至少占据多少对角线。 显然&#xff0c;要先 格数的多的格子去放。 n n-1 n-2 …1 只有n 的是一个&#xff08;主对角线&#xff09;&#xff0c;其他的是两个。 #include <bits/stdc.h> using na…...

Fireflyrk3288 ubuntu18.04添加Qt开发环境、安装mysql-server

1、创建一台同版本的ubuntu18.04的虚拟机 2、下载rk3288_ubuntu_18.04_armhf_ext4_v2.04_20201125-1538_DESKTOP.img 3、创建空img镜像容器 dd if/dev/zero ofubuntu_rootfs.img bs1M count102404、将该容器格式化成ext4文件系统 mkfs.ext4 ubuntu_rootfs.img5、将该镜像文件…...

简化mybatis @Select IN条件的编写

最近从JPA切换到Mybatis&#xff0c;使用无XML配置&#xff0c;Select注解直接写到interface上&#xff0c;发现IN条件的编写相当麻烦。 一般得写成这样&#xff1a; Select({"<script>","SELECT *", "FROM blog","WHERE id IN&quo…...

Windows图形界面(GUI)-MFC-C/C++ - Control

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 目录 Control 资源编辑器 添加控件 设置控件属性 添加控件变量 添加消息处理 处理控件事件 控件焦点顺序 Control 资源编辑器 资源编辑器&#xff1a;用于可视化地编辑对话框和控件。…...

SQL Server数据库安全:策略制定与实践指南

SQL Server数据库安全&#xff1a;策略制定与实践指南 在当今数字化时代&#xff0c;数据安全是每个组织的核心关注点。SQL Server作为广泛使用的关系型数据库管理系统&#xff0c;提供了一套强大的安全特性来保护存储的数据。制定有效的数据库安全策略是确保数据完整性、可用…...

Spring Boot入门指南:留言板

一.留言板 1.输⼊留⾔信息,点击提交.后端把数据存储起来. 2.⻚⾯展⽰输⼊的表⽩墙的信息 规范&#xff1a; 1.写一个类MessageInfo对象&#xff0c;添加构造方法 虽然有快捷键&#xff0c;但是还是不够偷懒 项目添加Lombok。 Lombok是⼀个Java⼯具库&#xff0c;通过添加注…...

Docker 中安装和配置带用户名和密码保护的 Elasticsearch

在 Docker 中安装和配置带用户名和密码保护的 Elasticsearch 需要以下步骤。Elasticsearch 的安全功能&#xff08;包括基本身份验证&#xff09;在默认情况下是启用的&#xff0c;但在某些版本中可能需要手动配置。以下是详细步骤&#xff0c;包括如何设置用户名和密码。 1. …...

面试官:说说JVM内存调优及内存结构

1. JVM简介 JVM&#xff08;Java虚拟机&#xff09;是运行Java程序的平台&#xff0c;它使得Java能够跨平台运行。JVM负责内存的自动分配和回收&#xff0c;减轻了程序员的负担。 2. JVM内存结构 运行时数据区是JVM中最重要的部分&#xff0c;包含多个内存区域&#xff1a; …...

Ansible的脚本-----playbook剧本【下】

目录 实战演练六&#xff1a;tags 模块 实战演练七&#xff1a;Templates 模块 实战演练六&#xff1a;tags 模块 可以在一个playbook中为某个或某些任务定义“标签”&#xff0c;在执行此playbook时通过ansible-playbook命令使用--tags选项能实现仅运行指定的tasks。 playboo…...

Mysql开启远程控制简化版,亲测有效

首先关闭防火墙 改表法 打开上图的CMD&#xff0c;输入密码进入&#xff0c;然后输入一下指令 1.use mysql; 2.update user set host % where user root;//更新root用户的权限&#xff0c;允许任何主机连接 3.FLUSH PRIVILEGES;//刷新权限&#xff0c;使更改生效 具体参考…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...