1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
4  * Copyright (c) 2015      Research Organization for Information Science
5  *                         and Technology (RIST). All rights reserved.
6  * Copyright (c) 2015-2019 Mellanox Technologies, Inc.
7  *                         All rights reserved.
8  * $COPYRIGHT$
9  *
10  * Additional copyrights may follow
11  *
12  * $HEADER$
13  */
14 
15 #include "cli_stages.h"
16 
/* Table of client descriptors; allocated and filled in by cli_init(). */
cli_info_t *cli_info = NULL;
/* Number of entries in cli_info (set by cli_init() to its nprocs argument). */
int cli_info_cnt = 0;
/* Raised by the routines in this file when they detect an error; once set,
 * test_terminated() reports the test as finished. */
bool test_abort = false;
20 
/*
 * Return the rank stored in a client descriptor.
 *
 * The descriptor is only accepted when it is the address of one of the
 * cli_info_cnt entries of the global cli_info table; for any other pointer
 * the function returns -1.
 */
int cli_rank(cli_info_t *cli)
{
    int idx = 0;

    while (idx < cli_info_cnt) {
        if (&cli_info[idx] == cli) {
            return cli_info[idx].rank;
        }
        idx++;
    }

    /* not one of our descriptors */
    return -1;
}
31 
cli_init(int nprocs)32 void cli_init(int nprocs)
33 {
34     int n, i;
35     cli_state_t order[CLI_TERM+1];
36 
37     cli_info = malloc( sizeof(cli_info_t) * nprocs);
38     cli_info_cnt = nprocs;
39 
40     order[CLI_UNINIT] = CLI_FORKED;
41     order[CLI_FORKED] = CLI_FIN;
42     order[CLI_CONNECTED] = CLI_UNDEF;
43     order[CLI_FIN] = CLI_TERM;
44     order[CLI_DISCONN] = CLI_UNDEF;
45     order[CLI_TERM] = CLI_UNDEF;
46 
47     for (n=0; n < nprocs; n++) {
48         cli_info[n].sd = -1;
49         cli_info[n].ev = NULL;
50         cli_info[n].pid = -1;
51         cli_info[n].state = CLI_UNINIT;
52         PMIX_CONSTRUCT(&(cli_info[n].modex), pmix_list_t);
53         for (i = 0; i < CLI_TERM+1; i++) {
54             cli_info[n].next_state[i] = order[i];
55         }
56         cli_info[n].rank = -1;
57         cli_info[n].ns = NULL;
58     }
59 }
60 
/*
 * Record an accepted connection for a client and start watching its socket.
 *
 * cli      - client descriptor the connection belongs to
 * sd       - socket descriptor of the accepted connection
 * ebase    - event base the persistent read event is created on
 * callback - event handler; it receives cli as its user argument
 *
 * The call is only legal when CLI_CONNECTED is the successor of the
 * client's current state; otherwise an error is reported, test_abort is
 * raised and the client is left untouched.
 *
 * If the read event cannot be created or added, an error is reported,
 * test_abort is raised, the descriptor's sd/ev fields are reset to -1/NULL
 * and the state is not advanced.  The socket itself is not closed here.
 */
void cli_connect(cli_info_t *cli, int sd, struct event_base * ebase, event_callback_fn callback)
{
    if( CLI_CONNECTED != cli->next_state[cli->state] ){
        TEST_ERROR(("Rank %d has bad next state: expect %d have %d!",
                     cli_rank(cli), CLI_CONNECTED, cli->next_state[cli->state]));
        test_abort = true;
        return;
    }

    /* publish sd and ev in the descriptor before the event is armed, so the
     * callback never observes a half-initialized client */
    cli->sd = sd;
    cli->ev = event_new(ebase, sd,
                      EV_READ|EV_PERSIST, callback, cli);
    if( NULL == cli->ev ){
        TEST_ERROR(("Rank %d: cannot create read event for sd = %d",
                     cli_rank(cli), sd));
        cli->sd = -1;
        test_abort = true;
        return;
    }
    if( 0 != event_add(cli->ev,NULL) ){
        TEST_ERROR(("Rank %d: cannot add read event for sd = %d",
                     cli_rank(cli), sd));
        event_free(cli->ev);
        cli->ev = NULL;
        cli->sd = -1;
        test_abort = true;
        return;
    }
    pmix_ptl_base_set_nonblocking(sd);
    TEST_VERBOSE(("Connection accepted from rank %d", cli_rank(cli) ));
    cli->state = CLI_CONNECTED;
}
78 
/*
 * Move a client into the CLI_FIN state.
 *
 * When CLI_FIN is not the successor of the client's current state the
 * mismatch is reported and test_abort is raised, but the state is still
 * set to CLI_FIN.
 */
void cli_finalize(cli_info_t *cli)
{
    const bool legal = (cli->next_state[cli->state] == CLI_FIN);

    if (!legal) {
        TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
                     cli_rank(cli), CLI_FIN, cli->next_state[cli->state]));
        test_abort = true;
    }

    cli->state = CLI_FIN;
}
89 
/*
 * Tear down a client's connection and move it into CLI_DISCONN.
 *
 * Steps, each performed regardless of the outcome of the previous ones:
 *   - verify that CLI_DISCONN is the successor of the current state;
 *   - close the socket and mark it as absent (-1);
 *   - remove and free the read event;
 *   - destruct the modex list.
 * A failed check (wrong successor state, negative socket, missing event)
 * is reported and raises test_abort.
 */
void cli_disconnect(cli_info_t *cli)
{
    if (cli->next_state[cli->state] != CLI_DISCONN) {
        TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
                     cli_rank(cli), CLI_DISCONN, cli->next_state[cli->state]));
        test_abort = true;
    }

    /* socket */
    if (cli->sd >= 0) {
        TEST_VERBOSE(("close sd = %d for rank = %d", cli->sd, cli_rank(cli)));
        close(cli->sd);
        cli->sd = -1;
    } else {
        TEST_ERROR(("Bad sd = %d of rank = %d ", cli->sd, cli_rank(cli)));
        test_abort = true;
    }

    /* read event */
    if (cli->ev != NULL) {
        TEST_VERBOSE(("remove event of rank %d from event queue", cli_rank(cli)));
        event_del(cli->ev);
        event_free(cli->ev);
        cli->ev = NULL;
    } else {
        TEST_ERROR(("Bad ev = NULL of rank = %d ", cli_rank(cli)));
        test_abort = true;
    }

    /* modex data */
    TEST_VERBOSE(("Destruct modex list for the rank %d", cli_rank(cli)));
    PMIX_LIST_DESTRUCT(&(cli->modex));

    cli->state = CLI_DISCONN;
}
122 
/*
 * Mark a client as terminated and release its namespace string.
 *
 * When CLI_TERM is not the successor of the client's current state the
 * mismatch is reported and test_abort is raised; the client is moved to
 * CLI_TERM in either case.  The pid is reset to -1 and the namespace
 * string is freed and cleared.
 */
void cli_terminate(cli_info_t *cli)
{
    if( CLI_TERM != cli->next_state[cli->state] ){
        TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
                     cli_rank(cli), CLI_TERM, cli->next_state[cli->state]));
        test_abort = true;
    }
    cli->pid = -1;
    TEST_VERBOSE(("Client rank = %d terminated", cli_rank(cli)));
    cli->state = CLI_TERM;

    /* free(NULL) is a no-op, so no guard is needed; clear the pointer so a
     * later read or a second terminate cannot touch freed memory */
    free(cli->ns);
    cli->ns = NULL;
}
137 
/*
 * Drive a client whose process is gone through the remaining states.
 *
 * The action depends on the successor of the client's current state:
 *   CLI_TERM      - terminate the client;
 *   CLI_DISCONN   - disconnect, then continue the cleanup recursively;
 *   CLI_FIN       - the process ended without finalizing: report it
 *                   (unless the test is already aborting), finalize,
 *                   continue the cleanup recursively and raise test_abort;
 *   CLI_CONNECTED - same kind of unexpected termination: report it (unless
 *                   already aborting), force CLI_TERM and raise test_abort;
 *   CLI_FORKED    - nothing to do;
 *   anything else - report the bad successor and raise test_abort.
 * A current state beyond CLI_TERM is rejected up front with an error and
 * test_abort.
 */
void cli_cleanup(cli_info_t *cli)
{
    int next;

    if (cli->state > CLI_TERM) {
        TEST_ERROR(("Bad rank %d state %d", cli_rank(cli), cli->state));
        test_abort = true;
        return;
    }

    next = (int)cli->next_state[cli->state];

    switch (next) {
    case CLI_TERM:
        cli_terminate(cli);
        break;

    case CLI_DISCONN:
        cli_disconnect(cli);
        cli_cleanup(cli);
        break;

    case CLI_FIN:
        /* error - the process went away without calling finalize */
        if (!test_abort) {
            TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
        }
        cli_finalize(cli);
        cli_cleanup(cli);
        test_abort = true;
        break;

    case CLI_CONNECTED:
        /* error - the process went away without calling finalize */
        if (!test_abort) {
            TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
        }
        cli->state = CLI_TERM;
        test_abort = true;
        break;

    case CLI_FORKED:
        break;

    default:
        TEST_ERROR(("Bad rank %d next state %d", cli_rank(cli), next));
        test_abort = true;
        break;
    }
}
178 
179 
test_terminated(void)180 bool test_terminated(void)
181 {
182     bool ret = true;
183     int i;
184 
185     // All clients should disconnect
186     for(i=0; i < cli_info_cnt; i++){
187         ret = ret && (CLI_TERM <= cli_info[i].state);
188     }
189     return (ret || test_abort);
190 }
191 
cli_wait_all(double timeout)192 void cli_wait_all(double timeout)
193 {
194     struct timeval tv;
195     double start_time, cur_time;
196 
197     gettimeofday(&tv, NULL);
198     start_time = tv.tv_sec + 1E-6*tv.tv_usec;
199     cur_time = start_time;
200 
201     //TEST_VERBOSE(("Wait for all children to terminate"))
202 
203     // Wait for all children to cleanup after the test.
204     while( !test_terminated() && ( timeout >= (cur_time - start_time) ) ){
205         struct timespec ts;
206         int status, i;
207         pid_t pid;
208         while( 0 < (pid = waitpid(-1, &status, WNOHANG) ) ){
209             TEST_VERBOSE(("waitpid = %d", pid));
210             for(i=0; i < cli_info_cnt; i++){
211                 if( cli_info[i].pid == pid ){
212                     TEST_VERBOSE(("the child with pid = %d has rank = %d, ns = %s\n"
213                                 "\t\texited = %d, signalled = %d", pid,
214                                   cli_info[i].rank, cli_info[i].ns,
215                                 WIFEXITED(status), WIFSIGNALED(status) ));
216                     if( WIFEXITED(status) || WIFSIGNALED(status) ){
217                         cli_cleanup(&cli_info[i]);
218                     }
219                 }
220             }
221         }
222         if( pid < 0 ){
223             if( errno == ECHILD ){
224                 TEST_VERBOSE(("No more children to wait. Happens on the last cli_wait_all call "
225                             "which is used to ensure that all children terminated.\n"));
226                 if (pmix_test_verbose) {
227                     sleep(1);
228                 }
229                 break;
230             } else {
231                 TEST_ERROR(("waitpid(): %d : %s", errno, strerror(errno)));
232                 exit(0);
233             }
234         }
235         ts.tv_sec = 0;
236         ts.tv_nsec = 100000;
237         nanosleep(&ts, NULL);
238         // calculate current timestamp
239         gettimeofday(&tv, NULL);
240         cur_time = tv.tv_sec + 1E-6*tv.tv_usec;
241     }
242 }
243 
cli_kill_all(void)244 void cli_kill_all(void)
245 {
246     int i;
247     for(i = 0; i < cli_info_cnt; i++){
248         if( CLI_UNINIT == cli_info[i].state ){
249             TEST_ERROR(("Skip rank %d as it wasn't ever initialized (shouldn't happe)",
250                           i));
251             continue;
252         } else if( CLI_TERM <= cli_info[i].state ){
253             TEST_VERBOSE(("Skip rank %d as it was already terminated.", i));
254             continue;
255 
256         }
257         TEST_VERBOSE(("Kill rank %d (pid = %d).", i, cli_info[i].pid));
258         kill(cli_info[i].pid, SIGKILL);
259         cli_cleanup(&cli_info[i]);
260     }
261 }
262 
/*
 * Server-side event handler: report the event and forward it to clients.
 *
 * Only status and source are used.  The event is logged as an error and
 * re-issued through PMIx_Notify_event() with namespace range, no extra
 * info, and op_callbk as the completion callback.  The remaining
 * arguments, including cbfunc, are not used by this handler.
 */
void errhandler(size_t evhdlr_registration_id,
                pmix_status_t status,
                const pmix_proc_t *source,
                pmix_info_t info[], size_t ninfo,
                pmix_info_t results[], size_t nresults,
                pmix_event_notification_cbfunc_fn_t cbfunc,
                void *cbdata)
{
    /* arguments this handler has no use for */
    (void)evhdlr_registration_id;
    (void)info;
    (void)ninfo;
    (void)results;
    (void)nresults;
    (void)cbfunc;
    (void)cbdata;

    TEST_ERROR((" PMIX server event handler with status = %d", status));

    /* notify clients of error */
    PMIx_Notify_event(status, source, PMIX_RANGE_NAMESPACE,
                      NULL, 0, op_callbk, NULL);
}
278 
/*
 * Completion callback for operations started by this file: logs the
 * resulting status in verbose mode.  cbdata is not used.
 */
void op_callbk(pmix_status_t status, void *cbdata)
{
    (void)cbdata;

    TEST_VERBOSE(( "OP CALLBACK CALLED WITH STATUS %d", status));
}
284 
/*
 * Callback reporting the outcome of an error-handler registration: logs
 * the status and the handler reference in verbose mode.  cbdata is not
 * used.
 */
void errhandler_reg_callbk(pmix_status_t status, size_t errhandler_ref, void *cbdata)
{
    (void)cbdata;

    TEST_VERBOSE(("ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
                status, (unsigned long)errhandler_ref));
}
292