一 CountDownLatch
用来干什么
CountDownLatch
被用来同步一个或多个任务,强制它们等待其它任务执行的一组操作完成。比如:在开会的时候,大家都会等到人齐后才会开始(同步)。
相关方法
//构造器
public CountDownLatch(int count){} // 参数为 count 的计数器
//方法
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂
起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//和await()类似,只不过等待一定 的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1
我们可以向 CountDownLatch
对象设置一个初始计数值,那么在这个对象上调用 await()
的方法都将会被堵塞,直至这个计数为0。其它任务在结束工作后,可以调用 countDown()
来减小这个计数值(-1)。 CountDownLatch
只能触发一次,计数值不可以被重置,如果需要多次使用,可以使用 CyclicBarrier
。
栗子
public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread() {
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
Thread.sleep(1000);
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
;
}.start();
new Thread() {
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
Thread.sleep(2000);
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
;
}.start();
try {
System.out.println("等待2个子线程执行完毕");
latch.await();
System.out.println("2个子线程已经执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**output
线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕
线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
/**
二 CyclicBarrier
用来干什么
CyclicBarrier
和 CountDownLatch
很类似,只是 CountDownLatch
只能触发一次的事件(某一次重要的会议需要等人齐后在开),而 CyclicBarrier
则可以被重复使用(每一次开会都需要等人齐后开)
相关方法
构造器
public CyclicBarrier(int parties, Runnable barrierAction) {}//
public CyclicBarrier(int parties) {}
parties:指可以让多少个线程执行 await(),执行 await() 的数量和 parties 相等时(线程同步),
那么就可以执行下面的任务
barrierAction:当线程达到同步后,需要执行的任务(线程随机执行)
//方法
public int await() throws InterruptedException, BrokenBarrierException { };
//用来挂起当前线程,直至所有线程都执行 await() 后再同时执行后续任务;
public int await(long timeout, TimeUnit unit)throws InterruptedException
,BrokenBarrierException,TimeoutException { };
栗子
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
@Override
public void run() {
System.out.println("当前线程"+Thread.currentThread().getName());
}
});
for(int i=0;i<N;i++){
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(1000);
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
三 PriorityBlockingQueue
PriorityBlockingQueue
是一个线程安全的优先级队列,PriorityBlockingQueue
中存储的对象必须实现 Comparable
接口并实现 compareTo()
方法。
方法
栗子
public class Test {
public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();
public static void main(String[] args) {
queue.add(new User(1, "1"));
queue.add(new User(5, "5"));
queue.add(new User(23, "23"));
queue.add(new User(55, "55"));
queue.add(new User(9, "9"));
queue.add(new User(3, "3"));
for (User user : queue) {
try {
System.out.println(queue.take().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class User implements Comparable<User> {
private int age;
private String name;
public User(int age, String name) {
this.age = age;
this.name = name;
}
@Override
public int compareTo(User o) {
return this.age > o.age ? -1 : 1;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
/**output
55
23
9
5
3
1
**/
四 ScheduledExecutorService
ScheduledExecutorService
提供了按时间安排执行任务的功能。
方法
其中:
schedule(task,initDelay)
// 安排所提交的Callable或Runnable任务在initDelay指定的时间后执行。
scheduleAtFixedRate(Runnable command, //执行的线程
long initialDelay, //初始化延迟时间
long period, //两次执行的间隔时间
TimeUnit unit);//计时单位
//安排所提交的Runnable任务按指定的间隔重复执行
scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
//安排所提交的Runnable任务在每次执行完后,等待delay所指定的时间后重复执行。
栗子
/**
* 每天晚上8点执行一次
* 每天定时安排任务进行执行
*/
public static void executeEightAtNightPerDay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
long oneDay = 24 * 60 * 60 * 1000;
long initDelay = getTimeMillis("20:00:00") - System.currentTimeMillis();
initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;
executor.scheduleAtFixedRate(
new EchoServer(),
initDelay,
oneDay,
TimeUnit.MILLISECONDS);
}
/**
* 获取指定时间对应的毫秒数
* @param time "HH:mm:ss"
* @return
*/
private static long getTimeMillis(String time) {
try {
DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
return curDate.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
class EchoServer implements Runnable {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("This is a echo server. The current time is " +
System.currentTimeMillis() + ".");
}
}
五 Semaphore
Semaphore
可以控制同时访问线程的个数。
相关方法
//构造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
//fair表示是否是公平的,即等待时间越久的越先获取许可
//方法
public void acquire() throws InterruptedException { } //获取一个许可
public void acquire(int permits) throws InterruptedException { }//获取permits个许可
public void release() { } //释放一个许可
public void release(int permits) { } //释放permits个许可
public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public int availablePermits()//得到可用的许可数目。
栗子
public class Test {
public static void main(String[] args) {
int N = 8;
Semaphore semaphore = new Semaphore(5);
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("worker"+this.num+"占用一个坑..");
Thread.sleep(2000);
System.out.println("worker"+this.num+"释放出一个坑");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**output
worker0占用一个一个坑...
worker1占用一个一个坑...
worker2占用一个一个坑...
worker4占用一个一个坑...
worker5占用一个一个坑...
worker0释放出一个坑
worker2释放出一个坑
worker3占用一个一个坑...
worker7占用一个一个坑...
worker4释放出一个坑
worker5释放出一个坑
worker1释放出一个坑
worker6占用一个一个坑...
worker3释放出一个坑
worker7释放出一个坑
worker6释放出一个坑
**/