From e8428a911eb5945eefbc0c7c4296ce1e79700821 Mon Sep 17 00:00:00 2001
From: Bart van Merrienboer
Date: Wed, 17 Feb 2016 10:06:35 -0500
Subject: [PATCH 1/2] EASGD parameters, shuffled batches, measuring time

For EASGD they generally set beta and use it to calculate alpha as a
function of the communication period and the number of workers. Because
the controller is agnostic to the number of workers connected, we must
pass it explicitly.

In order to plot the logs of multiple workers together we should log
the actual time (instead of the relative time).

The sorting of batches seems to lead to some strange training
artifacts; sometimes one of the workers gets significantly longer
sentences than the other for a while, which makes it look like it's
doing really badly. Moreover, the sentences sampled are always of the
same length if sample_freq divides the batch buffer. To guarantee that
this doesn't affect training too much, I shuffled the batches. There
are still some artifacts (one worker gets shorter/longer sentences for
a while), but it reduces the noise a bit. In general it might be better
to monitor cost/target length instead of just the cost.
---
 README.md         | 24 ++++++++++++++++++------
 config.json       |  3 ++-
 data_iterator.py  | 37 +++++++++++++++++++++++++++++++++++--
 nmt_controller.py | 14 ++++++++++++--
 nmt_worker.py     |  7 +++++--
 5 files changed, 72 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index 0f68671..ec69cd5 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,31 @@
 # Neural machine translation
 
-Repository to collect code for neural machine translation internally at MILA. The short-term objective is to have an attention-based model working on multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal is to base the model code of Cho's for now (see [#1](https://github.com/bartvm/nmt/issues/1), because it has simpler internals than Blocks that we can hack away at if needed for multi-GPU.
+Repository to collect code for neural machine translation internally at MILA.
+The short-term objective is to have an attention-based model working on
+multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal
+is to base the model code on Cho's for now (see
+[#1](https://github.com/bartvm/nmt/issues/1)), because it has simpler
+internals than Blocks that we can hack away at if needed for multi-GPU.
 
-To have a central collection of research ideas and discussions, please create issues and comment on them.
+To have a central collection of research ideas and discussions, please create
+issues and comment on them.
 
 ## Training on the lab computers
 
 To train efficiently, make sure of the following:
 
-* Use cuDNN 4; if cuDNN is disabled it will take the gradient of the softmax on the CPU which is much slower.
-* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your `.theanorc`).
+* Use cuDNN 4; if cuDNN is disabled it will take the gradient of the
+  softmax on the CPU, which is much slower.
+* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your
+  `.theanorc`).
 
-Launching with Platoon can be done using `platoon-launcher nmt gpu0 gpu1 -c config.json`. To watch the logs it's wortwhile to alias the command `watch tail "$(ls -1dt PLATOON_LOGS/nmt/*/ | head -n 1)*"`.
+Launching with Platoon can be done using `platoon-launcher nmt gpu0 gpu1
+-c="config.json 4"` where 4 is the number of workers. To watch the logs
+it's worthwhile to alias the command `watch tail "$(ls -1dt
+PLATOON_LOGS/nmt/*/ | head -n 1)*"`.
 
-Starting a single GPU experiment is done with `python nmt_singly.py config.json`.
+Starting a single GPU experiment is done with `python nmt_single.py
+config.json`.
 
 ## WMT16 data
 
diff --git a/config.json b/config.json
index 7f22514..affca51 100644
--- a/config.json
+++ b/config.json
@@ -13,7 +13,8 @@
     "valid_sync": true,
     "train_len": 32,
     "control_port": 5567,
-    "batch_port": 5568
+    "batch_port": 5568,
+    "beta": 0.9
   },
   "management": {
     "sample_freq": 1000,
diff --git a/data_iterator.py b/data_iterator.py
index f9d6137..60f2a51 100644
--- a/data_iterator.py
+++ b/data_iterator.py
@@ -1,15 +1,47 @@
 from itertools import count
 
+import numpy
+
 from fuel.datasets.text import TextFile
 from fuel.transformers import Merge
 from fuel.schemes import ConstantScheme
-from fuel.transformers import Batch, Cache, Mapping, SortMapping, Padding
+from fuel.transformers import (Batch, Cache, Mapping, SortMapping, Padding,
+                               Transformer)
 
 EOS_TOKEN = '<EOS>'  # 0
 UNK_TOKEN = '<UNK>'  # 1
 
 
+class Shuffle(Transformer):
+    def __init__(self, data_stream, buffer_size, **kwargs):
+        if kwargs.get('iteration_scheme') is not None:
+            raise ValueError
+        super(Shuffle, self).__init__(
+            data_stream, produces_examples=data_stream.produces_examples,
+            **kwargs)
+        self.buffer_size = buffer_size
+        self.cache = [[] for _ in self.sources]
+
+    def get_data(self, request=None):
+        if request is not None:
+            raise ValueError
+        if not self.cache[0]:
+            self._cache()
+        return tuple(cache.pop() for cache in self.cache)
+
+    def _cache(self):
+        temp_caches = [[] for _ in self.sources]
+        for i in range(self.buffer_size):
+            for temp_cache, data in zip(temp_caches,
+                                        next(self.child_epoch_iterator)):
+                temp_cache.append(data)
+        shuffled_indices = numpy.random.permutation(self.buffer_size)
+        for i in shuffled_indices:
+            for temp_cache, cache in zip(temp_caches, self.cache):
+                cache.append(temp_cache[i])
+
+
 def _source_length(sentence_pair):
     """Returns the length of the second element of a sequence.
 
@@ -83,6 +115,7 @@ def get_stream(source, target, source_dict, target_dict, batch_size=128,
     )
     sorted_batches = Mapping(large_batches, SortMapping(_source_length))
     batches = Cache(sorted_batches, ConstantScheme(batch_size))
-    masked_batches = Padding(batches)
+    shuffled_batches = Shuffle(batches, buffer_multiplier)
+    masked_batches = Padding(shuffled_batches)
 
     return masked_batches
diff --git a/nmt_controller.py b/nmt_controller.py
index 3cc29b0..86fc2f8 100644
--- a/nmt_controller.py
+++ b/nmt_controller.py
@@ -19,7 +19,7 @@ class NMTController(Controller):
     This multi-process controller implements patience-based early-stopping SGD
     """
 
-    def __init__(self, experiment_id, config):
+    def __init__(self, experiment_id, config, num_workers):
         """
         Initialize the NMTController
 
@@ -29,8 +29,12 @@ def __init__(self, experiment_id, config):
             A string that uniquely identifies this run.
         config : dict
             The deserialized JSON configuration file
+        num_workers : int
+            The number of workers (GPUs), used to calculate the alpha
+            parameter for EASGD.
""" + self.beta = config['multi'].pop('beta') self.config = config super(NMTController, self).__init__(config['multi']['control_port']) self.batch_port = config['multi']['batch_port'] @@ -48,6 +52,8 @@ def __init__(self, experiment_id, config): ServerLogger(filename='{}.log.jsonl.gz'.format(self.experiment_id), threaded=True) + self.num_workers = num_workers + def start_batch_server(self): self.p = Process(target=self._send_mb) self.p.daemon = True @@ -90,6 +96,9 @@ def handle_control(self, req, worker_id): if req == 'config': control_response = self.config + elif req == 'alpha': + tau = self.config['multi']['train_len'] + control_response = self.beta / tau / self.num_workers elif req == 'experiment_id': control_response = self.experiment_id elif req == 'next': @@ -124,10 +133,11 @@ def handle_control(self, req, worker_id): # Load the configuration file with io.open(sys.argv[1]) as f: config = json.load(f) + num_workers = int(sys.argv[2]) # Create unique experiment ID and backup config file experiment_id = binascii.hexlify(os.urandom(3)).decode() shutil.copyfile(sys.argv[1], '{}.config.json'.format(experiment_id)) # Start controller - l = NMTController(experiment_id, config) + l = NMTController(experiment_id, config, num_workers) l.start_batch_server() l.serve() diff --git a/nmt_worker.py b/nmt_worker.py index 346c922..c2c8c31 100644 --- a/nmt_worker.py +++ b/nmt_worker.py @@ -59,7 +59,8 @@ def train(worker, model_options, data_options, LOGGER.info('Initializing parameters') tparams = init_tparams(params) - worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(0.5)) + alpha = worker.send_req('alpha') + worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(alpha)) # use_noise is for dropout trng, use_noise, \ @@ -153,6 +154,7 @@ def train(worker, model_options, data_options, float(y_mask.sum(0).mean()) log_entry['update_time'] = time.clock() - update_start log_entry['train_time'] = time.clock() - train_start + log_entry['time'] = time.time() log.log(log_entry) step = worker.send_req({'done': train_len}) @@ -167,7 +169,8 @@ def train(worker, model_options, data_options, valid_err = float(valid_errs.mean()) res = worker.send_req({'valid_err': valid_err}) log.log({'validation_cost': valid_err, - 'train_time': time.clock() - train_start}) + 'train_time': time.clock() - train_start, + 'time': time.time()}) if res == 'best': best_p = unzip(tparams) From de4db9057a2e12257174a8c2479a107999475b1f Mon Sep 17 00:00:00 2001 From: Bart van Merrienboer Date: Wed, 17 Feb 2016 22:06:37 -0500 Subject: [PATCH 2/2] Add signal handler --- nmt_controller.py | 14 +++++++++++++- nmt_worker.py | 20 ++------------------ 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/nmt_controller.py b/nmt_controller.py index 86fc2f8..0d56161 100644 --- a/nmt_controller.py +++ b/nmt_controller.py @@ -4,6 +4,7 @@ import json import os import shutil +import signal import sys from multiprocessing import Process @@ -47,6 +48,10 @@ def __init__(self, experiment_id, config, num_workers): self.min_valid_cost = numpy.inf self.valid = False + self._stop = False + + signal.signal(signal.SIGTERM, self.stop) + signal.signal(signal.SIGINT, self.stop) self.experiment_id = experiment_id ServerLogger(filename='{}.log.jsonl.gz'.format(self.experiment_id), @@ -54,6 +59,12 @@ def __init__(self, experiment_id, config, num_workers): self.num_workers = num_workers + def stop(self, signum, frame): + print('Received SIGINT/SIGTERM') + self._stop = True + signal.signal(signal.SIGTERM, signal.SIG_DFL) + 
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
     def start_batch_server(self):
         self.p = Process(target=self._send_mb)
         self.p.daemon = True
@@ -122,7 +133,8 @@ def handle_control(self, req, worker_id):
             else:
                 self.bad_counter += 1
 
-            if self.uidx > self.max_mb or self.bad_counter > self.patience:
+            if (self._stop or self.uidx > self.max_mb or
+                    self.bad_counter > self.patience):
                 control_response = 'stop'
                 self.worker_is_done(worker_id)
 
diff --git a/nmt_worker.py b/nmt_worker.py
index c2c8c31..ef48d1f 100644
--- a/nmt_worker.py
+++ b/nmt_worker.py
@@ -1,6 +1,5 @@
 from __future__ import print_function
 
-import copy
 import logging
 import os
 import time
@@ -172,8 +171,9 @@ def train(worker, model_options, data_options,
                          'train_time': time.clock() - train_start,
                          'time': time.time()})
 
-                if res == 'best':
+                if res == 'best' and saveto:
                     best_p = unzip(tparams)
+                    save_params(best_p, model_filename, saveto_filename)
 
                 if valid_sync:
                     worker.copy_to_local()
@@ -184,22 +184,6 @@ def train(worker, model_options, data_options,
     # Release all shared ressources.
     worker.close()
 
-    LOGGER.info('Saving')
-
-    if best_p is not None:
-        params = best_p
-    else:
-        params = unzip(tparams)
-
-    use_noise.set_value(0.)
-
-    if saveto:
-        numpy.savez(saveto, **best_p)
-        LOGGER.info('model saved')
-
-    params = copy.copy(best_p)
-    save_params(params, model_filename, saveto_filename)
-
 
 if __name__ == "__main__":
     LOGGER.info('Connecting to worker')
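
A note on the alpha rule served by the controller in the first patch: it implements the usual EASGD relation alpha = beta / (tau * p), where tau is the communication period (`train_len`) and p the worker count. A minimal sketch with the values from this series' `config.json` and the four-worker launch shown in the README (the concrete numbers are just those defaults, not requirements):

```python
# EASGD moving rate: alpha = beta / (tau * p).
beta = 0.9       # config['multi']['beta']
tau = 32         # config['multi']['train_len'], updates between syncs
num_workers = 4  # the trailing "4" in `platoon-launcher ... -c="config.json 4"`

alpha = beta / tau / num_workers
print(alpha)  # 0.00703125, the value each worker passes to EASGD(alpha)
```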
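The `Shuffle` transformer added to `data_iterator.py` can also be exercised on its own. A small sketch, assuming Fuel is installed and the repo's pre-PEP 479 Python (where `_cache` ends the epoch by letting `StopIteration` escape); `IterableDataset` here is just a stand-in for the sorted batch stream built in `get_stream`:

```python
# Shuffle fills a buffer of `buffer_size` items from the child stream,
# then hands them back in a random order, so batches that were sorted
# by length no longer arrive strictly in length order.
from fuel.datasets import IterableDataset
from fuel.streams import DataStream

from data_iterator import Shuffle

stream = DataStream(IterableDataset(range(10)))
shuffled = Shuffle(stream, buffer_size=5)
print([x for x, in shuffled.get_epoch_iterator()])
# e.g. [2, 4, 0, 3, 1, 7, 5, 9, 8, 6]: shuffled within each 5-item buffer
```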
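The second patch's stop logic is a general pattern worth noting: the first SIGINT/SIGTERM only sets a flag (workers are told to 'stop' at their next request and can shut down cleanly), and the handler then restores the default disposition so a second signal terminates the process immediately. A standalone sketch of that pattern, not the controller itself:

```python
import signal
import time


class GracefulStop(object):
    """First SIGINT/SIGTERM sets a flag; a second one kills the process."""

    def __init__(self):
        self._stop = False
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

    def stop(self, signum, frame):
        print('Received SIGINT/SIGTERM')
        self._stop = True
        # Restore defaults so a repeated signal is no longer caught.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    def serve(self):
        while not self._stop:
            time.sleep(0.1)  # stand-in for handling worker requests
        print('Stopping cleanly')


if __name__ == '__main__':
    GracefulStop().serve()  # Ctrl-C once for a clean exit, twice to kill
```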