diff --git a/fuzz/corpus/fuzz_decoders/fuzz_decoders b/fuzz/corpus/fuzz_decoders/0 similarity index 100% rename from fuzz/corpus/fuzz_decoders/fuzz_decoders rename to fuzz/corpus/fuzz_decoders/0 diff --git a/fuzz/corpus/fuzz_form/fuzz_form b/fuzz/corpus/fuzz_form/0 similarity index 100% rename from fuzz/corpus/fuzz_form/fuzz_form rename to fuzz/corpus/fuzz_form/0 diff --git a/fuzz/corpus/fuzz_multipart_parser/0 b/fuzz/corpus/fuzz_multipart_parser/0 new file mode 100644 index 0000000..3dd943e --- /dev/null +++ b/fuzz/corpus/fuzz_multipart_parser/0 @@ -0,0 +1,5 @@ +--boundary +Content-Disposition: form-data; name="field" + +value +--boundary-- diff --git a/fuzz/corpus/fuzz_multipart_parser/1 b/fuzz/corpus/fuzz_multipart_parser/1 new file mode 100644 index 0000000..3ff9ec0 --- /dev/null +++ b/fuzz/corpus/fuzz_multipart_parser/1 @@ -0,0 +1,6 @@ +--x +Content-Disposition: form-data; name="a"; filename="..\\x" +Content-Transfer-Encoding: base64 + +%%%= +--x-- diff --git a/fuzz/corpus/fuzz_options_header/fuzz_options_header b/fuzz/corpus/fuzz_options_header/0 similarity index 100% rename from fuzz/corpus/fuzz_options_header/fuzz_options_header rename to fuzz/corpus/fuzz_options_header/0 diff --git a/fuzz/corpus/fuzz_querystring/0 b/fuzz/corpus/fuzz_querystring/0 new file mode 100644 index 0000000..118274d --- /dev/null +++ b/fuzz/corpus/fuzz_querystring/0 @@ -0,0 +1 @@ +a=b&c=d diff --git a/fuzz/corpus/fuzz_querystring/1 b/fuzz/corpus/fuzz_querystring/1 new file mode 100644 index 0000000..c4bf6b3 --- /dev/null +++ b/fuzz/corpus/fuzz_querystring/1 @@ -0,0 +1 @@ +&&a=%ZZ&=b&k&x==1 diff --git a/fuzz/fuzz_decoders.py b/fuzz/fuzz_decoders.py index 543c299..c9b845a 100644 --- a/fuzz/fuzz_decoders.py +++ b/fuzz/fuzz_decoders.py @@ -1,6 +1,9 @@ import io +import logging import sys +logging.disable(logging.CRITICAL) + import atheris from helpers import EnhancedDataProvider @@ -14,15 +17,40 @@ def fuzz_base64_decoder(fdp: EnhancedDataProvider) -> None: decoder.finalize() +def fuzz_base64_decoder_chunked(fdp: EnhancedDataProvider) -> None: + decoder = Base64Decoder(io.BytesIO()) + num_chunks = fdp.ConsumeIntInRange(1, 8) + body = fdp.ConsumeRandomBytes() + chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) + for i in range(0, len(body), chunk_size): + decoder.write(body[i : i + chunk_size]) + decoder.finalize() + + def fuzz_quoted_decoder(fdp: EnhancedDataProvider) -> None: decoder = QuotedPrintableDecoder(io.BytesIO()) decoder.write(fdp.ConsumeRandomBytes()) decoder.finalize() +def fuzz_quoted_decoder_chunked(fdp: EnhancedDataProvider) -> None: + decoder = QuotedPrintableDecoder(io.BytesIO()) + num_chunks = fdp.ConsumeIntInRange(1, 8) + body = fdp.ConsumeRandomBytes() + chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) + for i in range(0, len(body), chunk_size): + decoder.write(body[i : i + chunk_size]) + decoder.finalize() + + def TestOneInput(data: bytes) -> None: fdp = EnhancedDataProvider(data) - targets = [fuzz_base64_decoder, fuzz_quoted_decoder] + targets = [ + fuzz_base64_decoder, + fuzz_base64_decoder_chunked, + fuzz_quoted_decoder, + fuzz_quoted_decoder_chunked, + ] target = fdp.PickValueInList(targets) try: diff --git a/fuzz/fuzz_form.py b/fuzz/fuzz_form.py index 9a3d854..b81efcc 100644 --- a/fuzz/fuzz_form.py +++ b/fuzz/fuzz_form.py @@ -1,6 +1,8 @@ import io +import logging import sys -from unittest.mock import Mock + +logging.disable(logging.CRITICAL) import atheris from helpers import EnhancedDataProvider @@ -9,40 +11,68 @@ from python_multipart.exceptions import FormParserError from python_multipart.multipart import parse_form -on_field = Mock() -on_file = Mock() + +def _on_field(field) -> None: + pass + + +def _on_file(file) -> None: + pass def parse_octet_stream(fdp: EnhancedDataProvider) -> None: header = {"Content-Type": "application/octet-stream"} - parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) + parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) def parse_url_encoded(fdp: EnhancedDataProvider) -> None: - header = {"Content-Type": "application/x-url-encoded"} - parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) + ct = fdp.PickValueInList(["application/x-url-encoded", "application/x-www-form-urlencoded"]) + header = {"Content-Type": ct} + parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) -def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None: - header = {"Content-Type": "application/x-www-form-urlencoded"} - parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) +def parse_multipart_raw(fdp: EnhancedDataProvider) -> None: + # Boundary: 1-70 bytes, no CR/LF (RFC 2046 constraint kept to avoid ValueError). + boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) + boundary = fdp.ConsumeBytes(boundary_len) + boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" + header = {"Content-Type": "multipart/form-data; boundary=" + boundary.decode("latin-1")} + body = fdp.ConsumeRandomBytes() + parse_form(header, io.BytesIO(body), _on_field, _on_file) + +def parse_multipart_with_content_length(fdp: EnhancedDataProvider) -> None: + boundary = b"boundary" + content_length = fdp.ConsumeIntInRange(0, 1024) + header = { + "Content-Type": "multipart/form-data; boundary=boundary", + "Content-Length": str(content_length), + } + body = fdp.ConsumeRandomBytes() + parse_form(header, io.BytesIO(body), _on_field, _on_file) -def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None: - boundary = "boundary" - header = {"Content-Type": f"multipart/form-data; boundary={boundary}"} - body = ( - f"--{boundary}\r\n" - f"Content-Type: multipart/form-data; boundary={boundary}\r\n\r\n" - f"{fdp.ConsumeRandomString()}\r\n" - f"--{boundary}--\r\n" - ) - parse_form(header, io.BytesIO(body.encode("latin1", errors="ignore")), on_field, on_file) +def parse_form_urlencoded_chunked(fdp: EnhancedDataProvider) -> None: + from python_multipart.multipart import create_form_parser + + num_chunks = fdp.ConsumeIntInRange(1, 8) + header = {"Content-Type": "application/x-www-form-urlencoded"} + parser = create_form_parser(header, _on_field, _on_file) + body = fdp.ConsumeRandomBytes() + chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) + for i in range(0, len(body), chunk_size): + parser.write(body[i : i + chunk_size]) + parser.finalize() def TestOneInput(data: bytes) -> None: fdp = EnhancedDataProvider(data) - targets = [parse_octet_stream, parse_url_encoded, parse_form_urlencoded, parse_multipart_form_data] + targets = [ + parse_octet_stream, + parse_url_encoded, + parse_multipart_raw, + parse_multipart_with_content_length, + parse_form_urlencoded_chunked, + ] target = fdp.PickValueInList(targets) try: diff --git a/fuzz/fuzz_multipart_parser.py b/fuzz/fuzz_multipart_parser.py new file mode 100644 index 0000000..d31c8d4 --- /dev/null +++ b/fuzz/fuzz_multipart_parser.py @@ -0,0 +1,103 @@ +import logging +import sys + +logging.disable(logging.CRITICAL) + +import atheris +from helpers import EnhancedDataProvider + +with atheris.instrument_imports(): + from python_multipart.exceptions import MultipartParseError + from python_multipart.multipart import MultipartParser + + +def _noop() -> None: + pass + + +def _noop_data(data: bytes, start: int, end: int) -> None: + pass + + +def _make_parser(boundary: bytes, max_size: float = float("inf")) -> MultipartParser: + return MultipartParser( + boundary, + callbacks={ + "on_part_begin": _noop, + "on_part_data": _noop_data, + "on_part_end": _noop, + "on_header_begin": _noop, + "on_header_field": _noop_data, + "on_header_value": _noop_data, + "on_header_end": _noop, + "on_headers_finished": _noop, + "on_end": _noop, + }, + max_size=max_size, + ) + + +def fuzz_single_write(fdp: EnhancedDataProvider) -> None: + boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) + boundary = fdp.ConsumeBytes(boundary_len) + # Drop CR/LF to avoid ValueError from MultipartParser boundary validation. + boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" + + parser = _make_parser(boundary) + parser.write(fdp.ConsumeRandomBytes()) + parser.finalize() + + +def fuzz_chunked_write(fdp: EnhancedDataProvider) -> None: + boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 3))) + boundary = fdp.ConsumeBytes(boundary_len) + boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" + + num_chunks = fdp.ConsumeIntInRange(1, 16) + parser = _make_parser(boundary) + body = fdp.ConsumeRandomBytes() + if body: + chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) + for i in range(0, len(body), chunk_size): + parser.write(body[i : i + chunk_size]) + parser.finalize() + + +def fuzz_max_size(fdp: EnhancedDataProvider) -> None: + boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) + boundary = fdp.ConsumeBytes(boundary_len) + boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" + + max_size = fdp.ConsumeIntInRange(1, 2048) + parser = _make_parser(boundary, max_size=max_size) + parser.write(fdp.ConsumeRandomBytes()) + parser.finalize() + + +def fuzz_invalid_boundary_constructor(fdp: EnhancedDataProvider) -> None: + boundary_len = fdp.ConsumeIntInRange(0, min(70, fdp.remaining_bytes())) + boundary = fdp.ConsumeBytes(boundary_len) + try: + _make_parser(boundary) + except ValueError: + return + + +def TestOneInput(data: bytes) -> None: + fdp = EnhancedDataProvider(data) + targets = [fuzz_single_write, fuzz_chunked_write, fuzz_max_size, fuzz_invalid_boundary_constructor] + target = fdp.PickValueInList(targets) + + try: + target(fdp) + except MultipartParseError: + return + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/fuzz/fuzz_options_header.py b/fuzz/fuzz_options_header.py index 2546eaf..8fa0a95 100644 --- a/fuzz/fuzz_options_header.py +++ b/fuzz/fuzz_options_header.py @@ -1,5 +1,8 @@ +import logging import sys +logging.disable(logging.CRITICAL) + import atheris from helpers import EnhancedDataProvider @@ -7,14 +10,25 @@ from python_multipart.multipart import parse_options_header +def fuzz_bytes_input(fdp: EnhancedDataProvider) -> None: + # WSGI: bytes received from the network, decoded as latin-1 inside the function. + parse_options_header(fdp.ConsumeRandomBytes()) + + +def fuzz_string_input(fdp: EnhancedDataProvider) -> None: + # Simulate a caller that already decoded the header value as latin-1. + raw = fdp.ConsumeRandomBytes() + parse_options_header(raw.decode("latin-1")) + + +def fuzz_none_input(fdp: EnhancedDataProvider) -> None: + parse_options_header(None) + + def TestOneInput(data: bytes) -> None: fdp = EnhancedDataProvider(data) - try: - parse_options_header(fdp.ConsumeRandomBytes()) - except AssertionError: - return - except TypeError: - return + target = fdp.PickValueInList([fuzz_bytes_input, fuzz_string_input, fuzz_none_input]) + target(fdp) def main(): diff --git a/fuzz/fuzz_querystring.py b/fuzz/fuzz_querystring.py new file mode 100644 index 0000000..af638ce --- /dev/null +++ b/fuzz/fuzz_querystring.py @@ -0,0 +1,95 @@ +import logging +import sys + +logging.disable(logging.CRITICAL) + +import atheris +from helpers import EnhancedDataProvider + +with atheris.instrument_imports(): + from python_multipart.exceptions import QuerystringParseError + from python_multipart.multipart import QuerystringParser + + +def _noop_data(data: bytes, start: int, end: int) -> None: + pass + + +def _noop() -> None: + pass + + +def fuzz_single_write(fdp: EnhancedDataProvider) -> None: + strict = fdp.ConsumeBool() + parser = QuerystringParser( + callbacks={ + "on_field_start": _noop, + "on_field_name": _noop_data, + "on_field_data": _noop_data, + "on_field_end": _noop, + "on_end": _noop, + }, + strict_parsing=strict, + ) + parser.write(fdp.ConsumeRandomBytes()) + parser.finalize() + + +def fuzz_chunked_write(fdp: EnhancedDataProvider) -> None: + strict = fdp.ConsumeBool() + num_chunks = fdp.ConsumeIntInRange(1, 8) + parser = QuerystringParser( + callbacks={ + "on_field_start": _noop, + "on_field_name": _noop_data, + "on_field_data": _noop_data, + "on_field_end": _noop, + "on_end": _noop, + }, + strict_parsing=strict, + ) + body = fdp.ConsumeRandomBytes() + chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) + for i in range(0, len(body), chunk_size): + parser.write(body[i : i + chunk_size]) + parser.finalize() + + +def fuzz_max_size(fdp: EnhancedDataProvider) -> None: + body = fdp.ConsumeRandomBytes() + body_len = max(1, len(body)) + # Pick max_size anywhere from 1 byte up to 2× the body — covers both + # "truncate heavily" and "allow everything through" branches. + max_size = fdp.ConsumeIntInRange(1, body_len * 2) + parser = QuerystringParser( + callbacks={ + "on_field_start": _noop, + "on_field_name": _noop_data, + "on_field_data": _noop_data, + "on_field_end": _noop, + "on_end": _noop, + }, + max_size=max_size, + ) + parser.write(body) + parser.finalize() + + +def TestOneInput(data: bytes) -> None: + fdp = EnhancedDataProvider(data) + targets = [fuzz_single_write, fuzz_chunked_write, fuzz_max_size] + target = fdp.PickValueInList(targets) + + try: + target(fdp) + except QuerystringParseError: + return + + +def main(): + atheris.Setup(sys.argv, TestOneInput) + atheris.Fuzz() + + +if __name__ == "__main__": + main() diff --git a/fuzz/helpers.py b/fuzz/helpers.py index 7fcd45c..1ed702a 100644 --- a/fuzz/helpers.py +++ b/fuzz/helpers.py @@ -7,3 +7,6 @@ def ConsumeRandomBytes(self) -> bytes: def ConsumeRandomString(self) -> str: return self.ConsumeUnicodeNoSurrogates(self.ConsumeIntInRange(0, self.remaining_bytes())) + + def ConsumeBool(self) -> bool: + return bool(self.ConsumeInt(1))