1 /*****************************************************************************\
2 ** pmix_coll_ring.c - PMIx collective primitives
3 *****************************************************************************
4 * Copyright (C) 2018 Mellanox Technologies. All rights reserved.
5 * Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>,
6 * Boris Karasev <karasev.b@gmail.com, boriska@mellanox.com>.
7 *
8 * This file is part of Slurm, a resource management program.
9 * For details, see <https://slurm.schedmd.com/>.
10 * Please also read the included file: DISCLAIMER.
11 *
12 * Slurm is free software; you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 2 of the License, or (at your option)
15 * any later version.
16 *
17 * In addition, as a special exception, the copyright holders give permission
18 * to link the code of portions of this program with the OpenSSL library under
19 * certain conditions as described in each individual source file, and
20 * distribute linked combinations including the two. You must obey the GNU
21 * General Public License in all respects for all of the code used other than
22 * OpenSSL. If you modify file(s) with this exception, you may extend this
23 * exception to your version of the file(s), but you are not obligated to do
24 * so. If you do not wish to do so, delete this exception statement from your
25 * version. If you delete this exception statement from all source files in
26 * the program, then also delete it here.
27 *
28 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
31 * details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with Slurm; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
36 \*****************************************************************************/
37
38 #include "pmixp_common.h"
39 #include "src/common/slurm_protocol_api.h"
40 #include "pmixp_coll.h"
41 #include "pmixp_nspaces.h"
42 #include "pmixp_server.h"
43 #include "pmixp_client.h"
44
/*
 * Callback payload carried through non-blocking sends and libpmix
 * completion callbacks so the handler can locate the collective again.
 */
typedef struct {
	pmixp_coll_t *coll;		/* parent collective */
	pmixp_coll_ring_ctx_t *coll_ctx;	/* ring context this op belongs to */
	Buf buf;			/* buffer being sent or delivered */
	uint32_t seq;			/* context seq captured at submit time;
					 * compared in _ring_sent_cb() to detect
					 * a collective reset */
} pmixp_coll_ring_cbdata_t;
51
52 static void _progress_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx);
53
/* Peer id of our predecessor on the ring (wraps at 0). */
static inline int _ring_prev_id(pmixp_coll_t *coll)
{
	int n = coll->peers_cnt;

	return (coll->my_peerid + n - 1) % n;
}
58
/* Peer id of our successor on the ring (wraps at peers_cnt). */
static inline int _ring_next_id(pmixp_coll_t *coll)
{
	int n = coll->peers_cnt;

	return (coll->my_peerid + 1) % n;
}
63
_ctx_get_coll(pmixp_coll_ring_ctx_t * coll_ctx)64 static inline pmixp_coll_t *_ctx_get_coll(pmixp_coll_ring_ctx_t *coll_ctx)
65 {
66 return coll_ctx->coll;
67 }
68
_ctx_get_coll_ring(pmixp_coll_ring_ctx_t * coll_ctx)69 static inline pmixp_coll_ring_t *_ctx_get_coll_ring(
70 pmixp_coll_ring_ctx_t *coll_ctx)
71 {
72 return &coll_ctx->coll->state.ring;
73 }
74
_ring_remain_contrib(pmixp_coll_ring_ctx_t * coll_ctx)75 static inline uint32_t _ring_remain_contrib(pmixp_coll_ring_ctx_t *coll_ctx)
76 {
77 return coll_ctx->coll->peers_cnt -
78 (coll_ctx->contrib_prev + coll_ctx->contrib_local);
79 }
80
_ring_fwd_done(pmixp_coll_ring_ctx_t * coll_ctx)81 static inline uint32_t _ring_fwd_done(pmixp_coll_ring_ctx_t *coll_ctx)
82 {
83 return !(coll_ctx->coll->peers_cnt - coll_ctx->forward_cnt - 1);
84 }
85
/*
 * Completion callback for a non-blocking ring forward
 * (passed to pmixp_server_send_nb() in _ring_forward_data()).
 *
 * Counts the forward against the originating context and re-drives the
 * state machine, then recycles the send buffer into the forward pool.
 * In PMIXP_P2P_REGULAR context the collective lock is not yet held and
 * must be taken here; otherwise the caller already holds it.
 */
static void _ring_sent_cb(int rc, pmixp_p2p_ctx_t ctx, void *_cbdata)
{
	pmixp_coll_ring_cbdata_t *cbdata = (pmixp_coll_ring_cbdata_t*)_cbdata;
	pmixp_coll_ring_ctx_t *coll_ctx = cbdata->coll_ctx;
	pmixp_coll_t *coll = cbdata->coll;
	Buf buf = cbdata->buf;

	pmixp_coll_sanity_check(coll);

	if (PMIXP_P2P_REGULAR == ctx) {
		/* lock the collective */
		slurm_mutex_lock(&coll->lock);
	}
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: called %d", coll_ctx, coll_ctx->seq);
#endif
	/* the seq snapshot taken at send time guards against acting on a
	 * context that was reset/reused while the send was in flight */
	if (cbdata->seq != coll_ctx->seq) {
		/* it seems like this collective was reset since the time
		 * we initiated this send.
		 * Just exit to avoid data corruption.
		 */
		PMIXP_DEBUG("%p: collective was reset!", coll_ctx);
		goto exit;
	}
	coll_ctx->forward_cnt++;
	_progress_coll_ring(coll_ctx);

exit:
	/* buffer is recycled even when the context was reset */
	pmixp_server_buf_reset(buf);
	list_push(coll->state.ring.fwrd_buf_pool, buf);

	if (PMIXP_P2P_REGULAR == ctx) {
		/* unlock the collective */
		slurm_mutex_unlock(&coll->lock);
	}
	xfree(cbdata);
}
123
/* Debug-build invariants: context exists, is active, and its parent
 * collective passes the generic sanity check. */
static inline void pmixp_coll_ring_ctx_sanity_check(
	pmixp_coll_ring_ctx_t *ctx)
{
	xassert(NULL != ctx);
	xassert(ctx->in_use);
	pmixp_coll_sanity_check(ctx->coll);
}
131
132 /*
133 * use it for internal collective
134 * performance evaluation tool.
135 */
pmixp_coll_ring_from_cbdata(void * cbdata)136 pmixp_coll_t *pmixp_coll_ring_from_cbdata(void *cbdata)
137 {
138 pmixp_coll_ring_cbdata_t *ptr = (pmixp_coll_ring_cbdata_t*)cbdata;
139 pmixp_coll_sanity_check(ptr->coll);
140 return ptr->coll;
141 }
142
/*
 * Unpack a ring-collective message: collective type, process set
 * (namespace + rank per process) and the trailing ring header.
 *
 * On success *r/*nr own a freshly xmalloc'ed process array (caller
 * frees). On failure the array is released here and *r is NULLed so
 * the caller cannot double-free or use partial data.
 *
 * Returns SLURM_SUCCESS or an unpack error code.
 */
int pmixp_coll_ring_unpack(Buf buf, pmixp_coll_type_t *type,
			   pmixp_coll_ring_msg_hdr_t *ring_hdr,
			   pmixp_proc_t **r, size_t *nr)
{
	pmixp_proc_t *procs = NULL;
	uint32_t nprocs = 0;
	uint32_t tmp;
	int rc, i;
	char *temp_ptr;

	/* 1. extract the type of collective */
	if (SLURM_SUCCESS != (rc = unpack32(&tmp, buf))) {
		PMIXP_ERROR("Cannot unpack collective type");
		return rc;
	}
	*type = tmp;

	/* 2. get the number of ranges
	 * (message was fixed: previously a copy-paste of the step-1 text) */
	if (SLURM_SUCCESS != (rc = unpack32(&nprocs, buf))) {
		PMIXP_ERROR("Cannot unpack number of processes");
		return rc;
	}
	*nr = nprocs;

	procs = xmalloc(sizeof(pmixp_proc_t) * nprocs);
	*r = procs;

	/* 3. get namespace/rank of particular process */
	for (i = 0; i < (int)nprocs; i++) {
		rc = unpackmem_ptr(&temp_ptr, &tmp, buf);
		if ((SLURM_SUCCESS != rc) ||
		    (strlcpy(procs[i].nspace, temp_ptr,
			     PMIXP_MAX_NSLEN + 1) > PMIXP_MAX_NSLEN)) {
			PMIXP_ERROR("Cannot unpack namespace for process #%d",
				    i);
			if (SLURM_SUCCESS == rc) {
				/* oversized namespace: make sure we do not
				 * report success */
				rc = SLURM_ERROR;
			}
			goto unwind;
		}

		rc = unpack32(&tmp, buf);
		if (SLURM_SUCCESS != rc) {
			PMIXP_ERROR("Cannot unpack ranks for process #%d, nsp=%s",
				    i, procs[i].nspace);
			goto unwind;
		}
		procs[i].rank = tmp;
	}

	/* 4. extract the ring info */
	rc = unpackmem_ptr(&temp_ptr, &tmp, buf);
	if ((SLURM_SUCCESS != rc) ||
	    (tmp != sizeof(pmixp_coll_ring_msg_hdr_t))) {
		PMIXP_ERROR("Cannot unpack ring info");
		if (SLURM_SUCCESS == rc) {
			/* size mismatch: previously returned SLURM_SUCCESS */
			rc = SLURM_ERROR;
		}
		goto unwind;
	}

	memcpy(ring_hdr, temp_ptr, sizeof(pmixp_coll_ring_msg_hdr_t));

	return SLURM_SUCCESS;

unwind:
	/* don't leak the partially filled array (was leaked before) */
	xfree(procs);
	*r = NULL;
	*nr = 0;
	return rc;
}
200
/*
 * Pack the ring-collective envelope into buf: collective type, the
 * process set (namespace + rank per process), then the ring header.
 * Must stay symmetric with pmixp_coll_ring_unpack().
 *
 * Returns SLURM_SUCCESS.
 */
static int _pack_coll_ring_info(pmixp_coll_t *coll,
				pmixp_coll_ring_msg_hdr_t *ring_hdr,
				Buf buf)
{
	pmixp_proc_t *procs = coll->pset.procs;
	size_t nprocs = coll->pset.nprocs;
	uint32_t type = PMIXP_COLL_TYPE_FENCE_RING;
	int i;

	/* 1. store the type of collective */
	pack32(type, buf);

	/* 2. Put the number of ranges */
	pack32(nprocs, buf);
	for (i = 0; i < (int)nprocs; i++) {
		/* Pack namespace.
		 * BUGFIX: pack the i-th process; the previous code packed
		 * procs->nspace / procs->rank, i.e. element 0 every time. */
		packmem(procs[i].nspace, strlen(procs[i].nspace) + 1, buf);
		pack32(procs[i].rank, buf);
	}

	/* 3. pack the ring header info */
	packmem((char*)ring_hdr, sizeof(pmixp_coll_ring_msg_hdr_t), buf);

	return SLURM_SUCCESS;
}
226
_get_fwd_buf(pmixp_coll_ring_ctx_t * coll_ctx)227 static Buf _get_fwd_buf(pmixp_coll_ring_ctx_t *coll_ctx)
228 {
229 pmixp_coll_ring_t *ring = _ctx_get_coll_ring(coll_ctx);
230 Buf buf = list_pop(ring->fwrd_buf_pool);
231 if (!buf) {
232 buf = pmixp_server_buf_new();
233 }
234 return buf;
235 }
236
_get_contrib_buf(pmixp_coll_ring_ctx_t * coll_ctx)237 static Buf _get_contrib_buf(pmixp_coll_ring_ctx_t *coll_ctx)
238 {
239 pmixp_coll_ring_t *ring = _ctx_get_coll_ring(coll_ctx);
240 Buf ring_buf = list_pop(ring->ring_buf_pool);
241 if (!ring_buf) {
242 ring_buf = create_buf(NULL, 0);
243 }
244 return ring_buf;
245 }
246
/*
 * Pack a contribution (ring header + payload) and send it non-blocking
 * to the next peer on the ring.
 *
 * coll_ctx   - active ring context (must be in_use)
 * contrib_id - peer id of the original contributor of this data
 * hop_seq    - number of hops this contribution has already made
 * data/size  - payload to forward
 *
 * The forward buffer and cbdata are released by _ring_sent_cb() when
 * the send completes. Returns SLURM_SUCCESS or SLURM_ERROR.
 */
static int _ring_forward_data(pmixp_coll_ring_ctx_t *coll_ctx,
			      uint32_t contrib_id,
			      uint32_t hop_seq, void *data, size_t size)
{
	pmixp_coll_ring_msg_hdr_t hdr;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);
	pmixp_coll_ring_t *ring = &coll->state.ring;
	pmixp_ep_t *ep = NULL;
	pmixp_coll_ring_cbdata_t *cbdata = NULL;
	uint32_t offset = 0;
	Buf buf = _get_fwd_buf(coll_ctx);
	int rc = SLURM_SUCCESS;

	hdr.nodeid = coll->my_peerid;
	hdr.msgsize = size;
	hdr.seq = coll_ctx->seq;
	hdr.hop_seq = hop_seq;
	hdr.contrib_id = contrib_id;

	pmixp_coll_ring_ctx_sanity_check(coll_ctx);

#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: transit data to nodeid=%d, seq=%d, hop=%d, size=%lu, contrib=%d",
		    coll_ctx, _ring_next_id(coll), hdr.seq,
		    hdr.hop_seq, hdr.msgsize, hdr.contrib_id);
#endif
	if (!buf) {
		rc = SLURM_ERROR;
		goto exit;
	}
	/* allocate the endpoint only after the buffer check succeeds;
	 * it was previously xmalloc'ed up-front and leaked on this
	 * error path */
	ep = xmalloc(sizeof(*ep));
	ep->type = PMIXP_EP_NOIDEID;
	ep->ep.nodeid = ring->next_peerid;

	/* pack ring info */
	_pack_coll_ring_info(coll, &hdr, buf);

	/* insert payload to buf */
	offset = get_buf_offset(buf);
	pmixp_server_buf_reserve(buf, size);
	memcpy(get_buf_data(buf) + offset, data, size);
	set_buf_offset(buf, offset + size);

	cbdata = xmalloc(sizeof(pmixp_coll_ring_cbdata_t));
	cbdata->buf = buf;
	cbdata->coll = coll;
	cbdata->coll_ctx = coll_ctx;
	cbdata->seq = coll_ctx->seq;
	rc = pmixp_server_send_nb(ep, PMIXP_MSG_RING, coll_ctx->seq, buf,
				  _ring_sent_cb, cbdata);
exit:
	return rc;
}
298
/*
 * Return a finished (or aborted) ring context to its idle state so it
 * can be reused by a subsequent collective.
 * Caller must hold the collective lock.
 */
static void _reset_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);

#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: called", coll_ctx);
#endif
	/* must run while the context is still marked in_use */
	pmixp_coll_ring_ctx_sanity_check(coll_ctx);

	coll_ctx->in_use = false;
	coll_ctx->state = PMIXP_COLL_RING_SYNC;
	coll_ctx->contrib_local = false;
	coll_ctx->contrib_prev = 0;
	coll_ctx->forward_cnt = 0;
	memset(coll_ctx->contrib_map, 0, sizeof(bool) * coll->peers_cnt);
	coll_ctx->ring_buf = NULL;
	/* refresh the timeout reference point */
	coll->ts = time(NULL);
}
315
/*
 * Callback invoked by libpmix once it has consumed the collective's
 * result data: recycle the accumulation buffer and free the payload.
 */
static void _libpmix_cb(void *_vcbdata)
{
	pmixp_coll_ring_cbdata_t *cbdata = (pmixp_coll_ring_cbdata_t*)_vcbdata;
	pmixp_coll_t *coll = cbdata->coll;
	Buf buf = cbdata->buf;

	pmixp_coll_sanity_check(coll);

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	/* reset buf
	 * NOTE(review): resets only the processed offset, unlike
	 * pmixp_server_buf_reset() used for forward buffers — presumably
	 * because this buffer carries no server header; verify */
	buf->processed = 0;
	/* push it back to pool for reuse */
	list_push(coll->state.ring.ring_buf_pool, buf);

	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);

	xfree(cbdata);
}
337
/*
 * Deliver the fully collected ring data to libpmix through the
 * collective's registered modex callback. No-op when no callback was
 * registered. Ownership of ring_buf passes to _libpmix_cb() via cbdata.
 */
static void _invoke_callback(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_ring_cbdata_t *cbdata;
	char *data;
	size_t data_sz;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);

	if (!coll->cbfunc)
		return;

	/* the accumulated contributions live in [0, offset) of ring_buf */
	data = get_buf_data(coll_ctx->ring_buf);
	data_sz = get_buf_offset(coll_ctx->ring_buf);
	cbdata = xmalloc(sizeof(pmixp_coll_ring_cbdata_t));

	cbdata->coll = coll;
	cbdata->coll_ctx = coll_ctx;
	cbdata->buf = coll_ctx->ring_buf;
	cbdata->seq = coll_ctx->seq;
	pmixp_lib_modex_invoke(coll->cbfunc, SLURM_SUCCESS,
			       data, data_sz,
			       coll->cbdata, _libpmix_cb, (void *)cbdata);
	/*
	 * Clear callback info as we are not allowed to use it second time
	 */
	coll->cbfunc = NULL;
	coll->cbdata = NULL;
}
365
/*
 * Drive the per-context state machine:
 *   SYNC     -> PROGRESS  once any contribution (local or prev-hop) arrives
 *   PROGRESS -> FINALIZE  once all peers_cnt contributions are collected;
 *                         the result is handed to libpmix here
 *   FINALIZE -> SYNC      once all forwards completed; the context is reset
 *                         and coll->seq advances
 * Loops until no transition fires. Caller must hold the collective lock.
 */
static void _progress_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx)
{
	int ret = 0;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);

	pmixp_coll_ring_ctx_sanity_check(coll_ctx);

	do {
		ret = false;
		switch(coll_ctx->state) {
		case PMIXP_COLL_RING_SYNC:
			if (coll_ctx->contrib_local || coll_ctx->contrib_prev) {
				coll_ctx->state = PMIXP_COLL_RING_PROGRESS;
				ret = true;
			}
			break;
		case PMIXP_COLL_RING_PROGRESS:
			/* check for all data is collected and forwarded */
			if (!_ring_remain_contrib(coll_ctx) ) {
				coll_ctx->state = PMIXP_COLL_RING_FINALIZE;
				_invoke_callback(coll_ctx);
				ret = true;
			}
			break;
		case PMIXP_COLL_RING_FINALIZE:
			if(_ring_fwd_done(coll_ctx)) {
#ifdef PMIXP_COLL_DEBUG
				PMIXP_DEBUG("%p: %s seq=%d is DONE", coll,
					    pmixp_coll_type2str(coll->type),
					    coll_ctx->seq);
#endif
				/* increase coll sequence */
				coll->seq++;
				_reset_coll_ring(coll_ctx);
				ret = true;

			}
			break;
		default:
			PMIXP_ERROR("%p: unknown state = %d",
				    coll_ctx, (int)coll_ctx->state);
		}
	} while(ret);
}
410
/*
 * Pick a ring context to host a new LOCAL contribution.
 *
 * Preference order:
 *  1) an in-use context in SYNC/PROGRESS that has no local contribution
 *     yet (a remote peer started this collective before we did);
 *  2) otherwise a free context, initialized with the next expected
 *     sequence number (bumped past any context still FINALIZE-ing).
 *
 * Returns NULL when all PMIXP_COLL_RING_CTX_NUM contexts are busy.
 * Caller must hold the collective lock.
 */
pmixp_coll_ring_ctx_t *pmixp_coll_ring_ctx_new(pmixp_coll_t *coll)
{
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL, *ret_ctx = NULL,
		*free_ctx = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	uint32_t seq = coll->seq;

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		/*
		 * check that no active context exists to exclude the double
		 * context
		 */
		if (coll_ctx->in_use) {
			switch(coll_ctx->state) {
			case PMIXP_COLL_RING_FINALIZE:
				/* an older collective is still draining;
				 * our new one must use the next seq */
				seq++;
				break;
			case PMIXP_COLL_RING_SYNC:
			case PMIXP_COLL_RING_PROGRESS:
				if (!ret_ctx && !coll_ctx->contrib_local) {
					ret_ctx = coll_ctx;
				}
				break;
			}
		} else {
			free_ctx = coll_ctx;
			xassert(!free_ctx->in_use);
		}
	}
	/* add this context to use */
	if (!ret_ctx && free_ctx) {
		ret_ctx = free_ctx;
		ret_ctx->in_use = true;
		ret_ctx->seq = seq;
		ret_ctx->ring_buf = _get_contrib_buf(ret_ctx);
	}
	return ret_ctx;
}
451
/*
 * Find the ring context handling sequence number `seq` (for an incoming
 * REMOTE contribution). If none matches, activate a free context for
 * that seq. Returns NULL when no matching or free context is available.
 * Caller must hold the collective lock.
 */
pmixp_coll_ring_ctx_t *pmixp_coll_ring_ctx_select(pmixp_coll_t *coll,
						  const uint32_t seq)
{
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL, *ret = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;

	/* finding the appropriate ring context */
	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		if (coll_ctx->in_use && coll_ctx->seq == seq) {
			return coll_ctx;
		} else if (!coll_ctx->in_use) {
			/* remember the last free context as a fallback */
			ret = coll_ctx;
			continue;
		}
	}
	/* add this context to use */
	if (ret && !ret->in_use) {
		ret->in_use = true;
		ret->seq = seq;
		ret->ring_buf = _get_contrib_buf(ret);
	}
	return ret;
}
477
/*
 * Initialize the ring part of a collective: resolve the next peer on
 * the ring from the hostlist, create the buffer pools and zero all
 * PMIXP_COLL_RING_CTX_NUM contexts.
 *
 * Returns SLURM_SUCCESS (no failure paths are checked here).
 */
int pmixp_coll_ring_init(pmixp_coll_t *coll, hostlist_t *hl)
{
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("called");
#endif
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	char *p;
	/* NOTE(review): hostlist_find() returns -1 when our hostname is
	 * not in hl; (-1 + 1) % peers_cnt would silently pick peer 0 —
	 * confirm callers guarantee membership */
	int rel_id = hostlist_find(*hl, pmixp_info_hostname());

	/* compute the next absolute id of the neighbor */
	p = hostlist_nth(*hl, (rel_id + 1) % coll->peers_cnt);
	ring->next_peerid = pmixp_info_job_hostid(p);
	free(p);

	ring->fwrd_buf_pool = list_create(pmixp_free_buf);
	ring->ring_buf_pool = list_create(pmixp_free_buf);

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		coll_ctx->coll = coll;
		coll_ctx->in_use = false;
		coll_ctx->seq = coll->seq;
		coll_ctx->contrib_local = false;
		coll_ctx->contrib_prev = 0;
		coll_ctx->state = PMIXP_COLL_RING_SYNC;
		// TODO bit vector
		coll_ctx->contrib_map = xmalloc(sizeof(bool) * coll->peers_cnt);
	}

	return SLURM_SUCCESS;
}
511
/* Release everything owned by the ring state: per-context buffers and
 * contribution maps, then both buffer pools. */
void pmixp_coll_ring_free(pmixp_coll_ring_t *ring)
{
	int i;

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		pmixp_coll_ring_ctx_t *coll_ctx = &ring->ctx_array[i];

		FREE_NULL_BUFFER(coll_ctx->ring_buf);
		xfree(coll_ctx->contrib_map);
	}
	list_destroy(ring->fwrd_buf_pool);
	list_destroy(ring->ring_buf_pool);
}
525
/*
 * Append one contribution to the context's accumulation buffer and,
 * unless the data has completed the full ring (contributor is our next
 * peer), forward it to the next hop.
 *
 * contrib_id - peer id of the original contributor
 * hop        - hop sequence number to stamp on the forward
 *
 * Returns SLURM_SUCCESS or SLURM_ERROR on forward failure.
 * Caller must hold the collective lock.
 */
inline static int _pmixp_coll_contrib(pmixp_coll_ring_ctx_t *coll_ctx,
				      int contrib_id,
				      uint32_t hop, char *data, size_t size)
{
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);
	char *data_ptr = NULL;
	int ret;

	/* change the state */
	coll->ts = time(NULL);

	/* save contribution
	 * heuristic pre-sizing: first contribution reserves room for all
	 * peers (assuming similar sizes); later undersized buffers grow by
	 * size * remaining contributions */
	if (!size_buf(coll_ctx->ring_buf)) {
		grow_buf(coll_ctx->ring_buf, size * coll->peers_cnt);
	} else if(remaining_buf(coll_ctx->ring_buf) < size) {
		uint32_t new_size = size_buf(coll_ctx->ring_buf) + size *
			_ring_remain_contrib(coll_ctx);
		grow_buf(coll_ctx->ring_buf, new_size);
	}
	/* NOTE(review): this extra grow looks redundant after the sizing
	 * above — presumably a belt-and-braces guarantee that `size` bytes
	 * fit; verify before removing */
	grow_buf(coll_ctx->ring_buf, size);
	data_ptr = get_buf_data(coll_ctx->ring_buf) +
		get_buf_offset(coll_ctx->ring_buf);
	memcpy(data_ptr, data, size);
	set_buf_offset(coll_ctx->ring_buf,
		       get_buf_offset(coll_ctx->ring_buf) + size);

	/* check for ring is complete */
	if (contrib_id != _ring_next_id(coll)) {
		/* forward data to the next node */
		ret = _ring_forward_data(coll_ctx, contrib_id, hop,
					 data_ptr, size);
		if (ret) {
			PMIXP_ERROR("Cannot forward ring data");
			return SLURM_ERROR;
		}
	}

	return SLURM_SUCCESS;
}
565
/*
 * Submit the LOCAL contribution to a ring collective.
 *
 * data/size      - this node's contribution payload
 * cbfunc/cbdata  - libpmix modex callback to deliver the final result
 *
 * Returns SLURM_SUCCESS, or SLURM_ERROR when no context is available or
 * the contribution cannot be stored/forwarded.
 */
int pmixp_coll_ring_local(pmixp_coll_t *coll, char *data, size_t size,
			  void *cbfunc, void *cbdata)
{
	int ret = SLURM_SUCCESS;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	/* sanity check */
	pmixp_coll_sanity_check(coll);

	/* setup callback info */
	coll->cbfunc = cbfunc;
	coll->cbdata = cbdata;

	coll_ctx = pmixp_coll_ring_ctx_new(coll);
	if (!coll_ctx) {
		PMIXP_ERROR("Can not get new ring collective context, seq=%u",
			    coll->seq);
		ret = SLURM_ERROR;
		goto exit;
	}

#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: contrib/loc: seqnum=%u, state=%d, size=%lu",
		    coll_ctx, coll_ctx->seq, coll_ctx->state, size);
#endif

	if (_pmixp_coll_contrib(coll_ctx, coll->my_peerid, 0, data, size)) {
		/* BUGFIX: propagate the failure; this path previously
		 * jumped to exit with ret still SLURM_SUCCESS */
		ret = SLURM_ERROR;
		goto exit;
	}

	/* mark local contribution */
	coll_ctx->contrib_local = true;
	_progress_coll_ring(coll_ctx);

exit:
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);

	return ret;
}
609
/*
 * Validate an incoming ring message header: sender must be our ring
 * predecessor and the sequence number must be acceptable.
 *
 * Returns SLURM_SUCCESS when the message should be processed (or the
 * step is being killed), SLURM_ERROR when it must be dropped/skipped.
 */
int pmixp_coll_ring_check(pmixp_coll_t *coll, pmixp_coll_ring_msg_hdr_t *hdr)
{
	char *nodename = NULL;
	int rc;

	if (hdr->nodeid != _ring_prev_id(coll)) {
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("%p: unexpected contrib from %s:%u, expected is %d",
			    coll, nodename, hdr->nodeid, _ring_prev_id(coll));
		xfree(nodename);	/* BUGFIX: was leaked on this path */
		return SLURM_ERROR;
	}
	rc = pmixp_coll_check(coll, hdr->seq);
	if (PMIXP_COLL_REQ_FAILURE == rc) {
		/* this is an unacceptable event: either something went
		 * really wrong or the state machine is incorrect.
		 * This will 100% lead to application hang.
		 */
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("Bad collective seq. #%d from %s:%u, current is %d",
			    hdr->seq, nodename, hdr->nodeid, coll->seq);
		pmixp_debug_hang(0); /* enable hang to debug this! */
		slurm_kill_job_step(pmixp_info_jobid(),
				    pmixp_info_stepid(), SIGKILL);
		xfree(nodename);
		return SLURM_SUCCESS;
	} else if (PMIXP_COLL_REQ_SKIP == rc) {
#ifdef PMIXP_COLL_DEBUG
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("Wrong collective seq. #%d from nodeid %u, current is %d, skip this message",
			    hdr->seq, hdr->nodeid, coll->seq);
		xfree(nodename);	/* BUGFIX: was leaked in debug builds */
#endif
		return SLURM_ERROR;
	}
	return SLURM_SUCCESS;
}
645
/*
 * Process a contribution received from our ring predecessor.
 *
 * Validates payload size, hop sequence and contributor id; rejects
 * duplicates via the contribution map; stores/forwards the data and
 * drives the state machine.
 *
 * NOTE(review): validation failures `goto exit` with ret still
 * SLURM_SUCCESS — the message is silently skipped rather than reported
 * as an error; presumably intentional (late/duplicate traffic), verify.
 */
int pmixp_coll_ring_neighbor(pmixp_coll_t *coll, pmixp_coll_ring_msg_hdr_t *hdr,
			     Buf buf)
{
	int ret = SLURM_SUCCESS;
	char *data_ptr = NULL;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;
	uint32_t hop_seq;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	coll_ctx = pmixp_coll_ring_ctx_select(coll, hdr->seq);
	if (!coll_ctx) {
		PMIXP_ERROR("Can not get ring collective context, seq=%u",
			    hdr->seq);
		ret = SLURM_ERROR;
		goto exit;
	}
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: contrib/nbr: seqnum=%u, state=%d, nodeid=%d, contrib=%d, seq=%d, size=%lu",
		    coll_ctx, coll_ctx->seq, coll_ctx->state, hdr->nodeid,
		    hdr->contrib_id, hdr->hop_seq, hdr->msgsize);
#endif


	/* verify msg size */
	if (hdr->msgsize != remaining_buf(buf)) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: unexpected message size=%d, expect=%zu",
			    coll, remaining_buf(buf), hdr->msgsize);
#endif
		goto exit;
	}

	/* compute the actual hops of ring: (src - dst + size) % size */
	hop_seq = (coll->my_peerid + coll->peers_cnt - hdr->contrib_id) %
		coll->peers_cnt - 1;
	if (hdr->hop_seq != hop_seq) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: unexpected ring seq number=%d, expect=%d, coll seq=%d",
			    coll, hdr->hop_seq, hop_seq, coll->seq);
#endif
		goto exit;
	}

	/* out-of-range contributor id: drop */
	if (hdr->contrib_id >= coll->peers_cnt) {
		goto exit;
	}

	if (coll_ctx->contrib_map[hdr->contrib_id]) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: double receiving was detected from %d, "
			    "local seq=%d, seq=%d, rejected",
			    coll, hdr->contrib_id, coll->seq, hdr->seq);
#endif
		goto exit;
	}

	/* mark number of individual contributions */
	coll_ctx->contrib_map[hdr->contrib_id] = true;

	data_ptr = get_buf_data(buf) + get_buf_offset(buf);
	if (_pmixp_coll_contrib(coll_ctx, hdr->contrib_id, hdr->hop_seq + 1,
				data_ptr, remaining_buf(buf))) {
		goto exit;
	}

	/* increase number of ring contributions */
	coll_ctx->contrib_prev++;

	/* ring coll progress */
	_progress_coll_ring(coll_ctx);
exit:
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);
	return ret;
}
723
/*
 * Timeout sweep: for every active (non-SYNC) context, if the collective
 * has been idle longer than the configured timeout, notify libpmix with
 * PMIXP_ERR_TIMEOUT, log diagnostics and reset the context.
 */
void pmixp_coll_ring_reset_if_to(pmixp_coll_t *coll, time_t ts) {
	int i;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);
	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		pmixp_coll_ring_ctx_t *coll_ctx =
			&coll->state.ring.ctx_array[i];

		/* only running contexts can time out */
		if (!coll_ctx->in_use ||
		    (PMIXP_COLL_RING_SYNC == coll_ctx->state))
			continue;
		if ((ts - coll->ts) <= pmixp_info_timeout())
			continue;

		/* respond to the libpmix */
		pmixp_coll_localcb_nodata(coll, PMIXP_ERR_TIMEOUT);

		/* report the timeout event */
		PMIXP_ERROR("%p: collective timeout seq=%d",
			    coll, coll_ctx->seq);
		pmixp_coll_log(coll);

		/* drop the collective */
		_reset_coll_ring(coll_ctx);
	}
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);
}
751
/*
 * Dump the full state of a ring collective to the error log for
 * debugging: our identity, ring neighbors, and per-context progress
 * including which peers have / have not contributed (as host ranges).
 */
void pmixp_coll_ring_log(pmixp_coll_t *coll)
{
	int i;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	char *nodename, *next, *prev;
	char *out_str = NULL;

	PMIXP_ERROR("%p: %s state seq=%d",
		    coll, pmixp_coll_type2str(coll->type), coll->seq);
	nodename = pmixp_info_job_host(coll->my_peerid);
	PMIXP_ERROR("my peerid: %d:%s", coll->my_peerid, nodename);
	xfree(nodename);

	next = pmixp_info_job_host(_ring_next_id(coll));
	prev = pmixp_info_job_host(_ring_prev_id(coll));
	xstrfmtcat(out_str,"neighbor id: next %d:%s, prev %d:%s",
		   _ring_next_id(coll), next, _ring_prev_id(coll), prev);
	PMIXP_ERROR("%s", out_str);
	xfree(next);
	xfree(prev);
	xfree(out_str);


	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		pmixp_coll_ring_ctx_t *coll_ctx = &ring->ctx_array[i];

		PMIXP_ERROR("Context ptr=%p, #%d, in-use=%d",
			    coll_ctx, i, coll_ctx->in_use);

		if (coll_ctx->in_use) {
			int id;
			char *done_contrib = NULL, *wait_contrib = NULL;
			hostlist_t hl_done_contrib = NULL,
				hl_wait_contrib = NULL, *tmp_list;

			PMIXP_ERROR("\t seq=%d contribs: loc=%d/prev=%d/fwd=%d",
				    coll_ctx->seq, coll_ctx->contrib_local,
				    coll_ctx->contrib_prev,
				    coll_ctx->forward_cnt);
			PMIXP_ERROR("\t neighbor contribs [%d]:",
				    coll->peers_cnt);

			/* split peers into "contributed" and "waiting"
			 * hostlists based on the contribution map */
			for (id = 0; id < coll->peers_cnt; id++) {
				/* shadows the outer `nodename` — scoped to
				 * this loop only */
				char *nodename;

				if (coll->my_peerid == id)
					continue;

				nodename = pmixp_info_job_host(id);

				tmp_list = coll_ctx->contrib_map[id] ?
					&hl_done_contrib : &hl_wait_contrib;

				if (!*tmp_list)
					*tmp_list = hostlist_create(nodename);
				else
					hostlist_push_host(*tmp_list, nodename);
				xfree(nodename);
			}
			if (hl_done_contrib) {
				done_contrib =
					slurm_hostlist_ranged_string_xmalloc(
						hl_done_contrib);
				FREE_NULL_HOSTLIST(hl_done_contrib);
			}

			if (hl_wait_contrib) {
				wait_contrib =
					slurm_hostlist_ranged_string_xmalloc(
						hl_wait_contrib);
				FREE_NULL_HOSTLIST(hl_wait_contrib);
			}
			PMIXP_ERROR("\t\t done contrib: %s",
				    done_contrib ? done_contrib : "-");
			PMIXP_ERROR("\t\t wait contrib: %s",
				    wait_contrib ? wait_contrib : "-");
			PMIXP_ERROR("\t status=%s",
				    pmixp_coll_ring_state2str(coll_ctx->state));
			if (coll_ctx->ring_buf) {
				PMIXP_ERROR("\t buf (offset/size): %u/%u",
					    get_buf_offset(coll_ctx->ring_buf),
					    size_buf(coll_ctx->ring_buf));
			}
			xfree(done_contrib);
			xfree(wait_contrib);
		}
	}
}
840