1 /* Gearman Perl front end
2 * Copyright (C) 2009-2010 Dennis Schoen
3 * All rights reserved.
4 *
5 * This library is free software; you can redistribute it and/or modify
6 * it under the same terms as Perl itself, either Perl version 5.8.9 or,
7 * at your option, any later version of Perl 5 you may have available.
8 */
9
10 #include "gearman_xs.h"
11
12 typedef struct gearman_xs_client {
13 gearman_client_st *client;
14 /* used for keeping track of task interface callbacks */
15 SV * created_fn;
16 SV * data_fn;
17 SV * complete_fn;
18 SV * fail_fn;
19 SV * status_fn;
20 SV * warning_fn;
21 } gearman_xs_client;
22
23 /* client task context */
24 typedef struct
25 {
26 gearman_client_st *client;
27 const char *workload;
28 } gearman_task_context_st;
29
30
31 /* context free function to free() the workload */
_perl_task_free(gearman_task_st * task,void * context)32 static void _perl_task_free(gearman_task_st *task, void *context)
33 {
34 PERL_UNUSED_VAR(task);
35 gearman_task_context_st *context_st= (gearman_task_context_st *)context;
36 Safefree(context_st->workload);
37 Safefree(context_st);
38 }
39
_perl_task_callback(SV * fn,gearman_task_st * task)40 static gearman_return_t _perl_task_callback(SV * fn, gearman_task_st *task)
41 {
42 int count;
43 gearman_return_t ret;
44
45 dSP;
46
47 ENTER;
48 SAVETMPS;
49
50 PUSHMARK(SP);
51 XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", task)));
52 PUTBACK;
53
54 count= call_sv(fn, G_SCALAR);
55 if (count != 1)
56 croak("Invalid number of return values.\n");
57
58 SPAGAIN;
59
60 ret= POPi;
61
62 PUTBACK;
63 FREETMPS;
64 LEAVE;
65
66 return ret;
67 }
68
_perl_task_complete_fn(gearman_task_st * task)69 static gearman_return_t _perl_task_complete_fn(gearman_task_st *task)
70 {
71 gearman_task_context_st *context_st;
72 gearman_xs_client *self;
73
74 context_st= (gearman_task_context_st *)gearman_task_context(task);
75 self= (gearman_xs_client *)gearman_client_context(context_st->client);
76
77 return _perl_task_callback(self->complete_fn, task);
78 }
79
_perl_task_fail_fn(gearman_task_st * task)80 static gearman_return_t _perl_task_fail_fn(gearman_task_st *task)
81 {
82 gearman_task_context_st *context_st;
83 gearman_xs_client *self;
84
85 context_st= (gearman_task_context_st *)gearman_task_context(task);
86 self= (gearman_xs_client *)gearman_client_context(context_st->client);
87
88 return _perl_task_callback(self->fail_fn, task);
89 }
90
_perl_task_status_fn(gearman_task_st * task)91 static gearman_return_t _perl_task_status_fn(gearman_task_st *task)
92 {
93 gearman_task_context_st *context_st;
94 gearman_xs_client *self;
95
96 context_st= (gearman_task_context_st *)gearman_task_context(task);
97 self= (gearman_xs_client *)gearman_client_context(context_st->client);
98
99 return _perl_task_callback(self->status_fn, task);
100 }
101
_perl_task_created_fn(gearman_task_st * task)102 static gearman_return_t _perl_task_created_fn(gearman_task_st *task)
103 {
104 gearman_task_context_st *context_st;
105 gearman_xs_client *self;
106
107 context_st= (gearman_task_context_st *)gearman_task_context(task);
108 self= (gearman_xs_client *)gearman_client_context(context_st->client);
109
110 return _perl_task_callback(self->created_fn, task);
111 }
112
_perl_task_data_fn(gearman_task_st * task)113 static gearman_return_t _perl_task_data_fn(gearman_task_st *task)
114 {
115 gearman_task_context_st *context_st;
116 gearman_xs_client *self;
117
118 context_st= (gearman_task_context_st *)gearman_task_context(task);
119 self= (gearman_xs_client *)gearman_client_context(context_st->client);
120
121 return _perl_task_callback(self->data_fn, task);
122 }
123
_perl_task_warning_fn(gearman_task_st * task)124 static gearman_return_t _perl_task_warning_fn(gearman_task_st *task)
125 {
126 gearman_task_context_st *context_st;
127 gearman_xs_client *self;
128
129 context_st= (gearman_task_context_st *)gearman_task_context(task);
130 self= (gearman_xs_client *)gearman_client_context(context_st->client);
131
132 return _perl_task_callback(self->warning_fn, task);
133 }
134
_create_client()135 static SV* _create_client() {
136 gearman_xs_client *self;
137
138 Newxz(self, 1, gearman_xs_client);
139 self->client= gearman_client_create(NULL);
140 if (self->client == NULL) {
141 Perl_croak(aTHX_ "gearman_client_create:NULL\n");
142 }
143
144 gearman_client_set_context(self->client, self);
145 gearman_client_add_options(self->client, GEARMAN_CLIENT_FREE_TASKS);
146 gearman_client_set_workload_malloc_fn(self->client, _perl_malloc, NULL);
147 gearman_client_set_workload_free_fn(self->client, _perl_free, NULL);
148 gearman_client_set_task_context_free_fn(self->client, _perl_task_free);
149
150 return _bless("Gearman::XS::Client", self);
151 }
152
153 MODULE = Gearman::XS::Client PACKAGE = Gearman::XS::Client
154
155 PROTOTYPES: ENABLE
156
# Constructor: builds a fresh client object through _create_client.
SV*
Gearman::XS::Client::new()
  PREINIT:
    SV *object;
  CODE:
    PERL_UNUSED_VAR(CLASS);
    object= _create_client();
    RETVAL = object;
  OUTPUT:
    RETVAL
164
# Register one job server with the client.  Host and port are optional:
# a missing, undef or empty host is passed to libgearman as NULL and a
# missing port as 0.  Returns the libgearman return code.
gearman_return_t
add_server(self, ...)
    gearman_xs_client *self
  PREINIT:
    const char *host= NULL;
    in_port_t port= 0;
    STRLEN host_len;
  CODE:
    /* SvCUR is only meaningful on an SV that already carries a string
       body, so check definedness and let SvPV stringify the argument and
       report its length instead. */
    if ((items > 1) && SvOK(ST(1)))
    {
      const char *candidate= SvPV(ST(1), host_len);
      if (host_len > 0)
        host= candidate;
    }
    if (items > 2)
      port= (in_port_t)SvIV(ST(2));

    RETVAL= gearman_client_add_server(self->client, host, port);
  OUTPUT:
    RETVAL
180
# Register the job servers described by the string `servers` and return
# the libgearman return code.
gearman_return_t
add_servers(self, servers)
    gearman_xs_client *self
    const char *servers
  PREINIT:
    gearman_return_t rc;
  CODE:
    rc= gearman_client_add_servers(self->client, servers);
    RETVAL= rc;
  OUTPUT:
    RETVAL
189
# Drop every job server previously registered with the client.
void
remove_servers(self)
    gearman_xs_client *self
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_remove_servers(handle);
195
# Report the option flags currently set on the client.
gearman_client_options_t
options(self)
    gearman_xs_client *self
  PREINIT:
    gearman_client_options_t current;
  CODE:
    current= gearman_client_options(self->client);
    RETVAL= current;
  OUTPUT:
    RETVAL
203
# Pass `options` to gearman_client_set_options for this client.
void
set_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_set_options(handle, options);
210
# Pass `options` to gearman_client_add_options for this client.
void
add_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_add_options(handle, options);
217
# Pass `options` to gearman_client_remove_options for this client.
void
remove_options(self, options)
    gearman_xs_client *self
    gearman_client_options_t options
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_remove_options(handle, options);
224
# Send the bytes of `workload` through gearman_client_echo and return the
# libgearman return code.
gearman_return_t
echo(self, workload)
    gearman_xs_client *self
    SV * workload
  PREINIT:
    size_t payload_len;
    const char *payload;
    gearman_return_t rc;
  CODE:
    payload= SvPV(workload, payload_len);
    rc= gearman_client_echo(self->client, payload, payload_len);
    RETVAL= rc;
  OUTPUT:
    RETVAL
237
# Run a job through gearman_client_do and return the pair
# (return code, result).  The result is undef unless the return code is
# GEARMAN_SUCCESS, GEARMAN_WORK_DATA or GEARMAN_WORK_WARNING.  An optional
# fourth argument supplies the unique id.
void
do(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t rc;
    char *unique_id= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
    int has_reply;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do(self->client, function_name, unique_id, payload,
                             payload_len, &reply_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    has_reply= (rc == GEARMAN_WORK_DATA) || (rc == GEARMAN_SUCCESS)
               || (rc == GEARMAN_WORK_WARNING);
    if (!has_reply)
      XPUSHs(&PL_sv_undef);
    else
    {
      /* the new SV holds its own copy, so the reply buffer is released */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
264
# Run a job through gearman_client_do_high and return the pair
# (return code, result).  The result is undef unless the return code is
# GEARMAN_SUCCESS, GEARMAN_WORK_DATA or GEARMAN_WORK_WARNING.  An optional
# fourth argument supplies the unique id.
void
do_high(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t rc;
    char *unique_id= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
    int has_reply;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do_high(self->client, function_name, unique_id,
                                  payload, payload_len, &reply_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    has_reply= (rc == GEARMAN_WORK_DATA) || (rc == GEARMAN_SUCCESS)
               || (rc == GEARMAN_WORK_WARNING);
    if (!has_reply)
      XPUSHs(&PL_sv_undef);
    else
    {
      /* the new SV holds its own copy, so the reply buffer is released */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
291
# Run a job through gearman_client_do_low and return the pair
# (return code, result).  The result is undef unless the return code is
# GEARMAN_SUCCESS, GEARMAN_WORK_DATA or GEARMAN_WORK_WARNING.  An optional
# fourth argument supplies the unique id.
void
do_low(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_return_t rc;
    char *unique_id= NULL;
    const char *payload;
    size_t payload_len;
    void *reply;
    size_t reply_len;
    int has_reply;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= SvPV(workload, payload_len);
    reply= gearman_client_do_low(self->client, function_name, unique_id,
                                 payload, payload_len, &reply_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    has_reply= (rc == GEARMAN_WORK_DATA) || (rc == GEARMAN_SUCCESS)
               || (rc == GEARMAN_WORK_WARNING);
    if (!has_reply)
      XPUSHs(&PL_sv_undef);
    else
    {
      /* the new SV holds its own copy, so the reply buffer is released */
      XPUSHs(sv_2mortal(newSVpvn(reply, reply_len)));
      Safefree(reply);
    }
318
# Submit a job through gearman_client_do_background and return the pair
# (return code, job handle).  The job handle is undef unless the return
# code is GEARMAN_SUCCESS.  An optional fourth argument supplies the
# unique id.
void
do_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    w= SvPV(workload, w_size);
    /* allocate only after the string conversions so that a croak raised by
       them cannot leak the scratch buffer */
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    ret= gearman_client_do_background(self->client, function_name, unique, w,
                                      w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the handle, so the scratch buffer must be released on
       the success path as well as on failure */
    Safefree(job_handle);
345
# Submit a job through gearman_client_do_high_background and return the
# pair (return code, job handle).  The job handle is undef unless the
# return code is GEARMAN_SUCCESS.  An optional fourth argument supplies
# the unique id.
void
do_high_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    w= SvPV(workload, w_size);
    /* allocate only after the string conversions so that a croak raised by
       them cannot leak the scratch buffer */
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    ret= gearman_client_do_high_background(self->client, function_name, unique,
                                           w, w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the handle, so the scratch buffer must be released on
       the success path as well as on failure */
    Safefree(job_handle);
372
# Submit a job through gearman_client_do_low_background and return the
# pair (return code, job handle).  The job handle is undef unless the
# return code is GEARMAN_SUCCESS.  An optional fourth argument supplies
# the unique id.
void
do_low_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    char *job_handle;
    char *unique= NULL;
    const char *w;
    size_t w_size;
    gearman_return_t ret;
  PPCODE:
    if (items > 3)
      unique= SvPV_nolen(ST(3));
    w= SvPV(workload, w_size);
    /* allocate only after the string conversions so that a croak raised by
       them cannot leak the scratch buffer */
    Newxz(job_handle, GEARMAN_JOB_HANDLE_SIZE, char);
    ret= gearman_client_do_low_background(self->client, function_name, unique,
                                          w, w_size, job_handle);
    XPUSHs(sv_2mortal(newSViv(ret)));
    if (ret != GEARMAN_SUCCESS)
      XPUSHs(&PL_sv_undef);
    else
      XPUSHs(sv_2mortal(newSVpv(job_handle, 0)));
    /* newSVpv copies the handle, so the scratch buffer must be released on
       the success path as well as on failure */
    Safefree(job_handle);
399
# Queue a task through gearman_client_add_task and return the pair
# (return code, Gearman::XS::Task object).  An optional fourth argument
# supplies the unique id.
void
add_task(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task(self->client, NULL, ctx, function_name,
                                      unique_id, payload, payload_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
423
# Queue a task through gearman_client_add_task_high and return the pair
# (return code, Gearman::XS::Task object).  An optional fourth argument
# supplies the unique id.
void
add_task_high(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task_high(self->client, NULL, ctx,
                                           function_name, unique_id, payload,
                                           payload_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
448
# Queue a task through gearman_client_add_task_low and return the pair
# (return code, Gearman::XS::Task object).  An optional fourth argument
# supplies the unique id.
void
add_task_low(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task_low(self->client, NULL, ctx,
                                          function_name, unique_id, payload,
                                          payload_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
473
# Queue a task through gearman_client_add_task_background and return the
# pair (return code, Gearman::XS::Task object).  An optional fourth
# argument supplies the unique id.
void
add_task_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task_background(self->client, NULL, ctx,
                                                 function_name, unique_id,
                                                 payload, payload_len, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
499
# Queue a task through gearman_client_add_task_high_background and return
# the pair (return code, Gearman::XS::Task object).  An optional fourth
# argument supplies the unique id.
void
add_task_high_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task_high_background(self->client, NULL, ctx,
                                                      function_name, unique_id,
                                                      payload, payload_len,
                                                      &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
525
# Queue a task through gearman_client_add_task_low_background and return
# the pair (return code, Gearman::XS::Task object).  An optional fourth
# argument supplies the unique id.
void
add_task_low_background(self, function_name, workload, ...)
    gearman_xs_client *self
    const char *function_name
    SV * workload
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
    char *unique_id= NULL;
    const void *payload;
    size_t payload_len;
  PPCODE:
    if (items > 3)
      unique_id= SvPV_nolen(ST(3));
    payload= _get_string(workload, &payload_len);
    /* the context keeps the workload pointer so that _perl_task_free can
       release it together with the context */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->workload= payload;
    ctx->client= self->client;
    new_task= gearman_client_add_task_low_background(self->client, NULL, ctx,
                                                     function_name, unique_id,
                                                     payload, payload_len,
                                                     &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
551
# Call gearman_client_run_tasks on the client and return its return code.
gearman_return_t
run_tasks(self)
    gearman_xs_client *self
  PREINIT:
    gearman_return_t rc;
  CODE:
    rc= gearman_client_run_tasks(self->client);
    RETVAL= rc;
  OUTPUT:
    RETVAL
559
# Store a private copy of the Perl callback `fn` as the "created" handler
# and register the C trampoline with libgearman.  A copy stored by an
# earlier call is released.
void
set_created_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->created_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->created_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_created_fn(self->client, _perl_task_created_fn);
567
# Store a private copy of the Perl callback `fn` as the "data" handler and
# register the C trampoline with libgearman.  A copy stored by an earlier
# call is released.
void
set_data_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->data_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->data_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_data_fn(self->client, _perl_task_data_fn);
575
# Store a private copy of the Perl callback `fn` as the "complete" handler
# and register the C trampoline with libgearman.  A copy stored by an
# earlier call is released.
void
set_complete_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->complete_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->complete_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_complete_fn(self->client, _perl_task_complete_fn);
583
# Store a private copy of the Perl callback `fn` as the "fail" handler and
# register the C trampoline with libgearman.  A copy stored by an earlier
# call is released.
void
set_fail_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->fail_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->fail_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_fail_fn(self->client, _perl_task_fail_fn);
591
# Store a private copy of the Perl callback `fn` as the "status" handler
# and register the C trampoline with libgearman.  A copy stored by an
# earlier call is released.
void
set_status_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->status_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->status_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_status_fn(self->client, _perl_task_status_fn);
599
# Store a private copy of the Perl callback `fn` as the "warning" handler
# and register the C trampoline with libgearman.  A copy stored by an
# earlier call is released.
void
set_warning_fn(self, fn)
    gearman_xs_client *self
    SV * fn
  PREINIT:
    SV *previous;
  CODE:
    previous= self->warning_fn;
    /* copy first: if the copy croaks, the old callback stays in place */
    self->warning_fn= newSVsv(fn);
    if (previous)
      sv_free(previous);  /* would otherwise leak on repeated calls */
    gearman_client_set_warning_fn(self->client, _perl_task_warning_fn);
607
# Return the string reported by gearman_client_error for this client.
const char *
error(self)
    gearman_xs_client *self
  PREINIT:
    const char *message;
  CODE:
    message= gearman_client_error(self->client);
    RETVAL= message;
  OUTPUT:
    RETVAL
615
# Return the pair (numerator, denominator) obtained from
# gearman_client_do_status.
void
do_status(self)
    gearman_xs_client *self
  PREINIT:
    uint32_t done;
    uint32_t total;
    SV *done_sv;
    SV *total_sv;
  PPCODE:
    gearman_client_do_status(self->client, &done, &total);
    done_sv= sv_2mortal(newSVuv(done));
    XPUSHs(done_sv);
    total_sv= sv_2mortal(newSVuv(total));
    XPUSHs(total_sv);
626
# Query a job through gearman_client_job_status and return the list
# (return code, is_known, is_running, numerator, denominator).
# NOTE(review): job_handle defaults to NULL and is passed to libgearman
# unchanged; confirm the library accepts a NULL handle.
void
job_status(self, job_handle= NULL)
    gearman_xs_client *self
    const char *job_handle
  PREINIT:
    gearman_return_t ret;
    bool is_known= 0;
    bool is_running= 0;
    uint32_t numerator= 0;
    uint32_t denominator= 0;
  PPCODE:
    /* The outputs are pushed whatever the return code is, so they start
       out zeroed: if the call fails without writing them, Perl sees zeros
       rather than indeterminate stack contents. */
    ret= gearman_client_job_status(self->client, job_handle, &is_known, &is_running,
                                    &numerator, &denominator);
    XPUSHs(sv_2mortal(newSViv(ret)));
    XPUSHs(sv_2mortal(newSViv(is_known)));
    XPUSHs(sv_2mortal(newSViv(is_running)));
    XPUSHs(sv_2mortal(newSVuv(numerator)));
    XPUSHs(sv_2mortal(newSVuv(denominator)));
645
# Return the value reported by gearman_client_timeout for this client.
int
timeout(self)
    gearman_xs_client *self
  PREINIT:
    int current;
  CODE:
    current= gearman_client_timeout(self->client);
    RETVAL= current;
  OUTPUT:
    RETVAL
653
# Pass `timeout` to gearman_client_set_timeout for this client.
void
set_timeout(self, timeout)
    gearman_xs_client *self
    int timeout
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_set_timeout(handle, timeout);
660
# Call gearman_client_wait on the client and return its return code.
gearman_return_t
wait(self)
    gearman_xs_client *self
  PREINIT:
    gearman_return_t rc;
  CODE:
    rc= gearman_client_wait(self->client);
    RETVAL= rc;
  OUTPUT:
    RETVAL
668
# Queue a status task for `job_handle` through
# gearman_client_add_task_status and return the pair
# (return code, Gearman::XS::Task object).
void
add_task_status(self, job_handle)
    gearman_xs_client *self
    const char *job_handle
  PREINIT:
    gearman_task_context_st *ctx;
    gearman_task_st *new_task;
    gearman_return_t rc;
  PPCODE:
    /* zeroed allocation: there is no workload, so that slot stays NULL */
    Newxz(ctx, 1, gearman_task_context_st);
    ctx->client= self->client;
    new_task= gearman_client_add_task_status(self->client, NULL, ctx,
                                             job_handle, &rc);
    XPUSHs(sv_2mortal(newSViv(rc)));
    XPUSHs(sv_2mortal(_bless("Gearman::XS::Task", new_task)));
683
# Call gearman_client_clear_fn on the client.  The Perl callback copies
# held in the object are not touched by this method.
void
clear_fn(self)
    gearman_xs_client *self
  PREINIT:
    gearman_client_st *handle;
  CODE:
    handle= self->client;
    gearman_client_clear_fn(handle);
689
# Destructor: frees the libgearman client, then every stored Perl callback
# copy, then the wrapper structure itself.
void
DESTROY(self)
    gearman_xs_client *self
  PREINIT:
    SV *stored[6];
    int slot;
  CODE:
    gearman_client_free(self->client);

    /* same release order as the fields are declared in */
    stored[0]= self->created_fn;
    stored[1]= self->data_fn;
    stored[2]= self->complete_fn;
    stored[3]= self->fail_fn;
    stored[4]= self->status_fn;
    stored[5]= self->warning_fn;
    for (slot= 0; slot < 6; slot++)
    {
      if (stored[slot])
        sv_free(stored[slot]);
    }

    Safefree(self);
708