【JAVA提升】- 线程、线程池、并发包(4)

[toc]

1 Future与Callable

使用Runnable接口有很大的局限性,他不能够返回一个值或者一个受检查的异常。这种情况下,可以使用Callable<V>接口,其中V就是返回的结果。

Future<V>用来接收callable结束后返回的结果。ExecuteService 的submit方法都是返回一个Future,可以利用Future获取执行的结果,同时可以利用Future取消任务。任务生命周期 创建,提交,执行,结束。如果任务提交,但是没有执行,可以使用Future 取消。

使用线程池(ThreadPoolExecute)的时候,ExecuteService的submit方法实际上使用的是AbstractExecutorService的submit。查看源码可以看到提交任务后返回的是 FutureTask<T>

示例:

package com.fun.concurrent;

import java.util.concurrent.*;

/**
 * callable示例
 *
 * @author fun
 * @date 2017-04-01 10:59
 */
public class FutureCallableDemo {

    public static void main(String[] args) {
        FutureCallableDemo test = new FutureCallableDemo();

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        //do test
//      test.testCallableTask(executorService);
        test.testRunableTask(executorService);
        executorService.shutdown();
    }

    public void testCallableTask(ExecutorService executorService) {
        Callable<String> task = new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("invoke method call, " + System.currentTimeMillis());
                Thread.sleep(3000);
                System.out.println("method call is going to return, " + System.currentTimeMillis());
                return "SUCCESS";
            }
        };
        Future<String> future = executorService.submit(task);

        System.out.println("main is going to get callable future result, " + System.currentTimeMillis());
        try {
            System.out.println("callable result = " + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("main got the future result, " + System.currentTimeMillis());
    }

    public void testRunableTask(ExecutorService executorService) {
        Runnable runTask = new Runnable() {
            @Override
            public void run() {
                System.out.println("invoke method run, " + System.currentTimeMillis());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("method run is going to end, " + System.currentTimeMillis());
            }
        };

        Future<Integer> runFuture = executorService.submit(runTask,new Integer(100));
//      Future<?> runFuture = executorService.submit(runTask); // 这两种方式区别,打开注释观察

        try {
            System.out.println("main is going to get runnable future result, " + System.currentTimeMillis());
            System.out.println("runnable result = " + runFuture.get());
            System.out.println("main got the future result, " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

执行会发现future.get() 方法是一个阻塞的方法,一直等到任务执行完成得到结果。

思考:看上面例子可以看到一个问题,runnable的任务和callable的任务都是可以返回Futrue的,那么他们有什么区别呢 ?

观察不难发现,Callable<V> 返回结果是在call方法执行完成后返回的,他返回什么结果可以是call里面的计算得到的,类型为V即可。他的结果是可变的,程序运行返回的是什么就是什么。

但是Runable的返回结果只是提前定义的一个结果,可预期正确执行后的一个结果。他的结果在任务提交的时候已经决定了具体的值。

2 原子类 atomic

原子类是如何保证原子操作的?

回答这个问题之前,先一起来看一个原子类的源码

// AtomicInteger部分源码
// AtomicInteegr 加法操作
public final int getAndAdd(int delta) {
    for (;;) {
        int current = get();
        int next = current + delta;
        if (compareAndSet(current, next))
            return current;
    }
}

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

这段代码很关键的一个地方就是compareAndSwap (CAS) ,每次操作(写)之前,先比较一下值,确认没有被改过,才写数据。在compareAndSet的注释上面也很清楚的可以看到,只有当cuurent value==expect value的时候,才把value更新成update value。整个执行的原则就是: 先检查后执行

而compareAndSwapXXX是一个native的方法,是虚拟机底层的实现。

测试示例:

package com.fun.concurrent;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 原子类操作
 *
 * @author fun
 * @date 2017-04-01 12:07
 */
public class AtomicClassDemo {

    public static void main(String[] args) {
        AtomicInteger aint = new AtomicInteger(100);
        boolean b = aint.compareAndSet(100,200); // 修改expect与当前值不同测试
        if (b) {
            System.out.println(aint.get());
        }
        System.out.println(aint.getAndAdd(100));
        System.out.println(aint.get());

        System.out.println("-------------------");
        AtomicClassDemo ref1 = new AtomicClassDemo();
        AtomicClassDemo ref2 = new AtomicClassDemo();
        System.out.println("ref1=" + ref1);
        System.out.println("ref2=" + ref2);
        AtomicReference<AtomicClassDemo> ref3 = new AtomicReference<>(ref1);
        System.out.println("ref3 before set=" + ref3);
        boolean b2 = ref3.compareAndSet(ref2,ref1); // 修改expect为ref1测试
        System.out.println(b2);
        System.out.println("ref3 after set =" + ref3);
    }

}

/**
 output:
 200
 200
 300
 -------------------
 ref1=com.fun.concurrent.AtomicClassDemo@74a14482
 ref2=com.fun.concurrent.AtomicClassDemo@1540e19d
 ref3 before set=com.fun.concurrent.AtomicClassDemo@74a14482
 false
 ref3 after set =com.fun.concurrent.AtomicClassDemo@74a14482
 */

3 lock与ReentrantLock

此小节重点学习下ReentrantLock,区别于内置锁,ReentrantLock是一个显示锁。他那有那些特性呢?

3.1 轮询锁和定时锁

使用tryLock() 方法可以在不能获取到锁的情况下,使用定时或者轮询的方式获取所,执行时间内没有完成就释放锁,平滑的退出任务。而内置锁会阻塞在获取锁的地方,一旦操作不当就可能发生死锁,如果出现死锁了,唯一的解决办法就是重启程序。使用定时或者轮询锁可以有效的避免死锁的问题

转账示例:

package com.fun.concurrent;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by fun
 * @date 2017/4/12.
 */
public class ReentrantLockDemo {

    public static void main(String[] args) {

        ReentrantLockDemo test = new ReentrantLockDemo();

        Account fromAcct = test.newAccount(4000);
        Account toAcct = test.newAccount(1000);

        test.transferMoney(fromAcct,toAcct,1000L,20000,TimeUnit.NANOSECONDS);

        System.out.println("fromAccount balance=" + fromAcct.getBalance() + "\ntoAccount balance=" + toAcct.getBalance());

    }

    // 转账示例
    public boolean transferMoney(Account fromAcct,
                                 Account toAcct,
                                 long amount,
                                 long timeout,
                                 TimeUnit unit) {
        long stopTime = System.nanoTime() + unit.toNanos(timeout); // 超时时间
        while (true) {
            if (fromAcct.lock.tryLock()) {
                try {
                    if (toAcct.lock.tryLock()) {
                        try {
                            boolean rs = false;
                            if (fromAcct.debit(amount) ){
                                rs = toAcct.credit(amount);
                            }
                            return rs;
                        } finally {
                            toAcct.lock.unlock();
                        }
                    }
                } finally {
                    fromAcct.lock.unlock();
                }
            }
            if (System.nanoTime() > stopTime) { // 如果已经超时了,就直接返回,提前结束任务
                return false;
            }
            try {
                long x = new Random().nextInt(50) + 10;
                Thread.sleep(timeout/x ); // 过一会儿再尝试下一次获取锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    protected class Account {
        private Lock lock = new ReentrantLock();

        private long balance;

        public boolean debit(long amount) {
            if (balance < amount) {
                return false;
            }
            balance = balance - amount;
            return true;
        }

        public boolean credit(long amount) {
            balance = balance + amount;
            return true;
        }

        /*getter and setter*/
    }

    public Account newAccount(long balance) {
        Account account = new Account();
        if (balance > 0) {
            account.setBalance(balance);
        } else {
            account.setBalance(0);
        }
        return account;
    }
}

在几次获得锁的地方,都是trylock, 在多线程的情况下,如果没有获得锁的时候,线程并不会阻塞,而是之后往后面运行。进入判断是否超时的语句。如果超时就退出,可以避免等待加锁可能出现的死锁问题。

注意:但是使用显示锁人的时候,很容易在编写程序的时候忘记了释放锁,切记,在使用显示锁的时候一定要在try-finally 的finally里面对锁进行释放。

3.2 可中断的锁操作

lockInterruptibly()方法,可中断的获取锁的方式,在获取锁的过程中可能被中断,方法本身是可以抛出InterruptException的

// 此方法源码
 public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
 }

他的使用和不同lock和tryLock一样,只是需要在外面处理lockInterruptibly的InterruptException.

3.3 非块结构加锁

对链表上的每个节点单独建立锁,使不同的线程可以独立的对链表的不同部分进行操作。所得很模糊,需要结合ConcurrentHashMap理解

3.3 公平性

公平性是在竞争资源时候的一种策略,大部分情况都是使用公平原则来获取锁,例如:FIFO 队列。但是,有时候前面的操作比较耗时的时候,会拖慢整个处理速率,这个时候不公平原则可以提前获得锁。例如,线程A获得一个对象的锁,现线程B和C都在等待这个锁,当A释放锁的时候,如果B唤醒的时间比较的久,此时C先获得锁,使用了并释放了,B刚好唤醒,获得锁。这个过程B的时间没有耽误。同时C也处理了,增加了吞吐量。但是,我还要说但是。使用不公平性的时候同样会有问题,不公平的比较极端的情况就是找出一个线程一直拿不到锁,一直等待。所以使用时候需要权衡和控制。

ReentrantLock 可以设置不保证公平性。

4 CountDownLatch & Semaphore

4.1 CountDownLatch

CountDownLatch 有什么作用呢 ?它就是一个同步助手,它能够让一个或者多个线程等到另外的线程完成一系列的操作之后再执行。

内部有一个倒数计数器,当倒数计数器减到0的时候释放锁。

先看示例:

package com.fun.concurrent;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch 使用和测试
 *
 * @author fun
 * @date 2017-04-13 17:50
 */
public class CountDownLatchDemo {

    public static void main(String[] args) {

        CountDownLatchDemo test = new CountDownLatchDemo();

        int N = 10;
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; i++) {
            new Thread(test.newWorker(startSignal,doneSignal)).start();
        }

        try {
            System.out.println("do something else 1");
            startSignal.countDown();
            System.out.println("do something else 2");
            doneSignal.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public Worker newWorker(CountDownLatch startSignal, CountDownLatch doneSignal){
        return new Worker(startSignal,doneSignal);
    }

    class Worker implements Runnable {

        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;

        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        public void run() {
            try {
                startSignal.await();
                doWork();
                doneSignal.countDown();
            } catch (InterruptedException ex) {} // return;
        }
        void doWork() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("do something...");
        }
    }



}

此示例中,所有Runnable共享一个startSignal 和 一个 doneSignal。CountDownLatch的 await() 方法等待计数器变为0在唤醒。而在执行countDown() 方法的时候,每次countDown
就会是计数器减一,知道减少为0才释放。

查看源码中countDown 的过程

// countDown
public void countDown() {
    sync.releaseShared(1);
}

// sync.releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// tryReleaseShared 在countDownLatch中的Sync中有重写父类的方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

很显然,只有当count=0的时候才会释放锁。在回头看上面的示例程序,分析如下:

  1. startSignal.countDown(); Runnable任务线程创建并start了,但是出于wait状态,等待计数器变为0,次代码操作把计数器减一变为0,所有任务开始工作。
  2. System.out.println("do something else 2");主线程忙其他事情
  3. doneSignal.await();所有任务完成之前,主线程从此处开始阻塞(挂起等待)。doneSignal计数器不变为0,主线程一直挂起。完成一个任务,count-1,直到所有任务完成,count=0,主线程醒过来并完成后面的动作。

利用CountDownLacth 能做很多事,例如进项大数据的一个累加,可以分成多个线程处理,然后在主线程中合并(累加)多个任务的结果。增加处理速率(这个有点像MapReduce的思想)。

思考: 其实看了CountDownLatch 之后,发现和volatile+synchronized效果很像。完全可以控制一个volatile的count变量等于任务数,完成一个任务,count-1,主线程wait,等到count=0。 效果差不多。但是代码实现上就较CountDownLatch 复杂点。所有有类似这样的功能,应该优先想到CountDownLatch

4.2 Semaphore

使用信号量的时候,一个线程想要获得一个item,必须要先从Semaphore那里获得许可(permit),保证item是可用的。当线程完成任务的时候,在向pool归还item同时还需要向Semaphore归还许可,以便其他线程可以使用item。需要注意的是,当调用acquire的时候,不需要额外加锁限制,因为这样将会阻止item被归还到pool。 实际上Semaphore已经封装了同步锁来保证item的获取,并且pool对每个item有单独的维护。

当Semaphore被初始化成一个并且只有之多一个许可的时候,他就表现成了一个互斥锁。这个更像一个Binary Semaphore一样,因为他只有两个状态: 有一个可用许可,没有可用许可。当以这种方式使用的时候,semaphore 和其他Lock的实现不同,他能够允许锁被线程本身释放,而不是锁的所有者。

Semaphore 构造方法有个释放使用公平锁的方式。当使用不公平锁的时候,不保证获取许可的先后顺序。公平性在前文有说过,有时候可以提高吞吐量,避免一直等待。但是也同样也会出现一直获取不到锁也进去一直等待。

Semaphore可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池

实现分析:

Semaphore 实现和CountDownLatch有几分相似:

  1. CountDownLatch里面有个count计数器,每次操作countDown 则count = count-1 ,当count==0 的时候才释放所
  2. Semaphore 则是内部维护一个available的数量,每次减去获取permits的数量,得到剩余的数量,释放锁的时候available加上归还的permit的数量。获取的锁的过程就是对available减操作,release则是加回对available的操作。

Semaphore默认实现是不公平性的,就以不公平性的实现来看吗:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

选取源码中关键的两个方法吗,加锁和解锁。

  1. 获取permit: 先使用getState后去当前可用的permit的数量,剩余数量=当前数量-申请的数量,然后再执行CAS设置状态,并返回剩余可用数量。
  2. 释放锁:释放锁的过程时间就是归还permit可用数量的过程。当前可用数量+归还数量<当前数量的是时候,或者已经溢出,归还数量为负数了,如果current + releases >=cuurent,执行CAS 设置状态值。

5 ConcurrentHashMap

使用分段锁(Lock striping)的方式,使锁的粒度更细来实现更大程度的共享,提高并发性和伸缩性。

锁分段(Lock striping):在某些情况下,可以将锁分解技术进一步扩展为对一组独立对象上的锁进行分解。这种情况被称为锁分段。

例如: 在concurrentHashMap 中使用一个包含16个锁的数组,每个锁保护一个散列桶,其中第N个散列桶由第(N mod 16)个锁来写入。假设散列函数具有合理性,并且关键字分布均匀,那么这大约能都把对于锁的请求减少到原来1/16。正是这项技术使得ConcurrentHashMap能够支持多大16个并发的写入器。

锁分段劣势: 与采用单个锁来实现独占访问相比,要获取多个锁来实现独占访问将更加困难并且开销更高。例如:ConcurrentHashMap在扩容的时候,以及重新计算Hash并且重新散列分布时候,都需要获取所有锁,实现整个Map的独占访问。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,798评论 1 19
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,629评论 2 17
  • 下面是我自己收集整理的Java线程相关的面试题,可以用它来好好准备面试。 参考文档:-《Java核心技术 卷一》-...
    阿呆变Geek阅读 14,738评论 14 507
  • 这几天沈阳天气极好。于是就想起了一句形容官运亨通的话,要风得风,要雨得雨。你说太阳底下热吧,那小风一吹,片刻就凉爽...
    戚盛鹏阅读 182评论 0 0
  • 图片发自简书App 风去,羞涩之年华,遗留飘飘衣袂,更兼几缕乱青丝。凝神远望处,有剑在夜色最深处轻舞飞扬。 清风抚...
    匹马观潮阅读 455评论 4 0