支付操作出现的问题与解决方法
- 场景介绍
- 存在的问题
- 解决方法
- 代码级别加锁
- 数据库级加锁
- 悲观锁
- 乐观锁
- 最终结果
场景介绍
用户完成支付后, 微信支付系统执行回调地址,通知支付服务系统更新用户金额与支付订单记录的状态。
/**
* 更新付款流水,增加用户金额
*
* 1.微信执行回调地址后,更新付款流水,增加用户金额
* (isSuccess = false 微信支付失败的回调,记录订单信息,不更新用户金额)
*
* 2.当订单的状态已更新后,微信再一次执行回调函数时,不执行操作,返回
*
* @param serialNum 订单号
* @param isSuccess 支付是否成功
*/
@Transactional
public void updateDepositState(String serialNum, boolean isSuccess) {
String state = isSuccess ? Constants.DOPOSIT_RECORD_STATE_PAYED : Constants.DOPOSIT_RECORD_STATE_PAYFL;
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNum(serialNum);
if (!record.getState().equals(Constants.DOPOSIT_RECORD_STATE_PAYING)){
//当订单的状态已更新后,微信再一次执行回调函数时,不执行操作
WXPayUtil.getLogger().info("wxnotify:微信支付回调:订单号===>"+serialNum+"状态非充值中");
return;
}
String openId = record.getUserCode();
PacpUser pacpUser = pacpUserDao.findByCode(openId);
Long accountBalance = pacpUser.getAccountBalance();
record.setState(state);
record.setUpdateTime(DateTimeUtils.getCurrentTime());
record.setRestMoney(accountBalance + record.getMoney());
pacpDepositRecordDao.save(record);
if (!isSuccess){
return; //如果支付失败,则不更新用户的金额
}
pacpBillInfoService.createBillInfo(record);
Long expectAccount = accountBalance + record.getMoney();
pacpUserDao.updateAccountBalance(pacpUser.getId(), expectAccount, accountBalance);
}
存在的问题
第二类丢失更新
时间 | 充值事务A | 消费事务B |
---|---|---|
T 1 | 开启事务 | |
T 2 | 开启事务 | |
T 3 | 查询账户余额为1000 | |
T 4 | 查询账户余额为1000 | |
T 5 | 充值金额100 | |
T 6 | 提交事务 | |
T 7 | 消费金额500 | |
T 8 | 提交事务 | |
T 9 | 余额为500 |
用户有1000,先充值100后,在消费500元。实际余额应为600元。但在此种情况内查询余额却为500(丢失了更新)。
上面这个案例只是展示了对金额操作会产生丢失更新的问题,在支付服务系统内更改支付订单状态时也会出现该问题。
解决方法
代码级别加锁
- 使用synchronized锁方法
synchronized(this) { // 读 - 写 }
总结: 不可行, 1. 多个充值任务都会执行该方法。会造成严重的堵塞 2. 在分布式环境下无法保证数据的一致性还是会出现丢失更新问题。
- 使用分布式锁Redisson
try{
// 使用订单号 + 方法名加锁
DistributedLocker.lock(serialNum+"-deposit-save-key");
// 读 - 写
}finally {
DistributedLocker.unlock(serialNum+"-deposit-save-key");
}
总结: 可行, 使用订单号 + 方法名加锁。可以保证每个支付订单的更新的一致。而且只会对当前支付订单号的更新订单方法加锁
不会造成严重的堵塞。不过要在每一个对用户金额或支付订单状态的更改都要加锁。
数据库级别加锁
要求我们使用的MySQL引擎为InnoDB,其为我们提供了两种类型的行锁:
共享锁(S):允许一个事务去读一行,阻止其他事务获得相同数据集的排他锁。
排他锁(X):允许获得排他锁的事务更新数据,阻止其他事务取得相同数据集的共享读锁和排他写锁。
事务可以通过以下语句显式给记录集加共享锁或排他锁:
共享锁(S):SELECT * FROM table_name WHERE ... LOCK IN SHARE MODE。 其他 session 仍然可以查询记录,并也可以对该记录加 share mode 的共享锁。但是如果当前事务需要对该记录进行更新操作,则很有可能造成死锁。
排他锁(X):SELECT * FROM table_name WHERE ... FOR UPDATE。其他 session 可以查询该记录,但是不能对该记录加共享锁或排他锁,而是等待获得锁。
我们一般给数据库加锁比较多的说法是悲观锁和乐观锁,其实无论是悲观锁还是乐观锁,都是人们定义出来的概念,可以认为是一种思想。而某个数据库的某个引擎只是通过自身机制对其进行了实现而已。
- 悲观锁
当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。
这种借助数据库锁机制在修改数据之前先锁定,再修改的方式被称之为悲观并发控制。之所以叫做悲观锁,是因为这是一种对数据的修改抱有悲观态度的并发控制方式。我们一般认为数据被并发修改的概率比较大,所以需要在修改之前先加锁。
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会;另外,还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。
而悲观锁的实现就是上面说的共享锁和排他锁。其中共享锁是读锁,多个事务都可以获取,容易造成死锁;我们通常用的比较多的是排他锁,也就是FOR UPDATE语句加锁,配合开启事务实现。
注:MySQL InnoDB默认行级锁都是基于索引的,如果一条SQL语句用不到索引是不会使用行级锁的,会使用表级锁把整张表锁住,这点需要注意。
- 乐观锁
乐观锁( Optimistic Locking ) 是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
相对于悲观锁,在对数据库进行处理的时候,乐观锁并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本(version)。
乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁。
乐观锁的概念中其实已经阐述了他的具体实现细节,主要就是两个步骤:冲突检测和数据更新。
其实现方式有一种比较典型的就是Compare and Swap(CAS)。CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
在使用Data JPA时,可以通过对对象增加一个version字段和增加一个@version注解实现记录数据版本。具体可看https://www.cnblogs.com/wangzhongqiu/p/7550985.html
在乐观锁与悲观锁的选择上面,主要看下两者的区别以及适用场景就可以了:
乐观锁并未真正加锁,效率高。一旦锁的粒度掌握不好,更新失败的概率就会比较高,容易发生业务失败。
悲观锁依赖数据库锁,效率低。更新失败的概率比较低。
解决方案
根据更新操作的场景。我们需要有两步的操作。
更新订单支付记录状态State
更新用户的金额AccountBalance
我们可以发现对更新订单支付记录状态操作的场景并不常见。对其更新时可以采用悲观锁。
在更新用户的金额AccountBalance是我们可以发现其实很多场景都需要进行更改用户金额的操作)如消费,充值,提现等),所以可以采用乐观锁。
- 更新订单支付记录状态State添加悲观锁
注: 需要给流水号serialNum 添加索引,否则该操作将会进行表锁
public interface PacpDepositRecordDao extends BaseJPADao<PacpDepositRecord, String> {
@Query(value = "SELECT * FROM pacp_deposit_record dc WHERE serial_num = :serialNum FOR UPDATE" ,nativeQuery = true)
PacpDepositRecord findBySerialNumForUpdate(@Param(value = "serialNum") String serialNum);
}
在查询该条记录时对其进行添加排他锁(锁定该行记录)。防止其他事务对其进行读取(堵塞)
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNumForUpdate(serialNum);
在对记录更改状态时,可以保证record不会丢失更新。
直到事务提交后, 释放锁
- 更新用户的金额AccountBalance添加乐观锁,进行CAS更新
由于是要保证金额更新的准确无误,所以只需对金额的字段进行加锁。
public interface PacpUserDao extends BaseJPADao<PacpUser, String> {
/**
* CAS 更新用户金额
* @param id userid
* @param expectAccount 所期待的金额
* @param originalAccount 原始的金额
* @return 更新记录条数
*/
@Modifying
@Query(value = "UPDATE PacpUser u SET u.accountBalance = :expectAccount WHERE u.id = :id AND u.accountBalance = :originalAccount")
int updateAccountBalanceById(@Param(value = "id") String id, @Param("expectAccount") Long expectAccount,@Param("originalAccount") Long originalAccount);
}
添加AOP注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface IsTryAgain {
//重试次数
int retryTimes() default 10;
}
添加切面,捕获异常,自旋CAS
/**
* 定义重试切面方法,是为了发生乐观锁异常时在一个全新的事务里提交上一次的操作,
* 直到达到重试上限;因此切面实现 org.springframework.core.Ordered 接口,
* 们就可把切面的优先级设定为高于事务通知 。
*/
@Aspect
@Component
public class ConcurrentOperationExecutor implements Ordered {
private final Logger log = LogManager.getLogger(getClass());
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Pointcut("@annotation(com.syni.pacp.pay.aop.annotation.IsTryAgain)")
public void operationService() {
}
@Around("operationService()")
@Transactional(rollbackOn = Exception.class)
public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
Signature signature = pjp.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method targetMethod = methodSignature.getMethod();
IsTryAgain annotation = targetMethod.getAnnotation(IsTryAgain.class);
int retryTimes = annotation.retryTimes();
while (true) {
try {
return pjp.proceed();
}catch (OptFailureException e) {
if (retryTimes > 0) {
retryTimes--;
log.info("CAS更新失败, 重新执行更新, 还剩余更新次数:"+ retryTimes);
} else {
throw e;
}
}
}
}
}
自旋执行更新操作ing
int flag = pacpUserDao.updateAccountBalanceById(pacpUser.getId(), expectAccount, accountBalance);
if (flag != 1 ){
//抛出自定义异常
throw new OptFailureException("更新用户金额失败!");
}
当更新用户的金额AccountBalance 失败时,抛出异常new OptFailureException(),ConcurrentOperationExecutor 捕获异常。
查看重试更新操作是否达到指定的最大值,如果没有这继续尝试更新。直到更新成功或者超出最大值为止。
到此最终的解决方案如下:
@IsTryAgain //添加AOP注解
@Transactional(rollbackOn = Exception.class)
public void updateDepositState(String serialNum, boolean isSuccess){
String state = isSuccess ? Constants.DOPOSIT_RECORD_STATE_PAYED : Constants.DOPOSIT_RECORD_STATE_PAYFL;
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNumForUpdate(serialNum);
if (!record.getState().equals(Constants.DOPOSIT_RECORD_STATE_PAYING)){
//当订单的状态已更新后,微信再一次执行回调函数时,不执行操作
WXPayUtil.getLogger().info("wxnotify:微信支付回调:订单号===>"+serialNum+"状态非充值中");
return;
}
String openId = record.getUserCode();
PacpUser pacpUser = pacpUserDao.findByCode(openId);
Long accountBalance = pacpUser.getAccountBalance();
record.setState(state);
record.setUpdateTime(DateTimeUtils.getCurrentTime());
record.setRestMoney(accountBalance + record.getMoney());
pacpDepositRecordDao.save(record);
if (!isSuccess){
return; //如果支付失败,则不更新用户的金额
}
pacpBillInfoService.createBillInfo(record);
Long expectAccount = accountBalance + record.getMoney();
int flag = pacpUserDao.updateAccountBalanceById(pacpUser.getId(), expectAccount, accountBalance);
if (flag != 1 ){
//抛出自定义异常
throw new OptFailureException("更新用户金额失败!");
}
}