# -*- coding: utf-8 -*-
# Copyright (C) 2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
20from collections.abc import Mapping
21from numbers import Integral
22from typing import Any, List, Optional
23
24from gvm.errors import InvalidArgument, InvalidArgumentType, RequiredArgument
25from gvm.protocols.gmpv208.entities.hosts import HostsOrdering
26from gvm.utils import add_filter, is_list_like, to_bool, to_comma_list
27from gvm.xml import XmlCommand
28
29
30class TasksMixin:
31    def clone_task(self, task_id: str) -> Any:
32        """Clone an existing task
33
34        Arguments:
35            task_id: UUID of existing task to clone from
36
37        Returns:
38            The response. See :py:meth:`send_command` for details.
39        """
40        if not task_id:
41            raise RequiredArgument(
42                function=self.clone_task.__name__, argument='task_id'
43            )
44
45        cmd = XmlCommand("create_task")
46        cmd.add_element("copy", task_id)
47        return self._send_xml_command(cmd)
48
49    def create_container_task(
50        self, name: str, *, comment: Optional[str] = None
51    ) -> Any:
52        """Create a new container task
53
54        A container task is a "meta" task to import and view reports from other
55        systems.
56
57        Arguments:
58            name: Name of the task
59            comment: Comment for the task
60
61        Returns:
62            The response. See :py:meth:`send_command` for details.
63        """
64        if not name:
65            raise RequiredArgument(
66                function=self.create_container_task.__name__, argument='name'
67            )
68
69        cmd = XmlCommand("create_task")
70        cmd.add_element("name", name)
71        cmd.add_element("target", attrs={"id": "0"})
72
73        if comment:
74            cmd.add_element("comment", comment)
75
76        return self._send_xml_command(cmd)
77
78    def create_task(
79        self,
80        name: str,
81        config_id: str,
82        target_id: str,
83        scanner_id: str,
84        *,
85        alterable: Optional[bool] = None,
86        hosts_ordering: Optional[HostsOrdering] = None,
87        schedule_id: Optional[str] = None,
88        alert_ids: Optional[List[str]] = None,
89        comment: Optional[str] = None,
90        schedule_periods: Optional[int] = None,
91        observers: Optional[List[str]] = None,
92        preferences: Optional[dict] = None,
93    ) -> Any:
94        """Create a new scan task
95
96        Arguments:
97            name: Name of the new task
98            config_id: UUID of config to use by the task
99            target_id: UUID of target to be scanned
100            scanner_id: UUID of scanner to use for scanning the target
101            comment: Comment for the task
102            alterable: Whether the task should be alterable
103            alert_ids: List of UUIDs for alerts to be applied to the task
104            hosts_ordering: The order hosts are scanned in
105            schedule_id: UUID of a schedule when the task should be run.
106            schedule_periods: A limit to the number of times the task will be
107                scheduled, or 0 for no limit
108            observers: List of names or ids of users which should be allowed to
109                observe this task
110            preferences: Name/Value pairs of scanner preferences.
111
112        Returns:
113            The response. See :py:meth:`send_command` for details.
114        """
115        if not name:
116            raise RequiredArgument(
117                function=self.create_task.__name__, argument='name'
118            )
119
120        if not config_id:
121            raise RequiredArgument(
122                function=self.create_task.__name__, argument='config_id'
123            )
124
125        if not target_id:
126            raise RequiredArgument(
127                function=self.create_task.__name__, argument='target_id'
128            )
129
130        if not scanner_id:
131            raise RequiredArgument(
132                function=self.create_task.__name__, argument='scanner_id'
133            )
134
135        # don't allow to create a container task with create_task
136        if target_id == '0':
137            raise InvalidArgument(
138                function=self.create_task.__name__, argument='target_id'
139            )
140
141        cmd = XmlCommand("create_task")
142        cmd.add_element("name", name)
143        cmd.add_element("usage_type", "scan")
144        cmd.add_element("config", attrs={"id": config_id})
145        cmd.add_element("target", attrs={"id": target_id})
146        cmd.add_element("scanner", attrs={"id": scanner_id})
147
148        if comment:
149            cmd.add_element("comment", comment)
150
151        if alterable is not None:
152            cmd.add_element("alterable", to_bool(alterable))
153
154        if hosts_ordering:
155            if not isinstance(hosts_ordering, HostsOrdering):
156                raise InvalidArgumentType(
157                    function=self.create_task.__name__,
158                    argument='hosts_ordering',
159                    arg_type=HostsOrdering.__name__,
160                )
161            cmd.add_element("hosts_ordering", hosts_ordering.value)
162
163        if alert_ids is not None:
164            if not is_list_like(alert_ids):
165                raise InvalidArgumentType(
166                    function=self.modify_task.__name__,
167                    argument='alert_ids',
168                    arg_type='list',
169                )
170
171            if not len(alert_ids) == 0:
172                for alert in alert_ids:
173                    cmd.add_element("alert", attrs={"id": str(alert)})
174
175        if schedule_id:
176            cmd.add_element("schedule", attrs={"id": schedule_id})
177
178            if schedule_periods is not None:
179                if (
180                    not isinstance(schedule_periods, Integral)
181                    or schedule_periods < 0
182                ):
183                    raise InvalidArgument(
184                        "schedule_periods must be an integer greater or equal "
185                        "than 0"
186                    )
187                cmd.add_element("schedule_periods", str(schedule_periods))
188
189        if observers is not None:
190            if not is_list_like(observers):
191                raise InvalidArgumentType(
192                    function=self.create_task.__name__,
193                    argument='observers',
194                    arg_type='list',
195                )
196
197            # gvmd splits by comma and space
198            # gvmd tries to lookup each value as user name and afterwards as
199            # user id. So both user name and user id are possible
200            cmd.add_element("observers", to_comma_list(observers))
201
202        if preferences is not None:
203            if not isinstance(preferences, Mapping):
204                raise InvalidArgumentType(
205                    function=self.create_task.__name__,
206                    argument='preferences',
207                    arg_type=Mapping.__name__,
208                )
209
210            _xmlprefs = cmd.add_element("preferences")
211            for pref_name, pref_value in preferences.items():
212                _xmlpref = _xmlprefs.add_element("preference")
213                _xmlpref.add_element("scanner_name", pref_name)
214                _xmlpref.add_element("value", str(pref_value))
215
216        return self._send_xml_command(cmd)
217
218    def delete_task(
219        self, task_id: str, *, ultimate: Optional[bool] = False
220    ) -> Any:
221        """Deletes an existing task
222
223        Arguments:
224            task_id: UUID of the task to be deleted.
225            ultimate: Whether to remove entirely, or to the trashcan.
226        """
227        if not task_id:
228            raise RequiredArgument(
229                function=self.delete_task.__name__, argument='task_id'
230            )
231
232        cmd = XmlCommand("delete_task")
233        cmd.set_attribute("task_id", task_id)
234        cmd.set_attribute("ultimate", to_bool(ultimate))
235
236        return self._send_xml_command(cmd)
237
238    def get_tasks(
239        self,
240        *,
241        filter_string: Optional[str] = None,
242        filter_id: Optional[str] = None,
243        trash: Optional[bool] = None,
244        details: Optional[bool] = None,
245        schedules_only: Optional[bool] = None,
246    ) -> Any:
247        """Request a list of tasks
248
249        Arguments:
250            filter_string: Filter term to use for the query
251            filter_id: UUID of an existing filter to use for the query
252            trash: Whether to get the trashcan tasks instead
253            details: Whether to include full task details
254            schedules_only: Whether to only include id, name and schedule
255                details
256
257        Returns:
258            The response. See :py:meth:`send_command` for details.
259        """
260        cmd = XmlCommand("get_tasks")
261        cmd.set_attribute("usage_type", "scan")
262
263        add_filter(cmd, filter_string, filter_id)
264
265        if trash is not None:
266            cmd.set_attribute("trash", to_bool(trash))
267
268        if details is not None:
269            cmd.set_attribute("details", to_bool(details))
270
271        if schedules_only is not None:
272            cmd.set_attribute("schedules_only", to_bool(schedules_only))
273
274        return self._send_xml_command(cmd)
275
276    def get_task(self, task_id: str) -> Any:
277        """Request a single task
278
279        Arguments:
280            task_id: UUID of an existing task
281
282        Returns:
283            The response. See :py:meth:`send_command` for details.
284        """
285        if not task_id:
286            raise RequiredArgument(
287                function=self.get_task.__name__, argument='task_id'
288            )
289
290        cmd = XmlCommand("get_tasks")
291        cmd.set_attribute("task_id", task_id)
292        cmd.set_attribute("usage_type", "scan")
293
294        # for single entity always request all details
295        cmd.set_attribute("details", "1")
296        return self._send_xml_command(cmd)
297
298    def modify_task(
299        self,
300        task_id: str,
301        *,
302        name: Optional[str] = None,
303        config_id: Optional[str] = None,
304        target_id: Optional[str] = None,
305        scanner_id: Optional[str] = None,
306        alterable: Optional[bool] = None,
307        hosts_ordering: Optional[HostsOrdering] = None,
308        schedule_id: Optional[str] = None,
309        schedule_periods: Optional[int] = None,
310        comment: Optional[str] = None,
311        alert_ids: Optional[List[str]] = None,
312        observers: Optional[List[str]] = None,
313        preferences: Optional[dict] = None,
314    ) -> Any:
315        """Modifies an existing task.
316
317        Arguments:
318            task_id: UUID of task to modify.
319            name: The name of the task.
320            config_id: UUID of scan config to use by the task
321            target_id: UUID of target to be scanned
322            scanner_id: UUID of scanner to use for scanning the target
323            comment: The comment on the task.
324            alert_ids: List of UUIDs for alerts to be applied to the task
325            hosts_ordering: The order hosts are scanned in
326            schedule_id: UUID of a schedule when the task should be run.
327            schedule_periods: A limit to the number of times the task will be
328                scheduled, or 0 for no limit.
329            observers: List of names or ids of users which should be allowed to
330                observe this task
331            preferences: Name/Value pairs of scanner preferences.
332
333        Returns:
334            The response. See :py:meth:`send_command` for details.
335        """
336        if not task_id:
337            raise RequiredArgument(
338                function=self.modify_task.__name__, argument='task_id argument'
339            )
340
341        cmd = XmlCommand("modify_task")
342        cmd.set_attribute("task_id", task_id)
343
344        if name:
345            cmd.add_element("name", name)
346
347        if comment:
348            cmd.add_element("comment", comment)
349
350        if config_id:
351            cmd.add_element("config", attrs={"id": config_id})
352
353        if target_id:
354            cmd.add_element("target", attrs={"id": target_id})
355
356        if alterable is not None:
357            cmd.add_element("alterable", to_bool(alterable))
358
359        if hosts_ordering:
360            if not isinstance(hosts_ordering, HostsOrdering):
361                raise InvalidArgumentType(
362                    function=self.modify_task.__name__,
363                    argument='hosts_ordering',
364                    arg_type=HostsOrdering.__name__,
365                )
366            cmd.add_element("hosts_ordering", hosts_ordering.value)
367
368        if scanner_id:
369            cmd.add_element("scanner", attrs={"id": scanner_id})
370
371        if schedule_id:
372            cmd.add_element("schedule", attrs={"id": schedule_id})
373
374        if schedule_periods is not None:
375            if (
376                not isinstance(schedule_periods, Integral)
377                or schedule_periods < 0
378            ):
379                raise InvalidArgument(
380                    "schedule_periods must be an integer greater or equal "
381                    "than 0"
382                )
383            cmd.add_element("schedule_periods", str(schedule_periods))
384
385        if alert_ids is not None:
386            if not is_list_like(alert_ids):
387                raise InvalidArgumentType(
388                    function=self.modify_task.__name__,
389                    argument='alert_ids',
390                    arg_type='list',
391                )
392
393            if len(alert_ids) == 0:
394                cmd.add_element("alert", attrs={"id": "0"})
395            else:
396                for alert in alert_ids:
397                    cmd.add_element("alert", attrs={"id": str(alert)})
398
399        if observers is not None:
400            if not is_list_like(observers):
401                raise InvalidArgumentType(
402                    function=self.modify_task.__name__,
403                    argument='observers',
404                    arg_type='list',
405                )
406
407            cmd.add_element("observers", to_comma_list(observers))
408
409        if preferences is not None:
410            if not isinstance(preferences, Mapping):
411                raise InvalidArgumentType(
412                    function=self.modify_task.__name__,
413                    argument='preferences',
414                    arg_type=Mapping.__name__,
415                )
416
417            _xmlprefs = cmd.add_element("preferences")
418            for pref_name, pref_value in preferences.items():
419                _xmlpref = _xmlprefs.add_element("preference")
420                _xmlpref.add_element("scanner_name", pref_name)
421                _xmlpref.add_element("value", str(pref_value))
422
423        return self._send_xml_command(cmd)
424
425    def move_task(self, task_id: str, *, slave_id: Optional[str] = None) -> Any:
426        """Move an existing task to another GMP slave scanner or the master
427
428        Arguments:
429            task_id: UUID of the task to be moved
430            slave_id: UUID of slave to reassign the task to, empty for master.
431
432        Returns:
433            The response. See :py:meth:`send_command` for details.
434        """
435        if not task_id:
436            raise RequiredArgument(
437                function=self.move_task.__name__, argument='task_id'
438            )
439
440        cmd = XmlCommand("move_task")
441        cmd.set_attribute("task_id", task_id)
442
443        if slave_id is not None:
444            cmd.set_attribute("slave_id", slave_id)
445
446        return self._send_xml_command(cmd)
447
448    def start_task(self, task_id: str) -> Any:
449        """Start an existing task
450
451        Arguments:
452            task_id: UUID of the task to be started
453
454        Returns:
455            The response. See :py:meth:`send_command` for details.
456        """
457        if not task_id:
458            raise RequiredArgument(
459                function=self.start_task.__name__, argument='task_id'
460            )
461
462        cmd = XmlCommand("start_task")
463        cmd.set_attribute("task_id", task_id)
464
465        return self._send_xml_command(cmd)
466
467    def resume_task(self, task_id: str) -> Any:
468        """Resume an existing stopped task
469
470        Arguments:
471            task_id: UUID of the task to be resumed
472
473        Returns:
474            The response. See :py:meth:`send_command` for details.
475        """
476        if not task_id:
477            raise RequiredArgument(
478                function=self.resume_task.__name__, argument='task_id'
479            )
480
481        cmd = XmlCommand("resume_task")
482        cmd.set_attribute("task_id", task_id)
483
484        return self._send_xml_command(cmd)
485
486    def stop_task(self, task_id: str) -> Any:
487        """Stop an existing running task
488
489        Arguments:
490            task_id: UUID of the task to be stopped
491
492        Returns:
493            The response. See :py:meth:`send_command` for details.
494        """
495        if not task_id:
496            raise RequiredArgument(
497                function=self.stop_task.__name__, argument='task_id'
498            )
499
500        cmd = XmlCommand("stop_task")
501        cmd.set_attribute("task_id", task_id)
502
503        return self._send_xml_command(cmd)
504