# selective_memory_dqn.py

import gym
from random import sample
from numpy import random, argmax
import numpy as np

from nn_maker import make_dnn


class SelectiveDQNLearner:
    def __init__(self, env, good_episode, epsilon=.3, N=1000, sample_size=32, lr=.2, discount=.96):
        """
        Used with Acrobot-v1.
        good_episode is called with the learner at the end of each episode and
        decides whether that episode's transitions are promoted to good memory.
        """
        self.good_episode = good_episode
        self.env = env
        self._state = None
        self.epsilon = epsilon
        self.bad_memory = []
        self.good_memory = []
        self.temp_memory = []
        self.memory_size = N
        self.sample_size = sample_size
        self.lr = lr
        self.discount = discount
        self.silent = True
        self.Qnn = make_dnn(env)
        self.reset()

    def reset(self):
        if self.good_episode(self):  # see if last memory is good
            for trans in self.temp_memory:
                self._remember(trans, dumping=True)
        self.temp_memory = []
        self.state = self.env.reset()

    def _get_action(self, learning):
        exploring = random.uniform() < self.epsilon
        if learning and exploring:
            return self.env.action_space.sample()
        else:  # exploiting
            return self.optimal_action(self.state)

    def step(self, learning):
        action = self._get_action(learning)
        obs, reward, done, info = self.env.step(action)
        prev_state = self.state
        self.state = obs
        if learning:
            transition = (prev_state, action, reward, obs, done)
            self._remember(transition)
            self._update_q()
        if done:
            self.reset()
        return reward, done

    def _get_x_y(self, good, bad):
        good_replay = sample(self.good_memory, min(good, len(self.good_memory)))
        bad_replay = sample(self.bad_memory, min(bad, len(self.bad_memory)))
        # Keep the replay as a plain list: stacking mixed-type transition tuples
        # (arrays, ints, bools) into one ndarray is not needed and is fragile.
        replay = good_replay + bad_replay
        states = np.array([a[0] for a in replay])
        new_states = np.array([a[3] for a in replay])
        Q = self.Qnn.predict(states)
        Q_new = self.Qnn.predict(new_states)
        replay_size = len(replay)
        X = np.empty((replay_size, len(states[0])))
        y = np.empty((replay_size, len(Q[0])))
        for i in range(replay_size):
            state_r, action_r, reward_r, new_state_r, done_r = replay[i]
            target = Q[i]
            target[action_r] = reward_r
            # If we're done the utility is simply the reward of executing action a in
            # state s, otherwise we add the expected maximum future reward as well.
            if not done_r:
                target[action_r] += self.discount * np.amax(Q_new[i])
            X[i] = state_r
            y[i] = target
        return X, y
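
    # Replay sampling is deliberately biased: _update_q below asks for twice as many
    # transitions from good episodes as from the general pool (2 * sample_size vs.
    # sample_size), which is the "selective memory" part of this learner.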

    def _update_q(self):
        X, y = self._get_x_y(self.sample_size * 2, self.sample_size)
        # self.Qnn.optimizer.lr.assign(self.lr)
        self.Qnn.train_on_batch(X, y)

    def silent_level(self):
        if self.silent:
            return 0
        else:
            return 2

    def optimal_action(self, state):
        actions = self.Qnn.predict(np.expand_dims(state, axis=0))[0]
        # Guard against a diverging network producing NaN/inf Q-values.
        assert np.all(np.isfinite(actions)), f'Q-network produced non-finite values: {actions}'
        return argmax(actions)

    def _remember(self, transition, dumping=False):
        if not dumping:
            self.temp_memory.append(transition)
            self.bad_memory.append(transition)
        else:
            self.good_memory.append(transition)
        while len(self.good_memory) > self.memory_size * 2:
            self.bad_memory.append(self.good_memory.pop(0))
        while len(self.bad_memory) > self.memory_size:
            self.bad_memory.pop(0)
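
    # Memory scheme: every transition lands in temp_memory and bad_memory as the
    # episode runs; if good_episode(self) holds at reset(), the episode's transitions
    # are copied into good_memory as well. good_memory is capped at 2 * memory_size
    # (overflow spills back into bad_memory) and bad_memory at memory_size.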

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        if isinstance(value, np.ndarray):
            self._state = value
        elif isinstance(value, (int, np.integer)):
            # Wrap discrete observations in a 1-element float array.
            self._state = np.array([value], dtype=float)
        else:
            raise ValueError(f'got value {value} of type {type(value)}')

    def learn(self, steps, static_epsilon=None):
        i = 0
        self.reset()
        while i < steps:
            # Anneal epsilon linearly from 1 to 0 unless a fixed value is given.
            self.epsilon = (steps - i) / steps
            if static_epsilon is not None:
                self.epsilon = static_epsilon
            i += 1
            self.step(True)


def good_episode(learner):
    max_steps = learner.env._max_episode_steps
    steps = learner.env._elapsed_steps
    return steps is not None and steps != 0 and steps < max_steps
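
# good_episode relies on the TimeLimit wrapper: Acrobot-v1 otherwise ends at
# _max_episode_steps, so terminating in fewer steps means the goal height was reached.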


if __name__ == '__main__':
    import pickle

    env_name = 'Acrobot-v1'
    env = gym.make(env_name)
    actor = SelectiveDQNLearner(env, good_episode)
    with open("pickles/selective_dqn.pkl", 'wb+') as file:
        pickle.dump(actor, file)
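
# Minimal usage sketch (hypothetical; assumes make_dnn returns a compiled Keras model,
# as the predict/train_on_batch calls above imply):
#
#     env = gym.make('Acrobot-v1')
#     actor = SelectiveDQNLearner(env, good_episode)
#     actor.learn(10000)                          # epsilon anneals from 1 to 0
#     reward, done = actor.step(learning=False)   # single greedy evaluation step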