Boost Asio TCP异步服务端和客户端
服务端
消息分两次发送,第一次发送head,第二次发送body。接收也是先接收head,然后通过head结构中的body长度字段再接收body。
TcpServer.h
#pragma once
#include <atomic>
#include <vector>
#include <unordered_set>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/system_error.hpp>
#include "Connection.h"using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;class TcpServer : public Connection::Listener {
public:using Handler = std::function<void(std::vector<uint8_t>, MessageType)>;TcpServer(uint16_t port, Handler&& handler);~TcpServer();void _startListen();void _startAccept();void _handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket);virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type);void send(const char*, int size);private:uint16_t m_localPort;io_service m_oAcceptService;io_service::work m_oAcceptWork;tcp::acceptor *m_pAcceptor = nullptr;std::atomic_bool m_bStop = false;mutable boost::shared_mutex _connectionMutex;std::unordered_set<ConnectionPtr> _connections;Handler m_handler;
};
TcpServer.cpp
#include "TcpServer.h"
#include <boost/asio/buffer.hpp>
#include <fstream>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio.hpp>TcpServer::TcpServer(uint16_t port, Handler&& handler): m_oAcceptWork(m_oAcceptService), m_localPort(port), m_handler(handler)
{m_pAcceptor = new boost::asio::ip::tcp::acceptor(m_oAcceptService);_startListen();_startAccept();std::thread([&]() {while (1){m_oAcceptService.run();}}).detach();
}TcpServer::~TcpServer() {m_bStop = true;
}void TcpServer::_startListen() {boost::asio::ip::tcp::endpoint localEndpoint(boost::asio::ip::tcp::v4(), m_localPort);m_pAcceptor->open(localEndpoint.protocol());m_pAcceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));boost::asio::ip::tcp::no_delay noDelayOption(false);m_pAcceptor->set_option(noDelayOption);boost::system::error_code ec;boost::system::error_code code = m_pAcceptor->bind(localEndpoint, ec);if (!ec.value()){m_pAcceptor->listen();}elsestd::cout << (std::string("TcpServer start listen exception: ") + ec.message().c_str()) << std::endl;}void TcpServer::_startAccept() {if (m_bStop){return;}auto socket = std::make_shared<boost::asio::ip::tcp::socket>(m_oAcceptService);m_pAcceptor->async_accept(*socket, boost::bind(&TcpServer::_handleAccept, this, boost::asio::placeholders::error, socket));}void TcpServer::_handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket) {if (!error) {// read and write.std::cout << "_handleAccept" << std::endl;auto connection = std::make_shared<Connection>(std::move(*socket), socket->get_io_service(), this);boost::unique_lock<boost::shared_mutex> lock(_connectionMutex);_connections.emplace(connection);lock.unlock();connection->start();}_startAccept();}void TcpServer::onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) {//connection->send(data);m_handler(data, type);
}void TcpServer::send(const char* data, int size)
{for (auto conn : _connections){conn->send(data, size);}
}
Connection.h
#pragma once
#define BOOST_ASIO_DISABLE_STD_CHRONO
#include <boost/asio.hpp>
#include <boost/date_time/time_duration.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/mutex.hpp>
#include <atomic>
#include <memory>
#include <list>
#include <future>
#include <boost/asio/steady_timer.hpp>
#include "message.h"namespace pt = boost::posix_time;
namespace placeholders = boost::asio::placeholders;
using boost::asio::buffer;
using boost::asio::const_buffer;// Connection的最大作用是保存TcpServer连接的客户端socket,以及单独启动线程或异步进行数据收发
class Connection : public std::enable_shared_from_this<Connection> {
public:class Listener;Connection(boost::asio::ip::tcp::socket&& socket, boost::asio::io_service& oTimerService, Listener* listener);~Connection();void start();void stop();void _ranDataReception();void _handleReadHeader(const boost::system::error_code& error);void _handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type);void send(const char* data, int size);void send(const std::vector<uint8_t>& data);void on_write(const boost::system::error_code & err, size_t bytes);private:bool _stopped = false;boost::asio::ip::tcp::socket _socket;MessageHeader _header;Listener* _listener;
};typedef std::shared_ptr<Connection> ConnectionPtr;class Connection::Listener {
public:virtual ~Listener() {}virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) = 0;
};
Connection.cpp
#include "Connection.h"
#include <boost/bind.hpp>
#include <functional>
#include <iostream>Connection::Connection(boost::asio::ip::tcp::socket&& socket, boost::asio::io_service& oTimerService, Listener* listener): _socket(std::move(socket)), _listener(listener)
{
}Connection::~Connection()
{}void Connection::start()
{_stopped = false;_ranDataReception();
}void Connection::stop()
{_stopped = true;
}void Connection::_ranDataReception() {if (!_stopped){memset(&_header, 0, sizeof(MessageHeader));boost::system::error_code oError;boost::asio::async_read(_socket, boost::asio::buffer(&_header, sizeof(MessageHeader)),boost::asio::transfer_exactly(sizeof(MessageHeader)),boost::bind(&Connection::_handleReadHeader, shared_from_this(), oError));}
}void Connection::_handleReadHeader(const boost::system::error_code& error) {if (!_stopped) {if (!error) {MessageType type = _header.type;int bodyLen = _header.length;//std::string strBody;std::vector<uint8_t> strBody;strBody.resize(bodyLen);//boost::system::error_code error;size_t iReadSize = _socket.read_some(boost::asio::buffer(strBody.data(), bodyLen), error);while (!error){if (iReadSize < bodyLen){iReadSize += _socket.read_some(boost::asio::buffer(strBody.data() + iReadSize, bodyLen - iReadSize), error);}else{break;}}if (!error && iReadSize == bodyLen){_handleReadData(error, strBody, type);}else{}}}
}void Connection::_handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type)
{//if (!_stopped){if (!error){_listener->onDataReceived(shared_from_this(), body, type);_ranDataReception();}}
}void Connection::send(const char* data, int size)
{boost::system::error_code error;_socket.async_write_some(boost::asio::buffer(data, size),boost::bind(&Connection::on_write, this,boost::placeholders::_1,boost::placeholders::_2));
}void Connection::send(const std::vector<uint8_t>& data)
{boost::system::error_code error;_socket.async_write_some(boost::asio::buffer(data.data(), data.size()), boost::bind(&Connection::on_write, this, boost::placeholders::_1, boost::placeholders::_2));
}void Connection::on_write(const boost::system::error_code & err, size_t bytes)
{}
客户端
Network.h
#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>namespace sinftech {
namespace tv {
class Network {
public:Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port);~Network();void start();void stop();size_t send(char* data, size_t size);size_t receive(char* data, size_t size);private:bool _running;boost::asio::ip::tcp::socket _socket;boost::asio::ip::tcp::endpoint _remoteEndpoint;
};
}//namespace tv
}//namespace sinftech
Network.cpp (windows平台setopt设置超时时间使用整数,Linux平台使用结构体struct timeval)
#include "Network.h"
#include <boost/asio/buffer.hpp>
#include <thread>namespace sinftech {
namespace tv {Network::Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port): _running(false), _socket(ioService), _remoteEndpoint(boost::asio::ip::address::from_string(address), port)
{int timeout = 1000;int iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout));if (0 != iRet){printf("Set rcv time out error\n");}int iRcvSize = 1024 * 1000;iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVBUF, (char *)&iRcvSize, sizeof(iRcvSize));if (0 != iRet){printf("Set rcv buffer size error\n");}start();
}Network::~Network() {stop();
}void Network::start() {_running = true;
}void Network::stop() {_running = false;boost::system::error_code ec;_socket.close(ec);
}size_t Network::send(char* data, size_t size) {size_t bytesSent = 0;if (_running) {boost::system::error_code ec;if (!_socket.is_open()) {_socket.connect(_remoteEndpoint, ec);}if (!ec) { bytesSent = _socket.write_some(boost::asio::buffer(data, size), ec);}if (ec) {_socket.close(ec);}}return bytesSent;
}size_t Network::receive(char* data, size_t size) {size_t bytesRecv = 0;if (_running) {boost::system::error_code ec;if (!_socket.is_open()) {_socket.connect(_remoteEndpoint, ec);}if (!ec) { bytesRecv = _socket.read_some(boost::asio::buffer(data, size), ec);}if (ec) {_socket.close(ec);}}return bytesRecv;
}}//namespace tv
}//namespace sinftech
注意,Linux和Windows平台使用setopt设置超时参数的方式是不同的。在Linux上,你可以使用setsockopt来设置套接字选项,包括读取和写入超时。具体的选项是SO_RCVTIMEO和SO_SNDTIMEO。
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>int set_socket_timeout(int sockfd, int timeout_ms) {struct timeval timeout;timeout.tv_sec = timeout_ms / 1000;timeout.tv_usec = (timeout_ms % 1000) * 1000;// 设置接收超时if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {perror("setsockopt failed");return -1;}// 设置发送超时if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {perror("setsockopt failed");return -1;}return 0;
}
在Windows上,setsockopt同样用于设置套接字选项,但超时时间是以毫秒为单位的整数,而不是timeval结构体。你需要使用SO_RCVTIMEO和SO_SNDTIMEO选项,并传递一个DWORD类型的值。
#include <winsock2.h>
#include <ws2tcpip.h>#pragma comment(lib, "Ws2_32.lib")int set_socket_timeout(SOCKET sockfd, DWORD timeout_ms) {// 设置接收超时if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {printf("setsockopt failed with error: %ld\n", WSAGetLastError());return -1;}// 设置发送超时if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {printf("setsockopt failed with error: %ld\n", WSAGetLastError());return -1;}return 0;
}// 在程序开始时需要初始化Winsock库
int main() {WSADATA wsaData;int result = WSAStartup(MAKEWORD(2, 2), &wsaData);if (result != 0) {printf("WSAStartup failed: %d\n", result);return 1;}// ... 创建并配置套接字 ...// 在程序结束前清理Winsock库WSACleanup();return 0;
}
相关文章:
Boost Asio TCP异步服务端和客户端
服务端 消息分两次发送,第一次发送head,第二次发送body。接收也是先接收head,然后通过head结构中的body长度字段再接收body。 TcpServer.h #pragma once #include <atomic> #include <vector> #include <unordered_set> #…...
1.7 ChatGPT:引领AI对话革命的致胜之道
ChatGPT:引领AI对话革命的致胜之道 随着人工智能(AI)技术的迅猛发展,特别是在自然语言处理(NLP)领域,OpenAI 的 ChatGPT 已经成为了举世瞩目的技术突破。从普通的自动化客服到深入的创作与协作,ChatGPT 通过其卓越的语言理解和生成能力,改变了人们与计算机交互的方式…...

WPS数据分析000001
目录 一、表格的新建、保存、协作和分享 新建 保存 协作 二、认识WPS表格界面 三、认识WPS表格选项卡 开始选项卡 插入选项卡 页面布局选项卡 公式选项卡 数据选项卡 审阅选项卡 视图选项卡 会员专享选项卡 一、表格的新建、保存、协作和分享 新建 ctrlN------…...

电脑风扇声音大怎么办? 原因及解决方法
电脑风扇是电脑的重要组件之一,它的作用是为电脑的各个部件提供冷却,防止电脑过热。然而,有时候我们会发现电脑风扇的声音特别大,不仅影响我们的使用体验,也可能是电脑出现了一些问题。那么,电脑风扇声音大…...

高效实现 Markdown 转 PDF 的跨平台指南20250117
高效实现 Markdown 转 PDF 的跨平台指南 引言 Markdown 文件以其轻量化和灵活性受到开发者和技术写作者的青睐,但如何将其转换为易于分享和打印的 PDF 格式,是一个常见需求。本文整合了 macOS、Windows 和 Linux 三大平台的转换方法,并探讨…...
Spark Streaming的核心功能及其示例PySpark代码
Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例: 基础流处理:从TCP套接字读取数据并统计单词数量 from pyspark import SparkContext from pyspark.streaming import StreamingContext# 创建Spar…...
自动驾驶占用网格预测
文章目录 需要阅读的文献:github论文仓库论文idea提取BEVFormer 需要阅读的文献: ⭐[ECCV 2024] SparseOcc 纯稀疏3D占用网络和 RayIoU 评估指标 ECCV 2024|OSP:自动驾驶全新建模方法,端到端输出任意位置的占用结果 S…...

力扣动态规划-2【算法学习day.96】
前言 ###我做这类文章一个重要的目的还是给正在学习的大家提供方向(例如想要掌握基础用法,该刷哪些题?建议灵神的题单和代码随想录)和记录自己的学习过程,我的解析也不会做的非常详细,只会提供思路和一些关…...

软考高级5个资格、中级常考4个资格简介及难易程度排序
一、软考高级5个资格 01、网络规划设计师 资格简介:网络规划设计师要求考生具备全面的网络规划、设计、部署和管理能力;该资格考试适合那些在网络规划和设计方面具有较好理论基础和较丰富从业经验的人员参加。 02、系统分析师 资格简介:系统分…...
2.5 如何评估表示学习
如何评估表示学习 评估表示学习的质量和有效性是确保模型能够成功应用于实际任务的关键步骤。表示学习的目标是从数据中学习到一种有效的、低维的表示,使得下游任务(如分类、回归、聚类等)能够更好地执行。因此,评估表示学习的效果涉及多个维度,包括表示的质量、其对下游…...

Linux-day08
第17章 大数据定制篇-shell编程 shell编程快速入门 shell变量 设置环境变量 把行号打开 set nu 位置参数变量 预定义变量 在一个脚本中执行了另外一个脚本所以卡住了 CTRLC退出 运算符 operator运算符 条件判断 流程控制 单分支多分支 case语句 for循环 反复的把取出来的i值…...

stack_queue的底层,模拟实现,deque和priority_queue详解
文章目录 适配器Stack的模拟实现Queue的模拟实现vector和list的对比dequedeque的框架deque的底层 priority_queuepriority_queue的使用priority_queue的底层仿函数的使用仿函数的作用priority_queue模拟实现 适配器 适配器是一种模式,这种模式将类的接口转化为用户希…...

LabVIEW 实现线路板 PCB 可靠性测试
在电子设备制造领域,线路板 PCB(Printed Circuit Board)的可靠性直接影响产品的整体性能和使用寿命。企业在生产新型智能手机主板时,需要对 PCB 进行严格的可靠性测试,以确保产品在复杂环境下能稳定运行。传统的测试方…...

sqlfather笔记
这里简单记录写学习鱼皮sqlfather项目的笔记,以供以后学习。 运行 将前后端项目clone到本地后,修改对应配置文件运行项目。 后端 1.配置好mysql后运行这个sql文件建立对应的表。 2.修改数据库密码 3.修改完后运行启动类即可 4. 启动结果 5.查看A…...

RabbitMQ(四)
SpringBoot整合RabbitMQ SpringBoot整合1、生产者工程①创建module②配置POM③YAML④主启动类⑤测试程序 2、消费者工程①创建module②配置POM③YAML文件内配置: ④主启动类⑤监听器 3、RabbitListener注解属性对比①bindings属性②queues属性 SpringBoot整合 1、生…...

【Unity3D】远处的物体会闪烁问题(深度冲突) Reversed-Z
知识点:深度冲突、像素闪烁现象、Reversed-Z(反向Z)、浮点数精度问题 前提概要:深度值都是由32位浮点数存储 原因:深度冲突,多个物体之间无法正确地渲染远近关系,出现上一帧可能是A物体在B物体…...

探索与创作:2024年CSDN平台上的成长与突破
文章目录 我与CSDN的初次邂逅初学阶段的阅读CSDN:编程新手的避风港初学者的福音:细致入微的知识讲解考试复习神器:技术总结的“救命指南”曾经的自己:为何迟迟不迈出写博客的第一步兴趣萌芽:从“读”到“想写”的初体验…...

QT笔记- Qt6.8.1 Android编程 添加AndroidManifest.xml文件以支持修改权限
1. 切换项目选项卡,找到构建的步骤下的最后一项构建安卓APK,展开后找到应用程序栏,点击安卓自定义中的创建模板. 2. 弹出对话框勾选图中选项后点完成 3. 回到项目,查看.pro文件,里面多了很多内容不管,在下…...
【Leetcode 每日一题 - 扩展】421. 数组中两个数的最大异或值
问题背景 给你一个整数数组 n u m s nums nums,返回 n u m s [ i ] X O R n u m s [ j ] nums[i]\ XOR\ nums[j] nums[i] XOR nums[j] 的最大运算结果,其中 0 ≤ i ≤ j < n 0 ≤ i ≤ j < n 0≤i≤j<n。 数据约束 1 ≤ n u m s . l e n g…...
计算机网络 | IP地址、子网掩码、网络地址、主机地址计算方式详解
关注:CodingTechWork 引言 在计算机网络中,IP地址、子网掩码和网络地址是构建网络通信的基本元素。无论是企业网络架构、互联网连接,还是局域网(LAN)配置,它们都起着至关重要的作用。理解它们的工作原理&a…...

循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...