-
Notifications
You must be signed in to change notification settings - Fork 1
/
pomdp_parser.py
executable file
·228 lines (182 loc) · 7.43 KB
/
pomdp_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#!/usr/bin/env python
import sys
import numpy as np
from numpy import matrix
from numpy import matlib
class Pomdp(object):
    """Parser for Cassandra-style .pomdp model files.

    Reads the whole model file once and extracts:
      states, actions, observations -- lists of symbol names
      trans_mat  -- (|A|, |S|, |S|) transition probabilities T(a, s, s')
      obs_mat    -- (|A|, |S|, |O|) observation probabilities O(a, s', o)
      reward_mat -- (|A|, |S|)      rewards R(a, s)
    """

    def __init__(self, filename='models/20150501.pomdp', parsing_print_flag=True):
        """Load and parse the model file.

        Args:
            filename: path to the .pomdp model file.
            parsing_print_flag: when True, print parsing progress.

        Raises:
            OSError: if the file cannot be read (re-raised after a message;
                the original swallowed the error and crashed later on an
                undefined file handle).
        """
        self.filename = filename
        self.print_flag = parsing_print_flag
        try:
            # 'with' guarantees the handle is closed; the original leaked it.
            with open(self.filename, 'r') as f:
                self.s = f.read()
        except (IOError, OSError):
            print('Error: not able to read ' + filename)
            raise

        # Header lines, e.g. "states: s0 s1 s2".
        self.states = self._parse_name_list('states:')
        self.actions = self._parse_name_list('actions:')
        self.observations = self._parse_name_list('observations:')

        self.trans_mat = np.ones((len(self.actions), len(self.states),
                                  len(self.states)))
        self.obs_mat = np.ones((len(self.actions), len(self.states),
                                len(self.observations)))
        self.reward_mat = np.zeros((len(self.actions), len(self.states)))

        if self.print_flag:
            # Original had bare Python-2 'print' statements which print
            # nothing under Python 3; 'print()' restores the blank lines.
            print('number of states: ' + str(len(self.states)))
            print(self.states)
            print()
            print('number of actions: ' + str(len(self.actions)))
            print(self.actions)
            print()
            print('number of observations: ' + str(len(self.observations)))
            print(self.observations)
            print()

        self.parse_transition_matrix()
        self.parse_observation_matrix()
        self.parse_reward_matrix()

    def _parse_name_list(self, keyword):
        """Return the whitespace-separated names on the header line that starts with *keyword*."""
        start = self.s.find(keyword)
        return self.s[start + len(keyword):self.s.find('\n', start)].split()

    def _str_to_matrix(self, block):
        """Convert newline-separated rows of whitespace-separated numbers into a 2-D float array."""
        return np.array([[float(x) for x in row.split()]
                         for row in block.strip().split('\n')])

    def parse_transition_matrix(self):
        """Fill self.trans_mat from every 'T:' entry in the file; return it."""
        num_states = len(self.states)
        from_here = 0
        while True:
            ind = self.s.find('T:', from_here)
            if ind == -1:
                break
            ind_enter = self.s.find('\n', ind)
            next_ind_enter = self.s.find('\n', ind_enter + 1)
            # split() strips the surrounding spaces around the action name.
            action = self.s[ind + 2:ind_enter].split()[0]
            # Original compared with "is not '*'" (identity test on a string
            # literal, which only works by interning accident); fixed to !=.
            if action not in self.actions and action != '*':
                print('Error in reading action: ' + action)
                sys.exit()
            first_line = self.s[ind_enter + 1:next_ind_enter]
            if 'identity' in first_line:
                # np.identity replaces deprecated np.matlib.identity.
                mat = np.identity(num_states)
                from_here = next_ind_enter
            elif 'uniform' in first_line:
                mat = np.ones((num_states, num_states)) / num_states
                from_here = next_ind_enter
            else:
                # Explicit probability matrix: rows run until the next
                # blank line.  (Untested in the original model, per its
                # author's comment -- use with care.)
                start_matrix = ind_enter + 1
                end_matrix = self.s.find('\n\n', start_matrix)
                mat = self._str_to_matrix(self.s[start_matrix:end_matrix])
                from_here = end_matrix
            if '*' in action:
                self.trans_mat[:] = mat  # wildcard: same matrix for every action
            else:
                self.trans_mat[self.actions.index(action)] = mat
        # Sanity check: every T(a, s, .) must be a probability distribution.
        for i in range(len(self.actions)):
            for j in range(num_states):
                row_sum = self.trans_mat[i, j].sum()
                if abs(row_sum - 1.0) > 0.00001:
                    print('transition matrix, [' + str(i) + ',' + str(j) +
                          ',:], does not sum to 1: ' + str(row_sum))
        if self.print_flag:
            print('reading transition matrix successfully')
            print(self.trans_mat.shape)
            print()
        return self.trans_mat

    def parse_observation_matrix(self):
        """Fill self.obs_mat from every 'O:' entry in the file; return it.

        Assumes (as the original did): the probability rows start directly
        below the "O: <action>" line, and a blank line separates matrices.
        """
        from_here = 0
        while True:
            ind = self.s.find('O:', from_here)
            if ind == -1:
                break
            ind_enter = self.s.find('\n', ind)
            action = self.s[ind + 2:ind_enter].split()[0]
            if action not in self.actions and action != '*':
                print('Error in reading action: ' + action)
                sys.exit()
            start_matrix = ind_enter + 1
            end_matrix = self.s.find('\n\n', start_matrix)
            mat = self._str_to_matrix(self.s[start_matrix:end_matrix])
            if action == '*':
                # Original crashed on a wildcard ('*' is not in self.actions,
                # so actions.index raised ValueError); broadcast like the
                # transition parser does.
                self.obs_mat[:] = mat
            else:
                self.obs_mat[self.actions.index(action)] = mat
            # Resume after this matrix; the original resumed at ind_enter
            # and needlessly re-scanned the matrix body.
            from_here = end_matrix
        # Sanity check: every O(a, s', .) must be a probability distribution.
        for i in range(len(self.actions)):
            for j in range(len(self.states)):
                row_sum = self.obs_mat[i, j].sum()
                if abs(row_sum - 1.0) > 0.00001:
                    # Original concatenated ints i and j into the message,
                    # raising TypeError whenever this check fired; fixed.
                    print('observation matrix, [' + str(i) + ',' + str(j) +
                          ',:], does not sum to 1')
        if self.print_flag:
            print('reading observation matrix successfully')
            print(self.obs_mat.shape)
            print()
        return self.obs_mat

    def parse_reward_matrix(self):
        """Fill self.reward_mat from every 'R:' entry in the file; return it.

        Assumes reward is assigned per (action, state): the ending-state and
        observation fields of "R: a : s : s' : o value" are ignored.
        """
        from_here = 0
        while True:
            # +1 so a missing 'R:' (find returns -1) becomes 0 and we break.
            ind_colon_first = self.s.find('R:', from_here) + 1
            if ind_colon_first <= 0:
                break
            # Locate the second, third and fourth colons of the R line.
            ind_colon_second = self.s.find(':', ind_colon_first + 1)
            ind_colon_third = self.s.find(':', ind_colon_second + 1)
            ind_colon_fourth = self.s.find(':', ind_colon_third + 1)
            ind_enter = self.s.find('\n', ind_colon_fourth)
            action = self.s[ind_colon_first + 1:ind_colon_second].split()[0]
            if action not in self.actions and action != '*':
                print('Error in parsing action for reward matrix: ' + action)
                sys.exit()
            state = self.s[ind_colon_second + 1:ind_colon_third].split()[0]
            if state not in self.states and state != '*':
                print('Error in parsing state for reward matrix: ' + state)
                # Original printed but fell through; exit like every other
                # error path in this class.
                sys.exit()
            # The reward value follows the last '*' on the line.
            value = float(self.s[self.s.rfind('*', 0, ind_enter) + 1:
                                 ind_enter].split()[0])
            # '*' wildcards broadcast the value over the whole axis.
            if action == '*':
                if state == '*':
                    self.reward_mat[:] = value
                else:
                    self.reward_mat[:, self.states.index(state)] = value
            else:
                if state == '*':
                    self.reward_mat[self.actions.index(action), :] = value
                else:
                    self.reward_mat[self.actions.index(action),
                                    self.states.index(state)] = value
            from_here = ind_enter
        if self.print_flag:
            print('reading reward matrix successfully')
            print(self.reward_mat.shape)
            print()
        return self.reward_mat
def main():
    """Entry point: parse the default POMDP model file."""
    Pomdp()


if __name__ == '__main__':
    main()