# -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Command Line Interface Parser
"""

import argparse
import logging

from pathlib import Path

from gvm import get_version as get_gvm_version
from gvm.connections import (
    DEFAULT_TIMEOUT,
    SSHConnection,
    TLSConnection,
    UnixSocketConnection,
)

from gvmtools import get_version
from gvmtools.config import Config

logger = logging.getLogger(__name__)

__version__ = get_version()
__api_version__ = get_gvm_version()

DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'

PROTOCOL_OSP = 'OSP'
PROTOCOL_GMP = 'GMP'
DEFAULT_PROTOCOL = PROTOCOL_GMP


class CliParser:
    """Two-stage command line parser for the connection based tools.

    A small bootstrap parser first extracts ``--config`` and ``--log`` so
    that logging can be set up and the config file can be loaded. The values
    from the config are then installed as defaults on the main parser and its
    ``ssh``/``tls``/``socket`` sub-parsers before the real parse happens.
    """

    def __init__(
        self, description, logfilename, *, prog=None, ignore_config=False
    ):
        """Build the bootstrap parser, the main parser and the sub-parsers.

        Arguments:
            description: Description text handed to the bootstrap parser.
            logfilename: Passed as ``filename`` to ``logging.basicConfig``
                when ``--log`` is given on the command line.
            prog: Program name for both parsers (argparse default if None).
            ignore_config: If true, no config file is read and only the
                defaults of an empty ``Config`` are applied.
        """
        # NOTE(review): description and formatter_class are only set on the
        # bootstrap parser. argparse's ``parents`` mechanism copies the
        # arguments only, so neither reaches the help output of the main
        # parser - confirm whether that is intended.
        bootstrap_parser = argparse.ArgumentParser(
            prog=prog,
            description=description,
            formatter_class=argparse.RawTextHelpFormatter,
            # don't parse help initially. the args from parser wouldn't be
            # shown
            add_help=False,
        )

        bootstrap_parser.add_argument(
            '-c',
            '--config',
            nargs='?',
            default=DEFAULT_CONFIG_PATH,
            help='Configuration file path (default: %(default)s)',
        )

        choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

        bootstrap_parser.add_argument(
            '--log',
            nargs='?',
            dest='loglevel',
            const='INFO',
            # Normalize the level case-insensitively. The conversion must not
            # raise KeyError for unknown values: argparse only turns
            # ArgumentTypeError/TypeError/ValueError into a usage error, so
            # the membership check is left to ``choices``.
            type=str.upper,
            choices=choices,
            help='Activate logging (default level: %(default)s)',
        )

        parser = argparse.ArgumentParser(prog=prog, parents=[bootstrap_parser])

        parser.add_argument(
            '--timeout',
            required=False,
            default=DEFAULT_TIMEOUT,
            type=int,
            help='Response timeout in seconds, or -1 to wait '
            'indefinitely (default: %(default)s)',
        )
        parser.add_argument(
            '--gmp-username',
            help='Username for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '--gmp-password',
            help='Password for GMP service (default: %(default)r)',
        )
        parser.add_argument(
            '-V',
            '--version',
            action='version',
            version=f'%(prog)s {__version__} (API version {__api_version__})',
            help='Show version information and exit',
        )

        subparsers = parser.add_subparsers(
            metavar='CONNECTION_TYPE',
            title='connections',
            description='valid connection types',
            help="Connection type to use",
        )
        # Set after creation: a connection type must always be given and is
        # stored as ``connection_type`` in the parsed namespace.
        subparsers.required = True
        subparsers.dest = 'connection_type'

        self._subparsers = subparsers

        self._parser = parser
        self._bootstrap_parser = bootstrap_parser

        self._logfilename = logfilename
        self._ignore_config = ignore_config

        self._add_subparsers()

    def parse_args(self, args=None):
        """Parse the arguments and reject everything that is unknown.

        Arguments:
            args: List of argument strings (argparse falls back to
                ``sys.argv`` if None).

        Returns:
            The parsed namespace. If unknown arguments are left over,
            the error handler of the main parser is called instead.
        """
        args, unknown_args = self.parse_known_args(args)
        if unknown_args:
            self._parser.error(
                f'unrecognized arguments {" ".join(unknown_args)}'
            )
        return args

    def parse_known_args(self, args=None):
        """Parse the arguments and hand back the unknown ones separately.

        Runs the bootstrap parser first to set up logging and to load the
        config file, applies the config values as parser defaults and then
        runs the main parser.

        Arguments:
            args: List of argument strings (argparse falls back to
                ``sys.argv`` if None).

        Returns:
            A tuple of the parsed namespace and the list of unknown
            argument strings. A timeout of -1 is replaced by None.
        """
        args_before, _ = self._bootstrap_parser.parse_known_args(args)

        if args_before.loglevel is not None:
            level = logging.getLevelName(args_before.loglevel)
            logging.basicConfig(filename=self._logfilename, level=level)

        self._set_defaults(None if self._ignore_config else args_before.config)

        args, unknown_args = self._parser.parse_known_args(args)

        # If timeout value is -1, then the socket should have no timeout
        if args.timeout == -1:
            args.timeout = None

        # NOTE(review): the namespace is logged verbatim and therefore
        # contains the password values when debug logging is active.
        logging.debug('Parsed arguments %r', args)

        return args, unknown_args

    def add_argument(self, *args, **kwargs):
        """Add the same argument to the socket, ssh and tls sub-parsers.

        All positional and keyword arguments are forwarded unchanged to
        ``add_argument`` of each sub-parser.
        """
        self._parser_socket.add_argument(*args, **kwargs)
        self._parser_ssh.add_argument(*args, **kwargs)
        self._parser_tls.add_argument(*args, **kwargs)

    def add_protocol_argument(self):
        """Add the optional ``--protocol`` argument (GMP or OSP) to the
        main parser.
        """
        self._parser.add_argument(
            '--protocol',
            required=False,
            default=DEFAULT_PROTOCOL,
            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
            help='Service protocol to use (default: %(default)s)',
        )

    def _load_config(self, configfile):
        """Create a Config and load the given file into it if it exists.

        Arguments:
            configfile: Path of the config file as string. A false value
                or a non existing file results in an unloaded ``Config``.

        Returns:
            The ``Config`` instance.

        Raises:
            RuntimeError: If loading the existing config file fails.
        """
        config = Config()

        if not configfile:
            return config

        # Expand '~' once so that the existence check and the load operate
        # on the same path.
        configpath = Path(configfile).expanduser()

        try:
            if not configpath.resolve().exists():
                logger.debug('Ignoring non existing config file %s', configfile)
                return config
        except FileNotFoundError:
            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
            logger.debug('Ignoring non existing config file %s', configfile)
            return config

        try:
            config.load(configpath)
            logger.debug('Loaded config %s', configfile)
        except Exception as e:  # pylint: disable=broad-except
            # The original error is folded into the message and the
            # exception chain is suppressed on purpose.
            raise RuntimeError(
                f'Error while parsing config file {configfile}. Error was {e}'
            ) from None

        return config

    def _add_subparsers(self):
        """Create the ``ssh``, ``tls`` and ``socket`` sub-parsers with their
        connection specific arguments and keep them on the instance.
        """
        parser_ssh = self._subparsers.add_parser(
            'ssh', help='Use SSH to connect to service'
        )

        parser_ssh.add_argument(
            '--hostname', help='Hostname or IP address (default: %(default)s)'
        )
        parser_ssh.add_argument(
            '--port',
            required=False,
            help='SSH port (default: %(default)s)',
            type=int,
        )
        parser_ssh.add_argument(
            '--ssh-username', help='SSH username (default: %(default)r)'
        )
        parser_ssh.add_argument(
            '--ssh-password', help='SSH password (default: %(default)r)'
        )

        parser_tls = self._subparsers.add_parser(
            'tls', help='Use TLS secured connection to connect to service'
        )
        parser_tls.add_argument(
            '--hostname', help='Hostname or IP address (default: %(default)s)'
        )
        parser_tls.add_argument(
            '--port',
            required=False,
            help='GMP/OSP port (default: %(default)s)',
            type=int,
        )
        parser_tls.add_argument(
            '--certfile',
            required=False,
            help='Path to the certificate file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--keyfile',
            required=False,
            help='Path to key file for client authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--cafile',
            required=False,
            help='Path to CA certificate for server authentication. '
            '(default: %(default)s)',
        )
        parser_tls.add_argument(
            '--no-credentials',
            required=False,
            default=False,
            action='store_true',
            help='Use only certificates for authentication',
        )

        parser_socket = self._subparsers.add_parser(
            'socket', help='Use UNIX Domain socket to connect to service'
        )

        # --sockpath is the deprecated spelling; both options write to the
        # same ``socketpath`` destination and exclude each other.
        socketpath_group = parser_socket.add_mutually_exclusive_group()
        socketpath_group.add_argument(
            '--sockpath',
            nargs='?',
            default=None,
            dest='socketpath',
            help='Deprecated, use --socketpath instead',
        )
        socketpath_group.add_argument(
            '--socketpath',
            nargs='?',
            help='Path to UNIX Domain socket (default: %(default)s)',
        )

        self._parser_ssh = parser_ssh
        self._parser_socket = parser_socket
        self._parser_tls = parser_tls

    def _set_defaults(self, configfilename=None):
        """Load the config and install its values as parser defaults.

        Arguments:
            configfilename: Path of the config file or None to use an
                unloaded ``Config``.
        """
        self._config = self._load_config(configfilename)

        self._parser.set_defaults(
            gmp_username=self._config.get('gmp', 'username'),
            gmp_password=self._config.get('gmp', 'password'),
            **self._config.defaults(),
        )

        self._parser_ssh.set_defaults(
            port=int(self._config.get('ssh', 'port')),
            ssh_username=self._config.get('ssh', 'username'),
            ssh_password=self._config.get('ssh', 'password'),
            hostname=self._config.get('ssh', 'hostname'),
        )
        self._parser_tls.set_defaults(
            port=int(self._config.get('tls', 'port')),
            certfile=self._config.get('tls', 'certfile'),
            keyfile=self._config.get('tls', 'keyfile'),
            cafile=self._config.get('tls', 'cafile'),
            hostname=self._config.get('tls', 'hostname'),
        )
        self._parser_socket.set_defaults(
            socketpath=self._config.get('unixsocket', 'socketpath')
        )


def create_parser(description, logfilename):
    """Return a CliParser for the given description and log file name."""
    return CliParser(description, logfilename)


def create_connection(
    connection_type,
    socketpath=None,
    timeout=None,
    hostname=None,
    port=None,
    certfile=None,
    keyfile=None,
    cafile=None,
    ssh_username=None,
    ssh_password=None,
    **kwargs  # pylint: disable=unused-argument
):
    """Create the connection object matching the connection type.

    Arguments:
        connection_type: A ``UnixSocketConnection`` is returned if it
            contains 'socket', a ``TLSConnection`` if it contains 'tls'
            and a ``SSHConnection`` in every other case.
        socketpath: Path passed to the unix socket connection.
        timeout: Timeout passed to every connection type.
        hostname: Host passed to the TLS and SSH connection.
        port: Port passed to the TLS and SSH connection.
        certfile: Certificate file passed to the TLS connection.
        keyfile: Key file passed to the TLS connection.
        cafile: CA file passed to the TLS connection.
        ssh_username: Username passed to the SSH connection.
        ssh_password: Password passed to the SSH connection.
        **kwargs: Accepted and ignored, so that a complete set of parsed
            arguments can be passed in.

    Returns:
        The created connection object.
    """
    if 'socket' in connection_type:
        return UnixSocketConnection(timeout=timeout, path=socketpath)

    if 'tls' in connection_type:
        return TLSConnection(
            timeout=timeout,
            hostname=hostname,
            port=port,
            certfile=certfile,
            keyfile=keyfile,
            cafile=cafile,
        )

    return SSHConnection(
        timeout=timeout,
        hostname=hostname,
        port=port,
        username=ssh_username,
        password=ssh_password,
    )