这里想将自己学习到的分布式锁和乐观锁的一些知识记录下,希望能够对这两样东西有更加深刻的理解和更为娴熟的使用。
分布式锁
分布式锁可以基于很多组件来完成。
zookeeper,基于临时节点
redis,基于setnx命令;
memcache,基于add函数;
分布式锁我们可以理解为排他锁。一个分布式锁需要具备哪些条件呢?
1. 锁能主动释放
2. 锁超时后能够释放,防止死锁
以redis为例,来说明分布式锁的在实际中的使用(以下代码是示例代码, 运行不了,缺少必要的pom文件和配置文件以及必要的jar包)。
首先给出自定义的RedisLockAnnotation注解,然后在需要加锁的方法上添加对应注解: @RedisLockAnnotation(key = "DistributedLock:Records")
。
package com.meituan.redisLockAspect;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisLockAnnotation {
//redis缓存key
String key();
//redis缓存key对应的value
String value() default "";
//过期时间
long expire() default 100;
}
package com.meituan.redisLockAspect;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
interface Records {
void doSomething();
}
@Component("dealRecords")
public class DealRecords implements Records {
@RedisLockAnnotation(key = "DistributedLock:Records")
public void doSomething() {
System.out.println("doSomething");
}
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
Records dealRecords = (Records) ctx.getBean("dealRecords");
dealRecords.doSomething();
}
}
完成这些后,需要利用spring的AOP编程编写(前置-before,后置-after,环绕-around)切换函数:
package com.meituan.redisLockAspect;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
@Aspect
@Component
public class RedisLockAspect {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);
@Resource
private RedisLockUtil redisLockUtil;
@Around(value = "execution(* *(..)) &&@annotation(RedisLockAnnotation)", argNames = "pjp,RedisLockAnnotation")
public Object redisDistributedLock(final ProceedingJoinPoint pjp, RedisLockAnnotation redisLockAnnotation) throws Throwable {
System.out.println("Around.... ");
Object result = null;
Signature signature = pjp.getSignature();
if (!(signature instanceof MethodSignature)) {
return result;
}
MethodSignature methodSignature = (MethodSignature) signature;
Method targetMethod = methodSignature.getMethod();
//获取注解信息
String key = redisLockAnnotation.key();
try {
boolean isHit = redisLockUtil.acquireLock(key);
if (isHit) {
result = pjp.proceed();
}
} catch (Exception e) {
logger.error("RedisLockAspect内错误,错误信息为: {}", e.getMessage());
} finally {
redisLockUtil.releaseLock(key);
}
return result;
}
}
最后编写获取redis key的流程,利用setnx命名的特性:
package com.meituan.redisLockAspect;
import com.dianping.squirrel.client.impl.redis.RedisStoreClient;
import org.apache.commons.lang.StringUtils;
import org.slf4j.LoggerFactory;
import com.dianping.squirrel.client.StoreKey;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RedisLockUtil {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
public static final int PRODUCTLOCK_VALID_TIME = 4;
public static final int MAX_RETRY_GET_LOCK_NUMBER = 2;
public static final int NEXT_RETRY_TIME = 50;
private String REDIS_LOCK_NAME_PREFIX = "redis_lock_name_prefix";
@Resource(name = "redisClient")
private RedisStoreClient redisStoreClient;
public boolean acquireLock(String key) {
return acquireLock(REDIS_LOCK_NAME_PREFIX, key, PRODUCTLOCK_VALID_TIME);
}
public boolean releaseLock(String key) {
return releaseLock(REDIS_LOCK_NAME_PREFIX, key);
}
private boolean acquireLock(String prefixName, String key, int expireSecond) {
int retryNumber = 1;
boolean isGetLock = false;
while (retryNumber <= MAX_RETRY_GET_LOCK_NUMBER) {
StoreKey storeKey = getStoreKey(prefixName, key);
boolean result = redisStoreClient.setnx(storeKey, 1, expireSecond);
if (result) {
isGetLock = true;
break;
} else {
try {
Thread.sleep(NEXT_RETRY_TIME);
} catch (InterruptedException e) {
logger.error("获取分布式锁失败", e);
}
retryNumber++;
}
}
if (!isGetLock) {
logger.error("获取分布式锁失败,key=" + prefixName + key);
}
return isGetLock;
}
public boolean releaseLock(String prefixName, String key) {
boolean result = redisStoreClient.delete(getStoreKey(prefixName, key)) == null ? false : true;
if (result) {
return result;
} else {
logger.error(new StringBuilder().append("删除分布式锁失败,key=").append(prefixName).append(key).toString());
}
return result;
}
private StoreKey getStoreKey(String prefixName, String key) {
if (!StringUtils.isBlank(key)) {
return new StoreKey(prefixName, key);
}
return null;
}
}
乐观锁
我理解的乐观锁就是CAS(CompareAndSwap),乐观锁一般认为数据的冲突不会非常频繁出现,所以数据在正式提交的时候才去检测是否冲突,如果出现冲突就返回错误给用户,让用户自己决定如何处理。一般的乐观锁实现是给这条数据增加一个标记字段:version
或者timestamp
。在读取数据的时候将标记字段信息一并读取出来,更新记录的时候,带上这个标记,如果更新时候发现这条记录的标记不等于之前读取的,那么就说明这条记录已经被其他线程修改了,直接返回,我们采用Mybatis来写demo说明。最终落到sql上的表现是:
update table_name
set status=2,
version=version+1
where id = #{id}
and version=#{version}
使用Mybatis实践,如下: