思维导图
Guarded Suspension 模式简介
Guarded Suspension 模式的核心是一个受保护方法(Guarded Method)。该方法在执行其所需要真正执行的操作时需要满足特定的条件(Predicate,称为保护条件)。当条件状态不满足时,执行受保护方法的线程会被挂起并进入等待状态(WAITING)状态,直到条件满足,该线程才会继续运行。此时,受保护方法才会真正执行其所要执行的操作,所要执行的操作又称为:目标动作。
GuardedObject 角色是一个持有被守护的方法(guardedMethod)的类,当线程执行 guardedMethod 方法时,若守护条件成功,则可以立即执行操作;不成立时,就要进行等待。除了 guardedMethod方法,GuardedObject 角色还应该有持有其他改变实例状态的方法(stateChangingMethod)。stateChangingMethod 通过 notify/notifyAll 方法来实现,guardedMethod 通过 while 语句和 wait 方法来实现。
Guarded Suspension 模式的特点:
- 存在循环
- 存在条件检查
- 因为某种原因而等待
核心代码:
while (守护条件) {
try {
wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
// 执行操作
todo();
应用场景
服务器请求多,但服务器程序不能丢弃任何一个客户的请求。当大量客户请求到来,而我们不能放弃任何一个请求,必须让客户请求排队,由服务器一个一个去处理。
Guarded Suspension模式:既可以最大限度的保护服务器在大量用户访问时不挂掉,同时也能一个一个处理用户的请求 ,保证所有客户端请求均不丢失。
实例代码
Request封装客户端请求。RequestQueue表示客户端请求队列(充当中间缓存,存放未处理的请求),由Client端和Server端共同维护。Client端负责不断发起请求,并将请求对象放入队列中。Server端则根据自身状态,在有能力的情况下,从RequestQueue队列中提取请求对象并进行处理。
时序图,待完善。
名字 | 说明 |
---|---|
Request | 表示一个请求的类 |
RequestQueue | 依次存放请求的类 |
ClientThread | 发送请求的类 |
ServerThrad | 接受请求的类 |
Main | 测试类 |
package com.learn.concurrent.design;
/**
* @Auther: fc.w
* @Date: 2021/1/11 22:37
* @Description:
*/
public class Request {
private String name;
public Request(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
'}';
}
}
package com.learn.concurrent.design;
import java.util.LinkedList;
import java.util.Queue;
/**
* 此类用于依次存放请求,RequestQueue 通过 putRequest 放入 Request 实例,并按顺序使用 getRequest 取出 Request 实例。
* 这种结构通常称为队列(queue)或者 FIFO(First In First Out ,先进先出)
* @Auther: fc.w
* @Date: 2021/1/11 22:38
* @Description:
*/
public class RequestQueue {
private final Queue<Request> queue = new LinkedList<>();
/**
* 取出最先放在RequestQueue的中的一个请求,作为返回值,如果一个请求也没有就一直等待,直到其他线程执行putRequest.
* @return
*/
public synchronized Request getRequest() {
while (queue.peek() == null) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return queue.remove();
}
/**
* 添加一个请求
* @param request
*/
public synchronized void putRequest(Request request) {
queue.offer(request);
notifyAll();
}
}
package com.learn.concurrent.design;
import java.util.Random;
/**
* 表示发送请求的线程。ClientThread持有RequestQueue的实例,并连续调用该实例的putRequest,放入请求。
* @Auther: fc.w
* @Date: 2021/1/11 22:43
* @Description:
*/
public class ClientThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ClientThread(RequestQueue requestQueue, String name, long seed) {
super(name);
this.requestQueue = requestQueue;
this.random = new Random(seed);
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Request request = new Request("No." + i);
System.out.println(Thread.currentThread().getName() + " requests " + request);
requestQueue.putRequest(request);
try {
// 为了错开发送请求的执行点,使用java.util.Random类随机生成0-1000之间的数
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.learn.concurrent.design;
import java.util.Random;
/**
* 用于表示接收请求的线程,该类也持有RuquestQueue的实例,ServerThread使用该实例的getRequst方法来接受请求。
* @Auther: fc.w
* @Date: 2021/1/11 22:48
* @Description:
*/
public class ServerThread extends Thread{
private final Random random;
private final RequestQueue requestQueue;
public ServerThread(RequestQueue requestQueue,String name ,long seed){
super(name);
this.requestQueue = requestQueue;
this.random = new Random(seed);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
Request request = requestQueue.getRequest();
System.out.println(Thread.currentThread().getName() + " handles " + request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.learn.concurrent.design;
/**
* @Auther: fc.w
* @Date: 2021/1/11 22:51
* @Description:
*/
public class Main {
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
new ClientThread(requestQueue,"client-Alice",3141592L).start();
new ServerThread(requestQueue,"server-Bobby",6535897L).start();
}
}
运行结果:
client-Alice requests Request{name='No.0'}
server-Bobby handles Request{name='No.0'}
client-Alice requests Request{name='No.1'}
client-Alice requests Request{name='No.2'}
server-Bobby handles Request{name='No.1'}
server-Bobby handles Request{name='No.2'}
client-Alice requests Request{name='No.3'}
server-Bobby handles Request{name='No.3'}
client-Alice requests Request{name='No.4'}
server-Bobby handles Request{name='No.4'}
Java 标准库实例
从JDK 1.5 开始提供的阻塞队列类LinkedBlockingQueue就是使用 Guarded Suspension 模式。 该类的 take 方法用于从队列中取出一个元素。 如果take方法被调用时队列为空,则当前线程会被阻塞; 直到队列不为空时,该方法才返回一个出队列的元素,只不过LinkedBlockingQueue在实现Guarded Suspension 模式时直接使用了Condition。
实现时易出错的重要技术细节
-
内存可见性和锁泄漏(Lock Leak)
为了保证保护条件中涉及的变量内存可见性,引入ReentrantLock锁。使用该锁时需要注意临界区中的代码无论执行正常与否,进入临界区前获得的锁实例都应该被释放。否锁就会出现锁泄漏现象;锁对象被某个线程获得,但永远不会被该线程释放,导致其他线程无法获得该锁。为了避免锁泄漏,使用ReentrantLock 的临界区代码需要按照以下格式来编写:
public E take() throws InterruptedException {
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
} finally {
takeLock.unlock();
}
}
-
线程被过早的唤醒
为了应对被挂起的线程可能被“过早” 地唤醒的情景,将 take 方法和保护条件的检测对 Contion 实例的 await 方法的调用总是放到一个 while 循环而非 if 语句中。
public E take() throws InterruptedException {
takeLock.lockInterruptibly();
try {
while (保护条件不成立) {
notEmpty.await();
}
// 执行目标动作
// TODO
} finally {
takeLock.unlock();
}
参考资料
https://blog.csdn.net/weixin_42245930/article/details/88761176
Java多线程并发编程-设计模式篇