juc包中锁的应用
ReentrantReadWriteLock读写锁
public class MyTest2 {private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public void method() {try {readWriteLock.readLock().lock();// 读锁是共享锁,可以多个线程同时获得
// readWriteLock.writeLock().lock();//写锁是排他锁,只能有一个线程获得try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("method");} finally {readWriteLock.writeLock().unlock();
// readWriteLock.writeLock().unlock();}}public static void main(String[] args) {MyTest2 myTest2 = new MyTest2();IntStream.range(0, 10).forEach(i -> new Thread(myTest2::method).start());}
}
关于ReentrantReadWriteLock的操作逻辑:
读锁:
1. 在获取读锁时,会尝试判断当前对象是否拥有了写锁,如果已经拥有,则直接失败。
2. 如果没有写锁,就表示当前对象没有排他锁,则当前线程会尝试给对象加锁
3. 如果当前线程已经持有了该对象的锁,那么直接将读锁数量加1写锁:
4. 在获取写锁时,会尝试判断当前对象是否拥有了锁(读锁与写锁),如果已经拥有且持有的线程并非当前线程,直接失败。
5. 如果当前对象没有被加锁,那么写锁就会为为当前对象上锁,并且将写锁的个数加1.
6. 将当前对象的排他锁线程持有者设为自己
StampedLock对象在应用时需要注意如下几个点:
· 获取锁时返回一个stamp值,值为0表示获取失败,其余都表示成功。
· 释放锁时需要一个stamp值,这个值必须是和成功获取锁时得到的Stamp值是一致的。
· StampedLock是不可重入的。(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
StampedLock有三种访问模式;
第一:Reading(读模式):功能和ReentrantReadwriteLock的读锁类似
第二:Writing(写模式):功能和ReentrantReadwriteLock的写锁类似
第三:optimistic reading(乐观读模式):这是一种优化的读模式。
· StampedLock支持读锁和写锁的相互转换。
· 无论写锁还是读锁,都不支持Conditon等待。
总之:相比ReadwriteLock读写锁,StampedLock通过提供乐观读在多线程多写线程少的情况下可以提供更好的性能,因为乐观读不需要进行CAS设置锁的状态。
JUC包中原子(Atomic)类的应用
JUC包中并发工具类的应用(重要)
CountDownLatch对象
案例2
一个主任务,需要等待其他多个子任务全部完成后才能继续执行主任务的情况下使用:
public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(3); //设置计数器为3IntStream.range(0, 3).forEach(i -> new Thread(() -> { //创建3个线程,每个线程最后都让计数器减1try {Thread.sleep(2000);System.out.println("hello");} catch (InterruptedException ex) {ex.printStackTrace();} finally {countDownLatch.countDown(); //计数器减1}}).start());System.out.println("启动子线程完毕");try {countDownLatch.await(); // 当countDownLatch计数器为0时,才能继续往下执行,否则一直阻塞。 它有一个重载的方法countDownLatch.await(200,TimeUnit.MILLISECONDS),可以指定阻塞的时间200ms,在200ms内没有将计数器减为0,也会自动往下执行。 } catch (InterruptedException e) {e.printStackTrace();}System.out.println("主线程执行完毕");}
CyclicBarrier对象
cBarrier.await()方法调用表示需要等待参与的3个线程全部完成step-01后再继续执行step-02
可以使用reset()方法重置计数器
案例3
当多个线程都到达某一个阶段后,才能继续下一个阶段代码或者触发某个任务执行。冲破屏障后,计数器又恢复到原状。
public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("hello world");//当3个线程都调用await方法时触发这里的任务});for (int i = 0; i < 2; ++i) {for (int n = 0; n < 3; ++n) {new Thread(() -> {try {Thread.sleep((long) (Math.random() * 2000));int randomInt = new Random().nextInt(500);System.out.println("hello " + randomInt);cyclicBarrier.await();//在屏障前等待,当3个线程都执行到这里时,立刻都继续往下执行System.out.println("world " + randomInt);} catch (Exception ex) {ex.printStackTrace();}}).start();}}}
关于CyclicBarrier的底层执行流程
1. 初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选)
2. 当调用await方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;
3. 在下一个分代中,将会重置count值为parties,并且创建新的Generation实例。
4. 同时会调用Condition的signalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行。
5. 如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待。
6. 以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。
Semaphore对象
未来模式
Future未来模式
常用实现类FutureTask< V >
public static void main(String[] args) {Callable<Integer> callable = () -> { //Callable和Runnable的区别在于Callable有返回值System.out.println("pre execution");Thread.sleep(5000);int randomNumber = new Random().nextInt(500); //随机生成一个500内的整数System.out.println("post execution");return randomNumber;};FutureTask<Integer> futureTask = new FutureTask(callable);//将任务变成异步任务new Thread(futureTask).start();System.out.println("thread has started");try {Thread.sleep(2000);System.out.println(futureTask.get());//等待futureTask完成任务,再返回futureTask的结果,所以get()方法时阻塞的。} catch (Exception e) {e.printStackTrace();}
}
CompletableFuture
弥补Future接口中get方法的阻塞,可以使用completableFuture.whenComplete回调方法,它不是阻塞方法。
public static void main(String[] args) {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(2000); //相当于Thread.sleep(2000)} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task finished");});completableFuture.whenComplete((t, action) -> System.out.println("执行完成!"));//不是阻塞方法System.out.println("主线程执行完毕");try {TimeUnit.MILLISECONDS.sleep(7000);} catch (InterruptedException e) {e.printStackTrace();}}
JUC包中线程池的应用
线程池应用
shutdown只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,新的任务将会被拒绝。但这个方法不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。shutdownNow()方法是线程池处于STOP状态,此时线程池不再接受新的任务,并且会去尝试终止正在执行的任务,然后清空并返回队列。整个过程类似超市或商场关门。
线程池中任务调度
按固定频率执行任务:
按固定时长执行任务,与任务执行时长有关(如果不执行完第n次任务永远不会执行第n+1次任务的):
基本ThreadPoolExecutor
/*int corePoolSize:线程池当中所一直维护的线程数量,如果线程池处于任务空闲期间,那么该线程也并不会被回收掉int maximumPoolSize:线程池中所维护的线程数的最大数量long keepAliveTime:超过了corePoolSize的线程在经过keepAliveTime时间后如果一直处于空闲状态,那么超过的这部分线程将会被回收掉TimeUnit unit:指的是keepAliveTime的时间单位BlockingQueue<Runnable> workQueue:向线程池所提交的任务位于的阻塞队列,它的实现有多种方式ThreadFactory threadFactory:线程工厂,用于创建新的线程并被线程池所管理,默认线程工厂所创建的线程都是用户线程且优先级为正常优先级RejectedExecutionHandler handler:表示当线程池中的线程都在忙于执行任务且阻塞队列也已经满了的情况下,新到来的任务该如何被对待和处理。它有四种实现策略:AbortPolicy: 直接抛出一个运行期异常。DiscardPolicy:默默地丢弃掉提交的任务,什么都不做且不抛出任何异常DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务(队头元素),并且为当前所提交的任务留出一个队列中的空闲空间,以便将其放进到队列中CallerRunsPolicy:直接由提交任务的线程来运行这个提交的任务在线程池中,最好将偏向锁的标记关闭。*/public class MyTest1 {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3);//实际开发中不建议使用IntStream.range(0, 50).forEach(i -> {executorService.submit(() -> {IntStream.range(0, 50).forEach(j -> {System.out.println(Thread.currentThread().getName());});});});executorService.shutdown();//关闭线程池}
}
实际开发中使用线程池
public class MyTest2 {public static void main(String[] args) {ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS,new LinkedBlockingQueue(3), new ThreadPoolExecutor.CallerRunsPolicy());IntStream.range(0, 9).forEach(i -> {executorService.submit(() -> {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}IntStream.range(0, 1).forEach(j -> {System.out.println(Thread.currentThread().getName());});});});executorService.shutdown();}
}
对于线程池来说,其提供了execute与submit两种方式来向线程池提交任务
总体来说,submit方法是可以取代execute方法的,因为它既可以接收Callable任务,也可以接收Runnable任务。
关于线程池的总体执行策略:1. 如果线程池中正在执行的线程数 < corePoolSize,那么线程池就会优先选择创建新的线程而非将提交的任务加到阻塞队列中。
2. 如果线程池中正在执行的线程数 >= corePoolSize,那么线程池就会优先选择对提交的任务进行阻塞排队而非创建新的线程。
3. 如果提交的任务无法加入到阻塞队列当中,那么线程池就会创建新的线程;如果创建的线程数超过了maximumPoolSize,那么拒绝策略就会起作用。关于线程池任务提交的总结:1. 两种提交方式:submit与execute
2. submit有三种方式,无论哪种方式,最终都是将传递进来的任务转换为一个Callable对象进行处理
3. 当Callable对象构造完毕后,最终都会调用Executor接口中声明的execute方法进行统一的处理对于线程池来说,存在两个状态需要维护:1. 线程池本身的状态:ctl的高3位来表示
2. 线程池中所运行着的线程的数量:ctl的其余29位来表示线程池一共存在5种状态:1. RUNNING:线程池可以接收新的任务提交,并且还可以正常处理阻塞队列中的任务。
2. SHUTDOWN:不再接收新的任务提交,不过线程池可以继续处理阻塞队列中的任务。
3. STOP:不再接收新的任务,同时还会丢弃阻塞队列中的既有任务;此外,它还会中断正在处理中的任务。
4. TIDYING:所有的任务都执行完毕后(同时也涵盖了阻塞队列中的任务),当前线程池中的活动的线程数量降为0,将会调用terminated方法。
5. TERMINATED:线程池的终止状态, 当terminated方法执行完毕后,线程池将会处于该状态之下。RUNNING -> SHUTDOWN:当调用了线程池的shutdown方法时,或者当finalize方法被隐式调用后(该方法内部会调用shutdown方法)
RUNNING, SHUTDOWN -> STOP:当调用了线程池的shutdownNow方法时
SHUTDOWN -> TIDYING:在线程池与阻塞队列均变为空时
STOP -> TIDYING:在线程池变为空时
TIDYING -> TERMINATED:在terminated方法被执行完毕时
ForkJoinPool线程池(大量的计算操作会提升性能,如果大量的IO操作不一定提升性能)
把一个大任务fork为多个原子任务,原子任务执行结束后再join为原始的大任务。
例:
public class MyTest3 {public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();MyTask myTask = new MyTask(1, 100);//计算1+2+3...+100的和int result = forkJoinPool.invoke(myTask);System.out.println(result);forkJoinPool.shutdown();//关闭线程池}
}
/**
计算两个数之间所有数字之和
*/
class MyTask extends RecursiveTask<Integer> {private int limit = 4; //定义最小任务单位,4以下的任务不能再fork拆分private int firstIndex;private int lastIndex;public MyTask(int firstIndex, int lastIndex) {this.firstIndex = firstIndex;this.lastIndex = lastIndex;}@Overrideprotected Integer compute() {int result = 0;int gap = lastIndex - firstIndex;boolean flag = gap <= limit;if (flag) {System.out.println(Thread.currentThread().getName());for (int i = firstIndex; i <= lastIndex; ++i) {result += i;}} else {int middleIndex = (firstIndex + lastIndex) / 2;MyTask leftTask = new MyTask(firstIndex, middleIndex);MyTask rightTask = new MyTask(middleIndex + 1, lastIndex);invokeAll(leftTask, rightTask); //invokeAll方法内部会调用compute方法,这一步是fork拆解任务int leftTaskResult = leftTask.join(); //返回左边任务结果int rightTaskResult = rightTask.join(); //返回右边任务结果result = leftTaskResult + rightTaskResult;}return result;}
}
ExecutorCompletionService
线程池默认是按照线程提交的顺序获取到线程结果,使用ExecutorCompletionService可以按照任务执行完成顺序获取到线程结果。使得系统效率更高效。
public class MyTest4 {public static void main(String[] args) throws Exception {ExecutorService executorService = new ThreadPoolExecutor(4, 10, 10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(20), new ThreadPoolExecutor.AbortPolicy());CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);//执行任务IntStream.range(0, 10).forEach(i -> { completionService.submit(() -> { //使用completionService提交任务Thread.sleep((long) (Math.random() * 1000));System.out.println(Thread.currentThread().getName());return i * i;});});//拿到任务结果for (int i = 0; i < 10; ++i) { int result = completionService.take().get();System.out.println(result);}//关闭线程池executorService.shutdown();}
}
ThreadLocalRandom
并发情况下使用ThreadLocalRandom代替Random,特别是ForkJoinTask的时候
public class MyTest5 {public static void main(String[] args) {Random random = new Random();IntStream.range(0, 10).forEach(i -> {System.out.println(random.nextInt(10));});System.out.println("===========");ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();IntStream.range(0, 10).forEach(i -> {System.out.println(threadLocalRandom.nextInt(10));});}
}