Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightfuzz - Add many comments #2189

Merged
merged 7 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bbot/modules/lightfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def interactsh_callback(self, r):
details = self.interactsh_subdomain_tags.get(full_id.split(".")[0])
if not details["event"]:
return
# currently, this is only used by the cmdi submodule. Later, when other modules use it, we will need to store description data in the interactsh_subdomain_tags dictionary
await self.emit_event(
{
"severity": "CRITICAL",
Expand Down Expand Up @@ -107,6 +108,19 @@ def _outgoing_dedup_hash(self, event):
)

def url_unparse(self, param_type, parsed_url):
"""
Reconstructs a URL from its components, optionally omitting the query string for GET parameters.

Parameters:
- param_type (str): The type of parameter, typically "GETPARAM" or another type indicating the request method.
- parsed_url (ParseResult): A named tuple containing the components of the URL (scheme, netloc, path, params, query, fragment).

Returns:
- str: The reconstructed URL as a string.

The method checks if the parameter type is "GETPARAM". If so, it omits the query string from the reconstructed URL unless
the retain_querystring flag is set to True. For other parameter types, it includes the query string.
"""
if param_type == "GETPARAM":
querystring = ""
else:
Expand Down Expand Up @@ -148,6 +162,7 @@ async def handle_event(self, event):
if self.config.get("force_common_headers", False) is False:
return False

# If force_common_headers is True, we force the emission of a WEB_PARAMETER for each of the common headers to force fuzzing against them
for h in self.common_headers:
description = f"Speculative (Forced) Header [{h}]"
data = {
Expand Down Expand Up @@ -191,6 +206,7 @@ async def finish(self):
except InteractshError as e:
self.debug(f"Error in interact.sh: {e}")

# If we've disabled fuzzing POST parameters, back out of POSTPARAM WEB_PARAMETER events as quickly as possible
async def filter_event(self, event):
if event.type == "WEB_PARAMETER" and self.disable_post and event.data["type"] == "POSTPARAM":
return False, "POST parameter disabled in lightfuzz module"
Expand Down
44 changes: 38 additions & 6 deletions bbot/modules/lightfuzz_submodules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,27 @@ def __init__(self, lightfuzz, event):
self.event = event
self.results = []

# WEB_PARAMETERs may contain additional_params (e.g. other parameters in the same form or query string). These will be sent unchanged along with the probe.

def additional_params_process(self, additional_params, additional_params_populate_blank_empty):
"""
Processes additional parameters by populating blank or empty values with random strings if specified.

Parameters:
- additional_params (dict): A dictionary of additional parameters to process.
- additional_params_populate_blank_empty (bool): If True, populates blank or empty parameter values with random numeric strings.

Returns:
- dict: A dictionary with processed additional parameters, where blank or empty values are replaced with random strings if specified.

The function iterates over the provided additional parameters and replaces any blank or empty values with a random numeric string
of length 10, if the flag is set to True. Otherwise, it returns the parameters unchanged.
"""
if additional_params_populate_blank_empty is False:
return additional_params
new_additional_params = {}
for k, v in additional_params.items():
if v == "" or v is None:
if v == "" or v is None: # if the value is blank or empty, and additional_params_populate_blank_empty is True, populate with a random string
new_additional_params[k] = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
else:
new_additional_params[k] = v
Expand All @@ -21,6 +36,11 @@ def additional_params_process(self, additional_params, additional_params_populat
def compare_baseline(
self, event_type, probe, cookies, additional_params_populate_empty=False, speculative_mode="GETPARAM"
):
"""
Initializes the http_compare object and executes a probe to establish a baseline for comparison.

Handles each of the types of WEB_PARAMETERS (GETPARAM, COOKIE, HEADER, POSTPARAM, BODYJSON)
"""
probe = self.outgoing_probe_value(probe)
http_compare = None

Expand Down Expand Up @@ -71,6 +91,9 @@ def compare_baseline(
return http_compare

async def baseline_probe(self, cookies):
"""
Executes a baseline probe to establish a baseline for comparison.
"""
if self.event.data.get("eventtype") in ["POSTPARAM", "BODYJSON"]:
method = "POST"
else:
Expand All @@ -95,8 +118,12 @@ async def compare_probe(
additional_params_override={},
speculative_mode="GETPARAM",
):
"""
Executes a probe to compare against a baseline.
"""
probe = self.outgoing_probe_value(probe)
additional_params = copy.deepcopy(self.event.data.get("additional_params", {}))
additional_params = copy.deepcopy(self.event.data.get("additional_params", {}))
# Create a complete copy to avoid modifying the original additional_params
if additional_params_override:
for k, v in additional_params_override.items():
additional_params[k] = v
Expand Down Expand Up @@ -142,6 +169,9 @@ async def standard_probe(
speculative_mode="GETPARAM",
allow_redirects=False,
):
"""
Send a probe to the target URL, abstracting away the details associated with each WEB_PARAMETER type.
"""

probe = self.outgoing_probe_value(probe)

Expand Down Expand Up @@ -214,6 +244,9 @@ def metadata(self):
return metadata_string

def incoming_probe_value(self, populate_empty=True):
"""
Transparently modifies the incoming probe value (the original value of the WEB_PARAMETER), given any envelopes that may have been identified, so that fuzzing within the envelopes can occur.
"""
envelopes = getattr(self.event, "envelopes", None)
probe_value = ""
if envelopes is not None:
Expand All @@ -224,14 +257,13 @@ def incoming_probe_value(self, populate_empty=True):
probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
else:
probe_value = ""
# if not isinstance(probe_value, str):
# raise ValueError(
# f"incoming_probe_value should always be a string (got {type(probe_value)} / {probe_value})"
# )
probe_value = str(probe_value)
return probe_value

def outgoing_probe_value(self, outgoing_probe_value):
"""
Transparently modifies the outgoing probe value (fuzz probe being sent to the target), given any envelopes that may have been identified, so that fuzzing within the envelopes can occur.
"""
self.lightfuzz.debug(f"outgoing_probe_value (before packing): {outgoing_probe_value} / {self.event}")
envelopes = getattr(self.event, "envelopes", None)
if envelopes is not None:
Expand Down
20 changes: 13 additions & 7 deletions bbot/modules/lightfuzz_submodules/cmdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

class CmdILightfuzz(BaseLightfuzz):
async def fuzz(self):
cookies = self.event.data.get("assigned_cookies", {})
cookies = self.event.data.get("assigned_cookies", {}) # Retrieve assigned cookies from WEB_PARAMETER event data, if present
probe_value = self.incoming_probe_value()

canary = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies)
http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies) # Initialize the http_compare object and establish a baseline HTTP response

cmdi_probe_strings = [
"AAAA",
"AAAA", # False positive probe
";",
"&&",
"||",
Expand All @@ -24,19 +24,23 @@ async def fuzz(self):
positive_detections = []
for p in cmdi_probe_strings:
try:
# add "echo" to the cmdi probe value to construct the command to be executed
echo_probe = f"{probe_value}{p} echo {canary} {p}"
if self.event.data["type"] == "GETPARAM":
echo_probe = urllib.parse.quote(echo_probe.encode(), safe="")
# send cmdi probe and compare with baseline response
cmdi_probe = await self.compare_probe(http_compare, self.event.data["type"], echo_probe, cookies)
# ensure we received an HTTP response
if cmdi_probe[3]:
# check if the canary is in the response and the word "echo" is NOT in the response text, ruling out mere reflection of the entire probe value without execution
if canary in cmdi_probe[3].text and "echo" not in cmdi_probe[3].text:
self.lightfuzz.debug(f"canary [{canary}] found in response when sending probe [{p}]")
if p == "AAAA":
if p == "AAAA": # Handle detection false positive probe
self.lightfuzz.warning(
f"False Postive Probe appears to have been triggered for {self.event.data['url']}, aborting remaining detection"
)
return
positive_detections.append(p)
positive_detections.append(p) # Add detected probes to positive detections
except HttpCompareError as e:
self.lightfuzz.debug(e)
continue
Expand All @@ -49,21 +53,23 @@ async def fuzz(self):
)

# Blind OS Command Injection

if self.lightfuzz.interactsh_instance:
self.lightfuzz.event_dict[self.event.data["url"]] = self.event
self.lightfuzz.event_dict[self.event.data["url"]] = self.event # Store the event associated with the URL
for p in cmdi_probe_strings:
# generate a random subdomain tag and associate it with the event, type, name, and probe
subdomain_tag = self.lightfuzz.helpers.rand_string(4, digits=False)
self.lightfuzz.interactsh_subdomain_tags[subdomain_tag] = {
"event": self.event,
"type": self.event.data["type"],
"name": self.event.data["name"],
"probe": p,
}
# payload is an nslookup command that includes the interactsh domain prepended the previously generated subdomain tag
interactsh_probe = f"{p} nslookup {subdomain_tag}.{self.lightfuzz.interactsh_domain} {p}"

if self.event.data["type"] == "GETPARAM":
interactsh_probe = urllib.parse.quote(interactsh_probe.encode(), safe="")
# we send the probe here, and any positive detections are processed in the interactsh_callback defined in lightfuzz.py
await self.standard_probe(
self.event.data["type"], cookies, f"{probe_value}{interactsh_probe}", timeout=15
)
Loading
Loading