1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 
40 #include "gear_config.h"
41 #include <libgearman/common.h>
42 
43 #include "libgearman/assert.hpp"
44 #include "libgearman/vector.h"
45 
46 #include <cstdio>
47 #include <cstring>
48 #include <memory>
49 
50 struct gearman_job_reducer_st {
51   gearman_universal_st &universal;
52   gearman_client_st *client;
53   gearman_result_st result;
54   gearman_vector_st *reducer_function;
55   gearman_aggregator_fn *aggregator_fn;
56 
gearman_job_reducer_stgearman_job_reducer_st57   gearman_job_reducer_st(gearman_universal_st &universal_arg,
58                          const gearman_string_t &reducer_function_name,
59                          gearman_aggregator_fn *aggregator_fn_arg):
60     universal(universal_arg),
61     client(NULL),
62     reducer_function(NULL),
63     aggregator_fn(aggregator_fn_arg)
64   {
65     assert_msg(gearman_size(reducer_function_name), "Trying to creat a function with zero length");
66     reducer_function= gearman_string_create(NULL, gearman_size(reducer_function_name));
67     gearman_string_append(reducer_function, gearman_string_param(reducer_function_name));
68   }
69 
~gearman_job_reducer_stgearman_job_reducer_st70   ~gearman_job_reducer_st()
71   {
72     gearman_client_free(client);
73     gearman_string_free(reducer_function);
74   }
75 
initgearman_job_reducer_st76   bool init()
77   {
78     client= gearman_client_create(NULL);
79     if (not client)
80       return false;
81 
82     if (universal._namespace)
83     {
84       gearman_client_set_namespace(client,
85                                    gearman_string_value(universal._namespace),
86                                    gearman_string_length(universal._namespace));
87     }
88 
89     for (gearman_connection_st *con= universal.con_list; con; con= con->next)
90     {
91       if (gearman_failed(gearman_client_add_server(client, con->host, con->port)))
92       {
93         return false;
94       }
95     }
96 
97     return true;
98   }
99 
addgearman_job_reducer_st100   bool add(gearman_argument_t &arguments)
101   {
102     gearman_string_t function= gearman_string(reducer_function);
103     gearman_unique_t unique= gearman_unique_make(0, 0);
104     gearman_task_st *task= add_task(*client,
105                                     NULL,
106                                     GEARMAN_COMMAND_SUBMIT_JOB,
107                                     function,
108                                     unique,
109                                     arguments.value,
110                                     time_t(0),
111                                     gearman_actions_execute_defaults());
112     if (task == NULL)
113     {
114       gearman_universal_error_code(client->universal);
115 
116       return false;
117     }
118 
119     return true;
120   }
121 
completegearman_job_reducer_st122   gearman_return_t complete()
123   {
124     gearman_return_t rc;
125     if (gearman_failed(rc= gearman_client_run_tasks(client)))
126     {
127       return rc;
128     }
129 
130     gearman_task_st *check_task= client->task_list;
131 
132     if (check_task)
133     {
134       do
135       {
136         if (gearman_failed(check_task->result_rc))
137         {
138           return check_task->result_rc;
139         }
140       } while ((check_task= gearman_next(check_task)));
141 
142       if (aggregator_fn)
143       {
144         gearman_aggregator_st aggregator(client->context);
145         aggregator_fn(&aggregator, client->task_list, &result);
146       }
147     }
148 
149     return GEARMAN_SUCCESS;
150   }
151 };
152 
153 /**
154  * @addtogroup gearman_job_static Static Job Declarations
155  * @ingroup gearman_job
156  * @{
157  */
158 
159 /**
160  * Send a packet for a job.
161  */
162 static gearman_return_t _job_send(gearman_job_st *job);
163 
164 /*
165  * Public Definitions
166  */
167 
gearman_job_create(gearman_worker_st * worker,gearman_job_st * job)168 gearman_job_st *gearman_job_create(gearman_worker_st *worker, gearman_job_st *job)
169 {
170   if (job)
171   {
172     job->options.allocated= false;
173   }
174   else
175   {
176     job= new (std::nothrow) gearman_job_st;
177     if (not job)
178     {
179       gearman_perror(worker->universal, "new");
180       return NULL;
181     }
182 
183     job->options.allocated= true;
184   }
185 
186   job->options.assigned_in_use= false;
187   job->options.work_in_use= false;
188   job->options.finished= false;
189 
190   job->worker= worker;
191   job->reducer= NULL;
192   job->error_code= GEARMAN_UNKNOWN_STATE;
193 
194   if (worker->job_list)
195   {
196     worker->job_list->prev= job;
197   }
198   job->next= worker->job_list;
199   job->prev= NULL;
200   worker->job_list= job;
201   worker->job_count++;
202 
203   job->con= NULL;
204 
205   return job;
206 }
207 
gearman_job_build_reducer(gearman_job_st * job,gearman_aggregator_fn * aggregator_fn)208 bool gearman_job_build_reducer(gearman_job_st *job, gearman_aggregator_fn *aggregator_fn)
209 {
210   if (job->reducer)
211   {
212     return true;
213   }
214 
215   gearman_string_t reducer_func= gearman_job_reducer_string(job);
216 
217   job->reducer= new (std::nothrow) gearman_job_reducer_st(job->worker->universal, reducer_func, aggregator_fn);
218   if (not job->reducer)
219   {
220     gearman_job_free(job);
221     return false;
222   }
223 
224   if (not job->reducer->init())
225   {
226     gearman_job_free(job);
227     return false;
228   }
229 
230   return true;
231 }
232 
gearman_job_reset_error(gearman_job_st * job)233 static inline void gearman_job_reset_error(gearman_job_st *job)
234 {
235   if (job->worker)
236   {
237     gearman_worker_reset_error(job->worker);
238   }
239 }
240 
gearman_job_send_data(gearman_job_st * job,const void * data,size_t data_size)241 gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data, size_t data_size)
242 {
243   if (job)
244   {
245     const void *args[2];
246     size_t args_size[2];
247 
248     if (job->reducer)
249     {
250       gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(data), data_size);
251       job->reducer->add(value);
252 
253       return GEARMAN_SUCCESS;
254     }
255 
256     if ((job->options.work_in_use) == false)
257     {
258       args[0]= job->assigned.arg[0];
259       args_size[0]= job->assigned.arg_size[0];
260       args[1]= data;
261       args_size[1]= data_size;
262       gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
263                                                        GEARMAN_MAGIC_REQUEST,
264                                                        GEARMAN_COMMAND_WORK_DATA,
265                                                        args, args_size, 2);
266       if (gearman_failed(ret))
267       {
268         return ret;
269       }
270 
271       job->options.work_in_use= true;
272     }
273 
274     return _job_send(job);
275   }
276 
277   return GEARMAN_INVALID_ARGUMENT;
278 }
279 
gearman_job_send_warning(gearman_job_st * job,const void * warning,size_t warning_size)280 gearman_return_t gearman_job_send_warning(gearman_job_st *job,
281                                           const void *warning,
282                                           size_t warning_size)
283 {
284   if (job)
285   {
286     const void *args[2];
287     size_t args_size[2];
288 
289     if ((job->options.work_in_use) == false)
290     {
291       args[0]= job->assigned.arg[0];
292       args_size[0]= job->assigned.arg_size[0];
293       args[1]= warning;
294       args_size[1]= warning_size;
295 
296       gearman_return_t ret;
297       ret= gearman_packet_create_args(job->worker->universal, job->work,
298                                       GEARMAN_MAGIC_REQUEST,
299                                       GEARMAN_COMMAND_WORK_WARNING,
300                                       args, args_size, 2);
301       if (gearman_failed(ret))
302       {
303         return ret;
304       }
305 
306       job->options.work_in_use= true;
307     }
308 
309     return _job_send(job);
310   }
311 
312   return GEARMAN_INVALID_ARGUMENT;
313 }
314 
gearman_job_send_status(gearman_job_st * job,uint32_t numerator,uint32_t denominator)315 gearman_return_t gearman_job_send_status(gearman_job_st *job,
316                                          uint32_t numerator,
317                                          uint32_t denominator)
318 {
319   if (job)
320   {
321     char numerator_string[12];
322     char denominator_string[12];
323     const void *args[3];
324     size_t args_size[3];
325 
326     if (not (job->options.work_in_use))
327     {
328       snprintf(numerator_string, 12, "%u", numerator);
329       snprintf(denominator_string, 12, "%u", denominator);
330 
331       args[0]= job->assigned.arg[0];
332       args_size[0]= job->assigned.arg_size[0];
333       args[1]= numerator_string;
334       args_size[1]= strlen(numerator_string) + 1;
335       args[2]= denominator_string;
336       args_size[2]= strlen(denominator_string);
337 
338       gearman_return_t ret;
339       ret= gearman_packet_create_args(job->worker->universal, job->work,
340                                       GEARMAN_MAGIC_REQUEST,
341                                       GEARMAN_COMMAND_WORK_STATUS,
342                                       args, args_size, 3);
343       if (gearman_failed(ret))
344       {
345         return ret;
346       }
347 
348       job->options.work_in_use= true;
349     }
350 
351     return _job_send(job);
352   }
353 
354   return GEARMAN_INVALID_ARGUMENT;
355 }
356 
gearman_job_send_complete(gearman_job_st * job,const void * result,size_t result_size)357 gearman_return_t gearman_job_send_complete(gearman_job_st *job,
358                                            const void *result,
359                                            size_t result_size)
360 {
361   if (job)
362   {
363     if (job->reducer)
364     {
365       return GEARMAN_INVALID_ARGUMENT;
366     }
367 
368     return gearman_job_send_complete_fin(job, result, result_size);
369   }
370 
371   return GEARMAN_INVALID_ARGUMENT;
372 }
373 
gearman_job_send_complete_fin(gearman_job_st * job,const void * result,size_t result_size)374 gearman_return_t gearman_job_send_complete_fin(gearman_job_st *job,
375                                                const void *result, size_t result_size)
376 {
377   if (job)
378   {
379     if (job->options.finished)
380     {
381       return GEARMAN_SUCCESS;
382     }
383 
384     if (job->reducer)
385     {
386       if (result_size)
387       {
388         gearman_argument_t value= gearman_argument_make(NULL, 0, static_cast<const char *>(result), result_size);
389         job->reducer->add(value);
390       }
391 
392       gearman_return_t rc= job->reducer->complete();
393       if (gearman_failed(rc))
394       {
395         return gearman_error(job->worker->universal, rc, "The reducer's complete() returned an error");
396       }
397 
398       const gearman_vector_st *reduced_value= job->reducer->result.string();
399       if (reduced_value)
400       {
401         result= gearman_string_value(reduced_value);
402         result_size= gearman_string_length(reduced_value);
403       }
404       else
405       {
406         result= NULL;
407         result_size= 0;
408       }
409     }
410 
411     const void *args[2];
412     size_t args_size[2];
413 
414     if (not (job->options.work_in_use))
415     {
416       args[0]= job->assigned.arg[0];
417       args_size[0]= job->assigned.arg_size[0];
418 
419       args[1]= result;
420       args_size[1]= result_size;
421       gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
422                                                        GEARMAN_MAGIC_REQUEST,
423                                                        GEARMAN_COMMAND_WORK_COMPLETE,
424                                                        args, args_size, 2);
425       if (gearman_failed(ret))
426       {
427         return ret;
428       }
429       job->options.work_in_use= true;
430     }
431 
432     gearman_return_t ret= _job_send(job);
433     if (gearman_failed(ret))
434     {
435       return ret;
436     }
437 
438     job->options.finished= true;
439 
440     return GEARMAN_SUCCESS;
441   }
442 
443   return GEARMAN_INVALID_ARGUMENT;
444 }
445 
gearman_job_send_exception(gearman_job_st * job,const void * exception,size_t exception_size)446 gearman_return_t gearman_job_send_exception(gearman_job_st *job,
447                                             const void *exception,
448                                             size_t exception_size)
449 {
450   if (job)
451   {
452     const void *args[2];
453     size_t args_size[2];
454 
455     if (not (job->options.work_in_use))
456     {
457       args[0]= job->assigned.arg[0];
458       args_size[0]= job->assigned.arg_size[0];
459       args[1]= exception;
460       args_size[1]= exception_size;
461 
462       gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
463                                                        GEARMAN_MAGIC_REQUEST,
464                                                        GEARMAN_COMMAND_WORK_EXCEPTION,
465                                                        args, args_size, 2);
466       if (gearman_failed(ret))
467         return ret;
468 
469       job->options.work_in_use= true;
470     }
471 
472     return _job_send(job);
473   }
474 
475   return GEARMAN_INVALID_ARGUMENT;
476 }
477 
gearman_job_send_fail(gearman_job_st * job)478 gearman_return_t gearman_job_send_fail(gearman_job_st *job)
479 {
480   if (job)
481   {
482     if (job->reducer)
483     {
484       return GEARMAN_INVALID_ARGUMENT;
485     }
486 
487     return gearman_job_send_fail_fin(job);
488   }
489 
490   return GEARMAN_INVALID_ARGUMENT;
491 }
492 
gearman_job_send_fail_fin(gearman_job_st * job)493 gearman_return_t gearman_job_send_fail_fin(gearman_job_st *job)
494 {
495   if (job)
496   {
497     const void *args[1];
498     size_t args_size[1];
499 
500     if (job->options.finished)
501     {
502       return GEARMAN_SUCCESS;
503     }
504 
505     if (not (job->options.work_in_use))
506     {
507       args[0]= job->assigned.arg[0];
508       args_size[0]= job->assigned.arg_size[0] - 1;
509       gearman_return_t ret= gearman_packet_create_args(job->worker->universal, job->work,
510                                                        GEARMAN_MAGIC_REQUEST,
511                                                        GEARMAN_COMMAND_WORK_FAIL,
512                                                        args, args_size, 1);
513       if (gearman_failed(ret))
514       {
515         return ret;
516       }
517 
518       job->options.work_in_use= true;
519     }
520 
521     gearman_return_t ret;
522     ret= _job_send(job);
523     if (gearman_failed(ret))
524       return ret;
525 
526     job->options.finished= true;
527     return GEARMAN_SUCCESS;
528   }
529 
530   return GEARMAN_INVALID_ARGUMENT;
531 }
532 
gearman_job_handle(const gearman_job_st * job)533 const char *gearman_job_handle(const gearman_job_st *job)
534 {
535   if (job)
536   {
537     return static_cast<const char *>(job->assigned.arg[0]);
538   }
539 
540   return NULL;
541 }
542 
gearman_job_function_name(const gearman_job_st * job)543 const char *gearman_job_function_name(const gearman_job_st *job)
544 {
545   if (job)
546   {
547     return static_cast<char *>(job->assigned.arg[1]);
548   }
549 
550   return NULL;
551 }
552 
gearman_job_function_name_string(const gearman_job_st * job)553 gearman_string_t gearman_job_function_name_string(const gearman_job_st *job)
554 {
555   if (job)
556   {
557     gearman_string_t temp= { job->assigned.arg[1], job->assigned.arg_size[1] };
558     return temp;
559   }
560 
561   static gearman_string_t ret= {0, 0};
562   return ret;
563 }
564 
gearman_job_unique(const gearman_job_st * job)565 const char *gearman_job_unique(const gearman_job_st *job)
566 {
567   if (job)
568   {
569     if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ or
570         job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
571     {
572       return static_cast<const char *>(job->assigned.arg[2]);
573     }
574 
575     return "";
576   }
577 
578   return NULL;
579 }
580 
gearman_job_is_map(const gearman_job_st * job)581 bool gearman_job_is_map(const gearman_job_st *job)
582 {
583   if (job)
584   {
585     return bool(job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL) and job->assigned.arg_size[3] > 1;
586   }
587 
588   return false;
589 }
590 
gearman_job_reducer_string(const gearman_job_st * job)591 gearman_string_t gearman_job_reducer_string(const gearman_job_st *job)
592 {
593   if (job)
594   {
595     if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL and job->assigned.arg_size[3] > 1)
596     {
597       gearman_string_t temp= { job->assigned.arg[3], job->assigned.arg_size[3] -1 };
598       return temp;
599     }
600 
601     static gearman_string_t null_temp= { gearman_literal_param("") };
602 
603     return null_temp;
604   }
605 
606   static gearman_string_t ret= {0, 0};
607   return ret;
608 }
609 
gearman_job_reducer(const gearman_job_st * job)610 const char *gearman_job_reducer(const gearman_job_st *job)
611 {
612   if (job)
613   {
614     if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_ALL)
615     {
616       return static_cast<const char *>(job->assigned.arg[3]);
617     }
618 
619     return "";
620   }
621 
622   return NULL;
623 }
624 
gearman_job_workload(const gearman_job_st * job)625 const void *gearman_job_workload(const gearman_job_st *job)
626 {
627   if (job)
628   {
629     return job->assigned.data;
630   }
631 
632   return NULL;
633 }
634 
gearman_job_workload_size(const gearman_job_st * job)635 size_t gearman_job_workload_size(const gearman_job_st *job)
636 {
637   if (job)
638   {
639     return job->assigned.data_size;
640   }
641 
642   return 0;
643 }
644 
gearman_job_take_workload(gearman_job_st * job,size_t * data_size)645 void *gearman_job_take_workload(gearman_job_st *job, size_t *data_size)
646 {
647   if (job)
648   {
649     return gearman_packet_take_data(job->assigned, data_size);
650   }
651 
652   return NULL;
653 }
654 
gearman_job_free(gearman_job_st * job)655 void gearman_job_free(gearman_job_st *job)
656 {
657   if (job)
658   {
659     if (job->options.assigned_in_use)
660     {
661       gearman_packet_free(&(job->assigned));
662     }
663 
664     if (job->options.work_in_use)
665     {
666       gearman_packet_free(&(job->work));
667     }
668 
669     if (job->worker->job_list == job)
670     {
671       job->worker->job_list= job->next;
672     }
673 
674     if (job->prev)
675     {
676       job->prev->next= job->next;
677     }
678 
679     if (job->next)
680     {
681       job->next->prev= job->prev;
682     }
683     job->worker->job_count--;
684 
685     delete job->reducer;
686     job->reducer= NULL;
687 
688     if (job->options.allocated)
689     {
690       delete job;
691     }
692   }
693 }
694 
695 /*
696  * Static Definitions
697  */
698 
_job_send(gearman_job_st * job)699 static gearman_return_t _job_send(gearman_job_st *job)
700 {
701   if (job)
702   {
703     gearman_return_t ret= job->con->send_packet(job->work, true);
704 
705     while ((ret == GEARMAN_IO_WAIT) or (ret == GEARMAN_TIMEOUT))
706     {
707 #if 0
708       assert(job->work.universal);
709       ret= gearman_wait(*(job->work.universal));
710 #endif
711       ret= gearman_wait(job->worker->universal);
712       if (ret == GEARMAN_SUCCESS)
713       {
714         ret= job->con->send_packet(job->work, true);
715       }
716     }
717 
718     if (gearman_failed(ret))
719     {
720       return ret;
721     }
722 
723     gearman_packet_free(&(job->work));
724     job->options.work_in_use= false;
725 
726     return GEARMAN_SUCCESS;
727   }
728 
729   return GEARMAN_INVALID_ARGUMENT;
730 }
731 
gearman_job_error(gearman_job_st * job)732 const char *gearman_job_error(gearman_job_st *job)
733 {
734   if (job and job->worker)
735   {
736     return gearman_worker_error(job->worker);
737   }
738 
739   return NULL;
740 }
741