AQS 核心思想 AQS(AbstractQueuedSynchronizer),是阻塞式锁的框架,许多同步类都依赖于该同步器
独占与共享:
AQS核心思想:
如果请求的资源空闲,则将当前线程设为有效的工作线程,资源设为锁定状态 如果请求的资源锁定,则将当前线程加入到队列中 CLH是一种基于单向链表的高性能、公平的自旋锁。AQS将所有请求的线程封装为CLH锁队列的一个结点,来实现锁的分配
设计原理 1 2 3 4 5 6 while (state 状态不允许获取) { if (队列中没有此线程) { 入队并阻塞 park } } 当前线程出队
1 2 3 if (state 状态允许了) { 恢复阻塞的线程(s) unpark }
AQS中state设计:
state使用32位 int来维护同步状态,0表示未加锁,大于0表示已加锁 state使用volatile,配合CAS保证修改的原子性 state表示线程重入的次数(独占模式),或者剩余许可数(共享模式) state APIprotected final int getState()
:获取state状态protected final void setState(int newState)
:设置state状态protected final boolean compareAndSetState(int expect,int update)
:CAS安全设置state 线程的Node节点中waitstate设计:
使用volatile,配合CAS保证修改的原子性 状态有以下几种 1 2 3 4 5 volatile int waitStatus; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ;
阻塞恢复设计:
使用park-unpark来实现线程的暂停和恢复(命令的先后不影响结果) park线程可以通过interrupt来打断 队列设计:
1 2 3 4 5 6 7 8 9 10 11 12 private transient volatile Node head; private transient volatile Node tail; static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; }
条件变量,来实现:等待、唤醒 支持多个条件变量,类似Monitor的WaitSet 条件队列是单向队列 1 2 3 4 5 public class ConditionObject implements Condition , java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; }
模板对象 使用者继承 AbstractQueuedSynchronizer
并重写指定的方法 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,这些模板方法会调用使用者重写的方法 自定义同步器需要重写下面几个AQS提供的模板方法
1 2 3 4 5 isHeldExclusively() tryAcquire(int ) tryRelease(int ) tryAcquireShared(int ) tryReleaseShared(int )
默认,这些方法会抛出UnsupportedOperationException
这些方法必须内部线程安全 自定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { this .setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryReleaseShared (int arg) { this .setExclusiveOwnerThread(null ); this .setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return this .getState() == 1 ; } public Condition newCondition () { return new ConditionObject (); } } private MySync sync = new MySync (); @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } }
ReentrantLock 对比synchronized 锁的实现:ReentrantLock是JDK实现的,synchronized是JVM实现的 性能:ReentrantLock和synchronized大致相同 使用:ReentrantLock需要手动解锁,synchronized自动解锁 可中断 :ReentrantLock可中断,而synchronized不行公平锁 :ReentrantLock可以设置公平锁,synchronized中的锁是非公平的锁超时 :ReentrantLock可以设置超时时间,synchronized会一直等待锁绑定多个条件 :ReentrantLock可以同时绑定多个Condition对象(细粒度的唤醒)可重入:两者都是可重入锁 使用锁 public void lock()
:获得锁锁没有被其他线程占用,计数设为1 锁被当前线程占用,计数+1 锁被其他线程占用,进入休眠状态 public void unlock()
:尝试释放锁锁被当前线程占用,计数-1;如果计数器为0,则锁被释放 如果锁不是被当前线程占用,抛出异常 1 2 3 4 5 6 reentrantLock.lock(); try { } finally { reentrantLock.unlock(); }
公平锁 基本使用 构造方法:ReentrantLock lock = new ReentrantLock(true)
1 2 3 4 5 6 7 public ReentrantLock () { sync = new NonfairSync (); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
非公平锁原理 NonfairSync
继承自AQS
NonfairSync
-> Sync
-> AQS
加锁原理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire尝试获取锁,只有两种情况能成功当前AQS锁未被占有 当前AQS锁被自己占有,可以进行锁重入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
接下来是addWaiter的逻辑图中黄色三角表示Node的waitStatus状态,0为默认正常状态 Node的创建是懒惰的,第一个Node是哑元,用来占位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 abstract static class Sync extends AbstractQueuedSynchronizer { private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } }
接下来是acquireQueued返回true表示被中断唤醒过 返回false表示从未被中断唤醒过 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire,判断是否应当park将前驱的node的waitStatus改为Node.SIGNAL(-1) waitStatus为-1的节点用来唤醒下一个节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 abstract static class Sync extends AbstractQueuedSynchronizer { private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } }
parkAndCheckInterrupt,判断是否被打断 1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
解锁原理:
1 2 3 public void unlock () { sync.release(1 ); }
1 2 3 4 5 6 7 8 9 10 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
unparkSuccessor,唤醒当前节点的后继节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
从后向前的原因:
enq方法中,节点是尾插法,首先赋值的是尾节点的前驱节点。
此时前驱节点的next并没有指向尾节点,从前向后遍历会丢失尾节点。
唤醒的线程从之前park位置开始,继续在循环中tryAcquire(arg)
尝试获取锁,如果成功exclusiveOwnerThread为Thread-1,state=1 head指向Thread1所在的Node 原本的head出队,然后被GC垃圾回收(图中有误,原本head的waitState改为0了)
如果有其他线程来竞争,假如此时有Thread-4来抢占了锁exclusiveOwnerThread为Thread-4,state=1 Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞
公平锁原理 主要的区别在于tryAcquire方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static final class FairSync extends Sync { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } }
可重入 如果没有可重入,拥有锁的线程再次加锁,自己也会被挡住,造成死锁
源码解析部分参考之前的:nonfairTryAcquire
和tryRelease
判断ExclusiveOwnerThread是不是自己 计数器增减 加锁一次,解锁两次,程序会直接报错
可打断 public void lockInterruptibly()
:获得可打断的锁
如果没有竞争,就会获得lock对象锁 如果有竞争,就进入阻塞队列,但是可以被其他线程用interrupt打断 1 2 3 4 5 6 7 8 9 10 11 12 Thread t1 = new Thread (() -> { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return ; } try { } finally { lock.unlock(); } });
实现原理 不可打断模式:即使被打断,任会驻留在AQS队列中,一直到获得锁之后才能直到自己被打断了 acquireQueued方法中,用interrupted变量记录了是否被打断过 1 2 3 4 5 6 7 8 9 10 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt () { Thread.currentThread().interrupt(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
1 2 3 4 5 6 7 8 9 10 11 12 public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
锁超时 基本使用 public boolean tryLock()
:尝试获取锁,获取到返回 true,获取不到直接返回false
public boolean tryLock(long timeout, TimeUnit unit)
:在给定时间内获取锁,获取不到返回false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ReentrantLock lock = new ReentrantLock ();Thread thread = new Thread (() -> { try { if (!lock.tryLock(2 , TimeUnit.SECONDS)) { System.out.println("获取锁失败" ); return ; } } catch (InterruptedException e) { System.out.println("被打断,获取不到锁" ); return ; } try { } finally { lock.unlock(); } });
实现原理 1 2 3 4 public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); }
1 2 3 4 5 6 7 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos方法,在一定时间内获取锁,获取不到返回false 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
条件变量 基本使用 synchronized的条件变量:当不满足时进入WaitSet等待 ReentrantLock的条件变量:强大之处在于,支持多个条件变量 ReentrantLock类获取Condition变量:public Condition newCondition()
void await()
:当前线程进入等待状态,释放锁void signal()
:唤醒一个等待在当前Condition上的线程(必须已经获取了当前的锁)使用流程:
await / signal前,需要获得锁 await执行后,会释放锁进入ConditionObject等待 await的线程被唤醒后,会重新竞争锁 线程在条件队列中被打断,会抛出异常 竞争lock成功后,从await后继续执行 实现原理 await 概括:
开始Thread-0持有锁,调用await,线程进入ConditionObject等待,直到被唤醒或打断。如果被唤醒,就转移到AQS阻塞队列,等待获取锁。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ConditionObject implements Condition , java.io.Serializable { public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } }
addConditionWaiter方法:将调用await的线程包装成Node,添加到条件队列并返回 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class ConditionObject implements Condition , java.io.Serializable { private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } }
接下来进入AQS的fullyRelease方法,释放同步器上的锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
Thread-0进入isOnSyncQueue(node)逻辑,判断节点是否移动到了阻塞队列中,没有就阻塞Thread-0 1 2 3 4 5 6 7 8 9 10 11 final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }
await线程park后如果被unpark或者被打断,都会进入 checkInterruptWhileWaiting 判断线程是否被打断 在条件队列被打断的线程需要抛出异常 1 2 3 4 private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final boolean transferAfterCancelledWait (Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0 )) { enq(node); return true ; } while (!isOnSyncQueue(node)) Thread.yield (); return false ; }
1 2 3 4 5 6 7 8 9 10 private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
signal fullyRelease中会unpark AQS队列中的下一个节点,来竞争锁,假设Thread-1竞争成功
1 2 3 4 5 6 7 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void doSignal (Node first) { do { if ((firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); }
transferForSignal,将节点加入AQS队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
哲学家就餐 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class Main7 { public static void main (String[] args) throws InterruptedException { Chopstick c1 = new Chopstick ("1" ); Chopstick c2 = new Chopstick ("2" ); Chopstick c3 = new Chopstick ("3" ); Chopstick c4 = new Chopstick ("4" ); Chopstick c5 = new Chopstick ("5" ); new Philosopher ("苏格拉底" , c1, c2).start(); new Philosopher ("柏拉图" , c2, c3).start(); new Philosopher ("亚里士多德" , c3, c4).start(); new Philosopher ("赫拉克利特" , c4, c5).start(); new Philosopher ("阿基米德" , c5, c1).start(); } } class Philosopher extends Thread { private String name; private Chopstick left; private Chopstick right; public Philosopher (String name, Chopstick left, Chopstick right) { super (); this .name = name; this .left = left; this .right = right; } @Override public void run () { while (true ) { if (left.tryLock()) { try { if (right.tryLock()) { try { System.out.println(name + " eating" ); return ; } finally { right.unlock(); } } } finally { left.unlock(); } } } } } class Chopstick extends ReentrantLock { String name; public Chopstick (String name) { this .name = name; } }
ReentrantReadWriteLock 独占锁:只能被一个线程持有 共享锁:可以被多个线程持有 ReentrantReadWriteLock:读锁是共享锁,写锁是独占锁
基本使用 1 2 3 4 5 6 r.lock(); try { } finally { r.unlock(); }
【读-读】能共存,【读-写】不能共存,【写-写】不能共存 读锁不支持条件变量 不支持重入升级。原本获取了读锁,不能重入写锁(不然获取写锁会永久等待) 支持重入降级。原本获取了写锁,支持重入获取读锁(造成当前线程只持有读锁) 1 2 3 4 5 6 7 8 9 10 11 w.lock(); try { r.lock(); try { } finally { r.unlock(); } } finally { w.unlock(); }
构造方法:
public ReentrantReadWriteLock()
:默认构造方法,非公平锁public ReentrantReadWriteLock(boolean fair)
:true 为公平锁常用API:
public ReentrantReadWriteLock.ReadLock readLock()
:返回读锁public ReentrantReadWriteLock.WriteLock writeLock()
:返回写锁public void lock()
:加锁public void unlock()
:解锁public boolean tryLock()
:尝试获取锁缓存应用 缓存更新时,应该先清缓存,还是先更新数据库
先清缓存:线程直接从数据库查到了旧数据 先更新数据库:线程可能从缓存中拿到了旧数据 可以用读写锁进行操作
成员属性 读写锁公用一个Sync同步器,等待队列,state也是同一个。原理和ReetrantLock没什么区别,只是写状态占state的低16位,读状态占state的高16位
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ReentrantReadWriteLock implements ReadWriteLock , java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); } }
Sync类的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0 ; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter> { public HoldCounter initialValue () { return new HoldCounter (); } } private transient HoldCounter cachedHoldCounter; private transient ThreadLocalHoldCounter readHolds; private transient Thread firstReader = null ; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter (); setState(getState()); } }
加锁原理 写锁加锁 t1线程:w.lock(写锁),成功加锁state=0_1 1 2 3 4 5 6 7 8 9 10 11 public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
大体和ReetrantLock一致,最大的变动与state有关 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
1 2 3 4 final boolean writerShouldBlock () { return false ; }
1 2 3 4 final boolean writerShouldBlock () { return hasQueuedPredecessors(); }
读锁加锁 1 2 3 4 5 6 7 8 9 10 public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
tryAcquireShared尝试获取共享锁,负数表示失败 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
1 2 3 4 5 final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); }
1 2 3 4 final boolean readerShouldBlock () { return hasQueuedPredecessors(); }
fullTryAcquireShared真正不断获取共享锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
获取读锁失败,进入sync.doAcquireShared(1)流程开始阻塞 首先也是调用addWaiter添加节点。不过,节点被设置为Node.SHARED模式,而非Node.EXCLUSIVE模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
t2线程:当前节点设为头节点 检查下一个节点是否是shared,是则将head状态从-1改为0,并唤醒下一个节点。 1 2 3 4 5 6 7 8 9 10 11 12 13 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared,唤醒连续的,后续所有共享节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
假如后面又有t3,t4来获取读锁,就变成下面这样了 解锁原理 写锁解锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
读锁解锁 1 2 3 4 5 6 7 8 9 10 11 12 13 public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
StampedLock 读写锁,JDK8加入,进一步优化读性能
特点:
使用读锁、写锁时都必须配合戳使用 不支持条件变量 不支持重入 基本使用 1 2 3 4 StampedLock lock = new StampedLock ();long stamp = lock.readLock();lock.unlockRead(stamp);
1 2 3 4 StampedLock lock = new StampedLock ();long stamp = lock.writeLock();lock.unlockWrite(stamp);
乐观锁,可以进行戳校验。通过,则表示期间没有写操作,数据可以安全使用;没通过,需要重新获取读锁 1 2 3 4 5 6 StampedLock lock = new StampedLock ();long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)) { }
优化代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private final StampedLock lock = new StampedLock ();public int read () { long stamp = lock.tryOptimisticRead(); if (lock.validate(stamp)) { return data; } try { stamp = lock.readLock(); return data; } finally { lock.unlockRead(stamp); } } public void write (int val) { long stamp = lock.writeLock(); try { data = val; } finally { lock.unlockWrite(stamp); } }
CountDownLatch 基本使用 计数器,用来进行线程同步协作
构造器:
public CountDownLatch(int count)
:唤醒需要几步常用API:
public void await()
:让当前线程等待public void countDown()
:计数器减1应用:同步多个远程调用结束(比如LOL游戏,10个人都准备好了才能开始)
实现原理 共享锁,解锁一次state减1。减到0后唤醒所有阻塞的节点。
阻塞等待 1 2 3 4 5 6 7 8 9 10 11 12 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
tryAcquireShared方法,在CountDownLatch中的Sync内部类 1 2 3 4 protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
doAcquireSharedInterruptibly方法,之前也讲过,阻塞挂起 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate方法,会唤醒后继节点 1 2 3 4 5 6 7 8 9 10 11 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
计数减一 1 2 3 4 5 6 7 8 9 10 11 12 13 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
state等于0,执行doReleaseShared方法,唤醒阻塞的节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
CyclicBarrier 基本使用 循环屏障,线程到达这个屏障时会被阻塞;所有线程都到了,触发自己运行一个任务;运行完成后,屏障开门,被拦截的线程又可以继续运行 常用方法:
public CyclicBarrier(int parties, Runnable barrierAction)
:用于在线程到达屏障时,执行barrierActionparties:代表多少个线程到达屏障开始触发线程任务 barrierAction:线程任务 public int await()
:线程通知CyclicBarrier本线程已经到达屏障1 2 3 4 5 6 7 8 9 10 11 12 13 CyclicBarrier cyclicBarrier = new CyclicBarrier (3 , () -> { System.out.println("finish" ); }); for (int i = 0 ; i < 3 ; i++) { new Thread (() -> { try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException (e); } }).start(); }
实现原理 成员属性 1 2 private final ReentrantLock lock = new ReentrantLock ();private final Condition trip = lock.newCondition();
1 2 private final int parties; private final Runnable barrierCommand;
1 private final Runnable barrierCommand;
1 2 3 4 5 6 private static class Generation { boolean broken = false ; } private Generation generation = new Generation ();
1 2 3 4 5 6 public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = partie this .count = parties; this .barrierCommand = barrierAction; }
成员方法 1 2 3 4 5 6 7 8 public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } }
breakBarrier():打破Barrier屏障 1 2 3 4 5 private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); }
1 2 3 4 5 private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation (); }
Semaphore 基本使用 构造方法:
public Semaphore(int permits)
:permits 表示许可线程的数量(state)public Semaphore(int permits, boolean fair)
:fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程常用API:
public void acquire()
:表示获取许可public void release()
:表示释放许可,acquire() 和 release() 方法之间的代码为同步代码1 2 3 4 5 6 7 8 9 10 11 12 13 Semaphore semaphore = new Semaphore (3 );for (int i = 0 ; i < 9 ; i++) { new Thread (() -> { try { semaphore.acquire(); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { semaphore.release(); } }).start(); }
实现原理 获取锁 Semaphore的state设为3,假设有5个线程来获取资源 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public Semaphore (int permits ) { sync = new NonfairSync (permits ); } NonfairSync(int permits ) { super (permits ); } Sync(int permits ) { setState(permits ); }
1 2 3 4 5 6 7 8 9 10 11 12 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
doAcquireSharedInterruptibly带阻塞的获取许可 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
Exchanger 用于线程间数据交换
工作流程:
两个线程通过exchange方法交换数据 一个线程先到达exchange方法,他会一直等待第二个线程也执行exchange方法。 当两个线程都到达同步点时,两个线程就可以交换数据。 常用API:
public Exchanger()
:创建一个新的交换器public V exchange(V x)
:等待另一个线程到达此交换点public V exchange(V x, long timeout, TimeUnit unit)
:等待一定的时间1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Exchanger<String> exchanger = new Exchanger <>(); new Thread (() -> { try { String s = exchanger.exchange("我是A发的消息" ); System.out.println("A收到消息:" + s); } catch (InterruptedException e) { throw new RuntimeException (e); } }).start(); new Thread (() -> { try { String s = exchanger.exchange("我是B发的消息" ); System.out.println("B收到消息:" + s); } catch (InterruptedException e) { throw new RuntimeException (e); } }).start();