diff --git a/faust/tables/base.py b/faust/tables/base.py index e3574ee21..ca26c0f11 100644 --- a/faust/tables/base.py +++ b/faust/tables/base.py @@ -382,13 +382,40 @@ async def _del_old_keys(self) -> None: for partition, timestamps in self._partition_timestamps.items(): while timestamps and window.stale(timestamps[0], time.time()): timestamp = heappop(timestamps) + triggered_windows = [ + self._partition_timestamp_keys.get( + (partition, window_range) + ) # noqa + for window_range in self._window_ranges(timestamp) + ] keys_to_remove = self._partition_timestamp_keys.pop( (partition, timestamp), None ) + window_data = {} if keys_to_remove: - for key in keys_to_remove: - value = self.data.pop(key, None) - await self.on_window_close(key, value) + for windows in triggered_windows: + if windows: + for processed_window in windows: + # we use set to avoid duplicate element in window's data + # window[0] is the window's key + # it is not related to window's timestamp + # windows are in format: + # (key, (window_start, window_end)) + window_data.setdefault(processed_window[0], []).extend( + self.data.get(processed_window, []) + ) + + for key_to_remove in keys_to_remove: + value = self.data.pop(key_to_remove, None) + if key_to_remove[1][0] > self.last_closed_window: + await self.on_window_close( + key_to_remove, + ( + window_data[key_to_remove[0]] + if key_to_remove[0] in window_data + else value + ), + ) self.last_closed_window = max( self.last_closed_window, max(key[1][0] for key in keys_to_remove), diff --git a/tests/unit/tables/test_base.py b/tests/unit/tables/test_base.py index 84e800083..bcd6e5fd1 100644 --- a/tests/unit/tables/test_base.py +++ b/tests/unit/tables/test_base.py @@ -191,6 +191,45 @@ async def test_last_closed_window(self, *, table): assert table.last_closed_window == 0.0 table.window = Mock(name="window") + self.mock_no_ranges(table) + table._data = { + ("boo", (1.1, 1.4)): "BOO", + ("moo", (1.4, 1.6)): "MOO", + ("faa", (1.9, 2.0)): "FAA", + ("bar", (4.1, 4.2)): "BAR", + } + table._partition_timestamps = { + TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0], + } + table._partition_timestamp_keys = { + (TP1, 2.0): [ + ("boo", (1.1, 1.4)), + ("moo", (1.4, 1.6)), + ("faa", (1.9, 2.0)), + ], + (TP1, 5.0): [ + ("bar", (4.1, 4.2)), + ], + } + + def get_stale(limit): + def is_stale(timestamp, latest_timestamp): + return timestamp < limit + + return is_stale + + table.window.stale.side_effect = get_stale(4.0) + + await table._del_old_keys() + + assert table.last_closed_window == 1.9 + + @pytest.mark.asyncio + async def test_last_closed_window__mock_ranges(self, *, table): + assert table.last_closed_window == 0.0 + + table.window = Mock(name="window") + self.mock_ranges(table) table._data = { ("boo", (1.1, 1.4)): "BOO", ("moo", (1.4, 1.6)): "MOO", @@ -233,6 +272,64 @@ async def test_del_old_keys(self, *, table): on_window_close = table._on_window_close = AsyncMock(name="on_window_close") table.window = Mock(name="window") + self.mock_no_ranges(table) + table._data = { + ("boo", (1.1, 1.4)): "BOO", + ("moo", (1.4, 1.6)): "MOO", + ("faa", (1.9, 2.0)): "FAA", + ("bar", (4.1, 4.2)): "BAR", + } + table._partition_timestamps = { + TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0], + } + table._partition_timestamp_keys = { + (TP1, 2.0): [ + ("boo", (1.1, 1.4)), + ("moo", (1.4, 1.6)), + ("faa", (1.9, 2.0)), + ], + (TP1, 5.0): [ + ("bar", (4.1, 4.2)), + ], + } + + def get_stale(limit): + def is_stale(timestamp, latest_timestamp): + return timestamp < limit + + return is_stale + + table.window.stale.side_effect = get_stale(4.0) + + await table._del_old_keys() + + assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0] + assert table.data == {("bar", (4.1, 4.2)): "BAR"} + + on_window_close.assert_has_calls( + [ + call.__bool__(), + call(("boo", (1.1, 1.4)), "BOO"), + call.__bool__(), + call(("moo", (1.4, 1.6)), "MOO"), + call.__bool__(), + call(("faa", (1.9, 2.0)), "FAA"), + ] + ) + + table.last_closed_window = 8.0 + table.window.stale.side_effect = get_stale(6.0) + + await table._del_old_keys() + + assert not table.data + + @pytest.mark.asyncio + async def test_del_old_keys__mock_ranges(self, *, table): + on_window_close = table._on_window_close = AsyncMock(name="on_window_close") + + table.window = Mock(name="window") + self.mock_ranges(table) table._data = { ("boo", (1.1, 1.4)): "BOO", ("moo", (1.4, 1.6)): "MOO", @@ -289,6 +386,61 @@ async def test_del_old_keys_non_async_cb(self, *, table): on_window_close = table._on_window_close = Mock(name="on_window_close") table.window = Mock(name="window") + self.mock_no_ranges(table) + table._data = { + ("boo", (1.1, 1.4)): "BOO", + ("moo", (1.4, 1.6)): "MOO", + ("faa", (1.9, 2.0)): "FAA", + ("bar", (4.1, 4.2)): "BAR", + } + table._partition_timestamps = { + TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0], + } + table._partition_timestamp_keys = { + (TP1, 2.0): [ + ("boo", (1.1, 1.4)), + ("moo", (1.4, 1.6)), + ("faa", (1.9, 2.0)), + ], + (TP1, 5.0): [ + ("bar", (4.1, 4.2)), + ], + } + + def get_stale(limit): + def is_stale(timestamp, latest_timestamp): + return timestamp < limit + + return is_stale + + table.window.stale.side_effect = get_stale(4.0) + + await table._del_old_keys() + + assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0] + assert table.data == {("bar", (4.1, 4.2)): "BAR"} + + on_window_close.assert_has_calls( + [ + call(("boo", (1.1, 1.4)), "BOO"), + call(("moo", (1.4, 1.6)), "MOO"), + call(("faa", (1.9, 2.0)), "FAA"), + ] + ) + + table.last_closed_window = 8.0 + table.window.stale.side_effect = get_stale(6.0) + + await table._del_old_keys() + + assert not table.data + + @pytest.mark.asyncio + async def test_del_old_keys_non_async_cb__mock_ranges(self, *, table): + on_window_close = table._on_window_close = Mock(name="on_window_close") + + table.window = Mock(name="window") + self.mock_ranges(table) table._data = { ("boo", (1.1, 1.4)): "BOO", ("moo", (1.4, 1.6)): "MOO", @@ -527,6 +679,11 @@ def mock_ranges(self, table, ranges=[1.1, 1.2, 1.3]): # noqa table._window_ranges.return_value = ranges return ranges + def mock_no_ranges(self, table, ranges=[]): # noqa + table._window_ranges = Mock(name="_window_ranges") + table._window_ranges.return_value = ranges + return ranges + def test_relative_now(self, *, table): event = Mock(name="event", autospec=Event) table._partition_latest_timestamp[event.message.partition] = 30.3