[텐서플로로 배우는 딥러닝] -> 이책이 설명이 잘 되어있음!
<에이전트와 환경>
에이전트 - 무언가의 주체 (상태)
MDP
<참고자료> 인프런APP(sung kim) , 유투브(김성의 딥러닝)
여러방향에 대한 값이있음 큐를 최대로 하는 액션을 선택하는것을 argmaxQ(s , a)
- s상태일때 큐를 최대로 하는 a(Action)
- ㅠ* (*가 들어가면 최적이란 뜻)
나보다 한발 앞서있는 큐 - S'
다른 길찾기 방법을 찾기 위해서 E-greedy 를 사용
임계값을 주고 랜덤 값을 발생시키는데 ,, ? (exploit vs exploration)
- Discounted future reward..
= 상태가치함수와 행동가치함수
- train_catch_game.py
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import random
import math
import os
# 학습에 필요한 설정값들을 선언합니다.
epsilon = 1 # epsilon-Greedy 기법에 사용할 최초의 epsilon값
epsilonMinimumValue = 0.001 # epsilon의 최소값 (이 값 이하로 Decay하지 않습니다)
num_actions = 3 # 에이전트가 취할 수 있는 행동의 개수 - (좌로 움직이기, 가만히 있기, 우로 움직이기)
num_epochs = 2000 # 학습에 사용할 반복횟수
hidden_size = 128 # 히든레이어의 노드 개수
maxMemory = 500 # Replay Memory의 크기
batch_size = 50 # 학습에 사용할 배치 개수
gridSize = 10 # 에이전트가 플레이하는 게임 화면 크기 (10x10 grid)
state_size = gridSize * gridSize # 게임 환경의 현재상태 (10x10 grid)
discount = 0.9 # Discount Factor \gamma
learning_rate = 0.2 # 러닝 레이트
# s와 e사이의 랜덤한 값을 리턴하는 유틸리티 함수를 정의합니다.
def randf(s, e):
return (float(random.randrange(0, (e - s) * 9999)) / 10000) + s;
# DQN 모델을 정의합니다.
# 100(현재 상태 - 10x10 Grid) -> 128 -> 128 -> 3(예측된 각 행동의 Q값)
def build_DQN(x):
W1 = tf.Variable(tf.truncated_normal(shape=[state_size, hidden_size], stddev=1.0 / math.sqrt(float(state_size))))
b1 = tf.Variable(tf.truncated_normal(shape=[hidden_size], stddev=0.01))
H1_output = tf.nn.relu(tf.matmul(x, W1) + b1)
W2 = tf.Variable(tf.truncated_normal(shape=[hidden_size, hidden_size],stddev=1.0 / math.sqrt(float(hidden_size))))
b2 = tf.Variable(tf.truncated_normal(shape=[hidden_size], stddev=0.01))
H2_output = tf.nn.relu(tf.matmul(H1_output, W2) + b2)
W3 = tf.Variable(tf.truncated_normal(shape=[hidden_size, num_actions],stddev=1.0 / math.sqrt(float(hidden_size))))
b3 = tf.Variable(tf.truncated_normal(shape=[num_actions], stddev=0.01))
output_layer = tf.matmul(H2_output, W3) + b3 # 출력에서는 활성화 함수 X
return tf.squeeze(output_layer)
# 인풋 화면 이미지와 타겟 Q값을 받기 위한 플레이스 홀더를 선언합니다.
x = tf.placeholder(tf.float32, shape=[None, state_size])
y = tf.placeholder(tf.float32, shape=[None, num_actions])
# DQN 모델을 선언하고 예측결과를 리턴받습니다.
y_pred = build_DQN(x)
# MSE 손실 함수와 옵티마이저를 정의합니다.
# 정확하게하려면 2를 곱해주는게 좋음.
loss = tf.reduce_sum(tf.square(y-y_pred)) / (2*batch_size) # MSE 손실 함수
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
# CatchGame을 수행하는 Environment를 구현합니다.
class CatchEnvironment():
# 상태의 초기값을 지정합니다.
def __init__(self, gridSize):
self.gridSize = gridSize
self.state_size = self.gridSize * self.gridSize
self.state = np.empty(3, dtype = np.uint8)
# 관찰 결과를 리턴합니다.
def observe(self):
canvas = self.drawState()
canvas = np.reshape(canvas, (-1,self.state_size))
return canvas
# 현재 상태(fruit, basket)를 화면에 출력합니다.
def drawState(self):
canvas = np.zeros((self.gridSize, self.gridSize))
# fruit를 화면에 그립니다.
canvas[self.state[0]-1, self.state[1]-1] = 1
# basket을 화면에 그립니다.
canvas[self.gridSize-1, self.state[2] -1 - 1] = 1
canvas[self.gridSize-1, self.state[2] -1] = 1
canvas[self.gridSize-1, self.state[2] -1 + 1] = 1
return canvas
# 게임을 초기 상태로 리셋합니다.
def reset(self):
initialFruitColumn = random.randrange(1, self.gridSize + 1)
initialBucketPosition = random.randrange(2, self.gridSize + 1 - 1)
self.state = np.array([1, initialFruitColumn, initialBucketPosition])
return self.getState()
# 현재 상태를 불러옵니다.
def getState(self):
stateInfo = self.state
fruit_row = stateInfo[0]
fruit_col = stateInfo[1]
basket = stateInfo[2]
return fruit_row, fruit_col, basket
# 에이전트가 취한 행동에 대한 보상을 줍니다.
def getReward(self):
fruitRow, fruitColumn, basket = self.getState()
# 만약 fruit가 바닥에 닿았을 때
if (fruitRow == self.gridSize - 1):
# basket이 fruit을 받아내면 1의 reward를 줍니다.
if (abs(fruitColumn - basket) <= 1):
return 1
# fruit를 받아내지 못하면 -1의 reward를 줍니다.
else:
return -1
# fruit가 바닥에 닿지 않은 중립적인 상태는 0의 reward를 줍니다.
else:
return 0
# 게임이 끝났는지를 체크합니다.(fruit가 바닥에 닿으면 한게임이 종료됩니다.)
def isGameOver(self):
if (self.state[0] == self.gridSize - 1):
return True
else:
return False
# action(좌로 한칸 이동, 제자리, 우로 한칸이동)에 따라 basket의 위치를 업데이트합니다.
def updateState(self, action):
move = 0
if (action == 0):
move = -1
elif (action == 1):
move = 0
elif (action == 2):
move = 1
fruitRow, fruitColumn, basket = self.getState()
newBasket = min(max(2, basket + move), self.gridSize - 1) # min/max는 basket이 grid밖으로 벗어나는것을 방지합니다.
fruitRow = fruitRow + 1 # fruit는 매 행동을 취할때마다 1칸씩 아래로 떨어집니다.
self.state = np.array([fruitRow, fruitColumn, newBasket])
# 행동을 취합니다. 0 : 왼쪽으로 이동, 1 : 가만히 있기, 2 : 오른쪽으로 이동
def act(self, action):
self.updateState(action)
reward = self.getReward()
gameOver = self.isGameOver()
return self.observe(), reward, gameOver, self.getState()
# Replay Memory를 class로 정의합니다.
class ReplayMemory:
def __init__(self, gridSize, maxMemory, discount):
self.maxMemory = maxMemory
self.gridSize = gridSize
self.state_size = self.gridSize * self.gridSize
self.discount = discount
canvas = np.zeros((self.gridSize, self.gridSize))
canvas = np.reshape(canvas, (-1,self.state_size))
self.inputState = np.empty((self.maxMemory, 100), dtype = np.float32)
self.actions = np.zeros(self.maxMemory, dtype = np.uint8)
self.nextState = np.empty((self.maxMemory, 100), dtype = np.float32)
self.gameOver = np.empty(self.maxMemory, dtype = np.bool)
self.rewards = np.empty(self.maxMemory, dtype = np.int8)
self.count = 0
self.current = 0
# 경험을 Replay Memory에 저장합니다.
def remember(self, currentState, action, reward, nextState, gameOver):
self.actions[self.current] = action
self.rewards[self.current] = reward
self.inputState[self.current, ...] = currentState
self.nextState[self.current, ...] = nextState
self.gameOver[self.current] = gameOver
self.count = max(self.count, self.current + 1)
self.current = (self.current + 1) % self.maxMemory
def getBatch(self, y_pred, batch_size, num_actions, state_size, sess, X):
# 취할 수 있는 가장 큰 배치 사이즈를 선택합니다. (학습 초기에는 batch_size만큼의 기억이 없습니다.)
memoryLength = self.count
chosenBatchSize = min(batch_size, memoryLength)
# 인풋 데이터와 타겟데이터를 선언합니다.
inputs = np.zeros((chosenBatchSize, state_size))
targets = np.zeros((chosenBatchSize, num_actions))
# 배치안의 값을 설정합니다.
for i in range(chosenBatchSize):
# 배치에 포함될 기억을 랜덤으로 선택합니다.
randomIndex = random.randrange(0, memoryLength)
# 현재 상태와 Q값을 불러옵니다.
current_inputState = np.reshape(self.inputState[randomIndex], (1, 100))
target = sess.run(y_pred, feed_dict={X: current_inputState})
# 현재 상태 바로 다음 상태를 불러오고 다음 상태에서 취할수 있는 가장 큰 Q값을 계산합니다.
current_nextState = np.reshape(self.nextState[randomIndex], (1, 100))
nextStateQ = sess.run(y_pred, feed_dict={X: current_nextState})
nextStateMaxQ = np.amax(nextStateQ)
# 만약 게임오버라면 reward로 Q값을 업데이트하고
if (self.gameOver[randomIndex] == True):
target[self.actions[randomIndex]] = self.rewards[randomIndex]
# 게임오버가 아니라면 타겟 Q값(최적의 Q값)을 아래 수식을 이용해서 계산합니다.
# Q* = reward + discount(gamma) * max_a' Q(s',a')
else:
target[self.actions[randomIndex]] = self.rewards[randomIndex] + self.discount * nextStateMaxQ
# 인풋과 타겟 데이터에 값을 지정합니다.
inputs[i] = current_inputState
targets[i] = target
return inputs, targets
# 학습을 진행하는 main 함수를 정의합니다.
def main(_):
print("트레이닝을 시작합니다.")
# 게임 플레이 환경을 선언합니다.
env = CatchEnvironment(gridSize)
# Replay Memory를 선언합니다.
memory = ReplayMemory(gridSize, maxMemory, discount)
# 학습된 파라미터를 저장하기 위한 tf.train.Saver를 선언합니다.
saver = tf.train.Saver()
winCount = 0
with tf.Session() as sess:
# 변수들의 초기값을 할당합니다.
sess.run(tf.global_variables_initializer())
for i in range(num_epochs+1):
# 환경을 초기화합니다.
err = 0
env.reset()
isGameOver = False
# 최초의 상태를 불러옵니다.
currentState = env.observe()
while (isGameOver != True):
action = -9999 # Q값을 초기화합니다.
# epsilon-Greedy 기법에 따라 랜덤한 행동을 할지 최적의 행동을 할지를 결정합니다.
global epsilon
if (randf(0, 1) <= epsilon):
# epsilon 확률만큼 랜덤한 행동을 합니다.
action = random.randrange(0, num_actions)
else:
# (1-epsilon) 확률만큼 최적의 행동을 합니다.
# 현재 상태를 DQN의 인풋으로 넣어서 예측된 최적의 Q(s,a)값들을 리턴받습니다.
q = sess.run(y_pred, feed_dict={x: currentState})
# Q(s,a)가 가장 높은 행동을 선택합니다.
action = q.argmax()
# epsilon값을 0.9999만큼 Decay합니다.
if (epsilon > epsilonMinimumValue):
epsilon = epsilon * 0.999
# 에이전트가 행동을 하고 다음 보상과 다음 상태에 대한 정보를 리턴 받습니다.
nextState, reward, gameOver, stateInfo = env.act(action)
# 만약 과일을 제대로 받아냈다면 승리 횟수를 1 올립니다.
if (reward == 1):
winCount = winCount + 1
# 에이전트가 수집한 정보를 Replay Memory에 저장합니다.
memory.remember(currentState, action, reward, nextState, gameOver)
# 현재 상태를 다음 상태로 업데이트하고 GameOver유무를 체크합니다.
currentState = nextState
isGameOver = gameOver
# Replay Memory로부터 학습에 사용할 Batch 데이터를 불러옵니다.
inputs, targets = memory.getBatch(y_pred, batch_size, num_actions, state_size, sess, x)
# 최적화를 수행하고 손실함수를 리턴받습니다.
_, loss_print = sess.run([optimizer, loss], feed_dict={x: inputs, y: targets})
err = err + loss_print
print("반복(Epoch): %d, 에러(err): %.4f, 승리횟수(Win count): %d, 승리비율(Win ratio): %.4f" % (i, err, winCount, float(winCount)/float(i+1)*100))
# 학습이 모두 끝나면 파라미터를 지정된 경로에 저장합니다.
print("트레이닝 완료")
save_path = saver.save(sess, os.getcwd()+"/model.ckpt")
print("%s 경로에 파라미터가 저장되었습니다" % save_path)
if __name__ == '__main__':
# main 함수를 호출합니다.
tf.app.run()
- play_catch_game.ipyth
# -*- coding: utf-8 -*-
%matplotlib
%matplotlib inline
from train_catch_game import *
from IPython import display
import matplotlib.patches as patches
import pylab as pl
import time
import tensorflow as tf
import os
# 설정값들을 정의합니다.
gridSize = 10
maxGames = 30
env = CatchEnvironment(gridSize)
winCount = 0
loseCount = 0
numberOfGames = 0
# 화면을 그리기 위한 설정들을 정의합니다.
ground = 1
plot = pl.figure(figsize=(12,12))
axis = plot.add_subplot(111, aspect='equal')
axis.set_xlim([-1, 12])
axis.set_ylim([0, 12])
# 파라미터를 불러오기 위한 tf.train.Saver() class를 선언합니다.
saver = tf.train.Saver()
# 현재 상태를 그리기 위한 drawState 함수를 정의합니다.
def drawState(fruitRow, fruitColumn, basket, gridSize):
# 과일이 몇번째 세로축에 있는지 정의합니다.
fruitX = fruitColumn
# 과일이 몇번째 가로축에 있는지 정의합니다.
fruitY = (gridSize - fruitRow + 1)
# 승리 횟수, 패배 횟수, 전체 게임 횟수를 화면 상단에 출력합니다.
statusTitle = "Wins: " + str(winCount) + " Losses: " + str(loseCount) + " TotalGame: " + str(numberOfGames)
axis.set_title(statusTitle, fontsize=30)
for p in [
# 배경의 위치를 지정합니다.
patches.Rectangle(
((ground - 1), (ground)), 11, 10,
facecolor="#000000" # Black
),
# 바구니의 위치를 지정합니다.
patches.Rectangle(
(basket - 1, ground), 2, 0.5,
facecolor="#FF0000" # Red
),
# 과일의 위치를 지정합니다.
patches.Rectangle(
(fruitX - 0.5, fruitY - 0.5), 1, 1,
facecolor="#0000FF" # Blue
),
]:
axis.add_patch(p)
display.clear_output(wait=True) # True : 다음그림이 올때 지워짐
display.display(pl.gcf()) # 완전히 지움
with tf.Session() as sess:
# 저장된 파라미터를 불러옵니다.
saver.restore(sess, os.getcwd()+"/model.ckpt")
print('저장된 파라미터를 불러왔습니다!')
# maxGames 횟수만큼 게임을 플레이합니다.
while (numberOfGames < maxGames):
numberOfGames = numberOfGames + 1
# 최초의 상태를 정의합니다.
isGameOver = False
fruitRow, fruitColumn, basket = env.reset()
currentState = env.observe()
drawState(fruitRow, fruitColumn, basket, gridSize)
while (isGameOver != True):
# 현재 상태를 DQN의 입력값으로 넣고 구한 Q값중 가장 큰 Q값을 갖는 행동을 취합니다.
q = sess.run(y_pred, feed_dict={x: currentState})
action = q.argmax()
# 행동을 취하고 다음 상태로 넘어갑니다.
nextState, reward, gameOver, stateInfo = env.act(action)
fruitRow = stateInfo[0]
fruitColumn = stateInfo[1]
basket = stateInfo[2]
# 과일을 받아내면 winCount를 1 늘리고 과일을 받아내지 못하면 loseCount를 1 늘립니다.
if (reward == 1):
winCount = winCount + 1
elif (reward == -1):
loseCount = loseCount + 1
currentState = nextState
isGameOver = gameOver
drawState(fruitRow, fruitColumn, basket, gridSize)
# 다음 행동을 취하기 전에 0.05초의 일시정지를 줍니다.
time.sleep(0.05)
# 최종 출력결과 이미지를 하나로 정리합니다.
display.clear_output(wait=True)
'딥러닝' 카테고리의 다른 글
Bagging/Boosting/Stacking (0) | 2019.08.21 |
---|---|
0814 RNN 구조로 뒤에올 문자 예측하기 (0) | 2019.08.14 |
0814 embedding으로 단어 연관성 예측하기 (0) | 2019.08.14 |
0813 ResNet 프로젝트 (0) | 2019.08.13 |
0813 CNN으로 MNIST 분류기 구현하기 (0) | 2019.08.13 |