Skip to content

Commit

Permalink
added test, fixed serializers
Browse files Browse the repository at this point in the history
  • Loading branch information
fed135 committed Mar 18, 2020
1 parent aa5fe42 commit 13011e5
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 27 deletions.
33 changes: 27 additions & 6 deletions packages/kalm/src/components/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ export function Client(params: ClientConfig, emitter: NodeJS.EventEmitter, handl
function _wrap(event: RawFrame): void {
const payload: Buffer = params.framing === 'kalm'
? serializeLegacy(event.frameId, channels[event.channel], event.packets)
: Buffer.from(JSON.stringify({ frameId: event.frameId, channel: event.channel, packets: event.packets }));
: Buffer.from(JSON.stringify({
frameId: event.frameId,
channel: event.channel,
packets: event.packets.map(packet => packet.toString()),
}));

socket.send(handle, payload);
}

Expand All @@ -43,8 +48,16 @@ export function Client(params: ClientConfig, emitter: NodeJS.EventEmitter, handl

function _handlePackets(frame: RawFrame, packet: Buffer, index: number): Promise<void> {
if (packet.length === 0) return;
const decodedPacket = (params.json === true) ? JSON.parse(packet.toString()) : packet;
if (channels[frame.channel]) {

let decodedPacket;

try {
decodedPacket = (params.json === true) ? JSON.parse(packet.toString()) : packet;
} catch (e) {
emitter.emit('error', `Error decoding packet: ${e}`);
}

if (decodedPacket && channels[frame.channel]) {
channels[frame.channel].emitter.emit(
'message',
decodedPacket,
Expand Down Expand Up @@ -76,9 +89,14 @@ export function Client(params: ClientConfig, emitter: NodeJS.EventEmitter, handl
}

function _handleRequest(payload: Buffer): void {
const frame: RawFrame = params.framing === 'kalm' ? deserializeLegacy(payload) : JSON.parse(payload.toString());
emitter.emit('frame', frame);
frame.packets.forEach((packet, i) => _handlePackets(frame, packet, i));
let frame: RawFrame;
try {
frame = params.framing === 'kalm' ? deserializeLegacy(payload) : JSON.parse(payload.toString());
} catch (e) {
emitter.emit(`Error decoding frame: ${e}`);
}
emitter.emit('frame', frame || payload.toString());
if (frame && frame.packets) frame.packets.forEach((packet, i) => _handlePackets(frame, Buffer.from(packet), i));
}

function _handleDisconnect() {
Expand All @@ -87,6 +105,9 @@ export function Client(params: ClientConfig, emitter: NodeJS.EventEmitter, handl
}

function write(channel: string, message: Serializable): void {
if (params.json !== true && !Buffer.isBuffer(message)) {
throw new Error(`Unable to serialize message: ${message}, expected type Buffer`);
}
return _resolveChannel(channel)
.queue.add(params.json === true ? Buffer.from(JSON.stringify(message)) : message as Buffer);
}
Expand Down
34 changes: 28 additions & 6 deletions packages/kalm/src/routines/dynamic.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
/* Methods -------------------------------------------------------------------*/

export function dynamic({ hz, maxPackets = Infinity }: { hz: number, maxPackets?: number }): KalmRoutine {
export function dynamic({
hz,
maxPackets = Infinity,
maxBytes = 60000,
}: { hz: number, maxPackets?: number, maxBytes?: number }): KalmRoutine {
if (hz <= 0 || hz > 1000) {
throw new Error(`Unable to set Hertz value of ${hz}. Must be between 0.1e13 and 1000`);
}

return function queue(channel: string, params: object, channelEmitter: NodeJS.EventEmitter, clientEmitter: NodeJS.EventEmitter): Queue {
let timer: NodeJS.Timer = null;
const packets: Buffer[] = [];
let totalBytes = 0;
let i: number = 0;

function _step(): void {
Expand All @@ -16,21 +21,38 @@ export function dynamic({ hz, maxPackets = Infinity }: { hz: number, maxPackets?
channelEmitter.emit('runQueue', { frameId: i++, channel, packets });
if (i > 255) i = 0;
packets.length = 0;
totalBytes = 0;
clientEmitter.emit(`${channel}.queueRun`, { frameId: i, packets: packets.length });
}

function add(packet: Buffer): void {
function _add(packet: Buffer) {
packets.push(packet);
totalBytes += packet.length;
clientEmitter.emit(`${channel}.queueAdd`, { frameId: i, packet: packets.length });
}

function add(packet: Buffer) {
if (packet.length > maxBytes) {
throw new Error(`Cannot send packet of size ${packet.length} while maximum bytes per frame is ${maxBytes}`);
}

if (packet.length + totalBytes >= maxBytes) {
_step();
_add(packet);
return;
}

if (packets.length >= maxPackets - 1) {
packets.push(packet);
_add(packet);
_step();
clientEmitter.emit(`${channel}.queueAdd`, { frameId: i, packet: packets.length });
return;
}

if (timer === null) {
timer = setTimeout(_step, Math.round(1000 / hz));
}
packets.push(packet);
clientEmitter.emit(`${channel}.queueAdd`, { frameId: i, packet: packets.length });

_add(packet);
}

function size(): number { return packets.length; }
Expand Down
34 changes: 23 additions & 11 deletions packages/kalm/src/utils/parser.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
/* Local variables -----------------------------------------------------------*/

const singleIndiceCache = {};
const doubleIndiceCache = {};

/* Methods -------------------------------------------------------------------*/

export function doubleIndiceBuffer(num) {
const buf = Buffer.allocUnsafe(2);
buf.writeUInt16BE(num, 0);
export function indiceBuffer(num): Buffer {
if (singleIndiceCache[`${num}`]) return singleIndiceCache[`${num}`];
const buf = Buffer.allocUnsafe(1);
buf.writeUInt8(num, 0);
singleIndiceCache[`${num}`] = buf;
return buf;
}

export function indiceBuffer(num) {
const buf = Buffer.allocUnsafe(1);
buf.writeUInt8(num, 0);
export function doubleIndiceBuffer(num): Buffer {
if (doubleIndiceCache[`${num}`]) return doubleIndiceCache[`${num}`];
const buf = Buffer.allocUnsafe(2);
buf.writeUInt16BE(num, 0);
doubleIndiceCache[`${num}`] = buf;
return buf;
}

Expand All @@ -17,14 +26,17 @@ function _numericSize(bytes: Buffer, index: number): number {
}

export function serializeLegacy(frameId: number, channel: Channel, packets: Buffer[]): Buffer {
const serializedPackets = packets.reduce((acc, curr) => {
acc.push(doubleIndiceBuffer(curr.length));
acc.push(curr);
return acc;
}, []);

return Buffer.concat([
indiceBuffer(frameId % 255),
channel.channelBuffer,
doubleIndiceBuffer(packets.length),
...packets.map((packet: Buffer) => {
if (!(packet instanceof Buffer)) throw new Error(`Cannot send packet ${packet}. Must be of type Buffer`);
return Buffer.concat([doubleIndiceBuffer(packet.length), packet]);
}),
...serializedPackets,
]);
}

Expand All @@ -36,7 +48,7 @@ export function deserializeLegacy(payload: Buffer): RawFrame {
function _parseFramePacket(): Buffer[] {
const packets: Buffer[] = [];
for (let p = 0; p < totalPackets; p++) {
if (caret >= payload.length) continue;
if (caret >= payload.length) break;
const packetLength = _numericSize(payload, caret);
packets.push(payload.slice(2 + caret, 2 + packetLength + caret));
caret = 2 + caret + packetLength;
Expand Down
2 changes: 1 addition & 1 deletion packages/kalm/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ declare module 'kalm' {
export const connect: (config: ClientConfig) => Client;
export const routines: {
tick: (config: { hz: number, seed?: number }) => KalmRoutine
dynamic: (config: { hz: number, maxPackets?: number }) => KalmRoutine
dynamic: (config: { hz: number, maxPackets?: number, maxBytes?: number }) => KalmRoutine
realtime: () => KalmRoutine
};
}
2 changes: 2 additions & 0 deletions scripts/benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ adpts.forEach((i) => {

tests.push(_postResults);

console.log(`Launching benchmarks for ${settings.testDuration/1000} second(s) -- MAKE SURE THAT YOU BUILD THE CODE FIRST --`)

tests.reduce(
(c, n) => c.then((resolve) => new Promise(n).then(resolve, _errorHandler), _errorHandler),
Promise.resolve(),
Expand Down
3 changes: 2 additions & 1 deletion scripts/benchmarks/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ module.exports = {
transport: 'tcp',
port: 3001,
routine: ['dynamic', { hz: 200 }],
testDuration: 1000 * 10,
testDuration: 1000 * 1,
testPayload: { foo: 'bar' },
testChannel: 'test',
framing: null,
};
10 changes: 10 additions & 0 deletions scripts/benchmarks/transports/kalm.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function setup(resolve) {
server = Kalm.listen({
port: settings.port,
json: true,
framing: settings.framing,
transport: transports[settings.transport](),
routine: Kalm.routines[settings.routine[0]](settings.routine[1]),
});
Expand All @@ -37,6 +38,10 @@ function setup(resolve) {
c.subscribe(settings.testChannel, (msg) => c.write(settings.testChannel, msg));
});

server.on('error', (e) => {
console.error('Server error:', e);
});

handbreak = false;
setTimeout(resolve, 0);
}
Expand All @@ -59,10 +64,15 @@ function step(resolve) {
client = Kalm.connect({
port: settings.port,
json: true,
framing: settings.framing,
transport: transports[settings.transport](),
routine: Kalm.routines.realtime(),
});
client.subscribe(settings.testChannel, () => count++);

client.on('error', (e) => {
console.error('Client error:', e);
});
}

client.write(settings.testChannel, settings.testPayload);
Expand Down
2 changes: 1 addition & 1 deletion scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
echo "Building" $1
../../scripts/cleanup.sh
cp ../../tsconfig.json ./tsconfig.json
../../node_modules/typescript/bin/tsc --outDir ./bin
../../node_modules/typescript/bin/tsc --outDir ./bin >/dev/null
echo "Build completed, cleaning up"
rm -rf ./tsconfig.json
rm -rf ./dist
37 changes: 37 additions & 0 deletions tests/integration/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,33 @@ describe('Integration tests', () => {
done();
});
});
server.on('error', e => {
throw new Error(e);
});

const client = connect({ transport: soc });
client.on('error', e => {
throw new Error(e);
});
client.write('test', payload);
});

it(`should handle foreign characters with ${transport}`, done => {
const payload = { foo: '한자' };
server.on('connection', c => {
c.subscribe('test', data => {
expect(data).toEqual(payload);
done();
});
});
server.on('error', e => {
throw new Error(e);
});

const client = connect({ transport: soc });
client.on('error', e => {
throw new Error(e);
});
client.write('test', payload);
});

Expand All @@ -58,8 +83,14 @@ describe('Integration tests', () => {
done();
});
});
server.on('error', e => {
throw new Error(e);
});

const client = connect({ transport: soc });
client.on('error', e => {
throw new Error(e);
});
client.write('test.large', largePayload);
});

Expand All @@ -74,8 +105,14 @@ describe('Integration tests', () => {

c.unsubscribe('test');
});
server.on('error', e => {
throw new Error(e);
});

const client = connect({ transport: soc });
client.on('error', e => {
throw new Error(e);
});
setTimeout(() => client.write('test', payload), 100);
setTimeout(() => done(), 200);
});
Expand Down
2 changes: 1 addition & 1 deletion types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ declare module 'kalm' {
export const connect: (config: ClientConfig) => Client;
export const routines: {
tick: (config: { hz: number, seed?: number }) => KalmRoutine
dynamic: (config: { hz: number, maxPackets?: number }) => KalmRoutine
dynamic: (config: { hz: number, maxPackets?: number, maxBytes?: number }) => KalmRoutine
realtime: () => KalmRoutine
};
}

0 comments on commit 13011e5

Please sign in to comment.