# -*- coding: utf-8 -*-
"""Job control for the xonsh shell.

Jobs are plain dictionaries stored in ``builtins.__xonsh_all_jobs__`` keyed
by an integer job number.  The functions in this module read and write the
following keys of a job dictionary: ``"obj"`` (the process object that is
polled / waited upon), ``"pids"``, ``"pgrp"``, ``"cmds"``, ``"bg"``,
``"status"``, ``"started"`` and ``"pipeline"``.

The module-level ``tasks`` deque holds the job numbers ordered from the most
recently manipulated job (index 0) to the least recently manipulated one.
"""
import os
import sys
import time
import errno
import ctypes
import signal
import builtins
import subprocess
import collections

from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, ON_DARWIN, ON_WINDOWS, ON_CYGWIN, ON_MSYS, LIBC
from xonsh.tools import unthreadable


# Queue of job numbers; index 0 is the most recently manipulated job.
tasks = LazyObject(collections.deque, globals(), "tasks")
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
_last_exit_time = None


if ON_DARWIN:

    def _send_signal(job, signal):
        """Send ``signal`` to each process of ``job`` individually.

        Note that the ``signal`` parameter shadows the ``signal`` module
        inside this function.
        """
        # On OS X, os.killpg() may cause PermissionError when there are
        # any zombie processes in the process group.
        # See github issue #1012 for details
        for pid in job["pids"]:
            if pid is None:  # the pid of an aliased proc is None
                continue
            try:
                os.kill(pid, signal)
            except ProcessLookupError:
                # The process is already gone; nothing left to signal.
                pass


elif ON_WINDOWS:
    # No _send_signal on Windows: the Windows implementations of _continue
    # and _kill below do not use signals.
    pass
elif ON_CYGWIN or ON_MSYS:
    # Similar to what happened on OSX, more issues on Cygwin
    # (see Github issue #514).
    def _send_signal(job, signal):
        """Send ``signal`` to the process group of ``job``.

        If signalling the group fails for any reason, fall back to
        signalling every pid of the job, ignoring individual failures.
        """
        try:
            os.killpg(job["pgrp"], signal)
        except Exception:
            for pid in job["pids"]:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass


else:

    def _send_signal(job, signal):
        """Send ``signal`` to ``job``.

        When the job has no process group (``job["pgrp"]`` is None) every
        pid is signalled individually and failures are ignored; otherwise
        the whole process group is signalled and errors propagate.
        """
        pgrp = job["pgrp"]
        if pgrp is None:
            for pid in job["pids"]:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass
        else:
            os.killpg(job["pgrp"], signal)


if ON_WINDOWS:

    def _continue(job):
        """Mark ``job`` as running (no signal is sent on Windows)."""
        job["status"] = "running"

    def _kill(job):
        """Forcefully terminate the job's process tree using ``taskkill``.

        Raises ``subprocess.CalledProcessError`` if ``taskkill`` exits with
        a non-zero status.
        """
        subprocess.check_output(["taskkill", "/F", "/T", "/PID", str(job["obj"].pid)])

    def ignore_sigtstp():
        """Do nothing on Windows."""
        pass

    def give_terminal_to(pgid):
        """Do nothing on Windows; always returns None."""
        pass

    def wait_for_active_job(last_task=None, backgrounded=False):
        """
        Wait for the active job to finish, to be killed by SIGINT, or to be
        suspended by ctrl-z.

        Parameters
        ----------
        last_task : dict or None
            The job most recently waited upon; returned once no running
            foreground job is left.
        backgrounded : bool
            Accepted for signature parity with the POSIX implementation;
            it is not used here.

        Returns
        -------
        The last job that was waited upon, or ``last_task`` when there was
        no running foreground job to wait for.
        """
        _clear_dead_jobs()
        active_task = get_next_task()
        # Return when there are no foreground active task
        if active_task is None:
            return last_task
        obj = active_task["obj"]
        _continue(active_task)
        # Poll with a short timeout so that KeyboardInterrupt can be handled
        # between waits.
        while obj.returncode is None:
            try:
                obj.wait(0.01)
            except subprocess.TimeoutExpired:
                pass
            except KeyboardInterrupt:
                _kill(active_task)
        # Look for another running foreground job.
        return wait_for_active_job(last_task=active_task)


else:

    def _continue(job):
        """Send SIGCONT to ``job``."""
        _send_signal(job, signal.SIGCONT)

    def _kill(job):
        """Send SIGKILL to ``job``."""
        _send_signal(job, signal.SIGKILL)

    def ignore_sigtstp():
        """Make the current process ignore SIGTSTP."""
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)

    # Process group of the shell itself, captured at import time.
    _shell_pgrp = os.getpgrp()

    # Signals blocked while the terminal is being handed over in
    # give_terminal_to().
    _block_when_giving = LazyObject(
        lambda: (signal.SIGTTOU, signal.SIGTTIN, signal.SIGTSTP, signal.SIGCHLD),
        globals(),
        "_block_when_giving",
    )

    if ON_CYGWIN or ON_MSYS:
        # on cygwin, signal.pthread_sigmask does not exist in Python, even
        # though pthread_sigmask is defined in the kernel.  thus, we use
        # ctypes to mimic the calls in the "normal" version below.
        LIBC.pthread_sigmask.restype = ctypes.c_int
        LIBC.pthread_sigmask.argtypes = [
            ctypes.c_int,
            ctypes.POINTER(ctypes.c_ulong),
            ctypes.POINTER(ctypes.c_ulong),
        ]

        def _pthread_sigmask(how, signals):
            """ctypes-based stand-in for ``signal.pthread_sigmask``.

            Parameters
            ----------
            how : int
                Passed through unchanged to the C ``pthread_sigmask``.
            signals : iterable of int
                Signal numbers; bit ``sig`` of the mask handed to libc is
                set for each of them.

            Returns
            -------
            set
                The signals whose bit was set in the previous mask reported
                by libc.

            Raises
            ------
            OSError
                If the libc call returns a non-zero result.
            """
            mask = 0
            for sig in signals:
                mask |= 1 << sig
            oldmask = ctypes.c_ulong()
            mask = ctypes.c_ulong(mask)
            result = LIBC.pthread_sigmask(
                how, ctypes.byref(mask), ctypes.byref(oldmask)
            )
            if result:
                raise OSError(result, "Sigmask error.")

            # Fall back to plain signal numbers when the signal module has
            # no ``Signals`` enumeration.
            return {
                sig
                for sig in getattr(signal, "Signals", range(0, 65))
                if (oldmask.value >> sig) & 1
            }

    else:
        _pthread_sigmask = signal.pthread_sigmask

    # give_terminal_to is a simplified version of:
    #    give_terminal_to from bash 4.3 source, jobs.c, line 4030
    # this will give the terminal to the process group pgid
    def give_terminal_to(pgid):
        """Make ``pgid`` the foreground process group of the terminal.

        Returns
        -------
        bool
            True if the terminal was handed over.  False if ``pgid`` is
            None, if the process group no longer exists, or if there is no
            real terminal to hand over.

        Raises
        ------
        OSError
            For any error other than the expected ones listed above.
        """
        if pgid is None:
            return False
        # Block job-control signals during the hand-over; the previous mask
        # is restored in the ``finally`` clause.
        oldmask = _pthread_sigmask(signal.SIG_BLOCK, _block_when_giving)
        try:
            os.tcsetpgrp(FD_STDERR, pgid)
            return True
        except ProcessLookupError:
            # when the process finished before giving terminal to it,
            # see issue #2288
            return False
        except OSError as e:
            if e.errno == errno.EINVAL:  # [Errno 22] Invalid argument
                # there are cases that all the processes of pgid have
                # finished, then we don't need to do anything here, see
                # issue #2220
                return False
            elif e.errno == errno.ENOTTY:
                # [Errno 25] Inappropriate ioctl for device
                # There are also cases where we are not connected to a
                # real TTY, even though we may be run in interactive
                # mode. See issue #2267 for an example with emacs
                return False
            else:
                raise
        finally:
            _pthread_sigmask(signal.SIG_SETMASK, oldmask)

    def wait_for_active_job(last_task=None, backgrounded=False):
        """
        Wait for the active job to finish, to be killed by SIGINT, or to be
        suspended by ctrl-z.

        Parameters
        ----------
        last_task : dict or None
            The job most recently waited upon; returned once no running
            foreground job is left.
        backgrounded : bool
            Accepted for compatibility, but reset to False before it is
            read, so the value passed in has no effect.

        Returns
        -------
        The last job that was waited upon, or ``last_task`` when there was
        no running foreground job to wait for.
        """
        _clear_dead_jobs()
        active_task = get_next_task()
        # Return when there are no foreground active task
        if active_task is None:
            return last_task
        obj = active_task["obj"]
        backgrounded = False
        try:
            # WUNTRACED makes waitpid also report a stopped (ctrl-z) child.
            _, wcode = os.waitpid(obj.pid, os.WUNTRACED)
        except ChildProcessError:  # No child processes
            return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
        if os.WIFSTOPPED(wcode):
            print("^Z")
            # A stopped job is no longer picked up by get_next_task().
            active_task["status"] = "stopped"
            backgrounded = True
        elif os.WIFSIGNALED(wcode):
            print()  # get a newline because ^C will have been printed
            obj.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
            obj.returncode = None
        else:
            obj.returncode = os.WEXITSTATUS(wcode)
            obj.signal = None
        # Look for another running foreground job.
        return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)


def get_next_task():
    """Get the next active task and put it on top of the queue.

    The next active task is the first job in ``tasks`` that is not a
    background job and whose status is ``"running"``.  Returns the job
    dictionary, or None if there is no such job.
    """
    selected_task = None
    for tid in tasks:
        task = get_task(tid)
        if not task["bg"] and task["status"] == "running":
            selected_task = tid
            break
    if selected_task is None:
        return
    # Move the selected job number to the front of the queue.
    tasks.remove(selected_task)
    tasks.appendleft(selected_task)
    return get_task(selected_task)


def get_task(tid):
    """Return the job registered under job number ``tid``.

    Raises KeyError if no such job exists.
    """
    return builtins.__xonsh_all_jobs__[tid]


def _clear_dead_jobs():
    """Forget every job that has no process object or has already exited.

    Such jobs are removed from both ``tasks`` and
    ``builtins.__xonsh_all_jobs__``.
    """
    # Collect first, remove afterwards: ``tasks`` must not be mutated while
    # it is being iterated.
    to_remove = set()
    for tid in tasks:
        obj = get_task(tid)["obj"]
        if obj is None or obj.poll() is not None:
            to_remove.add(tid)
    for job in to_remove:
        tasks.remove(job)
        del builtins.__xonsh_all_jobs__[job]


def print_one_job(num, outfile=sys.stdout):
    """Print a line describing job number ``num``.

    Nothing is printed if ``num`` is not a known job.  The job is marked
    with ``+`` if it is at the front of ``tasks``, ``-`` if it is second,
    and a space otherwise.
    """
    try:
        job = builtins.__xonsh_all_jobs__[num]
    except KeyError:
        return
    # Guard the positional look-ups so that a queue shorter than the index
    # being probed cannot raise IndexError.
    if len(tasks) > 0 and tasks[0] == num:
        pos = "+"
    elif len(tasks) > 1 and tasks[1] == num:
        pos = "-"
    else:
        pos = " "
    status = job["status"]
    # Each command is either a list of arguments or an already-joined item.
    cmd = [" ".join(i) if isinstance(i, list) else i for i in job["cmds"]]
    cmd = " ".join(cmd)
    pid = job["pids"][-1]
    bg = " &" if job["bg"] else ""
    print("[{}]{} {}: {}{} ({})".format(num, pos, status, cmd, bg, pid), file=outfile)


def get_next_job_number():
    """Get the lowest available unique job number (for the next job created).

    Dead jobs are cleared first, so their numbers become available again.
    """
    _clear_dead_jobs()
    i = 1
    while i in builtins.__xonsh_all_jobs__:
        i += 1
    return i


def add_job(info):
    """Add a new job to the jobs dictionary.

    ``info`` is updated in place with its start time and a ``"running"``
    status, and is put at the front of ``tasks``.  Background jobs are
    announced when ``XONSH_INTERACTIVE`` is set in the environment.
    """
    num = get_next_job_number()
    info["started"] = time.time()
    info["status"] = "running"
    tasks.appendleft(num)
    builtins.__xonsh_all_jobs__[num] = info
    if info["bg"] and builtins.__xonsh_env__.get("XONSH_INTERACTIVE"):
        print_one_job(num)


def clean_jobs():
    """Clean up jobs for exiting shell

    In non-interactive mode, kill all jobs.

    In interactive mode, check for suspended or background jobs, print a
    warning if any exist, and return False. Otherwise, return True.
    """
    global _last_exit_time
    jobs_clean = True
    if builtins.__xonsh_env__["XONSH_INTERACTIVE"]:
        _clear_dead_jobs()

        if builtins.__xonsh_all_jobs__:
            hist = builtins.__xonsh_history__
            if hist is not None and len(hist.tss) > 0:
                last_cmd_start = hist.tss[-1][0]
            else:
                last_cmd_start = None

            if _last_exit_time and last_cmd_start and _last_exit_time > last_cmd_start:
                # Exit occurred after last command started, so it was called as
                # part of the last command and is now being called again
                # immediately. Kill jobs and exit without reminder about
                # unfinished jobs in this case.
                kill_all_jobs()
            else:
                if len(builtins.__xonsh_all_jobs__) > 1:
                    msg = "there are unfinished jobs"
                else:
                    msg = "there is an unfinished job"

                if "prompt_toolkit" not in builtins.__xonsh_env__["SHELL_TYPE"]:
                    # The Ctrl+D binding for prompt_toolkit already inserts a
                    # newline
                    print()
                print("xonsh: {}".format(msg), file=sys.stderr)
                print("-" * 5, file=sys.stderr)
                jobs([], stdout=sys.stderr)
                print("-" * 5, file=sys.stderr)
                print(
                    'Type "exit" or press "ctrl-d" again to force quit.',
                    file=sys.stderr,
                )
                jobs_clean = False
                # Remember this attempt so an immediately repeated exit can
                # be recognised above.
                _last_exit_time = time.time()
    else:
        kill_all_jobs()

    return jobs_clean


def kill_all_jobs():
    """
    Send SIGKILL to all child processes (called when exiting xonsh).
    """
    _clear_dead_jobs()
    for job in builtins.__xonsh_all_jobs__.values():
        _kill(job)


def jobs(args, stdin=None, stdout=sys.stdout, stderr=None):
    """
    xonsh command: jobs

    Display a list of all current jobs.

    ``args``, ``stdin`` and ``stderr`` are not used.  One line per job is
    written to ``stdout``, and ``(None, None)`` is returned.
    """
    _clear_dead_jobs()
    for j in tasks:
        print_one_job(j, outfile=stdout)
    return None, None


@unthreadable
def fg(args, stdin=None):
    """
    xonsh command: fg

    Bring the currently active job to the foreground, or, if a single number is
    given as an argument, bring that job to the foreground. Additionally,
    specify "+" for the most recent job and "-" for the second most recent job.

    Returns None on success, or a ``("", error_message)`` tuple when the
    request cannot be satisfied.
    """
    _clear_dead_jobs()
    if len(tasks) == 0:
        return "", "Cannot bring nonexistent job to foreground.\n"

    if len(args) == 0:
        tid = tasks[0]  # take the last manipulated task by default
    elif len(args) == 1:
        try:
            if args[0] == "+":  # take the last manipulated task
                tid = tasks[0]
            elif args[0] == "-":  # take the second to last manipulated task
                tid = tasks[1]
            else:
                tid = int(args[0])
        except (ValueError, IndexError):
            # Not a number, or "-" was given with fewer than two jobs.
            return "", "Invalid job: {}\n".format(args[0])

        if tid not in builtins.__xonsh_all_jobs__:
            return "", "Invalid job: {}\n".format(args[0])
    else:
        return "", "fg expects 0 or 1 arguments, not {}\n".format(len(args))

    # Put this one on top of the queue
    tasks.remove(tid)
    tasks.appendleft(tid)

    job = get_task(tid)
    job["bg"] = False
    job["status"] = "running"
    if builtins.__xonsh_env__.get("XONSH_INTERACTIVE"):
        print_one_job(tid)
    pipeline = job["pipeline"]
    pipeline.resume(job)


def bg(args, stdin=None):
    """xonsh command: bg

    Resume execution of the currently active job in the background, or, if a
    single number is given as an argument, resume that job in the background.

    The job is first selected and resumed through ``fg``; if that succeeds
    it is flagged as a background job and continued.  Returns None on
    success, or the error tuple produced by ``fg``.
    """
    res = fg(args, stdin)
    if res is None:
        # fg() moved the selected job to the front of the queue.
        curtask = get_task(tasks[0])
        curtask["bg"] = True
        _continue(curtask)
    else:
        return res