diff --git a/README.md b/README.md index 0ffbc1f..6255c62 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,22 @@ Publishes any kind of message encrypted to the group. The function wraps `ssb.db ### `ssb.tribes2.listMembers(groupId, { live }) => source` -Returns a pull stream source listing every known member of the group with id `groupId`. Note: lists members whether or not they've accepted the invite. If `live` is true, then it keeps the stream open and also outputs new members. +Returns a pull stream source listing the root feed id of every member of the +group with id `groupId`. Note: lists members whether or not they've accepted the +invite. + +If `live` is true, then it keeps the stream open and also outputs updates to +membership as new members are added / excluded. + +Each update emitted from the source is the updated complete state for the +current preferred epoch of the group in the format: + +```js +{ + added: [feedId, feedId, ...], + toExclude: [feedId, ...] +} +``` ### `ssb.tribes2.listInvites() => source` diff --git a/index.js b/index.js index ab883ab..7f3ecac 100644 --- a/index.js +++ b/index.js @@ -6,15 +6,17 @@ const { promisify } = require('util') const pull = require('pull-stream') const paraMap = require('pull-paramap') const pullMany = require('pull-many') +const pullFlatMerge = require('pull-flat-merge') +const pullAbortable = require('pull-abortable') const pullDefer = require('pull-defer') const chunk = require('lodash.chunk') const clarify = require('clarify-error') const { where, and, + or, isDecrypted, type, - live, toPullStream, } = require('ssb-db2/operators') const { @@ -30,12 +32,14 @@ const { } = require('private-group-spec') const { SecretKey } = require('ssb-private-group-keys') const { fromMessageSigil, isBendyButtV1FeedSSBURI } = require('ssb-uri2') + +const startListeners = require('./listeners') const buildGroupId = require('./lib/build-group-id') const addTangles = require('./lib/tangles/add-tangles') const publishAndPrune = require('./lib/prune-publish') const MetaFeedHelpers = require('./lib/meta-feed-helpers') +const Epochs = require('./lib/epochs') const { groupRecp } = require('./lib/operators') -// const Epochs = require('./lib/epochs') module.exports = { name: 'tribes2', @@ -60,7 +64,7 @@ module.exports = { findOrCreateGroupWithoutMembers, getRootFeedIdFromMsgId, } = MetaFeedHelpers(ssb) - // const { getEpochs } = Epochs(ssb) + const { getPreferredEpoch, getMembers } = Epochs(ssb) function create(opts = {}, cb) { if (cb === undefined) return promisify(create)(opts) @@ -208,6 +212,8 @@ module.exports = { pull( listMembers(groupId), + pull.map((info) => info.added), + pull.flatten(), pull.collect((err, beforeMembers) => { // prettier-ignore if (err) return cb(clarify(err, "Couldn't get old member list when excluding members")) @@ -255,8 +261,7 @@ module.exports = { pull.collect((err) => { // prettier-ignore if (err) return cb(clarify(err, "Couldn't re-add remaining members when excluding members")) - - return cb() + cb(null) }) ) }) @@ -319,43 +324,54 @@ module.exports = { } function listMembers(groupId, opts = {}) { - const deferedSource = pullDefer.source() + const { live } = opts + const deferredSource = pullDefer.source() get(groupId, (err, group) => { // prettier-ignore - if (err) return deferedSource.abort(clarify(err, 'Failed to get group info when listing members')) + if (err) return deferredSource.abort(clarify(err, 'Failed to get group info when listing members')) // prettier-ignore - if (group.excluded) return deferedSource.abort( new Error("We're excluded from this group, can't list members")) + if (group.excluded) return deferredSource.abort( new Error("We're excluded from this group, can't list members")) + + if (!live) { + getPreferredEpoch(groupId, (err, epoch) => { + // prettier-ignore + if (err) return deferredSource.abort(clarify(err, 'failed to load preferred epoch')) + + getMembers(epoch.id, (err, res) => { + // prettier-ignore + if (err) return deferredSource.abort(clarify(err, 'error getting members')) + + const source = pull.once(res) + deferredSource.resolve(source) + }) + }) + return + } + let abortable = pullAbortable() const source = pull( - ssb.db.query( - where( - and( - isDecrypted('box2'), - type('group/add-member'), - groupRecp(groupId) - ) - ), - opts.live ? live({ old: true }) : null, - toPullStream() - ), - pull.map((msg) => msg.value.content.recps.slice(1)), - pull.flatten(), - pull.unique() - ) + getPreferredEpoch.stream(groupId, { live }), + pull.map((epoch) => { + abortable.abort() + abortable = pullAbortable() - deferedSource.resolve(source) + return pull(getMembers.stream(epoch.id, { live }), abortable) + }), + pullFlatMerge() + ) + deferredSource.resolve(source) }) - return deferedSource + return deferredSource } function listInvites() { - const deferedSource = pullDefer.source() + const deferredSource = pullDefer.source() getMyGroups((err, myGroups) => { // prettier-ignore - if (err) return deferedSource.abort(clarify(err, 'Failed to list group IDs when listing invites')) + if (err) return deferredSource.abort(clarify(err, 'Failed to list group IDs when listing invites')) const source = pull( // get all the groupIds we've heard of from invites @@ -374,10 +390,10 @@ module.exports = { pull.asyncMap(getGroupInviteData) ) - deferedSource.resolve(source) + deferredSource.resolve(source) }) - return deferedSource + return deferredSource // listInvites helpers @@ -385,6 +401,7 @@ module.exports = { const myGroups = new Set() pull( + // TODO replace with pull.values (unless want "round-robbin" sampling) pullMany([ ssb.box2.listGroupIds(), ssb.box2.listGroupIds({ excluded: true }), @@ -481,97 +498,8 @@ module.exports = { findOrCreateAdditionsFeed((err) => { // prettier-ignore if (err) return cb(clarify(err, 'Error finding or creating additions feed when starting ssb-tribes2')) - return cb() - }) - - ssb.metafeeds.findOrCreate((err, myRoot) => { - // prettier-ignore - if (err) return cb(clarify(err, 'Error getting own root in start()')) - - // check if we've been excluded - pull( - ssb.db.query( - where(and(isDecrypted('box2'), type('group/exclude-member'))), - live({ old: true }), - toPullStream() - ), - pull.filter(isExcludeMember), - pull.filter((msg) => - // it's an exclusion of us - msg.value.content.excludes.includes(myRoot.id) - ), - pull.drain( - (msg) => { - const groupId = msg.value.content.recps[0] - ssb.box2.excludeGroupInfo(groupId, (err) => { - // prettier-ignore - if (err) return cb(clarify(err, 'Error on excluding group info after finding exclusion of ourselves')) - }) - }, - (err) => { - // prettier-ignore - if (err) return cb(clarify(err, 'Error on looking for exclude messages excluding us')) - } - ) - ) - - // look for new epochs that we're added to - pull( - ssb.db.query( - where(and(isDecrypted('box2'), type('group/add-member'))), - live({ old: true }), - toPullStream() - ), - pull.filter(isAddMember), - // groups/epochs we're added to - pull.filter((msg) => { - return msg.value.content.recps.includes(myRoot.id) - }), - // to find new epochs we only check groups we've accepted the invite to - paraMap((msg, cb) => { - pull( - ssb.box2.listGroupIds(), - pull.filter((groupId) => groupId === msg.value.content.recps[0]), - pull.take(1), - pull.collect((err, groupIds) => { - // prettier-ignore - if (err) return cb(clarify(err, "Error getting groups we're already in when looking for new epochs")) - cb(null, groupIds.length ? msg : null) - }) - ) - }, 4), - pull.filter(Boolean), - pull.drain( - (msg) => { - const groupId = msg.value.content.recps[0] - - const newKey = Buffer.from(msg.value.content.groupKey, 'base64') - ssb.box2.addGroupInfo(groupId, { key: newKey }, (err) => { - // prettier-ignore - if (err) return cb(clarify(err, 'Error adding new epoch key that we found')) - - const newKeyPick = { - key: newKey, - scheme: keySchemes.private_group, - } - // TODO: naively guessing that this is the latest key for now - ssb.box2.pickGroupWriteKey(groupId, newKeyPick, (err) => { - // prettier-ignore - if (err) return cb(clarify(err, 'Error switching to new epoch key that we found')) - - ssb.db.reindexEncrypted((err) => { - // prettier-ignore - if (err) cb(clarify(err, 'Error reindexing after finding new epoch')) - }) - }) - }) - }, - (err) => { - // prettier-ignore - if (err) return cb(clarify(err, "Error finding new epochs we've been added to")) - } - ) - ) + cb(null) + startListeners(ssb, getPreferredEpoch, console.error) }) } diff --git a/lib/epochs.js b/lib/epochs.js index b0ce688..7387686 100644 --- a/lib/epochs.js +++ b/lib/epochs.js @@ -5,11 +5,14 @@ const { promisify: p } = require('util') const { fromMessageSigil } = require('ssb-uri2') const pull = require('pull-stream') +const pullDefer = require('pull-defer') +const pullFlatMerge = require('pull-flat-merge') const Reduce = require('@tangle/reduce') const OverwriteFields = require('@tangle/overwrite-fields') const clarify = require('clarify-error') const Butt64 = require('butt64') const isCanonicalBase64 = require('is-canonical-base64') +const { where, and, type, live, toPullStream } = require('ssb-db2/operators') const { validator: { group: { @@ -22,6 +25,7 @@ const { } = require('private-group-spec') const difference = require('set.prototype.difference') +const { groupRecp } = require('./operators') const getTangleUpdates = require('./tangles/get-tangle-updates') const msgPattern = toPattern(new Butt64('ssb:message/[a-zA-Z0-9-]+/', null, 32)) @@ -65,7 +69,8 @@ const strategy = OverwriteFields({ // PATCH: @tangle/reduce needs this strategy.mapToPure = (T) => T || strategy.identity() -module.exports = function Epochs(ssb) { +module.exports = Epochs +function Epochs(ssb) { const allGetters = { author(epochRoot, cb) { ssb.metafeeds.findRootFeedId(epochRoot.value.author, cb) @@ -74,118 +79,314 @@ module.exports = function Epochs(ssb) { cb(null, epochRoot.value.content.groupKey) }, members(epochRoot, cb) { - getMembers(ssb, epochRoot.key, cb) + getMembers(epochRoot.key, cb) }, } - return { - getEpochs(groupId, cb) { - if (cb === undefined) return p(this.getEpochs)(groupId) - - const getters = pluck(allGetters, ['author', 'secret']) - reduceEpochs(ssb, groupId, getters, (err, reduce) => { - if (err) - return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) - - const epochs = reduce.graph.connectedNodes.map((node) => { - const info = { - id: node.key, // alias: epochRootId - previous: node.previous, - ...node.data[node.key], - } - info.secret = Buffer.from(info.secret, 'base64') - return info - }) + function getEpochs(groupId, cb) { + if (cb === undefined) return p(getEpochs)(groupId) + + const opts = { getters: pluck(allGetters, ['author', 'secret']) } + epochsReduce(groupId, opts, (err, reduce) => { + // prettier-ignore + if (err) return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) + + const epochs = reduce.graph.connectedNodes.map((node) => { + const info = { + id: node.key, // alias: epochRootId + previous: node.previous, + ...node.data[node.key], + } + info.secret = Buffer.from(info.secret, 'base64') + return info + }) + + cb(null, epochs) + }) + } + + function getMembers(epochRootId, cb) { + if (cb === undefined) return p(getMembers)(epochRootId) + + epochRootId = toMsgURI(epochRootId) + const added = new Set() + const toExclude = new Set() + + pull( + getTangleUpdates.stream(ssb, 'members', epochRootId), + pull.filter((msg) => isAddMember(msg) || isExcludeMember(msg)), + pull.through((msg) => { + const { type, recps, excludes } = msg.value.content + if (type === 'group/add-member') + recps.slice(1).forEach((feedId) => added.add(feedId)) + else return excludes.forEach((feedId) => toExclude.add(feedId)) + }), + pull.collect((err) => { + if (err) return cb(clarify(err, 'Failed to resolve epoch membership')) - cb(null, epochs) + cb(null, { + added: [...added], + toExclude: [...toExclude], + }) }) - }, + ) + } + getMembers.stream = function getMembersStream(epochRootId, opts = {}) { + const { live } = opts + + const deferredSource = pullDefer.source() - getPreferredEpoch(groupId, cb) { - if (cb === undefined) return p(this.getPreferredEpoch).call(this, groupId) + getMembers(epochRootId, (err, res) => { + // prettier-ignore + if (err) return deferredSource.abort(clarify(err, 'error getting members')) - reduceEpochs(ssb, groupId, allGetters, (err, reduce) => { - if (err) - return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) + if (!live) { + deferredSource.resolve(pull.once(res)) + return + } - const tips = Object.keys(reduce.state).map((id) => { - const info = { - id, - previous: reduce.graph.getNode(id).previous, - ...reduce.state[id][id], + const added = new Set(res.added) + const toExclude = new Set(res.toExclude) + + const source = pull( + // create a stream of "there is an update" events + pull.values([ + // one event for current state + pull.once(true), + + // run a live stream, only emiting "true" if there is new info in the + // message that comes in + pull( + getTangleUpdates.stream(ssb, 'members', epochRootId, { live }), + pull.map((msg) => { + if (isAddMember(msg)) { + const initialSize = added.size + msg.value.content.recps + .slice(1) + .forEach((feedId) => added.add(feedId)) + return added.size > initialSize + } + + if (isExcludeMember(msg)) { + const initialSize = toExclude.size + msg.value.content.excludes.forEach((feedId) => + toExclude.add(feedId) + ) + return toExclude.size > initialSize + } + + return false + }), + pull.filter(Boolean) + ), + ]), + pull.flatten(), + + // for each "there is an update" event, map that to emitting the current + // membereship state of the epoch + pull.map(() => { + return { + added: [...added], + toExclude: [...toExclude], } - info.secret = Buffer.from(info.secret, 'base64') - info.members = new Set(info.members.toAdded) - return info }) + ) - let preferredEpoch - if (tips.length === 1) preferredEpoch = tips[0] - else if (tips.length === 2) { - if (isSameSet(...tips.map((t) => t.members))) - preferredEpoch = tieBreak(tips) - else return cb(Error('case not handled yet')) - } else return cb(Error(`case of ${tips.length} tips not handled yet`)) + return deferredSource.resolve(source) + }) - delete preferredEpoch.members - cb(null, preferredEpoch) - }) - }, + return deferredSource + } - getMembers(epochRootId, cb) { - if (cb === undefined) return p(this.getMembers).call(this, epochRootId) + function getPreferredEpoch(groupId, cb) { + if (cb === undefined) return p(getPreferredEpoch)(groupId) - getMembers(ssb, epochRootId, cb) - }, + epochsReduce(groupId, { getters: allGetters }, (err, reduce) => { + if (err) return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) - getMissingMembers(groupId, cb) { - if (cb === undefined) return p(this.getMissingMembers).call(this, groupId) - - reduceEpochs(ssb, groupId, allGetters, (err, reduce) => { - if (err) - return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) - - if (reduce.graph.connectedNodes.length === 1) return cb(null, []) - // INFO if there is only one connectedNode, there is only a - // single epoch, so there's no need to check anything - - const result = reduce.graph.connectedNodes.reduce((acc, node) => { - // For each node (epoch) in the tangle, determine which feeds have - // historically been added / excluded. - const addedSoFar = new Set() - const excludedSoFar = new Set() - ;[node.key, ...reduce.graph.getHistory(node.key)] - .map((key) => reduce.graph.getNode(key)) - .forEach((historyNode) => { - const { added, toExclude } = - historyNode.data[historyNode.key].members - added.forEach((feedId) => addedSoFar.add(feedId)) - - if (historyNode.key === node.key) return - // INFO node members toExclude is talking about what should happen - // *after* it in the graph, so if we're enquiring about a - // particular epoch, we don't include that epoch's toExclude - toExclude.forEach((feedId) => excludedSoFar.add(feedId)) - }) + buildPreferredEpoch(reduce, cb) + }) + } + + // prettier-ignore + getPreferredEpoch.stream = function getPreferredEpochStream(groupId, opts = {}) { + const { live } = opts + + const deferredSource = pullDefer.source() - // Check if those who should be present are present - const shouldBePresent = difference(addedSoFar, excludedSoFar) - const currentMembers = new Set(node.data[node.key].members.added) - if (isSameSet(shouldBePresent, currentMembers)) return acc + // we don't want to emit every epoch up till current, so we calculate the current, + // then skip all the preferrentEpochs until we get up to the current + // This is important for listMembers to not send confusing results + getPreferredEpoch(groupId, (err, preferredEpoch) => { + if (err) return deferredSource.abort(clarify(err, 'failed to get initial preferred epoch')) + + if (!live) { + deferredSource.resolve(pull.once(preferredEpoch)) + return + } + + var sync = false + const source = pull( + epochsReduce.stream(groupId, { getters: allGetters, live }), + pull.asyncMap(buildPreferredEpoch), + pull.filter(epoch => { + // if have seen current preferredEpoch, allow through + if (sync) return true + // if we have reached current preferredEpoch, we're "in sync" + if (epoch.id === preferredEpoch.id) { + sync = true // start letting future updates through + return true // let the current one through! + } + return false + }), + ) + + deferredSource.resolve(source) + }) - // If find some missing, record this epoch and missing members - acc.push({ - epoch: node.key, // alias: epochRootId - secret: Buffer.from(node.data[node.key].secret, 'base64'), - missing: [...difference(shouldBePresent, currentMembers)], + return deferredSource + } + + function getMissingMembers(groupId, cb) { + if (cb === undefined) return p(this.getMissingMembers).call(this, groupId) + + epochsReduce(groupId, { getters: allGetters }, (err, reduce) => { + // prettier-ignore + if (err) return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce')) + + if (reduce.graph.connectedNodes.length === 1) return cb(null, []) + // INFO if there is only one connectedNode, there is only a + // single epoch, so there's no need to check anything + + const result = reduce.graph.connectedNodes.reduce((acc, node) => { + // For each node (epoch) in the tangle, determine which feeds have + // historically been added / excluded. + const addedSoFar = new Set() + const excludedSoFar = new Set() + ;[node.key, ...reduce.graph.getHistory(node.key)] + .map((key) => reduce.graph.getNode(key)) + .forEach((historyNode) => { + const { added, toExclude } = + historyNode.data[historyNode.key].members + added.forEach((feedId) => addedSoFar.add(feedId)) + + if (historyNode.key === node.key) return + // INFO node members toExclude is talking about what should happen + // *after* it in the graph, so if we're enquiring about a + // particular epoch, we don't include that epoch's toExclude + toExclude.forEach((feedId) => excludedSoFar.add(feedId)) }) - return acc - }, []) - cb(null, result) + // Check if those who should be present are present + const shouldBePresent = difference(addedSoFar, excludedSoFar) + const currentMembers = new Set(node.data[node.key].members.added) + if (isSameSet(shouldBePresent, currentMembers)) return acc + + // If find some missing, record this epoch and missing members + acc.push({ + epoch: node.key, // alias: epochRootId + secret: Buffer.from(node.data[node.key].secret, 'base64'), + missing: [...difference(shouldBePresent, currentMembers)], + }) + return acc + }, []) + + cb(null, result) + }) + } + + /* private helpers */ + + function epochsReduce(groupId, opts = {}, cb) { + const { getters } = opts + // - `groupId` *String* ssb-uri for a group + // - `getters` *Object* which describes which data fields you would like + // added to each epoch, and an async getter to aquire the data. + // - The getter is a function with signature `(epochRoot, cb)`. + // - e.g.: + // ``` + // const getters = { + // author (epochRoot, cb) { + // getRootFeedId(epochRoot.value.author, cb) + // } + // } + // ``` + // - `cb` *Function* a callback which receives a @tangle/reduce result. + // This is an object which has produced a tangle of connected "nodes" + // (which each "node" is an epoch here), and we can access the graph + // and nodes under `resolve.graph` (which is a @tangle/graph object) + + pull( + // collect epochs and decorate each epoch "node" with data (using getters) + epochNodeStream(ssb, groupId, { getters }), + pull.collect((err, nodes) => { + // prettier-ignore + if (err) return cb(clarify(err, 'Failure collecting epoch messages')) + + // reduce all of these epoch nodes with a tangle + const reduce = new Reduce(strategy, { nodes }) + reduce.resolve() + // INFO: this walks the graph and prunes disconnected nodes + + cb(null, reduce) }) - }, + ) + } + // emits updated @tangle/reduce objects as changes occur to epoch graph + epochsReduce.stream = function epochsReduceStream(groupId, opts = {}) { + const { getters, live: isLive } = opts + const reduce = new Reduce(strategy) + + return pull( + // reduce can be modified by 2 sorts of update: + pull.values([ + // 1. new epochs as they're discovered + pull( + epochNodeStream(ssb, groupId, { getters, live }), + pull.map((node) => { + console.log('adding epoch', node.id) + reduce.addNodes([node]) + return true + }) + ), + // 2. new member additions (membership changes preferrentEpochs) + pull( + ssb.db.query( + where(and(type('group/add-member'), groupRecp(groupId))), + isLive ? live({ old: false }) : null, + toPullStream() + ), + pull.filter(isAddMember), + pull.asyncMap((msg, cb) => { + const epochId = msg.value.content.tangles.members.root + // check if this epoch is in the reduce tangle + const epochNode = reduce.graph.getNode(epochId) + if (!epochNode) console.log('unknown epoch', epochId) + if (!epochNode) return cb(null, false) + + getMembers(epochId, (err, res) => { + if (err) return cb(null, false) + epochNode.data[epochId].members = res + cb(null, true) + }) + }) + ), + ]), + pullFlatMerge(), + pull.filter(), + pull.map(() => { + reduce.resolve() + return reduce + }) + ) + } + + return { + getEpochs, + getMembers, + getPreferredEpoch, + getMissingMembers, tieBreak, } } @@ -201,124 +402,113 @@ function tieBreak(epochs) { return epochs.find((epoch) => epoch.secret.equals(winningKey)) } -function reduceEpochs(ssb, groupId, getters = {}, cb) { - // - `groupId` *String* ssb-uri for a group - // - `getters` *Object* which describes which data fields you would like - // added to each epoch, and an async getter to aquire the data. - // - The getter is a function with signature `(epochRoot, cb)`. - // - e.g.: - // ``` - // const getters = { - // author (epochRoot, cb) { - // getRootFeedId(epochRoot.value.author, cb) - // } - // } - // ``` - // - `cb` *Function* a callback which receives a @tangle/reduce result. - // This is an object which has produced a tangle of connected "nodes" - // (which each "node" is an epoch here), and we can access the graph - // and nodes under `resolve.graph` (which is a @tangle/graph object) - - ssb.box2.getGroupInfo(groupId, (err, info) => { - if (err) return cb(clarify(err, 'Failed to get group info for ' + groupId)) - - // Fetch the tangle root - ssb.db.get(info.root, (err, rootVal) => { - if (err) - return cb( - clarify(err, 'Failed to load group root with id ' + info.root) - ) - if (!isInitRoot(rootVal)) - return cb( - clarify( - new Error(isInitRoot.string), - 'Malformed group/init root message' - ) - ) - - const root = { key: info.root, value: rootVal } +function epochNodeStream(ssb, groupId, opts = {}) { + const { getters, live } = opts + const deferredSource = pullDefer.source() - // Fetch the tangle updates - getTangleUpdates(ssb, 'epoch', info.root, (err, updates) => { - if (err) - return cb(clarify(err, 'Failed to updates of group epoch tangle')) + getGroupInit(ssb, groupId, (err, root) => { + // prettier-ignore + if (err) return deferredSource.abort(clarify(err, 'Failed to get group init message')) - // Take each root/update and build an epoch "node" for our tangle + // Take each root/update and build an epoch "node" for our tangle + const source = pull( + pull.values([ + pull.once(root), pull( - pull.values([root, ...updates.filter(isInitEpoch)]), - pull.asyncMap((msg, cb) => { - const epochRootId = toMsgURI(msg.key) - const epochData = {} - const node = { - key: epochRootId, - previous: msg.value.content.tangles.epoch.previous, - data: { - [epochRootId]: epochData, - }, - } - - // Use out getters to attach desired data to our epoch "node" - pull( - pull.values(Object.entries(getters)), - pull.asyncMap(([fieldName, getter], cb) => { - getter(msg, (err, fieldData) => { - if (err) - return cb(clarify(err, 'Failed to get epoch ' + fieldName)) - epochData[fieldName] = fieldData - cb(null) - }) - }), - pull.collect((err) => { - if (err) return cb(clarify(err, 'Failed to collect epoch data')) - - cb(null, node) - }) - ) + getTangleUpdates.stream(ssb, 'epoch', toMsgURI(root.key), { live }), + pull.filter(isInitEpoch) + ), + ]), + pull.flatten(), + pull.asyncMap((msg, cb) => { + // Build epoch node + const epochRootId = toMsgURI(msg.key) + const epochData = {} + const node = { + key: epochRootId, + previous: msg.value.content.tangles.epoch.previous, + data: { + [epochRootId]: epochData, + }, + } + if (!getters) return cb(null, node) + + // Use getters to attach desired data to our epoch node + pull( + pull.values(Object.entries(getters)), + pull.asyncMap(([fieldName, getter], cb) => { + getter(msg, (err, fieldData) => { + if (err) + return cb(clarify(err, 'Failed to get epoch ' + fieldName)) + + epochData[fieldName] = fieldData + cb(null) + }) }), - pull.collect((err, nodes) => { - if (err) - return cb(clarify(err, 'Failure collecting epoch messages')) + pull.collect((err) => { + if (err) return cb(clarify(err, 'Failed to collect epoch data')) - // Finally reduce all of these epoch nodes with a tangle - const reduce = new Reduce(strategy, { nodes }) - reduce.resolve() - // INFO this walks the graph and prunes disconnected nodes - - cb(null, reduce) + cb(null, node) }) ) }) - }) + ) + + deferredSource.resolve(source) }) + + return deferredSource } +function getGroupInit(ssb, groupId, cb) { + ssb.box2.getGroupInfo(groupId, (err, info) => { + // prettier-ignore + if (err) return cb(clarify(err, 'Failed to get group info for ' + groupId)) + if (!info) return cb(new Error('Unknown group')) -function getMembers(ssb, epochRootId, cb) { - epochRootId = toMsgURI(epochRootId) - const added = new Set() - const toExclude = new Set() + // Fetch the tangle root + ssb.db.get(info.root, (err, rootVal) => { + // prettier-ignore + if (err) return cb(clarify(err, 'Failed to load group root with id ' + info.root)) - pull( - getTangleUpdates.stream(ssb, 'members', epochRootId), - pull.filter((msg) => isAddMember(msg) || isExcludeMember(msg)), - pull.drain( - (msg) => { - const { type, recps, excludes } = msg.value.content - if (type === 'group/add-member') - recps.slice(1).forEach((feedId) => added.add(feedId)) - else excludes.forEach((feedId) => toExclude.add(feedId)) - }, - (err) => { - if (err) return cb(clarify(err, 'Failed to resolve epoch membership')) - cb(null, { - added: [...added], - toExclude: [...toExclude], - }) - } - ) - ) + if (!isInitRoot(rootVal)) + // prettier-ignore + return cb(clarify(new Error(isInitRoot.string), 'Malformed group/init root message')) + + cb(null, { key: info.root, value: rootVal }) + }) + }) } /* HELPERS */ +function buildPreferredEpoch(reduce, cb) { + const tips = Object.keys(reduce.state).map((id) => { + const info = { + id, + previous: reduce.graph.getNode(id).previous, + ...reduce.state[id][id], + } + info.secret = Buffer.from(info.secret, 'base64') + info.members = new Set(info.members.added) + return info + }) + + var preferredEpoch + if (tips.length === 1) preferredEpoch = tips[0] + else if (tips.length === 2) { + if (isSameSet(...tips.map((t) => t.members))) { + preferredEpoch = tieBreak(tips) + } else { + return cb( + Error('Tips do not have same membership - case not handled yet') + ) + } + } else return cb(Error(`case of ${tips.length} tips not handled yet`)) + + delete preferredEpoch.members + + cb(null, preferredEpoch) +} + function toPattern(regexp) { return regexp.toString().replace(/^\//, '').replace(/\/$/, '') } diff --git a/lib/tangles/get-tangle-updates.js b/lib/tangles/get-tangle-updates.js index 1edf953..6623ea0 100644 --- a/lib/tangles/get-tangle-updates.js +++ b/lib/tangles/get-tangle-updates.js @@ -3,20 +3,26 @@ // SPDX-License-Identifier: LGPL-3.0-only const pull = require('pull-stream') +const { where, live, toPullStream } = require('ssb-db2/operators') const { tangleRoot } = require('../../lib/operators') function getTangleUpdates(ssb, tangle, root, cb) { - pull(tangleUpdateStream(ssb, tangle, root), pull.collect(cb)) + pull(getTangleUpdates.stream(ssb, tangle, root), pull.collect(cb)) } -getTangleUpdates.stream = tangleUpdateStream -module.exports = getTangleUpdates - -function tangleUpdateStream(ssb, tangle, root) { - const { where, and, toPullStream } = ssb.db.operators +getTangleUpdates.stream = function tangleUpdatesStream( + ssb, + tangle, + root, + opts = {} +) { return pull( - ssb.db.query(where(tangleRoot(tangle, root)), toPullStream()), + ssb.db.query( + where(tangleRoot(tangle, root)), + opts.live ? live({ old: true }) : null, + toPullStream() + ), pull.filter( (m) => Array.isArray(m.value.content.tangles[tangle].previous) && @@ -24,3 +30,5 @@ function tangleUpdateStream(ssb, tangle, root) { ) ) } + +module.exports = getTangleUpdates diff --git a/listeners.js b/listeners.js new file mode 100644 index 0000000..c77a6cc --- /dev/null +++ b/listeners.js @@ -0,0 +1,179 @@ +// SPDX-FileCopyrightText: 2023 Mix Irving +// +// SPDX-License-Identifier: LGPL-3.0-only +// + +const { + where, + and, + or, + live, + isDecrypted, + type, + toPullStream, +} = require('ssb-db2/operators') +const { + validator: { + group: { + addMember: isAddMember, + excludeMember: isExcludeMember, + initEpoch: isInitEpoch, + }, + }, + keySchemes, +} = require('private-group-spec') +const pull = require('pull-stream') +const paraMap = require('pull-paramap') +const clarify = require('clarify-error') + +module.exports = function startListeners(ssb, getPreferredEpoch, onError) { + let isClosed = false + ssb.close.hook((close, args) => { + isClosed = true + close.apply(ssb, args) + }) + + ssb.metafeeds.findOrCreate((err, myRoot) => { + // prettier-ignore + if (err) return onError(clarify(err, 'Error getting own root in start()')) + + // check if we've been excluded + pull( + ssb.db.query( + where(and(isDecrypted('box2'), type('group/exclude-member'))), + live({ old: true }), + toPullStream() + ), + pull.filter(isExcludeMember), + pull.filter((msg) => + // it's an exclusion of us + msg.value.content.excludes.includes(myRoot.id) + ), + pull.drain( + (msg) => { + const groupId = msg.value.content.recps[0] + ssb.box2.excludeGroupInfo(groupId, (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Error on excluding group info after finding exclusion of ourselves')) + }) + }, + (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Error on looking for exclude messages excluding us')) + } + ) + ) + + // look for new epochs that we're added to + pull( + ssb.db.query( + where(and(isDecrypted('box2'), type('group/add-member'))), + live({ old: true }), + toPullStream() + ), + pull.filter(isAddMember), + // groups/epochs we're added to + pull.filter((msg) => { + return msg.value.content.recps.includes(myRoot.id) + }), + // to find new epochs we only check groups we've accepted the invite to + paraMap((msg, cb) => { + pull( + ssb.box2.listGroupIds(), + pull.filter((groupId) => groupId === msg.value.content.recps[0]), + pull.take(1), + pull.collect((err, groupIds) => { + // prettier-ignore + if (err) return cb(clarify(err, "Error getting groups we're already in when looking for new epochs")) + cb(null, groupIds.length ? msg : null) + }) + ) + }, 4), + pull.filter(Boolean), + pull.drain( + (msg) => { + const groupId = msg.value.content.recps[0] + + const secret = Buffer.from(msg.value.content.groupKey, 'base64') + ssb.box2.addGroupInfo(groupId, { key: secret }, (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Cannot add new epoch key that we found')) + + const newKeyPick = { + key: secret, + scheme: keySchemes.private_group, + } + // TODO: naively guessing that this is the latest key for now + ssb.box2.pickGroupWriteKey(groupId, newKeyPick, (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Error switching to new epoch key that we found')) + + ssb.db.reindexEncrypted((err) => { + // prettier-ignore + if (err && !isClosed) onError(clarify(err, 'Error reindexing after finding new epoch')) + }) + }) + }) + }, + (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, "Error finding new epochs we've been added to")) + } + ) + ) + + // listen for new epochs and update groupInfo as required + pull( + ssb.db.query( + where(or(type('group/init'), type('group/add-member'))), + live({ old: true }), + toPullStream() + ), + pull.filter((msg) => isInitEpoch(msg) || isAddMember(msg)), + pull.map((msg) => msg.value.content.recps[0]), + pull.drain( + (groupId) => { + ssb.box2.getGroupInfo(groupId, (err, groupInfo) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'fatal error in live updating groupInfo')) + if (!groupInfo) return // group that we've not accepted an invite to yet + if (groupInfo.excluded) return // group where we were excluded + + getPreferredEpoch(groupId, (err, epochInfo) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'fatal error getting preferred epoch')) + if (groupInfo.writeKey.key.equals(epochInfo.secret)) return + + ssb.box2.addGroupInfo( + groupId, + { key: epochInfo.secret }, + (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Error adding new epoch key')) + + const newKeyPick = { + key: epochInfo.secret, + scheme: keySchemes.private_group, + } + ssb.box2.pickGroupWriteKey(groupId, newKeyPick, (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Error picking group write key')) + + ssb.db.reindexEncrypted((err) => { + // prettier-ignore + if (err && !isClosed) onError(clarify(err, 'Error reindexing after finding new epoch')) + }) + }) + } + ) + }) + }) + }, + (err) => { + // prettier-ignore + if (err && !isClosed) return onError(clarify(err, 'Problem listening to new messages')) + } + ) + ) + }) +} diff --git a/package.json b/package.json index 88dc7f7..d6af624 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ ], "dependencies": { "@tangle/overwrite-fields": "^2.0.3", - "@tangle/reduce": "^5.0.4", + "@tangle/reduce": "^5.0.5", "@tangle/strategy": "^4.1.2", "base64-url": "^2.3.3", "bipf": "^1.9.0", @@ -44,13 +44,14 @@ "lodash.chunk": "^4.2.0", "lodash.set": "^4.3.2", "private-group-spec": "^7.0.0", + "pull-abortable": "^4.1.1", "pull-defer": "^0.2.3", + "pull-flat-merge": "^2.0.3", "pull-paramap": "^1.2.2", "pull-stream": "^3.7.0", "set.prototype.difference": "^1.0.2", "ssb-bfe": "^3.7.0", - "ssb-box2": "^7.0.0", - "ssb-crut": "^4.6.2", + "ssb-box2": "^7.1.0", "ssb-db2": "^7.1.0", "ssb-meta-feeds": "^0.39.0", "ssb-private-group-keys": "^1.1.2", @@ -59,9 +60,9 @@ }, "devDependencies": { "c8": "^7.13.0", - "eslint": "^8.37.0", + "eslint": "^8.40.0", "husky": "~4.3.8", - "prettier": "^2.8.7", + "prettier": "^2.8.8", "pretty-quick": "^3.1.3", "pull-many": "^1.0.9", "rimraf": "^3.0.2", diff --git a/test/exclude-members.test.js b/test/exclude-members.test.js index 20ce4af..16e8314 100644 --- a/test/exclude-members.test.js +++ b/test/exclude-members.test.js @@ -275,18 +275,6 @@ test('Verify that you actually get excluded from a group', async (t) => { const invites = await pull(bob.tribes2.listInvites(), pull.collectAsPromise()) t.deepEquals(invites, [], 'Bob has no invites') - await pull(bob.tribes2.listMembers(groupId), pull.collectAsPromise()) - .then(() => - t.fail( - "Bob didn't get an error when trying to list members of the group he's excluded from" - ) - ) - .catch(() => - t.pass( - "Bob gets an error when trying to list members of the group he's excluded from" - ) - ) - await Promise.all([p(alice.close)(true), p(bob.close)(true)]) }) @@ -674,6 +662,8 @@ test("restarting the client doesn't make us rejoin old stuff", async (t) => { ) await p(bob.close)(true).then(() => t.pass("bob's client was closed")) + await p(setTimeout)(500) + bob = Testbot({ rimraf: false, name: 'bobrestart', diff --git a/test/lib/epochs.test.js b/test/lib/epochs.test.js index 4c633f0..b41bdd6 100644 --- a/test/lib/epochs.test.js +++ b/test/lib/epochs.test.js @@ -72,11 +72,20 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => { ], 'there is 1 epoch' ) + + let liveMembers = [] + pull( + Epochs(alice).getMembers.stream(group.root, { live: true }), // epoch zero root + pull.drain((state) => liveMembers.unshift(state)) + ) + const excpected0 = { added: [aliceId], toExclude: [] } + // group.root = epoch zero id t.deepEqual( - await Epochs(alice).getMembers(group.root), // epoch zero root - { added: [aliceId], toExclude: [] }, + await Epochs(alice).getMembers(group.root), + excpected0, 'group members: alice' ) + t.deepEqual(liveMembers[0], excpected0, 'group members: alice (live)') await sync('to get Additions feeds') @@ -90,11 +99,17 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => { Promise.all(others.map((peer) => peer.tribes2.acceptInvite(group.id))) ) await sync('to see acceptance') + const expected1 = { added: [aliceId, bobId, oscarId], toExclude: [] } t.deepEqual( - await Epochs(alice).getMembers(group.root), // epoch zero root - { added: [aliceId, bobId, oscarId], toExclude: [] }, + await Epochs(alice).getMembers(group.root), + expected1, 'epoch 0 members: alice, bob, oscar' ) + t.deepEqual( + liveMembers[0], + expected1, + 'epoch 0 members: alice, bob, oscar (live)' + ) // alice removes oscar await run( @@ -105,16 +120,13 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => { const epochs = await Epochs(alice).getEpochs(group.id) const groupUpdated = await alice.tribes2.get(group.id) - const lastGroupInitId = await new Promise((resolve, reject) => { - pull( - alice.db.query(where(type('group/init')), descending(), toPullStream()), - pull.map((m) => fromMessageSigil(m.key)), - pull.take(1), - pull.collect((err, keys) => { - err ? reject(err) : resolve(keys[0]) - }) - ) - }) + const [lastGroupInitId] = await pull( + alice.db.query(where(type('group/init')), descending(), toPullStream()), + pull.map((m) => fromMessageSigil(m.key)), + pull.take(1), + pull.collectAsPromise() + ) + t.deepEqual( epochs, [ @@ -133,11 +145,18 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => { ], 'there are 2 epochs' ) + + const expected2 = { added: [aliceId, bobId, oscarId], toExclude: [oscarId] } t.deepEqual( await Epochs(alice).getMembers(epochs[0].id), - { added: [aliceId, bobId, oscarId], toExclude: [oscarId] }, + expected2, 'epoch 0 members: alice, bob, oscar (note toExclude oscar)' ) + t.deepEqual( + liveMembers[0], + expected2, + 'epoch 0 members: alice, bob, oscar (note toExclude oscar) (live)' + ) t.deepEqual( await Epochs(alice).getMembers(epochs[1].id), { added: [aliceId, bobId], toExclude: [] }, @@ -197,7 +216,6 @@ test('lib/epochs (getMissingMembers)', async (t) => { if (content.type === 'group/add-member') { content.recps = content.recps.filter((recp) => recp !== bobId) } - // console.log('create', args[0].content) create.apply(this, args) }) await run( @@ -292,6 +310,31 @@ test('lib/epochs (getPreferredEpoch - 4.4. same membership)', async (t) => { ]) ) + const livePreferredEpochs = [] + let testsRunning = true + pull( + Epochs(oscar).getPreferredEpoch.stream(group.id, { live: true }), + pull.drain( + (epoch) => livePreferredEpochs.unshift(epoch), + (err) => { + if (err && testsRunning) t.error(err, 'getPreferredEpoch.stream smooth') + } + ) + ) + + const epochs0 = await Epochs(oscar).getEpochs(group.id) + + t.deepEqual( + await Epochs(oscar).getPreferredEpoch(group.id), + epochs0[0], + 'getPreferredEpoch (before exclusion)' + ) + t.deepEqual( + livePreferredEpochs[0], + epochs0[0], + 'getPreferredEpoch (before exclusion) (live)' + ) + await run( 'bob and oscar both exclude alice (mutiny, fork)!', Promise.all([ @@ -299,22 +342,41 @@ test('lib/epochs (getPreferredEpoch - 4.4. same membership)', async (t) => { oscar.tribes2.excludeMembers(group.id, [aliceId], {}), ]) ) + const epochs1 = await Epochs(oscar) + .getEpochs(group.id) + .then((epochs) => epochs.filter((epoch) => epoch.author != aliceId)) + const expected1 = epochs1[0] + + t.deepEqual( + await Epochs(oscar).getPreferredEpoch(group.id), + expected1, + 'getPreferredEpoch (before fork sync)' + ) + t.deepEqual( + livePreferredEpochs[0], + expected1, + 'getPreferredEpoch (before fork sync) (live)' + ) await run('(sync exclusion)', replicate(bob, oscar)) - const epochs = await Epochs(oscar) + await p(setTimeout)(500) + + const epochs2 = await Epochs(oscar) .getEpochs(group.id) - .then((epochs) => epochs.filter((epoch) => epoch.author != aliceId)) - const preferredEpoch = Epochs({}).tieBreak(epochs) + .then((epochs) => epochs.filter((epoch) => epoch.author !== aliceId)) + const expected2 = Epochs({}).tieBreak(epochs2) t.deepEqual( await Epochs(oscar).getPreferredEpoch(group.id), - preferredEpoch, + expected2, 'getPreferredEpoch' ) + t.deepEqual(livePreferredEpochs[0], expected2, 'getPreferredEpoch (live)') // TODO need to test epochs > 2 + testsRunning = false t.end() }) diff --git a/test/lib/tangles/get-tangle-data.test.js b/test/lib/tangles/get-tangle-data.test.js index 3472fb9..19e6504 100644 --- a/test/lib/tangles/get-tangle-data.test.js +++ b/test/lib/tangles/get-tangle-data.test.js @@ -94,7 +94,7 @@ test('get-tangle-data unit test', (t) => { }, 'adding message to tip' ) - server.close(true, t.end) + server.close(true, () => t.end()) } ) }) @@ -147,7 +147,7 @@ test(`get-tangle-${n}-publishes`, (t) => { 'We expect bounded branching with fast publishing' ) - server.close(true, t.end) + server.close(true, () => t.end()) } ) ) @@ -186,7 +186,7 @@ test('get-tangle', (t) => { 'auto adds group tangle (auto added tangles.group)' ) - ssb.close(true, t.end) + ssb.close(true, () => t.end()) }) }) }) @@ -255,6 +255,7 @@ test('get-tangle with branch', async (t) => { t.deepEqual(aliceTangle2.previous.length, 2, 'There should be two tips') await Promise.all([p(alice.close)(true), p(bob.close)(true)]) + t.end() }) test('members tangle works', async (t) => { @@ -359,4 +360,5 @@ test('members tangle works', async (t) => { p(bob.close)(true), p(carol.close)(true), ]) + t.end() }) diff --git a/test/list-and-get.test.js b/test/list-and-get.test.js index ff9581d..4d615fc 100644 --- a/test/list-and-get.test.js +++ b/test/list-and-get.test.js @@ -37,7 +37,7 @@ test('tribes.list + tribes.get', (t) => { t.deepEqual(actualGroup, expectedGroup, 'gets group data') - server.close((err) => { + server.close(true, (err) => { t.error(err, 'closes server') server = Testbot({ name, rimraf: false, keys }) @@ -51,7 +51,7 @@ test('tribes.list + tribes.get', (t) => { list, 'list returns save results after restart' ) - server.close(true, t.end) + server.close(true, () => t.end()) }) ) }) @@ -80,6 +80,7 @@ test('get', async (t) => { t.equal(root, group.root) await p(ssb.close)(true) + t.end() }) test('list', (t) => { @@ -109,7 +110,7 @@ test('list', (t) => { t.equal(groups2[0].writeKey.key, writeKey1.key) t.equal(groups2[1].id, id2) - ssb.close(true, t.end) + ssb.close(true, () => t.end()) }) ) }) @@ -180,4 +181,5 @@ test('live list groups', async (t) => { t.true(groups[0].writeKey.key.equals(group.writeKey.key), 'secret matches') await Promise.all([p(alice.close)(true), p(bob.close)(true)]) + t.end() }) diff --git a/test/list-members.test.js b/test/list-members.test.js index 8d7ba59..a0c8664 100644 --- a/test/list-members.test.js +++ b/test/list-members.test.js @@ -64,6 +64,8 @@ test('list members', async (t) => { await new Promise((res) => { pull( alice.tribes2.listMembers(group.id), + pull.map((info) => info.added), + pull.flatten(), pull.collect((err, members) => { t.error(err, 'returned members') @@ -113,12 +115,12 @@ test('live list members', async (t) => { const group = await p(alice.tribes2.create)(null).catch(t.fail) t.pass('alice created a group') - const members = [] + let members = [] pull( alice.tribes2.listMembers(group.id, { live: true }), pull.drain( - (member) => { - members.push(member) + (update) => { + members = update.added }, (err) => { if (err) t.fail(err) @@ -138,3 +140,169 @@ test('live list members', async (t) => { await Promise.all([p(alice.close)(true), p(bob.close)(true)]) }) + +test('listMembers works with exclusion', async (t) => { + const alice = Testbot({ + keys: ssbKeys.generate(null, 'alice'), + mfSeed: Buffer.from( + '000000000000000000000000000000000000000000000000000000000000a1ce', + 'hex' + ), + }) + const bob = Testbot({ + keys: ssbKeys.generate(null, 'bob'), + mfSeed: Buffer.from( + '0000000000000000000000000000000000000000000000000000000000000b0b', + 'hex' + ), + }) + const carol = Testbot({ + keys: ssbKeys.generate(null, 'carol'), + mfSeed: Buffer.from( + '00000000000000000000000000000000000000000000000000000000000ca201', + 'hex' + ), + }) + const david = Testbot({ + keys: ssbKeys.generate(null, 'david'), + mfSeed: Buffer.from( + '00000000000000000000000000000000000000000000000000000000000da71d', + 'hex' + ), + }) + + await Promise.all([ + alice.tribes2.start(), + bob.tribes2.start(), + carol.tribes2.start(), + david.tribes2.start(), + ]).then(() => t.pass('tribes2 started for everyone')) + + const [aliceRoot, bobRoot, carolRoot, davidRoot] = await Promise.all([ + p(alice.metafeeds.findOrCreate)(), + p(bob.metafeeds.findOrCreate)(), + p(carol.metafeeds.findOrCreate)(), + p(david.metafeeds.findOrCreate)(), + ]) + + await Promise.all([ + replicate(alice, bob), + replicate(alice, carol), + replicate(alice, david), + ]).then(() => t.pass('everyone replicates their trees')) + + const { id: groupId } = await alice.tribes2 + .create() + .catch((err) => t.error(err, 'alice failed to create group')) + + t.pass(' --- listMembers (live) started --- ') + let liveMembers + pull( + alice.tribes2.listMembers(groupId, { live: true }), + pull.drain( + (update) => { + liveMembers = update.added + }, + (err) => { + if (err) t.error(err) + } + ) + ) + + await p(setTimeout)(1000) + + t.deepEquals(liveMembers, [aliceRoot.id], 'only alice is in the group') + + await alice.tribes2 + .addMembers(groupId, [bobRoot.id, carolRoot.id]) + .catch((err) => t.error(err, 'add bob and carol fail')) + + await p(setTimeout)(500) + t.deepEquals( + liveMembers.sort(), + [aliceRoot.id, bobRoot.id, carolRoot.id].sort(), + 'alice bob and carol are in the group' + ) + + await Promise.all([replicate(alice, bob), replicate(alice, carol)]) + + await Promise.all([ + bob.tribes2.acceptInvite(groupId), + carol.tribes2.acceptInvite(groupId), + ]) + + await Promise.all([replicate(alice, bob), replicate(alice, carol)]) + + await alice.tribes2 + .excludeMembers(groupId, [bobRoot.id]) + .then((res) => { + t.pass('alice excluded bob') + return res + }) + .catch((err) => t.error(err, 'remove member fail')) + + await Promise.all([replicate(alice, bob), replicate(alice, carol)]) + + await p(setTimeout)(500) + t.deepEquals( + liveMembers.sort(), + [aliceRoot.id, carolRoot.id].sort(), + 'bob is out of the group' + ) + + await alice.tribes2 + .addMembers(groupId, [davidRoot.id]) + .then(() => t.pass('david added to group')) + .catch((err) => t.error(err, 'add david fail')) + + await Promise.all([replicate(alice, bob), replicate(alice, carol)]) + + const aliceMembers = await pull( + alice.tribes2.listMembers(groupId), + pull.map((update) => update.added), + pull.flatten(), + pull.unique(), + pull.collectAsPromise() + ) + t.deepEquals( + aliceMembers.sort(), + [aliceRoot.id, carolRoot.id, davidRoot.id].sort(), + 'alice gets the correct members list' + ) + + const carolMembers = await pull( + carol.tribes2.listMembers(groupId), + pull.map((update) => update.added), + pull.flatten(), + pull.unique(), + pull.collectAsPromise() + ) + t.deepEquals( + carolMembers.sort(), + [aliceRoot.id, carolRoot.id, davidRoot.id].sort(), + 'carol gets the correct members list' + ) + + const msg = + "Bob gets an error when trying to list members of the group he's excluded from" + await pull(bob.tribes2.listMembers(groupId), pull.collectAsPromise()) + .then((res) => { + console.log(res) + t.fail(msg) + }) + .catch(() => t.pass(msg)) + + await p(setTimeout)(500) + t.deepEquals( + liveMembers.sort(), + [aliceRoot.id, carolRoot.id, davidRoot.id].sort(), + 'adding david to new epoch got detected live' + ) + + await Promise.all([ + p(alice.close)(true), + p(bob.close)(true), + p(carol.close)(true), + p(david.close)(true), + ]) +}) diff --git a/test/prune-publish.test.js b/test/prune-publish.test.js index e6eeba7..8827100 100644 --- a/test/prune-publish.test.js +++ b/test/prune-publish.test.js @@ -56,41 +56,31 @@ test('prune a message with way too big `previous`', async (t) => { t.true(msg16len > 10, 'there are some previouses left') await p(ssb.close)(true) + t.end() }) -test('publish many messages that might need pruning', (t) => { +test('publish many messages that might need pruning', async (t) => { const n = 5000 const ssb = Testbot() - const publishArray = new Array(n).fill().map((item, i) => i) - - ssb.tribes2.create(null, (err, group) => { - if (err) t.fail(err) - - const publishes = publishArray.map( - (value) => - new Promise((res, rej) => { - ssb.tribes2.publish( - { type: 'potato', content: value, recps: [group.id] }, - null, - (err, msg) => { - if (err) return rej(err) - return res(msg) - } - ) - }) - ) - - //console.log('publishing', n) - //console.time('publish') - Promise.all(publishes) - .then(async () => { - //console.timeEnd('publish') - - t.pass('published all the messages') - - ssb.close(true, t.end) + const group = await p(ssb.tribes2.create)(null) + + const start = Date.now() + let count = 0 + await Promise.all( + Array.from({ length: n }, (_, i) => { + const content = { type: 'potato', count: i, recps: [group.id] } + return ssb.tribes2.publish(content, null).then(() => { + count++ + if (count % 500 === 0) t.pass(count) }) - .catch(t.error) - }) + }) + ) + .then(() => { + t.pass(`published ${n} messages in ${Date.now() - start}ms`) + }) + .catch(t.error) + + await p(ssb.close)(true) + t.end() })