1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
4  * Copyright (c) 2017-2019 Research Organization for Information Science
5  *                         and Technology (RIST).  All rights reserved.
6  * Copyright (c) 2017      IBM Corporation. All rights reserved.
7  *
8  * $COPYRIGHT$
9  *
10  * Additional copyrights may follow
11  *
12  * $HEADER$
13  */
14 #include <src/include/pmix_config.h>
15 
16 #include <pmix.h>
17 #include <pmix_common.h>
18 #include <pmix_server.h>
19 #include <pmix_rename.h>
20 
21 #include "src/threads/threads.h"
22 #include "src/util/error.h"
23 #include "src/util/output.h"
24 
25 #include "src/mca/bfrops/bfrops.h"
26 #include "src/client/pmix_client_ops.h"
27 #include "src/server/pmix_server_ops.h"
28 #include "src/include/pmix_globals.h"
29 
/* Client-side helper (defined below): relays an event to our server,
 * caches it, and runs it through our own locally registered handlers. */
static pmix_status_t notify_server_of_event(pmix_status_t status,
                                            const pmix_proc_t *source,
                                            pmix_data_range_t range,
                                            const pmix_info_t info[], size_t ninfo,
                                            pmix_op_cbfunc_t cbfunc, void *cbdata);
35 
36 /* if we are a client, we call this function to notify the server of
37  * an event. If we are a server, our host RM will call this function
38  * to notify us of an event */
PMIx_Notify_event(pmix_status_t status,const pmix_proc_t * source,pmix_data_range_t range,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)39 PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status,
40                                             const pmix_proc_t *source,
41                                             pmix_data_range_t range,
42                                             const pmix_info_t info[], size_t ninfo,
43                                             pmix_op_cbfunc_t cbfunc, void *cbdata)
44 {
45     int rc;
46 
47     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
48 
49     if (pmix_globals.init_cntr <= 0) {
50         PMIX_RELEASE_THREAD(&pmix_global_lock);
51         return PMIX_ERR_INIT;
52     }
53 
54     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
55         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
56         PMIX_RELEASE_THREAD(&pmix_global_lock);
57 
58         pmix_output_verbose(2, pmix_server_globals.event_output,
59                             "pmix_server_notify_event source = %s:%d event_status = %s",
60                             (NULL == source) ? "UNKNOWN" : source->nspace,
61                             (NULL == source) ? PMIX_RANK_WILDCARD : source->rank, PMIx_Error_string(status));
62 
63         rc = pmix_server_notify_client_of_event(status, source, range,
64                                                 info, ninfo,
65                                                 cbfunc, cbdata);
66 
67         if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
68             PMIX_ERROR_LOG(rc);
69         }
70         return rc;
71     }
72 
73     /* if we aren't connected, don't attempt to send */
74     if (!pmix_globals.connected) {
75         PMIX_RELEASE_THREAD(&pmix_global_lock);
76         return PMIX_ERR_UNREACH;
77     }
78     PMIX_RELEASE_THREAD(&pmix_global_lock);
79     pmix_output_verbose(2, pmix_client_globals.event_output,
80                         "pmix_client_notify_event source = %s:%d event_status =%d",
81                         (NULL == source) ? pmix_globals.myid.nspace : source->nspace,
82                         (NULL == source) ? pmix_globals.myid.rank : source->rank, status);
83 
84     rc = notify_server_of_event(status, source, range,
85                                 info, ninfo,
86                                 cbfunc, cbdata);
87     if (PMIX_SUCCESS != rc) {
88         PMIX_ERROR_LOG(rc);
89     }
90     return rc;
91 }
92 
/* Receive callback for a PMIX_NOTIFY_CMD sent to our server.
 *
 * The reply carries a single pmix_status_t. That status, or the unpack
 * error if the reply cannot be read, is delivered to the op callback
 * stored in the pmix_cb_t passed as cbdata. The pmix_cb_t is then
 * released. hdr is not used.
 */
static void notify_event_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
                                pmix_buffer_t *buf, void *cbdata)
{
    pmix_cb_t *req = (pmix_cb_t*)cbdata;
    pmix_status_t server_status;
    pmix_status_t unpack_rc;
    int32_t nvals = 1;

    /* the server's reply is a single status value */
    PMIX_BFROPS_UNPACK(unpack_rc, pr, buf, &server_status, &nvals, PMIX_STATUS);
    if (PMIX_SUCCESS != unpack_rc) {
        /* reply unreadable - report the unpack failure instead */
        PMIX_ERROR_LOG(unpack_rc);
        server_status = unpack_rc;
    }

    if (NULL != req->cbfunc.opfn) {
        req->cbfunc.opfn(server_status, req->cbdata);
    }
    PMIX_RELEASE(req);
}
112 
notify_event_cache(pmix_notify_caddy_t * cd)113 static pmix_status_t notify_event_cache(pmix_notify_caddy_t *cd)
114 {
115     pmix_status_t rc;
116     int j;
117     pmix_notify_caddy_t *pk;
118     int idx;
119     time_t etime;
120 
121     /* add to our cache */
122     rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
123     /* if there wasn't room, then search for the longest tenured
124      * occupant and evict them */
125     if (PMIX_SUCCESS != rc) {
126         etime = 0;
127         idx = -1;
128         for (j=0; j < pmix_globals.max_events; j++) {
129             pmix_hotel_knock(&pmix_globals.notifications, j, (void**)&pk);
130             if (NULL == pk) {
131                 /* hey, there is room! */
132                 pmix_hotel_checkin_with_res(&pmix_globals.notifications, cd, &cd->room);
133                 return PMIX_SUCCESS;
134             }
135             /* check the age */
136             if (0 == j) {
137                 etime = pk->ts;
138                 idx = j;
139             } else {
140                 if (difftime(pk->ts, etime) < 0) {
141                     etime = pk->ts;
142                     idx = j;
143                 }
144             }
145         }
146         if (0 <= idx) {
147             /* we found the oldest occupant - evict it */
148             pmix_hotel_checkout_and_return_occupant(&pmix_globals.notifications, idx, (void**)&pk);
149             PMIX_RELEASE(pk);
150             rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
151         }
152     }
153     return rc;
154 }
155 
156 /* as a client, we pass the notification to our server */
notify_server_of_event(pmix_status_t status,const pmix_proc_t * source,pmix_data_range_t range,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)157 static pmix_status_t notify_server_of_event(pmix_status_t status,
158                                             const pmix_proc_t *source,
159                                             pmix_data_range_t range,
160                                             const pmix_info_t info[], size_t ninfo,
161                                             pmix_op_cbfunc_t cbfunc, void *cbdata)
162 {
163     pmix_status_t rc;
164     pmix_buffer_t *msg = NULL;
165     pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
166     pmix_cb_t *cb;
167     pmix_event_chain_t *chain;
168     size_t n;
169     pmix_notify_caddy_t *cd;
170 
171     pmix_output_verbose(2, pmix_client_globals.event_output,
172                         "[%s:%d] client: notifying server %s:%d of status %s for range %s",
173                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
174                         pmix_client_globals.myserver->info->pname.nspace,
175                         pmix_client_globals.myserver->info->pname.rank,
176                         PMIx_Error_string(status), PMIx_Data_range_string(range));
177 
178     if (PMIX_RANGE_PROC_LOCAL != range) {
179         /* create the msg object */
180         msg = PMIX_NEW(pmix_buffer_t);
181         if (NULL == msg) {
182             return PMIX_ERR_NOMEM;
183         }
184         /* pack the command */
185         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cmd, 1, PMIX_COMMAND);
186         if (PMIX_SUCCESS != rc) {
187             PMIX_ERROR_LOG(rc);
188             goto cleanup;
189         }
190         /* pack the status */
191         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &status, 1, PMIX_STATUS);
192         if (PMIX_SUCCESS != rc) {
193             PMIX_ERROR_LOG(rc);
194             goto cleanup;
195         }
196         /* no need to pack the source as it is us */
197 
198         /* pack the range */
199         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &range, 1, PMIX_DATA_RANGE);
200         if (PMIX_SUCCESS != rc) {
201             PMIX_ERROR_LOG(rc);
202             goto cleanup;
203         }
204         /* pack the info */
205         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &ninfo, 1, PMIX_SIZE);
206         if (PMIX_SUCCESS != rc) {
207             PMIX_ERROR_LOG(rc);
208             goto cleanup;
209         }
210         if (0 < ninfo) {
211             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, info, ninfo, PMIX_INFO);
212             if (PMIX_SUCCESS != rc) {
213                 PMIX_ERROR_LOG(rc);
214                 goto cleanup;
215             }
216         }
217     }
218 
219     /* setup for our own local callbacks */
220     chain = PMIX_NEW(pmix_event_chain_t);
221     chain->status = status;
222     pmix_strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
223     chain->source.rank = pmix_globals.myid.rank;
224     /* we always leave space for event hdlr name and a callback object */
225     chain->nallocated = ninfo + 2;
226     PMIX_INFO_CREATE(chain->info, chain->nallocated);
227     /* prep the chain for processing */
228     pmix_prep_event_chain(chain, info, ninfo, true);
229 
230     /* we need to cache this event so we can pass it into
231      * ourselves should someone later register for it */
232     cd = PMIX_NEW(pmix_notify_caddy_t);
233     cd->status = status;
234     if (NULL == source) {
235         pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
236         cd->source.rank = PMIX_RANK_UNDEF;
237     } else {
238         pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
239         cd->source.rank = source->rank;
240     }
241     cd->range = range;
242     if (0 < chain->ninfo) {
243         cd->ninfo = chain->ninfo;
244         PMIX_INFO_CREATE(cd->info, cd->ninfo);
245         cd->nondefault = chain->nondefault;
246        /* need to copy the info */
247         for (n=0; n < cd->ninfo; n++) {
248             PMIX_INFO_XFER(&cd->info[n], &chain->info[n]);
249         }
250     }
251     if (NULL != chain->targets) {
252         cd->ntargets = chain->ntargets;
253         PMIX_PROC_CREATE(cd->targets, cd->ntargets);
254         memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
255     }
256     if (NULL != chain->affected) {
257         cd->naffected = chain->naffected;
258         PMIX_PROC_CREATE(cd->affected, cd->naffected);
259         if (NULL == cd->affected) {
260             cd->naffected = 0;
261             rc = PMIX_ERR_NOMEM;
262             goto cleanup;
263         }
264         memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
265     }
266     /* cache it */
267     rc = notify_event_cache(cd);
268     if (PMIX_SUCCESS != rc) {
269         PMIX_ERROR_LOG(rc);
270         PMIX_RELEASE(cd);
271         goto cleanup;
272     }
273 
274     if (PMIX_RANGE_PROC_LOCAL != range && NULL != msg) {
275         /* create a callback object as we need to pass it to the
276          * recv routine so we know which callback to use when
277          * the server acks/nacks the register events request. The
278          * server will _not_ send this notification back to us,
279          * so we handle it locally */
280         cb = PMIX_NEW(pmix_cb_t);
281         cb->cbfunc.opfn = cbfunc;
282         cb->cbdata = cbdata;
283         /* send to the server */
284         pmix_output_verbose(2, pmix_client_globals.event_output,
285                             "[%s:%d] client: notifying server %s:%d - sending",
286                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
287                             pmix_client_globals.myserver->info->pname.nspace,
288                             pmix_client_globals.myserver->info->pname.rank);
289         PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
290                            msg, notify_event_cbfunc, cb);
291         if (PMIX_SUCCESS != rc) {
292             PMIX_ERROR_LOG(rc);
293             PMIX_RELEASE(cb);
294             goto cleanup;
295         }
296     } else if (NULL != cbfunc) {
297         cbfunc(PMIX_SUCCESS, cbdata);
298     }
299 
300     /* now notify any matching registered callbacks we have */
301     pmix_invoke_local_event_hdlr(chain);
302 
303     return PMIX_SUCCESS;
304 
305   cleanup:
306     pmix_output_verbose(2, pmix_client_globals.event_output,
307                         "client: notifying server - unable to send");
308     if (NULL != msg) {
309         PMIX_RELEASE(msg);
310     }
311     /* we were unable to send anything, so we just return the error */
312     return rc;
313 }
314 
315 
progress_local_event_hdlr(pmix_status_t status,pmix_info_t * results,size_t nresults,pmix_op_cbfunc_t cbfunc,void * thiscbdata,void * notification_cbdata)316 static void progress_local_event_hdlr(pmix_status_t status,
317                                       pmix_info_t *results, size_t nresults,
318                                       pmix_op_cbfunc_t cbfunc, void *thiscbdata,
319                                       void *notification_cbdata)
320 {
321     /* this may be in the host's thread, so we need to threadshift it
322      * before accessing our internal data */
323 
324     pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata;
325     size_t n, nsave, cnt;
326     pmix_info_t *newinfo;
327     pmix_list_item_t *item;
328     pmix_event_hdlr_t *nxt;
329 
330     /* aggregate the results per RFC0018 - first search the
331      * prior chained results to see if any keys have been NULL'd
332      * as this indicates that info struct should be removed */
333     nsave = 0;
334     for (n=0; n < chain->nresults; n++) {
335         if (0 < strlen(chain->results[n].key)) {
336             ++nsave;
337         }
338     }
339     /* we have to at least record the status returned by each
340      * stage of the event handler chain, so we have to reallocate
341      * the array to make space */
342 
343     /* add in any new results plus space for the returned status */
344     nsave += nresults + 1;
345     /* create the new space */
346     PMIX_INFO_CREATE(newinfo, nsave);
347     /* transfer over the prior data */
348     cnt = 0;
349     for (n=0; n < chain->nresults; n++) {
350         if (0 < strlen(chain->results[n].key)) {
351             PMIX_INFO_XFER(&newinfo[cnt], &chain->results[n]);
352             ++cnt;
353         }
354     }
355 
356     /* save this handler's returned status */
357     if (NULL != chain->evhdlr->name) {
358         pmix_strncpy(newinfo[cnt].key, chain->evhdlr->name, PMIX_MAX_KEYLEN);
359     } else {
360         pmix_strncpy(newinfo[cnt].key, "UNKNOWN", PMIX_MAX_KEYLEN);
361     }
362     newinfo[cnt].value.type = PMIX_STATUS;
363     newinfo[cnt].value.data.status = status;
364     ++cnt;
365     /* transfer across the new results */
366     for (n=0; n < nresults; n++) {
367         PMIX_INFO_XFER(&newinfo[cnt], &results[n]);
368         ++cnt;
369     }
370     /* release the prior results */
371     if (0 < chain->nresults) {
372         PMIX_INFO_FREE(chain->results, chain->nresults);
373     }
374     /* pass along the new ones */
375     chain->results = newinfo;
376     chain->nresults = cnt;
377     /* clear any loaded name and object */
378     chain->ninfo = chain->nallocated - 2;
379     PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-2]);
380     PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-1]);
381 
382     /* if the caller indicates that the chain is completed,
383      * or we completed the "last" event */
384     if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) {
385         goto complete;
386     }
387     item = NULL;
388 
389     /* see if we need to continue, starting with the single code events */
390     if (1 == chain->evhdlr->ncodes) {
391         /* the last handler was for a single code - see if there are
392          * any others that match this event */
393         item = &chain->evhdlr->super;
394         while (pmix_list_get_end(&pmix_globals.events.single_events) != (item = pmix_list_get_next(item))) {
395             nxt = (pmix_event_hdlr_t*)item;
396             if (nxt->codes[0] == chain->status &&
397                 pmix_notify_check_range(&nxt->rng, &chain->source) &&
398                 pmix_notify_check_affected(nxt->affected, nxt->naffected,
399                                            chain->affected, chain->naffected)) {
400                 chain->evhdlr = nxt;
401                 /* reset our count to the info provided by the caller */
402                 chain->ninfo = chain->nallocated - 2;
403                 /* if the handler has a name, then provide it */
404                 if (NULL != chain->evhdlr->name) {
405                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
406                     chain->ninfo++;
407                 }
408 
409                 /* if there is an evhdlr cbobject, provide it */
410                 if (NULL != chain->evhdlr->cbobject) {
411                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
412                     chain->ninfo++;
413                 }
414                 nxt->evhdlr(nxt->index,
415                             chain->status, &chain->source,
416                             chain->info, chain->ninfo,
417                             chain->results, chain->nresults,
418                             progress_local_event_hdlr, (void*)chain);
419                 return;
420             }
421         }
422         /* if we get here, then there are no more single code
423          * events that match */
424         item = pmix_list_get_begin(&pmix_globals.events.multi_events);
425     }
426 
427     /* see if we need to continue with the multi code events */
428     if (NULL != chain->evhdlr->codes || NULL != item) {
429         /* the last handler was for a multi-code event, or we exhausted
430          * all the single code events */
431         if (NULL == item) {
432             /* if the last handler was multi-code, then start from that point */
433             item = &chain->evhdlr->super;
434         }
435         while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) {
436             nxt = (pmix_event_hdlr_t*)item;
437             if (!pmix_notify_check_range(&nxt->rng, &chain->source) ||
438                 !pmix_notify_check_affected(nxt->affected, nxt->naffected,
439                                             chain->affected, chain->naffected)) {
440                 continue;
441             }
442             for (n=0; n < nxt->ncodes; n++) {
443                 /* if this event handler provided a range, check to see if
444                  * the source fits within it */
445                 if (nxt->codes[n] == chain->status) {
446                     chain->evhdlr = nxt;
447                     /* reset our count to the info provided by the caller */
448                     chain->ninfo = chain->nallocated - 2;
449                     /* if the handler has a name, then provide it */
450                     if (NULL != chain->evhdlr->name) {
451                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
452                         chain->ninfo++;
453                     }
454 
455                     /* if there is an evhdlr cbobject, provide it */
456                     if (NULL != chain->evhdlr->cbobject) {
457                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
458                         chain->ninfo++;
459                     }
460                     nxt->evhdlr(nxt->index,
461                                 chain->status, &chain->source,
462                                 chain->info, chain->ninfo,
463                                 chain->results, chain->nresults,
464                                 progress_local_event_hdlr, (void*)chain);
465                     return;
466                 }
467             }
468         }
469         /* if we get here, then there are no more multi-mode
470          * events that match */
471         item = pmix_list_get_begin(&pmix_globals.events.default_events);
472     }
473 
474     /* if they didn't want it to go to a default handler, then ignore them */
475     if (!chain->nondefault) {
476         if (NULL == item) {
477             item = &chain->evhdlr->super;
478         }
479         if (pmix_list_get_end(&pmix_globals.events.default_events) != (item = pmix_list_get_next(item))) {
480             nxt = (pmix_event_hdlr_t*)item;
481             /* if this event handler provided a range, check to see if
482              * the source fits within it */
483             if (pmix_notify_check_range(&nxt->rng, &chain->source) &&
484                 pmix_notify_check_affected(nxt->affected, nxt->naffected,
485                                            chain->affected, chain->naffected)) {
486                 chain->evhdlr = nxt;
487                 /* reset our count to the info provided by the caller */
488                 chain->ninfo = chain->nallocated - 2;
489                 /* if the handler has a name, then provide it */
490                 if (NULL != chain->evhdlr->name) {
491                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
492                     chain->ninfo++;
493                 }
494 
495                 /* if there is an evhdlr cbobject, provide it */
496                 if (NULL != chain->evhdlr->cbobject) {
497                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
498                     chain->ninfo++;
499                 }
500                 nxt->evhdlr(nxt->index,
501                             chain->status, &chain->source,
502                             chain->info, chain->ninfo,
503                             chain->results, chain->nresults,
504                             progress_local_event_hdlr, (void*)chain);
505                 return;
506             }
507         }
508     }
509 
510     /* if we registered a "last" handler, and it fits the given range
511      * and code, then invoke it now */
512     if (NULL != pmix_globals.events.last &&
513         pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
514         pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
515                                    chain->affected, chain->naffected)) {
516         chain->endchain = true;  // ensure we don't do this again
517         if (1 == pmix_globals.events.last->ncodes &&
518             pmix_globals.events.last->codes[0] == chain->status) {
519             chain->evhdlr = pmix_globals.events.last;
520             /* reset our count to the info provided by the caller */
521             chain->ninfo = chain->nallocated - 2;
522             /* if the handler has a name, then provide it */
523             if (NULL != chain->evhdlr->name) {
524                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
525                 chain->ninfo++;
526             }
527 
528             /* if there is an evhdlr cbobject, provide it */
529             if (NULL != chain->evhdlr->cbobject) {
530                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
531                 chain->ninfo++;
532             }
533             chain->evhdlr->evhdlr(chain->evhdlr->index,
534                                   chain->status, &chain->source,
535                                   chain->info, chain->ninfo,
536                                   chain->results, chain->nresults,
537                                   progress_local_event_hdlr, (void*)chain);
538             return;
539         } else if (NULL != pmix_globals.events.last->codes) {
540             /* need to check if this code is included in the array */
541             for (n=0; n < pmix_globals.events.last->ncodes; n++) {
542                 if (pmix_globals.events.last->codes[n] == chain->status) {
543                     chain->evhdlr = pmix_globals.events.last;
544                     /* reset our count to the info provided by the caller */
545                     chain->ninfo = chain->nallocated - 2;
546                     /* if the handler has a name, then provide it */
547                     if (NULL != chain->evhdlr->name) {
548                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
549                         chain->ninfo++;
550                     }
551 
552                     /* if there is an evhdlr cbobject, provide it */
553                     if (NULL != chain->evhdlr->cbobject) {
554                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
555                         chain->ninfo++;
556                     }
557                     chain->evhdlr->evhdlr(chain->evhdlr->index,
558                                           chain->status, &chain->source,
559                                           chain->info, chain->ninfo,
560                                           chain->results, chain->nresults,
561                                           progress_local_event_hdlr, (void*)chain);
562                     return;
563                 }
564             }
565         } else {
566             /* gets run for all codes */
567             chain->evhdlr = pmix_globals.events.last;
568             /* reset our count to the info provided by the caller */
569             chain->ninfo = chain->nallocated - 2;
570             /* if the handler has a name, then provide it */
571             if (NULL != chain->evhdlr->name) {
572                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
573                 chain->ninfo++;
574             }
575 
576             /* if there is an evhdlr cbobject, provide it */
577             if (NULL != chain->evhdlr->cbobject) {
578                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
579                 chain->ninfo++;
580             }
581             chain->evhdlr->evhdlr(chain->evhdlr->index,
582                                   chain->status, &chain->source,
583                                   chain->info, chain->ninfo,
584                                   chain->results, chain->nresults,
585                                   progress_local_event_hdlr, (void*)chain);
586             return;
587         }
588     }
589 
590   complete:
591     /* we still have to call their final callback */
592     if (NULL != chain->final_cbfunc) {
593         chain->final_cbfunc(PMIX_SUCCESS, chain->final_cbdata);
594         return;
595     }
596     /* maintain acctng */
597     PMIX_RELEASE(chain);
598     /* let the caller know that we are done with their callback */
599     if (NULL != cbfunc) {
600         cbfunc(PMIX_SUCCESS, thiscbdata);
601     }
602 }
603 
604 /* given notification of an event, cycle thru our list of
605  * registered callbacks and invoke the matching ones. Note
606  * that we will invoke the callbacks in order from single
607  * to multi-event to default, keeping a log of any returned
608  * info and passing it down to the next invoked event handler.
609  * Thus, each handler is given the opportunity to see what
610  * prior handlers did, and decide if anything further needs
611  * to be done.
612  */
pmix_invoke_local_event_hdlr(pmix_event_chain_t * chain)613 void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
614 {
615     /* We need to parse thru each registered handler and determine
616      * which one(s) to call for the specific error */
617     size_t i;
618     pmix_event_hdlr_t *evhdlr;
619     pmix_status_t rc = PMIX_SUCCESS;
620     bool found;
621 
622     pmix_output_verbose(2, pmix_client_globals.event_output,
623                         "%s:%d invoke_local_event_hdlr for status %s",
624                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
625                         PMIx_Error_string(chain->status));
626 
627     /* sanity check */
628     if (NULL == chain->info) {
629         /* should never happen as space must always be
630          * reserved for handler name and callback object*/
631         rc = PMIX_ERR_BAD_PARAM;
632         goto complete;
633     }
634 
635     /* if we are not a target, then we can simply ignore this event */
636     if (NULL != chain->targets) {
637         found = false;
638         for (i=0; i < chain->ntargets; i++) {
639             if (PMIX_CHECK_PROCID(&chain->targets[i], &pmix_globals.myid)) {
640                 found = true;
641                 break;
642             }
643         }
644         if (!found) {
645             goto complete;
646         }
647     }
648 
649     /* if we registered a "first" handler, and it fits the given range,
650      * then invoke it first */
651     if (NULL != pmix_globals.events.first) {
652         if (1 == pmix_globals.events.first->ncodes &&
653             pmix_globals.events.first->codes[0] == chain->status &&
654             pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source) &&
655             pmix_notify_check_affected(pmix_globals.events.first->affected, pmix_globals.events.first->naffected,
656                                        chain->affected, chain->naffected)) {
657             /* invoke the handler */
658             chain->evhdlr = pmix_globals.events.first;
659             goto invk;
660         } else if (NULL != pmix_globals.events.first->codes) {
661             /* need to check if this code is included in the array */
662             found = false;
663             for (i=0; i < pmix_globals.events.first->ncodes; i++) {
664                 if (pmix_globals.events.first->codes[i] == chain->status) {
665                     found = true;
666                     break;
667                 }
668             }
669             /* if this event handler provided a range, check to see if
670              * the source fits within it */
671             if (found && pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
672                 /* invoke the handler */
673                 chain->evhdlr = pmix_globals.events.first;
674                 goto invk;
675             }
676         } else {
677             /* take all codes for a default handler */
678             if (pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
679                 /* invoke the handler */
680                 chain->evhdlr = pmix_globals.events.first;
681                 goto invk;
682             }
683         }
684         /* get here if there is no match, so fall thru */
685     }
686 
687     /* cycle thru the single-event registrations first */
688     PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
689         if (evhdlr->codes[0] == chain->status) {
690             if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
691                 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
692                                            chain->affected, chain->naffected)) {
693                 /* invoke the handler */
694                 chain->evhdlr = evhdlr;
695                 goto invk;
696             }
697         }
698     }
699 
700     /* if we didn't find any match in the single-event registrations,
701      * then cycle thru the multi-event registrations next */
702     PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
703         for (i=0; i < evhdlr->ncodes; i++) {
704             if (evhdlr->codes[i] == chain->status) {
705                 if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
706                     pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
707                                                chain->affected, chain->naffected)) {
708                     /* invoke the handler */
709                     chain->evhdlr = evhdlr;
710                     goto invk;
711                 }
712             }
713         }
714     }
715 
716     /* if they didn't want it to go to a default handler, then ignore them */
717     if (!chain->nondefault) {
718         /* pass it to any default handlers */
719         PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
720             if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
721                 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
722                                            chain->affected, chain->naffected)) {
723                 /* invoke the handler */
724                 chain->evhdlr = evhdlr;
725                 goto invk;
726             }
727         }
728     }
729 
730     /* if we registered a "last" handler, and it fits the given range
731      * and code, then invoke it now */
732     if (NULL != pmix_globals.events.last &&
733         pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
734         pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
735                                    chain->affected, chain->naffected)) {
736         chain->endchain = true;  // ensure we don't do this again
737         if (1 == pmix_globals.events.last->ncodes &&
738             pmix_globals.events.last->codes[0] == chain->status) {
739             chain->evhdlr = pmix_globals.events.last;
740             goto invk;
741         } else if (NULL != pmix_globals.events.last->codes) {
742             /* need to check if this code is included in the array */
743             for (i=0; i < pmix_globals.events.last->ncodes; i++) {
744                 if (pmix_globals.events.last->codes[i] == chain->status) {
745                     chain->evhdlr = pmix_globals.events.last;
746                     goto invk;
747                 }
748             }
749         } else {
750             /* gets run for all codes */
751             chain->evhdlr = pmix_globals.events.last;
752             goto invk;
753         }
754     }
755 
756     /* if we got here, then nothing was found */
757 
758   complete:
759     /* we still have to call their final callback */
760     if (NULL != chain->final_cbfunc) {
761         chain->final_cbfunc(rc, chain->final_cbdata);
762     } else {
763         PMIX_RELEASE(chain);
764     }
765     return;
766 
767 
768   invk:
769     /* start with the chain holding only the given info */
770     chain->ninfo = chain->nallocated - 2;
771 
772     /* if the handler has a name, then provide it */
773     if (NULL != chain->evhdlr->name) {
774         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
775         chain->ninfo++;
776     }
777 
778     /* if there is an evhdlr cbobject, provide it */
779     if (NULL != chain->evhdlr->cbobject) {
780         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
781         chain->ninfo++;
782     }
783 
784     /* invoke the handler */
785     pmix_output_verbose(2, pmix_client_globals.event_output,
786                         "[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__,
787                         (NULL == chain->evhdlr->name) ?
788                         "NULL" : chain->evhdlr->name);
789     chain->evhdlr->evhdlr(chain->evhdlr->index,
790                           chain->status, &chain->source,
791                           chain->info, chain->ninfo,
792                           NULL, 0,
793                           progress_local_event_hdlr, (void*)chain);
794     return;
795 }
796 
/* Completion callback handed to the host RM's notify_event by
 * _notify_client_event. Relays the host's status to the original
 * requester (if it supplied a callback) and then drops the caddy
 * reference that _notify_client_event deliberately kept alive while
 * the host was working with it. */
static void local_cbfunc(pmix_status_t status, void *cbdata)
{
    pmix_notify_caddy_t *caddy = (pmix_notify_caddy_t*)cbdata;
    pmix_op_cbfunc_t user_cb = caddy->cbfunc;

    if (NULL != user_cb) {
        user_cb(status, caddy->cbdata);
    }
    PMIX_RELEASE(caddy);
}
806 
_notify_client_event(int sd,short args,void * cbdata)807 static void _notify_client_event(int sd, short args, void *cbdata)
808 {
809     pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
810     pmix_regevents_info_t *reginfoptr;
811     pmix_peer_events_info_t *pr;
812     pmix_event_chain_t *chain;
813     size_t n, nleft;
814     bool matched, holdcd;
815     pmix_buffer_t *bfr;
816     pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
817     pmix_status_t rc;
818     pmix_list_t trk;
819     pmix_namelist_t *nm;
820     pmix_namespace_t *nptr, *tmp;
821     pmix_range_trkr_t rngtrk;
822     pmix_proc_t proc;
823 
824     /* need to acquire the object from its originating thread */
825     PMIX_ACQUIRE_OBJECT(cd);
826 
827     pmix_output_verbose(2, pmix_server_globals.event_output,
828                         "pmix_server: _notify_client_event notifying clients of event %s range %s type %s",
829                         PMIx_Error_string(cd->status),
830                         PMIx_Data_range_string(cd->range),
831                         cd->nondefault ? "NONDEFAULT" : "OPEN");
832 
833     /* check for caching instructions */
834     holdcd = true;
835     if (0 < cd->ninfo) {
836         /* check for caching instructions */
837         for (n=0; n < cd->ninfo; n++) {
838             if (PMIX_CHECK_KEY(&cd->info[n], PMIX_EVENT_DO_NOT_CACHE)) {
839                 if (PMIX_INFO_TRUE(&cd->info[n])) {
840                     holdcd = false;
841                 }
842                 break;
843             }
844         }
845     }
846     if (holdcd) {
847         /* we cannot know if everyone who wants this notice has had a chance
848          * to register for it - the notice may be coming too early. So cache
849          * the message until all local procs have received it, or it ages to
850          * the point where it gets pushed out by more recent events */
851         PMIX_RETAIN(cd);
852         rc = notify_event_cache(cd);
853         if (PMIX_SUCCESS != rc) {
854             PMIX_ERROR_LOG(rc);
855         }
856     }
857 
858     /* we may also have registered for events, so setup to check this
859      * against our registrations */
860     chain = PMIX_NEW(pmix_event_chain_t);
861     chain->status = cd->status;
862     pmix_strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
863     chain->source.rank = cd->source.rank;
864     /* we always leave space for a callback object and
865      * the evhandler name. */
866     chain->nallocated = cd->ninfo + 2;
867     PMIX_INFO_CREATE(chain->info, chain->nallocated);
868     /* prep the chain for processing */
869     pmix_prep_event_chain(chain, cd->info, cd->ninfo, true);
870 
871     /* copy setup to the cd object */
872     cd->nondefault = chain->nondefault;
873     if (NULL != chain->targets) {
874         cd->ntargets = chain->ntargets;
875         PMIX_PROC_CREATE(cd->targets, cd->ntargets);
876         memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
877         /* compute the number of targets that need to be notified */
878         nleft = 0;
879         for (n=0; n < cd->ntargets; n++) {
880             /* if this is a single proc, then increment by one */
881             if (PMIX_RANK_VALID >= cd->targets[n].rank) {
882                 ++nleft;
883             } else {
884                 /* look up the nspace for this proc */
885                 nptr = NULL;
886                 PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
887                     if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) {
888                         nptr = tmp;
889                         break;
890                     }
891                 }
892                 /* if we don't yet know it, then nothing to do */
893                 if (NULL == nptr) {
894                     nleft = SIZE_MAX;
895                     break;
896                 }
897                 /* might notify all local members */
898                 nleft += nptr->nlocalprocs;
899             }
900         }
901         cd->nleft = nleft;
902     }
903     if (NULL != chain->affected) {
904         cd->naffected = chain->naffected;
905         PMIX_PROC_CREATE(cd->affected, cd->naffected);
906         if (NULL == cd->affected) {
907             cd->naffected = 0;
908             /* notify the caller */
909             if (NULL != cd->cbfunc) {
910                 cd->cbfunc(PMIX_ERR_NOMEM, cd->cbdata);
911             }
912             PMIX_RELEASE(cd);
913             PMIX_RELEASE(chain);
914             return;
915         }
916         memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
917     }
918 
919     /* if they provided a PMIX_EVENT_CUSTOM_RANGE info object but
920      * specified a range other than PMIX_RANGE_CUSTOM, then this
921      * is an error */
922     if (PMIX_RANGE_CUSTOM != cd->range && NULL != cd->targets) {
923         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
924         /* notify the caller */
925         if (NULL != cd->cbfunc) {
926             cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
927         }
928         PMIX_RELEASE(cd);
929         PMIX_RELEASE(chain);
930         return;
931     }
932 
933     holdcd = false;
934     if (PMIX_RANGE_PROC_LOCAL != cd->range) {
935         PMIX_CONSTRUCT(&trk, pmix_list_t);
936         rngtrk.procs = NULL;
937         rngtrk.nprocs = 0;
938         /* cycle across our registered events and send the message to
939          * any client who registered for it */
940         PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
941             if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
942                 cd->status == reginfoptr->code) {
943                 PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
944                     /* if this client was the source of the event, then
945                      * don't send it back as they will have processed it
946                      * when they generated it */
947                     if (PMIX_CHECK_PROCID(&cd->source, &pr->peer->info->pname)) {
948                         continue;
949                     }
950                     /* if we have already notified this client, then don't do it again */
951                     matched = false;
952                     PMIX_LIST_FOREACH(nm, &trk, pmix_namelist_t) {
953                         if (nm->pname == &pr->peer->info->pname) {
954                             matched = true;
955                             break;
956                         }
957                     }
958                     if (matched) {
959                         continue;
960                     }
961                     /* check if the affected procs (if given) match those they
962                      * wanted to know about */
963                     if (!pmix_notify_check_affected(cd->affected, cd->naffected,
964                                                     pr->affected, pr->naffected)) {
965                         continue;
966                     }
967                     /* check the range */
968                     if (NULL == cd->targets) {
969                         rngtrk.procs = &cd->source;
970                         rngtrk.nprocs = 1;
971                     } else {
972                         rngtrk.procs = cd->targets;
973                         rngtrk.nprocs = cd->ntargets;
974                     }
975                     rngtrk.range = cd->range;
976                     PMIX_LOAD_PROCID(&proc, pr->peer->info->pname.nspace, pr->peer->info->pname.rank);
977                     if (!pmix_notify_check_range(&rngtrk, &proc)) {
978                         continue;
979                     }
980                     pmix_output_verbose(2, pmix_server_globals.event_output,
981                                         "pmix_server: notifying client %s:%u on status %s",
982                                         pr->peer->info->pname.nspace, pr->peer->info->pname.rank,
983                                         PMIx_Error_string(cd->status));
984 
985                     /* record that we notified this client */
986                     nm = PMIX_NEW(pmix_namelist_t);
987                     nm->pname = &pr->peer->info->pname;
988                     pmix_list_append(&trk, &nm->super);
989 
990                     bfr = PMIX_NEW(pmix_buffer_t);
991                     if (NULL == bfr) {
992                         continue;
993                     }
994                     /* pack the command */
995                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cmd, 1, PMIX_COMMAND);
996                     if (PMIX_SUCCESS != rc) {
997                         PMIX_ERROR_LOG(rc);
998                         PMIX_RELEASE(bfr);
999                         continue;
1000                     }
1001 
1002                     /* pack the status */
1003                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->status, 1, PMIX_STATUS);
1004                     if (PMIX_SUCCESS != rc) {
1005                         PMIX_ERROR_LOG(rc);
1006                         PMIX_RELEASE(bfr);
1007                         continue;
1008                     }
1009 
1010                     /* pack the source */
1011                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->source, 1, PMIX_PROC);
1012                     if (PMIX_SUCCESS != rc) {
1013                         PMIX_ERROR_LOG(rc);
1014                         PMIX_RELEASE(bfr);
1015                         continue;
1016                     }
1017                     /* pack any info */
1018                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->ninfo, 1, PMIX_SIZE);
1019                     if (PMIX_SUCCESS != rc) {
1020                         PMIX_ERROR_LOG(rc);
1021                         PMIX_RELEASE(bfr);
1022                         continue;
1023                     }
1024 
1025                     if (0 < cd->ninfo) {
1026                         PMIX_BFROPS_PACK(rc, pr->peer, bfr, cd->info, cd->ninfo, PMIX_INFO);
1027                         if (PMIX_SUCCESS != rc) {
1028                             PMIX_ERROR_LOG(rc);
1029                             PMIX_RELEASE(bfr);
1030                             continue;
1031                         }
1032                     }
1033                     PMIX_SERVER_QUEUE_REPLY(rc, pr->peer, 0, bfr);
1034                     if (PMIX_SUCCESS != rc) {
1035                         PMIX_RELEASE(bfr);
1036                     }
1037                     if (NULL != cd->targets && 0 < cd->nleft) {
1038                         /* track the number of targets we have left to notify */
1039                         --cd->nleft;
1040                         /* if the event was cached and this is the last one,
1041                          * then evict this event from the cache */
1042                         if (0 == cd->nleft) {
1043                             pmix_hotel_checkout(&pmix_globals.notifications, cd->room);
1044                             holdcd = false;
1045                             break;
1046                         }
1047                     }
1048                 }
1049             }
1050         }
1051         PMIX_LIST_DESTRUCT(&trk);
1052         if (PMIX_RANGE_LOCAL != cd->range && PMIX_CHECK_PROCID(&cd->source, &pmix_globals.myid)) {
1053             /* if we are the source, then we need to post this upwards as
1054              * well so the host RM can broadcast it as necessary */
1055             if (NULL != pmix_host_server.notify_event) {
1056                 /* mark that we sent it upstairs so we don't release
1057                  * the caddy until we return from the host RM */
1058                 holdcd = true;
1059                 pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
1060                                               cd->info, cd->ninfo, local_cbfunc, cd);
1061             }
1062         }
1063     }
1064 
1065     /* process it ourselves */
1066     pmix_invoke_local_event_hdlr(chain);
1067 
1068     if (!holdcd) {
1069         /* notify the caller */
1070         if (NULL != cd->cbfunc) {
1071             cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
1072         }
1073         PMIX_RELEASE(cd);
1074     }
1075 }
1076 
1077 
1078 /* as a server, we must do two things:
1079  *
1080  * (a) notify all clients that have registered for this event
1081  *
1082  * (b) callback any of our own functions that have registered
1083  *     for this event
1084  */
/* Server-side entry point for delivering an event.
 *
 * Returns PMIX_OPERATION_SUCCEEDED (without calling cbfunc) if the info
 * names this process as the PMIX_EVENT_PROXY, PMIX_ERR_NOMEM if the caddy
 * cannot be built, and PMIX_SUCCESS once the work has been thread-shifted
 * to _notify_client_event, which then owns the caddy and will invoke
 * cbfunc. The info array is deep-copied so the caller may free it on
 * return, and so the copy survives if the event is cached.
 */
pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
                                                 const pmix_proc_t *source,
                                                 pmix_data_range_t range,
                                                 const pmix_info_t info[], size_t ninfo,
                                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    pmix_notify_caddy_t *cd;
    size_t n;

    pmix_output_verbose(2, pmix_server_globals.event_output,
                        "pmix_server: notify client of event %s",
                        PMIx_Error_string(status));

    if (NULL != info) {
        for (n=0; n < ninfo; n++) {
            /* only treat the value as a proc if it actually is one */
            if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_PROXY) &&
                PMIX_PROC == info[n].value.type &&
                NULL != info[n].value.data.proc &&
                PMIX_CHECK_PROCID(info[n].value.data.proc, &pmix_globals.myid)) {
                return PMIX_OPERATION_SUCCEEDED;
            }
        }
    }

    cd = PMIX_NEW(pmix_notify_caddy_t);
    if (NULL == cd) {
        return PMIX_ERR_NOMEM;
    }
    cd->status = status;
    if (NULL == source) {
        pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
        cd->source.rank = PMIX_RANK_UNDEF;
    } else {
        pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
        cd->source.rank = source->rank;
    }
    cd->range = range;
    /* have to copy the info to preserve it for future when cached */
    if (0 < ninfo && NULL != info) {
        PMIX_INFO_CREATE(cd->info, ninfo);
        if (NULL == cd->info) {
            PMIX_RELEASE(cd);
            return PMIX_ERR_NOMEM;
        }
        cd->ninfo = ninfo;
        for (n=0; n < cd->ninfo; n++) {
            PMIX_INFO_XFER(&cd->info[n], &info[n]);
        }
    }

    /* track the eventual callback info */
    cd->cbfunc = cbfunc;
    cd->cbdata = cbdata;

    /* rank is unsigned and ninfo is a size_t: use matching conversions */
    pmix_output_verbose(2, pmix_server_globals.event_output,
                        "pmix_server_notify_event status =%d, source = %s:%u, ninfo =%lu",
                         status, cd->source.nspace, cd->source.rank, (unsigned long)ninfo);

    /* we have to push this into our event library to avoid
     * potential threading issues */
    PMIX_THREADSHIFT(cd, _notify_client_event);
    return PMIX_SUCCESS;
}
1140 
/* Decide whether proc falls within the range described by rng.
 *
 *  UNDEF / GLOBAL / SESSION / LOCAL -> always in range (session and local
 *                                      scoping is assumed to be handled by
 *                                      the RM for now)
 *  NAMESPACE  -> in range if proc's nspace matches any tracked proc's nspace
 *  PROC_LOCAL -> in range if PMIX_CHECK_PROCID matches any tracked proc
 *  CUSTOM     -> in range if some tracked proc has the same nspace and
 *                either the same rank or PMIX_RANK_WILDCARD
 *  anything else -> rejected
 */
bool pmix_notify_check_range(pmix_range_trkr_t *rng,
                             const pmix_proc_t *proc)
{
    size_t idx;

    switch (rng->range) {
        case PMIX_RANGE_UNDEF:
        case PMIX_RANGE_GLOBAL:
        case PMIX_RANGE_SESSION:
        case PMIX_RANGE_LOCAL:
            return true;

        case PMIX_RANGE_NAMESPACE:
            for (idx = 0; idx < rng->nprocs; idx++) {
                if (PMIX_CHECK_NSPACE(rng->procs[idx].nspace, proc->nspace)) {
                    return true;
                }
            }
            return false;

        case PMIX_RANGE_PROC_LOCAL:
            for (idx = 0; idx < rng->nprocs; idx++) {
                if (PMIX_CHECK_PROCID(&rng->procs[idx], proc)) {
                    return true;
                }
            }
            return false;

        case PMIX_RANGE_CUSTOM:
            for (idx = 0; idx < rng->nprocs; idx++) {
                const pmix_proc_t *tgt = &rng->procs[idx];
                bool same_nspace = (0 == strncmp(tgt->nspace, proc->nspace, PMIX_MAX_NSLEN));
                bool rank_ok = (PMIX_RANK_WILDCARD == tgt->rank || tgt->rank == proc->rank);
                if (same_nspace && rank_ok) {
                    return true;
                }
            }
            return false;

        default:
            return false;
    }
}
1186 
/* Return true if the two proc lists overlap (per PMIX_CHECK_PROCID).
 * A NULL list on either side means "no restriction" and always matches;
 * non-NULL lists that share no proc (including zero-length ones) do not. */
bool pmix_notify_check_affected(pmix_proc_t *interested, size_t ninterested,
                                pmix_proc_t *affected, size_t naffected)
{
    size_t a, i;

    if (NULL == interested || NULL == affected) {
        return true;
    }

    for (a = 0; a < naffected; a++) {
        for (i = 0; i < ninterested; i++) {
            if (PMIX_CHECK_PROCID(&affected[a], &interested[i])) {
                return true;
            }
        }
    }
    return false;
}
1212 
pmix_event_timeout_cb(int fd,short flags,void * arg)1213 void pmix_event_timeout_cb(int fd, short flags, void *arg)
1214 {
1215     pmix_event_chain_t *ch = (pmix_event_chain_t*)arg;
1216 
1217     /* need to acquire the object from its originating thread */
1218     PMIX_ACQUIRE_OBJECT(ch);
1219 
1220     ch->timer_active = false;
1221 
1222     /* remove it from the list */
1223     pmix_list_remove_item(&pmix_globals.cached_events, &ch->super);
1224 
1225     /* process this event thru the regular channels */
1226     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
1227         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
1228         pmix_server_notify_client_of_event(ch->status, &ch->source,
1229                                            ch->range, ch->info, ch->ninfo,
1230                                            ch->final_cbfunc, ch->final_cbdata);
1231     } else {
1232         pmix_invoke_local_event_hdlr(ch);
1233     }
1234 }
1235 
pmix_prep_event_chain(pmix_event_chain_t * chain,const pmix_info_t * info,size_t ninfo,bool xfer)1236 pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain,
1237                                     const pmix_info_t *info, size_t ninfo,
1238                                     bool xfer)
1239 {
1240     size_t n;
1241 
1242     if (NULL != info && 0 < ninfo) {
1243         chain->ninfo = ninfo;
1244         if (NULL == chain->info) {
1245             PMIX_INFO_CREATE(chain->info, chain->ninfo);
1246         }
1247        /* need to copy the info */
1248         for (n=0; n < ninfo; n++) {
1249             if (xfer) {
1250                 /* chain doesn't already have a copy of the info */
1251                 PMIX_INFO_XFER(&chain->info[n], &info[n]);
1252             }
1253             /* look for specific directives */
1254             if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
1255                 chain->nondefault = PMIX_INFO_TRUE(&info[n]);
1256             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_CUSTOM_RANGE)) {
1257                 /* provides an array of pmix_proc_t identifying the procs
1258                  * that are to receive this notification, or a single pmix_proc_t  */
1259                 if (PMIX_DATA_ARRAY == info[n].value.type &&
1260                     NULL != info[n].value.data.darray &&
1261                     NULL != info[n].value.data.darray->array) {
1262                     chain->ntargets = info[n].value.data.darray->size;
1263                     PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1264                     memcpy(chain->targets, info[n].value.data.darray->array, chain->ntargets * sizeof(pmix_proc_t));
1265                 } else if (PMIX_PROC == info[n].value.type) {
1266                     chain->ntargets = 1;
1267                     PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1268                     memcpy(chain->targets, info[n].value.data.proc, sizeof(pmix_proc_t));
1269                 } else {
1270                     /* this is an error */
1271                     PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1272                     return PMIX_ERR_BAD_PARAM;
1273                 }
1274             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
1275                 PMIX_PROC_CREATE(chain->affected, 1);
1276                 if (NULL == chain->affected) {
1277                     return PMIX_ERR_NOMEM;
1278                 }
1279                 chain->naffected = 1;
1280                 memcpy(chain->affected, info[n].value.data.proc, sizeof(pmix_proc_t));
1281             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
1282                 chain->naffected = info[n].value.data.darray->size;
1283                 PMIX_PROC_CREATE(chain->affected, chain->naffected);
1284                 if (NULL == chain->affected) {
1285                     chain->naffected = 0;
1286                     return PMIX_ERR_NOMEM;
1287                 }
1288                 memcpy(chain->affected, info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
1289             }
1290         }
1291     }
1292     return PMIX_SUCCESS;
1293 }
1294 
1295 /****    CLASS INSTANTIATIONS    ****/
1296 
/* pmix_event_hdlr_t: a registered event handler. The constructor leaves
 * every pointer empty, the index unset (UINT_MAX) and the range UNDEF,
 * which pmix_notify_check_range treats as matching every proc. */
static void sevcon(pmix_event_hdlr_t *p)
{
    /* identity and ordering */
    p->name = NULL;
    p->index = UINT_MAX;
    p->locator = NULL;
    p->precedence = PMIX_EVENT_ORDER_NONE;

    /* range and affected-proc filters */
    p->rng.range = PMIX_RANGE_UNDEF;
    p->rng.procs = NULL;
    p->rng.nprocs = 0;
    p->affected = NULL;
    p->naffected = 0;

    /* callback and status codes */
    p->evhdlr = NULL;
    p->cbobject = NULL;
    p->codes = NULL;
    p->ncodes = 0;
}

/* Release the buffers owned by the handler. cbobject is not freed here. */
static void sevdes(pmix_event_hdlr_t *p)
{
    /* free(NULL) is a no-op, so the plain heap buffers need no guard */
    free(p->name);
    free(p->locator);
    free(p->rng.procs);
    free(p->codes);
    if (NULL != p->affected) {
        PMIX_PROC_FREE(p->affected, p->naffected);
    }
}
PMIX_CLASS_INSTANCE(pmix_event_hdlr_t,
                    pmix_list_item_t,
                    sevcon, sevdes);
1334 
/* pmix_active_code_t: list item whose registration count starts at zero.
 * No destructor is needed - the object owns no heap memory. */
static void accon(pmix_active_code_t *code)
{
    code->nregs = 0;
}
PMIX_CLASS_INSTANCE(pmix_active_code_t,
                    pmix_list_item_t,
                    accon,
                    NULL);
1342 
/* pmix_events_t: the set of registered event handlers. The dedicated
 * "first" and "last" handlers are held outside the per-type lists. */
static void evcon(pmix_events_t *ev)
{
    PMIX_CONSTRUCT(&ev->actives, pmix_list_t);
    PMIX_CONSTRUCT(&ev->single_events, pmix_list_t);
    PMIX_CONSTRUCT(&ev->multi_events, pmix_list_t);
    PMIX_CONSTRUCT(&ev->default_events, pmix_list_t);
    ev->first = NULL;
    ev->last = NULL;
    ev->nhdlrs = 0;
}

/* Drop the first/last handler references, then tear down all lists. */
static void evdes(pmix_events_t *ev)
{
    pmix_event_hdlr_t *hdlr;

    hdlr = ev->first;
    if (NULL != hdlr) {
        PMIX_RELEASE(ev->first);
    }
    hdlr = ev->last;
    if (NULL != hdlr) {
        PMIX_RELEASE(ev->last);
    }
    PMIX_LIST_DESTRUCT(&ev->actives);
    PMIX_LIST_DESTRUCT(&ev->single_events);
    PMIX_LIST_DESTRUCT(&ev->multi_events);
    PMIX_LIST_DESTRUCT(&ev->default_events);
}
PMIX_CLASS_INSTANCE(pmix_events_t,
                    pmix_object_t,
                    evcon, evdes);
1369 
/* pmix_event_chain_t: state carried while an event is passed through the
 * chain of local handlers. Starts with no timer, an empty source, an
 * UNDEF range, and no targets/affected/info/results. */
static void chcon(pmix_event_chain_t *ch)
{
    /* event identity */
    memset(ch->source.nspace, 0, sizeof(ch->source.nspace));
    ch->source.rank = PMIX_RANK_UNDEF;
    ch->range = PMIX_RANGE_UNDEF;

    /* processing flags */
    ch->timer_active = false;
    ch->nondefault = false;
    ch->endchain = false;

    /* proc lists */
    ch->targets = NULL;
    ch->ntargets = 0;
    ch->affected = NULL;
    ch->naffected = 0;

    /* info passed to handlers and results returned by them */
    ch->info = NULL;
    ch->ninfo = 0;
    ch->nallocated = 0;
    ch->results = NULL;
    ch->nresults = 0;

    /* current handler and final completion callback */
    ch->evhdlr = NULL;
    ch->final_cbfunc = NULL;
    ch->final_cbdata = NULL;
}

/* Cancel any pending timer, then free everything the chain owns. The info
 * array is released using its allocated size (nallocated), not ninfo. */
static void chdes(pmix_event_chain_t *ch)
{
    if (ch->timer_active) {
        pmix_event_del(&ch->ev);
    }
    if (NULL != ch->targets) {
        PMIX_PROC_FREE(ch->targets, ch->ntargets);
    }
    if (NULL != ch->affected) {
        PMIX_PROC_FREE(ch->affected, ch->naffected);
    }
    if (NULL != ch->info) {
        PMIX_INFO_FREE(ch->info, ch->nallocated);
    }
    if (NULL != ch->results) {
        PMIX_INFO_FREE(ch->results, ch->nresults);
    }
}
PMIX_CLASS_INSTANCE(pmix_event_chain_t,
                    pmix_list_item_t,
                    chcon, chdes);
1412