Skip to content

Commit

Permalink
bug fixes and string concat optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Shahbaz committed Jun 16, 2010
1 parent 3886a0b commit 6796f94
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions fix.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
//TODO
//Server should keep track of who is logged on
//Allow server to send messages to individual connections
//Check duplicate senderCompIDs


var net = require("net");
var events = require("events");
var sys = require("sys");
var logger = require('../node-logger/logger').createLogger();


//Utility methods

logger.format = function(level, timestamp, message) {
return ["[", timestamp.getUTCFullYear() ,"/", timestamp.getUTCMonth() ,"/", timestamp.getUTCDay() , "-" , timestamp.getUTCHours() , ":" , timestamp.getUTCMinutes() , ":" , timestamp.getUTCSeconds() , "." , timestamp.getUTCMilliseconds() , "] " , message].join("");
};

function checksum(str){
var chksm = 0;
for(var i=0; i<str.length; i++){
Expand Down Expand Up @@ -65,10 +78,10 @@ function Session(stream, isInitiator, opt) {


/*this.addListener("connect", function () {
sys.log("New session started");
logger.info("New session started");
});*/
this.addListener("end", function () {
//sys.log("Session ended");
//logger.info("Session ended");
clearInterval(heartbeatIntervalID);
});

Expand All @@ -91,7 +104,7 @@ function Session(stream, isInitiator, opt) {
}

if (currentTime - timeOfLastIncoming > heartbeatDuration * 3) {
sys.log("[ERROR] No message received from counterparty and no response to test request.");
logger.info("[ERROR] No message received from counterparty and no response to test request.");
stream.end();
return;
}
Expand All @@ -113,7 +126,7 @@ function Session(stream, isInitiator, opt) {
delete msg["56"]; //sendercompid
delete msg["49"]; //targetcompid
delete msg["34"]; //seqnum
var headermsg = "";
var headermsgarr = [];
for (var f in headers) {
if (headers.hasOwnProperty(f)) {
var tag = headers[f];
Expand All @@ -123,24 +136,30 @@ function Session(stream, isInitiator, opt) {
}

if (tag.charAt(tag.length - 1) != "?" && msg[tag] === undefined) { //If tag is required, but missing
sys.log("[ERROR] tag " + tag + " is required but missing in outgoing message: " + msg);
logger.info("[ERROR] tag " + tag + " is required but missing in outgoing message: " + msg);
return;
}

if (msg[tag] !== undefined) {
headermsg += tag + "=" + msg[tag] + SOHCHAR;
headermsgarr.push(tag, "=", msg[tag], SOHCHAR);
delete msg[tag];
}
}
}
//var headermsg = headermsgarr.join("");
//headermsgarr = [];

var timestamp = new Date();
headermsg += "52=" + timestamp.getUTCFullYear() + timestamp.getUTCMonth() + timestamp.getUTCDay() + "-" + timestamp.getUTCHours() + ":" + timestamp.getUTCMinutes() + ":" + timestamp.getUTCSeconds() + "." + timestamp.getUTCMilliseconds() + SOHCHAR;
headermsg += "56=" + (senderCompIDExtracted || senderCompID) + SOHCHAR;
headermsg += "49=" + (targetCompIDExtracted || targetCompID) + SOHCHAR;
headermsg += "34=" + (outgoingSeqNum++) + SOHCHAR;

var trailermsg = "";
headermsgarr.push("52=" , timestamp.getUTCFullYear() , timestamp.getUTCMonth() , timestamp.getUTCDay() , "-" , timestamp.getUTCHours() , ":" , timestamp.getUTCMinutes() , ":" , timestamp.getUTCSeconds() , "." , timestamp.getUTCMilliseconds() , SOHCHAR);
//headermsg += "52=" + timestamp.getUTCFullYear() + timestamp.getUTCMonth() + timestamp.getUTCDay() + "-" + timestamp.getUTCHours() + ":" + timestamp.getUTCMinutes() + ":" + timestamp.getUTCSeconds() + "." + timestamp.getUTCMilliseconds() + SOHCHAR;
headermsgarr.push("56=" , (senderCompIDExtracted || senderCompID) , SOHCHAR);
//headermsg += "56=" + (senderCompIDExtracted || senderCompID) + SOHCHAR;
headermsgarr.push("49=" , (targetCompIDExtracted || targetCompID) , SOHCHAR);
//headermsg += "49=" + (targetCompIDExtracted || targetCompID) + SOHCHAR;
headermsgarr.push("34=" , (outgoingSeqNum++) , SOHCHAR);
//headermsg += "34=" + (outgoingSeqNum++) + SOHCHAR;

var trailermsgarr = [];
for (var f in trailers) {
if (trailers.hasOwnProperty(f)) {
var tag = trailers[f];
Expand All @@ -150,31 +169,37 @@ function Session(stream, isInitiator, opt) {
}

if (tag.charAt(tag.length - 1) != "?" && msg[tag] === undefined) { //If tag is required, but missing
sys.log("[ERROR] tag " + tag + " is required but missing in outgoing message: " + msg);
logger.info("[ERROR] tag " + tag + " is required but missing in outgoing message: " + msg);
return;
}

if (msg[tag] !== undefined) {
trailermsg += tag + "=" + msg[tag] + SOHCHAR;
trailermsgarr.push(tag , "=" , msg[tag] , SOHCHAR);
delete msg[tag];
}

}
}

var bodymsg = "";
var bodymsgarr = [];
for (var tag in msg) {
if (msg.hasOwnProperty(tag)) {
bodymsg += tag + "=" + msg[tag] + SOHCHAR;
bodymsgarr.push( tag , "=" , msg[tag] , SOHCHAR);
}
}

var outmsg = "";
outmsg += "8=" + fixVersion + SOHCHAR;
outmsg += "9=" + (headermsg.length + bodymsg.length + trailermsg.length) + SOHCHAR;
outmsg += headermsg;
outmsg += bodymsg;
outmsg += trailermsg;
var headermsg = headermsgarr.join("");
var trailermsg = trailermsgarr.join("");
var bodymsg = bodymsgarr.join("");

var outmsgarr = [];
outmsgarr.push( "8=" , fixVersion , SOHCHAR);
outmsgarr.push( "9=" , (headermsg.length + bodymsg.length + trailermsg.length) , SOHCHAR);
outmsgarr.push( headermsg);
outmsgarr.push( bodymsg);
outmsgarr.push( trailermsg);

var outmsg = outmsgarr.join("");

/*var checksum = 0;
for (var x in outmsg) {
Expand All @@ -197,7 +222,7 @@ function Session(stream, isInitiator, opt) {

outmsg += "10=" + checksum(outmsg) + SOHCHAR;

sys.log("FIX out:" + outmsg);
logger.info("FIX out:" + outmsg);
timeOfLastOutgoing = new Date().getTime();
//this.stream.write(outmsg);
stream.write(outmsg);
Expand All @@ -212,15 +237,15 @@ function Session(stream, isInitiator, opt) {
//var handle = function (data) {
function handlefix(data){

//sys.log("++++++++data received: " + data);
//logger.info("++++++++data received: " + data);

//Add data to the buffer (to avoid processing fragmented TCP packets)
//var databuffer = this.databufferx + data;
databuffer = databuffer + data;
timeOfLastIncoming = new Date().getTime();

while (databuffer.length > 0) {
//sys.log("-------NEW LOOP:" + databuffer.length + ":" + databuffer);
//logger.info("-------NEW LOOP:" + databuffer.length + ":" + databuffer);

//====Step 1: Extract complete FIX message====
//If we don't have enough data to start extracting body length, wait for more data
Expand All @@ -232,7 +257,7 @@ function Session(stream, isInitiator, opt) {
var idxOfEndOfTag9 = parseInt(_idxOfEndOfTag9Str, 10) + ENDOFTAG8;

if (isNaN(idxOfEndOfTag9)) {
sys.log("[ERROR] Unable to find the location of the end of tag 9. Message probably misformed: " + databuffer.toString());
logger.info("[ERROR] Unable to find the location of the end of tag 9. Message probably misformed: " + databuffer.toString());
stream.end();
return;
}
Expand All @@ -241,7 +266,7 @@ function Session(stream, isInitiator, opt) {
//If we don't have enough data to stop extracting body length AND we have received a lot of data
//then perhaps there is a problem with how the message is formatted and the session should be killed
if (idxOfEndOfTag9 < 0 && databuffer.length > 100) {
sys.log("[ERROR] Over 100 character received but body length still not extractable. Message probably misformed: " + databuffer.toString());
logger.info("[ERROR] Over 100 character received but body length still not extractable. Message probably misformed: " + databuffer.toString());
stream.end();
return;
}
Expand All @@ -255,7 +280,7 @@ function Session(stream, isInitiator, opt) {
var _bodyLengthStr = databuffer.substring(STARTOFTAG9VAL, idxOfEndOfTag9);
var bodyLength = parseInt(_bodyLengthStr, 10);
if (isNaN(bodyLength)) {
sys.log("[ERROR] Unable to parse bodyLength field. Message probably misformed: " + databuffer.toString());
logger.info("[ERROR] Unable to parse bodyLength field. Message probably misformed: " + databuffer.toString());
stream.end();
return;
}
Expand All @@ -273,21 +298,18 @@ function Session(stream, isInitiator, opt) {
}
else {
var debugstr = databuffer.substring(msgLength);
//sys.log("[DEBUG] debugstr:" + debugstr);
//logger.info("[DEBUG] debugstr:" + debugstr);
databuffer = debugstr;
}

sys.log("FIX in: " + msg);
logger.info("FIX in: " + msg);

//====Step 2: Validate message====
var calculatedChecksum = "10=" + checksum(msg.substr(0,msg.length - 7)) + "|";
var extractedChecksum = msg.substr((msg.length - 7));

//sys.log("calculatedChecksum:" + calculatedChecksum + ":");
//sys.log("extractedChecksum :" + extractedChecksum + ":");
var calculatedChecksum = checksum(msg.substr(0,msg.length - 7));
var extractedChecksum = msg.substr(msg.length - 4, 3);

if (calculatedChecksum !== extractedChecksum) {
sys.log("[WARNING] Discarding message because body length or checksum are wrong (expected checksum: "+calculatedChecksum+"): " + msg);
logger.info("[WARNING] Discarding message because body length or checksum are wrong (expected checksum: "+calculatedChecksum+"): " + msg);
continue;
}

Expand All @@ -311,7 +333,7 @@ function Session(stream, isInitiator, opt) {
if (headers.hasOwnProperty(f)) {
var tag = headers[f];
if (tag.charAt(tag.length - 1) != "?" && fix[tag] === undefined) { //If tag is required, but missing
sys.log("[ERROR] tag " + tag + " is required but missing in incoming message: " + msg);
logger.info("[ERROR] tag " + tag + " is required but missing in incoming message: " + msg);
if (loggedIn) {
writefix({
"35": "3",
Expand All @@ -331,7 +353,7 @@ function Session(stream, isInitiator, opt) {
if (trailers.hasOwnProperty(f)) {
var tag = trailers[f];
if (tag.charAt(tag.length - 1) != "?" && fix[tag] === undefined) { //If tag is required, but missing
sys.log("[ERROR] tag " + tag + " is required but missing in incoming message: " + msg);
logger.info("[ERROR] tag " + tag + " is required but missing in incoming message: " + msg);
if (loggedIn) {
writefix({
"35": "3",
Expand All @@ -350,13 +372,13 @@ function Session(stream, isInitiator, opt) {
//====Step 5: Confirm first message is a logon message and it has a heartbeat
var msgType = fix["35"];
if (!loggedIn && msgType != "A") {
sys.log("[ERROR] Logon message expected, received message of type " + msgType);
logger.info("[ERROR] Logon message expected, received message of type " + msgType);
stream.end();
return;
}

if (msgType == "A" && fix["108"] === undefined) {
sys.log("[ERROR] Logon does not have tag 108 (heartbeat) ");
logger.info("[ERROR] Logon does not have tag 108 (heartbeat) ");
stream.end();
return;
}
Expand All @@ -368,7 +390,7 @@ function Session(stream, isInitiator, opt) {
incomingSeqNum++;
}
else if (loggedIn && _seqNum < incomingSeqNum) {
sys.log("[ERROR] Incoming sequence number lower than expected. No way to recover.");
logger.info("[ERROR] Incoming sequence number lower than expected. No way to recover.");
stream.end();
return;
}
Expand All @@ -384,7 +406,7 @@ function Session(stream, isInitiator, opt) {
var incomingTargetCompID = fix["49"];

if (loggedIn && (fixVersion != incomingFixVersion || senderCompID != incomingsenderCompID || targetCompID != incomingTargetCompID)) {
sys.log("[WARNING] Incoming fix version (" + incomingFixVersion + "), sender compid (" + incomingsenderCompID + ") or target compid (" + incomingTargetCompID + ") did not match expected values (" + fixVersion + "," + senderCompID + "," + targetCompID + ")"); /*write session reject*/
logger.info("[WARNING] Incoming fix version (" + incomingFixVersion + "), sender compid (" + incomingsenderCompID + ") or target compid (" + incomingTargetCompID + ") did not match expected values (" + fixVersion + "," + senderCompID + "," + targetCompID + ")"); /*write session reject*/
}


Expand Down Expand Up @@ -425,7 +447,7 @@ function Session(stream, isInitiator, opt) {
heartbeatIntervalID = setInterval(heartbeatCallback, heartbeatDuration);
//heartbeatIntervalIDs.push(intervalID);
this.emit("logon", fix);
sys.log(fix["49"] + " logged on from " + stream.remoteAddress);
logger.info(fix["49"] + " logged on from " + stream.remoteAddress);

if(isInitiator === true){
writefix({
Expand All @@ -436,7 +458,7 @@ function Session(stream, isInitiator, opt) {
break;
default:
}
//sys.log("[DEBUG] databuffer.length: " + databuffer.length + "; databuffer: " + databuffer);
//logger.info("[DEBUG] databuffer.length: " + databuffer.length + "; databuffer: " + databuffer);
this.emit("data", fix);
}

Expand Down

0 comments on commit 6796f94

Please sign in to comment.