1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 /**
40 * @file
41 * @brief Gearmand Definitions
42 */
43
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46
47 #include <cerrno>
48 #include <netdb.h>
49 #include <sys/socket.h>
50 #include <sys/types.h>
51 #include <sys/utsname.h>
52 #include <unistd.h>
53
54 #include <set>
55 #include <string>
56 #include <vector>
57
58 #include <libgearman-server/gearmand.h>
59
60 #include <libgearman-server/struct/port.h>
61 #include <libgearman-server/plugins.h>
62 #include <libgearman-server/timer.h>
63
64 #include "util/memory.h"
65 using namespace org::tangent;
66
67 using namespace gearmand;
68
69 #ifndef SOCK_NONBLOCK
70 # define SOCK_NONBLOCK 0
71 #endif
72
73 /*
74 * Private declarations
75 */
76
77 /**
78 * @addtogroup gearmand_private Private Gearman Daemon Functions
79 * @ingroup gearmand
80 * @{
81 */
82
83 static gearmand_error_t _listen_init(gearmand_st *gearmand);
84 static void _listen_close(gearmand_st *gearmand);
85 static gearmand_error_t _listen_watch(gearmand_st *gearmand);
86 static void _listen_clear(gearmand_st *gearmand);
87 static void _listen_event(int fd, short events, void *arg);
88
89 static gearmand_error_t _wakeup_init(gearmand_st *gearmand);
90 static void _wakeup_close(gearmand_st *gearmand);
91 static gearmand_error_t _wakeup_watch(gearmand_st *gearmand);
92 static void _wakeup_clear(gearmand_st *gearmand);
93 static void _wakeup_event(int fd, short events, void *arg);
94
95 static gearmand_error_t _watch_events(gearmand_st *gearmand);
96 static void _clear_events(gearmand_st *gearmand);
97 static void _close_events(gearmand_st *gearmand);
98
99 static bool gearman_server_create(gearman_server_st& server,
100 const uint32_t job_retries,
101 const char *job_handle_prefix,
102 uint8_t worker_wakeup,
103 bool round_robin,
104 uint32_t hashtable_buckets);
105 static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
106 void *context, const gearmand_verbose_t verbose);
107
108
gearman_server_free(gearman_server_st & server)109 static void gearman_server_free(gearman_server_st& server)
110 {
111 /* All threads should be cleaned up before calling this. */
112 assert(server.thread_list == NULL);
113
114 for (uint32_t key= 0; key < server.hashtable_buckets; key++)
115 {
116 while (server.job_hash[key] != NULL)
117 {
118 gearman_server_job_free(server.job_hash[key]);
119 }
120 }
121
122 for (uint32_t function_key= 0; function_key < GEARMAND_DEFAULT_HASH_SIZE;
123 function_key++)
124 {
125 while(server.function_hash[function_key] != NULL)
126 {
127 gearman_server_function_free(&server, server.function_hash[function_key]);
128 }
129 }
130
131 while (server.free_packet_list != NULL)
132 {
133 gearman_server_packet_st *packet= server.free_packet_list;
134 server.free_packet_list= packet->next;
135 delete packet;
136 }
137
138 while (server.free_job_list != NULL)
139 {
140 gearman_server_job_st* job= server.free_job_list;
141 server.free_job_list= job->next;
142 delete job;
143 }
144
145 while (server.free_client_list != NULL)
146 {
147 gearman_server_client_st* client= server.free_client_list;
148 server.free_client_list= client->con_next;
149 delete client;
150 }
151
152 while (server.free_worker_list != NULL)
153 {
154 gearman_server_worker_st* worker= server.free_worker_list;
155 server.free_worker_list= worker->con_next;
156 delete worker;
157 }
158
159 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION");
160 if (server.queue_version == QUEUE_VERSION_CLASS)
161 {
162 delete server.queue.object;
163 assert(server.queue.functions == NULL);
164 }
165 else if (server.queue_version == QUEUE_VERSION_FUNCTION)
166 {
167 delete server.queue.functions;
168 assert(server.queue.object == NULL);
169 }
170 else
171 {
172 gearmand_debug("Unknown queue type in removal");
173 }
174
175 free(server.job_hash);
176 free(server.unique_hash);
177 free(server.function_hash);
178 }
179
180 /** @} */
181
182 #ifndef __INTEL_COMPILER
183 #pragma GCC diagnostic ignored "-Wold-style-cast"
184 #endif
185
186 /*
187 * Public definitions
188 */
189
190 static gearmand_st *_global_gearmand= NULL;
191
Gearmand(void)192 gearmand_st *Gearmand(void)
193 {
194 if (!_global_gearmand)
195 {
196 gearmand_error("Gearmand() was called before it was allocated");
197 assert_msg(false, "Gearmand() was called before it was allocated");
198 }
199 assert(_global_gearmand);
200 return _global_gearmand;
201 }
202
gearmand_create(const char * host_arg,uint32_t threads_arg,int backlog_arg,const uint32_t job_retries,const char * job_handle_prefix,uint8_t worker_wakeup,gearmand_log_fn * log_function,void * log_context,const gearmand_verbose_t verbose_arg,bool round_robin,bool exceptions_,uint32_t hashtable_buckets)203 gearmand_st *gearmand_create(const char *host_arg,
204 uint32_t threads_arg,
205 int backlog_arg,
206 const uint32_t job_retries,
207 const char *job_handle_prefix,
208 uint8_t worker_wakeup,
209 gearmand_log_fn *log_function, void *log_context, const gearmand_verbose_t verbose_arg,
210 bool round_robin,
211 bool exceptions_,
212 uint32_t hashtable_buckets)
213 {
214 assert(_global_gearmand == NULL);
215 if (_global_gearmand)
216 {
217 gearmand_error("You have called gearmand_create() twice within your application.");
218 _exit(EXIT_FAILURE);
219 }
220
221 gearmand_st* gearmand= new (std::nothrow) gearmand_st(host_arg, threads_arg, backlog_arg, verbose_arg, exceptions_);
222 if (gearmand == NULL)
223 {
224 gearmand_perror(errno, "Failed to new() gearmand_st");
225 return NULL;
226 }
227 _global_gearmand= gearmand;
228
229 if (gearman_server_create(gearmand->server, job_retries,
230 job_handle_prefix, worker_wakeup,
231 round_robin, hashtable_buckets) == false)
232 {
233 delete gearmand;
234 _global_gearmand= NULL;
235 return NULL;
236 }
237
238 gearmand_set_log_fn(gearmand, log_function, log_context, verbose_arg);
239
240 return gearmand;
241 }
242
gearmand_free(gearmand_st * gearmand)243 void gearmand_free(gearmand_st *gearmand)
244 {
245 if (gearmand)
246 {
247 _close_events(gearmand);
248
249 if (gearmand->threads > 0)
250 {
251 gearmand_debug("Shutting down all threads");
252 }
253
254 while (gearmand->thread_list != NULL)
255 {
256 gearmand_thread_free(gearmand->thread_list);
257 }
258
259 while (gearmand->free_dcon_list != NULL)
260 {
261 gearmand_con_st* dcon= gearmand->free_dcon_list;
262 gearmand->free_dcon_list= dcon->next;
263 delete dcon;
264 }
265
266 if (gearmand->base != NULL)
267 {
268 event_base_free(gearmand->base);
269 gearmand->base= NULL;
270 }
271
272 gearman_server_free(gearmand->server);
273
274 gearmand_info("Shutdown complete");
275
276 delete gearmand;
277 }
278 }
279
gearmand_set_log_fn(gearmand_st * gearmand,gearmand_log_fn * function,void * context,const gearmand_verbose_t verbose)280 static void gearmand_set_log_fn(gearmand_st *gearmand, gearmand_log_fn *function,
281 void *context, const gearmand_verbose_t verbose)
282 {
283 gearmand->log_fn= function;
284 gearmand->log_context= context;
285 gearmand->verbose= verbose;
286 }
287
gearmand_exceptions(gearmand_st * gearmand)288 bool gearmand_exceptions(gearmand_st *gearmand)
289 {
290 return gearmand->exceptions();
291 }
292
gearmand_port_add(gearmand_st * gearmand,const char * port,gearmand_connection_add_fn * function)293 gearmand_error_t gearmand_port_add(gearmand_st *gearmand, const char *port,
294 gearmand_connection_add_fn *function)
295 {
296 gearmand->_port_list.resize(gearmand->_port_list.size() +1);
297
298 strncpy(gearmand->_port_list.back().port, port, NI_MAXSERV);
299 gearmand->_port_list.back().add_fn= function;
300
301 return GEARMAN_SUCCESS;
302 }
303
gearmand_server(gearmand_st * gearmand)304 gearman_server_st *gearmand_server(gearmand_st *gearmand)
305 {
306 return &gearmand->server;
307 }
308
gearmand_run(gearmand_st * gearmand)309 gearmand_error_t gearmand_run(gearmand_st *gearmand)
310 {
311 libgearman::server::Epoch epoch;
312
313 /* Initialize server components. */
314 if (gearmand->base == NULL)
315 {
316 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up(%lu), verbose set to %s",
317 (unsigned long)(getpid()),
318 gearmand_verbose_name(gearmand->verbose));
319
320 if (gearmand->threads > 0)
321 {
322 /* Set the number of free connection structures each thread should keep
323 around before the main thread is forced to take them. We compute this
324 here so we don't need to on every new connection. */
325 gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
326 gearmand->threads) / 2);
327 }
328
329 gearmand->base= static_cast<struct event_base *>(event_base_new());
330 if (gearmand->base == NULL)
331 {
332 gearmand_fatal("event_base_new(NULL)");
333 return GEARMAN_EVENT;
334 }
335
336 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Method for libevent: %s", event_base_get_method(gearmand->base));
337
338 gearmand->ret= _listen_init(gearmand);
339 if (gearmand->ret != GEARMAN_SUCCESS)
340 {
341 return gearmand->ret;
342 }
343
344 gearmand->ret= _wakeup_init(gearmand);
345 if (gearmand->ret != GEARMAN_SUCCESS)
346 {
347 return gearmand->ret;
348 }
349
350 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Creating %u threads", gearmand->threads);
351
352 /* If we have 0 threads we still need to create a fake one for context. */
353 uint32_t x= 0;
354 do
355 {
356 gearmand->ret= gearmand_thread_create(gearmand);
357 if (gearmand->ret != GEARMAN_SUCCESS)
358 return gearmand->ret;
359 x++;
360 }
361 while (x < gearmand->threads);
362
363 gearmand_debug("replaying queue: begin");
364 gearmand->ret= gearman_server_queue_replay(gearmand->server);
365 if (gearmand_failed(gearmand->ret))
366 {
367 return gearmand_gerror("failed to reload queue", gearmand->ret);
368 }
369 gearmand_debug("replaying queue: end");
370 }
371
372 gearmand->ret= _watch_events(gearmand);
373 if (gearmand_failed(gearmand->ret))
374 {
375 return gearmand->ret;
376 }
377
378 gearmand_debug("Entering main event loop");
379
380 if (event_base_loop(gearmand->base, 0) == -1)
381 {
382 gearmand_fatal("event_base_loop(-1)");
383 return GEARMAN_EVENT;
384 }
385
386 gearmand_debug("Exited main event loop");
387
388 return gearmand->ret;
389 }
390
gearmand_wakeup(gearmand_st * gearmand,gearmand_wakeup_t wakeup)391 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
392 {
393 /*
394 If this fails, there is not much we can really do. This should never fail though if the main gearmand thread is still active.
395 */
396 if (gearmand->wakeup_fd[1] != -1)
397 {
398 int limit= 5;
399 while (--limit) // limit is for EINTR
400 {
401 ssize_t written;
402 uint8_t buffer= wakeup;
403 if ((written= write(gearmand->wakeup_fd[1], &buffer, 1)) != 1)
404 {
405 if (written < 0)
406 {
407 switch (errno)
408 {
409 case EINTR:
410 continue;
411
412 default:
413 break;
414 }
415
416 gearmand_perror(errno, gearmand_strwakeup(wakeup));
417 }
418 else
419 {
420 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
421 "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
422 }
423 }
424
425 return;
426 }
427 }
428 }
429
430
431 /*
432 * Private definitions
433 */
434
set_socket(int & fd,struct addrinfo * addrinfo_next)435 gearmand_error_t set_socket(int& fd, struct addrinfo *addrinfo_next)
436 {
437 /* Call to socket() can fail for some getaddrinfo results, try another. */
438 fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
439 addrinfo_next->ai_protocol);
440 if (fd == -1)
441 {
442 return gearmand_perror(errno, "socket()");
443 }
444
445 #ifdef IPV6_V6ONLY
446 {
447 int flags= 1;
448 if (addrinfo_next->ai_family == AF_INET6)
449 {
450 flags= 1;
451 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &flags, sizeof(flags)) == -1)
452 {
453 return gearmand_perror(errno, "setsockopt(IPV6_V6ONLY)");
454 }
455 }
456 }
457 #endif
458
459 if (0) // Add in when we have server working as a library again.
460 {
461 if (FD_CLOEXEC)
462 {
463 int flags;
464 do
465 {
466 flags= fcntl(fd, F_GETFD, 0);
467 } while (flags == -1 and (errno == EINTR or errno == EAGAIN));
468
469 if (flags != -1)
470 {
471 int rval;
472 do
473 {
474 rval= fcntl (fd, F_SETFD, flags | FD_CLOEXEC);
475 } while (rval == -1 && (errno == EINTR or errno == EAGAIN));
476 // we currently ignore the case where rval is -1
477 }
478 }
479 }
480
481 {
482 int flags= 1;
483 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) == -1)
484 {
485 return gearmand_perror(errno, "setsockopt(SO_REUSEADDR)");
486 }
487 }
488
489 {
490 int flags= 1;
491 if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) == -1)
492 {
493 return gearmand_perror(errno, "setsockopt(SO_KEEPALIVE)");
494 }
495 }
496
497 {
498 struct linger ling= {0, 0};
499 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) == -1)
500 {
501 return gearmand_perror(errno, "setsockopt(SO_LINGER)");
502 }
503 }
504
505 {
506 int flags= 1;
507 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) == -1)
508 {
509 return gearmand_perror(errno, "setsockopt(TCP_NODELAY)");
510 }
511 }
512
513 return GEARMAN_SUCCESS;
514 }
515
516 static const uint32_t bind_timeout= 20; // Number is not special, but look at INFO messages if you decide to change it.
517
518 typedef std::pair<std::string, std::string> host_port_t;
519
_listen_init(gearmand_st * gearmand)520 static gearmand_error_t _listen_init(gearmand_st *gearmand)
521 {
522 for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
523 {
524 struct addrinfo hints;
525 struct addrinfo *addrinfo;
526
527 gearmand_port_st *port= &gearmand->_port_list[x];
528
529 memset(&hints, 0, sizeof(struct addrinfo));
530 hints.ai_flags= AI_PASSIVE;
531 hints.ai_socktype= SOCK_STREAM;
532
533 {
534 int ret= getaddrinfo(gearmand->host, port->port, &hints, &addrinfo);
535 if (ret != 0)
536 {
537 char buffer[1024];
538
539 int length= snprintf(buffer, sizeof(buffer), "%s:%s", gearmand->host ? gearmand->host : "<any>", port->port);
540 if (length <= 0 or size_t(length) >= sizeof(buffer))
541 {
542 buffer[0]= 0;
543 }
544 return gearmand_gai_error(buffer, ret);
545 }
546 }
547
548 std::set<host_port_t> unique_hosts;
549 for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
550 addrinfo_next= addrinfo_next->ai_next)
551 {
552 char host[NI_MAXHOST];
553
554 {
555 int ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
556 NI_MAXHOST, port->port, NI_MAXSERV,
557 NI_NUMERICHOST | NI_NUMERICSERV);
558 if (ret != 0)
559 {
560 gearmand_gai_error("getaddrinfo", ret);
561 strncpy(host, "-", sizeof(host));
562 strncpy(port->port, "-", sizeof(port->port));
563 }
564 }
565
566 std::string host_string(host);
567 std::string port_string(port->port);
568 host_port_t check= std::make_pair(host_string, port_string);
569 if (unique_hosts.find(check) != unique_hosts.end())
570 {
571 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Already listening on %s:%s", host, port->port);
572 continue;
573 }
574 unique_hosts.insert(check);
575
576 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Trying to listen on %s:%s", host, port->port);
577
578 /*
579 @note logic for this pulled from Drizzle.
580
581 Sometimes the port is not released fast enough when stopping and
582 restarting the server. This happens quite often with the test suite
583 on busy Linux systems. Retry to bind the address at these intervals:
584 Sleep intervals: 1, 2, 4, 6, 9, 13, 17, 22, ...
585 Retry at second: 1, 3, 7, 13, 22, 35, 52, 74, ...
586 Limit the sequence by drizzled_bind_timeout.
587 */
588 uint32_t waited;
589 uint32_t this_wait;
590 uint32_t retry;
591 int ret= -1;
592 int fd;
593 for (waited= 0, retry= 1; ; retry++, waited+= this_wait)
594 {
595 {
596 gearmand_error_t socket_ret;
597 if (gearmand_failed(socket_ret= set_socket(fd, addrinfo_next)))
598 {
599 gearmand_sockfd_close(fd);
600 return socket_ret;
601 }
602 }
603
604 errno= 0;
605 if ((ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen)) == 0)
606 {
607 // Success
608 break;
609 }
610 // Protect our error
611 ret= errno;
612 gearmand_sockfd_close(fd);
613
614 if (waited >= bind_timeout)
615 {
616 return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Timeout occured when calling bind() for %s:%s", host, port->port);
617 }
618
619 if (ret != EADDRINUSE)
620 {
621 return gearmand_perror(ret, "bind");
622 }
623
624 this_wait= retry * retry / 3 + 1;
625
626 // We are in single user threads, so strerror() is fine.
627 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u + %u >= %u",
628 strerror(ret), host, port->port,
629 waited, this_wait, bind_timeout);
630
631 struct timespec requested;
632 requested.tv_sec= this_wait;
633 requested.tv_nsec= 0;
634
635 nanosleep(&requested, NULL);
636 }
637
638 if (listen(fd, gearmand->backlog) == -1)
639 {
640 gearmand_perror(errno, "listen");
641
642 gearmand_sockfd_close(fd);
643
644 return GEARMAN_ERRNO;
645 }
646
647 // Scoping note for eventual transformation
648 {
649 int* fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
650 if (fd_list == NULL)
651 {
652 gearmand_perror(errno, "realloc");
653
654 gearmand_sockfd_close(fd);
655
656 return GEARMAN_ERRNO;
657 }
658
659 port->listen_fd= fd_list;
660 }
661
662 port->listen_fd[port->listen_count]= fd;
663 port->listen_count++;
664
665 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Listening on %s:%s (%d)", host, port->port, fd);
666 }
667
668 freeaddrinfo(addrinfo);
669
670 /* Report last socket() error if we couldn't find an address to bind. */
671 if (port->listen_fd == NULL)
672 {
673 return gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Could not bind/listen to any addresses");
674 }
675
676 assert(port->listen_event == NULL);
677 port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
678 if (port->listen_event == NULL)
679 {
680 return gearmand_merror("malloc", struct event, port->listen_count);
681 }
682
683 for (uint32_t y= 0; y < port->listen_count; ++y)
684 {
685 event_set(&(port->listen_event[y]), port->listen_fd[y], EV_READ | EV_PERSIST, _listen_event, port);
686
687 if (event_base_set(gearmand->base, &(port->listen_event[y])) == -1)
688 {
689 return gearmand_perror(errno, "event_base_set()");
690 }
691 }
692 }
693
694 return GEARMAN_SUCCESS;
695 }
696
_listen_close(gearmand_st * gearmand)697 static void _listen_close(gearmand_st *gearmand)
698 {
699 _listen_clear(gearmand);
700
701 for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
702 {
703 for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; ++y)
704 {
705 if (gearmand->_port_list[x].listen_fd[y] >= 0)
706 {
707 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Closing listening socket (%d)", gearmand->_port_list[x].listen_fd[y]);
708 gearmand_sockfd_close(gearmand->_port_list[x].listen_fd[y]);
709 gearmand->_port_list[x].listen_fd[y]= -1;
710 }
711 }
712 }
713 }
714
_listen_watch(gearmand_st * gearmand)715 static gearmand_error_t _listen_watch(gearmand_st *gearmand)
716 {
717 if (gearmand->is_listen_event)
718 {
719 return GEARMAN_SUCCESS;
720 }
721
722 for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
723 {
724 for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
725 {
726 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Adding event for listening socket (%d)",
727 gearmand->_port_list[x].listen_fd[y]);
728
729 if (event_add(&(gearmand->_port_list[x].listen_event[y]), NULL) < 0)
730 {
731 gearmand_perror(errno, "event_add");
732 return GEARMAN_EVENT;
733 }
734 }
735 }
736
737 gearmand->is_listen_event= true;
738 return GEARMAN_SUCCESS;
739 }
740
_listen_clear(gearmand_st * gearmand)741 static void _listen_clear(gearmand_st *gearmand)
742 {
743 if (gearmand->is_listen_event)
744 {
745 for (uint32_t x= 0; x < gearmand->_port_list.size(); ++x)
746 {
747 for (uint32_t y= 0; y < gearmand->_port_list[x].listen_count; y++)
748 {
749 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
750 "Clearing event for listening socket (%d)",
751 gearmand->_port_list[x].listen_fd[y]);
752
753 if (event_del(&(gearmand->_port_list[x].listen_event[y])) == -1)
754 {
755 gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
756 assert_msg(false, "We tried to event_del() an event which no longer existed");
757 }
758 }
759 }
760
761 gearmand->is_listen_event= false;
762 }
763 }
764
_listen_event(int event_fd,short events,void * arg)765 static void _listen_event(int event_fd, short events __attribute__ ((unused)), void *arg)
766 {
767 gearmand_port_st *port= (gearmand_port_st *)arg;
768 struct sockaddr sa;
769
770 socklen_t sa_len= sizeof(sa);
771 #if defined(HAVE_ACCEPT4) && HAVE_ACCEPT4
772 int fd= accept4(event_fd, &sa, &sa_len, SOCK_NONBLOCK); // SOCK_NONBLOCK);
773 #else
774 int fd= accept(event_fd, &sa, &sa_len);
775 #endif
776
777 if (fd == -1)
778 {
779 int local_error= errno;
780
781 switch (local_error)
782 {
783 case EINTR:
784 return;
785
786 case ECONNABORTED:
787 case EMFILE:
788 gearmand_perror(local_error, "accept");
789 return;
790
791 default:
792 break;
793 }
794
795 _clear_events(Gearmand());
796 Gearmand()->ret= gearmand_perror(local_error, "accept");
797 return;
798 }
799 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "accept() %d", fd);
800
801 /*
802 Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
803 */
804 char host[NI_MAXHOST];
805 char port_str[NI_MAXSERV];
806 int error= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
807 NI_NUMERICHOST | NI_NUMERICSERV);
808 if (error != 0)
809 {
810 gearmand_gai_error("getnameinfo", error);
811 strncpy(host, "-", sizeof(host));
812 strncpy(port_str, "-", sizeof(port_str));
813 }
814
815 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Accepted connection from %s:%s", host, port_str);
816
817 gearmand_error_t ret= gearmand_con_create(Gearmand(), fd, host, port_str, port->add_fn);
818 if (ret == GEARMAN_MEMORY_ALLOCATION_FAILURE)
819 {
820 gearmand_sockfd_close(fd);
821 return;
822 }
823 else if (ret != GEARMAN_SUCCESS)
824 {
825 Gearmand()->ret= ret;
826 _clear_events(Gearmand());
827 }
828 }
829
_wakeup_init(gearmand_st * gearmand)830 static gearmand_error_t _wakeup_init(gearmand_st *gearmand)
831 {
832 gearmand_debug("Creating wakeup pipe");
833
834 #if defined(HAVE_PIPE2) && HAVE_PIPE2
835 if (pipe2(gearmand->wakeup_fd, O_NONBLOCK) == -1)
836 {
837 return gearmand_fatal_perror(errno, "pipe2(gearmand->wakeup_fd)");
838 }
839 #else
840 if (pipe(gearmand->wakeup_fd) == -1)
841 {
842 return gearmand_fatal_perror(errno, "pipe(gearmand->wakeup_fd)");
843 }
844
845 int returned_flags;
846 if ((returned_flags= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0)) == -1)
847 {
848 return gearmand_fatal_perror(errno, "fcntl:F_GETFL");
849 }
850
851 if (fcntl(gearmand->wakeup_fd[0], F_SETFL, returned_flags | O_NONBLOCK) == -1)
852 {
853 return gearmand_fatal_perror(errno, "fcntl(F_SETFL)");
854 }
855 #endif
856
857 event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
858 EV_READ | EV_PERSIST, _wakeup_event, gearmand);
859 event_base_set(gearmand->base, &(gearmand->wakeup_event));
860
861 return GEARMAN_SUCCESS;
862 }
863
_wakeup_close(gearmand_st * gearmand)864 static void _wakeup_close(gearmand_st *gearmand)
865 {
866 _wakeup_clear(gearmand);
867
868 if (gearmand->wakeup_fd[0] >= 0)
869 {
870 gearmand_debug("Closing wakeup pipe");
871 gearmand_pipe_close(gearmand->wakeup_fd[0]);
872 gearmand->wakeup_fd[0]= -1;
873 gearmand_pipe_close(gearmand->wakeup_fd[1]);
874 gearmand->wakeup_fd[1]= -1;
875 }
876 }
877
_wakeup_watch(gearmand_st * gearmand)878 static gearmand_error_t _wakeup_watch(gearmand_st *gearmand)
879 {
880 if (gearmand->is_wakeup_event)
881 {
882 return GEARMAN_SUCCESS;
883 }
884
885 gearmand_debug("Adding event for wakeup pipe");
886
887 if (event_add(&(gearmand->wakeup_event), NULL) < 0)
888 {
889 gearmand_perror(errno, "event_add");
890 return GEARMAN_EVENT;
891 }
892
893 gearmand->is_wakeup_event= true;
894 return GEARMAN_SUCCESS;
895 }
896
_wakeup_clear(gearmand_st * gearmand)897 static void _wakeup_clear(gearmand_st *gearmand)
898 {
899 if (gearmand->is_wakeup_event)
900 {
901 gearmand_debug("Clearing event for wakeup pipe");
902 if (event_del(&(gearmand->wakeup_event)) < 0)
903 {
904 gearmand_perror(errno, "We tried to event_del() an event which no longer existed");
905 assert_msg(false, "We tried to event_del() an event which no longer existed");
906 }
907 gearmand->is_wakeup_event= false;
908 }
909 }
910
_wakeup_event(int fd,short,void * arg)911 static void _wakeup_event(int fd, short, void *arg)
912 {
913 gearmand_st *gearmand= (gearmand_st *)arg;
914
915 while (1)
916 {
917 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
918 ssize_t ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
919 if (ret == 0)
920 {
921 _clear_events(gearmand);
922 gearmand_fatal("read(EOF)");
923 gearmand->ret= GEARMAN_PIPE_EOF;
924 return;
925 }
926 else if (ret == -1)
927 {
928 int local_error= errno;
929 if (local_error == EINTR)
930 {
931 continue;
932 }
933
934 if (local_error == EAGAIN)
935 {
936 break;
937 }
938
939 _clear_events(gearmand);
940 gearmand->ret= gearmand_perror(local_error, "_wakeup_event:read");
941 return;
942 }
943
944 for (ssize_t x= 0; x < ret; ++x)
945 {
946 switch ((gearmand_wakeup_t)buffer[x])
947 {
948 case GEARMAND_WAKEUP_PAUSE:
949 gearmand_debug("Received PAUSE wakeup event");
950 _clear_events(gearmand);
951 gearmand->ret= GEARMAN_PAUSE;
952 break;
953
954 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
955 gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
956 _listen_close(gearmand);
957
958 for (gearmand_thread_st* thread= gearmand->thread_list;
959 thread != NULL;
960 thread= thread->next)
961 {
962 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
963 }
964
965 gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
966 break;
967
968 case GEARMAND_WAKEUP_SHUTDOWN:
969 gearmand_debug("Received SHUTDOWN wakeup event");
970 _clear_events(gearmand);
971 gearmand->ret= GEARMAN_SHUTDOWN;
972 break;
973
974 case GEARMAND_WAKEUP_CON:
975 case GEARMAND_WAKEUP_RUN:
976 gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
977 _clear_events(gearmand);
978 gearmand->ret= GEARMAN_UNKNOWN_STATE;
979 break;
980 }
981 }
982 }
983 }
984
_watch_events(gearmand_st * gearmand)985 static gearmand_error_t _watch_events(gearmand_st *gearmand)
986 {
987 gearmand_error_t ret= _listen_watch(gearmand);
988 if (ret != GEARMAN_SUCCESS)
989 {
990 return ret;
991 }
992
993 ret= _wakeup_watch(gearmand);
994 if (ret != GEARMAN_SUCCESS)
995 {
996 return ret;
997 }
998
999 return GEARMAN_SUCCESS;
1000 }
1001
_clear_events(gearmand_st * gearmand)1002 static void _clear_events(gearmand_st *gearmand)
1003 {
1004 _listen_clear(gearmand);
1005 _wakeup_clear(gearmand);
1006
1007 /*
1008 If we are not threaded, tell the fake thread to shutdown now to clear
1009 connections. Otherwise we will never exit the libevent loop.
1010 */
1011 if (gearmand->threads == 0 && gearmand->thread_list != NULL)
1012 {
1013 gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
1014 }
1015 }
1016
_close_events(gearmand_st * gearmand)1017 static void _close_events(gearmand_st *gearmand)
1018 {
1019 _listen_close(gearmand);
1020 _wakeup_close(gearmand);
1021 }
1022
1023 /** @} */
1024
1025 /*
1026 * Public Definitions
1027 */
1028
gearmand_version(void)1029 const char *gearmand_version(void)
1030 {
1031 return PACKAGE_VERSION;
1032 }
1033
gearmand_bugreport(void)1034 const char *gearmand_bugreport(void)
1035 {
1036 return PACKAGE_BUGREPORT;
1037 }
1038
gearmand_verbose_name(gearmand_verbose_t verbose)1039 const char *gearmand_verbose_name(gearmand_verbose_t verbose)
1040 {
1041 switch (verbose)
1042 {
1043 case GEARMAND_VERBOSE_FATAL:
1044 return "FATAL";
1045
1046 case GEARMAND_VERBOSE_ALERT:
1047 return "ALERT";
1048
1049 case GEARMAND_VERBOSE_CRITICAL:
1050 return "CRITICAL";
1051
1052 case GEARMAND_VERBOSE_ERROR:
1053 return "ERROR";
1054
1055 case GEARMAND_VERBOSE_WARN:
1056 return "WARNING";
1057
1058 case GEARMAND_VERBOSE_NOTICE:
1059 return "NOTICE";
1060
1061 case GEARMAND_VERBOSE_INFO:
1062 return "INFO";
1063
1064 case GEARMAND_VERBOSE_DEBUG:
1065 return "DEBUG";
1066
1067 default:
1068 break;
1069 }
1070
1071 return "UNKNOWN";
1072 }
1073
gearmand_verbose_check(const char * name,gearmand_verbose_t & level)1074 bool gearmand_verbose_check(const char *name, gearmand_verbose_t& level)
1075 {
1076 bool success= true;
1077 if (strcmp("FATAL", name) == 0)
1078 {
1079 level= GEARMAND_VERBOSE_FATAL;
1080 }
1081 else if (strcmp("ALERT", name) == 0)
1082 {
1083 level= GEARMAND_VERBOSE_ALERT;
1084 }
1085 else if (strcmp("CRITICAL", name) == 0)
1086 {
1087 level= GEARMAND_VERBOSE_CRITICAL;
1088 }
1089 else if (strcmp("ERROR", name) == 0)
1090 {
1091 level= GEARMAND_VERBOSE_ERROR;
1092 }
1093 else if (strcmp("WARNING", name) == 0)
1094 {
1095 level= GEARMAND_VERBOSE_WARN;
1096 }
1097 else if (strcmp("NOTICE", name) == 0)
1098 {
1099 level= GEARMAND_VERBOSE_NOTICE;
1100 }
1101 else if (strcmp("INFO", name) == 0)
1102 {
1103 level= GEARMAND_VERBOSE_INFO;
1104 }
1105 else if (strcmp("DEBUG", name) == 0)
1106 {
1107 level= GEARMAND_VERBOSE_DEBUG;
1108 }
1109 else
1110 {
1111 success= false;
1112 }
1113
1114 return success;
1115 }
1116
gearman_server_create(gearman_server_st & server,const uint32_t job_retries_arg,const char * job_handle_prefix,uint8_t worker_wakeup_arg,bool round_robin_arg,uint32_t hashtable_buckets)1117 static bool gearman_server_create(gearman_server_st& server,
1118 const uint32_t job_retries_arg,
1119 const char *job_handle_prefix,
1120 uint8_t worker_wakeup_arg,
1121 bool round_robin_arg,
1122 uint32_t hashtable_buckets)
1123 {
1124 server.state.queue_startup= false;
1125 server.flags.round_robin= round_robin_arg;
1126 server.flags.threaded= false;
1127 server.shutdown= false;
1128 server.shutdown_graceful= false;
1129 server.proc_wakeup= false;
1130 server.proc_shutdown= false;
1131 server.job_retries= job_retries_arg;
1132 server.worker_wakeup= worker_wakeup_arg;
1133 server.thread_count= 0;
1134 server.free_packet_count= 0;
1135 server.function_count= 0;
1136 server.job_count= 0;
1137 server.unique_count= 0;
1138 server.free_job_count= 0;
1139 server.free_client_count= 0;
1140 server.free_worker_count= 0;
1141 server.thread_list= NULL;
1142 server.free_packet_list= NULL;
1143 server.free_job_list= NULL;
1144 server.free_client_list= NULL;
1145 server.free_worker_list= NULL;
1146
1147 server.queue_version= QUEUE_VERSION_NONE;
1148 server.queue.object= NULL;
1149 server.queue.functions= NULL;
1150
1151 server.function_hash= (gearman_server_function_st **) calloc(GEARMAND_DEFAULT_HASH_SIZE, sizeof(gearman_server_function_st *));
1152 if (server.function_hash == NULL)
1153 {
1154 gearmand_merror("calloc", server.function_hash, GEARMAND_DEFAULT_HASH_SIZE);
1155 return false;
1156 }
1157
1158 server.hashtable_buckets= hashtable_buckets;
1159 server.job_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
1160 if (server.job_hash == NULL)
1161 {
1162 gearmand_merror("calloc", server.job_hash, hashtable_buckets);
1163 return false;
1164 }
1165
1166 server.unique_hash= (gearman_server_job_st **) calloc(hashtable_buckets, sizeof(gearman_server_job_st *));
1167 if (server.unique_hash == NULL)
1168 {
1169 gearmand_merror("calloc", server.unique_hash, hashtable_buckets);
1170 return false;
1171 }
1172
1173 int checked_length= -1;
1174 if (job_handle_prefix)
1175 {
1176 checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "%s", job_handle_prefix);
1177 }
1178 else
1179 {
1180 struct utsname un;
1181 if (uname(&un) == -1)
1182 {
1183 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, errno, "uname(&un) failed");
1184 gearman_server_free(server);
1185 return false;
1186 }
1187 checked_length= snprintf(server.job_handle_prefix, GEARMAND_JOB_HANDLE_SIZE, "H:%s", un.nodename);
1188 }
1189
1190 if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
1191 {
1192 gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Available length %d not enough to store job handle prefix %s",
1193 GEARMAND_JOB_HANDLE_SIZE, server.job_handle_prefix);
1194 gearman_server_free(server);
1195 return false;
1196 }
1197
1198 server.job_handle_count= 1;
1199
1200 return true;
1201 }
1202