Skip to content

Commit

Permalink
Merge branch 'master' into riese/ga_form_submit
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinRiese committed Jan 23, 2025
2 parents 8870796 + f32a223 commit 4b1408d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
9 changes: 8 additions & 1 deletion corehq/apps/receiverwrapper/tests/test_audit_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from corehq.apps.receiverwrapper.views import post_api, secure_post
from corehq.apps.users.models import CommCareUser, HqPermissions, WebUser
from corehq.apps.users.models_role import UserRole
from corehq.util.test_utils import flag_enabled


def return_200(*args, **kwargs):
Expand Down Expand Up @@ -76,7 +77,13 @@ def test_commcare_user_regular_submission(self):
self.assert_api_response(200, url)
mock_notify_exception.assert_not_called()

def test_api_user_regular_submission_gets_logged(self):
def test_api_user_regular_submission_with_no_FF(self):
url = reverse(secure_post, args=[self.domain])
self._create_user(access_api=True, access_mobile_endpoints=False)
self.assert_api_response(403, url)

@flag_enabled('OPEN_SUBMISSION_ENDPOINT')
def test_api_user_regular_submission_with_FF(self):
url = reverse(secure_post, args=[self.domain])
user = self._create_user(access_api=True, access_mobile_endpoints=False)
with patch('corehq.apps.receiverwrapper.views.notify_exception') as mock_notify_exception:
Expand Down
34 changes: 15 additions & 19 deletions corehq/apps/receiverwrapper/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@

from couchforms import openrosa_response

from corehq import toggles
from corehq.apps.app_manager.models import Application
from corehq.apps.domain.shortcuts import create_domain
from corehq.apps.receiverwrapper.util import DEMO_SUBMIT_MODE
from corehq.apps.receiverwrapper.views import (
_verify_access,
_has_mobile_access,
post_api,
secure_post,
)
Expand All @@ -31,6 +30,7 @@
from corehq.apps.users.util import normalize_username
from corehq.form_processor.models import XFormInstance
from corehq.form_processor.tests.utils import sharded
from corehq.util.test_utils import flag_enabled


class FakeFile(object):
Expand Down Expand Up @@ -430,7 +430,6 @@ def test_access_api_required(self):


@mock.patch('corehq.apps.receiverwrapper.views.is_from_formplayer')
@mock.patch('corehq.toggles.set_toggle')
@mock.patch('corehq.apps.receiverwrapper.views.cache.get', mock.MagicMock(return_value=None))
@mock.patch('corehq.apps.receiverwrapper.views.cache.set', mock.MagicMock)
class TestHasMobileAccess(TestCase):
Expand All @@ -454,25 +453,22 @@ def _create_user(cls, access_mobile_endpoints=False):
return WebUser.create(cls.domain, str(uuid.uuid4()), 'p@$$w0rd',
None, None, role_id=role.get_id)

def _verify_access(self, user):
_verify_access(self.domain, user.user_id, mock.MagicMock(couch_user=user))
def _has_mobile_access(self, user):
return _has_mobile_access(self.domain, user.user_id, mock.MagicMock(couch_user=user))

def test_formplayer_request_without_permission(self, set_toggle, is_from_formplayer):
is_from_formplayer.return_value = True
self._verify_access(self.user_without_permission)
set_toggle.assert_not_called()

def test_regular_request_without_permission(self, set_toggle, is_from_formplayer):
def test_mobile_request_with_permission(self, is_from_formplayer):
is_from_formplayer.return_value = False
self._verify_access(self.user_without_permission)
set_toggle.assert_called_with('open_submission_endpoint', self.domain, True, toggles.NAMESPACE_DOMAIN)
self.assertTrue(self._has_mobile_access(self.user_with_permission))

def test_formplayer_request_with_permission(self, set_toggle, is_from_formplayer):
def test_formplayer_request_without_permission(self, is_from_formplayer):
is_from_formplayer.return_value = True
self._verify_access(self.user_with_permission)
set_toggle.assert_not_called()
self.assertTrue(self._has_mobile_access(self.user_without_permission))

def test_no_permission_but_whitelisted(self, is_from_formplayer):
is_from_formplayer.return_value = False
with flag_enabled('OPEN_SUBMISSION_ENDPOINT'):
self.assertTrue(self._has_mobile_access(self.user_without_permission))

def test_regular_request_with_permission(self, set_toggle, is_from_formplayer):
def test_no_permission_no_toggle(self, is_from_formplayer):
is_from_formplayer.return_value = False
self._verify_access(self.user_with_permission)
set_toggle.assert_not_called()
self.assertFalse(self._has_mobile_access(self.user_without_permission))
28 changes: 14 additions & 14 deletions corehq/apps/receiverwrapper/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,30 @@
CACHE_EXPIRY_7_DAYS_IN_SECS = 7 * 24 * 60 * 60


# This mirrors the logic of require_mobile_access
def _verify_access(domain, user_id, request):
# This mirrors the logic of require_mobile_access, but with a whitelist exempted
def _has_mobile_access(domain, user_id, request):
"""Unless going through formplayer or the API, users need access_mobile_endpoints"""
if not is_from_formplayer(request):
cache_key = f"form_submission_permissions_audit_v2:{user_id}"
if cache.get(cache_key):
# User is already logged once in last 7 days for incorrect access, so no need to log again
return
if (is_from_formplayer(request)
or request.couch_user.has_permission(domain, 'access_mobile_endpoints')):
return True

if not request.couch_user.has_permission(domain, 'access_mobile_endpoints'):
if toggles.OPEN_SUBMISSION_ENDPOINT.enabled(domain):
# log incorrect access at most once every 7 days to ease transition off flag
cache_key = f"form_submission_permissions_audit_v2:{user_id}"
if not cache.get(cache_key):
cache.set(cache_key, True, CACHE_EXPIRY_7_DAYS_IN_SECS)
message = f"NoMobileEndpointsAccess: invalid request by {user_id} on {domain}"
notify_exception(request, message=message)
if not toggles.OPEN_SUBMISSION_ENDPOINT.enabled(domain):
# Once we're confident this has enabled the flag for all active
# usage, we can make domains without the toggle fail hard
toggles.OPEN_SUBMISSION_ENDPOINT.set(domain, enabled=True, namespace=toggles.NAMESPACE_DOMAIN)
return True

return False


@profile_dump('commcare_receiverwapper_process_form.prof', probability=PROFILE_PROBABILITY, limit=PROFILE_LIMIT)
def _process_form(request, domain, app_id, user_id, authenticated,
auth_cls=AuthContext, is_api=False):
if authenticated and not is_api:
_verify_access(domain, user_id, request)
if authenticated and not is_api and not _has_mobile_access(domain, user_id, request):
return HttpResponseForbidden()

if rate_limit_submission(domain):
return HttpTooManyRequests()
Expand Down

0 comments on commit 4b1408d

Please sign in to comment.