-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.py
177 lines (155 loc) · 5.64 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from git import Repo
from loguru import logger
from uuid import uuid4
from github import Github
import subprocess
from os import scandir
from github import Github
import simplejson as json
import boto3
ssm = boto3.client('ssm')
token = ssm.get_parameter(Name='auth_token', WithDecryption=True).get('Parameter').get('Value')
gh = Github(token)
def check_python_files(event, context):
data = event
if (
"action" in data
and data["action"] == "opened"
and "pull_request" in data
and "repository" in data
):
repo = gh.get_repo(data["repository"]["full_name"])
target_pr = repo.get_pull(data["number"])
branch = data["pull_request"]["head"]["ref"]
url = data.get("pull_request").get("head").get("repo").get("html_url")
affected_files = target_pr.get_files()
presence_python_files = False
presence_req = False
pr_id = data.get("number")
print(repo, target_pr, branch, url, pr_id)
if any(str(single.filename).endswith(".py") for single in affected_files):
presence_python_files = True
else:
pass
if any(single.filename == "requirements.txt" for single in affected_files):
presence_req = True
else:
pass
return_dict = {
"branch": branch,
"repo": url,
"is_python": presence_python_files,
"is_requirements": presence_req,
"pr_id": pr_id,
"req_repo_path": data["repository"]["full_name"],
}
print(return_dict)
return return_dict
else:
return None
def run_bandit(event, context):
# return event
data = event # json.loads(event.get('body'))
if data:
if data.get("repo") and data.get("branch") and data.get("is_python"):
random_val = str(uuid4())
repo = Repo.clone_from(
data.get("repo"),
"/tmp/{}".format(random_val),
branch=data.get("branch"),
)
subprocess.call(
"/var/lang/bin/python -m bandit -r -f json -o /tmp/result.json /tmp/{}/".format(
random_val
),
shell=True,
)
with open("/tmp/result.json", "r") as res:
outcontent = json.loads(res.read())
data["result"] = outcontent.get("results")
return data
else:
logger.error(
"Mandatory keys 'repo' and 'branch' not in event body. Will not work"
)
return event
else:
return None
def run_safety(event, context):
data = event
if data:
if data.get("repo") and data.get("branch"):
random_val = str(uuid4())
repo = Repo.clone_from(
data.get("repo"),
"/tmp/{}/".format(random_val),
branch=data.get("branch"),
)
for entry in scandir("/tmp/{}/".format(random_val)):
print(entry)
if "requirements" in entry.name:
print("in loop")
subprocess.call(
"/var/lang/bin/python -m safety check -r {} --full-report --json -o /tmp/result.json".format(
entry.path
),
shell=True,
)
with open("/tmp/result.json", "r") as res:
outcontent = json.loads(res.read())
data["sca"] = outcontent
break
return data
else:
logger.error(
"Mandatory keys 'repo' and 'branch' not in event body. Will not work"
)
return None
else:
return None
def sast_pr_comment(event, context):
if "result" in event:
mdh1 = "## Static Analysis Report - Bandit\n\n"
mdtable = "| Issue | File | Line | Confidence | Severity |\n"
mdheader = "|-------|:----------:|------:|------:|------:|\n"
mdlist = []
mdlist.append(mdh1)
mdlist.append(mdtable)
mdlist.append(mdheader)
if isinstance(event["result"], list):
for single in event["result"]:
mdlist.append(
"| {} | {} | {} | {} | {} |\n".format(
single["issue_text"],
single["filename"],
single["line_number"],
single["issue_confidence"],
single["issue_severity"],
)
)
final_md = "".join(mdlist)
repo = gh.get_repo(event.get("req_repo_path"))
pr = repo.get_pull(event.get("pr_id"))
pr.create_issue_comment(final_md)
return event
def sca_pr_comment(event, context):
if "sca" in event:
mdh1 = "## Source Composition Analysis - Safety\n\n"
mdtable = "| Library | Affected Version | Fix Version | Description | \n"
mdheader = "|-------|:----------:|------:|------:|\n"
mdlist = []
mdlist.append(mdh1)
mdlist.append(mdtable)
mdlist.append(mdheader)
if isinstance(event["sca"], list):
for single in event["sca"]:
mdlist.append(
"| {} | {} | {} | {} |\n".format(
single[0], single[1], single[2], single[3]
)
)
final_md = "".join(mdlist)
repo = gh.get_repo(event.get("req_repo_path"))
pr = repo.get_pull(event.get("pr_id"))
pr.create_issue_comment(final_md)
return event