From 39c86cfad771e2bd7b918133157567fd16da8538 Mon Sep 17 00:00:00 2001 From: Jace A Mogill Date: Sat, 6 Feb 2016 20:39:04 -0800 Subject: [PATCH] Support arguments to parallel regions in fork-join mode Add example web server that goes parallel to generate a response Add fork-join test for arguments Update documentation Bump version to 1.1.0 --- Docs/reference.html | 56 +++++++++++++++++---------- Examples/web_server.js | 76 +++++++++++++++++++++++++++++++++++++ Tests/fj_args.js | 86 ++++++++++++++++++++++++++++++++++++++++++ index.js | 20 +++++++--- package.json | 2 +- 5 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 Examples/web_server.js create mode 100644 Tests/fj_args.js diff --git a/Docs/reference.html b/Docs/reference.html index 74c9820..7fdcd68 100644 --- a/Docs/reference.html +++ b/Docs/reference.html @@ -358,35 +358,51 @@
Parallel Region (Fork-Join execution only)
- + - - - - - - -
CLASS METHOD ems.parallel( func ) ems.parallel( [args, ...] func )
SYNOPSIS When using fork-join execution, the master - thread creates a parallel region by executing func - once by each thread. A join is performed when all the functions have - returned. No results are returned, any side effects are stored - in shared EMS memory. Global Node.js thread state persists from - region to region. + thread enters a parallel region by executing func + once from each process. The master process first starts all the + other processes running the function asynchronously, then runs the function + itself synchronously. No join or implied synchronization between processes is performed, + callers must perform a barrier to ensure all processes + have completed their work. + The results of the function are discarded. Global variables on each node + are persistent between regions.
ARGUMENTS func <Function> Function execute on every thread. Takes no arguments and returns no - value because all communication occurs via shared EMS memory. -
+ + ARGUMENTS + args + <Any> + Zero or more arguments to be passed to the function. + + + + + func + <Function> + Function to be executed once on every thread. The + optional arguments are used when calling the function. Return value is ignored. + + +
- - - - - + + + + + + + + + +
EXAMPLES ems.parallel( doWork ) The function doWork is executed by every thread.
EXAMPLES ems.parallel( doWork ) The function doWork is executed by every thread.
ems.parallel( foo, "Smith", 123, doWork ) The function call doWork(foo, "Smith", 123) + is performed once by each process.
diff --git a/Examples/web_server.js b/Examples/web_server.js new file mode 100644 index 0000000..0a4eadf --- /dev/null +++ b/Examples/web_server.js @@ -0,0 +1,76 @@ +/*-----------------------------------------------------------------------------+ + | Extended Memory Semantics (EMS) Version 1.1.0 | + | Synthetic Semantics http://www.synsem.com/ mogill@synsem.com | + +-----------------------------------------------------------------------------+ + | Copyright (c) 2011-2014, Synthetic Semantics LLC. All rights reserved. | + | Copyright (c) 2015-2016, Jace A Mogill. All rights reserved. | + | | + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted provided that the following conditions are met: | + | * Redistributions of source code must retain the above copyright | + | notice, this list of conditions and the following disclaimer. | + | * Redistributions in binary form must reproduce the above copyright | + | notice, this list of conditions and the following disclaimer in the | + | documentation and/or other materials provided with the distribution. | + | * Neither the name of the Synthetic Semantics nor the names of its | + | contributors may be used to endorse or promote products derived | + | from this software without specific prior written permission. | + | | + | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | + | "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | + | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | + | A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SYNTHETIC | + | SEMANTICS LLC BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | + | EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | + | PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | + | PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | + | LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | + | NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | + | SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | + | | + +-----------------------------------------------------------------------------*/ +'use strict'; +var ems = require('ems')(parseInt(process.argv[2]), true, 'fj'); +var http = require('http'); +var port = 8080; +var shared_data; + +/* Connect to the shared data on every task. The map key will be + * the URL, the value will be the concatenation of work performed + * by all the processes. */ +ems.parallel(function () { + shared_data = ems.new({ + dimensions: [1000], + heapSize: [100000], + useExisting: false, + useMap: true, + setFEtags: 'full', + filename: '/tmp/EMS_shared_web_data.ems' + }); +}); + + +// When a request arrives, each process does some work and appends the results +// to shared memory +function handleRequest(request, response) { + // If this URL has not yet been requested, the data is undefined + // and must be initialized to + // Alternatively, may be initialized not here but at ems.new() + shared_data.cas(request.url, undefined, "Response preamble."); + + // Enter a parallel region, each process does some work, and then + // appends the result the value. + ems.parallel(request.url, function (url) { + // Do some work + shared_data.faa(url, " Work from process " + ems.myID + "."); + ems.barrier(); // Wait for all processes to finish region before any may exit + }); + + // Return the results, leaving them in shared memory for later updates + response.end('Shared results from(' + request.url + "):" + shared_data.readFF(request.url)); +} + +// Create the Web server +http.createServer(handleRequest).listen(port, function () { + ems.diag("Server listening on: http://localhost:" + port); +}); diff --git a/Tests/fj_args.js b/Tests/fj_args.js new file mode 100644 index 0000000..deb1d79 --- /dev/null +++ b/Tests/fj_args.js @@ -0,0 +1,86 @@ +/*-----------------------------------------------------------------------------+ + | Extended Memory Semantics (EMS) Version 1.1.0 | + | Synthetic Semantics http://www.synsem.com/ mogill@synsem.com | + +-----------------------------------------------------------------------------+ + | Copyright (c) 2011-2014, Synthetic Semantics LLC. All rights reserved. | + | Copyright (c) 2015-2016, Jace A Mogill. All rights reserved. | + | | + | Redistribution and use in source and binary forms, with or without | + | modification, are permitted provided that the following conditions are met: | + | * Redistributions of source code must retain the above copyright | + | notice, this list of conditions and the following disclaimer. | + | * Redistributions in binary form must reproduce the above copyright | + | notice, this list of conditions and the following disclaimer in the | + | documentation and/or other materials provided with the distribution. | + | * Neither the name of the Synthetic Semantics nor the names of its | + | contributors may be used to endorse or promote products derived | + | from this software without specific prior written permission. | + | | + | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | + | "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | + | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | + | A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SYNTHETIC | + | SEMANTICS LLC BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | + | EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | + | PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | + | PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | + | LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | + | NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | + | SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | + | | + +-----------------------------------------------------------------------------*/ +'use strict'; +var ems = require('ems')(parseInt(process.argv[2]), true, 'fj'); + +var assert; +var global_str; +var check_glob_str; + +ems.parallel("This is the block that defines global vars", function () { + assert = require('assert'); + global_str = '_this is str0_'; + var local_str = 'you should never see this'; + + check_glob_str = function () { + if(ems.myID === 0) { + assert(global_str === "Process 0 clobbered the global string"); + } else { + assert(global_str === "_this is str0_Updated by process " + ems.myID); + } + } +}); + + +ems.parallel(global_str, 'two', 'three', + function (a, b, c, taskN) { + assert(typeof taskN === "undefined", "Mysterious fourth argument:" + taskN); + assert(typeof local_str === "undefined", "The local string did not stay local"); + ems.diag("START global_str=" + global_str + " a =" + a + " b=" + b + " c=" + c); + assert(a === global_str, "A argument not in closure? (" + JSON.stringify(a) + ") global_str=" + global_str); + global_str += "Updated by process " + ems.myID; + } +); + +ems.diag("==============================================This message happens once: Thus concludes first parallel region"); +global_str = "Process 0 clobbered the global string"; + + +ems.parallel('xxx', 'yyy', 'zzz', -321, + function (a, b, c, x, taskN) { + assert(a === 'xxx' && b === 'yyy' && c === 'zzz' && x === -321 && typeof taskN === "undefined", + "arguments are missing or wrong"); + check_glob_str(); + ems.diag("Second parallel region taskN=" + taskN + " a =" + a + " b=" + b + " c=" + c + " global_str is now:" + global_str); + } +); + +ems.diag("==============================================This message happens once: This is the barrier"); +ems.parallel(function () { + ems.diag("Barrier time!"); + check_glob_str(); + ems.barrier(); +}); + +ems.diag("==============================================This message happens once: This is the end."); + +process.exit(0); diff --git a/index.js b/index.js index 0963261..06c8700 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,5 @@ /*-----------------------------------------------------------------------------+ - | Extended Memory Semantics (EMS) Version 1.0.0 | + | Extended Memory Semantics (EMS) Version 1.1.0 | | Synthetic Semantics http://www.synsem.com/ mogill@synsem.com | +-----------------------------------------------------------------------------+ | Copyright (c) 2011-2014, Synthetic Semantics LLC. All rights reserved. | @@ -71,11 +71,14 @@ function EMSdiag(text) { //================================================================== // Co-Begin a parallel region, executing the function 'func' // -function EMSparallel(func) { +function EMSparallel() { + var user_args = (arguments.length === 1?[arguments[0]]:Array.apply(null, arguments)); + var func = user_args.pop(); // Remove the function + // Loop over remote processes, starting each of them this.tasks.forEach(function (task, taskN) { - task.send({'args': taskN + 1, 'func': func.toString()}); + task.send({'taskN': taskN + 1, 'args': user_args, 'func': func.toString()}); }); - func(0); + func.apply(null, user_args); // Invoke on master process } @@ -594,7 +597,14 @@ function ems_wrapper(nThreadsArg, pinThreadsArg, threadingType, filename) { // The master thread has completed initialization, other threads may now // safely execute. if (targetScript !== undefined && retObj.myID == 0) { - var emsThreadStub = '// Automatically Generated EMS Slave Thread Script\n// Edit index.js: emsThreadStub\n ems = require(\'ems\')(parseInt(process.argv[2])); process.on(\'message\', function(msg) { eval(\'msg.func = \' + msg.func); msg.func(msg.args); } );'; + var emsThreadStub = + '// Automatically Generated EMS Slave Thread Script\n' + + '// To edit this file, see index.js:emsThreadStub()\n' + + 'var ems = require("ems")(parseInt(process.argv[2]));\n' + + 'process.on("message", function(msg) {\n' + + ' eval("func = " + msg.func);\n' + + ' func.apply(null, msg.args);\n' + + '} );\n'; fs.writeFileSync('./EMSthreadStub.js', emsThreadStub, {flag: 'w+'}); for (var taskN = 1; taskN < nThreads; taskN++) { retObj.tasks.push( diff --git a/package.json b/package.json index 93f0c23..4d026f7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ems", - "version": "1.0.14", + "version": "1.1.0", "author": "Synthetic Semantics ", "description": "Shared Memory Parallelism with Transactional Memory and Extended Memory Semantics", "contributors": [