基于ucontext.h的轻量级协程库

本文主要是对自己学习协程并实现轻量级协程过程的一个记录, 语言略显啰嗦, 各位见谅. 水平有限, 如有疏漏, 欢迎各位指正.

一 了解协程

  • 协程可以理解为一种用户态的轻量级线程, 切换由用户定义
  • 协程上下文切换很快, 因为不会陷入内核态
  • 协程拥有自己的寄存器上下文和栈, 协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈

优点

  • 协程具有极高的执行效率 因为子程序切换不是线程切换,是由程序自身控制,因此协程没有线程切换的开销, 多线程的线程数量越多,协程的性能优势就越明显
  • 访问共享资源不需要多线程的锁机制, 因为只有一个线程, 也不存在同时写变量冲突, 所以在协程中控制共享资源无需加锁, 只需要判断状态就好了,执行效率比多线程高很多, 而且代码编写难度也可以相应降低
  • 以同步代码的方式写异步逻辑

缺点

  • 无法利用多核资源, 除非和多进程配合


二 了解ucontext

  • ucontext组件是什么
    • 头文件<ucontext.h>定义了两个数据结构, mcontext_t(暂时用不到)和ucontext_t和四个函数, 可以被用来实现一个进程中的用户级线程(协程)切换

数据结构

  • ucontext_t 结构体至少拥有如下几个域
    typedef struct ucontext {
        struct ucontext *uc_link;
        sigset_t uc_sigmask;
        stack_t uc_stack;
        mcontext_t uc_mcontext;
        ...
    } ucontext_t;
    
    • uc_link指向后继上下文, 当前上下文运行终止时系统会恢复指向的上下文
    • uc_sigmask为该上下文中的阻塞信号集合
    • uc_stack为该上下文中使用的栈
    • uc_mcontex保存上下文的特定机器, 包括调用线程的特定寄存器等等
    • 简而言之这个数据结构是用来保存上下文的

函数

  1. int getcontext(ucontext_t * ucp);

    • 获取当前上下文, 初始化ucp结构体, 将当前上下文保存到ucp中
    • 成功返回0; 失败返回-1, 并设置errno
  2. void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);

    • 创建一个目标上下文 创建方式: (1) getcontext, (2) 指定分配给上下文的栈uc_stack.ss_sp, (3) 指定这块栈的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后继上下文uc_link
    • 协程运行时使用主协程划分的栈空间,而协程切回主线程时需要将该部分栈空间的内容copy到每个协程各自的一个空间缓存起来,因为主协程中划分的栈空间并不是只用于一个协程,而是会用于多个协程
    • makecontext可以修改通过getcontext初始化得到的上下文, (必须先调用getcontext), 然后为ucp指定一个栈空间ucp->stack, 设置后继的上下文ucp->uc_link
    • 当上下文通过setcontext或者swapcontext激活后, 执行func函数(argc为后续的参数个数, 可变参数). 当func执行返回后, 继承的上下文被激活(ucp->uc_link), 如果为NULL, 则线程退出
    ucontext_t tar_ctx;
    ucontext_t next_ctx;
    char stack[100];
    getcontext(&tar_ctx);
    tar_ctx.uc_stack.ss_sp = stack;
    tar_ctx.uc_stack.ss_sp = sizeof(stack);
    tar_ctx.uc_stack.ss_flags = 0;
    tar_ctx.uc_link = &next_ctx;
    
  3. int setcontext(const ucontext_t *ucp)

    • 设置当前的上下文为ucp(激活ucp)
    • ucp来自getcontext, 那么上下文恢复至ucp
    • ucp来自makecontext, 那么将会调用makecontext函数的第二个参数指向的函数func, 如果func返回, 则恢复至ucp->uc_link指定的后继上下文, 如果该ucp中的uc_link为NULL, 那么线程退出
    • 成功不返回, 失败返回-1, 设置errno
  4. int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

    • 切换上下文
    • 保存当前上下文至oucp, 激活ucp上下文(先执行makecontext指定的ucp入口函数, 而后会执行ucp->uc_link指向的后继上下文)
    • 成功不返回, 失败返回-1, 设置errno


三 轻量级协程实现

名词说明

  • 协程调度器 代码中的SingleSchedule
  • 用户协程 代码中的Coroutine
  • 栈空间/栈区 对应SingleSchedule中的成员stack
  • 栈缓存/缓存区 对应的是Coroutine中的成员Buffer
  • 主协程上下文 SingleSchedule中的main_ctx, 对应main函数中的上下文
  • 用户协程上下文 Coroutine中的ctx, 对应每个用户协程自身的上下文

思路

  • 本文基于ucontext.h实现协程库
  • 基本思路:
    • 构造一个协程调度类, 该类用于调度所有的用户协程, 提供一个协程池ctxPool, 使用单例模式实现.
    • 构造一个用户协程类, 该类对象对应每个用户协程, 提供一个用户协程逻辑虚函数CoProcess, 提供一个用户协程上下文ctx
    • 用户协程首次激活, 将会为其分配协程调度器提供的栈区stack
    • 用户协程被挂起, 那么会将该协程的栈信息栈区stack保存到其自身的缓存区buffer;
    • 用户协程被唤醒, 那么会将该用户协程的栈信息缓存区buffer中Reload至栈区
  • 协程库框架
    • 激活 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池
    • 挂起 将栈空间stack对应的数据缓存至当前用户协程的栈缓存buffer中, 更改协程状态SUSPEND
    • 恢复 将用户协程栈缓存buffer中的数据reload进栈空间stack
      示意图

1 用户协程类 Coroutine

数据成员

  • 协程状态CoState state(FREE, RUNNING, SUSPEND)
  • 协程号int id(对应协程调度类中的协程池的id)
  • 栈缓存char * Buffer, 一个动态数组, 当协程被切出时, 缓存栈空间
  • 所需缓存区尺寸int stack_size
  • 用户协程栈容量尺寸int cap cap如果小于stack_size, 那么需要重新分配缓存区, 否则可以直接缓存
  • 用户协程上下文ucontext_t ctx

主要成员函数

  • 挂起协程函数void Yield();
    • 挂起当前协程, 并SaveStack栈空间, 切换状态至SUSPEND
  • 恢复协程函数void Resume()
    • 恢复该协程, 并ReloadStack栈空间
  • 缓存堆栈函数void SaveStack();
    • 调用时机是协程被切出, 会将协程调度对象中的堆栈缓存入用户协程自身的缓存区
  • 载入堆栈函数void ReloadStack();
    • 调用时机是协程被恢复时, 会将该用户协程的堆栈信息从缓存区回复到协程调度对象的堆栈中
  • 用户协程逻辑虚函数virtual void CoProcess();
    • 用于派生成员中定义业务逻辑

2 协程调度类

  • 单例

数据成员

  • 协程池std::map<int, Coroutine*> crtPool;
  • 主协程上下文ucontext_t main_ctx
  • 协程堆栈char stack[DEFAULT_STACK_SIZE], 所有的协程都利用这块区域

成员函数

  • 协程启动函数 void CoroutineNew(Coroutine * crt);
    • 初始化用户协程(指定协程状态RUNNING), 初始化用户协程上下文(指定协程栈空间stack, 指定后继上下文), 将协程加入协程池
  • 用户协程入口函数static void CoroutineEntry(void * crt);
    • 指向用户协程的入口函数
  • 协程恢复函数void Resume(int id);
    • 根据id恢复对应协程
  • 检查并清理协程池int HasCoroutine();
    • 清理FREE的协程, 并返回剩余的协程数量
  • 协程删除函数void Remove(int id);
    • 删除对应协程

注意点

  1. 所有的用户协程都使用调度器的栈空间, 每个用户协程自身的buffer只不过用来作缓存
  2. SaveStack和ReloadStack函数的实现需要注意, 如何缓存协程栈
    • 协程栈是由用户分配的, 如代码中stack数组, 由于该数组的目的是用作栈空间, 而进程中栈是预分配的, 即首先确定栈的高地址, 从高地址开始往低使用, 根据这一点我们可以确定需要被缓存的栈空间大小.
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    // 获取到栈底, 即高地址
    char dumy = 0;                                                       // 最后创建的变量, 必然分配在栈顶
    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);                     // 被栈缓存不能大于栈空间
    if (cap<stackBottom-&dumy){                                          // cap 代表当前栈缓存大小, 如果不够需要重分配
        if(buffer){                                                      // 释放当前栈缓存
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;                                      // 栈空间大小
    memcpy(buffer, &dumy, stack_size);                                   // 缓存
    

代码实现

https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine

1 用户协程

/**
 * @file    : Coroutine.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 用户协程类
 */

#ifndef COROUTINE_H_
#define COROUTINE_H_
#define DEFAULT_STACK_SIZE (1024*1024)
#include <stdio.h>
#include <string.h>
#include <ucontext.h>


enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};

class Coroutine
{
public:
    Coroutine();
    virtual ~Coroutine();

    /**
     * 用户协程入口函数
     */
    virtual void CoProcess();
    
    /**
     * 用户协程恢复函数
     */
    void Resume();

    /**
     * 获取协程id
     * @return [返回id]
     */
    int GetId()const {
        return id;
    }

    /**
     * 设置协程id
     */
    void SetId(int _id) {
        id = _id;
    }

    /**
     * 获取协程状态
     * @return [返回协程状态]
     */
    int GetState()const {
        return state;
    }

    /**
     * 设置协程状态
     */
    void SetState(CoState _state) {
        state = _state;
    }

protected:
    /**
     * 用户协程挂起函数
     */
    void Yield();

    /**
     * 堆栈缓存
     */
    void SaveStack();

    /**
     * 堆栈恢复
     */
    void ReloadStack();
    
public:
    char *buffer;       // 缓存协程堆栈
    ucontext_t ctx;

private:
    int stack_size;
    int cap;
    int id;
    CoState state;
};

#endif
#include <assert.h>
#include "Coroutine.h"
#include "Schedule.h"

Coroutine::Coroutine()
        :id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
{

}

Coroutine::~Coroutine()
{
    delete [] buffer;
}

void Coroutine::CoProcess()
{

}

void Coroutine::Resume()
{
    if(state==SUSPEND){
        ReloadStack();
        state = RUNNING;
        swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
    }
}

void Coroutine::Yield()
{
    if (state == RUNNING){
        SaveStack();
        state = SUSPEND;
        swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
    }
}

void Coroutine::SaveStack()
{
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();
    char dumy = 0;

    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);
    if (cap<stackBottom-&dumy){
        if(buffer){
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;
    memcpy(buffer, &dumy, stack_size);
}

void Coroutine::ReloadStack()
{
    memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
}

2 单例模板

/**
 * @file    : Singleton.h
 * @author  : neilzwshen
 * @time    : 2018-7-30
 * @version : 1.0
 * @remark  : 单例模板, 只要将对象作为T, 就可以获取到一个单例对象, 构造函数不能传参
 */

#ifndef SINGLETON_H_
#define SINGLETON_H_

template<class T>
class Singleton {
public:
    /**
     * 单例获取
     * @return [返回T的单例对象]
     */
    static T* GetInst(){
        if (!flag_instance){
            flag_instance = new Singleton();
        }
        return &flag_instance->_instance;
    }

protected:
    /**
     * 单例构造
     */
    Singleton(){}

private:
    /**
     * T对象实例
     */
    T _instance;
    /**
     * 单例模板实例,
     */
    static Singleton<T> * flag_instance;
};

template<class T>
Singleton<T> * Singleton<T>::flag_instance = 0;

#endif

3 协程调度器

/**
 * @file    : Schedule.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 协程调度类
 */

#ifndef SCHEDULE_H_
#define SCHEDULE_H_
#include <stdio.h>
#include <map>
#include <ucontext.h>
#include "Coroutine.h"
#include "Singleton.h"


typedef std::map<int, Coroutine*> CrtMap;

class Schedule
{
public:
    Schedule();
    virtual ~Schedule();

    /**
     * 用户协程入口函数
     */
    static void CoroutineEntry(void * crt);

    /**
     * 将协程crt加入协程池, 并开启
     * @param  crt [协程指针]
     */
    void CoroutineNew(Coroutine * crt);

    /**
     * 恢复用户协程
     * @param id [description]
     */
    void Resume(int id);

    /**
     * 判断协程池中是否还有未完成的协程, 并将已经终止的协程删除
     * @return [返回协程数]
     */
    int HasCoroutine();

    /**
     * 根据协程id删除协程
     * @param id [协程id]
     */
    void Remove(int id);

    /**
     * 获取到栈底
     * @return [返回栈底地址]
     */
    char* GetStackBottom(){
        return stack + DEFAULT_STACK_SIZE;
    }

public:
    ucontext_t mainCtx;
    char stack[DEFAULT_STACK_SIZE];     // 运行协程堆栈

private:
    CrtMap crtPool;
};

typedef Singleton<Schedule> SingleSchedule;

#endif

#include <assert.h>
#include "Schedule.h"

Schedule::Schedule()
{

}

Schedule::~Schedule()
{

}

void Schedule::CoroutineEntry(void * crt) {
    ((Coroutine *)crt)->SetState(RUNNING);
    ((Coroutine *)crt)->CoProcess();
    ((Coroutine *)crt)->SetState(FREE);
}

void Schedule::CoroutineNew(Coroutine * crt) {
    
    int id = crt->GetId();
    CoState state = CoState(crt->GetState());
    assert(id != 0);
    assert(state == FREE);
    //printf("--%d,%d--\n",id, state);

    if (crtPool[id] != nullptr) {
        CrtMap::iterator it = crtPool.find(id);
        crtPool.erase(it);
    }
    
    // 构建用户协程上下文
    getcontext(&(crt->ctx));
    //memset(stack, 0, DEFAULT_STACK_SIZE);
    crt->ctx.uc_stack.ss_sp = stack;
    crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
    crt->ctx.uc_stack.ss_flags = 0;
    crt->ctx.uc_link = &mainCtx;
    crtPool[id] = crt;
    
    makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
    swapcontext(&mainCtx, &crt->ctx);
}

void Schedule::Resume(int id){
    if (crtPool[id] != nullptr) {
        crtPool[id]->Resume();
    }
}

int Schedule::HasCoroutine() {
    int count = 0;
    CrtMap::iterator it;
    for (it = crtPool.begin(); it != crtPool.end(); it++) {
        if (it->second->GetState() != FREE) {
            count++;
        }else{
            it=crtPool.erase(it);
            it--;
        }
    }
    return count;
}

void Schedule::Remove(int id) {
    if (crtPool[id] != nullptr) {
        crtPool.erase(crtPool.find(id));
    }
}

4 示例

#include <stdio.h>
#include <memory>
#include "Coroutine.h"
#include "Schedule.h"


class Logic1 : public Coroutine{
    void CoProcess(){
        puts("1");
        Yield();
        puts("4");
        Yield();
        puts("7");
    }
};

class Logic2 : public Coroutine{
    void CoProcess(){
        puts("2");
        Yield();
        puts("5");
        Yield();
        puts("8");
    }
};

class Logic3 : public Coroutine{
    void CoProcess(){
        puts("3");
        Yield();
        puts("6");
        Yield();
        puts("9");
    }
};

int main() {

    std::shared_ptr<Coroutine> ct1(new Logic1());
    std::shared_ptr<Coroutine> ct2(new Logic2());
    std::shared_ptr<Coroutine> ct3(new Logic3());

    ct1->SetId(1);
    ct2->SetId(2);
    ct3->SetId(3);

    SingleSchedule::GetInst()->CoroutineNew(ct1.get());
    SingleSchedule::GetInst()->CoroutineNew(ct2.get());
    SingleSchedule::GetInst()->CoroutineNew(ct3.get());

    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);
    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);


    //SingleSchedule::GetInst()->Remove(1);
    //SingleSchedule::GetInst()->Remove(2);
    //SingleSchedule::GetInst()->Remove(3);

    int count = SingleSchedule::GetInst()->HasCoroutine();
    printf("%d\n", count);

    return 0;
}

5 执行结果

szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main 
1
2
3
4
5
6
7
8
9
0
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容