-
Notifications
You must be signed in to change notification settings - Fork 58
/
client.js
112 lines (98 loc) · 2.94 KB
/
client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
var net = require('net');
var fs = require('fs');
var sys = require('sys');
var events = require('events');
var crypto = require('crypto');
var StompFrame = require('./frame').StompFrame;
var StompFrameEmitter = require('./parser').StompFrameEmitter;
// Frame commands accepted from the server side of the connection; this list
// is what the client hands to the frame parser when it is constructed.
var StompServerCommands = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR'];
/**
 * STOMP client over a plain TCP connection.
 *
 * Opens the socket immediately and calls `onConnect()` once the socket
 * reports that it is connected.
 *
 * @param {string} address - Host passed to `net.createConnection`.
 * @param {number} port - Port passed to `net.createConnection`.
 * @param {string} user - Stored as `this.user` for the login step.
 * @param {string} pass - Stored as `this.pass` for the login step.
 */
function StompClient(address, port, user, pass) {
  events.EventEmitter.call(this);

  var client = this;

  client.user = user;
  client.pass = pass;
  // Destination name -> list of callbacks registered for it.
  client.subscriptions = {};

  var socket = net.createConnection(port, address);
  client.stream = socket;
  socket.on('connect', function handleSocketConnect() {
    client.onConnect();
  });
}

sys.inherits(StompClient, events.EventEmitter);
/**
 * Wires the connected stream to a frame parser and logs in.
 *
 * - Incoming stream data is fed to a new StompFrameEmitter.
 * - When the stream ends, the local side is ended too and 'disconnect' is
 *   emitted on the client.
 * - A CONNECTED frame emits 'connect' with the frame's `session` header.
 * - A MESSAGE frame invokes every callback registered for its destination
 *   as `callback(body, headers)`.
 * - Finally a CONNECT frame carrying `this.user` / `this.pass` is sent.
 */
StompClient.prototype.onConnect = function() {
  var self = this;

  // First set up the frame parser
  var frameEmitter = new StompFrameEmitter(StompServerCommands);

  self.stream.on('data', function (data) {
    frameEmitter.handleData(data);
  });

  self.stream.on('end', function () {
    self.stream.end();
    self.emit('disconnect');
  });

  // Listen for events on it
  frameEmitter.on('frame', function(frame) {
    console.log('Received Frame: ' + frame);

    if (frame.command === 'MESSAGE') {
      var callbacks = self.subscriptions[frame.headers.destination];
      // A MESSAGE can arrive for a destination nobody subscribed to through
      // this client; without this guard the lookup is undefined and the
      // handler would throw.
      if (Array.isArray(callbacks)) {
        callbacks.forEach(function(callback) {
          callback(frame.body, frame.headers);
        });
      }
    }

    if (frame.command === 'CONNECTED') {
      self.emit('connect', frame.headers.session);
    }
  });

  frameEmitter.on('error', function(err) {
    console.log('Error Parsing Message: ' + err['message']);
  });

  // Send the CONNECT frame
  new StompFrame({
    command: 'CONNECT',
    headers: {
      'login': self.user,
      'passcode': self.pass,
    },
  }).send(self.stream);
};
/**
 * Registers `callback` for messages delivered to `queue`.
 *
 * A SUBSCRIBE frame is sent only the first time a destination is seen;
 * further calls for the same destination just add another callback.
 *
 * @param {string} queue - Destination name, sent as the `destination` header.
 * @param {Function} callback - Appended to the destination's callback list.
 */
StompClient.prototype.subscribe = function(queue, callback) {
  // Own-property check: the `in` operator also matches keys inherited from
  // Object.prototype (e.g. 'constructor'), which would skip the SUBSCRIBE
  // frame and then fail when pushing onto a non-array.
  if (!Object.prototype.hasOwnProperty.call(this.subscriptions, queue)) {
    this.subscriptions[queue] = [];
    new StompFrame({
      command: 'SUBSCRIBE',
      headers: {
        destination: queue,
      },
    }).send(this.stream);
  }
  this.subscriptions[queue].push(callback);
};
/**
 * Sends `message` to `queue` as a SEND frame over the client's stream.
 *
 * @param {string} queue - Destination name, sent as the `destination` header.
 * @param {*} message - Used as the frame body as-is.
 */
StompClient.prototype.publish = function(queue, message) {
  var outgoing = new StompFrame({
    command: 'SEND',
    headers: { destination: queue },
    body: message
  });

  outgoing.send(this.stream);
};
/**
 * STOMP client that upgrades its connection before logging in.
 *
 * Same fields as StompClient, but once the socket connects it calls
 * `stream.setSecure(credentials)` and only runs `onConnect()` after the
 * stream emits 'secure'.
 *
 * NOTE(review): `setSecure()` and the 'secure' event do not appear in the
 * current Node.js `net.Socket` API; confirm which runtime this targets.
 *
 * @param {string} address - Host passed to `net.createConnection`.
 * @param {number} port - Port passed to `net.createConnection`.
 * @param {string} user - Stored as `this.user` for the login step.
 * @param {string} pass - Stored as `this.pass` for the login step.
 * @param {*} credentials - Passed unchanged to `stream.setSecure`.
 */
function SecureStompClient(address, port, user, pass, credentials) {
  events.EventEmitter.call(this);

  var client = this;

  client.user = user;
  client.pass = pass;
  // Destination name -> list of callbacks registered for it.
  client.subscriptions = {};

  var socket = net.createConnection(port, address);
  client.stream = socket;

  // Step 1: the raw connection is up, request the secure upgrade.
  socket.on('connect', function handleSocketConnect() {
    client.stream.setSecure(credentials);
  });

  // Step 2: the upgrade finished, start the STOMP session.
  socket.on('secure', function handleSocketSecure() {
    client.onConnect();
  });
}

sys.inherits(SecureStompClient, StompClient);
// Public API of this module: the plain and the secure client constructors.
exports.StompClient = StompClient;
exports.SecureStompClient = SecureStompClient;