Skip to content

Commit

Permalink
Merge pull request #179 from CVEProject/addition_transforms
Browse files Browse the repository at this point in the history
Additional transforms
  • Loading branch information
jwhitmore-mitre authored Jun 21, 2022
2 parents f630a93 + 09dbd27 commit fbf54b9
Show file tree
Hide file tree
Showing 2 changed files with 364 additions and 137 deletions.
244 changes: 107 additions & 137 deletions schema/v5.0/support/CVE_4_to_5_converter/cve4to5up.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
scoring_other = {}
invalid_impact_versions = []
requester_map = {}
reference_tag_map = {}
ValidationFailures = {}
cvssErrorList = []
minShortName = 100
Expand Down Expand Up @@ -105,7 +106,8 @@ def main(argv):
except:
problemfiles[filepath] = "" + str(sys.exc_info()[0]) + " -- " + str(sys.exc_info()[1]) + " -- "
CVECount += 1
if CVECount % 100 == 0: spinner.next()
# if CVECount % 100 == 0: spinner.next()
if CVECount % 10 == 0: spinner.next()

print('FINISHED processed directory', inputdir)
print('')
Expand Down Expand Up @@ -349,6 +351,7 @@ def CVE_Convert(inputfile, outputpath):
global scoring_other
global invalid_impact_versions
global requester_map
global reference_tag_map
global minShortName
global maxShortName
global maxTitle
Expand All @@ -358,15 +361,10 @@ def CVE_Convert(inputfile, outputpath):

if len(requester_map) < 1:
getRequesterMap()

''' Not needed if querying IDR by CVE ID
if len(all_users) < 1:
getAllUsers()
# get min and max length of org shortname
for org in all_orgs:
minShortName = min(minShortName, len(all_orgs[org]["short_name"]))
maxShortName = max(maxShortName, len(all_orgs[org]["short_name"]))
'''

if len(reference_tag_map) < 1:
getReferenceTagMap()


with open(inputfile) as json_file:
writeout = False
Expand All @@ -388,87 +386,6 @@ def CVE_Convert(inputfile, outputpath):
if i_meta["STATE"] not in keys_used: keys_used[i_meta["STATE"]] = {}
keys_used[i_meta["STATE"]]["CVE_data_meta"] = {}

'''
if "ASSIGNER" in i_meta:
# v4 assigner email converted to orgId before v5 upconvert
# get org info
username = i_meta["ASSIGNER"]
if i_meta["ASSIGNER"] not in all_users:
found = False
# check user mapping first for org name
if username in requester_map:
orgShort = requester_map[username][3]
for org in all_orgs:
if all_orgs[org]["short_name"].casefold() == orgShort.casefold():
o_meta["assignerOrgId"] = all_orgs[org]["UUID"]
o_meta["assignerShortName"] = all_orgs[org]["short_name"]
found = True
break
if not found:
#attempt to locate org from username domain extract
orgShort = username
if '@' in orgShort:
orgShort = orgShort.split('@',1)[1]
if '.' in orgShort:
# strip last .xxxx
orgShort = orgShort.rsplit('.', 1)[0]
# print ("looking for org: "+orgShort)
orgShort = orgShort.lower()
for org in all_orgs:
if all_orgs[org]["short_name"].casefold() == orgShort.casefold():
o_meta["assignerOrgId"] = all_orgs[org]["UUID"]
o_meta["assignerShortName"] = all_orgs[org]["short_name"]
found = True
break
if not found:
if username in requester_map:
orgShort = requester_map[username][3]
for org in all_orgs:
if all_orgs[org]["short_name"].casefold() == orgShort.casefold():
o_meta["assignerOrgId"] = all_orgs[org]["UUID"]
o_meta["assignerShortName"] = all_orgs[org]["short_name"]
found = True
break
else:
# look for email half
if '@' in username:
userShort = username.split('@',1)[1]
for user in requester_map:
tuser = user
if '@' in tuser:
tuser = user.split('@',1)[1]
if userShort.casefold() == tuser.casefold():
orgShort = requester_map[user][3]
for org in all_orgs:
if all_orgs[org]["short_name"].casefold() == orgShort.casefold():
o_meta["assignerOrgId"] = all_orgs[org]["UUID"]
o_meta["assignerShortName"] = all_orgs[org]["short_name"]
found = True
break
# end for org
if found: break
# end for user
else:
print("found username that is not an email: "+ str(username) )
# end if - user_map else parsed email magic
if not found:
# default to MITRE and CNAofLR
if username not in defaulted_users:
defaulted_users[username] = []
defaulted_users[username].append(i_meta["ID"])
username = "DEFAULT"
o_meta["assignerOrgId"] = all_users[username]["org_UUID"]
o_meta["assignerShortName"] = all_users[username]["org_short_name"]
else:
o_meta["assignerOrgId"] = all_users[username]["org_UUID"]
o_meta["assignerShortName"] = all_users[username]["org_short_name"]
# print("in = " + i_meta["ASSIGNER"] + " out = " +o_meta["assignerShortName"])
'''

if "STATE" in i_meta:
if i_meta["STATE"] == 'RESERVED':
Expand Down Expand Up @@ -533,14 +450,12 @@ def CVE_Convert(inputfile, outputpath):
else:
raise MissingRequiredPropertyValue(inputfile, "CVE_data_meta no STATE")
except Exception as e:
# print("test 5")
print( str(e) )
if type(e) is not MissingRequiredPropertyValue:
raise MissingRequiredPropertyValue(inputfile, "CVE_data_meta structure error")
else:
raise e

# o_meta["dateUpdated"] = datetime.date.today().strftime("%Y-%m-%d")
o_meta["dateUpdated"] = str(datetime.datetime.combine(datetime.date.today(), datetime.datetime.min.time()).isoformat())

jout["cveMetadata"] = o_meta
Expand Down Expand Up @@ -646,6 +561,7 @@ def CVE_Convert(inputfile, outputpath):
if "version" in vd_pd and "version_data" in vd_pd["version"]:
v_agg_hash = {}
v_agg_list = {}
product_name = vd_pd["product_name"]
for pd_vd in vd_pd["version"]["version_data"]:
if not "version_value" in pd_vd:
# throw invalid version_data, must have version_value value
Expand All @@ -658,37 +574,39 @@ def CVE_Convert(inputfile, outputpath):
v_agg_list[platform] = []
vn_hash = v_agg_hash[platform]
v_list = v_agg_list[platform]
if "version_name" in pd_vd: # vulnogram generated
if "version_name" in pd_vd: # vulnogram generated
vn = pd_vd["version_name"]
[vstatus, va] = convert_VA(pd_vd)
if va == '=':
v_list.append(nonEmpty(eq_version(pd_vd, vstatus)))
elif vn in vn_hash:
if va == '<':
vstatus = negate(vstatus)
if va == '<=':
vstatus = negate(vstatus)
pd_vd["version_value"] = pd_vd["version_value"] + ' +1'
if product_name.casefold() is not vn.casefold():
[vstatus, va] = convert_VA(pd_vd)
if va == '=':
v_list.append(nonEmpty(eq_version(pd_vd, vstatus)))
elif vn in vn_hash:
if va == '<':
vstatus = negate(vstatus)
if va == '<=':
vstatus = negate(vstatus)
pd_vd["version_value"] = pd_vd["version_value"] + ' +1'
else:
if not "changes" in vn_hash[vn]:
vn_hash[vn]["changes"] = []
chg = {
"at": pd_vd["version_value"],
"status": vstatus
}
if chg not in vn_hash[vn]["changes"]:
vn_hash[vn]["changes"].append(chg)
elif va == '<':
vn_hash[vn] = nonEmpty(l_version(pd_vd, vstatus))
elif va == '<=':
vn_hash[vn] = nonEmpty(le_version(pd_vd, vstatus))
else:
if not "changes" in vn_hash[vn]:
vn_hash[vn]["changes"] = []
chg = {
"at": pd_vd["version_value"],
"status": vstatus
vn_hash[vn] = {
"version": pd_vd["version_value"],
"status": vstatus,
"lessThan": pd_vd["version_name"] + '*',
"versionType": "custom"
}
if chg not in vn_hash[vn]["changes"]:
vn_hash[vn]["changes"].append(chg)
elif va == '<':
vn_hash[vn] = nonEmpty(l_version(pd_vd, vstatus))
elif va == '<=':
vn_hash[vn] = nonEmpty(le_version(pd_vd, vstatus))
else:
vn_hash[vn] = {
"version": pd_vd["version_value"],
"status": vstatus,
"lessThan": pd_vd["version_name"] + '*',
"versionType": "custom"
}
# end if product_name is not version_name
else:
[vstatus, va] = convert_VA(pd_vd)
version_value = pd_vd["version_value"]
Expand Down Expand Up @@ -740,6 +658,7 @@ def CVE_Convert(inputfile, outputpath):
# check for blank version and defailt to "unspecified"
#if len(version_item["version"]) < 1:
# version_item["version"] = "unspecified"
# end if version_name in pd_vd

for platform in v_agg_hash:
# build affected item here:
Expand Down Expand Up @@ -803,9 +722,17 @@ def CVE_Convert(inputfile, outputpath):
if "refsource" in i_ref:
if "tags" not in o_ref:
o_ref["tags"] = []

# convert to new reference tags
v5Tag_value = getV5ReferenceTagValue(i_ref["refsource"])
if v5Tag_value not in o_ref["tags"]:
o_ref["tags"].append(v5Tag_value)

# preserve legacy tag value
refSourceTag = "x_refsource_"+i_ref["refsource"]
if refSourceTag not in o_ref["tags"]:
o_ref["tags"].append(refSourceTag)

if "url" in i_ref: o_ref["url"] = i_ref["url"]

# decode then encode URL, to clear issue with AJV URL validations
Expand All @@ -821,17 +748,34 @@ def CVE_Convert(inputfile, outputpath):
keys_used["PUBLIC"]["credit"] = ""
if isinstance(data["credit"], list):
for i_credit in data["credit"]:
o_credit = {}
if "lang" in i_credit and "value" in i_credit:
o_credit["lang"] = lang_code_2_from_3(i_credit["lang"])
if isinstance(i_credit, dict):
o_credit = {}
if "lang" in i_credit and "value" in i_credit:
o_credit["lang"] = lang_code_2_from_3(i_credit["lang"])
else:
o_credit["lang"] = "en"

if "value" in i_credit:
if "credits" not in o_cna:
o_cna["credits"] = []
o_credit["value"] = i_credit["value"]
o_cna["credits"].append(o_credit)
elif isinstance(i_credit, list):
for citem in i_credit:
o_credit = {}
o_credit["lang"] = "en"
if "credits" not in o_cna:
o_cna["credits"] = []
o_credit["value"] = citem
o_cna["credits"].append(o_credit)
else:
o_credit = {}
o_credit["lang"] = "en"

if "value" in i_credit:
o_credit["value"] = i_credit
if "credits" not in o_cna:
o_cna["credits"] = []
o_credit["value"] = i_credit["value"]
o_cna["credits"].append(o_credit)

else:
# convert value content to string
o_cna["credits"] = []
Expand Down Expand Up @@ -1487,7 +1431,7 @@ def getAllUsers():



def getIDRInfo(cveId, delay=20, retry=0):
def getIDRInfo(cveId, delay=300, retry=0):
IDR_URL = settings.AWG_IDR_SERVICE_URL + '/cve-id/' + cveId
idr_params = {}
data = None
Expand All @@ -1499,22 +1443,23 @@ def getIDRInfo(cveId, delay=20, retry=0):
if idr_result and idr_result.startswith("{"):
data = json.loads(idr_result)
else:
if retry < 10:
# print("delay for: "+ str(delay))
if retry < 14:
print("delaying for: "+ str(delay) + " -- on -- " + cveId)
time.sleep(delay)
data = getIDRInfo(cveId, delay*2, retry+1)
data = getIDRInfo(cveId, delay, retry+1)
else:
print("Record Issue - URL - " + IDR_URL)
print("Record Timeout Issue - URL - " + IDR_URL)
# print(str(idr_result))
except Exception as e:
if retry < 10:
if delay > 179:
print("exception delay for: " + str(delay))
if retry < 14:
# if delay > 179:
print("Exception delay for: " + str(delay))
print(" --- " + IDR_URL)
time.sleep(delay)
data = getIDRInfo(cveId, delay*2, retry+1)
data = getIDRInfo(cveId, delay, retry+1)
else:
# print(str(idr_result))
print("Exception -- URL - " + IDR_URL)
print("Exception Failed -- get IDR info -- URL - " + IDR_URL)
print(str(e))
raise e
return data
Expand Down Expand Up @@ -1570,6 +1515,31 @@ def getRequesterMap():
return True


def getReferenceTagMap():
global reference_tag_map

if len(reference_tag_map) < 1 :
with open("ref_tag_map.json") as ref_tag_file:
reference_tag_map = json.load(ref_tag_file)
return True


def getV5ReferenceTagValue(v4Tag):
global reference_tag_map
v5Tag = "related"
v4Test = v4Tag.casefold()
refhit = False
for tagMap in reference_tag_map["referenceMaps"]:
if v4Test == tagMap["v4"].casefold():
v5Tag = tagMap["v5"]
refhit = True
break
if not refhit:
print("Missed Ref Tag: " + v4Tag)

return v5Tag


def call_idr_service(action, req_header, IDR_URL, params=None, content=None):
"""
:param action: GET, POST, ...
Expand Down
Loading

0 comments on commit fbf54b9

Please sign in to comment.