/*****************************************************************************\
 **  pmixp_coll_ring.c - PMIx collective primitives
 *****************************************************************************
 *  Copyright (C) 2018      Mellanox Technologies. All rights reserved.
 *  Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>,
 *             Boris Karasev <karasev.b@gmail.com, boriska@mellanox.com>.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include "pmixp_common.h"
#include "src/common/slurm_protocol_api.h"
#include "pmixp_coll.h"
#include "pmixp_nspaces.h"
#include "pmixp_server.h"
#include "pmixp_client.h"

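/*
 * Callback data attached to each non-blocking ring send and each deferred
 * libpmix completion: it keeps the collective, its ring context, the buffer
 * to be recycled and the sequence number observed at submission time.
 */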
typedef struct {
	pmixp_coll_t *coll;
	pmixp_coll_ring_ctx_t *coll_ctx;
	Buf buf;
	uint32_t seq;
} pmixp_coll_ring_cbdata_t;

static void _progress_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx);

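/*
 * Small ring helpers: previous/next peer id arithmetic and accessors for
 * the collective and the ring state embedded in a ring context.
 */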
static inline int _ring_prev_id(pmixp_coll_t *coll)
{
	return (coll->my_peerid + coll->peers_cnt - 1) % coll->peers_cnt;
}

static inline int _ring_next_id(pmixp_coll_t *coll)
{
	return (coll->my_peerid + 1) % coll->peers_cnt;
}

static inline pmixp_coll_t *_ctx_get_coll(pmixp_coll_ring_ctx_t *coll_ctx)
{
	return coll_ctx->coll;
}

static inline pmixp_coll_ring_t *_ctx_get_coll_ring(
	pmixp_coll_ring_ctx_t *coll_ctx)
{
	return &coll_ctx->coll->state.ring;
}

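/* number of contributions this context is still waiting for */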
static inline uint32_t _ring_remain_contrib(pmixp_coll_ring_ctx_t *coll_ctx)
{
	return coll_ctx->coll->peers_cnt -
		(coll_ctx->contrib_prev + coll_ctx->contrib_local);
}

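/* true once all (peers_cnt - 1) forwards of this context have completed */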
static inline uint32_t _ring_fwd_done(pmixp_coll_ring_ctx_t *coll_ctx)
{
	return !(coll_ctx->coll->peers_cnt - coll_ctx->forward_cnt - 1);
}

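/*
 * Completion callback for a forwarded ring message: returns the buffer to
 * the forward pool and, if the context was not reset meanwhile, accounts
 * the finished forward and progresses the ring state machine.
 */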
static void _ring_sent_cb(int rc, pmixp_p2p_ctx_t ctx, void *_cbdata)
{
	pmixp_coll_ring_cbdata_t *cbdata = (pmixp_coll_ring_cbdata_t*)_cbdata;
	pmixp_coll_ring_ctx_t *coll_ctx = cbdata->coll_ctx;
	pmixp_coll_t *coll = cbdata->coll;
	Buf buf = cbdata->buf;

	pmixp_coll_sanity_check(coll);

	if (PMIXP_P2P_REGULAR == ctx) {
		/* lock the collective */
		slurm_mutex_lock(&coll->lock);
	}
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: called %d", coll_ctx, coll_ctx->seq);
#endif
	if (cbdata->seq != coll_ctx->seq) {
		/* it seems like this collective was reset since the time
		 * we initiated this send.
		 * Just exit to avoid data corruption.
		 */
		PMIXP_DEBUG("%p: collective was reset!", coll_ctx);
		goto exit;
	}
	coll_ctx->forward_cnt++;
	_progress_coll_ring(coll_ctx);

exit:
	pmixp_server_buf_reset(buf);
	list_push(coll->state.ring.fwrd_buf_pool, buf);

	if (PMIXP_P2P_REGULAR == ctx) {
		/* unlock the collective */
		slurm_mutex_unlock(&coll->lock);
	}
	xfree(cbdata);
}

static inline void pmixp_coll_ring_ctx_sanity_check(
	pmixp_coll_ring_ctx_t *coll_ctx)
{
	xassert(NULL != coll_ctx);
	xassert(coll_ctx->in_use);
	pmixp_coll_sanity_check(coll_ctx->coll);
}

/*
 * Used by the internal collective performance evaluation tool.
 */
pmixp_coll_t *pmixp_coll_ring_from_cbdata(void *cbdata)
{
	pmixp_coll_ring_cbdata_t *ptr = (pmixp_coll_ring_cbdata_t*)cbdata;
	pmixp_coll_sanity_check(ptr->coll);
	return ptr->coll;
}

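/*
 * Unpack an incoming ring message: collective type, the list of involved
 * processes (namespace/rank pairs) and the fixed-size ring message header.
 */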
int pmixp_coll_ring_unpack(Buf buf, pmixp_coll_type_t *type,
			   pmixp_coll_ring_msg_hdr_t *ring_hdr,
			   pmixp_proc_t **r, size_t *nr)
{
	pmixp_proc_t *procs = NULL;
	uint32_t nprocs = 0;
	uint32_t tmp;
	int rc, i;
	char *temp_ptr;

	/* 1. extract the type of collective */
	if (SLURM_SUCCESS != (rc = unpack32(&tmp, buf))) {
		PMIXP_ERROR("Cannot unpack collective type");
		return rc;
	}
	*type = tmp;

	/* 2. get the number of processes */
	if (SLURM_SUCCESS != (rc = unpack32(&nprocs, buf))) {
		PMIXP_ERROR("Cannot unpack number of processes");
		return rc;
	}
	*nr = nprocs;

	procs = xmalloc(sizeof(pmixp_proc_t) * nprocs);
	*r = procs;

	/* 3. get namespace/rank of particular process */
	for (i = 0; i < (int)nprocs; i++) {
		if ((rc = unpackmem_ptr(&temp_ptr, &tmp, buf)) ||
		    (strlcpy(procs[i].nspace, temp_ptr,
			     PMIXP_MAX_NSLEN + 1) > PMIXP_MAX_NSLEN)) {
			PMIXP_ERROR("Cannot unpack namespace for process #%d",
				    i);
			return rc;
		}

		rc = unpack32(&tmp, buf);
		procs[i].rank = tmp;
		if (SLURM_SUCCESS != rc) {
			PMIXP_ERROR("Cannot unpack rank for process #%d, nsp=%s",
				    i, procs[i].nspace);
			return rc;
		}
	}

	/* 4. extract the ring info */
	if ((rc = unpackmem_ptr(&temp_ptr, &tmp, buf)) ||
	    (tmp != sizeof(pmixp_coll_ring_msg_hdr_t))) {
		PMIXP_ERROR("Cannot unpack ring info");
		return rc;
	}

	memcpy(ring_hdr, temp_ptr, sizeof(pmixp_coll_ring_msg_hdr_t));

	return SLURM_SUCCESS;
}

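/*
 * Pack the ring message metadata (collective type, process set and the ring
 * message header) in the same layout that pmixp_coll_ring_unpack() expects.
 */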
static int _pack_coll_ring_info(pmixp_coll_t *coll,
				pmixp_coll_ring_msg_hdr_t *ring_hdr,
				Buf buf)
{
	pmixp_proc_t *procs = coll->pset.procs;
	size_t nprocs = coll->pset.nprocs;
	uint32_t type = PMIXP_COLL_TYPE_FENCE_RING;
	int i;

	/* 1. store the type of collective */
	pack32(type, buf);

	/* 2. Put the number of processes */
	pack32(nprocs, buf);
	for (i = 0; i < (int)nprocs; i++) {
		/* Pack namespace and rank of each process */
		packmem(procs[i].nspace, strlen(procs[i].nspace) + 1, buf);
		pack32(procs[i].rank, buf);
	}

	/* 3. pack the ring header info */
	packmem((char*)ring_hdr, sizeof(pmixp_coll_ring_msg_hdr_t), buf);

	return SLURM_SUCCESS;
}

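/* take a forward buffer from the pool, or allocate a fresh server buffer */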
static Buf _get_fwd_buf(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_ring_t *ring = _ctx_get_coll_ring(coll_ctx);
	Buf buf = list_pop(ring->fwrd_buf_pool);
	if (!buf) {
		buf = pmixp_server_buf_new();
	}
	return buf;
}

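/* take a contribution buffer from the pool, or create an empty one */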
static Buf _get_contrib_buf(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_ring_t *ring = _ctx_get_coll_ring(coll_ctx);
	Buf ring_buf = list_pop(ring->ring_buf_pool);
	if (!ring_buf) {
		ring_buf = create_buf(NULL, 0);
	}
	return ring_buf;
}

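/*
 * Forward a contribution to the next peer in the ring: build the message
 * header, pack it together with the payload into a forward buffer and send
 * it non-blocking; _ring_sent_cb() recycles the buffer on completion.
 */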
static int _ring_forward_data(pmixp_coll_ring_ctx_t *coll_ctx, uint32_t contrib_id,
			      uint32_t hop_seq, void *data, size_t size)
{
	pmixp_coll_ring_msg_hdr_t hdr;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);
	pmixp_coll_ring_t *ring = &coll->state.ring;
	hdr.nodeid = coll->my_peerid;
	hdr.msgsize = size;
	hdr.seq = coll_ctx->seq;
	hdr.hop_seq = hop_seq;
	hdr.contrib_id = contrib_id;
	pmixp_ep_t *ep = (pmixp_ep_t*)xmalloc(sizeof(*ep));
	pmixp_coll_ring_cbdata_t *cbdata = NULL;
	uint32_t offset = 0;
	Buf buf = _get_fwd_buf(coll_ctx);
	int rc = SLURM_SUCCESS;

	pmixp_coll_ring_ctx_sanity_check(coll_ctx);

#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: transit data to nodeid=%d, seq=%d, hop=%d, size=%lu, contrib=%d",
		    coll_ctx, _ring_next_id(coll), hdr.seq,
		    hdr.hop_seq, hdr.msgsize, hdr.contrib_id);
#endif
	if (!buf) {
		/* no buffer available, release the endpoint we allocated */
		xfree(ep);
		rc = SLURM_ERROR;
		goto exit;
	}
	ep->type = PMIXP_EP_NOIDEID;
	ep->ep.nodeid = ring->next_peerid;

	/* pack ring info */
	_pack_coll_ring_info(coll, &hdr, buf);

	/* insert payload to buf */
	offset = get_buf_offset(buf);
	pmixp_server_buf_reserve(buf, size);
	memcpy(get_buf_data(buf) + offset, data, size);
	set_buf_offset(buf, offset + size);

	cbdata = xmalloc(sizeof(pmixp_coll_ring_cbdata_t));
	cbdata->buf = buf;
	cbdata->coll = coll;
	cbdata->coll_ctx = coll_ctx;
	cbdata->seq = coll_ctx->seq;
	rc = pmixp_server_send_nb(ep, PMIXP_MSG_RING, coll_ctx->seq, buf,
				  _ring_sent_cb, cbdata);
exit:
	return rc;
}

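/*
 * Return a ring context to its idle state so it can be reused for the next
 * collective: clear per-sequence counters, the contribution map and the
 * reference to the contribution buffer.
 */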
static void _reset_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: called", coll_ctx);
#endif
	pmixp_coll_ring_ctx_sanity_check(coll_ctx);
	coll_ctx->in_use = false;
	coll_ctx->state = PMIXP_COLL_RING_SYNC;
	coll_ctx->contrib_local = false;
	coll_ctx->contrib_prev = 0;
	coll_ctx->forward_cnt = 0;
	coll->ts = time(NULL);
	memset(coll_ctx->contrib_map, 0, sizeof(bool) * coll->peers_cnt);
	coll_ctx->ring_buf = NULL;
}

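/*
 * Called by libpmix when it is done with the collective result: return the
 * contribution buffer to the pool under the collective lock.
 */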
static void _libpmix_cb(void *_vcbdata)
{
	pmixp_coll_ring_cbdata_t *cbdata = (pmixp_coll_ring_cbdata_t*)_vcbdata;
	pmixp_coll_t *coll = cbdata->coll;
	Buf buf = cbdata->buf;

	pmixp_coll_sanity_check(coll);

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	/* reset buf */
	buf->processed = 0;
	/* push it back to pool for reuse */
	list_push(coll->state.ring.ring_buf_pool, buf);

	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);

	xfree(cbdata);
}

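/*
 * Deliver the collected contributions to libpmix via the stored callback;
 * the callback info is cleared because it must not be invoked twice.
 */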
static void _invoke_callback(pmixp_coll_ring_ctx_t *coll_ctx)
{
	pmixp_coll_ring_cbdata_t *cbdata;
	char *data;
	size_t data_sz;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);

	if (!coll->cbfunc)
		return;

	data = get_buf_data(coll_ctx->ring_buf);
	data_sz = get_buf_offset(coll_ctx->ring_buf);
	cbdata = xmalloc(sizeof(pmixp_coll_ring_cbdata_t));

	cbdata->coll = coll;
	cbdata->coll_ctx = coll_ctx;
	cbdata->buf = coll_ctx->ring_buf;
	cbdata->seq = coll_ctx->seq;
	pmixp_lib_modex_invoke(coll->cbfunc, SLURM_SUCCESS,
			       data, data_sz,
			       coll->cbdata, _libpmix_cb, (void *)cbdata);
	/*
	 * Clear the callback info as we are not allowed to use it a
	 * second time
	 */
	coll->cbfunc = NULL;
	coll->cbdata = NULL;
}

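/*
 * Ring collective state machine:
 *   SYNC     -> PROGRESS  once any contribution (local or from the previous
 *                         peer) has been accepted;
 *   PROGRESS -> FINALIZE  once all expected contributions were collected
 *                         and the result was handed to libpmix;
 *   FINALIZE -> SYNC      once all forwards completed; the context is reset
 *                         and the collective sequence number is advanced.
 */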
static void _progress_coll_ring(pmixp_coll_ring_ctx_t *coll_ctx)
{
	int ret = 0;
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);

	pmixp_coll_ring_ctx_sanity_check(coll_ctx);

	do {
		ret = false;
		switch (coll_ctx->state) {
		case PMIXP_COLL_RING_SYNC:
			if (coll_ctx->contrib_local || coll_ctx->contrib_prev) {
				coll_ctx->state = PMIXP_COLL_RING_PROGRESS;
				ret = true;
			}
			break;
		case PMIXP_COLL_RING_PROGRESS:
			/* check whether all data has been collected
			 * and forwarded */
			if (!_ring_remain_contrib(coll_ctx)) {
				coll_ctx->state = PMIXP_COLL_RING_FINALIZE;
				_invoke_callback(coll_ctx);
				ret = true;
			}
			break;
		case PMIXP_COLL_RING_FINALIZE:
			if (_ring_fwd_done(coll_ctx)) {
#ifdef PMIXP_COLL_DEBUG
				PMIXP_DEBUG("%p: %s seq=%d is DONE", coll,
					    pmixp_coll_type2str(coll->type),
					    coll_ctx->seq);
#endif
				/* increase coll sequence */
				coll->seq++;
				_reset_coll_ring(coll_ctx);
				ret = true;
			}
			break;
		default:
			PMIXP_ERROR("%p: unknown state = %d",
				    coll_ctx, (int)coll_ctx->state);
		}
	} while (ret);
}

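/*
 * Pick a ring context for a new local contribution: prefer an active
 * context that has not received the local contribution yet; otherwise
 * claim a free context, with the sequence advanced past any contexts that
 * are still finalizing an older collective.
 */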
pmixp_coll_ring_ctx_t *pmixp_coll_ring_ctx_new(pmixp_coll_t *coll)
{
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL, *ret_ctx = NULL,
		*free_ctx = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	uint32_t seq = coll->seq;

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		/*
		 * check whether an active context already exists to avoid
		 * creating a duplicate
		 */
		if (coll_ctx->in_use) {
			switch (coll_ctx->state) {
			case PMIXP_COLL_RING_FINALIZE:
				seq++;
				break;
			case PMIXP_COLL_RING_SYNC:
			case PMIXP_COLL_RING_PROGRESS:
				if (!ret_ctx && !coll_ctx->contrib_local) {
					ret_ctx = coll_ctx;
				}
				break;
			}
		} else {
			free_ctx = coll_ctx;
			xassert(!free_ctx->in_use);
		}
	}
	/* claim a free context if no active one was suitable */
	if (!ret_ctx && free_ctx) {
		ret_ctx = free_ctx;
		ret_ctx->in_use = true;
		ret_ctx->seq = seq;
		ret_ctx->ring_buf = _get_contrib_buf(ret_ctx);
	}
	return ret_ctx;
}

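/*
 * Find the ring context that is already serving the given sequence number
 * (e.g. for a neighbor contribution); if none exists, claim a free one and
 * bind it to that sequence.
 */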
pmixp_coll_ring_ctx_t *pmixp_coll_ring_ctx_select(pmixp_coll_t *coll,
						  const uint32_t seq)
{
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL, *ret = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;

	/* find the context that already serves this sequence number */
	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		if (coll_ctx->in_use && coll_ctx->seq == seq) {
			return coll_ctx;
		} else if (!coll_ctx->in_use) {
			ret = coll_ctx;
			continue;
		}
	}
	/* otherwise claim a free context for this sequence */
	if (ret && !ret->in_use) {
		ret->in_use = true;
		ret->seq = seq;
		ret->ring_buf = _get_contrib_buf(ret);
	}
	return ret;
}

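/*
 * One-time initialization of the ring state for a collective: resolve the
 * next peer from the host list, create the buffer pools and reset every
 * ring context.
 */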
int pmixp_coll_ring_init(pmixp_coll_t *coll, hostlist_t *hl)
{
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("called");
#endif
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	char *p;
	int rel_id = hostlist_find(*hl, pmixp_info_hostname());

	/* compute the next absolute id of the neighbor */
	p = hostlist_nth(*hl, (rel_id + 1) % coll->peers_cnt);
	ring->next_peerid = pmixp_info_job_hostid(p);
	free(p);

	ring->fwrd_buf_pool = list_create(pmixp_free_buf);
	ring->ring_buf_pool = list_create(pmixp_free_buf);

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		coll_ctx->coll = coll;
		coll_ctx->in_use = false;
		coll_ctx->seq = coll->seq;
		coll_ctx->contrib_local = false;
		coll_ctx->contrib_prev = 0;
		coll_ctx->state = PMIXP_COLL_RING_SYNC;
		/* TODO: use a bit vector instead of a bool array */
		coll_ctx->contrib_map = xmalloc(sizeof(bool) * coll->peers_cnt);
	}

	return SLURM_SUCCESS;
}

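/* release everything pmixp_coll_ring_init() allocated for this ring */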
void pmixp_coll_ring_free(pmixp_coll_ring_t *ring)
{
	int i;
	pmixp_coll_ring_ctx_t *coll_ctx;

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &ring->ctx_array[i];
		FREE_NULL_BUFFER(coll_ctx->ring_buf);
		xfree(coll_ctx->contrib_map);
	}
	list_destroy(ring->fwrd_buf_pool);
	list_destroy(ring->ring_buf_pool);
}

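/*
 * Append a contribution to the context's accumulation buffer (growing it as
 * needed) and, unless the originator is our next peer (i.e. the data has
 * already travelled the full ring), forward it to the next peer.
 */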
inline static int _pmixp_coll_contrib(pmixp_coll_ring_ctx_t *coll_ctx,
				      int contrib_id,
				      uint32_t hop, char *data, size_t size)
{
	pmixp_coll_t *coll = _ctx_get_coll(coll_ctx);
	char *data_ptr = NULL;
	int ret;

	/* update the collective timestamp */
	coll->ts = time(NULL);

	/* save the contribution */
	if (!size_buf(coll_ctx->ring_buf)) {
		grow_buf(coll_ctx->ring_buf, size * coll->peers_cnt);
	} else if (remaining_buf(coll_ctx->ring_buf) < size) {
		uint32_t new_size = size_buf(coll_ctx->ring_buf) + size *
			_ring_remain_contrib(coll_ctx);
		grow_buf(coll_ctx->ring_buf, new_size);
	}
	grow_buf(coll_ctx->ring_buf, size);
	data_ptr = get_buf_data(coll_ctx->ring_buf) +
		get_buf_offset(coll_ctx->ring_buf);
	memcpy(data_ptr, data, size);
	set_buf_offset(coll_ctx->ring_buf,
		       get_buf_offset(coll_ctx->ring_buf) + size);

	/* forward unless the ring is complete for this contribution */
	if (contrib_id != _ring_next_id(coll)) {
		/* forward data to the next node */
		ret = _ring_forward_data(coll_ctx, contrib_id, hop,
					 data_ptr, size);
		if (ret) {
			PMIXP_ERROR("Cannot forward ring data");
			return SLURM_ERROR;
		}
	}

	return SLURM_SUCCESS;
}

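/*
 * Local entry point: record this node's contribution into a ring context,
 * remember the libpmix callback for the final result and kick the state
 * machine.  Called with the collective unlocked; the lock is taken here.
 */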
int pmixp_coll_ring_local(pmixp_coll_t *coll, char *data, size_t size,
			  void *cbfunc, void *cbdata)
{
	int ret = SLURM_SUCCESS;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	/* sanity check */
	pmixp_coll_sanity_check(coll);

	/* setup callback info */
	coll->cbfunc = cbfunc;
	coll->cbdata = cbdata;

	coll_ctx = pmixp_coll_ring_ctx_new(coll);
	if (!coll_ctx) {
		PMIXP_ERROR("Can not get new ring collective context, seq=%u",
			    coll->seq);
		ret = SLURM_ERROR;
		goto exit;
	}

#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: contrib/loc: seqnum=%u, state=%d, size=%lu",
		    coll_ctx, coll_ctx->seq, coll_ctx->state, size);
#endif

	if (_pmixp_coll_contrib(coll_ctx, coll->my_peerid, 0, data, size)) {
		/* report the failure to the caller */
		ret = SLURM_ERROR;
		goto exit;
	}

	/* mark local contribution */
	coll_ctx->contrib_local = true;
	_progress_coll_ring(coll_ctx);

exit:
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);

	return ret;
}

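/*
 * Validate an incoming ring message: it must come from our previous peer
 * and carry an acceptable collective sequence number.
 */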
int pmixp_coll_ring_check(pmixp_coll_t *coll, pmixp_coll_ring_msg_hdr_t *hdr)
{
	char *nodename = NULL;
	int rc;

	if (hdr->nodeid != _ring_prev_id(coll)) {
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("%p: unexpected contrib from %s:%u, expected is %d",
			    coll, nodename, hdr->nodeid, _ring_prev_id(coll));
		xfree(nodename);
		return SLURM_ERROR;
	}
	rc = pmixp_coll_check(coll, hdr->seq);
	if (PMIXP_COLL_REQ_FAILURE == rc) {
		/* this is an unacceptable event: either something went
		 * really wrong or the state machine is incorrect.
		 * This will 100% lead to application hang.
		 */
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("Bad collective seq. #%d from %s:%u, current is %d",
			    hdr->seq, nodename, hdr->nodeid, coll->seq);
		pmixp_debug_hang(0); /* enable hang to debug this! */
		slurm_kill_job_step(pmixp_info_jobid(),
				    pmixp_info_stepid(), SIGKILL);
		xfree(nodename);
		return SLURM_SUCCESS;
	} else if (PMIXP_COLL_REQ_SKIP == rc) {
#ifdef PMIXP_COLL_DEBUG
		nodename = pmixp_info_job_host(hdr->nodeid);
		PMIXP_ERROR("Wrong collective seq. #%d from %s:%u, current is %d, skip this message",
			    hdr->seq, nodename, hdr->nodeid, coll->seq);
		xfree(nodename);
#endif
		return SLURM_ERROR;
	}
	return SLURM_SUCCESS;
}

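/*
 * Handle a ring contribution delivered by a neighbor: select the matching
 * ring context, validate the message header, record the contribution in
 * the map, append/forward the payload and progress the state machine.
 */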
int pmixp_coll_ring_neighbor(pmixp_coll_t *coll, pmixp_coll_ring_msg_hdr_t *hdr,
			     Buf buf)
{
	int ret = SLURM_SUCCESS;
	char *data_ptr = NULL;
	pmixp_coll_ring_ctx_t *coll_ctx = NULL;
	uint32_t hop_seq;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);

	coll_ctx = pmixp_coll_ring_ctx_select(coll, hdr->seq);
	if (!coll_ctx) {
		PMIXP_ERROR("Can not get ring collective context, seq=%u",
			    hdr->seq);
		ret = SLURM_ERROR;
		goto exit;
	}
#ifdef PMIXP_COLL_DEBUG
	PMIXP_DEBUG("%p: contrib/nbr: seqnum=%u, state=%d, nodeid=%d, contrib=%d, seq=%d, size=%lu",
		    coll_ctx, coll_ctx->seq, coll_ctx->state, hdr->nodeid,
		    hdr->contrib_id, hdr->hop_seq, hdr->msgsize);
#endif

	/* verify msg size */
	if (hdr->msgsize != remaining_buf(buf)) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: unexpected message size=%d, expect=%zu",
			    coll, remaining_buf(buf), hdr->msgsize);
#endif
		goto exit;
	}

	/* compute the expected hop count: (dst - src + size) % size - 1 */
	hop_seq = (coll->my_peerid + coll->peers_cnt - hdr->contrib_id) %
		coll->peers_cnt - 1;
	if (hdr->hop_seq != hop_seq) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: unexpected ring seq number=%d, expect=%d, coll seq=%d",
			    coll, hdr->hop_seq, hop_seq, coll->seq);
#endif
		goto exit;
	}

	if (hdr->contrib_id >= coll->peers_cnt) {
		goto exit;
	}

	if (coll_ctx->contrib_map[hdr->contrib_id]) {
#ifdef PMIXP_COLL_DEBUG
		PMIXP_DEBUG("%p: double receiving was detected from %d, "
			    "local seq=%d, seq=%d, rejected",
			    coll, hdr->contrib_id, coll->seq, hdr->seq);
#endif
		goto exit;
	}

	/* mark the individual contribution */
	coll_ctx->contrib_map[hdr->contrib_id] = true;

	data_ptr = get_buf_data(buf) + get_buf_offset(buf);
	if (_pmixp_coll_contrib(coll_ctx, hdr->contrib_id, hdr->hop_seq + 1,
				data_ptr, remaining_buf(buf))) {
		goto exit;
	}

	/* increase number of ring contributions */
	coll_ctx->contrib_prev++;

	/* ring coll progress */
	_progress_coll_ring(coll_ctx);
exit:
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);
	return ret;
}

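/*
 * Walk all ring contexts and reset any active collective whose last
 * activity is older than the PMIx timeout, reporting the failure to the
 * local libpmix clients.
 */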
void pmixp_coll_ring_reset_if_to(pmixp_coll_t *coll, time_t ts)
{
	pmixp_coll_ring_ctx_t *coll_ctx;
	int i;

	/* lock the structure */
	slurm_mutex_lock(&coll->lock);
	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		coll_ctx = &coll->state.ring.ctx_array[i];
		if (!coll_ctx->in_use ||
		    (PMIXP_COLL_RING_SYNC == coll_ctx->state)) {
			continue;
		}
		if (ts - coll->ts > pmixp_info_timeout()) {
			/* respond to the libpmix */
			pmixp_coll_localcb_nodata(coll, PMIXP_ERR_TIMEOUT);

			/* report the timeout event */
			PMIXP_ERROR("%p: collective timeout seq=%d",
				    coll, coll_ctx->seq);
			pmixp_coll_log(coll);
			/* drop the collective */
			_reset_coll_ring(coll_ctx);
		}
	}
	/* unlock the structure */
	slurm_mutex_unlock(&coll->lock);
}

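/*
 * Dump the full ring collective state (peers, per-context progress and the
 * hosts we are still waiting for) to the log; used on errors and timeouts.
 */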
void pmixp_coll_ring_log(pmixp_coll_t *coll)
{
	int i;
	pmixp_coll_ring_t *ring = &coll->state.ring;
	char *nodename, *next, *prev;
	char *out_str = NULL;

	PMIXP_ERROR("%p: %s state seq=%d",
		    coll, pmixp_coll_type2str(coll->type), coll->seq);
	nodename = pmixp_info_job_host(coll->my_peerid);
	PMIXP_ERROR("my peerid: %d:%s", coll->my_peerid, nodename);
	xfree(nodename);

	next = pmixp_info_job_host(_ring_next_id(coll));
	prev = pmixp_info_job_host(_ring_prev_id(coll));
	xstrfmtcat(out_str, "neighbor id: next %d:%s, prev %d:%s",
		   _ring_next_id(coll), next, _ring_prev_id(coll), prev);
	PMIXP_ERROR("%s", out_str);
	xfree(next);
	xfree(prev);
	xfree(out_str);

	for (i = 0; i < PMIXP_COLL_RING_CTX_NUM; i++) {
		pmixp_coll_ring_ctx_t *coll_ctx = &ring->ctx_array[i];

		PMIXP_ERROR("Context ptr=%p, #%d, in-use=%d",
			    coll_ctx, i, coll_ctx->in_use);

		if (coll_ctx->in_use) {
			int id;
			char *done_contrib = NULL, *wait_contrib = NULL;
			hostlist_t hl_done_contrib = NULL,
				hl_wait_contrib = NULL, *tmp_list;

			PMIXP_ERROR("\t seq=%d contribs: loc=%d/prev=%d/fwd=%d",
				    coll_ctx->seq, coll_ctx->contrib_local,
				    coll_ctx->contrib_prev,
				    coll_ctx->forward_cnt);
			PMIXP_ERROR("\t neighbor contribs [%d]:",
				    coll->peers_cnt);

			for (id = 0; id < coll->peers_cnt; id++) {
				char *nodename;

				if (coll->my_peerid == id)
					continue;

				nodename = pmixp_info_job_host(id);

				tmp_list = coll_ctx->contrib_map[id] ?
					&hl_done_contrib : &hl_wait_contrib;

				if (!*tmp_list)
					*tmp_list = hostlist_create(nodename);
				else
					hostlist_push_host(*tmp_list, nodename);
				xfree(nodename);
			}
			if (hl_done_contrib) {
				done_contrib =
					slurm_hostlist_ranged_string_xmalloc(
						hl_done_contrib);
				FREE_NULL_HOSTLIST(hl_done_contrib);
			}

			if (hl_wait_contrib) {
				wait_contrib =
					slurm_hostlist_ranged_string_xmalloc(
						hl_wait_contrib);
				FREE_NULL_HOSTLIST(hl_wait_contrib);
			}
			PMIXP_ERROR("\t\t done contrib: %s",
				    done_contrib ? done_contrib : "-");
			PMIXP_ERROR("\t\t wait contrib: %s",
				    wait_contrib ? wait_contrib : "-");
			PMIXP_ERROR("\t status=%s",
				    pmixp_coll_ring_state2str(coll_ctx->state));
			if (coll_ctx->ring_buf) {
				PMIXP_ERROR("\t buf (offset/size): %u/%u",
					    get_buf_offset(coll_ctx->ring_buf),
					    size_buf(coll_ctx->ring_buf));
			}
			xfree(done_contrib);
			xfree(wait_contrib);
		}
	}
}