文章目录
- 概述
- 如何使用
- 流量控制
- 熔断降级
- 控制台
- 总结
Sentinel是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件。与Hystrix库相比,它提供了更丰富的功能,以及更直观的实现。本文主参考Sentinel官方提供的示例,让更多的朋友体对sentinel有一个初步的认识。在后续的文章中会讲述它的实现原理细节。
1. 概述
Sentinel主要提供了流量控制、服务降级、系统负载保护、实时数据监控等功能。
先从流量控制说起,什么是流量控制?顾名思义,和字面名字的含义差不多,只不过这里的流量一般指的是对RPC服务调用的流量。那么为什么要对它进行控制呢?一个原因是一般我们的一台服务器可能对提供过个RPC服务,如果一个服务占用过多的资源,那么其他的服务可能不能正常的提供服务了。还有过多的请求会降低服务质量等原因。
Sentinel提供了两种流量控制方式,一是QPS,二是服务的线程数。
关于服务熔断降级,老生常谈的话题。Sentinel提供了两种降级策略,一是RT(run time),二是异常比例。后面结合Demo解释
2. 如何使用?
Sentinel资源保护(流量控制和熔断降级),这里的资源可以使RPC服务、方法等。主要分为两个步骤:
- 定义规则,就是定义某个资源保护的规则,比如限流策略、降级策略等。
- 访问资源,说白了就是给资源起一个名字,为了区分其他资源
其实官网上把第二步称为资源定义,但是个人认为把它成为资源访问更为合适,因为KEY代表了某个资源,但是具体代表了哪个资源是通过访问某资源时才能确定的。
3. 流量控制
下面定义规则,Sentinel 支持三种规则:流量控制规则、熔断降级规则以及系统保护规则。这里先试试流量控制规则。
private static void initFlowQpsRule() {
// 规则对应的类为FlowRule,用List保存,可以有多个规则
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
// QPS为20
rule1.setCount(20);
//限流的类型
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setControlBehavior(CONTROL_BEHAVIOR_DEFAULT);
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
流量控制规则对应的类为FlowRule
- setResource(KEY)方法是设置资源名,也就是限流规则的作用对象,更通俗的讲:本条规则对哪个资源生效。
- count是限流阈值,当我们定义的是流量控制规则是根据QPS进行限流时,它表示QPS的阈值,当然如果是根据线程数限流,它表示线程数。
- grade表示限流阈值类型,是按照 QPS 还是线程数默认根据 QPS。
- controlBehavior表示发生拦截后是直接拒绝,还是排队等待,还是慢启动模式,默认是直接拒绝,如果设置为排队等待则还需要设置maxQueueingTimeMs(最大排队时间)。
最后 FlowRuleManager.loadRules(rules)是将rules生效。总体的来说Sentinel的API比较直观易懂,我们继续往下,开始访问我们定义的资源。
下面结合一个例子对Sentinel有进一步的认识。接下来是访问资源,其实就是对服务的请求代码请求进行一个控制,代码如下:
private static String KEY = "biz"// 资源名
public void request() {
try {
// token acquired, means pass
Entry entry = SphU.entry(KEY);
//被保护biz代码
} catch (BlockException e1) {
// 资源访问阻止,被限流或被降级
} catch (Exception e2) {
// biz exception
} finally {
if (entry != null) {
entry.exit();
}
}
}
这里可以看出所谓的资源其实就是Entry entry = SphU.entry(KEY);
代码后的业务代码,并且Sentinel通过一个KEY作为它的唯一表示。
SphU的entry() 方法,将返回KEY的调用的信息。 当超过任何规则的阈值时。 将抛出BlockException。进入catch BlockException 代码块。
下面是参考官网的的一个完整的例子:
public class FlowQpsDemo {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static int seconds = 20;
public static void main(String[] args) {
initFlowQpsRule();
//单开一条线程,统计流量数据
new Thread(new CountFlowTool()).start();
//多线程请求
new simulateFlow(10).start();
}
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
// set limit qps to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rule1.setControlBehavior(CONTROL_BEHAVIOR_DEFAULT);
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
static class CountFlowTool implements Runnable {
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
static class simulateFlow {
int threadCount;
simulateFlow(int threadCount) {
this.threadCount = threadCount;
}
public void start() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
public void run() {
while (!stop) {
Entry entry = null;
try {
total.incrementAndGet();
entry = SphU.entry(KEY);
// token acquired, means pass
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
if (entry != null) {
entry.exit();
}
}
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
}
}
}
});
t.start();
}
}
}
}
主方法中显示定义规则,让后通过多线程模拟请求,内部通过循环不停的请求资源。统计工具是统计前一秒的请求数据。总体代码比较直观易懂。
根据线程数的流量限制也是类似。只是规则设置代码不同。还有一个问题就是根据线程数的流量限制中,Sentinel没有对线程的控制权限,内部只是对请求线程的统计。如果超出阈值,新的请求会被立即拒绝。但是根据QPS进行流量控制中可以有多个选择:1. 直接拒绝(CONTROL_BEHAVIOR_DEFAULT),2. 慢启动也叫冷启动(CONTROL_BEHAVIOR_WARM_UP)过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮的情况。,3. 匀速通过(CONTROL_BEHAVIOR_RATE_LIMITER),这种方式严格控制了请求通过的间隔时间,也即是让请求以均匀的速度通过,内部实现是漏桶算法。
4. 熔断降级
Sentinel中,对资源的调用都自动熔断。通常用两种方式来衡量资源是否处于稳定的状态:RT和异常。以平均响应时间为例,如果请求的的时间大于阈值,那么接下来会尝试5次,如果这5次的请求时间平均时间大于阈值,那么在接下来的时间窗口(timeWindow)内将自动降级处理(直接跳入BlockException),当时间窗口结束再次尝试5次,以此重复。这里尝试五次的触发点是RT超过阈值。
下面结合Demo讲解,这里指列出规则定义的代码:
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// 设置响应时间,50ms
rule.setCount(50);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(5);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
}
通过异常降级于此类似。当资源的每秒异常总数占通过总数的比值超过阈值(DegradeRule 中的 count)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地返回。
下面参考官网给出一个完整的基于RT熔断的例子:
public class RtDegradeDemo {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static int seconds = 10000;
private static boolean stop = false;
public static void main(String[] args) throws Exception {
initDegradeRule();
new Thread(new CountFlowTool()).start();
new simulateFlow(1).start();
}
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
rule.setCount(50);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
}
static class simulateFlow {
int threadCount;
public simulateFlow(int threadCount) {
this.threadCount = threadCount;
}
public void start() {
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
while (!stop) {
Entry entry = null;
try {
total.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(100);
entry = SphU.entry(KEY);
pass.getAndIncrement();
TimeUnit.MILLISECONDS.sleep(100);
} catch (BlockException e) {
block.incrementAndGet();
} catch (Throwable e) {
//biz exception
} finally {
if (entry != null) {
entry.exit();
}
}
}
}).start();
}
}
}
static class CountFlowTool implements Runnable {
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(TimeUtil.currentTimeMillis() + ", laseSec total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
}
5. 控制台
Sentinel提供了控制台模块,用于对实时限流熔断数据的可视化和对规则(限流和熔断)的实时推送。控制台是一个基于Spring boot开发的控制台。其主要流程为:
- 控制台工程启动
- APP启动,向控制台发送心跳数据(APP信息包括机器信息、APP name等),可以从代码找到:
public class Env {
public static final NodeBuilder nodeBuilder = new DefaultNodeBuilder();
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();// doInit方法会调用sendHeadBeat
}
}
- 前端发送HTTP请求(请求规则、实时运行数据) -> 控制台HTTP请求-> APP Netty Server
搭建控制台步骤:
- 启动控制台工程,有两种方式直接运行Jar包和在源码中启动SpringBoot,笔者在这里介绍后者。代码工程如下:
添加如下参数:-Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard
2 . 启动APP应用,参数如下:-Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=demo
maven 依赖:
<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>0.2.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>0.2.1-SNAPSHOT</version>
</dependency>
</dependencies>
恭喜你!
6. 总结
本文主要介绍了Sentinel的限流和熔断降级功能,但是Sentinel的功能不止于此。比如系统负载保护,系统负载高于某个阈值,就禁止或者减少流量的进入;当 load 开始好转,则恢复流量的进入。以及对注解的支持、控制面板等。
相比Hystrix来说,Sentinel的熔断和限流逻辑更简单明了。后续会出Sentinel的实现原理文章。