本文公开一个基于强化学习算法DQN的五子棋游戏自动下棋算法源码,并对思路进行讲解。
完整代码和预训练模型(Saver文件夹)地址:
python_强化学习算法DQN_玩五子棋游戏
一个基于CNN构成的DQN算法的8*8的五子棋游戏
1、Q-Learning介绍
Q-Learning的思想并不是很复杂,很多文章都有详细的介绍,这里只是简单举个例子,不做详细讲解。
2、DQN介绍
DQN也叫deepQ-Learning,在Q-Learning前面加一个Deep。Q-Learning有一个缺点,如果状态特别多,比如五子棋的棋盘,每个位置都有(空白、黑子、白子)三个状态,那么假如一个10*10的棋盘 就有3^100个状态,那么这个Q表是没办法做出来的。那么我们就没办法构建这个Q表来获取状态价值状态转移价值了。
DQN就是搭建一个人工神经网络,输入是当前状态,输出是状态转移价值。或者输入是当前状态,输出是当前状态的Q值。通过多次迭代训练,使得神经网络输出逼近真实的Q值(逼近而不是等于,因为毕竟是神经网络,参数数量,存储占用量远小于Q表,如果能做到完全等于的话,还要存储干啥)
那么神经网络的训练的损失就是 预测Q值和(max(下一步的真实Q值)乘系数 +奖励值)的差的平方。 预测Q值就是神经网络一次前向传播输出的Q值,真实Q值就是神经网络曾经预测的Q值。为什么真实Q值是神经网络曾经预测过的Q值呢?因为神经网络每次训练都会对输出值产生影响,如果这个真实Q值一直变化的话,那么神经网络是没办法收敛的。所以需要搭建另一个参数一模一样的神经网络来生成真实Q值。这个生成真实Q值的网络不需要训练,只需要迭代一定次数以后,复制一份预测网络的参数即可。就好比一个笨老师教一个学生,学生学会了以后当了老师,教新的学生,然后青出于蓝而胜于蓝,这个学生越来越强。
本文中代码用的方法是,保存历史预测的Q值,等一个棋局结束后,再用这些Q值来训练每一步的预测Q值,这样做到一个神经网络就可以了。相当于一个聪明的学生,不停的复习,归纳,总结,然后逐渐变强。
3、对抗算法介绍
根据上面介绍的Q-Learning算法,解决的是一个单智能体的问题,这个智能体如何能够用最小的代价获得最大的回报。但是对弈的学习过程不一样,博弈中存在两个智能体,当前状态和当前动作对应的下一个状态会有很多,因为对手怎么下子我们不知道。那么当前状态和当前动作对应的什么状态是固定的呢?对手的状态。那么我能不能预测一下对手下一步能达到的最大的Q值呢?对手的Q值和我的Q值又有什么关系呢?对于零和博弈,对手的优势就是我得劣势,对手的劣势就是我的优势,那么我就可以用对手的Q值乘一个负的系数来训练当前的Q值。这样就解决了。
训练的过程就是,先自己和自己下一局棋,并记录每一步和每一步预测的最大Q值。等棋局结束后,再把整个棋局用神经网络"回顾"一遍,用记录的步子,Q值训练。
4、训练过程中注意的地方
下子的时候按照常理,咱们都是选择Q值最大的动作来下子,这样下子是没问题的,但是我们是来训练网络的,如果每次选择最大的步子下子的话容易陷入一个僵局。获胜方一直用同样或相似的套路打败败方,神经网络很快损失下降很快,但是还是不会正确的落子,或者说它只对某一种棋局局面的风格掌握得很好,对不按照套路出牌的人就没办法应对。那么我们就要加一个随即事件,一部分步子是按照最大值去走的,一部分步子是随机走的,但是最大Q值是每次都要计算出来保存用于回顾训练用的。
不同的棋子最好放在不同的channel里面,我发现如果用0背景1白棋2黑棋这样标注放到一个棋盘里面神经网络无法收敛
5、完整代码
运行代码入口如下,已经写了很详细的注释了,我很辛苦的
如果需要预训练模型需要从文章开始的链接下载Saver文件夹
import numpy as np
import random
import os
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
from DQN_point_game import Map
'''
此文件主要用于实现强化学习算法DQN玩五子棋
'''
class DQN():
def __init__(self):
self.n_input = Map.mapsize * Map.mapsize
self.n_output = 1
self.current_q_step = 0
self.avg_loss = 0
# placeholder是在神经网络构建graph的时候在模型中的占位,此时并没有把要输入的数据传入模型,它只会分配必要的内存。
# 建立完session后,在会话中,运行模型的时候通过feed_dict()函数向占位符喂入数据。
self.x = tf.placeholder("float", [None, Map.mapsize, Map.mapsize], name='x')
self.y = tf.placeholder("float", [None, self.n_output], name='y')
self.create_Q_network()
self.create_training_method()
self.saver = tf.train.Saver()
self.sess = tf.Session()
# 它能让你在运行图的时候,插入一些计算图
self.sess = tf.InteractiveSession()
self.sess.run(tf.global_variables_initializer())
def create_Q_network(self):
# tf.random_normal()函数用于从“服从指定正态分布的序列”中随机取出指定个数的值。 stddev: 正态分布的标准差
wc1 = tf.Variable(tf.random_normal([3, 3, 1, 64], stddev=0.1), dtype=tf.float32, name='wc1')
wc2 = tf.Variable(tf.random_normal([3, 3, 64, 128], stddev=0.1), dtype=tf.float32, name='wc2')
wc3 = tf.Variable(tf.random_normal([3, 3, 128, 256], stddev=0.1), dtype=tf.float32, name='wc3')
wd1 = tf.Variable(tf.random_normal([256, 128], stddev=0.1), dtype=tf.float32, name='wd1')
wd2 = tf.Variable(tf.random_normal([128, self.n_output], stddev=0.1), dtype=tf.float32, name='wd2')
# tf.Variable 得到的是张量,而张量并不是具体的值,而是计算过程
bc1 = tf.Variable(tf.random_normal([64], stddev=0.1), dtype=tf.float32, name='bc1')
bc2 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bc2')
bc3 = tf.Variable(tf.random_normal([256], stddev=0.1), dtype=tf.float32, name='bc3')
bd1 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bd1')
bd2 = tf.Variable(tf.random_normal([self.n_output], stddev=0.1), dtype=tf.float32, name='bd2')
weights = {
'wc1': wc1,
'wc2': wc2,
'wc3': wc3,
'wd1': wd1,
'wd2': wd2
}
biases = {
'bc1': bc1,
'bc2': bc2,
'bc3': bc3,
'bd1': bd1,
'bd2': bd2
}
self.Q_value = self.conv_basic(self.x, weights, biases)
self.Q_Weihgts = [weights, biases]
def conv_basic(self, _input, _w, _b):
# input
_out = tf.reshape(_input, shape=[-1, Map.mapsize, Map.mapsize, 1])
# conv layer 1 conv2d 用于做二维卷积 strides, # 步长参数 padding, # 卷积方式
_out = tf.nn.conv2d(_out, _w['wc1'], strides=[1, 1, 1, 1], padding='SAME')
# bias_add 一个叫bias的向量加到一个叫value的矩阵上,是向量与矩阵的每一行进行相加
_out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc1']))
# ksize 池化窗口的大小,取一个四维向量 padding: 填充的方法,SAME或VALID,SAME表示添加全0填充,VALID表示不添加
_out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# conv layer2
_out = tf.nn.conv2d(_out, _w['wc2'], strides=[1, 1, 1, 1], padding='SAME')
_out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc2']))
_out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# conv layer3
_out = tf.nn.conv2d(_out, _w['wc3'], strides=[1, 1, 1, 1], padding='SAME')
_out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc3']))
# 计算张量tensor沿着指定的数轴(tensor的某一维度)上的的平均值,主要用作降维或者计算tensor(图像)的平均值。
_out = tf.reduce_mean(_out, [1, 2])
# fully connected layer1 matmul 两个矩阵中对应元素各自相乘
_out = tf.nn.relu(tf.add(tf.matmul(_out, _w['wd1']), _b['bd1']))
# fully connected layer2
_out = tf.add(tf.matmul(_out, _w['wd2']), _b['bd2'])
return _out
def create_training_method(self):
# squared_difference 计算张量 x、y 对应元素差平方
self.cost = tf.reduce_mean(tf.squared_difference(self.Q_value, self.y))
self.optm = tf.train.AdamOptimizer(learning_rate=0.001, name='Adam').minimize(self.cost)
def restore(self):
if os.path.exists('Saver/cnnsaver.ckpt-0.index'):
self.saver.restore(self.sess, os.path.abspath('Saver/cnnsaver.ckpt-0'))
def computerPlay(self, IsTurnWhite):
if IsTurnWhite:
print('白旗走')
# 如果该白旗走的话 用黑的棋盘,1代表黑,-1代表白
board = np.array(Map.blackBoard)
else:
print('黑旗走')
# 如果该黑旗走的话 用白的棋盘 1代表白,-1代表黑
board = np.array(Map.whiteBoard)
# 建立所有可下位置的数组,每下一个位置一个数组
boards = []
# 当前棋谱中空白的地方
positions = []
for i in range(Map.mapsize):
for j in range(Map.mapsize):
# 如果这个当前棋谱这个位置是空白的
if board[j][i] == Map.backcode:
predx = np.copy(board)
# -1代表自己,更方便计算
predx[j][i] = -1
boards.append(predx)
positions.append([i, j])
if len(positions) == 0:
return 0, 0, 0
# 计算所有可下的位置的价值
nextStep = self.sess.run(self.Q_value, feed_dict={self.x: boards})
maxx = 0
maxy = 0
maxValue = -1000 # 实际最大价值 用于后续学习
# 从所有可下的地方找一个价值最大的位置下棋
for i in range(len(positions)):
value = nextStep[i] + random.randint(0, 10) / 1000 # 如果没有最优步子 则随机选择一步
if value > maxValue:
maxValue = value
maxx = positions[i][0]
maxy = positions[i][1]
print(str(maxx) + ',' + str(maxy))
print('此位置的价值为:' + str(maxValue[0]))
return maxx, maxy, maxValue
# 下完了一局就更新一下AI模型
def TrainOnce(self, winner):
# 记录棋图
# board1 白棋 board2 黑棋
board1 = np.array(Map.mapRecords1)
board2 = np.array(Map.mapRecords2)
# 记录棋步
step1 = np.array(Map.stepRecords1)
step2 = np.array(Map.stepRecords2)
# 记录得分
scoreR1 = np.array(Map.scoreRecords1)
scoreR2 = np.array(Map.scoreRecords2)
board1 = np.reshape(board1, [-1, Map.mapsize, Map.mapsize])
board2 = np.reshape(board2, [-1, Map.mapsize, Map.mapsize])
step1 = np.reshape(step1, [-1, Map.mapsize, Map.mapsize])
step2 = np.reshape(step2, [-1, Map.mapsize, Map.mapsize])
score1 = []
score2 = []
board1 = (board1 * (1 - step1)) + step1 * Map.blackcode
board2 = (board2 * (1 - step2)) + step2 * Map.blackcode
# 每步的价值 = 奖励(胜1 负-0.9) + 对方棋盘能达到的最大价值(max taget Q) * (-0.9)
for i in range(len(board1)):
if i == len(scoreR2): # 白方已经五连 白方赢
print('白方已经五连,白方赢')
score1.append([1.0]) # 白方的最后一步获得1分奖励
else:
# 白方的价值为:黑方棋盘能达到的最大价值(max taget Q) * (-0.9)
score1.append([scoreR2[i][0] * -0.9])
if winner == 2:
print('惩罚白方的最后一步,将其价值设为 -0.9')
score1[len(score1) - 1][0] = -0.9
# 1 白棋 2 黑棋
for i in range(len(board2)):
if i == len(scoreR1) - 1: # 黑方赢
print('黑方已经五连,黑方赢')
score2.append([1.0])
else:
# 黑棋的得分为:白方棋盘能达到的最大价值(max taget Q) * (-0.9)
score2.append([scoreR1[i + 1][0] * -0.9])
if winner == 1:
print('惩罚黑方的最后一步,将其价值设为 -0.9')
# 惩罚黑方的最后一步
score2[len(score2) - 1][0] = -0.9
# 一次完成多个数组的拼接
borders = np.concatenate([board1, board2], axis=0)
scores = np.concatenate([score1, score2], axis=0)
_, totalLoss = self.sess.run([self.optm, self.cost], feed_dict={self.x: borders,
self.y: scores})
self.avg_loss += totalLoss
print('train avg loss ' + str(self.avg_loss))
self.avg_loss = 0
# os.path.abspath取决于os.getcwd,如果是一个绝对路径,就返回,
# 如果不是绝对路径,根据编码执行getcwd/getcwdu.然后把path和当前工作路径连接起来
self.saver.save(self.sess, os.path.abspath('Saver/cnnsaver.ckpt'), global_step=0)
def PlayWidthHuman(self):
# 读取历史存储的模型
self.restore()
Map.PlayWithComputer = self.computerPlay
Map.TrainNet = self.TrainOnce
Map.ShowWind()
if __name__ == '__main__':
dqn = DQN()
dqn.PlayWidthHuman()
用于构建棋谱的代码
Map.py
import tkinter as tk
import os
import time
import copy
# 定义窗口
top = tk.Tk()
top.title("AI自动玩五子棋")
top.geometry('400x300')
# 定义地图尺寸
mapsize = 8
# 元素尺寸
pixsize = 20
# 连子个数
winSet = 5
# 空白编号
backcode = 0
# 白棋
whitecode = 1
# 黑棋
blackcode = -1
# 定义画布
canvas = tk.Canvas(top, height=mapsize * pixsize, width=mapsize * pixsize,
bg="gray")
canvas.pack(pady=25)
for i in range(mapsize):
canvas.create_line(i * pixsize, 0,
i * pixsize, mapsize * pixsize,
fill='black')
canvas.create_line(0, i * pixsize,
mapsize * pixsize, i * pixsize,
fill='black')
# 初始棋盘
whiteBoard = []
stepBoard = []
for i in range(mapsize):
row = []
rowBak = []
for j in range(mapsize):
row.append(0)
rowBak.append(backcode)
whiteBoard.append(rowBak)
stepBoard.append(row)
blackBoard = copy.deepcopy(whiteBoard)
# 棋子列表
childMap = []
# 记录棋图
mapRecords1 = []
mapRecords2 = []
# 记录棋步
stepRecords1 = []
stepRecords2 = []
# 记录得分
scoreRecords1 = []
scoreRecords2 = []
isGameOver = False
IsTurnWhite = True
def Restart():
global isGameOver
global IsTurnWhite
for child in childMap:
canvas.delete(child)
childMap.clear()
isGameOver = False
IsTurnWhite = True
mapRecords1.clear()
mapRecords2.clear()
stepRecords1.clear()
stepRecords2.clear()
scoreRecords1.clear()
scoreRecords2.clear()
for i in range(mapsize):
for j in range(mapsize):
whiteBoard[j][i] = backcode
blackBoard[j][i] = backcode
WinDataSetPath = 'DataSets\\win'
LosDataSetPath = 'DataSets\\los'
TrainNet = None
def SaveDataSet(tag):
if TrainNet != None:
TrainNet(tag)
else:
winfilename = WinDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'
losfilename = LosDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'
if not os.path.exists('DataSets'):
os.mkdir('DataSets')
if not os.path.exists(WinDataSetPath):
os.mkdir(WinDataSetPath)
if not os.path.exists(LosDataSetPath):
os.mkdir(LosDataSetPath)
strInfo1 = ''
for i in range(len(mapRecords1)):
for j in range(mapsize):
for k in range(mapsize):
strInfo1 += str(mapRecords1[i][j][k]) + ','
strInfo1 += '\n'
for j in range(mapsize):
for k in range(mapsize):
strInfo1 += str(stepRecords1[i][j][k]) + ','
strInfo1 += '\n'
strInfo2 = ''
for i in range(len(mapRecords2)):
for j in range(mapsize):
for k in range(mapsize):
strInfo2 += str(mapRecords2[i][j][k]) + ','
strInfo2 += '\n'
for j in range(mapsize):
for k in range(mapsize):
strInfo2 += str(stepRecords2[i][j][k]) + ','
strInfo2 += '\n'
if tag == 1:
with open(winfilename, "w") as f:
f.write(strInfo1)
with open(losfilename, "w") as f:
f.write(strInfo2)
else:
with open(winfilename, "w") as f:
f.write(strInfo2)
with open(losfilename, "w") as f:
f.write(strInfo1)
def JudgementResult():
global isGameOver
judgemap = whiteBoard
for i in range(mapsize):
for j in range(mapsize):
if judgemap[j][i] != backcode:
tag = judgemap[j][i]
checkrow = True
checkCol = True
checkLine = True
checkLine2 = True
for k in range(winSet - 1):
if i + k + 1 < mapsize: # 行
if (judgemap[j][i + k + 1] != tag) and checkrow:
checkrow = False
if j + k + 1 < mapsize: # 斜线
if (judgemap[j + k + 1][i + k + 1] != tag) and checkLine:
checkLine = False
else:
checkLine = False
else:
checkrow = False
checkLine = False
if j + k + 1 < mapsize: # 列
if (judgemap[j + k + 1][i] != tag) and checkCol:
checkCol = False
if i - k - 1 >= 0: # 斜线
if (judgemap[j + k + 1][i - k - 1] != tag) and checkLine2:
checkLine2 = False
else:
checkLine2 = False
else:
checkCol = False
checkLine2 = False
if not checkrow and not checkCol and not checkLine and not checkLine2:
break
if checkrow or checkCol or checkLine or checkLine2:
isGameOver = True
SaveDataSet(tag)
return tag
return 0
PlayWithComputer = None
GetMaxScore = None
def playChess(event):
if isGameOver:
print('game is over, restart!')
Restart()
return
x = event.x // pixsize
y = event.y // pixsize
if x >= mapsize or y >= mapsize:
return
if whiteBoard[y][x] != backcode:
return
score = 0
if PlayWithComputer != None:
_x, _y, score = PlayWithComputer(IsTurnWhite)
res = chess(x, y, score)
if res == 0:
if PlayWithComputer != None:
x, y, score = PlayWithComputer(IsTurnWhite)
res = chess(x, y, score)
def chess(x, y, score):
global IsTurnWhite
if isGameOver:
print('game is over, restart!')
Restart()
return -1
if whiteBoard[y][x] != backcode:
print('game is over, restart!')
Restart()
return -1
step = copy.deepcopy(stepBoard)
step[y][x] = 1
if IsTurnWhite: # 白棋是人工走的 如果过用来当训练集 用反转棋盘
mapRecords1.append(copy.deepcopy(blackBoard))
stepRecords1.append(step)
scoreRecords1.append(score)
whiteBoard[y][x] = whitecode # 1白 -1黑
blackBoard[y][x] = blackcode
child = canvas.create_oval(x * pixsize,
y * pixsize,
x * pixsize + pixsize,
y * pixsize + pixsize, fill='white')
else:
mapRecords2.append(copy.deepcopy(whiteBoard))
stepRecords2.append(step)
scoreRecords2.append(score)
whiteBoard[y][x] = blackcode # 1白 -1黑
blackBoard[y][x] = whitecode
child = canvas.create_oval(x * pixsize,
y * pixsize,
x * pixsize + pixsize,
y * pixsize + pixsize, fill='black')
IsTurnWhite = not IsTurnWhite
childMap.append(child)
return JudgementResult()
# 按钮的点击事件
def AutoPlayOnce():
if PlayWithComputer != None:
x, y, score = PlayWithComputer(IsTurnWhite)
chess(x, y, score)
btnAuto = tk.Button(top, text="重新开始或者自动走1次", command=AutoPlayOnce)
btnAuto.pack()
# 画布与鼠标左键进行绑定
# canvas.bind("<B1-Motion>", playChess)
canvas.bind("<Button-1>", playChess)
# 按钮的点击事件
def AutoPlayOne():
global isGameOver
if PlayWithComputer != None:
for i in range(222):
if isGameOver:
break
x, y, score = PlayWithComputer(IsTurnWhite)
chess(x, y, score)
btnAuto = tk.Button(top, text="自动玩一局", command=AutoPlayOne)
btnAuto.pack()
canvas.bind("<Button-2>", playChess)
# 显示游戏窗口
def ShowWind():
top.mainloop()
喜欢请记得一键三连,之后会更新更有趣的算法,欢迎大家一起交流!