Skip to content

Commit

Permalink
Merge pull request #55 from bouthilx/feature/db/index
Browse files Browse the repository at this point in the history
Add indexation to databases and fix race condition for experiment configuration
  • Loading branch information
tsirif authored Mar 21, 2018
2 parents e2e1098 + c60a82e commit dff43f1
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 112 deletions.
49 changes: 43 additions & 6 deletions src/metaopt/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import logging

from metaopt.core import resolve_config
from metaopt.core.io.database import Database
from metaopt.core.io.database import Database, DuplicateKeyError
from metaopt.core.worker import workon
from metaopt.core.worker.experiment import Experiment

Expand Down Expand Up @@ -43,7 +43,8 @@ def infer_experiment():
expconfig = resolve_config.fetch_default_options()

# Fetch mopt system variables (database and resource information)
# See :const:`metaopt.core.io.resolve_config.ENV_VARS` for environmental variables used
# See :const:`metaopt.core.io.resolve_config.ENV_VARS` for environmental
# variables used
expconfig = resolve_config.merge_env_vars(expconfig)

# Initialize singleton database object
Expand All @@ -58,9 +59,37 @@ def infer_experiment():
exp_name = tmpconfig['name']
if exp_name is None:
raise RuntimeError("Could not infer experiment's name. "
"Please use either `name` cmd line arg or provide one "
"in metaopt's configuration file.")
"Please use either `name` cmd line arg or provide "
"one in metaopt's configuration file.")

experiment = create_experiment(exp_name, expconfig, cmdconfig, cmdargs)

return experiment


def create_experiment(exp_name, expconfig, cmdconfig, cmdargs):
"""Create an experiment based on configuration.
Configuration is a combination of command line, experiment configuration
file, experiment configuration in database and metaopt configuration files.
Precedence of configurations is:
`cmdargs` > `cmdconfig` > `dbconfig` > `expconfig`
This means `expconfig` values would be overwritten by `dbconfig` and so on.
Parameters
----------
exp_name: str
Name of the experiment
expconfig: dict
Configuration coming from default configuration files.
cmdconfig: dict
Configuration coming from configuration file.
cmdargs: dict
Configuration coming from command line arguments.
"""
# Initialize experiment object.
# Check for existing name and fetch configuration.
experiment = Experiment(exp_name)
Expand All @@ -77,8 +106,16 @@ def infer_experiment():
expconfig.pop('resources', None)
expconfig.pop('status', None)

# Finish experiment's configuration
experiment.configure(expconfig)
# Finish experiment's configuration and write it to database.
try:
experiment.configure(expconfig)
except DuplicateKeyError:
# Fails if concurrent experiment with identical (name, metadata.user)
# is written first in the database.
# Next infer_experiment() should either load experiment from database
# and run smoothly if identical or trigger an experiment fork.
# In other words, there should not be more than 1 level of recursion.
experiment = create_experiment(exp_name, expconfig, cmdconfig, cmdargs)

return experiment

Expand Down
54 changes: 49 additions & 5 deletions src/metaopt/core/io/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""
from abc import abstractmethod, abstractproperty
import logging

from metaopt.core.utils import (AbstractSingletonType, SingletonFactory)

Expand All @@ -39,11 +38,12 @@ class AbstractDB(object, metaclass=AbstractSingletonType):
"""

ASCENDING = 0
DESCENDING = 1

def __init__(self, host='localhost', name=None,
port=None, username=None, password=None):
"""Init method, see attributes of :class:`AbstractDB`."""
self._logger = logging.getLogger(__name__)

self.host = host
self.name = name
self.port = port
Expand Down Expand Up @@ -74,8 +74,34 @@ def close_connection(self):
pass

@abstractmethod
def write(self, collection_name, data,
query=None):
def ensure_index(self, collection_name, keys, unique=False):
    """Create given indexes if they do not already exist in database.

    Parameters
    ----------
    collection_name : str
        A collection inside database, a table.
    keys: str or list of tuples
        Can be a string representing a key to index, or a list of tuples
        with the structure `[(key_name, sort_order)]`. `key_name` must be a
        string and sort_order can be either `AbstractDB.ASCENDING` or
        `AbstractDB.DESCENDING`.
    unique: bool, optional
        Ensure each document has a different key value. If not, operations
        like `write()` and `read_and_write()` will raise
        `DuplicateKeyError`.
        Defaults to False.

    .. note::
        Depending on the backend, the indexing operation might operate in
        background. This means some operations on the database might occur
        before the indexes are totally built.

    """
    pass

@abstractmethod
def write(self, collection_name, data, query=None):
"""Write new information to a collection. Perform insert or update.
Parameters
Expand All @@ -97,6 +123,11 @@ def write(self, collection_name, data,
In the case of an update operation, if `query` fails to find a
document that matches, insert of `data` will be performed instead.
:raises :exc:`DuplicateKeyError`: if the operation is creating duplicate
keys in two different documents. Only occurs if the keys have
unique indexes. See :meth:`AbstractDB.ensure_index` for more
information about indexes.
"""
pass

Expand Down Expand Up @@ -139,6 +170,11 @@ def read_and_write(self, collection_name, query, data, selection=None):
:return: updated first matched document or None if nothing found
:raises :exc:`DuplicateKeyError`: if the operation is creating duplicate
keys in two different documents. Only occurs if the keys have
unique indexes. See :meth:`AbstractDB.ensure_index` for more
information about indexes.
"""
pass

Expand Down Expand Up @@ -181,6 +217,14 @@ class DatabaseError(RuntimeError):
pass


class DuplicateKeyError(DatabaseError):
    """Raised when a write would store a key already present in a unique index."""

    pass


# pylint: disable=too-few-public-methods,abstract-method
class Database(AbstractDB, metaclass=SingletonFactory):
"""Class used to inject dependency on a database framework.
Expand Down
116 changes: 96 additions & 20 deletions src/metaopt/core/io/database/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,57 @@
:synopsis: Implement :class:`metaopt.core.io.database.AbstractDB` for MongoDB.
"""
import functools

import pymongo

from metaopt.core.io.database import (AbstractDB, DatabaseError)
from metaopt.core.io.database import (
AbstractDB, DatabaseError, DuplicateKeyError)


AUTH_FAILED_MESSAGES = [
"auth failed",
"Authentication failed."]

DUPLICATE_KEY_MESSAGES = [
"duplicate key error"]


def mongodb_exception_wrapper(method):
    """Convert pymongo exceptions to generic exception types defined in src.core.io.database.

    Current exception types converted:
        pymongo.errors.DuplicateKeyError -> DuplicateKeyError
        pymongo.errors.BulkWriteError[DUPLICATE_KEY_MESSAGES] -> DuplicateKeyError
        pymongo.errors.ConnectionFailure -> DatabaseError
        pymongo.errors.OperationFailure(AUTH_FAILED_MESSAGES) -> DatabaseError
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except pymongo.errors.DuplicateKeyError as exc:
            # NOTE: must be caught before OperationFailure — pymongo's
            # DuplicateKeyError is a subclass of OperationFailure.
            raise DuplicateKeyError(str(exc)) from exc
        except pymongo.errors.BulkWriteError as exc:
            # A bulk write reports per-document errors; surface the first
            # duplicate-key error as our generic DuplicateKeyError.
            for write_error in exc.details['writeErrors']:
                message = write_error["errmsg"]
                if any(pattern in message for pattern in DUPLICATE_KEY_MESSAGES):
                    raise DuplicateKeyError(message) from exc
            raise
        except pymongo.errors.ConnectionFailure as exc:
            raise DatabaseError("Connection Failure: database not found on "
                                "specified uri") from exc
        except pymongo.errors.OperationFailure as exc:
            if any(pattern in str(exc) for pattern in AUTH_FAILED_MESSAGES):
                raise DatabaseError("Authentication Failure: bad credentials") from exc
            raise

    return wrapper


class MongoDB(AbstractDB):
Expand All @@ -29,6 +77,7 @@ class MongoDB(AbstractDB):
"""

@mongodb_exception_wrapper
def initiate_connection(self):
"""Connect to database, unless MongoDB `is_connected`.
Expand All @@ -40,23 +89,13 @@ def initiate_connection(self):

self._sanitize_attrs()

try:
self._conn = pymongo.MongoClient(host=self.host,
port=self.port,
username=self.username,
password=self.password,
authSource=self.name)
self._db = self._conn[self.name]
self._db.command('ismaster') # .. seealso:: :meth:`is_connected`
except pymongo.errors.ConnectionFailure as e:
self._logger.error("Could not connect to host, %s:%s",
self.host, self.port)
raise DatabaseError("Connection Failure: database not found on "
"specified uri") from e
except pymongo.errors.OperationFailure as e:
self._logger.error("Could not verify user, %s, on database, %s",
self.username, self.name)
raise DatabaseError("Authentication Failure: bad credentials") from e
self._conn = pymongo.MongoClient(host=self.host,
port=self.port,
username=self.username,
password=self.password,
authSource=self.name)
self._db = self._conn[self.name]
self._db.command('ismaster') # .. seealso:: :meth:`is_connected`

@property
def is_connected(self):
Expand All @@ -82,8 +121,44 @@ def close_connection(self):
"""
self._conn.close()

def write(self, collection_name, data,
query=None):
def ensure_index(self, collection_name, keys, unique=False):
    """Create given indexes if they do not already exist in database.

    .. seealso:: :meth:`AbstractDB.ensure_index` for argument documentation.

    """
    # MongoDB's `create_index()` is idempotent: asking for an index that is
    # already present is a no-op, so no existence check is required here.
    mongo_keys = self._convert_index_keys(keys)
    self._db[collection_name].create_index(mongo_keys,
                                           unique=unique,
                                           background=True)

def _convert_index_keys(self, keys):
    """Return `keys` as a list of MongoDB `(key, direction)` pairs."""
    if not isinstance(keys, (list, tuple)):
        # A bare string means a single ascending index on that key.
        keys = [(keys, self.ASCENDING)]

    return [(name, self._convert_sort_order(order)) for name, order in keys]

def _convert_sort_order(self, sort_order):
    """Map a generic `AbstractDB` sort order onto the pymongo equivalent."""
    # Identity comparison mirrors the original contract: only the exact
    # AbstractDB constants are accepted.
    if sort_order is self.ASCENDING:
        return pymongo.ASCENDING
    if sort_order is self.DESCENDING:
        return pymongo.DESCENDING

    raise RuntimeError("Invalid database sort order %s" % str(sort_order))

@mongodb_exception_wrapper
def write(self, collection_name, data, query=None):
"""Write new information to a collection. Perform insert or update.
.. seealso:: :meth:`AbstractDB.write` for argument documentation.
Expand Down Expand Up @@ -119,6 +194,7 @@ def read(self, collection_name, query=None, selection=None):

return dbdocs

@mongodb_exception_wrapper
def read_and_write(self, collection_name, query, data, selection=None):
"""Read a collection's document and update the found document.
Expand Down
27 changes: 23 additions & 4 deletions src/metaopt/core/worker/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, name):
log.debug("Creating Experiment object with name: %s", name)
self._init_done = False
self._db = Database() # fetch database instance
self._setup_db() # build indexes for collections

self._id = None
self.name = name
Expand Down Expand Up @@ -134,6 +135,19 @@ def __init__(self, name):

self._last_fetched = self.metadata['datetime']

def _setup_db(self):
    """Create the database indexes used by experiment and trial queries."""
    # (name, metadata.user) identifies an experiment; the unique index makes
    # a concurrent duplicate insert fail with DuplicateKeyError instead of
    # silently writing a second experiment (see `configure()`).
    self._db.ensure_index('experiments',
                          [('name', Database.ASCENDING),
                           ('metadata.user', Database.ASCENDING)],
                          unique=True)
    self._db.ensure_index('experiments', 'status')

    # Non-unique indexes to speed up trial lookup and reservation.
    self._db.ensure_index('trials', 'experiment')
    self._db.ensure_index('trials', 'status')
    self._db.ensure_index('trials', 'results')
    self._db.ensure_index('trials', 'start_time')
    self._db.ensure_index('trials', [('end_time', Database.DESCENDING)])

def reserve_trial(self, score_handle=None):
"""Find *new* trials that exist currently in database and select one of
them based on the highest score return from `score_handle` callable.
Expand Down Expand Up @@ -324,15 +338,20 @@ def configure(self, config):

# If everything is alright, push new config to database
if is_new:
# TODO: No need for read_and_write here, because unique indexes
# will make sure no experiments will be added with identical
# names. That means, this need refactoring once support for
# additional indexes is added to database.
# This will raise DuplicateKeyError if a concurrent experiment with
# identical (name, metadata.user) is written first in the database.

self._db.write('experiments', final_config)
# XXX: Reminder for future DB implementations:
# MongoDB, updates an inserted dict with _id, so should you :P
self._id = final_config['_id']
else:
# Writing the final config to an already existing experiment raises
# a DuplicateKeyError because of the embedded id `metadata.user`.
# To avoid this, `final_config["name"]` is popped out before
# `db.write()`, thus seemingly breaking the compound index
# `(name, metadata.user)`
final_config.pop("name")
self._db.write('experiments', final_config, {'_id': self._id})

@property
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/core/experiment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
suspend: False
done: False

- name: supernaedo2
- name: supernaedo3

metadata:
user: tsirif
Expand Down
Loading

0 comments on commit dff43f1

Please sign in to comment.