我们以数据流向为主线索,讲讲论文代码做了些什么事情。
跑算法就是先收集数据,然后把它feed到构建好的模型中去训练。这个代码还多了一步planning。planning完收到新的数据,于是又开始新的一轮训练,循环下去。
那么问题来了:
首先数据从哪里得到?
def start(logdir, args):
1.1 #首先config是多层dist(dist嵌套dist嵌套dist...,总之装各种东西的),所有的参数和函数都存放到config里面。
config = tools.AttrDict()
config = getattr(configs, args.config)(config, args.params)
#即这个函数:
def default(config, params):
config.zero_step_losses = tools.AttrDict(_unlocked=True)
config = _data_processing(config, params) # data config
config = _model_components(config, params) # model config
config = _tasks(config, params) # task config
config = _loss_functions(config, params) # loss config
config = _training_schedule(config, params) # training config
return config
1.2 #开始去获取数据,
training.utility.collect_initial_episodes(config)
def random_episodes(env_ctor, num_episodes, output_dir=None):
#层层wrap,每层上都处理一点操作,直达最后的核心。
# 其实这一系列封装的env是在子进程中进行的,是由子进程产生真正的环境互动。
env = env_ctor()
#原进程发出命令,子进程产生了互动,原进程收到后再将episode写入outdir指定的位置。
env = wrappers.CollectGymDataset(env, output_dir)
#起了一个子进程跑env sever,与原进程的通过pipe通信:
env = control.wrappers.ExternalProcess(env_ctor)
#当下面函数运行时,实际是去call-> pipe.send
obs = env.reset()
#其实reset,step,close 都是去call,然后block到receive返回值,这个返回值就是具体observation
2。通过与子进程的环境互动得到的episode保存到哪里?
当层层封装的env调用函数时,env.step会递归地深入最里层,然后执行最后一行self._process_step。这一行将episode写入到outdir。
def step(self, action, *args, **kwargs):
if kwargs.get('blocking', True):
transition = self._env.step(action, *args, **kwargs)
return self._process_step(action, *transition)
3.从这个函数开始将数据从硬盘读到内存来使用:
def numpy_episodes:
1 #三个参数:1 生成数据的函数 2 数据类型 3 tensor shape
train = tf.data.Dataset.from_generator(
functools.partial(loader, train_dir, shape[0], **kwargs), dtypes, shapes):
loader实际是这个函数,即:将硬盘中的npz文件读入
def _read_episodes_reload
将读入的数据chunking(就是将读入的数据x,切成固定的长度chunk_length,这样数据就是以chunk为单位了)
train = train.flat_map(chunking)
好,数据准备好了,就是构造网络,计算出loss,再optimization就好了。
整个loss函数就是两部分,construction部分和KL部分,KL部分用到了overshooting。
那么什么是overshooting?
1. 首先当length=50,即50个time step。由这个函数得出的post 和 posterior ,将posterior作为prev_state 经过第一次cell,输出prior。用这个prior和posterior做KL,就是d=0的overshooting。这个之所以叫zero_step,因为这里KL的prior确实是由posterior直接生成的,且都属于同一个state。
有一个认知特别重要,一个state表示下图虚线框,一个state可以是posterior,也可以是prior。 一个state如图有5个元素,也就是posterior和prior有5个元素。且,同一个state的posterior和prior的belif和rnn_state相同。
(prior, posterior), _ = tf.nn.dynamic_rnn(
cell, (embedded, prev_action, use_obs), length, dtype=tf.float32, # cell, inputs:shape(batchsize,max_time,?):(40,50,?), sequence_length:shape(batchsize,):(40,)
swap_memory=True)
上面这个函数很关键,有两层cell,外层cell先传入dynamic_rnn. 两层cell分别做了什么呢?如下图:
2.继续讲overshooting。因为d不光=0,不光是固定步长,d可以= 1,2,3....amount 。所有overshooting就是求每一列的posterior(在图中每一列的底部)与这列的其他所有priors做KL。
3. (说来说去overshooting本质上就做了这样一件事)将1.中所得posterior 作为prev_state 放入dynamic_rnn()求出每一斜行的priors,与这一斜行对应的投影posterior做KL。
做完overshooting把posterior和priors等都准备好了,可以计算loss了,loss由zero_step loss 和 overshooting loss组成:
1.KL,global KL loss就是求两个分布的距离,而共有(50,50)个这样的分布
2.去调用相应函数去计算出output(reward,image,state) :
output = heads[key](features) # decoder is used.
output与对应的target做交叉熵:
loss = -tools.mask(output.log_prob(target[key]), mask)
最后再将loss求个均值,再根据key存放到一个字典losses中。
loss = -tools.mask(output.log_prob(target[key]), mask)
losses[key]
如下图:
这样zero_step loss就计算完了,现在来看overshooting loss,调用的函数都是compute_losses,区别只是准备好的数据不同。
loss计算完了,接下来是优化部分:
config.optimizers由state和main两个元素,main函数一执行生成tools.CustomOptimizer对象,里面什么配置都有,包括lr,用那个优化函数等。state与main同理
_define_optimizers:
optimizers[name] = functools.partial(
tools.CustomOptimizer, include=r'.*/head_{}/.*'.format(name), **kwargs)
优化设置好了,就开始训练模型N步。这轮训练好了,要用这轮的模型做planning了:
其实无论是计算loss,model还是simulation所有这些工作,代码中都按时间先后分为两大步骤。以simulation工作为例,它的第一步都是在config阶段完成配置,第二步在define_model()去具体执行。
1.配置阶段,把planning会用到cem函数,参数,各种参数都封装好,放入config.sim_collects
config.sim_collects = _active_collection(config, params) -> _define_simulation
2.在loss计算完,optimization 配置好后,在define_model()中开始一系列调用:
注:-> 指调用
这条线首先会判断该不该should_collect,如果应该,则进入这条长长的函数调用线:
define_model() -> simulate_episodes() -> simulate() ->
-> collect_rollouts()此函数里生成了MPCAgent()(即生成algo对象)-> simulate_step模拟出一步的总入口,而tf.scan()会让这个函数执行200次,即走200步。->_define_begin_episode()这里把环境reset了 -> _define_summaries() -> _define_step() -> algo.perform(agent_indices, prevob) 首先embedded,再求posterior,用这个state开始做planning,输出一个action。
上面是不断往里层调函数,返回值中有一个关键的score从哪里来的,返回到哪里去,如下:
add_score = score_var.assign_add(batch_env.reward) 返回给 step, score, length = _define_step() 返回给 define_summaries
最后的总结,如下图,整个过程中的一圈就完成了,再来就是新的一轮fit model再planning。