diff --git a/ca/django_ca/tests/__init__.py b/ca/django_ca/tests/__init__.py
index f94cd1215..0f9f9ea44 100644
--- a/ca/django_ca/tests/__init__.py
+++ b/ca/django_ca/tests/__init__.py
@@ -29,6 +29,7 @@
# NOTE: No need to add test_* modules, they are included automatically.
pytest.register_assert_rewrite(
"django_ca.tests.base.assertions",
+ "django_ca.tests.acme.views.assertions",
"django_ca.tests.admin.assertions",
"django_ca.tests.pydantic.base",
)
diff --git a/ca/django_ca/tests/acme/views/assertions.py b/ca/django_ca/tests/acme/views/assertions.py
new file mode 100644
index 000000000..a9a0431e4
--- /dev/null
+++ b/ca/django_ca/tests/acme/views/assertions.py
@@ -0,0 +1,73 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Assertions for ACME views."""
+
+import re
+from typing import TYPE_CHECKING, Optional
+
+from requests.utils import parse_header_links
+
+from django.urls import reverse
+
+from django_ca.models import CertificateAuthority
+
+if TYPE_CHECKING:
+ from django.test.client import _MonkeyPatchedWSGIResponse as HttpResponse
+
+
+def assert_link_relations(response: "HttpResponse", ca: CertificateAuthority, **kwargs: str) -> None:
+ """Assert Link relations for a given request."""
+ directory = reverse("django_ca:acme-directory", kwargs={"serial": ca.serial})
+ kwargs.setdefault("index", response.wsgi_request.build_absolute_uri(directory))
+
+ expected = [{"rel": k, "url": v} for k, v in kwargs.items()]
+ actual = parse_header_links(response["Link"])
+ assert expected == actual
+
+
+def assert_acme_problem(
+ response: "HttpResponse",
+ typ: str,
+ status: int,
+ message: str,
+ ca: CertificateAuthority,
+ link_relations: Optional[dict[str, str]] = None,
+ regex: bool = False,
+) -> None:
+ """Assert that an HTTP response confirms to an ACME problem report.
+
+ .. seealso:: `RFC 8555, section 8 `_
+ """
+ link_relations = link_relations or {}
+ assert response["Content-Type"] == "application/problem+json", response.content
+ assert_link_relations(response, ca=ca, **link_relations)
+ data = response.json()
+ assert data["type"] == f"urn:ietf:params:acme:error:{typ}", f"detail={data['detail']}"
+ assert data["status"] == status
+ if regex:
+ assert re.search(message, data["detail"])
+ else:
+ assert data["detail"] == message
+ assert "Replay-Nonce" in response
+
+
+def assert_acme_response(
+ response: "HttpResponse",
+ ca: CertificateAuthority,
+ link_relations: Optional[dict[str, str]] = None,
+) -> None:
+ """Assert basic Acme Response properties (Content-Type & Link header)."""
+ link_relations = link_relations or {}
+ assert_link_relations(response, ca, **link_relations)
+ assert response["Content-Type"] == "application/json"
diff --git a/ca/django_ca/tests/acme/views/base.py b/ca/django_ca/tests/acme/views/base.py
index fa5554ce7..fba044717 100644
--- a/ca/django_ca/tests/acme/views/base.py
+++ b/ca/django_ca/tests/acme/views/base.py
@@ -25,7 +25,6 @@
import acme
import acme.jws
import josepy as jose
-from requests.utils import parse_header_links
from cryptography.hazmat.primitives.asymmetric.types import CertificateIssuerPrivateKeyTypes
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
@@ -34,6 +33,10 @@
from django_ca.acme.responses import AcmeResponseUnauthorized
from django_ca.models import AcmeAccount, CertificateAuthority, acme_slug
+from django_ca.tests.acme.views.assertions import (
+ assert_acme_problem,
+ assert_acme_response,
+)
from django_ca.tests.base.constants import CERT_DATA
from django_ca.tests.base.mixins import TestCaseMixin
from django_ca.tests.base.utils import mock_slug, override_tmpcadir
@@ -85,8 +88,6 @@ def absolute_uri(self, name: str, hostname: Optional[str] = None, **kwargs: Any)
hostname = self.SERVER_NAME
return super().absolute_uri(name, hostname=hostname, **kwargs)
- # NOINSPECTION NOTE: PyCharm does not detect mixins as a TestCase
- # noinspection PyPep8Naming
def assertAcmeProblem( # pylint: disable=invalid-name
self,
response: "HttpResponse",
@@ -101,17 +102,7 @@ def assertAcmeProblem( # pylint: disable=invalid-name
.. seealso:: `RFC 8555, section 8 `_
"""
- link_relations = link_relations or {}
- self.assertEqual(response["Content-Type"], "application/problem+json", response.content)
- self.assertLinkRelations(response, ca=ca, **link_relations)
- data = response.json()
- self.assertEqual(data["type"], f"urn:ietf:params:acme:error:{typ}", f"detail={data['detail']}")
- self.assertEqual(data["status"], status)
- if regex:
- self.assertRegex(data["detail"], message)
- else:
- self.assertEqual(data["detail"], message)
- self.assertIn("Replay-Nonce", response)
+ assert_acme_problem(response, typ, status, message, ca or self.ca, link_relations, regex)
# NOINSPECTION NOTE: PyCharm does not detect mixins as a TestCase
# noinspection PyPep8Naming
@@ -122,25 +113,7 @@ def assertAcmeResponse( # pylint: disable=invalid-name
link_relations: Optional[dict[str, str]] = None,
) -> None:
"""Assert basic Acme Response properties (Content-Type & Link header)."""
- link_relations = link_relations or {}
- self.assertLinkRelations(response, ca=ca, **link_relations)
- self.assertEqual(response["Content-Type"], "application/json")
-
- # NOINSPECTION NOTE: PyCharm does not detect mixins as a TestCase
- # noinspection PyPep8Naming
- def assertLinkRelations( # pylint: disable=invalid-name
- self, response: "HttpResponse", ca: Optional[CertificateAuthority] = None, **kwargs: str
- ) -> None:
- """Assert Link relations for a given request."""
- if ca is None: # pragma: no branch
- ca = self.ca
-
- directory = reverse("django_ca:acme-directory", kwargs={"serial": ca.serial})
- kwargs.setdefault("index", response.wsgi_request.build_absolute_uri(directory))
-
- expected = [{"rel": k, "url": v} for k, v in kwargs.items()]
- actual = parse_header_links(response["Link"])
- self.assertEqual(expected, actual)
+ assert_acme_response(response, ca or self.ca, link_relations)
# NOINSPECTION NOTE: PyCharm does not detect mixins as a TestCase
# noinspection PyPep8Naming
diff --git a/ca/django_ca/tests/acme/views/test_order_finalize.py b/ca/django_ca/tests/acme/views/test_order_finalize.py
index ab67f800e..43601cc29 100644
--- a/ca/django_ca/tests/acme/views/test_order_finalize.py
+++ b/ca/django_ca/tests/acme/views/test_order_finalize.py
@@ -15,6 +15,7 @@
from http import HTTPStatus
from unittest import mock
+from unittest.mock import patch
import acme
import josepy as jose
@@ -31,14 +32,20 @@
from freezegun import freeze_time
from django_ca.acme.messages import CertificateRequest
-from django_ca.models import AcmeAccount, AcmeAuthorization, AcmeOrder
+from django_ca.models import AcmeAccount, AcmeAuthorization, AcmeOrder, CertificateAuthority
from django_ca.tasks import acme_issue_certificate
+from django_ca.tests.acme.views.assertions import assert_acme_problem, assert_acme_response
from django_ca.tests.acme.views.base import AcmeWithAccountViewTestCaseMixin
from django_ca.tests.base.constants import CERT_DATA, FIXTURES_DIR, TIMESTAMPS
from django_ca.tests.base.typehints import HttpResponse
from django_ca.tests.base.utils import dns, override_tmpcadir
+def assert_bad_csr(response: "HttpResponse", message: str, ca: CertificateAuthority) -> None:
+ """Assert a badCSR error."""
+ assert_acme_problem(response, "badCSR", ca=ca, status=HTTPStatus.BAD_REQUEST, message=message)
+
+
@freeze_time(TIMESTAMPS["everything_valid"])
class AcmeOrderFinalizeViewTestCase(
AcmeWithAccountViewTestCaseMixin[CertificateRequest], TransactionTestCase
@@ -72,10 +79,6 @@ def setUp(self) -> None:
self.authz.status = AcmeAuthorization.STATUS_VALID
self.authz.save()
- def assertBadCSR(self, resp: "HttpResponse", message: str) -> None: # pylint: disable=invalid-name
- """Assert a badCSR error."""
- self.assertAcmeProblem(resp, "badCSR", status=HTTPStatus.BAD_REQUEST, message=message)
-
def get_message( # type: ignore[override]
self, csr: x509.CertificateSigningRequest
) -> CertificateRequest:
@@ -91,36 +94,54 @@ def message(self) -> CertificateRequest:
@override_tmpcadir()
def test_basic(self, accept_naive: bool = False) -> None:
"""Basic test for creating an account via ACME."""
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.message, kid=self.kid)
- self.assertEqual(resp.status_code, HTTPStatus.OK, resp.content)
- self.assertAcmeResponse(resp)
+ assert resp.status_code == HTTPStatus.OK, resp.content
+ assert_acme_response(resp, self.ca)
order = AcmeOrder.objects.get(pk=self.order.pk)
cert = order.acmecertificate
- self.assertEqual(
- mockcm.call_args_list, [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
- )
- self.assertEqual(
- resp.json(),
- {
- "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
- "expires": pyrfc3339.generate(order.expires, accept_naive=accept_naive),
- "identifiers": [{"type": "dns", "value": self.hostname}],
- "status": "processing",
- },
- )
+ assert mockcm.call_args_list == [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
+
+ assert resp.json() == {
+ "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
+ "expires": pyrfc3339.generate(order.expires, accept_naive=accept_naive),
+ "identifiers": [{"type": "dns", "value": self.hostname}],
+ "status": "processing",
+ }
@override_settings(USE_TZ=False)
def test_basic_without_tz(self) -> None:
"""Basic test without timezone support."""
self.test_basic(True)
+ @override_tmpcadir()
+ def test_unknown_key_backend(self) -> None:
+ """Test that the frontend does not need to know about the backend."""
+ self.ca.key_backend_alias = "unknown"
+ self.ca.save()
+
+ with patch("django_ca.acme.views.run_task") as mockcm:
+ resp = self.acme(self.url, self.message, kid=self.kid)
+ assert resp.status_code == HTTPStatus.OK, resp.content
+ assert_acme_response(resp, self.ca)
+
+ order = AcmeOrder.objects.get(pk=self.order.pk)
+ cert = order.acmecertificate
+ assert mockcm.call_args_list == [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
+
+ assert resp.json() == {
+ "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
+ "expires": pyrfc3339.generate(order.expires, accept_naive=False),
+ "identifiers": [{"type": "dns", "value": self.hostname}],
+ "status": "processing",
+ }
+
@override_tmpcadir()
def test_not_found(self) -> None:
"""Test an order that does not exist."""
url = reverse("django_ca:acme-order-finalize", kwargs={"serial": self.ca.serial, "slug": "foo"})
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(url, self.message, kid=self.kid)
mockcm.assert_not_called()
self.assertUnauthorized(resp, "You are not authorized to perform this request.")
@@ -134,7 +155,7 @@ def test_wrong_account(self) -> None:
self.order.account = account
self.order.save()
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
self.assertUnauthorized(resp, "You are not authorized to perform this request.")
@@ -145,7 +166,7 @@ def test_not_ready(self) -> None:
self.order.status = AcmeOrder.STATUS_INVALID
self.order.save()
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
self.assertAcmeProblem(
@@ -158,7 +179,7 @@ def test_invalid_auth(self) -> None:
self.authz.status = AcmeAuthorization.STATUS_INVALID
self.authz.save()
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
self.assertAcmeProblem(
@@ -175,12 +196,12 @@ def test_csr_invalid_signature(self) -> None:
type(csr_mock).is_signature_valid = mock.PropertyMock(return_value=False)
with (
- self.patch("django_ca.acme.views.run_task") as mockcm,
- self.patch("django_ca.acme.views.parse_acme_csr", return_value=csr_mock),
+ patch("django_ca.acme.views.run_task") as mockcm,
+ patch("django_ca.acme.views.parse_acme_csr", return_value=csr_mock),
):
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "CSR signature is not valid.")
+ assert_bad_csr(resp, "CSR signature is not valid.", self.ca)
@override_tmpcadir()
def test_csr_bad_algorithm(self) -> None:
@@ -188,18 +209,18 @@ def test_csr_bad_algorithm(self) -> None:
with open(FIXTURES_DIR / "md5.csr.pem", "rb") as stream:
signed_csr = x509.load_pem_x509_csr(stream.read())
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(signed_csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "md5: Insecure hash algorithm.")
+ assert_bad_csr(resp, "md5: Insecure hash algorithm.", self.ca)
with open(FIXTURES_DIR / "sha1.csr.pem", "rb") as stream:
signed_csr = x509.load_pem_x509_csr(stream.read())
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(signed_csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "sha1: Insecure hash algorithm.")
+ assert_bad_csr(resp, "sha1: Insecure hash algorithm.", self.ca)
@override_tmpcadir()
def test_csr_valid_subject(self) -> None:
@@ -217,25 +238,20 @@ def test_csr_valid_subject(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
- self.assertEqual(resp.status_code, HTTPStatus.OK, resp.content)
- self.assertAcmeResponse(resp)
+ assert resp.status_code == HTTPStatus.OK, resp.content
+ assert_acme_response(resp, self.ca)
order = AcmeOrder.objects.get(pk=self.order.pk)
cert = order.acmecertificate
- self.assertEqual(
- mockcm.call_args_list, [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
- )
- self.assertEqual(
- resp.json(),
- {
- "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
- "expires": pyrfc3339.generate(order.expires, accept_naive=True),
- "identifiers": [{"type": "dns", "value": self.hostname}],
- "status": "processing",
- },
- )
+ assert mockcm.call_args_list == [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
+ assert resp.json() == {
+ "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
+ "expires": pyrfc3339.generate(order.expires, accept_naive=True),
+ "identifiers": [{"type": "dns", "value": self.hostname}],
+ "status": "processing",
+ }
@override_tmpcadir()
def test_csr_subject_no_cn(self) -> None:
@@ -253,25 +269,21 @@ def test_csr_subject_no_cn(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
- self.assertEqual(resp.status_code, HTTPStatus.OK, resp.content)
- self.assertAcmeResponse(resp)
+ assert resp.status_code == HTTPStatus.OK, resp.content
+ assert_acme_response(resp, self.ca)
order = AcmeOrder.objects.get(pk=self.order.pk)
cert = order.acmecertificate
- self.assertEqual(
- mockcm.call_args_list, [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
- )
- self.assertEqual(
- resp.json(),
- {
- "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
- "expires": pyrfc3339.generate(order.expires, accept_naive=True),
- "identifiers": [{"type": "dns", "value": self.hostname}],
- "status": "processing",
- },
- )
+ assert mockcm.call_args_list == [mock.call(acme_issue_certificate, acme_certificate_pk=cert.pk)]
+
+ assert resp.json() == {
+ "authorizations": [f"http://{self.SERVER_NAME}{self.authz.acme_url}"],
+ "expires": pyrfc3339.generate(order.expires, accept_naive=True),
+ "identifiers": [{"type": "dns", "value": self.hostname}],
+ "status": "processing",
+ }
@override_tmpcadir()
def test_csr_subject_no_domain(self) -> None:
@@ -289,10 +301,10 @@ def test_csr_subject_no_domain(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "CommonName was not in order.")
+ assert_bad_csr(resp, "CommonName was not in order.", self.ca)
@override_tmpcadir()
def test_csr_subject_not_in_order(self) -> None:
@@ -310,10 +322,10 @@ def test_csr_subject_not_in_order(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "CommonName was not in order.")
+ assert_bad_csr(resp, "CommonName was not in order.", self.ca)
@override_tmpcadir()
def test_csr_no_san(self) -> None:
@@ -324,14 +336,14 @@ def test_csr_no_san(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "No subject alternative names found in CSR.")
+ assert_bad_csr(resp, "No subject alternative names found in CSR.", self.ca)
@override_tmpcadir()
def test_csr_different_names(self) -> None:
- """Test posting a CSR with different names in the SubjectAlternativeName extesion."""
+ """Test posting a CSR with different names in the SubjectAlternativeName extension."""
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(x509.Name([]))
@@ -342,22 +354,22 @@ def test_csr_different_names(self) -> None:
.sign(CERT_DATA["root-cert"]["key"]["parsed"], hashes.SHA256())
)
- with self.patch("django_ca.acme.views.run_task") as mockcm:
+ with patch("django_ca.acme.views.run_task") as mockcm:
resp = self.acme(self.url, self.get_message(csr), kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "Names in CSR do not match.")
+ assert_bad_csr(resp, "Names in CSR do not match.", self.ca)
@override_tmpcadir()
def test_unparsable_csr(self) -> None:
"""Test passing a completely unparsable CSR."""
with (
- self.patch("django_ca.acme.views.run_task") as mockcm,
- self.patch("django_ca.acme.views.AcmeOrderFinalizeView.message_cls.encode", side_effect=[b"foo"]),
+ patch("django_ca.acme.views.run_task") as mockcm,
+ patch("django_ca.acme.views.AcmeOrderFinalizeView.message_cls.encode", side_effect=[b"foo"]),
self.assertLogs(),
):
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "Unable to parse CSR.")
+ assert_bad_csr(resp, "Unable to parse CSR.", self.ca)
@override_tmpcadir()
def test_csr_invalid_version(self) -> None:
@@ -365,9 +377,9 @@ def test_csr_invalid_version(self) -> None:
# It's difficult to create a CSR with an invalid version, so we just mock the parsing function raising
# the exception instead.
with (
- self.patch("django_ca.acme.views.run_task") as mockcm,
- self.patch("django_ca.acme.views.parse_acme_csr", side_effect=x509.InvalidVersion("foo", 42)),
+ patch("django_ca.acme.views.run_task") as mockcm,
+ patch("django_ca.acme.views.parse_acme_csr", side_effect=x509.InvalidVersion("foo", 42)),
):
resp = self.acme(self.url, self.message, kid=self.kid)
mockcm.assert_not_called()
- self.assertBadCSR(resp, "Invalid CSR version.")
+ assert_bad_csr(resp, "Invalid CSR version.", self.ca)