本文是Java线程安全和并发编程知识总结的一部分。
2.2.2 条件锁Conditioin
从实际上来看,条件锁本身其实并不是锁,而是从锁上获取到的一个工具包,这个工具包在底层通过条件线程队列,将一组线程和某个条件绑定起来,并提供了适当的时候挂起当前线程、唤醒(条件线程队列内匹配的)其他线程、或被其(条件线程队列内)他线程唤醒的方法。
Condition本身表现为一个接口,以及JDK的内置实现ConditionObject,并提供如下方法:
- await(): 该方法将导致当前线程挂起,释放锁,直到被其他线程唤醒。
- signal(): 该方法将唤醒等候当前条件的线程队列中的某个线程。
- signalAll(): 该方法将唤醒等候当前条件的线程队列中的所有线程。
- await的各种重载方法或类似方法:awaitUninterruptibly、awaitNanos、awaitUntil
条件对象和条件线程队列的关联是隐式的,当你使用Condition上的上述方法时,就启用了对应的条件线程队列,我们无需直接去创建、维护和使用条件线程队列。同时每个条件对象都对应了一个条件线程队列。实际上,JDK提供的两种条件对象的实现ConditionObject
,本身就是两种条件队列实现类 AbstractQueuedSynchronizer
、AbstractQueuedLongSynchronizer
的内部类。
实际上,正如每个对象背后都有一个内置锁一样,每个对象背后,都有一个内置条件线程队列。当你使用Object提供的如下方法时,你就激活了这个队列:
- wait(): 挂起当前线程直到被唤醒。
- notify(): 唤醒等候当前对象内置锁的内置条件队列中的某个线程。
- notifyAll():唤醒等候当前对象内置锁的内置条件队列中的所有线程。
看看是不是和Condition的对应方法非常像?差别只在于内置条件线程队列是和对象的内置锁绑定的,也就是内置锁只有一个内置条件线程队列;而通过Condition访问的条件线程队列是和创建Condition的锁绑定给的,因此一个锁可以存着多个条件线程队列,只要通过。
内置锁、内置条件线程队列,他们已经被默认提供并在你使用synchronized关键词时被使用,非常的方便。但也失去了一些灵活性。比如,你不能将内置锁和任意条件关联。而通过Condition以及其背后的条件线程队列,则允许你自己控制锁到底和那个(或那几个)条件关联。
如何使用条件锁
条件对象和那个状态相关联,是由调用者决定的。下面通过一个例子来说明如何使用。假设我们要实现一个连接池的,获取连接的方法满足如下行为:
- 当池中有空闲连接时,获取线程方法立即返回并成功;
- 当池中无空闲连接且连接池未满,则获取连接的线程阻塞直到有空闲连接;
- 当池已满,且无空闲连接,则获取连接的线程阻塞直到超时或有空闲连接;
- 为了简化问题,其他方法就不提供实现了,示意即可;
/**
* @author xx
* 2020年2月5日 下午5:28:18 xx
*/
public class ConnectionPool<T> {
private final Lock lock = new ReentrantLock();
/**
* 连接池的容量
*/
private int capacity;
/**
* 获取连接的超时时长
*/
private int fetchTimeOut;
/**
* 当前连接池的总大小
*/
private int totalSize;
/**
* 代表连接池尚未满的条件(连接数未超过池大小,和状态属性 totalSize 和 capacity 有关)
*/
private final Condition isFull = this.lock.newCondition();
/**
* 代表连接池未满但无空闲连接的条件。
*/
private final Condition needIncr = this.lock.newCondition();
/**
* 空闲连接的容器
*/
private LinkedList<T> freeConnections;
/**
* 正在使用的连接的容器
*/
private LinkedList<T> busyConnections;
/**
*
* 构造函数
* @param capacity 连接池大小
* @param fetchTimeOut 连接池获取连接超时时长(s)
*/
public ConnectionPool(int capacity, int fetchTimeOut) {
this.capacity = capacity;
this.fetchTimeOut = fetchTimeOut;
this.totalSize = 0;
this.freeConnections = new LinkedList<T>();
this.busyConnections = new LinkedList<T>();
}
/**
* 缓存的连接低于1个时,自动增加空闲连接。
* 2020年2月5日 下午6:40:36 xx添加此方法
*/
public void incNewConnections() {
this.lock.lock();
try {
// 如果空闲连接数多余1个,则当前线程休眠,并放入 needIncr 条件对应的条件线程队列。
while (this.freeConnections.size() > 1) {
// 该语句将导致当前线程休眠,并被放入 needIncr 条件对应的条件线程队列
this.needIncr.await();
}
// 新缓存一批空闲连接
this.createConnections();
// 该语句会唤醒 needIncr 条件对应的条件线程队列中所有线程(获取连接的线程),方便通知获取连接的线程可以获取连接了
this.needIncr.notifyAll();
} catch (InterruptedException e) {
throw new RuntimeException("创建连接时线程中断异常", e);
} finally {
this.lock.unlock();
}
}
/**
* 从连接池获取一个连接
* 2020年2月5日 下午5:28:18 xx添加此方法
*/
public T fetchConnectioin(int index) {
this.lock.lock();
try {
// 如果连接池满了,则当前线程休眠,等到有人释放连接或超时
while (this.totalSize == this.capacity) {
// 该语句将导致当前线程休眠并被放入 isFull 条件对应的条件线程队列
this.isFull.await(this.fetchTimeOut, TimeUnit.SECONDS);
}
// 如果没有空闲连接,则当前线程挂起等待连接初始化完成
if (this.freeConnections.isEmpty()) {
// 该语句将导致当前线程休眠并被放入 needIncr 条件对应的条件线程队列,等待连接创建线程完成工作后唤醒。
this.needIncr.await();
}
T element = this.freeConnections.remove(this.freeConnections.size() - 1);
this.busyConnections.add(element);
return element;
} catch (InterruptedException e) {
throw new RuntimeException("获取连接时发生线程中断", e);
} finally {
this.lock.unlock();
}
}
/**
* 创建一个物理连接。
* 本方法无需线程安全保护,因为它会被其他线程安全的方法调用。
* 2020年2月5日 下午5:28:18 xx添加此方法
* @param batchSize 一批次大小
* @return
*/
private void createConnections() {
// 如果空闲连接数少于等于1个,则新创建最多10个连接并缓存起来
int batchSize = this.capacity - this.totalSize;
if (batchSize > 10) {
batchSize = 10;
}
// 这里模拟创建连接的逻辑
List<T> ts = new ArrayList<>(10);
this.freeConnections.addAll(ts);
this.totalSize += batchSize;
}
/**
* 将连接释放会连接池
* 2020年2月5日 下午5:28:18 xx添加此方法
*/
public void releaseConnection(T connection) {
this.lock.lock();
try {
if (this.busyConnections.remove(connection)) {
this.freeConnections.add(connection);
// 该语句将唤醒 isFull 条件对应条件线程队列中的所有线程(都是在等待空闲连接的线程)
this.isFull.signalAll();
}
} finally {
this.lock.unlock();
}
}
}
温馨提示:
本连接池例子只是为了说明如何使用条件锁,真实的连接池绝非如此简单,请勿模仿。
这个例子想要说明如下几点:
- 条件是通过锁创建的。
- 锁创建条件,那个锁创建的条件,就和那个锁绑定。
- 一个锁可以创建多个条件。
- 内置锁有且只有一个对应的内置条件线程队列。
- 条件锁中的条件(本例中的 isFull 属性),实际上是代表一种状态的抽象概念;至于这种状态具体是什么,有什么意义,都由调用者的逻辑决定;如果调用者的逻辑使用不当,JDK的条件锁本身,并不能分辨,也不能控制。
- 本例中的 isFull 条件,实际上是和
this.totalSize == this.capacity
这个场景关联的。- 本例中的 needIncr 条件,实际上是和
this.freeConnections.size() <= 1
这个场景对应的。- 如果两个条件使用的场景发生错乱或遗留,JDK无法发现,将可能导致线程泄露、死锁或其他锁使用不当问题。
- 条件本身,实际上只是用来和背后的条件线程队列沟通的工具包;每个条件都有对应的一个条件线程队列。
- 每个条件都有自己的条件线程队列;
- 通过await,将当前线程休眠,放到和条件对应的条件线程队列中,并释放锁;但它并不能影响到其他条件对应的条件线程队列。
- 通过 signalAll/signal,唤醒条件对应的条件线程队列中的所有线程,让它们去竞争锁;但它并不能影响到其他条件对应的条件线程队列。