多线程篇四(手写一个Lock实现类以及AQS详解)

前言
结合我之前写的文章,我们已经学会了如何使用jdk提供的Lock,也知道了一些原理,那么我们自己能不能写一个Lock实现呢?答案是肯定的!
我们分析jdk自己提供的Lock实现类不难发现,他们底层都是基于AbstractQueuedSynchronizer同步器实现的

正文
一、AQS基本使用方法
同步器的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态。AQS的设计是基于模板方法设计模式,使用者需要继承AQS并重写指定的方法,随后将同步器组合在自定义的不同组件的实现中,并且调用同步器提供的模板方法,而这些模板方法将调用使用者重写的方法
通过分析AQS类,我们发现我们能重写的方法也不多


AQS.png

1、重点关注下面几个可重写的方法
(1)、tryAcquire(int arg):获取独占锁
(2)、tryRelease(int arg):释放独占锁
(3)、tryAcquireShared(int arg):获取共享锁
(4)、tryReleaseShared(int arg):释放共享锁
(5)、isHeldExclusively():快速判断当前锁释放被线程独占
2、对同步状态进行更改,这时就需要使用同步器提供的3个final方法来进行操作
(1)、getState()获取同步状态
(2)、setState(int newState)设置同步状态
(3)、compareAndSetState(int expect,int update)原子的设置同步状态
3、自定义Lock实现类

package com.yubin.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定义Lock接口实现类
 */
public class MyLock implements Lock {

    private Sync sync;

    public MyLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        // 尝试获取独占锁
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 尝试是否独占锁
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 快速判断当前锁释放已被独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }


    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试类

package com.yubin.concurrent;

import com.yubin.thread.basic.threadstate.SleepUtils;

import java.util.concurrent.locks.Lock;

/**
 * 测试自定义的Lock实现类
 *
 * @Author YUBIN
 * @create 2018-06-24
 */
public class TestMyLock {

    private final Lock lock = new MyLock();

    private volatile int count = 100;

    private static class WorkThread extends Thread {
        private TestMyLock myLock;

        public WorkThread(TestMyLock myLock) {
            this.myLock = myLock;
        }

        @Override
        public void run() {
            myLock.test();
        }
    }

    public void test() {
        lock.lock();
        try {
            SleepUtils.second(500);
            System.out.println(Thread.currentThread().getName() + "获取到的count=" + count--);
            SleepUtils.second(500);
            if (count == 98) {
                //test();
            }
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TestMyLock myLock = new TestMyLock();
        // 启5个子线程
        for (int i = 0; i < 5; i++) {
            new WorkThread(myLock).start();
        }
        // 主线程没秒打印一行空格
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1000);
            System.out.println();
        }
    }

}

运行上面测试类的main方法,count值在以我们预期的方法一样依次的减一

Thread-1获取到的count=100

Thread-0获取到的count=99

Thread-4获取到的count=98

Thread-2获取到的count=97

Thread-3获取到的count=96

假如此时我把test()方法中的注释掉的方法放开就会发现所有的子线程都处于waiting状态

"Thread-4" #15 prio=5 os_prio=0 tid=0x00000000586df800 nid=0x31c waiting on condition [0x0000000059b5f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-3" #14 prio=5 os_prio=0 tid=0x00000000586de800 nid=0x1e14 waiting on condition [0x00000000590bf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-2" #13 prio=5 os_prio=0 tid=0x00000000586d8000 nid=0x304 waiting on condition [0x00000000599bf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-1" #12 prio=5 os_prio=0 tid=0x00000000586d7000 nid=0x1e0c waiting on condition [0x0000000058d2e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:38)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-0" #11 prio=5 os_prio=0 tid=0x00000000586d5000 nid=0x196c waiting on condition [0x00000000597cf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

那么为什么导致所有的子线程都处于waiting状态呢?看我们自己定义的Lock我们很容易会发现原来我们自己定义的Lock实现类不支持锁的可重入,由锁的可重入概念我们知道锁的可重入是以线程为单位和状态变量来实现可重入锁的
接下来我们对自定义的Lock实现类进行改造
4、自定义Lock实现类优化支持锁的可重入

package com.yubin.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定义Lock接口实现类
 * @Author YUBIN
 * @create 2018-06-24
 */
public class MyLock implements Lock {

    private Sync sync;

    public MyLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        // 尝试获取独占锁
        @Override
        protected boolean tryAcquire(int arg) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 获取锁的状态
            int c = getState();
            if (c == 0) {
                // 如果锁的状态是0则以CAS的形式将状态变量设置为1
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // 因为是独占锁,所以同一时刻只能有一个线程能获取到锁,如果当前的锁是被当前线程获取过了,则将状态变量+1
                int nextC = c + arg;
                // 设置新的状态变量
                setState(nextC);
                return true;
            }
            return false;
        }

        // 尝试是否独占锁
        @Override
        protected boolean tryRelease(int arg) {
            // 判断当前锁释放是当前线程锁独占的,如果判断不成立则抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - arg;
            boolean free = false;
            if (c == 0){
                // 如果状态为0了则说明当前线程可以释放对锁的持有了
                setExclusiveOwnerThread(null);
                free = true;
            }
            setState(c);
            return free;
        }

        // 快速判断当前锁释放已被独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }


    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在改造好的Lock实现类里面我只是对tryAcquire()方法和tryRelease()方法进行了改写。
在tryAcquire方法主要加了如果state不为0的时候,判断当前线程是否已经和锁绑定,已经绑定的话则将state+1同时返回true
在tryRelease方法中主要增加了释放锁的时候是对state变量逐次减一当减到0的时候才将锁与当前线程绑定的状态去除,释放锁。
再次运行测试类的main方法程序可以正常的运行了,是不是很简单,嘻嘻嘻。

二、AQS原理剖析
1、如果线程没有获取到锁,我们的线程是如何等待的呢?AQS又是怎么支持的呢?接下来我们还是回到lock()方法上。
在lock方法中调用了同步器的acquire()方法

public void lock() {
    sync.acquire(1);
}

同步器的acquire方法中的具体实现如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg):这个tryAcquire方法执行的是我们自己定义的尝试独占的获取锁方法,如果获取失败就会走acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
addWaiter(Node.EXCLUSIVE):将当前线程封装成Node对象挂在等待队列的尾部

private Node addWaiter(Node mode) {
    // 把当前线程打包成一个Node对象 node中thread为当前线程 mode为null表示当前节点为独占节点 独占锁
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 首先尝试快速的挂尾节点 (假设自己是唯一的,不存在冲突)
    Node pred = tail;
    if (pred != null) {
        // 把当前节点的前一个节点设置为当前尾节点
        node.prev = pred;
        // 以CAS的形式把当前节点设置为尾节点
        if (compareAndSetTail(pred, node)) {
            // 把之前的尾节点的下一个节点设置为当前节点
            pred.next = node;
            // 返回当前节点
            return node;
        }
    }
    // 如果尝试失败就会走全面的入队操作 自旋CAS的模式来挂尾节点
    enq(node);
    return node;
}

enq(node):全面的入队操作

private Node enq(final Node node) {
    // 自旋CAS的形式来挂尾节点
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果AQS同步器对象中的尾节点为空的话则初始化一个空的尾节点,初始化成功的话则将这个初始化尾节点设置为头节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将当前节点的前一个节点设置为尾节点
            node.prev = t;
            // 以CAS的形式将当前节点设置为尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // 返回尾节点
                return t;
            }
        }
    }
}

上面的代码执行完则说明当前线程已经挂在尾节点上了,下图为AQS中的同步队列基本结构图


同步队列的基本结构.png

接下来就会执行acquireQueued(final Node node, int arg)方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋CAS
        for (;;) {
            // 获取当前节点的前一个节点
            final Node p = node.predecessor();
            // 如果当前节点的前一个节点 == AQS对象的头结点则再次尝试获取一下锁
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功了的话则将当前节点设置为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在上面这段代码中就使用到了LockSupport,LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒的功能,而LockSupport也成为构建同步组件的基础工具。LockSupport定义一组以park开头的方法用来阻塞当前线程,以及unPark(Thread thread)方法来唤醒一个被阻塞的线程

2、线程获取到锁并且执行完自己的程序又是怎么释放锁的呢?是否锁的过程又是怎样的呢?AQS又是怎么支持的呢?接下来我们回到unLock()方法上。
在unLock()方法中调用了release()方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

该方法先调用我们自定义的tryRelease()方法,成功的话则会去调用unparkSuccessor(h)方法,在这个方法中会获取同步队列中的头结点的next节点中对应的线程,然后通过LockSupport类中的静态方法unpark(Thread thread)方法将其唤醒。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

总结
根据上面的流程我们总结了如下几幅图
1、尝试获取独占锁


01.png

2、获取独占锁失败将当前线程挂在同步队列的尾部及唤醒之后的操作


1.png

3、释放锁的过程
2.png

3、节点的相关操作图
3.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容