基于set实现
set是有序容器,因此以时间为键可以维护一个有序的任务列表,方便地实现添加、删除和查询功能。
// Intrusive hook embedded in every object scheduled by a timer container.
struct TimerItem {
// List node; the timer's pop() appends it to the caller-supplied result list.
list_head link;
// Expiry time written by the timer's insert() and compared against `now` in
// pop(). Presumably microseconds (insert() names it time_us) -- TODO confirm.
int64_t target_time;
};
// Timer container backed by an ordered std::set of item pointers.
//
// T must embed a TimerItem reachable through the `timer_item` member pointer.
// Items are ordered by expiry time; the pointer value breaks ties so several
// items with the same expiry can coexist.
template<class T, TimerItem T::*timer_item = &T::timer_item>
class TimerBySet {
  using Unit = int64_t;

  // Accessors for the TimerItem embedded in *t.
  inline static void set_timeout(T* t, Unit u) {
    (t->*timer_item).target_time = u;
  }
  inline static Unit get_timeout(const T* t) {
    return (t->*timer_item).target_time;
  }
  inline static const list_head* get_link(const T* t) {
    return &((t->*timer_item).link);
  }
  inline static list_head* get_link(T* t) {
    return &((t->*timer_item).link);
  }

  // Strict weak ordering: earlier expiry first, then by address.
  struct Compare {
    bool operator()(T* c, T* r) const {
      const Unit lhs_time = get_timeout(c);
      const Unit rhs_time = get_timeout(r);
      return lhs_time != rhs_time
                 ? lhs_time < rhs_time
                 : static_cast<const void*>(c) < static_cast<const void*>(r);
    }
  };

 public:
  // Present for interface parity with the other timers; nothing to set up.
  void init(Unit) {}

  bool empty() const { return items_.empty(); }

  size_t size() const { return items_.size(); }

  // Earliest expiry among stored items, or 0 when the timer is empty.
  Unit min_time() const {
    return items_.empty() ? 0 : get_timeout(*items_.begin());
  }

  // Stores `time_us` in the item and adds it to the set.
  // The key is written before insertion, so `t` must not already be stored.
  void insert(Unit time_us, T* t) {
    set_timeout(t, time_us);
    items_.insert(t);
  }

  // Erases `t`; the lookup uses the expiry currently stored in the item.
  void remove(T* t) {
    items_.erase(t);
  }

  // Clears `result_list`, then moves every item whose expiry is <= now into
  // it in expiry order. Each popped item's stored expiry is reset to 0.
  void pop(Unit now, list_head* result_list) {
    result_list->clear();
    for (auto first = items_.begin();
         first != items_.end() && get_timeout(*first) <= now;
         first = items_.begin()) {
      T* expired = *first;
      items_.erase(first);
      set_timeout(expired, 0);
      result_list->push_back(get_link(expired));
    }
  }

 private:
  std::set<T*, Compare> items_;
};
基于堆
小根堆可以很方便得到最小值,插入和删除也是O(log(N))复杂度。
// Timer hook for the heap-based timer: a TimerItem plus the item's current
// position in the heap array, which lets remove() find it without searching.
struct TimerHeapItem: public TimerItem {
// Heap slot currently holding the owning object. TimerByHeap::remove() sets
// it to std::numeric_limits<size_t>::max() once the item leaves the heap.
size_t index;
};
// Timer container backed by a binary min-heap of item pointers.
//
// T must embed a TimerHeapItem reachable through the `timer_item` member
// pointer. Each item caches its own heap position so remove() is O(log N)
// without a search.
template<class T, TimerHeapItem T::*timer_item = &T::timer_item>
class TimerByHeap {
  using Unit = int64_t;

  // Accessors for the TimerHeapItem embedded in *t.
  inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
  inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
  inline static void set_index(T* t, size_t index) { (t->*timer_item).index = index;}
  inline static size_t get_index(const T* t) { return (t->*timer_item).index;}
  inline static list_head* get_link(T* t) {
    return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
  }
  inline static const list_head* get_link(const T* t) {
    return &((t->*timer_item).link);
  }

 public:
  // Present for interface parity with the other timers; nothing to set up.
  void init(Unit) {}

  bool empty() const { return heap_.empty();}

  size_t size() const { return heap_.size();}

  // Earliest expiry among stored items, or 0 when the heap is empty.
  Unit min_time() const {
    if (empty()) return 0;
    return get_timeout(heap_[0]);
  }

  // Stores `tp` (must be > 0) in the item, appends it and restores heap order.
  // `t` must not already be in the heap; it would be stored twice.
  void insert(Unit tp, T* t) {
    assert(tp > 0);
    set_timeout(t, tp);
    set_index(t, size());
    heap_.push_back(t);
    _adjust(size() - 1);
  }

  // Removes `t` from the heap. Does nothing when `t` is not currently stored.
  void remove(T* t) {
    const size_t index = get_index(t);
    // The cached index is only trustworthy while the item is in the heap. An
    // item that was never inserted may carry any value (e.g. a
    // value-initialised 0), so confirm the slot really holds `t`; otherwise an
    // unrelated timer would be evicted.
    if (unlikely(index >= size() || heap_[index] != t)) {
      return;
    }
    set_index(t, std::numeric_limits<size_t>::max());
    T* last = heap_.back();
    heap_.pop_back();
    if (unlikely(index == size())) {
      return;  // `t` was the last element; nothing to re-seat
    }
    // Move the former tail into the hole and restore the heap property.
    heap_[index] = last;
    set_index(last, index);
    _adjust(index);
  }

  // Clears `result_list`, then moves every item whose expiry is <= now into
  // it, earliest first. The stored expiry of popped items is left unchanged.
  void pop(Unit now, list_head* result_list) {
    result_list->clear();
    while (!empty() && get_timeout(heap_[0]) <= now) {
      T* t = heap_[0];
      remove(t);
      result_list->push_back(get_link(t));
    }
  }

 private:
  // True when the item at slot `a` expires strictly before the one at `b`.
  bool _less_than(size_t a, size_t b) const {
    return get_timeout(heap_[a]) < get_timeout(heap_[b]);
  }

  // Swaps two slots and keeps the cached indices of both items in sync.
  void _swap(size_t a, size_t b) {
    set_index(heap_[a], b);
    set_index(heap_[b], a);
    std::swap(heap_[a], heap_[b]);
  }

  // Restores the heap property around `index` after its key changed.
  // At most one of the two phases moves anything: if the element sifts up,
  // the old parent that lands at `index` is already <= its new subtree.
  void _adjust(size_t index) {
    assert(index < heap_.size());
    // adjust up
    for (size_t pos = index; pos > 0; ) {
      size_t father = (pos - 1) >> 1;
      if (!_less_than(pos, father)) {
        break;
      }
      _swap(pos, father);
      pos = father;
    }
    // adjust down: swap with the smaller child while it is smaller than us
    for (;index < size();) {
      size_t left = 2 * index + 1, right = left + 1;
      if (left >= size()) break;
      if (right < size() && _less_than(right, index) && _less_than(right, left)) {
        _swap(index, right);
        index = right;
        continue;
      }
      if (_less_than(left, index)) {
        _swap(index, left);
        index = left;
        continue;
      }
      break;
    }
  }

  std::deque<T*> heap_;
};
基于时间轮
时间轮本质上是利用时间的整数特性实现的特殊的哈希表。
这里采用int64表示时间,每6位二进制对应一个时间轮,总计11个轮,每个轮有2^6=64个分片。插入时将目标时间与now从高位到低位逐轮对比,第一个不同的轮就是需要插入的轮,再取该轮对应的6位作为分片下标插入。删除时也能从时间反推出插入的位置,由于采用双向链表结构,插入和删除都是常量时间复杂度。pop和查询min_time()相对麻烦,耗时更多。考虑到插入和删除是频率最高的调用,因此这个耗时在可接受范围内。为了加快pop和查询,同时还采用位图记录每个分片是否为空,通过位图的高效查找跳过空的分片。
位图的代码如下。
// Returns the number of leading (most-significant) zero bits in `num`,
// or 64 when `num` is zero.
inline size_t high_zero_bit_num(uint64_t num) {
  // __builtin_clzll has an undefined result for 0, so handle it explicitly.
  if (num == 0) return 64;
  return static_cast<size_t>(__builtin_clzll(num));
}
// Returns the number of trailing (least-significant) zero bits in `num`,
// or 64 when `num` is zero.
inline size_t low_zero_bit_num(uint64_t num) {
  // __builtin_ctzll has an undefined result for 0, so handle it explicitly.
  if (num == 0) return 64;
  return static_cast<size_t>(__builtin_ctzll(num));
}
// Fixed-size bitmap of BitNum bits packed into 64-bit words.
// Out-of-range bit positions are ignored by the setters and read as false.
template<size_t BitNum>
class BitMap {
 public:
  // Starts with every bit cleared.
  BitMap() {
    clear();
  }

  // Clears every bit.
  void clear() {
    bitmap_.fill(0);
  }

  // Clears bit `nth`; no-op when nth >= BitNum.
  inline void clear_bit(size_t nth) {
    if (nth >= BitNum) return;
    bitmap_[nth >> kBitmapShift] &= ~(uint64_t(1) << (nth & kUnitMask));
  }

  // Sets bit `nth`; no-op when nth >= BitNum.
  inline void set_bit(size_t nth) {
    if (nth >= BitNum) return;
    bitmap_[nth >> kBitmapShift] |= uint64_t(1) << (nth & kUnitMask);
  }

  // Number of addressable bits.
  size_t size() const { return BitNum;}

  // Returns bit `nth`; false when nth >= BitNum.
  inline bool get_bit(size_t nth) const {
    if (nth >= BitNum) return false;
    return (bitmap_[nth >> kBitmapShift] >> (nth & kUnitMask)) & 1;
  }

  // Searches the half-open range [start, end) for the lowest set bit.
  // `end` is clamped to BitNum first. Returns the bit position, or the
  // (clamped) `end` when the range is empty or holds no set bit.
  inline size_t find_first_set_bit(size_t start = 0, size_t end = BitNum) const {
    if (end > BitNum) {
      end = BitNum;
    }
    if (start >= end) {
      return end;
    }
    const size_t first_word = start >> kBitmapShift;
    // Word holding the last candidate bit (end - 1); always < kUnitNum.
    const size_t last_word = (end - 1) >> kBitmapShift;
    for (size_t word = first_word; word <= last_word; ++word) {
      uint64_t bits = bitmap_[word];
      if (word == first_word) {
        // Discard bits that lie below `start` inside the first word.
        bits &= ~uint64_t(0) << (start & kUnitMask);
      }
      if (bits == 0) {
        continue;  // also keeps __builtin_ctzll away from its undefined 0 case
      }
      const size_t nth =
          (word << kBitmapShift) + static_cast<size_t>(__builtin_ctzll(bits));
      // Only the last word can contain set bits at or beyond `end`; those do
      // not count as a hit.
      return nth < end ? nth : end;
    }
    return end;
  }

 private:
  static constexpr size_t kBitmapShift = 6;
  static constexpr size_t kBitmapUnit = 1 << kBitmapShift;
  static constexpr size_t kUnitMask = kBitmapUnit - 1;
  static constexpr size_t kUnitNum =
      (BitNum + kBitmapUnit - 1) >> kBitmapShift;
  std::array<uint64_t, kUnitNum> bitmap_;
};
时间轮代码如下。
// Intrusive hook embedded in every object scheduled by TimeWheel.
struct TimerItem {
// List node; TimeWheel links it into a slot list, its already-expired list,
// or the caller's result list.
list_head link;
// Expiry time stored by TimeWheel::insert(); TimeWheel::remove() resets it
// to 0.
int64_t target_time;
};
// Hierarchical timing wheel over intrusive TimerItem hooks.
//
// A 64-bit time is split into kWheels groups of kSlotBits bits, most
// significant group first (wheel 0). A timer is stored in the first wheel
// whose group differs between its expiry and now_, in the slot named by the
// expiry's bits for that wheel. A bitmap mirrors which slots are non-empty so
// pop() and min_time() can skip empty slots.
//
// T must embed a TimerItem reachable through the `timer_item` member pointer.
template<class T, TimerItem T::*timer_item = &T::timer_item>
class TimeWheel {
 private:
  // timer = [wheel1, wheel2, ...]
  // wheel = [slot1, slot2, ... slot_64]
  using Unit = int64_t;
  static constexpr size_t kUnitBits = 8 * sizeof(Unit);
  static constexpr size_t kSlotBits = 6;
  static constexpr size_t kSlotNum = 1 << kSlotBits;
  static constexpr size_t kWheels = (kUnitBits + kSlotBits - 1)/kSlotBits;
  static constexpr size_t kSlotAll = kWheels * kSlotNum;

  Unit now_{};                              // time of the last init()/pop()
  size_t size_{};                           // timers stored (slots + timeouts_)
  BitMap<kSlotAll> slot_bitmap_;            // bit i set <=> items_[i] non-empty
  list_head timeouts_{};                    // timers inserted with tp <= now_
  std::array<list_head, kSlotAll> items_;   // slot lists, wheel-major order

  // Accessors for the TimerItem embedded in *t.
  inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
  inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
  inline static list_head* get_link(T* t) {
    return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
  }
  inline static const list_head* get_link(const T* t) {
    return &((t->*timer_item).link);
  }

  // Maps a list node back to the object that embeds it (nullptr stays nullptr).
  inline static T* from_link(list_head* h) {
    return const_cast<T*>(from_link(static_cast<const list_head*>(h)));
  }
  inline static const T* from_link(const list_head* h) {
    // Byte offset of the link inside T, obtained offsetof-style by taking the
    // member address on a null T*.
    static const size_t off =
        reinterpret_cast<size_t>(get_link(static_cast<const T*>(nullptr)));
    if (h == nullptr) return nullptr;
    const char* ptr = reinterpret_cast<const char*>(h) - off;
    const T* t = reinterpret_cast<const T*>(ptr);
    return t;
  }

  // Slot index (0..kSlotNum-1) that `u` selects inside `wheel`.
  inline size_t slot_in_wheel(Unit u, size_t wheel) const {
    // assert(wheel < kWheels);
    return (u >> (kSlotBits * (kWheels - 1 - wheel))) & (kSlotNum - 1);
  }

  // First (most significant) wheel in which `u` and now_ differ.
  // Requires u != now_.
  inline size_t wheel_at(Unit u) const {
    size_t zero_bit_n = high_zero_bit_num(now_ ^ u);
    assert(zero_bit_n < kUnitBits);
    // kWheels * kSlotBits exceeds kUnitBits, so offset the bit position by the
    // padding bits of the top wheel before dividing.
    return (zero_bit_n + kWheels * kSlotBits - kUnitBits) / kSlotBits;
  }

  // Index into items_/slot_bitmap_ where a timer expiring at `u` belongs,
  // relative to the current now_.
  inline size_t slot_in_global(Unit u) const {
    size_t wheel = wheel_at(u);
    size_t index = (wheel << kSlotBits) | slot_in_wheel(u, wheel);
    return index;
  }

  // Unregisters the timer owning node `h` and appends the node to result_list.
  inline void _remove(list_head* h, list_head* result_list) {
    remove(from_link(h));
    result_list->push_back(h);
  }

  // Appends `node` to the slot selected by `tp` and marks the slot non-empty.
  inline void _insert_to_slots(Unit tp, list_head* node) {
    auto global_slot = slot_in_global(tp);
    list_head& h = items_[global_slot];
    bool empty = h.empty();
    h.push_back(node);
    if (empty) {
      slot_bitmap_.set_bit(global_slot);
    }
  }

  // Moves every node of the list `head` into result_list via _remove().
  inline void _pop_to_list(list_head* result_list, list_head* head) {
    for (list_head* h=head->next; h != head; h=head->next) {
      _remove(h, result_list);
    }
  }

  // Pops all timers in slots [slot of now_, end_slot) of `wheel`.
  // [start_slot, end_slot)
  inline void _pop_wheel(size_t wheel, size_t end_slot, list_head* result_list) {
    size_t global_base_slot = wheel << kSlotBits;
    size_t start_slot = slot_in_wheel(now_, wheel);
    size_t start_global = global_base_slot + start_slot;
    size_t end_global = global_base_slot + end_slot;
    for(size_t i = start_global; i < end_global; ++i) {
      i = slot_bitmap_.find_first_set_bit(i, end_global);
      // Use >= rather than ==: a result beyond end_global names a slot that is
      // not due yet and must not be popped.
      if (i >= end_global) {
        break;
      }
      list_head* head = &items_[i];
      _pop_to_list(result_list, head);
      slot_bitmap_.clear_bit(i);
    }
  }

  // Looks for the first non-empty slot after now_'s slot in `wheel`. On
  // success stores the earliest time that slot can hold in *tp and returns
  // true.
  inline bool _find_min_time(size_t wheel, Unit* tp) const {
    size_t slot_base = wheel << kSlotBits;
    size_t slot_end = (wheel+1) << kSlotBits;
    size_t slot_start = slot_in_wheel(now_, wheel) + 1 + slot_base;
    size_t global_slot = slot_bitmap_.find_first_set_bit(slot_start, slot_end);
    if (global_slot >= slot_end) {
      return false;
    }
    // Bit offset of this wheel's slot field, and of the bits above it.
    const size_t low_shift = kSlotBits * (kWheels - wheel - 1);
    const size_t high_shift = low_shift + kSlotBits;
    // Keep now_'s bits above this wheel. The top wheel has none, and its
    // high_shift (66) is >= the width of Unit, where a shift is undefined.
    Unit base = 0;
    if (high_shift < kUnitBits) {
      base = now_ & ~((Unit(1) << high_shift) - 1);
    }
    *tp = base;
    *tp += (global_slot - slot_base) << low_shift;
    return true;
  }

 public:
  TimeWheel() = default;

  // Debug aid: checks that every bitmap bit matches the emptiness of its slot.
  void debug_check_bitmap() const {
    for(size_t i=0;i<kSlotAll;++i){
      if (items_[i].empty()){
        log_assert(!slot_bitmap_.get_bit(i))<<"bit not 0, i="<<i;
      } else {
        log_assert(slot_bitmap_.get_bit(i))<<"bit not 1, i="<<i;
      }
    }
  }

  // Sets the current time (must be >= 0) and clears the bitmap.
  // Stored timers and size_ are not touched, so call it on an empty wheel.
  void init(Unit now) {
    assert(now >= 0);
    now_ = now;
    slot_bitmap_.clear();
  }

  bool empty() const { return 0 == size_;}

  size_t size() const { return size_;}

  // return mt, all timer after mt
  // if some timer timeouts or no timer, return now
  // The value is the start of the first occupied slot, i.e. a lower bound of
  // the earliest expiry rather than the exact expiry.
  Unit min_time() const {
    if (empty() || !timeouts_.empty()) return now_;
    Unit tp = 0;
    // Scan from the finest wheel (last index) towards the coarsest.
    for (size_t w=kWheels; w > 0 && !_find_min_time(w-1, &tp);--w);
    return tp;
  }

  // Schedules `t` to expire at `tp` (must be > 0). The item's link must not be
  // in any list. A timer that is already due (tp <= now_) is parked in
  // timeouts_ and handed out by the next pop().
  void insert(Unit tp, T* t) {
    if (unlikely(t == nullptr)) {
      return;
    }
    auto node = get_link(t);
    assert(node->empty());
    assert(tp > 0);
    set_timeout(t, tp);
    ++size_;
    if (tp <= now_) {
      timeouts_.push_back(node);
      return;
    }
    _insert_to_slots(tp, node);
  }

  // Unschedules `t`. No-op for nullptr, for an item whose link is not in a
  // list, or for a negative stored expiry. Resets the stored expiry to 0.
  void remove(T* t) {
    if (unlikely(t == nullptr)) {
      return;
    }
    list_head* th = get_link(t);
    auto tp = get_timeout(t);
    if (th->empty() || tp < 0) return;
    set_timeout(t, 0);
    th->remove();
    assert(size_ > 0);
    --size_;
    if (tp > now_) {
      // The timer lived in a wheel slot; drop the bitmap bit once the slot is
      // empty. Timers with tp <= now_ sit in timeouts_ and have no bit.
      size_t global_slot = slot_in_global(tp);
      if (items_[global_slot].empty()) {
        slot_bitmap_.clear_bit(global_slot);
      }
    }
  }

  // Advances the wheel to `now` and moves every timer with expiry <= now into
  // result_list. A `now` earlier than the current time is ignored.
  void pop(Unit now, list_head* result_list) {
    // NOTE(review): the set- and heap-based timers call clear() here; confirm
    // that list_head::remove() resets the head the same way.
    result_list->remove();
    if (now_ > now) {
      return;
    }
    // Timers that were already due when inserted are handed out on every pop,
    // not only when the clock stands still.
    _pop_to_list(result_list, &timeouts_);
    if (now == now_) {
      return;
    }
    auto start_wheel = wheel_at(now);
    /*
    now_: A1 B1 C1 D1
    now : A1 B2 C2 D2
    */
    // Every wheel finer than the first differing one is fully in the past:
    // pop from now_'s slot to the end of the wheel.
    for (size_t wheel = kWheels - 1; wheel > start_wheel; --wheel) {
      size_t end_slot = kSlotNum;
      _pop_wheel(wheel, end_slot, result_list);
    }
    // In the first differing wheel only the slots before now's slot are due.
    size_t end_slot = slot_in_wheel(now, start_wheel);
    _pop_wheel(start_wheel, end_slot, result_list);
    now_ = now;
    // now's own slot mixes due and future timers: pop the due ones and
    // re-file the rest relative to the new now_ (they land in finer wheels).
    size_t target_slot = start_wheel * kSlotNum + end_slot;
    list_head* slot_head = &items_[target_slot];
    while (slot_head->next != slot_head) {
      list_head* node=slot_head->next;
      T* t = from_link(node);
      auto tp = get_timeout(t);
      if (tp <= now) {
        _remove(node, result_list);
        continue;
      }
      // NOTE(review): relies on list_head::push_back() unlinking `node` from
      // slot_head first -- confirm against list_head.
      _insert_to_slots(tp, node);
    }
    slot_bitmap_.clear_bit(target_slot);
  }

  DISALLOW_COPY(TimeWheel);
};
测试代码
// Benchmark payload: an object that carries nothing but its timer hook.
struct TimeoutPerfItem {
// Alternative hook type, kept commented out. Presumably meant to be swapped in
// together with the matching TestTimer alias -- TODO confirm.
// TimerItem timer_item;
TimerHeapItem timer_item;
// Expiry currently stored in the hook.
int64_t get_timeout_us() const { return timer_item.target_time;}
// Address of the hook's list node, used to identify popped entries.
const list_head* get_timeout_link() const { return &timer_item.link;}
};
// Timer implementation under test; exactly one alias is active, the others
// are kept commented out as alternatives.
// using TestTimer = TimerBySet<TimeoutPerfItem>;
using TestTimer = TimerByHeap<TimeoutPerfItem>;
// using TestTimer = wt::TimeWheel<TimeoutPerfItem>;
// Benchmarks TestTimer with `num` timers spaced 97 time units apart.
//
// Phases: insert all, remove the first half, pop the second half one timer at
// a time, then re-insert the second half and repeat the pop loop with a
// min_time() query before each pop. Per-operation averages are logged and the
// timer's bookkeeping is checked with log_assert along the way.
//
// `num` must be at least 2: the first half must be non-empty because the
// averages divide by its size and the min_time() check reads items[i-1].
static void test_timer_benchmark(size_t num) {
  if (num < 2) {
    log_info()<<"timer benchmark skipped, num too small: num="<<num;
    return;
  }
  size_t rm_num = num/2;
  int64_t base_us = wt::steady_us();
  TestTimer tm1{};
  std::vector<TimeoutPerfItem> items;
  items.resize(num);

  // Phase 1: insert every item.
  auto m1 = wt::System::process_memory_kb();
  auto t0 = wt::steady_us();
  tm1.init(base_us);
  for (size_t i = 0; i < num; i++) {
    tm1.insert(base_us + i * 97, &items[i]);
  }
  auto t1 = wt::steady_us();
  auto m2 = wt::System::process_memory_kb();
  log_info()<<"timer benchmark: num="<<num;
  // sample output: insert num:1000000, mem_kb:0
  log_info()<<"insert num:"<<num<<",items_mem_kb:"<<m1<<", mem_kb:"<<(m2-m1);
  // sample output: insert cost_ms:8, avg_ns:8
  log_assert(tm1.size() == num)<<"insert, size="<<tm1.size();
  log_info()<<"insert cost_ms:"<<(t1-t0)/1000
      <<", avg_ns:"<<1000*(t1-t0)/num;

  // Phase 2: remove the first half.
  auto t2 = wt::steady_us();
  for (size_t i = 0; i < rm_num;++i) {
    tm1.remove(&items[i]);
  }
  auto t3 = wt::steady_us();
  auto dt_rm = t3 - t2;
  log_assert(tm1.size() == num - rm_num)<<"remove, size="<<tm1.size();
  // sample output: remove cost_ms:2, avg_ns:4
  log_info()<<"remove cost_ms:"<<dt_rm/1000
      <<", avg_ns:"<<1000*dt_rm/rm_num;

  // Phase 3: pop the second half, one timer per call.
  auto t4 = wt::steady_us();
  for (size_t i = rm_num; i < num;++i) {
    list_head result;
    tm1.pop(items[i].get_timeout_us(), &result);
    log_assert(result.size() == 1);
    log_assert(result.next == items[i].get_timeout_link());
  }
  auto t5 = wt::steady_us();
  auto dt_pop = t5 - t4;
  log_assert(tm1.size() == 0)<<"pop, size="<<tm1.size();
  // sample output: pop cost_ms:40, avg_ns:81
  log_info()<<"pop cost_ms:"<<dt_pop/1000
      <<", avg_ns:"<<1000*dt_pop/(num - rm_num);

  // Phase 4: same pop loop, preceded by a min_time() query each round.
  tm1.init(base_us);
  for (size_t i = rm_num; i < num; i++) {
    tm1.insert(base_us + i * 97, &items[i]);
  }
  // tm1.debug_check_bitmap();
  auto t6 = wt::steady_us();
  for (size_t i = rm_num; i < num;++i) {
    auto min_time = tm1.min_time();
    // min_time() may be a lower bound, so accept anything after the previous
    // item's expiry and up to this item's expiry.
    log_assert(min_time > items[i-1].get_timeout_us()
        && min_time <= items[i].get_timeout_us()) << "i=" <<i
        <<",min_time="<<min_time<<",expect="<<items[i].get_timeout_us();
    list_head result;
    tm1.pop(items[i].get_timeout_us(), &result);
    log_assert(result.size() == 1);
    log_assert(result.next == items[i].get_timeout_link());
  }
  auto dt7 = wt::steady_us() - t6;
  // Estimate the min_time() cost as this loop's time minus the pop-only time
  // from phase 3, clamped at zero.
  auto dt_mintime = std::max(dt7, dt_pop) - dt_pop;
  // sample output: min_time cost_ms:11, avg_ns:22
  log_info()<<"min_time cost_ms:"<<dt_mintime/1000
      <<", avg_ns:"<<1000*dt_mintime/(num - rm_num);
}
// Boolean flag read by test_timer(); the benchmark only runs when it is true.
// NOTE(review): DefineBoolArg is a project macro; presumably it registers a
// command-line switch with this help text -- confirm.
DefineBoolArg(enable_test_timer, "run test_timer()");
// Runs the timer benchmark at 10000 x {10, 100, 1000, 2000} timers, but only
// when the enable_test_timer flag is set.
static void test_timer() {
  if (enable_test_timer) {
    for (size_t scale : {10, 100, 1000, 2000}) {
      const size_t timer_count = 10000 * scale;
      test_timer_benchmark(timer_count);
    }
  }
}
性能对比结果
实现类型 | num | insert/ns | remove/ns | pop/ns | min_time/ns | 内存/KB |
---|---|---|---|---|---|---|
基于Set | 10万 | 147 | 74 | 50 | 0 | 4804 |
基于Set | 100万 | 186 | 103 | 40 | 0 | 42240 |
基于Set | 1000万 | 239 | 124 | 31 | 0 | 421872 |
基于Set | 2000万 | 285 | 132 | 35 | 2 | 468596 |
基于堆 | 10万 | 27 | 301 | 338 | 0 | 1108 |
基于堆 | 100万 | 15 | 299 | 283 | 8 | 7392 |
基于堆 | 1000万 | 14 | 398 | 422 | 0 | 73656 |
基于堆 | 2000万 | 15 | 458 | 462 | 0 | 147704 |
基于时间轮 | 10万 | 4 | 4 | 53 | 7 | 0 |
基于时间轮 | 100万 | 4 | 4 | 52 | 3 | 0 |
基于时间轮 | 1000万 | 4 | 4 | 51 | 18 | 0 |
基于时间轮 | 2000万 | 5 | 4 | 65 | 7 | 0 |
结论:基于set的实现插入耗时巨大,应该是申请内存和节点遍历的原因,删除耗时次之,可见节点太多后树太高,遍历耗时,pop只删除起始节点,耗时相对小一点,min_time()查询很快。set耗内存很高,1千万定时器内存达到422MB。
结论:基于堆的实现deque由大块内存组织而成,插入又是固定在尾巴,耗时很小;pop和删除总是在头部节点,删除后将尾部置换到头部再重新调整,需要调整堆的所有层,因此耗时巨大;min_time()没有耗时,符合预期。1千万定时器内存消耗74MB,相对set小了很多。
结论:基于时间轮的实现插入和删除耗时仅5ns,而且保持在常量时间,符合预期;pop耗时50-60ns,在可接受范围内,min_time查询时间在7ns左右,可以当做常量时间。内存上没有额外开销,完全采用item即可。综上性能上,完胜前两个实现。