线程问题
线程出现问题的根本原因是因为线程上下文切换,导致线程里的指令没有执行完就切换执行其它线程了,
举例
★
t1和t2线程分别并行执行5000次++操作和--操作,理论上结果应该等于0。
”
代码模拟
static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
for (int i = 1;i<5000;i++){
count++;
}
});
Thread t2 =new Thread(()->{
for (int i = 1;i<5000;i++){
count--;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count的值是{}",count);
}
实际count的值有正有负,分析i++与i--操作的字节码
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
isub // 自减
putstatic i // 将修改后的值存入静态变量i
可以看到count++ 和 count-- 操作实际都是需要这个4个指令完成的,那么这里问题就来了!Java 的内存模型如下,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换:
临界区
一个程序运行多线程本身是没有问题的
问题出现在多个线程共享资源的时候
多个线程同时对共享资源进行读操作本身也没有问题
问题出现在对对共享资源同时进行读写操作时就有问题了
先定义一个叫做临界区的概念:一段代码内如果存在对共享资源的多线程读写操作,那么称这段代码为临界区
如static int counter = 0; static void increment()
{// 临界区 counter++;
} static void decrement()
{// 临界区 counter--;
}
竞态条件
多个线程在临界区执行,那么由于代码指令的执行不确定(线程上下文切换)而导致的结果问题,称为竞态条件
synchronized 解决方案
为了避免临界区中的竞态条件发生,由多种手段可以达到
- 阻塞式解决方案:synchronized ,Lock
- 非阻塞式解决方案:原子变量
现在讨论使用synchronized来进行解决,即俗称的对象锁,它采用互斥的方式让同一时刻至多只有一个线程持有对象锁,其他线程如果想获取这个锁就会阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换
★
注意 虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区的代码
同步是由于线程执行的先后,顺序不同但是需要一个线程等待其它线程运行到某个点。
”
synchronized
synchronized(对象) // 线程1获得锁, 那么线程2的状态是(blocked)
{
临界区
}
改进的代码
@Slf4j
public class Test03 {
//定义锁对象
final private static Object lock = new Object();
static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 1; i < 5000; i++) {
//临界区加锁
synchronized (lock) {
count++;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 1; i < 5000; i++) {
//临界区加锁
synchronized (lock) {
count--;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count的值是{}", count);
}
}
synchronized原理
synchronized实际上利用对象保证了临界区代码的原子性,临界区内的代码在外界看来是不可分割的,不会被线程切换所打断。
小结
关注点【锁对象】,【原子性】
面向对象思想优化
再次改进的代码
@Slf4j
public class Test03 {
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int i = 1; i < 5000; i++) {
synchronized (room) {
room.increment();
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 1; i < 5000; i++) {
room.decrease();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count的值是{}", room.getCounter());
}
}
class Room {
private Integer counter = 0;
public void increment() {
//临界区加锁
synchronized (this) {
counter++;
}
}
public void decrease() {
//临界区加锁
synchronized (this) {
counter--;
}
}
public Integer getCounter() {
synchronized (this) {
//避免读取到中间值,加锁,因为有join方法,不加也没问题
return counter;
}
}
}
变量的线程安全分析
成员变量和静态变量的线程安全分析
如果没有变量没有在线程间共享,那么变量是安全的
如果变量在线程间共享
如果只有读操作,则线程安全
如果有读写操作,则这段代码是临界区,需要考虑线程安全
public class Test04 {
ArrayList<Integer> arrayList=new ArrayList<>(); //成员变量
public void method1(int n){
for (int i = 0; i < n; i++) {
add();
remove();
}
}
public void add(){
arrayList.add(1); //读写操作
}
public void remove(){
arrayList.remove(0); //读写操作
}
}
arrayList对象被两线程共享,执行读写操作会出现线程安全问题。
局部变量线程安全分析
局部变量【局部变量被初始化为基本数据类型】是安全的public void test1(){
//局部变量 int类型变量i int i = 10;
i++; //读写操作 } 从图中可以看到局部变量i并未被两线程共享。局部变量引用的对象未必是安全的
如果局部变量引用的对象没有引用线程共享的对象,那么是线程安全的上述例子改善后的代码****public void method1(int n){
ArrayList<Integer> List=new ArrayList<>(); //局部变量 for (int i = 0; i < n; i++) {
add(List);
remove(List);
}
} public void add(ArrayList<Integer> arrayList){
arrayList.add(1); //读写操作 } public void remove(ArrayList<Integer> arrayList){
arrayList.remove(0); //读写操作 }如果局部变量引用的对象引用了一个线程共享的对象,那么要考虑线程安全的class Sub extends Test04 {
@Override public void add(ArrayList<Integer> arrayList) {
Thread sub = new Thread(() -> arrayList.add(1));
sub.start();
}
} ★如果子类创建了新的线程重写了父类的方法,那么又会造成线程安全问题,子类创建的线程会共享其中一个线程的局部变量,为避免此类问题,可加final或private关键字修饰,使得该方法不会被继承或访问。”常见线程安全类★注意:这里说它们是线程安全的是指,多个线程调用它们<!!!同一个实例>的某个方法时,是线程安全的。也可以理解为它们的每个方法是原子的,即每个方法都加了synchronized关键字,但是组合在一起线程就不安全了。”Hashtable table = newHashtable(); new Thread(()->{
table.put("key", "value1");
}).start(); //安全 new Thread(()->{
table.put("key", "value2");
}).start(); //安全 线程安全类方法的组合注意多个线程调用同一实例(table)的读写方法的组合(get方法和put方法组合)不是原子的Hashtable table = newHashtable(); // 线程1,线程2 if( table.get("key") == null) {
table.put("key", value);
} 结论:输出table中的key值,结果可能是v1也可能是v2,组合方法导致线程不安全。String
Integer
StringBuffer
Random
Vector
Hashtable
java.util.concurrent 包下的类
不可变类的线程安全
String和Integer类都是不可变的类,因为其类内部状态是不可改变的,因此它们的方法都是线程安全的,有同
学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,其实调用这些方法返回的已经是
一个新创建的对象了!
习题一
★
方法中有多个实例对象的临界区锁住this无效。
”
@Slf4j
public class Test05 {
public static void main(String[] args) throws InterruptedException {
Account A = new Account(1000);
Account B = new Account(1000);
Thread T1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
A.transferMoney(B, randomMoney());
}
});
Thread T2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
B.transferMoney(A, randomMoney());
}
});
T1.start();
T2.start();
T1.join();
T2.join();
log.debug("总钱数为{}", A.getMoney() + B.getMoney());
}
static Random random = new Random();
static public int randomMoney() {
return random.nextInt(100) + 1;
}
}
class Account {
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public Account() {
}
public void setMoney(int money) {
this.money = money;
}
public void transferMoney(Account target, int money) {
if (this.money >= money){
this.setMoney(this.getMoney()-money);
target.setMoney(target.getMoney() + money);
}
}
}
★分析”
线程t1和t2共享Account类中的成员变量money,且调用方法中包含读写操作(临界区),因此会有线程安全问题,但
是在transferMoney方法中添加synchronized关键字相当于
public void transferMoney(Account target, int money) {
synchronized (this){
if (this.money >= money){
this.setMoney(this.getMoney()-money);
target.setMoney(target.getMoney() + money);
}
}}
此时线程并不安全,因为target对象的money变量并未加锁,t1和t2读写操作还仍然可以访问并修改target对象的成员
变量money,造成线程不安全。
public void transferMoney(Account target, int money) {
synchronized (Account.class){
if (this.money >= money){
this.setMoney(this.getMoney()-money);
target.setMoney(target.getMoney() + money);
}
}}
注意:
此次可以采取锁住类对象解决,target和this都是类对象的实例,因此两个对象中的成员变量都会被锁住,线程安全,
但是如果有多个无关实例对象也会被同时锁住,处于阻塞状态,就会导致效率低下。
Monitor 概念
Java 对象头
- Mark Word 包含对象hashcode值,年龄代,锁状态等信息
- Klass Word 包含对象的类型信息
以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象;
数组对象
其中 Mark Word 结构为
1583651590160
所以一个对象的结构如下:
1583678624634
原理
Monitor被翻译为监视器或者说管程
每个java对象都可以关联一个Monitor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针
- 刚开始时Monitor中的Owner为null
- 当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner
- 当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList中变成BLOCKED状态
- Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的
- 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析
★
注意:synchronized 必须是进入同一个对象的 monitor 才有上述的效果不加 synchronized 的对象不会关联监视器,不遵从以上规则
”
synchronized原理
static final Object lock=new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}
反编译后的部分字节码
0 getstatic #2 <com/concurrent/test/Test17.lock>
# 取得lock的引用(synchronized开始了)
3 dup
# 复制操作数栈栈顶的值放入栈顶,即复制了一份lock的引用
4 astore_1
# 操作数栈栈顶的值弹出,即将lock的引用存到局部变量表中
5 monitorenter
# 将lock对象的Mark Word置为指向Monitor指针
6 getstatic #3 <com/concurrent/test/Test17.counter>
9 iconst_1
10 iadd
11 putstatic #3 <com/concurrent/test/Test17.counter>
14 aload_1
# 从局部变量表中取得lock的引用,放入操作数栈栈顶
15 monitorexit
# 将lock对象的Mark Word重置,唤醒EntryList
16 goto 24 (+8)
# 下面是异常处理指令,可以看到,如果出现异常,也能自动地释放锁
19 astore_2
20 aload_1
21 monitorexit
22 aload_2
23 athrow
24 return
★
注意:方法级别的 synchronized 不会在字节码指令中有所体现
”
重量级锁优化
轻量级锁
轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
每次指向到synchronized代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的Mark Word和对象引用reference
1583755737580
让锁记录中的Object reference指向对象,并且尝试用cas(compare and sweep)替换Object对象的Mark Word ,将Mark Word 的值存入锁记录中
1583755888236
如果cas替换成功,那么对象的对象头储存的就是锁记录的地址和状态01,如下所示
1583755964276
如果cas失败,有两种情况
1583756190177
如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段
如果是自己的线程已经执行了synchronized进行加锁,那么那么再添加一条 Lock Record 作为重入的计数
当线程退出synchronized代码块的时候,**如果获取的是取值为 null 的锁记录 **,表示有重入,这时重置锁记录,表示重入计数减一
1583756357835
当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象
成功则解锁成功
失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀
如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
1583757433691
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态
1583757586447
当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程
自旋优化
重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁
自旋重试成功的情况
1583758113724
自旋重试失败的情况,自旋了一定次数还是没有等到持锁的线程释放锁
1583758136650
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
智能控制自选可能性
★
在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;
反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能
”
偏向锁
在轻量级的锁中,我们可以发现,如果同一个线程对同一个2对象进行重入锁时,也需要执行CAS操作,这是有点耗时
滴,那么Java6开始引入了偏向锁概念,只有第一次使用CAS时将对象的Mark Word头设置为入锁线程ID(操作系统提供)
之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了
1583760728806
偏向状态
1583762169169
一个对象的创建过程
如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值.
偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-XX:BiasedLockingStartupDelay=0来禁用延迟
注意:处于偏向锁的对象解锁后,线程 ID(不能用Java方法获取是操作系统提供的)仍存储于对象头中
加上虚拟机参数-XX:BiasedLockingStartupDelay=0进行测试
输出结果如下,三次输出的状态码都为101
public static void main(String[] args) throws InterruptedException {
Test1 t = new Test1();
test.parseObjectHeader(getObjectHeader(t));
synchronized(t){
test.parseObjectHeader(getObjectHeader(t));
}
test.parseObjectHeader(getObjectHeader(t));
} biasedLockFlag (1bit): 1
LockFlag (2bit): 01
biasedLockFlag (1bit): 1
LockFlag (2bit): 01
biasedLockFlag (1bit): 1
LockFlag (2bit): 01
禁用偏向锁
测试禁用:如果没有开启偏向锁,那么对象创建后最后三位的值为001,这时候它的hashcode,age都为0,hashcode
是第一次用到hashcode时才赋值的。在上面测试代码运行时<typo id="typo-11993" data-origin="在" ignoretag="true">在</typo>添加 VM 参数-XX:-UseBiasedLocking禁用偏向锁
(禁用偏向锁则优先使用轻量级锁),退出synchronized状态变回001
- 虚拟机参数-XX:-UseBiasedLocking
- 输出结果如下,最开始状态为001,然后加轻量级锁变成00,最后恢复成001biasedLockFlag (1bit): 0
LockFlag (2bit): 01
LockFlag (2bit): 00
biasedLockFlag (1bit): 0
LockFlag (2bit): 01
撤销偏向锁-【锁对象调用hashcode方法】
测试 hashCode:当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存
hashcode的值了。
仅限于偏向锁
- 轻量级锁会将hashcode值
-
测试代码如下,使用虚拟机参数-XX:BiasedLockingStartupDelay=0 ,确保我们的程序最开始使用了偏向锁!但是结果显示程序还是使用了轻量级锁。 public static void main(String[] args) throws InterruptedException {
Test1 t = new Test1();
t.hashCode();
test.parseObjectHeader(getObjectHeader(t));**synchronized** (t){ test.parseObjectHeader(getObjectHeader(t)); } test.parseObjectHeader(getObjectHeader(t)); }
输出结果biasedLockFlag (1bit): 0
LockFlag (2bit): 01
LockFlag (2bit): 00
biasedLockFlag (1bit): 0
LockFlag (2bit): 01
撤销偏向锁-【其它线程使用该锁对象】
这里我们演示的是偏向锁撤销变成轻量级锁的过程,那么就得满足轻量级锁的使用条件,就是没有线程对同一个对象进行锁竞争,我们使用wait 和 notify 来辅助实现
- 代码,虚拟机参数-XX:BiasedLockingStartupDelay=0确保我们的程序最开始使用了偏向锁!
- 输出结果,最开始使用的是偏向锁,但是第二个线程尝试获取对象锁时,发现本来对象偏向的是线程一,那么偏向锁就会失效,加的就是轻量级锁biasedLockFlag (1bit): 1
LockFlag (2bit): 01
biasedLockFlag (1bit): 1
LockFlag (2bit): 01
biasedLockFlag (1bit): 1
LockFlag (2bit): 01
biasedLockFlag (1bit): 1
LockFlag (2bit): 01
LockFlag (2bit): 00
biasedLockFlag (1bit): 0
LockFlag (2bit): 01
撤销 - 调用 wait/notify
会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持
批量重偏向
如果对象被多个线程访问,但是没有竞争,这时候偏向了线程一的对象仍有机会重新偏向线程二,会重置其的线程ID。
重新偏向条件:当撤销偏向锁 >= 20次时
当撤销偏向锁阈值超过40次后,该类中所有的实例对象都会变为不可偏向的。
批量撤销偏向条件:当撤销偏向锁 >= 40次时
锁消除
Java运行时有JIT即时编译器,会对Java的字节码进行进一步优化,如对热点代码进行优化,在优化过程中也会分析变量
是否会被共享,如果不可能被共享,就不会执行 synchronized 关键字
总结
Java6开始,锁根据不同情况进行了优化,偏向锁(默认延时)--> 轻量级锁 --> (锁自旋) --> 重量级锁
wait和notify
- Owener发现条件不满足,调用wait方法,即可进入WitSet变为WAITING状态
- BLOCKERD和WAITING状态的线程都处于堵塞状态,不占用CPU时间片
- BLOCKED线程会在Owner线程释放锁时唤醒
- WAITING线程会在Owner线程调用notify或notifyAll时唤醒,但唤醒后并不意味着立刻获得锁,仍需要进入EntryList重新竞争锁
API介绍
- obj.wait()让进入Object监视器的线程到waitSet等待,如果不唤醒一直等待
- obj.wait(long timeout) 让进入Object监视器的线程到waitSet等待,即使不唤醒超时也会被唤醒
- obj.notify()在Object上正在waitSet等待的线程中挑一个线程唤醒
- obj.notifyAll()让Object上正在waitSet等待的线程的所有线程全部唤醒
前提:这些都是线程之间进行协作的手段,都属于Object对象的方法,调用前必须获得此对象的锁。
正确使用方法
final static Object lock = new Object(); //final避免修改
synchronized (lock){
while(条件不成立){ //所有线程都唤醒后但条件仍可能不成立,while循环可以再次等待
lock.wait();
}
//继续干活
}
synchronized (lock){
lock.notifyAll(); //避免唤醒错误线程
}
对比sleep(long n) 和 wait(long n)
- sleep是Thread方法,而是Object的方法
- sleep不需要强制和synchronized配合使用,但wait需要
- sleep在睡眠的同时不会释放对象锁,但wait在等待时候会释放对象锁
- 线程的状态都是TIME_WAIT
同步模式之保护性暂停
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
1594473284105
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
GuardedObject guardedObject = new GuardedObject();
new Thread("t1"){
@Override
public void run() {
log.debug("等待结果...");
Object o = guardedObject.get(2000);
log.debug("获得了结果{}",o);
}
}.start();
Thread.sleep(1000);
log.debug("执行下载");
// guardedObject.complete(new Object());
guardedObject.complete(null); //虚假唤醒,唤醒线程但条件仍不满足
}
}
class GuardedObject {
// 结果
private Object response;
//获取结果
//timeout表示要等待多久
public synchronized Object get(long timeout) {
long begin = System.currentTimeMillis();
long passedTime = 0;
while (response == null) {
long waitTime = timeout - passedTime;
if (waitTime <= 0)
break;
else {
try {
this.wait(waitTime);//参数为waitTime,避免虚假唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
//求得经历时间
passedTime = System.currentTimeMillis() - begin;
}
}
return response;
}
//产生结果
public synchronized void complete(Object response){
this.response =response;
this.notifyAll();
}
}
join方法
关于超时的增强,在join(long millis) 的源码中得到了体现:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) { //join()底层就是调用了wait(0)但是不是无限等待而是等待线程代码执行结束
while (isAlive()) {
wait(0);
}
} else {
// join一个指定的时间
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),
左侧的 t0,t2,t4 就好比等待邮件的【居民】,
右侧的 t1,t3,t5 就好比【邮递员】
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,
因此设计一个用来解耦的中间类(信箱),这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。
和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc框
架的调用中就使用到了这种模式。
1594518049426
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : Mailbox.getIds()) {
new Postman(id,"内容"+id).start();
}
}
}
@Slf4j
class Postman extends Thread{
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailbox.getGuardedObject(id);
log.debug("开始送信...,id为{},内容为{}",id,mail);
guardedObject.complete(mail);
}
}
@Slf4j
class People extends Thread {
@Override
public void run() {
//收信
GuardedObject guardedObject = Mailbox.createdObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到id为{},结果是{}",guardedObject.getId(),mail);
}
}
class Mailbox {
private static Hashtable<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
private synchronized static int generatedId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createdObject() {
GuardedObject object = new GuardedObject(generatedId());
boxes.put(object.getId(), object);
return object;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
class GuardedObject {
private int id;
public int getId() {
return id;
}
public GuardedObject(int id) {
this.id = id;
}
// 结果
private Object mail;
//获取结果
//timeout表示要等待多久
public synchronized Object get(long timeout) {
long begin = System.currentTimeMillis();
long passedTime = 0;
while (mail == null) {
long waitTime = timeout - passedTime;
if (waitTime <= 0)
break;
else {
try {
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//求得经历时间
passedTime = System.currentTimeMillis() - begin;
}
}
return mail;
}
//产生结果
public synchronized void complete(Object mail){
this.mail =mail;
this.notifyAll();
}
}
异步模式之生产/消费者
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
1594524622020
注意:我们写一个线程间通信的消息队列,要注意区别,像rabbit mq等消息框架是进程间通信的。
代码实现
@Slf4j
public class Main {
public static void main(String[] args) {
//创建容量为2的双向链表
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 1; i <= 3; i++) {
int id = i;
new Thread(() -> {
while (true) {
log.debug(id + "号生产者开始生产了...");
messageQueue.put(new Message(id, "内容" + id));
}
}).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("消费开始消费了...");
Message message = messageQueue.take();
log.debug("消费的id为{}内容为{}",message.getId(),message.getValue());
}
}).start();
}
}
@Slf4j
class MessageQueue {
private LinkedList<Message> list = new LinkedList();
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
//获取消息
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列为空,消费者线程开始阻塞...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从头部取消息
Message message = list.removeFirst();
//通知生产者可以生产了
log.debug("生产者可以生产了");
list.notifyAll();
return message;
}
}
//存入消息
public void put(Message message) {
synchronized (list) {
while (list.size() == capcity) {
try {
log.debug("队列已满,生产者线程开始阻塞...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从队列尾部加入
list.addLast(message);
//通知消费者可以消费了
log.debug("消费者可以消费了");
list.notifyAll();
}
}
}
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
public int getId() {
return id;
}
}
park & unpack
基本使用
它们是 LockSupport 类中的方法
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
log.debug("begin...");
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
Thread.sleep(1000);
log.debug("unpark...");
LockSupport.unpark(t1); //t1线程解锁
}
20:59:18.776 [t1] DEBUG com.Long.Park.Park - begin...
20:59:18.791 [t1] DEBUG com.Long.Park.Park - park...
20:59:19.789 [main] DEBUG com.Long.Park.Park - unpark...
20:59:19.789 [t1] DEBUG com.Long.Park.Park - resume...
特点
与Object的wait & notify 相比
- wait,notify 和 notifyAll 必须配合Object Monitor 一起使用,而 park & unpark 不必
- park & unpark 是以线程为单位 【阻塞】和【唤醒】线程,而notify 只能随机唤醒一个等待线程,而 notifyAll 是唤醒所有等待线程,就不那么精确。
- park & unpark 可以先unpark,而wait & notify不能先 notify 和 notifyAll
原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex
打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
调用 park 就是要看需不需要停下来歇息
如果备用干粮耗尽,那么钻进帐篷歇息
如果备用干粮充足,那么不需停留,继续前进
调用 unpark,就好比令干粮充足
因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
如果这时线程还在帐篷,就唤醒让他继续前进
如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
先调用park再调用upark的过程
1.先调用park
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
- 线程进入 _cond 条件变量阻塞
- 设置 _counter = 0
1594531894163
2.调用upark
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 唤醒 _cond 条件变量中的 Thread_0
- Thread_0 恢复运行
- 设置 _counter 为 0
1594532057205
先调用upark再调用park的过程
- 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
- 当前线程调用 Unsafe.park() 方法
- 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
- 设置 _counter 为 0
1594532135616
线程状态转换
img
①NEW = = > RUNNABLE
- 调用线程的start方法时 NEW = = > RUNNABLE
②RUNNABLE < = = > WAITING
线程用synchronized(obj)获取了对象锁后
竞争锁成功,t 线程从WAITING = = >RUNNABLE
竞争锁失败,t 线程从WAITING = = > BLOCKED
调用obj.wait()方法时,t 线程从RUNNABLE = = > WAITING
调用obj.notify(),obj.notifyAll(),t.interrupt()时
③RUNNABLE < = = > WAITING
- 线程调用t.join()方法时(当前线程方法内在t线程对象的监视器上等待) RUNNABLE = = > WAITING
- t线程运行结束或调用当前线程的interrupt()方法时 WAITING = = > RUNNABLE
④RUNNABLE < = = > WAITING
- 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE = = > WAITING
- 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING = = > RUNNABLE
⑤RUNNABLE < = = > TIMED_WAITING
t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE = = > TIMED_WAITING
t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 TIMED_WAITING = = > RUNNABLE
竞争锁失败,t 线程从 TIMED_WAITING = = > BLOCKED
⑥RUNNABLE < = = > TIMED_WAITING
- 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE = = >TIMED_WAITING注意是当前线程在t 线程对象的监视器上等待
- 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从TIMED_WAITING = = > RUNNABLE
⑦RUNNABLE < = = > TIMED_WAITING
- 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE = = > TIMED_WAITING
- 当前线程等待时间超过了 n 毫秒或调用了线程的interrupt() ,当前线程从 TIMED_WAITING = = > RUNNABLE
⑧RUNNABLE < = = > TIMED_WAITING
- 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(longmillis) 时,当前线程从 RUNNABLE = = > TIMED_WAITING
- 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从TIMED_WAITING = = > RUNNABLE
⑨RUNNABLE < = = > BLOCKED
- t线程用synchronized(obj)获取了对象锁如果竞争失败,RUNNABLE = = > BLOCKED
- 持obj锁线程的同步代码块执行完毕,会唤醒该对象上所有BLOCKED的线程重新竞争,如果其中t线程竞争成功从BLOCKED = = > RUNNABLE,其他失败的线程仍然是BLOCKED
⑩RUNNABLE < = = > TIMED_WAITING
当前线程所有代码运行完毕,进入TIMED_WAITING
多把锁【增加并发度】
只使用一把锁的情况下,不相干的任务也会等待,比如一个线程睡觉和一个线程学习
这种情况下,睡觉和学习不应该只是一把锁,应该分开,两者便可并发执行。
但使用多把锁既有好处,也有坏处
- 优点可以增加并发度,即不相关的任务可以并行
- 缺点会造成死锁等问题
@Slf4j
public class MultiLock {
private final Object studyLock = new Object(); //学习锁
private final Object sleepLock = new Object(); //睡觉锁
public void study() throws InterruptedException {
synchronized (studyLock){
log.debug(Thread.currentThread().getName()+"在学习!");
Thread.sleep(2000);
log.debug("学习结束!");
}
}
public void sleep() throws InterruptedException {
synchronized (sleepLock){
log.debug(Thread.currentThread().getName()+"在睡觉!");
Thread.sleep(1000);
log.debug("睡觉结束!");
}
}
}
class Test{
public static void main(String[] args) {
MultiLock multiLock = new MultiLock();
new Thread(()->{
try {
multiLock.study();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
multiLock.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
活跃性
线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况
死锁
一个线程在占用一个锁的同时需要获取多把锁,但这些锁被其他线程占用,其他线程也需要该线程占用的锁,就会陷入死循环,无限等待,造成死锁问题,
- t1线程需要获取A对象锁,接下来要获取B对象锁,但被t2线程占用
- t2线程需要获取B对象锁,接下来要获取A对象锁,但被t1线程占用
检测方法
- 检测死锁可以使用 jconsole工具;
- 使用 jps 定位进程 id,再用 jstack 定位死锁
哲学家就餐问题
1594553609905
有五位哲学家,围坐在圆桌旁。他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。如果筷子被身边的人拿着,自己就得等待
代码实现
public class Main {
public static void main(String[] args) {
chopstick c1 = new chopstick("c1");
chopstick c2 = new chopstick("c2");
chopstick c3 = new chopstick("c3");
chopstick c4 = new chopstick("c4");
chopstick c5 = new chopstick("c5");
new philosopher("阿基米德",c1,c2).start();
new philosopher("苏格拉底",c2,c3).start();
new philosopher("柏拉图",c3,c4).start();
new philosopher("黑格尔",c4,c5).start();
new philosopher("亚里士多德",c5,c1).start();
}
}
class chopstick{
private String name;
public chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "chopstick{" +
"name='" + name + '\'' +
'}';
}
}
@Slf4j
class philosopher extends Thread{
private chopstick left;
private chopstick right;
public philosopher(String name,chopstick left,chopstick right){
super(name);
this.left=left;
this.right=right;
}
@Override
public void run() {
while (true) {
synchronized (left){
synchronized (right){
eat();
}
}
}
}
private void eat() {
log.debug(Thread.currentThread().getName()+"开始吃饭了");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
★
当每个哲学家即线程持有一根筷子时,他们都在等待另一个线程释放锁,因此造成了死锁。
”
活锁
出现在两个线程互相改变对方的结束条件(二者条件互斥),最后谁也无法结束。
结束条件count<20, 初始 count =25
- t1线程执行count++操作
- t2线程并行执行 count-- 操作
解决办法:
设置随机的睡眠时间,避免类似加减这样的操作一起执行
饥饿
很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演
示,讲读写锁时会涉及饥饿问题。
一个线程饥饿的例子:
先来看看使用顺序加锁的方式解决之前的死锁问题,就是两个线程对两个不同的对象加锁的时候都使用相同的顺序进行
加锁。但是这样又会产生饥饿问题
1594558469826
顺序加锁的解决方案(会造成饥饿问题,其中一个线程会一直执行,其他线程执行的机会大大减少)
ReentrantLock (JUC下的类)
相对于 synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量,即<typo id="typo-29757" data-origin="对与" ignoretag="true">对与</typo>不满足条件的线程可以放到不同的集合中等待
与 synchronized 一样,都支持可重入
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么它就是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
可打断
锁超时
使用锁超时解决哲学家就餐死锁问题:
公平锁
synchronized锁中,在entrylist等待的锁在竞争时不是按照先到先得来获取锁的,所以说synchronized锁时不公平的;ReentranLock锁默认是不公平的,但是可以通过设置实现公平锁。本意是为了解决之前提到的饥饿问题,但是公平锁一般没有必要,会降低并发度,使用trylock也可以实现。
条件变量(休息室)
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)<typo id="typo-30493" data-origin="取" ignoretag="true">取</typo>重新竞争 lock 锁,执行唤醒的线程<typo id="typo-30513" data-origin="爷" ignoretag="true">爷</typo>必须先获得锁
- 竞争 lock 锁成功后,从 await 后继续执行
同步模式之顺序控制
固定运行顺序,比如,必须先 2 后 1 打印
wait notify 版
Park Unpark 版
Lock 条件变量版
交替输出,线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
wait notify 版
Lock 条件变量版
Park Unpark 版
本章小结
本章我们需要重点掌握的是
分析多线程访问共享资源时,哪些代码片段属于临界区
使用 synchronized 互斥解决临界区的线程安全问题
掌握 synchronized 锁对象语法
掌握 synchronzied 加载成员方法和静态方法语法
掌握 wait/notify 同步方法
使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
学会分析变量的线程安全性、掌握常见线程安全类的使用
了解线程活跃性问题:死锁、活锁、饥饿
应用方面
互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
原理方面
monitor、synchronized 、wait/notify 原理
synchronized 进阶原理
park & unpark 原理
模式方面
同步模式之保护性暂停
异步模式之生产者消费者
同步模式之顺序控制