diff --git a/poetry.lock b/poetry.lock index bb27e664..389be3ce 100644 --- a/poetry.lock +++ b/poetry.lock @@ -748,13 +748,13 @@ files = [ [[package]] name = "deepeval" -version = "2.1.9" +version = "2.2.1" description = "The Open-Source LLM Evaluation Framework." optional = false python-versions = "<3.13,>=3.9" files = [ - {file = "deepeval-2.1.9-py3-none-any.whl", hash = "sha256:c225f8ab6ab910de50026dfd46e2ea38541b3697b189831482a6f02162ead536"}, - {file = "deepeval-2.1.9.tar.gz", hash = "sha256:b6c9e90fd0ab639c5b0af5023f2e3fd20ce1906b05d7dc9bfc0bd2f46d0545e0"}, + {file = "deepeval-2.2.1-py3-none-any.whl", hash = "sha256:e536a0b19f11158f4fa48f091de707ff321ad954472b4d17cf2a9607f49f83a9"}, + {file = "deepeval-2.2.1.tar.gz", hash = "sha256:dffde872bc514952e52d4a9ee0005d4be10d414cd5f443b992e908c0bc6307fa"}, ] [package.dependencies] @@ -958,13 +958,13 @@ testing = ["hatch", "pre-commit", "pytest", "tox"] [[package]] name = "executing" -version = "2.1.0" +version = "2.2.0" description = "Get the currently executing AST node of a frame, and other information" optional = false python-versions = ">=3.8" files = [ - {file = "executing-2.1.0-py2.py3-none-any.whl", hash = "sha256:8d63781349375b5ebccc3142f4b30350c0cd9c79f921cde38be2be4637e98eaf"}, - {file = "executing-2.1.0.tar.gz", hash = "sha256:8ea27ddd260da8150fa5a708269c4a10e76161e2496ec3e587da9e3c0fe4b9ab"}, + {file = "executing-2.2.0-py2.py3-none-any.whl", hash = "sha256:11387150cad388d62750327a53d3339fad4888b39a6fe233c3afbb54ecffd3aa"}, + {file = "executing-2.2.0.tar.gz", hash = "sha256:5d108c028108fe2551d1a7b2e8b713341e2cb4fc0aa7dcf966fa4327a5226755"}, ] [package.extras] @@ -2366,13 +2366,13 @@ langchain-core = ">=0.3.29,<0.4.0" [[package]] name = "langfuse" -version = "2.57.10" +version = "2.57.12" description = "A client library for accessing langfuse" optional = false python-versions = "<4.0,>=3.9" files = [ - {file = "langfuse-2.57.10-py3-none-any.whl", hash = 
"sha256:db7e8f7cf8d0204e17074e6729b144e7f9c7198499cd84a824bbc81fb5e37e4a"}, - {file = "langfuse-2.57.10.tar.gz", hash = "sha256:751dd03271809f4bf50f6e4e0d1138b0e0eb028efefc984fdc6948d2bfddd95d"}, + {file = "langfuse-2.57.12-py3-none-any.whl", hash = "sha256:11f7b0a002ef08c1de129384c866a389aa7997d6620c5a7282678ea769b93857"}, + {file = "langfuse-2.57.12.tar.gz", hash = "sha256:e74e7c7ef790475d222a9ee6e5163524495a899817db18736df2ab14bc72615f"}, ] [package.dependencies] @@ -2963,13 +2963,13 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] [[package]] name = "openai" -version = "1.59.8" +version = "1.60.0" description = "The official Python library for the openai API" optional = false python-versions = ">=3.8" files = [ - {file = "openai-1.59.8-py3-none-any.whl", hash = "sha256:a8b8ee35c4083b88e6da45406d883cf6bd91a98ab7dd79178b8bc24c8bfb09d9"}, - {file = "openai-1.59.8.tar.gz", hash = "sha256:ac4bda5fa9819fdc6127e8ea8a63501f425c587244bc653c7c11a8ad84f953e1"}, + {file = "openai-1.60.0-py3-none-any.whl", hash = "sha256:df06c43be8018274980ac363da07d4b417bd835ead1c66e14396f6f15a0d5dda"}, + {file = "openai-1.60.0.tar.gz", hash = "sha256:7fa536cd4b644718645b874d2706e36dbbef38b327e42ca0623275da347ee1a9"}, ] [package.dependencies] @@ -5810,4 +5810,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "~3.12" -content-hash = "9e2205ccfcf513cc6a7adef24999a4985171c0157f14e73e17333b81ebca3d9c" +content-hash = "3ca1f7bf6fffafa7e781d0a2d4928c171332ef23b39f765dd364db4250faf653" diff --git a/pyproject.toml b/pyproject.toml index a2143c0b..46fd0dca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ scrubadub = {extras = ["all"], version = "^2.0.1"} tiktoken = "^0.7.0" [tool.poetry.group.test.dependencies] -deepeval = "^2.1.2" +deepeval = "^2.2.0" fakeredis = "^2.23.3" prettytable = "^3.10.2" pytest = "^8.2.2" diff --git a/src/agents/memory/async_redis_checkpointer.py b/src/agents/memory/async_redis_checkpointer.py index 
2a65c670..81de0c7c 100644 --- a/src/agents/memory/async_redis_checkpointer.py +++ b/src/agents/memory/async_redis_checkpointer.py @@ -32,7 +32,6 @@ def _make_redis_checkpoint_key( thread_id: str, checkpoint_ns: str, checkpoint_id: str ) -> str: """Create a Redis key for storing checkpoint data. - Returns a Redis key string in the format "checkpoint$thread_id$namespace$checkpoint_id". """ return REDIS_KEY_SEPARATOR.join( @@ -271,6 +270,8 @@ async def aput_writes( """Store intermediate writes linked to a checkpoint asynchronously. This method saves intermediate writes associated with a checkpoint to the database. + Critical for fault tolerance: stores successful node outputs even if other nodes + in the same superstep fail, preventing unnecessary re-execution on retry. Args: config (RunnableConfig): Configuration of the related checkpoint. diff --git a/tests/unit/agents/memory/test_async_redis_checkpointer.py b/tests/unit/agents/memory/test_async_redis_checkpointer.py index fd1c9986..7eff3291 100644 --- a/tests/unit/agents/memory/test_async_redis_checkpointer.py +++ b/tests/unit/agents/memory/test_async_redis_checkpointer.py @@ -297,30 +297,185 @@ async def test_aload_pending_writes( class TestUtilityFunctions: - def test_make_redis_checkpoint_key(self): - key = _make_redis_checkpoint_key("thread1", "ns1", "chk1") - assert key == "checkpoint$thread1$ns1$chk1" + @pytest.mark.parametrize( + "thread_id, checkpoint_ns, checkpoint_id, expected_key", + [ + # Basic case + ("thread1", "ns1", "chk1", "checkpoint$thread1$ns1$chk1"), + # Special characters + ("thread-1", "ns/1", "chk.1", "checkpoint$thread-1$ns/1$chk.1"), + # Long identifiers + ( + "thread_very_long_123", + "namespace_very_long_456", + "checkpoint_very_long_789", + "checkpoint$thread_very_long_123$namespace_very_long_456$checkpoint_very_long_789", + ), + # Empty namespace (kept as an empty segment between separators) + ("thread1", "", "chk1", "checkpoint$thread1$$chk1"), + # Empty thread_id + ("", "ns1", "chk1", 
"checkpoint$$ns1$chk1"), + # Empty checkpoint_id + ("thread1", "ns1", "", "checkpoint$thread1$ns1$"), + ], + ) + def test_make_redis_checkpoint_key( + self, + thread_id: str, + checkpoint_ns: str, + checkpoint_id: str, + expected_key: str, + ) -> None: + """Test the _make_redis_checkpoint_key function with various inputs.""" - def test_make_redis_checkpoint_writes_key(self): - key = _make_redis_checkpoint_writes_key("thread1", "ns1", "chk1", "task1", 0) - assert key == "writes$thread1$ns1$chk1$task1$0" + key = _make_redis_checkpoint_key(thread_id, checkpoint_ns, checkpoint_id) + assert key == expected_key - key_no_idx = _make_redis_checkpoint_writes_key( - "thread1", "ns1", "chk1", "task1", None + @pytest.mark.parametrize( + "thread_id, checkpoint_ns, checkpoint_id, task_id, idx, expected_key", + [ + # Basic case with index + ("thread1", "ns1", "chk1", "task1", 0, "writes$thread1$ns1$chk1$task1$0"), + # Case without index (None) + ("thread1", "ns1", "chk1", "task1", None, "writes$thread1$ns1$chk1$task1"), + # Special characters in identifiers + ( + "thread-1", + "ns/1", + "chk.1", + "task:1", + 0, + "writes$thread-1$ns/1$chk.1$task:1$0", + ), + # Long identifiers + ( + "thread_very_long_123", + "namespace_very_long_456", + "checkpoint_very_long_789", + "task_very_long_012", + 1, + "writes$thread_very_long_123$namespace_very_long_456$checkpoint_very_long_789$task_very_long_012$1", + ), + # Empty namespace + ("thread1", "", "chk1", "task1", 0, "writes$thread1$$chk1$task1$0"), + # Empty thread_id + ("", "ns1", "chk1", "task1", 0, "writes$$ns1$chk1$task1$0"), + # Negative index + ("thread1", "ns1", "chk1", "task1", -1, "writes$thread1$ns1$chk1$task1$-1"), + # Large index + ( + "thread1", + "ns1", + "chk1", + "task1", + 999999, + "writes$thread1$ns1$chk1$task1$999999", + ), + ], + ) + def test_make_redis_checkpoint_writes_key( + self, + thread_id: str, + checkpoint_ns: str, + checkpoint_id: str, + task_id: str, + idx: int | None, + expected_key: str, + ) -> None: + 
"""Test the _make_redis_checkpoint_writes_key function with various inputs.""" + key = _make_redis_checkpoint_writes_key( + thread_id, checkpoint_ns, checkpoint_id, task_id, idx ) - assert key_no_idx == "writes$thread1$ns1$chk1$task1" + assert key == expected_key - def test_parse_redis_checkpoint_key(self): - key = "checkpoint$thread1$ns1$chk1" - result = _parse_redis_checkpoint_key(key) - assert result == { - "thread_id": "thread1", - "checkpoint_ns": "ns1", - "checkpoint_id": "chk1", - } + @pytest.mark.parametrize( + "key, expected_result, should_raise", + [ + # Valid cases + ( + "checkpoint$thread1$ns1$chk1", + { + "thread_id": "thread1", + "checkpoint_ns": "ns1", + "checkpoint_id": "chk1", + }, + False, + ), + # Special characters in identifiers + ( + "checkpoint$thread-1$ns/1$chk.1", + { + "thread_id": "thread-1", + "checkpoint_ns": "ns/1", + "checkpoint_id": "chk.1", + }, + False, + ), + # Empty namespace + ( + "checkpoint$thread1$$chk1", + { + "thread_id": "thread1", + "checkpoint_ns": "", + "checkpoint_id": "chk1", + }, + False, + ), + # Long identifiers + ( + "checkpoint$thread_very_long_123$namespace_very_long_456$checkpoint_very_long_789", + { + "thread_id": "thread_very_long_123", + "checkpoint_ns": "namespace_very_long_456", + "checkpoint_id": "checkpoint_very_long_789", + }, + False, + ), + # UUID-like identifiers + ( + "checkpoint$550e8400-e29b-41d4-a716-446655440000$ns1$chk1", + { + "thread_id": "550e8400-e29b-41d4-a716-446655440000", + "checkpoint_ns": "ns1", + "checkpoint_id": "chk1", + }, + False, + ), + # Invalid cases - wrong prefix + ("invalid$thread1$ns1$chk1", None, True), + # Invalid cases - wrong number of segments + ("checkpoint$thread1$ns1", None, True), + ("checkpoint$thread1$ns1$chk1$extra", None, True), + # Invalid cases - empty segments + ( + "checkpoint$$$", + { + "thread_id": "", + "checkpoint_ns": "", + "checkpoint_id": "", + }, + False, + ), + # Invalid cases - completely wrong format + ("invalid_key", None, True), + # Invalid 
cases - missing separator + ("checkpointthread1ns1chk1", None, True), + ], + ) + def test_parse_redis_checkpoint_key(self, key, expected_result, should_raise): + """Test _parse_redis_checkpoint_key with various inputs using table-driven tests. - with pytest.raises(ValueError): - _parse_redis_checkpoint_key("invalid$key") + Args: + key: Input Redis key to parse + expected_result: Expected dictionary output for valid keys + should_raise: Whether ValueError should be raised + """ + if should_raise: + with pytest.raises(ValueError): + _parse_redis_checkpoint_key(key) + else: + result = _parse_redis_checkpoint_key(key) + assert result == expected_result def test_parse_redis_checkpoint_writes_key(self): key = "writes$thread1$ns1$chk1$task1$0"