1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 /**
40  * @file
41  * @brief Server Thread Definitions
42  */
43 
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46 
47 #include <libgearman/command.h>
48 
49 #ifdef __cplusplus
50 # include <cassert>
51 # include <cerrno>
52 #else
53 # include <assert.h>
54 # include <errno.h>
55 #endif
56 
57 /*
58  * Private declarations
59  */
60 
61 /**
62  * @addtogroup gearman_server_private Private Server Functions
63  * @ingroup gearman_server
64  * @{
65  */
66 
67 /**
68  * Try reading packets for a connection.
69  */
70 static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
71 
72 /**
73  * Flush outgoing packets for a connection.
74  */
75 static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
76 
77 /**
78  * Start processing thread for the server.
79  */
80 static gearmand_error_t _proc_thread_start(gearman_server_st *server);
81 
82 /**
83  * Kill processing thread for the server.
84  */
85 static void _proc_thread_kill(gearman_server_st *server);
86 
87 /** @} */
88 
89 /*
90  * Public definitions
91  */
92 
gearman_server_thread_init(gearman_server_st * server,gearman_server_thread_st * thread,gearman_log_server_fn * log_function,gearmand_thread_st * context,gearmand_event_watch_fn * event_watch)93 bool gearman_server_thread_init(gearman_server_st *server,
94                                 gearman_server_thread_st *thread,
95                                 gearman_log_server_fn *log_function,
96                                 gearmand_thread_st *context,
97                                 gearmand_event_watch_fn *event_watch)
98 {
99   assert(server);
100   assert(thread);
101   if (server->thread_count == 1)
102   {
103     /* The server is going to be multi-threaded, start processing thread. */
104     if (_proc_thread_start(server) != GEARMAN_SUCCESS)
105     {
106       return false;
107     }
108   }
109 
110   thread->con_count= 0;
111   thread->io_count= 0;
112   thread->proc_count= 0;
113   thread->to_be_freed_count= 0;
114   thread->free_con_count= 0;
115   thread->free_packet_count= 0;
116   thread->log_fn= log_function;
117   thread->log_context= context;
118   thread->run_fn= NULL;
119   thread->run_fn_arg= NULL;
120   thread->con_list= NULL;
121   thread->io_list= NULL;
122   thread->proc_list= NULL;
123   thread->free_con_list= NULL;
124   thread->free_packet_list= NULL;
125   thread->to_be_freed_list= NULL;
126 
127   int error;
128   if ((error= pthread_mutex_init(&(thread->lock), NULL)))
129   {
130     gearmand_perror(error, "pthread_mutex_init");
131     return false;
132   }
133 
134   GEARMAN_LIST__ADD(server->thread, thread);
135 
136   thread->gearman= &(thread->gearmand_connection_list_static);
137   thread->gearman->init(event_watch, NULL);
138 
139   return true;
140 }
141 
gearman_server_thread_free(gearman_server_thread_st * thread)142 void gearman_server_thread_free(gearman_server_thread_st *thread)
143 {
144   _proc_thread_kill(Server);
145 
146   while (thread->con_list != NULL)
147   {
148     gearman_server_con_free(thread->con_list);
149   }
150 
151   while (thread->free_con_list != NULL)
152   {
153     gearman_server_con_st *con= thread->free_con_list;
154     thread->free_con_list= con->next;
155     delete con;
156   }
157 
158   while (thread->free_packet_list != NULL)
159   {
160     gearman_server_packet_st *packet= thread->free_packet_list;
161     thread->free_packet_list= packet->next;
162     destroy_gearman_server_packet_st(packet);
163   }
164 
165   if (thread->gearman != NULL)
166   {
167     thread->gearman->list_free();
168   }
169 
170   pthread_mutex_destroy(&(thread->lock));
171 
172   GEARMAN_LIST__DEL(Server->thread, thread);
173 }
174 
gearman_server_thread_set_run(gearman_server_thread_st * thread,gearman_server_thread_run_fn * run_fn,void * run_fn_arg)175 void gearman_server_thread_set_run(gearman_server_thread_st *thread,
176                                    gearman_server_thread_run_fn *run_fn,
177                                    void *run_fn_arg)
178 {
179   thread->run_fn= run_fn;
180   thread->run_fn_arg= run_fn_arg;
181 }
182 
183 gearmand_con_st *
gearman_server_thread_run(gearman_server_thread_st * thread,gearmand_error_t * ret_ptr)184 gearman_server_thread_run(gearman_server_thread_st *thread,
185                           gearmand_error_t *ret_ptr)
186 {
187   /* If we are multi-threaded, we may have packets to flush or connections that
188      should start reading again. */
189   if (Server->flags.threaded)
190   {
191     gearman_server_con_st *server_con;
192 
193     while ((server_con= gearman_server_con_to_be_freed_next(thread)) != NULL)
194     {
195       if (server_con->is_dead && server_con->proc_removed)
196         gearman_server_con_free(server_con);
197       else
198         gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu isn't dead %d or proc removed %d, but is in to_be_freed_list",
199                            server_con, server_con->is_dead, server_con->proc_removed);
200     }
201 
202     while ((server_con= gearman_server_con_io_next(thread)) != NULL)
203     {
204       if (server_con->is_dead)
205       {
206         gearman_server_con_attempt_free(server_con);
207         continue;
208       }
209 
210       if (server_con->ret != GEARMAN_SUCCESS)
211       {
212         *ret_ptr= server_con->ret;
213         return gearman_server_con_data(server_con);
214       }
215 
216       /* See if any outgoing packets were queued. */
217       *ret_ptr= _thread_packet_flush(server_con);
218       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
219       {
220         return gearman_server_con_data(server_con);
221       }
222     }
223   }
224 
225   /* Check for new activity on connections. */
226   {
227     gearman_server_con_st *server_con;
228 
229     while ((server_con= gearmand_ready(thread->gearman)))
230     {
231       /* Try to read new packets. */
232       if (server_con->con.revents & POLLIN)
233       {
234         *ret_ptr= _thread_packet_read(server_con);
235         if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
236           return gearman_server_con_data(server_con);
237       }
238 
239       /* Flush existing outgoing packets. */
240       if (server_con->con.revents & POLLOUT)
241       {
242         *ret_ptr= _thread_packet_flush(server_con);
243         if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
244         {
245           return gearman_server_con_data(server_con);
246         }
247       }
248     }
249   }
250 
251   /* Start flushing new outgoing packets if we are single threaded. */
252   if (! (Server->flags.threaded))
253   {
254     gearman_server_con_st *server_con;
255     while ((server_con= gearman_server_con_io_next(thread)))
256     {
257       *ret_ptr= _thread_packet_flush(server_con);
258       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
259       {
260         return gearman_server_con_data(server_con);
261       }
262     }
263   }
264 
265   /* Check for the two shutdown modes. */
266   if (Server->shutdown)
267   {
268     *ret_ptr= GEARMAN_SHUTDOWN;
269   }
270   else if (Server->shutdown_graceful)
271   {
272     if (Server->job_count == 0)
273     {
274       *ret_ptr= GEARMAN_SHUTDOWN;
275     }
276     else
277     {
278       *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
279     }
280   }
281   else
282   {
283     *ret_ptr= GEARMAN_SUCCESS;
284   }
285 
286   return NULL;
287 }
288 
289 /*
290  * Private definitions
291  */
292 
_thread_packet_read(gearman_server_con_st * con)293 static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
294 {
295   while (1)
296   {
297     if (con->packet == NULL)
298     {
299       if (! (con->packet= gearman_server_packet_create(con->thread, true)))
300       {
301         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
302       }
303     }
304 
305     gearmand_error_t ret;
306     if (gearmand_failed(ret= gearman_io_recv(con, true)))
307     {
308       if (ret == GEARMAN_IO_WAIT)
309       {
310         break;
311       }
312 
313       gearman_server_packet_free(con->packet, con->thread, true);
314       con->packet= NULL;
315       return ret;
316     }
317 
318     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
319                        "Received %s %s:%u",
320                        gearman_command_info(con->packet->packet.command)->name,
321                        con->_host == NULL ? "-" : con->_host,
322                        con->_port == NULL ? "-" : con->_port);
323 
324     /* We read a complete packet. */
325     if (Server->flags.threaded)
326     {
327       /* Multi-threaded, queue for the processing thread to run. */
328       gearman_server_proc_packet_add(con, con->packet);
329       con->packet= NULL;
330     }
331     else
332     {
333       /* Single threaded, run the command here. */
334       gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
335       gearmand_packet_free(&(con->packet->packet));
336       gearman_server_packet_free(con->packet, con->thread, true);
337       con->packet= NULL;
338       if (gearmand_failed(rc))
339       {
340         return rc;
341       }
342     }
343   }
344 
345   return GEARMAN_SUCCESS;
346 }
347 
_thread_packet_flush(gearman_server_con_st * con)348 static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
349 {
350   /* Check to see if we've already tried to avoid excessive system calls. */
351   if (con->con.events & POLLOUT)
352   {
353     return GEARMAN_IO_WAIT;
354   }
355 
356   while (con->io_packet_list)
357   {
358     gearmand_error_t ret= gearman_io_send(con, &(con->io_packet_list->packet),
359                                           con->io_packet_list->next == NULL ? true : false);
360     if (gearmand_failed(ret))
361     {
362       return ret;
363     }
364 
365     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
366                        "Sent %s to %s:%d",
367                        gearman_command_info(con->io_packet_list->packet.command)->name,
368                        con->_host == NULL ? "-" : con->_host,
369                        con->_port == NULL ? "-" : con->_port);
370 
371     gearman_server_io_packet_remove(con);
372   }
373 
374   /* Clear the POLLOUT flag. */
375   return gearmand_io_set_events(con, POLLIN);
376 }
377 
_proc_thread_start(gearman_server_st * server)378 static gearmand_error_t _proc_thread_start(gearman_server_st *server)
379 {
380   int error;
381   if ((error= pthread_mutex_init(&(server->proc_lock), NULL)))
382   {
383     return gearmand_perror(error, "pthread_mutex_init");
384   }
385 
386   if ((error= pthread_cond_init(&(server->proc_cond), NULL)))
387   {
388     return gearmand_perror(error, "pthread_cond_init");
389   }
390 
391   pthread_attr_t attr;
392   if ((error= pthread_attr_init(&attr)))
393   {
394     return gearmand_perror(error, "pthread_attr_init");
395   }
396 
397   if ((error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
398   {
399     (void) pthread_attr_destroy(&attr);
400     return gearmand_perror(error, "pthread_attr_setscope");
401   }
402 
403   if ((error= pthread_create(&(server->proc_id), &attr, _proc, server)))
404   {
405     (void) pthread_attr_destroy(&attr);
406     return gearmand_perror(error, "pthread_create");
407   }
408 
409   if ((error= pthread_attr_destroy(&attr)))
410   {
411     gearmand_perror(error, "pthread_create");
412   }
413 
414   server->flags.threaded= true;
415 
416   return GEARMAN_SUCCESS;
417 }
418 
_proc_thread_kill(gearman_server_st * server)419 static void _proc_thread_kill(gearman_server_st *server)
420 {
421   if (! (server->flags.threaded) || server->proc_shutdown)
422   {
423     return;
424   }
425 
426   server->proc_shutdown= true;
427 
428   /* Signal proc thread to shutdown. */
429   int error;
430   if ((error= pthread_mutex_lock(&(server->proc_lock))) == 0)
431   {
432     if ((error= pthread_cond_signal(&(server->proc_cond))))
433     {
434       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_signal");
435     }
436 
437     if ((error= pthread_mutex_unlock(&(server->proc_lock))))
438     {
439       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
440     }
441   }
442   else
443   {
444     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
445   }
446 
447   /* Wait for the proc thread to exit and then cleanup. */
448   if ((error= pthread_join(server->proc_id, NULL)))
449   {
450     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_join");
451   }
452 
453   if ((error= pthread_cond_destroy(&(server->proc_cond))))
454   {
455     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_destroy");
456   }
457 
458   if ((error= pthread_mutex_destroy(&(server->proc_lock))))
459   {
460     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_destroy");
461   }
462 }
463