讨论/技术交流/ 交流 | 剖析Linux服务器の定时器设计/
交流 | 剖析Linux服务器の定时器设计

导读: 本期来讲解下Linux服器中的定时器设计。

在上一期走近reactor设计模式,剖析高性能服务器框架核心一文中,我们聊了设计高性能Linux服务器的基本设计要点,其基本设计架构如下:

  void loop() { 
    quit_ = true;
    while(!quit_) { 
      int timeout = next_timer_timeout(); // 获取最近超时的定时任务 

      int numevents = epoll_wait(efd, &events, maxEvents, timeout);
        
      handle_event(events, numevents);	 // 处理监听到的任务
      handle_timer_events();			 // 处理定时器任务
    }
    quit_ = false;
  }

并且留下一个问题:在loop循环中,next_timer_timeout函数是怎么获取最早定时任务的超时时间。这个问题的关键在于怎么设计一个定时器。这一期,我们就继续聊下服务器中定时器的设计要点。

目前,常见定时器设计有如下三种:

  • 基于「链表」:redis的设计方案;
  • 基于「小顶堆」:libuv的设计方案;
  • 基于「时间轮」。

下面对这三种设计方案分别讲解。

链表

使用链表来存储定时任务,思路也很简单:将所有的定时任务按升序排序,那么链表第一个节点即最早超时的任务。这样,每次调用next_timer_timeout 函数获取最早超时任务的超时时间,所需的时间复杂度即 O(1)

对于链表有所了解的小伙伴,应该都知道以下两点:

  • 虽然,取出链表首节点的时间复杂度是 O(1)
  • 但是,当需要向链表中添加新的定时任务时,保持整个链表有序的时间复杂度是 O(N)

因此,当定时任务比较多,基于链表实现的定时器就会降低整个服务器的性能。

redis的定时器设计

redis的定时器就是基于链表设计,但是和上述有点区别:

  • 在插入的时候直接在链表头部插入,时间复杂度为O(1)
  • 在查询最早超时任务时,需要遍历整个链表,时间复杂度为O(N)

那么问题来了,对于redis这样的高性能K-V数据库,怎么会选择基于链表实现呢?

说到底,还是看数据规模。

当数据规模比较大的时候,使用链表来维护数据的有序性就会显得力不从心。但是,当数据量较小,这个O(N)的时间复杂度其实就接近O(1)。这种思想在redis的基础数据结构里,也是随处可见。

恰好,redis的定时器任务并不多,故而就使用了最为朴素的数据结构:「链表」。

aeTimeEvent

在redis中,存储定时器事件的结构是 struct aeTimeEvent ,定时器链表的头节点被存储在aeEventLoop之中。

typedef struct aeTimeEvent {
    long long id;             /* time event identifier. */
    long when_sec;            /* seconds */
    long when_ms;             /* milliseconds */
    aeTimeProc *timeProc;     // 待执行的回答函数
    aeEventFinalizerProc* finalizerProc;
    void *clientData;         // timeProc的入口参数
    
    struct aeTimeEvent *prev; // 指向下一个节点
    struct aeTimeEvent *next; // 指向上一个节点
    int refcount;    
} aeTimeEvent;


typedef struct aeEventLoop {
    //....
    aeTimeEvent *timeEventHead; // 链表头节点
	//..
} aeEventLoop;

aeCreateTimeEvent

aeCreateTimeEvent函数,向链表中添加新的定时任务,其步骤大致如下:

  1. 完成待插入节点te相关参数的初始化;
  2. 将待插入节点te插入到链表头部。

完整的实现如下。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, 
                            long long milliseconds,
                            aeTimeProc *proc, 
                            void *clientData,
                            aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;	// id
    aeTimeEvent *te = zmalloc(sizeof(*te));
    if (te == NULL) 
        return AE_ERR;
    /*** 节点初始化 ***/
    te->id = id;
    // 设置超时时间
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); 
    // 设置回调函数
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    /*** 插入到链表头部 ***/
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

aeSearchNearestTimer

在redis中,由aeSearchNearestTimer函数获取最早超时任务的时间点。执行步骤如下两步:

  1. 获取链表的头节点te,即第一个定时任务;

  2. 遍历整个链表,找到最早超时的定时任务节点,并将其存储在nearest

整个实现,时间复杂度O(N)。具体实现如下。

static aeTimeEvent* aeSearchNearestTimer(aeEventLoop *eventLoop) {
    
    aeTimeEvent *te = eventLoop->timeEventHead;	// 定时器任务的头节点
    aeTimeEvent *nearest = NULL;                // 最早超时的定时任务

    while(te) {
        if (!nearest                           || 
            te->when_sec < nearest->when_sec   || 
            (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) 
        {
            nearest = te;
        }
            
        te = te->next; // 下一个节点
    }
    
    return nearest;
}

小顶堆

当定时任务变得越来越多,如果仍然采用链表来组织定时任务,就无法满足高性能服务器的要求了。

此时,可以采用小顶堆来设计定时器,其属性如下:

  • 最早超时的定时任务总是处于堆顶;
  • 每次只需要取出堆顶的节点,查询它的timeout,计算后就可以设置epoll_wait的阻塞时间,时间复杂度仍然是O(1)
  • 插入一个新的定时任务,时间复杂度O(logN)

当定时任务规模较大时,小顶堆O(logN)的时间复杂度远低于链表的O(N)。下面以libuv作为案例进行分析。

libuv的定时器设计

libuv用c语言自己实现了堆,相关接口都在src\heap-inl.h文件中。如果不熟悉堆的,可以百度下「数据结构中的堆」相关特征,熟悉下堆的特征。

libuv的定时器就是基于小顶堆实现。

堆的节点类型是struct heap_node,不过这里的struct heap_node不存储数据,仅仅提供了指针节点,用于实现完全二叉树的遍历等操作。

struct heap_node {
  struct heap_node* left;
  struct heap_node* right;
  struct heap_node* parent;
};

而真正存储数据的节点是结构体uv_timer_t,即真正的定时任务节点,uv_timer_t的结构定义如下:

typedef struct uv_timer_s uv_timer_t;

struct uv_timer_s {
  //...
  void*       heap_node[3]; // heap_node                                                       
  int         unused;                                                                 
  uint64_t    timeout;     // 超时时间                                                      
  uint64_t    repeat;      // 定时任务触发后,是否重复          	                                         
  uint64_t    start_id;                                                          
  uv_timer_cb timer_cb;    // 回调函数
}

那你可能好奇,即便我取得了堆的顶点,怎么从heap_node节点寻找到它对应的uv_time_t节点呢?

在libuv中,有个container_of宏,可以通过结构体的某个字段,找到该字段所属的结构体地址。

#define container_of(ptr, type, member) \
  ((type *) ((char *) (ptr) - offsetof(type, member)))

由于offsetof函数可以获得字段 member 在结构体 type 中的偏移量。而且,在C/C++中,结构体的内存分布是连续的空间,因此可以将 ptr 向上移动 offsetof(type, member) 个字节,就可以获得字段 member 所属的结构体地址。

对应到 head_nodeuv_timer_t ,即如下:

const struct heap_node* heap_node; 
uv_timer_t* handle = container_of(heap_node, uv_timer_t, heap_node);  

libuv中获取堆顶点、向堆中插入节点的两个函数,如下:

struct heap_node* heap_min(const struct heap* heap)); // 获得堆顶的节点
void heap_insert(struct heap* heap, 
                 struct heap_node* newnode, 
                 heap_compare_fn less_than));         // 插入新的节点

timer_less_than

节点之间的有序性,需要通过一个比较函数来判断。libuv中是由timer_less_than函数完成。

static int timer_less_than(const struct heap_node* ha,
                           const struct heap_node* hb) {
  const uv_timer_t* a;
  const uv_timer_t* b;

  a = container_of(ha, uv_timer_t, heap_node);
  b = container_of(hb, uv_timer_t, heap_node);

  if (a->timeout < b->timeout)
    return 1;
  if (b->timeout < a->timeout)
    return 0;

  // timeout相同,比较谁先创建
  return a->start_id < b->start_id;
}

uv__next_timeout

在redis中,由 aeSearchNearestTimer函数返回最早定时任务触发的绝对时间。在libuv中这一过程是由uv__next_timeout函数完成,不过返回的是当前时间距离最早触发的定时任务之间的「时间差」。

整个过程如下:

  1. 从堆中取出最小节点heap_noe,其实就是堆顶点;

  2. heap_node,找到heap_node所属的uv_timer_t节点handle,进一步获取最早定时任务的超时时间handle->timeout

  3. 判断下该hanlde->timeout是否比当前时间loop->time小:

    • 是,则说明最早的定时任务已经到时间了,那么就直接返回0。这样的话,即使epoll_Wait没有监听到新的任务到,也有不会阻塞,能立马去执行定时任务。

    • 否,则就返回当前时间距离最早触发的定时器之间的时间差。

具体的代码实现如下。

int uv__next_timeout(const uv_loop_t* loop) {
  const struct heap_node* heap_node;
  const uv_timer_t* handle;
  uint64_t diff;

  heap_node = heap_min(timer_heap(loop));	// 堆顶点
  if (heap_node == NULL)
    return -1; 
    
  // 通过堆节点,找到所属的 uv_timer_t 节点的首地址
  handle = container_of(heap_node, uv_timer_t, heap_node);
  // 最早触发的定时任务的超时时间和当前时间比较
  if (handle->timeout <= loop->time)
    return 0;

  diff = handle->timeout - loop->time;
  if (diff > INT_MAX)
    diff = INT_MAX;

  return (int) diff;
}

by the way

如果看的源码较少,可能会对struct heap_node这种只包含指向节点,而不包含数据的设计有些困惑,甚至难于理解。

因为在常用的STL里,节点都是包含了数据。以链表节点为例:

struct Node { 
	int val;
	Node* next;
	Node* prev;
};

这样,拿到了每个节点就能直接使用里面的存储的数据。

如果单纯只表达链表本身的概念,实际上是与数据无关的,此时可以设计如下:

struct Node { 
	Node* next;
	Node* prev;
};

然后,让真正的数据节点包含这个Node。这样,Node只是起到了一个节点之间的连接作用。拿到节点之后,还需要寻找到相应的数据实体,就像libuv中需要经过一个container_of宏来寻找数据实体节点。

这两种设计方式,各有利弊。更多信息,可以搜索「侵入式设计与非侵入式设计」。

时间轮

时间轮(TimeWheel),有点类似于基于开链法实现的哈希表。关于开链法的哈希表实现原理,不清楚地可以参考前文:

时间轮,可以理解为一个以固定周期转动的「轮子」,其中时间轮的每个格子叫槽(slot)。

time_wheel.jpg

为便于描述,设时间轮有N个slot,当前指向的slot是cs(current slot),时间轮每次前进一个slot的时间间隔为 si (slot interval),因此时间轮转动周期是 N*si

设新插入的定时任务的超时时间是 to(timeout),那么将这个定时任务节点插入到时间轮的过程如下:

  1. 先计算出待插入slot的索引index

    index=(cs+to/si)%Nindex = (cs + to/si) \% N

  2. 将待插入的节点,插入到索引为index的slot指向的链表中。

从插入过程不难发现,两个特点:

  1. 同一链表上,节点间的超时时间差是N*si的整数倍;

  2. 因此,当前slot中的定时任务,并不一定比下一个slot中的定时任务早触发。

由第二个特点,可知,如果想要寻找距离当前时间点最早触发的定时器任务,似乎并不容易,可能在当前slot,也可能在下一个slot,甚至下下个....

那么在时间轮中,怎么计算并设置epoll_wait的阻塞时间?

只能以固定的时间间隔si,来作为epoll_wait的阻塞时间。因此,时间轮并不是一个非常精确的定时器,如果周期固定,时间轮的slot越多,则精度越高。与此时同时,slot越多,每个slot的链表上挂载的节点就可能越少,每次遍历链表时的时间复杂度就相应降低。

目前我阅读的开源库中,没有知名的项目使用了时间轮(我阅读的并不多)。因此,时间轮本身的设计源码可以见《Linux高性能服务器编程》的demo。

好嘞,到此,就分析完了常用的定时器设计。


推荐阅读:

  1. 踩坑记 (1) | std::sort函数隐藏着BUG,你知道吗
  2. 网络编程 (1) | 走近reactor设计模式,剖析高性能服务器框架核心
  3. 必先利其器(3) | 分享我所有的C++学习资源
  4. 必先利其器(2) | 手把手教你学会gdb,适应Linux调试环境
12
共 2 个回复

如果觉得写的不错 点个赞吧

3

CPPer 冲冲冲

1