import numpy as np
import matplotlib.pyplot as plt


class StateRepresentation:
    """
    Represents the current state in the reinforcement learning environment.
    """

    def __init__(self, current_location, current_direction, nearby_streets):
        """
        Initialize the state representation.

        Parameters:
            current_location (tuple): Current GPS coordinates.
            current_direction (tuple): Current direction vector.
            nearby_streets (list): List of nearby street information.
        """
        self.current_location = current_location
        self.current_direction = current_direction
        self.nearby_streets = nearby_streets

    def get_state_vector(self):
        """
        Convert the state into a feature vector.

        Returns:
            np.ndarray: Feature vector representing the state.
        """
        location_vector = np.array([self.current_location[0], self.current_location[1]])
        direction_vector = np.array([self.current_direction[0], self.current_direction[1]])
        nearby_streets_vector = np.array(self.nearby_streets)
        state_vector = np.concatenate((location_vector, direction_vector, nearby_streets_vector))
        return state_vector
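
# A minimal usage sketch for StateRepresentation, assuming a simple numeric
# encoding for nearby streets. The coordinates, direction, and street values
# below are hypothetical placeholders, not data from the original project.
def _example_state_vector():
    state = StateRepresentation(
        current_location=(40.7128, -74.0060),  # hypothetical GPS fix
        current_direction=(0.0, 1.0),          # unit vector pointing north
        nearby_streets=[1.0, 0.0, 1.0],        # hypothetical street encoding
    )
    # Concatenates location (2), direction (2), and street features (3)
    # into a single 7-element feature vector.
    return state.get_state_vector()
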

class RLEnvironment:
    """
    Reinforcement learning environment for navigation.
    """

    def __init__(self, starting_address, max_intersections, rule_choice):
        """
        Initialize the RL environment.

        Parameters:
            starting_address (str): Starting address for navigation.
            max_intersections (int): Maximum number of intersections to navigate.
            rule_choice (str): Choice of navigation rule ('right', 'left', 'alternate').
        """
        self.starting_address = starting_address
        self.max_intersections = max_intersections
        self.rule_choice = rule_choice
        self.intersection_count = 0
        self.current_state = self.initialize_environment(starting_address)
        self.action_space = ['turn_left', 'turn_right', 'go_straight', 'turn_back']

    def initialize_environment(self, current_address):
        """
        Initialize the environment state.

        Parameters:
            current_address (str): Current address for navigation.

        Returns:
            dict: Initialized state of the environment.
        """
        current_state = {
            'current_address': current_address,
            'previous_instruction': None  # No instruction yet at the start of an episode
            # Add other relevant state information here
        }
        return current_state

    def is_episode_terminated(self):
        """
        Check if the episode is terminated.

        Returns:
            bool: True if the episode is terminated, False otherwise.
        """
        return self.intersection_count >= self.max_intersections

    def step(self, chosen_action):
        """
        Perform a step in the environment based on the chosen action.

        Parameters:
            chosen_action (str): Chosen action to take.

        Returns:
            tuple: Next state, reward, and episode termination flag.
        """
        # Calculate the reward based on the chosen action and the previous instruction
        reward = self.calculate_reward(self.current_state['previous_instruction'], chosen_action)
        # Update the environment and get the next state
        next_state = self.update_environment(chosen_action)
        # Update the intersection count
        if 'turn' in chosen_action:
            self.intersection_count += 1
        # Check if the episode is terminated
        done = self.is_episode_terminated()
        return next_state, reward, done

    def get_observation(self):
        """
        Get the current observation (state) of the environment.

        Returns:
            dict: Current state of the environment.
        """
        return self.current_state

    def visualize_agent_path(self, path, rewards, actions, correct_actions):
        """
        Visualize the agent's path, actions, rewards, and correctness of actions.

        Parameters:
            path (list): List of visited locations.
            rewards (list): List of rewards received at each step.
            actions (list): List of actions taken at each step.
            correct_actions (list): List of boolean values indicating correctness of actions.
        """
        plt.figure(figsize=(12, 8))
        plt.plot(*zip(*path), marker='o', color='b', label='Visited Locations')
        for i, point in enumerate(path):
            action = actions[i]
            reward = rewards[i]
            correct = correct_actions[i]
            # Colour the annotation green for correct actions, red otherwise
            color = 'g' if correct else 'r'
            plt.annotate(
                f"Step {i + 1}\nAction: {action}\nReward: {reward:.2f}",
                point,
                textcoords="offset points",
                xytext=(-15, 10),
                ha='center',
                color=color,
            )
        plt.title("Agent's Path, Actions, Rewards, and Correctness")
        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        plt.legend()
        plt.grid()
        plt.show()

    def calculate_reward(self, previous_instruction, chosen_action):
        """
        Calculate the reward for a chosen action based on the previous instruction.

        Parameters:
            previous_instruction (str): Previous navigation instruction.
            chosen_action (str): Chosen action to take.

        Returns:
            float: Calculated reward value.
        """
        # No previous instruction yet (e.g. the first step of an episode)
        if previous_instruction is None:
            return 0.0
        instruction = previous_instruction.lower()
        # Reward for taking the right turn at intersections
        if self.rule_choice == 'right':
            if 'right' in instruction and chosen_action == 'turn_right':
                return 1.0  # Positive reward for the correct turn
            elif 'right' in instruction:
                return -0.5  # Negative reward for an incorrect action after a right instruction
            else:
                return 0.0  # Neutral reward for other actions
        # Reward for taking the left turn at intersections
        elif self.rule_choice == 'left':
            if 'left' in instruction and chosen_action == 'turn_left':
                return 1.0  # Positive reward for the correct turn
            elif 'left' in instruction:
                return -0.5  # Negative reward for an incorrect action after a left instruction
            else:
                return 0.0  # Neutral reward for other actions
        # Reward for alternating turns at intersections
        elif self.rule_choice == 'alternate':
            if 'left' in instruction and chosen_action == 'turn_right':
                return 1.0  # Positive reward for alternating the turn
            elif 'right' in instruction and chosen_action == 'turn_left':
                return 1.0  # Positive reward for alternating the turn
            elif 'left' in instruction or 'right' in instruction:
                return -0.5  # Negative reward for failing to alternate
            else:
                return 0.0  # Neutral reward for other actions
        # Unknown rule_choice: treat as neutral
        return 0.0

    def update_environment(self, chosen_action):
        # Placeholder transition: record the chosen action as the previous
        # instruction so that calculate_reward can score the next step.
        # Replace this with real map/GPS updates for the navigation task.
        next_state = dict(self.current_state)
        next_state['previous_instruction'] = chosen_action
        self.current_state = next_state
        return next_state
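

# A minimal, hedged usage sketch of RLEnvironment. The starting address, the
# random action choice, and the seed below are illustrative assumptions; they
# are not part of the original project's training loop.
if __name__ == "__main__":
    env = RLEnvironment(
        starting_address="123 Example Street",  # hypothetical address
        max_intersections=5,
        rule_choice='right',
    )
    rng = np.random.default_rng(0)
    done = False
    while not done:
        # Pick a random action from the environment's action space.
        action = str(rng.choice(env.action_space))
        next_state, reward, done = env.step(action)
        print(f"action={action:<12} reward={reward:+.2f} done={done}")
    # visualize_agent_path can then be called with the recorded (lon, lat)
    # points, rewards, actions, and correctness flags from such a rollout.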