1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 #include "gear_config.h"
40
41 #include <libgearman/common.h>
42 #include <libgearman/log.hpp>
43
44 #include "libgearman/assert.hpp"
45
46 #include <arpa/inet.h>
47 #include <cerrno>
48 #include <cstdio>
49 #include <cstdlib>
50 #include <cstring>
51 #include <memory>
52 #include <netdb.h>
53 #include <netinet/in.h>
54 #include <sys/socket.h>
55
56 /*
57 Allocate a client structure.
58 */
_client_allocate(gearman_client_st * client,bool is_clone)59 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
60 {
61 if (client)
62 {
63 client->options.allocated= false;
64 }
65 else
66 {
67 client= new (std::nothrow) gearman_client_st;
68 if (client == NULL)
69 {
70 return NULL;
71 }
72
73 client->options.allocated= true;
74 }
75
76 client->options.non_blocking= false;
77 client->options.unbuffered_result= false;
78 client->options.no_new= false;
79 client->options.free_tasks= false;
80 client->options.generate_unique= true;
81
82 client->state= GEARMAN_CLIENT_STATE_IDLE;
83 client->new_tasks= 0;
84 client->running_tasks= 0;
85 client->task_count= 0;
86 client->context= NULL;
87 client->con= NULL;
88 client->task= NULL;
89 client->task_list= NULL;
90 client->task_context_free_fn= NULL;
91 gearman_client_clear_fn(client);
92
93 if (is_clone == false)
94 {
95 gearman_universal_initialize(client->universal);
96 }
97
98 return client;
99 }
100
101 /**
102 * Callback function used when parsing server lists.
103 */
_client_add_server(const char * host,in_port_t port,void * context)104 static gearman_return_t _client_add_server(const char *host, in_port_t port,
105 void *context)
106 {
107 return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
108 }
109
110
111 /**
112 * Real do function.
113 */
_client_do(gearman_client_st * client,gearman_command_t command,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)114 static void *_client_do(gearman_client_st *client, gearman_command_t command,
115 const char *function_name,
116 const char *unique,
117 const void *workload_str, size_t workload_size,
118 size_t *result_size, gearman_return_t *ret_ptr)
119 {
120 gearman_return_t unused;
121 if (ret_ptr == NULL)
122 {
123 ret_ptr= &unused;
124 }
125
126 if (client == NULL)
127 {
128 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
129 return NULL;
130 }
131
132 universal_reset_error(client->universal);
133
134 size_t unused_size;
135 if (result_size == NULL)
136 {
137 result_size= &unused_size;
138 }
139 *result_size= 0;
140
141 gearman_string_t function= { gearman_string_param_cstr(function_name) };
142 gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
143 gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
144
145 gearman_task_st do_task;
146 gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
147 function,
148 local_unique,
149 workload,
150 time_t(0),
151 gearman_actions_do_default());
152 if (do_task_ptr == NULL)
153 {
154 *ret_ptr= gearman_universal_error_code(client->universal);
155 return NULL;
156 }
157 do_task_ptr->type= GEARMAN_TASK_KIND_DO;
158
159 gearman_return_t ret= gearman_client_run_block_tasks(client, do_task_ptr);
160
161 // gearman_client_run_tasks failed
162 assert(client->task_list); // Programmer error, we should always have the task that we used for do
163
164 char *returnable= NULL;
165 if (gearman_failed(ret))
166 {
167 if (ret == GEARMAN_COULD_NOT_CONNECT)
168 { }
169 else
170 {
171 gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
172 }
173
174 *ret_ptr= ret;
175 *result_size= 0;
176 }
177 else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
178 {
179 *ret_ptr= do_task_ptr->result_rc;
180 if (gearman_task_result(do_task_ptr))
181 {
182 if (gearman_has_allocator(client->universal))
183 {
184 gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
185 returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
186 if (returnable == NULL)
187 {
188 gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
189 *result_size= 0;
190 }
191 else // NULL terminate
192 {
193 memcpy(returnable, gearman_c_str(result), gearman_size(result));
194 returnable[gearman_size(result)]= 0;
195 *result_size= gearman_size(result);
196 }
197 }
198 else
199 {
200 gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
201 *result_size= gearman_size(result);
202 returnable= const_cast<char *>(gearman_c_str(result));
203 }
204 }
205 else // NULL job
206 {
207 *result_size= 0;
208 }
209 }
210 else // gearman_client_run_tasks() was successful, but the task was not
211 {
212 gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
213
214 *ret_ptr= do_task_ptr->result_rc;
215 *result_size= 0;
216 }
217
218 gearman_task_free(&do_task);
219 client->new_tasks= 0;
220 client->running_tasks= 0;
221
222 return returnable;
223 }
224
225 /*
226 Real background do function.
227 */
_client_do_background(gearman_client_st * client,gearman_command_t command,gearman_string_t & function,gearman_unique_t & unique,gearman_string_t & workload,gearman_job_handle_t job_handle)228 static gearman_return_t _client_do_background(gearman_client_st *client,
229 gearman_command_t command,
230 gearman_string_t &function,
231 gearman_unique_t &unique,
232 gearman_string_t &workload,
233 gearman_job_handle_t job_handle)
234 {
235 if (client == NULL)
236 {
237 return GEARMAN_INVALID_ARGUMENT;
238 }
239
240 universal_reset_error(client->universal);
241
242 if (gearman_size(function) == 0)
243 {
244 return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");
245 }
246
247 client->_do_handle[0]= 0; // Reset the job_handle we store in client
248
249 gearman_task_st do_task, *do_task_ptr;
250 do_task_ptr= add_task(*client, &do_task,
251 client,
252 command,
253 function,
254 unique,
255 workload,
256 time_t(0),
257 gearman_actions_do_default());
258 if (not do_task_ptr)
259 {
260 return gearman_universal_error_code(client->universal);
261 }
262 do_task_ptr->type= GEARMAN_TASK_KIND_DO;
263
264 gearman_return_t ret= gearman_client_run_block_tasks(client, do_task_ptr);
265
266 if (job_handle)
267 {
268 strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
269 }
270 strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
271 client->new_tasks= 0;
272 client->running_tasks= 0;
273 gearman_task_free(&do_task);
274
275 return ret;
276 }
277
278
279 /*
280 * Public Definitions
281 */
282
gearman_client_create(gearman_client_st * client)283 gearman_client_st *gearman_client_create(gearman_client_st *client)
284 {
285 return _client_allocate(client, false);
286 }
287
gearman_client_clone(gearman_client_st * client,const gearman_client_st * from)288 gearman_client_st *gearman_client_clone(gearman_client_st *client,
289 const gearman_client_st *from)
290 {
291 if (from == NULL)
292 {
293 return _client_allocate(client, false);
294 }
295
296 client= _client_allocate(client, true);
297
298 if (client == NULL)
299 {
300 return client;
301 }
302
303 client->options.non_blocking= from->options.non_blocking;
304 client->options.unbuffered_result= from->options.unbuffered_result;
305 client->options.no_new= from->options.no_new;
306 client->options.free_tasks= from->options.free_tasks;
307 client->options.generate_unique= from->options.generate_unique;
308 client->actions= from->actions;
309 client->_do_handle[0]= 0;
310
311 gearman_universal_clone(client->universal, from->universal);
312
313 return client;
314 }
315
gearman_client_free(gearman_client_st * client)316 void gearman_client_free(gearman_client_st *client)
317 {
318 if (client)
319 {
320
321 gearman_client_task_free_all(client);
322
323 gearman_universal_free(client->universal);
324
325 if (client->options.allocated)
326 {
327 delete client;
328 }
329 }
330 }
331
gearman_client_error(const gearman_client_st * client)332 const char *gearman_client_error(const gearman_client_st *client)
333 {
334 if (client)
335 {
336 return gearman_universal_error(client->universal);
337 }
338
339 return NULL;
340 }
341
gearman_client_error_code(const gearman_client_st * client)342 gearman_return_t gearman_client_error_code(const gearman_client_st *client)
343 {
344 if (client == NULL)
345 {
346 return GEARMAN_INVALID_ARGUMENT;
347 }
348
349 return gearman_universal_error_code(client->universal);
350 }
351
gearman_client_errno(const gearman_client_st * client)352 int gearman_client_errno(const gearman_client_st *client)
353 {
354 if (client == NULL)
355 {
356 return EINVAL;
357 }
358
359 return gearman_universal_errno(client->universal);
360 }
361
gearman_client_options(const gearman_client_st * client)362 gearman_client_options_t gearman_client_options(const gearman_client_st *client)
363 {
364 if (client)
365 {
366 int32_t options;
367 memset(&options, 0, sizeof(int32_t));
368
369 if (client->options.allocated)
370 options|= int(GEARMAN_CLIENT_ALLOCATED);
371
372 if (client->options.non_blocking)
373 options|= int(GEARMAN_CLIENT_NON_BLOCKING);
374
375 if (client->options.unbuffered_result)
376 options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
377
378 if (client->options.no_new)
379 options|= int(GEARMAN_CLIENT_NO_NEW);
380
381 if (client->options.free_tasks)
382 options|= int(GEARMAN_CLIENT_FREE_TASKS);
383
384 if (client->options.generate_unique)
385 options|= int(GEARMAN_CLIENT_GENERATE_UNIQUE);
386
387 return gearman_client_options_t(options);
388 }
389
390 return gearman_client_options_t(GEARMAN_WORKER_MAX);
391 }
392
gearman_client_has_option(gearman_client_st * client,gearman_client_options_t option)393 bool gearman_client_has_option(gearman_client_st *client,
394 gearman_client_options_t option)
395 {
396 if (client)
397 {
398 switch (option)
399 {
400 case GEARMAN_CLIENT_ALLOCATED:
401 return client->options.allocated;
402
403 case GEARMAN_CLIENT_NON_BLOCKING:
404 return client->options.non_blocking;
405
406 case GEARMAN_CLIENT_UNBUFFERED_RESULT:
407 return client->options.unbuffered_result;
408
409 case GEARMAN_CLIENT_NO_NEW:
410 return client->options.no_new;
411
412 case GEARMAN_CLIENT_FREE_TASKS:
413 return client->options.free_tasks;
414
415 case GEARMAN_CLIENT_GENERATE_UNIQUE:
416 return client->options.generate_unique;
417
418 default:
419 case GEARMAN_CLIENT_TASK_IN_USE:
420 case GEARMAN_CLIENT_MAX:
421 break; // Let these fall through to false
422 }
423 }
424
425 return false;
426 }
427
gearman_client_set_options(gearman_client_st * client,gearman_client_options_t options)428 void gearman_client_set_options(gearman_client_st *client,
429 gearman_client_options_t options)
430 {
431 if (client)
432 {
433 gearman_client_options_t usable_options[]= {
434 GEARMAN_CLIENT_NON_BLOCKING,
435 GEARMAN_CLIENT_UNBUFFERED_RESULT,
436 GEARMAN_CLIENT_FREE_TASKS,
437 GEARMAN_CLIENT_GENERATE_UNIQUE,
438 GEARMAN_CLIENT_MAX
439 };
440
441 for (gearman_client_options_t* ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
442 {
443 if (options & *ptr)
444 {
445 gearman_client_add_options(client, *ptr);
446 }
447 else
448 {
449 gearman_client_remove_options(client, *ptr);
450 }
451 }
452 }
453 }
454
gearman_client_add_options(gearman_client_st * client,gearman_client_options_t options)455 void gearman_client_add_options(gearman_client_st *client,
456 gearman_client_options_t options)
457 {
458 if (client)
459 {
460 if (options & GEARMAN_CLIENT_NON_BLOCKING)
461 {
462 gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
463 client->options.non_blocking= true;
464 }
465
466 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
467 {
468 client->options.unbuffered_result= true;
469 }
470
471 if (options & GEARMAN_CLIENT_FREE_TASKS)
472 {
473 client->options.free_tasks= true;
474 }
475
476 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
477 {
478 client->options.generate_unique= true;
479 }
480 }
481 }
482
gearman_client_remove_options(gearman_client_st * client,gearman_client_options_t options)483 void gearman_client_remove_options(gearman_client_st *client,
484 gearman_client_options_t options)
485 {
486 if (client)
487 {
488 if (options & GEARMAN_CLIENT_NON_BLOCKING)
489 {
490 gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
491 client->options.non_blocking= false;
492 }
493
494 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
495 {
496 client->options.unbuffered_result= false;
497 }
498
499 if (options & GEARMAN_CLIENT_FREE_TASKS)
500 {
501 client->options.free_tasks= false;
502 }
503
504 if (options & GEARMAN_CLIENT_GENERATE_UNIQUE)
505 {
506 client->options.generate_unique= false;
507 }
508 }
509 }
510
gearman_client_timeout(gearman_client_st * client)511 int gearman_client_timeout(gearman_client_st *client)
512 {
513 if (client)
514 {
515 return gearman_universal_timeout(client->universal);
516 }
517
518 return -1;
519 }
520
gearman_client_set_timeout(gearman_client_st * client,int timeout)521 void gearman_client_set_timeout(gearman_client_st *client, int timeout)
522 {
523 if (client)
524 {
525 gearman_universal_set_timeout(client->universal, timeout);
526 }
527 }
528
gearman_client_context(const gearman_client_st * client)529 void *gearman_client_context(const gearman_client_st *client)
530 {
531 if (client)
532 {
533 return const_cast<void *>(client->context);
534 }
535
536 return NULL;
537 }
538
gearman_client_set_context(gearman_client_st * client,void * context)539 void gearman_client_set_context(gearman_client_st *client, void *context)
540 {
541 if (client)
542 {
543 client->context= context;
544 }
545 }
546
gearman_client_set_log_fn(gearman_client_st * client,gearman_log_fn * function,void * context,gearman_verbose_t verbose)547 void gearman_client_set_log_fn(gearman_client_st *client,
548 gearman_log_fn *function, void *context,
549 gearman_verbose_t verbose)
550 {
551 if (client)
552 {
553 gearman_set_log_fn(client->universal, function, context, verbose);
554 }
555 }
556
gearman_client_set_workload_malloc_fn(gearman_client_st * client,gearman_malloc_fn * function,void * context)557 void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
558 gearman_malloc_fn *function,
559 void *context)
560 {
561 if (client)
562 {
563 gearman_set_workload_malloc_fn(client->universal, function, context);
564 }
565 }
566
gearman_client_set_workload_free_fn(gearman_client_st * client,gearman_free_fn * function,void * context)567 void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
568 {
569 if (client)
570 {
571 gearman_set_workload_free_fn(client->universal, function, context);
572 }
573 }
574
gearman_client_add_server(gearman_client_st * client,const char * host,in_port_t port)575 gearman_return_t gearman_client_add_server(gearman_client_st *client,
576 const char *host, in_port_t port)
577 {
578 if (client)
579 {
580 if (gearman_connection_create_args(client->universal, host, port) == false)
581 {
582 assert(client->universal.error.rc != GEARMAN_SUCCESS);
583 return gearman_universal_error_code(client->universal);
584 }
585
586 return GEARMAN_SUCCESS;
587 }
588
589 return GEARMAN_INVALID_ARGUMENT;
590 }
591
gearman_client_add_servers(gearman_client_st * client,const char * servers)592 gearman_return_t gearman_client_add_servers(gearman_client_st *client,
593 const char *servers)
594 {
595 return gearman_parse_servers(servers, _client_add_server, client);
596 }
597
gearman_client_remove_servers(gearman_client_st * client)598 void gearman_client_remove_servers(gearman_client_st *client)
599 {
600 if (client)
601 {
602 gearman_free_all_cons(client->universal);
603 }
604 }
605
gearman_client_wait(gearman_client_st * client)606 gearman_return_t gearman_client_wait(gearman_client_st *client)
607 {
608 if (client)
609 {
610 return gearman_wait(client->universal);
611 }
612
613 return GEARMAN_INVALID_ARGUMENT;
614 }
615
gearman_client_do(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)616 void *gearman_client_do(gearman_client_st *client,
617 const char *function,
618 const char *unique,
619 const void *workload,
620 size_t workload_size, size_t *result_size,
621 gearman_return_t *ret_ptr)
622 {
623 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
624 function,
625 unique,
626 workload, workload_size,
627 result_size, ret_ptr);
628 }
629
gearman_client_do_high(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)630 void *gearman_client_do_high(gearman_client_st *client,
631 const char *function,
632 const char *unique,
633 const void *workload, size_t workload_size,
634 size_t *result_size, gearman_return_t *ret_ptr)
635 {
636 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
637 function,
638 unique,
639 workload, workload_size,
640 result_size, ret_ptr);
641 }
642
gearman_client_do_low(gearman_client_st * client,const char * function,const char * unique,const void * workload,size_t workload_size,size_t * result_size,gearman_return_t * ret_ptr)643 void *gearman_client_do_low(gearman_client_st *client,
644 const char *function,
645 const char *unique,
646 const void *workload, size_t workload_size,
647 size_t *result_size, gearman_return_t *ret_ptr)
648 {
649 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
650 function,
651 unique,
652 workload, workload_size,
653 result_size, ret_ptr);
654 }
655
gearman_client_count_tasks(gearman_client_st * client)656 size_t gearman_client_count_tasks(gearman_client_st *client)
657 {
658 if (client == NULL)
659 {
660 return 0;
661 }
662
663 size_t count= 1;
664 gearman_task_st *search= client->task_list;
665
666 while ((search= search->next))
667 {
668 count++;
669 }
670
671 return count;
672 }
673
#if 0
/**
 * Report whether any task on the client's list is active according to
 * gearman_task_is_active(). Currently compiled out.
 */
static bool _active_tasks(gearman_client_st *client)
{
  assert(client);

  for (gearman_task_st *search= client->task_list; search; search= search->next)
  {
    if (gearman_task_is_active(search))
    {
      return true;
    }
  }

  return false;
}
#endif
694
gearman_client_do_job_handle(gearman_client_st * self)695 const char *gearman_client_do_job_handle(gearman_client_st *self)
696 {
697 if (self)
698 {
699 return self->_do_handle;
700 }
701
702 errno= EINVAL;
703 return NULL;
704 }
705
gearman_client_do_status(gearman_client_st *,uint32_t * numerator,uint32_t * denominator)706 void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
707 {
708 if (numerator)
709 {
710 *numerator= 0;
711 }
712
713 if (denominator)
714 {
715 *denominator= 0;
716 }
717 }
718
gearman_client_do_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)719 gearman_return_t gearman_client_do_background(gearman_client_st *client,
720 const char *function_name,
721 const char *unique,
722 const void *workload_str,
723 size_t workload_size,
724 gearman_job_handle_t job_handle)
725 {
726 gearman_string_t function= { gearman_string_param_cstr(function_name) };
727 gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
728 gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
729
730 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
731 function,
732 local_unique,
733 workload,
734 job_handle);
735 }
736
gearman_client_do_high_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)737 gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
738 const char *function_name,
739 const char *unique,
740 const void *workload_str,
741 size_t workload_size,
742 gearman_job_handle_t job_handle)
743 {
744 gearman_string_t function= { gearman_string_param_cstr(function_name) };
745 gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
746 gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
747
748 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
749 function,
750 local_unique,
751 workload,
752 job_handle);
753 }
754
gearman_client_do_low_background(gearman_client_st * client,const char * function_name,const char * unique,const void * workload_str,size_t workload_size,gearman_job_handle_t job_handle)755 gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
756 const char *function_name,
757 const char *unique,
758 const void *workload_str,
759 size_t workload_size,
760 gearman_job_handle_t job_handle)
761 {
762 gearman_string_t function= { gearman_string_param_cstr(function_name) };
763 gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
764 gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
765
766 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
767 function,
768 local_unique,
769 workload,
770 job_handle);
771 }
772
gearman_client_unique_status(gearman_client_st * client,const char * unique,size_t unique_length)773 gearman_status_t gearman_client_unique_status(gearman_client_st *client,
774 const char *unique, size_t unique_length)
775 {
776 (void)unique_length;
777 gearman_status_t status;
778 gearman_init(status);
779
780 if (client == NULL)
781 {
782 gearman_status_set_return(status, GEARMAN_INVALID_ARGUMENT);
783 return status;
784 }
785
786 universal_reset_error(client->universal);
787
788 gearman_return_t ret;
789 gearman_task_st do_task;
790 gearman_task_st *do_task_ptr= gearman_client_add_task_status_by_unique(client,
791 &do_task,
792 unique, &ret);
793 if (gearman_failed(ret))
794 {
795 gearman_status_set_return(status, ret);
796 return status;
797 }
798 assert(do_task_ptr);
799 do_task_ptr->type= GEARMAN_TASK_KIND_DO;
800
801 gearman_task_clear_fn(do_task_ptr);
802
803 ret= gearman_client_run_block_tasks(client, do_task_ptr);
804
805 // @note we don't know if our task was run or not, we just know something
806 // happened.
807
808 if (gearman_success(ret))
809 {
810 gearman_status_set(status,
811 do_task.options.is_known,
812 do_task.options.is_running,
813 do_task.numerator,
814 do_task.denominator,
815 do_task.client_count);
816
817 if (gearman_status_is_known(status) == false and gearman_status_is_running(status) == false)
818 {
819 if (do_task.options.is_running)
820 {
821 ret= GEARMAN_IN_PROGRESS;
822 }
823 else if (do_task.options.is_known)
824 {
825 ret= GEARMAN_JOB_EXISTS;
826 }
827 }
828 }
829
830 gearman_task_free(do_task_ptr);
831
832 gearman_status_set_return(status, ret);
833
834 return status;
835 }
836
gearman_client_job_status(gearman_client_st * client,const gearman_job_handle_t job_handle,bool * is_known,bool * is_running,uint32_t * numerator,uint32_t * denominator)837 gearman_return_t gearman_client_job_status(gearman_client_st *client,
838 const gearman_job_handle_t job_handle,
839 bool *is_known, bool *is_running,
840 uint32_t *numerator,
841 uint32_t *denominator)
842 {
843 gearman_return_t ret;
844
845 if (client == NULL)
846 {
847 return GEARMAN_INVALID_ARGUMENT;
848 }
849
850 universal_reset_error(client->universal);
851
852 gearman_task_st do_task;
853 gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
854 job_handle, &ret);
855 if (gearman_failed(ret))
856 {
857 return ret;
858 }
859 assert(do_task_ptr);
860 do_task_ptr->type= GEARMAN_TASK_KIND_DO;
861
862 gearman_task_clear_fn(do_task_ptr);
863
864 ret= gearman_client_run_block_tasks(client, do_task_ptr);
865
866 // @note we don't know if our task was run or not, we just know something
867 // happened.
868
869 if (gearman_success(ret))
870 {
871 if (is_known)
872 {
873 *is_known= do_task.options.is_known;
874 }
875
876 if (is_running)
877 {
878 *is_running= do_task.options.is_running;
879 }
880
881 if (numerator)
882 {
883 *numerator= do_task.numerator;
884 }
885
886 if (denominator)
887 {
888 *denominator= do_task.denominator;
889 }
890
891 if (is_known == false and is_running == false)
892 {
893 if (do_task.options.is_running)
894 {
895 ret= GEARMAN_IN_PROGRESS;
896 }
897 else if (do_task.options.is_known)
898 {
899 ret= GEARMAN_JOB_EXISTS;
900 }
901 }
902 }
903 else
904 {
905 if (is_known)
906 {
907 *is_known= false;
908 }
909
910 if (is_running)
911 {
912 *is_running= false;
913 }
914
915 if (numerator)
916 {
917 *numerator= 0;
918 }
919
920 if (denominator)
921 {
922 *denominator= 0;
923 }
924 }
925 gearman_task_free(do_task_ptr);
926
927 return ret;
928 }
929
gearman_client_echo(gearman_client_st * client,const void * workload,size_t workload_size)930 gearman_return_t gearman_client_echo(gearman_client_st *client,
931 const void *workload,
932 size_t workload_size)
933 {
934 if (client == NULL)
935 {
936 return GEARMAN_INVALID_ARGUMENT;
937 }
938
939 return gearman_echo(client->universal, workload, workload_size);
940 }
941
gearman_client_task_free_all(gearman_client_st * client)942 void gearman_client_task_free_all(gearman_client_st *client)
943 {
944 if (client and client->task_list)
945 {
946 while (client->task_list)
947 {
948 gearman_task_free(client->task_list);
949 }
950 }
951 }
952
953
gearman_client_set_task_context_free_fn(gearman_client_st * client,gearman_task_context_free_fn * function)954 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
955 gearman_task_context_free_fn *function)
956 {
957 if (client)
958 {
959 client->task_context_free_fn= function;
960 }
961 }
962
gearman_client_set_memory_allocators(gearman_client_st * client,gearman_malloc_fn * malloc_fn,gearman_free_fn * free_fn,gearman_realloc_fn * realloc_fn,gearman_calloc_fn * calloc_fn,void * context)963 gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
964 gearman_malloc_fn *malloc_fn,
965 gearman_free_fn *free_fn,
966 gearman_realloc_fn *realloc_fn,
967 gearman_calloc_fn *calloc_fn,
968 void *context)
969 {
970 if (client == NULL)
971 {
972 return GEARMAN_INVALID_ARGUMENT;
973 }
974
975 return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
976 }
977
978
979
gearman_client_add_task(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)980 gearman_task_st *gearman_client_add_task(gearman_client_st *client,
981 gearman_task_st *task,
982 void *context,
983 const char *function,
984 const char *unique,
985 const void *workload, size_t workload_size,
986 gearman_return_t *ret_ptr)
987 {
988 gearman_return_t unused;
989 if (ret_ptr == NULL)
990 {
991 ret_ptr= &unused;
992 }
993
994 if (client == NULL)
995 {
996 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
997 return NULL;
998 }
999
1000 return add_task_ptr(*client, task,
1001 context, GEARMAN_COMMAND_SUBMIT_JOB,
1002 function,
1003 unique,
1004 workload, workload_size,
1005 time_t(0),
1006 ret_ptr,
1007 client->actions);
1008 }
1009
gearman_client_add_task_high(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1010 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
1011 gearman_task_st *task,
1012 void *context,
1013 const char *function,
1014 const char *unique,
1015 const void *workload, size_t workload_size,
1016 gearman_return_t *ret_ptr)
1017 {
1018 gearman_return_t unused;
1019 if (ret_ptr == NULL)
1020 {
1021 ret_ptr= &unused;
1022 }
1023
1024 if (client == NULL)
1025 {
1026 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1027 return NULL;
1028 }
1029
1030 return add_task_ptr(*client, task, context,
1031 GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
1032 function,
1033 unique,
1034 workload, workload_size,
1035 time_t(0),
1036 ret_ptr,
1037 client->actions);
1038 }
1039
gearman_client_add_task_low(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1040 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
1041 gearman_task_st *task,
1042 void *context,
1043 const char *function,
1044 const char *unique,
1045 const void *workload, size_t workload_size,
1046 gearman_return_t *ret_ptr)
1047 {
1048 gearman_return_t unused;
1049 if (ret_ptr == NULL)
1050 {
1051 ret_ptr= &unused;
1052 }
1053
1054 if (client == NULL)
1055 {
1056 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1057 return NULL;
1058 }
1059
1060 return add_task_ptr(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
1061 function,
1062 unique,
1063 workload, workload_size,
1064 time_t(0),
1065 ret_ptr,
1066 client->actions);
1067 }
1068
gearman_client_add_task_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1069 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
1070 gearman_task_st *task,
1071 void *context,
1072 const char *function,
1073 const char *unique,
1074 const void *workload, size_t workload_size,
1075 gearman_return_t *ret_ptr)
1076 {
1077 gearman_return_t unused;
1078 if (ret_ptr == NULL)
1079 {
1080 ret_ptr= &unused;
1081 }
1082
1083 if (client == NULL)
1084 {
1085 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1086 return NULL;
1087 }
1088
1089 return add_task_ptr(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
1090 function,
1091 unique,
1092 workload, workload_size,
1093 time_t(0),
1094 ret_ptr,
1095 client->actions);
1096 }
1097
1098 gearman_task_st *
gearman_client_add_task_high_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1099 gearman_client_add_task_high_background(gearman_client_st *client,
1100 gearman_task_st *task,
1101 void *context,
1102 const char *function,
1103 const char *unique,
1104 const void *workload, size_t workload_size,
1105 gearman_return_t *ret_ptr)
1106 {
1107 gearman_return_t unused;
1108 if (ret_ptr == NULL)
1109 {
1110 ret_ptr= &unused;
1111 }
1112
1113 if (client == NULL)
1114 {
1115 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1116 return NULL;
1117 }
1118
1119 return add_task_ptr(*client, task, context,
1120 GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
1121 function,
1122 unique,
1123 workload, workload_size,
1124 time_t(0),
1125 ret_ptr,
1126 client->actions);
1127 }
1128
gearman_client_add_task_low_background(gearman_client_st * client,gearman_task_st * task,void * context,const char * function,const char * unique,const void * workload,size_t workload_size,gearman_return_t * ret_ptr)1129 gearman_task_st* gearman_client_add_task_low_background(gearman_client_st *client,
1130 gearman_task_st *task,
1131 void *context,
1132 const char *function,
1133 const char *unique,
1134 const void *workload, size_t workload_size,
1135 gearman_return_t *ret_ptr)
1136 {
1137 gearman_return_t unused;
1138 if (ret_ptr == NULL)
1139 {
1140 ret_ptr= &unused;
1141 }
1142
1143 if (client == NULL)
1144 {
1145 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1146 return NULL;
1147 }
1148
1149 return add_task_ptr(*client, task, context,
1150 GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
1151 function,
1152 unique,
1153 workload, workload_size,
1154 time_t(0),
1155 ret_ptr,
1156 client->actions);
1157
1158 }
1159
gearman_client_add_task_status(gearman_client_st * client,gearman_task_st * task,void * context,const gearman_job_handle_t job_handle,gearman_return_t * ret_ptr)1160 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
1161 gearman_task_st *task,
1162 void *context,
1163 const gearman_job_handle_t job_handle,
1164 gearman_return_t *ret_ptr)
1165 {
1166 const void *args[1];
1167 size_t args_size[1];
1168
1169 gearman_return_t unused;
1170 if (ret_ptr == NULL)
1171 {
1172 ret_ptr= &unused;
1173 }
1174
1175 if (client == NULL)
1176 {
1177 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1178 return NULL;
1179 }
1180
1181 if ((task= gearman_task_internal_create(client, task)) == NULL)
1182 {
1183 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1184 return NULL;
1185 }
1186
1187 task->context= context;
1188 snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
1189
1190 args[0]= job_handle;
1191 args_size[0]= strlen(job_handle);
1192 gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1193 GEARMAN_MAGIC_REQUEST,
1194 GEARMAN_COMMAND_GET_STATUS,
1195 args, args_size, 1);
1196 if (gearman_success(rc))
1197 {
1198 client->new_tasks++;
1199 client->running_tasks++;
1200 task->options.send_in_use= true;
1201 }
1202 *ret_ptr= rc;
1203
1204 return task;
1205 }
1206
gearman_client_add_task_status_by_unique(gearman_client_st * client,gearman_task_st * task_ptr,const char * unique_handle,gearman_return_t * ret_ptr)1207 gearman_task_st *gearman_client_add_task_status_by_unique(gearman_client_st *client,
1208 gearman_task_st *task_ptr,
1209 const char *unique_handle,
1210 gearman_return_t *ret_ptr)
1211 {
1212 const void *args[1];
1213 size_t args_size[1];
1214
1215 gearman_return_t unused;
1216 if (ret_ptr == NULL)
1217 {
1218 ret_ptr= &unused;
1219 }
1220
1221 if (client == NULL)
1222 {
1223 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1224 return NULL;
1225 }
1226
1227 if (unique_handle == NULL)
1228 {
1229 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1230 return NULL;
1231 }
1232
1233 size_t unique_length= strlen(unique_handle);
1234 if (unique_length > GEARMAN_MAX_UNIQUE_SIZE)
1235 {
1236 *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1237 return NULL;
1238 }
1239
1240 gearman_task_st *task;
1241 if ((task= gearman_task_internal_create(client, task_ptr)) == NULL)
1242 {
1243 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1244 return NULL;
1245 }
1246
1247 task->unique_length= unique_length;
1248 memcpy(task->unique, unique_handle, unique_length);
1249 task->unique[task->unique_length]= 0;
1250
1251 args[0]= task->unique;
1252 args_size[0]= task->unique_length;
1253 gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1254 GEARMAN_MAGIC_REQUEST,
1255 GEARMAN_COMMAND_GET_STATUS_UNIQUE,
1256 args, args_size, 1);
1257 if (gearman_success(rc))
1258 {
1259 client->new_tasks++;
1260 client->running_tasks++;
1261 task->options.send_in_use= true;
1262 }
1263 *ret_ptr= rc;
1264
1265 return task;
1266 }
1267
gearman_client_set_workload_fn(gearman_client_st * client,gearman_workload_fn * function)1268 void gearman_client_set_workload_fn(gearman_client_st *client,
1269 gearman_workload_fn *function)
1270 {
1271 if (client)
1272 {
1273 client->actions.workload_fn= function;
1274 }
1275 }
1276
gearman_client_set_created_fn(gearman_client_st * client,gearman_created_fn * function)1277 void gearman_client_set_created_fn(gearman_client_st *client,
1278 gearman_created_fn *function)
1279 {
1280 if (client)
1281 {
1282 client->actions.created_fn= function;
1283 }
1284 }
1285
gearman_client_set_data_fn(gearman_client_st * client,gearman_data_fn * function)1286 void gearman_client_set_data_fn(gearman_client_st *client,
1287 gearman_data_fn *function)
1288 {
1289 if (client)
1290 {
1291 client->actions.data_fn= function;
1292 }
1293 }
1294
gearman_client_set_warning_fn(gearman_client_st * client,gearman_warning_fn * function)1295 void gearman_client_set_warning_fn(gearman_client_st *client,
1296 gearman_warning_fn *function)
1297 {
1298 if (client)
1299 {
1300 client->actions.warning_fn= function;
1301 }
1302 }
1303
gearman_client_set_status_fn(gearman_client_st * client,gearman_universal_status_fn * function)1304 void gearman_client_set_status_fn(gearman_client_st *client,
1305 gearman_universal_status_fn *function)
1306 {
1307 if (client)
1308 {
1309 client->actions.status_fn= function;
1310 }
1311 }
1312
gearman_client_set_complete_fn(gearman_client_st * client,gearman_complete_fn * function)1313 void gearman_client_set_complete_fn(gearman_client_st *client,
1314 gearman_complete_fn *function)
1315 {
1316 if (client)
1317 {
1318 client->actions.complete_fn= function;
1319 }
1320 }
1321
gearman_client_set_exception_fn(gearman_client_st * client,gearman_exception_fn * function)1322 void gearman_client_set_exception_fn(gearman_client_st *client,
1323 gearman_exception_fn *function)
1324 {
1325 if (client)
1326 {
1327 client->actions.exception_fn= function;
1328 }
1329 }
1330
gearman_client_set_fail_fn(gearman_client_st * client,gearman_fail_fn * function)1331 void gearman_client_set_fail_fn(gearman_client_st *client,
1332 gearman_fail_fn *function)
1333 {
1334 if (client)
1335 {
1336 client->actions.fail_fn= function;
1337 }
1338 }
1339
gearman_client_clear_fn(gearman_client_st * client)1340 void gearman_client_clear_fn(gearman_client_st *client)
1341 {
1342 if (client)
1343 {
1344 client->actions= gearman_actions_default();
1345 }
1346 }
1347
/*
 * Advance every task on client->task_list: start tasks still in
 * GEARMAN_TASK_STATE_NEW, continue submissions on connections that polled
 * writable, and read packets from connections that polled readable, handing
 * each packet to the task it matches.
 *
 * The function is resumable. The case labels of the switch on client->state
 * are placed INSIDE the loops, so when an earlier call returned with
 * client->state set to NEW, SUBMIT or PACKET, the next call jumps straight
 * back to that point and carries on with the saved client->task and
 * client->con. Statement order is significant; do not reorder.
 *
 * exit_task: may be NULL. When set, the function returns GEARMAN_SUCCESS as
 * soon as, after a packet has been processed, exit_task->result_rc is no
 * longer GEARMAN_UNKNOWN_STATE.
 *
 * Returns GEARMAN_SUCCESS once client->running_tasks is 0 (or exit_task has
 * a result); the value of gearman_gerror(..., GEARMAN_IO_WAIT) when
 * client->options.non_blocking is set and nothing more can be done right now;
 * GEARMAN_LOST_CONNECTION, GEARMAN_SERVER_ERROR or the value of
 * gearman_universal_set_error(..., GEARMAN_INVALID_ARGUMENT, ...) for the
 * conditions flagged below; otherwise the failing code from
 * _client_run_task(), the receive call, or gearman_wait().
 * client is dereferenced without a NULL check.
 */
_client_run_tasks(gearman_client_st * client,gearman_task_st * exit_task)1348 static inline gearman_return_t _client_run_tasks(gearman_client_st *client, gearman_task_st* exit_task)
1349 {
/* Sentinel value; the assert before the receive result is inspected checks
   that it was replaced (presumably by receiving() - confirm its signature). */
1350 gearman_return_t ret= GEARMAN_MAX_RETURN;
1351
1352 switch(client->state)
1353 {
1354 case GEARMAN_CLIENT_STATE_IDLE:
1355 while (1)
1356 {
1357 /* Start any new tasks. */
1358 if (client->new_tasks > 0 && ! (client->options.no_new))
1359 {
1360 for (client->task= client->task_list; client->task;
1361 client->task= client->task->next)
1362 {
1363 if (client->task->state != GEARMAN_TASK_STATE_NEW)
1364 {
1365 continue;
1366 }
1367
/* Resume point: a previous call failed while starting client->task. The
   break below leaves the enclosing for loop, not the switch. */
1368 case GEARMAN_CLIENT_STATE_NEW:
1369 if (client->task == NULL)
1370 {
1371 client->state= GEARMAN_CLIENT_STATE_IDLE;
1372 break;
1373 }
1374
1375 assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1376 gearman_return_t local_ret= _client_run_task(client->task);
/* Hard failure (anything but IO_WAIT): record the resume point and bail. */
1377 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1378 {
1379 client->state= GEARMAN_CLIENT_STATE_NEW;
1380
1381 return local_ret;
1382 }
1383 }
1384
/* Nothing is left waiting to be started: flush. */
1385 if (client->new_tasks == 0)
1386 {
1387 gearman_flush_all(client->universal);
1388 }
1389 }
1390
1391 /* See if there are any connections ready for I/O. */
1392 while ((client->con= gearman_ready(client->universal)))
1393 {
/* Error events take this branch too, so the lost-connection check at its
   end is reached after the tasks on this connection have been run. */
1394 if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
1395 {
1396 /* Socket is ready for writing, continue submitting jobs. */
1397 for (client->task= client->task_list; client->task;
1398 client->task= client->task->next)
1399 {
1400 if (client->task->con != client->con or
1401 (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
1402 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
1403 {
1404 continue;
1405 }
1406
/* Resume point: a previous call failed while submitting client->task. */
1407 case GEARMAN_CLIENT_STATE_SUBMIT:
1408 if (client->task == NULL)
1409 {
1410 client->state= GEARMAN_CLIENT_STATE_IDLE;
1411 break;
1412 }
1413 assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1414 gearman_return_t local_ret= _client_run_task(client->task);
/* A connect failure makes the next call start from the top (IDLE); any
   other hard failure makes it resume at this task (SUBMIT). */
1415 if (local_ret == GEARMAN_COULD_NOT_CONNECT)
1416 {
1417 client->state= GEARMAN_CLIENT_STATE_IDLE;
1418 return local_ret;
1419 }
1420 else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1421 {
1422 client->state= GEARMAN_CLIENT_STATE_SUBMIT;
1423 return local_ret;
1424 }
1425 }
1426
1427 /* Connection errors are fatal. */
1428 if (client->con->revents & (POLLERR | POLLHUP | POLLNVAL))
1429 {
1430 gearman_error(client->universal, GEARMAN_LOST_CONNECTION, "detected lost connection in _client_run_tasks()");
1431 client->con->close_socket();
1432 client->state= GEARMAN_CLIENT_STATE_IDLE;
1433 return GEARMAN_LOST_CONNECTION;
1434 }
1435 }
1436
/* Not readable: move on to the next ready connection. */
1437 if ((client->con->revents & POLLIN) == 0)
1438 {
1439 continue;
1440 }
1441
1442 /* Socket is ready for reading. */
1443 while (1)
1444 {
1445 /* Read packet on connection and find which task it belongs to. */
1446 if (client->options.unbuffered_result)
1447 {
1448 /* If client is handling the data read, make sure it's complete. */
1449 if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
1450 {
1451 for (client->task= client->task_list; client->task;
1452 client->task= client->task->next)
1453 {
1454 if (client->task->con == client->con &&
1455 (client->task->state == GEARMAN_TASK_STATE_DATA or
1456 client->task->state == GEARMAN_TASK_STATE_COMPLETE))
1457 {
1458 break;
1459 }
1460 }
1461
/* NOTE(review): the return below is unconditional; the task located by the
   loop above is not consulted before erroring out. Confirm this is intended. */
1462 /*
1463 Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
1464 Fatal error :(
1465 */
1466 return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,
1467 "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
1468 }
1469 else
1470 {
1471 /* Read the next packet, without buffering the data part. */
1472 client->task= NULL;
1473 (void)client->con->receiving(client->con->_packet, ret, false);
1474 }
1475 }
1476 else
1477 {
1478 /* Read the next packet, buffering the data part. */
1479 client->task= NULL;
1480 (void)client->con->receiving(client->con->_packet, ret, true);
1481 }
1482
/* Every path that falls through to here has just set client->task to NULL;
   a resume at the PACKET label below jumps past this whole block. */
1483 if (client->task == NULL)
1484 {
1485 assert(ret != GEARMAN_MAX_RETURN);
1486
1487 /* Check the return of the gearman_connection_recv() calls above. */
1488 if (gearman_failed(ret))
1489 {
/* IO_WAIT: stop reading from this connection (leaves the read loop). */
1490 if (ret == GEARMAN_IO_WAIT)
1491 {
1492 break;
1493 }
1494
1495 client->state= GEARMAN_CLIENT_STATE_IDLE;
1496 return ret;
1497 }
1498
1499 client->con->options.packet_in_use= true;
1500
1501 /* We have a packet, see which task it belongs to. */
1502 for (client->task= client->task_list; client->task;
1503 client->task= client->task->next)
1504 {
1505 if (client->task->con != client->con)
1506 {
1507 continue;
1508 }
1509
1510 gearman_log_debug(client->universal, "Got %s", gearman_strcommand(client->con->_packet.command));
/* JOB_CREATED is matched by comparing the task's created_id with the
   connection's created_id counter, which is bumped on each match. */
1511 if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
1512 {
1513 if (client->task->created_id != client->con->created_id)
1514 {
1515 continue;
1516 }
1517
1518 /* New job created, drop through below and notify task. */
1519 client->con->created_id++;
1520 }
1521 else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
1522 {
1523 gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, GEARMAN_AT,
1524 "%s:%.*s",
1525 static_cast<char *>(client->con->_packet.arg[0]),
1526 int(client->con->_packet.arg_size[1]),
1527 static_cast<char *>(client->con->_packet.arg[1]));
1528
1529 /*
1530 Packet cleanup copied from "Clean up the packet" below, and must
1531 remain in sync with its reference.
1532 */
1533 gearman_packet_free(&(client->con->_packet));
1534 client->con->options.packet_in_use= false;
1535
1536 /* This step copied from _client_run_tasks() above: */
1537 /* Increment this value because new job created then failed. */
1538 client->con->created_id++;
1539
1540 return GEARMAN_SERVER_ERROR;
1541 }
/* STATUS_RES_UNIQUE whose first argument matches the task's unique id:
   empty body, falls out of the chain to the break below (match). */
1542 else if (client->con->_packet.command == GEARMAN_COMMAND_STATUS_RES_UNIQUE and
1543 (strncmp(gearman_task_unique(client->task),
1544 static_cast<char *>(client->con->_packet.arg[0]),
1545 client->con->_packet.arg_size[0]) == 0))
1546 { }
/* Everything else is matched on job handle; skip this task when the handle
   differs or its length is wrong. The expected length is arg_size[0] for
   WORK_FAIL and arg_size[0] - 1 otherwise (presumably arg[0] then carries a
   trailing NUL - confirm against the protocol). */
1547 else if (strncmp(client->task->job_handle,
1548 static_cast<char *>(client->con->_packet.arg[0]),
1549 client->con->_packet.arg_size[0]) ||
1550 (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
1551 strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
1552 (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
1553 strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
1554 {
1555 continue;
1556 }
1557
1558 /* Else, we have a matching result packet of some kind. */
1559
1560 break;
1561 }
1562
1563 if (client->task == NULL)
1564 {
1565 /* The client has stopped waiting for the response, ignore it. */
1566 client->con->free_private_packet();
1567 continue;
1568 }
1569
1570 client->task->recv= &(client->con->_packet);
1571 }
1572
/* Resume point: a previous call failed while client->task was processing
   the packet held by client->con. */
1573 case GEARMAN_CLIENT_STATE_PACKET:
1574 /* Let task process job created or result packet. */
1575 assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1576 gearman_return_t local_ret= _client_run_task(client->task);
/* IO_WAIT leaves the read loop without freeing the packet. */
1577 if (local_ret == GEARMAN_IO_WAIT)
1578 {
1579 break;
1580 }
1581
1582 if (gearman_failed(local_ret))
1583 {
1584 client->state= GEARMAN_CLIENT_STATE_PACKET;
1585 return local_ret;
1586 }
1587
1588 /* Clean up the packet. */
1589 client->con->free_private_packet();
1590
1591 /* If exit task is set and matched, exit */
1592 if (exit_task)
1593 {
1594 if (exit_task->result_rc != GEARMAN_UNKNOWN_STATE)
1595 {
1596 client->state= GEARMAN_CLIENT_STATE_IDLE;
1597 return GEARMAN_SUCCESS;
1598 }
1599 }
1600
1601 /* If all tasks are done, return. */
1602 if (client->running_tasks == 0)
1603 {
1604 client->state= GEARMAN_CLIENT_STATE_IDLE;
1605 return GEARMAN_SUCCESS;
1606 }
1607 }
1608 }
1609
1610 /* If all tasks are done, return. */
1611 if (client->running_tasks == 0)
1612 {
1613 break;
1614 }
1615
/* More tasks were queued meanwhile: go round again to start them before
   waiting. */
1616 if (client->new_tasks > 0 and ! (client->options.no_new))
1617 {
1618 continue;
1619 }
1620
1621 if (client->options.non_blocking)
1622 {
1623 /* Let the caller wait for activity. */
1624 client->state= GEARMAN_CLIENT_STATE_IDLE;
1625
1626 return gearman_gerror(client->universal, GEARMAN_IO_WAIT);
1627 }
1628
1629 /* Wait for activity on one of the connections. */
1630 gearman_return_t local_ret= gearman_wait(client->universal);
1631 if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1632 {
1633 client->state= GEARMAN_CLIENT_STATE_IDLE;
1634
1635 return local_ret;
1636 }
1637 }
1638
/* Reached after the outer while (1) is left; this break exits the switch. */
1639 break;
1640 }
1641
1642 client->state= GEARMAN_CLIENT_STATE_IDLE;
1643
1644 return GEARMAN_SUCCESS;
1645 }
1646
gearman_client_run_tasks(gearman_client_st * client)1647 gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
1648 {
1649 if (client == NULL)
1650 {
1651 return GEARMAN_INVALID_ARGUMENT;
1652 }
1653
1654 if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
1655 {
1656 return GEARMAN_SUCCESS;
1657 }
1658
1659 gearman_return_t rc;
1660 {
1661 PUSH_NON_BLOCKING(client->universal);
1662
1663 rc= _client_run_tasks(client, NULL);
1664 }
1665
1666 if (rc == GEARMAN_COULD_NOT_CONNECT)
1667 {
1668 gearman_reset(client->universal);
1669 }
1670
1671 return rc;
1672 }
1673
gearman_client_run_block_tasks(gearman_client_st * client,gearman_task_st * exit_task)1674 gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client, gearman_task_st* exit_task)
1675 {
1676 if (client == NULL)
1677 {
1678 return GEARMAN_INVALID_ARGUMENT;
1679 }
1680
1681 if (client->task_list == NULL) // We are immediatly successful if all tasks are completed
1682 {
1683 return GEARMAN_SUCCESS;
1684 }
1685
1686
1687 gearman_return_t rc;
1688 {
1689 PUSH_BLOCKING(client->universal);
1690
1691 rc= _client_run_tasks(client, exit_task);
1692 }
1693
1694 if (gearman_failed(rc))
1695 {
1696 if (rc == GEARMAN_COULD_NOT_CONNECT)
1697 {
1698 gearman_reset(client->universal);
1699 }
1700
1701 if (gearman_universal_error_code(client->universal) != rc and rc != GEARMAN_COULD_NOT_CONNECT)
1702 {
1703 assert(gearman_universal_error_code(client->universal) == rc);
1704 }
1705 }
1706
1707 return rc;
1708 }
1709
1710 /*
1711 * Static Definitions
1712 */
1713
gearman_client_compare(const gearman_client_st * first,const gearman_client_st * second)1714 bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
1715 {
1716 if (first == NULL or second == NULL)
1717 {
1718 return false;
1719 }
1720
1721 if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
1722 {
1723 return false;
1724 }
1725
1726 if (first->universal.con_list->port != second->universal.con_list->port)
1727 {
1728 return false;
1729 }
1730
1731 return true;
1732 }
1733
gearman_client_set_server_option(gearman_client_st * self,const char * option_arg,size_t option_arg_size)1734 bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
1735 {
1736 if (self)
1737 {
1738 gearman_string_t option= { option_arg, option_arg_size };
1739 return gearman_request_option(self->universal, option);
1740 }
1741
1742 return false;
1743 }
1744
gearman_client_set_namespace(gearman_client_st * self,const char * namespace_key,size_t namespace_key_size)1745 void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
1746 {
1747 if (self)
1748 {
1749 gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1750 }
1751 }
1752
gearman_client_set_identifier(gearman_client_st * client,const char * id,size_t id_size)1753 gearman_return_t gearman_client_set_identifier(gearman_client_st *client,
1754 const char *id, size_t id_size)
1755 {
1756 if (client)
1757 {
1758 return gearman_set_identifier(client->universal, id, id_size);
1759 }
1760
1761 return GEARMAN_INVALID_ARGUMENT;
1762 }
1763
gearman_client_namespace(gearman_client_st * self)1764 const char *gearman_client_namespace(gearman_client_st *self)
1765 {
1766 return gearman_univeral_namespace(self->universal);
1767 }
1768