1 /* Gearman Perl front end
2  * Copyright (C) 2009-2010 Dennis Schoen
3  * All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or modify
6  * it under the same terms as Perl itself, either Perl version 5.8.9 or,
7  * at your option, any later version of Perl 5 you may have available.
8  */
9 
10 #include "gearman_xs.h"
11 
12 typedef struct gearman_xs_client {
13   gearman_client_st *client;
14   /* used for keeping track of task interface callbacks */
15   SV * created_fn;
16   SV * data_fn;
17   SV * complete_fn;
18   SV * fail_fn;
19   SV * status_fn;
20   SV * warning_fn;
21 } gearman_xs_client;
22 
23 /* client task context */
24 typedef struct
25 {
26   gearman_client_st *client;
27   const char *workload;
28 } gearman_task_context_st;
29 
30 
31 /* context free function to free() the workload */
_perl_task_free(gearman_task_st * task,void * context)32 static void _perl_task_free(gearman_task_st *task, void *context)
33 {
34   PERL_UNUSED_VAR(task);
35   gearman_task_context_st *context_st= (gearman_task_context_st *)context;
36   Safefree(context_st->workload);
37   Safefree(context_st);
38 }
39 
_perl_task_callback(SV * fn,gearman_task_st * task)40 static gearman_return_t _perl_task_callback(SV * fn, gearman_task_st *task)
41 {
42   int count;
43   gearman_return_t ret;
44 
45   dSP;
46 
47   ENTER;
48   SAVETMPS;
49 
50   PUSHMARK(SP);
51   XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
52   PUTBACK;
53 
54   count= call_sv(fn, G_SCALAR);
55   if (count != 1)
56     croak("Invalid number of return values.\n");
57 
58   SPAGAIN;
59 
60   ret= POPi;
61 
62   PUTBACK;
63   FREETMPS;
64   LEAVE;
65 
66   return ret;
67 }
68 
_perl_task_complete_fn(gearman_task_st * task)69 static gearman_return_t _perl_task_complete_fn(gearman_task_st *task)
70 {
71   gearman_task_context_st *context_st;
72   gearman_xs_client *self;
73 
74   context_st= (gearman_task_context_st *)gearman_task_context(task);
75   self= (gearman_xs_client *)gearman_client_context(context_st->client);
76 
77   return _perl_task_callback(self->complete_fn, task);
78 }
79 
_perl_task_fail_fn(gearman_task_st * task)80 static gearman_return_t _perl_task_fail_fn(gearman_task_st *task)
81 {
82   gearman_task_context_st *context_st;
83   gearman_xs_client *self;
84 
85   context_st= (gearman_task_context_st *)gearman_task_context(task);
86   self= (gearman_xs_client *)gearman_client_context(context_st->client);
87 
88   return _perl_task_callback(self->fail_fn, task);
89 }
90 
_perl_task_status_fn(gearman_task_st * task)91 static gearman_return_t _perl_task_status_fn(gearman_task_st *task)
92 {
93   gearman_task_context_st *context_st;
94   gearman_xs_client *self;
95 
96   context_st= (gearman_task_context_st *)gearman_task_context(task);
97   self= (gearman_xs_client *)gearman_client_context(context_st->client);
98 
99   return _perl_task_callback(self->status_fn, task);
100 }
101 
_perl_task_created_fn(gearman_task_st * task)102 static gearman_return_t _perl_task_created_fn(gearman_task_st *task)
103 {
104   gearman_task_context_st *context_st;
105   gearman_xs_client *self;
106 
107   context_st= (gearman_task_context_st *)gearman_task_context(task);
108   self= (gearman_xs_client *)gearman_client_context(context_st->client);
109 
110   return _perl_task_callback(self->created_fn, task);
111 }
112 
_perl_task_data_fn(gearman_task_st * task)113 static gearman_return_t _perl_task_data_fn(gearman_task_st *task)
114 {
115   gearman_task_context_st *context_st;
116   gearman_xs_client *self;
117 
118   context_st= (gearman_task_context_st *)gearman_task_context(task);
119   self= (gearman_xs_client *)gearman_client_context(context_st->client);
120 
121   return _perl_task_callback(self->data_fn, task);
122 }
123 
_perl_task_warning_fn(gearman_task_st * task)124 static gearman_return_t _perl_task_warning_fn(gearman_task_st *task)
125 {
126   gearman_task_context_st *context_st;
127   gearman_xs_client *self;
128 
129   context_st= (gearman_task_context_st *)gearman_task_context(task);
130   self= (gearman_xs_client *)gearman_client_context(context_st->client);
131 
132   return _perl_task_callback(self->warning_fn, task);
133 }
134 
_create_client()135 static SV* _create_client() {
136   gearman_xs_client *self;
137 
138   Newxz(self, 1, gearman_xs_client);
139   self->client= gearman_client_create(NULL);
140   if (self->client == NULL) {
141       Perl_croak(aTHX_ "gearman_client_create:NULL\n");
142   }
143 
144   gearman_client_set_context(self->client, self);
145   gearman_client_add_options(self->client, GEARMAN_CLIENT_FREE_TASKS);
146   gearman_client_set_workload_malloc_fn(self->client, _perl_malloc, NULL);
147   gearman_client_set_workload_free_fn(self->client, _perl_free, NULL);
148   gearman_client_set_task_context_free_fn(self->client, _perl_task_free);
149 
150   return _bless("Gearman::XS::Client", self);
151 }
152 
153 MODULE = Gearman::XS::Client    PACKAGE = Gearman::XS::Client
154 
155 PROTOTYPES: ENABLE
156 
SV*
Gearman::XS::Client::new()
  CODE:
    /* Constructor: the class name is not used, the object is always
     * created by _create_client(). */
    PERL_UNUSED_VAR(CLASS);
    RETVAL= _create_client();
  OUTPUT:
    RETVAL
164 
gearman_return_t
add_server(self, ...)
    gearman_xs_client *self
  PREINIT:
    char *host= NULL;
    in_port_t port= 0;
    STRLEN host_len;
  CODE:
    /* Optional arguments: host (ST(1)) and port (ST(2)).  A missing,
     * undefined or empty host is passed on as NULL and a missing port as 0.
     *
     * SvCUR() is only valid on scalars that hold a string, so the host is
     * checked with SvOK() and fetched with SvPV(), which is safe for any
     * scalar (undef, numbers, strings). */
    if ((items > 1) && SvOK(ST(1)))
    {
      host= SvPV(ST(1), host_len);
      if (host_len == 0)
        host= NULL;
    }
    if (items > 2)
      port= (in_port_t)SvIV(ST(2));

    RETVAL= gearman_client_add_server(self->client, host, port);
  OUTPUT:
    RETVAL
180 
gearman_return_t
add_servers(self, servers)
    gearman_xs_client *self
    const char *servers
  CODE:
    /* Passes the server list string straight to libgearman and returns
     * its return code. */
    RETVAL= gearman_client_add_servers(self->client, servers);
  OUTPUT:
    RETVAL
189 
void
remove_servers(self)
    gearman_xs_client *self
  CODE:
    /* Thin wrapper around gearman_client_remove_servers(). */
    gearman_client_remove_servers(self->client);
195 
gearman_client_options_t
options(self)
    gearman_xs_client *self
  CODE:
    /* Returns the client's current option flags as reported by libgearman. */
    RETVAL= gearman_client_options(self->client);
  OUTPUT:
    RETVAL
203 
void
set_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  CODE:
    /* Thin wrapper around gearman_client_set_options(). */
    gearman_client_set_options(self->client, options);
210 
void
add_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  CODE:
    /* Thin wrapper around gearman_client_add_options(). */
    gearman_client_add_options(self->client, options);
217 
void
remove_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  CODE:
    /* Thin wrapper around gearman_client_remove_options(). */
    gearman_client_remove_options(self->client, options);
224 
gearman_return_t
echo(self, workload)
    gearman_xs_client *self
    SV * workload
  PREINIT:
    const char *payload;
    size_t payload_len;
  CODE:
    /* Sends the string value of `workload` (pointer plus byte length) to
     * gearman_client_echo() and returns its return code. */
    payload= SvPV(workload, payload_len);
    RETVAL= gearman_client_echo(self->client, payload, payload_len);
  OUTPUT:
    RETVAL
237 
void
do(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t ret;
    char *unique= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
  PPCODE:
    /* Runs one job through gearman_client_do() and returns the list
     * (ret, result).  An optional 4th argument is used as the unique id.
     * result is undef unless ret is GEARMAN_WORK_DATA, GEARMAN_SUCCESS or
     * GEARMAN_WORK_WARNING. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do(self->client, function_name, unique, payload,
                             payload_len, &reply_len, &ret);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if ((ret != GEARMAN_WORK_DATA) && (ret != GEARMAN_SUCCESS) && (ret != GEARMAN_WORK_WARNING))
      XPUSHs(&PL_sv_undef);
    else
    {
      /* newSVpvn copies the bytes, so the library buffer is freed here */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
264 
void
do_high(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t ret;
    char *unique= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
  PPCODE:
    /* Runs one job through gearman_client_do_high() and returns the list
     * (ret, result).  An optional 4th argument is used as the unique id.
     * result is undef unless ret is GEARMAN_WORK_DATA, GEARMAN_SUCCESS or
     * GEARMAN_WORK_WARNING. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do_high(self->client, function_name, unique,
                                  payload, payload_len, &reply_len, &ret);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if ((ret != GEARMAN_WORK_DATA) && (ret != GEARMAN_SUCCESS) && (ret != GEARMAN_WORK_WARNING))
      XPUSHs(&PL_sv_undef);
    else
    {
      /* newSVpvn copies the bytes, so the library buffer is freed here */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
291 
void
do_low(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t ret;
    char *unique= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
  PPCODE:
    /* Runs one job through gearman_client_do_low() and returns the list
     * (ret, result).  An optional 4th argument is used as the unique id.
     * result is undef unless ret is GEARMAN_WORK_DATA, GEARMAN_SUCCESS or
     * GEARMAN_WORK_WARNING. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do_low(self->client, function_name, unique,
                                 payload, payload_len, &reply_len, &ret);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if ((ret != GEARMAN_WORK_DATA) && (ret != GEARMAN_SUCCESS) && (ret != GEARMAN_WORK_WARNING))
      XPUSHs(&PL_sv_undef);
    else
    {
      /* newSVpvn copies the bytes, so the library buffer is freed here */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
318 
void
do_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    /* Submits a background job through gearman_client_do_background() and
     * returns the list (ret, job_handle); job_handle is undef unless ret
     * is GEARMAN_SUCCESS.  An optional 4th argument is the unique id. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    w= SvPV(workload, w_size);
    ret= gearman_client_do_background(self->client, function_name, unique, w,
                                                        w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the string, so the scratch buffer must be released on
     * the success path too (it used to be freed only on failure). */
    Safefree(job_handle);
345 
void
do_high_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    /* Submits a background job through gearman_client_do_high_background()
     * and returns the list (ret, job_handle); job_handle is undef unless
     * ret is GEARMAN_SUCCESS.  An optional 4th argument is the unique id. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    w= SvPV(workload, w_size);
    ret= gearman_client_do_high_background(self->client, function_name, unique,
                                                      w, w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the string, so the scratch buffer must be released on
     * the success path too (it used to be freed only on failure). */
    Safefree(job_handle);
372 
void
do_low_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    /* Submits a background job through gearman_client_do_low_background()
     * and returns the list (ret, job_handle); job_handle is undef unless
     * ret is GEARMAN_SUCCESS.  An optional 4th argument is the unique id. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    w= SvPV(workload, w_size);
    ret= gearman_client_do_low_background(self->client, function_name, unique,
                                                      w, w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the string, so the scratch buffer must be released on
     * the success path too (it used to be freed only on failure). */
    Safefree(job_handle);
399 
void
add_task(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task() and returns the list
     * (ret, task) with the task blessed into Gearman::XS::Task.  An
     * optional 4th argument is used as the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h.  Ownership of task_ctx when the add fails is also not
     * visible from here. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task(self->client, NULL, task_ctx,
                                  function_name, unique, payload,
                                  payload_len, &ret);
    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
423 
void
add_task_high(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task_high() and returns the
     * list (ret, task) with the task blessed into Gearman::XS::Task.  An
     * optional 4th argument is used as the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task_high(self->client, NULL, task_ctx,
                                       function_name, unique, payload,
                                       payload_len, &ret);

    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
448 
void
add_task_low(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task_low() and returns the
     * list (ret, task) with the task blessed into Gearman::XS::Task.  An
     * optional 4th argument is used as the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task_low(self->client, NULL, task_ctx,
                                      function_name, unique, payload,
                                      payload_len, &ret);

    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
473 
void
add_task_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task_background() and returns
     * the list (ret, task) with the task blessed into Gearman::XS::Task.
     * An optional 4th argument is used as the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task_background(self->client, NULL, task_ctx,
                                             function_name, unique, payload,
                                             payload_len, &ret);

    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
499 
void
add_task_high_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task_high_background() and
     * returns the list (ret, task) with the task blessed into
     * Gearman::XS::Task.  An optional 4th argument is the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task_high_background(self->client, NULL,
                                                  task_ctx, function_name,
                                                  unique, payload,
                                                  payload_len, &ret);

    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
525 
void
add_task_low_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
    char *unique= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    /* Queues a task via gearman_client_add_task_low_background() and
     * returns the list (ret, task) with the task blessed into
     * Gearman::XS::Task.  An optional 4th argument is the unique id.
     * NOTE(review): _get_string() presumably returns a private copy of the
     * workload that _perl_task_free() later releases; confirm in
     * gearman_xs.h. */
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task_ctx->workload= payload;
    task= gearman_client_add_task_low_background(self->client, NULL,
                                                 task_ctx, function_name,
                                                 unique, payload,
                                                 payload_len, &ret);

    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
551 
gearman_return_t
run_tasks(self)
    gearman_xs_client *self
  CODE:
    /* Calls gearman_client_run_tasks() and returns its return code. */
    RETVAL= gearman_client_run_tasks(self->client);
  OUTPUT:
    RETVAL
559 
void
set_created_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "created" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->created_fn;
    self->created_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_created_fn(self->client, _perl_task_created_fn);
567 
void
set_data_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "data" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->data_fn;
    self->data_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_data_fn(self->client, _perl_task_data_fn);
575 
void
set_complete_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "complete" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->complete_fn;
    self->complete_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_complete_fn(self->client, _perl_task_complete_fn);
583 
void
set_fail_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "fail" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->fail_fn;
    self->fail_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_fail_fn(self->client, _perl_task_fail_fn);
591 
void
set_status_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "status" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->status_fn;
    self->status_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_status_fn(self->client, _perl_task_status_fn);
599 
void
set_warning_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    /* Stores a private copy of `fn` as the "warning" callback and points
     * libgearman at the C trampoline that invokes it.  A copy stored by an
     * earlier call is released so repeated calls do not leak it. */
    previous= self->warning_fn;
    self->warning_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);
    gearman_client_set_warning_fn(self->client, _perl_task_warning_fn);
607 
const char *
error(self)
    gearman_xs_client *self
  CODE:
    /* Returns the string reported by gearman_client_error() for this
     * client. */
    RETVAL= gearman_client_error(self->client);
  OUTPUT:
    RETVAL
615 
void
do_status(self)
    gearman_xs_client *self
  PREINIT:
    uint32_t done;
    uint32_t total;
  PPCODE:
    /* Returns the list (numerator, denominator) as filled in by
     * gearman_client_do_status(). */
    gearman_client_do_status(self->client, &done, &total);
    XPUSHs(sv_2mortal(newSVuv(done)));
    XPUSHs(sv_2mortal(newSVuv(total)));
626 
void
job_status(self, job_handle= NULL)
    gearman_xs_client *self
    const char *job_handle
  PREINIT:
    gearman_return_t ret;
    bool is_known= 0;
    bool is_running= 0;
    uint32_t numerator= 0;
    uint32_t denominator= 0;
  PPCODE:
    /* Queries the status of `job_handle` and returns the list
     * (ret, is_known, is_running, numerator, denominator).
     *
     * The outputs are zero-initialised because all five values are pushed
     * regardless of ret; if the library returns without writing them,
     * the caller now sees zeros instead of indeterminate stack contents. */
    ret= gearman_client_job_status(self->client, job_handle, &is_known, &is_running,
                                   &numerator, &denominator);
    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(newSViv(is_known)));
    XPUSHs(sv_2mortal(newSViv(is_running)));
    XPUSHs(sv_2mortal(newSVuv(numerator)));
    XPUSHs(sv_2mortal(newSVuv(denominator)));
645 
int
timeout(self)
    gearman_xs_client *self
  CODE:
    /* Returns the value reported by gearman_client_timeout(). */
    RETVAL= gearman_client_timeout(self->client);
  OUTPUT:
    RETVAL
653 
void
set_timeout(self, timeout)
    gearman_xs_client *self
    int timeout
  CODE:
    /* Thin wrapper around gearman_client_set_timeout(). */
    gearman_client_set_timeout(self->client, timeout);
660 
gearman_return_t
wait(self)
    gearman_xs_client *self
  CODE:
    /* Calls gearman_client_wait() and returns its return code. */
    RETVAL= gearman_client_wait(self->client);
  OUTPUT:
    RETVAL
668 
void
add_task_status(self, job_handle)
    gearman_xs_client *self
    const char *job_handle
  PREINIT:
    gearman_task_context_st *task_ctx;
    gearman_task_st *task;
    gearman_return_t ret;
  PPCODE:
    /* Queues a status task for `job_handle` and returns the list
     * (ret, task) with the task blessed into Gearman::XS::Task.  The
     * context carries no workload: Newxz zero-fills it, so the workload
     * pointer stays NULL. */
    Newxz(task_ctx, 1, gearman_task_context_st);
    task_ctx->client= self->client;
    task= gearman_client_add_task_status(self->client, NULL, task_ctx,
                                         job_handle, &ret);
    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
683 
void
clear_fn(self)
    gearman_xs_client *self
  CODE:
    /* Calls gearman_client_clear_fn() on the underlying client.  The Perl
     * callback SVs stored on self are not touched here. */
    gearman_client_clear_fn(self->client);
689 
void
DESTROY(self)
    gearman_xs_client *self
  CODE:
    /* Destructor: frees the libgearman client first, then every callback
     * SV that was stored, and finally the wrapper struct itself. */
    gearman_client_free(self->client);
    if (self->created_fn != NULL)
    {
      sv_free(self->created_fn);
    }
    if (self->data_fn != NULL)
    {
      sv_free(self->data_fn);
    }
    if (self->complete_fn != NULL)
    {
      sv_free(self->complete_fn);
    }
    if (self->fail_fn != NULL)
    {
      sv_free(self->fail_fn);
    }
    if (self->status_fn != NULL)
    {
      sv_free(self->status_fn);
    }
    if (self->warning_fn != NULL)
    {
      sv_free(self->warning_fn);
    }
    Safefree(self);
708