-
Notifications
You must be signed in to change notification settings - Fork 0
/
mnist.py
117 lines (86 loc) · 3.11 KB
/
mnist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import numpy as np
import os
import cntk as C
from cntk.io import MinibatchSource, CTFDeserializer, StreamDef, StreamDefs, INFINITELY_REPEAT
from cntk.layers import Sequential, Dense
from cntk.ops import input, element_times, constant, sigmoid
from cntk.losses import cross_entropy_with_softmax
from cntk.metrics import classification_error
from cntk.learners import sgd, learning_rate_schedule, UnitType
from cntk.logging import ProgressPrinter
from cntk.train import Trainer, training_session
from cntk.device import try_set_default_device, gpu
# Absolute path to the MNIST dataset directory, resolved relative to this file.
# Built with os.path.join instead of string concatenation; the trailing '' keeps
# a trailing separator because callers append file names with plain '+'.
# abs_path is kept as an alias of data_dir for backward compatibility.
data_dir = abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'datasets', 'MNIST', '')
def create_reader(path, is_training):
    """Build a MinibatchSource over a CTF-formatted MNIST file.

    path        -- path to the CTF text file with 'features' and 'labels' fields
    is_training -- when True, shuffle the data and sweep it forever;
                   when False, read it once in order (evaluation mode)
    """
    # 784 = 28x28 flattened pixels; 10 = one-hot digit classes.
    feature_stream = StreamDef(field='features', shape=784, is_sparse=False)
    label_stream = StreamDef(field='labels', shape=10, is_sparse=False)
    deserializer = CTFDeserializer(
        path,
        StreamDefs(features=feature_stream, labels=label_stream))
    sweeps = INFINITELY_REPEAT if is_training else 1
    return MinibatchSource(deserializer, randomize=is_training, max_sweeps=sweeps)
def create_model(hidden_dim, output_dim, feature, label):
    """Build a one-hidden-layer MLP applied to the (scaled) feature input.

    hidden_dim -- width of the sigmoid hidden layer
    output_dim -- number of output classes (no activation; softmax is in the loss)
    feature    -- CNTK input variable carrying raw pixel values
    label      -- unused here; kept for signature symmetry with train()
    """
    # 0.00390625 == 1/256: scale raw pixel values into roughly [0, 1].
    normalized = element_times(constant(0.00390625), feature)
    net = Sequential([
        Dense(hidden_dim, activation=sigmoid),
        Dense(output_dim),
    ])
    return net(normalized)
def train(path, hidden_dim, output_dim, feature, label):
    """Train the MLP on the MNIST training set and return the Trainer.

    path       -- CTF training file
    hidden_dim -- hidden-layer width passed to create_model
    output_dim -- number of classes passed to create_model
    feature    -- CNTK input variable for pixels
    label      -- CNTK input variable for one-hot labels
    """
    mb_size = 100
    samples_per_epoch = 60000  # size of the MNIST training set
    epochs = 10

    source = create_reader(path, True)
    stream_map = {
        feature: source.streams.features,
        label: source.streams.labels,
    }

    model = create_model(hidden_dim, output_dim, feature, label)
    ce = cross_entropy_with_softmax(model, label)
    err = classification_error(model, label)

    # Plain SGD with a fixed per-minibatch learning rate of 0.2.
    schedule = learning_rate_schedule(0.2, UnitType.minibatch)
    printer = ProgressPrinter(tag='train', num_epochs=epochs)
    trainer = Trainer(
        model=model,
        criterion=(ce, err),
        parameter_learners=sgd(model.parameters, schedule),
        progress_writers=[printer],
    )

    # One progress report per full sweep; stop after `epochs` sweeps.
    session = training_session(
        trainer=trainer,
        mb_source=source,
        mb_size=mb_size,
        model_inputs_to_streams=stream_map,
        max_samples=samples_per_epoch * epochs,
        progress_frequency=samples_per_epoch,
    )
    session.train()
    return trainer
def test(path, trainer, feature, label):
    """Evaluate the trained model on the full test set.

    path    -- CTF test file
    trainer -- Trainer returned by train(); its criterion's metric is the
               classification error reported per minibatch
    feature -- CNTK input variable for pixels
    label   -- CNTK input variable for one-hot labels

    Returns the sample-weighted average classification error in [0, 1].

    Fix vs. the original: `10000 // 1024 == 9` minibatches covered only
    9216 of the 10000 test samples, silently skipping the rest. This
    version drains the (single-sweep) reader completely and weights each
    minibatch's error by its actual sample count, so the final partial
    minibatch is both included and weighted correctly.
    """
    minibatch_size = 1024
    reader = create_reader(path, False)  # no shuffling, single sweep
    input_map = {
        feature: reader.streams.features,
        label: reader.streams.labels,
    }
    total_error = 0.0
    total_samples = 0
    while True:
        minibatch = reader.next_minibatch(minibatch_size, input_map=input_map)
        if not minibatch:
            # Reader exhausted: max_sweeps=1 yields an empty dict at end of data.
            break
        num_samples = minibatch[label].num_samples
        total_error += trainer.test_minibatch(minibatch) * num_samples
        total_samples += num_samples
    # Guard against an empty test file rather than dividing by zero.
    return total_error / total_samples if total_samples else 0.0
if __name__=='__main__':
    # Uncomment to pin computation to the first GPU device.
    # try_set_default_device(gpu(0))
    train_data_path = data_dir + 'Train-28x28_cntk_text.txt'
    test_data_path = data_dir + 'Test-28x28_cntk_text.txt'

    # 28x28 grayscale images flattened to 784 features; 10 digit classes.
    input_dim, output_dim = 784, 10
    feature = input(input_dim, np.float32)
    label = input(output_dim, np.float32)

    # Train a 15-unit hidden-layer MLP, then report test-set error.
    trainer = train(train_data_path, 15, 10, feature, label)
    error = test(test_data_path, trainer, feature, label)
    print("Error: %f" % error)