1.并发容器
1.1.第一类Collection,也叫做集合
集合的意思就是说这个容器是什么结构,你都可以把一个元素一个元素的往里面添加。从数据结构的角度来说,这个存储的数据结构也就两种
连续存储的数组Array,和非连续存储的一个指向另外一个的链表,但是逻辑结构就很多了。Collection又分成了3大类,分别为Set,List,Queue,Queue队列接口就是为了高并发准备的,Set不会有重复的元素。队列最主要的原因就是实现了任务的状态和获取,叫做阻塞队列,其中有一个子接口Deque叫做双端队列,一般的队列只能一端进另外一端出,但是双端队列却可以同进同出。
1.2.第二类Map,集合特殊的变种,一对一对的
1.2.1.HashTable
最开始java1.0的容器里只有两个,Vector和Hashtable,但是这两个容器所有的默认方法都是加了锁synchronized的,这是最早设计不太合理的地方。后来又加了HashMap,它是完全没有加锁的,后来为了适配有锁的场景又加了SynchronizedMap,它是HashMap的加锁版本。
Vector和Hashtable 自带锁,基本不用
1.2.2.HashMap
我们来看看这HashMap,同学们想一下这个HashMap往里头插会不会有问题,因为HashMap没有锁,线程不安全。虽然他速度比较快,但是数据会出问题,并且可能报各种异常。
主要是因为它内部会把这个变成TreeNode。具体细节不再细究
package com.learn.thread.six;
import com.learn.thread.six.enums.Constans;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.UUID;
public class TestHashMap {
private static HashMap<UUID, UUID> m = new HashMap<>();
static int count = Constans.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constans.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class Mythread extends Thread {
/**
* 开始位置
*/
int start;
/**
* 每一个线程放入的个数
*/
int gap = count/THREAD_COUNT;
public Mythread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start + gap; i++) {
m.put(keys[i], values[i]);
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Mythread(i * (count/THREAD_COUNT));
}
for (Thread item : threads) {
item.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
}
}
}
可以看到,HashMap不是线程安全的
1.2.3.SynchronizedHashMap
这个是给HashMap手动加了锁,它的源码自己做了一个Object,然后每次都是SynchronizedObject,严格来讲他和那个HashMap效率上区别不大
package com.learn.thread.six;
import com.learn.thread.six.enums.Constans;
import java.util.*;
public class TestSynchronziedHashMap {
private static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());
static int count = Constans.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constans.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class Mythread extends Thread {
/**
* 开始位置
*/
int start;
/**
* 每一个线程放入的个数
*/
int gap = count/THREAD_COUNT;
public Mythread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start + gap; i++) {
m.put(keys[i], values[i]);
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Mythread(i * (count/THREAD_COUNT));
}
for (Thread item : threads) {
item.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
// ---------------------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10; j++) {
System.out.println(m.get(keys[10]));
}
});
}
for (Thread item : threads) {
item.start();
}
for (Thread item : threads) {
try {
item.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
}
1.2.4.ConcurrentHashMap
ConcurrentHashMap 是多线程里边真正用得到的,这个效率主要体现在读上面,由于它往里边插的时候做了各种判断,本来是链表的,后来改成了红黑树,然后又做了各种各样的CAS判断,所以插入的时候是更低一点。
HashMap和HashTable层虽然读的效率稍微低一点,但是他往里插的时候检查的东西比较少,就加个锁往后一插。
所以,各种Map的使用,看你实际当中的需求。下面用个程序看看ConcurrentHashMap的查询速度和插入速度
package com.learn.thread.six;
import com.learn.thread.six.enums.Constans;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class TestConcurrentHashMap {
private static Map<UUID, UUID> m = new ConcurrentHashMap<>();
static int count = Constans.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constans.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class Mythread extends Thread {
/**
* 开始位置
*/
int start;
/**
* 每一个线程放入的个数
*/
int gap = count/THREAD_COUNT;
public Mythread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start + gap; i++) {
m.put(keys[i], values[i]);
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Mythread(i * (count/THREAD_COUNT));
}
for (Thread item : threads) {
item.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
// ---------------------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10; j++) {
System.out.println(m.get(keys[10]));
}
});
}
for (Thread item : threads) {
item.start();
}
for (Thread item : threads) {
try {
item.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
}
1.2.5.ConcurrentSkipListMap
通过跳表来实现的高并发容器,并且这个Map是有排序的
跳表: 底层本身存储的是一个链表元素,是排好序的。如果链表是排好序的,往里边插入和取值都变得特别麻烦。因为你得从头到尾的找。这时候跳表就出现了
跳表在链表的基础上拿出了一些关键元素,在上面做一层链表,如果这一层数据量还是很大,就在这个基础上再抽一些元素做一个链表。所以查找数据的时候是一层一层的往下查找,实现上比TreeMap 又容易了很多。
下面做一个小程序,试试这几个Map 的性能
package com.learn.thread.six.map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
public class TestConcurrentSkipListMap {
public static void main(String[] args) {
Map<String, String> map1 = new ConcurrentSkipListMap<>();
Map<String, String> map2 = new ConcurrentHashMap<>();
Map<String, String> map3 = new HashMap<>();
Hashtable<String, String> map4 = new Hashtable<>();
test(map1);
test(map2);
test(map3);
test(map4);
}
public static void test(Map<String, String> map) {
Random random = new Random();
Thread[] tds = new Thread[100];
CountDownLatch count = new CountDownLatch(tds.length);
long start = System.currentTimeMillis();
for (int i = 0; i < tds.length; i++) {
tds[i] = new Thread(() -> {
for (int j = 0; j < 10; j++) {
map.put(UUID.randomUUID().toString(), "a" + random.nextInt(100000));
}
count.countDown();
});
}
Arrays.asList(tds).forEach(item -> item.start());
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(map.size());
}
}
1.2.6.ArraryList
我们先来认识一下vector到Queue的发展历程,下面有一个程序展示超卖的现象
package com.learn.thread.six.list;
import java.util.ArrayList;
import java.util.List;
public class TestTicketSeller {
private static List<String> tickets = new ArrayList<>();
static {
for (int i = 0; i < 1000; i++) {
tickets.add("票编号" + i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (tickets.size() >0 ){
System.out.println("销售了 ---" + tickets.remove(0));
}
}).start();
}
}
}
运行一下,我们发现卖了超过1000条
1.2.7.Vector
我们来看看最早的容器Vector,内部是带锁的,(add 和remove 都是带锁的),但是我们的代码逻辑中间Vector 是不会带锁的。
package com.learn.thread.six.list;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
public class TestSeller2 {
private static Vector<String> tickets = new Vector<>();
static {
for (int i = 0; i < 100; i++) {
tickets.add("票编号" + i);
}
}
public static void main(String[] args) {
testSeller();
}
public static void testSeller() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (tickets.size() >0 ){
// 下面这段代码并没有加锁
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了 ---" + tickets.remove(0));
}
}).start();
}
}
}
可以看到,中间代码睡眠2秒是存在线程安全问题的,如果把它去掉,整个程序不会报异常。所以还是得最外层加一个锁。如下所示
public static void testSynchronizedSeller() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 保证原子性操作
synchronized (tickets) {
while (tickets.size() >0 ){
// 下面这段代码并没有加锁
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了 ---" + tickets.remove(0));
}
}
}).start();
}
}
1.2.8.LinkedList
LinkList是一个双链表,在添加和删除元素时具有比ArrayList更好的性能.但在get与set方面弱于ArrayList.当然,这些对比都是指数据量很大或者操作很频繁。
LinkedList并不需要连续的空间,大小不确定
package com.learn.thread.six.list;
import java.util.Vector;
public class TestLinkedList {
private static Vector<String> tickets = new Vector<>();
static {
for (int i = 0; i < 10; i++) {
tickets.add("票编号" + i);
}
}
public static void main(String[] args) {
testSeller();
}
public static void testSeller() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
synchronized (tickets) {
if (tickets.size() <= 0) {
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了------" + tickets.remove(0));
}
}
}).start();
}
}
}
1.2.9.Queue
效率最高的就是这个Queue,这是Java 最新的一个借口,主要目标就是为了高并发使用的,就是为了多线程,所以以后多线程就考虑Queue。
Queue 使用最多的就是ConcurrentLinkedQueue,然后里边没有加锁,调用一个叫poll的方法去取值,如果取空了就说明里面的值已经没了。
package com.learn.thread.six.queue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
public class TestQueue {
static Queue<String> queue = new ConcurrentLinkedDeque<>();
static {
for (int i = 0; i < 10; i++) {
queue.add("票编号" + i);
}
}
public static void main(String[] args) {
testQueue();
}
public static void testQueue() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
String s = queue.poll();
if (s == null) {
return;
}
System.out.println("销售了" + s);
}
}).start();
}
}
}
1.2.10.CopyOnWrite
再来看看并发的时候经常使用的一个类,CopyOnWrite ,CopyOnWirteList,CopyOnWirteSet, CopyOnwirte的意思就是复制,并且是写时复制
public boolean add(E e) {
// 添加的时候,上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 原本的数组
Object[] elements = getArray();
// 原本数组的长度
int len = elements.length;
// 调用native方法进行复制
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 新的元素
newElements[len] = e;
// 替换数组
setArray(newElements);
// 成功
return true;
} finally {
// 解锁
lock.unlock();
}
}
package com.learn.thread.six.queue;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
public class TestCopyOnwirte {
public static void main(String[] args) {
List<Integer> list1 = new CopyOnWriteArrayList<>();
List<Integer> list2 = new ArrayList<>();
List<Integer> list3 = new Vector<>();
List<Integer> list4 = new LinkedList<>();
test(list1);
test(list2);
test(list3);
test(list4);
}
public static void test(List<Integer> list) {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 100; j++) {
int finalJ = j;
new Thread(() -> {
list.add(finalJ);
}) .start();
}
}
System.out.println(list.size());
long end = System.currentTimeMillis();
System.out.println("time" + (end - start));
}
}
可以发现CopyOnWirte的写是最慢的,所以这个适用于,多读少写的场景。如果对象很大,CopyOnWirte存在内存溢出的情况,因为写操作都是复制一份,需要开辟另外一个空间
3.队列
3.1.BlockingQueue
这个是后面线程池的时候需要讲的内存,重点就是Block阻塞,顾名思义,阻塞队列,我们可以在其方法上做到线程自动阻塞。
offer offer是往里头添加,加没加进去会返回一个boolean值
add add 同样往里头加,但是如果没加进去是会抛出异常的
poll 提取数据,并且remove掉
peek 提取数据,但是不remove
package com.learn.thread.six.queue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
public class TestBlockingQueue {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedDeque<>();
for (int i = 0; i < 10; i++) {
queue.offer("a" + i);
}
System.out.println(queue);
System.out.println(queue.size());
// 队列先进后出,poll是提取最后一个进去的
System.out.println(queue.poll());
System.out.println(queue.peek());
System.out.println(queue.size());
}
}
3.2.LinkedBlockingQueue
LinkedBlockingQueue 是用链表实现的无界队列,一直往里头加,直到内存装满了为止
LinkedBlockingQueue 在Queue的基础上,添加两个很重要的方法
put 往队列里边加,加满了,这个线程就会阻塞住。
take 往外取数据,如果空了,线程也会阻塞住
所以LinkedBlockingQueue是实现生产者消费者最好的容器。
下面以一个小程序演示一下一个线程a到i 装数据,没装一个睡一秒,另外启5个线程不断的从里面take,空了我就等着,什么时候新加了我就立马把他取出来
package com.learn.thread.six.queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestLinkedBlockingQueue {
static BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
static Random random = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 5; i++) {
// 如果满了就会等待
try {
queue.put("a" + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"p1").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + queue.take());
} catch (Exception ex) {
}
}
}).start();
}
}
}
3.3.ArrayBlockingQueue
ArrayBlockingQueue 是有界的,你可以指定一个固定的值,他的容器就是10,那么当你往里面扔内容的时候,一旦满了put方法就是阻塞住,如果继续add 的话就会抛出异常.
offer 用返回值来判断是否添加成功,还有另外一个写法,你可以指定一个时间尝试往里边添加,比如一秒钟,如果一秒钟内没有添加成功,他就返回了。
面试题经常会被问到 Queue和List 的区别是什么
答:添加了offer\peek\poll\put\take 这些方法对线程友好的阻塞或者等待。
package com.learn.thread.six.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class TestArrayLBlockingQueue {
public static BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
queue.put("a" + i);
}
// 满了就会阻塞
queue.put("a");
// queue.add("a");
queue.offer("a");
queue.offer("a", 1, TimeUnit.SECONDS);
System.out.println(queue);
}
}
下面看看几个比较特殊的Queue,这些都是BlockingQueue ,同样也是阻塞的
3.4.DelayQueue
DelayQueue 可以实现时间上的排序,这个DelayQueue能实现按里面等待的时间来排序。
但是要实现Comparable接口重写compareTo来确定是怎么排序的,DelayQueue就是按时间顺序进行任务调度
package com.learn.thread.six.queue;
import lombok.ToString;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class TestDelayQueue {
private static BlockingQueue queue = new DelayQueue<>();
@ToString
static class MyThread implements Delayed {
String name;
long runningTime;
public MyThread(String name, long runningTime) {
this.name = name;
this.runningTime = runningTime;
}
/**
* 根据这个时间去排序
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 修改排序
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else {
return 0;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyThread t1 = new MyThread("t1", now + 1000);
MyThread t2 = new MyThread("t2", now + 2000);
MyThread t3 = new MyThread("t3", now + 1500);
MyThread t4 = new MyThread("t4", now + 2500);
MyThread t5 = new MyThread("t5", now + 500);
queue.put(t1);
queue.put(t2);
queue.put(t3);
queue.put(t4);
queue.put(t5);
System.out.println(queue);
for (int i = 0; i < 5; i++) {
System.out.println(queue.take());
}
}
}
}
DelayQueue本质上是用了个PriorityQueue,PriorityQueue是AbstractQueue继承的。PriorityQueue的特点是它内部往里装的时候不是按顺序的,而是内部进行了一个排序。按照优先级,最小的优先,内部结构是一个二叉树,这个二叉树可以认为是堆排序里面那个最小堆值排在最上面。
package com.learn.thread.six.queue;
import java.util.PriorityQueue;
public class TestPriorityQueue {
public static void main(String[] args) {
PriorityQueue<String> queue = new PriorityQueue<>();
queue.add("a");
queue.add("e");
queue.add("b");
queue.add("c");
queue.add("f");
for (int i = 0; i < 5; i++) {
System.out.println(queue.poll());
}
}
}
3.5.SynchronousQueue
SynchronousQueue的容量为0,这个容器不是用来装内容的,专门用来两个线程之间传内容的,给线程下达任务的,跟之前讲过的Exchange一样。
add方法会直接报错,原因是容量为0,不能添加。只能用put调用,并且是要求前面有人拿着这个东西你才可以往里面装
SynchronousQueue线程池多数情况下会使用,很多线程取任务,互相之间进行任务调度的都是他
package com.learn.thread.six.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class TestSynchronousQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 阻塞等待消费者消费,如果没有被消费会一直等待
queue.put("aaa");
System.out.println("niadsasdasd");
System.out.println(queue.size());
}
}
3.6.TransferQueue
Transfer意思为传递,实际上和SynchronousQueue一样,不过SynchronousQueue只能传递一个,它能传递多个,并且它还有一个transfer方法,意思就是装完后就在这里等着被消费。
一般使用场景,我做了一件事情,这个事情一定要有一个结果,有了结果之后我才可以继续进行我下面的事情。
比如说我支付了钱,这个订单付款完了,也是要一直等这个付账结果完了了我才可以给客户反馈
package com.learn.thread.six.queue;
import java.util.concurrent.LinkedTransferQueue;
import static java.lang.Thread.sleep;
public class TestTransferQueue {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
queue.transfer("asdasdasd");
new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
容器这章主要为了后面的线程池
总结
从hashTable到ConcurrentHashMap,这不是一个替代关系,而是不同场景下不同的使用
Vector 到Queue这样一个过程,主要是Vector添加了许多对线程友好的API,offer\peek\poll,它的一个子类叫BlockingQueue又添加了put和take 实现了阻塞操作。