Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data-point-cache): added reconnection strategy to redis #477

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions packages/data-point-cache/lib/redis-client.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
/* eslint-disable no-console */
const ms = require("ms");
const _ = require("lodash");
const { backOff } = require("exponential-backoff");
const EventEmitter = require("events");
const IORedis = require("./io-redis");

function reconnectOnError(err) {
console.error("ioredis - reconnectOnError", err.toString());
return true;
}

function redisDecorator(redis, resolve, reject) {
function redisDecorator(redis, options = {}, resolve, reject) {
let wasConnected = false;
redis.on("error", error => {

redis.on("error", async error => {
console.error("ioredis - error", error.toString());
if (!wasConnected) {
redis.disconnect();
reject(error);

if (options.backoff.enable) {
await backOff(redis.connect, {
numOfAttempts: 100,
startingDelay: 1000,
...options.backoff.options
});
options.backoff.bus.emit("redis:backoff:reconnected");
}
}
});

Expand All @@ -40,7 +53,7 @@ function factory(options) {
reconnectOnError
});
const redis = new IORedis(opts);
redisDecorator(redis, resolve, reject);
redisDecorator(redis, options, resolve, reject);
});
}

Expand Down Expand Up @@ -110,21 +123,35 @@ function bootstrap(cache) {
return cache;
}

class RedisInstance {
constructor(options = {}) {
this.emitter = new EventEmitter();
this.cache = {
redis: null,
set: null,
get: null,
del: null,
exists: null,
options
};

this.emitter.on("redis:backoff:reconnected", () => {
_.set(this.cache, "options.backoff.enable", false);
this.init();
});
}

async init() {
if (_.get(this.cache, "options.backoff.enable")) {
this.cache.options.backoff.bus = this.emitter;
}
this.cache.redis = await factory(this.cache.options.redis);
return bootstrap(this.cache);
}
}

async function create(options = {}) {
const cache = {
redis: null,
set: null,
get: null,
del: null,
exists: null,
options
};

const redis = await factory(cache.options.redis);
// eslint-disable-next-line no-param-reassign
cache.redis = redis;

return bootstrap(cache);
return new RedisInstance(options).init();
}

module.exports = {
Expand Down
27 changes: 18 additions & 9 deletions packages/data-point-cache/lib/redis-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ describe("create", () => {
});
});

describe("create with backoff", () => {
test("It should create a new redis promisified instance with retry strategy", async () => {
const redisClient = await RedisClient.create({ backoff: { enable: true } });

redisClient.redis.emit("error");
console.log(redisClient.redis.get);

expect(typeof redisClient.redis.get === "function").toBeTruthy();

redisClient.redis.emit("connect");
console.log(redisClient.redis.get);
expect(typeof redisClient.redis.get === "function").toBeTruthy();
});
});

describe("get/set/exists", () => {
test("It should test get/set/exists functionality", () => {
return RedisClient.create().then(redisClient => {
Expand Down Expand Up @@ -210,19 +225,13 @@ describe("get/set/exists", () => {
});

describe("redisDecorator", () => {
const consoleError = console.error;
const consoleInfo = console.info;
afterAll(() => {
console.error = consoleError;
console.info = consoleInfo;
});
test("It should execute resolve when ready", done => {
const redis = new EventEmitter();
const resolve = result => {
expect(redis === result).toBeTruthy();
done();
};
RedisClient.redisDecorator(redis, resolve, () => {});
RedisClient.redisDecorator(redis, {}, resolve, () => {});
redis.emit("ready");
});

Expand All @@ -233,15 +242,15 @@ describe("redisDecorator", () => {
expect(redis.disconnect).toBeCalled();
done();
};
RedisClient.redisDecorator(redis, () => {}, reject);
RedisClient.redisDecorator(redis, {}, () => {}, reject);
redis.emit("error", new Error("test"));
});

test("It should log error when already connected", () => {
const redis = new EventEmitter();
// eslint-disable-next-line no-console
console.error = jest.fn();
RedisClient.redisDecorator(redis, () => {});
RedisClient.redisDecorator(redis, {}, () => {});
redis.emit("connect");
redis.emit("error", new Error("test"));
// eslint-disable-next-line no-console
Expand Down
3 changes: 2 additions & 1 deletion packages/data-point-cache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"dependencies": {
"ioredis": "4.12.2",
"lodash": "^4.17.20",
"ms": "2.x"
"ms": "2.x",
"exponential-backoff": "3.1.0"
},
"keywords": [
"cache",
Expand Down
Loading