Skip to content

Commit

Permalink
added fix fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Shahbaz committed Aug 17, 2010
1 parent 9f99ba2 commit 7156092
Show file tree
Hide file tree
Showing 7 changed files with 993 additions and 17,148 deletions.
74 changes: 37 additions & 37 deletions fix.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var events = require("events");
var sys = require("sys");
var logger = require('../node-logger/logger').createLogger();
var Dirty = require('../node-dirty/lib/dirty').Dirty;
var fixtagnums = require('./resources/fixtagnums').keyvals;
var tags = require('./resources/fixtagnums').keyvals;


//logger.setLevel('debug');
Expand Down Expand Up @@ -56,7 +56,7 @@ function getInMessages(target, beginSeqNo, endSeqNo){
var msgsarr = [];
for(var k in msgs){
var msg = msgs[k];
var seqNo = parseInt(msg["34"],10);
var seqNo = parseInt(msg[tags["MsgSeqNum"]],10);
if(seqNo >= beginSeqNo && seqNo <= endSeqNo){
msgarr.push(msg);
}
Expand All @@ -74,7 +74,7 @@ function getOutMessages(target, beginSeqNo, endSeqNo){
var msgsarr = [];
for(var k in msgs){
var msg = msgs[k];
var seqNo = parseInt(msg["34"],10);
var seqNo = parseInt(msg[tags["MsgSeqNum"]],10);
if(seqNo >= beginSeqNo && seqNo <= endSeqNo){
msgarr.push(msg);
}
Expand All @@ -84,7 +84,7 @@ function getOutMessages(target, beginSeqNo, endSeqNo){

//Utility methods

var tag2txt = function(msg){ return Object.keys(msg).map(function(key){return fixtagnums[key]+"="+msg[key];}).join("|");}
var tag2txt = function(msg){ return Object.keys(msg).map(function(key){return tags[key]+"="+msg[key];}).join("|");}

logger.format = function(level, timestamp, message) {
return [timestamp.getUTCFullYear() ,"/", timestamp.getUTCMonth() ,"/", timestamp.getUTCDay() , "-" , timestamp.getUTCHours() , ":" , timestamp.getUTCMinutes() , ":" , timestamp.getUTCSeconds() , "." , timestamp.getUTCMilliseconds() , " [" , level, "] ", message].join("");
Expand Down Expand Up @@ -190,16 +190,16 @@ function Session(stream, isInitiator, opt) {
//this.write = function(msg){ writefix(msg);};
function writefix(msg) {

var senderCompIDExtracted = msg["56"];
var targetCompIDExtracted = msg["49"];
var senderCompIDExtracted = msg[tags["TargetCompID"]];
var targetCompIDExtracted = msg[tags["SenderCompID"]];

delete msg["9"]; //bodylength
delete msg["10"]; //checksum
delete msg["52"]; //timestamp
delete msg["8"]; //fixversion
delete msg["56"]; //sendercompid
delete msg["49"]; //targetcompid
delete msg["34"]; //seqnum
delete msg[tags["BodyLength"]]; //bodylength
delete msg[tags["CheckSum"]]; //checksum
delete msg[tags["SendingTime"]]; //timestamp
delete msg[tags["BeginString"]]; //fixversion
delete msg[tags["TargetCompID"]]; //sendercompid
delete msg[tags["SenderCompID"]]; //targetcompid
delete msg[tags["MsgSeqNum"]]; //seqnum
var headermsgarr = [];
for (var f in Object.keys(headers)) {
//if (headers.hasOwnProperty(f)) {
Expand Down Expand Up @@ -380,7 +380,7 @@ function Session(stream, isInitiator, opt) {
if (loggedIn) {
writefix({
"35": "3",
"45": fix["34"],
"45": fix[tags["MsgSeqNum"]],
"58": "MissingTags"
}); /*write session reject*/
}
Expand All @@ -400,7 +400,7 @@ function Session(stream, isInitiator, opt) {
if (loggedIn) {
writefix({
"35": "3",
"45": fix["34"],
"45": fix[tags["MsgSeqNum"]],
"58": "MissingTags"
}); /*write session reject*/
}
Expand All @@ -413,25 +413,25 @@ function Session(stream, isInitiator, opt) {
}

//====Step 5: Confirm first message is a logon message and it has a heartbeat
var msgType = fix["35"];
var msgType = fix[tags["MsgType"]];
if (!loggedIn && msgType != "A") {
logger.error("[ERROR] Logon message expected, received message of type " + msgType);
stream.end();
return;
}

if (msgType == "A" && fix["108"] === undefined) {
if (msgType == "A" && fix[tags["HeartBtInt"]] === undefined) {
logger.error("[ERROR] Logon does not have tag 108 (heartbeat) ");
stream.end();
return;
}


//====Step 6: Confirm incoming sequence number====
var _seqNum = parseInt(fix["34"], 10);
if(fix["35"]==="4" /*seq reset*/ && (fix["123"] === undefined || fix["123"] === "N")){
var _seqNum = parseInt(fix[tags["MsgSeqNum"]], 10);
if(fix[tags["MsgType"]]==="4" /*seq reset*/ && (fix[tags["GapFillFlag"]] === undefined || fix[tags["GapFillFlag"]] === "N")){
logger.warn("Requence Reset request received: " + msg);
var resetseqno = parseInt(fix["36"],10);
var resetseqno = parseInt(fix[tags["NewSeqNo"]],10);
if(resetseqno <= incomingSeqnum){
//TODO: Reject, sequence number may only be incremented
}
Expand All @@ -444,7 +444,7 @@ function Session(stream, isInitiator, opt) {
resendRequested = false;
}
else if (loggedIn && _seqNum < incomingSeqNum) {
var posdup = fix["43"];
var posdup = fix[tags["PossDupFlag"]];
if(posdup !== undefined && posdup === "Y"){
logger.warn("This posdup message's seqno has already been processed. Ignoring: "+msg);
}
Expand All @@ -463,9 +463,9 @@ function Session(stream, isInitiator, opt) {
}

//====Step 7: Confirm compids and fix version match what was in the logon msg
var incomingFixVersion = fix["8"];
var incomingsenderCompID = fix["56"];
var incomingTargetCompID = fix["49"];
var incomingFixVersion = fix[tags["BeginString"]];
var incomingsenderCompID = fix[tags["TargetCompID"]];
var incomingTargetCompID = fix[tags["SenderCompID"]];

if (loggedIn && (fixVersion != incomingFixVersion || senderCompID != incomingsenderCompID || targetCompID != incomingTargetCompID)) {
logger.warn("[WARNING] Incoming fix version (" + incomingFixVersion + "), sender compid (" + incomingsenderCompID + ") or target compid (" + incomingTargetCompID + ") did not match expected values (" + fixVersion + "," + senderCompID + "," + targetCompID + ")"); /*write session reject*/
Expand All @@ -488,21 +488,21 @@ function Session(stream, isInitiator, opt) {
break;
case "1":
//handle testrequest; break;
var testReqID = fix["112"];
var testReqID = fix[tags["TestReqID"]];
writefix({
"35": "0",
"112": testReqID
}); /*write heartbeat*/
break;
case "2":
var beginSeqNo = parseInt(fix["7"],10);
var endSeqNo = parseInt(fix["16"],10);
var beginSeqNo = parseInt(fix[tags["BeginSeqNo"]],10);
var endSeqNo = parseInt(fix[tags["EndSeqNo"]],10);
outgoingSeqNum = beginSeqNo;
var outmsgs = getOutMessages(targetCompID, beginSeqNo, endSeqNo);
for(var k in outmsgs){
var resendmsg = msgs[k];
resendmsg["43"] = "Y";
resendmsg["122"] = resendmsg["52"];
resendmsg[tags["PossDupFlag"]] = "Y";
resendmsg[tags["OrigSendingTime"]] = resendmsg["SendingTime"];
writefix(resendmsg);
}
//handle resendrequest; break;
Expand All @@ -512,8 +512,8 @@ function Session(stream, isInitiator, opt) {
break;
case "4":
//Gap fill mode
if(fix["123"] === "Y"){
var newSeqNo = parseInt(fix["36"],10);
if(fix[tags["GapFillFlag"]] === "Y"){
var newSeqNo = parseInt(fix[tags["NewSeqNo"]],10);

if(newSeqNo <= incomingSeqNo){
//TODO: Reject, sequence number may only be incremented
Expand All @@ -534,25 +534,25 @@ function Session(stream, isInitiator, opt) {
break;
case "A":
//handle logon; break;
fixVersion = fix["8"];
senderCompID = fix["56"];
targetCompID = fix["49"];
fixVersion = fix[tags["BeginString"]];
senderCompID = fix[tags["TargetCompID"]];
targetCompID = fix[tags["SenderCompID"]];

//create data store
datastore = new Dirty(senderCompID + '-' + targetCompID + '-' + fixVersion + '.dat');
datastore.add(fix);

heartbeatDuration = parseInt(fix["108"], 10) * 1000;
heartbeatDuration = parseInt(fix[tags["HeartBtInt"]], 10) * 1000;
loggedIn = true;
heartbeatIntervalID = setInterval(heartbeatCallback, heartbeatDuration);
//heartbeatIntervalIDs.push(intervalID);
this.emit("logon", targetCompID,stream);
logger.info(fix["49"] + " logged on from " + stream.remoteAddress);
logger.info(fix[tags["SenderCompID"]] + " logged on from " + stream.remoteAddress);

if(isInitiator === true){
writefix({
"35": "A",
"108": fix["108"]
"108": fix[tags["HeartBtInt"]]
}); /*write logon ack*/
}
break;
Expand Down
Loading

0 comments on commit 7156092

Please sign in to comment.