Skip to content

Commit

Permalink
fix: put failed transaction in correct queue after recovering unrespo…
Browse files Browse the repository at this point in the history
…nsive controller (#6507)
  • Loading branch information
AlCalzone authored Nov 20, 2023
1 parent 0d61b8d commit 4a84ff5
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 11 deletions.
38 changes: 28 additions & 10 deletions packages/zwave-js/src/lib/driver/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,7 @@ export class Driver extends TypedEventEmitter<DriverEventCallbacks>
if (
oldStatus === NodeStatus.Dead
&& node.interviewStage !== InterviewStage.Complete
&& !this._options.testingHooks?.skipNodeInterview
) {
void this.interviewNodeInternal(node);
}
Expand Down Expand Up @@ -3613,7 +3614,9 @@ export class Driver extends TypedEventEmitter<DriverEventCallbacks>
// Re-queue the transaction, so it can get handled next.
// Its message generator may have finished, so reset that too.
transaction.reset();
this.queue.add(transaction.clone());
this.getQueueForTransaction(transaction).add(
transaction.clone(),
);

this._controller?.setStatus(ControllerStatus.Ready);
this._recoveryPhase = ControllerRecoveryPhase.None;
Expand Down Expand Up @@ -3729,7 +3732,9 @@ export class Driver extends TypedEventEmitter<DriverEventCallbacks>
// Re-queue the transaction, so it can get handled next.
// Its message generator may have finished, so reset that too.
transaction.reset();
this.queue.add(transaction.clone());
this.getQueueForTransaction(transaction).add(
transaction.clone(),
);

this._controller?.setStatus(ControllerStatus.Ready);
this._recoveryPhase =
Expand Down Expand Up @@ -4968,9 +4973,18 @@ ${handlers.length} left`,
msg,
transactionSource,
);
this.driverLog.print("SerialAPI command succeeded");
result.resolve(ret);
} catch (e) {
this.driverLog.print(
"SerialAPI command failed: " + getErrorMessage(e),
);
result.reject(e);

// // We may want to handle the error before continuing with the next command
// // Ideally, we'd pause the queue here and resume it when the error was handled,
// // but this system isn't in place yet.
// await wait(250);
}
}
}
Expand Down Expand Up @@ -5148,6 +5162,17 @@ ${handlers.length} left`,
return result;
}

private getQueueForTransaction(t: Transaction): TransactionQueue {
if (
t.priority === MessagePriority.Immediate
|| t.priority === MessagePriority.ControllerImmediate
) {
return this.immediateQueue;
} else {
return this.queue;
}
}

/**
* Sends a message to the Z-Wave stick.
* @param msg The message to send
Expand Down Expand Up @@ -5257,14 +5282,7 @@ ${handlers.length} left`,
transaction.tag = options.tag;

// And queue it
if (
transaction.priority === MessagePriority.Immediate
|| transaction.priority === MessagePriority.ControllerImmediate
) {
this.immediateQueue.add(transaction);
} else {
this.queue.add(transaction);
}
this.getQueueForTransaction(transaction).add(transaction);
transaction.setProgress({ state: TransactionState.Queued });

// If the transaction should expire, start the timeout
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{"k":"cacheFormat","v":1}
{"k":"node.1.isListening","v":true}
{"k":"node.1.isFrequentListening","v":false}
{"k":"node.1.isRouting","v":true}
{"k":"node.1.supportedDataRates","v":[40000,9600,100000]}
{"k":"node.1.protocolVersion","v":3}
{"k":"node.1.nodeType","v":"Controller"}
{"k":"node.1.supportsSecurity","v":false}
{"k":"node.1.supportsBeaming","v":true}
{"k":"node.1.deviceClass","v":{"basic":2,"generic":2,"specific":7}}
{"k":"node.1.endpoint.0.commandClass.0x72","v":{"isSupported":true,"isControlled":false,"secure":false,"version":0}}
{"k":"node.1.endpoint.0.commandClass.0x86","v":{"isSupported":true,"isControlled":false,"secure":false,"version":0}}
{"k":"node.1.endpoint.0.commandClass.0x20","v":{"isSupported":false,"isControlled":true,"secure":false,"version":0}}
{"k":"node.1.endpoint.0.commandClass.0x60","v":{"isSupported":false,"isControlled":true,"secure":false,"version":0}}
{"k":"node.1.interviewStage","v":"Complete"}

// Node 2 can sleep
{"k":"node.2.isListening","v":false}
{"k":"node.2.isFrequentListening","v":false}
{"k":"node.2.isRouting","v":true}
{"k":"node.2.supportedDataRates","v":[40000,9600,100000]}
{"k":"node.2.protocolVersion","v":3}
{"k":"node.2.nodeType","v":"End Node"}
{"k":"node.2.supportsSecurity","v":false}
{"k":"node.2.supportsBeaming","v":true}
{"k":"node.2.deviceClass","v":{"basic":4,"generic":6,"specific":1}}
{"k":"node.2.endpoint.0.commandClass.0x20","v":{"isSupported":true,"isControlled":false,"secure":false,"version":2}}
// Wakeup:
{"k":"node.2.endpoint.0.commandClass.0x84","v":{"isSupported":true,"isControlled":false,"secure":false,"version":1}}
{"k":"node.2.securityClasses.S2_AccessControl","v":false}
{"k":"node.2.securityClasses.S2_Authenticated","v":false}
{"k":"node.2.securityClasses.S2_Unauthenticated","v":false}
{"k":"node.2.securityClasses.S0_Legacy","v":false}
{"k":"node.2.interviewStage","v":"Complete"}
{"k":"node.2.hasSUCReturnRoute","v":true}

// Node 3 can't
{"k":"node.3.isListening","v":true}
{"k":"node.3.isFrequentListening","v":false}
{"k":"node.3.isRouting","v":true}
{"k":"node.3.supportedDataRates","v":[40000,9600,100000]}
{"k":"node.3.protocolVersion","v":3}
{"k":"node.3.nodeType","v":"End Node"}
{"k":"node.3.supportsSecurity","v":false}
{"k":"node.3.supportsBeaming","v":true}
{"k":"node.3.deviceClass","v":{"basic":4,"generic":6,"specific":1}}
{"k":"node.3.endpoint.0.commandClass.0x20","v":{"isSupported":true,"isControlled":false,"secure":false,"version":2}}
{"k":"node.3.securityClasses.S2_AccessControl","v":false}
{"k":"node.3.securityClasses.S2_Authenticated","v":false}
{"k":"node.3.securityClasses.S2_Unauthenticated","v":false}
{"k":"node.3.securityClasses.S0_Legacy","v":false}
{"k":"node.3.interviewStage","v":"Complete"}
{"k":"node.3.hasSUCReturnRoute","v":true}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import {
MockControllerStateKeys,
} from "../../controller/MockControllerState";

import { NodeStatus, ZWaveErrorCodes, assertZWaveError } from "@zwave-js/core";
import {
CommandClasses,
MessagePriority,
NodeStatus,
ZWaveErrorCodes,
assertZWaveError,
} from "@zwave-js/core";
import path from "node:path";
import Sinon from "sinon";
import { SoftResetRequest } from "../../serialapi/misc/SoftResetRequest";
import {
Expand All @@ -19,6 +26,7 @@ import {
SendDataResponse,
} from "../../serialapi/transport/SendDataMessages";
import { integrationTest } from "../integrationTestSuite";
import { integrationTest as integrationTestMulti } from "../integrationTestSuiteMulti";

let shouldTimeOut: boolean;

Expand Down Expand Up @@ -683,3 +691,141 @@ integrationTest(
},
},
);

integrationTestMulti.only(
"When a command from the immediate queue to a sleeping node triggers the unresponsive controller recovery, the normal send queue does not get blocked",
{
debug: true,

provisioningDirectory: path.join(
__dirname,
"fixtures/sendDataMissingCallbackImmediateToSleepingNode",
),

nodeCapabilities: [
{
id: 2,
capabilities: {
// isFrequentListening: false,
isListening: false,
commandClasses: [
CommandClasses["Wake Up"],
CommandClasses.Basic,
],
},
},
{
id: 3,
capabilities: {
commandClasses: [
CommandClasses.Basic,
],
},
},
],

// additionalDriverOptions: {
// testingHooks: {
// skipNodeInterview: true,
// },
// },

customSetup: async (driver, mockController, mockNodes) => {
// This is almost a 1:1 copy of the default behavior, except that the callback never gets sent
const handleBrokenSendData: MockControllerBehavior = {
async onHostMessage(host, controller, msg) {
// If the controller is operating normally, defer to the default behavior
if (!shouldTimeOut) return false;

if (msg instanceof SendDataRequest) {
// Check if this command is legal right now
const state = controller.state.get(
MockControllerStateKeys.CommunicationState,
) as MockControllerCommunicationState | undefined;
if (
state != undefined
&& state !== MockControllerCommunicationState.Idle
) {
throw new Error(
"Received SendDataRequest while not idle",
);
}

// Put the controller into sending state
controller.state.set(
MockControllerStateKeys.CommunicationState,
MockControllerCommunicationState.Sending,
);

// Notify the host that the message was sent
const res = new SendDataResponse(host, {
wasSent: true,
});
await controller.sendToHost(res.serialize());

return true;
} else if (msg instanceof SendDataAbort) {
// Put the controller into idle state
controller.state.set(
MockControllerStateKeys.CommunicationState,
MockControllerCommunicationState.Idle,
);

shouldTimeOut = false;

return true;
}
},
};
mockController.defineBehavior(handleBrokenSendData);
},
testBody: async (t, driver, nodes, mockController, mockNodes) => {
driver.driverLog.print("TEST START");
driver.driverLog.print("TEST START");
driver.driverLog.print("TEST START");
driver.driverLog.print("TEST START");
driver.driverLog.print("TEST START");
// Circumvent the options validation so the test doesn't take forever
driver.options.timeouts.sendDataAbort = 1000;
driver.options.timeouts.sendDataCallback = 1500;

shouldTimeOut = true;
const [node2, node3] = nodes;

node2.markAsAsleep();
node3.markAsAlive();

const immediateCommand = node2.commandClasses.Basic.withOptions({
priority: MessagePriority.Immediate,
}).set(0).catch((e) => e.code);

await wait(2500);

// Transmission should have been aborted
mockController.assertReceivedHostMessage(
(msg) => msg.functionType === FunctionType.SendDataAbort,
);
// And the stick should have been soft-reset
mockController.assertReceivedHostMessage(
(msg) => msg.functionType === FunctionType.SoftReset,
);
mockController.clearReceivedHostMessages();

const followupCommand = node3.commandClasses.Basic.set(0).catch((
e,
) => e.code);

// Both commands should succeed now.

driver.driverLog.print("normal queue");
driver.driverLog.sendQueue(driver["queue"]);
driver.driverLog.print("immediate queue:");
driver.driverLog.sendQueue(driver["immediateQueue"]);

await immediateCommand;
await followupCommand;

t.pass();
},
},
);

0 comments on commit 4a84ff5

Please sign in to comment.