JUC系列:(七)同步器

布鸽不鸽 Lv4

AQS

核心思想

AQS(AbstractQueuedSynchronizer),是阻塞式锁的框架,许多同步类都依赖于该同步器

独占与共享:

  • AQS用状态属性,来表示资源的状态

  • 独占模式:只有一个线程能访问资源

  • 共享模式:允许多个线程访问资源,比如Semaphore,ReentrantReadWriteLock

AQS核心思想:

  • 如果请求的资源空闲,则将当前线程设为有效的工作线程,资源设为锁定状态
  • 如果请求的资源锁定,则将当前线程加入到队列中

CLH是一种基于单向链表的高性能、公平的自旋锁。AQS将所有请求的线程封装为CLH锁队列的一个结点,来实现锁的分配

image-20231212202746559

设计原理

  • 获取锁
1
2
3
4
5
6
while (state 状态不允许获取) {
if (队列中没有此线程) {
入队并阻塞 park
}
}
当前线程出队
  • 释放锁
1
2
3
if (state 状态允许了) {
恢复阻塞的线程(s) unpark
}

AQS中state设计:

  • state使用32位 int来维护同步状态,0表示未加锁,大于0表示已加锁
  • state使用volatile,配合CAS保证修改的原子性
  • state表示线程重入的次数(独占模式),或者剩余许可数(共享模式)
  • state API
    • protected final int getState():获取state状态
    • protected final void setState(int newState):设置state状态
    • protected final boolean compareAndSetState(int expect,int update):CAS安全设置state

线程的Node节点中waitstate设计:

  • 使用volatile,配合CAS保证修改的原子性
  • 状态有以下几种
1
2
3
4
5
volatile int waitStatus;			// 默认为0
static final int CANCELLED = 1; // 由于超时或中断,此节点被取消,不会再改变状态
static final int SIGNAL = -1; // 此节点后面的节点已被阻止(park)(当前节点释放或取消时必须唤醒后面的节点)
static final int CONDITION = -2; // 此节点在条件队列中
static final int PROPAGATE = -3; // 将releaseShared传播到其他节点

阻塞恢复设计:

  • 使用park-unpark来实现线程的暂停和恢复(命令的先后不影响结果)
  • park线程可以通过interrupt来打断

队列设计:

  • 双向链表,FIFO队列
1
2
3
4
5
6
7
8
9
10
11
12
private transient volatile Node head;	// 头节点,哑元节点
private transient volatile Node tail; // 尾节点

static final class Node {

static final Node SHARED = new Node(); // 枚举:共享模式
static final Node EXCLUSIVE = null; // 枚举:独占模式
volatile Node prev; // 指向上一个节点
volatile Node next; // 指向下一个节点
volatile Thread thread; // 当前node所属线程
Node nextWaiter; // (条件队列使用该属性)条件队列是单向链表,只有后继指针
}
  • 条件变量,来实现:等待、唤醒
  • 支持多个条件变量,类似Monitor的WaitSet
  • 条件队列是单向队列
1
2
3
4
5
public class ConditionObject implements Condition, java.io.Serializable {

private transient Node firstWaiter; // 指向条件队列的第一个node节点
private transient Node lastWaiter; // 指向条件队列的最后一个node节点
}

模板对象

  • 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法
  • 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,这些模板方法会调用使用者重写的方法

自定义同步器需要重写下面几个AQS提供的模板方法

1
2
3
4
5
isHeldExclusively()		// 该线程是否正在独占资源。只有用到condition才需要去实现它
tryAcquire(int) // 独占方式。尝试获取资源,成功则返回true,失败则返回false
tryRelease(int) // 独占方式。尝试释放资源,成功则返回true,失败则返回false
tryAcquireShared(int) // 共享方式。尝试获取资源。负数表示失败;0表示成功但没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int) // 共享方式。尝试释放资源,成功则返回true,失败则返回false
  • 默认,这些方法会抛出UnsupportedOperationException
  • 这些方法必须内部线程安全

自定义

  • 自定义一个独占模式的可重入锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class MyLock implements Lock {

// AQS独占锁,不可重入
class MySync extends AbstractQueuedSynchronizer {

// 尝试加锁
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 设置owner为当前线程
this.setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 尝试解锁
@Override
protected boolean tryReleaseShared(int arg) {
this.setExclusiveOwnerThread(null);
this.setState(0);
return true;
}

// 判断,是否持有独占锁
@Override
protected boolean isHeldExclusively() {
return this.getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

private MySync sync = new MySync();

// 加锁(不成功加入等待队列)
@Override
public void lock() {
sync.acquire(1);
}

// 加锁,可以打断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// 尝试加锁,只尝试一次
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

// 尝试加锁,带超时
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

// 解锁
@Override
public void unlock() {
sync.release(1);
}

// 条件变量
@Override
public Condition newCondition() {
return sync.newCondition();
}
}

ReentrantLock

对比synchronized

  1. 锁的实现:ReentrantLock是JDK实现的,synchronized是JVM实现的
  2. 性能:ReentrantLock和synchronized大致相同
  3. 使用:ReentrantLock需要手动解锁,synchronized自动解锁
  4. 可中断:ReentrantLock可中断,而synchronized不行
  5. 公平锁:ReentrantLock可以设置公平锁,synchronized中的锁是非公平的
  6. 锁超时:ReentrantLock可以设置超时时间,synchronized会一直等待
  7. 锁绑定多个条件:ReentrantLock可以同时绑定多个Condition对象(细粒度的唤醒)
  8. 可重入:两者都是可重入锁

使用锁

  • public void lock():获得锁
    • 锁没有被其他线程占用,计数设为1
    • 锁被当前线程占用,计数+1
    • 锁被其他线程占用,进入休眠状态
  • public void unlock():尝试释放锁
    • 锁被当前线程占用,计数-1;如果计数器为0,则锁被释放
    • 如果锁不是被当前线程占用,抛出异常
1
2
3
4
5
6
reentrantLock.lock();		// 获取锁
try {
// 临界区
} finally {
reentrantLock.unlock(); // 释放锁
}

公平锁

基本使用

构造方法:ReentrantLock lock = new ReentrantLock(true)

  • 默认是非公平的
  • 公平锁一般没必要,会降低并发度
1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

非公平锁原理

NonfairSync继承自AQS

  • NonfairSync -> Sync -> AQS

加锁原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NonfairSync类
final void lock() {
if (compareAndSetState(0, 1)) // 使用CAS尝试一次,将state从0改为1
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 失败进入
}


// AQS类
public final void acquire(int arg) {
// tryAcquire尝试获取锁
// 获取锁失败,会调用addWaiter将当前线程封装成Node入队,acquireQueued阻塞当前线程
// acquireQueued返回true表示被中断唤醒过,false表示从未被中断唤醒过
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 如果被中断唤醒过,会来到这里(执行Thread.currentThread().interrupt();)
}

image-20231212231824339

  • tryAcquire尝试获取锁,只有两种情况能成功
    1. 当前AQS锁未被占有
    2. 当前AQS锁被自己占有,可以进行锁重入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// NonfairSync类
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}


// AQS类
// 抢占成功返回true,失败返回false
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 当前AQS的状态
if (c == 0) { // 锁没有被占有,尝试使用CAS获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 锁被当前线程占有,锁重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 否则都返回false
}
  • 接下来是addWaiter的逻辑
    • 图中黄色三角表示Node的waitStatus状态,0为默认正常状态
    • Node的创建是懒惰的,第一个Node是哑元,用来占位

image-20231212233215319

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
abstract static class Sync extends AbstractQueuedSynchronizer {

// addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建Node对象,模式为独占模式
Node pred = tail;
if (pred != null) { // tail不为null,说明有队列
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS,尾插法加入
pred.next = node;
return node;
}
}
enq(node); // 初始队列为空,或者CAS失败
return node;
}

private Node enq(final Node node) {
for (;;) { // 自旋入队,必须成功才结束循环
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) // tail为null,CAS设置哑元头节点
tail = head;
} else {
node.prev = t; // 当前节点前序节点设为之前的tail
if (compareAndSetTail(t, node)) { // CAS自旋尝试入队(tail指针指向它)
t.next = node; // 之前的tail节点的后继节点设为当前节点
return t;
}
}
}
}
}
  • 接下来是acquireQueued
    • 返回true表示被中断唤醒过
    • 返回false表示从未被中断唤醒过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 是否被中断唤醒过
for (;;) { // 开始for循环
final Node p = node.predecessor(); // 获得当前线程节点的前驱节点
if (p == head && tryAcquire(arg)) { // 前驱是head,尝试去获取锁
setHead(node); // 获取成功,设置自己的node为head
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应当park
parkAndCheckInterrupt()) // park,会返回期间是否被打断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • shouldParkAfterFailedAcquire,判断是否应当park
    • 将前驱的node的waitStatus改为Node.SIGNAL(-1)
    • waitStatus为-1的节点用来唤醒下一个节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
abstract static class Sync extends AbstractQueuedSynchronizer {

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果前置节点可以唤醒当前节点(waitState等于-1)
return true; // 返回应该park

if (ws > 0) { // 前置节点处于取消状态,需要不断删除前面取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS尝试一次,设置上一个节点状态为Node.SIGNAL
}

return false; // 返回不应该park(返回外部,会继续循环重试)
}
}
  • parkAndCheckInterrupt,判断是否被打断
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞当前线程,如果打断标记是true则park失败
return Thread.interrupted(); // 判断是否被打断,清除打断标记
}

image-20231213000328943

解锁原理:

1
2
3
public void unlock() {
sync.release(1);
}
1
2
3
4
5
6
7
8
9
10
// AQS类 
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁,返回true表示成功
Node h = head;
if (h != null && h.waitStatus != 0) // 头节点不为空,说明有唤醒队列,需要唤醒head节点后面的线程
unparkSuccessor(h);
return true;
}
return false;
}
  • tryRelease方法,尝试解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
// AQS类  
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 减去释放的值
if (Thread.currentThread() != getExclusiveOwnerThread()) // 不是当前线程持有锁,直接报错
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // state为0,表示锁释放成功
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 当前线程持有锁,直接更新新值,不需要CAS
return free;
}
  • unparkSuccessor,唤醒当前节点的后继节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AQS类  
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
// 尝试将当前节点状态重置为0,因为要完成对后续节点的唤醒,不需要-1了
compareAndSetWaitStatus(node, ws, 0);

// 找到需要unpark的节点,当前节点的下一个
Node s = node.next;

// 如果后继节点为null,或者已被取消,就不能唤醒,所以需要找到距离最新的非取消节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前,找到需要unpark的节点,直到t等于当前node为止,找不到就算了
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}

从后向前的原因:

enq方法中,节点是尾插法,首先赋值的是尾节点的前驱节点。

此时前驱节点的next并没有指向尾节点,从前向后遍历会丢失尾节点。

  • 唤醒的线程从之前park位置开始,继续在循环中tryAcquire(arg)尝试获取锁,如果成功
    • exclusiveOwnerThread为Thread-1,state=1
    • head指向Thread1所在的Node
    • 原本的head出队,然后被GC垃圾回收(图中有误,原本head的waitState改为0了)

image-20231213121916458

  • 如果有其他线程来竞争,假如此时有Thread-4来抢占了锁
    • exclusiveOwnerThread为Thread-4,state=1
    • Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞

image-20231213124223536

公平锁原理

主要的区别在于tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class FairSync extends Sync {

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查AQS队列有没有待唤醒节点,或者待唤醒节点是不是自己,才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractQueuedSynchronizer 
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 1. 头尾指向一个节点,链表为空,返回false
// 2. 头尾之间有节点,头节点的下一个是不是空
// 3. 头节点下一个不是空,判断是不是本线程所有
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
}

可重入

如果没有可重入,拥有锁的线程再次加锁,自己也会被挡住,造成死锁

源码解析部分参考之前的:nonfairTryAcquiretryRelease

  • 判断ExclusiveOwnerThread是不是自己
  • 计数器增减

加锁一次,解锁两次,程序会直接报错

可打断

public void lockInterruptibly():获得可打断的锁

  • 如果没有竞争,就会获得lock对象锁
  • 如果有竞争,就进入阻塞队列,但是可以被其他线程用interrupt打断
1
2
3
4
5
6
7
8
9
10
11
12
Thread t1 = new Thread(() -> {        
try {
lock.lockInterruptibly(); // 尝试获取锁
} catch (InterruptedException e) {
return; // 没有获取到锁,在等待队列中被打断,直接返回
}
try {
// do sth // 获取到锁
} finally {
lock.unlock(); // 最终解锁
}
});

实现原理

  • 不可打断模式:即使被打断,任会驻留在AQS队列中,一直到获得锁之后才能直到自己被打断了
    • acquireQueued方法中,用interrupted变量记录了是否被打断过
1
2
3
4
5
6
7
8
9
10
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 阻塞等待
// 如果acquireQueued返回true,打断状态interrupted=true
selfInterrupt();
}

static void selfInterrupt() {
// 自己被打断了,重新产生一次打断,完成真正的中断效果
Thread.currentThread().interrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted; // 还是要获得锁后,才能返回打断的状态
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 第二个条件,判断自己是否被打断了
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞当前线程,如果打断标记已经是true, 则 park会失效
return Thread.interrupted(); // 返回线程打断状态(且会清除打断标记)
}
  • 可打断模式
1
2
3
4
5
6
7
8
9
10
11
12
// ReentrantLock类方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// AQS类方法
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 被打断了直接抛出错误
if (!tryAcquire(arg)) // CAS尝试获取锁
doAcquireInterruptibly(arg); // 没获取到锁,进入这里
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AQS类方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {

final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 和之前不一样,没有变量记录,如果被中断会直接抛出错误
}
} finally {
if (failed)
cancelAcquire(node); // 抛出错误,进入如下逻辑
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// AQS类方法
// 将当前节点出队
private void cancelAcquire(Node node) {

if (node == null)
return;
node.thread = null; // 当前节点的Thread置空
Node pred = node.prev; // 获取当前节点前的,第一个没被取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next; // 获取前驱节点的后继节点(可能是当前node,也可能是某个被取消的节点)
node.waitStatus = Node.CANCELLED; // 当前节点状态设置为1
if (node == tail && compareAndSetTail(node, pred)) { // 如果当前节点是尾节点,则将前驱节点设为尾节点
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head && // 当前节点不是head.next
((ws = pred.waitStatus) == Node.SIGNAL || // 前驱节点状态是不是-1
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 如果不是-1,前驱节点状态设为-1
pred.thread != null) { // 前驱节点的线程不为null
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 当前节点的后驱节点是正常节点
compareAndSetNext(pred, predNext, next); // 前驱的后继,设为当前节点的后继(队伍删除当前节点)
} else {
unparkSuccessor(node); // 当前节点就是head.next节点,唤醒后继节点
}
node.next = node;
}
}

锁超时

基本使用

public boolean tryLock():尝试获取锁,获取到返回 true,获取不到直接返回false

public boolean tryLock(long timeout, TimeUnit unit):在给定时间内获取锁,获取不到返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ReentrantLock lock = new ReentrantLock();
Thread thread = new Thread(() -> {
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println("获取锁失败");
return;
}
} catch (InterruptedException e) {
System.out.println("被打断,获取不到锁");
return;
}

try {
// do sth
} finally {
lock.unlock();
}
});

实现原理

1
2
3
4
// ReetrantLock类方法
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
1
2
3
4
5
6
7
// ReetrantLock类方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取一次锁,没获取到进入doAcquireNanos逻辑
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
  • doAcquireNanos方法,在一定时间内获取锁,获取不到返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// static final long spinForTimeoutThreshold = 1000L;

// AQS类方法
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 计算DDL
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); // 计算还需等待的时间
if (nanosTimeout <= 0L)
return false; // 超时还没拿到锁,直接返回false
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 如果还需等待的时间大于这个值,才有阻塞的意义
// 否则自旋好一些
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted()) // 如果被打断,抛出错误
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

条件变量

基本使用

  • synchronized的条件变量:当不满足时进入WaitSet等待
  • ReentrantLock的条件变量:强大之处在于,支持多个条件变量

ReentrantLock类获取Condition变量:public Condition newCondition()

  • void await():当前线程进入等待状态,释放锁
  • void signal():唤醒一个等待在当前Condition上的线程(必须已经获取了当前的锁)

使用流程:

  • await / signal前,需要获得锁
  • await执行后,会释放锁进入ConditionObject等待
  • await的线程被唤醒后,会重新竞争锁
  • 线程在条件队列中被打断,会抛出异常
  • 竞争lock成功后,从await后继续执行

实现原理

await

概括:

  • 开始Thread-0持有锁,调用await,线程进入ConditionObject等待,直到被唤醒或打断。如果被唤醒,就转移到AQS阻塞队列,等待获取锁。
image-20231216172849550
  • 每个Condition都包含一个队列

  • 将await线程包装成node节点,放入ConditionObject的条件队列。

  • 如果被唤醒就将node转移到AQS队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ConditionObject implements Condition, java.io.Serializable {

public final void await() throws InterruptedException {
if (Thread.interrupted()) // 当前线程是中断状态,直接抛出异常
throw new InterruptedException();
Node node = addConditionWaiter(); // 将调用await的线程包装成Node,添加到条件队列并返回
int savedState = fullyRelease(node); // 完全释放锁
int interruptMode = 0; // 设置状态为没有被打断
while (!isOnSyncQueue(node)) { // 阻塞,直至节点转移至AQS队列
LockSupport.park(this);
// 如果被打断,退出等待队列
// 对应的node也会迁移至AQS阻塞队列尾部,状态设为0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// 逻辑到这里,说明当前线程已经进入AQS阻塞队列,且再次被唤醒了

// 尝试获取锁,释放多少锁,就要重新获取多少次锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

// node在条件队列时,如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设node.nextWaiter=null
if (node.nextWaiter != null)
unlinkCancelledWaiters(); // 清理条件队列内所有已取消的Node

if (interruptMode != 0) // 条件成立,说明期间发生过中断
reportInterruptAfterWait(interruptMode); // 应用打断模式
}
}
  • addConditionWaiter方法:将调用await的线程包装成Node,添加到条件队列并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ConditionObject implements Condition, java.io.Serializable {

private Node addConditionWaiter() {
Node t = lastWaiter; // 条件队列尾节点
if (t != null && t.waitStatus != Node.CONDITION) { // 尾节点不是等待状态
unlinkCancelledWaiters(); // 清理所有已取消节点
t = lastWaiter; // 重新获取尾节点
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建节点,状态为等待状态
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 从头遍历,清理条件队列中,所有已取消的node(状态不为CONDITION)
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
}
  • 接下来进入AQS的fullyRelease方法,释放同步器上的锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AQS类中的方法

// 线程可能重入,需要将state全部释放
final int fullyRelease(Node node) {
boolean failed = true; // 释放锁是否失败
try {
int savedState = getState(); // 获取当前线程持有的state值总数
if (release(savedState)) { // 解锁重入锁
failed = false; // 释放成功
return savedState; // 返回解锁的深度
} else {
throw new IllegalMonitorStateException(); // 解锁失败抛出异常
}
} finally {
if (failed) // 没有释放成功,将node设为取消状态
node.waitStatus = Node.CANCELLED;
}
}
  • Thread-0进入isOnSyncQueue(node)逻辑,判断节点是否移动到了阻塞队列中,没有就阻塞Thread-0
1
2
3
4
5
6
7
8
9
10
11
final boolean isOnSyncQueue(Node node) {
// node的状态是CONDITION(-2),或前驱为null,signal方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它node
if (node.next != null)
return true;
// 说明【可能在阻塞队列,但是是尾节点】
// 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 false
return findNodeFromTail(node);
}
  • await线程park后如果被unpark或者被打断,都会进入 checkInterruptWhileWaiting 判断线程是否被打断
  • 在条件队列被打断的线程需要抛出异常
1
2
3
4
private int checkInterruptWhileWaiting(Node node) {
// 如果被中断了,根据是否在条件队列被中断的,设置中断状态码
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 如果是被中断唤醒的
final boolean transferAfterCancelledWait(Node node) {
// 条件成立说明当前node一定是在条件队列内,因为signal迁移节点到阻塞队列时,会将节点的状态修改为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 把中断唤醒的node,加入到阻塞队列中
return true; // 表示是在条件队列内被中断了,设置interruptMode为THROW_IE(-1)
}

// 执行到这里的情况:
// 1.当前node已经被外部线程调用signal方法将其迁移到【阻塞队列】内了
// 2.当前node正在被外部线程调用signal方法将其迁移至【阻塞队列】进行中状态

// 如果当前线程还没到阻塞队列,一直释放 CPU
while (!isOnSyncQueue(node))
Thread.yield();

// 表示当前节点被中断唤醒时不在条件队列了,设置为REINTERRUPT为1
return false;
}
  • 最后开始处理中断状态
1
2
3
4
5
6
7
8
9
10
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
// 条件成立说明【在条件队列内发生过中断,此时await方法抛出中断异常】
if (interruptMode == THROW_IE)
throw new InterruptedException();

// 条件成立说明【在条件队列外发生的中断,此时设置当前线程的中断标记位为true】
else if (interruptMode == REINTERRUPT)
// 进行一次自己打断,产生中断的效果
selfInterrupt();
}
signal

fullyRelease中会unpark AQS队列中的下一个节点,来竞争锁,假设Thread-1竞争成功

image-20231216173049012

  • Thread-1调用signal方法
1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) // 判断调用signal方法的线程是否是独占锁
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 获取条件队列中第一个Node
if (first != null)
doSignal(first); // 不为空,就将该节点迁移至AQS队列
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doSignal(Node first) {
do {
// 成立,说明当前节点的下一个节点是null,只剩这一个节点了
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将等待的node转移至AQS队列,不成功且还有节点,则继续循环
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// signalAll()方法会调这个方法,唤醒所有节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
  • transferForSignal,将节点加入AQS队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean transferForSignal(Node node) {
// CAS修改状态为0
// 如果修改失败,说明当前状态不是Node.CONDITION,说明线程被取消(await释放全部锁失败)或者中断了(可打断lock)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 返回false,寻找下一个节点
return false;

// 当前node入阻塞队列,p是前驱节点
Node p = enq(node);
int ws = p.waitStatus;

// 如果前驱节点被取消,或者不能设置状态为Node.SIGNAL,就unpark阻塞node.thread,直到它被唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
image-20231217112524497

哲学家就餐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Main7 {
public static void main(String[] args) throws InterruptedException {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}

class Philosopher extends Thread {

private String name;
private Chopstick left;
private Chopstick right;

public Philosopher(String name, Chopstick left, Chopstick right) {
super();
this.name = name;
this.left = left;
this.right = right;
}

@Override
public void run() {
while (true) {
if (left.tryLock()) {
try {
if (right.tryLock()) {
try {
System.out.println(name + " eating");
return;
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
}

class Chopstick extends ReentrantLock {
String name;

public Chopstick(String name) {
this.name = name;
}
}

ReentrantReadWriteLock

  • 独占锁:只能被一个线程持有
  • 共享锁:可以被多个线程持有

ReentrantReadWriteLock:读锁是共享锁,写锁是独占锁

基本使用

1
2
3
4
5
6
r.lock();
try {
// 临界区
} finally {
r.unlock();
}
  • 【读-读】能共存,【读-写】不能共存,【写-写】不能共存
  • 读锁不支持条件变量
  • 不支持重入升级。原本获取了读锁,不能重入写锁(不然获取写锁会永久等待)
  • 支持重入降级。原本获取了写锁,支持重入获取读锁(造成当前线程只持有读锁)
1
2
3
4
5
6
7
8
9
10
11
w.lock();
try {
r.lock(); // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
try {
// 临界区
} finally{
r.unlock(); // 要在写锁释放之前获取读锁
}
} finally{
w.unlock();
}

构造方法:

  • public ReentrantReadWriteLock():默认构造方法,非公平锁
  • public ReentrantReadWriteLock(boolean fair):true 为公平锁

常用API:

  • public ReentrantReadWriteLock.ReadLock readLock():返回读锁
  • public ReentrantReadWriteLock.WriteLock writeLock():返回写锁
  • public void lock():加锁
  • public void unlock():解锁
  • public boolean tryLock():尝试获取锁

缓存应用

缓存更新时,应该先清缓存,还是先更新数据库

  • 先清缓存:线程直接从数据库查到了旧数据
  • 先更新数据库:线程可能从缓存中拿到了旧数据

可以用读写锁进行操作

成员属性

读写锁公用一个Sync同步器,等待队列,state也是同一个。原理和ReetrantLock没什么区别,只是写状态占state的低16位,读状态占state的高16位

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}

Sync类的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
abstract static class Sync extends AbstractQueuedSynchronizer {

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 高16位的1(的单位)

static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 16个1,65535,表示最大重入次数

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 掩码,低位16个1

// 读锁分配总次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

// 写锁分配总次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 用于记录读线程自己持有的读锁的次数
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}

// 线程安全的存放线程各自的HoldCounter对象
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

// 内部类实例:
private transient HoldCounter cachedHoldCounter;
private transient ThreadLocalHoldCounter readHolds;

// 首次获取锁:
private transient Thread firstReader = null; // 第一个获取读锁的线程
private transient int firstReaderHoldCount; // 记录该线程持有的读锁次数(读重入次数)

// 构造方法:
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // AQS中,state是volatile修饰的
// 重写该值,可以将线程本地缓存写到主内存,确保对其他线程可见性
}
}

加锁原理

写锁加锁

  • t1线程:w.lock(写锁),成功加锁state=0_1
1
2
3
4
5
6
7
8
9
10
11
// ReentrantReadWriteLock类
public void lock() {
sync.acquire(1);
}

// AQS类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • 大体和ReetrantLock一致,最大的变动与state有关
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Sync内部类
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 低16位(写锁)
if (c != 0) { // 不为0,说明有写锁或者读锁
if (w == 0 || current != getExclusiveOwnerThread()) // 有读锁,所以不能升级,直接false
// 有写锁,且不是自己,直接false
return false;

if (w + exclusiveCount(acquires) > MAX_COUNT) // 写锁重入,如果数量超了,报错
throw new Error("Maximum lock count exceeded");
setState(c + acquires); // 写锁重入成功,没有并发,所以不需要CAS
return true;
}

// 执行到这里,说明没有任何锁
if (writerShouldBlock() || // 判断是否需要阻止获取锁(主要为了公平锁而设置)
!compareAndSetState(c, c + acquires)) // 尝试获取锁
return false; // 失败返回false
setExclusiveOwnerThread(current);
return true;
}
1
2
3
4
// NonfairSync类
final boolean writerShouldBlock() { // 非公平锁writerShouldBlock总是false, 无需阻塞
return false;
}
1
2
3
4
// FairSync类
final boolean writerShouldBlock() { // 公平锁会检查AQS队列中是否有前驱节点, 没有才去竞争
return hasQueuedPredecessors();
}

读锁加锁

  • t2线程:r.lock(读锁)
1
2
3
4
5
6
7
8
9
10
// ReentrantReadWriteLock类
public void lock() {
sync.acquireShared(1);
}

// AQS类
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
  • tryAcquireShared尝试获取共享锁,负数表示失败
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Sync内部类
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && // 低16位(写锁)有人持有
getExclusiveOwnerThread() != current) // 写锁的持有者也不是自己
return -1; // 返回失败

int r = sharedCount(c); // 高16位,读锁分配出去的总次数
if (!readerShouldBlock() && // 判断是否需要阻止获取锁(主要为了公平锁而设置)
r < MAX_COUNT && // 锁总数有没有超
compareAndSetState(c, c + SHARED_UNIT)) { // CAS增加锁计数
if (r == 0) { // 以下加锁成功,如果这是第一个加锁成功的
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 第一个加锁成功的就是自己,然后发生了锁重入
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter; // cachedHoldCounter对象(最后一个获取读锁的线程设置的)
if (rh == null || rh.tid != getThreadId(current)) // cachedHoldCounter(为null,或者不是自己的)
cachedHoldCounter = rh = readHolds.get(); // 重写过initialValue方法,第一次这里会创建
else if (rh.count == 0)
readHolds.set(rh); // 计数为0,加入到readHolds中
rh.count++; // 重入+1
}
return 1;
}
return fullTryAcquireShared(current); // 逻辑到这里,说明应当阻止获取锁,或者CAS加锁失败
// 会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
}
1
2
3
4
5
// NonfairSync:偏向写锁一些,若AQS中第一个节点是写锁,则不应该尝试获取锁,返回false
// 防止一直有读线程,让写线程饿死
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
1
2
3
4
// FairSync:等待队列有,则阻止获取锁
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
  • fullTryAcquireShared真正不断获取共享锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// ReentrantReadWriteLock类
final int fullTryAcquireShared(Thread current) {

HoldCounter rh = null; // 记录当前线程持有的读锁次数
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { // 说明持有写锁,要是不是自己持有的写锁,直接返回失败
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) { // 同之前一样,判断是否应当阻止获取锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) // 如果条件成立,说明HoldCounter对象是上一步代码新建的
// 当前线程不是锁重入,在readerShouldBlock返回true时,需要去排队
readHolds.remove(); // 防止内存泄露
}
}
if (rh.count == 0)
return -1; // 返回失败
}
}
if (sharedCount(c) == MAX_COUNT) // 越界判断
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 读锁加锁,条件内的逻辑和tryAcquireShared相同
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
  • 获取读锁失败,进入sync.doAcquireShared(1)流程开始阻塞
  • 首先也是调用addWaiter添加节点。不过,节点被设置为Node.SHARED模式,而非Node.EXCLUSIVE模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// AQS类
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取前驱节点
if (p == head) { // 前驱节点是头节点,就尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 获取成功,将自己设为头节点,唤醒后续连续的共享节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && // 是否在获取读锁失败后阻塞
parkAndCheckInterrupt()) // park当前线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • t2线程:当前节点设为头节点
  • 检查下一个节点是否是shared,是则将head状态从-1改为0,并唤醒下一个节点。
    • t3将恢复运行,唤醒下一个…
1
2
3
4
5
6
7
8
9
10
11
12
13
// AQS类
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 自己设为头节点

// propagate 表示有共享资源(例如共享读锁或信号量),为0就没有资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取下一个节点
if (s == null || s.isShared()) // 如果当前是最后一个节点,或下一个节点是等待共享锁的节点
doReleaseShared(); // 唤醒连续的后继所有共享节点
}
}
  • doReleaseShared,唤醒连续的,后续所有共享节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 读锁是共享的,防止其他线程也在释放锁
// CAS将state改为0,才能unpark
continue;
unparkSuccessor(h);
}
else if (ws == 0 && // 如果已经是0了,改为-3,来解决传播性
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 上一步已经将waitState改为0了
// 万一刚好有一个节点入队,把这个ws设为-1了,所以需要重新循环
continue;
}
if (h == head)
// 唤醒后继节点之后,后继节点没有更换头节点才会退出
// 在唤醒到队列尾之后头节点将不再改变,才可以结束
// 会产生一个唤醒的风暴,前面的线程都在唤醒后面的线程
break;
}
}
  • 如果获取锁没有成功,在doAcquireShared内:

    • for循环一次,shouldParkAfterFailedAcquire内把前驱节点的waitStatus改为-1

    • 再for循环一次,尝试tryAcquireShared,不成功就在parkAndCheckInterrupt()处park

image-20231217231852527
  • 假如后面又有t3,t4来获取读锁,就变成下面这样了
image-20231217231940862

解锁原理

写锁解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ReentrantReadWriteLock类
public void unlock() {
sync.release(1);
}

// AQS类
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0) // 头节点不为空,且状态不为0,唤醒后继非取消节点
unparkSuccessor(h);
return true;
}
return false;
}

// Sync类
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0; // 因为可重入的原因,计数为0才算释放成功
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

读锁解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
// ReentrantReadWriteLock类
public void unlock() {
sync.releaseShared(1);
}

// AQS类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 将头节点从-1改为0,并唤醒下一个节点
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Sync类
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 读锁的计数不会影响其他读锁进程,但会影响写锁进程,计数为0才是真正的释放
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

StampedLock

读写锁,JDK8加入,进一步优化读性能

特点:

  • 使用读锁、写锁时都必须配合戳使用
  • 不支持条件变量
  • 不支持重入

基本使用

  • 加解读锁
1
2
3
4
StampedLock lock = new StampedLock();
long stamp = lock.readLock();

lock.unlockRead(stamp); // 类似unpark,解指定的锁
  • 加解写锁
1
2
3
4
StampedLock lock = new StampedLock();
long stamp = lock.writeLock();

lock.unlockWrite(stamp);
  • 乐观锁,可以进行戳校验。通过,则表示期间没有写操作,数据可以安全使用;没通过,需要重新获取读锁
1
2
3
4
5
6
StampedLock lock = new StampedLock();

long stamp = lock.tryOptimisticRead();
if (!lock.validate(stamp)) {
// 锁升级
}

优化代码

  • 依靠乐观锁优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final StampedLock lock = new StampedLock();

public int read() {
long stamp = lock.tryOptimisticRead();
// 戳有效,直接返回数据
if (lock.validate(stamp)) {
return data;
}

// 说明其他线程更改了戳,需要锁升级了,从乐观读升级到读锁
try {
stamp = lock.readLock();
return data;
} finally {
lock.unlockRead(stamp);
}
}

public void write(int val) {
long stamp = lock.writeLock();
try {
data = val;
} finally {
lock.unlockWrite(stamp);
}
}

CountDownLatch

基本使用

计数器,用来进行线程同步协作

构造器:

  • public CountDownLatch(int count):唤醒需要几步

常用API:

  • public void await():让当前线程等待
  • public void countDown():计数器减1

应用:同步多个远程调用结束(比如LOL游戏,10个人都准备好了才能开始)

实现原理

共享锁,解锁一次state减1。减到0后唤醒所有阻塞的节点。

阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
// CountDownLatch类
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// AQS类,之前有讲过
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
  • tryAcquireShared方法,在CountDownLatch中的Sync内部类
1
2
3
4
// Sync类
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
  • doAcquireSharedInterruptibly方法,之前也讲过,阻塞挂起
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AQS类
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 前驱节点是头节点,就可以尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 获取锁成功,将当前节点设为head,并向后传播
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞在这里
throw new InterruptedException();
}
} finally {
if (failed) // 阻塞线程被中断,抛出异常,进入取消节点的逻辑
cancelAcquire(node);
}
}
  • setHeadAndPropagate方法,会唤醒后继节点
1
2
3
4
5
6
7
8
9
10
11
// AQS类
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 当前节点设为head,前驱节点和持有线程设为null

if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取后继节点
if (s == null || s.isShared()) // 当前节点是尾节点,或者后继节点是共享模式
doReleaseShared(); // 唤醒所有等待共享锁的节点
}
}

计数减一

1
2
3
4
5
6
7
8
9
10
11
12
13
// CountDownLatch类
public void countDown() {
sync.releaseShared(1);
}

// AQS类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放共享锁
doReleaseShared(); // 释放锁成功,唤醒阻塞节点
return true;
}
return false;
}
  • tryReleaseShared尝试释放锁
1
2
3
4
5
6
7
8
9
10
11
// Sync内部类
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) // 说明前面已经有线程触发唤醒操作了,返回false
return false;
int nextc = c-1; // 计数器减1
if (compareAndSetState(c, nextc)) // 自旋CAS
return nextc == 0; // 计数器减为0,返回true
}
}
  • state等于0,执行doReleaseShared方法,唤醒阻塞的节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) { // 队列不为空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 头节点状态为signal(-1),说明后续有待唤醒节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // CAS自旋,将其waitState改为0
unparkSuccessor(h); // 唤醒后继节点
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 读锁解锁部分,有解释过这个问题,此处略过
}
if (h == head) // 读锁解锁部分,有解释过这个问题,此处略过
break;
}
}

CyclicBarrier

基本使用

  • 循环屏障,线程到达这个屏障时会被阻塞;所有线程都到了,触发自己运行一个任务;运行完成后,屏障开门,被拦截的线程又可以继续运行

常用方法:

  • public CyclicBarrier(int parties, Runnable barrierAction):用于在线程到达屏障时,执行barrierAction
    • parties:代表多少个线程到达屏障开始触发线程任务
    • barrierAction:线程任务
  • public int await():线程通知CyclicBarrier本线程已经到达屏障
1
2
3
4
5
6
7
8
9
10
11
12
13
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("finish");
});

for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}

实现原理

成员属性

  • 全局锁:利用可重入锁实现的工具类
1
2
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition(); // 挂起是基于Condition的
  • 线程数量
1
2
private final int parties;				// 有多少个线程到达屏障
private final Runnable barrierCommand; // 还有多少线程未到位,初始值为parties
  • 最后一个线程到位后,需要执行的事件
1
private final Runnable barrierCommand;
1
2
3
4
5
6
private static class Generation {
boolean broken = false; // 表示当前“代”是否被打破
} // 如果被打破,再来到这一代的线程直接抛出异常
// 且这一代挂起的线程都会被唤醒,然后抛出异常

private Generation generation = new Generation(); // 表示barrier对象,当前代
  • 构造方法
1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException(); // 小于等于0就没有意义了
this.parties = partie
this.count = parties;
this.barrierCommand = barrierAction; // 可以为null
}

成员方法

  • await方法,阻塞等待所有线程到位
1
2
3
4
5
6
7
8
// CyclicBarrier类
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
  • dowait方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// CyclicBarrier类
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
final Generation g = generation; // 获取当前代

if (g.broken)
throw new BrokenBarrierException(); // 如果当前代已经被打破,直接抛出异常

if (Thread.interrupted()) { // 如果线程被中断了,打破当前代,然后抛出异常
breakBarrier(); // broken=true;count=parties;唤醒条件队列中所有线程
throw new InterruptedException();
}

int index = --count; // 假设数量为5,index则为4,3,2,1,0
if (index == 0) { // 条件成立,说明是最后一个到达的线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 执行触发的任务
ranAction = true;
nextGeneration(); // 开启新的一代,这里会唤醒所有阻塞线程
return 0;
} finally {
if (!ranAction) // 如果执行command异常的话,会进入这里
breakBarrier();
}
}

// 自旋,一直到:条件满足;当前代被打破;线程被中断;等待超时
for (;;) {
try {
if (!timed) // 如果:不需要超时等待
trip.await(); // 当前线程释放掉锁,进入trip条件队列挂起自己,等待被唤醒
else if (nanos > 0L) // 如果:需要超时等待
nanos = trip.awaitNanos(nanos); // 使用能超时等待的await
} catch (InterruptedException ie) {
// 被中断后,来到这里
if (g == generation && ! g.broken) { // 当前代没有变化,且没有被打破
breakBarrier(); // 打破屏障
throw ie; // 抛出错误(以便后面finally中unlock)
} else {
Thread.currentThread().interrupt(); // 等待中,代变化了,自我打断
}
}

if (g.broken) // 唤醒后的线程,发现当前代被打破了,抛出异常
throw new BrokenBarrierException();

if (g != generation) // 线程挂起期间,最后一个线程到位了,触发了开启新一代的逻辑
return index;

if (timed && nanos <= 0L) { // 等待超时,当前线程主动转移到阻塞队列
breakBarrier();
throw new TimeoutException(); // 抛出超时异常
}
}
} finally {
lock.unlock(); // 解锁
}
}
  • breakBarrier():打破Barrier屏障
1
2
3
4
5
private void breakBarrier() {
generation.broken = true; // 表示这一代被打破了,再来到这一代的线程直接抛出错误
count = parties; // 重置count
trip.signalAll(); // 唤醒trip条件队列中所有线程
}
  • nextGeneration():开启新的一代
1
2
3
4
5
private void nextGeneration() {
trip.signalAll(); // 唤醒trip条件队列中所有线程
count = parties; // 重置count
generation = new Generation(); // 开启新的一代
}

Semaphore

基本使用

  • synchronized可以起到锁的作用,但是某个时间段内只能有一个线程允许执行

  • Semaphore(信号量)可以用来限制【能同时访问共享资源的线程上限】

  • 它是非重入锁

构造方法:

  • public Semaphore(int permits):permits 表示许可线程的数量(state)
  • public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程

常用API:

  • public void acquire():表示获取许可
  • public void release():表示释放许可,acquire() 和 release() 方法之间的代码为同步代码
1
2
3
4
5
6
7
8
9
10
11
12
13
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 9; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}).start();
}

实现原理

获取锁

  • Semaphore的state设为3,假设有5个线程来获取资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Semaphore类
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// NonfairSync内部类
NonfairSync(int permits) {
super(permits);
}

// Sync内部类
Sync(int permits) {
setState(permits);
}
  • acquire获取资源
1
2
3
4
5
6
7
8
9
10
11
12
// Semaphore类
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// AQS类
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取通信证
doAcquireSharedInterruptibly(arg); // 进入阻塞
}
  • tryAcquireShared尝试获取许可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// NonfairSync类
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

// NonfairSync类
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || // 剩余state不足直接返回false
compareAndSetState(available, remaining)) // 如果剩余state大于0,CAS尝试更改,返回是否成功
return remaining;
}
}
  • doAcquireSharedInterruptibly带阻塞的获取许可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AQS类,和之前一样,不赘述了
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 将自己设为头,并唤醒后继节点
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放锁

  • release用于释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
// Semaphore类
public void release() {
sync.releaseShared(1);
}

// AQS类
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
// Sync类
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); // 当前可用资源数
int next = current + releases;
if (next < current) // 防止越界
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 释放锁
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// AQS类,之前有讲过,不赘述
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 设置Node.PROPAGATE的原因之前也讲过,不赘述
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

Exchanger

用于线程间数据交换

工作流程:

  • 两个线程通过exchange方法交换数据
  • 一个线程先到达exchange方法,他会一直等待第二个线程也执行exchange方法。
  • 当两个线程都到达同步点时,两个线程就可以交换数据。

常用API:

  • public Exchanger():创建一个新的交换器
  • public V exchange(V x):等待另一个线程到达此交换点
  • public V exchange(V x, long timeout, TimeUnit unit):等待一定的时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
String s = exchanger.exchange("我是A发的消息");
System.out.println("A收到消息:" + s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

new Thread(() -> {
try {
String s = exchanger.exchange("我是B发的消息");
System.out.println("B收到消息:" + s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
  • 标题: JUC系列:(七)同步器
  • 作者: 布鸽不鸽
  • 创建于 : 2024-01-10 14:54:58
  • 更新于 : 2024-01-10 14:56:51
  • 链接: https://xuedongyun.cn//post/1126/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论