面試官: 有了解過線程池的工作原理嗎?說說看
前言
目前正在出一個(gè)Java多線程專題長期系列教程,從入門到進(jìn)階含源碼解讀, 篇幅會(huì)較多, 喜歡的話,給個(gè)關(guān)注 ~
本節(jié)主要帶大家從ThreadPoolExecutor源碼角度來了解一下線程池的工作原理,一起來看下吧~
Executor 接口
首先Executor這個(gè)接口是線程池實(shí)現(xiàn)的頂層接口類,我們上節(jié)遇到的ExecutorService也是繼承了Executor
public interface ExecutorService extends Executor {…}
ExecutorService的上層AbstractExecutorService這個(gè)抽象類實(shí)現(xiàn)了接口ExecutorService
public abstract class AbstractExecutorService implements ExecutorService {…}
ThreadPoolExecutor繼承了AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {…}
ThreadPoolExecutor這個(gè)類我們需要重點(diǎn)看一下,它是接口的實(shí)現(xiàn)類,我們以newCachedThreadPool為例
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
可以看到內(nèi)部其實(shí)還是調(diào)用了ThreadPoolExecutor,我們?cè)倏磏ewFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
內(nèi)部也是調(diào)了它, 下面我們就看下這個(gè)類
ThreadPoolExecutor
首先我們從構(gòu)造函數(shù)看起,它主要有四個(gè)構(gòu)造函數(shù)
構(gòu)造函數(shù)一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
一共有五個(gè)參數(shù):
- corePoolSize – 保留在池中的線 程數(shù),即使是空閑的,除非設(shè)置allowCoreThreadTimeOut
- maximumPoolSize – 池中允許的最大線程數(shù)
- keepAliveTime – 當(dāng)線程數(shù)大于核心時(shí),這是多余的空閑線程在終止前等待新任務(wù)的最長時(shí)間。
- unit – keepAliveTime參數(shù)的時(shí)間單位
- workQueue – 用于在執(zhí)行任務(wù)之前保存任務(wù)的隊(duì)列。此隊(duì)列將僅保存由execute方法提交的Runnable任務(wù)。
構(gòu)造函數(shù)二
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
其它參數(shù)同上
- threadFactory 執(zhí)行器創(chuàng)建新線程時(shí)使用的工廠
構(gòu)造函數(shù)三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
- handler 由于達(dá)到線程邊界和隊(duì)列容量而阻塞執(zhí)行時(shí)使用的處理程序
構(gòu)造函數(shù)四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
這個(gè)把之前都綜合了一下,其實(shí)可以看到前幾個(gè)內(nèi)部都調(diào)用了this,調(diào)用自身,也就是調(diào)用這個(gè)構(gòu)造函數(shù),進(jìn)行一些初始化
BlockingQueue 阻塞隊(duì)列
有幾個(gè)參數(shù)比較好理解,我們來看下這個(gè)參數(shù)workQueue, 它是一個(gè)阻塞隊(duì)列,這里簡(jiǎn)要給大家提一下,這塊內(nèi)容也比較重要,后邊會(huì)專門去講
BlockingQueue本身是一個(gè)還接口,它有幾個(gè)比較常用的阻塞隊(duì)列
- LinkedBlockingQueue 鏈?zhǔn)阶枞?duì)列,底層數(shù)據(jù)結(jié)構(gòu)是鏈表
- ArrayBlockingQueue 數(shù)組阻塞隊(duì)列,底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組,需要指定隊(duì)列的大小。
- SynchronousQueue 同步隊(duì)列,內(nèi)部容量為0,每個(gè)put操作必須等待一個(gè)take操作
- DelayQueue 延遲隊(duì)列,該隊(duì)列中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲取到該元素
ThreadFactory 線程工廠
這個(gè)是線程工廠類,統(tǒng)一在創(chuàng)建線程時(shí)設(shè)置一些參數(shù),如是否守護(hù)線程、線程的優(yōu)先級(jí)等, 同樣它也是一個(gè)接口,我們?cè)赥hreadPoolExecutor內(nèi)部看到了 Executors.defaultThreadFactory(),這個(gè)是一個(gè)默認(rèn)工廠
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = “pool-” + poolNumber.getAndIncrement() + “-thread-“; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
沒有指定參數(shù),就會(huì)默認(rèn)創(chuàng)建DefaultThreadFactory,還有其它的factory,大家可以自行看下,區(qū)別就在創(chuàng)建線程時(shí)指定的參數(shù)
RejectedExecutionHandler 拒絕策略
RejectedExecutionHandler同樣是一個(gè)接口,這個(gè)處理器是用來專門處理拒絕的任務(wù),也就是ThreadPoolExecutor無法處理的程序。同理,我們可以看到ThreadPoolExecutor內(nèi)部有調(diào)了defaultHandler
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
這個(gè)是默認(rèn)的拒絕策略, 可以看到它的默認(rèn)處理是拋出拒絕的異常
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(“Task ” + r.toString() + ” rejected from ” + e.toString()); } }
再帶大家看下另外的策略, DiscardPolicy,這個(gè)策略不會(huì)拋出異常,它會(huì)丟棄這個(gè)任務(wù)
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy 該策略丟棄最舊的未處理請(qǐng)求,然后重試execute ,除非執(zhí)行程序被關(guān)閉,在這種情況下任務(wù)被丟棄。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 判斷是否關(guān)閉 if (!e.isShutdown()) { e.getQueue().poll(); // 任務(wù)重試 e.execute(r); } } }
CallerRunsPolicy 直接在execute方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù),除非執(zhí)行程序已關(guān)閉,在這種情況下,任務(wù)將被丟棄。
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接判斷是否關(guān)閉 未關(guān)閉就執(zhí)行 if (!e.isShutdown()) { r.run(); } } }
線程調(diào)度策略
看完構(gòu)造函數(shù),下面看下它的一些常量
private static final int COUNT_BITS = Integer.SIZE – 3;// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
通過變量名,我們大致知道是用來表示線程池的狀態(tài)。線程池本身有一個(gè)調(diào)度線程,這個(gè)線程就是用于管理整個(gè)線程池的各種任務(wù)和事務(wù),例如創(chuàng)建線程、銷毀線程、任務(wù)隊(duì)列管理、線程隊(duì)列管理等等,所以它本身也有上面的狀態(tài)值。
當(dāng)線程池被創(chuàng)建后就會(huì)處于RUNNING狀態(tài), 主池控制狀態(tài)ctl是一個(gè)原子整數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
調(diào)用shutdown()方法后處于「SHUTDOWN」?fàn)顟B(tài),線程池不能接受新的任務(wù),清除一些空閑worker,不會(huì)等待阻塞隊(duì)列的任務(wù)完成。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
另外,還有一個(gè)shutdownNow,調(diào)用后處于「STOP」?fàn)顟B(tài),線程池不能接受新的任務(wù),中斷所有線程,阻塞隊(duì)列中沒有被執(zhí)行的任務(wù)全部丟棄。此時(shí),poolsize=0,阻塞隊(duì)列的size也為0。
public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷所有線程 interruptWorkers(); // 將任務(wù)隊(duì)列排空到一個(gè)新列表中 這里要注意下 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)椤窽IDYING」?fàn)顟B(tài)。接著會(huì)執(zhí)行terminated()函數(shù)。
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 不等于0時(shí) 中斷任務(wù)線程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 將狀態(tài)設(shè)置為 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 終止 terminated(); } finally { // 執(zhí)行完 terminated 轉(zhuǎn)為 TERMINATED狀態(tài) ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
execute
這個(gè)是執(zhí)行任務(wù)的核心方法,我們一起看一下
public void execute(Runnable command) { // 如果任務(wù)不存在 拋空異常 if (command == null) throw new NullPointerException(); // 獲取當(dāng)前狀態(tài)值 int c = ctl.get(); // 當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker創(chuàng)建核心線程執(zhí)行任務(wù) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果不小于corePoolSize,則將任務(wù)添加到workQueue隊(duì)列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果isRunning返回false(狀態(tài)檢查),則remove這個(gè)任務(wù),然后執(zhí)行拒絕策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 線程池處于running狀態(tài),但是沒有線程,則創(chuàng)建線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果放入workQueue失敗,則創(chuàng)建非核心線程執(zhí)行任務(wù), else if (!addWorker(command, false)) // 如果這時(shí)創(chuàng)建失敗,就會(huì)執(zhí)行拒絕策略。 reject(command); }
在源碼中,我們可以看到,多次進(jìn)行了isRunning判斷。在多線程的環(huán)境下,線程池的狀態(tài)是多變的。很有可能剛獲取線程池狀態(tài)后線程池狀態(tài)就改變了
總結(jié)
下面給大家簡(jiǎn)要的總結(jié)一下線程池的處理流程
結(jié)束語
它的源碼還是比較長的,一篇文章說不清楚,有興趣的同學(xué)可以通過本篇文章的理解繼續(xù)閱讀它的源碼。
下一節(jié), 繼續(xù)帶大家詳探討ThreadPoolExecutor中是如何進(jìn)行線程復(fù)用 ~
往期內(nèi)容推薦
- Java多線程專題之線程與進(jìn)程概述
- Java多線程專題之線程類和接口入門
- Java多線程專題之進(jìn)階學(xué)習(xí)Thread(含源碼分析)
- Java多線程專題之Callable、Future與FutureTask(含源碼分析)
- 面試官: 有了解過線程組和線程優(yōu)先級(jí)嗎
- 面試官: 說一下線程的生命周期過程
- 面試官: 說一下線程間的通信
- 面試官: 說一下Java的共享內(nèi)存模型
- 面試官: 有了解過指令重排嗎,什么是happens-before
- 面試官: 有了解過volatile關(guān)鍵字嗎 說說看
- 面試官: 有了解過Synchronized嗎 說說看
- Java多線程專題之Lock鎖的使用
- 面試官: 有了解過ReentrantLock的底層實(shí)現(xiàn)嗎?說說看
- 面試官: 有了解過CAS和原子操作嗎?說說看
- Java多線程專題之線程池的基本使用
- 我的博客(閱讀體驗(yàn)較佳)
- 寫給初學(xué)者的Java基礎(chǔ)教程
- 一文帶你快速學(xué)習(xí)Java集合類
- 花幾分鐘快速了解一下泛型與枚舉
- Java注解與反射入門到進(jìn)階
- JavaIO教程從入門到進(jìn)階
項(xiàng)目源碼(源碼已更新 歡迎star )
- java-thread-all
- 地址: https://github.com/qiuChengleiy/java-thread-all.git
推薦 SpringBoot & SpringCloud (源碼已更新 歡迎star )
- springboot-all
- 地址: https://github.com/qiuChengleiy/springboot-all.git
- SpringBoot系列教程合集
- 一起來學(xué)SpringCloud合集