Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
fed135 committed Apr 26, 2018
0 parents commit 012e3b6
Show file tree
Hide file tree
Showing 10 changed files with 760 additions and 0 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# REST-store

Want to make your app faster and don't want to spend on extra infrastructure ?

**REST-store** is:

An in-memory, self-adjustting cache:

- Helps reduce the number of requests for 'hot' information
- No noticeable footprint
- No need for extra caching architecture (redis/memcache)

With request dedupping, batching, retrying and circuit-breaking:

- Process-wide request profiling and mapping
- Greatly reduces the number of requests
- Fully configurable

## Use cases

// Sample data query distribution graph

// Call reduction graph

// Median response time graph

## Installing

`npm install rest-store`

## Usage

```node

```

## Testing

`npm test`

## Contributing

## License

14 changes: 14 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "rest-batcher",
"version": "1.0.0",
"description": "A resource access indexer and batcher",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC"
}
99 changes: 99 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Batcher index
*/

/* Requires ------------------------------------------------------------------*/

const abatch = require('./batch');
const { requiredParam } = require('./utils');
const EventEmitter = require('events').EventEmitter;

/* Methods -------------------------------------------------------------------*/

/**
* Batcher constructor
* @class batcher
*/
function batcher(config = {
getter: requiredParam('getter', '<object>{ method: <function(ids, params)>, responseParser: <function(response, requestedIds)> }'),
uniqueOptions = [],
cache = {
enabled: true,
step: 1000,
ttl: 10000,
},
batch = {
enabled: false,
tick: 40,
limit: 100,
},
retry = {
enabled: true,
max: 3,
scale: {
mult: 2.5,
base: 5,
}
}
}) {
// Local variables
const emitter = new EventEmitter();
const queue = abatch(config, emitter);

if (config.batch.enabled === true) _checkGetterConfig('many');

/**
* Gets a single record from source
* @param {string|number} id The id of the record to fetch
* @param {object} params (Optional) The Request parameters
* @param {object} overrides (Optional) Batcher options for this call
* @returns {Promise} The eventual single record
*/
function one(id, params = {}, overrides = {}) {
let method = queue.add;
if (overrides.batch === false || config.batch.enabled === false) {
method = queue.skip;
}

return method(id, params);
}

/**
* Gets a list of records from source
* @param {array<string|number>} ids The id of the record to fetch
* @param {object} params (Optional)The Request parameters
* @param {object} overrides (Optional) Batcher options for this call
* @returns {Promise} The eventual single record
*/
function many(ids, params = {}, overrides = {}) {
return Promise.all(ids.map(id => one(id, params, overrides)));
}

/**
* Checks if one or more recors are present in temp store
* @param {string|number|array<string|number>} ids The id(s) to lookup
* @param {object} params (Optional) The Request parameters
* @returns {boolean} If all records requested are in temp store
*/
function has(ids, params) {
if (Array.isArray(ids)) return ids.every(id => has(id, params));
return queue.store.has(queue.store.key(ids, params));
}

/**
* Clears one or more recors from temp store
* @param {string|number|array<string|number>} ids The id(s) to clear
* @param {object} params (Optional) The Request parameters
* @returns {boolean} The result of the clearing
*/
function clear(ids, params) {
if (Array.isArray(ids)) return ids.map(id => clear(id, params));
return queue.store.clear(queue.store.key(ids, params));
}

return Object.assign({ one, many, has, clear }, emitter);
}

/* Exports -------------------------------------------------------------------*/

module.exports = batcher;
134 changes: 134 additions & 0 deletions src/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Queue processing
*/

/* Requires ------------------------------------------------------------------*/

const localStore = require('./store');

/* Methods -------------------------------------------------------------------*/

function queue(config, emitter) {
// Local variables
const store = localStore(config, emitter);
const contexts = new Map();

/**
* Adds an element to the end of the queue
* @param {*} id
* @param {*} params
*/
function add(id, params) {
const key = contextKey(params);

const record = store.get(recordKey(key, id));
if (record !== undefined) {
emitter.emit('cacheHit', { key, id, params, deferred: !!(record.value) });
return record.value || record.promise;
}

emitter.emit('cacheMiss', { key, id, params });

const context = store.get(key);
if (context === undefined) {
contexts.set(key, {
ids: [id],
params,
attempts: 0,
scale: config.retry.scale.base,
timer: setTimeout(() => query(key), config.batch.tick),
});
} else {
context.ids.push(id);
if (context.ids.length >= config.batch.limit) {
query(key);
}
else {
if (context.timer === null) {
context.timer = setTimeout(() => query(key), config.batch.tick);
}
}
}

const recordDef = { promise: new Promise(), value: null };

store.set(recordKey(key, id), recordDef);
return recordDef.promise;
}

/**
* Skips queue and cache, gets an element directly
* @param {*} id
* @param {*} params
*/
function skip(id, params) {
return config.getter.method(id, params);
}

/**
* Runs the getter function
*/
function query(key, ids) {
const context = store.get(key);
if (context !== undefined) {
clearTimeout(context.timer);
const targetIds = ids || context.ids.splice(0,config.batch.limit);
emitter.emit('batch', { key, ids: targetIds, params });
config.getter.method(targetIds, context.params)
.catch(err => retry(key, targetIds, params, err))
.then(
results => complete(key, targetIds, params, results),
err => retry(key, targetIds, params, err)
);

if (context.ids.length > 0) {
context.timer = setTimeout(() => query(key), config.batch.tick);
}
else {
context.timer = null;
}
}
}

function retry(key, ids, params, err) {
emitter.emit('batchFailed', { key, ids, params, error: err });
const context = store.get(key);
if (context !== undefined) {
context.attempts = context.attempts + 1;
if (config.retry.enabled === true) {
if (config.retry.max >= context.attempts) {
context.scale = context.scale * config.retry.scale.mult;
context.timer = setTimeout(() => query(key, ids), context.scale);
}
}
else {
emitter.emit('batchCancelled', { key, ids, params, error: err });
}
}
}

function complete(key, ids, params, results) {
emitter.emit('batchSuccess', { key, ids, params });
const parser = config.getter.responseParser || (results => results);
const records = parser(results, ids, params);
const context = store.get(key);
if (context !== undefined) {
context.attempts = 0;
context.scale = config.retry.scale.base;
}

return Promise.all(ids.map(id => store.set(recordKey(key, id), { value: records[id] }, { ttl: config.cache.ttl })));
}

function contextKey(params) {
return config.uniqueOptions.map(opt => `${curr}=${params[curr]}`).join(';');
}

function recordKey(context, id) {
return `${context}::${id}`;
}

return { add, skip, store };
}

module.exports = batcher;
87 changes: 87 additions & 0 deletions src/store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* A-Store
*/

/* Methods -------------------------------------------------------------------*/

/**
* Store constructor
* @param {object} config The options for the store
* @param {EventEmitter} emitter The event-emitter instance for the batcher
*/
function localStore(config, emitter) {
const store = new Map();

/**
* Performs a query that returns a single entities to be cached
* @param {object} opts The options for the dao
* @param {string} method The dao method to call
* @returns {Promise}
*/
function get(key) {
const record = store.get(key);
if (record) {
if (record.value && record.timer) {
record.bump = true;
}
}
return record;
}

/**
* Performs a query that returns a single entities to be cached
* @param {object} opts The options for the dao
* @param {string} method The dao method to call
* @returns {Promise}
*/
function set(key, value, opts) {
if (opts && opts.ttl) {
value.timestamp = Date.now();
value.timer = setTimeout(() => lru(key), config.cache.step);
}
return store.set(key, value);
}

/**
* Checks if a computed key is present in the store
* @param {string} key The key to search for
* @returns {boolean} Wether the key is in the store or not
*/
function has(key) {
return store.has(key);
}

/**
* Clears a specified computed key from the store
* @param {string} key The key to search for
* @returns {boolean} Wether the key was removed or not
*/
function clear(key) {
return store.delete(key);
}

function lru(key) {
const record = store.get(key);
if (record) {
if (record.value && record.timer) {
const now = Date.now();
if (now + config.cache.step <= record.timestamp + config.cache.ttl && record.bump === true) {
emitter.emit('bumpCache', { key, timestamp: record.timestamp, expires: now + config.cache.step });
clearTimeout(record.timer);
value.timer = setTimeout(() => clear(key), config.cache.step);
record.bump = false;
}
else {
emitter.emit('clearCache', { key, timestamp: record.timestamp, expires: now });
clear(key);
}
}
}
}

return { get, set, has, clear };
}

/* Exports -------------------------------------------------------------------*/

module.exports = localStore;
Loading

0 comments on commit 012e3b6

Please sign in to comment.