forked from rlcode/reinforcement-learning-kr
cartpole_a2c.py
import sys
import gym
import pylab
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam
from keras import backend as K
EPISODES = 1000
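# How the A2C update below works: the actor maps a state to a softmax policy
# pi(a|s) and the critic maps a state to a scalar value estimate V(s). After
# every environment step, the one-step advantage
#     A(s, a) = r + discount_factor * V(s') - V(s)   (A(s, a) = r - V(s) at terminal states)
# weights the actor's policy-gradient loss, while the TD target
# r + discount_factor * V(s') is the regression target for the critic.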
# Actor-critic (A2C) agent for the CartPole example
class A2CAgent:
    def __init__(self, state_size, action_size):
        self.render = False
        self.load_model = False
        # Sizes of the state and action spaces
        self.state_size = state_size
        self.action_size = action_size
        self.value_size = 1

        # Actor-critic hyperparameters
        self.discount_factor = 0.99
        self.actor_lr = 0.001
        self.critic_lr = 0.005

        # Create the policy network (actor) and the value network (critic)
        self.actor = self.build_actor()
        self.critic = self.build_critic()
        self.actor_updater = self.actor_optimizer()
        self.critic_updater = self.critic_optimizer()

        if self.load_model:
            self.actor.load_weights("./save_model/cartpole_actor_trained.h5")
            self.critic.load_weights("./save_model/cartpole_critic_trained.h5")
    # Actor: takes the state and returns a probability for each action
    def build_actor(self):
        actor = Sequential()
        actor.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        actor.add(Dense(self.action_size, activation='softmax',
                        kernel_initializer='he_uniform'))
        actor.summary()
        return actor
    # Critic: takes the state and returns an estimate of its value
    def build_critic(self):
        critic = Sequential()
        critic.add(Dense(24, input_dim=self.state_size, activation='relu',
                         kernel_initializer='he_uniform'))
        critic.add(Dense(24, input_dim=self.state_size, activation='relu',
                         kernel_initializer='he_uniform'))
        critic.add(Dense(self.value_size, activation='linear',
                         kernel_initializer='he_uniform'))
        critic.summary()
        return critic
    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        policy = self.actor.predict(state, batch_size=1).flatten()
        return np.random.choice(self.action_size, 1, p=policy)[0]
    # Builds the update function for the policy network.
    # The loss is the policy-gradient objective -sum(advantage * log pi(a|s)),
    # where pi(a|s) is picked out of the softmax output with a one-hot action.
    def actor_optimizer(self):
        action = K.placeholder(shape=[None, self.action_size])
        advantage = K.placeholder(shape=[None, ])

        action_prob = K.sum(action * self.actor.output, axis=1)
        cross_entropy = K.log(action_prob) * advantage
        loss = -K.sum(cross_entropy)

        optimizer = Adam(lr=self.actor_lr)
        updates = optimizer.get_updates(self.actor.trainable_weights, [], loss)
        train = K.function([self.actor.input, action, advantage], [],
                           updates=updates)
        return train
    # Builds the update function for the value network.
    # The loss is the mean squared error between the TD target and V(s).
    def critic_optimizer(self):
        target = K.placeholder(shape=[None, ])

        loss = K.mean(K.square(target - self.critic.output))

        optimizer = Adam(lr=self.critic_lr)
        updates = optimizer.get_updates(self.critic.trainable_weights, [], loss)
        train = K.function([self.critic.input, target], [], updates=updates)
        return train
    # Update the policy and value networks at every timestep
    def train_model(self, state, action, reward, next_state, done):
        value = self.critic.predict(state)[0]
        next_value = self.critic.predict(next_state)[0]

        act = np.zeros([1, self.action_size])
        act[0][action] = 1

        # Advantage and update target from the Bellman expectation equation:
        # advantage = r + gamma * V(s') - V(s), target = r + gamma * V(s'),
        # with V(s') taken to be 0 at terminal states
        if done:
            advantage = reward - value
            target = [reward]
        else:
            advantage = (reward + self.discount_factor * next_value) - value
            target = reward + self.discount_factor * next_value

        self.actor_updater([state, act, advantage])
        self.critic_updater([state, target])
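# The training loop below runs fully online: there is no replay memory, so each
# (state, action, reward, next_state) transition updates both networks
# immediately and is then discarded. The ./save_graph and ./save_model
# directories referenced below are assumed to exist (the script does not
# create them).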
if __name__ == "__main__":
    # CartPole-v1 environment; episodes are capped at 500 timesteps
    env = gym.make('CartPole-v1')
    # Get the sizes of the state and action spaces from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # Create the actor-critic (A2C) agent
    agent = A2CAgent(state_size, action_size)

    scores, episodes = [], []

    for e in range(EPISODES):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            if agent.render:
                env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            # Give a -100 reward if the episode ends before timestep 500
            reward = reward if not done or score == 499 else -100

            agent.train_model(state, action, reward, next_state, done)

            score += reward
            state = next_state

            if done:
                # Log the training result after every episode
                # (undo the -100 penalty before recording the score)
                score = score if score == 500.0 else score + 100
                scores.append(score)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.savefig("./save_graph/cartpole_a2c.png")
                print("episode:", e, " score:", score)

                # Stop training once the mean score of the last 10 episodes exceeds 490
                if np.mean(scores[-min(10, len(scores)):]) > 490:
                    agent.actor.save_weights("./save_model/cartpole_actor.h5")
                    agent.critic.save_weights(
                        "./save_model/cartpole_critic.h5")
                    sys.exit()