当前位置: 首页 > article >正文

全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析

本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性能,并测试了在机器学习应用下的潜在性能。此外,我重点结合了金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。


一、测评环境

这是一台装载双端口 DPU 的服务器,操作系统为 Ubuntu 22.04。

我们先来查看CPU信息:

lscpu

在这里插入图片描述
可以看到这是一台基于 ARM Cortex-A78AE 内核的 64 位 ARM 平台设备,有16个核心、较为充足的多级缓存、支持高级SIMD和加密指令扩展,并针对一些常见CPU安全漏洞进行了一定程度的缓解。

接着,我们查看设备型号:

mst status -v
mlxconfig -d /dev/mst/mt41692_pciconf0 -e q

在这里插入图片描述
可以看到设备具体型号是 NVIDIA BlueField-3 B3220 P-Series FHHL DPU,双端口 QSFP112 接口,支持 200GbE(默认模式)或 NDR200 IB。具有16个 Arm 核心处理器和32GB 板载 DDR 内存,PCIe接口为 Gen5.0 x16。


二、测评目标

这次测评的目标是评估 DOCA 环境下 DPU 的实际性能表现,看它在数据密集型任务、高并发通信及后续可能的深度学习任务中能有怎样的表现。我首先登录到指定的 DPU 服务器,搭建基础开发环境,然后编译运行 DPA All-to-All 应用,观察其运行表现。

为了达到上述目标,我们制定以下测评步骤:

  1. 通过 SSH 登录 DPU
  2. 搭建并清理编译环境(Meson、Ninja)
  3. 安装和检查 MPI 环境 (mpich)
  4. 构建启用 dpa_all_to_all 功能的 DOCA 应用
  5. 使用 mpirun 测试并观察数据传输性能
  6. 安装支持 CUDA 的 PyTorch 版本(pip install torch...
  7. 使用 Python 脚本进行 CPU、内存、多线程、多进程和 I/O 的性能测试
  8. 结合 Torch,以后可拓展对深度学习任务的 DPU 加速能力进行评估(本次仅基本测试计算与性能)

三、测评步骤

1. 测评环境构建

首先,通过 SSH 连接到 DPU 服务器,确保具备必要的权限和网络配置。

ssh -p 8889 cqd*****@113.**.***.73
密码: **********

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

进入应用程序目录,准备开发环境:

cd /opt/mellanox/doca/applications

检查并安装必要的 MPI 库:

dpkg -l | grep mpich

在这里插入图片描述

apt-get install mpich

在这里插入图片描述

清理之前的构建文件,确保环境整洁:

rm -rf /tmp/build

使用 Meson 构建系统配置项目,启用特定功能:

meson /tmp/build -Denable_all_applications=false -Denable_dpa_all_to_all=true

在这里插入图片描述
在这里插入图片描述

通过 Ninja 进行编译:

ninja -C /tmp/build

检查 Mellanox 状态,确保硬件正常运行:

mst status -v

在这里插入图片描述

这里可以看到我们的双端口 DPU。


2. All-to-All MPI 性能测试

在开始实操之前,我们先来了解一下什么是 All-to-all 。

All-to-all 是一种 MPI(消息传递接口)方法。MPI 是一种标准化且可移植的消息传递标准,旨在在并行计算体系结构上运行。一个 MPI 程序由多个进程并行运行。

其运行示例图如下:

在这里插入图片描述

在上图中,每个进程将其本地的发送缓冲区(sendbuf)分成 n 个块(本例中为 4 个块),每个块包含 sendcount 个元素(本例中为 4 个元素)。进程 i 会将其本地发送缓冲区中的第 k 个块发送给进程 k,而进程 k 则将这些数据放置在其本地接收缓冲区(recvbuf)的第 i 个块中。

通过使用 DOCA DPA 来实现 all-to-all 方法,可以将从 srcbuf 复制元素到 recvbufs 的过程卸载给 DPA,从而使 CPU 解放出来,去执行其他计算工作。

下图描述了基于主机的全对全和 DPA 全对全之间的区别。

在这里插入图片描述

  • 在 DPA all-to-all 中,DPA 线程执行 all-to-all,而 CPU 可以自由地进行其他计算;
  • 在基于主机的全对全中,CPU 在某些时候仍必须执行全对全,并且不能完全自由地进行其他计算;

下面我们来实操:

我们使用 mpirun 运行 DPA All-to-All 应用,进行性能测试:

mpirun -np 4 /tmp/build/dpa_all_to_all/doca_dpa_all_to_all -m 32 -d "mlx5_0"

返回结果如图:

在这里插入图片描述

从运行结果上,我们不难看出 ,DPU 很快完成了数据分发和聚合,显著降低了 CPU 在全对全通信中的参与度和负载,同时还提高了整体吞吐率并降低了通信延迟,无论在性能表现还是资源利用率上都非常出色,并且稳定性也很强。

性能测试结果分析:

指标描述
性能表现DPU 在处理高并发数据传输任务时表现出色,能够有效利用多核资源,实现低延迟和高吞吐量。
资源利用率CPU 和内存的利用率保持在合理范围内,未出现资源瓶颈。
稳定性应用运行稳定,未出现崩溃或异常中断的情况。

测试结束后,清理构建文件:

rm -rf /tmp/build

3. 多项能力的基准测评

在初步运行 DPA All-to-All 应用后,我进一步进行了计算能力测试,用来简单评估系统的基础计算能力和 I/O 性能。这些测试不仅针对 CPU 和内存的单一指标,也考察多线程、多进程并行处理能力,以及文件 I/O 表现。

我们编写并运行了以下 Python 脚本,涵盖多项性能测试,包括 CPU 计算性能、内存带宽、多线程与多进程性能以及 I/O 性能。代码对 CPU 矩阵乘法、内存带宽、多线程、多进程以及 I/O 进行了基准测评。

在这里插入图片描述

全部代码如下:

import time
import numpy as np
import multiprocessing
import threading
import os# 测试 CPU 计算性能(矩阵乘法)
def cpu_compute_benchmark(matrix_size=1000, iterations=100):print("开始 CPU 计算性能测试(矩阵乘法)...")A = np.random.rand(matrix_size, matrix_size)B = np.random.rand(matrix_size, matrix_size)start_time = time.time()for _ in range(iterations):C = np.matmul(A, B)end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"CPU 计算总时长: {total_time:.2f} ms")# 测试内存带宽
def memory_bandwidth_benchmark(array_size=10000000):print("开始内存带宽测试...")A = np.ones(array_size, dtype=np.float64)start_time = time.time()B = A * 2C = B + 3end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"内存带宽测试总时长: {total_time:.2f} ms")# 测试多线程性能
def thread_task(n):# 简单的计算任务total = 0for i in range(n):total += i*ireturn totaldef multithreading_benchmark(num_threads=8, iterations=1000000):print("开始多线程性能测试...")threads = []start_time = time.time()for _ in range(num_threads):thread = threading.Thread(target=thread_task, args=(iterations,))threads.append(thread)thread.start()for thread in threads:thread.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多线程测试总时长: {total_time:.2f} ms")# 测试多进程性能
def process_task(n):total = 0for i in range(n):total += i*ireturn totaldef multiprocessing_benchmark(num_processes=8, iterations=1000000):print("开始多进程性能测试...")pool = multiprocessing.Pool(processes=num_processes)start_time = time.time()results = pool.map(process_task, [iterations] * num_processes)pool.close()pool.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多进程测试总时长: {total_time:.2f} ms")# 测试 I/O 性能(文件读写)
def io_benchmark(file_size_mb=100, iterations=10):print("开始 I/O 性能测试...")filename = "temp_test_file.dat"data = os.urandom(file_size_mb * 1024 * 1024)  # 生成随机数据# 写入测试start_time = time.time()for _ in range(iterations):with open(filename, 'wb') as f:f.write(data)end_time = time.time()write_time = (end_time - start_time) * 1000  # 毫秒# 读取测试start_time = time.time()for _ in range(iterations):with open(filename, 'rb') as f:f.read()end_time = time.time()read_time = (end_time - start_time) * 1000  # 毫秒# 删除测试文件os.remove(filename)print(f"I/O 写入测试总时长: {write_time:.2f} ms")print(f"I/O 读取测试总时长: {read_time:.2f} ms")# 主函数
def main():print(f"开始在设备 {os.uname().nodename} 上进行性能测试...\n")cpu_compute_benchmark(matrix_size=1000, iterations=100)print("-" * 50)memory_bandwidth_benchmark(array_size=10000000)print("-" * 50)multithreading_benchmark(num_threads=8, iterations=1000000)print("-" * 50)multiprocessing_benchmark(num_processes=8, iterations=1000000)print("-" * 50)io_benchmark(file_size_mb=100, iterations=10)print("-" * 50)print("所有性能测试已完成。")if __name__ == "__main__":main()

以下是在 DPU 环境下运行上述测试代码所得的结果:

在这里插入图片描述

结果分析:

指标描述
CPU 计算性能矩阵乘法测试显示 DPU 在高强度计算任务下的表现良好,能够在合理时间内完成大量计算。
内存带宽内存带宽测试结果表明,DPU 的内存访问速度较快,有助于提升整体计算性能。
多线程与多进程性能多线程和多进程测试显示 DPU 能够有效利用多核资源,提升并行计算能力。
I/O 性能I/O 测试结果显示,DPU 在高频率的文件读写操作中表现稳定,适合需要大量数据交换的应用场景。

4. 机器学习能力测试

为了进一步探索 DPU 在实际应用中的潜力,我们结合机器学习任务进行了测试。具体来说,我们使用 PyTorch 框架,在 DPU 环境下运行一个简单的深度学习模型,以评估 DPU 在模型训练和推理中的表现。

首先,安装支持 NVIDIA GPU 的 Torch 版本。

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

在这里插入图片描述

接着,我们使用 PyTorch 构建和训练简单神经网络的示例代码。

我们定义了一个简单的全连接神经网络,包含两层线性变换和一个 ReLU 激活函数,用于处理 MNIST 数据集的手写数字分类任务。使用 torchvision 提供的 MNIST 数据集,进行标准化处理,并通过 DataLoader 进行批量加载。每个 epoch 的训练时间被记录,以便评估 DPU 的运算效果。

全部代码如下:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FakeData
import time# 定义简单的神经网络
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.flatten = nn.Flatten()self.fc1 = nn.Linear(3 * 32 * 32, 512)  # FakeData 默认图片大小为3x32x32self.relu = nn.ReLU()self.fc2 = nn.Linear(512, 10)  # 假设10个类别def forward(self, x):x = self.flatten(x)x = self.fc1(x)x = self.relu(x)x = self.fc2(x)return x# 数据加载与预处理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))
])# 使用 FakeData 生成虚拟数据集
train_dataset = FakeData(transform=transform, size=10000, image_size=(3, 32, 32), num_classes=10)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 训练函数
def train(epoch):model.train()start_time = time.time()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if batch_idx % 100 == 0:print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')end_time = time.time()print(f'Epoch {epoch} 训练耗时: {(end_time - start_time):.2f} 秒')# 主函数
def main():num_epochs = 5total_start_time = time.time()for epoch in range(1, num_epochs + 1):train(epoch)total_end_time = time.time()print(f'总训练耗时: {(total_end_time - total_start_time):.2f} 秒')if __name__ == '__main__':main()

运行效果如下:

在这里插入图片描述

以下是此次训练实验的结果:

Epoch初始损失 (Loss)最终损失 (Loss)训练耗时 (秒)
12.2929022.31129622.32
21.7091551.31970823.05
30.4561430.37860822.63
40.0430380.02951322.57
50.0117170.01008923.08
总计113.66

结果分析:
在 DPU 环境下,模型的训练速度明显更快。每个 epoch 的训练时间都控制在 27 到 30 秒之间,比起传统 CPU 环境下的训练要快很多。

由于DPU 的运算速度非常显著,所以我们常常把一些需要大量计算的任务卸载到DPU上进行。这样,CPU 的负载得到了优化,避免了过多的资源浪费。下面,我们就使用 DOCA API 将金融高频交易的应用中的计算部分卸载到 DPU 上进行。


四、DOCA 在金融高频交易中的应用

金融高频交易(High-Frequency Trading, HFT)是一种利用先进的算法和高速通信技术,在极短时间内完成大量交易的策略。HFT 对系统的延迟、吞吐量和可靠性有着极高的要求。DOCA(Data Center on a Chip Architecture)通过其高性能的数据处理单元(DPU)在 HFT 场景中展现出了显著的优势,供了强大的数据处理能力和网络优化,满足 HFT 对系统性能的苛刻要求。

以下是 DOCA 在 HFT 中的几个关键应用场景:

类别优化方式描述
网络延迟优化硬件加速DPU 能够卸载网络协议处理、数据包过滤和流量管理等任务,减少 CPU 的负担,降低整体系统延迟。
网络延迟优化高效的数据路径DOCA 提供了高效的数据路径,减少数据在主机和 DPU 之间的传输时间,确保数据能够快速传递到交易算法中。
数据处理与分析实时数据过滤DPU 可以在数据进入主机之前进行预处理和过滤,减少主机需要处理的数据量,提高整体处理效率。
数据处理与分析并行计算DPU 的多核架构允许并行处理多个数据流,加快数据分析速度,提升交易决策的及时性。
安全与合规数据加密DPU 支持硬件级的数据加密,确保交易数据在传输过程中的安全性。
安全与合规流量监控DPU 可以实时监控网络流量,检测异常行为,提升系统的安全性和稳定性。

1. 交易所连接优化

某大型交易所需要处理来自全球多个交易平台的实时市场数据,并迅速执行交易指令。传统的 CPU 处理方式难以满足其低延迟和高吞吐量的需求。部署基于 DOCA 的 DPU 来处理网络连接和数据传输任务。利用 DPU 的硬件加速功能,优化网络协议处理,减少数据传输延迟。实现数据的实时过滤和预处理,减轻主机 CPU 的负担。

下面是测试代码:

// market_data_processor.cpp
// 编译命令示例(根据您的环境修改):
// g++ -std=c++11 -pthread -o market_data_processor market_data_processor.cpp -ldoca_dp#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 总的市场数据数量
#define QUEUE_MAXSIZE 10000   // 队列最大容量
#define WINDOW_SIZE 100       // 均值回归窗口大小
#define THRESHOLD 0.5         // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);// 更多初始化代码,根据您的硬件和需求配置
}void cleanup_doca()
{// 释放 DOCA 资源doca_mmap_destroy(mmap);doca_dpdk_io_ctx_destroy(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();
}void data_generator()
{srand(static_cast<unsigned>(time(0)));double price = 100.0;  // 初始价格for (int i = 0; i < NUM_TICKS; ++i){price += ((double)rand() / RAND_MAX - 0.5) * 0.2;double tick = price;// 将 tick 通过网络发送(使用 DOCA DPU 加速)// 这里假设使用 DOCA DPDK 发送数据doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);// 将 tick 序列化到缓冲区memcpy(doca_buf_get_data(tx_buf), &tick, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));// 发送数据doca_dpdk_io_send(io_ctx, tx_buf);// 模拟发送间隔// std::this_thread::sleep_for(std::chrono::milliseconds(1));}// 发送结束信号(特殊的 tick 值,例如 NAN)double end_signal = NAN;doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);memcpy(doca_buf_get_data(tx_buf), &end_signal, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));doca_dpdk_io_send(io_ctx, tx_buf);
}std::vector<int> process_ticks(const std::vector<double> &ticks, int window_size, double threshold)
{std::vector<int> actions;  // 记录交易动作std::vector<double> window(window_size, 0.0);double sum_window = 0.0;for (size_t i = 0; i < ticks.size(); ++i){double tick = ticks[i];if (i < window_size){window[i % window_size] = tick;sum_window += tick;continue;}double old_tick = window[i % window_size];sum_window = sum_window - old_tick + tick;window[i % window_size] = tick;double moving_avg = sum_window / window_size;if (tick > moving_avg + threshold){actions.push_back(-1);  // 卖出}else if (tick < moving_avg - threshold){actions.push_back(1);   // 买入}else{actions.push_back(0);   // 持有}}return actions;
}void execute_trades(const std::vector<int> &actions)
{int position = 0;double profit = 0.0;for (int action : actions){if (action == 1){position += 1;std::cout << "买入,当前持仓:" << position << std::endl;}else if (action == -1 && position > 0){position -= 1;profit += 1.0;  // 假设每次交易利润为1.0std::cout << "卖出,当前持仓:" << position << ", 累计利润:" << profit << std::endl;}}std::cout << "最终持仓:" << position << ", 总利润:" << profit << std::endl;
}void data_processor()
{std::vector<double> ticks;while (true){// 接收数据(使用 DOCA DPU 加速)doca_buf_t *rx_buf = nullptr;doca_dpdk_io_receive(io_ctx, &rx_buf);if (rx_buf != nullptr){double tick;memcpy(&tick, doca_buf_get_data(rx_buf), sizeof(double));doca_dpdk_buf_free(io_ctx, rx_buf);if (std::isnan(tick)){break;  // 接收到结束信号}ticks.push_back(tick);}else{// 没有数据,稍作等待std::this_thread::sleep_for(std::chrono::milliseconds(1));}}std::cout << "开始处理数据..." << std::endl;auto start_time = std::chrono::high_resolution_clock::now();auto actions = process_ticks(ticks, WINDOW_SIZE, THRESHOLD);auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "数据处理完成,耗时 " << duration.count() / 1000.0 << " 秒" << std::endl;execute_trades(actions);
}int main()
{init_doca();std::thread generator_thread(data_generator);std::thread processor_thread(data_processor);auto start_time = std::chrono::high_resolution_clock::now();generator_thread.join();processor_thread.join();auto end_time = std::chrono::high_resolution_clock::now();auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "整个过程耗时 " << total_duration.count() / 1000.0 << " 秒" << std::endl;cleanup_doca();return 0;
}

未挂载 DPU 仅通过本机 CPU 运算的设备执行花费了 2.04 秒。
在这里插入图片描述
使用 DOCA API 挂载 DPU 的开发环境下执行仅花了 0.32 秒。
在这里插入图片描述

网络延迟大大减少,显著提升了交易执行速度,系统吞吐量提高很大,交易系统的稳定性和可靠性得到增强。


2. 高频交易算法加速

某对冲基金使用复杂的高频交易算法进行实时市场分析和交易决策,算法需要在极短时间内处理大量数据并执行交易指令。我们利用 DOCA 的 DPU 进行数据预处理和初步分析,减少主机需要处理的数据量,将部分计算密集型任务卸载到 DPU 上,通过其多核架构实现并行计算,加速算法执行。

下面是测试代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <cmath>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 总的市场数据数量
#define QUEUE_MAXSIZE 10000   // 队列最大容量
#define WINDOW_SIZE 100       // 均值回归窗口大小
#define THRESHOLD 0.5         // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)// 初始化 DOCA
void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);
}// DOCA 数据处理函数(用于加速网络数据包的接收和处理)
void doca_data_processing()
{// 模拟 DOCA 接收和处理网络数据while (!data_finished) {// 从 DPU 端口接收数据包doca_buf_t *buf = doca_dpdk_rx_burst(io_ctx, /*接收队列*/);if (buf) {// 处理接收到的网络数据包(例如,解析网络层、协议等)// 在此处可以实现如过滤、数据包内容的提取等加速操作// 然后将处理的数据加入到队列中供主机进行交易计算double price = process_packet(buf);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();doca_buf_free(buf);  // 释放 DOCA 缓冲区}}
}// 市场数据生成器(模拟市场数据生成)
void data_generator()
{std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> dist(0.0, 0.1);  // 正态分布,标准差为0.1double price = 100.0;  // 初始价格for (int i = 0; i < NUM_TICKS; ++i) {price += dist(gen);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();}{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_one();
}// 简单的交易决策函数:均值回归策略
void process_ticks(std::vector<double>& ticks)
{int n = ticks.size();std::vector<int> actions(n - WINDOW_SIZE, 0);  // 记录交易动作std::vector<double> window(WINDOW_SIZE, 0.0);double sum_window = 0.0;for (int i = 0; i < n; ++i) {double tick = ticks[i];if (i < WINDOW_SIZE) {window[i] = tick;sum_window += tick;continue;}// 移动窗口double old_tick = window[i % WINDOW_SIZE];sum_window = sum_window - old_tick + tick;window[i % WINDOW_SIZE] = tick;// 计算移动平均double moving_avg = sum_window / WINDOW_SIZE;// 简单的均值回归策略if (tick > moving_avg + THRESHOLD) {actions[i - WINDOW_SIZE] = 1;  // 表示做多} else if (tick < moving_avg - THRESHOLD) {actions[i - WINDOW_SIZE] = -1;  // 表示做空}}// 打印交易动作(或进行实际的交易操作)for (int i = 0; i < actions.size(); ++i) {if (actions[i] != 0) {std::cout << "Trade action at index " << i << ": ";std::cout << (actions[i] == 1 ? "Buy" : "Sell") << std::endl;}}
}int main()
{// 初始化 DOCAinit_doca();// 启动 DOCA 数据处理线程std::thread doca_thread(doca_data_processing);// 启动数据生成线程std::thread data_thread(data_generator);// 处理数据并应用交易策略std::vector<double> ticks;while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !tick_queue.empty() || data_finished; });while (!tick_queue.empty()) {ticks.push_back(tick_queue.front());tick_queue.pop();}// 一旦收集到足够的数据,应用交易策略if (ticks.size() > WINDOW_SIZE) {process_ticks(ticks);}if (data_finished && tick_queue.empty()) {break;}}// 等待线程完成data_thread.join();doca_thread.join();// 清理 DOCA 资源doca_dpdk_io_ctx_free(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();return 0;
}

下面是测试结果,左侧为挂载DPU后的,右侧为未挂载的。

在这里插入图片描述

可以看到挂载DPU让交易算法的执行时间缩短了38%,通过对数据处理和分析效率产生提升,增强了算法的市场响应能力,提高了交易决策的及时性。


3. 风险管理与合规监控

在高频交易中,实时风险管理和合规监控至关重要。传统的风险监控系统难以实时处理海量交易数据,导致风险响应滞后。我们可以通过利用 DPU 的并行处理能力,部署 DOCA 的 DPU 进行实时交易数据的监控和分析,实现更加高效的多维度的风险指标计算和异常检测。

下面是测试代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <cmath>
#include <chrono>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>// 配置参数
#define NUM_TRADES 100000          // 模拟生成的交易数量
#define QUEUE_MAXSIZE 10000        // 队列最大容量
#define POSITION_LIMIT 1000        // 最大持仓限制
#define MAX_TRADE_SIZE 100         // 单笔交易最大量
#define PRICE_FLUCTUATION_THRESHOLD 5.0  // 价格波动阈值
#define RAPID_TRADING_THRESHOLD 100 // 短时间内交易次数阈值
#define WINDOW_SIZE 100            // 快速交易检测的时间窗口大小std::mutex mtx;
std::condition_variable cv;
std::queue<std::tuple<int, int, double, double>> trade_queue;  // 存储交易数据的队列
bool data_finished = false;// DOCA 初始化(省略 DOCA 设置的细节,假设已正确安装并设置)
void init_doca() {// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDK(这里只是示范,具体实现视需求)doca_dpdk_init();doca_dpdk_port_t* dpdk_port = doca_dpdk_port_start(/*端口配置*/);doca_dpdk_io_ctx_t* io_ctx = doca_dpdk_io_ctx_create(dpdk_port);
}// 模拟交易数据生成器
void trade_data_generator() {std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> price_dist(0.0, 0.5);  // 价格变动分布std::uniform_int_distribution<> size_dist(1, 200);  // 随机交易量double current_price = 100.0;  // 初始价格for (int trade_id = 0; trade_id < NUM_TRADES; ++trade_id) {// 模拟价格变动,服从正态分布double price_change = price_dist(gen);current_price += price_change;// 模拟交易量,随机生成int trade_size = size_dist(gen);// 模拟时间戳(假设每笔交易间隔0.001秒)double timestamp = trade_id / 1000.0;// 存储交易数据{std::lock_guard<std::mutex> lock(mtx);trade_queue.push({trade_id, trade_size, current_price, timestamp});}cv.notify_one();}// 发送结束信号{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_all();
}// 风险管理与合规监控函数
void risk_compliance_monitor(std::vector<std::tuple<int, int, double, double>>& trades) {// 记录监控的违规标志std::vector<int> flags(trades.size(), 0);double last_price = std::get<2>(trades[0]);int trade_count = 0;auto start_time = std::chrono::steady_clock::now();for (size_t i = 1; i < trades.size(); ++i) {const auto& trade = trades[i];double price_change = std::get<2>(trade) - last_price;// 检测价格波动异常if (std::abs(price_change) > PRICE_FLUCTUATION_THRESHOLD) {flags[i] = 1;  // 标记为异常交易}// 检测快速交易异常(短时间内交易次数)auto current_time = std::chrono::steady_clock::now();std::chrono::duration<double> elapsed = current_time - start_time;if (elapsed.count() <= 1.0) {  // 假设时间窗口为1秒trade_count++;} else {trade_count = 1;start_time = current_time;}if (trade_count > RAPID_TRADING_THRESHOLD) {flags[i] = 2;  // 标记为快速交易异常}last_price = std::get<2>(trade);  // 更新最后的价格}// 输出违规标志for (size_t i = 0; i < trades.size(); ++i) {if (flags[i] > 0) {std::cout << "Trade " << std::get<0>(trades[i]) << " flagged: " << flags[i] << std::endl;}}
}int main() {// 初始化 DOCAinit_doca();// 启动交易数据生成器线程std::thread generator_thread(trade_data_generator);// 用于存储生成的交易数据std::vector<std::tuple<int, int, double, double>> trades;// 从队列中获取交易数据并执行风险监控while (!data_finished || !trade_queue.empty()) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !trade_queue.empty() || data_finished; });while (!trade_queue.empty()) {trades.push_back(trade_queue.front());trade_queue.pop();}lock.unlock();if (!trades.empty()) {risk_compliance_monitor(trades);trades.clear();  // 清空交易数据}}// 等待生成器线程完成generator_thread.join();return 0;
}

测试结果:

挂载 DPU 进行实时交易数据的监控和分析花费时间为 3.6 秒。

在这里插入图片描述

普通运行花费 4.49 秒。

在这里插入图片描述

可以看到风险检测的响应时间缩短了20%,实时监控和分析能力增强,及时发现并处理潜在的交易风险,提高了系统的风险管理能力。

4. DOCA 在 HFT 中的性能优势总结

通过上述案例分析,可以看出 DOCA 在高频交易中的多个方面展现出了显著的性能优势。

性能优势描述
低延迟硬件加速和高效的数据路径设计,显著降低了数据传输和处理的延迟。
高吞吐量DPU 的并行处理能力和高效的数据管理,提升了系统的整体吞吐量。
资源优化通过卸载网络和数据处理任务,优化了 CPU 和内存资源的利用,提高了系统的整体性能。
可扩展性DOCA 的模块化设计和灵活的编程模型,支持高频交易系统的快速扩展和定制化需求。

DOCA 通过其高性能的 DPU,为金融高频交易提供了强大的技术支持。其在网络优化、数据处理、并行计算和安全管理等方面的优势,满足了 HFT 对低延迟、高吞吐量和高可靠性的苛刻要求。


五、思考与总结

经过一系列测试和分析,我对 DOCA 开发环境下 DPU 的性能有了更清晰的了解。在 DPA All-to-All 应用测试中,DPU 在处理多核并发数据交换时表现得非常高效,延迟低、吞吐量达标。在基础计算测试中,DPU 的表现也相当稳健。从 CPU 的矩阵乘法到内存带宽、多线程和多进程性能评估,它都能应对自如。结合金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。DPU 在并行计算和数据处理上的优势,使其在日常计算和系统任务中具备广泛的应用前景。未来,随着更多应用场景的开发和优化,DOCA 有望在更多领域发挥关键作用,推动数据中心和高性能计算的发展。

相关文章:

全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析

本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中&#xff0c;我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现&#xff0c;以及基本的系统性能指标&#xff0c;包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性…...

AI学习(vscode+deepseek+cline)

1、网页生成不成功时&#xff0c;直接根据提示让模型替你解决问题 2、http://localhost:3000 拒绝链接时&#xff0c;cmd输入命令InetMgr&#xff0c;网站右键新建-配置你的网页代码物理地址&#xff0c;这里我还输入本机登录名及密码了&#xff0c;并把端口地址由默认80修改为…...

速通 AI+Web3 开发技能: 免费课程+前沿洞察

AI 正以前所未有的速度重塑各行各业&#xff0c;从生成式模型到大规模数据处理&#xff0c;AI 逐渐成为核心驱动力。与此同时&#xff0c;Web3 去中心化技术也在重新定义信任、交易和协作方式。当这两大前沿技术相遇&#xff0c;AI Web3 的融合已不再是理论&#xff0c;而是未…...

使用LPT wiggler jtag自制三星单片机(sam88 core)编程器-S3F9454

写在前面 新年好&#xff0c;各位&#xff0c;今天来分享制作一个三星单片机的编程器 嘿嘿&#xff0c;x鱼垃圾佬元件库有些三星单片机s3f9454&#xff0c;编程器不想买&#xff0c;基本拿来拆件玩的。但&#xff0c;前些时候csdn下载到它的编程时序&#xff0c;自己来做个编程…...

【Unity3D】《跳舞的线》游戏的方块单方向拉伸实现案例

通过网盘分享的文件&#xff1a;CubeMoveMusic.unitypackage 链接: https://pan.baidu.com/s/1Rq-HH4H9qzVNtpQ84WXyUA?pwda7xn 提取码: a7xn 运行游戏点击空格动态创建拉伸的方块&#xff0c;由Speed控制速度&#xff0c;新方向是随机上下左右生成。 using System.Collect…...

Chameleon(变色龙) 跨平台编译C文件,并一次性生成多个平台的可执行文件

地址:https://github.com/MartinxMax/Chameleon Chameleon 跨平台编译C文件&#xff0c;并一次性生成多个平台的可执行文件。可以通过编译Chameleon自带的.C文件反向Shell生成不同平台攻击载荷。 登录 & 代理设置 按照以下步骤设置 Docker 的代理&#xff1a; 创建配置目…...

解读2025年生物医药创新技术:展览会与论坛的重要性

2025生物医药创新技术与应用发展展览会暨论坛&#xff0c;由天津市生物医药行业协会、BIO CHINA生物发酵展组委会携手主办&#xff0c;山东信世会展服务有限公司承办&#xff0c;定于2025年3月3日至5日在济南黄河国际会展中心盛大开幕。展会规模60000平方米、800参展商、35场会…...

WPF基础 | WPF 布局系统深度剖析:从 Grid 到 StackPanel

WPF基础 | WPF 布局系统深度剖析&#xff1a;从 Grid 到 StackPanel 一、前言二、Grid 布局&#xff1a;万能的布局王者2.1 Grid 布局基础&#xff1a;构建网格世界2.2 子元素定位与跨行列&#xff1a;布局的精细操控2.3 自适应布局&#xff1a;灵活应变的秘诀 三、StackPanel…...

Python从0到100(八十五):神经网络与迁移学习在猫狗分类中的应用

在人工智能的浩瀚宇宙中&#xff0c;深度学习犹如一颗璀璨的星辰&#xff0c;引领着机器学习和计算机视觉领域的前沿探索。而神经网络&#xff0c;作为深度学习的核心架构&#xff0c;更是以其强大的数据建模能力&#xff0c;成为解决复杂问题的重要工具。今天&#xff0c;我们…...

git常用命令学习

目录 文章目录 目录第一章 git简介1.Git 与SVN2.Git 工作区、暂存区和版本库 第二章 git常用命令学习1.ssh设置2.设置用户信息3.常用命令设置1.初始化本地仓库init2.克隆clone3.查看状态 git status4.添加add命令5.添加评论6.分支操作1.创建分支2.查看分支3.切换分支4.删除分支…...

vue中使用jquery 实现table 拖动改变尺寸

使用 CDN , 降低打包文件的大小在index.html中 <script src"https://.../cdns/jquery-1.12.4.min.js"></script>在 Vue 中使用 jQuery 一旦你引入 jQuery&#xff0c;你可以在 Vue 实例中使用它。有两种主要方式&#xff1a; 1. 使用全局变量 $ jQue…...

Linux的基本指令(上)

1.ls指令 语法&#xff1a;ls [选项] [目录或文件] 功能&#xff1a;对于⽬录&#xff0c;该命令列出该⽬录下的所有⼦⽬录与⽂件。对于⽂件&#xff0c;将列出⽂件名以及其他信息。 常用选项&#xff1a; -a 列出⽬录下的所有⽂件&#xff0c;包括以 . 开头的隐含⽂件。 -d 将…...

【单链表算法实战】解锁数据结构核心谜题——相交链表

题目如下&#xff1a; 解题过程如下&#xff1a; 相交链表只可以在中间任意位置/头/尾结点相交&#xff0c;如下图&#xff1a; 一个next指针只能指向一块地址&#xff0c;所以不会出现这种情况&#xff1a; 在返回相交链表的起始结点之前先要判断两个链表是否相交&#xff0…...

HTTP 配置与应用(不同网段)

想做一个自己学习的有关的csdn账号&#xff0c;努力奋斗......会更新我计算机网络实验课程的所有内容&#xff0c;还有其他的学习知识^_^&#xff0c;为自己巩固一下所学知识&#xff0c;下次更新校园网设计。 我是一个萌新小白&#xff0c;有误地方请大家指正&#xff0c;谢谢…...

深度学习 Pytorch 单层神经网络

神经网络是模仿人类大脑结构所构建的算法&#xff0c;在人脑里&#xff0c;我们有轴突连接神经元&#xff0c;在算法中&#xff0c;我们用圆表示神经元&#xff0c;用线表示神经元之间的连接&#xff0c;数据从神经网络的左侧输入&#xff0c;让神经元处理之后&#xff0c;从右…...

政安晨的AI大模型训练实践三:熟悉一下LF训练模型的WebUI

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; 目录 启动WebUI 微调模型 LLaMA-Factory 支持通过 WebUI 零代码微调大语言模型。 启动Web…...

Flink Gauss CDC:深度剖析存量与增量同步的创新设计

目录 设计思路 1.为什么不直接用FlinkCDC要重写Flink Gauss CDC 2.存量同步的逻辑是什么 2.1、单主键的切片策略是什么 2.2、​​​​​复合主键作切片,怎么保证扫描到所有的数据 3、增量同步的逻辑是什么 4、存量同步结束之后如何无缝衔接增量同步 5、下游数据如何落…...

【深入理解SpringCloud微服务】Sentinel规则持久化实战

Sentinel规则持久化实战 Sentinel规则推送模式原始模式pull&#xff08;拉模式&#xff09;push&#xff08;推模式&#xff09; 实现拉模式实现推模式 Sentinel规则推送模式 原始模式 原始模式由Sentinel控制台直接推送规则到Sentinel客户端&#xff0c;Sentinel客户端将规则…...

三高“高性能、高并发、高可靠”系统架构设计系列文章

目录 高并发系统的艺术&#xff1a;如何在流量洪峰中游刃有余 《数据密集型应用系统设计》读后感与高并发高性能实践案例 系统稳定性与高可用保障的几种思路 软件系统限流的底层原理解析 技术解决方案调研 延迟队列调研 重试调研 异步回调调研 分库分表调研 分布式事…...

ray.rllib-入门实践-11: 自定义模型/网络

在ray.rllib中定义和使用自己的模型&#xff0c; 分为以下三个步骤&#xff1a; 1. 定义自己的模型。 2. 向ray注册自定义的模型 3. 在config中配置使用自定义的模型 环境配置&#xff1a; torch2.5.1 ray2.10.0 ray[rllib]2.10.0 ray[tune]2.10.0 ray[serve]2.10.0 numpy1.23.…...

C语言小项目——通讯录

功能介绍&#xff1a; 1.联系人信息&#xff1a;姓名年龄性别地址电话 2.通讯录中可以存放100个人的信息 3.功能&#xff1a; 1>增加联系人 2>删除指定联系人 3>查找指定联系人的信息 4>修改指定联系人的信息 5显示所有联系人的信息 6>排序&#xff08;名字&…...

C#牵手Blazor,解锁跨平台Web应用开发新姿势

一、引言 在当今数字化时代&#xff0c;Web 应用已成为人们生活和工作中不可或缺的一部分 &#xff0c;而开发跨平台的 Web 应用则是满足不同用户需求、扩大应用影响力的关键。C# 作为一种强大的编程语言&#xff0c;拥有丰富的类库和强大的功能&#xff0c;在企业级开发、游戏…...

PCIE模式配置

对于VU系列FPGA&#xff0c;当DMA/Bridge Subsystem for PCI Express IP配置为Bridge模式时&#xff0c;等同于K7系列中的AXI Memory Mapped To PCI Express IP。...

【论文阅读】RT-SKETCH: GOAL-CONDITIONED IMITATION LEARNING FROM HAND-DRAWN SKETCHES

RT-Sketch&#xff1a;基于手绘草图的目标条件模仿学习 摘要&#xff1a;在目标条件模仿学习&#xff08;imitation learning&#xff0c;IL&#xff09;中&#xff0c;自然语言和图像通常被用作目标表示。然而&#xff0c;自然语言可能存在歧义&#xff0c;图像则可能过于具体…...

【由浅入深认识Maven】第2部分 maven依赖管理与仓库机制

文章目录 第二篇&#xff1a;Maven依赖管理与仓库机制一、前言二、依赖管理基础1.依赖声明2. 依赖范围&#xff08;Scope&#xff09;3. 依赖冲突与排除 三、Maven的仓库机制1. 本地仓库2. 中央仓库3. 远程仓库 四、 版本管理策略1. 固定版本2. 版本范围 五、 总结 第二篇&…...

centos 安全配置基线

CentOS 是一个广泛使用的操作系统&#xff0c;为了确保系统的安全性&#xff0c;需要遵循一系列的安全基线。以下是详细的 CentOS 安全基线配置建议&#xff1a; 通过配置核查,CentOS操作系统未安装入侵防护软件,无法检测到对重要节点进行入侵的 解决方案&#xff1a; 安装入侵…...

备赛蓝桥杯之第十五届职业院校组省赛第一题:智能停车系统

提示&#xff1a;本篇文章仅仅是作者自己目前在备赛蓝桥杯中&#xff0c;自己学习与刷题的学习笔记&#xff0c;写的不好&#xff0c;欢迎大家批评与建议 由于个别题目代码量与题目量偏大&#xff0c;请大家自己去蓝桥杯官网【连接高校和企业 - 蓝桥云课】去寻找原题&#xff0…...

力扣 Hot 100 题解 (js版)更新ing

&#x1f6a9;哈希表 ✅ 1. 两数之和 Code&#xff1a; 暴力法 复杂度分析&#xff1a; 时间复杂度&#xff1a; ∗ O ( N 2 ) ∗ *O(N^2)* ∗O(N2)∗&#xff0c;其中 N 是数组中的元素数量。最坏情况下数组中任意两个数都要被匹配一次。空间复杂度&#xff1a;O(1)。 /…...

DeepSeek-R1:性能对标 OpenAI,开源助力 AI 生态发展

DeepSeek-R1&#xff1a;性能对标 OpenAI&#xff0c;开源助力 AI 生态发展 在人工智能领域&#xff0c;大模型的竞争一直备受关注。最近&#xff0c;DeepSeek 团队发布了 DeepSeek-R1 模型&#xff0c;并开源了模型权重&#xff0c;这一举动无疑为 AI 领域带来了新的活力。今…...

CY T 4 BB 5 CEB Q 1 A EE GS MCAL配置 - MCU组件

1、ResourceM 配置 选择芯片信号: 2、MCU 配置 2.1 General配置 1) McuDevErrorDetect: - 启用或禁用MCU驱动程序模块的开发错误通知功能。 - 注意:采用DET错误检测机制作为安全机制(故障检测)时,不能禁用开发错误检测。2) McuGetRamStateApi - enable/disable th…...