- 多线程出现目的
- 如何使用多线程
- 线程状态(6种)
- 线程开启/停止
- 线程安全
- Volilate
- Sychronized
- 机制
- 如何实现锁
- 为什么任何一个对象都可以成为锁
- 锁的优化
- 机制
- Lock与Synchronized区别
- CAS
- AQS(AbstractQueuedSychronizer)
- ReentrantLock
- Lock()加锁分析
- unlock()释放锁分析
- CountDownLatch
- 是什么
- 如何使用
- 分析
- ReentrantLock
多线程出现目的
场景:
- 当一个进程处理过程中,遇到网络与IO操作都会进入阻塞状态,不再处理任何东西,浪费系统资源。
- 一个函数的处理非常耗时,其实其中多个逻辑可以并行处理。
多线程的面世就是要解决以上问题。
如何使用多线程
- extends Thread
public class ThreadDemo extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + "ThreadDemo Running");
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new ThreadDemo().start();
}
}
}
- implements Runnable
public class RunnableDemo implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + "RunnableDemo Running");
}
public static void main(String[] args) {
RunnableDemo runnableDemo = new RunnableDemo();
for (int i = 0; i < 10; i++) {
new Thread(runnableDemo).start();
}
}
}
- ExecutorService
- Executors.newFixedThreadPool
- Executors.newCachedThreadPool
- Executors.newSingleThreadPool
- Executors.newScheduledThreadPool
public class ExecutorServiceDemo {
private static ThreadPoolExecutor threadPool;
private static ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger integer = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
int threadName = integer.getAndIncrement();
System.out.println("Created Thread:" + threadName);
return new Thread(r, "ThreadPool Thread:" + threadName);
}
};
private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
threadPool = new ThreadPoolExecutor(10, 15, 1000L,
TimeUnit.SECONDS,
workQueue,
factory);
//execute()与submit()的区别在于submit有一个Future类型的返回,
// 实际submit是把Callable入参包装成RunnableFuture类型后再调用execute();
for (int i = 0; i < 15; i++) {
System.out.println("threadPool.execute");
threadPool.execute(new RunnableDemo());
}
for (int i = 0; i < 15; i++) {
System.out.println("threadPool.submit");
Future<?> future = threadPool.submit(new CallableDemo());
System.out.println(future.get());
}
}
}
- implements Callable<>
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + ":" +"CallableDemo Running");
return "Callable Result";
}
public static void main(String[] args) throws Exception {
CallableDemo callableDemo = new CallableDemo();
String callableReturn = callableDemo.call();
System.out.println("callableReturn :" + callableReturn);
}
}
Callable与Runable区别:
Re:
- Callable任务线程能返回执行结果,而Runnable任务线程不能返回结果
- Callable能向上抛出异常,而Runnable接口异常只能内部消化
为什么提供extends Thread又提供implements Runnable
Re:因为JAVA不支持多继承
线程状态(6种)
- NEW 调用Start方法前
- RUNNABLE 运行
- BLOCKED 阻塞
- 等待阻塞 wait
- 同步阻塞 synchronized
- 其它阻塞 sleep/join
- WAITING 等待
- TIMED_WAITING 超时等待
- TERMINATED 终止
状态变更图示:
线程开启/停止
开始:start()
停止:interrupt()
通过设置标志位的方式终止线程,使其能有机会去清理资源,而非暴力的方式直接kill掉,这种方式更新安全。
public class demo4 {
private static int num;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
num++;
System.out.println("Num:" + num);
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
线程安全
- 可见性
- 原子性
- 有序性
Volilate
public class VolatileDemo {
private volatile static boolean stop = false;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
int i = 0;
while (!stop) {
i++;
System.out.println("i: " + i);
}
});
try {
long startTime = System.currentTimeMillis();
thread.start();
System.out.println("Thread Start");
TimeUnit.SECONDS.sleep(1);
stop = true;
long endTime = System.currentTimeMillis();
System.out.println("Runtime: " + (endTime - startTime) / 1000 + " Second");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
因为每个线程有自己私有的内存空间,修改变量需要同步回主存才能对其它线程可见,而volatile就会有哪下作用
- 修改volatile修饰的变量时,会强制将修改的值回写到主存。
- 读取volatile修饰的变量时,会强制到主存获取数据,不再到缓存读取
- volatile会使被volatile修饰的语句禁止指令重排序
指令重排序实例:
int a = 1;
int b = 2;
int c = 3;
//以上例子,可能是int c = 3优先于int a = 1和int b = 2执行
int a = 1;
volatile int b = 2;
int c = 3;
//以上例子则volatile int b = 2一定是在int a = 1和int c = 3之间执行。
问题:为什么需要编译器指令重排?
Re:优化执行效率。
问题:什么是CPU乱序执行?
问题:为什么要禁止编译器指令重排呢?
Re:因为多线程下指令重排可能会导致处理出错,例如:
Thread-1:
int b = 10;
int c = b;
boolean flag = true;
Thread-2:
while(flag){
System.out.println(b);
}
如果编译器把Thread-1的第3条指令重排到第一行,那Thread-2就有可能出错,因为B还没有初始化。
问题:volilate为什么不能保证原子性
Re:因为volilate对变量的操作在字节码层面是由多条指令组成,非原子性操作,所以它只保证了可见性,不保证原子性。
Volilate因为只保证了Read and Load即从主存加载到工作内存时加载的值是最新的,例如:
线程1和线程2在执行Read and Load的时候,发现主存里的值都是5,双方都加载了这个最新值,然后双方都对该值加1,再把值放回主存,事实主存值结果为6,此操作有线程安全问题。
小结
声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,把这个变量所在的缓存行的数据写回到系统内存,再根据我们前面提到过的MESI的缓存一致性协议,来保证多CPU下的各个高速缓存中的数据的一致性。
Sychronized
机制
- 使用方法
- 修饰实例方法
- 修饰静态方法
- 修饰代码块
- 使用Sychronized后,会通过字节码生成以下指令:
-
修饰方法时:ACC_SYNCHRONIZED
-
修饰代码块:monitorenter monitorexit
- 获取锁情况
- 修饰实例方法
进行同步代码前,需要获取当前实例的锁 - 修饰静态方法
进行同步代码前,需要获取当前类对象的锁 - 修饰代码块
进行同步代码前,需要获取给定对象的锁
如何实现锁
本质:对象监视器的获取(独占锁)
为什么任何一个对象都可以成为锁
因为对象在内存中分为三块区域:对象头、实例数据、对齐填充
对象头:
而Synchroned使用的锁存在每一个对象的对象头里,其中锁标志位指向的是monitor对象(也称为管程或监视器锁)的起始地址。
锁的优化
锁的状态:
- 无向锁
- 偏向锁
大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得。使用传统的重量级锁会有频繁锁操作,为了让线程获得锁的代价更低,引入了偏向锁, - 轻量锁
当多线程竟然偏向锁时会升级为轻量级锁 - 重量锁
基于monitor的锁实现。
注意:锁只能从轻到重的方向发展,不可逆。
Lock与Synchronized区别
- Lock是一个接口
- synchronized是JVM层的一个实现
- synchronized是被动的触发锁机制,而Lock是可以灵活的控制,锁的创建和释放都需要人为控制,特别是异常发生的时候要注意释放锁。
- Lock相对来讲控制粒度更小,例如还可以分别控制读写锁
- Lock支持公平、非公平锁,而synchronized只支持非公平锁
CAS
CAS是JDK提供的Unsafe类里的一系列操作,这一系列操作由JDK来保证原子性。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
Atomic一系列的对象是根据CAS的封装来实例原子性。
AQS(AbstractQueuedSychronizer)
AQS的关键数据结构:
链表的操作通过CAS原子操作来保证多线程下的原子性:
- compareAndSetTail
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
- compareAndSetHead
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
通过compareAndSwapObject
这个native方法来保证链表操作的线程安全性
ReentrantLock
Lock()加锁分析
非公平锁逻辑流程图
公平锁与非公平锁的差异
- 公平锁获取锁的过程
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 非公平锁获取锁的过程
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
以上可以看出,非公平锁在Lock()
方法被调用时是首先尝试当前线程是否能直接获得锁,然后tryAcquire()
方法的时候公平锁是需要检查AQS队列里是否有等待的节点,有的话是当前线程获取锁不成功,而非公平锁是直接CAS当前锁的状态,若通过就把锁给当前线程了。同时也可以看出双方在获取不到锁的时候,进行AQS队列方式是一样的,都是加在队尾。在加入队列后,还需要根据当前节点的前驱节点的waitStatus
若是Node.SIGNAL
状态判断是否需要把当前线程挂起,以省系统资源,
unlock()释放锁分析
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可以看出,每个unlock()
操作都是一个State - 1
操作,直到State == 0
的时候,把ExclusiveOwnerThread
即当前获得锁的线程设置为null来释放锁。
小结
在获取锁的时候,会维护一个双向链表,用于存放获取锁失败的的线程到队列中进行自旋来获取锁,
CountDownLatch
是什么
CountDownLatch是JUC中提供的一个同步工具,使用调用await()
它可以使一个或者多个线程进行等待,直到其它线程执行CountDown()
方法把倒数器减至0后,等待的线程才会启动。
如何使用
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println(Thread.currentThread() + "执行完毕");
countDownLatch.countDown();
}, "Thread-1").start();
new Thread(() -> {
System.out.println(Thread.currentThread() + "执行完毕");
countDownLatch.countDown();
}, "Thread-2").start();
new Thread(() -> {
System.out.println(Thread.currentThread() + "执行完毕");
countDownLatch.countDown();
}, "Thread-3").start();
countDownLatch.await();
System.out.println("全部线程执行完毕");
}
}
分析
- await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 若线程中端,直接抛异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 计数为0时,表示获取锁成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 阻塞,并入队
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 入队
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,设置队列头为node节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
&& parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 检查计数器是否为0,为0直接返回
- 计数器大于0,即当前线程需要阻塞并等待计数器变为0
- 当前线程需要被封装成Node对象并添加到AQS双向链表里去
- 最后自旋尝试获取锁,即检查计数器是否为0,获取成功即出队,然后放行当前线程
- countDonw()
// 计数-1
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 首先尝试释放锁
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) //如果计数已经为0,则返回失败
return false;
int nextc = c-1;
// 原子操作实现计数-1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 唤醒被阻塞的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 没有线程被阻塞,直接跳出
break;
}
}
- 尝试释放锁,即将计数器-1,并判断state是否为0,若为0即表示当前没有锁,可以开始唤醒链表中阻塞中的线程
- 如果链表里为空,即没有阻塞的线程,直接退出
- 如果头节点waitStatus为SIGNAL,就依次唤醒下个节点的线程,并出队