理解程序、进程、线程的概念
程序可以理解为静态的代码
进程可以理解为执行中的程序
线程可以理解为进程的进一步细分,程序的一条执行路径
使用多线程的优点:
提高应用程序的响应。对图形化界面更有意义,可增强用户体验。
提高计算机系统CPU的利用率
改善程序结构。将既长又复杂的进程分为多个线程,独立运行,利于理解和修改
在java中要想实现多线程,有两种手段,一种是继续Thread类,另外一种是实现Runable接口
继承java.lang.Thread类
下面来看一个简单的实例:
class Thread1 extends Thread{
private String name;
public Thread1(String name) {
this.name=name;
}
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Thread1 mTh1=new Thread1("A");
Thread1 mTh2=new Thread1("B");
mTh1.start();
mTh2.start();
}
}
输出
输出:
A运行 : 0
B运行 : 0
A运行 : 1
A运行 : 2
A运行 : 3
A运行 : 4
B运行 : 1
B运行 : 2
B运行 : 3
B运行 : 4
再运行一下:
A运行 : 0
B运行 : 0
B运行 : 1
B运行 : 2
B运行 : 3
B运行 : 4
A运行 : 1
A运行 : 2
A运行 : 3
A运行 : 4
说明:
程序启动运行main时候,java虚拟机启动一个进程,主线程main在main()调用时候被创建。随着调用Thread1的两个对象的start方法,另外两个线程也启动了,这样,整个应用就在多线程下运行。
以下是关系到线程运行状态的几个方法:
1)start方法
start()用来启动一个线程,当调用start方法后,系统才会开启一个新的线程来执行用户定义的子任务,在这个过程中,会为相应的线程分配需要的资源。
2)run方法
run()方法是不需要用户来调用的,当通过start方法启动一个线程之后,当线程获得了CPU执行时间,便进入run方法体去执行具体的任务。注意,继承Thread类必须重写run方法,在run方法中定义具体要执行的任务。
3)sleep方法
sleep相当于让线程睡眠,交出CPU,让CPU去执行其他的任务。
实现java.lang.Runnable接口
用Runnable也是非常常见的一种,我们只需要重写run方法即可。下面也来看个实例:
class Thread2 implements Runnable{
private String name;
public Thread2(String name) {
this.name=name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
Thread.sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
new Thread(new Thread2("C")).start();
new Thread(new Thread2("D")).start();
}
}
输出
输出:
C运行 : 0
D运行 : 0
D运行 : 1
C运行 : 1
D运行 : 2
C运行 : 2
D运行 : 3
C运行 : 3
D运行 : 4
C运行 : 4
说明:
Thread2类通过实现Runnable接口,使得该类有了多线程类的特征。run()方法是多线程程序的一个约定。所有的多线程代码都在run方法里面。Thread类实际上也是实现了Runnable接口的类。
在启动的多线程的时候,需要先通过Thread类的构造方法Thread(Runnable target) 构造出对象,然后调用Thread对象的start()方法来运行多线程代码。
实际上所有的多线程代码都是通过运行Thread的start()方法来运行的。因此,不管是扩展Thread类还是实现Runnable接口来实现多线程,最终还是通过Thread的对象的API来控制线程的,熟悉Thread类的API是进行多线程编程的基础。
Thread和Runnable的区别
对比一下继承的方式 vs 实现的方式
1.联系:public class Thread implements Runnable(继承的方式的Thread也实现了Runnable接口)
2.哪个方式好?
实现的方式优于继承的方式 why?
① 避免了java单继承的局限性
② 如果多个线程要操作同一份资源(或数据),更适合使用实现的方式
看一个例子:
//模拟火车站售票窗口,开启三个窗口售票,总票数为100张
//存在线程的安全问题
class Window extends Thread {
int ticket = 100;
public void run() {
while (true) {
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + "售票,票号为:"+ ticket--);
} else {
break;
}
}
}
}
public class TestWindow {
public static void main(String[] args) {
Window w1 = new Window();
Window w2 = new Window();
Window w3 = new Window();
w1.setName("窗口1");
w2.setName("窗口2");
w3.setName("窗口3");
w1.start();
w2.start();
w3.start();
}
}
class Window implements Runnable {
int ticket = 100;//要将全局变量声明为静态,不然每个对象都有这个属性,会卖出300张票
public void run() {
while (true) {
if (ticket > 0) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "售票,票号为:"+ ticket--);
} else {
break;
}
}
}
}
public class Main {
//模拟火车站售票窗口,开启三个窗口售票,总票数为100张
//存在线程的安全问题
public static void main(String[] args) {
Window w1 = new Window();
Thread t1 = new Thread(w1, "t1");
Thread t2 = new Thread(w1, "t2");
Thread t3 = new Thread(w1, "t3");
t1.start();
t2.start();
t3.start();
}
}
问题原因:
某个线程执行完输出ticket后,还没有来得及ticket--,CPU时间片被分配给了另外一个线程,导致同一个票号被输出2次。
另外一种情况,打印到ticket=1时,有2个线程同时进入到了条件里,导致-1的票号被输出。
解决办法,为关键代码段,加锁,参见后文。
中断线程
当线程的run方法执行方法体中最后一条语句后,并经由return语句返回时,或者出现了在方法中过没有捕获的异常时,线程将被终止。
线程同步
根据各线程访问数据的次序,可能会产生讹误的对象。这样的一个情况称为竞争条件(race condition)。
1)竞争条件的一个例子
银行例程:多线程操作时,本应恒等的余额总值发生了变化。
public class SynchBankTest
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static void main(String[] args)
{
Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
int i;
for (i = 0; i < NACCOUNTS; i++)
{
TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
Thread t = new Thread(r);
t.start();
}
}
}
public class Bank
{
private final double[] accounts;
public Bank(int n, double initialBalance)
{
accounts = new double[n];
for (int i = 0; i < accounts.length; i++)
accounts[i] = initialBalance;
}
public void transfer(int from, int to, double amount) throws InterruptedException
{
if (accounts[from] >= amount) {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
}
public double getTotalBalance()
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
public int size()
{
return accounts.length;
}
}
public class TransferRunnable implements Runnable
{
private Bank bank;
private int fromAccount;
private double maxAmount;
private int DELAY = 10;
public TransferRunnable(Bank b, int from, double max)
{
bank = b;
fromAccount = from;
maxAmount = max;
}
public void run()
{
try
{
while (true)
{
int toAccount = (int) (bank.size() * Math.random());
double amount = maxAmount * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e)
{
}
}
}
2)详解竞争条件
假定两个线程同时执行指令
accounts[to] += amount;
这不是原子操作。该指令可能被处理如下:
1)将accounts[to]加载到寄存器
2)增加amount
3)将结果写回accounts[to]
假定第一个线程执行步骤1和2,然后,它被剥夺了运行权。假定第二个线程被唤醒并修改了accounts数组中的同一项。然后,第一个线程被唤醒并完成其第三步。这一动作擦去了第二个线程所做的更新。
线程同步
当使用多个线程来访问同一个数据时,非常容易出现线程安全问题(比如多个线程都在操作同一数据导致数据不一致),所以我们用同步机制来解决这些问题。
锁和条件的关键之处:
锁用来保护代码片段,任意时刻只能有一个线程执行被保护的代码。
锁可以管理试图进入保护代码片段的线程
锁可以拥有一个或者多个相关的条件对象
每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
锁对象
在Java SE5.0引入ReentrantLock类。Lock是Java.util.concurrent.locks包下的接口,Lock 实现提供了比使用synchronized 方法和语句可获得的更广泛的锁定操作。
用ReentrantLock保护代码块的基本结构如下:
myLock.lock(); //a ReentrantLock object
try
{
critical section
}
finally
{
myLock.unlock();//确保代码抛出异常锁必须被释放
}
这一结构确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,他们被阻塞,直到第一个线程释放锁对象。把解锁操作放在finally子句之内是至关重要的。如果在临界区的代码抛出异常,锁必须释放。否则,其他线程将永远阻塞。
public class Bank
{
private Lock bankLock= new ReentrantLock();
public void transfer(int from, int to, double amount)
{
bankLock.lock();
try
{
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
finally
{
bankLock.unlock();
}
}
public double getTotalBalance()
{
bankLock.lock();
try
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
finally
{
bankLock.unlock();
}
}
}
假定一个线程调用transfer,在执行结束前被剥夺运行权。假定第二个线程也调用了transfer,由于第二个线程并不能获得锁,将在调用lock方法时被阻塞。他必须等待第一个线程完成transfer方法之后才能再度被激活。当第一个线程释放锁时,第二个线程才能开始运行。这样余额就不会出错了。
每个Bank对象有自己的ReentrantLock对象,如果两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。但是,如果两个线程访问不同的Bank对象,每个线程得到不同的锁对象,两个线程都不会发生阻塞。两个线程在操作不同的Bank实例的时候,线程之间不会相互影响。
锁是可重入的,因为线程可以重复的获得已经持有的锁。锁保持一个持有计数来跟踪对Lock方法的嵌套调用。线程每一次调用都用unlock来释放锁。由于这一特性,被一个锁保护的代码可以调用另一个使用相同锁的方法。例如,transfer方法调用getTotalBalance方法,这也会封锁bankLock对象,此时bankLock对象的持有计数为2,当getTotalBalance方法退出时,持有计数变为1,当transfer方法退出时,持有计数变为0。线程释放锁。通常,可能想要保护需若干个操作来更新或者检查共享对象的代码块。要确保这些操作完成后,另一个线程才能使用相同的对象。
要留心临界区的代码,不要因为异常的抛出二跳出了临界区。如果在临界区代码结束之前抛出了异常,finally字句释放锁,但会使对象可能出于一种受损状态。
条件对象
通常,线程进入临界区,却发现在某一条件满足后才能执行。要使用一个条件对象来管理那些已经获得了一个锁,但是不能做有用工作的线程。(条件对象经常被称为条件变量)
分析上文中的银行模拟程序,
if(bank.getBalance(from)>=amount)
transfer(from, to, amount);
如果当前程序通过if条件判断,且在调用transfer之前被中断,在线程再次运行前,账户余额可能已经低于提款金额。必须确保没有其他线程在本检查余额与转账活动之间修改余额。通过使用锁来保护检查与转账动作来做到这一点:
public void transfer(int from, int to, double amount)
{
bankLock.lock();
try
{
while (accounts[from] < amount)
{
//wait()
}
//transfer funds
........
finally
{
bankLock.unlock();
}
}
现在,当账户中没有足够的余额时,等待直到另一个线程向账户中注入资金。但是,这一线程刚刚获得了bankLock的排他性访问,因此别的线程没有进行存款操作的机会,这就是为什么需要用条件对象的原因。
一个锁对象可以有一个或者多个相关的条件对象。可以用newCondition方法获得一个条件对象。习惯的给每一个条件对象命名为可以反应它所表达条件的名字。如sufficientFunds = bankLock.newCondition();
如果transfer方法发现余额不足,它调用sufficientFunds.await();当前线程被他阻塞了,并放弃锁。我们希望这样可以等待另一个线程进行增加账户余额的操作。
等待获得锁的线程和调用await方法的线程在本质上存在不同。一旦一个线程调用await方法,他进入该条件的等待集。当该锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll方法时为止。
当另一个线程转账时,它应该调用sufficientFunds.signalAll();这一调用重新激活因为这一条件等待的所有线程。当这些线程从等待集当中移除时,他们再次成为可运行的,调度器将再次激活它们。同时,他们试图重新进入该对象。一旦锁成为可用,他们将从await调用返回,获得该锁并从被阻塞的地方继续执行。
此时,线程应该再次测试该条件。由于无法确保该条件被满足,signalAll方法仅仅通知正在等待的线程:此时有可能已经满足条件,值得再次去检测条件。
最关重要的是最终需要某个其他线程调用signalAll方法。当一个线程调用await时,他没有办法自己激活自身,它寄希望于其他线程。如果没有其他线程重新来激活等待的线程,他就永远不再运行。导致死锁。如果所有其他线程被阻塞,最后一个线程再解除其他阻塞线程之前就调用await,那么它也被阻塞。没有线程解除其他阻塞线程,那么该程序就挂起。
应该何时调用signalAll呢,在本例中,当一个账户的余额发生改变时,等待的线程就有机会检查余额。调用signalAll不会立即激活一个等待线程。它仅仅解除等待线程的阻塞,以便这些线程可以在当前线程同步推出后,通过竞争实现对对象的访问。当一个线程拥有某个条件的锁时,它仅仅可以在该条件上调用await,signalAll和signal方法。
以下为完整的例子:
synch/Bank.java
package synch;
import java.util.concurrent.locks.*;
public class Bank
{
private final double[] accounts;
private Lock bankLock;
private Condition sufficientFunds;
public Bank(int n, double initialBalance)
{
accounts = new double[n];
for (int i = 0; i < accounts.length; i++)
accounts[i] = initialBalance;
bankLock = new ReentrantLock();
sufficientFunds = bankLock.newCondition();
}
public void transfer(int from, int to, double amount) throws InterruptedException
{
bankLock.lock();
try
{
while (accounts[from] < amount)
sufficientFunds.await();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
sufficientFunds.signalAll();
}
finally
{
bankLock.unlock();
}
}
public double getTotalBalance()
{
bankLock.lock();
try
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
finally
{
bankLock.unlock();
}
}
public int size()
{
return accounts.length;
}
}
[java] view plain copy
synch/TransferRunnable.java
package synch;
public class TransferRunnable implements Runnable
{
private Bank bank;
private int fromAccount;
private double maxAmount;
private int DELAY = 10;
public TransferRunnable(Bank b, int from, double max)
{
bank = b;
fromAccount = from;
maxAmount = max;
}
public void run()
{
try
{
while (true)
{
int toAccount = (int) (bank.size() * Math.random());
double amount = maxAmount * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e)
{
}
}
}
synch/SynchBankTest
package synch;
public class SynchBankTest
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static void main(String[] args)
{
Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
int i;
for (i = 0; i < NACCOUNTS; i++)
{
TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
Thread t = new Thread(r);
t.start();
}
}
}
生产者消费者问题
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Buffer {
private final Lock lock;
private final Condition notFull;
private final Condition notEmpty;
private int maxSize;
private List<Date> storage;
Buffer(int size){
//使用锁lock,并且创建两个condition,相当于两个阻塞队列
lock=new ReentrantLock();
notFull=lock.newCondition();
notEmpty=lock.newCondition();
maxSize=size;
storage=new LinkedList<>();
}
public void put() {
lock.lock();
try {
while (storage.size() ==maxSize ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
notFull.await();//阻塞生产线程
}
storage.add(new Date());
System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
Thread.sleep(1000);
notEmpty.signalAll();//唤醒消费线程
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void take() {
lock.lock();
try {
while (storage.size() ==0 ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
notEmpty.await();//阻塞消费线程
}
Date d=((LinkedList<Date>)storage).poll();
System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
Thread.sleep(1000);
notFull.signalAll();//唤醒生产线程
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
class Producer implements Runnable{
private Buffer buffer;
Producer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.put();
}
}
}
class Consumer implements Runnable{
private Buffer buffer;
Consumer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.take();
}
}
}
public class Main{
public static void main(String[] arg){
Buffer buffer=new Buffer(10);
Producer producer=new Producer(buffer);
Consumer consumer=new Consumer(buffer);
for(int i=0;i<3;i++){
new Thread(producer,"producer-"+i).start();
}
for(int i=0;i<3;i++){
new Thread(consumer,"consumer-"+i).start();
}
}
}
synchronized关键字
Lock和condition接口为程序设计人员提供了高度的锁定控制。大多数情况下并不需要那样的控制,并且可以使用一种嵌入到Java语言内部的机制。从Java1.0版本开始,Java中的每个对象都有一个内部锁。如果一个方法用synchronized关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法,线程必须获得内部的对象锁。使用synchronized修饰的方法或者代码块可以看成是一个原子操作。
synchronized方法是一种粗粒度的并发控制,某一时刻,只能有一个线程执行该synchronized方法;
synchronized块则是一种细粒度的并发控制,只会将块中的代码同步,位于方法内、synchronized块之外的代码是可以被多个线程同时访问到的。
用同步方法实现的银行例子:
package synch2;
public class Bank
{
private final double[] accounts;
public Bank(int n, double initialBalance)
{
accounts = new double[n];
for (int i = 0; i < accounts.length; i++)
accounts[i] = initialBalance;
}
public synchronized void transfer(int from, int to, double amount) throws InterruptedException
{
while (accounts[from] < amount)
wait();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
notifyAll();
}
public synchronized double getTotalBalance()
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
public int size()
{
return accounts.length;
}
}
public class TransferRunnable implements Runnable
{
private Bank bank;
private int fromAccount;
private double maxAmount;
private int DELAY = 10;
public TransferRunnable(Bank b, int from, double max)
{
bank = b;
fromAccount = from;
maxAmount = max;
}
public void run()
{
try
{
while (true)
{
int toAccount = (int) (bank.size() * Math.random());
double amount = maxAmount * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e)
{
}
}
}
public class SynchBankTest2
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static void main(String[] args)
{
Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
int i;
for (i = 0; i < NACCOUNTS; i++)
{
TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
Thread t = new Thread(r);
t.start();
}
}
}
线程状态
线程可以有以下6中状态
- New(新创建)
- Runnable(可运行)
- Blocked(被阻塞)
- Waiting(等待)
- Timed waiting(计时等待)
- Terminated(被终止)
要想确定一个线程当前的状态,可调用getState方法。
由上图可以看出,一个线程由出生到死亡分为五个阶段:
1).创建状态
•当用new操作符创建一个新的线程对象时,该线程处于创建状态。
•处于创建状态的线程只是一个空的线程对象,系统不为它分配资源
2). 可运行状态
• 执行线程的start()方法将为线程分配必须的系统资源,安排其运行,并调用线程体—run()方法,这样就使得该线程处于可运行( Runnable )状态。
• 这一状态并不是运行中状态(Running ),因为线程也许实际上并未真正运行。这取决于操作系统给线程提供的运行时间。
一旦一个线程开始运行,它不必始终保持运行。事实上,运行中的线程被中断,目的是为了让其他线程获得运行机会。线程调度的细节依赖于操作系统提供的服务。抢占式调度系统给每一个可运行线程一个时间片来执行任务。当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程机会运行。
3).被阻塞的线程或等待线程
当一个线程试图获取一个内部的对象锁,而该锁被其他的线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变为非阻塞状态。
当线程等待另一个线程通知调度器一个条件时,它自己 进入等待状态。
方法有一个超时参数时。调用他们导致线程进入计时等待状态。这一状态将一直保持到超时期满或者收到适当的通知。带有超时参数的方法有:Thread.sleep
4). 被终止的线程
两个原因之一而被终止:
当线程的run方法执行结束后,该线程自然消亡。
因为一个没有捕获的异常终止了run方法而意外死亡。
阻塞队列
对于实际贬称过来说,应该尽可能原理底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便和安全的多。
许多线程问题可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。例如,转账程序中,转账线程将转账指令对象插入一个队列中,而不是直接访问银行对象。另一个线程从队列中取出指令执行转账。只有该线程可以访问该银行对象的内部。因此不需要同步。(当然,线程安全的队列类的实现者不能不考虑锁和条件。)
当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(BlockingQueue)导致线程阻塞。队列会自动地平衡负载。
LinkedBlockingQueue的容量在默认下是没有上边界的,也可设置之。
阻塞队列方法:
方法 | 正常动作 | 特殊情况下的动作 |
---|---|---|
add | 添加一个元素 | 队列满时抛出IllegalStateException异常 |
element | 返回队列的头元素 | 队列空时抛出NoSuchElementException异常 |
offer | 添加一个元素并返回true | 如果队列满,则返回false |
peek | 返回队列的头元素 | 如果队列空,则返回null |
poll | 移出并返回队列的头元素 | 如果队列空,则返回null |
put | 添加一个元素 | 如果队列满,则阻塞 |
remove | 移出并返回头元素 | 队列空时抛出NoSuchElementException异常 |
take | 移出并返回头元素 | 如果队列空,则阻塞 |
poll和peek方法返回空来指示失败,因此,向这些队列中插入null值是非法的。
还有带有超时的offer方法和poll方法的变体。例如
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
尝试在100ms内在队列的尾部插入一个元素。如果成功返回true;否则,达到超时时返回false。类似地,下面的调用:
Object head = q.poll(100, TimeUnit.MILLISECONDS);
尝试在100ms内移除队列的头元素;如果成功返回头元素,否则,达到超时时返回false。
阻塞队列练习,开启一个线程读取目录结构,把文件添加到阻塞队列中,另外数个线程从队列中读取文件,并把文件中包含已设定关键字的行打印出来:
package learn.test.object;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest
{
public static void main(String[] args)
{
BlockingQueue<File> q = new LinkedBlockingQueue<File>(Q_SIZE);
FilePicker aFilePicker = new FilePicker(q, ROOT_DIR);
new Thread(aFilePicker).start();
for(int i=0; i< THREAD_COUNT; i++)
{
FileAnalyzer analyzer = new FileAnalyzer(q, KEY_WORD);
new Thread(analyzer).start();
}
}
public static final String ROOT_DIR = "/home/joseph/Documents/WorkSpace/Source/Java";
public static final String KEY_WORD = "public";
public static final int THREAD_COUNT = 10;
public static final int Q_SIZE = 10;
}
class FilePicker implements Runnable
{
public FilePicker(BlockingQueue<File> q, String rootDir)
{
this.q = q;
this.rootDir = rootDir;
}
public void run()
{
try
{
pickFile();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
public void pickFile() throws InterruptedException
{
File root = new File(rootDir);
if(!root.exists())
System.out.println("the selected file/dir do not exists");
pickFile(root);
q.put(END_FLAG);
}
public void pickFile(File file) throws InterruptedException
{
if(file.isDirectory())
{
for(File subFile : file.listFiles())
pickFile(subFile);
}
else
{
q.put(file);
}
}
private BlockingQueue<File> q;
private String rootDir;
public static final File END_FLAG = new File("");
}
class FileAnalyzer implements Runnable
{
public FileAnalyzer(BlockingQueue<File> q, String key)
{
this.key = key;
this.q = q;
}
public void run()
{
try
{
while((curFile = q.take()) != FilePicker.END_FLAG)
{
Scanner in = new Scanner(new FileInputStream(curFile));
int lineCount = 1;
while(in.hasNext())
{
String str = in.nextLine();
if(str.contains(key))
System.out.printf("thread [%s] line [%d] in file [%s] : %s%n",Thread.currentThread().getName(), lineCount++, curFile.getName(), str);
}
}
q.put(FilePicker.END_FLAG);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (FileNotFoundException e)
{
e.printStackTrace();
}
}
private File curFile;
private BlockingQueue<File> q;
private String key;
}
线程安全的集合
[ArrayList线程不安全分析]
一个 ArrayList ,在添加一个元素的时候,它可能会有两步来完成:
1. 在 Items[Size] 的位置存放此元素;
2. 增大 Size 的值。
在单线程运行的情况下,如果 Size = 0,添加一个元素后,此元素在位置 0,而且 Size=1;
而 如果是在多线程情况下,比如有两个线程,线程 A 先将元素存放在位置 0。但是此时 CPU 调度线程A暂停,线程 B 得到运行的机会。线程B也向此 ArrayList 添加元素,因为此时 Size 仍然等于 0 (注意哦,我们假设的是添加一个元素是要两个步骤哦,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。
线程不安全的例子:
public class ArrayListInThread implements Runnable {
List<String> list1 = new ArrayList<String>(1);// not thread safe
// List<String> list1 = Collections.synchronizedList(new ArrayList<String>());// thread safe
public void run() {
try {
Thread.sleep((int)(Math.random() * 2));
}
catch (InterruptedException e) {
e.printStackTrace();
}
list1.add(Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
ThreadGroup group = new ThreadGroup("testgroup");
ArrayListInThread t = new ArrayListInThread();
for (int i = 0; i < 10000; i++) {
Thread th = new Thread(group, t, String.valueOf(i));
th.start();
}
while (group.activeCount() > 0) {
Thread.sleep(10);
}
System.out.println();
System.out.println(t.list1.size()); // it should be 10000 if thread safe collection is used.
}
}
作业
使用同步机制修正窗口卖票问题。