From 48c2de685e1c206c1693158c1f55c3dacaf46c6d Mon Sep 17 00:00:00 2001 From: Samuel Cabrero Date: Wed, 15 Jan 2025 10:36:49 +0100 Subject: [PATCH] INTG-TESTS: Add Tests for service by name and by port lookups Signed-off-by: Samuel Cabrero --- src/tests/intg/ldap_ent.py | 23 +++++ src/tests/intg/sssd_services.py | 163 ++++++++++++++++++++++++++++++++ src/tests/intg/test_resolver.py | 63 ++++++++++++ 3 files changed, 249 insertions(+) create mode 100644 src/tests/intg/sssd_services.py diff --git a/src/tests/intg/ldap_ent.py b/src/tests/intg/ldap_ent.py index 2a56f3d9cb5..e5d1af43ac8 100644 --- a/src/tests/intg/ldap_ent.py +++ b/src/tests/intg/ldap_ent.py @@ -171,6 +171,24 @@ def ip_net(base_dn, name, address, aliases=()): return ("cn=" + name + ",ou=Networks," + base_dn, attr_list) +def ip_service(base_dn, name, proto, port, aliases=()): + """ + Generate an RFC2307 ipService add-modlist for passing to ldap.add*. + """ + attr_list = [ + ('objectClass', [b'top', b'ipService']), + ('ipServicePort', [str(port).encode('utf-8')]), + ('ipServiceProtocol', [proto.encode('utf-8')]), + ] + if (len(aliases)) > 0: + alias_list = [alias.encode('utf-8') for alias in aliases] + alias_list.insert(0, name.encode('utf-8')) + attr_list.append(('cn', alias_list)) + else: + attr_list.append(('cn', [name.encode('utf-8')])) + return ("cn=" + name + ",ou=Services," + base_dn, attr_list) + + class List(list): """LDAP add-modlist list""" @@ -233,3 +251,8 @@ def add_ipnet(self, name, address, aliases=[], base_dn=None): """Add an RFC2307 ipNetwork add-modlist.""" self.append(ip_net(base_dn or self.base_dn, name, address, aliases)) + + def add_service(self, name, proto, port, aliases=[], base_dn=None): + """Add an RFC2307 ipService add-modlist.""" + self.append(ip_service(base_dn or self.base_dn, + name, proto, port, aliases)) diff --git a/src/tests/intg/sssd_services.py b/src/tests/intg/sssd_services.py new file mode 100644 index 00000000000..e114cc3766e --- /dev/null +++ b/src/tests/intg/sssd_services.py @@ -0,0 +1,163 @@ +# +# Module for simulation of utility "getent services -s sss" from coreutils +# +# Authors: +# Samuel Cabrero +# +# Copyright (C) 2025 SUSE LINUX GmbH, Nuernberg, Germany. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from ctypes import ( + c_int, + c_char_p, + c_ulong, + POINTER, + Structure, + create_string_buffer, +) +from sssd_nss import NssReturnCode, SssdNssError, nss_sss_ctypes_loader +import socket + +SERVICE_BUFLEN = 1024 + + +# struct servent from netdb.h +class Servent(Structure): + _fields_ = [ + ("s_name", c_char_p), + ("s_aliases", POINTER(c_char_p)), + ("s_port", c_int), + ("s_proto", c_char_p), + ] + + +def getservbyname_r(name, proto, result_p, buffer_p, buflen): + """ + ctypes wrapper for: + enum nss_status _nss_sss_getservbyname_r(const char *name, + const char *protocol, + struct servent *result, + char *buffer, size_t buflen, + int *errnop) + """ + func = nss_sss_ctypes_loader("_nss_sss_getservbyname_r") + func.restype = c_int + func.argtypes = [ + c_char_p, + c_char_p, + POINTER(Servent), + c_char_p, + c_ulong, + POINTER(c_int), + ] + + errno = POINTER(c_int)(c_int(0)) + + name = name.encode("utf-8") + proto = proto.encode("utf-8") + res = func(c_char_p(name), c_char_p(proto), result_p, buffer_p, buflen, errno) + + return (int(res), int(errno[0]), result_p) + + +def getservbyport_r(port, proto, result_p, buffer_p, buflen): + """ + ctypes wrapper for: + enum nss_status _nss_sss_getservbyport_r(int port, const char *protocol, + struct servent *result, + char *buffer, size_t buflen, + int *errnop) + """ + func = nss_sss_ctypes_loader("_nss_sss_getservbyport_r") + func.restype = c_int + func.argtypes = [ + c_int, + c_char_p, + POINTER(Servent), + c_char_p, + c_ulong, + POINTER(c_int), + ] + + errno = POINTER(c_int)(c_int(0)) + + port = socket.htons(port) + proto = proto.encode("utf-8") + res = func(port, c_char_p(proto), result_p, buffer_p, buflen, errno) + + return (int(res), int(errno[0]), result_p) + + +def set_servent_dict(res, result_p): + if res != NssReturnCode.SUCCESS: + return dict() + + servent_dict = dict() + servent_dict["name"] = result_p[0].s_name.decode("utf-8") + servent_dict["aliases"] = list() + servent_dict["port"] = result_p[0].s_port + servent_dict["proto"] = result_p[0].s_proto + + i = 0 + while result_p[0].s_aliases[i] is not None: + alias = result_p[0].s_aliases[i].decode("utf-8") + servent_dict["aliases"].append(alias) + i = i + 1 + + return servent_dict + + +def call_sssd_getservbyname(name, proto): + """ + A Python wrapper to retrieve a service by name and protocol. Returns: + (res, servent_dict) + if res is NssReturnCode.SUCCESS, then servent_dict contains the keys + corresponding to the C servent structure fields. Otherwise, the dictionary + is empty and errno indicates the error code + """ + result = Servent() + result_p = POINTER(Servent)(result) + buff = create_string_buffer(SERVICE_BUFLEN) + + (res, errno, result_p) = getservbyname_r( + name, proto, result_p, buff, SERVICE_BUFLEN + ) + if errno != 0: + raise SssdNssError(errno, "getservbyname_r") + + servent_dict = set_servent_dict(res, result_p) + return (res, servent_dict) + + +def call_sssd_getservbyport(port, proto): + """ + A Python wrapper to retrieve a service by port and protocol. Returns: + (res, servent_dict) + if res is NssReturnCode.SUCCESS, then servent_dict contains the keys + corresponding to the C servent structure fields. Otherwise, the dictionary + is empty and errno indicates the error code + """ + result = Servent() + result_p = POINTER(Servent)(result) + buff = create_string_buffer(SERVICE_BUFLEN) + + (res, errno, result_p) = getservbyport_r( + port, proto, result_p, buff, SERVICE_BUFLEN + ) + if errno != 0: + raise SssdNssError(errno, "getservbyport_r") + + servent_dict = set_servent_dict(res, result_p) + return (res, servent_dict) diff --git a/src/tests/intg/test_resolver.py b/src/tests/intg/test_resolver.py index 072d41e7bfd..e6e565b2feb 100644 --- a/src/tests/intg/test_resolver.py +++ b/src/tests/intg/test_resolver.py @@ -34,6 +34,7 @@ from sssd_nss import NssReturnCode, HostError from sssd_hosts import call_sssd_gethostbyname from sssd_nets import call_sssd_getnetbyname, call_sssd_getnetbyaddr +from sssd_services import call_sssd_getservbyname, call_sssd_getservbyport LDAP_BASE_DN = "dc=example,dc=com" @@ -240,6 +241,25 @@ def add_nets(request, ldap_conn): return None +@pytest.fixture +def add_services(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_service("svc1", "tcp", 11111, aliases=["svc1_alias1", "svc1_alias2"]) + ent_list.add_service( + "svc2_1", "tcp", 22222, aliases=["svc2_1_alias1", "svc2_1_alias2"] + ) + ent_list.add_service( + "svc2_2", "tcp", 22222, aliases=["svc2_2_alias1", "svc2_2_alias2"] + ) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + def test_hostbyname(add_hosts): (res, hres, _) = call_sssd_gethostbyname("invalid") @@ -325,3 +345,46 @@ def test_netbyaddr(add_nets): (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_UNSPEC) assert res == NssReturnCode.SUCCESS assert hres == 0 + + +def test_servbyname(add_services): + (res, _) = call_sssd_getservbyname("svcX", "tcp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyname("svcX", "udp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyname("svc1", "tcp") + assert res == NssReturnCode.SUCCESS + + (res, _) = call_sssd_getservbyname("svc1", "udp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyname("svc1_alias2", "tcp") + assert res == NssReturnCode.SUCCESS + + (res, _) = call_sssd_getservbyname("svc2_1", "tcp") + assert res == NssReturnCode.SUCCESS + + (res, _) = call_sssd_getservbyname("svc2_2", "tcp") + assert res == NssReturnCode.SUCCESS + + +def test_servbyport(add_services): + (res, _) = call_sssd_getservbyport(1234, "tcp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyport(1234, "udp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyport(11111, "tcp") + assert res == NssReturnCode.SUCCESS + + (res, _) = call_sssd_getservbyport(11111, "udp") + assert res == NssReturnCode.NOTFOUND + + (res, _) = call_sssd_getservbyport(22222, "tcp") + assert res == NssReturnCode.SUCCESS + + (res, _) = call_sssd_getservbyport(22222, "udp") + assert res == NssReturnCode.NOTFOUND