Commit eedde87
V4.0.0 (#123)
* v4 main

* fixed cacheHit reporting

* updated interfaces

* fixed readme and tests to match new config spec

* fixed tests, remote store integrations

* removed test .only

Co-authored-by: Frederic Charette <[email protected]>
fed135 and Frederic Charette authored Oct 24, 2022
1 parent 25db802 commit eedde87
Showing 23 changed files with 693 additions and 288 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pr-validation.yml
@@ -20,6 +20,8 @@ jobs:
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Start Redis
uses: supercharge/[email protected]
- name: npm install, build, and test
run: |
npm install
11 changes: 6 additions & 5 deletions README.md
@@ -15,9 +15,10 @@

---

**HA-store** is a generic wrapper for your data queries, it features:
**HA-store** is a wrapper for your data queries. It features:

- Smart TLRU cache for 'hot' information
- Supports multiple caching levels
- Request coalescing and batching (solves the [Thundering Herd problem](https://en.wikipedia.org/wiki/Thundering_herd_problem))
- Insightful stats and [events](#Monitoring-and-events)
- Lightweight, configurable, battle-tested
@@ -68,9 +69,8 @@ Name | Required | Default | Description
--- | --- | --- | ---
resolver | true | - | The method to wrap, and how to interpret the returned data. Uses the format `<function(ids, params)>`
delimiter | false | `[]` | The list of parameters that, when passed, generate unique results. Ex: 'language', 'view', 'fields', 'country'. These will generate different combinations of cache keys.
store | false | `null` | A custom store for the data, like [ha-store-redis](https://github.com/fed135/ha-redis-adapter).
cache | false | <pre>{&#13;&#10;&nbsp;&nbsp;limit: 5000,&#13;&#10;&nbsp;&nbsp;ttl: 300000&#13;&#10;}</pre> | Caching options for the data - `limit` - the maximum number of records, and `ttl` - time to live for a record in milliseconds.
batch | false | <pre>{&#13;&#10;&nbsp;&nbsp;delay: 50,&#13;&#10;&nbsp;&nbsp;limit: 100&#13;&#10;}</pre> | Batching options for the requests
cache | false | <pre>{&#13;&#10;&nbsp;&nbsp;enabled: false,&#13;&#10;&nbsp;&nbsp;tiers: [&#13;&#10;&nbsp;&nbsp;{&#13;&#10;&nbsp;&nbsp;&nbsp;&nbsp;store: &#60;instance of a store&#62;,&#13;&#10;&nbsp;&nbsp;&nbsp;&nbsp;limit: 5000,&#13;&#10;&nbsp;&nbsp;&nbsp;&nbsp;ttl: 300000&#13;&#10;&nbsp;&nbsp;}&#13;&#10;&nbsp;&nbsp;]&#13;&#10;}</pre> | A list of storage tiers for the data, ordered by lookup priority. It's recommended to keep an in-memory store, like `ha-store/stores/in-memory`, as the first tier, and then expand to external stores like [ha-store-redis](https://github.com/fed135/ha-redis-adapter). Each tier accepts `limit` - the maximum number of records - and `ttl` - the time to live for a record in milliseconds.
batch | false | <pre>{&#13;&#10;&nbsp;&nbsp;enabled: false,&#13;&#10;&nbsp;&nbsp;delay: 50,&#13;&#10;&nbsp;&nbsp;limit: 100&#13;&#10;}</pre> | Batching options for the requests - `delay` is the amount of time to wait before sending the batch, `limit` is the maximum number of data items to send in a batch.

*All time-based options are in milliseconds (ms)
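The options above can be sketched as a single configuration object. This is a hypothetical example based on the options table: the resolver is illustrative, and `inMemoryStoreFactory` is a stand-in for the real `ha-store/stores/in-memory` factory, which this snippet does not import.

```javascript
// Stand-in for a store factory (assumption; see ha-store/stores/in-memory).
const inMemoryStoreFactory = (tierConfig) => ({ local: true, tierConfig });

const config = {
  // The method to wrap: receives ids (and optional params), resolves a map of id -> record.
  resolver: (ids, params) =>
    Promise.resolve(Object.fromEntries(ids.map((id) => [id, { id }]))),
  // Params that produce distinct cache-key combinations.
  delimiter: ['language'],
  cache: {
    enabled: true,
    tiers: [
      // First tier is checked first; keep an in-memory store here.
      { store: inMemoryStoreFactory, limit: 5000, ttl: 300000 },
      // External tiers (e.g. a redis adapter) would follow.
    ],
  },
  // Wait up to 50 ms to group requests, at most 100 ids per batch.
  batch: { enabled: true, delay: 50, limit: 100 },
};
```

Passing this object to the exported constructor would yield a store instance wrapping the resolver.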

@@ -80,7 +80,8 @@ HA-store emits events to track cache hits, misses and outbound requests.

Event | Format | Description
--- | --- | ---
cacheHit | `<number>` | When the requested item is present in the microcache, or is already being fetched. Prevents another request from being created.
localCacheHit | `<number>` | When the requested item is present in the first listed local store (usually in-memory).
cacheHit | `<number>` | When the requested item is present in a store.
cacheMiss | `<number>` | When the requested item is not cached or coalesced and must be fetched.
coalescedHit | `<number>` | When a record query successfully hooks to the promise of the same record in transit.
query | `<object>` | When a batch of requests is about to be sent, gives the detail of the query and what triggered it.
1 change: 1 addition & 0 deletions SECURITY.md
@@ -5,6 +5,7 @@

| Version | Supported |
| ------- | ------------------ |
| 4.0.x | :white_check_mark: |
| 3.1.x | :white_check_mark: |
| 3.0.x | :white_check_mark: |
| < 3.0 | :x: |
12 changes: 4 additions & 8 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "ha-store",
"version": "3.2.0",
"version": "4.0.0",
"description": "Efficient data fetching",
"main": "src/index.js",
"scripts": {
@@ -20,11 +20,7 @@
},
"keywords": [
"store",
"availability",
"optimize",
"throughput",
"cache",
"service",
"batch",
"congestion",
"tlru"
@@ -35,9 +31,9 @@
"author": "frederic charette <[email protected]>",
"license": "Apache-2.0",
"devDependencies": {
"@ha-store/redis": "^4.0.1",
"chai": "^4.3.0",
"eslint": "^8.18.0",
"ha-store-redis": "^2.0.1",
"eslint": "^8.20.0",
"mocha": "^10.0.0",
"sinon": "^14.0.0",
"split2": "^4.1.0"
@@ -49,6 +45,6 @@
],
"typings": "./src/index.d.ts",
"dependencies": {
"lru-cache": "^7.10.0"
"lru-cache": "^7.13.0"
}
}
63 changes: 25 additions & 38 deletions src/buffer.js
@@ -6,12 +6,10 @@ const BufferState = {
COMPLETED: 2,
};

function queryBuffer(config, emitter, targetStore) {
function queryBufferConstructor(config, emitter, caches) {
const buffers = [];

let numCoalesced = 0;
let numCached = 0;
let numMisses = 0;

class RequestBuffer {
constructor(key, params) {
@@ -26,15 +24,15 @@ function queryBuffer(config, emitter, targetStore) {
}

tick() {
const sizeLimit = config.batch?.limit || 1;
const sizeLimit = (config.batch.enabled && config.batch.limit) || 1;

if (this.ids.length >= sizeLimit) {
this.run('limit');
return this;
}

if (this.timer === null) {
this.timer = setTimeout(this.run.bind(this, 'timeout'), config.batch?.delay || 0);
this.timer = setTimeout(this.run.bind(this, 'timeout'), config.batch.enabled && config.batch.delay || 0);
}

return this;
@@ -59,45 +57,34 @@
this.state = BufferState.COMPLETED;
emitter.emit('querySuccess', { key: this.contextKey, uid: this.uid, size: this.ids.length, params: this.params });
this.handle.resolve(entries);
if (config.cache !== null) targetStore.set(contextRecordKey(this.contextKey), this.ids, entries || {});
if (config.cache.enabled) caches.set(contextRecordKey(this.contextKey), this.ids, entries || {});
buffers.splice(buffers.indexOf(this), 1);
}
}

function getHandles(key, ids, params, context, cacheResult) {
const handles = Array.from(new Array(ids.length));
for (let i = 0; i < ids.length; i++) {
if (cacheResult[i] !== undefined) {
numCached++;
handles[i] = cacheResult[i];
}
else {
const liveBuffer = buffers.find(buffer => buffer.contextKey === key && buffer.ids.includes(ids[i]));
if (liveBuffer) {
numCoalesced++;
handles[i] = liveBuffer.handle.promise.then(results => results[ids[i]]);
}
else {
numMisses++;
handles[i] = assignQuery(key, ids[i], params, context).handle.promise.then(results => results[ids[i]]);
function getHandles(key, ids, params, context) {
return caches.getMulti(contextRecordKey(key), ids.concat())
.then((handles) => {
for (let i = 0; i < ids.length; i++) {
if (handles[i] === undefined) {
const liveBuffer = buffers.find(buffer => buffer.contextKey === key && buffer.ids.includes(ids[i]));
if (liveBuffer) {
numCoalesced++;
handles[i] = liveBuffer.handle.promise.then(results => results[ids[i]]);
}
else {
handles[i] = assignQuery(key, ids[i], params, context).handle.promise.then(results => results && results[ids[i]]);
}
}
}
}
}

if (numCached > 0) {
emitter.emit('cacheHit', numCached);
numCached = 0;
}
if (numMisses > 0) {
emitter.emit('cacheMiss', numMisses);
numMisses = 0;
}
if (numCoalesced > 0) {
emitter.emit('coalescedHit', numCoalesced);
numCoalesced = 0;
}
if (numCoalesced > 0) {
emitter.track('coalescedHit', numCoalesced);
numCoalesced = 0;
}

return handles;
return handles;
});
}

function assignQuery(key, id, params, context) {
@@ -124,4 +111,4 @@ function queryBuffer(config, emitter, targetStore) {
return { getHandles, size };
}

module.exports = queryBuffer;
module.exports = queryBufferConstructor;
103 changes: 103 additions & 0 deletions src/caches.js
@@ -0,0 +1,103 @@
const {settleAndLog} = require('./utils');

function cachesConstructor(config, emitter) {
const caches = config.cache.enabled && config.cache.tiers.map(tier => tier.store(tier)) || [];
const local = caches.find(cache => cache.local);
const remotes = caches.filter(cache => !cache.local);

function getLocal(key) {
return local && local.get(key);
}

function getMultiLocal(recordKey, keys) {
return local && local.getMulti(recordKey, keys);
}

function get(key) {
if (!config.cache.enabled) return undefined;

const localValue = getLocal(key);
if (localValue !== undefined) {
emitter.track('localCacheHit', 1);
emitter.track('cacheHit', 1);
return localValue;
}

return settleAndLog(remotes.map((remote) => remote.get(key)))
.then((remoteValues) => {
const responseValue = remoteValues.find(value => value !== undefined);
if (responseValue !== undefined) {
emitter.track('cacheHit', 1);
}
else {
emitter.track('cacheMiss', 1);
}
return remoteValues.find((response) => response !== undefined);
});
}

function getMulti(recordKey, keys) {
if (!config.cache.enabled) return Promise.resolve(Array.from(new Array(keys.length), () => undefined));

const localValues = getMultiLocal(recordKey, keys);
const foundLocally = localValues && localValues.filter(value => value !== undefined).length;
if (foundLocally) {
emitter.track('localCacheHit', foundLocally);
emitter.track('cacheHit', foundLocally);
}
if (foundLocally && foundLocally === keys.length) {
return Promise.resolve(localValues);
}

return settleAndLog(remotes.map((remote) => remote.getMulti(recordKey, keys)))
.then((remoteValues) => {
const responseValues = Object.assign(...remoteValues, localValues).map((value) => (value === null || value === undefined) ? undefined : JSON.parse(value));
const foundRemotely = remoteValues.filter((value) => value !== undefined);
const missingValues = responseValues.filter((value) => value === undefined);
emitter.track('cacheHit', foundRemotely.length);
emitter.track('cacheMiss', missingValues.length);
return responseValues;
});
}

function set(recordKey, keys, values) {
local && local.set(recordKey, keys, values);
return Promise.all(remotes.map((remote) => remote.set(recordKey, keys, values))).catch((err) => console.log('error writing', err));
}

function clear(key) {
local && local.clear(key);
remotes.forEach((remote) => remote.clear(key));
return true;
}

function size() {
if (!config.cache.enabled) {
return Promise.resolve({
local: 0,
remote: 0,
status: 'disabled',
});
}

return Promise.resolve(remotes[0] && remotes[0].size())
.then((remoteItems) => {
return {
local: local && local.size(),
remote: remoteItems || 0,
}
});
}

return {
get,
getLocal,
getMulti,
getMultiLocal,
set,
clear,
size,
};
}

module.exports = cachesConstructor;
26 changes: 26 additions & 0 deletions src/emitter.js
@@ -0,0 +1,26 @@
const EventEmitter = require('events').EventEmitter;

class DeferredEmitter extends EventEmitter {
constructor() {
super();
this._counters = {
localCacheHit: 0,
cacheHit: 0,
cacheMiss: 0,
coalescedHit: 0,
};

this._timer = setInterval(() => {
for (const type in this._counters) {
if (this._counters[type] !== 0) this.emit(type, this._counters[type]);
this._counters[type] = 0;
}
}, 1000);
}

track(type, number) {
this._counters[type] += number;
}
}

module.exports = DeferredEmitter;
34 changes: 24 additions & 10 deletions src/index.d.ts
@@ -1,5 +1,3 @@
import { EventEmitter } from 'events'

type Params = {
[key: string]: string
}
Expand All @@ -8,7 +6,7 @@ export interface HAExternalStore {
get<Response>(key: string): Promise<Response>
getMulti<Response>(recordKey: (contextKey: string) => string, keys: RequestIds): Promise<Response[]>
set<DataType>(recordKey: (contextKey: string) => string, keys: RequestIds, values: DataType): boolean
clear(key?: string): boolean
clear(key: '*' | string): boolean
size(): number
connection?: any
}
@@ -18,25 +16,41 @@ export interface HAStoreConfig {
resolver<Response, Context>(ids: string[], params?: Params, context?: Context): Promise<{ [id: string]: Response }>
delimiter?: string[]
cache?: {
limit?: number
ttl?: number
enabled: boolean
tiers: {
store: HAExternalStore
limit?: number
ttl?: number
}
}
batch?: {
enabled: boolean
delay?: number
limit?: number
},
store?: HAExternalStore
}
}

type QueryEvent = {
key: string
uid: string
size: number
params: any
error?: Error
}

export interface HAStore extends EventEmitter {
export interface HAStore {
get<Response>(id: string, params?: Params): Promise<Response>
get<Response, Context>(id: string, params?: Params, context?: Context): Promise<Response>
getMany<Response>(id: string[], params?: Params): Promise<{status: string, value: Response}[]>
getMany<Response, Context>(id: string[], params?: Params, context?: Context): Promise<{status: string, value: Response}[]>
set(items: { [id: string]: any }, ids: string[], params?: Params): boolean
clear(ids: string[], params?: Params): void
clear(ids: '*' | string | string[], params?: Params): void
size(): { pendingBuffers: number, activeBuffers: number, records: number }
getStorageKey(id: string, params?: Params): string
on(event: 'cacheHit' | 'cacheMiss' | 'localCacheHit' | 'coalescedHit', callback: (_: number) => any): void
on(event: 'query' | 'queryFailed' | 'querySuccess', callback: (_: QueryEvent) => any): void
once(event: 'cacheHit' | 'cacheMiss' | 'localCacheHit' | 'coalescedHit'): Promise<number>
once(event: 'query' | 'queryFailed' | 'querySuccess'): Promise<QueryEvent>
}

export default function batcher(config: HAStoreConfig, emitter?: EventEmitter): HAStore
export default function batcher(config: HAStoreConfig): HAStore
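The `HAExternalStore` shape from the typings above can be satisfied by a minimal Map-backed store. This is an illustrative sketch, not the package's real in-memory implementation: the method names and signatures follow the interface (`recordKey` is a function mapping an id to a full cache key), while the internals are assumptions.

```javascript
// Minimal, hypothetical HAExternalStore implementation backed by a Map.
function simpleStore() {
  const data = new Map();
  return {
    local: true,
    // Resolve a single value by its full key.
    get: (key) => Promise.resolve(data.get(key)),
    // Resolve many values; recordKey maps each id to its full key.
    getMulti: (recordKey, keys) =>
      Promise.resolve(keys.map((id) => data.get(recordKey(id)))),
    // Store values keyed by id under their full keys.
    set: (recordKey, keys, values) => {
      for (const id of keys) data.set(recordKey(id), values[id]);
      return true;
    },
    // '*' wipes everything; otherwise drop one key.
    clear: (key) => (key === '*' ? (data.clear(), true) : data.delete(key)),
    size: () => data.size,
  };
}

const store = simpleStore();
store.set((id) => 'ctx:' + id, ['a', 'b'], { a: 1, b: 2 });
console.log(store.size()); // 2
```

A store like this would slot into the `cache.tiers` list, with remote adapters such as ha-store-redis implementing the same interface over a network connection.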