十三、消息总线
SpringCloudBus配合SpringCloudConfig一起使用,可以实现配置的动态刷新。
1、是什么
消息总线有两种设计思想:
一种是如上图所示,利用消息总线触发一个客户端/bus/refresh,而刷新所有的客户配置。这样的话微服务的单一内聚性会达不到,可能即作为支付微服务提供者又作为服务配置,而且万一宕机......
另一种是下图所示利用消息总线触发一个服务器ConfigServer的bus/refresh端点,而刷新所有客户端的配置。
2、能干嘛
Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改,事件推送等,也可以当做微服务间的通信通道。
3、为何被称为总线
什么是消息总线(类似于公众号):在微服务架构的系统中,通
常会使用消息代理来构建一个共同的消息主题,并让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以称为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。
基本原理:ConfigClient实例都监听MQ中同一个topic(默认是SpringCloudBus)。当一个服务刷新数据的时候,他会把这个信息放到Topic中,这样其他监听同一个Topic的微服务就能得到通知,然后去更新自身的配置。
4.怎么来实现
首先SpringCloudConfig服务端
1.添加pom依赖
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
2、修改yml文件,添加rabbitmq的相关配置
server:
port: 3344
spring:
application:
name: cloud-config-server #注册进eureka的微服务名
cloud:
config:
server:
git:
uri: https://github.com/fushier12/springcloud-config.git #Git上面的git仓库名称
###搜索目录
search-paths:
-springcloud-config
###读取分支
label: master
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
#rabbitmq的相关配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#rabbitmq的相关配置,暴露bus刷新配置的端点
management:
endpoints:
web:
exposure:
include: 'bus-refresh'
SpringCloudConfig客户端
1.添加pom
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
2.添加yml(其他的客户端服务同理,只不过改一下端口号就可以了)
server:
port: 3355
spring:
application:
name: config-client #注册进eureka的微服务名
cloud:
config:
###读取分支
label: master #分支名称
name: config #配置文件名称
profile: dev #读取后缀名称
uri: http://localhost:3344 #配置中心地址
#rabbitmq的相关配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
#暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
5、测试
1、启动服务注册7001,然后启动config3344,最后启动3355、3366
2、修改github上面的配置文件(已经将version是2修改为3)
3、这时可以发现3344中读取的配置已经更改了,因为配置中心是直接从github读取数据的,但是3355和3366还没有更改
4、这时还需要触发服务器ConfigServer的bus/refresh端点,执行以下命令
curl -X POST "http://localhost:3344/actuator/bus-refresh"
5、此时可以看到配置都发生了更改,因为消息总线将通知都广播出去了
6、动态刷新定点通知
1、假如说只想通知3355,不想通知3366,进行定点通知
2、curl发生变化
curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
3、结果可以发现3366没有变化
7、总结
十四、消息驱动
1、消息驱动概述
1.1是什么?
简单来说就是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者outputs来与SpringCloudStream中binder对象交互,通过我们配置来binding(绑定),而与Spring Cloud Stream的binder对象负责与消息中间件交互,所以,只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
1.2设计思想
没有引用SpringCloudStream之前的标准MQ
比方说用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,如果用了来了两个消息队列中的其中一种,如果在后面需要向另外一种消息队列进行迁移,一大堆东西就需要推到重做,这时需要一个目的地的绑定器,SpringCloudStream提供了一种解耦合的方式。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
1.3标准流程套路
Binder:很方便的连接中间件,屏蔽差异
Channal:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:简单的可理解为参照对象是SpringCloudStream自身,从Stream发布消息就是输出,接受消息就是输入。
2、Stream消息驱动之生产者
1.建module
2.写pom
添加新的依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.写yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding的整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content_type: application/json #设置消息类型,本次为json,文本则设置"text/planin"
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: send-8801.com #在信息列表显示主机名称
prefer-ip-address: true #访问的路径变为IP地址
4.业务类
/**
* 主要是下面这个业务实现类的注解,不再是使用@Service注解,
* 而是使用@EnableBinding(Source.class)注解,并且作为消息生产者绑定Source。
*不再是调用Dao层,而是调用MessageChannel消息通道中的方法进行绑定。
*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial*****:" + serial);
return null;
}
}
控制层不变
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage() {
return iMessageProvider.send();
}
}
3、Stream驱动之消费者
1.建module
2.写pom
3.写yml
几乎是一样的,只有除了端口以及在Eureka显示的ID以外,唯一的区别就是,把这里的output换成是input
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding的整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content_type: application/json #设置消息类型,本次为json,文本则设置"text/planin"
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: recieve-8802.com #在信息列表显示主机名称
prefer-ip-address: true #访问的路径变为IP地址
4.业务类
作为消费者不需要有service接口和实现类这些,只需要有controller调用就可以了
@Component
@EnableBinding(Sink.class)
public class RecieveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者一号" + message.getPayload() + "\t" + "server:" + serverPort);
}
}
5.测试
这样在浏览器http://localhost:8801/sendMessage进行测试,在RabbitMQ上通道上就可以看到访问了。
4、分组消费与持久化
1.重复消费
举例:比如以下场景,一个订单系统做了集群部署,都会从RabbitMQ中获取订单信息,如果同一个订单同时被两个订单系统获取到,就会造成数据错误。
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能保证消息只被其中一个应用消费一次,但是如果是不同的组是可以重复消费的。要想解决这个问题可以用Stream的消息分组。
以下是在rabbitmq中显示的两个组,假如不进行分组的话,默认是不同的组的,都会进行消费。
1.1怎么解决重复消费
只需要在yml配置文件中加一个group配置,将两个应用设置为同一个组,这样的话同一个组内就可以进行轮询消费。而避免如上订单系统出现的问题。
2.消费持久化
假如8801在发送消息的时候8802和8803都宕机了,但是配置文件中加了group配置,当应用重启之后还是可以获取到这个消息,但是,倘若不加这个配置,就会造成消息丢失,拿不到在这宕机期间发送的消息。试想一下,假如你买东西了,银行卡的钱没有少,那商家得多倒霉。
所以:group一定要配置,太重要了
十五、SpringCloudSleuth分布式请求链路跟踪
1、概述
痛点
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果,每一个前阶请求都会形成一条复杂的分布式调用链路,链路中任何一环出现高延时或错误都会引起整个请求最后的失败。
是什么
SpringCloudSleuth提供了一套完整的服务跟踪的解决方案,在分布式系统中提供追踪解决方案并且兼容支持了zipkin。简单来说Sleuth负责收集整理,zipkin负责展现。
2.服务监控步骤
安装zipkin
下载地址:
https://dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server/2.12.9/
运行java -jar "jar包"进行安装如下
运行控制台
http://localhost:9411/zipkin/
Trace:类似于树结构的span集合,表示一条调用链路,存在唯一标识。就像是下面的一条链路。
span:标识调用链路来源,通俗理解span就是一次请求信息。
3.怎么玩
服务端
1.建module(cloud-provider-payment8001)
2.添加pom
<!--包含了Sleuth和Zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
3.写yml(添加一些zipkin的配置)
spring:
application:
name: cloud-payment-service
zipkin:
base-url: http://localhost:9411 #表示数据要上传到哪里
sleuth:
sampler:
#采样率介于0到1之间,1则表示全部采集
probability: 1
4.业务类(随便加一个方法)
@GetMapping("/payment/zipkin")
public String paymentZipkin() {
return "这里是zipkin";
}
消费端
module采用cloud-consumer-order8000,pom、yml的配置和服务端完全一样,也在业务类中添加一个方法
//---------------zipkin+sleuth-------------
@GetMapping("/comsumer/payment/zipkin")
public String paymentZipkin() {
String result = restTemplate.getForObject("http://localhost:8001/payment/zipkin", String.class);
return result;
}
在浏览器调用,然后在zipkin即可看到链路监控效果
http://localhost:8000/comsumer/payment/zipkin
十六、SpringCloudAlibaba入门概述
1、SpringCloudAlibaba带来了什么
官网简介
https://spring.io/projects/spring-cloud-alibaba
https://github.com/alibaba/spring-cloud-alibaba/blob/master/README-zh.md
主要功能
- 服务限流降级:默认支持 WebServlet、WebFlux, OpenFeign、RestTemplate、Spring Cloud Gateway, Zuul, Dubbo 和 RocketMQ 限流降级功能的接入,可以在运行时通过控制台实时修改限流降级规则,还支持查看限流降级 Metrics 监控。
- 服务注册与发现:适配 Spring Cloud 服务注册与发现标准,默认集成了 Ribbon 的支持。
- 分布式配置管理:支持分布式系统中的外部化配置,配置更改时自动刷新。
- 消息驱动能力:基于 Spring Cloud Stream 为微服务应用构建消息驱动能力。
- 分布式事务:使用 @GlobalTransactional 注解, 高效并且对业务零侵入地解决分布式事务问题。。
- 阿里云对象存储:阿里云提供的海量、安全、低成本、高可靠的云存储服务。支持在任何应用、任何时间、任何地点存储和访问任意类型的数据。
- 分布式任务调度:提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有 Worker(schedulerx-client)上执行。
- 阿里云短信服务:覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。
组件
Sentinel:把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Nacos:一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
RocketMQ:一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Dubbo:Apache Dubbo™ 是一款高性能 Java RPC 框架。
Seata:阿里巴巴开源产品,一个易于使用的高性能微服务分布式事务解决方案。
Alibaba Cloud ACM:一款在分布式架构环境中对应用配置进行集中管理和推送的应用配置中心产品。
Alibaba Cloud OSS: 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。
Alibaba Cloud SchedulerX: 阿里中间件团队开发的一款分布式任务调度产品,提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。
Alibaba Cloud SMS: 覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。
十七、Spring Cloud Alibaba Nacos服务注册与配置中心
1、简介
1.1为什么叫Nacos
Naming和Configuration的前两个字母,最后的s是Service
1.2是什么
一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。其实就是注册中心加配置中心的组合,技术来看就是Nacos=Eureka+Config+Bus
2.安装并运行Nacos
下载完成后,打开bin目录
#linux:
./startup.sh -m standalone
#windows:
cmd startup.cmd
运行成功后访问http://localhost:8848/nacos
3、Nacos之服务提供者注册
1、建module
2、写pom
在父pom上加上以下依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
微服务提供者模块加上以下依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3、写yml
server:
port: 9002
spring:
application:
name: nacos-payment-provider
cloud:
nacos:
discovery:
server-addr: localhost:8848 #配置Nacos地址
management:
endpoints:
web:
exposure:
include: '*'
4、主启动(nacos用的都是EnableDiscoveryClient注解,很重要)
@SpringBootApplication
@EnableDiscoveryClient
public class PaymentMain9001 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain9001.class, args);
}
}
5、业务类
@RestController
public class PaymentController {
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/nacos/{id}")
public String getPayment(@PathVariable("id") Integer id) {
return "nacos serverPort:" + serverPort + "\t id" + id;
}
}
根据以上9001再搭建一个9002作为服务提供者集群
4、Nacos之服务消费者注册
1、建module
2、写pom
和微服务提供端加的依赖相同
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3、写yml
server:
port: 8300
spring:
application:
name: nacos-order-consumer
cloud:
nacos:
discovery:
server-addr: localhost:8848 #配置Nacos地址
#消费者将要去访问的微服务名称(注册成功进nacos的微服务提供者)
service-url:
nacos-user-service: http://nacos-payment-provider
4、启动类
@SpringBootApplication
@EnableDiscoveryClient
public class OrderNacosMain8300 {
public static void main(String[] args) {
SpringApplication.run(OrderNacosMain8300.class, args);
}
}
5、配置类
首先要加一个配置类,因为Nacos自带负载均衡,如下可以发现,nacos的依赖中带有ribbon的依赖,因为有ribbon,所以就需要加一个配置类。
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
}
6、业务类
@RestController
@Slf4j
public class OrderNacosController {
@Resource
private RestTemplate restTemplate;
@Value("${service-url.nacos-user-service}")
private String serverUrl;
@GetMapping(value = "/consumer/payment/nacos/{id}")
public String getPaymentInfo(@PathVariable("id") Long id) {
return restTemplate.getForObject(serverUrl + "/payment/nacos/" + id, String.class);
}
}
5、Nacos服务注册中心对比提升
CAP中的C指的是:所有节点在同一时间看到的数据是一致的;而A的定义高可用是所有的请求都会响应。P则是分区容错属性。
AP模式
如果不需要存储服务级别的信息并且服务实例是通过nacos-client注册,并且能够保持心跳上报,那么就可以选择AP模式,当前主流的服务SpringCloud和Dubbo服务,都适用于AP模式,AP模式为了服务的可能性而减弱了一致性,因此AP模式下只注册临时实例。
CP模式
如果需要在服务级别编辑或者存储配置信息,那么CP是必须的,K8S服务和DNS服务则适用于CP模式,CP模式下则支持持久化实例,此时则是以Raft协议为集群运行模式,该模式下注册实例之前必须先注册服务,如果服务不存在,则会返回错误。
切换模式
curl -X PUT '$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP'
6、Nacos服务配置中心(替代config)
一、怎么用
1、建Module
cloudalibaba-config-nacos-client3377
2、写pom
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
3、写yml
注意这里是两个配置文件,一个是bootstrap.yml,另一个是application.yml,前者优先级高于后者。
#bootstrap.yml的配置
#nacos配置
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
server-addr: localhost:8848 #nacos服务注册中心地址
config:
server-addr: localhost:8848 #nacos作为配置中心地址
file-extension: yaml #指定yaml格式的配置
#application.yml的配置
spring:
profiles:
active: dev #表示开发环境
4、主启动类
@SpringBootApplication
@EnableDiscoveryClient
public class NacosConfigClientMain3377 {
public static void main(String[] args) {
SpringApplication.run(NacosConfigClientMain3377.class, args);
}
}
5、业务类
@RestController
@RefreshScope //支持Nacos的动态刷新功能
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@GetMapping("/config/info")
public String getConfigInfo() {
return configInfo;
}
}
6、Nacos中进行配置
见官网https://nacos.io/zh-cn/docs/quick-start-spring-cloud.html
7、测试
在浏览器中输入映射地址就可以显示配置信息(而且在nacos修改配置信息之后可以直接广播生效,直接刷新就可以)
8、Nacos的NameSpace、Group、DataID
在nacos中添加分组设置后,在IDEA中可以指定访问那个个命名的空间、在指定命名的Group,然后指定DataID,就是是一个工程一样,多级目录
#nacos配置
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
server-addr: localhost:8848 #nacos服务注册中心地址
config:
server-addr: localhost:8848 #nacos作为配置中心地址
file-extension: yaml #指定yaml格式的配置
#在config中添加group属性可以指定访问配置的分组
group:
#对应着在nacos中添加命名空间时生成的ID
namespace:
7、Nacos集群和持久化配置(重点)
1、官网说明
Nacos默认使用嵌入式数据库实现数据的存储(也就是说,当关闭Nacos之后,重启Nacos之后之前的配置依然在,数据实际是存储在数据库中的)。但是,如果启动多个默认配置下的Nacos节点,数据存储存在一致性问题。所以,Nacos采用了集中式存储的方式来支持集群化部署,目前只支持MySQL的存储。配置MySQL数据库之后,所有写入嵌入书数据库的数据就可以都存储到MySQL了。
Nacos支持三种部署模式:
单机模式:用于测试和单机使用
集群模式:用于生产环境,确保高可用
多集群模式:用于多数据中心场景
2、Nacos持久化解释
Nacos自带的是derby数据库,怎么将derby切换到MySQL?
1、找到nacos/conf/nacos-mysql.sql文件,放到MySQL中执行
2、修改nacos/conf/application.properties文件
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://ip地址:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
3、因为登陆报错,将mysql驱动jar包替换
3、Linux版Nacos+MySQL生产环境配置
linux的切换数据源的配置和上述相同,在完成上述配置之后,搭建集群只需要更改如下配置
①、修改nacos/conf/cluster.conf.example
②、修改nacos/bin/startup.sh
③、运行集群
#输入以下命令就可以运行集群了
./startup.sh -p 3333
./startup.sh -p 4444
./startup.sh -p 5555
④、配置Nginx
学完Nginx再回来看第110节,最后效果如下
十八、SpringCloudAlibaba Sentinel实现熔断与限流
1、前言:和Hystrix作对比
Hystrix:
1、需要我们自己手工搭建监控平台
2、没有一套web界面,不可以给我们进行更加细粒度化的配置流控、速率控制、服务熔断、服务降级
Sentinel:
1、单独一个组件,可以独立出来
2、直接界面化的细粒度统一配置
2、Sentinel(哨兵)是什么
官网:https://github.com/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。主要就是服务雪崩、服务降级、服务熔断、服务限流
3、Sentinel的下载安装运行
Sentinel分为两部分:
核心库(java客户端):不依赖任何框架/库,能够运行于所有java运行环境,同时对Dubbo/Spring Cloud等框架也有较好的支持。
控制台(Dashboard):基于Spring Boot开发,打包后可以直接运行,不需要额外的Tomcat等应用容器。
下载Sentinel的jar包,然后直接java -jar 运行即可 ,Sentinel的用户和密码默认都是Sentinel
4、Sentinel的初始化监控
1、建立一个module被Sentinel保护
2、pom
//sentinel的依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
//后面做持久化会用到
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
//添加openfeign的依赖,后面会用到
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
3、yml
server:
port: 8401
spring:
application:
name: cloudalibaba-sentinel-service
cloud:
nacos:
discovery:
#Nacos服务注册中心地址
server-addr: localhost:8848
sentinel:
transport:
#配置Sentinel dashboard地址
dashboard: localhost:8080
#默认8719端口,假如被占用会自动从8719+1依次扫描
port: 8719
management:
endpoints:
web:
exposure:
include: '*'
4、主启动类
@SpringBootApplication
@EnableDiscoveryClient
public class MainApp8401 {
public static void main(String[] args) {
SpringApplication.run(MainApp8401.class,args);
}
}
5、业务类
@RestController
public class FlowLimitController {
@GetMapping("/testA")
public String testA() {
return "testA";
}
@GetMapping("/testB")
public String testB() {
return "testB";
}
}
6、测试
此时访问一下controller中指定的路径,Sentinel就将对应用进行保护了,因为Sentinel采取的是懒加载,访问localhost:8080也就可以看到应用。
5、Sentinel的流量控制
流量控制(flow control),其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
资源名:唯一名称,默认请求路径
阈值类型/单机阈值:
①QPS(每秒钟的请求数量):当调用该api的QPS达到阈值的时候,进行限流
②线程数:当调用该api的线程数达到阈值的时候进行限流
流控模式:
①直接:api达到限流条件时,直接限流
②关联:当关联的资源达到阈值时,就限流自己。
③链路:只记录指定链路上的流量,如果达到阈值,就进行限流
流控效果
①快速失败:直接失败,抛异常
②Warm up:根据codeFactor(冷加载因子,默认3)的值,从阈值/codeFactor,经过预热时长,才达到设置的QPS阈值
③排队等待:匀速排队,让请求以匀速的速度通过,阈值类型必须为QPS,否则无效
1、QPS直接快速失败报错
2、QPS和线程作对比
如下图所示,在这里都以设置为1作比较,QPS表示的是每秒的请求数量,如果一旦一秒钟超过了1个,就相当于是有一扇门直接给挡住了,然后直接报错。线程就像是,进去了这扇门,不管有多少数据都可以进来,但是只能一个个的依次的来处理,就像是银行的业务人员,一旦两个想同时访问,不好意思,不行。
3、QPS关联模式快速报错(因为微服务中两个服务之间是有关联的,比如说支付接口挂了,那你下单接口也歇一歇)当与A关联的B达到阈值时,A就限流自己,B惹事A遭殃
@GetMapping("/testA")
public String testA() {
return "testA";
}
@GetMapping("/testB")
public String testB() {
return "testB";
}
4、QPS warm up流控效果
Warm Up(
RuleConstant.CONTROL_BEHAVIOR_WARM_UP
)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。详细文档可以参考 流量控制 - Warm Up 文档,具体的例子可以参见 WarmUpFlowDemo。
通常冷启动的过程系统允许通过的 QPS 曲线如下图所示:
如下图所示,我们最后想要设置的值是单机阈值是10,但是不想让他一下子到10,而是想让他慢慢的增长,一开始的单机阈值设置为10(单机阈值)/3(冷加载因子)=3.3,5秒(预热时长)之后达到理想单机阈值10。
公式:默认 coldFactor 为 3,即请求 QPS 从 threshold / 3 开始,经预热时长逐渐升至设定的 QPS 阈值。
5、QPS匀速排队
匀速排队
匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。详细文档可以参考 流量控制 - 匀速器模式,具体的例子可以参见 PaceFlowDemo。
该方式的作用如下图所示:
这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
如上配置表示不管有多少的请求放进来,一秒钟只会处理一个请求,超时时间设置为了20s,表示如果这个请求在20s之后还没有被处理就会报错。
6、Sentinel的服务降级
1、简介
Sentinel熔断降级会在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速的失败,当资源被降级时,在接下来的降级时间窗口之内,对该资源的调用都自动熔断,而Hystrix则会有一个半开状态,这是不一样的。
RT、异常比例、异常数
2、RT
平均响应时间 (DEGRADE_GRADE_RT):当 1s 内持续进入 N 个请求,对应时刻的平均响应时间(秒级)均超过阈值(count,以 ms 为单位)
,那么在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地熔断(抛出 DegradeException)。注意 Sentinel 默认统计的 RT 上限是 4900 ms,超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项 -Dcsp.sentinel.statistic.max.rt=xxx 来配置。
如上图所示,设置RT降级规则,RT设置为200毫秒,时间窗口设置为1s,也就是说,如果访问/testB资源进入的N个请求,访问响应的时间都超过200ms了,那么在这个时间窗口期1s内,就会发生服务熔断。
3、异常比例
异常比例 (DEGRADE_GRADE_EXCEPTION_RATIO):当资源的每秒请求量 >= N(可配置),并且每秒异常总数占通过量的比值超过阈值(DegradeRule 中的 count)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。
如上图所示,如果一秒钟内有四个请求访问/testB,50%以上的请求,比如说3个请求访问发生错误,就会在1s的时间窗口内发生服务熔断。
4、异常数
异常数 (DEGRADE_GRADE_EXCEPTION_COUNT):当资源近 1 分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间窗口是分钟级别的,若 timeWindow(时间窗口) 小于 60s,则结束熔断状态后仍可能再进入熔断状态。所以时间窗口一定要大于等于一分钟时间
如上图所示,如果在一分钟内,异常数超过10个,就会发生服务降级。注意时间窗口一定要大于60s。
7、热点参数限流
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
1、在服务类新加配置
@GetMapping("/testHotKey")
@SentinelResource(value = "testHotKey", blockHandler = "deal_testHotKey")
public String testHotKey(@RequestParam(value = "p1", required = false) String p1,
@RequestParam(value = "p2", required = false) String p2) {
return "-----testHotKey";
}
public String deal_testHotKey(String p1, String p2, BlockedException e) {
return "------deal_testHotKey failed";
}
2、浏览器访问
此时如果访问这个路径http://localhost:8401/testHotKey?p1=a,因为上图进行了热点限流,参数索引是0,也就是p1,单机阈值为1,也就是1s内访问超过1次就会触发热点限流,而这里就是触发我们写的兜底方法。如果没有兜底方法的话就会是一个很不友好的error界面。而且只要有p1就会进行热点限流
http://localhost:8401/testHotKey?p1=a会进行热点限流
http://localhost:8401/testHotKey?p1=a&p2=b会进行热点限流
http://localhost:8401/testHotKey?p2=b不会进行热点限流
3、参数例外项(VIP)
如上图所示,进行了参数例外项的配置,其实意思就是当索引0下标对应的值也就是p1,当他是5时,阈值QPS可以达到200在进行热点限流,而不再是1。相当于是给p1=5开了一个vip。参数类型必须是八大基本类型和String。
4、系统保护规则
系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS 和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量。
Load 自适应(仅对 Linux/Unix-like 机器生效):
系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt 估算得出。设定参考值一般是 CPU cores * 2.5。
CPU usage(1.5.0+ 版本):
当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。
平均 RT:
当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。
并发线程数:
当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。
入口 QPS:
当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。
8、@SentinelResource注解
@GetMapping(value = "/byResource")
@SentinelResource(value = "byResource", blockHandler = "handlerException")
public String CommonResult() {
return "success";
}
public String handlerException(BlockException e) {
return "failed" + e;
}
如上图所示,如果每一个方法都要配置一个blockHandler,那么会造成业务和代码高耦合,且造成代码冗余,所以,把处理方法提取出来放到一个类里面,并且可以在这个类里面配置多个方法。
@RestController
public class RateLimitController {
@GetMapping(value = "/extract")
@SentinelResource(value = "extract", blockHandlerClass = CustomerBlockHandler.class, blockHandler = "Handler1")
public String HandlerExtract() {
return "我成功了";
}
}
public class CustomerBlockHandler {
public static String Handler1(BlockException e) {
return "handler1由我来处理";
}
public static String Handler2(BlockException e) {
return "handler2由我来处理";
}
}
如上图所示,这样就可以避免了代码的冗余和业务的耦合。并且能够实现全局的统一处理方法,
@SentinelResource注解其他注意:
注意不可以用private注解
注意Sentinel的三个核心API:
①:sphU定义资源
②:Tracer定义统计
③:contextUtil定义上下文
9、服务熔断功能
一、环境预说
9003和9004只有端口号的差异,举例9003配置:
1、module
2、pom
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
3、yml
server:
port: 9003
spring:
application:
name: nacos-payment-provider
cloud:
nacos:
discovery:
server-addr: localhost:8848 #配置Nacos地址
management:
endpoints:
web:
exposure:
include: '*'
4、主启动
@SpringBootApplication
@EnableDiscoveryClient
public class PaymentMain9003 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain9003.class, args);
}
}
5、业务类
@RestController
public class PaymentController {
@Value("${server.port}")
private String serverPort;
public static HashMap<Long, Payment> hashMap = new HashMap<>();
static {
hashMap.put(1L, new Payment(1L, "dfadfhasdfgjadsf"));
hashMap.put(2L, new Payment(2L, "nnnnnnnn"));
hashMap.put(3L, new Payment(3L, "mmmmmmm"));
}
@GetMapping(value = "/paymentSQL/{id}")
public CommonResult<Payment> paymentSQL(@PathVariable("id") Long id) {
Payment payment = hashMap.get(id);
CommonResult<Payment> result = new CommonResult<>(200, "serverPort:" + serverPort, payment);
return result;
}
}
8402客户端环境:
1、module
2、pom
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
3、yml
server:
port: 8402
spring:
application:
name: nacos-order-consumer
cloud:
nacos:
discovery:
server-addr: localhost:8848 #配置Nacos地址
sentinel:
transport:
#配置Sentinel dashboard地址
dashboard: localhost:8080
#默认8719端口,假如被占用会自动从8719+1依次扫描
port: 8719
#消费者将要去访问的微服务名称(注册成功进nacos的微服务提供者)
service-url:
nacos-user-service: http://nacos-payment-provider
4、主启动
@SpringBootApplication
@EnableDiscoveryClient
public class OrderMain {
public static void main(String[] args) {
SpringApplication.run(OrderMain.class, args);
}
}
5、配置类
@Configuration
public class ApplicationContextConfig {
@Bean
@LoadBalanced
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
}
6、业务类
@RestController
@Slf4j
public class CircleBreakerController {
public static final String SERVICE_URL = "http://nacos-payment-provider";
@Resource
private RestTemplate restTemplate;
@GetMapping(value = "/consumer/fallback/{id}")
public CommonResult<Payment> fallBack(@PathVariable("id") Long id) {
CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id);
if (id == 4) {
throw new IllegalArgumentException("非法参数异常");
} else if (result.getData() == null) {
throw new NullPointerException("没有对应的记录");
}
return result;
}
}
如上所示,客户端在访问时可以实现轮询
二、然后由业务类来进行服务熔断的学习
运用fallback管理程序运行时异常,如下所示添加一个fallback方法,就不再是难看的errorpage页面,而是我们自定义的页面。
@GetMapping(value = "/consumer/fallback/{id}")
@SentinelResource(value = "fallback",fallback = "handFallback")
public CommonResult<Payment> fallBack(@PathVariable("id") Long id) {
CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id);
if (id == 4) {
throw new IllegalArgumentException("非法参数异常");
} else if (result.getData() == null) {
throw new NullPointerException("没有对应的记录");
}
return result;
}
public CommonResult<Payment> handFallback(@PathVariable("id") Long id, Throwable e) {
Payment payment = new Payment(id, "null");
return new CommonResult<>(444, "兜底异常" + e.getMessage(), payment);
}
三、fallback只会管java产生的异常,blockHandler只会管Sentinel控制台的配置违规
如下这样,既配置blockhandler又配置fallback就可以既处理java异常又处理Sentinel配置违规,如果两个规则都违规时,则会进入blockHandler的处理
。
@GetMapping(value = "/consumer/fallback/{id}")
// @SentinelResource(value = "fallback",fallback = "handFallback")
@SentinelResource(value = "fallback",fallback = "handFallback",blockHandler = "blockHandler")
public CommonResult<Payment> fallBack(@PathVariable("id") Long id) {
CommonResult<Payment> result = restTemplate.getForObject(SERVICE_URL + "/paymentSQL/" + id, CommonResult.class, id);
if (id == 4) {
throw new IllegalArgumentException("非法参数异常");
} else if (result.getData() == null) {
throw new NullPointerException("没有对应的记录");
}
return result;
}
public CommonResult<Payment> handFallback(@PathVariable("id") Long id, Throwable e) {
Payment payment = new Payment(id, "null");
return new CommonResult<>(444, "兜底异常" + e.getMessage(), payment);
}
public CommonResult<Payment> blockHandler(@PathVariable("id") Long id, BlockException e) {
Payment payment = new Payment(id, "null");
return new CommonResult<>(445, "blockhandler异常" + e.getMessage(), payment);
}
exceptionsToIgnore表示忽略这个异常,保证这个程序先走通,如下所示,这里是个数组,可以加多个异常VIP
四、Sentinel和OpenFeign
1、修改pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
2、修改yml
#添加激活Sentinel对Feign支持
feign:
sentinel:
enabled: true
3、主启动类添加注解@EnableFeignClients
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class OrderMain8402 {
public static void main(String[] args) {
SpringApplication.run(OrderMain8402.class, args);
}
}
4、业务类
首先是一个专门调用服务端方法的接口,详细看之前OpenFeign的学习,FeignClient的value对应着服务类的实例名,fallback表示处理降级的类
@FeignClient(value = "nacos-payment-provider", fallback = PaymentServiceImpl.class)
public interface PaymentService {
@GetMapping(value = "/paymentSQL/{id}")
CommonResult<Payment> paymentSQL(@PathVariable("id") Long id);
}
@Component
public class PaymentServiceImpl implements PaymentService {
@Override
public CommonResult<Payment> paymentSQL(Long id) {
return new CommonResult<>(44444, "服务降级返回", new Payment(id, "err"));
}
}
然后在controller就可以直接调用了,而不再需要RestTemplate了
@Resource
private PaymentService paymentService;
@GetMapping(value = "consumer/paymentSQL/{id}")
public CommonResult<Payment> paymentSQL(@PathVariable("id") Long id) {
return paymentService.paymentSQL(id);
}
5、测试
将服务端停掉
10、Sentinel的持久化规则
痛点:每次重启服务器,Sentinel的配置就都没有了,导致了每次都要重新配置
解决方法:将限流规则配置到nacos上面,只要刷新8401某个rest地址,Sentinel的流控规则就能看到,只要Nacos不删除配置,就一直有效。
详细步骤:8401示例
1、pom添加依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
2、修改yml(yml一定要格式正确,因为格式不正确,我的Sentinel持久化一直没生效)
server:
port: 8401
spring:
application:
name: cloudalibaba-sentinel-service
cloud:
nacos:
discovery:
#Nacos服务注册中心地址
server-addr: localhost:8848
sentinel:
transport:
#配置Sentinel dashboard地址
dashboard: localhost:8080
#默认8719端口,假如被占用会自动从8719+1依次扫描
port: 8719
datasource:
ds1:
nacos:
server-addr: localhost:8848
dataId: cloudalibaba-sentinel-service
groupId: DEFAULT_GROUP
data_type: json
rule-type: flow
management:
endpoints:
web:
exposure:
include: '*'
在yml中配置的dataId即对应着在如下Nacos中配置的Data ID。
3、在nacos中配置Sentinel流控规则
配置规则:
resource:
资源名,即限流规则的作用对象;
limitApp:
流控针对的调用来源,若为 default 则不区分调用来源;
grade:
限流阈值类型: 0表示线程数,1表示QPS;
strategy:
流控模式,0表示直接,1表示关联,2表示链路;
controlBehavior:
流量控制效果:0快速失败、1Warm Up、2排队等待;
clusterMode:
是否集群。
这时Sentinel中就会一直有我们配置的限流规则了,但是目前来看Sentinel的持久化配置比较复杂,应该是个半成品,期待阿里后期继续完善。
十九、SpringCloudAlibaba Seata处理分布式事务
1、概述
解决痛点:
单体应用被拆分为微服务应用,原来的三个模块被拆分成三个独立的应用,分别连接三个独立的数据源,业务操作就需要调用这个三个服务来完成,此时每个服务内部的数据一致性可以由本事务保证,但是全局一致性问题没法解决。比如说在一个微服务电商平台,用于购买商品的业务逻辑,整个业务逻辑由三个微服务提供支持:
仓储服务:
对给定的商品扣除仓储数量
订单服务:
根据采购需求创建订单
账户服务:
从用户账户中扣除余额
此时就需要一个大哥来保证全局一致性问题——Seata
Seata一加三的套件组成:
一是全局的Transaction ID XID
全局的唯一事务ID
三个常见Seata术语:
TC - 事务协调者
(协调者)
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM - 事务管理器
(发起者)
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM - 资源管理器
(参与者)
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
1、TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID
(就好像班主任向授课老师申请开一个授课班级号)
2、XID在微服务调用链路的上下文中传播;
(把班号都告诉同学们,好让他们加进来)
3、RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
(同学们知道了XID后加入进来此时就归授课老师管理了)
4、TM向TC发起针对XID的全局提交或者回滚协议
(同学们全部签到,班主任告诉授课老师可以开始讲课了)
5、TC调度XID下管辖的全部分支事务完成提交或回滚请求
(讲完了,授课老师结课)
从别的地方看到的另一个解释:
(1)发起方会向协调者申请一个全局事务id,并保存到ThreadLocal中(为什么要保存到ThreadLocal中?弱引用,线程之间不会发生数据冲突)
(2)Seata数据源代理发起方和参与方的数据源,将前置镜像和后置镜像写入到undo_log表中,方便后期回滚使用
(3)发起方获取全局事务id,通过改写Feign客户端请求头传入全局事务id。
(4)参与方从请求头中获取全局事务id保存到ThreadLocal中,并把该分支注册到SeataServer中。
(5)如果没有出现异常,发起方会通知协调者,协调者通知所有分支,通过全局事务id和本地事务id删除undo_log数据,如果出现异常,通过undo_log逆向生成sql语句并执行,然后删除undo_log语句。如果处理业务逻辑代码超时,也会回滚。
- TM 开启分布式事务(TM 向 TC 注册全局事务记录);
- 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
- TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
- TC 汇总事务信息,决定分布式事务是提交还是回滚;
- TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。
2、下载安装配置
一、下载安装
下载地址:https://github.com/seata/seata/releases
二、修改conf下的file.conf文件:
主要修改自定义事务组名称+事务日志存储为db+数据库连接信息
service:
store:
db:
三、Mysql新建库seata、建表
建表的SQL是由Seata默认提供的,但是1.0之后就没有了
1.db_store.sql
drop table if exists `global_table`;
create table `global_table` (
`xid` varchar(128) not null,
`transaction_id` bigint,
`status` tinyint not null,
`application_id` varchar(32),
`transaction_service_group` varchar(32),
`transaction_name` varchar(128),
`timeout` int,
`begin_time` bigint,
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`xid`),
key `idx_gmt_modified_status` (`gmt_modified`, `status`),
key `idx_transaction_id` (`transaction_id`)
);
drop table if exists `branch_table`;
create table `branch_table` (
`branch_id` bigint not null,
`xid` varchar(128) not null,
`transaction_id` bigint ,
`resource_group_id` varchar(32),
`resource_id` varchar(256) ,
`lock_key` varchar(128) ,
`branch_type` varchar(8) ,
`status` tinyint,
`client_id` varchar(64),
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`branch_id`),
key `idx_xid` (`xid`)
);
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
2.db_undo_log.sql
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
四、修改registry.conf配置文件
五、启动nacos
六、启动Seata
这时候会报错,因为我的数据库是8.0.17版本的,而Seata默认支持的是5.1.30版本
1、首先将jar包替换掉
2、然后将file.conf的类加载器进行更改,在mysql6之后就进行替换了,注意还要加上时区
3、启动成功
3、SEATA 的分布式交易解决方案
一、业务概述
这里我们会创建三个微服务,一个订单服务,一个库存服务,一个账户服务。当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减加单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额。
最后在订单服务中修改订单状态为已完成。
该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。
二、数据库准备
1、建三个库对应着三个微服务
create database seata_order;
create database seata_storage;
create database seata_account;
2、各库下建自己的表及一张回滚表
#seata_order下建t_order
use seata_order;
create table t_order (
`id` bigint(20) not null auto_increment primary key,
`user_id` bigint(20) default null comment '用户ID',
`product_id` bigint(20) default null comment '产品ID',
`count` int(11) default null comment '数量',
`money` decimal(18,2) default null comment '金额',
`status` int(1) default null comment '订单状态:0-创建中,1-已完结'
) engine=INNODB auto_increment = 1 default charset = 'utf8';
select * from t_order;
#seata_storage下建表t_storage
use seata_storage;
create table t_storage (
`id` bigint(20) not null auto_increment primary key,
`product_id` bigint(20) default null comment '产品ID',
`total` int(11) default null comment '总库存',
`used` int(11) default null comment '使用库存',
`residue` int(11) default null comment '剩余库存'
) engine=INNODB auto_increment = 1 default charset = 'utf8';
insert into t_storage(`id`, `product_id`, `total`, `used`, `residue`) values('1', '1', '100' , '0', '100');
select * from t_storage;
#seata_account下建表t_account
use seata_account;
create table t_account (
`id` bigint(20) not null auto_increment primary key,
`user_id` bigint(20) default null comment '用户ID',
`total` decimal(18,2) default null comment '总额度',
`used` decimal(18,2) default null comment '使用额度',
`residue` decimal(18,2) default '0' null comment '剩余额度'
) engine=INNODB auto_increment = 1 default charset = 'utf8';
insert into t_account(`id`, `user_id`, `total`, `used`, `residue`) values('1', '1', '10000' , '0', '10000');
select * from t_account;
##三个库下都建立这张回滚表
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
三、微服务准备
三个微服务的大概就是,客户访问一个地址进行下单商品,订单模块controller调用Service方法,在Service的实现类中,调用Dao通过mybatis对mysql数据库进行修改,创建一个订单在订单库的订单表中,因为订单创建了,那么库存就要减少。所以利用Feign调用库存服务模块的方法,对库存进行减少,库存减少了,用户的账户余额也要减少,然后再通过Feign调用账户服务的方法,对账户余额进行减少。这些都执行完了,那么这个订单算是创建完成了。
但是一旦其中的某一个模块,比如说,库存服务的方法调用超时了,那么可能就会产生一种后果,账户上的钱少了,但是库存没有少,而且订单也没有创建成功,而只需要在业务方法上添加@GlobalTransactional注解就可以了。就可以保证要么全部成功,要么全部失败。
1、支付模块
(1)建module
seata-order-service2001
(2)改pom
<dependencies>
<!--springcloud alibaba nacos discovery-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.0.0</version>
</dependency>
<!--springcloud alibaba nacos config-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--springcloud alibaba sentinel-datasource-nacos 后续做持久化用到-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
<!--springcloud alibaba sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!--springcloud openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
(3)改yml
server:
port: 2001
spring:
appliaction:
name: seata-order-service
cloud:
alibaba:
seata:
# 自定义事务组名称需要 seata-service 中的对应
tx-service-group: fsp_tx_group
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.0.163:3306/seata_order?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: root
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info
mybatis:
mapper-locations: classpath:mapper/*.xml
(4)改file.conf(直接把内容剪贴过来即可)
(5)改registry.conf(直接把内容剪贴过来即可)
(6)domain(也就是实体类entity)
(7)Dao
@Mapper
public interface OrderDao {
void create(Order order);
void update(@Param("userId") Long userId, @Param("status") Integer status);
}
(8)Service
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private AccountService accountService;
@Resource
private StorageService storageService;
@Override
@GlobalTransactional(name="fsp-create-order",rollbackFor = Exception.class)
public void create(Order order) {
log.info("开始创建订单");
orderDao.create(order);
log.info("订单库存开始做扣减");
storageService.decrease(order.getProductId(), order.getTCount());
log.info("账户余额进行扣减");
accountService.decrease(order.getProductId(), order.getTCount(), order.getMoney());
log.info("修改订单状态");
orderDao.update(order.getUserId(), 0);
log.info("下订单结束");
}
@Override
public void update(Long id, Long userId, Integer status) {
}
}
(9)Controller
@RestController
public class OrderController {
@Resource
private OrderService orderService;
@GetMapping(value = "/order/create")
public CommonResult createOrder(Order order) {
orderService.create(order);
return new CommonResult(200, "订单创建成功");
}
}
(10)config配置
@Configuration
@MapperScan("com.atguigu.springcloud.alibaba.dao")
public class MybatisConfig {
}
(11)主启动类
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
public class SeataOrderMainApp2001 {
public static void main(String[] args) {
SpringApplication.run(SeataOrderMainApp2001.class, args);
}
}
另外两个服务配置类似,只不过是为了实现减库存,减金额的业务逻辑,所以最重点的就是在业务的入口create方法上面加了@GlobalTransactional注解,保证了全局数据的一致性。
四、Seata原理
整体机制(AT模式)
两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:①提交异步化,非常快速地完成。②回滚通过一阶段的回滚日志进行反向补偿。
在一阶段,Seata会拦截“业务SQL”
1.解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”
2.执行“业务SQL”更新业务数据,在业务数据更新之后,将其保存为“after image”,最后生成行锁
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
在二阶段分为两种情况,如果是顺利提交的话,因为“业务SQL”在一阶段已经提交到数据库,所以Seata框架只需要将一阶段保存的快照数据和行锁删掉,完成数据清理即可
在二阶段分为两种情况,如果是回滚的话,Seata就需要回滚一阶段已经执行的业务SQL,还原业务数据。回滚方式是“before image”还原业务数据,但是在还原前首先要校验脏写,也就是对比“数据库当前业务数据”和“after image”如果两分数据完全一致说明没有脏写。(脏数据就是业务数据被其他人动过了)可以还原业务数据,如果不一致就说明有脏写,需要人工处理。