并发的多面性
使用并发解决问题大体上可以分为“速度”和“设计可管理性”两种。
更快的执行
并发通常是提高单处理器上程序的性能。并发首要解决的问题是阻塞。在单处理器系统中的性能提高的常见例子是事件驱动的编程。函数型语言将并发任务彼此隔离,每个函数调用都不会产生副作用,也不能干涉其他函数,函数作为独立的任务驱动。而java采取了更传统的方式,在顺序型语言的基础上提供了对线程的支持,优点是操作系统的透明性。
改进代码设计
java的线程机制是抢占式。调度机制会周期性地中断线程,将上下文切换到别的线程,从而为每个线程都提供时间片。
基本的线程机制
定义任务
使用Runnable接口描述任务。任务的run()方法通常会有某种形式的循环,使得任务一直运行下去直到不再需要,因此要设定跳出循环的条件或直接从run()中返回。
在run()中可以使用Thread.yield()对线程调度器提出建议(非确定性 ),表示可以切换到别的线程了。
Thread类
将Runnable对象转变为工作任务的传统方式是将它提交给Thread构造器。
mian()线程中的其他操作优先于run()中的操作。
使用Executor
java SE5的java.util.concurrent包中的执行器(Executor)为你管理Thread对象,简化了并发编程。Executor作为中介对象,桥接了客户端和任务执行,使你无需管理线程的生命周期。
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());//提交任务
exec.shutdown();//防止新任务提交到exec
}
Executor类型:
- newCachedThreadPool:优先。创建所需数量的线程,在回收旧线程时停止创建新线程
- newFixedThreadPool:一次性预先执行高昂的线程分配,固定线程数量
- newSingelThreadExecutor:数量为1的FixedThreadPool。若提交了多个任务,则这些任务将排队。任何时候都只有唯一的任务在运行。
- newScheduledThreadPool:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
任务中产生返回值
Runnable不返回任何值。Callable接口返回值,这是一个具有类型参数的泛型,必须使用ExecutorService.submit()方法调用它。
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results =
new ArrayList<Future<String>>(); //使用Future对象获取返回值
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));
for(Future<String> fs : results)
try {
// get() 阻塞知道返回值
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
可以在get()获取结果之前,使用具有超时的get()或isDone()查看任务是否完成。
休眠
sleep()使任务中止执行给定的时间。对sleep()的调用interrupt()可能抛出InterruptedException异常,异常不能跨线程传播回main(),因此必须在本地处理所有在任务内部产生的异常。
优先级
虽然调度器的处理是不确定的,但是依然倾向于优先权较高的线程先执行,优先权较低的线程执行频率较低。thread.getPriority()读取优先级,setPriority(Thread.MAX_PRIORITY | Thread.NORM_PRIORITY | Thread.MIN_PRIORITY)修改优先级。若使用executor,优先级一般在run()方法中设定,调用Thread.currentThread()来获取驱动该任务的Thread对象。
后台(daemon)线程
程序运行的时候在后台提供某种通用服务的线程,当所有非后台线程结束时,程序就终止了,并会杀死所有的后台线程。
必须在线程启动前调用thread.setDaemon()方法,才能将它设置为后台线程。而使用executor时可以继承ThreadFactory定义thread,传入executor构造器中。
//Thread工厂
public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
//任务
public class DaemonFromFactory implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(
new DaemonThreadFactory());
for(int i = 0; i < 10; i++)
exec.execute(new DaemonFromFactory());
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); // Run for a while
}
}
可以通过thread.isDaemon()判断是否是后台线程,后台线程创建的任何线程都被自动设置为后台线程。而且后台进程在不执行finanlly子句的情况下就会终止其run()。
编码的变体
可以直接从Thread继承重写run()方法或在Runnable实现内中创建Thread对象,并可以在构造器中调用start()启动线程。需要注意的是,在构造器中启动线程可能会导致别的任务在构造器结束之前开始执行,有可能会访问处于不稳定的对象,因此需要避免在构造器中启动线程。还可以使用内部类将线程代码隐藏。
加入一个线程
一个线程可以调用t.join(),此线程将被挂起,直到t线程结束(t.isAlive()返回假)才恢复。
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch(InterruptedException e) {
print(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
print(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch(InterruptedException e) {
print("Interrupted");
}
print(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
} /* Output:
Grumpy was interrupted. isInterrupted(): false
Doc join completed
Sleepy has awakened
Dopey join completed
*///:~
捕获异常
由于线程的本质特性,不能捕获从线程中逃逸的异常,一旦异常逃出任务的run()方法,就会对外传播到控制台。一般使用Executor捕获异常。Thread.UncaughtExecptionHandler允许在每个Thread对象上附着一个异常处理器,Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用。
方法一:
class ExceptionThread2 implements Runnable {
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
//设置异常处理器
t.setUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(
new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
}
方法二:对Thread类中设置一个静态域,在线程没有专有的异常捕获器时调用。
Thread.setDefaultUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
共享受限资源
不正确的访问资源
解决共享资源竞争
防止冲突的方法就是当资源被一个任务使用时,在其上加锁。其他任务在解锁前就无法访问它了。基本上所有并发模式在解决线程冲突问题时都是采用序列化访问共享资源的方案(任何时刻值允许一个任务访问资源),具体是通过加入互斥量(锁)实现的。
synchronized关键字保护变量,当执行到该代码片段时,检查锁是否可用,然后获取锁,执行代码,释放锁。要控制对共享资源的访问,先包装进一个对象,然后将所有要访问这个资源的方法标记为synchronized,如果某个任务处于对该方法的访问中,其他所有调用该类中的synchronized方法都要被阻塞。
synchronized void f() {}
此时,锁是加在对象上的。注意,在使用并发时,所有域为private非常必要。因为synchronized不能防止直接访问域。
一个任务可以多次获得对象的锁,JVM会跟踪对象被加锁的次数。
针对每一个类也有锁(作为类的Class对象的一部分),所以synchronized修饰 static方法可以在类的范围防止对static数据的并发访问,其实就是在类对象上加锁。
Synchronized同步静态方法和非静态方法总结
显式的Lock对象
与内建的锁对比,代码缺乏优雅性,但更灵活,还可以使用tryLock()尝试获得锁。
Lock lock = new ReetrantLock();
lock.lock();
boolean captured = lock.tryLock();
lock.unlock();
原子性和易变性
原子性操作是不能被线程调度机制中断的操作。
除long或double之外的基本类型的简单操作是原子操作。JVM将64位的读写当做两个32位操作执行,所以有字撕裂的可能性。当定义long或double变量是,可以使用volatile(易变性)关键字,会获得读写的原子性。但一般不要依靠原子性操作。
volatile关键字保存了域的可视性,只要对域产生写操作,所有读操作都可以看到这个修改。即时使用本地缓存也是,valatile域会立即被写入到主存中,而读取操作就发送在主存中。如果使用了synchronized来保护域,就不必设置为volatile,所以synchronized是第一选择。
若域依赖之前的值或收到其他域的限制,则volatile无法工作。
原子性操作的判断由语言而不同,因此不要凭直觉依赖。
原子类
java SE5引入了AtomicInteger、AtomicLong、AtomicReference等原子性变量类。提供原子性条件更新操作:
boolean compareAndSet(expedtedValue, updateValue)
性能调优时可以用到
临界区
只希望防止多个线程同时访问方法的部分代码时,可以使用临界区(同步控制块):
synchronized(synObject){//
// this code can be accessed by only one task at a time
}
利用同步控制块,可以使用线程安全的类保护线程不安全的类。
在其他对象上同步
synchronized块必须给定一个在其上进行同步的对象,一般使用方法在被调用的当前对象:synchronized(this)。这种方式,获取了synchronized块上的🔐,那么该对象其他的synchronized方法和临界区就不能被调用了。若在另一个对象上同步,就必须确保所有相关的任务都是在同一个对象上同步的。不同对象上的不同是独立的。
线程本地存储
防止共享资源冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储,这可以使用java.lang.ThreadLocal类来实现。
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) { id = idn; }
public void run() {
while(!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " +
ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value =
new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};//一般使用静态域
public static void increment() {
value.set(value.get() + 1);
}
public static int get() { return value.get(); }
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Accessor(i));
TimeUnit.SECONDS.sleep(3); // Run for a while
exec.shutdownNow(); // All Accessors will quit
}
}
终结任务
在阻塞中终结
一个线程可以处于一下四种状态之一:
1)新建(new):当线程被创建时,会短暂地处于这种状态。已经分配了必需的系统资源,并执行了初始化。已经有资格获得CPU时间,之后调度器吧这个线程转变为可运行或阻塞状态。
2)就绪(Runnable):只要调度器吧时间片分给线程,就可以运行了。在任意时刻,线程可以运行也可以不运行。
3)阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器会忽略线程,直到线程重新进入就绪状态。
4)死亡(Dead):再也不可调度,任务已借宿。一般死亡是从run()方法返回,但是还可以被中断。
中断
Thread类中有interrupt()方法,你可以协作终止被阻塞的任务。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。Executor调用shutdownNow()时,就会发送一个iterrupt()给它启动的所有线程。事实上,你无法保证一定能使用interrupt()终止一个线程。
//中断由Executor启动的单个线程的方式
Future<?> f = executor.submit(new Runnable());//不能调用get()
f.cancel(true); //传入true可以拥有在该线程上调用interrupt()的权限。
sleep()阻塞是可中断的,IO和synchronized是不可中断的阻塞,因此后两者不需要InterruptedException处理器。
因此IO和synchronized有可能锁住你的多线程程序,解决方案是关闭任务在其上发送阻塞的底层资源。但被阻塞的nio通道会自动响应中断。
ReetrantLock上阻塞的任务具有被中断的能力。
检查中断
调用Thread.interrupted()检查线程是否被中断,但状态在该方法调用后就会被清除,比如,你连续调用两次这个方法,第二次调用会返回false,除非线程被再次interrupt()。Thread.isInterrupted()不会清除状态。
中断依旧要执行finally(),所以在清理对象中都要加上try-finally。
线程之间的协作
Object的wait()与notifyAll()
当任务遇到对wait()的调用时,当前的线程执行被挂起,对象上的锁被释放,其他任务就可以获得这个锁(sleep和yield的锁没有释放)。通过notify()、notifyAll()或者时间到期,线程从wait()中恢复执行。
注意,只能在同步控制方法或块中调用这些方法(必须拥有对象的锁),否则运行时会获得IllegalMonitorStateException异常(This method should only be called by a thread that is the owner of this object's monitor)。
class Car {
private boolean waxOn = false;
//同步块中运行
public synchronized void waxed() { //打蜡
waxOn = true; // Ready to buff
notifyAll();
}
public synchronized void buffed() { //抛光
waxOn = false; // Ready for another coat of wax
notifyAll();
}
public synchronized void waitForWaxing()
throws InterruptedException { //等打蜡
while(waxOn == false) //使用判断while非常重要,因为有可能是无关任务的notify
wait();
}
public synchronized void waitForBuffing()
throws InterruptedException { //等抛光
while(waxOn == true)
wait();
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) { //判断当前Thread是否被中断
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
exec.shutdownNow(); // Interrupt all tasks
}
}
Object的notify()与notifyAll()
notify()并不是notifyAll()的优化,使用notify()时在多个等待任务中只有一个被唤醒。为了使用notify(),所有任务必须等待相同的条件,所以一般使用notifyAll()。notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒。
生产者和消费者
class Meal {
private final int orderNum;
public Meal(int orderNum) { this.orderNum = orderNum; }
public String toString() { return "Meal " + orderNum; }
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal == null)
wait(); // ... for the chef to produce a meal
}
print("Waitperson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
//注意notify的调用对象
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch(InterruptedException e) {
print("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) { restaurant = r; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal != null)
wait(); // ... for the meal to be taken
}
if(++count == 10) {
print("Out of food, closing");
restaurant.exec.shutdownNow();
}
printnb("Order up! ");
synchronized(restaurant.waitPerson) {
restaurant.meal = new Meal(count);
//注意notify的调用对象
restaurant.waitPerson.notifyAll();
}
//注意,若没有sleep阻塞,任务将回到run()循环的顶部,并由于Thread.interrupted()测试而退出,并不抛出异常
TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
print("Chef interrupted");
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
} /* Output:
Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
WaitPerson interrupted
Order up! Chef interrupted
*///:~
使用显式的Lock和Condition对象
使用互斥并允许任务挂起的基本类是Condition,可以使用condition.await()来挂起任务,再用signal()(signalAll())唤醒这个任务。condition由lock.newCondition()获得。
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while(waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException{
lock.lock();
try {
while(waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
} /* Output: (90% match)
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax Off task
Exiting via interrupt
Ending Wax On task
*///:~
生产者-消费者与队列
使用同步队列解决任务协作问题。同步队列在任何时刻只允许一个任务插入或移除元素。java.util.concurrent.BlockingQueue接口提供了队列,实现有LinkedBlockingQueue(无界队列)、ArrayBlockingQueue(有限尺寸)。消费者从队列中获取对象,生产者从队列中插入对象。
class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {
rockets = queue;
}
public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch(InterruptedException e) {
print("Interrupted during put()");
}
}
public void run() {
try {
while(!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run(); // Use this thread
// 显式调用run()而使用自己的线程来运行,而不是为每个任务启动一个线程
}
} catch(InterruptedException e) {
print("Waking from take()");
}
print("Exiting LiftOffRunner");
}
}
public class TestBlockingQueues {
static void getkey() {
try {
// Compensate for Windows/Linux difference in the
// length of the result produced by the Enter key:
new BufferedReader(
new InputStreamReader(System.in)).readLine();
} catch(java.io.IOException e) {
throw new RuntimeException(e);
}
}
static void getkey(String message) {
print(message);
getkey();
}
static void test(String msg, BlockingQueue<LiftOff> queue) {
print(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for(int i = 0; i < 5; i++)
runner.add(new LiftOff(5));
getkey("Press 'Enter' (" + msg + ")");
t.interrupt();
print("Finished " + msg + " test");
}
public static void main(String[] args) {
test("LinkedBlockingQueue", // Unlimited size
new LinkedBlockingQueue<LiftOff>());
test("ArrayBlockingQueue", // Fixed size
new ArrayBlockingQueue<LiftOff>(3));
test("SynchronousQueue", // Size of 1
new SynchronousQueue<LiftOff>());
}
}
吐司BlockingQueue
class Toast {
public enum Status { DRY, BUTTERED, JAMMED }
private Status status = Status.DRY;
private final int id;
public Toast(int idn) { id = idn; }
public void butter() { status = Status.BUTTERED; }
public void jam() { status = Status.JAMMED; }
public Status getStatus() { return status; }
public int getId() { return id; }
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) { toastQueue = tq; }
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(
100 + rand.nextInt(500));
// Make toast
Toast t = new Toast(count++);
print(t);
// Insert into queue
toastQueue.put(t);
}
} catch(InterruptedException e) {
print("Toaster interrupted");
}
print("Toaster off");
}
}
// Apply butter to toast:
class Butterer implements Runnable {
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = dryQueue.take();
t.butter();
print(t);
butteredQueue.put(t);
}
} catch(InterruptedException e) {
print("Butterer interrupted");
}
print("Butterer off");
}
}
// Apply jam to buttered toast:
class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = butteredQueue.take();
t.jam();
print(t);
finishedQueue.put(t);
}
} catch(InterruptedException e) {
print("Jammer interrupted");
}
print("Jammer off");
}
}
// Consume the toast:
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = finishedQueue.take();
// Verify that the toast is coming in order,
// and that all pieces are getting jammed:
if(t.getId() != counter++ ||
t.getStatus() != Toast.Status.JAMMED) {
print(">>>> Error: " + t);
System.exit(1);
} else
print("Chomp! " + t);
}
} catch(InterruptedException e) {
print("Eater interrupted");
}
print("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
} /* (Execute to see output) *///:~
任务间使用管道进行输入/输出
PipedWriter和PipedReader在引入同步队列前使用此方法实现阻塞队列。
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
try {
while(true)
for(char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
} catch(IOException e) {
print(e + " Sender write exception");
} catch(InterruptedException e) {
print(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
// Blocks until characters are there:
printnb("Read: " + (char)in.read() + ", ");
}
} catch(IOException e) {
print(e + " Receiver read exception");
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();//PipedReader是可中断的
}
} /* Output: (65% match)
Read: A, Read: B, Read: C, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K, Read: L, Read: M, java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
java.io.InterruptedIOException Receiver read exception
*///:~
死锁
死锁的条件:
1)互斥条件。任务使用的资源中至少有一个是不能共享的。
2)至少有一个任务它必须持有一个资源,而且正在等待获取一个当前被别的任务持有的资源。
3)资源不能被任务强制获取
4)必须有循环等待。一个任务等待其他任务所持有的资源,后者又在等待另一个任务持有的资源。。。。直到最后大家都被锁住。
常用的防止死锁的办法是破坏第4个条件,即循环等待条件。
类库的线程安全
静态的Random对象,一般会有多个任务同时调用Random.nextInt(),是否安全呢。一般JDK文档并没有指明哪些类库是安全的,而Random.nextInt()碰巧是安全的。在并发中使用类库是需小心确认。
新类库中的构件
java.util.concurrent引入了大量涉及用来解决并发问题的新类。
CountDownLatch
被用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作完成。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
补充资料
CyclicBarrier
CyclicBarrier使用于当你希望一组任务并行地执行工作,然后在进行下一个步骤前等待(在栅栏处等待),直到所有任务完成(有点像join())。和CountDownLatch类型,只是后者只触发一次的事件,而前者可以多次重用。
-
“JUC锁”10之 CyclicBarrier原理和示例
](https://www.cnblogs.com/skywang12345/p/3533995.html) - Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
DelayQueue
这是一个无界的BlockingQueue实现了Delayed接口的类,其中的对象只能在其到期时才能从队列中取走。此队列是有序的,队头对象的延迟到期的事件最长。
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() +
NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(
trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
public void run() { printnb(this + " "); }
public String toString() {
return String.format("[%1$-4d]", delta) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for(DelayedTask pt : sequence) {
printnb(pt.summary() + " ");
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch(InterruptedException e) {
// Acceptable way to exit
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue =
new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for(int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
} /* Output:
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*///:~
PriorityBlockingQueue
这是一个很基础的优先级队列,具有可阻塞的读取操作。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor的schedule()(运行一次任务)或scheduleAtFixedRate()(周期运行)可以将runnable对象设置为将来的某个时刻执行。
Semaphore
计数信号量允许n个任务同时访问一个带有锁的资源。可以将信号量看作是在向外分发使用资源的许可证。
Exchanger
Exchanger是两个任务之间交换对象的栅栏。
性能调优
一般synchronized方法比使用ReentrantLock快。而次数上升后,synchronized较慢,Lock和Atomic则较稳定。所以Lock比synchronized高效。但synchronized较优雅和容易阅读。
免锁容器
java添加了新的容器,通过更灵活(比synchronized)的技术来消除加锁,从而提高线程安全的性能,这就是免锁容器。
免锁容器背后的策略是:对容器的修改可以与读取操作同时发生,只要读取者只能看到完成修改的结果即可。修改是在容器数据结构的某个部分的一个单独副本上执行的,这个副本在修改过程中是不可视的。只有当修改完成时,被修改的结构才会自动地与主数据结构进行交换,之后读取者才能看到这个最新值。
免锁容器补充
乐观加锁
活动对象
多线程模型来自过程型编程,而对于面向对象编程,可以使用活动对象或行动者替换。活动的意味着每个对象都维护着它自己的工作器线程和消息队列,并且所有对这种对象的请求都将进入队列排队,任何时刻都只能运行其中的一个。有了活动对象后,就可以串化消息而不是方法,也不需要注意任务被中断。
public class ActiveObjectDemo {
private ExecutorService ex =
Executors.newSingleThreadExecutor();
private Random rand = new Random(47);
// Insert a random delay to produce the effect
// of a calculation time:
private void pause(int factor) {
try {
TimeUnit.MILLISECONDS.sleep(
100 + rand.nextInt(factor));
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public Future<Integer>
calculateInt(final int x, final int y) {
return ex.submit(new Callable<Integer>() {
public Integer call() {
print("starting " + x + " + " + y);
pause(500);
return x + y;
}
});
}
public Future<Float>
calculateFloat(final float x, final float y) {
return ex.submit(new Callable<Float>() {
public Float call() {
print("starting " + x + " + " + y);
pause(2000);
return x + y;
}
});
}
public void shutdown() { ex.shutdown(); }
public static void main(String[] args) {
ActiveObjectDemo d1 = new ActiveObjectDemo();
// Prevents ConcurrentModificationException:
List<Future<?>> results =
new CopyOnWriteArrayList<Future<?>>();
for(float f = 0.0f; f < 1.0f; f += 0.2f)
results.add(d1.calculateFloat(f, f));
for(int i = 0; i < 5; i++)
results.add(d1.calculateInt(i, i));
print("All asynch calls made");
while(results.size() > 0) {
for(Future<?> f : results)
if(f.isDone()) {
try {
print(f.get());
} catch(Exception e) {
throw new RuntimeException(e);
}
results.remove(f);
}
}
d1.shutdown();
}
}