1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2014-2020 Intel, Inc.  All rights reserved.
4  * Copyright (c) 2016      Mellanox Technologies, Inc.
5  *                         All rights reserved.
6  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
7  * Copyright (c) 2019      Research Organization for Information Science
8  *                         and Technology (RIST).  All rights reserved.
9  * $COPYRIGHT$
10  *
11  * Additional copyrights may follow
12  *
13  * $HEADER$
14  */
15 #include "src/include/pmix_config.h"
16 
17 #include "src/include/pmix_stdint.h"
18 #include "src/include/pmix_socket_errno.h"
19 
20 #include "include/pmix.h"
21 #include "include/pmix_common.h"
22 #include "include/pmix_server.h"
23 
24 #include "src/threads/threads.h"
25 #include "src/util/argv.h"
26 #include "src/util/error.h"
27 #include "src/util/name_fns.h"
28 #include "src/util/output.h"
29 #include "src/mca/bfrops/bfrops.h"
30 #include "src/mca/ptl/ptl.h"
31 
32 #include "src/client/pmix_client_ops.h"
33 #include "src/server/pmix_server_ops.h"
34 #include "src/include/pmix_globals.h"
35 
relcbfunc(void * cbdata)36 static void relcbfunc(void *cbdata)
37 {
38     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
39 
40     pmix_output_verbose(2, pmix_globals.debug_output,
41                         "pmix:job_ctrl release callback");
42 
43     if (NULL != cd->info) {
44         PMIX_INFO_FREE(cd->info, cd->ninfo);
45     }
46     PMIX_RELEASE(cd);
47 }
/*
 * Receive callback for the server's reply to a request relayed by
 * PMIx_Job_control_nb or PMIx_Process_monitor_nb (registered through
 * PMIX_PTL_SEND_RECV).
 *
 * peer   - peer the reply came from; used to select the unpack routines
 * hdr    - message header (not used here)
 * buf    - reply payload: a status, optionally followed by a count of
 *          info structs and the structs themselves. An empty buffer means
 *          the recv is being completed because the connection was lost.
 * cbdata - the pmix_query_caddy_t carrying the caller's cbfunc/cbdata;
 *          it is always released before this function returns.
 *
 * The caller's cbfunc (if any) receives the unpacked status and info,
 * together with relcbfunc to release them. Any failure to unpack the
 * reply is reported to the caller as the status, never as success.
 */
static void query_cbfunc(struct pmix_peer_t *peer,
                         pmix_ptl_hdr_t *hdr,
                         pmix_buffer_t *buf, void *cbdata)
{
    pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
    pmix_status_t rc;
    pmix_shift_caddy_t *results;
    int cnt;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "pmix:job_ctrl cback from server with %d bytes",
                        (int)buf->bytes_used);

    /* a zero-byte buffer indicates that this recv is being
     * completed due to a lost connection */
    if (PMIX_BUFFER_IS_EMPTY(buf)) {
        /* release the caller */
        if (NULL != cd->cbfunc) {
            cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL);
        }
        PMIX_RELEASE(cd);
        return;
    }

    results = PMIX_NEW(pmix_shift_caddy_t);
    if (NULL == results) {
        /* cannot hold the results - still release the caller */
        if (NULL != cd->cbfunc) {
            cd->cbfunc(PMIX_ERR_NOMEM, NULL, 0, cd->cbdata, NULL, NULL);
        }
        PMIX_RELEASE(cd);
        return;
    }

    /* unpack the status */
    cnt = 1;
    PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
    if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
        /* the status could not be read - report the unpack error rather
         * than whatever value results->status happens to hold */
        results->status = rc;
        goto complete;
    }
    if (PMIX_SUCCESS != results->status) {
        goto complete;
    }

    /* unpack any returned data - a reply that ends after the status
     * simply carries no info */
    cnt = 1;
    PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
    if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        PMIX_ERROR_LOG(rc);
        results->status = rc;
        goto complete;
    }
    if (0 < results->ninfo) {
        PMIX_INFO_CREATE(results->info, results->ninfo);
        cnt = (int)results->ninfo;
        PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
        if (PMIX_SUCCESS != rc) {
            PMIX_ERROR_LOG(rc);
            results->status = rc;
            goto complete;
        }
    }

  complete:
    pmix_output_verbose(2, pmix_globals.debug_output,
                        "pmix:job_ctrl cback from server releasing");
    /* release the caller */
    if (NULL != cd->cbfunc) {
        cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
    } else {
        /* nobody to hand the results to - release them ourselves, including
         * any info array created above (a plain PMIX_RELEASE would leak it) */
        relcbfunc(results);
    }
    PMIX_RELEASE(cd);
}
113 
/*
 * Completion callback used by the blocking wrappers PMIx_Job_control and
 * PMIx_Process_monitor. cbdata is the pmix_cb_t they wait on. This stores
 * the final status and copies any returned info into it, then lets the
 * producer free its own copy through release_fn. Finally it wakes the
 * waiting thread.
 */
static void acb(pmix_status_t status,
                pmix_info_t *info, size_t ninfo,
                void *cbdata,
                pmix_release_cbfunc_t release_fn,
                void *release_cbdata)
{
    pmix_cb_t *waiter = (pmix_cb_t *)cbdata;
    size_t idx;

    waiter->status = status;

    if (ninfo > 0) {
        PMIX_INFO_CREATE(waiter->info, ninfo);
        waiter->ninfo = ninfo;
        for (idx = 0; idx < ninfo; ++idx) {
            PMIX_INFO_XFER(&waiter->info[idx], &info[idx]);
        }
    }

    /* the producer still owns info; let it go now that we hold a copy */
    if (release_fn != NULL) {
        release_fn(release_cbdata);
    }

    /* must be last: the waiter destructs its pmix_cb_t as soon as it wakes */
    PMIX_WAKEUP_THREAD(&waiter->lock);
}
136 
PMIx_Job_control(const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs)137 PMIX_EXPORT pmix_status_t PMIx_Job_control(const pmix_proc_t targets[], size_t ntargets,
138                                            const pmix_info_t directives[], size_t ndirs)
139 {
140     pmix_cb_t cb;
141     pmix_status_t rc;
142 
143     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
144 
145     if (pmix_globals.init_cntr <= 0) {
146         PMIX_RELEASE_THREAD(&pmix_global_lock);
147         return PMIX_ERR_INIT;
148     }
149     PMIX_RELEASE_THREAD(&pmix_global_lock);
150 
151     pmix_output_verbose(2, pmix_globals.debug_output,
152                         "%s pmix:job_ctrl", PMIX_NAME_PRINT(&pmix_globals.myid));
153 
154     /* create a callback object as we need to pass it to the
155      * recv routine so we know which callback to use when
156      * the return message is recvd */
157     PMIX_CONSTRUCT(&cb, pmix_cb_t);
158     if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(targets, ntargets,
159                                                   directives, ndirs,
160                                                   acb, &cb))) {
161         PMIX_DESTRUCT(&cb);
162         return rc;
163     }
164 
165     /* wait for the operation to complete */
166     PMIX_WAIT_THREAD(&cb.lock);
167     rc = cb.status;
168     PMIX_DESTRUCT(&cb);
169 
170     pmix_output_verbose(2, pmix_globals.debug_output,
171                         "pmix:job_ctrl completed");
172 
173     return rc;
174 }
175 
PMIx_Job_control_nb(const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)176 PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_t ntargets,
177                                               const pmix_info_t directives[], size_t ndirs,
178                                               pmix_info_cbfunc_t cbfunc, void *cbdata)
179 {
180     pmix_buffer_t *msg;
181     pmix_cmd_t cmd = PMIX_JOB_CONTROL_CMD;
182     pmix_status_t rc;
183     pmix_query_caddy_t *cb;
184 
185     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
186 
187     pmix_output_verbose(2, pmix_globals.debug_output,
188                         "pmix: job control called with %d directives", (int)ndirs);
189 
190     if (pmix_globals.init_cntr <= 0) {
191         PMIX_RELEASE_THREAD(&pmix_global_lock);
192         return PMIX_ERR_INIT;
193     }
194 
195     /* if we are the server, then we just issue the request and
196      * return the response */
197     if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
198         !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
199         PMIX_RELEASE_THREAD(&pmix_global_lock);
200         if (NULL == pmix_host_server.job_control) {
201             /* nothing we can do */
202             return PMIX_ERR_NOT_SUPPORTED;
203         }
204         pmix_output_verbose(2, pmix_globals.debug_output,
205                             "pmix:job_control handed to RM");
206         rc = pmix_host_server.job_control(&pmix_globals.myid,
207                                           targets, ntargets,
208                                           directives, ndirs,
209                                           cbfunc, cbdata);
210         return rc;
211     }
212 
213     /* we need to send, so check for connection */
214     if (!pmix_globals.connected) {
215         PMIX_RELEASE_THREAD(&pmix_global_lock);
216         return PMIX_ERR_UNREACH;
217     }
218     PMIX_RELEASE_THREAD(&pmix_global_lock);
219 
220     /* if we are a client, then relay this request to the server */
221     msg = PMIX_NEW(pmix_buffer_t);
222     /* pack the cmd */
223     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
224                      msg, &cmd, 1, PMIX_COMMAND);
225     if (PMIX_SUCCESS != rc) {
226         PMIX_ERROR_LOG(rc);
227         PMIX_RELEASE(msg);
228         return rc;
229     }
230 
231     /* pack the number of targets */
232     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
233                      msg, &ntargets, 1, PMIX_SIZE);
234     if (PMIX_SUCCESS != rc) {
235         PMIX_ERROR_LOG(rc);
236         PMIX_RELEASE(msg);
237         return rc;
238     }
239     /* remember, the targets can be NULL to indicate that the operation
240      * is to be done against all members of our nspace */
241     if (NULL != targets && 0 < ntargets) {
242         /* pack the targets */
243         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
244                          msg, targets, ntargets, PMIX_PROC);
245         if (PMIX_SUCCESS != rc) {
246             PMIX_ERROR_LOG(rc);
247             PMIX_RELEASE(msg);
248             return rc;
249         }
250     }
251 
252     /* pack the directives */
253     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
254                      msg, &ndirs, 1, PMIX_SIZE);
255     if (PMIX_SUCCESS != rc) {
256         PMIX_ERROR_LOG(rc);
257         PMIX_RELEASE(msg);
258         return rc;
259     }
260     if (NULL != directives && 0 < ndirs) {
261         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
262                          msg, directives, ndirs, PMIX_INFO);
263         if (PMIX_SUCCESS != rc) {
264             PMIX_ERROR_LOG(rc);
265             PMIX_RELEASE(msg);
266             return rc;
267         }
268     }
269 
270     /* create a callback object as we need to pass it to the
271      * recv routine so we know which callback to use when
272      * the return message is recvd */
273     cb = PMIX_NEW(pmix_query_caddy_t);
274     cb->cbfunc = cbfunc;
275     cb->cbdata = cbdata;
276 
277     /* push the message into our event base to send to the server */
278     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
279                        msg, query_cbfunc, (void*)cb);
280     if (PMIX_SUCCESS != rc) {
281         PMIX_RELEASE(msg);
282         PMIX_RELEASE(cb);
283     }
284 
285     return rc;
286 }
287 
PMIx_Process_monitor(const pmix_info_t * monitor,pmix_status_t error,const pmix_info_t directives[],size_t ndirs)288 PMIX_EXPORT pmix_status_t PMIx_Process_monitor(const pmix_info_t *monitor, pmix_status_t error,
289                                                const pmix_info_t directives[], size_t ndirs)
290 {
291     pmix_cb_t cb;
292     pmix_status_t rc;
293 
294     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
295 
296     if (pmix_globals.init_cntr <= 0) {
297         PMIX_RELEASE_THREAD(&pmix_global_lock);
298         return PMIX_ERR_INIT;
299     }
300     PMIX_RELEASE_THREAD(&pmix_global_lock);
301 
302     pmix_output_verbose(2, pmix_globals.debug_output,
303                         "%s pmix:monitor", PMIX_NAME_PRINT(&pmix_globals.myid));
304 
305     /* create a callback object as we need to pass it to the
306      * recv routine so we know which callback to use when
307      * the return message is recvd */
308     PMIX_CONSTRUCT(&cb, pmix_cb_t);
309     if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(monitor, error,
310                                                       directives, ndirs,
311                                                       acb, &cb))) {
312         PMIX_DESTRUCT(&cb);
313         return rc;
314     }
315 
316     /* wait for the operation to complete */
317     PMIX_WAIT_THREAD(&cb.lock);
318     rc = cb.status;
319     PMIX_DESTRUCT(&cb);
320 
321     pmix_output_verbose(2, pmix_globals.debug_output,
322                         "pmix:monitor completed");
323 
324     return rc;
325 }
326 
PMIx_Process_monitor_nb(const pmix_info_t * monitor,pmix_status_t error,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)327 PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pmix_status_t error,
328                                                   const pmix_info_t directives[], size_t ndirs,
329                                                   pmix_info_cbfunc_t cbfunc, void *cbdata)
330 {
331     pmix_buffer_t *msg;
332     pmix_cmd_t cmd = PMIX_MONITOR_CMD;
333     pmix_status_t rc;
334     pmix_query_caddy_t *cb;
335 
336     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
337 
338     pmix_output_verbose(2, pmix_globals.debug_output,
339                         "pmix: monitor called");
340 
341     if (pmix_globals.init_cntr <= 0) {
342         PMIX_RELEASE_THREAD(&pmix_global_lock);
343         return PMIX_ERR_INIT;
344     }
345 
346     /* sanity check */
347     if (NULL == monitor) {
348         PMIX_RELEASE_THREAD(&pmix_global_lock);
349         return PMIX_ERR_BAD_PARAM;
350     }
351 
352     /* if we are the server, then we just issue the request and
353      * return the response */
354     if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
355         !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
356         PMIX_RELEASE_THREAD(&pmix_global_lock);
357         if (NULL == pmix_host_server.monitor) {
358             /* nothing we can do */
359             return PMIX_ERR_NOT_SUPPORTED;
360         }
361         pmix_output_verbose(2, pmix_globals.debug_output,
362                             "pmix:monitor handed to RM");
363         rc = pmix_host_server.monitor(&pmix_globals.myid, monitor, error,
364                                       directives, ndirs, cbfunc, cbdata);
365         return rc;
366     }
367 
368     /* we need to send, so check for connection */
369     if (!pmix_globals.connected) {
370         PMIX_RELEASE_THREAD(&pmix_global_lock);
371         return PMIX_ERR_UNREACH;
372     }
373     PMIX_RELEASE_THREAD(&pmix_global_lock);
374 
375     /* if the monitor is PMIX_SEND_HEARTBEAT, then send it */
376     if (PMIX_CHECK_KEY(monitor, PMIX_SEND_HEARTBEAT)) {
377         msg = PMIX_NEW(pmix_buffer_t);
378         if (NULL == msg) {
379             return PMIX_ERR_NOMEM;
380         }
381         PMIX_PTL_SEND_ONEWAY(rc, pmix_client_globals.myserver, msg, PMIX_PTL_TAG_HEARTBEAT);
382         if (PMIX_SUCCESS != rc) {
383             PMIX_RELEASE(msg);
384         }
385         return rc;
386     }
387 
388     /* if we are a client, then relay this request to the server */
389     msg = PMIX_NEW(pmix_buffer_t);
390     /* pack the cmd */
391     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
392                      msg, &cmd, 1, PMIX_COMMAND);
393     if (PMIX_SUCCESS != rc) {
394         PMIX_ERROR_LOG(rc);
395         PMIX_RELEASE(msg);
396         return rc;
397     }
398 
399     /* pack the monitor */
400     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
401                      msg, monitor, 1, PMIX_INFO);
402     if (PMIX_SUCCESS != rc) {
403         PMIX_ERROR_LOG(rc);
404         PMIX_RELEASE(msg);
405         return rc;
406     }
407 
408     /* pack the error */
409     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
410                      msg, &error, 1, PMIX_STATUS);
411     if (PMIX_SUCCESS != rc) {
412         PMIX_ERROR_LOG(rc);
413         PMIX_RELEASE(msg);
414         return rc;
415     }
416 
417     /* pack the directives */
418     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
419                      msg, &ndirs, 1, PMIX_SIZE);
420     if (PMIX_SUCCESS != rc) {
421         PMIX_ERROR_LOG(rc);
422         PMIX_RELEASE(msg);
423         return rc;
424     }
425     if (0 < ndirs) {
426         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
427                          msg, directives, ndirs, PMIX_INFO);
428         if (PMIX_SUCCESS != rc) {
429             PMIX_ERROR_LOG(rc);
430             PMIX_RELEASE(msg);
431             return rc;
432         }
433     }
434 
435     /* create a callback object as we need to pass it to the
436      * recv routine so we know which callback to use when
437      * the return message is recvd */
438     cb = PMIX_NEW(pmix_query_caddy_t);
439     cb->cbfunc = cbfunc;
440     cb->cbdata = cbdata;
441 
442     /* push the message into our event base to send to the server */
443     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
444                        msg, query_cbfunc, (void*)cb);
445     if (PMIX_SUCCESS != rc) {
446         PMIX_RELEASE(msg);
447         PMIX_RELEASE(cb);
448     }
449 
450     return rc;
451 }
452