马尔可夫决策过程(MDP)
一:介绍
- 马尔可夫决策过程是用来形式化地描述强化学习中的环境
- 其中环境是完全可以观测的
- 值得注意的是,大部分强化学习问题都可以看作 MDP 问题。
简单地理解,MDP是用来描述环境的,且 agent 可以观察到环境的全部信息。也就是说是完全可以观测。所以 agent的状态会等于环境的状态,因此在MDP中会出现action这个概念。
二:马尔可夫性质
- 现在或未来的状态依赖于过于的状态
- 它可以被定义为:
如果一个状态是马尔可夫链中的一个状态,当且仅当:
- 当前状态能捕捉到过去状态的所有信息
- 一旦当前状态被确认,那么历史信息就可以被扔掉
状态转移矩阵
对于一个马尔可夫状态 和它的后继状态, 状态转移概率可以定义为:
故转移概率矩阵 可以定义为:
三:马尔可夫链
- 马尔可夫过程是一个无记忆性的随机过程,也就是说马尔可夫过程就是一串随机的状态序列 . 为什么是无记忆性的呢?因为
即下一状态的概率分布只能由当前状态决定,在时间序列中它前面的事件均与之无关
如上是学生的马尔可夫链,可以看出有7个节点,即有7个状态,同时每一个状态就是一个动作,现在,我们代码进行表示:
import numpy as np
# 设置字典: 从状态名到索引编号
state_to_index = dict()
state_to_index["C1"] = 0
state_to_index["C2"] = 1
state_to_index["C3"] = 2
state_to_index["Pass"] = 3
state_to_index["Pub"] = 4
state_to_index["FB"] = 5
state_to_index["Sleep"] = 6
# 设置字典: 从编号到状态名
index_to_state = dict()
for i, name in zip(state_to_index.values(), state_to_index.keys()):
index_to_state[i] = name
# 设置状态转移矩阵
# C1 C2 C3 Pass Pub FB Sleep
Pss = [
[0.0, 0.5, 0.0, 0.0, 0.0, 0.5, 0.0],
[0.0, 0.0, 0.8, 0.0, 0.0, 0.0, 0.2],
[0.0, 0.0, 0.0, 0.6, 0.4, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
[0.2, 0.4, 0.4, 0.0, 0.0, 0.0, 0.0],
[0.1, 0.0, 0.0, 0.0, 0.0, 0.9, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]
Pss = np.array(Pss)
四:马尔可夫奖励过程
我们把马尔可夫奖励过程定义为:
其中是有限的状态集合, 是状态的转移矩阵。是回报函数,它定义为:
值得注意的是,这是立即回报,不是累计回报。
而 是折扣因子
五: Return
- 我们定义 return 是从步骤 开始的累计回报:
下面给出Return计算方法:
代码:
# 设置每一个状态的立即回报
# C1 C2 C3 Pass Pub FB Sleep
rewards = [-2, -2, -2, 10, 1, -1, 0]
# 计算 return 的函数
def compute_return(gamma=0.9, chains=None):
k = 0
total_value = 0
for i in range(len(chains)):
total_value += np.power(gamma, k) * rewards[state_to_index[chains[i]]]
# print("{}:{} reward:{}".format(chains[i], state_to_index[chains[i]], rewards[state_to_index[chains[i]]]))
k += 1
return total_value
chain1 = ["C1", "C2", "C3", "Pass", "Sleep"]
chain2 = ["C1", "FB", "FB", "C1", "C2", "Sleep"]
chain3 = ["C1", "C2", "C3", "Pub", "C2", "C3", "Pass", "Sleep"]
print(compute_return(gamma=0.5, chains=chain1))
print(compute_return(gamma=0.5, chains=chain2))
print(compute_return(gamma=0.5, chains=chain3))
运行结果
-2.25
-3.125
-3.40625
六:value function
- 现在定义value function : 表示当前状态的长期回报
也就是说它是 return 的期望
看看Devil给出的几个例子:
上面三个例子,是value fcuntion 的例子,先不要去纠结那些数字是怎么得到的,其实除了第一张图片我们可以确定value的值正确性外,其余两个例子我们都无法确定。因为我们并不知道变量G。
七: 贝尔曼方程
为什么要将这个方程呢?因为 根据上面介绍value的计算(即定义),计算起来比较复杂,所以我们寻求简单的解法,而贝尔曼方程的正好满足条件。
对 value 进行分解:
简单如下图示意:
把期望展开:
也就是说,当前状态 的 value 值等于立即奖励 加上下一个时刻状态的 value 值的 倍数
对于这个方程的实现,如果你有数值分析的基础,这个是十分容易实现的, 把贝尔曼方程写成矩阵形式,再通过迭代方法(动态规划、蒙特卡罗、时间差分法)求解
其实展开后就是:
代码简单表示如下
def compute_value(Pss_, r, gamma=0.8):
r = np.array(r).reshape((-1, 1))
v = np.dot(np.linalg.inv(np.eye(7, 7) - gamma * Pss_), r)
return v
print(compute_value(Pss, rewards, gamma=0.999999))
八: 马尔可夫决策过程
- 马尔可夫决策过程(MDP) 是马尔可夫奖励过程(MRP) 加上决策。其中所有马尔可夫状态就是环境。
- 定义马尔可夫决策过程:
- 其中 就是动作的集合
策略 Policies
- 一个策略完全由 agent 的行为决定
- MDP 策略依赖于当前状态,而不是依赖于历史状态
-
每一个策略就是一个分布, 所以概率转移矩阵和当前状态的reward可以写为:
value function
value function 可以分为两个:state value function 和 action value function 。 对于前者,我们上面已经定义过, 而对于 action value function, 定义为: 。
同样的,我们也可以对 action-value function 应用贝尔曼方程进行分解
示意图:
九: 代码
首先,我们想建立一个 MDP 模型来模拟学生马尔可夫决策过程,那我们就必须定义好状态空间、动作空间、策略、reward 设计
状态空间
S = ['浏览手机中', '第一节课', '第二节课', '第三节课', '睡觉']
动作空间
A = ['浏览手机', '学习', '离开浏览', '去酒吧', '退出学习']
定义状态动作-奖励的字典 和 状态-状态的转移概率矩阵
R = {} # 字典: 记录从一个状态采取一个动作后会得到的立即奖励
P = {} # 字典: 状态转移字典。记录从一个状态转移到另一个状态的概率
gamma = 1.0 # 衰减因子
对于 reward的设计,题目已经给出。现在,为了方便操作字典 P 和 R, 要用到如下几个工具:
# 现在, 构建学生马尔可夫决策过程
# 首先, 先定义一些工具类函数
def str_key(*args):
new_arg = []
for arg in args:
if type(arg) in [tuple, list]:
new_arg += [str(i) for i in arg]
else:
new_arg.append(str(arg))
return "_".join(new_arg)
def set_dict(target_dict, value, *args):
target_dict[str_key(*args)] = value
def set_prob(P, s, a, s1, p=1.0):
set_dict(P, p, s, a, s1)
def get_prob(P, s, a, s1):
return P.get(str_key(s, a, s1), 0)
def set_reward(R, s, a, r):
set_dict(R, r, s, a)
def get_reward(R, s, a):
return R.get(str_key(s, a), 0)
def display_dict(target_dict):
for key in target_dict.keys():
print("{}: {:.2f}".format(key, target_dict[key]))
print("")
def set_value(V, s, v):
set_dict(V, v, s)
def get_value(V, s):
return V.get(str_key(s), 0)
def set_pi(Pi, s, a, p=0.5):
set_dict(Pi, p, s, a)
def get_pi(P1, s, a):
return Pi.get(str_key(s, a), 0)
现在就设置环境:
set_prob(P, S[0], A[0], S[0]) # 浏 览 手 机 中 - 浏 览 手 机 -> 浏 览 手 机 中
set_prob(P, S[0], A[2], S[1]) # 浏 览 手 机 中 - 离 开 浏 览 -> 第 一 节 课
set_prob(P, S[1], A[0], S[0]) # 第 一 节 课 - 浏 览 手 机 -> 浏 览 手 机 中
set_prob(P, S[1], A[1], S[2]) # 第 一 节 课 - 学 习 -> 第 二 节 课
set_prob(P, S[2], A[1], S[3]) # 第 二 节 课 - 学 习 -> 第 三 节 课
set_prob(P, S[2], A[4], S[4]) # 第 二 节 课 - 退 出 学 习 -> 退 出 休 息
set_prob(P, S[3], A[1], S[4]) # 第 三 节 课 - 学 习 -> 退 出 休 息
set_prob(P, S[3], A[3], S[1], p = 0.2) # 第 三 节 课 - 泡 吧 -> 第 一 节 课
set_prob(P, S[3], A[3], S[2], p = 0.4) # 第 三 节 课 - 泡 吧 -> 第 一 节 课
set_prob(P, S[3], A[3], S[3], p = 0.4) # 第 三 节 课 - 泡 吧 -> 第 一 节 课
set_reward(R, S[0], A[0], -1) # 浏 览 手 机 中 - 浏 览 手 机 -> -1
set_reward(R, S[0], A[2], 0)
set_reward(R, S[1], A[0], -1) # 第 一 节 课 - 浏 览 手 机 -> -1
set_reward(R, S[1], A[1], -2) # 第 一 节 课 - 学 习 -> -2
set_reward(R, S[2], A[1], -2) # 第 二 节 课 - 学 习 -> -2
set_reward(R, S[2], A[4], 0)
set_reward(R, S[3], A[1], 10) # 第 三 节 课 - 学 习 -> 10
set_reward(R, S[3], A[3], +1) # 第 三 节 课 - 泡 吧 -> -1
# 浏 览 手 机 中 - 离 开 浏 览 -> 0
# 第 二 节 课 - 退 出 学 习 -> 0
MDP = (S, A, R, P, gamma)
print("----状 态 转 移 概 率 字 典 ( 矩 阵 ) 信 息:----")
display_dict(P)
print("----奖 励 字 典 ( 函 数 ) 信 息:----")
display_dict(R)
Pi = {}
set_pi(Pi, S[0], A[0], 0.5) # 浏 览 手 机 中 - 浏 览 手 机
set_pi(Pi, S[0], A[2], 0.5) # 浏 览 手 机 中 - 离 开 浏 览
set_pi(Pi, S[1], A[0], 0.5) # 第 一 节 课 - 浏 览 手 机
set_pi(Pi, S[1], A[1], 0.5) # 第 一 节 课 - 学 习
set_pi(Pi, S[2], A[1], 0.5) # 第 二 节 课 - 学 习
set_pi(Pi, S[2], A[4], 0.5) # 第 二 节 课 - 退 出 学 习
set_pi(Pi, S[3], A[1], 0.5) # 第 三 节 课 - 学 习
set_pi(Pi, S[3], A[3], 0.5) # 第 三 节 课 - 泡 吧
print("----状 态 转 移 概 率 字 典 ( 矩 阵 ) 信 息:----")
display_dict(Pi)
# 初 始 时 价 值 为 空 , 访 问 时 会 返 回0
print("----状 态 转 移 概 率 字 典 ( 矩 阵 ) 信 息:----")
V = {}
display_dict(V)
计算 q function
def compute_q(MDP, V, s, a):
# 根 据 给 定 的MDP, 价 值 函 数V, 计 算 状 态 行 为 对s,a的 价 值qsa
S, A, R, P, gamma = MDP
q_sa = 0
for s_prime in S:
q_sa += get_prob(P, s,a,s_prime) * get_value(V, s_prime)
q_sa = get_reward(R, s,a) + gamma * q_sa
return q_sa
计算 v function
def compute_v(MDP, V, Pi, s):
# 给 定MDP下 依 据 某 一 策 略Pi和 当 前 状 态 价 值 函 数V计 算 某 状 态s的 价 值
S, A, R, P, gamma = MDP
v_s = 0
for a in A:
v_s += get_pi(Pi, s,a) * compute_q(MDP, V, s, a)
return v_s
更新策略
# 根 据 当 前 策 略 使 用 回 溯 法 来 更 新 状 态 价 值, 本 章 不 做 要 求
def update_V(MDP, V, Pi):
# 给 定 一 个MDP和 一 个 策 略 , 更 新 该 策 略 下 的 价 值 函 数V
S, _, _, _, _ = MDP
V_prime = V.copy()
for s in S:
#set_value(V_prime, s, V_S(MDP, V_prime, Pi, s))
V_prime[str_key(s)] = compute_v(MDP, V_prime, Pi, s)
return V_prime
# 策 略 评 估, 得 到 该 策 略 下 最 终 的 状 态 价 值。 本 章 不 做 要 求
def policy_evaluate(MDP, V, Pi, n):
# 使 用n次 迭 代 计 算 来 评 估 一 个MDP在 给 定 策 略Pi下 的 状 态 价 值 , 初 始 时 价 值 为V
for i in range(n):
V = update_V(MDP, V, Pi)
return V
输出
V = policy_evaluate(MDP, V, Pi, 100)
display_dict(V)
# 验证状态在某策略下的价值
v = compute_v(MDP, V, Pi, "第三节课")
print("第三节课在当前策略下的价值为:{:.2f}".format(v))