1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 /**
40 * @file
41 * @brief Server connection definitions
42 */
43
44 #include "gear_config.h"
45
46 #include "libgearman-server/common.h"
47
48 #include <string.h>
49 #include <errno.h>
50 #include <assert.h>
51
52 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
53 gearmand_error_t *ret);
54
55 /*
56 * Public definitions
57 */
58
gearman_server_con_add(gearman_server_thread_st * thread,gearmand_con_st * dcon,gearmand_error_t * ret)59 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t *ret)
60 {
61 gearman_server_con_st *con= _server_con_create(thread, dcon, ret);
62 if (con)
63 {
64 if ((*ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAN_SUCCESS)
65 {
66 gearman_server_con_free(con);
67 return NULL;
68 }
69
70 *ret= gearmand_io_set_events(con, POLLIN);
71 if (*ret != GEARMAN_SUCCESS)
72 {
73 gearmand_gerror("gearmand_io_set_events", *ret);
74 gearman_server_con_free(con);
75 return NULL;
76 }
77 }
78
79 return con;
80 }
81
_server_con_create(gearman_server_thread_st * thread,gearmand_con_st * dcon,gearmand_error_t * ret)82 static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread,
83 gearmand_con_st *dcon,
84 gearmand_error_t *ret)
85 {
86 gearman_server_con_st *con;
87
88 if (thread->free_con_count > 0)
89 {
90 con= thread->free_con_list;
91 GEARMAN_LIST__DEL(thread->free_con, con);
92 }
93 else
94 {
95 con= new (std::nothrow) gearman_server_con_st;
96 if (con == NULL)
97 {
98 *ret= gearmand_perror(errno, "new() build_gearman_server_con_st");
99 return NULL;
100 }
101 }
102
103 assert(con);
104 if (con == NULL)
105 {
106 gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
107 *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
108 return NULL;
109 }
110
111 gearmand_connection_options_t options[]= { GEARMAND_CON_MAX };
112 gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
113
114 con->con.root= con;
115
116 con->is_sleeping= false;
117 con->is_exceptions= Gearmand()->_exceptions;
118 con->is_dead= false;
119 con->is_cleaned_up = false;
120 con->is_noop_sent= false;
121
122 con->ret= GEARMAN_SUCCESS;
123 con->io_list= false;
124 con->proc_list= false;
125 con->to_be_freed_list= false;
126 con->proc_removed= false;
127 con->io_packet_count= 0;
128 con->proc_packet_count= 0;
129 con->worker_count= 0;
130 con->client_count= 0;
131 con->thread= thread;
132 con->packet= NULL;
133 con->io_packet_list= NULL;
134 con->io_packet_end= NULL;
135 con->proc_packet_list= NULL;
136 con->proc_packet_end= NULL;
137 con->io_next= NULL;
138 con->io_prev= NULL;
139 con->proc_next= NULL;
140 con->proc_prev= NULL;
141 con->to_be_freed_next= NULL;
142 con->to_be_freed_prev= NULL;
143 con->worker_list= NULL;
144 con->client_list= NULL;
145 con->_host= dcon->host;
146 con->_port= dcon->port;
147 strcpy(con->id, "-");
148 con->timeout_event= NULL;
149
150 con->protocol= NULL;
151
152 int error;
153 if ((error= pthread_mutex_lock(&thread->lock)) == 0)
154 {
155 GEARMAN_LIST__ADD(thread->con, con);
156 if ((error= pthread_mutex_unlock(&thread->lock)))
157 {
158 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
159 gearman_server_con_free(con);
160
161 *ret= GEARMAN_ERRNO;
162 return NULL;
163 }
164 }
165 else
166 {
167 assert(error);
168 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
169 gearman_server_con_free(con);
170
171 *ret= GEARMAN_ERRNO;
172 return NULL;
173 }
174
175
176 return con;
177 }
178
gearman_server_con_attempt_free(gearman_server_con_st * con)179 void gearman_server_con_attempt_free(gearman_server_con_st *con)
180 {
181 con->_host= NULL;
182 con->_port= NULL;
183
184 if (Server->flags.threaded)
185 {
186 if (!(con->proc_removed) && !(Server->proc_shutdown))
187 {
188 gearman_server_con_delete_timeout(con);
189 con->is_dead= true;
190 con->is_sleeping= false;
191 con->is_exceptions= Gearmand()->_exceptions;
192 con->is_noop_sent= false;
193 gearman_server_con_proc_add(con);
194 }
195 }
196 else
197 {
198 gearman_server_con_free(con);
199 }
200 }
201
gearman_server_con_free(gearman_server_con_st * con)202 void gearman_server_con_free(gearman_server_con_st *con)
203 {
204 gearman_server_thread_st *thread= con->thread;
205 gearman_server_packet_st *packet;
206 con->_host= NULL;
207 con->_port= NULL;
208
209 gearman_server_con_delete_timeout(con);
210
211 if (con->is_cleaned_up)
212 {
213 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu is already cleaned-up. returning", con);
214 return;
215 }
216
217 gearmand_io_free(&(con->con));
218
219 con->protocol_release();
220
221 if (con->packet != NULL)
222 {
223 if (&(con->packet->packet) != con->con.recv_packet)
224 {
225 gearmand_packet_free(&(con->packet->packet));
226 }
227
228 gearman_server_packet_free(con->packet, con->thread, true);
229 }
230
231 while (con->io_packet_list != NULL)
232 {
233 gearman_server_io_packet_remove(con);
234 }
235
236 while (con->proc_packet_list != NULL)
237 {
238 packet= gearman_server_proc_packet_remove(con);
239 gearmand_packet_free(&(packet->packet));
240 gearman_server_packet_free(packet, con->thread, true);
241 }
242
243 gearman_server_con_free_workers(con);
244
245 while (con->client_list != NULL)
246 {
247 gearman_server_client_free(con->client_list);
248 }
249
250 if (con->timeout_event != NULL)
251 {
252 event_del(con->timeout_event);
253 }
254
255 if (con->proc_list)
256 {
257 gearman_server_con_proc_remove(con);
258 }
259
260 if (con->io_list)
261 {
262 gearman_server_con_io_remove(con);
263 }
264
265 int lock_error;
266 if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
267 {
268 GEARMAN_LIST__DEL(con->thread->con, con);
269 if ((lock_error= pthread_mutex_unlock(&thread->lock)))
270 {
271 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
272 }
273 }
274 else
275 {
276 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
277 }
278 assert(lock_error == 0);
279
280 if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
281 {
282 GEARMAN_LIST__ADD(thread->free_con, con);
283
284 con->is_cleaned_up = true;
285 return;
286 }
287
288 delete con;
289 }
290
gearman_server_con_con(gearman_server_con_st * con)291 gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
292 {
293 assert(con);
294 return &con->con;
295 }
296
gearman_server_con_data(gearman_server_con_st * con)297 gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
298 {
299 assert(con);
300 return gearman_io_context(&(con->con));
301 }
302
gearman_server_con_id(gearman_server_con_st * con)303 const char *gearman_server_con_id(gearman_server_con_st *con)
304 {
305 assert(con);
306 return con->id;
307 }
308
gearman_server_con_set_id(gearman_server_con_st * con,char * id,size_t size)309 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
310 size_t size)
311 {
312 if (size >= GEARMAN_SERVER_CON_ID_SIZE)
313 {
314 size= GEARMAN_SERVER_CON_ID_SIZE - 1;
315 }
316
317 memcpy(con->id, id, size);
318 con->id[size]= 0;
319 }
320
gearman_server_con_free_worker(gearman_server_con_st * con,char * function_name,size_t function_name_size)321 void gearman_server_con_free_worker(gearman_server_con_st *con,
322 char *function_name,
323 size_t function_name_size)
324 {
325 gearman_server_worker_st *worker= con->worker_list;
326 gearman_server_worker_st *prev_worker= NULL;
327
328 while (worker != NULL)
329 {
330 if (worker->function->function_name_size == function_name_size &&
331 !memcmp(worker->function->function_name, function_name,
332 function_name_size))
333 {
334 gearman_server_worker_free(worker);
335
336 /* Set worker to the last kept worker, or the beginning of the list. */
337 if (prev_worker == NULL)
338 {
339 worker= con->worker_list;
340 }
341 else
342 {
343 worker= prev_worker;
344 }
345 }
346 else
347 {
348 /* Save this so we don't need to scan the list again if one is removed. */
349 prev_worker= worker;
350 worker= worker->con_next;
351 }
352 }
353 }
354
gearman_server_con_free_workers(gearman_server_con_st * con)355 void gearman_server_con_free_workers(gearman_server_con_st *con)
356 {
357 while (con->worker_list != NULL)
358 {
359 gearman_server_worker_free(con->worker_list);
360 }
361 }
362
gearman_server_con_to_be_freed_add(gearman_server_con_st * con)363 void gearman_server_con_to_be_freed_add(gearman_server_con_st *con)
364 {
365 int lock_error;
366 if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
367 {
368 if (con->to_be_freed_list)
369 {
370 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
371 {
372 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
373 }
374 assert(lock_error == 0);
375 return;
376 }
377 }
378 else
379 {
380 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
381 }
382 assert(lock_error == 0);
383
384 GEARMAN_LIST_ADD(con->thread->to_be_freed, con, to_be_freed_);
385 con->to_be_freed_list = true;
386
387 /* Looks funny, but need to check to_be_freed_count locked, but call run unlocked. */
388 if (con->thread->to_be_freed_count == 1 && con->thread->run_fn)
389 {
390 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
391 {
392 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
393 }
394 assert(lock_error == 0);
395 (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
396 }
397 else
398 {
399 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
400 {
401 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
402 }
403 assert(lock_error == 0);
404 }
405 }
406
gearman_server_con_to_be_freed_next(gearman_server_thread_st * thread)407 gearman_server_con_st * gearman_server_con_to_be_freed_next(gearman_server_thread_st *thread)
408 {
409 gearman_server_con_st *con;
410
411 if (thread->to_be_freed_list == NULL)
412 {
413 return NULL;
414 }
415
416 int lock_error;
417 if ((lock_error= pthread_mutex_lock(&thread->lock)) == 0)
418 {
419 con= thread->to_be_freed_list;
420 while (con != NULL)
421 {
422 GEARMAN_LIST_DEL(thread->to_be_freed, con, to_be_freed_);
423 if (con->to_be_freed_list)
424 {
425 con->to_be_freed_list= false;
426 break;
427 }
428 con= thread->to_be_freed_list;
429 }
430
431 if ((lock_error= pthread_mutex_unlock(&thread->lock)))
432 {
433 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
434 }
435
436 return con;
437 }
438 else
439 {
440 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
441 }
442 assert(lock_error == 0);
443
444 return NULL;
445 }
446
gearman_server_con_io_add(gearman_server_con_st * con)447 void gearman_server_con_io_add(gearman_server_con_st *con)
448 {
449 if (con->io_list)
450 {
451 return;
452 }
453
454 int lock_error;
455 if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
456 {
457 GEARMAN_LIST_ADD(con->thread->io, con, io_);
458 con->io_list= true;
459
460 /* Looks funny, but need to check io_count locked, but call run unlocked. */
461 if (con->thread->io_count == 1 && con->thread->run_fn)
462 {
463 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
464 {
465 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
466 }
467
468 (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
469 }
470 else
471 {
472 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
473 {
474 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
475 }
476 }
477 }
478 else
479 {
480 gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "pthread_mutex_lock(%d), programming error, please report", lock_error);
481 }
482
483 assert(lock_error == 0);
484 }
485
gearman_server_con_io_remove(gearman_server_con_st * con)486 void gearman_server_con_io_remove(gearman_server_con_st *con)
487 {
488 int lock_error;
489 if ((lock_error= pthread_mutex_lock(&con->thread->lock)) == 0)
490 {
491 if (con->io_list)
492 {
493 GEARMAN_LIST_DEL(con->thread->io, con, io_);
494 con->io_list= false;
495 }
496 if ((lock_error= pthread_mutex_unlock(&con->thread->lock)))
497 {
498 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_unlock");
499 }
500 }
501 else
502 {
503 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, lock_error, "pthread_mutex_lock");
504 }
505
506 assert(lock_error == 0);
507 }
508
509 gearman_server_con_st *
gearman_server_con_io_next(gearman_server_thread_st * thread)510 gearman_server_con_io_next(gearman_server_thread_st *thread)
511 {
512 gearman_server_con_st *con= thread->io_list;
513
514 if (con)
515 {
516 gearman_server_con_io_remove(con);
517 }
518
519 return con;
520 }
521
gearman_server_con_proc_add(gearman_server_con_st * con)522 void gearman_server_con_proc_add(gearman_server_con_st *con)
523 {
524 if (con->proc_list)
525 {
526 return;
527 }
528
529 int pthread_error;
530 if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
531 {
532 GEARMAN_LIST_ADD(con->thread->proc, con, proc_);
533 con->proc_list= true;
534 if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
535 {
536 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
537 }
538
539 if (! (Server->proc_shutdown) && !(Server->proc_wakeup))
540 {
541 if ((pthread_error= pthread_mutex_lock(&(Server->proc_lock))) == 0)
542 {
543 Server->proc_wakeup= true;
544 if ((pthread_error= pthread_cond_signal(&(Server->proc_cond))) == 0)
545 {
546 if ((pthread_error= pthread_mutex_unlock(&(Server->proc_lock))))
547 {
548 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
549 }
550 }
551 else
552 {
553 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_cond_signal");
554 }
555 }
556 else
557 {
558 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
559 }
560 }
561 }
562 else
563 {
564 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
565 }
566 }
567
gearman_server_con_proc_remove(gearman_server_con_st * con)568 void gearman_server_con_proc_remove(gearman_server_con_st *con)
569 {
570 int pthread_error;
571
572 if ((pthread_error= pthread_mutex_lock(&con->thread->lock)) == 0)
573 {
574 if (con->proc_list)
575 {
576 GEARMAN_LIST_DEL(con->thread->proc, con, proc_);
577 con->proc_list= false;
578 }
579
580 if ((pthread_error= pthread_mutex_unlock(&con->thread->lock)))
581 {
582 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
583 }
584 }
585 else
586 {
587 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
588 }
589 }
590
591 gearman_server_con_st *
gearman_server_con_proc_next(gearman_server_thread_st * thread)592 gearman_server_con_proc_next(gearman_server_thread_st *thread)
593 {
594 if (thread->proc_list == NULL)
595 {
596 return NULL;
597 }
598
599 gearman_server_con_st *con= NULL;
600
601 int pthread_error;
602 if ((pthread_error= pthread_mutex_lock(&thread->lock)) == 0)
603 {
604 con= thread->proc_list;
605 while (con != NULL)
606 {
607 GEARMAN_LIST_DEL(thread->proc, con, proc_);
608 con->proc_list= false;
609 if (!(con->proc_removed))
610 {
611 break;
612 }
613 con= thread->proc_list;
614 }
615
616 if ((pthread_error= pthread_mutex_unlock(&thread->lock)))
617 {
618 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_unlock");
619 }
620 }
621 else
622 {
623 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, pthread_error, "pthread_mutex_lock");
624 }
625
626 return con;
627 }
628
_server_job_timeout(int fd,short event,void * arg)629 static void _server_job_timeout(int fd, short event, void *arg)
630 {
631 (void)fd;
632 (void)event;
633 gearman_server_job_st *job= (gearman_server_job_st *)arg;
634
635 /* A timeout has ocurred on a job, re-queue it */
636 gearmand_log_warning(GEARMAN_DEFAULT_LOG_PARAM,
637 "Worker timeout reached on job, requeueing: %s %s",
638 job->job_handle, job->unique);
639
640 gearmand_error_t ret= gearman_server_job_queue(job);
641 if (ret != GEARMAN_SUCCESS)
642 {
643 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
644 "Failed trying to requeue job after timeout, job lost: %s %s",
645 job->job_handle, job->unique);
646 gearman_server_job_free(job);
647 }
648 }
649
gearman_server_con_add_job_timeout(gearman_server_con_st * con,gearman_server_job_st * job)650 gearmand_error_t gearman_server_con_add_job_timeout(gearman_server_con_st *con, gearman_server_job_st *job)
651 {
652 if (job)
653 {
654 gearman_server_worker_st *worker;
655 for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
656 {
657 /* Assumes the functions are always fetched from the same server structure */
658 if (worker->function == job->function)
659 {
660 break;
661 }
662 }
663
664 /* It makes no sense to add a timeout to a connection that has no workers for a job */
665 assert(worker);
666 if (worker)
667 {
668 // We treat 0 and -1 as being the same (i.e. no timer)
669 if (worker->timeout > 0)
670 {
671 if (worker->timeout < 1000)
672 {
673 worker->timeout= 1000;
674 }
675
676 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Adding timeout on %s for %s (%dl)",
677 job->function->function_name,
678 job->job_handle,
679 worker->timeout);
680 if (con->timeout_event == NULL)
681 {
682 gearmand_con_st *dcon= con->con.context;
683 con->timeout_event= (struct event *)malloc(sizeof(struct event));
684 if (con->timeout_event == NULL)
685 {
686 return gearmand_gerror("creating timeout event", GEARMAN_MEMORY_ALLOCATION_FAILURE);
687 }
688 timeout_set(con->timeout_event, _server_job_timeout, job);
689 event_base_set(dcon->thread->base, con->timeout_event);
690 }
691
692 /* XXX Right now, if a worker has diff timeouts for functions I think
693 this will overwrite any existing timeouts on that event. One
694 solution to that would be to record the timeout from last time,
695 and only set this one if it is longer than that one. */
696
697 struct timeval timeout_tv = { 0 , 0 };
698 timeout_tv.tv_sec= worker->timeout;
699 timeout_add(con->timeout_event, &timeout_tv);
700 }
701 else if (con->timeout_event) // Delete the timer if it exists
702 {
703 gearman_server_con_delete_timeout(con);
704 }
705 }
706 }
707
708 return GEARMAN_SUCCESS;
709 }
710
gearman_server_con_delete_timeout(gearman_server_con_st * con)711 void gearman_server_con_delete_timeout(gearman_server_con_st *con)
712 {
713 if (con->timeout_event)
714 {
715 timeout_del(con->timeout_event);
716 con->timeout_event= NULL;
717 }
718 }
719
gearmand_ready(gearmand_connection_list_st * universal)720 gearman_server_con_st *gearmand_ready(gearmand_connection_list_st *universal)
721 {
722 if (universal->ready_con_list)
723 {
724 gearmand_io_st *con= universal->ready_con_list;
725 con->options.ready= false;
726 GEARMAN_LIST_DEL(universal->ready_con, con, ready_);
727 return con->root;
728 }
729
730 return NULL;
731 }
732