diff --git a/extra/beep/beep.py b/extra/beep/beep.py index ad932834021..681b3e006e6 100644 --- a/extra/beep/beep.py +++ b/extra/beep/beep.py @@ -38,7 +38,7 @@ def _speaker_beep(): # Reference: https://lists.gnu.org/archive/html/emacs-devel/2014-09/msg00815.html def _cygwin_beep(filename): - os.system("play-sound-file '%s' 2>/dev/null" % filename) + os.system(f"play-sound-file '{filename}' 2>/dev/null") def _mac_beep(): import Carbon.Snd @@ -51,7 +51,7 @@ def _win_wav_play(filename): def _linux_wav_play(filename): for _ in ("aplay", "paplay", "play"): - if not os.system("%s '%s' 2>/dev/null" % (_, filename)): + if not os.system(f"{_} '{filename}' 2>/dev/null"): return import ctypes @@ -79,7 +79,9 @@ class struct_pa_sample_spec(ctypes.Structure): pa_stream = pa.pa_simple_new(None, filename, PA_STREAM_PLAYBACK, None, "playback", ctypes.byref(pa_sample_spec), None, None, ctypes.byref(error)) if not pa_stream: - raise Exception("Could not create pulse audio stream: %s" % pa.strerror(ctypes.byref(error))) + raise Exception( + f"Could not create pulse audio stream: {pa.strerror(ctypes.byref(error))}" + ) while True: latency = pa.pa_simple_get_latency(pa_stream, ctypes.byref(error)) diff --git a/extra/cloak/cloak.py b/extra/cloak/cloak.py index b9f8f8f0f6f..d72cb272565 100644 --- a/extra/cloak/cloak.py +++ b/extra/cloak/cloak.py @@ -49,7 +49,7 @@ def decloak(inputFile=None, data=None): return data def main(): - usage = '%s [-d] -i [-o ]' % sys.argv[0] + usage = f'{sys.argv[0]} [-d] -i [-o ]' parser = OptionParser(usage=usage, version='0.2') try: @@ -69,20 +69,11 @@ def main(): print('ERROR: the provided input file \'%s\' is non existent' % args.inputFile) sys.exit(1) - if not args.decrypt: - data = cloak(args.inputFile) - else: - data = decloak(args.inputFile) - + data = cloak(args.inputFile) if not args.decrypt else decloak(args.inputFile) if not args.outputFile: - if not args.decrypt: - args.outputFile = args.inputFile + '_' - else: - args.outputFile = args.inputFile[:-1] - - f = open(args.outputFile, 'wb') - f.write(data) - f.close() + args.outputFile = args.inputFile[:-1] if args.decrypt else f'{args.inputFile}_' + with open(args.outputFile, 'wb') as f: + f.write(data) if __name__ == '__main__': main() diff --git a/extra/dbgtool/dbgtool.py b/extra/dbgtool/dbgtool.py index c8e0c97339c..8164fbef793 100644 --- a/extra/dbgtool/dbgtool.py +++ b/extra/dbgtool/dbgtool.py @@ -20,19 +20,18 @@ def convert(inputFile): fileSize = fileStat.st_size if fileSize > 65280: - print("ERROR: the provided input file '%s' is too big for debug.exe" % inputFile) + print(f"ERROR: the provided input file '{inputFile}' is too big for debug.exe") sys.exit(1) script = "n %s\nr cx\n" % os.path.basename(inputFile.replace(".", "_")) script += "%x\nf 0100 ffff 00\n" % fileSize scrString = "" - counter = 256 counter2 = 0 fp = open(inputFile, "rb") fileContent = fp.read() - for fileChar in fileContent: + for counter, fileChar in enumerate(fileContent, start=256): unsignedFileChar = fileChar if sys.version_info >= (3, 0) else ord(fileChar) if unsignedFileChar != 0: @@ -47,8 +46,6 @@ def convert(inputFile): scrString = "" counter2 = 0 - counter += 1 - if counter2 == 20: script += "%s\n" % scrString scrString = "" @@ -60,7 +57,7 @@ def convert(inputFile): def main(inputFile, outputFile): if not os.path.isfile(inputFile): - print("ERROR: the provided input file '%s' is not a regular file" % inputFile) + print(f"ERROR: the provided input file '{inputFile}' is not a regular file") sys.exit(1) script = convert(inputFile) @@ -74,7 +71,7 @@ def main(inputFile, outputFile): print(script) if __name__ == "__main__": - usage = "%s -i [-o ]" % sys.argv[0] + usage = f"{sys.argv[0]} -i [-o ]" parser = OptionParser(usage=usage, version="0.1") try: diff --git a/extra/icmpsh/icmpsh_m.py b/extra/icmpsh/icmpsh_m.py index 17370fdc001..77f6fa5855f 100644 --- a/extra/icmpsh/icmpsh_m.py +++ b/extra/icmpsh/icmpsh_m.py @@ -83,7 +83,7 @@ def main(src, dst): if sock in select.select([sock], [], [])[0]: buff = sock.recv(4096) - if 0 == len(buff): + if len(buff) == 0: # Socket remotely closed sock.close() sys.exit(0) @@ -93,7 +93,11 @@ def main(src, dst): icmppacket = ippacket.child() # If the packet matches, report it to the user - if ippacket.get_ip_dst() == src and ippacket.get_ip_src() == dst and 8 == icmppacket.get_icmp_type(): + if ( + ippacket.get_ip_dst() == src + and ippacket.get_ip_src() == dst + and icmppacket.get_icmp_type() == 8 + ): # Get identifier and sequence number ident = icmppacket.get_icmp_id() seq_id = icmppacket.get_icmp_seq() @@ -136,8 +140,10 @@ def main(src, dst): if __name__ == '__main__': if len(sys.argv) < 3: - msg = 'missing mandatory options. Execute as root:\n' - msg += './icmpsh-m.py \n' + msg = ( + 'missing mandatory options. Execute as root:\n' + + './icmpsh-m.py \n' + ) sys.stderr.write(msg) sys.exit(1) diff --git a/extra/shutils/duplicates.py b/extra/shutils/duplicates.py index 0278b85dc3b..7f107a05f4e 100755 --- a/extra/shutils/duplicates.py +++ b/extra/shutils/duplicates.py @@ -11,7 +11,7 @@ if __name__ == "__main__": if len(sys.argv) > 1: - items = list() + items = [] with open(sys.argv[1], 'r') as f: for item in f: diff --git a/extra/shutils/newlines.py b/extra/shutils/newlines.py index fe28a35ba99..9f698f7a58b 100644 --- a/extra/shutils/newlines.py +++ b/extra/shutils/newlines.py @@ -21,7 +21,7 @@ def check(filepath): print("no directory specified, defaulting to current working directory") BASE_DIRECTORY = os.getcwd() - print("looking for *.py scripts in subdirectories of '%s'" % BASE_DIRECTORY) + print(f"looking for *.py scripts in subdirectories of '{BASE_DIRECTORY}'") for root, dirs, files in os.walk(BASE_DIRECTORY): if any(_ in root for _ in ("extra", "thirdparty")): continue diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py index 76f9c23762a..8f23e6aaf83 100644 --- a/extra/vulnserver/vulnserver.py +++ b/extra/vulnserver/vulnserver.py @@ -110,7 +110,19 @@ def do_REQUEST(self): if self.data.startswith('{') and self.data.endswith('}'): params.update(json.loads(self.data)) elif self.data.startswith('<') and self.data.endswith('>'): - params.update(dict((_[0], _[1].replace("'", "'").replace(""", '"').replace("<", '<').replace(">", '>').replace("&", '&')) for _ in re.findall(r'name="([^"]+)" value="([^"]*)"', self.data))) + params.update( + { + _[0]: _[1] + .replace("'", "'") + .replace(""", '"') + .replace("<", '<') + .replace(">", '>') + .replace("&", '&') + for _ in re.findall( + r'name="([^"]+)" value="([^"]*)"', self.data + ) + } + ) else: self.data = self.data.replace(';', '&') # Note: seems that Python3 started ignoring parameter splitting with ';' params.update(parse_qs(self.data)) @@ -125,16 +137,16 @@ def do_REQUEST(self): name, value = part.split('=', 1) params[name.strip()] = unquote_plus(value.strip()) - for key in params: - if params[key] and isinstance(params[key], (tuple, list)): + for key, value_ in params.items(): + if value_ and isinstance(params[key], (tuple, list)): params[key] = params[key][-1] self.url, self.params = path, params if self.url == '/': - if not any(_ in self.params for _ in ("id", "query")): + if all(_ not in self.params for _ in ("id", "query")): self.send_response(OK) - self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING) + self.send_header("Content-type", f"text/html; charset={UNICODE_ENCODING}") self.send_header("Connection", "close") self.end_headers() self.wfile.write(b"vulnserver

GET:

link

POST:

ID:
") @@ -143,19 +155,25 @@ def do_REQUEST(self): try: if self.params.get("echo", ""): - output += "%s
" % self.params["echo"] + output += f'{self.params["echo"]}
' if self.params.get("reflect", ""): - output += "%s
" % self.params.get("id") + output += f'{self.params.get("id")}
' with _lock: if "query" in self.params: _cursor.execute(self.params["query"]) elif "id" in self.params: if "base64" in self.params: - _cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % base64.b64decode("%s===" % self.params["id"], altchars=self.params.get("altchars")).decode()) + _cursor.execute( + "SELECT * FROM users WHERE id=%s LIMIT 0, 1" + % base64.b64decode( + f'{self.params["id"]}===', + altchars=self.params.get("altchars"), + ).decode() + ) else: - _cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % self.params["id"]) + _cursor.execute(f'SELECT * FROM users WHERE id={self.params["id"]} LIMIT 0, 1') results = _cursor.fetchall() output += "SQL results:
\n" @@ -163,24 +181,23 @@ def do_REQUEST(self): if self.params.get("code", ""): if not results: code = INTERNAL_SERVER_ERROR - else: - if results: - output += "\n" + elif results: + output += "
\n" - for row in results: - output += "" - for value in row: - output += "" % value - output += "\n" + for row in results: + output += "" + for value in row: + output += f"" + output += "\n" - output += "
%s
{value}
\n" - else: - output += "no results found" + output += "\n" + else: + output += "no results found" output += "" except Exception as ex: code = INTERNAL_SERVER_ERROR - output = "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex) + output = f"""{re.search("'([^']+)'", str(type(ex))).group(1)}: {ex}""" self.send_response(code) @@ -208,8 +225,7 @@ def do_HEAD(self): self.do_REQUEST() def do_POST(self): - length = int(self.headers.get("Content-length", 0)) - if length: + if length := int(self.headers.get("Content-length", 0)): data = self.rfile.read(length) data = unquote_plus(data.decode(UNICODE_ENCODING, "ignore")) self.data = data @@ -221,12 +237,11 @@ def do_POST(self): line += self.rfile.read(1) if line.endswith(b'\n'): if count % 2 == 1: - current = line.rstrip(b"\r\n") - if not current: - break - else: + if current := line.rstrip(b"\r\n"): data += current + else: + break count += 1 line = b"" diff --git a/lib/controller/action.py b/lib/controller/action.py index 1aeb0bcc409..75e2df1a5b1 100644 --- a/lib/controller/action.py +++ b/lib/controller/action.py @@ -35,20 +35,21 @@ def action(): if not Backend.getDbms() or not conf.dbmsHandler: htmlParsed = Format.getErrorParsedDBMSes() - errMsg = "sqlmap was not able to fingerprint the " - errMsg += "back-end database management system" - + errMsg = ( + "sqlmap was not able to fingerprint the " + + "back-end database management system" + ) if htmlParsed: errMsg += ", but from the HTML error page it was " errMsg += "possible to determinate that the " - errMsg += "back-end DBMS is %s" % htmlParsed + errMsg += f"back-end DBMS is {htmlParsed}" if htmlParsed and htmlParsed.lower() in SUPPORTED_DBMS: errMsg += ". Do not specify the back-end DBMS manually, " errMsg += "sqlmap will fingerprint the DBMS for you" elif kb.nullConnection: errMsg += ". You can try to rerun without using optimization " - errMsg += "switch '%s'" % ("-o" if conf.optimize else "--null-connection") + errMsg += f"""switch '{"-o" if conf.optimize else "--null-connection"}'""" raise SqlmapUnsupportedDBMSException(errMsg) @@ -184,8 +185,7 @@ def action(): if conf.sqlQuery: for query in conf.sqlQuery.strip(';').split(';'): - query = query.strip() - if query: + if query := query.strip(): conf.dumper.sqlQuery(query, conf.dbmsHandler.sqlQuery(query)) if conf.sqlShell: diff --git a/lib/controller/controller.py b/lib/controller/controller.py index 8441279a954..df418d7da25 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -105,9 +105,10 @@ def _selectInjection(): kb.injection = kb.injections[0] elif len(points) > 1: - message = "there were multiple injection points, please select " - message += "the one to use for following injections:\n" - + message = ( + "there were multiple injection points, please select " + + "the one to use for following injections:\n" + ) points = [] for i in xrange(0, len(kb.injections)): @@ -121,7 +122,7 @@ def _selectInjection(): ptype = PAYLOAD.PARAMETER[ptype] if isinstance(ptype, int) else ptype message += "[%d] place: %s, parameter: " % (i, place) - message += "%s, type: %s" % (parameter, ptype) + message += f"{parameter}, type: {ptype}" if i == 0: message += " (default)" @@ -159,7 +160,7 @@ def _formatInjection(inj): if count == 1: title = title.replace("columns", "column") elif comment: - vector = "%s%s" % (vector, comment) + vector = f"{vector}{comment}" data += " Type: %s\n" % PAYLOAD.SQLINJECTION[stype] data += " Title: %s\n" % title data += " Payload: %s\n" % urldecode(payload, unsafe="&", spaceplus=(inj.place != PLACE.GET and kb.postSpaceToPlus)) @@ -181,34 +182,43 @@ def _showInjections(): conf.dumper.string("", {"url": conf.url, "query": conf.parameters.get(PLACE.GET), "data": conf.parameters.get(PLACE.POST)}, content_type=CONTENT_TYPE.TARGET) conf.dumper.string("", kb.injections, content_type=CONTENT_TYPE.TECHNIQUES) else: - data = "".join(set(_formatInjection(_) for _ in kb.injections)).rstrip("\n") + data = "".join({_formatInjection(_) for _ in kb.injections}).rstrip("\n") conf.dumper.string(header, data) if conf.tamper: - warnMsg = "changes made by tampering scripts are not " - warnMsg += "included in shown payload content(s)" + warnMsg = ( + "changes made by tampering scripts are not " + + "included in shown payload content(s)" + ) logger.warning(warnMsg) if conf.hpp: - warnMsg = "changes made by HTTP parameter pollution are not " - warnMsg += "included in shown payload content(s)" + warnMsg = ( + "changes made by HTTP parameter pollution are not " + + "included in shown payload content(s)" + ) logger.warning(warnMsg) def _randomFillBlankFields(value): retVal = value - if extractRegexResult(EMPTY_FORM_FIELDS_REGEX, value): + if extractRegexResult(EMPTY_FORM_FIELDS_REGEX, retVal): message = "do you want to fill blank fields with random values? [Y/n] " if readInput(message, default='Y', boolean=True): for match in re.finditer(EMPTY_FORM_FIELDS_REGEX, retVal): item = match.group("result") - if not any(_ in item for _ in IGNORE_PARAMETERS) and not re.search(ASP_NET_CONTROL_REGEX, item): + if all( + _ not in item for _ in IGNORE_PARAMETERS + ) and not re.search(ASP_NET_CONTROL_REGEX, item): newValue = randomStr() if not re.search(r"^id|id$", item, re.I) else randomInt() if item[-1] == DEFAULT_GET_POST_DELIMITER: - retVal = retVal.replace(item, "%s%s%s" % (item[:-1], newValue, DEFAULT_GET_POST_DELIMITER)) + retVal = retVal.replace( + item, + f"{item[:-1]}{newValue}{DEFAULT_GET_POST_DELIMITER}", + ) else: - retVal = retVal.replace(item, "%s%s" % (item, newValue)) + retVal = retVal.replace(item, f"{item}{newValue}") return retVal @@ -241,7 +251,7 @@ def _saveToResultsFile(): return results = {} - techniques = dict((_[1], _[0]) for _ in getPublicTypeMembers(PAYLOAD.TECHNIQUE)) + techniques = {_[1]: _[0] for _ in getPublicTypeMembers(PAYLOAD.TECHNIQUE)} for injection in kb.injections + kb.falsePositives: if injection.place is None or injection.parameter is None: @@ -256,12 +266,12 @@ def _saveToResultsFile(): try: for key, value in results.items(): place, parameter, notes = key - line = "%s,%s,%s,%s,%s%s" % (safeCSValue(kb.originalUrls.get(conf.url) or conf.url), place, parameter, "".join(techniques[_][0].upper() for _ in sorted(value)), notes, os.linesep) + line = f'{safeCSValue(kb.originalUrls.get(conf.url) or conf.url)},{place},{parameter},{"".join(techniques[_][0].upper() for _ in sorted(value))},{notes}{os.linesep}' conf.resultsFP.write(line) conf.resultsFP.flush() except IOError as ex: - errMsg = "unable to write to the results file '%s' ('%s'). " % (conf.resultsFile, getSafeExString(ex)) + errMsg = f"unable to write to the results file '{conf.resultsFile}' ('{getSafeExString(ex)}'). " raise SqlmapSystemException(errMsg) @stackedmethod diff --git a/lib/controller/handler.py b/lib/controller/handler.py index 1c4994e8484..b3a535b3443 100644 --- a/lib/controller/handler.py +++ b/lib/controller/handler.py @@ -174,13 +174,11 @@ def setHandler(): except Exception as ex: if exception: raise exception - else: - if not isinstance(ex, NameError): - raise - else: - msg = "support for direct connection to '%s' is not available. " % dbms - msg += "Please rerun with '--dependencies'" - raise SqlmapConnectionException(msg) + if not isinstance(ex, NameError): + raise + msg = f"support for direct connection to '{dbms}' is not available. " + msg += "Please rerun with '--dependencies'" + raise SqlmapConnectionException(msg) if conf.forceDbms == dbms or handler.checkDbms(): if kb.resolutionDbms: diff --git a/lib/core/agent.py b/lib/core/agent.py index 26be9b450af..055b1c06e1b 100644 --- a/lib/core/agent.py +++ b/lib/core/agent.py @@ -158,22 +158,19 @@ def payload(self, place=None, parameter=None, value=None, newValue=None, where=N match = re.search(r"\A[^ ]+", newValue) newValue = newValue[len(match.group() if match else ""):] _ = randomInt(2) - value = "%s%s AND %s LIKE %s" % (origValue, match.group() if match else "", _, _ + 1) + value = f'{origValue}{match.group() if match else ""} AND {_} LIKE {_ + 1}' elif conf.invalidBignum: value = randomInt(6) elif conf.invalidString: value = randomStr(6) else: - if newValue.startswith("-"): - value = "" - else: - value = "-%s" % randomInt() + value = "" if newValue.startswith("-") else f"-{randomInt()}" elif where == PAYLOAD.WHERE.REPLACE: value = "" else: value = origValue - newValue = "%s%s" % (value, newValue) + newValue = f"{value}{newValue}" newValue = self.cleanupPayload(newValue, origValue) or "" @@ -186,7 +183,7 @@ def payload(self, place=None, parameter=None, value=None, newValue=None, where=N newValue = self.adjustLateValues(newValue) # TODO: support for POST_HINT - newValue = "%s%s%s" % (BOUNDED_BASE64_MARKER, newValue, BOUNDED_BASE64_MARKER) + newValue = f"{BOUNDED_BASE64_MARKER}{newValue}{BOUNDED_BASE64_MARKER}" if parameter in kb.base64Originals: origValue = kb.base64Originals[parameter] @@ -194,12 +191,17 @@ def payload(self, place=None, parameter=None, value=None, newValue=None, where=N origValue = encodeBase64(origValue, binary=False, encoding=conf.encoding or UNICODE_ENCODING) if place in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER): - _ = "%s%s" % (origValue, kb.customInjectionMark) - - if kb.postHint == POST_HINT.JSON and isNumber(origValue) and not isNumber(newValue) and '"%s"' % _ not in paramString: - newValue = '"%s"' % self.addPayloadDelimiters(newValue) + _ = f"{origValue}{kb.customInjectionMark}" + + if ( + kb.postHint == POST_HINT.JSON + and isNumber(origValue) + and not isNumber(newValue) + and f'"{_}"' not in paramString + ): + newValue = f'"{self.addPayloadDelimiters(newValue)}"' elif kb.postHint == POST_HINT.JSON_LIKE and isNumber(origValue) and not isNumber(newValue) and re.search(r"['\"]%s['\"]" % re.escape(_), paramString) is None: - newValue = "'%s'" % self.addPayloadDelimiters(newValue) + newValue = f"'{self.addPayloadDelimiters(newValue)}'" else: newValue = self.addPayloadDelimiters(newValue) @@ -210,12 +212,17 @@ def payload(self, place=None, parameter=None, value=None, newValue=None, where=N retVal = retVal.replace(kb.customInjectionMark, "").replace(REPLACEMENT_MARKER, kb.customInjectionMark) elif BOUNDED_INJECTION_MARKER in paramDict[parameter]: if base64Encoding: - retVal = paramString.replace("%s%s" % (_origValue, BOUNDED_INJECTION_MARKER), _newValue) + retVal = paramString.replace( + f"{_origValue}{BOUNDED_INJECTION_MARKER}", _newValue + ) match = re.search(r"(%s)=([^&]*)" % re.sub(r" \(.+", "", parameter), retVal) if match: retVal = retVal.replace(match.group(0), "%s=%s" % (match.group(1), encodeBase64(match.group(2), binary=False, encoding=conf.encoding or UNICODE_ENCODING))) else: - retVal = paramString.replace("%s%s" % (origValue, BOUNDED_INJECTION_MARKER), self.addPayloadDelimiters(newValue)) + retVal = paramString.replace( + f"{origValue}{BOUNDED_INJECTION_MARKER}", + self.addPayloadDelimiters(newValue), + ) elif place in (PLACE.USER_AGENT, PLACE.REFERER, PLACE.HOST): retVal = paramString.replace(origValue, self.addPayloadDelimiters(newValue)) else: @@ -241,12 +248,21 @@ def _(pattern, repl, string): if origValue: regex = r"(\A|\b)%s=%s%s" % (re.escape(parameter), re.escape(origValue), r"(\Z|\b)" if origValue[-1].isalnum() else "") - retVal = _(regex, "%s=%s" % (parameter, self.addPayloadDelimiters(newValue)), paramString) + retVal = _( + regex, + f"{parameter}={self.addPayloadDelimiters(newValue)}", + paramString, + ) else: retVal = _(r"(\A|\b)%s=%s(\Z|%s|%s|\s)" % (re.escape(parameter), re.escape(origValue), DEFAULT_GET_POST_DELIMITER, DEFAULT_COOKIE_DELIMITER), r"%s=%s\g<2>" % (parameter, self.addPayloadDelimiters(newValue)), paramString) if retVal == paramString and urlencode(parameter) != parameter: - retVal = _(r"(\A|\b)%s=%s" % (re.escape(urlencode(parameter)), re.escape(origValue)), "%s=%s" % (urlencode(parameter), self.addPayloadDelimiters(newValue)), paramString) + retVal = _( + r"(\A|\b)%s=%s" + % (re.escape(urlencode(parameter)), re.escape(origValue)), + f"{urlencode(parameter)}={self.addPayloadDelimiters(newValue)}", + paramString, + ) if retVal: retVal = retVal.replace(BOUNDARY_BACKSLASH_MARKER, '\\') @@ -278,16 +294,13 @@ def prefixQuery(self, expression, prefix=None, where=None, clause=None): if where == PAYLOAD.WHERE.REPLACE and not conf.prefix: # Note: https://github.com/sqlmapproject/sqlmap/issues/4030 query = "" - # If the technique is stacked queries () do not put a space - # after the prefix or it is in GROUP BY / ORDER BY () elif getTechnique() == PAYLOAD.TECHNIQUE.STACKED: query = kb.injection.prefix - elif kb.injection.clause == [2, 3] or kb.injection.clause == [2] or kb.injection.clause == [3]: + elif kb.injection.clause in [[2, 3], [2], [3]]: query = kb.injection.prefix - elif clause == [2, 3] or clause == [2] or clause == [3]: + elif clause in [[2, 3], [2], [3]]: query = prefix - # In any other case prepend with the full prefix else: query = kb.injection.prefix or prefix or "" @@ -370,7 +383,12 @@ def cleanupPayload(self, payload, origValue=None): origValue = getUnicode(origValue) if "[ORIGVALUE]" in payload: - payload = getUnicode(payload).replace("[ORIGVALUE]", origValue if origValue.isdigit() else unescaper.escape("'%s'" % origValue)) + payload = getUnicode(payload).replace( + "[ORIGVALUE]", + origValue + if origValue.isdigit() + else unescaper.escape(f"'{origValue}'"), + ) if "[ORIGINAL]" in payload: payload = getUnicode(payload).replace("[ORIGINAL]", origValue) @@ -378,19 +396,21 @@ def cleanupPayload(self, payload, origValue=None): if Backend.getIdentifiedDbms() is not None: inference = queries[Backend.getIdentifiedDbms()].inference - if "dbms_version" in inference: - if isDBMSVersionAtLeast(inference.dbms_version): - inferenceQuery = inference.query - else: - inferenceQuery = inference.query2 - else: + if ( + "dbms_version" in inference + and isDBMSVersionAtLeast(inference.dbms_version) + or "dbms_version" not in inference + ): inferenceQuery = inference.query - + else: + inferenceQuery = inference.query2 payload = payload.replace(INFERENCE_MARKER, inferenceQuery) elif not kb.testMode: - errMsg = "invalid usage of inference payload without " - errMsg += "knowledge of underlying DBMS" + errMsg = ( + "invalid usage of inference payload without " + + "knowledge of underlying DBMS" + ) raise SqlmapNoneDataException(errMsg) return payload diff --git a/lib/core/bigarray.py b/lib/core/bigarray.py index 3cccd2d1ec6..e7701b3878a 100644 --- a/lib/core/bigarray.py +++ b/lib/core/bigarray.py @@ -118,17 +118,16 @@ def pop(self): self.chunks[-1] = pickle.loads(zlib.decompress(f.read())) except IOError as ex: errMsg = "exception occurred while retrieving data " - errMsg += "from a temporary file ('%s')" % ex + errMsg += f"from a temporary file ('{ex}')" raise SqlmapSystemException(errMsg) return self.chunks[-1].pop() def index(self, value): - for index in xrange(len(self)): - if self[index] == value: - return index - - return ValueError, "%s is not in list" % value + return next( + (index for index in xrange(len(self)) if self[index] == value), + (ValueError, f"{value} is not in list"), + ) def _dump(self, chunk): try: @@ -140,7 +139,7 @@ def _dump(self, chunk): return filename except (OSError, IOError) as ex: errMsg = "exception occurred while storing data " - errMsg += "to a temporary file ('%s'). Please " % ex + errMsg += f"to a temporary file ('{ex}'). Please " errMsg += "make sure that there is enough disk space left. If problem persists, " errMsg += "try to set environment variable 'TEMP' to a location " errMsg += "writeable by the current user" @@ -157,7 +156,7 @@ def _checkcache(self, index): self.cache = Cache(index, pickle.loads(zlib.decompress(f.read())), False) except Exception as ex: errMsg = "exception occurred while retrieving data " - errMsg += "from a temporary file ('%s')" % ex + errMsg += f"from a temporary file ('{ex}')" raise SqlmapSystemException(errMsg) def __getstate__(self): @@ -177,9 +176,8 @@ def __getitem__(self, y): if isinstance(chunk, list): return chunk[offset] - else: - self._checkcache(index) - return self.cache.data[offset] + self._checkcache(index) + return self.cache.data[offset] def __setitem__(self, y, value): index = y // self.chunk_length @@ -194,7 +192,7 @@ def __setitem__(self, y, value): self.cache.dirty = True def __repr__(self): - return "%s%s" % ("..." if len(self.chunks) > 1 else "", self.chunks[-1].__repr__()) + return f'{"..." if len(self.chunks) > 1 else ""}{self.chunks[-1].__repr__()}' def __iter__(self): for i in xrange(len(self)): diff --git a/lib/core/common.py b/lib/core/common.py index 7dd6dbb6f59..ed34810b9f0 100644 --- a/lib/core/common.py +++ b/lib/core/common.py @@ -217,7 +217,7 @@ def write(self, fp): fp.write("[%s]\n" % _configparser.DEFAULTSECT) for (key, value) in self._defaults.items(): - fp.write("%s = %s" % (key, getUnicode(value, UNICODE_ENCODING))) + fp.write(f"{key} = {getUnicode(value, UNICODE_ENCODING)}") fp.write("\n") @@ -252,7 +252,11 @@ def getDbms(versions=None): if versions is None and Backend.getVersionList(): versions = Backend.getVersionList() - return Backend.getDbms() if versions is None else "%s %s" % (Backend.getDbms(), " and ".join(filterNone(versions))) + return ( + Backend.getDbms() + if versions is None + else f'{Backend.getDbms()} {" and ".join(filterNone(versions))}' + ) @staticmethod def getErrorParsedDBMSes(): @@ -312,21 +316,21 @@ def getOs(target, info): if info and "type" in info: if conf.api: - infoApi["%s operating system" % target] = info + infoApi[f"{target} operating system"] = info else: - infoStr += "%s operating system: %s" % (target, Format.humanize(info["type"])) + infoStr += f'{target} operating system: {Format.humanize(info["type"])}' if "distrib" in info: - infoStr += " %s" % Format.humanize(info["distrib"]) + infoStr += f' {Format.humanize(info["distrib"])}' if "release" in info: - infoStr += " %s" % Format.humanize(info["release"]) + infoStr += f' {Format.humanize(info["release"])}' if "sp" in info: - infoStr += " %s" % Format.humanize(info["sp"]) + infoStr += f' {Format.humanize(info["sp"])}' if "codename" in info: - infoStr += " (%s)" % Format.humanize(info["codename"]) + infoStr += f' ({Format.humanize(info["codename"])})' if "technology" in info: if conf.api: @@ -334,10 +338,7 @@ def getOs(target, info): else: infoStr += "\nweb application technology: %s" % Format.humanize(info["technology"], ", ") - if conf.api: - return infoApi - else: - return infoStr.lstrip() + return infoApi if conf.api else infoStr.lstrip() class Backend(object): @staticmethod @@ -347,17 +348,18 @@ def setDbms(dbms): if dbms is None: return None - # Little precaution, in theory this condition should always be false elif kb.dbms is not None and kb.dbms != dbms: - warnMsg = "there appears to be a high probability that " - warnMsg += "this could be a false positive case" + warnMsg = ( + "there appears to be a high probability that " + + "this could be a false positive case" + ) logger.warning(warnMsg) msg = "sqlmap previously fingerprinted back-end DBMS as " - msg += "%s. However now it has been fingerprinted " % kb.dbms - msg += "as %s. " % dbms + msg += f"{kb.dbms}. However now it has been fingerprinted " + msg += f"as {dbms}. " msg += "Please, specify which DBMS should be " - msg += "correct [%s (default)/%s] " % (kb.dbms, dbms) + msg += f"correct [{kb.dbms} (default)/{dbms}] " while True: choice = readInput(msg, default=kb.dbms) @@ -411,13 +413,12 @@ def setOs(os): if os is None: return None - # Little precaution, in theory this condition should always be false elif kb.os is not None and isinstance(os, six.string_types) and kb.os.lower() != os.lower(): msg = "sqlmap previously fingerprinted back-end DBMS " - msg += "operating system %s. However now it has " % kb.os - msg += "been fingerprinted to be %s. " % os + msg += f"operating system {kb.os}. However now it has " + msg += f"been fingerprinted to be {os}. " msg += "Please, specify which OS is " - msg += "correct [%s (default)/%s] " % (kb.os, os) + msg += f"correct [{kb.os} (default)/{os}] " while True: choice = readInput(msg, default=kb.os) @@ -454,14 +455,20 @@ def setOsServicePack(sp): @staticmethod def setArch(): - msg = "what is the back-end database management system architecture?" - msg += "\n[1] 32-bit (default)" + msg = ( + "what is the back-end database management system architecture?" + + "\n[1] 32-bit (default)" + ) msg += "\n[2] 64-bit" while True: choice = readInput(msg, default='1') - if hasattr(choice, "isdigit") and choice.isdigit() and int(choice) in (1, 2): + if ( + hasattr(choice, "isdigit") + and choice.isdigit() + and int(choice) in {1, 2} + ): kb.arch = 32 if int(choice) == 1 else 64 break else: @@ -525,18 +532,12 @@ def getIdentifiedDbms(): @staticmethod def getVersion(): versions = filterNone(flattenValue(kb.dbmsVersion)) if not isinstance(kb.dbmsVersion, six.string_types) else [kb.dbmsVersion] - if not isNoneValue(versions): - return versions[0] - else: - return None + return versions[0] if not isNoneValue(versions) else None @staticmethod def getVersionList(): versions = filterNone(flattenValue(kb.dbmsVersion)) if not isinstance(kb.dbmsVersion, six.string_types) else [kb.dbmsVersion] - if not isNoneValue(versions): - return versions - else: - return None + return versions if not isNoneValue(versions) else None @staticmethod def getOs(): @@ -560,7 +561,9 @@ def getArch(): @staticmethod def isDbms(dbms): if not kb.get("testMode") and all((Backend.getDbms(), Backend.getIdentifiedDbms())) and Backend.getDbms() != Backend.getIdentifiedDbms(): - singleTimeWarnMessage("identified ('%s') and fingerprinted ('%s') DBMSes differ. If you experience problems in enumeration phase please rerun with '--flush-session'" % (Backend.getIdentifiedDbms(), Backend.getDbms())) + singleTimeWarnMessage( + f"identified ('{Backend.getIdentifiedDbms()}') and fingerprinted ('{Backend.getDbms()}') DBMSes differ. If you experience problems in enumeration phase please rerun with '--flush-session'" + ) return Backend.getIdentifiedDbms() == aliasToDbmsEnum(dbms) @staticmethod @@ -580,11 +583,10 @@ def isVersionWithin(versionList): if Backend.getVersionList() is None: return False - for _ in Backend.getVersionList(): - if _ != UNKNOWN_DBMS_VERSION and _ in versionList: - return True - - return False + return any( + _ != UNKNOWN_DBMS_VERSION and _ in versionList + for _ in Backend.getVersionList() + ) @staticmethod def isVersionGreaterOrEqualThan(version): diff --git a/lib/core/compat.py b/lib/core/compat.py index 851e57eb87d..be04f378685 100644 --- a/lib/core/compat.py +++ b/lib/core/compat.py @@ -298,9 +298,7 @@ def LooseVersion(version): 8.000022 """ - match = re.search(r"\A(\d[\d.]*)", version or "") - - if match: + if match := re.search(r"\A(\d[\d.]*)", version or ""): result = 0 value = match.group(1) weight = 1.0 diff --git a/lib/core/convert.py b/lib/core/convert.py index c6f86aa1fe1..5dc56e6440e 100644 --- a/lib/core/convert.py +++ b/lib/core/convert.py @@ -51,7 +51,7 @@ def base64pickle(value): retVal = encodeBase64(pickle.dumps(value, PICKLE_PROTOCOL), binary=False) except: warnMsg = "problem occurred while serializing " - warnMsg += "instance of a type '%s'" % type(value) + warnMsg += f"instance of a type '{type(value)}'" singleTimeWarnMessage(warnMsg) try: @@ -360,7 +360,7 @@ def getUnicode(value, encoding=None, noneToNull=False): except UnicodeDecodeError: return six.text_type(value, UNICODE_ENCODING, errors="reversible") elif isListLike(value): - value = list(getUnicode(_, encoding, noneToNull) for _ in value) + value = [getUnicode(_, encoding, noneToNull) for _ in value] return value else: try: @@ -400,11 +400,9 @@ def stdoutEncode(value): if IS_WIN and IS_TTY and kb.get("codePage", -1) is None: output = shellExec("chcp") - match = re.search(r": (\d{3,})", output or "") - - if match: + if match := re.search(r": (\d{3,})", output or ""): try: - candidate = "cp%s" % match.group(1) + candidate = f"cp{match.group(1)}" codecs.lookup(candidate) except LookupError: pass @@ -424,7 +422,7 @@ def stdoutEncode(value): value = value[:ex.start] + "?" * (ex.end - ex.start) + value[ex.end:] warnMsg = "cannot properly display (some) Unicode characters " - warnMsg += "inside your terminal ('%s') environment. All " % encoding + warnMsg += f"inside your terminal ('{encoding}') environment. All " warnMsg += "unhandled occurrences will result in " warnMsg += "replacement with '?' character. Please, find " warnMsg += "proper character representation inside " @@ -449,9 +447,8 @@ def getConsoleLength(value): 4 """ - if isinstance(value, six.text_type): - retVal = sum((2 if ord(_) >= 0x3000 else 1) for _ in value) - else: - retVal = len(value) - - return retVal + return ( + sum((2 if ord(_) >= 0x3000 else 1) for _ in value) + if isinstance(value, six.text_type) + else len(value) + ) diff --git a/lib/core/datatype.py b/lib/core/datatype.py index eadcb9cf7ab..f9d6b12c3ec 100644 --- a/lib/core/datatype.py +++ b/lib/core/datatype.py @@ -45,7 +45,7 @@ def __getattr__(self, item): return self.__getitem__(item) except KeyError: if self.keycheck: - raise AttributeError("unable to access item '%s'" % item) + raise AttributeError(f"unable to access item '{item}'") else: return None @@ -223,7 +223,7 @@ def pop(self, last=True): def __repr__(self): if not self: - return '%s()' % (self.__class__.__name__,) + return f'{self.__class__.__name__}()' return '%s(%r)' % (self.__class__.__name__, list(self)) def __eq__(self, other): diff --git a/lib/core/dump.py b/lib/core/dump.py index 2e3cdfde635..12977da5aa4 100644 --- a/lib/core/dump.py +++ b/lib/core/dump.py @@ -87,7 +87,7 @@ def _write(self, data, newline=True, console=True, content_type=None): try: self._outputFP.write(text) except IOError as ex: - errMsg = "error occurred while writing to log file ('%s')" % getSafeExString(ex) + errMsg = f"error occurred while writing to log file ('{getSafeExString(ex)}')" raise SqlmapGenericException(errMsg) if multiThreadMode: @@ -111,7 +111,7 @@ def setOutputFile(self): try: self._outputFP = openFile(self._outputFile, "ab" if not conf.flushSession else "wb") except IOError as ex: - errMsg = "error occurred while opening log file ('%s')" % getSafeExString(ex) + errMsg = f"error occurred while opening log file ('{getSafeExString(ex)}')" raise SqlmapGenericException(errMsg) def singleString(self, data, content_type=None): @@ -141,7 +141,9 @@ def string(self, header, data, content_type=None, sort=True): if "\n" in _: self._write("%s:\n---\n%s\n---" % (header, _)) else: - self._write("%s: %s" % (header, ("'%s'" % _) if isinstance(data, six.string_types) else _)) + self._write( + f"""{header}: {f"'{_}'" if isinstance(data, six.string_types) else _}""" + ) def lister(self, header, elements, content_type=None, sort=True): if elements and sort: @@ -160,7 +162,7 @@ def lister(self, header, elements, content_type=None, sort=True): for element in elements: if isinstance(element, six.string_types): - self._write("[*] %s" % element) + self._write(f"[*] {element}") elif isListLike(element): self._write("[*] " + ", ".join(getUnicode(e) for e in element)) @@ -175,9 +177,17 @@ def currentUser(self, data): def currentDb(self, data): if Backend.getIdentifiedDbms() in (DBMS.ORACLE, DBMS.PGSQL, DBMS.HSQLDB, DBMS.H2, DBMS.MONETDB, DBMS.VERTICA, DBMS.CRATEDB, DBMS.CACHE, DBMS.FRONTBASE): - self.string("current database (equivalent to schema on %s)" % Backend.getIdentifiedDbms(), data, content_type=CONTENT_TYPE.CURRENT_DB) + self.string( + f"current database (equivalent to schema on {Backend.getIdentifiedDbms()})", + data, + content_type=CONTENT_TYPE.CURRENT_DB, + ) elif Backend.getIdentifiedDbms() in (DBMS.ALTIBASE, DBMS.DB2, DBMS.MIMERSQL, DBMS.MAXDB, DBMS.VIRTUOSO): - self.string("current database (equivalent to owner on %s)" % Backend.getIdentifiedDbms(), data, content_type=CONTENT_TYPE.CURRENT_DB) + self.string( + f"current database (equivalent to owner on {Backend.getIdentifiedDbms()})", + data, + content_type=CONTENT_TYPE.CURRENT_DB, + ) else: self.string("current database", data, content_type=CONTENT_TYPE.CURRENT_DB) @@ -207,26 +217,22 @@ def userSettings(self, header, userSettings, subHeader, content_type=None): self._write(userSettings, content_type=content_type) if userSettings: - self._write("%s:" % header) + self._write(f"{header}:") for user in users: settings = filterNone(userSettings[user]) - if isNoneValue(settings): - stringSettings = "" - else: - stringSettings = " [%d]:" % len(settings) - + stringSettings = "" if isNoneValue(settings) else " [%d]:" % len(settings) if user in self._areAdmins: - self._write("[*] %s (administrator)%s" % (user, stringSettings)) + self._write(f"[*] {user} (administrator){stringSettings}") else: - self._write("[*] %s%s" % (user, stringSettings)) + self._write(f"[*] {user}{stringSettings}") if settings: settings.sort() for setting in settings: - self._write(" %s: %s" % (subHeader, setting)) + self._write(f" {subHeader}: {setting}") if userSettings: self.singleString("") @@ -253,14 +259,18 @@ def dbTables(self, dbTables): for db, tables in dbTables.items(): tables = sorted(filter(None, tables)) - self._write("Database: %s" % unsafeSQLIdentificatorNaming(db) if db and METADB_SUFFIX not in db else "") + self._write( + f"Database: {unsafeSQLIdentificatorNaming(db)}" + if db and METADB_SUFFIX not in db + else "" + ) if len(tables) == 1: self._write("[1 table]") else: self._write("[%d tables]" % len(tables)) - self._write("+%s+" % lines) + self._write(f"+{lines}+") for table in tables: if table and isListLike(table): @@ -268,7 +278,7 @@ def dbTables(self, dbTables): table = unsafeSQLIdentificatorNaming(table) blank = " " * (maxlength - getConsoleLength(getUnicode(table))) - self._write("| %s%s |" % (table, blank)) + self._write(f"| {table}{blank} |") self._write("+%s+\n" % lines) elif dbTables is None or len(dbTables) == 0: @@ -277,77 +287,77 @@ def dbTables(self, dbTables): self.string("tables", dbTables, content_type=CONTENT_TYPE.TABLES) def dbTableColumns(self, tableColumns, content_type=None): - if isinstance(tableColumns, dict) and len(tableColumns) > 0: - if conf.api: - self._write(tableColumns, content_type=content_type) - - for db, tables in tableColumns.items(): - if not db: - db = "All" + if not isinstance(tableColumns, dict) or len(tableColumns) <= 0: + return + if conf.api: + self._write(tableColumns, content_type=content_type) - for table, columns in tables.items(): - maxlength1 = 0 - maxlength2 = 0 + for db, tables in tableColumns.items(): + if not db: + db = "All" - colType = None + for table, columns in tables.items(): + maxlength1 = 0 + maxlength2 = 0 - colList = list(columns.keys()) - colList.sort(key=lambda _: _.lower() if hasattr(_, "lower") else _) + colType = None - for column in colList: - colType = columns[column] + colList = list(columns.keys()) + colList.sort(key=lambda _: _.lower() if hasattr(_, "lower") else _) - column = unsafeSQLIdentificatorNaming(column) - maxlength1 = max(maxlength1, len(column or "")) - maxlength2 = max(maxlength2, len(colType or "")) + for column in colList: + colType = columns[column] - maxlength1 = max(maxlength1, len("COLUMN")) - lines1 = "-" * (maxlength1 + 2) + column = unsafeSQLIdentificatorNaming(column) + maxlength1 = max(maxlength1, len(column or "")) + maxlength2 = max(maxlength2, len(colType or "")) - if colType is not None: - maxlength2 = max(maxlength2, len("TYPE")) - lines2 = "-" * (maxlength2 + 2) + maxlength1 = max(maxlength1, len("COLUMN")) + lines1 = "-" * (maxlength1 + 2) - self._write("Database: %s\nTable: %s" % (unsafeSQLIdentificatorNaming(db) if db and METADB_SUFFIX not in db else "", unsafeSQLIdentificatorNaming(table))) + if colType is not None: + maxlength2 = max(maxlength2, len("TYPE")) + lines2 = "-" * (maxlength2 + 2) - if len(columns) == 1: - self._write("[1 column]") - else: - self._write("[%d columns]" % len(columns)) + self._write("Database: %s\nTable: %s" % (unsafeSQLIdentificatorNaming(db) if db and METADB_SUFFIX not in db else "", unsafeSQLIdentificatorNaming(table))) - if colType is not None: - self._write("+%s+%s+" % (lines1, lines2)) - else: - self._write("+%s+" % lines1) + if len(columns) == 1: + self._write("[1 column]") + else: + self._write("[%d columns]" % len(columns)) - blank1 = " " * (maxlength1 - len("COLUMN")) + if colType is not None: + self._write(f"+{lines1}+{lines2}+") + else: + self._write(f"+{lines1}+") - if colType is not None: - blank2 = " " * (maxlength2 - len("TYPE")) + blank1 = " " * (maxlength1 - len("COLUMN")) - if colType is not None: - self._write("| Column%s | Type%s |" % (blank1, blank2)) - self._write("+%s+%s+" % (lines1, lines2)) - else: - self._write("| Column%s |" % blank1) - self._write("+%s+" % lines1) + if colType is not None: + blank2 = " " * (maxlength2 - len("TYPE")) - for column in colList: - colType = columns[column] + self._write(f"| Column{blank1} | Type{blank2} |") + self._write(f"+{lines1}+{lines2}+") + else: + self._write(f"| Column{blank1} |") + self._write(f"+{lines1}+") - column = unsafeSQLIdentificatorNaming(column) - blank1 = " " * (maxlength1 - len(column)) + for column in colList: + colType = columns[column] - if colType is not None: - blank2 = " " * (maxlength2 - len(colType)) - self._write("| %s%s | %s%s |" % (column, blank1, colType, blank2)) - else: - self._write("| %s%s |" % (column, blank1)) + column = unsafeSQLIdentificatorNaming(column) + blank1 = " " * (maxlength1 - len(column)) if colType is not None: - self._write("+%s+%s+\n" % (lines1, lines2)) + blank2 = " " * (maxlength2 - len(colType)) + self._write(f"| {column}{blank1} | {colType}{blank2} |") else: - self._write("+%s+\n" % lines1) + self._write(f"| {column}{blank1} |") + + if colType is not None: + self._write("+%s+%s+\n" % (lines1, lines2)) + else: + self._write("+%s+\n" % lines1) def dbTablesCount(self, dbTables): if isinstance(dbTables, dict) and len(dbTables) > 0: @@ -363,16 +373,20 @@ def dbTablesCount(self, dbTables): maxlength1 = max(maxlength1, getConsoleLength(getUnicode(table))) for db, counts in dbTables.items(): - self._write("Database: %s" % unsafeSQLIdentificatorNaming(db) if db and METADB_SUFFIX not in db else "") + self._write( + f"Database: {unsafeSQLIdentificatorNaming(db)}" + if db and METADB_SUFFIX not in db + else "" + ) lines1 = "-" * (maxlength1 + 2) blank1 = " " * (maxlength1 - len("Table")) lines2 = "-" * (maxlength2 + 2) blank2 = " " * (maxlength2 - len("Entries")) - self._write("+%s+%s+" % (lines1, lines2)) - self._write("| Table%s | Entries%s |" % (blank1, blank2)) - self._write("+%s+%s+" % (lines1, lines2)) + self._write(f"+{lines1}+{lines2}+") + self._write(f"| Table{blank1} | Entries{blank2} |") + self._write(f"+{lines1}+{lines2}+") sortedCounts = list(counts.keys()) sortedCounts.sort(reverse=True) diff --git a/lib/core/gui.py b/lib/core/gui.py index fa6f2694943..77ab2d6889a 100644 --- a/lib/core/gui.py +++ b/lib/core/gui.py @@ -42,9 +42,8 @@ def runGui(parser): from thirdparty.six.moves import tkinter_ttk as _tkinter_ttk from thirdparty.six.moves import tkinter_messagebox as _tkinter_messagebox except ImportError as ex: - raise SqlmapMissingDependence("missing dependence ('%s')" % getSafeExString(ex)) + raise SqlmapMissingDependence(f"missing dependence ('{getSafeExString(ex)}')") - # Reference: https://www.reddit.com/r/learnpython/comments/985umy/limit_user_input_to_only_int_with_tkinter/e4dj9k9?utm_source=share&utm_medium=web2x class ConstrainedEntry(_tkinter.Entry): def __init__(self, master=None, **kwargs): self.var = _tkinter.StringVar() @@ -61,7 +60,6 @@ def check(self, *args): else: self.set(self.old_value) - # Reference: https://code.activestate.com/recipes/580726-tkinter-notebook-that-fits-to-the-height-of-every-/ class AutoresizableNotebook(_tkinter_ttk.Notebook): def __init__(self, master=None, **kw): _tkinter_ttk.Notebook.__init__(self, master, **kw) @@ -76,7 +74,7 @@ def _on_tab_changed(self, event): try: window = _tkinter.Tk() except Exception as ex: - errMsg = "unable to create GUI window ('%s')" % getSafeExString(ex) + errMsg = f"unable to create GUI window ('{getSafeExString(ex)}')" raise SqlmapSystemException(errMsg) window.title(VERSION_STRING) @@ -242,12 +240,17 @@ def enqueue(stream, queue): row = 1 if group.get_description(): - _tkinter.Label(frame, text="%s:" % group.get_description()).grid(column=0, row=1, columnspan=3, sticky=_tkinter.W) + _tkinter.Label(frame, text=f"{group.get_description()}:").grid( + column=0, row=1, columnspan=3, sticky=_tkinter.W + ) _tkinter.Label(frame).grid(column=0, row=2, sticky=_tkinter.W) row += 2 for option in group.option_list: - _tkinter.Label(frame, text="%s " % parser.formatter._format_option_strings(option)).grid(column=0, row=row, sticky=_tkinter.W) + _tkinter.Label( + frame, + text=f"{parser.formatter._format_option_strings(option)} ", + ).grid(column=0, row=row, sticky=_tkinter.W) if option.type == "string": widget = _tkinter.Entry(frame) @@ -270,7 +273,9 @@ def enqueue(stream, queue): if hasattr(widget, "insert"): widget.insert(0, default) - _tkinter.Label(frame, text=" %s" % option.help).grid(column=2, row=row, sticky=_tkinter.W) + _tkinter.Label(frame, text=f" {option.help}").grid( + column=2, row=row, sticky=_tkinter.W + ) row += 1 diff --git a/lib/core/log.py b/lib/core/log.py index 14ba83d681b..6d4a0326a34 100644 --- a/lib/core/log.py +++ b/lib/core/log.py @@ -43,8 +43,7 @@ def colorize(self, message, levelno, force=False): prefix = match.group(1) if match else "" message = message[len(prefix):] - match = re.search(r"\[([A-Z ]+)\]", message) # log level - if match: + if match := re.search(r"\[([A-Z ]+)\]", message): level = match.group(1) if message.startswith(self.bold): message = message.replace(self.bold, "") @@ -54,43 +53,54 @@ def colorize(self, message, levelno, force=False): reset = self.reset message = message.replace(level, ''.join((self.csi, ';'.join(params), 'm', level, reset)), 1) - match = re.search(r"\A\s*\[([\d:]+)\]", message) # time - if match: + if match := re.search(r"\A\s*\[([\d:]+)\]", message): time = match.group(1) message = message.replace(time, ''.join((self.csi, str(self.color_map["cyan"] + 30), 'm', time, self._reset(message))), 1) - match = re.search(r"\[(#\d+)\]", message) # counter - if match: + if match := re.search(r"\[(#\d+)\]", message): counter = match.group(1) message = message.replace(counter, ''.join((self.csi, str(self.color_map["yellow"] + 30), 'm', counter, self._reset(message))), 1) if level != "PAYLOAD": if any(_ in message for _ in ("parsed DBMS error message",)): - match = re.search(r": '(.+)'", message) - if match: + if match := re.search(r": '(.+)'", message): string = match.group(1) - message = message.replace("'%s'" % string, "'%s'" % ''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message))), 1) + message = message.replace( + f"'{string}'", + f"""'{''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message)))}'""", + 1, + ) + elif match := re.search(r"\bresumed: '(.+\.\.\.)", message): + string = match.group(1) + message = message.replace( + f"'{string}", + f"""'{''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message)))}""", + 1, + ) + elif match := re.search( + r" \('(.+)'\)\Z", message + ) or re.search(r"output: '(.+)'\Z", message): + string = match.group(1) + message = message.replace( + f"'{string}'", + f"""'{''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message)))}'""", + 1, + ) else: - match = re.search(r"\bresumed: '(.+\.\.\.)", message) - if match: + for match in re.finditer(r"[^\w]'([^']+)'", message): # single-quoted string = match.group(1) - message = message.replace("'%s" % string, "'%s" % ''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message))), 1) - else: - match = re.search(r" \('(.+)'\)\Z", message) or re.search(r"output: '(.+)'\Z", message) - if match: - string = match.group(1) - message = message.replace("'%s'" % string, "'%s'" % ''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message))), 1) - else: - for match in re.finditer(r"[^\w]'([^']+)'", message): # single-quoted - string = match.group(1) - message = message.replace("'%s'" % string, "'%s'" % ''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message))), 1) + message = message.replace( + f"'{string}'", + f"""'{''.join((self.csi, str(self.color_map["white"] + 30), 'm', string, self._reset(message)))}'""", + 1, + ) else: message = ''.join((self.csi, ';'.join(params), 'm', message, self.reset)) if prefix: - message = "%s%s" % (prefix, message) + message = f"{prefix}{message}" - message = message.replace("%s]" % self.bold, "]%s" % self.bold) # dirty patch + message = message.replace(f"{self.bold}]", f"]{self.bold}") return message diff --git a/lib/core/option.py b/lib/core/option.py index 678a1c2beb1..c05f34bc1ab 100644 --- a/lib/core/option.py +++ b/lib/core/option.py @@ -207,7 +207,7 @@ def __contains__(self, name): tree.parse(paths.QUERIES_XML) except Exception as ex: errMsg = "something appears to be wrong with " - errMsg += "the file '%s' ('%s'). Please make " % (paths.QUERIES_XML, getSafeExString(ex)) + errMsg += f"the file '{paths.QUERIES_XML}' ('{getSafeExString(ex)}'). Please make " errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException(errMsg) @@ -226,7 +226,7 @@ def _setMultipleTargets(): if not conf.logFile: return - debugMsg = "parsing targets list from '%s'" % conf.logFile + debugMsg = f"parsing targets list from '{conf.logFile}'" logger.debug(debugMsg) if not os.path.exists(conf.logFile): @@ -236,7 +236,12 @@ def _setMultipleTargets(): if checkFile(conf.logFile, False): for target in parseRequestFile(conf.logFile): url, _, data, _, _ = target - key = re.sub(r"(\w+=)[^%s ]*" % (conf.paramDel or DEFAULT_GET_POST_DELIMITER), r"\g<1>", "%s %s" % (url, data)) + key = re.sub( + r"(\w+=)[^%s ]*" + % (conf.paramDel or DEFAULT_GET_POST_DELIMITER), + r"\g<1>", + f"{url} {data}", + ) if key not in seen: kb.targets.add(target) seen.add(key) @@ -251,14 +256,18 @@ def _setMultipleTargets(): for target in parseRequestFile(os.path.join(conf.logFile, reqFile)): url, _, data, _, _ = target - key = re.sub(r"(\w+=)[^%s ]*" % (conf.paramDel or DEFAULT_GET_POST_DELIMITER), r"\g<1>", "%s %s" % (url, data)) + key = re.sub( + r"(\w+=)[^%s ]*" + % (conf.paramDel or DEFAULT_GET_POST_DELIMITER), + r"\g<1>", + f"{url} {data}", + ) if key not in seen: kb.targets.add(target) seen.add(key) else: - errMsg = "the specified list of targets is not a file " - errMsg += "nor a directory" + errMsg = "the specified list of targets is not a file " + "nor a directory" raise SqlmapFilePathException(errMsg) updatedTargetsCount = len(kb.targets) @@ -302,11 +311,11 @@ def _setRequestFromFile(): seen = set() if not checkFile(requestFile, False): - errMsg = "specified HTTP request file '%s' " % requestFile + errMsg = f"specified HTTP request file '{requestFile}' " errMsg += "does not exist" raise SqlmapFilePathException(errMsg) - infoMsg = "parsing HTTP request from '%s'" % requestFile + infoMsg = f"parsing HTTP request from '{requestFile}'" logger.info(infoMsg) for target in parseRequestFile(requestFile): @@ -318,7 +327,7 @@ def _setRequestFromFile(): seen.add(url) if url is None: - errMsg = "specified file '%s' " % requestFile + errMsg = f"specified file '{requestFile}' " errMsg += "does not contain a usable HTTP request (with parameters)" raise SqlmapDataException(errMsg) @@ -326,18 +335,18 @@ def _setRequestFromFile(): conf.secondReq = safeExpandUser(conf.secondReq) if not checkFile(conf.secondReq, False): - errMsg = "specified second-order HTTP request file '%s' " % conf.secondReq + errMsg = f"specified second-order HTTP request file '{conf.secondReq}' " errMsg += "does not exist" raise SqlmapFilePathException(errMsg) - infoMsg = "parsing second-order HTTP request from '%s'" % conf.secondReq + infoMsg = f"parsing second-order HTTP request from '{conf.secondReq}'" logger.info(infoMsg) try: target = next(parseRequestFile(conf.secondReq, False)) kb.secondReq = target except StopIteration: - errMsg = "specified second-order HTTP request file '%s' " % conf.secondReq + errMsg = f"specified second-order HTTP request file '{conf.secondReq}' " errMsg += "does not contain a valid HTTP request" raise SqlmapDataException(errMsg) @@ -394,11 +403,7 @@ def retrieve(): if not conf.forms: infoMsg += ", " - if len(links) == len(kb.targets): - infoMsg += "all " - else: - infoMsg += "%d " % len(kb.targets) - + infoMsg += "all " if len(links) == len(kb.targets) else "%d " % len(kb.targets) infoMsg += "of them are testable targets" logger.info(infoMsg) @@ -423,6 +428,8 @@ def _setStdinPipeTargets(): infoMsg = "using 'STDIN' for parsing targets list" logger.info(infoMsg) + + class _(object): def __init__(self): self.__rest = OrderedSet() @@ -440,8 +447,11 @@ def next(self): line = None if line: - match = re.search(r"\b(https?://[^\s'\"]+|[\w.]+\.\w{2,3}[/\w+]*\?[^\s'\"]+)", line, re.I) - if match: + if match := re.search( + r"\b(https?://[^\s'\"]+|[\w.]+\.\w{2,3}[/\w+]*\?[^\s'\"]+)", + line, + re.I, + ): return (match.group(0), conf.method, conf.data, conf.cookie, None) elif self.__rest: return self.__rest.pop() @@ -451,6 +461,7 @@ def next(self): def add(self, elem): self.__rest.add(elem) + kb.targets = _() def _setBulkMultipleTargets(): @@ -459,12 +470,11 @@ def _setBulkMultipleTargets(): conf.bulkFile = safeExpandUser(conf.bulkFile) - infoMsg = "parsing multiple targets list from '%s'" % conf.bulkFile + infoMsg = f"parsing multiple targets list from '{conf.bulkFile}'" logger.info(infoMsg) if not checkFile(conf.bulkFile, False): - errMsg = "the specified bulk file " - errMsg += "does not exist" + errMsg = "the specified bulk file " + "does not exist" raise SqlmapFilePathException(errMsg) found = False @@ -509,7 +519,7 @@ def _findPageForms(): target = targets[i].strip() if not re.search(r"(?i)\Ahttp[s]*://", target): - target = "http://%s" % target + target = f"http://{target}" page, _, _ = Request.getPage(url=target.strip(), cookie=conf.cookie, crawling=True, raise404=False) if findPageForms(page, target, False, True): @@ -521,7 +531,7 @@ def _findPageForms(): except KeyboardInterrupt: break except Exception as ex: - errMsg = "problem occurred while searching for forms at '%s' ('%s')" % (target, getSafeExString(ex)) + errMsg = f"problem occurred while searching for forms at '{target}' ('{getSafeExString(ex)}')" logger.error(errMsg) if not found: @@ -543,8 +553,10 @@ def _setDBMSAuthentication(): match = re.search(r"^(.+?):(.*?)$", conf.dbmsCred) if not match: - errMsg = "DBMS authentication credentials value must be in format " - errMsg += "username:password" + errMsg = ( + "DBMS authentication credentials value must be in format " + + "username:password" + ) raise SqlmapSyntaxException(errMsg) conf.dbmsUsername = match.group(1) @@ -563,8 +575,10 @@ def _setMetasploit(): try: __import__("win32file") except ImportError: - errMsg = "sqlmap requires third-party module 'pywin32' " - errMsg += "in order to use Metasploit functionalities on " + errMsg = ( + "sqlmap requires third-party module 'pywin32' " + + "in order to use Metasploit functionalities on " + ) errMsg += "Windows. You can download it from " errMsg += "'https://github.com/mhammond/pywin32'" raise SqlmapMissingDependence(errMsg) @@ -579,19 +593,46 @@ def _setMetasploit(): isAdmin = runningAsAdmin() if not isAdmin: - errMsg = "you need to run sqlmap as an administrator " - errMsg += "if you want to perform a SMB relay attack because " + errMsg = ( + "you need to run sqlmap as an administrator " + + "if you want to perform a SMB relay attack because " + ) errMsg += "it will need to listen on a user-specified SMB " errMsg += "TCP port for incoming connection attempts" raise SqlmapMissingPrivileges(errMsg) if conf.msfPath: for path in (conf.msfPath, os.path.join(conf.msfPath, "bin")): - if any(os.path.exists(normalizePath(os.path.join(path, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfcli", "msfconsole")): + if any( + os.path.exists( + normalizePath( + os.path.join(path, f'{_}{".bat" if IS_WIN else ""}') + ) + ) + for _ in ("msfcli", "msfconsole") + ): msfEnvPathExists = True - if all(os.path.exists(normalizePath(os.path.join(path, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfvenom",)): + if all( + os.path.exists( + normalizePath( + os.path.join( + path, f'{_}{".bat" if IS_WIN else ""}' + ) + ) + ) + for _ in ("msfvenom",) + ): kb.oldMsf = False - elif all(os.path.exists(normalizePath(os.path.join(path, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfencode", "msfpayload")): + elif all( + os.path.exists( + normalizePath( + os.path.join( + path, f'{_}{".bat" if IS_WIN else ""}' + ) + ) + ) + for _ in ("msfencode", "msfpayload") + ): kb.oldMsf = True else: msfEnvPathExists = False @@ -601,24 +642,28 @@ def _setMetasploit(): if msfEnvPathExists: debugMsg = "provided Metasploit Framework path " - debugMsg += "'%s' is valid" % conf.msfPath + debugMsg += f"'{conf.msfPath}' is valid" logger.debug(debugMsg) else: warnMsg = "the provided Metasploit Framework path " - warnMsg += "'%s' is not valid. The cause could " % conf.msfPath + warnMsg += f"'{conf.msfPath}' is not valid. The cause could " warnMsg += "be that the path does not exists or that one " warnMsg += "or more of the needed Metasploit executables " warnMsg += "within msfcli, msfconsole, msfencode and " warnMsg += "msfpayload do not exist" logger.warning(warnMsg) else: - warnMsg = "you did not provide the local path where Metasploit " - warnMsg += "Framework is installed" + warnMsg = ( + "you did not provide the local path where Metasploit " + + "Framework is installed" + ) logger.warning(warnMsg) if not msfEnvPathExists: - warnMsg = "sqlmap is going to look for Metasploit Framework " - warnMsg += "installation inside the environment path(s)" + warnMsg = ( + "sqlmap is going to look for Metasploit Framework " + + "installation inside the environment path(s)" + ) logger.warning(warnMsg) envPaths = os.environ.get("PATH", "").split(";" if IS_WIN else ":") @@ -626,18 +671,43 @@ def _setMetasploit(): for envPath in envPaths: envPath = envPath.replace(";", "") - if any(os.path.exists(normalizePath(os.path.join(envPath, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfcli", "msfconsole")): + if any( + os.path.exists( + normalizePath( + os.path.join(envPath, f'{_}{".bat" if IS_WIN else ""}') + ) + ) + for _ in ("msfcli", "msfconsole") + ): msfEnvPathExists = True - if all(os.path.exists(normalizePath(os.path.join(envPath, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfvenom",)): + if all( + os.path.exists( + normalizePath( + os.path.join( + envPath, f'{_}{".bat" if IS_WIN else ""}' + ) + ) + ) + for _ in ("msfvenom",) + ): kb.oldMsf = False - elif all(os.path.exists(normalizePath(os.path.join(envPath, "%s%s" % (_, ".bat" if IS_WIN else "")))) for _ in ("msfencode", "msfpayload")): + elif all( + os.path.exists( + normalizePath( + os.path.join( + envPath, f'{_}{".bat" if IS_WIN else ""}' + ) + ) + ) + for _ in ("msfencode", "msfpayload") + ): kb.oldMsf = True else: msfEnvPathExists = False if msfEnvPathExists: infoMsg = "Metasploit Framework has been found " - infoMsg += "installed in the '%s' path" % envPath + infoMsg += f"installed in the '{envPath}' path" logger.info(infoMsg) conf.msfPath = envPath @@ -645,8 +715,10 @@ def _setMetasploit(): break if not msfEnvPathExists: - errMsg = "unable to locate Metasploit Framework installation. " - errMsg += "You can get it at 'https://www.metasploit.com/download/'" + errMsg = ( + "unable to locate Metasploit Framework installation. " + + "You can get it at 'https://www.metasploit.com/download/'" + ) raise SqlmapFilePathException(errMsg) def _setWriteFile(): @@ -657,12 +729,12 @@ def _setWriteFile(): logger.debug(debugMsg) if not os.path.exists(conf.fileWrite): - errMsg = "the provided local file '%s' does not exist" % conf.fileWrite + errMsg = f"the provided local file '{conf.fileWrite}' does not exist" raise SqlmapFilePathException(errMsg) if not conf.fileDest: errMsg = "you did not provide the back-end DBMS absolute path " - errMsg += "where you want to write the local file '%s'" % conf.fileWrite + errMsg += f"where you want to write the local file '{conf.fileWrite}'" raise SqlmapMissingMandatoryOptionException(errMsg) conf.fileWriteType = getFileType(conf.fileWrite) @@ -676,40 +748,43 @@ def _setOS(): return if conf.os.lower() not in SUPPORTED_OS: - errMsg = "you provided an unsupported back-end DBMS operating " - errMsg += "system. The supported DBMS operating systems for OS " - errMsg += "and file system access are %s. " % ', '.join([o.capitalize() for o in SUPPORTED_OS]) + errMsg = ( + "you provided an unsupported back-end DBMS operating " + + "system. The supported DBMS operating systems for OS " + ) + errMsg += f"and file system access are {', '.join([o.capitalize() for o in SUPPORTED_OS])}. " errMsg += "If you do not know the back-end DBMS underlying OS, " errMsg += "do not provide it and sqlmap will fingerprint it for " errMsg += "you." raise SqlmapUnsupportedDBMSException(errMsg) debugMsg = "forcing back-end DBMS operating system to user defined " - debugMsg += "value '%s'" % conf.os + debugMsg += f"value '{conf.os}'" logger.debug(debugMsg) Backend.setOs(conf.os) def _setTechnique(): + if not conf.technique or not isinstance(conf.technique, six.string_types): + return + _ = [] + validTechniques = sorted(getPublicTypeMembers(PAYLOAD.TECHNIQUE), key=lambda x: x[1]) validLetters = [_[0][0].upper() for _ in validTechniques] - if conf.technique and isinstance(conf.technique, six.string_types): - _ = [] - - for letter in conf.technique.upper(): - if letter not in validLetters: - errMsg = "value for --technique must be a string composed " - errMsg += "by the letters %s. Refer to the " % ", ".join(validLetters) - errMsg += "user's manual for details" - raise SqlmapSyntaxException(errMsg) + for letter in conf.technique.upper(): + if letter not in validLetters: + errMsg = "value for --technique must be a string composed " + errMsg += f'by the letters {", ".join(validLetters)}. Refer to the ' + errMsg += "user's manual for details" + raise SqlmapSyntaxException(errMsg) - for validTech, validInt in validTechniques: - if letter == validTech[0]: - _.append(validInt) - break + for validTech, validInt in validTechniques: + if letter == validTech[0]: + _.append(validInt) + break - conf.technique = _ + conf.technique = _ def _setDBMS(): """ @@ -723,15 +798,20 @@ def _setDBMS(): logger.debug(debugMsg) conf.dbms = conf.dbms.lower() - regex = re.search(r"%s ([\d\.]+)" % ("(%s)" % "|".join(SUPPORTED_DBMS)), conf.dbms, re.I) - - if regex: + if regex := re.search( + r"%s ([\d\.]+)" % f'({"|".join(SUPPORTED_DBMS)})', conf.dbms, re.I + ): conf.dbms = regex.group(1) Backend.setVersion(regex.group(2)) if conf.dbms not in SUPPORTED_DBMS: errMsg = "you provided an unsupported back-end database management " - errMsg += "system. Supported DBMSes are as follows: %s. " % ', '.join(sorted((_ for _ in (list(DBMS_DICT) + getPublicTypeMembers(FORK, True))), key=str.lower)) + errMsg += "system. Supported DBMSes are as follows: %s. " % ', '.join( + sorted( + iter(list(DBMS_DICT) + getPublicTypeMembers(FORK, True)), + key=str.lower, + ) + ) errMsg += "If you do not know the back-end DBMS, do not provide " errMsg += "it and sqlmap will fingerprint it for you." raise SqlmapUnsupportedDBMSException(errMsg) @@ -753,8 +833,7 @@ def _listTamperingFunctions(): for script in sorted(glob.glob(os.path.join(paths.SQLMAP_TAMPER_PATH, "*.py"))): content = openFile(script, "rb").read() - match = re.search(r'(?s)__priority__.+"""(.+)"""', content) - if match: + if match := re.search(r'(?s)__priority__.+"""(.+)"""', content): comment = match.group(1).strip() dataToStdout("* %s - %s\n" % (setColor(os.path.basename(script), "yellow"), re.sub(r" *\n *", " ", comment.split("\n\n")[0].strip()))) diff --git a/lib/core/patch.py b/lib/core/patch.py index 9136b70a472..bf2f839dcef 100644 --- a/lib/core/patch.py +++ b/lib/core/patch.py @@ -84,7 +84,9 @@ def _(self, *args): match = re.search(r" --method[= ](\w+)", " ".join(sys.argv)) if match and match.group(1).upper() != PLACE.POST: - PLACE.CUSTOM_POST = PLACE.CUSTOM_POST.replace("POST", "%s (body)" % match.group(1)) + PLACE.CUSTOM_POST = PLACE.CUSTOM_POST.replace( + "POST", f"{match.group(1)} (body)" + ) # https://github.com/sqlmapproject/sqlmap/issues/4314 try: diff --git a/lib/core/profiling.py b/lib/core/profiling.py index 4fddab24a7e..8a6fa0397d2 100644 --- a/lib/core/profiling.py +++ b/lib/core/profiling.py @@ -25,5 +25,5 @@ def profile(profileOutputFile=None): # Start sqlmap main function and generate a raw profile file cProfile.run("start()", profileOutputFile) - infoMsg = "execution profiled and stored into file '%s' (e.g. 'gprof2dot -f pstats %s | dot -Tpng -o /tmp/sqlmap_profile.png')" % (profileOutputFile, profileOutputFile) + infoMsg = f"execution profiled and stored into file '{profileOutputFile}' (e.g. 'gprof2dot -f pstats {profileOutputFile} | dot -Tpng -o /tmp/sqlmap_profile.png')" logger.info(infoMsg) diff --git a/lib/core/readlineng.py b/lib/core/readlineng.py index 0a6c1dd5185..0be67d9d6dc 100644 --- a/lib/core/readlineng.py +++ b/lib/core/readlineng.py @@ -5,6 +5,7 @@ See the file 'LICENSE' for copying permission """ + _readline = None try: from readline import * @@ -24,8 +25,7 @@ try: _outputfile = _readline.GetOutputFile() except AttributeError: - debugMsg = "Failed GetOutputFile when using platform's " - debugMsg += "readline library" + debugMsg = "Failed GetOutputFile when using platform's " + "readline library" logger.debug(debugMsg) _readline = None @@ -37,14 +37,18 @@ if PLATFORM == "mac" and _readline: import commands - (status, result) = commands.getstatusoutput("otool -L %s | grep libedit" % _readline.__file__) + (status, result) = commands.getstatusoutput( + f"otool -L {_readline.__file__} | grep libedit" + ) if status == 0 and len(result) > 0: # We are bound to libedit - new in Leopard _readline.parse_and_bind("bind ^I rl_complete") - debugMsg = "Leopard libedit detected when using platform's " - debugMsg += "readline library" + debugMsg = ( + "Leopard libedit detected when using platform's " + + "readline library" + ) logger.debug(debugMsg) uses_libedit = True diff --git a/lib/core/replication.py b/lib/core/replication.py index 236d1ed4463..c33a0f4aafc 100644 --- a/lib/core/replication.py +++ b/lib/core/replication.py @@ -30,7 +30,7 @@ def __init__(self, dbpath): self.cursor = self.connection.cursor() except sqlite3.OperationalError as ex: errMsg = "error occurred while opening a replication " - errMsg += "file '%s' ('%s')" % (dbpath, getSafeExString(ex)) + errMsg += f"file '{dbpath}' ('{getSafeExString(ex)}')" raise SqlmapConnectionException(errMsg) class DataType(object): @@ -46,7 +46,7 @@ def __str__(self): return self.name def __repr__(self): - return "" % self + return f"" class Table(object): """ @@ -59,14 +59,18 @@ def __init__(self, parent, name, columns=None, create=True, typeless=False): self.columns = columns if create: try: - self.execute('DROP TABLE IF EXISTS "%s"' % self.name) + self.execute(f'DROP TABLE IF EXISTS "{self.name}"') if not typeless: - self.execute('CREATE TABLE "%s" (%s)' % (self.name, ','.join('"%s" %s' % (unsafeSQLIdentificatorNaming(colname), coltype) for colname, coltype in self.columns))) + self.execute( + f"""CREATE TABLE "{self.name}" ({','.join(f'"{unsafeSQLIdentificatorNaming(colname)}" {coltype}' for colname, coltype in self.columns)})""" + ) else: - self.execute('CREATE TABLE "%s" (%s)' % (self.name, ','.join('"%s"' % unsafeSQLIdentificatorNaming(colname) for colname in self.columns))) + self.execute( + f"""CREATE TABLE "{self.name}" ({','.join(f'"{unsafeSQLIdentificatorNaming(colname)}"' for colname in self.columns)})""" + ) except Exception as ex: - errMsg = "problem occurred ('%s') while initializing the sqlite database " % getSafeExString(ex, UNICODE_ENCODING) - errMsg += "located at '%s'" % self.parent.dbpath + errMsg = f"problem occurred ('{getSafeExString(ex, UNICODE_ENCODING)}') while initializing the sqlite database " + errMsg += f"located at '{self.parent.dbpath}'" raise SqlmapGenericException(errMsg) def insert(self, values): @@ -75,7 +79,10 @@ def insert(self, values): """ if len(values) == len(self.columns): - self.execute('INSERT INTO "%s" VALUES (%s)' % (self.name, ','.join(['?'] * len(values))), safechardecode(values)) + self.execute( + f"""INSERT INTO "{self.name}" VALUES ({','.join(['?'] * len(values))})""", + safechardecode(values), + ) else: errMsg = "wrong number of columns used in replicating insert" raise SqlmapValueException(errMsg) @@ -87,8 +94,8 @@ def execute(self, sql, parameters=None): except UnicodeError: self.parent.cursor.execute(sql, cleanReplaceUnicode(parameters or [])) except sqlite3.OperationalError as ex: - errMsg = "problem occurred ('%s') while accessing sqlite database " % getSafeExString(ex, UNICODE_ENCODING) - errMsg += "located at '%s'. Please make sure that " % self.parent.dbpath + errMsg = f"problem occurred ('{getSafeExString(ex, UNICODE_ENCODING)}') while accessing sqlite database " + errMsg += f"located at '{self.parent.dbpath}'. Please make sure that " errMsg += "it's not used by some other program" raise SqlmapGenericException(errMsg) @@ -106,9 +113,9 @@ def select(self, condition=None): """ This function is used for selecting row(s) from current table. """ - _ = 'SELECT * FROM %s' % self.name + _ = f'SELECT * FROM {self.name}' if condition: - _ += 'WHERE %s' % condition + _ += f'WHERE {condition}' return self.execute(_) def createTable(self, tblname, columns=None, typeless=False): diff --git a/lib/core/revision.py b/lib/core/revision.py index 7abd30cd03e..56bb0ac7771 100644 --- a/lib/core/revision.py +++ b/lib/core/revision.py @@ -28,32 +28,30 @@ def getRevisionNumber(): filePath = os.path.join(_, ".git", "HEAD") if os.path.exists(filePath): break + filePath = None + if _ == os.path.dirname(_): + break else: - filePath = None - if _ == os.path.dirname(_): - break - else: - _ = os.path.dirname(_) + _ = os.path.dirname(_) while True: - if filePath and os.path.isfile(filePath): - with openFile(filePath, "r") as f: - content = getText(f.read()) - filePath = None + if not filePath or not os.path.isfile(filePath): + break - if content.startswith("ref: "): - try: - filePath = os.path.join(_, ".git", content.replace("ref: ", "")).strip() - except UnicodeError: - pass + with openFile(filePath, "r") as f: + content = getText(f.read()) + filePath = None - if filePath is None: - match = re.match(r"(?i)[0-9a-f]{32}", content) - retVal = match.group(0) if match else None - break - else: - break + if content.startswith("ref: "): + try: + filePath = os.path.join(_, ".git", content.replace("ref: ", "")).strip() + except UnicodeError: + pass + if filePath is None: + match = re.match(r"(?i)[0-9a-f]{32}", content) + retVal = match.group(0) if match else None + break if not retVal: try: process = subprocess.Popen("git rev-parse --verify HEAD", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/lib/core/session.py b/lib/core/session.py index c50d7b03e87..c6db5cbb668 100644 --- a/lib/core/session.py +++ b/lib/core/session.py @@ -25,17 +25,15 @@ def setDbms(dbms): hashDBWrite(HASHDB_KEYS.DBMS, dbms) - _ = "(%s)" % ('|'.join(SUPPORTED_DBMS)) - _ = re.search(r"\A%s( |\Z)" % _, dbms, re.I) - - if _: + _ = f"({'|'.join(SUPPORTED_DBMS)})" + if _ := re.search(r"\A%s( |\Z)" % _, dbms, re.I): dbms = _.group(1) Backend.setDbms(dbms) if kb.resolutionDbms: hashDBWrite(HASHDB_KEYS.DBMS, kb.resolutionDbms) - logger.info("the back-end DBMS is %s" % Backend.getDbms()) + logger.info(f"the back-end DBMS is {Backend.getDbms()}") def setOs(): """ @@ -58,11 +56,11 @@ def setOs(): if "type" in kb.bannerFp: Backend.setOs(Format.humanize(kb.bannerFp["type"])) - infoMsg = "the back-end DBMS operating system is %s" % Backend.getOs() + infoMsg = f"the back-end DBMS operating system is {Backend.getOs()}" if "distrib" in kb.bannerFp: kb.osVersion = Format.humanize(kb.bannerFp["distrib"]) - infoMsg += " %s" % kb.osVersion + infoMsg += f" {kb.osVersion}" if "sp" in kb.bannerFp: kb.osSP = int(Format.humanize(kb.bannerFp["sp"]).replace("Service Pack ", "")) diff --git a/lib/core/shell.py b/lib/core/shell.py index 2ed47cecb8e..ed119b95290 100644 --- a/lib/core/shell.py +++ b/lib/core/shell.py @@ -32,10 +32,7 @@ def global_matches(self, text): n = len(text) for ns in (self.namespace,): - for word in ns: - if word[:n] == text: - matches.append(word) - + matches.extend(word for word in ns if word[:n] == text) return matches except: readline._readline = None @@ -78,7 +75,7 @@ def saveHistory(completion=None): try: readline.write_history_file(historyPath) except IOError as ex: - warnMsg = "there was a problem writing the history file '%s' (%s)" % (historyPath, getSafeExString(ex)) + warnMsg = f"there was a problem writing the history file '{historyPath}' ({getSafeExString(ex)})" logger.warning(warnMsg) except KeyboardInterrupt: pass @@ -102,11 +99,11 @@ def loadHistory(completion=None): try: readline.read_history_file(historyPath) except IOError as ex: - warnMsg = "there was a problem loading the history file '%s' (%s)" % (historyPath, getSafeExString(ex)) + warnMsg = f"there was a problem loading the history file '{historyPath}' ({getSafeExString(ex)})" logger.warning(warnMsg) except UnicodeError: if IS_WIN: - warnMsg = "there was a problem loading the history file '%s'. " % historyPath + warnMsg = f"there was a problem loading the history file '{historyPath}'. " warnMsg += "More info can be found at 'https://github.com/pyreadline/pyreadline/issues/30'" logger.warning(warnMsg) @@ -142,7 +139,7 @@ def autoCompletion(completion=None, os=None, commands=None): readline.parse_and_bind("tab: complete") elif commands: - completer = CompleterNG(dict(((_, None) for _ in commands))) + completer = CompleterNG({_: None for _ in commands}) readline.set_completer_delims(' ') readline.set_completer(completer.complete) readline.parse_and_bind("tab: complete") diff --git a/lib/core/subprocessng.py b/lib/core/subprocessng.py index 36fdf65636e..086c6a09858 100644 --- a/lib/core/subprocessng.py +++ b/lib/core/subprocessng.py @@ -46,7 +46,7 @@ def blockingReadFromFD(fd): break if not output: - raise EOFError("fd %s has been closed." % fd) + raise EOFError(f"fd {fd} has been closed.") return output @@ -170,15 +170,11 @@ def _recv(self, which, maxsize): fcntl.fcntl(conn, fcntl.F_SETFL, flags) def recv_some(p, t=.1, e=1, tr=5, stderr=0): - if tr < 1: - tr = 1 + tr = max(tr, 1) x = time.time() + t y = [] r = '' - if stderr: - pr = p.recv_err - else: - pr = p.recv + pr = p.recv_err if stderr else p.recv while time.time() < x or r: r = pr() if r is None: diff --git a/lib/core/testing.py b/lib/core/testing.py index b39229c6d6e..5530f24b645 100644 --- a/lib/core/testing.py +++ b/lib/core/testing.py @@ -123,10 +123,12 @@ def _thread(): time.sleep(1) if not vulnserver._alive: - logger.error("problem occurred in vulnserver instantiation (address: 'http://%s:%s')" % (address, port)) + logger.error( + f"problem occurred in vulnserver instantiation (address: 'http://{address}:{port}')" + ) return False else: - logger.info("vulnserver running at 'http://%s:%s'..." % (address, port)) + logger.info(f"vulnserver running at 'http://{address}:{port}'...") handle, config = tempfile.mkstemp(suffix=".conf") os.close(handle) @@ -158,11 +160,21 @@ def _thread(): f.flush() base = "http://%s:%d/" % (address, port) - url = "%s?id=1" % base - direct = "sqlite3://%s" % database + url = f"{base}?id=1" + direct = f"sqlite3://{database}" tmpdir = tempfile.mkdtemp() - content = open(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "sqlmap.conf"))).read().replace("url =", "url = %s" % url) + content = ( + open( + os.path.abspath( + os.path.join( + os.path.dirname(__file__), "..", "..", "sqlmap.conf" + ) + ) + ) + .read() + .replace("url =", f"url = {url}") + ) with open(config, "w+") as f: f.write(content) f.flush() @@ -183,7 +195,17 @@ def _thread(): for tag, value in (("", url), ("", base), ("", direct), ("", tmpdir), ("", request), ("", log), ("", multiple), ("", config), ("", url.replace("id=1", "id=MZ=%3d"))): options = options.replace(tag, value) - cmd = "%s \"%s\" %s --batch --non-interactive --debug --time-sec=1" % (sys.executable if ' ' not in sys.executable else '"%s"' % sys.executable, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "sqlmap.py")), options) + cmd = "%s \"%s\" %s --batch --non-interactive --debug --time-sec=1" % ( + sys.executable + if ' ' not in sys.executable + else f'"{sys.executable}"', + os.path.abspath( + os.path.join( + os.path.dirname(__file__), "..", "..", "sqlmap.py" + ) + ), + options, + ) if "" in cmd: handle, tmp = tempfile.mkstemp() @@ -219,7 +241,7 @@ def smokeTest(): try: re.compile(regex) except re.error: - errMsg = "smoke test failed at compiling '%s'" % regex + errMsg = f"smoke test failed at compiling '{regex}'" logger.error(errMsg) return False @@ -276,7 +298,7 @@ def _(node): try: re.compile(candidate) except: - errMsg = "smoke test failed at compiling '%s'" % candidate + errMsg = f"smoke test failed at compiling '{candidate}'" logger.error(errMsg) raise else: diff --git a/lib/core/threads.py b/lib/core/threads.py index 8b5a21deff2..e918b12ad89 100644 --- a/lib/core/threads.py +++ b/lib/core/threads.py @@ -104,8 +104,12 @@ def exceptionHandledFunction(threadFunction, silent=False): from lib.core.common import getSafeExString if not silent and kb.get("threadContinue") and not kb.get("multipleCtrlC") and not isinstance(ex, (SqlmapUserQuitException, SqlmapSkipTargetException)): - errMsg = getSafeExString(ex) if isinstance(ex, SqlmapBaseException) else "%s: %s" % (type(ex).__name__, getSafeExString(ex)) - logger.error("thread %s: '%s'" % (threading.currentThread().getName(), errMsg)) + errMsg = ( + getSafeExString(ex) + if isinstance(ex, SqlmapBaseException) + else f"{type(ex).__name__}: {getSafeExString(ex)}" + ) + logger.error(f"thread {threading.currentThread().getName()}: '{errMsg}'") if conf.get("verbose") > 1 and not isinstance(ex, SqlmapConnectionException): traceback.print_exc() @@ -134,7 +138,18 @@ def _threadFunction(): kb.multiThreadMode = False try: - if threadChoice and conf.threads == numThreads == 1 and not (kb.injection.data and not any(_ not in (PAYLOAD.TECHNIQUE.TIME, PAYLOAD.TECHNIQUE.STACKED) for _ in kb.injection.data)): + if ( + threadChoice + and conf.threads == numThreads == 1 + and ( + not kb.injection.data + or any( + _ + not in (PAYLOAD.TECHNIQUE.TIME, PAYLOAD.TECHNIQUE.STACKED) + for _ in kb.injection.data + ) + ) + ): while True: message = "please enter number of threads? [Enter for %d (current)] " % numThreads choice = readInput(message, default=str(numThreads)) @@ -180,7 +195,7 @@ def _threadFunction(): try: thread.start() except Exception as ex: - errMsg = "error occurred while starting new thread ('%s')" % ex + errMsg = f"error occurred while starting new thread ('{ex}')" logger.critical(errMsg) break @@ -208,7 +223,9 @@ def _threadFunction(): kb.lastCtrlCTime = time.time() if numThreads > 1: - logger.info("waiting for threads to finish%s" % (" (Ctrl+C was pressed)" if isinstance(ex, KeyboardInterrupt) else "")) + logger.info( + f'waiting for threads to finish{" (Ctrl+C was pressed)" if isinstance(ex, KeyboardInterrupt) else ""}' + ) try: while (threading.active_count() > 1): pass @@ -223,7 +240,7 @@ def _threadFunction(): except (SqlmapConnectionException, SqlmapValueException) as ex: print() kb.threadException = True - logger.error("thread %s: '%s'" % (threading.currentThread().getName(), ex)) + logger.error(f"thread {threading.currentThread().getName()}: '{ex}'") if conf.get("verbose") > 1 and isinstance(ex, SqlmapValueException): traceback.print_exc() @@ -234,13 +251,12 @@ def _threadFunction(): if not kb.multipleCtrlC: if isinstance(ex, sqlite3.Error): raise - else: - from lib.core.common import unhandledExceptionMessage + from lib.core.common import unhandledExceptionMessage - kb.threadException = True - errMsg = unhandledExceptionMessage() - logger.error("thread %s: %s" % (threading.currentThread().getName(), errMsg)) - traceback.print_exc() + kb.threadException = True + errMsg = unhandledExceptionMessage() + logger.error(f"thread {threading.currentThread().getName()}: {errMsg}") + traceback.print_exc() finally: kb.multiThreadMode = False diff --git a/lib/core/update.py b/lib/core/update.py index 2fe6f12e1c6..c91e4f8699f 100644 --- a/lib/core/update.py +++ b/lib/core/update.py @@ -39,8 +39,10 @@ def update(): success = False if TYPE == "pip": - infoMsg = "updating sqlmap to the latest stable version from the " - infoMsg += "PyPI repository" + infoMsg = ( + "updating sqlmap to the latest stable version from the " + + "PyPI repository" + ) logger.info(infoMsg) debugMsg = "sqlmap will try to update itself using 'pip' command" @@ -67,11 +69,13 @@ def update(): elif not os.path.exists(os.path.join(paths.SQLMAP_ROOT_PATH, ".git")): warnMsg = "not a git repository. It is recommended to clone the 'sqlmapproject/sqlmap' repository " - warnMsg += "from GitHub (e.g. 'git clone --depth 1 %s sqlmap')" % GIT_REPOSITORY + warnMsg += f"from GitHub (e.g. 'git clone --depth 1 {GIT_REPOSITORY} sqlmap')" logger.warning(warnMsg) if VERSION == getLatestRevision(): - logger.info("already at the latest revision '%s'" % (getRevisionNumber() or VERSION)) + logger.info( + f"already at the latest revision '{getRevisionNumber() or VERSION}'" + ) return message = "do you want to try to fetch the latest 'zipball' from repository and extract it (experimental) ? [y/N]" @@ -81,7 +85,7 @@ def update(): try: open(os.path.join(directory, "sqlmap.py"), "w+b") except Exception as ex: - errMsg = "unable to update content of directory '%s' ('%s')" % (directory, getSafeExString(ex)) + errMsg = f"unable to update content of directory '{directory}' ('{getSafeExString(ex)}')" logger.error(errMsg) else: attrs = os.stat(os.path.join(directory, "sqlmap.py")).st_mode @@ -96,7 +100,7 @@ def update(): pass if glob.glob(os.path.join(directory, '*')): - errMsg = "unable to clear the content of directory '%s'" % directory + errMsg = f"unable to clear the content of directory '{directory}'" logger.error(errMsg) else: try: @@ -112,10 +116,10 @@ def update(): if os.path.isfile(filepath): with openFile(filepath, "rb") as f: version = re.search(r"(?m)^VERSION\s*=\s*['\"]([^'\"]+)", f.read()).group(1) - logger.info("updated to the latest version '%s#dev'" % version) + logger.info(f"updated to the latest version '{version}#dev'") success = True except Exception as ex: - logger.error("update could not be completed ('%s')" % getSafeExString(ex)) + logger.error(f"update could not be completed ('{getSafeExString(ex)}')") else: if not success: logger.error("update could not be completed") @@ -123,11 +127,15 @@ def update(): try: os.chmod(os.path.join(directory, "sqlmap.py"), attrs) except OSError: - logger.warning("could not set the file attributes of '%s'" % os.path.join(directory, "sqlmap.py")) + logger.warning( + f"""could not set the file attributes of '{os.path.join(directory, "sqlmap.py")}'""" + ) else: - infoMsg = "updating sqlmap to the latest development revision from the " - infoMsg += "GitHub repository" + infoMsg = ( + "updating sqlmap to the latest development revision from the " + + "GitHub repository" + ) logger.info(infoMsg) debugMsg = "sqlmap will try to update itself using 'git' command" @@ -137,7 +145,13 @@ def update(): output = "" try: - process = subprocess.Popen("git checkout . && git pull %s HEAD" % GIT_REPOSITORY, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=paths.SQLMAP_ROOT_PATH) + process = subprocess.Popen( + f"git checkout . && git pull {GIT_REPOSITORY} HEAD", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=paths.SQLMAP_ROOT_PATH, + ) pollProcess(process, True) output, _ = process.communicate() success = not process.returncode @@ -148,24 +162,28 @@ def update(): output = getText(output) if success: - logger.info("%s the latest revision '%s'" % ("already at" if "Already" in output else "updated to", getRevisionNumber())) + logger.info( + f"""{"already at" if "Already" in output else "updated to"} the latest revision '{getRevisionNumber()}'""" + ) + elif "Not a git repository" in output: + errMsg = "not a valid git repository. Please checkout the 'sqlmapproject/sqlmap' repository " + errMsg += f"from GitHub (e.g. 'git clone --depth 1 {GIT_REPOSITORY} sqlmap')" + logger.error(errMsg) else: - if "Not a git repository" in output: - errMsg = "not a valid git repository. Please checkout the 'sqlmapproject/sqlmap' repository " - errMsg += "from GitHub (e.g. 'git clone --depth 1 %s sqlmap')" % GIT_REPOSITORY - logger.error(errMsg) - else: - logger.error("update could not be completed ('%s')" % re.sub(r"\W+", " ", output).strip()) + logger.error("update could not be completed ('%s')" % re.sub(r"\W+", " ", output).strip()) if not success: if IS_WIN: - infoMsg = "for Windows platform it's recommended " - infoMsg += "to use a GitHub for Windows client for updating " + infoMsg = ( + "for Windows platform it's recommended " + + "to use a GitHub for Windows client for updating " + ) infoMsg += "purposes (https://desktop.github.com/) or just " infoMsg += "download the latest snapshot from " infoMsg += "https://github.com/sqlmapproject/sqlmap/downloads" else: - infoMsg = "for Linux platform it's recommended " - infoMsg += "to install a standard 'git' package (e.g.: 'apt install git')" - + infoMsg = ( + "for Linux platform it's recommended " + + "to install a standard 'git' package (e.g.: 'apt install git')" + ) logger.info(infoMsg) diff --git a/lib/core/wordlist.py b/lib/core/wordlist.py index 781642bf5b7..b7a46c2258c 100644 --- a/lib/core/wordlist.py +++ b/lib/core/wordlist.py @@ -52,11 +52,11 @@ def adjust(self): _ = zipfile.ZipFile(self.current, 'r') except zipfile.error as ex: errMsg = "something appears to be wrong with " - errMsg += "the file '%s' ('%s'). Please make " % (self.current, getSafeExString(ex)) + errMsg += f"the file '{self.current}' ('{getSafeExString(ex)}'). Please make " errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException(errMsg) if len(_.namelist()) == 0: - errMsg = "no file(s) inside '%s'" % self.current + errMsg = f"no file(s) inside '{self.current}'" raise SqlmapDataException(errMsg) self.fp = _.open(_.namelist()[0]) else: @@ -78,7 +78,7 @@ def __next__(self): retVal = next(self.iter).rstrip() except zipfile.error as ex: errMsg = "something appears to be wrong with " - errMsg += "the file '%s' ('%s'). Please make " % (self.current, getSafeExString(ex)) + errMsg += f"the file '{self.current}' ('{getSafeExString(ex)}'). Please make " errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException(errMsg) except StopIteration: diff --git a/lib/parse/banner.py b/lib/parse/banner.py index 42b4dddc1ba..4465e42182c 100644 --- a/lib/parse/banner.py +++ b/lib/parse/banner.py @@ -77,7 +77,11 @@ def endElement(self, name): self._version = self._version.replace(" ", "") match = re.search(r"\A(?P\d+)\.00\.(?P\d+)\Z", self._version) - self._versionAlt = "%s.0.%s.0" % (match.group('major'), match.group('build')) if match else None + self._versionAlt = ( + f"{match.group('major')}.0.{match.group('build')}.0" + if match + else None + ) elif name == "servicepack": self._inServicePack = False @@ -108,8 +112,8 @@ def bannerParser(banner): parseXmlFile(xmlfile, handler) handler = FingerprintHandler(banner, kb.bannerFp) - parseXmlFile(paths.GENERIC_XML, handler) else: handler = FingerprintHandler(banner, kb.bannerFp) parseXmlFile(xmlfile, handler) - parseXmlFile(paths.GENERIC_XML, handler) + + parseXmlFile(paths.GENERIC_XML, handler) diff --git a/lib/parse/configfile.py b/lib/parse/configfile.py index 6891d11b4e3..275446ba87d 100644 --- a/lib/parse/configfile.py +++ b/lib/parse/configfile.py @@ -39,16 +39,13 @@ def configFileProxy(section, option, datatype): value = config.get(section, option) except ValueError as ex: errMsg = "error occurred while processing the option " - errMsg += "'%s' in provided configuration file ('%s')" % (option, getUnicode(ex)) + errMsg += f"'{option}' in provided configuration file ('{getUnicode(ex)}')" raise SqlmapSyntaxException(errMsg) - if value: - conf[option] = value - else: - conf[option] = None + conf[option] = value if value else None else: - debugMsg = "missing requested option '%s' (section " % option - debugMsg += "'%s') into the configuration file, " % section + debugMsg = f"missing requested option '{option}' (section " + debugMsg += f"'{section}') into the configuration file, " debugMsg += "ignoring. Skipping to next." logger.debug(debugMsg) @@ -70,23 +67,32 @@ def configFileParser(configFile): config = UnicodeRawConfigParser() config.readfp(configFP) except Exception as ex: - errMsg = "you have provided an invalid and/or unreadable configuration file ('%s')" % getSafeExString(ex) + errMsg = f"you have provided an invalid and/or unreadable configuration file ('{getSafeExString(ex)}')" raise SqlmapSyntaxException(errMsg) if not config.has_section("Target"): errMsg = "missing a mandatory section 'Target' in the configuration file" raise SqlmapMissingMandatoryOptionException(errMsg) - mandatory = False - - for option in ("direct", "url", "logFile", "bulkFile", "googleDork", "requestFile", "wizard"): - if config.has_option("Target", option) and config.get("Target", option) or cmdLineOptions.get(option): - mandatory = True - break - + mandatory = any( + config.has_option("Target", option) + and config.get("Target", option) + or cmdLineOptions.get(option) + for option in ( + "direct", + "url", + "logFile", + "bulkFile", + "googleDork", + "requestFile", + "wizard", + ) + ) if not mandatory: - errMsg = "missing a mandatory option in the configuration file " - errMsg += "(direct, url, logFile, bulkFile, googleDork, requestFile or wizard)" + errMsg = ( + "missing a mandatory option in the configuration file " + + "(direct, url, logFile, bulkFile, googleDork, requestFile or wizard)" + ) raise SqlmapMissingMandatoryOptionException(errMsg) for family, optionData in optDict.items(): diff --git a/lib/parse/handler.py b/lib/parse/handler.py index 9b951810cf9..531ce3e5786 100644 --- a/lib/parse/handler.py +++ b/lib/parse/handler.py @@ -66,12 +66,15 @@ def startElement(self, name, attrs): self._feedInfo("dbmsVersion", self._match.group(int(self._dbmsVersion))) if self._techVersion and self._techVersion.isdigit(): - self._feedInfo("technology", "%s %s" % (attrs.get("technology"), self._match.group(int(self._techVersion)))) + self._feedInfo( + "technology", + f'{attrs.get("technology")} {self._match.group(int(self._techVersion))}', + ) else: self._feedInfo("technology", attrs.get("technology")) if self._sp.isdigit(): - self._feedInfo("sp", "Service Pack %s" % int(self._sp)) + self._feedInfo("sp", f"Service Pack {int(self._sp)}") self._regexp = None self._match = None diff --git a/lib/parse/payloads.py b/lib/parse/payloads.py index 591abbfb7e0..39f44747abd 100644 --- a/lib/parse/payloads.py +++ b/lib/parse/payloads.py @@ -29,12 +29,8 @@ def cleanupVals(text, tag): text = int(text) elif isinstance(text, list): - count = 0 - - for _ in text: + for count, _ in enumerate(text): text[count] = int(_) if _.isdigit() else _ - count += 1 - if len(text) == 1 and tag not in ("clause", "where"): text = text[0] @@ -90,7 +86,7 @@ def loadBoundaries(): doc = et.parse(paths.BOUNDARIES_XML) except Exception as ex: errMsg = "something appears to be wrong with " - errMsg += "the file '%s' ('%s'). Please make " % (paths.BOUNDARIES_XML, getSafeExString(ex)) + errMsg += f"the file '{paths.BOUNDARIES_XML}' ('{getSafeExString(ex)}'). Please make " errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException(errMsg) @@ -114,7 +110,7 @@ def loadPayloads(): doc = et.parse(payloadFilePath) except Exception as ex: errMsg = "something appears to be wrong with " - errMsg += "the file '%s' ('%s'). Please make " % (payloadFilePath, getSafeExString(ex)) + errMsg += f"the file '{payloadFilePath}' ('{getSafeExString(ex)}'). Please make " errMsg += "sure that you haven't made any changes to it" raise SqlmapInstallationException(errMsg) diff --git a/lib/parse/sitemap.py b/lib/parse/sitemap.py index db2f0901e9e..76bcdbd94f3 100644 --- a/lib/parse/sitemap.py +++ b/lib/parse/sitemap.py @@ -21,7 +21,7 @@ def parseSitemap(url, retVal=None): global abortedFlag if retVal is not None: - logger.debug("parsing sitemap '%s'" % url) + logger.debug(f"parsing sitemap '{url}'") try: if retVal is None: @@ -31,7 +31,7 @@ def parseSitemap(url, retVal=None): try: content = Request.getPage(url=url, raise404=True)[0] if not abortedFlag else "" except _http_client.InvalidURL: - errMsg = "invalid URL given for sitemap ('%s')" % url + errMsg = f"invalid URL given for sitemap ('{url}')" raise SqlmapSyntaxException(errMsg) for match in re.finditer(r"\s*([^<]+)", content or ""): @@ -49,8 +49,10 @@ def parseSitemap(url, retVal=None): except KeyboardInterrupt: abortedFlag = True - warnMsg = "user aborted during sitemap parsing. sqlmap " - warnMsg += "will use partial list" + warnMsg = ( + "user aborted during sitemap parsing. sqlmap " + + "will use partial list" + ) logger.warning(warnMsg) return retVal diff --git a/lib/request/basic.py b/lib/request/basic.py index c00fd0df6ca..12a3c3761a0 100644 --- a/lib/request/basic.py +++ b/lib/request/basic.py @@ -111,20 +111,32 @@ def title(self): if cookie is None or cookie.domain_specified and not (conf.hostname or "").endswith(cookie.domain): continue - if ("%s=" % getUnicode(cookie.name)) in getUnicode(headers[HTTP_HEADER.COOKIE]): + if f"{getUnicode(cookie.name)}=" in getUnicode( + headers[HTTP_HEADER.COOKIE] + ): if conf.loadCookies: conf.httpHeaders = filterNone((item if item[0] != HTTP_HEADER.COOKIE else None) for item in conf.httpHeaders) elif kb.mergeCookies is None: - message = "you provided a HTTP %s header value, while " % HTTP_HEADER.COOKIE + message = f"you provided a HTTP {HTTP_HEADER.COOKIE} header value, while " message += "target URL provides its own cookies within " - message += "HTTP %s header which intersect with yours. " % HTTP_HEADER.SET_COOKIE + message += f"HTTP {HTTP_HEADER.SET_COOKIE} header which intersect with yours. " message += "Do you want to merge them in further requests? [Y/n] " kb.mergeCookies = readInput(message, default='Y', boolean=True) if kb.mergeCookies and kb.injection.place != PLACE.COOKIE: def _(value): - return re.sub(r"(?i)\b%s=[^%s]+" % (re.escape(getUnicode(cookie.name)), conf.cookieDel or DEFAULT_COOKIE_DELIMITER), ("%s=%s" % (getUnicode(cookie.name), getUnicode(cookie.value))).replace('\\', r'\\'), value) + return re.sub( + r"(?i)\b%s=[^%s]+" + % ( + re.escape(getUnicode(cookie.name)), + conf.cookieDel or DEFAULT_COOKIE_DELIMITER, + ), + f"{getUnicode(cookie.name)}={getUnicode(cookie.value)}".replace( + '\\', r'\\' + ), + value, + ) headers[HTTP_HEADER.COOKIE] = _(headers[HTTP_HEADER.COOKIE]) @@ -134,7 +146,9 @@ def _(value): conf.httpHeaders = [(item[0], item[1] if item[0] != HTTP_HEADER.COOKIE else _(item[1])) for item in conf.httpHeaders] elif not kb.testMode: - headers[HTTP_HEADER.COOKIE] += "%s %s=%s" % (conf.cookieDel or DEFAULT_COOKIE_DELIMITER, getUnicode(cookie.name), getUnicode(cookie.value)) + headers[ + HTTP_HEADER.COOKIE + ] += f"{conf.cookieDel or DEFAULT_COOKIE_DELIMITER} {getUnicode(cookie.name)}={getUnicode(cookie.value)}" if kb.testMode and not any((conf.csrfToken, conf.safeUrl)): resetCookieJar(conf.cj) @@ -207,17 +221,17 @@ def checkCharEncoding(encoding, warn=True): # name adjustment for compatibility if encoding.startswith("8859"): - encoding = "iso-%s" % encoding + encoding = f"iso-{encoding}" elif encoding.startswith("cp-"): - encoding = "cp%s" % encoding[3:] + encoding = f"cp{encoding[3:]}" elif encoding.startswith("euc-"): - encoding = "euc_%s" % encoding[4:] + encoding = f"euc_{encoding[4:]}" elif encoding.startswith("windows") and not encoding.startswith("windows-"): - encoding = "windows-%s" % encoding[7:] + encoding = f"windows-{encoding[7:]}" elif encoding.find("iso-88") > 0: encoding = encoding[encoding.find("iso-88"):] elif encoding.startswith("is0-"): - encoding = "iso%s" % encoding[4:] + encoding = f"iso{encoding[4:]}" elif encoding.find("ascii") > 0: encoding = "ascii" elif encoding.find("utf8") > 0: @@ -243,7 +257,7 @@ def checkCharEncoding(encoding, warn=True): six.text_type(getBytes(randomStr()), encoding) except: if warn: - warnMsg = "invalid web page charset '%s'" % encoding + warnMsg = f"invalid web page charset '{encoding}'" singleTimeLogMessage(warnMsg, logging.WARN, encoding) encoding = None @@ -264,7 +278,7 @@ def getHeuristicCharEncoding(page): kb.cache.encoding[key] = retVal if retVal and retVal.lower().replace('-', "") == UNICODE_ENCODING.lower().replace('-', ""): - infoMsg = "heuristics detected web page charset '%s'" % retVal + infoMsg = f"heuristics detected web page charset '{retVal}'" singleTimeLogMessage(infoMsg, logging.INFO, retVal) return retVal @@ -287,11 +301,7 @@ def decodePage(page, contentEncoding, contentType, percentDecode=True): else: contentEncoding = "" - if hasattr(contentType, "lower"): - contentType = contentType.lower() - else: - contentType = "" - + contentType = contentType.lower() if hasattr(contentType, "lower") else "" if contentEncoding in ("gzip", "x-gzip", "deflate"): if not kb.pageCompress: return None @@ -309,7 +319,7 @@ def decodePage(page, contentEncoding, contentType, percentDecode=True): except Exception as ex: if b"%s" % value.replace('\\', r'\\'), conf.parameters[PLACE.POST]) + conf.paramDict[PLACE.POST][name] = value + conf.parameters[PLACE.POST] = re.sub( + f"(?i)({re.escape(name)}=)[^&]+", + r"\g<1>%s" % value.replace('\\', r'\\'), + conf.parameters[PLACE.POST], + ) if not kb.browserVerification and re.search(r"(?i)browser.?verification", page or ""): kb.browserVerification = True diff --git a/lib/request/basicauthhandler.py b/lib/request/basicauthhandler.py index f7c8408d82e..f29f6c29599 100644 --- a/lib/request/basicauthhandler.py +++ b/lib/request/basicauthhandler.py @@ -29,10 +29,9 @@ def http_error_auth_reqed(self, auth_header, host, req, headers): if hash(req) not in self.retried_req: self.retried_req.add(hash(req)) self.retried_count = 0 + elif self.retried_count > 5: + raise _urllib.error.HTTPError(req.get_full_url(), 401, "basic auth failed", headers, None) else: - if self.retried_count > 5: - raise _urllib.error.HTTPError(req.get_full_url(), 401, "basic auth failed", headers, None) - else: - self.retried_count += 1 + self.retried_count += 1 return _urllib.request.HTTPBasicAuthHandler.http_error_auth_reqed(self, auth_header, host, req, headers) diff --git a/lib/request/comparison.py b/lib/request/comparison.py index c703b2bb986..8ee888f3aee 100644 --- a/lib/request/comparison.py +++ b/lib/request/comparison.py @@ -35,8 +35,10 @@ from thirdparty import six def comparison(page, headers, code=None, getRatioValue=False, pageLength=None): - _ = _adjust(_comparison(page, headers, code, getRatioValue, pageLength), getRatioValue) - return _ + return _adjust( + _comparison(page, headers, code, getRatioValue, pageLength), + getRatioValue, + ) def _adjust(condition, getRatioValue): if not any((conf.string, conf.notString, conf.regexp, conf.code)): @@ -44,17 +46,31 @@ def _adjust(condition, getRatioValue): # PAYLOAD.WHERE.NEGATIVE response is considered as True; in switch based approach negative logic is not # applied as that what is by user considered as True is that what is returned by the comparison mechanism # itself - retVal = not condition if kb.negativeLogic and condition is not None and not getRatioValue else condition + return ( + not condition + if kb.negativeLogic and condition is not None and not getRatioValue + else condition + ) else: - retVal = condition if not getRatioValue else (MAX_RATIO if condition else MIN_RATIO) - - return retVal + return ( + condition + if not getRatioValue + else (MAX_RATIO if condition else MIN_RATIO) + ) def _comparison(page, headers, code, getRatioValue, pageLength): threadData = getCurrentThreadData() if kb.testMode: - threadData.lastComparisonHeaders = listToStrValue(_ for _ in headers.headers if not _.startswith("%s:" % URI_HTTP_HEADER)) if headers else "" + threadData.lastComparisonHeaders = ( + listToStrValue( + _ + for _ in headers.headers + if not _.startswith(f"{URI_HTTP_HEADER}:") + ) + if headers + else "" + ) threadData.lastComparisonPage = page threadData.lastComparisonCode = code @@ -62,7 +78,7 @@ def _comparison(page, headers, code, getRatioValue, pageLength): return None if any((conf.string, conf.notString, conf.regexp)): - rawResponse = "%s%s" % (listToStrValue(_ for _ in headers.headers if not _.startswith("%s:" % URI_HTTP_HEADER)) if headers else "", page) + rawResponse = f'{listToStrValue(_ for _ in headers.headers if not _.startswith(f"{URI_HTTP_HEADER}:")) if headers else ""}{page}' # String to match in page when the query is True if conf.string: @@ -72,11 +88,10 @@ def _comparison(page, headers, code, getRatioValue, pageLength): if conf.notString: if conf.notString in rawResponse: return False + if kb.errorIsNone and (wasLastResponseDBMSError() or wasLastResponseHTTPError()): + return None else: - if kb.errorIsNone and (wasLastResponseDBMSError() or wasLastResponseHTTPError()): - return None - else: - return True + return True # Regular expression to match in page when the query is True and/or valid if conf.regexp: @@ -92,7 +107,10 @@ def _comparison(page, headers, code, getRatioValue, pageLength): if page: # In case of an DBMS error page return None if kb.errorIsNone and (wasLastResponseDBMSError() or wasLastResponseHTTPError()) and not kb.negativeLogic: - if not (wasLastResponseHTTPError() and getLastRequestHTTPError() in (conf.ignoreCode or [])): + if ( + not wasLastResponseHTTPError() + or getLastRequestHTTPError() not in (conf.ignoreCode or []) + ): return None # Dynamic content lines to be excluded before comparison @@ -105,8 +123,10 @@ def _comparison(page, headers, code, getRatioValue, pageLength): if kb.nullConnection and pageLength: if not seqMatcher.a: - errMsg = "problem occurred while retrieving original page content " - errMsg += "which prevents sqlmap from continuation. Please rerun, " + errMsg = ( + "problem occurred while retrieving original page content " + + "which prevents sqlmap from continuation. Please rerun, " + ) errMsg += "and if the problem persists turn off any optimization switches" raise SqlmapNoneDataException(errMsg) @@ -129,10 +149,9 @@ def _comparison(page, headers, code, getRatioValue, pageLength): elif kb.skipSeqMatcher or seqMatcher.a and page and any(len(_) > MAX_DIFFLIB_SEQUENCE_LENGTH for _ in (seqMatcher.a, page)): if not page or not seqMatcher.a: return float(seqMatcher.a == page) - else: - ratio = 1. * len(seqMatcher.a) / len(page) - if ratio > 1: - ratio = 1. / ratio + ratio = 1. * len(seqMatcher.a) / len(page) + if ratio > 1: + ratio = 1. / ratio else: seq1, seq2 = None, None @@ -168,10 +187,8 @@ def _comparison(page, headers, code, getRatioValue, pageLength): if key: kb.cache.comparison[key] = ratio - # If the url is stable and we did not set yet the match ratio and the - # current injected value changes the url page content - if kb.matchRatio is None: - if ratio >= LOWER_RATIO_BOUND and ratio <= UPPER_RATIO_BOUND: + if ratio >= LOWER_RATIO_BOUND and ratio <= UPPER_RATIO_BOUND: + if kb.matchRatio is None: kb.matchRatio = ratio logger.debug("setting match ratio for current parameter to %.3f" % kb.matchRatio) diff --git a/lib/request/connect.py b/lib/request/connect.py index c9198bc5b1b..2455632eb21 100644 --- a/lib/request/connect.py +++ b/lib/request/connect.py @@ -180,10 +180,10 @@ def _retryProxy(**kwargs): setHTTPHandlers() if kb.testMode and kb.previousMethod == PAYLOAD.METHOD.TIME: - # timed based payloads can cause web server unresponsiveness - # if the injectable piece of code is some kind of JOIN-like query - warnMsg = "most likely web server instance hasn't recovered yet " - warnMsg += "from previous timed based payload. If the problem " + warnMsg = ( + "most likely web server instance hasn't recovered yet " + + "from previous timed based payload. If the problem " + ) warnMsg += "persists please wait for a few minutes and rerun " warnMsg += "without flag 'T' in option '--technique' " warnMsg += "(e.g. '--flush-session --technique=BEUS') or try to " @@ -192,8 +192,7 @@ def _retryProxy(**kwargs): elif kb.originalPage is None: if conf.tor: - warnMsg = "please make sure that you have " - warnMsg += "Tor installed and running so " + warnMsg = "please make sure that you have " + "Tor installed and running so " warnMsg += "you could successfully use " warnMsg += "switch '--tor' " if IS_WIN: @@ -201,9 +200,10 @@ def _retryProxy(**kwargs): else: warnMsg += "(e.g. 'https://help.ubuntu.com/community/Tor')" else: - warnMsg = "if the problem persists please check that the provided " - warnMsg += "target URL is reachable" - + warnMsg = ( + "if the problem persists please check that the provided " + + "target URL is reachable" + ) items = [] if not conf.randomAgent: items.append("switch '--random-agent'") @@ -217,8 +217,10 @@ def _retryProxy(**kwargs): singleTimeWarnMessage(warnMsg) elif conf.threads > 1: - warnMsg = "if the problem persists please try to lower " - warnMsg += "the number of used threads (option '--threads')" + warnMsg = ( + "if the problem persists please try to lower " + + "the number of used threads (option '--threads')" + ) singleTimeWarnMessage(warnMsg) kwargs['retrying'] = True @@ -241,16 +243,21 @@ def _connReadProxy(conn): while True: if not conn: break - else: - try: - part = conn.read(MAX_CONNECTION_READ_SIZE) - except AssertionError: - part = b"" + try: + part = conn.read(MAX_CONNECTION_READ_SIZE) + except AssertionError: + part = b"" if len(part) == MAX_CONNECTION_READ_SIZE: warnMsg = "large response detected. This could take a while" singleTimeWarnMessage(warnMsg) - part = re.sub(getBytes(r"(?si)%s.+?%s" % (kb.chars.stop, kb.chars.start)), getBytes("%s%s%s" % (kb.chars.stop, LARGE_READ_TRIM_MARKER, kb.chars.start)), part) + part = re.sub( + getBytes(f"(?si){kb.chars.stop}.+?{kb.chars.start}"), + getBytes( + f"{kb.chars.stop}{LARGE_READ_TRIM_MARKER}{kb.chars.start}" + ), + part, + ) retVal += part else: retVal += part diff --git a/sqlmap.py b/sqlmap.py index f35db5504c7..ae937830933 100755 --- a/sqlmap.py +++ b/sqlmap.py @@ -117,14 +117,18 @@ def checkEnvironment(): try: os.path.isdir(modulePath()) except UnicodeEncodeError: - errMsg = "your system does not properly handle non-ASCII paths. " - errMsg += "Please move the sqlmap's directory to the other location" + errMsg = ( + "your system does not properly handle non-ASCII paths. " + + "Please move the sqlmap's directory to the other location" + ) logger.critical(errMsg) raise SystemExit if LooseVersion(VERSION) < LooseVersion("1.0"): - errMsg = "your runtime environment (e.g. PYTHONPATH) is " - errMsg += "broken. Please make sure that you are not running " + errMsg = ( + "your runtime environment (e.g. PYTHONPATH) is " + + "broken. Please make sure that you are not running " + ) errMsg += "newer versions of sqlmap with runtime scripts for older " errMsg += "versions" logger.critical(errMsg)