Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sca,iac): remove code #136

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Removed

- Removed SCA and IaC code
197 changes: 0 additions & 197 deletions pygitguardian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@
DEFAULT_TIMEOUT,
MAXIMUM_PAYLOAD_SIZE,
)
from .iac_models import (
IaCDiffScanResult,
IaCScanParameters,
IaCScanParametersSchema,
IaCScanResult,
)
from .models import (
APITokensResponse,
CreateInvitation,
Expand Down Expand Up @@ -66,12 +60,6 @@
UpdateTeamSource,
)
from .models_utils import CursorPaginatedResponse
from .sca_models import (
ComputeSCAFilesResult,
SCAScanAllOutput,
SCAScanDiffOutput,
SCAScanParameters,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -355,8 +343,6 @@ def post(
extra_headers: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> Response:
# Be aware that self.iac_directory_scan bypass this method and calls self.request directly.
# self.iac_diff_scan also bypass this method
return self.request(
"post",
endpoint=endpoint,
Expand Down Expand Up @@ -695,100 +681,6 @@ def create_honeytoken_with_context(
result.status_code = resp.status_code
return result

def iac_directory_scan(
self,
directory: Path,
filenames: List[str],
scan_parameters: IaCScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, IaCScanResult]:
"""
iac_directory_scan handles the /iac_scan endpoint of the API.
:param directory: path to the directory to scan
:param filenames: filenames of the directory to include in the scan
:param scan_parameters: minimum severities wanted and policies to ignore
example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"}
:param extra_headers: optional extra headers to add to the request
:return: ScanResult response and status code
"""
tar = _create_tar(directory, filenames)
result: Union[Detail, IaCScanResult]
try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
resp = self.request(
"post",
endpoint="iac_scan",
extra_headers=extra_headers,
files={
"directory": tar,
},
data={
"scan_parameters": IaCScanParametersSchema().dumps(scan_parameters),
},
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(resp):
result = IaCScanResult.from_dict(resp.json())
else:
result = load_detail(resp)

result.status_code = resp.status_code

return result

def iac_diff_scan(
self,
reference: bytes,
current: bytes,
scan_parameters: IaCScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, IaCDiffScanResult]:
"""
iac_diff_scan handles the /iac_diff_scan endpoint of the API.
Scan two directories and compare their vulnerabilities.
Vulnerabilities in reference but not in current are considered "new".
Vulnerabilities in both reference and current are considered "unchanged".
Vulnerabilities in current but not in reference are considered "deleted".
:param reference: tar file containing the reference directory. Usually an incoming commit
:param current: tar file of the current directory. Usually HEAD
:param scan_parameters: minimum severities wanted and policies to ignore
example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"}
:param extra_headers: optional extra headers to add to the request
:return: ScanResult response and status code
"""
result: Union[Detail, IaCDiffScanResult]
try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
resp = self.request(
"post",
endpoint="iac_diff_scan",
extra_headers=extra_headers,
files={
"reference": reference,
"current": current,
},
data={
"scan_parameters": IaCScanParametersSchema().dumps(scan_parameters),
},
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(resp):
result = IaCDiffScanResult.from_dict(resp.json())
else:
result = load_detail(resp)

result.status_code = resp.status_code
return result

def read_metadata(self) -> Optional[Detail]:
"""
Fetch server preferences and store them in `self.secret_scan_preferences`.
Expand Down Expand Up @@ -836,95 +728,6 @@ def create_jwt(
obj.status_code = resp.status_code
return obj

def compute_sca_files(
self,
files: List[str],
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, ComputeSCAFilesResult]:
if len(files) == 0:
result = ComputeSCAFilesResult(sca_files=[])
result.status_code = 200
return result

response = self.post(
endpoint="sca/compute_sca_files/",
data={"files": files},
extra_headers=extra_headers,
)
result: Union[Detail, ComputeSCAFilesResult]
if is_ok(response):
result = ComputeSCAFilesResult.from_dict(response.json())
else:
result = load_detail(response)

result.status_code = response.status_code
return result

def sca_scan_directory(
self,
tar_file: bytes,
scan_parameters: SCAScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, SCAScanAllOutput]:
"""
Launches an SCA scan via SCA public API on a tar archive
"""

result: Union[Detail, SCAScanAllOutput]

try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
response = self.request(
"post",
endpoint="sca/sca_scan_all/",
files={"directory": tar_file},
data={
"scan_parameters": SCAScanParameters.SCHEMA.dumps(scan_parameters)
},
extra_headers=extra_headers,
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(response):
result = SCAScanAllOutput.from_dict(response.json())
else:
result = load_detail(response)

result.status_code = response.status_code

return result

def scan_diff(
self,
reference: bytes,
current: bytes,
scan_parameters: SCAScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, SCAScanDiffOutput]:
result: Union[Detail, SCAScanDiffOutput]
try:
response = self.request(
"post",
endpoint="sca/sca_scan_diff/",
files={"reference": reference, "current": current},
data={
"scan_parameters": SCAScanParameters.SCHEMA.dumps(scan_parameters)
},
extra_headers=extra_headers,
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(response):
result = SCAScanDiffOutput.from_dict(response.json())
else:
result = load_detail(response)
result.status_code = response.status_code
return result

def list_members(
self,
parameters: Optional[MembersParameters] = None,
Expand Down
96 changes: 0 additions & 96 deletions pygitguardian/iac_models.py

This file was deleted.

Loading
Loading