1# #START_LICENSE########################################################### 2# 3# 4# This file is part of the Environment for Tree Exploration program 5# (ETE). http://etetoolkit.org 6# 7# ETE is free software: you can redistribute it and/or modify it 8# under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# ETE is distributed in the hope that it will be useful, but WITHOUT 13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 14# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public 15# License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with ETE. If not, see <http://www.gnu.org/licenses/>. 19# 20# 21# ABOUT THE ETE PACKAGE 22# ===================== 23# 24# ETE is distributed under the GPL copyleft license (2008-2015). 25# 26# If you make use of ETE in published work, please cite: 27# 28# Jaime Huerta-Cepas, Joaquin Dopazo and Toni Gabaldon. 29# ETE: a python Environment for Tree Exploration. Jaime BMC 30# Bioinformatics 2010,:24doi:10.1186/1471-2105-11-24 31# 32# Note that extra references to the specific methods implemented in 33# the toolkit may be available in the documentation. 34# 35# More info at http://etetoolkit.org. Contact: huerta@embl.de 36# 37# 38# #END_LICENSE############################################################# 39from __future__ import absolute_import 40from __future__ import print_function 41 42import sys 43import os 44import signal 45import subprocess 46from multiprocessing import Process, Queue 47from six.moves.queue import Empty as QueueEmpty 48from time import sleep, ctime, time 49from collections import defaultdict, deque 50import re 51import logging 52import six 53from six.moves import map 54from six.moves import range 55log = logging.getLogger("main") 56 57from . import db 58from .errors import ConfigError, TaskError 59from .logger import set_logindent, logindent, get_logindent 60from .utils import (generate_id, PhyloTree, NodeStyle, Tree, 61 DEBUG, NPR_TREE_STYLE, faces, GLOBALS, 62 basename, pjoin, ask, send_mail, pid_up, SeqGroup, cmp) 63from .master_task import (isjob, update_task_states_recursively, 64 store_task_data_recursively, 65 remove_task_dir_recursively, 66 update_job_status) 67from .workflow.common import assembly_tree, get_cmd_log 68 69def cmp_to_key(mycmp): 70 'Convert a cmp= function into a key= function' 71 class K: 72 def __init__(self, obj, *args): 73 self.obj = obj 74 def __lt__(self, other): 75 return mycmp(self.obj, other.obj) < 0 76 def __gt__(self, other): 77 return mycmp(self.obj, other.obj) > 0 78 def __eq__(self, other): 79 return mycmp(self.obj, other.obj) == 0 80 def __le__(self, other): 81 return mycmp(self.obj, other.obj) <= 0 82 def __ge__(self, other): 83 return mycmp(self.obj, other.obj) >= 0 84 def __ne__(self, other): 85 return mycmp(self.obj, other.obj) != 0 86 return K 87 88def debug(_signal, _frame): 89 import pdb 90 pdb.set_trace() 91 92def control_c(_signal, _frame): 93 signal.signal(signal.SIGINT, signal.SIG_IGN) 94 db.commit() 95 96 ver = {28: "0", 26: "1", 24: "2", 22: "3", 20: "4", 10: "5"} 97 ver_level = log.level 98 99 print('\n\nYou pressed Ctrl+C!') 100 print('q) quit') 101 print('v) change verbosity level:', ver.get(ver_level, ver_level)) 102 print('d) enter debug mode') 103 print('c) continue execution') 104 key = ask(" Choose:", ["q", "v", "d", "c"]) 105 if key == "q": 106 raise KeyboardInterrupt 107 elif key == "d": 108 signal.signal(signal.SIGALRM, debug) 109 signal.alarm(1) 110 return 111 elif key == "v": 112 vl = ask("new level", sorted(ver.values())) 113 new_level = sorted(list(ver.keys()), reverse=True)[int(vl)] 114 log.setLevel(new_level) 115 elif key == "d": 116 import pdb 117 pdb.set_trace() 118 signal.signal(signal.SIGINT, control_c) 119 120def sort_tasks(x, y): 121 priority = { 122 "treemerger": 1, 123 "tree": 2, 124 "mchooser": 3, 125 "alg": 4, 126 "concat_alg": 5, 127 "acleaner": 6, 128 "msf":7, 129 "cog_selector":8} 130 131 x_type_prio = priority.get(x.ttype, 100) 132 y_type_prio = priority.get(y.ttype, 100) 133 134 prio_cmp = cmp(x_type_prio, y_type_prio) 135 if prio_cmp == 0: 136 x_size = getattr(x, "size", 0) 137 y_size = getattr(y, "size", 0) 138 size_cmp = cmp(x_size, y_size) * -1 139 if size_cmp == 0: 140 return cmp(x.threadid, y.threadid) 141 else: 142 return size_cmp 143 else: 144 return prio_cmp 145 146def get_stored_data(fileid): 147 try: 148 _tid, _did = fileid.split(".") 149 _did = int(_did) 150 except (IndexError, ValueError): 151 dataid = fileid 152 else: 153 dataid = db.get_dataid(_tid, _did) 154 return db.get_data(dataid) 155 156def schedule(workflow_task_processor, pending_tasks, schedule_time, execution, debug, norender): 157 # Adjust debug mode 158 if debug == "all": 159 log.setLevel(10) 160 pending_tasks = set(pending_tasks) 161 162 ## =================================== 163 ## INITIALIZE BASIC VARS 164 execution, run_detached = execution 165 thread2tasks = defaultdict(list) 166 for task in pending_tasks: 167 thread2tasks[task.configid].append(task) 168 expected_threads = set(thread2tasks.keys()) 169 past_threads = {} 170 thread_errors = defaultdict(list) 171 ## END OF VARS AND SHORTCUTS 172 ## =================================== 173 174 cores_total = GLOBALS["_max_cores"] 175 if cores_total > 0: 176 job_queue = Queue() 177 178 back_launcher = Process(target=background_job_launcher, 179 args=(job_queue, run_detached, 180 GLOBALS["launch_time"], cores_total)) 181 back_launcher.start() 182 else: 183 job_queue = None 184 back_launcher = None 185 186 GLOBALS["_background_scheduler"] = back_launcher 187 GLOBALS["_job_queue"] = job_queue 188 # Captures Ctrl-C for debuging DEBUG 189 #signal.signal(signal.SIGINT, control_c) 190 191 last_report_time = None 192 193 BUG = set() 194 try: 195 # Enters into task scheduling 196 while pending_tasks: 197 wtime = schedule_time 198 199 # ask SGE for running jobs 200 if execution == "sge": 201 #sgeid2jobs = db.get_sge_tasks() 202 #qstat_jobs = sge.qstat() 203 pass 204 else: 205 qstat_jobs = None 206 207 # Show summary of pending tasks per thread 208 thread2tasks = defaultdict(list) 209 for task in pending_tasks: 210 thread2tasks[task.configid].append(task) 211 set_logindent(0) 212 log.log(28, "@@13: Updating tasks status:@@1: (%s)" % (ctime())) 213 info_lines = [] 214 for tid, tlist in six.iteritems(thread2tasks): 215 threadname = GLOBALS[tid]["_name"] 216 sizelist = ["%s" %getattr(_ts, "size", "?") for _ts in tlist] 217 info = "Thread @@13:%s@@1:: pending tasks: @@8:%s@@1: of sizes: %s" %( 218 threadname, len(tlist), ', '.join(sizelist)) 219 info_lines.append(info) 220 221 for line in info_lines: 222 log.log(28, line) 223 224 if GLOBALS["email"] and last_report_time is None: 225 last_report_time = time() 226 send_mail(GLOBALS["email"], "Your NPR process has started", '\n'.join(info_lines)) 227 228 ## ================================ 229 ## CHECK AND UPDATE CURRENT TASKS 230 checked_tasks = set() 231 check_start_time = time() 232 to_add_tasks = set() 233 234 GLOBALS["cached_status"] = {} 235 for task in sorted(pending_tasks, key=cmp_to_key(sort_tasks)): 236 # Avoids endless periods without new job submissions 237 elapsed_time = time() - check_start_time 238 #if not back_launcher and pending_tasks and \ 239 # elapsed_time > schedule_time * 2: 240 # log.log(26, "@@8:Interrupting task checks to schedule new jobs@@1:") 241 # db.commit() 242 # wtime = launch_jobs(sorted(pending_tasks, sort_tasks), 243 # execution, run_detached) 244 # check_start_time = time() 245 246 # Enter debuging mode if necessary 247 if debug and log.level > 10 and task.taskid.startswith(debug): 248 log.setLevel(10) 249 log.debug("ENTERING IN DEBUGGING MODE") 250 thread2tasks[task.configid].append(task) 251 252 # Update tasks and job statuses 253 254 if task.taskid not in checked_tasks: 255 try: 256 show_task_info(task) 257 task.status = task.get_status(qstat_jobs) 258 db.dataconn.commit() 259 if back_launcher and task.status not in set("DE"): 260 for j, cmd in task.iter_waiting_jobs(): 261 j.status = "Q" 262 GLOBALS["cached_status"][j.jobid] = "Q" 263 if j.jobid not in BUG: 264 if not os.path.exists(j.jobdir): 265 os.makedirs(j.jobdir) 266 for ifile, outpath in six.iteritems(j.input_files): 267 try: 268 _tid, _did = ifile.split(".") 269 _did = int(_did) 270 except (IndexError, ValueError): 271 dataid = ifile 272 else: 273 dataid = db.get_dataid(_tid, _did) 274 275 if not outpath: 276 outfile = pjoin(GLOBALS["input_dir"], ifile) 277 else: 278 outfile = pjoin(outpath, ifile) 279 280 if not os.path.exists(outfile): 281 open(outfile, "w").write(db.get_data(dataid)) 282 283 log.log(24, " @@8:Queueing @@1: %s from %s" %(j, task)) 284 if execution: 285 with open(pjoin(GLOBALS[task.configid]["_outpath"], "commands.log"), "a") as CMD_LOGGER: 286 print('\t'.join([task.tname, task.taskid, j.jobname, j.jobid, j.get_launch_cmd()]), file=CMD_LOGGER) 287 288 job_queue.put([j.jobid, j.cores, cmd, j.status_file]) 289 BUG.add(j.jobid) 290 291 update_task_states_recursively(task) 292 db.commit() 293 checked_tasks.add(task.taskid) 294 except TaskError as e: 295 log.error("Errors found in %s" %task) 296 import traceback 297 traceback.print_exc() 298 if GLOBALS["email"]: 299 threadname = GLOBALS[task.configid]["_name"] 300 send_mail(GLOBALS["email"], "Errors found in %s!" %threadname, 301 '\n'.join(map(str, [task, e.value, e.msg]))) 302 pending_tasks.discard(task) 303 thread_errors[task.configid].append([task, e.value, e.msg]) 304 continue 305 else: 306 # Set temporary Queued state to avoids launching 307 # jobs from clones 308 task.status = "Q" 309 if log.level < 24: 310 show_task_info(task) 311 312 if task.status == "D": 313 #db.commit() 314 show_task_info(task) 315 logindent(3) 316 317 318 # Log commands of every task 319 # if 'cmd_log_file' not in GLOBALS[task.configid]: 320 # GLOBALS[task.configid]['cmd_log_file'] = pjoin(GLOBALS[task.configid]["_outpath"], "cmd.log") 321 # O = open(GLOBALS[task.configid]['cmd_log_file'], "w") 322 # O.close() 323 324 # cmd_lines = get_cmd_log(task) 325 # CMD_LOG = open(GLOBALS[task.configid]['cmd_log_file'], "a") 326 # print(task, file=CMD_LOG) 327 # for c in cmd_lines: 328 # print(' '+'\t'.join(map(str, c)), file=CMD_LOG) 329 # CMD_LOG.close() 330 # 331 332 try: 333 #wkname = GLOBALS[task.configid]['_name'] 334 create_tasks = workflow_task_processor(task, task.target_wkname) 335 except TaskError as e: 336 log.error("Errors found in %s" %task) 337 pending_tasks.discard(task) 338 thread_errors[task.configid].append([task, e.value, e.msg]) 339 continue 340 else: 341 logindent(-3) 342 343 to_add_tasks.update(create_tasks) 344 pending_tasks.discard(task) 345 346 elif task.status == "E": 347 log.error("task contains errors: %s " %task) 348 log.error("Errors found in %s") 349 pending_tasks.discard(task) 350 thread_errors[task.configid].append([task, None, "Found (E) task status"]) 351 352 #db.commit() 353 #if not back_launcher: 354 # wtime = launch_jobs(sorted(pending_tasks, sort_tasks), 355 # execution, run_detached) 356 357 # Update global task list with recently added jobs to be check 358 # during next cycle 359 pending_tasks.update(to_add_tasks) 360 361 ## END CHECK AND UPDATE CURRENT TASKS 362 ## ================================ 363 364 if wtime: 365 set_logindent(0) 366 log.log(28, "@@13:Waiting %s seconds@@1:" %wtime) 367 sleep(wtime) 368 else: 369 sleep(schedule_time) 370 371 # Dump / show ended threads 372 error_lines = [] 373 for configid, etasks in six.iteritems(thread_errors): 374 error_lines.append("Thread @@10:%s@@1: contains errors:" %\ 375 (GLOBALS[configid]["_name"])) 376 for error in etasks: 377 error_lines.append(" ** %s" %error[0]) 378 e_obj = error[1] if error[1] else error[0] 379 error_path = e_obj.jobdir if isjob(e_obj) else e_obj.taskid 380 if e_obj is not error[0]: 381 error_lines.append(" -> %s" %e_obj) 382 error_lines.append(" -> %s" %error_path) 383 error_lines.append(" -> %s" %error[2]) 384 for eline in error_lines: 385 log.error(eline) 386 387 pending_threads = set([ts.configid for ts in pending_tasks]) 388 finished_threads = expected_threads - (pending_threads | set(thread_errors.keys())) 389 just_finished_lines = [] 390 finished_lines = [] 391 for configid in finished_threads: 392 # configid is the the same as threadid in master tasks 393 final_tree_file = pjoin(GLOBALS[configid]["_outpath"], 394 GLOBALS["inputname"] + ".final_tree") 395 threadname = GLOBALS[configid]["_name"] 396 397 if configid in past_threads: 398 log.log(28, "Done thread @@12:%s@@1: in %d iteration(s)", 399 threadname, past_threads[configid]) 400 finished_lines.append("Finished %s in %d iteration(s)" %( 401 threadname, past_threads[configid])) 402 else: 403 404 log.log(28, "Assembling final tree...") 405 main_tree, treeiters = assembly_tree(configid) 406 past_threads[configid] = treeiters - 1 407 408 log.log(28, "Done thread @@12:%s@@1: in %d iteration(s)", 409 threadname, past_threads[configid]) 410 411 412 log.log(28, "Writing final tree for @@13:%s@@1:\n %s\n %s", 413 threadname, final_tree_file+".nw", 414 final_tree_file+".nwx (newick extended)") 415 main_tree.write(outfile=final_tree_file+".nw") 416 main_tree.write(outfile=final_tree_file+ ".nwx", features=[], 417 format_root_node=True) 418 419 if hasattr(main_tree, "tree_phylip_alg"): 420 log.log(28, "Writing final tree alignment @@13:%s@@1:\n %s", 421 threadname, final_tree_file+".used_alg.fa") 422 423 alg = SeqGroup(get_stored_data(main_tree.tree_phylip_alg), format="iphylip_relaxed") 424 OUT = open(final_tree_file+".used_alg.fa", "w") 425 for name, seq, comments in alg: 426 realname = db.get_seq_name(name) 427 print(">%s\n%s" %(realname, seq), file=OUT) 428 OUT.close() 429 430 431 if hasattr(main_tree, "alg_path"): 432 log.log(28, "Writing root node alignment @@13:%s@@1:\n %s", 433 threadname, final_tree_file+".fa") 434 435 alg = SeqGroup(get_stored_data(main_tree.alg_path)) 436 OUT = open(final_tree_file+".fa", "w") 437 for name, seq, comments in alg: 438 realname = db.get_seq_name(name) 439 print(">%s\n%s" %(realname, seq), file=OUT) 440 OUT.close() 441 442 if hasattr(main_tree, "clean_alg_path"): 443 log.log(28, "Writing root node trimmed alignment @@13:%s@@1:\n %s", 444 threadname, final_tree_file+".trimmed.fa") 445 446 alg = SeqGroup(get_stored_data(main_tree.clean_alg_path)) 447 OUT = open(final_tree_file+".trimmed.fa", "w") 448 for name, seq, comments in alg: 449 realname = db.get_seq_name(name) 450 print(">%s\n%s" %(realname, seq), file=OUT) 451 OUT.close() 452 453 if norender == False: 454 log.log(28, "Generating tree image for @@13:%s@@1:\n %s", 455 threadname, final_tree_file+".png") 456 for lf in main_tree: 457 lf.add_feature("sequence", alg.get_seq(lf.safename)) 458 try: 459 from .visualize import draw_tree 460 draw_tree(main_tree, GLOBALS[configid], final_tree_file+".png") 461 except Exception as e: 462 log.warning('@@8:something went wrong when generating the tree image. Try manually :(@@1:') 463 if DEBUG: 464 import traceback, sys 465 traceback.print_exc(file=sys.stdout) 466 467 just_finished_lines.append("Finished %s in %d iteration(s)" %( 468 threadname, past_threads[configid])) 469 if GLOBALS["email"]: 470 if not pending_tasks: 471 all_lines = finished_lines + just_finished_lines + error_lines 472 send_mail(GLOBALS["email"], "Your NPR process has ended", '\n'.join(all_lines)) 473 474 elif GLOBALS["email_report_time"] and time() - last_report_time >= \ 475 GLOBALS["email_report_time"]: 476 all_lines = info_lines + error_lines + just_finished_lines 477 send_mail(GLOBALS["email"], "Your NPR report", '\n'.join(all_lines)) 478 last_report_time = time() 479 480 elif just_finished_lines: 481 send_mail(GLOBALS["email"], "Finished threads!", 482 '\n'.join(just_finished_lines)) 483 484 log.log(26, "") 485 except: 486 raise 487 488 if thread_errors: 489 log.error("Done with ERRORS") 490 else: 491 log.log(28, "Done") 492 493 return thread_errors 494 495 496def background_job_launcher(job_queue, run_detached, schedule_time, max_cores): 497 running_jobs = {} 498 visited_ids = set() 499 # job_queue = [jid, cores, cmd, status_file] 500 GLOBALS["myid"] = 'back_launcher' 501 finished_states = set("ED") 502 cores_used = 0 503 dups = set() 504 pending_jobs = deque() 505 try: 506 while True: 507 launched = 0 508 done_jobs = set() 509 cores_used = 0 510 for jid, (cores, cmd, st_file, pid) in six.iteritems(running_jobs): 511 process_done = pid.poll() if pid else None 512 try: 513 st = open(st_file).read(1) 514 except IOError: 515 st = "?" 516 #print pid.poll(), pid.pid, st 517 if st in finished_states: 518 done_jobs.add(jid) 519 elif process_done is not None and st == "R": 520 # check if a running job is actually running 521 print("LOST PROCESS", pid, jid) 522 ST=open(st_file, "w"); ST.write("E"); ST.flush(); ST.close() 523 done_jobs.add(jid) 524 else: 525 cores_used += cores 526 527 for d in done_jobs: 528 del running_jobs[d] 529 530 cores_avail = max_cores - cores_used 531 for i in range(cores_avail): 532 try: 533 jid, cores, cmd, st_file = job_queue.get(False) 534 except QueueEmpty: 535 pass 536 else: 537 pending_jobs.append([jid, cores, cmd, st_file]) 538 539 if pending_jobs and pending_jobs[0][1] <= cores_avail: 540 jid, cores, cmd, st_file = pending_jobs.popleft() 541 if jid in visited_ids: 542 dups.add(jid) 543 print("DUPLICATED execution!!!!!!!!!!!! This should not occur!", jid) 544 continue 545 elif pending_jobs: 546 log.log(28, "@@8:waiting for %s cores" %pending_jobs[0][1]) 547 break 548 else: 549 break 550 551 ST=open(st_file, "w"); ST.write("R"); ST.flush(); ST.close() 552 try: 553 if run_detached: 554 cmd += " &" 555 running_proc = None 556 subprocess.call(cmd, shell=True) 557 else: 558 # create a process group, so I can kill the thread if necessary 559 running_proc = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid) 560 561 except Exception as e: 562 print(e) 563 ST=open(st_file, "w"); ST.write("E"); ST.flush(); ST.close() 564 else: 565 launched += 1 566 running_jobs[jid] = [cores, cmd, st_file, running_proc] 567 cores_avail -= cores 568 cores_used += cores 569 visited_ids.add(jid) 570 try: 571 waiting_jobs = job_queue.qsize() + len(pending_jobs) 572 except NotImplementedError: # OSX does not support qsize 573 waiting_jobs = len(pending_jobs) 574 575 log.log(28, "@@8:Launched@@1: %s jobs. %d(R), %s(W). Cores usage: %s/%s", 576 launched, len(running_jobs), waiting_jobs, cores_used, max_cores) 577 for _d in dups: 578 print("duplicate bug", _d) 579 580 sleep(schedule_time) 581 except: 582 if len(running_jobs): 583 print(' Killing %s running jobs...' %len(running_jobs), file=sys.stderr) 584 for jid, (cores, cmd, st_file, pid) in six.iteritems(running_jobs): 585 if pid: 586 #print >>sys.stderr, ".", 587 #sys.stderr.flush() 588 try: 589 os.killpg(pid.pid, signal.SIGTERM) 590 except: 591 print("Ooops, the process", pid.pid, "could not be terminated!") 592 pass 593 try: 594 open(st_file, "w").write("E") 595 except: 596 print("Ooops,", st_file, "could not be labeled as Error task. Please remove file before resuming the analysis.") 597 598 sys.exit(0) 599 600 601def launch_detached_process(cmd): 602 os.system(cmd) 603 604def color_status(status): 605 if status == "D": 606 stcolor = "@@06:" 607 elif status == "E": 608 stcolor = "@@03:" 609 elif status == "R": 610 stcolor = "@@05:" 611 else: 612 stcolor = "" 613 return "%s%s@@1:" %(stcolor, status) 614 615def show_task_info(task): 616 log.log(26, "") 617 set_logindent(1) 618 log.log(28, "(%s) %s" % (color_status(task.status), task)) 619 logindent(2) 620 st_info = ', '.join(["%d(%s)" % (v, k) for k, v in 621 six.iteritems(task.job_status)]) 622 log.log(26, "%d jobs: %s" %(len(task.jobs), st_info)) 623 tdir = task.taskid 624 tdir = tdir.lstrip("/") 625 log.log(20, "TaskDir: %s" %tdir) 626 if task.status == "L": 627 logindent(-2) 628 log.warning("Some jobs within the task [%s] are marked as (L)ost," 629 " meaning that although they look as running," 630 " its execution could not be tracked. NPR will" 631 " continue execution with other pending tasks." 632 %task) 633 logindent(2) 634 logindent(2) 635 # Shows details about jobs 636 for j in task.jobs: 637 if j.status == "D": 638 log.log(20, "(%s): %s", j.status, j) 639 else: 640 log.log(24, "(%s): %s", j.status, j) 641 logindent(-2) 642 643 644def check_cores(j, cores_used, cores_total, execution): 645 if j.cores > cores_total: 646 raise ConfigError("Job [%s] is trying to be executed using [%d] cores." 647 " However, the program is limited to [%d] core(s)." 648 " Use the --multicore option to enable more cores." % 649 (j, j.cores, cores_total)) 650 elif execution =="insitu" and j.cores > cores_total-cores_used: 651 log.log(22, "Job [%s] awaiting [%d] core(s)" 652 % (j, j.cores)) 653 return False 654 else: 655 return True 656 657def launch_detached(cmd): 658 pid1 = os.fork() 659 if pid1 == 0: 660 pid2 = os.fork() 661 662 if pid2 == 0: 663 os.setsid() 664 pid3 = os.fork() 665 if pid3 == 0: 666 os.chdir("/") 667 os.umask(0) 668 P = subprocess.Popen(cmd, shell=True) 669 P.wait() 670 os._exit(0) 671 else: 672 # exit() or _exit()? See below. 673 os._exit(0) 674 else: 675 # exit() or _exit()? 676 # _exit is like exit(), but it doesn't call any functions registered 677 # with atexit (and on_exit) or any registered signal handlers. It also 678 # closes any open file descriptors. Using exit() may cause all stdio 679 # streams to be flushed twice and any temporary files may be unexpectedly 680 # removed. It's therefore recommended that child branches of a fork() 681 # and the parent branch(es) of a daemon use _exit(). 682 os._exit(0) 683 else: 684 return 685 686