Java并发编程(三) - 实战:线程同步的实现

synchronized关键字

首先,来看一个多线程竞争临界资源导致的同步不安全问题。

package com.example.weishj.mytester.concurrency.sync;

/**
 * 同步安全测试
 *
 * 在无任何同步措施时,并发会导致错误的结果
 */
public class SyncTest1 implements Runnable {
    // 共享资源(临界资源)
    private static int race = 0;
    private static final int THREADS_COUNT = 10;

    public void increase() {
        race++;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            increase();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        SyncTest1 runnable = new SyncTest1();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(runnable);
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        // 期待的结果应该是(THREADS_COUNT * 10000)= 100000
        System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
    }
}

运行结果:

race = 69309, time: 4

synchronized实例方法

  • 锁定实例对象(this)

以开头的代码为例,对 increase() 做同步安全控制:

// synchronized实例方法,安全访问临界资源
public synchronized void increase() {
    race++;
}

运行结果:

race = 100000, time: 29

既然锁定的是this对象,那么任何同步安全就必须建立在当前对象锁的前提之上,脱离了当前对象,就不再有同步安全可言。仍然以开头的代码为例:

package com.example.weishj.mytester.concurrency.sync;

/**
 * 同步安全测试
 *
 * 脱离了"同一个对象"的前提,synchronized实例方法将不再具有同步安全性
 */
public class SyncTest3 implements Runnable {
    // 共享资源(临界资源)
    private static int race = 0;
    private static final int THREADS_COUNT = 10;

    // synchronized实例方法,安全访问临界资源
    public synchronized void increase() {
        race++;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            increase();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
//      SyncTest3 runnable = new SyncTest3();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            // 不同的对象锁,将导致临界资源不再安全
            threads[i] = new Thread(new SyncTest3());
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        // 期待的结果应该是(THREADS_COUNT * 10000)= 100000
        System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
    }
}

运行结果:

race = 72446, time: 5

因此,使用synchronized实例方法时,需要格外注意实例对象是不是同一个:

  • 单例:安全
  • 非单例:同一个实例对象上才存在同步安全

另外,既然是针对对象加锁,那么同一个对象中的多个同步实例方法之间,也是互斥的。

package com.example.weishj.mytester.concurrency.sync;

/**
 * 同步安全测试
 *
 * 同一个对象的不同synchronized实例方法之间,也是互斥的
 */
public class SyncTest4 {
    private static final int THREADS_COUNT = 2;

    public synchronized void a() {
        int i = 5;
        while (i-- > 0) {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", method: a, running...");
        }
    }

    public synchronized void b() {
        int i = 5;
        while (i-- > 0) {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", method: b, running...");
        }
    }

    public static void main(String[] args) {
        final SyncTest4 instance = new SyncTest4();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            final int finalI = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (finalI % 2 == 0) {
                        // 若通过不同对象调用方法ab,则ab之间不存在互斥关系
//                      new SyncTest4().a();
                        // 在同一个对象上调用方法ab,则ab之间是互斥的
                        instance.a();
                    } else {
                        // 若通过不同对象调用方法ab,则ab之间不存在互斥关系
//                      new SyncTest4().b();
                        // 在同一个对象上调用方法ab,则ab之间是互斥的
                        instance.b();
                    }
                }
            });
            threads[i].start();
        }
    }
}

运行结果:

Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...

若两个线程分别通过不同的对象调用方法ab(上述示例中被注释的代码),则ab之间就不存在互斥关系。可以通过上述示例中被注释的代码来验证,运行结果:

Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...

综上分析,synchronized实例方法 有以下关键点需要记住:

  • 锁定实例对象(this)
  • 每个实例都有独立的对象锁,因此只有针对同一个实例,才具备互斥性
  • 同一个实例中的多个synchronized实例方法之间,也是互斥的

synchronized静态方法

  • 锁定类对象(class)
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

/**
 * 同步安全测试
 *
 * 同步静态方法,实现线程安全
 */
public class SyncStaticTest1 implements Runnable {
    // 共享资源(临界资源)
    private static int race = 0;
    private static final int THREADS_COUNT = 10;

    public static synchronized void increase() {
        race++;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            // 这里加this只是为了显式地表明是通过对象来调用increase方法
            this.increase();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            // 每次都创建新的SyncStaticTest1实例
            threads[i] = new Thread(new SyncStaticTest1());
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        // 期待的结果应该是(THREADS_COUNT * 10000)= 100000
        System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
    }
}

运行结果:

race = 100000, time: 25

可见,就算是10个线程分别通过不同的SyncStaticTest1实例调用increase方法,仍然是线程安全的。同样地,不同线程分别通过实例对象和类对象调用同步静态方法,也是线程安全的,这里不再做演示。

但是,同一个类的 同步静态方法同步实例方法 之间,则不存在互斥性,因为他们的同步锁不同。如下示例:

package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

/**
 * 同步安全测试
 *
 * 同步静态方法和同步实例方法之间,不存在互斥性
 */
public class SyncStaticTest2 {
    private static final int THREADS_COUNT = 2;

    public synchronized static void a() {
        int i = 5;
        while (i-- > 0) {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", method: a, running...");
        }
    }

    public synchronized void b() {
        int i = 5;
        while (i-- > 0) {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", method: b, running...");
        }
    }

    public static void main(String[] args) {
        final SyncStaticTest2 instance = new SyncStaticTest2();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            final int finalI = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (finalI % 2 == 0) {
                        // 静态方法即可以通过实例调用,也可以通过类调用
                        instance.a();
                    } else {
                        // 实例方法则只能通过实例调用
                        instance.b();
                    }
                }
            });
            threads[i].start();
        }
    }
}

运行结果:

Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...

综上分析,synchronized静态方法 有以下关键点需要记住:

  • 锁定类对象(class)
  • 同步静态方法在任意实例对象之间,也是互斥的
  • 同个类的同步静态方法和同步实例方法之间,不具备互斥性

synchronized代码块

从之前的演示示例中,我们可以发现,方法同步后,其耗时(time)一般都在20ms以上,而不同步时,time则只有3ms左右,这印证了synchronized关键字其实是非常低效的,不应该随意使用,如果必须使用,也应该考虑尽量减少同步的范围,尤其当方法体比较大时,应该尽量避免使用同步方法,此时可以考虑用同步代码块来代替。

synchronized(obj) {...}
  • 锁住指定的对象(可以是任意实例对象,类对象)
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

/**
 * 同步安全测试
 *
 * 同步代码块,实现线程安全
 */
public class SyncBlockTest1 implements Runnable {
    // 共享资源(临界资源)
    private static int race = 0;
    private static final int THREADS_COUNT = 10;

    public void increase() {
        race++;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            // 要注意这里锁定的对象是谁
            synchronized (this) {
                increase();
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        SyncBlockTest1 runnable = new SyncBlockTest1();
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            // 必须使用同一个实例,才能达到同步效果
            threads[i] = new Thread(runnable);
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        // 期待的结果应该是(THREADS_COUNT * 10000)= 100000
        System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
    }
}

运行结果:

race = 100000, time: 29

上例中,我们锁定了当前对象 this ,如果类的使用情况比较复杂,无法用this做对象锁,也可以自行创建任意对象充当对象锁,此时建议使用长度为0的byte数组,因为在所有对象中,它的创建是最经济的(查看编译后的字节码:byte[] lock = new byte[0] 只需3条操作码,而Object lock = new Object() 则需要7行操作码)。

// 使用一个长度为0的byte数组作为对象锁
private byte[] lock = new byte[0];

synchronized (lock) {
    increase();
}

使用同步代码块时,同样必须明确你的对象锁是谁,这样才能写出正确的使用逻辑。以上例来说,无论是 this 还是 lock ,他们都是与当前对象相关的,所以,为了达到同步效果,必须如下使用:

SyncBlockTest1 runnable = new SyncBlockTest1();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
    // 必须使用同一个实例,才能达到同步效果
    threads[i] = new Thread(runnable);
    threads[i].start();
}

可如果你的使用方法如下,就失去了线程安全性:

Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
    // 每次都创建新的SyncStaticTest1实例,就会失去线程安全性
    threads[i] = new Thread(new SyncBlockTest1());
    threads[i].start();
}

此时,运行结果为:

race = 62629, time: 7

但如果你锁定的是类对象 SyncStaticTest1.class ,那10个线程无论使用同一个实例还是各自使用不同的实例,都是安全的。

// 锁定类对象
synchronized (SyncStaticTest1.class) {
    increase();
}

Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
    // 每次都创建新的SyncStaticTest1实例,仍然是线程安全的
    threads[i] = new Thread(new SyncBlockTest1());
    threads[i].start();
}

运行结果:

race = 100000, time: 25

综上分析,synchronized代码块 有以下关键点需要记住:

  • 锁住指定的对象(可以是任意实例对象,类对象)
  • 需要创建对象锁时,建议使用 new byte[0] ,因为在所有对象中,它的创建是最经济的
  • 必须时刻明确对象锁是谁,只有配合正确的使用方法,才能得到正确的同步效果

至此,synchronized的三种用法就说完了,可见,使用synchronized时,明确对象锁是非常重要的。另外,搞清楚了对象锁的相关知识后,就不难推断出以下2个等式:

synchronized void method() {
// method logic
}

等价于:

void method() {
synchronized(this) {
// method logic
}
}
static synchronized void method() {
// method logic
}

等价于:

static void method() {
synchronized(TestClass.class) {
// method logic
}
}

Lock接口

除了synchronized关键字,JDK1.5中还新增了另外一种线程同步机制:Lock接口。来看看其接口定义:

package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;

public interface Lock {
    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}
lock()

获取普通锁,若锁已被获取,则只能等待,效果与synchronized相同。只不过lock后需要unlock。

lockInterruptibly()

获取可中断锁,当两个线程同时通过 lockInterruptibly() 想获取某个锁时,假设A获取到了,那么B只能等待,此时如果对B调用 interrupt() 方法,就可以中断B的等待状态。但是注意,A是不会被 interrupt() 中断的,也就是说,只有处于等待状态的线程,才可以响应中断。

tryLock()

尝试获取锁,如果获取成功返回true,反之立即返回false。此方法不会阻塞等待获取锁。

tryLock(long time, TimeUnit unit)

等待time时间,如果在time时间内获取到锁返回true,如果阻塞等待time时间内没有获取到锁返回false。

unlock()

业务处理完毕,释放锁。

newCondition()

创建一个Condition。Condition与Lock结合使用,可以达到synchronized与wait/notify/notifyAll结合使用时同样的线程等待与唤醒的效果,而且功能更强大。

Lock接口与synchronized关键字的区别
  • synchronized加解锁是自动的;而Lock需要手动加解锁,操作复杂,但更加灵活
    • lock与unlock需要成对使用,否则可能造成线程长期占有锁,其他线程长期等待
    • unlock应该放在 finally 中,以防发生异常时未能及时释放锁
  • synchronized不可响应中断,一个线程获取不到锁就一直等待;而Lock可以响应中断
    • 当两个线程同时通过 Lock.lockInterruptibly() 想获取某个锁时,假设A获取到了,那么B只能等待,此时如果对B调用 interrupt() 方法,就可以中断B的等待状态。但是注意,A是不会被 interrupt() 中断的,也就是说,只有处于等待状态的线程,才可以响应中断。
  • synchronized无法实现公平锁;而Lock可以实现公平锁
    • 公平锁与非公平锁的概念稍后再说

ReentrantLock可重入锁

ReentrantLock是Lock的实现类。首先,看一个简单的售票程序:

package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

/**
 * 同步安全测试
 *
 * 一个简单的售票程序,多线程同时售票时,会出现线程安全问题
 */
public class ReentrantLockTest1 {
    private static final int THREADS_COUNT = 3; // 线程数
    private static final int TICKETS_PER_THREAD = 5;    // 每个线程分配到的票数
    // 共享资源(临界资源)
    private int ticket = THREADS_COUNT * TICKETS_PER_THREAD;    // 总票数

    public void buyTicket() {
        try {
            if (ticket > 0) {
                System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
                // 为了更容易出现安全问题,这里加一个短暂睡眠
                Thread.sleep(2);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public void readTicket() {
        System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        final ReentrantLockTest1 instance = new ReentrantLockTest1();
        // 启动 THREADS_COUNT 个线程
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 每个线程可以卖 TICKETS_PER_THREAD 张票
                    for (int j = 0; j < TICKETS_PER_THREAD; j++) {
                        instance.buyTicket();
                    }
                }
            });
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }

        // 读取剩余票数
        instance.readTicket();
        // 耗时
        System.out.println("time: " + (System.currentTimeMillis() - start));
    }
}

库存有15张票,同时启动3个线程出售,每个线程分配5张,线程安全时,结果应该是所有票正好都被卖掉,不多不少。然而,在没有任何同步措施的情况下,运行结果如下:

Thread: Thread-0, bought ticket-15
Thread: Thread-2, bought ticket-13
Thread: Thread-1, bought ticket-14
Thread: Thread-1, bought ticket-12
Thread: Thread-2, bought ticket-11
Thread: Thread-0, bought ticket-12
Thread: Thread-2, bought ticket-10
Thread: Thread-1, bought ticket-10
Thread: Thread-0, bought ticket-9
Thread: Thread-2, bought ticket-8
Thread: Thread-1, bought ticket-7
Thread: Thread-0, bought ticket-6
Thread: Thread-0, bought ticket-5
Thread: Thread-2, bought ticket-5
Thread: Thread-1, bought ticket-4
Thread: main, tickets left: 3
time: 14

可见,ticket-12、ticket-10、ticket-5均被售出了2次,而Ticket-1、Ticket-2、Ticket-3没有售出。

下面是使用Lock的实现类 ReentrantLock 对上例做的改造:

package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

import java.util.concurrent.locks.ReentrantLock;

/**
 * 同步安全测试
 *
 * 演示ReentrantLock实现同步,以及公平锁与非公平锁
 */
public class ReentrantLockTest2 {
    private static final int THREADS_COUNT = 3; // 线程数
    private static final int TICKETS_PER_THREAD = 5;    // 每个线程分配到的票数
    // 共享资源(临界资源)
    private int ticket = THREADS_COUNT * TICKETS_PER_THREAD;    // 总票数
    private static final ReentrantLock lock;

    static {
        // 创建一个公平锁/非公平锁
        lock = new ReentrantLock(false);    // 修改参数,看看公平锁与非公平锁的差别
    }

    public void buyTicket() {
        try {
            lock.lock();
            if (ticket > 0) {
                System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
                // 为了演示出公平锁与非公平锁的效果,这里加一个短暂睡眠,让其他线程获得一个等待时间
                Thread.sleep(2);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            // unlock应该放在finally中,防止发生异常时来不及解锁
            lock.unlock();
        }
    }

    public void readTicket() {
        System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        final ReentrantLockTest2 instance = new ReentrantLockTest2();
        // 启动 THREADS_COUNT 个线程
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 每个线程可以卖 TICKETS_PER_THREAD 张票
                    for (int j = 0; j < TICKETS_PER_THREAD; j++) {
                        instance.buyTicket();
                    }
                }
            });
            threads[i].start();
        }

        // 等待所有累加线程都结束
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }

        // 读取剩余票数
        instance.readTicket();
        // 耗时
        System.out.println("time: " + (System.currentTimeMillis() - start));
    }
}

运行结果:

Thread: Thread-0, bought ticket-15
Thread: Thread-0, bought ticket-14
Thread: Thread-0, bought ticket-13
Thread: Thread-1, bought ticket-12
Thread: Thread-1, bought ticket-11
Thread: Thread-1, bought ticket-10
Thread: Thread-1, bought ticket-9
Thread: Thread-1, bought ticket-8
Thread: Thread-2, bought ticket-7
Thread: Thread-2, bought ticket-6
Thread: Thread-2, bought ticket-5
Thread: Thread-2, bought ticket-4
Thread: Thread-2, bought ticket-3
Thread: Thread-0, bought ticket-2
Thread: Thread-0, bought ticket-1
Thread: main, tickets left: 0
time: 36

可见,从 ticket-15 到 ticket-1 都被按顺序售出了,只不过每张票由哪条线程售出则存在不确定性。上述运行结果是使用 非公平锁 得到的,我们再通过修改代码 lock = new ReentrantLock(true) ,看看公平锁的运行效果:

Thread: Thread-0, bought ticket-15
Thread: Thread-1, bought ticket-14
Thread: Thread-2, bought ticket-13
Thread: Thread-0, bought ticket-12
Thread: Thread-1, bought ticket-11
Thread: Thread-2, bought ticket-10
Thread: Thread-0, bought ticket-9
Thread: Thread-1, bought ticket-8
Thread: Thread-2, bought ticket-7
Thread: Thread-0, bought ticket-6
Thread: Thread-1, bought ticket-5
Thread: Thread-2, bought ticket-4
Thread: Thread-0, bought ticket-3
Thread: Thread-1, bought ticket-2
Thread: Thread-2, bought ticket-1
Thread: main, tickets left: 0
time: 47

我们看到,在公平锁环境下,不仅ticket安全性得到保证,就连线程获得锁的顺序也得到了保证,以“Thread-0、1、2”的顺序循环执行。这里的“公平性”体现在哪里呢?通俗点说,就是先排队等待(也就是等待时间越长)的线程先得到锁,显然,这种”先到先得“的效果,用队列”先进先出“的特性实现最为合适。

Java也确实是通过”等待队列“来实现”公平锁“的。所有等待锁的线程都会被挂起并且进入等待队列,当锁被释放后,系统只允许等待队列的头部线程被唤醒并获得锁。而”非公平锁“其实同样有这样一个队列,只不过当锁被释放后,系统并不会只从等待队列中获取头部线程,而是如果发现此时正好有一个还没进入等待队列的线程想要获取锁(此时该线程还未被挂起)时,则直接将锁给了它(公平性被打破),这条线程就可以直接执行,而不用进行状态切换,于是就省去了切换的开销,这也就是非公平锁效率高于公平锁的原因所在。

有了上述理解,我们就可以推断

  1. 若在释放锁时,总是没有新的线程来打扰,则每次都必定从等待队列中取头部线程唤醒,此时非公平锁等于公平锁。
  2. 对于非公平锁来说,只要线程进入了等待队列,队列里面仍然是FIFO的原则,跟公平锁的顺序是一样的。有人认为,”非公平锁环境下,哪条线程获得锁完全是随机的“,这种说法明显是不对的,已经进入等待队列中的那些线程就不是随机获得锁的。

Condition条件

在Lock接口定义中,还定义了一个 newCondition() 方法,用于返回一个Condition。

Condition与Lock结合起来使用,可以达到Object监视器方法(wait/notify/notifyAll)与synchronized结合起来使用时同样甚至更加强大的线程等待与唤醒效果。其中,Lock替代synchronized,Condition替代Object监视器方法。

在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。传统的线程间通信方式,Condition都能实现,需要注意的是,Condition是绑定在Lock上的,必须通过Lock对象的 newCondition() 方法获得。

Condition的强大之处,在于它可以针对同一个lock对象,创建多个不同的Condition条件,以处理复杂的线程等待与唤醒场景。典型的例子就是“生产者-消费者”问题。生产者与消费者共用同一个固定大小的缓冲区,当缓冲区满了,生产者还想向其中添加数据时,就必须休眠,等待消费者取走一个或多个数据后再唤醒。同样,当缓冲区空了,消费者还想从中取走数据时,也要休眠,等待生产者向其中添加一个或多个数据后再唤醒。可见,Condition可以指定哪条线程被唤醒,而notify/notifyAll则不行

package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Condition测试
 *
 * 生产者-消费者问题
 */
public class ConditionTest {
    private static final int REPOSITORY_SIZE = 3;
    private static final int PRODUCT_COUNT = 10;

    public static void main(String[] args)  {
        // 创建一个容量为REPOSITORY_SIZE的仓库
        final Repository repository = new Repository(REPOSITORY_SIZE);

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < PRODUCT_COUNT; i++) {
                    try {
                        repository.put(Integer.valueOf(i));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }) ;

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < PRODUCT_COUNT; i++) {
                    try {
                        Object val = repository.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }) ;

        producer.start();
        consumer.start();
    }

    /**
     * Repository 是一个定长集合,当集合为空时,take方法需要等待,直到有元素时才返回元素
     * 当其中的元素数达到最大值时,put方法需要等待,直到元素被take之后才能继续put
     */
    static class Repository {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items;
        int putIndex, takeIndex, count;

        public Repository(int size) {
            items = new Object[size];
        }

        public void put(Object x) throws InterruptedException {
            try {
                lock.lock();
                while (count == items.length) {
                    System.out.println("Buffer full, please wait");
                    // 开始等待库存不为满
                    notFull.await();
                }

                // 生产一个产品
                items[putIndex] = x;
                // 增加当前库存量
                count++;
                System.out.println("Produce: " + x);
                if (++putIndex == items.length) {
                    putIndex = 0;
                }
                // 通知消费者线程库存已经不为空了
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            try {
                lock.lock();
                while (count == 0) {
                    System.out.println("No element, please wait");
                    // 开始等待库存不为空
                    notEmpty.await();
                }
                // 消费一个产品
                Object x = items[takeIndex];
                // 减少当前库存量
                count--;
                System.out.println("Consume: " + x);
                if (++takeIndex == items.length) {
                    takeIndex = 0;
                }
                // 通知生产者线程库存已经不为满了
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }
}

运行结果:

Produce: 0
Produce: 1
Produce: 2
Buffer full, please wait
Consume: 0
Consume: 1
Produce: 3
Produce: 4
Buffer full, please wait
Consume: 2
Consume: 3
Consume: 4
No element, please wait
Produce: 5
Produce: 6
Produce: 7
Buffer full, please wait
Consume: 5
Consume: 6
Consume: 7
No element, please wait
Produce: 8
Produce: 9
Consume: 8
Consume: 9

ReadWriteLock读写锁

ReadWriteLock也是一个接口,其优势是允许”读并发“,也就是”读写互斥,写写互斥,读读不互斥“。在多线程读的场景下,能极大的提高运算效率,提升服务器吞吐量。其接口定义很简单:

package java.util.concurrent.locks;

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

ReentrantReadWriteLock可重入读写锁

ReentrantReadWriteLock是读写锁的实现类。我们将售票程序做个简单的改造:

package com.example.weishj.mytester.concurrency.sync.synchronizedtest;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 同步安全测试
 *
 * 演示ReentrantReadWriteLock实现同步,它的特点是"读并发"、"写互斥"、"读写互斥"
 */
public class ReentrantReadWriteLockTest1 {
    private static final int THREADS_COUNT = 3; // 线程数
    private static final int TICKETS_PER_THREAD = 4;    // 每个线程分配到的票数
    // 共享资源(临界资源)
    private int ticket = THREADS_COUNT * TICKETS_PER_THREAD;    // 总票数
    private static final ReadWriteLock lock;

    static {
        // 为了通过一个示例同时演示"读并发"、"写互斥"、"读写互斥"的效果,创建一个公平锁
        lock = new ReentrantReadWriteLock(false);   // 此处也说明读锁与写锁之间同样遵守公平性原则
    }

    public void buyTicket() {
        try {
            lock.writeLock().lock();
            if (ticket > 0) {
                System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
                Thread.sleep(2);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", unlocked write");
            lock.writeLock().unlock();
        }
    }

    public void readTicket() {
        try {
            lock.readLock().lock();
            System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
            Thread.sleep(5);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println("Thread: " + Thread.currentThread().getName() + ", unlocked read");
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        final ReentrantReadWriteLockTest1 instance = new ReentrantReadWriteLockTest1();
        // 启动 THREADS_COUNT 个线程
        Thread[] writeThreads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            writeThreads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 每个线程可以卖 TICKETS_PER_THREAD 张票
                    for (int j = 0; j < TICKETS_PER_THREAD; j++) {
                        instance.buyTicket();
                    }
                }
            });
            writeThreads[i].start();
        }

        // 读取此时的剩余票数
        Thread[] readThreads = new Thread[2];
        for (int i = 0; i < 2; i++) {
            readThreads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 每个线程可以读 2 次剩余票数
                    for (int j = 0; j < 2; j++) {
                        instance.readTicket();
                    }
                }
            });
            readThreads[i].start();
        }
    }
}

运行结果:

Thread: Thread-0, bought ticket-12
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-11
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-10
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-9
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-8
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-7
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-6
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-5
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-4
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-3
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-2
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-1
Thread: Thread-2, unlocked write
Thread: Thread-3, tickets left: 0
Thread: Thread-4, tickets left: 0
Thread: Thread-3, unlocked read
Thread: Thread-3, tickets left: 0
Thread: Thread-4, unlocked read
Thread: Thread-4, tickets left: 0
Thread: Thread-3, unlocked read
Thread: Thread-4, unlocked read

上述结果是在”非公平锁“的环境下得到的,无论尝试运行多少次,2条读线程都是被放在3条写线程执行完毕后才开始执行,为了一次性验证所有结论,我们再换”公平锁“重新执行一次,结果如下:

Thread: Thread-0, bought ticket-12
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-11
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-10
Thread: Thread-2, unlocked write
Thread: Thread-3, tickets left: 9
Thread: Thread-4, tickets left: 9
Thread: Thread-4, unlocked read
Thread: Thread-3, unlocked read
Thread: Thread-0, bought ticket-9
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-8
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-7
Thread: Thread-2, unlocked write
Thread: Thread-4, tickets left: 6
Thread: Thread-3, tickets left: 6
Thread: Thread-3, unlocked read
Thread: Thread-4, unlocked read
Thread: Thread-0, bought ticket-6
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-5
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-4
Thread: Thread-2, unlocked write
Thread: Thread-0, bought ticket-3
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-2
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-1
Thread: Thread-2, unlocked write

这次读线程就被穿插到写线程中间了,从上述结果中可以看到:

  • 当任意线程写的时候,其他线程既不能读也不能写
  • Thread-3读的时候,Thread-4同样可以读,但是不能有任何写线程
  • 3条写线程永远按照”0-1-2“的顺序执行,他们遵守”公平性“原则
  • 2条读线程之间非互斥,所以也谈不上什么”公平性”原则
  • 3条写线程”Thread-0、1、2“各获得过一次锁之后,必定轮到2条读线程”Thread-3、4“获得锁,而不是如”非公平锁“的结果那样,读线程总是等到写线程全部执行结束后才开始执行,也就是说读线程与写线程之间遵守同一个”公平性“原则

使用场景分析

synchronized
  • 不需要“中断”与“公平锁”的业务场景
  • 较为简单的“等待与唤醒”业务(与Object监视器方法结合使用)
ReentrantLock可重入锁
  • 需要“响应中断”的业务场景:处于等待状态的线程可以中断
  • 需要“公平锁”的业务场景:线程有序获得锁,亦即“有序执行”
  • 与Condition结合,可以满足更为复杂的“等待与唤醒”业务(可以指定哪个线程被唤醒)
ReentrantReadWriteLock可重入读写锁
  • 允许“读读并发”的业务场景,可以大幅提高吞吐量

总结

synchronized实例方法
  • 锁定实例对象(this)
  • 每个实例都有独立的对象锁,因此只有针对同一个实例,才具备互斥性
  • 同一个实例中的多个synchronized实例方法之间,也是互斥的
synchronized静态方法
  • 锁定类对象(class)
  • 同步静态方法在任意实例对象之间,也是互斥的
  • 同个类的同步静态方法和同步实例方法之间,不具备互斥性
synchronized代码块
  • 锁住指定的对象(可以是任意实例对象,类对象)
  • 需要创建对象锁时,建议使用 new byte[0] ,因为在所有对象中,它的创建是最经济的
  • 必须时刻明确对象锁是谁,只有配合正确的使用方法,才能得到正确的同步效果
ReentrantLock可重入锁
  • ReentrantLock是Lock接口的一种实现
  • 需要手动加解锁,操作复杂,但更加灵活
  • lock与unlock需要成对使用,且unlock应该放在 finally
  • 可以响应中断
  • 可以实现“公平锁”:先排队等待(也就是等待时间越长)的线程先得到锁
  • 非公平锁环境下,哪条线程获得锁并非是完全随机的,已经进入等待队列中的那些线程就仍然是根据FIFO原则获得锁的
  • 非公平锁效率高于公平锁
  • ReentrantLock与Condition结合使用,类似synchronized与Object监视器方法结合使用
  • 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()
  • Condition的强大之处,在于它可以针对同一个lock对象,创建多个不同的Condition条件,以处理复杂的线程等待与唤醒场景
  • Condition可以指定哪条线程被唤醒,而notify/notifyAll则不行
ReentrantReadWriteLock可重入读写锁
  • ReentrantReadWriteLock是ReadWriteLock接口(读写锁)的一个实现类,而ReadWriteLock内部则是由Lock实现的
  • ReentrantReadWriteLock具有ReentrantLock的一切特性,同时还具有自己的独立特性:"读读并发"、"写写互斥"、"读写互斥"
  • ReentrantReadWriteLock可以有效提高并发,增加吞吐量
  • 在“公平锁”环境下,读线程之间没有”公平性“可言,而写线程之间,以及读线程与写线程之间,则遵守同一个“公平性”原则
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343