From 718f9011f46c5096e13d070fe53fbdc923e798c5 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Wed, 5 Feb 2025 18:09:51 +0100 Subject: [PATCH 01/12] Socks proxy support for TCP and TLS connections. --- package.json | 3 + src/lib/client.ts | 4 + src/lib/connect/index.ts | 12 +++ src/lib/connect/socks.ts | 155 +++++++++++++++++++++++++++++++++++++++ src/lib/connect/tcp.ts | 5 ++ src/lib/connect/tls.ts | 19 ++++- 6 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 src/lib/connect/socks.ts diff --git a/package.json b/package.json index 41c6aec8f..6805a7b19 100644 --- a/package.json +++ b/package.json @@ -111,11 +111,13 @@ "net": false }, "dependencies": { + "@types/duplexify": "^3.6.4", "@types/readable-stream": "^4.0.18", "@types/ws": "^8.5.14", "commist": "^3.2.0", "concat-stream": "^2.0.0", "debug": "^4.4.0", + "duplexify": "^4.1.3", "help-me": "^5.0.0", "lru-cache": "^10.4.3", "minimist": "^1.2.8", @@ -124,6 +126,7 @@ "readable-stream": "^4.7.0", "reinterval": "^1.1.0", "rfdc": "^1.4.1", + "socks": "^2.8.3", "split2": "^4.2.0", "worker-timers": "^7.1.8", "ws": "^8.18.0" diff --git a/src/lib/client.ts b/src/lib/client.ts index 2c0f9623a..c9a821f79 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -140,6 +140,10 @@ export interface IClientOptions extends ISecureClientOptions { query?: Record /** Auth string in the format : */ auth?: string + /** Optional SOCKS proxy to use for TCP / TLS connections , i.e. socks5://localhost:1333, socks4://localhost:1333, socks5h://localhost:1333 . Default is socks5h. */ + socks?: string + /** Timeout for establishing a socks connection */ + socksTimout?: number /** Custom ack handler */ customHandleAcks?: AckHandler /** Broker port */ diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index cda09ac2b..610e17d14 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -104,6 +104,18 @@ function connect( opts.clientId = opts.query.clientId } + if (isBrowser) { + opts.socks = undefined + } + + if ( + !isBrowser && + opts.socks === undefined && + typeof process !== 'undefined' + ) { + opts.socks = process.env['MQTTJS_SOCKS_PROXY'] + } + if (opts.cert && opts.key) { if (opts.protocol) { if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts new file mode 100644 index 000000000..f7fc4a348 --- /dev/null +++ b/src/lib/connect/socks.ts @@ -0,0 +1,155 @@ +import _debug from 'debug' +import duplexify, { Duplexify } from 'duplexify' +import { SocksClient, SocksProxy } from 'socks' +import { lookup } from 'dns' +import { SocksProxyType } from 'socks/typings/common/constants' +import { IStream } from '../shared' +import { promisify } from 'util' + +const debug = _debug('mqttjs:socks') + +function fatal(e: T): T { + try { + if ((e as any).code === undefined) (e as any).code = 'SOCKS' + return e + } catch { + return e + } +} + +function typeFromProtocol( + proto: string, +): [SocksProxyType | undefined, boolean] { + switch (proto) { + case 'socks5h:': + return [5, true] + + case 'socks4a:': + return [4, true] + + case 'socks5:': + return [5, false] + + case 'socks4:': + return [4, false] + + default: + return [undefined, false] + } +} + +function parseSocksUrl(url: string): [SocksProxy, boolean] { + const parsedUrl = new URL(url) + + if (parsedUrl.pathname || parsedUrl.hash || parsedUrl.search) { + throw fatal(new Error('bad SOCKS URL')) + } + + const [type, resolveThroughProxy] = typeFromProtocol(parsedUrl.protocol) + if (!type) { + throw fatal(new Error('bad SOCKS URL: invalid protocol')) + } + + const port = parseInt(parsedUrl.port, 10) + if (Number.isNaN(port)) { + throw fatal(new Error('bad SOCKS URL: invalid port')) + } + + const proxy: SocksProxy = { + host: parsedUrl.hostname, + port, + type, + } + + return [proxy, resolveThroughProxy] +} + +async function connectSocks( + destinationHost: string, + destinationPort: number, + socksUrl: string, + stream: Duplexify, + timeout?: number, +): Promise { + const [proxy, resolveThroughProxy] = parseSocksUrl(socksUrl) + + if (!resolveThroughProxy) { + debug('resolving %s locally', destinationHost) + + destinationHost = ( + await promisify(lookup)(destinationHost, { + family: proxy.type === 4 ? 4 : 0, + }) + ).address + } + + debug( + 'establishing SOCKS%d connection to %s:%d via %s:%d', + proxy.type, + destinationHost, + destinationPort, + proxy.host, + proxy.port, + ) + + const socksClient = new SocksClient({ + command: 'connect', + destination: { + host: destinationHost, + port: destinationPort, + }, + proxy: { ...proxy }, + timeout, + }) + socksClient.connect() + + socksClient.on('established', ({ socket }) => { + stream.setReadable(socket) + stream.setWritable(socket) + + socket.on('close', () => { + debug('SOCKS5 socket closed') + stream.destroy() + }) + + socket.on('error', (e) => { + debug('SOCKS5 socket error: %s', e) + stream.destroy(e) + }) + + stream.emit('connect') + }) + + socksClient.on('error', (e) => { + debug('SOCKS5 failed: %s', e) + stream.destroy(fatal(e)) + }) +} + +export default function openSocks( + destinationHost: string, + destinationPort: number, + socksUrl: string, + timeout?: number, +): IStream { + debug( + 'SOCKS connection to %s:%d via %s', + destinationHost, + destinationPort, + socksUrl, + ) + + const stream = duplexify() + + connectSocks( + destinationHost, + destinationPort, + socksUrl, + stream, + timeout, + ).catch((e) => { + stream.destroy(e) + }) + + return stream +} diff --git a/src/lib/connect/tcp.ts b/src/lib/connect/tcp.ts index ef16daba3..8a0180c35 100644 --- a/src/lib/connect/tcp.ts +++ b/src/lib/connect/tcp.ts @@ -2,6 +2,7 @@ import { StreamBuilder } from '../shared' import net from 'net' import _debug from 'debug' +import openSocks from './socks' const debug = _debug('mqttjs:tcp') /* @@ -12,6 +13,10 @@ const buildStream: StreamBuilder = (client, opts) => { opts.port = opts.port || 1883 opts.hostname = opts.hostname || opts.host || 'localhost' + if (opts.socks) { + return openSocks(opts.hostname, opts.port, opts.socks, opts.socksTimout) + } + const { port, path } = opts const host = opts.hostname diff --git a/src/lib/connect/tls.ts b/src/lib/connect/tls.ts index 877f4bc2d..8b46936ca 100644 --- a/src/lib/connect/tls.ts +++ b/src/lib/connect/tls.ts @@ -1,10 +1,25 @@ -import tls from 'tls' +import tls, { TLSSocket } from 'tls' import net from 'net' import _debug from 'debug' import { StreamBuilder } from '../shared' +import { IClientOptions } from '../client' +import openSocks from './socks' const debug = _debug('mqttjs:tls') +function connect(opts: IClientOptions): TLSSocket { + const { host, port, socks, ...rest } = opts + + return tls.connect( + socks + ? { + ...rest, + socket: openSocks(host, port, socks, opts.socksTimout), + } + : opts, + ) +} + const buildStream: StreamBuilder = (client, opts) => { opts.port = opts.port || 8883 opts.host = opts.hostname || opts.host || 'localhost' @@ -24,7 +39,7 @@ const buildStream: StreamBuilder = (client, opts) => { opts.rejectUnauthorized, ) - const connection = tls.connect(opts) + const connection = connect(opts) connection.on('secureConnect', () => { if (opts.rejectUnauthorized && !connection.authorized) { connection.emit('error', new Error('TLS not authorized')) From ca1deddcf961fd5dd8e9b40b3a4d213e1ec5249d Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Wed, 5 Feb 2025 18:15:12 +0100 Subject: [PATCH 02/12] Documentation. --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 448ca5fa2..dda61cb49 100644 --- a/README.md +++ b/README.md @@ -428,6 +428,8 @@ The arguments are: CONNACK is received - `username`: the username required by your broker, if any - `password`: the password required by your broker, if any + - `socks`: establish TCP and TLS connections via a socks proxy (URL, supported protocols are `socks5://`, `socks5h://`, `socks4://`, `socks4a://`) + - `socksTimeout`: timeout for connecting to the socks proxy - `incomingStore`: a [Store](#store) for the incoming packets - `outgoingStore`: a [Store](#store) for the outgoing packets - `queueQoSZero`: if connection is broken, queue outgoing QoS zero messages (default `true`) @@ -485,6 +487,8 @@ The arguments are: - `forceNativeWebSocket`: set to true if you're having detection issues (i.e. the `ws does not work in the browser` exception) to force the use of native WebSocket. It is important to note that if set to true for the first client created, then all the clients will use native WebSocket. And conversely, if not set or set to false, all will use the detection result. - `unixSocket`: if you want to connect to a unix socket, set this to true +Instead of setting `socks` you can also supple the same parameter via the environment variable `MQTTJS_SOCKS_PROXY`. + In case mqtts (mqtt over tls) is required, the `options` object is passed through to [`tls.connect()`](http://nodejs.org/api/tls.html#tls_tls_connect_options_callback). If using a **self-signed certificate**, set `rejectUnauthorized: false`. However, be cautious as this exposes you to potential man in the middle attacks and isn't recommended for production. For those supporting multiple TLS protocols on a single port, like MQTTS and MQTT over WSS, utilize the `ALPNProtocols` option. This lets you define the Application Layer Protocol Negotiation (ALPN) protocol. You can set `ALPNProtocols` as a string array, Buffer, or Uint8Array based on your setup. From 7f214c4e85b121a7958a406c19b24191bd482484 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Wed, 5 Feb 2025 18:18:30 +0100 Subject: [PATCH 03/12] Ignore socks for UNIX domain sockets. --- src/lib/connect/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index 610e17d14..73eb4cbf8 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -104,12 +104,13 @@ function connect( opts.clientId = opts.query.clientId } - if (isBrowser) { + if (isBrowser || opts.unixSocket) { opts.socks = undefined } if ( !isBrowser && + !opts.unixSocket && opts.socks === undefined && typeof process !== 'undefined' ) { From a12827b0cb74f270e5de20cc0fcf03c19633d430 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Wed, 5 Feb 2025 18:37:17 +0100 Subject: [PATCH 04/12] Avoid bundling dead code. --- esbuild.js | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/esbuild.js b/esbuild.js index f157201d7..019418a5f 100644 --- a/esbuild.js +++ b/esbuild.js @@ -44,6 +44,26 @@ const options = { ) } }, + { + name: 'resolve-socks', + setup(build) { + // socks is not supported in the browser and adds several 100kb to the build, so stub it + build.onResolve({ filter: /socks$/ }, args => { + return { + path: args.path, + namespace: 'socks-stub' + } + }) + + build.onLoad({ filter: /.*/, namespace: 'socks-stub' }, args => { + return { + contents: 'module.exports = {}', + loader: 'js' + } + } + ) + } + }, ], } From b132004ee32314eb95c69a2c854c1b4420c20fd2 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 10:54:35 +0100 Subject: [PATCH 05/12] Naming. --- src/lib/client.ts | 4 ++-- src/lib/connect/index.ts | 6 +++--- src/lib/connect/tcp.ts | 9 +++++++-- src/lib/connect/tls.ts | 11 ++++++++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/lib/client.ts b/src/lib/client.ts index c9a821f79..0ccbed34c 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -141,9 +141,9 @@ export interface IClientOptions extends ISecureClientOptions { /** Auth string in the format : */ auth?: string /** Optional SOCKS proxy to use for TCP / TLS connections , i.e. socks5://localhost:1333, socks4://localhost:1333, socks5h://localhost:1333 . Default is socks5h. */ - socks?: string + socksProxy?: string /** Timeout for establishing a socks connection */ - socksTimout?: number + socksTimeout?: number /** Custom ack handler */ customHandleAcks?: AckHandler /** Broker port */ diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index 73eb4cbf8..fb25afcf4 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -105,16 +105,16 @@ function connect( } if (isBrowser || opts.unixSocket) { - opts.socks = undefined + opts.socksProxy = undefined } if ( !isBrowser && !opts.unixSocket && - opts.socks === undefined && + opts.socksProxy === undefined && typeof process !== 'undefined' ) { - opts.socks = process.env['MQTTJS_SOCKS_PROXY'] + opts.socksProxy = process.env['MQTTJS_SOCKS_PROXY'] } if (opts.cert && opts.key) { diff --git a/src/lib/connect/tcp.ts b/src/lib/connect/tcp.ts index 8a0180c35..2b7141df4 100644 --- a/src/lib/connect/tcp.ts +++ b/src/lib/connect/tcp.ts @@ -13,8 +13,13 @@ const buildStream: StreamBuilder = (client, opts) => { opts.port = opts.port || 1883 opts.hostname = opts.hostname || opts.host || 'localhost' - if (opts.socks) { - return openSocks(opts.hostname, opts.port, opts.socks, opts.socksTimout) + if (opts.socksProxy) { + return openSocks( + opts.hostname, + opts.port, + opts.socksProxy, + opts.socksTimeout, + ) } const { port, path } = opts diff --git a/src/lib/connect/tls.ts b/src/lib/connect/tls.ts index 8b46936ca..a8f424eb1 100644 --- a/src/lib/connect/tls.ts +++ b/src/lib/connect/tls.ts @@ -8,13 +8,18 @@ import openSocks from './socks' const debug = _debug('mqttjs:tls') function connect(opts: IClientOptions): TLSSocket { - const { host, port, socks, ...rest } = opts + const { host, port, socksProxy, ...rest } = opts return tls.connect( - socks + socksProxy ? { ...rest, - socket: openSocks(host, port, socks, opts.socksTimout), + socket: openSocks( + host, + port, + socksProxy, + opts.socksTimeout, + ), } : opts, ) From 2eae2d7d99075eac92b7ef62da38af7c930d5a6f Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 10:56:19 +0100 Subject: [PATCH 06/12] Update readme. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index dda61cb49..ccc7f7c3c 100644 --- a/README.md +++ b/README.md @@ -428,7 +428,7 @@ The arguments are: CONNACK is received - `username`: the username required by your broker, if any - `password`: the password required by your broker, if any - - `socks`: establish TCP and TLS connections via a socks proxy (URL, supported protocols are `socks5://`, `socks5h://`, `socks4://`, `socks4a://`) + - `socksProxy`: establish TCP and TLS connections via a socks proxy (URL, supported protocols are `socks5://`, `socks5h://`, `socks4://`, `socks4a://`) - `socksTimeout`: timeout for connecting to the socks proxy - `incomingStore`: a [Store](#store) for the incoming packets - `outgoingStore`: a [Store](#store) for the outgoing packets @@ -487,7 +487,7 @@ The arguments are: - `forceNativeWebSocket`: set to true if you're having detection issues (i.e. the `ws does not work in the browser` exception) to force the use of native WebSocket. It is important to note that if set to true for the first client created, then all the clients will use native WebSocket. And conversely, if not set or set to false, all will use the detection result. - `unixSocket`: if you want to connect to a unix socket, set this to true -Instead of setting `socks` you can also supple the same parameter via the environment variable `MQTTJS_SOCKS_PROXY`. +Instead of setting `socksProxy` you can also supple the same parameter via the environment variable `MQTTJS_SOCKS_PROXY`. In case mqtts (mqtt over tls) is required, the `options` object is passed through to [`tls.connect()`](http://nodejs.org/api/tls.html#tls_tls_connect_options_callback). If using a **self-signed certificate**, set `rejectUnauthorized: false`. However, be cautious as this exposes you to potential man in the middle attacks and isn't recommended for production. From 738c1ea5278438ca44a24e5cd11d888c85a0cf6f Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 11:18:05 +0100 Subject: [PATCH 07/12] Update lock file. --- package-lock.json | 114 +++++++++++++++++++++++++++++++++------------- 1 file changed, 83 insertions(+), 31 deletions(-) diff --git a/package-lock.json b/package-lock.json index cfc159275..70644a086 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,11 +9,13 @@ "version": "5.10.3", "license": "MIT", "dependencies": { + "@types/duplexify": "^3.6.4", "@types/readable-stream": "^4.0.18", "@types/ws": "^8.5.14", "commist": "^3.2.0", "concat-stream": "^2.0.0", "debug": "^4.4.0", + "duplexify": "^4.1.3", "help-me": "^5.0.0", "lru-cache": "^10.4.3", "minimist": "^1.2.8", @@ -22,6 +24,7 @@ "readable-stream": "^4.7.0", "reinterval": "^1.1.0", "rfdc": "^1.4.1", + "socks": "^2.8.3", "split2": "^4.2.0", "worker-timers": "^7.1.8", "ws": "^8.18.0" @@ -3495,6 +3498,15 @@ "integrity": "sha512-97mx7gWt4e+kd0wPa1pNEvE4tYGhgBVa4ExWOLcfFohAjF9wERfJ+3qrn7I1e76oHupOvRs4UyYe9xzy0i4TUw==", "dev": true }, + "node_modules/@types/duplexify": { + "version": "3.6.4", + "resolved": "https://registry.npmjs.org/@types/duplexify/-/duplexify-3.6.4.tgz", + "integrity": "sha512-2eahVPsd+dy3CL6FugAzJcxoraWhUghZGEQJns1kTKfCXWKJ5iG/VkaB05wRVrDKHfOFKqb0X0kXh91eE99RZg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.5.tgz", @@ -7380,6 +7392,32 @@ "node": ">= 0.4" } }, + "node_modules/duplexify": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", + "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.4.1", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1", + "stream-shift": "^1.0.2" + } + }, + "node_modules/duplexify/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/eastasianwidth": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz", @@ -7448,7 +7486,6 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", - "dev": true, "dependencies": { "once": "^1.4.0" } @@ -9931,7 +9968,6 @@ "version": "9.0.5", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", - "dev": true, "dependencies": { "jsbn": "1.1.0", "sprintf-js": "^1.1.3" @@ -9943,8 +9979,7 @@ "node_modules/ip-address/node_modules/sprintf-js": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", - "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", - "dev": true + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==" }, "node_modules/ip-regex": { "version": "4.3.0", @@ -10910,8 +10945,7 @@ "node_modules/jsbn": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", - "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", - "dev": true + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==" }, "node_modules/jsesc": { "version": "2.5.2", @@ -12995,7 +13029,6 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "dependencies": { "wrappy": "1" } @@ -15791,7 +15824,6 @@ "version": "4.2.0", "resolved": "https://registry.npmjs.org/smart-buffer/-/smart-buffer-4.2.0.tgz", "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==", - "dev": true, "engines": { "node": ">= 6.0.0", "npm": ">= 3.0.0" @@ -15897,7 +15929,6 @@ "version": "2.8.3", "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", - "dev": true, "dependencies": { "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" @@ -16202,10 +16233,10 @@ } }, "node_modules/stream-shift": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz", - "integrity": "sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ==", - "dev": true + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "license": "MIT" }, "node_modules/stream-to-pull-stream": { "version": "1.7.3", @@ -17802,8 +17833,7 @@ "node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "node_modules/write-file-atomic": { "version": "3.0.3", @@ -20479,6 +20509,14 @@ "integrity": "sha512-97mx7gWt4e+kd0wPa1pNEvE4tYGhgBVa4ExWOLcfFohAjF9wERfJ+3qrn7I1e76oHupOvRs4UyYe9xzy0i4TUw==", "dev": true }, + "@types/duplexify": { + "version": "3.6.4", + "resolved": "https://registry.npmjs.org/@types/duplexify/-/duplexify-3.6.4.tgz", + "integrity": "sha512-2eahVPsd+dy3CL6FugAzJcxoraWhUghZGEQJns1kTKfCXWKJ5iG/VkaB05wRVrDKHfOFKqb0X0kXh91eE99RZg==", + "requires": { + "@types/node": "*" + } + }, "@types/estree": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.5.tgz", @@ -23332,6 +23370,29 @@ "gopd": "^1.2.0" } }, + "duplexify": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", + "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", + "requires": { + "end-of-stream": "^1.4.1", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1", + "stream-shift": "^1.0.2" + }, + "dependencies": { + "readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "requires": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + } + } + } + }, "eastasianwidth": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz", @@ -23393,7 +23454,6 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", - "dev": true, "requires": { "once": "^1.4.0" } @@ -25206,7 +25266,6 @@ "version": "9.0.5", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", - "dev": true, "requires": { "jsbn": "1.1.0", "sprintf-js": "^1.1.3" @@ -25215,8 +25274,7 @@ "sprintf-js": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", - "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", - "dev": true + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==" } } }, @@ -25855,8 +25913,7 @@ "jsbn": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", - "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", - "dev": true + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==" }, "jsesc": { "version": "2.5.2", @@ -27518,7 +27575,6 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "requires": { "wrappy": "1" } @@ -29566,8 +29622,7 @@ "smart-buffer": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/smart-buffer/-/smart-buffer-4.2.0.tgz", - "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==", - "dev": true + "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==" }, "snazzy": { "version": "9.0.0", @@ -29650,7 +29705,6 @@ "version": "2.8.3", "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", - "dev": true, "requires": { "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" @@ -29900,10 +29954,9 @@ "dev": true }, "stream-shift": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz", - "integrity": "sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ==", - "dev": true + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==" }, "stream-to-pull-stream": { "version": "1.7.3", @@ -31099,8 +31152,7 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "write-file-atomic": { "version": "3.0.3", From 63296245c275829cf2cd3159a74c61e2ef6f5176 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 11:58:46 +0100 Subject: [PATCH 08/12] Simplify. Co-authored-by: Daniel Lando --- src/lib/connect/index.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index fb25afcf4..6b85857e0 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -106,12 +106,7 @@ function connect( if (isBrowser || opts.unixSocket) { opts.socksProxy = undefined - } - - if ( - !isBrowser && - !opts.unixSocket && - opts.socksProxy === undefined && + } else if(opts.socksProxy === undefined && typeof process !== 'undefined' ) { opts.socksProxy = process.env['MQTTJS_SOCKS_PROXY'] From 818abccb0d25e2c69db06b43b023c3e610be0e93 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 17:32:51 +0100 Subject: [PATCH 09/12] Trade duplexify against home-grown wrapper. --- package-lock.json | 82 ++++-------------------------- package.json | 2 - src/lib/connect/socks.ts | 107 ++++++++++++++++++++++++++++++++------- 3 files changed, 99 insertions(+), 92 deletions(-) diff --git a/package-lock.json b/package-lock.json index 70644a086..5fd0a5c69 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,13 +9,11 @@ "version": "5.10.3", "license": "MIT", "dependencies": { - "@types/duplexify": "^3.6.4", "@types/readable-stream": "^4.0.18", "@types/ws": "^8.5.14", "commist": "^3.2.0", "concat-stream": "^2.0.0", "debug": "^4.4.0", - "duplexify": "^4.1.3", "help-me": "^5.0.0", "lru-cache": "^10.4.3", "minimist": "^1.2.8", @@ -3498,15 +3496,6 @@ "integrity": "sha512-97mx7gWt4e+kd0wPa1pNEvE4tYGhgBVa4ExWOLcfFohAjF9wERfJ+3qrn7I1e76oHupOvRs4UyYe9xzy0i4TUw==", "dev": true }, - "node_modules/@types/duplexify": { - "version": "3.6.4", - "resolved": "https://registry.npmjs.org/@types/duplexify/-/duplexify-3.6.4.tgz", - "integrity": "sha512-2eahVPsd+dy3CL6FugAzJcxoraWhUghZGEQJns1kTKfCXWKJ5iG/VkaB05wRVrDKHfOFKqb0X0kXh91eE99RZg==", - "license": "MIT", - "dependencies": { - "@types/node": "*" - } - }, "node_modules/@types/estree": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.5.tgz", @@ -7392,32 +7381,6 @@ "node": ">= 0.4" } }, - "node_modules/duplexify": { - "version": "4.1.3", - "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", - "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", - "license": "MIT", - "dependencies": { - "end-of-stream": "^1.4.1", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1", - "stream-shift": "^1.0.2" - } - }, - "node_modules/duplexify/node_modules/readable-stream": { - "version": "3.6.2", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", - "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", - "license": "MIT", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, - "engines": { - "node": ">= 6" - } - }, "node_modules/eastasianwidth": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz", @@ -7486,6 +7449,7 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "dev": true, "dependencies": { "once": "^1.4.0" } @@ -13029,6 +12993,7 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "dev": true, "dependencies": { "wrappy": "1" } @@ -16236,6 +16201,7 @@ "version": "1.0.3", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "dev": true, "license": "MIT" }, "node_modules/stream-to-pull-stream": { @@ -17833,7 +17799,8 @@ "node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", + "dev": true }, "node_modules/write-file-atomic": { "version": "3.0.3", @@ -20509,14 +20476,6 @@ "integrity": "sha512-97mx7gWt4e+kd0wPa1pNEvE4tYGhgBVa4ExWOLcfFohAjF9wERfJ+3qrn7I1e76oHupOvRs4UyYe9xzy0i4TUw==", "dev": true }, - "@types/duplexify": { - "version": "3.6.4", - "resolved": "https://registry.npmjs.org/@types/duplexify/-/duplexify-3.6.4.tgz", - "integrity": "sha512-2eahVPsd+dy3CL6FugAzJcxoraWhUghZGEQJns1kTKfCXWKJ5iG/VkaB05wRVrDKHfOFKqb0X0kXh91eE99RZg==", - "requires": { - "@types/node": "*" - } - }, "@types/estree": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.5.tgz", @@ -23370,29 +23329,6 @@ "gopd": "^1.2.0" } }, - "duplexify": { - "version": "4.1.3", - "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", - "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", - "requires": { - "end-of-stream": "^1.4.1", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1", - "stream-shift": "^1.0.2" - }, - "dependencies": { - "readable-stream": { - "version": "3.6.2", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", - "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", - "requires": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - } - } - } - }, "eastasianwidth": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz", @@ -23454,6 +23390,7 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "dev": true, "requires": { "once": "^1.4.0" } @@ -27575,6 +27512,7 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "dev": true, "requires": { "wrappy": "1" } @@ -29956,7 +29894,8 @@ "stream-shift": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", - "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==" + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "dev": true }, "stream-to-pull-stream": { "version": "1.7.3", @@ -31152,7 +31091,8 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", + "dev": true }, "write-file-atomic": { "version": "3.0.3", diff --git a/package.json b/package.json index 6805a7b19..95cdacb7f 100644 --- a/package.json +++ b/package.json @@ -111,13 +111,11 @@ "net": false }, "dependencies": { - "@types/duplexify": "^3.6.4", "@types/readable-stream": "^4.0.18", "@types/ws": "^8.5.14", "commist": "^3.2.0", "concat-stream": "^2.0.0", "debug": "^4.4.0", - "duplexify": "^4.1.3", "help-me": "^5.0.0", "lru-cache": "^10.4.3", "minimist": "^1.2.8", diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts index f7fc4a348..665494a9e 100644 --- a/src/lib/connect/socks.ts +++ b/src/lib/connect/socks.ts @@ -1,13 +1,97 @@ import _debug from 'debug' -import duplexify, { Duplexify } from 'duplexify' +import { Duplex } from 'stream' import { SocksClient, SocksProxy } from 'socks' import { lookup } from 'dns' import { SocksProxyType } from 'socks/typings/common/constants' import { IStream } from '../shared' import { promisify } from 'util' +import { Socket } from 'net' +import assert from 'assert' const debug = _debug('mqttjs:socks') +class ProxyStream extends Duplex { + private _flowing = false + + private _socket?: Socket + + constructor() { + super({ autoDestroy: false }) + + this.cork() + } + + _start(socket: Socket): void { + debug('proxy stream started') + + assert(!this._socket) + + this._socket = socket + + this.uncork() + + if (!this._flowing) socket.pause() + + socket.on('data', this._onData) + socket.on('end', this._onEnd) + socket.on('error', this._onError) + socket.on('close', this._onClose) + + socket.emit('connect') + } + + _write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): void { + assert(this._socket) + + this._socket.write(chunk, callback) + } + + _read(size: number): void { + this._flowing = true + + this._socket?.resume?.() + } + + _destroy( + error: Error | null, + callback: (error?: Error | null) => void, + ): void { + this._socket?.destroy?.(error) + + callback(error) + } + + _onData = (chunk: any): void => { + assert(this._socket) + + this._flowing = this.push(chunk) + + if (!this._flowing) this._socket.pause() + } + + _onEnd = (): void => { + debug('proxy stream received EOF') + + this.push(null) + } + + _onClose = (): void => { + debug('proxy stream closed') + + this.destroy() + } + + _onError = (err: any): void => { + debug('proxy stream died with error %s', err) + + this.destroy(err) + } +} + function fatal(e: T): T { try { if ((e as any).code === undefined) (e as any).code = 'SOCKS' @@ -68,7 +152,7 @@ async function connectSocks( destinationHost: string, destinationPort: number, socksUrl: string, - stream: Duplexify, + stream: ProxyStream, timeout?: number, ): Promise { const [proxy, resolveThroughProxy] = parseSocksUrl(socksUrl) @@ -103,22 +187,7 @@ async function connectSocks( }) socksClient.connect() - socksClient.on('established', ({ socket }) => { - stream.setReadable(socket) - stream.setWritable(socket) - - socket.on('close', () => { - debug('SOCKS5 socket closed') - stream.destroy() - }) - - socket.on('error', (e) => { - debug('SOCKS5 socket error: %s', e) - stream.destroy(e) - }) - - stream.emit('connect') - }) + socksClient.on('established', ({ socket }) => stream._start(socket)) socksClient.on('error', (e) => { debug('SOCKS5 failed: %s', e) @@ -139,7 +208,7 @@ export default function openSocks( socksUrl, ) - const stream = duplexify() + const stream = new ProxyStream() connectSocks( destinationHost, From 2b6e20378d7cf5a8659a0b5b2d3b0f7fc8fc09d7 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Thu, 6 Feb 2025 17:39:45 +0100 Subject: [PATCH 10/12] Paranoia, linter. --- src/lib/connect/index.ts | 3 ++- src/lib/connect/socks.ts | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index 6b85857e0..486ee7191 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -106,7 +106,8 @@ function connect( if (isBrowser || opts.unixSocket) { opts.socksProxy = undefined - } else if(opts.socksProxy === undefined && + } else if ( + opts.socksProxy === undefined && typeof process !== 'undefined' ) { opts.socksProxy = process.env['MQTTJS_SOCKS_PROXY'] diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts index 665494a9e..b82958112 100644 --- a/src/lib/connect/socks.ts +++ b/src/lib/connect/socks.ts @@ -28,8 +28,6 @@ class ProxyStream extends Duplex { this._socket = socket - this.uncork() - if (!this._flowing) socket.pause() socket.on('data', this._onData) @@ -38,6 +36,8 @@ class ProxyStream extends Duplex { socket.on('close', this._onClose) socket.emit('connect') + + this.uncork() } _write( From ddfe42d32a1e47835b6bfdd0e4c57f08500c0832 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Fri, 7 Feb 2025 08:30:32 +0100 Subject: [PATCH 11/12] Minor cleanup. --- src/lib/connect/socks.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts index b82958112..d7d2b25f4 100644 --- a/src/lib/connect/socks.ts +++ b/src/lib/connect/socks.ts @@ -65,27 +65,26 @@ class ProxyStream extends Duplex { callback(error) } - _onData = (chunk: any): void => { + private _onData = (chunk: any): void => { assert(this._socket) this._flowing = this.push(chunk) - if (!this._flowing) this._socket.pause() } - _onEnd = (): void => { + private _onEnd = (): void => { debug('proxy stream received EOF') this.push(null) } - _onClose = (): void => { + private _onClose = (): void => { debug('proxy stream closed') this.destroy() } - _onError = (err: any): void => { + private _onError = (err: any): void => { debug('proxy stream died with error %s', err) this.destroy(err) From 01f8fd6ddefa26b0ab7d83176677cfd4fa9f77b4 Mon Sep 17 00:00:00 2001 From: Christian Speckner Date: Fri, 7 Feb 2025 14:28:46 +0100 Subject: [PATCH 12/12] Tests. --- src/lib/connect/socks.ts | 27 +- src/lib/connect/tcp.ts | 9 +- src/lib/connect/tls.ts | 9 +- test/node/socks.ts | 528 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 554 insertions(+), 19 deletions(-) create mode 100644 test/node/socks.ts diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts index d7d2b25f4..21677a392 100644 --- a/src/lib/connect/socks.ts +++ b/src/lib/connect/socks.ts @@ -1,7 +1,7 @@ import _debug from 'debug' import { Duplex } from 'stream' import { SocksClient, SocksProxy } from 'socks' -import { lookup } from 'dns' +import * as dns from 'dns' import { SocksProxyType } from 'socks/typings/common/constants' import { IStream } from '../shared' import { promisify } from 'util' @@ -10,6 +10,11 @@ import assert from 'assert' const debug = _debug('mqttjs:socks') +export interface SocksConnectionOptions { + timeout?: number + lookup?: (hostname: string) => Promise<{ address: string }> +} + class ProxyStream extends Duplex { private _flowing = false @@ -26,6 +31,11 @@ class ProxyStream extends Duplex { assert(!this._socket) + if (this.destroyed) { + socket.destroy(this.errored) + return + } + this._socket = socket if (!this._flowing) socket.pause() @@ -152,15 +162,17 @@ async function connectSocks( destinationPort: number, socksUrl: string, stream: ProxyStream, - timeout?: number, + options: SocksConnectionOptions = {}, ): Promise { + const lookup = options.lookup ?? promisify(dns.lookup) + const [proxy, resolveThroughProxy] = parseSocksUrl(socksUrl) if (!resolveThroughProxy) { debug('resolving %s locally', destinationHost) destinationHost = ( - await promisify(lookup)(destinationHost, { + await lookup(destinationHost, { family: proxy.type === 4 ? 4 : 0, }) ).address @@ -182,14 +194,14 @@ async function connectSocks( port: destinationPort, }, proxy: { ...proxy }, - timeout, + timeout: options.timeout, }) socksClient.connect() socksClient.on('established', ({ socket }) => stream._start(socket)) socksClient.on('error', (e) => { - debug('SOCKS5 failed: %s', e) + debug('SOCKS failed: %s', e) stream.destroy(fatal(e)) }) } @@ -198,7 +210,7 @@ export default function openSocks( destinationHost: string, destinationPort: number, socksUrl: string, - timeout?: number, + options?: SocksConnectionOptions, ): IStream { debug( 'SOCKS connection to %s:%d via %s', @@ -214,8 +226,9 @@ export default function openSocks( destinationPort, socksUrl, stream, - timeout, + options, ).catch((e) => { + debug('SOCKS failed: %s', e) stream.destroy(e) }) diff --git a/src/lib/connect/tcp.ts b/src/lib/connect/tcp.ts index 2b7141df4..40eb0daca 100644 --- a/src/lib/connect/tcp.ts +++ b/src/lib/connect/tcp.ts @@ -14,12 +14,9 @@ const buildStream: StreamBuilder = (client, opts) => { opts.hostname = opts.hostname || opts.host || 'localhost' if (opts.socksProxy) { - return openSocks( - opts.hostname, - opts.port, - opts.socksProxy, - opts.socksTimeout, - ) + return openSocks(opts.hostname, opts.port, opts.socksProxy, { + timeout: opts.socksTimeout, + }) } const { port, path } = opts diff --git a/src/lib/connect/tls.ts b/src/lib/connect/tls.ts index a8f424eb1..0a6cdbaaa 100644 --- a/src/lib/connect/tls.ts +++ b/src/lib/connect/tls.ts @@ -14,12 +14,9 @@ function connect(opts: IClientOptions): TLSSocket { socksProxy ? { ...rest, - socket: openSocks( - host, - port, - socksProxy, - opts.socksTimeout, - ), + socket: openSocks(host, port, socksProxy, { + timeout: opts.socksTimeout, + }), } : opts, ) diff --git a/test/node/socks.ts b/test/node/socks.ts new file mode 100644 index 000000000..d77a0e59d --- /dev/null +++ b/test/node/socks.ts @@ -0,0 +1,528 @@ +import assert from 'assert' +import { AddressInfo, createServer, Server, Socket } from 'net' +import { describe, it, mock, afterEach, beforeEach } from 'node:test' +import openSocks from 'src/lib/connect/socks' + +const PORT = 6666 + +type State5 = 'new' | 'id' | 'connect' + +class MockServer5 { + readonly connect: Promise + + responseID = Buffer.from([0x05, 0x00]) + + responseREQUEST = Buffer.from([ + 0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x12, 0x34, + ]) + + private server = createServer() + + private onConnect: (socket: Socket) => void + + private onError: (err: any) => void + + private socket?: Socket + + private state: State5 = 'new' + + private destination?: [string, number] + + constructor() { + this.connect = new Promise((resolve, reject) => { + this.onConnect = resolve + this.onError = reject + }) + } + + start(): Promise { + this.server.listen(undefined, 'localhost') + + this.server.on('connection', this.onConnection) + + return new Promise((r) => { + this.server.once('listening', () => r(this.port())) + }) + } + + port(): number { + return (this.server.address() as AddressInfo).port + } + + destroy() { + this.server.close() + this.socket?.end() + this.socket?.destroy() + } + + destinationAddress(): string | undefined { + return this.destination?.[0] + } + + destinationPort(): number | undefined { + return this.destination?.[1] + } + + private onConnection = (socket: Socket) => { + if (this.socket) { + socket.destroy() + return this.onError(new Error('double connect to SOCKS5 server')) + } + + this.socket = socket + + socket.on('data', this.onData) + } + + private onData = (chunk: Buffer) => { + switch (this.state) { + case 'new': { + const [ver, nmethods] = chunk + + if ( + ver !== 0x05 || + nmethods === 0 || + chunk.length !== nmethods + 2 + ) { + return this.onError(new Error('bad ID packet')) + } + + if (chunk.subarray(2, 2 + nmethods).indexOf(0x00) === -1) { + return this.onError(new Error('no supported METHOD')) + } + + this.socket?.write?.(this.responseID) + this.state = 'id' + + break + } + + case 'id': + this.destination = this.parseConnect(chunk) + + if (this.destination === undefined) { + return this.onError(new Error('bad REQUEST packet')) + } + + this.socket?.write(this.responseREQUEST) + + this.state = 'connect' + this.socket.off('data', this.onData) + this.onConnect(this.socket) + + break + } + } + + private parseConnect(buf: Buffer): [string, number] | undefined { + const [ver, cmd, rsv, atyp] = buf + + if (ver !== 0x05 || cmd !== 0x01 || rsv !== 0x00) return undefined + + const port = (buf[buf.length - 2] << 8) | buf[buf.length - 1] + + switch (atyp) { + case 0x01: + if (buf.length !== 10) return undefined + + return [buf.subarray(4, 8).join('.'), port] + + case 0x03: + if (buf.length !== 7 + buf[4]) return undefined + + return [buf.subarray(5, 5 + buf[4]).toString('ascii'), port] + + default: + return undefined + } + } +} + +describe('SOCKS layer', { timeout: 1000 }, () => { + let server5!: MockServer5 + let server4: Server | undefined + + beforeEach(() => { + server5 = new MockServer5() + }) + + afterEach(() => { + server5.destroy() + server4?.close() + }) + + it('should resolve hostnames locally for socks5', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + await server5.connect + + stream.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + + assert.strictEqual(lookup.mock.callCount(), 1) + assert.strictEqual(lookup.mock.calls[0].arguments[0], 'foo.bar') + assert.strictEqual(server5.destinationAddress(), '1.2.3.4') + assert.strictEqual(server5.destinationPort(), 1883) + }) + + it('should resolve hostnames remotely for socks5h', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5h://localhost:${port}`, + { + lookup, + }, + ) + + await server5.connect + + stream.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + + assert.strictEqual(lookup.mock.callCount(), 0) + assert.strictEqual(server5.destinationAddress(), 'foo.bar') + assert.strictEqual(server5.destinationPort(), 1883) + }) + + it('errors during name resolution should be emitted on stream', async () => { + const ERROR = new Error() + + const lookup = mock.fn((address) => Promise.reject(ERROR)) + + const stream = openSocks('foo.bar', 1883, 'socks5://localhost:6666', { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert.strictEqual(error, ERROR) + }) + + it('errors during SOCKS connect should be emitted on stream', async () => { + const port = await server5.start() + server5.responseID = Buffer.from([0x00, 0x00]) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const err = await new Promise((r) => { + stream.once('error', r) + }) + + stream.destroy() + + assert(err instanceof Error) + }) + + it('data flows through the stream after SOCKS has connected', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + + socket.once('data', (chunk) => socket.write(`${chunk.toString()} pong`)) + + const response = await new Promise((resolve, reject) => { + stream.once('error', (err) => { + reject(err) + }) + + stream.once('data', (chunk) => { + resolve(chunk.toString()) + }) + + stream.write('ping') + }) + + server5.destroy() + stream.destroy() + + assert.strictEqual(response, 'ping pong') + }) + + it('data written to the stream is buffered until SOCKS has connected', async () => { + const port = await server5.start() + + let startNameResolution!: () => undefined + const resolutionPromise = new Promise((r) => { + startNameResolution = r as () => undefined + }) + + const lookup = mock.fn((_: string) => + resolutionPromise.then(() => ({ + address: '1.2.3.4', + })), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + stream.write('ping') + startNameResolution() + + const socket = await server5.connect + + socket.once('data', (chunk) => socket.write(`${chunk.toString()} pong`)) + + const response = await new Promise((resolve, reject) => { + stream.once('error', (err) => { + reject(err) + }) + + stream.once('data', (chunk) => { + resolve(chunk.toString()) + }) + }) + + server5.destroy() + stream.destroy() + + assert.strictEqual(response, 'ping pong') + }) + + it('closing the stream closes the connection', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + + stream.destroy() + + await new Promise((r) => { + socket.once('close', r) + }) + }) + + it('closing the connection closes the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + socket.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + }) + + it('resetting the connection errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + socket.resetAndDestroy() + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('an invalid protocol errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks('foo.bar', 1883, `socks://localhost:${port}`, { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('an invalid URL errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks('foo.bar', 1883, `socks:localhost:${port}`, { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('should resolve hostnames locally for socks4', async () => { + let onConnect!: (socket: Socket) => void + const connect = new Promise((r) => { + onConnect = mock.fn((socket: Socket) => { + socket.destroy() + r(socket) + }) + }) + + server4 = await new Promise((resolve, reject) => { + const server = createServer(onConnect) + + server.on('listening', () => resolve(server)) + server.on('error', reject) + + server.listen() + }) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks4://localhost:${(server4.address() as AddressInfo).port}`, + { + lookup, + }, + ) + + const socket = await connect + + socket.destroy() + stream.destroy() + + assert.strictEqual(lookup.mock.callCount(), 1) + assert.strictEqual(lookup.mock.calls[0].arguments[0], 'foo.bar') + }) + + it('should resolve hostnames remotely for socks4a', async () => { + let onConnect!: (socket: Socket) => void + const connect = new Promise((r) => { + onConnect = mock.fn((socket: Socket) => { + socket.destroy() + r(socket) + }) + }) + + server4 = await new Promise((resolve, reject) => { + const server = createServer(onConnect) + + server.on('listening', () => resolve(server)) + server.on('error', reject) + + server.listen() + }) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks4a://localhost:${(server4.address() as AddressInfo).port}`, + { + lookup, + }, + ) + + const socket = await connect + + socket.destroy() + stream.destroy() + + assert.strictEqual(lookup.mock.callCount(), 0) + }) +})