From fd965ad652b3243cbf6e1dbcc9c4a0a38caffea8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 4 Apr 2022 13:45:58 +0100 Subject: [PATCH] feat!: cluster v1.0 support (#40) Support for Cluster v1.0. There are breaking changes to some API endpoints (they now return ndjson). BREAKING CHANGE: The client is not compatible with Cluster pre v1.0 anymore. Note: there are no changes to the programmatic API. --- docker-compose.yml | 6 ++-- src/index.js | 89 ++++++++++++++++++++++++++++++++++++++++------ test/all.spec.js | 5 ++- 3 files changed, 85 insertions(+), 15 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index fe7d692..174a896 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,7 +44,7 @@ services: cluster0: container_name: cluster0 - image: ipfs/ipfs-cluster:v0.14.5-rc1 + image: ipfs/ipfs-cluster:v1.0.0-rc3 depends_on: - ipfs0 environment: @@ -79,7 +79,7 @@ services: cluster1: container_name: cluster1 - image: ipfs/ipfs-cluster:v0.14.5-rc1 + image: ipfs/ipfs-cluster:v1.0.0-rc3 depends_on: - ipfs1 environment: @@ -105,7 +105,7 @@ services: cluster2: container_name: cluster2 - image: ipfs/ipfs-cluster:v0.14.5-rc1 + image: ipfs/ipfs-cluster:v1.0.0-rc3 depends_on: - ipfs2 environment: diff --git a/src/index.js b/src/index.js index 2169890..0340f85 100644 --- a/src/index.js +++ b/src/index.js @@ -182,7 +182,7 @@ export const status = async (cluster, cid, { local, signal } = {}) => { signal }) - return toStausResponse(data) + return toStatusResponse(data) } /** @@ -194,7 +194,7 @@ export const statusAll = async ( cluster, { local, filter, cids, signal } = {} ) => { - const data = await request(cluster, 'pins', { + const stream = streamRequest(cluster, 'pins', { params: { local, filter: filter ? String(filter) : null, @@ -202,10 +202,11 @@ export const statusAll = async ( }, signal }) - if (!Array.isArray(data)) { - throw new Error('response data is not an array') + const statuses = [] + for await (const d of stream) { + statuses.push(toStatusResponse(d)) } - return data.map((d) => toStausResponse(d)) + return statuses } /** @@ -236,7 +237,7 @@ export const recover = async (cluster, cid, { local, signal } = {}) => { signal }) - return toStausResponse(data) + return toStatusResponse(data) } /** @@ -253,11 +254,12 @@ export const metricNames = (cluster, { signal } = {}) => * @returns {Promise} */ export const peerList = async (cluster, options = {}) => { - const data = await request(cluster, 'peers', { signal: options.signal }) - if (!Array.isArray(data)) { - throw new Error('unexpected response format') + const stream = streamRequest(cluster, 'peers', { signal: options.signal }) + const infos = [] + for await (const d of stream) { + infos.push(toClusterInfo(d)) } - return data.map(toClusterInfo) + return infos } /** @@ -298,6 +300,42 @@ const request = async ( } } +/** + * @param {API.Config} cluster + * @param {string} path + * @param {Object} init + * @param {string} [init.method] + * @param {Record} [init.params] + * @param {BodyInit} [init.body] + * @param {AbortSignal} [init.signal] + */ +const streamRequest = async function* ( + { url, headers }, + path, + { method, params, body, signal } +) { + const endpoint = new URL(path, url) + for (const [key, value] of Object.entries(params || {})) { + if (value != null) { + endpoint.searchParams.set(key, String(value)) + } + } + + method = method || 'GET' + const res = await fetch(endpoint.href, { method, headers, body, signal }) + + if (!res.ok) { + const msg = `${res.status}: ${res.statusText}` + throw Object.assign(new Error(msg), { response: res }) + } + + if (!res.body) { + throw Object.assign(new Error('Missing response body'), { response: res }) + } + + yield* ndjsonParse(res.body) +} + export class Cluster { /** * Create a new instance of the cluster client. @@ -534,7 +572,7 @@ const toPinResponse = (data) => { * @param {any} data * @returns {API.StatusResponse} */ -const toStausResponse = (data) => { +const toStatusResponse = (data) => { let peerMap = data.peer_map if (peerMap) { peerMap = Object.fromEntries( @@ -586,6 +624,35 @@ const toClusterInfo = ({ */ const getName = (file) => file.name +/** + * @param {ReadableStream} stream + */ +const ndjsonParse = async function* (stream) { + const reader = stream.getReader() + const matcher = /\r?\n/ + const decoder = new TextDecoder('utf8') + let buffer = '' + try { + while (true) { + const result = await reader.read() + + if (result.done) { + break + } + + buffer += decoder.decode(result.value, { stream: true }) + const parts = buffer.split(matcher) + buffer = parts.pop() || '' + for (const part of parts) yield JSON.parse(part) + } + } finally { + reader.cancel() + reader.releaseLock() + } + buffer += decoder.decode(undefined, { stream: false }) + if (buffer) yield JSON.parse(buffer) +} + export const PinTypeBad = 1 export const PinTypeData = 2 export const PinTypeMeta = 3 diff --git a/test/all.spec.js b/test/all.spec.js index 0dbb193..20b3966 100644 --- a/test/all.spec.js +++ b/test/all.spec.js @@ -216,7 +216,10 @@ describe('cluster.pin', () => { assert.equal(status.cid, cid) for (const pinInfo of Object.values(status.peerMap)) { - assert.ok(['pinning', 'pinned'].includes(pinInfo.status)) + assert.ok( + ['pinning', 'pinned'].includes(pinInfo.status), + `${pinInfo.status} is pinning or pinned` + ) assertField(pinInfo, 'ipfsPeerId') } })