Commit fd965ad

feat!: cluster v1.0 support (#40)

Support for Cluster v1.0.

There are breaking changes to some API endpoints (for example the pins and peers endpoints): they now return ndjson instead of a single JSON array.

BREAKING CHANGE: The client is no longer compatible with Cluster releases prior to v1.0. Note: there are no changes to the programmatic API.
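
To make "no changes to the programmatic API" concrete, here is a minimal usage sketch. The import path, cluster URL and filter value are illustrative assumptions rather than anything taken from this diff; only the exported functions (`statusAll`, `peerList`) and the `{ url, headers }` config shape come from the changed code.

```js
// Sketch only: the import path and endpoint URL are placeholders.
import { statusAll, peerList } from './src/index.js'

const cluster = { url: 'http://127.0.0.1:9094/', headers: {} }

// These calls look the same as before v1.0; internally they now consume
// ndjson streams instead of JSON arrays.
const peers = await peerList(cluster)
const statuses = await statusAll(cluster, { filter: 'pinned' })
console.log(peers.length, statuses.length)
```
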
Alan Shaw authored Apr 4, 2022
1 parent cb9bafa commit fd965ad
Showing 3 changed files with 85 additions and 15 deletions.

docker-compose.yml: 6 changes (3 additions, 3 deletions)

@@ -44,7 +44,7 @@ services:

   cluster0:
     container_name: cluster0
-    image: ipfs/ipfs-cluster:v0.14.5-rc1
+    image: ipfs/ipfs-cluster:v1.0.0-rc3
     depends_on:
       - ipfs0
     environment:
@@ -79,7 +79,7 @@ services:

   cluster1:
     container_name: cluster1
-    image: ipfs/ipfs-cluster:v0.14.5-rc1
+    image: ipfs/ipfs-cluster:v1.0.0-rc3
     depends_on:
       - ipfs1
     environment:
@@ -105,7 +105,7 @@ services:

   cluster2:
     container_name: cluster2
-    image: ipfs/ipfs-cluster:v0.14.5-rc1
+    image: ipfs/ipfs-cluster:v1.0.0-rc3
     depends_on:
       - ipfs2
     environment:

src/index.js: 89 changes (78 additions, 11 deletions)

@@ -182,7 +182,7 @@ export const status = async (cluster, cid, { local, signal } = {}) => {
     signal
   })

-  return toStausResponse(data)
+  return toStatusResponse(data)
 }

 /**
@@ -194,18 +194,19 @@ export const statusAll = async (
   cluster,
   { local, filter, cids, signal } = {}
 ) => {
-  const data = await request(cluster, 'pins', {
+  const stream = streamRequest(cluster, 'pins', {
     params: {
       local,
       filter: filter ? String(filter) : null,
       cids: cids ? String(cids) : null
     },
     signal
   })
-  if (!Array.isArray(data)) {
-    throw new Error('response data is not an array')
+  const statuses = []
+  for await (const d of stream) {
+    statuses.push(toStatusResponse(d))
   }
-  return data.map((d) => toStausResponse(d))
+  return statuses
 }

 /**
@@ -236,7 +237,7 @@ export const recover = async (cluster, cid, { local, signal } = {}) => {
     signal
   })

-  return toStausResponse(data)
+  return toStatusResponse(data)
 }

 /**
@@ -253,11 +254,12 @@ export const metricNames = (cluster, { signal } = {}) =>
  * @returns {Promise<API.ClusterInfo[]>}
  */
 export const peerList = async (cluster, options = {}) => {
-  const data = await request(cluster, 'peers', { signal: options.signal })
-  if (!Array.isArray(data)) {
-    throw new Error('unexpected response format')
+  const stream = streamRequest(cluster, 'peers', { signal: options.signal })
+  const infos = []
+  for await (const d of stream) {
+    infos.push(toClusterInfo(d))
   }
-  return data.map(toClusterInfo)
+  return infos
 }

 /**
@@ -298,6 +300,42 @@ const request = async (
   }
 }

+/**
+ * @param {API.Config} cluster
+ * @param {string} path
+ * @param {Object} init
+ * @param {string} [init.method]
+ * @param {Record<string, string|number|boolean|null|undefined>} [init.params]
+ * @param {BodyInit} [init.body]
+ * @param {AbortSignal} [init.signal]
+ */
+const streamRequest = async function* (
+  { url, headers },
+  path,
+  { method, params, body, signal }
+) {
+  const endpoint = new URL(path, url)
+  for (const [key, value] of Object.entries(params || {})) {
+    if (value != null) {
+      endpoint.searchParams.set(key, String(value))
+    }
+  }
+
+  method = method || 'GET'
+  const res = await fetch(endpoint.href, { method, headers, body, signal })
+
+  if (!res.ok) {
+    const msg = `${res.status}: ${res.statusText}`
+    throw Object.assign(new Error(msg), { response: res })
+  }
+
+  if (!res.body) {
+    throw Object.assign(new Error('Missing response body'), { response: res })
+  }
+
+  yield* ndjsonParse(res.body)
+}
+
 export class Cluster {
   /**
    * Create a new instance of the cluster client.
@@ -534,7 +572,7 @@ const toPinResponse = (data) => {
  * @param {any} data
  * @returns {API.StatusResponse}
  */
-const toStausResponse = (data) => {
+const toStatusResponse = (data) => {
   let peerMap = data.peer_map
   if (peerMap) {
     peerMap = Object.fromEntries(
@@ -586,6 +624,35 @@ const toClusterInfo = ({
  */
 const getName = (file) => file.name

+/**
+ * @param {ReadableStream<Uint8Array>} stream
+ */
+const ndjsonParse = async function* (stream) {
+  const reader = stream.getReader()
+  const matcher = /\r?\n/
+  const decoder = new TextDecoder('utf8')
+  let buffer = ''
+  try {
+    while (true) {
+      const result = await reader.read()
+
+      if (result.done) {
+        break
+      }
+
+      buffer += decoder.decode(result.value, { stream: true })
+      const parts = buffer.split(matcher)
+      buffer = parts.pop() || ''
+      for (const part of parts) yield JSON.parse(part)
+    }
+  } finally {
+    reader.cancel()
+    reader.releaseLock()
+  }
+  buffer += decoder.decode(undefined, { stream: false })
+  if (buffer) yield JSON.parse(buffer)
+}
+
 export const PinTypeBad = 1
 export const PinTypeData = 2
 export const PinTypeMeta = 3
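
For context: the Cluster v1.0 endpoints requested above as 'pins' and 'peers' respond with newline-delimited JSON rather than a single JSON array, which is why the `Array.isArray` checks were replaced by the streaming helpers. Below is a small sketch of how `ndjsonParse` consumes such a body; the sample payload and CIDs are made up, and the call assumes the (non-exported) helper is in scope, e.g. inside `src/index.js`.

```js
// Fake ndjson body: one JSON object per line, mimicking a v1.0 response.
const ndjson =
  '{"cid":"bafy-example-1","status":"pinned"}\n' +
  '{"cid":"bafy-example-2","status":"pinning"}\n'

// Wrap it in a ReadableStream<Uint8Array>, the same shape as fetch's res.body.
const body = new ReadableStream({
  start (controller) {
    controller.enqueue(new TextEncoder().encode(ndjson))
    controller.close()
  }
})

// ndjsonParse buffers chunks, splits on newlines and JSON.parses each line.
for await (const obj of ndjsonParse(body)) {
  console.log(obj.cid, obj.status)
}
```
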
test/all.spec.js: 5 changes (4 additions, 1 deletion)

@@ -216,7 +216,10 @@ describe('cluster.pin', () => {

     assert.equal(status.cid, cid)
     for (const pinInfo of Object.values(status.peerMap)) {
-      assert.ok(['pinning', 'pinned'].includes(pinInfo.status))
+      assert.ok(
+        ['pinning', 'pinned'].includes(pinInfo.status),
+        `${pinInfo.status} is pinning or pinned`
+      )
       assertField(pinInfo, 'ipfsPeerId')
     }
   })
