Thread pool
背景
在我所做过的一个基于libevent项目中, 我所使用的线程模型是 one event_base per thread + thread pool 模型。每个线程最多有一个event_base, 每一个 TCP 连接必须由某一个event_base 管理,所有这个线程的 IO 都会转移到这个 event_base 上面来处理。换句话来说,一个 file descriptor 只能由一个线程来读写。这样,我们就很方便地把不同的 TCP 连接放到不同的线程里面去。
一个节点支持多线程,它有两种模式
- 单线程, accept 与 TCP 的消息处理在同一个线程做 IO
- 多线程, accept 有一个专门的线程接受连接,然后创建一个新的 Thread-Pool, 新的连接会按round-robin的方式分配。
具体实现
在其他的线程中,只要拿到一个 libevent_thread_t 的对象,往里面的 base 添加事件就好了。
typedef struct libevent_thread_s
{
pthread_t thread_id; // pid of this thread
struct event_base * base; // libevent handle this thread uses
// this pipe is used to stop the event_base from its own thread
struct event * notify_event;
int notify_receive_fd;
int notify_send_fd;
int no; // the thread number, which is needed for test
} libevent_thread_t;
创建新线程调用的函数(一直循环):
static void* worker_libevent(void *arg)
{
libevent_thread_t* me = (libevent_thread_t *)arg;
event_base_dispatch(me->base);
return NULL;
}
虽然其他的线程可以往这个线程添加事件,但是当一个 libevent_thread_t 对象释放的时候, 其他的线程不能 break 这个线程的 event_base,只能是这个线程自己 break 自己的 event_base,所以在实现上我们可以首先要打开一个管道
libevent_thread_t* libevent_thread_new(){
int fds[2];
if(pipe(fds)){
printf("Can't create notify pipe");
return NULL;
}
pr_libevent_thread_t* t = g_new0(pr_libevent_thread_t, 1);
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
setup_thread(t);
return t;
}
为这个管道一端的 fd 在event_base上面添加监听事件,有数据可读的时候就 break event_base 就可以退出了
static void thread_libevent_process(int fd, short which, void * arg)
{
libevent_thread_t* me = (libevent_thread_t *)arg;
char buf[1];
read(fd, buf, 1);
event_base_loopbreak(me->base);
}
static int setup_thread(libevent_thread_t* me)
{
me->base = event_base_new();
if(!me->base)
{
printf( "Can't allocate event base\n");
return -1;
}
me->notify_event = event_new(me->base, me->notify_receive_fd, EV_READ | EV_PERSIST,
thread_libevent_process, me);
if(event_add(me->notify_event, NULL) == -1)
{
printf("Can't monitor libevent notify pipe\n");
return -1;
}
return 0;
}
所以当我们想要退出时,向 fd 发送数据
void libevent_thread_wakeup(libevent_thread_t* me)
{
char one[1];
one[0] = 1;
write(me->notify_send_fd, one, sizeof(one));
}
至于线程池,就是一个 libevent_thread_t 的数组,通过next来指定函数返回的线程,其中 n 为线程池容量大小, next 自增到 n 时变成 0
libevent_thread_t* event_pool_get_next_thread(libevent_thread_pool_t*);
typedef struct libevent_thread_pool_s
{
int next;
struct spinlock lock;
int n;
pr_libevent_thread_t** slot;
} libevent_thread_pool_t;
注意事项
在多线程环境中,libevent 的 event_base 的 loopbreak必须由他自己的线程来实现,所以其他线程只能是通过管道来通知。
在一个线程往另一个线程的 event_base 添加事件的时候,也就是在多个线程对于一个 event_base 操作的时候,event_base 需要对它自己的数据结构加锁,所以在使用线程池或者写多线程的程序的时候,开始时需要调用, 文档
evthread_use_pthreads()