跳转至

Java线程池

线程池

基本概念

在池内维护多个长期存在的 “常驻线程” ,通过重复使用这些 “常驻线程” 来避免线程的反复创建和销毁。

使用方式

ExecutorService 继承自 Executor 接口,并扩展了更多功能,例如:

  • 提交任务(submit)。
  • 关闭线程池(shutdownshutdownNow)。
  • 检查线程池状态(isShutdownisTerminated)。
  • 等待任务完成(awaitTermination)。

Executors 是一个工具类,提供了静态方法来创建常见的线程池实例

// 自定义
ExecutorService mytp = new ThreadPoolExecutor(/* 参数 */)
// 默认线程池(不会新开线程)
ExecutorService ftp = Executors.newFixedThreadPool(10);
// 单线程池
ExecutorService stp = Executors.newSingleThreadExecutor();
// 缓存线程池
ExecutorService ctp = Executors.newCachedThreadPool();
// 定时线程池
ExecutorService shtp = Executors.newScheduledThreadPool(5);

1. 线程池的核心参数

参数名称 类型 简要介绍
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 空闲时间
unit TimeUnit 空闲时间的时间单位
workQueue BlockingQueue 任务队列
threadFactory ThreadFactory 线程工厂
handler RejectedExecutionHandler 拒绝策略
  • corePoolSize:核心线程的数量。即使没有任务,核心线程也不会被回收
  • maximumPoolSize:最大线程数,大于 corePoolSize 的部分会在空闲一段时间后被回收
  • keepAliveTime:多出来的这部分线程在空闲多长时间后会被回收
  • unit:空闲时间的单位
  • workQueue:任务队列:正常情况下,任务被提交到线程池之后,会立即被核心线程所执行,但是当核心线程都处于忙碌状态的时候,没有核心线程去执行这个任务,那么这个任务会被暂时提交到任务队列中等待核心线程空闲下来再去执行!当任务队列被放满了,比如一个长度为 10 的队列,里面已经放了 10 个任务,那么第 11 个任务就会触发 maximumPoolSize 线程的执行。
  • threadFactory:控制线程池产生一个怎样的新线程:置线程池中产生线程的名字、优先级、是不是守护线程等
  • handler:拒绝策略,当 corePoolSize、maximumPoolSize、workQueue 全部都被任务填满了之后,线程池会认为已经无力再执行后续提交的任务,此时对于后续的任务会触发拒绝策略来拒绝任务

需要额外说明:这里的核心线程与非核心线程只是一个称呼,在 ThreadPoolExecutor 内部,只要小于核心线程数的线程统称为核心线程,大于核心线程数的统称为非核心线程,不分先后,不一定先创建的就是核心线程、后创建的就是非核心线程。

举个例子,当 coreSize 为 1、maxSize 为 3、队列长度为 0 的时候,提交三个任务,A、B、C 三个线程分别去执行,A 并不一定是核心线程,当 A 执行完毕后,B、C 还在运行中时,此时 A 就会在到达超时时间之后被回收掉, B 和 C 中有一个线程就会被当作核心线程使用。

线程池如何安排任务

当任务开始提交后,线程池中:

  • (0, corePoolSize) 线程池立刻创建一个新线程执行任务
  • [corePoolSize, maximumPoolSize) 线程池将任务暂存到 workQueue 中,等待核心线程
  • workQueue 放不下了:创建新线程直到 存活的 线程数为 maximumPoolSize
  • 全满后仍在提交任务:调用 handler 拒绝多出的任务
  • 没有新的提交后,所有现存的线程会帮助消费 workQueue 中的任务,随后多出 corePoolSize 的线程开始等待并被回收

image-20241216144513399

回收核心线程
threadPoolExecutor.allowCoreThreadTimeOut(true);

上述代码就是指定是否回收核心线程,在设置了该项参数之后 ,当核心线程空闲之后也会被回收,如果线程池一个任务也没有,那么在空闲一段时间之后,线程池中线程会被全部回收,等有任务了再去新建线程。

workQueue 常用类型

规定这个队列为阻塞的队列

  • java.util.concurrent. SynchronousQueue:容量为 0 ,只是做一个简单的交换。因为没有容量,线程池内的线程数可以很轻松地达到 maximumPoolSize 设置的容量。

    • 当线程池的工作队列是 SynchronousQueue 时,提交的任务不会被存储在队列中,而是直接交给空闲的线程处理。
    • 由于 SynchronousQueue 没有容量,线程池会尽可能地创建新线程来处理任务,直到达到 maximumPoolSize(最大线程数)。
  • java.util.concurrent. LinkedBlockingQueue:无界队列,所谓无界队列的意思就是它没有边界,大小近乎无限,队列容量为Integer.MAX_VALUE。使用这种队列的时候需要特别注意,因为它的容量近乎无限,所以线程池参数 maximumPoolSize 是不生效的,拒绝策略也是失效的,因为队列永远也装不满;所以在任务的执行速度低于任务产生的情况下,众多的任务可能被无限地堆积在无界队列中,最终导致 OOM 的发生。
  • java.util.concurrent. ArrayBlockingQueue:有界队列,它的概念与无界队列恰恰相反,它可以设置一个长度,这种情况下maximumPoolSize和拒绝策略就有了意义,当队列被塞满后就会执行我们分析的逻辑。
  • java.util.concurrent. ScheduledThreadPoolExecutor.DelayedWorkQueue:延时队列,它可以写入一个任务并定义一个时间,这个任务只有在达到超时时间后才能被消费,这种队列适用于定时线程池,后面会详细分析。
handler 的意义
  • java.util.concurrent.ThreadPoolExecutor. DiscardOldestPolicy:当满足拒绝策略时,丢弃任务队列中旧的任务,将新任务添加到任务队列。
  • java.util.concurrent.ThreadPoolExecutor. AbortPolicy:当满足拒绝策略时,提交的任务会直接抛出 RejectedExecutionException 异常。
  • java.util.concurrent.ThreadPoolExecutor. CallerRunsPolicy:当满足拒绝策略时,被拒绝的任务会交由提交任务的那个线程去执行,谁提交的谁执行。
  • java.util.concurrent.ThreadPoolExecutor. DiscardPolicy:当满足拒绝策略时,新提交的任务会被静默丢弃,不会出现任何异常。

当我们的系统对于某一个任务特别敏感的时候,就是即使线程池处理不了了,那么这个任务也必须执行,此时就可以使用 CallerRunsPolicy,它会直接让主线程来执行。比如,A 线程向线程池提交任务,结果线程池处理不了了,那么这个拒绝策略就会直接让 A 线程自己去执行这个任务!从而保证任务一定能够被执行。但是注意,这种拒绝策略会导致调用者线程阻塞。

使用者也可以自定义拒绝策略,比如我们在线程池满了之后,输出一行丢弃的日志之后将任务丢弃,只需要实现 java.util.concurrent.RejectedExecutionHandler 接口即可,具体的定义方式如下:

public  class MyPolicy implements RejectedExecutionHandler {
    @Override
    public  void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println( "线程池已经达到最大极限,该任务被丢弃..." );
    }
}
threadFactory

开发者可以自定义线程池工厂(否则使用默认工厂创建优先级相同的非守护线程)

public  class MyThreadFactory implements ThreadFactory {

/**
 * 线程名称递增id
 */
 private  final  static AtomicLong IDX = new AtomicLong();

    @Override
    public Thread newThread(Runnable r) {
        //将任务包装为线程
        Thread thread = new Thread(r);
        //设置线程名称
        thread.setName( "test-Thread-" +IDX.getAndIncrement());
        return thread;
    }
}

2. JDK 默认的线程池创建方式

线程池的常用创建方式:

ExecutorService es = new ThreadPoolExecutor(10, 20, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

ex.submit(() -> System.out.print("yes"));

下面创建的都是 ThreadPoolExecutor 实例

(1) 定长线程池

corePoolSize 等于 maximumPoolSize,队列使用无界队列(永远不会触发拒绝)

ExecutorService ftp = Executors.newFixedThreadPool(10);
(2) 简单线程池

定长池的特殊情况,corePoolSize = maximuPoolSize = 1

所有的任务都保存在队列LinkedBlockingQueue中,等待唯一的单线程来执行任务。

ExecutorService ste = Executors.newSingleThreadExecutor();
(3) 缓存线程池
Executors.newCachedThreadPool();

是来多少任务,我开启多少线程,当任务执行完毕后,线程空闲一定时间后会被回收。

它的corePoolSize为 0,但是maximumPoolSize却为Integer.MAX_VALUE,使用的队列是SynchronousQueue,空闲时间为 60 s

极限情况会无限创造线程

(4) 定时线程池
Executors.newScheduledThreadPool();

延迟 1 秒后执行

scheduledExecutorService.schedule(() -> System.out.println("定时任务执行"), 1, TimeUnit.SECONDS);
  • scheduledExecutorService:一个 ScheduledExecutorService 实例,用于调度任务。
  • () -> System.out.println("定时任务执行"):这是一个 Lambda 表达式,表示一个 Runnable 任务。
  • 1:延迟时间为 1 秒。
  • TimeUnit.SECONDS:时间单位为秒。

以固定频率执行:10表示初始执行的延迟,5表示每隔5秒执行一次(上个任务需要执行完)

 scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("扫描数据库邮件表,并发送邮件"), 10, 5, TimeUnit.SECONDS);

无论上一个是否执行完,都以一定频率开始执行

scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("定时任务执行"), 1, 1, TimeUnit.SECONDS);

3. 自定义线程池

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
}, new ThreadPoolExecutor.AbortPolicy());

ThreadPoolExecutor 中,ArrayBlockingQueue 用于存储任务(RunnableCallable),因此类型参数通常是 RunnableCallable

4. 线程池的主要 API

(1) 提交任务

提交一个任务后不代表被提交的任务会立刻执行

execute 方法

void execute(Runnable task);

public class ThreadPoolNotResultSubmitTest {
    /**
     * 使用默认的线程工厂
     */
    private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) {
        THREAD_POOL_EXECUTOR.execute(() ->{
            System.out.println("线程池执行任务,线程名为: " + Thread.currentThread().getName());
        });
    }
}

submit 方法

public class ThreadPoolResultSubmitTest {
    /**
     * 使用默认的线程工厂
     */
    private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Future<String> future = THREAD_POOL_EXECUTOR.submit(() -> {
            System.out.println("我执行了");
            return String.format("我是执行结果,我被线程【%s】执行", Thread.currentThread().getName());
        });
        System.out.println("线程执行结果: "+future.get());
    }
}

在上述的代码中,我们通过 submit 提交了一个异步任务,任务提交后会返回一个 Future,我们基于 Future 可以获取任务的返回结果和异常信息。

(2) 停止线程池
THREAD_POOL_EXECUTOR.shutdown();

并不会立即把线程池停掉,而是等待线程池内的所有任务全部执行完毕后,才会关闭线程池。

需要注意的是,发起 shutdown 的信号后,线程池会停止接收新任务。此时如果再调用 shutdown 后再去提交任务,线程池会将任务直接推送到拒绝策略去执行。简单说,任务停止后是不允许提交新任务的

THREAD_POOL_EXECUTOR.shutdownNow();

强行停止线程池及其所有任务的执行

线程池扩展钩子函数

所谓的钩子函数就是线程池在任务执行前或执行后会主动触发一下这个钩子函数,使得线程池能够在任务执行前后有一定的介入能力!

线程池为我们提供的钩子回调分别是afterExecutebeforeExecuteafterExecute的执行时机是任务执行完成后,而beforeExecute的调用时机是任务执行前。

假设我们有这样一个需求,因为向线程池提交任务之后,任务何时执行我们并不知道,如果我们想要在任务执行之前记录一个任务的开始时间,任务结束之后记录一个结束时间,此时我们就可以使用如下的方式来记录:

public class ExThreadPoolTest extends ThreadPoolExecutor {
    public ExThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        //任务开始执行
        System.out.println("任务开始执行,执行时间为:" + new Date());
        super.beforeExecute(t, r);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("任务执行完毕,结束时间为:" + new Date());
        super.afterExecute(r, t);
    }
    public static void main(String[] args) {
        ExThreadPoolTest exThreadPoolTest = new ExThreadPoolTest(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        exThreadPoolTest.execute(() ->{
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("任务结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

注意,这里重写 beforeExecute() 和 afterExecute() 方法时都需要调用父类的方法,调用 super.afterExecute(r, t) 是为了确保父类的默认行为得以执行,避免破坏继承链,并且遵循“扩展而非替换”的原则。

线程池的五种状态

  1. RUNNING:正常运行,接受任务
  2. SHUTDOWN:不再接受新任务,但会执行已经提交的,执行完后进入 TERMINATED
  3. STOP:不再接受新任务,,不执行已提交的,尝试中断正在执行的任务,随后进入 TERMINATED 状态
  4. TIDYING:所有任务都已经终止,TERMINATED 前的状态,线程池会执行一些清理工作,此时调用 terminated() 勾子方法
  5. TERMINATED:terminated() 方法执行完后,终止

调用 shutdown() 会从 RUNNING 转到 SHUTDOWN

调用 shutdownNow() 会从 RUNNING 转到 STOP

调用 awaitTermination() 用于等待线程池进入 TERMINATED(当前线程会阻塞)

Future 类

Future 最主要的作用是,比如当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩、加密等,在这种情况下,如果我们一直在原地等待方法返回,显然是不明智的,整体程序的运行效率会大大降低。我们可以把运算的过程放到子线程去执行,再通过 Future 去控制子线程执行的计算过程,最后获取到计算结果。这样一来就可以把整个程序的运行效率提高,是一种异步的思想。

Future 接口的用法

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException;

}

注意 submit() 方法返回的是指向 FutureTask 的引用,但是在下面这段代码中被向上转型为 Future

public class FutureDemo {
    public static void main(String[] args)
    {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> future = es.submit(() -> {
            System.out.print("");
        })

        System.out.print(future.get());
    }
}

image-20241217225001310

FutureTask

既然 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 又实现了 RunnableFuture 接口,所以 FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

image-20241217224929569

典型用法是,把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。

public class FutureTaskDemo {
    class Task implements Callable {
        @Override
        public Integer call() {
            int sum = 0;
            for(int i=0;i<100;i++)
            {
                sum += i;
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        FutureTask<Integer> ft = new FutureTask<>(task);
        new Thread(ft).start();
        System.out.println(ft.get());
    }
}