动态线程池oneThread系统
基本概念
什么是线程池(Thread Pool)
是一种基于池化思想管理线程的工具,线程池维护多个线程,等待管理者分配可并发执行的任务
线程池的作用
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池解决的问题
- 防止频繁申请/销毁资源和调度资源
- 抑制对资源无限的申请
- 合理管理内部的资源分布
虚拟线程
为了突破传统平台线程在内存占用、上下文切换以及线程池参数难以评估的瓶颈,Java引入了虚拟线程,以更轻量的方式实现高并发,解决“一请求一线程”模式下资源消耗过大的问题
- 和传统线程的区别:不再一一对应 OS 线程,而是大量虚拟线程映射到少量线程上。当虚拟线程执行阻塞操作时,JVM 会自动将其挂起并释放底层的 OS 线程供其他虚拟线程使用。因此内存占用少、支持百万级并发线程
- 优势区间:有大量IO密集型任务时,省去了切换线程的开销
- 问题:
- 固定平台线程(pinning)。如虚拟线程在获取synchronized锁后进入阻塞状态,这时载体线程也必须一起阻塞,而不能释放资源,导致虚拟线程的优势消失
- 无限创建虚拟线:虚拟线程的占用很低,可以一下大量创建很多,因此如果不加限制,会导致竞争资源被一下消耗光
- 技术相对较新,还不是很稳定、成熟,老版本JDK不支持
Java中线程池的实现
Java中的线程池核心实现类是ThreadPoolExecutor
运行流程

运行机制
维护线程池状态
通过AtomicInteger变量ctl来保存:
- 运行状态(高3位)
- 线程数量(后29位)
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源
运行状态种类
| 运行状态 | 状态描述 |
|---|---|
| RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务。 |
| SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。 |
| STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。 |
| TIDYING | 所有的任务都已终止了,workerCount (有效线程数) 为 0。 |
| TERMINATED | 在 terminated () 方法执行完后进入该状态。 |
执行任务
workerCount:当前线程数
corePoolSize:核心线程数(会被保留下来的线程数量)
maximumPoolSize:能存在的最大线程数
- workerCount < corePoolSize:创建线程来执行
- workerCount >= corePoolSize:放到阻塞队列里排队
- workerCount >= corePoolSize 且阻塞队列满了:创建新线程来执行
- workerCount >= corePoolSize 且阻塞队列满了,且workerCount >= maximumPoolSize:根据拒绝策略来处理该任务
拒绝任务策略
当阻塞队列满了 且 线程数达到最大值 时会触发拒绝策略
| 名称 | 描述 | |
|---|---|---|
| 1 | ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出 RejectedExecutionException 异常。是线程池默认的拒绝策略。 |
| 2 | ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常。 |
| 3 | ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务。 |
| 4 | ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。 |
工作线程Worker
有两个变量:
- thread:线程
- firstTask:执行的任务
如果是核心线程,会无限轮询获取任务;
如果是非核心线程,当无法获取到新任务时,轮询会结束,并主动消除在线程池里的引用,被JVM回收;
实际Worker线程并没有被标记是否为核心线程:
当线程数 <= 核心线程数时,线程会阻塞直到获取到任务
当线程数 > 核心线程数时,让线程有限等待获取任务,超时就返回null,让线程自己销毁
阻塞队列
要根据实际情况来选择
| 名称 | 描述 |
|---|---|
| ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。支持公平锁和非公平锁。 |
| LinkedBlockingQueue | 一个由链表结构组成的有界队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,所以默认创建的该队列有容量危险。 |
| PriorityBlockingQueue | 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo () 方法来指定元素排序规则,不能保证同优先级元素的顺序。 |
| DelayQueue | 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。 |
| SynchronousQueue | 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue 的一个使用场景是在线程池里。Executors.newCachedThreadPool () 就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了 60 秒后会被回收。 |
| LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue 队列多了 transfer 和 tryTransfer 方法。 |
| LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。 |
线程池的使用场景
场景1:快速响应用户请求
一个网页有多个数据需要查询时,可用线程池并行查询;
这种场景重要的响应速度,所以应不设置阻塞队列(SynchronousQueue,容量为0的队列),而是适当调大核心线程数和最大线程数;
场景2:快速处理批量任务
离线的大量计算任务;
目标是尽可能在单位时间内处理更多的任务,并不需要瞬时的完成,因此需要设置阻塞队列;
具体实现
线程池
1 | |
工作线程(Worker)
所有的Worker存储在一个HashSet中
1 | |
Wroker构造函数的主要组成
1 | |
锁
整个线程池提供一把主锁 mainLock,每次操作或读取这种全局性变量的时候,都需要获取主锁才能进行
1 | |
存储线程池状态
通过AtomicInteger的ctl来存储线程池状态(前3位)和工作线程数(后29位)
1 | |
停止线程
由最后一个执行完任务的线程来调用tryTerminate()终止线程池
- shutdown():不接收新任务,等待已有任务完成
- shutdownNow():不接收新任务,中断已有任务
1 | |
任务调度
1 | |
项目概述
oneThread 是基于配置中心 构建的 动态可观测 Java 线程池框架,弥补了 JDK 原生线程池参数配置不灵活的不足 ,支持核心参数的在线动态调整 、运行时状态监控 与 阈值告警 ,有效提升系统的稳定性与可运维性。框架兼容主流配置中心如 Nacos、Apollo,实现线程池参数的热更新
实现原理
读取远端配置
绑定配置到对象
Nacos配置本质上是字符串形式的键值对形式,通过绑定到类BootstrapConfigProperties上来便于类型转换、代码提示和后续维护
非Spring环境全局共享变量
而BootstrapConfigProperties类是在非Spring环境的core包下,通过在Bean装配的时候赋值给静态单例变量来实现全局共享
如果是Spring环境下,用@ConfigurationProperties(prefix = “onethread”)就能自动绑定对应前缀的配置
1 | |
感知远端配置变化
通过自定义的starter,自动装配
NacosCloudRefresherHandler这个Bean;这个Bean实现了
ApplicationRunner接口,当Spring容器初始化完成后,会注册一个监听来监听Nacos远端参数的变化;当远端参数发生变化后,监听器会感知到,并将传来的字符串形式的配置信息,通过
Binder绑定更新到配置类中发布更新事件
保障并发更新线程池参数
通过接口
ApplicationListener监听参数更新的事件来触发更新可能存在多个用户同时更新线程池参数的情况,因此用
synchronized在线程池Id上加锁,实现细粒度的上锁1
2
3// intern() 保证是从字符串常量池里拿到的同一个对象
synchronized (threadPoolId.intern()) {
...比较远端参数和本地的线程池参数是否有变化
修改本地的线程池参数:主要的是核心线程数和最大核心数,若新核心线程数大于旧最大核心数,就需要先设置最大核心数;否则就先设置和新线程数(因为新最大线程数可能调小了,会让旧核心线程数超过最大线程数)
线程池的核心线程数必须 <= 最大线程数,否则会报错
阻塞队列热更新
在请求的高峰期和低谷期需要动态地设置队列容量,防止高峰流量造成服务雪崩
Java原生的阻塞队列是不能动态调整容量的
在原生
LinkedBlockingQueue的基础上进行修改主要修改部分:
去掉了
capacity的final关键字,让容量可修改在修改容量
capacity后,主动去唤醒等待队列里的线程1
2
3
4
5
6
7
8public void setCapacity(int capacity) {
final int oldCapacity = this.capacity;
this.capacity = capacity;
final int size = count.get();
if (capacity > size && size >= oldCapacity) {
signalNotFull();
}
}调整进入已满等待队列的进入条件
1
2
3
4// 原本的是count.get() == capacity
while (count.get() >= capacity) {
notFull.await();
}修改后,若调小容量,也能阻塞线程
线程池监控
通过定时任务扫描,对线程池活跃度、队列负载和拒绝异常三个方向进行监控
- 活跃度:活跃线程数 / 最大线程数
- 队列负载:任务数 / 队列容量
- 拒绝异常:监控到新的
RejectedExecutionException
实现:
监控
通过JDK提供的
ScheduledExecutorService实现定时任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public void start() {
// 每10秒检查一次,初始延迟0秒
scheduler.scheduleWithFixedDelay(this::checkAlarm, 0, 5, TimeUnit.SECONDS);
}
// 定时任务
private void checkAlarm() {
Collection<ThreadPoolExecutorHolder> holders = OneThreadRegistry.getAllHolders();
for (ThreadPoolExecutorHolder holder : holders) {
if (holder.getExecutorProperties().getAlarm().getEnable()) {
checkQueueUsage(holder); // 检查队列负载
checkActiveRate(holder); // 检查活跃度
checkRejectCount(holder); // 检查拒绝数
}
}
}通过配置文件注入监听Bean
1
2
3
4@Bean(initMethod = "start", destroyMethod = "stop")
public ThreadPoolAlarmChecker threadPoolAlarmChecker(NotifierDispatcher notifierDispatcher) {
return new ThreadPoolAlarmChecker(notifierDispatcher);
}告警
队列负载和线程活跃度都是通过轮询线程池,获取参数并计算百分比,当超过限额时就发送告警
拒绝数告警是通过动态模式代理线程池的拒绝方法,添加计数功能和告警功能实现,在定时任务里会扫描拒绝次数来发出告警
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30@AllArgsConstructor
public class RejectedProxyInvocationHandler implements InvocationHandler {
private final Object target;
private final AtomicLong rejectCount;
private static final String REJECT_METHOD = "rejectedExecution";
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 判断是否是要代理的方法,通过方法名和参数来校验
if (REJECT_METHOD.equals(method.getName()) &&
args != null &&
args.length == 2 &&
args[0] instanceof Runnable &&
args[1] instanceof ThreadPoolExecutor) {
rejectCount.incrementAndGet();
}
if (method.getName().equals("toString") && method.getParameterCount() == 0) {
return target.getClass().getSimpleName();
}
try {
return method.invoke(target, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
}
}
}
告警
要控制告警的间隔
限制告警次数策略:
时间窗口限流:为每一个线程池设立一个时间戳,只有距离上次时间戳一段时间后才能发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private static final Map<String, Long> ALARM_RECORD = new ConcurrentHashMap<>();
public static boolean allowAlarm(String threadPoolId, String alarmType, int intervalMinutes) {
String key = buildKey(threadPoolId, alarmType);
long currentTime = System.currentTimeMillis();
long intervalMillis = intervalMinutes * 60 * 1000L;
// compute:保证了整个"读取-计算-写入"过程的原子性
return ALARM_RECORD.compute(key, (k, lastTime) -> {
if (lastTime == null || (currentTime - lastTime) > intervalMillis) {
return currentTime; // 更新时间为当前时间
}
return lastTime; // 保持原时间
}) == currentTime; // 返回值等于当前时间说明允许发送
}
private static String buildKey(String threadPoolId, String alarmType) {
return threadPoolId + "|" + alarmType;
}
用到的设计模式
Builder
在创建字段特别多、构造复杂的对象时,可不用写一堆重载构造方法,不用臃肿的 set 方法。
通过链式调用、清晰优雅、还能保证对象不可变
在ThreadPoolExecutorBuilder中用到了这种设计模式,
1 | |
使用时先用builder()方法创建出实例,然后链式设置参数,最后bulid()检查必要参数并创建对象并返回
1 | |
模板方法
父类定义一套「固定不变的执行流程 / 算法骨架」,流程中可变的步骤定义为抽象方法,交给子类去实现;子类只重写可变细节,不修改父类的核心流程。
动态线程池在多配置中心支持的场景下,监听器注册与配置刷新存在高度相似的处理流程。为了统一逻辑、增强可扩展性,引入模板方法模式 进行重构,形成一套结构清晰、职责分明的刷新机制。
观察者模式
通过Spring的ApplicationListener接口来创建监听器,通过ApplicationContextHolder.getInstance().publishEvent来发布事件;
当比如当参数更新时就会发布事件来触发线程池的参数更新
模板方法
模板方法核心思想:在抽象类的一个方法里定义一个操作的框架,将具体的实现步骤留到子类来实现。
优点:
- 复用性强
- 扩展性强
- 结构清晰
为了支持多种配置中心并保持良好的可维护性,采用了模板方法模式 来封装公共流程、抽象差异行为,从而实现可扩展、高内聚的动态配置刷新能力。
定义模板抽象类AbstractDynamicThreadPoolRefresher,定义刷新步骤
1 | |
在具体的Nacos刷新类和Apollo刷新类里实现注册监听器逻辑
代理模式
真实对象和代理对象都实现同一个接口,调用方通过引入代理对象对其行为进行增强的设计手段
- 静态代理:编译期手动编写代理类代码,一个目标类对应一个代理类
- 动态代理:在运行时生成代理类
项目里通过实现JDK提供的接口InvocationHandler ,实现invoke方法实现动态代理