java多线程(2)-线程同步与线程安全

前言

前面我们在java多线程相关(1)-synchronized,wait,notify,volatile中简单介绍了最简单的多线程同步的方法;就我个人的理解,多线程同步的目的有两个 1.保证多线程的执行顺序 2.维护多线程可见变量(如volalite变量,或者更高一级的synchronized修饰变量)的安全;我将在本章中尝试写出我个人的理解

线程同步常见示例

CountDownLatch示例

CountDownLatch等待多个线程执行完成再继续执行,被多个线程阻塞,类似于多个线程join


mport java.util.concurrent.CountDownLatch;

public class TestMultiThread1 {

private static final CountDownLatch countDwonLatch = new CountDownLatch(2);

private static class TestCountDownLatchThread extends Thread{

@Override

public void run() {

// TODO Auto-generated method stub

System.out.println("TestCountDownLatchThread 子线程开始执行");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("TestCountDownLatchThread 子线程执行完成");

countDwonLatch.countDown();

}

}

public static void main(String[] args) {

// TODO Auto-generated method stub

System.out.println("主线程开始执行");

new TestCountDownLatchThread().start();

new TestCountDownLatchThread().start();

try {

countDwonLatch.await();

System.out.println("主线程等待子线程执行完成");

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("主线程继续执行");

}

}

结果为:


主线程开始执行

TestCountDownLatchThread 子线程开始执行

TestCountDownLatchThread 子线程开始执行

TestCountDownLatchThread 子线程执行完成

TestCountDownLatchThread 子线程执行完成

主线程等待子线程执行完成

主线程继续执行

改为countDwonLatch.await(1000, TimeUnit.MILLISECONDS);的后结果为:


主线程开始执行

TestCountDownLatchThread 子线程开始执行

TestCountDownLatchThread 子线程开始执行

主线程等待子线程执行完成

主线程继续执行

TestCountDownLatchThread 子线程执行完成

TestCountDownLatchThread 子线程执行完成

CyclicBarrier示例

CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行


private static class TestCyclicBarrierThread extends Thread{

private int worktime;

TestCyclicBarrierThread(int worktime) {

this.worktime = worktime;

}

@Override

public void run() {

// TODO Auto-generated method stub

System.out.println("TestCyclicBarrierThread 子线程开始执行");

try {

Thread.sleep(worktime*1000);

System.out.println("TestCyclicBarrierThread 子线程执行"+worktime+"s");

try {

cyclicBarrier.await();

} catch (BrokenBarrierException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("TestCountDownLatchThread 子线程继续执行");

}

}

public static void main(String[] args) {

for(int i = 0;i<4;i++) {

new TestCyclicBarrierThread(i+1).start();

}

}


TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程执行1s

TestCyclicBarrierThread 子线程执行2s

TestCyclicBarrierThread 子线程执行3s

TestCyclicBarrierThread 子线程执行4s

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

将CyclicBarrier对象改为


TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程开始执行

TestCyclicBarrierThread 子线程执行1s

TestCyclicBarrierThread 子线程执行2s

TestCyclicBarrierThread 子线程执行3s

TestCyclicBarrierThread 子线程执行4s

TestCountDownLatchThread 子线程做额外操作 //选一个子线程做额外操作

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

TestCountDownLatchThread 子线程继续执行

Semaphore示例

Semaphore感觉其实和锁有点类似,它一般用于控制对某组资源的访问权限(不是一个,而是一组同类型资源);用一组资源来维护线程的同步


private static final Semaphore semaphore = new Semaphore(5); //机器数

private static class Worker extends Thread{

private int num;

private Semaphore semaphore;

public Worker(int num,Semaphore semaphore){

this.num = num;

this.semaphore = semaphore;

}

@Override

public void run() {

try {

semaphore.acquire(); //获得信号量

System.out.println("工人"+this.num+"占用一个机器在生产...");

Thread.sleep(2000);

System.out.println("工人"+this.num+"释放出机器");

semaphore.release(); //释放信号量

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

for(int i=0;i<8;i++)

new Worker(i,semaphore).start();

从上面几个示例我们可以看出,java为我们提供了很多种线程同步方式的类,这些都在java.util.concurrent包下面,其示意图可以如图表示

concurrent示意图.png

我们可以看到其中的AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的

下面我们就来看看AQS

AQS(线程同步器)

AQS的定义

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类;大致结构如图

AQS组织结构.png

一个volatile资源,以及一个线程队列,维护征用该资源的多个线程;

该资源可以表现任何状态。比如, CountDownLatch表示剩余持有该资源的线程数,Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)等等;这些都是根据AQS自定义出来的

AbstractQueuedSynchronizer类

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

一般来说,独占式和共享式实现一种即可,当然也可以都实现(如ReentrantReadWriteLock,表示读写锁)

AbstractQueuedSynchronizer代码分析

以独占模式为例


1209 /**

1210 * Acquires in exclusive mode, ignoring interrupts. Implemented

1211 * by invoking at least once {@link #tryAcquire},

1212 * returning on success. Otherwise the thread is queued, possibly

1213 * repeatedly blocking and unblocking, invoking {@link

1214 * #tryAcquire} until success. This method can be used

1215 * to implement method {@link Lock#lock}.

1216 *

1217 * @param arg the acquire argument. This value is conveyed to

1218 * {@link #tryAcquire} but is otherwise uninterpreted and

1219 * can represent anything you like.

1220 */

1221 public final void acquire(int arg) {

1222 if (!tryAcquire(arg) &&

1223 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

1224 selfInterrupt();

1225 }

函数流程如下:

1.tryAcquire()尝试直接去获取资源,如果成功则直接返回;

2.addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

3.acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上

tryAcquire

由各个同步器自己实现,如ReentrantLock中的tryAcquire


213 protected final boolean tryAcquire(int acquires) {

214 return nonfairTryAcquire(acquires);

215 }


126 /**

127 * Performs non-fair tryLock. tryAcquire is implemented in

128 * subclasses, but both need nonfair try for trylock method.

129 */

130 final boolean nonfairTryAcquire(int acquires) {

131 final Thread current = Thread.currentThread();

132 int c = getState();

133 if (c == 0) {

134 if (compareAndSetState(0, acquires)) { //CAS 方式,自旋操作

135 setExclusiveOwnerThread(current); //当前state可用,当前线程得到锁了

136 return true;

137 }

138 }

139 else if (current == getExclusiveOwnerThread()) {

140 int nextc = c + acquires;

141 if (nextc < 0) // overflow

142 throw new Error("Maximum lock count exceeded");

143 setState(nextc); //锁的值+acquires(可重入)

144 return true;

145 }

146 return false;

147 }

addWaiter

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点


629 /**

630 * Creates and enqueues node for current thread and given mode.

631 *

632 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared

633 * @return the new node

634 */

635 private Node addWaiter(Node mode) {

636 Node node = new Node(mode);

637

638 for (;;) {

639 Node oldTail = tail;

640 if (oldTail != null) {

641 U.putObject(node, Node.PREV, oldTail);

642 if (compareAndSetTail(oldTail, node)) {

643 oldTail.next = node;

644 return node; //node放入队尾并返回

645 }

646 } else {

647 initializeSyncQueue(); //这个线程和mode所创建的node是第一个节点

648 }

649 }

650 }

acquireQueued

通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了


881 /**

882 * Acquires in exclusive uninterruptible mode for thread already in

883 * queue. Used by condition wait methods as well as acquire.

884 *

885 * @param node the node

886 * @param arg the acquire argument

887 * @return {@code true} if interrupted while waiting

888 */

889 final boolean acquireQueued(final Node node, int arg) {

890 try {

891 boolean interrupted = false;

892 for (;;) {

893 final Node p = node.predecessor(); //node的前一个节点

894 if (p == head && tryAcquire(arg)) { //前一个为头节点就可以尝试拿state资源了

895 setHead(node); //拿到了,释放头节点

896 p.next = null; // help GC

897 return interrupted;

898 }

//如果自己可以休息了,就进入waiting状态,直到被unpark()

899 if (shouldParkAfterFailedAcquire(p, node) &&

900 parkAndCheckInterrupt())

901 interrupted = true;

902 }

903 } catch (Throwable t) {

904 cancelAcquire(node);

905 throw t;

906 }

907 }

shouldParkAfterFailedAcquire


818 /**

819 * Checks and updates status for a node that failed to acquire.

820 * Returns true if thread should block. This is the main signal

821 * control in all acquire loops. Requires that pred == node.prev.

822 *

823 * @param pred node's predecessor holding status

824 * @param node the node

825 * @return {@code true} if thread should block

826 */

827 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

828 int ws = pred.waitStatus; //拿到前驱的状态

829 if (ws == Node.SIGNAL)

830 /*

831 * This node has already set status asking a release

832 * to signal it, so it can safely park.

833 */

834 return true; //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了

835 if (ws > 0) {

836 /*

837 * Predecessor was cancelled. Skip over predecessors and

838 * indicate retry.

839 */

840 do {

841 node.prev = pred = pred.prev; //如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边

842 } while (pred.waitStatus > 0);

843 pred.next = node;

844 } else {

845 /*

846 * waitStatus must be 0 or PROPAGATE. Indicate that we

847 * need a signal, but don't park yet. Caller will need to

848 * retry to make sure it cannot acquire before parking.

849 */

//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!

850 pred.compareAndSetWaitStatus(ws, Node.SIGNAL);

851 }

852 return false;

853 }

parkAndCheckInterrupt

等线程找到安全的等待位置时,就可以安全进入等待状态(区分与阻塞的关系)


862 /**

863 * Convenience method to park and then check if interrupted.

864 *

865 * @return {@code true} if interrupted

866 */

867 private final boolean parkAndCheckInterrupt() {

868 LockSupport.park(this);

869 return Thread.interrupted();

870 }

线程状态转换

阻塞和等待的区别

阻塞:当一个线程试图获取对象锁(非java.util.concurrent库中的锁,即synchronized),而该锁被其他线程持有,则该线程进入阻塞状态。它的特点是使用简单,由JVM调度器来决定唤醒自己,而不需要由另一个线程来显式唤醒自己,不响应中断。

等待:当一个线程等待另一个线程通知调度器一个条件时,该线程进入等待状态。它的特点是需要等待另一个线程显式地唤醒自己,实现灵活,语义更丰富,可响应中断。例如调用:Object.wait()、Thread.join()以及等待Lock或Condition。

线程状态.png

参考Thread详解

AQS aquire大致流程如下:

AQS_acquire.png

release

唤醒队列的下一个节点代表的线程


1274 /**

1275 * Releases in exclusive mode. Implemented by unblocking one or

1276 * more threads if {@link #tryRelease} returns true.

1277 * This method can be used to implement method {@link Lock#unlock}.

1278 *

1279 * @param arg the release argument. This value is conveyed to

1280 * {@link #tryRelease} but is otherwise uninterpreted and

1281 * can represent anything you like.

1282 * @return the value returned from {@link #tryRelease}

1283 */

1284 public final boolean release(int arg) {

1285 if (tryRelease(arg)) {

1286 Node h = head;

1287 if (h != null && h.waitStatus != 0)

1288 unparkSuccessor(h);

1289 return true;

1290 }

1291 return false;

1292 }

以上是独占的逻辑,共享模式也是类似的,详细可以参考Java并发之AQS详解,这篇文章写的比较清楚

线程安全

我们常见的保护线程安全的几种方式为

1,使用线程安全的类(自定义线程同步器,其实本质还是对各种锁的使用),2,使用锁,3,避免使用和设置成员变量类,保持无状态等;4,使用关键字保证线程安全(例:volatile)等

以 AtomicInteger为例


public class AtomicInteger extends Number implements java.io.Serializable {

private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates

private static final Unsafe unsafe = Unsafe.getUnsafe();

private volatile int value;// 初始int大小

// 省略了部分代码...

// 带参数构造函数,可设置初始int大小

public AtomicInteger(int initialValue) {

value = initialValue;

}

// 不带参数构造函数,初始int大小为0

public AtomicInteger() {

}

// 获取当前值

public final int get() {

return value;

}

// 设置值为 newValue

public final void set(int newValue) {

value = newValue;

}

//返回旧值,并设置新值为 newValue

public final int getAndSet(int newValue) {

/**

* 这里使用for循环不断通过CAS操作来设置新值

* CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系

* */

for (;;) {

int current = get();

if (compareAndSet(current, newValue)) //通过自旋,不断比较当前值和理想值,直到可以设置为止

return current;

}

}

// 原子的设置新值为update, expect为期望的当前的值

public final boolean compareAndSet(int expect, int update) {

return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

// 获取当前值current,并设置新值为current+1

public final int getAndIncrement() {

for (;;) {

int current = get();

int next = current + 1;

if (compareAndSet(current, next))

return current;

}

}

// 此处省略部分代码,余下的代码大致实现原理都是类似的

}

本质上这是cas(Compare and Swap),是一种乐观锁

CAS 悲观锁与乐观锁

Java在JDK1.5之前都是靠 synchronized关键字保证同步的,这种通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有共享变量的锁,都采用独占的方式来访问这些变量。这就是一种独占锁,独占锁其实就是一种悲观锁,所以可以说 synchronized 是悲观锁。

悲观锁机制存在以下问题:

1. 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。

2. 一个线程持有锁会导致其它所有需要此锁的线程挂起。

3. 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

对比于悲观锁的这些问题,另一个更加有效的锁就是乐观锁。其实乐观锁就是:每次不加锁而是假设没有并发冲突而去完成某项操作,如果因为并发冲突失败就重试,直到成功为止。

CAS是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试

乐观锁的缺点

1 ABA 问题

如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA”问题。(比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的)

JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力(加一个时间戳),其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

2 循环时间长开销大

自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

3 只能保证一个共享变量的原子操作

CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作

总结

综上,我们大致了解了java多线程同步,线程安全的一些注意事项;了解aqs(同步器)和cas(乐观锁技术-原子类);这些都被用来定义concurrent包中的基础类;而concurrent包高层类正是基础类的进一步复用

扩展学习-concurrent高层类,如# ConcurrentHashMap

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容