当前位置: 首页 > news >正文

网络编程(18)——使用asio协程实现并发服务器

十八、day18

到目前为止,我们以及学习了单线程同步/异步服务器、多线程IOServicePool和多线程IOThreadPool模型,今天学习如何通过asio协程实现并发服务器

并发服务器有以下几种好处:

  • 协程比线程更轻量,创建和销毁协程的开销较小,适合高并发场景
  • 协程通常在单线程中运行,避免了多线程带来的资源竞争和同步问题,从而减少了内存使用
  • 将回调函数改写为顺序调用,让异步的函数能够以同步的方式写出来的同时不降低性能,提高开发效率
  • 协程调度比线程调度更轻量化,因为协程是运行在用户空间的,线程切换需要在用户空间和内核空间切换

首先需将C++语言标准换为C++20标准,协程是在C++20之后引入的新标准

1. 官方案例

asio官网提供了一个协程并发编程的案例,如下

#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;awaitable<void> echo(tcp::socket socket) {try {char data[1024];for (;;) {std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);}}catch (std::exception& e) {std::cout << "Echo exception is " << e.what() << std::endl;}
}awaitable<void> listener() {auto executor = co_await this_coro::executor;tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });for (;;) {tcp::socket socket = co_await acceptor.async_accept(use_awaitable);co_spawn(executor, echo(std::move(socket)), detached);}
}int main()
{try {boost::asio::io_context io_context(1); // 1被用于提供有关所需并发级别的提示boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断io_context.stop();});co_spawn(io_context, listener(), detached);io_context.run();}catch (std::exception& e) {std::cout << "Exception is " << e.what() << std::endl;}
}

a. 声明

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;
  • awaitable :用于定义可以在协程中使用的异步操作,可以通过 co_await 关键字等待异步任务的完成,使异步的函数能够以同步的方式写出来的同时不降低性能
  • co_spawn:用于启动新的协程的函数,可以用它来创建新的异步任务并在指定的执行上下文中运行
  • detached:指示器,表示创建的协程不需要等待其结果。使用 detached 后,协程会在后台独立运行
  • use_awaitable:适配器,指示以协程的方式使用 Boost.Asio 的异步操作,它使得异步操作可以与 co_await 关键字结合使用。适配器允许将异步操作的结果直接与协程的执行流结合,使得异步调用能够以同步的方式写出,从而避免了手动管理回调函数
  • co_await 关键字的作用:
    • 当协程遇到 co_await 时,它会挂起执行,直到被等待的异步操作完成。这允许当前线程释放 CPU,去处理其他任务或协程。
    • 一旦等待的操作完成,协程会自动恢复执行,继续从挂起的地方运行。这样可以避免复杂的回调地狱,提供更直观的控制流。
    • co_await 会自动获取异步操作的结果并将其返回给调用者。例如,如果等待的是一个返回值的异步操作,结果会被赋给相应的变量。
    • 如果在 co_await 等待的异步操作中发生异常,协程可以捕获这些异常,方便进行错误处理。

b. echo()

awaitable<void> echo(tcp::socket socket) {try {char data[1024];for (;;) {std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);}}catch (std::exception& e) {std::cout << "Echo exception is " << e.what() << std::endl;}
}

awaitable<void>类型允许函数在执行时可以被暂停和恢复,这使得它能够与 co_await 一起使用,所以函数返回类型必须是awaitable<void>。

echo 函数能够高效处理多个客户端连接而不阻塞线程,主要是因为:

  • echo 函数使用 socket.async_read_some 和 async_write 方法进行异步读写操作。这意味着当函数执行这些操作时,它不会阻塞当前线程,而是可以在等待 I/O 完成时让出控制权。
  • 使用协程和 co_await,当 I/O 操作挂起时,协程会被暂停并释放线程。这使得同一线程可以处理其他任务或更多的连接,而不需要为每个连接创建新的线程。
  • 服务器的主循环(io_context.run())会持续运行,处理所有已准备好的异步操作。这样一来,多个连接可以并发处理,而不需要多个线程同时活跃。
  • 当协程通过 co_await 等待 I/O 操作时,它不会占用 CPU 资源。主线程可以继续接受新的连接或处理其他已完成的操作,从而提高并发能力。

其中

std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);

该段代码使用co_await 关键字等待异步读取操作完成,并将读取的字节数存储到n中。和之前异步服务器异步操作需要绑定回调函数不同,这里通过协程实现的并发服务器读写通过co_await 关键字和use_awaitable适配器组合使用,会自动处理异步操作的结果。当调用 socket.async_read_some 时,协程会暂停执行,并在操作完成时恢复。这个机制隐藏了回调的复杂性,使得代码更简洁和易读。当异步操作完成时,协程会自动继续执行,并将结果传递给 n 。

co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);

同理,异步写函数也以同步的方式使用,不需要显示bind回调函数,co_await 关键字会等待异步读取操作完成,而适配器use_awaitable允许将异步操作的结果直接与协程的执行流结合

c. listener()

awaitable<void> listener() {auto executor = co_await this_coro::executor;tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });for (;;) {tcp::socket socket = co_await acceptor.async_accept(use_awaitable);co_spawn(executor, echo(std::move(socket)), detached);}
}

该函数不断监听 TCP 端口,接受来自客户端的连接。每当有新连接到达时,它会启动一个 echo 协程来处理该连接。这种设计使得服务器能够同时处理多个客户端连接而不会阻塞,提高了并发处理能力。

auto executor = co_await this_coro::executor;

获取执行器:

  • this_coro::executor 是特殊的上下文,用于获取当前协程的执行器(executor),它定义了协程将在哪个上下文(io_context)中运行
  • co_await 关键字使得协程在获取执行器时可以暂停,并在获取到执行器后恢复执行。
co_spawn(executor, echo(std::move(socket)), detached);
  • 启动处理协程:
    • co_spawn 启动一个新的协程
    • executor 指定了新的协程的执行上下文
    • echo(std::move(socket)) 创建一个新的 echo 协程来处理该连接。std::move(socket) 将 socket 移动到 echo 协程中,避免不必要的拷贝。移动socket之后,上面的socket便无法发挥作用,因为该socket已经被移动至echo中。
    • detached 表示新协程的执行不需要主协程等待其完成。

d. main()

int main()
{try {boost::asio::io_context io_context(1); // 1被用于提供有关所需并发级别的提示boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断io_context.stop();});co_spawn(io_context, listener(), detached);io_context.run();}catch (std::exception& e) {std::cout << "Exception is " << e.what() << std::endl;}
}

io_context有多个重载,这里使用的重载原型为

explicit io_context(int concurrency_hint);

concurrency_hint用来提示实现该类的系统,它应当允许多少个线程(不是协程)同时运行。

  • concurrency_hint=0时,则I/O操作的实现将使用默认的并发级别,此时,io_context 将根据内部实现和系统资源自动决定使用多少线程;
  • concurrency_hint=1时,则I/O操作的实现将尝试最小化线程的创建,并且不会创建额外的工作线程,常表示仅使用一个线程来处理所有 I/O 操作,适用于大多数简单的应用场景,避免不必要的线程开销;
  • concurrency_hint>1时,则I/O操作的实现将允许同时运行多个工作线程,允许程序在多个线程中并行处理 I/O 操作,从而提高性能。
        signals.async_wait([&](auto, auto) { // 处理几个信号就传入几个参数,这里使用auto自动推断io_context.stop();});

信号处理,当遇到退出信号(ctrl+c或强制终止信号)时,执行lambd函数,停止ioc的运行。

co_spawn(io_context, listener(), detached);

启动一个listener协程,开始监听客户端连接,并且这个协程的执行不需要主协程等待其完成。

2. 客户端

#include <iostream>
#include <boost/asio.hpp>const int MAX_LENGTH = 1024;int main()
{try {boost::asio::io_context ioc;boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);boost::asio::ip::tcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found;sock.connect(remote_ep, error);if (error) {std::cout << "connect failed, code is " << error.value() << " error msg is " << error.what() << std::endl;return 0;}std::cout << "Enter message: ";char request[MAX_LENGTH];std::cin.getline(request, MAX_LENGTH);size_t request_length = strlen(request);boost::asio::write(sock, boost::asio::buffer(request, request_length));char reply[MAX_LENGTH];size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length));std::cout << "reply is " << std::string(reply, reply_length) << std::endl;getchar();}catch (std::exception& e) {std::cerr << "Exception is " << e.what() << std::endl;}return 0;
}

和之前的客户端处理基本类似,只不过忽略了消息节点封装和序列号操作。

3. 修改之前的服务器函数

void CSession::Start() {auto shared_this = shared_from_this();//开启接收协程co_spawn(_io_context, [=]()->awaitable<void> {try {for (;!_b_close;) {_recv_head_node->Clear();std::size_t n = co_await boost::asio::async_read(_socket,boost::asio::buffer(_recv_head_node->_data, HEAD_TOTAL_LEN),use_awaitable);if (n == 0) {std::cout << "receive peer closed" << endl;Close();_server->ClearSession(_uuid);co_return;}//获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);co_return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//长度非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);co_return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);//读出包体n = co_await boost::asio::async_read(_socket,boost::asio::buffer(_recv_msg_node->_data, _recv_msg_node->_total_len), use_awaitable);if (n == 0) {std::cout << "receive peer closed" << endl;Close();_server->ClearSession(_uuid);co_return;}_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';cout << "receive data is " << _recv_msg_node->_data << endl;//投递给逻辑线程LogicSystem::GetInstance().PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));}}catch (std::exception& e) {std::cout << "exception is " << e.what() << endl;Close();_server->ClearSession(_uuid);}}, detached);
}

在新的Session中,不需要绑定回调函数进行处理,而是通过关键字co_await 和适配器use_awaitable,使异步函数通过同步方式写出来,在一个函数中进行数据的粘包处理、网络序列-本地序列转换、序列化处理,并将消息投递至逻辑队列。

通过协程实现并发服务器可大大减少代码量,相比异步编程更加直观,但受限于平台,目前C++20的协程说是协程库,实际上只是开放了无栈协程的协议,正儿八经的官方协程还未发布。

相关文章:

网络编程(18)——使用asio协程实现并发服务器

十八、day18 到目前为止&#xff0c;我们以及学习了单线程同步/异步服务器、多线程IOServicePool和多线程IOThreadPool模型&#xff0c;今天学习如何通过asio协程实现并发服务器。 并发服务器有以下几种好处&#xff1a; 协程比线程更轻量&#xff0c;创建和销毁协程的开销较…...

Koa2项目实战2(路由管理、项目结构优化)

添加路由&#xff08;处理不同的URL请求&#xff09; 路由&#xff1a;根据不同的URL&#xff0c;调用对应的处理函数。 每一个接口服务&#xff0c;最核心的功能是&#xff1a;根据不同的URL请求&#xff0c;返回不同的数据。也就是调用不同的接口返回不同的数据。 在 Node…...

决战Linux操作系统

前言&#xff1a; 你是否也曾经为Linux所困扰过&#xff0c;在网上找的资料零零散散&#xff0c;是否学完Linux后还是懵懵懂懂&#xff0c;别怕&#xff0c;这篇博客是博主精心为你准备的&#xff0c;现在&#xff0c;就让我们一起来走进Linux的世界&#xff0c;决战Linux&…...

OceanBase 3.2.2 数据库问题处理记录

只记录OceanBase 数据库与OCP的异常处理&#xff0c;其它组件暂时不写录。 一、问题1&#xff1a; 说明&#xff1a;OMS 出现异常&#xff0c;无法访问(OB无法访问) OB数据库架构&#xff1a;1:1:1 原因&#xff1a;某一台OBserver因为内存问题&#xff0c;被服务器直接kill掉…...

HCIP--以太网交换安全(二)端口安全

端口安全 一、端口安全概述 1.1、端口安全概述&#xff1a;端口安全是一种网络设备防护措施&#xff0c;通过将接口学习的MAC地址设为安全地址防止非法用户通信。 1.2、端口安全原理&#xff1a; 类型 定义 特点 安全动态MAC地址 使能端口而未是能Stichy MAC功能是转换的…...

在 Windows 11 安卓子系统中安装 APK 的操作指南

这个软件好像不可以在纯android系统中使用&#xff08;不知道是缺了什么&#xff09;&#xff0c;其他对于android的虚拟机要不缺少必要功能组件&#xff0c;要不性能过于低下。本方法致力于在带有谷歌框架WSA中运行该APK 在 Windows 11 安卓子系统中安装 APK 的操作指南 本指…...

[C语言] 函数详解:库函数与自定义函数

文章目录 函数的概念库函数和自定义函数库函数使用库函数示例常用库函数及头文件 自定义函数自定义函数的基本结构示例&#xff1a;实现两个数的求和函数自定义函数的好处 函数的返回值有返回值的函数无返回值的函数 函数的声明与调用声明函数在另一个文件中调用函数示例&#…...

0x11 科迈 RAS系统 Cookie验证越权漏洞

参考: 科迈 RAS系统 Cookie验证越权漏洞 | PeiQi文库 (wgpsec.org)免责声明 欢迎访问我的博客。以下内容仅供教育和信息用途: 合法性:我不支持或鼓励非法活动。请确保遵守法律法规。信息准确性:尽管我尽力提供准确的信息,但不保证其完全准确或适用。使用前请自行验证。风…...

MoonBit 双周报 Vol.57:AI助手功能增强、表达式优先级调整、JS 交互优化、标准库与实验库API多项更新!

2024-10-08 IDE更新 AI Codelens支持 /generate 和 /fix 命令 /generate 命令能够提供一个通用的用以生成代码的聊天界面。 /fix 命令能够读取当前函数的错误信息给出修复建议。 MoonBit更新 调整中缀表达式和if、match、loop、while、for、try表达式的优先级, 后者这些控制…...

element ui input textarea控制显示高度

样式代码 .testPage { position: absolute; left: 0; top: 0; right: 0; bottom: 0; display: flex; height: 100%; /* 控制输入框高度 */ .el-textarea { height: 90%; ::v-deep .el-textarea__inner { height: 90%; } } }...

Chromium 中chrome.downloads扩展接口c++

一、前端chrome.downloads 使用 chrome.downloads API 以编程方式启动、监控、操作和搜索下载内容。 权限 downloads 您必须在扩展程序清单中声明 "downloads" 权限&#xff0c;才能使用此 API。 {"name": "My extension",..."permiss…...

微信小程序常见问题

一、编译报错 [ app.json 文件内容错误] app.json: 在项目根目录未找到 app.json 解决办法&#xff1a; 微信开发者工具中打开设置->安全设置->打开服务端口用HBuilder X打开小程序文件夹&#xff0c;点击“运行到小程序模拟器”&#xff0c;生成配置文件&#xff0c;…...

进程的理解

进程的理解 目录&#xff1a; 什么是进程主要特征主要组成部分进程状态进程优先级 1.什么是进程 概念&#xff1a; 在操作系统中&#xff0c;**进程&#xff08;Process&#xff09;**是一个正在执行的程序实例。可以将进程理解为一个动态的实体&#xff0c;它不仅包括静态…...

LeetCode494:目标和

题目链接&#xff1a;494. 目标和 - 力扣&#xff08;LeetCode&#xff09; 代码如下 class Solution { public:int findTargetSumWays(vector<int>& nums, int target) {int sum 0;for(int i 0; i < nums.size(); i){sum nums[i];}if(abs(target) > sum)…...

vue3中自定义校验函数密码不生效问题

vue3中自定义校验函数密码不生效问题 由于在自定义的校验规则中只校验了有数据的情况&#xff0c;以至于在没输入时&#xff0c;校验不生效 &#xff08;1&#xff09;用户不输入校验不生效 const validateSurePassword (rule, value, callback) > {if (value ! ) {if (…...

RabbitMQ(死信队列)

一、本文抒写背景 前面我也在延迟队列篇章提到过死信队列&#xff0c;也提到过一些应用场景&#xff01; 今天呢&#xff0c;这篇文章&#xff0c;主要就是实战一个业务场景的小Demo流程&#xff0c;哈哈&#xff0c;那就是延迟关闭订单。 二、开始啦&#xff01;letgo! 首…...

HTTP代理的优点和局限性

在这个信息爆炸的时代&#xff0c;网络已成为我们获取知识、交流思想、开展商务的重要平台。但随之而来的隐私泄露、网络安全威胁、以及无处不在的网络监控&#xff0c;却让我们的每一次在线活动都充满了风险。 在这样的背景下&#xff0c;HTTP代理技术应运而生&#xff0c;它不…...

大厂面试真题-如果通过JVM自带的工具排查和解决线上CPU100%的问题

通过JVM自带的工具去定位和解决线上CPU 100%的问题&#xff0c;可以遵循以下步骤&#xff1a; 一、使用top和jps定位Java进程 使用top命令&#xff1a; 在Linux服务器上执行top命令&#xff0c;查看所有进程的CPU使用情况。找到CPU使用率最高的进程&#xff0c;并记录其PID&a…...

kubernetes中微服务部署

微服务 问&#xff1a;用控制器来完成集群的工作负载&#xff0c;那么应用如何暴漏出去&#xff1f; 答&#xff1a;需要通过微服务暴漏出去后才能被访问 Service 是一组提供相同服务的Pod对外开放的接口借助Service&#xff0c;应用可以实现服务发现和负载均衡Service 默认只…...

基于 Java 的天气预报系统设计与实现

随着互联网的飞速发展&#xff0c;天气预报系统变得越来越重要。它可以帮助用户了解未来几天的天气情况&#xff0c;便于出行、活动安排。本文将介绍如何使用 Java 构建一个简单的天气预报系统&#xff0c;涉及系统架构设计、核心功能开发以及完整的代码实现。 1. 系统架构设计…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

EtherNet/IP转DeviceNet协议网关详解

一&#xff0c;设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络&#xff0c;本网关连接到EtherNet/IP总线中做为从站使用&#xff0c;连接到DeviceNet总线中做为从站使用。 在自动…...

算法:模拟

1.替换所有的问号 1576. 替换所有的问号 - 力扣&#xff08;LeetCode&#xff09; ​遍历字符串​&#xff1a;通过外层循环逐一检查每个字符。​遇到 ? 时处理​&#xff1a; 内层循环遍历小写字母&#xff08;a 到 z&#xff09;。对每个字母检查是否满足&#xff1a; ​与…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践

作者&#xff1a;吴岐诗&#xff0c;杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言&#xff1a;融合数据湖与数仓的创新之路 在数字金融时代&#xff0c;数据已成为金融机构的核心竞争力。杭银消费金…...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

JavaScript 标签加载

目录 JavaScript 标签加载script 标签的 async 和 defer 属性&#xff0c;分别代表什么&#xff0c;有什么区别1. 普通 script 标签2. async 属性3. defer 属性4. type"module"5. 各种加载方式的对比6. 使用建议 JavaScript 标签加载 script 标签的 async 和 defer …...