Enhance conversation service and data sanitizer
- Update Redis connection to use configurable database number.
- Add support for Job and CronJob resources in data sanitizer.
- Extend unit tests to cover sanitization of Job and CronJob resources, PV, and PVCs.
- Improve the function docs.
muralov committed Jan 20, 2025
1 parent a4a0eff commit 6e39be5
Showing 4 changed files with 156 additions and 4 deletions.
21 changes: 20 additions & 1 deletion src/agents/memory/async_redis_checkpointer.py
@@ -31,6 +31,10 @@
def _make_redis_checkpoint_key(
thread_id: str, checkpoint_ns: str, checkpoint_id: str
) -> str:
"""Create a Redis key for storing checkpoint data.
Returns a Redis key string in the format "checkpoint$thread_id$namespace$checkpoint_id".
"""
return REDIS_KEY_SEPARATOR.join(
["checkpoint", thread_id, checkpoint_ns, checkpoint_id]
)
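The new docstring pins down the key layout. As a minimal round-trip sketch, assuming REDIS_KEY_SEPARATOR is "$" (which matches the documented "checkpoint$thread_id$namespace$checkpoint_id" format):

REDIS_KEY_SEPARATOR = "$"  # assumed value, per the documented key format

key = REDIS_KEY_SEPARATOR.join(["checkpoint", "thread-42", "", "1ef4f797"])
# key == "checkpoint$thread-42$$1ef4f797"

# Splitting on the same separator recovers the four components, which is what
# _parse_redis_checkpoint_key below relies on.
prefix, thread_id, checkpoint_ns, checkpoint_id = key.split(REDIS_KEY_SEPARATOR)
# prefix == "checkpoint", thread_id == "thread-42",
# checkpoint_ns == "", checkpoint_id == "1ef4f797"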
@@ -43,6 +47,10 @@ def _make_redis_checkpoint_writes_key(
task_id: str,
idx: int | None,
) -> str:
"""Create a Redis key for storing checkpoint writes data.
Returns a Redis key string in the format "writes$thread_id$namespace$checkpoint_id$task_id$idx".
"""
if idx is None:
return REDIS_KEY_SEPARATOR.join(
["writes", thread_id, checkpoint_ns, checkpoint_id, task_id]
@@ -54,6 +62,10 @@ def _make_redis_checkpoint_writes_key(


def _parse_redis_checkpoint_key(redis_key: str) -> dict:
"""Parse a Redis checkpoint key.
Returns a dictionary containing the parsed checkpoint data.
"""
namespace, thread_id, checkpoint_ns, checkpoint_id = redis_key.split(
REDIS_KEY_SEPARATOR
)
@@ -68,6 +80,10 @@ def _parse_redis_checkpoint_key(redis_key: str) -> dict:


def _parse_redis_checkpoint_writes_key(redis_key: str) -> dict:
"""Parse a Redis checkpoint writes key.
Returns a dictionary containing the parsed checkpoint writes data.
"""
namespace, thread_id, checkpoint_ns, checkpoint_id, task_id, idx = redis_key.split(
REDIS_KEY_SEPARATOR
)
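Parsing is the inverse: split on the separator and unpack in the same order as above. For example (only the unpacking is shown, since the dict this function returns is truncated in this diff):

redis_key = "writes$t1$$c1$task-a$3"  # assumes "$" as the separator
prefix, thread_id, checkpoint_ns, checkpoint_id, task_id, idx = redis_key.split("$")
# prefix == "writes", thread_id == "t1", checkpoint_ns == "",
# checkpoint_id == "c1", task_id == "task-a", idx == "3"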
@@ -91,7 +107,10 @@ def _safe_decode(key: str | bytes) -> str:
def _filter_keys(
keys: list[str | bytes], before: RunnableConfig | None, limit: int | None
) -> list[str | bytes]:
"""Filter and sort Redis keys based on optional criteria."""
"""
Filter and sort Redis keys based on optional criteria.
Returns list of filtered and sorted Redis keys.
"""
if before:
keys = [
k
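The body of _filter_keys is truncated here. As a rough sketch of the behaviour the docstring describes — keep only checkpoints created before the given reference, sort them, and cap the result at limit — assuming string keys and that the checkpoint id is the fourth "$"-separated segment; this is an assumption, not the actual implementation:

def filter_keys_sketch(keys, before_checkpoint_id=None, limit=None):
    # Assumed behaviour: drop keys whose checkpoint_id is not older than the
    # reference, sort newest first, and truncate to `limit` if one is given.
    def checkpoint_id(key):
        return key.split("$")[3]

    if before_checkpoint_id is not None:
        keys = [k for k in keys if checkpoint_id(k) < before_checkpoint_id]
    keys = sorted(keys, key=checkpoint_id, reverse=True)
    return keys[:limit] if limit is not None else keys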
5 changes: 2 additions & 3 deletions src/services/conversation.py
@@ -16,7 +16,7 @@
from utils.config import Config
from utils.logging import get_logger
from utils.models.factory import IModel, IModelFactory, ModelFactory, ModelType
from utils.settings import REDIS_HOST, REDIS_PORT
from utils.settings import REDIS_DB_NUMBER, REDIS_HOST, REDIS_PORT
from utils.singleton_meta import SingletonMeta

logger = get_logger(__name__)
@@ -78,9 +78,8 @@ def __init__(
)

# Set up the Kyma Graph which allows access to stored conversation histories.
# redis_saver = RedisSaver(async_connection=initialize_async_pool(url=REDIS_URL))
checkpointer = AsyncRedisSaver.from_conn_info(
host=REDIS_HOST, port=REDIS_PORT, db=0
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB_NUMBER
)
self._companion_graph = CompanionGraph(
models,
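The db argument now comes from configuration instead of being hard-coded to 0. The settings module itself is not part of this diff; a plausible definition, assuming the values are read from environment variables with the defaults shown, could look like this:

# utils/settings.py — sketch only; env var names and defaults are assumptions
import os

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB_NUMBER = int(os.getenv("REDIS_DB_NUMBER", "0"))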
4 changes: 4 additions & 0 deletions src/services/data_sanitizer.py
@@ -15,6 +15,10 @@
"StatefulSetList",
"DaemonSet",
"DaemonSetList",
"Job",
"JobList",
"CronJob",
"CronJobList",
]

# List of sensitive environment variable names to remove
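Adding Job, JobList, CronJob, and CronJobList to this list makes the sanitizer treat them like the other workload kinds, i.e. their container env values get redacted (as the new tests below verify). Roughly, with the list's variable name assumed since it sits outside this hunk:

# Sketch of how such a kind list is typically consulted; the constant name
# and the helper are assumptions, not the module's actual API.
SANITIZED_KINDS = ["Deployment", "Pod", "Job", "JobList", "CronJob", "CronJobList"]

def needs_env_sanitization(resource: dict) -> bool:
    # Workload kinds in the list get their container env values replaced
    # with a redaction marker.
    return resource.get("kind") in SANITIZED_KINDS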
130 changes: 130 additions & 0 deletions tests/unit/services/test_data_sanitizer.py
@@ -709,6 +709,136 @@ def test_data_structures_and_pii(self, test_data, expected_results, error):
},
},
),
# job resource with env variables
(
{
"kind": "Job",
"metadata": {"name": "my-job"},
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "app",
"env": [
{"name": "SECRET_KEY", "value": "secret"}
],
},
]
}
}
},
},
{
"kind": "Job",
"metadata": {"name": "my-job"},
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "app",
"env": [
{
"name": "SECRET_KEY",
"value": REDACTED_VALUE,
}
],
}
]
}
}
},
},
),
# cronjob resource with env variables
(
{
"kind": "CronJob",
"metadata": {"name": "my-cronjob"},
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "app",
"env": [
{"name": "SECRET_KEY", "value": "secret"}
],
}
]
},
}
},
},
{
"kind": "CronJob",
"metadata": {"name": "my-cronjob"},
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "app",
"env": [
{
"name": "SECRET_KEY",
"value": REDACTED_VALUE,
}
],
}
]
},
}
},
},
),
# test PV with labels with sensitive data
(
{
"kind": "PersistentVolume",
"metadata": {"name": "my-pv", "labels": {"key": "value"}},
"spec": {
"accessModes": ["ReadWriteOnce"],
"capacity": {"storage": "10Gi"},
"persistentVolumeReclaimPolicy": "Retain",
},
},
{
"kind": "PersistentVolume",
"metadata": {
"name": "my-pv",
"labels": {"key": REDACTED_VALUE},
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"capacity": {"storage": "10Gi"},
"persistentVolumeReclaimPolicy": "Retain",
},
},
),
# test PVC with labels with sensitive data
(
{
"kind": "PersistentVolumeClaim",
"metadata": {"name": "my-pvc", "labels": {"key": "value"}},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": "10Gi"}},
},
},
{
"kind": "PersistentVolumeClaim",
"metadata": {
"name": "my-pvc",
"labels": {"key": REDACTED_VALUE},
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": "10Gi"}},
},
},
),
],
)
def test_kubernetes_resources(self, test_data, expected_results):
