# encoding:UTF-8
import numpy as np
import tensorflow as tf
import tensorflow.contrib.layers as tcl
class CapsLayer(object):
""" Capsule layer.
Args:
input: A 4-D tensor.
        num_outputs: the number of capsules in this layer.
        vec_len: integer, the length of the output vector of a capsule.
        layer_type: string, one of 'FC' or 'CONV', whether this layer is
            fully connected or convolutional (kept for future expansion).
        with_routing: boolean, whether this layer applies dynamic routing
            to the output of the lower-level capsules.
Returns:
A 4-D tensor.
"""
def __init__(self, batch_size, num_outputs, vec_len, with_routing=True, layer_type='FC'):
self.batch_size = batch_size
self.num_outputs = num_outputs
self.vec_len = vec_len
self.with_routing = with_routing
self.layer_type = layer_type
pass
def __call__(self, input, kernel_size=None, stride=None):
"""
        The parameters 'kernel_size' and 'stride' are only used when 'layer_type' equals 'CONV'.
"""
if self.layer_type == 'CONV':
self.kernel_size = kernel_size
self.stride = stride
# the PrimaryCaps layer, a convolutional layer
# input: [?, 20, 20, 256]
if not self.with_routing:
'''
# version 1, computational expensive
capsules = []
for i in range(self.vec_len):
# each capsule i: [batch_size, 6, 6, 32]
with tf.variable_scope('ConvUnit_' + str(i)):
caps_i = tf.contrib.layers.conv2d(input, self.num_outputs,
self.kernel_size, self.stride,
padding="VALID", activation_fn=None)
caps_i = tf.reshape(caps_i, shape=(cfg.batch_size, -1, 1, 1))
capsules.append(caps_i)
assert capsules[0].get_shape() == [cfg.batch_size, 1152, 1, 1]
capsules = tf.concat(capsules, axis=2)
'''
                # version 2: equivalent to version 1, but computationally more efficient.
                # NOTE: the paper does not say whether the PrimaryCaps convolution
                # applies a ReLU activation before the squashing function, but
                # experiments show that using ReLU gives higher test accuracy,
                # so which one to use is your choice.
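                # Shape arithmetic (assuming the [?, 20, 20, 256] input above and the
                # 9x9 kernel with stride 2 used in the CapsNet paper): a VALID
                # convolution gives (20 - 9) // 2 + 1 = 6, so the conv output is
                # [?, 6, 6, 32 * 8]; reshaping yields 6 * 6 * 32 = 1152 primary
                # capsules, each an 8-D vector.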
                capsules = tcl.conv2d(input, self.num_outputs * self.vec_len, self.kernel_size, self.stride,
                                      padding="VALID", activation_fn=tf.nn.relu)  # [?, 6, 6, 256]
capsules = tf.reshape(capsules, (self.batch_size, -1, self.vec_len, 1)) # [?, 1152, 8, 1]
return self.squash(capsules) # [?, 1152, 8, 1]
if self.layer_type == 'FC':
if self.with_routing:
# the DigitCaps layer, a fully connected layer
# Reshape the input into [?, 1152, 1, 8, 1]
self.input = tf.reshape(input, shape=(self.batch_size, -1, 1, input.shape[-2].value, 1))
with tf.variable_scope('routing'):
capsules = self.routing(self.input) # [?, 1, 10, 16, 1]
capsules = tf.squeeze(capsules, axis=1)
return capsules
pass
pass
def routing(self, input, stddev=0.01, iter_routing=3):
"""
The routing algorithm.
Args:
            input: A Tensor with shape [batch_size, num_caps_in=1152, 1, length(u_i)=8, 1],
                where num_caps_in is the number of capsules in layer l.
            stddev: float, standard deviation of the normal initializer for the
                transformation matrices W_ij.
            iter_routing: integer, number of routing iterations (default 3).
        Returns:
            A Tensor of shape [batch_size, 1, num_caps_out=10, length(v_j)=16, 1] representing
            the vector outputs `v_j` of the capsules in layer l+1.
        Notes:
            u_i is the vector output of capsule i in layer l, and
            v_j is the vector output of capsule j in layer l+1.
"""
        # Eq. 2: u_hat_j|i = W_ij * u_i
        # W: [1, num_caps_i=1152, num_caps_j=10, len_u_i=8, len_v_j=16]
w = tf.get_variable('Weight', shape=(1, 1152, 10, 8, 16), dtype=tf.float32,
initializer=tf.random_normal_initializer(stddev=stddev))
# do tiling for input and W before matmul
input = tf.tile(input, [1, 1, 10, 1, 1]) # input => [?, 1152, 10, 8, 1]
w = tf.tile(w, [self.batch_size, 1, 1, 1, 1]) # W => [?, 1152, 10, 8, 16]
u_hat = tf.matmul(w, input, transpose_a=True) # [8, 16].T x [8, 1] => [16, 1] => [?, 1152, 10, 16, 1]
# line 2:
# b_IJ: [batch_size, num_caps_in, num_caps_out, 1, 1],
# about the reason of using 'batch_size', see issue #21
b_ij = np.zeros([self.batch_size, input.shape[1].value, self.num_outputs, 1, 1], dtype=np.float32)
b_ij = tf.constant(b_ij) # line 2: b_ij <- 0 # [batch_size, 1152, 10, 1, 1]
v_j = None
# line 3:
# for r iterations do
for r_iter in range(iter_routing):
with tf.variable_scope('iter_' + str(r_iter)):
# line 4: b_ij.shape = [?, 1152, 10, 1, 1]
c_ij = tf.nn.softmax(b_ij, dim=2) # [?, 1152, 10, 1, 1]
# line 5:
# weighting u_hat with c_ij, element-wise in the last two dims
# [?, 1152, 10, 1, 1] x [?, 1152, 10, 16, 1] => [?, 1152, 10, 16, 1]
s_j = tf.multiply(c_ij, u_hat) # [?, 1152, 10, 16, 1]
# then sum in the second dim, resulting in [?, 1, 10, 16, 1]
s_j = tf.reduce_sum(s_j, axis=1, keep_dims=True) # [?, 1, 10, 16, 1]
# line 6:
# squash using Eq.1,
v_j = self.squash(s_j) # [?, 1, 10, 16, 1]
# line 7:
v_j_tiled = tf.tile(v_j, [1, 1152, 1, 1, 1]) # [?, 1152, 10, 16, 1]
# [?, 1152, 10, 16, 1].T x [?, 1152, 10, 16, 1] => [?, 1152, 10, 1, 1]
b_ij += tf.matmul(u_hat, v_j_tiled, transpose_a=True)
pass
pass
return v_j
    # Squashing function: normalizes the vector direction and rescales its length
@staticmethod
def squash(vector):
"""
Squashing function corresponding to Eq.1
Args:
            vector: A 4-D or 5-D tensor whose second-to-last dimension is the
                capsule vector length, e.g. [batch_size, 1, num_caps, vec_len, 1].
        Returns:
            A tensor of the same shape as `vector`, with each capsule vector
            squashed along the vec_len dimension.
"""
epsilon = 1e-9
vec_squared_norm = tf.reduce_sum(tf.square(vector), -2, keep_dims=True)
scalar_factor = (vec_squared_norm / (1 + vec_squared_norm)) / tf.sqrt(vec_squared_norm + epsilon)
vec_squashed = scalar_factor * vector # element-wise
return vec_squashed
pass
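# A minimal usage sketch (illustrative only): wires a PrimaryCaps layer and a
# DigitCaps layer together on a dummy 20x20x256 feature map, assuming the
# standard CapsNet-on-MNIST hyper-parameters (32 primary capsule channels of
# length 8, ten 16-D digit capsules, 9x9 kernel, stride 2) and a TF 1.x
# runtime with tf.contrib available. The placeholder name 'conv1_features'
# is hypothetical.
if __name__ == '__main__':
    batch_size = 32
    conv1_features = tf.placeholder(tf.float32, shape=(batch_size, 20, 20, 256),
                                    name='conv1_features')
    # PrimaryCaps: convolutional capsules, no routing
    primary_caps = CapsLayer(batch_size, num_outputs=32, vec_len=8,
                             with_routing=False, layer_type='CONV')
    caps1 = primary_caps(conv1_features, kernel_size=9, stride=2)  # [32, 1152, 8, 1]
    # DigitCaps: fully connected capsules with dynamic routing
    digit_caps = CapsLayer(batch_size, num_outputs=10, vec_len=16,
                           with_routing=True, layer_type='FC')
    caps2 = digit_caps(caps1)                                      # [32, 10, 16, 1]
    print(caps1.get_shape(), caps2.get_shape())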