4、Java并发编程入门与高并发面试-线程安全性

慕课网 Jimin老师 Java并发编程入门与高并发面试 学习笔记
Java并发编程入门与高并发面试

线程安全性定义:
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的

原子性:提供了互斥访问,同一时刻只能有一一个线程来对它进行操作
可见性: 一个线程对主内存的修改可以及时的被其他线程观察到
有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果- -般杂乱无序

原子性-Atomic包
◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
package com.huhao.concurrency.example.count;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 代码模拟并发测试
 * Atomic包演示
 */
@Slf4j
@ ThreadSafe
public class CountExample2 {

    //请求总数
    public static int clientTotal = 5000;

    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    add();
                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
        log.error("count:{}", count);//count:4952
    }

    private static void add() {
        count.incrementAndGet();
    }
}

原因详解:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
 * eg:2+1=3
 * @param var1 :当前count对象
 * @param var2:当前的值  2
 * @param var4 :加的值   1
 * 
 * var5是底层方法获取到的值。如果没有别的线程处理更改的时候,正常应该var5=2,
 * compareAndSwapInt 来判断,直到var2与var5相同时,才会var5+var4=2+1,返回3
 * @return the updated value
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
AtomicLong、LongAdder

由于getAndAddInt方法,一直循环比较,在线程数少的时候,比较的成功率比较高,在多的时候,失败的次数会增多,导致一直循环,效率较低
并发量大的时候,使用LongAdder效率会高一点。如果小的话,也可以用AtomicInteger

AtomicReference 、AtomicReferenceFieldUpdater
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
 * 代码模拟并发测试
 * AtomicReference包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicReference {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2);//count=2
        count.compareAndSet(0, 1);//count=2,不是0,则不执行
        count.compareAndSet(1, 3);//count=2,不是0,则不执行
        count.compareAndSet(2, 4);//count=4
        count.compareAndSet(3, 5);//count=4,不是0,则不执行
        log.info("count:{}", count.get());//count:4
    }
}
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 代码模拟并发测试
 * AtomicReferenceFieldUpdater包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicExampleAtomicReference {

    //AtomicExampleAtomicExampleAtomicReference的count字段,必须用volatile修饰
    private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
            updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");

    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1,{}", example5.getCount());   // update success 1,120
            if (updater.compareAndSet(example5, 100, 120)) {
                log.info("update success 2,{}", example5.getCount());
            } else {
                log.info("update failed,{}", example5.getCount());  // update failed,120
            }
        }
    }
}
AtomicStampReference : CAS的ABA问题

aba问题:cas操作时候,其他线程将变量值A,从A改到B,又改回A。本线程在比较的时候,发现A变量没有变,于是就讲A值进行了交换操作,其实该值以及被其他线程更新过,与最初设计初衷不符。
解决:在变量更新时候,将变量版本号加1

AtomicBoolean

只执行一次


import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 代码模拟并发测试
 * AtomicBoolean包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicBoolean {

    //请求总数
    public static int clientTotal = 5000;

    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicBoolean isHappened = new AtomicBoolean();


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    test();
                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
        log.error("count:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            //5000次线程,只会执行一次
            log.info("exec");
        }
    }
}

原子性-synchronized

◆修饰代码块:大括号括起来的代码,作用于调用的对象
◆修饰方法:整个方法,作用于调用的对象
◆修饰静态方法:整个静态方法,作用于所有对象
◆修饰类:括号括起来的部分,作用于所有对象

package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 *  Synchronized测试修饰代码块和方法
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample {

    /**
     * 一般的循环,多线程测下看看对不对
     */
    private void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修饰代码块
     * 大括号括起来的代码,作用于调用的对象
     */
    private void synchronizedBlock() {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修饰代码块
     * 大括号括起来的代码,作用于调用的对象
     */
    private void synchronizedBlock(int j) {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("synchronizedBlock {} - {}", j, i);
            }
        }
    }

    /**
     * 修饰方法
     * 整个方法,作用于调用的对象
     */
    private synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //请求总数
    public static int clientTotal = 4;

    //同时并发执行的线程数
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
        SynchronizedExample synchronizedExample = new SynchronizedExample();
        SynchronizedExample synchronizedExample2 = new SynchronizedExample();

        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        /**
         * 线程同步执行测试是不是同步一个个的执行
         */
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                    semaphore.acquire();
                    /**
                     *  没有加sync的,
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 3
                     */
//                    synchronizedExample.notSyncBlock();


                    /**
                     * 加了sync的,同步执行
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     */
//                    synchronizedExample.synchronizedBlock();

                    /**
                     * example和example2会同时调用,但是本身是顺序执行的
                     * 同一个对象,是同步执行
                     * 不同对象间,不干扰
                     */
//                    synchronizedExample.synchronizedBlock(0);
//                    synchronizedExample2.synchronizedBlock(1);
//                    synchronizedExample2.synchronizedBlock(2);

                    //add执行完后,释放当前线程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保证线程减到0
        countDownLatch.await();
        //关闭线程池
        exec.shutdown();
//        synchronizedExample.synchronizedMethod();
    }
}
package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

/**
 * Synchronized 测试修饰静态方法和类
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample2 {

    /**
     * 一般的循环,多线程测下看看对不对
     */
    public void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修饰类
     */
    public static void synchronizedBlock() {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修饰静态方法
     */
    public static synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //请求总数
    public static int clientTotal = 4;

    //同时并发执行的线程数
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
    }
}

对比:

◆synchronized :不可中断锁,适合竞争不激烈,可读性好
◆Lock :可中断锁,多样化同步,竞争激烈时能维持常态
◆Atomic :竞争激烈时能维持常态,比Lock性能好;只能同步一个值

可见性

导致共享变量在线程间不可见的原因
◆线程交叉执行
◆重排序结合线程交叉执行
◆共享变量更新后的值没有在工作内存与主存间及时更新

可见性- synchronized

JMM关于synchronized的两条规定:
◆线程解锁前 ,必须把共享变量的最新值刷新到主内存
◆线程加锁时 ,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,
加锁与解锁是同一把锁)

可见性- volatile

通过加入内存屏障和禁止重排序优化来实现
◆对volatile变量写操作时 ,会在写操作后加入-条store屏障指令,将本地内存中的共享变量值刷新到主内存
◆对volatile变量读操作时 ,会在读操作前加入-条load屏障指令,从主内存中读取共享变量

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343