1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 /**
40 * @file
41 * @brief Connection Definitions
42 */
43
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46 #include <libgearman-server/plugins/base.h>
47
48 #include <cstring>
49 #include <cerrno>
50 #include <cassert>
51
52 #ifndef SOCK_NONBLOCK
53 # define SOCK_NONBLOCK 0
54 #endif
55
_connection_close(gearmand_io_st * connection)56 static void _connection_close(gearmand_io_st *connection)
57 {
58 if (connection->fd == INVALID_SOCKET)
59 {
60 return;
61 }
62
63 if (connection->options.external_fd)
64 {
65 connection->options.external_fd= false;
66 }
67 else
68 {
69 (void)gearmand_sockfd_close(connection->fd);
70 assert_msg(false, "We should never have an internal fd");
71 }
72
73 connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
74 connection->fd= INVALID_SOCKET;
75 connection->events= 0;
76 connection->revents= 0;
77
78 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
79 connection->send_buffer_ptr= connection->send_buffer;
80 connection->send_buffer_size= 0;
81 connection->send_data_size= 0;
82 connection->send_data_offset= 0;
83
84 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
85 if (connection->recv_packet != NULL)
86 {
87 gearmand_packet_free(connection->recv_packet);
88 connection->recv_packet= NULL;
89 }
90
91 connection->recv_buffer_ptr= connection->recv_buffer;
92 connection->recv_buffer_size= 0;
93 }
94
_connection_read(gearman_server_con_st * con,void * data,size_t data_size,gearmand_error_t & ret)95 static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
96 {
97 ssize_t read_size;
98 gearmand_io_st *connection= &con->con;
99
100 while (1)
101 {
102 read_size= recv(connection->fd, data, data_size, MSG_DONTWAIT);
103
104 if (read_size == 0)
105 {
106 ret= GEARMAN_LOST_CONNECTION;
107 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
108 "Peer connection has called close() %s:%s",
109 connection->context == NULL ? "-" : connection->context->host,
110 connection->context == NULL ? "-" : connection->context->port);
111 _connection_close(connection);
112 return 0;
113 }
114 else if (read_size == -1)
115 {
116 int local_errno= errno;
117 switch (local_errno)
118 {
119 case EAGAIN:
120 ret= gearmand_io_set_events(con, POLLIN);
121 if (gearmand_failed(ret))
122 {
123 gearmand_perror(local_errno, "recv");
124 return 0;
125 }
126
127 ret= GEARMAN_IO_WAIT;
128 return 0;
129
130 case EINTR:
131 continue;
132
133 case EPIPE:
134 case ECONNRESET:
135 case EHOSTDOWN:
136 {
137 ret= GEARMAN_LOST_CONNECTION;
138 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
139 "Peer connection has called close() %s:%s",
140 connection->context == NULL ? "-" : connection->context->host,
141 connection->context == NULL ? "-" : connection->context->port);
142 _connection_close(connection);
143 return 0;
144 }
145 break;
146
147 default:
148 ret= GEARMAN_ERRNO;
149 }
150
151 gearmand_perror(local_errno, "closing connection due to previous errno error");
152 _connection_close(connection);
153 return 0;
154 }
155
156 break;
157 }
158
159 ret= GEARMAN_SUCCESS;
160 return size_t(read_size);
161 }
162
gearmand_connection_recv_data(gearman_server_con_st * con,void * data,size_t data_size)163 static gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
164 {
165 gearmand_io_st *connection= &con->con;
166
167 if (connection->recv_data_size == 0)
168 {
169 return GEARMAN_SUCCESS;
170 }
171
172 if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
173 {
174 data_size= connection->recv_data_size - connection->recv_data_offset;
175 }
176
177 size_t recv_size= 0;
178 if (connection->recv_buffer_size > 0)
179 {
180 if (connection->recv_buffer_size < data_size)
181 {
182 recv_size= connection->recv_buffer_size;
183 }
184 else
185 {
186 recv_size= data_size;
187 }
188
189 memcpy(data, connection->recv_buffer_ptr, recv_size);
190 connection->recv_buffer_ptr+= recv_size;
191 connection->recv_buffer_size-= recv_size;
192 }
193
194 gearmand_error_t ret;
195 if (data_size != recv_size)
196 {
197 recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
198 connection->recv_data_offset+= recv_size;
199 }
200 else
201 {
202 connection->recv_data_offset+= recv_size;
203 ret= GEARMAN_SUCCESS;
204 }
205
206 if (connection->recv_data_size == connection->recv_data_offset)
207 {
208 connection->recv_data_size= 0;
209 connection->recv_data_offset= 0;
210 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
211 }
212
213 return ret;
214 }
215
_connection_flush(gearman_server_con_st * con)216 static gearmand_error_t _connection_flush(gearman_server_con_st *con)
217 {
218 gearmand_io_st *connection= &con->con;
219
220 assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
221 while (1)
222 {
223 switch (connection->_state)
224 {
225 case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
226 assert(0);
227 return GEARMAN_ERRNO;
228
229 case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
230 while (connection->send_buffer_size)
231 {
232 ssize_t write_size= send(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
233
234 if (write_size == 0) // detect infinite loop?
235 {
236 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes to peer %s:%s",
237 connection->context == NULL ? "-" : connection->context->host,
238 connection->context == NULL ? "-" : connection->context->port);
239 continue;
240 }
241 else if (write_size == -1)
242 {
243 int local_errno= errno;
244 switch (local_errno)
245 {
246 case EAGAIN:
247 {
248 gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
249 if (gret != GEARMAN_SUCCESS)
250 {
251 return gret;
252 }
253 return GEARMAN_IO_WAIT;
254 }
255
256 case EINTR:
257 continue;
258
259 case EPIPE:
260 case ECONNRESET:
261 case EHOSTDOWN:
262 gearmand_perror(local_errno, "lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
263 _connection_close(connection);
264 return GEARMAN_LOST_CONNECTION;
265
266 default:
267 break;
268 }
269
270 gearmand_perror(local_errno, "send() failed, closing connection");
271 _connection_close(connection);
272 return GEARMAN_ERRNO;
273 }
274
275 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s",
276 uint32_t(write_size),
277 connection->context == NULL ? "-" : connection->context->host,
278 connection->context == NULL ? "-" : connection->context->port);
279
280 connection->send_buffer_size-= static_cast<size_t>(write_size);
281 if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
282 {
283 connection->send_data_offset+= static_cast<size_t>(write_size);
284 if (connection->send_data_offset == connection->send_data_size)
285 {
286 connection->send_data_size= 0;
287 connection->send_data_offset= 0;
288 break;
289 }
290
291 if (connection->send_buffer_size == 0)
292 {
293 return GEARMAN_SUCCESS;
294 }
295 }
296 else if (connection->send_buffer_size == 0)
297 {
298 break;
299 }
300
301 connection->send_buffer_ptr+= write_size;
302 }
303
304 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
305 connection->send_buffer_ptr= connection->send_buffer;
306 return GEARMAN_SUCCESS;
307 }
308 }
309 }
310
311 /**
312 * @addtogroup gearmand_io_static Static Connection Declarations
313 * @ingroup gearman_connection
314 * @{
315 */
316
317
gearmand_connection_init(gearmand_connection_list_st * gearman,gearmand_io_st * connection,gearmand_con_st * dcon,gearmand_connection_options_t * options)318 void gearmand_connection_init(gearmand_connection_list_st *gearman,
319 gearmand_io_st *connection,
320 gearmand_con_st *dcon,
321 gearmand_connection_options_t *options)
322 {
323 assert(gearman);
324 assert(connection);
325
326 connection->options.ready= false;
327 connection->options.packet_in_use= false;
328 connection->options.external_fd= false;
329 connection->options.close_after_flush= false;
330
331 if (options)
332 {
333 while (*options != GEARMAND_CON_MAX)
334 {
335 gearman_io_set_option(connection, *options, true);
336 options++;
337 }
338 }
339
340
341 connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
342 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
343 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
344 connection->events= 0;
345 connection->revents= 0;
346 connection->fd= INVALID_SOCKET;
347 connection->created_id= 0;
348 connection->created_id_next= 0;
349 connection->send_buffer_size= 0;
350 connection->send_data_size= 0;
351 connection->send_data_offset= 0;
352 connection->recv_buffer_size= 0;
353 connection->recv_data_size= 0;
354 connection->recv_data_offset= 0;
355 connection->universal= gearman;
356
357 GEARMAN_LIST__ADD(gearman->con, connection);
358
359 connection->context= dcon;
360
361 connection->send_buffer_ptr= connection->send_buffer;
362 connection->recv_packet= NULL;
363 connection->recv_buffer_ptr= connection->recv_buffer;
364 }
365
list_free()366 void gearmand_connection_list_st::list_free()
367 {
368 while (con_list)
369 {
370 gearmand_io_free(con_list);
371 }
372 }
373
gearmand_connection_list_st()374 gearmand_connection_list_st::gearmand_connection_list_st() :
375 con_count(0),
376 con_list(NULL),
377 event_watch_fn(NULL),
378 event_watch_context(NULL)
379 {
380 }
381
init(gearmand_event_watch_fn * watch_fn,void * watch_context)382 void gearmand_connection_list_st::init(gearmand_event_watch_fn *watch_fn, void *watch_context)
383 {
384 ready_con_count= 0;
385 ready_con_list= NULL;
386 con_count= 0;
387 con_list= NULL;
388 event_watch_fn= watch_fn;
389 event_watch_context= watch_context;
390 }
391
gearmand_io_free(gearmand_io_st * connection)392 void gearmand_io_free(gearmand_io_st *connection)
393 {
394 if (connection->fd != INVALID_SOCKET)
395 _connection_close(connection);
396
397 if (connection->options.ready)
398 {
399 connection->options.ready= false;
400 GEARMAN_LIST_DEL(connection->universal->ready_con, connection, ready_);
401 }
402
403 GEARMAN_LIST__DEL(connection->universal->con, connection);
404
405 if (connection->options.packet_in_use)
406 {
407 gearmand_packet_free(&(connection->packet));
408 }
409 }
410
gearman_io_set_option(gearmand_io_st * connection,gearmand_connection_options_t options,bool value)411 gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
412 gearmand_connection_options_t options,
413 bool value)
414 {
415 switch (options)
416 {
417 case GEARMAND_CON_PACKET_IN_USE:
418 connection->options.packet_in_use= value;
419 break;
420 case GEARMAND_CON_EXTERNAL_FD:
421 connection->options.external_fd= value;
422 break;
423 case GEARMAND_CON_CLOSE_AFTER_FLUSH:
424 connection->options.close_after_flush= value;
425 break;
426 case GEARMAND_CON_MAX:
427 return GEARMAN_INVALID_COMMAND;
428 }
429
430 return GEARMAN_SUCCESS;
431 }
432
433 /**
434 * Set socket options for a connection.
435 */
436 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection);
437
438 /** @} */
439
440 /*
441 * Public Definitions
442 */
443
gearman_io_set_fd(gearmand_io_st * connection,int fd)444 gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd)
445 {
446 assert(connection);
447
448 connection->options.external_fd= true;
449 connection->fd= fd;
450 connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED;
451
452 return _io_setsockopt(*connection);
453 }
454
gearman_io_context(const gearmand_io_st * connection)455 gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
456 {
457 return connection->context;
458 }
459
gearman_io_send(gearman_server_con_st * con,const gearmand_packet_st * packet,bool flush)460 gearmand_error_t gearman_io_send(gearman_server_con_st *con,
461 const gearmand_packet_st *packet, bool flush)
462 {
463 size_t send_size;
464
465 gearmand_io_st *connection= &con->con;
466
467 switch (connection->send_state)
468 {
469 case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
470 if (! (packet->options.complete))
471 {
472 gearmand_error("packet not complete");
473 return GEARMAN_INVALID_PACKET;
474 }
475
476 /* Pack first part of packet, which is everything but the payload. */
477 while (1)
478 {
479 gearmand_error_t ret;
480 send_size= con->protocol->pack(packet,
481 con,
482 connection->send_buffer +connection->send_buffer_size,
483 GEARMAN_SEND_BUFFER_SIZE -connection->send_buffer_size,
484 ret);
485 if (ret == GEARMAN_SUCCESS)
486 {
487 connection->send_buffer_size+= send_size;
488 break;
489 }
490 else if (ret == GEARMAN_IGNORE_PACKET)
491 {
492 return GEARMAN_SUCCESS;
493 }
494 else if (ret != GEARMAN_FLUSH_DATA)
495 {
496 return ret;
497 }
498
499 /* We were asked to flush when the buffer is already flushed! */
500 if (connection->send_buffer_size == 0)
501 {
502 gearmand_error("send buffer too small");
503
504 return GEARMAN_SEND_BUFFER_TOO_SMALL;
505 }
506
507 /* Flush buffer now if first part of packet won't fit in. */
508 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
509
510 case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
511 {
512 gearmand_error_t local_ret;
513 if ((local_ret= _connection_flush(con)) != GEARMAN_SUCCESS)
514 {
515 return local_ret;
516 }
517 }
518 }
519
520 /* Return here if we have no data to send. */
521 if (packet->data_size == 0)
522 {
523 break;
524 }
525
526 /* If there is any room in the buffer, copy in data. */
527 if (packet->data and (GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
528 {
529 connection->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size;
530 if (connection->send_data_offset > packet->data_size)
531 {
532 connection->send_data_offset= packet->data_size;
533 }
534
535 memcpy(connection->send_buffer +connection->send_buffer_size,
536 packet->data,
537 connection->send_data_offset);
538 connection->send_buffer_size+= connection->send_data_offset;
539
540 /* Return if all data fit in the send buffer. */
541 if (connection->send_data_offset == packet->data_size)
542 {
543 connection->send_data_offset= 0;
544 break;
545 }
546 }
547
548 /* Flush buffer now so we can start writing directly from data buffer. */
549 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
550
551 case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
552 {
553 gearmand_error_t local_ret;
554 if ((local_ret= _connection_flush(con)) != GEARMAN_SUCCESS)
555 {
556 return local_ret;
557 }
558 }
559
560 connection->send_data_size= packet->data_size;
561
562 /* If this is NULL, then ?? function will be used. */
563 if (packet->data == NULL)
564 {
565 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
566 return GEARMAN_SUCCESS;
567 }
568
569 /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
570 connection->send_buffer_size= packet->data_size - connection->send_data_offset;
571 if (connection->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
572 {
573 memcpy(connection->send_buffer,
574 packet->data + connection->send_data_offset,
575 connection->send_buffer_size);
576 connection->send_data_size= 0;
577 connection->send_data_offset= 0;
578 break;
579 }
580
581 connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
582 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
583
584 case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
585 case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
586 {
587 gearmand_error_t local_ret= _connection_flush(con);
588 if (local_ret == GEARMAN_SUCCESS and
589 connection->options.close_after_flush)
590 {
591 _connection_close(connection);
592 local_ret= GEARMAN_LOST_CONNECTION;
593 gearmand_debug("closing connection after flush by request");
594 }
595 return local_ret;
596 }
597 }
598
599 if (flush)
600 {
601 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
602 gearmand_error_t local_ret= _connection_flush(con);
603 if (local_ret == GEARMAN_SUCCESS and connection->options.close_after_flush)
604 {
605 _connection_close(connection);
606 local_ret= GEARMAN_LOST_CONNECTION;
607 gearmand_debug("closing connection after flush by request");
608 }
609 return local_ret;
610 }
611
612 connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
613 return GEARMAN_SUCCESS;
614 }
615
616 #ifndef __INTEL_COMPILER
617 #pragma GCC diagnostic ignored "-Wold-style-cast"
618 #endif
619
620
gearman_io_recv(gearman_server_con_st * con,bool recv_data)621 gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
622 {
623 gearmand_io_st *connection= &con->con;
624 gearmand_packet_st *packet= &(con->packet->packet);
625
626 switch (connection->recv_state)
627 {
628 case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
629 if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
630 {
631 gearmand_error("not connected");
632 return GEARMAN_NOT_CONNECTED;
633 }
634
635 connection->recv_packet= packet;
636 // The options being passed in are just defaults.
637 connection->recv_packet->reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
638
639 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
640
641 case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
642 while (1)
643 {
644 gearmand_error_t ret;
645
646 if (connection->recv_buffer_size > 0)
647 {
648 assert(con->protocol);
649 size_t recv_size= con->protocol->unpack(connection->recv_packet,
650 con,
651 connection->recv_buffer_ptr,
652 connection->recv_buffer_size, ret);
653 connection->recv_buffer_ptr+= recv_size;
654 connection->recv_buffer_size-= recv_size;
655 if (gearmand_success(ret))
656 {
657 break;
658 }
659 else if (ret != GEARMAN_IO_WAIT)
660 {
661 gearmand_gerror_warn("protocol failure, closing connection", ret);
662 _connection_close(connection);
663 return ret;
664 }
665 }
666
667 /* Shift buffer contents if needed. */
668 if (connection->recv_buffer_size > 0)
669 {
670 memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
671 }
672 connection->recv_buffer_ptr= connection->recv_buffer;
673
674 size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
675 GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
676 if (gearmand_failed(ret))
677 {
678 // GEARMAN_LOST_CONNECTION is not worth a warning, clients/workers just
679 // drop connections for close.
680 if (ret != GEARMAN_LOST_CONNECTION)
681 {
682 gearmand_gerror_warn("Failed while in _connection_read()", ret);
683 }
684 return ret;
685 }
686 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes", (unsigned long)recv_size);
687
688 connection->recv_buffer_size+= recv_size;
689 }
690
691 if (packet->data_size == 0)
692 {
693 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
694 break;
695 }
696
697 connection->recv_data_size= packet->data_size;
698
699 if (not recv_data)
700 {
701 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
702 break;
703 }
704
705 packet->data= static_cast<char *>(malloc(packet->data_size));
706 if (not packet->data)
707 {
708 // Server up the memory error first, in case _connection_close()
709 // creates any.
710 gearmand_merror("malloc", char, packet->data_size);
711 _connection_close(connection);
712 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
713 }
714
715 packet->options.free_data= true;
716 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
717
718 case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
719 while (connection->recv_data_size)
720 {
721 gearmand_error_t ret;
722 ret= gearmand_connection_recv_data(con,
723 ((uint8_t *)(packet->data)) +
724 connection->recv_data_offset,
725 packet->data_size -
726 connection->recv_data_offset);
727 if (gearmand_failed(ret))
728 {
729 return ret;
730 }
731 }
732
733 connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
734 break;
735 }
736
737 packet= connection->recv_packet;
738 connection->recv_packet= NULL;
739
740 return GEARMAN_SUCCESS;
741 }
742
gearmand_io_set_events(gearman_server_con_st * con,short events)743 gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
744 {
745 gearmand_io_st *connection= &con->con;
746
747 if ((connection->events | events) == connection->events)
748 {
749 return GEARMAN_SUCCESS;
750 }
751
752 connection->events|= events;
753
754 if (connection->universal->event_watch_fn)
755 {
756 gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
757 (void *)connection->universal->event_watch_context);
758 if (gearmand_failed(ret))
759 {
760 gearmand_gerror_warn("event watch failed, closing connection", ret);
761 _connection_close(connection);
762 return ret;
763 }
764 }
765
766 return GEARMAN_SUCCESS;
767 }
768
gearmand_io_set_revents(gearman_server_con_st * con,short revents)769 gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
770 {
771 gearmand_io_st *connection= &con->con;
772
773 if (revents != 0)
774 {
775 connection->options.ready= true;
776 GEARMAN_LIST_ADD(connection->universal->ready_con, connection, ready_);
777 }
778
779 connection->revents= revents;
780
781 /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
782 forever until another POLLIN state change. This is much more efficient
783 than removing POLLOUT on every state change since some external polling
784 mechanisms need to use a system call to change flags (like Linux epoll). */
785 if (revents & POLLOUT && !(connection->events & POLLOUT) &&
786 connection->universal->event_watch_fn != NULL)
787 {
788 gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
789 (void *)connection->universal->event_watch_context);
790 if (gearmand_failed(ret))
791 {
792 gearmand_gerror_warn("event watch failed, closing connection", ret);
793 _connection_close(connection);
794 return ret;
795 }
796 }
797
798 connection->events&= (short)~revents;
799
800 return GEARMAN_SUCCESS;
801 }
802
803 /*
804 * Static Definitions
805 */
806
_io_setsockopt(gearmand_io_st & connection)807 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
808 {
809 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "setsockopt() %d", connection.fd);
810 {
811 int setting= 1;
812 if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
813 {
814 return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
815 }
816 }
817
818 {
819 struct linger linger;
820 linger.l_onoff= 1;
821 linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
822 if (setsockopt(connection.fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
823 {
824 return gearmand_perror(errno, "setsockopt(SO_LINGER)");
825 }
826 }
827
828 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
829 {
830 int setting= 1;
831
832 // This is not considered a fatal error
833 if (setsockopt(connection.fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
834 {
835 gearmand_perror(errno, "setsockopt(SO_NOSIGPIPE)");
836 }
837 }
838 #endif
839
840 if (0)
841 {
842 struct timeval waittime;
843 waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
844 waittime.tv_usec= 0;
845 if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
846 {
847 return gearmand_perror(errno, "setsockopt(SO_SNDTIMEO)");
848 }
849
850 if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
851 {
852 return gearmand_perror(errno, "setsockopt(SO_RCVTIMEO)");
853 }
854 }
855
856 if (0)
857 {
858 int setting= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
859 if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
860 {
861 return gearmand_perror(errno, "setsockopr(SO_SNDBUF)");
862 }
863
864 setting= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
865 if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
866 {
867 return gearmand_perror(errno, "setsockopt(SO_RCVBUF)");
868 }
869 }
870
871 if (SOCK_NONBLOCK == 0)
872 {
873 int flags;
874 do
875 {
876 flags= fcntl(connection.fd, F_GETFL, 0);
877 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
878
879 if (flags == -1)
880 {
881 return gearmand_perror(errno, "fcntl(F_GETFL)");
882 }
883 else if ((flags & O_NONBLOCK) == 0)
884 {
885 int retval;
886 do
887 {
888 retval= fcntl(connection.fd, F_SETFL, flags | O_NONBLOCK);
889 } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
890
891 if (retval == -1)
892 {
893 return gearmand_perror(errno, "fcntl(F_SETFL)");
894 }
895 }
896 }
897
898 return GEARMAN_SUCCESS;
899 }
900
gearmand_sockfd_close(int & sockfd)901 void gearmand_sockfd_close(int& sockfd)
902 {
903 if (sockfd == INVALID_SOCKET)
904 {
905 gearmand_error("gearmand_sockfd_close() called with an invalid socket");
906 return;
907 }
908
909 /* in case of death shutdown to avoid blocking at close() */
910 if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
911 {
912 gearmand_perror(errno, "shutdown");
913 assert(errno != ENOTSOCK);
914 }
915 else if (closesocket(sockfd) == SOCKET_ERROR)
916 {
917 gearmand_perror(errno, "close");
918 }
919
920 sockfd= INVALID_SOCKET;
921 }
922
gearmand_pipe_close(int & pipefd)923 void gearmand_pipe_close(int& pipefd)
924 {
925 if (pipefd == INVALID_SOCKET)
926 {
927 gearmand_error("gearmand_pipe_close() called with an invalid socket");
928 return;
929 }
930
931 if (closesocket(pipefd) == SOCKET_ERROR)
932 {
933 gearmand_perror(errno, "close");
934 }
935
936 pipefd= -1;
937 }
938