1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include "gear_config.h"
41 #include <libgearman/common.h>
42
43 #include "libgearman/assert.hpp"
44 #include "libgearman/vector.h"
45
46 #include <cstdio>
47 #include <cstring>
48 #include <memory>
49
50 struct gearman_job_reducer_st {
51 gearman_universal_st &universal;
52 gearman_client_st *client;
53 gearman_result_st result;
54 gearman_vector_st *reducer_function;
55 gearman_aggregator_fn *aggregator_fn;
56
gearman_job_reducer_stgearman_job_reducer_st57 gearman_job_reducer_st(gearman_universal_st &universal_arg,
58 const gearman_string_t &reducer_function_name,
59 gearman_aggregator_fn *aggregator_fn_arg):
60 universal(universal_arg),
61 client(NULL),
62 reducer_function(NULL),
63 aggregator_fn(aggregator_fn_arg)
64 {
65 assert_msg(gearman_size(reducer_function_name), "Trying to creat a function with zero length");
66 reducer_function= gearman_string_create(NULL, gearman_size(reducer_function_name));
67 gearman_string_append(reducer_function, gearman_string_param(reducer_function_name));
68 }
69
~gearman_job_reducer_stgearman_job_reducer_st70 ~gearman_job_reducer_st()
71 {
72 gearman_client_free(client);
73 gearman_string_free(reducer_function);
74 }
75
initgearman_job_reducer_st76 bool init()
77 {
78 client= gearman_client_create(NULL);
79 if (not client)
80 return false;
81
82 if (universal._namespace)
83 {
84 gearman_client_set_namespace(client,
85 gearman_string_value(universal._namespace),
86 gearman_string_length(universal._namespace));
87 }
88
89 for (gearman_connection_st *con= universal.con_list; con; con= con->next)
90 {
91 if (gearman_failed(gearman_client_add_server(client, con->host, con->port)))
92 {
93 return false;
94 }
95 }
96
97 return true;
98 }
99
addgearman_job_reducer_st100 bool add(gearman_argument_t &arguments)
101 {
102 gearman_string_t function= gearman_string(reducer_function);
103 gearman_unique_t unique= gearman_unique_make(0, 0);
104 gearman_task_st *task= add_task(*client,
105 NULL,
106 GEARMAN_COMMAND_SUBMIT_JOB,
107 function,
108 unique,
109 arguments.value,
110 time_t(0),
111 gearman_actions_execute_defaults());
112 if (task == NULL)
113 {
114 gearman_universal_error_code(client->universal);
115
116 return false;
117 }
118
119 return true;
120 }
121
completegearman_job_reducer_st122 gearman_return_t complete()
123 {
124 gearman_return_t rc;
125 if (gearman_failed(rc= gearman_client_run_tasks(client)))
126 {
127 return rc;
128 }
129
130 gearman_task_st *check_task= client->task_list;
131
132 if (check_task)
133 {
134 do
135 {
136 if (gearman_failed(check_task->result_rc))
137 {
138 return check_task->result_rc;
139 }
140 } while ((check_task= gearman_next(check_task)));
141
142 if (aggregator_fn)
143 {
144 gearman_aggregator_st aggregator(client->context);
145 aggregator_fn(&aggregator, client->task_list, &result);
146 }
147 }
148
149 return GEARMAN_SUCCESS;
150 }
151 };
152
153 /**
154 * @addtogroup gearman_job_static Static Job Declarations
155 * @ingroup gearman_job
156 * @{
157 */
158
159 /**
160 * Send a packet for a job.
161 */
162 static gearman_return_t _job_send(gearman_job_st *job);
163
164 /*
165 * Public Definitions
166 */
167
gearman_job_create(gearman_worker_st * worker,gearman_job_st * job)168 gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *job)
169 {
170 if (job)
171 {
172 job->options.allocated= false;
173 }
174 else
175 {
176 job= new (std::nothrow) gearman_job_st;
177 if (not job)
178 {
179 gearman_perror(worker->universal, "new");
180 return NULL;
181 }
182
183 job->options.allocated= true;
184 }
185
186 job->options.assigned_in_use= false;
187 job->options.work_in_use= false;
188 job->options.finished= false;
189
190 job->worker= worker;
191 job->reducer= NULL;
192 job->error_code= GEARMAN_UNKNOWN_STATE;
193
194 if (worker->job_list)
195 {
196 worker->job_list->prev= job;
197 }
198 job->next= worker->job_list;
199 job->prev= NULL;
200 worker->job_list= job;
201 worker->job_count++;
202
203 job->con= NULL;
204
205 return job;
206 }
207
gearman_job_build_reducer(gearman_job_st * job,gearman_aggregator_fn * aggregator_fn)208 bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggregator_fn)
209 {
210 if (job->reducer)
211 {
212 return true;
213 }
214
215 gearman_string_t reducer_func= gearman_job_reducer_string(job);
216
217 job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->universal, reducer_func, aggregator_fn);
218 if (not job->reducer)
219 {
220 gearman_job_free(job);
221 return false;
222 }
223
224 if (not job->reducer->init())
225 {
226 gearman_job_free(job);
227 return false;
228 }
229
230 return true;
231 }
232
gearman_job_reset_error(gearman_job_st * job)233 static inline void gearman_job_reset_error(gearman_job_st *job)
234 {
235 if (job->worker)
236 {
237 gearman_worker_reset_error(job->worker);
238 }
239 }
240
gearman_job_send_data(gearman_job_st * job,const void * data,size_t data_size)241 gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data, size_t data_size)
242 {
243 if (job)
244 {
245 const void *args[2];
246 size_t args_size[2];
247
248 if (job->reducer)
249 {
250 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
251 job->reducer->add(value);
252
253 return GEARMAN_SUCCESS;
254 }
255
256 if ((job->options.work_in_use) == false)
257 {
258 args[0]= job->assigned.arg[0];
259 args_size[0]= job->assigned.arg_size[0];
260 args[1]= data;
261 args_size[1]= data_size;
262 gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
263 GEARMAN_MAGIC_REQUEST,
264 GEARMAN_COMMAND_WORK_DATA,
265 args, args_size, 2);
266 if (gearman_failed(ret))
267 {
268 return ret;
269 }
270
271 job->options.work_in_use= true;
272 }
273
274 return _job_send(job);
275 }
276
277 return GEARMAN_INVALID_ARGUMENT;
278 }
279
gearman_job_send_warning(gearman_job_st * job,const void * warning,size_t warning_size)280 gearman_return_t gearman_job_send_warning(gearman_job_st *job,
281 const void *warning,
282 size_t warning_size)
283 {
284 if (job)
285 {
286 const void *args[2];
287 size_t args_size[2];
288
289 if ((job->options.work_in_use) == false)
290 {
291 args[0]= job->assigned.arg[0];
292 args_size[0]= job->assigned.arg_size[0];
293 args[1]= warning;
294 args_size[1]= warning_size;
295
296 gearman_return_t ret;
297 ret= gearman_packet_create_args(job->worker->universal, job->work,
298 GEARMAN_MAGIC_REQUEST,
299 GEARMAN_COMMAND_WORK_WARNING,
300 args, args_size, 2);
301 if (gearman_failed(ret))
302 {
303 return ret;
304 }
305
306 job->options.work_in_use= true;
307 }
308
309 return _job_send(job);
310 }
311
312 return GEARMAN_INVALID_ARGUMENT;
313 }
314
gearman_job_send_status(gearman_job_st * job,uint32_t numerator,uint32_t denominator)315 gearman_return_t gearman_job_send_status(gearman_job_st *job,
316 uint32_t numerator,
317 uint32_t denominator)
318 {
319 if (job)
320 {
321 char numerator_string[12];
322 char denominator_string[12];
323 const void *args[3];
324 size_t args_size[3];
325
326 if (not (job->options.work_in_use))
327 {
328 snprintf(numerator_string, 12, "%u", numerator);
329 snprintf(denominator_string, 12, "%u", denominator);
330
331 args[0]= job->assigned.arg[0];
332 args_size[0]= job->assigned.arg_size[0];
333 args[1]= numerator_string;
334 args_size[1]= strlen(numerator_string) + 1;
335 args[2]= denominator_string;
336 args_size[2]= strlen(denominator_string);
337
338 gearman_return_t ret;
339 ret= gearman_packet_create_args(job->worker->universal, job->work,
340 GEARMAN_MAGIC_REQUEST,
341 GEARMAN_COMMAND_WORK_STATUS,
342 args, args_size, 3);
343 if (gearman_failed(ret))
344 {
345 return ret;
346 }
347
348 job->options.work_in_use= true;
349 }
350
351 return _job_send(job);
352 }
353
354 return GEARMAN_INVALID_ARGUMENT;
355 }
356
gearman_job_send_complete(gearman_job_st * job,const void * result,size_t result_size)357 gearman_return_t gearman_job_send_complete(gearman_job_st *job,
358 const void *result,
359 size_t result_size)
360 {
361 if (job)
362 {
363 if (job->reducer)
364 {
365 return GEARMAN_INVALID_ARGUMENT;
366 }
367
368 return gearman_job_send_complete_fin(job, result, result_size);
369 }
370
371 return GEARMAN_INVALID_ARGUMENT;
372 }
373
gearman_job_send_complete_fin(gearman_job_st * job,const void * result,size_t result_size)374 gearman_return_t gearman_job_send_complete_fin(gearman_job_st *job,
375 const void *result, size_t result_size)
376 {
377 if (job)
378 {
379 if (job->options.finished)
380 {
381 return GEARMAN_SUCCESS;
382 }
383
384 if (job->reducer)
385 {
386 if (result_size)
387 {
388 gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
389 job->reducer->add(value);
390 }
391
392 gearman_return_t rc= job->reducer->complete();
393 if (gearman_failed(rc))
394 {
395 return gearman_error(job->worker->universal, rc, "The reducer's complete() returned an error");
396 }
397
398 const gearman_vector_st *reduced_value= job->reducer->result.string();
399 if (reduced_value)
400 {
401 result= gearman_string_value(reduced_value);
402 result_size= gearman_string_length(reduced_value);
403 }
404 else
405 {
406 result= NULL;
407 result_size= 0;
408 }
409 }
410
411 const void *args[2];
412 size_t args_size[2];
413
414 if (not (job->options.work_in_use))
415 {
416 args[0]= job->assigned.arg[0];
417 args_size[0]= job->assigned.arg_size[0];
418
419 args[1]= result;
420 args_size[1]= result_size;
421 gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
422 GEARMAN_MAGIC_REQUEST,
423 GEARMAN_COMMAND_WORK_COMPLETE,
424 args, args_size, 2);
425 if (gearman_failed(ret))
426 {
427 return ret;
428 }
429 job->options.work_in_use= true;
430 }
431
432 gearman_return_t ret= _job_send(job);
433 if (gearman_failed(ret))
434 {
435 return ret;
436 }
437
438 job->options.finished= true;
439
440 return GEARMAN_SUCCESS;
441 }
442
443 return GEARMAN_INVALID_ARGUMENT;
444 }
445
gearman_job_send_exception(gearman_job_st * job,const void * exception,size_t exception_size)446 gearman_return_t gearman_job_send_exception(gearman_job_st *job,
447 const void *exception,
448 size_t exception_size)
449 {
450 if (job)
451 {
452 const void *args[2];
453 size_t args_size[2];
454
455 if (not (job->options.work_in_use))
456 {
457 args[0]= job->assigned.arg[0];
458 args_size[0]= job->assigned.arg_size[0];
459 args[1]= exception;
460 args_size[1]= exception_size;
461
462 gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
463 GEARMAN_MAGIC_REQUEST,
464 GEARMAN_COMMAND_WORK_EXCEPTION,
465 args, args_size, 2);
466 if (gearman_failed(ret))
467 return ret;
468
469 job->options.work_in_use= true;
470 }
471
472 return _job_send(job);
473 }
474
475 return GEARMAN_INVALID_ARGUMENT;
476 }
477
gearman_job_send_fail(gearman_job_st * job)478 gearman_return_t gearman_job_send_fail(gearman_job_st *job)
479 {
480 if (job)
481 {
482 if (job->reducer)
483 {
484 return GEARMAN_INVALID_ARGUMENT;
485 }
486
487 return gearman_job_send_fail_fin(job);
488 }
489
490 return GEARMAN_INVALID_ARGUMENT;
491 }
492
gearman_job_send_fail_fin(gearman_job_st * job)493 gearman_return_t gearman_job_send_fail_fin(gearman_job_st *job)
494 {
495 if (job)
496 {
497 const void *args[1];
498 size_t args_size[1];
499
500 if (job->options.finished)
501 {
502 return GEARMAN_SUCCESS;
503 }
504
505 if (not (job->options.work_in_use))
506 {
507 args[0]= job->assigned.arg[0];
508 args_size[0]= job->assigned.arg_size[0] - 1;
509 gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
510 GEARMAN_MAGIC_REQUEST,
511 GEARMAN_COMMAND_WORK_FAIL,
512 args, args_size, 1);
513 if (gearman_failed(ret))
514 {
515 return ret;
516 }
517
518 job->options.work_in_use= true;
519 }
520
521 gearman_return_t ret;
522 ret= _job_send(job);
523 if (gearman_failed(ret))
524 return ret;
525
526 job->options.finished= true;
527 return GEARMAN_SUCCESS;
528 }
529
530 return GEARMAN_INVALID_ARGUMENT;
531 }
532
gearman_job_handle(const gearman_job_st * job)533 const char *gearman_job_handle(const gearman_job_st *job)
534 {
535 if (job)
536 {
537 return static_cast<const char *>(job->assigned.arg[0]);
538 }
539
540 return NULL;
541 }
542
gearman_job_function_name(const gearman_job_st * job)543 const char *gearman_job_function_name(const gearman_job_st *job)
544 {
545 if (job)
546 {
547 return static_cast<char *>(job->assigned.arg[1]);
548 }
549
550 return NULL;
551 }
552
gearman_job_function_name_string(const gearman_job_st * job)553 gearman_string_t gearman_job_function_name_string(const gearman_job_st *job)
554 {
555 if (job)
556 {
557 gearman_string_t temp= { job->assigned.arg[1], job->assigned.arg_size[1] };
558 return temp;
559 }
560
561 static gearman_string_t ret= {0, 0};
562 return ret;
563 }
564
gearman_job_unique(const gearman_job_st * job)565 const char *gearman_job_unique(const gearman_job_st *job)
566 {
567 if (job)
568 {
569 if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ or
570 job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
571 {
572 return static_cast<const char *>(job->assigned.arg[2]);
573 }
574
575 return "";
576 }
577
578 return NULL;
579 }
580
gearman_job_is_map(const gearman_job_st * job)581 bool gearman_job_is_map(const gearman_job_st *job)
582 {
583 if (job)
584 {
585 return bool(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL) and job->assigned.arg_size[3] > 1;
586 }
587
588 return false;
589 }
590
gearman_job_reducer_string(const gearman_job_st * job)591 gearman_string_t gearman_job_reducer_string(const gearman_job_st *job)
592 {
593 if (job)
594 {
595 if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL and job->assigned.arg_size[3] > 1)
596 {
597 gearman_string_t temp= { job->assigned.arg[3], job->assigned.arg_size[3] -1 };
598 return temp;
599 }
600
601 static gearman_string_t null_temp= { gearman_literal_param("") };
602
603 return null_temp;
604 }
605
606 static gearman_string_t ret= {0, 0};
607 return ret;
608 }
609
gearman_job_reducer(const gearman_job_st * job)610 const char *gearman_job_reducer(const gearman_job_st *job)
611 {
612 if (job)
613 {
614 if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
615 {
616 return static_cast<const char *>(job->assigned.arg[3]);
617 }
618
619 return "";
620 }
621
622 return NULL;
623 }
624
gearman_job_workload(const gearman_job_st * job)625 const void *gearman_job_workload(const gearman_job_st *job)
626 {
627 if (job)
628 {
629 return job->assigned.data;
630 }
631
632 return NULL;
633 }
634
gearman_job_workload_size(const gearman_job_st * job)635 size_t gearman_job_workload_size(const gearman_job_st *job)
636 {
637 if (job)
638 {
639 return job->assigned.data_size;
640 }
641
642 return 0;
643 }
644
gearman_job_take_workload(gearman_job_st * job,size_t * data_size)645 void *gearman_job_take_workload(gearman_job_st *job, size_t *data_size)
646 {
647 if (job)
648 {
649 return gearman_packet_take_data(job->assigned, data_size);
650 }
651
652 return NULL;
653 }
654
gearman_job_free(gearman_job_st * job)655 void gearman_job_free(gearman_job_st *job)
656 {
657 if (job)
658 {
659 if (job->options.assigned_in_use)
660 {
661 gearman_packet_free(&(job->assigned));
662 }
663
664 if (job->options.work_in_use)
665 {
666 gearman_packet_free(&(job->work));
667 }
668
669 if (job->worker->job_list == job)
670 {
671 job->worker->job_list= job->next;
672 }
673
674 if (job->prev)
675 {
676 job->prev->next= job->next;
677 }
678
679 if (job->next)
680 {
681 job->next->prev= job->prev;
682 }
683 job->worker->job_count--;
684
685 delete job->reducer;
686 job->reducer= NULL;
687
688 if (job->options.allocated)
689 {
690 delete job;
691 }
692 }
693 }
694
695 /*
696 * Static Definitions
697 */
698
_job_send(gearman_job_st * job)699 static gearman_return_t _job_send(gearman_job_st *job)
700 {
701 if (job)
702 {
703 gearman_return_t ret= job->con->send_packet(job->work, true);
704
705 while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
706 {
707 #if 0
708 assert(job->work.universal);
709 ret= gearman_wait(*(job->work.universal));
710 #endif
711 ret= gearman_wait(job->worker->universal);
712 if (ret == GEARMAN_SUCCESS)
713 {
714 ret= job->con->send_packet(job->work, true);
715 }
716 }
717
718 if (gearman_failed(ret))
719 {
720 return ret;
721 }
722
723 gearman_packet_free(&(job->work));
724 job->options.work_in_use= false;
725
726 return GEARMAN_SUCCESS;
727 }
728
729 return GEARMAN_INVALID_ARGUMENT;
730 }
731
gearman_job_error(gearman_job_st * job)732 const char *gearman_job_error(gearman_job_st *job)
733 {
734 if (job and job->worker)
735 {
736 return gearman_worker_error(job->worker);
737 }
738
739 return NULL;
740 }
741