The Situation
I copied code from Palomar to subscribe to the Firehose. That includes persisting the cursor of the last processed message to a database (also periodically). Note: Rainbow, for example, uses the same approach when saving its cursor to disk.
I receive a message with sequence number `N` and persist `N` to the database. On the next startup, I read `N` from the database and pass it as `cursor=N` to `com.atproto.sync.subscribeRepos`. I noticed that the first message I receive has sequence number `N`, not `N+1`, which means I process that message twice.

This was an easy fix on my end (and I have a PR to fix `search/firehose.go`), but I was curious whether other components are affected, so I dug a bit deeper.

I didn't find good documentation for `com.atproto.sync.subscribeRepos`, but I did find this: https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/sync/subscribeRepos.json#L13 ...which states `"The last known event seq number to backfill from."`.

My gut feeling tells me that if I already know the sequence number, I don't want it included in the response, i.e., I want everything `> cursor`.
Checking the Implementation
The Firehose runs `EventPersistence.Playback(ctx, cursor, ...)` with the user-provided cursor: https://github.com/bluesky-social/indigo/blob/main/events/events.go#L360

Checking all implementations of that interface, I get:
- `DbPersistence` correctly queries for `seq > ?`: https://github.com/bluesky-social/indigo/blob/main/events/dbpersist.go#L379
- `DiskPersistence` ... not 100% sure, but I think this correctly seeks to an event with `evt.Seq > since` here: https://github.com/bluesky-social/indigo/blob/main/events/diskpersist.go#L245-L257
- `PebblePersist` queries for `LowerBound: [8]byte{since}`: https://github.com/bluesky-social/indigo/blob/main/events/pebblepersist.go#L123 The docs on that say:
  ```go
  // LowerBound specifies the smallest key (inclusive) that the iterator will
  // return during iteration. If the iterator is seeked or iterated past this
  // boundary the iterator will return Valid()==false. Setting LowerBound
  // effectively truncates the key space visible to the iterator.
  ```
  The first element of the iterator is returned, i.e., this returns all events with `evt.Seq >= since`.
- `EventRingBuffer` (I think) correctly skips while `evt.Seq <= since`: https://github.com/bluesky-social/indigo/blob/main/splitter/ringbuf.go#L116-L118
- `MemPersister` is probably correct, if that `TODO` is telling the truth: https://github.com/bluesky-social/indigo/blob/main/events/persist.go#L76-L80
- `YoloPersister` does not support playback.

EDIT: What confuses me is that I thought the current Firehose used `DbPersistence` or maybe `DiskPersistence`, but it doesn't behave that way 🤔.

Solutions
1. It would be nice to have documentation for `com.atproto.sync.subscribeRepos` that states whether it should return everything `> since` or `>= since`.
2. The `EventPersistence` interface should have documentation that specifies how `since` should be handled.
3. The implementations should all behave the same way.

I can submit PRs for 2 and 3, probably. Let me know :)

EDIT2: Seeing how there's a test for `PebblePersist` and `DiskPersistence`, apparently they behave the same and I'm confused somewhere. I'll keep looking...

EDIT3: Well, after thinking about this for a while, I can't really put my finger on where exactly, but I believe something is not right. The Firehose (and the `EventPersistence` interface, based on the existing tests) returns events with `evt.SeqNo >= since`, i.e., including `since`. A bunch of the consumers, including `search/firehose.go` and `splitter`, use the last processed sequence number, or the highest sequence number in PebbleDB, as `since`. I think they process one event twice.

If you let me know how it should behave, I can send PRs to fix it.
EDIT4: Just to add, the streaming endpoint of Labelers returns events with `seqNo > since`.