1 /*
2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2008 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2007-2013 Los Alamos National Security, LLC.  All rights
13  *                         reserved.
14  * Copyright (c) 2008      Institut National de Recherche en Informatique
15  *                         et Automatique. All rights reserved.
16  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
17  * $COPYRIGHT$
18  *
19  * Additional copyrights may follow
20  *
21  * $HEADER$
22  */
23 
24 
25 #include "orte_config.h"
26 
27 #include <string.h>
28 #include <assert.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_QUEUE_H
33 #include <sys/queue.h>
34 #endif
35 #include <errno.h>
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39 #ifdef HAVE_SYS_TYPES_H
40 #include <sys/types.h>
41 #endif
42 #include <fcntl.h>
43 #include <stdlib.h>
44 #include <signal.h>
45 #include <stdio.h>
46 #include <sys/stat.h>
47 #ifdef HAVE_SYS_WAIT_H
48 #include <sys/wait.h>
49 #endif
50 
51 #include "opal/dss/dss_types.h"
52 #include "opal/class/opal_object.h"
53 #include "opal/util/output.h"
54 #include "opal/class/opal_list.h"
55 #include "opal/mca/event/event.h"
56 #include "opal/threads/mutex.h"
57 #include "opal/threads/condition.h"
58 #include "opal/sys/atomic.h"
59 
60 #include "orte/constants.h"
61 #include "orte/mca/errmgr/errmgr.h"
62 #include "orte/util/name_fns.h"
63 #include "orte/util/threads.h"
64 #include "orte/runtime/orte_globals.h"
65 
66 #include "orte/runtime/orte_wait.h"
67 
68 /* Timer Object Declaration */
timer_const(orte_timer_t * tm)69 static void timer_const(orte_timer_t *tm)
70 {
71     tm->ev = opal_event_alloc();
72     tm->payload = NULL;
73 }
timer_dest(orte_timer_t * tm)74 static void timer_dest(orte_timer_t *tm)
75 {
76     opal_event_free(tm->ev);
77 }
78 OBJ_CLASS_INSTANCE(orte_timer_t,
79                    opal_object_t,
80                    timer_const,
81                    timer_dest);
82 
83 /* Local objects */
84 typedef struct {
85     opal_list_item_t super;
86     opal_event_t ev;
87     orte_proc_t *child;
88     orte_wait_fn_t cbfunc;
89     void *cbdata;
90 } orte_wait_tracker_t;
wccon(orte_wait_tracker_t * p)91 static void wccon(orte_wait_tracker_t *p)
92 {
93     p->child = NULL;
94     p->cbfunc = NULL;
95     p->cbdata = NULL;
96 }
wcdes(orte_wait_tracker_t * p)97 static void wcdes(orte_wait_tracker_t *p)
98 {
99     if (NULL != p->child) {
100         OBJ_RELEASE(p->child);
101     }
102 }
103 static OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
104                           opal_list_item_t,
105                           wccon, wcdes);
106 
107 /* Local Variables */
108 static opal_event_t handler;
109 static opal_list_t pending_cbs;
110 
111 /* Local Function Prototypes */
112 static void wait_signal_callback(int fd, short event, void *arg);
113 
114 /* Interface Functions */
115 
orte_wait_disable(void)116 void orte_wait_disable(void)
117 {
118     opal_event_del(&handler);
119 }
120 
orte_wait_enable(void)121 void orte_wait_enable(void)
122 {
123     opal_event_add(&handler, NULL);
124 }
125 
orte_wait_init(void)126 int orte_wait_init(void)
127 {
128     OBJ_CONSTRUCT(&pending_cbs, opal_list_t);
129 
130     opal_event_set(orte_event_base,
131                    &handler, SIGCHLD, OPAL_EV_SIGNAL|OPAL_EV_PERSIST,
132                    wait_signal_callback,
133                    &handler);
134     opal_event_set_priority(&handler, ORTE_SYS_PRI);
135 
136     opal_event_add(&handler, NULL);
137     return ORTE_SUCCESS;
138 }
139 
140 
orte_wait_finalize(void)141 int orte_wait_finalize(void)
142 {
143     opal_event_del(&handler);
144 
145     /* clear out the pending cbs */
146     OPAL_LIST_DESTRUCT(&pending_cbs);
147 
148     return ORTE_SUCCESS;
149 }
150 
151 /* this function *must* always be called from
152  * within an event in the orte_event_base */
orte_wait_cb(orte_proc_t * child,orte_wait_fn_t callback,void * data)153 void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
154 {
155     orte_wait_tracker_t *t2;
156 
157     if (NULL == child || NULL == callback) {
158         /* bozo protection */
159         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
160         return;
161     }
162 
163     /* see if this proc is still alive */
164     if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
165         /* already heard this proc is dead, so just do the callback */
166         callback(child, data);
167         return;
168     }
169 
170    /* we just override any existing registration */
171     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
172         if (t2->child == child) {
173             t2->cbfunc = callback;
174             t2->cbdata = data;
175             return;
176         }
177     }
178     /* get here if this is a new registration */
179     t2 = OBJ_NEW(orte_wait_tracker_t);
180     OBJ_RETAIN(child);  // protect against race conditions
181     t2->child = child;
182     t2->cbfunc = callback;
183     t2->cbdata = data;
184     opal_list_append(&pending_cbs, &t2->super);
185 }
186 
cancel_callback(int fd,short args,void * cbdata)187 static void cancel_callback(int fd, short args, void *cbdata)
188 {
189     orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata;
190     orte_wait_tracker_t *t2;
191 
192     ORTE_ACQUIRE_OBJECT(trk);
193 
194     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
195         if (t2->child == trk->child) {
196             opal_list_remove_item(&pending_cbs, &t2->super);
197             OBJ_RELEASE(t2);
198             OBJ_RELEASE(trk);
199             return;
200         }
201     }
202 
203     OBJ_RELEASE(trk);
204 }
205 
orte_wait_cb_cancel(orte_proc_t * child)206 void orte_wait_cb_cancel(orte_proc_t *child)
207 {
208     orte_wait_tracker_t *trk;
209 
210     if (NULL == child) {
211         /* bozo protection */
212         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
213         return;
214     }
215 
216     /* push this into the event library for handling */
217     trk = OBJ_NEW(orte_wait_tracker_t);
218     OBJ_RETAIN(child);  // protect against race conditions
219     trk->child = child;
220     ORTE_THREADSHIFT(trk, orte_event_base, cancel_callback, ORTE_SYS_PRI);
221 }
222 
223 
224 /* callback from the event library whenever a SIGCHLD is received */
wait_signal_callback(int fd,short event,void * arg)225 static void wait_signal_callback(int fd, short event, void *arg)
226 {
227     opal_event_t *signal = (opal_event_t*) arg;
228     int status;
229     pid_t pid;
230     orte_wait_tracker_t *t2;
231 
232     ORTE_ACQUIRE_OBJECT(signal);
233 
234     if (SIGCHLD != OPAL_EVENT_SIGNAL(signal)) {
235         return;
236     }
237 
238     /* we can have multiple children leave but only get one
239      * sigchild callback, so reap all the waitpids until we
240      * don't get anything valid back */
241     while (1) {
242         pid = waitpid(-1, &status, WNOHANG);
243         if (-1 == pid && EINTR == errno) {
244             /* try it again */
245             continue;
246         }
247         /* if we got garbage, then nothing we can do */
248         if (pid <= 0) {
249             return;
250         }
251 
252         /* we are already in an event, so it is safe to access the list */
253         OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
254             if (pid == t2->child->pid) {
255                 /* found it! */
256                 t2->child->exit_code = status;
257                 if (NULL != t2->cbfunc) {
258                     t2->cbfunc(t2->child, t2->cbdata);
259                 }
260                 opal_list_remove_item(&pending_cbs, &t2->super);
261                 OBJ_RELEASE(t2);
262                 break;
263             }
264         }
265     }
266 }
267