1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 /**
40  * @file
41  * @brief Gearmand Definitions
42  */
43 
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46 
47 #include <cerrno>
48 #include <netdb.h>
49 #include <sys/socket.h>
50 #include <sys/types.h>
51 #include <sys/utsname.h>
52 #include <unistd.h>
53 
54 #include <set>
55 #include <string>
56 #include <vector>
57 
58 #include <libgearman-server/gearmand.h>
59 
60 #include <libgearman-server/struct/port.h>
61 #include <libgearman-server/plugins.h>
62 #include <libgearman-server/timer.h>
63 
64 #include "util/memory.h"
65 using namespace org::tangent;
66 
67 using namespace gearmand;
68 
69 #ifndef SOCK_NONBLOCK
70 # define SOCK_NONBLOCK 0
71 #endif
72 
73 /*
74  * Private declarations
75  */
76 
77 /**
78  * @addtogroup gearmand_private Private Gearman Daemon Functions
79  * @ingroup gearmand
80  * @{
81  */
82 
83 static gearmand_error_t _listen_init(gearmand_st *gearmand);
84 static void _listen_close(gearmand_st *gearmand);
85 static gearmand_error_t _listen_watch(gearmand_st *gearmand);
86 static void _listen_clear(gearmand_st *gearmand);
87 static void _listen_event(int fd, short events, void *arg);
88 
89 static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
90 static void _wakeup_close(gearmand_st *gearmand);
91 static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
92 static void _wakeup_clear(gearmand_st *gearmand);
93 static void _wakeup_event(int fd, short events, void *arg);
94 
95 static gearmand_error_t _watch_events(gearmand_st *gearmand);
96 static void _clear_events(gearmand_st *gearmand);
97 static void _close_events(gearmand_st *gearmand);
98 
99 static bool gearman_server_create(gearman_server_st& server,
100                                   const uint32_t job_retries,
101                                   const char *job_handle_prefix,
102                                   uint8_t worker_wakeup,
103                                   bool round_robin,
104                                   uint32_t hashtable_buckets);
105 static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
106                                 void *context, const gearmand_verbose_t verbose);
107 
108 
gearman_server_free(gearman_server_st & server)109 static void gearman_server_free(gearman_server_st& server)
110 {
111   /* All threads should be cleaned up before calling this. */
112   assert(server.thread_list == NULL);
113 
114   for (uint32_t key= 0; key < server.hashtable_buckets; key++)
115   {
116     while (server.job_hash[key] != NULL)
117     {
118       gearman_server_job_free(server.job_hash[key]);
119     }
120   }
121 
122   for (uint32_t function_key= 0; function_key < GEARMAND_DEFAULT_HASH_SIZE;
123        function_key++)
124   {
125     while(server.function_hash[function_key] != NULL)
126     {
127       gearman_server_function_free(&server, server.function_hash[function_key]);
128     }
129   }
130 
131   while (server.free_packet_list != NULL)
132   {
133     gearman_server_packet_st *packet= server.free_packet_list;
134     server.free_packet_list= packet->next;
135     delete packet;
136   }
137 
138   while (server.free_job_list != NULL)
139   {
140     gearman_server_job_st* job= server.free_job_list;
141     server.free_job_list= job->next;
142     delete job;
143   }
144 
145   while (server.free_client_list != NULL)
146   {
147     gearman_server_client_st* client= server.free_client_list;
148     server.free_client_list= client->con_next;
149     delete client;
150   }
151 
152   while (server.free_worker_list != NULL)
153   {
154     gearman_server_worker_st* worker= server.free_worker_list;
155     server.free_worker_list= worker->con_next;
156     delete worker;
157   }
158 
159   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION");
160   if (server.queue_version == QUEUE_VERSION_CLASS)
161   {
162     delete server.queue.object;
163     assert(server.queue.functions == NULL);
164   }
165   else if (server.queue_version == QUEUE_VERSION_FUNCTION)
166   {
167     delete server.queue.functions;
168     assert(server.queue.object == NULL);
169   }
170   else
171   {
172     gearmand_debug("Unknown queue type in removal");
173   }
174 
175   free(server.job_hash);
176   free(server.unique_hash);
177   free(server.function_hash);
178 }
179 
180 /** @} */
181 
182 #ifndef __INTEL_COMPILER
183 #pragma GCC diagnostic ignored "-Wold-style-cast"
184 #endif
185 
186 /*
187  * Public definitions
188  */
189 
190 static gearmand_st *_global_gearmand= NULL;
191 
Gearmand(void)192 gearmand_st *Gearmand(void)
193 {
194   if (!_global_gearmand)
195   {
196     gearmand_error("Gearmand() was called before it was allocated");
197     assert_msg(false, "Gearmand() was called before it was allocated");
198   }
199   assert(_global_gearmand);
200   return _global_gearmand;
201 }
202 
gearmand_create(const char * host_arg,uint32_t threads_arg,int backlog_arg,const uint32_t job_retries,const char * job_handle_prefix,uint8_t worker_wakeup,gearmand_log_fn * log_function,void * log_context,const gearmand_verbose_t verbose_arg,bool round_robin,bool exceptions_,uint32_t hashtable_buckets)203 gearmand_st *gearmand_create(const char *host_arg,
204                              uint32_t threads_arg,
205                              int backlog_arg,
206                              const uint32_t job_retries,
207                              const char *job_handle_prefix,
208                              uint8_t worker_wakeup,
209                              gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
210                              bool round_robin,
211                              bool exceptions_,
212                              uint32_t hashtable_buckets)
213 {
214   assert(_global_gearmand == NULL);
215   if (_global_gearmand)
216   {
217     gearmand_error("You have called gearmand_create() twice within your application.");
218     _exit(EXIT_FAILURE);
219   }
220 
221   gearmand_st* gearmand= new (std::nothrow) gearmand_st(host_arg, threads_arg, backlog_arg, verbose_arg, exceptions_);
222   if (gearmand == NULL)
223   {
224     gearmand_perror(errno, "Failed to new() gearmand_st");
225     return NULL;
226   }
227   _global_gearmand= gearmand;
228 
229   if (gearman_server_create(gearmand->server, job_retries,
230                             job_handle_prefix, worker_wakeup,
231                             round_robin, hashtable_buckets) == false)
232   {
233     delete gearmand;
234     _global_gearmand= NULL;
235     return NULL;
236   }
237 
238   gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
239 
240   return gearmand;
241 }
242 
gearmand_free(gearmand_st * gearmand)243 void gearmand_free(gearmand_st *gearmand)
244 {
245   if (gearmand)
246   {
247     _close_events(gearmand);
248 
249     if (gearmand->threads > 0)
250     {
251       gearmand_debug("Shutting down all threads");
252     }
253 
254     while (gearmand->thread_list != NULL)
255     {
256       gearmand_thread_free(gearmand->thread_list);
257     }
258 
259     while (gearmand->free_dcon_list != NULL)
260     {
261       gearmand_con_st* dcon= gearmand->free_dcon_list;
262       gearmand->free_dcon_list= dcon->next;
263       delete dcon;
264     }
265 
266     if (gearmand->base != NULL)
267     {
268       event_base_free(gearmand->base);
269       gearmand->base= NULL;
270     }
271 
272     gearman_server_free(gearmand->server);
273 
274     gearmand_info("Shutdown complete");
275 
276     delete gearmand;
277   }
278 }
279 
gearmand_set_log_fn(gearmand_st * gearmand,gearmand_log_fn * function,void * context,const gearmand_verbose_t verbose)280 static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
281                                 void *context, const gearmand_verbose_t verbose)
282 {
283   gearmand->log_fn= function;
284   gearmand->log_context= context;
285   gearmand->verbose= verbose;
286 }
287 
gearmand_exceptions(gearmand_st * gearmand)288 bool gearmand_exceptions(gearmand_st *gearmand)
289 {
290   return gearmand->exceptions();
291 }
292 
gearmand_port_add(gearmand_st * gearmand,const char * port,gearmand_connection_add_fn * function)293 gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
294                                    gearmand_connection_add_fn *function)
295 {
296   gearmand->_port_list.resize(gearmand->_port_list.size() +1);
297 
298   strncpy(gearmand->_port_list.back().port, port, NI_MAXSERV);
299   gearmand->_port_list.back().add_fn= function;
300 
301   return GEARMAN_SUCCESS;
302 }
303 
gearmand_server(gearmand_st * gearmand)304 gearman_server_st *gearmand_server(gearmand_st *gearmand)
305 {
306   return &gearmand->server;
307 }
308 
gearmand_run(gearmand_st * gearmand)309 gearmand_error_t gearmand_run(gearmand_st *gearmand)
310 {
311   libgearman::server::Epoch epoch;
312 
313   /* Initialize server components. */
314   if (gearmand->base == NULL)
315   {
316     gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up(%lu), verbose set to %s",
317                       (unsigned long)(getpid()),
318                       gearmand_verbose_name(gearmand->verbose));
319 
320     if (gearmand->threads > 0)
321     {
322       /* Set the number of free connection structures each thread should keep
323          around before the main thread is forced to take them. We compute this
324          here so we don't need to on every new connection. */
325       gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
326                                               gearmand->threads) / 2);
327     }
328 
329     gearmand->base= static_cast<struct event_base *>(event_base_new());
330     if (gearmand->base == NULL)
331     {
332       gearmand_fatal("event_base_new(NULL)");
333       return GEARMAN_EVENT;
334     }
335 
336     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
337 
338     gearmand->ret= _listen_init(gearmand);
339     if (gearmand->ret != GEARMAN_SUCCESS)
340     {
341       return gearmand->ret;
342     }
343 
344     gearmand->ret= _wakeup_init(gearmand);
345     if (gearmand->ret != GEARMAN_SUCCESS)
346     {
347       return gearmand->ret;
348     }
349 
350     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
351 
352     /* If we have 0 threads we still need to create a fake one for context. */
353     uint32_t x= 0;
354     do
355     {
356       gearmand->ret= gearmand_thread_create(gearmand);
357       if (gearmand->ret != GEARMAN_SUCCESS)
358         return gearmand->ret;
359       x++;
360     }
361     while (x < gearmand->threads);
362 
363     gearmand_debug("replaying queue: begin");
364     gearmand->ret= gearman_server_queue_replay(gearmand->server);
365     if (gearmand_failed(gearmand->ret))
366     {
367       return gearmand_gerror("failed to reload queue", gearmand->ret);
368     }
369     gearmand_debug("replaying queue: end");
370   }
371 
372   gearmand->ret= _watch_events(gearmand);
373   if (gearmand_failed(gearmand->ret))
374   {
375     return gearmand->ret;
376   }
377 
378   gearmand_debug("Entering main event loop");
379 
380   if (event_base_loop(gearmand->base, 0) == -1)
381   {
382     gearmand_fatal("event_base_loop(-1)");
383     return GEARMAN_EVENT;
384   }
385 
386   gearmand_debug("Exited main event loop");
387 
388   return gearmand->ret;
389 }
390 
gearmand_wakeup(gearmand_st * gearmand,gearmand_wakeup_t wakeup)391 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
392 {
393   /*
394     If this fails, there is not much we can really do. This should never fail though if the main gearmand thread is still active.
395   */
396   if (gearmand->wakeup_fd[1] != -1)
397   {
398     int limit= 5;
399     while (--limit)  // limit is for EINTR
400     {
401       ssize_t written;
402       uint8_t buffer= wakeup;
403       if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
404       {
405         if (written < 0)
406         {
407           switch (errno)
408           {
409           case EINTR:
410             continue;
411 
412           default:
413             break;
414           }
415 
416           gearmand_perror(errno, gearmand_strwakeup(wakeup));
417         }
418         else
419         {
420           gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
421                              "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
422         }
423       }
424 
425       return;
426     }
427   }
428 }
429 
430 
431 /*
432  * Private definitions
433  */
434 
set_socket(int & fd,struct addrinfo * addrinfo_next)435 gearmand_error_t set_socket(int& fd, struct addrinfo *addrinfo_next)
436 {
437   /* Call to socket() can fail for some getaddrinfo results, try another. */
438   fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
439              addrinfo_next->ai_protocol);
440   if (fd == -1)
441   {
442     return gearmand_perror(errno, "socket()");
443   }
444 
445 #ifdef IPV6_V6ONLY
446   {
447     int flags= 1;
448     if (addrinfo_next->ai_family == AF_INET6)
449     {
450       flags= 1;
451       if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags)) == -1)
452       {
453         return gearmand_perror(errno, "setsockopt(IPV6_V6ONLY)");
454       }
455     }
456   }
457 #endif
458 
459   if (0) // Add in when we have server working as a library again.
460   {
461     if (FD_CLOEXEC)
462     {
463       int flags;
464       do
465       {
466         flags= fcntl(fd, F_GETFD, 0);
467       } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
468 
469       if (flags != -1)
470       {
471         int rval;
472         do
473         {
474           rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
475         } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
476         // we currently ignore the case where rval is -1
477       }
478     }
479   }
480 
481   {
482     int flags= 1;
483     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) == -1)
484     {
485       return gearmand_perror(errno, "setsockopt(SO_REUSEADDR)");
486     }
487   }
488 
489   {
490     int flags= 1;
491     if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
492     {
493       return gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
494     }
495   }
496 
497   {
498     struct linger ling= {0, 0};
499     if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) == -1)
500     {
501       return gearmand_perror(errno, "setsockopt(SO_LINGER)");
502     }
503   }
504 
505   {
506     int flags= 1;
507     if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) == -1)
508     {
509       return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
510     }
511   }
512 
513   return GEARMAN_SUCCESS;
514 }
515 
516 static const uint32_t bind_timeout= 20; // Number is not special, but look at INFO messages if you decide to change it.
517 
518 typedef std::pair<std::string, std::string> host_port_t;
519 
_listen_init(gearmand_st * gearmand)520 static gearmand_error_t _listen_init(gearmand_st *gearmand)
521 {
522   for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
523   {
524     struct addrinfo hints;
525     struct addrinfo *addrinfo;
526 
527     gearmand_port_st *port= &gearmand->_port_list[x];
528 
529     memset(&hints, 0, sizeof(struct addrinfo));
530     hints.ai_flags= AI_PASSIVE;
531     hints.ai_socktype= SOCK_STREAM;
532 
533     {
534       int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
535       if (ret != 0)
536       {
537         char buffer[1024];
538 
539         int length= snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
540         if (length <= 0 or size_t(length) >= sizeof(buffer))
541         {
542           buffer[0]= 0;
543         }
544         return gearmand_gai_error(buffer, ret);
545       }
546     }
547 
548     std::set<host_port_t> unique_hosts;
549     for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
550          addrinfo_next= addrinfo_next->ai_next)
551     {
552       char host[NI_MAXHOST];
553 
554       {
555         int ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
556                              NI_MAXHOST, port->port, NI_MAXSERV,
557                              NI_NUMERICHOST | NI_NUMERICSERV);
558         if (ret != 0)
559         {
560           gearmand_gai_error("getaddrinfo", ret);
561           strncpy(host, "-", sizeof(host));
562           strncpy(port->port, "-", sizeof(port->port));
563         }
564       }
565 
566       std::string host_string(host);
567       std::string port_string(port->port);
568       host_port_t check= std::make_pair(host_string, port_string);
569       if (unique_hosts.find(check) != unique_hosts.end())
570       {
571         gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
572         continue;
573       }
574       unique_hosts.insert(check);
575 
576       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
577 
578       /*
579         @note logic for this pulled from Drizzle.
580 
581         Sometimes the port is not released fast enough when stopping and
582         restarting the server. This happens quite often with the test suite
583         on busy Linux systems. Retry to bind the address at these intervals:
584         Sleep intervals: 1, 2, 4,  6,  9, 13, 17, 22, ...
585         Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
586         Limit the sequence by drizzled_bind_timeout.
587       */
588       uint32_t waited;
589       uint32_t this_wait;
590       uint32_t retry;
591       int ret= -1;
592       int fd;
593       for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
594       {
595         {
596           gearmand_error_t socket_ret;
597           if (gearmand_failed(socket_ret= set_socket(fd, addrinfo_next)))
598           {
599             gearmand_sockfd_close(fd);
600             return socket_ret;
601           }
602         }
603 
604         errno= 0;
605         if ((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0)
606         {
607           // Success
608           break;
609         }
610         // Protect our error
611         ret= errno;
612         gearmand_sockfd_close(fd);
613 
614         if (waited >= bind_timeout)
615         {
616           return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Timeout occured when calling bind() for %s:%s", host, port->port);
617         }
618 
619         if (ret != EADDRINUSE)
620         {
621           return gearmand_perror(ret, "bind");
622         }
623 
624         this_wait= retry * retry / 3 + 1;
625 
626         // We are in single user threads, so strerror() is fine.
627         gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u + %u >= %u",
628                            strerror(ret), host, port->port,
629                            waited, this_wait, bind_timeout);
630 
631         struct timespec requested;
632         requested.tv_sec= this_wait;
633         requested.tv_nsec= 0;
634 
635         nanosleep(&requested, NULL);
636       }
637 
638       if (listen(fd, gearmand->backlog) == -1)
639       {
640         gearmand_perror(errno, "listen");
641 
642         gearmand_sockfd_close(fd);
643 
644         return GEARMAN_ERRNO;
645       }
646 
647       // Scoping note for eventual transformation
648       {
649         int* fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
650         if (fd_list == NULL)
651         {
652           gearmand_perror(errno, "realloc");
653 
654           gearmand_sockfd_close(fd);
655 
656           return GEARMAN_ERRNO;
657         }
658 
659         port->listen_fd= fd_list;
660       }
661 
662       port->listen_fd[port->listen_count]= fd;
663       port->listen_count++;
664 
665       gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
666     }
667 
668     freeaddrinfo(addrinfo);
669 
670     /* Report last socket() error if we couldn't find an address to bind. */
671     if (port->listen_fd == NULL)
672     {
673       return gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Could not bind/listen to any addresses");
674     }
675 
676     assert(port->listen_event == NULL);
677     port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
678     if (port->listen_event == NULL)
679     {
680       return gearmand_merror("malloc", struct event, port->listen_count);
681     }
682 
683     for (uint32_t y= 0; y < port->listen_count; ++y)
684     {
685       event_set(&(port->listen_event[y]), port->listen_fd[y], EV_READ | EV_PERSIST, _listen_event, port);
686 
687       if (event_base_set(gearmand->base, &(port->listen_event[y])) == -1)
688       {
689         return gearmand_perror(errno, "event_base_set()");
690       }
691     }
692   }
693 
694   return GEARMAN_SUCCESS;
695 }
696 
_listen_close(gearmand_st * gearmand)697 static void _listen_close(gearmand_st *gearmand)
698 {
699   _listen_clear(gearmand);
700 
701   for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
702   {
703     for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; ++y)
704     {
705       if (gearmand->_port_list[x].listen_fd[y] >= 0)
706       {
707         gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->_port_list[x].listen_fd[y]);
708         gearmand_sockfd_close(gearmand->_port_list[x].listen_fd[y]);
709         gearmand->_port_list[x].listen_fd[y]= -1;
710       }
711     }
712   }
713 }
714 
_listen_watch(gearmand_st * gearmand)715 static gearmand_error_t _listen_watch(gearmand_st *gearmand)
716 {
717   if (gearmand->is_listen_event)
718   {
719     return GEARMAN_SUCCESS;
720   }
721 
722   for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
723   {
724     for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
725     {
726       gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
727                         gearmand->_port_list[x].listen_fd[y]);
728 
729       if (event_add(&(gearmand->_port_list[x].listen_event[y]), NULL) < 0)
730       {
731         gearmand_perror(errno, "event_add");
732         return GEARMAN_EVENT;
733       }
734     }
735   }
736 
737   gearmand->is_listen_event= true;
738   return GEARMAN_SUCCESS;
739 }
740 
_listen_clear(gearmand_st * gearmand)741 static void _listen_clear(gearmand_st *gearmand)
742 {
743   if (gearmand->is_listen_event)
744   {
745     for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
746     {
747       for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
748       {
749         gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
750                           "Clearing event for listening socket (%d)",
751                           gearmand->_port_list[x].listen_fd[y]);
752 
753         if (event_del(&(gearmand->_port_list[x].listen_event[y])) == -1)
754         {
755           gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
756           assert_msg(false, "We tried to event_del() an event which no longer existed");
757         }
758       }
759     }
760 
761     gearmand->is_listen_event= false;
762   }
763 }
764 
_listen_event(int event_fd,short events,void * arg)765 static void _listen_event(int event_fd, short events __attribute__ ((unused)), void *arg)
766 {
767   gearmand_port_st *port= (gearmand_port_st *)arg;
768   struct sockaddr sa;
769 
770   socklen_t sa_len= sizeof(sa);
771 #if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
772   int fd= accept4(event_fd, &sa, &sa_len, SOCK_NONBLOCK); //  SOCK_NONBLOCK);
773 #else
774   int fd= accept(event_fd, &sa, &sa_len);
775 #endif
776 
777   if (fd == -1)
778   {
779     int local_error= errno;
780 
781     switch (local_error)
782     {
783     case EINTR:
784       return;
785 
786     case ECONNABORTED:
787     case EMFILE:
788       gearmand_perror(local_error, "accept");
789       return;
790 
791     default:
792       break;
793     }
794 
795     _clear_events(Gearmand());
796     Gearmand()->ret= gearmand_perror(local_error, "accept");
797     return;
798   }
799   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() %d", fd);
800 
801   /*
802     Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
803   */
804   char host[NI_MAXHOST];
805   char port_str[NI_MAXSERV];
806   int error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
807                          NI_NUMERICHOST | NI_NUMERICSERV);
808   if (error != 0)
809   {
810     gearmand_gai_error("getnameinfo", error);
811     strncpy(host, "-", sizeof(host));
812     strncpy(port_str, "-", sizeof(port_str));
813   }
814 
815   gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
816 
817   gearmand_error_t ret= gearmand_con_create(Gearmand(), fd, host, port_str, port->add_fn);
818   if (ret == GEARMAN_MEMORY_ALLOCATION_FAILURE)
819   {
820     gearmand_sockfd_close(fd);
821     return;
822   }
823   else if (ret != GEARMAN_SUCCESS)
824   {
825     Gearmand()->ret= ret;
826     _clear_events(Gearmand());
827   }
828 }
829 
_wakeup_init(gearmand_st * gearmand)830 static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
831 {
832   gearmand_debug("Creating wakeup pipe");
833 
834 #if defined(HAVE_PIPE2) && HAVE_PIPE2
835   if (pipe2(gearmand->wakeup_fd, O_NONBLOCK) == -1)
836   {
837     return gearmand_fatal_perror(errno, "pipe2(gearmand->wakeup_fd)");
838   }
839 #else
840   if (pipe(gearmand->wakeup_fd) == -1)
841   {
842     return gearmand_fatal_perror(errno, "pipe(gearmand->wakeup_fd)");
843   }
844 
845   int returned_flags;
846   if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) == -1)
847   {
848     return gearmand_fatal_perror(errno, "fcntl:F_GETFL");
849   }
850 
851   if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) == -1)
852   {
853     return gearmand_fatal_perror(errno, "fcntl(F_SETFL)");
854   }
855 #endif
856 
857   event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
858             EV_READ | EV_PERSIST, _wakeup_event, gearmand);
859   event_base_set(gearmand->base, &(gearmand->wakeup_event));
860 
861   return GEARMAN_SUCCESS;
862 }
863 
_wakeup_close(gearmand_st * gearmand)864 static void _wakeup_close(gearmand_st *gearmand)
865 {
866   _wakeup_clear(gearmand);
867 
868   if (gearmand->wakeup_fd[0] >= 0)
869   {
870     gearmand_debug("Closing wakeup pipe");
871     gearmand_pipe_close(gearmand->wakeup_fd[0]);
872     gearmand->wakeup_fd[0]= -1;
873     gearmand_pipe_close(gearmand->wakeup_fd[1]);
874     gearmand->wakeup_fd[1]= -1;
875   }
876 }
877 
_wakeup_watch(gearmand_st * gearmand)878 static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
879 {
880   if (gearmand->is_wakeup_event)
881   {
882     return GEARMAN_SUCCESS;
883   }
884 
885   gearmand_debug("Adding event for wakeup pipe");
886 
887   if (event_add(&(gearmand->wakeup_event), NULL) < 0)
888   {
889     gearmand_perror(errno, "event_add");
890     return GEARMAN_EVENT;
891   }
892 
893   gearmand->is_wakeup_event= true;
894   return GEARMAN_SUCCESS;
895 }
896 
_wakeup_clear(gearmand_st * gearmand)897 static void _wakeup_clear(gearmand_st *gearmand)
898 {
899   if (gearmand->is_wakeup_event)
900   {
901     gearmand_debug("Clearing event for wakeup pipe");
902     if (event_del(&(gearmand->wakeup_event)) < 0)
903     {
904       gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
905       assert_msg(false, "We tried to event_del() an event which no longer existed");
906     }
907     gearmand->is_wakeup_event= false;
908   }
909 }
910 
_wakeup_event(int fd,short,void * arg)911 static void _wakeup_event(int fd, short, void *arg)
912 {
913   gearmand_st *gearmand= (gearmand_st *)arg;
914 
915   while (1)
916   {
917     uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
918     ssize_t ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
919     if (ret == 0)
920     {
921       _clear_events(gearmand);
922       gearmand_fatal("read(EOF)");
923       gearmand->ret= GEARMAN_PIPE_EOF;
924       return;
925     }
926     else if (ret == -1)
927     {
928       int local_error= errno;
929       if (local_error == EINTR)
930       {
931         continue;
932       }
933 
934       if (local_error == EAGAIN)
935       {
936         break;
937       }
938 
939       _clear_events(gearmand);
940       gearmand->ret= gearmand_perror(local_error, "_wakeup_event:read");
941       return;
942     }
943 
944     for (ssize_t x= 0; x < ret; ++x)
945     {
946       switch ((gearmand_wakeup_t)buffer[x])
947       {
948       case GEARMAND_WAKEUP_PAUSE:
949         gearmand_debug("Received PAUSE wakeup event");
950         _clear_events(gearmand);
951         gearmand->ret= GEARMAN_PAUSE;
952         break;
953 
954       case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
955         gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
956         _listen_close(gearmand);
957 
958         for (gearmand_thread_st* thread= gearmand->thread_list;
959              thread != NULL;
960              thread= thread->next)
961         {
962           gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
963         }
964 
965         gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
966         break;
967 
968       case GEARMAND_WAKEUP_SHUTDOWN:
969         gearmand_debug("Received SHUTDOWN wakeup event");
970         _clear_events(gearmand);
971         gearmand->ret= GEARMAN_SHUTDOWN;
972         break;
973 
974       case GEARMAND_WAKEUP_CON:
975       case GEARMAND_WAKEUP_RUN:
976         gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
977         _clear_events(gearmand);
978         gearmand->ret= GEARMAN_UNKNOWN_STATE;
979         break;
980       }
981     }
982   }
983 }
984 
_watch_events(gearmand_st * gearmand)985 static gearmand_error_t _watch_events(gearmand_st *gearmand)
986 {
987   gearmand_error_t ret= _listen_watch(gearmand);
988   if (ret != GEARMAN_SUCCESS)
989   {
990     return ret;
991   }
992 
993   ret= _wakeup_watch(gearmand);
994   if (ret != GEARMAN_SUCCESS)
995   {
996     return ret;
997   }
998 
999   return GEARMAN_SUCCESS;
1000 }
1001 
_clear_events(gearmand_st * gearmand)1002 static void _clear_events(gearmand_st *gearmand)
1003 {
1004   _listen_clear(gearmand);
1005   _wakeup_clear(gearmand);
1006 
1007   /*
1008     If we are not threaded, tell the fake thread to shutdown now to clear
1009     connections. Otherwise we will never exit the libevent loop.
1010   */
1011   if (gearmand->threads == 0 && gearmand->thread_list != NULL)
1012   {
1013     gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
1014   }
1015 }
1016 
_close_events(gearmand_st * gearmand)1017 static void _close_events(gearmand_st *gearmand)
1018 {
1019   _listen_close(gearmand);
1020   _wakeup_close(gearmand);
1021 }
1022 
1023 /** @} */
1024 
1025 /*
1026  * Public Definitions
1027  */
1028 
gearmand_version(void)1029 const char *gearmand_version(void)
1030 {
1031     return PACKAGE_VERSION;
1032 }
1033 
gearmand_bugreport(void)1034 const char *gearmand_bugreport(void)
1035 {
1036     return PACKAGE_BUGREPORT;
1037 }
1038 
gearmand_verbose_name(gearmand_verbose_t verbose)1039 const char *gearmand_verbose_name(gearmand_verbose_t verbose)
1040 {
1041   switch (verbose)
1042   {
1043   case GEARMAND_VERBOSE_FATAL:
1044     return "FATAL";
1045 
1046   case GEARMAND_VERBOSE_ALERT:
1047     return "ALERT";
1048 
1049   case GEARMAND_VERBOSE_CRITICAL:
1050     return "CRITICAL";
1051 
1052   case GEARMAND_VERBOSE_ERROR:
1053     return "ERROR";
1054 
1055   case GEARMAND_VERBOSE_WARN:
1056     return "WARNING";
1057 
1058   case GEARMAND_VERBOSE_NOTICE:
1059     return "NOTICE";
1060 
1061   case GEARMAND_VERBOSE_INFO:
1062     return "INFO";
1063 
1064   case GEARMAND_VERBOSE_DEBUG:
1065     return "DEBUG";
1066 
1067   default:
1068     break;
1069   }
1070 
1071   return "UNKNOWN";
1072 }
1073 
gearmand_verbose_check(const char * name,gearmand_verbose_t & level)1074 bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
1075 {
1076   bool success= true;
1077   if (strcmp("FATAL", name) == 0)
1078   {
1079     level= GEARMAND_VERBOSE_FATAL;
1080   }
1081   else if (strcmp("ALERT", name) == 0)
1082   {
1083     level= GEARMAND_VERBOSE_ALERT;
1084   }
1085   else if (strcmp("CRITICAL", name) == 0)
1086   {
1087     level= GEARMAND_VERBOSE_CRITICAL;
1088   }
1089   else if (strcmp("ERROR", name) == 0)
1090   {
1091     level= GEARMAND_VERBOSE_ERROR;
1092   }
1093   else if (strcmp("WARNING", name) == 0)
1094   {
1095     level= GEARMAND_VERBOSE_WARN;
1096   }
1097   else if (strcmp("NOTICE", name) == 0)
1098   {
1099     level= GEARMAND_VERBOSE_NOTICE;
1100   }
1101   else if (strcmp("INFO", name) == 0)
1102   {
1103     level= GEARMAND_VERBOSE_INFO;
1104   }
1105   else if (strcmp("DEBUG", name) == 0)
1106   {
1107     level= GEARMAND_VERBOSE_DEBUG;
1108   }
1109   else
1110   {
1111     success= false;
1112   }
1113 
1114   return success;
1115 }
1116 
gearman_server_create(gearman_server_st & server,const uint32_t job_retries_arg,const char * job_handle_prefix,uint8_t worker_wakeup_arg,bool round_robin_arg,uint32_t hashtable_buckets)1117 static bool gearman_server_create(gearman_server_st& server,
1118                                   const uint32_t job_retries_arg,
1119                                   const char *job_handle_prefix,
1120                                   uint8_t worker_wakeup_arg,
1121                                   bool round_robin_arg,
1122                                   uint32_t hashtable_buckets)
1123 {
1124   server.state.queue_startup= false;
1125   server.flags.round_robin= round_robin_arg;
1126   server.flags.threaded= false;
1127   server.shutdown= false;
1128   server.shutdown_graceful= false;
1129   server.proc_wakeup= false;
1130   server.proc_shutdown= false;
1131   server.job_retries= job_retries_arg;
1132   server.worker_wakeup= worker_wakeup_arg;
1133   server.thread_count= 0;
1134   server.free_packet_count= 0;
1135   server.function_count= 0;
1136   server.job_count= 0;
1137   server.unique_count= 0;
1138   server.free_job_count= 0;
1139   server.free_client_count= 0;
1140   server.free_worker_count= 0;
1141   server.thread_list= NULL;
1142   server.free_packet_list= NULL;
1143   server.free_job_list= NULL;
1144   server.free_client_list= NULL;
1145   server.free_worker_list= NULL;
1146 
1147   server.queue_version= QUEUE_VERSION_NONE;
1148   server.queue.object= NULL;
1149   server.queue.functions= NULL;
1150 
1151   server.function_hash= (gearman_server_function_st **) calloc(GEARMAND_DEFAULT_HASH_SIZE, sizeof(gearman_server_function_st *));
1152   if (server.function_hash == NULL)
1153   {
1154     gearmand_merror("calloc", server.function_hash, GEARMAND_DEFAULT_HASH_SIZE);
1155     return false;
1156   }
1157 
1158   server.hashtable_buckets= hashtable_buckets;
1159   server.job_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
1160   if (server.job_hash == NULL)
1161   {
1162     gearmand_merror("calloc", server.job_hash, hashtable_buckets);
1163     return false;
1164   }
1165 
1166   server.unique_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
1167   if (server.unique_hash == NULL)
1168   {
1169     gearmand_merror("calloc", server.unique_hash, hashtable_buckets);
1170     return false;
1171   }
1172 
1173   int checked_length= -1;
1174   if (job_handle_prefix)
1175   {
1176     checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "%s", job_handle_prefix);
1177   }
1178   else
1179   {
1180     struct utsname un;
1181     if (uname(&un) == -1)
1182     {
1183       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "uname(&un) failed");
1184       gearman_server_free(server);
1185       return false;
1186     }
1187     checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
1188   }
1189 
1190   if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
1191   {
1192     gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Available length %d not enough to store job handle prefix %s",
1193                        GEARMAND_JOB_HANDLE_SIZE, server.job_handle_prefix);
1194     gearman_server_free(server);
1195     return false;
1196   }
1197 
1198   server.job_handle_count= 1;
1199 
1200   return true;
1201 }
1202