From 3cce1bfe5f2e48c77a727e94d723162c4153441a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=C5=A0incek?= <35937483+ivan-sincek@users.noreply.github.com> Date: Wed, 15 May 2024 13:21:52 +0200 Subject: [PATCH] Small Changes Bug fix for `paths-sniper`, and improved the request methods by adding more control over what is returned. --- pyproject.toml | 2 +- src/forbidden/forbidden.py | 232 ++++++++++++++++++++++--------------- src/stresser/stresser.py | 170 ++++++++++++++------------- 3 files changed, 229 insertions(+), 175 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f8bf9f0..41bf476 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "forbidden" -version = "11.0" +version = "11.1" authors = [{ name = "Ivan Sincek" }] description = "Bypass 4xx HTTP response status codes and more. Based on PycURL and Python Requests." readme = "README.md" diff --git a/src/forbidden/forbidden.py b/src/forbidden/forbidden.py index 5bdcbee..181b0ad 100644 --- a/src/forbidden/forbidden.py +++ b/src/forbidden/forbidden.py @@ -319,7 +319,7 @@ def write_file(data, out): # ---------------------------------------- -default_user_agent = "Forbidden/11.0" +default_user_agent = "Forbidden/11.1" def get_all_user_agents(): tmp = [] @@ -364,7 +364,7 @@ def __init__(self, url, ignore_qsf, ignore_curl, tests, force, values, paths, ev self.__show_table = show_table self.__debug = debug # -------------------------------- - # NOTE: PycURL configuration. + # NOTE: cURL configuration. self.__curl = not ignore_curl self.__verify = False # NOTE: Ignore SSL/TLS verification. self.__allow_redirects = True @@ -396,23 +396,26 @@ def __parse_url(self, url, ignore_qsf = False, case_sensitive = False): fragment["parsed"] = {} # NOTE: Not used. 
fragment["full" ] = ("#{0}").format(url.fragment) if url.fragment else "" # -------------------------------- - tmp = {} - tmp["scheme" ] = scheme - tmp["domain_no_port" ] = domain.split(":", 1)[0] - tmp["port" ] = port - tmp["domain" ] = domain - tmp["domain_extended" ] = get_all_domains(tmp["scheme"], tmp["domain_no_port"], tmp["port"]) + tmp = {} + tmp["scheme" ] = scheme + tmp["port" ] = port # -------------------------------- - tmp["ip_no_port" ] = None - tmp["ip" ] = None - tmp["ip_extended" ] = None - tmp["scheme_ip" ] = None + tmp["domain_no_port" ] = domain.split(":", 1)[0] + tmp["domain" ] = domain + tmp["domain_extended" ] = get_all_domains(tmp["scheme"], tmp["domain_no_port"], tmp["port"]) + tmp["scheme_domain" ] = ("{0}://{1}").format(tmp["scheme"], tmp["domain"]) + tmp["scheme_domain_no_port"] = ("{0}://{1}").format(tmp["scheme"], tmp["domain_no_port"]) # -------------------------------- - tmp["scheme_domain" ] = ("{0}://{1}").format(tmp["scheme"], tmp["domain"]) - tmp["path" ] = path - tmp["query" ] = query - tmp["fragment" ] = fragment - tmp["path_full" ] = tmp["path"] + tmp["query"]["full"] + tmp["fragment"]["full"] + tmp["ip_no_port" ] = None + tmp["ip" ] = None + tmp["ip_extended" ] = None + tmp["scheme_ip" ] = None + tmp["scheme_ip_no_port" ] = None + # -------------------------------- + tmp["path" ] = path + tmp["query" ] = query + tmp["fragment" ] = fragment + tmp["path_full" ] = tmp["path"] + tmp["query"]["full"] + tmp["fragment"]["full"] # -------------------------------- tmp["urls" ] = { "base" : tmp["scheme_domain"] + tmp["path_full"], @@ -427,21 +430,21 @@ def __parse_url(self, url, ignore_qsf = False, case_sensitive = False): } # -------------------------------- tmp["relative_paths" ] = extend_path(tmp["path"]) + extend_path(tmp["path"], tmp["query"]["full"], tmp["fragment"]["full"]) - tmp["absolute_paths" ] = append_paths(("{0}://{1}").format(tmp["scheme"], tmp["domain_no_port"]), tmp["relative_paths"]) + 
append_paths(tmp["scheme_domain"], tmp["relative_paths"]) + tmp["absolute_paths" ] = append_paths(tmp["scheme_domain_no_port"], tmp["relative_paths"]) + append_paths(tmp["scheme_domain"], tmp["relative_paths"]) # -------------------------------- for key in tmp: if isinstance(tmp[key], list): tmp[key] = unique(tmp[key]) return tmp - # -------------------------------- def __parse_ip(self, obj): try: - obj["ip_no_port" ] = socket.gethostbyname(obj["domain_no_port"]) - obj["ip" ] = ("{0}:{1}").format(obj["ip_no_port"], obj["port"]) - obj["ip_extended"] = get_all_domains(obj["scheme"], obj["ip_no_port"], obj["port"]) - obj["scheme_ip" ] = ("{0}://{1}").format(obj["scheme"], obj["ip"]) - obj["urls"]["ip" ] = { + obj["ip_no_port" ] = socket.gethostbyname(obj["domain_no_port"]) + obj["ip" ] = ("{0}:{1}").format(obj["ip_no_port"], obj["port"]) + obj["ip_extended" ] = get_all_domains(obj["scheme"], obj["ip_no_port"], obj["port"]) + obj["scheme_ip" ] = ("{0}://{1}").format(obj["scheme"], obj["ip"]) + obj["scheme_ip_no_port"] = ("{0}://{1}").format(obj["scheme"], obj["ip_no_port"]) + obj["urls"]["ip" ] = { "https": get_base_https_url(obj["scheme"], obj["ip_no_port"], obj["port"], obj["path_full"]), "http" : get_base_http_url(obj["scheme"], obj["ip_no_port"], obj["port"], obj["path_full"]) } @@ -449,6 +452,8 @@ def __parse_ip(self, obj): self.__print_debug(ex) return obj + # ------------------------------------ + def __add_content_lengths(self, content_lengths): if not isinstance(content_lengths, list): content_lengths = [content_lengths] @@ -480,6 +485,8 @@ def __decode(self, values): else: return values.decode(self.__encoding) + # ------------------------------------ + def run(self): self.__validate_inaccessible_and_evil_urls() if not self.__error: @@ -491,17 +498,16 @@ def run(self): if not self.__collection: print("No test records were created") else: - self.__filter_collection() + self.__remove_duplicates() print_cyan(("Number of created test records: 
{0}").format(len(self.__collection))) self.__run_tests() self.__validate_results() def __validate_inaccessible_and_evil_urls(self): - # -------------------------------- print_cyan(("Normalized inaccessible URL: {0}").format(self.__url["urls"]["base"])) print_time(("Validating the inaccessible URL using HTTP {0} method...").format(self.__force if self.__force else self.__default_method)) - record = self.__fetch(url = self.__url["urls"]["base"], method = self.__force if self.__force else self.__default_method) - if not (record["code"] > 0): + record = self.__fetch(url = self.__url["urls"]["base"], method = self.__force if self.__force else self.__default_method, ignore = False) + if record["code"] <= 0: self.__print_error("Cannot validate the inaccessible URL, script will exit shortly...") elif "base" in self.__content_lengths: print_green(("Ignoring the inaccessible URL response content length: {0}").format(record["length"])) @@ -512,12 +518,12 @@ def __validate_inaccessible_and_evil_urls(self): print_cyan(("Normalized evil URL: {0}").format(self.__evil["urls"]["base"])) print_time(("Validating the evil URL using HTTP {0} method...").format(self.__default_method)) record = self.__fetch(url = self.__evil["urls"]["base"], method = self.__default_method) - if not (record["code"] > 0): + if record["code"] == IGNORED: + self.__print_error("Evil URL is ignored, script will exit shortly...") + elif record["code"] <= 0: self.__print_error("Cannot validate the evil URL, script will exit shortly...") - # -------------------------------- def __fetch_inaccessible_and_evil_ips(self): - # -------------------------------- print_time("Fetching the IP of inaccessible URL...") self.__url = self.__parse_ip(copy.deepcopy(self.__url)) if not self.__url["ip_no_port"]: @@ -528,16 +534,17 @@ def __fetch_inaccessible_and_evil_ips(self): self.__evil = self.__parse_ip(copy.deepcopy(self.__evil)) if not self.__evil["ip_no_port"]: self.__print_error("Cannot fetch the IP of evil URL, script will 
exit shortly...") - # -------------------------------- # NOTE: Proceed with the first valid accessible URL. def __validate_accessible_urls(self): - if self.__check_tests(["headers", "all"]): + if self.__check_tests(["headers", "all"]): # NOTE: Only tests that use the accessible URL. print_time(("Validating the accessible URLs using HTTP {0} method...").format(self.__default_method)) for url in copy.deepcopy(self.__accessible): self.__accessible = "" record = self.__fetch(url = url, method = self.__default_method) - if record["code"] >= 200 and record["code"] < 400: + if record["code"] == IGNORED: + print_yellow(("Valid accessible URL ignored: {0}").format(record["url"])) + elif record["code"] >= 200 and record["code"] < 300: print_green(("Valid accessible URL found: {0}").format(record["url"])) self.__accessible = record["url"] if "path" in self.__content_lengths: @@ -549,24 +556,22 @@ def __validate_accessible_urls(self): print_cyan("No valid accessible URLs were found, moving on...") def __set_allowed_http_methods(self): - # -------------------------------- if self.__force: print_cyan(("Forcing HTTP {0} method for all non-specific test cases...").format(self.__force)) self.__allowed_methods = [self.__force] - # -------------------------------- - elif self.__check_tests(["methods", "method-overrides", "all"]): + elif self.__check_tests(["methods", "method-overrides", "all"]): # NOTE: Only tests that use multiple HTTP methods. print_time("Fetching allowed HTTP methods...") - record = self.__fetch(url = self.__url["urls"]["base"], method = "OPTIONS") + record = self.__fetch(url = self.__url["urls"]["base"], method = "OPTIONS", ignore = False, response_headers = True) if record["code"] > 0: if record["curl"]: - methods = re.search(r"(?<=^allow\:).+", record["response_headers"], self.__regex_flags) + methods = re.search(r"(?<=^allow\:).+", record["response_headers"], self.__regex_flags) # NOTE: HTTP response headers are returned in plaintext. 
if methods: for method in methods[0].split(","): method = method.strip().upper() if method not in self.__allowed_methods: self.__allowed_methods.append(method) else: - for key in record["response_headers"]: + for key in record["response_headers"]: # NOTE: HTTP response headers are returned as dictionary. if key.lower() == "allow": for method in record["response_headers"][key].split(","): method = method.strip().upper() @@ -574,16 +579,17 @@ def __set_allowed_http_methods(self): self.__allowed_methods.append(method) break if not self.__allowed_methods: - print_cyan("Cannot fetch allowed HTTP methods, moving on...") + print_cyan("Cannot fetch allowed HTTP methods, using all built-in HTTP methods...") self.__allowed_methods = self.__get_methods() - # TO DO: Brute-force allowed HTTP methods. + # TO DO: Validate (brute-force) all built-in HTTP methods. else: print_green(("Allowed HTTP methods: [{0}]").format((", ").join(self.__allowed_methods))) - # -------------------------------- - def __fetch(self, url, method = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None, passthrough = True): + # ------------------------------------ + + def __fetch(self, url, method = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None, ignore = True, response_headers = False, response_body = False): record = self.__record("SYSTEM-0", url, method, headers, cookies, body, user_agent, proxy, curl) - return self.__send_curl(record, passthrough) if record["curl"] else self.__send_request(record, passthrough) + return self.__send_curl(record, ignore, response_headers, response_body) if record["curl"] else self.__send_request(record, ignore, response_headers, response_body) def __records(self, identifier, urls, methods = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None): if not isinstance(urls, list): @@ -611,7 +617,7 @@ def __record(self, identifier, url, method, 
headers, cookies, body, user_agent, headers = self.__inspect_headers(headers) cookies = self.__inspect_cookies(cookies) if not user_agent: - user_agent = self.__user_agents[random.randint(0, self.__user_agents_len - 1)] if self.__user_agents_len > 1 else self.__user_agents[0] + user_agent = self.__get_user_agent() if not proxy: proxy = self.__proxy if not curl and curl is not False: @@ -666,6 +672,9 @@ def __inspect_cookies(self, cookies = None): tmp.append(format_cookie_key_value(key, value)) return tmp + def __get_user_agent(self): + return self.__user_agents[random.randint(0, self.__user_agents_len - 1)] if self.__user_agents_len > 1 else self.__user_agents[0] + def __build_command(self, record): tmp = ["curl", ("--connect-timeout {0}").format(self.__connect_timeout), ("-m {0}").format(self.__read_timeout), "-iskL", ("--max-redirs {0}").format(self.__max_redirects), "--path-as-is"] if record["body"]: @@ -684,8 +693,9 @@ def __build_command(self, record): tmp = (" ").join(tmp) return tmp - # NOTE: Remove duplicate test records. 
- def __filter_collection(self): + # ------------------------------------ + + def __remove_duplicates(self): tmp = [] exists = set() for record in self.__collection: @@ -712,7 +722,9 @@ def __run_tests(self): executor.shutdown(wait = True, cancel_futures = True) self.__collection = results - def __send_curl(self, record, passthrough = False): + # ------------------------------------ + + def __send_curl(self, record, ignore = True, response_headers = False, response_body = False): if self.__sleep: time.sleep(self.__sleep) curl = None @@ -728,7 +740,7 @@ def __send_curl(self, record, passthrough = False): curl.setopt(pycurl.COOKIEFILE, cookiefile.name) curl.setopt(pycurl.COOKIEJAR, cookiefile.name) # ---------------------------- - if passthrough: + if response_headers: headers = io.BytesIO() curl.setopt(pycurl.HEADERFUNCTION, headers.write) # ---------------------------- @@ -767,11 +779,14 @@ def __send_curl(self, record, passthrough = False): # ---------------------------- record["code"] = int(curl.getinfo(pycurl.RESPONSE_CODE)) record["length"] = int(curl.getinfo(pycurl.SIZE_DOWNLOAD)) - if passthrough: + record["response"] = self.__decode(response.getvalue()) + if response_headers: record["response_headers"] = self.__decode(headers.getvalue()) - # record["response"] = self.__decode(response.getvalue()) - elif record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, self.__decode(response.getvalue()), self.__regex_flags)): - record["code"] = IGNORED + if ignore: + if record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, record["response"], self.__regex_flags)): + record["code"] = IGNORED + if not response_body: + record["response"] = "" # ---------------------------- except pycurl.error as ex: # ---------------------------- @@ -793,7 +808,7 @@ def __send_curl(self, record, passthrough = False): # ---------------------------- return record - def __send_request(self, record, passthrough = False): + 
def __send_request(self, record, ignore = True, response_headers = False, response_body = False): if self.__sleep: time.sleep(self.__sleep) session = None @@ -832,11 +847,14 @@ def __send_request(self, record, passthrough = False): # ---------------------------- record["code"] = int(response.status_code) record["length"] = len(response.content) - if passthrough: + record["response"] = self.__decode(response.content) + if response_headers: record["response_headers"] = dict(response.headers) - # record["response"] = self.__decode(response.content) - elif record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, self.__decode(response.content), self.__regex_flags)): - record["code"] = IGNORED + if ignore: + if record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, record["response"], self.__regex_flags)): + record["code"] = IGNORED + if not response_body: + record["response"] = "" # ---------------------------- except (requests.packages.urllib3.exceptions.LocationParseError, requests.exceptions.RequestException) as ex: # ---------------------------- @@ -856,7 +874,9 @@ def __set_double_headers(self, request, headers): exists = set() for header in headers: key, value = get_header_key_value(header) - request.headers[key if key not in exists and not exists.add(key) else uniquestr(key)] = self.__encode(value) # NOTE: Allows double headers. 
+ request.headers[key if key not in exists and not exists.add(key) else uniquestr(key)] = self.__encode(value) + + # ------------------------------------ def __validate_results(self): print_time("Validating results...") @@ -864,6 +884,8 @@ def __validate_results(self): self.__collection = output.results_table() if self.__show_table else output.results_json() output.stats_table() + # ------------------------------------ + def __check_tests(self, array): return any(test in array for test in self.__tests) @@ -1068,8 +1090,10 @@ def __prepare_collection(self): headers = self.__get_ip_headers(self.__get_broken_urls(scheme = False, ip = False) + self.__get_broken_urls(scheme = False, ip = True)) ) + # ------------------------------------ + def __get_methods(self): - tmp = [ + return unique([ "ACL", "ARBITRARY", "BASELINE-CONTROL", @@ -1115,8 +1139,7 @@ def __get_methods(self): "UPDATE", "UPDATEREDIRECTREF", "VERSION-CONTROL" - ] - return unique(tmp) + ]) def __get_file_upload_urls(self, files): tmp = [] @@ -1197,7 +1220,6 @@ def __get_port_override_headers(self): def __get_url_headers(self, values): tmp = [] - # -------------------------------- headers = [ "19-Profile", "Base-URL", @@ -1222,7 +1244,6 @@ def __get_url_headers(self, values): for header in headers: for value in values: tmp.append(("{0}: {1}").format(header, value)) - # -------------------------------- return unique(tmp) def __get_ip_headers(self, values): @@ -1331,27 +1352,34 @@ def __get_random_values(self): def __get_all_values(self, scheme = True, ip = False): tmp = [] + # -------------------------------- domain_extended = "ip_extended" if ip else "domain_extended" - localhost = "127.0.0.1" if ip else "localhost" + localhost = "127.0.0.1" if ip else "localhost" + # -------------------------------- temp = strip_url_schemes(self.__get_localhost_values() + self.__get_random_values() + self.__url[domain_extended]) if scheme: tmp.extend([("{0}://{1}").format(self.__url["scheme"], entry + 
self.__url["path_full"]) for entry in temp]) else: tmp += temp + # -------------------------------- temp = strip_url_schemes(self.__evil[domain_extended]) if scheme: tmp.extend([("{0}://{1}").format(self.__evil["scheme"], entry + self.__url["path_full"]) for entry in temp]) else: tmp += temp + # -------------------------------- if not scheme: for override in strip_url_schemes(self.__url[domain_extended] + self.__evil[domain_extended]): for initial in strip_url_schemes([localhost, ("{0}:{1}").format(localhost, self.__url["port"])]): tmp.append(("{0},{1}").format(initial, override)) + # -------------------------------- return unique(tmp) def __get_double_host_header(self, ip = False): tmp = [] + # -------------------------------- domain_extended = "ip_extended" if ip else "domain_extended" + # -------------------------------- exists = set() for override in strip_url_schemes(self.__evil[domain_extended]): for initial in strip_url_schemes(self.__url[domain_extended]): @@ -1361,6 +1389,7 @@ def __get_double_host_header(self, ip = False): ("Host: {0}").format(initial), ("Host: {0}").format(override) ]) + # -------------------------------- return tmp def __get_path_bypass_urls(self, sniper = False): @@ -1375,11 +1404,15 @@ def __get_path_bypass_urls(self, sniper = False): injections.extend([path_const + i + path_const, i + path_const, path_const + i, i]) if sniper: for i in injections: - path_bypasses.extend([path + i, i + path, i + path + i]) + path_bypasses.extend([path + i, i + path]) + if path: + path_bypasses.extend([i + path + i]) else: for i in injections: - for j in injections: - path_bypasses.extend([i + path + j]) + path_bypasses.extend([path + i, i + path]) + if path: + for j in injections: + path_bypasses.extend([i + path + j]) # -------------------------------- # NOTE: Inject at the end of the URL path. 
injections = [] @@ -1420,7 +1453,7 @@ def __get_basic_auth_headers(self): headers = [ "Authorization" ] - values = ["", "null", "None", "nil"] + values = ["", "null", "None", "nil"] usernames = ["admin", "cisco", "gateway", "guest", "jigsaw", "root", "router", "switch", "tomcat", "wampp", "xampp", "sysadmin"] passwords = ["admin", "cisco", "default", "gateway", "guest", "jigsaw", "password", "root", "router", "secret", "switch", "tomcat", "toor", "wampp", "xampp", "sysadmin"] for username in usernames: @@ -1456,8 +1489,10 @@ def __get_bearer_auth_headers(self): def __get_redirect_urls(self, scheme = True, ip = False): tmp = [] + # -------------------------------- domain_extended = "ip_extended" if ip else "domain_extended" - domain_no_port = "ip_no_port" if ip else "domain_no_port" + domain_no_port = "ip_no_port" if ip else "domain_no_port" + # -------------------------------- injections = [path_const, ("{0}.").format(path_const)] for override in strip_url_schemes(self.__evil[domain_extended]): tmp.append(override) @@ -1465,21 +1500,26 @@ def __get_redirect_urls(self, scheme = True, ip = False): tmp.append(override + injection + self.__url[domain_no_port]) if not ip: tmp.append(("{0}.{1}").format(self.__url[domain_no_port], override)) + # -------------------------------- if scheme: tmp = [("{0}://{1}").format(self.__evil["scheme"], entry + self.__url["path_full"]) for entry in tmp] + # -------------------------------- return unique(tmp) def __get_broken_urls(self, scheme = True, ip = False): tmp = [] + # -------------------------------- domain_extended = "ip_extended" if ip else "domain_extended" - at_const = "@" - injections = [at_const, (" {0}").format(at_const), ("#{0}").format(at_const)] + # -------------------------------- + injections = ["@", " @", "#@"] for override in strip_url_schemes(self.__evil[domain_extended]): for initial in strip_url_schemes(self.__url[domain_extended]): for injection in injections: tmp.append(initial + injection + override) + # 
-------------------------------- if scheme: tmp = [("{0}://{1}").format(self.__evil["scheme"], entry + self.__url["path_full"]) for entry in tmp] + # -------------------------------- return unique(tmp) # ---------------------------------------- @@ -1496,7 +1536,7 @@ def __color(self, value, color): def __results_row(self, record, color): return [self.__color(record[key], color) for key in ["id", "code", "length", "command"]] - def results_table(self): + def results_table(self): # NOTE: To see more or less results, comment in or out 'continue' line. tmp = []; table = [] for record in self.__collection: if record["code"] < 100 or record["code"] >= 600: @@ -1596,7 +1636,7 @@ def show(self): class MyArgParser(argparse.ArgumentParser): def print_help(self): - print("Forbidden v11.0 ( github.com/ivan-sincek/forbidden )") + print("Forbidden v11.1 ( github.com/ivan-sincek/forbidden )") print("") print("Usage: forbidden -u url -t tests [-f force] [-v values ] [-p path ] [-o out ]") print("Example: forbidden -u https://example.com/admin -t all [-f POST ] [-v values.txt] [-p /home] [-o results.json]") @@ -1751,38 +1791,38 @@ def __print_error(self, msg): def __parse_url(self, value, key): data = { "url": { - "schemes": ["http", "https"], - "scheme_error": [ - "Inaccessible URL: Scheme is required", - "Inaccessible URL: Supported schemes are 'http' and 'https'" - ], + "schemes" : ["http", "https"], + "scheme_error": { + "required" : "Inaccessible URL: Scheme is required", + "unsupported": "Inaccessible URL: Supported schemes are 'http' and 'https'" + }, "domain_error": "Inaccessible URL: Invalid domain name", - "port_error": "Inaccessible URL: Port number is out of range" + "port_error" : "Inaccessible URL: Port number is out of range" }, "evil": { - "schemes": ["http", "https"], - "scheme_error": [ - "Evil URL: Scheme is required", - "Evil URL: Supported schemes are 'http' and 'https'" - ], + "schemes" : ["http", "https"], + "scheme_error": { + "required" : "Evil URL: Scheme 
is required", + "unsupported": "Evil URL: Supported schemes are 'http' and 'https'" + }, "domain_error": "Evil URL: Invalid domain name", - "port_error": "Evil URL: Port number is out of range" + "port_error" : "Evil URL: Port number is out of range" }, "proxy": { - "schemes": ["http", "https", "socks4", "socks4h", "socks5", "socks5h"], - "scheme_error": [ - "Proxy URL: Scheme is required", - "Proxy URL: Supported schemes are 'http[s]', 'socks4[h]', and 'socks5[h]'" - ], + "schemes" : ["http", "https", "socks4", "socks4h", "socks5", "socks5h"], + "scheme_error": { + "required" : "Proxy URL: Scheme is required", + "unsupported": "Proxy URL: Supported schemes are 'http[s]', 'socks4[h]', and 'socks5[h]'" + }, "domain_error": "Proxy URL: Invalid domain name", - "port_error": "Proxy URL: Port number is out of range" + "port_error" : "Proxy URL: Port number is out of range" } } tmp = urllib.parse.urlsplit(value) if not tmp.scheme: - self.__error(data[key]["scheme_error"][0]) + self.__error(data[key]["scheme_error"]["required"]) elif tmp.scheme not in data[key]["schemes"]: - self.__error(data[key]["scheme_error"][1]) + self.__error(data[key]["scheme_error"]["unsupported"]) elif not tmp.netloc: self.__error(data[key]["domain_error"]) elif tmp.port and (tmp.port < 1 or tmp.port > 65535): @@ -1909,7 +1949,7 @@ def main(): if validate.run(): print("###########################################################################") print("# #") - print("# Forbidden v11.0 #") + print("# Forbidden v11.1 #") print("# by Ivan Sincek #") print("# #") print("# Bypass 4xx HTTP response status codes and more. 
#") diff --git a/src/stresser/stresser.py b/src/stresser/stresser.py index 409ef84..6d8721a 100644 --- a/src/stresser/stresser.py +++ b/src/stresser/stresser.py @@ -186,7 +186,7 @@ def write_file(data, out): # ---------------------------------------- -default_user_agent = "Stresser/11.0" +default_user_agent = "Stresser/11.1" def get_all_user_agents(): tmp = [] @@ -229,7 +229,7 @@ def __init__(self, url, ignore_qsf, ignore_curl, force, headers, cookies, ignore self.__directory = directory self.__debug = debug # -------------------------------- - # NOTE: PycURL configuration. + # NOTE: cURL configuration. self.__curl = not ignore_curl self.__verify = False # NOTE: Ignore SSL/TLS verification. self.__allow_redirects = True @@ -261,23 +261,26 @@ def __parse_url(self, url, ignore_qsf = False, case_sensitive = False): fragment["parsed"] = {} # NOTE: Not used. fragment["full" ] = ("#{0}").format(url.fragment) if url.fragment else "" # -------------------------------- - tmp = {} - tmp["scheme" ] = scheme - tmp["domain_no_port" ] = domain.split(":", 1)[0] - tmp["port" ] = port - tmp["domain" ] = domain - tmp["domain_extended" ] = get_all_domains(tmp["scheme"], tmp["domain_no_port"], tmp["port"]) + tmp = {} + tmp["scheme" ] = scheme + tmp["port" ] = port # -------------------------------- - tmp["ip_no_port" ] = None - tmp["ip" ] = None - tmp["ip_extended" ] = None - tmp["scheme_ip" ] = None + tmp["domain_no_port" ] = domain.split(":", 1)[0] + tmp["domain" ] = domain + tmp["domain_extended" ] = get_all_domains(tmp["scheme"], tmp["domain_no_port"], tmp["port"]) + tmp["scheme_domain" ] = ("{0}://{1}").format(tmp["scheme"], tmp["domain"]) + tmp["scheme_domain_no_port"] = ("{0}://{1}").format(tmp["scheme"], tmp["domain_no_port"]) # -------------------------------- - tmp["scheme_domain" ] = ("{0}://{1}").format(tmp["scheme"], tmp["domain"]) - tmp["path" ] = path - tmp["query" ] = query - tmp["fragment" ] = fragment - tmp["path_full" ] = tmp["path"] + tmp["query"]["full"] + 
tmp["fragment"]["full"] + tmp["ip_no_port" ] = None + tmp["ip" ] = None + tmp["ip_extended" ] = None + tmp["scheme_ip" ] = None + tmp["scheme_ip_no_port" ] = None + # -------------------------------- + tmp["path" ] = path + tmp["query" ] = query + tmp["fragment" ] = fragment + tmp["path_full" ] = tmp["path"] + tmp["query"]["full"] + tmp["fragment"]["full"] # -------------------------------- tmp["urls" ] = { "base" : tmp["scheme_domain"] + tmp["path_full"], @@ -292,21 +295,21 @@ def __parse_url(self, url, ignore_qsf = False, case_sensitive = False): } # -------------------------------- tmp["relative_paths" ] = extend_path(tmp["path"]) + extend_path(tmp["path"], tmp["query"]["full"], tmp["fragment"]["full"]) - tmp["absolute_paths" ] = append_paths(("{0}://{1}").format(tmp["scheme"], tmp["domain_no_port"]), tmp["relative_paths"]) + append_paths(tmp["scheme_domain"], tmp["relative_paths"]) + tmp["absolute_paths" ] = append_paths(tmp["scheme_domain_no_port"], tmp["relative_paths"]) + append_paths(tmp["scheme_domain"], tmp["relative_paths"]) # -------------------------------- for key in tmp: if isinstance(tmp[key], list): tmp[key] = unique(tmp[key]) return tmp - # -------------------------------- def __parse_ip(self, obj): try: - obj["ip_no_port" ] = socket.gethostbyname(obj["domain_no_port"]) - obj["ip" ] = ("{0}:{1}").format(obj["ip_no_port"], obj["port"]) - obj["ip_extended"] = get_all_domains(obj["scheme"], obj["ip_no_port"], obj["port"]) - obj["scheme_ip" ] = ("{0}://{1}").format(obj["scheme"], obj["ip"]) - obj["urls"]["ip" ] = { + obj["ip_no_port" ] = socket.gethostbyname(obj["domain_no_port"]) + obj["ip" ] = ("{0}:{1}").format(obj["ip_no_port"], obj["port"]) + obj["ip_extended" ] = get_all_domains(obj["scheme"], obj["ip_no_port"], obj["port"]) + obj["scheme_ip" ] = ("{0}://{1}").format(obj["scheme"], obj["ip"]) + obj["scheme_ip_no_port"] = ("{0}://{1}").format(obj["scheme"], obj["ip_no_port"]) + obj["urls"]["ip" ] = { "https": get_base_https_url(obj["scheme"], 
obj["ip_no_port"], obj["port"], obj["path_full"]), "http" : get_base_http_url(obj["scheme"], obj["ip_no_port"], obj["port"], obj["path_full"]) } @@ -314,6 +317,8 @@ def __parse_ip(self, obj): self.__print_debug(ex) return obj + # ------------------------------------ + def __add_content_lengths(self, content_lengths): if not isinstance(content_lengths, list): content_lengths = [content_lengths] @@ -345,6 +350,8 @@ def __decode(self, values): else: return values.decode(self.__encoding) + # ------------------------------------ + def run(self): self.__validate_inaccessible_url() if not self.__error: @@ -360,45 +367,39 @@ def run(self): self.__validate_results() def __validate_inaccessible_url(self): - # -------------------------------- print_cyan(("Normalized inaccessible URL: {0}").format(self.__url["urls"]["base"])) print_time(("Validating the inaccessible URL using HTTP {0} method...").format(self.__force if self.__force else self.__default_method)) - record = self.__fetch(url = self.__url["urls"]["base"], method = self.__force if self.__force else self.__default_method) - if not (record["code"] > 0): + record = self.__fetch(url = self.__url["urls"]["base"], method = self.__force if self.__force else self.__default_method, ignore = False) + if record["code"] <= 0: self.__print_error("Cannot validate the inaccessible URL, script will exit shortly...") elif "base" in self.__content_lengths: print_green(("Ignoring the inaccessible URL response content length: {0}").format(record["length"])) self.__content_lengths.pop(self.__content_lengths.index("base")) self.__add_content_lengths(record["length"]) - # -------------------------------- def __fetch_inaccessible_ip(self): - # -------------------------------- print_time("Fetching the IP of inaccessible URL...") self.__url = self.__parse_ip(copy.deepcopy(self.__url)) if not self.__url["ip_no_port"]: self.__print_error("Cannot fetch the IP of inaccessible URL, script will exit shortly...") - # 
-------------------------------- def __set_allowed_http_methods(self): - # -------------------------------- if self.__force: print_cyan(("Forcing HTTP {0} method for all non-specific test cases...").format(self.__force)) self.__allowed_methods = [self.__force] - # -------------------------------- else: print_time("Fetching allowed HTTP methods...") - record = self.__fetch(url = self.__url["urls"]["base"], method = "OPTIONS") + record = self.__fetch(url = self.__url["urls"]["base"], method = "OPTIONS", ignore = False, response_headers = True) if record["code"] > 0: if record["curl"]: - methods = re.search(r"(?<=^allow\:).+", record["response_headers"], self.__regex_flags) + methods = re.search(r"(?<=^allow\:).+", record["response_headers"], self.__regex_flags) # NOTE: HTTP response headers are returned in plaintext. if methods: for method in methods[0].split(","): method = method.strip().upper() if method not in self.__allowed_methods: self.__allowed_methods.append(method) else: - for key in record["response_headers"]: + for key in record["response_headers"]: # NOTE: HTTP response headers are returned as dictionary. if key.lower() == "allow": for method in record["response_headers"][key].split(","): method = method.strip().upper() @@ -406,16 +407,16 @@ def __set_allowed_http_methods(self): self.__allowed_methods.append(method) break if not self.__allowed_methods: - print_cyan(("Cannot fetch allowed HTTP methods, defaulting to HTTP {0} method for all non-specific test cases...").format(self.__default_method)) + print_cyan(("Cannot fetch allowed HTTP methods, using default HTTP {0} method...").format(self.__default_method)) self.__allowed_methods = [self.__default_method] - # TO DO: Brute-force allowed HTTP methods. 
else: print_green(("Allowed HTTP methods: [{0}]").format((", ").join(self.__allowed_methods))) - # -------------------------------- - def __fetch(self, url, method = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None, passthrough = True): + # ------------------------------------ + + def __fetch(self, url, method = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None, ignore = True, response_headers = False, response_body = False, save = False): record = self.__record("SYSTEM-0", url, method, headers, cookies, body, user_agent, proxy, curl) - return self.__send_curl(record, passthrough) if record["curl"] else self.__send_request(record, passthrough) + return self.__send_curl(record, ignore, response_headers, response_body, save) if record["curl"] else self.__send_request(record, ignore, response_headers, response_body, save) def __records(self, identifier, urls, methods = None, headers = None, cookies = None, body = None, user_agent = None, proxy = None, curl = None, repeat = None): if not isinstance(urls, list): @@ -447,7 +448,7 @@ def __record(self, identifier, url, method, headers, cookies, body, user_agent, headers = self.__inspect_headers(headers) cookies = self.__inspect_cookies(cookies) if not user_agent: - user_agent = self.__user_agents[random.randint(0, self.__user_agents_len - 1)] if self.__user_agents_len > 1 else self.__user_agents[0] + user_agent = self.__get_user_agent() if not proxy: proxy = self.__proxy if not curl and curl is not False: @@ -502,6 +503,9 @@ def __inspect_cookies(self, cookies = None): tmp.append(format_cookie_key_value(key, value)) return tmp + def __get_user_agent(self): + return self.__user_agents[random.randint(0, self.__user_agents_len - 1)] if self.__user_agents_len > 1 else self.__user_agents[0] + def __build_command(self, record): tmp = ["curl", ("--connect-timeout {0}").format(self.__connect_timeout), ("-m 
{0}").format(self.__read_timeout), "-iskL", ("--max-redirs {0}").format(self.__max_redirects), "--path-as-is"] if record["body"]: @@ -520,6 +524,8 @@ def __build_command(self, record): tmp = (" ").join(tmp) return tmp + # ------------------------------------ + def __run_tests(self): results = [] print_time(("Running tests with {0} engine...").format("PycURL" if self.__curl else "Python Requests")) @@ -538,7 +544,9 @@ def __run_tests(self): executor.shutdown(wait = True, cancel_futures = True) self.__collection = results - def __send_curl(self, record, passthrough = False): + # ------------------------------------ + + def __send_curl(self, record, ignore = True, response_headers = False, response_body = False, save = True): curl = None cookiefile = None headers = None @@ -552,7 +560,7 @@ def __send_curl(self, record, passthrough = False): curl.setopt(pycurl.COOKIEFILE, cookiefile.name) curl.setopt(pycurl.COOKIEJAR, cookiefile.name) # ---------------------------- - if passthrough: + if response_headers: headers = io.BytesIO() curl.setopt(pycurl.HEADERFUNCTION, headers.write) # ---------------------------- @@ -591,18 +599,19 @@ def __send_curl(self, record, passthrough = False): # ---------------------------- record["code"] = int(curl.getinfo(pycurl.RESPONSE_CODE)) record["length"] = int(curl.getinfo(pycurl.SIZE_DOWNLOAD)) + record["response"] = self.__decode(response.getvalue()) record["id"] = ("{0}-{1}-{2}").format(record["code"], record["length"], record["id"]) - content = response.getvalue() - if passthrough: + if response_headers: record["response_headers"] = self.__decode(headers.getvalue()) - # record["response"] = self.__decode(content) - elif record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, self.__decode(content), self.__regex_flags)): - record["code"] = IGNORED - # NOTE: Additional validation to prevent congestion from writing large and usless data to files. 
- elif record["code"] >= 200 and record["code"] < 400: + if ignore: + if record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, record["response"], self.__regex_flags)): + record["code"] = IGNORED + if save and record["code"] >= 200 and record["code"] < 400: # NOTE: Additional validation to prevent congestion from writing large and usless data to files. file = os.path.join(self.__directory, ("{0}.txt").format(record["id"])) if not os.path.exists(file): - open(file, "wb").write(content) + open(file, "w").write(record["response"]) + if not response_body: + record["response"] = "" # ---------------------------- except (pycurl.error, FileNotFoundError) as ex: # -------------------------------- @@ -624,7 +633,7 @@ def __send_curl(self, record, passthrough = False): # ---------------------------- return record - def __send_request(self, record, passthrough = False): + def __send_request(self, record, ignore = True, response_headers = False, response_body = False, save = True): session = None response = None try: @@ -661,18 +670,19 @@ def __send_request(self, record, passthrough = False): # ---------------------------- record["code"] = int(response.status_code) record["length"] = len(response.content) + record["response"] = self.__decode(response.content) record["id"] = ("{0}-{1}-{2}").format(record["code"], record["length"], record["id"]) - content = response.content - if passthrough: + if response_headers: record["response_headers"] = dict(response.headers) - # record["response"] = self.__decode(content) - elif record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, self.__decode(content), self.__regex_flags)): - record["code"] = IGNORED - # NOTE: Additional validation to prevent congestion from writing large and usless data to files. 
- elif record["code"] >= 200 and record["code"] < 400: + if ignore: + if record["length"] in self.__content_lengths or (self.__ignore and re.search(self.__ignore, record["response"], self.__regex_flags)): + record["code"] = IGNORED + if save and record["code"] >= 200 and record["code"] < 400: # NOTE: Additional validation to prevent congestion from writing large and usless data to files. file = os.path.join(self.__directory, ("{0}.txt").format(record["id"])) if not os.path.exists(file): - open(file, "wb").write(content) + open(file, "w").write(record["response"]) + if not response_body: + record["response"] = "" # ---------------------------- except (requests.packages.urllib3.exceptions.LocationParseError, requests.exceptions.RequestException, FileNotFoundError) as ex: # ---------------------------- @@ -692,7 +702,9 @@ def __set_double_headers(self, request, headers): exists = set() for header in headers: key, value = get_header_key_value(header) - request.headers[key if key not in exists and not exists.add(key) else uniquestr(key)] = self.__encode(value) # NOTE: Allows double headers. + request.headers[key if key not in exists and not exists.add(key) else uniquestr(key)] = self.__encode(value) + + # ------------------------------------ def __validate_results(self): print_time("Validating results...") @@ -708,6 +720,8 @@ def __mark_duplicates(self): continue record["code"] = DUPLICATE + # ------------------------------------ + def __prepare_collection(self): print_time("Preparing test records...") # -------------------------------- @@ -731,7 +745,7 @@ def __color(self, value, color): def __results_row(self, record, color): return [self.__color(record[key], color) for key in ["id", "code", "length", "command"]] - def results_table(self): + def results_table(self): # NOTE: To see more or less results, comment in or out 'continue' line. 
tmp = []; table = [] for record in self.__collection: if record["code"] < 100 or record["code"] >= 600: @@ -833,7 +847,7 @@ def show(self): class MyArgParser(argparse.ArgumentParser): def print_help(self): - print("Stresser v11.0 ( github.com/ivan-sincek/forbidden )") + print("Stresser v11.1 ( github.com/ivan-sincek/forbidden )") print("") print("Usage: stresser -u url -dir directory -r repeat -th threads [-f force] [-o out ]") print("Example: stresser -u https://example.com/secret -dir results -r 1000 -th 200 [-f GET ] [-o results.json]") @@ -962,29 +976,29 @@ def __print_error(self, msg): def __parse_url(self, value, key): data = { "url": { - "schemes": ["http", "https"], - "scheme_error": [ - "Inaccessible URL: Scheme is required", - "Inaccessible URL: Supported schemes are 'http' and 'https'" - ], + "schemes" : ["http", "https"], + "scheme_error": { + "required" : "Inaccessible URL: Scheme is required", + "unsupported": "Inaccessible URL: Supported schemes are 'http' and 'https'" + }, "domain_error": "Inaccessible URL: Invalid domain name", - "port_error": "Inaccessible URL: Port number is out of range" + "port_error" : "Inaccessible URL: Port number is out of range" }, "proxy": { - "schemes": ["http", "https", "socks4", "socks4h", "socks5", "socks5h"], - "scheme_error": [ - "Proxy URL: Scheme is required", - "Proxy URL: Supported schemes are 'http[s]', 'socks4[h]', and 'socks5[h]'" - ], + "schemes" : ["http", "https", "socks4", "socks4h", "socks5", "socks5h"], + "scheme_error": { + "required" : "Proxy URL: Scheme is required", + "unsupported": "Proxy URL: Supported schemes are 'http[s]', 'socks4[h]', and 'socks5[h]'" + }, "domain_error": "Proxy URL: Invalid domain name", - "port_error": "Proxy URL: Port number is out of range" + "port_error" : "Proxy URL: Port number is out of range" } } tmp = urllib.parse.urlsplit(value) if not tmp.scheme: - self.__error(data[key]["scheme_error"][0]) + self.__error(data[key]["scheme_error"]["required"]) elif tmp.scheme not in 
data[key]["schemes"]: - self.__error(data[key]["scheme_error"][1]) + self.__error(data[key]["scheme_error"]["unsupported"]) elif not tmp.netloc: self.__error(data[key]["domain_error"]) elif tmp.port and (tmp.port < 1 or tmp.port > 65535): @@ -1083,7 +1097,7 @@ def main(): if validate.run(): print("##########################################################################") print("# #") - print("# Stresser v11.0 #") + print("# Stresser v11.1 #") print("# by Ivan Sincek #") print("# #") print("# Bypass 4xx HTTP response status codes with stress testing. #")