-
Notifications
You must be signed in to change notification settings - Fork 4
First commit of the IdP Metadata Attribute Store #8
base: master
Are you sure you want to change the base?
First commit of the IdP Metadata Attribute Store #8
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend these improvements.
|
||
# Process the default configuration first then any per-IdP overrides. | ||
idp_list = ['default'] | ||
idp_list.extend([ key for key in config.keys() if key != 'default' ]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does 'default' have to be first? The following seems to be what you need.
idp_list = config.keys()
id_default = idp_list.pop('default', {})
self.config_defaults.update(idp_default)
|
||
for idp in idp_list: | ||
if not isinstance(config[idp], dict): | ||
msg = "Configuration value for {} must be a dictionary" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This format string is missing a parameter. I suppose you want:
msg = "Configuration for {} must be a dictionary".format(idp)
if 'default' not in config: | ||
msg = "No default configuration is present" | ||
satosa_logging(logger, logging.ERROR, msg, None) | ||
raise IdpMetadataAttributeStoreError(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this should be an error. I would allow a missing default, which would make this microservice only act upon the specified entities and by default ignore all non-matching ones. This would behave like having "ignore": True
under "default"
, but without having to add "ignore": False
to the specified idp configs (you need this, as "ignore": True
would propagate from the default profile)
raise IdpMetadataAttributeStoreError(msg) | ||
|
||
# Initialize configuration using module defaults then update | ||
# with configuration defaults and then per-IdP overrides. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can precompute this, as config_defaults
and config['default']
will not change - that's what I've done above with idp_list
.
# with configuration defaults and then per-IdP overrides. | ||
idp_config = copy.deepcopy(IdpMetadataAttributeStore.config_defaults) | ||
if 'default' in self.config: | ||
idp_config.update(self.config['default']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
if 'default' in self.config:
should always by True
(if you take into account the No default configuration is present
error).
But, if you follow my comments, you can replace these lines with:
idp_config = copy.deepcopy(self.config_defaults)
idp_config.update(config[idp])
config_defaults
should be prepared with config['default']
and you can use that without checks.
if 'text' in e: | ||
return e['text'] | ||
|
||
return '' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, now I see what're trying to do here, according to the comment. I guess that's why the previous loop was nested.
first_text_lang = None
first_text = None
fallback = ''
for el in elements:
if 'text' in el.keys():
if first_text is None:
first_text = el['text']
if first_text_lang is None and lang in el.keys(): # should this be: lang is el.get('lang', None) ?
first_text_lang = el['text']
break
return first_text_lang or first_text or fallback
PS: Note my comment there. lang
represents the value, not the key-name. I guess we should be checking for that: lang is el.get('lang', None)
satosa_logging(logger, logging.ERROR, "Unable to determine the entityID for the IdP issuer", context.state) | ||
return super().process(context, data) | ||
|
||
# Get the configuration for the IdP. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the comments above (allowing not having a default configuration), I would do the following:
config = self.config.get(idp_entity_id) or self.config.get('default', {})
satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state)
|
||
satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state) | ||
|
||
# Ignore this IdP if so configured. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
..continuing from above:
if not config or config.get('ignore'):
satosa_logging(logger, logging.INFO, "Ignoring IdP {}".format(idp_entity_id), context.state)
return super().process(context, data)
metadata = metadata_store[idp_entity_id] | ||
except Exception as err: | ||
satosa_logging(logger, logging.ERROR, "Unable to retrieve metadata for IdP {}".format(idp_entity_id), context.state) | ||
return super().process(context, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a recurring pattern:
try:
something()
except:
return super().process(context, data)
This can be handled better by changing process()
to call _process()
as such:
class IdpMetadataAttributeStoreWarning(SATOSAError):
pass
def process(..):
try:
self._process(..)
except IdpMetadataAttributeStoreWarning as ex:
satosa_logging(logger, logging.WARN, str(ex), context.state)
return super().process(context, data)
def _process(..):
# all the code previously on process() but with modified parts as such:
try:
something()
except SomeError as err:
msg_warn = "something went wrong, but it's ok"
raise IdpMetadataAttributeStoreWarning(msg_warn) from err
Errors of type IdpMetadataAttributeStoreWarning
will be logged and let the process continue, but other errors will be raised and stop the execution. This also makes errors that we handle explicit.
|
||
satosa_logging(logger, logging.DEBUG, "Metadata for IdP {} is {}".format(idp_entity_id, metadata), context.state) | ||
|
||
# Find the mdui:DisplayName for the IdP if so configured. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would split the processing of the attributes into different classes. This may be a bit of over-engineering but it allows to extend the mechanism by only creating a new class with just the appropriate find handler and mapping it to the factory. It also makes code reusable.
from abc import ABC
# abstract class acts like an interface
class GenericAttribute(ABC): # not sure about the name
@abstractmethod
def find(self, metadata, lang, *argv):
raise NotImplementedError
# organization_display_name and organization_name use the same mechanism for find()
class GenericName(GenericAttribute, ABC):
def find(self, metadata, lang, *argv):
org_display_name_elements = metadata['organization'][self.attr_name]
value = self._first_lang_element_text(org_display_name_elements, lang)
return value
class OrganizationDisplayName(GenericName):
attr_name = 'organization_display_name'
class OrganizationName(GenericName):
attr_name = 'organization_name'
class DisplayName(GenericAttribute):
attr_name = 'display_name'
def find(self, metadata, lang, *argv):
extensions = metadata['idpsso_descriptor'][0]['extensions']['extension_elements']
for e in extensions:
if e['__class__'] == 'urn:oasis:names:tc:SAML:metadata:ui&UIInfo':
display_name_elements = e[self.attr_name]
value = self._first_lang_element_text(display_name_elements, lang)
break
return value
class AttributeFactory:
factory = {
'organization_display_name': OrganizationDisplayName,
'organization_name': OrganizationName,
'display_name': DisplayName,
}
def create(attr_type):
try:
attr_class = factory[attr_type]
except KeyError as err:
msg_err = "No handler for attribute: {}".format(attr_type)
raise IdpMetadataAttributeStoreError(msg_err) from err
else:
return attr_class()
def process(..):
...
for attr in config.keys():
attribute_handler = AttributeFactory.create(attr)
value = attribute_handler.find(metadata, lang)
if value is not None:
data.attributes[config[attr]['internal_attribute_name']] = value
msg_dbg = "attribute {} is {}".format(attr, value)
satosa_logging(logger, logging.DEBUG, msg_dbg, context.state)
Added capability to assert the entityID of the authenticating IdP as an attribute.
8e8f52d
to
bb8689b
Compare
No description provided.