forked from PaddlePaddle/PARL
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mujoco_agent.py
77 lines (66 loc) · 2.81 KB
/
mujoco_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import paddle
import parl
import utils
from optimizers import Adam
class MujocoAgent(parl.Agent):
def __init__(self, algorithm, config):
self.config = config
super(MujocoAgent, self).__init__(algorithm)
self.obs_shape = config['obs_dim']
weights = self.get_weights()
self.weights_name = list(weights.keys())
weights = list(weights.values())
self.weights_shapes = [x.shape for x in weights]
self.weights_total_size = np.sum(
[np.prod(x) for x in self.weights_shapes])
self.optimizer = Adam(self.weights_total_size, self.config['stepsize'])
def predict(self, obs):
obs = obs.astype('float32')
obs = np.expand_dims(obs, axis=0)
obs = paddle.to_tensor(obs, dtype='float32')
predict_actions = self.alg.predict(obs)
return predict_actions.detach().numpy()
def learn(self, noisy_rewards, noises):
""" Update weights of the model in the numpy level.
Compute the grident and take a step.
Args:
noisy_rewards(np.float32): [batch_size, 2]
noises(np.float32): [batch_size, weights_total_size]
"""
g = utils.batched_weighted_sum(
# Mirrored sampling: evaluate pairs of perturbations \epsilon, −\epsilon
noisy_rewards[:, 0] - noisy_rewards[:, 1],
noises,
batch_size=500)
g /= noisy_rewards.size
latest_flat_weights = self.get_flat_weights()
# Compute the new weights theta.
theta, update_ratio = self.optimizer.update(
latest_flat_weights,
-g + self.config["l2_coeff"] * latest_flat_weights)
self.set_flat_weights(theta)
def get_flat_weights(self):
weights = list(self.get_weights().values())
flat_weights = np.concatenate([x.flatten() for x in weights])
return flat_weights
def set_flat_weights(self, flat_weights):
weights = utils.unflatten(flat_weights, self.weights_shapes)
weights_dcit = {}
assert len(weights) == len(self.weights_name)
for name, values in zip(self.weights_name, weights):
weights_dcit[name] = values
self.set_weights(weights_dcit)