Skip to content

Commit

Permalink
added key/val converters for easier debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
Shahbaz committed Jun 17, 2010
1 parent 6796f94 commit 4f8bb7d
Show file tree
Hide file tree
Showing 14 changed files with 18,679 additions and 47 deletions.
130 changes: 83 additions & 47 deletions fix.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,65 @@ var events = require("events");
var sys = require("sys");
var logger = require('../node-logger/logger').createLogger();

//Message container
//{targetCompiD:{incoming:[], outgoing:[]}}
var msgContainer = {};

function addInMsg(target, msg){
var tgt = msgContainer[target];
if(tgt === undefined){
msgContainer[target] = {incoming:[msg], outgoing:[]};
}
else{
msgContainer[target].incoming.push(msg);
}
}

function addOutMsg(target, msg){
var tgt = msgContainer[target];
if(tgt === undefined){
msgContainer[target] = {incoming:[], outgoing:[msg]};
}
else{
msgContainer[target].outgoing.push(msg);
}
}

function getInMessages(target, beginSeqNo, endSeqNo){
var tgt = msgContainer[target];
if(tgt === undefined){
return [];
}
else{
var msgs = tgt.incoming;
var msgsarr = [];
for(var k in msgs){
var msg = msgs[k];
var seqNo = parseInt(msg["34"],10);
if(seqNo >= beginSeqNo && seqNo <= endSeqNo){
msgarr.push(msg);
}
}
}
}

function getOutMessages(target, beginSeqNo, endSeqNo){
var tgt = msgContainer[target];
if(tgt === undefined){
return [];
}
else{
var msgs = tgt.outgoing;
var msgsarr = [];
for(var k in msgs){
var msg = msgs[k];
var seqNo = parseInt(msg["34"],10);
if(seqNo >= beginSeqNo && seqNo <= endSeqNo){
msgarr.push(msg);
}
}
}
}

//Utility methods

Expand Down Expand Up @@ -52,6 +111,7 @@ function Session(stream, isInitiator, opt) {

stream.setEncoding("ascii");
stream.setTimeout(1000);


this.stream = stream;
var fixVersion = opt.version;
Expand All @@ -63,7 +123,6 @@ function Session(stream, isInitiator, opt) {
var targetCompID = ""; //targetCompID || "";
var heartbeatDuration = 0;

//this.databufferx = "";
var databuffer = "";
var charlen = 0;

Expand All @@ -72,16 +131,12 @@ function Session(stream, isInitiator, opt) {
var outgoingSeqNum = 1;
var timeOfLastIncoming = 0;
var timeOfLastOutgoing = 0;
var resendRequested = false;

//var heartbeatIntervalIDs = [];
var heartbeatIntervalID ;


/*this.addListener("connect", function () {
logger.info("New session started");
});*/
this.addListener("end", function () {
//logger.info("Session ended");
clearInterval(heartbeatIntervalID);
});

Expand Down Expand Up @@ -146,18 +201,12 @@ function Session(stream, isInitiator, opt) {
}
}
}
//var headermsg = headermsgarr.join("");
//headermsgarr = [];

var timestamp = new Date();
headermsgarr.push("52=" , timestamp.getUTCFullYear() , timestamp.getUTCMonth() , timestamp.getUTCDay() , "-" , timestamp.getUTCHours() , ":" , timestamp.getUTCMinutes() , ":" , timestamp.getUTCSeconds() , "." , timestamp.getUTCMilliseconds() , SOHCHAR);
//headermsg += "52=" + timestamp.getUTCFullYear() + timestamp.getUTCMonth() + timestamp.getUTCDay() + "-" + timestamp.getUTCHours() + ":" + timestamp.getUTCMinutes() + ":" + timestamp.getUTCSeconds() + "." + timestamp.getUTCMilliseconds() + SOHCHAR;
headermsgarr.push("56=" , (senderCompIDExtracted || senderCompID) , SOHCHAR);
//headermsg += "56=" + (senderCompIDExtracted || senderCompID) + SOHCHAR;
headermsgarr.push("49=" , (targetCompIDExtracted || targetCompID) , SOHCHAR);
//headermsg += "49=" + (targetCompIDExtracted || targetCompID) + SOHCHAR;
headermsgarr.push("34=" , (outgoingSeqNum++) , SOHCHAR);
//headermsg += "34=" + (outgoingSeqNum++) + SOHCHAR;

var trailermsgarr = [];
for (var f in trailers) {
Expand Down Expand Up @@ -201,46 +250,25 @@ function Session(stream, isInitiator, opt) {

var outmsg = outmsgarr.join("");

/*var checksum = 0;
for (var x in outmsg) {
if (outmsg.hasOwnProperty(x)) {
checksum += outmsg.charCodeAt(x);
}
}
checksum = checksum % 256;
var checksumstr = "";
if (checksum < 10) {
checksumstr = "00" + checksum;
}
else if (checksum >= 10 && checksum < 100) {
checksumstr = "0" + checksum;
}
else {
checksumstr = "" + checksum;
}*/

outmsg += "10=" + checksum(outmsg) + SOHCHAR;

logger.info("FIX out:" + outmsg);
timeOfLastOutgoing = new Date().getTime();
//this.stream.write(outmsg);

addOutMsg(targetCompID, outmsg);

stream.write(outmsg);
}

this.write = writefix;
//this.writeTest = function(){ return writefix;};


//Used for parsing incoming data -------------------------------
//this.handle = function() { return function (data) {
//var handle = function (data) {
function handlefix(data){

//logger.info("++++++++data received: " + data);

//Add data to the buffer (to avoid processing fragmented TCP packets)
//var databuffer = this.databufferx + data;
databuffer = databuffer + data;
timeOfLastIncoming = new Date().getTime();

Expand Down Expand Up @@ -325,9 +353,6 @@ function Session(stream, isInitiator, opt) {
}
}

//var dbg = "{";
//for( var x in fix){ dbg += ","+x+":"+fix[x]+"";}
//sys.debug(dbg+"}");
//====Step 4: Confirm all required fields are available====
for (var f in headers) {
if (headers.hasOwnProperty(f)) {
Expand Down Expand Up @@ -388,16 +413,21 @@ function Session(stream, isInitiator, opt) {
var _seqNum = parseInt(fix["34"], 10);
if (loggedIn && _seqNum == incomingSeqNum) {
incomingSeqNum++;
resendRequested = false;
}
else if (loggedIn && _seqNum < incomingSeqNum) {
logger.info("[ERROR] Incoming sequence number lower than expected. No way to recover.");
stream.end();
return;
}
else if (loggedIn && _seqNum > incomingSeqNum) {
//Missing messages, write rewrite request and don't process any more messages
//Missing messages, write resend request and don't process any more messages
//until the rewrite request is processed
//set flag saying "waiting for rewrite"
if(resendRequested !== true){
resendRequested = true;
writefix({"35":2, "7":incomingSeqNum, "8":0});
}
}

//====Step 7: Confirm compids and fix version match what was in the logon msg
Expand All @@ -408,9 +438,13 @@ function Session(stream, isInitiator, opt) {
if (loggedIn && (fixVersion != incomingFixVersion || senderCompID != incomingsenderCompID || targetCompID != incomingTargetCompID)) {
logger.info("[WARNING] Incoming fix version (" + incomingFixVersion + "), sender compid (" + incomingsenderCompID + ") or target compid (" + incomingTargetCompID + ") did not match expected values (" + fixVersion + "," + senderCompID + "," + targetCompID + ")"); /*write session reject*/
}


//===Step 8: Record incoming message -- might be needed during resync
addInMsg(targetCompID, fix);


//====Step 8: Messages
//====Step 9: Messages
switch (msgType) {
case "0":
//handle heartbeat; break;
Expand All @@ -424,7 +458,14 @@ function Session(stream, isInitiator, opt) {
}); /*write heartbeat*/
break;
case "2":
//handle rewriterequest; break;
var beginSeqNo = parseInt(fix["7"],10);
var endSeqNo = parseInt(fix["16"],10);
outgoingSeqNum = beginSeqNo;
var outmsgs = getOutMessages(targetCompID, beginSeqNo, endSeqNo);
for(var k in outmsgs){
writefix(msgs[k]);
}
//handle resendrequest; break;
break;
case "3":
//handle sessionreject; break;
Expand Down Expand Up @@ -458,7 +499,6 @@ function Session(stream, isInitiator, opt) {
break;
default:
}
//logger.info("[DEBUG] databuffer.length: " + databuffer.length + "; databuffer: " + databuffer);
this.emit("data", fix);
}

Expand Down Expand Up @@ -514,8 +554,6 @@ exports.createServer = function (opt, func) {
});

stream.addListener("data", function(data){session.handle(data);});
//stream.addListener("data",session.handle());
//stream.addListener("data", function (data) { session.emit("data"); });

});

Expand Down Expand Up @@ -546,8 +584,6 @@ exports.createConnection = function (senderCompID, targetCompID, heartbeatsecond
});

stream.addListener("data", function(data){session.handle(data);});
//stream.addListener("data",session.handle());
//stream.addListener("data", function (data) { session.emit("data"); });

return session;
};
Expand Down
Loading

0 comments on commit 4f8bb7d

Please sign in to comment.