引言
ThreadLocal
ThreadLocal这个类给线程提供了一个本地变量,这个变量是该线程自己拥有,各线程间不共享。在该线程存活和ThreadLocal实例能访问的时候,保存了对这个变量副本的引用。当线程消失的时候,所有的本地实例都会被GC。并且建议ThreadLocal最好是使用 private static 修饰。
InheritableThreadLocal
InheritableThreadLocal是为了解决子线程获得父线程本地变量的需求,继承自ThreadLocal。如果你使用它,那么保存的所有东西都已经不在原来的threadLocals里面,而是在一个新的叫inheritableThreadLocals变量中。意思就是说每个线程Thread里面还有一个Map变量,名叫inheritableThreadLocals,它保存的是需要传递的引用(通过InheritableThreadLocal设置的线程变量)。
这种父子传递的需求还是有些比较重要的应用场景,如上下文传递(用户标识、事务等),调用日志跟踪等。Log4j中的MDC就是基于InheritableThreadLocal实现。但一般来说我们用线程池比较多,线程池会缓存线程,重复使用,线程可能会执行不同的任务。这样一来它的上下文传递就达不到正确的效果。
TransmittableThreadLocal
阿里巴巴有个开源项目就是为了解决线程池中变量传递,它里面有个叫TransmittableThreadLocal,继承于InheritableThreadLocal。它通过包装返回Runnable的方式代理了run方法,在run之前copy装载线程变量,run之后清除线程变量,来实现此功能。TransmittableThreadLocal除了继承过来的线程Map,它还定义了一个名叫holder的InheritableThreadLocal静态变量,也就是说TransmittableThreadLocal有两套线程变量。
但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。
HystrixConcurrencystrategy
Hystrixconcurrencystrategy是Hystrix的线程池创建源码所在,并且提供方法wrapCallable来装饰线程池执行环境。所以我们我们可以自定义一个并发策略,即可于Hystrix应用场景内实现线程上下文的传播。附wrapCallable源码
/**
* Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
* <p>
* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
* <p>
* <b>Default Implementation</b>
* <p>
* Pass-thru that does no wrapping.
*
* @param callable
* {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
* @return {@code Callable<T>} either as a pass-thru or wrapping the one given
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
Ps:注释上有一句这么说的:This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
拓展
既然这个方法可以装饰线程池回调,那么我们亦可定义一个装饰器接口,只要这个接口的实现类,都会通过上述并发策略装饰不同业务不同场景需要的线程变量。
装饰器接口定义
/**
* Hystrix CallBack 装饰器定义
*
* @author wangzhuhua
* @date 2018/09/07 下午4:18
**/
public interface HystrixCallableWrapper {
/**
* 装饰 Callable实例
*
* @param callable
* 待装饰实例
* @param <T>
* 返回类型
* @return 装饰后的实例
*/
<T> Callable<T> wrap(Callable<T> callable);
}
HystrixConcurrencystrategy Custom
/**
* Hystrix并发策略
*
* @author wangzhuhua
* @date 2018/09/07 下午4:06
**/
public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy {
/** 装饰队列 */
private final List<HystrixCallableWrapper> wrappers;
public HystrixConcurrencyStrategyCustom(List<HystrixCallableWrapper> wrappers) {
this.wrappers = wrappers;
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new CallableWrapperChain(callable, this.wrappers).wrapCallable();
}
/**
* callback 调用链
*
* @author wangzhuhua
* @date 2018/09/07 下午4:38
**/
private static class CallableWrapperChain<T> {
/** 回调 */
private final Callable<T> callable;
/** 回调包装 */
private final List<HystrixCallableWrapper> wrappers;
CallableWrapperChain(Callable<T> callable, List<HystrixCallableWrapper> wrappers) {
this.callable = callable;
this.wrappers = wrappers;
}
/**
* 装饰线程hystrix callable
*
* @return {@link Callable}
*/
Callable<T> wrapCallable() {
Callable<T> delegate = callable;
for (HystrixCallableWrapper wrapper : wrappers) {
delegate = wrapper.wrap(delegate);
}
return delegate;
}
}
}
Configuration
/**
* Hystrix配置
*
* @author wangzhuhua
* @date 2018/09/07 下午4:01
**/
@Configuration
@ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")
public class HystrixConfiguration {
@Autowired(required = false)
private List<HystrixCallableWrapper> wrappers = new ArrayList<>();
@PostConstruct
public void init() {
HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers));
}
}
实现示例
/**
* Token 装饰器
*
* @author wangzhuhua
* @date 2018/09/10 上午11:28
**/
@Component
public class TokenWrapper implements HystrixCallableWrapper {
@Override
public <T> Callable<T> wrap(Callable<T> callable) {
return new TokenAwareCallable(callable, TokenHandler.getToken());
}
/**
* Token装饰
*
* @param <T>
*/
static class TokenAwareCallable<T> implements Callable<T> {
/** 回调代理 */
private final Callable<T> delegate;
/** Token */
private final String token;
TokenAwareCallable(Callable<T> callable, String token) {
this.delegate = callable;
this.token = token;
}
@Override
public T call() throws Exception {
try {
// 填充当前线程变量
TokenHandler.setToken(this.token);
return delegate.call();
} finally {
TokenHandler.remove();
}
}
}
}
其中TokenHandler内使用线程变量存储