1 /* Gearman Perl front end
2  * Copyright (C) 2009-2010 Dennis Schoen
3  * All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or modify
6  * it under the same terms as Perl itself, either Perl version 5.8.9 or,
7  * at your option, any later version of Perl 5 you may have available.
8  */
9 
10 #include "gearman_xs.h"
11 
12 typedef struct gearman_worker_st gearman_xs_worker;
13 
14 /* worker cb_arg to pass our actual perl function */
15 typedef struct
16 {
17   SV * func;
18   const char *cb_arg;
19 } gearman_worker_cb;
20 
_create_worker()21 static SV* _create_worker() {
22   gearman_worker_st *self;
23 
24   self= gearman_worker_create(NULL);
25   if (self == NULL) {
26       Perl_croak(aTHX_ "gearman_worker_create:NULL\n");
27   }
28 
29   gearman_worker_set_workload_free_fn(self, _perl_free, NULL);
30   gearman_worker_set_workload_malloc_fn(self, _perl_malloc, NULL);
31 
32   return _bless("Gearman::XS::Worker", self);
33 }
34 
35 /* wrapper function to call our actual perl function,
36    passed in through cb_arg */
_perl_worker_function_callback(gearman_job_st * job,void * cb_arg,size_t * result_size,gearman_return_t * ret_ptr)37 static void *_perl_worker_function_callback(gearman_job_st *job,
38                                      void *cb_arg,
39                                      size_t *result_size,
40                                      gearman_return_t *ret_ptr)
41 {
42   gearman_worker_cb *worker_cb;
43   int count;
44   void *result= NULL;
45   SV * result_sv;
46 
47   dSP;
48 
49   ENTER;
50   SAVETMPS;
51 
52   worker_cb= (gearman_worker_cb *)cb_arg;
53 
54   PUSHMARK(SP);
55   XPUSHs(sv_2mortal(_bless("Gearman::XS::Job", job)));
56   if (worker_cb->cb_arg != NULL)
57   {
58     XPUSHs(sv_2mortal(newSVpv(worker_cb->cb_arg, strlen(worker_cb->cb_arg))));
59   }
60   PUTBACK;
61 
62   count= call_sv(worker_cb->func, G_EVAL|G_SCALAR);
63 
64   SPAGAIN;
65 
66   if (SvTRUE(ERRSV))
67   {
68     fprintf(stderr, "Job: '%s' died with: %s",
69             gearman_job_function_name(job), SvPV_nolen(ERRSV));
70     *ret_ptr= GEARMAN_WORK_FAIL;
71     (void)POPs;
72   }
73   else
74   {
75     if (count != 1)
76       croak("Invalid number of return values.\n");
77 
78     result_sv= POPs;
79     if (SvOK(result_sv))
80     {
81       result= _get_string(result_sv, result_size);
82     }
83 
84     *ret_ptr= GEARMAN_SUCCESS;
85   }
86 
87   PUTBACK;
88   FREETMPS;
89   LEAVE;
90 
91   return result;
92 }
93 
_perl_log_fn_callback(const char * line,gearman_verbose_t verbose,void * fn)94 static void _perl_log_fn_callback( const char *line,
95                             gearman_verbose_t verbose,
96                             void *fn)
97 {
98   dSP;
99 
100   ENTER;
101   SAVETMPS;
102 
103   PUSHMARK(SP);
104   XPUSHs(sv_2mortal(newSVpv(line, strlen(line))));
105   XPUSHs(sv_2mortal(newSViv(verbose)));
106   PUTBACK;
107 
108   call_sv(fn, G_VOID);
109 
110   FREETMPS;
111   LEAVE;
112 }
113 
114 MODULE = Gearman::XS::Worker    PACKAGE = Gearman::XS::Worker
115 
116 PROTOTYPES: ENABLE
117 
118 SV*
119 Gearman::XS::Worker::new()
120   CODE:
121     PERL_UNUSED_VAR(CLASS);
122     RETVAL = _create_worker();
123   OUTPUT:
124     RETVAL
125 
126 gearman_return_t
127 add_server(self, ...)
128     gearman_xs_worker *self
129   PREINIT:
130     char *host= NULL;
131     in_port_t port= 0;
132   CODE:
133     if( (items > 1) && SvCUR(ST(1)) )
134       host= SvPV_nolen(ST(1));
135     if ( items > 2)
136       port= (in_port_t)SvIV(ST(2));
137 
138     RETVAL= gearman_worker_add_server(self, host, port);
139   OUTPUT:
140     RETVAL
141 
142 gearman_return_t
143 add_servers(self, servers)
144     gearman_xs_worker *self
145     const char *servers
146   CODE:
147     RETVAL= gearman_worker_add_servers(self, servers);
148   OUTPUT:
149     RETVAL
150 
151 void
152 remove_servers(self)
153     gearman_xs_worker *self
154   CODE:
155     gearman_worker_remove_servers(self);
156 
157 gearman_return_t
158 echo(self, workload)
159     gearman_xs_worker *self
160     SV * workload
161   PREINIT:
162     const char *w;
163     size_t w_size;
164   CODE:
165     w= SvPV(workload, w_size);
166     RETVAL= gearman_worker_echo(self, w, w_size);
167   OUTPUT:
168     RETVAL
169 
170 gearman_return_t
171 register(self, function_name, ...)
172     gearman_xs_worker *self
173     const char *function_name
174   PREINIT:
175     uint32_t timeout= 0;
176   CODE:
177     if( items > 2 )
178       timeout= (uint32_t)SvIV(ST(2));
179     RETVAL= gearman_worker_register(self, function_name, timeout);
180   OUTPUT:
181     RETVAL
182 
183 gearman_return_t
184 unregister(self, function_name)
185     gearman_xs_worker *self
186     const char *function_name
187   CODE:
188     RETVAL= gearman_worker_unregister(self, function_name);
189   OUTPUT:
190     RETVAL
191 
192 gearman_return_t
193 unregister_all(self)
194     gearman_xs_worker *self
195   CODE:
196     RETVAL= gearman_worker_unregister_all(self);
197   OUTPUT:
198     RETVAL
199 
200 gearman_return_t
201 add_function(self, function_name, timeout, worker_fn, context)
202     gearman_xs_worker *self
203     const char *function_name
204     uint32_t timeout
205     SV * worker_fn
206     const char *context
207   INIT:
208     gearman_worker_cb *worker_cb;
209   CODE:
210     Newxz(worker_cb, 1, gearman_worker_cb);
211     worker_cb->func= newSVsv(worker_fn);
212     worker_cb->cb_arg= context;
213     RETVAL= gearman_worker_add_function(self, function_name, timeout,
214                                         _perl_worker_function_callback,
215                                         (void *)worker_cb );
216   OUTPUT:
217     RETVAL
218 
219 gearman_return_t
220 work(self)
221     gearman_xs_worker *self
222   CODE:
223     RETVAL= gearman_worker_work(self);
224   OUTPUT:
225     RETVAL
226 
227 const char *
228 error(self)
229     gearman_xs_worker *self
230   CODE:
231     RETVAL= gearman_worker_error(self);
232   OUTPUT:
233     RETVAL
234 
235 gearman_worker_options_t
236 options(self)
237     gearman_xs_worker *self
238   CODE:
239     RETVAL= gearman_worker_options(self);
240   OUTPUT:
241     RETVAL
242 
243 void
244 set_options(self, options)
245     gearman_xs_worker *self
246     gearman_worker_options_t options
247   CODE:
248     gearman_worker_set_options(self, options);
249 
250 void
251 add_options(self, options)
252     gearman_xs_worker *self
253     gearman_worker_options_t options
254   CODE:
255     gearman_worker_add_options(self, options);
256 
257 void
258 remove_options(self, options)
259     gearman_xs_worker *self
260     gearman_worker_options_t options
261   CODE:
262     gearman_worker_remove_options(self, options);
263 
264 void
265 grab_job(self)
266     gearman_xs_worker *self
267   PREINIT:
268     gearman_return_t ret;
269   PPCODE:
270     (void)gearman_worker_grab_job(self, &(self->work_job), &ret);
271     XPUSHs(sv_2mortal(newSViv(ret)));
272     if (ret == GEARMAN_SUCCESS)
273       XPUSHs(sv_2mortal(_bless("Gearman::XS::Job", &(self->work_job))));
274     else
275       XPUSHs(&PL_sv_undef);
276 
277 int
278 timeout(self)
279     gearman_xs_worker *self
280   CODE:
281     RETVAL= gearman_worker_timeout(self);
282   OUTPUT:
283     RETVAL
284 
285 void
286 set_timeout(self, timeout)
287     gearman_xs_worker *self
288     int timeout
289   CODE:
290     gearman_worker_set_timeout(self, timeout);
291 
292 gearman_return_t
293 wait(self)
294     gearman_xs_worker *self
295   CODE:
296     RETVAL= gearman_worker_wait(self);
297   OUTPUT:
298     RETVAL
299 
300 void
301 set_log_fn(self, fn, verbose)
302     gearman_xs_worker *self
303     SV * fn
304     gearman_verbose_t verbose
305   CODE:
306     gearman_worker_set_log_fn(self, _perl_log_fn_callback, newSVsv(fn), verbose);
307 
308 void
309 function_exists(self, function_name)
310     gearman_xs_worker *self
311     const char *function_name
312   PPCODE:
313     if (gearman_worker_function_exist(self, function_name, strlen(function_name)))
314       XSRETURN_YES;
315     else
316       XSRETURN_NO;
317 
318 void
319 DESTROY(self)
320     gearman_xs_worker *self
321   CODE:
322     gearman_worker_free(self);
323