From 59f8968a3e23cb465a5393090d0cdcaf03f702d5 Mon Sep 17 00:00:00 2001 From: Steve McGrath Date: Fri, 6 Dec 2024 16:21:23 -0600 Subject: [PATCH] Added Security Center Hosts API Support #614 --- .gitignore | 1 + docs/api/sc/hosts.rst | 1 + pyproject.toml | 5 +- tenable/sc/__init__.py | 274 ++++++++++++++++++++------------------ tenable/sc/base.py | 73 +++++----- tenable/sc/hosts.py | 180 +++++++++++++++++++------ tests/sc/conftest.py | 98 +++++++------- tests/sc/test___init__.py | 48 ++++--- tests/sc/test_hosts.py | 116 ++++++++++++++++ 9 files changed, 526 insertions(+), 270 deletions(-) create mode 100644 docs/api/sc/hosts.rst create mode 100644 tests/sc/test_hosts.py diff --git a/.gitignore b/.gitignore index 10ac174ed..ce58f83e5 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ Jenkinsfile /ex_*.py .idea /dist +__MACOSX diff --git a/docs/api/sc/hosts.rst b/docs/api/sc/hosts.rst new file mode 100644 index 000000000..eef20f4d0 --- /dev/null +++ b/docs/api/sc/hosts.rst @@ -0,0 +1 @@ +.. automodule:: tenable.sc.hosts diff --git a/pyproject.toml b/pyproject.toml index b92275ce2..95b1a6abe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -135,6 +135,5 @@ dev-dependencies = [ [tool.pytest.ini_options] addopts = "--cov-report term-missing --cov=tenable" -filterWarnings = [ - "ignore::DeprecationWarning" -] +testpaths = ['./tests'] +filterwarnings = ["ignore:::DeprecationWarning"] diff --git a/tenable/sc/__init__.py b/tenable/sc/__init__.py index a811deab9..a02e25c78 100644 --- a/tenable/sc/__init__.py +++ b/tenable/sc/__init__.py @@ -1,4 +1,4 @@ -''' +""" Tenable Security Center ======================= @@ -45,13 +45,17 @@ system users -''' +""" + +import tempfile import warnings from typing import Optional + from semver import VersionInfo -import tempfile -from tenable.errors import APIError, ConnectionError + from tenable.base.platform import APIPlatform +from tenable.errors import APIError, ConnectionError + from .accept_risks import AcceptRiskAPI from .alerts import AlertAPI from .analysis import AnalysisAPI @@ -59,9 +63,10 @@ from .audit_files import AuditFileAPI from .credentials import CredentialAPI from .current import CurrentSessionAPI -from .files import FileAPI from .feeds import FeedAPI +from .files import FileAPI from .groups import GroupAPI +from .hosts import HostsAPI from .organizations import OrganizationAPI from .plugins import PluginAPI from .policies import ScanPolicyAPI @@ -70,23 +75,23 @@ from .report_definition import ReportDefinitionAPI from .repositories import RepositoryAPI from .roles import RoleAPI -from .scanners import ScannerAPI -from .scans import ScanAPI from .scan_instances import ScanResultAPI from .scan_zones import ScanZoneAPI +from .scanners import ScannerAPI +from .scans import ScanAPI from .status import StatusAPI from .system import SystemAPI from .users import UserAPI try: - import cryptography.hazmat.primitives.serialization.pkcs12 - from cryptography.hazmat.primitives import serialization + import cryptography.hazmat.primitives.serialization.pkcs12 + from cryptography.hazmat.primitives import serialization except ImportError: - serialization = None + serialization = None class TenableSC(APIPlatform): # noqa PLR0904 - '''TenableSC API Wrapper + """TenableSC API Wrapper The Tenable Security Center object is the primary interaction point for users to interface with Tenable Security Center via the pyTenable library. All of the API endpoint classes that have been written will be grafted onto this class. @@ -184,7 +189,8 @@ class TenableSC(APIPlatform): # noqa PLR0904 http://docs.python-requests.org/en/master/user/advanced/#client-side-certificates .. _requests_pkcs12: https://github.com/m-click/requests_pkcs12 - ''' + """ + _env_base = 'TSC' _base_path: str = 'rest' _error_map = {403: APIError} @@ -201,25 +207,28 @@ class TenableSC(APIPlatform): # noqa PLR0904 'cert': ['_cert'], } - def __init__(self, # noqa: PLR0913 - host: Optional[str] = None, - access_key: Optional[str] = None, - secret_key: Optional[str] = None, - **kwargs - ): + def __init__( + self, # noqa: PLR0913 + host: Optional[str] = None, + access_key: Optional[str] = None, + secret_key: Optional[str] = None, + **kwargs, + ): # As we will always be passing a URL to the APISession class, we will # want to construct a URL that APISession (and further requests) # understands. if host: - warnings.warn('The "host", "port", and "scheme" parameters are ' - 'deprecated and will be removed from the TenableSC ' - 'class in version 2.0.', - DeprecationWarning, - stacklevel=2 - ) - kwargs['url'] = (f'{kwargs.get("scheme", "https")}://' - f'{host}:{kwargs.get("port", 443)}' - ) + warnings.warn( + 'The "host", "port", and "scheme" parameters are ' + 'deprecated and will be removed from the TenableSC ' + 'class in version 2.0.', + DeprecationWarning, + stacklevel=2, + ) + kwargs['url'] = ( + f'{kwargs.get("scheme", "https")}://' + f'{host}:{kwargs.get("port", 443)}' + ) kwargs['access_key'] = access_key kwargs['secret_key'] = secret_key @@ -262,37 +271,35 @@ def _key_auth(self, access_key, secret_key): # if we can pull a version, check to see that the version is at least # 5.13, which is the minimum version of SC that supports API Keys. If # we cant pull a version, then we will assume it's ok. - if (not self.version - or VersionInfo.parse(self.version).match('>=5.13.0') - ): - self._session.headers.update({ - 'X-APIKey': f'accessKey={access_key}; secretKey={secret_key}' - }) + if not self.version or VersionInfo.parse(self.version).match('>=5.13.0'): + self._session.headers.update( + {'X-APIKey': f'accessKey={access_key}; secretKey={secret_key}'} + ) self._auth_mech = 'keys' else: raise ConnectionError( - f'API Keys not supported on Tenable Security Center {self.version}' - ) + f'API Keys not supported on Tenable Security Center {self.version}' + ) def _session_auth(self, username, password): """ Basic Session Authentication """ - warnings.warn('Session based authentication to Security Center will be removed' - 'in later iterations of the library as it\'s no longer an' - 'oficially recommended method of authentication to SC.', - DeprecationWarning, - stacklevel=2 - ) - resp = self.post('token', json={ - 'username': username, - 'password': password - }) + warnings.warn( + 'Session based authentication to Security Center will be removed' + "in later iterations of the library as it's no longer an" + 'oficially recommended method of authentication to SC.', + DeprecationWarning, + stacklevel=2, + ) + resp = self.post('token', json={'username': username, 'password': password}) self._auth_mech = 'user' - self._session.headers.update({ - 'X-SecurityCenter': str(resp.json()['response']['token']), - 'TNS_SESSIONID': str(resp.headers['Set-Cookie'])[14:46] - }) + self._session.headers.update( + { + 'X-SecurityCenter': str(resp.json()['response']['token']), + 'TNS_SESSIONID': str(resp.headers['Set-Cookie'])[14:46], + } + ) def _p12_auth(self, p12_cert, password): """ @@ -312,14 +319,16 @@ def _p12_auth(self, p12_cert, password): fobj.read(), password.encode() ) self._client_key = tempfile.NamedTemporaryFile() # noqa: PLR1732 - self._client_key.write(key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - )) + self._client_key.write( + key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + ) self._client_key.flush() - self._client_cert = tempfile.NamedTemporaryFile() # noqa: PLR1732 + self._client_cert = tempfile.NamedTemporaryFile() # noqa: PLR1732 self._client_cert.write(cert.public_bytes(serialization.Encoding.PEM)) self._client_cert.flush() return self._client_cert.name, self._client_key.name @@ -329,19 +338,19 @@ def _cert_auth(self, _cert): PEM Cert Authentication """ resp = self.get('system', box=False) - self._session.headers.update({ - 'X-SecurityCenter': str(resp.json()['response']['token']), - 'TNS_SESSIONID': str(resp.headers['Set-Cookie'])[14:46] - }) + self._session.headers.update( + { + 'X-SecurityCenter': str(resp.json()['response']['token']), + 'TNS_SESSIONID': str(resp.headers['Set-Cookie'])[14:46], + } + ) self._auth_meth = 'cert' - def _deauthenticate(self): # noqa PLW0221 super()._deauthenticate(path='token') - def login(self, username=None, password=None, - access_key=None, secret_key=None): - ''' + def login(self, username=None, password=None, access_key=None, secret_key=None): + """ Logs the user into Tenable Security Center Args: @@ -364,26 +373,29 @@ def login(self, username=None, password=None, >>> sc = TenableSC('127.0.0.1', port=8443) >>> sc.login(access_key='ACCESSKEY', secret_key='SECRETKEY') - ''' - warnings.warn('Use of the login method is deprecated and will be removed in' - 'later versions of the library', - DeprecationWarning, - stacklevel=2 - ) - self._authenticate(**{ - 'username': username, - 'password': password, - 'access_key': access_key, - 'secret_key': secret_key - }) + """ + warnings.warn( + 'Use of the login method is deprecated and will be removed in' + 'later versions of the library', + DeprecationWarning, + stacklevel=2, + ) + self._authenticate( + **{ + 'username': username, + 'password': password, + 'access_key': access_key, + 'secret_key': secret_key, + } + ) def logout(self): - ''' + """ Logs out of Tenable Security Center and resets the session. Examples: >>> sc.logout() - ''' + """ self._deauthenticate() @property @@ -405,200 +417,208 @@ def version(self): @property def accept_risks(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Accept Risks APIs `. - ''' + """ return AcceptRiskAPI(self) @property def alerts(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Alerts APIs `. - ''' + """ return AlertAPI(self) @property def analysis(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Analysis APIs `. - ''' + """ return AnalysisAPI(self) @property def asset_lists(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Asset Lists APIs `. - ''' + """ return AssetListAPI(self) @property def audit_files(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Audit Files APIs `. - ''' + """ return AuditFileAPI(self) @property def credentials(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Credentials APIs `. - ''' + """ return CredentialAPI(self) @property def current(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Current Session APIs `. - ''' + """ return CurrentSessionAPI(self) @property def feeds(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Feeds APIs `. - ''' + """ return FeedAPI(self) @property def files(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Files APIs `. - ''' + """ return FileAPI(self) @property def groups(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Groups APIs `. - ''' + """ return GroupAPI(self) + @property + def hosts(self): + """ + The interface object for the + :doc:`Tenable Security Center Hosts APIs `. + """ + return HostsAPI(self) + @property def organizations(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Organization APIs `. - ''' + """ return OrganizationAPI(self) @property def plugins(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Plugins APIs `. - ''' + """ return PluginAPI(self) @property def policies(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Policies APIs `. - ''' + """ return ScanPolicyAPI(self) @property def queries(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Queries APIs `. - ''' + """ return QueryAPI(self) @property def recast_risks(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Recast Risks APIs `. - ''' + """ return RecastRiskAPI(self) @property def report_definition(self): - ''' + """ The interface object for the :doc:`Tenable.sc ReportDefinition APIs `. - ''' + """ return ReportDefinitionAPI(self) @property def repositories(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Repositories APIs `. - ''' + """ return RepositoryAPI(self) @property def roles(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Roles APIs `. - ''' + """ return RoleAPI(self) @property def scanners(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Scanners APIs `. - ''' + """ return ScannerAPI(self) @property def scans(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Scans APIs `. - ''' + """ return ScanAPI(self) @property def scan_instances(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Scan Instances APIs `. - ''' + """ return ScanResultAPI(self) @property def scan_zones(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Scan Zones APIs `. - ''' + """ return ScanZoneAPI(self) @property def status(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Status APIs `. - ''' + """ return StatusAPI(self) @property def system(self): - ''' + """ The interface object for the :doc:`Tenable Security Center System APIs `. - ''' + """ return SystemAPI(self) @property def users(self): - ''' + """ The interface object for the :doc:`Tenable Security Center Users APIs `. - ''' + """ return UserAPI(self) diff --git a/tenable/sc/base.py b/tenable/sc/base.py index 83cb9c262..72d36068e 100644 --- a/tenable/sc/base.py +++ b/tenable/sc/base.py @@ -1,4 +1,4 @@ -''' +""" Common Themes ============= @@ -76,13 +76,15 @@ these recurrence rules. Further there are some packages out there to aid in converting more human-readable text into recurrence rules, such as the `recurrent package `_ for example. -''' +""" + from tenable.base.endpoint import APIEndpoint from tenable.base.v1 import APIResultsIterator + class SCEndpoint(APIEndpoint): def _combo_expansion(self, item): - ''' + """ Expands the asset combination expressions from nested tuples to the nested dictionary structure that's expected. @@ -92,17 +94,13 @@ def _combo_expansion(self, item): Returns: :obj:`dict`: The dictionary structure of the expanded asset list combinations. - ''' + """ # the operator conversion dictionary. The UI uses "and", "or", and # "not" whereas the API uses "intersection", "union", and "compliment". # if the user is passing us the tuples, lets assume that they are using # the UI definitions and not the API ones. - oper = { - 'and': 'intersection', - 'or': 'union', - 'not': 'complement' - } + oper = {'and': 'intersection', 'or': 'union', 'not': 'complement'} # some simple checking to ensure that we are being passed good data # before we expand the tuple. @@ -138,29 +136,25 @@ def _combo_expansion(self, item): return resp def _query_constructor(self, *filters, **kw): - ''' + """ Constructs an analysis query. This part has been pulled out of the _analysis method and placed here so that it can be re-used in other part of the library. - ''' + """ if 'filters' in kw: # if filters are explicitly called, then we will replace the # implicit filters with the explicit ones and remove the entry from # the keywords dictionary filters = self._check('filters', kw['filters'], list) - del(kw['filters']) + del kw['filters'] if 'query' not in kw and 'tool' in kw and 'type' in kw: - kw['query'] = { - 'tool': kw['tool'], - 'type': kw['type'], - 'filters': list() - } + kw['query'] = {'tool': kw['tool'], 'type': kw['type'], 'filters': list()} if 'query_id' in kw: # Request the specific query ID provided and fetch only the filters query_response = self._api.get( - 'query/{}?fields=filters'.format( - kw['query_id'])).json()['response'] + 'query/{}?fields=filters'.format(kw['query_id']) + ).json()['response'] # Extract the filters or set to null if nothing is returned query_filters = query_response.get('filters', list()) @@ -174,20 +168,22 @@ def _query_constructor(self, *filters, **kw): item = {'filterName': f[0], 'operator': f[1]} if len(f) >= 3: - if (isinstance(f[2], tuple) - and f[1] == '~' and f[0] == 'asset'): + if isinstance(f[2], tuple) and f[1] == '~' and f[0] == 'asset': # if this is a asset combination, then we will want to # expand the tuple into the expected dictionary # structure that the API is expecting. item['value'] = self._combo_expansion(f[2]) - elif (isinstance(f[2], list) - and all(isinstance(i, int) for i in f[2])): + elif isinstance(f[2], list) and all( + isinstance(i, int) for i in f[2] + ): # if the value is a list and all of the items within # that list are integers, then we can safely assume that # this is a list of integer ids that need to be expanded # into a list of dictionaries. item['value'] = [dict(id=str(i)) for i in f[2]] - elif (isinstance(f[2], int) and f[0] in ['asset',]): + elif isinstance(f[2], int) and f[0] in [ + 'asset', + ]: # If the value is an integer, then we will want to # expand the value into a dictionary with an id attr. item['value'] = dict(id=str(f[2])) @@ -214,17 +210,21 @@ def _query_constructor(self, *filters, **kw): # specified. if f[1] != None and f[2] != None: kw['query']['filters'].append(item) - del(kw['type']) + del kw['type'] return kw def _schedule_constructor(self, item): - ''' + """ Handles creation of the schedule sub-document. - ''' + """ self._check('schedule:item', item, dict) - item['type'] = self._check('schedule:type', item.get('type'), str, + item['type'] = self._check( + 'schedule:type', + item.get('type'), + str, choices=['ical', 'dependent', 'never', 'rollover', 'template', 'now'], - default='never') + default='never', + ) if item['type'] == 'ical': self._check('schedule:start', item.get('start'), str) self._check('schedule:repeatRule', item.get('repeatRule'), str) @@ -232,12 +232,16 @@ def _schedule_constructor(self, item): self._check('schedule:dependentID', item.get('dependentID'), int) return item + class SCResultsIterator(APIResultsIterator): + _method = 'GET' + _body = None + def _get_page(self): - ''' + """ Retrieves the next page of results when the current page has been exhausted. - ''' + """ # First we need to see if there is a page limit and if there is, have # we run into that limit. If we have, then return a StopIteration # exception. @@ -251,7 +255,12 @@ def _get_page(self): query['endOffset'] = self._limit + self._offset # Lets actually call the API for the data at this point. - resp = self._api.get(self._resource, params=query).json() + resp = self._api._req( + self._method, + self._resource, + params=query, + json=self._body, + ).json() # Now that we have the response, lets reset any counters we need to, # and increment things like the page counter, offset, etc. diff --git a/tenable/sc/hosts.py b/tenable/sc/hosts.py index a3e1b5cd8..7fbedecaf 100644 --- a/tenable/sc/hosts.py +++ b/tenable/sc/hosts.py @@ -1,4 +1,4 @@ -''' +""" Hosts ===== @@ -11,10 +11,11 @@ .. rst-class:: hide-signature .. autoclass:: HostsAPI :members: -''' -from typing import List, Union, Dict, Optional, Tuple +""" + +from typing import Dict, List, Literal, Optional, Tuple, Union + from .base import SCEndpoint, SCResultsIterator -from tenable.errors import UnexpectedValueError class HostsResultsIterator(SCResultsIterator): @@ -25,14 +26,15 @@ class HostsAPI(SCEndpoint): _path = 'hosts' _box = True - def list(self, - fields: List[str], - limit: int = 10000, - offset: int = 0, - pages: Optional[int] = None, - pagination: bool = True, - return_json: bool = False, - ) -> Union[HostsResultsIterator, Dict]: + def list( + self, + fields: Optional[List[str]] = None, + limit: int = 10000, + offset: int = 0, + pages: Optional[int] = None, + pagination: bool = True, + return_json: bool = False, + ) -> Union[HostsResultsIterator, Dict]: """ Retreive the list of hosts from the system. @@ -60,6 +62,26 @@ def list(self, >>> for host in sc.hosts.list(): ... print(host) """ + if not fields: + fields = [ + 'id', + 'uuid', + 'tenableUUID', + 'name', + 'ipAddress', + 'os', + 'firstSeen', + 'lastSeen', + 'macAddress', + 'source', + 'repID', + 'netBios', + 'netBiosWorkgroup', + 'createdTime', + 'modifiedTime', + 'acr', + 'aes', + ] params = { 'fields': ','.join(fields), 'limit': limit, @@ -69,27 +91,35 @@ def list(self, } if return_json: return self._get(params=params).response - return HostsResultsIterator(self._api, - _resource='hosts' - _params=params, - _limit=limit, - _offset=offset - _pages_total=pages - ) - - def search(self, - filters: List[Tuple[str, str, str]], - fields: List[str], - limit: int = 10000, - offset: int = 0, - pages: Optional[int] = None, - pagination: bool = True, - return_json: bool = False, - ) -> Union[HostsResultsIterator, Dict]: + return HostsResultsIterator( + self._api, + _resource='hosts', + _query=params, + _limit=limit, + _offset=offset, + _pages_total=pages, + ) + + def search( + self, + *filters: Tuple[str, str, str], + filter_type: Literal['and', 'or'] = 'and', + fields: Optional[List[str]] = None, + limit: int = 10000, + offset: int = 0, + pages: Optional[int] = None, + pagination: bool = True, + return_json: bool = False, + ) -> Union[HostsResultsIterator, Dict]: """ Retreive the list of hosts from the system. Args: + filters (list[tuple[str, str, str]], optional): + List of search filter tuples. + filter_type (Literal['and', 'or'], optional): + The filtering boolean logic to use for multiple filters. If left + unspecified it defaults to `and`. fields (list[str], optional): What fields should be returned in the response. limit (int, 1000): @@ -110,9 +140,30 @@ def search(self, Examples: - >>> for host in sc.hosts.list(): + >>> for host in sc.hosts.search(filters=[('ip', 'eq', '1.2.3.4')]): ... print(host) """ + if not fields: + fields = [ + 'id', + 'uuid', + 'tenableUUID', + 'name', + 'ipAddress', + 'os', + 'firstSeen', + 'lastSeen', + 'macAddress', + 'source', + 'repID', + 'netBios', + 'netBiosWorkgroup', + 'createdTime', + 'modifiedTime', + 'acr', + 'aes', + ] + params = { 'fields': ','.join(fields), 'limit': limit, @@ -120,12 +171,65 @@ def search(self, 'endOffset': limit + offset, 'pagination': str(pagination).lower(), } + payload = { + 'filters': { + filter_type: [ + {'property': p, 'operator': o, 'value': v} for p, o, v in filters + ] + } + } if return_json: - return self._get(params=params).response - return HostsResultsIterator(self._api, - _resource='hosts' - _params=params, - _limit=limit, - _offset=offset - _pages_total=pages - ) + return self._post('search', params=params, json=payload).response + return HostsResultsIterator( + self._api, + _method='POST', + _resource='hosts/search', + _body=payload, + _query=params, + _limit=limit, + _offset=offset, + _pages_total=pages, + ) + + def update_acr( + self, + host_uuid: str, + reasoning: Optional[List[int]] = None, + score: Optional[int] = None, + notes: Optional[str] = None, + overwritten: bool = True, + ) -> Dict: + """ + Override the Asset Criticality Rating (ACR) score and the reasons for the + specified Host + + Args: + host_uuid (str): The Host UUID to modify + reasonings (list[int], optional): + The list of reasoning objects noting why the score was changed + score (int, optional): + The updated ACR score + notes (str, optional): + Notes detailing why the score was changed + overwritten (bool): + Should we use the overwritten score or the default one? + + Returns: + The updated host object. + + Example: + >>> sc.host.update_acr( + ... host_uuid='12345678-1234-1234-123456789012', + ... score=7, + ... reasonings=[4], + ... notes='Why we changed this score...', + ... ) + """ + payload = {'overwritten': str(overwritten).lower()} + if reasoning: + payload['reasoning'] = [{'id': x} for x in reasoning] + if score: + payload['overwrittenScore'] = score + if notes: + payload['notes'] = notes + return self._patch(f'{host_uuid}/acr', json=payload).response diff --git a/tests/sc/conftest.py b/tests/sc/conftest.py index ae4fc9424..daf260daf 100644 --- a/tests/sc/conftest.py +++ b/tests/sc/conftest.py @@ -1,20 +1,22 @@ -''' +""" configuration which would be used for testing sc. -''' +""" + import os -import responses + import pytest +import responses from tenable.errors import APIError, NotFoundError from tenable.sc import TenableSC -from tests.pytenable_log_handler import setup_logging_to_file, log_exception +from tests.pytenable_log_handler import log_exception, setup_logging_to_file @pytest.fixture(scope='module') def vcr_config(): - ''' + """ test fixture for vcr_config - ''' + """ return { 'filter_headers': [ ('Cookie', 'TNS_SESSIONID=SESSIONID'), @@ -24,23 +26,22 @@ def vcr_config(): @pytest.fixture(autouse=True, scope='module') +@pytest.mark.filterwarnings('ignore::DeprecationWarning') def security_center(request, vcr): - ''' + """ test fixture for sc(security center) - ''' + """ setup_logging_to_file() - with vcr.use_cassette('sc_login', - filter_post_data_parameters=['username', - 'password' - ]): + with vcr.use_cassette( + 'sc_login', filter_post_data_parameters=['username', 'password'] + ): tenable_security_center = TenableSC( - os.getenv('SC_TEST_HOST', 'securitycenter.home.cugnet.net'), + url=os.getenv('SC_TEST_URL', 'https://securitycenter.home.cugnet.net'), vendor='pytest', - product='pytenable-automated-testing') - tenable_security_center.login( - os.getenv('SC_TEST_USER', 'username'), - os.getenv('SC_TEST_PASS', 'password')) - tenable_security_center.version # noqa: PLW0104 + product='pytenable-automated-testing', + username='username', + password='password', + ) def teardown(): try: @@ -54,22 +55,21 @@ def teardown(): @pytest.fixture(autouse=True, scope='module') +@pytest.mark.filterwarnings('ignore::DeprecationWarning') def admin(request, vcr): - ''' + """ test fixture for admin - ''' - with vcr.use_cassette('sc_login', - filter_post_data_parameters=['username', - 'password' - ]): + """ + with vcr.use_cassette( + 'sc_login', filter_post_data_parameters=['username', 'password'] + ): sc = TenableSC( # noqa: PLC0103 - os.getenv('SC_TEST_HOST', 'securitycenter.home.cugnet.net'), + url=os.getenv('SC_TEST_URL', 'https://securitycenter.home.cugnet.net'), vendor='pytest', - product='pytenable-automated-testing') - sc.login( - os.getenv('SC_TEST_ADMIN_USER', 'admin'), - os.getenv('SC_TEST_ADMIN_PASS', 'password')) - sc.version # noqa: PLW0104 + product='pytenable-automated-testing', + username='admin', + password='password', + ) def teardown(): with vcr.use_cassette('sc_login'): @@ -80,26 +80,27 @@ def teardown(): @pytest.fixture(autouse=True, scope='module') +@pytest.mark.filterwarnings('ignore::DeprecationWarning') def unauth(vcr): - ''' + """ test fixture for un_authorization - ''' - with vcr.use_cassette('sc_login', - filter_post_data_parameters=['username', - 'password' - ]): + """ + with vcr.use_cassette( + 'sc_login', filter_post_data_parameters=['username', 'password'] + ): sc = TenableSC( # noqa: PLC0103 - os.getenv('SC_TEST_HOST', 'securitycenter.home.cugnet.net'), + url=os.getenv('SC_TEST_URL', 'https://securitycenter.home.cugnet.net'), vendor='pytest', - product='pytenable-automated-testing') + product='pytenable-automated-testing', + ) return sc @pytest.fixture def group(request, security_center, vcr): - ''' + """ test fixture for un_authorization - ''' + """ with vcr.use_cassette('test_groups_create_success'): grp = security_center.groups.create('groupname') @@ -117,13 +118,10 @@ def teardown(): @pytest.fixture def tsc(): with responses.RequestsMock() as rsps: - rsps.get('https://nourl/rest/system', - json={ - 'error_code': None, - 'response': {'version': '6.4.0'} - } - ) - return TenableSC(url='https://nourl', - access_key='SOMETHING', - secret_key='SECRET' - ) + rsps.get( + 'https://nourl/rest/system', + json={'error_code': None, 'response': {'version': '6.4.0'}}, + ) + return TenableSC( + url='https://nourl', access_key='SOMETHING', secret_key='SECRET' + ) diff --git a/tests/sc/test___init__.py b/tests/sc/test___init__.py index 09a76aabf..d37c9c746 100644 --- a/tests/sc/test___init__.py +++ b/tests/sc/test___init__.py @@ -1,28 +1,33 @@ -''' +""" test file to test various scenarios in init.py -''' +""" + import os import sys + import pytest -from requests.models import Response from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.models import Response + from tenable.errors import ConnectionError from tenable.sc import TenableSC def test_init_connection_error(): - ''' + """ test init for connection error - ''' + """ with pytest.raises(RequestsConnectionError): - TenableSC(host='nothing_here', - username='something', - password='something', - vendor='pytest', - product='pytenable-automated-testing') + TenableSC( + url='https://nothing_here', + username='something', + password='something', + vendor='pytest', + product='pytenable-automated-testing', + ) -#def test_init_self_details_connection_error(vcr): +# def test_init_self_details_connection_error(vcr): # ''' # test init self details for connection error # ''' @@ -36,24 +41,26 @@ def test_init_connection_error(): def test_enter(security_center): - ''' + """ test enter - ''' + """ assert security_center == security_center.__enter__() def test_exit(security_center, vcr): - ''' + """ test exit - ''' + """ with vcr.use_cassette('sc_login'): - security_center.__exit__(exc_type='exc_type', exc_value='exc_value', exc_traceback='exc_traceback') + security_center.__exit__( + exc_type='exc_type', exc_value='exc_value', exc_traceback='exc_traceback' + ) def test_resp_error_check(security_center): - ''' + """ test response error check - ''' + """ with pytest.raises(AttributeError): response = Response() response._content = b'{ "error_code": 401}' @@ -66,9 +73,9 @@ def test_resp_error_check(security_center): @pytest.mark.skip(reason='Deprecating this method') def test_log_in(vcr): - ''' + """ test log in - ''' + """ tsc = TenableSC(url='https://localhost') tsc._version = '5.12.0' with pytest.raises(ConnectionError): @@ -87,6 +94,7 @@ def test_log_in(vcr): def test_pkcs12_import_error(): import tenable.sc + tenable.sc.serialization = None with pytest.raises(ModuleNotFoundError): sc = TenableSC( diff --git a/tests/sc/test_hosts.py b/tests/sc/test_hosts.py new file mode 100644 index 000000000..c46a23166 --- /dev/null +++ b/tests/sc/test_hosts.py @@ -0,0 +1,116 @@ +import pytest +import responses +from responses.matchers import json_params_matcher, query_param_matcher + + +@pytest.fixture +def test_resp(): + return { + 'error_code': 0, + 'error_msg': '', + 'warnings': [], + 'response': [ + { + 'id': '154', + 'uuid': '12345678-1234-1234-123456789012', + 'tenableUUID': '1234abcd-abcd-abcd-1234567890abcd', + 'name': 'test1', + 'ipAddress': '1.2.3.4', + 'os': 'Linux', + 'firstSeen': '1770789', + 'lastSeen': '1779789', + } + ], + } + + +@responses.activate +def test_host_list(tsc, test_resp): + responses.get( + 'https://nourl/rest/hosts', + match=[ + query_param_matcher( + { + 'fields': ( + 'id,uuid,tenableUUID,name,ipAddress,os,firstSeen,lastSeen,' + 'macAddress,source,repID,netBios,netBiosWorkgroup,createdTime,' + 'modifiedTime,acr,aes' + ), + 'endOffset': 10000, + 'startOffset': 0, + 'limit': 10000, + 'pagination': 'true', + } + ) + ], + json=test_resp, + ) + resp = tsc.hosts.list() + item = next(resp) + assert item == test_resp['response'][0] + + +@responses.activate +def test_host_search(tsc, test_resp): + responses.post( + 'https://nourl/rest/hosts/search', + json=test_resp, + match=[ + query_param_matcher( + { + 'fields': ( + 'id,uuid,tenableUUID,name,ipAddress,os,firstSeen,lastSeen,' + 'macAddress,source,repID,netBios,netBiosWorkgroup,createdTime,' + 'modifiedTime,acr,aes' + ), + 'endOffset': 10000, + 'startOffset': 0, + 'limit': 10000, + 'pagination': 'true', + } + ), + json_params_matcher( + { + 'filters': { + 'and': [ + {'property': 'ip', 'operator': 'eq', 'value': '1.2.3.4'} + ] + } + } + ), + ], + ) + resp = tsc.hosts.search(('ip', 'eq', '1.2.3.4')) + item = next(resp) + assert item == test_resp['response'][0] + + +@responses.activate +def test_hosts_update_acr(tsc): + test_obj = { + 'id': '123', + 'uuid': '12345678-1234-1234-123456789012', + } + responses.patch( + 'https://nourl/rest/hosts/12345678-1234-1234-123456789012/acr', + match=[ + json_params_matcher( + { + 'overwrittenScore': 7, + 'reasoning': [{'id': 1}], + 'notes': 'Example notes', + 'overwritten': 'true', + } + ) + ], + json={ + 'type': 'regular', + 'error_code': 0, + 'error_msg': '', + 'warnings': [], + 'response': test_obj, + }, + ) + assert test_obj == tsc.hosts.update_acr( + '12345678-1234-1234-123456789012', score=7, reasoning=[1], notes='Example notes' + )