1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 #include "gear_config.h"
40 
41 #include <libgearman/common.h>
42 #include <libgearman/function/base.hpp>
43 #include <libgearman/function/make.hpp>
44 
45 #include "libgearman/pipe.h"
46 
47 #include "libgearman/assert.hpp"
48 
49 #include <cstdio>
50 #include <cstdlib>
51 #include <cstring>
52 #include <memory>
53 #include <unistd.h>
54 #include <fcntl.h>
55 #include <cerrno>
56 
57 /**
58  * @addtogroup gearman_worker_static Static Worker Declarations
59  * @ingroup gearman_worker
60  * @{
61  */
62 
_function_exist(gearman_worker_st * worker,const char * function_name,size_t function_length)63 static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
64 {
65   struct _worker_function_st *function;
66 
67   for (function= worker->function_list; function;
68        function= function->next)
69   {
70     if (function_length == function->function_length)
71     {
72       if (memcmp(function_name, function->function_name, function_length) == 0)
73       {
74         break;
75       }
76     }
77   }
78 
79   return function;
80 }
81 
82 /**
83  * Allocate a worker structure.
84  */
85 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
86 
87 /**
88  * Initialize common packets for later use.
89  */
90 static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
91 
92 /**
93  * Callback function used when parsing server lists.
94  */
95 static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context);
96 
97 /**
98  * Allocate and add a function to the register list.
99  */
100 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
101                                                 const char *function_name, size_t function_length,
102                                                 const gearman_function_t &function,
103                                                 uint32_t timeout,
104                                                 void *context);
105 
106 /**
107  * Free a function.
108  */
109 static void _worker_function_free(gearman_worker_st *worker,
110                                   struct _worker_function_st *function);
111 
112 
113 /** @} */
114 
115 /*
116  * Public Definitions
117  */
118 
gearman_worker_create(gearman_worker_st * worker)119 gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
120 {
121   worker= _worker_allocate(worker, false);
122 
123   if (worker == NULL)
124   {
125     return NULL;
126   }
127 
128   if (gearman_failed(_worker_packet_init(worker)))
129   {
130     gearman_worker_free(worker);
131     return NULL;
132   }
133 
134   return worker;
135 }
136 
gearman_worker_clone(gearman_worker_st * worker,const gearman_worker_st * source)137 gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
138                                         const gearman_worker_st *source)
139 {
140   if (source == NULL)
141   {
142     return _worker_allocate(worker, false);
143   }
144 
145   worker= _worker_allocate(worker, true);
146 
147   if (worker == NULL)
148   {
149     return worker;
150   }
151 
152   worker->options.non_blocking= source->options.non_blocking;
153   worker->options.change= source->options.change;
154   worker->options.grab_uniq= source->options.grab_uniq;
155   worker->options.grab_all= source->options.grab_all;
156   worker->options.timeout_return= source->options.timeout_return;
157 
158   gearman_universal_clone(worker->universal, source->universal, true);
159 
160   if (gearman_failed(_worker_packet_init(worker)))
161   {
162     gearman_worker_free(worker);
163     return NULL;
164   }
165 
166   return worker;
167 }
168 
gearman_worker_free(gearman_worker_st * worker)169 void gearman_worker_free(gearman_worker_st *worker)
170 {
171   if (worker == NULL)
172   {
173     return;
174   }
175 
176   if (worker->universal.wakeup_fd[0] != INVALID_SOCKET)
177   {
178     close(worker->universal.wakeup_fd[0]);
179   }
180 
181   if (worker->universal.wakeup_fd[1] != INVALID_SOCKET)
182   {
183     close(worker->universal.wakeup_fd[1]);
184   }
185 
186   gearman_worker_unregister_all(worker);
187 
188   if (worker->options.packet_init)
189   {
190     gearman_packet_free(&worker->grab_job);
191     gearman_packet_free(&worker->pre_sleep);
192   }
193 
194   gearman_job_free(worker->job);
195   worker->work_job= NULL;
196 
197   if (worker->work_result)
198   {
199     gearman_free(worker->universal, worker->work_result);
200   }
201 
202   while (worker->function_list)
203   {
204     _worker_function_free(worker, worker->function_list);
205   }
206 
207   gearman_job_free_all(worker);
208 
209   gearman_universal_free(worker->universal);
210 
211   if (worker->options.allocated)
212   {
213     delete worker;
214   }
215 }
216 
gearman_worker_error(const gearman_worker_st * worker)217 const char *gearman_worker_error(const gearman_worker_st *worker)
218 {
219   if (worker == NULL)
220   {
221     return NULL;
222   }
223 
224   return gearman_universal_error(worker->universal);
225 }
226 
gearman_worker_errno(gearman_worker_st * worker)227 int gearman_worker_errno(gearman_worker_st *worker)
228 {
229   if (worker == NULL)
230   {
231     return EINVAL;
232   }
233 
234   return gearman_universal_errno(worker->universal);
235 }
236 
gearman_worker_options(const gearman_worker_st * worker)237 gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
238 {
239   if (worker == NULL)
240   {
241     return gearman_worker_options_t();
242   }
243 
244   int options;
245   memset(&options, 0, sizeof(gearman_worker_options_t));
246 
247   if (worker->options.allocated)
248     options|= int(GEARMAN_WORKER_ALLOCATED);
249   if (worker->options.non_blocking)
250     options|= int(GEARMAN_WORKER_NON_BLOCKING);
251   if (worker->options.packet_init)
252     options|= int(GEARMAN_WORKER_PACKET_INIT);
253   if (worker->options.change)
254     options|= int(GEARMAN_WORKER_CHANGE);
255   if (worker->options.grab_uniq)
256     options|= int(GEARMAN_WORKER_GRAB_UNIQ);
257   if (worker->options.grab_all)
258     options|= int(GEARMAN_WORKER_GRAB_ALL);
259   if (worker->options.timeout_return)
260     options|= int(GEARMAN_WORKER_TIMEOUT_RETURN);
261 
262   return gearman_worker_options_t(options);
263 }
264 
gearman_worker_set_options(gearman_worker_st * worker,gearman_worker_options_t options)265 void gearman_worker_set_options(gearman_worker_st *worker,
266                                 gearman_worker_options_t options)
267 {
268   if (worker == NULL)
269   {
270     return;
271   }
272 
273   gearman_worker_options_t usable_options[]= {
274     GEARMAN_WORKER_NON_BLOCKING,
275     GEARMAN_WORKER_GRAB_UNIQ,
276     GEARMAN_WORKER_GRAB_ALL,
277     GEARMAN_WORKER_TIMEOUT_RETURN,
278     GEARMAN_WORKER_MAX
279   };
280 
281   gearman_worker_options_t *ptr;
282 
283 
284   for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
285   {
286     if (options & *ptr)
287     {
288       gearman_worker_add_options(worker, *ptr);
289     }
290     else
291     {
292       gearman_worker_remove_options(worker, *ptr);
293     }
294   }
295 }
296 
gearman_worker_add_options(gearman_worker_st * worker,gearman_worker_options_t options)297 void gearman_worker_add_options(gearman_worker_st *worker,
298                                 gearman_worker_options_t options)
299 {
300   if (worker == NULL)
301   {
302     return;
303   }
304 
305   if (options & GEARMAN_WORKER_NON_BLOCKING)
306   {
307     gearman_universal_add_options(worker->universal, GEARMAN_NON_BLOCKING);
308     worker->options.non_blocking= true;
309   }
310 
311   if (options & GEARMAN_WORKER_GRAB_UNIQ)
312   {
313     worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
314     gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
315     (void)(rc);
316     assert(gearman_success(rc));
317     worker->options.grab_uniq= true;
318   }
319 
320   if (options & GEARMAN_WORKER_GRAB_ALL)
321   {
322     worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_ALL;
323     gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
324     (void)(rc);
325     assert(gearman_success(rc));
326     worker->options.grab_all= true;
327   }
328 
329   if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
330   {
331     worker->options.timeout_return= true;
332   }
333 }
334 
gearman_worker_remove_options(gearman_worker_st * worker,gearman_worker_options_t options)335 void gearman_worker_remove_options(gearman_worker_st *worker,
336                                    gearman_worker_options_t options)
337 {
338   if (worker)
339   {
340     if (options & GEARMAN_WORKER_NON_BLOCKING)
341     {
342       gearman_universal_remove_options(worker->universal, GEARMAN_NON_BLOCKING);
343       worker->options.non_blocking= false;
344     }
345 
346     if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
347     {
348       worker->options.timeout_return= false;
349       gearman_universal_set_timeout(worker->universal, GEARMAN_WORKER_WAIT_TIMEOUT);
350     }
351 
352     if (options & GEARMAN_WORKER_GRAB_UNIQ)
353     {
354       worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
355       (void)gearman_packet_pack_header(&(worker->grab_job));
356       worker->options.grab_uniq= false;
357     }
358 
359     if (options & GEARMAN_WORKER_GRAB_ALL)
360     {
361       worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
362       (void)gearman_packet_pack_header(&(worker->grab_job));
363       worker->options.grab_all= false;
364     }
365   }
366 }
367 
gearman_worker_timeout(gearman_worker_st * worker)368 int gearman_worker_timeout(gearman_worker_st *worker)
369 {
370   if (worker == NULL)
371   {
372     return 0;
373   }
374 
375   return gearman_universal_timeout(worker->universal);
376 }
377 
gearman_worker_set_timeout(gearman_worker_st * worker,int timeout)378 void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
379 {
380   if (worker)
381   {
382     gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
383     gearman_universal_set_timeout(worker->universal, timeout);
384   }
385 }
386 
gearman_worker_context(const gearman_worker_st * worker)387 void *gearman_worker_context(const gearman_worker_st *worker)
388 {
389   if (worker)
390   {
391     return worker->context;
392   }
393 
394   return NULL;
395 }
396 
gearman_worker_set_context(gearman_worker_st * worker,void * context)397 void gearman_worker_set_context(gearman_worker_st *worker, void *context)
398 {
399   if (worker)
400   {
401     worker->context= context;
402   }
403 }
404 
gearman_worker_set_log_fn(gearman_worker_st * worker,gearman_log_fn * function,void * context,gearman_verbose_t verbose)405 void gearman_worker_set_log_fn(gearman_worker_st *worker,
406                                gearman_log_fn *function, void *context,
407                                gearman_verbose_t verbose)
408 {
409   gearman_set_log_fn(worker->universal, function, context, verbose);
410 }
411 
gearman_worker_set_workload_malloc_fn(gearman_worker_st * worker,gearman_malloc_fn * function,void * context)412 void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
413                                            gearman_malloc_fn *function,
414                                            void *context)
415 {
416   if (worker)
417   {
418     gearman_set_workload_malloc_fn(worker->universal, function, context);
419   }
420 }
421 
gearman_worker_set_workload_free_fn(gearman_worker_st * worker,gearman_free_fn * function,void * context)422 void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
423                                          gearman_free_fn *function,
424                                          void *context)
425 {
426   if (worker)
427   {
428     gearman_set_workload_free_fn(worker->universal, function, context);
429   }
430 }
431 
gearman_worker_add_server(gearman_worker_st * worker,const char * host,in_port_t port)432 gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
433                                            const char *host, in_port_t port)
434 {
435   if (worker)
436   {
437     if (gearman_connection_create_args(worker->universal, host, port) == NULL)
438     {
439       return gearman_universal_error_code(worker->universal);
440     }
441 
442     return GEARMAN_SUCCESS;
443   }
444 
445   return GEARMAN_INVALID_ARGUMENT;
446 }
447 
gearman_worker_add_servers(gearman_worker_st * worker,const char * servers)448 gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker, const char *servers)
449 {
450   return gearman_parse_servers(servers, _worker_add_server, worker);
451 }
452 
gearman_worker_remove_servers(gearman_worker_st * worker)453 void gearman_worker_remove_servers(gearman_worker_st *worker)
454 {
455   if (worker)
456   {
457     gearman_free_all_cons(worker->universal);
458   }
459 }
460 
gearman_worker_wait(gearman_worker_st * worker)461 gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
462 {
463   if (worker)
464   {
465     return gearman_wait(worker->universal);
466   }
467 
468   return GEARMAN_INVALID_ARGUMENT;
469 }
470 
gearman_worker_register(gearman_worker_st * worker,const char * function_name,uint32_t timeout)471 gearman_return_t gearman_worker_register(gearman_worker_st *worker,
472                                          const char *function_name,
473                                          uint32_t timeout)
474 {
475   gearman_function_t null_func= gearman_function_create_null();
476   return _worker_function_create(worker, function_name, strlen(function_name), null_func, timeout, NULL);
477 }
478 
gearman_worker_function_exist(gearman_worker_st * worker,const char * function_name,size_t function_length)479 bool gearman_worker_function_exist(gearman_worker_st *worker,
480                                    const char *function_name,
481                                    size_t function_length)
482 {
483   if (worker)
484   {
485     struct _worker_function_st *function;
486 
487     function= _function_exist(worker, function_name, function_length);
488 
489     return (function && function->options.remove == false) ? true : false;
490   }
491 
492   return false;
493 }
494 
_worker_unregister(gearman_worker_st * worker,const char * function_name,size_t function_length)495 static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
496                                                   const char *function_name, size_t function_length)
497 {
498   if (worker)
499   {
500     _worker_function_st *function= _function_exist(worker, function_name, function_length);
501 
502     if (function == NULL || function->options.remove)
503     {
504       return GEARMAN_NO_REGISTERED_FUNCTION;
505     }
506 
507     if (function->options.packet_in_use)
508     {
509       gearman_packet_free(&(function->packet()));
510       function->options.packet_in_use= false;
511     }
512 
513     const void *args[1];
514     size_t args_size[1];
515     args[0]= function->name();
516     args_size[0]= function->length();
517     gearman_return_t ret= gearman_packet_create_args(worker->universal, function->packet(),
518                                                      GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
519                                                      args, args_size, 1);
520     if (gearman_failed(ret))
521     {
522       function->options.packet_in_use= false;
523       return ret;
524     }
525     function->options.packet_in_use= true;
526 
527     function->options.change= true;
528     function->options.remove= true;
529 
530     worker->options.change= true;
531 
532     return GEARMAN_SUCCESS;
533   }
534 
535   return GEARMAN_INVALID_ARGUMENT;
536 }
537 
gearman_worker_unregister(gearman_worker_st * worker,const char * function_name)538 gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
539                                            const char *function_name)
540 {
541   return _worker_unregister(worker, function_name, strlen(function_name));
542 }
543 
gearman_worker_unregister_all(gearman_worker_st * worker)544 gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
545 {
546   if (worker)
547   {
548     struct _worker_function_st *function;
549     uint32_t count= 0;
550 
551     if (worker->function_list == NULL)
552     {
553       return GEARMAN_NO_REGISTERED_FUNCTIONS;
554     }
555 
556     /* Lets find out if we have any functions left that are valid */
557     for (function= worker->function_list; function;
558          function= function->next)
559     {
560       if (function->options.remove == false)
561       {
562         count++;
563       }
564     }
565 
566     if (count == 0)
567     {
568       return GEARMAN_NO_REGISTERED_FUNCTIONS;
569     }
570 
571     gearman_packet_free(&(worker->function_list->packet()));
572 
573     gearman_return_t ret= gearman_packet_create_args(worker->universal,
574                                                      worker->function_list->packet(),
575                                                      GEARMAN_MAGIC_REQUEST,
576                                                      GEARMAN_COMMAND_RESET_ABILITIES,
577                                                      NULL, NULL, 0);
578     if (gearman_failed(ret))
579     {
580       worker->function_list->options.packet_in_use= false;
581 
582       return ret;
583     }
584 
585     while (worker->function_list->next)
586     {
587       _worker_function_free(worker, worker->function_list->next);
588     }
589 
590     worker->function_list->options.change= true;
591     worker->function_list->options.remove= true;
592 
593     worker->options.change= true;
594 
595     return GEARMAN_SUCCESS;
596   }
597 
598   return GEARMAN_INVALID_ARGUMENT;
599 }
600 
gearman_worker_grab_job(gearman_worker_st * worker,gearman_job_st * job,gearman_return_t * ret_ptr)601 gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
602                                         gearman_job_st *job,
603                                         gearman_return_t *ret_ptr)
604 {
605   if (worker)
606   {
607     struct _worker_function_st *function;
608     uint32_t active;
609 
610     gearman_return_t unused;
611     if (not ret_ptr)
612     {
613       ret_ptr= &unused;
614     }
615 
616     while (1)
617     {
618       switch (worker->state)
619       {
620       case GEARMAN_WORKER_STATE_START:
621         /* If there are any new functions changes, send them now. */
622         if (worker->options.change)
623         {
624           worker->function= worker->function_list;
625           while (worker->function)
626           {
627             if (not (worker->function->options.change))
628             {
629               worker->function= worker->function->next;
630               continue;
631             }
632 
633             for (worker->con= (&worker->universal)->con_list; worker->con;
634                  worker->con= worker->con->next)
635             {
636               if (worker->con->fd == -1)
637               {
638                 continue;
639               }
640 
641             case GEARMAN_WORKER_STATE_FUNCTION_SEND:
642               *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
643               if (gearman_failed(*ret_ptr))
644               {
645                 if (*ret_ptr == GEARMAN_IO_WAIT)
646                 {
647                   worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
648                 }
649                 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
650                 {
651                   continue;
652                 }
653 
654                 return NULL;
655               }
656             }
657 
658             if (worker->function->options.remove)
659             {
660               function= worker->function->prev;
661               _worker_function_free(worker, worker->function);
662               if (function == NULL)
663                 worker->function= worker->function_list;
664               else
665                 worker->function= function;
666             }
667             else
668             {
669               worker->function->options.change= false;
670               worker->function= worker->function->next;
671             }
672           }
673 
674           worker->options.change= false;
675         }
676 
677         if (not worker->function_list)
678         {
679           gearman_error(worker->universal, GEARMAN_NO_REGISTERED_FUNCTIONS, "no functions have been registered");
680           *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
681           return NULL;
682         }
683 
684         for (worker->con= (&worker->universal)->con_list; worker->con;
685              worker->con= worker->con->next)
686         {
687           /* If the connection to the job server is not active, start it. */
688           if (worker->con->fd == -1)
689           {
690             for (worker->function= worker->function_list;
691                  worker->function;
692                  worker->function= worker->function->next)
693             {
694             case GEARMAN_WORKER_STATE_CONNECT:
695               *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
696               if (gearman_failed(*ret_ptr))
697               {
698                 if (*ret_ptr == GEARMAN_IO_WAIT)
699                 {
700                   worker->state= GEARMAN_WORKER_STATE_CONNECT;
701                 }
702                 else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT or *ret_ptr == GEARMAN_LOST_CONNECTION)
703                 {
704                   break;
705                 }
706 
707                 return NULL;
708               }
709             }
710 
711             if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
712             {
713               continue;
714             }
715           }
716 
717         case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
718           if (worker->con->fd == -1)
719             continue;
720 
721           *ret_ptr= worker->con->send_packet(worker->grab_job, true);
722           if (gearman_failed(*ret_ptr))
723           {
724             if (*ret_ptr == GEARMAN_IO_WAIT)
725             {
726               worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
727             }
728             else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
729             {
730               continue;
731             }
732 
733             return NULL;
734           }
735 
736           if (not worker->job)
737           {
738             worker->job= gearman_job_create(worker, job);
739             if (not worker->job)
740             {
741               *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
742               return NULL;
743             }
744           }
745 
746           while (1)
747           {
748           case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
749             assert(worker->job);
750             (void)worker->con->receiving(worker->job->assigned, *ret_ptr, true);
751 
752             if (gearman_failed(*ret_ptr))
753             {
754               if (*ret_ptr == GEARMAN_IO_WAIT)
755               {
756                 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
757               }
758               else
759               {
760                 gearman_job_free(worker->job);
761                 worker->job= NULL;
762 
763                 if (*ret_ptr == GEARMAN_LOST_CONNECTION)
764                 {
765                   break;
766                 }
767               }
768 
769               return NULL;
770             }
771 
772             if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN or
773                 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL or
774                 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
775             {
776               worker->job->options.assigned_in_use= true;
777               worker->job->con= worker->con;
778               worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
779               job= worker->job;
780               worker->job= NULL;
781 
782               return job;
783             }
784 
785             if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB or
786                 worker->job->assigned.command == GEARMAN_COMMAND_OPTION_RES)
787             {
788               gearman_packet_free(&(worker->job->assigned));
789               break;
790             }
791 
792             if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
793             {
794               gearman_universal_set_error(worker->universal, GEARMAN_UNEXPECTED_PACKET, GEARMAN_AT,
795                                           "unexpected packet:%s",
796                                           gearman_command_info(worker->job->assigned.command)->name);
797               gearman_packet_free(&(worker->job->assigned));
798               gearman_job_free(worker->job);
799               worker->job= NULL;
800               *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
801               return NULL;
802             }
803 
804             gearman_packet_free(&(worker->job->assigned));
805           }
806         }
807 
808       case GEARMAN_WORKER_STATE_PRE_SLEEP:
809         for (worker->con= (&worker->universal)->con_list; worker->con;
810              worker->con= worker->con->next)
811         {
812           if (worker->con->fd == INVALID_SOCKET)
813           {
814             continue;
815           }
816 
817           *ret_ptr= worker->con->send_packet(worker->pre_sleep, true);
818           if (gearman_failed(*ret_ptr))
819           {
820             if (*ret_ptr == GEARMAN_IO_WAIT)
821             {
822               worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
823             }
824             else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
825             {
826               continue;
827             }
828 
829             return NULL;
830           }
831         }
832 
833         worker->state= GEARMAN_WORKER_STATE_START;
834 
835         /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
836         active= 0;
837         for (worker->con= worker->universal.con_list; worker->con; worker->con= worker->con->next)
838         {
839           if (worker->con->fd == INVALID_SOCKET)
840           {
841             continue;
842           }
843 
844           worker->con->set_events(POLLIN);
845           active++;
846         }
847 
848         if ((&worker->universal)->options.non_blocking)
849         {
850           *ret_ptr= GEARMAN_NO_JOBS;
851           return NULL;
852         }
853 
854         if (active == 0)
855         {
856           if (worker->universal.timeout < 0)
857           {
858             gearman_nap(GEARMAN_WORKER_WAIT_TIMEOUT);
859           }
860           else
861           {
862             if (worker->universal.timeout > 0)
863             {
864               gearman_nap(worker->universal);
865             }
866 
867             if (worker->options.timeout_return)
868             {
869               *ret_ptr= gearman_error(worker->universal, GEARMAN_TIMEOUT, "Option timeout return reached");
870 
871               return NULL;
872             }
873           }
874         }
875         else
876         {
877           *ret_ptr= gearman_wait(worker->universal);
878           if (gearman_failed(*ret_ptr) and (*ret_ptr != GEARMAN_TIMEOUT or worker->options.timeout_return))
879           {
880             return NULL;
881           }
882         }
883 
884         break;
885       }
886     }
887   }
888 
889   return NULL;
890 }
891 
gearman_job_free_all(gearman_worker_st * worker)892 void gearman_job_free_all(gearman_worker_st *worker)
893 {
894   if (worker)
895   {
896     while (worker->job_list)
897     {
898       gearman_job_free(worker->job_list);
899     }
900   }
901 }
902 
gearman_worker_add_function(gearman_worker_st * worker,const char * function_name,uint32_t timeout,gearman_worker_fn * worker_fn,void * context)903 gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
904                                              const char *function_name,
905                                              uint32_t timeout,
906                                              gearman_worker_fn *worker_fn,
907                                              void *context)
908 {
909   if (worker)
910   {
911     if (function_name == NULL)
912     {
913       return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
914     }
915 
916     if (worker_fn == NULL)
917     {
918       return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function not given");
919     }
920     gearman_function_t local= gearman_function_create_v1(worker_fn);
921 
922     return _worker_function_create(worker,
923                                    function_name, strlen(function_name),
924                                    local,
925                                    timeout,
926                                    context);
927   }
928 
929   return GEARMAN_INVALID_ARGUMENT;
930 }
931 
gearman_worker_define_function(gearman_worker_st * worker,const char * function_name,const size_t function_name_length,const gearman_function_t function,const uint32_t timeout,void * context)932 gearman_return_t gearman_worker_define_function(gearman_worker_st *worker,
933                                                 const char *function_name, const size_t function_name_length,
934                                                 const gearman_function_t function,
935                                                 const uint32_t timeout,
936                                                 void *context)
937 {
938   if (worker)
939   {
940     if (function_name == NULL or function_name_length == 0)
941     {
942       return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
943     }
944 
945     return _worker_function_create(worker,
946                                    function_name, function_name_length,
947                                    function,
948                                    timeout,
949                                    context);
950   }
951 
952   return GEARMAN_INVALID_ARGUMENT;
953 }
954 
gearman_worker_reset_error(gearman_worker_st * worker)955 void gearman_worker_reset_error(gearman_worker_st *worker)
956 {
957   if (worker)
958   {
959     universal_reset_error(worker->universal);
960   }
961 }
962 
/**
 * Grab one job, run the matching registered callback and report the outcome.
 *
 * The function is a resumable state machine driven by worker->work_state.
 * The cases of the outer switch deliberately fall through
 * (GRAB_JOB -> FUNCTION -> COMPLETE), so a fresh call performs all steps,
 * while a call made after an early return resumes at the state that was
 * stored before returning.
 *
 * @param worker Worker to operate on.
 *
 * @return GEARMAN_INVALID_ARGUMENT when worker is NULL; GEARMAN_SHUTDOWN when
 *         the callback returned GEARMAN_FUNCTION_SHUTDOWN and the state
 *         machine ran to its end; GEARMAN_SUCCESS when the state machine ran
 *         to its end otherwise (this includes the paths that leave the switch
 *         because of GEARMAN_LOST_CONNECTION); any other value is the error
 *         from the step that returned early.
 */
gearman_return_t gearman_worker_work(gearman_worker_st *worker)
{
  // Set only when the callback asks for shutdown; reported after cleanup.
  bool shutdown= false;

  if (worker)
  {
    universal_reset_error(worker->universal);

    switch (worker->work_state)
    {
    case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
      {
        gearman_return_t ret;
        worker->work_job= gearman_worker_grab_job(worker, NULL, &ret);

        if (gearman_failed(ret))
        {
          if (ret == GEARMAN_COULD_NOT_CONNECT)
          {
            gearman_reset(worker->universal);
          }
          return ret;
        }
        assert(worker->work_job);

        // Linear search of the registered functions for the job's function name.
        // The loop leaves worker->work_function NULL when nothing matches.
        for (worker->work_function= worker->function_list;
             worker->work_function;
             worker->work_function= worker->work_function->next)
        {
          if (not strcmp(gearman_job_function_name(worker->work_job),
                         worker->work_function->function_name))
          {
            break;
          }
        }

        if (not worker->work_function)
        {
          gearman_job_free(worker->work_job);
          worker->work_job= NULL;
          return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Function not found");
        }

        if (not worker->work_function->has_callback())
        {
          gearman_job_free(worker->work_job);
          worker->work_job= NULL;
          return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Neither a gearman_worker_fn, or gearman_function_fn callback was supplied");
        }

        worker->work_result_size= 0;
      }
      // No break: continue straight into running the callback.

    case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
      {
        switch (worker->work_function->callback(worker->work_job,
                                                static_cast<void *>(worker->work_function->context)))
        {
        case GEARMAN_FUNCTION_INVALID_ARGUMENT:
          worker->work_job->error_code= gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
          // No break: handled like a fatal result from here on.
        case GEARMAN_FUNCTION_FATAL:
          if (gearman_job_send_fail_fin(worker->work_job) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
          {
            worker->work_job->error_code= GEARMAN_LOST_CONNECTION;
            break;
          }
          // Remember the FAIL state so the next call resumes there, then report the job's error code.
          worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
          return worker->work_job->error_code;

        case GEARMAN_FUNCTION_ERROR: // retry
          gearman_reset(worker->universal);
          worker->work_job->error_code= GEARMAN_LOST_CONNECTION;
          break;

        case GEARMAN_FUNCTION_SHUTDOWN:
          shutdown= true;
          // No break: the job is still completed like a successful one.

        case GEARMAN_FUNCTION_SUCCESS:
          break;
        }

        // This break leaves the OUTER switch: the completion step is skipped and
        // the job is released by the cleanup code after the switch.
        if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
        {
          break;
        }
      }
      // No break: continue into sending the completion.

    case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
      {
        worker->work_job->error_code= gearman_job_send_complete_fin(worker->work_job,
                                                                    worker->work_result, worker->work_result_size);
        if (worker->work_job->error_code == GEARMAN_IO_WAIT)
        {
          // Store the COMPLETE state so the next call retries the send; the
          // result buffer is intentionally not freed on this path.
          worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
          return gearman_error(worker->universal, worker->work_job->error_code,
                               "A failure occurred after worker had successful complete, unless gearman_job_send_complete() was called directly by worker, client has not been informed of success.");
        }

        if (worker->work_result)
        {
          gearman_free(worker->universal, worker->work_result);
          worker->work_result= NULL;
        }

        // If we lost the connection, we retry the work, otherwise we error
        if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
        {
          break;
        }
        else if (worker->work_job->error_code == GEARMAN_SHUTDOWN)
        { }
        else if (gearman_failed(worker->work_job->error_code))
        {
          worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;

          return worker->work_job->error_code;
        }
      }
      break;

    case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
      {
        // Reached only when a previous call stored the FAIL state: send the failure notice again.
        if (gearman_failed(worker->work_job->error_code= gearman_job_send_fail_fin(worker->work_job)))
        {
          if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
          {
            break;
          }

          return worker->work_job->error_code;
        }
      }
      break;
    }

    // Common exit for every path that left the switch: release the job and
    // start from GRAB_JOB on the next call.
    gearman_job_free(worker->work_job);
    worker->work_job= NULL;

    worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;

    if (shutdown)
    {
      return GEARMAN_SHUTDOWN;
    }

    return GEARMAN_SUCCESS;
  }

  return GEARMAN_INVALID_ARGUMENT;
}
1113 
gearman_worker_echo(gearman_worker_st * worker,const void * workload,size_t workload_size)1114 gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
1115                                      const void *workload,
1116                                      size_t workload_size)
1117 {
1118   if (worker)
1119   {
1120     return gearman_echo(worker->universal, workload, workload_size);
1121   }
1122 
1123   return GEARMAN_INVALID_ARGUMENT;
1124 }
1125 
1126 /*
1127  * Static Definitions
1128  */
1129 
_worker_allocate(gearman_worker_st * worker,bool is_clone)1130 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
1131 {
1132   if (worker)
1133   {
1134     worker->options.allocated= false;
1135   }
1136   else
1137   {
1138     worker= new (std::nothrow) gearman_worker_st;
1139     if (worker == NULL)
1140     {
1141       return NULL;
1142     }
1143 
1144     worker->options.allocated= true;
1145   }
1146 
1147   worker->options.non_blocking= false;
1148   worker->options.packet_init= false;
1149   worker->options.change= false;
1150   worker->options.grab_uniq= true;
1151   worker->options.grab_all= true;
1152   worker->options.timeout_return= false;
1153 
1154   worker->state= GEARMAN_WORKER_STATE_START;
1155   worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
1156   worker->function_count= 0;
1157   worker->job_count= 0;
1158   worker->work_result_size= 0;
1159   worker->context= NULL;
1160   worker->con= NULL;
1161   worker->job= NULL;
1162   worker->job_list= NULL;
1163   worker->function= NULL;
1164   worker->function_list= NULL;
1165   worker->work_function= NULL;
1166   worker->work_result= NULL;
1167 
1168   if (is_clone == false)
1169   {
1170     gearman_universal_initialize(worker->universal);
1171 #if 0
1172     gearman_universal_set_timeout(worker->universal, GEARMAN_WORKER_WAIT_TIMEOUT);
1173 #endif
1174   }
1175 
1176   if (setup_shutdown_pipe(worker->universal.wakeup_fd) == false)
1177   {
1178     if (worker->options.allocated)
1179     {
1180       delete worker;
1181     }
1182 
1183     return NULL;
1184   }
1185 
1186   return worker;
1187 }
1188 
_worker_packet_init(gearman_worker_st * worker)1189 static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
1190 {
1191   gearman_return_t ret= gearman_packet_create_args(worker->universal, worker->grab_job,
1192                                                    GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB_ALL,
1193                                                    NULL, NULL, 0);
1194   if (gearman_failed(ret))
1195   {
1196     return ret;
1197   }
1198 
1199   ret= gearman_packet_create_args(worker->universal, worker->pre_sleep,
1200                                   GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
1201                                   NULL, NULL, 0);
1202   if (gearman_failed(ret))
1203   {
1204     gearman_packet_free(&(worker->grab_job));
1205     return ret;
1206   }
1207 
1208   worker->options.packet_init= true;
1209 
1210   return GEARMAN_SUCCESS;
1211 }
1212 
_worker_add_server(const char * host,in_port_t port,void * context)1213 static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context)
1214 {
1215   return gearman_worker_add_server(static_cast<gearman_worker_st *>(context), host, port);
1216 }
1217 
_worker_function_create(gearman_worker_st * worker,const char * function_name,size_t function_length,const gearman_function_t & function_arg,uint32_t timeout,void * context)1218 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
1219                                                 const char *function_name, size_t function_length,
1220                                                 const gearman_function_t &function_arg,
1221                                                 uint32_t timeout,
1222                                                 void *context)
1223 {
1224   const void *args[2];
1225   size_t args_size[2];
1226 
1227   if (worker == NULL)
1228   {
1229     return GEARMAN_INVALID_ARGUMENT;
1230   }
1231 
1232   if (function_length == 0 or function_name == NULL or function_length > GEARMAN_FUNCTION_MAX_SIZE)
1233   {
1234     if (function_length > GEARMAN_FUNCTION_MAX_SIZE)
1235     {
1236       gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
1237     }
1238     else
1239     {
1240       gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "invalid function");
1241     }
1242 
1243     return GEARMAN_INVALID_ARGUMENT;
1244   }
1245 
1246   _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
1247   if (function == NULL)
1248   {
1249     gearman_perror(worker->universal, "_worker_function_st::new()");
1250     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
1251   }
1252 
1253   gearman_return_t ret;
1254   if (timeout > 0)
1255   {
1256     char timeout_buffer[11];
1257     snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
1258     args[0]= function->name();
1259     args_size[0]= function->length() + 1;
1260     args[1]= timeout_buffer;
1261     args_size[1]= strlen(timeout_buffer);
1262     ret= gearman_packet_create_args(worker->universal, function->packet(),
1263                                     GEARMAN_MAGIC_REQUEST,
1264                                     GEARMAN_COMMAND_CAN_DO_TIMEOUT,
1265                                     args, args_size, 2);
1266   }
1267   else
1268   {
1269     args[0]= function->name();
1270     args_size[0]= function->length();
1271     ret= gearman_packet_create_args(worker->universal, function->packet(),
1272                                     GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
1273                                     args, args_size, 1);
1274   }
1275 
1276   if (gearman_failed(ret))
1277   {
1278     delete function;
1279 
1280     return ret;
1281   }
1282 
1283   if (worker->function_list)
1284   {
1285     worker->function_list->prev= function;
1286   }
1287 
1288   function->next= worker->function_list;
1289   function->prev= NULL;
1290   worker->function_list= function;
1291   worker->function_count++;
1292 
1293   worker->options.change= true;
1294 
1295   return GEARMAN_SUCCESS;
1296 }
1297 
_worker_function_free(gearman_worker_st * worker,struct _worker_function_st * function)1298 static void _worker_function_free(gearman_worker_st *worker,
1299                                   struct _worker_function_st *function)
1300 {
1301   if (worker->function_list == function)
1302   {
1303     worker->function_list= function->next;
1304   }
1305 
1306   if (function->prev)
1307   {
1308     function->prev->next= function->next;
1309   }
1310 
1311   if (function->next)
1312   {
1313     function->next->prev= function->prev;
1314   }
1315   worker->function_count--;
1316 
1317   delete function;
1318 }
1319 
gearman_worker_set_memory_allocators(gearman_worker_st * worker,gearman_malloc_fn * malloc_fn,gearman_free_fn * free_fn,gearman_realloc_fn * realloc_fn,gearman_calloc_fn * calloc_fn,void * context)1320 gearman_return_t gearman_worker_set_memory_allocators(gearman_worker_st *worker,
1321                                                       gearman_malloc_fn *malloc_fn,
1322                                                       gearman_free_fn *free_fn,
1323                                                       gearman_realloc_fn *realloc_fn,
1324                                                       gearman_calloc_fn *calloc_fn,
1325                                                       void *context)
1326 {
1327   if (worker == NULL)
1328   {
1329     return GEARMAN_INVALID_ARGUMENT;
1330   }
1331 
1332   return gearman_set_memory_allocator(worker->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
1333 }
1334 
gearman_worker_set_server_option(gearman_worker_st * self,const char * option_arg,size_t option_arg_size)1335 bool gearman_worker_set_server_option(gearman_worker_st *self, const char *option_arg, size_t option_arg_size)
1336 {
1337   gearman_string_t option= { option_arg, option_arg_size };
1338 
1339   return gearman_request_option(self->universal, option);
1340 }
1341 
gearman_worker_set_namespace(gearman_worker_st * self,const char * namespace_key,size_t namespace_key_size)1342 void gearman_worker_set_namespace(gearman_worker_st *self, const char *namespace_key, size_t namespace_key_size)
1343 {
1344   if (self)
1345   {
1346     gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1347   }
1348 }
1349 
gearman_worker_id(gearman_worker_st * self)1350 gearman_id_t gearman_worker_id(gearman_worker_st *self)
1351 {
1352   if (self == NULL)
1353   {
1354     gearman_id_t handle= { INVALID_SOCKET, INVALID_SOCKET };
1355     return handle;
1356   }
1357 
1358   return gearman_universal_id(self->universal);
1359 }
1360 
gearman_job_clone_worker(gearman_job_st * job)1361 gearman_worker_st *gearman_job_clone_worker(gearman_job_st *job)
1362 {
1363   if (job)
1364   {
1365     return gearman_worker_clone(NULL, job->worker);
1366   }
1367 
1368   return NULL;
1369 }
1370 
gearman_worker_set_identifier(gearman_worker_st * worker,const char * id,size_t id_size)1371 gearman_return_t gearman_worker_set_identifier(gearman_worker_st *worker,
1372                                                const char *id, size_t id_size)
1373 {
1374   if (worker)
1375   {
1376     return gearman_set_identifier(worker->universal, id, id_size);
1377   }
1378 
1379   return GEARMAN_INVALID_ARGUMENT;
1380 }
1381 
gearman_worker_namespace(gearman_worker_st * self)1382 const char *gearman_worker_namespace(gearman_worker_st *self)
1383 {
1384   return gearman_univeral_namespace(self->universal);
1385 }
1386