diff --git a/.gitignore b/.gitignore index 6fb4916..a58acce 100644 --- a/.gitignore +++ b/.gitignore @@ -207,4 +207,5 @@ oas.yml .DS_Store ## local testing scripts -test.py \ No newline at end of file +test.py +test-files \ No newline at end of file diff --git a/src/README.md b/src/README.md index 83ee4c4..bcaa517 100644 --- a/src/README.md +++ b/src/README.md @@ -8,28 +8,28 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec ## Security Checks -- [x] Restricted HTTP Methods -- [x] SQLi -- [x] BOLA (Might need few bug fixes) -- [x] Data Exposure (Detects Common Data Exposures) -- [x] BOPLA / Mass Assignment -- [x] Broken Access Control -- [x] Basic Command Injection -- [x] Basic XSS/HTML Injection test -- [x] Basic SSTI test -- [ ] Broken Authentication +- [x] Restricted HTTP Methods +- [x] SQLi +- [x] BOLA (Might need few bug fixes) +- [x] Data Exposure (Detects Common Data Exposures) +- [x] BOPLA / Mass Assignment +- [x] Broken Access Control +- [x] Basic Command Injection +- [x] Basic XSS/HTML Injection test +- [x] Basic SSTI test +- [ ] Broken Authentication ## Features -- Few Security Checks from OWASP API Top 10 -- Automated Testing -- User Config Based Testing -- API for Automating tests and Integrating Tool with other platforms/tools -- CLI tool -- Proxy Support -- Secure Dockerized Project for Easy Usage -- Open Source Tool with MIT License -- Github Action +- Few Security Checks from OWASP API Top 10 +- Automated Testing +- User Config Based Testing +- API for Automating tests and Integrating Tool with other platforms/tools +- CLI tool +- Proxy Support +- Secure Dockerized Project for Easy Usage +- Open Source Tool with MIT License +- Github Action ## Demo @@ -39,34 +39,34 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec ## Github Action -- Create github action secret `url` for your repo -- Setup github action workflow in your repo `.github/workflows/offat.yml` +- Create github action secret `url` for your repo +- Setup github action workflow in your repo `.github/workflows/offat.yml` ```yml name: OWASP OFFAT Sample Workflow on: - push: - branches: - - dev - - main + push: + branches: + - dev + - main jobs: - test: - runs-on: ubuntu-latest - - steps: - - name: "download swagger/OAS file" - run: curl ${url} -o /tmp/swagger.json - env: - url: ${{ secrets.url }} - - - name: "OWASP OFFAT CICD Scanner" - uses: OWASP/OFFAT@main # OWASP/OFFAT@v0.17.3 - with: - file: /tmp/swagger.json # or ${{ secrets.url }} - rate_limit: 120 - artifact_retention_days: 1 + test: + runs-on: ubuntu-latest + + steps: + - name: "download swagger/OAS file" + run: curl ${url} -o /tmp/swagger.json + env: + url: ${{ secrets.url }} + + - name: "OWASP OFFAT CICD Scanner" + uses: OWASP/OFFAT@main # OWASP/OFFAT@v0.17.3 + with: + file: /tmp/swagger.json # or ${{ secrets.url }} + rate_limit: 120 + artifact_retention_days: 1 ``` > Prefer locking action to specific version `OWASP/OFFAT@v0.17.3` instead of using `OWASP/OFFAT@main` and bump OFFAT action version after testing. @@ -89,13 +89,13 @@ The disclaimer advises users to use the open-source project for ethical and legi ### Using pip -- Install main branch using pip +- Install main branch using pip ```bash python3 -m pip install git+https://github.com/OWASP/OFFAT.git ``` -- Install Release from PyPi +- Install Release from PyPi ```bash python3 -m pip install offat # only cli tool @@ -106,214 +106,218 @@ The disclaimer advises users to use the open-source project for ethical and legi ### Docker -- Build Image +- Build Image ```bash - make build-local-images + make local ``` -- CLI Tool +- CLI Tool - ```bash - docker run --rm dmdhrumilmistry/offat - ``` + ```bash + docker run --rm dmdhrumilmistry/offat + ``` -- API +- API - ```bash - docker compose up -d - ``` + ```bash + docker compose up -d + ``` - > POST `openapi` documentation to `/api/v1/scan/` endpoint with its valid `type` (json/yaml); `job_id` will be returned. + > POST `openapi` documentation to `/api/v1/scan/` endpoint with its valid `type` (json/yaml); `job_id` will be returned. ### Manual Method -- Open terminal +- Open terminal -- Install git package +- Install git package - ```bash - sudo apt install git python3 -y - ``` + ```bash + sudo apt install git python3 -y + ``` -- Install [Poetry](https://python-poetry.org/docs/master#installing-with-the-official-installer) +- Install [Poetry](https://python-poetry.org/docs/master#installing-with-the-official-installer) -- clone the repository to your machine +- clone the repository to your machine - ```bash - git clone https://github.com/OWASP/OFFAT.git - ``` + ```bash + git clone https://github.com/OWASP/OFFAT.git + ``` -- Change directory +- Change directory - ```bash - cd offat - ``` + ```bash + cd offat + ``` -- install with poetry +- install with poetry - ```bash - # without options - poetry install - ``` + ```bash + # without options + poetry install + ``` ## Start OffAT ### API -- Start API Server +- Start API Server - ```bash - python -m offat.api + ```bash + python -m offat.api - # OR + # OR - offat-api - ``` + offat-api + ``` -- API Documentation can be found at +- API Documentation can be found at ### CLI Tool -- Run offat +- Run offat - ```bash - offat -f swagger_file.json # using file - offat -f https://example.com/docs.json # using url - ``` + ```bash + offat -f swagger_file.json # using file + offat -f https://example.com/docs.json # using url + ``` -- To get all the commands use `help` +- To get all the commands use `help` - ```bash - offat -h - ``` + ```bash + offat -h + ``` -- Save result in `json`, `yaml` or `html` formats. +- Save result in `json`, `yaml` or `html` formats. - ```bash - offat -f swagger_file.json -o output.json # json - offat -f swagger_file.json -o output.html -of html # html - offat -f swagger_file.json -o output.yaml -of yaml # yaml - ``` + ```bash + offat -f swagger_file.json -o output.json -of html # json + offat -f swagger_file.json -o output.html -of html # html + offat -f swagger_file.json -o output.yaml -of yaml # yaml + ``` > `json` format is default output format. > `yaml` format needs to be sanitized before usage since it dumps data as python objects. > `html` format needs more visualization. -- Run tests only for endpoint paths matching regex pattern - - ```bash - offat -f swagger_file.json -pr '/user' - ``` - -- Add headers to requests - - ```bash - offat -f swagger_file.json -H 'Accept: application/json' -H 'Authorization: Bearer YourJWTToken' - ``` - -- Run Test with Requests Rate Limited - - ```bash - offat -f swagger_file.json -rl 1000 -dr 0.001 - ``` - - > `rl`: requests rate limit, `dr`: delay between requests - -- Use along with proxy - -```bash -offat -f swagger_file.json -p http://localhost:8080 --no-ssl -o output.json -``` - -> Make sure that proxy can handle multiple requests at the same time - -- Use user provided inputs for generating tests - - ```bash - offat -f swagger_file.json -tdc test_data_config.yaml - ``` - - `test_data_config.yaml` - - ```yaml - actors: - - actor1: - request_headers: - - name: Authorization - value: Bearer [Token1] - - name: User-Agent - value: offat-actor1 - - query: - - name: id - value: 145 - type: int - - name: country - value: uk - type: str - - name: city - value: london - type: str - - body: - - name: name - value: actorone - type: str - - name: email - value: actorone@example.com - type: str - - name: phone - value: +11233211230 - type: str - - unauthorized_endpoints: # For broken access control - - "/store/order/.*" - - - actor2: - request_headers: - - name: Authorization - value: Bearer [Token2] - - name: User-Agent - value: offat-actor2 - - query: - - name: id - value: 199 - type: int - - name: country - value: uk - type: str - - name: city - value: leeds - type: str - - body: - - name: name - value: actortwo - type: str - - name: email - value: actortwo@example.com - type: str - - name: phone - value: +41912312311 - type: str - ``` +- Run tests only for endpoint paths matching regex pattern + + ```bash + offat -f swagger_file.json -pr '/user' + ``` + +- Add headers to requests + + ```bash + offat -f swagger_file.json -H 'Accept: application/json' -H 'Authorization: Bearer YourJWTToken' + ``` + +- Run Test with Requests Rate Limited + + ```bash + offat -f swagger_file.json -rl 1000 + ``` + + > `rl`: requests rate limit per second + +- Use along with proxy + + ```bash + # without ssl check + offat -f swagger_file.json -p http://localhost:8080 -o output.json -of json # ssl checks are disabled by default to avoid certificate installations + + # with ssl check enforced + offat -f swagger_file.json -p http://localhost:8080 -o output.json -of json --ssl + ``` + + > Make sure that proxy can handle multiple requests at the same time + +- Use user provided inputs for generating tests + + ```bash + offat -f swagger_file.json -tdc test_data_config.yaml + ``` + + `test_data_config.yaml` + + ```yaml + actors: + - actor1: + request_headers: + - name: Authorization + value: Bearer [Token1] + - name: User-Agent + value: offat-actor1 + + query: + - name: id + value: 145 + type: int + - name: country + value: uk + type: str + - name: city + value: london + type: str + + body: + - name: name + value: actorone + type: str + - name: email + value: actorone@example.com + type: str + - name: phone + value: +11233211230 + type: str + + unauthorized_endpoints: # For broken access control + - "/store/order/.*" + + - actor2: + request_headers: + - name: Authorization + value: Bearer [Token2] + - name: User-Agent + value: offat-actor2 + + query: + - name: id + value: 199 + type: int + - name: country + value: uk + type: str + - name: city + value: leeds + type: str + + body: + - name: name + value: actortwo + type: str + - name: email + value: actortwo@example.com + type: str + - name: phone + value: +41912312311 + type: str + ``` > If you're using Termux or windows, then use `pip` instead of `pip3`. > Few features are only for linux os, hence they might not work on windows and require admin priviliges. ### Open In Google Cloud Shell -- Temporary Session - [![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.svg)](https://shell.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https://github.com/OWASP/OFFAT.git&ephemeral=true&show=terminal&cloudshell_print=./DISCLAIMER.md) -- Perisitent Session - [![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.svg)](https://shell.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https://github.com/OWASP/OFFAT.git&ephemeral=false&show=terminal&cloudshell_print=./DISCLAIMER.md) +- Temporary Session + [![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.svg)](https://shell.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https://github.com/OWASP/OFFAT.git&ephemeral=true&show=terminal&cloudshell_print=./DISCLAIMER.md) +- Perisitent Session + [![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.svg)](https://shell.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https://github.com/OWASP/OFFAT.git&ephemeral=false&show=terminal&cloudshell_print=./DISCLAIMER.md) ## Have any Ideas 💡 or issue -- Create an issue -- Fork the repo, update script and create a Pull Request +- Create an issue +- Fork the repo, update script and create a Pull Request ## Contributing @@ -321,4 +325,4 @@ Refer [CONTRIBUTIONS.md](/CONTRIBUTING.md) for contributing to the project. ## LICENSE -Offat is distributed under `MIT` License. Refer [License](/LICENSE.md) for more information. +OWASP OFFAT is distributed under `MIT` License. Refer [License](/LICENSE.md) for more information. diff --git a/src/offat/__main__.py b/src/offat/__main__.py index f36702a..8b6dde2 100644 --- a/src/offat/__main__.py +++ b/src/offat/__main__.py @@ -1,6 +1,7 @@ from argparse import ArgumentParser - +from .parsers.openapi import OpenAPIv3Parser +from .parsers.swagger import SwaggerParser from .config_data_handler import validate_config_file_data from .tester.tester_utils import generate_and_run_tests from .parsers import create_parser @@ -8,6 +9,7 @@ def banner(): + """Prints OWASP OFFAT cli banner""" print( r""" _/| |\_ @@ -33,106 +35,106 @@ def start(): """Starts cli tool""" banner() - parser = ArgumentParser(prog="offat") + parser = ArgumentParser(prog='offat') parser.add_argument( - "-f", - "--file", - dest="fpath", + '-f', + '--file', + dest='fpath', type=str, - help="path or url of openapi/swagger specification file", + help='path or url of openapi/swagger specification file', required=True, ) parser.add_argument( - "-v", "--version", action="version", version=f"%(prog)s {get_package_version()}" + '-v', '--version', action='version', version=f"%(prog)s {get_package_version()}" ) parser.add_argument( - "-rl", - "--rate-limit", - dest="rate_limit", - help="API requests rate limit per second", + '-rl', + '--rate-limit', + dest='rate_limit', + help='API requests rate limit per second', type=float, default=60, required=False, ) parser.add_argument( - "-pr", - "--path-regex", - dest="path_regex_pattern", + '-pr', + '--path-regex', + dest='path_regex_pattern', type=str, - help="run tests for paths matching given regex pattern", + help='run tests for paths matching given regex pattern', required=False, default=None, ) parser.add_argument( - "-o", - "--output", - dest="output_file", + '-o', + '--output', + dest='output_file', type=str, - help="path to store test results", + help='path to store test results', required=False, default=None, ) parser.add_argument( - "-of", - "--format", - dest="output_format", + '-of', + '--format', + dest='output_format', type=str, - choices=["json", "yaml", "html", "table"], - help="Data format to save (json, yaml, html, table). Default: table", + choices=['json', 'yaml', 'html', 'table'], + help='Data format to save (json, yaml, html, table). Default: table', required=False, - default="table", + default='table', ) parser.add_argument( - "-H", - "--headers", - dest="headers", + '-H', + '--headers', + dest='headers', type=str, - help="HTTP requests headers that should be sent during testing eg: User-Agent: offat", + help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, - action="append", - nargs="*", + action='append', + nargs='*', ) parser.add_argument( - "-tdc", - "--test-data-config", - dest="test_data_config", - help="YAML file containing user test data for tests", + '-tdc', + '--test-data-config', + dest='test_data_config', + help='YAML file containing user test data for tests', required=False, type=str, ) parser.add_argument( - "-p", - "--proxy", - dest="proxies_list", + '-p', + '--proxy', + dest='proxies_list', help='Proxy server URL to route HTTP requests through (e.g. "http://proxyserver:port")', - action="append", + action='append', required=False, type=str, default=None, ) parser.add_argument( - "-s", - "--ssl", - dest="ssl", + '-s', + '--ssl', + dest='ssl', required=False, - action="store_true", - help="Enable SSL Verification", + action='store_true', + help='Enable SSL Verification', ) parser.add_argument( - "-cf", - "--capture-failed", - dest="capture_failed", - action="store_true", - help="Captures failed requests due to any exceptions into output file", + '-cf', + '--capture-failed', + dest='capture_failed', + action='store_true', + help='Captures failed requests due to any exceptions into output file', ) parser.add_argument( - "--server", - dest="server_url", + '--server', + dest='server_url', type=str, default=None, required=False, - help="server/host base url to overwrite from OAS/Swagger file", + help='server/host base url to overwrite from OAS/Swagger file', ) args = parser.parse_args() @@ -150,7 +152,10 @@ def start(): test_data_config = validate_config_file_data(test_data_config) # parse args and run tests - api_parser = create_parser(args.fpath, server_url=args.server_url) + api_parser: SwaggerParser | OpenAPIv3Parser = create_parser( + args.fpath, server_url=args.server_url + ) + generate_and_run_tests( api_parser=api_parser, regex_pattern=args.path_regex_pattern, @@ -165,5 +170,5 @@ def start(): ) -if __name__ == "__main__": +if __name__ == '__main__': start() diff --git a/src/offat/parsers/__init__.py b/src/offat/parsers/__init__.py index aa5261e..3d4182b 100644 --- a/src/offat/parsers/__init__.py +++ b/src/offat/parsers/__init__.py @@ -11,28 +11,28 @@ def create_parser( fpath_or_url: str, spec: dict | None = None, server_url: str | None = None, -) -> SwaggerParser | OpenAPIv3Parser | None: +) -> SwaggerParser | OpenAPIv3Parser: """returns parser based on doc file""" if fpath_or_url and is_valid_url(fpath_or_url): res = http_get(fpath_or_url, timeout=3) if res.status_code != 200: logger.error( - "server returned status code %d offat expects 200 status code", + 'server returned status code %d offat expects 200 status code', res.status_code, ) exit(-1) try: spec = json_load(res.text) - fpath_or_url = None + fpath_or_url = None # type: ignore except JSONDecodeError: - logger.error("Invalid json data spec file url") + logger.error('Invalid json data spec file url') exit(-1) try: parser = BaseParser(file_or_url=fpath_or_url, spec=spec, server_url=server_url) except OSError: - logger.error("File Not Found") + logger.error('File Not Found') exit(-1) if parser.is_v3: diff --git a/src/offat/parsers/openapi.py b/src/offat/parsers/openapi.py index d762100..268d348 100644 --- a/src/offat/parsers/openapi.py +++ b/src/offat/parsers/openapi.py @@ -20,7 +20,7 @@ def __init__( ) -> None: super().__init__(file_or_url=file_or_url, spec=spec, *args, **kwargs) # noqa if not self.is_v3: - raise InvalidOpenAPIv3File("Invalid OAS v3 file") + raise InvalidOpenAPIv3File('Invalid OAS v3 file') self.http_scheme = self._get_scheme() @@ -29,60 +29,56 @@ def __init__( # raise error if host data not found if not (self.hosts and self.hosts[0]): - raise ValueError("Host is invalid or not found") + raise ValueError('Host is invalid or not found') # parse and set host data host_dict = self.hosts[0] - self.http_scheme = host_dict["scheme"] + self.http_scheme = host_dict['scheme'] self.host = f'{host_dict["host"]}:{host_dict["port"]}' - self.api_base_path = host_dict["basepath"] + self.api_base_path = host_dict['basepath'] self.base_url = f"{self.http_scheme}://{self.host}" self.request_response_params = self._get_request_response_params() + # security schemes + self.security_schemes = self._get_security_schemes() + def _populate_hosts(self): - servers = self.specification.get("servers", []) + servers = self.specification.get('servers', []) hosts = [] if not servers: - logger.error("Invalid Server Url: Server URLs are missing in spec file") - raise InvalidOpenAPIv3File("Server URLs Not Found in spec file") + logger.error('Invalid Server Url: Server URLs are missing in spec file') + raise InvalidOpenAPIv3File('Server URLs Not Found in spec file') for server in servers: - # host = ( - # server.get('url', '') - # .removeprefix('https://') - # .removeprefix('http://') - # .removesuffix('/') - # ) - # host = None if host == '' else host - scheme, host, port, basepath = parse_server_url(url=server.get("url")) + scheme, host, port, basepath = parse_server_url(url=server.get('url')) hosts.append( { - "scheme": scheme, - "host": host, - "port": port, - "basepath": basepath, + 'scheme': scheme, + 'host': host, + 'port': port, + 'basepath': basepath, } ) self.hosts = hosts def _get_scheme(self): - servers = self.specification.get("servers", []) + servers = self.specification.get('servers', []) schemes = [] for server in servers: - schemes.append("https" if "https://" in server.get("url", "") else "http") + schemes.append('https' if 'https://' in server.get('url', '') else 'http') - scheme = "https" if "https" in schemes else "http" + scheme = 'https' if 'https' in schemes else 'http' return scheme def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict: - schema_spec_path = param_schema_ref.split("/")[1:] + schema_spec_path = param_schema_ref.split('/')[1:] if len(schema_spec_path) > 3: logger.error( - "Schema spec $ref path should not be greater than 3 (excluding #)" + 'Schema spec $ref path should not be greater than 3 (excluding #)' ) return {} @@ -94,11 +90,11 @@ def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict: def _get_param_definition_schema(self, param: dict): """Returns Model defined schema for the passed param""" - param_schema = param.get("schema") + param_schema = param.get('schema') # replace schema $ref with model params if param_schema: - param_schema_ref = param_schema.get("$ref") + param_schema_ref = param_schema.get('$ref') if param_schema_ref: param_schema = self._fetch_schema_from_spec(param_schema_ref) @@ -115,30 +111,30 @@ def _get_response_definition_schema(self, responses: dict): """ for status_code in responses.keys(): # below line could return: ["application/json", "application/xml"] - content = responses[status_code].get("content", None) + content = responses[status_code].get('content', None) if content: status_code_content_type_responses = content.keys() for status_code_content_type in status_code_content_type_responses: - status_code_content = responses[status_code]["content"][ + status_code_content = responses[status_code]['content'][ status_code_content_type ].keys() - if "parameters" in status_code_content: - responses[status_code]["schema"] = responses[status_code][ - "content" - ][status_code_content_type]["parameters"] - elif "schema" in status_code_content: + if 'parameters' in status_code_content: + responses[status_code]['schema'] = responses[status_code][ + 'content' + ][status_code_content_type]['parameters'] + elif 'schema' in status_code_content: responses[status_code][ - "schema" + 'schema' ] = self._get_param_definition_schema( - responses[status_code]["content"][status_code_content_type] + responses[status_code]['content'][status_code_content_type] ) else: # Fetch $ref schema directly - ref = responses[status_code].get("$ref", None) + ref = responses[status_code].get('$ref', None) if ref: - responses[status_code]["schema"] = self._fetch_schema_from_spec(ref) + responses[status_code]['schema'] = self._fetch_schema_from_spec(ref) return responses @@ -152,63 +148,79 @@ def _get_request_response_params(self): list: """ requests = [] - paths = self.specification.get("paths", {}) + paths = self.specification.get('paths', {}) # extract endpoints and supported params for path in paths.keys(): - path_params = paths[path].get("parameters", []) + path_params = paths[path].get('parameters', []) for http_method in paths.get(path, {}).keys(): # consider only http methods - if http_method not in ["get", "put", "post", "delete", "options"]: + if http_method not in ['get', 'put', 'post', 'delete', 'options']: continue - request_parameters = paths[path][http_method].get("parameters", []) + request_parameters = paths[path][http_method].get('parameters', []) + security = paths[path][http_method].get('security', []) # create list of parameters: Fetch object schema from OAS file body_params = [] body_parameter_keys = ( - paths[path][http_method].get("requestBody", {}).get("content", {}) + paths[path][http_method].get('requestBody', {}).get('content', {}) ) for body_parameter_key in body_parameter_keys: - body_parameters_dict = paths[path][http_method]["requestBody"][ - "content" + body_parameters_dict = paths[path][http_method]['requestBody'][ + 'content' ][body_parameter_key] - required = paths[path][http_method]["requestBody"].get("required") - description = paths[path][http_method]["requestBody"].get( - "description" + required = paths[path][http_method]['requestBody'].get('required') + description = paths[path][http_method]['requestBody'].get( + 'description' ) body_param = self._get_param_definition_schema(body_parameters_dict) body_params.append( { - "in": "body", - "name": body_parameter_key, - "description": description, - "required": required, - "schema": body_param, + 'in': 'body', + 'name': body_parameter_key, + 'description': description, + 'required': required, + 'schema': body_param, } ) response_params = [] response_params = self._get_response_definition_schema( - paths[path][http_method].get("responses", {}) + paths[path][http_method].get('responses', {}) ) # add body param to request param request_parameters += body_params requests.append( { - "http_method": http_method, - "path": path, - "request_params": request_parameters, - "response_params": response_params, - "path_params": path_params, - "body_params": body_params, + 'http_method': http_method, + 'path': path, + 'request_params': request_parameters, + 'response_params': response_params, + 'path_params': path_params, + 'body_params': body_params, + 'security': security, } ) return requests + + def _get_security_schemes(self): + """ + Retrieves the security schemes defined in the OpenAPI specification. + + Returns: + dict: A dictionary containing the security schemes defined in the specification. + """ + security_schemes = self.specification.get('components', {}).get( + 'securitySchemes', {} + ) + if not security_schemes: + logger.warning('Security schemes not found in the OpenAPI specification.') + return security_schemes diff --git a/src/offat/parsers/swagger.py b/src/offat/parsers/swagger.py index 0078df7..1f6b8d8 100644 --- a/src/offat/parsers/swagger.py +++ b/src/offat/parsers/swagger.py @@ -19,38 +19,41 @@ def __init__( ) -> None: super().__init__(file_or_url=fpath_or_url, spec=spec, *args, **kwargs) # noqa if self.is_v3: - raise InvalidSwaggerFile("Invalid OAS v3 file") + raise InvalidSwaggerFile('Invalid OAS v3 file') self._populate_hosts() self.http_scheme = self._get_scheme() - self.api_base_path = self.specification.get("basePath", "") + self.api_base_path = self.specification.get('basePath', '') self.base_url = f"{self.http_scheme}://{self.host}" self.request_response_params = self._get_request_response_params() + # security schemes + self.security_schemes = self._get_security_schemes() + def _populate_hosts(self): - host = self.specification.get("host") + host = self.specification.get('host') if not host: - logger.error("Invalid Host: Host is missing") - raise InvalidSwaggerFile("Host Not Found in spec file") + logger.error('Invalid Host: Host is missing') + raise InvalidSwaggerFile('Host Not Found in spec file') hosts = [host] self.hosts = hosts self.host = self.hosts[0] def _get_scheme(self): - scheme = "https" if "https" in self.specification.get("schemes", []) else "http" + scheme = 'https' if 'https' in self.specification.get('schemes', []) else 'http' return scheme def _get_param_definition_schema(self, param: dict): """Returns Model defined schema for the passed param""" - param_schema = param.get("schema") + param_schema = param.get('schema') # replace schema $ref with model params if param_schema: - param_schema_ref = param_schema.get("$ref") + param_schema_ref = param_schema.get('$ref') if param_schema_ref: - model_slug = param_schema_ref.split("/")[-1] - param_schema = self.specification.get("definitions", {}).get(model_slug) + model_slug = param_schema_ref.split('/')[-1] + param_schema = self.specification.get('definitions', {}).get(model_slug) return param_schema @@ -65,10 +68,10 @@ def _get_response_definition_schema(self, responses: dict): """ for status_code in responses.keys(): status_code_response = responses[status_code].keys() - if "parameters" in status_code_response: - responses[status_code]["schema"] = responses[status_code]["parameters"] - elif "schema" in status_code_response: - responses[status_code]["schema"] = self._get_param_definition_schema( + if 'parameters' in status_code_response: + responses[status_code]['schema'] = responses[status_code]['parameters'] + elif 'schema' in status_code_response: + responses[status_code]['schema'] = self._get_param_definition_schema( responses[status_code] ) else: @@ -86,35 +89,49 @@ def _get_request_response_params(self): list: """ requests = [] - paths = self.specification.get("paths", {}) + paths = self.specification.get('paths', {}) # extract endpoints and supported params for path in paths.keys(): - path_params = paths[path].get("parameters", []) + path_params = paths[path].get('parameters', []) for http_method in paths.get(path, {}).keys(): # consider only http methods - if http_method not in ["get", "put", "post", "delete", "options"]: + if http_method not in ['get', 'put', 'post', 'delete', 'options']: continue # below var contains overall params - request_parameters = paths[path][http_method].get("parameters", []) + request_parameters = paths[path][http_method].get('parameters', []) + security = paths[path][http_method].get('security', []) response_params = self._get_response_definition_schema( - paths[path][http_method].get("responses", {}) + paths[path][http_method].get('responses', {}) ) # create list of parameters: Fetch object schema from OAS file for param in request_parameters: - param["schema"] = self._get_param_definition_schema(param) + param['schema'] = self._get_param_definition_schema(param) requests.append( { - "http_method": http_method, - "path": path, - "request_params": request_parameters, - "response_params": response_params, - "path_params": path_params, + 'http_method': http_method, + 'path': path, + 'request_params': request_parameters, + 'response_params': response_params, + 'path_params': path_params, + 'security': security, } ) return requests + + def _get_security_schemes(self): + """ + Retrieves the security schemes defined in the OpenAPI specification. + + Returns: + dict: A dictionary containing the security schemes defined in the specification. + """ + security_schemes = self.specification.get('securityDefinitions', {}) + if not security_schemes: + logger.warning('Security schemes not found in the OpenAPI specification.') + return security_schemes diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index af04ea2..8f95bfc 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -20,21 +20,21 @@ class ReportGenerator: @staticmethod def generate_html_report(results: list[dict]): """generates html report from OFFAT results""" - html_report_template_file_name = "report.html" + html_report_template_file_name = 'report.html' html_report_file_path = path_join( dirname(templates.__file__), html_report_template_file_name ) - with open(html_report_file_path, "r", encoding="utf-8") as f: + with open(html_report_file_path, 'r', encoding='utf-8') as f: report_file_content = f.read() # TODO: validate report data to avoid HTML injection attacks. if not isinstance(results, list): - raise ValueError("results arg expects a list[dict].") + raise ValueError('results arg expects a list[dict].') # HTML escape data escaped_results = [] - escape_keys = ["response_body"] + escape_keys = ['response_body'] for result_dict in results: escaped_result_dict = {} for key, value in result_dict.items(): @@ -47,7 +47,7 @@ def generate_html_report(results: list[dict]): escaped_results.append(escaped_result_dict) report_file_content = report_file_content.replace( - "{ results }", json_dumps(escaped_results) + '{ results }', json_dumps(escaped_results) ) return report_file_content @@ -60,42 +60,42 @@ def handle_report_format( result = None match report_format: - case "html": - logger.warning("HTML output format displays only basic data.") + case 'html': + logger.warning('HTML output format displays only basic data.') result = ReportGenerator.generate_html_report(results=results) - case "yaml": + case 'yaml': logger.warning( - "YAML output format needs to be sanitized before using it further." + 'YAML output format needs to be sanitized before using it further.' ) result = yaml_dump( { - "results": results, + 'results': results, } ) - case "json": - report_format = "json" + case 'json': + report_format = 'json' result = json_dumps( { - "results": results, + 'results': results, } ) case _: # default: CLI table # TODO: filter failed requests first and then create new table for failed requests - report_format = "table" + report_format = 'table' results_table = TestResultTable().generate_result_table( deepcopy(results) ) result = results_table - logger.info("Generated %s format report.", report_format.upper()) + logger.info('Generated %s format report.', report_format.upper()) return result @staticmethod def save_report(report_path: str | None, report_file_content: str | Table | None): """saves/prints report to console""" - if report_path != "/" and report_path: + if report_path != '/' and report_path: dir_name = dirname(report_path) - if dir_name != "" and report_path: + if dir_name != '' and report_path: makedirs(dir_name, exist_ok=True) # print to cli if report path and file content as absent else write to file location. @@ -104,8 +104,8 @@ def save_report(report_path: str | None, report_file_content: str | Table | None and report_file_content and not isinstance(report_file_content, Table) ): - with open(report_path, "w", encoding="utf-8") as f: - logger.info("Writing report to file: %s", report_path) + with open(report_path, 'w', encoding='utf-8') as f: + logger.info('Writing report to file: %s', report_path) f.write(report_file_content) else: if isinstance(report_file_content, Table) and report_file_content.columns: @@ -114,7 +114,7 @@ def save_report(report_path: str | None, report_file_content: str | Table | None isinstance(report_file_content, Table) and not report_file_content.columns ): - logger.warning("No Columns found in Table.") + logger.warning('No Columns found in Table.') else: console.print(report_file_content) @@ -127,12 +127,12 @@ def generate_report( ): """main function used to generate report""" if report_path: - report_format = report_path.split(".")[-1] + report_format = report_path.split('.')[-1] # do not store errored results if `capture_failed` is False if not capture_failed: results = list( - filter(lambda result: result.get("error", True) == False, results) + filter(lambda result: result.get('error', True) is False, results) ) formatted_results = ReportGenerator.handle_report_format( diff --git a/src/offat/report/templates/table.py b/src/offat/report/templates/table.py index 52c0325..a2a6a62 100644 --- a/src/offat/report/templates/table.py +++ b/src/offat/report/templates/table.py @@ -75,6 +75,9 @@ def _sanitize_results( if result.get('response_match_regex'): del result['response_match_regex'] + if result.get('security') or result.get('security') == []: + del result['security'] + if result.get('data_leak'): result['data_leak'] = '[bold red]Leak Found \u00d7[/bold red]' else: diff --git a/src/offat/tester/generator.py b/src/offat/tester/generator.py index c7be901..420ef2e 100644 --- a/src/offat/tester/generator.py +++ b/src/offat/tester/generator.py @@ -1,3 +1,6 @@ +''' +This module contains the TestGenerator class which is used to generate API test checks. +''' from copy import deepcopy from .fuzzer import fill_params from .post_test_processor import PostTestFiltersEnum @@ -17,16 +20,19 @@ class TestGenerator: None Methods: - check_unsupported_http_methods: Checks whether endpoint supports undocumented/unsupported HTTP methods. - sqli_fuzz_params: Performs SQL injection (SQLi) parameter fuzzing based on the provided OpenAPIParser instance. + check_unsupported_http_methods: Checks whether endpoint supports + undocumented/unsupported HTTP methods. + sqli_fuzz_params: Performs SQL injection (SQLi) parameter fuzzing + based on the provided OpenAPIParser instance. """ - def __init__(self, headers: dict = None) -> None: + def __init__(self, headers: dict = None) -> None: # type: ignore """ Initializes an instance of the TestGenerator class. Args: - headers (dict, optional): A dictionary of headers to be set for the instance. Defaults to None. + headers (dict, optional): A dictionary of headers to be set + for the instance. Defaults to None. Returns: None @@ -40,16 +46,19 @@ def __init__(self, headers: dict = None) -> None: def check_unsupported_http_methods( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [200, 201, 301, 302], *args, + success_codes: list[int] | None = None, **kwargs, ): '''Checks whether endpoint supports undocumented/unsupported HTTP methods Args: base_url (str): The base URL to check for unsupported HTTP methods. - endpoints (list[tuple]): A list of tuples representing the endpoints to check. Each tuple should contain the endpoint path and the corresponding supported HTTP methods. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful responses. Defaults to [ 200, 201, 301, 302 ]. + endpoints (list[tuple]): A list of tuples representing the endpoints + to check. Each tuple should contain the endpoint path and the + corresponding supported HTTP methods. + success_codes (list[int], optional): A list of HTTP success codes to consider as + successful responses. Defaults to [ 200, 201, 301, 302 ]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -59,6 +68,9 @@ def check_unsupported_http_methods( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes = [200, 201, 301, 302] + tasks = [] fuzzed_endpoints = self.__fuzz_request_params(openapi_parser) endpoints_index = {} @@ -111,8 +123,8 @@ def check_unsupported_http_methods( 'args': args, 'kwargs': kwargs, 'result_details': { - True: 'Endpoint does not perform any HTTP method which is not documented', # passed - False: 'Endpoint performs HTTP method which is not documented', # failed + True: "Endpoint doesn't perform any HTTP verb which is not documented", + False: 'Endpoint performs HTTP verb which is not documented', }, 'body_params': body_params, 'query_params': query_params, @@ -124,30 +136,6 @@ def check_unsupported_http_methods( return tasks - def __get_request_params_list(self, request_params: list[dict]): - '''Get list of request parameters''' - payload_data = [] - for request_param in request_params: - param_pos = request_param.get('in') - param_schema = request_param.get('schema') - - if param_schema: - props: dict = param_schema.get('properties', {}) - required_params: list = param_schema.get('required', []) - - for prop in props.keys(): - prop_type = props[prop].get('type') - payload_data.append( - { - 'in': param_pos, - 'name': prop, - 'type': prop_type, - 'required': prop in required_params, - } - ) - - return payload_data - def __fuzz_request_params( self, openapi_parser: SwaggerParser | OpenAPIv3Parser ) -> list[dict]: @@ -170,6 +158,7 @@ def __fuzz_request_params( # handle path params from request_params request_params = path_obj.get('request_params', []) request_params = fill_params(request_params, openapi_parser.is_v3) + security = path_obj.get('security', []) # get params based on their position in request request_body_params = list( @@ -183,7 +172,7 @@ def __fuzz_request_params( ) # get endpoint path - endpoint_path: str = path_obj.get('path') + endpoint_path: str = path_obj.get('path') # type: ignore # get path params and fill them path_params = path_obj.get('path_params', []) @@ -212,7 +201,7 @@ def __fuzz_request_params( 'body_params': request_body_params, 'query_params': request_query_params, 'path_params': path_params, - # 'malicious_payload':path_params, + 'security': security, } ) @@ -244,15 +233,18 @@ def __inject_payload_in_params(self, request_params: list[dict], payload: str): def sqli_fuzz_params_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [500], *args, + success_codes: list[int] | None = None, **kwargs, ): - '''Performs SQL injection (SQLi) parameter fuzzing based on the provided OpenAPIParser instance. + '''Performs SQL injection (SQLi) parameter fuzzing based on the + provided OpenAPIParser instance. Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful SQLi responses. Defaults to [500]. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to + consider as successful SQLi responses. Defaults to [500]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -262,6 +254,8 @@ def sqli_fuzz_params_test( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes = [500] # APPROACH: first send sqli in all params, if error is generated # then enumerate one by one or ask user to pentest manually using @@ -292,7 +286,8 @@ def sqli_fuzz_params_test( query_request_params, sqli_payload ) - # BUG: for few SQLi test, path params injected value is not matching with final URI path params in output + # BUG: for few SQLi test, path params injected value is not matching + # with final URI path params in output request_obj['test_name'] = 'SQLi Test' request_obj['body_params'] = malicious_body_request_params @@ -317,15 +312,17 @@ def sqli_fuzz_params_test( def sqli_in_uri_path_fuzz_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [500], *args, + success_codes: list[int] | None = None, **kwargs, ): '''Generate Tests for SQLi in endpoint path Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful BOLA responses. Defaults to [200, 201, 301]. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to + consider as successful BOLA responses. Defaults to [200, 201, 301]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -335,14 +332,18 @@ def sqli_in_uri_path_fuzz_test( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes = [500] + base_url: str = openapi_parser.base_url request_response_params: list[dict] = openapi_parser.request_response_params # filter path containing params in path endpoints_with_param_in_path = list( filter( - lambda path_obj: '/{' in path_obj.get('path'), request_response_params - ) + lambda path_obj: '/{' in path_obj.get('path'), # type: ignore + request_response_params, # type: ignore + ) # type: ignore ) basic_sqli_payloads = [ @@ -379,7 +380,6 @@ def sqli_in_uri_path_fuzz_test( for path_param in path_params: path_param_name = path_param.get('name') - # path_param_value = path_param.get('value') endpoint_path = endpoint_path.replace( '{' + str(path_param_name) + '}', str(sqli_payload) ) @@ -418,15 +418,17 @@ def sqli_in_uri_path_fuzz_test( def bola_fuzz_path_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [200, 201, 301], *args, + success_codes: list[int] | None = None, # type: ignore **kwargs, ): '''Generate Tests for BOLA in endpoint path Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful BOLA responses. Defaults to [200, 201, 301]. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to consider + as successful BOLA responses. Defaults to [200, 201, 301]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -436,14 +438,17 @@ def bola_fuzz_path_test( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes: list[int] = [200, 201, 301] + base_url: str = openapi_parser.base_url request_response_params: list[dict] = openapi_parser.request_response_params # filter path containing params in path endpoints_with_param_in_path = list( filter( - lambda path_obj: '/{' in path_obj.get('path'), request_response_params - ) + lambda path_obj: '/{' in path_obj.get('path'), request_response_params # type: ignore + ) # type: ignore ) tasks = [] @@ -480,7 +485,6 @@ def bola_fuzz_path_test( tasks.append( { 'test_name': 'BOLA Path Test with Fuzzed Params', - # f'{base_url}{endpoint_path}', 'url': join_uri_path( base_url, openapi_parser.api_base_path, endpoint_path ), @@ -508,15 +512,17 @@ def bola_fuzz_path_test( def bola_fuzz_trailing_slash_path_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [200, 201, 301], *args, + success_codes: list[int] | None = None, # type: ignore **kwargs, ): '''Generate Tests for BOLA in endpoint path Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful BOLA responses. Defaults to [200, 201, 301]. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to + consider as successful BOLA responses. Defaults to [200, 201, 301]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -526,6 +532,9 @@ def bola_fuzz_trailing_slash_path_test( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes: list[int] = [200, 201, 301] + base_url: str = openapi_parser.base_url request_response_params: list[dict] = openapi_parser.request_response_params @@ -547,7 +556,7 @@ def bola_fuzz_trailing_slash_path_test( ) # get endpoint path - endpoint_path: str = path_obj.get('path') + endpoint_path: str = path_obj.get('path') # type: ignore # get path params and fill them path_params = path_obj.get('path_params', []) @@ -578,7 +587,7 @@ def bola_fuzz_trailing_slash_path_test( 'endpoint': join_uri_path( openapi_parser.api_base_path, endpoint_path ), - 'method': path_obj.get('http_method').upper(), + 'method': path_obj.get('http_method').upper(), # type: ignore 'body_params': request_body_params, 'query_params': request_query_params, 'path_params': path_params, @@ -628,15 +637,17 @@ def _inject_response_params(self, response_params: dict, is_v3: bool = False): def bopla_fuzz_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser, - success_codes: list[int] = [200, 201, 301], *args, + success_codes: list[int] | None = None, # type: ignore **kwargs, ): '''Generate Tests for BOPLA/Mass Assignment Vulnerability Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - success_codes (list[int], optional): A list of HTTP success codes to consider as successful BOLA responses. Defaults to [200, 201, 301]. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to + consider as successful BOLA responses. Defaults to [200, 201, 301]. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -646,6 +657,9 @@ def bopla_fuzz_test( Raises: Any exceptions raised during the execution. ''' + if success_codes is None: + success_codes: list[int] = [200, 201, 301] + base_url: str = openapi_parser.base_url request_response_params: list[dict] = openapi_parser.request_response_params @@ -669,7 +683,7 @@ def bopla_fuzz_test( # handle path params from path_params # and replace path params by value in # endpoint path - endpoint_path: str = path_obj.get('path') + endpoint_path: str = path_obj.get('path') # type: ignore path_params = path_obj.get('path_params', []) path_params = fill_params(path_params, openapi_parser.is_v3) path_params = get_unique_params(path_params_in_body, path_params) @@ -719,16 +733,17 @@ def test_with_user_data( self, user_data: dict, test_generator_method, + *args, test_for_actor1: bool = True, test_for_actor2: bool = False, - *args, **kwargs, ): '''Generate Tests with user sepecified data using provided test generator method Args: user_data (dict): User specified YAML data as dict. - test_generator_method (class method): test generator class method to be used for generating API pentest tests. + test_generator_method (class method): test generator class method to be + used for generating API pentest tests. test_for_actor1 (bool): Generate tests for actor1 user data test_for_actor2 (bool): Generate tests for actor2 user data *args: Variable-length positional arguments. @@ -764,11 +779,14 @@ def __generate_injection_fuzz_params_test( *args, **kwargs, ): - '''Performs injection parameter fuzzing based on the provided OpenAPIParser instance and matches injected payload using regex in response. + '''Performs injection parameter fuzzing based on the provided OpenAPIParser instance + and matches injected payload using regex in response. Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. - payloads_data (list[dict]): list of dictionary containing malicious request payload and regex for matching injection in response. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing + the parsed OpenAPI specification. + payloads_data (list[dict]): list of dictionary containing malicious request payload + and regex for matching injection in response. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -823,10 +841,12 @@ def __generate_injection_fuzz_params_test( def os_command_injection_fuzz_params_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser ): - '''Performs OS Command injection parameter fuzzing based on the provided OpenAPIParser instance. + '''Performs OS Command injection parameter fuzzing based on the provided + OpenAPIParser instance. Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the + parsed OpenAPI specification. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -838,9 +858,10 @@ def os_command_injection_fuzz_params_test( ''' test_name = 'OS Command Injection Test' + root_regex = r'root:.*' payloads_data = [ - {'request_payload': 'cat /etc/passwd', 'response_match_regex': r'root:.*'}, - {'request_payload': 'cat /etc/shadow', 'response_match_regex': r'root:.*'}, + {'request_payload': 'cat /etc/passwd', 'response_match_regex': root_regex}, + {'request_payload': 'cat /etc/shadow', 'response_match_regex': root_regex}, {'request_payload': 'ls -la', 'response_match_regex': r'total\s\d+'}, ] @@ -859,10 +880,12 @@ def os_command_injection_fuzz_params_test( def xss_html_injection_fuzz_params_test( self, openapi_parser: SwaggerParser | OpenAPIv3Parser ): - '''Performs OS Command injection parameter fuzzing based on the provided OpenAPIParser instance. + '''Performs OS Command injection parameter fuzzing based on the + provided OpenAPIParser instance. Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -905,7 +928,8 @@ def ssti_fuzz_params_test(self, openapi_parser: SwaggerParser | OpenAPIv3Parser) '''Performs SSTI fuzzing based on the provided OpenAPIParser instance. Args: - openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class containing the parsed OpenAPI specification. + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. *args: Variable-length positional arguments. **kwargs: Arbitrary keyword arguments. @@ -955,3 +979,103 @@ def ssti_fuzz_params_test(self, openapi_parser: SwaggerParser | OpenAPIv3Parser) result_details=result_details, payloads_data=payloads_data, ) + + def missing_auth_fuzz_test( + self, + openapi_parser: SwaggerParser | OpenAPIv3Parser, + *args, + success_codes: list[int] | None = None, # type: ignore + **kwargs, + ): + '''Generate Tests for API endpoints which documents security authentication but doesn't check for authentication properly + + Args: + openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class + containing the parsed OpenAPI specification. + success_codes (list[int], optional): A list of HTTP success codes to consider + as successful BOLA responses. Defaults to [200, 201, 301]. + *args: Variable-length positional arguments. + **kwargs: Arbitrary keyword arguments. + + Returns: + list[dict]: list of dict containing test case for endpoint + + Raises: + Any exceptions raised during the execution. + ''' + if success_codes is None: + success_codes: list[int] = [200, 201, 301] + + base_url: str = openapi_parser.base_url + request_response_params: list[dict] = openapi_parser.request_response_params + + # pop authorization header from kwargs.headers dict + kwargs.get('headers', {}).pop('Authorization', None) + kwargs.get('headers', {}).pop('X-Api-Key', None) + + # filter endpoints which has security definition in their documentation + endpoints_with_security = list( + filter( + lambda path_obj: path_obj.get('security') + and path_obj.get('security') != [{}], + request_response_params, # type: ignore + ) # type: ignore + ) + + tasks = [] + for path_obj in endpoints_with_security: + # handle path params from request_params + request_params = path_obj.get('request_params', []) + request_params = fill_params(request_params, openapi_parser.is_v3) + + # get request body params + request_body_params = list( + filter(lambda x: x.get('in') == 'body', request_params) + ) + + endpoint_path: str = path_obj.get('path') # type: ignore + + path_params = path_obj.get('path_params', []) + path_params_in_body = list( + filter(lambda x: x.get('in') == 'path', request_params) + ) + path_params = fill_params(path_params, openapi_parser.is_v3) + path_params = get_unique_params(path_params_in_body, path_params) + + for path_param in path_params: + path_param_name = path_param.get('name') + path_param_value = path_param.get('value') + endpoint_path = endpoint_path.replace( + '{' + str(path_param_name) + '}', str(path_param_value) + ) + + request_query_params = list( + filter(lambda x: x.get('in') == 'query', request_params) + ) + + tasks.append( + { + 'test_name': 'Missing Authentication Test with Fuzzed Params', + 'url': join_uri_path( + base_url, openapi_parser.api_base_path, endpoint_path + ), + 'endpoint': join_uri_path( + openapi_parser.api_base_path, endpoint_path + ), + 'method': path_obj.get('http_method').upper(), # type: ignore + 'body_params': request_body_params, + 'query_params': request_query_params, + 'path_params': path_params, + 'malicious_payload': 'Security Payload Missing', + 'args': args, + 'kwargs': kwargs, + 'result_details': { + True: 'Endpoint implements security authentication as defined', # passed + False: 'Endpoint fails to implement security authentication as defined', # failed + }, + 'success_codes': success_codes, + 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name, + } + ) + + return tasks diff --git a/src/offat/tester/regexs.py b/src/offat/tester/regexs.py index e083489..ed602b1 100644 --- a/src/offat/tester/regexs.py +++ b/src/offat/tester/regexs.py @@ -1,28 +1,28 @@ sensitive_data_regex_patterns = { # General Data - "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", + 'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', # 'passwordOrToken': r'(^|\s|")(?=.*[A-Za-z])(?=.*\d)(?=.*[@$!%*#?&_])[A-Za-z\d@$!%*#?&_]{10,}($|\s|")', # Assuming the password contains at least 1 uppercase letter, 1 lowercase letter, 1 digit, 1 special character, and is at least 8 characters long. - "date": r"\b\d{2}/\d{2}/\d{4}\b", - "ip": r"(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b", - "ccn": r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", - "jwtToken": r'(^|\s|")[A-Za-z0-9_-]{2,}(?:\.[A-Za-z0-9_-]{2,}){2}($|\s|")', - "ato_data": r"\b(auth_code|otp|password|password_hash|auth_token|access_token|refresh_token|secret|session_id|key|pin|accessToken|refreshToken|authenticationCode|authentication_code|jwt|api_secret|apiSecret)\b", + 'date': r'\b\d{2}/\d{2}/\d{4}\b', + 'ip': r'(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b', + 'ccn': r'\b\d{4}-\d{4}-\d{4}-\d{4}\b', + 'jwtToken': r'(^|\s|")[A-Za-z0-9_-]{2,}(?:\.[A-Za-z0-9_-]{2,}){2}($|\s|")', + 'ato_data': r'\b(auth_code|otp|password|password_hash|auth_token|access_token|refresh_token|secret|session_id|key|pin|accessToken|refreshToken|authenticationCode|authentication_code|jwt|api_secret|apiSecret)\b', # BRAZIL - "BrazilCPF": r"\b(\d{3}\.){2}\d{3}\-\d{2}\b", + 'BrazilCPF': r'\b(\d{3}\.){2}\d{3}\-\d{2}\b', # INDIA # Assuming the format: AAAAB1234C (5 uppercase letters, 4 digits, 1 uppercase letter) - "pan": r"\b[A-Z]{5}\d{4}[A-Z]{1}\b", + 'pan': r'\b[A-Z]{5}\d{4}[A-Z]{1}\b', # Assuming the format XXXX XXXX XXXX (4 digits, space, 4 digits, space, 4 digits) - "aadhaarCard": r"\b\d{4}\s\d{4}\s\d{4}\b", - "PhoneNumberIN": r"((\+*)((0[ -]*)*|((91 )*))((\d{12})+|(\d{10})+))|\d{5}([- ]*)\d{6}", + 'aadhaarCard': r'\b\d{4}\s\d{4}\s\d{4}\b', + 'PhoneNumberIN': r'\b((\+*)((0[ -]*)*|((91 )*))((\d{12})+|(\d{10})+))|\d{5}([- ]*)\d{6}\b', # US - "ssn": r"\b\d{3}-\d{2}-\d{4}\b", - "PhoneNumberUS": r'(^|\s|")(1\s?)?(\d{3}|\(\d{3}\))[\s\-]?\d{3}[\s\-]?\d{4}(?:$|\s|")', + 'ssn': r'\b\d{3}-\d{2}-\d{4}\b', + 'PhoneNumberUS': r'\b(^|\s|")(1\s?)?(\d{3}|\(\d{3}\))[\s\-]?\d{3}[\s\-]?\d{4}(?:$|\s|")\b', # AWS # Assuming the format: AKIA followed by 16 uppercase alphanumeric characters - "AWSAccessKey": r"\bAKIA[0-9A-Z]{16}\b", + 'AWSAccessKey': r'\bAKIA[0-9A-Z]{16}\b', # Assuming the format: 40 alphanumeric characters, including + and / - "AWSSecretKey": r"\b[0-9a-zA-Z/+]{40}\b", - "AWSResourceURL": r"\b([A-Za-z0-9-_]*\.[A-Za-z0-9-_]*\.amazonaws.com*)\b", - "AWSArnId": r"\barn:aws:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-/_]*\b", + 'AWSSecretKey': r'\b[0-9a-zA-Z/+]{40}\b', + 'AWSResourceURL': r'\b([A-Za-z0-9-_]*\.[A-Za-z0-9-_]*\.amazonaws.com*)\b', + 'AWSArnId': r'\barn:aws:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-_]*\:[A-Za-z0-9-/_]*\b', } diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index f96470d..fb30d60 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -232,9 +232,7 @@ def generate_and_run_tests( # XSS/HTML Injection Fuzz Test test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body' # noqa: E501 logger.info(test_name) - xss_injection_tests = test_generator.xss_html_injection_fuzz_params_test( - api_parser - ) + xss_injection_tests = test_generator.xss_html_injection_fuzz_params_test(api_parser) results += run_test( test_runner=test_runner, tests=xss_injection_tests, @@ -296,6 +294,18 @@ def generate_and_run_tests( post_run_matcher_test=True, ) + # Missing Authorization Test + test_name = 'Checking for Missing Authorization' + logger.info(test_name) + missing_auth_tests = test_generator.missing_auth_fuzz_test(api_parser) + results += run_test( + test_runner=test_runner, + tests=missing_auth_tests, + regex_pattern=regex_pattern, + description=f'(FUZZED) {test_name}', + post_run_matcher_test=False, + ) + # Tests with User provided Data if bool(test_data_config): logger.info('[bold] Testing with user provided data [/bold]') @@ -380,6 +390,22 @@ def generate_and_run_tests( post_run_matcher_test=True, ) + # Missing Authorization Test + test_name = 'Checking for Missing Authorization with user data' + logger.info(test_name) + missing_auth_tests = test_generator.test_with_user_data( + test_data_config, + test_generator.missing_auth_fuzz_test, + openapi_parser=api_parser, + ) + results += run_test( + test_runner=test_runner, + tests=missing_auth_tests, + regex_pattern=regex_pattern, + description=f'(USER + FUZZED) {test_name}', + post_run_matcher_test=False, + ) + # Broken Access Control Test test_name = 'Checking for Broken Access Control' logger.info(test_name) diff --git a/src/poetry.lock b/src/poetry.lock index 7fc3d25..5b50680 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -965,18 +965,17 @@ typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" [[package]] name = "pygments" -version = "2.17.2" +version = "2.18.0" description = "Pygments is a syntax highlighting package written in Python." category = "main" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pygments-2.17.2-py3-none-any.whl", hash = "sha256:b27c2826c47d0f3219f29554824c30c5e8945175d888647acd804ddd04af846c"}, - {file = "pygments-2.17.2.tar.gz", hash = "sha256:da46cec9fd2de5be3a8a784f434e4c4ab670b4ff54d605c4c2717e9d49c4c367"}, + {file = "pygments-2.18.0-py3-none-any.whl", hash = "sha256:b8e6aca0523f3ab76fee51799c488e38782ac06eafcf95e7ba832985c8e7b13a"}, + {file = "pygments-2.18.0.tar.gz", hash = "sha256:786ff802f32e91311bff3889f6e9a86e81505fe99f2735bb6d60ae0c5004f199"}, ] [package.extras] -plugins = ["importlib-metadata"] windows-terminal = ["colorama (>=0.4.6)"] [[package]] diff --git a/src/pyproject.toml b/src/pyproject.toml index 9b4365e..009f21e 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "offat" -version = "0.17.4" +version = "0.17.5" description = "Offensive API tester tool automates checks for common API vulnerabilities" authors = ["Dhrumil Mistry "] license = "MIT"