dqnCartPole.py
import random
from collections import deque

import gym
import numpy as np
from keras import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
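
# Note: written against the standalone Keras API; with TensorFlow 2 the same
# code should also work by importing from tensorflow.keras instead.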


class DQN:
    """Implementation of the DQN algorithm."""

    def __init__(self, action_space, state_space):
        self.action_space = action_space
        self.state_space = state_space
        self.epsilon = 1.0            # exploration rate, annealed over time
        self.gamma = 0.95             # discount factor
        self.batch_size = 32
        self.epsilon_min = 0.01       # exploration floor
        self.epsilon_decay = 0.995    # multiplicative decay per replay() call
        self.learning_rate = 0.001
        self.memory = deque(maxlen=10000)  # experience replay buffer
        self.model = self.build_model()
        # Bookkeeping for the PlotModel helper referenced (but not defined)
        # in the training loop below.
        self.scores = []
        self.episodes = []
        self.average = []

    def build_model(self):
        # Two hidden layers of 32 ReLU units; the linear output layer emits
        # one Q-value per action.
        model = Sequential()
        model.add(Dense(32, input_shape=(self.state_space,), activation='relu'))
        model.add(Dense(32, activation='relu'))
        model.add(Dense(self.action_space, activation='linear'))
        # `learning_rate` replaces the deprecated `lr` keyword in newer Keras.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # Epsilon-greedy: explore with probability epsilon, otherwise act
        # greedily with respect to the predicted Q-values.
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_space)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])
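
    # One training step: sample a random minibatch from the replay buffer and
    # regress Q(s, a) toward the Bellman target r + gamma * max_a' Q(s', a').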
    def replay(self):
        # Wait until the buffer holds at least one full batch.
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        states = np.squeeze(np.array([t[0] for t in minibatch]))
        actions = np.array([t[1] for t in minibatch])
        rewards = np.array([t[2] for t in minibatch])
        next_states = np.squeeze(np.array([t[3] for t in minibatch]))
        dones = np.array([t[4] for t in minibatch])
        # (1 - dones) zeroes the bootstrap term for terminal transitions.
        targets = rewards + self.gamma * np.amax(self.model.predict_on_batch(next_states), axis=1) * (1 - dones)
        targets_full = self.model.predict_on_batch(states)
        # Overwrite only the Q-values of the actions actually taken; the
        # other actions keep their predictions, so their error is zero.
        targets_full[np.arange(self.batch_size), actions] = targets
        self.model.fit(states, targets_full, epochs=1, verbose=0)
        # Anneal exploration toward epsilon_min.
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
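

# Training loop: run episodes, store each transition in the agent's replay
# buffer, and call replay() after every environment step. Uses the global
# `env` created under __main__.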
def train_dqn(episode):
    agent = DQN(env.action_space.n, env.observation_space.shape[0])
    scores = []
    for e in range(episode):
        state = env.reset()
        state = np.reshape(state, (1, 4))
        score = 0
        max_steps = 1000
        for i in range(max_steps):
            # env.render()
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            score += reward
            next_state = np.reshape(next_state, (1, 4))
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            agent.replay()
            if done:
                scores.append(score)
                # Persist the running score history after every episode.
                np.savetxt('result.out', scores, delimiter='\n')
                print("episode: {}/{}, score: {}".format(e, episode, score))
                # agent.PlotModel(score, e)
                break
    return scores


if __name__ == '__main__':
    # Uses the classic Gym API (env.seed() and a 4-tuple from env.step());
    # gym >= 0.26 changed both interfaces.
    env = gym.make('CartPole-v1')
    env.seed(0)
    np.random.seed(0)
    random.seed(0)  # also seed Python's RNG, used for sampling and exploration
    ep = 300
    scores = train_dqn(ep)
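    # Optional: a quick look at training progress (a sketch, assuming
    # matplotlib is available).
    # import matplotlib.pyplot as plt
    # plt.plot(scores)
    # plt.xlabel('episode')
    # plt.ylabel('score')
    # plt.show()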