动态线程池oneThread系统

基本概念

  1. 什么是线程池(Thread Pool)

    是一种基于池化思想管理线程的工具,线程池维护多个线程,等待管理者分配可并发执行的任务

  2. 线程池的作用

    • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
    • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
    • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
  3. 线程池解决的问题

    • 防止频繁申请/销毁资源和调度资源
    • 抑制对资源无限的申请
    • 合理管理内部的资源分布
  4. 虚拟线程

    为了突破传统平台线程在内存占用、上下文切换以及线程池参数难以评估的瓶颈,Java引入了虚拟线程,以更轻量的方式实现高并发,解决“一请求一线程”模式下资源消耗过大的问题

    • 和传统线程的区别:不再一一对应 OS 线程,而是大量虚拟线程映射到少量线程上。当虚拟线程执行阻塞操作时,JVM 会自动将其挂起并释放底层的 OS 线程供其他虚拟线程使用。因此内存占用少支持百万级并发线程
    • 优势区间:有大量IO密集型任务时,省去了切换线程的开销
    • 问题
      • 固定平台线程(pinning)。如虚拟线程在获取synchronized锁后进入阻塞状态,这时载体线程也必须一起阻塞,而不能释放资源,导致虚拟线程的优势消失
      • 无限创建虚拟线:虚拟线程的占用很低,可以一下大量创建很多,因此如果不加限制,会导致竞争资源被一下消耗光
      • 技术相对较新,还不是很稳定、成熟,老版本JDK不支持

Java中线程池的实现

Java线程池实现原理及其在美团业务中的实践

Java中的线程池核心实现类是ThreadPoolExecutor

运行流程

ThreadPoolExecutor运行流程

运行机制

维护线程池状态

通过AtomicInteger变量ctl来保存:

  • 运行状态(高3位)
  • 线程数量(后29位)

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源

运行状态种类

运行状态 状态描述
RUNNING 能接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING 所有的任务都已终止了,workerCount (有效线程数) 为 0。
TERMINATED 在 terminated () 方法执行完后进入该状态。

执行任务

workerCount:当前线程数

corePoolSize:核心线程数(会被保留下来的线程数量)

maximumPoolSize:能存在的最大线程数

  1. workerCount < corePoolSize:创建线程来执行
  2. workerCount >= corePoolSize:放到阻塞队列里排队
  3. workerCount >= corePoolSize 且阻塞队列满了:创建新线程来执行
  4. workerCount >= corePoolSize 且阻塞队列满了,且workerCount >= maximumPoolSize:根据拒绝策略来处理该任务

拒绝任务策略

当阻塞队列满了 且 线程数达到最大值 时会触发拒绝策略

名称 描述
1 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常。是线程池默认的拒绝策略。
2 ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。
3 ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
4 ThreadPoolExecutor.CallerRunsPolicy 由调用线程(提交任务的线程)处理该任务。

工作线程Worker

有两个变量:

  • thread:线程
  • firstTask:执行的任务

如果是核心线程,会无限轮询获取任务;

如果是非核心线程,当无法获取到新任务时,轮询会结束,并主动消除在线程池里的引用,被JVM回收;

实际Worker线程并没有被标记是否为核心线程:

当线程数 <= 核心线程数时,线程会阻塞直到获取到任务

当线程数 > 核心线程数时,让线程有限等待获取任务,超时就返回null,让线程自己销毁

阻塞队列

要根据实际情况来选择

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。支持公平锁和非公平锁。
LinkedBlockingQueue 一个由链表结构组成的有界队列,此队列按照先进先出 (FIFO) 的原则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,所以默认创建的该队列有容量危险。
PriorityBlockingQueue 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo () 方法来指定元素排序规则,不能保证同优先级元素的顺序。
DelayQueue 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
SynchronousQueue 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue 的一个使用场景是在线程池里。Executors.newCachedThreadPool () 就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了 60 秒后会被回收。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue 队列多了 transfer 和 tryTransfer 方法。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

线程池的使用场景

场景1:快速响应用户请求

一个网页有多个数据需要查询时,可用线程池并行查询;

这种场景重要的响应速度,所以应不设置阻塞队列(SynchronousQueue,容量为0的队列),而是适当调大核心线程数和最大线程数;

场景2:快速处理批量任务

离线的大量计算任务;

目标是尽可能在单位时间内处理更多的任务,并不需要瞬时的完成,因此需要设置阻塞队列;

具体实现

线程池

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程闲置存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 创建线程使用的线程工厂
RejectedExecutionHandler handler // 拒绝策略) {
}

工作线程(Worker)

所有的Worker存储在一个HashSet

1
private final HashSet<Worker> workers = new HashSet<Worker>();

Wroker构造函数的主要组成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {

final Thread thread; // 线程对象
Runnable firstTask; // 首个执行的任务,一般执行完任务后就保持为空
volatile long completedTasks; // 该工作线程已经完成的任务数

Worker(Runnable firstTask) {
setState(-1); // 默认状态为 -1,禁止中断直到线程启动为止
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this); // 循环获取阻塞队列里的任务
}
}

整个线程池提供一把主锁 mainLock,每次操作或读取这种全局性变量的时候,都需要获取主锁才能进行

1
private final ReentrantLock mainLock = new ReentrantLock();

存储线程池状态

通过AtomicIntegerctl来存储线程池状态(前3位)和工作线程数(后29位)

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

停止线程

由最后一个执行完任务的线程来调用tryTerminate()终止线程池

  • shutdown():不接收新任务,等待已有任务完成
  • shutdownNow():不接收新任务,中断已有任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将线程池状态改为 SHUTDOWN,在shutdownNow中是STOP
advanceRunState(SHUTDOWN);
// 中断线程池中的所有空闲线程
interruptIdleWorkers();
// 钩子函数,默认空实现
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

任务调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 如果当前工作线程数小于核心线程数,则启动一个新工作线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 如果当前工作线程数大于核心线程数,则尝试将任务添加到工作队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果任务添加成功,但是发现线程池已经不运行了,则移除任务并且触发拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果任务添加成功,但是线程池中已经没有工作线程,则添加一个新的工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

// 如果当前工作线程数大于核心线程数,且工作队列已满,则启动一个新工作线程执行任务,
// 如果工作线程数已经达到最大值,则直接触发拒绝策略
else if (!addWorker(command, false))
reject(command);
}

项目概述

oneThread 是基于配置中心 构建的 动态可观测 Java 线程池框架,弥补了 JDK 原生线程池参数配置不灵活的不足 ,支持核心参数的在线动态调整运行时状态监控阈值告警 ,有效提升系统的稳定性与可运维性。框架兼容主流配置中心如 Nacos、Apollo,实现线程池参数的热更新

实现原理

读取远端配置

绑定配置到对象

Nacos配置本质上是字符串形式的键值对形式,通过绑定到类BootstrapConfigProperties上来便于类型转换、代码提示和后续维护

非Spring环境全局共享变量

BootstrapConfigProperties类是在非Spring环境的core包下,通过在Bean装配的时候赋值给静态单例变量来实现全局共享

如果是Spring环境下,用@ConfigurationProperties(prefix = “onethread”)就能自动绑定对应前缀的配置

1
2
3
4
5
6
7
8
9
10
11
12
public class CommonAutoConfiguration {

@Bean
public BootstrapConfigProperties bootstrapConfigProperties(Environment environment) {
BootstrapConfigProperties bootstrapConfigProperties = Binder.get(environment)
.bind(BootstrapConfigProperties.PREFIX, Bindable.of(BootstrapConfigProperties.class))
.get();
// 赋值给静态单例对象
BootstrapConfigProperties.setInstance(bootstrapConfigProperties);
return bootstrapConfigProperties;
}
}

感知远端配置变化

  1. 通过自定义的starter,自动装配NacosCloudRefresherHandler这个Bean;

  2. 这个Bean实现了ApplicationRunner接口,当Spring容器初始化完成后,会注册一个监听来监听Nacos远端参数的变化;

  3. 当远端参数发生变化后,监听器会感知到,并将传来的字符串形式的配置信息,通过Binder绑定更新到配置类中

  4. 发布更新事件

保障并发更新线程池参数

  1. 通过接口ApplicationListener监听参数更新的事件来触发更新

  2. 可能存在多个用户同时更新线程池参数的情况,因此用synchronized线程池Id上加锁,实现细粒度的上锁

    1
    2
    3
    // intern() 保证是从字符串常量池里拿到的同一个对象
    synchronized (threadPoolId.intern()) {
    ...
  3. 比较远端参数和本地的线程池参数是否有变化

  4. 修改本地的线程池参数:主要的是核心线程数和最大核心数,若新核心线程数大于旧最大核心数,就需要先设置最大核心数;否则就先设置和新线程数(因为新最大线程数可能调小了,会让旧核心线程数超过最大线程数)

    线程池的核心线程数必须 <= 最大线程数,否则会报错

阻塞队列热更新

在请求的高峰期和低谷期需要动态地设置队列容量,防止高峰流量造成服务雪崩

  1. Java原生的阻塞队列是不能动态调整容量的

  2. 在原生LinkedBlockingQueue的基础上进行修改

    主要修改部分:

    • 去掉了capacityfinal关键字,让容量可修改

    • 在修改容量capacity后,主动去唤醒等待队列里的线程

      1
      2
      3
      4
      5
      6
      7
      8
      public void setCapacity(int capacity) {
      final int oldCapacity = this.capacity;
      this.capacity = capacity;
      final int size = count.get();
      if (capacity > size && size >= oldCapacity) {
      signalNotFull();
      }
      }
    • 调整进入已满等待队列的进入条件

      1
      2
      3
      4
      // 原本的是count.get() == capacity
      while (count.get() >= capacity) {
      notFull.await();
      }

      修改后,若调小容量,也能阻塞线程

线程池监控

通过定时任务扫描,对线程池活跃度队列负载拒绝异常三个方向进行监控

  • 活跃度:活跃线程数 / 最大线程数
  • 队列负载:任务数 / 队列容量
  • 拒绝异常:监控到新的RejectedExecutionException

实现

  • 监控

    通过JDK提供的ScheduledExecutorService实现定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public void start() {
    // 每10秒检查一次,初始延迟0秒
    scheduler.scheduleWithFixedDelay(this::checkAlarm, 0, 5, TimeUnit.SECONDS);
    }

    // 定时任务
    private void checkAlarm() {
    Collection<ThreadPoolExecutorHolder> holders = OneThreadRegistry.getAllHolders();
    for (ThreadPoolExecutorHolder holder : holders) {
    if (holder.getExecutorProperties().getAlarm().getEnable()) {
    checkQueueUsage(holder); // 检查队列负载
    checkActiveRate(holder); // 检查活跃度
    checkRejectCount(holder); // 检查拒绝数
    }
    }
    }

    通过配置文件注入监听Bean

    1
    2
    3
    4
    @Bean(initMethod = "start", destroyMethod = "stop")
    public ThreadPoolAlarmChecker threadPoolAlarmChecker(NotifierDispatcher notifierDispatcher) {
    return new ThreadPoolAlarmChecker(notifierDispatcher);
    }
  • 告警

    • 队列负载和线程活跃度都是通过轮询线程池,获取参数并计算百分比,当超过限额时就发送告警

    • 拒绝数告警是通过动态模式代理线程池的拒绝方法,添加计数功能和告警功能实现,在定时任务里会扫描拒绝次数来发出告警

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      @AllArgsConstructor
      public class RejectedProxyInvocationHandler implements InvocationHandler {

      private final Object target;
      private final AtomicLong rejectCount;

      private static final String REJECT_METHOD = "rejectedExecution";

      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      // 判断是否是要代理的方法,通过方法名和参数来校验
      if (REJECT_METHOD.equals(method.getName()) &&
      args != null &&
      args.length == 2 &&
      args[0] instanceof Runnable &&
      args[1] instanceof ThreadPoolExecutor) {
      rejectCount.incrementAndGet();
      }

      if (method.getName().equals("toString") && method.getParameterCount() == 0) {
      return target.getClass().getSimpleName();
      }

      try {
      return method.invoke(target, args);
      } catch (InvocationTargetException ex) {
      throw ex.getCause();
      }
      }
      }

告警

要控制告警的间隔

限制告警次数策略:

  • 时间窗口限流:为每一个线程池设立一个时间戳,只有距离上次时间戳一段时间后才能发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private static final Map<String, Long> ALARM_RECORD = new ConcurrentHashMap<>();

    public static boolean allowAlarm(String threadPoolId, String alarmType, int intervalMinutes) {
    String key = buildKey(threadPoolId, alarmType);
    long currentTime = System.currentTimeMillis();
    long intervalMillis = intervalMinutes * 60 * 1000L;

    // compute:保证了整个"读取-计算-写入"过程的原子性
    return ALARM_RECORD.compute(key, (k, lastTime) -> {
    if (lastTime == null || (currentTime - lastTime) > intervalMillis) {
    return currentTime; // 更新时间为当前时间
    }
    return lastTime; // 保持原时间
    }) == currentTime; // 返回值等于当前时间说明允许发送
    }

    private static String buildKey(String threadPoolId, String alarmType) {
    return threadPoolId + "|" + alarmType;
    }

用到的设计模式

Builder

在创建字段特别多、构造复杂的对象时,可不用写一堆重载构造方法,不用臃肿的 set 方法

通过链式调用、清晰优雅、还能保证对象不可变

ThreadPoolExecutorBuilder中用到了这种设计模式,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Getter
public class ThreadPoolExecutorBuilder {

/**
* 线程池唯一标识
*/
private String threadPoolId;

/**
* 核心线程数
*/
private Integer corePoolSize = Runtime.getRuntime().availableProcessors();

/**
* 最大线程数
*/
private Integer maximumPoolSize = corePoolSize + (corePoolSize >> 1);

// 其他变量
...

// 方法返回this,用于链式调用
public ThreadPoolExecutorBuilder dynamicPool() {
this.dynamicPool = true;
return this;
}

public ThreadPoolExecutorBuilder threadPoolId(String threadPoolId) {
this.threadPoolId = threadPoolId;
return this;
}

// 其他方法
...
}

使用时先用builder()方法创建出实例,然后链式设置参数,最后bulid()检查必要参数并创建对象并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static ThreadPoolExecutorBuilder builder() {
return new ThreadPoolExecutorBuilder();
}

/**
* 构建线程池实例
*/
public ThreadPoolExecutor build() {
BlockingQueue<Runnable> blockingQueue = BlockingQueueTypeEnum.createBlockingQueue(workQueueType.getName(), workQueueCapacity);
RejectedExecutionHandler rejectedHandler = Optional.ofNullable(this.rejectedHandler)
.orElseGet(() -> new ThreadPoolExecutor.AbortPolicy());

Assert.notNull(threadFactory, "The thread factory cannot be null.");

ThreadPoolExecutor threadPoolExecutor;
if (dynamicPool) {
threadPoolExecutor = new OneThreadExecutor(
threadPoolId,
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
blockingQueue,
threadFactory,
rejectedHandler,
awaitTerminationMillis
);
} else {
threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
blockingQueue,
threadFactory,
rejectedHandler
);
}

threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return threadPoolExecutor;
}

模板方法

父类定义一套「固定不变的执行流程 / 算法骨架」,流程中可变的步骤定义为抽象方法,交给子类去实现;子类只重写可变细节,不修改父类的核心流程。

动态线程池在多配置中心支持的场景下,监听器注册与配置刷新存在高度相似的处理流程。为了统一逻辑、增强可扩展性,引入模板方法模式 进行重构,形成一套结构清晰、职责分明的刷新机制。

观察者模式

通过Spring的ApplicationListener接口来创建监听器,通过ApplicationContextHolder.getInstance().publishEvent来发布事件;

当比如当参数更新时就会发布事件来触发线程池的参数更新

模板方法

模板方法核心思想:在抽象类的一个方法里定义一个操作的框架,将具体的实现步骤留到子类来实现。

优点:

  1. 复用性强
  2. 扩展性强
  3. 结构清晰

为了支持多种配置中心并保持良好的可维护性,采用了模板方法模式 来封装公共流程、抽象差异行为,从而实现可扩展、高内聚的动态配置刷新能力。

定义模板抽象类AbstractDynamicThreadPoolRefresher,定义刷新步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 注册配置变更监听器,由子类实现具体逻辑
*
* @throws Exception
*/
protected abstract void registerListener() throws Exception;

/**
* 默认空实现,子类可以按需覆盖
*/
protected void beforeRegister() {
}

/**
* 默认空实现,子类可以按需覆盖
*/
protected void afterRegister() {
}

@Override
public void run(ApplicationArguments args) throws Exception {
beforeRegister();
registerListener();
afterRegister();
}

在具体的Nacos刷新类和Apollo刷新类里实现注册监听器逻辑

代理模式

真实对象和代理对象都实现同一个接口,调用方通过引入代理对象对其行为进行增强的设计手段

  • 静态代理:编译期手动编写代理类代码,一个目标类对应一个代理类
  • 动态代理:在运行时生成代理类

项目里通过实现JDK提供的接口InvocationHandler ,实现invoke方法实现动态代理


动态线程池oneThread系统
http://xwww12.github.io/2026/05/06/后端/动态线程池oneThread系统/
作者
xw
发布于
2026年5月6日
许可协议