Skip to content

Commit

Permalink
ISPN-3878 Unhandled failing ST cancel leads to deadlock
Browse files Browse the repository at this point in the history
* Remove RSVP flag from state transfer commands.
* Add OOB flag to StateRequestCommand(CANCEL_STATE_TRANSFER) commands.
* Add OOB flag when forwarding TxCompletionNotificationCommands during
  state transfer.
  • Loading branch information
danberindei authored and pruivo committed Jan 9, 2014
1 parent 71d7e88 commit 11bea3d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,9 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
Marshaller marshaller, CommandAwareRpcDispatcher card, boolean oob,
JGroupsTransport transport) throws Exception {
if (trace) log.tracef("Replication task sending %s to single recipient %s with response mode %s", command, destination, mode);
boolean rsvp = isRsvpCommand(command);

// Replay capability requires responses from all members!
/// HACK ALERT! Used for ISPN-1789. Enable RSVP if the command is a state transfer control command or cache topology control command.
boolean rsvp = command instanceof StateRequestCommand || command instanceof StateResponseCommand
|| command instanceof CacheTopologyControlCommand
|| isRsvpCommand(command);

Response retval;
Buffer buf;
buf = marshallCall(marshaller, command);
Expand All @@ -373,10 +369,7 @@ private static RspList<Object> processCalls(ReplicableCommand command, boolean b
Marshaller marshaller, CommandAwareRpcDispatcher card,
boolean oob, boolean ignoreLeavers, boolean totalOrder) throws Exception {
if (trace) log.tracef("Replication task sending %s to addresses %s with response mode %s", command, dests, mode);

/// HACK ALERT! Used for ISPN-1789. Enable RSVP if the command is a cache topology control command.
boolean rsvp = command instanceof CacheTopologyControlCommand
|| isRsvpCommand(command);
boolean rsvp = isRsvpCommand(command);

RspList<Object> retval = null;
Buffer buf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ private void sendCancelCommand(Set<Integer> cancelledSegments) {
StateRequestCommand cmd = commandsFactory.buildStateRequestCommand(
StateRequestCommand.Type.CANCEL_STATE_TRANSFER, rpcManager.getAddress(), topologyId,
cancelledSegments);
rpcManager.invokeRemotely(Collections.singleton(source), cmd, rpcManager.getDefaultRpcOptions(false));
try {
rpcManager.invokeRemotely(Collections.singleton(source), cmd, rpcManager.getDefaultRpcOptions(false, false));
} catch (Exception e) {
// Ignore exceptions here, the worst that can happen is that the provider will send some extra state
log.debugf("Caught an exception while cancelling state transfer for segments %s from %s",
cancelledSegments, source);
}
}

public void onStateReceived(int segmentId, boolean isLastChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ public Map<Address, Response> forwardCommandIfNeeded(TopologyAffectedCommand com
log.tracef("Forwarding command %s to new targets %s", command, newTargets);
}
// TODO find a way to forward the command async if it was received async
return rpcManager.invokeRemotely(newTargets, command, rpcManager.getDefaultRpcOptions(sync));
// TxCompletionNotificationCommands are the only commands forwarded asynchronously, and they must be OOB
return rpcManager.invokeRemotely(newTargets, command, rpcManager.getDefaultRpcOptions(sync, false));
}
}
return Collections.emptyMap();
Expand Down

0 comments on commit 11bea3d

Please sign in to comment.