前言
上个月由于业务量突然增大,公司后台系统需要统计每日用户UV,导致系统卡顿明显。
经排查,问题源于单线程计算,造成接口响应极慢、CPU 占用过高,高峰期甚至阻塞主线程,影响其他业务正常运行。
为解决该性能瓶颈,采用线程池异步并行处理日志数据,将日志分片后交给多线程同时去重统计,最后合并结果。既显著提升计算速度,又不阻塞主流程,成功解决高数据量下的基数统计性能问题。
一、实现原理
⚠️核心流程
✔️线程 → 工作队列 → 非核心线程 → 拒绝策略
二、构造方法
1、ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }2、线程池七大核心参数
2.1、corePoolSize:核心线程数
线程池初始化时默认不创建任何线程,只有当任务提交时,才会逐步创建核心线程来执行任务。
2.2、maximumPoolSize:最大线程数
当核心线程全部忙碌、且任务队列已满时,若当前工作线程总数小于maximumPoolSize,则会创建非核心线程执行任务。
2.3、keepAliveTime:非核心线程空闲超时时间
非核心线程的空闲时长超过该值时,会被自动终止并回收;
若corePoolSize = maximumPoolSize(无额外非核心线程),则此参数无效。
2.4、unit:keepAliveTime对应的时间单位(如秒、毫秒、分钟等)
2.5、workQueue:任务等待队列,用于暂存待执行的任务,常见类型:
- ArrayBlockingQueue:有界队列,容量固定。队列满后,会创建非核心线程;若最大线程数也打满,则触发拒绝策略。
- LinkedBlockingQueue:无界队列,容量理论上无限。若任务生产速度远大于消费速度,可能导致内存暴涨、OOM。
- SynchronousQueue:同步移交队列,不做任务缓冲,容量为 0,提交任务必须立刻有线程接手。
2.6、threadFactory:线程创建工厂
- 是创建线程的接口,默认使用
Executors.defaultThreadFactory()。 - 可通过实现
ThreadFactory接口,自定义线程名称、优先级、守护状态等。
2.7、handler:拒绝策略(线程池已满,无法接收新任务时执行)
- AbortPolicy:默认策略,直接抛出
RejectedExecutionException拒绝任务。 - CallerRunsPolicy:由提交任务的主线程自己执行该任务。
- DiscardOldestPolicy:丢弃队列中最老的待执行任务,再尝试提交当前任务。
- DiscardPolicy:直接静默丢弃当前任务,不抛异常、无任何通知。
- 也可通过实现
RejectedExecutionHandler接口,自定义拒绝逻辑。
三、执行方法
1、ThreadPoolExecutor中提供了两种执行任务的方法
- void execute(Runnable command)
- Future submit(Runnable task)
实际上,submit()底层最终还是调用了execute()方法,区别在于它会额外返回一个 Future 对象,用来接收并获取任务的执行结果:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }2、execute和submit区别
| 特性 | execute() | submit() |
|---|---|---|
| 返回值 | void(无返回) | Future<?>(可获取结果) |
| 异常处理 | 异常直接抛出,无法捕获 | 异常封装在 Future 中,调用 get() 时抛出 |
| 适用场景 | 只需执行,不关心结果 | 需要获取返回值或处理异常 |
3、execute分析
3.1、源码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 工作线程数为0,表示线程池处于运行中 int c = ctl.get(); // 如果当前工作线程数 < 核心线程数,则添加工作线程,并把command作为该线程要执行的任务 if (workerCountOf(c) < corePoolSize) { // addWorker判断当前工作线程数是不是超过了核心线程数 // 超过了addWorker返回false,不能直接开启新的线程来执行任务,而是应该先入队 if (addWorker(command, true)) return; // 添加核心工作线程失败,重新获取ctl // 1、线程池状态被其他线程修改了 // 2、其他线程也在向线程池提交任务,导致核心工作线程已经超过了核心线程数 c = ctl.get(); } // 线程池状态是否还是RUNNING,尝试将任务放入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //二次检查:防止在入队过程中线程池被关闭 if (! isRunning(recheck) && remove(command)) reject(command); //判断工作线程数,如果为0,那就添加一个非核心的工作线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // false表示非核心工作线程,addWorker内部会判断当前工作线程数已经超过最大线程数 // 超过了则添加失败,执行拒绝策略 else if (!addWorker(command, false)) reject(command); }4、增加线程方法
4.1、addWorker
addWorker是线程池的核心方法,负责创建并新增工作线程。其中core参数用于标识,本次创建的是核心线程还是非核心线程。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 第一阶段:CAS 增加线程计数 int c = ctl.get(); int rs = runStateOf(c); // 线程池状态 >= SHUTDOWN(SHUTDOWN/STOP/TIDYING/TERMINATED),拒绝创建新线程 // 状态是 SHUTDOWN 且没有新任务(firstTask == null)且队列不为空时,允许创建线程来处理队列中的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 判断工作线程数是否超过了限制 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 超过限制了,则return false return false; // CAS 增加工作线程数 if (compareAndIncrementWorkerCount(c)) // 成功则跳出外层循环 break retry; c = ctl.get(); if (runStateOf(c) != rs) // 状态变化,重新外层循环 continue retry; } } //第二阶段:创建 Worker 并启动 // ctl修改成功,工作线程数+1成功,开启一个新的工作线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // Worker 实现 Runnable 接口,作为线程池的工作单元 // 构造 Worker 对象时,通过 ThreadFactory 创建专属执行线程 // Worker 包含两个核心属性: // Runnable firstTask:Worker 首次执行的任务,后续任务从阻塞队列中获取 // Thread thread:Worker 绑定的执行线程,负责从队列拉取任务并执行 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 再次检查状态(双重检查) // 若线程池已处于 SHUTDOWN 状态,且当前工作线程无初始任务 // 说明本次调用 addWorker 是为了从任务队列中取任务执行 // 正常情况下 SHUTDOWN 状态不允许创建新工作线程,但队列中存在待执行任务时,属于允许创建的特例 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // Worker对象对应的线程已经在运行了,直接抛异常 if (t.isAlive()) throw new IllegalThreadStateException(); // workers记录当前线程池中工作线程 workers.add(w); // largestPoolSize用来跟踪线程池在运行过程中工作线程数的峰值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动 Worker 内部的线程 t.start(); workerStarted = true; } } } finally { // 在上述过程中如果抛了异常,需要从works中移除所添加的work,并且还要修改ctl,工作线程数-1,表示新建工作线程失败 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }⭕addWorker方法的核心执行分两个阶段:
⚠️1、CAS 循环增加工作线程数(第一个 retry 循环)
✔️首先校验当前工作线程数,判断是否超出阈值限制;
✔️通过 CAS 修改ctl变量,完成工作线程计数加1;
⚠️2、加锁(ReentrantLock)创建 Worker 并启动线程(第二个 try 块)
✔️新建 Worker 任务对象,并将其加入线程池的 workers 集合;
✔️最终启动 Worker 内部绑定的工作线程。
5、工作线程执行
5.1、Worker的构造方法
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 重点:把 Worker 自己(this)作为 Runnable 传给线程 this.thread = getThreadFactory().newThread(this); }⚠️注意
✔️addWorker方法中执行构造:w = new Worker(firstTask);
5.2、工作线程运行时,就会执行Worker的run方法
public void run() { // 工作线程运行时的执行逻辑 runWorker(this); }⚠️注意
✔️构建时通过getThreadFactory().newThread(this)将Worker自身作为Runnable传入线程,因此addWorker中调用t.start(),本质是执行当前Worker的run方法。
5.3、runWorker:运行
final void runWorker(Worker w) { // 当前工作线程 Thread wt = Thread.currentThread(); // 把Worker的第一个任务拿,创建的时候赋值的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //当前线程是否有第一个任务,没有就从阻塞队列中获取任务,都没有就阻塞线程 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //留给子类扩展监控逻辑 beforeExecute(wt, task); Throwable thrown = null; try { // 执行用户任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //留给子类扩展监控逻辑 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 清理工作线程 processWorkerExit(w, completedAbruptly); } }⚠️注意
✔️执行 firstTask → 循环从队列取任务(getTask) → 执行任务 → 清理退出
5.4、getTask:获取任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // STOP 状态 或 SHUTDOWN+队列空:工作线程数减1,返回null让线程退出 //由于线程池正在关闭或已停止,所以线程不阻塞,直接退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //工作线程数超过了工作线程的最大限制或者线程超时了,则要修改ctl if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) //return null外层的while循环退出,线程直接运行结束 return null; continue; } try { // 超时阻塞,无限阻塞逻辑 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //阻塞期间获取到了任务 if (r != null) return r; //发生超时后,线程会重新进入循环。 //上方代码会检测到当前线程已阻塞超时,最终返回 null,线程随之正常终止。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }⚠️注意
✔️阻塞线程:基于阻塞队列workQueue.take(),当队列无任务可取时,线程进入阻塞状态。
✔️超时阻塞线程:使用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),超时未获取到任务则退出阻塞。
5.5、processWorkerExit:结束方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //累加完成任务数,供监控使用 completedTaskCount += w.completedTasks; // 从集合中移除 workers.remove(w); } finally { mainLock.unlock(); } //检查是否可以完全终止线程池(队列为空且无工作线程时,将状态转为 TERMINATED)。 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 正常退出才判断 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; //当前工作线程数 >= 最小要求,不需要补充新线程 if (workerCountOf(c) >= min) // 线程数足够,不补充 return; } // 补充新线程 addWorker(null, false); } }⚠️注意
✔️执行流程:统计任务数 → 移除Worker → 尝试终止线程池 → 必要时补充新线程
✔️completedAbruptly = true:执行任务时发生异常,需更新 ctl,将工作线程数 -1
✔️completedAbruptly = false:线程因阻塞超时/中断退出,同样需更新 ctl,将工作线程数 -1
6、关闭线程池
6.1、shutdown:优雅关闭
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //防止多个线程同时调用 shutdown() 导致状态混乱 mainLock.lock(); try { //检查是否有权限关闭线程池(安全管理器) checkShutdownAccess(); // 修改ctl,将线程池状态改为SHUTDOWN advanceRunState(SHUTDOWN); // 中断所有正在等待任务的线程(不是正在执行任务的线程) interruptIdleWorkers(); // 子类扩展使用 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 检查是否可以完全终止线程池(队列为空且无工作线程时,将状态转为 TERMINATED)。 tryTerminate(); }⚠️注意
✔️优雅关闭:中断所有正在等待任务的线程(不是正在执行任务的线程)
6.2、正在执行的任务怎么退出呢?
正在执行的任务不会被中断,退出的关键是:
- 当前任务执行完 → 继续循环
- 尝试取下一个任务 → getTask() 发现线程池已关闭
- 返回 null → 循环结束,线程退出
⚠️注意
✔️在runWorker执行流程中,若getTask检测到线程池为SHUTDOWN状态,会直接返回null,当前工作线程随即正常退出。
6.3、shutdownNow:强制关闭
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改ctl,线程池状态改为STOP,正在执行的任务也会被中断、不再接受新任务、不再处理队列中的任务 advanceRunState(STOP); // 中断所有线程(包括正在执行的) interruptWorkers(); // 清空队列并返回未执行任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 检查是否可以完全终止线程池(队列为空且无工作线程时,将状态转为 TERMINATED)。 tryTerminate(); return tasks; }⚠️注意
✔️强制关闭:不管任务是否在执行,都尝试中断
✔️返回未完成任务:队列中的任务不会丢失,交给调用者处
✔️生产环境一般用 shutdown() 优雅关闭
四、面试题
1、线程池为什么使用阻塞队列?
线程池内的工作线程启动后,在执行完初始化绑定的首个任务后,会持续从阻塞队列中循环获取并执行任务。
当队列暂无待执行任务时,核心线程不会直接销毁,而是在获取任务的逻辑处阻塞等待。一旦队列新增任务,阻塞的线程便可被唤醒,继续处理任务。
借助该阻塞等待机制,线程池得以稳定维持指定数量的核心线程。核心实现代码如下:
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }2、工作线程一旦发生异常,会直接被线程池回收移除吗?
线程执行任务一旦抛出异常,最终会进入processWorkerExit()方法。该方法执行完毕后,当前工作线程会正常消亡。
但关键在于:processWorkerExit()内部会主动补充创建新的工作线程,以此保障线程池核心线程数量稳定,不会因异常线程退出而减少。
⚠️注意
✔️源码参考上文5.5、processWorkerExit方法解析
五、总结
ThreadPoolExecutor依靠 Worker 封装工作线程,通过runWorker循环消费阻塞队列任务。- 核心线程通过
take()永久阻塞常驻,非核心线程依靠poll()超时自动回收。 - 线程池状态变更、队列无任务、阻塞超时或任务异常,都会让线程退出;
processWorkerExit会自动补建新线程,维持核心线程数量。