libprocess原理&Actor模型

一.Actor模型

1.并发通信方式

一般来说,并发线程中通信方式有两种:

  • 共享数据:常见的基于锁的多线程编程;
  • 消息传递:常见的Actor模式

2.Actor介绍

Actor是通用的并发编程模型,并非某个语言或框架特有。典型的是erlang,从语言层面上支持Actor模型。

  • 面向对象=数据+行为
  • Actor模型=数据+行为+消息。


    WX20201124-225434@2x.png

    特性:

  • 无锁:每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则。从而巧妙避免了多线程写争夺。
  • 异步:Actor之间通信方式是异步的,这是Actor实现异步的基础。每个Actor都有一个MailBox用来接受消息,Actor按照接受消息的顺序,逐条处理。
    这样的设计主要优势就是解耦了Actor,数万个Actor并发的运行,每个actor都以自己的步调运行,且发送消息,接收消息都不会被阻塞。
  • 隔离:Actor模型内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。不同Actor处于物理隔离状态。
  • 位置透明:由于Actor模型基于消息传递方式,每个Actor实例的位置透明,无论Actor地址是在本地还是在远程机器上对于代码来说都是一样的。
  • 容错:传统编程是防御式编程,在可能出错的地方加异常处理。Actor模型遵循:任其崩溃哲学理念,让Actor的管理者去处理这些崩溃问题。每个Actor的崩溃或者异常都反馈到管理者处,由管理者决定处理方式。

3.Actor做什么

当一个actor接收到消息后,它能做如下三件事中的一件:

  • Create more actors; 创建其他actors
  • Send messages to other actors; 向其他actors发送消息
  • Designates what to do with the next message. 指定下一条消息到来的行为
    「指定下一条消息来到做什么」意味着可以定义下条消息来到时的状态。更清楚地说,就是actors如何修改状态。
    例子:想有一个actor像计算器,它的初始状态是数字0。当这个actor接收到add(1)消息时,它并不改变它原本的状态,而是指定当它接收到下一个消息时,状态会变为1。

二.Libprocess

每个Actor(Process)占用一个线程。不同Actor,可能处于同一个进程内,也可以处于不同进程。调用方式:

  • 线程内函数调用:A-->B 或者 A.dispacher(self())向自己写event方式。
  • 相同进程,不同Actor函数调用:A.disptcher(B.id, message),A向B发送一个event事件.
  • 不同进程,不同Actor函数调用:A.Send(B.id, message),A向B发送一个event事件。

1.Libprocess中概念介绍

Libprocess是基于Actor模型实现的库,提供基于actor style的并行编程框架。通过使用libprocess,异步并行变成变得相对简单。

  • process:与OS的进程process概念不同,libprocess中的process表示一个actor。具体来说:是一个class。

libprocess的基础是:event visitor和event list,其继承关系可表述为:


深度截图_选择区域_20201130162329.png

process的基类是event_visitor,其内部有个属性:event_list,process的工作实际是visit这个list中的events并执行。
但是process本身不会主动去visitor各个events,实际上各个Actor仍然运行在实体Thread上。在libprocess库中,全局静态变量类process_manager(声明在:process.h中,在process.cpp中定义)将负责启动线程并运行Actor.

运行流程总结如下:

  • 创建process对象;
    调用processBase()构造函数-->process::initialize()函数,访问全局变量process_manager。process_manager负责记录管理Actor信息。
//如果process::initialize()从未被调用过,则创建至少8个线程或者等于CPU数目的线程
long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLY))
for (int i = 0; i < cpus, i++) {
  pthread_t thread; //for now , not saving handles on our threads
  if (phtread_create(&thread, null, schedule, null) != 0) {
  LOG(FATAL) << "Failed to initialize, pthread_create";
}
}

声明process之后,实体线程运行的schedule函数取出运行队列中process并运行。

//调度器不断查看process_manager队列中是否有process,如果有,恢复并启动process.
void* schedule(void* arg)
{
  do{
      ProcessBase* process = process_manager->dequeue();
      if (process == NULL) {

      }
      process_manager->resume(process);
  }
}

所以process声明之后,还需要写入相应线程的运行队列中,才能够开始运行。
将process写入运行队列需要通过spawn(process)函数。

  • spawn(process):
    1)首先将调用process本身的initialize()函数,进行初始化。
    2)然后调用process_manager的enqueue()方法,将process写入某一个thread的运行队列中。
UPID ProcessManager::spawn(ProcessBase* process, bool manager)
{
  CHECK(process != NULL);
  synchronized (process) {
    //processes已经记录过该process
    if (processes.count(process->pid.id) > 0) {
      return UPID();
    } else {
      //如果是新的Actor,记录下来
      processes[process->pid.id] = process;
    }
  }

  if (manager) {
    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
  }

  UPID pid = process->self();
   //Actor入队
   enqueue(process);

  return pid;
}

至此,Actor创建成功,并放入process_manager的队列中,process_manager中有一个线程,不断激活各个Actor。Actor启动成功。

1.libprocess中多个process的单机并行通信:dispatcehr/delay

同一个进程内的不同Actor之间的并行通信,不需要获取返回值。

  1. dispatcher()
process::PID<SimpleProcess> pid = process::spwn(simpleProcess);
process::dispatcher(pid, &SimplerProcesss::doSomething, "test");

dispatcher()方法将一个event插入目标process的event队列中。event由目标process的成员函数和相应的变量组成,dispatcher成功后,对应process的event队列将有一个执行相关函数的event.

  1. delay()
delay(Seconds(5), self(), &Self::batch)

delay()是延迟的dispatcher(),dispatch和delay方法均可以通过self()函数对本线程写入新event。

2.libporcess中的异步并发:future/promise/defer

同一个进程内的不同Actor之间的并行通信,需要获取返回值。

  1. future
    用于异步回调的对象,一般由promise产生,可以在不同process中拷贝/引用,future中定义了各个回调函数接口:
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onRready(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
const Future<T>& onAny(AnyCallback&& callback) const;

当一个future对象状态发生变化时,相应的回调函数会被调用,从而达到异步并发的效果。

  1. promise
    future的入口,一般只在一个process中set(赋值)。promise不应该再多个process之间拷贝/引用。promise中只包含了future成员变量。
    一般通过promise.future的方式产生一个future。futrue可以在不同process中引用/拷贝,但只能在一个进程中通过promise设置。因为promise一般只存在一个process中。
//简单实例
class Simple : public process::Process<SimpleProcess>
{
public:
  Future<Nothing> doSomething(const string msg) {
    cout << "Wrapping message" << msg << endl;
    return Nothing();
  }

  Future<int> calc(int lhs, int rhs) {
    return Promise<int>(lhs+rhs).future();
  }
private:
  Promise<bool> shouldQuit;  
}

int runProcess()
{
  SimpleProcess simpleProcess;
  process::PID<SimpleProcess> pid = process::spawn(simpleProcess);

  process::dispatcher(pid, &SimpleProcess::doSomething, "test");
  
  Future<int> sum = process::dispatch(pid, &SimpleProcess::calc, 99, 101);
  sum.then([](int n) {
    cout << "99 + 101 = " << n << endl;
    return Nothing();
  })

  sum.await();
  process::terminate(SimpleProcess);
  process::wait(simpleProcess);
}
  1. defer
    如果使用如下语句:
dispatch(master_, &Master::indicateInverseOffer, request).onAny(dispatch(self(), &Self::updateRequestQueue, lambda::_1))

这一命令向master process dispach一个event,调用master类中的indicateInverseOffer函数。该函数返回一个future。我们希望future状态变化时,往本线程dispatcher一个event。
但是,onAny中的dispatch函数是由master process赋值的future调用,从而是由master process所属线程发起的dispatcher调用。不合目的。

dispatch(master_, &Master::indicateInverseOffer, request).onAny(defer(self(), &Self::updateRequestQueue, lambad::_1))

简单来说,defer()返回一个called object, 这个called object是在相应线程被初始化的。
那么当indicateInverseOffer()所返回的future状态发生变化时,onAny调用的是defer()函数的返回值,这个返回值是再代码所属的process产生的,这保证了程序的意图。

3.Libprocess分布式系统编程:基于protobuf消息的方法

不同进程间的不同Actor之间的并行通信。
libprocess通过提供send()和install()函数进行分布式通信。虽然libprocess分布式通信不依赖于protobuf, 但是为了简化消息结构序列化问题,libprocess提供protobufProcess类,这一定制的process支持基于protobuf的消息通信。

  1. install()
    Install方法将handler注册到process类所属的handler map中,一旦相应message到来。即启用handler map对应的handler(函数),以下定义作为install方法的实现路径之一:
template <typename M>
void install(void (T::*method)(const process::UPID))
{
  google::protobuf::Message* m = new M();
  T* t = static_cast<T*>(this);
  protobufHandlers[m->GetTypeName()] = lambda::bind(handler0, t, method,   
  lambda::_1, lambda::_2);
  delete m;
}

install 接受protobuf message中的field作为独立的输入,如:
install<SlaveRegisteredMessage>
(&SimpleMasterProcess::SlaveRegistered,&SlaveRegisteredMessage::slave_id)

当SlaveRegisteredMessage信息到来之后,process将把slaveRegistered()函数作为一个event写入队列,并将message中的slave_id field作为输入量。
特别注意的是,protobufProcess中,protobuf消息中的repeat fileds将会被自动转换成vector.

template<typename T>
std::vector<T> convert(const google::protobuf::RepeatedPtrFiled<T>& items)
{
  vector<T> result;
  for(int i = 0; i < items.size(); i++) {
    result.push_back(items.Get(i));
  }
  
  return result;
}
  1. UPID与通信
    非分布式通信,即同一进程内,不同Actor区分通过PID。
    在基于libprocess的分布式系统中,每个process均有一个唯一的UPID,process之间分布式通信通过UPID识别地址,UPID的主要信息如下:
std::string id;
//一般是ip:port
network::Addresss address;

每个UPID实现了libprocess的地址空间中唯一地址,其中id默认是从1开始,spawn一个新的process,则增加到(2),如此类推。

  1. send()
    使用send()换气远程process的回调函数:通过UPID,向远程process发送一个protobuf消息。远程process接收到消息后,将取出消息对应的handler,将它放入任务队列。
    同时注意,send()函数总会发送起始process的UPID,所以protobufProcess的install函数所有的handler的第一个输入永远是:
//源UPID
const process::UPID&

这样,可以知道每个消息来源的UPID.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容