Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(iast): header source in werkzeug 3.1 #12213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions ddtrace/appsec/_iast/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,28 @@ def _on_flask_patch(flask_version):
"Headers.items",
functools.partial(if_iast_taint_yield_tuple_for, (OriginType.HEADER_NAME, OriginType.HEADER)),
)
_set_metric_iast_instrumented_source(OriginType.HEADER_NAME)
_set_metric_iast_instrumented_source(OriginType.HEADER)

try_wrap_function_wrapper(
"werkzeug.datastructures",
"ImmutableMultiDict.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.PARAMETER),
"EnvironHeaders.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.HEADER),
)
_set_metric_iast_instrumented_source(OriginType.PARAMETER)

# Since werkzeug 3.1.0 get doesn't call to __getitem__
try_wrap_function_wrapper(
"werkzeug.datastructures",
"EnvironHeaders.__getitem__",
"EnvironHeaders.get",
functools.partial(if_iast_taint_returned_object_for, OriginType.HEADER),
)
_set_metric_iast_instrumented_source(OriginType.HEADER_NAME)
_set_metric_iast_instrumented_source(OriginType.HEADER)

try_wrap_function_wrapper(
"werkzeug.datastructures",
"ImmutableMultiDict.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.PARAMETER),
)
_set_metric_iast_instrumented_source(OriginType.PARAMETER)

if flask_version >= (2, 0, 0):
# instance.query_string: raising an error on werkzeug/_internal.py "AttributeError: read only property"
try_wrap_function_wrapper("werkzeug.wrappers.request", "Request.__init__", _on_request_init)
Expand Down
7 changes: 7 additions & 0 deletions hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,13 @@ flask = ["~=2.2"]
python = ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
flask = ["~=3.0"]

[[envs.appsec_integrations_flask.matrix]]
# werkzeug 3.1 drops support for py3.8
python = ["3.11", "3.12", "3.13"]
flask = ["~=3.1"]
werkzeug = ["~=3.1"]

## ASM appsec_integrations_fastapi

[envs.appsec_integrations_fastapi]
template = "appsec_integrations_fastapi"
Expand Down
56 changes: 56 additions & 0 deletions tests/appsec/integrations/flask_tests/test_iast_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,62 @@ def sqli_2(param_str):
assert vulnerability["location"]["path"] == TEST_FILE_PATH
assert vulnerability["hash"] == hash_value

@pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST")
def test_flask_iast_enabled_http_request_header_get(self):
@self.app.route("/sqli/<string:param_str>/", methods=["GET", "POST"])
def sqli_2(param_str):
import sqlite3

from flask import request

from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect

con = sqlite3.connect(":memory:")
cur = con.cursor()
# label test_flask_iast_enabled_http_request_header_get
cur.execute(add_aspect("SELECT 1 FROM ", request.headers.get("User-Agent")))

return "OK", 200

with override_global_config(
dict(
_iast_enabled=True,
_iast_deduplication_enabled=False,
)
):
resp = self.client.post(
"/sqli/sqlite_master/", data={"name": "test"}, headers={"User-Agent": "sqlite_master"}
)
assert resp.status_code == 200

root_span = self.pop_spans()[0]
assert root_span.get_metric(IAST.ENABLED) == 1.0

loaded = json.loads(root_span.get_tag(IAST.JSON))
assert loaded["sources"] == [
{"origin": "http.request.header", "name": "User-Agent", "value": "sqlite_master"}
]

line, hash_value = get_line_and_hash(
"test_flask_iast_enabled_http_request_header_get",
VULN_SQL_INJECTION,
filename=TEST_FILE_PATH,
)
vulnerability = loaded["vulnerabilities"][0]

assert vulnerability["type"] == VULN_SQL_INJECTION
assert vulnerability["evidence"] == {
"valueParts": [
{"value": "SELECT "},
{"redacted": True},
{"value": " FROM "},
{"value": "sqlite_master", "source": 0},
]
}
assert vulnerability["location"]["line"] == line
assert vulnerability["location"]["path"] == TEST_FILE_PATH
assert vulnerability["hash"] == hash_value

@pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST")
def test_flask_full_sqli_iast_enabled_http_request_header_name_keys(self):
@self.app.route("/sqli/<string:param_str>/", methods=["GET", "POST"])
Expand Down
Loading