Draft Design doc about Supporting PyTorch
ElasticDL is an open-source distributed deep learning programming framework based on TensorFlow and Kubernetes. Since PyTorch is more widely used in academia, this project aims to support PyTorch as well.
The training of most neural networks can be simplified to the following process (a minimal sketch combining these steps is shown after the list):
- Define the network: define the network's class and create an instance with `net = Net()`.
- Define the optimizer: `optimizer = optim.xxx(net.parameters(), lr=xxx)`.
- Define the loss function: `compute_loss = nn.MSELoss()`.
- Training loop:
  - (a) Clear the gradient information in the optimizer: `optimizer.zero_grad()`
  - (b) Forward: `output = net(input)`
  - (c) Calculate the loss: `loss = compute_loss(target, output)`
  - (d) Backward: `loss.backward()`
  - (e) Update parameters: `optimizer.step()`
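To make these five steps concrete, here is a minimal, self-contained sketch. The toy network, random data, and hyper-parameters are illustrative assumptions, not part of ElasticDL:

```python
import torch
import torch.nn as nn
import torch.optim as optim

# A toy regression model and random data, just to exercise the five steps.
net = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
optimizer = optim.SGD(net.parameters(), lr=0.01)   # define the optimizer
compute_loss = nn.MSELoss()                        # define the loss function

inputs = torch.randn(16, 4)
targets = torch.randn(16, 1)

for step in range(10):
    optimizer.zero_grad()                  # (a) clear gradients
    output = net(inputs)                   # (b) forward
    loss = compute_loss(output, targets)   # (c) calculate the loss
    loss.backward()                        # (d) backward
    optimizer.step()                       # (e) update parameters
```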
To build a network, you first define a class that inherits from `nn.Module`. The `nn` package is a very useful toolbox; `import torch.nn as nn` is required. For example, the class `Net` below contains mainly two methods: the initializer `__init__` and the `forward` method.
```python
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Sequential(       # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=1,            # input height
                out_channels=16,          # n_filters
                kernel_size=5,            # filter size
                stride=1,                 # filter movement/step
                padding=2,                # keep the same width and height after Conv2d: padding = (kernel_size - 1) / 2 when stride = 1
            ),                            # output shape (16, 28, 28)
            nn.ReLU(),                    # activation
            nn.MaxPool2d(kernel_size=2),  # take the max value in each 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(       # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),   # output shape (32, 14, 14)
            nn.ReLU(),                    # activation
            nn.MaxPool2d(2),              # output shape (32, 7, 7)
        )
        self.out = nn.Linear(32 * 7 * 7, 10)  # fully connected layer, output 10 classes

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        output = self.out(x)
        return output


net = Net()
```
`__init__` defines the layers (here the convolutional layers), and `super()` must be called first to initialize the parent class `nn.Module`. `forward` is the actual execution of the data flow. For example, in the code above, the input `x` first passes through the defined `conv1` and then through the activation function. PyTorch also offers a functional interface: start with `import torch.nn.functional as F`; `F.relu()` is an official function. If you bind `relu` to the name `myrelu` in `__init__`, then the first statement of `forward` becomes `x = F.max_pool2d(myrelu(self.conv1(x)), 2)`. After this series of flows, `forward` returns `x` to the caller. A functional-style sketch of the same network is shown below.
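For illustration, a functional-style variant of the same network could look like the following. The class name `FunctionalNet` is an assumption used only for this sketch:

```python
import torch.nn as nn
import torch.nn.functional as F

class FunctionalNet(nn.Module):
    def __init__(self):
        super(FunctionalNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2)
        self.out = nn.Linear(32 * 7 * 7, 10)

    def forward(self, x):
        # activation and pooling are applied as functions instead of layers
        x = F.max_pool2d(F.relu(self.conv1(x)), 2)  # (1, 28, 28) -> (16, 14, 14)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)  # (16, 14, 14) -> (32, 7, 7)
        x = x.view(x.size(0), -1)
        return self.out(x)
```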
PyTorch has an abstract `Dataset` class. This tutorial walks through a nice example of creating a custom `FacialLandmarkDataset` class as a subclass of `Dataset`. PyTorch's `TensorDataset` is a `Dataset` wrapping tensors.
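For example, a `TensorDataset` can wrap feature and label tensors so that indexing returns one (feature, label) pair; the random tensors below are only for illustration:

```python
import torch
from torch.utils.data import TensorDataset

x = torch.randn(100, 4)          # 100 samples, 4 features each
y = torch.randint(0, 2, (100,))  # 100 integer labels
dataset = TensorDataset(x, y)    # dataset[i] returns (x[i], y[i])
print(len(dataset), dataset[0])
```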
`torch.utils.data.DataLoader` is an iterator that provides all of these features:
- Batching the data.
- Shuffling the data.
- Loading the data in parallel using multiprocessing workers.
```python
# MNIST digits dataset
train_data = torchvision.datasets.MNIST(
    root='./mnist/',
    train=True,                                   # this is training data
    transform=torchvision.transforms.ToTensor(),  # converts a PIL.Image or numpy.ndarray to a
                                                  # torch.FloatTensor of shape (C x H x W) and normalizes it to the range [0.0, 1.0]
    download=DOWNLOAD_MNIST,
)
```
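A `DataLoader` then turns `train_data` into shuffled mini-batches. The `num_workers` value below is an illustrative assumption:

```python
import torch.utils.data as Data

train_loader = Data.DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,  # e.g. 50
    shuffle=True,           # reshuffle the data at every epoch
    num_workers=2,          # load batches in parallel with 2 worker processes
)
```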
Next, we feed `train_data` into the neural network, get the output from `forward()`, and finally calculate the error. The code below omits the part that calculates the accuracy. If you want to take a closer look at the accuracy code, please see the complete code on GitHub.
```python
optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.CrossEntropyLoss()                      # the target label is not one-hotted

# training and testing
for epoch in range(EPOCH):
    for step, (b_x, b_y) in enumerate(train_loader):  # gives batch data, normalizes x when iterating train_loader
        output = cnn(b_x)              # cnn output
        loss = loss_func(output, b_y)  # cross entropy loss
        optimizer.zero_grad()          # clear gradients for this training step
        loss.backward()                # backpropagation, compute gradients
        optimizer.step()               # apply gradients
```
We want the output of the neural network to be close to the ground truth we expect, that is, to keep reducing the difference between the two. This difference is defined by you; it is the objective function, or loss function. If the loss function approaches 0, the goal is naturally achieved.
The master process of ElasticDL uses asynchronous or synchronous SGD to coordinate the workers during training. When using asynchronous SGD, the master starts a high-performance parameter server for the workers to use. When using synchronous SGD, ElasticDL uses a Kubernetes-native, fault-tolerant AllReduce implementation.
ElasticDL uses a master-worker architecture. The master node plays the master role in two aspects:
- It's the master of the cluster.
- It's the master of the model training/evaluation/prediction process.
In a distributed deep learning system, several workers need to be started and monitored. The training data must be split among the workers, and the model must be updated by aggregating the gradients computed by each worker, which involves communication and synchronization. Fault tolerance is also an issue that must be considered.
A common solution is to provide a distributed programming framework for such jobs, so that users only need to fill in the business logic, as in a cloze test, while distributed concerns such as communication, synchronization, and fault tolerance are handled by the framework.
After defining the model class, the user is required to fill in the `forward`, `loss`, `optimizer`, and `feed` functions. `forward` defines the forward computation of the model; the backpropagation process is derived automatically by PyTorch. The `loss` function returns the loss function used during model training. The `optimizer` function returns the optimizer used during model training. `feed` customizes the conversion of training data into PyTorch model input.
Writing these functions requires only knowledge of the PyTorch API; no background knowledge of distributed training is needed. After writing them, users can debug and verify with small data on a single machine. Once that passes, the job can be submitted to a Kubernetes cluster for distributed, fault-tolerant, large-scale training without any code modification.
For a concrete example of how to build such a model, refer to mnist_subclass.py.
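A rough sketch of what these user-filled functions could look like for the MNIST model is shown below. The exact signatures ElasticDL expects are not specified in this document, so the argument lists and the record format assumed by `feed` are illustrative assumptions only:

```python
import torch
import torch.nn as nn
import torchvision

# Hypothetical user-provided functions; the names follow the design above,
# but the signatures are assumptions for illustration.

def forward(model, minibatch_x):
    # run the user-defined model on one mini-batch of inputs
    return model(minibatch_x)

def loss(outputs, labels):
    # return the loss used during model training
    return nn.CrossEntropyLoss()(outputs, labels)

def optimizer(model):
    # return the optimizer used during model training
    return torch.optim.Adam(model.parameters(), lr=0.001)

def feed(records):
    # convert raw training records, assumed to be (PIL image, int label)
    # pairs, into PyTorch model input
    to_tensor = torchvision.transforms.ToTensor()
    images, labels = zip(*records)
    x = torch.stack([to_tensor(img) for img in images])
    y = torch.tensor(labels)
    return x, y
```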
ElasticDL introduces a master process for each job. By calling the Kubernetes API, the master process knows the cluster situation. The training data is distributed by the master; see dynamic_data_sharding.md for details.
- A worker gets a task from the master.
- A worker reads the real data according to the offsets in the task.

`feed` customizes the conversion of training data into PyTorch model input.
TODO: Make `DataLoader` work with tasks; more details will be added.
There is a tutorial about `feed` in TensorFlow.
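As a starting point for that TODO, one possible way to bridge tasks and `DataLoader` is to wrap the record range of a task in a `Dataset`. The `task` object and the `read_record` helper below are hypothetical; they only illustrate the idea:

```python
import torch.utils.data as Data

class TaskDataset(Data.Dataset):
    """Wraps the record range [task.start, task.end) of one task.

    `task` and `read_record` are hypothetical: `task` carries the start/end
    offsets received from the master, and `read_record(offset)` reads one
    raw record from the underlying file or table.
    """

    def __init__(self, task, read_record):
        self.task = task
        self.read_record = read_record

    def __len__(self):
        return self.task.end - self.task.start

    def __getitem__(self, index):
        # return the raw record; the user-defined `feed` can be passed to the
        # DataLoader as `collate_fn` to turn a list of records into tensors
        return self.read_record(self.task.start + index)

# Possible usage:
# loader = Data.DataLoader(TaskDataset(task, read_record),
#                          batch_size=BATCH_SIZE, collate_fn=feed)
```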
A task received by an ElasticDL worker usually includes multiple minibatches. For each task, the worker opens the corresponding file or table, and then:
- Gets a mini-batch of training data.
- Calls the user-defined `forward` function with the local model as a parameter to calculate the cost. If the model is large, some parameters may come from the parameter server.
- Performs the backward calculation to obtain the gradients.
- In synchronous SGD, calls the AllReduce implementation of FTlib to synchronize gradients and update the model. In asynchronous SGD, uploads gradients to the parameter server from time to time, and also obtains global model parameters from the parameter server from time to time.
```python
while True:
    task = get_task()
    dataset = create_dataset(task)
    for minibatch in dataset:
        pull_parameters()
        forward()
        backward()
        push_gradients()
```
If advanced PyTorch APIs such as `torch.optim` are not available, we have to update the value of each parameter by name and manually zero the gradient of each parameter. The `torch.no_grad()` context is necessary because we do not want these update operations to be recorded in the next gradient calculation.
To go further, we can use `model.parameters()` and `model.zero_grad()` (defined by PyTorch for `nn.Module`) to make these steps more concise and to avoid forgetting some parameters, which matters especially when we build a complex model:
```python
with torch.no_grad():
    for param in model.parameters():
        param -= param.grad * lr
    model.zero_grad()
```
This document describes the design of a distributed parameter server for ElasticDL.
Each PS pod has an RPC server that provides RPC services. Workers use these RPC services to pull model parameters. The `pull_variable` service pulls all non-embedding parameters. The `pull_embedding_vector` service pulls the embedding vectors specified by an embedding layer name and a list of discrete IDs.
```proto
service PServer {
    rpc pull_variable(PullModelRequest) returns (PullModelResponse);
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);
}
```
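Conceptually, a PS pod could serve these two calls from an in-memory parameter store roughly as sketched below. The storage layout and class name are assumptions for illustration, not the actual ElasticDL implementation:

```python
import torch

class ParameterStore:
    """Hypothetical in-memory parameter storage on one PS pod."""

    def __init__(self):
        self.non_embedding_params = {}  # parameter name -> torch.Tensor
        self.embedding_tables = {}      # embedding layer name -> 2-D torch.Tensor

    def pull_variable(self):
        # return all non-embedding parameters held by this PS pod
        return {name: p.clone() for name, p in self.non_embedding_params.items()}

    def pull_embedding_vector(self, layer_name, ids):
        # return only the embedding vectors for the requested discrete IDs
        table = self.embedding_tables[layer_name]
        return table[torch.tensor(ids, dtype=torch.long)]
```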
A worker computes gradients in each training iteration; these contain gradients for the non-embedding parameters and, if applicable, for some embedding vectors. The worker partitions these gradients using their corresponding parameter names, or discrete IDs in the case of embedding vectors. Then the worker sends each gradient partition to its corresponding PS pod through the RPC call `push_gradient`.
```proto
service PServer {
    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);
}
```
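The partitioning step on the worker might look like the following sketch. Hashing parameter names modulo the number of PS pods is an assumption made here for illustration; the document does not prescribe a particular partitioning scheme:

```python
def partition_gradients(named_grads, embedding_grads, num_ps):
    """Split gradients across PS pods.

    named_grads:     dict of parameter name -> gradient tensor
    embedding_grads: dict of (layer name, discrete id) -> gradient vector
    """
    partitions = [{"dense": {}, "embedding": {}} for _ in range(num_ps)]
    for name, grad in named_grads.items():
        ps_id = hash(name) % num_ps      # partition dense gradients by parameter name
        partitions[ps_id]["dense"][name] = grad
    for (layer, vec_id), grad in embedding_grads.items():
        ps_id = vec_id % num_ps          # partition embedding gradients by discrete ID
        partitions[ps_id]["embedding"][(layer, vec_id)] = grad
    return partitions

# Each partition would then be sent to its PS pod via the push_gradient RPC.
```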
When a PS pod receives gradients through `push_gradient`, it uses a PyTorch optimizer to apply them to the non-embedding parameters.
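A minimal sketch of this update step, assuming the PS pod keeps its non-embedding parameters as a name-to-parameter dict and that received gradients arrive as a dict with matching names:

```python
import torch

def apply_gradients(params, grads, optimizer):
    """Apply received gradients with a PyTorch optimizer.

    params: dict of name -> torch.nn.Parameter held by this PS pod
    grads:  dict of name -> gradient tensor received via push_gradient
    """
    optimizer.zero_grad()
    for name, grad in grads.items():
        params[name].grad = grad   # attach the received gradient to the parameter
    optimizer.step()               # let the optimizer update the parameters

# Possible setup on the PS pod:
# params = {"fc.weight": torch.nn.Parameter(torch.zeros(10, 100)), ...}
# optimizer = torch.optim.SGD(params.values(), lr=0.1)
```

For reference, the complete single-machine MNIST example mentioned earlier follows.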
```python
import os
import torch
import torch.nn as nn
import torch.utils.data as Data
import torchvision
import matplotlib.pyplot as plt

# torch.manual_seed(1)  # reproducible

EPOCH = 1               # train the training data n times; to save time, we just train 1 epoch
BATCH_SIZE = 50
LR = 0.001              # learning rate
DOWNLOAD_MNIST = False

# MNIST digits dataset
if not(os.path.exists('./mnist/')) or not os.listdir('./mnist/'):
    # no mnist dir, or mnist is an empty dir
    DOWNLOAD_MNIST = True

train_data = torchvision.datasets.MNIST(
    root='./mnist/',
    train=True,                                   # this is training data
    transform=torchvision.transforms.ToTensor(),  # converts a PIL.Image or numpy.ndarray to a
                                                  # torch.FloatTensor of shape (C x H x W) and normalizes it to the range [0.0, 1.0]
    download=DOWNLOAD_MNIST,
)

# plot one example
print(train_data.train_data.size())    # (60000, 28, 28)
print(train_data.train_labels.size())  # (60000)
plt.imshow(train_data.train_data[0].numpy(), cmap='gray')
plt.title('%i' % train_data.train_labels[0])
plt.show()

# Data Loader for easy mini-batch return in training; the image batch shape will be (50, 1, 28, 28)
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)

# pick 2000 samples to speed up testing
test_data = torchvision.datasets.MNIST(root='./mnist/', train=False)
test_x = torch.unsqueeze(test_data.test_data, dim=1).type(torch.FloatTensor)[:2000]/255.  # shape from (2000, 28, 28) to (2000, 1, 28, 28), value in range (0, 1)
test_y = test_data.test_labels[:2000]


class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(       # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=1,            # input height
                out_channels=16,          # n_filters
                kernel_size=5,            # filter size
                stride=1,                 # filter movement/step
                padding=2,                # keep the same width and height after Conv2d: padding = (kernel_size - 1) / 2 when stride = 1
            ),                            # output shape (16, 28, 28)
            nn.ReLU(),                    # activation
            nn.MaxPool2d(kernel_size=2),  # take the max value in each 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(       # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),   # output shape (32, 14, 14)
            nn.ReLU(),                    # activation
            nn.MaxPool2d(2),              # output shape (32, 7, 7)
        )
        self.out = nn.Linear(32 * 7 * 7, 10)  # fully connected layer, output 10 classes

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        output = self.out(x)
        return output, x           # return x for visualization


cnn = CNN()
print(cnn)  # net architecture

# the following function (plot_with_labels) is for visualization; it can be ignored if not interested
from matplotlib import cm
try:
    from sklearn.manifold import TSNE
    HAS_SK = True
except ImportError:
    HAS_SK = False
    print('Please install sklearn for layer visualization')


def plot_with_labels(lowDWeights, labels):
    plt.cla()
    X, Y = lowDWeights[:, 0], lowDWeights[:, 1]
    for x, y, s in zip(X, Y, labels):
        c = cm.rainbow(int(255 * s / 9))
        plt.text(x, y, s, backgroundcolor=c, fontsize=9)
    plt.xlim(X.min(), X.max())
    plt.ylim(Y.min(), Y.max())
    plt.title('Visualize last layer')
    plt.show()
    plt.pause(0.01)


plt.ion()

optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.CrossEntropyLoss()                      # the target label is not one-hotted
learning_rate = 1e-4

# training and testing
for epoch in range(EPOCH):
    for step, (b_x, b_y) in enumerate(train_loader):  # gives batch data, normalizes x when iterating train_loader
        output = cnn(b_x)[0]           # cnn output
        loss = loss_func(output, b_y)  # cross entropy loss
        # print(epoch, loss.item())

        # optimizer.zero_grad()        # clear gradients for this training step
        # loss.backward()              # backpropagation, compute gradients
        # optimizer.step()             # apply gradients

        cnn.zero_grad()
        loss.backward()
        with torch.no_grad():
            for param in cnn.parameters():
                param -= learning_rate * param.grad

        if step % 50 == 0:
            test_output, last_layer = cnn(test_x)
            pred_y = torch.max(test_output, 1)[1].data.numpy()
            accuracy = float((pred_y == test_y.data.numpy()).astype(int).sum()) / float(test_y.size(0))
            print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy(), '| test accuracy: %.2f' % accuracy)
            if HAS_SK:
                # Visualization of trained flatten layer (T-SNE)
                tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
                plot_only = 500
                low_dim_embs = tsne.fit_transform(last_layer.data.numpy()[:plot_only, :])
                labels = test_y.numpy()[:plot_only]
                plot_with_labels(low_dim_embs, labels)

plt.ioff()

# print 10 predictions from test data
test_output, _ = cnn(test_x[:10])
pred_y = torch.max(test_output, 1)[1].data.numpy()
print(pred_y, 'prediction number')
print(test_y[:10].numpy(), 'real number')
```