对于有序并且对增删改操作友好的数据结构有List、Tree等,对于Tree实现起来可能比较复杂,而SkipList(跳表)也可实现有序存储并且增删改的性能也不错,只是增加了空间复杂度,是一种空间换时间的思路,在Java中ConcurrentSkipListMap就是基于跳表实现的,它可以作为并发安全的有序集合使用,并且锁粒度可控,比Collections.synchronizedMap性能要好。至于为什么不用Tree来实现并发安全的有序集合,很大程度上是因为Tree的复杂性很难控制锁粒度。
1、SkipList
下面以一个例子来解释跳表的原理:
一个LinkedList,存储了【1,3,6,8,9,11,15,33】,它的存储结构是这个样子的:
当然作为链表,查找操作一个元素的时间复杂度是O(n),效率更高的树存储,可以达到O(lgn)。但是如上面所说树的复杂性较高,实现起来需要注意很多地方,而在并发场景下更重要的是树没法最小化控制锁粒度,而跳表则更适合,在SkipList中上面例子的存储结构可能是这样的:
SkipList使用分层的结构,第一层结构和List一样,往上一层以一定间隔提取数据建立一个类似索引的结构,往上依次可以理解为一级索引与二级索引。
比如查找33就不需要从头遍历链表了,查找路径是这样的:
2、ConcurrentSkipListMap
首先参考上面的图来介绍ConcurrentSkipListMap中定义的存储结构
Node:
数据存储还是使用Node节点,包括key、value、next:
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
......
}
index:
从上面的图可以知道,还需要一个结构来定义某一层的某一个节点,这个节点需要存储的信息有自己所属的level、下一个节点、右一个节点、该节点对应的Node:
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
/**
* compareAndSet right field.
*/
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return RIGHT.compareAndSet(this, cmp, val);
}
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
Node<K,V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
// VarHandle mechanics
private static final VarHandle RIGHT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
RIGHT = l.findVarHandle(Index.class, "right", Index.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
在ConcurrentSkipListMap中定义了Index和HeadIndex两个。
我们从最基本的查询数据方法来看看ConcurrentSkipListMap是怎么实现的:
private Node<K,V> findNode(Object key) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
Comparator<? super K> cmp = comparator; //比较器,可以构造初始化时传入
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //findPredecessor获取前驱节点
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read //两次读取不一致,循环重试
break;
if ((v = n.value) == null) { // n is deleted //该节点已被删除,循环重试
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted //前驱节点被删除,循环重试
break;
if ((c = cpr(cmp, key, n.key)) == 0) //比较节点key值,相等则返回该节点value,不等说明这期间发生了变化,循环重试
return n;
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
/**
* Returns a base-level node with key strictly less than given key,
* or the base-level header if there is no such node. Also
* unlinks indexes to deleted nodes found along the way. Callers
* rely on this side-effect of clearing indices to deleted nodes.
* @param key the key
* @return a predecessor of key
*/
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
for (Index<K,V> q = head, r = q.right, d;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
简单的说跳表查找数据key-A的值的时候先通过index索引节点来定位到小于A的level =1的最大索引节点,再向右遍历查找到key = A的节点。(这也是跳表相比与树的另一个优势:跳表查找某一区间的数的复杂度也是lgn,因为只要定位到左区间,向右就可以遍历到右区间了,而树对于获取区间值却不行,Redis中数据存储使用的就是跳表+散列表。当然像红黑树这种对于跳表的优势在于出现的比较早,Java中很多已有的结构都是红黑树实现的,直接用就行了,而跳表用的比较少,如果自己实现还要写跳表的逻辑)