Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add intrinsic support for SOCKS proxies #1966

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ The arguments are:
CONNACK is received
- `username`: the username required by your broker, if any
- `password`: the password required by your broker, if any
- `socks`: establish TCP and TLS connections via a socks proxy (URL, supported protocols are `socks5://`, `socks5h://`, `socks4://`, `socks4a://`)
- `socksTimeout`: timeout for connecting to the socks proxy
- `incomingStore`: a [Store](#store) for the incoming packets
- `outgoingStore`: a [Store](#store) for the outgoing packets
- `queueQoSZero`: if connection is broken, queue outgoing QoS zero messages (default `true`)
Expand Down Expand Up @@ -485,6 +487,8 @@ The arguments are:
- `forceNativeWebSocket`: set to true if you're having detection issues (i.e. the `ws does not work in the browser` exception) to force the use of native WebSocket. It is important to note that if set to true for the first client created, then all the clients will use native WebSocket. And conversely, if not set or set to false, all will use the detection result.
- `unixSocket`: if you want to connect to a unix socket, set this to true

Instead of setting `socks` you can also supple the same parameter via the environment variable `MQTTJS_SOCKS_PROXY`.

In case mqtts (mqtt over tls) is required, the `options` object is passed through to [`tls.connect()`](http://nodejs.org/api/tls.html#tls_tls_connect_options_callback). If using a **self-signed certificate**, set `rejectUnauthorized: false`. However, be cautious as this exposes you to potential man in the middle attacks and isn't recommended for production.

For those supporting multiple TLS protocols on a single port, like MQTTS and MQTT over WSS, utilize the `ALPNProtocols` option. This lets you define the Application Layer Protocol Negotiation (ALPN) protocol. You can set `ALPNProtocols` as a string array, Buffer, or Uint8Array based on your setup.
Expand Down
20 changes: 20 additions & 0 deletions esbuild.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ const options = {
)
}
},
{
name: 'resolve-socks',
setup(build) {
// socks is not supported in the browser and adds several 100kb to the build, so stub it
build.onResolve({ filter: /socks$/ }, args => {
return {
path: args.path,
namespace: 'socks-stub'
}
})

build.onLoad({ filter: /.*/, namespace: 'socks-stub' }, args => {
return {
contents: 'module.exports = {}',
loader: 'js'
}
}
)
}
},
],
}

Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@
"net": false
},
"dependencies": {
"@types/duplexify": "^3.6.4",
"@types/readable-stream": "^4.0.18",
"@types/ws": "^8.5.14",
"commist": "^3.2.0",
"concat-stream": "^2.0.0",
"debug": "^4.4.0",
"duplexify": "^4.1.3",
"help-me": "^5.0.0",
"lru-cache": "^10.4.3",
"minimist": "^1.2.8",
Expand All @@ -124,6 +126,7 @@
"readable-stream": "^4.7.0",
"reinterval": "^1.1.0",
"rfdc": "^1.4.1",
"socks": "^2.8.3",
"split2": "^4.2.0",
"worker-timers": "^7.1.8",
"ws": "^8.18.0"
Expand Down
4 changes: 4 additions & 0 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ export interface IClientOptions extends ISecureClientOptions {
query?: Record<string, string>
/** Auth string in the format <username>:<password> */
auth?: string
/** Optional SOCKS proxy to use for TCP / TLS connections , i.e. socks5://localhost:1333, socks4://localhost:1333, socks5h://localhost:1333 . Default is socks5h. */
socks?: string
DirtyHairy marked this conversation as resolved.
Show resolved Hide resolved
/** Timeout for establishing a socks connection */
socksTimout?: number
DirtyHairy marked this conversation as resolved.
Show resolved Hide resolved
/** Custom ack handler */
customHandleAcks?: AckHandler
/** Broker port */
Expand Down
13 changes: 13 additions & 0 deletions src/lib/connect/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ function connect(
opts.clientId = opts.query.clientId
}

if (isBrowser || opts.unixSocket) {
opts.socks = undefined
}

if (
!isBrowser &&
!opts.unixSocket &&
opts.socks === undefined &&
typeof process !== 'undefined'
) {
opts.socks = process.env['MQTTJS_SOCKS_PROXY']
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? Could you remove the env var here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the environment variable in order to provide an easy way to route unmodified applications that use mqtt.js through a proxy. In or case the culprit is Node-RED.

It would be definitely preferable if these had support for using a proxy themselves, but configuring a proxy is one of those networking options where it is not uncommon to offer configuration through the environment.


if (opts.cert && opts.key) {
if (opts.protocol) {
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) {
Expand Down
155 changes: 155 additions & 0 deletions src/lib/connect/socks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import _debug from 'debug'
import duplexify, { Duplexify } from 'duplexify'
import { SocksClient, SocksProxy } from 'socks'
import { lookup } from 'dns'
import { SocksProxyType } from 'socks/typings/common/constants'
import { IStream } from '../shared'
import { promisify } from 'util'
DirtyHairy marked this conversation as resolved.
Show resolved Hide resolved

const debug = _debug('mqttjs:socks')

function fatal<T>(e: T): T {
try {
if ((e as any).code === undefined) (e as any).code = 'SOCKS'
return e
} catch {
return e
}
}

function typeFromProtocol(
proto: string,
): [SocksProxyType | undefined, boolean] {
switch (proto) {
case 'socks5h:':
return [5, true]

case 'socks4a:':
return [4, true]

case 'socks5:':
return [5, false]

case 'socks4:':
return [4, false]

default:
return [undefined, false]
}
}

function parseSocksUrl(url: string): [SocksProxy, boolean] {
const parsedUrl = new URL(url)

if (parsedUrl.pathname || parsedUrl.hash || parsedUrl.search) {
throw fatal(new Error('bad SOCKS URL'))
}

const [type, resolveThroughProxy] = typeFromProtocol(parsedUrl.protocol)
if (!type) {
throw fatal(new Error('bad SOCKS URL: invalid protocol'))
}

const port = parseInt(parsedUrl.port, 10)
if (Number.isNaN(port)) {
throw fatal(new Error('bad SOCKS URL: invalid port'))
}

const proxy: SocksProxy = {
host: parsedUrl.hostname,
port,
type,
}

return [proxy, resolveThroughProxy]
}

async function connectSocks(
destinationHost: string,
destinationPort: number,
socksUrl: string,
stream: Duplexify,
timeout?: number,
): Promise<void> {
const [proxy, resolveThroughProxy] = parseSocksUrl(socksUrl)

if (!resolveThroughProxy) {
debug('resolving %s locally', destinationHost)

destinationHost = (
await promisify(lookup)(destinationHost, {
family: proxy.type === 4 ? 4 : 0,
})
).address
}

debug(
'establishing SOCKS%d connection to %s:%d via %s:%d',
proxy.type,
destinationHost,
destinationPort,
proxy.host,
proxy.port,
)

const socksClient = new SocksClient({
command: 'connect',
destination: {
host: destinationHost,
port: destinationPort,
},
proxy: { ...proxy },
timeout,
})
socksClient.connect()

socksClient.on('established', ({ socket }) => {
stream.setReadable(socket)
stream.setWritable(socket)

socket.on('close', () => {
debug('SOCKS5 socket closed')
stream.destroy()
})

socket.on('error', (e) => {
debug('SOCKS5 socket error: %s', e)
stream.destroy(e)
})

stream.emit('connect')
})

socksClient.on('error', (e) => {
debug('SOCKS5 failed: %s', e)
stream.destroy(fatal(e))
})
}

export default function openSocks(
destinationHost: string,
destinationPort: number,
socksUrl: string,
timeout?: number,
): IStream {
debug(
'SOCKS connection to %s:%d via %s',
destinationHost,
destinationPort,
socksUrl,
)

const stream = duplexify()

connectSocks(
destinationHost,
destinationPort,
socksUrl,
stream,
timeout,
).catch((e) => {
stream.destroy(e)
})

return stream
}
5 changes: 5 additions & 0 deletions src/lib/connect/tcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { StreamBuilder } from '../shared'

import net from 'net'
import _debug from 'debug'
import openSocks from './socks'

const debug = _debug('mqttjs:tcp')
/*
Expand All @@ -12,6 +13,10 @@ const buildStream: StreamBuilder = (client, opts) => {
opts.port = opts.port || 1883
opts.hostname = opts.hostname || opts.host || 'localhost'

if (opts.socks) {
return openSocks(opts.hostname, opts.port, opts.socks, opts.socksTimout)
}

const { port, path } = opts
const host = opts.hostname

Expand Down
19 changes: 17 additions & 2 deletions src/lib/connect/tls.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
import tls from 'tls'
import tls, { TLSSocket } from 'tls'
import net from 'net'
import _debug from 'debug'
import { StreamBuilder } from '../shared'
import { IClientOptions } from '../client'
import openSocks from './socks'

const debug = _debug('mqttjs:tls')

function connect(opts: IClientOptions): TLSSocket {
const { host, port, socks, ...rest } = opts

return tls.connect(
socks
? {
...rest,
socket: openSocks(host, port, socks, opts.socksTimout),
}
: opts,
)
}

const buildStream: StreamBuilder = (client, opts) => {
opts.port = opts.port || 8883
opts.host = opts.hostname || opts.host || 'localhost'
Expand All @@ -24,7 +39,7 @@ const buildStream: StreamBuilder = (client, opts) => {
opts.rejectUnauthorized,
)

const connection = tls.connect(opts)
const connection = connect(opts)
connection.on('secureConnect', () => {
if (opts.rejectUnauthorized && !connection.authorized) {
connection.emit('error', new Error('TLS not authorized'))
Expand Down
Loading