sequence介绍
sequence是一个基于雪花算法(Snowflake)实现的64位自增ID算法,实现语言是JAVA。
其在雪花算法的基础上,做了一些优化,解决了原生算法的一些痛点:
- 支持用户自定义允许时间回拨的范围;
- 解决了跨毫秒时起始值从0开始增长的问题;
- 解决了高并发场景中获取时间戳性能的问题;
雪花算法的介绍可以参考博主上篇文章:还在用数据库自增ID做主键?建议了解一下雪花算法生成的分布式ID
性能
从官方的性能测试数据来看,sequence每秒可以生成最多418万个有序的ID,即TPS=400w/s,已经基本满足绝大部分的业务场景。
源代码及使用
核心代码一共两个类,分别是Sequence.java和SystemClock.java。
Sequence是产生分布式ID的核心,SystemClock主要解决了高并发场景下System.currentTimeMills()的性能问题。
大家也可根据自己的需要,修改其中的部署属性值,比如自定义机器标识,起始时间戳等。
Sequence.java :
public class Sequence {
private static final Logger log = LoggerFactory.getLogger(Sequence.class);
/**
* 时间起始标记点,作为基准,一般取系统的最近时间
* 一旦确定不能变动,确定后改变此值可能造成id重复
*/
private final long twepoch = 1519740777809L;
//5位的机房id
private final long datacenterIdBits = 5L;
//5位的机器id
private final long workerIdBits = 5L;
//每毫秒内产生的id数: 2的12次方个
private final long sequenceBits = 12L;
protected final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
protected final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
//所属机房id
private final long datacenterId;
//所属机器id
private final long workerId;
//并发控制序列
private long sequence = 0L;
//上次生产 ID 时间戳
private long lastTimestamp = -1L;
//获取IP
private static volatile InetAddress LOCAL_ADDRESS = null;
private static final Pattern IP_PATTERN = Pattern.compile("\d{1,3}(\.\d{1,3}){3,5}$");
//默认的无参构造
public Sequence() {
this.datacenterId = getDatacenterId();
this.workerId = getMaxWorkerId(datacenterId);
}
//有参构造器,我们可以自定义机器id和机房id
public Sequence(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}
//基于网卡MAC地址计算余数作为数据中心,如果不想用网卡可以自定义
protected long getDatacenterId() {
long id = 0L;
try {
NetworkInterface network = NetworkInterface.getByInetAddress(getLocalAddress());
if (null == network) {
id = 1L;
} else {
byte[] mac = network.getHardwareAddress();
if (null != mac) {
id = ((0x000000FF & (long) mac[mac.length - 2]) | (0x0000FF00 & (((long) mac[mac.length - 1]) << 8))) >> 6;
id = id % (maxDatacenterId + 1);
}
}
} catch (Exception e) {
log.warn(" getDatacenterId: " + e.getMessage());
}
return id;
}
//基于 MAC + PID 的 hashcode 获取16个低位
protected long getMaxWorkerId(long datacenterId) {
StringBuilder mpId = new StringBuilder();
mpId.append(datacenterId);
String name = ManagementFactory.getRuntimeMXBean().getName();
if (name != null && name.length() > 0) {
// GET jvmPid
mpId.append(name.split("@")[0]);
}
// MAC + PID 的 hashcode 获取16个低位
return (mpId.toString().hashCode() & 0xffff) % (maxWorkerId + 1);
}
//获取下一个 ID
public synchronized long nextId() {
//获取当前时间戳,这里通过SystemClock优化获取性能
long timestamp = timeGen();
// 时间回拨了
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
// 休眠双倍差值后重新获取,再次校验
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
}
if (lastTimestamp == timestamp) {
// 相同毫秒内,序列号自增
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
// 同一毫秒的序列数已经达到最大
timestamp = tilNextMillis(lastTimestamp);
}
} else {
// 不同毫秒内,序列号置为 1 - 3 随机数
sequence = ThreadLocalRandom.current().nextLong(1, 3);
}
lastTimestamp = timestamp;
// 时间戳部分 | 数据中心部分 | 机器标识部分 | 序列号部分
return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
protected long timeGen() {
return SystemClock.INSTANCE.currentTimeMillis();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
LOCAL_ADDRESS = getLocalAddress0();
return LOCAL_ADDRESS;
}
private static InetAddress getLocalAddress0() {
InetAddress localAddress = null;
try {
localAddress = InetAddress.getLocalHost();
if (isValidAddress(localAddress)) {
return localAddress;
}
} catch (Throwable e) {
log.warn("Failed to retrieving ip address, " + e.getMessage(), e);
}
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
if (interfaces != null) {
while (interfaces.hasMoreElements()) {
try {
NetworkInterface network = interfaces.nextElement();
Enumeration<InetAddress> addresses = network.getInetAddresses();
while (addresses.hasMoreElements()) {
try {
InetAddress address = addresses.nextElement();
if (isValidAddress(address)) {
return address;
}
} catch (Throwable e) {
log.warn("Failed to retrieving ip address, " + e.getMessage(), e);
}
}
} catch (Throwable e) {
log.warn("Failed to retrieving ip address, " + e.getMessage(), e);
}
}
}
} catch (Throwable e) {
log.warn("Failed to retrieving ip address, " + e.getMessage(), e);
}
log.error("Could not get local host ip address, will use 127.0.0.1 instead.");
return localAddress;
}
private static boolean isValidAddress(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
return (name != null && !"0.0.0.0".equals(name) && !"127.0.0.1".equals(name) && IP_PATTERN.matcher(name).matches());
}
}
SystemClock.java:
//利用ScheduledExecutorService实现高并发场景下
//System.curentTimeMillis()的性能问题的优化.
public enum SystemClock {
INSTANCE(1);
private final long period;
private final AtomicLong nowTime;
private boolean started = false;
private ScheduledExecutorService executorService;
SystemClock(long period) {
this.period = period;
this.nowTime = new AtomicLong(System.currentTimeMillis());
}
public void initialize() {
if (started) {
return;
}
this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = new Thread(r, "system-clock");
thread.setDaemon(true);
return thread;
});
executorService.scheduleAtFixedRate(() -> nowTime.set(System.currentTimeMillis()),
this.period, this.period, TimeUnit.MILLISECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
started = true;
}
public long currentTimeMillis() {
return started ? nowTime.get() : System.currentTimeMillis();
}
public String currentTime() {
return new Timestamp(currentTimeMillis()).toString();
}
public void destroy() {
if (executorService != null) {
executorService.shutdown();
}
}
}
最后
sequence虽然大幅提升了性能,但是在某些情况下仍然可能出现重复的情况,比如机器标识重复、起始时间戳被修改重置等,这些问题需要我们特别注意。
总得来说,sequence被称为分布式高效ID生产黑科技并不为过,著名的ORM框架mybatis plus用的也是这个组件,大家有兴趣也可以去了解一下。
学习技术,分享技术,期待与大家共同进步,也感谢您的点赞与关注。