Bust `_membership_stream_cache` cache when current state changes (#17732)

This is particularly a problem in a state reset scenario where the membership
might change without a corresponding event.

This PR targets a scenario where a state reset causes room membership to
change. Previously, the cache would just hold onto stale data; now we
properly bust the cache in this scenario.

We have a few tests for these scenarios, which you can see are now fixed
because we can remove the `FIXME` where we previously had to manually bust
the cache in the test itself.

This is a general Synapse fix, so by its nature it also helps out Sliding
Sync.
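
As a rough sketch (not part of this PR's diff), the stale-cache problem and the fix look like this at the `StreamChangeCache` level (the class backing `_membership_stream_cache`); the user ID and stream positions are made up for illustration:

from synapse.util.caches.stream_change_cache import StreamChangeCache

# `_membership_stream_cache` is a StreamChangeCache keyed by user ID.
cache = StreamChangeCache("_membership_stream_cache", 1, max_size=10)

# A state reset changes @user1's membership at stream position 5, but no
# membership event is persisted for them. Previously nothing busted the
# cache, so it kept answering "nothing changed since position 2" and callers
# skipped the database query, serving stale membership.
assert not cache.has_entity_changed("@user1:example.org", 2)

# With this change, persisting the current-state delta marks every affected
# user as changed, so callers notice the change and re-query the database.
cache.entity_has_changed("@user1:example.org", 5)
assert cache.has_entity_changed("@user1:example.org", 2)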

Fix #17368

Prerequisite for #17929

---

Match when we are busting `_curr_state_delta_stream_cache`
MadLittleMods authored Jan 8, 2025
1 parent d0677dc commit aab3672
Showing 8 changed files with 88 additions and 26 deletions.
1 change: 1 addition & 0 deletions changelog.d/17732.bugfix
@@ -0,0 +1 @@
Fix membership caches not updating in state reset scenarios.
4 changes: 3 additions & 1 deletion synapse/storage/_base.py
@@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design)
"""

def _invalidate_state_caches(
self, room_id: str, members_changed: Collection[str]
self,
room_id: str,
members_changed: Collection[str],
) -> None:
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
36 changes: 36 additions & 0 deletions synapse/storage/databases/main/cache.py
@@ -219,6 +219,11 @@ def process_replication_rows(
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
for user_id in members_changed:
self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined]
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
@@ -236,6 +241,35 @@ def process_replication_rows(
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
# Note: This code is commented out to improve cache performance.
# While uncommenting would provide complete correctness, our
# automatic forgotten room purge logic (see
# `forgotten_room_retention_period`) means this would frequently
# clear the entire cache (effectively) and probably have a noticeable
# impact on the cache hit ratio.
#
# Not updating the cache here is safe because:
#
# 1. `_membership_stream_cache` is only used to indicate the
# *absence* of changes, i.e. "nothing has changed between tokens
# X and Y and so return early and don't query the database".
# 2. `_membership_stream_cache` is used when we query data from
# `current_state_delta_stream` and `room_memberships` but since
# nothing new is written to the database for those tables when
# purging/deleting a room (only deleting rows), there is nothing
# changed to care about.
#
# At worst, the cache might indicate a change at token X, at which
# point, we will query the database and discover nothing is there.
#
# Ideally, we would make it so that we could clear the cache on a
# more granular level but that's a bit complex and fiddly to do with
# room membership.
#
# self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

@@ -275,6 +309,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,)
@@ -291,6 +326,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
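To make the trade-off in the comment above concrete, here is a rough sketch (not from the PR) of what calling `all_entities_changed(...)` does to the cache; the user IDs and stream positions are made up for illustration:

from synapse.util.caches.stream_change_cache import StreamChangeCache

cache = StreamChangeCache("_membership_stream_cache", 1, max_size=1000)
for i in range(3):
    cache.entity_has_changed(f"@user{i}:example.org", 2 + i)

# Before busting, a user with no recorded change reads as "unchanged".
assert not cache.has_entity_changed("@never-changed:example.org", 9)

# Busting everything at token 10 discards all per-user knowledge...
cache.all_entities_changed(10)

# ...so even users that never changed now read as "may have changed" for
# positions at or before 10, forcing callers back to the database. Doing this
# on every room purge would effectively wipe the cache, hence the
# commented-out call above.
assert cache.has_entity_changed("@never-changed:example.org", 9)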
15 changes: 14 additions & 1 deletion synapse/storage/databases/main/events.py
@@ -1605,7 +1605,13 @@ def _update_current_state_txn(
room_id
delta_state: Deltas that are going to be used to update the
`current_state_events` table. Changes to the current state of the room.
stream_id: TODO
stream_id: This is expected to be the minimum `stream_ordering` for the
batch of events that we are persisting; which means we do not end up in a
situation where workers see events before the `current_state_delta` updates.
FIXME: However, this function also gets called with next upcoming
`stream_ordering` when we re-sync the state of a partial stated room (see
`update_current_state(...)`) which may be "correct" but it would be good to
nail down what exactly is the expected value here.
sliding_sync_table_changes: Changes to the
`sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
derived from the given `delta_state` (see
@@ -1908,6 +1914,13 @@ def _update_current_state_txn(
stream_id,
)

for user_id in members_to_cache_bust:
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
user_id,
stream_id,
)

# Invalidate the various caches
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_to_cache_bust
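The `txn.call_after(...)` calls above defer the cache bust until the database transaction commits, so a write that rolls back never poisons the cache. A simplified stand-in for the pattern (an illustration only, not Synapse's real `LoggingTransaction`) could look like:

from typing import Any, Callable, List, Tuple


class FakeTransaction:
    """Illustrative stand-in that queues callbacks until commit."""

    def __init__(self) -> None:
        self._after_callbacks: List[Tuple[Callable[..., Any], Tuple[Any, ...]]] = []

    def call_after(self, callback: Callable[..., Any], *args: Any) -> None:
        # Record the callback; it must not run until the transaction commits.
        self._after_callbacks.append((callback, args))

    def commit(self) -> None:
        # Only now is it safe to bust caches based on the written data.
        for callback, args in self._after_callbacks:
            callback(*args)


def bust_membership_cache(user_id: str, stream_id: int) -> None:
    print(f"busting _membership_stream_cache for {user_id} at {stream_id}")


txn = FakeTransaction()
for user_id in ("@alice:example.org", "@bob:example.org"):
    txn.call_after(bust_membership_cache, user_id, 42)

txn.commit()  # the queued cache busts run only after the write is durable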
9 changes: 9 additions & 0 deletions synapse/util/caches/stream_change_cache.py
@@ -314,6 +314,15 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
self._entity_to_key[entity] = stream_pos
self._evict()

def all_entities_changed(self, stream_pos: int) -> None:
"""
Mark all entities as changed. This is useful when the cache is invalidated and
there may be some potential change for all of the entities.
"""
self._cache.clear()
self._entity_to_key.clear()
self._earliest_known_stream_pos = stream_pos

def _evict(self) -> None:
"""
Ensure the cache has not exceeded the maximum size.
18 changes: 0 additions & 18 deletions tests/rest/client/sliding_sync/test_sliding_sync.py
@@ -1169,12 +1169,6 @@ def test_state_reset_room_comes_down_incremental_sync(self) -> None:
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
@@ -1322,12 +1316,6 @@ def test_state_reset_previously_room_comes_down_incremental_sync_with_filters(
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
@@ -1506,12 +1494,6 @@ def test_state_reset_never_room_incremental_sync_with_filters(
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

# Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
6 changes: 0 additions & 6 deletions tests/storage/test_stream.py
@@ -1209,12 +1209,6 @@ def test_state_reset2(self) -> None:
self.persistence.persist_event(join_rule_event, join_rule_context)
)

# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)

after_reset_token = self.event_sources.get_current_token()

membership_changes = self.get_success(
25 changes: 25 additions & 0 deletions tests/util/test_stream_change_cache.py
@@ -255,3 +255,28 @@ def test_max_pos(self) -> None:

# Unknown entities will return None
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None)

def test_all_entities_changed(self) -> None:
"""
`StreamChangeCache.all_entities_changed(...)` will mark all entities as changed.
"""
cache = StreamChangeCache("#test", 1, max_size=10)

cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)

cache.all_entities_changed(5)

# Everything should be marked as changed before the stream position where the
# change occurred.
self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
self.assertTrue(cache.has_entity_changed("user@elsewhere.org", 4))

# Nothing should be marked as changed at/after the stream position where the
# change occurred. In other words, nothing has changed since the stream position
# 5.
self.assertFalse(cache.has_entity_changed("user@foo.com", 5))
self.assertFalse(cache.has_entity_changed("bar@baz.net", 5))
self.assertFalse(cache.has_entity_changed("user@elsewhere.org", 5))
