在深入研究熔断器之前,我们需要先看一下Hystrix的几个重要的默认配置,这几个配置在HystrixCommandProperties
中
//时间窗(ms)
static final Integer default_metricsRollingStatisticalWindow = 10000;
//最少请求次数
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
//熔断器打开后开始尝试半开的时间间隔
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
//错误比例
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;
这几个属性共同组成了熔断器的核心逻辑,即:
- 每10秒的窗口期内,当请求次数超过20次,且出错比例超过50%,则触发熔断器打开
- 当熔断器5秒后,会尝试放过去一部分流量进行试探
熔断器初始化
熔断器的初始化是在HystrixCircuitBreaker.Factory
的getInstance
方法
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// this should find it for all but the first time
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
// Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
// 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
// If 2 threads hit here only one will get added and the other will get a non-null response instead.
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
}
}
由上方代码可知,每一个熔断器都是由HystrixCircuitBreakerImpl
实现的,而所有的熔断器都维护在circuitBreakersByCommand
这个ConcurrentHashMap
中
熔断器实现
构造方法
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
}
先介绍一下几个比较基础的属性:
-
HystrixCommandProperties
:当前熔断器的配置 -
HystrixCommandMetrics
: 请求统计组件 -
Status
:熔断器状态枚举,一共包含三种,关闭、打开和半开 -
status
:当前熔断器的状态 -
circuitOpened
:当前熔断器的打开时间 -
activeSubscription
:订阅请求统计的处理函数
请求统计处理
private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
直接看onNext
方法里的处理方式:
- 时间窗内的请求数量是否达标,按默认配置就是10秒钟的请求数是否超过20次,如果不达标不能开启熔断器
- else中首先判断错误比例是否达到比例,按默认就是50%
- 满足打开条件,使用CAS修改状态为打开,并记录打开时间
circuitOpened
为当前时间
当记录了当前应用的统计数据之后,在每次请求的时候就可以根据这些数据来判断是否应该打开熔断器了
请求过滤
不知你是否还记得在系列文章第一篇中曾经提到了一个方法applyHystrixSemantics
,在这个方法中就包含了判断是否应该熔断的逻辑,如果熔断器打开的情况下会直接进入降级逻辑。这个判断的方法如下:
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
return false;
}
}
}
- 第一个if,如果配置强制熔断则返回false表示开启熔断器进入降级逻辑
- 第二个,如果配置强制关闭则返回正常不进行后续的判断
- 第三个,打开时间为空则肯定没打开过
- 第四个,判断是否满足尝试时间,默认是5秒钟。时间计算方式如下:
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
- 当满足尝试时则使用CAS方式修改熔断器为半开状态
而当请求成功的时候则会调用如下方法清除统计数据,更改熔断器状态为关闭
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}
请求失败则再次打开熔断器,并更新打开时间
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}