diff --git a/README.md b/README.md
index 0f68671..ec69cd5 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,31 @@
 # Neural machine translation
 
-Repository to collect code for neural machine translation internally at MILA. The short-term objective is to have an attention-based model working on multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal is to base the model code of Cho's for now (see [#1](https://github.com/bartvm/nmt/issues/1), because it has simpler internals than Blocks that we can hack away at if needed for multi-GPU.
+Repository to collect code for neural machine translation internally at MILA.
+The short-term objective is to have an attention-based model working on
+multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal
+is to base the model on Cho's code for now (see
+[#1](https://github.com/bartvm/nmt/issues/1)), because it has simpler
+internals than Blocks that we can hack away at if needed for multi-GPU.
 
-To have a central collection of research ideas and discussions, please create issues and comment on them.
+To have a central collection of research ideas and discussions, please create
+issues and comment on them.
 
 ## Training on the lab computers
 
 To train efficiently, make sure of the following:
 
-* Use cuDNN 4; if cuDNN is disabled it will take the gradient of the softmax on the CPU which is much slower.
-* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your `.theanorc`).
+* Use cuDNN 4; if cuDNN is disabled, the gradient of the softmax is taken
+  on the CPU, which is much slower.
+* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your
+  `.theanorc`).
 
-Launching with Platoon can be done using `platoon-launcher nmt gpu0 gpu1 -c config.json`. To watch the logs it's wortwhile to alias the command `watch tail "$(ls -1dt PLATOON_LOGS/nmt/*/ | head -n 1)*"`.
+Launching with Platoon can be done using `platoon-launcher nmt gpu0 gpu1
+-c="config.json 4"`, where 4 is the number of workers. To watch the logs
+it's worthwhile to alias the command `watch tail "$(ls -1dt
+PLATOON_LOGS/nmt/*/ | head -n 1)*"`.
 
-Starting a single GPU experiment is done with `python nmt_singly.py config.json`.
+Starting a single-GPU experiment is done with `python nmt_single.py
+config.json`.
 
 ## WMT16 data
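Note: the two bullets above translate into a small number of Theano flags. A minimal `.theanorc` consistent with them might look like the sketch below (Theano 0.8-era option names; everything except `cnmem = 0.98` is an assumption, and the device is left to the launcher, since Platoon starts one worker per GPU):

```ini
[global]
floatX = float32

[lib]
# From the README: let CNMeM reserve 98% of GPU memory up front.
cnmem = 0.98

[dnn]
# Fail loudly if cuDNN is missing rather than silently computing the
# softmax gradient on the CPU.
enabled = True
```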
diff --git a/config.json b/config.json
index 7f22514..affca51 100644
--- a/config.json
+++ b/config.json
@@ -13,7 +13,8 @@
         "valid_sync": true,
         "train_len": 32,
         "control_port": 5567,
-        "batch_port": 5568
+        "batch_port": 5568,
+        "beta": 0.9
     },
     "management": {
         "sample_freq": 1000,
diff --git a/data_iterator.py b/data_iterator.py
index f9d6137..60f2a51 100644
--- a/data_iterator.py
+++ b/data_iterator.py
@@ -1,15 +1,47 @@
 from itertools import count
 
+import numpy
+
 from fuel.datasets.text import TextFile
 from fuel.transformers import Merge
 from fuel.schemes import ConstantScheme
-from fuel.transformers import Batch, Cache, Mapping, SortMapping, Padding
+from fuel.transformers import (Batch, Cache, Mapping, SortMapping, Padding,
+                               Transformer)
 
 EOS_TOKEN = '<EOS>'  # 0
 UNK_TOKEN = '<UNK>'  # 1
 
+
+class Shuffle(Transformer):
+    def __init__(self, data_stream, buffer_size, **kwargs):
+        if kwargs.get('iteration_scheme') is not None:
+            raise ValueError
+        super(Shuffle, self).__init__(
+            data_stream, produces_examples=data_stream.produces_examples,
+            **kwargs)
+        self.buffer_size = buffer_size
+        self.cache = [[] for _ in self.sources]
+
+    def get_data(self, request=None):
+        if request is not None:
+            raise ValueError
+        if not self.cache[0]:
+            self._cache()
+        return tuple(cache.pop() for cache in self.cache)
+
+    def _cache(self):
+        temp_caches = [[] for _ in self.sources]
+        for i in range(self.buffer_size):
+            for temp_cache, data in zip(temp_caches,
+                                        next(self.child_epoch_iterator)):
+                temp_cache.append(data)
+        shuffled_indices = numpy.random.permutation(self.buffer_size)
+        for i in shuffled_indices:
+            for temp_cache, cache in zip(temp_caches, self.cache):
+                cache.append(temp_cache[i])
+
+
 def _source_length(sentence_pair):
     """Returns the length of the second element of a sequence.
 
@@ -83,6 +115,7 @@ def get_stream(source, target, source_dict, target_dict, batch_size=128,
     )
     sorted_batches = Mapping(large_batches, SortMapping(_source_length))
     batches = Cache(sorted_batches, ConstantScheme(batch_size))
-    masked_batches = Padding(batches)
+    shuffled_batches = Shuffle(batches, buffer_multiplier)
+    masked_batches = Padding(shuffled_batches)
 
     return masked_batches
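Note: the new `Shuffle` transformer buffers `buffer_size` consecutive items from its child stream and hands them back in a random order, so the length-sorted batches coming out of `Cache` are no longer fed to the model in strictly ascending length. A toy check of that behaviour, assuming only Fuel and the class as defined above (`IterableDataset` stands in for the real batch stream):

```python
from fuel.datasets import IterableDataset

# Eight dummy examples, shuffled within windows of buffer_size=4.
stream = Shuffle(IterableDataset({'numbers': range(8)}).get_example_stream(),
                 buffer_size=4)
print([x for x, in stream.get_epoch_iterator()])
# e.g. [2, 0, 3, 1, 6, 4, 7, 5]: shuffled inside each window of four,
# and the two windows never mix.
```

One caveat: if the child stream is exhausted while `_cache` is mid-fill, the `StopIteration` ends the epoch and the partially filled buffer is discarded, so up to `buffer_size - 1` trailing batches can be silently dropped each epoch.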
""" + self.beta = config['multi'].pop('beta') self.config = config super(NMTController, self).__init__(config['multi']['control_port']) self.batch_port = config['multi']['batch_port'] @@ -43,11 +48,23 @@ def __init__(self, experiment_id, config): self.min_valid_cost = numpy.inf self.valid = False + self._stop = False + + signal.signal(signal.SIGTERM, self.stop) + signal.signal(signal.SIGINT, self.stop) self.experiment_id = experiment_id ServerLogger(filename='{}.log.jsonl.gz'.format(self.experiment_id), threaded=True) + self.num_workers = num_workers + + def stop(self, signum, frame): + print('Received SIGINT/SIGTERM') + self._stop = True + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + def start_batch_server(self): self.p = Process(target=self._send_mb) self.p.daemon = True @@ -90,6 +107,9 @@ def handle_control(self, req, worker_id): if req == 'config': control_response = self.config + elif req == 'alpha': + tau = self.config['multi']['train_len'] + control_response = self.beta / tau / self.num_workers elif req == 'experiment_id': control_response = self.experiment_id elif req == 'next': @@ -113,7 +133,8 @@ def handle_control(self, req, worker_id): else: self.bad_counter += 1 - if self.uidx > self.max_mb or self.bad_counter > self.patience: + if (self._stop or self.uidx > self.max_mb or + self.bad_counter > self.patience): control_response = 'stop' self.worker_is_done(worker_id) @@ -124,10 +145,11 @@ def handle_control(self, req, worker_id): # Load the configuration file with io.open(sys.argv[1]) as f: config = json.load(f) + num_workers = int(sys.argv[2]) # Create unique experiment ID and backup config file experiment_id = binascii.hexlify(os.urandom(3)).decode() shutil.copyfile(sys.argv[1], '{}.config.json'.format(experiment_id)) # Start controller - l = NMTController(experiment_id, config) + l = NMTController(experiment_id, config, num_workers) l.start_batch_server() l.serve() diff --git a/nmt_worker.py b/nmt_worker.py index 346c922..ef48d1f 100644 --- a/nmt_worker.py +++ b/nmt_worker.py @@ -1,6 +1,5 @@ from __future__ import print_function -import copy import logging import os import time @@ -59,7 +58,8 @@ def train(worker, model_options, data_options, LOGGER.info('Initializing parameters') tparams = init_tparams(params) - worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(0.5)) + alpha = worker.send_req('alpha') + worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(alpha)) # use_noise is for dropout trng, use_noise, \ @@ -153,6 +153,7 @@ def train(worker, model_options, data_options, float(y_mask.sum(0).mean()) log_entry['update_time'] = time.clock() - update_start log_entry['train_time'] = time.clock() - train_start + log_entry['time'] = time.time() log.log(log_entry) step = worker.send_req({'done': train_len}) @@ -167,10 +168,12 @@ def train(worker, model_options, data_options, valid_err = float(valid_errs.mean()) res = worker.send_req({'valid_err': valid_err}) log.log({'validation_cost': valid_err, - 'train_time': time.clock() - train_start}) + 'train_time': time.clock() - train_start, + 'time': time.time()}) - if res == 'best': + if res == 'best' and saveto: best_p = unzip(tparams) + save_params(best_p, model_filename, saveto_filename) if valid_sync: worker.copy_to_local() @@ -181,22 +184,6 @@ def train(worker, model_options, data_options, # Release all shared ressources. 
diff --git a/nmt_worker.py b/nmt_worker.py
index 346c922..ef48d1f 100644
--- a/nmt_worker.py
+++ b/nmt_worker.py
@@ -1,6 +1,5 @@
 from __future__ import print_function
 
-import copy
 import logging
 import os
 import time
@@ -59,7 +58,8 @@ def train(worker, model_options, data_options,
     LOGGER.info('Initializing parameters')
     tparams = init_tparams(params)
 
-    worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(0.5))
+    alpha = worker.send_req('alpha')
+    worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(alpha))
 
     # use_noise is for dropout
     trng, use_noise, \
@@ -153,6 +153,7 @@ def train(worker, model_options, data_options,
             float(y_mask.sum(0).mean())
         log_entry['update_time'] = time.clock() - update_start
         log_entry['train_time'] = time.clock() - train_start
+        log_entry['time'] = time.time()
         log.log(log_entry)
 
         step = worker.send_req({'done': train_len})
@@ -167,10 +168,12 @@ def train(worker, model_options, data_options,
             valid_err = float(valid_errs.mean())
 
             res = worker.send_req({'valid_err': valid_err})
             log.log({'validation_cost': valid_err,
-                     'train_time': time.clock() - train_start})
+                     'train_time': time.clock() - train_start,
+                     'time': time.time()})
 
-            if res == 'best':
+            if res == 'best' and saveto:
                 best_p = unzip(tparams)
+                save_params(best_p, model_filename, saveto_filename)
 
             if valid_sync:
                 worker.copy_to_local()
@@ -181,22 +184,6 @@ def train(worker, model_options, data_options,
     # Release all shared ressources.
     worker.close()
 
-    LOGGER.info('Saving')
-
-    if best_p is not None:
-        params = best_p
-    else:
-        params = unzip(tparams)
-
-    use_noise.set_value(0.)
-
-    if saveto:
-        numpy.savez(saveto, **best_p)
-        LOGGER.info('model saved')
-
-    params = copy.copy(best_p)
-    save_params(params, model_filename, saveto_filename)
-
 
 if __name__ == "__main__":
     LOGGER.info('Connecting to worker')
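Note: the block deleted at the end of `train()` became redundant once saving moved into the `res == 'best'` branch above, and it was also broken: `numpy.savez(saveto, **best_p)` could run with `best_p` still `None` if validation never improved, and `params = copy.copy(best_p)` discarded the `unzip(tparams)` fallback computed just above it. A minimal sketch of the keep-the-best pattern that replaces it, with the repo's `unzip`/`save_params` helpers approximated by hand (the function name, the `state` dict, and the plain `.npz` format are all assumptions for illustration):

```python
import numpy

def checkpoint_if_best(tparams, valid_err, state, saveto):
    """Snapshot parameters whenever validation error improves.

    tparams: dict of Theano shared variables, as built by init_tparams.
    state: persistent dict carried across validations,
           e.g. {'best_err': numpy.inf}.
    """
    if valid_err < state['best_err']:
        state['best_err'] = valid_err
        # Roughly what unzip() does: pull values out of the shared variables.
        best_p = dict((k, v.get_value()) for k, v in tparams.items())
        # Roughly what save_params() does: persist as an .npz archive.
        numpy.savez(saveto, **best_p)
    return state
```

Saving at the moment a new best is observed also means a crash or a `stop` from the controller can no longer lose the best parameters seen so far.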