四、单线程多路IO复用+多线程业务工作池

文章目录

  • 一、前言
    • 1 编译方法
  • 二、单线程多路IO复用+多线程业务工作池结构
  • 三、重写`Client_Context`类
  • 四、编写`Server`类

一、前言

我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。

由于项目变大,尝试解耦项目,使用CMake ,可以看这篇文章现代CMake使用,使C++代码解耦

本节代码均可在仓库TinyWebServer 中找到

1 编译方法

# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .

二、单线程多路IO复用+多线程业务工作池结构

简单来说就是把,读写任务交给线程池。

单线程多路IO复用+多线程业务工作池

三、重写Client_Context

上一节的Client_Context类并不能做到线程安全,以及管理客户端状态。所以做以下改变。

// client_context.hclass ClientContext {
public:ClientContext() : active(true) {}void pushMessage(const string &msg);bool hasMessages() const;string popMessage();void setWriteReady(bool ready);bool isWriteReady() const;bool isActive() const;void deactivate();private:queue<string> send_queue;	// 消息队列bool write_ready = false;	// 是否可写mutable mutex mtx;			// const下也可锁atomic<bool> active;		// 活跃检测
};
// client_context.cpp#include "client_context.h"// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {lock_guard<mutex> lock(mtx);send_queue.push(msg);
}bool ClientContext::hasMessages() const {lock_guard<mutex> lock(mtx);return !send_queue.empty();
}
string ClientContext::popMessage() {lock_guard<mutex> lock(mtx);string msg = send_queue.front();send_queue.pop();return msg;
}
void ClientContext::setWriteReady(bool ready) {lock_guard<mutex> lock(mtx);write_ready = ready;
}
bool ClientContext::isWriteReady() const {lock_guard<mutex> lock(mtx);return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }

四、编写Server

封装成类,隐藏细节。

#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <memory>
#include <unordered_map>#include "client_context.h"
#include "thread_pool.h"const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;class Server {
public:Server(int port);void run();private:void handleNewConnection();void handleClientEvent(epoll_event &event);void handleRead(int client_fd);void handleWrite(int client_fd);void removeClient(int client_fd);void modifyEpollEvent(int fd, uint32_t events);int server_fd;int epoll_fd;ThreadPool pool;unordered_map<int, shared_ptr<ClientContext>> clients;mutex clients_mutex;
};
#include "server.h"
#include <cstdint>// Server implementation
Server::Server(int port) : pool(4) {server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (server_fd < 0) {throw runtime_error("Socket creation failed");}sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {close(server_fd);throw runtime_error("Bind failed");}if (listen(server_fd, SOMAXCONN) < 0) {close(server_fd);throw runtime_error("Listen failed");}epoll_fd = epoll_create1(0);if (epoll_fd < 0) {close(server_fd);throw runtime_error("epoll_create1 failed");}epoll_event event;event.data.fd = server_fd;event.events = EPOLLIN | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {close(server_fd);close(epoll_fd);throw runtime_error("epoll_ctl failed");}
}void Server::run() {vector<epoll_event> events(MAX_EVENTS);while (true) {int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (event_count < 0) {cerr << "epoll_wait failed: " << endl;break;}for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {handleNewConnection();} else {handleClientEvent(events[i]);}}}
}void Server::handleNewConnection() {while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);if (client_fd < 0) {if (errno == EAGAIN || EWOULDBLOCK) {cout << "No more new connections to accept" << endl;break;} else {cerr << "Accept failed: " << endl;break;}}epoll_event event;event.data.fd = client_fd;event.events = EPOLLIN | EPOLLET;int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);if (epoll_ctl_result < 0) {cerr << "epoll_ctl failed for client socket: " << endl;close(client_fd);} else {{lock_guard<mutex> lock(clients_mutex);clients[client_fd] = make_shared<ClientContext>();}}}
}void Server::handleClientEvent(epoll_event &event) {int client_fd = event.data.fd;if (event.events & (EPOLLERR | EPOLLHUP)) {cout << "Error or hangup event for client: " << client_fd << endl;removeClient(client_fd);} else {if (event.events & EPOLLIN) handleRead(client_fd);if (event.events & EPOLLOUT) handleWrite(client_fd);   }
}void Server::handleRead(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;return;}client = it->second;}string buffer(BUFFER_SIZE, 0);while (true) {int read_len = read(client_fd, buffer.data(), buffer.size());if (read_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK)break;else {cerr << "Read failed on socket " << client_fd << endl;removeClient(client_fd);break;}} else if (read_len == 0) {cout << "Client disconnected: " << client_fd << endl;removeClient(client_fd);break;} else {cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;string message = "Echo: " + buffer.substr(0, read_len);client->pushMessage(message);client->setWriteReady(true);modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);}}});
}void Server::handleWrite(int client_fd) {pool.enqueue([this, client_fd] {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it == clients.end() || !it->second->isActive()) {cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;return;}client = it->second;}if (!client->isWriteReady()) return;bool keep_writing = true;while (keep_writing && client->hasMessages()) {string message = client->popMessage();size_t total_sent = 0;while (total_sent < message.size()) {int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);if (write_len < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {client->pushMessage(message.substr(total_sent));keep_writing = false;break;} else {cerr << "Write error on socket " << client_fd << endl;removeClient(client_fd);return;}} else total_sent += write_len;}if (total_sent == message.size()) cout << "Sent to client " << client_fd << ": " << message << endl;}if (!client->hasMessages()) {client->setWriteReady(false);modifyEpollEvent(client_fd, EPOLLIN);}});
}void Server::removeClient(int client_fd) {shared_ptr<ClientContext> client;{lock_guard<mutex> lock(clients_mutex);auto it = clients.find(client_fd);if (it != clients.end()) {client = it->second;clients.erase(it);}}if (client) {client->deactivate();int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);if (epoll_ctl_result < 0) cerr << "Failed to remove client from epoll: " << endl;close(client_fd);}
}void Server::modifyEpollEvent(int fd, uint32_t events) {epoll_event event;event.data.fd = fd;event.events = events | EPOLLET;if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) cerr << "Failed to modify epoll event for fd " << fd << endl;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3267062.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

谷粒商城实战笔记-66-商品服务-API-品牌管理-JSR303数据校验

文章目录 一&#xff0c;引入JSR 303依赖二&#xff0c;接口参数启用校验功能三&#xff0c;给字段添加校验注解NotBlank 和 NotNull 的区别NotBlankNotNull比较 四&#xff0c;BindingResult获取校验结果五&#xff0c;自定义错误消息六&#xff0c;其他校验规则 在Web应用程序…

【AIGC】构建自己的谷歌搜索引擎服务并使用

一、谷歌 谷歌的搜索引擎需要自己创建服务才能启用检索api。&#xff08;需自行翻墙和创建自己的谷歌账号&#xff09; 1.1 API服务创建 1&#xff09;登陆https://console.cloud.google.com/: 2&#xff09; 选择新建项目&#xff0c;取号项目名即可&#xff08;比如:Olin…

【Java】/* 异常 */

目录 ​编辑 一、错误和异常的体系 二、异常 2.1 异常的分类 2.2 异常的处理形式 2.3 如何抛出异常 2.4 捕获处理异常 2.5 finally存在的意义 2.6 异常处理流程总结 2.7 自定义异常类 一、错误和异常的体系 1. Java中所有的异常和错误都有对应的类来进行描述。 2. 由…

【日常记录】【JS】JS中查询参数处理工具URLSearchParams

文章目录 1. 引言2. URLSearchParams2.1 URLSearchParams 的构造函数2.2 append() 方法2.3 delete() 方法2.4 entries() 方法2.5 forEach() 方法2.6 get() 方法2.7 getAll() 方法2.8 has() 方法2.9 keys() 方法2.10 set() 方法2.11 toString() 方法2.12 values() 方法 参考链接…

Protobuf序列化原理学习

Protobuf 序列化原理学习 1. 基本概念 消息&#xff1a;由于Protocp Buffer主要用于数据存储、网络通信的场景&#xff0c;将结构化的数据&#xff08;数据结构或对象&#xff09;进行序列化&#xff0c;生成二进制串来保存或传输。把要序列化的结构化数据称为**消息**。 T -…

Activiti学习之入门个人任务(07)

这里写目录标题 一、分配任务负责人1.1 固定分配1.2 表达式分配1.2.1 UEL表达式1.2.2 使用流程变量分配任务1.2.3 注意事项 1.3 监听器分配 二、查询任务2.1 查询负责人待办2.2 关联businessKey 三、办理任务 一、分配任务负责人 1.1 固定分配 在进行业务流程建模时指定固定的…

vdb:虚拟数据库

将文件虚拟成数据库&#xff0c;序列化写入、反序列化读取、直接读取。

离散型制造业中,MES系统的应用场景

在离散型制造业中&#xff0c;MES&#xff08;制造执行系统&#xff09;系统的应用场景极为丰富且关键&#xff0c;它贯穿于整个生产过程的各个环节&#xff0c;从生产计划到生产执行&#xff0c;再到质量控制和物料管理&#xff0c;都发挥着不可替代的作用。以下是离散制造业中…

本地化部署一个简单的AI大模型,Llama3.1

7 月 23 日消息&#xff0c;Meta 今晚正式发布llama3.1&#xff0c;提供 8B、70B 及 405B 参数版本。 Meta 称 4050 亿参数的 Llama 3.1-405B 在常识、可引导性、数学、工具使用和多语言翻译等一系列任务中&#xff0c;可与 GPT-4、GPT-4o、Claude 3.5 Sonnet 等领先的闭源模型…

收藏:高性价比https证书

在当今的数字化世界中&#xff0c;网络安全已经成为了每个网站所有者的首要关注点&#xff0c;为了保护网站的安全&#xff0c;防止数据被窃取或篡改&#xff0c;使用SSL证书已经成为了一种标准的做法&#xff0c;SSL证书是一种用于加密网站和用户之间数据传输的证书&#xff0…

[240726] Mistral AI 发布新一代旗舰模型 | Node.js 合并 TypeScript 文件执行提案

目录 Mistral AI 发布新一代旗舰模型&#xff1a;Mistral Large 2Node.js 合并 TypeScript 文件执行提案&#xff1a;--experimental-strip-types Mistral AI 发布新一代旗舰模型&#xff1a;Mistral Large 2 Mistral AI 宣布推出新一代旗舰模型 Mistral Large 2&#xff0c;该…

2024新版 黑马程序员《C++零基础入门》笔记——第一章19 cin中文乱码的解决

1.cin中文乱码的解决 2.代码实践 # include "iostream" # include "windows.h" using namespace std;int main() {SetConsoleOutputCP(CP_UTF8);string str;cin >> str;cout << str << endl;return 0; } 注意&#xff0c;勾选之后以后也…

【echarts】中如何设置曲线展示最新值、最大值、最小值

需要用到的属性&#xff1a;图表标注 series-line. markPoint 默认可以通过 type直接标注&#xff1a;‘min’ 最小值、‘max’ 最大值、‘average’ 平均值。 markPoint: {data: [{type: max},{type: min}]}如何展示最新值 如果要展示最新值得话&#xff0c;需要设置 标注…

如何让C++程序自动生成dump文件?以及如何分析dump文件?

目录 1、API函数SetUnhandledExceptionFilter介绍 2、调用SetUnhandledExceptionFilter设置异常处理函数 3、调用MiniDumpWriteDump函数导出包含异常上下文的dump文件 4、dump文件的多种生成方式 5、使用Windbg分析dump文件 6、最后 C++软件异常排查从入门到精通系列教程…

数业智能心大陆:定制你的专属心理健康方案

在快速变化的社会中&#xff0c;随着人们对自我健康认识的不断加深&#xff0c;心理健康已成为影响生活质量的关键因素&#xff0c;许多成年人在其一生中会遇到心理健康问题。在探索人类心理奥秘的旅程中&#xff0c;我们发现&#xff0c;每个人的心理状态和需求都是独一无二的…

mmdetection训练后评估指标,验证Loss

项目场景&#xff1a; 对mmdetection框架下训练好的log.json文件进行评估。 问题描述 使用框架底下自带的评估文件&#xff0c;不能对loss进行评估。也就是文件&#xff1a;tools/analysis_tools/analyze_logs.py 解决方案&#xff1a; 自己做了评估loss的代码&#xff0c;目…

使用nginx解决本地环境访问线上接口跨域问题

前言 前端项目开发过程中&#xff0c;经常会遇到各种各样的跨域问题。 虽然大部分时候&#xff0c;由脚手架自带的proxy功能即可解决问题&#xff0c;如webpack&#xff0c;vite等&#xff1b;但是若没有通过脚手架搭建项目&#xff0c;或者必须使用某些特殊规则转发时&#…

C语言常见字符函数和字符串函数精讲

目录 引言 一、字符函数 1.字符分类函数 2.字符转换函数 二、字符串函数 1.gets、puts 2.strlen 3.strcpy 4.strncpy 5.strcat 6.strncat 7.strcmp 8.strncmp 9.strstr 10.strchr 11.strtok 12.strlwr 13.strupr 引言 在C语言编程中&#xff0c;字符函数…

Python小工具——监听某网站的数据变化并进行邮件通知

目录 一、需求描述 二、解析 三、实例代码 一、需求描述 监听自考网2024年广东省6月份的毕业生学历注册进度&#xff0c;这是网址&#xff1a;https://www.chsi.com.cn/xlcx/count_zk.jsp&#xff0c; 如上图所示&#xff0c;我们想知道这个红色的空格啥时候被填满&#xf…

Linux下使用gdb进行调试入门级

个人名片&#xff1a; &#x1f393;作者简介&#xff1a;嵌入式领域优质创作者&#x1f310;个人主页&#xff1a;妄北y &#x1f4de;个人QQ&#xff1a;2061314755 &#x1f48c;个人邮箱&#xff1a;[mailto:2061314755qq.com] &#x1f4f1;个人微信&#xff1a;Vir2025WB…