序
本文主要研究一下HystrixMetricsPublisher
HystrixMetricsPublisher
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisher.java
/**
* Abstract class with default implementations of Factory methods for creating "Metrics Publisher" instances for getting metrics and other related data
* exposed, published or otherwise retrievable by external systems such as Servo (https://github.com/Netflix/servo)
* for monitoring and statistical purposes.
* <p>
* See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
* href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
*/
public abstract class HystrixMetricsPublisher {
// TODO should this have cacheKey functionality like HystrixProperties does?
// I think we do otherwise dynamically provided owner and properties won't work
// a custom override would need the caching strategy for properties/publisher/owner etc to be in sync
/**
* Construct an implementation of {@link HystrixMetricsPublisherCommand} for {@link HystrixCommand} instances having key {@link HystrixCommandKey}.
* <p>
* This will be invoked once per {@link HystrixCommandKey} instance.
* <p>
* <b>Default Implementation</b>
* <p>
* Return instance of {@link HystrixMetricsPublisherCommandDefault}
*
* @param commandKey
* {@link HystrixCommandKey} representing the name or type of {@link HystrixCommand}
* @param commandGroupKey
* {@link HystrixCommandGroupKey} of {@link HystrixCommand}
* @param metrics
* {@link HystrixCommandMetrics} instance tracking metrics for {@link HystrixCommand} instances having the key as defined by {@link HystrixCommandKey}
* @param circuitBreaker
* {@link HystrixCircuitBreaker} instance for {@link HystrixCommand} instances having the key as defined by {@link HystrixCommandKey}
* @param properties
* {@link HystrixCommandProperties} instance for {@link HystrixCommand} instances having the key as defined by {@link HystrixCommandKey}
* @return instance of {@link HystrixMetricsPublisherCommand} that will have its <code>initialize</code> method invoked once.
*/
public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
return new HystrixMetricsPublisherCommandDefault(commandKey, commandGroupKey, metrics, circuitBreaker, properties);
}
/**
* Construct an implementation of {@link HystrixMetricsPublisherThreadPool} for {@link HystrixThreadPool} instances having key {@link HystrixThreadPoolKey}.
* <p>
* This will be invoked once per {@link HystrixThreadPoolKey} instance.
* <p>
* <b>Default Implementation</b>
* <p>
* Return instance of {@link HystrixMetricsPublisherThreadPoolDefault}
*
* @param threadPoolKey
* {@link HystrixThreadPoolKey} representing the name or type of {@link HystrixThreadPool}
* @param metrics
* {@link HystrixThreadPoolMetrics} instance tracking metrics for the {@link HystrixThreadPool} instance having the key as defined by {@link HystrixThreadPoolKey}
* @param properties
* {@link HystrixThreadPoolProperties} instance for the {@link HystrixThreadPool} instance having the key as defined by {@link HystrixThreadPoolKey}
* @return instance of {@link HystrixMetricsPublisherThreadPool} that will have its <code>initialize</code> method invoked once.
*/
public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
return new HystrixMetricsPublisherThreadPoolDefault(threadPoolKey, metrics, properties);
}
/**
* Construct an implementation of {@link HystrixMetricsPublisherCollapser} for {@link HystrixCollapser} instances having key {@link HystrixCollapserKey}.
* <p>
* This will be invoked once per {@link HystrixCollapserKey} instance.
* <p>
* <b>Default Implementation</b>
* <p>
* Return instance of {@link HystrixMetricsPublisherCollapserDefault}
*
* @param collapserKey
* {@link HystrixCollapserKey} representing the name or type of {@link HystrixCollapser}
* @param metrics
* {@link HystrixCollapserMetrics} instance tracking metrics for the {@link HystrixCollapser} instance having the key as defined by {@link HystrixCollapserKey}
* @param properties
* {@link HystrixCollapserProperties} instance for the {@link HystrixCollapser} instance having the key as defined by {@link HystrixCollapserKey}
* @return instance of {@link HystrixMetricsPublisherCollapser} that will have its <code>initialize</code> method invoked once.
*/
public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
return new HystrixMetricsPublisherCollapserDefault(collapserKey, metrics, properties);
}
}
- getMetricsPublisherForCommand,默认返回HystrixMetricsPublisherCommandDefault
- getMetricsPublisherForThreadPool,默认返回HystrixMetricsPublisherThreadPoolDefault
- getMetricsPublisherForCollapser,默认返回HystrixMetricsPublisherCollapserDefault
HystrixMetricsPublisherCommandDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCommandDefault.java
/**
* Default implementation of {@link HystrixMetricsPublisherCommand} that does nothing.
* <p>
* See <a href="https://github.com/Netflix/Hystrix/wiki/Plugins">Wiki docs</a> about plugins for more information.
*
* @ExcludeFromJavadoc
*/
public class HystrixMetricsPublisherCommandDefault implements HystrixMetricsPublisherCommand {
public HystrixMetricsPublisherCommandDefault(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
// do nothing by default
}
@Override
public void initialize() {
// do nothing by default
}
}
目前是空操作
HystrixMetricsPublisherThreadPoolDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherThreadPoolDefault.java
/**
* Default implementation of {@link HystrixMetricsPublisherThreadPool} that does nothing.
* <p>
* See <a href="https://github.com/Netflix/Hystrix/wiki/Plugins">Wiki docs</a> about plugins for more information.
*
* @ExcludeFromJavadoc
*/
public class HystrixMetricsPublisherThreadPoolDefault implements HystrixMetricsPublisherThreadPool {
public HystrixMetricsPublisherThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
// do nothing by default
}
@Override
public void initialize() {
// do nothing by default
}
}
目前也是空操作
HystrixMetricsPublisherCollapserDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapserDefault.java
/**
* Default implementation of {@link HystrixMetricsPublisherCollapser} that does nothing.
* <p>
* See <a href="https://github.com/Netflix/Hystrix/wiki/Plugins">Wiki docs</a> about plugins for more information.
*
* @ExcludeFromJavadoc
*/
public class HystrixMetricsPublisherCollapserDefault implements HystrixMetricsPublisherCollapser {
public HystrixMetricsPublisherCollapserDefault(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
// do nothing by default
}
@Override
public void initialize() {
// do nothing by default
}
}
目前也是空操作
HystrixMetricsPublisherDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherDefault.java
/**
* Default implementation of {@link HystrixMetricsPublisher}.
* <p>
* See <a href="https://github.com/Netflix/Hystrix/wiki/Plugins">Wiki docs</a> about plugins for more information.
*
* @ExcludeFromJavadoc
*/
public class HystrixMetricsPublisherDefault extends HystrixMetricsPublisher {
private static HystrixMetricsPublisherDefault INSTANCE = new HystrixMetricsPublisherDefault();
public static HystrixMetricsPublisher getInstance() {
return INSTANCE;
}
private HystrixMetricsPublisherDefault() {
}
}
默认提供了一个单例方法
MicrometerMetricsPublisher
micrometer-core-1.0.5-sources.jar!/io/micrometer/core/instrument/binder/hystrix/MicrometerMetricsPublisher.java
/**
* @author Clint Checketts
*/
@NonNullApi
@NonNullFields
public class MicrometerMetricsPublisher extends HystrixMetricsPublisher {
private final MeterRegistry registry;
public MicrometerMetricsPublisher(MeterRegistry registry) {
this.registry = registry;
}
@Override
public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey,
HystrixCommandGroupKey commandGroupKey,
HystrixCommandMetrics metrics,
HystrixCircuitBreaker circuitBreaker,
HystrixCommandProperties properties) {
return new MicrometerMetricsPublisherCommand(registry, commandKey, commandGroupKey, metrics, circuitBreaker, properties);
}
}
micrometer组件重写了getMetricsPublisherForCommand 方法,返回MicrometerMetricsPublisherCommand
MicrometerMetricsPublisherCommand
micrometer-core-1.0.5-sources.jar!/io/micrometer/core/instrument/binder/hystrix/MicrometerMetricsPublisherCommand.java
@NonNullApi
@NonNullFields
public class MicrometerMetricsPublisherCommand implements HystrixMetricsPublisherCommand {
private static final Logger LOG = LoggerFactory.getLogger(MicrometerMetricsPublisherCommand.class);
private static final List<HystrixEventType> executionEvents = Arrays.asList(
HystrixEventType.EMIT,
HystrixEventType.SUCCESS,
HystrixEventType.FAILURE,
HystrixEventType.TIMEOUT,
HystrixEventType.BAD_REQUEST,
HystrixEventType.SHORT_CIRCUITED,
HystrixEventType.THREAD_POOL_REJECTED,
HystrixEventType.SEMAPHORE_REJECTED);
private static final List<HystrixEventType> fallbackEvents = Arrays.asList(
HystrixEventType.FALLBACK_EMIT,
HystrixEventType.FALLBACK_SUCCESS,
HystrixEventType.FALLBACK_FAILURE,
HystrixEventType.FALLBACK_REJECTION,
HystrixEventType.FALLBACK_MISSING);
private static final String NAME_HYSTRIX_CIRCUIT_BREAKER_OPEN = "hystrix.circuit.breaker.open";
private static final String NAME_HYSTRIX_COMMAND_OTHER = "hystrix.command.other";
private static final String NAME_HYSTRIX_EXECUTION = "hystrix.execution";
private static final String NAME_HYSTRIX_FALLBACK = "hystrix.fallback";
private static final String NAME_HYSTRIX_ERRORS = "hystrix.errors";
private static final String NAME_HYSTRIX_REQUESTS = "hystrix.requests";
private static final String NAME_HYSTRIX_LATENCY_EXECUTION = "hystrix.latency.execution";
private static final String NAME_HYSTRIX_LATENCY_TOTAL = "hystrix.latency.total";
private static final String NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_CURRENT = "hystrix.threadpool.concurrent.execution.current";
private static final String NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_ROLLING_MAX = "hystrix.threadpool.concurrent.execution.rolling.max";
private static final String DESCRIPTION_HYSTRIX_COMMAND_OTHER = "Other execution results. See https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring#other-command-event-types-comnetflixhystrixhystrixeventtype for type definitions";
private static final String DESCRIPTION_HYSTRIX_EXECUTION = "Execution results. See https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring#command-execution-event-types-comnetflixhystrixhystrixeventtype for type definitions";
private static final String DESCRIPTION_HYSTRIX_FALLBACK = "Fallback execution results. See https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring#command-fallback-event-types-comnetflixhystrixhystrixeventtype for type definitions";
private final MeterRegistry meterRegistry;
private final HystrixCommandMetrics metrics;
private final HystrixCircuitBreaker circuitBreaker;
private final Iterable<Tag> tags;
private final HystrixCommandKey commandKey;
public MicrometerMetricsPublisherCommand(MeterRegistry meterRegistry, HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
this.meterRegistry = meterRegistry;
this.metrics = metrics;
this.circuitBreaker = circuitBreaker;
this.commandKey = commandKey;
tags = Tags.of("group", commandGroupKey.name(), "key", commandKey.name());
//Initialize commands at zero
Counter.builder(NAME_HYSTRIX_ERRORS).tags(tags).register(meterRegistry);
Counter.builder(NAME_HYSTRIX_REQUESTS).tags(tags).register(meterRegistry);
Timer.builder(NAME_HYSTRIX_LATENCY_EXECUTION).tags(tags).register(meterRegistry);
Timer.builder(NAME_HYSTRIX_LATENCY_TOTAL).tags(tags).register(meterRegistry);
executionEvents.forEach(this::getExecutionCounter);
fallbackEvents.forEach(this::getFallbackCounter);
Arrays.stream(HystrixEventType.values()).filter(e -> !executionEvents.contains(e) && !fallbackEvents.contains(e))
.forEach(this::getOtherExecutionCounter);
}
@Override
public void initialize() {
Gauge.builder(NAME_HYSTRIX_CIRCUIT_BREAKER_OPEN, circuitBreaker, c -> c.isOpen() ? 1 : 0)
.tags(tags).register(meterRegistry);
HystrixCommandCompletionStream.getInstance(commandKey)
.observe()
.subscribe(hystrixCommandCompletion -> {
/*
our assumptions about latency as returned by hystrixCommandCompletion:
# a latency of >= 0 indicates that this the execution occurred.
# a latency of == -1 indicates that the execution didn't occur (default in execution result)
# a latency of < -1 indicates some clock problems.
We will only count executions, and ignore non-executions with a value of -1.
Latencies of < -1 are ignored as they will decrement the counts, and Prometheus will
take this as a reset of the counter, therefore this should be avoided by all means.
*/
long totalLatency = hystrixCommandCompletion.getTotalLatency();
if (totalLatency >= 0) {
Timer.builder(NAME_HYSTRIX_LATENCY_TOTAL)
.tags(tags)
.register(meterRegistry)
.record(totalLatency, TimeUnit.MILLISECONDS);
} else if (totalLatency < -1) {
LOG.warn("received negative totalLatency, event not counted. " +
"This indicates a clock skew? {}",
hystrixCommandCompletion);
}
long executionLatency = hystrixCommandCompletion.getExecutionLatency();
if (executionLatency >= 0) {
Timer.builder(NAME_HYSTRIX_LATENCY_EXECUTION)
.tags(tags)
.register(meterRegistry)
.record(executionLatency, TimeUnit.MILLISECONDS);
} else if (executionLatency < -1) {
LOG.warn("received negative executionLatency, event not counted. " +
"This indicates a clock skew? {}",
hystrixCommandCompletion);
}
for (HystrixEventType hystrixEventType : HystrixEventType.values()) {
int count = hystrixCommandCompletion.getEventCounts().getCount(hystrixEventType);
if (count > 0) {
switch (hystrixEventType) {
/* this list is derived from {@link HystrixCommandMetrics.HealthCounts.plus} */
case FAILURE:
case TIMEOUT:
case THREAD_POOL_REJECTED:
case SEMAPHORE_REJECTED:
Counter.builder(NAME_HYSTRIX_ERRORS)
.tags(tags)
.register(meterRegistry)
.increment(count);
case SUCCESS:
Counter.builder(NAME_HYSTRIX_REQUESTS)
.tags(tags)
.register(meterRegistry)
.increment(count);
break;
}
if (executionEvents.contains(hystrixEventType)) {
getExecutionCounter(hystrixEventType).increment(count);
} else if (fallbackEvents.contains(hystrixEventType)) {
getFallbackCounter(hystrixEventType).increment(count);
} else {
getOtherExecutionCounter(hystrixEventType).increment(count);
}
}
}
});
String threadPool = metrics.getThreadPoolKey().name();
Gauge.builder(NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_CURRENT, metrics, HystrixCommandMetrics::getCurrentConcurrentExecutionCount)
.tags(Tags.concat(tags, "threadpool", threadPool))
.register(meterRegistry);
Gauge.builder(NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_ROLLING_MAX, metrics, HystrixCommandMetrics::getRollingMaxConcurrentExecutions)
.tags(Tags.concat(tags, "threadpool", threadPool))
.register(meterRegistry);
}
private Counter getOtherExecutionCounter(HystrixEventType hystrixEventType) {
return Counter.builder(NAME_HYSTRIX_COMMAND_OTHER)
.description(DESCRIPTION_HYSTRIX_COMMAND_OTHER)
.tags(Tags.concat(tags, "event", hystrixEventType.name().toLowerCase()))
.register(meterRegistry);
}
private Counter getFallbackCounter(HystrixEventType hystrixEventType) {
return Counter.builder(NAME_HYSTRIX_FALLBACK)
.description(DESCRIPTION_HYSTRIX_FALLBACK)
.tags(Tags.concat(tags, "event", hystrixEventType.name().toLowerCase()))
.register(meterRegistry);
}
private Counter getExecutionCounter(HystrixEventType hystrixEventType) {
return Counter.builder(NAME_HYSTRIX_EXECUTION)
.description(DESCRIPTION_HYSTRIX_EXECUTION)
.tags(Tags.concat(tags, "event", hystrixEventType.name().toLowerCase()))
.register(meterRegistry);
}
}
- 将HystrixEventType归类为两类,一类是executionEvents,一类是fallbackEvents
- Counter指标建立了NAME_HYSTRIX_ERRORS、NAME_HYSTRIX_REQUESTS、NAME_HYSTRIX_COMMAND_OTHER、NAME_HYSTRIX_FALLBACK、NAME_HYSTRIX_EXECUTION
- Timer指标建立了NAME_HYSTRIX_LATENCY_EXECUTION、NAME_HYSTRIX_LATENCY_TOTAL
- Gauge指标建立了NAME_HYSTRIX_CIRCUIT_BREAKER_OPEN、NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_CURRENT、NAME_HYSTRIX_THREADPOOL_CONCURRENT_EXECUTION_ROLLING_MAX
小结
HystrixMetricsPublisher提供了扩展,可以自己将metrics落地存储,另外HystrixMetricsPublisherCommand接口主要是实现initialize方法,在里头去注册指标收集,具体可以参考micrometer的实现MicrometerMetricsPublisherCommand。