一.多线程概念与优点
- 程序,进程与线程
-都是运行在操作系统之上
-程序:即一段静态的代码。是为了完成特定任务的一组指令集合
-进程:即正在运行的程序,是一个动态的过程,有其自身的生命周期
-线程:进程可细化为线程,是程序内部的执行路径,若有多个线程即为多线程 - 优点
发挥多核cpu优势,防止阻塞,便于建模(如fork-join框架:采用 “工作窃取”模式(work-stealing))
二.创建线程方式
- 继承Thread类:
实现方式:new Thread().start() - 继承Runnable接口:
实现方式:new Thread(Rb对象).start() - 继承Callable接口
- 区别:Callable 接口类似于 Runnable,但是 Runnable 不会返回结果,并且无法抛出经过检查的异常,而它常和future与futuretask何用得到返回值。
- 实现方式:td(Callable对象)
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
Integer sum = result.get()(获取返回的结果);
- 线程池创建线程:
- 实现方式:ExecutorService pool = Executors.newFixedThreadPool(5)
- //执行传统Runnable任务
pool .execute(new RunnableTask());
//执行Callable任务并获得任务结果Future
Future future = pool.submit(new CallableTask());
三.线程的生命周期
四 线程同步方式
- syschronized关键字
加锁方式:
加入同步代码块: synchronized(锁){...}
同步方法:synchronized 权限修饰符 返回值类型 方法名(参数){}锁的对象: 静态方法的同步锁是当前类.class对象,非静态方法的同步锁是当前对象this,同步块的锁就是synchronized()传入的对象
实例:传入同一个类不同对象,线程获取的是不同的锁,本演示消费者A与消费者B为同一个锁(clerk对象),消费者C与消费者D为同一个锁(clerk1对象),加static int product会在各线程共享变量。如果想要所有线程公用一个方法,则在方法加static,此时同一个类不同对象的锁为(clerk.class对象)
public class SynchronizedTest {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Clerk clerk1 = new Clerk();
Consumer con = new Consumer(clerk);
Consumer con1 = new Consumer(clerk1);
new Thread(con, "消费者 A").start();
new Thread(con, "消费者 B").start();
new Thread(con1, "消费者 C").start();
new Thread(con1, "消费者 D").start();
}
}
// 店员
class Clerk {
private //*static*// int product = 5;
// 进货
public synchronized void get() { // 循环:0
while (product > 5) {
System.out.println("产品已满!");
try {
this.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : " + product++);
this.notifyAll();
}
// 卖货
public synchronized void sale() {// product = 0, 循环: 0
while (product <= 0) { //为了避免虚假唤醒,wait() 应该总是使用在 循环中
System.out.println("缺货!");
try {
this.wait();// ----
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + " : " + product--);
this.notifyAll();
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
super();
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.sale();
}
}
//不加static结果
产品已满!
消费者 A : 5
消费者 B : 4
消费者 C : 5
消费者 D : 4
消费者 A : 3
消费者 C : 3
消费者 B : 2
消费者 D : 2
消费者 B : 1
消费者 C : 1
缺货!
缺货!
缺货!
缺货!
//加了static结果
消费者 B : 5
消费者 C : 4
消费者 A : 3
消费者 D : 2
消费者 A : 0
消费者 C : 1
缺货!
缺货!
缺货!
缺货!
- ReentrantLock
- 加锁方式:
private Lock lock = new ReentrantLock();
//线程通信
private Condition condition2 = lock.newCondition();
public void loopA(int totalLoop) {
lock.lock();
...
lock.unLock();
}
- ReentrantLock与syschronized的区别
- syschronized是关键字,Lock是类
- syschronized加锁原理是基于JVM指令,是隐式锁,性能较差且缺乏灵活性(线程要么得到锁,要么阻塞);而ReentrantLock是可重入的显式锁,作用和synchronized关键字类似,但性能、灵活性要更高一点。
- ReentrantLock可获得Condition接口的实例以便对通信线程进行更灵活的控制
class AlternateDemo {
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private int number = 1; //用于确定当前执行线程的标记
public void loopA(int totalLoop) {
lock.lock();
try {
//判断
if (number != 1) {
try {
condition1.await();
} catch (InterruptedException e) {
}
}
//打印
for (int i = 1; i <= 1; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + totalLoop);
}
//唤醒
number = 2;
condition2.signal();
} finally {
lock.unlock();
}
}
public void loopB(int totalLoop) {
lock.lock();
try {
//判断
if (number != 2) {
try {
condition2.await();
} catch (InterruptedException e) {
}
}
//打印
for (int i = 1; i <= 1; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + totalLoop);
}
//唤醒
number = 1;
condition1.signal();
} finally {
lock.unlock();
}
}
}
- CountDownLatch闭锁
加锁方式:用给定的计数 初始化 CountDownLatch。由于调用了 countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞。在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch signal = new CountDownLatch(1);
signal.await();
signal.countDown();示例
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i=0;i<3;i++){
executorService.execute(new Worker(startSignal, doneSignal));
}
System.out.println("准备完毕");
startSignal.countDown();
System.out.println("开始执行");
doneSignal.await();
System.out.println("执行完毕");
}
static class Worker implements Runnable {
private static Integer count=1;
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork(count);
count++;
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork(Integer count) {
System.out.println(Thread.currentThread().getName()+"-"+count);
}
}
}
//结果
准备完毕
开始执行
pool-1-thread-1-1
pool-1-thread-2-2
pool-1-thread-3-1
执行完毕
- CyclicBarrier
- 加锁方式:
CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { });//指定所有线程完成后后续任务
CyclicBarrier barrier = new CyclicBarrier(N);//无后续任务
cyclicBarrier.await();//各个线程完成等待其他线程完成 - 与CountDownLatch区别
1.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成再执行后续任务。CountDownLatch就像接力赛,只有前三个选手(线程)完成,后一个选手(线程)才能继续任务;CyclicBarrier就像个大坝,所有河流的水的汇聚一起,然后到达预设位置(barrier)时,一起开闸放水。
2.CyclicBarrier可重用,CountDownLatch是减计数的,而CyclicBarrier是加计数的。
- 加锁方式:
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
});
for (int i = 0; i < N; i++)
new Writer(barrier).start();
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrier重用");
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
//结果
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-7写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
- 信号量Semaphore
- 设置最大线程数(一般使用公平模式):private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
线程获取一个资源:available.acquire();(剩余许可的线程数-1)
一个线程释放资源:available.release();(剩余许可的线程数+1)
- 设置最大线程数(一般使用公平模式):private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
ContentPool contentPool = new ContentPool();
System.out.println("总剩余资源为6,每次只能允许4个线程获得资源,当线程资源释放时,其他线程才可以继续获得资源");
for (int i = 0; i < 6; i++) {
executorService.execute(new user(contentPool));
}
}
}
class ContentPool {
private static final int MAX_AVAILABLE = 4;
private static int count = 6;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
--count;
System.out.println("占用一个资源,剩余资源为:" + count);
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
++count;
System.out.println("释放一个资源,剩余资源为:" + count);
}
protected int[] items = {1, 2, 3, 4,5,6};
protected boolean[] used = new boolean[MAX_AVAILABLE];
synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
class user implements Runnable {
private ContentPool contentPool;
private int count = (int) (Math.random() * 5+ 1);
public user(ContentPool contentPool) {
this.contentPool = contentPool;
}
@Override
public void run() {
try {
Object item = contentPool.getItem();
System.out.println(Thread.currentThread().getName()+"得到资源"+item);
Thread.sleep(1000);
contentPool.putItem(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//结果
总剩余资源为6,每次只能允许4个线程获得资源,当线程资源释放时,其他线程才可以继续获得资源
占用一个资源,剩余资源为:5
pool-1-thread-2得到资源1
占用一个资源,剩余资源为:4
占用一个资源,剩余资源为:3
pool-1-thread-6得到资源2
pool-1-thread-1得到资源3
占用一个资源,剩余资源为:2
pool-1-thread-5得到资源4
释放一个资源,剩余资源为:3
占用一个资源,剩余资源为:2
pool-1-thread-3得到资源3
释放一个资源,剩余资源为:4
释放一个资源,剩余资源为:5
释放一个资源,剩余资源为:6
占用一个资源,剩余资源为:3
pool-1-thread-4得到资源1
释放一个资源,剩余资源为:5
释放一个资源,剩余资源为:6