From 9be98abe57d261eb69eb111a24438733fa6a1af3 Mon Sep 17 00:00:00 2001 From: Graciano Dias Date: Tue, 14 Jan 2025 20:51:10 +0000 Subject: [PATCH 1/3] Vendor throttle function, add to UI --- README.md | 5 +++-- lib/extension/receive.ts | 2 +- lib/util/settings.schema.json | 5 +++++ lib/util/throttler.ts | 23 +++++++++++++++++++++++ package.json | 1 - pnpm-lock.yaml | 9 --------- 6 files changed, 32 insertions(+), 13 deletions(-) create mode 100644 lib/util/throttler.ts diff --git a/README.md b/README.md index f1e5de1353..b0982f5408 100644 --- a/README.md +++ b/README.md @@ -105,8 +105,9 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec ### Developing Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `pnpm run build`. For faster development instead of running `pnpm run build` you can run `pnpm run build-watch` in another terminal session, this will recompile as you change files. -In first time before building you need to run `pnpm install --include=dev` -Before submitting changes run `pnpm run test-with-coverage`, `pnpm run pretty:check` and `pnpm run eslint` + +Before running any of the commands, you'll first need to run `pnpm install --include=dev`. +Before submitting changes run `pnpm run test:coverage`, `pnpm run pretty:check` and `pnpm run eslint` ## Supported devices diff --git a/lib/extension/receive.ts b/lib/extension/receive.ts index a80cd5bd71..2ee663d2bd 100755 --- a/lib/extension/receive.ts +++ b/lib/extension/receive.ts @@ -3,12 +3,12 @@ import assert from 'node:assert'; import bind from 'bind-decorator'; import debounce from 'debounce'; import stringify from 'json-stable-stringify-without-jsonify'; -import throttle from 'throttleit'; import * as zhc from 'zigbee-herdsman-converters'; import logger from '../util/logger'; import * as settings from '../util/settings'; +import throttle from '../util/throttler'; import utils from '../util/utils'; import Extension from './extension'; diff --git a/lib/util/settings.schema.json b/lib/util/settings.schema.json index dc7ef0366a..c8e28a1ce0 100644 --- a/lib/util/settings.schema.json +++ b/lib/util/settings.schema.json @@ -761,6 +761,11 @@ "title": "QoS", "description": "QoS level for MQTT messages of this device" }, + "throttle": { + "type": "number", + "title": "Throttle", + "description": "The minimum time between payloads in seconds. Payloads received whilst the device is being throttled will be discarded" + }, "debounce": { "type": "number", "title": "Debounce", diff --git a/lib/util/throttler.ts b/lib/util/throttler.ts new file mode 100644 index 0000000000..46270c2967 --- /dev/null +++ b/lib/util/throttler.ts @@ -0,0 +1,23 @@ +export default function throttle(fn: (...args: Args) => Promise, wait: number): (...args: Args) => Promise { + let timeoutId: ReturnType; + let lastCallTime = 0; + + return (...args: Args) => { + clearTimeout(timeoutId); + const now = Date.now(); + const timeSinceLastCall = now - lastCallTime; + const delayForNextCall = wait - timeSinceLastCall; + + if (delayForNextCall <= 0) { + lastCallTime = now; + return fn(...args); + } + + return new Promise((resolve) => { + timeoutId = setTimeout(() => { + lastCallTime = Date.now(); + resolve(fn(...args)); + }, delayForNextCall); + }); + }; +} diff --git a/package.json b/package.json index 1df45a5ba4..5ad33b440c 100644 --- a/package.json +++ b/package.json @@ -56,7 +56,6 @@ "rimraf": "^6.0.1", "semver": "^7.6.3", "source-map-support": "^0.5.21", - "throttleit": "^2.1.0", "winston": "^3.17.0", "winston-syslog": "^2.7.1", "winston-transport": "^4.9.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a77cbadd85..fc0ced93e6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -62,9 +62,6 @@ importers: source-map-support: specifier: ^0.5.21 version: 0.5.21 - throttleit: - specifier: ^2.1.0 - version: 2.1.0 winston: specifier: ^3.17.0 version: 3.17.0 @@ -1626,10 +1623,6 @@ packages: text-hex@1.0.0: resolution: {integrity: sha512-uuVGNWzgJ4yhRaNSiubPY7OjISw4sw4E5Uv0wbjp+OzcbmVU/rsT8ujgcXJhn9ypzsgr5vlzpPqP+MBBKcGvbg==} - throttleit@2.1.0: - resolution: {integrity: sha512-nt6AMGKW1p/70DF/hGBdJB57B8Tspmbp5gfJ8ilhLnt7kkr2ye7hzD6NVG8GGErk2HWF34igrL2CXmNIkzKqKw==} - engines: {node: '>=18'} - thunky@1.1.0: resolution: {integrity: sha512-eHY7nBftgThBqOyHGVN+l8gF0BucP09fMo0oO/Lb0w1OF80dJv+lDVpXG60WMQvkcxAkNybKsrEIE3ZtKGmPrA==} @@ -3305,8 +3298,6 @@ snapshots: text-hex@1.0.0: {} - throttleit@2.1.0: {} - thunky@1.1.0: {} tinybench@2.9.0: {} From 1449cabf83b9a2509f312f30fe76d9fe7ab172f0 Mon Sep 17 00:00:00 2001 From: Graciano Dias Date: Wed, 15 Jan 2025 19:34:26 +0000 Subject: [PATCH 2/3] Use throttle key instead of device address for throttler --- lib/extension/receive.ts | 7 +-- test/extensions/receive.test.ts | 94 +++++++++++++++++++++++++++++++++ test/mocks/zigbeeHerdsman.ts | 10 ++++ 3 files changed, 108 insertions(+), 3 deletions(-) diff --git a/lib/extension/receive.ts b/lib/extension/receive.ts index 2ee663d2bd..7e6b3d8e34 100755 --- a/lib/extension/receive.ts +++ b/lib/extension/receive.ts @@ -71,8 +71,9 @@ export default class Receive extends Extension { } async publishThrottle(device: Device, payload: KeyValue, time: number): Promise { - if (!this.throttlers[device.ieeeAddr]) { - this.throttlers[device.ieeeAddr] = { + const throttleKey = device.ieeeAddr + device.options.throttle; + if (!this.throttlers[throttleKey]) { + this.throttlers[throttleKey] = { publish: throttle(this.publishEntityState, time * 1000), }; } @@ -81,7 +82,7 @@ export default class Receive extends Extension { // By updating cache we make sure that state cache is always up-to-date. this.state.set(device, payload); - await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle'); + await this.throttlers[throttleKey].publish(device, payload, 'publishThrottle'); } // if debounce_ignore are specified (Array of strings) diff --git a/test/extensions/receive.test.ts b/test/extensions/receive.test.ts index 261b07e4f2..99f8db41d6 100644 --- a/test/extensions/receive.test.ts +++ b/test/extensions/receive.test.ts @@ -349,6 +349,100 @@ describe('Extension: Receive', () => { expect(JSON.parse(mockMQTTPublishAsync.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2}); }); + it('Should handle the throttle value changing without restart', async () => { + const device = devices.SPAMMER2; + const throttle_for_testing = 10; + settings.set(['device_options', 'throttle'], throttle_for_testing); + settings.set(['device_options', 'retain'], true); + settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1'); + const data1 = {measuredValue: 1}; + const payload1 = { + data: data1, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await mockZHEvents.message(payload1); + const data2 = {measuredValue: 2}; + const payload2 = { + data: data2, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await mockZHEvents.message(payload2); + const data3 = {measuredValue: 3}; + const payload3 = { + data: data3, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await mockZHEvents.message(payload3); + await flushPromises(); + + expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(1); + await flushPromises(); + expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(1); + expect(mockMQTTPublishAsync.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(mockMQTTPublishAsync.mock.calls[0][1])).toStrictEqual({temperature: 0.01}); + expect(mockMQTTPublishAsync.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true}); + + // Now we try after elapsed time to see if it publishes next message + const timeshift = throttle_for_testing * 2000; + vi.advanceTimersByTime(timeshift); + expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(2); + await flushPromises(); + + expect(mockMQTTPublishAsync.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(mockMQTTPublishAsync.mock.calls[1][1])).toStrictEqual({temperature: 0.03}); + expect(mockMQTTPublishAsync.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true}); + + // Set throttle to 1s, without any reloading + settings.set(['device_options', 'throttle'], throttle_for_testing / 10); + + const data4 = {measuredValue: 4}; + const payload4 = { + data: data4, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await mockZHEvents.message(payload4); + // Advance clock by 2s, not enough time for previous throttle value to pass + vi.advanceTimersByTime((throttle_for_testing * 2000) / 10); + + const data5 = {measuredValue: 5}; + const payload5 = { + data: data5, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await mockZHEvents.message(payload5); + await flushPromises(); + + // Check both messages were published straight away + expect(mockMQTTPublishAsync).toHaveBeenCalledTimes(4); + expect(mockMQTTPublishAsync.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(mockMQTTPublishAsync.mock.calls[2][1])).toStrictEqual({temperature: 0.04}); + expect(mockMQTTPublishAsync.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true}); + + expect(mockMQTTPublishAsync.mock.calls[3][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(mockMQTTPublishAsync.mock.calls[3][1])).toStrictEqual({temperature: 0.05}); + expect(mockMQTTPublishAsync.mock.calls[3][2]).toStrictEqual({qos: 0, retain: true}); + }); + it('Should throttle multiple messages from spamming devices', async () => { const device = devices.SPAMMER; const throttle_for_testing = 1; diff --git a/test/mocks/zigbeeHerdsman.ts b/test/mocks/zigbeeHerdsman.ts index f5fd767703..a349968710 100644 --- a/test/mocks/zigbeeHerdsman.ts +++ b/test/mocks/zigbeeHerdsman.ts @@ -683,6 +683,16 @@ export const devices = { 'Battery', 'lumi.weather', ), + SPAMMER2: new Device( + 'EndDevice', + '0x0017880104e455ab', + 6539, + 4151, + [new Endpoint(1, [0], [], '0x0017880104e455ab')], + true, + 'Battery', + 'lumi.weather', + ), RTCGQ11LM: new Device( 'EndDevice', '0x0017880104e45523', From 7a872177a099c6dee815d236a88a2c460ee10ad4 Mon Sep 17 00:00:00 2001 From: Graciano Dias Date: Fri, 17 Jan 2025 16:42:40 +0000 Subject: [PATCH 3/3] Add attribution --- lib/util/throttler.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/util/throttler.ts b/lib/util/throttler.ts index 46270c2967..2a79bbd525 100644 --- a/lib/util/throttler.ts +++ b/lib/util/throttler.ts @@ -1,3 +1,5 @@ +// Vendored and modified slightly from https://github.com/sindresorhus/throttleit/blob/main/index.js + export default function throttle(fn: (...args: Args) => Promise, wait: number): (...args: Args) => Promise { let timeoutId: ReturnType; let lastCallTime = 0;