根据上一篇Demo测试情况反映,当有多个线程同时抢购时,会发生超发现象,所谓超发现象,就是原本设置库存为30000件,但是,当抢购完成后发现库存余量变成了负数,即发货量大于库存量的情况:
造成这种现象的原因:当多个线程请求数据库查询库存余量时,显示有余量,但是当进行扣减库存时,库存已经用完了,但那个线程并不知道,依旧去扣减库存,造成库存为负数的情况,于是乎就出现了超发现象。
测试方法:根据书上是html中使用js,for循环异步请求,发现并不会造成超发现象,后改为在浏览器中同时开启多个窗口访问/test进行抢购,模拟多个用户抢购的场景,内存爆炸...
为了解决这种问题,下面将介绍三种解决方法:
1、悲观锁
发生超发现象的根本原因是共享数据被多个线程所修改,无法保证其执行顺序,如果一个数据库事务读取到一个产品后,就将数据直接锁定,不允许其他线程进行读写操作,直至当前数据库事务完成才释放这条数据的锁,就不会发生超发现象,但是执行效率性能将大大下降。
修改ProductMapper中的SQL语句:
@Mapper
public interface ProductMapper {
@Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id} for update")
ProductPo getProduct(Long id);
@Update("UPDATE t_product SET stock = stock - #{quantity} WHERE id = #{id}")
int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity);
}
在select语句末尾添加了for update,这样,在数据库事务执行的过程中,就会锁定查询出来的数据,其他事务将不能再对其进行读写,单个请求直至数据库事务完成,才会释放这个锁,下图可见其stock为0,没有发生超发现象,但执行效率下降了,通过购买记录可以得知,相比之前没加锁慢了1/5。
2、乐观锁
为了解决悲观锁带来的性能下降的问题,我们来讨论一下乐观锁的原理:
乐观锁是一种不使用数据库锁和不阻塞线程并发的方案,下图是以本Demo为例的乐观锁流程:
这种方案就是多线程的概念CAS(Compare and Swap),然而这样的方案会引发一种ABA问题:
T1时刻:线程1读取商品库存为A
T2时刻:线程2读取商品库存为A
T3时刻:线程2计算购买商品总价格
T4时刻:当前库存为A,与线程2保存的旧值一致,因此线程2可减库存(当前库存A--->B),此时线程1在当前库存为B的情况下计算剩余商品价格(单价*B)。
T5时刻:线程2取消购买,线程2回退(当前库存B--->A),线程1计算的剩余商品价格错误。
T6时刻:线程1比较旧值与当前数据库库存,发现都为A,返回之前计算好的(单价*B)结果,造成了错误。
从上面的分析中看到一个现象A--->B--->A的过程,就是所谓的ABA问题,解决此问题的方法为加入版本号的限制,只要在操作过程中修改共享值,无论业务正常,回退,还是异常,版本号只能递增,不能回退递减。每次通过比较数据的版本号来查看此数据是否被修改过。
@Mapper
public interface ProductMapper {
@Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id}")
ProductPo getProduct(Long id);
//********************change******************************
@Update("UPDATE t_product SET stock = stock - #{quantity}, version = version + 1 WHERE id = #{id} and version = #{version}")
int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity, @Param("version") int version);
}
@Override
// 启动Spring数据库事务机制
@Transactional
public boolean purchase(Long userId, Long productId, int quantity) {
// 获取产品
ProductPo product = productMapper.getProduct(productId);
// 比较库存和购买数量
if (product.getStock() < quantity) {
// 库存不足
return false;
}
//**************************change*******************************
// 扣减库存,加入了version
productMapper.decreaseProduct(productId, quantity, product.getVersion());
//***************************************************************
// 初始化购买记录
PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
// 插入购买记录
purchaseRecordMapper.insertPurchaseRecord(pr);
return true;
}
发现stock并没有降为0,原因是加入了版本号的判断,所以大量的请求得到了失败的结果,而且失败率有点高。要解决这个方法,就设定为如果失败,就重试,直至成功,但是这样又会造成大量SQL执行,影响性能,所以一般可以使用限制时间或者重入次数的方法来克服。
时间戳限制重入的乐观锁:
将一个请求限制在100ms的生存期,如果在100ms内发生版本号冲突而导致不能更新的,则会重新尝试请求,否则请求失败。
修改service下PurchaseserviceImpl的purchase方法
@Override
// 启动Spring数据库事务机制
@Transactional
public boolean purchase(Long userId, Long productId, int quantity) {
long start = System.currentTimeMillis();
while (true){
long end = System.currentTimeMillis();
if (end - start >100){
return false;
}
// 获取产品
ProductPo product = productMapper.getProduct(productId);
// 比较库存和购买数量
if (product.getStock() < quantity) {
// 库存不足
return false;
}
// 扣减库存
int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());
// 如果数据更新失败,说明数据在多线程中被其他线程修改
// 导致失败,着通过循环重入尝试购买商品
if (result == 0){
continue;
}
// 初始化购买记录
PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
// 插入购买记录
purchaseRecordMapper.insertPurchaseRecord(pr);
return true;
}
}
这种方法在测试中效果并不是很好,执行速度很慢,冲突现象并没有减少,反而增多,可能是我测试方法并不好,只开了三个网页来模拟并发,不太懂JS,Demo用的JS是发送异步请求的,但用单窗口测试了好多次都没出现超发现象,只能人肉模拟并发。
限定次数重入的乐观锁:
@Override
// 启动Spring数据库事务机制
@Transactional
public boolean purchase(Long userId, Long productId, int quantity) {
long start = System.currentTimeMillis();
for (int i=0; i<3; i++){
// 获取产品
ProductPo product = productMapper.getProduct(productId);
// 比较库存和购买数量
if (product.getStock() < quantity) {
// 库存不足
return false;
}
// 扣减库存
int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());
// 如果数据更新失败,说明数据在多线程中被其他线程修改
// 导致失败,着通过循环重入尝试购买商品
if (result == 0){
continue;
}
// 初始化购买记录
PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
// 插入购买记录
purchaseRecordMapper.insertPurchaseRecord(pr);
return true;
}
return false;
}
这种方式比上一种限定时间好,速度和单纯使用乐观锁差不多,并且消除了冲突。
3、Redis处理高并发
在高并发环境中,直接操作数据库的方式过于缓慢,因为数据库是一个写入磁盘的过程,这个速度没有写入内存的Redis快,Redis的机制也能够帮助我们克服超发现象,但是,因为其命令方式运算能力比较薄弱,所以往往采用Redis Lua去代替它原有的命令方式。Redis Lua在Redis的执行中是局内原子性的,但他被执行时不会被其他客户端发送过来的命令打断,通过这样一种机制,可以在需要高并发的环境下考虑使用Redis去代替数据库作为响应用户的数据载体。但是Redis存储具有不稳定性,所以还需要有一定的机制将Redis存储的数据刷入数据库。
下面先来配置一下Redis:
application.properties
#配置redis
spring.redis.jedis.pool.min-idle=5
spring.redis.jedis.pool.max-active=10
spring.redis.jedis.pool.max-idle=10
spring.redis.jedis.pool.max-wait=2000
spring.redis.port=6379
spring.redis.host=127.0.0.1
#我的Redis没有密码
#spring.redis.password=123456
spring.redis.timeout=1000
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.4.RELEASE</version>
<exclusions>
<!--不依赖Redis的异步客户端lettuce -->
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--引入Redis的客户端驱动jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
PurchaseServiceImpl.java,使用Redis Lua响应请求
@Service
public class PurchaseServiceImpl implements PurchaseService {
@Autowired
private ProductMapper productMapper = null;
@Autowired
private PurchaseRecordMapper purchaseRecordMapper = null;
private PurchaseRecordPo initPurchaseRecord(Long userId, ProductPo product, int quantity) {
PurchaseRecordPo pr = new PurchaseRecordPo();
pr.setNote("购买日志,时间:" + System.currentTimeMillis());
pr.setPrice(product.getPrice());
pr.setProductId(product.getId());
pr.setQuantity(quantity);
double sum = product.getPrice() * quantity;
pr.setSum(sum);
pr.setUserId(userId);
return pr;
}
@Autowired
StringRedisTemplate stringRedisTemplate = null;
String purchaseScript =
// 先将产品编号保存到集合中
" redis.call('sadd', KEYS[1], ARGV[2]) \n"
// 购买列表
+ "local productPurchaseList = KEYS[2]..ARGV[2] \n"
// 用户编号
+ "local userId = ARGV[1] \n"
// 产品key
+ "local product = 'product_'..ARGV[2] \n"
// 购买数量
+ "local quantity = tonumber(ARGV[3]) \n"
// 当前库存
+ "local stock = tonumber(redis.call('hget', product, 'stock')) \n"
// 价格
+ "local price = tonumber(redis.call('hget', product, 'price')) \n"
// 购买时间
+ "local purchase_date = ARGV[4] \n"
// 库存不足,返回0
+ "if stock < quantity then return 0 end \n"
// 减库存
+ "stock = stock - quantity \n"
+ "redis.call('hset', product, 'stock', tostring(stock)) \n"
// 计算价格
+ "local sum = price * quantity \n"
// 合并购买记录数据
+ "local purchaseRecord = userId..','..quantity..','"
+ "..sum..','..price..','..purchase_date \n"
// 保存到将购买记录保存到list里
+ "redis.call('rpush', productPurchaseList, purchaseRecord) \n"
// 返回成功
+ "return 1 \n";
// Redis购买记录集合前缀
private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
// 抢购商品集合
private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
// 32位SHA1编码,第一次执行的时候先让Redis进行缓存脚本返回
private String sha1 = null;
@Override
public boolean purchaseRedis(Long userId, Long productId, int quantity) {
// 购买时间
Long purchaseDate = System.currentTimeMillis();
Jedis jedis = null;
try {
// 获取原始连接
jedis = (Jedis) stringRedisTemplate
.getConnectionFactory().getConnection().getNativeConnection();
// 如果没有加载过,则先将脚本加载到Redis服务器,让其返回sha1
if (sha1 == null) {
sha1 = jedis.scriptLoad(purchaseScript);
}
// 执行脚本,返回结果
Object res = jedis.evalsha(sha1, 2, PRODUCT_SCHEDULE_SET,
PURCHASE_PRODUCT_LIST, userId + "", productId + "",
quantity + "", purchaseDate + "");
Long result = (Long) res;
return result == 1;
} finally {
// 关闭jedis连接
if (jedis != null && jedis.isConnected()) {
jedis.close();
}
}
}
@Override
// 当运行方法启用新的独立事务运行
@Transactional(propagation = Propagation.REQUIRES_NEW)
// 保存购买记录,持久化到数据库
public boolean dealRedisPurchase(List<PurchaseRecordPo> prpList) {
for (PurchaseRecordPo prp : prpList) {
purchaseRecordMapper.insertPurchaseRecord(prp);
productMapper.decreaseProduct(prp.getProductId(), prp.getQuantity());
}
return true;
}
}
使用定时机制,定时将数据持久化到数据库:
首先设置启动文件:
@SpringBootApplication(scanBasePackages = "com.wayne.springboot")
@MapperScan(annotationClass = Mapper.class, basePackages = "com.wayne.springboot")
// 启动springboot的定时机制,为此需要一个定时的方法来提供服务
// 把Redis的数据导入到数据库
@EnableScheduling
public class SpringBootShoppingApplication{
public static void main(String[] args) {
SpringApplication.run(SpringBootShoppingApplication.class, args);
}
}
一个定时的方法来提供服务把Redis的数据导入到数据库:
import com.wayne.springboot.pojo.PurchaseRecordPo;
import com.wayne.springboot.service.PurchaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Service
public class TaskServiceImpl implements TaskService {
@Autowired
private StringRedisTemplate stringRedisTemplate = null;
@Autowired
private PurchaseService purchaseService = null;
private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
// 每次取出1000条,避免一次取出消耗太多内存
private static final int ONE_TIME_SIZE = 1000;
@Override
// 每天半夜1点钟开始执行任务
// @Scheduled(cron = "0 0 1 * * ?")
// 下面是用于测试的配置,每分钟执行一次任务
@Scheduled(fixedRate = 1000 * 30)
public void purchaseTask() {
System.out.println("定时任务开始......");
Set<String> productIdList
= stringRedisTemplate.opsForSet().members(PRODUCT_SCHEDULE_SET);
List<PurchaseRecordPo> prpList =new ArrayList<>();
for (String productIdStr : productIdList) {
Long productId = Long.parseLong(productIdStr);
String purchaseKey = PURCHASE_PRODUCT_LIST + productId;
BoundListOperations<String, String> ops
= stringRedisTemplate.boundListOps(purchaseKey);
// 计算记录数
long size = stringRedisTemplate.opsForList().size(purchaseKey);
Long times = size % ONE_TIME_SIZE == 0 ?
size / ONE_TIME_SIZE : size / ONE_TIME_SIZE + 1;
for (int i = 0; i < times; i++) {
// 获取至多TIME_SIZE个抢红包信息
List<String> prList = null;
if (i == 0) {
prList = ops.range(i * ONE_TIME_SIZE,
(i + 1) * ONE_TIME_SIZE);
} else {
prList = ops.range(i * ONE_TIME_SIZE + 1,
(i + 1) * ONE_TIME_SIZE);
}
for (String prStr : prList) {
PurchaseRecordPo prp
= this.createPurchaseRecord(productId, prStr);
prpList.add(prp);
}
try {
// 采用该方法采用新建事务的方式,这样不会导致全局事务回滚
purchaseService.dealRedisPurchase(prpList);
} catch(Exception ex) {
ex.printStackTrace();
}
// 清除列表为空,等待重新写入数据
prpList.clear();
}
// 删除购买列表
stringRedisTemplate.delete(purchaseKey);
// 从商品集合中删除商品
stringRedisTemplate.opsForSet()
.remove(PRODUCT_SCHEDULE_SET, productIdStr);
}
System.out.println("定时任务结束......");
}
private PurchaseRecordPo createPurchaseRecord(
Long productId, String prStr) {
String[] arr = prStr.split(",");
Long userId = Long.parseLong(arr[0]);
int quantity = Integer.parseInt(arr[1]);
double sum = Double.valueOf(arr[2]);
double price = Double.valueOf(arr[3]);
Long time = Long.parseLong(arr[4]);
Timestamp purchaseTime = new Timestamp(time);
PurchaseRecordPo pr = new PurchaseRecordPo();
pr.setProductId(productId);
pr.setPurchaseTime(purchaseTime);
pr.setPrice(price);
pr.setQuantity(quantity);
pr.setSum(sum);
pr.setUserId(userId);
pr.setNote("购买日志,时间:" + purchaseTime.getTime());
return pr;
}
}
到这里基本完成,启动项目前先启动redis服务器,并初始化Redis:
hmset product_1 id 1 stock 10000 price 2.00
然后启动并访问浏览器localhost:8080/test,因为设定的间隔为30s,所以等30s去查看数据库。性能相比之前要快上数倍。