Skip to content

Commit

Permalink
add optional subscribeRepos query delay via new SUBSCRIBE_REPOS_BATCH…
Browse files Browse the repository at this point in the history
…_DELAY env var

for #36
  • Loading branch information
snarfed committed Nov 5, 2024
1 parent c8961c8 commit 7874619
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ Configure arroba with these environment variables:
Optional, only used in [com.atproto.repo](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_repo), [.server](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_server), and [.sync](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_sync) XRPC handlers:

* `REPO_TOKEN`, static token to use as both `accessJwt` and `refreshJwt`, defaults to contents of `repo_token` file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented.
* `ROLLBACK_WINDOW`, number of events to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit.
* `ROLLBACK_WINDOW`, number of events to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers), as an integer. Defaults to no limit.
* `SUBSCRIBE_REPOS_BATCH_DELAY`, minimum time to wait between datastore queries in `com.atproto.sync.subscribeRepos`, in seconds, as a float. Defaults to 0 if unset.

<!-- Only used in app.py:
* `REPO_DID`, repo user's DID, defaults to contents of `repo_did` file
Expand Down
5 changes: 5 additions & 0 deletions arroba/tests/test_xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,11 @@ def test_subscribe_repos(self, *_):

subscriber_b.join()

@patch.dict(os.environ, SUBSCRIBE_REPOS_BATCH_DELAY='.01')
@patch('arroba.xrpc_sync.NEW_EVENTS_TIMEOUT', timedelta(seconds=.01))
def test_subscribe_repos_batch_delay(self, *_):
self.test_subscribe_repos()

def test_subscribe_repos_cursor_zero(self, *_):
commit_cids = [self.repo.head.cid]
writes = [None]
Expand Down
3 changes: 3 additions & 0 deletions arroba/xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def handle(event):
cur_seq = last_seq
break

if delay := os.getenv('SUBSCRIBE_REPOS_BATCH_DELAY'):
time.sleep(float(delay))



@server.server.method('com.atproto.sync.getBlocks')
Expand Down

0 comments on commit 7874619

Please sign in to comment.