1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 
3 /*
4  *  (C) 2001 by Argonne National Laboratory.
5  *      See COPYRIGHT in top-level directory.
6  */
7 
/*
 * Atomic access to the poll routine relies on the global lock:
 * MPIDU_Sock_wait() releases and reacquires MPIR_ThreadInfo.global_mutex
 * directly around its blocking poll().  Refuse to build with any other
 * thread granularity.
 */
#if defined(MPICH_IS_THREADED)
#   if MPIU_THREAD_GRANULARITY != MPIU_THREAD_GRANULARITY_GLOBAL
#       error selected multi-threaded implementation is not supported
#   endif
#endif


/*
 * Per-condition handlers invoked by MPIDU_Sock_wait() for a descriptor with
 * pending revents.  Each returns an MPI error code; fatal codes abort the wait.
 */
static int MPIDU_Socki_handle_pollhup(struct pollfd * const fd_entry,
				      struct pollinfo * const fd_info);
static int MPIDU_Socki_handle_pollerr(struct pollfd * const fd_entry,
				      struct pollinfo * const fd_info);
static int MPIDU_Socki_handle_read(struct pollfd * const fd_entry,
				   struct pollinfo * const fd_info);
static int MPIDU_Socki_handle_write(struct pollfd * const fd_entry,
				    struct pollinfo * const fd_info);
static int MPIDU_Socki_handle_connect(struct pollfd * const fd_entry,
				      struct pollinfo * const fd_info);
26 
27 /*
28  * MPIDU_Sock_wait()
29  *
30  * NOTES:
31  *
32  * For fatal errors, the state of the connection progresses directly to the
33  * failed state and the connection is marked inactive in
34  * the poll array.  Under normal conditions, the fatal error should result in
35  * the termination of the process; but, if that
36  * doesn't happen, we try to leave the implementation in a somewhat sane state.
37  *
38  * In the multithreaded case, only one routine at a time may call this routine
39  * To permit progress by other threads, it will release any global lock or
40  * coarse-grain critical section.
41  */
42 #undef FUNCNAME
43 #define FUNCNAME MPIDU_Sock_wait
44 #undef FCNAME
45 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set,int millisecond_timeout,struct MPIDU_Sock_event * eventp)46 int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout,
47 		    struct MPIDU_Sock_event * eventp)
48 {
49     int mpi_errno = MPI_SUCCESS;
50     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT);
51     MPIDI_STATE_DECL(MPID_STATE_POLL);
52 
53     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT);
54 
55     for (;;)
56     {
57 	int elem=0;   /* Keep compiler happy */
58 	int n_fds;
59 	int n_elems;
60 	int found_active_elem = FALSE;
61 
62 	mpi_errno = MPIDU_Socki_event_dequeue(sock_set, &elem, eventp);
63 	if (mpi_errno == MPI_SUCCESS) {
64 	    struct pollinfo * pollinfo;
65 	    int flags;
66 
67 	    if (eventp->op_type != MPIDU_SOCK_OP_CLOSE)
68 	    {
69 		break;
70 	    }
71 
72 	    pollinfo = &sock_set->pollinfos[elem];
73 
74 	    /*
75 	     * Attempt to set socket back to blocking.  This *should* prevent
76 	     * any data in the socket send buffer from being
77 	     * discarded.  Instead close() will block until the buffer is
78 	     * flushed or the connection timeouts and is considered
79 	     * lost.  Theoretically, this could cause the MPIDU_Sock_wait() to
80 	     * hang indefinitely; however, the calling code
81 	     * should ensure this will not happen by going through a shutdown
82 	     * protocol before posting a close operation.
83 	     *
84 	     * FIXME: If the attempt to set the socket back to blocking fails,
85 	     * we presently ignore it.  Should we return an
86 	     * error?  We need to define acceptible data loss at close time.
87 	     * MS Windows has worse problems with this, so it
88 	     * may not be possible to make any guarantees.
89 	     */
90 	    flags = fcntl(pollinfo->fd, F_GETFL, 0);
91 	    if (flags != -1)
92 	    {
93 		fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK);
94 	    }
95 
96 	    /* FIXME: return code?  If an error occurs do we return it
97 	       instead of the error specified in the event? */
98 	    close(pollinfo->fd);
99 
100 	    MPIDU_Socki_sock_free(pollinfo->sock);
101 
102 	    break;
103 	}
104 
105 	for(;;)
106 	{
107 #	    ifndef MPICH_IS_THREADED
108 	    {
109 		MPIDI_FUNC_ENTER(MPID_STATE_POLL);
110 		n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
111 			     millisecond_timeout);
112 		MPIDI_FUNC_EXIT(MPID_STATE_POLL);
113 	    }
114 #	    else /* MPICH_IS_THREADED */
115 	    {
116 		/* If we've enabled runtime checking of the thread level,
117 		 then test for that and if we are *not* multithreaded,
118 		 just use the same code as above.  Otherwise, use
119 		 multithreaded code (and we don't then need the
120 		 MPIU_THREAD_CHECK_BEGIN/END macros) */
121 #ifdef HAVE_RUNTIME_THREADCHECK
122 		if (!MPIR_ThreadInfo.isThreaded) {
123 		    MPIDI_FUNC_ENTER(MPID_STATE_POLL);
124 		    n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
125 				 millisecond_timeout);
126 		    MPIDI_FUNC_EXIT(MPID_STATE_POLL);
127 		}
128 		else
129 #endif
130 		{
131 		/*
132 		 * First try a non-blocking poll to see if any immediate
133 		 * progress can be made.  This avoids the lock manipulation
134 		 * overhead.
135 		 */
136 		MPIDI_FUNC_ENTER(MPID_STATE_POLL);
137 		n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0);
138 		MPIDI_FUNC_EXIT(MPID_STATE_POLL);
139 
140 		if (n_fds == 0 && millisecond_timeout != 0)
141 		{
142 		    int pollfds_active_elems = sock_set->poll_array_elems;
143 
144 		    /* The abstraction here is a shared (blocking) resource that
145 		       the threads must coordinate.  That means not holding
146 		       a lock across the blocking operation but also
147 		       ensuring that only one thread at a time attempts
148 		       to use this resource.
149 
150 		       What isn't yet clear in this where the test is made
151 		       to ensure that two threads don't call the poll operation,
152 		       even in a nonblocking sense.
153 		    */
154 		    sock_set->pollfds_active = sock_set->pollfds;
155 
156 		    /* Release the lock so that other threads may make
157 		       progress while this thread waits for something to
158 		       do */
159 		    MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section (sock_wait)");
160 		    /* 		    MPIU_THREAD_CS_EXIT(MPIDCOMM,);
161 				    MPIU_THREAD_CS_EXIT(ALLFUNC,); */
162 		    MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex);
163 
164 		    MPIDI_FUNC_ENTER(MPID_STATE_POLL);
165 		    n_fds = poll(sock_set->pollfds_active,
166 				 pollfds_active_elems, millisecond_timeout);
167 		    MPIDI_FUNC_EXIT(MPID_STATE_POLL);
168 
169 		    /* Reaquire the lock before processing any of the
170 		       information returned from poll */
171 		    MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section (sock_wait)");
172 		    /* 		    MPIU_THREAD_CS_ENTER(ALLFUNC,);
173 				    MPIU_THREAD_CS_ENTER(MPIDCOMM,); */
174 		    MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex);
175 
176 		    /*
177 		     * Update pollfds array if changes were posted while we
178 		     * were blocked in poll
179 		     */
180 		    if (sock_set->pollfds_updated) {
181 			mpi_errno = MPIDI_Sock_update_sock_set(
182 				       sock_set, pollfds_active_elems );
183 		    }
184 
185 		    sock_set->pollfds_active = NULL;
186 		    sock_set->wakeup_posted = FALSE;
187 		}
188 		} /* else !MPIR_ThreadInfo.isThreaded */
189 	    }
190 #	    endif /* MPICH_IS_THREADED */
191 
192 	    if (n_fds > 0)
193 	    {
194 		break;
195 	    }
196 	    else if (n_fds == 0)
197 	    {
198 		mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
199 		goto fn_exit;
200 	    }
201 	    else if (errno == EINTR)
202 	    {
203 		if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME)
204 		{
205 		    mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
206 		    goto fn_exit;
207 		}
208 
209 		continue;
210 	    }
211 	    /* --BEGIN ERROR HANDLING-- */
212 	    else if (errno == ENOMEM || errno == EAGAIN)
213 	    {
214 		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
215 						 "**sock|osnomem", NULL);
216 		goto fn_exit;
217 	    }
218 	    else
219 	    {
220 		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
221 						 "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno));
222 		goto fn_exit;
223 	    }
224 	    /* --END ERROR HANDLING-- */
225 	}
226 
227 	elem = sock_set->starting_elem;
228 	n_elems = sock_set->poll_array_elems;
229 	while (n_fds > 0 && n_elems > 0)
230 	{
231 	    /*
232 	     * Acquire pointers to the pollfd and pollinfo structures for the next element
233 	     *
234 	     * NOTE: These pointers could become stale, if a new sock were to be allocated during the processing of the element.
235 	     * At present, none of the handler routines allocate a sock, so the issue does not arise.
236 	     */
237 	    struct pollfd * const pollfd = &sock_set->pollfds[elem];
238 	    struct pollinfo * const pollinfo = &sock_set->pollinfos[elem];
239 
240 	    MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
241 	    MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1);
242 
243 	    if (pollfd->fd < 0 || pollfd->revents == 0)
244 	    {
245 		/* This optimization assumes that most FDs will not have a pending event. */
246 		n_elems -= 1;
247 		elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
248 		continue;
249 	    }
250 
251 	    if (found_active_elem == FALSE)
252 	    {
253 		found_active_elem = TRUE;
254 		sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
255 	    }
256 
257 	    if (pollfd->revents & POLLNVAL)
258 	    {
259 		mpi_errno = MPIR_Err_create_code(
260 		    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle",
261 		    "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd);
262 		goto fn_exit;
263 	    }
264 
265 	    /* --BEGIN ERROR HANDLING-- */
266 	    if (pollfd->revents & POLLHUP)
267 	    {
268 		mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo);
269 		if (MPIR_Err_is_fatal(mpi_errno))
270 		{
271 		    goto fn_exit;
272 		}
273 	    }
274 
275 	    /* According to Stevens, some errors are reported as normal data
276 	       (POLLIN) and some are reported with POLLERR. */
277 	    if (pollfd->revents & POLLERR)
278 	    {
279 		mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo);
280 		if (MPIR_Err_is_fatal(mpi_errno))
281 		{
282 		    goto fn_exit;
283 		}
284 	    }
285 	    /* --END ERROR HANDLING-- */
286 
287 	    if (pollfd->revents & POLLIN)
288 	    {
289 		if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
290 		{
291 		    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW ||
292 			pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
293 		    {
294 			mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo);
295 			/* --BEGIN ERROR HANDLING-- */
296 			if (MPIR_Err_is_fatal(mpi_errno))
297 			{
298 			    goto fn_exit;
299 			}
300 			/* --END ERROR HANDLING-- */
301 		    }
302 		    /* --BEGIN ERROR HANDLING-- */
303 		    else
304 		    {
305 			mpi_errno = MPIR_Err_create_code(
306 			    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
307 			    "**sock|poll|unhandledstate %d", pollinfo->state);
308 			goto fn_exit;
309 		    }
310 		    /* --END ERROR HANDLING-- */
311 
312 		}
313 		else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER)
314 		{
315 		    pollfd->events &= ~POLLIN;
316 		    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr,
317 					      MPI_SUCCESS, mpi_errno, fn_exit);
318 		}
319 	else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER)
320 		{
321 		    char c[16];
322 		    int nb;
323 
324 		    do
325 		    {
326 			nb = read(pollfd->fd, c, 16);
327 		    }
328 		    while (nb > 0 || (nb < 0 && errno == EINTR));
329 		}
330 		/* --BEGIN ERROR HANDLING-- */
331 		else
332 		{
333 		    mpi_errno = MPIR_Err_create_code(
334 			MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
335 			"**sock|poll|unhandledtype %d", pollinfo->type);
336 		    goto fn_exit;
337 		}
338 		/* --END ERROR HANDLING-- */
339 	    }
340 
341 	    if (pollfd->revents & POLLOUT)
342 	    {
343 		if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
344 		{
345 		    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
346 		    {
347 			mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo);
348 			/* --BEGIN ERROR HANDLING-- */
349 			if (MPIR_Err_is_fatal(mpi_errno))
350 			{
351 			    goto fn_exit;
352 			}
353 			/* --END ERROR HANDLING-- */
354 		    }
355 		    else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
356 		    {
357 			mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo);
358 			/* --BEGIN ERROR HANDLING-- */
359 			if (MPIR_Err_is_fatal(mpi_errno))
360 			{
361 			    goto fn_exit;
362 			}
363 			/* --END ERROR HANDLING-- */
364 		    }
365 		    /* --BEGIN ERROR HANDLING-- */
366 		    else
367 		    {
368 			mpi_errno = MPIR_Err_create_code(
369 			    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
370 			    "**sock|poll|unhandledstate %d", pollinfo->state);
371 			goto fn_exit;
372 		    }
373 		    /* --END ERROR HANDLING-- */
374 		}
375 		/* --BEGIN ERROR HANDLING-- */
376 		else
377 		{
378 		    mpi_errno = MPIR_Err_create_code(
379 			MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
380 			"**sock|poll|unhandledtype %d", pollinfo->type);
381 		    goto fn_exit;
382 		}
383 		/* --END ERROR HANDLING-- */
384 	    }
385 
386 	    n_fds -= 1;
387 	    n_elems -= 1;
388 	    elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
389 	}
390     }
391 
392   fn_exit:
393     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT);
394     return mpi_errno;
395 }
396 
397 #undef FUNCNAME
398 #define FUNCNAME MPIDU_Socki_handle_pollhup
399 #undef FCNAME
400 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd,struct pollinfo * const pollinfo)401 static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
402 {
403     int mpi_errno = MPI_SUCCESS;
404     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
405 
406     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
407 
408     if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
409     {
410 	/*
411 	 * If a write was posted then cancel it and generate an connection closed event.  If a read is posted, it will be handled
412 	 * by the POLLIN handler.
413 	 */
414 	/* --BEGIN ERROR HANDLING-- */
415 	if (pollfd->events & POLLOUT)
416 	{
417 	    int event_mpi_errno;
418 
419 	    event_mpi_errno = MPIR_Err_create_code(
420 		MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED,
421 		"**sock|connclosed", "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
422 	    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
423 				      event_mpi_errno, mpi_errno, fn_exit);
424 	    MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
425 	    pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
426 	}
427 	/* --END ERROR HANDLING-- */
428     }
429     else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
430     {
431 	/*
432 	 * If we are in the read-only state, then we should only get an error if we are looking to read data.  If we are not
433 	 * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLHUP event.
434 	 *
435 	 * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error.  Once all of the
436 	 * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll
437 	 * list.
438 	 */
439 	MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN));
440     }
441     else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED)
442     {
443 	/*
444 	 * We should never reach this state because pollfd->fd should be set to -1 if we are in the disconnected state.
445 	 */
446 	MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED && pollfd->fd == -1);
447     }
448     else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
449     {
450 	/*
451 	 * The process we were connecting to died.  Let the POLLOUT handler deal with the error.
452 	 */
453 	MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT));
454 	pollfd->revents = POLLOUT;
455     }
456     /* --BEGIN ERROR HANDLING-- */
457     else
458     {
459 	mpi_errno = MPIR_Err_create_code(
460 	    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
461 	    "**sock|poll|unhandledstate %d", pollinfo->state);
462 	goto fn_exit;
463     }
464     /* --END ERROR HANDLING-- */
465 
466   fn_exit:
467     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);
468     return mpi_errno;
469 }
470 /* end MPIDU_Socki_handle_pollhup() */
471 
472 
473 #undef FUNCNAME
474 #define FUNCNAME MPIDU_Socki_handle_pollerr
475 #undef FCNAME
476 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd,struct pollinfo * const pollinfo)477 static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
478 {
479     int mpi_errno = MPI_SUCCESS;
480     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
481 
482     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
483 
484     /* --BEGIN ERROR HANDLING-- */
485     if (pollinfo->type != MPIDU_SOCKI_TYPE_COMMUNICATION)
486     {
487 	mpi_errno = MPIR_Err_create_code(
488 	    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
489 	    "**sock|poll|unhandledtype %d", pollinfo->type);
490 	goto fn_exit;
491     }
492     /* --END ERROR HANDLING-- */
493 
494     if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
495     {
496 	/*
497 	 * Stevens suggests that some older version of UNIX did not properly reset so_error, which could allow POLLERR to be
498 	 * continuously triggered.  We remove the socket from the poll list (pollfd->fd = 1) in order to prevent this issue.
499 	 * Here, we simple check that things are as we expect them to be.
500 	 */
501 	MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
502 
503 	/* If a write was posted then cancel it and generate an write completion event */
504 	if (pollfd->events & POLLOUT)
505 	{
506 	    int disconnected;
507 	    int os_errno;
508 	    int event_mpi_errno;
509 
510 	    MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, os_errno, mpi_errno, fn_exit);
511 
512 	    event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, os_errno, FCNAME, __LINE__, &disconnected);
513 	    /* --BEGIN ERROR HANDLING-- */
514 	    if (MPIR_Err_is_fatal(event_mpi_errno))
515 	    {
516 		mpi_errno = event_mpi_errno;
517 		goto fn_exit;
518 	    }
519 	    /* --END ERROR HANDLING-- */
520 
521 	    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
522 				      event_mpi_errno, mpi_errno, fn_exit);
523 	    MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
524 	    pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
525 	}
526     }
527     else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
528     {
529 	/*
530 	 * If we are in the read-only state, then we should only get an error if we are looking to read data.  If we are not
531 	 * reading data, then pollfd->fd should be set to -1 and we should not be getting a POLLERR event.
532 	 *
533 	 * There may still be data in the socket buffer, so we will let the POLLIN handler deal with the error.  Once all of the
534 	 * data has been read, the POLLIN handler will change the connection state and remove the connection from the active poll
535 	 * list.
536 	 */
537 	MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO && (pollfd->events & POLLIN) && (pollfd->revents & POLLIN));
538     }
539     else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
540     {
541 	/*
542 	 * The process we were connecting to died.  Let the POLLOUT handler deal with the error.
543 	 */
544 	MPIU_Assert(pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING && (pollfd->events & POLLOUT));
545 	pollfd->revents = POLLOUT;
546     }
547     else if (pollinfo->state == MPIDU_SOCKI_STATE_DISCONNECTED)
548     {
549 	/* We are already disconnected!  Why are we handling an error? */
550 	MPIU_Assert(pollfd->fd == -1);
551     }
552     /* --BEGIN ERROR HANDLING-- */
553     else
554     {
555 	mpi_errno = MPIR_Err_create_code(
556 	    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
557 	    "**sock|poll|unhandledstate %d", pollinfo->state);
558 	goto fn_exit;
559     }
560     /* --END ERROR HANDLING-- */
561 
562   fn_exit:
563     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLERR);
564     return mpi_errno;
565 }
566 /* end MPIDU_Socki_handle_pollerr() */
567 
568 
569 #undef FUNCNAME
570 #define FUNCNAME MPIDU_Socki_handle_read
571 #undef FCNAME
572 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_read(struct pollfd * const pollfd,struct pollinfo * const pollinfo)573 static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
574 {
575     int nb;
576     int mpi_errno = MPI_SUCCESS;
577     MPIDI_STATE_DECL(MPID_STATE_READ);
578     MPIDI_STATE_DECL(MPID_STATE_READV);
579     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
580 
581     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
582 
583     do
584     {
585 	if (pollinfo->read_iov_flag)
586 	{
587 	    MPIDI_FUNC_ENTER(MPID_STATE_READV);
588 	    nb = readv(pollinfo->fd, pollinfo->read.iov.ptr + pollinfo->read.iov.offset,
589 		       pollinfo->read.iov.count - pollinfo->read.iov.offset);
590 	    MPIDI_FUNC_EXIT(MPID_STATE_READV);
591 	}
592 	else
593 	{
594 	    MPIDI_FUNC_ENTER(MPID_STATE_READ);
595 	    nb = read(pollinfo->fd, pollinfo->read.buf.ptr + pollinfo->read_nb,
596 		      pollinfo->read.buf.max - pollinfo->read_nb);
597 	    MPIDI_FUNC_EXIT(MPID_STATE_READ);
598 	}
599     }
600     while (nb < 0 && errno == EINTR);
601 
602     if (nb > 0)
603     {
604 	int done;
605 
606 	pollinfo->read_nb += nb;
607 
608 	done = pollinfo->read_iov_flag ?
609 	    MPIDU_Socki_adjust_iov(nb, pollinfo->read.iov.ptr, pollinfo->read.iov.count, &pollinfo->read.iov.offset) :
610 	    (pollinfo->read_nb >= pollinfo->read.buf.min);
611 
612 	if (done)
613 	{
614 	    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
615 				      MPI_SUCCESS, mpi_errno, fn_exit);
616 	    MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN);
617 	}
618     }
619     /* --BEGIN ERROR HANDLING-- */
620     else if (nb == 0)
621     {
622 	int event_mpi_errno;
623 
624 	event_mpi_errno = MPIR_Err_create_code(
625 	    MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED, "**sock|connclosed",
626 	    "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
627 	if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT))
628 	{
629 	    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
630 				      event_mpi_errno, mpi_errno, fn_exit);
631 	}
632 	MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
633 				  event_mpi_errno, mpi_errno, fn_exit);
634 
635 	MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN | POLLOUT);
636 	pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
637 
638     }
639     /* --END ERROR HANDLING-- */
640     else if (errno == EAGAIN && errno == EWOULDBLOCK)
641     {
642 	/* do nothing... */
643 	goto fn_exit;
644     }
645     /* --BEGIN ERROR HANDLING-- */
646     else
647     {
648 	int disconnected;
649 	int event_mpi_errno;
650 
651 	event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected);
652 	if (MPIR_Err_is_fatal(event_mpi_errno))
653 	{
654 	    /*
655 	     * A serious error occurred.  There is no guarantee that the data
656 	     * structures are still intact.  Therefore, we avoid
657 	     * modifying them.
658 	     */
659 	    mpi_errno = event_mpi_errno;
660 	    goto fn_exit;
661 	}
662 
663 	if (disconnected)
664 	{
665 	    if (MPIDU_SOCKI_POLLFD_OP_ISSET(pollfd, pollinfo, POLLOUT))
666 	    {
667 		MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
668 					  event_mpi_errno, mpi_errno, fn_exit);
669 		MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
670 	    }
671 
672 	    pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
673 	}
674 
675 	MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_READ, pollinfo->read_nb, pollinfo->user_ptr,
676 				  event_mpi_errno, mpi_errno, fn_exit);
677 	MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLIN);
678     }
679     /* --END ERROR HANDLING-- */
680 
681   fn_exit:
682     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_READ);
683     return mpi_errno;
684 }
685 /* end MPIDU_Socki_handle_read() */
686 
687 
688 #undef FUNCNAME
689 #define FUNCNAME MPIDU_Socki_handle_write
690 #undef FCNAME
691 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_write(struct pollfd * const pollfd,struct pollinfo * const pollinfo)692 static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
693 {
694     int nb;
695     int mpi_errno = MPI_SUCCESS;
696     MPIDI_STATE_DECL(MPID_STATE_WRITE);
697     MPIDI_STATE_DECL(MPID_STATE_WRITEV);
698     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
699 
700     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
701 
702     do
703     {
704 	if (pollinfo->write_iov_flag)
705 	{
706 	    MPIDI_FUNC_ENTER(MPID_STATE_WRITEV);
707 	    nb = writev(pollinfo->fd, pollinfo->write.iov.ptr + pollinfo->write.iov.offset,
708 			pollinfo->write.iov.count - pollinfo->write.iov.offset);
709 	    MPIDI_FUNC_EXIT(MPID_STATE_WRITEV);
710 	}
711 	else
712 	{
713 	    MPIDI_FUNC_ENTER(MPID_STATE_WRITE);
714 	    nb = write(pollinfo->fd, pollinfo->write.buf.ptr + pollinfo->write_nb,
715 		       pollinfo->write.buf.max - pollinfo->write_nb);
716 	    MPIDI_FUNC_EXIT(MPID_STATE_WRITE);
717 	}
718     }
719     while (nb < 0 && errno == EINTR);
720 
721     if (nb >= 0)
722     {
723 	int done;
724 
725 	pollinfo->write_nb += nb;
726 
727 	done = pollinfo->write_iov_flag ?
728 	    MPIDU_Socki_adjust_iov(nb, pollinfo->write.iov.ptr, pollinfo->write.iov.count, &pollinfo->write.iov.offset) :
729 	    (pollinfo->write_nb >= pollinfo->write.buf.min);
730 
731 	if (done)
732 	{
733 	    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
734 				      MPI_SUCCESS, mpi_errno, fn_exit);
735 	    MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
736 	}
737     }
738     else if (errno == EAGAIN || errno == EWOULDBLOCK)
739     {
740 	/* do nothing... */
741 	goto fn_exit;
742     }
743     /* --BEGIN ERROR HANDLING-- */
744     else
745     {
746 	int disconnected;
747 	int event_mpi_errno;
748 
749 	event_mpi_errno = MPIDU_Socki_os_to_mpi_errno(pollinfo, errno, FCNAME, __LINE__, &disconnected);
750 	if (MPIR_Err_is_fatal(event_mpi_errno))
751 	{
752 	    /*
753 	     * A serious error occurred.  There is no guarantee that the data structures are still intact.  Therefore, we avoid
754 	     * modifying them.
755 	     */
756 	    mpi_errno = event_mpi_errno;
757 	    goto fn_exit;
758 	}
759 
760 	MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_WRITE, pollinfo->write_nb, pollinfo->user_ptr,
761 				  event_mpi_errno, mpi_errno, fn_exit);
762 	MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
763 	if (disconnected)
764 	{
765 	    /*
766 	     * The connection is dead but data may still be in the socket buffer; thus, we change the state and let
767 	     * MPIDU_Sock_wait() clean things up.
768 	     */
769 	    pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
770 	}
771     }
772     /* --END ERROR HANDLING-- */
773 
774   fn_exit:
775     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_WRITE);
776     return mpi_errno;
777 }
778 /* end MPIDU_Socki_handle_write() */
779 
780 
781 #undef FUNCNAME
782 #define FUNCNAME MPIDU_Socki_handle_connect
783 #undef FCNAME
784 #define FCNAME MPIU_QUOTE(FUNCNAME)
MPIDU_Socki_handle_connect(struct pollfd * const pollfd,struct pollinfo * const pollinfo)785 static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
786 {
787     struct sockaddr_in addr;
788     socklen_t addr_len;
789     int rc;
790     int mpi_errno = MPI_SUCCESS;
791     MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
792 
793     MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
794 
795     addr_len = sizeof(struct sockaddr_in);
796     rc = getpeername(pollfd->fd, (struct sockaddr *) &addr, &addr_len);
797     if (rc == 0)
798     {
799 	MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, MPI_SUCCESS, mpi_errno, fn_exit);
800 	pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RW;
801     }
802     /* --BEGIN ERROR HANDLING-- */
803     else
804     {
805 	int event_mpi_errno;
806 
807 	MPIDU_SOCKI_GET_SOCKET_ERROR(pollinfo, pollinfo->os_errno, mpi_errno, fn_exit);
808 	event_mpi_errno = MPIR_Err_create_code(
809 	    MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_FAILED, "**sock|connfailed",
810 	    "**sock|poll|connfailed %d %d %d %s", pollinfo->sock_set->id, pollinfo->sock_id, pollinfo->os_errno,
811 	    MPIU_Strerror(pollinfo->os_errno));
812 	MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_CONNECT, 0, pollinfo->user_ptr, event_mpi_errno, mpi_errno, fn_exit);
813 	pollinfo->state = MPIDU_SOCKI_STATE_DISCONNECTED;
814     }
815     /* --END ERROR HANDLING-- */
816 
817     MPIDU_SOCKI_POLLFD_OP_CLEAR(pollfd, pollinfo, POLLOUT);
818 
819   fn_exit:
820     MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCKI_HANDLE_CONNECT);
821     return mpi_errno;
822 }
823 /* end MPIDU_Socki_handle_connect() */
824