#include	"BSprivate.h"

/*@ BSoffset - Find a consistent global numbering

    Input Parameters:
.   length - number of classes of things to order
.   array - the number of things in each class on this processor
.   procinfo - the usual processor stuff

    Output Parameters:
.   offsets - the beginning number for each class on this processor
              in the global numbering (the array is allocated by
              this routine)

    Returns:
    the global number of classes, as agreed on by all the processors

    Notes:
    This routine, given each processor's list of per-class totals,
    finds where that processor's things should be numbered.  The
    classes are numbered one after another, and within a class the
    processors are numbered in tree order: left subtree first, then
    the processor itself, then the right subtree.  For example
    (with proc 0 as the root, proc 1 as its left child and proc 2
    as its right child), if proc 0 submits a list of 10 and 20
    and proc 1 submits a list of 5 and 11 and proc 2 submits a
    list of 6 and 9, then proc 0 gets back 5 and 32, proc 1 gets
    back 0 and 21 and proc 2 gets back 15 and 52.

 @*/
int BSoffset(int length, int *array, int **offsets, BSprocinfo *procinfo)
{
	int	global_len, work;
	int	parent, l_child, r_child;
	int	*t_offsets;
	int	*recv_buf;
	int	i;
	int	num_msg, num_children;
	int	*l_buffer, *r_buffer;
	int	count;
	BSmsg_list	*msg_list = NULL;
	MPI_Status	mpi_status;

	/*
	 * Overview: two sweeps over the processor tree.
	 *   up   (OFFSETU_MSG): each processor sends its parent the per-class
	 *        totals of itself plus everything received from its children;
	 *   down (OFFSETD_MSG): each processor gets the base numbers for its
	 *        subtree and hands them out in the order left child, itself,
	 *        right child.
	 * The array stored in *offsets is allocated here and is not freed by
	 * this routine.
	 */

	/* first, agree on global length */
	global_len = length;
	GIMAX(&global_len,1,&work,procinfo->procset); CHKERRN(0);

	/* allocate and initialize offsets buffer */
	/* entries past this processor's own length stay 0 */
	MY_MALLOCN(t_offsets,(int *),sizeof(int)*global_len,1);
	(*offsets) = t_offsets;
	for (i=0;i<global_len;i++) t_offsets[i] = 0;
	for (i=0;i<length;i++) t_offsets[i] = array[i];

	/* find my place in the tree */
	PSNbrTree(PS_PARENT,parent,procinfo->procset);
	PSNbrTree(PS_LCHILD,l_child,procinfo->procset);
	PSNbrTree(PS_RCHILD,r_child,procinfo->procset);

	/* collect messages from children */
	/* a negative child id is treated as "no such child" */
	num_children = 0;
	if (l_child >= 0) num_children++;
	if (r_child >= 0) num_children++;
	l_buffer = NULL;
	r_buffer = NULL;
	num_msg = 0;
	while (num_msg < num_children) {
		/* one fresh buffer per message; it is kept as that child's totals */
		MY_MALLOCN(recv_buf,(int *),sizeof(int)*global_len,2);
		RECVSYNCNOMEM(OFFSETU_MSG,recv_buf,global_len,MPI_INT,procinfo,
			mpi_status);
		CHKERRN(0);
		num_msg++;
		/* accumulate the subtree totals that go up to the parent */
		for (i=0;i<global_len;i++) t_offsets[i] += recv_buf[i];
		/* the sender recorded in the status tells us which child this was */
		/* NOTE(review): a message from any other sender would be summed */
		/* above but its buffer would never be freed */
		if (mpi_status.MPI_SOURCE == l_child) l_buffer = recv_buf;
		if (mpi_status.MPI_SOURCE == r_child) r_buffer = recv_buf;
	}

	/* send message to parent, if not root */
	/* NOTE(review): t_offsets is overwritten below and recv_buf/r_buffer */
	/* are freed before FINISH_SEND_LIST; this presumes MY_SEND_SYNC no */
	/* longer needs the caller's buffer once it returns -- confirm */
	if (!(PSISROOT(procinfo))) {
		MY_SEND_SYNC(msg_list,OFFSETU_MSG,t_offsets,global_len,parent,MPI_INT,procinfo);
	}

	/* receive from parent my base numbering */
	MY_MALLOCN(recv_buf,(int *),sizeof(int)*global_len,3);
	if (PSISROOT(procinfo)) {
		/* the root has the grand totals: class i starts where the */
		/* classes before it end (running sum of the totals) */
		count = 0;
		for (i=0;i<global_len;i++) {
			recv_buf[i] = count;
			count += t_offsets[i];
		}
	} else {
		RECVSYNCNOMEM(OFFSETD_MSG,recv_buf,global_len,MPI_INT,procinfo,
			mpi_status);
		CHKERRN(0);
	}

	/* tell left child its offsets */
	/* the left subtree starts exactly at this subtree's base */
	if (l_buffer != NULL) {
		MY_SEND_SYNC(msg_list,OFFSETD_MSG,recv_buf,global_len,l_child,MPI_INT,procinfo);
	}

	/* get my offsets */
	/* my own numbers follow everything in the left subtree */
	if (l_buffer != NULL) {
		for (i=0;i<global_len;i++) t_offsets[i] = l_buffer[i]+recv_buf[i];
	} else {
		for (i=0;i<global_len;i++) t_offsets[i] = recv_buf[i];
	}

	/* tell right child its offsets */
	/* the right subtree starts after my own things; r_buffer's totals */
	/* are no longer needed, so it is reused as the outgoing message */
	if (r_buffer != NULL) {
		for (i=0;i<global_len;i++) r_buffer[i] = t_offsets[i];
		for (i=0;i<length;i++) r_buffer[i] += array[i];
		MY_SEND_SYNC(msg_list,OFFSETD_MSG,r_buffer,global_len,r_child,MPI_INT,procinfo);
	}
	MY_FREE(recv_buf);
	if (r_buffer != NULL) MY_FREE(r_buffer);
	if (l_buffer != NULL) MY_FREE(l_buffer);
	FINISH_SEND_LIST(msg_list);
	return(global_len);
}
115 
116