1# -*- coding: utf-8 -*-
2# Copyright (C) 2014-2021 Greenbone Networks GmbH
3#
4# SPDX-License-Identifier: AGPL-3.0-or-later
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20""" Provide functions to handle NVT Info Cache. """
21
22import logging
23
24from typing import List, Dict, Optional, Iterator, Tuple
25from pathlib import Path
26from time import time
27
28from ospd.errors import RequiredArgument
29from ospd_openvas.errors import OspdOpenvasError
30from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx
31
32NVTI_CACHE_NAME = "nvticache"
33
34logger = logging.getLogger(__name__)
35
36LIST_FIRST_POS = 0
37LIST_LAST_POS = -1
38
39
40class NVTICache(BaseDB):
41
42    QOD_TYPES = {
43        'exploit': '100',
44        'remote_vul': '99',
45        'remote_app': '98',
46        'package': '97',
47        'registry': '97',
48        'remote_active': '95',
49        'remote_banner': '80',
50        'executable_version': '80',
51        'remote_analysis': '70',
52        'remote_probe': '50',
53        'remote_banner_unreliable': '30',
54        'executable_version_unreliable': '30',
55        'general_note': '1',
56        'default': '70',
57    }
58
59    def __init__(  # pylint: disable=super-init-not-called
60        self, main_db: MainDB
61    ):
62        self._ctx = None
63        self.index = None
64        self._main_db = main_db
65
66    @property
67    def ctx(self) -> Optional[RedisCtx]:
68        if self._ctx is None:
69            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
70                NVTI_CACHE_NAME, self._main_db.max_database_index
71            )
72        return self._ctx
73
74    def get_feed_version(self) -> Optional[str]:
75        """Get feed version of the nvti cache db.
76
77        Returns the feed version or None if the nvt feed isn't available.
78        """
79        if not self.ctx:
80            # no nvti cache db available yet
81            return None
82
83        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)
84
85    def get_oids(self) -> Iterator[Tuple[str, str]]:
86        """Get the list of NVT file names and OIDs.
87
88        Returns:
89            A i. Each single list contains the filename
90            as first element and the oid as second one.
91        """
92        return OpenvasDB.get_filenames_and_oids(self.ctx)
93
94    def get_nvt_params(self, oid: str) -> Optional[Dict[str, str]]:
95        """Get NVT's preferences.
96
97        Arguments:
98            oid: OID of VT from which to get the parameters.
99
100        Returns:
101            A dictionary with preferences and timeout.
102        """
103        prefs = self.get_nvt_prefs(oid)
104
105        vt_params = {}
106
107        if prefs:
108            for nvt_pref in prefs:
109                elem = nvt_pref.split('|||')
110
111                param_id = elem[0]
112                param_name = elem[1]
113                param_type = elem[2]
114
115                vt_params[param_id] = dict()
116                vt_params[param_id]['id'] = param_id
117                vt_params[param_id]['type'] = param_type
118                vt_params[param_id]['name'] = param_name.strip()
119                vt_params[param_id]['description'] = 'Description'
120
121                if len(elem) > 3:
122                    param_default = elem[3]
123                    vt_params[param_id]['default'] = param_default
124                else:
125                    vt_params[param_id]['default'] = ''
126
127        return vt_params
128
129    @staticmethod
130    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
131        """Parse a string with multiple tags.
132
133        Arguments:
134            tags_str: String with tags separated by `|`.
135            oid: VT OID. Only used for logging in error case.
136
137        Returns:
138            A dictionary with the tags.
139        """
140        tags_dict = dict()
141        tags = tags_str.split('|')
142        for tag in tags:
143            try:
144                _tag, _value = tag.split('=', 1)
145            except ValueError:
146                logger.error('Tag %s in %s has no value.', tag, oid)
147                continue
148            tags_dict[_tag] = _value
149
150        return tags_dict
151
152    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, str]]:
153        """Get a full NVT. Returns an XML tree with the NVT metadata.
154
155        Arguments:
156            oid: OID of VT from which to get the metadata.
157
158        Returns:
159            A dictionary with the VT metadata.
160        """
161        resp = OpenvasDB.get_list_item(
162            self.ctx,
163            "nvt:%s" % oid,
164            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
165            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
166        )
167
168        if not isinstance(resp, list) or len(resp) == 0:
169            return None
170
171        subelem = [
172            'filename',
173            'required_keys',
174            'mandatory_keys',
175            'excluded_keys',
176            'required_udp_ports',
177            'required_ports',
178            'dependencies',
179            'tag',
180            'cve',
181            'bid',
182            'xref',
183            'category',
184            'timeout',
185            'family',
186            'name',
187        ]
188
189        custom = dict()
190        custom['refs'] = dict()
191        custom['vt_params'] = dict()
192        for child, res in zip(subelem, resp):
193            if child not in ['cve', 'bid', 'xref', 'tag', 'timeout'] and res:
194                custom[child] = res
195            elif child == 'tag':
196                custom.update(self._parse_metadata_tags(res, oid))
197            elif child in ['cve', 'bid', 'xref'] and res:
198                custom['refs'][child] = res.split(", ")
199            elif child == 'timeout':
200                if res is None:
201                    continue
202                vt_params = {}
203                if int(res) > 0:
204                    _param_id = '0'
205                    vt_params[_param_id] = dict()
206                    vt_params[_param_id]['id'] = _param_id
207                    vt_params[_param_id]['type'] = 'entry'
208                    vt_params[_param_id]['name'] = 'timeout'
209                    vt_params[_param_id]['description'] = 'Script Timeout'
210                    vt_params[_param_id]['default'] = res
211                custom['vt_params'] = vt_params
212                custom['vt_params'].update(self.get_nvt_params(oid))
213
214        return custom
215
216    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, str]]:
217        """Get a full NVT.
218
219        Arguments:
220            oid: OID of VT from which to get the VT references.
221
222        Returns:
223            A dictionary with the VT references.
224        """
225        resp = OpenvasDB.get_list_item(
226            self.ctx,
227            "nvt:%s" % oid,
228            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
229            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
230        )
231
232        if not isinstance(resp, list) or len(resp) == 0:
233            return None
234
235        subelem = ['cve', 'bid', 'xref']
236
237        refs = dict()
238        for child, res in zip(subelem, resp):
239            refs[child] = res.split(", ")
240
241        return refs
242
243    def get_nvt_family(self, oid: str) -> str:
244        """Get NVT family
245        Arguments:
246            oid: OID of VT from which to get the VT family.
247
248        Returns:
249            A str with the VT family.
250        """
251        return OpenvasDB.get_single_item(
252            self.ctx,
253            'nvt:%s' % oid,
254            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
255        )
256
257    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
258        """Get NVT preferences.
259
260        Arguments:
261            ctx: Redis context to be used.
262            oid: OID of VT from which to get the VT preferences.
263
264        Returns:
265            A list with the VT preferences.
266        """
267        key = 'oid:%s:prefs' % oid
268        return OpenvasDB.get_list_item(self.ctx, key)
269
270    def get_nvt_timeout(self, oid: str) -> Optional[str]:
271        """Get NVT timeout
272
273        Arguments:
274            ctx: Redis context to be used.
275            oid: OID of VT from which to get the script timeout.
276
277        Returns:
278            The timeout.
279        """
280        return OpenvasDB.get_single_item(
281            self.ctx,
282            'nvt:%s' % oid,
283            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
284        )
285
286    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
287        """Get Tags of the given OID.
288
289        Arguments:
290            ctx: Redis context to be used.
291            oid: OID of VT from which to get the VT tags.
292
293        Returns:
294            A dictionary with the VT tags.
295        """
296        tag = OpenvasDB.get_single_item(
297            self.ctx,
298            'nvt:%s' % oid,
299            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
300        )
301        tags = tag.split('|')
302
303        return dict([item.split('=', 1) for item in tags])
304
305    def get_nvt_files_count(self) -> int:
306        return OpenvasDB.get_key_count(self.ctx, "filename:*")
307
308    def get_nvt_count(self) -> int:
309        return OpenvasDB.get_key_count(self.ctx, "nvt:*")
310
311    def force_reload(self):
312        self._main_db.release_database(self)
313
314    def add_vt_to_cache(self, vt_id: str, vt: List[str]):
315        if not vt_id:
316            raise RequiredArgument('add_vt_to_cache', 'vt_id')
317        if not vt:
318            raise RequiredArgument('add_vt_to_cache', 'vt')
319        if not isinstance(vt, list) or len(vt) != 15:
320            raise OspdOpenvasError(
321                'Error trying to load the VT' ' {} in cache'.format(vt)
322            )
323
324        OpenvasDB.add_single_list(self.ctx, vt_id, vt)
325
326        OpenvasDB.add_single_item(self.ctx, f'filename:{vt[0]}', [int(time())])
327
328    def get_file_checksum(self, file_abs_path: Path) -> str:
329        """Get file sha256 checksum or md5 checksum
330
331        Arguments:
332            file_abs_path: File to get the checksum
333
334        Returns:
335            The checksum
336        """
337        # Try to get first sha256 checksum
338        sha256sum = OpenvasDB.get_single_item(
339            self.ctx,
340            f'sha256sums:{file_abs_path}',
341        )
342        if sha256sum:
343            return sha256sum
344
345        # Search for md5 checksum
346        md5sum = OpenvasDB.get_single_item(
347            self.ctx,
348            f'md5sums:{file_abs_path}',
349        )
350        if md5sum:
351            return md5sum
352