进程池(Process Pool)是一种并发编程的模型,用于管理和复用多个进程,以提高系统的效率和性能。它主要解决的问题是减少因频繁创建和销毁进程而带来的性能开销,特别是在需要处理大量并发任务时尤为有效。
主要组成部分和工作原理
-
进程池管理器:通常由编程语言或框架提供的管理器,负责创建、管理和调度进程池中的各个进程。
-
工作进程:池中的每个进程都是一个独立的执行单元,它们从任务队列中获取任务并执行。工作进程的数量可以根据需求配置。
-
任务队列:用于存储需要执行的任务。主程序将任务提交到任务队列中,进程池会根据任务的到来和工作进程的空闲情况来动态分配任务。
1.进程池整体结构说明
我们以一个文件下载的应用为例子来介绍进程池结构:客户端可以向服务端建立连接,随后将服务端中存储的文件通过网络传输发送到客户端,其中一个服务端可以同时处理多个客户端连接的,彼此之间互不干扰。
1.1进程池模型结构图说明
1. 客户端与服务端的交互流程
- 客户端发送请求:客户端向服务端发送请求,要求下载某个文件。
- 服务端接收请求:服务端的主进程(main)监听请求,当接收到来自客户端的请求时,获取连接并分配处理任务。
2. 服务端进程池的工作流程
-
请求分配:服务端主进程将新的连接分配给进程池中的一个工作进程(worker)。主进程保持一个进程池记录,用于管理所有工作进程。
-
工作进程处理:分配到任务的工作进程接收连接文件描述符,然后读取客户端请求的文件。
- 读取文件:工作进程根据客户端的请求,从磁盘中读取相应的文件。
- 响应客户端:读取文件后,工作进程将文件通过网络连接发送回客户端。
-
工作进程回收:当一个工作进程完成任务后,它会将自身的状态返回到进程池记录中,表示该工作进程已空闲,可以接收新的任务。
3. 父进程和子进程的关系
- 父进程(main):负责监听客户端请求,分配连接,管理进程池。它不会直接处理请求,而是将任务分配给子进程处理。
- 子进程(worker):由进程池管理的工作进程,负责实际处理任务,如读取文件、响应客户端请求等。
1.2进程池的详细工作流程
父进程的工作流程:
-
创建子进程:
- 父进程在启动时创建N个子进程,并将这些子进程挂起,等待文件传输任务。
-
监听客户端连接:
- 父进程创建一个监听套接字,绑定特定端口并开始监听来自客户端的新连接。
-
创建epoll实例:
- 父进程创建一个epoll实例,用于监控多个文件描述符的事件。主要监控监听套接字和子进程间通信的管道。
-
接受客户端连接:
- 当有客户端连接到来时,监听套接字上会触发事件,父进程使用
accept
函数接收连接,得到客户端的文件描述符(peerfd)。
- 当有客户端连接到来时,监听套接字上会触发事件,父进程使用
-
分配任务给子进程:
- 父进程检查子进程的状态表,找到一个空闲的子进程,通过进程间通信的管道,将客户端的文件描述符传递给这个子进程。
-
监控子进程状态:
- 父进程通过管道监控子进程的状态。如果管道可读,表示子进程已完成任务,父进程将该子进程标记为空闲状态。
子进程的工作流程:
-
等待任务:
- 子进程启动后,阻塞在管道的读操作上,等待父进程传递文件描述符。
-
处理任务:
- 当管道中有数据到来时,子进程从管道中读取文件描述符,开始执行文件传输任务,将文件内容发送给客户端。
-
完成任务:
- 文件传输完成后,子进程关闭客户端的文件描述符,释放资源。
-
通知父进程:
- 子进程通过管道通知父进程自己已完成任务,并进入等待状态,准备处理下一个任务。
2.进程池的实现
2.1父子进程共享文件描述符(难点)
那么父进程向子进程到底需要传递哪些信息呢?除了传递一般的控制信息和文本信息(比如上传)以外,需要特别注意的是需要传递已连接套接字的文件描述符 。
父进程会监听特定某个 IP:PORT ,如果有某个客户端连接之后,子进程需要能够连上 accept 得到的已连接套接字的文件描述符(就是父进程得到的和客户端通信的通信套接字),这样子进程才能和客户端进行通信。这种文件描述符的传递不是简单地传输一个整型数字就行了,而是需要让父子进程共享一个套接字文件对象。
但是这里会遇到麻烦,因为 accept 调用是在 fork 之后的,所以父子进程之间并不是天然地共享文件对象。倘若想要在父子进程之间共享 acccept 调用返回的已连接套接字,需要采用一些特别的手段: 一方面,父子进程之间需要使用本地套接字来通信数据。另一方面需要使用 sendmsg 和 recvmsg 函数来传递数据。

用什么方法可以实现将父进程得到通信套接字传递给子进程呢?
使用socketpair、sendmsg、和recvmsg三个函数。这三个函数的具体使用方法如下:
进程间的文件描述传递
2.2完整示例代码
//头文件process_pool.h
#ifndef __WD_FUNC_H
#define __WD_FUNC_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <error.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <dirent.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <pthread.h>
#include <sys/uio.h>#define SIZE(a) (sizeof(a)/sizeof(a[0]))typedef void (*sighandler_t)(int);#define ARGS_CHECK(argc, num) {\if(argc != num){\fprintf(stderr, "ARGS ERROR!\n");\return -1;\}}#define ERROR_CHECK(ret, num, msg) {\if(ret == num) {\perror(msg);\return -1;\}}//进程状态
typedef enum {FREE,BUSY
}status_t;typedef struct {pid_t pid; //子进程的idint pipefd; //与子进程通信的管道status_t status; //0 空闲, 1 是忙碌
}process_data;int makeChild(process_data *, int );
int doTask(int pipefd);int sendFd(int pipefd, int fd);
int recvFd(int pipefd, int * pfd);int tcpInit(const char * ip, unsigned short port);
int epollAddReadEvent(int epfd, int fd);
int epollDelReadEvent(int epfd, int fd);#endif
//main.c
#include "process_pool.h"int main(int argc, char **argv){//ip port processnum 命令行传入的三个参数ARGS_CHECK(argc, 4);int processNum = atoi(argv[3]); //将传入的第三个参数进程数量转换成int类型//申请进程池的地址process_data* pProcess = calloc(processNum, sizeof(process_data));//创建N个子进程makeChild(pProcess, processNum);//创建监听的服务器int listenfd = tcpInit(argv[1], atoi(argv[2]));//创建epoll实例int epfd = epoll_create1(0);ERROR_CHECK(epfd, -1, "epfd");//epoll添加监听套接字listenfd的可读事件,是否有客户端的连接epollAddReadEvent(epfd, listenfd);//epoll添加进程池每个子进程与与父进程之间的读管道的读事件for(int i = 0; i < processNum; ++i){epollAddReadEvent(epfd, pProcess[i].pipefd);}//定义保存就绪的文件描述符的数组struct epoll_event eventArr[10] = {0};int nready = 0;while (1){nready = epoll_wait(epfd, eventArr, sizeof(eventArr), -1);for(int i = 0; i < nready; ++i){int fd = eventArr[i].data.fd;//新客户端的连接if(fd == listenfd){struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);//接受客户端的连接,得到通信套接字int peerfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);ERROR_CHECK(peerfd, -1, "accept");printf("client %s:%d connected.\n",inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));//将通信套接字peerfd发送给一个空闲的子进程for(int j = 0; j < processNum; ++j){if(pProcess[j].status == FREE){sendFd(pProcess[j].pipefd, peerfd);pProcess[j].status = BUSY;break;}}//如果要断开与客户端的连接,这里还得执行一次close(peerfd);}else{//管道发生了事件: 子进程已经执行完任务了int howmany = 0;read(fd, &howmany, sizeof(howmany));for(int j = 0; j < processNum; ++j) {if(pProcess[j].pipefd == fd) {pProcess[j].status = FREE;printf("child %d is not busy.\n", pProcess[j].pid);break;}}}}}close(listenfd);close(epfd);return 0;
}
//sendFd.c 接受和传递文件描述符
#include <func.h>int sendFd(int pipefd, int fd)
{//构建第二组成员char buff[6] = {0};struct iovec iov;memset(&iov, 0, sizeof(iov));iov.iov_base = buff;iov.iov_len = sizeof(buff);//构建第三组成员int len = CMSG_LEN(sizeof(fd));struct cmsghdr * pcmsg = (struct cmsghdr*)calloc(1, len);pcmsg->cmsg_len = len;pcmsg->cmsg_level = SOL_SOCKET;pcmsg->cmsg_type = SCM_RIGHTS;int * p = (int*)CMSG_DATA(pcmsg);*p = fd;//构建msghdrstruct msghdr msg;memset(&msg, 0, sizeof(msg));msg.msg_iov = &iov;msg.msg_iovlen = 1;msg.msg_control = pcmsg;//传递文件描述符msg.msg_controllen = len;//sendmsg的返回值大于0时,就是iov传递的数据长度int ret = sendmsg(pipefd, &msg, 0);printf("sendmsg ret: %d\n", ret);ERROR_CHECK(ret, -1, "sendmsg");free(pcmsg);return 0;
}int recvFd(int pipefd, int * pfd)
{//构建第二组成员char buff[6] = {0};struct iovec iov;memset(&iov, 0, sizeof(iov));iov.iov_base = buff;iov.iov_len = sizeof(buff);//构建第三组成员int len = CMSG_LEN(sizeof(int));struct cmsghdr * pcmsg = (struct cmsghdr*)calloc(1, len);pcmsg->cmsg_len = len;pcmsg->cmsg_level = SOL_SOCKET;pcmsg->cmsg_type = SCM_RIGHTS;//构建一个struct msghdrstruct msghdr msg;memset(&msg, 0, sizeof(msg));msg.msg_iov = &iov;msg.msg_iovlen = 1;msg.msg_control = pcmsg;//传递文件描述符msg.msg_controllen = len;int ret = recvmsg(pipefd, &msg, 0);ERROR_CHECK(ret, -1, "recvmsg");int * p = (int*)CMSG_DATA(pcmsg);*pfd = *p;//读取文件描述符的值,并传给外界的变量return 0;
}
//server.c
#include "process_pool.h"//TCP服务端初始化
int tcpInit(const char *ip, unsigned short port){//创建服务器的监听套接字int listenfd = socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(listenfd, -1, "socket");//设置套接字的网络地址可以重用int on = 1;int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));ERROR_CHECK(ret, -1, "setsockopt");struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(serveraddr));//指定使用的是IPv4的地址类型 AF_INETserveraddr.sin_family = AF_INET;serveraddr.sin_port = htons(port);serveraddr.sin_addr.s_addr = inet_addr(ip);//以人类可阅读的方式打印网络地址printf("%s:%d\n", inet_ntoa(serveraddr.sin_addr),ntohs(serveraddr.sin_port));//绑定服务器的网络地址ret = bind(listenfd, (const struct sockaddr*)&serveraddr, sizeof(serveraddr));ERROR_CHECK(ret, -1, "bind");//监听客户端的到来ret = listen(listenfd, 1);ERROR_CHECK(ret, -1, "listen");return listenfd;}//epoll添加监听读事件
int epollAddReadEvent(int epfd, int fd){struct epoll_event ev;memset(&ev, 0, sizeof(ev));ev.events = EPOLLIN;ev.data.fd = fd;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); //上树ERROR_CHECK(ret, -1, "epoll_ctl");return 0;
}//epoll移除监听的读事件
int epollDelReadEvent(int epfd, int fd)
{struct epoll_event ev;memset(&ev, 0, sizeof(ev));ev.events = EPOLLIN;ev.data.fd = fd;int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); //下树ERROR_CHECK(ret, -1, "epoll_ctl");return 0;
}
//child.c
#include "process_pool.h"//创建num个进程
int makeChild(process_data* pProcess, int num){for(int i = 0; i < num; ++i){int fds[2];socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); //创建全双工管道,用于父子进程之间传递文件描述符pid_t pid = fork();if(pid == 0){//子进程执行close(fds[1]); //关闭写端doTask(fds[0]);exit(0);}//父进程执行close(fds[0]); //关闭读端//初始化子进程的数据pProcess[i].pid = pid;pProcess[i].pipefd = fds[1]; //写端,用与子进程通信的管道pProcess[i].status = FREE;}return 0;
}int doTask(int pipefd)
{printf("proces %d is doTask...\n", getpid());while(1){int peerfd = -1;//子进程不断地读取管道中传递过来的peerfd,通信套接字recvFd(pipefd, &peerfd);//模拟发送文件的操作send(peerfd, "hello,client",12,0);printf("child %d send finish.\n", getpid());//transferFile(peerfd);//关闭peerfdclose(peerfd);//通知父进程,任务执行完毕int one = 1;write(pipefd, &one, sizeof(one));}return 0;
}
//client.c 客户端的代码
#include <func.h>
#include <unistd.h>int main()
{//创建客户端的套接字int clientfd = socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(clientfd, -1, "socket");struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(serveraddr));//指定使用的是IPv4的地址类型 AF_INETserveraddr.sin_family = AF_INET;serveraddr.sin_port = htons(8080);serveraddr.sin_addr.s_addr = inet_addr("127.0.0.1");//连接服务器int ret = connect(clientfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));ERROR_CHECK(ret, -1, "connect");printf("connect success.\n");//进行数据的接收和发送fd_set rdset;FD_ZERO(&rdset);char buff[100] = {0};//事件循环while(1) {FD_SET(STDIN_FILENO, &rdset);FD_SET(clientfd, &rdset);select(clientfd + 1, &rdset, NULL, NULL, NULL);//当select函数返回时,rdset会被修改的if(FD_ISSET(STDIN_FILENO, &rdset)) {//读取从键盘输入的字符串memset(buff, 0, sizeof(buff));//通过read函数会把'\n'也读进来ret = read(STDIN_FILENO, buff, sizeof(buff));if(strcmp(buff, "bye\n") == 0) {break;}//在发送时,不需要发送'\n'send(clientfd, buff, ret - 1, 0);}if(FD_ISSET(clientfd, &rdset)) {//从服务器接收数据memset(buff, 0, sizeof(buff));ret = recv(clientfd, buff, sizeof(buff), 0);if(ret == 0) {printf("byebye.\n");break;}printf("ret: %d, recv: %s\n", ret, buff);}}close(clientfd);return 0;
}
(另外需要在Linux中的 /usr/include(主要存放系统的一些头文件) 下加一个头文件,不然你的电脑中无法识别上述代码中的func.h头文件)
//func.h
#ifndef __WD_FUNC_H
#define __WD_FUNC_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <error.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <dirent.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <pthread.h>
#include <unistd.h>
#include <pthread.h>
#include <netdb.h>
#define SIZE(a) (sizeof(a)/sizeof(a[0]))
#define ERROR_CHECK(retval, errnumber, message){\if((int)retval == (int)errnumber){ \error(1,errno,(char *)message); \} \
}
#define ARGC_CHECK(argc, needed){ \if((int)argc != (int)needed){ \error(1,0,"the arguments should be %d \n",needed);\} \
}typedef void (*sighandler_t)(int);#endif