完成 ISSUES #815 。
主要内容如下:
- 添加了插件接口
ThreadPoolPlugin
,插件管理器 ThreadPoolManager
,以及支持使用借助插件接口对特定操作点进行扩展的线程池实现 ExtensibleThreadPoolExecutor
;
- 对
DynamicThreadPoolExecutor
进行了重构,另其不再继承 AbstractDynamicExecutorSupport
,而是继承 ExtensibleThreadPoolExecutor
,并将其原有的扩展功能抽离为默认注册的插件,以便不影响其原有的 API;
插件接口
本次重构引入的插件机制,本质上是一套类似 Spring 的 Aware
或者 PostProcessor
,又或者 mybatis 的 Interceptor
这样的回调接口体系。
扩展的线程池 ExtensibleThreadPoolExecutor
重写了 ThreadPoolExecutor
的一些方法,在这些方法执行前后会回调对应类型的回调接口中的方法,从而允许用户通过向线程池注册回调接口——也就是所谓的插件——的方式植入所需的逻辑。
目前支持的扩展点有四种:
-
TaskAwarePlugin
:提供在 Callable
和 Runable
任务提交到线程池前调用,可以用它来包装或者替换提交进线程池的任务:
beforeTaskSubmit()
:在通过 ThreadPoolExecutor.submit
方法提交Callable
或 Runable
后, taskFor
创建任务前调用;
beforeTaskExecute()
:在通过 ExecuteService.execute
方法提交 Runable
任务前调用;
-
ExecuteAwarePlugin
:在线程池中的线程执行前后调用,提供两个方法:
beforeExecute()
:任务执行前调用;
afterExecute()
:任务执行后调用;
-
ShutdownAwarePlugin
:当线程池调用 shutdown
或者 shutdownNow
前后,以及线程池终止的时候调用,提供三个方法:
beforeShutdown()
:调用 shutdown()
或者 shutdownNow()
前调用;
afterShutdown()
:调用 shutdown()
或者 shutdownNow()
后调用;
afterTerminated()
:线程池终止后调用;
-
RejectAwarePlugin
:提供 beforeRejectedExecution()
一个方法,当触发拒绝策略后,在调用策略的 RejectExecuteHandler.rejectedExecution()
方法前调用;
执行流程
在线程池创建时,每个 ExtensibleThreadPoolExecutor
都会先创建一个插件管理器 ThreadPoolPluginManager
并绑定,用于对注册到线程池中插件进行管理。
当提交任务后,线程池在进行每一步支持回调的操作前,都会向插件管理器申请实现了与扩展点对应接口的插件,并调用插件方法,插件本身可以是单例也可以是多例的,因此也可以通过插件进一步了解单个或多个线程池的一些状态信息。
举个例子,如果我们希望能够有个插件能够记录线程池任务异常任务数量,可以这么做:
// 1、创建插件类,实现ExecuteAwarePlugin接口,重写afterExecute方法
@Getter
public class FailTaskCountRecordPlugin implements ExecuteAwarePlugin {
private final AtomicInteger failCount = new AtomicInteger(0);
private final String id = "TestExecuteAwarePlugin";
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
if (Objects.nonNull(throwable)) {
failCount.incrementAndGet();
}
}
}
// 2、注册到线程池
FailTaskCountRecordPlugin plugin = new FailTaskCountRecordPlugin();
executor.register(plugin);
executor.execute(() -> {throw new IllegalArgumentException("???");});
executor.execute(() -> {throw new IllegalArgumentException("???");});
ThreadUtil.sleep(50L);
// 3、获取插件实例,并得到数据
plugin = executor.getPluginOfType("TestExecuteAwarePlugin", FailTaskCountRecordPlugin.class).orElse(null);
plugin.getFailCount().get(); // = 2
功能分离
本次重构基于上述机制,将原有 DynamicThreadPoolExecutor
的一些默认功能也抽离为插件:
TaskDecoratorPlugin
:在提交任务前,对任务进行装饰;
TaskRejectCountRecordPlugin
:记录线程池拒绝任务数;
TaskTimeRecordPlugin
:记录线程池任务的最大、最小、总体执行时间;
TaskTimeoutNotifyAlarmPlugin
:当任务执行超时时进行告警;
ThreadPoolExecutorShutdownPlugin
:在线程池关闭后,等待线程池在指定时间内完成任务;
DynamicThreadPoolExecutor
作为 ExtensibleThreadPoolExecutor
的子类,创建时会默认注册上述插件,并且仍然支持以原有的 API 访问和设置上述功能的相关信息及配置。