前言
随着业务的越来越复杂,保证程序的健壮性对程序猿来说也变得更加的重要,毕竟不写Bug的程序猿不是一个好的程序猿。但怎样尽可能的保证咱们的程序能够稳定的运行,以及出错后能够进行相应的补偿,这里就需要咱们使用熔断机制了。
PS:在进入正文之前,不妨思考一下两个问题:
①熔断机制究竟为我们解决了什么问题?
②我们怎样去自己实现一个简单的熔断?
自定义熔断的实现
这里咱们简单的实现了一个超时后进行熔断的例子,这里有用到AspectJ的相关知识,对于熟悉Spring AOP知识的同学应该没什么问题。
主要分为两步:
- 使用
Future
控制是否超时,超时后将任务cancel
掉。 - 调用咱们自己定义好的
fallback
方法进行处理。在这里需要注意的是,fallback
方法参数应该要与原方法相同,这样咱们才能进行补偿措施。例如:咱们可以在fallback
方法借助消息中间件将这些参数进行存储,然后在适当的时候从消息中间件中读取出来进行补偿消费处理。
@RestController
public class HelloController {
private Random random = new Random();
@MyHystrixCommand(fallback="errorMethod")
@RequestMapping("/hello")
public String hello(@RequestParam("name") String message) throws InterruptedException {
int time = random.nextInt(200);
System.out.println("spend time : " + time + "ms");
Thread.sleep(time);
System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");
return "hello world:" + message;
}
public String errorMethod(String message) {
return "error message";
}
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyHystrixCommand {
int value() default 100;
String fallback() default "";
}
@Aspect
@Component
public class MyHystrixCommandAspect {
ExecutorService executor = Executors.newFixedThreadPool(10);
@Pointcut(value = "@annotation(MyHystrixCommand)")
public void pointCut() {
}
@Around(value = "pointCut()&&@annotation(hystrixCommand)")
public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {
int timeout = hystrixCommand.value();
Future future = executor.submit(() -> {
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
}
return null;
});
Object returnValue = null;
try {
returnValue = future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
future.cancel(true);
if (StringUtils.isBlank(hystrixCommand.fallback())){
throw new Exception("fallback is null");
}
returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());
}
return returnValue;
}
private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {
Method method = findFallbackMethod(joinPoint, fallback);
if (method == null) {
throw new Exception("can not find fallback :" + fallback + " method");
} else {
method.setAccessible(true);
try {
Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());
return invoke;
} catch (IllegalAccessException | InvocationTargetException e) {
throw e;
}
}
}
private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {
Signature signature = joinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
Class<?>[] parameterTypes = method.getParameterTypes();
Method fallbackMethod = null;
try {
//这里通过判断必须取和原方法一样参数的fallback方法
fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);
} catch (NoSuchMethodException e) {
}
return fallbackMethod;
}
}
当然,上述例子只是一个简单的超时后熔断处理的实现方式。咱们在实际应用中,还有可能并发超过指定阈值后咱们也需要进行降级处理,一个最普通的场景:秒杀案例。这些东西在Hystrix
中都有相应的处理,它提供了线程池和信号量这两种方式去解决并发的问题。
什么是Hystrix?
咱们看一下官方介绍
In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.
在分布式环境中,调用一些服务不可避免的会出现失败,Hystrix
帮助咱们添加了一些容忍策略,并且将服务进行隔离处理,防止一个服务的失败影响到了另一个服务的调用,这些都提高了咱们系统的弹性。
Hystrix的处理流程
这里咱们结合一下Spring Cloud Hystrix进行说明,从HystrixCommandAspect
开始分析:
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
...
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步
...
Object result;
try {
//第三步
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
}
....
return result;
}
这个切面主要针对HystrixCommand
和HystrixCollapser
这两个注解,前者用于进行熔断降级处理,后者用来根据配置进行合并请求(类比数据库操作,将多个insert语句合并成一个insert batch语句)。咱们侧重进行HystrixCommand
这一块的分析。
第一步:获取元数据(MetaHolder)
这段代码对应上面的MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
,里面封装了比如调用方法method
,参数args
,方法所属对象target
,动态代理对象proxy
,回调方法fallbackMethod
等等一些元数据的封装。这些数据在创建命令对象时会被使用。
第二步:获取调用者(HystrixInvokable)
它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑,针对HystrixCommand
上述的命令对象就是GenericObservableCommand
和GenericCommand
的一种,这里命令对象的选择和方法的返回值有关,如果返回值为Observable
类型,则创建GenericObservableCommand
命令,否则创建GenericCommand
命令。
第三步:执行命令(execute)
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
...
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
...
}
}
从上面的代码段中,可以很容易的看出共有三种策略,同步、异步、OBSERVABLE,而Observable
又分为Cold Observable(observable.toObservable())
和Hot Observable(observable.observe())
。所以说总共有四种执行方式。但是底层都会调用到AbstractCommand.toObservable()
方法。
- execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
- queue():异步执行,返回一个
Future
对象,包含着执行结束后返回的单一结果。 - observe():这个方法返回一个
Observable
对象,它代表操作的多个结果,但是已经被订阅者消费掉了。 - toObservable():这个方法返回一个
Observable
对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。
在执行逻辑中,大量用到了RxJava
,各种回调处理,看的着实头晕,感兴趣的同学可以自行阅读源码,我这里只是介绍一些关键的流程点。
①首先会检查是否命中缓存(toObservable
方法中),命中缓存则直接返回:
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
②检查断路器是否打开,如果断路器打开,则通过handleShortCircuitViaFallback
直接进行fallback处理:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
}else {
return handleShortCircuitViaFallback();
}
...
}
③检查是否用了信号量,如果用了,则判断是否被占满,占满后则抛出异常,通过handleSemaphoreRejectionViaFallback
直接转到fallback中进行执行,不执行后面的逻辑。如果没用,则会返回一个默认的TryableSemaphoreNoOp.DEFAULT
,在进行executionSemaphore.tryAcquire()
时始终返回true。
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
④执行命令中的逻辑
通过重写AbstractCommand
中的getExecutionObservable()
方法使得下面两个命令类中的相应逻辑被调用。
- GenericCommand中的run()方法
- GenericObservableCommand中的construct()方法
如果run
或者construct
中设置了超时时间,如果执行时间超过了阈值,则会抛出TimeoutException
,或者在执行过程中抛出其他异常,都会进入fallback中进行处理逻辑。
⑤发生异常后执行fallback
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd,
final HystrixEventType eventType,
final FailureType failureType,
final String message,
final Exception originalException) {
}
最终都会调用到这个方法,咱们看看FailureType
具体有哪几种类型。
- COMMAND_EXCEPTION:执行
run
方法或者construct
方法抛出异常时。 - TIMEOUT:超时情况下。
- SHORTCIRCUIT:断路器直接打开时,直接执行
handleShortCircuitViaFallback
方法。 - REJECTED_THREAD_EXECUTION:线程池、请求队列被占满的情况下。
- REJECTED_SEMAPHORE_EXECUTION:信号量占满情况下。
- BAD_REQUEST_EXCEPTION:
- REJECTED_SEMAPHORE_FALLBACK:
总结
Hystrix
中大量用了RxJava
,阅读源码看起来不免会觉得头晕,可以考虑在关键点打几个断点看看,不然各种回调会让你绕圈圈。不过个人觉得RxJava
代码看起来还是蛮优美的,只不过有些许不适应而已,后面有时间会研究一下RxJava
。