Skip to content

Commit

Permalink
Merge pull request #13 from coinbase/snatnael/integrate-networklayer
Browse files Browse the repository at this point in the history
Integrate Network Layer with Scheduler
  • Loading branch information
shedonnatnael-cb authored Nov 20, 2023
2 parents daec48f + cba4876 commit 835ecbf
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 59 deletions.
4 changes: 3 additions & 1 deletion src/storage/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { DEFAULT_IDENTITY, identityInit } from './identity';
import { Identity } from '../types/identity';
import { DEFAULT_DEVICE, deviceInit } from './device';
import { Device } from '../types/device';
import { networkLayerInit, DEFAULT_NETWORK_LAYER } from '../utils/networkLayer';
import { DEFAULT_NETWORK_LAYER, networkLayerInit } from '../utils/networkLayer';
import { NetworkLayer } from '../types/networkLayer';

const storage: Storage = {
Expand All @@ -29,10 +29,12 @@ export const init = (config: Config): void => {
storage.config = config;
storage.networkLayer = networkLayerInit();
storage.metricScheduler = createScheduler<Metric>(
storage.networkLayer.sendMetrics,
config.batchMetricsThreshold,
config.batchMetricsPeriod
);
storage.eventScheduler = createScheduler<Event>(
storage.networkLayer.sendEvents,
config.batchEventsThreshold,
config.batchEventsPeriod
);
Expand Down
16 changes: 5 additions & 11 deletions src/utils/networkLayer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
// PR1
// work on this first
// implement sendEvent and sendMetrics
// migrate all tests

// PR2
// and then you move to the integration

import { Event } from '../types/event';
import { Metric } from '../types/metric';
import { getConfig, getIdentity } from '../storage/storage.ts';
Expand All @@ -15,9 +7,11 @@ import { getChecksum } from './dataIntegrity.ts';
import { apiFetch } from './apiFetch.ts';
import { scheduleEvent } from './scheduler.ts';

const NO_OP = () => {};

export const DEFAULT_NETWORK_LAYER = {
sendMetrics: () => null,
sendEvents: () => null,
sendMetrics: NO_OP,
sendEvents: NO_OP,
};

export const sendEvents = (events: Event[]) => {
Expand Down Expand Up @@ -60,13 +54,13 @@ export const sendEvents = (events: Event[]) => {
};

const eventEndPoint = `${apiEndpoint}${eventPath}`;

apiFetch({
url: eventEndPoint,
data: analyticsServiceData,
onError: onError,
});
};

export const sendMetrics = (metrics: Metric[], skipScheduler = false) => {
const { apiEndpoint, metricPath, onError } = getConfig();
const metricEndpoint = `${apiEndpoint}${metricPath}`;
Expand Down
290 changes: 244 additions & 46 deletions src/utils/scheduler.test.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,256 @@
import { test, expect, describe, beforeEach, vi } from 'vitest';
import { Event } from '../types/event';
import { Metric, MetricType } from '../types/metric';
import { createScheduler } from './scheduler';
import { Scheduler } from '../types/scheduler';

type TestEvent = {
name: string;
id: string;
};
import { getNetworkLayer } from '../storage/storage';

describe('Scheduler', () => {
let scheduler: Scheduler<TestEvent>;
beforeEach(() => {
vi.resetAllMocks();
scheduler = createScheduler<TestEvent>();
});
describe('Events Scheduler', () => {
let scheduler: Scheduler<Event>;
const networkLayer = getNetworkLayer();
let callbackSpy = vi
.spyOn(networkLayer, 'sendEvents')
.mockImplementation(() => null);
beforeEach(() => {
vi.resetAllMocks();
scheduler = createScheduler<Event>(networkLayer.sendEvents);
});

test('should add an event', () => {
const event = { name: 'test', id: '1' };
scheduler.add(event);
expect(scheduler.length).toBe(1);
expect(scheduler.items).toEqual([event]);
});
test('should add an event', () => {
const event = {
action: 'unknown',
component: 'unknown',
name: 'testEvent',
};
scheduler.add(event);
expect(scheduler.length).toBe(1);
expect(callbackSpy).toHaveBeenCalledTimes(0);
expect(scheduler.items).toEqual([event]);
});

test('should add multiple events', () => {
const event1 = { name: 'test', id: '1' };
const event2 = { name: 'test', id: '2' };
scheduler.add(event1);
scheduler.add(event2);
expect(scheduler.length).toBe(2);
expect(scheduler.items).toEqual([event1, event2]);
});
test('should add/consume an event and access the networkLayer when importance is high', () => {
const event = {
action: 'unknown',
component: 'unknown',
name: 'testEvent',
};
scheduler.add(event, 'high');
expect(scheduler.length).toBe(0);
expect(callbackSpy).toHaveBeenCalledTimes(1);
expect(scheduler.items).toEqual([]);
});

test('should add multiple events', () => {
const event1 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent1',
};
const event2 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent2',
};
scheduler.add(event1);
scheduler.add(event2);
expect(scheduler.length).toBe(2);
expect(scheduler.items).toEqual([event1, event2]);
expect(callbackSpy).toHaveBeenCalledTimes(0);
});

test('should add/consume multiple events and access the networkLayer when importance is high', () => {
const event1 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent1',
};
const event2 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent2',
};
scheduler.add(event1, 'high');
scheduler.add(event2, 'high');
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(2);
});

test('should consume events when threshold is reached', () => {
scheduler = createScheduler<Event>(networkLayer.sendEvents, 3);
const event1 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent1',
};
const event2 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent2',
};
const event3 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent3',
};
scheduler.add(event1);
scheduler.add(event2);
scheduler.add(event3);
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(1);
});

test('should consume events when threshold is reached', () => {
scheduler = createScheduler<TestEvent>(3);
const event1 = { name: 'test', id: '1' };
const event2 = { name: 'test', id: '2' };
const event3 = { name: 'test', id: '3' };
scheduler.add(event1);
scheduler.add(event2);
scheduler.add(event3);
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
test('should consume events when time threshold is reached', () => {
vi.useFakeTimers();
scheduler = createScheduler<Event>(networkLayer.sendEvents, 5, 100);
const event1 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent1',
};
const event2 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent2',
};
const event3 = {
action: 'unknown',
component: 'unknown',
name: 'testEvent3',
};
scheduler.add(event1);
scheduler.add(event2);
scheduler.add(event3);
vi.advanceTimersToNextTimer();
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(1);
});
});

test('should consume events when time threshold is reached', () => {
vi.useFakeTimers();
scheduler = createScheduler<TestEvent>(5, 100);
const event1 = { name: 'test', id: '1' };
const event2 = { name: 'test', id: '2' };
const event3 = { name: 'test', id: '3' };
scheduler.add(event1);
scheduler.add(event2);
scheduler.add(event3);
vi.advanceTimersToNextTimer();
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
describe('Metrics Scheduler', () => {
let scheduler: Scheduler<Metric>;
const networkLayer = getNetworkLayer();
let callbackSpy = vi
.spyOn(networkLayer, 'sendMetrics')
.mockImplementation(() => null);
beforeEach(() => {
vi.resetAllMocks();
scheduler = createScheduler<Metric>(networkLayer.sendMetrics);
});

test('should add an Metric', () => {
const metric = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric);
expect(scheduler.length).toBe(1);
expect(callbackSpy).toHaveBeenCalledTimes(0);
expect(scheduler.items).toEqual([metric]);
});

test('should add/consume an Metric and access the networkLayer when importance is high', () => {
const metric = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric, 'high');
expect(scheduler.length).toBe(0);
expect(callbackSpy).toHaveBeenCalledTimes(1);
expect(scheduler.items).toEqual([]);
});

test('should add multiple Metrics', () => {
const metric1 = {
metricName: 'testMetric1',
metricType: MetricType.count,
value: 1,
};
const metric2 = {
metricName: 'testMetric2',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric1);
scheduler.add(metric2);
expect(scheduler.length).toBe(2);
expect(scheduler.items).toEqual([metric1, metric2]);
expect(callbackSpy).toHaveBeenCalledTimes(0);
});

test('should add/consume multiple Metrics and access the networkLayer when importance is high', () => {
const metric1 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
const metric2 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric1, 'high');
scheduler.add(metric2, 'high');
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(2);
});

test('should consume Metrics when threshold is reached', () => {
scheduler = createScheduler<Metric>(networkLayer.sendMetrics, 3);
const metric1 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
const metric2 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
const metric3 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric1);
scheduler.add(metric2);
scheduler.add(metric3);
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(1);
});

test('should consume Metrics when time threshold is reached', () => {
vi.useFakeTimers();
scheduler = createScheduler<Metric>(networkLayer.sendMetrics, 5, 100);
const metric1 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
const metric2 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
const metric3 = {
metricName: 'testMetric',
metricType: MetricType.count,
value: 1,
};
scheduler.add(metric1);
scheduler.add(metric2);
scheduler.add(metric3);
vi.advanceTimersToNextTimer();
expect(scheduler.length).toBe(0);
expect(scheduler.items).toEqual([]);
expect(callbackSpy).toHaveBeenCalledTimes(1);
});
});
});
6 changes: 5 additions & 1 deletion src/utils/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ export const DEFAULT_SCHEDULER = {
};

export const createScheduler = <T>(
sendData: (items: T[]) => void,
batchThreshold = DEFAULT_BATCH_THRESHOLD,
timeThreshold = DEFAULT_TIME_THRESHOLD
): Scheduler<T> => {
const queue = createQueue<T>();
const add = (item: T, importance = 'low'): void => {
queue.add(item);

// todo: if we already consumed the queue we should reset the timeout
if (queue.length >= batchThreshold || importance === 'high') {
consume();
Expand All @@ -30,9 +30,13 @@ export const createScheduler = <T>(
if (queue.length === 0) {
return;
}
sendData(queue.items);
queue.flush();
};

/**
* Schedule the consume function to run every timeThreshold
*/
const schedule = () =>
setTimeout(() => {
consume();
Expand Down

0 comments on commit 835ecbf

Please sign in to comment.