Skip to content

Commit

Permalink
Fix specifying allowed UserIdentityTokens
Browse files Browse the repository at this point in the history
  • Loading branch information
cziebuhr authored and Christoph Ziebuhr committed Oct 18, 2024
1 parent e7f1397 commit 33faf14
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion asyncua/server/internal_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, user_manager: UserManager = None):
self._time_task_stop = False
self.match_discovery_endpoint_url: bool = True
self.match_discovery_source_ip: bool = True
self.supported_tokens = []
self.supported_tokens = (ua.AnonymousIdentityToken, ua.X509IdentityToken, ua.UserNameIdentityToken)

async def init(self, shelffile: Optional[Path] = None):
await self.load_standard_address_space(shelffile)
Expand Down
41 changes: 23 additions & 18 deletions asyncua/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def __init__(self, iserver: InternalServer = None, user_manager=None):
]
# allow all certificates by default
self._permission_ruleset = SimpleRoleRuleset()
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username", "Aes128Sha256RsaOaep", "Aes256Sha256RsaPss"]
self.certificate: Optional[x509.Certificate] = None
# Use acceptable limits
buffer_sz = 65535
Expand Down Expand Up @@ -349,19 +348,29 @@ def set_security_policy(self, security_policy, permission_ruleset=None):

def set_security_IDs(self, policy_ids):
"""
Method setting up the security endpoints for identification
of clients. During server object initialization, all possible
endpoints are enabled:
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
DEPRECATED!
Only available for backwards compatibility.
Use set_identity_tokens instead.
"""
_logger.warning("set_security_IDs is deprecated, use set_identity_tokens instead!")
tokens = []
if "Anonymous" in policy_ids:
tokens.append(ua.AnonymousIdentityToken)
if "Basic256Sha256" in policy_ids:
tokens.append(ua.X509IdentityToken)
if "Username" in policy_ids:
tokens.append(ua.UserNameIdentityToken)
self.set_identity_tokens(tokens)

E.g. to limit the number of IDs and disable anonymous clients:
def set_identity_tokens(self, tokens):
"""
Method setting up allowed identity token types for authentication.
set_security_IDs(["Basic256Sha256"])
E.g. to disable anonymous clients:
(Implementation for ID check is currently not finalized...)
set_identity_tokens([ua.X509IdentityToken, ua.UserNameIdentityToken])
"""
self._policyIDs = policy_ids
self.iserver.supported_tokens = tuple(tokens)

async def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
Expand Down Expand Up @@ -492,30 +501,27 @@ def determine_security_level(security_policy_uri: str, security_mode: ua.Message

def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
supported_token_classes = []
if "Anonymous" in self._policyIDs:
tokens = self.iserver.supported_tokens
if ua.AnonymousIdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "anonymous"
idtoken.TokenType = ua.UserTokenType.Anonymous
idtoken.SecurityPolicyUri = policy.URI
idtokens.append(idtoken)
supported_token_classes.append(ua.AnonymousIdentityToken)

if "Basic256Sha256" in self._policyIDs:
if ua.X509IdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256sha256'
idtoken.TokenType = ua.UserTokenType.Certificate
idtoken.SecurityPolicyUri = policy.URI
idtokens.append(idtoken)
supported_token_classes.append(ua.X509IdentityToken)

if "Username" in self._policyIDs:
if ua.UserNameIdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "username"
idtoken.TokenType = ua.UserTokenType.UserName
idtoken.SecurityPolicyUri = policy.URI
idtokens.append(idtoken)
supported_token_classes.append(ua.UserNameIdentityToken)

appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
Expand All @@ -535,7 +541,6 @@ def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.N
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = Server.determine_security_level(policy.URI, mode)
self.iserver.add_endpoint(edp)
self.iserver.supported_tokens = tuple(supported_token_classes)

def set_server_name(self, name):
self.name = name
Expand Down
3 changes: 3 additions & 0 deletions asyncua/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ def set_security_policy(self, security_policy, permission_ruleset=None):
def set_security_IDs(self, policy_ids):
return self.aio_obj.set_security_IDs(policy_ids)

def set_identity_tokens(self, tokens):
return self.aio_obj.set_identity_tokens(tokens)

def disable_clock(self, val: bool = True):
return self.aio_obj.disable_clock(val)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_crypto_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ async def test_anonymous_rejection():
await srv.init()
srv.set_endpoint(uri_crypto_cert)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
srv.set_security_IDs(["Username", "Basic256Sha256"])
srv.set_identity_tokens([ua.UserNameIdentityToken, ua.X509IdentityToken])
await srv.load_certificate(cert)
await srv.load_private_key(key)
await srv.start()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_password.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def get_user(self, iserver, username=None, password=None, certificate=None):
async def srv_user():
srv = Server(user_manager=UserManager())
srv.set_endpoint(uri)
srv.set_security_IDs(["Username"])
srv.set_identity_tokens([ua.UserNameIdentityToken])

await srv.init()
await srv.start()
Expand Down

0 comments on commit 33faf14

Please sign in to comment.