# -*- coding: utf-8 -*-
# Copyright (C) 2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from collections.abc import Mapping
from numbers import Integral
from typing import Any, List, Optional

from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
from gvm.protocols.gmpv208.entities.hosts import HostsOrdering
from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
from gvm.xml import XmlCommand


class TasksMixin:
    """Mixin providing the task related commands.

    Every method builds an :py:class:`XmlCommand` and hands it to
    ``self._send_xml_command``, which must be provided by the class this
    mixin is combined with.
    """

    def clone_task(self, task_id: str) -> Any:
        """Clone an existing task

        Arguments:
            task_id: UUID of existing task to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.clone_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("copy", task_id)
        return self._send_xml_command(cmd)

    def create_container_task(
        self, name: str, *, comment: Optional[str] = None
    ) -> Any:
        """Create a new container task

        A container task is a "meta" task to import and view reports from other
        systems.

        Arguments:
            name: Name of the task
            comment: Comment for the task

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name is empty or None.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_container_task.__name__, argument='name'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        # a target id of "0" is what marks the task as a container task
        # (create_task explicitly rejects this id for the same reason)
        cmd.add_element("target", attrs={"id": "0"})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)

    def create_task(
        self,
        name: str,
        config_id: str,
        target_id: str,
        scanner_id: str,
        *,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        comment: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None,
    ) -> Any:
        """Create a new scan task

        Arguments:
            name: Name of the new task
            config_id: UUID of config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: Comment for the task
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If name, config_id, target_id or scanner_id is
                empty or None.
            InvalidArgument: If target_id is '0' or schedule_periods is not
                a non-negative integer.
            InvalidArgumentType: If hosts_ordering, alert_ids, observers or
                preferences has an unexpected type.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='name'
            )

        if not config_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='config_id'
            )

        if not target_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='target_id'
            )

        if not scanner_id:
            raise RequiredArgument(
                function=self.create_task.__name__, argument='scanner_id'
            )

        # don't allow to create a container task with create_task
        if target_id == '0':
            raise InvalidArgument(
                function=self.create_task.__name__, argument='target_id'
            )

        cmd = XmlCommand("create_task")
        cmd.add_element("name", name)
        cmd.add_element("usage_type", "scan")
        cmd.add_element("config", attrs={"id": config_id})
        cmd.add_element("target", attrs={"id": target_id})
        cmd.add_element("scanner", attrs={"id": scanner_id})

        if comment:
            cmd.add_element("comment", comment)

        if alterable is not None:
            cmd.add_element("alterable", to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, HostsOrdering):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if alert_ids is not None:
            if not is_list_like(alert_ids):
                # report this method (not modify_task) as the failing function
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='alert_ids',
                    arg_type='list',
                )

            # an empty list simply adds no alert elements
            for alert in alert_ids:
                cmd.add_element("alert", attrs={"id": str(alert)})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

        if schedule_periods is not None:
            if (
                not isinstance(schedule_periods, Integral)
                or schedule_periods < 0
            ):
                raise InvalidArgument(
                    "schedule_periods must be an integer greater or equal "
                    "than 0"
                )
            cmd.add_element("schedule_periods", str(schedule_periods))

        if observers is not None:
            if not is_list_like(observers):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='observers',
                    arg_type='list',
                )

            # gvmd splits by comma and space
            # gvmd tries to lookup each value as user name and afterwards as
            # user id. So both user name and user id are possible
            cmd.add_element("observers", to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, Mapping):
                raise InvalidArgumentType(
                    function=self.create_task.__name__,
                    argument='preferences',
                    arg_type=Mapping.__name__,
                )

            # one <preference> child per name/value pair, values stringified
            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)

    def delete_task(
        self, task_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing task

        Arguments:
            task_id: UUID of the task to be deleted.
            ultimate: Whether to remove entirely, or to the trashcan.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.delete_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("delete_task")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("ultimate", to_bool(ultimate))

        return self._send_xml_command(cmd)

    def get_tasks(
        self,
        *,
        filter_string: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        details: Optional[bool] = None,
        schedules_only: Optional[bool] = None,
    ) -> Any:
        """Request a list of tasks

        Arguments:
            filter_string: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: Whether to get the trashcan tasks instead
            details: Whether to include full task details
            schedules_only: Whether to only include id, name and schedule
                details

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("usage_type", "scan")

        add_filter(cmd, filter_string, filter_id)

        # boolean flags are only sent when the caller set them explicitly
        if trash is not None:
            cmd.set_attribute("trash", to_bool(trash))

        if details is not None:
            cmd.set_attribute("details", to_bool(details))

        if schedules_only is not None:
            cmd.set_attribute("schedules_only", to_bool(schedules_only))

        return self._send_xml_command(cmd)

    def get_task(self, task_id: str) -> Any:
        """Request a single task

        Arguments:
            task_id: UUID of an existing task

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.get_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("get_tasks")
        cmd.set_attribute("task_id", task_id)
        cmd.set_attribute("usage_type", "scan")

        # for single entity always request all details
        cmd.set_attribute("details", "1")
        return self._send_xml_command(cmd)

    def modify_task(
        self,
        task_id: str,
        *,
        name: Optional[str] = None,
        config_id: Optional[str] = None,
        target_id: Optional[str] = None,
        scanner_id: Optional[str] = None,
        alterable: Optional[bool] = None,
        hosts_ordering: Optional[HostsOrdering] = None,
        schedule_id: Optional[str] = None,
        schedule_periods: Optional[int] = None,
        comment: Optional[str] = None,
        alert_ids: Optional[List[str]] = None,
        observers: Optional[List[str]] = None,
        preferences: Optional[dict] = None,
    ) -> Any:
        """Modifies an existing task.

        Arguments:
            task_id: UUID of task to modify.
            name: The name of the task.
            config_id: UUID of scan config to use by the task
            target_id: UUID of target to be scanned
            scanner_id: UUID of scanner to use for scanning the target
            comment: The comment on the task.
            alterable: Whether the task should be alterable
            alert_ids: List of UUIDs for alerts to be applied to the task.
                An empty list sends a single alert element with id "0".
            hosts_ordering: The order hosts are scanned in
            schedule_id: UUID of a schedule when the task should be run.
            schedule_periods: A limit to the number of times the task will be
                scheduled, or 0 for no limit.
            observers: List of names or ids of users which should be allowed to
                observe this task
            preferences: Name/Value pairs of scanner preferences.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
            InvalidArgument: If schedule_periods is not a non-negative
                integer.
            InvalidArgumentType: If hosts_ordering, alert_ids, observers or
                preferences has an unexpected type.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.modify_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("modify_task")
        cmd.set_attribute("task_id", task_id)

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if config_id:
            cmd.add_element("config", attrs={"id": config_id})

        if target_id:
            cmd.add_element("target", attrs={"id": target_id})

        if alterable is not None:
            cmd.add_element("alterable", to_bool(alterable))

        if hosts_ordering:
            if not isinstance(hosts_ordering, HostsOrdering):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='hosts_ordering',
                    arg_type=HostsOrdering.__name__,
                )
            cmd.add_element("hosts_ordering", hosts_ordering.value)

        if scanner_id:
            cmd.add_element("scanner", attrs={"id": scanner_id})

        if schedule_id:
            cmd.add_element("schedule", attrs={"id": schedule_id})

        if schedule_periods is not None:
            if (
                not isinstance(schedule_periods, Integral)
                or schedule_periods < 0
            ):
                raise InvalidArgument(
                    "schedule_periods must be an integer greater or equal "
                    "than 0"
                )
            cmd.add_element("schedule_periods", str(schedule_periods))

        if alert_ids is not None:
            if not is_list_like(alert_ids):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='alert_ids',
                    arg_type='list',
                )

            if len(alert_ids) == 0:
                # NOTE(review): presumably id "0" asks gvmd to remove all
                # alerts from the task - confirm against the GMP docs
                cmd.add_element("alert", attrs={"id": "0"})
            else:
                for alert in alert_ids:
                    cmd.add_element("alert", attrs={"id": str(alert)})

        if observers is not None:
            if not is_list_like(observers):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='observers',
                    arg_type='list',
                )

            cmd.add_element("observers", to_comma_list(observers))

        if preferences is not None:
            if not isinstance(preferences, Mapping):
                raise InvalidArgumentType(
                    function=self.modify_task.__name__,
                    argument='preferences',
                    arg_type=Mapping.__name__,
                )

            # one <preference> child per name/value pair, values stringified
            _xmlprefs = cmd.add_element("preferences")
            for pref_name, pref_value in preferences.items():
                _xmlpref = _xmlprefs.add_element("preference")
                _xmlpref.add_element("scanner_name", pref_name)
                _xmlpref.add_element("value", str(pref_value))

        return self._send_xml_command(cmd)

    def move_task(self, task_id: str, *, slave_id: Optional[str] = None) -> Any:
        """Move an existing task to another GMP slave scanner or the master

        Arguments:
            task_id: UUID of the task to be moved
            slave_id: UUID of slave to reassign the task to, empty for master.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.move_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("move_task")
        cmd.set_attribute("task_id", task_id)

        # an empty string is still sent (it selects the master); only None
        # omits the attribute
        if slave_id is not None:
            cmd.set_attribute("slave_id", slave_id)

        return self._send_xml_command(cmd)

    def start_task(self, task_id: str) -> Any:
        """Start an existing task

        Arguments:
            task_id: UUID of the task to be started

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.start_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("start_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)

    def resume_task(self, task_id: str) -> Any:
        """Resume an existing stopped task

        Arguments:
            task_id: UUID of the task to be resumed

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.resume_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("resume_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)

    def stop_task(self, task_id: str) -> Any:
        """Stop an existing running task

        Arguments:
            task_id: UUID of the task to be stopped

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If task_id is empty or None.
        """
        if not task_id:
            raise RequiredArgument(
                function=self.stop_task.__name__, argument='task_id'
            )

        cmd = XmlCommand("stop_task")
        cmd.set_attribute("task_id", task_id)

        return self._send_xml_command(cmd)