Java中的显示Lock
一 Lock接口
二 队列同步器 AQS
三 重入锁ReentrantLock
四 读写锁 ReentrantReadWriteLock
五 Condition参考:Java并发编程的艺术
一 Lock接口
Lock l = ...;
l.lock();
try {
// access the resource protected by this lock
} finally {
l.unlock();
}
1. API
method |
---|
void lock() 不可中断,获取锁后返回; |
void lockInterruptibly() throws InterruptedException 可中断的获取锁,响应中断; |
boolean tryLock(); 尝试非阻塞的获取锁,调用方法立即返回;true、false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; 超时时间+响应中断 |
Condition newCondition(); 获取通知组件,组件和锁绑定,只有获得锁才可以调用组件的wait方法,调用后释放锁。 |
unlock |
2. 实现
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
3. Lock相比synchronized的优势
- 尝试非阻塞的获取锁:当前线程尝试去获得锁,如果这一时刻没有被其他线程获取到锁,则成功获取并持有锁;
- 能被中断的获取锁:与synchronized不同,获取锁的线程能响应中断,抛出中断异常;
- 超时获取锁:在指定的截止时间内获取锁,时间到了没有获取锁则返回;
二 队列同步器 AQS
-
AQS是用来构建锁或其他同步组件的基础框架。它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器与锁的关系:
锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节;
同步器面向的是锁的是实现者,简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待与唤醒等底层操作;
例如:lock获取锁,内部是独占式/共享式的获取同步状态; 同步器提供的方法:
独占式获取与释放同步状态(是否响应中断和超时);
共享式获取与释放同步状态(是否响应中断和超时);
查询同步队列中的等待线程状态;
2.1 同步器的实现
-
同步队列(FIFO双向队列)
线程获取同步状态失败时,构造Node(当前线程+等待状态),添加到同步队列,同时阻塞当前线程;public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //构造节点信息,加入队列 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; 1. 尝试CAS方式添加节点:compareAndSetTail(pred, node)一次,失败后进入2; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } 2. 自旋式添加,直至成功入队列; enq(node); return node; } //2. 自旋式添加,直至成功入队列; private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
当同步状态释放时,唤醒后继节点,使其再次尝试获取同步状态;
private void unparkSuccessor(Node node) { ... /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //唤醒 }
-
独占式同步状态的获取与释放(中断不响应,不会移除队列)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
同步队列中的节点以‘’自旋”的方式获取同步状态。
如果获取不到则阻塞节点中的线程,而被阻塞的线程的唤醒主要是依赖前节点的出队或阻塞线程被中断来实现;(这里的阻塞并不是Blocking状态而是Waiting状态)
只有前节点是头结点才有机会获取同步状态成功,保证FIFO。final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //“自旋”的方式 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //前节点是头结点获取到同步状态 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //判断条件是否LockSupport.park(this); interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- 共享式同步状态的获取与释放
共享式同步状态与独占式同步状态最主要的区别是同一时刻能否有多个线程同时读取到同步状态。/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //前节点是头结点&&获取到同步状态 setHeadAndPropagate(node, r); //传递,考虑后节点也在等共享式同步状态 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- 独占式超时获取同步状态(每次计算剩余时间)
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); //计算时间 if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //阻塞带时间 if (Thread.interrupted()) //是否被中断 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
三 重入锁ReentrantLock
3.1 特点
- 支持重入锁:一个线程对资源重复加锁
- 支持公平锁(默认)、非公平锁
公平锁:先入先出,获取锁的一定是等待时间最长的线程; (减少饥饿)
非公平锁:反之,刚释放锁又可以拿到锁;(效率高)
3.2 实现重入
- 锁需要识别获取锁的线程是否是当前占有锁的线程,如果是,可以再次获取。
- 锁的释放:线程拿锁计数增加,线程释放锁计数减少,计数为0表示成功释放。
//非公平获取可重入锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //先判断有没有加锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //有锁的情况下,自己的锁还是其他线程的锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //计数增加 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
//释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; //计数自检 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //为0锁已经释放成功 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
3.3 实现公平、非公平
- 公平锁,锁的顺序严格绝对时间FIFO
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//没有前驱节点时才可以尝试拿锁,FIFO if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... }
- 非公平锁
一个线程刚释放锁时再次获取同步状态的几率会非常大。进了队列之后的线程还是FIFO的。//非公平获取可重入锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //只要CAS设置成功就可以拿到锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... }
四 读写锁 ReentrantReadWriteLock
4.1 特点
- ReentrantReadWriteLock维护了一对锁,一个读锁一个写锁。读写分离提升并发性;
- 支持公平锁,非公平锁(默认);
- 支持可重入:读锁+读锁,写锁+写锁/读锁;
- 支持锁降级:遵循获取写锁,获取读锁,再释放写锁的次序,写锁能够降级为读锁。 (不支持锁升级)
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Lock r = rwl.readLock();
Lock w = rwl.writeLock();
4.2 读写锁的实现
- 读写状态
读写锁的同步状态需要维护两种状态,采用“按位切割使用”的方式,高16位表示读,低16位表示写。 - 写锁
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 有读锁false,有其他线程的写锁false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //公平锁非公平锁 return false; setExclusiveOwnerThread(current); return true; }
- 读锁
//示意 protected final int tryAcquireShared(int unused) { for (;;) { int c = getState(); int nextc = c + (1 << 16); if (nextc < c) { throw new Error("Maxium lock count exceeded"); } //有其他人的写锁,无法获取读锁 if (exclusiveCount(c) != 0 && owner != Thread.currentThread()) { return -1; } if(compareAndSetState(c,nextc)) { return 1; } } }
五 Condition
5.1 介绍
任意一个java对象,都拥有一组监视器方法wait和notify,与synchronized配合使用实现等待/通知模式;
Condition接口也提供了类似Object的监视器方法,与Lock配合使用实现等待/通知模式;
- Object monitor method:一个等待队列,不支持在等待状态中不响应中断,不支持
- Condition:多个等待队列,支持在等待状态中不响应中断;
5.2 Condition实现
每个Condition对象都包含一个FIFO等待队列,该队列实现Condition对象的等待、通知功能。
@梦工厂 2018.3.22