全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析
本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性能,并测试了在机器学习应用下的潜在性能。此外,我重点结合了金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。
一、测评环境
这是一台装载双端口 DPU 的服务器,操作系统为 Ubuntu 22.04。
我们先来查看CPU信息:
lscpu
可以看到这是一台基于 ARM Cortex-A78AE 内核的 64 位 ARM 平台设备,有16个核心、较为充足的多级缓存、支持高级SIMD和加密指令扩展,并针对一些常见CPU安全漏洞进行了一定程度的缓解。
接着,我们查看设备型号:
mst status -v
mlxconfig -d /dev/mst/mt41692_pciconf0 -e q
可以看到设备具体型号是 NVIDIA BlueField-3 B3220 P-Series FHHL DPU,双端口 QSFP112 接口,支持 200GbE(默认模式)或 NDR200 IB。具有16个 Arm 核心处理器和32GB 板载 DDR 内存,PCIe接口为 Gen5.0 x16。
二、测评目标
这次测评的目标是评估 DOCA 环境下 DPU 的实际性能表现,看它在数据密集型任务、高并发通信及后续可能的深度学习任务中能有怎样的表现。我首先登录到指定的 DPU 服务器,搭建基础开发环境,然后编译运行 DPA All-to-All 应用,观察其运行表现。
为了达到上述目标,我们制定以下测评步骤:
- 通过 SSH 登录 DPU
- 搭建并清理编译环境(Meson、Ninja)
- 安装和检查 MPI 环境 (mpich)
- 构建启用
dpa_all_to_all
功能的 DOCA 应用 - 使用 mpirun 测试并观察数据传输性能
- 安装支持 CUDA 的 PyTorch 版本(
pip install torch...
) - 使用 Python 脚本进行 CPU、内存、多线程、多进程和 I/O 的性能测试
- 结合 Torch,以后可拓展对深度学习任务的 DPU 加速能力进行评估(本次仅基本测试计算与性能)
三、测评步骤
1. 测评环境构建
首先,通过 SSH 连接到 DPU 服务器,确保具备必要的权限和网络配置。
ssh -p 8889 cqd*****@113.**.***.73
密码: **********
进入应用程序目录,准备开发环境:
cd /opt/mellanox/doca/applications
检查并安装必要的 MPI 库:
dpkg -l | grep mpich
apt-get install mpich
清理之前的构建文件,确保环境整洁:
rm -rf /tmp/build
使用 Meson 构建系统配置项目,启用特定功能:
meson /tmp/build -Denable_all_applications=false -Denable_dpa_all_to_all=true
通过 Ninja 进行编译:
ninja -C /tmp/build
检查 Mellanox 状态,确保硬件正常运行:
mst status -v
这里可以看到我们的双端口 DPU。
2. All-to-All MPI 性能测试
在开始实操之前,我们先来了解一下什么是 All-to-all 。
All-to-all 是一种 MPI(消息传递接口)方法。MPI 是一种标准化且可移植的消息传递标准,旨在在并行计算体系结构上运行。一个 MPI 程序由多个进程并行运行。
其运行示例图如下:
在上图中,每个进程将其本地的发送缓冲区(sendbuf)分成 n 个块(本例中为 4 个块),每个块包含 sendcount 个元素(本例中为 4 个元素)。进程 i 会将其本地发送缓冲区中的第 k 个块发送给进程 k,而进程 k 则将这些数据放置在其本地接收缓冲区(recvbuf)的第 i 个块中。
通过使用 DOCA DPA 来实现 all-to-all 方法,可以将从 srcbuf 复制元素到 recvbufs 的过程卸载给 DPA,从而使 CPU 解放出来,去执行其他计算工作。
下图描述了基于主机的全对全和 DPA 全对全之间的区别。
- 在 DPA all-to-all 中,DPA 线程执行 all-to-all,而 CPU 可以自由地进行其他计算;
- 在基于主机的全对全中,CPU 在某些时候仍必须执行全对全,并且不能完全自由地进行其他计算;
下面我们来实操:
我们使用 mpirun
运行 DPA All-to-All 应用,进行性能测试:
mpirun -np 4 /tmp/build/dpa_all_to_all/doca_dpa_all_to_all -m 32 -d "mlx5_0"
返回结果如图:
从运行结果上,我们不难看出 ,DPU 很快完成了数据分发和聚合,显著降低了 CPU 在全对全通信中的参与度和负载,同时还提高了整体吞吐率并降低了通信延迟,无论在性能表现还是资源利用率上都非常出色,并且稳定性也很强。
性能测试结果分析:
指标 | 描述 |
---|---|
性能表现 | DPU 在处理高并发数据传输任务时表现出色,能够有效利用多核资源,实现低延迟和高吞吐量。 |
资源利用率 | CPU 和内存的利用率保持在合理范围内,未出现资源瓶颈。 |
稳定性 | 应用运行稳定,未出现崩溃或异常中断的情况。 |
测试结束后,清理构建文件:
rm -rf /tmp/build
3. 多项能力的基准测评
在初步运行 DPA All-to-All 应用后,我进一步进行了计算能力测试,用来简单评估系统的基础计算能力和 I/O 性能。这些测试不仅针对 CPU 和内存的单一指标,也考察多线程、多进程并行处理能力,以及文件 I/O 表现。
我们编写并运行了以下 Python 脚本,涵盖多项性能测试,包括 CPU 计算性能、内存带宽、多线程与多进程性能以及 I/O 性能。代码对 CPU 矩阵乘法、内存带宽、多线程、多进程以及 I/O 进行了基准测评。
全部代码如下:
import time
import numpy as np
import multiprocessing
import threading
import os# 测试 CPU 计算性能(矩阵乘法)
def cpu_compute_benchmark(matrix_size=1000, iterations=100):print("开始 CPU 计算性能测试(矩阵乘法)...")A = np.random.rand(matrix_size, matrix_size)B = np.random.rand(matrix_size, matrix_size)start_time = time.time()for _ in range(iterations):C = np.matmul(A, B)end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"CPU 计算总时长: {total_time:.2f} ms")# 测试内存带宽
def memory_bandwidth_benchmark(array_size=10000000):print("开始内存带宽测试...")A = np.ones(array_size, dtype=np.float64)start_time = time.time()B = A * 2C = B + 3end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"内存带宽测试总时长: {total_time:.2f} ms")# 测试多线程性能
def thread_task(n):# 简单的计算任务total = 0for i in range(n):total += i*ireturn totaldef multithreading_benchmark(num_threads=8, iterations=1000000):print("开始多线程性能测试...")threads = []start_time = time.time()for _ in range(num_threads):thread = threading.Thread(target=thread_task, args=(iterations,))threads.append(thread)thread.start()for thread in threads:thread.join()end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"多线程测试总时长: {total_time:.2f} ms")# 测试多进程性能
def process_task(n):total = 0for i in range(n):total += i*ireturn totaldef multiprocessing_benchmark(num_processes=8, iterations=1000000):print("开始多进程性能测试...")pool = multiprocessing.Pool(processes=num_processes)start_time = time.time()results = pool.map(process_task, [iterations] * num_processes)pool.close()pool.join()end_time = time.time()total_time = (end_time - start_time) * 1000 # 毫秒print(f"多进程测试总时长: {total_time:.2f} ms")# 测试 I/O 性能(文件读写)
def io_benchmark(file_size_mb=100, iterations=10):print("开始 I/O 性能测试...")filename = "temp_test_file.dat"data = os.urandom(file_size_mb * 1024 * 1024) # 生成随机数据# 写入测试start_time = time.time()for _ in range(iterations):with open(filename, 'wb') as f:f.write(data)end_time = time.time()write_time = (end_time - start_time) * 1000 # 毫秒# 读取测试start_time = time.time()for _ in range(iterations):with open(filename, 'rb') as f:f.read()end_time = time.time()read_time = (end_time - start_time) * 1000 # 毫秒# 删除测试文件os.remove(filename)print(f"I/O 写入测试总时长: {write_time:.2f} ms")print(f"I/O 读取测试总时长: {read_time:.2f} ms")# 主函数
def main():print(f"开始在设备 {os.uname().nodename} 上进行性能测试...\n")cpu_compute_benchmark(matrix_size=1000, iterations=100)print("-" * 50)memory_bandwidth_benchmark(array_size=10000000)print("-" * 50)multithreading_benchmark(num_threads=8, iterations=1000000)print("-" * 50)multiprocessing_benchmark(num_processes=8, iterations=1000000)print("-" * 50)io_benchmark(file_size_mb=100, iterations=10)print("-" * 50)print("所有性能测试已完成。")if __name__ == "__main__":main()
以下是在 DPU 环境下运行上述测试代码所得的结果:
结果分析:
指标 | 描述 |
---|---|
CPU 计算性能 | 矩阵乘法测试显示 DPU 在高强度计算任务下的表现良好,能够在合理时间内完成大量计算。 |
内存带宽 | 内存带宽测试结果表明,DPU 的内存访问速度较快,有助于提升整体计算性能。 |
多线程与多进程性能 | 多线程和多进程测试显示 DPU 能够有效利用多核资源,提升并行计算能力。 |
I/O 性能 | I/O 测试结果显示,DPU 在高频率的文件读写操作中表现稳定,适合需要大量数据交换的应用场景。 |
4. 机器学习能力测试
为了进一步探索 DPU 在实际应用中的潜力,我们结合机器学习任务进行了测试。具体来说,我们使用 PyTorch 框架,在 DPU 环境下运行一个简单的深度学习模型,以评估 DPU 在模型训练和推理中的表现。
首先,安装支持 NVIDIA GPU 的 Torch 版本。
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
接着,我们使用 PyTorch 构建和训练简单神经网络的示例代码。
我们定义了一个简单的全连接神经网络,包含两层线性变换和一个 ReLU 激活函数,用于处理 MNIST 数据集的手写数字分类任务。使用 torchvision 提供的 MNIST 数据集,进行标准化处理,并通过 DataLoader 进行批量加载。每个 epoch 的训练时间被记录,以便评估 DPU 的运算效果。
全部代码如下:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FakeData
import time# 定义简单的神经网络
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.flatten = nn.Flatten()self.fc1 = nn.Linear(3 * 32 * 32, 512) # FakeData 默认图片大小为3x32x32self.relu = nn.ReLU()self.fc2 = nn.Linear(512, 10) # 假设10个类别def forward(self, x):x = self.flatten(x)x = self.fc1(x)x = self.relu(x)x = self.fc2(x)return x# 数据加载与预处理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))
])# 使用 FakeData 生成虚拟数据集
train_dataset = FakeData(transform=transform, size=10000, image_size=(3, 32, 32), num_classes=10)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 训练函数
def train(epoch):model.train()start_time = time.time()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if batch_idx % 100 == 0:print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')end_time = time.time()print(f'Epoch {epoch} 训练耗时: {(end_time - start_time):.2f} 秒')# 主函数
def main():num_epochs = 5total_start_time = time.time()for epoch in range(1, num_epochs + 1):train(epoch)total_end_time = time.time()print(f'总训练耗时: {(total_end_time - total_start_time):.2f} 秒')if __name__ == '__main__':main()
运行效果如下:
以下是此次训练实验的结果:
Epoch | 初始损失 (Loss) | 最终损失 (Loss) | 训练耗时 (秒) |
---|---|---|---|
1 | 2.292902 | 2.311296 | 22.32 |
2 | 1.709155 | 1.319708 | 23.05 |
3 | 0.456143 | 0.378608 | 22.63 |
4 | 0.043038 | 0.029513 | 22.57 |
5 | 0.011717 | 0.010089 | 23.08 |
总计 | 113.66 |
结果分析:
在 DPU 环境下,模型的训练速度明显更快。每个 epoch 的训练时间都控制在 27 到 30 秒之间,比起传统 CPU 环境下的训练要快很多。
由于DPU 的运算速度非常显著,所以我们常常把一些需要大量计算的任务卸载到DPU上进行。这样,CPU 的负载得到了优化,避免了过多的资源浪费。下面,我们就使用 DOCA API 将金融高频交易的应用中的计算部分卸载到 DPU 上进行。
四、DOCA 在金融高频交易中的应用
金融高频交易(High-Frequency Trading, HFT)是一种利用先进的算法和高速通信技术,在极短时间内完成大量交易的策略。HFT 对系统的延迟、吞吐量和可靠性有着极高的要求。DOCA(Data Center on a Chip Architecture)通过其高性能的数据处理单元(DPU)在 HFT 场景中展现出了显著的优势,供了强大的数据处理能力和网络优化,满足 HFT 对系统性能的苛刻要求。
以下是 DOCA 在 HFT 中的几个关键应用场景:
类别 | 优化方式 | 描述 |
---|---|---|
网络延迟优化 | 硬件加速 | DPU 能够卸载网络协议处理、数据包过滤和流量管理等任务,减少 CPU 的负担,降低整体系统延迟。 |
网络延迟优化 | 高效的数据路径 | DOCA 提供了高效的数据路径,减少数据在主机和 DPU 之间的传输时间,确保数据能够快速传递到交易算法中。 |
数据处理与分析 | 实时数据过滤 | DPU 可以在数据进入主机之前进行预处理和过滤,减少主机需要处理的数据量,提高整体处理效率。 |
数据处理与分析 | 并行计算 | DPU 的多核架构允许并行处理多个数据流,加快数据分析速度,提升交易决策的及时性。 |
安全与合规 | 数据加密 | DPU 支持硬件级的数据加密,确保交易数据在传输过程中的安全性。 |
安全与合规 | 流量监控 | DPU 可以实时监控网络流量,检测异常行为,提升系统的安全性和稳定性。 |
1. 交易所连接优化
某大型交易所需要处理来自全球多个交易平台的实时市场数据,并迅速执行交易指令。传统的 CPU 处理方式难以满足其低延迟和高吞吐量的需求。部署基于 DOCA 的 DPU 来处理网络连接和数据传输任务。利用 DPU 的硬件加速功能,优化网络协议处理,减少数据传输延迟。实现数据的实时过滤和预处理,减轻主机 CPU 的负担。
下面是测试代码:
// market_data_processor.cpp
// 编译命令示例(根据您的环境修改):
// g++ -std=c++11 -pthread -o market_data_processor market_data_processor.cpp -ldoca_dp#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000 // 总的市场数据数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define WINDOW_SIZE 100 // 均值回归窗口大小
#define THRESHOLD 0.5 // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);// 更多初始化代码,根据您的硬件和需求配置
}void cleanup_doca()
{// 释放 DOCA 资源doca_mmap_destroy(mmap);doca_dpdk_io_ctx_destroy(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();
}void data_generator()
{srand(static_cast<unsigned>(time(0)));double price = 100.0; // 初始价格for (int i = 0; i < NUM_TICKS; ++i){price += ((double)rand() / RAND_MAX - 0.5) * 0.2;double tick = price;// 将 tick 通过网络发送(使用 DOCA DPU 加速)// 这里假设使用 DOCA DPDK 发送数据doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);// 将 tick 序列化到缓冲区memcpy(doca_buf_get_data(tx_buf), &tick, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));// 发送数据doca_dpdk_io_send(io_ctx, tx_buf);// 模拟发送间隔// std::this_thread::sleep_for(std::chrono::milliseconds(1));}// 发送结束信号(特殊的 tick 值,例如 NAN)double end_signal = NAN;doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);memcpy(doca_buf_get_data(tx_buf), &end_signal, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));doca_dpdk_io_send(io_ctx, tx_buf);
}std::vector<int> process_ticks(const std::vector<double> &ticks, int window_size, double threshold)
{std::vector<int> actions; // 记录交易动作std::vector<double> window(window_size, 0.0);double sum_window = 0.0;for (size_t i = 0; i < ticks.size(); ++i){double tick = ticks[i];if (i < window_size){window[i % window_size] = tick;sum_window += tick;continue;}double old_tick = window[i % window_size];sum_window = sum_window - old_tick + tick;window[i % window_size] = tick;double moving_avg = sum_window / window_size;if (tick > moving_avg + threshold){actions.push_back(-1); // 卖出}else if (tick < moving_avg - threshold){actions.push_back(1); // 买入}else{actions.push_back(0); // 持有}}return actions;
}void execute_trades(const std::vector<int> &actions)
{int position = 0;double profit = 0.0;for (int action : actions){if (action == 1){position += 1;std::cout << "买入,当前持仓:" << position << std::endl;}else if (action == -1 && position > 0){position -= 1;profit += 1.0; // 假设每次交易利润为1.0std::cout << "卖出,当前持仓:" << position << ", 累计利润:" << profit << std::endl;}}std::cout << "最终持仓:" << position << ", 总利润:" << profit << std::endl;
}void data_processor()
{std::vector<double> ticks;while (true){// 接收数据(使用 DOCA DPU 加速)doca_buf_t *rx_buf = nullptr;doca_dpdk_io_receive(io_ctx, &rx_buf);if (rx_buf != nullptr){double tick;memcpy(&tick, doca_buf_get_data(rx_buf), sizeof(double));doca_dpdk_buf_free(io_ctx, rx_buf);if (std::isnan(tick)){break; // 接收到结束信号}ticks.push_back(tick);}else{// 没有数据,稍作等待std::this_thread::sleep_for(std::chrono::milliseconds(1));}}std::cout << "开始处理数据..." << std::endl;auto start_time = std::chrono::high_resolution_clock::now();auto actions = process_ticks(ticks, WINDOW_SIZE, THRESHOLD);auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "数据处理完成,耗时 " << duration.count() / 1000.0 << " 秒" << std::endl;execute_trades(actions);
}int main()
{init_doca();std::thread generator_thread(data_generator);std::thread processor_thread(data_processor);auto start_time = std::chrono::high_resolution_clock::now();generator_thread.join();processor_thread.join();auto end_time = std::chrono::high_resolution_clock::now();auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "整个过程耗时 " << total_duration.count() / 1000.0 << " 秒" << std::endl;cleanup_doca();return 0;
}
未挂载 DPU 仅通过本机 CPU 运算的设备执行花费了 2.04 秒。
使用 DOCA API 挂载 DPU 的开发环境下执行仅花了 0.32 秒。
网络延迟大大减少,显著提升了交易执行速度,系统吞吐量提高很大,交易系统的稳定性和可靠性得到增强。
2. 高频交易算法加速
某对冲基金使用复杂的高频交易算法进行实时市场分析和交易决策,算法需要在极短时间内处理大量数据并执行交易指令。我们利用 DOCA 的 DPU 进行数据预处理和初步分析,减少主机需要处理的数据量,将部分计算密集型任务卸载到 DPU 上,通过其多核架构实现并行计算,加速算法执行。
下面是测试代码:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <cmath>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000 // 总的市场数据数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define WINDOW_SIZE 100 // 均值回归窗口大小
#define THRESHOLD 0.5 // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)// 初始化 DOCA
void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);
}// DOCA 数据处理函数(用于加速网络数据包的接收和处理)
void doca_data_processing()
{// 模拟 DOCA 接收和处理网络数据while (!data_finished) {// 从 DPU 端口接收数据包doca_buf_t *buf = doca_dpdk_rx_burst(io_ctx, /*接收队列*/);if (buf) {// 处理接收到的网络数据包(例如,解析网络层、协议等)// 在此处可以实现如过滤、数据包内容的提取等加速操作// 然后将处理的数据加入到队列中供主机进行交易计算double price = process_packet(buf);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();doca_buf_free(buf); // 释放 DOCA 缓冲区}}
}// 市场数据生成器(模拟市场数据生成)
void data_generator()
{std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> dist(0.0, 0.1); // 正态分布,标准差为0.1double price = 100.0; // 初始价格for (int i = 0; i < NUM_TICKS; ++i) {price += dist(gen);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();}{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_one();
}// 简单的交易决策函数:均值回归策略
void process_ticks(std::vector<double>& ticks)
{int n = ticks.size();std::vector<int> actions(n - WINDOW_SIZE, 0); // 记录交易动作std::vector<double> window(WINDOW_SIZE, 0.0);double sum_window = 0.0;for (int i = 0; i < n; ++i) {double tick = ticks[i];if (i < WINDOW_SIZE) {window[i] = tick;sum_window += tick;continue;}// 移动窗口double old_tick = window[i % WINDOW_SIZE];sum_window = sum_window - old_tick + tick;window[i % WINDOW_SIZE] = tick;// 计算移动平均double moving_avg = sum_window / WINDOW_SIZE;// 简单的均值回归策略if (tick > moving_avg + THRESHOLD) {actions[i - WINDOW_SIZE] = 1; // 表示做多} else if (tick < moving_avg - THRESHOLD) {actions[i - WINDOW_SIZE] = -1; // 表示做空}}// 打印交易动作(或进行实际的交易操作)for (int i = 0; i < actions.size(); ++i) {if (actions[i] != 0) {std::cout << "Trade action at index " << i << ": ";std::cout << (actions[i] == 1 ? "Buy" : "Sell") << std::endl;}}
}int main()
{// 初始化 DOCAinit_doca();// 启动 DOCA 数据处理线程std::thread doca_thread(doca_data_processing);// 启动数据生成线程std::thread data_thread(data_generator);// 处理数据并应用交易策略std::vector<double> ticks;while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !tick_queue.empty() || data_finished; });while (!tick_queue.empty()) {ticks.push_back(tick_queue.front());tick_queue.pop();}// 一旦收集到足够的数据,应用交易策略if (ticks.size() > WINDOW_SIZE) {process_ticks(ticks);}if (data_finished && tick_queue.empty()) {break;}}// 等待线程完成data_thread.join();doca_thread.join();// 清理 DOCA 资源doca_dpdk_io_ctx_free(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();return 0;
}
下面是测试结果,左侧为挂载DPU后的,右侧为未挂载的。
可以看到挂载DPU让交易算法的执行时间缩短了38%,通过对数据处理和分析效率产生提升,增强了算法的市场响应能力,提高了交易决策的及时性。
3. 风险管理与合规监控
在高频交易中,实时风险管理和合规监控至关重要。传统的风险监控系统难以实时处理海量交易数据,导致风险响应滞后。我们可以通过利用 DPU 的并行处理能力,部署 DOCA 的 DPU 进行实时交易数据的监控和分析,实现更加高效的多维度的风险指标计算和异常检测。
下面是测试代码:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <cmath>
#include <chrono>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>// 配置参数
#define NUM_TRADES 100000 // 模拟生成的交易数量
#define QUEUE_MAXSIZE 10000 // 队列最大容量
#define POSITION_LIMIT 1000 // 最大持仓限制
#define MAX_TRADE_SIZE 100 // 单笔交易最大量
#define PRICE_FLUCTUATION_THRESHOLD 5.0 // 价格波动阈值
#define RAPID_TRADING_THRESHOLD 100 // 短时间内交易次数阈值
#define WINDOW_SIZE 100 // 快速交易检测的时间窗口大小std::mutex mtx;
std::condition_variable cv;
std::queue<std::tuple<int, int, double, double>> trade_queue; // 存储交易数据的队列
bool data_finished = false;// DOCA 初始化(省略 DOCA 设置的细节,假设已正确安装并设置)
void init_doca() {// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDK(这里只是示范,具体实现视需求)doca_dpdk_init();doca_dpdk_port_t* dpdk_port = doca_dpdk_port_start(/*端口配置*/);doca_dpdk_io_ctx_t* io_ctx = doca_dpdk_io_ctx_create(dpdk_port);
}// 模拟交易数据生成器
void trade_data_generator() {std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> price_dist(0.0, 0.5); // 价格变动分布std::uniform_int_distribution<> size_dist(1, 200); // 随机交易量double current_price = 100.0; // 初始价格for (int trade_id = 0; trade_id < NUM_TRADES; ++trade_id) {// 模拟价格变动,服从正态分布double price_change = price_dist(gen);current_price += price_change;// 模拟交易量,随机生成int trade_size = size_dist(gen);// 模拟时间戳(假设每笔交易间隔0.001秒)double timestamp = trade_id / 1000.0;// 存储交易数据{std::lock_guard<std::mutex> lock(mtx);trade_queue.push({trade_id, trade_size, current_price, timestamp});}cv.notify_one();}// 发送结束信号{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_all();
}// 风险管理与合规监控函数
void risk_compliance_monitor(std::vector<std::tuple<int, int, double, double>>& trades) {// 记录监控的违规标志std::vector<int> flags(trades.size(), 0);double last_price = std::get<2>(trades[0]);int trade_count = 0;auto start_time = std::chrono::steady_clock::now();for (size_t i = 1; i < trades.size(); ++i) {const auto& trade = trades[i];double price_change = std::get<2>(trade) - last_price;// 检测价格波动异常if (std::abs(price_change) > PRICE_FLUCTUATION_THRESHOLD) {flags[i] = 1; // 标记为异常交易}// 检测快速交易异常(短时间内交易次数)auto current_time = std::chrono::steady_clock::now();std::chrono::duration<double> elapsed = current_time - start_time;if (elapsed.count() <= 1.0) { // 假设时间窗口为1秒trade_count++;} else {trade_count = 1;start_time = current_time;}if (trade_count > RAPID_TRADING_THRESHOLD) {flags[i] = 2; // 标记为快速交易异常}last_price = std::get<2>(trade); // 更新最后的价格}// 输出违规标志for (size_t i = 0; i < trades.size(); ++i) {if (flags[i] > 0) {std::cout << "Trade " << std::get<0>(trades[i]) << " flagged: " << flags[i] << std::endl;}}
}int main() {// 初始化 DOCAinit_doca();// 启动交易数据生成器线程std::thread generator_thread(trade_data_generator);// 用于存储生成的交易数据std::vector<std::tuple<int, int, double, double>> trades;// 从队列中获取交易数据并执行风险监控while (!data_finished || !trade_queue.empty()) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !trade_queue.empty() || data_finished; });while (!trade_queue.empty()) {trades.push_back(trade_queue.front());trade_queue.pop();}lock.unlock();if (!trades.empty()) {risk_compliance_monitor(trades);trades.clear(); // 清空交易数据}}// 等待生成器线程完成generator_thread.join();return 0;
}
测试结果:
挂载 DPU 进行实时交易数据的监控和分析花费时间为 3.6 秒。
普通运行花费 4.49 秒。
可以看到风险检测的响应时间缩短了20%,实时监控和分析能力增强,及时发现并处理潜在的交易风险,提高了系统的风险管理能力。
4. DOCA 在 HFT 中的性能优势总结
通过上述案例分析,可以看出 DOCA 在高频交易中的多个方面展现出了显著的性能优势。
性能优势 | 描述 |
---|---|
低延迟 | 硬件加速和高效的数据路径设计,显著降低了数据传输和处理的延迟。 |
高吞吐量 | DPU 的并行处理能力和高效的数据管理,提升了系统的整体吞吐量。 |
资源优化 | 通过卸载网络和数据处理任务,优化了 CPU 和内存资源的利用,提高了系统的整体性能。 |
可扩展性 | DOCA 的模块化设计和灵活的编程模型,支持高频交易系统的快速扩展和定制化需求。 |
DOCA 通过其高性能的 DPU,为金融高频交易提供了强大的技术支持。其在网络优化、数据处理、并行计算和安全管理等方面的优势,满足了 HFT 对低延迟、高吞吐量和高可靠性的苛刻要求。
五、思考与总结
经过一系列测试和分析,我对 DOCA 开发环境下 DPU 的性能有了更清晰的了解。在 DPA All-to-All 应用测试中,DPU 在处理多核并发数据交换时表现得非常高效,延迟低、吞吐量达标。在基础计算测试中,DPU 的表现也相当稳健。从 CPU 的矩阵乘法到内存带宽、多线程和多进程性能评估,它都能应对自如。结合金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。DPU 在并行计算和数据处理上的优势,使其在日常计算和系统任务中具备广泛的应用前景。未来,随着更多应用场景的开发和优化,DOCA 有望在更多领域发挥关键作用,推动数据中心和高性能计算的发展。
相关文章:

全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析
本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性…...

图论 八字码
我们可能惊异于某些技巧。我们认为这个技巧真是巧妙啊。或者有人认为我依靠自己的直觉想出了这个表示方法。非常自豪。我认为假设是很小的时候,比如说小学初中,还是不错的。到高中大学,就有一些不成熟了。因为这实际上是一个竞技。很多东西前…...

OSI5GWIFI自组网协议层次对比
目录 5G网络5G与其他协议栈各层映射 5G网络 物理层 (PHY) 是 5G 基站协议架构的最底层,负责将数字数据转换为适合无线传输的信号,并将接收到的无线信号转换为数字数据。实现数据的编码、调制、多天线处理、资源映射等操作。涉及使用新的频段(…...

北理新源监控平台都管理哪些数据
北理新源信息科技有限公司(简称“北理新源”)依托北京理工大学电动车辆国家工程研究中心,建设和运营了“新能源汽车国家监测与管理平台”。该平台是国家级的新能源汽车数据监管平台,主要负责对新能源汽车的运行数据进行采集、监测…...

WPS不登录无法使用基本功能的解决方案
前言 WPS不登录无法使用基本功能的原因通常是为了同步数据、提供更多高级功能或满足软件授权要求。然而,一些用户可能出于隐私或便捷性的考虑,不愿意登录账号。在这种情况下,WPS可能会限制未登录用户的使用权限,导致工具栏变灰…...

车载软件架构 --- CP和AP作为中央计算平台的软件架构双核心
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活…...

【技巧】优雅的使用 pnpm+Monorepo 单体仓库构建一个高效、灵活的多项目架构
单体仓库(Monorepo)搭建指南:从零开始 单体仓库(Monorepo)是一种将多个相关项目集中管理在一个仓库中的开发模式。它可以帮助开发者共享代码、统一配置,并简化依赖管理。本文将通过实际代码示例࿰…...

【深度学习基础】多层感知机 | 权重衰减
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…...

修改word的作者 最后一次保存者 总编辑时间 创建时间 最后一次保存的日期
作者: 1.打开word文件 2.点击左上角的文件 3.选项 4.用户信息 5.将用户信息中的 姓名改为你需要的名字 最后一次保存者 1.word重命名为.zip文件 2.docProps中有个core.xml 3.用记事本打开有个lastModifiedBy标签,将里面内容改为你需要的名字 总编辑时…...

青少年编程与数学 02-007 PostgreSQL数据库应用 15课题、备份与还原
青少年编程与数学 02-007 PostgreSQL数据库应用 15课题、备份与还原 一、数据库备份与还原二、PostgreSQL中操作数据库的备份与还原1. 使用pg_dump进行逻辑备份2. 使用pg_restore进行逻辑还原3. 使用pg_basebackup进行物理备份4. 还原物理备份注意事项 三、自动备份1. 使用pg_d…...

Flutter:自定义Tab切换,订单列表页tab,tab吸顶
1、自定义tab切换 view <Widget>[// 好评<Widget>[TDImage(assetUrl: assets/img/order4.png,width: 36.w,height: 36.w,),SizedBox(width: 10.w,),TextWidget.body(好评,size: 24.sp,color: controller.tabIndex 0 ? AppTheme.colorfff : AppTheme.color999,),]…...

SAS-proc sgplot绘图
1、绘图-直条图示例: 1.1 数据集 1.2 代码 proc sgplot data sashelp.cars;vbar origin / response msrp /* response:响应变量,Y轴 */stat mean /* stat:统计量,结果用均值呈现 */group type /* group&#…...

怎么使用python 调用高德地图api查询位置和导航?
环境: python 3.10 问题描述: 怎么使用python 调用高德地图api查询位置和导航? 解决方案: 要使用Python调用高德地图API查询位置和导航,需要先注册高德开发者账号并获取API Key。以下是基本步骤: 1. 注册高德开…...

pikachu靶场-敏感信息泄露概述
敏感信息泄露概述 由于后台人员的疏忽或者不当的设计,导致不应该被前端用户看到的数据被轻易的访问到。 比如: ---通过访问url下的目录,可以直接列出目录下的文件列表; ---输入错误的url参数后报错信息里面包含操作系统、中间件、开发语言的版…...

使用ssh推送项目到github
文章目录 1. 确保已生成 SSH 密钥2. 在 GitHub 上创建远程仓库3. 初始化本地项目4. 将本地项目与远程仓库关联5. 添加文件并提交补充:拉取远程修改(可选)6. 推送到 GitHub7. 完成总结 出现的问题解决方法:方法 1:允许合…...

SAP MRP运行出现例外消息怎么处理?例外消息的优先级、案例分享
【SAP系统PP模块研究】 #SAP #PP #MRP #例外消息 #MRP评估 一、MRP评估中的例外消息 例外消息,是SAP系统在MRP运行过程中自动产生的消息。对例外消息检查其产生的原因,及时与销售、生产、采购等相关部门进行沟通,并进行相应调整&#x…...

002-SpringBoot整合AI(Alibaba)
SpringBoot整合AI 一、引入依赖二、配置application.yml三、获取 api-key四、编写 controller五、起服务调用 一、引入依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><vers…...

Java中如何安全地停止线程?
大家好,我是锋哥。今天分享关于【Java中如何安全地停止线程?】面试题。希望对大家有帮助; Java中如何安全地停止线程? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Java中,安全地停止线程是一项重要的任务,尤其…...

Apache Tomcat文件包含漏洞复现(详细教程)
1.漏洞原理 Tomcat 服务器是一个免费的开放源代码的Web 应用服务器,其安装后会默认开启ajp连接器,方便与其他web服务器通过ajp协议进行交互。属于轻量级应用服务器,在中小型系统和并发访问用户不是很多的场合下被普遍使用,是开发…...

个人学习 - 什么是Vim?
观我往旧,同我仰春 - 2025.1.10 声明 仅作为个人学习使用,仅供参考 本文所有解释参考笔者个人理解,最终目的是服务于自我学习, 如果你需要了解官方更规范的解释,请自行查阅 Vim 是什么 Vim 是一个强大的 文本编辑器…...

Flink Gauss CDC:深度剖析存量与增量同步的创新设计
目录 设计思路 1.为什么不直接用FlinkCDC要重写Flink Gauss CDC 2.存量同步的逻辑是什么 2.1、单主键的切片策略是什么 2.2、复合主键作切片,怎么保证扫描到所有的数据 3、增量同步的逻辑是什么 4、存量同步结束之后如何无缝衔接增量同步 5、下游数据如何落…...

docker 部署.netcore应用优势在什么地方?
目录 1. 环境一致性 2. 简化依赖管理 3. 快速部署与扩展 4. 资源利用率高 5. 版本控制与回滚 6. 安全性 7. 生态系统支持 8. 微服务架构支持 9. 降低成本 10. 开发体验提升 总结 使用 Docker 部署 .NET Core 应用有许多优势,特别是在开发、测试和生产环境…...

AIP-126 枚举
编号126原文链接AIP-126: Enumerations状态批准创建日期2019-07-24更新日期2019-07-24 一个域的值集合是一组数量有限的具体值,这是很常见的。此时使用枚举(缩写为“enums”)可有助于明确表达值集合的范围。 指南 API 可以 为不经常更改的…...

P3707 [SDOI2017] 相关分析 Solution
Description 给定序列 x ( x 1 , x 2 , ⋯ , x n ) , y ( y 1 , y 2 , ⋯ , y n ) x(x_1,x_2,\cdots,x_n),y(y_1,y_2,\cdots,y_n) x(x1,x2,⋯,xn),y(y1,y2,⋯,yn),有 m m m 个操作,分三种: query ( l , r ) \operatornam…...

Android AutoMotive --CarService
1、AAOS概述 Android AutoMotive OS是谷歌针对车机使用场景打造的操作系统,它是基于现有Android系统的基础上增加了新特性,最主要的就是增加了CarService(汽车服务)模块。我们很容易把Android AutoMotive和Android Auto搞混&…...

K8S中Service详解(三)
HeadLiness类型的Service 在某些场景中,开发人员可能不想使用Service提供的负载均衡功能,而希望自己来控制负载均衡策略,针对这种情况,kubernetes提供了HeadLiness Service,这类Service不会分配Cluster IP,…...

C++----STL(vector)
vector的介绍 vector的文档介绍:cplusplus.com/reference/vector/vector/ 1.基本概念 简单来说,vector是表示可以改变大小的数组的顺序容器。使用连续的存储位置来存储元素,因此可以通过常规指针的偏移量来高效访问。 2.内部机制 vector…...

Ubuntu24.04初始化MySQL报错 error while loading shared libraries libaio.so.1
Ubuntu24.04初始化MySQL报错 error while loading shared libraries: libaio.so.1 问题一:libaio1不存在 # 提示libaio1不存在 [rootzabbix-mysql-master.example.com x86_64-linux-gnu]#apt install numactl libaio1 Reading package lists... Done Building depe…...

初探大数据流式处理
文章目录 初探大数据流式处理批式处理系统特点流式处理系统特点大批次计算微批次计算适用场景 流式计算的应用场景流式大数据的特征流式计算的关键技术流式处理框架的特征三大流式数据处理框架 初探大数据流式处理 大数据处理系统主要分为批式处理和流式处理两类。批式处理将大…...

【Linux】Linux入门(三)权限
目录 前提权限概念whoami指令 Linux权限管理文件访问者的分类(人)file指令权限信息权限的表示方法 chmod指令 更改权限chown指令 修改文件,文件夹所属用户和用户组 权限掩码umask(权限掩码) 粘滞位 前提 请先看下面这…...