1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2014-2020 Intel, Inc. All rights reserved.
4 * Copyright (c) 2016 Mellanox Technologies, Inc.
5 * All rights reserved.
6 * Copyright (c) 2016 IBM Corporation. All rights reserved.
7 * Copyright (c) 2019 Research Organization for Information Science
8 * and Technology (RIST). All rights reserved.
9 * $COPYRIGHT$
10 *
11 * Additional copyrights may follow
12 *
13 * $HEADER$
14 */
15 #include "src/include/pmix_config.h"
16
17 #include "src/include/pmix_stdint.h"
18 #include "src/include/pmix_socket_errno.h"
19
20 #include "include/pmix.h"
21 #include "include/pmix_common.h"
22 #include "include/pmix_server.h"
23
24 #include "src/threads/threads.h"
25 #include "src/util/argv.h"
26 #include "src/util/error.h"
27 #include "src/util/name_fns.h"
28 #include "src/util/output.h"
29 #include "src/mca/bfrops/bfrops.h"
30 #include "src/mca/ptl/ptl.h"
31
32 #include "src/client/pmix_client_ops.h"
33 #include "src/server/pmix_server_ops.h"
34 #include "src/include/pmix_globals.h"
35
relcbfunc(void * cbdata)36 static void relcbfunc(void *cbdata)
37 {
38 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
39
40 pmix_output_verbose(2, pmix_globals.debug_output,
41 "pmix:job_ctrl release callback");
42
43 if (NULL != cd->info) {
44 PMIX_INFO_FREE(cd->info, cd->ninfo);
45 }
46 PMIX_RELEASE(cd);
47 }
/*
 * Receive callback for the server's reply to a job-control or monitor
 * request (it is the handler given to PMIX_PTL_SEND_RECV by the _nb
 * entry points in this file).
 *
 * peer   - peer the reply came from; selects the unpack routines
 * hdr    - message header (not used here)
 * buf    - reply payload: a status, optionally followed by a count and
 *          that many pmix_info_t entries
 * cbdata - the pmix_query_caddy_t carrying the caller's cbfunc/cbdata
 *
 * The caller's callback, when one was provided, is invoked exactly once
 * with the final status. Any failure to decode the reply is reported as
 * that status rather than being dropped. The query caddy is always
 * released before returning.
 */
static void query_cbfunc(struct pmix_peer_t *peer,
                         pmix_ptl_hdr_t *hdr,
                         pmix_buffer_t *buf, void *cbdata)
{
    pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
    pmix_status_t rc;
    pmix_shift_caddy_t *results;
    int cnt;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "pmix:job_ctrl cback from server with %d bytes",
                        (int)buf->bytes_used);

    /* a zero-byte buffer indicates that this recv is being
     * completed due to a lost connection */
    if (PMIX_BUFFER_IS_EMPTY(buf)) {
        /* release the caller */
        if (NULL != cd->cbfunc) {
            cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL);
        }
        PMIX_RELEASE(cd);
        return;
    }

    results = PMIX_NEW(pmix_shift_caddy_t);
    if (NULL == results) {
        /* nothing to carry the reply in - still release the caller */
        if (NULL != cd->cbfunc) {
            cd->cbfunc(PMIX_ERR_NOMEM, NULL, 0, cd->cbdata, NULL, NULL);
        }
        PMIX_RELEASE(cd);
        return;
    }

    /* unpack the status */
    cnt = 1;
    PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
    if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
        /* the status field was never filled in - report the decode error */
        results->status = rc;
        goto complete;
    }
    if (PMIX_SUCCESS != results->status) {
        goto complete;
    }

    /* unpack any returned data */
    cnt = 1;
    PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
    if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
        /* the reply carried a status only - there is no data to return */
        results->ninfo = 0;
    } else if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
        results->status = rc;
        results->ninfo = 0;
        goto complete;
    }
    if (0 < results->ninfo) {
        PMIX_INFO_CREATE(results->info, results->ninfo);
        if (NULL == results->info) {
            results->status = PMIX_ERR_NOMEM;
            results->ninfo = 0;
            goto complete;
        }
        cnt = (int)results->ninfo;
        PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
        if (PMIX_SUCCESS != rc) {
            PMIX_ERROR_LOG(rc);
            /* do not hand a partially unpacked array to the caller */
            PMIX_INFO_FREE(results->info, results->ninfo);
            results->info = NULL;
            results->ninfo = 0;
            results->status = rc;
            goto complete;
        }
    }

  complete:
    pmix_output_verbose(2, pmix_globals.debug_output,
                        "pmix:job_ctrl cback from server releasing");
    /* release the caller */
    if (NULL != cd->cbfunc) {
        /* the caller gives results back to us through relcbfunc */
        cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
    } else {
        /* nobody will call the release function, so run the same
         * cleanup ourselves - this also frees any info array */
        relcbfunc(results);
    }
    PMIX_RELEASE(cd);
}
113
/*
 * Info callback used by the blocking wrappers in this file.
 *
 * status         - final status of the operation; stored in cb->status
 * info, ninfo    - returned data; copied into cb->info/cb->ninfo
 * cbdata         - the pmix_cb_t the blocked caller is waiting on
 * release_fn     - if non-NULL, called with release_cbdata once the
 *                  data has been copied
 * release_cbdata - argument for release_fn
 *
 * The waiting thread is woken as the last step, after the status and
 * data have been recorded.
 */
static void acb(pmix_status_t status,
                pmix_info_t *info, size_t ninfo,
                void *cbdata,
                pmix_release_cbfunc_t release_fn,
                void *release_cbdata)
{
    pmix_cb_t *cb = (pmix_cb_t*)cbdata;
    size_t n;

    cb->status = status;
    /* only copy when there really is an array to copy from */
    if (NULL != info && 0 < ninfo) {
        PMIX_INFO_CREATE(cb->info, ninfo);
        /* if the allocation failed we skip the copy rather than
         * dereference NULL; the reported status is left untouched
         * because the wrappers in this file consume only cb->status */
        if (NULL != cb->info) {
            cb->ninfo = ninfo;
            for (n=0; n < ninfo; n++) {
                PMIX_INFO_XFER(&cb->info[n], &info[n]);
            }
        }
    }
    /* we hold our own copy now, so the provider may reclaim its data */
    if (NULL != release_fn) {
        release_fn(release_cbdata);
    }
    PMIX_WAKEUP_THREAD(&cb->lock);
}
136
PMIx_Job_control(const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs)137 PMIX_EXPORT pmix_status_t PMIx_Job_control(const pmix_proc_t targets[], size_t ntargets,
138 const pmix_info_t directives[], size_t ndirs)
139 {
140 pmix_cb_t cb;
141 pmix_status_t rc;
142
143 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
144
145 if (pmix_globals.init_cntr <= 0) {
146 PMIX_RELEASE_THREAD(&pmix_global_lock);
147 return PMIX_ERR_INIT;
148 }
149 PMIX_RELEASE_THREAD(&pmix_global_lock);
150
151 pmix_output_verbose(2, pmix_globals.debug_output,
152 "%s pmix:job_ctrl", PMIX_NAME_PRINT(&pmix_globals.myid));
153
154 /* create a callback object as we need to pass it to the
155 * recv routine so we know which callback to use when
156 * the return message is recvd */
157 PMIX_CONSTRUCT(&cb, pmix_cb_t);
158 if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(targets, ntargets,
159 directives, ndirs,
160 acb, &cb))) {
161 PMIX_DESTRUCT(&cb);
162 return rc;
163 }
164
165 /* wait for the operation to complete */
166 PMIX_WAIT_THREAD(&cb.lock);
167 rc = cb.status;
168 PMIX_DESTRUCT(&cb);
169
170 pmix_output_verbose(2, pmix_globals.debug_output,
171 "pmix:job_ctrl completed");
172
173 return rc;
174 }
175
PMIx_Job_control_nb(const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)176 PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_t ntargets,
177 const pmix_info_t directives[], size_t ndirs,
178 pmix_info_cbfunc_t cbfunc, void *cbdata)
179 {
180 pmix_buffer_t *msg;
181 pmix_cmd_t cmd = PMIX_JOB_CONTROL_CMD;
182 pmix_status_t rc;
183 pmix_query_caddy_t *cb;
184
185 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
186
187 pmix_output_verbose(2, pmix_globals.debug_output,
188 "pmix: job control called with %d directives", (int)ndirs);
189
190 if (pmix_globals.init_cntr <= 0) {
191 PMIX_RELEASE_THREAD(&pmix_global_lock);
192 return PMIX_ERR_INIT;
193 }
194
195 /* if we are the server, then we just issue the request and
196 * return the response */
197 if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
198 !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
199 PMIX_RELEASE_THREAD(&pmix_global_lock);
200 if (NULL == pmix_host_server.job_control) {
201 /* nothing we can do */
202 return PMIX_ERR_NOT_SUPPORTED;
203 }
204 pmix_output_verbose(2, pmix_globals.debug_output,
205 "pmix:job_control handed to RM");
206 rc = pmix_host_server.job_control(&pmix_globals.myid,
207 targets, ntargets,
208 directives, ndirs,
209 cbfunc, cbdata);
210 return rc;
211 }
212
213 /* we need to send, so check for connection */
214 if (!pmix_globals.connected) {
215 PMIX_RELEASE_THREAD(&pmix_global_lock);
216 return PMIX_ERR_UNREACH;
217 }
218 PMIX_RELEASE_THREAD(&pmix_global_lock);
219
220 /* if we are a client, then relay this request to the server */
221 msg = PMIX_NEW(pmix_buffer_t);
222 /* pack the cmd */
223 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
224 msg, &cmd, 1, PMIX_COMMAND);
225 if (PMIX_SUCCESS != rc) {
226 PMIX_ERROR_LOG(rc);
227 PMIX_RELEASE(msg);
228 return rc;
229 }
230
231 /* pack the number of targets */
232 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
233 msg, &ntargets, 1, PMIX_SIZE);
234 if (PMIX_SUCCESS != rc) {
235 PMIX_ERROR_LOG(rc);
236 PMIX_RELEASE(msg);
237 return rc;
238 }
239 /* remember, the targets can be NULL to indicate that the operation
240 * is to be done against all members of our nspace */
241 if (NULL != targets && 0 < ntargets) {
242 /* pack the targets */
243 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
244 msg, targets, ntargets, PMIX_PROC);
245 if (PMIX_SUCCESS != rc) {
246 PMIX_ERROR_LOG(rc);
247 PMIX_RELEASE(msg);
248 return rc;
249 }
250 }
251
252 /* pack the directives */
253 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
254 msg, &ndirs, 1, PMIX_SIZE);
255 if (PMIX_SUCCESS != rc) {
256 PMIX_ERROR_LOG(rc);
257 PMIX_RELEASE(msg);
258 return rc;
259 }
260 if (NULL != directives && 0 < ndirs) {
261 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
262 msg, directives, ndirs, PMIX_INFO);
263 if (PMIX_SUCCESS != rc) {
264 PMIX_ERROR_LOG(rc);
265 PMIX_RELEASE(msg);
266 return rc;
267 }
268 }
269
270 /* create a callback object as we need to pass it to the
271 * recv routine so we know which callback to use when
272 * the return message is recvd */
273 cb = PMIX_NEW(pmix_query_caddy_t);
274 cb->cbfunc = cbfunc;
275 cb->cbdata = cbdata;
276
277 /* push the message into our event base to send to the server */
278 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
279 msg, query_cbfunc, (void*)cb);
280 if (PMIX_SUCCESS != rc) {
281 PMIX_RELEASE(msg);
282 PMIX_RELEASE(cb);
283 }
284
285 return rc;
286 }
287
PMIx_Process_monitor(const pmix_info_t * monitor,pmix_status_t error,const pmix_info_t directives[],size_t ndirs)288 PMIX_EXPORT pmix_status_t PMIx_Process_monitor(const pmix_info_t *monitor, pmix_status_t error,
289 const pmix_info_t directives[], size_t ndirs)
290 {
291 pmix_cb_t cb;
292 pmix_status_t rc;
293
294 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
295
296 if (pmix_globals.init_cntr <= 0) {
297 PMIX_RELEASE_THREAD(&pmix_global_lock);
298 return PMIX_ERR_INIT;
299 }
300 PMIX_RELEASE_THREAD(&pmix_global_lock);
301
302 pmix_output_verbose(2, pmix_globals.debug_output,
303 "%s pmix:monitor", PMIX_NAME_PRINT(&pmix_globals.myid));
304
305 /* create a callback object as we need to pass it to the
306 * recv routine so we know which callback to use when
307 * the return message is recvd */
308 PMIX_CONSTRUCT(&cb, pmix_cb_t);
309 if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(monitor, error,
310 directives, ndirs,
311 acb, &cb))) {
312 PMIX_DESTRUCT(&cb);
313 return rc;
314 }
315
316 /* wait for the operation to complete */
317 PMIX_WAIT_THREAD(&cb.lock);
318 rc = cb.status;
319 PMIX_DESTRUCT(&cb);
320
321 pmix_output_verbose(2, pmix_globals.debug_output,
322 "pmix:monitor completed");
323
324 return rc;
325 }
326
PMIx_Process_monitor_nb(const pmix_info_t * monitor,pmix_status_t error,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)327 PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pmix_status_t error,
328 const pmix_info_t directives[], size_t ndirs,
329 pmix_info_cbfunc_t cbfunc, void *cbdata)
330 {
331 pmix_buffer_t *msg;
332 pmix_cmd_t cmd = PMIX_MONITOR_CMD;
333 pmix_status_t rc;
334 pmix_query_caddy_t *cb;
335
336 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
337
338 pmix_output_verbose(2, pmix_globals.debug_output,
339 "pmix: monitor called");
340
341 if (pmix_globals.init_cntr <= 0) {
342 PMIX_RELEASE_THREAD(&pmix_global_lock);
343 return PMIX_ERR_INIT;
344 }
345
346 /* sanity check */
347 if (NULL == monitor) {
348 PMIX_RELEASE_THREAD(&pmix_global_lock);
349 return PMIX_ERR_BAD_PARAM;
350 }
351
352 /* if we are the server, then we just issue the request and
353 * return the response */
354 if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
355 !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
356 PMIX_RELEASE_THREAD(&pmix_global_lock);
357 if (NULL == pmix_host_server.monitor) {
358 /* nothing we can do */
359 return PMIX_ERR_NOT_SUPPORTED;
360 }
361 pmix_output_verbose(2, pmix_globals.debug_output,
362 "pmix:monitor handed to RM");
363 rc = pmix_host_server.monitor(&pmix_globals.myid, monitor, error,
364 directives, ndirs, cbfunc, cbdata);
365 return rc;
366 }
367
368 /* we need to send, so check for connection */
369 if (!pmix_globals.connected) {
370 PMIX_RELEASE_THREAD(&pmix_global_lock);
371 return PMIX_ERR_UNREACH;
372 }
373 PMIX_RELEASE_THREAD(&pmix_global_lock);
374
375 /* if the monitor is PMIX_SEND_HEARTBEAT, then send it */
376 if (PMIX_CHECK_KEY(monitor, PMIX_SEND_HEARTBEAT)) {
377 msg = PMIX_NEW(pmix_buffer_t);
378 if (NULL == msg) {
379 return PMIX_ERR_NOMEM;
380 }
381 PMIX_PTL_SEND_ONEWAY(rc, pmix_client_globals.myserver, msg, PMIX_PTL_TAG_HEARTBEAT);
382 if (PMIX_SUCCESS != rc) {
383 PMIX_RELEASE(msg);
384 }
385 return rc;
386 }
387
388 /* if we are a client, then relay this request to the server */
389 msg = PMIX_NEW(pmix_buffer_t);
390 /* pack the cmd */
391 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
392 msg, &cmd, 1, PMIX_COMMAND);
393 if (PMIX_SUCCESS != rc) {
394 PMIX_ERROR_LOG(rc);
395 PMIX_RELEASE(msg);
396 return rc;
397 }
398
399 /* pack the monitor */
400 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
401 msg, monitor, 1, PMIX_INFO);
402 if (PMIX_SUCCESS != rc) {
403 PMIX_ERROR_LOG(rc);
404 PMIX_RELEASE(msg);
405 return rc;
406 }
407
408 /* pack the error */
409 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
410 msg, &error, 1, PMIX_STATUS);
411 if (PMIX_SUCCESS != rc) {
412 PMIX_ERROR_LOG(rc);
413 PMIX_RELEASE(msg);
414 return rc;
415 }
416
417 /* pack the directives */
418 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
419 msg, &ndirs, 1, PMIX_SIZE);
420 if (PMIX_SUCCESS != rc) {
421 PMIX_ERROR_LOG(rc);
422 PMIX_RELEASE(msg);
423 return rc;
424 }
425 if (0 < ndirs) {
426 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
427 msg, directives, ndirs, PMIX_INFO);
428 if (PMIX_SUCCESS != rc) {
429 PMIX_ERROR_LOG(rc);
430 PMIX_RELEASE(msg);
431 return rc;
432 }
433 }
434
435 /* create a callback object as we need to pass it to the
436 * recv routine so we know which callback to use when
437 * the return message is recvd */
438 cb = PMIX_NEW(pmix_query_caddy_t);
439 cb->cbfunc = cbfunc;
440 cb->cbdata = cbdata;
441
442 /* push the message into our event base to send to the server */
443 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
444 msg, query_cbfunc, (void*)cb);
445 if (PMIX_SUCCESS != rc) {
446 PMIX_RELEASE(msg);
447 PMIX_RELEASE(cb);
448 }
449
450 return rc;
451 }
452