diff --git a/lib/statsd.js b/lib/statsd.js index f27383b..4bc162f 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -12,9 +12,11 @@ var dgram = require('dgram'), * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. * @option global_tags {Array=} Optional tags that will be added to every metric + * @option groupTime {Number} An optional number for grouping all requests in a timeframe + * @option maxGroupSize{Number} An optional number for max packet sieze if grouping is used * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, groupTime, maxGroupSize) { var options = host || {}, self = this; @@ -27,7 +29,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl globalize : globalize, cacheDns : cacheDns, mock : mock === true, - global_tags : global_tags + global_tags : global_tags, + groupTime : groupTime, + maxGroupSize: maxGroupSize }; } @@ -38,6 +42,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.groupTime = options.groupTime || 0; + this.maxGroupSize= options.maxGroupSize || 1300; + if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -193,7 +200,6 @@ Client.prototype.sendAll = function(stat, value, type, sampleRate, tags, callbac */ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) { var message = this.prefix + stat + this.suffix + ':' + value + '|' + type, - buf, merged_tags = []; if(sampleRate && sampleRate < 1){ @@ -217,8 +223,35 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) // Only send this stat if we're not a mock Client. if(!this.mock) { - buf = new Buffer(message); - this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + if(typeof callback !== 'function' && this.groupTime > 0){ + if(!this.send_message) { + this.send_message = message; + } else { + var newSize = (this.send_message.length + 1 + message.length); + if(newSize > this.maxGroupSize) { + var buf = new Buffer(this.send_message); + this.send_message = message; + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + } else { + this.send_message += "\n" + message; + } + } + + if(!this.timmer_running) { + var self = this; + function send_timer() { + var buf = new Buffer(self.send_message); + self.send_message = undefined; + self.timmer_running = false; + self.socket.send(buf, 0, buf.length, self.port, self.host, callback); + } + setTimeout(send_timer, this.groupTime); + this.timmer_running = true; + } + } else { + var buf = new Buffer(message); + this.socket.send(buf, 0, buf.length, this.port, this.host, callback); + } } else { if(typeof callback === 'function'){ callback(null, 0); diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314..dbc1e31 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -84,12 +84,14 @@ describe('StatsD', function(){ assert.equal(global.statsd, undefined); assert.equal(statsd.mock, undefined); assert.deepEqual(statsd.global_tags, []); + assert.equal(statsd.groupTime, 0); + assert.equal(statsd.maxGroupSize, 1300); assert.ok(!statsd.mock); }); it('should set the proper values when specified', function(){ // cachedDns isn't tested here; see below - var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag']); + var statsd = new StatsD('host', 1234, 'prefix', 'suffix', true, null, true, ['gtag'], 5, 1500); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); assert.equal(statsd.prefix, 'prefix'); @@ -97,6 +99,8 @@ describe('StatsD', function(){ assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); assert.deepEqual(statsd.global_tags, ['gtag']); + assert.equal(statsd.groupTime, 5); + assert.equal(statsd.maxGroupSize, 1500); }); it('should set the proper values with options hash format', function(){ @@ -108,7 +112,9 @@ describe('StatsD', function(){ suffix: 'suffix', globalize: true, mock: true, - global_tags: ['gtag'] + global_tags: ['gtag'], + groupTime: 5, + maxGroupSize: 1500 }); assert.equal(statsd.host, 'host'); assert.equal(statsd.port, 1234); @@ -117,6 +123,8 @@ describe('StatsD', function(){ assert.equal(statsd, global.statsd); assert.equal(statsd.mock, true); assert.deepEqual(statsd.global_tags, ['gtag']); + assert.equal(statsd.groupTime, 5); + assert.equal(statsd.maxGroupSize, 1500); }); it('should attempt to cache a dns record if dnsCache is specified', function(done){ @@ -680,4 +688,44 @@ describe('StatsD', function(){ }); }); + + describe('#groupTime', function(finished){ + it('check if messages are grouped', function(finished){ + udpTest(function(message, server){ + assert.equal(message, 'test:42|g\ntest2:42|g\ntest3:42|g'); + server.close(); + finished(); + }, function(server){ + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + groupTime: 1 + }); + statsd.gauge('test', 42); + statsd.gauge('test2', 42); + statsd.gauge('test3', 42); + }); + }); + + it('check if messages are split at size', function(finished){ + udpTest(function(message, server){ + assert.equal(message, 'test:42|g\ntest2:42|g'); + server.close(); + finished(); + }, function(server){ + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + groupTime: 1, + maxGroupSize: 20, + }); + statsd.gauge('test', 42); + statsd.gauge('test2', 42); + statsd.gauge('this will be in the next message', 42); + }); + }); + }); + });