Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply whitelist strictly - fail when misconfigured #35435

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion corehq/apps/receiverwrapper/tests/test_audit_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from corehq.apps.receiverwrapper.views import post_api, secure_post
from corehq.apps.users.models import CommCareUser, HqPermissions, WebUser
from corehq.apps.users.models_role import UserRole
from corehq.util.test_utils import flag_enabled


def return_200(*args, **kwargs):
Expand Down Expand Up @@ -76,7 +77,13 @@ def test_commcare_user_regular_submission(self):
self.assert_api_response(200, url)
mock_notify_exception.assert_not_called()

def test_api_user_regular_submission_gets_logged(self):
def test_api_user_regular_submission_with_no_FF(self):
url = reverse(secure_post, args=[self.domain])
self._create_user(access_api=True, access_mobile_endpoints=False)
self.assert_api_response(403, url)

@flag_enabled('OPEN_SUBMISSION_ENDPOINT')
def test_api_user_regular_submission_with_FF(self):
url = reverse(secure_post, args=[self.domain])
user = self._create_user(access_api=True, access_mobile_endpoints=False)
with patch('corehq.apps.receiverwrapper.views.notify_exception') as mock_notify_exception:
Expand Down
34 changes: 15 additions & 19 deletions corehq/apps/receiverwrapper/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@

from couchforms import openrosa_response

from corehq import toggles
from corehq.apps.app_manager.models import Application
from corehq.apps.domain.shortcuts import create_domain
from corehq.apps.receiverwrapper.util import DEMO_SUBMIT_MODE
from corehq.apps.receiverwrapper.views import (
_verify_access,
_has_mobile_access,
post_api,
secure_post,
)
Expand All @@ -31,6 +30,7 @@
from corehq.apps.users.util import normalize_username
from corehq.form_processor.models import XFormInstance
from corehq.form_processor.tests.utils import sharded
from corehq.util.test_utils import flag_enabled


class FakeFile(object):
Expand Down Expand Up @@ -430,7 +430,6 @@ def test_access_api_required(self):


@mock.patch('corehq.apps.receiverwrapper.views.is_from_formplayer')
@mock.patch('corehq.toggles.set_toggle')
@mock.patch('corehq.apps.receiverwrapper.views.cache.get', mock.MagicMock(return_value=None))
@mock.patch('corehq.apps.receiverwrapper.views.cache.set', mock.MagicMock)
class TestHasMobileAccess(TestCase):
Expand All @@ -454,25 +453,22 @@ def _create_user(cls, access_mobile_endpoints=False):
return WebUser.create(cls.domain, str(uuid.uuid4()), 'p@$$w0rd',
None, None, role_id=role.get_id)

def _verify_access(self, user):
_verify_access(self.domain, user.user_id, mock.MagicMock(couch_user=user))
def _has_mobile_access(self, user):
return _has_mobile_access(self.domain, user.user_id, mock.MagicMock(couch_user=user))

def test_formplayer_request_without_permission(self, set_toggle, is_from_formplayer):
is_from_formplayer.return_value = True
self._verify_access(self.user_without_permission)
set_toggle.assert_not_called()

def test_regular_request_without_permission(self, set_toggle, is_from_formplayer):
def test_mobile_request_with_permission(self, is_from_formplayer):
is_from_formplayer.return_value = False
self._verify_access(self.user_without_permission)
set_toggle.assert_called_with('open_submission_endpoint', self.domain, True, toggles.NAMESPACE_DOMAIN)
self.assertTrue(self._has_mobile_access(self.user_with_permission))

def test_formplayer_request_with_permission(self, set_toggle, is_from_formplayer):
def test_formplayer_request_without_permission(self, is_from_formplayer):
is_from_formplayer.return_value = True
self._verify_access(self.user_with_permission)
set_toggle.assert_not_called()
self.assertTrue(self._has_mobile_access(self.user_without_permission))

def test_no_permission_but_whitelisted(self, is_from_formplayer):
is_from_formplayer.return_value = False
with flag_enabled('OPEN_SUBMISSION_ENDPOINT'):
self.assertTrue(self._has_mobile_access(self.user_without_permission))

def test_regular_request_with_permission(self, set_toggle, is_from_formplayer):
def test_no_permission_no_toggle(self, is_from_formplayer):
is_from_formplayer.return_value = False
self._verify_access(self.user_with_permission)
set_toggle.assert_not_called()
self.assertFalse(self._has_mobile_access(self.user_without_permission))
28 changes: 14 additions & 14 deletions corehq/apps/receiverwrapper/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,30 @@
CACHE_EXPIRY_7_DAYS_IN_SECS = 7 * 24 * 60 * 60


# This mirrors the logic of require_mobile_access
def _verify_access(domain, user_id, request):
# This mirrors the logic of require_mobile_access, but with a whitelist exempted
def _has_mobile_access(domain, user_id, request):
"""Unless going through formplayer or the API, users need access_mobile_endpoints"""
if not is_from_formplayer(request):
cache_key = f"form_submission_permissions_audit_v2:{user_id}"
if cache.get(cache_key):
# User is already logged once in last 7 days for incorrect access, so no need to log again
return
if (is_from_formplayer(request)
or request.couch_user.has_permission(domain, 'access_mobile_endpoints')):
return True

if not request.couch_user.has_permission(domain, 'access_mobile_endpoints'):
if toggles.OPEN_SUBMISSION_ENDPOINT.enabled(domain):
# log incorrect access at most once every 7 days to ease transition off flag
cache_key = f"form_submission_permissions_audit_v2:{user_id}"
if not cache.get(cache_key):
cache.set(cache_key, True, CACHE_EXPIRY_7_DAYS_IN_SECS)
message = f"NoMobileEndpointsAccess: invalid request by {user_id} on {domain}"
notify_exception(request, message=message)
if not toggles.OPEN_SUBMISSION_ENDPOINT.enabled(domain):
# Once we're confident this has enabled the flag for all active
# usage, we can make domains without the toggle fail hard
toggles.OPEN_SUBMISSION_ENDPOINT.set(domain, enabled=True, namespace=toggles.NAMESPACE_DOMAIN)
return True

return False


@profile_dump('commcare_receiverwapper_process_form.prof', probability=PROFILE_PROBABILITY, limit=PROFILE_LIMIT)
def _process_form(request, domain, app_id, user_id, authenticated,
auth_cls=AuthContext, is_api=False):
if authenticated and not is_api:
_verify_access(domain, user_id, request)
if authenticated and not is_api and not _has_mobile_access(domain, user_id, request):
return HttpResponseForbidden()

if rate_limit_submission(domain):
return HttpTooManyRequests()
Expand Down
Loading