RocksDB 是一个基于 write-ahead-log 和 log-structured-merge-tree 实现的嵌入式的 KV 数据库. 如果通过类比来理解,可以认为它是单机版的 HBase. 这篇文章介绍和讨论的是如何基于 RocksDB 实现一个持久化队列,这种持久化队列的适用场景有:
- 需要将数据持久化而不是存放在内存队列中,以防止当应用崩溃时的数据丢失
- 当客户端和服务器端的生产消费速度不匹配,基于内存的队列存储不足,可以外溢到磁盘
1.如何在一个 KV 数据库上实现队列?
-
RocksQueue
由queue_name
和_queue_name
两个列簇组成.queue_name
用来存储数据,_queue_name
存储队列的head
和tail
指针. -
RocksStore
是一个创建队列的工厂且负责维护<queue_name, RocksQueue>
的关系。
2.使用
2.1 创建一个队列
StoreOptions storeOptions = StoreOptions.builder().database("rocks_db").build();
RocksStore rocksStore = new RocksStore(storeOptions);
queue = rocksStore.createQueue(generateQueueName());
2.2 出、入队列(Enqueue,Dequeue)
byte[] something = "something".getBytes();
long id = queue.enqueue(something);
QueueItem dequeue = queue.dequeue();
assertArrayEquals(dequeue.getValue(), something);
2.3 获取、删除队头(Consume, RemoveHead)
你可通过 consume
获取队列头然后处理, 然后使用 removeHead
方便删除队头.
QueueItem head = queue.consume();
log.info("Processing queue head {}", head);
queue.removeHead()
3. JMX 监控指标
项目提供了一些 jmx 指标如下
RocksStore
Metric Name | Description |
---|---|
DatabaseName | RocksStore database name |
RocksdbLocation | RocksStore location |
RocksDBDiskUsageInBytes | The current size for RocksStore in bytes |
NumberOfQueueCreated | How many queues have been created in store |
IsOpen | Is RocksStore been open |
IsClosed | Is RocksStore been closed |
RocksQueue
Metric Name | Description |
---|---|
QueueName | The queue name |
QueueSize | Queue size |
AccumulateBytes | The current size of the queue in bytes,enqueue will increase and dequeue decrease |
HeadIndex | The head of the queue |
TailIndex | The tail oft the queue |
IsCreated | Has the queue been created |
IsClosed | Has the queue been closed |
SecondsSinceLastEnqueue | Seconds since the last enqueue in ms |
SecondsSinceLastConsume | Seconds since the last consume in ms |
SecondsSinceLastDequeue | Seconds since the last dequeue in ms |
Benchmark 测试
Benchmark | Mode | Cnt | Score | Error | Units |
---|---|---|---|---|---|
RocksQueueBenchmark.consume | avgt | 50 | 12576.618 | ± 17929.697 | ns/op |
RocksQueueBenchmark.dequeue | avgt | 50 | 2168917.940 | ± 1063197.522 | ns/op |
RocksQueueBenchmark.enqueue | avgt | 50 | 1762257.820 | ± 232716.449 | ns/op |
RocksQueueBenchmark.removeHead | avgt | 50 | 1558168.420 | ± 276410.130 | ns/op |