1# -*- coding: utf-8 -*-
2"""Job control for the xonsh shell."""
3import os
4import sys
5import time
6import ctypes
7import signal
8import builtins
9import subprocess
10import collections
11
12from xonsh.lazyasd import LazyObject
13from xonsh.platform import FD_STDERR, ON_DARWIN, ON_WINDOWS, ON_CYGWIN, ON_MSYS, LIBC
14from xonsh.tools import unthreadable
15
16
# Queue of job numbers, most recently used first: add_job() and fg() push the
# job they touch onto the left end.  Wrapped in LazyObject, so the deque is
# presumably only created on first access.
tasks = LazyObject(collections.deque, globals(), "tasks")
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
_last_exit_time = None
21
22
23if ON_DARWIN:
24
25    def _send_signal(job, signal):
26        # On OS X, os.killpg() may cause PermissionError when there are
27        # any zombie processes in the process group.
28        # See github issue #1012 for details
29        for pid in job["pids"]:
30            if pid is None:  # the pid of an aliased proc is None
31                continue
32            try:
33                os.kill(pid, signal)
34            except ProcessLookupError:
35                pass
36
37
38elif ON_WINDOWS:
39    pass
40elif ON_CYGWIN or ON_MSYS:
41    # Similar to what happened on OSX, more issues on Cygwin
42    # (see Github issue #514).
43    def _send_signal(job, signal):
44        try:
45            os.killpg(job["pgrp"], signal)
46        except Exception:
47            for pid in job["pids"]:
48                try:
49                    os.kill(pid, signal)
50                except Exception:
51                    pass
52
53
54else:
55
56    def _send_signal(job, signal):
57        pgrp = job["pgrp"]
58        if pgrp is None:
59            for pid in job["pids"]:
60                try:
61                    os.kill(pid, signal)
62                except Exception:
63                    pass
64        else:
65            os.killpg(job["pgrp"], signal)
66
67
68if ON_WINDOWS:
69
70    def _continue(job):
71        job["status"] = "running"
72
73    def _kill(job):
74        subprocess.check_output(["taskkill", "/F", "/T", "/PID", str(job["obj"].pid)])
75
76    def ignore_sigtstp():
77        pass
78
79    def give_terminal_to(pgid):
80        pass
81
82    def wait_for_active_job(last_task=None, backgrounded=False):
83        """
84        Wait for the active job to finish, to be killed by SIGINT, or to be
85        suspended by ctrl-z.
86        """
87        _clear_dead_jobs()
88        active_task = get_next_task()
89        # Return when there are no foreground active task
90        if active_task is None:
91            return last_task
92        obj = active_task["obj"]
93        _continue(active_task)
94        while obj.returncode is None:
95            try:
96                obj.wait(0.01)
97            except subprocess.TimeoutExpired:
98                pass
99            except KeyboardInterrupt:
100                _kill(active_task)
101        return wait_for_active_job(last_task=active_task)
102
103
104else:
105
106    def _continue(job):
107        _send_signal(job, signal.SIGCONT)
108
109    def _kill(job):
110        _send_signal(job, signal.SIGKILL)
111
112    def ignore_sigtstp():
113        signal.signal(signal.SIGTSTP, signal.SIG_IGN)
114
    # Process group id of the shell itself, read once when this module-level
    # branch executes.
    _shell_pgrp = os.getpgrp()

    # Signals that give_terminal_to() blocks while it changes the terminal's
    # foreground process group.  Wrapped in LazyObject, so the tuple is
    # presumably only built on first access.
    _block_when_giving = LazyObject(
        lambda: (signal.SIGTTOU, signal.SIGTTIN, signal.SIGTSTP, signal.SIGCHLD),
        globals(),
        "_block_when_giving",
    )
122
    if ON_CYGWIN or ON_MSYS:
        # on cygwin, signal.pthread_sigmask does not exist in Python, even
        # though pthread_sigmask is defined in the kernel.  thus, we use
        # ctypes to mimic the calls in the "normal" version below.
        LIBC.pthread_sigmask.restype = ctypes.c_int
        LIBC.pthread_sigmask.argtypes = [
            ctypes.c_int,
            ctypes.POINTER(ctypes.c_ulong),
            ctypes.POINTER(ctypes.c_ulong),
        ]

        def _pthread_sigmask(how, signals):
            """ctypes stand-in for ``signal.pthread_sigmask``.

            Packs ``signals`` into an unsigned-long bit mask, passes it to
            libc's ``pthread_sigmask`` with the operation ``how``, and
            returns the previous mask as a set of signals.  Raises OSError
            when the libc call returns a non-zero result.
            """
            # Build the new mask: one bit per signal number.
            # NOTE(review): bit ``sig`` (not ``sig - 1``) is set for each
            # signal -- confirm this matches the platform's sigset_t layout.
            mask = 0
            for sig in signals:
                mask |= 1 << sig
            oldmask = ctypes.c_ulong()
            mask = ctypes.c_ulong(mask)
            result = LIBC.pthread_sigmask(
                how, ctypes.byref(mask), ctypes.byref(oldmask)
            )
            if result:
                raise OSError(result, "Sigmask error.")

            # Decode the previous mask back into a set of signals; falls back
            # to plain numbers 0..64 when ``signal.Signals`` is unavailable.
            return {
                sig
                for sig in getattr(signal, "Signals", range(0, 65))
                if (oldmask.value >> sig) & 1
            }

    else:
        # The standard library implementation is used directly here.
        _pthread_sigmask = signal.pthread_sigmask
154
155    # give_terminal_to is a simplified version of:
156    #    give_terminal_to from bash 4.3 source, jobs.c, line 4030
157    # this will give the terminal to the process group pgid
158    def give_terminal_to(pgid):
159        if pgid is None:
160            return False
161        oldmask = _pthread_sigmask(signal.SIG_BLOCK, _block_when_giving)
162        try:
163            os.tcsetpgrp(FD_STDERR, pgid)
164            return True
165        except ProcessLookupError:
166            # when the process finished before giving terminal to it,
167            # see issue #2288
168            return False
169        except OSError as e:
170            if e.errno == 22:  # [Errno 22] Invalid argument
171                # there are cases that all the processes of pgid have
172                # finished, then we don't need to do anything here, see
173                # issue #2220
174                return False
175            elif e.errno == 25:  # [Errno 25] Inappropriate ioctl for device
176                # There are also cases where we are not connected to a
177                # real TTY, even though we may be run in interactive
178                # mode. See issue #2267 for an example with emacs
179                return False
180            else:
181                raise
182        finally:
183            _pthread_sigmask(signal.SIG_SETMASK, oldmask)
184
185    def wait_for_active_job(last_task=None, backgrounded=False):
186        """
187        Wait for the active job to finish, to be killed by SIGINT, or to be
188        suspended by ctrl-z.
189        """
190        _clear_dead_jobs()
191        active_task = get_next_task()
192        # Return when there are no foreground active task
193        if active_task is None:
194            return last_task
195        obj = active_task["obj"]
196        backgrounded = False
197        try:
198            _, wcode = os.waitpid(obj.pid, os.WUNTRACED)
199        except ChildProcessError:  # No child processes
200            return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
201        if os.WIFSTOPPED(wcode):
202            print("^Z")
203            active_task["status"] = "stopped"
204            backgrounded = True
205        elif os.WIFSIGNALED(wcode):
206            print()  # get a newline because ^C will have been printed
207            obj.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
208            obj.returncode = None
209        else:
210            obj.returncode = os.WEXITSTATUS(wcode)
211            obj.signal = None
212        return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
213
214
def get_next_task():
    """Get the next active task and put it on top of the queue.

    The next active task is the first entry of ``tasks`` whose job is not
    backgrounded and has status "running".  Returns that job, or None when
    no such task exists.
    """

    def _is_active(tid):
        job = get_task(tid)
        return not job["bg"] and job["status"] == "running"

    chosen = next((tid for tid in tasks if _is_active(tid)), None)
    if chosen is None:
        return
    # Move the chosen task to the front of the queue.
    tasks.remove(chosen)
    tasks.appendleft(chosen)
    return get_task(chosen)
228
229
def get_task(tid):
    """Return the job stored under ``tid`` in the global job table.

    Lookup errors raised by the table propagate to the caller.
    """
    registry = builtins.__xonsh_all_jobs__
    return registry[tid]
232
233
def _clear_dead_jobs():
    """Remove finished jobs from ``tasks`` and from the global job table.

    A job counts as finished when it has no process object or when the
    object's ``poll()`` reports a result.
    """
    finished = set()
    for tid in tasks:
        proc = get_task(tid)["obj"]
        if proc is not None and proc.poll() is None:
            continue  # still running, keep it
        finished.add(tid)
    for tid in finished:
        tasks.remove(tid)
        del builtins.__xonsh_all_jobs__[tid]
243
244
def print_one_job(num, outfile=sys.stdout):
    """Print a line describing job number ``num``.

    Nothing is printed when ``num`` is not a known job.  The line has the
    form ``[num]<pos> <status>: <cmd><bg> (<pid>)``, where ``<pos>`` is "+"
    for the task at the front of the queue, "-" for the second one and a
    blank otherwise, and ``<pid>`` is the last pid of the job.

    Parameters
    ----------
    num : job number to describe
    outfile : file-like object the line is written to
    """
    try:
        job = builtins.__xonsh_all_jobs__[num]
    except KeyError:
        return
    # Guard the queue lookups: indexing a queue with fewer than two entries
    # must not raise IndexError.
    if len(tasks) > 0 and tasks[0] == num:
        pos = "+"
    elif len(tasks) > 1 and tasks[1] == num:
        pos = "-"
    else:
        pos = " "
    status = job["status"]
    # Commands given as argument lists are flattened into a single string.
    cmd = " ".join(" ".join(i) if isinstance(i, list) else i for i in job["cmds"])
    pid = job["pids"][-1]
    bg_mark = " &" if job["bg"] else ""
    print(
        "[{}]{} {}: {}{} ({})".format(num, pos, status, cmd, bg_mark, pid),
        file=outfile,
    )
258
259
def get_next_job_number():
    """Return the smallest positive job number that is not in use.

    Finished jobs are cleared first, so their numbers become available.
    """
    _clear_dead_jobs()
    candidate = 1
    while True:
        if candidate not in builtins.__xonsh_all_jobs__:
            return candidate
        candidate += 1
268
269
def add_job(info):
    """Register ``info`` as a new running job.

    The job gets the lowest free job number, a start time stamp and the
    status "running", and is placed at the front of the task queue.  A
    background job is announced when the shell is interactive.
    """
    num = get_next_job_number()
    info.update(started=time.time(), status="running")
    tasks.appendleft(num)
    builtins.__xonsh_all_jobs__[num] = info
    if info["bg"]:
        if builtins.__xonsh_env__.get("XONSH_INTERACTIVE"):
            print_one_job(num)
279
280
def clean_jobs():
    """Clean up jobs for exiting shell

    In non-interactive mode, kill all jobs and return True.

    In interactive mode, return True when no unfinished jobs exist.  If
    some do, the first exit attempt prints a warning listing them and
    returns False; an exit attempt repeated after the last command started
    kills all jobs and returns True.
    """
    global _last_exit_time

    if not builtins.__xonsh_env__["XONSH_INTERACTIVE"]:
        kill_all_jobs()
        return True

    _clear_dead_jobs()
    if not builtins.__xonsh_all_jobs__:
        return True

    hist = builtins.__xonsh_history__
    last_cmd_start = None
    if hist is not None and len(hist.tss) > 0:
        last_cmd_start = hist.tss[-1][0]

    if _last_exit_time and last_cmd_start and _last_exit_time > last_cmd_start:
        # Exit occurred after last command started, so it was called as
        # part of the last command and is now being called again
        # immediately. Kill jobs and exit without reminder about
        # unfinished jobs in this case.
        kill_all_jobs()
        return True

    several = len(builtins.__xonsh_all_jobs__) > 1
    msg = "there are unfinished jobs" if several else "there is an unfinished job"

    if "prompt_toolkit" not in builtins.__xonsh_env__["SHELL_TYPE"]:
        # The Ctrl+D binding for prompt_toolkit already inserts a newline
        print()
    separator = "-" * 5
    print("xonsh: {}".format(msg), file=sys.stderr)
    print(separator, file=sys.stderr)
    jobs([], stdout=sys.stderr)
    print(separator, file=sys.stderr)
    print('Type "exit" or press "ctrl-d" again to force quit.', file=sys.stderr)
    # Remember this attempt so an immediate second one can force the exit.
    _last_exit_time = time.time()
    return False
331
332
def kill_all_jobs():
    """
    Kill every job still present in the global job table.

    Finished jobs are cleared first; each remaining job is passed to the
    platform-specific ``_kill``.
    """
    _clear_dead_jobs()
    survivors = builtins.__xonsh_all_jobs__.values()
    for entry in survivors:
        _kill(entry)
340
341
def jobs(args, stdin=None, stdout=sys.stdout, stderr=None):
    """
    xonsh command: jobs

    Write one descriptive line per current job to ``stdout``, in task queue
    order, after clearing finished jobs.  Always returns ``(None, None)``.
    """
    _clear_dead_jobs()
    for tid in tasks:
        print_one_job(tid, outfile=stdout)
    return (None, None)
352
353
@unthreadable
def fg(args, stdin=None):
    """
    xonsh command: fg

    Bring a job to the foreground.  Without arguments, or with "+", the
    most recently manipulated job is chosen; "-" selects the second most
    recent one and a number selects that job.

    Returns None after resuming the job, or a ``("", message)`` tuple when
    the request cannot be satisfied.
    """
    _clear_dead_jobs()
    if len(tasks) == 0:
        return "", "Cannot bring nonexistent job to foreground.\n"

    nargs = len(args)
    if nargs > 1:
        return "", "fg expects 0 or 1 arguments, not {}\n".format(nargs)

    if nargs == 0:
        # take the last manipulated task by default
        tid = tasks[0]
    else:
        spec = args[0]
        try:
            if spec == "+":  # take the last manipulated task
                tid = tasks[0]
            elif spec == "-":  # take the second to last manipulated task
                tid = tasks[1]
            else:
                tid = int(spec)
        except (ValueError, IndexError):
            return "", "Invalid job: {}\n".format(spec)

        if tid not in builtins.__xonsh_all_jobs__:
            return "", "Invalid job: {}\n".format(spec)

    # Put this one on top of the queue
    tasks.remove(tid)
    tasks.appendleft(tid)

    job = get_task(tid)
    job["bg"] = False
    job["status"] = "running"
    if builtins.__xonsh_env__.get("XONSH_INTERACTIVE"):
        print_one_job(tid)
    job["pipeline"].resume(job)
396
397
def bg(args, stdin=None):
    """xonsh command: bg

    Resume a job in the background.  The job is selected exactly as ``fg``
    selects it; if ``fg`` reports an error, that result is returned
    unchanged.  Otherwise the job at the front of the queue is flagged as
    backgrounded and continued.
    """
    outcome = fg(args, stdin)
    if outcome is not None:
        return outcome
    current = get_task(tasks[0])
    current["bg"] = True
    _continue(current)
411