1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 /**
40  * @file
41  * @brief Connection Definitions
42  */
43 
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46 #include <libgearman-server/plugins/base.h>
47 
48 #include <cstring>
49 #include <cerrno>
50 #include <cassert>
51 
52 #ifndef SOCK_NONBLOCK
53 # define SOCK_NONBLOCK 0
54 #endif
55 
_connection_close(gearmand_io_st * connection)56 static void _connection_close(gearmand_io_st *connection)
57 {
58   if (connection->fd == INVALID_SOCKET)
59   {
60     return;
61   }
62 
63   if (connection->options.external_fd)
64   {
65     connection->options.external_fd= false;
66   }
67   else
68   {
69     (void)gearmand_sockfd_close(connection->fd);
70     assert_msg(false, "We should never have an internal fd");
71   }
72 
73   connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
74   connection->fd= INVALID_SOCKET;
75   connection->events= 0;
76   connection->revents= 0;
77 
78   connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
79   connection->send_buffer_ptr= connection->send_buffer;
80   connection->send_buffer_size= 0;
81   connection->send_data_size= 0;
82   connection->send_data_offset= 0;
83 
84   connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
85   if (connection->recv_packet != NULL)
86   {
87     gearmand_packet_free(connection->recv_packet);
88     connection->recv_packet= NULL;
89   }
90 
91   connection->recv_buffer_ptr= connection->recv_buffer;
92   connection->recv_buffer_size= 0;
93 }
94 
_connection_read(gearman_server_con_st * con,void * data,size_t data_size,gearmand_error_t & ret)95 static size_t _connection_read(gearman_server_con_st *con, void *data, size_t data_size, gearmand_error_t &ret)
96 {
97   ssize_t read_size;
98   gearmand_io_st *connection= &con->con;
99 
100   while (1)
101   {
102     read_size= recv(connection->fd, data, data_size, MSG_DONTWAIT);
103 
104     if (read_size == 0)
105     {
106       ret= GEARMAN_LOST_CONNECTION;
107       gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
108                         "Peer connection has called close() %s:%s",
109                         connection->context == NULL ? "-" : connection->context->host,
110                         connection->context == NULL ? "-" : connection->context->port);
111       _connection_close(connection);
112       return 0;
113     }
114     else if (read_size == -1)
115     {
116       int local_errno= errno;
117       switch (local_errno)
118       {
119       case EAGAIN:
120         ret= gearmand_io_set_events(con, POLLIN);
121         if (gearmand_failed(ret))
122         {
123           gearmand_perror(local_errno, "recv");
124           return 0;
125         }
126 
127         ret= GEARMAN_IO_WAIT;
128         return 0;
129 
130       case EINTR:
131         continue;
132 
133       case EPIPE:
134       case ECONNRESET:
135       case EHOSTDOWN:
136         {
137           ret= GEARMAN_LOST_CONNECTION;
138           gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
139                             "Peer connection has called close() %s:%s",
140                             connection->context == NULL ? "-" : connection->context->host,
141                             connection->context == NULL ? "-" : connection->context->port);
142           _connection_close(connection);
143           return 0;
144         }
145         break;
146 
147       default:
148         ret= GEARMAN_ERRNO;
149       }
150 
151       gearmand_perror(local_errno, "closing connection due to previous errno error");
152       _connection_close(connection);
153       return 0;
154     }
155 
156     break;
157   }
158 
159   ret= GEARMAN_SUCCESS;
160   return size_t(read_size);
161 }
162 
gearmand_connection_recv_data(gearman_server_con_st * con,void * data,size_t data_size)163 static gearmand_error_t gearmand_connection_recv_data(gearman_server_con_st *con, void *data, size_t data_size)
164 {
165   gearmand_io_st *connection= &con->con;
166 
167   if (connection->recv_data_size == 0)
168   {
169     return GEARMAN_SUCCESS;
170   }
171 
172   if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
173   {
174     data_size= connection->recv_data_size - connection->recv_data_offset;
175   }
176 
177   size_t recv_size= 0;
178   if (connection->recv_buffer_size > 0)
179   {
180     if (connection->recv_buffer_size < data_size)
181     {
182       recv_size= connection->recv_buffer_size;
183     }
184     else
185     {
186       recv_size= data_size;
187     }
188 
189     memcpy(data, connection->recv_buffer_ptr, recv_size);
190     connection->recv_buffer_ptr+= recv_size;
191     connection->recv_buffer_size-= recv_size;
192   }
193 
194   gearmand_error_t ret;
195   if (data_size != recv_size)
196   {
197     recv_size+= _connection_read(con, ((uint8_t *)data) + recv_size, data_size - recv_size, ret);
198     connection->recv_data_offset+= recv_size;
199   }
200   else
201   {
202     connection->recv_data_offset+= recv_size;
203     ret= GEARMAN_SUCCESS;
204   }
205 
206   if (connection->recv_data_size == connection->recv_data_offset)
207   {
208     connection->recv_data_size= 0;
209     connection->recv_data_offset= 0;
210     connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
211   }
212 
213   return ret;
214 }
215 
_connection_flush(gearman_server_con_st * con)216 static gearmand_error_t _connection_flush(gearman_server_con_st *con)
217 {
218   gearmand_io_st *connection= &con->con;
219 
220   assert(connection->_state == gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED);
221   while (1)
222   {
223     switch (connection->_state)
224     {
225     case gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID:
226       assert(0);
227       return GEARMAN_ERRNO;
228 
229     case gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED:
230       while (connection->send_buffer_size)
231       {
232         ssize_t write_size= send(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size, MSG_NOSIGNAL|MSG_DONTWAIT);
233 
234         if (write_size == 0) // detect infinite loop?
235         {
236           gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() sent zero bytes to peer %s:%s",
237                              connection->context == NULL ? "-" : connection->context->host,
238                              connection->context == NULL ? "-" : connection->context->port);
239           continue;
240         }
241         else if (write_size == -1)
242         {
243           int local_errno= errno;
244           switch (local_errno)
245           {
246           case EAGAIN:
247             {
248               gearmand_error_t gret= gearmand_io_set_events(con, POLLOUT);
249               if (gret != GEARMAN_SUCCESS)
250               {
251                 return gret;
252               }
253               return GEARMAN_IO_WAIT;
254             }
255 
256           case EINTR:
257             continue;
258 
259           case EPIPE:
260           case ECONNRESET:
261           case EHOSTDOWN:
262             gearmand_perror(local_errno, "lost connection to client during send(EPIPE || ECONNRESET || EHOSTDOWN)");
263             _connection_close(connection);
264             return GEARMAN_LOST_CONNECTION;
265 
266           default:
267             break;
268           }
269 
270           gearmand_perror(local_errno, "send() failed, closing connection");
271           _connection_close(connection);
272           return GEARMAN_ERRNO;
273         }
274 
275         gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "send() %u bytes to peer %s:%s",
276                            uint32_t(write_size),
277                            connection->context == NULL ? "-" : connection->context->host,
278                            connection->context == NULL ? "-" : connection->context->port);
279 
280         connection->send_buffer_size-= static_cast<size_t>(write_size);
281         if (connection->send_state == gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA)
282         {
283           connection->send_data_offset+= static_cast<size_t>(write_size);
284           if (connection->send_data_offset == connection->send_data_size)
285           {
286             connection->send_data_size= 0;
287             connection->send_data_offset= 0;
288             break;
289           }
290 
291           if (connection->send_buffer_size == 0)
292           {
293             return GEARMAN_SUCCESS;
294           }
295         }
296         else if (connection->send_buffer_size == 0)
297         {
298           break;
299         }
300 
301         connection->send_buffer_ptr+= write_size;
302       }
303 
304       connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
305       connection->send_buffer_ptr= connection->send_buffer;
306       return GEARMAN_SUCCESS;
307     }
308   }
309 }
310 
311 /**
312  * @addtogroup gearmand_io_static Static Connection Declarations
313  * @ingroup gearman_connection
314  * @{
315  */
316 
317 
gearmand_connection_init(gearmand_connection_list_st * gearman,gearmand_io_st * connection,gearmand_con_st * dcon,gearmand_connection_options_t * options)318 void gearmand_connection_init(gearmand_connection_list_st *gearman,
319                               gearmand_io_st *connection,
320                               gearmand_con_st *dcon,
321                               gearmand_connection_options_t *options)
322 {
323   assert(gearman);
324   assert(connection);
325 
326   connection->options.ready= false;
327   connection->options.packet_in_use= false;
328   connection->options.external_fd= false;
329   connection->options.close_after_flush= false;
330 
331   if (options)
332   {
333     while (*options != GEARMAND_CON_MAX)
334     {
335       gearman_io_set_option(connection, *options, true);
336       options++;
337     }
338   }
339 
340 
341   connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_INVALID;
342   connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
343   connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
344   connection->events= 0;
345   connection->revents= 0;
346   connection->fd= INVALID_SOCKET;
347   connection->created_id= 0;
348   connection->created_id_next= 0;
349   connection->send_buffer_size= 0;
350   connection->send_data_size= 0;
351   connection->send_data_offset= 0;
352   connection->recv_buffer_size= 0;
353   connection->recv_data_size= 0;
354   connection->recv_data_offset= 0;
355   connection->universal= gearman;
356 
357   GEARMAN_LIST__ADD(gearman->con, connection);
358 
359   connection->context= dcon;
360 
361   connection->send_buffer_ptr= connection->send_buffer;
362   connection->recv_packet= NULL;
363   connection->recv_buffer_ptr= connection->recv_buffer;
364 }
365 
list_free()366 void gearmand_connection_list_st::list_free()
367 {
368   while (con_list)
369   {
370     gearmand_io_free(con_list);
371   }
372 }
373 
gearmand_connection_list_st()374 gearmand_connection_list_st::gearmand_connection_list_st() :
375   con_count(0),
376   con_list(NULL),
377   event_watch_fn(NULL),
378   event_watch_context(NULL)
379 {
380 }
381 
init(gearmand_event_watch_fn * watch_fn,void * watch_context)382 void gearmand_connection_list_st::init(gearmand_event_watch_fn *watch_fn, void *watch_context)
383 {
384   ready_con_count= 0;
385   ready_con_list= NULL;
386   con_count= 0;
387   con_list= NULL;
388   event_watch_fn= watch_fn;
389   event_watch_context= watch_context;
390 }
391 
gearmand_io_free(gearmand_io_st * connection)392 void gearmand_io_free(gearmand_io_st *connection)
393 {
394   if (connection->fd != INVALID_SOCKET)
395     _connection_close(connection);
396 
397   if (connection->options.ready)
398   {
399     connection->options.ready= false;
400     GEARMAN_LIST_DEL(connection->universal->ready_con, connection, ready_);
401   }
402 
403   GEARMAN_LIST__DEL(connection->universal->con, connection);
404 
405   if (connection->options.packet_in_use)
406   {
407     gearmand_packet_free(&(connection->packet));
408   }
409 }
410 
gearman_io_set_option(gearmand_io_st * connection,gearmand_connection_options_t options,bool value)411 gearmand_error_t gearman_io_set_option(gearmand_io_st *connection,
412                                                gearmand_connection_options_t options,
413                                                bool value)
414 {
415   switch (options)
416   {
417   case GEARMAND_CON_PACKET_IN_USE:
418     connection->options.packet_in_use= value;
419     break;
420   case GEARMAND_CON_EXTERNAL_FD:
421     connection->options.external_fd= value;
422     break;
423   case GEARMAND_CON_CLOSE_AFTER_FLUSH:
424     connection->options.close_after_flush= value;
425     break;
426   case GEARMAND_CON_MAX:
427     return GEARMAN_INVALID_COMMAND;
428   }
429 
430   return GEARMAN_SUCCESS;
431 }
432 
433 /**
434  * Set socket options for a connection.
435  */
436 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection);
437 
438 /** @} */
439 
440 /*
441  * Public Definitions
442  */
443 
gearman_io_set_fd(gearmand_io_st * connection,int fd)444 gearmand_error_t gearman_io_set_fd(gearmand_io_st *connection, int fd)
445 {
446   assert(connection);
447 
448   connection->options.external_fd= true;
449   connection->fd= fd;
450   connection->_state= gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED;
451 
452   return _io_setsockopt(*connection);
453 }
454 
gearman_io_context(const gearmand_io_st * connection)455 gearmand_con_st *gearman_io_context(const gearmand_io_st *connection)
456 {
457   return connection->context;
458 }
459 
gearman_io_send(gearman_server_con_st * con,const gearmand_packet_st * packet,bool flush)460 gearmand_error_t gearman_io_send(gearman_server_con_st *con,
461                                  const gearmand_packet_st *packet, bool flush)
462 {
463   size_t send_size;
464 
465   gearmand_io_st *connection= &con->con;
466 
467   switch (connection->send_state)
468   {
469   case gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE:
470     if (! (packet->options.complete))
471     {
472       gearmand_error("packet not complete");
473       return GEARMAN_INVALID_PACKET;
474     }
475 
476     /* Pack first part of packet, which is everything but the payload. */
477     while (1)
478     {
479       gearmand_error_t ret;
480       send_size= con->protocol->pack(packet,
481                                      con,
482                                      connection->send_buffer +connection->send_buffer_size,
483                                      GEARMAN_SEND_BUFFER_SIZE -connection->send_buffer_size,
484                                      ret);
485       if (ret == GEARMAN_SUCCESS)
486       {
487         connection->send_buffer_size+= send_size;
488         break;
489       }
490       else if (ret == GEARMAN_IGNORE_PACKET)
491       {
492         return GEARMAN_SUCCESS;
493       }
494       else if (ret != GEARMAN_FLUSH_DATA)
495       {
496         return ret;
497       }
498 
499       /* We were asked to flush when the buffer is already flushed! */
500       if (connection->send_buffer_size == 0)
501       {
502         gearmand_error("send buffer too small");
503 
504         return GEARMAN_SEND_BUFFER_TOO_SMALL;
505       }
506 
507       /* Flush buffer now if first part of packet won't fit in. */
508       connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH;
509 
510     case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_PRE_FLUSH:
511       {
512         gearmand_error_t local_ret;
513         if ((local_ret= _connection_flush(con)) != GEARMAN_SUCCESS)
514         {
515           return local_ret;
516         }
517       }
518     }
519 
520     /* Return here if we have no data to send. */
521     if (packet->data_size == 0)
522     {
523       break;
524     }
525 
526     /* If there is any room in the buffer, copy in data. */
527     if (packet->data and (GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
528     {
529       connection->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size;
530       if (connection->send_data_offset > packet->data_size)
531       {
532         connection->send_data_offset= packet->data_size;
533       }
534 
535       memcpy(connection->send_buffer +connection->send_buffer_size,
536              packet->data,
537              connection->send_data_offset);
538       connection->send_buffer_size+= connection->send_data_offset;
539 
540       /* Return if all data fit in the send buffer. */
541       if (connection->send_data_offset == packet->data_size)
542       {
543         connection->send_data_offset= 0;
544         break;
545       }
546     }
547 
548     /* Flush buffer now so we can start writing directly from data buffer. */
549     connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH;
550 
551   case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FORCE_FLUSH:
552     {
553       gearmand_error_t local_ret;
554       if ((local_ret= _connection_flush(con)) != GEARMAN_SUCCESS)
555       {
556         return local_ret;
557       }
558     }
559 
560     connection->send_data_size= packet->data_size;
561 
562     /* If this is NULL, then ?? function will be used. */
563     if (packet->data == NULL)
564     {
565       connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
566       return GEARMAN_SUCCESS;
567     }
568 
569     /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
570     connection->send_buffer_size= packet->data_size - connection->send_data_offset;
571     if (connection->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
572     {
573       memcpy(connection->send_buffer,
574              packet->data + connection->send_data_offset,
575              connection->send_buffer_size);
576       connection->send_data_size= 0;
577       connection->send_data_offset= 0;
578       break;
579     }
580 
581     connection->send_buffer_ptr= const_cast<char *>(packet->data) + connection->send_data_offset;
582     connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA;
583 
584   case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH:
585   case gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH_DATA:
586     {
587       gearmand_error_t local_ret= _connection_flush(con);
588       if (local_ret == GEARMAN_SUCCESS and
589           connection->options.close_after_flush)
590       {
591         _connection_close(connection);
592         local_ret= GEARMAN_LOST_CONNECTION;
593         gearmand_debug("closing connection after flush by request");
594       }
595       return local_ret;
596     }
597   }
598 
599   if (flush)
600   {
601     connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_UNIVERSAL_FLUSH;
602     gearmand_error_t local_ret= _connection_flush(con);
603     if (local_ret == GEARMAN_SUCCESS and connection->options.close_after_flush)
604     {
605       _connection_close(connection);
606       local_ret= GEARMAN_LOST_CONNECTION;
607       gearmand_debug("closing connection after flush by request");
608     }
609     return local_ret;
610   }
611 
612   connection->send_state= gearmand_io_st::GEARMAND_CON_SEND_STATE_NONE;
613   return GEARMAN_SUCCESS;
614 }
615 
616 #ifndef __INTEL_COMPILER
617 #pragma GCC diagnostic ignored "-Wold-style-cast"
618 #endif
619 
620 
gearman_io_recv(gearman_server_con_st * con,bool recv_data)621 gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
622 {
623   gearmand_io_st *connection= &con->con;
624   gearmand_packet_st *packet= &(con->packet->packet);
625 
626   switch (connection->recv_state)
627   {
628   case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE:
629     if (connection->_state != gearmand_io_st::GEARMAND_CON_UNIVERSAL_CONNECTED)
630     {
631       gearmand_error("not connected");
632       return GEARMAN_NOT_CONNECTED;
633     }
634 
635     connection->recv_packet= packet;
636     // The options being passed in are just defaults.
637     connection->recv_packet->reset(GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
638 
639     connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ;
640 
641   case gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_READ:
642     while (1)
643     {
644       gearmand_error_t ret;
645 
646       if (connection->recv_buffer_size > 0)
647       {
648         assert(con->protocol);
649         size_t recv_size= con->protocol->unpack(connection->recv_packet,
650                                                 con,
651                                                 connection->recv_buffer_ptr,
652                                                 connection->recv_buffer_size, ret);
653         connection->recv_buffer_ptr+= recv_size;
654         connection->recv_buffer_size-= recv_size;
655         if (gearmand_success(ret))
656         {
657           break;
658         }
659         else if (ret != GEARMAN_IO_WAIT)
660         {
661           gearmand_gerror_warn("protocol failure, closing connection", ret);
662           _connection_close(connection);
663           return ret;
664         }
665       }
666 
667       /* Shift buffer contents if needed. */
668       if (connection->recv_buffer_size > 0)
669       {
670         memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
671       }
672       connection->recv_buffer_ptr= connection->recv_buffer;
673 
674       size_t recv_size= _connection_read(con, connection->recv_buffer + connection->recv_buffer_size,
675 					 GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret);
676       if (gearmand_failed(ret))
677       {
678         // GEARMAN_LOST_CONNECTION is not worth a warning, clients/workers just
679         // drop connections for close.
680         if (ret != GEARMAN_LOST_CONNECTION)
681         {
682           gearmand_gerror_warn("Failed while in _connection_read()", ret);
683         }
684         return ret;
685       }
686       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "read %lu bytes", (unsigned long)recv_size);
687 
688       connection->recv_buffer_size+= recv_size;
689     }
690 
691     if (packet->data_size == 0)
692     {
693       connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
694       break;
695     }
696 
697     connection->recv_data_size= packet->data_size;
698 
699     if (not recv_data)
700     {
701       connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
702       break;
703     }
704 
705     packet->data= static_cast<char *>(malloc(packet->data_size));
706     if (not packet->data)
707     {
708       // Server up the memory error first, in case _connection_close()
709       // creates any.
710       gearmand_merror("malloc", char, packet->data_size);
711       _connection_close(connection);
712       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
713     }
714 
715     packet->options.free_data= true;
716     connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA;
717 
718   case gearmand_io_st::GEARMAND_CON_RECV_STATE_READ_DATA:
719     while (connection->recv_data_size)
720     {
721       gearmand_error_t ret;
722       ret= gearmand_connection_recv_data(con,
723                                          ((uint8_t *)(packet->data)) +
724                                          connection->recv_data_offset,
725                                          packet->data_size -
726                                          connection->recv_data_offset);
727       if (gearmand_failed(ret))
728       {
729         return ret;
730       }
731     }
732 
733     connection->recv_state= gearmand_io_st::GEARMAND_CON_RECV_UNIVERSAL_NONE;
734     break;
735   }
736 
737   packet= connection->recv_packet;
738   connection->recv_packet= NULL;
739 
740   return GEARMAN_SUCCESS;
741 }
742 
gearmand_io_set_events(gearman_server_con_st * con,short events)743 gearmand_error_t gearmand_io_set_events(gearman_server_con_st *con, short events)
744 {
745   gearmand_io_st *connection= &con->con;
746 
747   if ((connection->events | events) == connection->events)
748   {
749     return GEARMAN_SUCCESS;
750   }
751 
752   connection->events|= events;
753 
754   if (connection->universal->event_watch_fn)
755   {
756     gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
757                                                                 (void *)connection->universal->event_watch_context);
758     if (gearmand_failed(ret))
759     {
760       gearmand_gerror_warn("event watch failed, closing connection", ret);
761       _connection_close(connection);
762       return ret;
763     }
764   }
765 
766   return GEARMAN_SUCCESS;
767 }
768 
gearmand_io_set_revents(gearman_server_con_st * con,short revents)769 gearmand_error_t gearmand_io_set_revents(gearman_server_con_st *con, short revents)
770 {
771   gearmand_io_st *connection= &con->con;
772 
773   if (revents != 0)
774   {
775     connection->options.ready= true;
776     GEARMAN_LIST_ADD(connection->universal->ready_con, connection, ready_);
777   }
778 
779   connection->revents= revents;
780 
781   /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
782     forever until another POLLIN state change. This is much more efficient
783     than removing POLLOUT on every state change since some external polling
784     mechanisms need to use a system call to change flags (like Linux epoll). */
785   if (revents & POLLOUT && !(connection->events & POLLOUT) &&
786       connection->universal->event_watch_fn != NULL)
787   {
788     gearmand_error_t ret= connection->universal->event_watch_fn(connection, connection->events,
789                                                                 (void *)connection->universal->event_watch_context);
790     if (gearmand_failed(ret))
791     {
792       gearmand_gerror_warn("event watch failed, closing connection", ret);
793       _connection_close(connection);
794       return ret;
795     }
796   }
797 
798   connection->events&= (short)~revents;
799 
800   return GEARMAN_SUCCESS;
801 }
802 
803 /*
804  * Static Definitions
805  */
806 
_io_setsockopt(gearmand_io_st & connection)807 static gearmand_error_t _io_setsockopt(gearmand_io_st &connection)
808 {
809   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "setsockopt() %d", connection.fd);
810   {
811     int setting= 1;
812     if (setsockopt(connection.fd, IPPROTO_TCP, TCP_NODELAY, &setting, (socklen_t)sizeof(int)) and errno != EOPNOTSUPP)
813     {
814       return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
815     }
816   }
817 
818   {
819     struct linger linger;
820     linger.l_onoff= 1;
821     linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
822     if (setsockopt(connection.fd, SOL_SOCKET, SO_LINGER, &linger, (socklen_t)sizeof(struct linger)))
823     {
824       return gearmand_perror(errno, "setsockopt(SO_LINGER)");
825     }
826   }
827 
828 #if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
829   {
830     int setting= 1;
831 
832     // This is not considered a fatal error
833     if (setsockopt(connection.fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&setting, sizeof(int)))
834     {
835       gearmand_perror(errno, "setsockopt(SO_NOSIGPIPE)");
836     }
837   }
838 #endif
839 
840   if (0)
841   {
842     struct timeval waittime;
843     waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
844     waittime.tv_usec= 0;
845     if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
846     {
847       return gearmand_perror(errno, "setsockopt(SO_SNDTIMEO)");
848     }
849 
850     if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVTIMEO, &waittime, (socklen_t)sizeof(struct timeval)) and errno != ENOPROTOOPT)
851     {
852       return gearmand_perror(errno, "setsockopt(SO_RCVTIMEO)");
853     }
854   }
855 
856   if (0)
857   {
858     int setting= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
859     if (setsockopt(connection.fd, SOL_SOCKET, SO_SNDBUF, &setting, (socklen_t)sizeof(int)))
860     {
861       return gearmand_perror(errno, "setsockopr(SO_SNDBUF)");
862     }
863 
864     setting= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
865     if (setsockopt(connection.fd, SOL_SOCKET, SO_RCVBUF, &setting, (socklen_t)sizeof(int)))
866     {
867       return gearmand_perror(errno, "setsockopt(SO_RCVBUF)");
868     }
869   }
870 
871   if (SOCK_NONBLOCK == 0)
872   {
873     int flags;
874     do
875     {
876       flags= fcntl(connection.fd, F_GETFL, 0);
877     } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
878 
879     if (flags == -1)
880     {
881       return gearmand_perror(errno, "fcntl(F_GETFL)");
882     }
883     else if ((flags & O_NONBLOCK) == 0)
884     {
885       int retval;
886       do
887       {
888         retval= fcntl(connection.fd, F_SETFL, flags | O_NONBLOCK);
889       } while (retval == -1 and (errno == EINTR or errno == EAGAIN));
890 
891       if (retval == -1)
892       {
893         return gearmand_perror(errno, "fcntl(F_SETFL)");
894       }
895     }
896   }
897 
898   return GEARMAN_SUCCESS;
899 }
900 
gearmand_sockfd_close(int & sockfd)901 void gearmand_sockfd_close(int& sockfd)
902 {
903   if (sockfd == INVALID_SOCKET)
904   {
905     gearmand_error("gearmand_sockfd_close() called with an invalid socket");
906     return;
907   }
908 
909   /* in case of death shutdown to avoid blocking at close() */
910   if (shutdown(sockfd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
911   {
912     gearmand_perror(errno, "shutdown");
913     assert(errno != ENOTSOCK);
914   }
915   else if (closesocket(sockfd) == SOCKET_ERROR)
916   {
917     gearmand_perror(errno, "close");
918   }
919 
920   sockfd= INVALID_SOCKET;
921 }
922 
gearmand_pipe_close(int & pipefd)923 void gearmand_pipe_close(int& pipefd)
924 {
925   if (pipefd == INVALID_SOCKET)
926   {
927     gearmand_error("gearmand_pipe_close() called with an invalid socket");
928     return;
929   }
930 
931   if (closesocket(pipefd) == SOCKET_ERROR)
932   {
933     gearmand_perror(errno, "close");
934   }
935 
936   pipefd= -1;
937 }
938