(Caffe)基本类DataReader、QueuePair、Body(四)

本文从CSDN上转移过来:
http://blog.csdn.net/mounty_fsc/article/details/51088361

1 简介

QueuePair与BodyDataReader的内部类。一个DataReader对应一个任务,一个Body生成一个线程来读取数据库(如examples/mnist/mnist_train_lmdb)。QueuePair为前面两者之间的衔接、通信。

2 源代码

/**
 * @brief Reads data from a source to queues available to data layers.
 * A single reading thread is created per source, even if multiple solvers
 * are running in parallel, e.g. for multi-GPU training. This makes sure
 * databases are read sequentially, and that each solver accesses a different
 * subset of the database. Data is distributed to solvers in a round-robin
 * way to keep parallel training deterministic.
 */
class DataReader {
 public:
...
 protected:
  // Queue pairs are shared between a body and its readers
  class QueuePair {
   public:
    explicit QueuePair(int size);
    ~QueuePair();

    BlockingQueue<Datum*> free_;
    BlockingQueue<Datum*> full_;
  };

  // A single body is created per source
  class Body : public InternalThread {
   public:
...
   protected:
    void InternalThreadEntry();
    void read_one(db::Cursor* cursor, QueuePair* qp);

    const LayerParameter param_;
    BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
...
  };
...

  const shared_ptr<QueuePair> queue_pair_;
  shared_ptr<Body> body_;
  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
};

2 类QueuePair

DataReader::QueuePair::QueuePair(int size) {
  // Initialize the free queue with requested number of datums
  for (int i = 0; i < size; ++i) {
    free_.push(new Datum());
  }
}

说明:

  1. 一个QueuePair对应一个任务队列,从数据库(如examples/mnist/mnist_train_lmdb)中读取size个样本
  2. BlockingQueue为一个线程安全的队列容器,其模板类型可能是DatumBatch等。此处装的是Datum
  3. BlockingQueue<Datum*> free_为Datum队列,均为新new出来的,没有包含原始数据(图像)信息
  4. BlockingQueue<Datum*> full_为从数据库读取信息后的队列,包含了原始数据(图像)信息
  5. Datum为一个样本单元,关于Datum的定义,参见caffe.proto文件,一般来说,Datum对应于一张图像(及其label

3 类Body

DataReader::Body::Body(const LayerParameter& param)
    : param_(param),
      new_queue_pairs_() {
  StartInternalThread();
}

说明:

  1. Body类继承了InternalThread(详见博文)。在构造函数了开启这个线程
  2. Body类重载了 DataReader::Body::InternalThreadEntry()函数,从数据库读取数据的操作在该函数中实现,见本文第5节

4 类DataReader

DataReader类的构造函数如下:

map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
static boost::mutex bodies_mutex_;

DataReader::DataReader(const LayerParameter& param)
    : queue_pair_(new QueuePair(  //
        param.data_param().prefetch() * param.data_param().batch_size())) {
  // Get or create a body
  boost::mutex::scoped_lock lock(bodies_mutex_);
  string key = source_key(param);
  weak_ptr<Body>& weak = bodies_[key];
  body_ = weak.lock();
  if (!body_) {
    body_.reset(new Body(param));
    bodies_[key] = weak_ptr<Body>(body_);
  }
  body_->new_queue_pairs_.push(queue_pair_);
}

说明:

  1. 一个数据库只可能有Body对象,如examples/mnist/mnist_train_lmdb不管在任何线程的任何DataReader对象中,都只会有一个Body对象,因为bodies_是静态的
  2. 所以有,一个Body的对象也可以有多个DataReader对象
  3. 此外有,一个DataReader对象可以有多个Body对象,即map<string,weak_ptr<Body>> bodies_
  4. 由代码5,6行及16行可知,每一个DataReader对应一个读的任务,即从数据库(如examples/mnist/mnist_train_lmdb)中读取param.data_param().prefetch() * param.data_param().batch_size()(LeNet5中默认为4×64)个样本
  5. 由此可见,一个DataReader为一个任务,通过QueuePair(也对应于该任务)“通知”Body某个数据库中读去N个样本
  6. 由代码13行可知,某个数据库(如examples/mnist/mnist_train_lmdb)对应的Body若不存在,将新建一个Body来处理该数据库,也可以理解成新建一个唯一对应于该数据库的线程来处理该数据可。

5 函数DataReader::Body::InternalThreadEntry

void DataReader::Body::InternalThreadEntry() {
...
  vector<shared_ptr<QueuePair> > qps;
  try {
...
    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so read one item, then wait for the next solver.
    for (int i = 0; i < solver_count; ++i) {
      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
      read_one(cursor.get(), qp.get());
      qps.push_back(qp);
    }
    // Main loop
    while (!must_stop()) {
      for (int i = 0; i < solver_count; ++i) {
        read_one(cursor.get(), qps[i].get());
      }
...
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}

说明:

  1. read_one()QueuePair的free_中取出<font color="red">一个</font>Datum,从数据库读入数据至Datum,然后放入full_
  2. 由第4节16行可知,一个新的任务(DataReader)到来时,将把一个命令队列(QueuePair)放入到某个数据库(Body)的缓冲命令队列中(new_queue_pairs_
  3. 9到13行从每个solver的任务中读取一个Datum,在15到18行从数据库中循环读出数据
  4. <u>该线程何时停止呢?</u>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容