1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ppport.h"
6 #include "hiredis.h"
7 #include "async.h"
8
9 #include <string.h>
10 #include <stdlib.h>
11 #include <time.h>
12 #include <stdio.h>
13 #include <poll.h>
14 #include <sys/socket.h>
15
/* Size in bytes of the buffer allocated for redis_fast_t.error. */
#define MAX_ERROR_SIZE 256

/* Result codes returned by wait_for_event() and _wait_all_responses(). */
#define WAIT_FOR_EVENT_OK 0
#define WAIT_FOR_EVENT_READ_TIMEOUT 1
#define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
#define WAIT_FOR_EVENT_EXCEPTION 3

/* Bits stored in redis_fast_t.flags; run_cmd refuses to reconnect while either is set. */
#define FLAG_INSIDE_TRANSACTION 0x01
#define FLAG_INSIDE_WATCH 0x02
25
/*
 * Print a printf-style debug line to stderr, prefixed with file, line and
 * function name, when the `self` variable in scope at the call site has a
 * non-zero `debug` field.  At least one argument after `fmt` is required.
 *
 * The body is wrapped in do { } while (0) so that the expansion is exactly
 * one statement: `if (x) DEBUG_MSG(...); else ...` now parses as written
 * instead of the macro's inner `if` interfering with the caller's `else`.
 * Call sites must end the invocation with a semicolon.
 */
#define DEBUG_MSG(fmt, ...) \
    do { \
        if (self->debug) { \
            fprintf(stderr, "[%s:%d:%s]: ", __FILE__, __LINE__, __func__); \
            fprintf(stderr, fmt, __VA_ARGS__); \
            fprintf(stderr, "\n"); \
        } \
    } while (0)
32
/*
 * True when the `len`-byte buffer `cmd` is byte-for-byte equal to the string
 * literal `expected` (terminating NUL excluded, case-sensitive).
 */
#define EQUALS_COMMAND(len, cmd, expected) \
    ((len) == sizeof(expected) - 1 && \
     memcmp((cmd), (expected), sizeof(expected) - 1) == 0)
34
/* Per-object state behind a Redis::Fast Perl object. */
typedef struct redis_fast_s {
    redisAsyncContext* ac;      /* hiredis async context; set to NULL by the connect/disconnect callbacks */
    char* hostname;             /* malloc'd TCP host name, or NULL */
    int port;                   /* TCP port used with hostname */
    char* path;                 /* malloc'd unix socket path; used instead of hostname/port when set */
    char* error;                /* malloc'd buffer of MAX_ERROR_SIZE bytes for error messages */
    double reconnect;           /* seconds to keep retrying a connect; 0 disables reconnecting */
    int every;                  /* pause between connect attempts, passed to usleep() */
    int debug;                  /* non-zero enables DEBUG_MSG output */
    double cnx_timeout;         /* connect timeout in seconds; 0 waits without a timeout */
    double read_timeout;        /* read timeout in seconds handed to wait_for_event(); negative disables it */
    double write_timeout;       /* write timeout in seconds handed to wait_for_event(); negative disables it */
    int current_database;       /* value stored by __set_current_database; not otherwise used in this part of the file */
    int need_reconnect;         /* set when a reply callback decides the connection must be re-established */
    int is_connected;           /* set by the connect callback; also forced to 1 after a failed connect */
    SV* on_connect;             /* Perl callable invoked after a successful connect, or NULL */
    SV* on_build_sock;          /* Perl callable invoked before each connect attempt, or NULL */
    SV* data;                   /* SV attached through __set_data, or NULL */
    SV* reconnect_on_error;     /* Perl callable consulted when a reply carries an error, or NULL */
    double next_reconnect_on_error_at; /* epoch seconds before which reconnect_on_error is skipped; negative = always call */
    int proccess_sub_count;     /* incremented for every pub/sub message delivered */
    int is_subscriber;          /* subscription count taken from the last (un)subscribe reply */
    int expected_subs;          /* decremented for every (un)subscribe acknowledgement */
    pid_t pid;                  /* pid recorded at connect time; compared against getpid() before each command */
    int flags;                  /* FLAG_INSIDE_* bits */
} redis_fast_t, *Redis__Fast;
61
/* Decoded reply: a value and/or an error message; either member may be NULL. */
typedef struct redis_fast_reply_s {
    SV* result;     /* decoded value, or NULL */
    SV* error;      /* error text, or NULL when no error was seen */
} redis_fast_reply_t;
66
/* Signature of a reply decoder that can replace Redis__Fast_decode_reply for a command. */
typedef redis_fast_reply_t (*CUSTOM_DECODE)(Redis__Fast self, redisReply* reply, int collect_errors);
68
/* Private data handed to Redis__Fast_sync_reply_cb for a synchronous command. */
typedef struct redis_fast_sync_cb_s {
    redis_fast_reply_t ret;         /* filled in by the callback */
    int collect_errors;             /* forwarded to the decoder */
    CUSTOM_DECODE custom_decode;    /* optional decoder; NULL selects Redis__Fast_decode_reply */
    int on_flags;                   /* bits OR-ed into self->flags when a reply arrives */
    int off_flags;                  /* mask AND-ed into self->flags when a reply arrives */
} redis_fast_sync_cb_t;
76
/* Private data handed to Redis__Fast_async_reply_cb for a command issued with a Perl callback. */
typedef struct redis_fast_async_cb_s {
    SV* cb;                         /* Perl callback; one reference is held and dropped by the reply callback */
    int collect_errors;             /* forwarded to the decoder */
    CUSTOM_DECODE custom_decode;    /* optional decoder; NULL selects Redis__Fast_decode_reply */
    int on_flags;                   /* bits OR-ed into self->flags when a reply arrives */
    int off_flags;                  /* mask AND-ed into self->flags when a reply arrives */
    const void* command_name;       /* points at the caller's argv[0]; not copied. NOTE(review): confirm it outlives the reply */
    STRLEN command_length;          /* length in bytes of command_name */
} redis_fast_async_cb_t;
86
/* Private data handed to Redis__Fast_subscribe_cb. */
typedef struct redis_fast_subscribe_cb_s {
    Redis__Fast self;   /* owning connection object */
    SV* cb;             /* Perl callback; released when the callback data is destroyed */
} redis_fast_subscribe_cb_t;
91
92
/* Bits of redis_fast_event_t.flags: which directions hiredis asked us to watch. */
#define WAIT_FOR_READ 0x01
#define WAIT_FOR_WRITE 0x02
/* Event-hook state stored in ac->ev.data by Attach(). */
typedef struct redis_fast_event_s {
    int flags;          /* combination of WAIT_FOR_READ / WAIT_FOR_WRITE */
    Redis__Fast self;   /* owning connection object */
} redis_fast_event_t;
99
100
AddRead(void * privdata)101 static void AddRead(void *privdata) {
102 redis_fast_event_t *e = (redis_fast_event_t*)privdata;
103 Redis__Fast self = e->self;
104 e->flags |= WAIT_FOR_READ;
105 DEBUG_MSG("flags = %x", e->flags);
106 }
107
DelRead(void * privdata)108 static void DelRead(void *privdata) {
109 redis_fast_event_t *e = (redis_fast_event_t*)privdata;
110 Redis__Fast self = e->self;
111 e->flags &= ~WAIT_FOR_READ;
112 DEBUG_MSG("flags = %x", e->flags);
113 }
114
AddWrite(void * privdata)115 static void AddWrite(void *privdata) {
116 redis_fast_event_t *e = (redis_fast_event_t*)privdata;
117 Redis__Fast self = e->self;
118 e->flags |= WAIT_FOR_WRITE;
119 DEBUG_MSG("flags = %x", e->flags);
120 }
121
DelWrite(void * privdata)122 static void DelWrite(void *privdata) {
123 redis_fast_event_t *e = (redis_fast_event_t*)privdata;
124 Redis__Fast self = e->self;
125 e->flags &= ~WAIT_FOR_WRITE;
126 DEBUG_MSG("flags = %x", e->flags);
127 }
128
/* hiredis cleanup hook: release the redis_fast_event_t that Attach() allocated with malloc(). */
static void Cleanup(void *privdata) {
    free(privdata);
}
132
/*
 * Install this module's event hooks on a hiredis async context.
 *
 * ac->data must already point at the owning Redis__Fast object.  The hook
 * state is a malloc'd redis_fast_event_t that hiredis releases through the
 * Cleanup hook.
 *
 * Returns REDIS_OK on success, REDIS_ERR when hooks are already attached or
 * the hook state cannot be allocated.
 */
static int Attach(redisAsyncContext *ac) {
    Redis__Fast self = (Redis__Fast)ac->data;
    redis_fast_event_t *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redis_fast_event_t*)malloc(sizeof(*e));
    if (e == NULL)
        return REDIS_ERR;   /* previously dereferenced NULL on allocation failure */
    e->flags = 0;
    e->self = self;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = AddRead;
    ac->ev.delRead = DelRead;
    ac->ev.addWrite = AddWrite;
    ac->ev.delWrite = DelWrite;
    ac->ev.cleanup = Cleanup;
    ac->ev.data = e;

    return REDIS_OK;
}
156
/*
 * Poll the connection's socket once for the directions hiredis asked for and
 * let hiredis process whatever became ready.
 *
 * read_timeout / write_timeout are in seconds; a negative value means "no
 * timeout".  When both directions are watched, the smaller non-negative
 * timeout is used and decides which timeout code is reported.
 *
 * Returns WAIT_FOR_EVENT_OK, WAIT_FOR_EVENT_READ_TIMEOUT,
 * WAIT_FOR_EVENT_WRITE_TIMEOUT or WAIT_FOR_EVENT_EXCEPTION.  A poll()
 * interrupted by a signal is retried (with the full timeout) after giving
 * Perl a chance to run its deferred signal handlers.
 */
static int wait_for_event(Redis__Fast self, double read_timeout, double write_timeout) {
    redisContext *c;
    int fd;
    redis_fast_event_t *e;
    struct pollfd pollfd;
    int rc;
    int poll_errno;
    double timeout = -1;
    int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
    int ms;

    if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
    if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;

    c = &(self->ac->c);
    fd = c->fd;
    e = (redis_fast_event_t*)self->ac->ev.data;
    if(e==NULL) return WAIT_FOR_EVENT_OK;

    if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
        DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
                  read_timeout, write_timeout);
        if(read_timeout < 0 && write_timeout < 0) {
            timeout = -1;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        } else if(read_timeout < 0) {
            timeout = write_timeout;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        } else if(write_timeout < 0) {
            timeout = read_timeout;
            timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
        } else if(read_timeout < write_timeout) {
            timeout = read_timeout;
            timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
        } else {
            timeout = write_timeout;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        }
    } else if(e->flags & WAIT_FOR_READ) {
        DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
        timeout = read_timeout;
        timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
    } else if(e->flags & WAIT_FOR_WRITE) {
        DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
        timeout = write_timeout;
        timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
    }

 START_POLL:
    if (timeout < 0) {
        ms = -1;
    } else {
        /* round up to whole milliseconds */
        ms = (int)(timeout * 1000 + 0.999);
    }
    DEBUG_MSG("select start, timeout is %f", timeout);
    pollfd.fd = fd;
    pollfd.events = 0;
    pollfd.revents = 0;
    if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
    if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
    rc = poll(&pollfd, 1, ms);
    /* Capture errno right away: the debug output below calls fprintf() and
     * strerror(), either of which may overwrite it before it is examined. */
    poll_errno = errno;
    DEBUG_MSG("poll returns %d", rc);
    if(rc == 0) {
        DEBUG_MSG("%s", "timeout");
        return timeout_mode;
    }

    if(rc < 0) {
        DEBUG_MSG("exception: %s", strerror(poll_errno));
        if( poll_errno == EINTR ) {
            PERL_ASYNC_CHECK();
            DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
            goto START_POLL;
        }
        errno = poll_errno;     /* leave poll()'s error visible to the caller */
        return WAIT_FOR_EVENT_EXCEPTION;
    }
    /* self->ac is re-checked each time: a handler may drop the connection. */
    if(self->ac && (pollfd.revents & POLLIN) != 0) {
        DEBUG_MSG("ready to %s", "read");
        redisAsyncHandleRead(self->ac);
    }
    if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
        DEBUG_MSG("ready to %s", "write");
        redisAsyncHandleWrite(self->ac);
    }
    if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
        DEBUG_MSG(
            "exception: %s%s",
            (pollfd.revents & POLLERR) ? "POLLERR " : "",
            (pollfd.revents & POLLNVAL) ? "POLLNVAL " : "");
        return WAIT_FOR_EVENT_EXCEPTION;
    }

    DEBUG_MSG("%s", "finish");
    return WAIT_FOR_EVENT_OK;
}
251
252
/*
 * Pump events until hiredis has no reply callbacks queued or the connection
 * is gone.  Returns WAIT_FOR_EVENT_OK, or the first non-OK code reported by
 * wait_for_event().
 */
static int _wait_all_responses(Redis__Fast self) {
    int status;

    DEBUG_MSG("%s", "start");
    for (;;) {
        if (self->ac == NULL || self->ac->replies.tail == NULL) {
            break;
        }
        status = wait_for_event(self, self->read_timeout, self->write_timeout);
        if (status != WAIT_FOR_EVENT_OK) {
            DEBUG_MSG("error: %d", status);
            return status;
        }
    }
    DEBUG_MSG("%s", "finish");
    return WAIT_FOR_EVENT_OK;
}
265
266
/* hiredis connect callback: record success, or forget the context on failure. */
static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
    Redis__Fast self = (Redis__Fast)c->data;

    DEBUG_MSG("connected status = %d", status);
    if (status == REDIS_OK) {
        self->is_connected = 1;
        return;
    }
    // Connection error: per the original author, hiredis closes the
    // context on its own, so only our reference is dropped here.
    self->ac = NULL;
}
278
/* hiredis disconnect callback: drop our reference to the context. */
static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
    Redis__Fast self;

    self = (Redis__Fast)c->data;
    PERL_UNUSED_VAR(status);
    DEBUG_MSG("disconnected status = %d", status);
    self->ac = NULL;
}
285
__build_sock(Redis__Fast self)286 static redisAsyncContext* __build_sock(Redis__Fast self)
287 {
288 redisAsyncContext *ac;
289 double timeout;
290 int res;
291
292 DEBUG_MSG("%s", "start");
293
294 if(self->on_build_sock) {
295 dSP;
296
297 ENTER;
298 SAVETMPS;
299
300 PUSHMARK(SP);
301 call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
302
303 FREETMPS;
304 LEAVE;
305 }
306
307 if(self->path) {
308 ac = redisAsyncConnectUnix(self->path);
309 } else {
310 ac = redisAsyncConnect(self->hostname, self->port);
311 }
312
313 if(ac == NULL) {
314 DEBUG_MSG("%s", "allocation error");
315 return NULL;
316 }
317 if(ac->err) {
318 DEBUG_MSG("connection error: %s", ac->errstr);
319 redisAsyncFree(ac);
320 return NULL;
321 }
322 ac->data = (void*)self;
323 self->ac = ac;
324 self->is_connected = 0;
325
326 Attach(ac);
327 redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
328 redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
329
330 // wait to connect...
331 timeout = -1;
332 if(self->cnx_timeout) {
333 timeout = self->cnx_timeout;
334 }
335 while(!self->is_connected) {
336 res = wait_for_event(self, timeout, timeout);
337 if(self->ac == NULL) {
338 // set is_connected flag to reconnect.
339 // see https://github.com/shogo82148/Redis-Fast/issues/73
340 self->is_connected = 1;
341
342 return NULL;
343 }
344 if(res != WAIT_FOR_EVENT_OK) {
345 DEBUG_MSG("error: %d", res);
346 redisAsyncFree(self->ac);
347 _wait_all_responses(self);
348
349 // set is_connected flag to reconnect.
350 // see https://github.com/shogo82148/Redis-Fast/issues/73
351 self->is_connected = 1;
352
353 return NULL;
354 }
355 }
356 if(self->on_connect){
357 dSP;
358 PUSHMARK(SP);
359 call_sv(self->on_connect, G_DISCARD | G_NOARGS);
360 }
361
362 DEBUG_MSG("%s", "finsih");
363 return self->ac;
364 }
365
366
/* Write the "Could not connect" message for the configured endpoint into self->error. */
static void Redis__Fast_set_connect_error(Redis__Fast self) {
    if(self->path) {
        snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
    } else {
        snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
    }
}

/*
 * (Re)establish the connection, discarding any existing one.
 *
 * With reconnect == 0 a single attempt is made.  Otherwise attempts are
 * repeated, pausing usleep(every) between them, until one succeeds or more
 * than `reconnect` seconds have elapsed.  Croaks with the message stored in
 * self->error when no connection could be made.
 */
static void Redis__Fast_connect(Redis__Fast self) {
    struct timeval start, end;

    DEBUG_MSG("%s", "start");

    if (self->ac) {
        redisAsyncFree(self->ac);
        _wait_all_responses(self);
    }
    self->flags = 0;

    //$self->{queue} = [];
    self->pid = getpid();

    if(self->reconnect == 0) {
        /* Decide on the return value, not on self->ac: after a failed
         * attempt self->ac never refers to a live context, but it may still
         * hold a stale pointer. */
        if(__build_sock(self) == NULL) {
            self->ac = NULL;
            Redis__Fast_set_connect_error(self);
            croak("%s", self->error);
        }
        return ;
    }

    // Reconnect...
    gettimeofday(&start, NULL);
    for (;;) {
        double elapsed_time;
        if(__build_sock(self)) {
            // Connected!
            DEBUG_MSG("%s", "finish");
            return;
        }
        self->ac = NULL;    /* see the note above */
        gettimeofday(&end, NULL);
        elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
        DEBUG_MSG("elasped time:%f, reconnect:%lf", elapsed_time, self->reconnect);
        if( elapsed_time > self->reconnect) {
            Redis__Fast_set_connect_error(self);
            DEBUG_MSG("%s", "timed out");
            croak("%s", self->error);   /* does not return */
        }
        DEBUG_MSG("%s", "failed to connect. wait...");
        usleep(self->every);
    }
}
421
422 // reconnect if the current connection is closed.
423 // the caller must check self->ac != 0 to continue.
Redis__Fast_reconnect(Redis__Fast self)424 static void Redis__Fast_reconnect(Redis__Fast self) {
425 DEBUG_MSG("%s", "start");
426 if(self->is_connected && !self->ac && self->reconnect > 0) {
427 DEBUG_MSG("%s", "connection not found. reconnect");
428 Redis__Fast_connect(self);
429 }
430 if(!self->ac) {
431 DEBUG_MSG("%s", "Not connected to any server");
432 }
433 DEBUG_MSG("%s", "finish");
434 }
435
Redis__Fast_decode_reply(Redis__Fast self,redisReply * reply,int collect_errors)436 static redis_fast_reply_t Redis__Fast_decode_reply(Redis__Fast self, redisReply* reply, int collect_errors) {
437 redis_fast_reply_t res = {NULL, NULL};
438
439 switch (reply->type) {
440 case REDIS_REPLY_ERROR:
441 res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
442 break;
443 case REDIS_REPLY_STRING:
444 case REDIS_REPLY_STATUS:
445 res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
446 break;
447
448 case REDIS_REPLY_INTEGER:
449 res.result = sv_2mortal(newSViv(reply->integer));
450 break;
451 case REDIS_REPLY_NIL:
452 res.result = &PL_sv_undef;
453 break;
454
455 case REDIS_REPLY_ARRAY: {
456 AV* av = newAV();
457 size_t i;
458 res.result = sv_2mortal(newRV_noinc((SV*)av));
459
460 for (i = 0; i < reply->elements; i++) {
461 redis_fast_reply_t elem = Redis__Fast_decode_reply(self, reply->element[i], collect_errors);
462 if(collect_errors) {
463 AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
464 if(elem.result) {
465 av_push(elem_av, SvREFCNT_inc(elem.result));
466 } else {
467 av_push(elem_av, newSV(0));
468 }
469 if(elem.error) {
470 av_push(elem_av, SvREFCNT_inc(elem.error));
471 } else {
472 av_push(elem_av, newSV(0));
473 }
474 av_push(av, newRV_inc((SV*)elem_av));
475 } else {
476 if(elem.result) {
477 av_push(av, SvREFCNT_inc(elem.result));
478 } else {
479 av_push(av, newSV(0));
480 }
481 if(elem.error && !res.error) {
482 res.error = elem.error;
483 }
484 }
485 }
486 break;
487 }
488 }
489
490 return res;
491 }
492
/*
 * Ask the reconnect_on_error Perl callback whether the error in `ret`
 * warrants a reconnect.
 *
 * The callback is invoked as cb(error, result, command_name) in scalar
 * context and its return value is taken as an integer.  It is skipped (and 0
 * returned) when `ret` has no error, when no callback is set, or while the
 * current time has not yet passed next_reconnect_on_error_at.
 *
 * Croaks if the callback does not return exactly one value.
 */
static int Redis__Fast_call_reconnect_on_error(Redis__Fast self, redis_fast_reply_t ret, const void *command_name, STRLEN command_length) {
    int _need_reconnect = 0;
    struct timeval now;
    double now_sec;
    SV* sv_ret;
    SV* sv_err;
    SV* sv_cmd;
    int count;

    if (ret.error == NULL) {
        return _need_reconnect;
    }
    if (self->reconnect_on_error == NULL) {
        return _need_reconnect;
    }

    gettimeofday(&now, NULL);
    now_sec = now.tv_sec + 1E-6 * now.tv_usec;
    if( self->next_reconnect_on_error_at < 0 ||
        self->next_reconnect_on_error_at < now_sec) {
        dSP;
        ENTER;
        SAVETMPS;

        sv_ret = ret.result ? ret.result : &PL_sv_undef;
        sv_err = ret.error;
        sv_cmd = sv_2mortal(newSVpvn((const char*)command_name, command_length));

        PUSHMARK(SP);
        XPUSHs(sv_err);
        XPUSHs(sv_ret);
        XPUSHs(sv_cmd);
        PUTBACK;

        count = call_sv(self->reconnect_on_error, G_SCALAR);

        SPAGAIN;

        if (count != 1) {
            croak("[BUG] retval count should be 1\n");
        }
        _need_reconnect = POPi;

        PUTBACK;
        FREETMPS;
        LEAVE;
    }

    return _need_reconnect;
}
543
/*
 * hiredis reply callback for synchronous commands.
 *
 * With a reply: updates self->flags and stores the decoded reply in
 * cbt->ret.  Without a reply: frees cbt when the context is being freed,
 * otherwise flags the connection for reconnect and stores the context's
 * error string in cbt->ret.error.
 */
static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_sync_cb_t *cbt = (redis_fast_sync_cb_t*)privdata;

    DEBUG_MSG("%p", (void*)privdata);
    if (reply == NULL) {
        if (c->c.flags & REDIS_FREEING) {
            DEBUG_MSG("%s", "redis freeing");
            Safefree(cbt);
        } else {
            DEBUG_MSG("connect error: %s", c->errstr);
            self->need_reconnect = 1;
            cbt->ret.result = NULL;
            cbt->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
        }
    } else {
        CUSTOM_DECODE decode = cbt->custom_decode;

        self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
        if (decode == NULL) {
            decode = Redis__Fast_decode_reply;
        }
        cbt->ret = decode(self, (redisReply*)reply, cbt->collect_errors);
    }
    DEBUG_MSG("%s", "finish");
}
566
/*
 * hiredis reply callback for commands issued with a Perl callback.
 *
 * With a reply: updates self->flags, decodes the reply and calls the Perl
 * callback as cb(result, error), using undef for whichever is missing; then,
 * when reconnecting is enabled and a reconnect_on_error hook is set, decodes
 * the reply again and lets the hook decide self->need_reconnect.
 * Without a reply: calls the Perl callback as cb(undef, message), where the
 * message is "redis freeing" or the context's error string.
 *
 * In every case the reference held on cbt->cb is dropped and cbt is freed.
 */
static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
    DEBUG_MSG("%p, %p", reply, privdata);
    if (reply) {
        self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;

        {
            redis_fast_reply_t result;

            dSP;

            ENTER;
            SAVETMPS;

            if(cbt->custom_decode) {
                result = (cbt->custom_decode)(self, (redisReply*)reply, cbt->collect_errors);
            } else {
                result = Redis__Fast_decode_reply(self, (redisReply*)reply, cbt->collect_errors);
            }

            if(result.result == NULL) result.result = &PL_sv_undef;
            if(result.error == NULL) result.error = &PL_sv_undef;

            PUSHMARK(SP);
            XPUSHs(result.result);
            XPUSHs(result.error);
            PUTBACK;

            call_sv(cbt->cb, G_DISCARD);

            /* Releases the mortal values produced by the decode above. */
            FREETMPS;
            LEAVE;
        }

        {
            if (0 < self->reconnect && !self->need_reconnect
                // Avoid useless cost when reconnect_on_error is not set.
                && self->reconnect_on_error != NULL) {
                /* The reply is decoded a second time; presumably because the
                 * values from the first decode were released by FREETMPS. */
                redis_fast_reply_t result;
                if(cbt->custom_decode) {
                    result = (cbt->custom_decode)(
                        self, (redisReply*)reply, cbt->collect_errors
                    );
                } else {
                    result = Redis__Fast_decode_reply(
                        self, (redisReply*)reply, cbt->collect_errors
                    );
                }
                self->need_reconnect = Redis__Fast_call_reconnect_on_error(
                    self, result, cbt->command_name, cbt->command_length
                );
            }
        }
    } else {
        if (c->c.flags & REDIS_FREEING) {
            DEBUG_MSG("%s", "redis freeing");
        } else {
            DEBUG_MSG("connect error: %s", c->errstr);
        }

        {
            redis_fast_reply_t result;
            const char *msg;

            dSP;

            ENTER;
            SAVETMPS;

            result.result = &PL_sv_undef;
            if (c->c.flags & REDIS_FREEING) {
                msg = "redis freeing";
            } else {
                msg = c->errstr;
            }
            DEBUG_MSG("error: %s", msg);
            result.error = sv_2mortal(newSVpvn(msg, strlen(msg)));

            PUSHMARK(SP);
            XPUSHs(result.result);
            XPUSHs(result.error);
            PUTBACK;

            call_sv(cbt->cb, G_DISCARD);

            FREETMPS;
            LEAVE;
        }
    }

    /* The callback data is single-use: release the Perl callback and the struct. */
    SvREFCNT_dec(cbt->cb);
    Safefree(cbt);
}
661
/*
 * hiredis callback for pub/sub traffic.
 *
 * With a reply: classifies it by its first element ("subscribe",
 * "unsubscribe", their "p"-prefixed variants, or anything else, which is
 * counted as a delivered message), updates the subscription bookkeeping on
 * self, and calls the Perl callback as cb(result, error).
 * The callback data is destroyed after an unsubscribe acknowledgement or
 * when the callback is invoked without a reply.
 *
 * NOTE(review): the reply is assumed to be an array with at least two
 * elements (three for (un)subscribe acknowledgements); element[0] is read
 * without checking reply->type — confirm an error reply can never reach here.
 */
static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
    int is_need_free = 0;
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
    redisReply* r = (redisReply*)reply;

    DEBUG_MSG("%s", "start");
    if(!cbt) {
        DEBUG_MSG("%s", "cbt is empty finished");
        return ;
    }

    if (r) {
        char* stype = r->element[0]->str;
        /* 1 when the reply kind starts with 'p'/'P', so stype+pvariant skips that letter */
        int pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
        redis_fast_reply_t res;

        dSP;
        ENTER;
        SAVETMPS;

        res = Redis__Fast_decode_reply(self, r, 0);

        if (strcasecmp(stype+pvariant,"subscribe") == 0) {
            DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
            /* third element is stored as the current subscription count */
            self->is_subscriber = r->element[2]->integer;
            self->expected_subs--;
        } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
            DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
            self->is_subscriber = r->element[2]->integer;
            is_need_free = 1;
            self->expected_subs--;
        } else {
            DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
            self->proccess_sub_count++;
        }

        if(res.result == NULL) res.result = &PL_sv_undef;
        if(res.error == NULL) res.error = &PL_sv_undef;

        PUSHMARK(SP);
        XPUSHs(res.result);
        XPUSHs(res.error);
        PUTBACK;

        call_sv(cbt->cb, G_DISCARD);

        FREETMPS;
        LEAVE;
    } else {
        DEBUG_MSG("connect error: %s", c->errstr);
        is_need_free = 1;
    }

    if(is_need_free) {
        // destroy private data
        DEBUG_MSG("destroy %p", cbt);
        if(cbt->cb) {
            SvREFCNT_dec(cbt->cb);
            cbt->cb = NULL;
        }
        Safefree(cbt);
    }
    DEBUG_MSG("%s", "finish");
}
727
/*
 * Send QUIT, start a graceful disconnect and wait for outstanding replies.
 * Does nothing when there is no connection.
 *
 * Ownership of the callback data: it is freed here when the reply callback
 * stored a result or an error in it; when the callback ran because the
 * context was being freed, the callback itself already released it.
 * NOTE(review): in that second case the check of cbt->ret below reads memory
 * the callback has freed — this mirrors the existing design and should be
 * confirmed/reworked together with Redis__Fast_sync_reply_cb.
 */
static void Redis__Fast_quit(Redis__Fast self) {
    redis_fast_sync_cb_t *cbt;
    int res;

    if(!self->ac) {
        return;
    }

    /* Newx takes an element count, not a byte size. */
    Newx(cbt, 1, redis_fast_sync_cb_t);
    cbt->ret.result = NULL;
    cbt->ret.error = NULL;
    cbt->custom_decode = NULL;
    /* read by the reply callback when it decodes the reply */
    cbt->collect_errors = 0;

    // initialize, or self->flags will be corrupted.
    cbt->on_flags = 0;
    cbt->off_flags = 0;

    if(redisAsyncCommand(
        self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
    ) != REDIS_OK) {
        /* The command was rejected, so no callback will ever see cbt. */
        Safefree(cbt);
        cbt = NULL;
    }
    redisAsyncDisconnect(self->ac);
    res = _wait_all_responses(self);
    if(res == WAIT_FOR_EVENT_OK) {
        DEBUG_MSG("%s", "wait_all_responses ok");
    } else {
        DEBUG_MSG("%s", "wait_all_responses not ok");
    }
    if(cbt && (cbt->ret.result || cbt->ret.error)) Safefree(cbt);
    DEBUG_MSG("%s", "finish");
}
757
Redis__Fast_run_cmd(Redis__Fast self,int collect_errors,CUSTOM_DECODE custom_decode,SV * cb,int argc,const char ** argv,size_t * argvlen)758 static redis_fast_reply_t Redis__Fast_run_cmd(Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
759 redis_fast_reply_t ret = {NULL, NULL};
760 int on_flags = 0, off_flags = ~0;
761
762 DEBUG_MSG("start %s", argv[0]);
763
764 DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
765 if(self->pid != getpid()) {
766 DEBUG_MSG("%s", "pid changed. create new connection..");
767 Redis__Fast_connect(self);
768 }
769
770 if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
771 on_flags = FLAG_INSIDE_TRANSACTION;
772 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
773 EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
774 off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
775 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
776 on_flags = FLAG_INSIDE_WATCH;
777 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
778 off_flags = ~FLAG_INSIDE_WATCH;
779 }
780
781 if(cb) {
782 redis_fast_async_cb_t *cbt;
783 Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
784 cbt->cb = SvREFCNT_inc(cb);
785 cbt->custom_decode = custom_decode;
786 cbt->collect_errors = collect_errors;
787 cbt->on_flags = on_flags;
788 cbt->off_flags = off_flags;
789 cbt->command_name = argv[0];
790 cbt->command_length = argvlen[0];
791 redisAsyncCommandArgv(
792 self->ac, Redis__Fast_async_reply_cb, cbt,
793 argc, argv, argvlen
794 );
795 ret.result = sv_2mortal(newSViv(1));
796 } else {
797 redis_fast_sync_cb_t *cbt;
798 int i, cnt = (self->reconnect == 0 ? 1 : 2);
799 int res = WAIT_FOR_EVENT_OK;
800 for(i = 0; i < cnt; i++) {
801 Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
802 self->need_reconnect = 0;
803 cbt->ret.result = NULL;
804 cbt->ret.error = NULL;
805 cbt->custom_decode = custom_decode;
806 cbt->collect_errors = collect_errors;
807 cbt->on_flags = on_flags;
808 cbt->off_flags = off_flags;
809 DEBUG_MSG("%s", "send command in sync mode");
810 redisAsyncCommandArgv(
811 self->ac, Redis__Fast_sync_reply_cb, cbt,
812 argc, argv, argvlen
813 );
814 DEBUG_MSG("%s", "waiting response");
815 res = _wait_all_responses(self);
816 if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
817 int _need_reconnect = 0;
818 if (1 < cnt - i) {
819 _need_reconnect = Redis__Fast_call_reconnect_on_error(
820 self, cbt->ret, argv[0], argvlen[0]
821 );
822 // Should be quit before reconnect
823 if (_need_reconnect) {
824 Redis__Fast_quit(self);
825 }
826 }
827 if (!_need_reconnect) {
828 ret = cbt->ret;
829 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
830 DEBUG_MSG("finish %s", argv[0]);
831 return ret;
832 }
833 }
834
835 if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
836
837 if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
838 char *msg = "reconnect disabled inside transaction or watch";
839 DEBUG_MSG("error: %s", msg);
840 ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
841 return ret;
842 }
843
844 Redis__Fast_reconnect(self);
845 if(!self->ac) {
846 char *msg = "Not connected to any server";
847 DEBUG_MSG("error: %s", msg);
848 ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
849 return ret;
850 }
851 }
852
853 if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
854 // else destructor will release cbt
855
856 if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
857 snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
858 errno = EAGAIN;
859 DEBUG_MSG("error: %s", self->error);
860 ret.error = sv_2mortal(newSVpvn(self->error, strlen(self->error)));
861 return ret;
862 }
863 if(!self->ac) {
864 char *msg = "Not connected to any server";
865 DEBUG_MSG("error: %s", msg);
866 ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
867 return ret;
868 }
869 }
870 DEBUG_MSG("Finish %s", argv[0]);
871 return ret;
872 }
873
Redis__Fast_keys_custom_decode(Redis__Fast self,redisReply * reply,int collect_errors)874 static redis_fast_reply_t Redis__Fast_keys_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
875 // TODO: Support redis <= 1.2.6
876 return Redis__Fast_decode_reply(self, reply, collect_errors);
877 }
878
Redis__Fast_info_custom_decode(Redis__Fast self,redisReply * reply,int collect_errors)879 static redis_fast_reply_t Redis__Fast_info_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
880 redis_fast_reply_t res = {NULL, NULL};
881
882 if(reply->type == REDIS_REPLY_STRING ||
883 reply->type == REDIS_REPLY_STATUS) {
884
885 HV* hv = newHV();
886 char* str = reply->str;
887 size_t len = reply->len;
888 res.result = sv_2mortal(newRV_noinc((SV*)hv));
889
890 while(len != 0) {
891 const char* line = (char*)memchr(str, '\r', len);
892 const char* sep;
893 size_t linelen;
894 if(line == NULL) {
895 linelen = len;
896 } else {
897 linelen = line - str;
898 }
899 sep = (char*)memchr(str, ':', linelen);
900 if(str[0] != '#' && sep != NULL) {
901 SV* val;
902 SV** ret;
903 size_t keylen;
904 keylen = sep - str;
905 val = sv_2mortal(newSVpvn(sep + 1, linelen - keylen - 1));
906 ret = hv_store(hv, str, keylen, SvREFCNT_inc(val), 0);
907 if (ret == NULL) {
908 SvREFCNT_dec(val);
909 croak("failed to hv_store");
910 }
911 }
912 if(line == NULL) {
913 break;
914 } else {
915 len -= linelen + 2;
916 str += linelen + 2;
917 }
918 }
919 } else {
920 res = Redis__Fast_decode_reply(self, reply, collect_errors);
921 }
922
923 return res;
924 }
925
926 MODULE = Redis::Fast PACKAGE = Redis::Fast
927
SV*
_new(char* cls);
PREINIT:
    redis_fast_t* self;
CODE:
{
    /* Allocate a zeroed connection object and return it blessed into cls.
     * Newxz takes an element count, so exactly one struct is requested. */
    Newxz(self, 1, redis_fast_t);
    DEBUG_MSG("%s", "start");
    /* malloc (not Newx) because DESTROY releases this buffer with free() */
    self->error = (char*)malloc(MAX_ERROR_SIZE);
    if(self->error == NULL) {
        Safefree(self);
        croak("Redis::Fast: out of memory");
    }
    self->error[0] = '\0';
    self->reconnect_on_error = NULL;
    self->next_reconnect_on_error_at = -1;
    ST(0) = sv_newmortal();
    sv_setref_pv(ST(0), cls, (void*)self);
    DEBUG_MSG("return %p", ST(0));
    XSRETURN(1);
}
OUTPUT:
    RETVAL
946
double
__set_reconnect(Redis::Fast self, double val)
CODE:
{
    /* Store the reconnect window (seconds; 0 disables reconnecting) and return it. */
    self->reconnect = val;
    RETVAL = self->reconnect;
}
OUTPUT:
    RETVAL
955
956
double
__get_reconnect(Redis::Fast self)
CODE:
{
    /* Return the stored reconnect window. */
    const double window = self->reconnect;
    RETVAL = window;
}
OUTPUT:
    RETVAL
965
966
int
__set_every(Redis::Fast self, int val)
CODE:
{
    /* Store the pause between connect attempts (handed to usleep) and return it. */
    self->every = val;
    RETVAL = self->every;
}
OUTPUT:
    RETVAL
975
976
int
__get_every(Redis::Fast self)
CODE:
{
    /* Return the stored pause between connect attempts. */
    const int pause = self->every;
    RETVAL = pause;
}
OUTPUT:
    RETVAL
985
int
__set_debug(Redis::Fast self, int val)
CODE:
{
    /* Enable (non-zero) or disable (0) DEBUG_MSG output; returns the stored value. */
    self->debug = val;
    RETVAL = self->debug;
}
OUTPUT:
    RETVAL
994
double
__set_cnx_timeout(Redis::Fast self, double val)
CODE:
{
    /* Store the connect timeout in seconds (0 waits without a timeout) and return it. */
    self->cnx_timeout = val;
    RETVAL = self->cnx_timeout;
}
OUTPUT:
    RETVAL
1003
double
__get_cnx_timeout(Redis::Fast self)
CODE:
{
    /* Return the stored connect timeout. */
    const double seconds = self->cnx_timeout;
    RETVAL = seconds;
}
OUTPUT:
    RETVAL
1012
1013
double
__set_read_timeout(Redis::Fast self, double val)
CODE:
{
    /* Store the read timeout in seconds (negative disables it) and return it. */
    self->read_timeout = val;
    RETVAL = self->read_timeout;
}
OUTPUT:
    RETVAL
1022
double
__get_read_timeout(Redis::Fast self)
CODE:
{
    /* Return the stored read timeout. */
    const double seconds = self->read_timeout;
    RETVAL = seconds;
}
OUTPUT:
    RETVAL
1031
1032
double
__set_write_timeout(Redis::Fast self, double val)
CODE:
{
    /* Store the write timeout in seconds (negative disables it) and return it. */
    self->write_timeout = val;
    RETVAL = self->write_timeout;
}
OUTPUT:
    RETVAL
1041
double
__get_write_timeout(Redis::Fast self)
CODE:
{
    /* Return the stored write timeout. */
    const double seconds = self->write_timeout;
    RETVAL = seconds;
}
OUTPUT:
    RETVAL
1050
1051
int
__set_current_database(Redis::Fast self, int val)
CODE:
{
    /* Record the database index on the object and return it. */
    self->current_database = val;
    RETVAL = self->current_database;
}
OUTPUT:
    RETVAL
1060
1061
int
__get_current_database(Redis::Fast self)
CODE:
{
    /* Return the recorded database index. */
    const int db = self->current_database;
    RETVAL = db;
}
OUTPUT:
    RETVAL
1070
1071
int
__sock(Redis::Fast self)
CODE:
{
    /* File descriptor of the current connection, or 0 when there is none. */
    if (self->ac) {
        RETVAL = self->ac->c.fd;
    } else {
        RETVAL = 0;
    }
}
OUTPUT:
    RETVAL
1080
1081
int
__get_port(Redis::Fast self)
CODE:
{
    /* Local port of the connection's socket as reported by getsockname().
     * Returns 0 when there is no connection or the lookup fails; both cases
     * previously read a NULL pointer or an uninitialised address.
     * NOTE(review): sin_port is in network byte order and is returned
     * unconverted, exactly as before - confirm callers expect that. */
    struct sockaddr_in addr;
    socklen_t len = sizeof( addr );
    memset( &addr, 0, sizeof( addr ) );
    RETVAL = 0;
    if( self->ac != NULL
        && getsockname( self->ac->c.fd, ( struct sockaddr *)&addr, &len ) == 0 ) {
        RETVAL = addr.sin_port;
    }
}
OUTPUT:
    RETVAL
1094
1095
void
__set_on_connect(Redis::Fast self, SV* func)
CODE:
{
    /* Install the Perl callable run after a successful connect.
     * The new value is referenced before the old one is released, so
     * setting the same SV twice is safe and the old callback no longer leaks. */
    SV* previous = self->on_connect;
    self->on_connect = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1102
void
__set_on_build_sock(Redis::Fast self, SV* func)
CODE:
{
    /* Install the Perl callable run before each connect attempt.
     * The new value is referenced before the old one is released, so
     * setting the same SV twice is safe and the old callback no longer leaks. */
    SV* previous = self->on_build_sock;
    self->on_build_sock = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1109
void
__set_data(Redis::Fast self, SV* data)
CODE:
{
    /* Attach an SV to the object (returned by __get_data).
     * The new value is referenced before the old one is released, so
     * setting the same SV twice is safe and the old value no longer leaks. */
    SV* previous = self->data;
    self->data = SvREFCNT_inc(data);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1116
void
__get_data(Redis::Fast self)
CODE:
{
    /* Return the SV attached with __set_data, or undef when none was set
     * (a NULL pointer must never be placed on the Perl stack). */
    ST(0) = self->data ? self->data : &PL_sv_undef;
    XSRETURN(1);
}
1124
void
__set_reconnect_on_error(Redis::Fast self, SV* func)
CODE:
{
    /* Install the Perl callable consulted when a reply carries an error.
     * The new value is referenced before the old one is released, so
     * setting the same SV twice is safe and the old callback no longer leaks. */
    SV* previous = self->reconnect_on_error;
    self->reconnect_on_error = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1131
double
__set_next_reconnect_on_error_at(Redis::Fast self, double val)
CODE:
{
    /* Set the earliest time at which reconnect_on_error may run again.
     * A value greater than -1 is a delay in seconds from now and is stored
     * as an absolute epoch time; other values are stored unchanged.
     * Returns the stored value. */
    struct timeval now;
    double now_sec;
    if ( -1 < val ) {
        gettimeofday(&now, NULL);
        now_sec = now.tv_sec + 1E-6 * now.tv_usec;
        val += now_sec;
    }
    RETVAL = self->next_reconnect_on_error_at = val;
}
OUTPUT:
    RETVAL
1149
void
is_subscriber(Redis::Fast self)
CODE:
{
    /* Return the subscription count recorded from the last (un)subscribe reply. */
    SV* count = newSViv(self->is_subscriber);
    ST(0) = sv_2mortal(count);
    XSRETURN(1);
}
1157
1158
void
DESTROY(Redis::Fast self);
CODE:
{
    /* Tear down the connection and release everything the object owns:
     * the hiredis context, the malloc'd strings, the Perl callbacks/data
     * and finally the struct itself. */
    DEBUG_MSG("%s", "start");
    if (self->ac) {
        DEBUG_MSG("%s", "free ac");
        redisAsyncFree(self->ac);
        _wait_all_responses(self);
    }
    if(self->hostname) {
        DEBUG_MSG("%s", "free hostname");
        free(self->hostname);
        self->hostname = NULL;
    }
    if(self->path) {
        DEBUG_MSG("%s", "free path");
        free(self->path);
        self->path = NULL;
    }
    if(self->error) {
        DEBUG_MSG("%s", "free error");
        free(self->error);
        self->error = NULL;
    }
    if(self->on_connect) {
        DEBUG_MSG("%s", "free on_connect");
        SvREFCNT_dec(self->on_connect);
        self->on_connect = NULL;
    }
    if(self->on_build_sock) {
        DEBUG_MSG("%s", "free on_build_sock");
        SvREFCNT_dec(self->on_build_sock);
        self->on_build_sock = NULL;
    }
    if(self->data) {
        DEBUG_MSG("%s", "free data");
        SvREFCNT_dec(self->data);
        self->data = NULL;
    }
    if(self->reconnect_on_error) {
        DEBUG_MSG("%s", "free reconnect_on_error");
        SvREFCNT_dec(self->reconnect_on_error);
        self->reconnect_on_error = NULL;
    }
    /* Log before freeing: DEBUG_MSG reads self->debug, so emitting it
     * after Safefree(self) was a use-after-free. */
    DEBUG_MSG("%s", "finish");
    Safefree(self);
}
1215
1216
void
__connection_info(Redis::Fast self, char* hostname, int port = 6379)
CODE:
{
    /*
     * Configure a TCP endpoint: discard any previously stored hostname and
     * unix socket path, keep a private copy of `hostname`, and record
     * `port` (6379 when omitted).
     *
     * Croaks if the copy cannot be allocated; self->hostname is left NULL
     * in that case.
     */
    free(self->hostname);
    self->hostname = NULL;
    free(self->path);
    self->path = NULL;

    if(hostname) {
        size_t size = strlen(hostname) + 1;   /* include the terminator */
        char* copy = (char*)malloc(size);
        if(!copy) {
            croak("Out of memory while storing hostname");
        }
        memcpy(copy, hostname, size);
        self->hostname = copy;
    }

    self->port = port;
}
1238
void
__connection_info_unix(Redis::Fast self, char* path)
CODE:
{
    /*
     * Configure a unix-socket endpoint: discard any previously stored
     * hostname and path, then keep a private copy of `path`.
     *
     * Croaks if the copy cannot be allocated; self->path is left NULL in
     * that case.
     */
    free(self->hostname);
    self->hostname = NULL;
    free(self->path);
    self->path = NULL;

    if(path) {
        size_t size = strlen(path) + 1;   /* include the terminator */
        char* copy = (char*)malloc(size);
        if(!copy) {
            croak("Out of memory while storing socket path");
        }
        memcpy(copy, path, size);
        self->path = copy;
    }
}
1258
1259
void
connect(Redis::Fast self)
CODE:
{
    /* Perl-visible entry point; all work is done by Redis__Fast_connect(). */
    Redis__Fast_connect(self);
}
1266
void
wait_all_responses(Redis::Fast self)
CODE:
{
    /*
     * Wait for outstanding replies via _wait_all_responses() and croak if
     * it reports anything other than WAIT_FOR_EVENT_OK.
     *
     * Afterwards, if reconnecting is enabled (self->reconnect > 0) and a
     * reconnect was requested, quit the current connection, reconnect and
     * clear the request flag.
     */
    if (_wait_all_responses(self) != WAIT_FOR_EVENT_OK) {
        croak("Error while reading from Redis server");
    }

    if (self->need_reconnect && 0 < self->reconnect) {
        /* The old connection has to be quit before a new one is opened. */
        Redis__Fast_quit(self);
        Redis__Fast_reconnect(self);
        self->need_reconnect = 0;
    }
}
1283
void
wait_one_response(Redis::Fast self)
CODE:
{
    /*
     * Wait for replies and croak if the wait reports anything other than
     * WAIT_FOR_EVENT_OK; then perform a pending reconnect when
     * reconnecting is enabled (self->reconnect > 0).
     *
     * NOTE(review): despite the name, this calls _wait_all_responses(),
     * exactly like wait_all_responses -- confirm that is intended.
     */
    if (_wait_all_responses(self) != WAIT_FOR_EVENT_OK) {
        croak("Error while reading from Redis server");
    }

    if (self->need_reconnect && 0 < self->reconnect) {
        /* The old connection has to be quit before a new one is opened. */
        Redis__Fast_quit(self);
        Redis__Fast_reconnect(self);
        self->need_reconnect = 0;
    }
}
1300
void
__std_cmd(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i, collect_errors;
CODE:
{
    /*
     * Run a generic Redis command.
     *
     * Arguments after `self` are the command name and its parameters. If
     * the last argument is a CODE reference it is taken as the callback
     * and is not sent to the server. Every command argument is downgraded
     * from UTF-8 in place; the call croaks if that is not possible.
     *
     * Returns a two-element list (result, error); a missing value is
     * returned as undef.
     */

    /* Two values are returned even when fewer arguments were passed. */
    EXTEND(SP, 2);

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 2;
    } else {
        cb = NULL;
        argc = items - 1;
    }
    /* Newx takes an element count, not a byte count. */
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    for (i = 0; i < argc; i++) {
        if(!sv_utf8_downgrade(ST(i + 1), 1)) {
            /* croak does not return: release the buffers first. */
            Safefree(argv);
            Safefree(argvlen);
            croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
        }
        argv[i] = SvPV(ST(i + 1), len);
        argvlen[i] = len;
    }

    /* Errors are collected only for EXEC issued with a callback; the
     * argc check keeps argv[0] from being read when no command was given. */
    collect_errors = 0;
    if(cb && argc > 0 && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC")) {
        collect_errors = 1;
    }

    ret = Redis__Fast_run_cmd(self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);

    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1348
1349
void
__quit(Redis::Fast self)
CODE:
{
    /*
     * Quit the current connection through Redis__Fast_quit().
     * Returns 1 when a connection existed, and an empty list otherwise.
     */
    DEBUG_MSG("%s", "start QUIT");
    if(!self->ac) {
        DEBUG_MSG("%s", "finish. there is no connection.");
        XSRETURN(0);
    }

    Redis__Fast_quit(self);
    ST(0) = sv_2mortal(newSViv(1));
    XSRETURN(1);
}
1364
1365
void
__shutdown(Redis::Fast self)
CODE:
{
    /*
     * Queue a SHUTDOWN command, start disconnecting, wait for outstanding
     * replies and mark the object as not connected.
     * Returns 1 when a connection existed, and an empty list otherwise.
     */
    DEBUG_MSG("%s", "start SHUTDOWN");
    if(!self->ac) {
        DEBUG_MSG("%s", "redis server has alread shutdown");
        XSRETURN(0);
    }

    /* No reply callback is registered for SHUTDOWN. */
    redisAsyncCommand(self->ac, NULL, NULL, "SHUTDOWN");
    redisAsyncDisconnect(self->ac);
    _wait_all_responses(self);
    self->is_connected = 0;

    ST(0) = sv_2mortal(newSViv(1));
    XSRETURN(1);
}
1385
1386
void
__keys(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
CODE:
{
    /*
     * Run the KEYS command, decoding the reply with
     * Redis__Fast_keys_custom_decode.
     *
     * Arguments after `self` are the KEYS parameters. If the last argument
     * is a CODE reference it is taken as the callback and is not sent.
     *
     * Returns a two-element list (result, error); a missing value is
     * returned as undef.
     */

    /* Two values are returned even when only `self` was passed. */
    EXTEND(SP, 2);

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    /* argc counts the literal "KEYS" in slot 0 plus the user arguments. */
    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 1;
    } else {
        cb = NULL;
        argc = items;
    }
    /* Newx takes an element count, not a byte count. */
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    argv[0] = "KEYS";
    argvlen[0] = 4;
    for (i = 1; i < argc; i++) {
        argv[i] = SvPV(ST(i), len);
        argvlen[i] = len;
    }

    ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1428
1429
void
__info(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
CODE:
{
    /*
     * Run the INFO command, decoding the reply with
     * Redis__Fast_info_custom_decode.
     *
     * Arguments after `self` are the INFO parameters. If the last argument
     * is a CODE reference it is taken as the callback and is not sent.
     *
     * Returns a two-element list (result, error); a missing value is
     * returned as undef. When no connection is available this does not
     * croak: it returns (undef, "Not connected to any server").
     */

    /* Two values are returned even when only `self` was passed. */
    EXTEND(SP, 2);

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        char *msg = "Not connected to any server";
        ST(0) = &PL_sv_undef;
        ST(1) = sv_2mortal(newSVpvn(msg, strlen(msg)));
        XSRETURN(2);
    }

    /* argc counts the literal "INFO" in slot 0 plus the user arguments. */
    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 1;
    } else {
        cb = NULL;
        argc = items;
    }
    /* Newx takes an element count, not a byte count. */
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    argv[0] = "INFO";
    argvlen[0] = 4;
    for (i = 1; i < argc; i++) {
        argv[i] = SvPV(ST(i), len);
        argvlen[i] = len;
    }

    ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1474
1475
void
__send_subscription_cmd(Redis::Fast self, ...)
PREINIT:
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
    redis_fast_subscribe_cb_t* cbt;
    int pvariant;
CODE:
{
    /*
     * Send a (p)subscribe / (p)unsubscribe style command and wait until
     * one confirmation per channel argument has been counted down in
     * self->expected_subs.
     *
     * Arguments after `self` are the command name and its channels. If the
     * last argument is a CODE reference it is taken as the callback and is
     * not sent. For every command other than "unsubscribe"/"punsubscribe"
     * (compared case-insensitively) a callback record holding a new
     * reference to the callback is registered with the request.
     *
     * The command is attempted once, or twice when self->reconnect is
     * non-zero; a reconnect is performed between attempts. Returns an
     * empty list; croaks when no connection is available or no command
     * name was given.
     */
    int cnt = (self->reconnect == 0 ? 1 : 2);
    int is_unsubscribe;

    DEBUG_MSG("%s", "start");

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    if(!self->is_subscriber) {
        _wait_all_responses(self);
    }
    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 2;
    } else {
        cb = NULL;
        argc = items - 1;
    }
    /* argv[0] is inspected below, so a command name is mandatory. */
    if (argc < 1) {
        croak("Subscription command name is required");
    }
    /* Newx takes an element count, not a byte count. */
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    for (i = 0; i < argc; i++) {
        argv[i] = SvPV(ST(i+1), len);
        argvlen[i] = len;
        DEBUG_MSG("argv[%d] = %s", i, argv[i]);
    }

    /* Skip a leading 'p'/'P' so the pattern variant is classified like
     * the plain one; the result does not change between attempts. */
    pvariant = tolower((unsigned char)argv[0][0]) == 'p';
    is_unsubscribe = strcasecmp(argv[0] + pvariant, "unsubscribe") == 0;

    for(i = 0; i < cnt; i++) {
        if (!is_unsubscribe) {
            DEBUG_MSG("%s", "command is not unsubscribe");
            Newx(cbt, 1, redis_fast_subscribe_cb_t);
            cbt->self = self;
            cbt->cb = SvREFCNT_inc(cb);
        } else {
            DEBUG_MSG("%s", "command is unsubscribe");
            cbt = NULL;
        }
        /* NOTE(review): the return value is not checked; if queuing
         * fails, cbt is presumably never released -- confirm. */
        redisAsyncCommandArgv(
            self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
            argc, (const char**)argv, argvlen
        );
        /* One confirmation is expected per channel argument. */
        self->expected_subs = argc - 1;
        while(self->expected_subs > 0 && wait_for_event(self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
        if(self->expected_subs == 0) break;
        Redis__Fast_reconnect(self);
        if(!self->ac) {
            Safefree(argv);
            Safefree(argvlen);
            croak("Not connected to any server");
        }
    }

    Safefree(argv);
    Safefree(argvlen);
    DEBUG_MSG("%s", "finish");
    XSRETURN(0);
}
1547
void
wait_for_messages(Redis::Fast self, double timeout = -1)
CODE:
{
    /*
     * Process events until a read or write timeout occurs and return
     * self->proccess_sub_count, which is reset to 0 on entry.
     *
     * `timeout` is used for both the read and the write timeout of
     * wait_for_event(). The wait is attempted once, or twice when
     * self->reconnect is non-zero, with a reconnect in between. Croaks
     * when the connection cannot be re-established or when the final
     * status is WAIT_FOR_EVENT_EXCEPTION.
     */
    const int max_rounds = (self->reconnect == 0 ? 1 : 2);
    int status = WAIT_FOR_EVENT_OK;
    int round;

    DEBUG_MSG("%s", "start");
    self->proccess_sub_count = 0;

    for (round = 0; round < max_rounds; round++) {
        /* Keep going for as long as events are handled successfully. */
        do {
            status = wait_for_event(self, timeout, timeout);
        } while (status == WAIT_FOR_EVENT_OK);

        if (status == WAIT_FOR_EVENT_READ_TIMEOUT || status == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
            break;
        }

        Redis__Fast_reconnect(self);
        if (!self->ac) {
            croak("Not connected to any server");
        }
    }

    if (status == WAIT_FOR_EVENT_EXCEPTION) {
        /* croak() does not return, so the cases below are exclusive. */
        if (!self->ac) {
            DEBUG_MSG("%s", "Connection not found");
            croak("EOF from server");
        }

        DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
        if (self->ac->c.err == REDIS_ERR_EOF) {
            croak("EOF from server");
        }

        snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
        croak("%s", self->error);
    }

    ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
    DEBUG_MSG("finish with %d", status);
    XSRETURN(1);
}
1581
void
__wait_for_event(Redis::Fast self, double timeout = -1)
CODE:
{
    /*
     * Run a single wait_for_event() pass, using `timeout` for both the
     * read and the write timeout. The status is discarded and an empty
     * list is returned.
     */
    DEBUG_MSG("%s", "start");
    (void)wait_for_event(self, timeout, timeout);
    DEBUG_MSG("%s", "finish");
    XSRETURN(0);
}
1591