Java线程通信

传统的线程通信

假设现在系统当中有两个线程,这两个进程分别代表了存款者和取钱者。现在假设系统有一个特殊的需求,系统要求存款者和取钱者能够不断的重复存款、取钱的动作,而且要求每当存款者将钱存入指定的账户后,取钱者立即取出这笔钱。不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

为了实现这种功能就要借助Object类提供的wait()、notify()和notifyAll()三个方法。但是这三个方法必须由同步监视器对象调用,还可以分为以下两种情况:

  • 对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法中直接调用这三个方法。
  • 对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用对象调用这三个方法。

关于这三个方法的解释如下:

  • wait():导致当前线程等待,直到其它线程调用该线程的notify()和notifyAll()方法来唤醒该线程。wait()方法有三种形式:无时间参数的wait、带毫秒参数的wait和带毫秒、毫微秒参数的wait()(这两种方法都是等待指定时间后自动苏醒)。带wait方法的当前线程会释放对同步监视器的锁定。
  • notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对同步监视器的锁定后(使用wait()方法),才可以指定被唤醒的线程。
  • notifyAll():唤醒在此同步监视器上等待的所有线程。只有当线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。

程序中可以通过一个旗标来标识账户中是否有存款,当旗标为false时,表明账户中没有存款,存款线程可以向下执行,当存款者把钱存入账户后,将旗标设为true,并调用notify()或notifyAll()方法来唤醒其它线程;当存款者线程进入线程体,如果旗标为true就调用wait()方法让线程等待。

当旗标为true时,表明账户中已经存入了存款,则取钱者线程可以向下执行。当取钱者将钱从账户中取出时,将旗标设为false,并调用notify()或notifyAll()方法来唤醒其它线程:当取钱者线程进入线程体后,如果旗标为false就调用wait()方法让线程等待。

public class Account {
    private String accountNo;
    private double balance;
    //标识账户中是否有存款的旗标
    private boolean flag = false;
    public Account() {}
    
    public Account(String accountNo, double balance){
        this.accountNo = accountNo;
        this.balance = balance;
    }
    
    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    public double getBalance() {
        return balance;
    }

    public synchronized void draw(double drawAmount){
        try {
            if(!flag){
                wait();
            }else{
                System.out.println(Thread.currentThread().getName() + " Succeeded in drawing, poping money. " + drawAmount);
                balance -= drawAmount;
                System.out.println("Balance is " + balance);
                flag = false;
                notifyAll();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public synchronized void deposit(double depositAmount){
        try {
            if(flag){
                wait();
                System.out.println(Thread.currentThread().getName() + " Succeeded in depositing, storing money. " + depositAmount);
                balance += depositAmount;
                System.out.println("Balance is " + balance);
                flag = true;
                notifyAll();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public int hashCode(){
        return accountNo.hashCode();
    }
    
    public boolean equals(Object obj){
        if(this == obj)
            return true;
        if(obj != null && obj.getClass() == Account.class){
            Account target = (Account)obj;
            return target.getAccountNo().equals(accountNo);
        }
        return false;
    }
}

上面的程序使用了wait()和notifyAll()进行控制,对存款者而言,当程序进入了deposit方法时,如果flag为true,则表明账户中已经有存款了,程序调用wait()方法就会阻塞;否则程序向下执行存款操作,当存款操作完成后,系统将flag设为true,然后调用notifyAll()方法来唤醒其它被阻塞的线程——如果系统中有存款者线程,存款者线程也会被唤醒。

public class DrawThread extends Thread {
    private Account account;
    private double drawAmount;
    
    public DrawThread(String name, Account account, double drawAmount) {
        super(name);
        this.account = account;
        this.drawAmount = drawAmount;
    }
    
    public void run() {
        for(int i=0;i<100;i++){
            account.draw(drawAmount);
        }
    }
}

public class DepositThread extends Thread {
    private Account account;
    private double depositAmount;
    public DepositThread(String name, Account account, double depositAmount) {
        super(name);
        this.account = account;
        this.depositAmount = depositAmount;
    }
    
    public void run() {
        for(int i=0;i<100;i++){
            account.deposit(depositAmount);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Account acct = new Account("1234567", 0);
        new DrawThread("Drawer", acct, 800).start();
        new DepositThread("Deposit1", acct, 800).start();
        new DepositThread("Deposit2", acct, 800).start();
        new DepositThread("Deposit3", acct, 800).start();
    }
}

运行程序可以看到存款者线程和取钱者线程交替执行。每当存款者向账户存入800元之后,取钱者线程立即从账户取走这笔钱。由于该程序是三个存款者线程随机的向账户中存款,只有一个取钱者线程执行取钱的操作。只有当取钱者线程取钱之后存款者才可以存款。反过来也是一样的。

最后该程序进入了阻塞的状态无法继续向下执行。这是因为三个存款者线程总共有300次尝试存款的操作,但一个取钱者线程只有100次尝试取钱的操作,所以程序最后被阻塞。

阻塞并不是死锁,对于这种情况取钱者线程已经执行结束。而存款者线程只是在等待其他线程来取钱而已,并不是在等待其他线程释放同步监视器。

使用Condition控制线程通信

如果程序不使用synchronized关键字来保证同步,而是直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,也就不能使用wait()、notify()、notifyAll()等方法进行线程通信了。

当使用Lock对象来保证同步的时候,Java提供了一个Condition类来保证协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。

Condition将同步监视器方法分解为截然不同的对象,以便通过将这些对象和Lock对象组合使用,为每个对象提供多个等待集。在这种情况下Lock替代了同步方法和同步代码块,Condition替代了同步监视器的功能。

Condition实例被绑定在一个Lock对象上。要获得特定Lock实例的Condition实例,调用Lock对象的newCondition方法即可。Condition类提供了如下三个方法:

  • await():类似于隐式同步监视器上的wait()方法,导致当前线程等待,直到其它线程调用该Condition的signal()方法来唤醒该线程。该await()方法有更多的变体,如long awaitNanos(long nanosTimeout)、void awaitUninterruptibly()、awaitUntil(Date deadline)等,可以完成更丰富的等待操作。
  • signal():唤醒在此Lock对象上等待的单个线程。如果所有线程都在该Lock对象上等待,则会选择唤醒其中的一个线程。选择是任意性的。只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以执行被唤醒的线程。
  • signalAll():唤醒在此Lock对象上等待的所有线程。只有当前线程放弃对该Lock对象的锁定后,才可以执行被唤醒的线程。
public class Account {
    //显示定义Lock对象
    private final Lock lock = new ReentrantLock();
    //获得指定Lock对象对应的Condition
    private final Condition cond = lock.newCondition();
    private String accountNo;
    private double balance;
    //标识账户中是否有存款的旗标
    private boolean flag = false;
    public Account() {}
    
    public Account(String accountNo, double balance){
        this.accountNo = accountNo;
        this.balance = balance;
    }
    
    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    public double getBalance() {
        return balance;
    }

    public void draw(double drawAmount){
        //加锁
        lock.lock();
        try {
            if(!flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName() + " Succeeded in drawing, poping money. " + drawAmount);
                balance -= drawAmount;
                System.out.println("Balance is " + balance);
                flag = false;
                cond.signalAll();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
    
    public void deposit(double depositAmount){
        lock.lock();
        try {
            if(flag){
                cond.await();
            }else{
                System.out.println(Thread.currentThread().getName() + " Succeeded in depositing, storing money. " + depositAmount);
                balance += depositAmount;
                System.out.println("Balance is " + balance);
                flag = true;
                cond.signalAll();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
}

由于两个线程属于同一进程,它们可以非常方便的共享数据,因此很少需要使用管道流进行通信。

使用阻塞队列(BlockingQueue)控制线程通信

虽然BlockingQueue也是Queue的子接口,但它的主要用途并不是容器,而是作为线程同步的工具。BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满则线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。

程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好的控制线程的通信。

BlockingQueue提供了如下两个支持阻塞的方法:

  • put(E e):尝试将E元素放入BlockingQueue中,如果该队列的元素已满,则阻塞该线程。
  • take():尝试从BlockingQueue的头部取出元素,如果该队列的元素已空,则阻塞该线程。
    BlockingQueue继承了Queue接口,当然也可以使用Queue接口中的方法。这些方法可以分为如下三组:
  • 在队列尾部插入元素。包括add(E e)、offer(E e)和put(E e)方法,当该队列已满,这三个方法会分别抛出异常、返回false、阻塞队列。
  • 在队列头部删除并返回删除的元素。包括remove()、poll()和take()方法。当该队列已空时,这三个方法会分别抛出异常、返回false、阻塞队列。
  • 在队列头部取出但不删除元素。包括element()和peek()方法,当队列已空时,这两个方法分别抛出异常、返回false。

使用BlockingQueue实现线程的通信:

public class Producer extends Thread {
    private BlockingQueue<String>bq;
    public Producer(BlockingQueue<String>bq) {
        this.bq = bq;
    }
    
    @Override
    public void run() {
        String [] strArr = new String[]{
                "Java",
                "Struts",
                "Spring"
        };
        for(int i=0;i<999999999;i++){
            System.out.println(getName() + " Producer is ready to produce set elements. ");
            try {
                Thread.sleep(200);
                bq.put(strArr[i % 3]);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(getName() + " produced " + bq);
        }
    }
}

public class Consumer extends Thread {
    private BlockingQueue<String>bq;
    public Consumer(BlockingQueue<String>bq) {
        this.bq = bq;
    }
    
    @Override
    public void run() {
        while(true){
            System.out.println(getName() + " Consumer is ready to consume set elements. ");
            try {
                Thread.sleep(200);
                bq.take();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(getName() + " consumed. " + bq);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        //创建一个容量为1的BlockingQueue
        BlockingQueue<String>bq = new ArrayBlockingQueue<String>(1);
        //启动三个生产者线程
        new Producer(bq).start();
        new Producer(bq).start();
        new Producer(bq).start();
        //启动一个消费者线程
        new Consumer(bq).start();
    }
}
程序运行结果

上面的程序启动了三个生产者线程,一个消费者线程。生产者线程用于向BlockingQueue中放入元素,消费者线程用于从BlockingQueue中取出元素。该程序的BlockingQueue的容量为1.因此三个生产者线程无法连续放入元素,必须等待消费者线程取出一个元素后,三个生产者线程的其中之一才能放入一个元素。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 5,084评论 0 23
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    小徐andorid阅读 2,796评论 3 53
  • 转自线程通信 线程通信的目标是使线程间能够互相发送信号。另一方面,线程通信使线程能够等待其他线程的信号。 例如,线...
    骑摩托马斯阅读 479评论 0 3
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,661评论 0 12
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,228评论 4 56