DDPG是google DeepMind团队提出的一种用于输出确定性动作的算法,它解决了
Actor-Critic
神经网络每次参数更新前后都存在相关性,导致神经网络只能片面的看待问题这一缺点。同时也解决了DQN不能用于连续性动作的缺点
1. DDPG简介
Deep Deterministic Policy Gradient(DDPG)即深度确定性策略梯度算法,是一种可以解决连续性控制问题的方法,属于model-free,off-policy,policy-based的方法
- Silver, David, et al. "Deterministic policy gradient algorithms." ICML. 2014.
- Lillicrap, Timothy P., et al. "Continuous control with deep reinforcement learning." arXiv preprint arXiv:1509.02971 (2015)
DDPG可以拆开来看,Deep
是说明需要神经网络;Deterministic
的意思就是最终确定地只输出一个动作。Policy Gradient
是策略梯度算法。DDPG可以看成是DQN的扩展版,不同的是,以往的DQN在最终输出的是一个动作向量,对于DDPG是最终确定地只输出一个动作。而且,DDPG让DQN可以扩展到连续的动作空间
提出DDPG是为了让DQN可以扩展到连续的动作空间,比如车速、角度和电压这种的连续值。
- DDPG直接在DQN基础上加了一个策略网络来直接输出动作值,所以DDPG需要一边学习Q网络,一边学习策略网络
- Q网络的参数用来表示。策略网络的参数用来表示
- 这样的结构为
Actor-Critic
的结构
-
类似于DQN
- DQN的最佳策略是想要学出一个很好的Q网络,学好这个网络之后,希望选取的那个动作使Q值最大
- DDPG的目的也是为了求解让Q值最大的那个action
- Actor只是为了迎合评委的打分而已,所以用来优化策略网络的梯度就是要最大化这个Q值,所以构造的loss函数就是让Q取一个负号
- 实现上把loss函数投入优化器中,它就会自动最小化loss,也就是最大化Q
-
除了策略网络要做优化,DDPG还有一个Q网络也要优化
- 评委在一步一步的学习当中,慢慢地去给出准确的打分。
- 优化Q网络的方法其实跟DQN优化Q网络的方法是一样的,用真实的reward和下一步的即来去拟合未来的收益Q_target
- 让Q网络的输出去逼近这个Q_target
- 所以构造的lossfunction就是直接求这两个值的均方误差(MeanSquaredError,MSE)
- 构造好loss后,让优化器自动去最小化loss就好了
策略网络的loss function是一个复合函数,把代进去,最终策略网络要优化的是策略网络的参数。Q网络要优化的是和Q_target之间的均方误差
但是Q网络的优化存在一个和DQN一模一样的问题就是它后面的Q_target是不稳定的。此外后面的也是不稳定的,因为也是一个预估值
为了稳定这个Q_target,DDPG分别给Q网络和策略网络都搭建了target network:
- target_Q网络就为了来计算Q_target里面的
- 里面的需要的next action 就是通过target_P网络来去输出,即
- 为了区分前面的Q网络和策略网络以及后面的target_Q网络和target_P策略网络,前面的网络的参数是,后面的网络的参数是
- DDPG有四个网络,策略网络的target网络和Q网络的target网络,它是为了让计算Q_target的时候能够更稳定一点,因为这两个网络也是固定一段时间的参数之后再跟评估网络同步一下最新的参数
这里面训练需要用到的数据就是,只需要用到这四个数据。我们就用Replay Memory
把这些数据存起来,然后再sample进来训练。这个经验回放的技巧跟DQN是一模一样的。因为DDPG使用了经验回放这个技巧,所以DDPG是一个off-policy
的算法
2. Exploration vs. Exploitation
DDPG通过off-policy
的方式来训练一个确定性策略。因为策略是确定的,如果agent使用同策略来探索,在一开始的时候,很可能不会尝试足够多的action来找到有用的学习信号。为了让DDPG的策略更好地探索,在训练的时候action加了噪音。DDPG的原作者推荐使用时间相关的OUnoise
,但最近的结果表明不相关的、均值为0的Gaussian noise
的效果非常好,由于后者更简单,因此更喜欢使用它。为了便于获得更高质量的训练数据,可以在训练过程中把噪声变小
在测试的时候,为了查看策略利用它学到的东西的表现,不会在action中加噪音
虽然DDPG表现很好,但它在超参数和其他类型的调整方面经常很敏感。DDPG常见的问题是已经学习好的Q函数开始显著地高估Q值,然后导致策略被破坏了,因为它利用了Q函数中的误差。可以拿实际的Q值跟这个Q-network输出的Q值进行对比。实际的Q值可以用MC来算。根据当前的policy采样1000条轨迹,得到G后取平均,得到实际的Q值
双延迟深度确定性策略梯度(Twin Delayed DDPG,简称 TD3)
通过引入三个关键技巧来解决这个问题:
截断的双Q学习(Clipped Dobule Q-learning): TD3学习两个Q-function(因此名字中有twin)。TD3通过最小化均方误差来同时学习两个Q-function:和。两个Q-function都使用一个目标,两个Q-function中给出较小的值会被作为如下的Q-target:
延迟的策略更新(“Delayed” Policy Updates):相关实验结果表明,同步训练动作网络和评价网络,却不使用目标网络,会导致训练过程不稳定;但是仅固定动作网络时,评价网络往往能够收敛到正确的结果。因此TD3算法以较低的频率更新动作网络,较高频率更新评价网络,通常每更新两次评价网络就更新一次策略
目标策略平滑(Target Policy smoothing): TD3引入了smoothing的思想,TD3在目标动作中加入噪音,通过平滑Q沿动作的变化,使策略更难利用Q函数的误差
这三个技巧加在一起,使得性能相比基线DDPG有了大幅的提升
目标策略平滑化的工作原理如下:
其中本质上是一个噪声,是从正态分布中取样得到的,即,目标策略平滑化是一种正则化方法
3. 算法流程
伪代码如下:
- 初始化Actor和Critic以及其各自的目标网络共4个网络以及经验池replay buffer R
- 在Actor网络输出动作时,DDPG通过添加随机噪声的方式实现exploration,可以让智能体更好的探索潜在的最优策略,之后是采取经验回放的技巧。把智能体与环境交互的数据存储到R。随后每次训练从中随机采样一个minibatch
- 在参数更新上,先利用Critic的目标网络来计算目标值,利用与当前Q值的均方误差构造损失函数,进行梯度更新。对于Actor的策略网络,其实就是把Actor的确定性动作函数代进Q-function的,然后求梯度,最后是更新目标网络
4. 总结
简单来说DQN+Actor-Critic=>Deep Deterministic Policy Gradient(DDPG)。实际上DDPG其实更接近DQN,只是采用了类似Actor-Critic的结构。DDPG吸收了Actor-Critic中策略梯度单步更新的优点,同时还吸收了DQN对Q值估计的技巧。DDPG 最大的优势就是能够在连续动作上更有效地学习
5. 代码
代码主要看DDPG算法主要几个模块:
5.1 Actor
Actor作用是接收状态描述,输出一个action,由于DDPG中的动作空间要求是连续的,所以使用了一个tanh
class Actor(nn.Module):
def __init__(self, n_obs, n_actions, hidden_size, init_w=3e-3):
super(Actor, self).__init__()
self.linear1 = nn.Linear(n_obs, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, n_actions)
self.linear3.weight.data.uniform_(-init_w, init_w)
self.linear3.bias.data.uniform_(-init_w, init_w)
def forward(self, x):
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = F.tanh(self.linear3(x))
return x
实现方面,就是用了几个全连接层来设计的网络,输出的结果是一个连续的值
5.2 Critic
Critic批评者,在DDPG中接受来自Actor的一个Action值和当前的状态,输出的是当前状态下,采用Action动作以后得到的关于Q的期望
class Critic(nn.Module):
def __init__(self, n_obs, n_actions, hidden_size, init_w=3e-3):
super(Critic, self).__init__()
self.linear1 = nn.Linear(n_obs + n_actions, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, 1)
# 随机初始化为较小的值
self.linear3.weight.data.uniform_(-init_w, init_w)
self.linear3.bias.data.uniform_(-init_w, init_w)
def forward(self, state, action):
# 按维数1拼接
x = torch.cat([state, action], 1)
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = self.linear3(x)
return x
5.3 Replay Buffer
Replay Buffer就是用来存储一系列等待学习的SARS片段。
class ReplayBuffer:
def __init__(self, capacity):
self.capacity = capacity
self.buffer = []
self.position = 0
def push(self, state, action, reward, next_state, done):
if len(self.buffer) < self.capacity:
self.buffer.append(None)
self.buffer[self.position] = (state, action, reward, next_state, done)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
state_batch, action_batch, reward_batch, next_state_batch, done_batch = map(np.stack, zip(*batch))
return state_batch, action_batch, reward_batch, next_state_batch, done_batch
def __len__(self):
return len(self.buffer)
可以设置Replay Buffer的容量,push函数是向buffer中添加一个SARS片段;sample代表从buffer中采样batch size个片段
5.4 DDPG
DDPG用到了以上的所有对象,包括Critic、Target Critic、Actor、Target Actor、memory
init函数如下:
def __init__(self, n_states, n_actions, hidden_dim=30, device="cpu", critic_lr=1e-3,
actor_lr=1e-4, gamma=0.99, soft_tau=1e-2, memory_capacity=100000, batch_size=128):
self.device = device
self.critic = Critic(n_states, n_actions, hidden_dim).to(device)
self.actor = Actor(n_states, n_actions, hidden_dim).to(device)
self.target_critic = Critic(n_states, n_actions, hidden_dim).to(device)
self.target_actor = Actor(n_states, n_actions, hidden_dim).to(device)
for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
target_param.data.copy_(param.data)
for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
target_param.data.copy_(param.data)
self.critic_optimizer = optim.Adam(
self.critic.parameters(), lr=critic_lr)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
self.memory = ReplayBuffer(memory_capacity)
self.batch_size = batch_size
self.soft_tau = soft_tau
self.gamma = gamma
其中核心的函数就是update函数:
def update(self):
if len(self.memory) < self.batch_size:
return
state, action, reward, next_state, done = self.memory.sample(
self.batch_size)
# 将所有变量转为张量
state = torch.FloatTensor(state).to(self.device)
next_state = torch.FloatTensor(next_state).to(self.device)
action = torch.FloatTensor(action).to(self.device)
reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(self.device)
# 注意critic将(s_t,a)作为输入
policy_loss = self.critic(state, self.actor(state))
policy_loss = -policy_loss.mean()
next_action = self.target_actor(next_state)
target_value = self.target_critic(next_state, next_action.detach())
expected_value = reward + (1.0 - done) * self.gamma * target_value
expected_value = torch.clamp(expected_value, -np.inf, np.inf)
value = self.critic(state, action)
value_loss = nn.MSELoss()(value, expected_value.detach())
self.actor_optimizer.zero_grad()
policy_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.zero_grad()
value_loss.backward()
self.critic_optimizer.step()
for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
target_param.data.copy_(
target_param.data * (1.0 - self.soft_tau) +
param.data * self.soft_tau
)
for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
target_param.data.copy_(
target_param.data * (1.0 - self.soft_tau) +
param.data * self.soft_tau
)
整体流程如下:
- 从memory中采样一个batch的数据
- policy_loss = self.critic(state, self.actor(state))
- 将state放到actor对象得到action
- 将state,action放到critic对象得到policy loss
next_action = self.target_actor(next_state)
target_value = self.target_critic(next_state, next_action.detach())
然后target actor和target critic也按照以上过程得到target value
根据target value 计算expected value:
实现如下:
expected_value = reward + (1.0 - done) * self.gamma * target_value
expected_value = torch.clamp(expected_value, -np.inf, np.inf)
如果done为1,代表已经结束了,也就不需要这个系数了。第二行对expected value进行了数值上的限制
-
接下来计算根据数据集中action得到的value值
value = self.critic(state, action)
-
计算优化Q网络的loss, 采用的是MSEloss
value_loss = nn.MSELoss()(value, expected_value.detach())
对policy loss和value loss进行梯度回传,更新训练参数