1# -*- coding: utf-8 -*-
2# Copyright (C) 2018-2021 Greenbone Networks GmbH
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19"""Command Line Interface Parser
20"""
21
22import argparse
23import logging
24
25from pathlib import Path
26
27from gvm import get_version as get_gvm_version
28from gvm.connections import (
29    DEFAULT_TIMEOUT,
30    SSHConnection,
31    TLSConnection,
32    UnixSocketConnection,
33)
34
35from gvmtools import get_version
36from gvmtools.config import Config
37
38logger = logging.getLogger(__name__)
39
40__version__ = get_version()
41__api_version__ = get_gvm_version()
42
43DEFAULT_CONFIG_PATH = '~/.config/gvm-tools.conf'
44
45PROTOCOL_OSP = 'OSP'
46PROTOCOL_GMP = 'GMP'
47DEFAULT_PROTOCOL = PROTOCOL_GMP
48
49
50class CliParser:
51    def __init__(
52        self, description, logfilename, *, prog=None, ignore_config=False
53    ):
54        bootstrap_parser = argparse.ArgumentParser(
55            prog=prog,
56            description=description,
57            formatter_class=argparse.RawTextHelpFormatter,
58            # don't parse help initially. the args from parser wouldn't be shown
59            add_help=False,
60        )
61
62        bootstrap_parser.add_argument(
63            '-c',
64            '--config',
65            nargs='?',
66            default=DEFAULT_CONFIG_PATH,
67            help='Configuration file path (default: %(default)s)',
68        )
69
70        choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
71
72        bootstrap_parser.add_argument(
73            '--log',
74            nargs='?',
75            dest='loglevel',
76            const='INFO',
77            type=lambda arg: {x.upper(): x for x in choices}[arg.upper()],
78            choices=choices,
79            help='Activate logging (default level: %(default)s)',
80        )
81
82        parser = argparse.ArgumentParser(prog=prog, parents=[bootstrap_parser])
83
84        parser.add_argument(
85            '--timeout',
86            required=False,
87            default=DEFAULT_TIMEOUT,
88            type=int,
89            help='Response timeout in seconds, or -1 to wait '
90            'indefinitely (default: %(default)s)',
91        )
92        parser.add_argument(
93            '--gmp-username',
94            help='Username for GMP service (default: %(default)r)',
95        )
96        parser.add_argument(
97            '--gmp-password',
98            help='Password for GMP service (default: %(default)r)',
99        )
100        parser.add_argument(
101            '-V',
102            '--version',
103            action='version',
104            version=f'%(prog)s {__version__} (API version {__api_version__})',
105            help='Show version information and exit',
106        )
107
108        subparsers = parser.add_subparsers(
109            metavar='CONNECTION_TYPE',
110            title='connections',
111            description='valid connection types',
112            help="Connection type to use",
113        )
114        subparsers.required = True
115        subparsers.dest = 'connection_type'
116
117        self._subparsers = subparsers
118
119        self._parser = parser
120        self._bootstrap_parser = bootstrap_parser
121
122        self._logfilename = logfilename
123        self._ignore_config = ignore_config
124
125        self._add_subparsers()
126
127    def parse_args(self, args=None):
128        args, unkown_args = self.parse_known_args(args)
129        if unkown_args:
130            self._parser.error(
131                f'unrecognized arguments {" ".join(unkown_args)}'
132            )
133        return args
134
135    def parse_known_args(self, args=None):
136        args_before, _ = self._bootstrap_parser.parse_known_args(args)
137
138        if args_before.loglevel is not None:
139            level = logging.getLevelName(args_before.loglevel)
140            logging.basicConfig(filename=self._logfilename, level=level)
141
142        self._set_defaults(None if self._ignore_config else args_before.config)
143
144        args, unknown_args = self._parser.parse_known_args(args)
145
146        # If timeout value is -1, then the socket should have no timeout
147        if args.timeout == -1:
148            args.timeout = None
149
150        logging.debug('Parsed arguments %r', args)
151
152        return args, unknown_args
153
154    def add_argument(self, *args, **kwargs):
155        self._parser_socket.add_argument(*args, **kwargs)
156        self._parser_ssh.add_argument(*args, **kwargs)
157        self._parser_tls.add_argument(*args, **kwargs)
158
159    def add_protocol_argument(self):
160        self._parser.add_argument(
161            '--protocol',
162            required=False,
163            default=DEFAULT_PROTOCOL,
164            choices=[PROTOCOL_GMP, PROTOCOL_OSP],
165            help='Service protocol to use (default: %(default)s)',
166        )
167
168    def _load_config(self, configfile):
169        config = Config()
170
171        if not configfile:
172            return config
173
174        configpath = Path(configfile)
175
176        try:
177            if not configpath.expanduser().resolve().exists():
178                logger.debug('Ignoring non existing config file %s', configfile)
179                return config
180        except FileNotFoundError:
181            # we are on python 3.5 and Path.resolve raised a FileNotFoundError
182            logger.debug('Ignoring non existing config file %s', configfile)
183            return config
184
185        try:
186            config.load(configpath)
187            logger.debug('Loaded config %s', configfile)
188        except Exception as e:  # pylint: disable=broad-except
189            raise RuntimeError(
190                f'Error while parsing config file {configfile}. Error was {e}'
191            ) from None
192
193        return config
194
195    def _add_subparsers(self):
196        parser_ssh = self._subparsers.add_parser(
197            'ssh', help='Use SSH to connect to service'
198        )
199
200        parser_ssh.add_argument(
201            '--hostname', help='Hostname or IP address (default: %(default)s)'
202        )
203        parser_ssh.add_argument(
204            '--port',
205            required=False,
206            help='SSH port (default: %(default)s)',
207            type=int,
208        )
209        parser_ssh.add_argument(
210            '--ssh-username', help='SSH username (default: %(default)r)'
211        )
212        parser_ssh.add_argument(
213            '--ssh-password', help='SSH password (default: %(default)r)'
214        )
215
216        parser_tls = self._subparsers.add_parser(
217            'tls', help='Use TLS secured connection to connect to service'
218        )
219        parser_tls.add_argument(
220            '--hostname', help='Hostname or IP address (default: %(default)s)'
221        )
222        parser_tls.add_argument(
223            '--port',
224            required=False,
225            help='GMP/OSP port (default: %(default)s)',
226            type=int,
227        )
228        parser_tls.add_argument(
229            '--certfile',
230            required=False,
231            help='Path to the certificate file for client authentication. '
232            '(default: %(default)s)',
233        )
234        parser_tls.add_argument(
235            '--keyfile',
236            required=False,
237            help='Path to key file for client authentication. '
238            '(default: %(default)s)',
239        )
240        parser_tls.add_argument(
241            '--cafile',
242            required=False,
243            help='Path to CA certificate for server authentication. '
244            '(default: %(default)s)',
245        )
246        parser_tls.add_argument(
247            '--no-credentials',
248            required=False,
249            default=False,
250            action='store_true',
251            help='Use only certificates for authentication',
252        )
253
254        parser_socket = self._subparsers.add_parser(
255            'socket', help='Use UNIX Domain socket to connect to service'
256        )
257
258        socketpath_group = parser_socket.add_mutually_exclusive_group()
259        socketpath_group.add_argument(
260            '--sockpath',
261            nargs='?',
262            default=None,
263            dest='socketpath',
264            help='Deprecated, use --socketpath instead',
265        )
266        socketpath_group.add_argument(
267            '--socketpath',
268            nargs='?',
269            help='Path to UNIX Domain socket (default: %(default)s)',
270        )
271
272        self._parser_ssh = parser_ssh
273        self._parser_socket = parser_socket
274        self._parser_tls = parser_tls
275
276    def _set_defaults(self, configfilename=None):
277        self._config = self._load_config(configfilename)
278
279        self._parser.set_defaults(
280            gmp_username=self._config.get('gmp', 'username'),
281            gmp_password=self._config.get('gmp', 'password'),
282            **self._config.defaults(),
283        )
284
285        self._parser_ssh.set_defaults(
286            port=int(self._config.get('ssh', 'port')),
287            ssh_username=self._config.get('ssh', 'username'),
288            ssh_password=self._config.get('ssh', 'password'),
289            hostname=self._config.get('ssh', 'hostname'),
290        )
291        self._parser_tls.set_defaults(
292            port=int(self._config.get('tls', 'port')),
293            certfile=self._config.get('tls', 'certfile'),
294            keyfile=self._config.get('tls', 'keyfile'),
295            cafile=self._config.get('tls', 'cafile'),
296            hostname=self._config.get('tls', 'hostname'),
297        )
298        self._parser_socket.set_defaults(
299            socketpath=self._config.get('unixsocket', 'socketpath')
300        )
301
302
303def create_parser(description, logfilename):
304    return CliParser(description, logfilename)
305
306
307def create_connection(
308    connection_type,
309    socketpath=None,
310    timeout=None,
311    hostname=None,
312    port=None,
313    certfile=None,
314    keyfile=None,
315    cafile=None,
316    ssh_username=None,
317    ssh_password=None,
318    **kwargs  # pylint: disable=unused-argument
319):
320    if 'socket' in connection_type:
321        return UnixSocketConnection(timeout=timeout, path=socketpath)
322
323    if 'tls' in connection_type:
324        return TLSConnection(
325            timeout=timeout,
326            hostname=hostname,
327            port=port,
328            certfile=certfile,
329            keyfile=keyfile,
330            cafile=cafile,
331        )
332
333    return SSHConnection(
334        timeout=timeout,
335        hostname=hostname,
336        port=port,
337        username=ssh_username,
338        password=ssh_password,
339    )
340