DQN系列算法对连续空间分布的action心有余而力不足,而Policy Gradient系列的算法能够有效的预测连续的动作。在此基础上DPG和DDPG算法被提了出来,并且能够有效地处理连续动作问题。
Paper:
DPG:Deterministic policy gradient algorithms
DDPG:Continuous Control with Deep Reinforcement Learning
Github:https://github.com/xiaochus/Deep-Reinforcement-Learning-Practice
环境
- Python 3.6
- Tensorflow-gpu 1.8.0
- Keras 2.2.2
- Gym 0.10.8
DPG
DPG(Deterministic Policy Gradient)确定性行为策略是D.Silver等在2014年提出的,DPG每一步的行为通过函数μ直接获得确定的值。在DPG之前业界普遍认为,环境模型无关的确定性策略是不存在的,而D.Silver等通过严密的数学推导证明了DPG的存在。根据DPG论文的证明,当概率策略的方差趋近于0的时候,就是确定性策略。
在之前的文章中,Policy Network通过log损失和discount reward来引策略导梯度的更新,AC方法通过log损失和TD error来引策略导梯度的更新,最后得到的策略都是一个action的概率分布。我们在选择action的时候其实是根据概率分布进行采样,因此Policy Gradient本质上是一个随机策略。采用随机策略时,即使在相同的状态,每次所采取的动作也很可能不一样。而确定性策略能够得到一个确定的action。
随机策略:π(a∣s)=P[a∣s]
确定性策略:a=μ(s)
DPG的学习框架采用AC的方法,DPG求解时少了重要性权重,这是因为重要性采样是用简单的概率分布去估计复杂的概率分布,DPG的action是确定值而不是概率分布。另外DPG的值函数评估用的是Q-learning的方法,即用TD error来估计动作值函数并忽略重要性权重。确定性策略AC方法的梯度公式和随机策略的梯度公式如下图所示。跟随机策略梯度相比,确定性策略少了对action的积分,多了reward对action的导数。
DDPG
DDPG(Deep Deterministic Policy Gradient)是利用 DQN 扩展 Q 学习算法的思路对DPG方法进行改造得到的(Actor-Critic,AC)框架的算法,该算法可用于解决连续动作空间上的 DRL 问题。相对于DPG的核心改进是采用卷积神经网络作为策略函数μ和Q函数的函数近似,即策略网络和Q网络;然后使用深度学习的方法来训练上述神经网络,如下图所示。
DDPG主要的关键点有以下几个:
1、DDPG可以看做是Nature DQN、Actor-Critic和DPG三种方法的组合算法。
2、Critic部分的输入为states和action。
3、Actor部分不再使用自己的Loss函数和Reward进行更新,而是使用DPG的思想,使用critic部分Q值对action的梯度来对actor进行更新。
4、使用了Nature DQN的思想,加入了经验池、随机抽样和目标网络,real Q值使用两个target网络共同计算。
5、target网络更新改为软更新,在每个batch缓慢更新target网络的参数。
6、 将ε-greedy探索的方法使用在连续值采样上,通过Ornstein-Uhlenbeck process为action添加噪声。
关于上述中的actor我们不直接计算损失而是使用criric的损失,我们可以这样理解:我们的actor的目的是尽量得到一个高Q值的action,因此actor的损失可以简单的理解为得到的反馈Q值越大损失越小,得到的反馈Q值越小损失越大。
如下图公式,actor(θ)中action对参数的梯度为da/dθ,critic中Q对action的梯度dq/da,最后得到的Q值对actor(θ)的梯度公式就为-(dq/da * da/dθ)(负数的原因是优化器的方向为最小化loss而我们的目的是最大化Q值)。
DDPG的算法流程如下所示:
算法实现
使用Pendulum来实验连续值预测,keras实现的DDPG如下所示:
# -*- coding: utf-8 -*-
import os
import random
import gym
from collections import deque
import numpy as np
import tensorflow as tf
from keras.layers import Input, Dense, Lambda, concatenate
from keras.models import Model
from keras.optimizers import Adam
import keras.backend as K
from DRL import DRL
class DDPG(DRL):
"""Deep Deterministic Policy Gradient Algorithms.
"""
def __init__(self):
super(DDPG, self).__init__()
self.sess = K.get_session()
self.env = gym.make('Pendulum-v0')
self.bound = self.env.action_space.high[0]
# update rate for target model.
self.TAU = 0.01
# experience replay.
self.memory_buffer = deque(maxlen=4000)
# discount rate for q value.
self.gamma = 0.95
# epsilon of action selection
self.epsilon = 1.0
# discount rate for epsilon.
self.epsilon_decay = 0.995
# min epsilon of ε-greedy.
self.epsilon_min = 0.01
# actor learning rate
self.a_lr = 0.0001
# critic learining rate
self.c_lr = 0.001
# ddpg model
self.actor = self._build_actor()
self.critic = self._build_critic()
# target model
self.target_actor = self._build_actor()
self.target_actor.set_weights(self.actor.get_weights())
self.target_critic = self._build_critic()
self.target_critic.set_weights(self.critic.get_weights())
# gradient function
self.get_critic_grad = self.critic_gradient()
self.actor_optimizer()
if os.path.exists('model/ddpg_actor.h5') and os.path.exists('model/ddpg_critic.h5'):
self.actor.load_weights('model/ddpg_actor.h5')
self.critic.load_weights('model/ddpg_critic.h5')
def _build_actor(self):
"""Actor model.
"""
inputs = Input(shape=(3,), name='state_input')
x = Dense(40, activation='relu')(inputs)
x = Dense(40, activation='relu')(x)
x = Dense(1, activation='tanh')(x)
output = Lambda(lambda x: x * self.bound)(x)
model = Model(inputs=inputs, outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.a_lr))
return model
def _build_critic(self):
"""Critic model.
"""
sinput = Input(shape=(3,), name='state_input')
ainput = Input(shape=(1,), name='action_input')
s = Dense(40, activation='relu')(sinput)
a = Dense(40, activation='relu')(ainput)
x = concatenate([s, a])
x = Dense(40, activation='relu')(x)
output = Dense(1, activation='linear')(x)
model = Model(inputs=[sinput, ainput], outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.c_lr))
return model
def actor_optimizer(self):
"""actor_optimizer.
Returns:
function, opt function for actor.
"""
self.ainput = self.actor.input
aoutput = self.actor.output
trainable_weights = self.actor.trainable_weights
self.action_gradient = tf.placeholder(tf.float32, shape=(None, 1))
# tf.gradients will calculate dy/dx with a initial gradients for y
# action_gradient is dq / da, so this is dq/da * da/dparams
params_grad = tf.gradients(aoutput, trainable_weights, -self.action_gradient)
grads = zip(params_grad, trainable_weights)
self.opt = tf.train.AdamOptimizer(self.a_lr).apply_gradients(grads)
self.sess.run(tf.global_variables_initializer())
def critic_gradient(self):
"""get critic gradient function.
Returns:
function, gradient function for critic.
"""
cinput = self.critic.input
coutput = self.critic.output
# compute the gradient of the action with q value, dq/da.
action_grads = K.gradients(coutput, cinput[1])
return K.function([cinput[0], cinput[1]], action_grads)
def OU(self, x, mu=0, theta=0.15, sigma=0.2):
"""Ornstein-Uhlenbeck process.
formula:ou = θ * (μ - x) + σ * w
Arguments:
x: action value.
mu: μ, mean fo values.
theta: θ, rate the variable reverts towards to the mean.
sigma:σ, degree of volatility of the process.
Returns:
OU value
"""
return theta * (mu - x) + sigma * np.random.randn(1)
def get_action(self, X):
"""get actor action with ou noise.
Arguments:
X: state value.
"""
action = self.actor.predict(X)[0][0]
# add randomness to action selection for exploration
noise = max(self.epsilon, 0) * self.OU(action)
action = np.clip(action + noise, -self.bound, self.bound)
return action
def remember(self, state, action, reward, next_state, done):
"""add data to experience replay.
Arguments:
state: observation.
action: action.
reward: reward.
next_state: next_observation.
done: if game done.
"""
item = (state, action, reward, next_state, done)
self.memory_buffer.append(item)
def update_epsilon(self):
"""update epsilon.
"""
if self.epsilon >= self.epsilon_min:
self.epsilon *= self.epsilon_decay
def process_batch(self, batch):
"""process batch data.
Arguments:
batch: batch size.
Returns:
states: states.
actions: actions.
y: Q_value.
"""
y = []
# ranchom choice batch data from experience replay.
data = random.sample(self.memory_buffer, batch)
states = np.array([d[0] for d in data])
actions = np.array([d[1] for d in data])
next_states = np.array([d[3] for d in data])
# Q_target。
next_actions = self.target_actor.predict(next_states)
q = self.target_critic.predict([next_states, next_actions])
# update Q value
for i, (_, _, reward, _, done) in enumerate(data):
target = reward
if not done:
target += self.gamma * q[i][0]
y.append(target)
return states, actions, y
def update_model(self, X1, X2, y):
"""update ddpg model.
Arguments:
states: states.
actions: actions.
y: Q_value.
Returns:
loss: critic loss.
"""
# loss = self.critic.train_on_batch([X1, X2], y)
loss = self.critic.fit([X1, X2], y, verbose=0)
loss = np.mean(loss.history['loss'])
X3 = self.actor.predict(X1)
a_grads = np.array(self.get_critic_grad([X1, X3]))[0]
self.sess.run(self.opt, feed_dict={
self.ainput: X1,
self.action_gradient: a_grads
})
return loss
def update_target_model(self):
"""soft update target model.
formula:θt ← τ * θ + (1−τ) * θt, τ << 1.
"""
critic_weights = self.critic.get_weights()
actor_weights = self.actor.get_weights()
critic_target_weights = self.target_critic.get_weights()
actor_target_weights = self.target_actor.get_weights()
for i in range(len(critic_weights)):
critic_target_weights[i] = self.TAU * critic_weights[i] + (1 - self.TAU) * critic_target_weights[i]
for i in range(len(actor_weights)):
actor_target_weights[i] = self.TAU * actor_weights[i] + (1 - self.TAU) * actor_target_weights[i]
self.target_critic.set_weights(critic_target_weights)
self.target_actor.set_weights(actor_target_weights)
def train(self, episode, batch):
"""training model.
Arguments:
episode: ganme episode.
batch: batch size of episode.
Returns:
history: training history.
"""
history = {'episode': [], 'Episode_reward': [], 'Loss': []}
for i in range(episode):
observation = self.env.reset()
reward_sum = 0
losses = []
for j in range(200):
# chocie action from ε-greedy.
x = observation.reshape(-1, 3)
# actor action
action = self.get_action(x)
observation, reward, done, _ = self.env.step(action)
# add data to experience replay.
reward_sum += reward
self.remember(x[0], action, reward, observation, done)
if len(self.memory_buffer) > batch:
X1, X2, y = self.process_batch(batch)
# update DDPG model
loss = self.update_model(X1, X2, y)
# update target model
self.update_target_model()
# reduce epsilon pure batch.
self.update_epsilon()
losses.append(loss)
loss = np.mean(losses)
history['episode'].append(i)
history['Episode_reward'].append(reward_sum)
history['Loss'].append(loss)
print('Episode: {}/{} | reward: {} | loss: {:.3f}'.format(i, episode, reward_sum, loss))
self.actor.save_weights('model/ddpg_actor.h5')
self.critic.save_weights('model/ddpg_critic.h5')
return history
def play(self):
"""play game with model.
"""
print('play...')
observation = self.env.reset()
reward_sum = 0
random_episodes = 0
while random_episodes < 10:
self.env.render()
x = observation.reshape(-1, 3)
action = self.actor.predict(x)[0]
observation, reward, done, _ = self.env.step(action)
reward_sum += reward
if done:
print("Reward for this episode was: {}".format(reward_sum))
random_episodes += 1
reward_sum = 0
observation = self.env.reset()
self.env.close()
if __name__ == '__main__':
model = DDPG()
history = model.train(200, 128)
model.save_history(history, 'ddpg.csv')
model.play()
训练结果如下图所示,reward持续上升的同时critic loss持续下降。由于Pendulum的每个action的reward在-16~0之间,因此reward越接近0效果越好。
测试结果如下所示,每轮游戏的总reward在-100左右,杆子能够保持直立状态,说明DDPG算法解决了这个问题。
play...
Reward for this episode was: -123.71978446919498
Reward for this episode was: -115.70330575701709
Reward for this episode was: -123.30843994892032
Reward for this episode was: -377.2392365834364
Reward for this episode was: -131.49351601402685
Reward for this episode was: -245.04125509091233
Reward for this episode was: -250.5214695454614
Reward for this episode was: -129.1264146531351
Reward for this episode was: -126.59492808745193
Reward for this episode was: -130.41697205331536
PS:在实现代码时,critic部分使用fit()和train_on_batch()会出现完全不同的结果,前者能使模型收敛而后者会导致模型不收敛。这个问题困扰了我很久,Debug很久才发现这里出现问题,而在之前的算法实现中都没有出现过。推测原因可能是因为critic部分求gradient时使用了K.function()的原因,导致critic没有进行参数更新,使用tf来求导会解决这个问题。