Skip to content

Commit

Permalink
rebase to pickup test improvements, OSX build improvement, thread tas…
Browse files Browse the repository at this point in the history
…k cleanups, leveldb 2.0.34
  • Loading branch information
Matthew V committed Feb 15, 2017
1 parent d7f51bd commit 55abc57
Show file tree
Hide file tree
Showing 9 changed files with 874 additions and 605 deletions.
7 changes: 7 additions & 0 deletions BASHO_RELEASES
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
github.com tag 2.0.34 - February 15, 2017
-----------------------------------------
mv-hot-backup2: - correct MakeTieredDbname() within db/filename.cc
for case where dbname input is blank and fast/slow
already populated in options. Corrects issue
with hot backup in non-tiered storage situations

github.com tag 2.0.33 - November 21, 2016
-----------------------------------------
mv-bucket-expiry: - partial branch to enable X-Riak-Meta-Expiry-Base-Seconds
Expand Down
6 changes: 4 additions & 2 deletions c_src/build_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well

LEVELDB_VSN="2.0.33"
LEVELDB_VSN="2.0.34"

SNAPPY_VSN="1.0.4"

Expand Down Expand Up @@ -65,9 +65,11 @@ case "$1" in
;;

*)
export MACOSX_DEPLOYMENT_TARGET=10.8

if [ ! -d snappy-$SNAPPY_VSN ]; then
tar -xzf snappy-$SNAPPY_VSN.tar.gz
(cd snappy-$SNAPPY_VSN && ./configure --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic)
(cd snappy-$SNAPPY_VSN && ./configure --disable-shared --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic)
fi

if [ ! -f system/lib/libsnappy.a ]; then
Expand Down
96 changes: 23 additions & 73 deletions c_src/eleveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,21 @@ ERL_NIF_TERM send_reply(ErlNifEnv *env, ERL_NIF_TERM ref, ERL_NIF_TERM reply)
return ATOM_OK;
}

// Boilerplate for submitting to the thread queue.
// Takes ownership of the item. assumes allocated through new

ERL_NIF_TERM
submit_to_thread_queue(eleveldb::WorkTask *work_item, ErlNifEnv* env, ERL_NIF_TERM caller_ref){
eleveldb_priv_data& data = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));
if(false == data.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref,
enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref));
} // if
return eleveldb::ATOM_OK;
}

ERL_NIF_TERM
async_open(
ErlNifEnv* env,
Expand Down Expand Up @@ -666,15 +681,7 @@ async_open(

eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref,
db_name, opts);

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref,
enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref));
}

return eleveldb::ATOM_OK;
return submit_to_thread_queue(work_item, env, caller_ref);

} // async_open

Expand Down Expand Up @@ -705,8 +712,6 @@ async_write(
if(NULL == db_ptr->m_Db)
return send_reply(env, caller_ref, error_einval(env));

eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

// Construct a write batch:
leveldb::WriteBatch* batch = new leveldb::WriteBatch;

Expand All @@ -728,16 +733,7 @@ async_write(

eleveldb::WorkTask* work_item = new eleveldb::WriteTask(env, caller_ref,
db_ptr, batch, opts);

if(false == priv.thread_pool.Submit(work_item))
{
// work_item contains "batch" and the delete below gets both memory allocations
delete work_item;
return send_reply(env, caller_ref,
enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref));
} // if

return eleveldb::ATOM_OK;
return submit_to_thread_queue(work_item, env, caller_ref);
}


Expand Down Expand Up @@ -771,17 +767,7 @@ async_get(

eleveldb::WorkTask *work_item = new eleveldb::GetTask(env, caller_ref,
db_ptr, key_ref, opts);

eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref,
enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref));
} // if

return eleveldb::ATOM_OK;
return submit_to_thread_queue(work_item, env, caller_ref);

} // async_get

Expand Down Expand Up @@ -818,18 +804,7 @@ async_iterator(

eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref,
db_ptr, keys_only, opts);

// Now-boilerplate setup (we'll consolidate this pattern soon, I hope):
eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref));
} // if

return ATOM_OK;

return submit_to_thread_queue(work_item, env, caller_ref);
} // async_iterator


Expand Down Expand Up @@ -1050,15 +1025,8 @@ async_close(
{
eleveldb::WorkTask *work_item = new eleveldb::CloseTask(env, caller_ref,
db_ptr);
return submit_to_thread_queue(work_item, env, caller_ref);

// Now-boilerplate setup (we'll consolidate this pattern soon, I hope):
eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref));
} // if
} // if
else if (!term_ok)
{
Expand Down Expand Up @@ -1098,15 +1066,7 @@ async_iterator_close(
{
eleveldb::WorkTask *work_item = new eleveldb::ItrCloseTask(env, caller_ref,
itr_ptr);

// Now-boilerplate setup (we'll consolidate this pattern soon, I hope):
eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref));
} // if
return submit_to_thread_queue(work_item, env, caller_ref);
} // if

// this close/cleanup call is way late ... bad programmer!
Expand Down Expand Up @@ -1136,25 +1096,15 @@ async_destroy(

ERL_NIF_TERM caller_ref = argv[0];

eleveldb_priv_data& priv = *static_cast<eleveldb_priv_data *>(enif_priv_data(env));

leveldb::Options *opts = new leveldb::Options;
fold(env, argv[2], parse_open_option, *opts);

eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref,
db_name, opts);

if(false == priv.thread_pool.Submit(work_item))
{
delete work_item;
return send_reply(env, caller_ref,
enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref));
}

return eleveldb::ATOM_OK;

return submit_to_thread_queue(work_item, env, caller_ref);
} // async_destroy


} // namespace eleveldb


Expand Down
Loading

0 comments on commit 55abc57

Please sign in to comment.