Skip to content
This repository has been archived by the owner on Jun 23, 2023. It is now read-only.

Commit

Permalink
Use configuration classes from oidcmsg.
Browse files Browse the repository at this point in the history
Need to have oidcmsg 1.5.4 .
utc_time_sans_frac instead of time.time() since the later is unspecified when it comes to timezone.
  • Loading branch information
rohe committed Dec 14, 2021
1 parent 29d6f35 commit 2f2318d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 261 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def run_tests(self):
"Programming Language :: Python :: 3.9",
"Topic :: Software Development :: Libraries :: Python Modules"],
install_requires=[
"oidcmsg==1.5.3",
"oidcmsg==1.5.4",
"pyyaml",
"jinja2>=2.11.3",
"responses>=0.13.0"
Expand Down
261 changes: 31 additions & 230 deletions src/oidcop/configure.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,17 @@
"""Configuration management for IDP"""
"""Configuration management for OP"""
import copy
import importlib
import json
import logging
import os
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from oidcop.logging import configure_logging
from oidcmsg.configure import Base

from oidcop.scopes import SCOPE2CLAIMS
from oidcop.utils import load_yaml_config

logger = logging.getLogger(__name__)


DEFAULT_FILE_ATTRIBUTE_NAMES = [
"server_key",
"server_cert",
"filename",
"template_dir",
"private_path",
"public_path",
"db_file",
"jwks_file",
]

OP_DEFAULT_CONFIG = {
"capabilities": {
"subject_types_supported": ["public", "pairwise"],
Expand Down Expand Up @@ -105,109 +90,6 @@
}


def add_base_path(conf: Union[dict, str], base_path: str, file_attributes: List[str]):
if isinstance(conf, str):
if conf.startswith("/"):
pass
elif conf == "":
conf = "./" + conf
else:
conf = os.path.join(base_path, conf)
elif isinstance(conf, dict):
for key, val in conf.items():
if key in file_attributes:
if val.startswith("/"):
continue
elif val == "":
conf[key] = "./" + val
else:
conf[key] = os.path.join(base_path, val)
if isinstance(val, dict):
conf[key] = add_base_path(val, base_path, file_attributes)

return conf


def set_domain_and_port(conf: dict, uris: List[str], domain: str, port: int):
for key, val in conf.items():
if key in uris:
if isinstance(val, list):
_new = [v.format(domain=domain, port=port) for v in val]
else:
_new = val.format(domain=domain, port=port)
conf[key] = _new
elif isinstance(val, dict):
conf[key] = set_domain_and_port(val, uris, domain, port)
return conf


def create_from_config_file(
cls,
filename: str,
base_path: str = "",
entity_conf: Optional[List[dict]] = None,
file_attributes: Optional[List[str]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
):
if filename.endswith(".yaml"):
"""Load configuration as YAML"""
_conf = load_yaml_config(filename)
elif filename.endswith(".json"):
_str = open(filename).read()
_conf = json.loads(_str)
elif filename.endswith(".py"):
head, tail = os.path.split(filename)
tail = tail[:-3]
module = importlib.import_module(tail)
_conf = getattr(module, "OIDCOP_CONFIG")
else:
raise ValueError("Unknown file type")

return cls(
_conf,
entity_conf=entity_conf,
base_path=base_path,
file_attributes=file_attributes,
domain=domain,
port=port,
)


class Base(dict):
""" Configuration base class """

parameter = {}

def __init__(
self,
conf: Dict,
base_path: str = "",
file_attributes: Optional[List[str]] = None,
):
dict.__init__(self)

if file_attributes is None:
file_attributes = DEFAULT_FILE_ATTRIBUTE_NAMES

if base_path and file_attributes:
# this adds a base path to all paths in the configuration
add_base_path(conf, base_path, file_attributes)

def __getattr__(self, item):
return self[item]

def __setattr__(self, key, value):
if key in self:
raise KeyError("{} has already been set".format(key))
super(Base, self).__setitem__(key, value)

def __setitem__(self, key, value):
if key in self:
raise KeyError("{} has already been set".format(key))
super(Base, self).__setitem__(key, value)


class EntityConfiguration(Base):
default_config = AS_DEFAULT_CONFIG
uris = ["issuer", "base_url"]
Expand All @@ -231,26 +113,18 @@ class EntityConfiguration(Base):
}

def __init__(
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
dir_attributes: Optional[List[str]] = None,
):

conf = copy.deepcopy(conf)
Base.__init__(self, conf, base_path, file_attributes)

if file_attributes is None:
file_attributes = DEFAULT_FILE_ATTRIBUTE_NAMES

if not domain:
domain = conf.get("domain", "127.0.0.1")

if not port:
port = conf.get("port", 80)
Base.__init__(self, conf, base_path, file_attributes, dir_attributes=dir_attributes)

for key in self.parameter.keys():
_val = conf.get(key)
Expand All @@ -263,6 +137,7 @@ def __init__(
file_attributes=file_attributes,
domain=domain,
port=port,
dir_attributes=dir_attributes
)
else:
continue
Expand All @@ -277,29 +152,6 @@ def __init__(

setattr(self, key, _val)

# try:
# _dir = self.template_dir
# except AttributeError:
# self.template_dir = os.path.abspath("templates")
# else:
# self.template_dir =

def format(self, conf, base_path, file_attributes, domain, port):
"""
Formats parts of the configuration. That includes replacing the strings {domain} and {port}
with the used domain and port and making references to files and directories absolute
rather then relative. The formatting is done in place.
:param conf: The configuration part
:param base_path: The base path used to make file/directory refrences absolute
:param file_attributes: Attribute names that refer to files or directories.
:param domain: The domain name
:param port: The port used
"""
add_base_path(conf, base_path, file_attributes)
if isinstance(conf, dict):
set_domain_and_port(conf, self.uris, domain=domain, port=port)


class OPConfiguration(EntityConfiguration):
"Provider configuration"
Expand All @@ -316,13 +168,14 @@ class OPConfiguration(EntityConfiguration):
)

def __init__(
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
dir_attributes: Optional[List[str]] = None,
):
super().__init__(
conf=conf,
Expand All @@ -331,21 +184,22 @@ def __init__(
domain=domain,
port=port,
file_attributes=file_attributes,
dir_attributes=dir_attributes
)
self.scopes_to_claims


class ASConfiguration(EntityConfiguration):
"Authorization server configuration"

def __init__(
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
self,
conf: Dict,
base_path: Optional[str] = "",
entity_conf: Optional[List[dict]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
file_attributes: Optional[List[str]] = None,
dir_attributes: Optional[List[str]] = None,
):
EntityConfiguration.__init__(
self,
Expand All @@ -355,63 +209,10 @@ def __init__(
domain=domain,
port=port,
file_attributes=file_attributes,
dir_attributes=dir_attributes
)


class Configuration(Base):
"""Server Configuration"""

uris = ["issuer", "base_url"]

def __init__(
self,
conf: Dict,
entity_conf: Optional[List[dict]] = None,
base_path: str = "",
file_attributes: Optional[List[str]] = None,
domain: Optional[str] = "",
port: Optional[int] = 0,
):
Base.__init__(self, conf, base_path, file_attributes)

log_conf = conf.get("logging")
if log_conf:
self.logger = configure_logging(config=log_conf).getChild(__name__)
else:
self.logger = logging.getLogger("oidcop")

self.webserver = conf.get("webserver", {})

if not domain:
domain = conf.get("domain", "127.0.0.1")

if not port:
port = conf.get("port", 80)

set_domain_and_port(conf, self.uris, domain=domain, port=port)

if entity_conf:
for econf in entity_conf:
_path = econf.get("path")
_cnf = conf
if _path:
for step in _path:
_cnf = _cnf[step]
_attr = econf["attr"]
_cls = econf["class"]
setattr(
self,
_attr,
_cls(
_cnf,
base_path=base_path,
file_attributes=file_attributes,
domain=domain,
port=port,
),
)


DEFAULT_EXTENDED_CONF = {
"add_on": {
"pkce": {
Expand Down
Loading

0 comments on commit 2f2318d

Please sign in to comment.