From 55abc570e72cfce352c852ac7cc57a334e296b09 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Wed, 15 Feb 2017 14:18:39 -0500 Subject: [PATCH] rebase to pickup test improvements, OSX build improvement, thread task cleanups, leveldb 2.0.34 --- BASHO_RELEASES | 7 + c_src/build_deps.sh | 6 +- c_src/eleveldb.cc | 96 ++------- c_src/workitems.cc | 206 ++++++++++++++++++ c_src/workitems.h | 175 +++------------ src/eleveldb.erl | 502 +++++++++++++++++++++++++++++++------------- test/cacheleak.erl | 82 +++++--- test/cleanup.erl | 224 ++++++++++---------- test/iterators.erl | 181 ++++++++-------- 9 files changed, 874 insertions(+), 605 deletions(-) diff --git a/BASHO_RELEASES b/BASHO_RELEASES index 3fb9ab7f..077012d3 100644 --- a/BASHO_RELEASES +++ b/BASHO_RELEASES @@ -1,3 +1,10 @@ +github.com tag 2.0.34 - February 15, 2017 +----------------------------------------- +mv-hot-backup2: - correct MakeTieredDbname() within db/filename.cc + for case where dbname input is blank and fast/slow + already populated in options. Corrects issue + with hot backup in non-tiered storage situations + github.com tag 2.0.33 - November 21, 2016 ----------------------------------------- mv-bucket-expiry: - partial branch to enable X-Riak-Meta-Expiry-Base-Seconds diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 14ad33b4..336be0b9 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then fi unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well -LEVELDB_VSN="2.0.33" +LEVELDB_VSN="2.0.34" SNAPPY_VSN="1.0.4" @@ -65,9 +65,11 @@ case "$1" in ;; *) + export MACOSX_DEPLOYMENT_TARGET=10.8 + if [ ! -d snappy-$SNAPPY_VSN ]; then tar -xzf snappy-$SNAPPY_VSN.tar.gz - (cd snappy-$SNAPPY_VSN && ./configure --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) + (cd snappy-$SNAPPY_VSN && ./configure --disable-shared --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) fi if [ ! -f system/lib/libsnappy.a ]; then diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index f8ae599f..94e0e4fe 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -612,6 +612,21 @@ ERL_NIF_TERM send_reply(ErlNifEnv *env, ERL_NIF_TERM ref, ERL_NIF_TERM reply) return ATOM_OK; } +// Boilerplate for submitting to the thread queue. +// Takes ownership of the item. assumes allocated through new + +ERL_NIF_TERM +submit_to_thread_queue(eleveldb::WorkTask *work_item, ErlNifEnv* env, ERL_NIF_TERM caller_ref){ + eleveldb_priv_data& data = *static_cast(enif_priv_data(env)); + if(false == data.thread_pool.Submit(work_item)) + { + delete work_item; + return send_reply(env, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); + } // if + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM async_open( ErlNifEnv* env, @@ -666,15 +681,7 @@ async_open( eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; + return submit_to_thread_queue(work_item, env, caller_ref); } // async_open @@ -705,8 +712,6 @@ async_write( if(NULL == db_ptr->m_Db) return send_reply(env, caller_ref, error_einval(env)); - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - // Construct a write batch: leveldb::WriteBatch* batch = new leveldb::WriteBatch; @@ -728,16 +733,7 @@ async_write( eleveldb::WorkTask* work_item = new eleveldb::WriteTask(env, caller_ref, db_ptr, batch, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - // work_item contains "batch" and the delete below gets both memory allocations - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if - - return eleveldb::ATOM_OK; + return submit_to_thread_queue(work_item, env, caller_ref); } @@ -771,17 +767,7 @@ async_get( eleveldb::WorkTask *work_item = new eleveldb::GetTask(env, caller_ref, db_ptr, key_ref, opts); - - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if - - return eleveldb::ATOM_OK; + return submit_to_thread_queue(work_item, env, caller_ref); } // async_get @@ -818,18 +804,7 @@ async_iterator( eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref, db_ptr, keys_only, opts); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if - - return ATOM_OK; - + return submit_to_thread_queue(work_item, env, caller_ref); } // async_iterator @@ -1050,15 +1025,8 @@ async_close( { eleveldb::WorkTask *work_item = new eleveldb::CloseTask(env, caller_ref, db_ptr); + return submit_to_thread_queue(work_item, env, caller_ref); - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if } // if else if (!term_ok) { @@ -1098,15 +1066,7 @@ async_iterator_close( { eleveldb::WorkTask *work_item = new eleveldb::ItrCloseTask(env, caller_ref, itr_ptr); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if + return submit_to_thread_queue(work_item, env, caller_ref); } // if // this close/cleanup call is way late ... bad programmer! @@ -1136,25 +1096,15 @@ async_destroy( ERL_NIF_TERM caller_ref = argv[0]; - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - leveldb::Options *opts = new leveldb::Options; fold(env, argv[2], parse_open_option, *opts); eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; - + return submit_to_thread_queue(work_item, env, caller_ref); } // async_destroy + } // namespace eleveldb diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 0966ef08..911a45cd 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -178,12 +178,141 @@ OpenTask::DoWork() } // OpenTask::DoWork() +/** + * WriteTask functions + */ + +WriteTask::WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + leveldb::WriteBatch* _batch, + leveldb::WriteOptions* _options) + : WorkTask(_owner_env, _caller_ref, _db_handle), + batch(_batch), + options(_options) +{} + +WriteTask::~WriteTask() +{ + delete batch; + delete options; +} +work_result +WriteTask::DoWork() +{ + leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + + return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); +} + +/** + * GetTask functions + */ + +GetTask::GetTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + ERL_NIF_TERM _key_term, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + options(_options) +{ + ErlNifBinary key; + + enif_inspect_binary(_caller_env, _key_term, &key); + m_Key.assign((const char *)key.data, key.size); +} + +GetTask::~GetTask() {} + +work_result +GetTask::DoWork() +{ + ERL_NIF_TERM value_bin; + BinaryValue value(local_env(), value_bin); + leveldb::Slice key_slice(m_Key); + + leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); + + if(!status.ok()){ + if ( status.IsNotFound() ) + return work_result(ATOM_NOT_FOUND); + else + return work_result(local_env(), ATOM_ERROR, status); + } + + return work_result(local_env(), ATOM_OK, value_bin); +} + +/** + * IterTask functions + */ + +IterTask::IterTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + const bool _keys_only, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + keys_only(_keys_only), options(_options) +{} + +IterTask::~IterTask() {} + +work_result +IterTask::DoWork() +{ + ItrObject * itr_ptr=0; + void * itr_ptr_ptr=0; + + // NOTE: transferring ownership of options to ItrObject + itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr, keys_only, options); + + // Copy caller_ref to reuse in future iterator_move calls + itr_ptr=((ItrObjErlang*)itr_ptr_ptr)->m_ItrPtr; + itr_ptr->itr_ref_env = enif_alloc_env(); + itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + + ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); + + // release reference created during CreateItrObject() + enif_release_resource(itr_ptr_ptr); + + return work_result(local_env(), ATOM_OK, result); +} // operator() /** * MoveTask functions */ +// Constructor with no seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +// Constructor with seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action, + std::string& _seek_target) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action), + seek_target(_seek_target) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +MoveTask::~MoveTask() {}; + work_result MoveTask::DoWork() { @@ -359,6 +488,83 @@ MoveTask::recycle() } // MoveTask::recycle +/** + * CloseTask functions + */ + +CloseTask::CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle) + : WorkTask(_owner_env, _caller_ref, _db_handle) +{} + +CloseTask::~CloseTask() +{ +} + +work_result +CloseTask::DoWork() +{ + DbObject * db_ptr; + + // get db pointer then clear reference count to it + db_ptr=m_DbPtr.get(); + m_DbPtr.assign(NULL); + + if (NULL!=db_ptr) + { + // set closing flag, this is blocking + db_ptr->InitiateCloseRequest(); + + // db_ptr no longer valid + db_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + +/** + * ItrCloseTask functions + */ + +ItrCloseTask::ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & _itr_handle) + : WorkTask(_owner_env, _caller_ref), + m_ItrPtr(_itr_handle) +{} + +ItrCloseTask::~ItrCloseTask() +{ +} + +work_result +ItrCloseTask::DoWork() +{ + ItrObject * itr_ptr; + + // get iterator pointer then clear reference count to it + itr_ptr=m_ItrPtr.get(); + m_ItrPtr.assign(NULL); + + if (NULL!=itr_ptr) + { + // set closing flag, this is blocking + itr_ptr->InitiateCloseRequest(); + + // itr_ptr no longer valid + itr_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + /** * DestroyTask functions */ diff --git a/c_src/workitems.h b/c_src/workitems.h index e984709a..475b7512 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -132,32 +132,23 @@ class WriteTask : public WorkTask { protected: leveldb::WriteBatch* batch; - leveldb::WriteOptions* options; + leveldb::WriteOptions* options; public: - WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, DbObjectPtr_t & _db_handle, leveldb::WriteBatch* _batch, - leveldb::WriteOptions* _options) - : WorkTask(_owner_env, _caller_ref, _db_handle), - batch(_batch), - options(_options) - {} + leveldb::WriteOptions* _options); - virtual ~WriteTask() - { - delete batch; - delete options; - } + virtual ~WriteTask(); protected: - virtual work_result DoWork() - { - leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + virtual work_result DoWork(); - return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); - } +private: + WriteTask(); + WriteTask(const WriteTask &); + WriteTask & operator=(const WriteTask &); }; // class WriteTask @@ -209,34 +200,11 @@ class GetTask : public WorkTask ERL_NIF_TERM _caller_ref, DbObjectPtr_t & _db_handle, ERL_NIF_TERM _key_term, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - options(_options) - { - ErlNifBinary key; + leveldb::ReadOptions &_options); - enif_inspect_binary(_caller_env, _key_term, &key); - m_Key.assign((const char *)key.data, key.size); - } - - virtual ~GetTask() - { - } - -protected: - virtual work_result DoWork() - { - ERL_NIF_TERM value_bin; - BinaryValue value(local_env(), value_bin); - leveldb::Slice key_slice(m_Key); + virtual ~GetTask(); - leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); - - if(!status.ok()) - return work_result(ATOM_NOT_FOUND); - - return work_result(local_env(), ATOM_OK, value_bin); - } + virtual work_result DoWork(); }; // class GetTask @@ -258,36 +226,11 @@ class IterTask : public WorkTask ERL_NIF_TERM _caller_ref, DbObjectPtr_t & _db_handle, const bool _keys_only, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - keys_only(_keys_only), options(_options) - {} - - virtual ~IterTask() - { - } - -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - void * itr_ptr_ptr; - - // NOTE: transfering ownership of options to ItrObject - itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr, keys_only, options); + leveldb::ReadOptions &_options); - // Copy caller_ref to reuse in future iterator_move calls - itr_ptr=((ItrObjErlang*)itr_ptr_ptr)->m_ItrPtr; - itr_ptr->itr_ref_env = enif_alloc_env(); - itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + virtual ~IterTask(); - ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); - - // release reference created during CreateItrObject() - enif_release_resource(itr_ptr_ptr); - - return work_result(local_env(), ATOM_OK, result); - } + virtual work_result DoWork(); }; // class IterTask @@ -308,28 +251,14 @@ class MoveTask : public WorkTask // No seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - ItrObjectPtr_t & Iter, action_t& _action) - : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), - m_Itr(Iter), action(_action) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } + ItrObjectPtr_t & Iter, action_t& _action); // With seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, ItrObjectPtr_t & Iter, action_t& _action, - std::string& _seek_target) - : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), - m_Itr(Iter), action(_action), - seek_target(_seek_target) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } - virtual ~MoveTask() {}; + std::string& _seek_target); + + virtual ~MoveTask(); virtual ErlNifEnv *local_env(); @@ -352,38 +281,11 @@ class CloseTask : public WorkTask public: CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - DbObjectPtr_t & _db_handle) - : WorkTask(_owner_env, _caller_ref, _db_handle) - {} - - virtual ~CloseTask() - { - } - -protected: - virtual work_result DoWork() - { - DbObject * db_ptr; - - // get db pointer then clear reference count to it - db_ptr=m_DbPtr.get(); - m_DbPtr.assign(NULL); - - if (NULL!=db_ptr) - { - // set closing flag, this is blocking - db_ptr->InitiateCloseRequest(); + DbObjectPtr_t & _db_handle); - // db_ptr no longer valid - db_ptr=NULL; + virtual ~CloseTask(); - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class CloseTask @@ -398,41 +300,12 @@ class ItrCloseTask : public WorkTask ReferencePtr m_ItrPtr; public: - ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - ItrObjectPtr_t & _itr_handle) - : WorkTask(_owner_env, _caller_ref), - m_ItrPtr(_itr_handle) - {} + ItrObjectPtr_t & _itr_handle); - virtual ~ItrCloseTask() - { - } + virtual ~ItrCloseTask(); -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - - // get iterator pointer then clear reference count to it - itr_ptr=m_ItrPtr.get(); - m_ItrPtr.assign(NULL); - - if (NULL!=itr_ptr) - { - // set closing flag, this is blocking - itr_ptr->InitiateCloseRequest(); - - // itr_ptr no longer valid - itr_ptr=NULL; - - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class ItrCloseTask diff --git a/src/eleveldb.erl b/src/eleveldb.erl index a21edd6c..d700ce5c 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2012 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2010-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,6 +17,8 @@ %% under the License. %% %% ------------------------------------------------------------------- + +%% @doc Erlang NIF wrapper for LevelDB -module(eleveldb). -export([open/2, @@ -49,14 +49,29 @@ -on_load(init/0). -ifdef(TEST). --compile(export_all). +-export([ + assert_close/1, + assert_open/1, + assert_open/2, + assert_open_small/1, + create_test_dir/0, + delete_test_dir/1, + terminal_format/2 +]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). --endif. +-define(QC_OUT(P), eqc:on_output(fun terminal_format/2, P)). +-endif. % EQC -include_lib("eunit/include/eunit.hrl"). --endif. + +%% Maximum number of distinct database instances to create in any test. +%% The highest runtime limit used is the lower of this value or +%% ((num-schedulers x 4) + 1). +%% This limit is driven by filesystem size constraints on builders - MvM's +%% trials have shown this value to work on the majority of builders the +%% majority of the time. +-define(MAX_TEST_OPEN, 21). +-endif. % TEST %% This cannot be a separate function. Code must be inline to trigger %% Erlang compiler's use of optimized selective receive. @@ -370,7 +385,12 @@ do_fold(Itr, Fun, Acc0, Opts) -> true = is_binary(Start) or (Start == first), fold_loop(iterator_move(Itr, Start), Itr, Fun, Acc0) after - iterator_close(Itr) + %% This clause shouldn't change the operation's result. + %% If the iterator has been invalidated by it or the db being closed, + %% the try clause above will raise an exception, and that's the one we + %% want to propagate. Catch the exception this raises in that case and + %% ignore it so we don't obscure the original. + catch iterator_close(Itr) end. fold_loop({error, iterator_closed}, _Itr, _Fun, Acc0) -> @@ -395,105 +415,280 @@ validate_type(_, _) -> false. %% =================================================================== -%% EUnit tests +%% Tests %% =================================================================== -ifdef(TEST). -open_test() -> [{open_test_Z(), l} || l <- lists:seq(1, 20)]. -open_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.open.test"), - {ok, Ref} = open("/tmp/eleveldb.open.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - {ok, <<"123">>} = ?MODULE:get(Ref, <<"abc">>, []), - not_found = ?MODULE:get(Ref, <<"def">>, []). - -fold_test() -> [{fold_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [{<<"abc">>, <<"123">>}, - {<<"def">>, <<"456">>}, - {<<"hij">>, <<"789">>}] = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])). - -fold_keys_test() -> [{fold_keys_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_keys_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.keys.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"abc">>, <<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [])). - -fold_from_key_test() -> [{fold_from_key_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_from_key_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.fromkeys.test"), - {ok, Ref} = open("/tmp/eleveldb.fromfold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [{first_key, <<"d">>}])). - -destroy_test() -> [{destroy_test_Z(), l} || l <- lists:seq(1, 20)]. -destroy_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.destroy.test"), - {ok, Ref} = open("/tmp/eleveldb.destroy.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - {ok, <<"456">>} = ?MODULE:get(Ref, <<"def">>, []), - close(Ref), - ok = ?MODULE:destroy("/tmp/eleveldb.destroy.test", []), - {error, {db_open, _}} = open("/tmp/eleveldb.destroy.test", [{error_if_exists, true}]). - -compression_test() -> [{compression_test_Z(), l} || l <- lists:seq(1, 20)]. -compression_test_Z() -> - CompressibleData = list_to_binary([0 || _X <- lists:seq(1,20)]), - os:cmd("rm -rf /tmp/eleveldb.compress.0 /tmp/eleveldb.compress.1"), - {ok, Ref0} = open("/tmp/eleveldb.compress.0", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, false}]), - [ok = ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - {ok, Ref1} = open("/tmp/eleveldb.compress.1", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, true}]), - [ok = ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - %% Check both of the LOG files created to see if the compression option was correctly - %% passed down - MatchCompressOption = - fun(File, Expected) -> - {ok, Contents} = file:read_file(File), - case re:run(Contents, "Options.compression: " ++ Expected) of - {match, _} -> match; - nomatch -> nomatch - end - end, - Log0Option = MatchCompressOption("/tmp/eleveldb.compress.0/LOG", "0"), - Log1Option = MatchCompressOption("/tmp/eleveldb.compress.1/LOG", "1"), - ?assert(Log0Option =:= match andalso Log1Option =:= match). - - -close_test() -> [{close_test_Z(), l} || l <- lists:seq(1, 20)]. -close_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close.test"), - {ok, Ref} = open("/tmp/eleveldb.close.test", [{create_if_missing, true}]), - ?assertEqual(ok, close(Ref)), - ?assertEqual({error, einval}, close(Ref)). - -close_fold_test() -> [{close_fold_test_Z(), l} || l <- lists:seq(1, 20)]. -close_fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close_fold.test"), - {ok, Ref} = open("/tmp/eleveldb.close_fold.test", [{create_if_missing, true}]), - ok = eleveldb:put(Ref, <<"k">>,<<"v">>,[]), - ?assertException(throw, {iterator_closed, ok}, % ok is returned by close as the acc - eleveldb:fold(Ref, fun(_,_A) -> eleveldb:close(Ref) end, undefined, [])). +%% =================================================================== +%% Exported Test Helpers +%% =================================================================== + +-spec assert_close(DbRef :: db_ref()) -> ok | no_return(). +%% +%% Closes DbRef inside an ?assert... macro. +%% +assert_close(DbRef) -> + ?assertEqual(ok, ?MODULE:close(DbRef)). + +-spec assert_open(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, creating the database directory if needed. +%% +assert_open(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}]). + +-spec assert_open(DbPath :: string(), OpenOpts :: open_options()) + -> db_ref() | no_return(). +%% +%% Opens DbPath, with OpenOpts, inside an ?assert... macro. +%% +assert_open(DbPath, OpenOpts) -> + OpenRet = ?MODULE:open(DbPath, OpenOpts), + ?assertMatch({ok, _}, OpenRet), + {_, DbRef} = OpenRet, + DbRef. + +-spec assert_open_small(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, using a limited storage footprint +%% and creating the database directory if needed. +%% +assert_open_small(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}, {limited_developer_mem, true}]). + +-spec create_test_dir() -> string() | no_return(). +%% +%% Creates a new, empty, uniquely-named directory for testing and returns +%% its full path. This operation *should* never fail, but would raise an +%% ?assert...-ish exception if it did. +%% +create_test_dir() -> + string:strip(?cmd("mktemp -d /tmp/" ?MODULE_STRING ".XXXXXXX"), both, $\n). + +-spec delete_test_dir(Dir :: string()) -> ok | no_return(). +%% +%% Deletes a test directory fully, whether or not it exists. +%% This operation *should* never fail, but would raise an ?assert...-ish +%% exception if it did. +%% +delete_test_dir(Dir) -> + ?assertCmd("rm -rf " ++ Dir). + +-spec terminal_format(Fmt :: io:format(), Args :: list()) -> ok. +%% +%% Writes directly to the terminal, bypassing EUnit hooks. +%% +terminal_format(Fmt, Args) -> + io:format(user, Fmt, Args). + +%% =================================================================== +%% EUnit Tests +%% =================================================================== + +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). +-define(max_test_open(Calc), erlang:min(?MAX_TEST_OPEN, Calc)). + +eleveldb_test_() -> + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + ?local_test(test_open), + ?local_test(test_close), + ?local_test(test_destroy), + ?local_test(test_fold), + ?local_test(test_fold_keys), + ?local_test(test_fold_from_key), + ?local_test(test_close_fold), + % On weak machines the following can take a while, so we tweak + % them a bit to avoid timeouts. On anything resembling a competent + % computer, these should complete in a small fraction of a second, + % but on some lightweight VMs used for validation, that can be + % extended by orders of magnitude. + ?local_test(15, test_compression), + fun(TestRoot) -> + TestName = "test_open_many", + TestDir = filename:join(TestRoot, TestName), + Count = ?max_test_open(erlang:system_info(schedulers) * 4 + 1), + Title = lists:flatten(io_lib:format("~s(~b)", [TestName, Count])), + {Title, {timeout, 30, fun() -> test_open_many(TestDir, Count) end}} + end + ] + }. + +%% fold accumulator used in a few tests +accumulate(Val, Acc) -> + [Val | Acc]. + +%% +%% Individual tests +%% + +test_open(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual({ok, <<"123">>}, ?MODULE:get(Ref, <<"abc">>, [])), + ?assertEqual(not_found, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref). + +test_open_many(TestDir, HowMany) -> + Insts = lists:seq(1, HowMany), + KNonce = erlang:make_ref(), + VNonce = erlang:self(), + WorkSet = [ + begin + D = lists:flatten(io_lib:format("~s.~b", [TestDir, N])), + T = os:timestamp(), + K = erlang:phash2([T, N, KNonce], 1 bsl 32), + V = erlang:phash2([N, T, VNonce], 1 bsl 32), + {assert_open_small(D), + <>, <>} + end || N <- Insts], + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, WorkSet), + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, WorkSet), + lists:foreach(fun assert_close/1, [R || {R, _, _} <- WorkSet]). + +test_close(TestDir) -> + Ref = assert_open(TestDir, [{create_if_missing, true}]), + assert_close(Ref), + ?assertError(badarg, ?MODULE:close(Ref)). + +test_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [{<<"abc">>, <<"123">>}, {<<"def">>, <<"456">>}, {<<"hij">>, <<"789">>}], + lists:reverse(?MODULE:fold(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_keys(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [<<"abc">>, <<"def">>, <<"hij">>], + lists:reverse(?MODULE:fold_keys(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_from_key(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual([<<"def">>, <<"hij">>], lists:reverse( + ?MODULE:fold_keys(Ref, fun accumulate/2, [], [{first_key, <<"d">>}]))), + assert_close(Ref). + +test_destroy(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual({ok, <<"456">>}, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref), + ?assertEqual(ok, ?MODULE:destroy(TestDir, [])), + ?assertMatch({error, {db_open, _}}, ?MODULE:open(TestDir, [{error_if_exists, true}])). + +test_compression(TestDir) -> + IntSeq = lists:seq(1, 10), + CompressibleData = list_to_binary(lists:duplicate(20, 0)), + + Ref0 = assert_open(TestDir ++ ".0", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, false}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + Ref1 = assert_open(TestDir ++ ".1", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, true}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + %% Check both of the LOG files created to see if the compression option was + %% passed down correctly + lists:foreach( + fun(Val) -> + File = filename:join(TestDir ++ [$. | Val], "LOG"), + RRet = file:read_file(File), + ?assertMatch({ok, _}, RRet), + {_, Data} = RRet, + Pattern = "Options.compression: " ++ Val, + ?assertMatch({match, _}, re:run(Data, Pattern)) + end, ["0", "1"]), + assert_close(Ref0), + assert_close(Ref1). + +test_close_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"k">>,<<"v">>,[])), + ?assertError(badarg, + ?MODULE:fold(Ref, fun(_,_) -> assert_close(Ref) end, undefined, [])). + +%% +%% Parallel tests +%% + +parallel_test_() -> + ParaCnt = ?max_test_open(erlang:system_info(schedulers) * 2 + 1), + LoadCnt = 99, + TestSeq = lists:seq(1, ParaCnt), + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [fun(TestRoot) -> + {inparallel, [begin + T = lists:flatten(io_lib:format("load proc ~b", [N])), + D = filename:join(TestRoot, io_lib:format("parallel_test.~b", [N])), + S = lists:seq(N, (N + LoadCnt - 1)), + {T, fun() -> run_load(D, S) end} + end || N <- TestSeq]} + end] + }. + +run_load(TestDir, IntSeq) -> + KNonce = [os:timestamp(), erlang:self()], + Ref = assert_open_small(TestDir), + VNonce = [erlang:make_ref(), os:timestamp()], + KVIn = [ + begin + K = erlang:phash2([N | KNonce], 1 bsl 32), + V = erlang:phash2([N | VNonce], 1 bsl 32), + {<>, <>} + end || N <- IntSeq], + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, KVIn), + {L, R} = lists:split(erlang:hd(IntSeq), KVIn), + KVOut = R ++ L, + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, KVOut), + assert_close(Ref). + +%% =================================================================== +%% QuickCheck Tests +%% =================================================================== -ifdef(EQC). @@ -512,56 +707,81 @@ ops(Keys, Values) -> apply_kv_ops([], _Ref, Acc0) -> Acc0; apply_kv_ops([{put, K, V} | Rest], Ref, Acc0) -> - ok = eleveldb:put(Ref, K, V, []), + ?assertEqual(ok, ?MODULE:put(Ref, K, V, [])), apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); apply_kv_ops([{async_put, K, V} | Rest], Ref, Acc0) -> MyRef = make_ref(), Context = {my_context, MyRef}, - ok = eleveldb:async_put(Ref, Context, K, V, []), + ?assertEqual(ok, ?MODULE:async_put(Ref, Context, K, V, [])), receive {Context, ok} -> apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); Msg -> - error({unexpected_msg, Msg}) + erlang:error({unexpected_msg, Msg}) end; apply_kv_ops([{delete, K, _} | Rest], Ref, Acc0) -> - ok = eleveldb:delete(Ref, K, []), + ?assertEqual(ok, ?MODULE:delete(Ref, K, [])), apply_kv_ops(Rest, Ref, orddict:store(K, deleted, Acc0)). -prop_put_delete() -> +prop_put_delete(TestDir) -> ?LET({Keys, Values}, {keys(), values()}, - ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), - begin - ?cmd("rm -rf /tmp/eleveldb.putdelete.qc"), - {ok, Ref} = eleveldb:open("/tmp/eleveldb.putdelete.qc", - [{create_if_missing, true}]), - Model = apply_kv_ops(Ops, Ref, []), - - %% Valdiate that all deleted values return not_found - F = fun({K, deleted}) -> - ?assertEqual(not_found, eleveldb:get(Ref, K, [])); - ({K, V}) -> - ?assertEqual({ok, V}, eleveldb:get(Ref, K, [])) - end, - lists:map(F, Model), - - %% Validate that a fold returns sorted values - Actual = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])), - ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], - Actual), - ok = eleveldb:close(Ref), - true - end)). + ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), + begin + delete_test_dir(TestDir), + Ref = assert_open(TestDir, [{create_if_missing, true}]), + Model = apply_kv_ops(Ops, Ref, []), + + %% Validate that all deleted values return not_found + lists:foreach( + fun({K, deleted}) -> + ?assertEqual(not_found, ?MODULE:get(Ref, K, [])); + ({K, V}) -> + ?assertEqual({ok, V}, ?MODULE:get(Ref, K, [])) + end, Model), + + %% Validate that a fold returns sorted values + Actual = lists:reverse( + ?MODULE:fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, [], [])), + ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], Actual), + assert_close(Ref), + true + end)). prop_put_delete_test_() -> Timeout1 = 10, Timeout2 = 15, - %% We use the ?ALWAYS(300, ...) wrapper around the second test as a - %% regression test. - [{timeout, 3*Timeout1, {"No ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout1,prop_put_delete())) end}}, - {timeout, 10*Timeout2, {"With ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout2,?ALWAYS(150,prop_put_delete()))) end}}]. - --endif. - --endif. + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout1, + OuterTO = (InnerTO * 3), + Title = "Without ?ALWAYS()", + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, prop_put_delete(TestDir))) + end, + {timeout, OuterTO, {Title, TestFun}} + end, + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout2, + OuterTO = (InnerTO * 10), + AwCount = (InnerTO * 9), + %% We use the ?ALWAYS(AwCount, ...) wrapper as a regression test. + %% It's not clear how this is effectively different than the first + %% fixture, but I'm leaving it here in case I'm missing something. + Title = lists:flatten(io_lib:format("With ?ALWAYS(~b)", [AwCount])), + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, + ?ALWAYS(AwCount, prop_put_delete(TestDir)))) + end, + {timeout, OuterTO, {Title, TestFun}} + end + ] + }. + +-endif. % EQC + +-endif. % TEST diff --git a/test/cacheleak.erl b/test/cacheleak.erl index 19b3bce1..7b24675a 100644 --- a/test/cacheleak.erl +++ b/test/cacheleak.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2012-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,60 +17,80 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(cacheleak). --compile(export_all). +-module(cacheleak). +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-define(KV_PAIRS, (1000 * 10)). +-define(VAL_SIZE, (1024 * 10)). +-define(MAX_RSS, (1000 * 500)). % driven by ?KV_PAIRS and ?VAL_SIZE ? + +-define(TEST_LOOPS, 10). +-define(TIMEOUT, (?TEST_LOOPS * 60)). + cacheleak_test_() -> - {timeout, 10*60, fun() -> - [] = os:cmd("rm -rf /tmp/eleveldb.cacheleak.test"), - Blobs = [{<>, compressible_bytes(10240)} || - I <- lists:seq(1, 10000)], - cacheleak_loop(10, Blobs, 500000) - end}. + TestRoot = eleveldb:create_test_dir(), + TestDir = filename:join(TestRoot, ?MODULE), + {setup, + fun() -> TestRoot end, + fun eleveldb:delete_test_dir/1, + {timeout, ?TIMEOUT, fun() -> + Bytes = compressible_bytes(?VAL_SIZE), + Blobs = [{<>, Bytes} || I <- lists:seq(1, ?KV_PAIRS)], + eleveldb:terminal_format("RSS limit: ~b\n", [?MAX_RSS]), + cacheleak_loop(0, Blobs, ?MAX_RSS, TestDir) + end}}. %% It's very important for this test that the data is compressible. Otherwise, -%% the file will be mmaped, and nothing will fill up the cache. +%% the file will be mmapped, and nothing will fill up the cache. compressible_bytes(Count) -> - list_to_binary([0 || _I <- lists:seq(1, Count)]). + erlang:list_to_binary(lists:duplicate(Count, 0)). -cacheleak_loop(0, _Blobs, _MaxFinalRSS) -> - ok; -cacheleak_loop(Count, Blobs, MaxFinalRSS) -> +cacheleak_loop(Count, Blobs, MaxFinalRSS, TestDir) when Count < ?TEST_LOOPS -> %% We spawn a process to open a LevelDB instance and do a series of %% reads/writes to fill up the cache. When the process exits, the LevelDB %% ref will get GC'd and we can re-evaluate the memory footprint of the %% process to make sure everything got cleaned up as expected. F = fun() -> - - {ok, Ref} = eleveldb:open("/tmp/eleveldb.cacheleak.test", - [{create_if_missing, true}, - {limited_developer_mem, true}]), - [ok = eleveldb:put(Ref, I, B, []) || {I, B} <- Blobs], - eleveldb:fold(Ref, fun({_K, _V}, A) -> A end, [], [{fill_cache, true}]), - [{ok, B} = eleveldb:get(Ref, I, []) || {I, B} <- Blobs], - ok = eleveldb:close(Ref), - erlang:garbage_collect(), - io:format(user, "RSS1: ~p\n", [rssmem()]) - end, - {_Pid, Mref} = spawn_monitor(F), + Ref = eleveldb:assert_open_small(TestDir), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, eleveldb:put(Ref, Key, Val, [])) + end, Blobs), + ?assertEqual([], eleveldb:fold(Ref, + fun({_K, _V}, A) -> A end, [], [{fill_cache, true}])), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, eleveldb:get(Ref, Key, [])) + end, Blobs), + eleveldb:assert_close(Ref), + erlang:garbage_collect(), + eleveldb:terminal_format("RSS ~2b: ~p\n", [Count, rssmem()]) + end, + {_Pid, Mon} = erlang:spawn_monitor(F), receive - {'DOWN', Mref, process, _, _} -> + {'DOWN', Mon, process, _, _} -> ok end, RSS = rssmem(), ?assert(MaxFinalRSS > RSS), - cacheleak_loop(Count-1, Blobs, MaxFinalRSS). + cacheleak_loop((Count + 1), Blobs, MaxFinalRSS, TestDir); + +cacheleak_loop(_Count, _Blobs, _MaxFinalRSS, _TestDir) -> + ok. rssmem() -> Cmd = io_lib:format("ps -o rss= -p ~s", [os:getpid()]), - S = string:strip(os:cmd(Cmd), both), + % Don't try to use eunit's ?cmd macro here, it won't do the right thing. + S = string:strip(os:cmd(Cmd), left), % only matters that the 1st character is $0-$9 case string:to_integer(S) of {error, _} -> - io:format(user, "Error parsing integer in: ~s\n", [S]), + eleveldb:terminal_format("Error parsing integer in: ~s\n", [S]), error; {I, _} -> I end. + +-endif. % TEST diff --git a/test/cleanup.erl b/test/cleanup.erl index 9a1c6af3..abb3f8b4 100644 --- a/test/cleanup.erl +++ b/test/cleanup.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -20,148 +18,141 @@ %% %% ------------------------------------------------------------------- -%% Test various scenarios that properly and improperly close LevelDB DB/iterator -%% handles and ensure everything cleans up properly. - +%% Test various scenarios that properly and improperly close LevelDB +%% DB/iterator handles and ensure everything cleans up properly. -module(cleanup). --compile(export_all). - +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). --define(COMMON_INSTANCE_DIR, "/tmp/eleveldb.cleanup.test"). +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). + +cleanup_test_() -> + {foreach, + fun eleveldb:create_test_dir/0, + fun eleveldb:delete_test_dir/1, + [ + ?local_test(test_open_twice), + ?local_test(test_open_close), + ?local_test(test_open_exit), + ?local_test(test_iterator), + ?local_test(15, test_iterator_db_close), + ?local_test(15, test_iterator_exit) + ] + }. %% Purposely reopen an already opened database to test failure assumption -assumption_test() -> - DB = open(), - try - io:format(user, "assumption_test: top\n", []), - ok = failed_open(), - io:format(user, "assumption_test: bottom\n", []), - ok - after - eleveldb:close(DB), - timer:sleep(500) - end. +test_open_twice(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertMatch({error, {db_open, _}}, + eleveldb:open(TestDir, [{create_if_missing, true}])), + eleveldb:assert_close(DB). %% Open/close -open_close_test() -> - DB = open(), - eleveldb:close(DB), - check(). +test_open_close(TestDir) -> + check_open_close(TestDir), + check_open_close(TestDir). %% Open w/o close -open_exit_test() -> - spawn_wait(fun() -> - _DB = open() - end), - timer:sleep(500), - check(). +test_open_exit(TestDir) -> + spawn_wait(fun() -> eleveldb:assert_open(TestDir) end), + check_open_close(TestDir). %% Iterator open/close -iterator_test() -> - DB = open(), - try - write(100, DB), - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr), - eleveldb:iterator_close(Itr), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + ItrRet = eleveldb:iterator(DB, []), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + ?assertEqual(ok, iterate(Itr)), + ?assertEqual(ok, eleveldb:iterator_close(Itr)), + eleveldb:assert_close(DB), + check_open_close(TestDir). %% Close DB while iterator running %% Expected: reopen should fail while iterator reference alive %% however, iterator should fail after DB is closed %% once iterator process exits, open should succeed -iterator_db_close_test() -> - DB = open(), - try - write(100, DB), - Parent = self(), - spawn_monitor(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - Parent ! continue, - try - iterate(Itr, 10) - catch - error:badarg -> - ok - end, - try - eleveldb:iterator_close(Itr) - catch - error:badarg -> - ok - end - end), - receive continue -> ok end, - eleveldb:close(DB), - %%failed_open(), - wait_down(), - erlang:garbage_collect(), - timer:sleep(500), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_db_close(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + Parent = self(), + {Pid, Mon} = Proc = erlang:spawn_monitor( + fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + Parent ! continue, + try + iterate(Itr, 10) + catch + error:badarg -> + ok + end, + try + eleveldb:iterator_close(Itr) + catch + error:badarg -> + ok + end + end), + ?assertEqual(ok, receive + continue -> + ok; + {'DOWN', Mon, process, Pid, Info} -> + Info + end), + eleveldb:assert_close(DB), + ?assertEqual(ok, wait_down(Proc)), + check_open_close(TestDir). %% Iterate open, iterator process exit w/o close -iterator_exit_test() -> - DB = open(), - try - write(100, DB), - spawn_wait(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr) - end), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_exit(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + spawn_wait(fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + iterate(Itr) + end), + eleveldb:assert_close(DB), + check_open_close(TestDir). spawn_wait(F) -> - spawn_monitor(F), - wait_down(). + wait_down(erlang:spawn_monitor(F)). -wait_down() -> - receive {'DOWN', _, process, _, _} -> +wait_down({Pid, Mon}) when erlang:is_pid(Pid) andalso erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, Pid, _} -> + ok + end; +wait_down(Mon) when erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, _, _} -> + ok + end; +wait_down(Pid) when erlang:is_pid(Pid) -> + receive + {'DOWN', _, process, Pid, _} -> ok end. -check() -> - timer:sleep(500), - DB = open(), - eleveldb:close(DB), - timer:sleep(500), - ok. - -open() -> - {ok, Ref} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - Ref. - -failed_open() -> - {error, {db_open, _}} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - ok. +check_open_close(TestDir) -> + eleveldb:assert_close(eleveldb:assert_open(TestDir)). write(N, DB) -> write(0, N, DB). write(Same, Same, _DB) -> ok; write(N, End, DB) -> - eleveldb:put(DB, <>, <>, []), - write(N+1, End, DB). + KV = <>, + ?assertEqual(ok, eleveldb:put(DB, KV, KV, [])), + write((N + 1), End, DB). iterate(Itr) -> iterate(Itr, 0). @@ -174,5 +165,6 @@ do_iterate({ok, K, _V}, {Itr, Expected, Delay}) -> <> = K, ?assertEqual(Expected, N), (Delay == 0) orelse timer:sleep(Delay), - do_iterate(eleveldb:iterator_move(Itr, next), - {Itr, Expected + 1, Delay}). + do_iterate(eleveldb:iterator_move(Itr, next), {Itr, (Expected + 1), Delay}). + +-endif. % TEST diff --git a/test/iterators.erl b/test/iterators.erl index 438f77b4..ce303308 100644 --- a/test/iterators.erl +++ b/test/iterators.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,128 +17,131 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(iterators). --compile(export_all). +-module(iterators). -ifdef(TEST). - -include_lib("eunit/include/eunit.hrl"). iterator_test_() -> - {spawn, - [{setup, - fun setup/0, - fun cleanup/1, - fun(Ref) -> - [ - prev_test_case(Ref), - seek_and_next_test_case(Ref), - basic_prefetch_test_case(Ref), - seek_and_prefetch_test_case(Ref), - aae_prefetch1(Ref), - aae_prefetch2(Ref), - aae_prefetch3(Ref) - ] - end}] - }. + {spawn, [ + {setup, + fun setup/0, + fun cleanup/1, + fun({_, Ref}) -> [ + prev_test_case(Ref), + seek_and_next_test_case(Ref), + basic_prefetch_test_case(Ref), + seek_and_prefetch_test_case(Ref), + aae_prefetch1(Ref), + aae_prefetch2(Ref), + aae_prefetch3(Ref) + ] end + }]}. setup() -> - os:cmd("rm -rf ltest"), % NOTE - {ok, Ref} = eleveldb:open("ltest", [{create_if_missing, true}]), - eleveldb:put(Ref, <<"a">>, <<"w">>, []), - eleveldb:put(Ref, <<"b">>, <<"x">>, []), - eleveldb:put(Ref, <<"c">>, <<"y">>, []), - eleveldb:put(Ref, <<"d">>, <<"z">>, []), - Ref. - -cleanup(Ref) -> - eleveldb:close(Ref). + Dir = eleveldb:create_test_dir(), + Ref = eleveldb:assert_open(Dir), + ?assertEqual(ok, eleveldb:put(Ref, <<"a">>, <<"w">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"b">>, <<"x">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"c">>, <<"y">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"d">>, <<"z">>, [])), + {Dir, Ref}. + +cleanup({Dir, Ref}) -> + eleveldb:assert_close(Ref), + eleveldb:delete_test_dir(Dir). + +assert_iterator(DbRef, ItrOpts) -> + ItrRet = eleveldb:iterator(DbRef, ItrOpts), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + Itr. prev_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, prev)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, prev)) end. seek_and_next_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)) end. basic_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)) end. seek_and_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)) end. aae_prefetch1(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch2(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch3(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)) end. --endif. +-endif. % TEST