文章目录
- 引言
- 基础知识
- 线程池原理
- Executor框架
- Executor框架的两级调度模型
- Executor框架结构
- Executor框架成员
- ThreadPoolExecutor详解——这里简单过一下,知道原理即可
- FixedThreadPool简介
- SingleThreadExecutor
- CachedThreadPool
- ScheduledThreadPoolExecutor详解——简答了解即可
- FutureTask详解——看一下
- FutureTask简介
- FutureTask的使用
- 面试问题
- 线程池手撕算法——对一批任务按照批次按照顺序执行
- 个人实现
- 参考实现
- 总结
引言
- 关于线程池,已经遇到了好几次
- 第一次是学习spring的时候
- 第二次是多并发编程练习题
- 第三次是项目中应用的线程池调优
- 但是一直没有深入学习过,今天补充一下!
- 还是按照以前的套路,先是过一遍基础知识,然后在针对面试题总结一下!
基础知识
线程池原理
线程池的作用
- 降低资源消耗:重复利用已经创建的线程,降低线程创建和线程销毁的消耗
- 提高响应速度:当任务达到的时候,不需要在经历等待线程创建的过程
- 提高线程可管理性:线程是稀缺资源,无限制的创建会消耗自愿,降低系统的稳定性,使用线程池可以进行统一分配、监控和调优
线程池的实现原理
- 当一个任务提交到线程池之后,线程池处理任务的流程如下
- 首先判断核心线程池是否已满
- 核心线程池未满,直接创建新线程来执行任务
- 核心线程池满了,进行下一步判断
- 然后判断任务队列是否已经满了
- 任务队列未满,将任务放入队列中,结束
- 任务队列已满,进行下一步判断
- 最后判断线程池是否已经满了
- 线程池没有满,创建新的线程来执行任务
- 线程池满了,按照饱和策略来处理任务
- 首先判断核心线程池是否已满
核心线程池和线程池的区别
- 核心线程池中的线程是正式员工,专门用来处理任务队列中的任务,并且不会被销毁
- 线程池(非核心线程池)是外包员工,在没有任务并且空闲一段时间后,会被销毁。
Executor框架
Executor框架的两级调度模型
- JVM的线程和操作系统的线程关系
- Java线程被一对一映射为本地操作系统线程
- Java线程启动的时候,会创建一个本地操作系统线程
- Java线程终止时,对应的操作系统线程会被回收的
- 所有线程是由操作系统调用和分配CPU
- Java线程被一对一映射为本地操作系统线程
- Executor作用
- 是用户级的调度器,将多线程程序中的若干任务映射为固定数量的线程
- 底层是由操作系统内核检线程映射到CPU上。
Executor框架结构
- 任务
- 执行任务需要实现的接口,Runnable接口和Callable接口
- 继承并实现上述两个接口的具体的任务都能够被下述Executor接口的实现类执行
- 执行任务需要实现的接口,Runnable接口和Callable接口
- 任务执行
- 任务执行机制的核心接口Executor
- 继承自Executor的ExecutorService接口,主要是以下两个实现类
- ThreadPoolExecutor
- 线程池的核心实现类,用来执行被提交的任务
- ScheduledThreadPoolExecutor
- 线程池的实现类,能够实现定时执行的任务的状态
- ThreadPoolExecutor
- 异步计算的结果
- 接口Future和实现Future接口的FutureTask类
流程分析
-
主线程创建实现Runnable接口和Callable接口的任务对象。
-
将创建的任务对象直接交给ExecutorService执行,有两种方式
- 直接执行
- 将Runnable对象直接交给ExecutorService执行,ExecutorService.execute()
- 提交执行
- 将Runnable或者Callbale对爱那个,提交给ExecutorService自己执行,ExecutorService.submit()
- 直接执行
-
主线程执行FutureTask.**get()**方法来等待任务执行完成
- 调用FutureTask.cancel取消任务的执行
-
具体流程框架图见下图
Executor框架成员
ThreadPoolExecutor线程池实现类
该类是Executor最核心的实现类,包括三个具体的实现类,FixedThreadPool、SingleThreadExecutor和CachedThreadPool
-
FixedThreadpool
- 固定数量线程池
- 适用于为了满足资源管理的需求,限制当前线程数量的应用场景
- 适用于负载比较重的服务器,CPU负载高或者内存占用率高
-
SingleThreadExecutor
- 使用单个线程的线程池,
- 适用于需要保证顺序执行各个任务,并且在多个时间点,不会有多个线程是活动的应用场景。
-
CachedThreadPool
- 会根据需要创建新线程的线程池,是大小无界的线程池
- 适用于执行很多短期一步任务的小程序
- 适用于负载较轻的服务器
ScheduledThreadPoolExecutor线程实现类
用于定时任务或者周期任务的线程池,主要有两个实现类,分别是ScheduledThreadPoolExecutor和SingleThreadScheduledExectuor
-
ScheduledThreadPoolExecutor
- 适用于需要多个后台线程执行周期任务
- 为了满足资源管理的需求,需要限制后台线程数量的应用场景。
-
SingleThreadScheduledExectuor
- 只包含一个线程
- 适用于需要单个后台线程执行周期任务,同事需要保证顺序执行各个任务的应用场景。
Future接口
Future接口和FutureTask用来表示异步计算的结果,线程池执行完毕后,返回的是实现了Future接口的对象返回值
- 提交Runnable接口的对象时,返回Future<?>的占位符,检查计算是否完成的,没有实际值
- 提交Callable接口的对象时,submit(Callable task)方法会返回一个Future对象
Runnable接口和Callable接口
- Runnable接口没有返回值
- Callable有返回值
ThreadPoolExecutor详解——这里简单过一下,知道原理即可
主要是四个组件,这里要结合上一个小节的基本流程进行学习
-
corePoolSize核心线程池大小
- 只要核心线程数量小于corePoolSize,每次提交任务都会创建线程去执行
-
blockingQueue任务队列
- 保持等待执行的任务的阻塞队列,常见以下四种队列
- ArrayBlockingQueue:数组阻塞队列
- LinkedBlockingQueue:基于链表,FixedThreadPool使用这个结构
- SynchronousQueue:不存储元素,插入就调用,否则阻塞,CachedThreadPool使用
- PriorityBlockingQueu:优先队列
- 保持等待执行的任务的阻塞队列,常见以下四种队列
-
maximumPoolSize最大线程池数量
- 允许创建的最大线程数,包括普通线程数
- 队列满了才开始创建普通线程,同时创建线程数小于该指标
-
RejectedExecutionHandler饱和策略
- 队列和线程池都满了,才会采用,常见以下4种
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:使用调用者的线程运行任务
- DiscardOldestPolicy:丢弃队列中最旧的任务,充实提交当前任务
- DiscardPolicy:直接丢弃,不处理
- 队列和线程池都满了,才会采用,常见以下4种
-
线程图示意图如下,这里再结合之前讲的过程再回顾一遍
- 1表示当前运行的线程少于corePool,创建核心线程来完成任务
- 2表示当前运行的线程数量多余corePool,这里 将新创建的任务加入到BlockingQueue中,由核心线程继续执行
- 3表示blockingQueue已经满了,那么就创建(临时工)普通线程,来完成任务
- 4表示创建的普通线程数量超过了maxinumPool,采用饱和策略进行处理
FixedThreadPool简介
public static ExecutorService newFixedThreadPool(int nThreads)
- FixedThreadPool的corePool和maximumPool的初始化,都是在创建的时候就指定为nThreads
- 使用无界队列LinkedBlockingQueue,不会拒绝任务
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor()
- corePoolSize和maxmumPoolSize都是1
- 使用无界队列LinkedBlockingQueue作为线程池的工作队列
CachedThreadPool
public static ExecutorService newCachedThreadPool();
- corePoolSize为0,然后maximumPoolSize是Max_value,所以遇到一个新的任务就会创建一个线程,然后超时了直接删除。
- 使用SynchronousQueue作为线程池的工作队列,但是没有容量限制。只要交了任务就会创建线程处理。
ScheduledThreadPoolExecutor详解——简答了解即可
不同于之前的ThreadPoolExecutor,这个ScheduledThreadPoolExecutor是一下三个部分构成
- DelayQueue:无界队列,按照任务的时序字段进行排列。
- 任务添加模块:这里scheduledAtFixedRate()函数和 scheduledAtFixedDealy()函数按照固定的时限往delayQueue中添加任务
- CorePool:核心线程池,这里只有这一个,没有其他的maximumPoolSize,直接从DelayQueue中获取任务。
执行步骤 - 1核心线程池中的线程,从DelayQueue队列中获取已经到期的任务,任务的执行时间 <= 当前的时间
- 2线程开始执行这个任务
- 3线程修改当前任务的time变量,改为下次执行的之间,并且任务放回队列中
这里两个函数的一个是获取任务还有添加任务,就不看了,没在关于线程池的面试体中看到过,而且不是很好记忆!
FutureTask详解——看一下
- FutureTask是Future的基础实现,用来获取任务执行结果
- 如果任务尚未完,获取任务执行结果时,会陷入阻塞
- 线程安全是由CAS保证
FutureTask简介
FutureTask三种状态
- 未启动:创建一个FutureTask,没调用Run方法
- 已启动:调用run方法,在执行过程中
- 已完成:run方法完成之后,或者被取消了cancel,
常见两个操作get获取结果和cancel取消给线程状态变化
- get方法:
- 未启动状态或者已经启动,没有出结果的状态,线程阻塞
- 完成了,直接返回结果或者抛出异常
- cancel方法
- 未启动状态,cancel让这个任务永远不会被执行
- 已启动状态,cancel(true),将使用中断的方式停止任务
- cancel(false),等他执行完毕
- 已完成状态,cancel返回false
FutureTask的使用
主要的5个方法,具体如下
- cancel(true / false)
- 如果在运行中,true中断返回,false等他运行完了再返回
- 其余状态就是正常,不会让这个任务执行。
- isCancelled()
- 正常结束前,终止返回,就是返回true
- isDone()
- 任务有没有做,正常、异常或者取消导致任务完成,都会返回true
- get()
- 阻塞等到任务结束,获取结果
- 等待中断,抛出InterruptedException
- 取消中断,抛出CancellationException
- 异常中断,抛出TimeoutException
- get(timeout,timeunit)
- 规定时间内完成并返回结果,否则抛出超时异常
内部状态转换
- FutureTask所有方法都是围绕State展开的,该变量是volatile,使用不同数字表示状态,这里了解一下
面试问题
线程池手撕算法——对一批任务按照批次按照顺序执行
题目描述
- 有一批任务,需要实现按批次执行,并且批次可以动态指定,例如[2,4,6,8]第一批执行,[12,14,16,18]第二批执行,。。。。,最后没有指定的任务最后一起执行掉。批次之间按顺序执行,前一批执行完了,才能执行下一批。
个人实现
- 这个题目完全不会,要是前面的synchronized或者reentrantlock等,还是会做的,但是这个题目,是一点都不会做,但是不妨碍我进行初步分析一下,用到了那些知识!
- 这里要保证批次的顺序,就不能使用ThreadPool:
- 使用FixedThreadPool后面就不能保证任务执行的顺序
- SingleThreadExecutor看起来行的,但是这个和主线程执行有什么区别,感觉意义不大
- CachedThreadPool是给一个任务创建一个线程,但是线程执行有先后的话,并不能严格保证批次的顺序执行
- 还是得使用ScheduledThreadPoolExecutor
- 指定同一批次使用的时间是相同的,然后不同的批次之间增加一个任务间隔就行了。
代码补充学习
定义任务
static class Task implements Runnable{private int id;public Task(int id){this.id = id;}@Overridepublic void run(){System.out.println("Task "+id+" is running");}}
定义corePoolSize为10的ScheduledThreadPoolExecutor
- 添加任务,并指定任务的执行时间
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit
ScheduledExecutorService batchExecutor = Executors.newScheduledThreadPool(corePoolSize);
batchExecutor.schdule(task,delay,TimeUnit.SECONDS);
batchExecutor.shutdown(); // 启动有序关闭,停止接受新的任务,并在所有已经提交任务完全后终止try {scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {e.printStackTrace();}
**代码实现 **
- 基本逻辑,创建100个线程,然后分三个批次,然后一块加入到ScheduledThreadPoolExecutor中执行,每一个批次间隔10秒钟
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Observer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;class Main{static class Task implements Runnable{private int id;public Task(int id){this.id = id;}@Overridepublic void run(){System.out.println("Task "+id+" is running");}}public static void main(String[] args) throws InterruptedException {int batchSize = 10;// define the ScheduledThreadPool to run the tasksScheduledExecutorService schedPE = Executors.newScheduledThreadPool(batchSize);// create the batch to store the batch taskList<List<Task>> batches = new ArrayList<>();for(int i = 0;i < 3;i ++) batches.add(new ArrayList<>());for (int i = 0;i < 100;i ++){if(i % 2 == 0 && i >= 12 && i <= 18)batches.get(1).add(new Task(i));else if(i % 2 == 0 && i >= 2 && i <= 8)batches.get(0).add(new Task(i));elsebatches.get(2).add(new Task(i));}for(int i = 0;i < 3;i ++) {int delay = i * 5;for(Task j:batches.get(i))schedPE.schedule(j,delay, TimeUnit.SECONDS);}schedPE.shutdown();schedPE.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);}
}
参考实现
- 使用FixedThreadThreadPool来完成任务,但是是一个批次一个批次增加任务,上一个批次的任务完成了,再执行下一个批次的任务。
- 使用Future的get方法,来获取每一个任务的执行结果,判定是否执行成功
import java.util.*;
import java.util.concurrent.*;class Main{public static void runTasksInBatches(List<Runnable> tasks,List<List<Integer>> batches){// create the fixedThreadPool to run the tasksExecutorService executor = Executors.newFixedThreadPool(10);// finish the first batch tasks// 继承的是runnbale接口,没有返回值,但是有future作为占位List<Future<Void>> futues = new ArrayList<>();Set<Integer> allBatchTaskIndexes = new HashSet<>();for (int i = 0;i < batches.size();i ++){List<Integer> batch = batches.get(i);for(int j:batch){allBatchTaskIndexes.add(j);futues.add(executor.submit(tasks.get(j),null));}for(Future<Void> future:futues){try{future.get();}catch(InterruptedException | ExecutionException e){e.printStackTrace();}}futues.clear();System.out.println("Batch "+i+" is finished");}// finish the redunt tasksfor(int i = 0;i < tasks.size();i ++){if(!allBatchTaskIndexes.contains(i)){futues.add(executor.submit(tasks.get(i),null));}}// close the thread poolexecutor.shutdown();while (!executor.isTerminated()){try{executor.awaitTermination(1,TimeUnit.SECONDS);}catch(InterruptedException e){e.printStackTrace();}}}private static List<Runnable> createTasks(){// create 100 tasks implement runnable interfaceList<Runnable> tasks = new ArrayList<>();for(int i = 0;i < 100;i ++){final int taskid = i;tasks.add(()->{System.out.println("Task "+taskid+" is running");});}return tasks;}public static void main(String[] args) {// create taskList<Runnable> tasks = createTasks();// first batch and second batch idxList<Integer> batch1 = new ArrayList<>(Arrays.asList(2,4,6,8));List<Integer> batch2 = new ArrayList<>(Arrays.asList(12,14,16,18));List<List<Integer>> batches = new ArrayList<>(Arrays.asList(batch1,batch2));// run the tasks in batchesrunTasksInBatches(tasks,batches);System.out.println("All tasks are finished");}
}
以下几个东西是值得学习的
使用Future<Void>来表示Runnable接口的返回值
List<Future<Void>> futures = new ArrayList<>();
...
ExecutorService executor=Executors.newFixedThreadPool(10);
futures.add(executor.submit(task,null)); // 提交一个task,任务执行完毕默认返回null
- 遍历任务确定线程池是否完全成功
for(Future<Void> future:futues){try{future.get();}catch(InterruptedException | ExecutionException e){e.printStackTrace();}}futues.clear();
总结
- 靠,昨天晚上早知道不熬夜了,现在写的这个题目,感觉整个人都被掏空了,精神不集中,然后很疲惫,今天写完这个早点睡吧,后续再补充一下关于线程池的八股题,这里就补充了编程题。
- 明天继续加油,先完成笔试,然后在完成项目!
- 最怕的就是你说是你做的,但是想不起来,所以,明天抓紧看项目,马上应该就是腾讯面试、讯飞面试等等,还是得加油!