1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 #include "gear_config.h"
40 
41 #include <libgearman/common.h>
42 #include <libgearman/log.hpp>
43 
44 #include "libgearman/assert.hpp"
45 
46 #include <arpa/inet.h>
47 #include <cerrno>
48 #include <cstdio>
49 #include <cstdlib>
50 #include <cstring>
51 #include <memory>
52 #include <netdb.h>
53 #include <netinet/in.h>
54 #include <sys/socket.h>
55 
56 /*
57   Allocate a client structure.
58  */
_client_allocate(gearman_client_st * client,bool is_clone)59 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
60 {
61   if (client)
62   {
63     client->options.allocated= false;
64   }
65   else
66   {
67     client= new (std::nothrow) gearman_client_st;
68     if (client == NULL)
69     {
70       return NULL;
71     }
72 
73     client->options.allocated= true;
74   }
75 
76   client->options.non_blocking= false;
77   client->options.unbuffered_result= false;
78   client->options.no_new= false;
79   client->options.free_tasks= false;
80   client->options.generate_unique= true;
81 
82   client->state= GEARMAN_CLIENT_STATE_IDLE;
83   client->new_tasks= 0;
84   client->running_tasks= 0;
85   client->task_count= 0;
86   client->context= NULL;
87   client->con= NULL;
88   client->task= NULL;
89   client->task_list= NULL;
90   client->task_context_free_fn= NULL;
91   gearman_client_clear_fn(client);
92 
93   if (is_clone == false)
94   {
95     gearman_universal_initialize(client->universal);
96   }
97 
98   return client;
99 }
100 
101 /**
102  * Callback function used when parsing server lists.
103  */
_client_add_server(const char * host,in_port_t port,void * context)104 static gearman_return_t _client_add_server(const char *host, in_port_t port,
105                                            void *context)
106 {
107   return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
108 }
109 
110 
111 /**
112  * Real do function.
113  */
_client_do(gearman_client_st * client,gearman_command_t command,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)114 static void *_client_do(gearman_client_st *client, gearman_command_t command,
115                         const char *function_name,
116                         const char *unique,
117                         const void *workload_str, size_t workload_size,
118                         size_t *result_size, gearman_return_t *ret_ptr)
119 {
120   gearman_return_t unused;
121   if (ret_ptr == NULL)
122   {
123     ret_ptr= &unused;
124   }
125 
126   if (client == NULL)
127   {
128     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
129     return NULL;
130   }
131 
132   universal_reset_error(client->universal);
133 
134   size_t unused_size;
135   if (result_size == NULL)
136   {
137     result_size= &unused_size;
138   }
139   *result_size= 0;
140 
141   gearman_string_t function= { gearman_string_param_cstr(function_name) };
142   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
143   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
144 
145   gearman_task_st do_task;
146   gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
147                                          function,
148                                          local_unique,
149                                          workload,
150                                          time_t(0),
151                                          gearman_actions_do_default());
152   if (do_task_ptr == NULL)
153   {
154     *ret_ptr= gearman_universal_error_code(client->universal);
155     return NULL;
156   }
157   do_task_ptr->type= GEARMAN_TASK_KIND_DO;
158 
159   gearman_return_t ret= gearman_client_run_block_tasks(client, do_task_ptr);
160 
161   // gearman_client_run_tasks failed
162   assert(client->task_list); // Programmer error, we should always have the task that we used for do
163 
164   char *returnable= NULL;
165   if (gearman_failed(ret))
166   {
167     if (ret == GEARMAN_COULD_NOT_CONNECT)
168     { }
169     else
170     {
171       gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
172     }
173 
174     *ret_ptr= ret;
175     *result_size= 0;
176   }
177   else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
178   {
179     *ret_ptr= do_task_ptr->result_rc;
180     if (gearman_task_result(do_task_ptr))
181     {
182       if (gearman_has_allocator(client->universal))
183       {
184         gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
185         returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
186         if (returnable == NULL)
187         {
188           gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
189           *result_size= 0;
190         }
191         else // NULL terminate
192         {
193           memcpy(returnable, gearman_c_str(result), gearman_size(result));
194           returnable[gearman_size(result)]= 0;
195           *result_size= gearman_size(result);
196         }
197       }
198       else
199       {
200         gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
201         *result_size= gearman_size(result);
202         returnable= const_cast<char *>(gearman_c_str(result));
203       }
204     }
205     else // NULL job
206     {
207       *result_size= 0;
208     }
209   }
210   else // gearman_client_run_tasks() was successful, but the task was not
211   {
212     gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
213 
214     *ret_ptr= do_task_ptr->result_rc;
215     *result_size= 0;
216   }
217 
218   gearman_task_free(&do_task);
219   client->new_tasks= 0;
220   client->running_tasks= 0;
221 
222   return returnable;
223 }
224 
225 /*
226   Real background do function.
227 */
_client_do_background(gearman_client_st * client,gearman_command_t command,gearman_string_t & function,gearman_unique_t & unique,gearman_string_t & workload,gearman_job_handle_t job_handle)228 static gearman_return_t _client_do_background(gearman_client_st *client,
229                                               gearman_command_t command,
230                                               gearman_string_t &function,
231                                               gearman_unique_t &unique,
232                                               gearman_string_t &workload,
233                                               gearman_job_handle_t job_handle)
234 {
235   if (client == NULL)
236   {
237     return GEARMAN_INVALID_ARGUMENT;
238   }
239 
240   universal_reset_error(client->universal);
241 
242   if (gearman_size(function) == 0)
243   {
244     return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");
245   }
246 
247   client->_do_handle[0]= 0; // Reset the job_handle we store in client
248 
249   gearman_task_st do_task, *do_task_ptr;
250   do_task_ptr= add_task(*client, &do_task,
251                         client,
252                         command,
253                         function,
254                         unique,
255                         workload,
256                         time_t(0),
257                         gearman_actions_do_default());
258   if (not do_task_ptr)
259   {
260     return gearman_universal_error_code(client->universal);
261   }
262   do_task_ptr->type= GEARMAN_TASK_KIND_DO;
263 
264   gearman_return_t ret= gearman_client_run_block_tasks(client, do_task_ptr);
265 
266   if (job_handle)
267   {
268     strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
269   }
270   strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
271   client->new_tasks= 0;
272   client->running_tasks= 0;
273   gearman_task_free(&do_task);
274 
275   return ret;
276 }
277 
278 
279 /*
280  * Public Definitions
281  */
282 
gearman_client_create(gearman_client_st * client)283 gearman_client_st *gearman_client_create(gearman_client_st *client)
284 {
285   return _client_allocate(client, false);
286 }
287 
gearman_client_clone(gearman_client_st * client,const gearman_client_st * from)288 gearman_client_st *gearman_client_clone(gearman_client_st *client,
289                                         const gearman_client_st *from)
290 {
291   if (from == NULL)
292   {
293     return _client_allocate(client, false);
294   }
295 
296   client= _client_allocate(client, true);
297 
298   if (client == NULL)
299   {
300     return client;
301   }
302 
303   client->options.non_blocking= from->options.non_blocking;
304   client->options.unbuffered_result= from->options.unbuffered_result;
305   client->options.no_new= from->options.no_new;
306   client->options.free_tasks= from->options.free_tasks;
307   client->options.generate_unique= from->options.generate_unique;
308   client->actions= from->actions;
309   client->_do_handle[0]= 0;
310 
311   gearman_universal_clone(client->universal, from->universal);
312 
313   return client;
314 }
315 
gearman_client_free(gearman_client_st * client)316 void gearman_client_free(gearman_client_st *client)
317 {
318   if (client)
319   {
320 
321     gearman_client_task_free_all(client);
322 
323     gearman_universal_free(client->universal);
324 
325     if (client->options.allocated)
326     {
327       delete client;
328     }
329   }
330 }
331 
gearman_client_error(const gearman_client_st * client)332 const char *gearman_client_error(const gearman_client_st *client)
333 {
334   if (client)
335   {
336     return gearman_universal_error(client->universal);
337   }
338 
339   return NULL;
340 }
341 
gearman_client_error_code(const gearman_client_st * client)342 gearman_return_t gearman_client_error_code(const gearman_client_st *client)
343 {
344   if (client == NULL)
345   {
346     return GEARMAN_INVALID_ARGUMENT;
347   }
348 
349   return gearman_universal_error_code(client->universal);
350 }
351 
gearman_client_errno(const gearman_client_st * client)352 int gearman_client_errno(const gearman_client_st *client)
353 {
354   if (client == NULL)
355   {
356     return EINVAL;
357   }
358 
359   return gearman_universal_errno(client->universal);
360 }
361 
gearman_client_options(const gearman_client_st * client)362 gearman_client_options_t gearman_client_options(const gearman_client_st *client)
363 {
364   if (client)
365   {
366     int32_t options;
367     memset(&options, 0, sizeof(int32_t));
368 
369     if (client->options.allocated)
370       options|= int(GEARMAN_CLIENT_ALLOCATED);
371 
372     if (client->options.non_blocking)
373       options|= int(GEARMAN_CLIENT_NON_BLOCKING);
374 
375     if (client->options.unbuffered_result)
376       options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
377 
378     if (client->options.no_new)
379       options|= int(GEARMAN_CLIENT_NO_NEW);
380 
381     if (client->options.free_tasks)
382       options|= int(GEARMAN_CLIENT_FREE_TASKS);
383 
384     if (client->options.generate_unique)
385       options|= int(GEARMAN_CLIENT_GENERATE_UNIQUE);
386 
387     return gearman_client_options_t(options);
388   }
389 
390   return gearman_client_options_t(GEARMAN_WORKER_MAX);
391 }
392 
gearman_client_has_option(gearman_client_st * client,gearman_client_options_t option)393 bool gearman_client_has_option(gearman_client_st *client,
394                                 gearman_client_options_t option)
395 {
396   if (client)
397   {
398     switch (option)
399     {
400     case GEARMAN_CLIENT_ALLOCATED:
401       return client->options.allocated;
402 
403     case GEARMAN_CLIENT_NON_BLOCKING:
404       return client->options.non_blocking;
405 
406     case GEARMAN_CLIENT_UNBUFFERED_RESULT:
407       return client->options.unbuffered_result;
408 
409     case GEARMAN_CLIENT_NO_NEW:
410       return client->options.no_new;
411 
412     case GEARMAN_CLIENT_FREE_TASKS:
413       return client->options.free_tasks;
414 
415     case GEARMAN_CLIENT_GENERATE_UNIQUE:
416       return client->options.generate_unique;
417 
418     default:
419     case GEARMAN_CLIENT_TASK_IN_USE:
420     case GEARMAN_CLIENT_MAX:
421       break; // Let these fall through to false
422     }
423   }
424 
425   return false;
426 }
427 
gearman_client_set_options(gearman_client_st * client,gearman_client_options_t options)428 void gearman_client_set_options(gearman_client_st *client,
429                                 gearman_client_options_t options)
430 {
431   if (client)
432   {
433     gearman_client_options_t usable_options[]= {
434       GEARMAN_CLIENT_NON_BLOCKING,
435       GEARMAN_CLIENT_UNBUFFERED_RESULT,
436       GEARMAN_CLIENT_FREE_TASKS,
437       GEARMAN_CLIENT_GENERATE_UNIQUE,
438       GEARMAN_CLIENT_MAX
439     };
440 
441     for (gearman_client_options_t* ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
442     {
443       if (options & *ptr)
444       {
445         gearman_client_add_options(client, *ptr);
446       }
447       else
448       {
449         gearman_client_remove_options(client, *ptr);
450       }
451     }
452   }
453 }
454 
gearman_client_add_options(gearman_client_st * client,gearman_client_options_t options)455 void gearman_client_add_options(gearman_client_st *client,
456                                 gearman_client_options_t options)
457 {
458   if (client)
459   {
460     if (options & GEARMAN_CLIENT_NON_BLOCKING)
461     {
462       gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
463       client->options.non_blocking= true;
464     }
465 
466     if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
467     {
468       client->options.unbuffered_result= true;
469     }
470 
471     if (options & GEARMAN_CLIENT_FREE_TASKS)
472     {
473       client->options.free_tasks= true;
474     }
475 
476     if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
477     {
478       client->options.generate_unique= true;
479     }
480   }
481 }
482 
gearman_client_remove_options(gearman_client_st * client,gearman_client_options_t options)483 void gearman_client_remove_options(gearman_client_st *client,
484                                    gearman_client_options_t options)
485 {
486   if (client)
487   {
488     if (options & GEARMAN_CLIENT_NON_BLOCKING)
489     {
490       gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
491       client->options.non_blocking= false;
492     }
493 
494     if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
495     {
496       client->options.unbuffered_result= false;
497     }
498 
499     if (options & GEARMAN_CLIENT_FREE_TASKS)
500     {
501       client->options.free_tasks= false;
502     }
503 
504     if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
505     {
506       client->options.generate_unique= false;
507     }
508   }
509 }
510 
gearman_client_timeout(gearman_client_st * client)511 int gearman_client_timeout(gearman_client_st *client)
512 {
513   if (client)
514   {
515     return gearman_universal_timeout(client->universal);
516   }
517 
518   return -1;
519 }
520 
gearman_client_set_timeout(gearman_client_st * client,int timeout)521 void gearman_client_set_timeout(gearman_client_st *client, int timeout)
522 {
523   if (client)
524   {
525     gearman_universal_set_timeout(client->universal, timeout);
526   }
527 }
528 
gearman_client_context(const gearman_client_st * client)529 void *gearman_client_context(const gearman_client_st *client)
530 {
531   if (client)
532   {
533     return const_cast<void *>(client->context);
534   }
535 
536   return NULL;
537 }
538 
gearman_client_set_context(gearman_client_st * client,void * context)539 void gearman_client_set_context(gearman_client_st *client, void *context)
540 {
541   if (client)
542   {
543     client->context= context;
544   }
545 }
546 
gearman_client_set_log_fn(gearman_client_st * client,gearman_log_fn * function,void * context,gearman_verbose_t verbose)547 void gearman_client_set_log_fn(gearman_client_st *client,
548                                gearman_log_fn *function, void *context,
549                                gearman_verbose_t verbose)
550 {
551   if (client)
552   {
553     gearman_set_log_fn(client->universal, function, context, verbose);
554   }
555 }
556 
gearman_client_set_workload_malloc_fn(gearman_client_st * client,gearman_malloc_fn * function,void * context)557 void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
558                                            gearman_malloc_fn *function,
559                                            void *context)
560 {
561   if (client)
562   {
563     gearman_set_workload_malloc_fn(client->universal, function, context);
564   }
565 }
566 
gearman_client_set_workload_free_fn(gearman_client_st * client,gearman_free_fn * function,void * context)567 void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
568 {
569   if (client)
570   {
571     gearman_set_workload_free_fn(client->universal, function, context);
572   }
573 }
574 
gearman_client_add_server(gearman_client_st * client,const char * host,in_port_t port)575 gearman_return_t gearman_client_add_server(gearman_client_st *client,
576                                            const char *host, in_port_t port)
577 {
578   if (client)
579   {
580     if (gearman_connection_create_args(client->universal, host, port) == false)
581     {
582       assert(client->universal.error.rc != GEARMAN_SUCCESS);
583       return gearman_universal_error_code(client->universal);
584     }
585 
586     return GEARMAN_SUCCESS;
587   }
588 
589   return GEARMAN_INVALID_ARGUMENT;
590 }
591 
gearman_client_add_servers(gearman_client_st * client,const char * servers)592 gearman_return_t gearman_client_add_servers(gearman_client_st *client,
593                                             const char *servers)
594 {
595   return gearman_parse_servers(servers, _client_add_server, client);
596 }
597 
gearman_client_remove_servers(gearman_client_st * client)598 void gearman_client_remove_servers(gearman_client_st *client)
599 {
600   if (client)
601   {
602     gearman_free_all_cons(client->universal);
603   }
604 }
605 
gearman_client_wait(gearman_client_st * client)606 gearman_return_t gearman_client_wait(gearman_client_st *client)
607 {
608   if (client)
609   {
610     return gearman_wait(client->universal);
611   }
612 
613   return GEARMAN_INVALID_ARGUMENT;
614 }
615 
gearman_client_do(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)616 void *gearman_client_do(gearman_client_st *client,
617                         const char *function,
618                         const char *unique,
619                         const void *workload,
620                         size_t workload_size, size_t *result_size,
621                         gearman_return_t *ret_ptr)
622 {
623   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
624                     function,
625                     unique,
626                     workload, workload_size,
627                     result_size, ret_ptr);
628 }
629 
gearman_client_do_high(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)630 void *gearman_client_do_high(gearman_client_st *client,
631                              const char *function,
632                              const char *unique,
633                              const void *workload, size_t workload_size,
634                              size_t *result_size, gearman_return_t *ret_ptr)
635 {
636   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
637                     function,
638                     unique,
639                     workload, workload_size,
640                     result_size, ret_ptr);
641 }
642 
gearman_client_do_low(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)643 void *gearman_client_do_low(gearman_client_st *client,
644                             const char *function,
645                             const char *unique,
646                             const void *workload, size_t workload_size,
647                             size_t *result_size, gearman_return_t *ret_ptr)
648 {
649   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
650                     function,
651                     unique,
652                     workload, workload_size,
653                     result_size, ret_ptr);
654 }
655 
gearman_client_count_tasks(gearman_client_st * client)656 size_t gearman_client_count_tasks(gearman_client_st *client)
657 {
658   if (client == NULL)
659   {
660     return 0;
661   }
662 
663   size_t count= 1;
664   gearman_task_st *search= client->task_list;
665 
666   while ((search= search->next))
667   {
668     count++;
669   }
670 
671   return count;
672 }
673 
#if 0
/*
  Compiled out. Reports whether any task on the client's list is active
  according to gearman_task_is_active().
*/
static bool _active_tasks(gearman_client_st *client)
{
  assert(client);

  for (gearman_task_st *task= client->task_list; task; task= task->next)
  {
    if (gearman_task_is_active(task))
    {
      return true;
    }
  }

  return false;
}
#endif
694 
gearman_client_do_job_handle(gearman_client_st * self)695 const char *gearman_client_do_job_handle(gearman_client_st *self)
696 {
697   if (self)
698   {
699     return self->_do_handle;
700   }
701 
702   errno= EINVAL;
703   return NULL;
704 }
705 
gearman_client_do_status(gearman_client_st *,uint32_t * numerator,uint32_t * denominator)706 void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
707 {
708   if (numerator)
709   {
710     *numerator= 0;
711   }
712 
713   if (denominator)
714   {
715     *denominator= 0;
716   }
717 }
718 
gearman_client_do_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)719 gearman_return_t gearman_client_do_background(gearman_client_st *client,
720                                               const char *function_name,
721                                               const char *unique,
722                                               const void *workload_str,
723                                               size_t workload_size,
724                                               gearman_job_handle_t job_handle)
725 {
726   gearman_string_t function= { gearman_string_param_cstr(function_name) };
727   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
728   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
729 
730   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
731                                function,
732                                local_unique,
733                                workload,
734                                job_handle);
735 }
736 
gearman_client_do_high_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)737 gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
738                                                    const char *function_name,
739                                                    const char *unique,
740                                                    const void *workload_str,
741                                                    size_t workload_size,
742                                                    gearman_job_handle_t job_handle)
743 {
744   gearman_string_t function= { gearman_string_param_cstr(function_name) };
745   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
746   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
747 
748   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
749                                function,
750                                local_unique,
751                                workload,
752                                job_handle);
753 }
754 
gearman_client_do_low_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)755 gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
756                                                   const char *function_name,
757                                                   const char *unique,
758                                                   const void *workload_str,
759                                                   size_t workload_size,
760                                                   gearman_job_handle_t job_handle)
761 {
762   gearman_string_t function= { gearman_string_param_cstr(function_name) };
763   gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
764   gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
765 
766   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
767                                function,
768                                local_unique,
769                                workload,
770                                job_handle);
771 }
772 
gearman_client_unique_status(gearman_client_st * client,const char * unique,size_t unique_length)773 gearman_status_t gearman_client_unique_status(gearman_client_st *client,
774                                               const char *unique, size_t unique_length)
775 {
776   (void)unique_length;
777   gearman_status_t status;
778   gearman_init(status);
779 
780   if (client == NULL)
781   {
782     gearman_status_set_return(status, GEARMAN_INVALID_ARGUMENT);
783     return status;
784   }
785 
786   universal_reset_error(client->universal);
787 
788   gearman_return_t ret;
789   gearman_task_st do_task;
790   gearman_task_st *do_task_ptr= gearman_client_add_task_status_by_unique(client,
791                                                                          &do_task,
792                                                                          unique, &ret);
793   if (gearman_failed(ret))
794   {
795     gearman_status_set_return(status, ret);
796     return status;
797   }
798   assert(do_task_ptr);
799   do_task_ptr->type= GEARMAN_TASK_KIND_DO;
800 
801   gearman_task_clear_fn(do_task_ptr);
802 
803   ret= gearman_client_run_block_tasks(client, do_task_ptr);
804 
805   // @note we don't know if our task was run or not, we just know something
806   // happened.
807 
808   if (gearman_success(ret))
809   {
810     gearman_status_set(status,
811                        do_task.options.is_known,
812                        do_task.options.is_running,
813                        do_task.numerator,
814                        do_task.denominator,
815                        do_task.client_count);
816 
817     if (gearman_status_is_known(status) == false and gearman_status_is_running(status) == false)
818     {
819       if (do_task.options.is_running)
820       {
821         ret= GEARMAN_IN_PROGRESS;
822       }
823       else if (do_task.options.is_known)
824       {
825         ret= GEARMAN_JOB_EXISTS;
826       }
827     }
828   }
829 
830   gearman_task_free(do_task_ptr);
831 
832   gearman_status_set_return(status, ret);
833 
834   return status;
835 }
836 
gearman_client_job_status(gearman_client_st * client,const gearman_job_handle_t job_handle,bool * is_known,bool * is_running,uint32_t * numerator,uint32_t * denominator)837 gearman_return_t gearman_client_job_status(gearman_client_st *client,
838                                            const gearman_job_handle_t job_handle,
839                                            bool *is_known, bool *is_running,
840                                            uint32_t *numerator,
841                                            uint32_t *denominator)
842 {
843   gearman_return_t ret;
844 
845   if (client == NULL)
846   {
847     return GEARMAN_INVALID_ARGUMENT;
848   }
849 
850   universal_reset_error(client->universal);
851 
852   gearman_task_st do_task;
853   gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
854                                                                job_handle, &ret);
855   if (gearman_failed(ret))
856   {
857     return ret;
858   }
859   assert(do_task_ptr);
860   do_task_ptr->type= GEARMAN_TASK_KIND_DO;
861 
862   gearman_task_clear_fn(do_task_ptr);
863 
864   ret= gearman_client_run_block_tasks(client, do_task_ptr);
865 
866   // @note we don't know if our task was run or not, we just know something
867   // happened.
868 
869   if (gearman_success(ret))
870   {
871     if (is_known)
872     {
873       *is_known= do_task.options.is_known;
874     }
875 
876     if (is_running)
877     {
878       *is_running= do_task.options.is_running;
879     }
880 
881     if (numerator)
882     {
883       *numerator= do_task.numerator;
884     }
885 
886     if (denominator)
887     {
888       *denominator= do_task.denominator;
889     }
890 
891     if (is_known == false and is_running == false)
892     {
893       if (do_task.options.is_running)
894       {
895         ret= GEARMAN_IN_PROGRESS;
896       }
897       else if (do_task.options.is_known)
898       {
899         ret= GEARMAN_JOB_EXISTS;
900       }
901     }
902   }
903   else
904   {
905     if (is_known)
906     {
907       *is_known= false;
908     }
909 
910     if (is_running)
911     {
912       *is_running= false;
913     }
914 
915     if (numerator)
916     {
917       *numerator= 0;
918     }
919 
920     if (denominator)
921     {
922       *denominator= 0;
923     }
924   }
925   gearman_task_free(do_task_ptr);
926 
927   return ret;
928 }
929 
gearman_client_echo(gearman_client_st * client,const void * workload,size_t workload_size)930 gearman_return_t gearman_client_echo(gearman_client_st *client,
931                                      const void *workload,
932                                      size_t workload_size)
933 {
934   if (client == NULL)
935   {
936     return GEARMAN_INVALID_ARGUMENT;
937   }
938 
939   return gearman_echo(client->universal, workload, workload_size);
940 }
941 
gearman_client_task_free_all(gearman_client_st * client)942 void gearman_client_task_free_all(gearman_client_st *client)
943 {
944   if (client and client->task_list)
945   {
946     while (client->task_list)
947     {
948       gearman_task_free(client->task_list);
949     }
950   }
951 }
952 
953 
gearman_client_set_task_context_free_fn(gearman_client_st * client,gearman_task_context_free_fn * function)954 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
955                                              gearman_task_context_free_fn *function)
956 {
957   if (client)
958   {
959     client->task_context_free_fn= function;
960   }
961 }
962 
gearman_client_set_memory_allocators(gearman_client_st * client,gearman_malloc_fn * malloc_fn,gearman_free_fn * free_fn,gearman_realloc_fn * realloc_fn,gearman_calloc_fn * calloc_fn,void * context)963 gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
964                                                       gearman_malloc_fn *malloc_fn,
965                                                       gearman_free_fn *free_fn,
966                                                       gearman_realloc_fn *realloc_fn,
967                                                       gearman_calloc_fn *calloc_fn,
968                                                       void *context)
969 {
970   if (client == NULL)
971   {
972     return GEARMAN_INVALID_ARGUMENT;
973   }
974 
975   return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
976 }
977 
978 
979 
gearman_client_add_task(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)980 gearman_task_st *gearman_client_add_task(gearman_client_st *client,
981                                          gearman_task_st *task,
982                                          void *context,
983                                          const char *function,
984                                          const char *unique,
985                                          const void *workload, size_t workload_size,
986                                          gearman_return_t *ret_ptr)
987 {
988   gearman_return_t unused;
989   if (ret_ptr == NULL)
990   {
991     ret_ptr= &unused;
992   }
993 
994   if (client == NULL)
995   {
996     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
997     return NULL;
998   }
999 
1000   return add_task_ptr(*client, task,
1001                       context, GEARMAN_COMMAND_SUBMIT_JOB,
1002                       function,
1003                       unique,
1004                       workload, workload_size,
1005                       time_t(0),
1006                       ret_ptr,
1007                       client->actions);
1008 }
1009 
gearman_client_add_task_high(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1010 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
1011                                               gearman_task_st *task,
1012                                               void *context,
1013                                               const char *function,
1014                                               const char *unique,
1015                                               const void *workload, size_t workload_size,
1016                                               gearman_return_t *ret_ptr)
1017 {
1018   gearman_return_t unused;
1019   if (ret_ptr == NULL)
1020   {
1021     ret_ptr= &unused;
1022   }
1023 
1024   if (client == NULL)
1025   {
1026     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1027     return NULL;
1028   }
1029 
1030   return add_task_ptr(*client, task, context,
1031                       GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
1032                       function,
1033                       unique,
1034                       workload, workload_size,
1035                       time_t(0),
1036                       ret_ptr,
1037                       client->actions);
1038 }
1039 
gearman_client_add_task_low(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1040 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
1041                                              gearman_task_st *task,
1042                                              void *context,
1043                                              const char *function,
1044                                              const char *unique,
1045                                              const void *workload, size_t workload_size,
1046                                              gearman_return_t *ret_ptr)
1047 {
1048   gearman_return_t unused;
1049   if (ret_ptr == NULL)
1050   {
1051     ret_ptr= &unused;
1052   }
1053 
1054   if (client == NULL)
1055   {
1056     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1057     return NULL;
1058   }
1059 
1060   return add_task_ptr(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
1061                       function,
1062                       unique,
1063                       workload, workload_size,
1064                       time_t(0),
1065                       ret_ptr,
1066                       client->actions);
1067 }
1068 
gearman_client_add_task_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1069 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
1070                                                     gearman_task_st *task,
1071                                                     void *context,
1072                                                     const char *function,
1073                                                     const char *unique,
1074                                                     const void *workload, size_t workload_size,
1075                                                     gearman_return_t *ret_ptr)
1076 {
1077   gearman_return_t unused;
1078   if (ret_ptr == NULL)
1079   {
1080     ret_ptr= &unused;
1081   }
1082 
1083   if (client == NULL)
1084   {
1085     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1086     return NULL;
1087   }
1088 
1089   return add_task_ptr(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
1090                       function,
1091                       unique,
1092                       workload, workload_size,
1093                       time_t(0),
1094                       ret_ptr,
1095                       client->actions);
1096 }
1097 
1098 gearman_task_st *
gearman_client_add_task_high_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1099 gearman_client_add_task_high_background(gearman_client_st *client,
1100                                         gearman_task_st *task,
1101                                         void *context,
1102                                         const char *function,
1103                                         const char *unique,
1104                                         const void *workload, size_t workload_size,
1105                                         gearman_return_t *ret_ptr)
1106 {
1107   gearman_return_t unused;
1108   if (ret_ptr == NULL)
1109   {
1110     ret_ptr= &unused;
1111   }
1112 
1113   if (client == NULL)
1114   {
1115     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1116     return NULL;
1117   }
1118 
1119   return add_task_ptr(*client, task, context,
1120                       GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
1121                       function,
1122                       unique,
1123                       workload, workload_size,
1124                       time_t(0),
1125                       ret_ptr,
1126                       client->actions);
1127 }
1128 
gearman_client_add_task_low_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1129 gearman_task_st* gearman_client_add_task_low_background(gearman_client_st *client,
1130                                                         gearman_task_st *task,
1131                                                         void *context,
1132                                                         const char *function,
1133                                                         const char *unique,
1134                                                         const void *workload, size_t workload_size,
1135                                                         gearman_return_t *ret_ptr)
1136 {
1137   gearman_return_t unused;
1138   if (ret_ptr == NULL)
1139   {
1140     ret_ptr= &unused;
1141   }
1142 
1143   if (client == NULL)
1144   {
1145     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1146     return NULL;
1147   }
1148 
1149   return add_task_ptr(*client, task, context,
1150                       GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
1151                       function,
1152                       unique,
1153                       workload, workload_size,
1154                       time_t(0),
1155                       ret_ptr,
1156                       client->actions);
1157 
1158 }
1159 
gearman_client_add_task_status(gearman_client_st * client,gearman_task_st * task,void * context,const gearman_job_handle_t job_handle,gearman_return_t * ret_ptr)1160 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
1161                                                 gearman_task_st *task,
1162                                                 void *context,
1163                                                 const gearman_job_handle_t job_handle,
1164                                                 gearman_return_t *ret_ptr)
1165 {
1166   const void *args[1];
1167   size_t args_size[1];
1168 
1169   gearman_return_t unused;
1170   if (ret_ptr == NULL)
1171   {
1172     ret_ptr= &unused;
1173   }
1174 
1175   if (client == NULL)
1176   {
1177     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1178     return NULL;
1179   }
1180 
1181   if ((task= gearman_task_internal_create(client, task)) == NULL)
1182   {
1183     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1184     return NULL;
1185   }
1186 
1187   task->context= context;
1188   snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
1189 
1190   args[0]= job_handle;
1191   args_size[0]= strlen(job_handle);
1192   gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1193                                                   GEARMAN_MAGIC_REQUEST,
1194                                                   GEARMAN_COMMAND_GET_STATUS,
1195                                                   args, args_size, 1);
1196   if (gearman_success(rc))
1197   {
1198     client->new_tasks++;
1199     client->running_tasks++;
1200     task->options.send_in_use= true;
1201   }
1202   *ret_ptr= rc;
1203 
1204   return task;
1205 }
1206 
gearman_client_add_task_status_by_unique(gearman_client_st * client,gearman_task_st * task_ptr,const char * unique_handle,gearman_return_t * ret_ptr)1207 gearman_task_st *gearman_client_add_task_status_by_unique(gearman_client_st *client,
1208                                                           gearman_task_st *task_ptr,
1209                                                           const char *unique_handle,
1210                                                           gearman_return_t *ret_ptr)
1211 {
1212   const void *args[1];
1213   size_t args_size[1];
1214 
1215   gearman_return_t unused;
1216   if (ret_ptr == NULL)
1217   {
1218     ret_ptr= &unused;
1219   }
1220 
1221   if (client == NULL)
1222   {
1223     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1224     return NULL;
1225   }
1226 
1227   if (unique_handle == NULL)
1228   {
1229     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1230     return NULL;
1231   }
1232 
1233   size_t unique_length= strlen(unique_handle);
1234   if (unique_length > GEARMAN_MAX_UNIQUE_SIZE)
1235   {
1236     *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1237     return NULL;
1238   }
1239 
1240   gearman_task_st *task;
1241   if ((task= gearman_task_internal_create(client, task_ptr)) == NULL)
1242   {
1243     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1244     return NULL;
1245   }
1246 
1247   task->unique_length= unique_length;
1248   memcpy(task->unique, unique_handle, unique_length);
1249   task->unique[task->unique_length]= 0;
1250 
1251   args[0]= task->unique;
1252   args_size[0]= task->unique_length;
1253   gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1254                                                   GEARMAN_MAGIC_REQUEST,
1255                                                   GEARMAN_COMMAND_GET_STATUS_UNIQUE,
1256                                                   args, args_size, 1);
1257   if (gearman_success(rc))
1258   {
1259     client->new_tasks++;
1260     client->running_tasks++;
1261     task->options.send_in_use= true;
1262   }
1263   *ret_ptr= rc;
1264 
1265   return task;
1266 }
1267 
gearman_client_set_workload_fn(gearman_client_st * client,gearman_workload_fn * function)1268 void gearman_client_set_workload_fn(gearman_client_st *client,
1269                                     gearman_workload_fn *function)
1270 {
1271   if (client)
1272   {
1273     client->actions.workload_fn= function;
1274   }
1275 }
1276 
gearman_client_set_created_fn(gearman_client_st * client,gearman_created_fn * function)1277 void gearman_client_set_created_fn(gearman_client_st *client,
1278                                    gearman_created_fn *function)
1279 {
1280   if (client)
1281   {
1282     client->actions.created_fn= function;
1283   }
1284 }
1285 
gearman_client_set_data_fn(gearman_client_st * client,gearman_data_fn * function)1286 void gearman_client_set_data_fn(gearman_client_st *client,
1287                                 gearman_data_fn *function)
1288 {
1289   if (client)
1290   {
1291     client->actions.data_fn= function;
1292   }
1293 }
1294 
gearman_client_set_warning_fn(gearman_client_st * client,gearman_warning_fn * function)1295 void gearman_client_set_warning_fn(gearman_client_st *client,
1296                                    gearman_warning_fn *function)
1297 {
1298   if (client)
1299   {
1300     client->actions.warning_fn= function;
1301   }
1302 }
1303 
gearman_client_set_status_fn(gearman_client_st * client,gearman_universal_status_fn * function)1304 void gearman_client_set_status_fn(gearman_client_st *client,
1305                                   gearman_universal_status_fn *function)
1306 {
1307   if (client)
1308   {
1309     client->actions.status_fn= function;
1310   }
1311 }
1312 
gearman_client_set_complete_fn(gearman_client_st * client,gearman_complete_fn * function)1313 void gearman_client_set_complete_fn(gearman_client_st *client,
1314                                     gearman_complete_fn *function)
1315 {
1316   if (client)
1317   {
1318     client->actions.complete_fn= function;
1319   }
1320 }
1321 
gearman_client_set_exception_fn(gearman_client_st * client,gearman_exception_fn * function)1322 void gearman_client_set_exception_fn(gearman_client_st *client,
1323                                      gearman_exception_fn *function)
1324 {
1325   if (client)
1326   {
1327     client->actions.exception_fn= function;
1328   }
1329 }
1330 
gearman_client_set_fail_fn(gearman_client_st * client,gearman_fail_fn * function)1331 void gearman_client_set_fail_fn(gearman_client_st *client,
1332                                 gearman_fail_fn *function)
1333 {
1334   if (client)
1335   {
1336     client->actions.fail_fn= function;
1337   }
1338 }
1339 
gearman_client_clear_fn(gearman_client_st * client)1340 void gearman_client_clear_fn(gearman_client_st *client)
1341 {
1342   if (client)
1343   {
1344     client->actions= gearman_actions_default();
1345   }
1346 }
1347 
/*
  Core task loop shared by gearman_client_run_tasks() and
  gearman_client_run_block_tasks().

  The function is a resumable state machine. The switch on client->state jumps
  to a case label placed INSIDE the nested loops below, and every early return
  caused by a failed _client_run_task() first stores the matching state
  (NEW, SUBMIT or PACKET) so that the next call re-enters at that label.
  Statement order and label placement are therefore significant.

  client    - dereferenced without a NULL check; both callers in this file
              validate it first.
  exit_task - optional. When non-NULL, the function returns GEARMAN_SUCCESS as
              soon as a packet has been processed and exit_task->result_rc is
              no longer GEARMAN_UNKNOWN_STATE.

  Returns GEARMAN_SUCCESS when client->running_tasks reaches zero (or the
  exit_task condition above is met); otherwise the failing code from
  _client_run_task(), the connection receive, or gearman_wait();
  GEARMAN_LOST_CONNECTION, GEARMAN_SERVER_ERROR, or the value of
  gearman_gerror(..., GEARMAN_IO_WAIT) when client->options.non_blocking is set.
*/
static inline gearman_return_t _client_run_tasks(gearman_client_st *client, gearman_task_st* exit_task)
{
  /* Sentinel: the receive calls below are expected to overwrite this. */
  gearman_return_t ret= GEARMAN_MAX_RETURN;

  switch(client->state)
  {
  case GEARMAN_CLIENT_STATE_IDLE:
    while (1)
    {
      /* Start any new tasks. */
      if (client->new_tasks > 0 && ! (client->options.no_new))
      {
        for (client->task= client->task_list; client->task;
             client->task= client->task->next)
        {
          if (client->task->state != GEARMAN_TASK_STATE_NEW)
          {
            continue;
          }

  /* Re-entry point after a failure while starting a new task. */
  case GEARMAN_CLIENT_STATE_NEW:
          if (client->task == NULL)
          {
            client->state= GEARMAN_CLIENT_STATE_IDLE;
            /* Leaves the enclosing for loop, not the switch. */
            break;
          }

          assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
          gearman_return_t local_ret= _client_run_task(client->task);
          if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
          {
            client->state= GEARMAN_CLIENT_STATE_NEW;

            return local_ret;
          }
        }

        if (client->new_tasks == 0)
        {
          gearman_flush_all(client->universal);
        }
      }

      /* See if there are any connections ready for I/O. */
      while ((client->con= gearman_ready(client->universal)))
      {
        if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
        {
          /* Socket is ready for writing, continue submitting jobs. */
          for (client->task= client->task_list; client->task;
               client->task= client->task->next)
          {
            /* Only tasks on this connection that are still being sent. */
            if (client->task->con != client->con or
                (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
                 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
            {
              continue;
            }

  /* Re-entry point after a failure while submitting a task. */
  case GEARMAN_CLIENT_STATE_SUBMIT:
            if (client->task == NULL)
            {
              client->state= GEARMAN_CLIENT_STATE_IDLE;
              /* Leaves the enclosing for loop, not the switch. */
              break;
            }
            assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
            gearman_return_t local_ret= _client_run_task(client->task);
            if (local_ret == GEARMAN_COULD_NOT_CONNECT)
            {
              /* Unlike other failures, this one restarts from IDLE next time. */
              client->state= GEARMAN_CLIENT_STATE_IDLE;
              return local_ret;
            }
            else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
            {
              client->state= GEARMAN_CLIENT_STATE_SUBMIT;
              return local_ret;
            }
          }

          /* Connection errors are fatal. */
          if (client->con->revents & (POLLERR | POLLHUP | POLLNVAL))
          {
            gearman_error(client->universal, GEARMAN_LOST_CONNECTION, "detected lost connection in _client_run_tasks()");
            client->con->close_socket();
            client->state= GEARMAN_CLIENT_STATE_IDLE;
            return GEARMAN_LOST_CONNECTION;
          }
        }

        if ((client->con->revents & POLLIN) == 0)
        {
          continue;
        }

        /* Socket is ready for reading. */
        while (1)
        {
          /* Read packet on connection and find which task it belongs to. */
          if (client->options.unbuffered_result)
          {
            /* If client is handling the data read, make sure it's complete. */
            if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
            {
              for (client->task= client->task_list; client->task;
                   client->task= client->task->next)
              {
                if (client->task->con == client->con &&
                    (client->task->state == GEARMAN_TASK_STATE_DATA or
                     client->task->state == GEARMAN_TASK_STATE_COMPLETE))
                {
                  break;
                }
              }

              /*
                Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
                Fatal error :(

                NOTE(review): this return is unconditional, so the outcome of
                the search loop above is never used -- confirm that is intended.
              */
              return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,
                                                 "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
            }
            else
            {
              /* Read the next packet, without buffering the data part. */
              client->task= NULL;
              (void)client->con->receiving(client->con->_packet, ret, false);
            }
          }
          else
          {
            /* Read the next packet, buffering the data part. */
            client->task= NULL;
            (void)client->con->receiving(client->con->_packet, ret, true);
          }

          if (client->task == NULL)
          {
            assert(ret != GEARMAN_MAX_RETURN);

            /* Check the return of the gearman_connection_recv() calls above. */
            if (gearman_failed(ret))
            {
              if (ret == GEARMAN_IO_WAIT)
              {
                /* Nothing more to read right now; leave the read loop. */
                break;
              }

              client->state= GEARMAN_CLIENT_STATE_IDLE;
              return ret;
            }

            client->con->options.packet_in_use= true;

            /* We have a packet, see which task it belongs to. */
            for (client->task= client->task_list; client->task;
                 client->task= client->task->next)
            {
              if (client->task->con != client->con)
              {
                continue;
              }

              gearman_log_debug(client->universal, "Got %s", gearman_strcommand(client->con->_packet.command));
              if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
              {
                /* JOB_CREATED is matched by sequence number, not by handle. */
                if (client->task->created_id != client->con->created_id)
                {
                  continue;
                }

                /* New job created, drop through below and notify task. */
                client->con->created_id++;
              }
              else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
              {
                /*
                  NOTE(review): arg[0] is printed with a plain %s, so it is
                  presumably NUL-terminated -- confirm; arg[1] is bounded by
                  arg_size[1].
                */
                gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, GEARMAN_AT,
                                            "%s:%.*s",
                                            static_cast<char *>(client->con->_packet.arg[0]),
                                            int(client->con->_packet.arg_size[1]),
                                            static_cast<char *>(client->con->_packet.arg[1]));

                /*
                  Packet cleanup copied from "Clean up the packet" below, and must
                  remain in sync with its reference.
                */
                gearman_packet_free(&(client->con->_packet));
                client->con->options.packet_in_use= false;

                /* This step copied from _client_run_tasks() above: */
                /* Increment this value because new job created then failed. */
                client->con->created_id++;

                /* NOTE(review): client->state is not reset on this path. */
                return GEARMAN_SERVER_ERROR;
              }
              else if (client->con->_packet.command == GEARMAN_COMMAND_STATUS_RES_UNIQUE and
                       (strncmp(gearman_task_unique(client->task),
                               static_cast<char *>(client->con->_packet.arg[0]),
                               client->con->_packet.arg_size[0]) == 0))
              { }
              /*
                Any other packet is matched on the job handle. The task is
                skipped when the handle differs or the lengths disagree:
                WORK_FAIL must have arg_size[0] equal to strlen(job_handle),
                every other command one more than that (presumably a
                trailing NUL -- confirm).
              */
              else if (strncmp(client->task->job_handle,
                               static_cast<char *>(client->con->_packet.arg[0]),
                               client->con->_packet.arg_size[0]) ||
                       (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
                       (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
              {
                continue;
              }

              /* Else, we have a matching result packet of some kind. */

              break;
            }

            if (client->task == NULL)
            {
              /* The client has stopped waiting for the response, ignore it. */
              client->con->free_private_packet();
              continue;
            }

            client->task->recv= &(client->con->_packet);
          }

  /* Re-entry point after a failure while a task was processing a packet. */
  case GEARMAN_CLIENT_STATE_PACKET:
          /* Let task process job created or result packet. */
          assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
          gearman_return_t local_ret= _client_run_task(client->task);
          if (local_ret == GEARMAN_IO_WAIT)
          {
            /* Leaves the read loop; the packet is not freed here. */
            break;
          }

          if (gearman_failed(local_ret))
          {
            client->state= GEARMAN_CLIENT_STATE_PACKET;
            return local_ret;
          }

          /* Clean up the packet. */
          client->con->free_private_packet();

          /* If exit task is set and matched, exit */
          if (exit_task)
          {
            if (exit_task->result_rc != GEARMAN_UNKNOWN_STATE)
            {
              client->state= GEARMAN_CLIENT_STATE_IDLE;
              return GEARMAN_SUCCESS;
            }
          }

          /* If all tasks are done, return. */
          if (client->running_tasks == 0)
          {
            client->state= GEARMAN_CLIENT_STATE_IDLE;
            return GEARMAN_SUCCESS;
          }
        }
      }

      /* If all tasks are done, return. */
      if (client->running_tasks == 0)
      {
        break;
      }

      /* New tasks were queued meanwhile; go round again to start them. */
      if (client->new_tasks > 0 and ! (client->options.no_new))
      {
        continue;
      }

      if (client->options.non_blocking)
      {
        /* Let the caller wait for activity. */
        client->state= GEARMAN_CLIENT_STATE_IDLE;

        return gearman_gerror(client->universal, GEARMAN_IO_WAIT);
      }

      /* Wait for activity on one of the connections. */
      gearman_return_t local_ret= gearman_wait(client->universal);
      if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
      {
        client->state= GEARMAN_CLIENT_STATE_IDLE;

        return local_ret;
      }
    }

    break;
  }

  client->state= GEARMAN_CLIENT_STATE_IDLE;

  return GEARMAN_SUCCESS;
}
1646 
gearman_client_run_tasks(gearman_client_st * client)1647 gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
1648 {
1649   if (client == NULL)
1650   {
1651     return GEARMAN_INVALID_ARGUMENT;
1652   }
1653 
1654   if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
1655   {
1656     return GEARMAN_SUCCESS;
1657   }
1658 
1659   gearman_return_t rc;
1660   {
1661     PUSH_NON_BLOCKING(client->universal);
1662 
1663     rc= _client_run_tasks(client, NULL);
1664   }
1665 
1666   if (rc == GEARMAN_COULD_NOT_CONNECT)
1667   {
1668     gearman_reset(client->universal);
1669   }
1670 
1671   return rc;
1672 }
1673 
gearman_client_run_block_tasks(gearman_client_st * client,gearman_task_st * exit_task)1674 gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client, gearman_task_st* exit_task)
1675 {
1676   if (client == NULL)
1677   {
1678     return GEARMAN_INVALID_ARGUMENT;
1679   }
1680 
1681   if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
1682   {
1683     return GEARMAN_SUCCESS;
1684   }
1685 
1686 
1687   gearman_return_t rc;
1688   {
1689     PUSH_BLOCKING(client->universal);
1690 
1691     rc= _client_run_tasks(client, exit_task);
1692   }
1693 
1694   if (gearman_failed(rc))
1695   {
1696     if (rc == GEARMAN_COULD_NOT_CONNECT)
1697     {
1698       gearman_reset(client->universal);
1699     }
1700 
1701     if (gearman_universal_error_code(client->universal) != rc and rc != GEARMAN_COULD_NOT_CONNECT)
1702     {
1703       assert(gearman_universal_error_code(client->universal) == rc);
1704     }
1705   }
1706 
1707   return rc;
1708 }
1709 
1710 /*
1711  * Static Definitions
1712  */
1713 
gearman_client_compare(const gearman_client_st * first,const gearman_client_st * second)1714 bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
1715 {
1716   if (first == NULL or second == NULL)
1717   {
1718     return false;
1719   }
1720 
1721   if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
1722   {
1723     return false;
1724   }
1725 
1726   if (first->universal.con_list->port != second->universal.con_list->port)
1727   {
1728     return false;
1729   }
1730 
1731   return true;
1732 }
1733 
gearman_client_set_server_option(gearman_client_st * self,const char * option_arg,size_t option_arg_size)1734 bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
1735 {
1736   if (self)
1737   {
1738     gearman_string_t option= { option_arg, option_arg_size };
1739     return gearman_request_option(self->universal, option);
1740   }
1741 
1742   return false;
1743 }
1744 
gearman_client_set_namespace(gearman_client_st * self,const char * namespace_key,size_t namespace_key_size)1745 void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
1746 {
1747   if (self)
1748   {
1749     gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1750   }
1751 }
1752 
gearman_client_set_identifier(gearman_client_st * client,const char * id,size_t id_size)1753 gearman_return_t gearman_client_set_identifier(gearman_client_st *client,
1754                                                const char *id, size_t id_size)
1755 {
1756   if (client)
1757   {
1758     return gearman_set_identifier(client->universal, id, id_size);
1759   }
1760 
1761   return GEARMAN_INVALID_ARGUMENT;
1762 }
1763 
gearman_client_namespace(gearman_client_st * self)1764 const char *gearman_client_namespace(gearman_client_st *self)
1765 {
1766   return gearman_univeral_namespace(self->universal);
1767 }
1768