多线程引发的问题
- 各线程共享同一变量存在竞争,变量值的改变无法预测
- 可以通过在方法前面加synchronized来保证结果的正确,可是syschronized效率太低,性能太差了,代码示例
Counter类:
public class Counter {
private int count;
public synchronized void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
this.count++;
System.out.println(this.count);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
Main类:
public class Main {
public static void main(String[] args) {
Counter counter=new Counter();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
}
}
解释:三个线程都将对Counter类中的count做200次加法,然而线程有可能会同时进行,从而输出同样的值,最后的结果也不是600,在方法上加上synchronized关键字后,每次只用一个线程能进入这个方法,解决了共享问题,但是性能太低,效率太差了,所以一般是在会出现竞争的代码前使用synchronized来实现,代码如下
public class Counter {
private int count;
public void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
synchronized (this){//竞争条件
this.count++;
System.out.println(this.count);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- 除了使用synchronized关键字,jdk还给我们提供了另一种方法,使用AtomicInter类来进行同步,AtomicInteger是个类,必须要实例出来
public class Counter {
private AtomicInteger count=new AtomicInteger(0);//解决并发问题
public void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
System.out.println( count.incrementAndGet());//原子操作的自增1
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- synchronsized是内部锁,下面介绍内部锁的可重入性
当一个线程请求其它的线程已经占有的锁时,请求线程将被阻塞。然而内部锁是可重进入的,因此线程在试图获得它自己占用的锁是,请求会成功。重进入意味着请求是基于“每一个线程”,而不是基于“每一次调用”(互斥锁是基于每次调用的)。重进入的实现是通过为每一个锁关联一个请求技术器和一个占有他的线程。当计数为0时,认为锁是未被占用的。线程请求一个未被占有的锁时候,JVM将记录锁的占有者,并且将请求计数设置为1。如果同一个线程再次请求这个锁,计数将递增;每次占用线程退出语句块时,计数器值将递减,直到计数器达到0时候,锁被释放。
指令排序问题
编译器或运行时环境为了优化程序性能而采取的对指令进行重新排序执行的一种手段。
也就是说,对于下面两条语句:
int a = 10;
int b = 20;
在计算机执行上面两句话的时候,有可能第二条语句会先于第一条语句执行。所以,千万不要随意假设指令执行的顺序。
代码示例:
Visiblity1:
public class Visiblity1 {
public static int number;
public static boolean read;
}
ReadThread:
public class ReadThread extends Thread{
@Override
public void run() {
while (!Visiblity1.read){
Thread.yield();
System.out.println(Visiblity1.number);
}
}
}
Test:
public class Test {
public static void main(String[] args) {
new ReadThread().start(); //结果又三种可能 42 0 不输出
Visiblity1.number=42;
Visiblity1.read=true;
}
}
解析:结可能有三种情况,42,0,或者不输出(未进入循环)
第一种可能是预想情况
第二中是因为赋值语句在循环语句前执行
第三种是因为指令的排序可能不是按照顺序执行的
cpu缓存问题
cpu会从缓存去数据,而不访问内存
public class Visiblity {
public static boolean bChanged;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
if(bChanged==true){
System.out.println("!=");
System.exit(0);
}
}
}
}).start();
Thread.sleep(10);//一定要加
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
bChanged=true;
}
}
}).start();
}
}
结果是一直等待,因为第一个线程的bChange变量一直从缓存中取
如果加上public volatile static boolean bChanged;关键字,告诉cpu必须去访问内存而不是从缓存区数据
- volatile 保证变量的修改让所有线程可见 阻止指令排序
- volatile是古老的关键字,synchronized已经优化的很好了,不要去刻意使用volatile
- 所以说你传入synchronized能解决可见性和原子性,volatile只能解决可见性
线程封闭
- 使用final关键字,不要共享变量(就是一句废话)
- 桟封闭 比如 变量在方法内部声明和修改,简单来说就是使用局部变量(变量定义在里面)
- ThreadLocal线程绑定,就是变量在不同线程上的副本(变量定义在外面,但是访问修改的时候每个线程都会有一个副本)
使用ThreadLocal是实现线程封闭的最好方法。ThreadLocal内部维护了一个Map,Map的key是每个线程的名称,而Map的值就是我们要封闭的对象。每个线程中的对象都对应着Map中一个值,也就是ThreadLocal利用Map实现了对象的线程封闭。管理链接方面经常用
public class Visiblity {
private static ThreadLocal<LocalTest> threadLocal=new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
LocalTest local=new LocalTest();
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
threadLocal.set(local);
LocalTest l=threadLocal.get();
l.setNum(20);
System.out.println(Thread.currentThread().getName()+"==="+threadLocal.get().getNum());
Thread.yield();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
threadLocal.set(local);
LocalTest l=threadLocal.get();
l.setNum(30);
System.out.println(Thread.currentThread().getName()+"==="+threadLocal.get().getNum());
Thread.yield();
}
}
}).start();
}
}
java同步和并发容器
- 同步容器 Vector,Hashtable 里面的方法都加了synchronized,性能太差
同步容器依旧会有问题,迭代过程中发生修改,会报并发修改异常,可以修改线程时独立拷贝一份集合
并发修改异常不仅是多线程会有,单线程也会有,迭代list集合,直接使用list的remove方法而不使用迭代器的remove方法会抛出并发修改异常 - 并发容器 :
- ConcurrentHashMap是分段锁,普通的HashTable虽然 线程安全 但是它是直接在每个方法前加synchronized关键字,效率低,
分段锁每次访问只允许一个线程修改哈希表的映射关系,但ConcurrentHashMap size()方法有可能返回的不是及时的值
public class multithreading.Test {
public static void main(String[] args) {
ConcurrentHashMap map=new ConcurrentHashMap();
map.putIfAbsent(1,2);//只用当key不存在才去设置
HashTable table=new HashTable();
if(table.get(1)==null){
table.put(1,2);
}//这样子写多线程肯定出问题
}
}
ConcurrentHashMap是jdk1.5才出来的,解决了HashTable的效率问题,但是会可能出现误差,,ConcurrentHashMap是弱一致性的。 多线程环境下,直接使用判空在去设值肯定会出问题。
- CopyOnWriteArrayList/Set 读操作站占绝大部分,写操作比较少
- 阻塞队列BlockingQueue 见的不多
闭锁 栅栏 信号量
- 闭锁:一个线程依赖于其他线程的业务,某个服务只有等到另一个操作结束后才能执行
CountDowmLatch 例子(一家人都到了才能吃饭)
class multithreading.Test{
private static CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException
{
new Thread()
{
public void run()
{
fatherToRes();
latch.countDown();
};
}.start();
new Thread()
{
public void run()
{
motherToRes();
latch.countDown();
};
}.start();
new Thread()
{
public void run()
{
meToRes();
latch.countDown();
};
}.start();
latch.await();
togetherToEat();
}
}
解析CountDowmLatch是一种灵活的闭锁实现,包含一个计数器,该计算器初始化为一个正数,表示需要等待事件的数量。countDown方法递减计数器,表示有一个事件发生,而await方法等待计数器到达0,表示所有需要等待的事情都已经完成。
- 栅栏:所有线程必须要拿到某个状态之后这些线程才能进行操作
public class multithreading.Test {
public static void main(String[] args) {
int N=4;
CyclicBarrier barrier=new CyclicBarrier(N);
for (int i = 0; i < N; i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier cyclicBarrier){
this.barrier=cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在输入");
try {
Thread.sleep(5000);//以睡眠来模拟输入
System.out.println("线程"+Thread.currentThread().getName()+"数据写入完毕");
barrier.await();//只用栅栏里的所有线程都执行到这里,其他线程才能执行
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有线程写入完毕");
}
}
}
- 信号量
public class multithreading.Test {
public static void main(String[] args) {
int N=8;//工人数
Semaphore semaphore=new Semaphore(5);//机器数目
for (int i = 0; i < N; i++) {
new Worker(i,semaphore).start();
}
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num ,Semaphore semaphore){
this.num=num;
this.semaphore=semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一它机器");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放机器");
semaphore.release();
}catch (Exception e){
e.printStackTrace();;
}
}
}
} public class multithreading.Test {
public static void main(String[] args) {
int N=8;//工人数
Semaphore semaphore=new Semaphore(5);//机器数目
for (int i = 0; i < N; i++) {
new Worker(i,semaphore).start();
}
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num ,Semaphore semaphore){
this.num=num;
this.semaphore=semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一它机器");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放机器");
semaphore.release();
}catch (Exception e){
e.printStackTrace();;
}
}
}
}
栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待等待时间,而栅栏用于等待线程。
场景对比:
l 闭锁场景:几个人相约去公园游玩,在家做好准备,约定在某一时刻同时出发去公园,准备工作进行的快的不能提前出门,到点出门。
l 栅栏场景:几个人相约去公园游玩,几个人去到公园门口,要等全部到达公园门口后才一起进入公园。
l 信号量场景:几个人相约去公园游玩,等大家都到公园后,发现来的太迟了,公园游客饱和,公园限制入场游客的数量。游客在门口等待,出来一人,再进入一人,只能一个一个进入。
线程池
Executor是一个接口 ExecutorService是Executor的子类
public class multithreading.Test {
public static void main(String[] args)throws Exception {
Executor executor=Executors.newFixedThreadPool(100);//创建线程池
ServerSocket serverSocket=new ServerSocket(8888);
while (true){
Socket socket=serverSocket.accept();
Runnable task=new Runnable() {
@Override
public void run() {
handleRequest(socket);
}
};
//new Thread(task).start();
executor.execute(task);//交给线程池
}
}
private static void handleRequest(Socket socket) {
}
}
使用线程池对线程进行管理能够控制线程的数量
一些其他的线程池:
- Executors.newScheduledThreadPool()线程池中只能放一个,如果这个线程挂了,会在线程池中马上起另一个线程,保证只有一个线程且一直可用
- Executors.newCachedThreadPool() 线程数量没有限制
- ThreadPoolExecutor java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,构造器中各个参数的含义如下
- corePoolSize核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit:参数keepAliveTime的时间单位,
- workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
- threadFactory:线程工厂,主要用来创建线程;
- handler:表示当拒绝处理任务时的策略
各种锁
- 死锁
互相等待,锁的嵌套会出现死锁
避免死锁:尽量不要去写锁的嵌套,锁嵌套的顺序相同 - 显示锁
Lock接口
public class multithreading.Test {
public static void main(String[] args)throws Exception {
Lock lock=new ReentrantLock();//创建可重用锁
try {
lock.lock();
System.out.println("开始同步");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
释放锁的操作一定要写在finally里,否则出现异常会直接锁死,一般还是用synchronized,需要在复杂的情况下自定义超时时间的情况下才会使用lock
- ReentrantReadWriteLock 可重入读写锁
设想以下情景:我们在系统中有一个多线程访问的缓存,多个线程都可以对缓存进行读或写操作,但是读操作远远多于写操作,要求写操作要线程安全,且写操作执行完成要求对当前的所有读操作马上可见。
分析上面的需求:因为有多个线程可能会执行写操作,因此多个线程的写操作必须同步串行执行;而写操作执行完成要求对当前的所有读操作马上可见,这就意味着当有线程正在读的时候,要阻塞写操作,当正在执行写操作时,要阻塞读操作。一个简单的实现就是将数据直接加上互斥锁,同一时刻不管是读还是写线程,都只能有一个线程操作数据。但是这样的问题就是如果当前只有N个读线程,没有写线程,这N个读线程也要傻呵呵的排队读,尽管其实是可以安全并发提高效率的。因此理想的实现是:
当有写线程时,则写线程独占同步状态。
当没有写线程时只有读线程时,则多个读线程可以共享同步状态。
读写锁就是为了实现这种效果而生
public class ReadWriteCache {
private static Map<String, Object> data = new HashMap<>();
private static ReadWriteLock lock = new ReentrantReadWriteLock(false);
private static Lock rlock = lock.readLock();
private static Lock wlock = lock.writeLock();
public static Object get(String key) {
rlock.lock();
try {
return data.get(key);
} finally {
rlock.unlock();
}
}
public static Object put(String key, Object value) {
wlock.lock();
try {
return data.put(key, value);
} finally {
wlock.unlock();
}
}
}
- 公平锁 谁下来,谁就先被锁,先来后到
- 乐观锁 悲观锁 分布式锁 数据库层面的
乐观锁的实现:在表中加一个版本字段,每次读的时候判断版本
悲观锁:select * from user for update 相当于synchronized
分布式锁 redis zk