Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

add support for grouping messages in on UDP requests #75

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions lib/statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ var dgram = require('dgram'),
* @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
* @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
* @option global_tags {Array=} Optional tags that will be added to every metric
* @option groupTime {Number} An optional number for grouping all requests in a timeframe
* @option maxGroupSize{Number} An optional number for max packet sieze if grouping is used
* @constructor
*/
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) {
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, groupTime, maxGroupSize) {
var options = host || {},
self = this;

Expand All @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
globalize : globalize,
cacheDns : cacheDns,
mock : mock === true,
global_tags : global_tags
global_tags : global_tags,
groupTime : groupTime,
maxGroupSize: maxGroupSize
};
}

Expand All @@ -38,6 +42,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
this.socket = dgram.createSocket('udp4');
this.mock = options.mock;
this.global_tags = options.global_tags || [];
this.groupTime = options.groupTime || 0;
this.maxGroupSize= options.maxGroupSize || 1300;


if(options.cacheDns === true){
dns.lookup(options.host, function(err, address, family){
Expand Down Expand Up @@ -193,7 +200,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac
*/
Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) {
var message = this.prefix + stat + this.suffix + ':' + value + '|' + type,
buf,
merged_tags = [];

if(sampleRate && sampleRate < 1){
Expand All @@ -217,8 +223,35 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback)

// Only send this stat if we're not a mock Client.
if(!this.mock) {
buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
if(typeof callback !== 'function' && this.groupTime > 0){
if(!this.send_message) {
this.send_message = message;
} else {
var newSize = (this.send_message.length + 1 + message.length);
if(newSize > this.maxGroupSize) {
var buf = new Buffer(this.send_message);
this.send_message = message;
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
} else {
this.send_message += "\n" + message;
}
}

if(!this.timmer_running) {
var self = this;
function send_timer() {
var buf = new Buffer(self.send_message);
self.send_message = undefined;
self.timmer_running = false;
self.socket.send(buf, 0, buf.length, self.port, self.host, callback);
}
setTimeout(send_timer, this.groupTime);
this.timmer_running = true;
}
} else {
var buf = new Buffer(message);
this.socket.send(buf, 0, buf.length, this.port, this.host, callback);
}
} else {
if(typeof callback === 'function'){
callback(null, 0);
Expand Down
52 changes: 50 additions & 2 deletions test/test_statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,23 @@ describe('StatsD', function(){
assert.equal(global.statsd, undefined);
assert.equal(statsd.mock, undefined);
assert.deepEqual(statsd.global_tags, []);
assert.equal(statsd.groupTime, 0);
assert.equal(statsd.maxGroupSize, 1300);
assert.ok(!statsd.mock);
});

it('should set the proper values when specified', function(){
// cachedDns isn't tested here; see below
var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag']);
var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag'], 5, 1500);
assert.equal(statsd.host, 'host');
assert.equal(statsd.port, 1234);
assert.equal(statsd.prefix, 'prefix');
assert.equal(statsd.suffix, 'suffix');
assert.equal(statsd, global.statsd);
assert.equal(statsd.mock, true);
assert.deepEqual(statsd.global_tags, ['gtag']);
assert.equal(statsd.groupTime, 5);
assert.equal(statsd.maxGroupSize, 1500);
});

it('should set the proper values with options hash format', function(){
Expand All @@ -108,7 +112,9 @@ describe('StatsD', function(){
suffix: 'suffix',
globalize: true,
mock: true,
global_tags: ['gtag']
global_tags: ['gtag'],
groupTime: 5,
maxGroupSize: 1500
});
assert.equal(statsd.host, 'host');
assert.equal(statsd.port, 1234);
Expand All @@ -117,6 +123,8 @@ describe('StatsD', function(){
assert.equal(statsd, global.statsd);
assert.equal(statsd.mock, true);
assert.deepEqual(statsd.global_tags, ['gtag']);
assert.equal(statsd.groupTime, 5);
assert.equal(statsd.maxGroupSize, 1500);
});

it('should attempt to cache a dns record if dnsCache is specified', function(done){
Expand Down Expand Up @@ -680,4 +688,44 @@ describe('StatsD', function(){
});
});


describe('#groupTime', function(finished){
it('check if messages are grouped', function(finished){
udpTest(function(message, server){
assert.equal(message, 'test:42|g\ntest2:42|g\ntest3:42|g');
server.close();
finished();
}, function(server){
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
groupTime: 1
});
statsd.gauge('test', 42);
statsd.gauge('test2', 42);
statsd.gauge('test3', 42);
});
});

it('check if messages are split at size', function(finished){
udpTest(function(message, server){
assert.equal(message, 'test:42|g\ntest2:42|g');
server.close();
finished();
}, function(server){
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
groupTime: 1,
maxGroupSize: 20,
});
statsd.gauge('test', 42);
statsd.gauge('test2', 42);
statsd.gauge('this will be in the next message', 42);
});
});
});

});