Skip to content

Commit

Permalink
test: prevent proxy test conflicts via worker-threads
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya committed Dec 30, 2024
1 parent b7e742a commit d69baf8
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 105 deletions.
8 changes: 4 additions & 4 deletions test/unit/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ if (semver.satisfies(zmq.version, ">= 4.2")) {
* Get a unique id to be used as a port number or IPC path.
* This function is thread-safe and will use a lock file to ensure that the id is unique.
*/
let idFallback = 5000
let idFallback = 6000
async function getUniqueId() {
const idPath = path.resolve(__dirname, "../../tmp/port-id.lock")
await fs.promises.mkdir(path.dirname(idPath), {recursive: true})

try {
// Create the file if it doesn't exist
if (!fs.existsSync(idPath)) {
await fs.promises.writeFile(idPath, "5000", "utf8")
await fs.promises.writeFile(idPath, "6000", "utf8")

/* Windows cannot bind on a ports just above 1014; start higher to be safe. */
return 5000
return 6000
}

await lockfile.lock(idPath, {retries: 10})
Expand Down Expand Up @@ -63,7 +63,7 @@ async function getUniqueId() {
}
}

type Proto = "ipc" | "tcp" | "udp" | "inproc"
export type Proto = "ipc" | "tcp" | "udp" | "inproc"

export async function uniqAddress(proto: Proto) {
const id = await getUniqueId()
Expand Down
111 changes: 11 additions & 100 deletions test/unit/proxy-router-dealer-test.ts
Original file line number Diff line number Diff line change
@@ -1,105 +1,16 @@
import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
import {Worker} from "worker_threads"
import {testProtos} from "./helpers"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
describe(`proxy with ${proto} router/dealer`, function () {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

let proxy: zmq.Proxy

let frontAddress: string
let backAddress: string

let req: zmq.Request
let rep: zmq.Reply

beforeEach(async function () {
proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

frontAddress = await uniqAddress(proto)
backAddress = await uniqAddress(proto)

req = new zmq.Request()
rep = new zmq.Reply()
})

afterEach(async function () {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()

req.close()
rep.close()
global.gc?.()
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
})

describe("run", function () {
it("should proxy messages", async function () {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/

await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(
`waiting for messages for proxy with ${proto} router/dealer...`,
)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
describe(`proxy with ${proto} router/dealer`, () => {
describe("run", () => {
it("should proxy messages", async () => {
const worker = new Worker(__filename, {
workerData: {
proto,
},
})
await worker.terminate()
})
})
})
Expand Down
98 changes: 98 additions & 0 deletions test/unit/proxy-router-dealer-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {assert} from "chai"
import * as semver from "semver"
import * as zmq from "../../src"
import type {Proto} from "./helpers"
import {cleanSocket, uniqAddress} from "./helpers"
import {workerData} from "worker_threads"

async function testProxyRouterDealer(proto: Proto) {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

const proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

const frontAddress = await uniqAddress(proto)
const backAddress = await uniqAddress(proto)

const req = new zmq.Request()
const rep = new zmq.Reply()

try {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/
await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(`waiting for messages for proxy with ${proto} router/dealer...`)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
} catch (err) {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()
throw err
} finally {
req.close()
rep.close()
global.gc?.()
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
}
}

// Receive the proto from the main thread
testProxyRouterDealer(workerData.proto as Proto).catch(err => {
console.error(
`Error testing proxy with ${workerData.proto} router/dealer:`,
err,
)
process.exit(1)
})
2 changes: 1 addition & 1 deletion test/unit/proxy-run-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
import {isFullError} from "../../src/errors"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
Expand Down

0 comments on commit d69baf8

Please sign in to comment.