4 Java并发-1

第一课 多线程入门

1 基本入门:Thread + Runnable

一个任务:
Runnable:要的是那个run方法
Callable和Future:要的是那个call方法,future里放的是子线程的返回结果,get方法会阻塞等待返回,就是等call方法返回

一个线程:Thread

LiftOff1 launch = new LiftOff1();
new Thread(launch, "thread-1").start();

一个线程承载了一个任务

  • 线程的状态:

    • new:已经创建完毕,且已经start,资源分配完毕,等待分配时间片了,这个状态只会持续很短的时间,下一步就会进入运行或者阻塞
    • run:就绪状态,只要给了时间片,就会运行,在任一时刻,thread可能运行也可能不运行
    • block:阻塞状态,程序本身能够运行,但有个条件阻止了它运行,调度器会忽略这个线程,直到跳出阻塞条件,重新进入就绪状态
    • dead:run()方法返回,或者被中断
  • 线程不安全的分类

    • 活性失败,源自JVM的提升优化,使用volatile解决
    • 安全性失败,源自非原子操作期间被读或写,使用atom或各种锁解决

2 Java的线程管理:

Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command)
Executors:控制着一堆线程池

ExecutorService:一个接口,继承自Executor,具有服务生命周期的Executor
例如关闭,这东西知道如何构建恰当的上下文来执行Runnable对象
是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法

ScheduledExecutorService:一个接口,继承自ExecutorService, 一个可定时调度任务的接口

AbstractExecutorService:ExecutorService执行方法的默认实现

ThreadPoolExecutor:继承自AbstractExecutorService,线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象

ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,父类是ThreadPoolExecutor,一个可定时调度任务的线程池

用法:Executors的每个方法都可以传入第二个参数,一个ThreadFactory对象

ExecutorService exec = Executors.newCachedThreadPool(); //线程数总是会满足所有任务,所有任务都是并行执行,同时抢时间片,而旧线程会被缓存和复用
ExecutorService exec = Executors.newFixedThreadPool(2); //两个线程同时运行,其他的会排队等待
ExecutorService exec = Executors.newSingleThreadExecutor(); //1个线程同时运行,即多个任务会串行执行
ExecutorService exec = Executors.newWorkStealingPool();  //不知道啥意思
ScheduledExecutorService exec = Executors.newScheduledThreadPool(10);//不知道啥意思
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();//不知道啥意思

for(int i = 0; i < 5; i++){
    exec.execute(new LiftOff1()); //提交任务
}

//shutdown会关闭线程池入口,不能再提交新任务,但之前提交的,会正常运行到结束
//如果不关闭,线程池会一直开着,等待提交任务,进程也就不会关闭
exec.shutdown();  

这些线程池基本结构都是这个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //后两个参数为可选参数
                          
corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime: 线程池维护线程所允许的空闲时间,超时则线程死,死到最少数量corePoolSize

unit: keepAliveTime的单位

workQueue: 线程池所使用的缓冲队列

threadFactory:创建新线程的方式

handler: 线程池对拒绝任务的处理策略

3 ThreadFactory

//设置ThreadFactory:只有当需要新线程时,才会来这里调用,就是说ThreadFactory本身不管理线程池,只是给线程池干活的

ExecutorService exec = Executors.newFixedThreadPool(2, new ThreadFactory() {
    
    private int threadCount = 0;
    
    @Override
    public Thread newThread(Runnable r) {
        threadCount++;
        Thread tr = new Thread(r, "thread-from-ThreadFactory-" + threadCount);
        //这里可以给线程设置一些属性
        tr.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){  
            @Override  
            public void uncaughtException(Thread t, Throwable e) {  
                e.printStackTrace();
            }  
        });  
        //tr.setDaemon(true);    //设置这个为true,则主线程退出时,子线程不管是否结束,都退出
        tr.setPriority(Thread.MAX_PRIORITY);  
        
        return tr;
    }
});

4 让出时间片

Thread.yield();
通知并建议线程调度器,我已经做完了主要工作,时间片你可以分给别了
即使调用了这个,还是可能没有切换时间片,或者切换了,但是还是给了当前线程

Thread.sleep(1000);
TimeUnit.SECOND.sleep(1);
让当前线程进入睡眠状态,程序就阻塞在这里了
这个的表现应该是比yield良好多了

但这两个的特性,都不应该过于依赖,作者再三叮嘱了
因为系统对时间片的划分是不可依赖的
你的程序也不会对时间片的划分有什么依赖

5 线程属性:

后台线程:
tr.setDaemon(true); //设置这个为true,则主线程退出时,子线程不管是否结束,都退出
后台线程表示在程序后台提供一种通用服务的线程,且不是程序不可或缺的部分
当所有非后台线程结束了,后台线程也就结束了
isDaemon()判断是否后台线程
从后台线程创建的线程,自动默认是后台线程

优先级:
tr.setPriority(Thread.MAX_PRIORITY);
仅仅是执行频率较低,不会造成死锁(线程得不到执行)
JDK有十个优先级,但和操作系统映射的不是很好
windows有7个优先级,但不固定
Sun的Solaris有2的31次方个优先级
所以调用优先级时,安全的做法是只使用:MAX_PRIORITY, NORM_PRIORITY, MIN_PRIORITY

线程名字:参数2
Thread tr = new Thread(r, "thread-name-" + threadCount);

全局异常捕捉:全局异常是基于线程的,并且异常不能跨线程传递
tr.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
@Override
public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace();
}
});

而Thread.setUncaughtExceptionHandler是给所有线程都设置一个全局异常捕捉

isAlive():线程是否还活着
这个会影响join
run方法执行完毕,是否isAlive?
线程被中断,是否isAlive?

6 Executor深入分析

public interface Executor {
    void execute(Runnable command);
}

意思就是给你一个command,你想让它在哪儿执行run

Excutor能决定的事:

  • 选择哪个线程
  • 执行runnable

Excutor管不了的事:

  • Callable,Future管不了
  • 没有一个线程池,线程池可能需要自己写,跟Executor没关系
  • 没法延时,定时运行

例子:
看下面代码里的三个Executor的实现,取自java源码里的注释,这几行代码基本阐明了Executor的作用

public class C2 {
    
    public static class MyExecutors{
        
        public static Executor newDirectThreadPool(){
            return new DirectExecutor();
        }
        
        public static Executor newPerTaskPerThreadThreadPool(){
            return new ThreadPerTaskExecutor();
        }
        
        public static Executor newSerialThreadPool(){
            return new SerialExecutor(new DirectExecutor());
        }
    }
    
    /**
     * an executor can run the submitted task immediately in the caller's thread
     */
    public static class DirectExecutor implements Executor {
        public void execute(Runnable r) {
            r.run();
        }
    }
    
    /**
     * spawns a new thread for each task
     */
    public static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }
    
    /**
     * serializes the submission of tasks to a second executor
     * 类似安卓的AsyncTask里的串行化实现
     */
    public static class SerialExecutor implements Executor {
        final Queue<Runnable> tasks = new ArrayDeque<>();
        final Executor executor;
        Runnable active;
     
        SerialExecutor(Executor executor) {
          this.executor = executor;
        }
     
        public synchronized void execute(final Runnable r) {
          tasks.add(new Runnable() {
            public void run() {
              try {
                r.run();
              } finally {
                scheduleNext();
              }
            }
          });
          if (active == null) {
            scheduleNext();
          }
        }
     
        protected synchronized void scheduleNext() {
          if ((active = tasks.poll()) != null) {
            executor.execute(active);
          }
        }
      }
    
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("running on thread " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        
        MyExecutors.newDirectThreadPool().execute(task);
        MyExecutors.newPerTaskPerThreadThreadPool().execute(task);
        MyExecutors.newSerialThreadPool().execute(task);
        
    }
    
}

第二课:Callable和Future

Runnable不产生返回值,ExecutorService.execute(Runnable),走的是run方法
Callable产生返回值,ExecutorService.submit(Callable),走的是call方法

用法1:submit Callable and get a Future, block in future.get()

interface ArchiveSearcher { String search(String target); }
class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    
    void showSearch(final String target) throws InterruptedException {
    
      Future future = executor.submit(new Callable() {
          public String call() {
              return searcher.search(target);
          }}
      );
      
      displayOtherThings(); // do other things while searching
      
      try {
        displayText(future.get()); // use future
      } catch (ExecutionException ex) { cleanup(); return; }
    }
}

用法2:execute a FutureTask, and get a 

FutureTask future = new FutureTask(new Callable<String>() {
        public String call() {
            return searcher.search(target);
        }
    }
);
executor.execute(future);

future.get()


关于Callable:能返回就给返回,不能返回就抛异常
public interface Callable<V> {
    V call() throws Exception;
}


关于Future:
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException;
}

1 boolean cancel(boolean mayInterruptIfRunning);

参数的意思:
正在执行的task是否允许打断,如果是true,会打断,如果false,则允许in-progress的任务执行完

何时失败:
已经运行完的task
已经被cancel过的task
无法被中断的任务

怎么成功:
还没start的任务,比如在等待的,可以cancel
正在running的任务,参数mayInterruptIfRunning指定了是不是可以尝试interrupt

副作用:
只要cancel被调用了且返回true,isDone和isCancelled一直返回true

2 get:取回结果,如有必要,可以阻塞

可以阻塞就可以被interrupt呗
get()没有超时时间
get(1, TimeUnit.Second):表示最多阻塞1秒,过了一秒就抛出超时异常

---------------RunnableFuture,FutureTask

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}


public class FutureTask<V> implements RunnableFuture<V>{

}

FutureTask的初始化:
FutureTask(Callable<V> callable)
FutureTask(Runnable runnable, V result)
注意:Runnable + 返回值就是一个Callable了啊,具体看RunnableAdapter

总之,FutureTask内部就有了一个Callable

-----------------
关于:FutureTask(Runnable runnable, V result)
调用了:Executors.callable()
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

下一步就是:

exec.execute(runnable)
exec.execute(FutureTask)   会调用run方法
Future<Result> future = exec.submit(Callable)
以及,future的get阻塞是怎么实现的

先分析FutureTask,因为ExecuteService还没说到:

这里面FutureTask.run()被调用,大体实现是:

void run(){
    Callable<V> c = callable;
    result = c.call();
    outcome = result;
    U.putOrderedInt(this, STATE, NORMAL); // final state
    finishCompletion();
}

此时对于futureTask.get()
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);  ///在这无限循环等call运行结束,返回结果,如果状态是取消,中断等,抛出异常,还看超时时间,没事干会yield
    return report(s);  ///返回call的返回结果
}

下面再研究ExecutorService和AbstractExecutorService的submit方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

这不就顺起来了嘛,最后都归结到了execute和FutureTask


最后,FutureTask实现的是RunnableFuture,其实你完全可以不用FutureTask,
例如你想实现个MyFutureTask,这个不会在get方法里阻塞,而是基于异步IO,

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 
    static class CustomTask implements RunnableFuture {...}
 
    protected  RunnableFuture newTaskFor(Callable c) {
        return new CustomTask(c);
    }
    protected  RunnableFuture newTaskFor(Runnable r, V v) {
        return new CustomTask(r, v);
    }
    // ... add constructors, etc.
  }

第三课 ExcecutorService和AbstractExecutorService深入分析

submit相关功能,get阻塞等,都由FutureTask来负责了,那ExecutorService还比Executor多了什么呢

生命周期是在这里管理的??

public static class DirectExecutorService implements ExecutorService{

    public void execute(Runnable command);
    public <T> Future<T> submit(Callable<T> task);
    public Future<?> submit(Runnable task);
    public <T> Future<T> submit(Runnable task, T result);
    
    public boolean isShutdown() ;
    public boolean isTerminated();
    public void shutdown();
    public List<Runnable> shutdownNow();
    
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks);
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
    
}

0 注意,AbstractExecutorService里面并没有对execute方法的实现,而是留给了子类,所以说线程池相关的东西这里是不负责的

1 shutdown系列

shutdown():拒绝再接收新的task,但已有的task会执行到terminate
shutdownNow():禁止再接收新的task,已有task,在waiting的不会再start,已经执行的会尝试stop掉

未shutdown状态:线程池还在运行,不管有没有running,waiting的task,都会一直等待add新task(通过execute或者submit)
shutdown状态:执行完现有task,就会terminate

2 awaitTermination

3 invokeAll和invokeAny

看方法doInvkeAny,大体套路是:

T doInvkeAny(Collection<Callable<T>> tasks, boolean timed, long nanos){

    int ntasks = tasks.size();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
    Iterator<? extends Callable<T>> it = tasks.iterator();
    
    后面还有,没整明白是怎么添加的任务
}

涉及到了ExecutorCompletionService,CompletionService,LinkedBlockingQueue等等

这部分是干什么的都没整明白


private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    // For efficiency, especially in executors with limited
    // parallelism, check to see if previously submitted tasks are
    // done before submitting more of them. This interleaving
    // plus the exception mechanics account for messiness of main
    // loop.

    try {
        // Record exceptions so that if we fail to obtain any
        // result, we can throw the last exception we got.
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // Start one task for sure; the rest incrementally
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

4 关于ExecutorCompletionService,CompletionService

实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果

///例4:(启动10条线程,谁先执行完成就返回谁)
public class CompletionServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);        //创建含10条线程的线程池
        CompletionService completionService = new ExecutorCompletionService(executor);
        for (int i =1; i <=10; i ++) {
            final  int result = i;
            completionService.submit(new Callable() {
                public Object call() throws Exception {
                    Thread.sleep(new Random().nextInt(5000));   //让当前线程随机休眠一段时间
                    return result;
                }
            });
        }
        System.out.println(completionService.take().get());   //获取执行结果
    }
}
//输出结果可能每次都不同(在1到10之间)

第四课:ThreadPoolExecutor的实现,AbstractExecutorService的子类,也是ExecutorService的实现类

先看怎么构造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,   //使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程
                          RejectedExecutionHandler handler) 

corePoolSize:核心线程数,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的

maximumPoolSize:最大线程数,可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:
corePoolSize <运行的线程数< maximumPoolSize:仅当队列满时才创建新线程
corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池

keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止

unit:keepAliveTime参数的时间单位

workQueue:保存任务的阻塞队列,与线程池的大小有关:
当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
当队列满时,在有新任务时就创建新线程

threadFactory:使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程

handle:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

再看Executors里一堆new方法怎么用的:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  //使用同步队列,将任务直接提交给线程
                                  new SynchronousQueue<Runnable>());
}


//线程池:指定线程个数
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
                                  new LinkedBlockingQueue<Runnable>());
}


//单线程:基于一个固定个数的线程池,不管在哪里,实现串行执行,都是基于一个其他的线程池和一个队列
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
                        //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
                        (new ThreadPoolExecutor(1, 1,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>()));
}

分析:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    
    //当前正在工作的线程数 < 允许的线程数,则创建新线程,运行task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  ///有个Worker内部类,内部会调用ThreadFactory.newThread()
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {   
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);             //调用RejectedExecutionHandler的handler.rejectedExecution(command, this);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);   //return false;
    }
    else if (!addWorker(command, false))
        reject(command);  //handler.rejectedExecution(command, this);
}

第五课:定时,延时--Schduled

//使用newScheduledThreadPool来模拟心跳机制
public class HeartBeat {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);  //5是corePoolSize
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("HeartBeat.........................");
            }
        };
        executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒后第一次执行,之后每隔3秒执行一次
    }
}

第六课 join


public class C7 {
    
    public static class Sleeper implements Runnable{

        @Override
        public void run() {
            System.out.println("Sleeper:我先睡个5秒...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //e.printStackTrace();
                System.out.println("Sleeper---谁他妈吵醒我!");
            }
            System.out.println("Sleeper:我醒了!");
        }
    }
    
    public static class Guest implements Runnable{
        
        Thread sleeperThread;
        
        public Guest(Thread sleeperThread){
            this.sleeperThread = sleeperThread;
        }
        
        @Override
        public void run() {
            System.out.println("Guest:我是客人,我在这坐着等Sleeper醒来再说话");
            try {
                sleeperThread.join();
            } catch (InterruptedException e) {
                //e.printStackTrace();
                System.out.println("Guest:我得等他睡醒了啊!");
            }
            System.out.println("Guest:这哥终于睡醒了!");
        }
        
    }
    
    public static void main(String[] args) {
        Thread sleeper = new Thread(new Sleeper());
        Thread guest = new Thread(new Guest(sleeper));
        sleeper.start();
        guest.start();
        
        ///注掉下面这段,则sleeper会睡5秒,不注掉,3秒后就会打断sleeper的睡眠:被interrupt的是sleep方法
//      ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
//      exec.schedule(new Runnable() {
//          public void run() {
//              sleeper.interrupt();
//          }
//      }, 3, java.util.concurrent.TimeUnit.SECONDS);
//      
//      ///注掉下面这段,则guest会等5秒,不注掉,2秒后就会打断等待:被interrupt的是join方法
//      exec.schedule(new Runnable() {
//          public void run() {
//              guest.interrupt();
//          }
//      }, 2, java.util.concurrent.TimeUnit.SECONDS);
    }
    
}

想join到线程t,就得调用t.join(),这个方法类似sleep,会阻塞在这里,也可以interrupt

join():挂起当前线程,等待目标线程, t.join(),这里t是目标线程
join(millis,nano):超时参数,如果过了超时时间还是没等到,join就强制返回

sleeper和guest,guest需要在sleeper的线程对象上join,即sleeper.join()
直到sleeper的run方法返回,线程执行完毕,才会激活join,guest才退出阻塞,继续往下执行
sleeper结束时,sleeper.isAlive()为false

一个线程可以join到其他多个线程上,等到都结束了才继续执行

在当前线程c调用t.join()表示:

  • c等待t执行完毕,期间c和t都可以被中断
  • t必须是从c产生的线程

有类似需求,可以考虑CyclicBarrier,栅栏,可能比join更合适

第七课 共享受限资源

什么时候会出现共享受限资源的冲突?
有一份数据摆在这里,多个worker线程都对其进行修改,状态就可能会乱了

总之,每次访问一个资源时,从进去到出来,都要保证数据的一致性

基本上所有保护共享受限资源的方法,都是序列化对受限资源的访问(同步化),也就是程序到这里就变成串行了,加个锁保证同时只有一个线程访问,这种机制就叫互斥量

如果你改变一个对象的状态是一个复杂的过程:

  • 这期间你最好保证不要出现任何被打断的可能

1 原子类

原子操作被用来写无锁的代码,避免同步

原子操作不是同步化的,而是避免了同步化:
——普通的运算操作,如果要依赖原子性,要谨慎使用,至少编程思想里不推荐的,除非非常懂JVM,能编写JVM,编程思想就是这个意思
——但是可以使用Atom系列类来保证安全

有两部分内容:
——普通的运算操作的原子性,如加减乘除,这个很难搞懂,你知道a+b是不是原子操作?
——Atom系列类,提供了一套原子操作,基本还是有保障的

1.1 原子操作

普通运算的原子性:暂时不做研究了
a++不是原子操作
+-*/也不是原子操作
x = x + 1 =也不是原子操作

想了解更多,再看一遍编程思想

1.2 原子类

原子类是可以信赖的,可以用来做性能调优,避免写同步代码,避免序列化访问资源

public class EvenGenerator extends IntGenerator {
  private int currentEvenValue = 0;
  public int next() {
    ++currentEvenValue; // Danger point here!
    ++currentEvenValue;
    return currentEvenValue;
  }
  public static void main(String[] args) {
    EvenChecker.test(new EvenGenerator());
  }
}

原子操作就是一步能完成的操作:

AtomicInteger currentEvenValue = new AtomicInteger(0);
return currentEvenValue.addAndGet(2);  //这里给value增加了2,并返回其值
  • 注意:

    • 说是原子操作被用来构建Concurrent包,不建议你用
    • 用了原子操作,就省了很多加锁操作
  • 都有什么

    • AtomicInteger
    • AtomicLong
    • AtomicReference

2 Synchronized临界区

例子:

public class SynchronizedEventGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    public int next() {
        synchronized (this) {
            ++currentEvenValue; 
            ++currentEvenValue;
            return currentEvenValue;
        }
    }

    public static void main(String[] args) {
        EvenChecker.test(new SynchronizedEventGenerator());
    }
}
public synchronized int next() {
    ++currentEvenValue; // Danger point here!
    ++currentEvenValue;
    return currentEvenValue;
}

相当于:
public int next() {
    synchronized (this) {
        ++currentEvenValue; 
        ++currentEvenValue;
        return currentEvenValue;
    }
}

  • synchronized的锁始终是加在一个对象上

    • 直接修饰一个方法时,就是this
    • 如果多个对象访问同一资源,锁就得加到一个外部的静态对象上
    • 作用于静态方法/属性时,锁住的是存在于永久的Class对象
  • synchronized的原理:

    • 每个object对象都有一个内置的锁
    • 所有对象都自动含有单一的锁,JVM负责跟踪对象被加锁的次数
    • 在任务(线程)第一次给对象加锁的时候, 计数变为1
    • 每当这个相同的任务(线程)在此对象上获得锁时,计数会递增
    • 只有首先获得锁的任务(线程)才能继续获取该对象上的多个锁
    • 每当任务离开时,计数递减,当计数为0的时候,锁被完全释放
    • 在HotSpot中JVM实现中,锁有个专门的名字:对象监视器
    • 更深入的讲:
    • 当多个线程同时请求某个对象监视器时,对象监视器会设置几种状态用来区分请求的线程
    • Contention List:所有请求锁的线程将被首先放置到该竞争队列,是个虚拟队列,不是实际的Queue的数据结构
    • Entry List:EntryList与ContentionList逻辑上同属等待队列,ContentionList会被线程并发访问,为了降低对 ContentionList队尾的争用,而建立EntryList
    • Contention List中那些有资格成为候选人的线程被移到Entry List
    • Wait Set:那些调用wait方法被阻塞的线程被放置到Wait Set
    • OnDeck:任何时刻最多只能有一个线程正在竞争锁,该线程称为OnDeck

注意:
wait,notify和synchronized的用法

3 锁

import java.util.concurrent.locks.*;

public class MutexEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    private Lock lock = new ReentrantLock();

    public int next() {
        lock.lock();
        try {
            ++currentEvenValue;
            Thread.yield(); // Cause failure faster
            ++currentEvenValue;
            return currentEvenValue;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
    }
} 

比synchronized多了什么特性:
——可以尝试获取锁,不必非得阻塞在这
——提供了比synchronized更细粒度的控制
——在实现链表遍历节点时,有个节点传递的加锁机制(锁耦合),在释放这个节点的锁之前,必须捕获下个节点的锁
——synchronized引起的阻塞无法被interrupt方法中断,但ReentrantLock提供了可以被中断的机制
——ReentrantLock.lockInterruptly():如果得不到锁(被其他地方占用),就会阻塞,但是这个阻塞可以被interrupt()

例子:

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class AttemptLocking {
    private ReentrantLock lock = new ReentrantLock();

    public void untimed() {
        boolean captured = lock.tryLock();
        try {
            System.out.println("tryLock(): " + captured);
        } finally {
            if (captured)
                lock.unlock();
        }
    }

    public void timed() {
        boolean captured = false;
        try {
            captured = lock.tryLock(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured);
        } finally {
            if (captured)
                lock.unlock();
        }
    }

    public static void main(String[] args) {
        final AttemptLocking al = new AttemptLocking();
        al.untimed(); // True -- lock is available
        al.timed(); // True -- lock is available
        // Now create a separate task to grab the lock:
        new Thread() {
            {
                setDaemon(true);
            }

            public void run() {
                al.lock.lock();
                System.out.println("acquired");
            }
        }.start();
        Thread.yield(); // Give the 2nd task a chance
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        al.untimed(); // False -- lock grabbed by task
        al.timed(); // False -- lock grabbed by task
    }
} /*
 * Output: tryLock(): true tryLock(2, TimeUnit.SECONDS): true acquired
 * tryLock(): false tryLock(2, TimeUnit.SECONDS): false
 */// :~
boolean captured = lock.tryLock();//不会阻塞,不管有没有得到锁,都往下执行
captured = lock.tryLock(2, TimeUnit.SECONDS); //会阻塞2秒,然后不管有没有得到锁,都往下执行

4 信号(Semaphore)

略过

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容