-
Notifications
You must be signed in to change notification settings - Fork 1
/
dataLoader.py
113 lines (79 loc) · 3.48 KB
/
dataLoader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import numpy as np
from numba import jit
from sklearn.preprocessing import StandardScaler
@jit
def generateWindow(X:np.ndarray, window_size:int) -> np.ndarray:
x_data = []
n_data = X.shape[0]
for idx in range(n_data - window_size):
x_data.append(
X[idx:idx+window_size].reshape(1, window_size, -1)
)
x_data = np.concatenate(x_data)
return x_data
class dataLoader():
def __init__(self, base_path = None):
self.sensor_train = dict()
self.motion_train = dict()
self.motion_pre_train = dict()
self.sensor_test = dict()
self.motion_test = dict()
self.window_size = 120
self.tags = ['W' + str(i) for i in range(2, 7)]
if base_path == None:
self.base_path = os.path.join(os.getcwd(), 'dataset')
else :
self.base_path = os.path.join(base_path, 'dataset')
self.sensor_scaler = StandardScaler()
self.loadDataSet()
self.normalizeSensorData()
def normalizeSensorData(self):
base_data = [data for _, data in self.sensor_train.items()]
base_data = np.concatenate(base_data)
self.sensor_scaler.fit(base_data)
def loadDataSet(self):
# sensor
file_name = "train_{}_S.csv"
for tag in self.tags :
self.sensor_train[tag] = np.loadtxt(os.path.join(self.base_path,file_name.format(tag)), delimiter=',')
file_name = "test_{}_S.csv"
for tag in self.tags :
self.sensor_test[tag] = np.loadtxt(os.path.join(self.base_path,file_name.format(tag)), delimiter=',')
# motion
file_name = "train_{}_M.csv"
for tag in self.tags :
self.motion_train[tag] = np.loadtxt(os.path.join(self.base_path,file_name.format(tag)), delimiter=',')
file_name = "test_{}_M.csv"
for tag in self.tags :
self.motion_test[tag] = np.loadtxt(os.path.join(self.base_path,file_name.format(tag)), delimiter=',')
file_name = "pre_{}_M.csv"
for tag in self.tags :
self.motion_pre_train[tag] = np.loadtxt(os.path.join(self.base_path,file_name.format(tag)), delimiter=',')
def getPretrainDataSet(self) -> np.ndarray:
data = [data for _, data in self.motion_pre_train.items()]
return np.concatenate(data)
def getTrainDataSet(self) -> (np.ndarray, np.ndarray) :
sensor = []
for _ , data in self.sensor_train.items():
data = self.sensor_scaler.transform(data)
data = generateWindow(data, self.window_size)
sensor.append(data)
sensor = np.concatenate(sensor)
motion = np.concatenate([data for _, data in self.motion_train.items()])
return sensor, motion
def getTestDataSet(self) -> (np.ndarray, np.ndarray):
sensor = []
for _, data in self.sensor_test.items():
data = self.sensor_scaler.transform(data)
data = generateWindow(data, self.window_size)
sensor.append(data)
sensor = np.concatenate(sensor)
motion = np.concatenate([data for _, data in self.motion_test.items()])
return sensor, motion
def getTestDataSetTags(self, tag:str) -> (np.ndarray, np.ndarray):
sensor = self.sensor_test[tag]
sensor = self.sensor_scaler.transform(sensor)
sensor = generateWindow(sensor, self.window_size)
motion = self.motion_test[tag]
return sensor, motion