1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 #include "gear_config.h"
40
41 #include <libgearman/common.h>
42 #include <libgearman/function/base.hpp>
43 #include <libgearman/function/make.hpp>
44
45 #include "libgearman/pipe.h"
46
47 #include "libgearman/assert.hpp"
48
49 #include <cstdio>
50 #include <cstdlib>
51 #include <cstring>
52 #include <memory>
53 #include <unistd.h>
54 #include <fcntl.h>
55 #include <cerrno>
56
57 /**
58 * @addtogroup gearman_worker_static Static Worker Declarations
59 * @ingroup gearman_worker
60 * @{
61 */
62
_function_exist(gearman_worker_st * worker,const char * function_name,size_t function_length)63 static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
64 {
65 struct _worker_function_st *function;
66
67 for (function= worker->function_list; function;
68 function= function->next)
69 {
70 if (function_length == function->function_length)
71 {
72 if (memcmp(function_name, function->function_name, function_length) == 0)
73 {
74 break;
75 }
76 }
77 }
78
79 return function;
80 }
81
82 /**
83 * Allocate a worker structure.
84 */
85 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
86
87 /**
88 * Initialize common packets for later use.
89 */
90 static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
91
92 /**
93 * Callback function used when parsing server lists.
94 */
95 static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context);
96
97 /**
98 * Allocate and add a function to the register list.
99 */
100 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
101 const char *function_name, size_t function_length,
102 const gearman_function_t &function,
103 uint32_t timeout,
104 void *context);
105
106 /**
107 * Free a function.
108 */
109 static void _worker_function_free(gearman_worker_st *worker,
110 struct _worker_function_st *function);
111
112
113 /** @} */
114
115 /*
116 * Public Definitions
117 */
118
gearman_worker_create(gearman_worker_st * worker)119 gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
120 {
121 worker= _worker_allocate(worker, false);
122
123 if (worker == NULL)
124 {
125 return NULL;
126 }
127
128 if (gearman_failed(_worker_packet_init(worker)))
129 {
130 gearman_worker_free(worker);
131 return NULL;
132 }
133
134 return worker;
135 }
136
gearman_worker_clone(gearman_worker_st * worker,const gearman_worker_st * source)137 gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
138 const gearman_worker_st *source)
139 {
140 if (source == NULL)
141 {
142 return _worker_allocate(worker, false);
143 }
144
145 worker= _worker_allocate(worker, true);
146
147 if (worker == NULL)
148 {
149 return worker;
150 }
151
152 worker->options.non_blocking= source->options.non_blocking;
153 worker->options.change= source->options.change;
154 worker->options.grab_uniq= source->options.grab_uniq;
155 worker->options.grab_all= source->options.grab_all;
156 worker->options.timeout_return= source->options.timeout_return;
157
158 gearman_universal_clone(worker->universal, source->universal, true);
159
160 if (gearman_failed(_worker_packet_init(worker)))
161 {
162 gearman_worker_free(worker);
163 return NULL;
164 }
165
166 return worker;
167 }
168
gearman_worker_free(gearman_worker_st * worker)169 void gearman_worker_free(gearman_worker_st *worker)
170 {
171 if (worker == NULL)
172 {
173 return;
174 }
175
176 if (worker->universal.wakeup_fd[0] != INVALID_SOCKET)
177 {
178 close(worker->universal.wakeup_fd[0]);
179 }
180
181 if (worker->universal.wakeup_fd[1] != INVALID_SOCKET)
182 {
183 close(worker->universal.wakeup_fd[1]);
184 }
185
186 gearman_worker_unregister_all(worker);
187
188 if (worker->options.packet_init)
189 {
190 gearman_packet_free(&worker->grab_job);
191 gearman_packet_free(&worker->pre_sleep);
192 }
193
194 gearman_job_free(worker->job);
195 worker->work_job= NULL;
196
197 if (worker->work_result)
198 {
199 gearman_free(worker->universal, worker->work_result);
200 }
201
202 while (worker->function_list)
203 {
204 _worker_function_free(worker, worker->function_list);
205 }
206
207 gearman_job_free_all(worker);
208
209 gearman_universal_free(worker->universal);
210
211 if (worker->options.allocated)
212 {
213 delete worker;
214 }
215 }
216
gearman_worker_error(const gearman_worker_st * worker)217 const char *gearman_worker_error(const gearman_worker_st *worker)
218 {
219 if (worker == NULL)
220 {
221 return NULL;
222 }
223
224 return gearman_universal_error(worker->universal);
225 }
226
gearman_worker_errno(gearman_worker_st * worker)227 int gearman_worker_errno(gearman_worker_st *worker)
228 {
229 if (worker == NULL)
230 {
231 return EINVAL;
232 }
233
234 return gearman_universal_errno(worker->universal);
235 }
236
gearman_worker_options(const gearman_worker_st * worker)237 gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
238 {
239 if (worker == NULL)
240 {
241 return gearman_worker_options_t();
242 }
243
244 int options;
245 memset(&options, 0, sizeof(gearman_worker_options_t));
246
247 if (worker->options.allocated)
248 options|= int(GEARMAN_WORKER_ALLOCATED);
249 if (worker->options.non_blocking)
250 options|= int(GEARMAN_WORKER_NON_BLOCKING);
251 if (worker->options.packet_init)
252 options|= int(GEARMAN_WORKER_PACKET_INIT);
253 if (worker->options.change)
254 options|= int(GEARMAN_WORKER_CHANGE);
255 if (worker->options.grab_uniq)
256 options|= int(GEARMAN_WORKER_GRAB_UNIQ);
257 if (worker->options.grab_all)
258 options|= int(GEARMAN_WORKER_GRAB_ALL);
259 if (worker->options.timeout_return)
260 options|= int(GEARMAN_WORKER_TIMEOUT_RETURN);
261
262 return gearman_worker_options_t(options);
263 }
264
gearman_worker_set_options(gearman_worker_st * worker,gearman_worker_options_t options)265 void gearman_worker_set_options(gearman_worker_st *worker,
266 gearman_worker_options_t options)
267 {
268 if (worker == NULL)
269 {
270 return;
271 }
272
273 gearman_worker_options_t usable_options[]= {
274 GEARMAN_WORKER_NON_BLOCKING,
275 GEARMAN_WORKER_GRAB_UNIQ,
276 GEARMAN_WORKER_GRAB_ALL,
277 GEARMAN_WORKER_TIMEOUT_RETURN,
278 GEARMAN_WORKER_MAX
279 };
280
281 gearman_worker_options_t *ptr;
282
283
284 for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
285 {
286 if (options & *ptr)
287 {
288 gearman_worker_add_options(worker, *ptr);
289 }
290 else
291 {
292 gearman_worker_remove_options(worker, *ptr);
293 }
294 }
295 }
296
gearman_worker_add_options(gearman_worker_st * worker,gearman_worker_options_t options)297 void gearman_worker_add_options(gearman_worker_st *worker,
298 gearman_worker_options_t options)
299 {
300 if (worker == NULL)
301 {
302 return;
303 }
304
305 if (options & GEARMAN_WORKER_NON_BLOCKING)
306 {
307 gearman_universal_add_options(worker->universal, GEARMAN_NON_BLOCKING);
308 worker->options.non_blocking= true;
309 }
310
311 if (options & GEARMAN_WORKER_GRAB_UNIQ)
312 {
313 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
314 gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
315 (void)(rc);
316 assert(gearman_success(rc));
317 worker->options.grab_uniq= true;
318 }
319
320 if (options & GEARMAN_WORKER_GRAB_ALL)
321 {
322 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_ALL;
323 gearman_return_t rc= gearman_packet_pack_header(&(worker->grab_job));
324 (void)(rc);
325 assert(gearman_success(rc));
326 worker->options.grab_all= true;
327 }
328
329 if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
330 {
331 worker->options.timeout_return= true;
332 }
333 }
334
gearman_worker_remove_options(gearman_worker_st * worker,gearman_worker_options_t options)335 void gearman_worker_remove_options(gearman_worker_st *worker,
336 gearman_worker_options_t options)
337 {
338 if (worker)
339 {
340 if (options & GEARMAN_WORKER_NON_BLOCKING)
341 {
342 gearman_universal_remove_options(worker->universal, GEARMAN_NON_BLOCKING);
343 worker->options.non_blocking= false;
344 }
345
346 if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
347 {
348 worker->options.timeout_return= false;
349 gearman_universal_set_timeout(worker->universal, GEARMAN_WORKER_WAIT_TIMEOUT);
350 }
351
352 if (options & GEARMAN_WORKER_GRAB_UNIQ)
353 {
354 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
355 (void)gearman_packet_pack_header(&(worker->grab_job));
356 worker->options.grab_uniq= false;
357 }
358
359 if (options & GEARMAN_WORKER_GRAB_ALL)
360 {
361 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
362 (void)gearman_packet_pack_header(&(worker->grab_job));
363 worker->options.grab_all= false;
364 }
365 }
366 }
367
gearman_worker_timeout(gearman_worker_st * worker)368 int gearman_worker_timeout(gearman_worker_st *worker)
369 {
370 if (worker == NULL)
371 {
372 return 0;
373 }
374
375 return gearman_universal_timeout(worker->universal);
376 }
377
gearman_worker_set_timeout(gearman_worker_st * worker,int timeout)378 void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
379 {
380 if (worker)
381 {
382 gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
383 gearman_universal_set_timeout(worker->universal, timeout);
384 }
385 }
386
gearman_worker_context(const gearman_worker_st * worker)387 void *gearman_worker_context(const gearman_worker_st *worker)
388 {
389 if (worker)
390 {
391 return worker->context;
392 }
393
394 return NULL;
395 }
396
gearman_worker_set_context(gearman_worker_st * worker,void * context)397 void gearman_worker_set_context(gearman_worker_st *worker, void *context)
398 {
399 if (worker)
400 {
401 worker->context= context;
402 }
403 }
404
gearman_worker_set_log_fn(gearman_worker_st * worker,gearman_log_fn * function,void * context,gearman_verbose_t verbose)405 void gearman_worker_set_log_fn(gearman_worker_st *worker,
406 gearman_log_fn *function, void *context,
407 gearman_verbose_t verbose)
408 {
409 gearman_set_log_fn(worker->universal, function, context, verbose);
410 }
411
gearman_worker_set_workload_malloc_fn(gearman_worker_st * worker,gearman_malloc_fn * function,void * context)412 void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
413 gearman_malloc_fn *function,
414 void *context)
415 {
416 if (worker)
417 {
418 gearman_set_workload_malloc_fn(worker->universal, function, context);
419 }
420 }
421
gearman_worker_set_workload_free_fn(gearman_worker_st * worker,gearman_free_fn * function,void * context)422 void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
423 gearman_free_fn *function,
424 void *context)
425 {
426 if (worker)
427 {
428 gearman_set_workload_free_fn(worker->universal, function, context);
429 }
430 }
431
gearman_worker_add_server(gearman_worker_st * worker,const char * host,in_port_t port)432 gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
433 const char *host, in_port_t port)
434 {
435 if (worker)
436 {
437 if (gearman_connection_create_args(worker->universal, host, port) == NULL)
438 {
439 return gearman_universal_error_code(worker->universal);
440 }
441
442 return GEARMAN_SUCCESS;
443 }
444
445 return GEARMAN_INVALID_ARGUMENT;
446 }
447
gearman_worker_add_servers(gearman_worker_st * worker,const char * servers)448 gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker, const char *servers)
449 {
450 return gearman_parse_servers(servers, _worker_add_server, worker);
451 }
452
gearman_worker_remove_servers(gearman_worker_st * worker)453 void gearman_worker_remove_servers(gearman_worker_st *worker)
454 {
455 if (worker)
456 {
457 gearman_free_all_cons(worker->universal);
458 }
459 }
460
gearman_worker_wait(gearman_worker_st * worker)461 gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
462 {
463 if (worker)
464 {
465 return gearman_wait(worker->universal);
466 }
467
468 return GEARMAN_INVALID_ARGUMENT;
469 }
470
gearman_worker_register(gearman_worker_st * worker,const char * function_name,uint32_t timeout)471 gearman_return_t gearman_worker_register(gearman_worker_st *worker,
472 const char *function_name,
473 uint32_t timeout)
474 {
475 gearman_function_t null_func= gearman_function_create_null();
476 return _worker_function_create(worker, function_name, strlen(function_name), null_func, timeout, NULL);
477 }
478
gearman_worker_function_exist(gearman_worker_st * worker,const char * function_name,size_t function_length)479 bool gearman_worker_function_exist(gearman_worker_st *worker,
480 const char *function_name,
481 size_t function_length)
482 {
483 if (worker)
484 {
485 struct _worker_function_st *function;
486
487 function= _function_exist(worker, function_name, function_length);
488
489 return (function && function->options.remove == false) ? true : false;
490 }
491
492 return false;
493 }
494
_worker_unregister(gearman_worker_st * worker,const char * function_name,size_t function_length)495 static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
496 const char *function_name, size_t function_length)
497 {
498 if (worker)
499 {
500 _worker_function_st *function= _function_exist(worker, function_name, function_length);
501
502 if (function == NULL || function->options.remove)
503 {
504 return GEARMAN_NO_REGISTERED_FUNCTION;
505 }
506
507 if (function->options.packet_in_use)
508 {
509 gearman_packet_free(&(function->packet()));
510 function->options.packet_in_use= false;
511 }
512
513 const void *args[1];
514 size_t args_size[1];
515 args[0]= function->name();
516 args_size[0]= function->length();
517 gearman_return_t ret= gearman_packet_create_args(worker->universal, function->packet(),
518 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
519 args, args_size, 1);
520 if (gearman_failed(ret))
521 {
522 function->options.packet_in_use= false;
523 return ret;
524 }
525 function->options.packet_in_use= true;
526
527 function->options.change= true;
528 function->options.remove= true;
529
530 worker->options.change= true;
531
532 return GEARMAN_SUCCESS;
533 }
534
535 return GEARMAN_INVALID_ARGUMENT;
536 }
537
gearman_worker_unregister(gearman_worker_st * worker,const char * function_name)538 gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
539 const char *function_name)
540 {
541 return _worker_unregister(worker, function_name, strlen(function_name));
542 }
543
gearman_worker_unregister_all(gearman_worker_st * worker)544 gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
545 {
546 if (worker)
547 {
548 struct _worker_function_st *function;
549 uint32_t count= 0;
550
551 if (worker->function_list == NULL)
552 {
553 return GEARMAN_NO_REGISTERED_FUNCTIONS;
554 }
555
556 /* Lets find out if we have any functions left that are valid */
557 for (function= worker->function_list; function;
558 function= function->next)
559 {
560 if (function->options.remove == false)
561 {
562 count++;
563 }
564 }
565
566 if (count == 0)
567 {
568 return GEARMAN_NO_REGISTERED_FUNCTIONS;
569 }
570
571 gearman_packet_free(&(worker->function_list->packet()));
572
573 gearman_return_t ret= gearman_packet_create_args(worker->universal,
574 worker->function_list->packet(),
575 GEARMAN_MAGIC_REQUEST,
576 GEARMAN_COMMAND_RESET_ABILITIES,
577 NULL, NULL, 0);
578 if (gearman_failed(ret))
579 {
580 worker->function_list->options.packet_in_use= false;
581
582 return ret;
583 }
584
585 while (worker->function_list->next)
586 {
587 _worker_function_free(worker, worker->function_list->next);
588 }
589
590 worker->function_list->options.change= true;
591 worker->function_list->options.remove= true;
592
593 worker->options.change= true;
594
595 return GEARMAN_SUCCESS;
596 }
597
598 return GEARMAN_INVALID_ARGUMENT;
599 }
600
gearman_worker_grab_job(gearman_worker_st * worker,gearman_job_st * job,gearman_return_t * ret_ptr)601 gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
602 gearman_job_st *job,
603 gearman_return_t *ret_ptr)
604 {
605 if (worker)
606 {
607 struct _worker_function_st *function;
608 uint32_t active;
609
610 gearman_return_t unused;
611 if (not ret_ptr)
612 {
613 ret_ptr= &unused;
614 }
615
616 while (1)
617 {
618 switch (worker->state)
619 {
620 case GEARMAN_WORKER_STATE_START:
621 /* If there are any new functions changes, send them now. */
622 if (worker->options.change)
623 {
624 worker->function= worker->function_list;
625 while (worker->function)
626 {
627 if (not (worker->function->options.change))
628 {
629 worker->function= worker->function->next;
630 continue;
631 }
632
633 for (worker->con= (&worker->universal)->con_list; worker->con;
634 worker->con= worker->con->next)
635 {
636 if (worker->con->fd == -1)
637 {
638 continue;
639 }
640
641 case GEARMAN_WORKER_STATE_FUNCTION_SEND:
642 *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
643 if (gearman_failed(*ret_ptr))
644 {
645 if (*ret_ptr == GEARMAN_IO_WAIT)
646 {
647 worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
648 }
649 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
650 {
651 continue;
652 }
653
654 return NULL;
655 }
656 }
657
658 if (worker->function->options.remove)
659 {
660 function= worker->function->prev;
661 _worker_function_free(worker, worker->function);
662 if (function == NULL)
663 worker->function= worker->function_list;
664 else
665 worker->function= function;
666 }
667 else
668 {
669 worker->function->options.change= false;
670 worker->function= worker->function->next;
671 }
672 }
673
674 worker->options.change= false;
675 }
676
677 if (not worker->function_list)
678 {
679 gearman_error(worker->universal, GEARMAN_NO_REGISTERED_FUNCTIONS, "no functions have been registered");
680 *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
681 return NULL;
682 }
683
684 for (worker->con= (&worker->universal)->con_list; worker->con;
685 worker->con= worker->con->next)
686 {
687 /* If the connection to the job server is not active, start it. */
688 if (worker->con->fd == -1)
689 {
690 for (worker->function= worker->function_list;
691 worker->function;
692 worker->function= worker->function->next)
693 {
694 case GEARMAN_WORKER_STATE_CONNECT:
695 *ret_ptr= worker->con->send_packet(worker->function->packet(), true);
696 if (gearman_failed(*ret_ptr))
697 {
698 if (*ret_ptr == GEARMAN_IO_WAIT)
699 {
700 worker->state= GEARMAN_WORKER_STATE_CONNECT;
701 }
702 else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT or *ret_ptr == GEARMAN_LOST_CONNECTION)
703 {
704 break;
705 }
706
707 return NULL;
708 }
709 }
710
711 if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
712 {
713 continue;
714 }
715 }
716
717 case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
718 if (worker->con->fd == -1)
719 continue;
720
721 *ret_ptr= worker->con->send_packet(worker->grab_job, true);
722 if (gearman_failed(*ret_ptr))
723 {
724 if (*ret_ptr == GEARMAN_IO_WAIT)
725 {
726 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
727 }
728 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
729 {
730 continue;
731 }
732
733 return NULL;
734 }
735
736 if (not worker->job)
737 {
738 worker->job= gearman_job_create(worker, job);
739 if (not worker->job)
740 {
741 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
742 return NULL;
743 }
744 }
745
746 while (1)
747 {
748 case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
749 assert(worker->job);
750 (void)worker->con->receiving(worker->job->assigned, *ret_ptr, true);
751
752 if (gearman_failed(*ret_ptr))
753 {
754 if (*ret_ptr == GEARMAN_IO_WAIT)
755 {
756 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
757 }
758 else
759 {
760 gearman_job_free(worker->job);
761 worker->job= NULL;
762
763 if (*ret_ptr == GEARMAN_LOST_CONNECTION)
764 {
765 break;
766 }
767 }
768
769 return NULL;
770 }
771
772 if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN or
773 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL or
774 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
775 {
776 worker->job->options.assigned_in_use= true;
777 worker->job->con= worker->con;
778 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
779 job= worker->job;
780 worker->job= NULL;
781
782 return job;
783 }
784
785 if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB or
786 worker->job->assigned.command == GEARMAN_COMMAND_OPTION_RES)
787 {
788 gearman_packet_free(&(worker->job->assigned));
789 break;
790 }
791
792 if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
793 {
794 gearman_universal_set_error(worker->universal, GEARMAN_UNEXPECTED_PACKET, GEARMAN_AT,
795 "unexpected packet:%s",
796 gearman_command_info(worker->job->assigned.command)->name);
797 gearman_packet_free(&(worker->job->assigned));
798 gearman_job_free(worker->job);
799 worker->job= NULL;
800 *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
801 return NULL;
802 }
803
804 gearman_packet_free(&(worker->job->assigned));
805 }
806 }
807
808 case GEARMAN_WORKER_STATE_PRE_SLEEP:
809 for (worker->con= (&worker->universal)->con_list; worker->con;
810 worker->con= worker->con->next)
811 {
812 if (worker->con->fd == INVALID_SOCKET)
813 {
814 continue;
815 }
816
817 *ret_ptr= worker->con->send_packet(worker->pre_sleep, true);
818 if (gearman_failed(*ret_ptr))
819 {
820 if (*ret_ptr == GEARMAN_IO_WAIT)
821 {
822 worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
823 }
824 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
825 {
826 continue;
827 }
828
829 return NULL;
830 }
831 }
832
833 worker->state= GEARMAN_WORKER_STATE_START;
834
835 /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
836 active= 0;
837 for (worker->con= worker->universal.con_list; worker->con; worker->con= worker->con->next)
838 {
839 if (worker->con->fd == INVALID_SOCKET)
840 {
841 continue;
842 }
843
844 worker->con->set_events(POLLIN);
845 active++;
846 }
847
848 if ((&worker->universal)->options.non_blocking)
849 {
850 *ret_ptr= GEARMAN_NO_JOBS;
851 return NULL;
852 }
853
854 if (active == 0)
855 {
856 if (worker->universal.timeout < 0)
857 {
858 gearman_nap(GEARMAN_WORKER_WAIT_TIMEOUT);
859 }
860 else
861 {
862 if (worker->universal.timeout > 0)
863 {
864 gearman_nap(worker->universal);
865 }
866
867 if (worker->options.timeout_return)
868 {
869 *ret_ptr= gearman_error(worker->universal, GEARMAN_TIMEOUT, "Option timeout return reached");
870
871 return NULL;
872 }
873 }
874 }
875 else
876 {
877 *ret_ptr= gearman_wait(worker->universal);
878 if (gearman_failed(*ret_ptr) and (*ret_ptr != GEARMAN_TIMEOUT or worker->options.timeout_return))
879 {
880 return NULL;
881 }
882 }
883
884 break;
885 }
886 }
887 }
888
889 return NULL;
890 }
891
gearman_job_free_all(gearman_worker_st * worker)892 void gearman_job_free_all(gearman_worker_st *worker)
893 {
894 if (worker)
895 {
896 while (worker->job_list)
897 {
898 gearman_job_free(worker->job_list);
899 }
900 }
901 }
902
gearman_worker_add_function(gearman_worker_st * worker,const char * function_name,uint32_t timeout,gearman_worker_fn * worker_fn,void * context)903 gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
904 const char *function_name,
905 uint32_t timeout,
906 gearman_worker_fn *worker_fn,
907 void *context)
908 {
909 if (worker)
910 {
911 if (function_name == NULL)
912 {
913 return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
914 }
915
916 if (worker_fn == NULL)
917 {
918 return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function not given");
919 }
920 gearman_function_t local= gearman_function_create_v1(worker_fn);
921
922 return _worker_function_create(worker,
923 function_name, strlen(function_name),
924 local,
925 timeout,
926 context);
927 }
928
929 return GEARMAN_INVALID_ARGUMENT;
930 }
931
gearman_worker_define_function(gearman_worker_st * worker,const char * function_name,const size_t function_name_length,const gearman_function_t function,const uint32_t timeout,void * context)932 gearman_return_t gearman_worker_define_function(gearman_worker_st *worker,
933 const char *function_name, const size_t function_name_length,
934 const gearman_function_t function,
935 const uint32_t timeout,
936 void *context)
937 {
938 if (worker)
939 {
940 if (function_name == NULL or function_name_length == 0)
941 {
942 return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
943 }
944
945 return _worker_function_create(worker,
946 function_name, function_name_length,
947 function,
948 timeout,
949 context);
950 }
951
952 return GEARMAN_INVALID_ARGUMENT;
953 }
954
gearman_worker_reset_error(gearman_worker_st * worker)955 void gearman_worker_reset_error(gearman_worker_st *worker)
956 {
957 if (worker)
958 {
959 universal_reset_error(worker->universal);
960 }
961 }
962
gearman_worker_work(gearman_worker_st * worker)963 gearman_return_t gearman_worker_work(gearman_worker_st *worker)
964 {
965 bool shutdown= false;
966
967 if (worker)
968 {
969 universal_reset_error(worker->universal);
970
971 switch (worker->work_state)
972 {
973 case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
974 {
975 gearman_return_t ret;
976 worker->work_job= gearman_worker_grab_job(worker, NULL, &ret);
977
978 if (gearman_failed(ret))
979 {
980 if (ret == GEARMAN_COULD_NOT_CONNECT)
981 {
982 gearman_reset(worker->universal);
983 }
984 return ret;
985 }
986 assert(worker->work_job);
987
988 for (worker->work_function= worker->function_list;
989 worker->work_function;
990 worker->work_function= worker->work_function->next)
991 {
992 if (not strcmp(gearman_job_function_name(worker->work_job),
993 worker->work_function->function_name))
994 {
995 break;
996 }
997 }
998
999 if (not worker->work_function)
1000 {
1001 gearman_job_free(worker->work_job);
1002 worker->work_job= NULL;
1003 return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Function not found");
1004 }
1005
1006 if (not worker->work_function->has_callback())
1007 {
1008 gearman_job_free(worker->work_job);
1009 worker->work_job= NULL;
1010 return gearman_error(worker->universal, GEARMAN_INVALID_FUNCTION_NAME, "Neither a gearman_worker_fn, or gearman_function_fn callback was supplied");
1011 }
1012
1013 worker->work_result_size= 0;
1014 }
1015
1016 case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
1017 {
1018 switch (worker->work_function->callback(worker->work_job,
1019 static_cast<void *>(worker->work_function->context)))
1020 {
1021 case GEARMAN_FUNCTION_INVALID_ARGUMENT:
1022 worker->work_job->error_code= gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
1023 case GEARMAN_FUNCTION_FATAL:
1024 if (gearman_job_send_fail_fin(worker->work_job) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
1025 {
1026 worker->work_job->error_code= GEARMAN_LOST_CONNECTION;
1027 break;
1028 }
1029 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
1030 return worker->work_job->error_code;
1031
1032 case GEARMAN_FUNCTION_ERROR: // retry
1033 gearman_reset(worker->universal);
1034 worker->work_job->error_code= GEARMAN_LOST_CONNECTION;
1035 break;
1036
1037 case GEARMAN_FUNCTION_SHUTDOWN:
1038 shutdown= true;
1039
1040 case GEARMAN_FUNCTION_SUCCESS:
1041 break;
1042 }
1043
1044 if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
1045 {
1046 break;
1047 }
1048 }
1049
1050 case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
1051 {
1052 worker->work_job->error_code= gearman_job_send_complete_fin(worker->work_job,
1053 worker->work_result, worker->work_result_size);
1054 if (worker->work_job->error_code == GEARMAN_IO_WAIT)
1055 {
1056 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
1057 return gearman_error(worker->universal, worker->work_job->error_code,
1058 "A failure occurred after worker had successful complete, unless gearman_job_send_complete() was called directly by worker, client has not been informed of success.");
1059 }
1060
1061 if (worker->work_result)
1062 {
1063 gearman_free(worker->universal, worker->work_result);
1064 worker->work_result= NULL;
1065 }
1066
1067 // If we lost the connection, we retry the work, otherwise we error
1068 if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
1069 {
1070 break;
1071 }
1072 else if (worker->work_job->error_code == GEARMAN_SHUTDOWN)
1073 { }
1074 else if (gearman_failed(worker->work_job->error_code))
1075 {
1076 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
1077
1078 return worker->work_job->error_code;
1079 }
1080 }
1081 break;
1082
1083 case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
1084 {
1085 if (gearman_failed(worker->work_job->error_code= gearman_job_send_fail_fin(worker->work_job)))
1086 {
1087 if (worker->work_job->error_code == GEARMAN_LOST_CONNECTION)
1088 {
1089 break;
1090 }
1091
1092 return worker->work_job->error_code;
1093 }
1094 }
1095 break;
1096 }
1097
1098 gearman_job_free(worker->work_job);
1099 worker->work_job= NULL;
1100
1101 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
1102
1103 if (shutdown)
1104 {
1105 return GEARMAN_SHUTDOWN;
1106 }
1107
1108 return GEARMAN_SUCCESS;
1109 }
1110
1111 return GEARMAN_INVALID_ARGUMENT;
1112 }
1113
gearman_worker_echo(gearman_worker_st * worker,const void * workload,size_t workload_size)1114 gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
1115 const void *workload,
1116 size_t workload_size)
1117 {
1118 if (worker)
1119 {
1120 return gearman_echo(worker->universal, workload, workload_size);
1121 }
1122
1123 return GEARMAN_INVALID_ARGUMENT;
1124 }
1125
1126 /*
1127 * Static Definitions
1128 */
1129
_worker_allocate(gearman_worker_st * worker,bool is_clone)1130 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
1131 {
1132 if (worker)
1133 {
1134 worker->options.allocated= false;
1135 }
1136 else
1137 {
1138 worker= new (std::nothrow) gearman_worker_st;
1139 if (worker == NULL)
1140 {
1141 return NULL;
1142 }
1143
1144 worker->options.allocated= true;
1145 }
1146
1147 worker->options.non_blocking= false;
1148 worker->options.packet_init= false;
1149 worker->options.change= false;
1150 worker->options.grab_uniq= true;
1151 worker->options.grab_all= true;
1152 worker->options.timeout_return= false;
1153
1154 worker->state= GEARMAN_WORKER_STATE_START;
1155 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
1156 worker->function_count= 0;
1157 worker->job_count= 0;
1158 worker->work_result_size= 0;
1159 worker->context= NULL;
1160 worker->con= NULL;
1161 worker->job= NULL;
1162 worker->job_list= NULL;
1163 worker->function= NULL;
1164 worker->function_list= NULL;
1165 worker->work_function= NULL;
1166 worker->work_result= NULL;
1167
1168 if (is_clone == false)
1169 {
1170 gearman_universal_initialize(worker->universal);
1171 #if 0
1172 gearman_universal_set_timeout(worker->universal, GEARMAN_WORKER_WAIT_TIMEOUT);
1173 #endif
1174 }
1175
1176 if (setup_shutdown_pipe(worker->universal.wakeup_fd) == false)
1177 {
1178 if (worker->options.allocated)
1179 {
1180 delete worker;
1181 }
1182
1183 return NULL;
1184 }
1185
1186 return worker;
1187 }
1188
_worker_packet_init(gearman_worker_st * worker)1189 static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
1190 {
1191 gearman_return_t ret= gearman_packet_create_args(worker->universal, worker->grab_job,
1192 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB_ALL,
1193 NULL, NULL, 0);
1194 if (gearman_failed(ret))
1195 {
1196 return ret;
1197 }
1198
1199 ret= gearman_packet_create_args(worker->universal, worker->pre_sleep,
1200 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
1201 NULL, NULL, 0);
1202 if (gearman_failed(ret))
1203 {
1204 gearman_packet_free(&(worker->grab_job));
1205 return ret;
1206 }
1207
1208 worker->options.packet_init= true;
1209
1210 return GEARMAN_SUCCESS;
1211 }
1212
_worker_add_server(const char * host,in_port_t port,void * context)1213 static gearman_return_t _worker_add_server(const char *host, in_port_t port, void *context)
1214 {
1215 return gearman_worker_add_server(static_cast<gearman_worker_st *>(context), host, port);
1216 }
1217
_worker_function_create(gearman_worker_st * worker,const char * function_name,size_t function_length,const gearman_function_t & function_arg,uint32_t timeout,void * context)1218 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
1219 const char *function_name, size_t function_length,
1220 const gearman_function_t &function_arg,
1221 uint32_t timeout,
1222 void *context)
1223 {
1224 const void *args[2];
1225 size_t args_size[2];
1226
1227 if (worker == NULL)
1228 {
1229 return GEARMAN_INVALID_ARGUMENT;
1230 }
1231
1232 if (function_length == 0 or function_name == NULL or function_length > GEARMAN_FUNCTION_MAX_SIZE)
1233 {
1234 if (function_length > GEARMAN_FUNCTION_MAX_SIZE)
1235 {
1236 gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name longer then GEARMAN_MAX_FUNCTION_SIZE");
1237 }
1238 else
1239 {
1240 gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "invalid function");
1241 }
1242
1243 return GEARMAN_INVALID_ARGUMENT;
1244 }
1245
1246 _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
1247 if (function == NULL)
1248 {
1249 gearman_perror(worker->universal, "_worker_function_st::new()");
1250 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
1251 }
1252
1253 gearman_return_t ret;
1254 if (timeout > 0)
1255 {
1256 char timeout_buffer[11];
1257 snprintf(timeout_buffer, sizeof(timeout_buffer), "%u", timeout);
1258 args[0]= function->name();
1259 args_size[0]= function->length() + 1;
1260 args[1]= timeout_buffer;
1261 args_size[1]= strlen(timeout_buffer);
1262 ret= gearman_packet_create_args(worker->universal, function->packet(),
1263 GEARMAN_MAGIC_REQUEST,
1264 GEARMAN_COMMAND_CAN_DO_TIMEOUT,
1265 args, args_size, 2);
1266 }
1267 else
1268 {
1269 args[0]= function->name();
1270 args_size[0]= function->length();
1271 ret= gearman_packet_create_args(worker->universal, function->packet(),
1272 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
1273 args, args_size, 1);
1274 }
1275
1276 if (gearman_failed(ret))
1277 {
1278 delete function;
1279
1280 return ret;
1281 }
1282
1283 if (worker->function_list)
1284 {
1285 worker->function_list->prev= function;
1286 }
1287
1288 function->next= worker->function_list;
1289 function->prev= NULL;
1290 worker->function_list= function;
1291 worker->function_count++;
1292
1293 worker->options.change= true;
1294
1295 return GEARMAN_SUCCESS;
1296 }
1297
_worker_function_free(gearman_worker_st * worker,struct _worker_function_st * function)1298 static void _worker_function_free(gearman_worker_st *worker,
1299 struct _worker_function_st *function)
1300 {
1301 if (worker->function_list == function)
1302 {
1303 worker->function_list= function->next;
1304 }
1305
1306 if (function->prev)
1307 {
1308 function->prev->next= function->next;
1309 }
1310
1311 if (function->next)
1312 {
1313 function->next->prev= function->prev;
1314 }
1315 worker->function_count--;
1316
1317 delete function;
1318 }
1319
gearman_worker_set_memory_allocators(gearman_worker_st * worker,gearman_malloc_fn * malloc_fn,gearman_free_fn * free_fn,gearman_realloc_fn * realloc_fn,gearman_calloc_fn * calloc_fn,void * context)1320 gearman_return_t gearman_worker_set_memory_allocators(gearman_worker_st *worker,
1321 gearman_malloc_fn *malloc_fn,
1322 gearman_free_fn *free_fn,
1323 gearman_realloc_fn *realloc_fn,
1324 gearman_calloc_fn *calloc_fn,
1325 void *context)
1326 {
1327 if (worker == NULL)
1328 {
1329 return GEARMAN_INVALID_ARGUMENT;
1330 }
1331
1332 return gearman_set_memory_allocator(worker->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
1333 }
1334
gearman_worker_set_server_option(gearman_worker_st * self,const char * option_arg,size_t option_arg_size)1335 bool gearman_worker_set_server_option(gearman_worker_st *self, const char *option_arg, size_t option_arg_size)
1336 {
1337 gearman_string_t option= { option_arg, option_arg_size };
1338
1339 return gearman_request_option(self->universal, option);
1340 }
1341
gearman_worker_set_namespace(gearman_worker_st * self,const char * namespace_key,size_t namespace_key_size)1342 void gearman_worker_set_namespace(gearman_worker_st *self, const char *namespace_key, size_t namespace_key_size)
1343 {
1344 if (self)
1345 {
1346 gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1347 }
1348 }
1349
gearman_worker_id(gearman_worker_st * self)1350 gearman_id_t gearman_worker_id(gearman_worker_st *self)
1351 {
1352 if (self == NULL)
1353 {
1354 gearman_id_t handle= { INVALID_SOCKET, INVALID_SOCKET };
1355 return handle;
1356 }
1357
1358 return gearman_universal_id(self->universal);
1359 }
1360
gearman_job_clone_worker(gearman_job_st * job)1361 gearman_worker_st *gearman_job_clone_worker(gearman_job_st *job)
1362 {
1363 if (job)
1364 {
1365 return gearman_worker_clone(NULL, job->worker);
1366 }
1367
1368 return NULL;
1369 }
1370
gearman_worker_set_identifier(gearman_worker_st * worker,const char * id,size_t id_size)1371 gearman_return_t gearman_worker_set_identifier(gearman_worker_st *worker,
1372 const char *id, size_t id_size)
1373 {
1374 if (worker)
1375 {
1376 return gearman_set_identifier(worker->universal, id, id_size);
1377 }
1378
1379 return GEARMAN_INVALID_ARGUMENT;
1380 }
1381
gearman_worker_namespace(gearman_worker_st * self)1382 const char *gearman_worker_namespace(gearman_worker_st *self)
1383 {
1384 return gearman_univeral_namespace(self->universal);
1385 }
1386