This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Cluster #42

Closed · wants to merge 4 commits
17 changes: 10 additions & 7 deletions config.json
@@ -14,7 +14,8 @@
         "train_len": 32,
         "control_port": 5567,
         "batch_port": 5568,
-        "beta": 0.9
+        "beta": 0.9,
+        "log_port": 5569
     },
     "management": {
         "sample_freq": 1000,
@@ -24,16 +24,18 @@
         "save_freq": 5000
     },
     "data": {
-        "src": "/data/lisatmp4/vanmerb/wmt16/wmt16.de-en.tok.true.clean.shuf.en",
-        "src_vocab": "/data/lisatmp4/vanmerb/wmt16/wmt16.de-en.vocab.en",
-        "trg": "/data/lisatmp4/vanmerb/wmt16/wmt16.de-en.tok.true.clean.shuf.de",
-        "valid_src": "/data/lisatmp4/vanmerb/wmt16/dev/newstest2013.tok.true.en",
-        "trg_vocab": "/data/lisatmp4/vanmerb/wmt16/wmt16.de-en.vocab.de",
+        "src": "train/wmt16.de-en.tok.true.clean.shuf.en",
+        "src_vocab": "train/wmt16.de-en.vocab.en",
+        "trg": "train/wmt16.de-en.tok.true.clean.shuf.de",
+        "valid_src": "dev/newstest2013.tok.true.en",
+        "trg_vocab": "train/wmt16.de-en.vocab.de",
         "batch_size": 64,
         "n_words": 30000,
         "valid_batch_size": 64,
         "n_words_src": 30000,
-        "valid_trg": "/data/lisatmp4/vanmerb/wmt16/dev/newstest2013.tok.true.de"
+        "valid_trg": "dev/newstest2013.tok.true.de",
+        "max_src_length": 50,
+        "max_trg_length": 50
     },
     "model": {
         "dim": 1024,
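
The data paths are now relative instead of hard-coded /data/lisatmp4/... locations; nmt.pbs (below) prefixes each of them with ${RAP}nmt/ via jq before training starts. A minimal Python sketch of that same resolution step, with DATA_ROOT as a hypothetical stand-in for the cluster-specific ${RAP}nmt prefix:

    import json
    import os

    DATA_ROOT = '/path/to/nmt'  # hypothetical stand-in for ${RAP}nmt

    with open('config.json') as f:
        config = json.load(f)

    # Prefix each relative data path, mirroring the jq filter
    # ".data.<key> |= \"${RAP}nmt/\" + ." in nmt.pbs
    for key in ('src', 'trg', 'src_vocab', 'trg_vocab',
                'valid_src', 'valid_trg'):
        config['data'][key] = os.path.join(DATA_ROOT, config['data'][key])
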
19 changes: 15 additions & 4 deletions data_iterator.py
@@ -1,3 +1,4 @@
+import io
 from itertools import count

 import numpy
@@ -6,7 +7,7 @@
 from fuel.transformers import Merge
 from fuel.schemes import ConstantScheme
 from fuel.transformers import (Batch, Cache, Mapping, SortMapping, Padding,
-                               Transformer)
+                               Filter, Transformer)


 EOS_TOKEN = '<EOS>'  # 0
@@ -55,7 +56,7 @@ def _source_length(sentence_pair):
 def load_dict(filename, n_words=0):
     """Load vocab from TSV with words in last column."""
     dict_ = {EOS_TOKEN: 0, UNK_TOKEN: 1}
-    with open(filename) as f:
+    with io.open(filename) as f:
         if n_words > 0:
             indices = range(len(dict_), n_words)
         else:
@@ -64,8 +65,9 @@ def load_dict(filename, n_words=0):
     return dict_


-def get_stream(source, target, source_dict, target_dict, batch_size=128,
-               buffer_multiplier=100, n_words_source=0, n_words_target=0):
+def get_stream(source, target, source_dict, target_dict, batch_size,
+               buffer_multiplier=100, n_words_source=0, n_words_target=0,
+               max_src_length=None, max_trg_length=None):
     """Returns a stream over sentence pairs.

     Parameters
@@ -108,6 +110,15 @@ def get_stream(source, target, source_dict, target_dict, batch_size=128,
     ]
     merged = Merge(streams, ('source', 'target'))

+    # Filter sentence lengths
+    if max_src_length or max_trg_length:
+        def filter_pair(pair):
+            src, trg = pair
+            src_ok = (not max_src_length) or len(src) < max_src_length
+            trg_ok = (not max_trg_length) or len(trg) < max_trg_length
+            return src_ok and trg_ok
+        merged = Filter(merged, filter_pair)
+
     # Batches of approximately uniform size
     large_batches = Batch(
         merged,
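
Note that the new predicate compares with strict less-than, so "max_src_length": 50 keeps source sentences of at most 49 tokens, and passing None (or 0) for either bound disables that side of the check. The same logic outside of Fuel, as a plain-Python sketch:

    # Standalone version of filter_pair; sentences are lists of token ids
    def keep_pair(src, trg, max_src_length=50, max_trg_length=50):
        src_ok = (not max_src_length) or len(src) < max_src_length
        trg_ok = (not max_trg_length) or len(trg) < max_trg_length
        return src_ok and trg_ok

    pairs = [([1] * 10, [2] * 10), ([1] * 60, [2] * 10)]
    kept = [p for p in pairs if keep_pair(*p)]  # drops the 60-token pair
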
52 changes: 52 additions & 0 deletions environment.yml
@@ -0,0 +1,52 @@
+name: root
+dependencies:
+- cffi=1.2.1=py35_0
+- conda=3.19.1=py35_0
+- conda-env=2.4.5=py35_0
+- cython=0.23.4=py35_0
+- freetype=2.5.5=0
+- h5py=2.5.0=np110py35_4
+- hdf5=1.8.15.1=2
+- jbig=2.1=0
+- jpeg=8d=0
+- libffi=3.0.13=0
+- libgfortran=1.0=0
+- libpng=1.6.17=0
+- libsodium=1.0.3=0
+- libtiff=4.0.6=1
+- mkl=11.3.1=0
+- numexpr=2.4.6=np110py35_1
+- numpy=1.10.4=py35_0
+- openssl=1.0.2f=0
+- pillow=3.1.1=py35_0
+- pip=8.0.2=py35_0
+- pycosat=0.6.1=py35_0
+- pycparser=2.14=py35_0
+- pycrypto=2.6.1=py35_0
+- pytables=3.2.2=np110py35_0
+- python=3.5.1=0
+- pyyaml=3.11=py35_1
+- pyzmq=15.2.0=py35_0
+- readline=6.2=2
+- requests=2.9.1=py35_0
+- scipy=0.17.0=np110py35_1
+- setuptools=19.6.2=py35_0
+- six=1.10.0=py35_0
+- sqlite=3.9.2=0
+- tk=8.5.18=0
+- toolz=0.7.4=py35_0
+- wheel=0.29.0=py35_0
+- xz=5.0.5=1
+- yaml=0.1.6=0
+- zeromq=4.1.3=0
+- zlib=1.2.8=0
+- pip:
+  - fuel (/home/vanmerb/fuel)==0.1.1
+  - mimir (/home/vanmerb/mimir)==0.1.dev1
+  - picklable-itertools (/home/vanmerb/picklable-itertools)==0.1.1
+  - platoon (/home/vanmerb/platoon)==0.5.0
+  - posix-ipc==1.0.0
+  - progressbar2==3.6.0
+  - tables==3.2.2
+  - theano (/home/vanmerb/Theano)==0.8.0.dev0

98 changes: 98 additions & 0 deletions nmt.pbs
@@ -0,0 +1,98 @@
+#!/bin/bash
+#PBS -A jvb-000-ag
+#PBS -l signal=SIGTERM@300
+#PBS -m ae
+
+# Invocation: msub nmt.pbs -F "\"config.json\"" -l nodes=1:gpus=2 -l walltime=1:00:00 -l feature=k80
+
+echo "Using config file $1"
+
+# Kill this job if any of these commands fail
+set -e
+
+cd "${PBS_O_WORKDIR}"
+
+# This should load CUDA as well as cuDNN
+module load cuda/7.5.18 libs/cuDNN/4
+
+# Use own Python installation
+export PATH=$HOME/miniconda3/bin:$PATH
+
+# This is where we store e.g. jq
+export PATH=${RAP}nmt/bin:$PATH
+
+# Use the following Theano settings
+declare -A theano_flags
+theano_flags["floatX"]="float32"
+theano_flags["force_device"]="true"
+theano_flags["base_compiledir"]="/rap/jvb-000-aa/${USER}/theano_compiledir"
+theano_flags["lib.cnmem"]="0.9"
+theano_flags["dnn.enabled"]="True"
+
+function join { local IFS="$1"; shift; echo "$*"; }
+function merge {
+    for i in $(seq 1 $(($# / 2)))
+    do
+        eval "k=\${$i}"
+        eval "v=\${$(($i + $# / 2))}"
+        rval[$i]="$k=$v"
+    done
+    echo $(join , ${rval[@]})
+}
+
+export THEANO_FLAGS=$(merge ${!theano_flags[@]} ${theano_flags[@]})
+echo "THEANO_FLAGS=$THEANO_FLAGS"
+
+# Make sure we pick a port that isn't in use already
+control_port=$(($(((RANDOM<<15)|RANDOM)) % 16383 + 49152))
+batch_port=$(($(((RANDOM<<15)|RANDOM)) % 16383 + 49152))
+log_port=$(($(((RANDOM<<15)|RANDOM)) % 16383 + 49152))
+
+# Try to connect to ports to see if they are taken
+# type nc &>/dev/null
+# until ! (nc -z localhost $control_port || false)
+# do
+#     echo "Trying another control port!"
+#     control_port=$((control_port+1))
+# done
+# until ! (nc -z localhost $batch_port || false) && (( batch_port != control_port ))
+# do
+#     echo "Trying another batch port!"
+#     batch_port=$((batch_port+1))
+# done
+
+# Write ports to config
+_1="$(mktemp)"
+cat "$1" | jq ".multi.control_port |= $control_port | .multi.batch_port |= $batch_port | .multi.log_port |= $log_port" > "$_1"
+
+# Read data from Lustre parallel file system
+echo "Working from ${RAP}nmt"
+files=(src trg src_vocab trg_vocab valid_src valid_trg)
+for file in "${files[@]}"
+do
+    filename="${RAP}nmt/$(cat "$_1" | jq -r ".data.$file")"
+    test -e "$filename" || (echo "$filename doesn't exist" && exit 1)
+    FILTERS[$((${#FILTERS[@]} + 1))]=".data.$file |= \"${RAP}nmt/\" + ."
+done
+_2="$(mktemp)"
+cat "$_1" | jq "$(join '|' "${FILTERS[@]}")" > "$_2"
+
+# Print final config
+cat "$_2" | jq '.'
+
+# The following GPUs are available
+for id in $(nvidia-smi --query-gpu=index --format=csv,noheader)
+do
+    GPUS[${#GPUS[@]}]="gpu$id"
+done
+echo "Using GPUs ${GPUS[*]}"
+
+# Make sure the GPU and cuDNN work
+THEANO_FLAGS=device=${GPUS[0]} python -c "import theano.sandbox.cuda; theano.sandbox.cuda.dnn_available()"
+
+# For some strange reason this is set to C (ANSI_X3.4-1968)
+export LANG=en_US.UTF-8
+
+# Let's run this thing (but keep Theano's cache if things go wrong)
+set +e
+platoon-launcher nmt ${GPUS[@]} -d -c "$_2 ${#GPUS[@]}" -w "$control_port"
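
Two bits of shell here deserve a note. $(((RANDOM<<15)|RANDOM)) packs two draws of bash's 15-bit RANDOM into a 30-bit value, and % 16383 + 49152 maps it into the IANA ephemeral port range (49152-65534); the three ports are drawn independently, so collisions are unlikely but not ruled out (hence the commented-out nc probing). The merge/join pair turns the theano_flags associative array into the comma-separated k=v list Theano expects. A rough Python equivalent of both steps, for illustration only:

    import random

    # Random port in 49152-65534, mirroring
    # $(((RANDOM<<15)|RANDOM)) % 16383 + 49152
    def random_port():
        return random.getrandbits(30) % 16383 + 49152

    # Equivalent of merge/join: flag dict -> "k1=v1,k2=v2,..."
    theano_flags = {'floatX': 'float32', 'force_device': 'true',
                    'lib.cnmem': '0.9', 'dnn.enabled': 'True'}
    THEANO_FLAGS = ','.join('{}={}'.format(k, v)
                            for k, v in sorted(theano_flags.items()))
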
7 changes: 5 additions & 2 deletions nmt_base.py
@@ -25,7 +25,8 @@ def load_data(src, trg,
               valid_src, valid_trg,
               src_vocab, trg_vocab,
               n_words, n_words_src,
-              batch_size, valid_batch_size):
+              batch_size, valid_batch_size,
+              max_src_length, max_trg_length):
     LOGGER.info('Loading data')

     dictionaries = [src_vocab, trg_vocab]
@@ -47,7 +48,9 @@
                               dictionaries[1],
                               n_words_source=n_words_src,
                               n_words_target=n_words,
-                              batch_size=batch_size)
+                              batch_size=batch_size,
+                              max_src_length=max_src_length,
+                              max_trg_length=max_trg_length)
     valid_stream = get_stream([valid_datasets[0]],
                               [valid_datasets[1]],
                               dictionaries[0],
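
Since the "data" section of config.json now carries max_src_length and max_trg_length as well, the whole section can still be splatted straight into load_data, as the controller does. A sketch of the call, assuming config has been loaded from config.json:

    # Both new keyword arguments arrive via the config dict; the first
    # returned element (presumably the dictionaries) is unused at the
    # existing call sites
    _, train_stream, valid_stream = load_data(**config['data'])
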
11 changes: 7 additions & 4 deletions nmt_controller.py
@@ -37,8 +37,8 @@ def __init__(self, experiment_id, config, num_workers):
         """
         self.beta = config['multi'].pop('beta')
         self.config = config
-        super(NMTController, self).__init__(config['multi']['control_port'])
-        self.batch_port = config['multi']['batch_port']
+        super(NMTController, self).__init__(config['multi']['control_port'],
+                                            config['multi']['batch_port'])
         self.patience = config['training']['patience']
         self.max_mb = config['training']['finish_after']

@@ -55,7 +55,7 @@ def __init__(self, experiment_id, config, num_workers):

         self.experiment_id = experiment_id
         ServerLogger(filename='{}.log.jsonl.gz'.format(self.experiment_id),
-                     threaded=True)
+                     threaded=True, port=config['multi']['log_port'])

         self.num_workers = num_workers

@@ -71,12 +71,15 @@ def start_batch_server(self):
         self.p.start()

     def _send_mb(self):
-        self.init_data(self.batch_port)
+        LOGGER.debug('Loading training data stream')
         _, train_stream, _ = load_data(**self.config['data'])

         while True:
+            LOGGER.debug('Start new epoch sending batches')
             for x, x_mask, y, y_mask in train_stream.get_epoch_iterator():
+                LOGGER.debug('Sending batch')
                 self.send_mb([x.T, x_mask.T, y.T, y_mask.T])
+                LOGGER.debug('Sent batch')

     def handle_control(self, req, worker_id):
         """
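
With the batch port handed to super().__init__, the platoon base class presumably owns the batch socket now, which is why the explicit self.init_data(self.batch_port) call disappears from _send_mb. The underlying pattern is a push socket streaming time-major (transposed) arrays to the workers; a minimal pyzmq sketch of that pattern, not platoon's actual implementation:

    import numpy
    import zmq

    # Illustrative PUSH socket only; platoon wraps its own batch transport
    context = zmq.Context()
    sender = context.socket(zmq.PUSH)
    sender.bind('tcp://*:5568')  # batch_port from config.json

    x = numpy.zeros((64, 32), dtype='int64')    # (batch, time)
    x_mask = numpy.ones_like(x, dtype='float32')
    sender.send_pyobj([x.T, x_mask.T])          # time-major, as in send_mb
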
8 changes: 6 additions & 2 deletions nmt_worker.py
@@ -2,6 +2,7 @@

 import logging
 import os
+import sys
 import time

 import numpy
@@ -40,12 +41,14 @@ def train(worker, model_options, data_options,
           sample_freq,  # generate some samples after every sampleFreq
           control_port,
           batch_port,
+          log_port,
           reload_):

     LOGGER.info('Connecting to data socket and loading validation data')
     worker.init_mb_sock(batch_port)
     _, _, valid_stream = load_data(**data_options)

+    worker.start_compilation()
     LOGGER.info('Building model')
     params = init_params(model_options)
     # reload parameters
@@ -119,10 +122,11 @@ def train(worker, model_options, data_options,
     LOGGER.info('Building optimizers')
     f_grad_shared, f_update = getattr(optimizers, optimizer)(lr, tparams,
                                                              grads, inps, cost)
+    worker.finish_compilation()

     LOGGER.info('Optimization')

-    log = RemoteLogger()
+    log = RemoteLogger(port=log_port)
     train_start = time.clock()
     best_p = None

@@ -187,7 +191,7 @@

 if __name__ == "__main__":
     LOGGER.info('Connecting to worker')
-    worker = Worker(control_port=5567)
+    worker = Worker(int(sys.argv[1]))
     LOGGER.info('Retrieving configuration')
     config = worker.send_req('config')
     train(worker, config['model'], config['data'],
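
The worker previously hard-coded control_port=5567; it now takes the port as its first command-line argument, matching the -w "$control_port" that nmt.pbs passes to platoon-launcher (which presumably forwards it to each worker process). A slightly more defensive entry point could use argparse; this variant is an assumption, not what the PR ships:

    import argparse

    # Hypothetical hardened entry point; the PR itself reads sys.argv[1]
    parser = argparse.ArgumentParser(description='NMT platoon worker')
    parser.add_argument('control_port', type=int,
                        help='control port forwarded by platoon-launcher')
    args = parser.parse_args()
    worker = Worker(args.control_port)
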