这一篇,我们来做一件真正“工程化”的事:
封装一套统一的“生产级线程池”,带线程命名、异常捕获、拒绝策略日志、基础监控与超时控制。
文章会以实际代码为主,你可以直接拷贝到项目中进一步改造。
一、目标:为什么要封装线程池?
先把痛点列清楚:
禁止直接用 Executors 默认工厂
newFixedThreadPool / newCachedThreadPool / newSingleThreadExecutor 都有隐藏坑(无界队列、线程无限增长等)。
线程池要统一管理
不要满项目到处散落 new ThreadPoolExecutor,定位问题非常难。
线程要有“读得懂的名字”
日志里看到的是
pool-1-thread-3完全不直观。
任务异常要统一捕获 + 打日志
默认行为:线程执行 Runnable 的异常如果没捕获,会直接丢掉。
拒绝策略必须有日志/报警
默默丢任务或只抛异常,很难查。
优雅停机 & 监控
服务停止时线程池要正常 shutdown。
至少能看到当前线程数、队列长度、拒绝次数等。
所以我们需要一个:
ThreadPoolManager / ThreadPoolFactory
来统一创建 & 管理线程池。
二、自定义 ThreadFactory:线程命名 + 异常兜底
第一步:让每个线程池的线程名字有语义。
比如:biz-io-1,biz-cpu-2,sched-worker-1。
定义一个简单的 ThreadFactory:
import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class NamedThreadFactory implements ThreadFactory { private final AtomicInteger idx = new AtomicInteger(1); private final String namePrefix; private final boolean daemon; public NamedThreadFactory(String namePrefix) { this(namePrefix, false); } public NamedThreadFactory(String namePrefix, boolean daemon) { this.namePrefix = namePrefix; this.daemon = daemon; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, namePrefix + "-" + idx.getAndIncrement()); t.setDaemon(daemon); // 兜底异常处理,防止异常直接把线程干掉而没人知道 t.setUncaughtExceptionHandler((thread, ex) -> { System.err.println("[UNCAUGHT] Thread = " + thread.getName()); ex.printStackTrace(); // 实际项目中换成日志/报警 }); return t; } }这样:
new NamedThreadFactory("biz-io")
日志里看到的就是类似biz-io-1、biz-io-2,定位问题非常直观。
三、自定义 RejectedExecutionHandler:拒绝时打日志 + 可选回压
第二步:统一处理拒绝策略。
生产一般不直接用 JDK 默认的 AbortPolicy,而是:
- 记录日志 / 打点
- 再选择具体策略(比如 CallerRunsPolicy)
我们可以包装一下:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class LoggedCallerRunsPolicy implements RejectedExecutionHandler { private final String poolName; public LoggedCallerRunsPolicy(String poolName) { this.poolName = poolName; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 这里可以打日志 / 上报监控 System.err.println("[REJECTED] pool=" + poolName + ", active=" + e.getActiveCount() + ", poolSize=" + e.getPoolSize() + ", queueSize=" + e.getQueue().size()); // 回退到调用线程执行,形成反压 if (!e.isShutdown()) { r.run(); } } }这个策略的好处:
- 池子爆了 → 有日志可查 + 有回压
- 调用线程被拖慢 → 上游就自然降速,防止雪崩。
四、封装 ThreadPoolManager:统一出口创建线程池
我们可以做一个“线程池管理类”,按业务分类暴露几个常用线程池:
- CPU 密集型
- IO 密集型
- 定时调度线程池
示例(简单版单例):
import java.util.concurrent.*; public class ThreadPoolManager { private static final int CPU = Runtime.getRuntime().availableProcessors(); // CPU 密集任务线程池 private static final ThreadPoolExecutor CPU_POOL = new ThreadPoolExecutor( CPU + 1, CPU + 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new NamedThreadFactory("biz-cpu"), new LoggedCallerRunsPolicy("biz-cpu") ); // IO 密集任务线程池 private static final ThreadPoolExecutor IO_POOL = new ThreadPoolExecutor( CPU * 2, CPU * 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new NamedThreadFactory("biz-io"), new LoggedCallerRunsPolicy("biz-io") ); // 定时任务线程池 private static final ScheduledThreadPoolExecutor SCHEDULED_POOL = new ScheduledThreadPoolExecutor( CPU, new NamedThreadFactory("sched-worker"), new LoggedCallerRunsPolicy("sched") ); static { // 设置 Scheduled 线程池的策略:定时任务异常不影响后续调度 SCHEDULED_POOL.setRemoveOnCancelPolicy(true); } private ThreadPoolManager() {} public static ExecutorService cpuPool() { return CPU_POOL; } public static ExecutorService ioPool() { return IO_POOL; } public static ScheduledExecutorService scheduledPool() { return SCHEDULED_POOL; } // 优雅停机(可以在 Spring 的 Shutdown Hook 或 main 的 finally 中调用) public static void shutdownAll() { shutdownPool("biz-cpu", CPU_POOL); shutdownPool("biz-io", IO_POOL); shutdownPool("sched", SCHEDULED_POOL); } private static void shutdownPool(String name, ExecutorService pool) { System.out.println("[SHUTDOWN] " + name); pool.shutdown(); try { if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("[SHUTDOWN-NOW] " + name); pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } }之后项目中统一这样用:
ThreadPoolManager.ioPool().submit(() -> { // IO 任务 }); ThreadPoolManager.cpuPool().submit(() -> { // 计算任务 }); ThreadPoolManager.scheduledPool().scheduleAtFixedRate(() -> { // 定时任务 }, 0, 1, TimeUnit.MINUTES);这样全项目的线程池:
- 都走同一套工厂
- 有统一命名
- 有统一拒绝策略日志
- shutdown 时可以统一关闭
五、封装任务:统一异常捕获 + 打日志 + Trace(可选)
默认ThreadPoolExecutor对 Runnable 的异常处理方式是:
如果
run()抛异常而你没 try/catch,异常会从线程栈往上冒到线程,最终打印一次 uncaught exception(或被吞掉),不会再抛回 submit/execute 的调用方。
为了避免任务里有人忘记 try/catch,我们可以封一层:
public class SafeRunnable implements Runnable { private final Runnable delegate; private final String name; public SafeRunnable(Runnable delegate, String name) { this.delegate = delegate; this.name = name; } @Override public void run() { try { delegate.run(); } catch (Throwable e) { System.err.println("[TASK-EXCEPTION] task=" + name + ", thread=" + Thread.currentThread().getName()); e.printStackTrace(); // 这里可以对接日志系统 / 监控告警 } } public static Runnable wrap(Runnable r, String name) { return new SafeRunnable(r, name); } }使用方式:
ThreadPoolManager.ioPool().submit( SafeRunnable.wrap(() -> { // 业务代码,异常不用担心漏日志 int x = 1 / 0; }, "demo-io-task") );六、加入超时控制:Future + 超时 + 降级
对于某些关键任务,如:
下游接口调用
某个批量处理
我们不希望任务无限执行,可以加入 Future 超时控制:
ExecutorService io = ThreadPoolManager.ioPool(); Future<String> future = io.submit(() -> { // 模拟调用下游,耗时不确定 TimeUnit.SECONDS.sleep(5); return "OK"; }); try { String result = future.get(2, TimeUnit.SECONDS); // 最多等 2 秒 System.out.println("result = " + result); } catch (TimeoutException e) { System.err.println("[TIMEOUT] 调用超时,进行降级处理"); future.cancel(true); // 尝试中断任务 } catch (Exception e) { System.err.println("[ERROR] 调用异常"); e.printStackTrace(); }这就是最基础的“线程池级超时 + 降级”。
七、简单监控:在没有 Prometheus 之前先打印指标
可以先提供一个简单的方法,用来定时打印线程池的状态(后面再接监控系统):
public static void logState(String name, ThreadPoolExecutor pool) { System.out.println(String.format( "[POOL] %s | poolSize=%d, active=%d, queue=%d, completed=%d", name, pool.getPoolSize(), pool.getActiveCount(), pool.getQueue().size(), pool.getCompletedTaskCount() )); }然后可以用ScheduledExecutorService定时调用:
ThreadPoolManager.scheduledPool().scheduleAtFixedRate(() -> { ThreadPoolManager.logState("biz-cpu", (ThreadPoolExecutor) ThreadPoolManager.cpuPool()); ThreadPoolManager.logState("biz-io", (ThreadPoolExecutor) ThreadPoolManager.ioPool()); }, 0, 30, TimeUnit.SECONDS);等后期接入:
Micrometer
Prometheus / Grafana
自己的监控平台
都可以复用这些指标。
八、总结:生产级线程池封装的关键点
总结:
- 不要直接用 Executors 默认线程池,自己用 ThreadPoolExecutor + 有界队列。
- 统一线程池出口(ThreadPoolManager),避免到处散落 new。
- 用NamedThreadFactory给线程起有意义的名字,排查问题一眼就能看出哪个池出的事。
- 用自定义RejectedExecutionHandler(如 LoggedCallerRunsPolicy)对拒绝任务打日志 + 回压。
- 用SafeRunnable包装任务,统一捕获异常,防止任务异常悄悄丢失。
- 对重要任务用Future + 超时控制 + 降级,防止线程长期占用。
- 提供定时日志 / 监控方法,查看线程池的队列长度、活跃线程数等指标。
- 在应用优雅停机时统一调用
shutdownAll(),避免线程池悬挂。
做到这些,你从“会用线程池”升级为“能在生产环境放心地用线程池”。