Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: ravi_kumar_pilla <[email protected]>
  • Loading branch information
ravi-kumar-pilla committed Feb 7, 2025
1 parent 6f4fcc3 commit cdc2d7a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
42 changes: 42 additions & 0 deletions package/tests/test_data_access/test_managers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from typing import Dict

import networkx as nx
Expand All @@ -11,9 +12,18 @@
from kedro_viz.constants import DEFAULT_REGISTERED_PIPELINE_ID, ROOT_MODULAR_PIPELINE_ID
from kedro_viz.data_access.managers import DataAccessManager
from kedro_viz.data_access.repositories.catalog import CatalogRepository
from kedro_viz.data_access.repositories.graph import GraphNodesRepository
from kedro_viz.data_access.repositories.modular_pipelines import (
ModularPipelinesRepository,
)
from kedro_viz.data_access.repositories.registered_pipelines import (
RegisteredPipelinesRepository,
)
from kedro_viz.data_access.repositories.runs import RunsRepository
from kedro_viz.data_access.repositories.tags import TagsRepository
from kedro_viz.data_access.repositories.tracking_datasets import (
TrackingDatasetsRepository,
)
from kedro_viz.integrations.utils import UnavailableDataset
from kedro_viz.models.flowchart.edge import GraphEdge
from kedro_viz.models.flowchart.named_entities import Tag
Expand All @@ -29,6 +39,38 @@ def identity(x):
return x


class TestDataAccessManager:
def test_manager_initialize_fields(self, data_access_manager: DataAccessManager):
"""Test that all instance variables are correctly initialized."""
assert isinstance(data_access_manager.catalog, CatalogRepository)
assert isinstance(data_access_manager.nodes, GraphNodesRepository)
assert isinstance(
data_access_manager.registered_pipelines, RegisteredPipelinesRepository
)
assert isinstance(data_access_manager.tags, TagsRepository)
assert isinstance(data_access_manager.modular_pipelines, defaultdict)
assert isinstance(data_access_manager.edges, defaultdict)
assert isinstance(data_access_manager.node_dependencies, defaultdict)
assert isinstance(data_access_manager.runs, RunsRepository)
assert isinstance(
data_access_manager.tracking_datasets, TrackingDatasetsRepository
)
assert data_access_manager.dataset_stats == {}

def test_manager_reset_fields(self, data_access_manager: DataAccessManager):
"""Test that reset_fields correctly reinitializes the instance variables."""
# Modify fields to non-default values
data_access_manager.catalog = None
data_access_manager.dataset_stats = {"test_key": "test_value"}

data_access_manager.reset_fields()

# Assert fields are reset to default
assert isinstance(data_access_manager.catalog, CatalogRepository)
assert isinstance(data_access_manager.dataset_stats, dict)
assert data_access_manager.dataset_stats == {}


class TestAddCatalog:
def test_add_catalog(
self,
Expand Down
33 changes: 32 additions & 1 deletion package/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import pytest
from pydantic import BaseModel

from kedro_viz.server import run_server
from kedro_viz.server import load_and_populate_data_for_notebook_users, run_server
from kedro_viz.utils import NotebookUser


class ExampleAPIResponse(BaseModel):
Expand Down Expand Up @@ -141,3 +142,33 @@ def test_save_file(self, tmp_path, mocker):
save_api_responses_to_fs_mock.assert_called_once_with(
save_file, mock_filesystem.return_value, True
)


class TestLoadAndPopulateData:
def test_load_and_populate_data_for_notebook_users(
self,
example_pipelines,
example_catalog,
mocker,
):
"""Test that data is loaded and populated correctly for notebook users."""
mock_load_data = mocker.patch(
"kedro_viz.server.kedro_data_loader.load_data_for_notebook_users",
return_value=(
example_catalog,
example_pipelines,
None,
{},
),
)
mock_populate_data = mocker.patch("kedro_viz.server.populate_data")
mock_reset_fields = mocker.patch(
"kedro_viz.server.data_access_manager.reset_fields"
)

notebook_user = NotebookUser(pipeline=example_pipelines)
load_and_populate_data_for_notebook_users(notebook_user)

mock_load_data.assert_called_once_with(notebook_user)
mock_reset_fields.assert_called_once()
mock_populate_data.assert_called_once()
42 changes: 42 additions & 0 deletions package/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from kedro_viz.utils import merge_dicts


class TestUtils:
def test_merge_dicts_flat(self):
"""Test merging flat dictionaries."""
dict_one = {"a": 1, "b": 2}
dict_two = {"b": 3, "c": 4}
expected = {"a": 1, "b": 3, "c": 4}

result = merge_dicts(dict_one, dict_two)
assert result == expected

def test_merge_dicts_nested(self):
"""Test merging nested dictionaries."""
dict_one = {"a": {"x": 1}, "b": 2}
dict_two = {"a": {"y": 2}, "c": 3}
expected = {"a": {"x": 1, "y": 2}, "b": 2, "c": 3}

result = merge_dicts(dict_one, dict_two)
assert result == expected

def test_merge_dicts_overwrite(self):
"""Test merging with overwriting nested keys."""
dict_one = {"a": {"x": 1, "y": 2}}
dict_two = {"a": {"x": 3}}
expected = {"a": {"x": 3, "y": 2}}

result = merge_dicts(dict_one, dict_two)
assert result == expected

def test_merge_dicts_empty(self):
"""Test merging when one dictionary is empty."""
dict_one = {"a": 1}
dict_two = {}
expected = {"a": 1}

result = merge_dicts(dict_one, dict_two)
assert result == expected

result = merge_dicts(dict_two, dict_one)
assert result == expected

0 comments on commit cdc2d7a

Please sign in to comment.