Skip to content

Commit

Permalink
Implement preemptive map tag acquisition.
Browse files Browse the repository at this point in the history
Clean up kv_store example.
  • Loading branch information
Jace A Mogill committed Feb 16, 2016
1 parent 975ef54 commit a112a3f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 49 deletions.
64 changes: 31 additions & 33 deletions Examples/kv_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,56 +65,52 @@ function handleRequest(request, response) {
var key = Object.keys(parsed_request.query)[0];
var value = parsed_request.query[key];
if (value === '') { value = undefined; }

console.log("Request #" + requestN + ", slave process " + myID + ": key(" + key + ") val(" + value + ")");
var data;
var opname;

if (!key) {
response.end("ERROR: No key specified in request");
return;
}

if (parsed_request.pathname.indexOf("/readFE") >= 0) {
var data = persistent_data.readFE(key);
console.log("do_readFE(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.readFE(key);
opname = 'readFE';
} else if (parsed_request.pathname.indexOf("/readFF") >= 0) {
var data = persistent_data.readFF(key);
console.log("do_readFF(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.readFF(key);
opname = 'readFF';
} else if (parsed_request.pathname.indexOf("/read") >= 0) {
var data = persistent_data.read(key);
console.log("do_read(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.read(key);
opname = 'read';
} else if (parsed_request.pathname.indexOf("/writeXE") >= 0) {
var data = persistent_data.writeXE(key, value);
console.log("do_writeXE(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.writeXE(key, value);
opname = 'writeXE';
} else if (parsed_request.pathname.indexOf("/writeXF") >= 0) {
var data = persistent_data.writeXF(key, value);
console.log("do_writeXF(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.writeXF(key, value);
opname = 'writeXF';
} else if (parsed_request.pathname.indexOf("/writeEF") >= 0) {
var data = persistent_data.writeEF(key, value);
console.log("do_writeEF(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.writeEF(key, value);
opname = 'writeEF';
} else if (parsed_request.pathname.indexOf("/write") >= 0) {
data = persistent_data.write(key, value);
opname = 'write';
} else if (parsed_request.pathname.indexOf("/faa") >= 0) {
var data = persistent_data.faa(key, value);
console.log("do_faa(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.faa(key, value);
opname = 'faa';
} else if (parsed_request.pathname.indexOf("/cas") >= 0) {
opname = 'cas';
var old_new_vals = value.split(',');
if (old_new_vals[0] === '') { old_new_vals[0] = undefined; }
if (old_new_vals[1] === '') { old_new_vals[1] = undefined; }
var data = persistent_data.cas(key, old_new_vals[0], old_new_vals[1]);
console.log("do_cas(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
} else if (parsed_request.pathname.indexOf("/write") >= 0) {
var data = persistent_data.write(key, value);
console.log("do_write(" + key + ")=|" + data + "|");
response.end(JSON.stringify(data));
data = persistent_data.cas(key, old_new_vals[0], old_new_vals[1]);
} else {
response.end("ERROR: No EMS command specified.");
data = "ERROR: No EMS command specified.";
}

var datastr = JSON.stringify(data);
console.log("Request #" + requestN + ", slave process " + myID + ": Op(" +
opname + ") key(" + key + ") val(" + value + ") data=" + datastr);
response.end(datastr);
}


Expand All @@ -129,8 +125,10 @@ if (cluster.isMaster) {
// Seed the GUID generator with a unique starting value
shared_counters.writeXF('GUID', Math.floor(Math.random() * 10000000));

// All the one-time initialization is complete, now start slave processes
for (var i = 0; i < 8; i++) {
// All the one-time initialization is complete, now start slave processes.
// The number of processes is the limit of the number of pending requests
// before deadlock occurs.
for (var procnum = 0; procnum < 8; procnum++) {
cluster.fork();
}

Expand Down
23 changes: 18 additions & 5 deletions ems.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ static void EMSarrFinalize(char *data, void *hint) {
// Wait until the FE tag is a particular state, then transition it to the new state
// Return new tag state
//
unsigned char EMStransitionFEtag(EMStag_t volatile *tag, unsigned char oldFE, unsigned char newFE, unsigned char oldType) {
unsigned char EMStransitionFEtag(EMStag_t volatile *tag, EMStag_t volatile *mapTag,
unsigned char oldFE, unsigned char newFE, unsigned char oldType) {
RESET_NAP_TIME;
EMStag_t oldTag; // Desired tag value to start of the transition
EMStag_t newTag; // Tag value at the end of the transition
Expand All @@ -130,7 +131,10 @@ unsigned char EMStransitionFEtag(EMStag_t volatile *tag, unsigned char oldFE, un
if (memTag.byte == oldTag.byte) {
return (newTag.byte);
} else {
// Allow preemptive map acquisition while waiting for data
if (mapTag) { mapTag->tags.fe = EMS_TAG_FULL; }
NANOSLEEP;
if (mapTag) { EMStransitionFEtag(mapTag, NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY); }
memTag.byte = tag->byte; // Re-load tag in case was transitioned by another thread
}
}
Expand Down Expand Up @@ -213,7 +217,7 @@ int64_t EMSreadIndexMap(const Nan::FunctionCallbackInfo<v8::Value>& info) {
while (nTries < MAX_OPEN_HASH_STEPS && !matched && !notPresent) {
idx = idx % bufInt64[EMScbData(EMS_ARR_NELEM)];
// Wait until the map key is FULL, mark it busy while map lookup is performed
mapTags.byte = EMStransitionFEtag(&bufTags[EMSmapTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
mapTags.byte = EMStransitionFEtag(&bufTags[EMSmapTag(idx)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
if (mapTags.tags.type == idxType) {
switch (idxType) {
case EMS_TYPE_BOOLEAN:
Expand Down Expand Up @@ -335,7 +339,7 @@ int64_t EMSwriteIndexMap(const Nan::FunctionCallbackInfo<v8::Value>& info) {
while (nTries < MAX_OPEN_HASH_STEPS && !matched) {
idx = idx % bufInt64[EMScbData(EMS_ARR_NELEM)];
// Wait until the map key is FULL, mark it busy while map lookup is performed
mapTags.byte = EMStransitionFEtag(&bufTags[EMSmapTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
mapTags.byte = EMStransitionFEtag(&bufTags[EMSmapTag(idx)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
mapTags.tags.fe = EMS_TAG_FULL; // When written back, mark FULL
if (mapTags.tags.type == idxType || mapTags.tags.type == EMS_TYPE_UNDEFINED) {
switch (mapTags.tags.type) {
Expand Down Expand Up @@ -545,8 +549,13 @@ void EMSreadUsingTags(const Nan::FunctionCallbackInfo<v8::Value>& info, // Index
} else {
// Tag was already marked BUSY, must retry
}
// CAS failed or memory wasn't in initial state, wait and retry
// CAS failed or memory wasn't in initial state, wait and retry.
// Permit preemptive map acquisition while waiting for data.
if (EMSisMapped) { bufTags[EMSmapTag(idx)].tags.fe = EMS_TAG_FULL; }
NANOSLEEP;
if (EMSisMapped) {
EMStransitionFEtag(&bufTags[EMSmapTag(idx)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
}
}
}

Expand Down Expand Up @@ -669,7 +678,11 @@ void EMSwriteUsingTags(const Nan::FunctionCallbackInfo<v8::Value>& info, // Ind

// Wait for the memory to be in the initial F/E state and transition to Busy
if (initialFE != EMS_TAG_ANY) {
EMStransitionFEtag(&bufTags[EMSdataTag(idx)], initialFE, EMS_TAG_BUSY, EMS_TAG_ANY);
volatile EMStag_t *maptag;
if (EMSisMapped) { maptag = &bufTags[EMSmapTag(idx)]; }
else { maptag = NULL; }
EMStransitionFEtag(&bufTags[EMSdataTag(idx)], maptag,
initialFE, EMS_TAG_BUSY, EMS_TAG_ANY);
}

while (true) {
Expand Down
2 changes: 1 addition & 1 deletion ems.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void EMSenqueue(const Nan::FunctionCallbackInfo<v8::Value> &info);
void EMSdequeue(const Nan::FunctionCallbackInfo<v8::Value> &info);
void EMSloopInit(const Nan::FunctionCallbackInfo<v8::Value>& info);
void EMSloopChunk(const Nan::FunctionCallbackInfo<v8::Value>& info);
unsigned char EMStransitionFEtag(EMStag_t volatile *tag, unsigned char oldFE, unsigned char newFE, unsigned char oldType);
unsigned char EMStransitionFEtag(EMStag_t volatile *tag, EMStag_t volatile *mapTag, unsigned char oldFE, unsigned char newFE, unsigned char oldType);
int64_t EMSwriteIndexMap(const Nan::FunctionCallbackInfo<v8::Value>& info);
int64_t EMSreadIndexMap(const Nan::FunctionCallbackInfo<v8::Value>& info);
void EMSindex2key(const Nan::FunctionCallbackInfo<v8::Value> &info);
Expand Down
14 changes: 7 additions & 7 deletions primitives.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void EMSpush(const Nan::FunctionCallbackInfo<v8::Value> &info) {
}

// Wait until the stack top is full, then mark it busy while updating the stack
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
int32_t idx = bufInt64[EMScbData(EMS_ARR_STACKTOP)]; // TODO BUG: Truncating the full 64b range
bufInt64[EMScbData(EMS_ARR_STACKTOP)]++;
if (idx == bufInt64[EMScbData(EMS_ARR_NELEM)] - 1) {
Expand All @@ -57,7 +57,7 @@ void EMSpush(const Nan::FunctionCallbackInfo<v8::Value> &info) {
}

// Wait until the target memory at the top of the stack is empty
newTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], EMS_TAG_EMPTY, EMS_TAG_BUSY, EMS_TAG_ANY);
newTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], NULL, EMS_TAG_EMPTY, EMS_TAG_BUSY, EMS_TAG_ANY);
newTag.tags.rw = 0;
newTag.tags.type = EMSv8toEMStype(info[0], stringIsJSON);
newTag.tags.fe = EMS_TAG_FULL;
Expand Down Expand Up @@ -114,7 +114,7 @@ void EMSpop(const Nan::FunctionCallbackInfo<v8::Value> &info) {
EMStag_t dataTag;

// Wait until the stack pointer is full and mark it empty while pop is performed
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
bufInt64[EMScbData(EMS_ARR_STACKTOP)]--;
int64_t idx = bufInt64[EMScbData(EMS_ARR_STACKTOP)];
if (idx < 0) {
Expand All @@ -127,7 +127,7 @@ void EMSpop(const Nan::FunctionCallbackInfo<v8::Value> &info) {

// Wait until the data pointed to by the stack pointer is full, then mark it
// busy while it is copied, and set it to EMPTY when finished
dataTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
dataTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
switch (dataTag.tags.type) {
case EMS_TYPE_BOOLEAN: {
bool retBool = bufInt64[EMSdataData(idx)];
Expand Down Expand Up @@ -199,7 +199,7 @@ void EMSenqueue(const Nan::FunctionCallbackInfo<v8::Value> &info) {
}

// Wait until the heap top is full, and mark it busy while data is enqueued
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_STACKTOP)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
int32_t idx = bufInt64[EMScbData(EMS_ARR_STACKTOP)] % bufInt64[EMScbData(EMS_ARR_NELEM)]; // TODO: BUG This could be trucated
bufInt64[EMScbData(EMS_ARR_STACKTOP)]++;
if (bufInt64[EMScbData(EMS_ARR_STACKTOP)] - bufInt64[EMScbData(EMS_ARR_Q_BOTTOM)] >
Expand Down Expand Up @@ -260,7 +260,7 @@ void EMSdequeue(const Nan::FunctionCallbackInfo<v8::Value> &info) {
EMStag_t dataTag;

// Wait for bottom of heap pointer to be full, and mark it busy while data is dequeued
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_Q_BOTTOM)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
EMStransitionFEtag(&bufTags[EMScbTag(EMS_ARR_Q_BOTTOM)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
int64_t idx = bufInt64[EMScbData(EMS_ARR_Q_BOTTOM)] % bufInt64[EMScbData(EMS_ARR_NELEM)];
// If Queue is empty, return undefined
if (bufInt64[EMScbData(EMS_ARR_Q_BOTTOM)] >= bufInt64[EMScbData(EMS_ARR_STACKTOP)]) {
Expand All @@ -273,7 +273,7 @@ void EMSdequeue(const Nan::FunctionCallbackInfo<v8::Value> &info) {
bufInt64[EMScbData(EMS_ARR_Q_BOTTOM)]++;
// Wait for the data pointed to by the bottom of the heap to be full,
// then mark busy while copying it, and finally set it to empty when done
dataTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
dataTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], NULL, EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
dataTag.tags.fe = EMS_TAG_EMPTY;
switch (dataTag.tags.type) {
case EMS_TYPE_BOOLEAN: {
Expand Down
17 changes: 14 additions & 3 deletions rmw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ void EMSfaa(const Nan::FunctionCallbackInfo<v8::Value>& info) {
return;
}

// Wait until the data is FULL, mark it busy while FAA is performed
oldTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
{
volatile EMStag_t *maptag;
if (EMSisMapped) { maptag = &bufTags[EMSmapTag(idx)]; }
else { maptag = NULL; }
// Wait until the data is FULL, mark it busy while FAA is performed
oldTag.byte = EMStransitionFEtag(&bufTags[EMSdataTag(idx)], maptag,
EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
}
oldTag.tags.fe = EMS_TAG_FULL; // When written back, mark FULL
int argType = EMSv8toEMStype(info[1], false); // Never add to an object, treat as string
switch (oldTag.tags.type) {
Expand Down Expand Up @@ -329,7 +335,12 @@ void EMScas(const Nan::FunctionCallbackInfo<v8::Value> &info) {
memType = EMS_TYPE_UNDEFINED;
} else {
// Wait for the memory to be Full, then mark it Busy while CAS works
EMStransitionFEtag(&bufTags[EMSdataTag(idx)], EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
volatile EMStag_t *maptag;
if (EMSisMapped) { maptag = &bufTags[EMSmapTag(idx)]; }
else { maptag = NULL; }
// Wait until the data is FULL, mark it busy while FAA is performed
EMStransitionFEtag(&bufTags[EMSdataTag(idx)], maptag,
EMS_TAG_FULL, EMS_TAG_BUSY, EMS_TAG_ANY);
memType = bufTags[EMSdataTag(idx)].tags.type;
}

Expand Down

0 comments on commit a112a3f

Please sign in to comment.