线程池与面向对象——如何改造你的线程池?

前言

在准备面试的时候,看到了一道很有意思的面试题,与 Java 线程池相关:

线程池执行任务 Runnable 的时候,如果任务队列没满,则先放到队列中等待;如果任务队列满了,则创建一个新的临时线程执行任务。

如果此时我有一类紧急任务 InstantRunnable,我希望能够立即执行——不管队列满没满,都创建临时线程直接执行。

我应该如何改造线程池,实现这个功能?

在回答这个问题之前,我们先回顾一下线程池执行任务的流程。

线程池的执行流程

具体的代码实现在 JDK 中 ThreadPoolExecutor.execute()可以找到,对应的流程图如下所示:


flowchart LR
ex[Execute]
core?{Core threads full?\n 【1】}
q?{Task queue full?\n 【2】}
max?{Max threads full?\n 【3】}


ex --> proc
subgraph proc[Execution Process of a Task in a ThreadPool]
direction LR
core? --yes--> q?
core? --nope----> exCT[exe by a core thread]
q? --yes--> max?
q? --nope---> addQ[add to the task queue]

max? --nope--> create[create a new thread & execute]
max? --yes--> reject[reject the task]
end

对于一个新来的任务 Runnable

  1. 如果有核心线程空闲,则由核心线程执行。
  2. 如果核心线程满了,则尝试把任务放到队列中等待。
  3. 如果队列也满了,则创建一个临时线程去执行。
  4. 如果临时线程也满了,则执行拒绝策略。

改造分析

一种直接且糟糕的方法是直接修改线程池的源码。有没有更好的方案呢?

观察实现紧急执行的需求,可以发现,本质上是希望:

  • 对于InstantRunnable,忽略流程图中条件【2】的判断,直接走向其 yes 分支。

所以条件【2】在什么情况下会走向 yes 分支?

打开源码看看,可以发现条件【2】的判断语句是:

if (isRunning(c) && workQueue.offer(command)) {
    // 加入...
} else if (!addWorker(command, false)){
    // ...
}

也就是说,判断任务队列是否塞满是通过 boolean offer(Ruunable r) 方法实现的。

  • offer 方法返回 true,则添加队列成功,任务进入队列等待。
  • offer 方法返回 false,则队列塞满,尝试创建临时线程。

因此,我们只需要重写任务队列的 offer(Ruunable r) 方法即可:

  • 对于 InstantRunnable 类型的任务,offer() 方法直接返回 false。
  • 对于普通的 Runnable,正常执行原有的逻辑。

具体实现

定义 InstantRunnable 类型

首先我们需要定义 InstantRunnable,由于 offer() 接受的是 Runnable 类型,为了能让 offer() 也接受 InstantRunnable,我们需要把 InstantRunnable 定义为 Runnable子接口

回顾:能接受父类的方法也能接受子类,但是接受子类的方法不能接受父类。向下兼容。

当然这在逻辑上也是合理的,紧急任务是任务的子集。

所以我们这样定义 InstantRunnable

public interface InstantRunnable extends Runnable{
    // 定义 InstantRunnable 用于区分类型,接口内的方法和 Runnable 是一样的。
}

重写任务队列的 offer() 方法

线程池接收实现了 BlockingQueue<T> 接口的任务队列,有很多种不同的 BlockingQueue 都可以作为线程池的任务队列。

这里以 LinkedBlockingQueue<T> 为例,通过重写将其改造成 MyLinkedBlockingQueue<T>

public class MyLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
    @Override
    public boolean offer(T item) {
        if (item instanceof InstantRunnable){
            return false;
        }
        return super.offer(item);
    }
}

可以看到,主要添加了针对 InstantRunnable 类型的判断处理逻辑。

回顾:

  • 关键字 instanceof 用于判断某个变量的类型。
  • 关键字 super 用于在子类中直接使用父类的方法。

验证

使用如下方法进行测试;

public void test() {
        BlockingQueue<Runnable> q = new MyLinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 999, 100, TimeUnit.MINUTES, q);
        executor.execute(() -> {
            System.out.println("task 1");
            sleepIgnoreException(5000);
        });
        executor.execute(() -> {
            System.out.println("task 2");
            sleepIgnoreException(5000);
        });
        executor.execute((InstantRunnable) () -> {
            System.out.println("task 3");
            sleepIgnoreException(5000);
        });
        sleepIgnoreException(100000);
    }

我们创建了一个 coreSize = 1, maxSize = 999 的线程池。并且往里面添加了三个任务。

  • 任务1可以由核心线程直接执行
  • 任务2会被加入队列等待,直到任务1或者任务3执行完。
  • 任务3是 InstantRunnable,因此会由临时线程直接执行。

所以,任务执行流是:


graph LR
    Task1 -.-> Task2
    Task3 -.-> Task2

期望的输出应该是:Task 1 - Task 3 - Task 2。

实测的输出是:

task 1
task 3
task 2

是正确的。

如果我们把测试代码当中的 MyLinkedBlockingQueue 换成普通的 LinkedBlockingQueue,那么任务2、3都会在队列当中等待。任务的执行流变成:


graph LR
    Task1-->Task2-->Task3

期望的输出应该是:Task 1 - Task 2 - Task 3。

实测的输出是:

task 1
task 2
task 3

也是正确的。

写在最后

分析和实现的过程并不复杂,还是很好做的。

在实现的过程当中也可以发现线程池在设计上其实还是有很强的可扩展性的,其中运用到的很多 面向对象 的特性和原则,例如:

  • 继承和多态。
  • 开闭原则:总是新增而避免修改。
  • 里式替换:子类可以替换父类,实现复用。

线程池是 Java 多线程的核心机制之一,除了熟背八股之外,记得在八股的基础上多参悟理解。


线程池与面向对象——如何改造你的线程池?
https://max-wzm.github.io/2023/10/17/interview/线程池/
作者
Max Wang
发布于
2023年10月17日
许可协议