1.CompletableFuture的使用
使用CompletableFuture编排 AService.get()、BService.get()、CService.get(int i) (ABC三个服务返回结果都是int),满足使用A、B的结果相加,再作为参数传给C,取得C的结果后乘以 100 后返回。
要求:
1.显示指定线程池
2.处理异常情况,异常情况返回0。
public class ThreadOrchestration {
// 手动创建线程池
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
6,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) {
int result = calculateResult();
System.out.println("Final Result: " + result);
// 关闭线程池
executorService.shutdown();
}
public static int calculateResult() {
CompletableFuture<Integer> taskA = CompletableFuture.supplyAsync(AService::get, executorService);
CompletableFuture<Integer> taskB = CompletableFuture.supplyAsync(BService::get, executorService);
CompletableFuture<Integer> combinedFuture = taskA.thenCombine(taskB, (a, b) -> a + b)
.thenCompose(result -> CompletableFuture.supplyAsync(() -> CService.get(result), executorService));
try {
int finalResult = combinedFuture.get();
return finalResult * 100;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return 0;
}
}
}
class AService {
public static int get() {
return 10;
}
}
class BService {
public static int get() {
return 20;
}
}
class CService {
public static int get(int num) {
return num;
}
}
2.消息队列实现
编写一个队列,生产者生产消息(消息内容是1,2,3,...,n),消费者消费消息,并将消息打印到控制台。
要求:
1.生产者以每秒10个的速率生产消息,队列满了后阻塞等待;
2.队列长度为100;
3.消费者以每秒1个的速率消费消息;
生产者和消费者在不同的线程;
public class ProducerConsumerTest1 {
// 消息队列容量
private static final int QUEUE_CAPACITY = 100;
// 每秒10个消息
private static final int PRODUCER_RATE = 10;
// 每秒1个消息
private static final int CONSUMER_RATE = 1;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<AtomicInteger> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
AtomicInteger message = new AtomicInteger(0);
// 生产者任务
Thread producerThread = new Thread(() -> {
while (true) {
// 将消息放入队列,如果队列已满则阻塞等待
try {
message.getAndIncrement();
queue.put(message);
System.out.println(Thread.currentThread().getName() + " 生产者生产消息:" + message.get());
TimeUnit.MILLISECONDS.sleep(1000 / PRODUCER_RATE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "producer-thread");
Thread consumerThread = new Thread(() -> {
while (true) {
// 从消息队列中取出消息,如果队列为空则阻塞
try {
int num = queue.take().get();
System.out.println(Thread.currentThread().getName() + "----消费者消费消息----:" + num);
TimeUnit.MILLISECONDS.sleep(1000 / CONSUMER_RATE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "consumer-thread");
producerThread.start();
consumerThread.start();
// 等待30后中断生产者和消费者线程
TimeUnit.SECONDS.sleep(30);
producerThread.interrupt();
consumerThread.interrupt();
}
}
3.消费者消费不到消息就阻塞等待,并且不能重复消费
public class ProducerConsumerTest2 {
// 消息队列容量
private static final int QUEUE_CAPACITY = 100;
// 每秒10个消息
private static final int PRODUCER_RATE = 10;
// 每秒1个消息
private static final int CONSUMER_RATE = 1;
// 消费者数量
private static final int CONSUMER_NUM = 20;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<AtomicInteger> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
Set<Integer> set = Collections.synchronizedSet(new HashSet<Integer>());
ExecutorService threadPool = Executors.newFixedThreadPool(CONSUMER_NUM);
AtomicInteger message = new AtomicInteger(0);
// 生产者任务
Thread producerThread = new Thread(() -> {
while (true) {
// 将消息放入队列,如果队列已满则阻塞等待
try {
message.getAndIncrement();
queue.put(message);
System.out.println(Thread.currentThread().getName() + " 生产者生产消息:" + message.get());
TimeUnit.MILLISECONDS.sleep(1000 / PRODUCER_RATE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "producer-thread");
for (int i = 0; i < CONSUMER_NUM; i++) {
threadPool.execute(() -> {
while (true) {
// 从消息队列中取出消息,如果队列为空则阻塞
try {
int num = queue.take().get();
// 判断是否重复消费
if (!set.contains(num)) {
set.add(num);
System.out.println(Thread.currentThread().getName() + "----消费者消费消息----:" + num);
}
TimeUnit.MILLISECONDS.sleep(1000 / CONSUMER_RATE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
producerThread.start();
// 等待30后中断生产者线程、关闭线程池资源
TimeUnit.SECONDS.sleep(30);
producerThread.interrupt();
threadPool.shutdown();
}
}
4.创建两个线程,一个线程打印奇数,另一个线程打印偶数
要求打印结果按顺序交替输出,例如:1、2、3、4、5、6、...
public class EvenOddPrinter {
private static final Object lock = new Object();
private static int count = 1;
private static final int max = 100;
public static void main(String[] args) {
// thread1 打印奇数数字
Thread thread1 = new Thread(() -> {
while (count < max) {
synchronized (lock) {
if (count % 2 == 1) {
System.out.println(Thread.currentThread().getName() + ": " + count);
count ++;
}
Thread.yield();
}
}
});
// thread2 打印偶数数字
Thread thread2 = new Thread(() -> {
while (count < max) {
synchronized (lock) {
if (count % 2 == 0) {
System.out.println(Thread.currentThread().getName() + ": " + count);
count ++;
}
Thread.yield();
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(Thread.currentThread().getName() + ": 两个子线程交替打印数字完毕");
}
}
5.基本的线程调度问题
编写一个Java函数,通过调用AService.get()、BService.get()、CService.get()三个接口,获取三个整数,然后将这三个整数累加,最终返回累加的值。要求:
1.调用三个接口的操作需要并行执行,以提高效率;
2.累加操作需要在获取三个整数的操作完成后进行,因此需要保证三个整数均已获取后才能进行累加操作;
3.考虑多线程安全问题。
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Service aService = new AService();
Service bService = new BService();
Service cService = new CService();
CompletableFuture<Integer> taskA = CompletableFuture.supplyAsync(aService::get);
CompletableFuture<Integer> taskB = CompletableFuture.supplyAsync(bService::get);
CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(cService::get);
// 完成所有任务,才可以进行下一步累加操作
CompletableFuture<Void> allFutures = CompletableFuture.allOf(taskA, taskB, taskC);
// 等待所有任务执行完成
allFutures.join();
int total = 0;
// 累加三个任务的执行结果
total = taskA.get() + taskB.get() + taskC.get();
System.out.println("total: " + total);
}
}
interface Service {
int get();
}
class AService implements Service{
@Override
public int get() {
return 10;
}
}
class BService implements Service{
@Override
public int get() {
return 20;
}
}
class CService implements Service{
@Override
public int get() {
return 30;
}
}