1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2015-2019 Intel, Inc. All rights reserved.
4 * Copyright (c) 2015 Research Organization for Information Science
5 * and Technology (RIST). All rights reserved.
6 * Copyright (c) 2015-2019 Mellanox Technologies, Inc.
7 * All rights reserved.
8 * $COPYRIGHT$
9 *
10 * Additional copyrights may follow
11 *
12 * $HEADER$
13 */
14
15 #include "cli_stages.h"
16
17 cli_info_t *cli_info = NULL;
18 int cli_info_cnt = 0;
19 bool test_abort = false;
20
cli_rank(cli_info_t * cli)21 int cli_rank(cli_info_t *cli)
22 {
23 int i;
24 for(i=0; i < cli_info_cnt; i++){
25 if( cli == &cli_info[i] ){
26 return cli->rank;
27 }
28 }
29 return -1;
30 }
31
cli_init(int nprocs)32 void cli_init(int nprocs)
33 {
34 int n, i;
35 cli_state_t order[CLI_TERM+1];
36
37 cli_info = malloc( sizeof(cli_info_t) * nprocs);
38 cli_info_cnt = nprocs;
39
40 order[CLI_UNINIT] = CLI_FORKED;
41 order[CLI_FORKED] = CLI_FIN;
42 order[CLI_CONNECTED] = CLI_UNDEF;
43 order[CLI_FIN] = CLI_TERM;
44 order[CLI_DISCONN] = CLI_UNDEF;
45 order[CLI_TERM] = CLI_UNDEF;
46
47 for (n=0; n < nprocs; n++) {
48 cli_info[n].sd = -1;
49 cli_info[n].ev = NULL;
50 cli_info[n].pid = -1;
51 cli_info[n].state = CLI_UNINIT;
52 PMIX_CONSTRUCT(&(cli_info[n].modex), pmix_list_t);
53 for (i = 0; i < CLI_TERM+1; i++) {
54 cli_info[n].next_state[i] = order[i];
55 }
56 cli_info[n].rank = -1;
57 cli_info[n].ns = NULL;
58 }
59 }
60
cli_connect(cli_info_t * cli,int sd,struct event_base * ebase,event_callback_fn callback)61 void cli_connect(cli_info_t *cli, int sd, struct event_base * ebase, event_callback_fn callback)
62 {
63 if( CLI_CONNECTED != cli->next_state[cli->state] ){
64 TEST_ERROR(("Rank %d has bad next state: expect %d have %d!",
65 cli_rank(cli), CLI_CONNECTED, cli->next_state[cli->state]));
66 test_abort = true;
67 return;
68 }
69
70 cli->sd = sd;
71 cli->ev = event_new(ebase, sd,
72 EV_READ|EV_PERSIST, callback, cli);
73 event_add(cli->ev,NULL);
74 pmix_ptl_base_set_nonblocking(sd);
75 TEST_VERBOSE(("Connection accepted from rank %d", cli_rank(cli) ));
76 cli->state = CLI_CONNECTED;
77 }
78
cli_finalize(cli_info_t * cli)79 void cli_finalize(cli_info_t *cli)
80 {
81 if( CLI_FIN != cli->next_state[cli->state] ){
82 TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
83 cli_rank(cli), CLI_FIN, cli->next_state[cli->state]));
84 test_abort = true;
85 }
86
87 cli->state = CLI_FIN;
88 }
89
cli_disconnect(cli_info_t * cli)90 void cli_disconnect(cli_info_t *cli)
91 {
92 if( CLI_DISCONN != cli->next_state[cli->state] ){
93 TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
94 cli_rank(cli), CLI_DISCONN, cli->next_state[cli->state]));
95 test_abort = true;
96 }
97
98 if( 0 > cli->sd ){
99 TEST_ERROR(("Bad sd = %d of rank = %d ", cli->sd, cli_rank(cli)));
100 test_abort = true;
101 } else {
102 TEST_VERBOSE(("close sd = %d for rank = %d", cli->sd, cli_rank(cli)));
103 close(cli->sd);
104 cli->sd = -1;
105 }
106
107 if( NULL == cli->ev ){
108 TEST_ERROR(("Bad ev = NULL of rank = %d ", cli_rank(cli)));
109 test_abort = true;
110 } else {
111 TEST_VERBOSE(("remove event of rank %d from event queue", cli_rank(cli)));
112 event_del(cli->ev);
113 event_free(cli->ev);
114 cli->ev = NULL;
115 }
116
117 TEST_VERBOSE(("Destruct modex list for the rank %d", cli_rank(cli)));
118 PMIX_LIST_DESTRUCT(&(cli->modex));
119
120 cli->state = CLI_DISCONN;
121 }
122
cli_terminate(cli_info_t * cli)123 void cli_terminate(cli_info_t *cli)
124 {
125 if( CLI_TERM != cli->next_state[cli->state] ){
126 TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
127 cli_rank(cli), CLI_TERM, cli->next_state[cli->state]));
128 test_abort = true;
129 }
130 cli->pid = -1;
131 TEST_VERBOSE(("Client rank = %d terminated", cli_rank(cli)));
132 cli->state = CLI_TERM;
133 if (NULL != cli->ns) {
134 free(cli->ns);
135 }
136 }
137
cli_cleanup(cli_info_t * cli)138 void cli_cleanup(cli_info_t *cli)
139 {
140 if (CLI_TERM < cli->state) {
141 TEST_ERROR(("Bad rank %d state %d", cli_rank(cli), cli->state));
142 test_abort = true;
143 return;
144 }
145 switch( cli->next_state[cli->state] ){
146 case CLI_FORKED:
147 break;
148 case CLI_CONNECTED:
149 /* error - means that process terminated w/o calling finalize */
150 if (!test_abort) {
151 TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
152 }
153 cli->state = CLI_TERM;
154 test_abort = true;
155 break;
156 case CLI_FIN:
157 /* error - means that process terminated w/o calling finalize */
158 if (!test_abort) {
159 TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
160 }
161 cli_finalize(cli);
162 cli_cleanup(cli);
163 test_abort = true;
164 break;
165 case CLI_DISCONN:
166 cli_disconnect(cli);
167 cli_cleanup(cli);
168 break;
169 case CLI_TERM:
170 cli_terminate(cli);
171 break;
172 default:
173 TEST_ERROR(("Bad rank %d next state %d", cli_rank(cli), cli->next_state[cli->state]));
174 test_abort = true;
175 return;
176 }
177 }
178
179
test_terminated(void)180 bool test_terminated(void)
181 {
182 bool ret = true;
183 int i;
184
185 // All clients should disconnect
186 for(i=0; i < cli_info_cnt; i++){
187 ret = ret && (CLI_TERM <= cli_info[i].state);
188 }
189 return (ret || test_abort);
190 }
191
cli_wait_all(double timeout)192 void cli_wait_all(double timeout)
193 {
194 struct timeval tv;
195 double start_time, cur_time;
196
197 gettimeofday(&tv, NULL);
198 start_time = tv.tv_sec + 1E-6*tv.tv_usec;
199 cur_time = start_time;
200
201 //TEST_VERBOSE(("Wait for all children to terminate"))
202
203 // Wait for all children to cleanup after the test.
204 while( !test_terminated() && ( timeout >= (cur_time - start_time) ) ){
205 struct timespec ts;
206 int status, i;
207 pid_t pid;
208 while( 0 < (pid = waitpid(-1, &status, WNOHANG) ) ){
209 TEST_VERBOSE(("waitpid = %d", pid));
210 for(i=0; i < cli_info_cnt; i++){
211 if( cli_info[i].pid == pid ){
212 TEST_VERBOSE(("the child with pid = %d has rank = %d, ns = %s\n"
213 "\t\texited = %d, signalled = %d", pid,
214 cli_info[i].rank, cli_info[i].ns,
215 WIFEXITED(status), WIFSIGNALED(status) ));
216 if( WIFEXITED(status) || WIFSIGNALED(status) ){
217 cli_cleanup(&cli_info[i]);
218 }
219 }
220 }
221 }
222 if( pid < 0 ){
223 if( errno == ECHILD ){
224 TEST_VERBOSE(("No more children to wait. Happens on the last cli_wait_all call "
225 "which is used to ensure that all children terminated.\n"));
226 if (pmix_test_verbose) {
227 sleep(1);
228 }
229 break;
230 } else {
231 TEST_ERROR(("waitpid(): %d : %s", errno, strerror(errno)));
232 exit(0);
233 }
234 }
235 ts.tv_sec = 0;
236 ts.tv_nsec = 100000;
237 nanosleep(&ts, NULL);
238 // calculate current timestamp
239 gettimeofday(&tv, NULL);
240 cur_time = tv.tv_sec + 1E-6*tv.tv_usec;
241 }
242 }
243
cli_kill_all(void)244 void cli_kill_all(void)
245 {
246 int i;
247 for(i = 0; i < cli_info_cnt; i++){
248 if( CLI_UNINIT == cli_info[i].state ){
249 TEST_ERROR(("Skip rank %d as it wasn't ever initialized (shouldn't happe)",
250 i));
251 continue;
252 } else if( CLI_TERM <= cli_info[i].state ){
253 TEST_VERBOSE(("Skip rank %d as it was already terminated.", i));
254 continue;
255
256 }
257 TEST_VERBOSE(("Kill rank %d (pid = %d).", i, cli_info[i].pid));
258 kill(cli_info[i].pid, SIGKILL);
259 cli_cleanup(&cli_info[i]);
260 }
261 }
262
errhandler(size_t evhdlr_registration_id,pmix_status_t status,const pmix_proc_t * source,pmix_info_t info[],size_t ninfo,pmix_info_t results[],size_t nresults,pmix_event_notification_cbfunc_fn_t cbfunc,void * cbdata)263 void errhandler(size_t evhdlr_registration_id,
264 pmix_status_t status,
265 const pmix_proc_t *source,
266 pmix_info_t info[], size_t ninfo,
267 pmix_info_t results[], size_t nresults,
268 pmix_event_notification_cbfunc_fn_t cbfunc,
269 void *cbdata)
270 {
271 TEST_ERROR((" PMIX server event handler with status = %d", status));
272 /* notify clients of error */
273 PMIx_Notify_event(status, source,
274 PMIX_RANGE_NAMESPACE,
275 NULL, 0,
276 op_callbk, NULL);
277 }
278
op_callbk(pmix_status_t status,void * cbdata)279 void op_callbk(pmix_status_t status,
280 void *cbdata)
281 {
282 TEST_VERBOSE(( "OP CALLBACK CALLED WITH STATUS %d", status));
283 }
284
errhandler_reg_callbk(pmix_status_t status,size_t errhandler_ref,void * cbdata)285 void errhandler_reg_callbk (pmix_status_t status,
286 size_t errhandler_ref,
287 void *cbdata)
288 {
289 TEST_VERBOSE(("ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
290 status, (unsigned long)errhandler_ref));
291 }
292