Implement a parameter server (PS) with PyTorch and OpenMPI
- Mar 21, 2019. All functionality in this repo has been rewritten using the PyTorch communication library. Please check my new GitHub repo to take a look, and feel free to modify any part of it!
- Nov 18, 2018. ps_pytorch now supports both Python 2.7 and Python 3.6 and has been moved to PyTorch 0.4.1.
- Motivations
- System design
- Basic usages
- How to prepare datasets
- How to launch a distributed task
- Future work
- PyTorch provides easy-to-use APIs with a dynamic computational graph.
- PyTorch does not offer a full distributed package (there are some communication libraries, but they do not support operations/APIs with the same flexibility as OpenMPI).
- mpi4py provides a good Python binding for any distribution of MPI (e.g. OpenMPI, MPICH, etc.).
- PS node: This node serves as both master and PS in our system, i.e. it synchronizes all workers to enter the next iteration by broadcasting the global step to them, and it also stores the global model, which worker nodes fetch at the beginning of each iteration. At a user-defined frequency, the PS node saves the current model as a checkpoint to a shared file system (NFS in our system) for model evaluation.
- Workers: These nodes sample data points (or mini-batches) from their local dataset (we don't pass data among nodes, to maintain data locality), compute gradients, and ship them back to the PS.
- Evaluator: This node reads the checkpoints from the shared directory and evaluates the model. Note that only test-set data is stored on evaluator nodes.
- Gradient compression is implemented using the high-speed compression tool Blosc to mitigate communication overhead.
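The roles above can be illustrated with a small single-process simulation. This is only a sketch under loud assumptions: it does not use MPI or PyTorch, the model is a toy 1-D linear fit, `zlib` from the standard library stands in for Blosc, and all function names (`compress_grad`, `worker_step`, `ps_step`, `train`) are hypothetical rather than taken from this repo.

```python
import struct
import zlib


def compress_grad(grad):
    """Pack floats as little-endian float64 bytes and compress them.

    zlib is a stand-in here; the project itself uses Blosc.
    """
    raw = struct.pack("<%dd" % len(grad), *grad)
    return zlib.compress(raw)


def decompress_grad(blob):
    """Inverse of compress_grad: bytes back to a list of floats."""
    raw = zlib.decompress(blob)
    return list(struct.unpack("<%dd" % (len(raw) // 8), raw))


def worker_step(weights, local_batch):
    """Worker: compute the MSE gradient of y = w * x on local data only."""
    w = weights[0]
    grad = sum(2.0 * (w * x - y) * x for x, y in local_batch) / len(local_batch)
    return compress_grad([grad])


def ps_step(weights, blobs, lr):
    """PS: decompress all worker gradients, average them, update the model."""
    grads = [decompress_grad(b) for b in blobs]
    avg = [sum(g[i] for g in grads) / len(grads) for i in range(len(weights))]
    return [w - lr * g for w, g in zip(weights, avg)]


def train(shards, steps=200, lr=0.05):
    """Synchronous rounds: every worker sees the same global model each step."""
    weights = [0.0]
    for _ in range(steps):
        blobs = [worker_step(weights, shard) for shard in shards]
        weights = ps_step(weights, blobs, lr)
    return weights


# Two workers, each holding its own shard of data generated from y = 3 * x.
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
final_weights = train(shards)  # converges toward the slope 3.0
```

In the real system each `worker_step` would run in its own MPI process and the compressed bytes would travel over the network, which is where compression pays off.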
Anaconda is highly recommended for installing the dependencies of this project. Assuming a machine with conda already set up, you can run
bash ./tools/pre_run.sh
to install all the dependencies needed.
Although this project focuses on implementing a PS in PyTorch, we also provide a single-machine version to measure the scalability of this implementation.
python single_machine.py --dataset=MNIST/Cifar10 --network=LeNet/Resnet --batch-size=${BATCH_SIZE}
To run on a distributed cluster, the first thing you need to do is launch AWS EC2 instances.
This script helps you launch EC2 instances automatically, but before running it, you should follow the instructions to set up the AWS CLI on your local machine.
After that, please edit this part of ./tools/pytorch_ec2.py:
cfg = Cfg({
"name" : "PS_PYTORCH", # Unique name for this specific configuration
"key_name": "NameOfKeyFile", # Necessary to ssh into created instances
# Cluster topology
"n_masters" : 1, # Should always be 1
"n_workers" : 8,
"num_replicas_to_aggregate" : "8", # deprecated, not necessary
"method" : "spot",
# Region specification
"region" : "us-west-2",
"availability_zone" : "us-west-2b",
# Machine type - instance type configuration.
"master_type" : "m4.2xlarge",
"worker_type" : "m4.2xlarge",
# please only use this AMI for pytorch
"image_id": "ami-xxxxxxxx", # id of AMI
# Launch specifications
"spot_price" : "0.15", # Has to be a string
# SSH configuration
"ssh_username" : "ubuntu", # For sshing. E.G: ssh ssh_username@hostname
"path_to_keyfile" : "/dir/to/NameOfKeyFile.pem",
# NFS configuration
# To set up these values, go to Services > ElasticFileSystem > Create new filesystem, and follow the directions.
#"nfs_ip_address" : "172.31.3.173", # us-west-2c
#"nfs_ip_address" : "172.31.35.0", # us-west-2a
"nfs_ip_address" : "172.31.14.225", # us-west-2b
"nfs_mount_point" : "/home/ubuntu/shared", # NFS base dir
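
Some of the constraints noted in the comments above can be checked before launching anything. The helper below is a hypothetical sketch, not part of pytorch_ec2.py: it assumes the settings are available as a plain dict, and `check_cluster_config` is a name introduced only for this illustration.

```python
def check_cluster_config(cfg):
    """Return a list of problems; an empty list means these checks passed."""
    problems = []
    if cfg.get("n_masters") != 1:
        problems.append("n_masters should always be 1")
    if not isinstance(cfg.get("spot_price"), str):
        problems.append("spot_price has to be a string")
    region = cfg.get("region")
    zone = cfg.get("availability_zone")
    # AWS availability zone names are the region name plus a letter suffix.
    if not region or not zone or not zone.startswith(region):
        problems.append("availability_zone must belong to region")
    return problems


# Values copied from the configuration fragment above.
example_cfg = {
    "n_masters": 1,
    "n_workers": 8,
    "region": "us-west-2",
    "availability_zone": "us-west-2b",
    "spot_price": "0.15",
}
issues = check_cluster_config(example_cfg)  # empty when all checks pass
```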
To set everything up on an EC2 cluster, the easiest way is to set up one machine and create an AMI from it. Then use the AMI ID for image_id in pytorch_ec2.py. Then launch the EC2 instances by running
python ./tools/pytorch_ec2.py launch
After all launched instances are ready (this may take a while), get the private IPs of the instances by running
python ./tools/pytorch_ec2.py get_hosts
This will write the IPs into a file named hosts_address, which looks like
172.31.16.226 (${PS_IP})
172.31.27.245
172.31.29.131
172.31.18.108
172.31.18.174
172.31.17.228
172.31.16.25
172.31.30.61
172.31.29.30
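
As a quick check on the generated file, the sketch below reads hosts_address-style text, treats the first address as the PS, and derives the MPI process count (workers plus one PS). It assumes one address per line and ignores anything after the first whitespace on a line; `parse_hosts` and `cluster_size` are hypothetical helper names, not functions from this repo.

```python
def parse_hosts(text):
    """Split hosts_address content into (ps_ip, worker_ips)."""
    hosts = []
    for line in text.splitlines():
        fields = line.split()
        if fields:  # skip blank lines
            hosts.append(fields[0])  # keep only the address itself
    if not hosts:
        raise ValueError("hosts file is empty")
    return hosts[0], hosts[1:]


def cluster_size(text):
    """Number of MPI processes: one PS plus one process per worker."""
    _, workers = parse_hosts(text)
    return len(workers) + 1


sample = "172.31.16.226\n172.31.27.245\n172.31.29.131\n"
ps_ip, worker_ips = parse_hosts(sample)
n_processes = cluster_size(sample)
```

To use it on the real file, pass `open("hosts_address").read()` instead of `sample`.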
After generating the hosts_address file for all EC2 instances, running the following command will copy your keyfile to the parameter server (PS) instance, whose address is always the first one in hosts_address. local_script.sh will also do some basic configuration, e.g. cloning this git repo.
bash ./tools/local_script.sh ${PS_IP}
At this stage, you should ssh to the PS instance, and all further operations should happen on the PS. In the PS setting, the PS should be able to ssh to any compute node; the following script does this job for you (run it after ssh-ing to the PS):
bash ./tools/remote_script.sh
We currently support the MNIST and Cifar10 datasets. Download, split, and transform the datasets by running the following (./tools/remote_script.sh already does this for you):
bash ./src/data_prepare.sh
Since this project is built on MPI, tasks must be launched from the PS (or master) instance. run_pytorch.sh wraps up the job-launching process. Commonly used options (arguments) are listed below:
Argument | Comments |
---|---|
n | Number of processes (size of the cluster), e.g. if we have P compute nodes and 1 PS, n=P+1. |
hostfile | Path to the file that contains the private IPs of every node in the cluster; we use hosts_address here, as mentioned before. |
lr | Initial learning rate to use. |
momentum | Momentum value to use. |
max-steps | The maximum number of iterations to train. |
epochs | The maximum number of epochs to train (somewhat redundant). |
network | Type of deep neural net; currently LeNet, ResNet-18/32/50/110/152, and VGGs are supported. |
dataset | Dataset used for training. |
batch-size | Batch size for optimization algorithms. |
eval-freq | Frequency (in iterations) at which to evaluate the model. |
enable-gpu | Train on CPU or GPU; for CPU, leave this argument empty. |
train-dir | Directory to save model checkpoints for evaluation. |
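
To make the argument types concrete, here is a sketch of how the training options in the table might be declared with `argparse`. It is an illustration only: the flag names follow the table, but every default value, the type chosen for each flag, and the way `enable-gpu` is interpreted are assumptions, not the project's actual parser. `n` and `hostfile` are left out because they describe the MPI launch rather than the training script.

```python
import argparse


def build_parser():
    """Hypothetical parser mirroring the table; all defaults are made up."""
    parser = argparse.ArgumentParser(description="Training options (sketch)")
    parser.add_argument("--lr", type=float, default=0.01)
    parser.add_argument("--momentum", type=float, default=0.9)
    parser.add_argument("--max-steps", type=int, default=10000)
    parser.add_argument("--epochs", type=int, default=100)
    parser.add_argument("--network", type=str, default="LeNet")
    parser.add_argument("--dataset", type=str, default="MNIST")
    parser.add_argument("--batch-size", type=int, default=128)
    parser.add_argument("--eval-freq", type=int, default=50)
    # Assumption: an empty value means CPU, any non-empty value means GPU.
    parser.add_argument("--enable-gpu", type=str, default="")
    parser.add_argument("--train-dir", type=str, default="output/models/")
    return parser


# Parse an explicit argument list instead of sys.argv for the demonstration.
args = build_parser().parse_args(
    ["--network=LeNet", "--dataset=MNIST", "--batch-size=64", "--lr=0.1"]
)
use_gpu = bool(args.enable_gpu)
```

Note that `argparse` converts dashes to underscores, so `--batch-size` is read back as `args.batch_size`.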
The distributed evaluator fetches model checkpoints from the shared directory and evaluates the model on the validation set. To evaluate the model, you can run
bash ./src/evaluate_pytorch.sh
with the specified arguments.
Evaluation arguments are listed below:
Argument | Comments |
---|---|
eval-batch-size | Batch size (on the validation set) used during model evaluation. |
eval-freq | Frequency (in iterations) at which to evaluate the model; should be set to the same value as in run_pytorch.sh. |
network | Type of deep neural net; should be set to the same value as in run_pytorch.sh. |
dataset | Dataset used for training; should be set to the same value as in run_pytorch.sh. |
model-dir | Directory where model checkpoints are saved for evaluation; should be set to the same value as in run_pytorch.sh. |
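
The reason eval-freq and model-dir must match the training run is that the evaluator looks for checkpoints written at those steps in that directory. The sketch below shows one way such a lookup could work. The file naming scheme `model_step_<N>` is purely an assumption for illustration (the actual checkpoint names may differ), and both function names are hypothetical.

```python
import os
import re

# Assumed naming scheme, for illustration only.
CKPT_PATTERN = re.compile(r"^model_step_(\d+)$")


def steps_to_evaluate(filenames, eval_freq, done=()):
    """Return sorted checkpoint steps that match eval_freq and are not done."""
    steps = set()
    for name in filenames:
        match = CKPT_PATTERN.match(name)
        if match:
            step = int(match.group(1))
            if step % eval_freq == 0 and step not in done:
                steps.add(step)
    return sorted(steps)


def poll_model_dir(model_dir, eval_freq, done=()):
    """List the shared directory once and return the steps still to evaluate."""
    return steps_to_evaluate(os.listdir(model_dir), eval_freq, done)


pending = steps_to_evaluate(
    ["model_step_100", "model_step_50", "notes.txt", "model_step_75"],
    eval_freq=50,
    done={50},
)
```

An evaluator loop would call `poll_model_dir` periodically, evaluate each returned step, and add it to `done`.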
(Please note that this project is still an early alpha version.)
- (Already done! Please check this repo) Move APIs into PyTorch completely using its built-in communication lib
- Optimize the speedups and minimize communication overhead
- Support an async communication mode, i.e. Backup Worker
- Wrap up more state-of-the-art deep models and datasets