5. Executor 和 ThreadPoolExecutor
Executor 和 ThreadPoolExecutor 实现的是线程池,主要作用是支持高并发的访问处理。<br>
Executor 是一个接口,与线程池有关的大部分类都实现了此接口。<br>ExecutorService 是 Executor 的子接口;AbstractExecutorService 是 ExecutorService 的实现类,但是是抽象类。<br>ThreadPoolExecutor 是 AbstractExecutorService 的子类,可实例化。<br>
Executors介绍
newCachedThreadPool() <br>返回 ExecutorService 实例,创建无界限线程池,理论上线程最大个数是 Integer.MAX_VALUE<br>
newFixedThreadPool(int) <br>返回 ExecutorService 实例,创建的是有界线线程池,即线程个数可以指定最大数量,如果超过最大数量,则后加入的线程需要等待。<br>
newSingleThreadExecutor() <br>创建单一线程池,实现以队列的方式来执行任务<br>
6. Future 和 Callable
ExecutorService的submit、 execute区别<br><br>
execute <br>方法没有返回值,不能直接捕获异常,但可以通过自定义ThreadFactory的方式捕获异常;<br>
submit <br>方法有返回值,可以直接使用 catch Execution-Exception捕获异常。<br>
Future 的常用api<br>
get() <br>获取线程返回值,阻塞执行<br>
get(long timeout, TimeUnit unit) <br>获取线程返回值(有超时时间),阻塞执行<br>
cancel(boolean mayInterruptIfRunning) <br>取消执行,入参为true表示,如果线程正在运行则中断执行;为false表示,如果线程没有在运行才中断继续执行;返回值表示取消执行命令是否成功完成<br>
isCancelled() <br>是否已取消执行<br>
isDone() <br>如果完成此任务,则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true。<br>
jdk1.5中可以使用Future 和 Callable 来获取线程返回值。
7. CompletionService
以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样就可以将执行任务与处理任务分离开。
submit() <br>将异步执行任务提交到管理器中
take() <br>返回的是 最先完成任务的 Future 对象,take() 方法时阻塞执行的<br>
poll() <br>返回的Future可能为 null,因为poll() 是非阻塞执行的。<br>
8. Fork/Join
jdk1.7中提供了Fork/Join并行执行任务框架,主要作用就是把大任务分割成若干个小任务,再对每个小任务得到的结果进行汇总。<br><br>
抽象类ForkJoinTask 类
子类CountedCompleter
子类RecursiveAction
执行的任务具有无返回值,且仅执行一次
子类RecursiveTask
执行任务可以通过方法 join() 或者 get() 取得方法返回值。
1. Semaphore-信号量
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
acquire() <br>获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire(int permits) <br>获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquireUninterruptibly() <br>获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
tryAcquire()<br>尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)<br>尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
release()<br>释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
release(int permits)<br>释放n个令牌,唤醒一个获取令牌不成功的阻塞线程。
hasQueuedThreads()<br>等待队列里是否还存在等待线程。
availablePermits()<br>返回可用的令牌数量。
getQueueLength()<br>获取等待队列里阻塞的线程数。
drainPermits()<br>清空令牌把可用令牌数置为0,返回清空令牌的数量。
应用场景:限流
2. Exchanger-交换者
线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
V exchange(V v)<br>等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。<br><br>
V exchange(V v, long timeout, TimeUnit unit)<br>等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。<br>
应用场景:校对,比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致
3. CountDownLatch、CyclicBarrier及Phaser
CountDownLatch-闭锁
同步功能的辅助类,初始化时传入一个int类型的count值,当count计数不为0时,则当前线程呈wait状态,如果为0,则继续执行。对应countDown()方法的理解就是,相当于一辆长途汽车,只有人坐满了,才会出发(所有线程报团一起继续执行)
调用await()方法,判断count 是否为0,如果不为0则等待(一直等到count值为0时,继续往下执行);await 还有一个api:await(long timeout,TimeUnit unit)。
调用countDown()方法将count计数减1,当count减到0时,线程继续运行。可使用getCount()方法获取count值。
应用场景:确保某个服务在其依赖的所有其他服务都已经启动之后才启动,等待直到某个操作的所有参与者都就绪在继续执行。(例如:多人游戏中需要所有玩家准备才能开始)
CyclicBarrier-栅栏
同步辅助类
应用场景:比如几个家庭成员决定在某个地方集合,所有人在6:00在某地集合,到了以后要等待其他人,之后才能讨论去哪里吃饭。 并行迭代,将一个问题分成很多子问题,当一系列的子问题都解决之后(所有子问题线程都已经await()),此时将栅栏打开,所有子问题线程被释放,而栅栏位置可以留着下次使用。<br><br>
Phaser
始于jdk1.7 是对CountDownLatch 与 CyclicBarrier 的全面升级,是一个java并发api的一个重量级类
arriveAndAwaitAdvance() <br>每凑齐指定人数就报团执行一次,同一个线程可以执行多次arriveAndAwaitAdvance(),表示不同阶段的报团<br><br>
arriveAndDeregister() <br>退出当前团,且当前团规则人数减1(报完当前团后,不再报下阶段的团)<br><br>
getArrivedParties() <br>当前团凑足了多少人<br><br>
getRegisteredParties() <br>获取注册的团规定人数<br><br>
arrive() <br>使getArrivedParties()数量加1,即用一个虚拟线程占据一个线程的位置, 此虚拟线程不阻塞<br><br>
register() <br>动态增加一个团的规定人数<br><br>
bulkRegister(int parties) <br>动态的增加规定报团人数,是register()的多次调用版<br><br>
forceTermination() <br>取消报团,线程执行各自代码,不再有Phaser阻塞等待情况 <br><br>
getUnarrivedParties() <br>当前还差多少线程开团,是getArrivedParties()方法的补集<br><br>
isTerminated() <br>判断Phaser对象是否已为销毁状态<br><br>
4. ScheduleExecutorService
背景:Java中定时任务Timer工具类提供了计划任务的实现,但是Timer工具类是以队列的方式来管理线程的,并不是以线程池的方式,这样在高并发的情况下,运行效率会有点低。
作用是将定时任务与线程池结合使用。
execute(Runnable command) <br>直接执行,执行命令所需的延迟为零<br>
getQueue() <br>返回 BlockingQueue<Runnable> 返回此执行器使用的任务队列<br>
schedule(Callable<V> callable, long delay, TimeUnit unit) <br> 创建并执行给定延迟后启用的计划任务<br>
schedule(Runnable command, long delay, TimeUnit unit) <br>创建并执行给定延迟后启用的计划任务<br>
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) <br>创建并执行一个周期性操作,该操作首先在给定的初始延迟之后启用,然后以给定的周期启用<br>
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) <br>创建并执行周期性动作,主要左右那个是设置多个任务之间固定的运行时间间隔。<br>
shutdown() <br>启动一个有序的关闭,其中先前提交的任务被执行,但是没有新任务被接受<br>
shutdownNow() <br>尝试停止所有积极执行任务,停止等待任务的处理,并返回等待执行的任务的列表<br>
submit(Callable<T> task) <br>提交一个返回值的任务用于执行<br>
submit(Runnable task) <br>提交可执行任务以执行<br>
submit(Runnable task, T result) <br>提交可执行任务以执行<br>