主方法开启线程
生产者生产连接，消费者获取连接
/**
 * Demo entry point: one producer and one consumer sharing a bounded blocking queue.
 */
public class Test {
    /**
     * Starts a producer thread and a consumer thread on a shared queue of capacity 10,
     * lets them run for ten seconds, then asks the producer to stop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(10);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread p = new Thread(producerThread);
        Thread c = new Thread(consumerThread);
        p.start();
        c.start();
        try {
            // Let the producer/consumer pair run for 10 seconds.
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt status when it throws; put it back so the
            // interruption is not silently lost, then fall through to the shutdown below.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        // Only the producer is stopped explicitly here.
        producerThread.stop();
    }
}
生产者线程
/**
 * Producer task: offers sequentially numbered strings to a shared blocking queue.
 *
 * <p>Runs until {@link #stop()} is called or the executing thread is interrupted.
 */
class ProducerThread implements Runnable {
    /** Queue that produced items are offered to. */
    public BlockingQueue<String> blockingQueue;
    /** Loop flag; volatile so that a {@link #stop()} from another thread is seen by {@link #run()}. */
    public volatile Boolean FLAG = true;
    /** Generates the item numbers; the first produced item is "1". */
    AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * @param blockingQueue queue to offer produced items to
     */
    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Produces one item per loop iteration while {@link #FLAG} is true: takes the next
     * counter value, offers it to the queue (waiting up to 2 seconds for space), reports
     * the outcome on stderr, then sleeps for 1 second.
     *
     * <p>If the thread is interrupted while waiting, the loop ends and the thread's
     * interrupt status is restored so the caller can still observe the interruption.
     */
    @Override
    public void run() {
        System.out.println("生产者线程已经启动");
        try {
            while (FLAG) {
                // Atomic equivalent of i++, converted to a string.
                String data = atomicInteger.incrementAndGet() + "";
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.err.println("生产者存入队列成功!data:" + data);
                } else {
                    // The queue stayed full for the whole timeout; this number is not retried.
                    System.err.println("生产者存入队列失败!data:" + data);
                }
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // offer()/sleep() clear the interrupt status when they throw; restore it
            // instead of swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            System.out.println("生产者生产结束");
        }
    }

    /**
     * Requests the producer to stop. The loop checks the flag once per iteration, so a
     * running producer finishes its current offer/sleep cycle before it exits.
     */
    public void stop() {
        this.FLAG = false;
    }
}
消费者线程
/**
 * Consumer task: takes strings from a shared blocking queue and prints them.
 *
 * <p>Stops by itself once no item arrives within 2 seconds, or when the executing
 * thread is interrupted.
 */
class ConsumerThread implements Runnable {
    /** Queue that items are polled from. */
    public BlockingQueue<String> blockingQueue;
    /** Loop flag; set to false by {@link #run()} itself when a poll times out. */
    public volatile Boolean FLAG = true;
    /** Not used by this class's own code; kept so existing references keep compiling. */
    AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * @param blockingQueue queue to consume items from
     */
    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Polls the queue while {@link #FLAG} is true, waiting up to 2 seconds per item and
     * printing every item received. A timed-out poll (null result) clears the flag and
     * ends the loop.
     *
     * <p>If the thread is interrupted while waiting, the loop ends and the thread's
     * interrupt status is restored so the caller can still observe the interruption.
     */
    @Override
    public void run() {
        System.out.println("消费者线程启动");
        try {
            while (FLAG) {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null) {
                    // Nothing arrived within the timeout: stop consuming.
                    System.out.println("超过两秒时间没有获取到");
                    FLAG = false;
                    return;
                }
                System.out.println("消费者获取队列data:" + data);
            }
        } catch (InterruptedException e) {
            // poll() clears the interrupt status when it throws; restore it instead of
            // swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Kept from the original: run() reports any other failure and never throws.
            e.printStackTrace();
        } finally {
            System.out.println("消费结束");
        }
    }
}
引用的包
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
图片分析