# #START_LICENSE###########################################################
#
#
# This file is part of the Environment for Tree Exploration program
# (ETE).  http://etetoolkit.org
#
# ETE is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ETE is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ETE.  If not, see <http://www.gnu.org/licenses/>.
#
#
#                     ABOUT THE ETE PACKAGE
#                     =====================
#
# ETE is distributed under the GPL copyleft license (2008-2015).
#
# If you make use of ETE in published work, please cite:
#
# Jaime Huerta-Cepas, Joaquin Dopazo and Toni Gabaldon.
# ETE: a python Environment for Tree Exploration. BMC
# Bioinformatics 2010, 11:24. doi:10.1186/1471-2105-11-24
#
# Note that extra references to the specific methods implemented in
# the toolkit may be available in the documentation.
#
# More info at http://etetoolkit.org. Contact: huerta@embl.de
#
#
# #END_LICENSE#############################################################
from __future__ import absolute_import
from __future__ import print_function

import sys
import os
import signal
import subprocess
from multiprocessing import Process, Queue
from six.moves.queue import Empty as QueueEmpty
from time import sleep, ctime, time
from collections import defaultdict, deque
import re
import logging
import six
from six.moves import map
from six.moves import range
log = logging.getLogger("main")

from . import db
from .errors import ConfigError, TaskError
from .logger import set_logindent, logindent, get_logindent
from .utils import (generate_id, PhyloTree, NodeStyle, Tree,
                    DEBUG, NPR_TREE_STYLE, faces, GLOBALS,
                    basename, pjoin, ask, send_mail, pid_up, SeqGroup, cmp)
from .master_task import (isjob, update_task_states_recursively,
                          store_task_data_recursively,
                          remove_task_dir_recursively,
                          update_job_status)
from .workflow.common import assembly_tree, get_cmd_log
def cmp_to_key(mycmp):
    'Convert a cmp= function into a key= function'
    class K:
        def __init__(self, obj, *args):
            self.obj = obj
        def __lt__(self, other):
            return mycmp(self.obj, other.obj) < 0
        def __gt__(self, other):
            return mycmp(self.obj, other.obj) > 0
        def __eq__(self, other):
            return mycmp(self.obj, other.obj) == 0
        def __le__(self, other):
            return mycmp(self.obj, other.obj) <= 0
        def __ge__(self, other):
            return mycmp(self.obj, other.obj) >= 0
        def __ne__(self, other):
            return mycmp(self.obj, other.obj) != 0
    return K

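# Note: "cmp_to_key" mirrors functools.cmp_to_key and is kept here for
# compatibility. A minimal illustrative usage, matching how it is applied
# inside "schedule" below (task objects are hypothetical here):
#
#   ordered = sorted(pending_tasks, key=cmp_to_key(sort_tasks))
#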
def debug(_signal, _frame):
    import pdb
    pdb.set_trace()

def control_c(_signal, _frame):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    db.commit()

    ver = {28: "0", 26: "1", 24: "2", 22: "3", 20: "4", 10: "5"}
    ver_level = log.level

    print('\n\nYou pressed Ctrl+C!')
    print('q) quit')
    print('v) change verbosity level:', ver.get(ver_level, ver_level))
    print('d) enter debug mode')
    print('c) continue execution')
    key = ask("   Choose:", ["q", "v", "d", "c"])
    if key == "q":
        raise KeyboardInterrupt
    elif key == "d":
        # Defer pdb to a SIGALRM handler so the debugger starts outside
        # of this SIGINT handler.
        signal.signal(signal.SIGALRM, debug)
        signal.alarm(1)
        return
    elif key == "v":
        vl = ask("new level", sorted(ver.values()))
        new_level = sorted(list(ver.keys()), reverse=True)[int(vl)]
        log.setLevel(new_level)
    signal.signal(signal.SIGINT, control_c)

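# Sketch of how the Ctrl-C menu above is wired in. The actual call is
# currently commented out inside "schedule"; this is illustrative only:
#
#   signal.signal(signal.SIGINT, control_c)
#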
def sort_tasks(x, y):
    # cmp-style comparator: lower type priority first, then larger task
    # size first, then thread id as a tie-breaker.
    priority = {
        "treemerger": 1,
        "tree": 2,
        "mchooser": 3,
        "alg": 4,
        "concat_alg": 5,
        "acleaner": 6,
        "msf": 7,
        "cog_selector": 8}

    x_type_prio = priority.get(x.ttype, 100)
    y_type_prio = priority.get(y.ttype, 100)

    prio_cmp = cmp(x_type_prio, y_type_prio)
    if prio_cmp == 0:
        x_size = getattr(x, "size", 0)
        y_size = getattr(y, "size", 0)
        size_cmp = cmp(x_size, y_size) * -1
        if size_cmp == 0:
            return cmp(x.threadid, y.threadid)
        else:
            return size_cmp
    else:
        return prio_cmp

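# Illustrative ordering (hypothetical task objects): a "tree" task sorts
# before an "alg" task regardless of size, and two "alg" tasks are ordered
# by descending size:
#
#   sort_tasks(tree_task, alg_task)   # -> negative (tree first)
#   sort_tasks(big_alg, small_alg)    # -> negative (bigger alg first)
#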
def get_stored_data(fileid):
    # "fileid" is either a raw data id, or a "taskid.datanum" pointer that
    # must be resolved through the db first.
    try:
        _tid, _did = fileid.split(".")
        _did = int(_did)
    except (IndexError, ValueError):
        dataid = fileid
    else:
        dataid = db.get_dataid(_tid, _did)
    return db.get_data(dataid)

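# Both accepted forms, with hypothetical id values:
#
#   get_stored_data("a1b2c3")     # raw data id, fetched directly
#   get_stored_data("task9f.0")   # task id + data index, via db.get_dataid
#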
def schedule(workflow_task_processor, pending_tasks, schedule_time, execution, debug, norender):
    # Adjust debug mode
    if debug == "all":
        log.setLevel(10)
    pending_tasks = set(pending_tasks)

    ## ===================================
    ## INITIALIZE BASIC VARS
    execution, run_detached = execution
    thread2tasks = defaultdict(list)
    for task in pending_tasks:
        thread2tasks[task.configid].append(task)
    expected_threads = set(thread2tasks.keys())
    past_threads = {}
    thread_errors = defaultdict(list)
    ## END OF VARS AND SHORTCUTS
    ## ===================================

    cores_total = GLOBALS["_max_cores"]
    if cores_total > 0:
        job_queue = Queue()

        back_launcher = Process(target=background_job_launcher,
                                args=(job_queue, run_detached,
                                      GLOBALS["launch_time"], cores_total))
        back_launcher.start()
    else:
        job_queue = None
        back_launcher = None

    GLOBALS["_background_scheduler"] = back_launcher
    GLOBALS["_job_queue"] = job_queue
    # Captures Ctrl-C for debugging
    #signal.signal(signal.SIGINT, control_c)

    last_report_time = None

    # Tracks job ids that have already been queued, so clones of the same
    # job are never submitted twice.
    BUG = set()
    try:
        # Enters into task scheduling
        while pending_tasks:
            wtime = schedule_time

            # ask SGE for running jobs
            if execution == "sge":
                #sgeid2jobs = db.get_sge_tasks()
                #qstat_jobs = sge.qstat()
                pass
            else:
                qstat_jobs = None

            # Show summary of pending tasks per thread
            thread2tasks = defaultdict(list)
            for task in pending_tasks:
                thread2tasks[task.configid].append(task)
            set_logindent(0)
            log.log(28, "@@13: Updating tasks status:@@1: (%s)" % (ctime()))
            info_lines = []
            for tid, tlist in six.iteritems(thread2tasks):
                threadname = GLOBALS[tid]["_name"]
                sizelist = ["%s" % getattr(_ts, "size", "?") for _ts in tlist]
                info = "Thread @@13:%s@@1:: pending tasks: @@8:%s@@1: of sizes: %s" % (
                    threadname, len(tlist), ', '.join(sizelist))
                info_lines.append(info)

            for line in info_lines:
                log.log(28, line)

            if GLOBALS["email"] and last_report_time is None:
                last_report_time = time()
                send_mail(GLOBALS["email"], "Your NPR process has started", '\n'.join(info_lines))

            ## ================================
            ## CHECK AND UPDATE CURRENT TASKS
            checked_tasks = set()
            check_start_time = time()
            to_add_tasks = set()

            GLOBALS["cached_status"] = {}
            for task in sorted(pending_tasks, key=cmp_to_key(sort_tasks)):
                # Avoids endless periods without new job submissions
                elapsed_time = time() - check_start_time
                #if not back_launcher and pending_tasks and \
                #        elapsed_time > schedule_time * 2:
                #    log.log(26, "@@8:Interrupting task checks to schedule new jobs@@1:")
                #    db.commit()
                #    wtime = launch_jobs(sorted(pending_tasks, sort_tasks),
                #                        execution, run_detached)
                #    check_start_time = time()

                # Enter debugging mode if necessary
                if debug and log.level > 10 and task.taskid.startswith(debug):
                    log.setLevel(10)
                    log.debug("ENTERING DEBUGGING MODE")
                thread2tasks[task.configid].append(task)

                # Update tasks and job statuses

                if task.taskid not in checked_tasks:
                    try:
                        show_task_info(task)
                        task.status = task.get_status(qstat_jobs)
                        db.dataconn.commit()
                        if back_launcher and task.status not in set("DE"):
                            for j, cmd in task.iter_waiting_jobs():
                                j.status = "Q"
                                GLOBALS["cached_status"][j.jobid] = "Q"
                                if j.jobid not in BUG:
                                    if not os.path.exists(j.jobdir):
                                        os.makedirs(j.jobdir)
                                    for ifile, outpath in six.iteritems(j.input_files):
                                        try:
                                            _tid, _did = ifile.split(".")
                                            _did = int(_did)
                                        except (IndexError, ValueError):
                                            dataid = ifile
                                        else:
                                            dataid = db.get_dataid(_tid, _did)

                                        if not outpath:
                                            outfile = pjoin(GLOBALS["input_dir"], ifile)
                                        else:
                                            outfile = pjoin(outpath, ifile)

                                        if not os.path.exists(outfile):
                                            with open(outfile, "w") as OUT:
                                                OUT.write(db.get_data(dataid))

                                    log.log(24, "  @@8:Queueing @@1: %s from %s" % (j, task))
                                    if execution:
                                        with open(pjoin(GLOBALS[task.configid]["_outpath"], "commands.log"), "a") as CMD_LOGGER:
                                            print('\t'.join([task.tname, task.taskid, j.jobname, j.jobid, j.get_launch_cmd()]), file=CMD_LOGGER)

                                        job_queue.put([j.jobid, j.cores, cmd, j.status_file])
                                BUG.add(j.jobid)

                        update_task_states_recursively(task)
                        db.commit()
                        checked_tasks.add(task.taskid)
                    except TaskError as e:
                        log.error("Errors found in %s" % task)
                        import traceback
                        traceback.print_exc()
                        if GLOBALS["email"]:
                            threadname = GLOBALS[task.configid]["_name"]
                            send_mail(GLOBALS["email"], "Errors found in %s!" % threadname,
                                      '\n'.join(map(str, [task, e.value, e.msg])))
                        pending_tasks.discard(task)
                        thread_errors[task.configid].append([task, e.value, e.msg])
                        continue
                else:
                    # Set a temporary Queued state to avoid launching
                    # jobs from clones
                    task.status = "Q"
                    if log.level < 24:
                        show_task_info(task)

                if task.status == "D":
                    #db.commit()
                    show_task_info(task)
                    logindent(3)


                    # Log commands of every task
                    # if 'cmd_log_file' not in GLOBALS[task.configid]:
                    #      GLOBALS[task.configid]['cmd_log_file'] = pjoin(GLOBALS[task.configid]["_outpath"], "cmd.log")
                    #      O = open(GLOBALS[task.configid]['cmd_log_file'], "w")
                    #      O.close()

                    # cmd_lines =  get_cmd_log(task)
                    # CMD_LOG = open(GLOBALS[task.configid]['cmd_log_file'], "a")
                    # print(task, file=CMD_LOG)
                    # for c in cmd_lines:
                    #     print('   '+'\t'.join(map(str, c)), file=CMD_LOG)
                    # CMD_LOG.close()
                    #

                    try:
                        #wkname = GLOBALS[task.configid]['_name']
                        create_tasks = workflow_task_processor(task, task.target_wkname)
                    except TaskError as e:
                        log.error("Errors found in %s" % task)
                        pending_tasks.discard(task)
                        thread_errors[task.configid].append([task, e.value, e.msg])
                        continue
                    else:
                        logindent(-3)

                        to_add_tasks.update(create_tasks)
                        pending_tasks.discard(task)

                elif task.status == "E":
                    log.error("Task contains errors: %s" % task)
                    pending_tasks.discard(task)
                    thread_errors[task.configid].append([task, None, "Found (E) task status"])

            #db.commit()
            #if not back_launcher:
            #    wtime = launch_jobs(sorted(pending_tasks, sort_tasks),
            #                    execution, run_detached)

            # Update global task list with recently added jobs to be checked
            # during the next cycle
            pending_tasks.update(to_add_tasks)

            ## END CHECK AND UPDATE CURRENT TASKS
            ## ================================

            if wtime:
                set_logindent(0)
                log.log(28, "@@13:Waiting %s seconds@@1:" % wtime)
                sleep(wtime)
            else:
                sleep(schedule_time)

            # Dump / show ended threads
            error_lines = []
            for configid, etasks in six.iteritems(thread_errors):
                error_lines.append("Thread @@10:%s@@1: contains errors:" %\
                            (GLOBALS[configid]["_name"]))
                for error in etasks:
                    error_lines.append(" ** %s" % error[0])
                    e_obj = error[1] if error[1] else error[0]
                    error_path = e_obj.jobdir if isjob(e_obj) else e_obj.taskid
                    if e_obj is not error[0]:
                        error_lines.append("      -> %s" % e_obj)
                    error_lines.append("      -> %s" % error_path)
                    error_lines.append("        -> %s" % error[2])
            for eline in error_lines:
                log.error(eline)

            pending_threads = set([ts.configid for ts in pending_tasks])
            finished_threads = expected_threads - (pending_threads | set(thread_errors.keys()))
            just_finished_lines = []
            finished_lines = []
            for configid in finished_threads:
                # configid is the same as threadid in master tasks
                final_tree_file = pjoin(GLOBALS[configid]["_outpath"],
                                        GLOBALS["inputname"] + ".final_tree")
                threadname = GLOBALS[configid]["_name"]

                if configid in past_threads:
                    log.log(28, "Done thread @@12:%s@@1: in %d iteration(s)",
                            threadname, past_threads[configid])
                    finished_lines.append("Finished %s in %d iteration(s)" % (
                            threadname, past_threads[configid]))
                else:
                    log.log(28, "Assembling final tree...")
                    main_tree, treeiters = assembly_tree(configid)
                    past_threads[configid] = treeiters - 1

                    log.log(28, "Done thread @@12:%s@@1: in %d iteration(s)",
                            threadname, past_threads[configid])

                    log.log(28, "Writing final tree for @@13:%s@@1:\n   %s\n   %s",
                            threadname, final_tree_file+".nw",
                            final_tree_file+".nwx (newick extended)")
                    main_tree.write(outfile=final_tree_file+".nw")
                    main_tree.write(outfile=final_tree_file+".nwx", features=[],
                                    format_root_node=True)

                    # "alg" is only bound when one of the alignment files below
                    # exists; initialize it so the rendering step can test it.
                    alg = None
                    if hasattr(main_tree, "tree_phylip_alg"):
                        log.log(28, "Writing final tree alignment @@13:%s@@1:\n   %s",
                                threadname, final_tree_file+".used_alg.fa")

                        alg = SeqGroup(get_stored_data(main_tree.tree_phylip_alg), format="iphylip_relaxed")
                        with open(final_tree_file+".used_alg.fa", "w") as OUT:
                            for name, seq, comments in alg:
                                realname = db.get_seq_name(name)
                                print(">%s\n%s" % (realname, seq), file=OUT)

                    if hasattr(main_tree, "alg_path"):
                        log.log(28, "Writing root node alignment @@13:%s@@1:\n   %s",
                                threadname, final_tree_file+".fa")

                        alg = SeqGroup(get_stored_data(main_tree.alg_path))
                        with open(final_tree_file+".fa", "w") as OUT:
                            for name, seq, comments in alg:
                                realname = db.get_seq_name(name)
                                print(">%s\n%s" % (realname, seq), file=OUT)

                    if hasattr(main_tree, "clean_alg_path"):
                        log.log(28, "Writing root node trimmed alignment @@13:%s@@1:\n   %s",
                                threadname, final_tree_file+".trimmed.fa")

                        alg = SeqGroup(get_stored_data(main_tree.clean_alg_path))
                        with open(final_tree_file+".trimmed.fa", "w") as OUT:
                            for name, seq, comments in alg:
                                realname = db.get_seq_name(name)
                                print(">%s\n%s" % (realname, seq), file=OUT)

                    if not norender:
                        log.log(28, "Generating tree image for @@13:%s@@1:\n   %s",
                                threadname, final_tree_file+".png")
                        if alg:
                            for lf in main_tree:
                                lf.add_feature("sequence", alg.get_seq(lf.safename))
                        try:
                            from .visualize import draw_tree
                            draw_tree(main_tree, GLOBALS[configid], final_tree_file+".png")
                        except Exception as e:
                            log.warning('@@8:Something went wrong while generating the tree image. Try manually :(@@1:')
                            if DEBUG:
                                import traceback, sys
                                traceback.print_exc(file=sys.stdout)

                    just_finished_lines.append("Finished %s in %d iteration(s)" % (
                            threadname, past_threads[configid]))
            if GLOBALS["email"]:
                if not pending_tasks:
                    all_lines = finished_lines + just_finished_lines + error_lines
                    send_mail(GLOBALS["email"], "Your NPR process has ended", '\n'.join(all_lines))

                elif GLOBALS["email_report_time"] and time() - last_report_time >= \
                        GLOBALS["email_report_time"]:
                    all_lines = info_lines + error_lines + just_finished_lines
                    send_mail(GLOBALS["email"], "Your NPR report", '\n'.join(all_lines))
                    last_report_time = time()

                elif just_finished_lines:
                    send_mail(GLOBALS["email"], "Finished threads!",
                              '\n'.join(just_finished_lines))

            log.log(26, "")
    except:
        raise

    if thread_errors:
        log.error("Done with ERRORS")
    else:
        log.log(28, "Done")

    return thread_errors

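# A minimal sketch of how "schedule" is driven (illustrative only; the names
# "processor" and "tasks" are hypothetical placeholders, and "execution" is
# passed as a (backend, run_detached) tuple, unpacked at the top of the
# function):
#
#   errors = schedule(processor, tasks, schedule_time=2,
#                     execution=("insitu", False), debug=None, norender=True)
#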
def background_job_launcher(job_queue, run_detached, schedule_time, max_cores):
    running_jobs = {}
    visited_ids = set()
    # job_queue items are [jid, cores, cmd, status_file]
    GLOBALS["myid"] = 'back_launcher'
    finished_states = set("ED")
    cores_used = 0
    dups = set()
    pending_jobs = deque()
    try:
        while True:
            launched = 0
            done_jobs = set()
            cores_used = 0
            for jid, (cores, cmd, st_file, pid) in six.iteritems(running_jobs):
                process_done = pid.poll() if pid else None
                try:
                    st = open(st_file).read(1)
                except IOError:
                    st = "?"
                #print pid.poll(), pid.pid, st
                if st in finished_states:
                    done_jobs.add(jid)
                elif process_done is not None and st == "R":
                    # check if a running job is actually running
                    print("LOST PROCESS", pid, jid)
                    with open(st_file, "w") as ST:
                        ST.write("E")
                    done_jobs.add(jid)
                else:
                    cores_used += cores

            for d in done_jobs:
                del running_jobs[d]

            cores_avail = max_cores - cores_used
            for i in range(cores_avail):
                try:
                    jid, cores, cmd, st_file = job_queue.get(False)
                except QueueEmpty:
                    pass
                else:
                    pending_jobs.append([jid, cores, cmd, st_file])

                if pending_jobs and pending_jobs[0][1] <= cores_avail:
                    jid, cores, cmd, st_file = pending_jobs.popleft()
                    if jid in visited_ids:
                        dups.add(jid)
                        print("DUPLICATED execution!!!!!!!!!!!! This should not occur!", jid)
                        continue
                elif pending_jobs:
                    log.log(28, "@@8:waiting for %s cores" % pending_jobs[0][1])
                    break
                else:
                    break
                with open(st_file, "w") as ST:
                    ST.write("R")
                try:
                    if run_detached:
                        cmd += " &"
                        running_proc = None
                        subprocess.call(cmd, shell=True)
                    else:
                        # create a process group, so the whole job tree can be
                        # killed later if necessary
                        running_proc = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid)

                except Exception as e:
                    print(e)
                    with open(st_file, "w") as ST:
                        ST.write("E")
                else:
                    launched += 1
                    running_jobs[jid] = [cores, cmd, st_file, running_proc]
                    cores_avail -= cores
                    cores_used += cores
                    visited_ids.add(jid)
            try:
                waiting_jobs = job_queue.qsize() + len(pending_jobs)
            except NotImplementedError: # OSX does not support qsize
                waiting_jobs = len(pending_jobs)

            log.log(28, "@@8:Launched@@1: %s jobs. %d(R), %s(W). Cores usage: %s/%s",
                    launched, len(running_jobs), waiting_jobs, cores_used, max_cores)
            for _d in dups:
                print("duplicate bug", _d)

            sleep(schedule_time)
    except:
        # Any exception (including KeyboardInterrupt) aborts the launcher;
        # terminate whatever is still running before exiting.
        if len(running_jobs):
            print(' Killing %s running jobs...' % len(running_jobs), file=sys.stderr)
            for jid, (cores, cmd, st_file, pid) in six.iteritems(running_jobs):
                if pid:
                    #print >>sys.stderr, ".",
                    #sys.stderr.flush()
                    try:
                        os.killpg(pid.pid, signal.SIGTERM)
                    except:
                        print("Oops, the process", pid.pid, "could not be terminated!")
                    try:
                        open(st_file, "w").write("E")
                    except:
                        print("Oops,", st_file, "could not be labeled as an errored task. Please remove the file before resuming the analysis.")

    sys.exit(0)


def launch_detached_process(cmd):
    os.system(cmd)

def color_status(status):
    if status == "D":
        stcolor = "@@06:"
    elif status == "E":
        stcolor = "@@03:"
    elif status == "R":
        stcolor = "@@05:"
    else:
        stcolor = ""
    return "%s%s@@1:" % (stcolor, status)

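# Illustrative examples, using the status letters found throughout this
# module (D=done, E=error, R=running, Q=queued):
#
#   color_status("D")  # -> "@@06:D@@1:" (colored as done)
#   color_status("Q")  # -> "Q@@1:" (no color assigned)
#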
def show_task_info(task):
    log.log(26, "")
    set_logindent(1)
    log.log(28, "(%s) %s" % (color_status(task.status), task))
    logindent(2)
    st_info = ', '.join(["%d(%s)" % (v, k) for k, v in
                         six.iteritems(task.job_status)])
    log.log(26, "%d jobs: %s" % (len(task.jobs), st_info))
    tdir = task.taskid
    tdir = tdir.lstrip("/")
    log.log(20, "TaskDir: %s" % tdir)
    if task.status == "L":
        logindent(-2)
        log.warning("Some jobs within the task [%s] are marked as (L)ost,"
                    " meaning that although they appear to be running,"
                    " their execution could not be tracked. NPR will"
                    " continue execution with other pending tasks."
                    % task)
        logindent(2)
    logindent(2)
    # Shows details about jobs
    for j in task.jobs:
        if j.status == "D":
            log.log(20, "(%s): %s", j.status, j)
        else:
            log.log(24, "(%s): %s", j.status, j)
    logindent(-2)


def check_cores(j, cores_used, cores_total, execution):
    if j.cores > cores_total:
        raise ConfigError("Job [%s] requires [%d] cores, but the program is"
                          " limited to [%d] core(s). Use the --multicore"
                          " option to enable more cores." %
                          (j, j.cores, cores_total))
    elif execution == "insitu" and j.cores > cores_total - cores_used:
        log.log(22, "Job [%s] awaiting [%d] core(s)"
                 % (j, j.cores))
        return False
    else:
        return True

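# Illustrative checks for a hypothetical job object with j.cores == 8:
#
#   check_cores(j, cores_used=0,  cores_total=4,  execution="insitu")  # raises ConfigError
#   check_cores(j, cores_used=10, cores_total=16, execution="insitu")  # False: only 6 cores free
#   check_cores(j, cores_used=2,  cores_total=16, execution="insitu")  # True: 14 cores free
#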
def launch_detached(cmd):
    pid1 = os.fork()
    if pid1 == 0:
        pid2 = os.fork()

        if pid2 == 0:
            os.setsid()
            pid3 = os.fork()
            if pid3 == 0:
                os.chdir("/")
                os.umask(0)
                P = subprocess.Popen(cmd, shell=True)
                P.wait()
                os._exit(0)
            else:
                # exit() or _exit()?  See below.
                os._exit(0)
        else:
            # exit() or _exit()?
            # _exit is like exit(), but it doesn't call any functions registered
            # with atexit (and on_exit) or any registered signal handlers.  It also
            # closes any open file descriptors.  Using exit() may cause all stdio
            # streams to be flushed twice and any temporary files may be unexpectedly
            # removed.  It's therefore recommended that child branches of a fork()
            # and the parent branch(es) of a daemon use _exit().
            os._exit(0)
    else:
        return
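# The function above is the classic fork/setsid/fork daemonization. In sketch
# form (illustrative; "long_running.sh" is a hypothetical command):
#
#   launch_detached("sh long_running.sh")  # returns immediately; the command
#                                          # keeps running reparented to init,
#                                          # detached from this session/tty
#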