Skip to content

Commit

Permalink
unsure of the change
Browse files Browse the repository at this point in the history
  • Loading branch information
Shahbaz committed Aug 16, 2010
1 parent ca5de32 commit 9f99ba2
Showing 1 changed file with 82 additions and 68 deletions.
150 changes: 82 additions & 68 deletions fix.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
//expire messages (for resend, send gap fill instead)

//DOING:
//Web control
//Persistence with node-dirty--how to recover from crash
//while recovering, read in stored messages as 'pos-dup"
//update sequence numbers

var net = require("net");
var events = require("events");
var sys = require("sys");
var logger = require('../node-logger/logger').createLogger();
var Dirty = require('../node-dirty/lib/dirty').Dirty;
var fixtagnums = require('./resources/fixtagnums').keyvals;


Expand Down Expand Up @@ -81,7 +84,7 @@ function getOutMessages(target, beginSeqNo, endSeqNo){

//Utility methods

function tag2txt = function(msg){ return Object.keys(msg).map(function(key){return fixtagnums[key]+"="+msg[key];}).join("|");}
var tag2txt = function(msg){ return Object.keys(msg).map(function(key){return fixtagnums[key]+"="+msg[key];}).join("|");}

logger.format = function(level, timestamp, message) {
return [timestamp.getUTCFullYear() ,"/", timestamp.getUTCMonth() ,"/", timestamp.getUTCDay() , "-" , timestamp.getUTCHours() , ":" , timestamp.getUTCMinutes() , ":" , timestamp.getUTCSeconds() , "." , timestamp.getUTCMilliseconds() , " [" , level, "] ", message].join("");
Expand Down Expand Up @@ -148,6 +151,8 @@ function Session(stream, isInitiator, opt) {

var heartbeatIntervalID ;

var datastore ;


this.addListener("end", function () {
clearInterval(heartbeatIntervalID);
Expand Down Expand Up @@ -468,81 +473,90 @@ function Session(stream, isInitiator, opt) {


//===Step 8: Record incoming message -- might be needed during resync
addInMsg(targetCompID, fix);
if(loggedIn){
datastore.add(fix);
//addInMsg(targetCompID, fix);
}
logger.debug("FIXMAP in: " + tag2txt(fix));



//====Step 9: Messages
switch (msgType) {
case "0":
//handle heartbeat; break;
break;
case "1":
//handle testrequest; break;
var testReqID = fix["112"];
writefix({
"35": "0",
"112": testReqID
}); /*write heartbeat*/
break;
case "2":
var beginSeqNo = parseInt(fix["7"],10);
var endSeqNo = parseInt(fix["16"],10);
outgoingSeqNum = beginSeqNo;
var outmsgs = getOutMessages(targetCompID, beginSeqNo, endSeqNo);
for(var k in outmsgs){
var resendmsg = msgs[k];
resendmsg["43"] = "Y";
resendmsg["122"] = resendmsg["52"];
writefix(resendmsg);
}
//handle resendrequest; break;
break;
case "3":
//handle sessionreject; break;
break;
case "4":
//Gap fill mode
if(fix["123"] === "Y"){
var newSeqNo = parseInt(fix["36"],10);

if(newSeqNo <= incomingSeqNo){
//TODO: Reject, sequence number may only be incremented
case "0":
//handle heartbeat; break;
break;
case "1":
//handle testrequest; break;
var testReqID = fix["112"];
writefix({
"35": "0",
"112": testReqID
}); /*write heartbeat*/
break;
case "2":
var beginSeqNo = parseInt(fix["7"],10);
var endSeqNo = parseInt(fix["16"],10);
outgoingSeqNum = beginSeqNo;
var outmsgs = getOutMessages(targetCompID, beginSeqNo, endSeqNo);
for(var k in outmsgs){
var resendmsg = msgs[k];
resendmsg["43"] = "Y";
resendmsg["122"] = resendmsg["52"];
writefix(resendmsg);
}
else{
incomingSeqNo = newSeqNo;
//handle resendrequest; break;
break;
case "3":
//handle sessionreject; break;
break;
case "4":
//Gap fill mode
if(fix["123"] === "Y"){
var newSeqNo = parseInt(fix["36"],10);

if(newSeqNo <= incomingSeqNo){
//TODO: Reject, sequence number may only be incremented
}
else{
incomingSeqNo = newSeqNo;
}
}
}
//Reset mode
//Reset mode is handled in step 6, when confirming incoming seqnums
//handle seqreset; break;
case "5":
//handle logout; break;
writefix({
"35": "5"
}); /*write a logout ack right back*/
break;
case "A":
//handle logon; break;
fixVersion = fix["8"];
senderCompID = fix["56"];
targetCompID = fix["49"];
heartbeatDuration = parseInt(fix["108"], 10) * 1000;
loggedIn = true;
heartbeatIntervalID = setInterval(heartbeatCallback, heartbeatDuration);
//heartbeatIntervalIDs.push(intervalID);
this.emit("logon", targetCompID,stream);
logger.info(fix["49"] + " logged on from " + stream.remoteAddress);

if(isInitiator === true){
break;
//Reset mode
//Reset mode is handled in step 6, when confirming incoming seqnums
//handle seqreset; break;
case "5":
//handle logout; break;
writefix({
"35": "A",
"108": fix["108"]
}); /*write logon ack*/
}
break;
default:
"35": "5"
}); /*write a logout ack right back*/
break;
case "A":
//handle logon; break;
fixVersion = fix["8"];
senderCompID = fix["56"];
targetCompID = fix["49"];

//create data store
datastore = new Dirty(senderCompID + '-' + targetCompID + '-' + fixVersion + '.dat');
datastore.add(fix);

heartbeatDuration = parseInt(fix["108"], 10) * 1000;
loggedIn = true;
heartbeatIntervalID = setInterval(heartbeatCallback, heartbeatDuration);
//heartbeatIntervalIDs.push(intervalID);
this.emit("logon", targetCompID,stream);
logger.info(fix["49"] + " logged on from " + stream.remoteAddress);

if(isInitiator === true){
writefix({
"35": "A",
"108": fix["108"]
}); /*write logon ack*/
}
break;
default:
}
this.emit("data", fix);
}
Expand Down

0 comments on commit 9f99ba2

Please sign in to comment.