Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
5 changes: 5 additions & 0 deletions fuzz/corpus/fuzz_multipart_parser/0
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
--boundary
Content-Disposition: form-data; name="field"

value
--boundary--
6 changes: 6 additions & 0 deletions fuzz/corpus/fuzz_multipart_parser/1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
--x
Content-Disposition: form-data; name="a"; filename="..\\x"
Content-Transfer-Encoding: base64

%%%=
--x--
1 change: 1 addition & 0 deletions fuzz/corpus/fuzz_querystring/0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a=b&c=d
1 change: 1 addition & 0 deletions fuzz/corpus/fuzz_querystring/1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
&&a=%ZZ&=b&k&x==1
30 changes: 29 additions & 1 deletion fuzz/fuzz_decoders.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import io
import logging
import sys

logging.disable(logging.CRITICAL)

import atheris
from helpers import EnhancedDataProvider

Expand All @@ -14,15 +17,40 @@ def fuzz_base64_decoder(fdp: EnhancedDataProvider) -> None:
decoder.finalize()


def fuzz_base64_decoder_chunked(fdp: EnhancedDataProvider) -> None:
    """Feed fuzz input to Base64Decoder split across several write() calls."""
    sink = Base64Decoder(io.BytesIO())
    pieces = fdp.ConsumeIntInRange(1, 8)
    payload = fdp.ConsumeRandomBytes()
    step = max(1, -(-len(payload) // pieces))  # ceiling division
    offset = 0
    while offset < len(payload):
        sink.write(payload[offset : offset + step])
        offset += step
    sink.finalize()


def fuzz_quoted_decoder(fdp: EnhancedDataProvider) -> None:
    """Run QuotedPrintableDecoder over one arbitrary byte blob."""
    sink = QuotedPrintableDecoder(io.BytesIO())
    blob = fdp.ConsumeRandomBytes()
    sink.write(blob)
    sink.finalize()


def fuzz_quoted_decoder_chunked(fdp: EnhancedDataProvider) -> None:
    """Feed fuzz input to QuotedPrintableDecoder across several write() calls."""
    sink = QuotedPrintableDecoder(io.BytesIO())
    pieces = fdp.ConsumeIntInRange(1, 8)
    payload = fdp.ConsumeRandomBytes()
    step = max(1, -(-len(payload) // pieces))  # ceiling division
    offset = 0
    while offset < len(payload):
        sink.write(payload[offset : offset + step])
        offset += step
    sink.finalize()


def TestOneInput(data: bytes) -> None:
fdp = EnhancedDataProvider(data)
targets = [fuzz_base64_decoder, fuzz_quoted_decoder]
targets = [
fuzz_base64_decoder,
fuzz_base64_decoder_chunked,
fuzz_quoted_decoder,
fuzz_quoted_decoder_chunked,
]
target = fdp.PickValueInList(targets)

try:
Expand Down
70 changes: 50 additions & 20 deletions fuzz/fuzz_form.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import io
import logging
import sys
from unittest.mock import Mock

logging.disable(logging.CRITICAL)

import atheris
from helpers import EnhancedDataProvider
Expand All @@ -9,40 +11,68 @@
from python_multipart.exceptions import FormParserError
from python_multipart.multipart import parse_form

on_field = Mock()
on_file = Mock()

def _on_field(field) -> None:
pass


def _on_file(file) -> None:
pass


def parse_octet_stream(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/octet-stream"}
parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file)


def parse_url_encoded(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/x-url-encoded"}
parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
ct = fdp.PickValueInList(["application/x-url-encoded", "application/x-www-form-urlencoded"])
header = {"Content-Type": ct}
parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file)


def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/x-www-form-urlencoded"}
parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
def parse_multipart_raw(fdp: EnhancedDataProvider) -> None:
    """Parse an arbitrary body as multipart/form-data with a fuzzed boundary.

    The boundary is sanitized (1-70 bytes, no CR/LF, no trailing whitespace)
    so header construction does not fail before the parser gets to run.
    """
    upper = max(1, min(70, fdp.remaining_bytes() // 2))
    raw = fdp.ConsumeBytes(fdp.ConsumeIntInRange(1, upper))
    cleaned = raw.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t")
    if not cleaned:
        cleaned = b"B"
    content_type = "multipart/form-data; boundary=" + cleaned.decode("latin-1")
    payload = fdp.ConsumeRandomBytes()
    parse_form({"Content-Type": content_type}, io.BytesIO(payload), _on_field, _on_file)


def parse_multipart_with_content_length(fdp: EnhancedDataProvider) -> None:
    """Parse multipart input while advertising a fuzzed Content-Length header.

    The advertised length (0-1024) is chosen independently of the actual body
    size, so the parser sees both under- and over-stated lengths.
    """
    # Fix: the original assigned an unused local `boundary = b"boundary"`;
    # the header below already hard-codes the boundary string.
    content_length = fdp.ConsumeIntInRange(0, 1024)
    header = {
        "Content-Type": "multipart/form-data; boundary=boundary",
        "Content-Length": str(content_length),
    }
    body = fdp.ConsumeRandomBytes()
    parse_form(header, io.BytesIO(body), _on_field, _on_file)

def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None:
    """Parse a synthetic, well-framed multipart body with fuzzed part content."""
    boundary = "boundary"
    header = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
    # Build a single-part body: opening boundary, a part header block,
    # fuzz-provided part content, then the closing boundary.
    body = (
        f"--{boundary}\r\n"
        f"Content-Type: multipart/form-data; boundary={boundary}\r\n\r\n"
        f"{fdp.ConsumeRandomString()}\r\n"
        f"--{boundary}--\r\n"
    )
    # errors="ignore" drops characters not representable in latin-1 so the
    # encode step cannot raise; on_field/on_file are module-level callbacks.
    parse_form(header, io.BytesIO(body.encode("latin1", errors="ignore")), on_field, on_file)

def parse_form_urlencoded_chunked(fdp: EnhancedDataProvider) -> None:
    """Feed urlencoded form data to the parser across several write() calls."""
    from python_multipart.multipart import create_form_parser

    pieces = fdp.ConsumeIntInRange(1, 8)
    parser = create_form_parser(
        {"Content-Type": "application/x-www-form-urlencoded"}, _on_field, _on_file
    )
    payload = fdp.ConsumeRandomBytes()
    step = max(1, -(-len(payload) // pieces))  # ceiling division
    offset = 0
    while offset < len(payload):
        parser.write(payload[offset : offset + step])
        offset += step
    parser.finalize()

def TestOneInput(data: bytes) -> None:
fdp = EnhancedDataProvider(data)
targets = [parse_octet_stream, parse_url_encoded, parse_form_urlencoded, parse_multipart_form_data]
targets = [
parse_octet_stream,
parse_url_encoded,
parse_multipart_raw,
parse_multipart_with_content_length,
parse_form_urlencoded_chunked,
]
target = fdp.PickValueInList(targets)

try:
Expand Down
103 changes: 103 additions & 0 deletions fuzz/fuzz_multipart_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import logging
import sys

logging.disable(logging.CRITICAL)

import atheris
from helpers import EnhancedDataProvider

with atheris.instrument_imports():
from python_multipart.exceptions import MultipartParseError
from python_multipart.multipart import MultipartParser


def _noop() -> None:
pass


def _noop_data(data: bytes, start: int, end: int) -> None:
pass


def _make_parser(boundary: bytes, max_size: float = float("inf")) -> MultipartParser:
    """Create a MultipartParser whose callbacks all discard their input."""
    simple_events = (
        "on_part_begin",
        "on_part_end",
        "on_header_begin",
        "on_header_end",
        "on_headers_finished",
        "on_end",
    )
    data_events = ("on_part_data", "on_header_field", "on_header_value")
    callbacks = {event: _noop for event in simple_events}
    for event in data_events:
        callbacks[event] = _noop_data
    return MultipartParser(boundary, callbacks=callbacks, max_size=max_size)


def fuzz_single_write(fdp: EnhancedDataProvider) -> None:
    """Drive the parser with a fuzzed boundary and the whole body in one write."""
    limit = max(1, min(70, fdp.remaining_bytes() // 2))
    raw = fdp.ConsumeBytes(fdp.ConsumeIntInRange(1, limit))
    # CR/LF would make MultipartParser reject the boundary with ValueError.
    sanitized = raw.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t")
    parser = _make_parser(sanitized if sanitized else b"B")
    parser.write(fdp.ConsumeRandomBytes())
    parser.finalize()


def fuzz_chunked_write(fdp: EnhancedDataProvider) -> None:
    """Drive the parser with the body split across several write() calls.

    Chunked delivery exercises parser state transitions across buffer
    boundaries, which a single write() cannot reach.
    """
    boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 3)))
    boundary = fdp.ConsumeBytes(boundary_len)
    # CR/LF are invalid in a boundary and would raise before parsing starts.
    boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B"

    num_chunks = fdp.ConsumeIntInRange(1, 16)
    parser = _make_parser(boundary)
    body = fdp.ConsumeRandomBytes()
    # Fix: dropped the redundant `if body:` guard — an empty body yields an
    # empty range and simply skips the loop, matching the other chunked targets.
    chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks)
    for i in range(0, len(body), chunk_size):
        parser.write(body[i : i + chunk_size])
    parser.finalize()


def fuzz_max_size(fdp: EnhancedDataProvider) -> None:
    """Drive the parser with a small fuzzed max_size limit in effect."""
    limit = max(1, min(70, fdp.remaining_bytes() // 2))
    raw = fdp.ConsumeBytes(fdp.ConsumeIntInRange(1, limit))
    # CR/LF would make MultipartParser reject the boundary with ValueError.
    sanitized = raw.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t")

    cap = fdp.ConsumeIntInRange(1, 2048)
    parser = _make_parser(sanitized if sanitized else b"B", max_size=cap)
    parser.write(fdp.ConsumeRandomBytes())
    parser.finalize()


def fuzz_invalid_boundary_constructor(fdp: EnhancedDataProvider) -> None:
    """Feed raw, possibly invalid boundaries straight to the constructor.

    Unlike the other targets the boundary is not sanitized here; a ValueError
    from boundary validation is the expected, uninteresting outcome.
    """
    length = fdp.ConsumeIntInRange(0, min(70, fdp.remaining_bytes()))
    candidate = fdp.ConsumeBytes(length)
    try:
        _make_parser(candidate)
    except ValueError:
        pass


def TestOneInput(data: bytes) -> None:
    """Atheris entry point: dispatch the fuzz input to one target."""
    fdp = EnhancedDataProvider(data)
    chosen = fdp.PickValueInList(
        [fuzz_single_write, fuzz_chunked_write, fuzz_max_size, fuzz_invalid_boundary_constructor]
    )
    try:
        chosen(fdp)
    except MultipartParseError:
        # Parse errors are expected for malformed input; only crashes matter.
        pass


def main():
    # Register the fuzz entry point with atheris and hand over control.
    atheris.Setup(sys.argv, TestOneInput)
    atheris.Fuzz()


if __name__ == "__main__":
    main()
26 changes: 20 additions & 6 deletions fuzz/fuzz_options_header.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
import logging
import sys

logging.disable(logging.CRITICAL)

import atheris
from helpers import EnhancedDataProvider

with atheris.instrument_imports():
from python_multipart.multipart import parse_options_header


def fuzz_bytes_input(fdp: EnhancedDataProvider) -> None:
    """Exercise parse_options_header with raw bytes (the WSGI-style path)."""
    raw = fdp.ConsumeRandomBytes()
    parse_options_header(raw)


def fuzz_string_input(fdp: EnhancedDataProvider) -> None:
    """Exercise parse_options_header with a pre-decoded str value."""
    # Mirror a caller that already decoded the raw header bytes as latin-1.
    parse_options_header(fdp.ConsumeRandomBytes().decode("latin-1"))


def fuzz_none_input(fdp: EnhancedDataProvider) -> None:
    """Exercise the None input path; fdp is unused but keeps the target signature uniform."""
    parse_options_header(None)


def TestOneInput(data: bytes) -> None:
fdp = EnhancedDataProvider(data)
try:
parse_options_header(fdp.ConsumeRandomBytes())
except AssertionError:
return
except TypeError:
return
target = fdp.PickValueInList([fuzz_bytes_input, fuzz_string_input, fuzz_none_input])
target(fdp)


def main():
Expand Down
Loading