JUC之线程池

什么是线程池

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程, 等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用, 还能防止过分调度。

特点:

  • 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度: 当任务到达时, 任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性: 线程是稀缺资源, 如果无限制的创建 ,不仅会销耗系统资源, 还会降低系统的稳定性, 使用线程池可以进行统一的分配, 调优和监控。

架构:

Java中的线程池是通过Executor框架实现的, 该框架中用到了Executor, Executors, ExecutorService, ThreadPoolExecutor这几个类

三种线程池

  1. Executors.newFixedThreadPool(int) : 一池定线程
    • 创建一个定长的线程池, 可控制线程的最大并发量, 超出的线程会在阻塞队列(LinkedBlockingQueue)中等待
    • 线程池的corePoolSizeMaxmumPoolSize是相等的
  2. Executors.newSingleThreadExecutor( ) : 一池一线程
    • 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 超出的线程会在阻塞队列(LinkedBlockingQueue)中等待, 保证所有的任务都按照指定顺序执行
    • 线程池的corePoolSizeMaxmumPoolSize都为1
  3. Executors.newCachedThreadPool( ) : 一池N线程
    • 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活会后空闲线程, 若无可回收, 则创建新线程
    • 线程池的corePoolSize设置为0, MaxmumPoolSize设置为Integer.MAX_VALUE
    • 阻塞队列使用的是SynchronousQueue, 也就是说来了任务就创建线程运行, 如果线程空闲超过60秒, 就销毁该线程

具体代码使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池多线程
//ExecutorService executorService = Executors.newFixedThreadPool(5);

//一池一线程
//ExecutorService executorService = Executors.newSingleThreadExecutor();

//可扩容线程池
ExecutorService executorService = Executors.newCachedThreadPool();

//十个请求
try {
for (int i = 1; i <= 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}

这三种线程池其实是并不常用的, 我们在写业务的时候会遇到各种各样的情况, 这三种线程池很难满足各式各样的需求, 就需要用到自定义线程池, 想要定制化线程池, 就需要了解创建线程池的七大参数。

线程池的七大参数

  • int corePoolSize, 常驻线程数量(核心)
  • int maximumPoolSize, 最大线程数量, 就是线程池扩容的最大值
  • long keepAliveTime, 额外扩容线程的存活时间
  • TimeUnit unit, 存活时间单位
  • BlockingQueue workQueue, 阻塞队列, 常驻线程数量都在被使用, 新的请求会被阻塞
  • ThreadFactory threadFactory, 线程工厂, 用来创建线程
  • RejetedExecutionHandler handler, 拒绝策略, 表示当线程队列满了并且工作线程大于等于线程池的最大值(maxnumPoolSize)时如何来拒绝

下面用一个简单的银行场景来解释一下这几个参数

在上面的场景中:

  • bank银行就是我们的线程池
  • 椭圆的窗口一共有5个, 但是实线的在开放的只有2个, 这里一直使用的窗口就是corePoolSize, 而窗口总数量就是maximumPoolSize
  • 候客区就是workQueue, 可以看到候客区只有5个位置, 说明最多只能有5个等待线程
  • 当候客区超了, 这时经理(threadFactory)就带着超出来的人去把闲置的窗口打开使用, 也就是扩容线程池, 创建新线程, 但是线程总数不能超过5
  • 当人少的时候, 多开出来的窗口一直空闲, 超过了keepAliveTime就会被关闭
  • 保安就是拒绝策略handler, 当五个窗口都开放了, 候客区还超标的话, 这时候保安就得做点什么, 比如让人出去, 或者把在窗口前墨迹的人赶出去, 这些就需要了解拒绝策略到底有哪几种了

底层运行流程:

拒绝策略

JDK内置了四种拒绝策略:

  • AbortPolicy(默认), 这种拒绝策略在拒绝任务时, 会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException, 让你感知到任务被拒绝了, 于是你便可以根据业务逻辑选择重试或者放弃提交等策略。
  • DiscardPolicy, 这种拒绝策略正如它的名字所描述的一样, 当新任务被提交后直接被丢弃掉, 也不会给你任何的通知, 相对而言存在一定的风险, 因为我们提交的时候根本不知道这个任务会被丢弃, 可能造成数据丢失。
  • DiscardOldestPolicy, 如果线程池没被关闭且没有能力执行, 则会丢弃任务队列中的头结点, 通常是存活时间最长的任务, 这种策略与第二种不同之处在于它丢弃的不是最新提交的, 而是队列中存活时间最长的, 这样就可以腾出空间给新提交的任务, 但同理它也存在一定的数据丢失风险。明明是我等的最久, 却让我走, 表示很难受。
  • CallerRunsPolicy, 相对而言它就比较完善了, 当有新任务提交后, 如果线程池没被关闭且没有能力执行, 则把这个任务交于提交任务的线程执行, 也就是谁提交任务, 谁就负责执行任务。这样做主要有两点好处:
    1. 新提交的任务不会被丢弃, 这样也就不会造成业务损失。
    2. 由于谁提交任务谁就要负责执行任务, 这样提交任务的线程就得负责执行任务, 而执行任务又是比较耗时的, 在这段期间, 提交任务的线程被占用, 也就不会再提交新的任务, 减缓了任务提交的速度, 相当于是一个负反馈。在此期间, 线程池中的线程也可以充分利用这段时间来执行掉一部分任务, 腾出一定的空间, 相当于是给了线程池一定的缓冲期。

自定义线程池

下面就尝试自己创建一个线程池, 使用new ThreadPoolExecutor()方法, 需求是常驻线程数为2, 最大线程数为5, 等待队列长度为3, 其余使用默认即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadPoolDemo {
public static void main(String[] args) {
//自定义线程
ExecutorService executorService = new ThreadPoolExecutor(
2, 5,
2L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);

//十个请求
try {
for (int i = 1; i <= 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}