什么是线程池
线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。
为什么要引入线程池
- 在单线程的场景中,如在服务端的服务中,要对多个客户端进行服务,单个线程会阻塞住,使效率大大降低,这时候就要多个线程,
- 然而在使用多个线程的时候,来一个任务,就要创建一个线程去服务,当客户端退出又需要销毁线程,而频繁的创建线程销毁线程效率很低,另一方面,
- 如果一次有很多个线程,计算机当中的线程越多,CPU的压力就越大,因为CPU要不断在这些线程之间来回切换,此时CPU在调度线程的时候,线程和线程之间切换的成本就会变得很高。此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答。
- 所以这里可以引入线程池,用固定数量的线程来处理任务,而这些任务由一个任务队列来管理,当有新的任务到来的时候,就可以将任务Push到线程池当中,如果线程池当中没有任务那么当前线程就会进入休眠状态。
线程池的应用场景
- 12306网上购票系统等
- 云盘文件上传和下载
- 网购
线程池的代码实现
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个pushtask接口,用于让外部线程能够将任务push到任务队列当中。
- 任务类中能够在接收任务后对任务进行处理
- 任务队列可能会被多个线程同时访问,所以在对任务队列操作引入互斥锁
任务类的实现
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>class Task
{
public:Task(){}Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0){}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "= ";}std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}void run(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}usleep(100000);std::cout<<formatArg()<<formatRes()<<std::endl;}~Task(){}private:int _x;int _y;char _op;int _result;int _exitCode;
};
线程池的实现
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"const static int N = 5;template <class T>
class ThreadPool
{
public:ThreadPool(int num = N) : _num(num), _threads(num){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_cond, nullptr);}void lockQueue(){pthread_mutex_lock(&_lock);}void unlockQueue(){pthread_mutex_unlock(&_lock);}void threadWait(){pthread_cond_wait(&_cond, &_lock);}void threadWakeup(){pthread_cond_signal(&_cond);}bool isEmpty(){return _tasks.empty();}T popTask(){T t = _tasks.front();_tasks.pop();return t;}static void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);while (true){// 1. 检测有没有任务// 2. 有:处理// 3. 无:等待// 细节:必定加锁tp->lockQueue();while (tp->isEmpty()){tp->threadWait();}T t = tp->popTask(); // 从公共区域拿到私有区域tp->unlockQueue();t.run(); // 处理任务,应不应该在临界区中处理?1,0}}void init(){// TODO}void start(){for (int i = 0; i < _num; i++){pthread_create(&_threads[i], nullptr, threadRoutine, this); // ?}}void pushTask(const T &t){lockQueue();_tasks.push(t);threadWakeup();unlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}private:std::vector<pthread_t> _threads;int _num;std::queue<T> _tasks; // 使用stl的自动扩容的特性pthread_mutex_t _lock;pthread_cond_t _cond;
};
测试代码
#include"ThreadPool.hpp"
#include<memory>
int main(){std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());tp->init();tp->start();const char* op = "+-*/%";for( ;;){sleep(1);int x = rand() % 100;int y = rand() % 100;int index = rand() % 5;Task task(x, y, op[index]);tp->pushTask(task);}
}
拓展操作
- 封装一个线程类
- 封装一个互斥锁类,在构造的时候加锁,析构的时候解锁
- 用单例模式来操作线程池
拓展后的代码
线程类
#pragma once#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>class Thread
{
public:typedef enum{NEW = 0,RUNNING,EXITED} ThreadStatus;typedef void (*func_t)(void *);public:Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args){char name[128];snprintf(name, sizeof(name), "thread-%d", num);_name = name;}int status() { return _status; }std::string threadname() { return _name; }pthread_t threadid(){if (_status == RUNNING)return _tid;else{return 0;}}// 是不是类的成员函数,而类的成员函数,具有默认参数this,需要static// 但是会有新的问题:static成员函数,无法直接访问类属性和其他成员函数static void *runHelper(void *args){Thread *ts = (Thread*)args; // 就拿到了当前对象// _func(_args);(*ts)();return nullptr;}void operator ()() //仿函数{if(_func != nullptr) _func(_args);}void run(){int n = pthread_create(&_tid, nullptr, runHelper, this);if(n != 0) exit(1);_status = RUNNING;}void join(){int n = pthread_join(_tid, nullptr);if( n != 0){std::cerr << "main thread join thread " << _name << " error" << std::endl;return;}_status = EXITED;}~Thread(){}private:pthread_t _tid;std::string _name;func_t _func; // 线程未来要执行的回调void *_args;ThreadStatus _status;
};
互斥锁类
#pragma once#include <iostream>
#include <pthread.h>class Mutex // 自己不维护锁,有外部传入
{
public:Mutex(pthread_mutex_t *mutex):_pmutex(mutex){}void lock(){pthread_mutex_lock(_pmutex);}void unlock(){pthread_mutex_unlock(_pmutex);}~Mutex(){}
private:pthread_mutex_t *_pmutex;
};class LockGuard // 自己不维护锁,有外部传入
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){_mutex.lock();}~LockGuard(){_mutex.unlock();}
private:Mutex _mutex;
};
任务类
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>class Task
{
public:Task(){}Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0){}void operator()(){switch (_op){case '+':_result = _x + _y;break;case '-':_result = _x - _y;break;case '*':_result = _x * _y;break;case '/':{if (_y == 0)_exitCode = -1;else_result = _x / _y;}break;case '%':{if (_y == 0)_exitCode = -2;else_result = _x % _y;}break;default:break;}usleep(100000);}std::string formatArg(){return std::to_string(_x) + _op + std::to_string(_y) + "= ?";}std::string formatRes(){return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";}~Task(){}private:int _x;int _y;char _op;int _result;int _exitCode;
};
线程池类
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"const static int N = 5;template <class T>
class ThreadPool
{
private:ThreadPool(int num = N) : _num(num){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &tp) = delete;void operator=(const ThreadPool<T> &tp) = delete;public:static ThreadPool<T> *getinstance(){if(nullptr == instance) // 为什么要这样?提高效率,减少加锁的次数!{LockGuard lockguard(&instance_lock);if (nullptr == instance){instance = new ThreadPool<T>();instance->init();instance->start();}}return instance;}pthread_mutex_t *getlock(){return &_lock;}void threadWait(){pthread_cond_wait(&_cond, &_lock);}void threadWakeup(){pthread_cond_signal(&_cond);}bool isEmpty(){return _tasks.empty();}T popTask(){T t = _tasks.front();_tasks.pop();return t;}static void threadRoutine(void *args){// pthread_detach(pthread_self());ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);while (true){// 1. 检测有没有任务// 2. 有:处理// 3. 无:等待// 细节:必定加锁T t;{LockGuard lockguard(tp->getlock());while (tp->isEmpty()){tp->threadWait();}t = tp->popTask(); // 从公共区域拿到私有区域}// for testt();std::cout << "thread handler done, result: " << t.formatRes() << std::endl;// t.run(); // 处理任务,应不应该在临界区中处理?1,0}}void init(){for (int i = 0; i < _num; i++){_threads.push_back(Thread(i, threadRoutine, this));}}void start(){for (auto &t : _threads){t.run();}}void check(){for (auto &t : _threads){std::cout << t.threadname() << " running..." << std::endl;}}void pushTask(const T &t){LockGuard lockgrard(&_lock);_tasks.push(t);threadWakeup();}~ThreadPool(){for (auto &t : _threads){t.join();}pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}private:std::vector<Thread> _threads;int _num;std::queue<T> _tasks; // 使用stl的自动扩容的特性pthread_mutex_t _lock;pthread_cond_t _cond;static ThreadPool<T> *instance;static pthread_mutex_t instance_lock;
};template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;template <class T>
pthread_mutex_t ThreadPool<T>::instance_lock = PTHREAD_MUTEX_INITIALIZER;
测试代码
#include "ThreadPoolEnd.hpp"
#include "Task.hpp"
#include <memory>int main()
{printf("0X%x\n", ThreadPool<Task>::getinstance());//打印单例的地址printf("0X%x\n", ThreadPool<Task>::getinstance());printf("0X%x\n", ThreadPool<Task>::getinstance());printf("0X%x\n", ThreadPool<Task>::getinstance());printf("0X%x\n", ThreadPool<Task>::getinstance());printf("0X%x\n", ThreadPool<Task>::getinstance());const char* op = "+-*/%";while (true){sleep(1);int x = rand() % 100;int y = rand() % 100;int index = rand() % 5;Task t(x, y, op[index]);ThreadPool<Task>::getinstance()->pushTask(t); //单例对象也有可能在多线程场景中使用!}
}