第十一课 Volatile
例子
public class VolatileTest extends Thread {
/**
* 使用了volatile,则1秒后,子线程会退出循环,因为在主线程将isRunning置位为false
*/
//private volatile boolean isRunning = true;
/**
* 不使用volatile,1秒后,主线程置位isRunning为false,但主线程对isRunning的修改对子线程不可见,子线程看见的还是true,循环继续
*
* 将setRunning方法设置为synchronized也可以达到volatile的效果,意思就是同步代码块保护的变量修改时也会直接刷新到主存
*
*/
private boolean isRunning = true;
public boolean isRunning(){
return isRunning;
}
public void setRunning(boolean isRunning){
this.isRunning= isRunning;
}
public void run(){
System.out.println("进入了run...............");
while (isRunning){}
System.out.println("isUpdated的值被修改为为false,线程将被停止了");
}
public static void main(String[] args) throws InterruptedException {
VolatileTest volatileThread = new VolatileTest();
volatileThread.start();
Thread.sleep(1000);
volatileThread.setRunning(false); //停止线程
}
}
Volatile和原子性没什么直接关系
如果变量被同步代码保护了,就不必考虑volatile
怎么引出这个问题呢
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
// Allow this to be canceled:
public void cancel() { canceled = true; }
public boolean isCanceled() { return canceled; }
}
- 分析
- 看这个类的canceled字段,在这里一个IntGenerator对象可以被多个EventChecker对象调用cancel()
- 这样就在每个EventChecker的线程里,保留了一份对canceled的本地缓存,这个本地缓存可能是每个CPU一个
- 在每个线程里调用修改cenceled的值,首先会保存到本地缓存,然后也会同步到主存里,据说这是规定,必须的
- 但是其他线程通过isCanceled()读取它的值,是从本地缓存读,没被改变,即不可见,它已经看不见主存里的值了
- 所以用volatile来修饰,保证每次对它的修改,都会同步到主存的同时,也会对所有其他线程的内存可见,或者就是保证对于volatile变量,不会在工作内存中拷贝一份,都是在主存中读写
- 整了半天,还是挺麻烦,推荐首选用同步来解决问题,volatile适用于只有一个字段可变的情况
-
摘要
- Volatile是Java提供的一种弱同步机制,当一个变量被声明成volatile类型后编译器不会将该变量的操作与其他内存操作进行重排序。
- 在某些场景下使用volatile代替锁可以减少代码量和使代码更易阅读
-
Volatile的特性
- 可见性:当一条线程对volatile变量进行了修改操作时,其他线程能立即知道修改的值,即当读取一个volatile变量时总是返回最近一次写入的值
- 原子性:对于单个voatile变量其具有原子性(能保证long double类型的变量具有原子性),但对于i ++ 这类复合操作其不具有原子性(见下面分析)
-
Volatile使用的前提
- 对变量的写入操作不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值
- 该变量不会与其他状态变量一起纳入不变性条件中
- 在访问变量时不需要加锁
原理:
原因:Java内存模型(JMM)规定了所有的变量都存储在主内存中,主内存中的变量为共享变量,
而每条线程都有自己的工作内存,线程的工作内存保存了从主内存拷贝的变量,
所有对变量的操作都在自己的工作内存中进行,完成后再刷新到主内存中,
回到例1,第18行号代码主线程(线程main)虽然对isRunning的变量进行了修改且有刷新
回主内存中(《深入理解java虚拟机》中关于主内存与工作内存的交互协议提到变量在工作 内存中改变后必须将该变化同步回主内存
),但volatileThread线程读的仍是自己工作内存
的旧值导致出现多线程的可见性问题,解决办法就是给isRunning变量加上volatile关键字。
- volatile内存语义总结如下
- 当线程对volatile变量进行写操作时,会将修改后的值刷新回主内存
- 当线程对volatile变量进行读操作时,会先将自己工作内存中的变量置为无效,之后再通过主内存拷贝新值到工作内存中使用。
- Synchronized与volatile区别
- volatile只能修饰变量,而synchronized可以修改变量,方法以及代码块
- volatile在多线程中不会存在阻塞问题,synchronized会存在阻塞问题
- volatile能保证数据的可见性,但不能完全保证数据的原子性,synchronized即保证了数据的可见性也保证了原子性
- volatile解决的是变量在多个线程之间的可见性,而sychroized解决的是多个线程之间访问资源的同步性
第十二课 java提供的并发构件
1 CountDownLatch
package com.cowthan.concurrent.c14;
//: concurrency/CountDownLatchDemo.java
import java.util.concurrent.*;
import java.util.*;
// Performs some portion of a task:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch (InterruptedException ex) {
// Acceptable way to exit
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
System.out.println(this + "completed");
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
// Waits on the CountDownLatch:
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException ex) {
System.out.println(this + " interrupted");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++)
exec.execute(new WaitingTask(latch));
for (int i = 0; i < SIZE; i++)
exec.execute(new TaskPortion(latch));
System.out.println("Launched all tasks");
exec.shutdown(); // Quit when all tasks complete
}
} /* (Execute to see output) */// :~
-
适用于:
- 一组子任务并行执行,另一组任务等待着一组完成才进行,或等待某个条件完成才进行
- 并行执行的任务数,或者等待的这个条件,可以抽象成倒数,倒数到0,则另一组任务就可以继续执行
- 一个任务会被分解成多个子任务x,y,z
- 其中一个子任务B会等待其他几个子任务完成才会继续执行
- 所以提供一个CountDownLatch对象,并设置初始值
- 任务B在CountDownLatch对象上await:latch.await();
- 每完成一个子任务,就在CountDownLatch对象上倒数一次:latch.countDown();
- 直到倒数到0,await的对象就会被唤醒
- 任务B可以有多个
- 一组子任务并行执行,另一组任务等待着一组完成才进行,或等待某个条件完成才进行
-
限制:
- 只能用一次,如果要用多次,参考CyclicBarrier
2 CyclicBarrier
例子
package com.cowthan.concurrent.c14;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) {
barrier = b;
}
public synchronized int getStrides() {
return strides;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();
}
} catch (InterruptedException e) {
// A legitimate way to exit
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++)
s.append("="); // The fence on the racetrack
System.out.println(s);
for (Horse horse : horses)
System.out.println(horse.tracks());
for (Horse horse : horses)
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
int nHorses = 3; //几匹马
int pause = 200; //等多久走一步
new HorseRace(nHorses, pause);
}
}
-
适用于:
- 某个人物要等待多个任务并行进行,直到都完成,才会执行
- 可以重用
- 不得不说,CyclicBarrier还有点不好理解,看了demo代码还是没整明白
- 怎么是horse在barrier上await呢
- CyclicBarrier构造怎么还得传入必须await的线程个数呢
-
介绍
- 构造:barrier = new CyclicBarrier(n, new Runnable(){})
- 参数1:计数值,当有线程在barrier上await时,计数减一,n个线程都await了,计数就成0了,栅栏动作就会执行
- 参数2:叫做栅栏动作,计数到0时,会自动执行
- 构造:barrier = new CyclicBarrier(n, new Runnable(){})
-
CyclicBarrierDemo讲解:
- 栅栏动作做的事情是:
- 打印线路,打印终点
- 打印每匹马当前的位置
- 判断是否有马走到终点,有则提示夺冠,并结束所有线程(shutdownNow)
- 栅栏动作执行完后,计数又会重置,此时
- 每匹马再向前走一步,距离是随机数
- 走完之后,await一下
- 所有马都走完一步,await倒数计数值又是0了,再激活栅栏动作
- 如此循环
- 所以,栅栏动作等所有子任务都await了,才运行,此时所有子任务都阻塞,子任务等栅栏动作完成,计数自动重置,再被唤醒
- 栅栏动作做的事情是:
-
总结:
- 构造时,传入计数值和栅栏动作
- 计数值减一操作由子任务的await完成
- 栅栏动作在计数值为0时激活,并且运行完会自动重置计数值,并唤醒await的线程们
- 更多:
- 思考:如果没有CyclicBarrier,仿真赛马你会怎么实现?
- 你的实现会考虑起始和终结的情况吗?宣布夺冠之后,所有的马都能立即停止前进吗?统计开始和结束时,所有马的状态保持前后一致吗?
- 提示1:CyclicBarrier的子任务,会在await上等待栅栏动作的结束,并且await是可以被interrupt的
- 提示2:赛场统计是由栅栏动作完成的,此动作会在每一批马都前进一步之后,所有马都await,栅栏动作开始统计