阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

#### wait、notifyAll实现方式

```java
package blocks;

import java.util.ArrayList;

/**
 * Bounded blocking queue built on {@code synchronized} / {@code wait} /
 * {@code notifyAll}.
 *
 * <p>Every method synchronizes on the backing list, so a single monitor guards
 * all reads and writes. Producers wait while the list holds {@code limite}
 * elements; consumers wait while it is empty.
 *
 * Created by maxi on 2017/11/8.
 *
 * @param <E> element type; {@code null} elements are rejected
 */
public class ObjectBlockQueues<E> implements BlockQueues<E> {

    /** Backing storage; its monitor is the one lock used by every method. */
    private final ArrayList<E> lists;

    /** Capacity: {@link #add} blocks while the list size equals this value. */
    private final int limite;

    /** Creates a queue with a capacity of 10. */
    public ObjectBlockQueues() {
        this(10, false);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param limite maximum number of elements held at once
     */
    public ObjectBlockQueues(int limite) {
        this(limite, false);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param limite maximum number of elements held at once
     * @param isFair not used by this implementation; accepted so the
     *               constructor mirrors the lock-based variant
     */
    public ObjectBlockQueues(int limite, Boolean isFair) {
        this.limite = limite;
        this.lists = new ArrayList<>(limite);
    }

    /**
     * Returns the element at {@code index} without removing it.
     *
     * @param index position in the backing list
     * @return the element stored at that position
     * @throws IndexOutOfBoundsException if {@code index} is out of range
     */
    @Override
    public E get(int index) {
        // Same monitor as add/remove, so a read never overlaps a write.
        synchronized (this.lists) {
            return this.lists.get(index);
        }
    }

    /**
     * Appends {@code e}, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without adding the element.
     *
     * @param e element to add
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void add(E e) {
        checkNotNull(e);
        synchronized (this.lists) {
            try {
                // Loop, not "if": a woken thread must re-check the condition.
                while (this.lists.size() == this.limite) {
                    this.lists.wait();
                }
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller instead of hiding it.
                Thread.currentThread().interrupt();
                return;
            }
            this.lists.add(e);
            // Wake consumers that are waiting for an element.
            this.lists.notifyAll();
        }
    }

    /**
     * Removes the first occurrence of {@code e}, blocking while the queue is
     * empty. If the queue is non-empty but does not contain {@code e}, nothing
     * is removed.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without removing anything.
     *
     * @param e element to remove
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void remove(E e) {
        checkNotNull(e);
        synchronized (this.lists) {
            try {
                while (this.lists.isEmpty()) {
                    this.lists.wait();
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
            if (this.lists.remove(e)) {
                // A slot was freed: wake producers that are waiting for space.
                this.lists.notifyAll();
            }
        }
    }

    /** Removes every element and wakes producers waiting for free space. */
    @Override
    public void removeAll() {
        synchronized (this.lists) {
            this.lists.clear();
            // Without this, producers blocked on a full queue would never resume.
            this.lists.notifyAll();
        }
    }

    /**
     * Returns the current number of elements.
     *
     * @return number of elements in the queue
     */
    @Override
    public int size() {
        synchronized (this.lists) {
            return this.lists.size();
        }
    }

    /** Throws {@link NullPointerException} when {@code v} is {@code null}. */
    private static void checkNotNull(Object v) {
        if (v == null) {
            throw new NullPointerException();
        }
    }
}
```

#### 并发类 锁的实现方式

```java
package blocks;

import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by maxi on 2017/11/8.
*/
// Bounded blocking queue guarded by one ReentrantLock with two conditions:
// producers wait on notFull while the list holds `limite` elements, and
// consumers wait on notEmpty while it is empty. Null elements are rejected.
public class ArrayBlockQueue<E> implements BlockQueues<E> {

    /** Backing storage; only touched while {@link #mLocks} is held. */
    private final ArrayList<E> lists;

    /** Capacity: {@link #add} blocks while the list size equals this value. */
    private final int limite;

    /** The single lock guarding {@link #lists}. */
    private final ReentrantLock mLocks;

    /** Signalled after an element is added. */
    private final Condition notEmpty;

    /** Signalled after elements are removed. */
    private final Condition notFull;

    /** Creates a non-fair queue with a capacity of 10. */
    public ArrayBlockQueue() {
        this(10, false);
    }

    /**
     * Creates a non-fair queue with the given capacity.
     *
     * @param limite maximum number of elements held at once
     */
    public ArrayBlockQueue(int limite) {
        this(limite, false);
    }

    /**
     * Creates a queue with the given capacity and lock fairness.
     *
     * @param limite maximum number of elements held at once
     * @param isFair passed to the {@link ReentrantLock} constructor
     */
    public ArrayBlockQueue(int limite, Boolean isFair) {
        this.limite = limite;
        this.lists = new ArrayList<>();
        this.mLocks = new ReentrantLock(isFair);
        this.notEmpty = mLocks.newCondition();
        this.notFull = mLocks.newCondition();
    }

    /**
     * Returns the element at {@code index} without removing it.
     *
     * @param index position in the backing list
     * @return the element stored at that position
     * @throws IndexOutOfBoundsException if {@code index} is out of range
     */
    @Override
    public E get(int index) {
        mLocks.lock();
        try {
            return this.lists.get(index);
        } finally {
            // Release even when get() throws; otherwise the lock is never freed.
            mLocks.unlock();
        }
    }

    /**
     * Appends {@code e}, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without adding the element.
     *
     * @param e element to add
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void add(E e) {
        checkNotNull(e);
        mLocks.lock();
        try {
            // Loop, not "if": a woken thread must re-check the condition.
            while (this.lists.size() == this.limite) {
                notFull.await();
            }
            this.lists.add(e);
            notEmpty.signalAll();
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller instead of hiding it.
            Thread.currentThread().interrupt();
        } finally {
            mLocks.unlock();
        }
    }

    /**
     * Removes the first occurrence of {@code e}, blocking while the queue is
     * empty. If the queue is non-empty but does not contain {@code e}, nothing
     * is removed.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without removing anything.
     *
     * @param e element to remove
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void remove(E e) {
        checkNotNull(e);
        mLocks.lock();
        try {
            while (this.lists.isEmpty()) {
                notEmpty.await();
            }
            this.lists.remove(e);
            notFull.signalAll();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } finally {
            mLocks.unlock();
        }
    }

    /** Removes every element and wakes producers waiting for free space. */
    @Override
    public void removeAll() {
        mLocks.lock();
        try {
            this.lists.clear();
            // Without this, producers blocked on a full queue would never resume.
            notFull.signalAll();
        } finally {
            mLocks.unlock();
        }
    }

    /**
     * Returns the current number of elements.
     *
     * @return number of elements in the queue
     */
    @Override
    public int size() {
        mLocks.lock();
        try {
            return this.lists.size();
        } finally {
            mLocks.unlock();
        }
    }

    /** Throws {@link NullPointerException} when {@code v} is {@code null}. */
    private static void checkNotNull(Object v) {
        if (v == null) {
            throw new NullPointerException();
        }
    }
}
```

#### main函数

```java
package blocks;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Demo: a fast producer and a slow consumer share one queue of capacity 10.
 *
 * Created by maxi on 2017/11/8.
 */
public class BlockMain {

    public static void main(String[] args) throws InterruptedException {
        // To try the wait/notifyAll version instead:
        // BlockQueues<String> blockQueues = new ObjectBlockQueues<>();

        // The JDK's own bounded blocking queue; declared but not used below.
        BlockingQueue<String> blockingQueues = new ArrayBlockingQueue<>(10);
        BlockQueues<String> blockQueues = new ArrayBlockQueue<>(10);

        // Producer: adds 21 elements without pausing between them.
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i <= 20; i++) {
                blockQueues.add("num" + i);
                System.out.println("thread 1 add " + i + " queue size = " + blockQueues.size());
            }
        });

        // Consumer: sleeps one second before each removal.
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i <= 20; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockQueues.remove("num" + i);
                System.out.println("thread 2 remove " + i + " queue size = " + blockQueues.size());
            }
        });

        thread1.start();
        thread2.start();
    }
}
```