This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Signal #39

Closed · wants to merge 2 commits
README.md (24 changes: 18 additions, 6 deletions)
@@ -1,19 +1,31 @@
# Neural machine translation

Repository to collect code for neural machine translation internally at MILA. The short-term objective is to have an attention-based model working on multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal is to base the model code of Cho's for now (see [#1](https://github.com/bartvm/nmt/issues/1), because it has simpler internals than Blocks that we can hack away at if needed for multi-GPU.
Repository to collect code for neural machine translation internally at MILA.
The short-term objective is to have an attention-based model working on
multiple GPUs (see [#6](https://github.com/bartvm/nmt/issues/6)). My proposal
is to base the model code on Cho's for now (see
[#1](https://github.com/bartvm/nmt/issues/1)), because it has simpler
internals than Blocks, which we can hack away at if needed for multi-GPU.

To have a central collection of research ideas and discussions, please create issues and comment on them.
To have a central collection of research ideas and discussions, please create
issues and comment on them.

## Training on the lab computers

To train efficiently, make sure of the following:

* Use cuDNN 4; if cuDNN is disabled it will take the gradient of the softmax on the CPU which is much slower.
* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your `.theanorc`).
* Use cuDNN 4; if cuDNN is disabled, the gradient of the softmax is computed
  on the CPU, which is much slower.
* Enable CNMeM (e.g. add `cnmem = 0.98` in the `[lib]` section of your
  `.theanorc`).

Launching with Platoon can be done using `platoon-launcher nmt gpu0 gpu1 -c config.json`. To watch the logs it's wortwhile to alias the command `watch tail "$(ls -1dt PLATOON_LOGS/nmt/*/ | head -n 1)*"`.
Launching with Platoon can be done using
`platoon-launcher nmt gpu0 gpu1 -c="config.json 4"`, where 4 is the number of
workers. To watch the logs it's worthwhile to alias the command
`watch tail "$(ls -1dt PLATOON_LOGS/nmt/*/ | head -n 1)*"`.

Starting a single GPU experiment is done with `python nmt_singly.py config.json`.
Starting a single GPU experiment is done with `python nmt_single.py
config.json`.
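
For reference, a minimal `.theanorc` covering the two points above might look
like the following. The `[global]` entries are only an example setup; the
`cnmem` line is the one suggested above, and cuDNN is normally picked up
automatically once it is installed for the GPU backend:

```
[global]
device = gpu0
floatX = float32

[lib]
cnmem = 0.98
```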

## WMT16 data

config.json (3 changes: 2 additions, 1 deletion)
@@ -13,7 +13,8 @@
"valid_sync": true,
"train_len": 32,
"control_port": 5567,
"batch_port": 5568
"batch_port": 5568,
"beta": 0.9
},
"management": {
"sample_freq": 1000,
data_iterator.py (37 changes: 35 additions, 2 deletions)
@@ -1,15 +1,47 @@
from itertools import count

import numpy

from fuel.datasets.text import TextFile
from fuel.transformers import Merge
from fuel.schemes import ConstantScheme
from fuel.transformers import Batch, Cache, Mapping, SortMapping, Padding
from fuel.transformers import (Batch, Cache, Mapping, SortMapping, Padding,
                               Transformer)


EOS_TOKEN = '<EOS>' # 0
UNK_TOKEN = '<UNK>' # 1


class Shuffle(Transformer):
    def __init__(self, data_stream, buffer_size, **kwargs):
        if kwargs.get('iteration_scheme') is not None:
            raise ValueError
        super(Shuffle, self).__init__(
            data_stream, produces_examples=data_stream.produces_examples,
            **kwargs)
        self.buffer_size = buffer_size
        self.cache = [[] for _ in self.sources]

    def get_data(self, request=None):
        if request is not None:
            raise ValueError
        if not self.cache[0]:
            self._cache()
        return tuple(cache.pop() for cache in self.cache)

    def _cache(self):
        temp_caches = [[] for _ in self.sources]
        for i in range(self.buffer_size):
            for temp_cache, data in zip(temp_caches,
                                        next(self.child_epoch_iterator)):
                temp_cache.append(data)
        shuffled_indices = numpy.random.permutation(self.buffer_size)
        for i in shuffled_indices:
            for temp_cache, cache in zip(temp_caches, self.cache):
                cache.append(temp_cache[i])


def _source_length(sentence_pair):
"""Returns the length of the second element of a sequence.

@@ -83,6 +115,7 @@ def get_stream(source, target, source_dict, target_dict, batch_size=128,
    )
    sorted_batches = Mapping(large_batches, SortMapping(_source_length))
    batches = Cache(sorted_batches, ConstantScheme(batch_size))
    masked_batches = Padding(batches)
    shuffled_batches = Shuffle(batches, buffer_multiplier)
    masked_batches = Padding(shuffled_batches)

    return masked_batches
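
To make the buffering logic in `Shuffle` concrete, here is a small,
self-contained sketch of the same idea outside of Fuel (the batch values and
buffer size are made up for the illustration): `buffer_size` batches are read
into a temporary cache and then drained into the real cache in a random
order, which is what `_cache` does per source. In `get_stream`, the new
`Shuffle` step now sits between `Cache` and `Padding`, so the length-sorted
minibatches are no longer fed to training in strictly sorted order.

```python
import numpy

# Stand-in for self.child_epoch_iterator: a stream of single-source batches.
batches = iter([('batch-%d' % i,) for i in range(8)])

buffer_size = 4
cache = [[]]                       # one list per source, as in Shuffle.cache

# Fill a temporary cache with buffer_size batches...
temp_caches = [[]]
for _ in range(buffer_size):
    for temp_cache, data in zip(temp_caches, next(batches)):
        temp_cache.append(data)

# ...then drain it into the real cache in a random order.
for i in numpy.random.permutation(buffer_size):
    for temp_cache, source_cache in zip(temp_caches, cache):
        source_cache.append(temp_cache[i])

print(cache[0])                    # the first four batches, in shuffled order
```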
nmt_controller.py (28 changes: 25 additions, 3 deletions)
Expand Up @@ -4,6 +4,7 @@
import json
import os
import shutil
import signal
import sys
from multiprocessing import Process

@@ -19,7 +20,7 @@ class NMTController(Controller):
This multi-process controller implements patience-based early-stopping SGD
"""

def __init__(self, experiment_id, config):
def __init__(self, experiment_id, config, num_workers):
"""
Initialize the NMTController

@@ -29,8 +30,12 @@ def __init__(self, experiment_id, config):
A string that uniquely identifies this run.
config : dict
The deserialized JSON configuration file
num_workers : int
The number of workers (GPUs), used to calculate the alpha
parameter for EASGD.

"""
self.beta = config['multi'].pop('beta')
self.config = config
super(NMTController, self).__init__(config['multi']['control_port'])
self.batch_port = config['multi']['batch_port']
@@ -43,11 +48,23 @@ def __init__(self, experiment_id, config):
self.min_valid_cost = numpy.inf

self.valid = False
self._stop = False

signal.signal(signal.SIGTERM, self.stop)
signal.signal(signal.SIGINT, self.stop)

self.experiment_id = experiment_id
ServerLogger(filename='{}.log.jsonl.gz'.format(self.experiment_id),
threaded=True)

self.num_workers = num_workers

def stop(self, signum, frame):
print('Received SIGINT/SIGTERM')
self._stop = True
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)

def start_batch_server(self):
self.p = Process(target=self._send_mb)
self.p.daemon = True
@@ -90,6 +107,9 @@ def handle_control(self, req, worker_id):

if req == 'config':
control_response = self.config
elif req == 'alpha':
tau = self.config['multi']['train_len']
control_response = self.beta / tau / self.num_workers
elif req == 'experiment_id':
control_response = self.experiment_id
elif req == 'next':
@@ -113,7 +133,8 @@ def handle_control(self, req, worker_id):
else:
self.bad_counter += 1

if self.uidx > self.max_mb or self.bad_counter > self.patience:
if (self._stop or self.uidx > self.max_mb or
self.bad_counter > self.patience):
control_response = 'stop'
self.worker_is_done(worker_id)

@@ -124,10 +145,11 @@
# Load the configuration file
with io.open(sys.argv[1]) as f:
config = json.load(f)
num_workers = int(sys.argv[2])
# Create unique experiment ID and backup config file
experiment_id = binascii.hexlify(os.urandom(3)).decode()
shutil.copyfile(sys.argv[1], '{}.config.json'.format(experiment_id))
# Start controller
l = NMTController(experiment_id, config)
l = NMTController(experiment_id, config, num_workers)
l.start_batch_server()
l.serve()
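
As a worked example of the new `'alpha'` reply: with the values added to
`config.json` in this PR (`beta = 0.9`, `train_len = 32`) and a four-worker
run as in the README launch example, each worker gets back roughly 0.007 and
passes it to `EASGD`:

```python
# Values from config.json; the worker count is whatever is passed on the
# command line to nmt_controller.py (4 in the README launch example).
beta = 0.9          # config['multi']['beta']
tau = 32            # config['multi']['train_len']
num_workers = 4

alpha = beta / tau / num_workers
print(alpha)        # 0.00703125, used as EASGD(alpha) on each worker
```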
nmt_worker.py (27 changes: 7 additions, 20 deletions)
@@ -1,6 +1,5 @@
from __future__ import print_function

import copy
import logging
import os
import time
@@ -59,7 +58,8 @@ def train(worker, model_options, data_options,

LOGGER.info('Initializing parameters')
tparams = init_tparams(params)
worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(0.5))
alpha = worker.send_req('alpha')
worker.init_shared_params(tparams.values(), param_sync_rule=EASGD(alpha))

# use_noise is for dropout
trng, use_noise, \
@@ -153,6 +153,7 @@ def train(worker, model_options, data_options,
float(y_mask.sum(0).mean())
log_entry['update_time'] = time.clock() - update_start
log_entry['train_time'] = time.clock() - train_start
log_entry['time'] = time.time()
log.log(log_entry)

step = worker.send_req({'done': train_len})
@@ -167,10 +168,12 @@ def train(worker, model_options, data_options,
valid_err = float(valid_errs.mean())
res = worker.send_req({'valid_err': valid_err})
log.log({'validation_cost': valid_err,
'train_time': time.clock() - train_start})
'train_time': time.clock() - train_start,
'time': time.time()})

if res == 'best':
if res == 'best' and saveto:
best_p = unzip(tparams)
save_params(best_p, model_filename, saveto_filename)

if valid_sync:
worker.copy_to_local()
@@ -181,22 +184,6 @@ def train(worker, model_options, data_options,
# Release all shared resources.
worker.close()

LOGGER.info('Saving')

if best_p is not None:
params = best_p
else:
params = unzip(tparams)

use_noise.set_value(0.)

if saveto:
numpy.savez(saveto, **best_p)
LOGGER.info('model saved')

params = copy.copy(best_p)
save_params(params, model_filename, saveto_filename)


if __name__ == "__main__":
LOGGER.info('Connecting to worker')