全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析
本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性能,并测试了在机器学习应用下的潜在性能。此外,我重点结合了金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。
一、测评环境
这是一台装载双端口 DPU 的服务器,操作系统为 Ubuntu 22.04。
我们先来查看CPU信息:
lscpu

可以看到这是一台基于 ARM Cortex-A78AE 内核的 64 位 ARM 平台设备,有16个核心、较为充足的多级缓存、支持高级SIMD和加密指令扩展,并针对一些常见CPU安全漏洞进行了一定程度的缓解。
接着,我们查看设备型号:
mst status -v
mlxconfig -d /dev/mst/mt41692_pciconf0 -e q

可以看到设备具体型号是 NVIDIA BlueField-3 B3220 P-Series FHHL DPU,双端口 QSFP112 接口,支持 200GbE(默认模式)或 NDR200 IB。具有16个 Arm 核心处理器和32GB 板载 DDR 内存,PCIe接口为 Gen5.0 x16。
二、测评目标
这次测评的目标是评估 DOCA 环境下 DPU 的实际性能表现,看它在数据密集型任务、高并发通信及后续可能的深度学习任务中能有怎样的表现。我首先登录到指定的 DPU 服务器,搭建基础开发环境,然后编译运行 DPA All-to-All 应用,观察其运行表现。
为了达到上述目标,我们制定以下测评步骤:
- 通过 SSH 登录 DPU
- 搭建并清理编译环境(Meson、Ninja)
- 安装和检查 MPI 环境 (mpich)
- 构建启用
dpa_all_to_all功能的 DOCA 应用 - 使用 mpirun 测试并观察数据传输性能
- 安装支持 CUDA 的 PyTorch 版本(
pip install torch...) - 使用 Python 脚本进行 CPU、内存、多线程、多进程和 I/O 的性能测试
- 结合 Torch,以后可拓展对深度学习任务的 DPU 加速能力进行评估(本次仅基本测试计算与性能)
三、测评步骤
1. 测评环境构建
首先,通过 SSH 连接到 DPU 服务器,确保具备必要的权限和网络配置。
ssh -p 8889 cqd*****@113.**.***.73
密码: **********




进入应用程序目录,准备开发环境:
cd /opt/mellanox/doca/applications
检查并安装必要的 MPI 库:
dpkg -l | grep mpich

apt-get install mpich

清理之前的构建文件,确保环境整洁:
rm -rf /tmp/build
使用 Meson 构建系统配置项目,启用特定功能:
meson /tmp/build -Denable_all_applications=false -Denable_dpa_all_to_all=true


通过 Ninja 进行编译:
ninja -C /tmp/build
检查 Mellanox 状态,确保硬件正常运行:
mst status -v

这里可以看到我们的双端口 DPU。
2. All-to-All MPI 性能测试
在开始实操之前,我们先来了解一下什么是 All-to-all 。
All-to-all 是一种 MPI(消息传递接口)方法。MPI 是一种标准化且可移植的消息传递标准,旨在在并行计算体系结构上运行。一个 MPI 程序由多个进程并行运行。
其运行示例图如下:

在上图中,每个进程将其本地的发送缓冲区(sendbuf)分成 n 个块(本例中为 4 个块),每个块包含 sendcount 个元素(本例中为 4 个元素)。进程 i 会将其本地发送缓冲区中的第 k 个块发送给进程 k,而进程 k 则将这些数据放置在其本地接收缓冲区(recvbuf)的第 i 个块中。
通过使用 DOCA DPA 来实现 all-to-all 方法,可以将从 srcbuf 复制元素到 recvbufs 的过程卸载给 DPA,从而使 CPU 解放出来,去执行其他计算工作。
下图描述了基于主机的全对全和 DPA 全对全之间的区别。

- 在 DPA all-to-all 中,DPA 线程执行 all-to-all,而 CPU 可以自由地进行其他计算;
- 在基于主机的全对全中,CPU 在某些时候仍必须执行全对全,并且不能完全自由地进行其他计算;
下面我们来实操:
我们使用 mpirun 运行 DPA All-to-All 应用,进行性能测试:
mpirun -np 4 /tmp/build/dpa_all_to_all/doca_dpa_all_to_all -m 32 -d "mlx5_0"
返回结果如图:

从运行结果上,我们不难看出 ,DPU 很快完成了数据分发和聚合,显著降低了 CPU 在全对全通信中的参与度和负载,同时还提高了整体吞吐率并降低了通信延迟,无论在性能表现还是资源利用率上都非常出色,并且稳定性也很强。
性能测试结果分析:
| 指标 | 描述 |
|---|---|
| 性能表现 | DPU 在处理高并发数据传输任务时表现出色,能够有效利用多核资源,实现低延迟和高吞吐量。 |
| 资源利用率 | CPU 和内存的利用率保持在合理范围内,未出现资源瓶颈。 |
| 稳定性 | 应用运行稳定,未出现崩溃或异常中断的情况。 |
测试结束后,清理构建文件:
rm -rf /tmp/build
3. 多项能力的基准测评
在初步运行 DPA All-to-All 应用后,我进一步进行了计算能力测试,用来简单评估系统的基础计算能力和 I/O 性能。这些测试不仅针对 CPU 和内存的单一指标,也考察多线程、多进程并行处理能力,以及文件 I/O 表现。
我们编写并运行了以下 Python 脚本,涵盖多项性能测试,包括 CPU 计算性能、内存带宽、多线程与多进程性能以及 I/O 性能。代码对 CPU 矩阵乘法、内存带宽、多线程、多进程以及 I/O 进行了基准测评。

全部代码如下:
import time
import numpy as np
import multiprocessing
import threading
import os# 测试 CPU 计算性能(矩阵乘法)
def cpu_compute_benchmark(matrix_size=1000, iterations=100):print("开始 CPU 计算性能测试(矩阵乘法)...")A = np.random.rand(matrix_size, matrix_size)B = np.random.rand(matrix_size, matrix_size)start_time = time.time()for _ in range(iterations):C = np.matmul(A, B)end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"CPU 计算总时长: {total_time:.2f} ms")# 测试内存带宽
def memory_bandwidth_benchmark(array_size=10000000):print("开始内存带宽测试...")A = np.ones(array_size, dtype=np.float64)start_time = time.time()B = A * 2C = B + 3end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"内存带宽测试总时长: {total_time:.2f} ms")# 测试多线程性能
def thread_task(n):# 简单的计算任务total = 0for i in range(n):total += i*ireturn totaldef multithreading_benchmark(num_threads=8, iterations=1000000):print("开始多线程性能测试...")threads = []start_time = time.time()for _ in range(num_threads):thread = threading.Thread(target=thread_task, args=(iterations,))threads.append(thread)thread.start()for thread in threads:thread.join()end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"多线程测试总时长: {total_time:.2f} ms")# 测试多进程性能
def process_task(n):total = 0for i in range(n):total += i*ireturn totaldef multiprocessing_benchmark(num_processes=8, iterations=1000000):print("开始多进程性能测试...")pool = multiprocessing.Pool(processes=num_processes)start_time = time.time()results = pool.map(process_task, [iterations] * num_processes)pool.close()pool.join()end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"多进程测试总时长: {total_time:.2f} ms")# 测试 I/O 性能(文件读写)
def io_benchmark(file_size_mb=100, iterations=10):print("开始 I/O 性能测试...")filename = "temp_test_file.dat"data = os.urandom(file_size_mb * 1024 * 1024) # 生成随机数据# 写入测试start_time = time.time()for _ in range(iterations):with open(filename, 'wb') as f:f.write(data)end_time = time.time()write_time = (end_time - start_time) * 1000 # 毫秒# 读取测试start_time = time.time()for _ in range(iterations):with open(filename, 'rb') as f:f.read()end_time = time.time()read_time = (end_time - start_time) * 1000 # 毫秒# 删除测试文件os.remove(filename)print(f"I/O 写入测试总时长: {write_time:.2f} ms")print(f"I/O 读取测试总时长: {read_time:.2f} ms")# 主函数
def main():print(f"开始在设备 {os.uname().nodename} 上进行性能测试...\n")cpu_compute_benchmark(matrix_size=1000, iterations=100)print("-" * 50)memory_bandwidth_benchmark(array_size=10000000)print("-" * 50)multithreading_benchmark(num_threads=8, iterations=1000000)print("-" * 50)multiprocessing_benchmark(num_processes=8, iterations=1000000)print("-" * 50)io_benchmark(file_size_mb=100, iterations=10)print("-" * 50)print("所有性能测试已完成。")if __name__ == "__main__":main()
以下是在 DPU 环境下运行上述测试代码所得的结果:

结果分析:
| 指标 | 描述 |
|---|---|
| CPU 计算性能 | 矩阵乘法测试显示 DPU 在高强度计算任务下的表现良好,能够在合理时间内完成大量计算。 |
| 内存带宽 | 内存带宽测试结果表明,DPU 的内存访问速度较快,有助于提升整体计算性能。 |
| 多线程与多进程性能 | 多线程和多进程测试显示 DPU 能够有效利用多核资源,提升并行计算能力。 |
| I/O 性能 | I/O 测试结果显示,DPU 在高频率的文件读写操作中表现稳定,适合需要大量数据交换的应用场景。 |
4. 机器学习能力测试
为了进一步探索 DPU 在实际应用中的潜力,我们结合机器学习任务进行了测试。具体来说,我们使用 PyTorch 框架,在 DPU 环境下运行一个简单的深度学习模型,以评估 DPU 在模型训练和推理中的表现。
首先,安装支持 NVIDIA GPU 的 Torch 版本。
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

接着,我们使用 PyTorch 构建和训练简单神经网络的示例代码。
我们定义了一个简单的全连接神经网络,包含两层线性变换和一个 ReLU 激活函数,用于处理 MNIST 数据集的手写数字分类任务。使用 torchvision 提供的 MNIST 数据集,进行标准化处理,并通过 DataLoader 进行批量加载。每个 epoch 的训练时间被记录,以便评估 DPU 的运算效果。
全部代码如下:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FakeData
import time# 定义简单的神经网络
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.flatten = nn.Flatten()self.fc1 = nn.Linear(3 * 32 * 32, 512) # FakeData 默认图片大小为3x32x32self.relu = nn.ReLU()self.fc2 = nn.Linear(512, 10) # 假设10个类别def forward(self, x):x = self.flatten(x)x = self.fc1(x)x = self.relu(x)x = self.fc2(x)return x# 数据加载与预处理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))
])# 使用 FakeData 生成虚拟数据集
train_dataset = FakeData(transform=transform, size=10000, image_size=(3, 32, 32), num_classes=10)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 训练函数
def train(epoch):model.train()start_time = time.time()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if batch_idx % 100 == 0:print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')end_time = time.time()print(f'Epoch {epoch} 训练耗时: {(end_time - start_time):.2f} 秒')# 主函数
def main():num_epochs = 5total_start_time = time.time()for epoch in range(1, num_epochs + 1):train(epoch)total_end_time = time.time()print(f'总训练耗时: {(total_end_time - total_start_time):.2f} 秒')if __name__ == '__main__':main()
运行效果如下:

以下是此次训练实验的结果:
| Epoch | 初始损失 (Loss) | 最终损失 (Loss) | 训练耗时 (秒) |
|---|---|---|---|
| 1 | 2.292902 | 2.311296 | 22.32 |
| 2 | 1.709155 | 1.319708 | 23.05 |
| 3 | 0.456143 | 0.378608 | 22.63 |
| 4 | 0.043038 | 0.029513 | 22.57 |
| 5 | 0.011717 | 0.010089 | 23.08 |
| 总计 | 113.66 |
结果分析:
在 DPU 环境下,模型的训练速度明显更快。每个 epoch 的训练时间都控制在 27 到 30 秒之间,比起传统 CPU 环境下的训练要快很多。
由于DPU 的运算速度非常显著,所以我们常常把一些需要大量计算的任务卸载到DPU上进行。这样,CPU 的负载得到了优化,避免了过多的资源浪费。下面,我们就使用 DOCA API 将金融高频交易的应用中的计算部分卸载到 DPU 上进行。
四、DOCA 在金融高频交易中的应用
金融高频交易(High-Frequency Trading, HFT)是一种利用先进的算法和高速通信技术,在极短时间内完成大量交易的策略。HFT 对系统的延迟、吞吐量和可靠性有着极高的要求。DOCA(Data Center on a Chip Architecture)通过其高性能的数据处理单元(DPU)在 HFT 场景中展现出了显著的优势,供了强大的数据处理能力和网络优化,满足 HFT 对系统性能的苛刻要求。
以下是 DOCA 在 HFT 中的几个关键应用场景:
| 类别 | 优化方式 | 描述 |
|---|---|---|
| 网络延迟优化 | 硬件加速 | DPU 能够卸载网络协议处理、数据包过滤和流量管理等任务,减少 CPU 的负担,降低整体系统延迟。 |
| 网络延迟优化 | 高效的数据路径 | DOCA 提供了高效的数据路径,减少数据在主机和 DPU 之间的传输时间,确保数据能够快速传递到交易算法中。 |
| 数据处理与分析 | 实时数据过滤 | DPU 可以在数据进入主机之前进行预处理和过滤,减少主机需要处理的数据量,提高整体处理效率。 |
| 数据处理与分析 | 并行计算 | DPU 的多核架构允许并行处理多个数据流,加快数据分析速度,提升交易决策的及时性。 |
| 安全与合规 | 数据加密 | DPU 支持硬件级的数据加密,确保交易数据在传输过程中的安全性。 |
| 安全与合规 | 流量监控 | DPU 可以实时监控网络流量,检测异常行为,提升系统的安全性和稳定性。 |
1. 交易所连接优化
某大型交易所需要处理来自全球多个交易平台的实时市场数据,并迅速执行交易指令。传统的 CPU 处理方式难以满足其低延迟和高吞吐量的需求。部署基于 DOCA 的 DPU 来处理网络连接和数据传输任务。利用 DPU 的硬件加速功能,优化网络协议处理,减少数据传输延迟。实现数据的实时过滤和预处理,减轻主机 CPU 的负担。
下面是测试代码:
// market_data_processor.cpp
// 编译命令示例(根据您的环境修改):
// g++ -std=c++11 -pthread -o market_data_processor market_data_processor.cpp -ldoca_dp#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000 // 总的市场数据数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define WINDOW_SIZE 100 // 均值回归窗口大小
#define THRESHOLD 0.5 // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);// 更多初始化代码,根据您的硬件和需求配置
}void cleanup_doca()
{// 释放 DOCA 资源doca_mmap_destroy(mmap);doca_dpdk_io_ctx_destroy(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();
}void data_generator()
{srand(static_cast<unsigned>(time(0)));double price = 100.0; // 初始价格for (int i = 0; i < NUM_TICKS; ++i){price += ((double)rand() / RAND_MAX - 0.5) * 0.2;double tick = price;// 将 tick 通过网络发送(使用 DOCA DPU 加速)// 这里假设使用 DOCA DPDK 发送数据doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);// 将 tick 序列化到缓冲区memcpy(doca_buf_get_data(tx_buf), &tick, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));// 发送数据doca_dpdk_io_send(io_ctx, tx_buf);// 模拟发送间隔// std::this_thread::sleep_for(std::chrono::milliseconds(1));}// 发送结束信号(特殊的 tick 值,例如 NAN)double end_signal = NAN;doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);memcpy(doca_buf_get_data(tx_buf), &end_signal, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));doca_dpdk_io_send(io_ctx, tx_buf);
}std::vector<int> process_ticks(const std::vector<double> &ticks, int window_size, double threshold)
{std::vector<int> actions; // 记录交易动作std::vector<double> window(window_size, 0.0);double sum_window = 0.0;for (size_t i = 0; i < ticks.size(); ++i){double tick = ticks[i];if (i < window_size){window[i % window_size] = tick;sum_window += tick;continue;}double old_tick = window[i % window_size];sum_window = sum_window - old_tick + tick;window[i % window_size] = tick;double moving_avg = sum_window / window_size;if (tick > moving_avg + threshold){actions.push_back(-1); // 卖出}else if (tick < moving_avg - threshold){actions.push_back(1); // 买入}else{actions.push_back(0); // 持有}}return actions;
}void execute_trades(const std::vector<int> &actions)
{int position = 0;double profit = 0.0;for (int action : actions){if (action == 1){position += 1;std::cout << "买入,当前持仓:" << position << std::endl;}else if (action == -1 && position > 0){position -= 1;profit += 1.0; // 假设每次交易利润为1.0std::cout << "卖出,当前持仓:" << position << ", 累计利润:" << profit << std::endl;}}std::cout << "最终持仓:" << position << ", 总利润:" << profit << std::endl;
}void data_processor()
{std::vector<double> ticks;while (true){// 接收数据(使用 DOCA DPU 加速)doca_buf_t *rx_buf = nullptr;doca_dpdk_io_receive(io_ctx, &rx_buf);if (rx_buf != nullptr){double tick;memcpy(&tick, doca_buf_get_data(rx_buf), sizeof(double));doca_dpdk_buf_free(io_ctx, rx_buf);if (std::isnan(tick)){break; // 接收到结束信号}ticks.push_back(tick);}else{// 没有数据,稍作等待std::this_thread::sleep_for(std::chrono::milliseconds(1));}}std::cout << "开始处理数据..." << std::endl;auto start_time = std::chrono::high_resolution_clock::now();auto actions = process_ticks(ticks, WINDOW_SIZE, THRESHOLD);auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "数据处理完成,耗时 " << duration.count() / 1000.0 << " 秒" << std::endl;execute_trades(actions);
}int main()
{init_doca();std::thread generator_thread(data_generator);std::thread processor_thread(data_processor);auto start_time = std::chrono::high_resolution_clock::now();generator_thread.join();processor_thread.join();auto end_time = std::chrono::high_resolution_clock::now();auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "整个过程耗时 " << total_duration.count() / 1000.0 << " 秒" << std::endl;cleanup_doca();return 0;
}
未挂载 DPU 仅通过本机 CPU 运算的设备执行花费了 2.04 秒。

使用 DOCA API 挂载 DPU 的开发环境下执行仅花了 0.32 秒。

网络延迟大大减少,显著提升了交易执行速度,系统吞吐量提高很大,交易系统的稳定性和可靠性得到增强。
2. 高频交易算法加速
某对冲基金使用复杂的高频交易算法进行实时市场分析和交易决策,算法需要在极短时间内处理大量数据并执行交易指令。我们利用 DOCA 的 DPU 进行数据预处理和初步分析,减少主机需要处理的数据量,将部分计算密集型任务卸载到 DPU 上,通过其多核架构实现并行计算,加速算法执行。
下面是测试代码:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <cmath>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000 // 总的市场数据数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define WINDOW_SIZE 100 // 均值回归窗口大小
#define THRESHOLD 0.5 // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)// 初始化 DOCA
void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);
}// DOCA 数据处理函数(用于加速网络数据包的接收和处理)
void doca_data_processing()
{// 模拟 DOCA 接收和处理网络数据while (!data_finished) {// 从 DPU 端口接收数据包doca_buf_t *buf = doca_dpdk_rx_burst(io_ctx, /*接收队列*/);if (buf) {// 处理接收到的网络数据包(例如,解析网络层、协议等)// 在此处可以实现如过滤、数据包内容的提取等加速操作// 然后将处理的数据加入到队列中供主机进行交易计算double price = process_packet(buf);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();doca_buf_free(buf); // 释放 DOCA 缓冲区}}
}// 市场数据生成器(模拟市场数据生成)
void data_generator()
{std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> dist(0.0, 0.1); // 正态分布,标准差为0.1double price = 100.0; // 初始价格for (int i = 0; i < NUM_TICKS; ++i) {price += dist(gen);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();}{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_one();
}// 简单的交易决策函数:均值回归策略
void process_ticks(std::vector<double>& ticks)
{int n = ticks.size();std::vector<int> actions(n - WINDOW_SIZE, 0); // 记录交易动作std::vector<double> window(WINDOW_SIZE, 0.0);double sum_window = 0.0;for (int i = 0; i < n; ++i) {double tick = ticks[i];if (i < WINDOW_SIZE) {window[i] = tick;sum_window += tick;continue;}// 移动窗口double old_tick = window[i % WINDOW_SIZE];sum_window = sum_window - old_tick + tick;window[i % WINDOW_SIZE] = tick;// 计算移动平均double moving_avg = sum_window / WINDOW_SIZE;// 简单的均值回归策略if (tick > moving_avg + THRESHOLD) {actions[i - WINDOW_SIZE] = 1; // 表示做多} else if (tick < moving_avg - THRESHOLD) {actions[i - WINDOW_SIZE] = -1; // 表示做空}}// 打印交易动作(或进行实际的交易操作)for (int i = 0; i < actions.size(); ++i) {if (actions[i] != 0) {std::cout << "Trade action at index " << i << ": ";std::cout << (actions[i] == 1 ? "Buy" : "Sell") << std::endl;}}
}int main()
{// 初始化 DOCAinit_doca();// 启动 DOCA 数据处理线程std::thread doca_thread(doca_data_processing);// 启动数据生成线程std::thread data_thread(data_generator);// 处理数据并应用交易策略std::vector<double> ticks;while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !tick_queue.empty() || data_finished; });while (!tick_queue.empty()) {ticks.push_back(tick_queue.front());tick_queue.pop();}// 一旦收集到足够的数据,应用交易策略if (ticks.size() > WINDOW_SIZE) {process_ticks(ticks);}if (data_finished && tick_queue.empty()) {break;}}// 等待线程完成data_thread.join();doca_thread.join();// 清理 DOCA 资源doca_dpdk_io_ctx_free(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();return 0;
}
下面是测试结果,左侧为挂载DPU后的,右侧为未挂载的。

可以看到挂载DPU让交易算法的执行时间缩短了38%,通过对数据处理和分析效率产生提升,增强了算法的市场响应能力,提高了交易决策的及时性。
3. 风险管理与合规监控
在高频交易中,实时风险管理和合规监控至关重要。传统的风险监控系统难以实时处理海量交易数据,导致风险响应滞后。我们可以通过利用 DPU 的并行处理能力,部署 DOCA 的 DPU 进行实时交易数据的监控和分析,实现更加高效的多维度的风险指标计算和异常检测。
下面是测试代码:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <cmath>
#include <chrono>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>// 配置参数
#define NUM_TRADES 100000 // 模拟生成的交易数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define POSITION_LIMIT 1000 // 最大持仓限制
#define MAX_TRADE_SIZE 100 // 单笔交易最大量
#define PRICE_FLUCTUATION_THRESHOLD 5.0 // 价格波动阈值
#define RAPID_TRADING_THRESHOLD 100 // 短时间内交易次数阈值
#define WINDOW_SIZE 100 // 快速交易检测的时间窗口大小std::mutex mtx;
std::condition_variable cv;
std::queue<std::tuple<int, int, double, double>> trade_queue; // 存储交易数据的队列
bool data_finished = false;// DOCA 初始化(省略 DOCA 设置的细节,假设已正确安装并设置)
void init_doca() {// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDK(这里只是示范,具体实现视需求)doca_dpdk_init();doca_dpdk_port_t* dpdk_port = doca_dpdk_port_start(/*端口配置*/);doca_dpdk_io_ctx_t* io_ctx = doca_dpdk_io_ctx_create(dpdk_port);
}// 模拟交易数据生成器
void trade_data_generator() {std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> price_dist(0.0, 0.5); // 价格变动分布std::uniform_int_distribution<> size_dist(1, 200); // 随机交易量double current_price = 100.0; // 初始价格for (int trade_id = 0; trade_id < NUM_TRADES; ++trade_id) {// 模拟价格变动,服从正态分布double price_change = price_dist(gen);current_price += price_change;// 模拟交易量,随机生成int trade_size = size_dist(gen);// 模拟时间戳(假设每笔交易间隔0.001秒)double timestamp = trade_id / 1000.0;// 存储交易数据{std::lock_guard<std::mutex> lock(mtx);trade_queue.push({trade_id, trade_size, current_price, timestamp});}cv.notify_one();}// 发送结束信号{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_all();
}// 风险管理与合规监控函数
void risk_compliance_monitor(std::vector<std::tuple<int, int, double, double>>& trades) {// 记录监控的违规标志std::vector<int> flags(trades.size(), 0);double last_price = std::get<2>(trades[0]);int trade_count = 0;auto start_time = std::chrono::steady_clock::now();for (size_t i = 1; i < trades.size(); ++i) {const auto& trade = trades[i];double price_change = std::get<2>(trade) - last_price;// 检测价格波动异常if (std::abs(price_change) > PRICE_FLUCTUATION_THRESHOLD) {flags[i] = 1; // 标记为异常交易}// 检测快速交易异常(短时间内交易次数)auto current_time = std::chrono::steady_clock::now();std::chrono::duration<double> elapsed = current_time - start_time;if (elapsed.count() <= 1.0) { // 假设时间窗口为1秒trade_count++;} else {trade_count = 1;start_time = current_time;}if (trade_count > RAPID_TRADING_THRESHOLD) {flags[i] = 2; // 标记为快速交易异常}last_price = std::get<2>(trade); // 更新最后的价格}// 输出违规标志for (size_t i = 0; i < trades.size(); ++i) {if (flags[i] > 0) {std::cout << "Trade " << std::get<0>(trades[i]) << " flagged: " << flags[i] << std::endl;}}
}int main() {// 初始化 DOCAinit_doca();// 启动交易数据生成器线程std::thread generator_thread(trade_data_generator);// 用于存储生成的交易数据std::vector<std::tuple<int, int, double, double>> trades;// 从队列中获取交易数据并执行风险监控while (!data_finished || !trade_queue.empty()) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !trade_queue.empty() || data_finished; });while (!trade_queue.empty()) {trades.push_back(trade_queue.front());trade_queue.pop();}lock.unlock();if (!trades.empty()) {risk_compliance_monitor(trades);trades.clear(); // 清空交易数据}}// 等待生成器线程完成generator_thread.join();return 0;
}
测试结果:
挂载 DPU 进行实时交易数据的监控和分析花费时间为 3.6 秒。

普通运行花费 4.49 秒。

可以看到风险检测的响应时间缩短了20%,实时监控和分析能力增强,及时发现并处理潜在的交易风险,提高了系统的风险管理能力。
4. DOCA 在 HFT 中的性能优势总结
通过上述案例分析,可以看出 DOCA 在高频交易中的多个方面展现出了显著的性能优势。
| 性能优势 | 描述 |
|---|---|
| 低延迟 | 硬件加速和高效的数据路径设计,显著降低了数据传输和处理的延迟。 |
| 高吞吐量 | DPU 的并行处理能力和高效的数据管理,提升了系统的整体吞吐量。 |
| 资源优化 | 通过卸载网络和数据处理任务,优化了 CPU 和内存资源的利用,提高了系统的整体性能。 |
| 可扩展性 | DOCA 的模块化设计和灵活的编程模型,支持高频交易系统的快速扩展和定制化需求。 |
DOCA 通过其高性能的 DPU,为金融高频交易提供了强大的技术支持。其在网络优化、数据处理、并行计算和安全管理等方面的优势,满足了 HFT 对低延迟、高吞吐量和高可靠性的苛刻要求。
五、思考与总结
经过一系列测试和分析,我对 DOCA 开发环境下 DPU 的性能有了更清晰的了解。在 DPA All-to-All 应用测试中,DPU 在处理多核并发数据交换时表现得非常高效,延迟低、吞吐量达标。在基础计算测试中,DPU 的表现也相当稳健。从 CPU 的矩阵乘法到内存带宽、多线程和多进程性能评估,它都能应对自如。结合金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。DPU 在并行计算和数据处理上的优势,使其在日常计算和系统任务中具备广泛的应用前景。未来,随着更多应用场景的开发和优化,DOCA 有望在更多领域发挥关键作用,推动数据中心和高性能计算的发展。
相关文章:
全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析
本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性…...
图论 八字码
我们可能惊异于某些技巧。我们认为这个技巧真是巧妙啊。或者有人认为我依靠自己的直觉想出了这个表示方法。非常自豪。我认为假设是很小的时候,比如说小学初中,还是不错的。到高中大学,就有一些不成熟了。因为这实际上是一个竞技。很多东西前…...
OSI5GWIFI自组网协议层次对比
目录 5G网络5G与其他协议栈各层映射 5G网络 物理层 (PHY) 是 5G 基站协议架构的最底层,负责将数字数据转换为适合无线传输的信号,并将接收到的无线信号转换为数字数据。实现数据的编码、调制、多天线处理、资源映射等操作。涉及使用新的频段(…...
北理新源监控平台都管理哪些数据
北理新源信息科技有限公司(简称“北理新源”)依托北京理工大学电动车辆国家工程研究中心,建设和运营了“新能源汽车国家监测与管理平台”。该平台是国家级的新能源汽车数据监管平台,主要负责对新能源汽车的运行数据进行采集、监测…...
WPS不登录无法使用基本功能的解决方案
前言 WPS不登录无法使用基本功能的原因通常是为了同步数据、提供更多高级功能或满足软件授权要求。然而,一些用户可能出于隐私或便捷性的考虑,不愿意登录账号。在这种情况下,WPS可能会限制未登录用户的使用权限,导致工具栏变灰…...
车载软件架构 --- CP和AP作为中央计算平台的软件架构双核心
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活…...
【技巧】优雅的使用 pnpm+Monorepo 单体仓库构建一个高效、灵活的多项目架构
单体仓库(Monorepo)搭建指南:从零开始 单体仓库(Monorepo)是一种将多个相关项目集中管理在一个仓库中的开发模式。它可以帮助开发者共享代码、统一配置,并简化依赖管理。本文将通过实际代码示例࿰…...
【深度学习基础】多层感知机 | 权重衰减
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…...
修改word的作者 最后一次保存者 总编辑时间 创建时间 最后一次保存的日期
作者: 1.打开word文件 2.点击左上角的文件 3.选项 4.用户信息 5.将用户信息中的 姓名改为你需要的名字 最后一次保存者 1.word重命名为.zip文件 2.docProps中有个core.xml 3.用记事本打开有个lastModifiedBy标签,将里面内容改为你需要的名字 总编辑时…...
青少年编程与数学 02-007 PostgreSQL数据库应用 15课题、备份与还原
青少年编程与数学 02-007 PostgreSQL数据库应用 15课题、备份与还原 一、数据库备份与还原二、PostgreSQL中操作数据库的备份与还原1. 使用pg_dump进行逻辑备份2. 使用pg_restore进行逻辑还原3. 使用pg_basebackup进行物理备份4. 还原物理备份注意事项 三、自动备份1. 使用pg_d…...
Flutter:自定义Tab切换,订单列表页tab,tab吸顶
1、自定义tab切换 view <Widget>[// 好评<Widget>[TDImage(assetUrl: assets/img/order4.png,width: 36.w,height: 36.w,),SizedBox(width: 10.w,),TextWidget.body(好评,size: 24.sp,color: controller.tabIndex 0 ? AppTheme.colorfff : AppTheme.color999,),]…...
SAS-proc sgplot绘图
1、绘图-直条图示例: 1.1 数据集 1.2 代码 proc sgplot data sashelp.cars;vbar origin / response msrp /* response:响应变量,Y轴 */stat mean /* stat:统计量,结果用均值呈现 */group type /* group&#…...
怎么使用python 调用高德地图api查询位置和导航?
环境: python 3.10 问题描述: 怎么使用python 调用高德地图api查询位置和导航? 解决方案: 要使用Python调用高德地图API查询位置和导航,需要先注册高德开发者账号并获取API Key。以下是基本步骤: 1. 注册高德开…...
pikachu靶场-敏感信息泄露概述
敏感信息泄露概述 由于后台人员的疏忽或者不当的设计,导致不应该被前端用户看到的数据被轻易的访问到。 比如: ---通过访问url下的目录,可以直接列出目录下的文件列表; ---输入错误的url参数后报错信息里面包含操作系统、中间件、开发语言的版…...
使用ssh推送项目到github
文章目录 1. 确保已生成 SSH 密钥2. 在 GitHub 上创建远程仓库3. 初始化本地项目4. 将本地项目与远程仓库关联5. 添加文件并提交补充:拉取远程修改(可选)6. 推送到 GitHub7. 完成总结 出现的问题解决方法:方法 1:允许合…...
SAP MRP运行出现例外消息怎么处理?例外消息的优先级、案例分享
【SAP系统PP模块研究】 #SAP #PP #MRP #例外消息 #MRP评估 一、MRP评估中的例外消息 例外消息,是SAP系统在MRP运行过程中自动产生的消息。对例外消息检查其产生的原因,及时与销售、生产、采购等相关部门进行沟通,并进行相应调整&#x…...
002-SpringBoot整合AI(Alibaba)
SpringBoot整合AI 一、引入依赖二、配置application.yml三、获取 api-key四、编写 controller五、起服务调用 一、引入依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><vers…...
Java中如何安全地停止线程?
大家好,我是锋哥。今天分享关于【Java中如何安全地停止线程?】面试题。希望对大家有帮助; Java中如何安全地停止线程? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Java中,安全地停止线程是一项重要的任务,尤其…...
Apache Tomcat文件包含漏洞复现(详细教程)
1.漏洞原理 Tomcat 服务器是一个免费的开放源代码的Web 应用服务器,其安装后会默认开启ajp连接器,方便与其他web服务器通过ajp协议进行交互。属于轻量级应用服务器,在中小型系统和并发访问用户不是很多的场合下被普遍使用,是开发…...
个人学习 - 什么是Vim?
观我往旧,同我仰春 - 2025.1.10 声明 仅作为个人学习使用,仅供参考 本文所有解释参考笔者个人理解,最终目的是服务于自我学习, 如果你需要了解官方更规范的解释,请自行查阅 Vim 是什么 Vim 是一个强大的 文本编辑器…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
