diff --git a/src/requests/utils.py b/src/requests/utils.py index 42238375c8..3d0fa88673 100644 --- a/src/requests/utils.py +++ b/src/requests/utils.py @@ -73,44 +73,65 @@ if sys.platform == "win32": # provide a proxy_bypass version on Windows without DNS lookups - def proxy_bypass_registry(host): + def _get_internet_settings_key(): try: import winreg except ImportError: - return False + return None try: - internetSettings = winreg.OpenKey( + return winreg.OpenKey( winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Internet Settings", ) - # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it - proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0]) - # ProxyOverride is almost always a string - proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0] except (OSError, ValueError): - return False - if not proxyEnable or not proxyOverride: - return False + return None + + def _read_proxy_settings(internet_settings): + import winreg + try: + proxy_enable = int( + winreg.QueryValueEx(internet_settings, "ProxyEnable")[0] + ) + proxy_override = winreg.QueryValueEx(internet_settings, "ProxyOverride")[0] + except (OSError, ValueError): + return None, None + + return proxy_enable, proxy_override + + def _iter_proxy_override_patterns(proxy_override): # make a check value list from the registry entry: replace the # '' string by the localhost entry and the corresponding # canonical entry. - proxyOverride = proxyOverride.split(";") + overrides = proxy_override.split(";") # filter out empty strings to avoid re.match return true in the following code. - proxyOverride = filter(None, proxyOverride) - # now check if we match one of the registry values. - for test in proxyOverride: + return filter(None, overrides) + + def _host_matches_proxy_override(host, proxy_override): + for test in _iter_proxy_override_patterns(proxy_override): if test == "": if "." not in host: return True - test = test.replace(".", r"\.") # mask dots - test = test.replace("*", r".*") # change glob sequence - test = test.replace("?", r".") # change glob char - if re.match(test, host, re.I): + continue + pattern = test.replace(".", r"\.") # mask dots + pattern = pattern.replace("*", r".*") # change glob sequence + pattern = pattern.replace("?", r".") # change glob char + if re.match(pattern, host, re.I): return True return False + def proxy_bypass_registry(host): + internet_settings = _get_internet_settings_key() + if internet_settings is None: + return False + + proxy_enable, proxy_override = _read_proxy_settings(internet_settings) + if not proxy_enable or not proxy_override: + return False + + return _host_matches_proxy_override(host, proxy_override) + def proxy_bypass(host): # noqa """Return True, if the host should be bypassed. @@ -132,70 +153,100 @@ def dict_to_sequence(d): return d -def super_len(o): - total_length = None - current_position = 0 - +def _coerce_body_to_bytes_if_needed(o): if not is_urllib3_1 and isinstance(o, str): # urllib3 2.x+ treats all strings as utf-8 instead # of latin-1 (iso-8859-1) like http.client. - o = o.encode("utf-8") + return o.encode("utf-8") + return o + +def _length_from_attr(o): if hasattr(o, "__len__"): - total_length = len(o) + return len(o) + if hasattr(o, "len"): + return o.len + return None - elif hasattr(o, "len"): - total_length = o.len - elif hasattr(o, "fileno"): - try: - fileno = o.fileno() - except (io.UnsupportedOperation, AttributeError): - # AttributeError is a surprising exception, seeing as how we've just checked - # that `hasattr(o, 'fileno')`. It happens for objects obtained via - # `Tarfile.extractfile()`, per issue 5229. - pass - else: - total_length = os.fstat(fileno).st_size - - # Having used fstat to determine the file length, we need to - # confirm that this file was opened up in binary mode. - if "b" not in o.mode: - warnings.warn( - ( - "Requests has determined the content-length for this " - "request using the binary size of the file: however, the " - "file has been opened in text mode (i.e. without the 'b' " - "flag in the mode). This may lead to an incorrect " - "content-length. In Requests 3.0, support will be removed " - "for files in text mode." - ), - FileModeWarning, - ) - - if hasattr(o, "tell"): - try: - current_position = o.tell() - except OSError: - # This can happen in some weird situations, such as when the file - # is actually a special file descriptor like stdin. In this - # instance, we don't know what the length is, so set it to zero and - # let requests chunk it instead. - if total_length is not None: - current_position = total_length - else: - if hasattr(o, "seek") and total_length is None: - # StringIO and BytesIO have seek but no usable fileno - try: - # seek to end of file - o.seek(0, 2) - total_length = o.tell() - - # seek back to current position to support - # partially read file-like objects - o.seek(current_position or 0) - except OSError: - total_length = 0 +def _length_from_fileno(o): + if not hasattr(o, "fileno"): + return None + + try: + fileno = o.fileno() + except (io.UnsupportedOperation, AttributeError): + # AttributeError is a surprising exception, seeing as how we've just checked + # that `hasattr(o, 'fileno')`. It happens for objects obtained via + # `Tarfile.extractfile()`, per issue 5229. + return None + + total_length = os.fstat(fileno).st_size + + # Having used fstat to determine the file length, we need to + # confirm that this file was opened up in binary mode. + if "b" not in o.mode: + warnings.warn( + ( + "Requests has determined the content-length for this " + "request using the binary size of the file: however, the " + "file has been opened in text mode (i.e. without the 'b' " + "flag in the mode). This may lead to an incorrect " + "content-length. In Requests 3.0, support will be removed " + "for files in text mode." + ), + FileModeWarning, + ) + return total_length + + +def _current_position_and_total_length(o, total_length): + current_position = 0 + if not hasattr(o, "tell"): + return current_position, total_length + + try: + current_position = o.tell() + except OSError: + # This can happen in some weird situations, such as when the file + # is actually a special file descriptor like stdin. In this + # instance, we don't know what the length is, so set it to zero and + # let requests chunk it instead. + if total_length is not None: + current_position = total_length + return current_position, total_length + + if not hasattr(o, "seek") or total_length is not None: + return current_position, total_length + + # StringIO and BytesIO have seek but no usable fileno + try: + # seek to end of file + o.seek(0, 2) + total_length = o.tell() + + # seek back to current position to support + # partially read file-like objects + o.seek(current_position or 0) + except OSError: + total_length = 0 + + return current_position, total_length + + +def super_len(o): + total_length = None + + o = _coerce_body_to_bytes_if_needed(o) + + total_length = _length_from_attr(o) + + if total_length is None: + total_length = _length_from_fileno(o) + + current_position, total_length = _current_position_and_total_length( + o, total_length + ) if total_length is None: total_length = 0 @@ -203,54 +254,72 @@ def super_len(o): return max(0, total_length - current_position) -def get_netrc_auth(url, raise_errors=False): - """Returns the Requests tuple auth for a given url from netrc.""" - +def _iter_netrc_locations(): netrc_file = os.environ.get("NETRC") if netrc_file is not None: - netrc_locations = (netrc_file,) - else: - netrc_locations = (f"~/{f}" for f in NETRC_FILES) + return (netrc_file,) + return (f"~/{f}" for f in NETRC_FILES) + +def _find_existing_netrc_path(): + netrc_locations = _iter_netrc_locations() + + for candidate in netrc_locations: + loc = os.path.expanduser(candidate) + if os.path.exists(loc): + return loc + return None + + +def _load_netrc_auth(netrc_path, host, raise_errors): try: from netrc import NetrcParseError, netrc - netrc_path = None + _netrc = netrc(netrc_path).authenticators(host) + if _netrc and any(_netrc): + # Return with login / password + login_i = 0 if _netrc[0] else 1 + return (_netrc[login_i], _netrc[2]) + except (NetrcParseError, OSError): + # If there was a parsing error or a permissions issue reading the file, + # we'll just skip netrc auth unless explicitly asked to raise errors. + if raise_errors: + raise + return None - for f in netrc_locations: - loc = os.path.expanduser(f) - if os.path.exists(loc): - netrc_path = loc - break - # Abort early if there isn't one. - if netrc_path is None: - return +def get_netrc_auth(url, raise_errors=False): + """Returns the Requests tuple auth for a given url from netrc.""" + + netrc_path = _find_existing_netrc_path() - ri = urlparse(url) - host = ri.hostname + # Abort early if there isn't one. + if netrc_path is None: + return - try: - _netrc = netrc(netrc_path).authenticators(host) - if _netrc and any(_netrc): - # Return with login / password - login_i = 0 if _netrc[0] else 1 - return (_netrc[login_i], _netrc[2]) - except (NetrcParseError, OSError): - # If there was a parsing error or a permissions issue reading the file, - # we'll just skip netrc auth unless explicitly asked to raise errors. - if raise_errors: - raise + ri = urlparse(url) + host = ri.hostname + try: + return _load_netrc_auth(netrc_path, host, raise_errors) # App Engine hackiness. except (ImportError, AttributeError): - pass + return None + + +def _is_named_file(name): + return ( + name + and isinstance(name, basestring) + and name[0] != "<" + and name[-1] != ">" + ) def guess_filename(obj): """Tries to guess the filename of the given object.""" name = getattr(obj, "name", None) - if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">": + if _is_named_file(name): return os.path.basename(name) @@ -628,22 +697,24 @@ def unquote_unreserved(uri): """ parts = uri.split("%") for i in range(1, len(parts)): - h = parts[i][0:2] - if len(h) == 2 and h.isalnum(): - try: - c = chr(int(h, 16)) - except ValueError: - raise InvalidURL(f"Invalid percent-escape sequence: '{h}'") - - if c in UNRESERVED_SET: - parts[i] = c + parts[i][2:] - else: - parts[i] = f"%{parts[i]}" - else: - parts[i] = f"%{parts[i]}" + parts[i] = _unquote_unreserved_part(parts[i]) return "".join(parts) +def _unquote_unreserved_part(part): + h = part[0:2] + if len(h) == 2 and h.isalnum(): + try: + c = chr(int(h, 16)) + except ValueError: + raise InvalidURL(f"Invalid percent-escape sequence: '{h}'") + + if c in UNRESERVED_SET: + return c + part[2:] + return f"%{part}" + return f"%{part}" + + def requote_uri(uri): """Re-quote the given URI. @@ -749,6 +820,56 @@ def set_environ(env_name, value): os.environ[env_name] = old_value +def _get_proxy_environ_value(key): + """Return the proxy environment variable value for *key*.""" + return os.environ.get(key) or os.environ.get(key.upper()) + + +def _parse_no_proxy(no_proxy): + """Yield host entries from a no_proxy string, skipping empty values.""" + return (host for host in no_proxy.replace(" ", "").split(",") if host) + + +def _is_ip_in_no_proxy(ip, no_proxy_hosts): + """Check if an IPv4 address is covered by a no_proxy configuration.""" + for proxy_ip in no_proxy_hosts: + if is_valid_cidr(proxy_ip): + if address_in_network(ip, proxy_ip): + return True + elif ip == proxy_ip: + # If no_proxy ip was defined in plain IP notation instead of cidr notation & + # matches the IP of the index + return True + return False + + +def _is_hostname_in_no_proxy(parsed, no_proxy_hosts): + """Check if a hostname (with optional port) is covered by no_proxy.""" + host_with_port = parsed.hostname + if parsed.port: + host_with_port += f":{parsed.port}" + + for host in no_proxy_hosts: + if parsed.hostname.endswith(host) or host_with_port.endswith(host): + # The URL does match something in no_proxy, so we don't want + # to apply the proxies on this URL. + return True + return False + + +def _should_bypass_proxies_with_no_proxy(parsed, no_proxy): + """Return True if *no_proxy* rules indicate proxies should be bypassed.""" + if not no_proxy: + return False + + no_proxy_hosts = _parse_no_proxy(no_proxy) + + if is_ipv4_address(parsed.hostname): + return _is_ip_in_no_proxy(parsed.hostname, no_proxy_hosts) + + return _is_hostname_in_no_proxy(parsed, no_proxy_hosts) + + def should_bypass_proxies(url, no_proxy): """ Returns whether we should bypass proxies or not. @@ -756,46 +877,19 @@ def should_bypass_proxies(url, no_proxy): :rtype: bool """ - # Prioritize lowercase environment variables over uppercase - # to keep a consistent behaviour with other http projects (curl, wget). - def get_proxy(key): - return os.environ.get(key) or os.environ.get(key.upper()) - # First check whether no_proxy is defined. If it is, check that the URL # we're getting isn't in the no_proxy list. no_proxy_arg = no_proxy if no_proxy is None: - no_proxy = get_proxy("no_proxy") + no_proxy = _get_proxy_environ_value("no_proxy") parsed = urlparse(url) if parsed.hostname is None: # URLs don't always have hostnames, e.g. file:/// urls. return True - if no_proxy: - # We need to check whether we match here. We need to see if we match - # the end of the hostname, both with and without the port. - no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host) - - if is_ipv4_address(parsed.hostname): - for proxy_ip in no_proxy: - if is_valid_cidr(proxy_ip): - if address_in_network(parsed.hostname, proxy_ip): - return True - elif parsed.hostname == proxy_ip: - # If no_proxy ip was defined in plain IP notation instead of cidr notation & - # matches the IP of the index - return True - else: - host_with_port = parsed.hostname - if parsed.port: - host_with_port += f":{parsed.port}" - - for host in no_proxy: - if parsed.hostname.endswith(host) or host_with_port.endswith(host): - # The URL does match something in no_proxy, so we don't want - # to apply the proxies on this URL. - return True + if _should_bypass_proxies_with_no_proxy(parsed, no_proxy): + return True with set_environ("no_proxy", no_proxy_arg): # parsed.hostname can be `None` in cases such as a file URI. @@ -898,38 +992,46 @@ def default_headers(): ) -def parse_header_links(value): - """Return a list of parsed link headers proxies. - - i.e. Link: ; rel=front; type="image/jpeg",; rel=back;type="image/jpeg" - - :rtype: list - """ +def _split_link_value(value, replace_chars): + value = value.strip(replace_chars) + if not value: + return [] + return re.split(", *<", value) - links = [] - replace_chars = " '\"" +def _parse_single_link(val, replace_chars): + try: + url, params = val.split(";", 1) + except ValueError: + url, params = val, "" - value = value.strip(replace_chars) - if not value: - return links + link = {"url": url.strip("<> '\"")} - for val in re.split(", *<", value): + for param in params.split(";"): try: - url, params = val.split(";", 1) + key, value = param.split("=") except ValueError: - url, params = val, "" + break - link = {"url": url.strip("<> '\"")} + link[key.strip(replace_chars)] = value.strip(replace_chars) - for param in params.split(";"): - try: - key, value = param.split("=") - except ValueError: - break + return link - link[key.strip(replace_chars)] = value.strip(replace_chars) +def parse_header_links(value): + """Return a list of parsed link headers proxies. + + i.e. Link: ; rel=front; type="image/jpeg",; rel=back;type="image/jpeg" + + :rtype: list + """ + + replace_chars = " '\"" + parts = _split_link_value(value, replace_chars) + + links = [] + for val in parts: + link = _parse_single_link(val, replace_chars) links.append(link) return links @@ -941,38 +1043,59 @@ def parse_header_links(value): _null3 = _null * 3 -def guess_json_utf(data): - """ - :rtype: str - """ - # JSON always starts with two ASCII characters, so detection is as - # easy as counting the nulls and from their location and count - # determine the encoding. Also detect a BOM, if present. - sample = data[:4] +def _detect_bom_encoding(sample): if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE): return "utf-32" # BOM included if sample[:3] == codecs.BOM_UTF8: return "utf-8-sig" # BOM included, MS style (discouraged) if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE): return "utf-16" # BOM included - nullcount = sample.count(_null) + return None + + +def _guess_utf16_encoding(sample): + if sample[::2] == _null2: # 1st and 3rd are null + return "utf-16-be" + if sample[1::2] == _null2: # 2nd and 4th are null + return "utf-16-le" + return None + + +def _guess_utf32_encoding(sample): + if sample[:3] == _null3: + return "utf-32-be" + if sample[1:] == _null3: + return "utf-32-le" + return None + + +def _guess_encoding_from_nulls(sample, nullcount): if nullcount == 0: return "utf-8" if nullcount == 2: - if sample[::2] == _null2: # 1st and 3rd are null - return "utf-16-be" - if sample[1::2] == _null2: # 2nd and 4th are null - return "utf-16-le" - # Did not detect 2 valid UTF-16 ascii-range characters + return _guess_utf16_encoding(sample) if nullcount == 3: - if sample[:3] == _null3: - return "utf-32-be" - if sample[1:] == _null3: - return "utf-32-le" - # Did not detect a valid UTF-32 ascii-range character + return _guess_utf32_encoding(sample) return None +def guess_json_utf(data): + """ + :rtype: str + """ + # JSON always starts with two ASCII characters, so detection is as + # easy as counting the nulls and from their location and count + # determine the encoding. Also detect a BOM, if present. + sample = data[:4] + + bom_encoding = _detect_bom_encoding(sample) + if bom_encoding is not None: + return bom_encoding + + nullcount = sample.count(_null) + return _guess_encoding_from_nulls(sample, nullcount) + + def prepend_scheme_if_needed(url, new_scheme): """Given a URL that may or may not have a scheme, prepend the given scheme. Does not replace a present scheme with the one provided as an argument.