一、QuartzSchedulerThread 的 run方法大致闡述
先說(shuō)一下run方法的執(zhí)行時(shí)機(jī):
當(dāng) Quartzscheduler執(zhí)行start方法 時(shí),方法體中有一句
schedThread.togglePause(false); ,接著就會(huì)調(diào)用 QuartzSchedulerThread 下的 togglePause 方法,將 paused 置為 false ,在此之后, QuartzSchedulerThread 下的 run 方法開(kāi)始真正運(yùn)行
/**通知主處理循環(huán)在下一個(gè)可能的點(diǎn)暫停 */ void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } }復(fù)制代碼 public void run() { int acquiresFailed = 0; // 這里就是判斷調(diào)度器是否該停止,如果沒(méi)有收到信號(hào)的話,這個(gè)調(diào)度器是一直處于循環(huán)之中的 while (!halted.get()) { try { // 這里是檢查我們是否應(yīng)該暫停 synchronized (sigLock) { // 我們?cè)诔跏蓟臅r(shí)候,paused 是置為 true的, // 因此在上文中,我們才說(shuō) // 當(dāng) Quartzscheduler 執(zhí)行 start方法時(shí)調(diào)用togglePause, // 將 paused 置為false,run 方法才開(kāi)始運(yùn)行 // 也是因?yàn)榇颂幍呐袛? while (paused && !halted.get()) { try { sigLock.wait(1000L); }catch (InterruptedException ignore) {} acquiresFailed = 0; } if (halted.get()) { break;} } // 如果從作業(yè)存儲(chǔ)中讀取一直失?。ɡ鐢?shù)據(jù)庫(kù)關(guān)閉或重新啟動(dòng)) // 就會(huì)等待一段時(shí)間~ if (acquiresFailed > 1) { try { //這里就是計(jì)算延遲時(shí)間 long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) {} } // 從線程池拿出空閑可利用的線程數(shù)量 // 這里多談一嘴 blockForAvailableThreads()方法 // 它是一個(gè)阻塞式方法,直到至少有一個(gè)可用線程。 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { List triggers; long now = System.currentTimeMillis(); // 清除信號(hào)調(diào)度變更 clearSignaledSchedulingChange(); try { //如果可用線程數(shù)量足夠那么就是30后再次掃描, //acquireNextTriggers方法的三個(gè)參數(shù)的意思分別是: //idleWaitTime :為如果沒(méi)有的再次掃描的時(shí)間,默認(rèn)是 // private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L; 30秒 //Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) :這里的意思就是一次最多能取幾個(gè)出來(lái) //batchTimeWindow :默認(rèn)是0,同樣是一個(gè)時(shí)間范圍, //如果有兩個(gè)任務(wù)只差一兩秒,而執(zhí)行線程數(shù)量滿(mǎn)足及batchTimeWindow時(shí)間也滿(mǎn)足的情況下就會(huì)兩個(gè)都取出來(lái) // 具體的方法的執(zhí)行,后文再看~ triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()){ //… } //在獲取到 triggers 觸發(fā)器不為空后, //trigger列表是以下次執(zhí)行時(shí)間排序查出來(lái)的 if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); //取出集合中最早執(zhí)行的觸發(fā)器 //獲取它的下一個(gè)觸發(fā)時(shí)間 long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime – now; // 判斷距離執(zhí)行時(shí)間是否大于2 毫秒 while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } //判斷是不是距離觸發(fā)事件最近的, if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // 沒(méi)有的話,就進(jìn)行阻塞,稍后進(jìn)行執(zhí)行 now = System.currentTimeMillis(); timeUntilTrigger = triggerTime – now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) {} } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime – now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to ‘executing’ List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { //開(kāi)始根據(jù)需要執(zhí)行的trigger從數(shù)據(jù)庫(kù)中獲取相應(yīng)的JobDetail 同時(shí)這一步也更新了 triggers 的狀態(tài),稍后會(huì)講到~ List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( “An error occurred while firing triggers ‘” + triggers + “‘”, se); for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } //將查詢(xún)到的結(jié)果封裝成為 TriggerFiredResult for (int i = 0; i 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime – now; synchronized(sigLock) { // …. } } // …. } // while (!halted) // …. }復(fù)制代碼
二、一些細(xì)節(jié)
2.1、先獲取線程池中的可用線程數(shù)量
(若沒(méi)有可用的會(huì)阻塞,直到有可用的);
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); 復(fù)制代碼
2.2、獲取 30m 內(nèi)要執(zhí)行的 trigger
(即 acquireNextTriggers )
我們來(lái)看一看 acquireNextTriggers 方法
首先說(shuō) acquireNextTriggers 具體實(shí)現(xiàn)是在 JobStoreSupport 中,同時(shí) quartz 與數(shù)據(jù)庫(kù)關(guān)聯(lián)的實(shí)現(xiàn)大都在 JobStoreSupport 中,當(dāng)然更具體的SQL執(zhí)行還是在 DriverDelegate 接口下的。
acquireNextTriggers 做了哪些事情呢?
我們看看這兩個(gè)方法:
首先看第一個(gè) acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
主要就是獲取下一個(gè) 30m內(nèi)可執(zhí)行的triggers的觸發(fā)器,在里面 JobStoreSupport 從數(shù)據(jù)庫(kù)取出 triggers 時(shí)是按照 nextFireTime 排序的
更具體的就需要大家點(diǎn)進(jìn)方法去看啦~另外里面還包含triggers狀態(tài)的變更,屬于是更加細(xì)節(jié)化的東西。
第二個(gè)就是獲取到觸發(fā)的觸發(fā)記錄~
然后在執(zhí)行 executeInNonManagedTXLock 時(shí),是需要先獲得鎖,之后再在提交時(shí)釋放鎖的。
待直到獲取的trigger中最先執(zhí)行的trigger在2ms內(nèi);
if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime – now; while(timeUntilTrigger > 2) { //… } }復(fù)制代碼
2.3、triggersFired(triggers)
List res = qsRsrcs.getJobStore().triggersFired(triggers);
這一步看著只是獲取了 List 對(duì)象,實(shí)際上在 triggersFired(triggers) 方法中隱藏了很多東西~
首先查詢(xún),確保觸發(fā)器沒(méi)有被刪除、暫?;蛲瓿?#8230;,就更新 firedTrigger 的 status=STATE_EXECUTING ;代碼的注釋上還說(shuō),如果沒(méi)有這些就會(huì)將狀態(tài)該為 deleted
另外就是更新觸發(fā)觸發(fā)器:
如果下一次的執(zhí)行時(shí)間為空,狀態(tài)則改為 STATE_COMPLETE
在執(zhí)行 executeInNonManagedTXLock 方法時(shí),提交前先獲得鎖, transOwner = getLockHandler().obtainLock(conn, lockName);
最后是釋放鎖: commitConnection(conn);
2.4、創(chuàng)建JobRunShell,放進(jìn)線程池執(zhí)行
針對(duì)每個(gè)要執(zhí)行的trigger,創(chuàng)建JobRunShell,并放入線程池執(zhí)行:
然后由execute:執(zhí)行job
更詳細(xì)的看不下去啦~
來(lái)源:https://juejin.cn/post/7131671438901116935