Linux - 高级IO
目录
- 理解五种IO模型
- 非阻塞IO的设置
- 多路转接之select
- 实现一个简易的select服务器
- select服务器的优缺点
- 多路转接之poll
- 实现一个简易的poll服务器
- poll服务器的优缺点
- 多路转接之epoll
- epoll原理
- epoll的优势
- 用epoll实现一个简易的echo服务器
- epoll的LT和ET工作模式
- 什么是LT和ET
- 实现一个简易的reactor服务器
五种IO模型
如何理解五种IO模型:
应用层调用read/recv和write/send的时候,本质就是数据在用户层和操作系统的缓冲区的拷贝交互。要进行拷贝,必须先判断条件成立,该条件就是读写事件就绪。如果条件不成立,这些函数就会被阻塞住,等待资源就绪。
所以输入输出IO可以分为两个步骤:等待 + 拷贝
给大家举个例子帮助大家理解五种IO模型:
钓鱼 = 等待 + 钓(动作)
河边的张三在钓鱼,张三的钓鱼动作:一直盯着鱼漂,直到鱼漂抖动就说明有鱼来了。
李四也在钓鱼,李四的钓鱼动作:一边做自己的事,一边时不时观察鱼漂。
王五有个高级鱼竿,鱼漂上带着铃铛,王五的钓鱼动作:一直做自己的事,铃铛鱼漂响了,就去钓鱼
赵六买了20个鱼竿钓鱼,赵六的钓鱼动作:一直检查每个鱼漂,任何一个鱼漂动了就钓鱼。
小王开车载田七去公司的时候,田七路过的时候看见别人钓鱼,突然自己想吃野生鱼了,就叫小王去钓鱼,钓到了就给田七送过去。
人:进程/线程。鱼:数据。河:内核空间。鱼漂:数据就绪的事件。鱼竿:文件描述符(都是通过文件描述符进行操作)。钓鱼:recv/read
张三对应的是阻塞式IO。李四对应的是非阻塞IO,王五对应的是信号阻塞式IO(其中王五提前是知道铃铛响了该怎么办,该钓鱼),赵六对应的是多路转接/多路复用IO。田七对应的是异步IO(小王(操作系统)有鱼了告诉就告诉田七,田七直接用即可)
在这几个人中,你会觉得哪个人的钓鱼效率是最高的(钓的鱼最多的)?很明显是赵六,因为河中的鱼对每个钩子的咬钩概率是相同的,而赵六的钩子占比是最大的,所以必定单位时间内钓鱼的数量是最多的。
阻塞式IO:在内核将数据准备好之前,系统调用会一直等待,所有的文件描述符,默认都是阻塞方式。阻塞IO是最常见的IO模型,因为简单易上手。
非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。非阻塞IO往往需要程序员循环的方式反复尝试读写该文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用。
信号驱动IO:内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作
IO多路转接:虽然从流程上看起来和阻塞IO类似,实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
异步IO:由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉进程何时可以开始拷贝数据,需要进程自己拷贝)
什么叫做高效IO呢?
任何IO过程,都包含两个步骤,等待 + 拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让IO更高效,最核心的办法就是让等待的时间尽量少。在单位时间内,IO过程中,等待对于拷贝的比重越小,IO效率越高。
阻塞式IO和非阻塞IO有什么区别?
区别:等待时的状态不同。
阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在到得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
同步IO和异步IO有什么区别?
同步,就是在发出一个调用时,在没有得到该调用的结果之前,该调用就不会返回。但是一旦调用返回,就得到返回值了。换句话说,就是有调用者主动等待这个调用结果。
异步,调用在发出之后,这个调用就直接返回,所以没有返回结果。直到被动的通过状态/信号来通知来处理
非阻塞IO的设置
如何设置非阻塞IO?
以前我们在写网络编程的时候,recv的flag选项填的是0
如果我们想非阻塞等待,则可以设flags为MSG_DONTWAIT。也可以用open非阻塞的方式打开,方法有很多。
但提供一种更通用的做法,文件描述符本质就是下标,每一个下标指向的就是内核里的文件对象,文件对象中是有文件描述符标志的。
传入的cmd值不同,后面追加的参数也不相同
fcntl函数有5种功能:
复制一个现有的描述符(cmd = F_DUPFD)
获得/设置文件描述符标记(cmd = F_GETFD或F_SETFD)
获得/设置文件状态标记(cmd = F_GETFL或F_SETFL)
获得/设置异步I/O所有权(cmd = F_GETOWN或F_SETOWN)
获得/设置记录锁(cmd = F_GETLK, F_SETLK或F_SETLKW)
实现非阻塞轮询方式读取标准输入
#include <iostream>
#include <unistd.h>
#include <fcntl.h>void SetNoBlock(int fd)
{int flag = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}int main()
{SetNoBlock(0);char buffer[4096];while (1){std::cout << "Enter#";fflush(stdout);int n = read(0, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << buffer << std::endl;}else if (n == 0){std::cout << "read done" << std::endl;break;}else{std::cout << "read error: " << strerror(errno) <<std::endl;break;}}return 0;
}
为什么还没有输入就直接读出错了?错误信息是资源暂时还没有就绪。
结论:
设置成为非阻塞,如果底层fd数据没有就绪,recv/read/write/send,返回值会以出错的形式返回。这样会有两种情况:a、真的出错 b、底层资源没有就绪
我们怎么区分呢? – 通过errno区分
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>void SetNoBlock(int fd)
{int flag = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}int main()
{SetNoBlock(0);char buffer[4096];while (1){int n = read(0, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << buffer << std::endl;}else if (n == 0){std::cout << "read done" << std::endl;break;}else{if (errno == EWOULDBLOCK){//只是底层没有数据就绪,这种错误是可以容忍的//do_other_thing(); //也可以做其他事情,设置一些方法,就不演示了continue;}std::cout << "read error: " << strerror(errno) <<std::endl;break;}}return 0;
}
多路转接之select
select是干什么的?select负责多路转接IO中的等待,一次可以等待多个文件描述符,并不负责拷贝。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态变化。
nfds:填写要监视的所有要监视的fd中的最大值 + 1。这与底层内核有关系,内核会从0一直遍历到nfds。
timeout:为输入输出型参数
如果设置为nullptr,则为阻塞式。
若设置timeout = {0, 0};为非阻塞,会一直函数返回。
若设置timeout = {5, 0};如果一直收不到消息:阻塞5s,每过5s就超时(函数返回)一次,这样反复循环。如果过2s收到了消息,则该参数会返回{3, 0},表示还有3s才超时。
return返回值:大于0,有几个fd就绪了。等于0,超时/非阻塞返回了。小于0,select调用失败了
readfds:为输入输出参数
做输入参数时:表示用户告诉内核,你要帮我关心的文件描述符的读,fd_set表示位图,可以通过位图设置多个文件描述符。
做输出参数时:表示内核告诉用户,你要关心的文件描述中有哪些已经就绪了。
writefds和exceptfds同readfds参数
该位图既然是一种类型,必然有大小,所以能够想关心的fd的个数一定是有上限的。
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
128 * 8 = 1024bit,最多只能关心1024个fd。bit位的位置代表fd是多少,为0则代表没设置该fd,为1则代表设置了该fd。
实现一个简易的select服务器
为了节省篇幅,后面再贴完整代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt(); //设置端口复用_listensock.Bind(_port);_listensock.Listen();}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!我们的目标就是写多路转接IO,所以不能出现阻塞}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
更新代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();//设置地址复用_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(){}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!fd_set readfd;FD_SET(_listensock.Fd(), &readfd);struct timeval timeout = {3, 0};int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, &timeout);//暂时这样设置if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);}else{std::cout << "Get a New Link" << std::endl;sleep(2);HandleEvent();}}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
运行结果如下:
timeout设置为{3, 0}可以看到select每隔3s就超时一次,同理如果timeout设置为{0, 0},则会一直超时,如果设置为null,则会阻塞。
客户端连接上后,我们也看到了,他一直在打印Get a New Link,为什么一直打印?因为上层没有把底层数据拿上去处理,所以select就会一直提醒事件就绪了。
怎么处理?
更新代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(fd_set* rfd){//代码走到这里就说明我们的连接事件就绪了if (FD_ISSET(_listensock.Fd(), rfd) == true){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);}}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!fd_set readfd;FD_SET(_listensock.Fd(), &readfd);struct timeval timeout = {3, 0};int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, /*&timeout*/0);if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);}else{std::cout << "Get a New Link" << std::endl;sleep(1);HandleEvent(&readfd);}}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
代码运行结果:
更新HandleEvent函数的代码
void HandleEvent(fd_set* rfd)
{//代码走到这里就说明我们的连接事件就绪了if (FD_ISSET(_listensock.Fd(), rfd) == true){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);char buffer[4096];ssize_t n = read(newSockfd, buffer, sizeof(buffer) - 1);/*我们已经得到了新的sock,可以直接用read接口吗?不可以,因为我们这里是单线程/单进程,直接read可能会被阻塞住,以前我们的代码是直接托管给了多线程/多进程,他们的阻塞不会影响主进程。我们这个单进程应该将新到来的newSocketfd交给select,让select帮我们来管理*/}
}
所以此时,我们需要将HandleEvent函数里面的newSocketfd传递给Start函数。如何做呢?可以使用数组作为该类的成员 – 这个数组也叫做辅助数组,这也是select最大的特点之一:使用辅助数组,让文件描述符在函数之间互相传递。
#include "Socket.hpp"class SelectServer
{static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fdstatic const int DefaultNum = -1;
public:SelectServer(uint16_t port):_port(port){//初始化辅助数组for (int i = 0; i < MaxFdNum; ++i){read_fd_array[i] = DefaultNum;}}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(fd_set* rfd){for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET{int socket = read_fd_array[i];if (socket == DefaultNum)continue;if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] == DefaultNum){if (_maxfd < newSockfd)_maxfd = newSockfd;read_fd_array[i] = newSockfd;break;}if (i == MaxFdNum - 1){lg(Warning, "server is full, close sock: %d", newSockfd);close(newSockfd);}}}else if (FD_ISSET(socket, rfd) == true){//说明是其他的套接字读事件就绪了char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了read_fd_array[i] = DefaultNum;close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了read_fd_array[i] = DefaultNum;close(socket);}}}}void Start(){//将lisntensockfd设置进辅助数组read_fd_array[0] = _listensock.Fd();_maxfd = read_fd_array[0];while (1){//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里fd_set readfd;FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] != DefaultNum){if (_maxfd < read_fd_array[i]) _maxfd = read_fd_array[i];//将辅助数组中所要关心的文件描述符全部都设置进去FD_SET(read_fd_array[i], &readfd);}}int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{HandleEvent(&readfd);}}}~SelectServer(){_listensock.Close();}
private:int _maxfd; //最大的文件描述符是什么Socket _listensock;uint16_t _port;int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件// int except_fd_array[MaxFdNum];
};
代码运行结果:
注意,我们的可是单进程哦。实现了并发处理多个请求。
整理代码,完整代码(允许结果和上面演示效果相同):
Log.hpp文件 – 往期文章实现过
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>using namespace std;#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:Log(){printMethod = Screen;path = "./log/";}void Enable(int method){printMethod = method;}void printOneFile(string logname, const string& logtxt){logname = path + logname;int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录if (fd < 0){perror("open failed");return;}write(fd, logtxt.c_str(), logtxt.size());close(fd);}void printClassFile(int level, const string& logtxt){string filename = fileName;filename += ".";filename += leveltoString(level);printOneFile(filename, logtxt);}void printLog(int level, const string& logtxt){if (printMethod == Screen){cout << logtxt << endl;return;}else if (printMethod == Onefile){printOneFile(fileName, logtxt);return;}else if (printMethod == Classfile){printClassFile(level, logtxt);return;}}const char* leveltoString(int level){if (level == Info) return "Info";else if (level == Debug) return "Debug";else if (level == Error) return "Error";else if (level == Fatal) return "Fatal";else return "default";}void operator()(int level, const char* format, ...){time_t t = time(nullptr);struct tm* st = localtime(&t);char leftbuffer[4096];snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\[%s]:",st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));char rightbuffer[4096];va_list start;va_start(start, format);vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);va_end(start);char logtxt[4096 * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}
private:int printMethod;string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
Sock.hpp文件 – 往期文章实现过
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"Log lg;
class Socket
{
public:Socket(){}~Socket(){}void CreateSocket(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){lg(Fatal, "create socket failed:%s", strerror(errno));exit(1);}}void Bind(uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Fatal, "bind socket failed:%s", strerror(errno));exit(2);}}void Listen(){int n = listen(_sockfd, 5);if (n < 0){lg(Fatal, "listen socket failed:%s", strerror(errno));exit(3);}}int Accept(string* clientIp, uint16_t* clinetPort){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);if (newsocket < 0){lg(Error, "accept error:%s", strerror(errno));return -1;}*clientIp = inet_ntoa(peer.sin_addr);*clinetPort = ntohs(peer.sin_port);return newsocket;}int Connect(const string& serverIp, uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_addr.s_addr = inet_addr(serverIp.c_str());server.sin_port = htons(serverPort);int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Error, "connect error:%s", strerror(errno));return -1;}return 0;}void Setsockopt(){int opt = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)lg(Error, "%s\n", strerror(errno));}void Close(){close(_sockfd);}int Fd(){return _sockfd;}
private:int _sockfd;
};
SelectServer.hpp文件
#include "Socket.hpp"class SelectServer
{static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fdstatic const int DefaultNum = -1;
public:SelectServer(uint16_t port):_port(port){//初始化辅助数组for (int i = 0; i < MaxFdNum; ++i){read_fd_array[i] = DefaultNum;}}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了read_fd_array[pos] = DefaultNum;close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了read_fd_array[pos] = DefaultNum;close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] == DefaultNum){if (_maxfd < newSockfd)_maxfd = newSockfd;read_fd_array[i] = newSockfd;break;}if (i == MaxFdNum - 1){lg(Warning, "server is full, close sock: %d", newSockfd);close(newSockfd);}}}void Dispatcher(fd_set* rfd) //事件派发器{for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET{int socket = read_fd_array[i];if (socket == DefaultNum)continue;if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else if (FD_ISSET(socket, rfd) == true){//说明是其他的套接字读事件就绪了Recver(socket, i);}}}void EventLoop() //事件循环{//将lisntensockfd设置进辅助数组read_fd_array[0] = _listensock.Fd();_maxfd = read_fd_array[0];while (1){//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里fd_set readfd;FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] != DefaultNum){if (_maxfd < read_fd_array[i]) _maxfd = read_fd_array[i];//将辅助数组中所要关心的文件描述符全部都设置进去FD_SET(read_fd_array[i], &readfd);}}int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{Dispatcher(&readfd); //事件派发器}}}~SelectServer(){_listensock.Close();}
private:int _maxfd; //最大的文件描述符是什么Socket _listensock;uint16_t _port;int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件// int except_fd_array[MaxFdNum];
};
main.cc文件
#include "SelectServer.hpp"int main()
{SelectServer svr(8080);svr.Init();svr.EventLoop();return 0;
}
select服务器的优缺点
优点:能实现成多路转接的服务器,即单进程也能处理多用户的请求
缺点:
1.同时等待的fd是有上限的,因为位图fd_set是有大小的。
2.用户必须借助第三方数组来维护合法的fd
3.使用select接口设置参数时,每次都要对关心的fd进行事件重置。
4.数据拷贝的频率比较高,select函数需要每次都需要重新设置参数,每传一次位图fd_set内核就需要拷贝一次。
5.如果只有一个fd就绪,用户层也需要全部遍历(虽然可以改进但没必要)
6.内核中检测位图fd_set的事件就绪也要遍历(这也就是为什么select的第一个参数传的是文件描述符的最大值,因为内核要以这个范围来遍历)
多路转接之poll
为了解决上面一些的问题,poll就出现了
fds:struct pollfd数组的首地址
nfds:数组元素的个数
timeout:设置超时时间,单位是ms
填0:非阻塞等待
小于0:表示阻塞等待
大于0:每过timeout就超时一次。(理解同select)
fd:你要让内核关心的某一个fd
events:输入参数,用户让内核关心fd的事件
revents:输出参数,内核告诉用户,你要关心的fd哪些事件就绪了
从这里看出来了,poll将输入和输出参数分离,所以poll不需要像select一样每次都需要将参数重新设定。
short是一个16bit的位图,events和revents的取值如下:
| 事件 | 描 述 | 是否可作为输入 | 是否可作为输出 |
|---|---|---|---|
| POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
| POLLRDNORM | 普通数据可读 | 是 | 是 |
| POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
| POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
| POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
| POLLWRNORM | 普通数据可写 | 是 | 是 |
| POLLWRBAND | 优先级带数据可写 | 是 | 是 |
| POLLRDHUP | POLLRDHUPTCP连接被对方关闭,或者对方关闭了写操作。它GNU 引入 | 是 | 是 |
| POLLERR | 错误 | 否 | 是 |
| POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
| POLLNVAL | 文件描述符没有打开 | 否 | 是 |
select只分为了读、写、异常事件,而poll将fd的事件分的更细了。
实现一个简易的poll服务器
参考代码:
Socket.hpp、Log.hpp文件同上
PollerServer.hpp文件
#include "Socket.hpp"
#include <poll.h>
#include <vector>class PollServer
{static const int DefaultNum = -1;
public:PollServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了_event_fds.erase(_event_fds.begin() + pos);close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了_event_fds.erase(_event_fds.begin() + pos);close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组struct pollfd event;event.fd = newSockfd;event.events |= POLLIN;_event_fds.push_back(move(event));}void Dispatcher() //事件派发器{for (int i = 0; i < _event_fds.size(); ++i)//遍历所有的_event_fds中是否有revents事件就绪的{int socket = _event_fds[i].fd;if ((_event_fds[i].revents & POLLIN) == true){if (_listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else{//说明是其他的套接字读事件就绪了Recver(socket, i);}}else if ((_event_fds[i].revents & POLLOUT) == true){//写事件就绪}else{//...}}}void EventLoop() //事件循环{//将lisntensockfd设置进辅助数组struct pollfd event;event.fd = _listensock.Fd();event.events |= POLLIN;_event_fds.push_back(move(event)); //将listensock添加到_event_fds数组里while (1){int n = poll(_event_fds.data(), _event_fds.size(), -1); //易于观察就设置为-1if (n < 0){std::cout << "poll error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{Dispatcher(); //事件派发器}}}~PollServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;std::vector<struct pollfd> _event_fds;
};
main.cc文件
#include "PollServer.hpp"int main()
{PollServer svr(8080);svr.Init();svr.EventLoop();return 0;
}
运行结果
poll服务器的优缺点
优点:
pollfd结构包含了要监视的event事件和就绪的revent事件,输入与输出分离了,接口使用比select方便。
poll解除了fd有上限的问题,数组为vector了,vector最大能有多大取决于操作系统了,不关poll函数的事了。
缺点:
用户层和内核层都需要遍历该数组,都是o(n)的效率,如果用户要关心的fd有上万个,频繁的遍历会影响效率问题。
每次调用poll,都需要把大量的pollfd结果从用户态拷贝到内核中。
有大量fd,若只有少量的fd处于就绪状态,也需要全部线性遍历一遍,随着监视描述符数量的增长,其效率也会线性下降。
总结:解决了select的1、3问题
多路转接之epoll
按照man手册的说法:是为处理大批量的socketfd而作了改进的poll。它是在2.5.44内核中被引进的,linux2.6下,它几乎具备了之前所说的一切优点,被公认为性能最好的多路转接IO。现在只要涉及服务器组件的,都用的是epoll。
size:该参数已经被废弃,但是要填大于0的值
该函数会创建一个epoll模型,返回epoll模型的文件描述符,什么是epoll模型,后面会讲。
epfd:epoll模型的描述符
events:输出型参数,struct epoll_event数组的首地址
maxevents:该数组的个数
timeout:意义同epoll参数的timeout
返回值和select、poll一样
fd:当事件就绪时,能知道该事件是哪个fd就绪了events可以是以下几个宏的集合
| 事件 | 描 述 |
|---|---|
| EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
| EPOLLOUT | 表示对应的文件描述符可以写 |
| POLLPRI | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
| POLLERR | 表示示对应的文件描述符发生错误 |
| POLLPRI | 表示对应的文件描述符被挂断 |
| POLLPRI | 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的 |
| EPOLLONESHOT | 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里 |
epfd:epoll模型
op、fd、event:用户告诉内核,要对fd的event事件进行op处理
op:
EPOLL_CTL_ADD添加事件
EPOLL_CTL_MOD修改事件
EPOLL_CTL_DEL删除事件
epoll解决了select的所有缺点,epoll是怎样设计的呢?它的原理是什么?
epoll原理
如图所示,内核中的三个部分(红黑树,回调函数,就绪队列)组成了epoll模型。epoll模型并且由struct_file结构体所指向,想管理epoll模型,就能通过fd找到epoll模型。
红黑树管理已注册的文件描述符:epoll_ctl对红黑树的增删改操作时间复杂度为O(logN)
就绪事件的通知机制:每当有fd就绪了,网卡驱动就会通知(调用回调函数)epoll,并执行内核曾经注册好的回调函数。有了这个就绪事件通知机制,内核就不用再O(N)的遍历所有的文件描述符是否已经就绪
就绪列表的管理:epoll_wait所捞取的都是已经就绪的
epoll的优势
1.用户层不再需要每次调用接口时都将关心fd拷贝给内核了
2.管理fd个数没有上限
3.不再需要辅助数组了,因为内核中的红黑树就代替了曾经用户要管理的数组
4.内核中通过事件通知机制O(1)就能将就绪fd放入就绪队列,不再需要遍历所有的fd,来判断是否就绪
5.对事件的管理更方便,只需要对epoll_ctl进行操作
select和poll的底层如图所示
用epoll实现一个简易的echo服务器
参考代码:
Log.hpp文件 和 Socket.hpp文件和之前一样
Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>class Epoller
{
public:Epoller(){}void EpollerCreate(){_epollfd = epoll_create(64);if (_epollfd < 0){lg(Fatal, "Create epoll failed: %s", strerror(errno));abort();}}void EpollerUpate(int oper, int fd, uint32_t event){int n = 0;if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD){struct epoll_event epEvent;epEvent.data.fd = fd;epEvent.events = 0;epEvent.events |= event;n = epoll_ctl(_epollfd, oper, fd, &epEvent);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}else if (oper == EPOLL_CTL_DEL){n = epoll_ctl(_epollfd, oper, fd, 0);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}}int Wait(struct epoll_event* events, int n, int timeout){int m = epoll_wait(_epollfd, events, n, timeout);if (m < 0){lg(Error, "epoll wait failed: %s", strerror(errno));}else if (m == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}return m;}int Fd(){return _epollfd;}
private:int _epollfd;
};
nocpy.hpp文件
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:nocpy(){}~nocpy(){}
private:nocpy(const nocpy&) = delete;nocpy& operator=(const nocpy&) = delete;
};
EpollServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>class EpollServer : public nocpy
{static const int num = 64; //一次性最多捞取上来多少个就绪fd, 一次捞取不完下一次可以继续捞取
public:EpollServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());_epoller_ptr->EpollerCreate();lg(Info, "create epoller success");}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);string s = "server echo:";s += buffer;int m = write(socket, s.c_str(), s.size());if (m < 0){lg(Error, "write failed: %s", strerror(errno));}}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock_ptr->Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到Epoller里面_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, newSockfd, EPOLLIN);}void Dispatcher(struct epoll_event* recvs, int n) //事件派发器{for (int i = 0; i < n; ++i)//recvs数组里面全部都是就绪的fd{int socket = recvs[i].data.fd;if ((recvs[i].events & EPOLLIN) == true){if (_listensock_ptr->Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else{//说明是其他的套接字读事件就绪了Recver(socket, i);}}else if ((recvs[i].events & EPOLLOUT) == true){//写事件就绪}else{//...}}}void EventLoop() //事件循环{//添加_listensock到Epoller里面_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EPOLLIN);struct epoll_event recvs[num];while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //易于观察就设置为-1Dispatcher(recvs, n); //捞取上来了n个fd}}~EpollServer(){_listensock_ptr->Close();}
private:std::shared_ptr<Socket> _listensock_ptr; //智能指针优势:1.可共享拷贝资源 - 不怕浅拷贝 2.灵活的生命周期,可随时释放 3.可延迟初始化 4.异常安全std::shared_ptr<Epoller> _epoller_ptr;uint16_t _port;
};
main.cc文件
#include "EpollServer.hpp"int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(8080 ));svr->Init();svr->EventLoop();return 0;
}
代码运行结果:
epoll的LT和ET工作模式
什么是LT和ET
一个现象:
我们将这行代码注释掉,上层不处理新连接到来的这个事件
代码运行结果:
发现底层会一直通知上层,有新事件到来,请上层处理。
一旦有新的连接到来或者有新的数据到来,上层如果你不取走,底层会一直通知你去取走哦,这种模式就叫做LT
LT:水平触发Level Triggered ,ET:边缘触发Edge Triggered
LT、ET,和示波器很像
LT一直处于高电平,表示为真。ET只有从低点到高点变化的时候,才为真
epoll默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效
ET:底层数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。
给大家举一个例子
张三是个快递员,是一个比较负责的快递员。假如你买的快递到了,有6个快递,下楼取一下,你可能在打游戏没时间取,过一会儿,张三又给你打电话:你有6个快递,你下来取一下。你游戏没打完就是不取,但是呢,张三期间也一直打电话。张三的同事李四,李四也有你的快递,路过,张三就给李四说:我帮你派发吧。张三现在一共有十个快递,让你下来取,你不下来取张三就一直会给你打电话,张三送快递的模式就叫做LT模式。
过了一周,又要派发你的快递。但是是李四来派发你的快递,李四给你打电话说:你不来取,我就不给你打电话了,我只通知你一次。你一听:那我还是下去取吧,要是走了我快递里就找不到了,我的快递在一堆快递里,除了快递员不知道哪个是我的快递。你取的时候,发现你只能拿走6个当中的4个,一次拿不完,你把快递拿上去之后剩下的快递你不知道在哪了,索性就不要了,你又去打你的游戏了。李四看到张三也有你的快递,还人情就帮张三派发,又给你打电话:你有新的快递到了,你不下来取,我就走了。你一听,这人只通知一次,你就又下去取了,这次就把上次没取完的一并给取走了。
总结:
张三是有快递就一直给你打电话。
李四是从无到有,从有到多,只给你打一次电话,你没取干净,我也不再通知你。
你认为两个快递员谁的工作效率更高呢?
ET,因为这个快递员在单位时间内通知的人数是更多的。ET一小时内可以通知50人,而LT可能只通知了10人。主要是ET的通知效率高。
ET:因为只会通知一次,所以会倒逼程序员使用ET工作模式的时候,每次通知,都必须要把本轮数据全部取走。你怎么知道你把本次就绪底层的数据读取完毕了呢?循环读取,知道读取不到数据了。一般的fd,是阻塞式的fd,如果没有数据了会阻塞,所以在ET模式下,我们的fd必须是非阻塞的。
ET的通知效率高,不仅仅如此,ET的IO效率也更高,原因在于,每通知一次就要求程序员把本轮数据全部取走,这意味着tcp会向对方通告一个更大的窗口,从而概率上让对方一次能给我发送更多的数据,提高了网络的吞吐量,则提高IO效率。
ET的效率也不是一定比LT高,LT也可以将所有的fd设置成为非阻塞,然后循环读取,通知第一次的时候就全部取走,不就和ET一样了嘛。LT是epoll的默认行为,使用ET能够减少epoll触发的次数,但是代价就是强逼着程序员一次就绪响应就把所有的数据都处理完。所以说,如果LT情况下如果也能做到每次就绪的文件描述符都立即处理,不让这个就绪被重复提示的话,其实性能也是一样的。那为什么LT不代替ET呢?因为程序员不能保证完全的替代,会可能写出bug。
实现一个简易的reactor服务器
我们之前read函数只读一次,因为tcp是面向字节流的,所以不能保证读上来的是一个完整的报文。
如果读一次没有读完,那需要读第二次,也就是循环读,所以读到的数据需要存到一个缓冲区里。
下面我们就用epoll来实现一个ET模式的reactor服务器
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
private:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());}void Dispatcher(int n){for (int i = 0; i < n; ++i){}}void Start(){_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EVENT_IN);while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, nullptr, nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将listensock添加到Connection中,同时,将listen和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(_listensock_ptr->Fd(), this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(_listensock_ptr->Fd(), conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd, this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, nullptr, nullptr, nullptr);}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
用三个客户端连接,代码运行结果
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd, this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"void DefaultOnMessage(std::shared_ptr<Connection> conn)
{std::cout << "sock: " << conn->_sockfd <<"的缓冲区里的数据为:" << conn->_inbuffer << std::endl;
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
代码运行结果如下
但是发现了一个问题:类似于循环引用,异常事件关闭连接时,导致程序出错。
更新代码
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd())conn->_tcp_server_ptr = shared_ptr<TcpServer>(this); //如果是listensock连接,就构造智能指针来管理TcpServerelseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
运行结果如下:
每一个socket都配套有一个读写缓冲区,为什么要有读缓冲区?因为你不能保证一次read上来的数据就是一个完整的数据。为什么要有写缓冲区?因为你不能保证内核缓冲区是有空间的,也就是你无法保证发送条件是否就绪。
异常和读事件我们都处理了,如何处理写事件呢?
通常情况下,发送缓冲区一般都是有空间的,写事件一般都是就绪的,如果我们设置对EPOLLOUT关心,那EPOLLOUT几乎每次都是就绪,会导致epoll_wait经常返回,浪费CPU资源。
结论:对于读事件,设置常关心,对于写事件,按需设置。什么是按需设置?直接写入,如果写入完成就结束。如果没有将这一轮的数据写完,outbuffer里还有数据,我们就需要对写事件进行关心了,如果写完了,就去掉对写事件的关心。
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServerconn->_tcp_server_ptr = shared_ptr<TcpServer>(this);elseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Sender(std::shared_ptr<Connection> conn){while (1){int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());if (n > 0){conn->_outbuffer.erase(0, n);if (conn->_outbuffer.empty() == true){return;}}else if (n == 0){return;}else{if (errno == EWOULDBLOCK || errno == EAGAIN){//说明底层数据不就绪,即内核写缓冲区没有空间了break;}if (errno == EINTR){continue;}lg(Error, "write failed, socket: %d", conn->_sockfd);conn->_except_cb(conn);return;}}if (!conn->_outbuffer.empty()){//说明没有将数据发完,则需要设置写事件关心了uint32_t event = EVENT_OUT | EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}else {//说明将数据发完了,则取消对写事件关心了uint32_t event = EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"std::string Handle(std::string& inbuffer) //业务处理
{std::string tmp = inbuffer;inbuffer.clear();return tmp;
}
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{string response_str = Handle(conn->_inbuffer);if (response_str.empty()) return;conn->_outbuffer += response_str;if (conn->_send_cb)conn->_send_cb(conn);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
代码运行结果如下:
套用之前写的网络版本计算器业务逻辑 - 文末附完整代码
代码运行结果
reactor模型是半同步半异步的模型
半同步体现的是等:由epoll来做的:保证了就绪事件的通知和进行IO
半异步的体现:负责了业务处理,如果把业务处理放到线程池中去处理,就是半异步了。因为我们的业务处理并不耗时,所以就没有开多线程
还有一种模式叫做Proactor,纯异步的编写方式,在linux服务器设计里面没有,我们就不涉及了。
reactor也叫做反应堆,是什么意思呢?如同打地鼠的游戏
玩游戏的人就相当于是一个多路转接,我们要检测每一个洞口有没有地鼠出来,虽然没出来,但是我们知道一旦出来了我就要执行我的回调方法(来砸它) – 回调函数是提前设置好的。
游戏的面板就相当于我们的reactor,每一个洞就相当于Connection,老鼠上来了就叫做事件就绪,执行砸方法就是执行回调函数。这种就叫做反应堆
redis底层用的就是单reactor,处理用的是reactor的LT模式
完整版代码如下:
文件目录

Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>class Epoller
{
public:Epoller(){}void EpollerCreate(){_epollfd = epoll_create(64);if (_epollfd < 0){lg(Fatal, "Create epoll failed: %s", strerror(errno));abort();}}void EpollerUpate(int oper, int fd, uint32_t event){int n = 0;if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD){struct epoll_event epEvent;epEvent.data.fd = fd;epEvent.events = 0;epEvent.events |= event;n = epoll_ctl(_epollfd, oper, fd, &epEvent);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}else if (oper == EPOLL_CTL_DEL){n = epoll_ctl(_epollfd, oper, fd, 0);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}}int Wait(struct epoll_event* events, int n, int timeout){int m = epoll_wait(_epollfd, events, n, timeout);if (m < 0){lg(Error, "epoll wait failed: %s", strerror(errno));}else if (m == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}return m;}int Fd(){return _epollfd;}
private:int _epollfd;
};
Log.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>using namespace std;#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:Log(){printMethod = Screen;path = "./log/";}void Enable(int method){printMethod = method;}void printOneFile(string logname, const string& logtxt){logname = path + logname;int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录if (fd < 0){perror("open failed");return;}write(fd, logtxt.c_str(), logtxt.size());close(fd);}void printClassFile(int level, const string& logtxt){string filename = fileName;filename += ".";filename += leveltoString(level);printOneFile(filename, logtxt);}void printLog(int level, const string& logtxt){if (printMethod == Screen){cout << logtxt << endl;return;}else if (printMethod == Onefile){printOneFile(fileName, logtxt);return;}else if (printMethod == Classfile){printClassFile(level, logtxt);return;}}const char* leveltoString(int level){if (level == Info) return "Info";else if (level == Debug) return "Debug";else if (level == Error) return "Error";else if (level == Fatal) return "Fatal";else return "default";}void operator()(int level, const char* format, ...){time_t t = time(nullptr);struct tm* st = localtime(&t);char leftbuffer[4096];snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\[%s]:",st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));char rightbuffer[4096];va_list start;va_start(start, format);vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);va_end(start);char logtxt[4096 * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}
private:int printMethod;string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
nocpy.hpp文件
#pragma once
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:nocpy(){}~nocpy(){}
private:nocpy(const nocpy&) = delete;nocpy& operator=(const nocpy&) = delete;
};
Socket.hpp文件
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"Log lg;
class Socket
{
public:Socket(int fd = -1):_sockfd(fd){}~Socket(){}void CreateSocket(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){lg(Fatal, "create socket failed:%s", strerror(errno));exit(1);}}void Bind(uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Fatal, "bind socket failed:%s", strerror(errno));exit(2);}}void Listen(){int n = listen(_sockfd, 5);if (n < 0){lg(Fatal, "listen socket failed:%s", strerror(errno));exit(3);}}int Accept(string* clientIp, uint16_t* clinetPort){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);if (newsocket < 0){return -1;}*clientIp = inet_ntoa(peer.sin_addr);*clinetPort = ntohs(peer.sin_port);return newsocket;}int Connect(const string& serverIp, uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_addr.s_addr = inet_addr(serverIp.c_str());server.sin_port = htons(serverPort);int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){return -1;}return 0;}void Setsockopt(){int opt = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)lg(Error, "%s\n", strerror(errno));}void SetNonBlock(){int flag = fcntl(_sockfd, F_GETFL);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}void Close(){close(_sockfd);}int Fd(){return _sockfd;}
private:int _sockfd;
};
TcpServer.hpp文件如下
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServerconn->_tcp_server_ptr = shared_ptr<TcpServer>(this);elseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Sender(std::shared_ptr<Connection> conn){while (1){int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());if (n > 0){conn->_outbuffer.erase(0, n);if (conn->_outbuffer.empty() == true){return;}}else if (n == 0){return;}else{if (errno == EWOULDBLOCK || errno == EAGAIN){//说明底层数据不就绪,即内核写缓冲区没有空间了break;}if (errno == EINTR){continue;}lg(Error, "write failed, socket: %d", conn->_sockfd);conn->_except_cb(conn);return;}}if (!conn->_outbuffer.empty()){//说明没有将数据发完,则需要设置写事件关心了uint32_t event = EVENT_OUT | EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}else {//说明将数据发完了,则取消对写事件关心了uint32_t event = EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"
#include "ServerCal.hpp"void DefaultOnMessage(std::shared_ptr<Connection> conn)
{ServerCal calculator;std::string response_str = calculator.Calculator(conn->_inbuffer);if (response_str.empty()) return;conn->_outbuffer += response_str;if (conn->_send_cb)conn->_send_cb(conn);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
Protocol.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>using namespace std;const string blank_space_sep = " ";
const string protocol_sep = "\n";
//添加报头:"内容" 转为 "长度\n内容\n"
bool Encode(string *package, const string& content)
{package->clear();*package += to_string(content.size());*package += protocol_sep;*package += content;*package += protocol_sep;return true;
}
//去除报头:"长度\n内容\n" 转为 "内容" -- 可能存在本来是5\n1 + 1\n 但接受到的是5\n1 +
bool Decode(string &package, string *content)
{content->clear();int pos = package.find(protocol_sep);if (pos == string::npos) return false;int len = stoi(package.substr(0, pos));int totalLen = pos + len + 2;if (package.size() < totalLen) return false;//如果报文不完整,说明*content = package.substr(pos + protocol_sep.size(), len);package.erase(0, totalLen);return true;
}//提取字符串
bool Extract(const string& in, int& x, int& y, char& oper)
{int pos = in.find(blank_space_sep);if (pos == string::npos) return false;x = stoi(in.substr(0, pos));oper = *in.substr(pos + blank_space_sep.size(), 1).c_str();int pos2 = in.rfind(blank_space_sep);if (pos2 == string::npos) return false;y = stoi(in.substr(pos2 + blank_space_sep.size()));return true;
}//请求协议
class Request
{
public:Request(int x = 0, int y = 0, char oper = '+'):_x(x), _y(y), _oper(oper){}//序列化 -- 将结构体转为"_x + _y"bool Serialize(string* out){
#ifdef MySelfout->clear();*out += to_string(_x);*out += blank_space_sep;*out += _oper;*out += blank_space_sep;*out += to_string(_y);return true;
#elseJson::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StyledWriter w;*out = w.write(root);return true;
#endif}//反序列化 -- 将"_x + _y"转为结构体bool Deserialize(const string& in){
#ifdef MySelfreturn Extract(in, _x, _y, _oper);
#elseJson::Reader r;Json::Value root;r.parse(in, root);_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;
#endif}
public:int _x;int _y;char _oper;
};//响应协议
class Response
{const string blank_space_sep = " ";
public:Response(int result = 0, int code = 0):_result(), _code(code){}//序列化 -- 将结构体转为"_result _code"bool Serialize(string* out){
#ifdef MySelf*out = to_string(_result) + blank_space_sep + to_string(_code);return true;
#else Json::Value root;root["code"] = _code;root["result"] = _result;Json::StyledWriter w;*out = w.write(root);return true;
#endif}//反序列化 -- 将"_result _code"转为结构体bool Deserialize(const string& in){
#ifdef MySelfint pos = in.find(blank_space_sep);if (pos == string::npos) return false;_result = stoi(in.substr(0, pos));_code = stoi(in.substr(pos + blank_space_sep.size()));return true;
#elseJson::Reader r;Json::Value root;r.parse(in, root);_result = root["result"].asInt();_code = root["code"].asInt();return true;
#endif}
public:int _result;int _code; // 0,可信,否则!0具体是几,表明对应的错误原因
};
ServerCal.hpp文件
#pragma once
#include "Protocol.hpp"//服务器的计算服务
class ServerCal
{
public:ServerCal(){}Response CalculatorHelper(const Request &req){int x = req._x;char oper = req._oper;int y = req._y;Response rsp(0, 0);switch (oper){case '+':{rsp._result = x + y;rsp._code = 0;break;}case '-':{rsp._result = x - y;rsp._code = 0;break;}case '*':{rsp._result = x * y;rsp._code = 0;break;}case '/':{if (y == 0){rsp._result = 0;rsp._code = -1;break;}rsp._result = x / y;rsp._code = 0;break;}case '%':{if (y == 0){rsp._result = 0;rsp._code = -1;break;}rsp._result = x % y;rsp._code = 0;break;}default:break;}return rsp;}string Calculator(string& s){string content;if(!Decode(s, &content))//将"长度/n内容/n" -> "内容"return "";Request rq;rq.Deserialize(content);//将"内容"反序列化Response rsp = CalculatorHelper(rq);//得到答案内容content.clear();rsp.Serialize(&content);//序列化答案内容string ret;Encode(&ret, content);//将"内容" -> "长度/n内容/n"return ret;}
};
TcpClient.cc文件
#include "Protocol.hpp"
#include "Socket.hpp"static void Usage(const std::string &proc)
{std::cout << "\nUsage: " << proc << "\tserverIp\tport\n" << std::endl;
}
int main(int argc, char* argv[])
{if (argc != 3){Usage(argv[0]);exit(0);}string serverIp = argv[1];uint16_t serverPort = atoi(argv[2]);Socket skt;skt.CreateSocket();skt.Connect(serverIp, serverPort);string streamBuffer;while (1){cout << "Enter#";fflush(stdout);string content;getline(cin, content);//提取字符串,得到”请求反序列化“Request rq;Extract(content, rq._x, rq._y, rq._oper);string s;rq.Serialize(&s);string ret;Encode(&ret, s);write(skt.Fd(), ret.c_str(), ret.size());char readBuffer[4096];int n = read(skt.Fd(), readBuffer, sizeof(readBuffer) - 1);if (n > 0){readBuffer[n] = 0;streamBuffer += readBuffer;string ret;Decode(streamBuffer, &ret);Response rsp;rsp.Deserialize(ret);cout << "code: " << rsp._code << " result: " << rsp._result << endl;}else if (n == 0){cerr << "Server closed..." << endl;break;}else {cerr << "read failed:" << strerror(errno) << endl;exit(3);}}return 0;
}
Makefile文件
1=main
.PHONY:all
all:$1 TcpClient
$1:$1.ccg++ -o $@ $^ -std=c++11 -ljsoncpp
TcpClient:TcpClient.ccg++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:rm -rf $1 TcpClient
相关文章:
Linux - 高级IO
目录 理解五种IO模型非阻塞IO的设置多路转接之select 实现一个简易的select服务器select服务器的优缺点 多路转接之poll 实现一个简易的poll服务器poll服务器的优缺点 多路转接之epoll epoll原理epoll的优势用epoll实现一个简易的echo服务器 epoll的LT和ET工作模式 什么是LT和…...
面试题:说一下 http 报文都有哪些东西?
面试题:说一下 http 报文都有哪些东西? HTTP 是传输超文本(实际上除了 HTML,可以传输任何类型的文件,如视频、音频、文本等)的协议,是一组用于浏览器-服务器之间数据传输的规则。 HTTP 位于 OS…...
开山之作!Python数据与算法分析手册,登顶GitHub!
若把编写代码比作行军打仗,那么要想称霸沙场,不能仅靠手中的利刃,还需深谙兵法。 Python是一把利刃,数据结构与算法则是兵法。只有熟读兵法,才能使利刃所向披靡。只有洞彻数据结构与算法,才能真正精通Pyth…...
编译安装gcc-11及可能遇到的bug
编译安装脚本 GCC_VERSION11.1.0 PACKAGE_DIR/path/to/gcc/source/code GCC_DIR$PACKAGE_DIR/gcc-$GCC_VERSION GCC_INSTALL_DIR/path/to/install/gccmkdir -p $GCC_INSTALL_DIR cd $GCC_INSTALL_DIR rm -rf * cd $PACKAGE_DIR rm -rf gcc-$GCC_VERSION if [ ! -f "gcc-$…...
vue项目引入json/js文件批量或单个方法
vue项目 json // 方式一 : 将文件内容完整的引入 import json from ./src/assets/xxx.json console.log(json) console.log(---)// 方式二 : 部分引入-名称必须是文件中定义的key import {name1,name2} from ./src/assets/xxx.json console.log(name1)…...
守护任务用来防止资源冲突
背景:有三个任务,他们都需要操作数码管。每个任务对应三个数码管,共9个数码管。硬件上9个数码管的控制使用一套硬件完成。 策略:每个任务都往自己的队列里面发数据,单独建立一个监听任务:处理所有队列的数…...
fast admin实现多数据库导入数据
思路 1创建多数据库连接 2后端的前台代码能使用get或者post请求传递选中数据给后台 3后台能够接收到 4后台接收到id或者全字段数据后对数据进行处理,然后使用多数据库操作将其存入第二个数据库 实现 1config文件下创建新数据库连接 db_config2 > [// 数据库类…...
NLP基础——序列模型(动手学深度学习)
序列模型 定义 序列模型是自然语言处理(NLP)和机器学习领域中一类重要的模型,它们特别适合处理具有时间顺序或序列结构的数据,例如文本、语音信号或时间序列数据。 举个例子:一部电影的评分在不同时间段的评分可能是…...
机器学习AI大模型的开源与闭源:哪个更好?
文章目录 前言一、开源AI模型1.1 开源的优点1.2 开源的缺点 二、闭源AI模型2.1 闭源的优点2.2 闭源的缺点 三、开源与闭源的平衡3.1 开源与闭源结合的案例3.2 开源与闭源的战略选择 小结 前言 在过去的几年里,人工智能(AI)和机器学习…...
关于大模型多轮问答的两种方式
前言 大模型的多轮问答难点就是在于如何精确识别用户最新的提问的真实意图,而在常见的使用大模型进行多轮对话方式中,我接触到的只有两种方式: 一种是简单地直接使用 user 和 assistant 两个角色将一问一答的会话内容喂给大模型,…...
达梦数据库相关SQL及适配Mysql配置总结
🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢…...
Centos7.9实现多台机器ssh免密登录
1.本机(172.16.10.228)先生成密钥对 ssh-keygen -t rsa 2.执行命令,把本机公钥拷贝到远程机器 ssh-copy-id rootdistinctIp 3.查看一下远程机器 、/root/.ssh/authorized_keys文件 cat /root/.ssh/authorized_keys 会看到里边多了个公钥…...
Unity3D DOTS JobSystem物理引擎的使用详解
前言 Unity3D DOTS(Data-Oriented Technology Stack)是Unity引擎的一项新技术,旨在提高游戏性能和扩展性。其中的Job System是一种用于并行处理任务的系统,可以有效地利用多核处理器的性能。在本文中,我们将重点介绍如…...
vue3+element-plus 表单校验和循环form表单校验
1.HTML页面 //el-form 标签添加上 ref"form2Form" :rules"rules2" :model"form2" 正常表单校验 //没有循环表单的使用事例<el-form-item label"投保人名称" class"insurance-date-no1" prop"tbrName">…...
Java集合基础知识点系统性总结篇
目录 集合一、图解集合的继承体系?([图片来源](https://www.cnblogs.com/mrhgw/p/9728065.html))点击查看大图二、List,Set,Map三者的区别?三、List接口的实现3.1、Arraylist 、 LinkedList、Vector3.2、Arraylist 、 LinkedList、…...
智能网联汽车信息安全风险识别与应对策略研究综述
摘要:随着智能网联汽车技术的飞速发展,其信息安全问题逐渐成为公众关注的焦点。本文概述了智能网联汽车技术的发展背景和信息安全风险的来源,采用STRIDE威胁分析方法对智能网联汽车的四层模型进行风险识别,进一步探讨了抗女巫攻击…...
python-web应用程序-Django数据库-数据库表设计
python-web应用程序-Django数据库-数据库表设计 在models中创建一个类会自动对数据库进行管理,那么如何用类的声明来实现数据库表的设计呢? from django.db import models# Create your models here. class Department(models.Model):title models.Ch…...
C#知识|封装典型的SQLServer数据库查询方法。
哈喽,你好啊,我是雷工! 前边学习封装了增删改的方法封装: 《C#知识|通用数据访问类SQLHelper的编写》; 本节继续学习将两种典型的查询方法封装成类。 下边为学习笔记。 01 封装单一返回结果的封装 在查看封装后的代码之前,可以先看下封装前代码的写法: 《C#知识|通过A…...
第一篇 逻辑门(与门、或门、非门、异或门)
一、实验目的 了解DE1-SOC开发板一些外设。 掌握常用组合逻辑门电路的基本原理。 学习Verilog HDL的基本语法。 学习使用ModelSim工具对设计的电路进行仿真,包括编写Testbench仿真代码,以及ModelSim工具的使用。 熟悉使用Quartus软件从创建Quartus工…...
车牌号码智能监测识别摄像机
车牌号码智能监测识别摄像机是一项革命性的技术,为交通管理和安全提供了全新的解决方案。这种摄像机利用先进的人工智能和图像识别技术,能够实时监测道路上的车辆,并准确识别车辆的车牌号码,为交通管理和安全提供了强有力的支持。…...
wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...






































