-
Notifications
You must be signed in to change notification settings - Fork 0
/
agent_cuda.py
85 lines (69 loc) · 2.71 KB
/
agent_cuda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
from model import DQN
from parameters import DEVICE, batch_size
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    The agent owns two ``DQN`` networks built from ``(state_dim, action_dim)``:
    ``policy_net`` is optimized with Adam and an MSE loss, while ``target_net``
    is a copy kept in eval mode that only changes when
    ``update_target_network`` is called. Both networks and every tensor built
    here are placed on ``DEVICE``; the mini-batch size is the module-level
    ``batch_size`` imported from ``parameters``.
    """

    # Exploration prior used when there are exactly two actions: action 0 is
    # drawn with weight 0.70 and action 1 with weight 0.30.
    _BINARY_EXPLORE_WEIGHTS = (0.70, 0.30)

    def __init__(
        self,
        state_dim,
        action_dim,
        lr,
        gamma,
        epsilon,
        epsilon_decay,
        epsilon_min,
        buffer_size,
    ):
        """Build the networks, the optimizer and an empty replay buffer.

        Args:
            state_dim: First argument forwarded to ``DQN``.
            action_dim: Second argument forwarded to ``DQN``; also the number
                of actions sampled from during exploration.
            lr: Learning rate handed to the Adam optimizer.
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Initial probability of taking an exploratory action.
            epsilon_decay: Factor ``epsilon`` is multiplied by in
                ``decay_epsilon``.
            epsilon_min: Value ``decay_epsilon`` will not decay below.
            buffer_size: Maximum number of transitions kept in the replay
                buffer; the oldest ones are dropped first.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.lr = lr

        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size

        self.policy_net = DQN(state_dim, action_dim).to(DEVICE)
        self.target_net = DQN(state_dim, action_dim).to(DEVICE)
        # Start the target network as an exact copy; it is never trained
        # directly, so it stays in eval mode.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

        # Initialized here but not modified by any method of this class.
        self.steps_done = 0

    def remember(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``epsilon`` an exploratory action is sampled (see
        ``_explore``); otherwise the index of the largest Q-value predicted by
        ``policy_net`` is returned.

        Returns:
            The chosen action index as a Python ``int``.
        """
        if random.random() < self.epsilon:
            return self._explore()

        # Add a leading batch dimension of size 1 before the forward pass.
        state_t = torch.as_tensor(
            state, dtype=torch.float32, device=DEVICE
        ).unsqueeze(0)
        with torch.no_grad():
            q_values = self.policy_net(state_t)
        return q_values.argmax().item()

    def _explore(self):
        """Sample a random action index that is valid for ``action_dim``.

        Two-action agents keep the biased 70/30 draw; any other action count
        is sampled uniformly so every action can be explored and no
        out-of-range index is ever produced.
        """
        if self.action_dim == 2:
            return random.choices(
                [0, 1], weights=self._BINARY_EXPLORE_WEIGHTS
            )[0]
        return random.randrange(self.action_dim)

    def replay(self):
        """Run one optimization step on a random mini-batch from the buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. The regression target for each sampled transition is
        ``reward + gamma * max_a target_net(next_state)[a] * (1 - done)``.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack per-transition states into one array first, then convert once.
        states = torch.as_tensor(
            np.array(states), dtype=torch.float32, device=DEVICE
        )
        next_states = torch.as_tensor(
            np.array(next_states), dtype=torch.float32, device=DEVICE
        )
        # Column vectors so they line up with the gathered Q-values.
        actions = torch.as_tensor(
            actions, dtype=torch.int64, device=DEVICE
        ).unsqueeze(1)
        rewards = torch.as_tensor(
            rewards, dtype=torch.float32, device=DEVICE
        ).unsqueeze(1)
        dones = torch.as_tensor(
            dones, dtype=torch.float32, device=DEVICE
        ).unsqueeze(1)

        # Q-value of the action that was actually taken in each transition.
        current_q = self.policy_net(states).gather(1, actions)

        with torch.no_grad():
            max_next_q = self.target_net(next_states).max(
                dim=1, keepdim=True
            ).values
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_q = rewards + (self.gamma * max_next_q * (1 - dones))

        loss = self.loss_fn(current_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def decay_epsilon(self):
        """Multiply ``epsilon`` by ``epsilon_decay``, clamped at ``epsilon_min``.

        ``epsilon`` is left untouched once it is at or below ``epsilon_min``.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
            self.epsilon = max(self.epsilon_min, self.epsilon)

    def update_target_network(self):
        """Copy the current ``policy_net`` weights into ``target_net``."""
        self.target_net.load_state_dict(self.policy_net.state_dict())