Nipype学习笔记(3)——Nipype中的并行

7. 并行计算

7.1 Iterables

长期进行神经影像学数据处理的朋友应该都知道,对一批被试进行同样的数据处理是一种很常见的方式,例如要对所有的被试执行:

  • Dicom2Nifti
  • Remove first images
  • Slice Timing
  • Realign
  • Normalize
  • ...

对于这样的基本需求,Nipype当然也帮我们准备好了一些并行处理的手段,那就是Iterables。此外,如果你还想尝试一下不同的参数对于数据处理的结果有什么影响,那么使用Gretna这样的软件通常会需要执行几遍(麻烦的是不是运行,而是需要重新设定参数,如果没有处理好,运行结果还可能彼此覆盖),这时候如果你能想到用Nipype中的Iterables,那就太好了。废话少说,先看东西:

Example 1 试验不同参数

from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, IsotropicSmooth

# Initiate a skull stripping Node with BET
skullstrip = Node(
      BET(mask=True, in_file='/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz'),
      name="skullstrip")
isosmooth = Node(IsotropicSmooth(), name='iso_smooth')
isosmooth.iterables = ("fwhm", [4, 8, 16])
wf = Workflow(name="smoothflow")
wf.base_dir = "/output"
wf.connect(skullstrip, 'out_file', isosmooth, 'in_file')

# Run it in parallel (one core for each smoothing kernel)
wf.run('MultiProc', plugin_args={'n_procs': 3})

在这个例子中,整个Workflow由两个节点组成,第一个是剥头骨的节点,第二个是进行平滑化的节点,为了测试不同的平滑化参数,因此设置了一个iterables,指定了参数fwhm的不同值,分别是[4, 8, 16]。Workflow在运行时指定使用了MultiProc插件(官方文档中称之为plugin,不知道为什么要这么称呼),插件的参数为{'n_procs': 3},也就是最多可以运行三个进程(如果不指定的话好像是根据系统情况自行选择,但是没说自动选择是个啥概念)。

结果

运行之后的结果可以从上图中看出来,对应于不同的参数,会生成三个不同的文件夹。

Example 2 用同一套计算流程计算所有被试

subject_list = ['01', '02', '03', '04', '05']
from nipype import IdentityInterface
infosource = Node(IdentityInterface(fields=['subject_id']),
                  name="infosource")
infosource.iterables = [('subject_id', subject_list)]

from os.path import join as opj
from nipype.interfaces.io import SelectFiles, DataSink

anat_file = opj('sub-{subject_id}', 'ses-test', 'anat', 'sub-{subject_id}_ses-test_T1w.nii.gz')

templates = {'anat': anat_file}

selectfiles = Node(SelectFiles(templates,
                               base_directory='/data/ds000114'),
                   name="selectfiles")

# Datasink - creates output folder for important outputs
datasink = Node(DataSink(base_directory="/output",
                         container="datasink"),
                name="datasink")

wf_sub = Workflow(name="choosing_subjects")
wf_sub.connect(infosource, "subject_id", selectfiles, "subject_id")
wf_sub.connect(selectfiles, "anat", datasink, "anat_files")
wf_sub.run()

在这个例子中,是用的是IdentityInterface的iterables来处理不同的被试。这是最基本的套路,只要你想用Nipype来运行多个被试,就必然采用这种框架。

7.2 MapNode

在平时的计算中还有一种需求,就是当我们使用Iterable的时候,会生成多个输出,这个时候如果要将这些输出合并成一个列表并作为一个节点的输入(如图所示),那么该怎么办呢?这就需要我们的MapNode出场了。

还是通过一个简单的小例子来看一下:

from nipype import Function
def square_func(x):
    return x ** 2
square = Function(["x"], ["f_x"], square_func)
square.run(x=2).outputs.f_x

以上是一个计算平方的函数,我们用Function将其封装起来,这样当x=2时,square.run(x=2).outputs.f_x的输出结果就是4了。此时,如果我们用MapNode来封装Function,就可以得到一个带有迭代功能的Node了,代码如下:

from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])
square_node.inputs.x = [0, 1, 2, 3]
res = square_node.run()

需要注意的是,我们在构造MapNode对象的时候,指定了一个iterfiled参数,该参数表明哪些参数是需要迭代的。例如,在该示例中,iterfield被指向为x,因此可以在设置input.x的时候将其设置为[0,1,2,3],这样一来,workflow就会在运行的时候依次将0,1,2,3代入其中并运行。

运行之后查看结果:

res.outputs.f_x

会看到结果为[0, 1, 4, 9]。如果你观察够仔细的话,会发现iterfield是一个列表,因此我们可以预见的是,其中可以迭代多个变量。

在下面这个例子中,不妨引入两个变量来看看结果:

def power_func(x, y):
    return x ** y
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = [0, 1, 2, 3]
res = power_node.run()
print(res.outputs.f_xy)

在上面这个例子中,我们将iterfield参数设置为["x", "y"],而xy的变化都是从0到4,这样一来运行之后的结果是[1, 1, 4, 27]

目前,在我接触到的处理中还不涉及这一用途,所以用的还比较少。

7.3 JoinNode

就如官方文档中所说,JoinNode的作用和Iterable完全相反,Iterable会产生多个并行的分支,但是JoinNode则会将多个分支进行合并(如下图所示)

JoinNode

其中,D就是一个JoinNode,按照我目前的理解,Iterables+JoinNode = MapNode,当然,使用Iterables+JoinNode的方式要比MapNode的方式要灵活的多,因为MapNode刚刚分手就和好了嘛,而Iterables+JoinNode却可以像以色列一样,分手千年,仍能复国。

下面的代码是上面这张图的伪代码,在编写程序的时候可以参考一下:

from nipype import Node, JoinNode, Workflow

# Specify fake input node A
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file?
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')])
                  (c, d, [('out_file', 'in_files')])
                  ])

7.3 并行中的一些参数设置

7.3.1 synchronize

synchronize的本意是同步,在Nipype中表示对于两个迭代变量,可以并行执行,这句话比较拗口,直接看图最合适不过了(都是官方的图和代码)。

b.iterables = [("m", [1, 2]), ("n", [3, 4])]
image.png

假设节点b有两个迭代变量mn,其迭代值分别是[1, 2][3, 4],如果我们不设置synchronize变量,那么正常情况下,会生成四条分支,也就是m和n值的组合:[1,3][1,4][2, 3][2,4]
一旦我们设置了synchronize变量:

b.iterables = [("m", [1, 2]), ("n", [3, 4])]
b.synchronize = True

就会生成如下的分支图:


image.png

如图所示,设置synchronize后,就会生成mn两两配对的分支。

7.3.2 itersource

itersource其实可以看成是synchronize的高级版本,能够实现更加复杂的分支控制。关于其功能,官方文档只给出了一句话的描述,对于这句话该如何翻译,我还拿捏的不是很准确,暂且先放在这里"The itersource feature allows you to expand a downstream iterable based on a mapping of an upstream iterable.",编程嘛,还是通过实际例子来看看会比较好。

a = Node(interface=A(), name="a")
b = Node(interface=B(), name="b")
b.iterables = ("m", [1, 2])
c = Node(interface=C(), name="c")
d = Node(interface=D(), name="d")
d.itersource = ("b", "m")
d.iterables = [("n", {1:[3,4], 2:[5,6]})]
my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a,b,[('out_file','in_file')]),
                     (b,c,[('out_file','in_file')])
                     (c,d,[('out_file','in_file')])
                     ])

上面代码对应的分支图如下图所示:

image.png

关键点在于节点d的设置,其先设置了itersourcebm,也就是说d节点的迭代是依赖于b节点的m值,之后又执行了代码:

d.iterables = [("n", {1:[3,4], 2:[5,6]})]

也就是说变化的是d节点的n变量,当m=1时,n值为13m=2时,n值为56

未完待续

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,527评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,314评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,535评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,006评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,961评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,220评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,664评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,351评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,481评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,397评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,443评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,123评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,713评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,801评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,010评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,494评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,075评论 2 341

推荐阅读更多精彩内容

  • PYTHON-进阶-ITERTOOLS模块小结转自wklken:http://wklken.me/posts/20...
    C_Y_阅读 949评论 0 2
  • 内置函数Python解释器内置了许多功能和类型,总是可用的。他们是按字母顺序列在这里。 abs(x)返回一个数的绝...
    uangianlap阅读 1,212评论 0 0
  • pyton review 学习指南 https://www.zhihu.com/question/29138020...
    孙小二wuk阅读 1,039评论 0 2
  • python学习笔记 声明:学习笔记主要是根据廖雪峰官方网站python学习学习的,另外根据自己平时的积累进行修正...
    renyangfar阅读 3,013评论 0 10
  • 每天适应着早起床,慢慢的向着考研的生活状态所靠拢,我同舍友决定每天早上六点多起床去后山散步,别人笑我们傻,傻就傻...
    人自为王阅读 119评论 0 0