Skip to content

Commit

Permalink
chore(sca,iac): remove code
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Perucki committed Feb 5, 2025
1 parent 6cba0a2 commit e62f72b
Show file tree
Hide file tree
Showing 40 changed files with 3 additions and 1,358 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20250205_113323_florian.perucki_remove_sca_iac.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Removed

- Removed SCA and IaC code
197 changes: 0 additions & 197 deletions pygitguardian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@
DEFAULT_TIMEOUT,
MAXIMUM_PAYLOAD_SIZE,
)
from .iac_models import (
IaCDiffScanResult,
IaCScanParameters,
IaCScanParametersSchema,
IaCScanResult,
)
from .models import (
APITokensResponse,
CreateInvitation,
Expand Down Expand Up @@ -66,12 +60,6 @@
UpdateTeamSource,
)
from .models_utils import CursorPaginatedResponse
from .sca_models import (
ComputeSCAFilesResult,
SCAScanAllOutput,
SCAScanDiffOutput,
SCAScanParameters,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -355,8 +343,6 @@ def post(
extra_headers: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> Response:
# Be aware that self.iac_directory_scan bypass this method and calls self.request directly.
# self.iac_diff_scan also bypass this method
return self.request(
"post",
endpoint=endpoint,
Expand Down Expand Up @@ -695,100 +681,6 @@ def create_honeytoken_with_context(
result.status_code = resp.status_code
return result

def iac_directory_scan(
self,
directory: Path,
filenames: List[str],
scan_parameters: IaCScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, IaCScanResult]:
"""
iac_directory_scan handles the /iac_scan endpoint of the API.
:param directory: path to the directory to scan
:param filenames: filenames of the directory to include in the scan
:param scan_parameters: minimum severities wanted and policies to ignore
example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"}
:param extra_headers: optional extra headers to add to the request
:return: ScanResult response and status code
"""
tar = _create_tar(directory, filenames)
result: Union[Detail, IaCScanResult]
try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
resp = self.request(
"post",
endpoint="iac_scan",
extra_headers=extra_headers,
files={
"directory": tar,
},
data={
"scan_parameters": IaCScanParametersSchema().dumps(scan_parameters),
},
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(resp):
result = IaCScanResult.from_dict(resp.json())
else:
result = load_detail(resp)

result.status_code = resp.status_code

return result

def iac_diff_scan(
self,
reference: bytes,
current: bytes,
scan_parameters: IaCScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, IaCDiffScanResult]:
"""
iac_diff_scan handles the /iac_diff_scan endpoint of the API.
Scan two directories and compare their vulnerabilities.
Vulnerabilities in reference but not in current are considered "new".
Vulnerabilities in both reference and current are considered "unchanged".
Vulnerabilities in current but not in reference are considered "deleted".
:param reference: tar file containing the reference directory. Usually an incoming commit
:param current: tar file of the current directory. Usually HEAD
:param scan_parameters: minimum severities wanted and policies to ignore
example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"}
:param extra_headers: optional extra headers to add to the request
:return: ScanResult response and status code
"""
result: Union[Detail, IaCDiffScanResult]
try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
resp = self.request(
"post",
endpoint="iac_diff_scan",
extra_headers=extra_headers,
files={
"reference": reference,
"current": current,
},
data={
"scan_parameters": IaCScanParametersSchema().dumps(scan_parameters),
},
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(resp):
result = IaCDiffScanResult.from_dict(resp.json())
else:
result = load_detail(resp)

result.status_code = resp.status_code
return result

def read_metadata(self) -> Optional[Detail]:
"""
Fetch server preferences and store them in `self.secret_scan_preferences`.
Expand Down Expand Up @@ -836,95 +728,6 @@ def create_jwt(
obj.status_code = resp.status_code
return obj

def compute_sca_files(
self,
files: List[str],
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, ComputeSCAFilesResult]:
if len(files) == 0:
result = ComputeSCAFilesResult(sca_files=[])
result.status_code = 200
return result

response = self.post(
endpoint="sca/compute_sca_files/",
data={"files": files},
extra_headers=extra_headers,
)
result: Union[Detail, ComputeSCAFilesResult]
if is_ok(response):
result = ComputeSCAFilesResult.from_dict(response.json())
else:
result = load_detail(response)

result.status_code = response.status_code
return result

def sca_scan_directory(
self,
tar_file: bytes,
scan_parameters: SCAScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, SCAScanAllOutput]:
"""
Launches an SCA scan via SCA public API on a tar archive
"""

result: Union[Detail, SCAScanAllOutput]

try:
# bypass self.post because data argument is needed in self.request and self.post use it as json
response = self.request(
"post",
endpoint="sca/sca_scan_all/",
files={"directory": tar_file},
data={
"scan_parameters": SCAScanParameters.SCHEMA.dumps(scan_parameters)
},
extra_headers=extra_headers,
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(response):
result = SCAScanAllOutput.from_dict(response.json())
else:
result = load_detail(response)

result.status_code = response.status_code

return result

def scan_diff(
self,
reference: bytes,
current: bytes,
scan_parameters: SCAScanParameters,
extra_headers: Optional[Dict[str, str]] = None,
) -> Union[Detail, SCAScanDiffOutput]:
result: Union[Detail, SCAScanDiffOutput]
try:
response = self.request(
"post",
endpoint="sca/sca_scan_diff/",
files={"reference": reference, "current": current},
data={
"scan_parameters": SCAScanParameters.SCHEMA.dumps(scan_parameters)
},
extra_headers=extra_headers,
)
except requests.exceptions.ReadTimeout:
result = Detail("The request timed out.")
result.status_code = 504
else:
if is_ok(response):
result = SCAScanDiffOutput.from_dict(response.json())
else:
result = load_detail(response)
result.status_code = response.status_code
return result

def list_members(
self,
parameters: Optional[MembersParameters] = None,
Expand Down
96 changes: 0 additions & 96 deletions pygitguardian/iac_models.py

This file was deleted.

Loading

0 comments on commit e62f72b

Please sign in to comment.