Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix throttling #25801

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec
### Developing

Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `pnpm run build`. For faster development instead of running `pnpm run build` you can run `pnpm run build-watch` in another terminal session, this will recompile as you change files.
In first time before building you need to run `pnpm install --include=dev`
Before submitting changes run `pnpm run test-with-coverage`, `pnpm run pretty:check` and `pnpm run eslint`

Before running any of the commands, you'll first need to run `pnpm install --include=dev`.
Before submitting changes run `pnpm run test:coverage`, `pnpm run pretty:check` and `pnpm run eslint`

## Supported devices

Expand Down
9 changes: 5 additions & 4 deletions lib/extension/receive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ import assert from 'node:assert';
import bind from 'bind-decorator';
import debounce from 'debounce';
import stringify from 'json-stable-stringify-without-jsonify';
import throttle from 'throttleit';

import * as zhc from 'zigbee-herdsman-converters';

import logger from '../util/logger';
import * as settings from '../util/settings';
import throttle from '../util/throttler';
import utils from '../util/utils';
import Extension from './extension';

Expand Down Expand Up @@ -71,8 +71,9 @@ export default class Receive extends Extension {
}

async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
if (!this.throttlers[device.ieeeAddr]) {
this.throttlers[device.ieeeAddr] = {
const throttleKey = device.ieeeAddr + device.options.throttle;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will keep stale values in memory indefinitely (until Z2M shutdown). Better to invalidate the throttler when the option changes for a device.
Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review! I am again running my message in ChatGPT, please tell me if unclear.

This will keep stale values in memory indefinitely (until Z2M shutdown). Better to invalidate the throttler when the option changes for a device.

I considered the memory issue, but here's why I think it's okay:

  • Memory Impact: These chatty devices won't be used in large numbers since they can slow down networks. Most users won't have hundreds of them or frequently change their throttle settings.
  • Cache Invalidation: I looked into clearing the throttle cache when settings change, but wasn't sure how to connect the settings UI to the ExtensionReceive component. Do you know an easy way?
  • Current Approach: Given the small real-world memory impact, keeping values in memory seems better than adding complex state management.
  • Alternative Solution: We could auto-clear individual cached values after 2*throttleValue using setTimeout, but this adds complexity for minimal benefit.

Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

That's not user-friendly. Plus, as we've seen, restarting Home Assistant on HAOS doesn't restart Zigbee2MQTT, which confuses users.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just set the option as "restart mandatory" though (cleaner for such a feature)?

I'm also in favour of this. The restart button will automatically show up in the frontend. Settings requiresRestart true will enable this:

"requiresRestart": true

if (!this.throttlers[throttleKey]) {
this.throttlers[throttleKey] = {
publish: throttle(this.publishEntityState, time * 1000),
};
}
Expand All @@ -81,7 +82,7 @@ export default class Receive extends Extension {
// By updating cache we make sure that state cache is always up-to-date.
this.state.set(device, payload);

await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle');
await this.throttlers[throttleKey].publish(device, payload, 'publishThrottle');
}

// if debounce_ignore are specified (Array of strings)
Expand Down
5 changes: 5 additions & 0 deletions lib/util/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@
"title": "QoS",
"description": "QoS level for MQTT messages of this device"
},
"throttle": {
"type": "number",
"title": "Throttle",
"description": "The minimum time between payloads in seconds. Payloads received whilst the device is being throttled will be discarded"
},
"debounce": {
"type": "number",
"title": "Debounce",
Expand Down
23 changes: 23 additions & 0 deletions lib/util/throttler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export default function throttle<Args extends unknown[]>(fn: (...args: Args) => Promise<void>, wait: number): (...args: Args) => Promise<void> {
Copy link
Collaborator

@Nerivec Nerivec Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't test it, but this looks to be messing with context, and also the return type is no longer generic as before.
Can we just bring the same code over, update it to typescript (the type is already in there as d.ts), and avoid potential issues down the road?

And also include a link to the source.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that. While my tests pass and it's working on my system, there are some TypeScript issues to address.

I tried implementing your suggestion but ran into problems with the any type. We'd need to enable noImplicitThis to use ThisType. I'm not very experienced with some TypeScript features, so there might be a simple solution I'm missing. I have a partial solution, but it's not perfect:

export default function throttle<T extends (...arguments_: unknown[]) => unknown>(function_: T, wait: number): T {
	if (typeof function_ !== 'function') {
		throw new TypeError(`Expected the first argument to be a \`function\`, got \`${typeof function_}\`.`);
	}

	// TODO: Add `wait` validation too in the next major version.

	let timeoutId: NodeJS.Timeout;
	let lastCallTime = 0;

	return function throttled(...arguments_) {  
		clearTimeout(timeoutId);

		const now = Date.now();
		const timeSinceLastCall = now - lastCallTime;
		const delayForNextCall = wait - timeSinceLastCall;

		if (delayForNextCall <= 0) {
			lastCallTime = now;
			function_.apply(this, arguments_);
		} else {
			timeoutId = setTimeout(() => {
				lastCallTime = Date.now();
				function_.apply(this, arguments_);
			}, delayForNextCall);
		}
	} as T;
}

This still relies on runtime type checking. While we could use runtime type checking, I prefer not to since:

  • The throttle function will likely only be used for this specific throttling feature
  • It's better to let the TypeScript compiler catch issues at compile time
  • We should make the types as strict as possible

You're right about adding attribution. If you know a better way to make the type checker happy, I'm interested. While my current TypeScript implementation works, I'm not sure if changing the function's context could cause problems.

let timeoutId: ReturnType<typeof setTimeout>;
let lastCallTime = 0;

return (...args: Args) => {
clearTimeout(timeoutId);
const now = Date.now();
const timeSinceLastCall = now - lastCallTime;
const delayForNextCall = wait - timeSinceLastCall;

if (delayForNextCall <= 0) {
lastCallTime = now;
return fn(...args);
}

return new Promise<void>((resolve) => {
timeoutId = setTimeout(() => {
lastCallTime = Date.now();
resolve(fn(...args));
}, delayForNextCall);
});
};
}
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
"rimraf": "^6.0.1",
"semver": "^7.6.3",
"source-map-support": "^0.5.21",
"throttleit": "^2.1.0",
"winston": "^3.17.0",
"winston-syslog": "^2.7.1",
"winston-transport": "^4.9.0",
Expand Down
9 changes: 0 additions & 9 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 94 additions & 0 deletions test/extensions/receive.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,100 @@ describe('Extension: Receive', () => {
expect(JSON.parse(mockMQTTPublishAsync.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2});
});

it('Should handle the throttle value changing without restart', async () => {
const device = devices.SPAMMER2;
const throttle_for_testing = 10;
settings.set(['device_options', 'throttle'], throttle_for_testing);
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1');
const data1 = {measuredValue: 1};
const payload1 = {
data: data1,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await mockZHEvents.message(payload1);
const data2 = {measuredValue: 2};
const payload2 = {
data: data2,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await mockZHEvents.message(payload2);
const data3 = {measuredValue: 3};
const payload3 = {
data: data3,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await mockZHEvents.message(payload3);
await flushPromises();

expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(1);
await flushPromises();
expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(1);
expect(mockMQTTPublishAsync.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(mockMQTTPublishAsync.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(mockMQTTPublishAsync.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

// Now we try after elapsed time to see if it publishes next message
const timeshift = throttle_for_testing * 2000;
vi.advanceTimersByTime(timeshift);
expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(2);
await flushPromises();

expect(mockMQTTPublishAsync.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(mockMQTTPublishAsync.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(mockMQTTPublishAsync.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

// Set throttle to 1s, without any reloading
settings.set(['device_options', 'throttle'], throttle_for_testing / 10);

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await mockZHEvents.message(payload4);
// Advance clock by 2s, not enough time for previous throttle value to pass
vi.advanceTimersByTime((throttle_for_testing * 2000) / 10);

const data5 = {measuredValue: 5};
const payload5 = {
data: data5,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await mockZHEvents.message(payload5);
await flushPromises();

// Check both messages were published straight away
expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(4);
expect(mockMQTTPublishAsync.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(mockMQTTPublishAsync.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(mockMQTTPublishAsync.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});

expect(mockMQTTPublishAsync.mock.calls[3][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(mockMQTTPublishAsync.mock.calls[3][1])).toStrictEqual({temperature: 0.05});
expect(mockMQTTPublishAsync.mock.calls[3][2]).toStrictEqual({qos: 0, retain: true});
});

it('Should throttle multiple messages from spamming devices', async () => {
const device = devices.SPAMMER;
const throttle_for_testing = 1;
Expand Down
10 changes: 10 additions & 0 deletions test/mocks/zigbeeHerdsman.ts
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,16 @@ export const devices = {
'Battery',
'lumi.weather',
),
SPAMMER2: new Device(
'EndDevice',
'0x0017880104e455ab',
6539,
4151,
[new Endpoint(1, [0], [], '0x0017880104e455ab')],
true,
'Battery',
'lumi.weather',
),
RTCGQ11LM: new Device(
'EndDevice',
'0x0017880104e45523',
Expand Down