原生实现
本文承接sharding-jdbc源码之分布式ID,在这篇文章中详细介绍了sharding-jdbc的分布式ID是如何实现的;很遗憾的是sharding-jdbc只是基于snowflake算法实现了如何生成分布式ID,并没有解决snowflake算法的缺点:
- 时钟回拨问题;
- 趋势递增,而不是绝对递增;
- 不能在一台服务器上部署多个分布式ID服务;
第2点算不上缺点,毕竟如果绝对递增的话,需要牺牲不少的性能;第3点也算不上缺点,即使一台足够大内存的服务器,在部署一个分布式ID服务后,还有很多可用的内存,可以用来部署其他的服务,不一定非得在一台服务器上部署一个服务的多个实例;可以参考elasticsearch的主分片和副本的划分规则:某个主分片的副本是不允许和主分片在同一台节点上--因为这样意思不大,如果这个分片只拥有一个副本,且这个节点宕机后,服务状态就是"RED";
所以这篇文章的主要目的是解决第1个缺点:时钟回拨问题
;先来看一下sharding-jdbc的DefaultKeyGenerator.java中generateKey()
方法的源码:
@Override
public synchronized Number generateKey() {
long currentMillis = timeService.getCurrentMillis();
Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
if (lastTime == currentMillis) {
if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
currentMillis = waitUntilNextTime(currentMillis);
}
} else {
sequence = 0;
}
lastTime = currentMillis;
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
}
return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
说明:从这段代码可知,sharding-jdbc并没有尝试去解决snowflake算法时钟回拨的问题,只是简单判断如果
lastTime <= currentMillis
不满足就抛出异常:Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
改进思路
snowflake算法一个很大的优点就是不需要依赖任何第三方组件,笔者想继续保留这个优点:毕竟多一个依赖的组件,多一个风险,并增加了系统的复杂性。
snowflake算法给workerId预留了10位,即workId的取值范围为[0, 1023],事实上实际生产环境不大可能需要部署1024个分布式ID服务,所以:将workerId取值范围缩小为[0, 511],[512, 1023]这个范围的workerId当做备用workerId。workId为0的备用workerId是512,workId为1的备用workerId是513,以此类推……
说明:如果你的业务真的需要512个以上分布式ID服务才能满足需求,那么不需要继续往下看了,这个方案不适合你^^;
改进实现
对generateKey()
方法的改进如下:
// 修改处: workerId原则上上限为1024, 但是为了每台sequence服务预留一个workerId, 所以实际上workerId上限为512
private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS >> 1;
/**
* 保留workerId和lastTime, 以及备用workerId和其对应的lastTime
*/
private static Map<Long, Long> workerIdLastTimeMap = new ConcurrentHashMap<>();
/**
* Generate key. 考虑时钟回拨, 与sharding-jdbc源码的区别就在这里</br>
* 缺陷: 如果连续两次时钟回拨, 可能还是会有问题, 但是这种概率极低极低
* @return key type is @{@link Long}.
* @Author 阿飞
*/
@Override
public synchronized Number generateKey() {
long currentMillis = System.currentTimeMillis();
// 当发生时钟回拨时
if (lastTime > currentMillis){
// 如果时钟回拨在可接受范围内, 等待即可
if (lastTime - currentMillis < MAX_BACKWARD_MS){
try {
Thread.sleep(lastTime - currentMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
// 如果时钟回拨太多, 那么换备用workerId尝试
// 当前workerId和备用workerId的值的差值为512
long interval = 512L;
// 发生时钟回拨时, 计算备用workerId[如果当前workerId小于512,
// 那么备用workerId=workerId+512; 否则备用workerId=workerId-512, 两个workerId轮换用]
if (MyKeyGenerator.workerId >= interval) {
MyKeyGenerator.workerId = MyKeyGenerator.workerId - interval;
} else {
MyKeyGenerator.workerId = MyKeyGenerator.workerId + interval;
}
// 取得备用workerId的lastTime
Long tempTime = workerIdLastTimeMap.get(MyKeyGenerator.workerId);
lastTime = tempTime==null?0L:tempTime;
// 如果在备用workerId也处于过去的时钟, 那么抛出异常
// [这里也可以增加时钟回拨是否超过MAX_BACKWARD_MS的判断]
Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
// 备用workerId上也处于时钟回拨范围内的逻辑还可以优化: 比如摘掉当前节点. 运维通过监控发现问题并修复时钟回拨
}
}
// 如果和最后一次请求处于同一毫秒, 那么sequence+1
if (lastTime == currentMillis) {
if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
currentMillis = waitUntilNextTime(currentMillis);
}
} else {
// 如果是一个更近的时间戳, 那么sequence归零
sequence = 0;
}
lastTime = currentMillis;
// 更新map中保存的workerId对应的lastTime
workerIdLastTimeMap.put(MyKeyGenerator.workerId, lastTime);
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime))
+" -- "+workerId+" -- "+sequence+" -- "+workerIdLastTimeMap);
return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
改进验证
第一个红色箭头的地方通过修改本地时间,通过启动了备用workerId从而避免了时钟回拨60s(window系统直接修改系统时间模拟)引起的问题;第二个红色箭头在60s内再次模拟时钟回拨,就会有问题,因为无论是workerId还是备用workerId都会有冲突;如果第二红色箭头是60s后模拟时钟回拨,依然可以避免问题,原因嘛,你懂得;
再次优化
每个workerId可以配置任意个备用workerId,由使用者去平衡sequence服务的性能及高可用,终极版代码如下:
public final class MyKeyGenerator implements KeyGenerator {
private static final long EPOCH;
private static final long SEQUENCE_BITS = 12L;
private static final long WORKER_ID_BITS = 10L;
private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;
private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;
/**
* 每台workerId服务器有3个备份workerId, 备份workerId数量越多, 可靠性越高, 但是可部署的sequence ID服务越少
*/
private static final long BACKUP_COUNT = 3;
/**
* 实际的最大workerId的值<br/>
* workerId原则上上限为1024, 但是需要为每台sequence服务预留BACKUP_AMOUNT个workerId,
*/
private static final long WORKER_ID_MAX_VALUE = (1L << WORKER_ID_BITS) / (BACKUP_COUNT + 1);
/**
* 目前用户生成ID的workerId
*/
private static long workerId;
static {
Calendar calendar = Calendar.getInstance();
calendar.set(2018, Calendar.NOVEMBER, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
// EPOCH是服务器第一次上线时间点, 设置后不允许修改
EPOCH = calendar.getTimeInMillis();
}
private long sequence;
private long lastTime;
/**
* 保留workerId和lastTime, 以及备用workerId和其对应的lastTime
*/
private static Map<Long, Long> workerIdLastTimeMap = new ConcurrentHashMap<>();
static {
// 初始化workerId和其所有备份workerId与lastTime
// 假设workerId为0且BACKUP_AMOUNT为4, 那么map的值为: {0:0L, 256:0L, 512:0L, 768:0L}
// 假设workerId为2且BACKUP_AMOUNT为4, 那么map的值为: {2:0L, 258:0L, 514:0L, 770:0L}
for (int i = 0; i<= BACKUP_COUNT; i++){
workerIdLastTimeMap.put(workerId + (i * WORKER_ID_MAX_VALUE), 0L);
}
System.out.println("workerIdLastTimeMap:" + workerIdLastTimeMap);
}
/**
* 最大容忍时间, 单位毫秒, 即如果时钟只是回拨了该变量指定的时间, 那么等待相应的时间即可;
* 考虑到sequence服务的高性能, 这个值不易过大
*/
private static final long MAX_BACKWARD_MS = 3;
/**
* Set work process id.
* @param workerId work process id
*/
public static void setWorkerId(final long workerId) {
Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);
MyKeyGenerator.workerId = workerId;
}
/**
* Generate key. 考虑时钟回拨, 与sharding-jdbc源码的区别就在这里</br>
* 缺陷: 如果连续两次时钟回拨, 可能还是会有问题, 但是这种概率极低极低
* @return key type is @{@link Long}.
* @Author 阿飞
*/
@Override
public synchronized Number generateKey() {
long currentMillis = System.currentTimeMillis();
// 当发生时钟回拨时
if (lastTime > currentMillis){
// 如果时钟回拨在可接受范围内, 等待即可
if (lastTime - currentMillis < MAX_BACKWARD_MS){
try {
Thread.sleep(lastTime - currentMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
tryGenerateKeyOnBackup(currentMillis);
}
}
// 如果和最后一次请求处于同一毫秒, 那么sequence+1
if (lastTime == currentMillis) {
if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
currentMillis = waitUntilNextTime(currentMillis);
}
} else {
// 如果是一个更近的时间戳, 那么sequence归零
sequence = 0;
}
lastTime = currentMillis;
// 更新map中保存的workerId对应的lastTime
workerIdLastTimeMap.put(MyKeyGenerator.workerId, lastTime);
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime))
+" -- "+workerId+" -- "+sequence+" -- "+workerIdLastTimeMap);
return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
/**
* 尝试在workerId的备份workerId上生成
* @param currentMillis 当前时间
*/
private long tryGenerateKeyOnBackup(long currentMillis){
System.out.println("try GenerateKey OnBackup, map:" + workerIdLastTimeMap);
// 遍历所有workerId(包括备用workerId, 查看哪些workerId可用)
for (Map.Entry<Long, Long> entry:workerIdLastTimeMap.entrySet()){
MyKeyGenerator.workerId = entry.getKey();
// 取得备用workerId的lastTime
Long tempLastTime = entry.getValue();
lastTime = tempLastTime==null?0L:tempLastTime;
// 如果找到了合适的workerId
if (lastTime<=currentMillis){
return lastTime;
}
}
// 如果所有workerId以及备用workerId都处于时钟回拨, 那么抛出异常
throw new IllegalStateException("Clock is moving backwards, current time is "
+currentMillis+" milliseconds, workerId map = " + workerIdLastTimeMap);
}
private long waitUntilNextTime(final long lastTime) {
long time = System.currentTimeMillis();
while (time <= lastTime) {
time = System.currentTimeMillis();
}
return time;
}
}
核心优化代码在方法
tryGenerateKeyOnBackup()
中,BACKUP_COUNT
即备份workerId数越多,sequence服务避免时钟回拨影响的能力越强,但是可部署的sequence服务越少,设置BACKUP_COUNT
为3,最多可以部署1024/(3+1)即256个sequence服务,完全够用,抗时钟回拨影响的能力也得到非常大的保障。
改进总结
这种改进方案最大优点就是没有引入任何第三方中间件(例如redis,zookeeper等),但是避免时钟回拨能力得到极大的提高,而且时钟回拨本来就是极小概率。阿飞Javaer认为这种方案能够达到绝大部分sequence服务的需求了;