diff --git a/example/plugins/microservices/filter_attributes.yaml.example b/example/plugins/microservices/filter_attributes.yaml.example index f368493b5..185f2dec0 100644 --- a/example/plugins/microservices/filter_attributes.yaml.example +++ b/example/plugins/microservices/filter_attributes.yaml.example @@ -2,6 +2,35 @@ module: satosa.micro_services.attribute_modifications.FilterAttributeValues name: AttributeFilter config: attribute_filters: + # default rules for any IdentityProvider + "": + # default rules for any requester + "": + # enforce controlled vocabulary (via simple notation) + eduPersonAffiliation: "^(faculty|student|staff|alum|member|affiliate|employee|library-walk-in)$" + eduPersonPrimaryAffiliation: "^(faculty|student|staff|alum|member|affiliate|employee|library-walk-in)$" + eduPersonScopedAffiliation: + # enforce controlled vocabulary (via extended notation) + regexp: "^(faculty|student|staff|alum|member|affiliate|employee|library-walk-in)@" + # enforce correct scope + shibmdscope_match_scope: + eduPersonPrincipalName: + # enforce correct scope + shibmdscope_match_scope: + subject-id: + # enforce attribute syntax + regexp: "^[0-9A-Za-z][-=0-9A-Za-z]{0,126}@[0-9A-Za-z][-.0-9A-Za-z]{0,126}\\Z" + # enforce correct scope + shibmdscope_match_scope: + pairwise-id: + # enforce attribute syntax + regexp: "^[0-9A-Za-z][-=0-9A-Za-z]{0,126}@[0-9A-Za-z][-.0-9A-Za-z]{0,126}\\Z" + # enforce correct scope + shibmdscope_match_scope: + schacHomeOrganization: + # enforce scoping rule on attribute value + shibmdscope_match_value: + target_provider1: requester1: attr1: "^foo:bar$" diff --git a/src/satosa/micro_services/attribute_modifications.py b/src/satosa/micro_services/attribute_modifications.py index 67633af27..bb00761b4 100644 --- a/src/satosa/micro_services/attribute_modifications.py +++ b/src/satosa/micro_services/attribute_modifications.py @@ -1,7 +1,11 @@ import re +import logging from .base import ResponseMicroService +from ..context import Context +from ..exception import SATOSAError +logger = logging.getLogger(__name__) class AddStaticAttributes(ResponseMicroService): """ @@ -29,28 +33,62 @@ def __init__(self, config, *args, **kwargs): def process(self, context, data): # apply default filters provider_filters = self.attribute_filters.get("", {}) - self._apply_requester_filters(data.attributes, provider_filters, data.requester) + target_provider = data.auth_info.issuer + self._apply_requester_filters(data.attributes, provider_filters, data.requester, context, target_provider) # apply target provider specific filters - target_provider = data.auth_info.issuer provider_filters = self.attribute_filters.get(target_provider, {}) - self._apply_requester_filters(data.attributes, provider_filters, data.requester) + self._apply_requester_filters(data.attributes, provider_filters, data.requester, context, target_provider) return super().process(context, data) - def _apply_requester_filters(self, attributes, provider_filters, requester): + def _apply_requester_filters(self, attributes, provider_filters, requester, context, target_provider): # apply default requester filters default_requester_filters = provider_filters.get("", {}) - self._apply_filter(attributes, default_requester_filters) + self._apply_filters(attributes, default_requester_filters, context, target_provider) # apply requester specific filters requester_filters = provider_filters.get(requester, {}) - self._apply_filter(attributes, requester_filters) - - def _apply_filter(self, attributes, attribute_filters): - for attribute_name, attribute_filter in attribute_filters.items(): - regex = re.compile(attribute_filter) - if attribute_name == "": # default filter for all attributes - for attribute, values in attributes.items(): - attributes[attribute] = list(filter(regex.search, attributes[attribute])) - elif attribute_name in attributes: - attributes[attribute_name] = list(filter(regex.search, attributes[attribute_name])) + self._apply_filters(attributes, requester_filters, context, target_provider) + + def _apply_filters(self, attributes, attribute_filters, context, target_provider): + for attribute_name, attribute_filters in attribute_filters.items(): + if type(attribute_filters) == str: + # convert simple notation to filter list + attribute_filters = {'regexp': attribute_filters} + + for filter_type, filter_value in attribute_filters.items(): + + if filter_type == "regexp": + filter_func = re.compile(filter_value).search + elif filter_type == "shibmdscope_match_scope": + mdstore = context.get_decoration(Context.KEY_METADATA_STORE) + md_scopes = list(mdstore.shibmd_scopes(target_provider,"idpsso_descriptor")) if mdstore else [] + filter_func = lambda v: self._shibmdscope_match_scope(v, md_scopes) + elif filter_type == "shibmdscope_match_value": + mdstore = context.get_decoration(Context.KEY_METADATA_STORE) + md_scopes = list(mdstore.shibmd_scopes(target_provider,"idpsso_descriptor")) if mdstore else [] + filter_func = lambda v: self._shibmdscope_match_value(v, md_scopes) + else: + raise SATOSAError("Unknown filter type") + + if attribute_name == "": # default filter for all attributes + for attribute, values in attributes.items(): + attributes[attribute] = list(filter(filter_func, attributes[attribute])) + elif attribute_name in attributes: + attributes[attribute_name] = list(filter(filter_func, attributes[attribute_name])) + + def _shibmdscope_match_value(self, value, md_scopes): + for md_scope in md_scopes: + if not md_scope['regexp'] and md_scope['text'] == value: + return True + elif md_scope['regexp'] and re.fullmatch(md_scope['text'], value): + return True + return False + + def _shibmdscope_match_scope(self, value, md_scopes): + split_value = value.split('@') + if len(split_value) != 2: + logger.info(f"Discarding invalid scoped value {value}") + return False + value_scope = split_value[1] + return self._shibmdscope_match_value(value_scope, md_scopes) diff --git a/tests/satosa/micro_services/test_attribute_modifications.py b/tests/satosa/micro_services/test_attribute_modifications.py index 0efaec43e..aa1fcb8d5 100644 --- a/tests/satosa/micro_services/test_attribute_modifications.py +++ b/tests/satosa/micro_services/test_attribute_modifications.py @@ -1,3 +1,9 @@ +import pytest +from tests.util import FakeIdP, create_metadata_from_config_dict, FakeSP +from saml2.mdstore import MetadataStore +from saml2.config import Config +from satosa.context import Context +from satosa.exception import SATOSAError from satosa.internal import AuthenticationInformation from satosa.internal import InternalData from satosa.micro_services.attribute_modifications import FilterAttributeValues @@ -10,6 +16,22 @@ def create_filter_service(self, attribute_filters): filter_service.next = lambda ctx, data: data return filter_service + def create_idp_metadata_conf_with_shibmd_scopes(self, idp_entityid, shibmd_scopes): + idp_conf = { + "entityid": idp_entityid, + "service": { + "idp":{} + } + } + + if shibmd_scopes is not None: + idp_conf["service"]["idp"]["scope"] = shibmd_scopes + + metadata_conf = { + "inline": [create_metadata_from_config_dict(idp_conf)] + } + return metadata_conf + def test_filter_all_attributes_from_all_target_providers_for_all_requesters(self): attribute_filters = { "": { # all providers @@ -116,3 +138,264 @@ def test_filter_one_attribute_for_one_target_provider_for_one_requester(self): } filtered = filter_service.process(None, resp) assert filtered.attributes == {"a1": ["1:foo:bar:2"]} + + def test_filter_one_attribute_from_all_target_providers_for_all_requesters_in_extended_notation(self): + attribute_filters = { + "": { + "": { + "a2": { + "regexp": "^foo:bar$" + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + filtered = filter_service.process(None, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo:bar"]} + + def test_invalid_filter_type(self): + attribute_filters = { + "": { + "": { + "a2": { + "invalid_filter": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + with pytest.raises(SATOSAError): + filtered = filter_service.process(None, resp) + + def test_shibmdscope_match_value_filter_with_no_md_store_in_context(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + ctx = Context() + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + + def test_shibmdscope_match_value_filter_with_empty_md_store_in_context(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo:bar", "1:foo:bar:2"], + } + ctx = Context() + mdstore = MetadataStore(None, None) + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + + def test_shibmdscope_match_value_filter_with_idp_md_with_no_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, None)) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": []} + + def test_shibmdscope_match_value_filter_with_idp_md_with_single_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar"]} + + def test_shibmdscope_match_value_filter_with_idp_md_with_single_regexp_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["test.foo.bar", "1.foo.bar.2"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["[^.]*\.foo\.bar$"])) + # mark scope as regexp (cannot be done via pysaml2 YAML config) + mdstore[idp_entityid]['idpsso_descriptor'][0]['extensions']['extension_elements'][0]['regexp'] = 'true' + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["test.foo.bar"]} + + def test_shibmdscope_match_value_filter_with_idp_md_with_multiple_scopes(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_value": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "1.foo.bar.2", "foo.baz", "foo.baz.com"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar", "foo.baz"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["foo.bar", "foo.baz"]} + + def test_shibmdscope_match_scope_filter_with_single_scope(self): + attribute_filters = { + "": { + "": { + "a2": { + "shibmdscope_match_scope": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "value@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value@foo.bar"]} + + def test_multiple_filters_for_single_attribute(self): + attribute_filters = { + "": { + "": { + "a2": { + "regexp": "^value1@", + "shibmdscope_match_scope": None + } + } + } + } + filter_service = self.create_filter_service(attribute_filters) + + resp = InternalData(AuthenticationInformation()) + resp.attributes = { + "a1": ["abc:xyz"], + "a2": ["foo.bar", "value1@foo.bar", "value2@foo.bar", "1.foo.bar.2", "value@foo.bar.2", "value@extra@foo.bar"], + } + + idp_entityid = 'https://idp.example.org/' + resp.auth_info.issuer = idp_entityid + + mdstore = MetadataStore(None, Config()) + mdstore.imp(self.create_idp_metadata_conf_with_shibmd_scopes(idp_entityid, ["foo.bar"])) + ctx = Context() + ctx.decorate(Context.KEY_METADATA_STORE, mdstore) + + filtered = filter_service.process(ctx, resp) + assert filtered.attributes == {"a1": ["abc:xyz"], "a2": ["value1@foo.bar"]}