Skip to content

Commit

Permalink
remote api call to zerofox's api-token-auth/ endpoint to avoid possib…
Browse files Browse the repository at this point in the history
…le rate limits
  • Loading branch information
DNRRomero committed Feb 6, 2025
1 parent cb6869f commit 37fe75f
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 87 deletions.
88 changes: 39 additions & 49 deletions Packs/ZeroFox/Integrations/ZeroFox/ZeroFox.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@

class ZFClient(BaseClient):
def __init__(
self, username, password, only_escalated, *args, **kwargs
self, username, token, only_escalated, *args, **kwargs
):
super().__init__(*args, **kwargs)
self.credentials = {
"username": username,
"password": password
"password": token
}
self.only_escalated = only_escalated
self.auth_token = ""
self.auth_token = token

def raw_api_request(
self,
Expand All @@ -58,7 +58,6 @@ def raw_api_request(
files: dict[str, Any] | None = None,
prefix: str | None = "1.0",
empty_response: bool = False,
error_handler: Callable[[Any], Any] | None = None,
**kwargs
) -> Response:
pref_string = f"/{prefix}" if prefix else ""
Expand All @@ -74,7 +73,7 @@ def raw_api_request(
ok_codes=ok_codes,
empty_valid_codes=(200, 201),
return_empty_response=empty_response,
error_handler=error_handler,
error_handler=self.handle_zerofox_error,
files=files,
resp_type="response",
**kwargs
Expand All @@ -92,7 +91,6 @@ def api_request(
files: dict[str, Any] | None = None,
prefix: str | None = "1.0",
empty_response: bool = False,
error_handler: Callable[[Any], Any] | None = None,
**kwargs
) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -143,50 +141,35 @@ def api_request(
ok_codes=ok_codes,
empty_valid_codes=(200, 201),
return_empty_response=empty_response,
error_handler=error_handler,
error_handler=self.handle_zerofox_error,
files=files,
**kwargs
)

def handle_auth_error(self, raw_response: Response):
def handle_zerofox_error(self, raw_response: Response):
status_code = raw_response.status_code
if status_code >= 500:
raise ZeroFoxInternalException(
status_code=status_code,
cause=raw_response.text,
)
cause = self._build_exception_cause(raw_response)
response = raw_response.json()
if status_code in [401, 403]:
raise ZeroFoxAuthException(cause=cause)
raise ZeroFoxInternalException(
status_code=status_code,
cause=str(response),
)

def _build_exception_cause(self, raw_response: Response) -> str:
try:
status_code = raw_response.status_code
if raw_response.status_code >= 500:
raise ZeroFoxAuthException(
cause=internal_zerofox_error_msg(
status_code=status_code,
text=raw_response.text
)
)
if raw_response.status_code == 429:
raise ZeroFoxAuthException(
cause="The application is sending too many requests to ZeroFox API,\
please contact support."
)
response = raw_response.json()
if non_field_errors := response.get("non_field_errors", []):
raise ZeroFoxAuthException(cause=non_field_errors[0])
raise ZeroFoxAuthException(cause=str(response))
return non_field_errors[0]
return str(response)
except json.JSONDecodeError:
raise ZeroFoxAuthException(cause=raw_response.text)

def get_authorization_token(self) -> str:
"""
:return: Returns the authorization token
"""
if self.auth_token:
return self.auth_token
url_suffix: str = "/1.0/api-token-auth/"
response_content = self.api_request(
"POST",
url_suffix,
data=self.credentials,
error_handler=self.handle_auth_error,
headers_builder_type=None,
prefix=None,
)
self.auth_token = response_content.get("token", "")
return self.auth_token
return raw_response.text

def _get_new_access_token(self) -> str:
url_suffix: str = "/auth/token/"
Expand All @@ -209,7 +192,7 @@ def get_cti_authorization_token(self) -> str:
return token

def get_api_request_header(self) -> dict[str, str]:
token: str = self.get_authorization_token()
token: str = self.auth_token
return {
"Authorization": f"Token {token}",
"Content-Type": "application/json",
Expand Down Expand Up @@ -722,11 +705,6 @@ def get_cti_exploits(self, since: str) -> dict[str, Any]:
""" HELPERS """


def internal_zerofox_error_msg(status_code: int, text: str) -> str:
return f"The ZeroFox API experienced an internal error, please try again later:\
Status Code: {status_code}, Response: {text}"


class ZeroFoxGetAlertsException(Exception):
def __init__(self, escalated: bool, cause: Exception):
self.escalated = escalated
Expand All @@ -751,6 +729,18 @@ def _generate_msg(self) -> str:
\n {self.cause}"


class ZeroFoxInternalException(Exception):
def __init__(self, status_code: int, cause: str):
self.status_code = status_code
self.cause = cause
super().__init__(self._generate_msg())

def _generate_msg(self) -> str:
return f"An error occurred within ZeroFox, please try again later.\
If the issue persists, contact support.\
Status Code: {self.status_code}, Response: {self.cause}"


class ZeroFoxAuthException(Exception):
def __init__(self, cause: str):
self.cause = cause
Expand Down Expand Up @@ -2124,7 +2114,7 @@ def main():
base_url=BASE_URL,
ok_codes={200, 201},
username=USERNAME,
password=PASSWORD,
token=PASSWORD,
verify=USE_SSL,
proxy=PROXY,
only_escalated=ONLY_ESCALATED,
Expand Down
Loading

0 comments on commit 37fe75f

Please sign in to comment.