diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 5ebefe7f..9c320d4d 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -23,9 +23,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import argparse import concurrent.futures import copy -import getopt # pylint: disable=deprecated-module import json import multiprocessing import os @@ -33,6 +33,7 @@ import sys import traceback + # pylint: disable=unused-import from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401 from typing import cast, Callable, Optional, Union, Any # noqa: F401 @@ -89,7 +90,7 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None: if err is not None and len(err) > 0: uout.fail(err + '\n') retval = exitcodes.UNKNOWN_ERROR - uout.info('usage: {0} [options] \n'.format(p)) + uout.info('usage: {0} [options] -ip \n'.format(p)) uout.info(' -h, --help print this help') uout.info(' -1, --ssh1 force ssh version 1 only') uout.info(' -2, --ssh2 force ssh version 2 only') @@ -116,6 +117,8 @@ def usage(uout: OutputBuffer, err: Optional[str] = None) -> None: uout.info(' -g, --gex-test= dh gex modulus size test') uout.info(' ') uout.info(' ') + uout.info(' --hostname hostname of target to scan') + uout.info(' -ip, --ip-address ip address of target to scan') uout.info(' -j, --json JSON output (use -jj to enable indents)') uout.info(' -l, --level= minimum output level (info|warn|fail)') uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v') @@ -836,82 +839,122 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. aconf.colors = enable_colors out.use_colors = enable_colors - try: - sopts = 'h1246M:p:P:jbcnvl:t:T:Lmdg:' - lopts = ['help', 'ssh1', 'ssh2', 'ipv4', 'ipv6', 'make-policy=', 'port=', 'policy=', 'json', 'batch', 'client-audit', 'no-colors', 'verbose', 'level=', 'timeout=', 'targets=', 'list-policies', 'lookup=', 'threads=', 'manual', 'debug', 'gex-test=', 'dheat=', 'skip-rate-test', 'conn-rate-test='] - opts, args = getopt.gnu_getopt(args, sopts, lopts) - except getopt.GetoptError as err: - usage_cb(out, str(err)) aconf.ssh1, aconf.ssh2 = False, False host: str = '' - oport: Optional[str] = None - port: int = 0 - for o, a in opts: - if o in ('-h', '--help'): + + parser = argparse.ArgumentParser(prog='SSH Audit Tool', description='SSH Audit Tool', add_help=False, allow_abbrev=False) + + # Add short options to the parser + parser.add_argument('-1', '--ssh1', action="store_true", dest='ssh1', default=None) + parser.add_argument('-2', '--ssh2', action="store_true", dest='ssh2', default=None) + parser.add_argument('-4', '--ipv4', action="store_true", dest='ipv4', default=None) + parser.add_argument('-6', '--ipv6', action="store_true", dest='ipv6', default=None) + parser.add_argument('-b', '--batch', action="store_true", dest='batch', default=None) + parser.add_argument('-c', '--client-audit', action="store_true", dest='client_audit', default=None) + parser.add_argument('-d', '--debug', action="store_true", dest='debug', default=None) + parser.add_argument('-g', '--gex-test', action="store", dest='gex_test', default=None) + parser.add_argument('-h', '--help', action="store_true", dest='help', default=None) + parser.add_argument('-ip', '--ip-address', '--hostname', action="store", dest='host', type=str) + parser.add_argument('-j', '--json', action="store_true", dest='json', default=None) + parser.add_argument('-jj', '--json-indent', action="store_true", dest='json_indent', default=None) + parser.add_argument('-l', '--level', action="store", dest='level', type=str, default='info') + parser.add_argument('-L', '--list-policies', action="store_true", dest='list_policies', default=None) + parser.add_argument('-M', '--make-policy', action="store", dest='make_policy', default=None) + parser.add_argument('-m', '--manual', action="store_true", dest='manual', default=None) + parser.add_argument('-n', '--no-colors', action="store_true", dest='no_colors', default=None) + parser.add_argument('-P', '--policy', action="store", dest='policy', default=None) + parser.add_argument('-p', '--port', action="store", dest='port', default='22', type=int) + parser.add_argument('-T', '--targets', action="store", dest='targets', default=None) + parser.add_argument('-t', '--timeout', action="store", dest='timeout', default='5', type=int) + parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=None) + + + # Add long options to the parser + parser.add_argument('--conn-rate-test', action="store", dest='conn_rate_test', default='0', type=int) + parser.add_argument('--dheat', action="store", dest='dheat', default='0', type=int) + parser.add_argument('--lookup', action="store", dest='lookup', default=None) + parser.add_argument('--skip-rate-test', action="store_true", dest='skip_rate_test', default=None) + parser.add_argument('--threads', action="store", dest='threads', default='32', type=int) + + try: + argument = parser.parse_args() + + if argument.help is True: usage_cb(out) - elif o in ('-1', '--ssh1'): - aconf.ssh1 = True - elif o in ('-2', '--ssh2'): - aconf.ssh2 = True - elif o in ('-4', '--ipv4'): - aconf.ipv4 = True - elif o in ('-6', '--ipv6'): - aconf.ipv6 = True - elif o in ('-p', '--port'): - oport = a - elif o in ('-b', '--batch'): + + aconf.host = argument.host + host = argument.host + port = argument.port + aconf.ssh1 = argument.ssh1 + aconf.ssh2 = argument.ssh2 + aconf.ipv4 = argument.ipv4 + aconf.ipv6 = argument.ipv6 + + aconf.json = argument.json + if argument.json_indent is True: + setattr(argument, 'json', True) + aconf.json = argument.json + aconf.json_print_indent = argument.json_indent + + if argument.batch is True: aconf.batch = True aconf.verbose = True - elif o in ('-c', '--client-audit'): - aconf.client_audit = True - elif o in ('-j', '--json'): - if aconf.json: # If specified twice, enable indent printing. - aconf.json_print_indent = True - else: - aconf.json = True - elif o in ('-v', '--verbose'): + + aconf.client_audit = argument.client_audit + + ttime = argument.timeout + if ttime != 5: + aconf.timeout = float(argument.timeout) + aconf.timeout_set = True + + if argument.verbose is True: aconf.verbose = True out.verbose = True - elif o in ('-l', '--level'): - if a not in ('info', 'warn', 'fail'): - usage_cb(out, 'level {} is not valid'.format(a)) - aconf.level = a - elif o in ('-t', '--timeout'): - aconf.timeout = float(a) - aconf.timeout_set = True - elif o in ('-M', '--make-policy'): + + # Get error level regex + err_level = argument.level + if err_level in ["info", "warn", "fail"]: + aconf.level = str(argument.level) + else: + usage_cb(out, 'Error level : {} is not valid'.format(err_level)) + + if getattr(argument, 'make_policy') is True: aconf.make_policy = True - aconf.policy_file = a - elif o in ('-P', '--policy'): - aconf.policy_file = a - elif o in ('-T', '--targets'): - aconf.target_file = a - - # If we're on Windows, and we can't use the idna workaround, force only one thread to be used (otherwise a crash would occur). - # if no_idna_workaround: - # print("\nWARNING: the idna module was not found on this system, thus only single-threaded scanning will be done (this is a workaround for this Windows-specific crash: https://github.com/python/cpython/issues/73474). Multi-threaded scanning can be enabled by installing the idna module (pip install idna).\n") - # aconf.threads = 1 - elif o == '--threads': - aconf.threads = int(a) - # if no_idna_workaround: - # aconf.threads = 1 - elif o in ('-L', '--list-policies'): + aconf.policy_file = argument.make_policy + + if getattr(argument, 'policy') is True: + aconf.policy_file = argument.policy + + if getattr(argument, 'targets') is True: + aconf.target_file = argument.targets + + if argument.threads != 32: + aconf.threads = argument.threads + + if getattr(argument, 'list_policies') is True: aconf.list_policies = True - elif o == '--lookup': - aconf.lookup = a - elif o in ('-m', '--manual'): + + if getattr(argument, 'lookup') is True: + aconf.lookup = argument.lookup + + if getattr(argument, 'manual') is True: aconf.manual = True - elif o in ('-d', '--debug'): + else: + aconf.manual = False + + if argument.debug is True: aconf.debug = True out.debug = True - elif o in ('-g', '--gex-test'): + + if getattr(argument, 'gex_test') is True: + dh_gex = argument.gex_test permitted_syntax = get_permitted_syntax_for_gex_test() - if not any(re.search(regex_str, a) for regex_str in permitted_syntax.values()): - usage_cb(out, '{} {} is not valid'.format(o, a)) + if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()): + usage_cb(out, '{} is not valid'.format(dh_gex)) - if re.search(permitted_syntax['RANGE'], a): - extracted_digits = re.findall(r'\d+', a) + if re.search(permitted_syntax['RANGE'], dh_gex): + extracted_digits = re.findall(r'\d+', dh_gex) bits_left_bound = int(extracted_digits[0]) bits_right_bound = int(extracted_digits[1]) @@ -920,21 +963,26 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. bits_step = int(extracted_digits[2]) if bits_step <= 0: - usage_cb(out, '{} {} is not valid'.format(o, bits_step)) + usage_cb(out, '{} {} is not valid'.format(dh_gex, bits_step)) if all(x < 0 for x in (bits_left_bound, bits_right_bound)): - usage_cb(out, '{} {} {} is not valid'.format(o, bits_left_bound, bits_right_bound)) + usage_cb(out, '{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound)) - aconf.gex_test = a - elif o == '--dheat': - aconf.dheat = a - elif o == '--skip-rate-test': - aconf.skip_rate_test = True - elif o == '--conn-rate-test': - aconf.conn_rate_test = a + aconf.gex_test = argument.gex_test - if len(args) == 0 and aconf.client_audit is False and aconf.target_file is None and aconf.list_policies is False and aconf.lookup == '' and aconf.manual is False: + if int(argument.dheat) > 0: + aconf.dheat = argument.dheat + + aconf.skip_rate_test = argument.skip_rate_test + + if int(argument.conn_rate_test) > 0: + aconf.conn_rate_test = argument.conn_rate_test + + except argparse.ArgumentError as err: + usage_cb(out, str(err)) + + if argument.host is None and argument.client_audit is None and argument.targets is None and argument.list_policies is None and argument.lookup is None and argument.manual is None: usage_cb(out) if aconf.manual: @@ -947,24 +995,20 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. list_policies(out, aconf.verbose) sys.exit(exitcodes.GOOD) - if aconf.client_audit is False and aconf.target_file is None: - if oport is not None: - host = args[0] - else: - host, port = Utils.parse_host_and_port(args[0]) - if not host and aconf.target_file is None: + if aconf.client_audit is None and aconf.target_file is None: + host = argument.host + port = argument.port + + if argument.host is None and aconf.target_file is None: usage_cb(out, 'host is empty') - if port == 0 and oport is None: - if aconf.client_audit: # The default port to listen on during a client audit is 2222. - port = 2222 - else: - port = 22 + if aconf.client_audit is True: # The default port to listen on during a client audit is 2222. + port = 2222 - if oport is not None: - port = Utils.parse_int(oport) + if argument.port != 22: + port = Utils.parse_int(argument.port) if port <= 0 or port > 65535: - usage_cb(out, 'port {} is not valid'.format(oport)) + usage_cb(out, 'port {} is not valid'.format(argument.port)) aconf.host = host aconf.port = port