[C++]C风格、C++风格和C++11特性的线程池

线程池概念

假设完成一项任务需要的时间=创建线程时间T1+线程执行任务时间T2+销毁线程时间T3,如果T1+T3的时间远大于T2,通常就可以考虑采取线程池来提高服务器的性能

thread pool就是线程的一种使用模式,一个线程池中维护着多个线程等待接收管理者分配的可并发执行的任务。

  • 避免了处理短时间任务时创建与销毁线程的代价

  • 既保证内核的充分利用,又能防止过度调度

  • 可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets的数量

线程池组成部分

  • 线程池管理器(thread pool):创建、销毁线程池

  • 工作线程(pool wroker):在没有任务时处于等待状态,循环读取并执行任务队列中的任务

  • 任务(task):抽象一个任务,主要规定任务的入口、任务执行完后的收尾工作、任务的执行状态等

  • 任务队列(task queue):存放没有处理的任务,提供一种缓冲机制

C风格ThreadPool

1. 抽象一个任务

将待处理的任务抽象成task结构:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n26" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct task {
void* (run)(void args); // abstract a job function that need to run
void* arg; // argument of the run function
struct task* next; // point to the next task in task queue
} task_t;</pre>

2. 任务队列

  • threadpool中用firstlast指针指向首尾两个任务

  • task结构体保证每个task都能指向任务队列中下一个task

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n33" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct task {
void* (run)(void args); // abstract a job function that need to run
void* arg; // argument of the run function
struct task* next; // point to the next task in task queue
} task_t;

typedef struct threadpool {
condition_t ready; // condition & mutex
task_t* first; // fist task in task queue
task_t* last; // last task in task queue
int counter; // total task number
int idle; // idle task number
int max_threads; // max task number
int quit; // the quit flag
} threadpool_t;
</pre>

3. 线程安全的问题

设计了condition_t类来实现安全并发:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n36" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">typedef struct condition {
/**
* 互斥锁
/
pthread_mutex_t pmutex;
/
*
* 条件变量
*/
pthread_cond_t pcond;
} condition_t;</pre>

提供对应的接口:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n38" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">/**

  • 初始化
    /
    int condition_init(condition_t
    cond);

/**

  • 加锁
    /
    int condition_lock(condition_t
    cond);
    /**
  • 解锁
    /
    int condition_unlock(condition_t
    cond);

/**

  • 条件等待
  • pthread_cond_wait(cond, mutex)的功能有3个:
    1. 调用者线程首先释放mutex
    1. 然后阻塞, 等待被别的线程唤醒
    1. 当调用者线程被唤醒后,调用者线程会再次获取mutex
      /
      int condition_wait(condition_t
      cond);

/**

  • 计时等待
    /
    int condition_timedwait(condition_t
    cond, const timespec* abstime);

/**

  • 激活一个等待该条件的线程
    1. 作用: 发送一个信号给另外一个处于阻塞等待状态的线程, 使其脱离阻塞状态继续执行
    1. 如果没有线程处在阻塞状态, 那么pthread_cond_signal也会成功返回, 所以需要判断下idle thread的数量
    1. 最多只会给一个线程发信号,不会有「惊群现象」
    1. 首先根据线程优先级的高低确定发送给哪个线程信号, 如果优先级相同则优先发给等待最久的线程
    1. 重点: pthread_cond_wait必须放在lock和unlock之间, 因为他要根据共享变量的状态决定是否要等待; 但是pthread_cond_signal既可以放在lock和unlock之间,也可以放在lock和unlock之后
      /
      int condition_signal(condition_t cond);
      /
  • 唤醒所有等待线程
    */
    int condition_broadcast(condition_t *cond);

/**

  • 销毁
    */
    int condition_destroy(condition_t *cond);</pre>

4. 线程池的实现

4.1 初始化一个线程池

仅仅是初始化了conditionmutex,还有一些线程池的属性。但是任务队列是空的,而且此时也一个线程都没有

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n42" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// initialize the thread pool
void threadpool_init(threadpool_t* pool, int threads_num) {
int n_status = condition_init(&pool ->ready);
if (n_status == 0) {
printf("Info: initialize the thread pool successfully!\n");
} else {
printf("Error: initialize the thread pool failed, status:%d\n", n_status);
}
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads_num;
pool->quit = 0;
}</pre>

4.2 向线程池中添加任务,并分配给它一个线程

首先构建task结构体,然后将其加入任务队列。

  • 如果当前有空闲线程那么直接调用空闲线程执行函数

  • 如果无空闲线程且当前线程数未满时创建一个新的线程执行任务

  • 如果无空闲线程且当前线程数已满时,任务会呆在任务队列中等待线程池释放出空闲线程

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n52" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// add a task to thread pool
void threadpool_add_task(threadpool_t* pool, void* (run)(void arg), void arg) {
// create a task
task_t
new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t)));
new_task->run = run;
new_task->arg = arg;
new_task->next = NULL;

// lock the condition
condition_lock(&pool->ready);

// add the task to task queue
if (pool->first == NULL) {
    pool->first = new_task;
} else {  // else add to the last task
    pool->last->next = new_task;
}
pool->last = new_task;

/*
 * after you add a task to task queue, you need to allocate it to a thread:
 * (1)if idle thread num > 0: awake a idle thread
 * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
 */
if (pool->idle > 0) {
    // awake a thread that wait for longest time
    condition_signal(&pool->ready);
} else if (pool->counter < pool->max_threads) {
    // define a tid to get the thread identifier that we are going to create
    pthread_t tid;
    /*
     * pthread_create():
     * (1)thread identifier
     * (2)set the pthread attribute
     * (3)the function that thread is going to run
     * (4)the args of run func
     *
     *  A realistic limit of thread num is 200 to 400 threads
     *  https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
     */
    pthread_create(&tid, NULL, thread_routine, pool);
    pool->counter++;
} else {  // when (idle == 0 & counter = max_threads), then wait
    printf("Warning: no idle thread, please wait...\n");
}

condition_unlock(&pool->ready);

}</pre>

5. 线程的执行过程

5.1 如果任务队列为空

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n55" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when task queue is empty, then block 2 second to get the new task
// If timeout, then destroy the thread
while (pool->first == NULL && !pool->quit) {
printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
// get the system time
clock_gettime(CLOCK_REALTIME, &abs_name);
abs_name.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready, &abs_name); // block for 2 second
if (status == ETIMEDOUT) {
printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
timeout = true;
break;
}
}

...

// if visit task queue timeout(means no task in queue), quit destory the thread
if (timeout) {
pool->counter--;
condition_unlock(&pool->ready);
break; // destroy the thread
}</pre>

5.2 如果任务队列非空

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n57" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when the thread run the task, we should unlock the thread pool
if (pool->first != NULL) {
// get the task from task queue
task_t* t = pool->first;
pool->first = t->next;
// unlock the thread pool to make other threads visit task queue
condition_unlock(&pool->ready);

// run the task run func
t->run(t->arg);
free(t);

// lock
condition_lock(&pool->ready);

}</pre>

5.3 没有任务且收到退出信号

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n59" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">// when task queue is clean and quit flag is 1, then destroy the thread
if (pool->quit && pool->first == NULL) {
pool->counter--;
// 若线程池中线程数为0,通知等待线程(主线程)全部任务已经完成
if (pool->counter == 0) {
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break; // destroy the thread
}</pre>

6. 代码

condition.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n62" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef CONDITION_H_

define CONDITION_H_

include <pthread.h>

include <cstdio>

typedef struct condition {
/**
* 互斥锁
/
pthread_mutex_t pmutex;
/
*
* 条件变量
*/
pthread_cond_t pcond;
} condition_t;

/**

  • 初始化
    /
    int condition_init(condition_t
    cond);

/**

  • 加锁
    /
    int condition_lock(condition_t
    cond);
    /**
  • 解锁
    /
    int condition_unlock(condition_t
    cond);

/**

  • 条件等待
  • pthread_cond_wait(cond, mutex)的功能有3个:
    1. 调用者线程首先释放mutex
    1. 然后阻塞, 等待被别的线程唤醒
    1. 当调用者线程被唤醒后,调用者线程会再次获取mutex
      /
      int condition_wait(condition_t
      cond);

/**

  • 计时等待
    /
    int condition_timedwait(condition_t
    cond, const timespec* abstime);

/**

  • 激活一个等待该条件的线程
    1. 作用: 发送一个信号给另外一个处于阻塞等待状态的线程, 使其脱离阻塞状态继续执行
    1. 如果没有线程处在阻塞状态, 那么pthread_cond_signal也会成功返回, 所以需要判断下idle thread的数量
    1. 最多只会给一个线程发信号,不会有「惊群现象」
    1. 首先根据线程优先级的高低确定发送给哪个线程信号, 如果优先级相同则优先发给等待最久的线程
    1. 重点: pthread_cond_wait必须放在lock和unlock之间, 因为他要根据共享变量的状态决定是否要等待; 但是pthread_cond_signal既可以放在lock和unlock之间,也可以放在lock和unlock之后
      /
      int condition_signal(condition_t cond);
      /
  • 唤醒所有等待线程
    */
    int condition_broadcast(condition_t *cond);

/**

  • 销毁
    */
    int condition_destroy(condition_t *cond);

endif // CONDITION_H_</pre>

condition.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n64" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include "condition.h"

// 初始化
int condition_init(condition_t* cond) {
int status;
status = pthread_mutex_init(&cond->pmutex, NULL);
if (status != 0) {
printf("Error: pthread_mutex_init failed, return value:%d\n", status);
return status;
}
status = pthread_cond_init(&cond->pcond, NULL);
if (status != 0) {
printf("Error: pthread_cond_init failed, return value:%d\n", status);
return status;
}
return 0;
}

// 加锁
int condition_lock(condition_t* cond) {
return pthread_mutex_lock(&cond->pmutex);
}

// 解锁
int condition_unlock(condition_t* cond) {
return pthread_mutex_unlock(&cond->pmutex);
}

// 条件等待
int condition_wait(condition_t* cond) {
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

// 计时等待
int condition_timedwait(condition_t* cond, const timespec* abstime) {
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

// 激活一个等待该条件的线程
int condition_signal(condition_t *cond) {
return pthread_cond_signal(&cond->pcond);
}

// 唤醒所有等待线程
int condition_broadcast(condition_t *cond) {
return pthread_cond_broadcast(&cond->pcond);
}

// 销毁
int condition_destroy(condition_t *cond) {
int status;
status = pthread_mutex_destroy(&cond->pmutex);
if (status != 0) {
return status;
}

status = pthread_cond_destroy(&cond->pcond);
if (status != 0) {
    return status;
}
return 0;

}</pre>

threadpool.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n66" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POLL_H_

define THREAD_POLL_H_

include "condition.h"

typedef struct task {
void* (run)(void args); // abstract a job function that need to run
void* arg; // argument of the run function
struct task* next; // point to the next task in task queue
} task_t;

typedef struct threadpool {
condition_t ready; // condition & mutex
task_t* first; // fist task in task queue
task_t* last; // last task in task queue
int counter; // total task number
int idle; // idle task number
int max_threads; // max task number
int quit; // the quit flag
} threadpool_t;

/**

  • initialize threadpool
    /
    void threadpool_init(threadpool_t
    pool, int threads_num);

/**

  • add a task to threadpool
    /
    void threadpool_add_task(threadpool_t
    pool, void* (*run)(void args), void arg);

/**

  • destroy threadpool
    /
    void threadpool_destroy(threadpool_t
    pool);

endif // THREAD_POLL_H_</pre>

Threadpool.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n68" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">

include <pthread.h>

include <cstdlib>

include <cstdio>

include <cerrno>

include <ctime>

include "threadpool.h"

void *thread_routine(void *arg);

// initialize the thread pool
void threadpool_init(threadpool_t* pool, int threads_num) {
int n_status = condition_init(&pool ->ready);
if (n_status == 0) {
printf("Info: initialize the thread pool successfully!\n");
} else {
printf("Error: initialize the thread pool failed, status:%d\n", n_status);
}
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads_num;
pool->quit = 0;
}

// add a task to thread pool
void threadpool_add_task(threadpool_t* pool, void* (run)(void arg), void arg) {
// create a task
task_t
new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t)));
new_task->run = run;
new_task->arg = arg;
new_task->next = NULL;

// lock the condition
condition_lock(&pool->ready);

// add the task to task queue
if (pool->first == NULL) {
    pool->first = new_task;
} else {  // else add to the last task
    pool->last->next = new_task;
}
pool->last = new_task;

/*
 * after you add a task to task queue, you need to allocate it to a thread:
 * (1)if idle thread num > 0: awake a idle thread
 * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task
 */
if (pool->idle > 0) {
    // awake a thread that wait for longest time
    condition_signal(&pool->ready);
} else if (pool->counter < pool->max_threads) {
    // define a tid to get the thread identifier that we are going to create
    pthread_t tid;
    /*
     * pthread_create():
     * (1)thread identifier
     * (2)set the pthread attribute
     * (3)the function that thread is going to run
     * (4)the args of run func
     *
     *  A realistic limit of thread num is 200 to 400 threads
     *  https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm
     */
    pthread_create(&tid, NULL, thread_routine, pool);
    pool->counter++;
} else {  // when (idle == 0 & counter = max_threads), then wait
    printf("Warning: no idle thread, please wait...\n");
}

condition_unlock(&pool->ready);

}

// create a thread to run the task run func
// and the void *arg means the arg passed by pthread_create: pool
void *thread_routine(void *arg) {
struct timespec abs_name;
bool timeout;
printf("Info: create thread, and the thread id is: %ld\n", (u_int64_t)pthread_self());
threadpool_t *pool = reinterpret_cast<threadpool_t *>(arg);

// keep visiting the task queue
while (true) {
    timeout = false;
    condition_lock(&pool->ready);
    pool->idle++;

    // when task queue is empty, then block 2 second to get the new task
    // If timeout, then destroy the thread
    while (pool->first == NULL && !pool->quit) {
        printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self());
        // get the system time
        clock_gettime(CLOCK_REALTIME, &abs_name);
        abs_name.tv_sec += 2;
        int status;
        status = condition_timedwait(&pool->ready, &abs_name);  // block for 2 second
        if (status == ETIMEDOUT) {
            printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self());
            timeout = true;
            break;
        }
    }

    pool->idle--;
    // when the thread run the task, we should unlock the thread pool
    if (pool->first != NULL) {
        // get the task from task queue
        task_t* t = pool->first;
        pool->first = t->next;
        // unlock the thread pool to make other threads visit task queue
        condition_unlock(&pool->ready);

        // run the task run func
        t->run(t->arg);
        free(t);

        // lock
        condition_lock(&pool->ready);
    }

    // when task queue is clean and quit flag is 1, then destroy the thread
    if (pool->quit && pool->first == NULL) {
        pool->counter--;
        // 若线程池中线程数为0,通知等待线程(主线程)全部任务已经完成
        if (pool->counter == 0) {
            condition_signal(&pool->ready);
        }
        condition_unlock(&pool->ready);
        break;  // destroy the thread
    }

    // if visit task queue timeout(means no task in queue), quit destory the thread
    if (timeout) {
        pool->counter--;
        condition_unlock(&pool->ready);
        break;  // destroy the thread
    }

    condition_unlock(&pool->ready);
}

// if break, destroy the thread
printf("Info: thread %ld quit\n", (u_int64_t)pthread_self());
return NULL;

}

/*

  • destroy a thread pool:

    1. awake all the idle thread
    1. wait for the running thread to finish
      */
      void threadpool_destroy(threadpool_t *pool) {
      if (pool->quit) {
      return;
      }

    condition_lock(&pool->ready);
    pool->quit = 1;
    if (pool->counter > 0) {
    if (pool->idle > 0) {
    condition_broadcast(&pool->ready);
    }
    while (pool->counter > 0) {
    condition_wait(&pool->ready);
    }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
    }</pre>

test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n70" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include <stdlib.h>

include <stdio.h>

include "threadpool.h"

define THREADPOOL_MAX_NUM 30

void* mytask(void *arg) {
printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), reinterpret_cast<int>(arg));
sleep(1);
free(arg);
return NULL;
}

int main(int argc, char* argv[]) {
threadpool_t pool;
threadpool_init(&pool, THREADPOOL_MAX_NUM);

// add task to task queue
for (int i=0; i < 100; i++) {
    int *arg = reinterpret_cast<int *>(malloc(sizeof(int)));
    *arg = i;
    threadpool_add_task(&pool, mytask, arg);
}
threadpool_destroy(&pool);
return 0;

}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n72" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp threadpool.cpp condition.cpp -o test -std=c++11 -lpthread./test
Info: initialize the thread pool successfully!
Info: create thread, and the thread id is: 139898193295104
Info: create thread, and the thread id is: 139898176509696
Info: thread 139898176509696 is working on task 0
Info: create thread, and the thread id is: 139898168116992
Info: create thread, and the thread id is: 139898184902400
Info: create thread, and the thread id is: 139898134546176
Info: create thread, and the thread id is: 139898126153472
Info: create thread, and the thread id is: 139898117760768
Info: thread 139898117760768 is working on task 1
Info: create thread, and the thread id is: 139898100975360
Info: create thread, and the thread id is: 139898092582656
Info: create thread, and the thread id is: 139898084189952
Info: create thread, and the thread id is: 139898159724288
Info: create thread, and the thread id is: 139898109368064
Info: create thread, and the thread id is: 139898067404544
Info: create thread, and the thread id is: 139898059011840
Info: create thread, and the thread id is: 139898050619136
Info: create thread, and the thread id is: 139898042226432
Info: create thread, and the thread id is: 139898033833728
Info: create thread, and the thread id is: 139898025441024
Info: create thread, and the thread id is: 139898017048320
Info: create thread, and the thread id is: 139898008655616
Info: create thread, and the thread id is: 139898075797248
Info: create thread, and the thread id is: 139898000262912
Info: create thread, and the thread id is: 139898142938880
Info: create thread, and the thread id is: 139898151331584
Info: thread 139898159724288 is working on task 2
Info: thread 139898151331584 is working on task 3
Info: create thread, and the thread id is: 139897991870208
Info: create thread, and the thread id is: 139897966692096
Info: create thread, and the thread id is: 139897958299392
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: create thread, and the thread id is: 139897949906688
Info: create thread, and the thread id is: 139897983477504
Info: create thread, and the thread id is: 139897975084800
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898067404544 is working on task 4
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898168116992 is working on task 5
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Warning: no idle thread, please wait...
Info: thread 139898142938880 is working on task 6
Warning: no idle thread, please wait...
Info: thread 139898042226432 is working on task 7
Warning: no idle thread, please wait...
Info: thread 139897949906688 is working on task 8
Info: thread 139898184902400 is working on task 11
Info: thread 139898134546176 is working on task 13
Info: thread 139898017048320 is working on task 14
Info: thread 139898008655616 is working on task 16
Info: thread 139898193295104 is working on task 18
Info: thread 139898000262912 is working on task 20
Info: thread 139898100975360 is working on task 21
Info: thread 139897983477504 is working on task 9
Info: thread 139897975084800 is working on task 10
Info: thread 139898092582656 is working on task 26
Info: thread 139898050619136 is working on task 24
Info: thread 139897991870208 is working on task 28
Info: thread 139898025441024 is working on task 12
Info: thread 139898084189952 is working on task 15
Info: thread 139898109368064 is working on task 17
Info: thread 139897966692096 is working on task 25
Info: thread 139898075797248 is working on task 19
Info: thread 139898059011840 is working on task 22
Info: thread 139897958299392 is working on task 27
Info: thread 139898033833728 is working on task 29
Info: thread 139898126153472 is working on task 23
Info: thread 139898176509696 is working on task 30
Info: thread 139898117760768 is working on task 31
Info: thread 139898159724288 is working on task 32
Info: thread 139898151331584 is working on task 33
Info: thread 139898067404544 is working on task 34
Info: thread 139898168116992 is working on task 35
Info: thread 139898142938880 is working on task 36
Info: thread 139898042226432 is working on task 37
Info: thread 139897949906688 is working on task 38
Info: thread 139898184902400 is working on task 39
Info: thread 139898000262912 is working on task 40
Info: thread 139898017048320 is working on task 41
Info: thread 139898008655616 is working on task 42
Info: thread 139898134546176 is working on task 43
Info: thread 139898193295104 is working on task 44
Info: thread 139898050619136 is working on task 49
Info: thread 139897991870208 is working on task 50
Info: thread 139898025441024 is working on task 51
Info: thread 139898084189952 is working on task 52
Info: thread 139898109368064 is working on task 53
Info: thread 139898075797248 is working on task 54
Info: thread 139897975084800 is working on task 48
Info: thread 139898100975360 is working on task 45
Info: thread 139898033833728 is working on task 57
Info: thread 139897983477504 is working on task 47
Info: thread 139897966692096 is working on task 55
Info: thread 139897958299392 is working on task 56
Info: thread 139898092582656 is working on task 46
Info: thread 139898126153472 is working on task 58
Info: thread 139898059011840 is working on task 59
Info: thread 139898176509696 is working on task 60
Info: thread 139898117760768 is working on task 61
Info: thread 139898159724288 is working on task 62
Info: thread 139898151331584 is working on task 63
Info: thread 139898067404544 is working on task 64
Info: thread 139898168116992 is working on task 65
Info: thread 139898142938880 is working on task 66
Info: thread 139898042226432 is working on task 67
Info: thread 139897949906688 is working on task 69
Info: thread 139898184902400 is working on task 68
Info: thread 139898000262912 is working on task 70
Info: thread 139898008655616 is working on task 71
Info: thread 139898017048320 is working on task 72
Info: thread 139898050619136 is working on task 73
Info: thread 139898134546176 is working on task 74
Info: thread 139898109368064 is working on task 78
Info: thread 139898100975360 is working on task 80
Info: thread 139897975084800 is working on task 82
Info: thread 139898075797248 is working on task 81
Info: thread 139897966692096 is working on task 85
Info: thread 139897958299392 is working on task 86
Info: thread 139898126153472 is working on task 88
Info: thread 139898059011840 is working on task 89
Info: thread 139898092582656 is working on task 87
Info: thread 139898025441024 is working on task 76
Info: thread 139898084189952 is working on task 77
Info: thread 139898193295104 is working on task 75
Info: thread 139897991870208 is working on task 79
Info: thread 139898033833728 is working on task 83
Info: thread 139897983477504 is working on task 84
Info: thread 139898176509696 is working on task 90
Info: thread 139898117760768 is working on task 91
Info: thread 139898159724288 is working on task 92
Info: thread 139898151331584 is working on task 93
Info: thread 139898067404544 is working on task 94
Info: thread 139898168116992 is working on task 95
Info: thread 139898142938880 is working on task 96
Info: thread 139898042226432 is working on task 97
Info: thread 139897949906688 is working on task 98
Info: thread 139898184902400 is working on task 99
Info: thread 139898000262912 quit
Info: thread 139898008655616 quit
Info: thread 139898017048320 quit
Info: thread 139898050619136 quit
Info: thread 139898134546176 quit
Info: thread 139898109368064 quit
Info: thread 139898100975360 quit
Info: thread 139897966692096 quit
Info: thread 139898126153472 quit
Info: thread 139897975084800 quit
Info: thread 139898075797248 quit
Info: thread 139897958299392 quit
Info: thread 139898059011840 quit
Info: thread 139898092582656 quit
Info: thread 139898025441024 quit
Info: thread 139898084189952 quit
Info: thread 139898193295104 quit
Info: thread 139897991870208 quit
Info: thread 139898033833728 quit
Info: thread 139897983477504 quit
Info: thread 139898176509696 quit
Info: thread 139898117760768 quit
Info: thread 139898159724288 quit
Info: thread 139898151331584 quit
Info: thread 139898067404544 quit
Info: thread 139898168116992 quit
Info: thread 139898142938880 quit
Info: thread 139898042226432 quit
Info: thread 139897949906688 quit
Info: thread 139898184902400 quit</pre>

C++03特性的ThreadPool

1. 基于条件变量的线程池

threadpool.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n76" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POOL_H

define THREAD_POOL_H

include <stdio.h>

include <pthread.h>

include <functional>

include <vector>

include <queue>

class ThreadPool {
public:
typedef void (WrokerFunc)(void arg);

struct Task {
    WrokerFunc* run;
    void* arg;
};

explicit ThreadPool(int thread_num);
~ThreadPool();
void addTask(WrokerFunc* func, void* arg);

private:
std::queue<Task*> task_queue_;
std::vector<pthread_t> thread_list_;
bool is_running_; // note: is_running_不用原子变量或者锁操作可能存在卡死问题
pthread_mutex_t mutex_;
pthread_cond_t condition_;

static void* thread_routine(void* pool_ptr);
void thread_worker();

};

// =========================implementation=========================
inline ThreadPool::ThreadPool(int thread_num) : is_running_(true) {
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&condition_, NULL);

for (int i = 0; i < thread_num; i++) {
    pthread_t pid;
    pthread_create(&pid, NULL, thread_routine, this);
    thread_list_.push_back(pid);
}

}

inline ThreadPool::~ThreadPool() {
pthread_mutex_lock(&mutex_);
is_running_ = false;
pthread_mutex_unlock(&mutex_);

pthread_cond_broadcast(&condition_);  // wakeup all threads that block to get task
for (int i = 0; i < thread_list_.size(); i++) {
    pthread_join(thread_list_[i], NULL);
}

pthread_cond_destroy(&condition_);
pthread_mutex_destroy(&mutex_);

}

inline void ThreadPool::addTask(WrokerFunc* func, void* arg) {
Task* task = new Task();
task->run = func;
task->arg = arg;

pthread_mutex_lock(&mutex_);
task_queue_.push(task);
pthread_mutex_unlock(&mutex_);
pthread_cond_signal(&condition_);

}

inline void* ThreadPool::thread_routine(void* pool_ptr) {
ThreadPool* pool = static_cast<ThreadPool*>(pool_ptr);
pool->thread_worker();
}

inline void ThreadPool::thread_worker() {
Task* task = NULL;

while (true) {
    pthread_mutex_lock(&mutex_);
    if (!is_running_) {
        pthread_mutex_unlock(&mutex_);
        break;
    }

    if (task_queue_.empty()) {
        pthread_cond_wait(&condition_, &mutex_);  // 获取不到任务时阻塞, 直到有新的任务入队
        if (task_queue_.empty()) {
            pthread_mutex_unlock(&mutex_);
            continue;
        }
    }
    task = task_queue_.front();
    task_queue_.pop();
    pthread_mutex_unlock(&mutex_);

    (*(task->run))(task->arg);
    delete task;
    task = NULL;
}

// 线程池终止时(is_running_ = false)确保任务队列为空后退出
while (true) {
    pthread_mutex_lock(&mutex_);
    if (task_queue_.empty()) {
        pthread_mutex_unlock(&mutex_);
        break;
    }
    task = task_queue_.front();
    task_queue_.pop();
    pthread_mutex_unlock(&mutex_);
    delete task;
    task = NULL;
}

printf("Info: thread[%lu] exit\n", pthread_self());

}

endif // THREAD_POOL_H</pre>

测试代码threadpool_test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n78" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <stdio.h>

include <unistd.h>

include <stdlib.h>

include <pthread.h>

include <vector>

include "threadpool.h"

void* MyTaskFunc(void* arg) {
int* i = static_cast<int*>(arg);
printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), *i);
return NULL;
}

int main() {
ThreadPool pool(10);

for (int i = 0; i < 100; i++) {
    int* arg = new int(i);
    pool.addTask(&MyTaskFunc, arg);
}

return 0;

}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n80" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g threadpool_test.cpp -o threadpool_test -lpthread./threadpool_test
[MyTaskFunc]: thread[140224777099008] is working on 0
[MyTaskFunc]: thread[140224793884416] is working on 8
[MyTaskFunc]: thread[140224844240640] is working on 2
Info: thread[140224844240640] exit
[MyTaskFunc]: thread[140224827455232] is working on 4
Info: thread[140224827455232] exit
[MyTaskFunc]: thread[140224810669824] is working on 6
Info: thread[140224810669824] exit
[MyTaskFunc]: thread[140224777099008] is working on 10
Info: thread[140224777099008] exit
[MyTaskFunc]: thread[140224852633344] is working on 1
Info: thread[140224852633344] exit
[MyTaskFunc]: thread[140224835847936] is working on 3
Info: thread[140224835847936] exit
[MyTaskFunc]: thread[140224802277120] is working on 7
Info: thread[140224802277120] exit
Info: thread[140224793884416] exit
[MyTaskFunc]: thread[140224819062528] is working on 5
Info: thread[140224819062528] exit
[MyTaskFunc]: thread[140224785491712] is working on 9
Info: thread[140224785491712] exit</pre>

2. 基于信号量的同步任务队列

上述的线程池无法很好地支持同步任务,因此我们基于信号量实现了SyncTaskQueue。

sync_task_queue.h:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n84" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef SYNC_TASK_QUEUE_H

define SYNC_TASK_QUEUE_H

include <semaphore.h>

include <vector>

include "threadpool.h"

class SyncTaskQueue {
public:
struct SyncTask {
ThreadPool::Task task;
sem_t* sem;
};

explicit SyncTaskQueue(ThreadPool* pool_ptr);
~SyncTaskQueue();
void addTask(ThreadPool::WrokerFunc* func, void* sync_task_ptr);
void wait();

private:
ThreadPool* threadpool_;
sem_t sem_;
int sync_task_num_; // 务必保证单线程读写, 否则需要加锁

static ThreadPool::WrokerFunc workerFuncWrapper;

};

inline SyncTaskQueue::SyncTaskQueue(ThreadPool* pool_ptr) : threadpool_(pool_ptr), sync_task_num_(0) {
sem_init(&sem_, 0, 0);
}

inline SyncTaskQueue::~SyncTaskQueue() {
// make sure that all task has been finished before destory
if (sync_task_num_ > 0) {
wait();
}
sem_destroy(&sem_);
}

inline void SyncTaskQueue::addTask(ThreadPool::WrokerFunc* func, void* arg) {
sync_task_num_++;

// wrapper the worker function with sem
SyncTask* sync_task = new SyncTask();
sync_task->sem = &(this->sem_);
sync_task->task.run = func;
sync_task->task.arg = arg;
threadpool_->addTask(&workerFuncWrapper, sync_task);

}

inline void SyncTaskQueue::wait() {
while (sync_task_num_) {
int sem_value = 0;
sem_wait(&sem_);
sync_task_num_--;
}
}

inline void* SyncTaskQueue::workerFuncWrapper(void* sync_task_ptr) {
SyncTask* sync_task = static_cast<SyncTask>(sync_task_ptr);
(
(sync_task->task.run))(sync_task->task.arg);
sem_post(sync_task->sem);
delete sync_task;
}

endif // SYNC_TASK_QUEUE_H</pre>

测试文件sync_task_queue_test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n86" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include "sync_task_queue.h"

void* MyTaskFunc(void* arg) {
int i = static_cast<int>(arg);
printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), i);
sleep(2);
return NULL;
}

int main() {
ThreadPool threadpool(20);
SyncTaskQueue sync_task_queue(&threadpool);

for (int i = 0; i < 15; i++) {
    int* arg = new int(i);
    sync_task_queue.addTask(&MyTaskFunc, arg);
}

printf("====================================wait for result===================================\n");
sync_task_queue.wait();

}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n88" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g sync_task_queue_test.cpp -o sync_task_queue_test -lpthread./sync_task_queue_test
[MyTaskFunc]: thread[140349199148800] is working on 0
[MyTaskFunc]: thread[140349266290432] is working on 12
[MyTaskFunc]: thread[140349358610176] is working on 2
[MyTaskFunc]: thread[140349341824768] is working on 3
[MyTaskFunc]: thread[140349333432064] is working on 4
[MyTaskFunc]: thread[140349325039360] is working on 5
[MyTaskFunc]: thread[140349308253952] is working on 6
[MyTaskFunc]: thread[140349299861248] is working on 8
[MyTaskFunc]: thread[140349316646656] is working on 7
[MyTaskFunc]: thread[140349283075840] is working on 9
[MyTaskFunc]: thread[140349291468544] is working on 10
[MyTaskFunc]: thread[140349274683136] is working on 11
[MyTaskFunc]: thread[140349257897728] is working on 13
[MyTaskFunc]: thread[140349249505024] is working on 14
[MyTaskFunc]: thread[140349350217472] is working on 1
====================================wait for result===================================
Info: thread[140349232719616] exit
Info: thread[140349241112320] exit
Info: thread[140349266290432] exit
Info: thread[140349207541504] exit
Info: thread[140349299861248] exit
Info: thread[140349283075840] exit
Info: thread[140349291468544] exit
Info: thread[140349249505024] exit
Info: thread[140349274683136] exit
Info: thread[140349333432064] exit
Info: thread[140349325039360] exit
Info: thread[140349308253952] exit
Info: thread[140349224326912] exit
Info: thread[140349316646656] exit
Info: thread[140349199148800] exit
Info: thread[140349350217472] exit
Info: thread[140349257897728] exit
Info: thread[140349215934208] exit
Info: thread[140349358610176] exit
Info: thread[140349341824768] exit</pre>

C++11特性的ThreadPool

传统C++线程池仅能接受特殊的Task(执行函数需要满足特殊的格式),使用C++11特性的线程池可以更好地支持任意类型参数的Task。

1. 代码

threadpool.h:

参考自:https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

另外一种实现:https://github.com/mtrebi/thread-pool/blob/master/include/ThreadPool.h

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n96" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#ifndef THREAD_POOL_H

define THREAD_POOL_H

include <vector>

include <queue>

include <memory>

include <thread>

include <mutex>

include <condition_variable>

include <future>

include <utility>

include <functional>

include <stdexcept>

class ThreadPool {
public:
explicit ThreadPool(size_t);
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();

private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers_;
// the task queue
std::queue<std::function<void()>> tasks_;

// synchronization
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;

};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop_(false) {
for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back(
[this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex_);
this->condition_.wait(lock,
[this] { return this->stop_ || !this->tasks_.empty(); });
if (this->stop_ && this->tasks_.empty()) {
return;
}
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
});
}
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()>> (
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
    std::unique_lock<std::mutex> lock(queue_mutex_);

    // don't allow enqueueing after stopping the pool
    if (stop_) {
        throw std::runtime_error("enqueue on stopped ThreadPool");
    }

    tasks_.emplace([task](){ (*task)(); });
}
condition_.notify_one();
return res;

}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread &worker : workers_) {
worker.join();
}
}

endif // THREAD_POOL_H</pre>

test.cpp:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n98" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include "threadpool.h"

void mytask(int i) {
printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), i);
sleep(1);
return;
}

int main() {
ThreadPool threadpool(20);
for (int i = 0; i < 100; ++i) {
threadpool.enqueue(mytask, i);
}
return 0;
}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n100" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread./test
Info: thread 139679726323456 is working on task 1
Info: thread 139679709538048 is working on task 3
Info: thread 139679717930752 is working on task 2
Info: thread 139679734716160 is working on task 0
Info: thread 139679600432896 is working on task 15
Info: thread 139679684359936 is working on task 6
Info: thread 139679617218304 is working on task 14
Info: thread 139679634003712 is working on task 12
Info: thread 139679667574528 is working on task 8
Info: thread 139679625611008 is working on task 13
Info: thread 139679575254784 is working on task 19
Info: thread 139679659181824 is working on task 9
Info: thread 139679701145344 is working on task 4
Info: thread 139679692752640 is working on task 5
Info: thread 139679592040192 is working on task 17
Info: thread 139679583647488 is working on task 18
Info: thread 139679675967232 is working on task 7
Info: thread 139679642396416 is working on task 11
Info: thread 139679608825600 is working on task 16
Info: thread 139679650789120 is working on task 10
Info: thread 139679684359936 is working on task 21
Info: thread 139679617218304 is working on task 24
Info: thread 139679600432896 is working on task 28
Info: thread 139679709538048 is working on task 23
Info: thread 139679659181824 is working on task 30
Info: thread 139679692752640 is working on task 32
Info: thread 139679583647488 is working on task 34
Info: thread 139679608825600 is working on task 35
Info: thread 139679592040192 is working on task 33
Info: thread 139679634003712 is working on task 25
Info: thread 139679625611008 is working on task 29
Info: thread 139679726323456 is working on task 20
Info: thread 139679701145344 is working on task 31
Info: thread 139679650789120 is working on task 36
Info: thread 139679667574528 is working on task 27
Info: thread 139679575254784 is working on task 37
Info: thread 139679734716160 is working on task 26
Info: thread 139679675967232 is working on task 38
Info: thread 139679717930752 is working on task 22
Info: thread 139679642396416 is working on task 39
Info: thread 139679684359936 is working on task 40
Info: thread 139679692752640 is working on task 45
Info: thread 139679625611008 is working on task 51
Info: thread 139679583647488 is working on task 43
Info: thread 139679659181824 is working on task 44
Info: thread 139679575254784 is working on task 55
Info: thread 139679592040192 is working on task 47
Info: thread 139679617218304 is working on task 41
Info: thread 139679717930752 is working on task 57
Info: thread 139679726323456 is working on task 49
Info: thread 139679634003712 is working on task 50
Info: thread 139679650789120 is working on task 52
Info: thread 139679675967232 is working on task 59
Info: thread 139679667574528 is working on task 54
Info: thread 139679608825600 is working on task 46
Info: thread 139679734716160 is working on task 56
Info: thread 139679600432896 is working on task 48
Info: thread 139679642396416 is working on task 58
Info: thread 139679709538048 is working on task 42
Info: thread 139679701145344 is working on task 53
Info: thread 139679684359936 is working on task 60
Info: thread 139679625611008 is working on task 62
Info: thread 139679692752640 is working on task 61
Info: thread 139679583647488 is working on task 63
Info: thread 139679659181824 is working on task 64
Info: thread 139679575254784 is working on task 65
Info: thread 139679592040192 is working on task 66
Info: thread 139679617218304 is working on task 67
Info: thread 139679717930752 is working on task 68
Info: thread 139679726323456 is working on task 69
Info: thread 139679650789120 is working on task 71
Info: thread 139679634003712 is working on task 70
Info: thread 139679675967232 is working on task 72
Info: thread 139679667574528 is working on task 73
Info: thread 139679608825600 is working on task 74
Info: thread 139679734716160 is working on task 75
Info: thread 139679642396416 is working on task 77
Info: thread 139679709538048 is working on task 78
Info: thread 139679701145344 is working on task 79
Info: thread 139679600432896 is working on task 76
Info: thread 139679684359936 is working on task 80
Info: thread 139679625611008 is working on task 81
Info: thread 139679583647488 is working on task 83
Info: thread 139679692752640 is working on task 82
Info: thread 139679659181824 is working on task 84
Info: thread 139679575254784 is working on task 85
Info: thread 139679717930752 is working on task 88
Info: thread 139679617218304 is working on task 87
Info: thread 139679592040192 is working on task 86
Info: thread 139679634003712 is working on task 91
Info: thread 139679650789120 is working on task 90
Info: thread 139679667574528 is working on task 93
Info: thread 139679675967232 is working on task 92
Info: thread 139679608825600 is working on task 94
Info: thread 139679726323456 is working on task 89
Info: thread 139679734716160 is working on task 95
Info: thread 139679709538048 is working on task 96
Info: thread 139679642396416 is working on task 97
Info: thread 139679701145344 is working on task 98
Info: thread 139679600432896 is working on task 99</pre>

2. 使用方式

2.1 全局线程池 + 异步任务

创建一个ThreadPool的全局变量,将所有需要异步执行的任务丢到该线程池中即可:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n104" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include "threadpool.h"

// 全局异步线程池
ThreadPool g_threadpool2(20);

int main() {
// 执行异步任务
g_threadpool2.enqueue(
[] {
sleep(1);
printf("async task done\n");
});
return 0;
}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n106" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread./test
async task done</pre>

2.2 全局线程池 + 同步任务

创建一个ThreadPool的全局变量并添加同步任务,通过std::futurewait()方法阻塞等待同步结果,也可以使用get()方法获取到函数返回值。

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="c++" cid="n109" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include <memory>

include "threadpool.h"

// 全局异步线程池
ThreadPool g_threadpool2(20);

int main() {
// 创建同步任务
auto res = g_threadpool2.enqueue(
[] {
sleep(1);
printf("sync task done\n");
});

// 阻塞等待同步结果
res.wait();

return 0;

}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n111" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread./test
sync task done</pre>

2.3 局部线程池实现并发同步

创建一个临时ThreadPool,利用其析构函数完成并发同步任务:

需要注意的是,这种用法已经脱离了线程池的初衷(避免处理短时间任务时创建与销毁线程的代价),它的主要用途是实现「多线程并发」,常用于并发多个IO请求并等待同步结果。

考虑这个场景:代码中仅在某种特殊场景(极少触发)下需要并发请求多个http链接,一方面我们不希望这些请求影响到进程的业务线程池,另一方面我们又不想单独为这个场景创建一个全局线程池使其大部分时间都在空跑。

2.3这种用法解决了我们「临时创建线程+执行并行任务+销毁线程」的局部并发问题,避免我们直接在用户代码处直接创建线程。

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n118" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">#include <unistd.h>

include <memory>

include "threadpool.h"

int main() {
// 创建并发度为5的局部线程池
std::shared_ptr<ThreadPool> threadpool = std::make_shared<ThreadPool>(5);

// 创建30个异步任务
for (int i = 0; i < 30; i++) {
    threadpool->enqueue(
        [i] {
            sleep(1);
            printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), i);
        });
}

// 阻塞直至获取同步结果
threadpool.reset();

return 0;

}</pre>

编译运行:

<pre class="md-fences mock-cm md-end-block" spellcheck="false" lang="bash" cid="n120" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--font-monospace); font-size: 0.85rem; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248) !important; position: relative !important; width: inherit; border: 1px solid rgb(244, 244, 244); -webkit-font-smoothing: initial; line-height: 1.43rem; border-radius: 2px; overflow-wrap: normal; margin: 0.8rem 0px !important; padding: 0.3rem 0px !important; color: rgb(52, 73, 94); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;">g++ -g test.cpp -o test -std=c++11 -lpthread./test
Info: thread 139811129124608 is working on task 4
Info: thread 139811145910016 is working on task 2
Info: thread 139811137517312 is working on task 3
Info: thread 139811162695424 is working on task 0
Info: thread 139811154302720 is working on task 1
Info: thread 139811129124608 is working on task 5
Info: thread 139811137517312 is working on task 7
Info: thread 139811145910016 is working on task 6
Info: thread 139811162695424 is working on task 8
Info: thread 139811154302720 is working on task 9
Info: thread 139811129124608 is working on task 10
Info: thread 139811137517312 is working on task 11
Info: thread 139811162695424 is working on task 13
Info: thread 139811154302720 is working on task 14
Info: thread 139811145910016 is working on task 12
Info: thread 139811129124608 is working on task 15
Info: thread 139811137517312 is working on task 18
Info: thread 139811145910016 is working on task 19
Info: thread 139811162695424 is working on task 16
Info: thread 139811154302720 is working on task 17
Info: thread 139811129124608 is working on task 21
Info: thread 139811162695424 is working on task 23
Info: thread 139811154302720 is working on task 24
Info: thread 139811145910016 is working on task 22
Info: thread 139811137517312 is working on task 20
Info: thread 139811162695424 is working on task 25
Info: thread 139811154302720 is working on task 26
Info: thread 139811129124608 is working on task 27
Info: thread 139811137517312 is working on task 29
Info: thread 139811145910016 is working on task 28</pre>

Reference

[1] https://www.cnblogs.com/ailumiyana/p/10016965.html

[2] https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

[3] https://blog.csdn.net/qq_34771252/article/details/90319537

[4] https://github.com/lizhenghn123/zl_threadpool/tree/master/ThreadPoolCpp03

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容