1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 /**
40  * @file
41  * @brief Server connection definitions
42  */
43 
#include "gear_config.h"

#include "libgearman-server/common.h"

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
51 
52 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
53                                                   gearmand_error_t *ret);
54 
55 /*
56  * Public definitions
57  */
58 
gearman_server_con_add(gearman_server_thread_st * thread,gearmand_con_st * dcon,gearmand_error_t * ret)59 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t *ret)
60 {
61   gearman_server_con_st *con= _server_con_create(thread, dcon, ret);
62   if (con)
63   {
64     if ((*ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAN_SUCCESS)
65     {
66       gearman_server_con_free(con);
67       return NULL;
68     }
69 
70     *ret= gearmand_io_set_events(con, POLLIN);
71     if (*ret != GEARMAN_SUCCESS)
72     {
73       gearmand_gerror("gearmand_io_set_events", *ret);
74       gearman_server_con_free(con);
75       return NULL;
76     }
77   }
78 
79   return con;
80 }
81 
_server_con_create(gearman_server_thread_st * thread,gearmand_con_st * dcon,gearmand_error_t * ret)82 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread,
83                                                   gearmand_con_st *dcon,
84                                                   gearmand_error_t *ret)
85 {
86   gearman_server_con_st *con;
87 
88   if (thread->free_con_count > 0)
89   {
90     con= thread->free_con_list;
91     GEARMAN_LIST__DEL(thread->free_con, con);
92   }
93   else
94   {
95     con= new (std::nothrow) gearman_server_con_st;
96     if (con == NULL)
97     {
98       *ret= gearmand_perror(errno, "new() build_gearman_server_con_st");
99       return NULL;
100     }
101   }
102 
103   assert(con);
104   if (con == NULL)
105   {
106     gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
107     *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
108     return NULL;
109   }
110 
111   gearmand_connection_options_t options[]= { GEARMAND_CON_MAX };
112   gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
113 
114   con->con.root= con;
115 
116   con->is_sleeping= false;
117   con->is_exceptions= Gearmand()->_exceptions;
118   con->is_dead= false;
119   con->is_cleaned_up = false;
120   con->is_noop_sent= false;
121 
122   con->ret= GEARMAN_SUCCESS;
123   con->io_list= false;
124   con->proc_list= false;
125   con->to_be_freed_list= false;
126   con->proc_removed= false;
127   con->io_packet_count= 0;
128   con->proc_packet_count= 0;
129   con->worker_count= 0;
130   con->client_count= 0;
131   con->thread= thread;
132   con->packet= NULL;
133   con->io_packet_list= NULL;
134   con->io_packet_end= NULL;
135   con->proc_packet_list= NULL;
136   con->proc_packet_end= NULL;
137   con->io_next= NULL;
138   con->io_prev= NULL;
139   con->proc_next= NULL;
140   con->proc_prev= NULL;
141   con->to_be_freed_next= NULL;
142   con->to_be_freed_prev= NULL;
143   con->worker_list= NULL;
144   con->client_list= NULL;
145   con->_host= dcon->host;
146   con->_port= dcon->port;
147   strcpy(con->id, "-");
148   con->timeout_event= NULL;
149 
150   con->protocol= NULL;
151 
152   int error;
153   if ((error= pthread_mutex_lock(&thread->lock)) == 0)
154   {
155     GEARMAN_LIST__ADD(thread->con, con);
156     if ((error= pthread_mutex_unlock(&thread->lock)))
157     {
158       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
159       gearman_server_con_free(con);
160 
161       *ret= GEARMAN_ERRNO;
162       return NULL;
163     }
164   }
165   else
166   {
167     assert(error);
168     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
169     gearman_server_con_free(con);
170 
171     *ret= GEARMAN_ERRNO;
172     return NULL;
173   }
174 
175 
176   return con;
177 }
178 
gearman_server_con_attempt_free(gearman_server_con_st * con)179 void gearman_server_con_attempt_free(gearman_server_con_st *con)
180 {
181   con->_host= NULL;
182   con->_port= NULL;
183 
184   if (Server->flags.threaded)
185   {
186     if (!(con->proc_removed) && !(Server->proc_shutdown))
187     {
188       gearman_server_con_delete_timeout(con);
189       con->is_dead= true;
190       con->is_sleeping= false;
191       con->is_exceptions= Gearmand()->_exceptions;
192       con->is_noop_sent= false;
193       gearman_server_con_proc_add(con);
194     }
195   }
196   else
197   {
198     gearman_server_con_free(con);
199   }
200 }
201 
gearman_server_con_free(gearman_server_con_st * con)202 void gearman_server_con_free(gearman_server_con_st *con)
203 {
204   gearman_server_thread_st *thread= con->thread;
205   gearman_server_packet_st *packet;
206   con->_host= NULL;
207   con->_port= NULL;
208 
209   gearman_server_con_delete_timeout(con);
210 
211   if (con->is_cleaned_up)
212   {
213     gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu is already cleaned-up. returning", con);
214     return;
215   }
216 
217   gearmand_io_free(&(con->con));
218 
219   con->protocol_release();
220 
221   if (con->packet != NULL)
222   {
223     if (&(con->packet->packet) != con->con.recv_packet)
224     {
225       gearmand_packet_free(&(con->packet->packet));
226     }
227 
228     gearman_server_packet_free(con->packet, con->thread, true);
229   }
230 
231   while (con->io_packet_list != NULL)
232   {
233     gearman_server_io_packet_remove(con);
234   }
235 
236   while (con->proc_packet_list != NULL)
237   {
238     packet= gearman_server_proc_packet_remove(con);
239     gearmand_packet_free(&(packet->packet));
240     gearman_server_packet_free(packet, con->thread, true);
241   }
242 
243   gearman_server_con_free_workers(con);
244 
245   while (con->client_list != NULL)
246   {
247     gearman_server_client_free(con->client_list);
248   }
249 
250   if (con->timeout_event != NULL)
251   {
252     event_del(con->timeout_event);
253   }
254 
255   if (con->proc_list)
256   {
257     gearman_server_con_proc_remove(con);
258   }
259 
260   if (con->io_list)
261   {
262     gearman_server_con_io_remove(con);
263   }
264 
265   int lock_error;
266   if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
267   {
268     GEARMAN_LIST__DEL(con->thread->con, con);
269     if ((lock_error= pthread_mutex_unlock(&thread->lock)))
270     {
271       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
272     }
273   }
274   else
275   {
276     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
277   }
278   assert(lock_error == 0);
279 
280   if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
281   {
282     GEARMAN_LIST__ADD(thread->free_con, con);
283 
284     con->is_cleaned_up = true;
285     return;
286   }
287 
288   delete con;
289 }
290 
gearman_server_con_con(gearman_server_con_st * con)291 gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
292 {
293   assert(con);
294   return &con->con;
295 }
296 
gearman_server_con_data(gearman_server_con_st * con)297 gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
298 {
299   assert(con);
300   return gearman_io_context(&(con->con));
301 }
302 
gearman_server_con_id(gearman_server_con_st * con)303 const char *gearman_server_con_id(gearman_server_con_st *con)
304 {
305   assert(con);
306   return con->id;
307 }
308 
gearman_server_con_set_id(gearman_server_con_st * con,char * id,size_t size)309 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
310                                size_t size)
311 {
312   if (size >= GEARMAN_SERVER_CON_ID_SIZE)
313   {
314     size= GEARMAN_SERVER_CON_ID_SIZE - 1;
315   }
316 
317   memcpy(con->id, id, size);
318   con->id[size]= 0;
319 }
320 
gearman_server_con_free_worker(gearman_server_con_st * con,char * function_name,size_t function_name_size)321 void gearman_server_con_free_worker(gearman_server_con_st *con,
322                                     char *function_name,
323                                     size_t function_name_size)
324 {
325   gearman_server_worker_st *worker= con->worker_list;
326   gearman_server_worker_st *prev_worker= NULL;
327 
328   while (worker != NULL)
329   {
330     if (worker->function->function_name_size == function_name_size &&
331         !memcmp(worker->function->function_name, function_name,
332                 function_name_size))
333     {
334       gearman_server_worker_free(worker);
335 
336       /* Set worker to the last kept worker, or the beginning of the list. */
337       if (prev_worker == NULL)
338       {
339         worker= con->worker_list;
340       }
341       else
342       {
343         worker= prev_worker;
344       }
345     }
346     else
347     {
348       /* Save this so we don't need to scan the list again if one is removed. */
349       prev_worker= worker;
350       worker= worker->con_next;
351     }
352   }
353 }
354 
gearman_server_con_free_workers(gearman_server_con_st * con)355 void gearman_server_con_free_workers(gearman_server_con_st *con)
356 {
357   while (con->worker_list != NULL)
358   {
359     gearman_server_worker_free(con->worker_list);
360   }
361 }
362 
gearman_server_con_to_be_freed_add(gearman_server_con_st * con)363 void gearman_server_con_to_be_freed_add(gearman_server_con_st *con)
364 {
365   int lock_error;
366   if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
367   {
368     if (con->to_be_freed_list)
369     {
370       if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
371       {
372         gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
373       }
374       assert(lock_error == 0);
375       return;
376     }
377   }
378   else
379   {
380     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
381   }
382   assert(lock_error == 0);
383 
384   GEARMAN_LIST_ADD(con->thread->to_be_freed, con, to_be_freed_);
385   con->to_be_freed_list = true;
386 
387   /* Looks funny, but need to check to_be_freed_count locked, but call run unlocked. */
388   if (con->thread->to_be_freed_count == 1 && con->thread->run_fn)
389   {
390     if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
391     {
392       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
393     }
394     assert(lock_error == 0);
395     (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
396   }
397   else
398   {
399     if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
400     {
401       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
402     }
403     assert(lock_error == 0);
404   }
405 }
406 
gearman_server_con_to_be_freed_next(gearman_server_thread_st * thread)407 gearman_server_con_st * gearman_server_con_to_be_freed_next(gearman_server_thread_st *thread)
408 {
409   gearman_server_con_st *con;
410 
411   if (thread->to_be_freed_list == NULL)
412   {
413     return NULL;
414   }
415 
416   int lock_error;
417   if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
418   {
419     con= thread->to_be_freed_list;
420     while (con != NULL)
421     {
422       GEARMAN_LIST_DEL(thread->to_be_freed, con, to_be_freed_);
423         if (con->to_be_freed_list)
424         {
425           con->to_be_freed_list= false;
426           break;
427         }
428       con= thread->to_be_freed_list;
429     }
430 
431     if ((lock_error= pthread_mutex_unlock(&thread->lock)))
432     {
433       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
434     }
435 
436     return con;
437   }
438   else
439   {
440     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
441   }
442   assert(lock_error == 0);
443 
444   return NULL;
445 }
446 
gearman_server_con_io_add(gearman_server_con_st * con)447 void gearman_server_con_io_add(gearman_server_con_st *con)
448 {
449   if (con->io_list)
450   {
451     return;
452   }
453 
454   int lock_error;
455   if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
456   {
457     GEARMAN_LIST_ADD(con->thread->io, con, io_);
458     con->io_list= true;
459 
460     /* Looks funny, but need to check io_count locked, but call run unlocked. */
461     if (con->thread->io_count == 1 && con->thread->run_fn)
462     {
463       if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
464       {
465         gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
466       }
467 
468       (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
469     }
470     else
471     {
472       if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
473       {
474         gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
475       }
476     }
477   }
478   else
479   {
480     gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "pthread_mutex_lock(%d), programming error, please report", lock_error);
481   }
482 
483   assert(lock_error == 0);
484 }
485 
gearman_server_con_io_remove(gearman_server_con_st * con)486 void gearman_server_con_io_remove(gearman_server_con_st *con)
487 {
488   int lock_error;
489   if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
490   {
491     if (con->io_list)
492     {
493       GEARMAN_LIST_DEL(con->thread->io, con, io_);
494       con->io_list= false;
495     }
496     if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
497     {
498       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
499     }
500   }
501   else
502   {
503     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
504   }
505 
506   assert(lock_error == 0);
507 }
508 
509 gearman_server_con_st *
gearman_server_con_io_next(gearman_server_thread_st * thread)510 gearman_server_con_io_next(gearman_server_thread_st *thread)
511 {
512   gearman_server_con_st *con= thread->io_list;
513 
514   if (con)
515   {
516     gearman_server_con_io_remove(con);
517   }
518 
519   return con;
520 }
521 
gearman_server_con_proc_add(gearman_server_con_st * con)522 void gearman_server_con_proc_add(gearman_server_con_st *con)
523 {
524   if (con->proc_list)
525   {
526     return;
527   }
528 
529   int pthread_error;
530   if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
531   {
532     GEARMAN_LIST_ADD(con->thread->proc, con, proc_);
533     con->proc_list= true;
534     if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
535     {
536       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
537     }
538 
539     if (! (Server->proc_shutdown) && !(Server->proc_wakeup))
540     {
541       if ((pthread_error= pthread_mutex_lock(&(Server->proc_lock))) == 0)
542       {
543         Server->proc_wakeup= true;
544         if ((pthread_error= pthread_cond_signal(&(Server->proc_cond))) == 0)
545         {
546           if ((pthread_error= pthread_mutex_unlock(&(Server->proc_lock))))
547           {
548             gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
549           }
550         }
551         else
552         {
553           gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_cond_signal");
554         }
555       }
556       else
557       {
558         gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
559       }
560     }
561   }
562   else
563   {
564     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
565   }
566 }
567 
gearman_server_con_proc_remove(gearman_server_con_st * con)568 void gearman_server_con_proc_remove(gearman_server_con_st *con)
569 {
570   int pthread_error;
571 
572   if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
573   {
574     if (con->proc_list)
575     {
576       GEARMAN_LIST_DEL(con->thread->proc, con, proc_);
577       con->proc_list= false;
578     }
579 
580     if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
581     {
582       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
583     }
584   }
585   else
586   {
587     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
588   }
589 }
590 
591 gearman_server_con_st *
gearman_server_con_proc_next(gearman_server_thread_st * thread)592 gearman_server_con_proc_next(gearman_server_thread_st *thread)
593 {
594   if (thread->proc_list == NULL)
595   {
596     return NULL;
597   }
598 
599   gearman_server_con_st *con= NULL;
600 
601   int pthread_error;
602   if ((pthread_error= pthread_mutex_lock(&thread->lock)) == 0)
603   {
604     con= thread->proc_list;
605     while (con != NULL)
606     {
607       GEARMAN_LIST_DEL(thread->proc, con, proc_);
608       con->proc_list= false;
609       if (!(con->proc_removed))
610       {
611         break;
612       }
613       con= thread->proc_list;
614     }
615 
616     if ((pthread_error= pthread_mutex_unlock(&thread->lock)))
617     {
618       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
619     }
620   }
621   else
622   {
623     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
624   }
625 
626   return con;
627 }
628 
_server_job_timeout(int fd,short event,void * arg)629 static void _server_job_timeout(int fd, short event, void *arg)
630 {
631   (void)fd;
632   (void)event;
633   gearman_server_job_st *job= (gearman_server_job_st *)arg;
634 
635   /* A timeout has ocurred on a job, re-queue it */
636   gearmand_log_warning(GEARMAN_DEFAULT_LOG_PARAM,
637                        "Worker timeout reached on job, requeueing: %s %s",
638                        job->job_handle, job->unique);
639 
640   gearmand_error_t ret= gearman_server_job_queue(job);
641   if (ret != GEARMAN_SUCCESS)
642   {
643     gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
644                        "Failed trying to requeue job after timeout, job lost: %s %s",
645                        job->job_handle, job->unique);
646     gearman_server_job_free(job);
647   }
648 }
649 
gearman_server_con_add_job_timeout(gearman_server_con_st * con,gearman_server_job_st * job)650 gearmand_error_t gearman_server_con_add_job_timeout(gearman_server_con_st *con, gearman_server_job_st *job)
651 {
652   if (job)
653   {
654     gearman_server_worker_st *worker;
655     for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
656     {
657       /* Assumes the functions are always fetched from the same server structure */
658       if (worker->function == job->function)
659       {
660         break;
661       }
662     }
663 
664     /* It makes no sense to add a timeout to a connection that has no workers for a job */
665     assert(worker);
666     if (worker)
667     {
668       // We treat 0 and -1 as being the same (i.e. no timer)
669       if (worker->timeout > 0)
670       {
671         if (worker->timeout < 1000)
672         {
673           worker->timeout= 1000;
674         }
675 
676         gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Adding timeout on %s for %s (%dl)",
677                            job->function->function_name,
678                            job->job_handle,
679                            worker->timeout);
680         if (con->timeout_event == NULL)
681         {
682           gearmand_con_st *dcon= con->con.context;
683           con->timeout_event= (struct event *)malloc(sizeof(struct event));
684           if (con->timeout_event == NULL)
685           {
686             return gearmand_gerror("creating timeout event", GEARMAN_MEMORY_ALLOCATION_FAILURE);
687           }
688           timeout_set(con->timeout_event, _server_job_timeout, job);
689           event_base_set(dcon->thread->base, con->timeout_event);
690         }
691 
692         /* XXX Right now, if a worker has diff timeouts for functions I think
693           this will overwrite any existing timeouts on that event. One
694           solution to that would be to record the timeout from last time,
695           and only set this one if it is longer than that one. */
696 
697         struct timeval timeout_tv = { 0 , 0 };
698         timeout_tv.tv_sec= worker->timeout;
699         timeout_add(con->timeout_event, &timeout_tv);
700       }
701       else if (con->timeout_event) // Delete the timer if it exists
702       {
703         gearman_server_con_delete_timeout(con);
704       }
705     }
706   }
707 
708   return GEARMAN_SUCCESS;
709 }
710 
gearman_server_con_delete_timeout(gearman_server_con_st * con)711 void gearman_server_con_delete_timeout(gearman_server_con_st *con)
712 {
713   if (con->timeout_event)
714   {
715     timeout_del(con->timeout_event);
716     con->timeout_event= NULL;
717   }
718 }
719 
gearmand_ready(gearmand_connection_list_st * universal)720 gearman_server_con_st *gearmand_ready(gearmand_connection_list_st *universal)
721 {
722   if (universal->ready_con_list)
723   {
724     gearmand_io_st *con= universal->ready_con_list;
725     con->options.ready= false;
726     GEARMAN_LIST_DEL(universal->ready_con, con, ready_);
727     return con->root;
728   }
729 
730   return NULL;
731 }
732