Skip to content

Commit

Permalink
Disable ZMQ Reconnect code, reliability is higher without it
Browse files Browse the repository at this point in the history
  • Loading branch information
ahundt committed Apr 29, 2016
1 parent 4801a5f commit 03d8c17
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions src/java/grl/src/grl/ZMQManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ZMQManager {
int statesLength = 0;
long message_counter = 0;
long noMessageCounter = 0;
long noMessageCounterLimit = 10000;
long noMessageCounterLimit = 9999999;
private grl.flatbuffer.KUKAiiwaStates _currentKUKAiiwaStates = null;
private grl.flatbuffer.KUKAiiwaState _currentKUKAiiwaState = null;
private grl.flatbuffer.KUKAiiwaState _previousKUKAiiwaState = null;
Expand All @@ -36,9 +36,11 @@ public class ZMQManager {
long lastMessageElapsedTime;
long lastMessageTimeoutMilliseconds = 1000;

int retriesAllowed = 3;
int retriesAttempted = 0;

public ZMQManager(String ZMQ_MASTER_URI, ITaskLogger errorlogger) {
super();
this.context = ZMQ.context(1);
this.logger = errorlogger;
_ZMQ_MASTER_URI = ZMQ_MASTER_URI;
}
Expand All @@ -51,6 +53,7 @@ public ZMQManager(String ZMQ_MASTER_URI, ITaskLogger errorlogger) {
public boolean connect(){

logger.info("Waiting for ZMQ connection initialization...");
this.context = ZMQ.context(1);
subscriber = context.socket(ZMQ.DEALER);
subscriber.connect(_ZMQ_MASTER_URI);
subscriber.setRcvHWM(100000);
Expand Down Expand Up @@ -93,35 +96,45 @@ public boolean connect(){
*/
public boolean reconnect(){
logger.info("Disconnecting...");
subscriber.disconnect(_ZMQ_MASTER_URI);
subscriber.close();
return this.connect();
//subscriber.disconnect(_ZMQ_MASTER_URI);
//subscriber.close();
//context.close();
return false;// this.connect();
}

public grl.flatbuffer.KUKAiiwaState waitForNextMessage()
{
boolean haveNextMessage = false;
while(!stop && !haveNextMessage) {

elapsedTime = System.currentTimeMillis() - startTime;
lastMessageElapsedTime = System.currentTimeMillis() - lastMessageStartTime;

if(lastMessageElapsedTime > lastMessageTimeoutMilliseconds)
{

logger.error("Message rate timeout occurred... ZMQ connection seems dead.\nAttempting to restart connection...\n");
this.reconnect();
}
else if (noMessageCounter > noMessageCounterLimit)
{
logger.error("ZMQ connection seems dead, messages arrive empty.\nAttempting to restart connection...\n");
this.reconnect();
}
// elapsedTime = System.currentTimeMillis() - startTime;
// lastMessageElapsedTime = System.currentTimeMillis() - lastMessageStartTime;
//
// if(lastMessageElapsedTime > lastMessageTimeoutMilliseconds)
// {
// retriesAttempted++;
// logger.error("Message rate timeout occurred... ZMQ connection may be dead. Retrying first. \nAttempting to restart connection...\n");
//
//
// if(retriesAttempted > retriesAllowed){
// logger.error("Attempting to restart connection...\n");
// this.reconnect();
// } else {
// lastMessageStartTime = System.currentTimeMillis();
// }
//
// }
//
// else if (noMessageCounter > noMessageCounterLimit)
// {
// logger.error("ZMQ connection seems dead, messages arrive empty.\nAttempting to restart connection...\n");
// this.reconnect();
// }


if((data = subscriber.recv(ZMQ.DONTWAIT))!=null){
/// TODO: BUG! noMessageCounter is always set to 0 here and only incremented below, so it will only ever be 0 or 1
noMessageCounter = 0;

message_counter+=1;
bb = ByteBuffer.wrap(data);

Expand All @@ -148,6 +161,7 @@ else if (noMessageCounter > noMessageCounterLimit)
}

haveNextMessage=true;
noMessageCounter = 0;
lastMessageStartTime = System.currentTimeMillis();
} else {
logger.error("got a ZMQ message but it isn't a valid message, this is an unexpected state that shouldn't occur. please debug me.");
Expand Down

0 comments on commit 03d8c17

Please sign in to comment.