二、共享受限资源
有了并发就可以同时做多件事情了。但是,两个或多个线程彼此互相干涉的问题也就出现了。如果不防范这种冲突,就可能发生两个线程同时试图访问同一个银行账户,或向同一个打印机打印,改变同一个值等诸如此类的问题。
1. 不正确地访问资源
考虑下面的例子,其中一个任务产生偶数,而其他任务消费这些数字。这里,消费者任务的唯一工作就是检查偶数的有效性。
首先,我们定义EvenChecker,即消费者任务,因为他将在随后所有的示例中被复用。为了将EvenChecker与我们要试验的各种类型的生成器解耦,我们将创建一个名为IntGenerator的抽象类,它包含EvenChecker必须了解的必不可少的方法:即一个next()方法,和一个可以执行撤销的方法。这个类没有实现Generator接口,因为ta它必须产生一个int,而泛型不支持基本类型的参数:
public abstract class IntGenerator {
private volatile boolean cancelled = false;
public abstract int next();
public void cancel() {
cancelled = true;
}
public boolean isCancelled() {
return cancelled;
}
}
IntGenerator有一个cancel()方法,可以修改boolean类型的canceled标志的状态;还有一个isCanceled()方法,可以查看该对象是否已经被取消。因为canceled标志是boolean类型的,所以它是原子性的,即诸如赋值和返回值这样的简单操作在发生时没有中断的可能,因此你不会看到这个域处于在执行这些简单操作的过程中的中间状态。为了保证可视性,canceled标志还是volatile的。你将在稍后学习原子性和可视性。
任何IntGenerator都可以用下面的EvenChecker类来测试:
public class EvenChecker implements Runnable {
private IntGenerator intGenerator;
private final int id;
public EvenChecker(IntGenerator intGenerator, int id) {
this.intGenerator = intGenerator;
this.id = id;
}
@Override
public void run() {
while (!intGenerator.isCancelled()) {
int val = intGenerator.next();
if (val % 2 != 0) {
System.out.println(val + " not even!");
intGenerator.cancel();
}
}
}
public static void test(IntGenerator ig, int count) {
System.out.println("Press Control C to exit");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
exec.execute(new EvenChecker(ig, i));
}
exec.shutdown();
}
public static void test(IntGenerator ig) {
test(ig, 10);
}
}
test()方法通过启动大量使用相同的IntGenerator的EvenChecker,设置并执行对任何类型的IntGenerator的测试。如果IntGenerator引发失败,那么test()将报告它并返回,否则,你必须按下ctrl-c来终止他。
我们看到的第一个IntGenerator有一个可以产生一系列偶数值的next()方法:
public class EvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public int next() {
++ currentEvenValue;
++ currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args){
EvenChecker.test(new EvenGenerator());
}
}
有一点很重要,那就是要注意到递增程序自身也需要多个步骤,并且在递增过程中任务可能会被线程机制挂起--也就是说,在Java中,递增不是原子性的操作。因此,如果不保护任务,即使单一的递增也不是安全的。
2. 解决共享资源竞争
对于并发工作,你需要某种方式来防止两个任务访问相同的资源,至少在关键阶段不能出现这种情况。
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。第一个访问某项资源的任务必须锁定这项资源,使其他任务在其被解锁之前,就无法访问它了,而在其被解锁之时,另一个任务就可以锁定并使用它,以此类推。
基本上所有的并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。这意味着在给定时刻只允许一个任务访问共享资源。通常这是通过在代码前jia'shang'yi't加上一条锁语句来实现的,这就使得在一段时间内只有一个任务可以运行这段代码。因为锁语句产生了一种互相排斥的效果,所以这种机制常常称为互斥量(mutex)。
java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持。当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。
共享资源一般是以对象形式存在的内存片段,但也可以是文件、输入\输出端口,或者是打印机。要控制对共享资源的访问,得先把他包装进一个对象。然后把所有要访问这个资源的方法标记为synchronized。如果某个任务处于一个对标记为synchronized的方法的调用中,那么在这个线程从该方法返回之前,其他所有要调用类中任何标记为synchronized方法的线程都会被阻塞。
注意,在使用并发时,将域设置为private是非常重要的,否则,synchronized关键字就不能防止其他任务直接访问域,这样就会产生冲突。
一个任务可以多次获得对象的锁。如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一对象上的另一个方法,就会发生这种情况。JVM负责跟踪对象被加锁的次数。如果一个对象被解锁(即锁被完全释放),其计数变为0。在任务第一次给对象加锁的时候,计数变为1。每当这个相同的任务在这个对象上获得suo锁时,计数都会递增。显然,只有首先获得了锁的任务才能允许继续获取多个锁。每当任务离开一个synchronized方法,计数递减,当计数为0的时候,锁被完全释放,此时别的任务就可以使用此资源。
针对每个类,也有一个锁(作为类的class对象的一部分),所以synchronized static方法可以在类的范围内防止对static数据的并发访问。
你应该什么时候同步呢?可以运用Brian的同步规则:
如果你正在写一个变量,他可能接下来被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须用相同的监视器锁同步。
如果在你的类中有超过一个方法在处理临界数据,那么你必须同步所有相关的方法。如果只同步一个方法,那么其他方法将会随意的忽略这个对象锁,并可以在无任何惩罚的情况下被调用。这是很重要的一点:每个访问临界共享资源的方法都必须被同步,否则他们就不会正确的工作。
同步控制EvenGenerator
通过在EvenGenerator.java中加入synchronized关键字,可以防止不希望的线程访问:
public class SynchronizedEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public synchronized int next() {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args){
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
在Thread.yield()的调用被插入到了两个递增操作之间,以提高在currentEvenValue是奇数状态时上下文切换的可能性。因为互斥可以防止多个任务同时进入临界区,所以这不会产生任何失败。但是如果失败将会发生,调用yield()是一种促使其发生的有效方式。
使用显式的Lock对象
Java SE5的java.util.concurrent类库还包含有定义在java.util.concurrent.locks中的显式的互斥机制。Lock对象必须被显式的创建、锁定和释放。因此,他与内建的锁形式相比,代码缺乏优雅性。但是对于解决某些类型的问题来说,它更加灵活。
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;
}finally {
lock.unlock();
}
}
public static void main(String[] args){
EvenChecker.test(new MutexEvenGenerator());
}
}
尽管try-finally所需的代码比synchronized关键字要多,但是这也代表了显式的Lock对象的优点之一。如果在使用synchronized关键字时,某些事物失败了,那么就会抛出一个异常。但是你没有机会去做任何清理工作,以维护系统使其处于良好状态。有了显式的Lock对象,你就可以使用finally子句将系统维护在正确的状态了。
大体上,当你使用synchronized关键字时,需要写的代码量更少,并且用户错误出现的可能性也会降低,因此通常只有在解决特殊问题时,才会显式的使用Lock对象。例如,用synchronized关键字不能尝试着获取锁并且最终获取锁会失败,或者尝试着获取锁一段时间,然后放弃它,要实现这些,你必须使用concurrent类库。
显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说还赋予了你更细粒度的控制力,这对于实现专有同步结构是很有用的。
3. 原子性和易变性
原子操作是不能被线程调度机制中断的操作;一旦操作开始,那么他一定可以在可能发生的“上下文切换”之前(切换到其他线程执行)执行完毕。依赖于原子性是很棘手且很危险的。
volatile关键字确保了应用中的可视性。如果你将一个域声明为volatile的,那么还要对这个域产生了写操作,那么所有的读操作都可以看到这个修改。即便使用了本地缓存,情况也确实如此,volatileyu域会立即被写入到主存中,而读取操作就发生在主存中。
理解原子性和易变性是不同的概念这一点很重要。在fei非volatile域上的院子操作不必刷新到主存中去,因此其他读取该域的任务也不必看到这个新值。如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则,这个域就应该只能经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全有synchronized方法或语句块来防护,就不必将其设置为是volatile的。
一个任务所作的任何写入操作对这个任务来说都是可视的,因此如果它只需要在这个任务内部可视,那么你就不需要将其设置为volavolatile的。
使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。再次提醒,你的第一选择应该是使用synchronized关键字,这是最安全的方式,而尝试其他任何方式都是有风险的。
基本上,如果一个域可能会被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么你就应该jian将这个域设置为volatile的。如果你将一个域定义为volatile,那么他就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。实际上,读取和写入都是直接针对内存的,而却没有被缓存。但是,volatile并不能对递增不是原子性操作这一事实产生影响。
4. 原子类
Java SE5引入了诸如AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类,他们提供下面形式的原子性条件更新操作:
boolean compareAndSet(expectedValue, updateValue);
这些类调整为可以使用在某些现在处理器上的可获得的,并且是在机器级别的原子性,因此在使用它们时,通常不需要担心。对于常规编程来说,他们很少会派上用场,但是在涉及性能调优时,他们就大有用武之地了。
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
private int getValue() {
return i.get();
}
public void evenIncrement() {
i.addAndGet(2);
}
@Override
public void run() {
while (true) {
evenIncrement();
}
}
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.err.println("aborting...");
System.exit(0);
}
}, 5000);
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while (true) {
int val = ait.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
应该强调的是,Atomic类被设计用来构建java.util.concurrent中的类,因此只有在特殊情况下才在自己的代码中使用它们,即便使用了也需要确保不存在其他可能出现的问题。通常依赖于锁要更安全一些。
5. 临界区
有时,我们只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法,通过这个方式分离出来的代码被称为临界区(critical section),他也使用synchronized关键字建立,这里,synchronized被用来指定某个对象,此对象的锁被用来对花括号内的代码进行同步控制:
synchronized(syncObject) {
// This code can be accessed
// by only one task at a time
}
这也被成为同步控制块;在进入此段代码前,必须得到syncObject对象的锁。如果其他线程已经得到这个锁,那么就得等到锁被释放后,才能进入临界区。通过使用同步控制卡,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间性能得到显著提高。
6. 在其他对象上同步
synchronized块必须给定一个在其上进行同步的对象,并且最合理的方式是,使用其方法正在被调用的当前对象:synchronized(this)。在这种方式中,如果获得了synchronized块上的锁,那么该对象其他的synchronized方法和临界区就不能被调用了。因此,如果在this上同步,临界区的效果就会直接缩小在同步的范围内。
有时必须在另一个对象上同步,但是如果你要这么做,就必须确保所有相关的任务都是在同一个对象上同步的。
7. 线程本地存储
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。因此,如果你有5个线程都要用变量x所表示的对象,那线程本地存储就会生成5个用于x的不同的存储块。主要是,他们使得你可以将状态与线程关联起来。
创建和管理线程本地存储可以由java.lang.ThreadLocal类来实现,如下所示:
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}
public static int get() {
return value.get();
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}
}
class Accessor implements Runnable {
private final int id;
public Accessor(int id) {
this.id = id;
}
public void run() {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " + ThreadLocalVariableHolder.get();
}
}
ThreadLocal对象通常当做静态域存储。在创建ThreadLocal时,你只能通过get()和set()方法来访问该对象的内容,其中,get()方法将返回与其线程相关联的对象的副本,而set()会将参数插入到其线程存储的对象中,并返回存储中原有的对象。incrememt()和get()方法在ThreadLocalVariableHolder中演示了这一点。注意,increment()和get()方法都不是synchronized的,因为ThreadLocal保证不会出现竞争条件。
当运行这个程序时,你可以看到每个单独的线程都被分配了自己的存储,因为他们每个都需要跟踪自己的计数值,即便只有一个ThreadLocalVariableHolder对象。