经典模式:生产者和消费者
public class ProduceConsume {
public static void main(String[] args) {
SyncStack ss = new SyncStack();//建造一个装馒头的框
Producer p = new Producer(ss);//新建一个生产者,使之持有框
Consume c = new Consume(ss);//新建一个消费者,使之持有同一个框
Thread tp = new Thread(p);//新建一个生产者线程
Thread tc = new Thread(c);//新建一个消费者线程
tp.start();//启动生产者线程
tc.start();//启动消费者线程
}
}
class SteamBread {
int id;
SteamBread(int id) {
this.id = id;
}
public String toString() {
return "steamBread:"+id;
}
}
//装馒头的框,栈结构
class SyncStack{
int index = 0;
SteamBread[] stb = new SteamBread[6];//构造馒头数组,相当于馒头筐,容量是6
//放入框中,相当于入栈
public synchronized void push(SteamBread sb) {
while(index==stb.length){//筐满了,即栈满,
try {
this.wait();//让当前线程等待
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.notify();//唤醒在此对象监视器上等待的单个线程,即消费者线程
stb[index] = sb;
this.index++;
}
//从框中拿出,相当于出栈
public synchronized SteamBread pop() {
while(index==0){//筐空了,即栈空
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.notify();
this.index--;//push第n个之后,this.index++,使栈顶为n+1,故return之前要减一
return stb[index];
}
}
//生产者类,实现了Runnable接口,以便于构造生产者线程
class Producer implements Runnable {
SyncStack ss = null;
Producer(SyncStack ss){
this.ss = ss;
}
@Override
public void run() {
for(int i = 0; i < 20; i++){
SteamBread stb = new SteamBread(i);
ss.push(stb);
System.out.println("生产了"+stb);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者类,实现了Runnable接口,以便于构造消费者线程
class Consume implements Runnable{
SyncStack ss = null;
public Consume(SyncStack ss) {
super();
this.ss = ss;
}
@Override
public void run() {
for(int i = 0;i < 20; i++){//开始消费馒头
SteamBread stb = ss.pop();
System.out.println("消费了"+stb);
try {
Thread.sleep(100);//每消费一个馒头,睡觉100毫秒。即生产多个,消费一个
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
BlockingQueue 本身支持线程安全,类似还有CopyOnWriteArrayList
public class ProducerConsumer {
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Storage s = pc.new Storage();
ExecutorService service = Executors.newCachedThreadPool();
Producer p = pc.new Producer("张三", s);
Producer p2 = pc.new Producer("李四", s);
Consumer c = pc.new Consumer("王五", s);
Consumer c2 = pc.new Consumer("老刘", s);
Consumer c3 = pc.new Consumer("老林", s);
service.submit(p);
service.submit(p2);
service.submit(c);
service.submit(c2);
service.submit(c3);
}
class Consumer implements Runnable {
private String name;
private Storage s = null;
public Consumer(String name, Storage s) {
this.name = name;
this.s = s;
}
public void run() {
try {
while (true) {
System.out.println(name + "准备消费产品.");
Product product = s.pop();
System.out.println(name + "已消费(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer implements Runnable {
private String name;
private Storage s = null;
public Producer(String name, Storage s) {
this.name = name;
this.s = s;
}
public void run() {
try {
while (true) {
Product product = new Product((int) (Math.random() * 10000)); // 产生0~9999随机整数
System.out.println(name + "准备生产(" + product.toString() + ").");
s.push(product);
System.out.println(name + "已生产(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
public class Storage {
BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);
public void push(Product p) throws InterruptedException {
queues.put(p);
}
public Product pop() throws InterruptedException {
return queues.take();
}
}
public class Product {
private int id;
public Product(int id) {
this.id = id;
}
public String toString() {
return "产品:" + this.id;
}
}
}
结果和第一个差不多,就消费速度比第一个快,因为Cousmer多了
Semaphore,信号量,相当于给变量打上计数,此时大于0,就是可以需求使用就减1,使用完要释放恢复计数器,类似停车场,有数目影响。和底层,线程同步有点类似synchronized,底层就是给变量打个计数器,获得就-1,只有非负数才能继续。
public class Main {
public static void main(String[] args) {
PrintQueue2 printQueue = new PrintQueue2();
Thread thread[] = new Thread[10];
for(int i=0;i<10;i++){
thread[i] = new Thread(new Job(printQueue),"Thread"+i);
}
for(int i=0;i<10;i++){
thread[i].start();
}
}
}
public class PrintQueue2 {
private final Semaphore semaphore;
public PrintQueue2() {
semaphore = new Semaphore(1);
}
public void printJob(Object document) {
try {
semaphore.acquire();
long duration = (long)(Math.random()*10);
System.out.println(Thread.currentThread().getName()+" PrintQueue "+duration
);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
semaphore.release();
}
}
}
public class Job implements Runnable{
private PrintQueue2 printQueue;
public Job(PrintQueue2 printQueue){
this.printQueue = printQueue;
}
@Override
public void run() {
System.out.printf("%s:Going to print a job\n",
Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName());
}
}
这里每次抢占一个资源,需要等待释放,否则阻塞。
可修改Semaphores的公平性,在默认的情况下信号量的进入是不公平的。如果在初始化的第二个参数设定为true时,则会选择时间等待最久的一个进入
重入锁
java.util.concurrent.locks.ReentrantLock,其中Condition这玩意可以解决经典问题哲学家吃饭问题。
原子操作
出自深入理解Android 卷1
上面Mutex就是重入锁,只是c++叫这货而已
原理在于同一时刻只能有一个CPU访问总线