Linux网络编程5——多路IO转接
一.TCP状态时序理解
1.TCP状态理解

**CLOSED:**表示初始状态。
**LISTEN:**该状态表示服务器端的某个SOCKET处于监听状态,可以接受连接。
**SYN_SENT:**这个状态与SYN_RCVD遥相呼应,当客户端SOCKET执行CONNECT连接时,它首先发送SYN报文,随即进入到了SYN_SENT状态,并等待服务端的发送三次握手中的第2个报文。SYN_SENT状态表示客户端已发送SYN报文。
SYN_RCVD: 该状态表示接收到SYN报文,在正常情况下,这个状态是服务器端的SOCKET在建立TCP连接时的三次握手会话过程中的一个中间状态,很短暂。此种状态时,当收到客户端的ACK报文后,会进入到ESTABLISHED状态。
**ESTABLISHED:**表示连接已经建立。
FIN_WAIT_1: FIN_WAIT_1和FIN_WAIT_2状态的真正含义都是表示等待对方的FIN报文。区别是:
FIN_WAIT_1状态是当socket在ESTABLISHED状态时,想主动关闭连接,向对方发送了FIN报文,此时该socket进入到FIN_WAIT_1状态。
FIN_WAIT_2状态是当对方回应ACK后,该socket进入到FIN_WAIT_2状态,正常情况下,对方应马上回应ACK报文,所以FIN_WAIT_1状态一般较难见到,而FIN_WAIT_2状态可用netstat看到。
**FIN_WAIT_2:主动关闭链接的一方,发出FIN收到ACK以后进入该状态。称之为半连接或半关闭状态。**该状态下的socket只能接收数据,不能发。
TIME_WAIT: 表示收到了对方的FIN报文,并发送出了ACK报文,等2MSL后即可回到CLOSED可用状态。如果FIN_WAIT_1状态下,收到对方同时带 FIN标志和ACK标志的报文时,可以直接进入到TIME_WAIT状态,而无须经过FIN_WAIT_2状态。
CLOSING: 这种状态较特殊,属于一种较罕见的状态。正常情况下,当你发送FIN报文后,按理来说是应该先收到(或同时收到)对方的 ACK报文,再收到对方的FIN报文。但是CLOSING状态表示你发送FIN报文后,并没有收到对方的ACK报文,反而却也收到了对方的FIN报文。什么情况下会出现此种情况呢?如果双方几乎在同时close一个SOCKET的话,那么就出现了双方同时发送FIN报文的情况,也即会出现CLOSING状态,表示双方都正在关闭SOCKET连接。
CLOSE_WAIT: 此种状态表示在等待关闭。当对方关闭一个SOCKET后发送FIN报文给自己,系统会回应一个ACK报文给对方,此时则进入到CLOSE_WAIT状态。接下来呢,察看是否还有数据发送给对方,如果没有可以 close这个SOCKET,发送FIN报文给对方,即关闭连接。所以在CLOSE_WAIT状态下,需要关闭连接。
LAST_ACK: 该状态是被动关闭一方在发送FIN报文后,最后等待对方的ACK报文。当收到ACK报文后,即可以进入到CLOSED可用状态。
2.端口复用
在server的TCP连接没有完全断开之前不允许重新监听是不合理的。因为,TCP连接没有完全断开指的是connfd(127.0.0.1:6666)没有完全断开,而我们重新监听的是lis-tenfd(0.0.0.0:6666),虽然是占用同一个端口,但IP地址不同,connfd对应的是与某个客户端通讯的一个具体的IP地址,而listenfd对应的是wildcard address。解决这个问题的方法是使用setsockopt()设置socket描述符的选项SO_REUSEADDR为1,表示允许创建端口号相同但IP地址不同的多个socket描述符。
在server代码的socket()和bind()调用之间插入如下代码:
int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
半关闭:
通信双方中,只有一端关闭通信。 --- FIN_WAIT_2close(cfd);shutdown(int fd, int how); how: SHUT_RD 关读端SHUT_WR 关写端SHUT_RDWR 关读写shutdown在关闭多个文件描述符应用的文件时,采用全关闭方法。close,只关闭一个。
3.多路IO转接思想
上次学习的并发服务器技术,分别包括多进程并发服务器和多线程并发服务器,解决了有多个客户端来连接服务器的问题,但是在效率方面,如果没有客户端来连接,那么服务器一直在while循环里面阻塞监听(accept),非常影响效率。那么针对这种情况,我们可以在中间建立一个“秘书”,客户端想连接服务器就先通知秘书,秘书再通知服务器,这样服务器就不用一直阻塞在accept监听里面啦。
这个“秘书”有三种方式实现:select、poll和epoll。通过这种中间“秘书”方式的构建服务器,就是多路IO转接服务器,也叫多任务IO服务器,该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
二.select多路IO转接服务器
1.函数分析
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态readfds: 监控有读数据到达文件描述符集合,传入传出参数writefds: 监控写数据到达文件描述符集合,传入传出参数exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数timeout: 定时阻塞监控时间,3种情况1.NULL,永远等下去2.设置timeval,等待固定时间3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};
知识点1:文件描述符
在Unix-like系统中,文件描述符是一个广泛的概念,它用于表示几乎所有类型的文件、管道、目录以及特殊类型的文件,如socket套接字。因此,socket套接字确实是文件描述符的一种,它被用来在进程间或不同计算机之间进行网络通信。总之一句话,文件描述符大于套接字,就相当于套接字是文件描述符的一种。

在Linux操作系统中,我们使用位图的方式描述文件描述符,而0、1、2三个文件描述符,已经被系统调用,所以一般3用来描述服务器的监听套接字,后面4、5、6……才用来与客户端建立连接。那么在进行select函数传参时,需要注意,传递与客户端通信的最大文件描述符+1,才能保证循环遍历所有的套接字。
知识点2:文件描述集合
readfds、writefds、exceptfds三个文件描述符集合,他们都是传入传出参数。在传入时,readfds里面表示的是需要监控有数据到达需要读的文件描述符;在传出时,其表示在需要监控有数据到达的文件描述符中,监测发生了数据到达事件的文件描述符。同理,在传入时,writefds里面表示的是需要监控有写数据达到文件的描述符;在传出时,其表示在需要监控有写数据到达的文件描述符中,监测发生了写数据到达事件的文件描述符。同理,在传入时,exceptfds里面表示的是需要监控是否有异常事件发生的文件描述符;在传出时,其表示在需要监控有异常事件发生的文件描述符中,检测到发生了异常事件的文件描述符。
知识点3:定时阻塞监控时间
struct timeval {long tv_sec; /* 秒:seconds */long tv_usec; /* 微妙:microseconds */};
在其作为参数传递给select函数时,其有三种不同的含义。
知识点4:文件描述符集合操作函数
void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0

知识点5:返回值
> 0:所有监听集合(3个)中,满足对应事件的总个数0:没有满足事件的文件描述符-1:报错errno
2.思路分析

案例练习:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>#define ser_port 9527int main()
{int maxfd = 0;int lfd,cfd;int ret,nread,nwrite;char buf[BUFSIZ];lfd = socket(AF_INET,SOCK_STREAM,0);if(lfd == -1){perror("socket");return -1;}maxfd = lfd;struct sockaddr_in ser_addr,clt_addr;socklen_t clt_addr_len = sizeof(clt_addr);ser_addr.sin_family = AF_INET;ser_addr.sin_port = htons(ser_port);ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);bind(lfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));listen(lfd,128);fd_set rset,allset;FD_ZERO(&allset);FD_SET(lfd,&allset);while(1){rset = allset; //每次都从新设置的监控集开始ret = select(maxfd+1,&rset,NULL,NULL,NULL);if(ret > 0){if(FD_ISSET(lfd,&rset)) //说明有新的客户端请求建立连接{cfd = accept(lfd,(struct sockaddr *)&clt_addr,&clt_addr_len);FD_SET(cfd,&allset);if(maxfd < cfd)maxfd = cfd;if(--ret == 0) //select只有一个返回,说明是lfd,不需要执行后续的操作continue;}}for(int i = lfd+1; i <= maxfd; i++){if(FD_ISSET(i,&rset)){nread = read(i,buf,sizeof(buf));if(nread == 0) //检测到客户端关闭{close(i);FD_CLR(i,&allset);}else if(nread > 0){for(int j = 0;j<nread;j++){buf[j] = toupper(buf[j]);}write(i,buf,nread);write(STDIN_FILENO,buf,nread);}}}}close(lfd);return 0;
}
3.优缺点
-
缺点:
- 监听上限受文件描述符限制, 最大
1024 - 检测满足条件的fd, 自己添加业务逻辑提高小。 提高了编码难度。
- 监听上限受文件描述符限制, 最大
-
优点:跨平台,win、linux、macOS、Unix、类Unix、mips
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <ctype.h>#include "wrap.h"#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, j, n, maxi;int nready, client[FD_SETSIZE]; /* 自定义数组client, 防止遍历1024个文件描述符 FD_SETSIZE默认为1024 */int maxfd, listenfd, connfd, sockfd;char buf[BUFSIZ], str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */struct sockaddr_in clie_addr, serv_addr;socklen_t clie_addr_len;fd_set rset, allset; /* rset 读事件文件描述符集合 allset用来暂存 */listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));bzero(&serv_addr, sizeof(serv_addr));serv_addr.sin_family= AF_INET;serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);serv_addr.sin_port= htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));Listen(listenfd, 128);maxfd = listenfd; /* 起初 listenfd 即为最大文件描述符 */maxi = -1; /* 将来用作client[]的下标, 初始值指向0个元素之前下标位置 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1; /* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */while (1) { rset = allset; /* 每次循环时都从新设置select监控信号集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL); //2 1--lfd 1--connfdif (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) { /* 说明有新的客户端链接请求 */clie_addr_len = sizeof(clie_addr);connfd = Accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); /* Accept 不会阻塞 */printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),ntohs(clie_addr.sin_port));for (i = 0; i < FD_SETSIZE; i++)if (client[i] < 0) { /* 找client[]中没有使用的位置 */client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */break;}if (i == FD_SETSIZE) { /* 达到select能监控的文件个数上限 1024 */fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset); /* 向监控文件描述符集合allset添加新的文件描述符connfd */if (connfd > maxfd)maxfd = connfd; /* select第一个参数需要 */if (i > maxi)maxi = i; /* 保证maxi存的总是client[]最后一个元素下标 */if (--nready == 0)continue;} for (i = 0; i <= maxi; i++) { /* 检测哪个clients 有数据就绪 */if ((sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ((n = Read(sockfd, buf, sizeof(buf))) == 0) { /* 当client关闭链接时,服务器端也关闭对应链接 */Close(sockfd);FD_CLR(sockfd, &allset); /* 解除select对此文件描述符的监控 */client[i] = -1;} else if (n > 0) {for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);Write(STDOUT_FILENO, buf, n);}if (--nready == 0)break; /* 跳出for, 但还在while中 */}}}Close(listenfd);return 0;
}
三.poll多路IO转接服务器
1.函数分析
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLOUT 普通或带外数据可写POLLERR 发生错误nfds 监控数组中有多少文件描述符需要被监控timeout 毫秒级等待-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏0:立即返回,不阻塞进程>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

2.思路分析

3.read函数返回值补充
read 函数返回值:
>0:实际读到的字节数=0: socket中,表示对端关闭。close()-1:- 如果
errno == EINTR被异常终端。 需要重启。 - 如果
errno == EAGIN或EWOULDBLOCK以非阻塞方式读数据,但是没有数据。 需要,再次读。 - 如果
errno == ECONNRESET说明连接被 重置。 需要close(),移除监听队列。 - 其他错误。
- 如果
4.优缺点
优点:
- 自带数组结构,可以将 监听事件集合 和 返回事件集合 分离。
- 拓展 监听上限,超出 1024限制。
缺点:
- 不能跨平台。
Linux - 无法直接定位满足监听事件的文件描述符, 编码难度较大。
5.突破1024文件描述符限制
可以使用cat命令查看一个进程可以打开的socket描述符上限:
cat /proc/sys/fs/file-max
方式一:如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制,可以在这里直接修改。如下图所示。
* soft nofile 65536 --> 设置默认值, 可以直接借助命令修改。 【注销用户,使其生效】* hard nofile 100000 --> 命令修改上限。
方式二:可以通过命令的方式修改
ulimit -n num
这里的num不能超过硬件限制
四.epoll实现多路转接服务器
1.epoll相关函数
-
创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
int epoll_create(int size); 创建一棵监听红黑树size:创建的红黑树的监听节点数量。(仅供内核参考。)返回值:指向新创建的红黑树的根节点的 fd。 失败: -1 errno -
控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd: 为epoll_creat的句柄op: 表示动作,用3个宏来表示:EPOLL_CTL_ADD (注册新的fd到epfd),EPOLL_CTL_MOD (修改已经注册的fd的监听事件),EPOLL_CTL_DEL (从epfd删除一个fd);event: 告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)EPOLLOUT: 表示对应的文件描述符可以写EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR: 表示对应的文件描述符发生错误EPOLLHUP: 表示对应的文件描述符被挂断;EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里 -
等待所监控文件描述符上有事件的产生,类似于
select()调用。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 阻塞监听。epfd:epoll_create 函数的返回值。 epfdevents:传出参数,【数组】, 满足监听条件的 哪些 fd 结构体。maxevents:数组 元素的总个数。 1024struct epoll_event evnets[1024]timeout:-1: 阻塞0: 不阻塞>0: 超时时间 (毫秒)返回值:> 0: 满足监听的 总个数。 可以用作循环上限。0: 没有fd满足监听事件-1:失败。 errno
2.思路分析

基础版:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <ctype.h>#include "wrap.h"#define MAXLINE 8192
#define SERV_PORT 8000#define OPEN_MAX 5000int main(int argc, char *argv[])
{int i, listenfd, connfd, sockfd;int n, num = 0;ssize_t nready, efd, res;char buf[MAXLINE], str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //端口复用bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));Listen(listenfd, 20);efd = epoll_create(OPEN_MAX); //创建epoll模型, efd指向红黑树根节点if (efd == -1)perr_exit("epoll_create error");struct epoll_event tep, ep[OPEN_MAX]; //tep: epoll_ctl参数 ep[] : epoll_wait参数tep.events = EPOLLIN; tep.data.fd = listenfd; //指定lfd的监听时间为"读"res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep); //将lfd及对应的结构体设置到树上,efd可找到该树if (res == -1)perr_exit("epoll_ctl error");for ( ; ; ) {/*epoll为server阻塞监听事件, ep为struct epoll_event类型数组, OPEN_MAX为数组容量, -1表永久阻塞*/nready = epoll_wait(efd, ep, OPEN_MAX, -1); if (nready == -1)perr_exit("epoll_wait error");for (i = 0; i < nready; i++) {if (!(ep[i].events & EPOLLIN)) //如果不是"读"事件, 继续循环continue;if (ep[i].data.fd == listenfd) { //判断满足事件的fd是不是lfd clilen = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen); //接受链接printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n", connfd, ++num);tep.events = EPOLLIN; tep.data.fd = connfd;res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep); //加入红黑树if (res == -1)perr_exit("epoll_ctl error");} else { //不是lfd, sockfd = ep[i].data.fd;n = Read(sockfd, buf, MAXLINE);if (n == 0) { //读到0,说明客户端关闭链接res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); //将该文件描述符从红黑树摘除if (res == -1)perr_exit("epoll_ctl error");Close(sockfd); //关闭与该客户端的链接printf("client[%d] closed connection\n", sockfd);} else if (n < 0) { //出错perror("read n < 0 error: ");res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); //摘除节点Close(sockfd);} else { //实际读到了字节数for (i = 0; i < n; i++)buf[i] = toupper(buf[i]); //转大写,写回给客户端Write(STDOUT_FILENO, buf, n);Writen(sockfd, buf, n);}}}}Close(listenfd);Close(efd);return 0;
}
3.ET模式和LT模式
EPOLL事件有两种模型:
-
Edge Triggered (ET)边缘触发只有数据到来才触发,不管缓存区中是否还有数据。 -
Level Triggered (LT)水平触发只要有数据都会触发。
案例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>#define MAXLINE 10int main(int argc, char *argv[])
{int efd, i;int pfd[2];pid_t pid;char buf[MAXLINE], ch = 'a';pipe(pfd);pid = fork();if (pid == 0) {close(pfd[0]);while (1) {//aaaa\nfor (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;//bbbb\nfor (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(pfd[1], buf, sizeof(buf));sleep(2);}close(pfd[1]);} else if (pid > 0) bian{struct epoll_event event;struct epoll_event resevent[10];int res, len;close(pfd[1]);efd = epoll_create(10);/* event.events = EPOLLIN; */event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */event.data.fd = pfd[0];epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == pfd[0]) {len = read(pfd[0], buf, MAXLINE/2); /*这里因为只读取了一半,即aaaa\n,剩下的一半bbbb\n在水平触发时依旧可以wait返回继续读取,边沿触发时则需要等待下次子进程写事件才会读取*/write(STDOUT_FILENO, buf, len);}}close(pfd[0]);close(efd);} else {perror("fork");exit(-1);}return 0;
}
如果是想在网络socket模型中使用ET或LT模式,可以在如下代码中修改:
struct epoll_event event;
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
即修改epoll_ctl函数的第四个参数即可。
在ET模式下,epoll只能使用非阻塞的模式:
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>#define MAXLINE 10
#define SERV_PORT 8000int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int efd, flag;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);///struct epoll_event event;struct epoll_event res_event[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET; /* ET 边沿触发,默认是水平触发 *///event.events = EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));flag = fcntl(connfd, F_GETFL); /* 修改connfd为非阻塞读 */flag |= O_NONBLOCK;fcntl(connfd, F_SETFL, flag);event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event); //将connfd加入监听红黑树while (1) {printf("epoll_wait begin\n");res = epoll_wait(efd, res_event, 10, -1); //最多10个, 阻塞监听printf("epoll_wait end res %d\n", res);if (res_event[0].data.fd == connfd) {while ((len = read(connfd, buf, MAXLINE/2)) >0 ) //非阻塞读, 轮询write(STDOUT_FILENO, buf, len);}}return 0;
}
重点,使用fcntl函数设置为非阻塞读模式。
4.优缺点
- 优点:高效,突破1024文件描述符。
- 缺点:不能跨平台,Linux。
5.epoll反应堆模型
epoll反应堆是libevent库的核心实现思想,libevent是写多并发服务器必须了解的库,其可以跨平台支持。之所以叫反应堆,是形容他比较快,之所以快,是因为大量的函数回调。
epoll 反应堆模型:
epoll ET模式 + 非阻塞、轮询 + void *ptr。原来: socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- write回去。反应堆:不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听
反应堆代码:
/**epoll基于非阻塞I/O事件驱动*/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>#define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);/* 描述就绪文件描述符相关信息 */struct myevent_s {int fd; //要监听的文件描述符int events; //对应的监听事件void *arg; //泛型参数void (*call_back)(int fd, int events, void *arg); //回调函数int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)char buf[BUFLEN];int len;long last_active; //记录每次加入红黑树 g_efd 的时间值
};int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd/*将结构体 myevent_s 成员变量 初始化*/void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{ev->fd = fd;ev->call_back = call_back;ev->events = 0;ev->arg = arg;ev->status = 0;memset(ev->buf, 0, sizeof(ev->buf));ev->len = 0;ev->last_active = time(NULL); //调用eventset函数的时间return;
}/* 向 epoll监听的红黑树 添加一个 文件描述符 *///eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};int op;epv.data.ptr = ev;epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUTif (ev->status == 0) { //已经在红黑树 g_efd 里op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1ev->status = 1;}if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);elseprintf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);return ;
}/* 从epoll 监听的 红黑树中删除一个 文件描述符*/void eventdel(int efd, struct myevent_s *ev)
{struct epoll_event epv = {0, {0}};if (ev->status != 1) //不在红黑树上return ;//epv.data.ptr = ev;epv.data.ptr = NULL;ev->status = 0; //修改状态epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除return ;
}/* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */void acceptconn(int lfd, int events, void *arg)
{struct sockaddr_in cin;socklen_t len = sizeof(cin);int cfd, i;if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {/* 暂时不做出错处理 */}printf("%s: accept, %s\n", __func__, strerror(errno));return ;}do {for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素if (g_events[i].status == 0) //类似于select中找值为-1的元素break; //跳出 forif (i == MAX_EVENTS) {printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);break; //跳出do while(0) 不执行后续代码}int flag = 0;if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));break;}/* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件} while(0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);return ;
}void recvdata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符, 数据存入myevent_s成员buf中eventdel(g_efd, ev); //将该节点从红黑树上摘除if (len > 0) {ev->len = len;ev->buf[len] = '\0'; //手动添加字符串结束标记printf("C[%d]:%s\n", fd, ev->buf);eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddataeventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件} else if (len == 0) {close(ev->fd);/* ev-g_events 地址相减得到偏移元素位置 */printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return;
}void senddata(int fd, int events, void *arg)
{struct myevent_s *ev = (struct myevent_s *)arg;int len;len = send(fd, ev->buf, ev->len, 0); //直接将数据 回写给客户端。未作处理eventdel(g_efd, ev); //从红黑树g_efd中移除if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdataeventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上, 设为监听读事件} else {close(ev->fd); //关闭链接printf("send[fd=%d] error %s\n", fd, strerror(errno));}return ;
}/*创建 socket, 初始化lfd */void initlistensocket(int efd, short port)
{struct sockaddr_in sin;int lfd = socket(AF_INET, SOCK_STREAM, 0);fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin))sin.sin_family = AF_INET;sin.sin_addr.s_addr = INADDR_ANY;sin.sin_port = htons(port);bind(lfd, (struct sockaddr *)&sin, sizeof(sin));listen(lfd, 20);/* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);/* void eventadd(int efd, int events, struct myevent_s *ev) */eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);return ;
}int main(int argc, char *argv[])
{unsigned short port = SERV_PORT;if (argc == 2)port = atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树,返回给全局 g_efd if (g_efd <= 0)printf("create efd in %s err %s\n", __func__, strerror(errno));initlistensocket(g_efd, port); //初始化监听socketstruct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组 printf("server running:port[%d]\n", port);int checkpos = 0, i;while (1) {/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */long now = time(NULL); //当前时间for (i = 0; i < 100; i++, checkpos++) { //一次循环检测100个。 使用checkpos控制检测对象if (checkpos == MAX_EVENTS)checkpos = 0;if (g_events[checkpos].status != 1) //不在红黑树 g_efd 上continue;long duration = now - g_events[checkpos].last_active; //客户端不活跃的世间if (duration >= 60) {close(g_events[checkpos].fd); //关闭与该客户端链接printf("[fd=%d] timeout\n", g_events[checkpos].fd);eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除}}/*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);if (nfd < 0) {printf("epoll_wait error, exit\n");break;}for (i = 0; i < nfd; i++) {/*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //读就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);//lfd EPOLLIN }if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //写就绪事件ev->call_back(ev->fd, events[i].events, ev->arg);}}}/* 退出前释放所有资源 */return 0;
}
6.保活机制
客户端和服务器通信,服务器怎么知道客户端掉线?
- 心跳包
- 乒乓包
- 设置TCP属性(探测分节)
另外推荐一个查看代码的工具:

五.线程池
1.线程池思想

struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};typedef struct {void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */} threadpool_task_t; /* 各子线程任务结构体 */
2.搭建线程池步骤
线程池模块分析:
1. main(); 创建线程池。向线程池中添加任务。 借助回调处理任务。销毁线程池。2. pthreadpool_create();创建线程池结构体 指针。初始化线程池结构体 { N 个成员变量 }创建 N 个任务线程。创建 1 个管理者线程。失败时,销毁开辟的所有空间。(释放)3. threadpool_thread()进入子线程回调函数。接收参数 void *arg --》 pool 结构体加锁 --》lock --》 整个结构体锁判断条件变量 --》 wait -------------------1704. adjust_thread()循环 10 s 执行一次。进入管理者线程回调函数接收参数 void *arg --》 pool 结构体加锁 --》lock --》 整个结构体锁获取管理线程池要用的到 变量。 task_num, live_num, busy_num根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。5. threadpool_add ()总功能:模拟产生任务。 num[20]设置回调函数, 处理任务。 sleep(1) 代表处理完成。内部实现:加锁初始化 任务队列结构体成员。 回调函数 function, arg利用环形队列机制,实现添加任务。 借助队尾指针挪移 % 实现。唤醒阻塞在 条件变量上的线程。解锁6. 从 3. 中的wait之后继续执行,处理任务。加锁获取 任务处理回调函数,及参数利用环形队列机制,实现处理任务。 借助队头指针挪移 % 实现。唤醒阻塞在 条件变量 上的 server。解锁加锁 改忙线程数++解锁执行处理任务的线程加锁 改忙线程数——解锁7. 创建 销毁线程管理者线程根据 task_num, live_num, busy_num 根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。如果满足 创建条件pthread_create(); 回调 任务线程函数。 live_num++如果满足 销毁条件wait_exit_thr_num = 10; signal 给 阻塞在条件变量上的线程 发送 假条件满足信号 跳转至 --170 wait阻塞线程会被 假信号 唤醒。判断: wait_exit_thr_num > 0 pthread_exit();
3.代码
threadpool.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL; /* 线程池 结构体 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail");break; /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0; /* 有0个产品 */pool->queue_max_size = queue_max_size; /* 最大任务队列数 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 启动 min_thr_num 个 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/pool->busy_thr_num++; /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; /* 关注 任务数 */int live_thr_num = pool->live_thr_num; /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock)); int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1; // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num; // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1; // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1); //模拟 小---大写printf("task %d is end\n",(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif相关文章:
Linux网络编程5——多路IO转接
一.TCP状态时序理解 1.TCP状态理解 **CLOSED:**表示初始状态。 **LISTEN:**该状态表示服务器端的某个SOCKET处于监听状态,可以接受连接。 **SYN_SENT:**这个状态与SYN_RCVD遥相呼应,当客户端SOCKET执行CONNECT连接时…...
Redis常见
Redis 事务 什么是 Redis 事务? 你可以将 Redis 中的事务理解为:Redis 事务提供了一种将多个命令请求打包的功能。然后,再按顺序执行打包的所有命令,并且不会被中途打断。 Redis 事务实际开发中使用的非常少,功能比…...
提升 PHP 编码效率的 10 个实用函数
PHP开发者始终追求更简洁、高效的代码。幸运的是,PHP 提供了丰富的内置函数,能显著减少手动编码,提升开发效率。无论经验深浅,掌握这些函数的使用技巧都至关重要。 以下列出了 10 个可以显著加快您的编码过程的 PHP 函数…...
设计模式 行为型 访问者模式(Visitor Pattern)与 常见技术框架应用 解析
访问者模式(Visitor Pattern)是一种行为设计模式,它允许你在不改变元素类的前提下定义作用于这些元素的新操作。这种模式将算法与对象结构分离,使得可以独立地变化那些保存在复杂对象结构中的元素的操作。 假设我们有一个复杂的对…...
golang之数据库操作
1.导入必要的包 import("database/sql"_ "github.com/go-sql-driver/mysql" //使用此作为数据库驱动 ) 2.相关操作 连接数据库 使用sql.Open()函数进行数据库的连接 db, err : sql.Open("mysql", "user:passwordtcp(127.0.0.1:3306)/db…...
对话新晋 Apache SeaTunnel Committer:张圣航的开源之路与技术洞察
近日,张圣航被推选为 Apache SeaTunnel 的 Committer成员。带着对技术的热情和社区的责任,他将如何跟随 Apache SeaTunnel 社区迈向新的高度?让我们一起来聆听他的故事。 自我介绍 请您简单介绍一下自己,包括职业背景、当前的工作…...
Mac 删除ABC 输入法
参考链接:百度安全验证 Mac下删除系统自带输入法ABC,正解!_mac删除abc输入法-CSDN博客 ABC 输入法和搜狗输入法等 英文有冲突~~ 切换后还会在英文状态,可以删除 ;可能会对DNS 输入有影响,但是可以通过复…...
《机器学习》之K-means聚类
目录 一、简介 二、K-means聚类实现步骤 1、初始化数据点、确定K值 2、通过距离分配数据点 3、更新簇中心 4、 迭代更新 三、聚类效果评价方式 1、轮廓系数的定义 2、整体轮廓系数 3、使用场景 4、优点 5、缺点 6、代码实现方法 四、K-means聚类代码实现 1、API接…...
日常工作之 Elasticsearch 常用查询语句汇总
日常工作之 Elasticsearch 常用查询语句汇总 查询现有索引创建索引查询索引结构插入数据查询索引数据查看索引磁盘占用信息删除索引查看分词器分词结果指定查询数量指定条件查询数据迁移统计索引数据量更新数据 在使用 es 的过程中,总是会用到 es 的查询语句&#x…...
WeakAuras NES Script(lua)
WeakAuras NES Script 修星脚本字符串 脚本1:NES !WA:2!TMZFWXX1zDxVAs4siiRKiBN4eV(sTRKZ5Z6opYbhQQSoPtsxr(K8ENSJtS50(J3D7wV3UBF7E6hgmKOXdjKsgAvZFaPTtte0mD60XdCmmecDMKruyykDcplAZiGPfWtSsag6myGuOuq89EVDV9wPvKeGBM7U99EFVVVV33VFFB8Z2TJ8azYMlZj7Ur3QDR(…...
JVM 触发类加载的条件有哪些?
目录 一、类加载生命周期 二、主动引用 2.1、创建类的实例 2.2、访问类的静态字段或静态方法 2.3、反射 2.4、初始化类的子类时,先初始化父类 2.5、虚拟机启动时,初始化 main 方法所在的类 2.6、动态语言支持 三、被动引用 3.1、通过子类引用父…...
Android实战经验篇-增加系统分区
系列文章转如下链接: Android Display Graphics系列文章-汇总 Android实战经验篇-系列文章汇总 本文主要包括部分: 一、Android分区说明 1.1 系统分区查看 1.2 分区表修改 1.3 验证新分区 二、源码修改 2.1 generate_extra_images 2.2 fstab 2…...
深入学习 Python 量化编程
深入学习 Python 量化编程 第一章:Python 基础与量化编程环境搭建 1.1 安装必要的库 首先,你需要安装一些在量化编程中常用的 Python 库。可以通过以下命令安装这些库: pip install numpy pandas matplotlib yfinance backtrader scikit-…...
机器学习笔记——特征工程
大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本笔记介绍机器学习中常见的特征工程方法、正则化方法和简要介绍强化学习。 文章目录 特征工程(Fzeature Engineering)1. 特征提取ÿ…...
4种革新性AI Agent工作流设计模式全解析
文章目录 导读:AI Agent的四种关键设计模式如下:1. 反思2. 工具使用3. 规划4. 多Agent协作 总结内容简介: 导读: AI Agent是指能够在特定环境中自主执行任务的人工智能系统,不仅接收任务,还自主制定和执行…...
【入门级】计算机网络学习
网络安全:前端开发者必知:Web安全威胁——XSS与CSRF攻击及其防范-CSDN博客 三次握手四次挥手:前端网络—三次握手四次挥手_前端三次握手-CSDN博客 http协议和https协议的区别:前端网络—http协议和https协议的区别-CSDN博客 网…...
安装 Jenkins 后无法访问用户名或密码且忘记这些凭证怎么办?
Jenkins 是一款功能强大的自动化服务器,在持续集成与交付(CI/CD)领域应用广泛。不过,用户在使用过程中,尤其是首次接触该系统或系统重启后,常常会遇到登录方面的问题。要是 Jenkins 突然要求输入用户名和密…...
day08_Kafka
文章目录 day08_Kafka课程笔记一、今日课程内容一、消息队列(了解)**为什么消息队列就像是“数据的快递员”?****实际意义**1、产生背景2、消息队列介绍2.1 常见的消息队列产品2.2 应用场景2.3 消息队列中两种消息模型 二、Kafka的基本介绍1、…...
安装conda 环境
conda create -n my_unet5 python3.8 (必须设置3.8版本) conda activate my_unet5...
【dockerros2】ROS2节点通信:docker容器之间/docker容器与宿主机之间
🌀 一个中大型ROS项目常需要各个人员分别完成特定的功能,而后再组合部署,而各人员完成的功能常常依赖于一定的环境,而我们很难确保这些环境之间不会相互冲突,特别是涉及深度学习环境时。这就给团队项目的部署落地带来了…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
