创建线程的方式
创建线程主要有三种方法 : 继承Thread类,实现Runnable接口,实现Callable接口。
方法一,直接使用Thread
//创建线程
Thread t = new Thread(){
@Override
public void run(){
System.out.println("======");
}
};
t.setName("yi");
//启动线程
t.start();
t.start()
调用后才能和操作系统关联起来
方法二,使用 Runnable 配合Thread
Runnable runnable = () -> {
System.out.println("---------------");
System.out.println("12");
System.out.println("12");
System.out.println("12");
System.out.println("12");
};
Thread t1 = new Thread(runnable);
t1.start();
方法三,实现Callable接口
这种方式创建线程是可以获得子线程返回结果的,是Runnable的加强版。
public class ThreadThree implements Callable<Long> {
@Override
public Long call() throws Exception {
System.out.println("thread three");
return 23L;
}
}
常用方法
方法名 | static | 功能说明 | 注意 |
---|---|---|---|
start() | 启动一个新线程,在新的线程运行run方法中的代码 | start方法只是让线程进入就绪,里面的代码不一定立刻运行(cpu时间片还没有分给它)。每个线程对象的start方法只能调用一次 | |
run() | 新线程启动后会调用的方法 | 如果在否早Thread对象时传递了Runnable参数,则线程启动后会调用Runnable中的run方法,否则不执行任何线程操作。 | |
join() | 等待线程运行结束 | ||
join(long n) | 等待线程运行结束,最多等待n毫秒 | ||
interrupt() | 打断线程 | 如果打断线程整在sleep,wait,join会导致被打断的线程抛出InterruptedException,并清除打断标记;如果打断正在运行的线程,则会设置打断标记;park的线程被打断,也会设置打断标记 | |
isInterrupted() | 线程是否被打断过,不会清除打断标记 | 不会清除打断标记,正常线程被打断后,不会停止;需要使用打断标记来判断 | |
interrupted() | static | 判断当前线程是否被打断 | 会清除 打断标记 |
两阶段终止
线程状态
(操作系统)层面
- 初始状态:仅仅实在语言层面创建了线程对象,还未与操作系统线程相关联
- 就绪状态:该线程已经被创建(与操作系统线程关联),可由cpu调度执行
- 运行状态:获取cpu时间片运行中的状态
- 阻塞状态:缺少资源的状态
- 终止状态:线程已经执行完毕,生命周期已经结束,不会再转换为其他状态
(java api)层面
- new:线程刚被创建,但是还没有调用start()方法
- runnable:当调用了start()后的状态,对应着操纵系统中的就绪状态、运行状态、和阻塞状态
- blocked:
- waiting:
- timed_waiting:
- terminated:终止状态
状态转换
情况一:NEW –> RUNNABLE
当调用了 t.start() 方法时,由 NEW –> RUNNABLE
情况二: RUNNABLE <–> WAITING
当调用了t 线程用 synchronized(obj) 获取了对象锁后,调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING
-
调用 obj.notify() , obj.notifyAll() , t.interrupt() 时,会在 WaitSet 等待队列中出现锁竞争,非公平竞争
竞争锁成功,t 线程从 WAITING –> RUNNABLE
竞争锁失败,t 线程从 WAITING –> BLOCKED
情况三:RUNNABLE <–> WAITING
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING 注意是当前线程在 t 线程对象的监视器上等待 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE
情况四: RUNNABLE <–> WAITING
当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE
情况五: RUNNABLE <–> TIMED_WAITING
t 线程用 synchronized(obj) 获取了对象锁后 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
情况六:RUNNABLE <–> TIMED_WAITING
当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING 注意是当前线程在 t 线程对象的监视器上等待 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE
情况七:RUNNABLE <–> TIMED_WAITING
当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE
情况八:RUNNABLE <–> TIMED_WAITING
当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE
情况九:RUNNABLE <–> BLOCKED
t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED
情况十: RUNNABLE <–> TERMINATED
当前线程所有代码运行完毕,进入 TERMINATED
共享模型之管程(Monitor)
i++实际的jvm字节码指令
getstatic i //获取静态变量i的值
iconst_1 //准备常量1
iadd //自增
putstatic i //将修改后的值存入静态变量i
方法上的synchronized
class test{ //锁成员方法 等价于锁当前对象
public synchronized void test(){}
}
等价于
class test{
public void test(){
synchronized(this){
}
}
}
class Test{ //锁静态方法 等价于锁当前类对象
public synchronized static void test(){}
}
等价于
class Test{
public static void test(){
synchronized(Test.class){
}
}
}
偏向锁的撤销
将对象从可偏向变成不可偏向状态
- 调用对象的hashCode
- 其他线程使用对象
- 调用wait/notify
sleep和wait的区别
wait | sleep | |
---|---|---|
同步 | 只能在同步上下文中调用wait方法,。否则或抛出iiegalMonitorStateException异常 | 不与需要在同步方法或者同步块中调用 |
作用对象 | wait方法定义在Object类中,作用于对象本身 | sleep方法定义在Java.lang.Thread中,作用于当前线程 |
释放锁资源 | 是 | 否 |
唤醒条件 | 其他线程调用对象的notify()或者notifyAll()方法 | 超时或者调用interrupt方法体 |
方法属性 | wait是实例方法 | sleep是静态方法 |
park和unpark 与Object的wait & notify区别
- wait,notify和notifyAll必须配合Object Monitor一起使用,而unpark不必
- park & unpark 是以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notiifyAll是唤醒所有的等待进程,就不那么精确
- park & unpark 可以先unpark,而wait & notify 不能先notify
死锁和活锁
死锁:竞争对方已拥有的资源 可使用顺序加锁的方式解决
活锁:任务或者执行者没有被阻塞,由于某些条件没有被满足,导致一直重复尝试,失败,尝试,失败。 可以使用先来先服务方式解决
ReentrantLock
相对于synchronized 有如下特点:
- 可中断
- 可以设置超时时间
- 可以设置公平锁 (先进先出) 默认为非公平锁
- 支持多个条件变量 (等价于多个waitSet)
- 在对象的级别保护临界区
与synchronized一样,都支持可重入
可重入
同一个线程吐过首次获得了这把锁,那么因为她是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
可打断
如果某个线程处于阻塞状态,可以调用其interrup他方法让其停止阻塞,获得锁失败
简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行
调用 lock.lockInterruptibly();方法 如果使用lock.lock()方法不可打断
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
// 加锁,可打断锁
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
// 被打断,返回,不再向下执行
return;
}finally {
// 释放锁
lock.unlock();
}
});
lock.lock();
try {
t1.start();
Thread.sleep(1000);
// 打断
t1.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
锁超时
使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。
并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位
简而言之就是:获取锁失败了、获取超时了或者被打断了,不再阻塞,直接停止运行。
不设置等待时间
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
// 未设置等待时间,一旦获取失败,直接返回false
if(!lock.tryLock()) {
System.out.println("获取失败");
// 获取失败,不再向下执行,返回
return;
}
System.out.println("得到了锁");
lock.unlock();
});
lock.lock();
try{
t1.start();
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
设置等待时间
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
// 判断获取锁是否成功,最多等待1秒
if(!lock.tryLock(1, TimeUnit.SECONDS)) {
System.out.println("获取失败");
// 获取失败,不再向下执行,直接返回
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
// 被打断,不再向下执行,直接返回
return;
}
System.out.println("得到了锁");
// 释放锁
lock.unlock();
});
lock.lock();
try{
t1.start();
// 打断等待
t1.interrupt();
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
公平锁
在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。
// 默认是不公平锁,需要在创建时指定为公平锁
ReentrantLock lock = new ReentrantLock(true);
条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执
- 使用await后要使用signal来唤醒
同步模式之顺序控制
wait/notify实现
public class Code_32_Test {
public static void main(String[] args) {
WaitAndNotify waitAndNotify = new WaitAndNotify(1, 5);
new Thread(()->{
waitAndNotify.run("a", 1, 2);
}).start();
new Thread(()->{
waitAndNotify.run("b", 2, 3);
}).start();
new Thread(()->{
waitAndNotify.run("c", 3, 1);
}).start();
}
}
class WaitAndNotify {
public void run(String str, int flag, int nextFlag) {
for(int i = 0; i < loopNumber; i++) {
synchronized(this) {
while (flag != this.flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
// 设置下一个运行的线程标记
this.flag = nextFlag;
// 唤醒所有线程
this.notifyAll();
}
}
}
private int flag;
private int loopNumber;
public WaitAndNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
}
park和unpark实现
public class Code_33_Test {
public static Thread t1, t2, t3;
public static void main(String[] args) {
ParkAndUnPark obj = new ParkAndUnPark(5);
t1 = new Thread(() -> {
obj.run("a", t2);
});
t2 = new Thread(() -> {
obj.run("b", t3);
});
t3 = new Thread(() -> {
obj.run("c", t1);
});
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
class ParkAndUnPark {
public void run(String str, Thread nextThread) {
for(int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread);
}
}
private int loopNumber;
public ParkAndUnPark(int loopNumber) {
this.loopNumber = loopNumber;
}
}
await/signal实现
public class Code_34_Test {
public static void main(String[] args) {
AwaitAndSignal lock = new AwaitAndSignal(5);
Condition a = lock.newCondition();
Condition b = lock.newCondition();
Condition c = lock.newCondition();
new Thread(() -> {
lock.run("a", a, b);
}).start();
new Thread(() -> {
lock.run("b", b, c);
}).start();
new Thread(() -> {
lock.run("c", c, a);
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
a.signal();
}finally {
lock.unlock();
}
}
}
class AwaitAndSignal extends ReentrantLock {
public void run(String str, Condition current, Condition nextCondition) {
for(int i = 0; i < loopNumber; i++) {
lock();
try {
current.await();
System.out.print(str);
nextCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
private int loopNumber;
public AwaitAndSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
共享模型之内存
java内存模型(JMM)
JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面
- 原子性 - 保证指令不会受到线程上下文切换的影响
- 可见性 - 保证指令不会受 cpu 缓存的影响
- 有序性 - 保证指令不会受 cpu 指令并行优化的影响
volatile原理
volatile的底层实现原理是内存屏障,Memory Barrier
- 对volatile变量的写指令后会加入写屏障
- 对volatile变量的读指令前会加入读屏障
保证可见性
- 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
// 写屏障
}
- 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
public void actor1(I_Result r) {
// 读屏障
// ready是被 volatile 修饰的,读取值带读屏障
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
分析如图:
保证有序性
- 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
// 写屏障
}
- 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
public void actor1(I_Result r) {
// 读屏障
// ready 是被 volatile 修饰的,读取值带读屏障
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
注意
volatile只能保证有序性和可见性,不能解决指令交错
写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去。 而有序性的保证也只是保证了本线程内相关代码不被重排序
synchronized可以保证原子性、有序性、可见性是对其可以完全管理的资源来说的,如使用double-checked时的INSTANCE
未被完全管理,则不能完全实现有序性、可见性
// 最开始的单例模式是这样的
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
// 首次访问会同步,而之后的使用不用进入synchronized
synchronized(Singleton.class) {
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
return INSTANCE;
}
}
// 但是上面的代码块的效率是有问题的,因为即使已经产生了单实例之后,之后调用了getInstance()方法之后还是会加锁,这会严重影响性能!因此就有了模式如下double-checked lockin:
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) { // t2
// 首次访问会同步,而之后的使用没有 synchronized
synchronized(Singleton.class) {
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
//但是上面的if(INSTANCE == null)判断代码没有在同步代码块synchronized中,不能享有synchronized保证的原子性,可见性。所以
以上的实现特点是:
- 懒惰实例化
- 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
- 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外
但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:
0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
3: ifnonnull 37
// ldc是获得类对象
6: ldc #3 // class cn/itcast/n5/Singleton
// 复制操作数栈栈顶的值放入栈顶, 将类对象的引用地址复制了一份
8: dup
// 操作数栈栈顶的值弹出,即将对象的引用地址存到局部变量表中
// 将类对象的引用地址存储了一份,是为了将来解锁用
9: astore_0
10: monitorenter
11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
14: ifnonnull 27
// 新建一个实例
17: new #3 // class cn/itcast/n5/Singleton
// 复制了一个实例的引用
20: dup
// 通过这个复制的引用调用它的构造方法
21: invokespecial #4 // Method "<init>":()V
// 最开始的这个引用用来进行赋值操作
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
27: aload_0
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
40: areturn
其中:
- 17 表示创建对象,将对象引用入栈 // new Singleton
- 20 表示复制一份对象引用 // 复制了引用地址
- 21 表示利用一个对象引用,调用构造方法 // 根据复制的引用地址调用构造方法
- 24 表示利用一个对象引用,赋值给 static INSTANCE
可能出现的问题
线程t1 还未对对象初始化完成,但时赋值操作已经完成,此时线程t2就会使用未初始化完全的对象。
简单来说,就是
new
对象这个操作不是原子操作。会分成创建引用且复制一份引用,其中一份引用赋值给变量,另外一份执行<init>
函数。所以,如果在初始化没有完成前,就赋值了则会出现并发问题。
double-checked locking 解决
加volatile就行了。
public final class Singleton {
private Singleton() { }
private static volatile Singleton INSTANCE = null;
public static Singleton getInstance() {
// 实例没创建,才会进入内部的 synchronized代码块
if (INSTANCE == null) {
synchronized (Singleton.class) { // t2
// 也许有其它线程已经创建实例,所以再判断一次
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
共享模型之无锁
管程即 monitor 是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制
CAS方式
class AccountSafe implements Account{
AtomicInteger atomicInteger ;
public AccountSafe(Integer balance){
this.atomicInteger = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return atomicInteger.get();
}
@Override
public void withdraw(Integer amount) {
// 核心代码
while (true){
int pre = getBalance();
int next = pre - amount;
if (atomicInteger.compareAndSet(pre,next)){
break;
}
}
}
}
原子整数
java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
以 AtomicInteger 为例
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}
原子引用
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起。只关心是否被更改过
原子数组
改变的只是对象本身里面的值,而不需要创建一个新的数组对象
使用原子的方式更新数组里的某个元素
- AtomicIntegerArray:整形数组原子类
- AtomicLongArray:长整形数组原子类
- AtomicReferenceArray :引用类型数组原子类
public class Code_10_AtomicArrayTest {
public static void main(String[] args) throws InterruptedException {
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,回传array,index
参数4,打印数组的方法
函数式编程:
//supplier 提供者 无中生有 ()->结果
//function 函数 一个参数一个结果 (参数)->结果, BiFunction(参数1,参数2)-> 结果
//consumer 消费者 一个参数没有结果 ()->void, BiComsumer(参数1,参数2)->
* 结果如下:
* [9934, 9938, 9940, 9931, 9935, 9933, 9944, 9942, 9939, 9940]
* [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
*/
demo(
() -> new int[10],
(array) -> array.length,
(array, index) -> array[index]++,
(array) -> System.out.println(Arrays.toString(array))
);
TimeUnit.SECONDS.sleep(1);
demo(
() -> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
(array) -> System.out.println(array)
);
}
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer) {
ArrayList<Thread> ts = new ArrayList<>(); // 创建集合
T array = arraySupplier.get(); // 获取数组
int length = lengthFun.apply(array); // 获取数组的长度
for(int i = 0; i < length; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % length);
}
}));
}
ts.forEach(Thread::start);
ts.forEach((thread) -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
字段更新器
原子累加器
AtomicLong Vs LongAdder
public static void main(String[] args) {
for(int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(0), (ref) -> ref.getAndIncrement());
}
for(int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), (ref) -> ref.increment());
}
}
private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {
ArrayList<Thread> list = new ArrayList<>();
T adder = supplier.get();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
consumer.accept(adder);
}
}));
}
long start = System.nanoTime();
list.forEach(t -> t.start());
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}
执行代码后,发现使用 LongAdder 比 AtomicLong 快2,3倍,使用 LongAdder 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
LongAdder 原理
和ConcurrentHashMap中的countcell数组使用了相同的思想
LongAdder 类有几个关键域
public class LongAdder extends Striped64 implements Serializable {}
下面的变量属于 Striped64 被 LongAdder 继承。
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
使用cas实现一个自旋锁
public void lock() {
while (true) {
if(state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
原理之伪内存
// 防止缓存行伪共享
@sun.misc.Contended //防止一个缓存行容纳多个cell对象
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
每个cpu拥有着多级缓存(一般为3级缓存),缓存以缓存行为单位i,每个缓存行对应着一块内存,一般是64字节
cpu要保证数据的一致性,如果某个cpu核心更改了数据,其它cpu核心对应的整个缓存行必须失效
Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport 的 park 方法,cas 相关的方法底层都是通过Unsafe类来实现的。
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
// Unsafe 使用了单例模式,unsafe 对象是类中的一个私有的变量
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe)theUnsafe.get(null);
}
final原理
和volatile一样,使用了读写屏障
线程池
具体请见线程池简易实现和线程池源码
自定义线程池
package Thread;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 自定义线程池
*/
public class demo06 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,
(queue, task) -> {
// 1. 阻塞等待。
// queue.put(task);
// 2. 带超时的等待
// queue.offer(task, 500, TimeUnit.MILLISECONDS);
// 3. 调用者放弃
// log.info("放弃 {}", task);
// 4. 调用者抛出异常
// throw new RuntimeException("任务执行失败" + task);
// 5. 调用者自己执行任务
task.run();
});
for(int i = 0; i < 4; i++) {
int j = i;
threadPool.executor(() ->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
// 实现线程池
class ThreadPool {
// 线程集合
private Set<Worker> works = new HashSet<Worker>();
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程池的核心数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit unit;
// 使用策略模式。
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCapacity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.unit = unit;
taskQueue = new BlockingQueue<Runnable>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void executor(Runnable task) {
// 如果线程池满了. 就将任务加入到任务队列, 否则执行任务
synchronized (works) {
if(works.size() < coreSize) {
Worker worker = new Worker(task);
// log.info("新增 worker {} ,任务 {}", worker, task);
works.add(worker);
worker.start();
} else {
// taskQueue.put(task);
// 1)死等
// 2)带超时等待
// 3)让调用者放弃任务执行
// 4)让调用者抛出异常
// 5)让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
// 执行任务
// 1)当 task 不为空,执行任务
// 2)当 task 执行完毕,再接着从任务队列获取任务并执行
@Override
public void run() {
// while (task != null || (task = taskQueue.take()) != null) {
while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
try {
// log.info("正在执行 {}", task);
task.run();
}catch (Exception e) {
} finally {
task = null;
}
}
synchronized (works) {
// log.info("worker 被移除 {}", this);
works.remove(this);
}
}
}
}
// 实现阻塞队列
class BlockingQueue<T> {
// 阻塞队列的容量
private int capacity;
// 双端链表, 从头取, 从尾加
private Deque<T> queue;
// 定义锁
private ReentrantLock lock;
// 当阻塞队列满了时候, 去 fullWaitSet 休息, 生产者条件变量
private Condition fullWaitSet;
// 当阻塞队列空了时候,去 emptyWaitSet 休息, 消费者小件变量
private Condition emptyWaitSet;
public BlockingQueue(int capacity) {
queue = new ArrayDeque<>(capacity);
lock = new ReentrantLock();
fullWaitSet = lock.newCondition();
emptyWaitSet = lock.newCondition();
this.capacity = capacity;
}
// 带有超时时间的获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 同一时间单位
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if(nanos <= 0) {
return null;
}
// 防止虚假唤醒, 返回的是所剩时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
// log.info("等待加入任务队列 {}", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
// 带有超时时间的添加
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if(nanos <= 0) {
return false;
}
// log.info("等待加入任务队列 {}", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断判断是否满
if(queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else { // 有空闲
// log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
public int getSize() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
AQS
具体请见AQS源码浅析
全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。
特点:
-
用state属性来表示资源的状态(分为独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和设防所
getState-获取state状态
setState=设置state状态
compareAndSetState - 乐观锁设置state状态
独占模式是只有以一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于FIFO的等待队列,类似于Monitor的EntryList
条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet
ReentrantLock
非公平锁实现原理
从构造器开始看,默认为非公平锁实现
public ReentrantLock() {
sync = new NonfairSync();
}
lock()
总结:
掌握知识点:
Synchronized原理 LockSupport原理 ReenTrantLock原理
- 分析多线程访问共享资源时,哪些代码片段属于临界区
- 使用 synchronized 互斥解决临界区的线程安全问题
- 掌握 synchronized 锁对象语法
- 掌握 synchronzied 加载成员方法和静态方法语法
- 掌握 wait/notify 同步方法
- 使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
- 学会分析变量的线程安全性、掌握常见线程安全类的使用
- 了解线程活跃性问题:死锁、活锁、饥饿
- 应用方面
- 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
- 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
- 原理方面
- monitor、synchronized 、wait/notify 原理
- synchronized 进阶原理
- park & unpark 原理
- 模式方面
- 同步模式之保护性暂停
- 异步模式之生产者消费者
- 同步模式之顺序控制
JMM CAS原理 Volatile原理
- 可见性 - 由 JVM 缓存优化引起
- 有序性 - 由 JVM 指令重排序优化引起
- happens-before 规则
- 原理方面
- volatile
- 模式方面
- 两阶段终止模式的 volatile 改进
- 同步模式之 balking
- CAS 与 volatile
- juc 包下 API
- 原子整数
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
- Unsafe
- 原理方面
- LongAdder 源码
- 伪共享