一、背景
限流是对某一时间窗口内的请求数进行限制,以保持系统的可用性和稳定性,防止因流量暴增而导致系统运行缓慢或宕机。常用的限流算法有令牌桶和漏桶,而Google开源项目Guava中的RateLimiter使用的就是令牌桶算法。在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。其中,缓存的目的是提升系统访问速度和增大系统处理容量。
二、代码
自定义注解(实现按需控制)
/**
 * Method-level annotation that carries the rate-limit settings for a single method.
 *
 * <p>The annotation is retained at runtime so it can be looked up reflectively.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RequestLimiter {

    /**
     * Number of tokens created per second.
     *
     * @return the permitted rate; defaults to 50
     */
    double qps() default 50.0;

    /**
     * How long to wait for a token before giving up, expressed in {@link #timeUnit()}.
     *
     * @return the wait timeout; defaults to 500
     */
    long timeOut() default 500L;

    /**
     * Unit in which {@link #timeOut()} is expressed.
     *
     * @return the timeout unit; defaults to milliseconds
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

    /**
     * Message to report when no token could be obtained.
     *
     * @return the message text
     */
    String resultMsg() default "请求频繁,请稍后再试!";
}
限流类:
@Component
@Slf4j
public class RequestLimitingInterceptor implements HandlerInterceptor, CommandLineRunner {
private final Map<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
//限流脚本url统一返回
private final Response<Void> output = new Response<>("9999", "请求频繁,请稍后再试!");
@Override
public void run(String... args) {
// TODO: 2022/9/11
// 根据自己需求按需定制,比如查询数据库等操作
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
//标准文档返回值
JSONObject jsonObject = new JSONObject();
jsonObject.put("success", Boolean.FALSE);
jsonObject.put("resultCode", "9999");
jsonObject.put("result", null);
try {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
RequestLimiter rateLimit = handlerMethod.getMethodAnnotation(RequestLimiter.class);
String requestIp = IpUtil.getIpAddress(request);
String servletPath = request.getServletPath();
log.info("请求ip:{},请求uri:{}", requestIp, servletPath);
//注解为空 或者 非限流uri
if (rateLimit == null || noLimitEngineUriList.contains(servletPath)) {
return true;
}
//有request 流中的请求数据只能获取一次,因此需要需要重新拷贝流
CustomRequestWrapper customRequestWrapper = new CustomRequestWrapper(request);
String bodyString = getBodyString(customRequestWrapper);
//获取请求url
String url = servletPath;
log.info("请求appKey和uri:{}", url);
RateLimiter rateLimiter;
//判断map集合中是否有创建好的令牌桶
if (!rateLimiterMap.containsKey(url)) {
//创建令牌桶,以nr / s往桶中放入令牌
rateLimiter = RateLimiter.create(rateLimit.qps());
rateLimiterMap.put(url, rateLimiter);
}
rateLimiter = rateLimiterMap.get(url);
//1 获取令牌
boolean acquire = rateLimiter.tryAcquire(rateLimit.timeOut(), rateLimit.timeUnit());
if (acquire) {
//获取令牌成功
return true;
} else {
log.info("请求被限流,url:{}", url);
//获取令牌失败
//根据自身逻辑做处理
return false;
}
}
return true;
} catch (Exception var6) {
var6.printStackTrace();
returnResult(response, jsonObject);
return false;
}
}
private void returnResult(HttpServletResponse response, JSONObject jsonObject) {
response.setContentType("application/json;charset=utf-8");
response.setCharacterEncoding("UTF-8");
try (PrintWriter out = response.getWriter()) {
out.append(jsonObject.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
}
public static String getBodyString(ServletRequest request) {
StringBuilder sb = new StringBuilder();
InputStream inputStream = null;
BufferedReader reader = null;
try {
inputStream = request.getInputStream();
reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
String line = "";
while ((line = reader.readLine()) != null) {
sb.append(line);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return sb.toString();
}
}
springMvc 拦截
/**
 * Spring MVC configuration that registers the request rate-limiting interceptor.
 */
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    /** Interceptor registered by {@link #addInterceptors(InterceptorRegistry)}. */
    protected final RequestLimitingInterceptor requestLimitingInterceptor;

    /**
     * Creates the configuration.
     *
     * @param requestLimitingInterceptor interceptor to register
     */
    public WebMvcConfig(RequestLimitingInterceptor requestLimitingInterceptor) {
        this.requestLimitingInterceptor = requestLimitingInterceptor;
    }

    /**
     * Adds the rate-limiting interceptor for every request path.
     *
     * @param registry registry the interceptor is added to
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // Request rate limiting applies to all paths.
        final String everyPath = "/**";
        registry.addInterceptor(this.requestLimitingInterceptor).addPathPatterns(everyPath);
    }
}
springboot 启动类 配置@ServletComponentScan 注解
由于request.getInputStream()返回的请求体输入流只能读取一次,要想重复读取,需要先将流的内容拷贝并缓存起来。
filter拦截类
/**
 * Servlet filter that replaces every HTTP request with a {@link CustomRequestWrapper} before
 * passing it down the chain, so that later components can read the request body.
 *
 * <p>NOTE(review): the class is both a {@code @Component} and a {@code @WebFilter}; it may be
 * registered twice when servlet component scanning is enabled - confirm. The re-wrap guard in
 * {@link #doFilter} keeps a second invocation from copying the body again.
 */
@Component
@WebFilter(filterName = "CommonFilter", urlPatterns = "/*")
public class CommonFilter extends HttpServlet implements Filter {

    /**
     * No initialisation is required.
     *
     * @param filterConfig filter configuration (unused)
     */
    @Override
    public void init(FilterConfig filterConfig) {
    }

    /**
     * Wraps HTTP requests in a {@link CustomRequestWrapper} and continues the chain.
     *
     * @param servletRequest  incoming request
     * @param servletResponse outgoing response, passed through unchanged
     * @param filterChain     remaining filter chain
     * @throws IOException      propagated from the chain
     * @throws ServletException propagated from the chain
     */
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {
        ServletRequest forwarded = servletRequest;
        // Wrap only plain HTTP requests; one that is already a wrapper is passed on as is.
        if (servletRequest instanceof HttpServletRequest
                && !(servletRequest instanceof CustomRequestWrapper)) {
            forwarded = new CustomRequestWrapper((HttpServletRequest) servletRequest);
        }
        // Invoke the rest of the chain with the (possibly wrapped) request.
        filterChain.doFilter(forwarded, servletResponse);
    }
}
流拷贝
/**
 * Request wrapper that copies the request body into memory at construction time so that it can
 * be read any number of times through {@link #getInputStream()} and {@link #getReader()}.
 */
public class CustomRequestWrapper extends HttpServletRequestWrapper {

    /** Copy of the request body; never {@code null}. */
    private final byte[] requestBody;

    /**
     * Wraps the request and copies its body.
     *
     * <p>If the body cannot be read, the error is reported on standard error and the wrapper
     * behaves as if the body were empty.
     *
     * @param request request to wrap; its input stream is read by this constructor
     */
    public CustomRequestWrapper(HttpServletRequest request) {
        super(request);
        byte[] copied;
        try {
            // Copy the stream into the requestBody field.
            copied = StreamUtils.copyToByteArray(request.getInputStream());
        } catch (IOException e) {
            // Best effort: keep the request usable with an empty body.
            e.printStackTrace();
            copied = new byte[0];
        }
        this.requestBody = copied;
    }

    /**
     * Returns a fresh stream over the copied body; each call starts again from the first byte.
     *
     * @return a new in-memory input stream
     */
    @Override
    public ServletInputStream getInputStream() {
        final ByteArrayInputStream buffer = new ByteArrayInputStream(requestBody);
        return new ServletInputStream() {
            @Override
            public boolean isFinished() {
                // Finished once every copied byte has been handed out.
                return buffer.available() == 0;
            }

            @Override
            public boolean isReady() {
                // The data is already in memory, so a read never blocks.
                return true;
            }

            @Override
            public void setReadListener(ReadListener readListener) {
                // No-op: read listeners are not supported by this in-memory stream.
            }

            @Override
            public int read() {
                return buffer.read();
            }

            @Override
            public int read(byte[] b, int off, int len) {
                // Bulk read, instead of the inherited implementation that reads byte by byte.
                return buffer.read(b, off, len);
            }
        };
    }

    /**
     * Returns a reader over the copied body, decoded with the request's declared character
     * encoding, or UTF-8 when none is declared or the declared one is not supported.
     *
     * @return a new buffered reader
     */
    @Override
    public BufferedReader getReader() {
        return new BufferedReader(new InputStreamReader(getInputStream(), resolveCharset()));
    }

    /** Picks the charset used by {@link #getReader()}. */
    private java.nio.charset.Charset resolveCharset() {
        String declared = getCharacterEncoding();
        if (declared != null) {
            try {
                return java.nio.charset.Charset.forName(declared);
            } catch (IllegalArgumentException ignored) {
                // Unknown or illegal charset name: fall through to UTF-8.
            }
        }
        return java.nio.charset.StandardCharsets.UTF_8;
    }
}