1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
4 * Copyright (c) 2017-2019 Research Organization for Information Science
5 * and Technology (RIST). All rights reserved.
6 * Copyright (c) 2017 IBM Corporation. All rights reserved.
7 *
8 * $COPYRIGHT$
9 *
10 * Additional copyrights may follow
11 *
12 * $HEADER$
13 */
14 #include <src/include/pmix_config.h>
15
16 #include <pmix.h>
17 #include <pmix_common.h>
18 #include <pmix_server.h>
19 #include <pmix_rename.h>
20
21 #include "src/threads/threads.h"
22 #include "src/util/error.h"
23 #include "src/util/output.h"
24
25 #include "src/mca/bfrops/bfrops.h"
26 #include "src/client/pmix_client_ops.h"
27 #include "src/server/pmix_server_ops.h"
28 #include "src/include/pmix_globals.h"
29
30 static pmix_status_t notify_server_of_event(pmix_status_t status,
31 const pmix_proc_t *source,
32 pmix_data_range_t range,
33 const pmix_info_t info[], size_t ninfo,
34 pmix_op_cbfunc_t cbfunc, void *cbdata);
35
36 /* if we are a client, we call this function to notify the server of
37 * an event. If we are a server, our host RM will call this function
38 * to notify us of an event */
PMIx_Notify_event(pmix_status_t status,const pmix_proc_t * source,pmix_data_range_t range,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)39 PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status,
40 const pmix_proc_t *source,
41 pmix_data_range_t range,
42 const pmix_info_t info[], size_t ninfo,
43 pmix_op_cbfunc_t cbfunc, void *cbdata)
44 {
45 int rc;
46
47 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
48
49 if (pmix_globals.init_cntr <= 0) {
50 PMIX_RELEASE_THREAD(&pmix_global_lock);
51 return PMIX_ERR_INIT;
52 }
53
54 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
55 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
56 PMIX_RELEASE_THREAD(&pmix_global_lock);
57
58 pmix_output_verbose(2, pmix_server_globals.event_output,
59 "pmix_server_notify_event source = %s:%d event_status = %s",
60 (NULL == source) ? "UNKNOWN" : source->nspace,
61 (NULL == source) ? PMIX_RANK_WILDCARD : source->rank, PMIx_Error_string(status));
62
63 rc = pmix_server_notify_client_of_event(status, source, range,
64 info, ninfo,
65 cbfunc, cbdata);
66
67 if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
68 PMIX_ERROR_LOG(rc);
69 }
70 return rc;
71 }
72
73 /* if we aren't connected, don't attempt to send */
74 if (!pmix_globals.connected) {
75 PMIX_RELEASE_THREAD(&pmix_global_lock);
76 return PMIX_ERR_UNREACH;
77 }
78 PMIX_RELEASE_THREAD(&pmix_global_lock);
79 pmix_output_verbose(2, pmix_client_globals.event_output,
80 "pmix_client_notify_event source = %s:%d event_status =%d",
81 (NULL == source) ? pmix_globals.myid.nspace : source->nspace,
82 (NULL == source) ? pmix_globals.myid.rank : source->rank, status);
83
84 rc = notify_server_of_event(status, source, range,
85 info, ninfo,
86 cbfunc, cbdata);
87 if (PMIX_SUCCESS != rc) {
88 PMIX_ERROR_LOG(rc);
89 }
90 return rc;
91 }
92
notify_event_cbfunc(struct pmix_peer_t * pr,pmix_ptl_hdr_t * hdr,pmix_buffer_t * buf,void * cbdata)93 static void notify_event_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
94 pmix_buffer_t *buf, void *cbdata)
95 {
96 pmix_status_t rc, ret;
97 int32_t cnt = 1;
98 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
99
100 /* unpack the status */
101 PMIX_BFROPS_UNPACK(rc, pr, buf, &ret, &cnt, PMIX_STATUS);
102 if (PMIX_SUCCESS != rc) {
103 PMIX_ERROR_LOG(rc);
104 ret = rc;
105 }
106 /* do the cback */
107 if (NULL != cb->cbfunc.opfn) {
108 cb->cbfunc.opfn(ret, cb->cbdata);
109 }
110 PMIX_RELEASE(cb);
111 }
112
notify_event_cache(pmix_notify_caddy_t * cd)113 static pmix_status_t notify_event_cache(pmix_notify_caddy_t *cd)
114 {
115 pmix_status_t rc;
116 int j;
117 pmix_notify_caddy_t *pk;
118 int idx;
119 time_t etime;
120
121 /* add to our cache */
122 rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
123 /* if there wasn't room, then search for the longest tenured
124 * occupant and evict them */
125 if (PMIX_SUCCESS != rc) {
126 etime = 0;
127 idx = -1;
128 for (j=0; j < pmix_globals.max_events; j++) {
129 pmix_hotel_knock(&pmix_globals.notifications, j, (void**)&pk);
130 if (NULL == pk) {
131 /* hey, there is room! */
132 pmix_hotel_checkin_with_res(&pmix_globals.notifications, cd, &cd->room);
133 return PMIX_SUCCESS;
134 }
135 /* check the age */
136 if (0 == j) {
137 etime = pk->ts;
138 idx = j;
139 } else {
140 if (difftime(pk->ts, etime) < 0) {
141 etime = pk->ts;
142 idx = j;
143 }
144 }
145 }
146 if (0 <= idx) {
147 /* we found the oldest occupant - evict it */
148 pmix_hotel_checkout_and_return_occupant(&pmix_globals.notifications, idx, (void**)&pk);
149 PMIX_RELEASE(pk);
150 rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
151 }
152 }
153 return rc;
154 }
155
156 /* as a client, we pass the notification to our server */
notify_server_of_event(pmix_status_t status,const pmix_proc_t * source,pmix_data_range_t range,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)157 static pmix_status_t notify_server_of_event(pmix_status_t status,
158 const pmix_proc_t *source,
159 pmix_data_range_t range,
160 const pmix_info_t info[], size_t ninfo,
161 pmix_op_cbfunc_t cbfunc, void *cbdata)
162 {
163 pmix_status_t rc;
164 pmix_buffer_t *msg = NULL;
165 pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
166 pmix_cb_t *cb;
167 pmix_event_chain_t *chain;
168 size_t n;
169 pmix_notify_caddy_t *cd;
170
171 pmix_output_verbose(2, pmix_client_globals.event_output,
172 "[%s:%d] client: notifying server %s:%d of status %s for range %s",
173 pmix_globals.myid.nspace, pmix_globals.myid.rank,
174 pmix_client_globals.myserver->info->pname.nspace,
175 pmix_client_globals.myserver->info->pname.rank,
176 PMIx_Error_string(status), PMIx_Data_range_string(range));
177
178 if (PMIX_RANGE_PROC_LOCAL != range) {
179 /* create the msg object */
180 msg = PMIX_NEW(pmix_buffer_t);
181 if (NULL == msg) {
182 return PMIX_ERR_NOMEM;
183 }
184 /* pack the command */
185 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cmd, 1, PMIX_COMMAND);
186 if (PMIX_SUCCESS != rc) {
187 PMIX_ERROR_LOG(rc);
188 goto cleanup;
189 }
190 /* pack the status */
191 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &status, 1, PMIX_STATUS);
192 if (PMIX_SUCCESS != rc) {
193 PMIX_ERROR_LOG(rc);
194 goto cleanup;
195 }
196 /* no need to pack the source as it is us */
197
198 /* pack the range */
199 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &range, 1, PMIX_DATA_RANGE);
200 if (PMIX_SUCCESS != rc) {
201 PMIX_ERROR_LOG(rc);
202 goto cleanup;
203 }
204 /* pack the info */
205 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &ninfo, 1, PMIX_SIZE);
206 if (PMIX_SUCCESS != rc) {
207 PMIX_ERROR_LOG(rc);
208 goto cleanup;
209 }
210 if (0 < ninfo) {
211 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, info, ninfo, PMIX_INFO);
212 if (PMIX_SUCCESS != rc) {
213 PMIX_ERROR_LOG(rc);
214 goto cleanup;
215 }
216 }
217 }
218
219 /* setup for our own local callbacks */
220 chain = PMIX_NEW(pmix_event_chain_t);
221 chain->status = status;
222 pmix_strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
223 chain->source.rank = pmix_globals.myid.rank;
224 /* we always leave space for event hdlr name and a callback object */
225 chain->nallocated = ninfo + 2;
226 PMIX_INFO_CREATE(chain->info, chain->nallocated);
227 /* prep the chain for processing */
228 pmix_prep_event_chain(chain, info, ninfo, true);
229
230 /* we need to cache this event so we can pass it into
231 * ourselves should someone later register for it */
232 cd = PMIX_NEW(pmix_notify_caddy_t);
233 cd->status = status;
234 if (NULL == source) {
235 pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
236 cd->source.rank = PMIX_RANK_UNDEF;
237 } else {
238 pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
239 cd->source.rank = source->rank;
240 }
241 cd->range = range;
242 if (0 < chain->ninfo) {
243 cd->ninfo = chain->ninfo;
244 PMIX_INFO_CREATE(cd->info, cd->ninfo);
245 cd->nondefault = chain->nondefault;
246 /* need to copy the info */
247 for (n=0; n < cd->ninfo; n++) {
248 PMIX_INFO_XFER(&cd->info[n], &chain->info[n]);
249 }
250 }
251 if (NULL != chain->targets) {
252 cd->ntargets = chain->ntargets;
253 PMIX_PROC_CREATE(cd->targets, cd->ntargets);
254 memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
255 }
256 if (NULL != chain->affected) {
257 cd->naffected = chain->naffected;
258 PMIX_PROC_CREATE(cd->affected, cd->naffected);
259 if (NULL == cd->affected) {
260 cd->naffected = 0;
261 rc = PMIX_ERR_NOMEM;
262 goto cleanup;
263 }
264 memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
265 }
266 /* cache it */
267 rc = notify_event_cache(cd);
268 if (PMIX_SUCCESS != rc) {
269 PMIX_ERROR_LOG(rc);
270 PMIX_RELEASE(cd);
271 goto cleanup;
272 }
273
274 if (PMIX_RANGE_PROC_LOCAL != range && NULL != msg) {
275 /* create a callback object as we need to pass it to the
276 * recv routine so we know which callback to use when
277 * the server acks/nacks the register events request. The
278 * server will _not_ send this notification back to us,
279 * so we handle it locally */
280 cb = PMIX_NEW(pmix_cb_t);
281 cb->cbfunc.opfn = cbfunc;
282 cb->cbdata = cbdata;
283 /* send to the server */
284 pmix_output_verbose(2, pmix_client_globals.event_output,
285 "[%s:%d] client: notifying server %s:%d - sending",
286 pmix_globals.myid.nspace, pmix_globals.myid.rank,
287 pmix_client_globals.myserver->info->pname.nspace,
288 pmix_client_globals.myserver->info->pname.rank);
289 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
290 msg, notify_event_cbfunc, cb);
291 if (PMIX_SUCCESS != rc) {
292 PMIX_ERROR_LOG(rc);
293 PMIX_RELEASE(cb);
294 goto cleanup;
295 }
296 } else if (NULL != cbfunc) {
297 cbfunc(PMIX_SUCCESS, cbdata);
298 }
299
300 /* now notify any matching registered callbacks we have */
301 pmix_invoke_local_event_hdlr(chain);
302
303 return PMIX_SUCCESS;
304
305 cleanup:
306 pmix_output_verbose(2, pmix_client_globals.event_output,
307 "client: notifying server - unable to send");
308 if (NULL != msg) {
309 PMIX_RELEASE(msg);
310 }
311 /* we were unable to send anything, so we just return the error */
312 return rc;
313 }
314
315
progress_local_event_hdlr(pmix_status_t status,pmix_info_t * results,size_t nresults,pmix_op_cbfunc_t cbfunc,void * thiscbdata,void * notification_cbdata)316 static void progress_local_event_hdlr(pmix_status_t status,
317 pmix_info_t *results, size_t nresults,
318 pmix_op_cbfunc_t cbfunc, void *thiscbdata,
319 void *notification_cbdata)
320 {
321 /* this may be in the host's thread, so we need to threadshift it
322 * before accessing our internal data */
323
324 pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata;
325 size_t n, nsave, cnt;
326 pmix_info_t *newinfo;
327 pmix_list_item_t *item;
328 pmix_event_hdlr_t *nxt;
329
330 /* aggregate the results per RFC0018 - first search the
331 * prior chained results to see if any keys have been NULL'd
332 * as this indicates that info struct should be removed */
333 nsave = 0;
334 for (n=0; n < chain->nresults; n++) {
335 if (0 < strlen(chain->results[n].key)) {
336 ++nsave;
337 }
338 }
339 /* we have to at least record the status returned by each
340 * stage of the event handler chain, so we have to reallocate
341 * the array to make space */
342
343 /* add in any new results plus space for the returned status */
344 nsave += nresults + 1;
345 /* create the new space */
346 PMIX_INFO_CREATE(newinfo, nsave);
347 /* transfer over the prior data */
348 cnt = 0;
349 for (n=0; n < chain->nresults; n++) {
350 if (0 < strlen(chain->results[n].key)) {
351 PMIX_INFO_XFER(&newinfo[cnt], &chain->results[n]);
352 ++cnt;
353 }
354 }
355
356 /* save this handler's returned status */
357 if (NULL != chain->evhdlr->name) {
358 pmix_strncpy(newinfo[cnt].key, chain->evhdlr->name, PMIX_MAX_KEYLEN);
359 } else {
360 pmix_strncpy(newinfo[cnt].key, "UNKNOWN", PMIX_MAX_KEYLEN);
361 }
362 newinfo[cnt].value.type = PMIX_STATUS;
363 newinfo[cnt].value.data.status = status;
364 ++cnt;
365 /* transfer across the new results */
366 for (n=0; n < nresults; n++) {
367 PMIX_INFO_XFER(&newinfo[cnt], &results[n]);
368 ++cnt;
369 }
370 /* release the prior results */
371 if (0 < chain->nresults) {
372 PMIX_INFO_FREE(chain->results, chain->nresults);
373 }
374 /* pass along the new ones */
375 chain->results = newinfo;
376 chain->nresults = cnt;
377 /* clear any loaded name and object */
378 chain->ninfo = chain->nallocated - 2;
379 PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-2]);
380 PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-1]);
381
382 /* if the caller indicates that the chain is completed,
383 * or we completed the "last" event */
384 if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) {
385 goto complete;
386 }
387 item = NULL;
388
389 /* see if we need to continue, starting with the single code events */
390 if (1 == chain->evhdlr->ncodes) {
391 /* the last handler was for a single code - see if there are
392 * any others that match this event */
393 item = &chain->evhdlr->super;
394 while (pmix_list_get_end(&pmix_globals.events.single_events) != (item = pmix_list_get_next(item))) {
395 nxt = (pmix_event_hdlr_t*)item;
396 if (nxt->codes[0] == chain->status &&
397 pmix_notify_check_range(&nxt->rng, &chain->source) &&
398 pmix_notify_check_affected(nxt->affected, nxt->naffected,
399 chain->affected, chain->naffected)) {
400 chain->evhdlr = nxt;
401 /* reset our count to the info provided by the caller */
402 chain->ninfo = chain->nallocated - 2;
403 /* if the handler has a name, then provide it */
404 if (NULL != chain->evhdlr->name) {
405 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
406 chain->ninfo++;
407 }
408
409 /* if there is an evhdlr cbobject, provide it */
410 if (NULL != chain->evhdlr->cbobject) {
411 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
412 chain->ninfo++;
413 }
414 nxt->evhdlr(nxt->index,
415 chain->status, &chain->source,
416 chain->info, chain->ninfo,
417 chain->results, chain->nresults,
418 progress_local_event_hdlr, (void*)chain);
419 return;
420 }
421 }
422 /* if we get here, then there are no more single code
423 * events that match */
424 item = pmix_list_get_begin(&pmix_globals.events.multi_events);
425 }
426
427 /* see if we need to continue with the multi code events */
428 if (NULL != chain->evhdlr->codes || NULL != item) {
429 /* the last handler was for a multi-code event, or we exhausted
430 * all the single code events */
431 if (NULL == item) {
432 /* if the last handler was multi-code, then start from that point */
433 item = &chain->evhdlr->super;
434 }
435 while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) {
436 nxt = (pmix_event_hdlr_t*)item;
437 if (!pmix_notify_check_range(&nxt->rng, &chain->source) ||
438 !pmix_notify_check_affected(nxt->affected, nxt->naffected,
439 chain->affected, chain->naffected)) {
440 continue;
441 }
442 for (n=0; n < nxt->ncodes; n++) {
443 /* if this event handler provided a range, check to see if
444 * the source fits within it */
445 if (nxt->codes[n] == chain->status) {
446 chain->evhdlr = nxt;
447 /* reset our count to the info provided by the caller */
448 chain->ninfo = chain->nallocated - 2;
449 /* if the handler has a name, then provide it */
450 if (NULL != chain->evhdlr->name) {
451 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
452 chain->ninfo++;
453 }
454
455 /* if there is an evhdlr cbobject, provide it */
456 if (NULL != chain->evhdlr->cbobject) {
457 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
458 chain->ninfo++;
459 }
460 nxt->evhdlr(nxt->index,
461 chain->status, &chain->source,
462 chain->info, chain->ninfo,
463 chain->results, chain->nresults,
464 progress_local_event_hdlr, (void*)chain);
465 return;
466 }
467 }
468 }
469 /* if we get here, then there are no more multi-mode
470 * events that match */
471 item = pmix_list_get_begin(&pmix_globals.events.default_events);
472 }
473
474 /* if they didn't want it to go to a default handler, then ignore them */
475 if (!chain->nondefault) {
476 if (NULL == item) {
477 item = &chain->evhdlr->super;
478 }
479 if (pmix_list_get_end(&pmix_globals.events.default_events) != (item = pmix_list_get_next(item))) {
480 nxt = (pmix_event_hdlr_t*)item;
481 /* if this event handler provided a range, check to see if
482 * the source fits within it */
483 if (pmix_notify_check_range(&nxt->rng, &chain->source) &&
484 pmix_notify_check_affected(nxt->affected, nxt->naffected,
485 chain->affected, chain->naffected)) {
486 chain->evhdlr = nxt;
487 /* reset our count to the info provided by the caller */
488 chain->ninfo = chain->nallocated - 2;
489 /* if the handler has a name, then provide it */
490 if (NULL != chain->evhdlr->name) {
491 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
492 chain->ninfo++;
493 }
494
495 /* if there is an evhdlr cbobject, provide it */
496 if (NULL != chain->evhdlr->cbobject) {
497 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
498 chain->ninfo++;
499 }
500 nxt->evhdlr(nxt->index,
501 chain->status, &chain->source,
502 chain->info, chain->ninfo,
503 chain->results, chain->nresults,
504 progress_local_event_hdlr, (void*)chain);
505 return;
506 }
507 }
508 }
509
510 /* if we registered a "last" handler, and it fits the given range
511 * and code, then invoke it now */
512 if (NULL != pmix_globals.events.last &&
513 pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
514 pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
515 chain->affected, chain->naffected)) {
516 chain->endchain = true; // ensure we don't do this again
517 if (1 == pmix_globals.events.last->ncodes &&
518 pmix_globals.events.last->codes[0] == chain->status) {
519 chain->evhdlr = pmix_globals.events.last;
520 /* reset our count to the info provided by the caller */
521 chain->ninfo = chain->nallocated - 2;
522 /* if the handler has a name, then provide it */
523 if (NULL != chain->evhdlr->name) {
524 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
525 chain->ninfo++;
526 }
527
528 /* if there is an evhdlr cbobject, provide it */
529 if (NULL != chain->evhdlr->cbobject) {
530 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
531 chain->ninfo++;
532 }
533 chain->evhdlr->evhdlr(chain->evhdlr->index,
534 chain->status, &chain->source,
535 chain->info, chain->ninfo,
536 chain->results, chain->nresults,
537 progress_local_event_hdlr, (void*)chain);
538 return;
539 } else if (NULL != pmix_globals.events.last->codes) {
540 /* need to check if this code is included in the array */
541 for (n=0; n < pmix_globals.events.last->ncodes; n++) {
542 if (pmix_globals.events.last->codes[n] == chain->status) {
543 chain->evhdlr = pmix_globals.events.last;
544 /* reset our count to the info provided by the caller */
545 chain->ninfo = chain->nallocated - 2;
546 /* if the handler has a name, then provide it */
547 if (NULL != chain->evhdlr->name) {
548 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
549 chain->ninfo++;
550 }
551
552 /* if there is an evhdlr cbobject, provide it */
553 if (NULL != chain->evhdlr->cbobject) {
554 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
555 chain->ninfo++;
556 }
557 chain->evhdlr->evhdlr(chain->evhdlr->index,
558 chain->status, &chain->source,
559 chain->info, chain->ninfo,
560 chain->results, chain->nresults,
561 progress_local_event_hdlr, (void*)chain);
562 return;
563 }
564 }
565 } else {
566 /* gets run for all codes */
567 chain->evhdlr = pmix_globals.events.last;
568 /* reset our count to the info provided by the caller */
569 chain->ninfo = chain->nallocated - 2;
570 /* if the handler has a name, then provide it */
571 if (NULL != chain->evhdlr->name) {
572 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
573 chain->ninfo++;
574 }
575
576 /* if there is an evhdlr cbobject, provide it */
577 if (NULL != chain->evhdlr->cbobject) {
578 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
579 chain->ninfo++;
580 }
581 chain->evhdlr->evhdlr(chain->evhdlr->index,
582 chain->status, &chain->source,
583 chain->info, chain->ninfo,
584 chain->results, chain->nresults,
585 progress_local_event_hdlr, (void*)chain);
586 return;
587 }
588 }
589
590 complete:
591 /* we still have to call their final callback */
592 if (NULL != chain->final_cbfunc) {
593 chain->final_cbfunc(PMIX_SUCCESS, chain->final_cbdata);
594 return;
595 }
596 /* maintain acctng */
597 PMIX_RELEASE(chain);
598 /* let the caller know that we are done with their callback */
599 if (NULL != cbfunc) {
600 cbfunc(PMIX_SUCCESS, thiscbdata);
601 }
602 }
603
604 /* given notification of an event, cycle thru our list of
605 * registered callbacks and invoke the matching ones. Note
606 * that we will invoke the callbacks in order from single
607 * to multi-event to default, keeping a log of any returned
608 * info and passing it down to the next invoked event handler.
609 * Thus, each handler is given the opportunity to see what
610 * prior handlers did, and decide if anything further needs
611 * to be done.
612 */
pmix_invoke_local_event_hdlr(pmix_event_chain_t * chain)613 void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
614 {
615 /* We need to parse thru each registered handler and determine
616 * which one(s) to call for the specific error */
617 size_t i;
618 pmix_event_hdlr_t *evhdlr;
619 pmix_status_t rc = PMIX_SUCCESS;
620 bool found;
621
622 pmix_output_verbose(2, pmix_client_globals.event_output,
623 "%s:%d invoke_local_event_hdlr for status %s",
624 pmix_globals.myid.nspace, pmix_globals.myid.rank,
625 PMIx_Error_string(chain->status));
626
627 /* sanity check */
628 if (NULL == chain->info) {
629 /* should never happen as space must always be
630 * reserved for handler name and callback object*/
631 rc = PMIX_ERR_BAD_PARAM;
632 goto complete;
633 }
634
635 /* if we are not a target, then we can simply ignore this event */
636 if (NULL != chain->targets) {
637 found = false;
638 for (i=0; i < chain->ntargets; i++) {
639 if (PMIX_CHECK_PROCID(&chain->targets[i], &pmix_globals.myid)) {
640 found = true;
641 break;
642 }
643 }
644 if (!found) {
645 goto complete;
646 }
647 }
648
649 /* if we registered a "first" handler, and it fits the given range,
650 * then invoke it first */
651 if (NULL != pmix_globals.events.first) {
652 if (1 == pmix_globals.events.first->ncodes &&
653 pmix_globals.events.first->codes[0] == chain->status &&
654 pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source) &&
655 pmix_notify_check_affected(pmix_globals.events.first->affected, pmix_globals.events.first->naffected,
656 chain->affected, chain->naffected)) {
657 /* invoke the handler */
658 chain->evhdlr = pmix_globals.events.first;
659 goto invk;
660 } else if (NULL != pmix_globals.events.first->codes) {
661 /* need to check if this code is included in the array */
662 found = false;
663 for (i=0; i < pmix_globals.events.first->ncodes; i++) {
664 if (pmix_globals.events.first->codes[i] == chain->status) {
665 found = true;
666 break;
667 }
668 }
669 /* if this event handler provided a range, check to see if
670 * the source fits within it */
671 if (found && pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
672 /* invoke the handler */
673 chain->evhdlr = pmix_globals.events.first;
674 goto invk;
675 }
676 } else {
677 /* take all codes for a default handler */
678 if (pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
679 /* invoke the handler */
680 chain->evhdlr = pmix_globals.events.first;
681 goto invk;
682 }
683 }
684 /* get here if there is no match, so fall thru */
685 }
686
687 /* cycle thru the single-event registrations first */
688 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
689 if (evhdlr->codes[0] == chain->status) {
690 if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
691 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
692 chain->affected, chain->naffected)) {
693 /* invoke the handler */
694 chain->evhdlr = evhdlr;
695 goto invk;
696 }
697 }
698 }
699
700 /* if we didn't find any match in the single-event registrations,
701 * then cycle thru the multi-event registrations next */
702 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
703 for (i=0; i < evhdlr->ncodes; i++) {
704 if (evhdlr->codes[i] == chain->status) {
705 if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
706 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
707 chain->affected, chain->naffected)) {
708 /* invoke the handler */
709 chain->evhdlr = evhdlr;
710 goto invk;
711 }
712 }
713 }
714 }
715
716 /* if they didn't want it to go to a default handler, then ignore them */
717 if (!chain->nondefault) {
718 /* pass it to any default handlers */
719 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
720 if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
721 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
722 chain->affected, chain->naffected)) {
723 /* invoke the handler */
724 chain->evhdlr = evhdlr;
725 goto invk;
726 }
727 }
728 }
729
730 /* if we registered a "last" handler, and it fits the given range
731 * and code, then invoke it now */
732 if (NULL != pmix_globals.events.last &&
733 pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
734 pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
735 chain->affected, chain->naffected)) {
736 chain->endchain = true; // ensure we don't do this again
737 if (1 == pmix_globals.events.last->ncodes &&
738 pmix_globals.events.last->codes[0] == chain->status) {
739 chain->evhdlr = pmix_globals.events.last;
740 goto invk;
741 } else if (NULL != pmix_globals.events.last->codes) {
742 /* need to check if this code is included in the array */
743 for (i=0; i < pmix_globals.events.last->ncodes; i++) {
744 if (pmix_globals.events.last->codes[i] == chain->status) {
745 chain->evhdlr = pmix_globals.events.last;
746 goto invk;
747 }
748 }
749 } else {
750 /* gets run for all codes */
751 chain->evhdlr = pmix_globals.events.last;
752 goto invk;
753 }
754 }
755
756 /* if we got here, then nothing was found */
757
758 complete:
759 /* we still have to call their final callback */
760 if (NULL != chain->final_cbfunc) {
761 chain->final_cbfunc(rc, chain->final_cbdata);
762 } else {
763 PMIX_RELEASE(chain);
764 }
765 return;
766
767
768 invk:
769 /* start with the chain holding only the given info */
770 chain->ninfo = chain->nallocated - 2;
771
772 /* if the handler has a name, then provide it */
773 if (NULL != chain->evhdlr->name) {
774 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
775 chain->ninfo++;
776 }
777
778 /* if there is an evhdlr cbobject, provide it */
779 if (NULL != chain->evhdlr->cbobject) {
780 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
781 chain->ninfo++;
782 }
783
784 /* invoke the handler */
785 pmix_output_verbose(2, pmix_client_globals.event_output,
786 "[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__,
787 (NULL == chain->evhdlr->name) ?
788 "NULL" : chain->evhdlr->name);
789 chain->evhdlr->evhdlr(chain->evhdlr->index,
790 chain->status, &chain->source,
791 chain->info, chain->ninfo,
792 NULL, 0,
793 progress_local_event_hdlr, (void*)chain);
794 return;
795 }
796
local_cbfunc(pmix_status_t status,void * cbdata)797 static void local_cbfunc(pmix_status_t status, void *cbdata)
798 {
799 pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
800
801 if (NULL != cd->cbfunc) {
802 cd->cbfunc(status, cd->cbdata);
803 }
804 PMIX_RELEASE(cd);
805 }
806
_notify_client_event(int sd,short args,void * cbdata)807 static void _notify_client_event(int sd, short args, void *cbdata)
808 {
809 pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
810 pmix_regevents_info_t *reginfoptr;
811 pmix_peer_events_info_t *pr;
812 pmix_event_chain_t *chain;
813 size_t n, nleft;
814 bool matched, holdcd;
815 pmix_buffer_t *bfr;
816 pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
817 pmix_status_t rc;
818 pmix_list_t trk;
819 pmix_namelist_t *nm;
820 pmix_namespace_t *nptr, *tmp;
821 pmix_range_trkr_t rngtrk;
822 pmix_proc_t proc;
823
824 /* need to acquire the object from its originating thread */
825 PMIX_ACQUIRE_OBJECT(cd);
826
827 pmix_output_verbose(2, pmix_server_globals.event_output,
828 "pmix_server: _notify_client_event notifying clients of event %s range %s type %s",
829 PMIx_Error_string(cd->status),
830 PMIx_Data_range_string(cd->range),
831 cd->nondefault ? "NONDEFAULT" : "OPEN");
832
833 /* check for caching instructions */
834 holdcd = true;
835 if (0 < cd->ninfo) {
836 /* check for caching instructions */
837 for (n=0; n < cd->ninfo; n++) {
838 if (PMIX_CHECK_KEY(&cd->info[n], PMIX_EVENT_DO_NOT_CACHE)) {
839 if (PMIX_INFO_TRUE(&cd->info[n])) {
840 holdcd = false;
841 }
842 break;
843 }
844 }
845 }
846 if (holdcd) {
847 /* we cannot know if everyone who wants this notice has had a chance
848 * to register for it - the notice may be coming too early. So cache
849 * the message until all local procs have received it, or it ages to
850 * the point where it gets pushed out by more recent events */
851 PMIX_RETAIN(cd);
852 rc = notify_event_cache(cd);
853 if (PMIX_SUCCESS != rc) {
854 PMIX_ERROR_LOG(rc);
855 }
856 }
857
858 /* we may also have registered for events, so setup to check this
859 * against our registrations */
860 chain = PMIX_NEW(pmix_event_chain_t);
861 chain->status = cd->status;
862 pmix_strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
863 chain->source.rank = cd->source.rank;
864 /* we always leave space for a callback object and
865 * the evhandler name. */
866 chain->nallocated = cd->ninfo + 2;
867 PMIX_INFO_CREATE(chain->info, chain->nallocated);
868 /* prep the chain for processing */
869 pmix_prep_event_chain(chain, cd->info, cd->ninfo, true);
870
871 /* copy setup to the cd object */
872 cd->nondefault = chain->nondefault;
873 if (NULL != chain->targets) {
874 cd->ntargets = chain->ntargets;
875 PMIX_PROC_CREATE(cd->targets, cd->ntargets);
876 memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
877 /* compute the number of targets that need to be notified */
878 nleft = 0;
879 for (n=0; n < cd->ntargets; n++) {
880 /* if this is a single proc, then increment by one */
881 if (PMIX_RANK_VALID >= cd->targets[n].rank) {
882 ++nleft;
883 } else {
884 /* look up the nspace for this proc */
885 nptr = NULL;
886 PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
887 if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) {
888 nptr = tmp;
889 break;
890 }
891 }
892 /* if we don't yet know it, then nothing to do */
893 if (NULL == nptr) {
894 nleft = SIZE_MAX;
895 break;
896 }
897 /* might notify all local members */
898 nleft += nptr->nlocalprocs;
899 }
900 }
901 cd->nleft = nleft;
902 }
903 if (NULL != chain->affected) {
904 cd->naffected = chain->naffected;
905 PMIX_PROC_CREATE(cd->affected, cd->naffected);
906 if (NULL == cd->affected) {
907 cd->naffected = 0;
908 /* notify the caller */
909 if (NULL != cd->cbfunc) {
910 cd->cbfunc(PMIX_ERR_NOMEM, cd->cbdata);
911 }
912 PMIX_RELEASE(cd);
913 PMIX_RELEASE(chain);
914 return;
915 }
916 memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
917 }
918
919 /* if they provided a PMIX_EVENT_CUSTOM_RANGE info object but
920 * specified a range other than PMIX_RANGE_CUSTOM, then this
921 * is an error */
922 if (PMIX_RANGE_CUSTOM != cd->range && NULL != cd->targets) {
923 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
924 /* notify the caller */
925 if (NULL != cd->cbfunc) {
926 cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
927 }
928 PMIX_RELEASE(cd);
929 PMIX_RELEASE(chain);
930 return;
931 }
932
933 holdcd = false;
934 if (PMIX_RANGE_PROC_LOCAL != cd->range) {
935 PMIX_CONSTRUCT(&trk, pmix_list_t);
936 rngtrk.procs = NULL;
937 rngtrk.nprocs = 0;
938 /* cycle across our registered events and send the message to
939 * any client who registered for it */
940 PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
941 if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
942 cd->status == reginfoptr->code) {
943 PMIX_LIST_FOREACH(pr, ®infoptr->peers, pmix_peer_events_info_t) {
944 /* if this client was the source of the event, then
945 * don't send it back as they will have processed it
946 * when they generated it */
947 if (PMIX_CHECK_PROCID(&cd->source, &pr->peer->info->pname)) {
948 continue;
949 }
950 /* if we have already notified this client, then don't do it again */
951 matched = false;
952 PMIX_LIST_FOREACH(nm, &trk, pmix_namelist_t) {
953 if (nm->pname == &pr->peer->info->pname) {
954 matched = true;
955 break;
956 }
957 }
958 if (matched) {
959 continue;
960 }
961 /* check if the affected procs (if given) match those they
962 * wanted to know about */
963 if (!pmix_notify_check_affected(cd->affected, cd->naffected,
964 pr->affected, pr->naffected)) {
965 continue;
966 }
967 /* check the range */
968 if (NULL == cd->targets) {
969 rngtrk.procs = &cd->source;
970 rngtrk.nprocs = 1;
971 } else {
972 rngtrk.procs = cd->targets;
973 rngtrk.nprocs = cd->ntargets;
974 }
975 rngtrk.range = cd->range;
976 PMIX_LOAD_PROCID(&proc, pr->peer->info->pname.nspace, pr->peer->info->pname.rank);
977 if (!pmix_notify_check_range(&rngtrk, &proc)) {
978 continue;
979 }
980 pmix_output_verbose(2, pmix_server_globals.event_output,
981 "pmix_server: notifying client %s:%u on status %s",
982 pr->peer->info->pname.nspace, pr->peer->info->pname.rank,
983 PMIx_Error_string(cd->status));
984
985 /* record that we notified this client */
986 nm = PMIX_NEW(pmix_namelist_t);
987 nm->pname = &pr->peer->info->pname;
988 pmix_list_append(&trk, &nm->super);
989
990 bfr = PMIX_NEW(pmix_buffer_t);
991 if (NULL == bfr) {
992 continue;
993 }
994 /* pack the command */
995 PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cmd, 1, PMIX_COMMAND);
996 if (PMIX_SUCCESS != rc) {
997 PMIX_ERROR_LOG(rc);
998 PMIX_RELEASE(bfr);
999 continue;
1000 }
1001
1002 /* pack the status */
1003 PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->status, 1, PMIX_STATUS);
1004 if (PMIX_SUCCESS != rc) {
1005 PMIX_ERROR_LOG(rc);
1006 PMIX_RELEASE(bfr);
1007 continue;
1008 }
1009
1010 /* pack the source */
1011 PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->source, 1, PMIX_PROC);
1012 if (PMIX_SUCCESS != rc) {
1013 PMIX_ERROR_LOG(rc);
1014 PMIX_RELEASE(bfr);
1015 continue;
1016 }
1017 /* pack any info */
1018 PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->ninfo, 1, PMIX_SIZE);
1019 if (PMIX_SUCCESS != rc) {
1020 PMIX_ERROR_LOG(rc);
1021 PMIX_RELEASE(bfr);
1022 continue;
1023 }
1024
1025 if (0 < cd->ninfo) {
1026 PMIX_BFROPS_PACK(rc, pr->peer, bfr, cd->info, cd->ninfo, PMIX_INFO);
1027 if (PMIX_SUCCESS != rc) {
1028 PMIX_ERROR_LOG(rc);
1029 PMIX_RELEASE(bfr);
1030 continue;
1031 }
1032 }
1033 PMIX_SERVER_QUEUE_REPLY(rc, pr->peer, 0, bfr);
1034 if (PMIX_SUCCESS != rc) {
1035 PMIX_RELEASE(bfr);
1036 }
1037 if (NULL != cd->targets && 0 < cd->nleft) {
1038 /* track the number of targets we have left to notify */
1039 --cd->nleft;
1040 /* if the event was cached and this is the last one,
1041 * then evict this event from the cache */
1042 if (0 == cd->nleft) {
1043 pmix_hotel_checkout(&pmix_globals.notifications, cd->room);
1044 holdcd = false;
1045 break;
1046 }
1047 }
1048 }
1049 }
1050 }
1051 PMIX_LIST_DESTRUCT(&trk);
1052 if (PMIX_RANGE_LOCAL != cd->range && PMIX_CHECK_PROCID(&cd->source, &pmix_globals.myid)) {
1053 /* if we are the source, then we need to post this upwards as
1054 * well so the host RM can broadcast it as necessary */
1055 if (NULL != pmix_host_server.notify_event) {
1056 /* mark that we sent it upstairs so we don't release
1057 * the caddy until we return from the host RM */
1058 holdcd = true;
1059 pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
1060 cd->info, cd->ninfo, local_cbfunc, cd);
1061 }
1062 }
1063 }
1064
1065 /* process it ourselves */
1066 pmix_invoke_local_event_hdlr(chain);
1067
1068 if (!holdcd) {
1069 /* notify the caller */
1070 if (NULL != cd->cbfunc) {
1071 cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
1072 }
1073 PMIX_RELEASE(cd);
1074 }
1075 }
1076
1077
1078 /* as a server, we must do two things:
1079 *
1080 * (a) notify all clients that have registered for this event
1081 *
1082 * (b) callback any of our own functions that have registered
1083 * for this event
1084 */
pmix_server_notify_client_of_event(pmix_status_t status,const pmix_proc_t * source,pmix_data_range_t range,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1085 pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
1086 const pmix_proc_t *source,
1087 pmix_data_range_t range,
1088 const pmix_info_t info[], size_t ninfo,
1089 pmix_op_cbfunc_t cbfunc, void *cbdata)
1090 {
1091 pmix_notify_caddy_t *cd;
1092 size_t n;
1093
1094 pmix_output_verbose(2, pmix_server_globals.event_output,
1095 "pmix_server: notify client of event %s",
1096 PMIx_Error_string(status));
1097
1098 if (NULL != info) {
1099 for (n=0; n < ninfo; n++) {
1100 if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_PROXY) &&
1101 PMIX_CHECK_PROCID(info[n].value.data.proc, &pmix_globals.myid)) {
1102 return PMIX_OPERATION_SUCCEEDED;
1103 }
1104 }
1105 }
1106
1107 cd = PMIX_NEW(pmix_notify_caddy_t);
1108 cd->status = status;
1109 if (NULL == source) {
1110 pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
1111 cd->source.rank = PMIX_RANK_UNDEF;
1112 } else {
1113 pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
1114 cd->source.rank = source->rank;
1115 }
1116 cd->range = range;
1117 /* have to copy the info to preserve it for future when cached */
1118 if (0 < ninfo && NULL != info) {
1119 cd->ninfo = ninfo;
1120 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1121 /* need to copy the info */
1122 for (n=0; n < cd->ninfo; n++) {
1123 PMIX_INFO_XFER(&cd->info[n], &info[n]);
1124 }
1125 }
1126
1127 /* track the eventual callback info */
1128 cd->cbfunc = cbfunc;
1129 cd->cbdata = cbdata;
1130
1131 pmix_output_verbose(2, pmix_server_globals.event_output,
1132 "pmix_server_notify_event status =%d, source = %s:%d, ninfo =%lu",
1133 status, cd->source.nspace, cd->source.rank, ninfo);
1134
1135 /* we have to push this into our event library to avoid
1136 * potential threading issues */
1137 PMIX_THREADSHIFT(cd, _notify_client_event);
1138 return PMIX_SUCCESS;
1139 }
1140
pmix_notify_check_range(pmix_range_trkr_t * rng,const pmix_proc_t * proc)1141 bool pmix_notify_check_range(pmix_range_trkr_t *rng,
1142 const pmix_proc_t *proc)
1143 {
1144 size_t n;
1145
1146 if (PMIX_RANGE_UNDEF == rng->range ||
1147 PMIX_RANGE_GLOBAL == rng->range ||
1148 PMIX_RANGE_SESSION == rng->range ||
1149 PMIX_RANGE_LOCAL == rng->range) { // assume RM took care of session & local for now
1150 return true;
1151 }
1152 if (PMIX_RANGE_NAMESPACE == rng->range) {
1153 for (n=0; n < rng->nprocs; n++) {
1154 if (PMIX_CHECK_NSPACE(rng->procs[n].nspace, proc->nspace)) {
1155 return true;
1156 }
1157 }
1158 return false;
1159 }
1160 if (PMIX_RANGE_PROC_LOCAL == rng->range) {
1161 for (n=0; n < rng->nprocs; n++) {
1162 if (PMIX_CHECK_PROCID(&rng->procs[n], proc)) {
1163 return true;
1164 }
1165 }
1166 return false;
1167 }
1168 if (PMIX_RANGE_CUSTOM == rng->range) {
1169 /* see if this proc was included */
1170 for (n=0; n < rng->nprocs; n++) {
1171 if (0 != strncmp(rng->procs[n].nspace, proc->nspace, PMIX_MAX_NSLEN)) {
1172 continue;
1173 }
1174 if (PMIX_RANK_WILDCARD == rng->procs[n].rank ||
1175 rng->procs[n].rank == proc->rank) {
1176 return true;
1177 }
1178 }
1179 /* if we get here, then this proc isn't in range */
1180 return false;
1181 }
1182
1183 /* if it is anything else, then reject it */
1184 return false;
1185 }
1186
pmix_notify_check_affected(pmix_proc_t * interested,size_t ninterested,pmix_proc_t * affected,size_t naffected)1187 bool pmix_notify_check_affected(pmix_proc_t *interested, size_t ninterested,
1188 pmix_proc_t *affected, size_t naffected)
1189 {
1190 size_t m, n;
1191
1192 /* if they didn't restrict their interests, then accept it */
1193 if (NULL == interested) {
1194 return true;
1195 }
1196 /* if we weren't given the affected procs, then accept it */
1197 if (NULL == affected) {
1198 return true;
1199 }
1200 /* check if the two overlap */
1201 for (n=0; n < naffected; n++) {
1202 for (m=0; m < ninterested; m++) {
1203 if (PMIX_CHECK_PROCID(&affected[n], &interested[m])) {
1204 return true;
1205 }
1206 }
1207 }
1208 /* if we get here, then this proc isn't in range */
1209 return false;
1210
1211 }
1212
pmix_event_timeout_cb(int fd,short flags,void * arg)1213 void pmix_event_timeout_cb(int fd, short flags, void *arg)
1214 {
1215 pmix_event_chain_t *ch = (pmix_event_chain_t*)arg;
1216
1217 /* need to acquire the object from its originating thread */
1218 PMIX_ACQUIRE_OBJECT(ch);
1219
1220 ch->timer_active = false;
1221
1222 /* remove it from the list */
1223 pmix_list_remove_item(&pmix_globals.cached_events, &ch->super);
1224
1225 /* process this event thru the regular channels */
1226 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
1227 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
1228 pmix_server_notify_client_of_event(ch->status, &ch->source,
1229 ch->range, ch->info, ch->ninfo,
1230 ch->final_cbfunc, ch->final_cbdata);
1231 } else {
1232 pmix_invoke_local_event_hdlr(ch);
1233 }
1234 }
1235
pmix_prep_event_chain(pmix_event_chain_t * chain,const pmix_info_t * info,size_t ninfo,bool xfer)1236 pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain,
1237 const pmix_info_t *info, size_t ninfo,
1238 bool xfer)
1239 {
1240 size_t n;
1241
1242 if (NULL != info && 0 < ninfo) {
1243 chain->ninfo = ninfo;
1244 if (NULL == chain->info) {
1245 PMIX_INFO_CREATE(chain->info, chain->ninfo);
1246 }
1247 /* need to copy the info */
1248 for (n=0; n < ninfo; n++) {
1249 if (xfer) {
1250 /* chain doesn't already have a copy of the info */
1251 PMIX_INFO_XFER(&chain->info[n], &info[n]);
1252 }
1253 /* look for specific directives */
1254 if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
1255 chain->nondefault = PMIX_INFO_TRUE(&info[n]);
1256 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_CUSTOM_RANGE)) {
1257 /* provides an array of pmix_proc_t identifying the procs
1258 * that are to receive this notification, or a single pmix_proc_t */
1259 if (PMIX_DATA_ARRAY == info[n].value.type &&
1260 NULL != info[n].value.data.darray &&
1261 NULL != info[n].value.data.darray->array) {
1262 chain->ntargets = info[n].value.data.darray->size;
1263 PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1264 memcpy(chain->targets, info[n].value.data.darray->array, chain->ntargets * sizeof(pmix_proc_t));
1265 } else if (PMIX_PROC == info[n].value.type) {
1266 chain->ntargets = 1;
1267 PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1268 memcpy(chain->targets, info[n].value.data.proc, sizeof(pmix_proc_t));
1269 } else {
1270 /* this is an error */
1271 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1272 return PMIX_ERR_BAD_PARAM;
1273 }
1274 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
1275 PMIX_PROC_CREATE(chain->affected, 1);
1276 if (NULL == chain->affected) {
1277 return PMIX_ERR_NOMEM;
1278 }
1279 chain->naffected = 1;
1280 memcpy(chain->affected, info[n].value.data.proc, sizeof(pmix_proc_t));
1281 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
1282 chain->naffected = info[n].value.data.darray->size;
1283 PMIX_PROC_CREATE(chain->affected, chain->naffected);
1284 if (NULL == chain->affected) {
1285 chain->naffected = 0;
1286 return PMIX_ERR_NOMEM;
1287 }
1288 memcpy(chain->affected, info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
1289 }
1290 }
1291 }
1292 return PMIX_SUCCESS;
1293 }
1294
1295 /**** CLASS INSTANTIATIONS ****/
1296
sevcon(pmix_event_hdlr_t * p)1297 static void sevcon(pmix_event_hdlr_t *p)
1298 {
1299 p->name = NULL;
1300 p->index = UINT_MAX;
1301 p->precedence = PMIX_EVENT_ORDER_NONE;
1302 p->locator = NULL;
1303 p->rng.range = PMIX_RANGE_UNDEF;
1304 p->rng.procs = NULL;
1305 p->rng.nprocs = 0;
1306 p->affected = NULL;
1307 p->naffected = 0;
1308 p->evhdlr = NULL;
1309 p->cbobject = NULL;
1310 p->codes = NULL;
1311 p->ncodes = 0;
1312 }
sevdes(pmix_event_hdlr_t * p)1313 static void sevdes(pmix_event_hdlr_t *p)
1314 {
1315 if (NULL != p->name) {
1316 free(p->name);
1317 }
1318 if (NULL != p->locator) {
1319 free(p->locator);
1320 }
1321 if (NULL != p->rng.procs) {
1322 free(p->rng.procs);
1323 }
1324 if (NULL != p->affected) {
1325 PMIX_PROC_FREE(p->affected, p->naffected);
1326 }
1327 if (NULL != p->codes) {
1328 free(p->codes);
1329 }
1330 }
1331 PMIX_CLASS_INSTANCE(pmix_event_hdlr_t,
1332 pmix_list_item_t,
1333 sevcon, sevdes);
1334
accon(pmix_active_code_t * p)1335 static void accon(pmix_active_code_t *p)
1336 {
1337 p->nregs = 0;
1338 }
1339 PMIX_CLASS_INSTANCE(pmix_active_code_t,
1340 pmix_list_item_t,
1341 accon, NULL);
1342
evcon(pmix_events_t * p)1343 static void evcon(pmix_events_t *p)
1344 {
1345 p->nhdlrs = 0;
1346 p->first = NULL;
1347 p->last = NULL;
1348 PMIX_CONSTRUCT(&p->actives, pmix_list_t);
1349 PMIX_CONSTRUCT(&p->single_events, pmix_list_t);
1350 PMIX_CONSTRUCT(&p->multi_events, pmix_list_t);
1351 PMIX_CONSTRUCT(&p->default_events, pmix_list_t);
1352 }
evdes(pmix_events_t * p)1353 static void evdes(pmix_events_t *p)
1354 {
1355 if (NULL != p->first) {
1356 PMIX_RELEASE(p->first);
1357 }
1358 if (NULL != p->last) {
1359 PMIX_RELEASE(p->last);
1360 }
1361 PMIX_LIST_DESTRUCT(&p->actives);
1362 PMIX_LIST_DESTRUCT(&p->single_events);
1363 PMIX_LIST_DESTRUCT(&p->multi_events);
1364 PMIX_LIST_DESTRUCT(&p->default_events);
1365 }
1366 PMIX_CLASS_INSTANCE(pmix_events_t,
1367 pmix_object_t,
1368 evcon, evdes);
1369
chcon(pmix_event_chain_t * p)1370 static void chcon(pmix_event_chain_t *p)
1371 {
1372 p->timer_active = false;
1373 memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
1374 p->source.rank = PMIX_RANK_UNDEF;
1375 p->nondefault = false;
1376 p->endchain = false;
1377 p->targets = NULL;
1378 p->ntargets = 0;
1379 p->range = PMIX_RANGE_UNDEF;
1380 p->affected = NULL;
1381 p->naffected = 0;
1382 p->info = NULL;
1383 p->ninfo = 0;
1384 p->nallocated = 0;
1385 p->results = NULL;
1386 p->nresults = 0;
1387 p->evhdlr = NULL;
1388 p->final_cbfunc = NULL;
1389 p->final_cbdata = NULL;
1390 }
chdes(pmix_event_chain_t * p)1391 static void chdes(pmix_event_chain_t *p)
1392 {
1393 if (p->timer_active) {
1394 pmix_event_del(&p->ev);
1395 }
1396 if (NULL != p->targets) {
1397 PMIX_PROC_FREE(p->targets, p->ntargets);
1398 }
1399 if (NULL != p->affected) {
1400 PMIX_PROC_FREE(p->affected, p->naffected);
1401 }
1402 if (NULL != p->info) {
1403 PMIX_INFO_FREE(p->info, p->nallocated);
1404 }
1405 if (NULL != p->results) {
1406 PMIX_INFO_FREE(p->results, p->nresults);
1407 }
1408 }
1409 PMIX_CLASS_INSTANCE(pmix_event_chain_t,
1410 pmix_list_item_t,
1411 chcon, chdes);
1412