From ee36ac6512dec8236baf51317c2334fc5a12b805 Mon Sep 17 00:00:00 2001 From: Eg0ra Date: Sat, 2 Nov 2024 10:10:27 +0700 Subject: [PATCH 1/2] Add test for widgets --- import_export_extensions/widgets.py | 6 ++-- test_project/tests/test_widgets.py | 50 +++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/import_export_extensions/widgets.py b/import_export_extensions/widgets.py index cf06f61..db15451 100644 --- a/import_export_extensions/widgets.py +++ b/import_export_extensions/widgets.py @@ -320,12 +320,12 @@ def clean( internal_url = utils.url_to_internal_value(urlparse(value).path) if not internal_url: - raise ValidationError("Invalid image path") + raise ValidationError("Invalid file path") try: if default_storage.exists(internal_url): return internal_url - except SuspiciousFileOperation: + except SuspiciousFileOperation: # pragma: no cover pass return self._get_file(value) @@ -350,4 +350,4 @@ def _get_default_storage(self) -> str: """ if hasattr(settings, "STORAGES"): return settings.STORAGES["default"]["BACKEND"] - return settings.DEFAULT_FILE_STORAGE + return settings.DEFAULT_FILE_STORAGE # pragma: no cover diff --git a/test_project/tests/test_widgets.py b/test_project/tests/test_widgets.py index c741810..80f9e4e 100644 --- a/test_project/tests/test_widgets.py +++ b/test_project/tests/test_widgets.py @@ -2,8 +2,10 @@ from django.core.files import File from django.core.files.storage import default_storage +from django.forms import ValidationError import pytest +import pytest_mock from import_export.exceptions import ImportExportError from pytest_mock import MockerFixture @@ -341,6 +343,31 @@ def test_render_empty_values(membership: Membership): assert expected_result == result +@pytest.mark.parametrize( + argnames="rem_field_lookup", + argvalues=[ + "regex", + "icontains", + ], +) +def test_intermediate_widget_filter_with_lookup(rem_field_lookup: str): + """Test widget filter rem model with lookup.""" + founded_band = BandFactory(title="In result band") + ignored_band = BandFactory(title="This band will be ignored") + + widget = IntermediateManyToManyWidget( + rem_model=Band, + rem_field="title", + rem_field_lookup=rem_field_lookup, + instance_separator=";", + ) + result = widget.filter_instances(founded_band.title) + + assert len(result) == 1 + assert founded_band in result + assert ignored_band not in result + + def test_file_widget_render_link(import_file): """Test FileWidget `render` method.""" widget = FileWidget( @@ -351,6 +378,22 @@ def test_file_widget_render_link(import_file): assert import_file.url in result +def test_file_widget_render_link_for_non_local_env( + import_file, + mocker: pytest_mock.MockerFixture, +): + """Test FileWidget `render` method.""" + widget = FileWidget(filename="test_widget") + mocker.patch.object( + widget, + "_get_default_storage", + return_value="non_local_storage", + ) + result = widget.render(import_file) + + assert import_file.url == result + + def test_file_widget_clean_url(): """Test FileWidget `clean` method.""" filename = default_storage.save("original", File(io.BytesIO(b"testvalue"))) @@ -360,6 +403,13 @@ def test_file_widget_clean_url(): assert cleaned_result == filename +def test_file_widget_clean_with_invalid_file_path(): + """Test that FileWidget.clean raise error with invalid file path.""" + widget = FileWidget(filename="imported_file") + with pytest.raises(ValidationError, match="Invalid file path"): + widget.clean("invalid_value") + + def test_file_widget_clean_non_existed_url(mocker: MockerFixture): """Check FileWidget `clean` method with downloading new file.""" response = mocker.MagicMock() From 721796128b363678f4e5d54726afa3ef0ccb3413 Mon Sep 17 00:00:00 2001 From: Eg0ra Date: Sat, 2 Nov 2024 10:22:00 +0700 Subject: [PATCH 2/2] Split admin tests, add annotations --- import_export_extensions/widgets.py | 3 +- .../test_admin/test_export.py | 462 ------------- .../test_admin/test_export/__init__.py | 0 .../test_export/test_admin_actions.py | 79 +++ .../test_export/test_admin_class.py | 105 +++ .../test_export/test_celery_endpoints.py | 77 +++ .../test_admin/test_export/test_export.py | 230 +++++++ .../test_admin/test_import.py | 626 ------------------ .../test_admin/test_import/__init__.py | 0 .../test_import/test_admin_actions.py | 170 +++++ .../test_import/test_admin_class.py | 91 +++ .../test_import/test_celery_endpoints.py | 113 ++++ .../test_admin/test_import/test_import.py | 297 +++++++++ test_project/tests/test_widgets.py | 8 +- 14 files changed, 1169 insertions(+), 1092 deletions(-) delete mode 100644 test_project/tests/integration_tests/test_admin/test_export.py create mode 100644 test_project/tests/integration_tests/test_admin/test_export/__init__.py create mode 100644 test_project/tests/integration_tests/test_admin/test_export/test_admin_actions.py create mode 100644 test_project/tests/integration_tests/test_admin/test_export/test_admin_class.py create mode 100644 test_project/tests/integration_tests/test_admin/test_export/test_celery_endpoints.py create mode 100644 test_project/tests/integration_tests/test_admin/test_export/test_export.py delete mode 100644 test_project/tests/integration_tests/test_admin/test_import.py create mode 100644 test_project/tests/integration_tests/test_admin/test_import/__init__.py create mode 100644 test_project/tests/integration_tests/test_admin/test_import/test_admin_actions.py create mode 100644 test_project/tests/integration_tests/test_admin/test_import/test_admin_class.py create mode 100644 test_project/tests/integration_tests/test_admin/test_import/test_celery_endpoints.py create mode 100644 test_project/tests/integration_tests/test_admin/test_import/test_import.py diff --git a/import_export_extensions/widgets.py b/import_export_extensions/widgets.py index db15451..88da777 100644 --- a/import_export_extensions/widgets.py +++ b/import_export_extensions/widgets.py @@ -7,6 +7,7 @@ from django.core.files import File from django.core.files.storage import default_storage from django.db.models import Model, Q, QuerySet +from django.db.models.fields.files import FieldFile from django.forms import ValidationError from django.utils.encoding import smart_str @@ -294,7 +295,7 @@ def __init__(self, filename: str): def render( self, - value: Model | None, + value: FieldFile | None, obj=None, **kwargs, ) -> str | None: diff --git a/test_project/tests/integration_tests/test_admin/test_export.py b/test_project/tests/integration_tests/test_admin/test_export.py deleted file mode 100644 index 0f9420d..0000000 --- a/test_project/tests/integration_tests/test_admin/test_export.py +++ /dev/null @@ -1,462 +0,0 @@ -"""There are kind a functional tests for export using Django Admin.""" - -from django.contrib.auth.models import User -from django.test.client import Client -from django.urls import reverse - -from rest_framework import status - -import pytest -import pytest_mock -from celery import states -from pytest_lazy_fixtures import lf - -from import_export_extensions.models import ExportJob -from test_project.fake_app.factories import ArtistExportJobFactory - - -@pytest.mark.usefixtures("existing_artist") -@pytest.mark.django_db(transaction=True) -def test_export_using_admin_model(client: Client, superuser: User): - """Test entire exporting process using Django Admin. - - There is following workflow: - 1. Go to Artist Export page - 2. Start Export Job with chosen file format (csv) - 3. Redirect to export job result page through the status page - - """ - client.force_login(superuser) - - # Make get request to admin export page - export_get_response = client.get( - path=reverse("admin:fake_app_artist_export"), - ) - assert export_get_response.status_code == status.HTTP_200_OK - - # Start export job using admin panel - start_export_response = client.post( - path=reverse("admin:fake_app_artist_export"), - data={ - "format": 0, - }, - ) - assert start_export_response.status_code == status.HTTP_302_FOUND - - # Go to redirected page after export is finished - status_response = client.get(start_export_response.url) - assert status_response.status_code == status.HTTP_302_FOUND - - result_response = client.get(status_response.url) - assert result_response.status_code == status.HTTP_200_OK - - assert ExportJob.objects.exists() - export_job = ExportJob.objects.first() - assert export_job.export_status == ExportJob.ExportStatus.EXPORTED - - -@pytest.mark.parametrize( - argnames=["view_name", "path_kwargs"], - argvalues=[ - pytest.param( - "admin:fake_app_artist_export", - None, - id="Test access to `celery_export_action`", - ), - pytest.param( - "admin:fake_app_artist_export_job_status", - {"job_id": lf("artist_export_job.pk")}, - id="Test access to `export_job_status_view`", - ), - pytest.param( - "admin:fake_app_artist_export_job_results", - {"job_id": lf("artist_export_job.pk")}, - id="Test access to `export_job_results_view`", - ), - ], -) -def test_export_using_admin_model_without_permissions( - client: Client, - superuser: User, - view_name: str, - path_kwargs: dict[str, str], - mocker: pytest_mock.MockerFixture, -): - """Test access to celery-export endpoints forbidden without permission.""" - client.force_login(superuser) - mocker.patch( - "test_project.fake_app.admin.ArtistAdmin.has_export_permission", - return_value=False, - ) - - response = client.get( - path=reverse( - view_name, - kwargs=path_kwargs, - ), - ) - assert response.status_code == status.HTTP_403_FORBIDDEN - - -def test_celery_export_status_view_during_export( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test export status page when export in progress.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") - artist_export_job = ArtistExportJobFactory() - artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING - artist_export_job.save() - - response = client.get( - path=reverse( - "admin:fake_app_artist_export_job_status", - kwargs={"job_id": artist_export_job.pk}, - ), - ) - - expected_export_job_url = reverse( - "admin:export_job_progress", - kwargs={"job_id": artist_export_job.id}, - ) - - assert response.status_code == status.HTTP_200_OK - assert response.context["export_job_url"] == expected_export_job_url - - -@pytest.mark.parametrize( - argnames="incorrect_job_status", - argvalues=[ - ExportJob.ExportStatus.CREATED, - ExportJob.ExportStatus.EXPORTING, - ExportJob.ExportStatus.CANCELLED, - ], -) -def test_celery_export_results_view_redirect_to_status_page( - client: Client, - superuser: User, - incorrect_job_status: ExportJob.ExportStatus, - mocker: pytest_mock.MockerFixture, -): - """Test redirect to export status page when job in not results statuses.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") - artist_export_job = ArtistExportJobFactory() - artist_export_job.export_status = incorrect_job_status - artist_export_job.save() - - response = client.get( - path=reverse( - "admin:fake_app_artist_export_job_results", - kwargs={"job_id": artist_export_job.pk}, - ), - ) - - expected_redirect_url = reverse( - "admin:fake_app_artist_export_job_status", - kwargs={"job_id": artist_export_job.pk}, - ) - assert response.status_code == status.HTTP_302_FOUND - assert response.url == expected_redirect_url - - -@pytest.mark.django_db(transaction=True) -def test_export_progress_during_export( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test export job admin progress page during export.""" - client.force_login(superuser) - - # Prepare data to imitate intermediate task state - fake_progress_info = { - "current": 2, - "total": 3, - } - mocker.patch( - "celery.result.AsyncResult.info", - new=fake_progress_info, - ) - expected_percent = int( - fake_progress_info["current"] / fake_progress_info["total"] * 100, - ) - - artist_export_job = ArtistExportJobFactory() - artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING - artist_export_job.save() - - response = client.post( - path=reverse( - "admin:export_job_progress", - kwargs={"job_id": artist_export_job.pk}, - ), - ) - assert response.status_code == status.HTTP_200_OK - json_data = response.json() - - assert json_data == { - "status": ExportJob.ExportStatus.EXPORTING.title(), - "state": "SUCCESS", - "percent": expected_percent, - **fake_progress_info, - } - - -@pytest.mark.django_db(transaction=True) -def test_export_progress_after_complete_export( - client: Client, - superuser: User, -): - """Test export job admin progress page after complete export.""" - client.force_login(superuser) - - artist_export_job = ArtistExportJobFactory() - artist_export_job.refresh_from_db() - - response = client.post( - path=reverse( - "admin:export_job_progress", - kwargs={"job_id": artist_export_job.pk}, - ), - ) - assert response.status_code == status.HTTP_200_OK - assert response.json() == { - "status": artist_export_job.export_status.title(), - } - - -@pytest.mark.django_db(transaction=True) -def test_export_progress_with_deleted_export_job( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test export job admin progress page with deleted export job.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") - artist_export_job = ArtistExportJobFactory() - job_id = artist_export_job.id - artist_export_job.delete() - - expected_error_message = "ExportJob matching query does not exist." - - response = client.post( - path=reverse( - "admin:export_job_progress", - kwargs={ - "job_id": job_id, - }, - ), - ) - - assert response.status_code == status.HTTP_404_NOT_FOUND - assert response.json()["validation_error"] == expected_error_message - - -@pytest.mark.django_db(transaction=True) -def test_export_progress_with_failed_celery_task( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test than after celery fail ExportJob will be in export error status.""" - client.force_login(superuser) - - expected_error_message = "Mocked Error Message" - mocker.patch( - "celery.result.AsyncResult.state", - new=states.FAILURE, - ) - mocker.patch( - "celery.result.AsyncResult.info", - new=ValueError(expected_error_message), - ) - artist_export_job = ArtistExportJobFactory() - artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING - artist_export_job.save() - - response = client.post( - path=reverse( - "admin:export_job_progress", - kwargs={ - "job_id": artist_export_job.id, - }, - ), - ) - - assert response.status_code == status.HTTP_200_OK - assert response.json()["state"] == states.FAILURE - artist_export_job.refresh_from_db() - assert ( - artist_export_job.export_status == ExportJob.ExportStatus.EXPORT_ERROR - ) - - -@pytest.mark.django_db(transaction=True) -def test_cancel_export_admin_action( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test `cancel_export` via admin action.""" - client.force_login(superuser) - - revoke_mock = mocker.patch("celery.current_app.control.revoke") - export_data_mock = mocker.patch( - "import_export_extensions.models.ExportJob.export_data", - ) - job: ExportJob = ArtistExportJobFactory() - - response = client.post( - reverse("admin:import_export_extensions_exportjob_changelist"), - data={ - "action": "cancel_jobs", - "_selected_action": [job.pk], - }, - follow=True, - ) - job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert job.export_status == ExportJob.ExportStatus.CANCELLED - assert ( - response.context["messages"]._loaded_data[0].message - == f"Export of {job} canceled" - ) - export_data_mock.assert_called_once() - revoke_mock.assert_called_once_with(job.export_task_id, terminate=True) - - -@pytest.mark.django_db(transaction=True) -def test_cancel_export_admin_action_with_incorrect_export_job_status( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test `cancel_export` via admin action with wrong export job status.""" - client.force_login(superuser) - - revoke_mock = mocker.patch("celery.current_app.control.revoke") - job: ExportJob = ArtistExportJobFactory() - - expected_error_message = f"ExportJob with id {job.pk} has incorrect status" - - response = client.post( - reverse("admin:import_export_extensions_exportjob_changelist"), - data={ - "action": "cancel_jobs", - "_selected_action": [job.pk], - }, - follow=True, - ) - job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert job.export_status == ExportJob.ExportStatus.EXPORTED - assert ( - expected_error_message - in response.context["messages"]._loaded_data[0].message - ) - revoke_mock.assert_not_called() - - -@pytest.mark.parametrize( - argnames=["job_status", "expected_fieldsets"], - argvalues=[ - pytest.param( - ExportJob.ExportStatus.CREATED, - ( - ( - "export_status", - "_model", - "created", - "export_started", - "export_finished", - ), - ), - id="Get fieldsets for job in status CREATED", - ), - pytest.param( - ExportJob.ExportStatus.EXPORTED, - ( - ( - "export_status", - "_model", - "created", - "export_started", - "export_finished", - ), - ("data_file",), - ), - id="Get fieldsets for job in status EXPORTED", - ), - pytest.param( - ExportJob.ExportStatus.EXPORTING, - ( - ( - "export_status", - "export_progressbar", - ), - ), - id="Get fieldsets for job in status EXPORTING", - ), - pytest.param( - ExportJob.ExportStatus.EXPORT_ERROR, - ( - ( - "export_status", - "_model", - "created", - "export_started", - "export_finished", - ), - ( - "error_message", - "traceback", - ), - ), - id="Get fieldsets for job in status EXPORT_ERROR", - ), - ], -) -def test_get_fieldsets_by_export_job_status( - client: Client, - superuser: User, - job_status: ExportJob.ExportStatus, - expected_fieldsets: tuple[tuple[str]], - mocker: pytest_mock.MockerFixture, -): - """Test that appropriate fieldsets returned for different job statuses.""" - client.force_login(superuser) - - mocker.patch( - "import_export_extensions.models.ExportJob.export_data", - ) - job: ExportJob = ArtistExportJobFactory() - job.export_status = job_status - job.save() - - response = client.get( - reverse( - "admin:import_export_extensions_exportjob_change", - kwargs={"object_id": job.pk}, - ), - ) - - fieldsets = response.context["adminform"].fieldsets - fields = [fields["fields"] for _, fields in fieldsets] - - assert tuple(fields) == ( - *expected_fieldsets, - ( - "resource_path", - "resource_kwargs", - "file_format_path", - ), - ) diff --git a/test_project/tests/integration_tests/test_admin/test_export/__init__.py b/test_project/tests/integration_tests/test_admin/test_export/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_project/tests/integration_tests/test_admin/test_export/test_admin_actions.py b/test_project/tests/integration_tests/test_admin/test_export/test_admin_actions.py new file mode 100644 index 0000000..ad8259f --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_export/test_admin_actions.py @@ -0,0 +1,79 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock + +from import_export_extensions.models import ExportJob +from test_project.fake_app.factories import ArtistExportJobFactory + + +@pytest.mark.django_db(transaction=True) +def test_cancel_export_admin_action( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test `cancel_export` via admin action.""" + client.force_login(superuser) + + revoke_mock = mocker.patch("celery.current_app.control.revoke") + export_data_mock = mocker.patch( + "import_export_extensions.models.ExportJob.export_data", + ) + job: ExportJob = ArtistExportJobFactory() + + response = client.post( + reverse("admin:import_export_extensions_exportjob_changelist"), + data={ + "action": "cancel_jobs", + "_selected_action": [job.pk], + }, + follow=True, + ) + job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert job.export_status == ExportJob.ExportStatus.CANCELLED + assert ( + response.context["messages"]._loaded_data[0].message + == f"Export of {job} canceled" + ) + export_data_mock.assert_called_once() + revoke_mock.assert_called_once_with(job.export_task_id, terminate=True) + + +@pytest.mark.django_db(transaction=True) +def test_cancel_export_admin_action_with_incorrect_export_job_status( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test `cancel_export` via admin action with wrong export job status.""" + client.force_login(superuser) + + revoke_mock = mocker.patch("celery.current_app.control.revoke") + job: ExportJob = ArtistExportJobFactory() + + expected_error_message = f"ExportJob with id {job.pk} has incorrect status" + + response = client.post( + reverse("admin:import_export_extensions_exportjob_changelist"), + data={ + "action": "cancel_jobs", + "_selected_action": [job.pk], + }, + follow=True, + ) + job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert job.export_status == ExportJob.ExportStatus.EXPORTED + assert ( + expected_error_message + in response.context["messages"]._loaded_data[0].message + ) + revoke_mock.assert_not_called() diff --git a/test_project/tests/integration_tests/test_admin/test_export/test_admin_class.py b/test_project/tests/integration_tests/test_admin/test_export/test_admin_class.py new file mode 100644 index 0000000..524388b --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_export/test_admin_class.py @@ -0,0 +1,105 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +import pytest +import pytest_mock + +from import_export_extensions.models import ExportJob +from test_project.fake_app.factories import ArtistExportJobFactory + + +@pytest.mark.parametrize( + argnames=["job_status", "expected_fieldsets"], + argvalues=[ + pytest.param( + ExportJob.ExportStatus.CREATED, + ( + ( + "export_status", + "_model", + "created", + "export_started", + "export_finished", + ), + ), + id="Get fieldsets for job in status CREATED", + ), + pytest.param( + ExportJob.ExportStatus.EXPORTED, + ( + ( + "export_status", + "_model", + "created", + "export_started", + "export_finished", + ), + ("data_file",), + ), + id="Get fieldsets for job in status EXPORTED", + ), + pytest.param( + ExportJob.ExportStatus.EXPORTING, + ( + ( + "export_status", + "export_progressbar", + ), + ), + id="Get fieldsets for job in status EXPORTING", + ), + pytest.param( + ExportJob.ExportStatus.EXPORT_ERROR, + ( + ( + "export_status", + "_model", + "created", + "export_started", + "export_finished", + ), + ( + "error_message", + "traceback", + ), + ), + id="Get fieldsets for job in status EXPORT_ERROR", + ), + ], +) +def test_get_fieldsets_by_export_job_status( + client: Client, + superuser: User, + job_status: ExportJob.ExportStatus, + expected_fieldsets: tuple[tuple[str]], + mocker: pytest_mock.MockerFixture, +): + """Test that appropriate fieldsets returned for different job statuses.""" + client.force_login(superuser) + + mocker.patch( + "import_export_extensions.models.ExportJob.export_data", + ) + job: ExportJob = ArtistExportJobFactory() + job.export_status = job_status + job.save() + + response = client.get( + reverse( + "admin:import_export_extensions_exportjob_change", + kwargs={"object_id": job.pk}, + ), + ) + + fieldsets = response.context["adminform"].fieldsets + fields = [fields["fields"] for _, fields in fieldsets] + + assert tuple(fields) == ( + *expected_fieldsets, + ( + "resource_path", + "resource_kwargs", + "file_format_path", + ), + ) diff --git a/test_project/tests/integration_tests/test_admin/test_export/test_celery_endpoints.py b/test_project/tests/integration_tests/test_admin/test_export/test_celery_endpoints.py new file mode 100644 index 0000000..ea5cca1 --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_export/test_celery_endpoints.py @@ -0,0 +1,77 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock + +from import_export_extensions.models import ExportJob +from test_project.fake_app.factories import ArtistExportJobFactory + + +def test_celery_export_status_view_during_export( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test export status page when export in progress.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING + artist_export_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_export_job_status", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + + expected_export_job_url = reverse( + "admin:export_job_progress", + kwargs={"job_id": artist_export_job.id}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.context["export_job_url"] == expected_export_job_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ExportJob.ExportStatus.CREATED, + ExportJob.ExportStatus.EXPORTING, + ExportJob.ExportStatus.CANCELLED, + ], +) +def test_celery_export_results_view_redirect_to_status_page( + client: Client, + superuser: User, + incorrect_job_status: ExportJob.ExportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test redirect to export status page when job in not results statuses.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = incorrect_job_status + artist_export_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_export_job_results", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + + expected_redirect_url = reverse( + "admin:fake_app_artist_export_job_status", + kwargs={"job_id": artist_export_job.pk}, + ) + assert response.status_code == status.HTTP_302_FOUND + assert response.url == expected_redirect_url diff --git a/test_project/tests/integration_tests/test_admin/test_export/test_export.py b/test_project/tests/integration_tests/test_admin/test_export/test_export.py new file mode 100644 index 0000000..83420fd --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_export/test_export.py @@ -0,0 +1,230 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock +from celery import states +from pytest_lazy_fixtures import lf + +from import_export_extensions.models import ExportJob +from test_project.fake_app.factories import ArtistExportJobFactory + + +@pytest.mark.usefixtures("existing_artist") +@pytest.mark.django_db(transaction=True) +def test_export_using_admin_model(client: Client, superuser: User): + """Test entire exporting process using Django Admin. + + There is following workflow: + 1. Go to Artist Export page + 2. Start Export Job with chosen file format (csv) + 3. Redirect to export job result page through the status page + + """ + client.force_login(superuser) + + # Make get request to admin export page + export_get_response = client.get( + path=reverse("admin:fake_app_artist_export"), + ) + assert export_get_response.status_code == status.HTTP_200_OK + + # Start export job using admin panel + start_export_response = client.post( + path=reverse("admin:fake_app_artist_export"), + data={ + "format": 0, + }, + ) + assert start_export_response.status_code == status.HTTP_302_FOUND + + # Go to redirected page after export is finished + status_response = client.get(start_export_response.url) + assert status_response.status_code == status.HTTP_302_FOUND + + result_response = client.get(status_response.url) + assert result_response.status_code == status.HTTP_200_OK + + assert ExportJob.objects.exists() + export_job = ExportJob.objects.first() + assert export_job.export_status == ExportJob.ExportStatus.EXPORTED + + +@pytest.mark.parametrize( + argnames=["view_name", "path_kwargs"], + argvalues=[ + pytest.param( + "admin:fake_app_artist_export", + None, + id="Test access to `celery_export_action`", + ), + pytest.param( + "admin:fake_app_artist_export_job_status", + {"job_id": lf("artist_export_job.pk")}, + id="Test access to `export_job_status_view`", + ), + pytest.param( + "admin:fake_app_artist_export_job_results", + {"job_id": lf("artist_export_job.pk")}, + id="Test access to `export_job_results_view`", + ), + ], +) +def test_export_using_admin_model_without_permissions( + client: Client, + superuser: User, + view_name: str, + path_kwargs: dict[str, str], + mocker: pytest_mock.MockerFixture, +): + """Test access to celery-export endpoints forbidden without permission.""" + client.force_login(superuser) + mocker.patch( + "test_project.fake_app.admin.ArtistAdmin.has_export_permission", + return_value=False, + ) + + response = client.get( + path=reverse( + view_name, + kwargs=path_kwargs, + ), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + +@pytest.mark.django_db(transaction=True) +def test_export_progress_during_export( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test export job admin progress page during export.""" + client.force_login(superuser) + + # Prepare data to imitate intermediate task state + fake_progress_info = { + "current": 2, + "total": 3, + } + mocker.patch( + "celery.result.AsyncResult.info", + new=fake_progress_info, + ) + expected_percent = int( + fake_progress_info["current"] / fake_progress_info["total"] * 100, + ) + + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING + artist_export_job.save() + + response = client.post( + path=reverse( + "admin:export_job_progress", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + assert response.status_code == status.HTTP_200_OK + json_data = response.json() + + assert json_data == { + "status": ExportJob.ExportStatus.EXPORTING.title(), + "state": "SUCCESS", + "percent": expected_percent, + **fake_progress_info, + } + + +@pytest.mark.django_db(transaction=True) +def test_export_progress_after_complete_export( + client: Client, + superuser: User, +): + """Test export job admin progress page after complete export.""" + client.force_login(superuser) + + artist_export_job = ArtistExportJobFactory() + artist_export_job.refresh_from_db() + + response = client.post( + path=reverse( + "admin:export_job_progress", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "status": artist_export_job.export_status.title(), + } + + +@pytest.mark.django_db(transaction=True) +def test_export_progress_with_deleted_export_job( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test export job admin progress page with deleted export job.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") + artist_export_job = ArtistExportJobFactory() + job_id = artist_export_job.id + artist_export_job.delete() + + expected_error_message = "ExportJob matching query does not exist." + + response = client.post( + path=reverse( + "admin:export_job_progress", + kwargs={ + "job_id": job_id, + }, + ), + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json()["validation_error"] == expected_error_message + + +@pytest.mark.django_db(transaction=True) +def test_export_progress_with_failed_celery_task( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test that after celery fail ExportJob will be in export error status.""" + client.force_login(superuser) + + expected_error_message = "Mocked Error Message" + mocker.patch( + "celery.result.AsyncResult.state", + new=states.FAILURE, + ) + mocker.patch( + "celery.result.AsyncResult.info", + new=ValueError(expected_error_message), + ) + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING + artist_export_job.save() + + response = client.post( + path=reverse( + "admin:export_job_progress", + kwargs={ + "job_id": artist_export_job.id, + }, + ), + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["state"] == states.FAILURE + artist_export_job.refresh_from_db() + assert ( + artist_export_job.export_status == ExportJob.ExportStatus.EXPORT_ERROR + ) diff --git a/test_project/tests/integration_tests/test_admin/test_import.py b/test_project/tests/integration_tests/test_admin/test_import.py deleted file mode 100644 index 9ad0ca1..0000000 --- a/test_project/tests/integration_tests/test_admin/test_import.py +++ /dev/null @@ -1,626 +0,0 @@ -"""There are kind a functional tests for import using Django Admin.""" - -from django.contrib.auth.models import User -from django.core.files.uploadedfile import SimpleUploadedFile -from django.test.client import Client -from django.urls import reverse - -from rest_framework import status - -import pytest -import pytest_mock -from celery import states -from pytest_lazy_fixtures import lf - -from import_export_extensions.models import ImportJob -from test_project.fake_app.factories import ArtistImportJobFactory - - -@pytest.mark.usefixtures("existing_artist") -@pytest.mark.django_db(transaction=True) -def test_import_using_admin_model( - client: Client, - superuser: User, - uploaded_file: SimpleUploadedFile, -): - """Test entire importing process using Django Admin. - - There is following workflow: - 1. Go to Artist Import page - 2. Start Import Job with chosen file - 3. Redirect to the import job parsing result page through - the status page - 4. Confirm import - 5. Redirect to the import job importing page through the status page - - """ - client.force_login(superuser) - - # Go to import page in admin panel - import_response = client.get( - path=reverse("admin:fake_app_artist_import"), - ) - assert import_response.status_code == status.HTTP_200_OK - - # Start import job using admin panel - start_import_job_response = client.post( - path=reverse("admin:fake_app_artist_import"), - data={ - "import_file": uploaded_file, - "format": 0, # Choose CSV format - }, - ) - assert start_import_job_response.status_code == status.HTTP_302_FOUND - - # Go to import job status page - # Ensure there is another one redirect because import job finished - status_page_response = client.get(path=start_import_job_response.url) - assert status_page_response.status_code == status.HTTP_302_FOUND - - assert ImportJob.objects.exists() - - import_job = ImportJob.objects.first() - - # Go to results page - result_page_get_response = client.get(status_page_response.url) - assert result_page_get_response.status_code == status.HTTP_200_OK - - # Confirm import on result page - confirm_response = client.post( - path=status_page_response.url, - data={"confirm": "Confirm import"}, - ) - assert confirm_response.status_code == status.HTTP_302_FOUND - - # Ensure import job finished and redirected to result page - import_status_response = client.get(path=confirm_response.url) - assert import_status_response.status_code == status.HTTP_302_FOUND - - result_response = client.get(path=import_status_response.url) - - assert result_response.status_code == status.HTTP_200_OK - - import_job.refresh_from_db() - assert import_job.import_status == ImportJob.ImportStatus.IMPORTED - - -@pytest.mark.parametrize( - argnames=["view_name", "path_kwargs"], - argvalues=[ - pytest.param( - "admin:fake_app_artist_import", - None, - id="Test access to `celery_import_action`", - ), - pytest.param( - "admin:fake_app_artist_import_job_status", - {"job_id": lf("artist_import_job.pk")}, - id="Test access to `import_job_status_view`", - ), - pytest.param( - "admin:fake_app_artist_import_job_results", - {"job_id": lf("artist_import_job.pk")}, - id="Test access to `import_job_results_view`", - ), - ], -) -def test_import_using_admin_model_without_permissions( - client: Client, - superuser: User, - view_name: str, - path_kwargs: dict[str, str], - mocker: pytest_mock.MockerFixture, -): - """Test access to celery-import endpoints forbidden without permission.""" - client.force_login(superuser) - mocker.patch( - "test_project.fake_app.admin.ArtistAdmin.has_import_permission", - return_value=False, - ) - response = client.get( - path=reverse( - view_name, - kwargs=path_kwargs, - ), - ) - assert response.status_code == status.HTTP_403_FORBIDDEN - - -def test_celery_import_status_view_during_import( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test import status page when import in progress.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") - artist_import_job = ArtistImportJobFactory(skip_parse_step=True) - artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING - artist_import_job.save() - - response = client.get( - path=reverse( - "admin:fake_app_artist_import_job_status", - kwargs={"job_id": artist_import_job.pk}, - ), - ) - - expected_import_job_url = reverse( - "admin:import_job_progress", - kwargs={"job_id": artist_import_job.id}, - ) - - assert response.status_code == status.HTTP_200_OK - assert response.context["import_job_url"] == expected_import_job_url - - -@pytest.mark.parametrize( - argnames="incorrect_job_status", - argvalues=[ - ImportJob.ImportStatus.CREATED, - ImportJob.ImportStatus.PARSING, - ImportJob.ImportStatus.PARSE_ERROR, - ImportJob.ImportStatus.CONFIRMED, - ImportJob.ImportStatus.IMPORTING, - ImportJob.ImportStatus.CANCELLED, - ], -) -def test_celery_import_results_view_redirect_to_status_page( - client: Client, - superuser: User, - incorrect_job_status: ImportJob.ImportStatus, - mocker: pytest_mock.MockerFixture, -): - """Test redirect to import status page when job in not result status.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") - artist_import_job = ArtistImportJobFactory() - artist_import_job.import_status = incorrect_job_status - artist_import_job.save() - - response = client.get( - path=reverse( - "admin:fake_app_artist_import_job_results", - kwargs={"job_id": artist_import_job.pk}, - ), - ) - - expected_redirect_url = reverse( - "admin:fake_app_artist_import_job_status", - kwargs={"job_id": artist_import_job.pk}, - ) - assert response.status_code == status.HTTP_302_FOUND - assert response.url == expected_redirect_url - - -@pytest.mark.parametrize( - argnames="incorrect_job_status", - argvalues=[ - ImportJob.ImportStatus.INPUT_ERROR, - ImportJob.ImportStatus.IMPORT_ERROR, - ImportJob.ImportStatus.IMPORTED, - ], -) -def test_celery_import_results_confirm_forbidden( - client: Client, - superuser: User, - incorrect_job_status: ImportJob.ImportStatus, - mocker: pytest_mock.MockerFixture, -): - """Check that confirm from result page forbidden for not PARSED jobs.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") - artist_import_job = ArtistImportJobFactory() - artist_import_job.import_status = incorrect_job_status - artist_import_job.save() - - response = client.post( - path=reverse( - "admin:fake_app_artist_import_job_results", - kwargs={"job_id": artist_import_job.pk}, - ), - data={"confirm": "Confirm import"}, - ) - - assert response.status_code == status.HTTP_403_FORBIDDEN - - -@pytest.mark.usefixtures("existing_artist") -def test_import_admin_has_same_formats( - client: Client, - superuser: User, - artist_import_job: ImportJob, -): - """Ensure input formats on import forms are the same. - - Ensure Import forms on import and on import result pages - fetch format choices from the same source. - - """ - client.force_login(superuser) - artist_import_job.import_status = ImportJob.ImportStatus.IMPORTED - artist_import_job.save() - import_response = client.get( - path=reverse("admin:fake_app_artist_import"), - ) - import_response_result = client.get( - path=reverse( - "admin:fake_app_artist_import_job_results", - kwargs={"job_id": artist_import_job.id}, - ), - ) - import_response_form = import_response.context_data["form"] - import_response_result_form = import_response_result.context_data[ - "import_form" - ] - assert ( - import_response_form.fields["format"].choices - == import_response_result_form.fields["format"].choices - ) - - -@pytest.mark.django_db(transaction=True) -def test_import_progress_during_import( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test import job admin progress page during import.""" - client.force_login(superuser) - - # Prepare data to imitate intermediate task state - fake_progress_info = { - "current": 2, - "total": 3, - } - mocker.patch( - "celery.result.AsyncResult.info", - new=fake_progress_info, - ) - expected_percent = int( - fake_progress_info["current"] / fake_progress_info["total"] * 100, - ) - - artist_import_job = ArtistImportJobFactory( - skip_parse_step=True, - ) - artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING - artist_import_job.save() - - response = client.post( - path=reverse( - "admin:import_job_progress", - kwargs={"job_id": artist_import_job.pk}, - ), - ) - - assert response.status_code == status.HTTP_200_OK - assert response.json() == { - "status": ImportJob.ImportStatus.IMPORTING.title(), - "state": "SUCCESS", - "percent": expected_percent, - **fake_progress_info, - } - - -@pytest.mark.django_db(transaction=True) -def test_import_progress_after_complete_import( - client: Client, - superuser: User, -): - """Test import job admin progress page after complete import.""" - client.force_login(superuser) - - artist_import_job = ArtistImportJobFactory( - skip_parse_step=True, - ) - artist_import_job.refresh_from_db() - - response = client.post( - path=reverse( - "admin:import_job_progress", - kwargs={"job_id": artist_import_job.pk}, - ), - ) - assert response.status_code == status.HTTP_200_OK - assert response.json() == { - "status": artist_import_job.import_status.title(), - } - - -@pytest.mark.django_db(transaction=True) -def test_import_progress_with_deleted_import_job( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test import job admin progress page with deleted import job.""" - client.force_login(superuser) - - mocker.patch("import_export_extensions.tasks.import_data_task.apply_async") - artist_import_job = ArtistImportJobFactory() - job_id = artist_import_job.id - artist_import_job.delete() - - expected_error_message = "ImportJob matching query does not exist." - - response = client.post( - path=reverse( - "admin:import_job_progress", - kwargs={ - "job_id": job_id, - }, - ), - ) - - assert response.status_code == status.HTTP_404_NOT_FOUND - assert response.json()["validation_error"] == expected_error_message - - -@pytest.mark.django_db(transaction=True) -def test_import_progress_with_failed_celery_task( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test than after celery fail ImportJob will be in import error status.""" - client.force_login(superuser) - - expected_error_message = "Mocked Error Message" - mocker.patch( - "celery.result.AsyncResult.state", - new=states.FAILURE, - ) - mocker.patch( - "celery.result.AsyncResult.info", - new=ValueError(expected_error_message), - ) - artist_import_job = ArtistImportJobFactory() - artist_import_job.refresh_from_db() - artist_import_job.confirm_import() - artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING - artist_import_job.save() - - response = client.post( - path=reverse( - "admin:import_job_progress", - kwargs={ - "job_id": artist_import_job.id, - }, - ), - ) - - assert response.status_code == status.HTTP_200_OK - assert response.json()["state"] == states.FAILURE - artist_import_job.refresh_from_db() - assert ( - artist_import_job.import_status == ImportJob.ImportStatus.IMPORT_ERROR - ) - - -@pytest.mark.django_db(transaction=True) -def test_cancel_import_admin_action( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test `cancel_import` via admin action.""" - client.force_login(superuser) - - revoke_mock = mocker.patch("celery.current_app.control.revoke") - import_data_mock = mocker.patch( - "import_export_extensions.models.ImportJob.import_data", - ) - artist_import_job = ArtistImportJobFactory() - artist_import_job.refresh_from_db() - artist_import_job.confirm_import() - - response = client.post( - reverse("admin:import_export_extensions_importjob_changelist"), - data={ - "action": "cancel_jobs", - "_selected_action": [artist_import_job.pk], - }, - follow=True, - ) - artist_import_job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert artist_import_job.import_status == ImportJob.ImportStatus.CANCELLED - assert ( - response.context["messages"]._loaded_data[0].message - == f"Import of {artist_import_job} canceled" - ) - import_data_mock.assert_called_once() - revoke_mock.assert_called_once_with( - artist_import_job.import_task_id, - terminate=True, - ) - - -@pytest.mark.django_db(transaction=True) -def test_cancel_import_admin_action_with_incorrect_import_job_status( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test `cancel_import` via admin action with wrong import job status.""" - client.force_login(superuser) - - revoke_mock = mocker.patch("celery.current_app.control.revoke") - artist_import_job = ArtistImportJobFactory() - - expected_error_message = ( - f"ImportJob with id {artist_import_job.pk} has incorrect status" - ) - - response = client.post( - reverse("admin:import_export_extensions_importjob_changelist"), - data={ - "action": "cancel_jobs", - "_selected_action": [artist_import_job.pk], - }, - follow=True, - ) - artist_import_job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert artist_import_job.import_status == ImportJob.ImportStatus.PARSED - assert ( - expected_error_message - in response.context["messages"]._loaded_data[0].message - ) - revoke_mock.assert_not_called() - - -@pytest.mark.django_db(transaction=True) -def test_confirm_import_admin_action( - client: Client, - superuser: User, - mocker: pytest_mock.MockerFixture, -): - """Test `confirm_import` via admin action.""" - client.force_login(superuser) - - import_data_mock = mocker.patch( - "import_export_extensions.models.ImportJob.import_data", - ) - artist_import_job = ArtistImportJobFactory() - artist_import_job.refresh_from_db() - - response = client.post( - reverse("admin:import_export_extensions_importjob_changelist"), - data={ - "action": "confirm_jobs", - "_selected_action": [artist_import_job.pk], - }, - follow=True, - ) - artist_import_job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert artist_import_job.import_status == ImportJob.ImportStatus.CONFIRMED - assert ( - response.context["messages"]._loaded_data[0].message - == f"Import of {artist_import_job} confirmed" - ) - import_data_mock.assert_called_once() - - -@pytest.mark.django_db(transaction=True) -def test_confirm_import_admin_action_with_incorrect_import_job_status( - client: Client, - superuser: User, -): - """Test `confirm_import` via admin action with wrong import job status.""" - client.force_login(superuser) - - artist_import_job = ArtistImportJobFactory() - artist_import_job.import_status = ImportJob.ImportStatus.CANCELLED - artist_import_job.save() - - expected_error_message = ( - f"ImportJob with id {artist_import_job.pk} has incorrect status" - ) - - response = client.post( - reverse("admin:import_export_extensions_importjob_changelist"), - data={ - "action": "confirm_jobs", - "_selected_action": [artist_import_job.pk], - }, - follow=True, - ) - artist_import_job.refresh_from_db() - - assert response.status_code == status.HTTP_200_OK - assert artist_import_job.import_status == ImportJob.ImportStatus.CANCELLED - assert ( - expected_error_message - in response.context["messages"]._loaded_data[0].message - ) - - -@pytest.mark.parametrize( - argnames=["job_status", "expected_fieldsets"], - argvalues=[ - pytest.param( - ImportJob.ImportStatus.CREATED, - tuple(), - id="Get fieldsets for job in status CREATED", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTED, - ( - ("_show_results",), - ( - "input_errors_file", - "_input_errors", - ), - ), - id="Get fieldsets for job in status IMPORTED", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTING, - ( - ( - "import_status", - "import_progressbar", - ), - ), - id="Get fieldsets for job in status IMPORTING", - ), - pytest.param( - ImportJob.ImportStatus.IMPORT_ERROR, - (("traceback",),), - id="Get fieldsets for job in status IMPORT_ERROR", - ), - ], -) -def test_get_fieldsets_by_import_job_status( - client: Client, - superuser: User, - job_status: ImportJob.ImportStatus, - expected_fieldsets: tuple[tuple[str]], - mocker: pytest_mock.MockerFixture, -): - """Test that appropriate fieldsets returned for different job statuses.""" - client.force_login(superuser) - - mocker.patch( - "import_export_extensions.models.ImportJob.import_data", - ) - artist_import_job = ArtistImportJobFactory() - artist_import_job.import_status = job_status - artist_import_job.save() - - response = client.get( - reverse( - "admin:import_export_extensions_importjob_change", - kwargs={"object_id": artist_import_job.pk}, - ), - ) - - fieldsets = response.context["adminform"].fieldsets - fields = [fields["fields"] for _, fields in fieldsets] - - assert tuple(fields) == ( - ( - "import_status", - "_model", - "created_by", - "created", - "parse_finished", - "import_started", - "import_finished", - ), - *expected_fieldsets, - ( - "data_file", - "resource_path", - "resource_kwargs", - ), - ) diff --git a/test_project/tests/integration_tests/test_admin/test_import/__init__.py b/test_project/tests/integration_tests/test_admin/test_import/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_project/tests/integration_tests/test_admin/test_import/test_admin_actions.py b/test_project/tests/integration_tests/test_admin/test_import/test_admin_actions.py new file mode 100644 index 0000000..9504e6b --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_import/test_admin_actions.py @@ -0,0 +1,170 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock + +from import_export_extensions.models import ImportJob +from test_project.fake_app.factories import ArtistImportJobFactory + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize( + argnames="allowed_cancel_status", + argvalues=[ + ImportJob.ImportStatus.CREATED, + ImportJob.ImportStatus.PARSING, + ImportJob.ImportStatus.IMPORTING, + ImportJob.ImportStatus.CONFIRMED, + ], +) +def test_cancel_import_admin_action( + client: Client, + superuser: User, + allowed_cancel_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test `cancel_import` via admin action.""" + client.force_login(superuser) + + revoke_mock = mocker.patch("celery.current_app.control.revoke") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = allowed_cancel_status + artist_import_job.save() + + response = client.post( + reverse("admin:import_export_extensions_importjob_changelist"), + data={ + "action": "cancel_jobs", + "_selected_action": [artist_import_job.pk], + }, + follow=True, + ) + artist_import_job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert artist_import_job.import_status == ImportJob.ImportStatus.CANCELLED + assert ( + response.context["messages"]._loaded_data[0].message + == f"Import of {artist_import_job} canceled" + ) + revoke_mock.assert_called_once() + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ImportJob.ImportStatus.INPUT_ERROR, + ImportJob.ImportStatus.PARSE_ERROR, + ImportJob.ImportStatus.IMPORT_ERROR, + ImportJob.ImportStatus.IMPORTED, + ImportJob.ImportStatus.CANCELLED, + ], +) +def test_cancel_import_admin_action_with_incorrect_import_job_status( + client: Client, + superuser: User, + incorrect_job_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test `cancel_import` via admin action with wrong import job status.""" + client.force_login(superuser) + + revoke_mock = mocker.patch("celery.current_app.control.revoke") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = incorrect_job_status + artist_import_job.save() + + expected_error_message = ( + f"ImportJob with id {artist_import_job.pk} has incorrect status" + ) + + response = client.post( + reverse("admin:import_export_extensions_importjob_changelist"), + data={ + "action": "cancel_jobs", + "_selected_action": [artist_import_job.pk], + }, + follow=True, + ) + artist_import_job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert artist_import_job.import_status == incorrect_job_status + assert ( + expected_error_message + in response.context["messages"]._loaded_data[0].message + ) + revoke_mock.assert_not_called() + + +@pytest.mark.django_db(transaction=True) +def test_confirm_import_admin_action( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test `confirm_import` via admin action.""" + client.force_login(superuser) + + import_data_mock = mocker.patch( + "import_export_extensions.models.ImportJob.import_data", + ) + artist_import_job = ArtistImportJobFactory() + artist_import_job.refresh_from_db() + + response = client.post( + reverse("admin:import_export_extensions_importjob_changelist"), + data={ + "action": "confirm_jobs", + "_selected_action": [artist_import_job.pk], + }, + follow=True, + ) + artist_import_job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert artist_import_job.import_status == ImportJob.ImportStatus.CONFIRMED + assert ( + response.context["messages"]._loaded_data[0].message + == f"Import of {artist_import_job} confirmed" + ) + import_data_mock.assert_called_once() + + +@pytest.mark.django_db(transaction=True) +def test_confirm_import_admin_action_with_incorrect_import_job_status( + client: Client, + superuser: User, +): + """Test `confirm_import` via admin action with wrong import job status.""" + client.force_login(superuser) + + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = ImportJob.ImportStatus.CANCELLED + artist_import_job.save() + + expected_error_message = ( + f"ImportJob with id {artist_import_job.pk} has incorrect status" + ) + + response = client.post( + reverse("admin:import_export_extensions_importjob_changelist"), + data={ + "action": "confirm_jobs", + "_selected_action": [artist_import_job.pk], + }, + follow=True, + ) + artist_import_job.refresh_from_db() + + assert response.status_code == status.HTTP_200_OK + assert artist_import_job.import_status == ImportJob.ImportStatus.CANCELLED + assert ( + expected_error_message + in response.context["messages"]._loaded_data[0].message + ) diff --git a/test_project/tests/integration_tests/test_admin/test_import/test_admin_class.py b/test_project/tests/integration_tests/test_admin/test_import/test_admin_class.py new file mode 100644 index 0000000..19e25b5 --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_import/test_admin_class.py @@ -0,0 +1,91 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +import pytest +import pytest_mock + +from import_export_extensions.models import ImportJob +from test_project.fake_app.factories import ArtistImportJobFactory + + +@pytest.mark.parametrize( + argnames=["job_status", "expected_fieldsets"], + argvalues=[ + pytest.param( + ImportJob.ImportStatus.CREATED, + tuple(), + id="Get fieldsets for job in status CREATED", + ), + pytest.param( + ImportJob.ImportStatus.IMPORTED, + ( + ("_show_results",), + ( + "input_errors_file", + "_input_errors", + ), + ), + id="Get fieldsets for job in status IMPORTED", + ), + pytest.param( + ImportJob.ImportStatus.IMPORTING, + ( + ( + "import_status", + "import_progressbar", + ), + ), + id="Get fieldsets for job in status IMPORTING", + ), + pytest.param( + ImportJob.ImportStatus.IMPORT_ERROR, + (("traceback",),), + id="Get fieldsets for job in status IMPORT_ERROR", + ), + ], +) +def test_get_fieldsets_by_import_job_status( + client: Client, + superuser: User, + job_status: ImportJob.ImportStatus, + expected_fieldsets: tuple[tuple[str]], + mocker: pytest_mock.MockerFixture, +): + """Test that appropriate fieldsets returned for different job statuses.""" + client.force_login(superuser) + + mocker.patch( + "import_export_extensions.models.ImportJob.import_data", + ) + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = job_status + artist_import_job.save() + + response = client.get( + reverse( + "admin:import_export_extensions_importjob_change", + kwargs={"object_id": artist_import_job.pk}, + ), + ) + + fieldsets = response.context["adminform"].fieldsets + fields = [fields["fields"] for _, fields in fieldsets] + + assert tuple(fields) == ( + ( + "import_status", + "_model", + "created_by", + "created", + "parse_finished", + "import_started", + "import_finished", + ), + *expected_fieldsets, + ( + "data_file", + "resource_path", + "resource_kwargs", + ), + ) diff --git a/test_project/tests/integration_tests/test_admin/test_import/test_celery_endpoints.py b/test_project/tests/integration_tests/test_admin/test_import/test_celery_endpoints.py new file mode 100644 index 0000000..fb4705f --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_import/test_celery_endpoints.py @@ -0,0 +1,113 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock + +from import_export_extensions.models import ImportJob +from test_project.fake_app.factories import ArtistImportJobFactory + + +def test_celery_import_status_view_during_import( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test import status page when import in progress.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory(skip_parse_step=True) + artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING + artist_import_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_import_job_status", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + + expected_import_job_url = reverse( + "admin:import_job_progress", + kwargs={"job_id": artist_import_job.id}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.context["import_job_url"] == expected_import_job_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ImportJob.ImportStatus.CREATED, + ImportJob.ImportStatus.PARSING, + ImportJob.ImportStatus.PARSE_ERROR, + ImportJob.ImportStatus.CONFIRMED, + ImportJob.ImportStatus.IMPORTING, + ImportJob.ImportStatus.CANCELLED, + ], +) +def test_celery_import_results_view_redirect_to_status_page( + client: Client, + superuser: User, + incorrect_job_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test redirect to import status page when job in not result status.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = incorrect_job_status + artist_import_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_import_job_results", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + + expected_redirect_url = reverse( + "admin:fake_app_artist_import_job_status", + kwargs={"job_id": artist_import_job.pk}, + ) + assert response.status_code == status.HTTP_302_FOUND + assert response.url == expected_redirect_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ImportJob.ImportStatus.INPUT_ERROR, + ImportJob.ImportStatus.IMPORT_ERROR, + ImportJob.ImportStatus.IMPORTED, + ], +) +def test_celery_import_results_confirm_forbidden( + client: Client, + superuser: User, + incorrect_job_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Check that confirm from result page forbidden for not PARSED jobs.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = incorrect_job_status + artist_import_job.save() + + response = client.post( + path=reverse( + "admin:fake_app_artist_import_job_results", + kwargs={"job_id": artist_import_job.pk}, + ), + data={"confirm": "Confirm import"}, + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN diff --git a/test_project/tests/integration_tests/test_admin/test_import/test_import.py b/test_project/tests/integration_tests/test_admin/test_import/test_import.py new file mode 100644 index 0000000..d6a430f --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_import/test_import.py @@ -0,0 +1,297 @@ +from django.contrib.auth.models import User +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + +import pytest +import pytest_mock +from celery import states +from pytest_lazy_fixtures import lf + +from import_export_extensions.models import ImportJob +from test_project.fake_app.factories import ArtistImportJobFactory + + +@pytest.mark.usefixtures("existing_artist") +@pytest.mark.django_db(transaction=True) +def test_import_using_admin_model( + client: Client, + superuser: User, + uploaded_file: SimpleUploadedFile, +): + """Test entire importing process using Django Admin. + + There is following workflow: + 1. Go to Artist Import page + 2. Start Import Job with chosen file + 3. Redirect to the import job parsing result page through + the status page + 4. Confirm import + 5. Redirect to the import job importing page through the status page + + """ + client.force_login(superuser) + + # Go to import page in admin panel + import_response = client.get( + path=reverse("admin:fake_app_artist_import"), + ) + assert import_response.status_code == status.HTTP_200_OK + + # Start import job using admin panel + start_import_job_response = client.post( + path=reverse("admin:fake_app_artist_import"), + data={ + "import_file": uploaded_file, + "format": 0, # Choose CSV format + }, + ) + assert start_import_job_response.status_code == status.HTTP_302_FOUND + + # Go to import job status page + # Ensure there is another one redirect because import job finished + status_page_response = client.get(path=start_import_job_response.url) + assert status_page_response.status_code == status.HTTP_302_FOUND + + assert ImportJob.objects.exists() + + import_job = ImportJob.objects.first() + + # Go to results page + result_page_get_response = client.get(status_page_response.url) + assert result_page_get_response.status_code == status.HTTP_200_OK + + # Confirm import on result page + confirm_response = client.post( + path=status_page_response.url, + data={"confirm": "Confirm import"}, + ) + assert confirm_response.status_code == status.HTTP_302_FOUND + + # Ensure import job finished and redirected to result page + import_status_response = client.get(path=confirm_response.url) + assert import_status_response.status_code == status.HTTP_302_FOUND + + result_response = client.get(path=import_status_response.url) + + assert result_response.status_code == status.HTTP_200_OK + + import_job.refresh_from_db() + assert import_job.import_status == ImportJob.ImportStatus.IMPORTED + + +@pytest.mark.parametrize( + argnames=["view_name", "path_kwargs"], + argvalues=[ + pytest.param( + "admin:fake_app_artist_import", + None, + id="Test access to `celery_import_action`", + ), + pytest.param( + "admin:fake_app_artist_import_job_status", + {"job_id": lf("artist_import_job.pk")}, + id="Test access to `import_job_status_view`", + ), + pytest.param( + "admin:fake_app_artist_import_job_results", + {"job_id": lf("artist_import_job.pk")}, + id="Test access to `import_job_results_view`", + ), + ], +) +def test_import_using_admin_model_without_permissions( + client: Client, + superuser: User, + view_name: str, + path_kwargs: dict[str, str], + mocker: pytest_mock.MockerFixture, +): + """Test access to celery-import endpoints forbidden without permission.""" + client.force_login(superuser) + mocker.patch( + "test_project.fake_app.admin.ArtistAdmin.has_import_permission", + return_value=False, + ) + response = client.get( + path=reverse( + view_name, + kwargs=path_kwargs, + ), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + +@pytest.mark.usefixtures("existing_artist") +def test_import_admin_has_same_formats( + client: Client, + superuser: User, + artist_import_job: ImportJob, +): + """Ensure input formats on import forms are the same. + + Ensure Import forms on import and on import result pages + fetch format choices from the same source. + + """ + client.force_login(superuser) + artist_import_job.import_status = ImportJob.ImportStatus.IMPORTED + artist_import_job.save() + import_response = client.get( + path=reverse("admin:fake_app_artist_import"), + ) + import_response_result = client.get( + path=reverse( + "admin:fake_app_artist_import_job_results", + kwargs={"job_id": artist_import_job.id}, + ), + ) + import_response_form = import_response.context_data["form"] + import_response_result_form = import_response_result.context_data[ + "import_form" + ] + assert ( + import_response_form.fields["format"].choices + == import_response_result_form.fields["format"].choices + ) + + +@pytest.mark.django_db(transaction=True) +def test_import_progress_during_import( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test import job admin progress page during import.""" + client.force_login(superuser) + + # Prepare data to imitate intermediate task state + fake_progress_info = { + "current": 2, + "total": 3, + } + mocker.patch( + "celery.result.AsyncResult.info", + new=fake_progress_info, + ) + expected_percent = int( + fake_progress_info["current"] / fake_progress_info["total"] * 100, + ) + + artist_import_job = ArtistImportJobFactory( + skip_parse_step=True, + ) + artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING + artist_import_job.save() + + response = client.post( + path=reverse( + "admin:import_job_progress", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "status": ImportJob.ImportStatus.IMPORTING.title(), + "state": "SUCCESS", + "percent": expected_percent, + **fake_progress_info, + } + + +@pytest.mark.django_db(transaction=True) +def test_import_progress_after_complete_import( + client: Client, + superuser: User, +): + """Test import job admin progress page after complete import.""" + client.force_login(superuser) + + artist_import_job = ArtistImportJobFactory( + skip_parse_step=True, + ) + artist_import_job.refresh_from_db() + + response = client.post( + path=reverse( + "admin:import_job_progress", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "status": artist_import_job.import_status.title(), + } + + +@pytest.mark.django_db(transaction=True) +def test_import_progress_with_deleted_import_job( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test import job admin progress page with deleted import job.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.import_data_task.apply_async") + artist_import_job = ArtistImportJobFactory() + job_id = artist_import_job.id + artist_import_job.delete() + + expected_error_message = "ImportJob matching query does not exist." + + response = client.post( + path=reverse( + "admin:import_job_progress", + kwargs={ + "job_id": job_id, + }, + ), + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json()["validation_error"] == expected_error_message + + +@pytest.mark.django_db(transaction=True) +def test_import_progress_with_failed_celery_task( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test that after celery fail ImportJob will be in import error status.""" + client.force_login(superuser) + + expected_error_message = "Mocked Error Message" + mocker.patch( + "celery.result.AsyncResult.state", + new=states.FAILURE, + ) + mocker.patch( + "celery.result.AsyncResult.info", + new=ValueError(expected_error_message), + ) + artist_import_job = ArtistImportJobFactory() + artist_import_job.refresh_from_db() + artist_import_job.confirm_import() + artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING + artist_import_job.save() + + response = client.post( + path=reverse( + "admin:import_job_progress", + kwargs={ + "job_id": artist_import_job.id, + }, + ), + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["state"] == states.FAILURE + artist_import_job.refresh_from_db() + assert ( + artist_import_job.import_status == ImportJob.ImportStatus.IMPORT_ERROR + ) diff --git a/test_project/tests/test_widgets.py b/test_project/tests/test_widgets.py index 80f9e4e..1b21ab1 100644 --- a/test_project/tests/test_widgets.py +++ b/test_project/tests/test_widgets.py @@ -2,6 +2,7 @@ from django.core.files import File from django.core.files.storage import default_storage +from django.db.models.fields.files import FieldFile from django.forms import ValidationError import pytest @@ -24,7 +25,7 @@ @pytest.fixture -def import_file(): +def import_file() -> FieldFile: """Prepare import file from import job.""" import_job = ArtistImportJobFactory.build(artists=[ArtistFactory()]) return import_job.data_file @@ -368,18 +369,19 @@ def test_intermediate_widget_filter_with_lookup(rem_field_lookup: str): assert ignored_band not in result -def test_file_widget_render_link(import_file): +def test_file_widget_render_link(import_file: FieldFile): """Test FileWidget `render` method.""" widget = FileWidget( filename="test_widget", ) result = widget.render(import_file) + assert result assert import_file.url in result def test_file_widget_render_link_for_non_local_env( - import_file, + import_file: FieldFile, mocker: pytest_mock.MockerFixture, ): """Test FileWidget `render` method."""