【Linux】Reactor模式

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉Reactor 模式👈
      • 什么是 Reactor 模式
      • Reactor 模式的组件
      • Reactor 模式的工作流程
    • 👉使用 Reactor 模式设计 TcpServer👈
      • socket 的封装
      • 日志模块
      • Epoll 模型的封装
      • TcpServer 的设计
      • 协议定制
      • 服务端的编写
      • 客户端的编写
      • 综合演示
    • 👉总结👈

👉Reactor 模式👈

什么是 Reactor 模式

Reactor 模式是一种事件驱动的设计模式,是将就绪的事件交给特定的事件处理器,用于实现高效的事件驱动程序。它主要用于网络编程中,用于处理大量的并发连接。

Reactor 模式的组件

Reactor 模式的核心思想是将 I / O 操作(如网络请求、文件操作等)抽象为事件,然后通过一个事件循环(Event Loop)来监视和分发这些事件。它的基本组成部分包括:

  • 事件(Event):在网络服务器中,事件通常是指 socket 上的读事件、写事件和异常事件等。事件通常都需要和 socket 与事件处理器绑定在一起。事件可以在 I / O 通道状态发生变化时被触发,然后被事件分发器通知给事件循环。事件循环通过事件的类型和相关信息来确定哪个事件处理器应该处理这个事件。

  • 事件处理器(Handler):每一种 I / O 事件都对应一个处理器,负责处理特定类型的事件。例如,网络连接事件、数据读取事件、数据写入事件等都可以有对应的事件处理器。

  • 事件循环(Event Loop):是 Reactor 模式的核心部分,它不断地检查是否有新的事件发生,如果有就将事件分发给相应的事件处理器进行处理。事件循环通常以无限循环的形式运行,直到系统关闭。

  • 事件分发器(Demultiplexer):用于监视多个 I / O 通道的状态,以确定哪些通道已经就绪(可读或可写)。这个组件可以使用操作系统提供的机制,如 select、poll、epoll 等。高性能的网络服务器使用的都是 epoll 接口。

Reactor 模式的工作流程

Reactor模式的工作流程如下:

  • 应用程序初始化:创建事件处理器和事件循环,将事件处理器注册到事件循环中。

  • 事件循环开始:事件循环开始无限循环,在每次循环中,它会通过事件分发器检查所有的I/O通道,看是否有事件就绪。

  • 事件分发:当某个I/O通道就绪时,事件分发器将通知事件循环,事件循环根据就绪的通道找到对应的事件处理器,并将事件传递给它进行处理。

  • 事件处理:事件处理器根据收到的事件类型执行相应的操作,这可能涉及读取数据、写入数据、连接管理等。

  • 事件处理完成:事件处理器执行完毕后,将结果返回给事件循环。

通过这种方式,Reactor 模式可以实现高并发的 I / O 操作,因为它使用事件循环在单线程中处理多个 I / O 事件,避免了创建多个线程或进程,从而减少了资源开销和上下文切换的成本。

但是 Reactor 模式适用于 I / O 密集型的应用,但不适用于 CPU 密集型的场景,因为在事件处理过程中如果发生阻塞操作,会影响其他事件的处理。为了解决这个问题,可以将阻塞的操作委托给线程池等机制来处理。

👉使用 Reactor 模式设计 TcpServer👈

socket 的封装

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include <fcntl.h>class Sock
{
private:// listen的第二个参数是底层全链接队列的长度,其数值为listen的第二个参数+1const static int gbackLog = 10;
public:Sock(){}// 常见套接字static int Socket(){int sock = socket(AF_INET, SOCK_STREAM, 0);if(listen < 0) exit(2);int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return sock;}// 绑定 IP 地址和端口号static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0"){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &local.sin_addr);if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0) exit(3);}static void Listen(int sock){if(listen(sock, gbackLog) < 0)exit(4);}// 一般经验// const std::string &: 输入型参数// std::string *: 输出型参数// std::string &: 输入输出型参数// 获取新连接static int Accept(int listenSock, std::string* ip, uint16_t* port, int* acceptErrno){struct sockaddr_in src;socklen_t len = sizeof(src);*acceptErrno = 0;int serviceSock = accept(listenSock, (struct sockaddr*)&src, &len);if(serviceSock < 0) {*acceptErrno = errno;return -1;}if(port) *port = ntohs(src.sin_port);if(ip) *ip = inet_ntoa(src.sin_addr);return serviceSock;}// 连接服务器static bool Connect(int sock, const std::string& serverIp, const uint16_t& serverPort){struct sockaddr_in server;memset(&server, 0, sizeof server);server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = inet_addr(serverIp.c_str());if(connect(sock, (struct sockaddr*)&server, sizeof server) == 0) return true;else return false;}// 将 sock 设置为非阻塞,与 ET 模式配合使用static bool SetNonBlock(int sock){int fl = fcntl(sock, F_GETFL);if(fl < 0)  return false;fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;}~Sock(){}
};

Sock 类主要是封装了 socket 的相关接口,主要是创建套接字、绑定 IP 地址和端口号、将套接字设置为监听套接字、连接服务器、服务器获取新连接以及将套接字设置为非阻塞等等。

日志模块

#pragma once#include <cstdio>
#include <cstdarg>
#include <string>
#include <iostream>
#include <poll.h>
#include <ctime>// 日志等级
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4#define LOGFILE "./ThreadPool.log"const char* levelMap[] = 
{"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};void logMessage(int level, const char* format, ...)
{char stdBuffer[1024];   // 标准部分time_t timestamp = time(nullptr);// struct tm *localtime = localtime(&timestamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp);char logBuffer[1024];   // 自定义部分va_list args;   // va_list就是char*的别名va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置// vprintf(format, args); // 以format形式向显示器上打印参数列表vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);   // va_end将args弄成nullptr// FILE* fp = fopen(LOGFILE, "a");printf("%s%s\n", stdBuffer, logBuffer);// fprintf(fp, "%s%s\n", stdBuffer, logBuffer);    // 向文件中写入日志信息// fclose(fp);
}

日志模块在之前的博客中已经多次提及到,就不再赘述了。

Epoll 模型的封装

#pragma once#include <iostream>
#include <sys/epoll.h>class Epoll
{
private:const static int defaultSize = 128;const static int defaultTimeOut = 3000;public:Epoll(int timeout = defaultTimeOut): _timeout(timeout){}~Epoll(){if(_epfd >= 0) close(_epfd);}// 创建 epoll 模型void CreateEpoll(){_epfd = epoll_create(defaultSize);if(_epfd < 0) exit(5);}// 将 socket 添加到 epoll 模型中bool AddSockToEpoll(int sock, uint32_t events){events |= EPOLLET; // ET 模式struct epoll_event ev;ev.data.fd = sock;ev.events = events;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);return n == 0;}bool DelSockFromEpoll(int sock){int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);return n == 0;}int WaitEpoll(struct epoll_event* readyEvents, int maxEvents){return epoll_wait(_epfd, readyEvents, maxEvents, _timeout);}bool CtlEpoll(int sock, uint32_t events){events |= EPOLLET;struct epoll_event ev;ev.data.fd = sock;ev.events = events;int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev);return n == 0;}private:int _epfd;int _timeout;
};

Epoll 类主要是封装了 epoll 的相关接口,方便 TcpServer 服务器进行调用。

  • CreateEpoll:TcpServer 调用该接口,即可创建一个 epoll 模型。
  • AddSockToEpoll:将 TcpServer 需要关心该 sock 的 events 事件添加到 epoll 模型中,让 epoll 来关心这些事件,事件发生后通知 TcpServer 进行处理。
  • DelSockFromEpoll:让 epoll 模型不再关心 sock 上发生的事件。
  • WaitEpoll:让 epoll 模型等待事件的发生,事件发生后通知 TcpServer 进行处理。
  • CtlEpoll:修改 epoll 模型对 sock 上发生的事件的关心。比如,之前只让 epoll 模型关心 sock 的读事件,现在让 epoll 模型也关心 sock 的写事件。

TcpServer 的设计

TcpServer 需要处理的事件

  • 读事件:如果监听套接字的读事件就绪,则调用封装好 Accepter 函数获取底层建立好的连接,并进行相应的处理。而如果是普通套接字的读事件就绪,则调用封装好的 Recver 函数读取客户单发来的数据,并进行业务处理。
  • 写事件:当写事件就绪时,就调用封装好的 Sender 函数写入到底层 TCP 的发送缓冲区中。
  • 异常事件:当某个文件描述符的异常事件发生了,我们直接将该文件描述符关闭掉,不进行过多的处理。

Connection 类的封装

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <string>
#include <unordered_map>
#include <cassert>
#include <cerrno>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"// 前置声明
class TcpServer; 
class Connection;using func_t = std::function<void(Connection*)>;
using callback_t = std::function<void(Connection*, std::string&)>;// 使用 Connection 来表示一个连接
// 该连接有自己的接收缓冲区和发送缓冲区
class Connection
{
public:Connection(int sock = -1): _sock(sock), _ptr(nullptr){}// 设置该连接的回调函数void SetCallBack(func_t readCallBack, func_t writeCallBack, func_t exceptCallBack){_readCallBack = readCallBack;_writeCallBack = writeCallBack;_exceptCallBack = exceptCallBack;}~Connection(){if(_sock >= 0) close(_sock);}public:// 负责进行 IO 的文件描述符int _sock; // 三个回调方法,表示对 _sock 的读事件、写事件和异常事件的处理方法func_t _readCallBack;func_t _writeCallBack;func_t _exceptCallBack;// 每个连接独占的接收缓冲区和发送缓冲区std::string _inBuffer; // 暂时没有办法处理二进制流,文本数据是可以处理的std::string _outBuffer;// TcpServer 的回指指针,用于业务处理TcpServer* _ptr;
};

Connection 类中除了包含文件描述符和其对应的读时间回调、写事件回调和异常事件回调之外,还包含一个输入缓冲区 _inBuffer、一个输出缓冲区 _outBuffer 以及一个回指指针 _ptr。

  • 当某个文件文件描述符的读事件就绪时,我们会调用 recv 函数读取客户端发过来的数据,但是这并不能保证我们读取到了一个完整的报文,因此我们需要将读取到的数据暂时存放在该文件描述符对应的接收缓冲区 _inBuffer 中。当 _inBuffer 中的数据能够分离出一个完整的报文,我们再进行业务处理,所以 _inBuffer 的本质就是用来解决粘包问题的。
  • 当处理完一个网络请求后,需要将响应的数据发送给客户端,但是我们并不能保证底层 TCP 的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的响应数据暂时保存在该文件描述符对应的发送缓冲区 _outBuffer 中。等待底层 TCP 发送缓冲区中有空间时,再将 _outBuffer 中的数据发送出去。
  • Connection 类中还包含了指向 TcpServer 的回指指针,便于我们能够快速找到 TcpServer,这种操作在开源项目中非常常见,有利于我们进行软件分层。业务处理完,需要将响应发给客户端,此时我们就可以开启对文件描述符写事件的关心,然后就能够触发一次写事件就绪将响应发送出去。

TcpServer 的构造和析构

class TcpServer
{
private:const static int defaultPort = 8080;const static int defaultMaxEvents = 128;public:TcpServer(int port = defaultPort, int maxEvents = defaultMaxEvents): _port(port), _maxEvents(maxEvents){// 1. 创建监听套接字_listenSock = Sock::Socket();Sock::Bind(_listenSock, _port);Sock::Listen(_listenSock);// 2. 创建 epoll 模型_epoll.CreateEpoll();// 3. 将 _listenSock 封装成 Connection 添加到 TcpServer 中// _listenSock 只需要关心读事件即可AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);// 4. 申请保存就绪事件的数组_readyEvents = new struct epoll_event[_maxEvents];logMessage(NORMAL, "Init TcpServer Success");}~TcpServer(){if (_listenSock >= 0)close(_listenSock);if (_readyEvents)delete[] _readyEvents;}private:int _listenSock; // 监听套接字int _port; // 端口号Epoll _epoll;std::unordered_map<int, Connection *> _connections; // 用来管理连接的哈希表struct epoll_event *_readyEvents; // 保存就绪事件的数组,可以用 vector 代替int _maxEvents; // _readyEvents 的最大容量// 业务处理的回调函数callback_t _callBack;
};

在 TcpServer 类中,除了必要的服务器的端口号、监听套接字之外,还有封装好的 _epoll、用来管理连接的哈希表以及业务处理的回调函数等等。

  • 在构造函数中,主要做的事件就是创建监听套接字、创建 epoll 模型、调用 AddConnetion 函数设置监听套接字的读事件的回调函数并将监听套接字封装成 Connection 保存到哈希表 _connections 中进行管理、申请空间用来保存就绪的事件。
  • 在析构函数中,需要调用 close 函数来关闭监听套接字,还需要释放用来保存就绪事件的空间。
  • 设置业务处理的回调函数的目的是让网络层的代码和业务层的代码解耦,避免强耦合。这样做的好处就是修改业务层的代码,并不会影响到网络层的代码。

AddConnetion 函数的介绍

class TcpServer
{// ...
private:// 将连接添加到 TcpServer 中void AddConnection(int sock, func_t readCallBack, func_t writeCallBack, func_t exceptCallBack){/* ************************************************************* TcpServer 上将会存在大量的连接,也就是会存在大量的 Connection* 那么 TcpServer 就需要采用先描述再组织的方式来将大量的 Connection* 管理起来,在这里我是要哈希表的方式来管理* *************************************************************/// 1. 将文件描述符设置为非阻塞(ET模式)Sock::SetNonBlock(sock);// 2. 构建 Connection 对象Connection *conn = new Connection(sock);conn->SetCallBack(readCallBack, writeCallBack, exceptCallBack);conn->_ptr = this;// 3. 将 sock 添加到 epoll 中// 任何的多路转接服务器一般默认只会打开对// 读事件的关心,对写事件的关心按需打开_epoll.AddSockToEpoll(sock, EPOLLIN | EPOLLET);// 4. 保存文件描述符与 Connection 的映射关系_connections[sock] = conn;}// ...
};
  • 需要给 AddConnetion 函数传入四个参数,分别是套接字 sock,读事件回调 readCallBack、写事件回调 writeCallBack 以及异常事件回调 exceptCallBack。
  • 由于 TcpServer 采用的是 ET 模式的 epoll 模型,所以需要将套接字 sock 设置为非阻塞。
  • 申请 Connection 对象保存套接字 sock,设置 Connection 的相关回调,然后设置指向 TcpServer 的回指指针。
  • 将 sock 添加到 epoll 模型中,让 epoll 来帮我们关心 sock 上发送的事件。需要注意的是,我们只需要让 epoll 关心 sock 的读事件即可,写事件按照需要来进行开启。
  • 最后,需要将 sock 和 Connection 的映射关系保存到哈希表中。

事件分发 Dispather

class TcpServer
{// ...
public:// 根据就绪的事件进行特定事件的派发void Dispacther(callback_t callBack){_callBack = callBack;while (true){LoopOnce();}}private:void LoopOnce(){int n = _epoll.WaitEpoll(_readyEvents, _maxEvents);for (int i = 0; i < n; ++i){int sock = _readyEvents[i].data.fd;        // 就绪的文件描述符uint32_t revents = _readyEvents[i].events; // 就绪的事件// 将所有的异常都交给Recver或Sender来处理if (revents & EPOLLERR) // 文件描述符发生了错误revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLHUP) // 对端关闭了连接revents |= (EPOLLIN | EPOLLOUT);// 如果连接存在且设置了相应的回调方法,才可以调用回调方法if (revents & EPOLLIN){if (ConnectionIsExists(sock) && _connections[sock]->_readCallBack != nullptr)_connections[sock]->_readCallBack(_connections[sock]);}if (revents & EPOLLOUT){if (ConnectionIsExists(sock) && _connections[sock]->_writeCallBack != nullptr)_connections[sock]->_writeCallBack(_connections[sock]);}}}// 检查连接是否存在bool ConnectionIsExists(int sock){return _connections.count(sock) == 1;}// ...
};
  • Dispacther 函数需要调用者传入一个回调函数,这个回调函数是用于处理相关业务的,只要传入的回调函数不同,就能够处理不同的业务了。保存好用于处理业务的回调函数后,就需要开启事件循环了。事件循环通常以死循环的形式运行,也就是一直调用 LoopOnce 函数。
  • 在 LoopOnce 函数中,主要是调用 WaitEpoll 函数等待事件的发生。当有事件发生后,如果 sock 在哈希表中并设置了相应的回调函数,则调用相应的回调函数,也就是事件处理。

Accepter 函数的介绍

class TcpServer
{// ...   
private:// _listenSock 用来接收新连接的回调方法void Accepter(Connection *conn){// 来到这里说明有新的连接到来,因为不知道底层有多少个连// 接,所以需要通过while循环一直获取连接。如果不是这样// 的话,可能会导致有些客户端长时间连接不上服务器的问题while (true){std::string clientIp;uint16_t clientPort;int acceptErrno = 0;int sock = Sock::Accept(_listenSock, &clientIp, &clientPort, &acceptErrno);if (sock < 0){if (acceptErrno == EAGAIN || acceptErrno == EWOULDBLOCK)break; // 底层的连接已经全部获取完了,可以退出循环了else if (acceptErrno == EINTR)continue; // 被信号中断,可以继续获取连接else{// accept失败logMessage(WARNING, "Accepter Error:%d %s", acceptErrno, strerror(acceptErrno));break;}}// 将 sock 添加到 TcpServer 中if (sock >= 0){AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1));logMessage(DEBUG, "Accept Client[%s:%d:%d] Success", clientIp.c_str(), clientPort, sock);}}}// ...
};
  • 在 Accepter 函数中,主要就是将底层建立好的连接全部获取上来,然后调用 AddConnection 设置 sock 的事件处理器并将连接保存在哈希表中管理起来。
  • 当 Accept 函数的返回值为 -1 时,需要检查错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的连接全部获取完了,可以退出循环了。当错误码为 EINTR 时,则表明获取连接时收到了信号,继续循环即可。当错误码为其他时,则表明真的遇到了错误,那么就打印错误信息并退出循环。

Recver 函数的介绍

class TcpServer
{// ...   
private:// 普通套接字读事件就绪的回调方法void Recver(Connection *conn){const int num = 1024;bool err = false; // 表示对端是否关闭连接while (true){char buffer[num];ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{logMessage(ERROR, "Recver Error:%d %s", errno, strerror(errno));conn->_exceptCallBack(conn);err = true;break;}}else if (n == 0){logMessage(DEBUG, "Client[%d] Quit, Me Too", conn->_sock);conn->_exceptCallBack(conn);err = true;break;}else{// 读取底层数据成功buffer[n] = 0;conn->_inBuffer += buffer; // 将读取到的事件添加到接受缓存中}}if (!err)logMessage(DEBUG, "Client[%d] Send Message:%s", conn->_sock, conn->_inBuffer.c_str());if (!err){std::vector<std::string> messages;// 对 _inBuffer 进行切分,并将切分结果放入到  messages 中SpliteMessage(conn->_inBuffer, messages);// 将分割出来的独立报文交给具体业务进行处理// 在这里可以将message封装成为task,然后push// 到任务队列,任务处理交给后端线程池for (auto &msg : messages)_callBack(conn, msg); // msg 就是一个完整的报文}}// ...
};
  • Recver 函数是普通套接字读事件就绪的回调方法。
  • 在 Recver 函数中,主要是调用 recv 函数将底层收到的数据获取上来,然后根据 recv 函数的返回值 n 来进行具体的操作。
  • 当 n 大于 0 时,则说明获取底层的数据成功。但这并不意味着底层的数据全部获取完了,所以还需要进行读取,不能退出循环。当 n 等于 0 时,则说明对端关闭连接,需要调用异常事件处理函数 Excepter 进行相应的异常处理,然后直接 return 返回。当 n 小于 0 时,则需要关心错误码 errno 了。当 errno 为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的数据全部获取完了,可以退出循环了。当 errno 为 EINTR 时,则表明获取数据时收到了信号,继续循环即可。当 errno 为其他时,则表明真的遇到了错误,那么就打印错误信息,调用 Excepter 函数,然后直接 return 返回。
  • 当退出 while 循环时,则需要对获取到的数据进行解析,然后再进行相关的业务处理。
    -注:对端关闭连接,也会让文件描述符的读事件就绪。

Sender 函数的介绍

class TcpServer
{// ...   
public:void EnableReadWrite(Connection *conn, bool readAble, bool writeAble){uint32_t events = ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0));bool ret = _epoll.CtlEpoll(conn->_sock, events);assert(ret);}private:void Sender(Connection *conn){while (true){ssize_t n = send(conn->_sock, conn->_outBuffer.c_str(), conn->_outBuffer.size(), 0);if (n > 0){conn->_outBuffer.erase(0, n);if (conn->_outBuffer.empty())break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 底层TCP的发送缓冲区没有空间了break;else if (errno == EINTR)continue;else{logMessage(ERROR, "Sender Error:%d %s", errno, strerror(errno));conn->_exceptCallBack(conn);return;}}}// 如果数据发送完了,则需要取消对conn->_sock写事件的关心// 如果数据没有发送完,则不需要取消对conn->_sock写事件的关心if (conn->_outBuffer.empty())EnableReadWrite(conn, true, false);elseEnableReadWrite(conn, true, true);}// ...
};
  • Sender 函数是将响应发回给客户端,但是发送可能会出现各种情况,因此我们要对 send 的返回值 n 进行判断。
  • 当 n 大于 0 时,则说明数据已经成功拷贝到底层 TCP 的发送缓冲区中了。当 n 小于等于 0 时,则需要判断错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则说明底层 TCP 的发送缓冲区已经满了,退出循环,下次再调用 send。当错误码为 EINTR 时,则说明收到了信号,继续循环调用 send。当错误等于其他时,则说明遇到了错误,调用 Excepter 函数,然后直接 return 返回。
  • 退出 while 循环后,如果 Connection 发送缓冲区中的数据都拷贝到底层 TCP 的发送缓冲区了,则关闭对该文件描述符写事件的关心。否则,保持对该文件描述符写事件的关心。

Excepter 函数的介绍

class TcpServer
{// ...   
private:void Excepter(Connection *conn){if (!ConnectionIsExists(conn->_sock))return;// 1. 从epoll中移除conn->_sockint n = _epoll.DelSockFromEpoll(conn->_sock);assert(n);(void)n;// 2. 从_connections中移除conn->_sock_connections.erase(conn->_sock);// 3. 释放conn所占用的资源(Connect的析构函数会关闭文件描述符)delete conn;}// ...
};
  • Excepter 函数的主要工作就是移除 epoll 模型对文件描述符的关心、从哈希表中删除文件描述符以及释放 Connection。

协议定制

#pragma once#include <iostream>
#include <vector>
#include <string>
#include <cstring>// 使用特殊字符作为分隔符来解决粘包问题,处理独立报文
#define SEP "X"
#define SEP_LEN strlen(SEP)
// 用于序列化和反序列化
#define SPACE " "
#define SPACE_LEN strlen(SPACE)// buffer: 输入输出型参数
// out: 输出型参数
// buffer中可能会存在多个报文,没有报文是以SEP作为分隔符的
// 我们想要将不同的报文切分好,放入到out中
void SpliteMessage(std::string& buffer, std::vector<std::string>& out)
{while(true){auto pos = buffer.find(SEP);if(pos == std::string::npos) break;std::string message = buffer.substr(0, pos);buffer.erase(0, pos + SEP_LEN);out.emplace_back(message);}    
}// 给报文加上分隔符
std::string Encode(std::string& s)
{return s + SEP;
}// 去除报文中的分割符
std::string Decode(std::string& s)
{auto pos = s.find(SEP);// s中没有一个完整的报文时,返回空串// s中有一个完整的报文时,返回该报文if(pos == std::string::npos) return "";std::string ret = s.substr(0, pos);s.erase(0, pos + SEP_LEN);return ret;
}// 网络版计算器的 Request 和 Response
class Request
{
public:// 序列化std::string Serialize(){std::string str;str = std::to_string(_x);str += SPACE;str += _op;str += SPACE;str += std::to_string(_y);return str;}// 反序列化bool Deserialized(const std::string &str) {std::size_t left = str.find(SPACE);if (left == std::string::npos)return false;std::size_t right = str.rfind(SPACE);if (right == std::string::npos)return false;_x = atoi(str.substr(0, left).c_str());_y = atoi(str.substr(right + SPACE_LEN).c_str());if (left + SPACE_LEN > str.size())return false;else_op = str[left + SPACE_LEN];return true;}public:Request(){}Request(int x, int y, char op) : _x(x), _y(y), _op(op){}~Request() {}public:int _x;   int _y;   char _op; 
};class Response
{
public:// 序列化std::string Serialize(){std::string s;s = std::to_string(_code);s += SPACE;s += std::to_string(_ret);return s;}// 反序列化bool Deserialized(const std::string &s){std::size_t pos = s.find(SPACE);if (pos == std::string::npos)return false;_code = atoi(s.substr(0, pos).c_str());_ret = atoi(s.substr(pos + SPACE_LEN).c_str());return true;}public:Response(){}Response(int ret, int code) : _ret(ret), _code(code){}~Response() {}public:int _ret; // 计算结果int _code;   // 计算结果的状态码
};

我们的业务还是一个网络版的计算器,我们需要定制协议来解决粘包问题。

  • SpliteMessage 函数的作用就是以 SEP 为分割符,将 Connection 的接受缓冲区中的数据分割成一个个的报文,然后交给业务层进行处理。
  • Encode 函数的作用是给报文加上分割符,以明确报文和报文之间的边界,以解决粘包问题。Decode 函数的作用是去除报文中的分割符并返回一个完整的报文,如果没有一个完整的报文,则返回空串。

服务端的编写

#include "TcpServer.hpp"
#include <memory>
#include <unordered_map>
#include <fstream>static Response Calculator(const Request &req)
{Response resp(0, 0);switch (req._op){case '+':resp._ret = req._x + req._y;break;case '-':resp._ret = req._x - req._y;break;case '*':resp._ret = req._x * req._y;break;case '/':if (req._y == 0)resp._code = 1; // 除零错误elseresp._ret = req._x / req._y;break;case '%':if (req._y == 0)resp._code = 2; // 模零错误elseresp._ret = req._x % req._y;break;default:resp._code = 3;break;}return resp;
}void NetCal(Connection* conn, std::string& request)
{logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());// 1. 反序列化Request req;if(!req.Deserialized(request)) {// 按道理来说,来到这里收到的一定是一个完整的报文// 但现在却反序列化失败,输出一下错误日志logMessage(ERROR, "Request Deserialized Error");return; }// 2. 业务处理Response resp = Calculator(req);// 3. 序列化,构建应答std::string sendStr = resp.Serialize();sendStr = Encode(sendStr);// 4. 业务层不需要关心数据如何发送给客户端,只需要将序// 列化后的应答交给TcpServer,让它将应答发送给客户端conn->_outBuffer += sendStr;// 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪// 如果后续保持对EPOLL的关心,TcpServer会一直进行发送conn->_ptr->EnableReadWrite(conn, true, true);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer());svr->Dispather(NetCal);return 0;
}

创建一个 TcpServer,然后调用它的 Dispatcher 函数。调用 Dispatcher 函数时,需要传入业务处理的回调函数。传入的回调函数不同,就可以处理不同的业务了。

客户端的编写

#include <iostream>
#include <string>
#include <ctime>
#include "Sock.hpp"
#include "Protocol.hpp"// 客户端调用
void Send(int sock, const std::string& sendStr)
{int n = send(sock, sendStr.c_str(), sendStr.size(), 0);if(n != sendStr.size())std::cerr << "Send Error" << std::endl; 
}// 客户端调用
bool Recv(int sock, std::string& out)
{char buffer[1024];while(true){ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;out += buffer;}else if(n == 0){return false;}else{if(errno == EAGAIN || errno == EWOULDBLOCK){if(out.empty()) continue; // 如果没有收到响应,则继续对应 recv 函数return true;}else if(errno == EINTR){continue;}else{std::cerr << "Recv Error" << std::endl;return false;}}}
}static void Usage(char* proc)
{std::cout << "\nUsage: " << proc << "serverIp serverPort" << std::endl;
}// 网络版计算器客户端
int main(int argc, char* argv[])
{srand((unsigned)time(nullptr));if(argc != 3){Usage(argv[0]);exit(1);}int sock = Sock::Socket();if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2]))){std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl; }const char* op = "+-*/%#";Sock::SetNonBlock(sock);std::string buffer;while(true){Request req;// req._x = rand() % 100;// req._y = rand() % 100;// req._op = op[rand() % 6];std::cout << "Please Enter x: ";std::cin >> req._x;std::cout << "Please Enter op: ";std::cin >> req._op;std::cout << "Please Enter y: ";std::cin >> req._y;// 序列化std::string sendStr = req.Serialize();sendStr = Encode(sendStr);// 发送请求Send(sock, sendStr);// 非阻塞读取全部数据bool ret = Recv(sock, buffer);if(!ret) break; // 服务器关闭连接或者Recv出错了std::string package = Decode(buffer);if(package.empty()) continue;// 接收到了一个完整的报文Response resp;resp.Deserialized(package);std::string err;switch(resp._code){case 1:err = "Division By Zero Error";break;case 2:err = "Modular Division By Zero Error";break;case 3:err = "Operation Error";break;default:std::cout << "Calculation Result: " << resp._ret << std::endl;break;}if(!err.empty()) std::cout << err << std::endl;}close(sock);return 0;
}

综合演示

在这里插入图片描述

将业务替换成在线词典

服务端

#define FILENAME "OnlineDict.txt"// 加载单词
void LoadWord(std::unordered_map<std::string, std::string>& m)
{std::ifstream ifs(FILENAME);std::string key, value;while(ifs >> key >> value){m[key] = value;}
}void OnlineDict(Connection* conn, std::string& request)
{static std::unordered_map<std::string, std::string> m;// 加载单词且只加载一次if(m.empty()) LoadWord(m);// 观察单词是否加载成功// for(auto& kv : m)// {//     std::cout << kv.first << ":" << kv.second << std::endl;// }logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());// request就是要查找的单词auto it = m.find(request);std::string sendStr;if(it == m.end())sendStr = request +  " Is Not Found";elsesendStr = request + " Means " + m[request];sendStr = Encode(sendStr); // 添加分隔符conn->_outBuffer += sendStr;// 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪// 如果后续保持堆EPOLL的关心,TcpServer会一直进行发送conn->_ptr->EnableReadWrite(conn, true, true);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer());svr->Dispacther(OnlineDict);return 0;
}

客户端

int main(int argc, char* argv[])
{srand((unsigned)time(nullptr));if(argc != 3){Usage(argv[0]);exit(1);}int sock = Sock::Socket();if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2]))){std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl; }Sock::SetNonBlock(sock);std::string word;std::string buffer;while(true){std::cout << "Please Enter The Word: ";std::cin >> word;word = Encode(word);Send(sock, word);// 非阻塞读取全部数据bool ret = Recv(sock, buffer);if(!ret) break; // 服务器关闭连接或者Recv出错了std::string package = Decode(buffer);std::cout << package << std::endl;}return 0;
}

在这里插入图片描述

在这里插入图片描述

👉总结👈

本篇博客主要讲解了什么是 Reactor 模式、Reactor 模式的组件、Reactor 模式的工作流程以及基于 Reactor 模式的 TCP 服务器等等。以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家啦!💖💝❣️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1383376.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

如何练习打字/盲打(作者的感想与建议)

18年12月底的时候还不会盲打&#xff0c;在阅读了一些关于练习打字的文章之后&#xff0c;开始了自己的练字路途 打字会‘上瘾’&#xff01;在练习过程中一定要注意劳逸结合 一些基础的准备 typingclub https://www.typingclub.com/ typingclub是在线练习的网站 里面的图形…

使用metalink下载文件——以Sentinel-1数据为例

1、下载aria2&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1cqTBtKgmi3CGHkPl9pHfNw?pwd01g5 提取码&#xff1a;01g5 2、软件进行解压&#xff0c;进入ASF官网选取要下载的数据&#xff1a; 点击右下角“metalink”文件 3、将该文件放入aria2安装包内&#xff0c;…

文件(图片)上传下载(项目必备)

引入upload.html文件&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content&quo…

[转]TLM通信

一、概述 在芯片开发流程中&#xff0c;系统原型和芯片验证对项目的助推起到了关键作用。系统原型一般是通过硬件功能描述文档来模拟硬件行为&#xff0c;而行为要求不同于RTL模型。系统原型可以提供一个准确到硬件比特级别、按照地址段访问、不依赖于时钟周期的模型&#xff…

TLM通信(transaction level modle)

一、概述 在芯片开发流程中&#xff0c;系统原型和芯片验证对项目的助推起到了关键作用。系统原型一般是通过硬件功能描述文档来模拟硬件行为&#xff0c;而行为要求不同于RTL模型。系统原型可以提供一个准确到硬件比特级别、按照地址段访问、不依赖于时钟周期的模型&#xff…

TLM通信

一、概述 在芯片开发流程中&#xff0c;系统原型和芯片验证对项目的助推起到了关键作用。系统原型一般是通过硬件功能描述文档来模拟硬件行为&#xff0c;而行为要求不同于RTL模型。系统原型可以提供一个准确到硬件比特级别、按照地址段访问、不依赖于时钟周期的模型&#xff…

Typora如何把图片上传到图床smms.app

Typora 下载地址&#xff1a;百度云 官方下载 PicGo 下载地址&#xff1a;百度云 官方下载 免费图床 smms.app 访问地址&#xff1a;https://smms.app/ 用Typora平时做笔记挺好用&#xff0c;但是插入图片后&#xff0c;在公司保存好的md文件拿到家里的电脑打开的时候&a…

HM和VTM的下载与安装

一、HM安装及使用 1.下载HM 需要手动cmake&#xff1a; jvet / HM GitLab (fraunhofer.de) &#xff08;建议&#xff09;直接下载包含解决方案的代码&#xff08;用SVN下载&#xff09; svn_HEVCSoftware - Revision 4998: /tags (fraunhofer.de) SVN下载界面如下图&…

【雕爷学编程】Arduino动手做(09)---火焰传感器模块3

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

mybatis-plus逻辑删除的坑

一旦在逻辑字段上加了TableLogic逻辑删除的配置&#xff0c;并且使用mybatis-plus自带的方法时&#xff08;如果自己用xml写SQL不会出现下面的情况&#xff09; 查询、修改时会自动排除逻辑删除的数据 当使用mybatis-plus自带的查询方法时&#xff0c;就不用每次查询的时候跟…

在Linux,误删磁盘分区怎么恢复呢【转】

在我们运维工作中&#xff0c;频繁的操作&#xff0c;可能命令写入错误&#xff0c;造成磁盘分区的删除&#xff0c;那么应该怎么办呢&#xff1f;怎么恢复磁盘分区呢&#xff1f; 一不小心删除了磁盘分区。如下图&#xff0c;删除了sda磁盘的第一个分区&#xff0c;为系统boot…

一、磁盘数据恢复实验报告

目录 “磁盘数据恢复”实验报告 一、【实验目的】 二、【实验设备】 三、【实验要求】 四、【实验步骤】 1.先创建虚拟磁盘 2.破坏磁盘数据文件。 3.进行恢复。 五、【实验总结】 “磁盘数据恢复”实验报告 一、【实验目的】 学习运用winhex的使用方法&#xff0c;掌…

服务器Raid5磁盘阵列数据恢复步骤和数据恢复方法

[磁盘阵列数据恢复故障描述] 客户的一台HP DL380 G4服务器&#xff0c;服务器使用hp smart array控制器挂载了一台国产磁盘阵列&#xff0c;磁盘阵列由14块146G SCSI硬盘组成一组RAID5。操作系统为LINUX&#xff0c;构建了NFSFTP&#xff0c;作为公司内部文件服务器使用。 由…

FIO入门到删盘 -- 恢复磁盘数据

FIO命令一般用于测裸盘的IO速度&#xff0c;有数据的硬盘不能用&#xff01;&#xff01;&#xff01; 一、背景 由于任务需要测试硬盘的IO随机读取和写入速度&#xff0c;因此上网找到了FIO命令&#xff0c;搜出来都是怎么用这个命令去测试硬盘&#xff0c;而并没有人强调这个…

虚拟机占用磁盘的恢复方式。

VMware虚拟机安装LINUX系统分盘后&#xff0c; 如果使用物理磁盘分盘会出现ext格式。 当移除虚拟机时&#xff0c;磁盘不会恢复&#xff0c;Windows系统不会识别此格式&#xff0c;分完的磁盘不能使用。 此图片是恢复后的&#xff0c;类似红框内的状态&#xff0c;鼠标右击磁盘…

恢复磁盘I:时出错,在此驱动器上找不到恢复密钥。无法解锁此驱动器。

BitLocker 概述 BitLocker 驱动器加密是一项数据保护功能&#xff0c;它与操作系统集成&#xff0c;用于解决来自丢失、被盗或销毁不当的计算机的数据被盗或泄露的威胁。 工具/软件/环境 Bitlocker加密程序(windows自带) 要恢复数据的故障介质 windows7(及以上)系统 加密时…

恢复磁盘原始空间大小

前情提要&#xff1a;使用树莓派烧录系统镜像文件时&#xff0c;不小心烧录错地方了&#xff0c;烧进了我的移动硬盘。 通常磁盘损坏恢复的都是数据&#xff0c;但是今天我遇到的问题是磁盘的空间大小由1T变成了43M&#xff0c;而且可见部分只有43M&#xff0c;格式化也不行。…

[ubuntu]创建root权限的用户 该用户登录后自动切换为root用户

一、创建新用户 1、创建新用户 sudo useradd -r -m -s /bin/bash 用户名 # -r&#xff1a;建立系统账号 -m&#xff1a;自动建立用户的登入目录 -s&#xff1a;指定用户登入后所使用的shell2、手动为用户设置密码 passwd 用户名 二、为用户增加root权限 1、添加写权限 ch…

关于file zilla连接服务器超时和无法列出目录的解决办法

这几天做项目&#xff0c;可是工作室的服务器一直连接不上&#xff0c;最开始是无法列出目录&#xff0c;找了好久找不到原因&#xff1b;以为是客户端的问题。又升级了file zilla,结果竟然更严重了&#xff0c;连接超时&#xff0c;连不上服务器了。 把它放了几天&#xff0c;…

File zilla远程连接服务器报错:服务器发回了不可路由的地址,使用服务器地址代替...

百度的答案都是&#xff1a;更改Filezilla设置&#xff0c;编辑-设置-连接-FTP-被动模式&#xff0c;将“使用服务器的外部ip地址来代替”改为“回到主动模式”即可。但问题没有解决&#xff01;&#xff01;&#xff01; 由于使用的是阿里云的服务器。安全组里面默认的端口都是…