1 /* Gearman Perl front end
2 * Copyright (C) 2009-2010 Dennis Schoen
3 * All rights reserved.
4 *
5 * This library is free software; you can redistribute it and/or modify
6 * it under the same terms as Perl itself, either Perl version 5.8.9 or,
7 * at your option, any later version of Perl 5 you may have available.
8 */
9
10 #include "gearman_xs.h"
11
12 typedef struct gearman_worker_st gearman_xs_worker;
13
14 /* worker cb_arg to pass our actual perl function */
15 typedef struct
16 {
17 SV * func;
18 const char *cb_arg;
19 } gearman_worker_cb;
20
_create_worker()21 static SV* _create_worker() {
22 gearman_worker_st *self;
23
24 self= gearman_worker_create(NULL);
25 if (self == NULL) {
26 Perl_croak(aTHX_ "gearman_worker_create:NULL\n");
27 }
28
29 gearman_worker_set_workload_free_fn(self, _perl_free, NULL);
30 gearman_worker_set_workload_malloc_fn(self, _perl_malloc, NULL);
31
32 return _bless("Gearman::XS::Worker", self);
33 }
34
35 /* wrapper function to call our actual perl function,
36 passed in through cb_arg */
_perl_worker_function_callback(gearman_job_st * job,void * cb_arg,size_t * result_size,gearman_return_t * ret_ptr)37 static void *_perl_worker_function_callback(gearman_job_st *job,
38 void *cb_arg,
39 size_t *result_size,
40 gearman_return_t *ret_ptr)
41 {
42 gearman_worker_cb *worker_cb;
43 int count;
44 void *result= NULL;
45 SV * result_sv;
46
47 dSP;
48
49 ENTER;
50 SAVETMPS;
51
52 worker_cb= (gearman_worker_cb *)cb_arg;
53
54 PUSHMARK(SP);
55 XPUSHs(sv_2mortal(_bless("Gearman::XS::Job", job)));
56 if (worker_cb->cb_arg != NULL)
57 {
58 XPUSHs(sv_2mortal(newSVpv(worker_cb->cb_arg, strlen(worker_cb->cb_arg))));
59 }
60 PUTBACK;
61
62 count= call_sv(worker_cb->func, G_EVAL|G_SCALAR);
63
64 SPAGAIN;
65
66 if (SvTRUE(ERRSV))
67 {
68 fprintf(stderr, "Job: '%s' died with: %s",
69 gearman_job_function_name(job), SvPV_nolen(ERRSV));
70 *ret_ptr= GEARMAN_WORK_FAIL;
71 (void)POPs;
72 }
73 else
74 {
75 if (count != 1)
76 croak("Invalid number of return values.\n");
77
78 result_sv= POPs;
79 if (SvOK(result_sv))
80 {
81 result= _get_string(result_sv, result_size);
82 }
83
84 *ret_ptr= GEARMAN_SUCCESS;
85 }
86
87 PUTBACK;
88 FREETMPS;
89 LEAVE;
90
91 return result;
92 }
93
/* Log hook installed by set_log_fn(): forwards one log line and its verbosity
   level to the Perl callable passed through fn, calling it in void context
   with the arguments (line, verbose). */
static void _perl_log_fn_callback(const char *line,
                                  gearman_verbose_t verbose,
                                  void *fn)
{
  SV *line_sv;
  SV *level_sv;

  dSP;

  ENTER;
  SAVETMPS;

  PUSHMARK(SP);
  /* make room for both arguments at once, then push them in order */
  EXTEND(SP, 2);
  line_sv= sv_2mortal(newSVpv(line, strlen(line)));
  PUSHs(line_sv);
  level_sv= sv_2mortal(newSViv(verbose));
  PUSHs(level_sv);
  PUTBACK;

  call_sv(fn, G_VOID);

  FREETMPS;
  LEAVE;
}
113
114 MODULE = Gearman::XS::Worker PACKAGE = Gearman::XS::Worker
115
116 PROTOTYPES: ENABLE
117
SV*
Gearman::XS::Worker::new()
  CODE:
    /* Constructor: hands back the blessed worker built by _create_worker().
       The CLASS variable is not needed here; mark it used to keep the
       compiler quiet. */
    RETVAL = _create_worker();
    PERL_UNUSED_VAR(CLASS);
  OUTPUT:
    RETVAL
125
gearman_return_t
add_server(self, ...)
    gearman_xs_worker *self
  PREINIT:
    const char *host= NULL;
    in_port_t port= 0;
    STRLEN host_len= 0;
  CODE:
    /* Usage: add_server([host [, port]]). A missing, undefined or empty
       host is passed to libgearman as NULL and a missing port as 0.
       The host is read with SvPV only after an SvOK check, because taking
       SvCUR of an argument that holds no string (undef, a plain integer)
       is not valid. */
    if (items > 1 && SvOK(ST(1)))
    {
      const char *candidate= SvPV(ST(1), host_len);
      if (host_len > 0)
        host= candidate;
    }
    if (items > 2)
      port= (in_port_t)SvIV(ST(2));
    RETVAL= gearman_worker_add_server(self, host, port);
  OUTPUT:
    RETVAL
141
gearman_return_t
add_servers(gearman_xs_worker *self, const char *servers)
  CODE:
    /* The servers string is handed to libgearman unchanged. */
    RETVAL= gearman_worker_add_servers(self, servers);
  OUTPUT:
    RETVAL
150
void
remove_servers(gearman_xs_worker *self)
  CODE:
    /* Thin wrapper around gearman_worker_remove_servers(); returns nothing. */
    gearman_worker_remove_servers(self);
156
gearman_return_t
echo(gearman_xs_worker *self, SV *workload)
  PREINIT:
    const char *payload;
    size_t payload_len;
  CODE:
    /* Send the workload's string value, with its byte length, through
       gearman_worker_echo() and return that call's status. */
    payload= SvPV(workload, payload_len);
    RETVAL= gearman_worker_echo(self, payload, payload_len);
  OUTPUT:
    RETVAL
169
gearman_return_t
register(self, function_name, ...)
    gearman_xs_worker *self
    const char *function_name
  PREINIT:
    uint32_t job_timeout;
  CODE:
    /* Usage: register(function_name [, timeout]); the timeout is 0 when
       the optional third argument is not supplied. */
    job_timeout= (items > 2) ? (uint32_t)SvIV(ST(2)) : 0;
    RETVAL= gearman_worker_register(self, function_name, job_timeout);
  OUTPUT:
    RETVAL
182
gearman_return_t
unregister(gearman_xs_worker *self, const char *function_name)
  CODE:
    /* Returns the status of gearman_worker_unregister() for the named
       function. */
    RETVAL= gearman_worker_unregister(self, function_name);
  OUTPUT:
    RETVAL
191
gearman_return_t
unregister_all(gearman_xs_worker *self)
  CODE:
    /* Returns the status of gearman_worker_unregister_all(). */
    RETVAL= gearman_worker_unregister_all(self);
  OUTPUT:
    RETVAL
199
gearman_return_t
add_function(self, function_name, timeout, worker_fn, context)
    gearman_xs_worker *self
    const char *function_name
    uint32_t timeout
    SV * worker_fn
    const char *context
  PREINIT:
    gearman_worker_cb *worker_cb;
  CODE:
    /* Register worker_fn as the handler for function_name. A callback
       record holding a private copy of the callable and of the context
       string is passed to libgearman as the opaque context for
       _perl_worker_function_callback.
       The context string is duplicated with savepv(): the pointer received
       here borrows the argument SV's buffer, which may be released or
       rewritten once this call returns, while the record is used for every
       later job.
       NOTE(review): the record is not released anywhere in this file;
       confirm whether that is intended for the lifetime of the worker. */
    Newxz(worker_cb, 1, gearman_worker_cb);
    worker_cb->func= newSVsv(worker_fn);
    worker_cb->cb_arg= (context != NULL) ? savepv(context) : NULL;
    RETVAL= gearman_worker_add_function(self, function_name, timeout,
                                        _perl_worker_function_callback,
                                        (void *)worker_cb );
  OUTPUT:
    RETVAL
218
gearman_return_t
work(gearman_xs_worker *self)
  CODE:
    /* Returns the status of gearman_worker_work(). */
    RETVAL= gearman_worker_work(self);
  OUTPUT:
    RETVAL
226
const char *
error(gearman_xs_worker *self)
  CODE:
    /* Returns the string reported by gearman_worker_error(). */
    RETVAL= gearman_worker_error(self);
  OUTPUT:
    RETVAL
234
gearman_worker_options_t
options(gearman_xs_worker *self)
  CODE:
    /* Returns the value reported by gearman_worker_options(). */
    RETVAL= gearman_worker_options(self);
  OUTPUT:
    RETVAL
242
void
set_options(gearman_xs_worker *self, gearman_worker_options_t options)
  CODE:
    /* Thin wrapper around gearman_worker_set_options(); returns nothing. */
    gearman_worker_set_options(self, options);
249
void
add_options(gearman_xs_worker *self, gearman_worker_options_t options)
  CODE:
    /* Thin wrapper around gearman_worker_add_options(); returns nothing. */
    gearman_worker_add_options(self, options);
256
void
remove_options(gearman_xs_worker *self, gearman_worker_options_t options)
  CODE:
    /* Thin wrapper around gearman_worker_remove_options(); returns
       nothing. */
    gearman_worker_remove_options(self, options);
263
void
grab_job(self)
    gearman_xs_worker *self
  PREINIT:
    gearman_return_t status;
  PPCODE:
    /* Returns the list (status, job): job is the worker's work_job slot
       blessed into Gearman::XS::Job when the status is GEARMAN_SUCCESS and
       undef otherwise. */
    (void)gearman_worker_grab_job(self, &(self->work_job), &status);
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(newSViv(status)));
    if (status != GEARMAN_SUCCESS)
      PUSHs(&PL_sv_undef);
    else
      PUSHs(sv_2mortal(_bless("Gearman::XS::Job", &(self->work_job))));
276
int
timeout(gearman_xs_worker *self)
  CODE:
    /* Returns the value reported by gearman_worker_timeout(). */
    RETVAL= gearman_worker_timeout(self);
  OUTPUT:
    RETVAL
284
void
set_timeout(gearman_xs_worker *self, int timeout)
  CODE:
    /* Thin wrapper around gearman_worker_set_timeout(); returns nothing. */
    gearman_worker_set_timeout(self, timeout);
291
gearman_return_t
wait(gearman_xs_worker *self)
  CODE:
    /* Returns the status of gearman_worker_wait(). */
    RETVAL= gearman_worker_wait(self);
  OUTPUT:
    RETVAL
299
void
set_log_fn(gearman_xs_worker *self, SV *fn, gearman_verbose_t verbose)
  PREINIT:
    SV *fn_copy;
  CODE:
    /* Install _perl_log_fn_callback as the worker's log hook; a new copy
       of fn is passed along as the hook's context so the callback can
       invoke it later. */
    fn_copy= newSVsv(fn);
    gearman_worker_set_log_fn(self, _perl_log_fn_callback, fn_copy, verbose);
307
void
function_exists(self, function_name)
    gearman_xs_worker *self
    const char *function_name
  PREINIT:
    size_t name_len;
  PPCODE:
    /* Returns a true value when gearman_worker_function_exist() reports
       the named function, and a false value otherwise. */
    name_len= strlen(function_name);
    if (!gearman_worker_function_exist(self, function_name, name_len))
      XSRETURN_NO;
    XSRETURN_YES;
317
void
DESTROY(gearman_xs_worker *self)
  CODE:
    /* Destructor: hands the underlying worker to gearman_worker_free(). */
    gearman_worker_free(self);
323