# -*- coding: utf-8 -*-
# Copyright (C) 2014-2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


""" Provide functions to handle NVT Info Cache. """

import logging

from typing import Any, Dict, Iterator, List, Optional, Tuple
from pathlib import Path
from time import time

from ospd.errors import RequiredArgument
from ospd_openvas.errors import OspdOpenvasError
from ospd_openvas.db import NVT_META_FIELDS, OpenvasDB, MainDB, BaseDB, RedisCtx

NVTI_CACHE_NAME = "nvticache"

logger = logging.getLogger(__name__)

LIST_FIRST_POS = 0
LIST_LAST_POS = -1


class NVTICache(BaseDB):
    """Access to the NVT info cache database.

    The database is located lazily (see `ctx`) by searching for the
    NVTI_CACHE_NAME pattern through the main DB passed to the constructor.
    """

    # Mapping of QoD type name to its value (kept as string).
    QOD_TYPES = {
        'exploit': '100',
        'remote_vul': '99',
        'remote_app': '98',
        'package': '97',
        'registry': '97',
        'remote_active': '95',
        'remote_banner': '80',
        'executable_version': '80',
        'remote_analysis': '70',
        'remote_probe': '50',
        'remote_banner_unreliable': '30',
        'executable_version_unreliable': '30',
        'general_note': '1',
        'default': '70',
    }

    def __init__(  # pylint: disable=super-init-not-called
        self, main_db: MainDB
    ):
        """Store the main DB; the cache context is resolved on first use.

        Arguments:
            main_db: Main DB used to find and release the cache database.
        """
        self._ctx = None
        self.index = None
        self._main_db = main_db

    @property
    def ctx(self) -> Optional[RedisCtx]:
        """Context of the nvti cache db, looked up on first access.

        As long as no context has been found the lookup is repeated on
        every access. Once found, context and database index are kept.
        """
        if self._ctx is None:
            self._ctx, self.index = OpenvasDB.find_database_by_pattern(
                NVTI_CACHE_NAME, self._main_db.max_database_index
            )
        return self._ctx

    def get_feed_version(self) -> Optional[str]:
        """Get feed version of the nvti cache db.

        Returns the feed version or None if the nvt feed isn't available.
        """
        if not self.ctx:
            # no nvti cache db available yet
            return None

        return OpenvasDB.get_single_item(self.ctx, NVTI_CACHE_NAME)

    def get_oids(self) -> Iterator[Tuple[str, str]]:
        """Get the list of NVT file names and OIDs.

        Returns:
            An iterator. Each single item contains the filename
            as first element and the OID as second one.
        """
        return OpenvasDB.get_filenames_and_oids(self.ctx)

    def get_nvt_params(self, oid: str) -> Dict[str, Dict[str, str]]:
        """Get NVT's preferences.

        Each stored preference is a string of the form
        `id|||name|||type[|||default]`.

        Arguments:
            oid: OID of VT from which to get the parameters.

        Returns:
            A dictionary mapping each preference id to a dictionary with
            the keys `id`, `type`, `name`, `description` and `default`.
            Empty if the VT has no preferences.
        """
        vt_params = {}

        for nvt_pref in self.get_nvt_prefs(oid) or []:
            elem = nvt_pref.split('|||')

            if len(elem) < 3:
                # id, name and type are mandatory; without them the
                # preference can not be represented.
                logger.warning(
                    'Ignoring malformed preference %s of %s.', nvt_pref, oid
                )
                continue

            param_id, param_name, param_type = elem[:3]

            vt_params[param_id] = {
                'id': param_id,
                'type': param_type,
                'name': param_name.strip(),
                'description': 'Description',
                # The default value is optional in the stored string.
                'default': elem[3] if len(elem) > 3 else '',
            }

        return vt_params

    @staticmethod
    def _parse_metadata_tags(tags_str: str, oid: str) -> Dict[str, str]:
        """Parse a string with multiple tags.

        Arguments:
            tags_str: String with tags separated by `|`.
            oid: VT OID. Only used for logging in error case.

        Returns:
            A dictionary with the tags. Tags without a `=` are logged and
            skipped. An empty or missing string yields an empty dictionary.
        """
        tags_dict = {}

        if not tags_str:
            # Nothing stored: not an error and nothing to split.
            return tags_dict

        for tag in tags_str.split('|'):
            try:
                # Only the first `=` separates name and value.
                _tag, _value = tag.split('=', 1)
            except ValueError:
                logger.error('Tag %s in %s has no value.', tag, oid)
                continue
            tags_dict[_tag] = _value

        return tags_dict

    def get_nvt_metadata(self, oid: str) -> Optional[Dict[str, Any]]:
        """Get the full metadata of a VT.

        Arguments:
            oid: OID of VT from which to get the metadata.

        Returns:
            A dictionary with the VT metadata or None if nothing is
            stored for the OID. Besides the plain fields and the parsed
            tags it contains `refs` (reference type to list of references)
            and `vt_params` (script timeout and VT preferences).
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            f"nvt:{oid}",
            start=NVT_META_FIELDS.index("NVT_FILENAME_POS"),
            end=NVT_META_FIELDS.index("NVT_NAME_POS"),
        )

        if not isinstance(resp, list) or len(resp) == 0:
            return None

        # Names for the entries of resp, in the order they are returned.
        subelem = [
            'filename',
            'required_keys',
            'mandatory_keys',
            'excluded_keys',
            'required_udp_ports',
            'required_ports',
            'dependencies',
            'tag',
            'cve',
            'bid',
            'xref',
            'category',
            'timeout',
            'family',
            'name',
        ]
        ref_fields = ('cve', 'bid', 'xref')

        custom = {'refs': {}, 'vt_params': {}}

        # The fields are processed strictly in order: a later field
        # overwrites an equally named key set by an earlier one (e.g. by
        # a tag).
        for child, res in zip(subelem, resp):
            if child == 'tag':
                custom.update(self._parse_metadata_tags(res, oid))
            elif child in ref_fields:
                if res:
                    custom['refs'][child] = res.split(", ")
            elif child == 'timeout':
                if res is None:
                    continue

                vt_params = {}
                # The timeout is only exposed as parameter `0` if it is
                # set to a positive value.
                if int(res) > 0:
                    vt_params['0'] = {
                        'id': '0',
                        'type': 'entry',
                        'name': 'timeout',
                        'description': 'Script Timeout',
                        'default': res,
                    }
                vt_params.update(self.get_nvt_params(oid))
                custom['vt_params'] = vt_params
            elif res:
                custom[child] = res

        return custom

    def get_nvt_refs(self, oid: str) -> Optional[Dict[str, List[str]]]:
        """Get the references of a VT.

        Arguments:
            oid: OID of VT from which to get the VT references.

        Returns:
            A dictionary with the VT references, mapping `cve`, `bid` and
            `xref` to a list of references, or None if nothing is stored
            for the OID.
        """
        resp = OpenvasDB.get_list_item(
            self.ctx,
            f"nvt:{oid}",
            start=NVT_META_FIELDS.index("NVT_CVES_POS"),
            end=NVT_META_FIELDS.index("NVT_XREFS_POS"),
        )

        if not isinstance(resp, list) or len(resp) == 0:
            return None

        subelem = ['cve', 'bid', 'xref']

        return {child: res.split(", ") for child, res in zip(subelem, resp)}

    def get_nvt_family(self, oid: str) -> Optional[str]:
        """Get NVT family

        Arguments:
            oid: OID of VT from which to get the VT family.

        Returns:
            A str with the VT family.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            f'nvt:{oid}',
            index=NVT_META_FIELDS.index("NVT_FAMILY_POS"),
        )

    def get_nvt_prefs(self, oid: str) -> Optional[List[str]]:
        """Get NVT preferences.

        Arguments:
            oid: OID of VT from which to get the VT preferences.

        Returns:
            A list with the VT preferences.
        """
        key = f'oid:{oid}:prefs'
        return OpenvasDB.get_list_item(self.ctx, key)

    def get_nvt_timeout(self, oid: str) -> Optional[str]:
        """Get NVT timeout

        Arguments:
            oid: OID of VT from which to get the script timeout.

        Returns:
            The timeout.
        """
        return OpenvasDB.get_single_item(
            self.ctx,
            f'nvt:{oid}',
            index=NVT_META_FIELDS.index("NVT_TIMEOUT_POS"),
        )

    def get_nvt_tags(self, oid: str) -> Optional[Dict[str, str]]:
        """Get Tags of the given OID.

        Arguments:
            oid: OID of VT from which to get the VT tags.

        Returns:
            A dictionary with the VT tags or None if no tags entry could
            be read for the OID. Tags without a value are logged and
            skipped.
        """
        tag = OpenvasDB.get_single_item(
            self.ctx,
            f'nvt:{oid}',
            index=NVT_META_FIELDS.index('NVT_TAGS_POS'),
        )

        if tag is None:
            return None

        return self._parse_metadata_tags(tag, oid)

    def get_nvt_files_count(self) -> int:
        """Get the number of keys matching `filename:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "filename:*")

    def get_nvt_count(self) -> int:
        """Get the number of keys matching `nvt:*` in the cache."""
        return OpenvasDB.get_key_count(self.ctx, "nvt:*")

    def force_reload(self):
        """Ask the main DB to release the database of this cache."""
        self._main_db.release_database(self)

    def add_vt_to_cache(self, vt_id: str, vt: List[str]):
        """Store a VT and the timestamp of its file in the cache.

        Arguments:
            vt_id: Key under which the VT is stored.
            vt: The VT as list of exactly 15 elements. The first element
                is used as the file name of the VT.

        Raises:
            RequiredArgument: If vt_id or vt is missing or empty.
            OspdOpenvasError: If vt is not a list of 15 elements.
        """
        if not vt_id:
            raise RequiredArgument('add_vt_to_cache', 'vt_id')
        if not vt:
            raise RequiredArgument('add_vt_to_cache', 'vt')
        if not isinstance(vt, list) or len(vt) != 15:
            raise OspdOpenvasError(
                'Error trying to load the VT {} in cache'.format(vt)
            )

        OpenvasDB.add_single_list(self.ctx, vt_id, vt)

        OpenvasDB.add_single_item(self.ctx, f'filename:{vt[0]}', [int(time())])

    def get_file_checksum(self, file_abs_path: Path) -> Optional[str]:
        """Get file sha256 checksum or md5 checksum

        Arguments:
            file_abs_path: File to get the checksum

        Returns:
            The sha256 checksum if one is stored, otherwise the md5
            checksum, or None if neither is stored.
        """
        # The sha256 checksum is preferred, md5 is only the fallback.
        for prefix in ('sha256sums', 'md5sums'):
            checksum = OpenvasDB.get_single_item(
                self.ctx,
                f'{prefix}:{file_abs_path}',
            )
            if checksum:
                return checksum

        return None