1"""
2A convenience system to manage jobs, both active and already run
3"""
4
5import fnmatch
6import logging
7import os
8
9import salt.client
10import salt.minion
11import salt.payload
12import salt.returners
13import salt.utils.args
14import salt.utils.files
15import salt.utils.jid
16import salt.utils.master
17from salt.exceptions import SaltClientError
18
19try:
20    import dateutil.parser as dateutil_parser
21
22    DATEUTIL_SUPPORT = True
23except ImportError:
24    DATEUTIL_SUPPORT = False
25
26log = logging.getLogger(__name__)
27
28
29def master():
30    """
31    Return the actively executing runners for the master
32
33    CLI Example:
34
35    .. code-block:: bash
36
37        salt-run jobs.master
38    """
39    return salt.utils.master.get_running_jobs(__opts__)
40
41
42def active(display_progress=False):
43    """
44    Return a report on all actively running jobs from a job id centric
45    perspective
46
47    CLI Example:
48
49    .. code-block:: bash
50
51        salt-run jobs.active
52    """
53    ret = {}
54    with salt.client.get_local_client(__opts__["conf_file"]) as client:
55        try:
56            active_ = client.cmd("*", "saltutil.running", timeout=__opts__["timeout"])
57        except SaltClientError as client_error:
58            print(client_error)
59            return ret
60
61    if display_progress:
62        __jid_event__.fire_event(
63            {
64                "message": "Attempting to contact minions: {}".format(
65                    list(active_.keys())
66                )
67            },
68            "progress",
69        )
70    for minion, data in active_.items():
71        if display_progress:
72            __jid_event__.fire_event(
73                {"message": "Received reply from minion {}".format(minion)}, "progress"
74            )
75        if not isinstance(data, list):
76            continue
77        for job in data:
78            if not job["jid"] in ret:
79                ret[job["jid"]] = _format_jid_instance(job["jid"], job)
80                ret[job["jid"]].update(
81                    {"Running": [{minion: job.get("pid", None)}], "Returned": []}
82                )
83            else:
84                ret[job["jid"]]["Running"].append({minion: job["pid"]})
85
86    mminion = salt.minion.MasterMinion(__opts__)
87    for jid in ret:
88        returner = _get_returner(
89            (__opts__["ext_job_cache"], __opts__["master_job_cache"])
90        )
91        data = mminion.returners["{}.get_jid".format(returner)](jid)
92        if data:
93            for minion in data:
94                if minion not in ret[jid]["Returned"]:
95                    ret[jid]["Returned"].append(minion)
96
97    return ret
98
99
100def lookup_jid(
101    jid, ext_source=None, returned=True, missing=False, display_progress=False
102):
103    """
104    Return the printout from a previously executed job
105
106    jid
107        The jid to look up.
108
109    ext_source
110        The external job cache to use. Default: `None`.
111
112    returned : True
113        If ``True``, include the minions that did return from the command.
114
115        .. versionadded:: 2015.8.0
116
117    missing : False
118        If ``True``, include the minions that did *not* return from the
119        command.
120
121    display_progress : False
122        If ``True``, fire progress events.
123
124        .. versionadded:: 2015.5.0
125
126    CLI Example:
127
128    .. code-block:: bash
129
130        salt-run jobs.lookup_jid 20130916125524463507
131        salt-run jobs.lookup_jid 20130916125524463507 --out=highstate
132    """
133    ret = {}
134    mminion = salt.minion.MasterMinion(__opts__)
135    returner = _get_returner(
136        (__opts__["ext_job_cache"], ext_source, __opts__["master_job_cache"])
137    )
138
139    try:
140        data = list_job(jid, ext_source=ext_source, display_progress=display_progress)
141    except TypeError:
142        return "Requested returner could not be loaded. No JIDs could be retrieved."
143
144    targeted_minions = data.get("Minions", [])
145    returns = data.get("Result", {})
146
147    if returns:
148        for minion in returns:
149            if display_progress:
150                __jid_event__.fire_event({"message": minion}, "progress")
151            if "return" in returns[minion]:
152                if returned:
153                    ret[minion] = returns[minion].get("return")
154            else:
155                if returned:
156                    ret[minion] = returns[minion].get("return")
157    if missing:
158        for minion_id in (x for x in targeted_minions if x not in returns):
159            ret[minion_id] = "Minion did not return"
160
161    # We need to check to see if the 'out' key is present and use it to specify
162    # the correct outputter, so we get highstate output for highstate runs.
163    try:
164        # Check if the return data has an 'out' key. We'll use that as the
165        # outputter in the absence of one being passed on the CLI.
166        outputter = returns[next(iter(returns))].get("out")
167    except (StopIteration, AttributeError):
168        outputter = None
169
170    if outputter:
171        return {"outputter": outputter, "data": ret}
172    else:
173        return ret
174
175
176def list_job(jid, ext_source=None, display_progress=False):
177    """
178    List a specific job given by its jid
179
180    ext_source
181        If provided, specifies which external job cache to use.
182
183    display_progress : False
184        If ``True``, fire progress events.
185
186        .. versionadded:: 2015.8.8
187
188    CLI Example:
189
190    .. code-block:: bash
191
192        salt-run jobs.list_job 20130916125524463507
193        salt-run jobs.list_job 20130916125524463507 --out=pprint
194    """
195    ret = {"jid": jid}
196    mminion = salt.minion.MasterMinion(__opts__)
197    returner = _get_returner(
198        (__opts__["ext_job_cache"], ext_source, __opts__["master_job_cache"])
199    )
200    if display_progress:
201        __jid_event__.fire_event(
202            {"message": "Querying returner: {}".format(returner)}, "progress"
203        )
204
205    job = mminion.returners["{}.get_load".format(returner)](jid)
206    ret.update(_format_jid_instance(jid, job))
207    ret["Result"] = mminion.returners["{}.get_jid".format(returner)](jid)
208
209    fstr = "{}.get_endtime".format(__opts__["master_job_cache"])
210    if __opts__.get("job_cache_store_endtime") and fstr in mminion.returners:
211        endtime = mminion.returners[fstr](jid)
212        if endtime:
213            ret["EndTime"] = endtime
214
215    return ret
216
217
218def list_jobs(
219    ext_source=None,
220    outputter=None,
221    search_metadata=None,
222    search_function=None,
223    search_target=None,
224    start_time=None,
225    end_time=None,
226    display_progress=False,
227):
228    """
229    List all detectable jobs and associated functions
230
231    ext_source
232        If provided, specifies which external job cache to use.
233
234    **FILTER OPTIONS**
235
236    .. note::
237        If more than one of the below options are used, only jobs which match
238        *all* of the filters will be returned.
239
240    search_metadata
241        Specify a dictionary to match to the job's metadata. If any of the
242        key-value pairs in this dictionary match, the job will be returned.
243        Example:
244
245        .. code-block:: bash
246
247            salt-run jobs.list_jobs search_metadata='{"foo": "bar", "baz": "qux"}'
248
249    search_function
250        Can be passed as a string or a list. Returns jobs which match the
251        specified function. Globbing is allowed. Example:
252
253        .. code-block:: bash
254
255            salt-run jobs.list_jobs search_function='test.*'
256            salt-run jobs.list_jobs search_function='["test.*", "pkg.install"]'
257
258        .. versionchanged:: 2015.8.8
259            Multiple targets can now also be passed as a comma-separated list.
260            For example:
261
262            .. code-block:: bash
263
264                salt-run jobs.list_jobs search_function='test.*,pkg.install'
265
266    search_target
267        Can be passed as a string or a list. Returns jobs which match the
268        specified minion name. Globbing is allowed. Example:
269
270        .. code-block:: bash
271
272            salt-run jobs.list_jobs search_target='*.mydomain.tld'
273            salt-run jobs.list_jobs search_target='["db*", "myminion"]'
274
275        .. versionchanged:: 2015.8.8
276            Multiple targets can now also be passed as a comma-separated list.
277            For example:
278
279            .. code-block:: bash
280
281                salt-run jobs.list_jobs search_target='db*,myminion'
282
283    start_time
284        Accepts any timestamp supported by the dateutil_ Python module (if this
285        module is not installed, this argument will be ignored). Returns jobs
286        which started after this timestamp.
287
288    end_time
289        Accepts any timestamp supported by the dateutil_ Python module (if this
290        module is not installed, this argument will be ignored). Returns jobs
291        which started before this timestamp.
292
293    .. _dateutil: https://pypi.python.org/pypi/python-dateutil
294
295    CLI Example:
296
297    .. code-block:: bash
298
299        salt-run jobs.list_jobs
300        salt-run jobs.list_jobs search_function='test.*' search_target='localhost' search_metadata='{"bar": "foo"}'
301        salt-run jobs.list_jobs start_time='2015, Mar 16 19:00' end_time='2015, Mar 18 22:00'
302
303    """
304    returner = _get_returner(
305        (__opts__["ext_job_cache"], ext_source, __opts__["master_job_cache"])
306    )
307    if display_progress:
308        __jid_event__.fire_event(
309            {"message": "Querying returner {} for jobs.".format(returner)}, "progress"
310        )
311    mminion = salt.minion.MasterMinion(__opts__)
312
313    ret = mminion.returners["{}.get_jids".format(returner)]()
314
315    mret = {}
316    for item in ret:
317        _match = True
318        if search_metadata:
319            _match = False
320            if "Metadata" in ret[item]:
321                if isinstance(search_metadata, dict):
322                    for key in search_metadata:
323                        if key in ret[item]["Metadata"]:
324                            if ret[item]["Metadata"][key] == search_metadata[key]:
325                                _match = True
326                else:
327                    log.info(
328                        "The search_metadata parameter must be specified"
329                        " as a dictionary.  Ignoring."
330                    )
331        if search_target and _match:
332            _match = False
333            if "Target" in ret[item]:
334                targets = ret[item]["Target"]
335                if isinstance(targets, str):
336                    targets = [targets]
337                for target in targets:
338                    for key in salt.utils.args.split_input(search_target):
339                        if fnmatch.fnmatch(target, key):
340                            _match = True
341
342        if search_function and _match:
343            _match = False
344            if "Function" in ret[item]:
345                for key in salt.utils.args.split_input(search_function):
346                    if fnmatch.fnmatch(ret[item]["Function"], key):
347                        _match = True
348
349        if start_time and _match:
350            _match = False
351            if DATEUTIL_SUPPORT:
352                parsed_start_time = dateutil_parser.parse(start_time)
353                _start_time = dateutil_parser.parse(ret[item]["StartTime"])
354                if _start_time >= parsed_start_time:
355                    _match = True
356            else:
357                log.error(
358                    "'dateutil' library not available, skipping start_time comparison."
359                )
360
361        if end_time and _match:
362            _match = False
363            if DATEUTIL_SUPPORT:
364                parsed_end_time = dateutil_parser.parse(end_time)
365                _start_time = dateutil_parser.parse(ret[item]["StartTime"])
366                if _start_time <= parsed_end_time:
367                    _match = True
368            else:
369                log.error(
370                    "'dateutil' library not available, skipping end_time comparison."
371                )
372
373        if _match:
374            mret[item] = ret[item]
375
376    if outputter:
377        return {"outputter": outputter, "data": mret}
378    else:
379        return mret
380
381
382def list_jobs_filter(
383    count, filter_find_job=True, ext_source=None, outputter=None, display_progress=False
384):
385    """
386    List all detectable jobs and associated functions
387
388    ext_source
389        The external job cache to use. Default: `None`.
390
391    CLI Example:
392
393    .. code-block:: bash
394
395        salt-run jobs.list_jobs_filter 50
396        salt-run jobs.list_jobs_filter 100 filter_find_job=False
397
398    """
399    returner = _get_returner(
400        (__opts__["ext_job_cache"], ext_source, __opts__["master_job_cache"])
401    )
402    if display_progress:
403        __jid_event__.fire_event(
404            {"message": "Querying returner {} for jobs.".format(returner)}, "progress"
405        )
406    mminion = salt.minion.MasterMinion(__opts__)
407
408    fun = "{}.get_jids_filter".format(returner)
409    if fun not in mminion.returners:
410        raise NotImplementedError(
411            "'{}' returner function not implemented yet.".format(fun)
412        )
413    ret = mminion.returners[fun](count, filter_find_job)
414
415    if outputter:
416        return {"outputter": outputter, "data": ret}
417    else:
418        return ret
419
420
421def print_job(jid, ext_source=None):
422    """
423    Print a specific job's detail given by its jid, including the return data.
424
425    CLI Example:
426
427    .. code-block:: bash
428
429        salt-run jobs.print_job 20130916125524463507
430    """
431    ret = {}
432
433    returner = _get_returner(
434        (__opts__["ext_job_cache"], ext_source, __opts__["master_job_cache"])
435    )
436    mminion = salt.minion.MasterMinion(__opts__)
437
438    try:
439        job = mminion.returners["{}.get_load".format(returner)](jid)
440        ret[jid] = _format_jid_instance(jid, job)
441    except TypeError:
442        ret[jid]["Result"] = (
443            "Requested returner {} is not available. Jobs cannot be "
444            "retrieved. Check master log for details.".format(returner)
445        )
446        return ret
447    ret[jid]["Result"] = mminion.returners["{}.get_jid".format(returner)](jid)
448
449    fstr = "{}.get_endtime".format(__opts__["master_job_cache"])
450    if __opts__.get("job_cache_store_endtime") and fstr in mminion.returners:
451        endtime = mminion.returners[fstr](jid)
452        if endtime:
453            ret[jid]["EndTime"] = endtime
454
455    return ret
456
457
458def exit_success(jid, ext_source=None):
459    """
460    Check if a job has been executed and exit successfully
461
462    jid
463        The jid to look up.
464    ext_source
465        The external job cache to use. Default: `None`.
466
467    CLI Example:
468
469    .. code-block:: bash
470
471        salt-run jobs.exit_success 20160520145827701627
472    """
473    ret = dict()
474
475    data = list_job(jid, ext_source=ext_source)
476
477    minions = data.get("Minions", [])
478    result = data.get("Result", {})
479
480    for minion in minions:
481        if minion in result and "return" in result[minion]:
482            ret[minion] = True if result[minion]["return"] else False
483        else:
484            ret[minion] = False
485
486    for minion in result:
487        if "return" in result[minion] and result[minion]["return"]:
488            ret[minion] = True
489    return ret
490
491
492def last_run(
493    ext_source=None,
494    outputter=None,
495    metadata=None,
496    function=None,
497    target=None,
498    display_progress=False,
499):
500    """
501    .. versionadded:: 2015.8.0
502
503    List all detectable jobs and associated functions
504
505    CLI Example:
506
507    .. code-block:: bash
508
509        salt-run jobs.last_run
510        salt-run jobs.last_run target=nodename
511        salt-run jobs.last_run function='cmd.run'
512        salt-run jobs.last_run metadata="{'foo': 'bar'}"
513    """
514
515    if metadata:
516        if not isinstance(metadata, dict):
517            log.info("The metadata parameter must be specified as a dictionary")
518            return False
519
520    _all_jobs = list_jobs(
521        ext_source=ext_source,
522        outputter=outputter,
523        search_metadata=metadata,
524        search_function=function,
525        search_target=target,
526        display_progress=display_progress,
527    )
528    if _all_jobs:
529        last_job = sorted(_all_jobs)[-1]
530        return print_job(last_job, ext_source)
531    else:
532        return False
533
534
535def _get_returner(returner_types):
536    """
537    Helper to iterate over returner_types and pick the first one
538    """
539    for returner in returner_types:
540        if returner and returner is not None:
541            return returner
542
543
544def _format_job_instance(job):
545    """
546    Helper to format a job instance
547    """
548    if not job:
549        ret = {"Error": "Cannot contact returner or no job with this jid"}
550        return ret
551
552    ret = {
553        "Function": job.get("fun", "unknown-function"),
554        "Arguments": list(job.get("arg", [])),
555        # unlikely but safeguard from invalid returns
556        "Target": job.get("tgt", "unknown-target"),
557        "Target-type": job.get("tgt_type", "list"),
558        "User": job.get("user", "root"),
559    }
560
561    if "metadata" in job:
562        ret["Metadata"] = job.get("metadata", {})
563    else:
564        if "kwargs" in job:
565            if "metadata" in job["kwargs"]:
566                ret["Metadata"] = job["kwargs"].get("metadata", {})
567
568    if "Minions" in job:
569        ret["Minions"] = job["Minions"]
570    return ret
571
572
573def _format_jid_instance(jid, job):
574    """
575    Helper to format jid instance
576    """
577    ret = _format_job_instance(job)
578    ret.update({"StartTime": salt.utils.jid.jid_to_time(jid)})
579    return ret
580
581
582def _walk_through(job_dir, display_progress=False):
583    """
584    Walk through the job dir and return jobs
585    """
586    for top in os.listdir(job_dir):
587        t_path = os.path.join(job_dir, top)
588
589        for final in os.listdir(t_path):
590            load_path = os.path.join(t_path, final, ".load.p")
591            with salt.utils.files.fopen(load_path, "rb") as rfh:
592                job = salt.payload.load(rfh)
593
594            if not os.path.isfile(load_path):
595                continue
596
597            with salt.utils.files.fopen(load_path, "rb") as rfh:
598                job = salt.payload.load(rfh)
599            jid = job["jid"]
600            if display_progress:
601                __jid_event__.fire_event(
602                    {"message": "Found JID {}".format(jid)}, "progress"
603                )
604            yield jid, job, t_path, final
605