/* -*- Mode: C; c-basic-offset:4 ; -*- */

/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/*
 * Compile-time guard: make sure that we can properly ensure atomic access to
 * the poll routine.  When the library is built threaded, only the global
 * thread granularity is accepted; any other setting stops the build.
 */
#if defined(MPICH_IS_THREADED)
#  if MPIU_THREAD_GRANULARITY != MPIU_THREAD_GRANULARITY_GLOBAL
#    error selected multi-threaded implementation is not supported
#  endif
#endif


/*
 * Forward declarations of the per-element event handlers that
 * MPIDU_Sock_wait() dispatches to.  Each handler receives the pollfd entry
 * and the matching pollinfo entry of one poll array element and returns an
 * MPI error code.  The definitions follow later in this file.
 */
static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo);

27 /*
28 * MPIDU_Sock_wait()
29 *
30 * NOTES:
31 *
32 * For fatal errors, the state of the connection progresses directly to the
33 * failed state and the connection is marked inactive in
34 * the poll array. Under normal conditions, the fatal error should result in
35 * the termination of the process; but, if that
36 * doesn't happen, we try to leave the implementation in a somewhat sane state.
37 *
38 * In the multithreaded case, only one routine at a time may call this routine
39 * To permit progress by other threads, it will release any global lock or
40 * coarse-grain critical section.
41 */
42 #undef FUNCNAME
43 #define FUNCNAME MPIDU_Sock_wait
44 #undef FCNAME
45 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set,int millisecond_timeout,struct MPIDU_Sock_event * eventp)46 int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout,
47 struct MPIDU_Sock_event * eventp)
48 {
49 int mpi_errno = MPI_SUCCESS;
50 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT);
51 MPIDI_STATE_DECL(MPID_STATE_POLL);
52
53 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT);
54
55 for (;;)
56 {
57 int elem=0; /* Keep compiler happy */
58 int n_fds;
59 int n_elems;
60 int found_active_elem = FALSE;
61
62 mpi_errno = MPIDU_Socki_event_dequeue(sock_set, &elem, eventp);
63 if (mpi_errno == MPI_SUCCESS) {
64 struct pollinfo * pollinfo;
65 int flags;
66
67 if (eventp->op_type != MPIDU_SOCK_OP_CLOSE)
68 {
69 break;
70 }
71
72 pollinfo = &sock_set->pollinfos[elem];
73
74 /*
75 * Attempt to set socket back to blocking. This *should* prevent
76 * any data in the socket send buffer from being
77 * discarded. Instead close() will block until the buffer is
78 * flushed or the connection timeouts and is considered
79 * lost. Theoretically, this could cause the MPIDU_Sock_wait() to
80 * hang indefinitely; however, the calling code
81 * should ensure this will not happen by going through a shutdown
82 * protocol before posting a close operation.
83 *
84 * FIXME: If the attempt to set the socket back to blocking fails,
85 * we presently ignore it. Should we return an
86 * error? We need to define acceptible data loss at close time.
87 * MS Windows has worse problems with this, so it
88 * may not be possible to make any guarantees.
89 */
90 flags = fcntl(pollinfo->fd, F_GETFL, 0);
91 if (flags != -1)
92 {
93 fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK);
94 }
95
96 /* FIXME: return code? If an error occurs do we return it
97 instead of the error specified in the event? */
98 close(pollinfo->fd);
99
100 MPIDU_Socki_sock_free(pollinfo->sock);
101
102 break;
103 }
104
105 for(;;)
106 {
107 # ifndef MPICH_IS_THREADED
108 {
109 MPIDI_FUNC_ENTER(MPID_STATE_POLL);
110 n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
111 millisecond_timeout);
112 MPIDI_FUNC_EXIT(MPID_STATE_POLL);
113 }
114 # else /* MPICH_IS_THREADED */
115 {
116 /* If we've enabled runtime checking of the thread level,
117 then test for that and if we are *not* multithreaded,
118 just use the same code as above. Otherwise, use
119 multithreaded code (and we don't then need the
120 MPIU_THREAD_CHECK_BEGIN/END macros) */
121 #ifdef HAVE_RUNTIME_THREADCHECK
122 if (!MPIR_ThreadInfo.isThreaded) {
123 MPIDI_FUNC_ENTER(MPID_STATE_POLL);
124 n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
125 millisecond_timeout);
126 MPIDI_FUNC_EXIT(MPID_STATE_POLL);
127 }
128 else
129 #endif
130 {
131 /*
132 * First try a non-blocking poll to see if any immediate
133 * progress can be made. This avoids the lock manipulation
134 * overhead.
135 */
136 MPIDI_FUNC_ENTER(MPID_STATE_POLL);
137 n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0);
138 MPIDI_FUNC_EXIT(MPID_STATE_POLL);
139
140 if (n_fds == 0 && millisecond_timeout != 0)
141 {
142 int pollfds_active_elems = sock_set->poll_array_elems;
143
144 /* The abstraction here is a shared (blocking) resource that
145 the threads must coordinate. That means not holding
146 a lock across the blocking operation but also
147 ensuring that only one thread at a time attempts
148 to use this resource.
149
150 What isn't yet clear in this where the test is made
151 to ensure that two threads don't call the poll operation,
152 even in a nonblocking sense.
153 */
154 sock_set->pollfds_active = sock_set->pollfds;
155
156 /* Release the lock so that other threads may make
157 progress while this thread waits for something to
158 do */
159 MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section (sock_wait)");
160 /* MPIU_THREAD_CS_EXIT(MPIDCOMM,);
161 MPIU_THREAD_CS_EXIT(ALLFUNC,); */
162 MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex);
163
164 MPIDI_FUNC_ENTER(MPID_STATE_POLL);
165 n_fds = poll(sock_set->pollfds_active,
166 pollfds_active_elems, millisecond_timeout);
167 MPIDI_FUNC_EXIT(MPID_STATE_POLL);
168
169 /* Reaquire the lock before processing any of the
170 information returned from poll */
171 MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section (sock_wait)");
172 /* MPIU_THREAD_CS_ENTER(ALLFUNC,);
173 MPIU_THREAD_CS_ENTER(MPIDCOMM,); */
174 MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex);
175
176 /*
177 * Update pollfds array if changes were posted while we
178 * were blocked in poll
179 */
180 if (sock_set->pollfds_updated) {
181 mpi_errno = MPIDI_Sock_update_sock_set(
182 sock_set, pollfds_active_elems );
183 }
184
185 sock_set->pollfds_active = NULL;
186 sock_set->wakeup_posted = FALSE;
187 }
188 } /* else !MPIR_ThreadInfo.isThreaded */
189 }
190 # endif /* MPICH_IS_THREADED */
191
192 if (n_fds > 0)
193 {
194 break;
195 }
196 else if (n_fds == 0)
197 {
198 mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
199 goto fn_exit;
200 }
201 else if (errno == EINTR)
202 {
203 if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME)
204 {
205 mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
206 goto fn_exit;
207 }
208
209 continue;
210 }
211 /* --BEGIN ERROR HANDLING-- */
212 else if (errno == ENOMEM || errno == EAGAIN)
213 {
214 mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
215 "**sock|osnomem", NULL);
216 goto fn_exit;
217 }
218 else
219 {
220 mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
221 "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno));
222 goto fn_exit;
223 }
224 /* --END ERROR HANDLING-- */
225 }
226
227 elem = sock_set->starting_elem;
228 n_elems = sock_set->poll_array_elems;
229 while (n_fds > 0 && n_elems > 0)
230 {
231 /*
232 * Acquire pointers to the pollfd and pollinfo structures for the next element
233 *
234 * NOTE: These pointers could become stale, if a new sock were to be allocated during the processing of the element.
235 * At present, none of the handler routines allocate a sock, so the issue does not arise.
236 */
237 struct pollfd * const pollfd = &sock_set->pollfds[elem];
238 struct pollinfo * const pollinfo = &sock_set->pollinfos[elem];
239
240 MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
241 MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1);
242
243 if (pollfd->fd < 0 || pollfd->revents == 0)
244 {
245 /* This optimization assumes that most FDs will not have a pending event. */
246 n_elems -= 1;
247 elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
248 continue;
249 }
250
251 if (found_active_elem == FALSE)
252 {
253 found_active_elem = TRUE;
254 sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
255 }
256
257 if (pollfd->revents & POLLNVAL)
258 {
259 mpi_errno = MPIR_Err_create_code(
260 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle",
261 "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd);
262 goto fn_exit;
263 }
264
265 /* --BEGIN ERROR HANDLING-- */
266 if (pollfd->revents & POLLHUP)
267 {
268 mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo);
269 if (MPIR_Err_is_fatal(mpi_errno))
270 {
271 goto fn_exit;
272 }
273 }
274
275 /* According to Stevens, some errors are reported as normal data
276 (POLLIN) and some are reported with POLLERR. */
277 if (pollfd->revents & POLLERR)
278 {
279 mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo);
280 if (MPIR_Err_is_fatal(mpi_errno))
281 {
282 goto fn_exit;
283 }
284 }
285 /* --END ERROR HANDLING-- */
286
287 if (pollfd->revents & POLLIN)
288 {
289 if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
290 {
291 if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW ||
292 pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
293 {
294 mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo);
295 /* --BEGIN ERROR HANDLING-- */
296 if (MPIR_Err_is_fatal(mpi_errno))
297 {
298 goto fn_exit;
299 }
300 /* --END ERROR HANDLING-- */
301 }
302 /* --BEGIN ERROR HANDLING-- */
303 else
304 {
305 mpi_errno = MPIR_Err_create_code(
306 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
307 "**sock|poll|unhandledstate %d", pollinfo->state);
308 goto fn_exit;
309 }
310 /* --END ERROR HANDLING-- */
311
312 }
313 else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER)
314 {
315 pollfd->events &= ~POLLIN;
316 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr,
317 MPI_SUCCESS, mpi_errno, fn_exit);
318 }
319 else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER)
320 {
321 char c[16];
322 int nb;
323
324 do
325 {
326 nb = read(pollfd->fd, c, 16);
327 }
328 while (nb > 0 || (nb < 0 && errno == EINTR));
329 }
330 /* --BEGIN ERROR HANDLING-- */
331 else
332 {
333 mpi_errno = MPIR_Err_create_code(
334 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
335 "**sock|poll|unhandledtype %d", pollinfo->type);
336 goto fn_exit;
337 }
338 /* --END ERROR HANDLING-- */
339 }
340
341 if (pollfd->revents & POLLOUT)
342 {
343 if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
344 {
345 if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
346 {
347 mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo);
348 /* --BEGIN ERROR HANDLING-- */
349 if (MPIR_Err_is_fatal(mpi_errno))
350 {
351 goto fn_exit;
352 }
353 /* --END ERROR HANDLING-- */
354 }
355 else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
356 {
357 mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo);
358 /* --BEGIN ERROR HANDLING-- */
359 if (MPIR_Err_is_fatal(mpi_errno))
360 {
361 goto fn_exit;
362 }
363 /* --END ERROR HANDLING-- */
364 }
365 /* --BEGIN ERROR HANDLING-- */
366 else
367 {
368 mpi_errno = MPIR_Err_create_code(
369 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
370 "**sock|poll|unhandledstate %d", pollinfo->state);
371 goto fn_exit;
372 }
373 /* --END ERROR HANDLING-- */
374 }
375 /* --BEGIN ERROR HANDLING-- */
376 else
377 {
378 mpi_errno = MPIR_Err_create_code(
379 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
380 "**sock|poll|unhandledtype %d", pollinfo->type);
381 goto fn_exit;
382 }
383 /* --END ERROR HANDLING-- */
384 }
385
386 n_fds -= 1;
387 n_elems -= 1;
388 elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
389 }
390 }
391
392 fn_exit:
393 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT);
394 return mpi_errno;
395 }
396
397 #undef FUNCNAME
398 #define FUNCNAME MPIDU_Socki_handle_pollhup
399 #undef FCNAME
400 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd,struct pollinfo * const pollinfo)401 static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
402 {
403 int mpi_errno = MPI_SUCCESS;
404 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
405
406 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
407
408 if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
409 {
410 /*
411 * If a write was posted then cancel it and generate an connection closed event. If a read is posted, it will be handled
412 * by the POLLIN handler.
413 */
414 /* --BEGIN ERROR HANDLING-- */
415 if (pollfd->events & POLLOUT)
416 {
417 int event_mpi_errno;
418
419 event_mpi_errno = MPIR_Err_create_code(
420 MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED,
421 "**sock|connclosed", "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
422 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
423 event_mpi_errno, mpi_errno, fn_exit);
424 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
425 pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
426 }
427 /* --END ERROR HANDLING-- */
428 }
429 else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
430 {
431 /*
432 * If we are in the read-only state, then we should only get an error if we are looking to read data. If we are not
433 * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLHUP event.
434 *
435 * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error. Once all of the
436 * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll
437 * list.
438 */
439 MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN));
440 }
441 else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED)
442 {
443 /*
444 * We should never reach this state because pollfd->fd should be set to -1 if we are in the disconnected state.
445 */
446 MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED && pollfd->fd == -1);
447 }
448 else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
449 {
450 /*
451 * The process we were connecting to died. Let the POLLOUT handler deal with the error.
452 */
453 MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT));
454 pollfd->revents = POLLOUT;
455 }
456 /* --BEGIN ERROR HANDLING-- */
457 else
458 {
459 mpi_errno = MPIR_Err_create_code(
460 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
461 "**sock|poll|unhandledstate %d", pollinfo->state);
462 goto fn_exit;
463 }
464 /* --END ERROR HANDLING-- */
465
466 fn_exit:
467 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
468 return mpi_errno;
469 }
470 /* end MPIDU_Socki_handle_pollhup() */
471
472
473 #undef FUNCNAME
474 #define FUNCNAME MPIDU_Socki_handle_pollerr
475 #undef FCNAME
476 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd,struct pollinfo * const pollinfo)477 static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
478 {
479 int mpi_errno = MPI_SUCCESS;
480 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
481
482 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
483
484 /* --BEGIN ERROR HANDLING-- */
485 if (pollinfo->type != MPIDU_SOCKI_TYPE_COMMUNICATION)
486 {
487 mpi_errno = MPIR_Err_create_code(
488 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
489 "**sock|poll|unhandledtype %d", pollinfo->type);
490 goto fn_exit;
491 }
492 /* --END ERROR HANDLING-- */
493
494 if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
495 {
496 /*
497 * Stevens suggests that some older version of UNIX did not properly reset so_error, which could allow POLLERR to be
498 * continuously triggered. We remove the socket from the poll list (pollfd->fd = 1) in order to prevent this issue.
499 * Here, we simple check that things are as we expect them to be.
500 */
501 MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
502
503 /* If a write was posted then cancel it and generate an write completion event */
504 if (pollfd->events & POLLOUT)
505 {
506 int disconnected;
507 int os_errno;
508 int event_mpi_errno;
509
510 MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, os_errno, mpi_errno, fn_exit);
511
512 event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, os_errno, FCNAME, __LINE__, &disconnected);
513 /* --BEGIN ERROR HANDLING-- */
514 if (MPIR_Err_is_fatal(event_mpi_errno))
515 {
516 mpi_errno = event_mpi_errno;
517 goto fn_exit;
518 }
519 /* --END ERROR HANDLING-- */
520
521 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
522 event_mpi_errno, mpi_errno, fn_exit);
523 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
524 pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
525 }
526 }
527 else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
528 {
529 /*
530 * If we are in the read-only state, then we should only get an error if we are looking to read data. If we are not
531 * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLERR event.
532 *
533 * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error. Once all of the
534 * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll
535 * list.
536 */
537 MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN));
538 }
539 else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
540 {
541 /*
542 * The process we were connecting to died. Let the POLLOUT handler deal with the error.
543 */
544 MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT));
545 pollfd->revents = POLLOUT;
546 }
547 else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED)
548 {
549 /* We are already disconnected! Why are we handling an error? */
550 MPIU_Assert(pollfd->fd == -1);
551 }
552 /* --BEGIN ERROR HANDLING-- */
553 else
554 {
555 mpi_errno = MPIR_Err_create_code(
556 MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
557 "**sock|poll|unhandledstate %d", pollinfo->state);
558 goto fn_exit;
559 }
560 /* --END ERROR HANDLING-- */
561
562 fn_exit:
563 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
564 return mpi_errno;
565 }
566 /* end MPIDU_Socki_handle_pollerr() */
567
568
569 #undef FUNCNAME
570 #define FUNCNAME MPIDU_Socki_handle_read
571 #undef FCNAME
572 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_read(struct pollfd * const pollfd,struct pollinfo * const pollinfo)573 static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
574 {
575 int nb;
576 int mpi_errno = MPI_SUCCESS;
577 MPIDI_STATE_DECL(MPID_STATE_READ);
578 MPIDI_STATE_DECL(MPID_STATE_READV);
579 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
580
581 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
582
583 do
584 {
585 if (pollinfo->read_iov_flag)
586 {
587 MPIDI_FUNC_ENTER(MPID_STATE_READV);
588 nb = readv(pollinfo->fd, pollinfo->read.iov.ptr + pollinfo->read.iov.offset,
589 pollinfo->read.iov.count - pollinfo->read.iov.offset);
590 MPIDI_FUNC_EXIT(MPID_STATE_READV);
591 }
592 else
593 {
594 MPIDI_FUNC_ENTER(MPID_STATE_READ);
595 nb = read(pollinfo->fd, pollinfo->read.buf.ptr + pollinfo->read_nb,
596 pollinfo->read.buf.max - pollinfo->read_nb);
597 MPIDI_FUNC_EXIT(MPID_STATE_READ);
598 }
599 }
600 while (nb < 0 && errno == EINTR);
601
602 if (nb > 0)
603 {
604 int done;
605
606 pollinfo->read_nb += nb;
607
608 done = pollinfo->read_iov_flag ?
609 MPIDU_Socki_adjust_iov(nb, pollinfo->read.iov.ptr, pollinfo->read.iov.count, &pollinfo->read.iov.offset) :
610 (pollinfo->read_nb >= pollinfo->read.buf.min);
611
612 if (done)
613 {
614 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
615 MPI_SUCCESS, mpi_errno, fn_exit);
616 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN);
617 }
618 }
619 /* --BEGIN ERROR HANDLING-- */
620 else if (nb == 0)
621 {
622 int event_mpi_errno;
623
624 event_mpi_errno = MPIR_Err_create_code(
625 MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED, "**sock|connclosed",
626 "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
627 if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT))
628 {
629 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
630 event_mpi_errno, mpi_errno, fn_exit);
631 }
632 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
633 event_mpi_errno, mpi_errno, fn_exit);
634
635 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN | POLLOUT);
636 pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
637
638 }
639 /* --END ERROR HANDLING-- */
640 else if (errno == EAGAIN && errno == EWOULDBLOCK)
641 {
642 /* do nothing... */
643 goto fn_exit;
644 }
645 /* --BEGIN ERROR HANDLING-- */
646 else
647 {
648 int disconnected;
649 int event_mpi_errno;
650
651 event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected);
652 if (MPIR_Err_is_fatal(event_mpi_errno))
653 {
654 /*
655 * A serious error occurred. There is no guarantee that the data
656 * structures are still intact. Therefore, we avoid
657 * modifying them.
658 */
659 mpi_errno = event_mpi_errno;
660 goto fn_exit;
661 }
662
663 if (disconnected)
664 {
665 if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT))
666 {
667 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
668 event_mpi_errno, mpi_errno, fn_exit);
669 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
670 }
671
672 pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
673 }
674
675 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
676 event_mpi_errno, mpi_errno, fn_exit);
677 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN);
678 }
679 /* --END ERROR HANDLING-- */
680
681 fn_exit:
682 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
683 return mpi_errno;
684 }
685 /* end MPIDU_Socki_handle_read() */
686
687
688 #undef FUNCNAME
689 #define FUNCNAME MPIDU_Socki_handle_write
690 #undef FCNAME
691 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_write(struct pollfd * const pollfd,struct pollinfo * const pollinfo)692 static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
693 {
694 int nb;
695 int mpi_errno = MPI_SUCCESS;
696 MPIDI_STATE_DECL(MPID_STATE_WRITE);
697 MPIDI_STATE_DECL(MPID_STATE_WRITEV);
698 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
699
700 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
701
702 do
703 {
704 if (pollinfo->write_iov_flag)
705 {
706 MPIDI_FUNC_ENTER(MPID_STATE_WRITEV);
707 nb = writev(pollinfo->fd, pollinfo->write.iov.ptr + pollinfo->write.iov.offset,
708 pollinfo->write.iov.count - pollinfo->write.iov.offset);
709 MPIDI_FUNC_EXIT(MPID_STATE_WRITEV);
710 }
711 else
712 {
713 MPIDI_FUNC_ENTER(MPID_STATE_WRITE);
714 nb = write(pollinfo->fd, pollinfo->write.buf.ptr + pollinfo->write_nb,
715 pollinfo->write.buf.max - pollinfo->write_nb);
716 MPIDI_FUNC_EXIT(MPID_STATE_WRITE);
717 }
718 }
719 while (nb < 0 && errno == EINTR);
720
721 if (nb >= 0)
722 {
723 int done;
724
725 pollinfo->write_nb += nb;
726
727 done = pollinfo->write_iov_flag ?
728 MPIDU_Socki_adjust_iov(nb, pollinfo->write.iov.ptr, pollinfo->write.iov.count, &pollinfo->write.iov.offset) :
729 (pollinfo->write_nb >= pollinfo->write.buf.min);
730
731 if (done)
732 {
733 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
734 MPI_SUCCESS, mpi_errno, fn_exit);
735 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
736 }
737 }
738 else if (errno == EAGAIN || errno == EWOULDBLOCK)
739 {
740 /* do nothing... */
741 goto fn_exit;
742 }
743 /* --BEGIN ERROR HANDLING-- */
744 else
745 {
746 int disconnected;
747 int event_mpi_errno;
748
749 event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected);
750 if (MPIR_Err_is_fatal(event_mpi_errno))
751 {
752 /*
753 * A serious error occurred. There is no guarantee that the data structures are still intact. Therefore, we avoid
754 * modifying them.
755 */
756 mpi_errno = event_mpi_errno;
757 goto fn_exit;
758 }
759
760 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
761 event_mpi_errno, mpi_errno, fn_exit);
762 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
763 if (disconnected)
764 {
765 /*
766 * The connection is dead but data may still be in the socket buffer; thus, we change the state and let
767 * MPIDU_Sock_wait() clean things up.
768 */
769 pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
770 }
771 }
772 /* --END ERROR HANDLING-- */
773
774 fn_exit:
775 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
776 return mpi_errno;
777 }
778 /* end MPIDU_Socki_handle_write() */
779
780
781 #undef FUNCNAME
782 #define FUNCNAME MPIDU_Socki_handle_connect
783 #undef FCNAME
784 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_connect(struct pollfd * const pollfd,struct pollinfo * const pollinfo)785 static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
786 {
787 struct sockaddr_in addr;
788 socklen_t addr_len;
789 int rc;
790 int mpi_errno = MPI_SUCCESS;
791 MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
792
793 MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
794
795 addr_len = sizeof(struct sockaddr_in);
796 rc = getpeername(pollfd->fd, (struct sockaddr *) &addr, &addr_len);
797 if (rc == 0)
798 {
799 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit);
800 pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RW;
801 }
802 /* --BEGIN ERROR HANDLING-- */
803 else
804 {
805 int event_mpi_errno;
806
807 MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, pollinfo->os_errno, mpi_errno, fn_exit);
808 event_mpi_errno = MPIR_Err_create_code(
809 MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_FAILED, "**sock|connfailed",
810 "**sock|poll|connfailed %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, pollinfo->os_errno,
811 MPIU_Strerror(pollinfo->os_errno));
812 MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit);
813 pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
814 }
815 /* --END ERROR HANDLING-- */
816
817 MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
818
819 fn_exit:
820 MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
821 return mpi_errno;
822 }
823 /* end MPIDU_Socki_handle_connect() */
824