JUC系列:(六)线程池

布鸽不鸽 Lv4

基本概述

线程池作用:线程复用

  • 降低资源消耗:减少了创建和销毁线程的次数
  • 提高响应速度:任务到达,如果有线程可以直接用
  • 提高可管理性:使用线程池可以进行统一的分配,管理

阻塞队列

基本介绍

阻塞队列的实现(java.util.concurrent.BlockingQueue接口):

  • ArrayBlockQueue:数组结构,有界阻塞队列
  • LinkedBlockingQueue:链表结构,无界阻塞队列(有界,默认大小Integer.MAX_VALUE
  • PriorityBlockQueue:支持优先级排序的,无界阻塞队列
  • DelayedWorkQueue:使用优先级队列实现的,延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个put的线程放入元素为止
  • LinkedTransferQueue:链表结构,无界阻塞队列
  • LinkedBlockingDeque:链表结构,双向阻塞队列

阻塞队列:

  • 阻塞添加put():队列已满时,添加元素的线程会被阻塞
  • 阻塞删除take():队列为空时,删除元素的线程会被阻塞

核心方法

方法类型抛出异常特殊值(boolean或者null)阻塞超时(抛出错误)
插入(尾)add(e)offer(e)put(e)offer(e,time,unit)
移除(头)remove()poll()take()poll(time,unit)
检查(队首元素)element()peek()不可用不可用

链表阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LinkedBlockingQueue<E> 
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

static class Node<E> {
E item;
// 真正的后继节点
// null,表示队尾
// 自己,发生在出队时
Node<E> next;
Node(E x) { item = x; }
}
}

入队出队

  • 有一个Dummy节点用来占位,item为null

image-20231212195823591

  • 初始化链表,last = head = new Node<E>(null)
1
2
3
4
5
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
  • 节点入队(尾节点入队),last = last.next = node
1
2
3
private void enqueue(Node<E> node) {
last = last.next = node;
}
  • 节点出队(头节点出队)
1
2
3
4
5
6
7
8
9
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next; // 头节点的后继节点
h.next = h; // 旧头节点GC
head = first; // 设置新的头节点
E x = first.item; // 返回的值
first.item = null; // 当前head节点变为Dummy节点
return x;
}
image-20231212200606216 image-20231212200611933

总结:无论如何,head节点总是Dummy节点(item为null)

加锁分析

用了两把锁 + dummy节点

线程安全分析:

  • 节点总数大于2时(包括dummy节点),putLock保证last节点的线程安全,takeLock保证head节点的安全
  • 节点总数等于2时(一个dummy节点,一个正常节点),仍然是两把锁,不会竞争
  • 节点总数等于1时(一个dummy节点),take线程被noEmpty条件阻塞,有竞争,会阻塞
1
2
3
4
5
6
7
// 用于put(阻塞),没满才能put
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

// 用于take(阻塞),不为空才能take
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
put入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// LinkedBlockingQueue类
public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException(); // 空指针异常
int c = -1;
Node<E> node = new Node<E>(e); // 把待添加元素封装为node节点
final ReentrantLock putLock = this.putLock; // 获取全局putLock
final AtomicInteger count = this.count; // count维护元素计数
putLock.lockInterruptibly(); // putLock加锁
try {
while (count.get() == capacity) {
notFull.await(); // 队满了等待
}
enqueue(node); // 有空位,尾插法入队
c = count.getAndIncrement(); // count自增
if (c + 1 < capacity)
notFull.signal(); // put完,发现队列还有空位,唤醒一个其他put线程
} finally {
putLock.unlock(); // putLock解锁
}
if (c == 0)
signalNotEmpty(); // 自增前是0,说明生产了一个元素,唤醒一个take线程
}
1
2
3
4
5
6
7
8
9
10
11
// LinkedBlockingQueue类
private void signalNotEmpty() {

final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal(); // 调用signal而不是signalAll,是为了减少竞争
} finally {
takeLock.unlock();
}
}
take出队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await(); // 没有元素可以出队,则阻塞
}
x = dequeue(); // 出队
c = count.getAndDecrement(); // count自减
if (c > 1)
notEmpty.signal(); // 如果队列还有元素,唤醒一个take线程
} finally {
takeLock.unlock(); // takeLock解锁
}
if (c == capacity)
signalNotFull(); // 自减前是满的,说明消费了一个元素,唤醒一个put线程
return x;
}
1
2
3
4
5
6
7
8
9
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

同步阻塞队列

SynchronousQueue 是一个不存储元素的BlockingQueue每一个生产者必须阻塞匹配到一个消费者

成员属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SynchronousQueue<E> extends AbstractQueue<E> {

// 当前平台CPU数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();

// 指定超时时间,当前线程最大自旋数
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

// 未指定超时时间,当前线程最大自旋数
static final int maxUntimedSpins = maxTimedSpins * 16;

// 指定超时限制的阈值,小于该值的线程不会被挂起
static final long spinForTimeoutThreshold = 1000L;
}
  • 转换器
1
2
3
4
5
6
7
8
9
10
11
12
13
private transient volatile Transferer<E> transferer;

abstract static class Transferer<E> {
/**
* 参数一:可以为null,表示这个是一个REQUEST类型的请求;反之是一个DATA类型的请求
* 参数二:true表示指定了超时时间;false表示不支持超时,会一直阻塞到匹配或者被打断
* 参数三:超时时间限制,单位是纳秒
*
* 返回值:返回值不为null,表示匹配成功。DATA类型的请求,会返回当前线程put的数据。
* 返回值为null,表示请求超时或者被中断。
**/
abstract E transfer(E e, boolean timed, long nanos);
}
  • 构造方法
1
2
3
4
5
public SynchronousQueue(boolean fair) {
// 非公平同步队列:TransferStack
// 公平同步队列:TransferQueue
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
  • 成员方法
1
2
3
4
5
6
7
8
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}

public E poll() {
return transferer.transfer(null, true, 0);
}

非公平(TransferStack)

TransferStack是非公平同步队列

所有请求都被压入栈中,栈顶元素最先匹配,栈底元素存在饥饿的问题

TransferStack类成员变量
  • 请求类型
1
2
3
4
5
static final int REQUEST    = 0;	// 表示Node类型为请求类型
static final int DATA = 1; // 表示Node类型为数据类型
static final int FULFILLING = 2; // 表示Node类型为匹配中类型
// 假如栈顶元素为REQUEST,当前请求类型为DATA,入栈会修改为FULFILLING
// 假如栈顶元素为DATA,当前请求类型为REQUEST,入栈会修改为FULFILLING
  • 栈顶元素
1
volatile SNode head;
SNode内部类
  • 成员变量
1
2
3
4
5
6
7
static final class SNode {
volatile SNode next; // 下一个栈帧
volatile SNode match; // 与当前node匹配的节点
volatile Thread waiter; // 挂起前,需要保存线程引用,以便唤醒
Object item; // 数据域,不为null表示请求类型为DATA;为null表示请求类型为REQUEST
int mode; // 表示当前Node的模式【REQUEST / DATA / FULFILLING】
}
  • 构造方法
1
2
3
SNode(Object item) {
this.item = item;
}
  • 设置方法:设置Node对象的next字段,此处使用了CAS
1
2
3
4
5
boolean casNext(SNode cmp, SNode val) {
// cmp != next,就已经不需要CAS了
// cmp == next,开始CAS修改Node对象的next字段
return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
  • 匹配方法:为当前Node匹配一个其他Node
1
2
3
4
5
6
7
8
9
10
11
12
13
boolean tryMatch(SNode s) {
// 当前match为null,使用CAS设置match字段为s
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {

Thread w = waiter; // 匹配上了,会获取当前Node对应的线程
if (w != null) { // 条件成立,说明Node对应,Thread正在阻塞
waiter = null;
LockSupport.unpark(w); // 使用unpark方式唤醒线程
}
return true;
}
return match == s; // 匹配成功返回true
}
  • 取消方法:取消节点
1
2
3
4
5
6
7
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this); // 通过CAS将match指向自己,表示取消
}

boolean isCancelled() {
return match == this;
}
TransferStack类成员方法
  • snode方法:填充节点方法
1
2
3
4
5
6
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e); // 引用指向空时,snode方法创建一个SNode对象
s.mode = mode;
s.next = next;
return s;
}
  • transfer方法:核心方法,请求匹配出栈,不匹配则阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
E transfer(E e, boolean timed, long nanos) {

SNode s = null;
int mode = (e == null) ? REQUEST : DATA; // 判断当前请求类型

for (;;) {
SNode h = head; // 栈顶指针
if (h == null || h.mode == mode) { // 【case1】:栈顶为空,或者与当前请求无法匹配,入栈
if (timed && nanos <= 0) { // 支持超时,但是超时时间小于0,说明不支持“阻塞等待”
if (h != null && h.isCancelled()) // 栈顶是取消状态,则栈顶出栈,设置新的栈顶
casHead(h, h.next);
else // 匹配失败,直接返回null
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈
SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待被匹配,成功返回匹配的节点,失败返回自己
if (m == s) {
clean(s); // 失败出栈
return null;
}
if ((h = head) != null && h.next == s) // 协助与之匹配的节点,出栈
casHead(h, s.next);
// 当前node模式为REQUEST类型:返回匹配节点的m.item数据域
// 当前node模式为DATA类型:返回node.item数据域,当前请求提交的数据e
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 【case2】:栈顶不是FULFILLING说明没被匹配,当前可以匹配
if (h.isCancelled()) // 头节点是取消节点,协助出栈
casHead(h, h.next);
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 入栈当前请求的节点
for (;;) {
SNode m = s.next; // m是s的匹配节点
if (m == null) { // 可能m在await过程中被中断,clean了自己
casHead(s, null); // 清空栈并返回外层自旋
s = null;
break;
}
SNode mn = m.next; // 获取匹配节点的下一个节点
if (m.tryMatch(s)) { // 尝试匹配,匹配成功则将FULLFILLING和m出栈,并唤醒被匹配节点的线程
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
s.casNext(m, mn); // 匹配失败,出栈m
}
}
} else { // 【case3】:栈顶是FULFILLING,栈顶和其下节点正在匹配,当前请求需要做协助
SNode m = h.next; // h和m都是FULFILLING节点
if (m == null)
casHead(h, null); // 清空栈
else {
SNode mn = m.next;
if (m.tryMatch(h)) // m和h匹配,唤醒m中的线程
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
  • awaitFulfill方法:阻塞当前线程,等待被匹配,或者取消当前节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
SNode awaitFulfill(SNode s, boolean timed, long nanos) {

final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待的截止时间
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ? // for自旋检查的次数
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 当前线程收到中断信号,设置node状态为取消
s.tryCancel();
SNode m = s.match; // 获取与当前s匹配的节点
if (m != null) // 可能是正常的匹配,也可能是取消的
return m;
if (timed) { // 如果超时就取消节点
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) // 如果当前线程可以自旋检查,就自旋并减1
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null) // 说明没有自旋次数了,保存线程,准备阻塞
s.waiter = w;
else if (!timed) // 没有超时限制,直接阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold) // nanos > 1000纳秒的情况下,才允许挂起
LockSupport.parkNanos(this, nanos);
}
}

公平(TransferQueue)

TransferQueue是公平的同步队列,采用FIFO的队列实现,请求节点与队尾模式不同,需要与队头发生匹配

TransferQueue类成员变量
1
2
3
transient volatile QNode head;		// 指向队列的dummy节点
transient volatile QNode tail; // 指向队列的尾节点
transient volatile QNode cleanMe; // 被清理节点的前驱节点
QNode内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class QNode {
volatile QNode next; // 当前节点的下一个节点
volatile Object item; // 数据域,为null代表是REQUEST类型,不为null表示为DATA类型
volatile Thread waiter; // 保存对应的线程引用
final boolean isData; // 是否是DATA类型

QNode(Object item, boolean isData) { // 构造方法
this.item = item;
this.isData = isData;
}

void tryCancel(Object cmp) { // 尝试取消当前node,取消的node的item指向自己
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}

boolean isCancelled() { // 判断当前node是否被取消
return item == this;
}

boolean isOffList() { // 判断当前节点是否不在队列中(next指向自己)
return next == this;
}
}
TransferQueue类成员方法
1
2
3
4
5
6
7
8
9
10
11
void advanceHead(QNode h, QNode nh) {
// 设置头指针指向新的节点
if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // 老头节点出队
}

void advanceTail(QNode t, QNode nt) {
if (tail == t)
// 更新队尾节点指针
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
  • transfer核心方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
E transfer(E e, boolean timed, long nanos) {

QNode s = null; // s指向当前请求对应的node
boolean isData = (e != null); // 是否是Data类型的请求

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;

if (h == t || t.isData == isData) { // 空队列,或者队尾类型和当前一样(说明无法匹配)
QNode tn = t.next; // 获取队尾t的next节点
if (t != tail) // 多线程情况下,其他线程可能修改队尾节点
continue;
if (tn != null) { // 已经有线程入队了,更新tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 如果允许超时,超时时间小于0,这种方法不支持阻塞等待
return null;
if (s == null) // 创建当前Node
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 当前Node添加到队尾
continue;

advanceTail(t, s); // 更新队尾指针
Object x = awaitFulfill(s, e, timed, nanos); // 阻塞等待
if (x == s) { // 当前node为取消状态,需要出队
clean(t, s);
return null;
}

if (!s.isOffList()) { // 当前node仍在队列中,匹配成功,需要做出队逻辑
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;

} else { // 队尾节点与当前请求节点互补匹配
QNode m = h.next; // 需要和队头进行匹配
if (t != tail || m == null || h != head) // 其他线程修改了队尾节点,或者已经把head.next匹配走了
continue;

Object x = m.item; // 获取匹配节点的数据保存到x
if (isData == (x != null) || // 判断是否匹配成功
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}

advanceHead(h, m); // 匹配完成,将头节点出队,让这个新的头节点成为dummy节点
LockSupport.unpark(m.waiter); // 唤醒该匹配节点的线程
return (x != null) ? (E)x : e;
}
}
}
  • awaitFulfill
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {

final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待截止时间
Thread w = Thread.currentThread();
int spins = ((head.next == s) ? // 自旋次数
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 被打断就取消
s.tryCancel(e);
Object x = s.item; // 获取当前Node数据域
if (x != e)
return x; // 当前请求为REQUEST类型,返回数据
if (timed) { // 超时检查
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0) // 自旋次数减1
--spins;
else if (s.waiter == null) // 没有自旋次数了,把当前线程封装进waiter
s.waiter = w;
else if (!timed) // 阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}

操作Pool

创建方法

Executor

Executor构造方法:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,						// 核心线程数,即最小线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 存活时间,核心线程数外的线程超时销毁
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 阻塞队列,存放提交的任务
ThreadFactory threadFactory, // 线程工厂,创建线程时用到
RejectedExecutionHandler handler) // 拒绝策略(线程达到最大,仍有新任务)

拒绝策略有四个实现类:

  • ThreadPoolExecutor.AbortPolicy:让调用者抛出RejectedExecutionException异常(默认策略)
  • ThreadPoolExecutor.CallerRunsPolicy:将某些任务回退到调用者,让调用者运行
  • ThreadPoolExecutor.DiscardPolicy:直接丢弃任务
  • ThreadPoolExecutor.DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交

其他框架的拒绝策略:

  • Dubbo:抛出异常前记录日志,并dump线程栈信息,方便定位问题
  • Netty:创建一个新线程来执行任务
  • ActiveMQ:超时等待(60s),尝试放入队列
  • PinPoint:使用一个拒绝链,逐一尝试每种拒绝策略

工作原理:

image-20231223173100161
  1. 创建线程池(此时还没有创建线程)

  2. 当调用execute()方法添加一个任务时:

    • 如果当前线程数少于corePoolSize,马上创建线程运行任务

    • 如果当前线程数大于等于corePoolSize,将这个任务放入队列

    • 如果队列满了,但运行线程数还小于maximumPoolSize,创建非核心线程立即运行这个任务(对阻塞队列的任务来说不公平)

    • 如果队列满了,且运行线程数大于等于maximumPoolSize,线程池会启动拒绝策略

  3. 当一个线程完成任务,会从队列中取下一个任务来执行

  4. 当线程空闲时间超过keepAliveTime,会销毁线程(令线程数量收缩到corePoolSize大小)

Executors

提供了四种线程池的创建:

  • newFixedThreadPool:创建具有n个线程的线程池
    • 核心线程数 = 最大线程数(所以无需超时时间)
    • 单向链表实现阻塞队列,默认大小Integer.MAX_VALUE(无界的),任务较多可能OOM
    • 适用于任务量已知,相对耗时的长期任务
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • newCachedThreadPool:创建一个可扩容的线程池
    • 核心线程数是0,最大线程数是29个1,全是救急线程(60s后可以回收)
    • SynchronousQueue作为阻塞队列,没有容量
    • 适合任务数较密集,但每个任务执行时间较短的情况
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • newSingleThreadExecutor:创建只有一个线程的线程池
    • 保证所有任务按照指定顺序执行
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

开发要求

阿里巴巴开发手册:

  • 线程必须通过线程池提供
    • 减少内存消耗,避免过度切换
  • 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式
    • 更明确线程池的运行规则
    • 避免无界队列导致的OOM

线程池多大合适:

  • 总线程数是核心线程池数量的两倍
    • 过小不能充分利用系统资源
    • 过大导致更多上下文切换
  • 核心线程数常用公式
    • CPU密集型任务:N+1
    • IO密集型任务:2N 或者 N/(1-阻塞系数),阻塞系数在0.8-0.9之间

提交方法

ExecutorService类API:

方法说明
void execute(Runnable command)执行任务(Executor 类 API)
Future<?> submit(Runnable task)提交任务 task()
Future submit(Callable<T> task)提交任务 task,用返回值 Future 获得任务执行结果
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)提交 tasks 中所有任务
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常
T invokeAny(Collection<? extends Callable<T>> tasks)提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
  • execute只能执行Runnable类型的任务,没有返回值
  • submit既能提交Runnable类型任务,也能提交Callable类型任务
    • 底层封装成FutureTask,然后调用execute执行
  • execute直接抛出任务执行时的异常
  • submit会吞掉异常,可通过Future的get方法将任务执行时的异常重新抛出

关闭方法

ExecutorService类API:

方法说明
void shutdown()线程池状态变为SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务)
List<Runnable> shutdownNow()线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回
boolean isShutdown()不在RUNNING状态的线程池,此执行者已被关闭,方法返回 true
boolean isTerminated()线程池状态是否是TERMINATED,如果所有任务在关闭后完成,返回 true
boolean awaitTermination(long timeout, TimeUnit unit)调用shutdown后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待

处理异常

execute会直接抛出任务执行时的异常,submit会吞掉异常

方法一:主动捕获异常

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(() -> {
try {
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});

方法二:使用Future对象

1
2
3
4
5
6
7
8
9
10
11
ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<?> future = executorService.submit(() -> {
int i = 1 / 0;
});

try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}

工作原理

状态信息

ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量

存储在原子变量ctl中,可以用一次CAS原子操作修改

  • 状态表示
1
2
3
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 等于29,表示低29位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 29位能装下的最大容量
  • 四种状态
1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS;		// 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
状态高3位接收新任务处理阻塞任务队列说明
RUNNING111YY
SHUTDOWN000NY不接收新任务,但处理阻塞队列剩余任务
STOP001NN中断正在执行的任务,并抛弃阻塞队列任务
TIDYING010--任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED011--终止状态
  • 获取当前线程数量
1
private static int workerCountOf(int c)  { return c & CAPACITY; }
  • 构建ctl
1
2
// rs:线程池状态;wc:线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 比较线程池ctl
1
2
3
4
5
6
7
8
9
10
11
12
13
// RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

private static boolean runStateLessThan(int c, int s) { // 当前状态是否小于某个状态
return c < s;
}

private static boolean runStateAtLeast(int c, int s) { // 当前状态是否大于等于某个状态
return c >= s;
}

private static boolean isRunning(int c) { // 当前状态是否是Running
return c < SHUTDOWN;
}
  • 设置ctl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ctl值加1,返回true或false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

// ctl值减1,返回true或false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

// 自旋,将ctl值减1,直至成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

成员属性

成员变量:

  • 线程池中存放Worker的容器
1
private final HashSet<Worker> workers = new HashSet<Worker>();
  • 线程全局锁
1
2
// 增减worker,或者修改线程池状态,需要持有锁
private final ReentrantLock mainLock = new ReentrantLock();
  • 可重入锁的条件变量
1
2
// 外部调用awaitTermination时,会等待当前线程池状态位Terminatation为止
private final Condition termination = mainLock.newCondition();
  • 相关参数
1
2
3
4
5
6
7
8
9
10
11
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private volatile ThreadFactory threadFactory; // 创建线程时的线程工厂
private final BlockingQueue<Runnable> workQueue; // 超过核心线程数,放入阻塞队列

private volatile RejectedExecutionHandler handler; // 拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 默认策略

private int largestPoolSize; // 记录生命周期内线程数最大值
private long completedTaskCount; // 记录生命周期内完成任务的总数
  • 控制核心线程数量内的线程是否可以回收
1
private volatile boolean allowCoreThreadTimeOut;	// 为true时,即使是核心线程,空闲时间超时也可以回收

内部类:

  • Worker类
    • 每个Worker会绑定一个初始任务,启动Worker时优先执行。
    • Worker继承自AQS,本身具有锁的特性,采用独占锁模式
      • (state=0未占用,state>0被占用,state<0表示初始状态,不能被抢锁)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

final Thread thread; // worker内部封装的工作线程
Runnable firstTask; // worker第一个执行的任务
volatile long completedTasks; // 记录当前worker完成的任务数量

Worker(Runnable firstTask) { // 构造方法
setState(-1); // 设置AQS独占模式为初始化中状态,不能被抢占锁
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}


protected boolean tryAcquire(int unused) { // 不可重入锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
// Executor.DefaultThreadFactory类
public Thread newThread(Runnable r) {
// 将当前worker指定为thread的指定方法,线程调用start()会调用r.run()
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
  • 拒绝策略相关的内部类
1
2
3
4
CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

成员方法

提交方法

ThreadPoolExecutor=>AbstractExecutorService=>ExecutorService=>Executor

  • submit()方法,将提交的Runnable或者Callable封装为FutureTask执行。会返回任务对象,通过get阻塞获取结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// AbstractExecutorService类
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException(); // 空指针判断
RunnableFuture<Void> ftask = newTaskFor(task, null); // Runnable对象封装为未来任务对象
execute(ftask); // 执行方法
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException(); // 空指针判断
RunnableFuture<T> ftask = newTaskFor(task); // Callable对象封装为未来任务对象
execute(ftask); // 执行方法
return ftask;
}
1
2
3
4
5
6
7
8
// AbstractExecutorService类
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value); // Runnable封装成FutureTask,并指定返回值
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable); // Callable直接封装成FutureTask
}
  • execute()方法,执行任务,但是没有返回值。出现异常会直接抛出
    • 只能提交Runnable,或者FutureTask
    • 有担保机制,保证线程池至少有一个worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ThreadPoolExecutor类

public void execute(Runnable command) {

if (command == null) // 非空判断
throw new NullPointerException();

int c = ctl.get(); // 获取ctl最新值
if (workerCountOf(c) < corePoolSize) { // 【case1】当前线程数少于核心线程数
if (addWorker(command, true)) // 直接创建一个新的worker,当前任务作为firstTask
return;
c = ctl.get(); // 添加worker失败(存在并发或者线程池状态改变了),重新获取ctl
}

// 【case2】线程数已达核心线程数,或者addWorker失败
if (isRunning(c) && workQueue.offer(command)) { // 【case2】线程池是否处于Running状态,成立就将任务放入队列
int recheck = ctl.get()
if (!isRunning(recheck) && remove(command)) // 如果线程池状态被外部修改了,则要把提交的任务删除
reject(command); // 任务出队成功,走拒绝策略
else if (workerCountOf(recheck) == 0) // (担保机制)线程池是running状态,判断线程池中线程数量是否为0
addWorker(null, false); // (担保机制)保证线程池在running状态,至少有一个worker
}

else if (!addWorker(command, false)) // 【case3】offer失败,说明queue满了
reject(command); // 走拒绝策略
}

添加线程

  • prestartAllCoreThreads()方法,提前预热,创建所有的核心线程
1
2
3
4
5
6
7
8
// ThreadPoolExecutor类

public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
  • addWorker()方法:添加线程到线程池,返回true表示成功,且线程启动。
    • 首先判断线程池是否允许添加线程,允许线程数量+1,然后创建Worker加入线程池
    • SHUTDOWN状态也能添加线程,但是要求新的Worker没有firstTask,且当前queue不为空,来帮助执行队列中的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// ThreadPoolExecutor类

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 线程池ctl
int rs = runStateOf(c); // 线程池状态

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) // 如上条件
return false;

for (;;) {
int wc = workerCountOf(c); // 线程数量
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 超过数量上限
return false;
if (compareAndIncrementWorkerCount(c)) // CAS线程数+1
break retry; // 成功跳出循环
c = ctl.get();
if (runStateOf(c) != rs) // 失败判断状态有无变化
continue retry; // 有变化返回外层循环,重新判断
}
}

// 【令牌申请成功】开始创建线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建Worker(底层使用传入工厂的newTread方法)
final Thread t = w.thread; // 拿到其中的thread
if (t != null) { // 确保工厂创建Thread没BUG
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加互斥锁,要添加worker了
try {
int rs = runStateOf(ctl.get()); // 获取最新状态

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 确保满足条件
if (t.isAlive()) // 此时如果线程已经启动了,应该报错
throw new IllegalThreadStateException();
workers.add(w); // 添加到线程池
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新largestPoolSize
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果已经成功添加到线程池,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) // 如果启动失败,做清理工作
addWorkerFailed(w);
}
return workerStarted; // 返回启动是否成功
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ThreadPoolExecutor类

private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 操作线程池,需要先获取全局锁
try {
if (w != null)
workers.remove(w); // 移除指定worker
decrementWorkerCount(); // 线程池计数-1,相当于归还令牌
tryTerminate(); // 尝试停止线程池
} finally {
mainLock.unlock(); // 释放全局锁
}
}

运行方法

Worker=>Runnable & AQS

  • Worker实现了Runnable接口,线程启动时会调用Worker的run方法
1
2
3
4
// ThreadPoolExecutor.Worker类
public void run() {
runWorker(this);
}
  • runWorker方法,会一直while循环获取任务并执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// ThreadPoolExecutor类

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 获取worker的firstTask
w.firstTask = null; // 引用置空
w.unlock(); // 初始化worker时设置state=-1,表示不允许抢占锁
// 设置state=0,exclusiveOwnerThread=null,开始独占模式强锁
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // firstTask不是null直接运行,否则去queue获取
w.lock(); // worker加锁(表示不空闲)

// 说明线程处于STOP/TIDYING/TERMINATION,需要给线程一个中断信号
if ((runStateAtLeast(ctl.get(), STOP) ||
// 说明线程处于RUNNING或者SHUTDOWN状态,清除打断标记
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 中断线程,标志位为true
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法,任务执行前的前置处理
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 钩子方法,任务执行的后置处理
}
} finally {
task = null; // 局部变量task设为null,表示执行完成
w.completedTasks++; // 更新worker完成任务数量
w.unlock(); // 解锁
}
}
completedAbruptly = false; // 正常退出,completedAbruptly=false
// 异常退出,completedAbruptly=true
} finally {
processWorkerExit(w, completedAbruptly); // 根据情况,向线程池中添加线程
}
}
  • unlock()方法,重置锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ThreadPoolExecutor类
public void unlock() {
release(1);
}

// AQS类
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// ThreadPoolExecutor类
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null); // 设置持有者为 null
setState(0); // 设置 state = 0
return true;
}
  • getTask():获取任务,线程空闲时间超过keepAliveTime就会回收,判断依据时当前线程阻塞获取任务超过保活时间
    • 方法返回null,代表当前线程要被回收了,进入退出逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// ThreadPoolExecutor类
private Runnable getTask() {
boolean timedOut = false; // 超时标记

for (;;) {
int c = ctl.get(); // 当前ctl
int rs = runStateOf(c); // 当前状态

// RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 状态为SHUTDOWN且队列为空,或者状态为STOP之后,可以直接返回null了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 自旋的方式让ctl值-1
return null;
}

int wc = workerCountOf(c); // 线程池中的线程数量

// 条件一:允许回收核心线程,那就无所谓了,都支持超时回收
// 条件二:线程数大于核心线程数,有保活时间,去超时获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 条件一:如果线程数大于最大线程数,直接回收
// 条件二:允许超时,且已经超时了,直接回收
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // CAS将ctl减1,返回null,失败继续循环
return null;
continue;
}

try {
// 根据是否需要超时回收,workQueue使用带超时的poll,或者take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 成功直接返回
timedOut = true; // r为null,说明超时了
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  • processWorkerExit()方法:线程退出线程池,也有担保机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ThreadPoolExecutor类
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是异常退出的
decrementWorkerCount(); // 发生了异常ctl还没有减1,ctl需要减1

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 完成的任务数量加1
workers.remove(w); // worker从线程池移出
} finally {
mainLock.unlock();
}

tryTerminate(); // 尝试停止线程池,唤醒下一个线程

int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果是RUNNING或者SHUTDOWN状态【担保机制】
if (!completedAbruptly) { // 不是异常退出,是对空闲线程回收
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) // 最小允许线程可以是0,但是队列又不为空,需要一个线程完成任务担保
min = 1;
if (workerCountOf(c) >= min) // 线程池中的线程数量大于min,可以直接返回,不需要添加worker
return;
}
addWorker(null, false); // 是异常退出,需要添加worker
// 不是异常退出,但是线程池中线程不够,需要添加worker
}
}

停止方法

  • shutdown()方法:停止线程池,会等待执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ThreadPoolExecutor类
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 设置状态为SHUTDOWN,如果已经大于SHUTDOWN了则直接返回
interruptIdleWorkers(); // 中断空闲线程
onShutdown(); // 空方法,留给子类用
} finally {
mainLock.unlock();
}
tryTerminate();
}
  • shutdownNow()方法:直接关闭线程池,不会等待执行完成
    • 会返回没完成的任务列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ThreadPoolExecutor类
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 设置状态为STOP
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 从阻塞队列中取出未处理的task
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks; // 返回未处理的任务
}
  • tryTerminate()方法:设置为TERMINATED状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// ThreadPoolExecutor类
final void tryTerminate() {
for (;;) {
int c = ctl.get(); // 获取ctl的值
if (isRunning(c) || // 条件一,RUNNING状态
runStateAtLeast(c, TIDYING) || // 条件二,TIDYING/TERMINATED状态
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) // 条件三,SHUTDOWN状态,阻塞队列不为空
return; // 直接返回

// 执行到这里,说明线程池状态为STOP,或者SHUTDOWN且队列为空
if (workerCountOf(c) != 0) { // 线程池中线程数量不为0
// 中断一个空闲线程,将导致阻塞在queue.take/queue.poll处的线程唤醒,最终getTask返回null。
// 后面执行processWorkerExit退出逻辑时,会再次调用tryTerminate(),唤醒下一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}


// 池中的线程数量为0时,来到这里
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 设置状态为TIDYING,线程数为0
try {
terminated(); // 结束线程池
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED,线程数为0
termination.signalAll(); // 唤醒所有调用awaitTermination()方法的线程
}
return;
}
} finally {
mainLock.unlock();
}
}
}

FutureTask

基本使用

FutureTask是未来任务对象,继承Runnable、Future接口,用于包装Callable对象,实现任务的提交

1
2
3
4
5
Callable<String> stringCallable = () -> "Hello";
FutureTask<String> task = new FutureTask<>(stringCallable);
new Thread(task).start();

String s = task.get();

构造方法:

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 属性注入
this.state = NEW; // 状态设为NEW
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result); // 适配器模式
this.state = NEW;
}

// Executors类
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result); // 适配器模式,将Runnable转换为Callable
}

// Executors类内部类
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;

RunnableAdapter(Runnable task, T result) { // 适配器模式,将Runnable转换为Callable
this.task = task;
this.result = result;
}

public T call() {
task.run();
return result;
}
}

成员属性

FutureTask类的成员属性

  • 任务状态
1
2
3
4
5
6
7
8
9
10
11
private volatile int state;						// 当前状态

private static final int NEW = 0; // 任务创建,尚未执行
private static final int COMPLETING = 1; // 任务正在结束,尚未完全结束
private static final int NORMAL = 2; // 任务正常结束

private static final int EXCEPTIONAL = 3; // 任务执行发生异常
private static final int CANCELLED = 4; // 任务被取消

private static final int INTERRUPTING = 5; // 任务中断中
private static final int INTERRUPTED = 6; // 任务已中断
  • 任务对象
1
private Callable<V> callable;
  • 存储任务执行的结果
1
private Object outcome;
  • 执行任务的线程对象
1
private volatile Thread runner;
  • 线程阻塞队列的头节点
1
private volatile WaitNode waiters;
  • 内部类
1
2
3
4
5
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

成员方法

任务执行:

  • FutureTask::run()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void run() {
// 条件一:状态不是NEW,说明任务已经被别人执行过了。直接返回
// 条件二:尝试将runner设为自己的线程,失败说明被别人抢去了。直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;

try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran; // 表示是否抛出过异常
try {
result = c.call();
ran = true; // 没有异常
} catch (Throwable ex) {
result = null;
ran = false; // 有异常
setException(ex);
}
if (ran)
set(result); // 设置返回值
}
} finally {
runner = null; // 任务执行完成,取消线程引用
int s = state;
if (s >= INTERRUPTING) // 判断任务是否被中断
handlePossibleCancellationInterrupt(s); // 执行中断处理方法,见后面
}
}
  • FutureTask::run()方法,设置返回值
1
2
3
4
5
6
7
8
protected void set(V v) {
// CAS设置当前状态为COMPLETING,失败说明任务被其他线程取消了
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 将值赋给outcome
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 当前任务状态修改为NORMAL
finishCompletion(); // 见后面
}
}
  • FutureTask::setException()方法,设置异常返回值
1
2
3
4
5
6
7
8
protected void setException(Throwable t) {
// CAS设置当前状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 异常值赋给outcome,用来往外抛
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 当前任务状态修改为EXCEPTIONAL
finishCompletion(); // 见后面
}
}
  • FutureTask::finishCompletion()方法,唤醒get阻塞线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) { // 遍历所有等待的节点
// 使用CAS设置waiter为null,防止外部线程使用cancel取消当前任务,触发finishCompletion重复执行
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread; // 获取当前waitNode节点封装的thread
if (t != null) { // 当前线程不为null,唤醒get等待获取数据的线程
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next; // 获取下一个节点
if (next == null)
break;
q.next = null; // 断开链表
q = next;
}
break;
}
}

done();
callable = null;
}
  • FutureTask::handlePossibleCancellationInterrupt()方法,任务中断处理
1
2
3
4
5
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // 需要一直等待中断完成
}

结果获取:

  • FutureTask::get()方法,获取线程执行的返回值。可以有多个线程get
1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state; // 获取当前任务状态
if (s <= COMPLETING) // 条件成立,说明还没执行完成
s = awaitDone(false, 0L); // 等待完成,最终返回状态
return report(s);
}
  • FutureTask::awaitDone()方法,get线程封装成WaitNode对象,进入阻塞队列等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等候的DDL
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { // 返回true说明被打断,会清除打断标记
removeWaiter(q); // 当前waitNode出队
throw new InterruptedException(); // 抛出错误
}

int s = state; // 获取任务状态
if (s > COMPLETING) { // 条件成立,说明已经有结果了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 条件成立,说明任务接近完成,这里让线程释放一下cpu,下一次再抢占
Thread.yield();
else if (q == null) // 【第一次自旋】创建WaitNode对象
q = new WaitNode();
else if (!queued) // 【第二次自旋】创建了WaitNode对象,但还没入队
// waiters指向队首,CAS让waiters指向当前WaitNode(头插法)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 超时阻塞等待
}
else
LockSupport.park(this); // 阻塞等待
}
}
  • FutureTask::report()方法,封装运行结果(成员变量outcome)
1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 正常结束,直接返回
if (s >= CANCELLED)
throw new CancellationException(); // 被取消或中断,抛出异常
throw new ExecutionException((Throwable)x); // 说明自定义的Callable方法有异常,抛出outcome异常
}
  • FutureTask::cancel()方法,任务取消,打断正在执行该任务的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean cancel(boolean mayInterruptIfRunning) {
// 条件一:表示当前任务处于运行中,或者队列中
// 条件二:修改状态,成功才能继续执行,否则返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;

try {
if (mayInterruptIfRunning) { // 如果任务已经运行允许被打断
try {
Thread t = runner; // 获取当前FutureTask的thread
if (t != null)
t.interrupt(); // 打断
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // 设置状态为打断完成
}
}
} finally {
finishCompletion(); // 唤醒所有get阻塞的线程
}
return true;
}

任务调度

Timer

Timer实现定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Timer timer = new Timer();

TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task1");
Thread.sleep(2000);
}
};

TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task2");
Thread.sleep(1000);
}
};

// 使用timer添加两个任务,都是延时1s后执行
// 但由于timer内,只有一个线程来顺序执行任务,因此task1的延时会影响task2的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);

Scheduled

任务调度线程池ScheduledThreadPoolExecutor,继承ThreadPoolExecutor

  • 使用内部类ScheduledFutureTask,封装任务
  • 使用内部类DelayedWorkQueue,作为线程池队列
  • 重写onShutdown方法,去处理shutdown后的任务
  • 提供decorateTask方法,作为ScheduledFutureTask 的修饰方法,以便开发者进行扩展

构造方法:

1
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
1
2
3
4
5
6
7
8
9
10
11
// Executors类
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

// ScheduledThreadPoolExecutor类
public ScheduledThreadPoolExecutor(int corePoolSize) {

// 最大线程数固定为MAX_VALUE,保活时间keepAliveTime固定为0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

常用API:

  • 返回值都是ScheduledFuture<?>
API参数说明
scheduleRunnable command, long delay, TimeUnit unit执行延时任务
scheduleCallable<V> callable, long delay, TimeUnit unit执行延时任务
scheduleAtFixedRateRunnable command, long initialDelay, long period, TimeUnit unit定时执行周期任务(不考虑执行的耗时)
scheduleWithFixedDelayRunnable command, long initialDelay, long delay, TimeUnit unit定时执行周期任务(要考虑执行的耗时)

基本使用:

  • 延迟任务
1
2
3
4
5
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

executor.schedule(() -> {
System.out.println("task1");
}, 1000, TimeUnit.MILLISECONDS);
  • 周期任务scheduleAtFixedRate一次任务启动到下一次任务启动之间,只要大于间隔时间,抢占到CPU就会立即执行
1
2
3
4
5
6
7
8
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

executor.scheduleAtFixedRate(() -> {
System.out.println(new Date());
Thread.sleep(2000);
}, 1000, 1000, TimeUnit.MILLISECONDS);

// 两次打印间隔2s
  • 周期任务scheduleWithFixedDelay一次任务结束到下一次任务开始间隔,严格等于initialDelay
1
2
3
4
5
6
7
8
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

executor.scheduleWithFixedDelay(() -> {
System.out.println(new Date());
Thread.sleep(2000);
}, 1000, 1000, TimeUnit.MILLISECONDS);

// 两次打印间隔3s

ForkJoin

ForkJoin:线程池的实现,体现的是分治思想,用于并行计算。

任务拆分:将一个大任务拆分为算法上相同的小任务,直至不能拆分,可以直接求解。

跟递归相关的一些计算,比如归并排序,斐波那契数列都可以用分治思想。

  • ForkJoin在分治基础上,加入了多线程,把每个任务的分解和合并交给不同线程来完成,提高了运算效率
  • ForkJoin使用ForkJoinPool来启动,是一个特殊的线程池,默认创建与CPU核心数大小相同的线程池
  • 任务有返回值继承RecursiveTask,没有返回值继承RecursiveAction
1
2
3
ForkJoinPool pool = new ForkJoinPool();
MyTask task = new MyTask(100);
Integer result = pool.invoke(task);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n == 1) {
return n;
}

// 将任务进行拆分
MyTask t1 = new MyTask(n - 1);
t1.fork();
return n + t1.join();
}
}
  • 上一个拆分不是很合理,优化一下
1
2
3
ForkJoinPool pool = new ForkJoinPool();
MyTask task = new MyTask(1, 100);
Integer result = pool.invoke(task);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MyTask extends RecursiveTask<Integer> {
private int begin;
private int end;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {
if (begin == end) {
return begin;
}
if (end - begin == 1) {
return begin + end;
}

int mid = (begin + end) / 2;
MyTask t1 = new MyTask(begin, mid);
MyTask t2 = new MyTask(mid + 1, end);
t1.fork();
t2.fork();

return t1.join() + t2.join();
}
}

ForkJoinPool实现了工作窃取算法,来提高CPU利用率

  • 每个线程都维护了一个双端队列,用来存储需要执行的任务
  • 工作窃取算法:允许空闲的线程,从其他线程双端队列中窃取任务来执行
    • 窃取的必须是最晚的任务,避免和队列所属线程发生竞争(但是队列中只有一个任务时,还是会发生竞争)

享元模式

享元模式:用于减少创建对象的数量,以减少内存占用和提高性能。

异步模式:让有限的工作线程,轮流异步处理无限多的任务(典型实现:线程池)

工作机制:享元模式,尝试重用现有的同类对象,如果未匹配才创建新对象

  • 自定义连接池:
1

  • 标题: JUC系列:(六)线程池
  • 作者: 布鸽不鸽
  • 创建于 : 2024-01-10 14:54:58
  • 更新于 : 2024-01-10 14:56:39
  • 链接: https://xuedongyun.cn//post/11329/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论