1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4 
5 #include "ppport.h"
6 #include "hiredis.h"
7 #include "async.h"
8 
9 #include <string.h>
10 #include <stdlib.h>
11 #include <time.h>
12 #include <stdio.h>
13 #include <poll.h>
14 #include <sys/socket.h>
15 
/* Size in bytes of the message buffer behind redis_fast_t.error. */
#define MAX_ERROR_SIZE 256

/* Result codes returned by wait_for_event() and _wait_all_responses(). */
#define WAIT_FOR_EVENT_OK            0
#define WAIT_FOR_EVENT_READ_TIMEOUT  1
#define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
#define WAIT_FOR_EVENT_EXCEPTION     3

/* Bits kept in redis_fast_t.flags (set/cleared when MULTI, EXEC, DISCARD,
 * WATCH and UNWATCH replies arrive). */
#define FLAG_INSIDE_TRANSACTION (1 << 0)
#define FLAG_INSIDE_WATCH       (1 << 1)
25 
/*
 * Print a printf-style diagnostic to stderr, prefixed with file, line and
 * function name, when the `debug` field of the in-scope `self` is non-zero.
 *
 * Requirements at the call site: a variable named `self` with a `debug`
 * member must be visible, and at least one argument must follow `fmt`.
 *
 * The body is wrapped in do { ... } while (0) so the macro expands to exactly
 * one statement; `DEBUG_MSG(...);` is then safe inside an unbraced if/else.
 */
#define DEBUG_MSG(fmt, ...) \
    do {                                                                    \
        if (self->debug) {                                                  \
            fprintf(stderr, "[%s:%d:%s]: ", __FILE__, __LINE__, __func__);  \
            fprintf(stderr, fmt, __VA_ARGS__);                              \
            fprintf(stderr, "\n");                                          \
        }                                                                   \
    } while (0)
32 
/*
 * True when the `len`-byte buffer `cmd` holds exactly the bytes of the string
 * literal `expected` (case-sensitive, terminating NUL not counted).
 * `expected` must be an array/literal because its length comes from sizeof.
 */
#define EQUALS_COMMAND(len, cmd, expected)              \
    ((len) == sizeof(expected) - 1 &&                   \
     memcmp(cmd, expected, sizeof(expected) - 1) == 0)
34 
35 typedef struct redis_fast_s {
36     redisAsyncContext* ac;
37     char* hostname;
38     int port;
39     char* path;
40     char* error;
41     double reconnect;
42     int every;
43     int debug;
44     double cnx_timeout;
45     double read_timeout;
46     double write_timeout;
47     int current_database;
48     int need_reconnect;
49     int is_connected;
50     SV* on_connect;
51     SV* on_build_sock;
52     SV* data;
53     SV* reconnect_on_error;
54     double next_reconnect_on_error_at;
55     int proccess_sub_count;
56     int is_subscriber;
57     int expected_subs;
58     pid_t pid;
59     int flags;
60 } redis_fast_t, *Redis__Fast;
61 
62 typedef struct redis_fast_reply_s {
63     SV* result;
64     SV* error;
65 } redis_fast_reply_t;
66 
67 typedef redis_fast_reply_t (*CUSTOM_DECODE)(Redis__Fast self, redisReply* reply, int collect_errors);
68 
69 typedef struct redis_fast_sync_cb_s {
70     redis_fast_reply_t ret;
71     int collect_errors;
72     CUSTOM_DECODE custom_decode;
73     int on_flags;
74     int off_flags;
75 } redis_fast_sync_cb_t;
76 
77 typedef struct redis_fast_async_cb_s {
78     SV* cb;
79     int collect_errors;
80     CUSTOM_DECODE custom_decode;
81     int on_flags;
82     int off_flags;
83     const void* command_name;
84     STRLEN command_length;
85 } redis_fast_async_cb_t;
86 
87 typedef struct redis_fast_subscribe_cb_s {
88     Redis__Fast self;
89     SV* cb;
90 } redis_fast_subscribe_cb_t;
91 
92 
93 #define WAIT_FOR_READ  0x01
94 #define WAIT_FOR_WRITE 0x02
95 typedef struct redis_fast_event_s {
96     int flags;
97     Redis__Fast self;
98 } redis_fast_event_t;
99 
100 
AddRead(void * privdata)101 static void AddRead(void *privdata) {
102     redis_fast_event_t *e = (redis_fast_event_t*)privdata;
103     Redis__Fast self = e->self;
104     e->flags |= WAIT_FOR_READ;
105     DEBUG_MSG("flags = %x", e->flags);
106 }
107 
DelRead(void * privdata)108 static void DelRead(void *privdata) {
109     redis_fast_event_t *e = (redis_fast_event_t*)privdata;
110     Redis__Fast self = e->self;
111     e->flags &= ~WAIT_FOR_READ;
112     DEBUG_MSG("flags = %x", e->flags);
113 }
114 
AddWrite(void * privdata)115 static void AddWrite(void *privdata) {
116     redis_fast_event_t *e = (redis_fast_event_t*)privdata;
117     Redis__Fast self = e->self;
118     e->flags |= WAIT_FOR_WRITE;
119     DEBUG_MSG("flags = %x", e->flags);
120 }
121 
DelWrite(void * privdata)122 static void DelWrite(void *privdata) {
123     redis_fast_event_t *e = (redis_fast_event_t*)privdata;
124     Redis__Fast self = e->self;
125     e->flags &= ~WAIT_FOR_WRITE;
126     DEBUG_MSG("flags = %x", e->flags);
127 }
128 
/* hiredis cleanup hook: release the event record that Attach() allocated. */
static void Cleanup(void *privdata)
{
    void *event_record = privdata;

    free(event_record);
}
132 
/*
 * Install this file's event hooks into a hiredis async context.
 *
 * Allocates a redis_fast_event_t (released later through the Cleanup hook),
 * points it at the owning connection taken from ac->data, and registers the
 * add/del read/write hooks.
 *
 * Returns REDIS_OK on success, REDIS_ERR when hooks are already attached or
 * when the event record cannot be allocated.
 */
static int Attach(redisAsyncContext *ac) {
    Redis__Fast self = (Redis__Fast)ac->data;
    redis_fast_event_t *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redis_fast_event_t*)malloc(sizeof(*e));
    if (e == NULL)
        return REDIS_ERR;   /* out of memory: leave the context untouched */
    e->flags = 0;
    e->self = self;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = AddRead;
    ac->ev.delRead = DelRead;
    ac->ev.addWrite = AddWrite;
    ac->ev.delWrite = DelWrite;
    ac->ev.cleanup = Cleanup;
    ac->ev.data = e;

    return REDIS_OK;
}
156 
/*
 * Wait once for the connection's socket to become ready and let hiredis
 * handle the pending read and/or write.
 *
 * read_timeout / write_timeout are in seconds; a negative value means "no
 * timeout". Which one applies depends on the events hiredis currently asked
 * for (see redis_fast_event_t.flags); when both are wanted the smaller
 * non-negative timeout is used. A poll() interrupted by a signal is retried
 * with the full timeout after giving Perl a chance to run signal handlers.
 *
 * Returns WAIT_FOR_EVENT_OK after events were processed (or when no event
 * hooks are attached), WAIT_FOR_EVENT_READ_TIMEOUT or
 * WAIT_FOR_EVENT_WRITE_TIMEOUT when poll() timed out, and
 * WAIT_FOR_EVENT_EXCEPTION when there is no connection, poll() failed, or the
 * socket reported POLLERR/POLLNVAL.
 */
static int wait_for_event(Redis__Fast self, double read_timeout, double write_timeout) {
    redisContext *c;
    int fd;
    redis_fast_event_t *e;
    struct pollfd pollfd;
    int rc;
    double timeout = -1;
    int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
    int ms;

    if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
    if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;

    c = &(self->ac->c);
    fd = c->fd;
    e = (redis_fast_event_t*)self->ac->ev.data;
    if(e==NULL) return WAIT_FOR_EVENT_OK;

    if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
        DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
                  read_timeout, write_timeout);
        if(read_timeout < 0 && write_timeout < 0) {
            timeout = -1;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        } else if(read_timeout < 0) {
            timeout = write_timeout;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        } else if(write_timeout < 0) {
            timeout = read_timeout;
            timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
        } else if(read_timeout < write_timeout) {
            timeout = read_timeout;
            timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
        } else {
            timeout = write_timeout;
            timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
        }
    } else if(e->flags & WAIT_FOR_READ) {
        DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
        timeout = read_timeout;
        timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
    } else if(e->flags & WAIT_FOR_WRITE) {
        DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
        timeout = write_timeout;
        timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
    }

  START_POLL:
    if (timeout < 0) {
        ms = -1;
    } else {
        /* round up so a sub-millisecond timeout does not become a zero wait */
        ms = (int)(timeout * 1000 + 0.999);
    }
    DEBUG_MSG("select start, timeout is %f", timeout);
    pollfd.fd = fd;
    pollfd.events = 0;
    pollfd.revents = 0;
    if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
    if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
    rc = poll(&pollfd, 1, ms);
    if(rc < 0) {
        /* Capture errno before any logging: the fprintf() calls inside
         * DEBUG_MSG are allowed to overwrite it, which would hide EINTR. */
        int poll_errno = errno;
        DEBUG_MSG("poll returns %d", rc);
        DEBUG_MSG("exception: %s", strerror(poll_errno));
        if( poll_errno == EINTR ) {
            PERL_ASYNC_CHECK();
            DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
            goto START_POLL;
        }
        return WAIT_FOR_EVENT_EXCEPTION;
    }
    DEBUG_MSG("poll returns %d", rc);
    if(rc == 0) {
        DEBUG_MSG("%s", "timeout");
        return timeout_mode;
    }

    /* self->ac is re-checked before each handler because a handler may end
     * in a connect/disconnect callback that sets it to NULL. */
    if(self->ac && (pollfd.revents & POLLIN) != 0) {
        DEBUG_MSG("ready to %s", "read");
        redisAsyncHandleRead(self->ac);
    }
    if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
        DEBUG_MSG("ready to %s", "write");
        redisAsyncHandleWrite(self->ac);
    }
    if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
        DEBUG_MSG(
            "exception: %s%s",
            (pollfd.revents & POLLERR) ? "POLLERR " : "",
            (pollfd.revents & POLLNVAL) ? "POLLNVAL " : "");
        return WAIT_FOR_EVENT_EXCEPTION;
    }

    DEBUG_MSG("%s", "finish");
    return WAIT_FOR_EVENT_OK;
}
251 
252 
/*
 * Keep pumping events until hiredis has no reply callbacks queued or the
 * connection is gone. Returns WAIT_FOR_EVENT_OK in that case, otherwise the
 * first non-OK code reported by wait_for_event().
 */
static int _wait_all_responses(Redis__Fast self) {
    int status;

    DEBUG_MSG("%s", "start");
    for (;;) {
        if (!self->ac || !self->ac->replies.tail) {
            break;
        }
        status = wait_for_event(self, self->read_timeout, self->write_timeout);
        if (status == WAIT_FOR_EVENT_OK) {
            continue;
        }
        DEBUG_MSG("error: %d", status);
        return status;
    }
    DEBUG_MSG("%s", "finish");
    return WAIT_FOR_EVENT_OK;
}
265 
266 
/*
 * hiredis connect callback: mark the connection as established on success,
 * or drop our pointer to the context on failure.
 */
static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
    Redis__Fast self = (Redis__Fast)c->data;

    DEBUG_MSG("connected status = %d", status);
    if (status == REDIS_OK) {
        self->is_connected = 1;
        return;
    }

    // Connection Error!!
    // Redis context will close automatically
    self->ac = NULL;
}
278 
/* hiredis disconnect callback: forget the context that has just gone away. */
static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
    Redis__Fast self;

    PERL_UNUSED_VAR(status);
    self = (Redis__Fast)c->data;
    DEBUG_MSG("disconnected status = %d", status);
    self->ac = NULL;
}
285 
__build_sock(Redis__Fast self)286 static redisAsyncContext* __build_sock(Redis__Fast self)
287 {
288     redisAsyncContext *ac;
289     double timeout;
290     int res;
291 
292     DEBUG_MSG("%s", "start");
293 
294     if(self->on_build_sock) {
295         dSP;
296 
297         ENTER;
298         SAVETMPS;
299 
300         PUSHMARK(SP);
301         call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
302 
303         FREETMPS;
304         LEAVE;
305     }
306 
307     if(self->path) {
308         ac = redisAsyncConnectUnix(self->path);
309     } else {
310         ac = redisAsyncConnect(self->hostname, self->port);
311     }
312 
313     if(ac == NULL) {
314         DEBUG_MSG("%s", "allocation error");
315         return NULL;
316     }
317     if(ac->err) {
318         DEBUG_MSG("connection error: %s", ac->errstr);
319 	redisAsyncFree(ac);
320         return NULL;
321     }
322     ac->data = (void*)self;
323     self->ac = ac;
324     self->is_connected = 0;
325 
326     Attach(ac);
327     redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
328     redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
329 
330     // wait to connect...
331     timeout = -1;
332     if(self->cnx_timeout) {
333         timeout = self->cnx_timeout;
334     }
335     while(!self->is_connected) {
336         res = wait_for_event(self, timeout, timeout);
337         if(self->ac == NULL) {
338             // set is_connected flag to reconnect.
339             // see https://github.com/shogo82148/Redis-Fast/issues/73
340             self->is_connected = 1;
341 
342             return NULL;
343         }
344         if(res != WAIT_FOR_EVENT_OK) {
345             DEBUG_MSG("error: %d", res);
346             redisAsyncFree(self->ac);
347             _wait_all_responses(self);
348 
349             // set is_connected flag to reconnect.
350             // see https://github.com/shogo82148/Redis-Fast/issues/73
351             self->is_connected = 1;
352 
353             return NULL;
354         }
355     }
356     if(self->on_connect){
357         dSP;
358         PUSHMARK(SP);
359         call_sv(self->on_connect, G_DISCARD | G_NOARGS);
360     }
361 
362     DEBUG_MSG("%s", "finsih");
363     return self->ac;
364 }
365 
366 
/* Format the "Could not connect ..." message for the configured endpoint
 * (Unix socket path if set, otherwise host:port) into self->error. */
static void Redis__Fast_set_connect_error(Redis__Fast self) {
    if(self->path) {
        snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
    } else {
        snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
    }
}

/*
 * (Re)establish the connection.
 *
 * Any existing context is released first, the transaction/watch flags are
 * reset and the current pid is recorded. With reconnect == 0 a single attempt
 * is made; otherwise attempts are repeated, sleeping `every` (usleep units)
 * between them, until more than `reconnect` seconds have elapsed.
 *
 * Does not return on failure: croaks with "Could not connect to Redis server
 * at ...". On failure self->ac is NULL.
 */
static void Redis__Fast_connect(Redis__Fast self) {
    struct timeval start, end;

    DEBUG_MSG("%s", "start");

    if (self->ac) {
        redisAsyncFree(self->ac);
        _wait_all_responses(self);
        /* The old context is gone (or owned by hiredis until it is freed);
         * never keep pointing at it. */
        self->ac = NULL;
    }
    self->flags = 0;

    //$self->{queue} = [];
    self->pid = getpid();

    if(self->reconnect == 0) {
        if(__build_sock(self) == NULL) {
            /* A failed attempt has already disposed of its context. */
            self->ac = NULL;
            Redis__Fast_set_connect_error(self);
            croak("%s", self->error);
        }
        return ;
    }

    // Reconnect...
    gettimeofday(&start, NULL);
    while (1) {
        double elapsed_time;
        if(__build_sock(self)) {
            // Connected!
            DEBUG_MSG("%s", "finish");
            return;
        }
        /* A failed attempt has already disposed of its context. */
        self->ac = NULL;
        gettimeofday(&end, NULL);
        elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
        DEBUG_MSG("elasped time:%f, reconnect:%lf", elapsed_time, self->reconnect);
        if( elapsed_time > self->reconnect) {
            Redis__Fast_set_connect_error(self);
            DEBUG_MSG("%s", "timed out");
            croak("%s", self->error);
        }
        DEBUG_MSG("%s", "failed to connect. wait...");
        usleep(self->every);
    }
}
421 
422 // reconnect if the current connection is closed.
423 // the caller must check self->ac != 0 to continue.
Redis__Fast_reconnect(Redis__Fast self)424 static void Redis__Fast_reconnect(Redis__Fast self) {
425     DEBUG_MSG("%s", "start");
426     if(self->is_connected && !self->ac && self->reconnect > 0) {
427         DEBUG_MSG("%s", "connection not found. reconnect");
428         Redis__Fast_connect(self);
429     }
430     if(!self->ac) {
431         DEBUG_MSG("%s", "Not connected to any server");
432     }
433     DEBUG_MSG("%s", "finish");
434 }
435 
Redis__Fast_decode_reply(Redis__Fast self,redisReply * reply,int collect_errors)436 static redis_fast_reply_t Redis__Fast_decode_reply(Redis__Fast self, redisReply* reply, int collect_errors) {
437     redis_fast_reply_t res = {NULL, NULL};
438 
439     switch (reply->type) {
440     case REDIS_REPLY_ERROR:
441         res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
442         break;
443     case REDIS_REPLY_STRING:
444     case REDIS_REPLY_STATUS:
445         res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
446         break;
447 
448     case REDIS_REPLY_INTEGER:
449         res.result = sv_2mortal(newSViv(reply->integer));
450         break;
451     case REDIS_REPLY_NIL:
452         res.result = &PL_sv_undef;
453         break;
454 
455     case REDIS_REPLY_ARRAY: {
456         AV* av = newAV();
457         size_t i;
458         res.result = sv_2mortal(newRV_noinc((SV*)av));
459 
460         for (i = 0; i < reply->elements; i++) {
461             redis_fast_reply_t elem = Redis__Fast_decode_reply(self, reply->element[i], collect_errors);
462             if(collect_errors) {
463                 AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
464                 if(elem.result) {
465                     av_push(elem_av, SvREFCNT_inc(elem.result));
466                 } else {
467                     av_push(elem_av, newSV(0));
468                 }
469                 if(elem.error) {
470                     av_push(elem_av, SvREFCNT_inc(elem.error));
471                 } else {
472                     av_push(elem_av, newSV(0));
473                 }
474                 av_push(av, newRV_inc((SV*)elem_av));
475             } else {
476                 if(elem.result) {
477                     av_push(av, SvREFCNT_inc(elem.result));
478                 } else {
479                     av_push(av, newSV(0));
480                 }
481                 if(elem.error && !res.error) {
482                     res.error = elem.error;
483                 }
484             }
485         }
486         break;
487     }
488     }
489 
490     return res;
491 }
492 
/*
 * Ask the user's reconnect_on_error callback whether an error reply should
 * trigger a reconnect.
 *
 * The callback is only consulted when `ret` carries an error, a callback is
 * configured, and the current time has passed next_reconnect_on_error_at
 * (a negative value never suppresses the call). It is invoked in scalar
 * context with (error, result-or-undef, command name) and its integer return
 * value is passed back. Returns 0 whenever the callback is not consulted.
 */
static int Redis__Fast_call_reconnect_on_error(Redis__Fast self, redis_fast_reply_t ret, const void *command_name, STRLEN command_length) {
    struct timeval now;
    double now_sec;
    int wants_reconnect = 0;

    if (ret.error == NULL || self->reconnect_on_error == NULL) {
        return 0;
    }

    gettimeofday(&now, NULL);
    now_sec = now.tv_sec + 1E-6 * now.tv_usec;
    if (!(self->next_reconnect_on_error_at < 0 ||
          self->next_reconnect_on_error_at < now_sec)) {
        /* still inside the suppression window */
        return 0;
    }

    {
        int returned;
        SV* arg_error;
        SV* arg_result;
        SV* arg_command;
        dSP;

        ENTER;
        SAVETMPS;

        arg_result = ret.result ? ret.result : &PL_sv_undef;
        arg_error = ret.error;
        arg_command = sv_2mortal(newSVpvn((const char*)command_name, command_length));

        PUSHMARK(SP);
        XPUSHs(arg_error);
        XPUSHs(arg_result);
        XPUSHs(arg_command);
        PUTBACK;

        returned = call_sv(self->reconnect_on_error, G_SCALAR);

        SPAGAIN;

        if (returned != 1) {
            croak("[BUG] retval count should be 1\n");
        }
        wants_reconnect = POPi;

        PUTBACK;
        FREETMPS;
        LEAVE;
    }

    return wants_reconnect;
}
543 
/*
 * hiredis reply callback for commands issued in synchronous mode.
 *
 *   reply present      : apply the flag masks and store the decoded reply in
 *                        the callback record.
 *   context being freed: release the callback record here.
 *   otherwise          : connection error; request a reconnect and store the
 *                        context's error string as the error.
 */
static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_sync_cb_t *record = (redis_fast_sync_cb_t*)privdata;

    DEBUG_MSG("%p", (void*)privdata);
    if (reply != NULL) {
        CUSTOM_DECODE decode = record->custom_decode
            ? record->custom_decode
            : Redis__Fast_decode_reply;

        self->flags = (self->flags | record->on_flags) & record->off_flags;
        record->ret = decode(self, (redisReply*)reply, record->collect_errors);
    } else if ((c->c.flags & REDIS_FREEING) != 0) {
        DEBUG_MSG("%s", "redis freeing");
        Safefree(record);
    } else {
        DEBUG_MSG("connect error: %s", c->errstr);
        self->need_reconnect = 1;
        record->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
        record->ret.result = NULL;
    }
    DEBUG_MSG("%s", "finish");
}
566 
/*
 * hiredis reply callback for commands issued with a Perl callback.
 *
 * With a reply: applies the flag masks, decodes the reply and calls the Perl
 * callback with (result, error), undef standing in for a missing value. If
 * reconnecting is enabled, no reconnect is pending and reconnect_on_error is
 * configured, the reply is decoded again and reconnect_on_error decides
 * whether need_reconnect gets set.
 *
 * Without a reply: calls the Perl callback with (undef, message), the message
 * being "redis freeing" while the context is being freed and the context's
 * error string otherwise.
 *
 * In both cases the reference to the Perl callback and the callback record
 * are released before returning.
 */
static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
    DEBUG_MSG("%p, %p", reply, privdata);
    if (reply) {
        CUSTOM_DECODE decode = cbt->custom_decode
            ? cbt->custom_decode
            : Redis__Fast_decode_reply;

        self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;

        {
            redis_fast_reply_t result;

            dSP;

            ENTER;
            SAVETMPS;

            result = decode(self, (redisReply*)reply, cbt->collect_errors);

            if(result.result == NULL) result.result = &PL_sv_undef;
            if(result.error == NULL) result.error = &PL_sv_undef;

            PUSHMARK(SP);
            XPUSHs(result.result);
            XPUSHs(result.error);
            PUTBACK;

            call_sv(cbt->cb, G_DISCARD);

            FREETMPS;
            LEAVE;
        }

        if (0 < self->reconnect && !self->need_reconnect
            // Avoid useless cost when reconnect_on_error is not set.
            && self->reconnect_on_error != NULL) {
            redis_fast_reply_t result;

            /* Decode again so reconnect_on_error sees values the user
             * callback could not have modified. Do it inside its own
             * temporaries scope: otherwise every reply handled during one
             * long wait would leave its mortal SVs behind until the
             * outermost caller cleans up. */
            ENTER;
            SAVETMPS;

            result = decode(self, (redisReply*)reply, cbt->collect_errors);
            self->need_reconnect = Redis__Fast_call_reconnect_on_error(
                self, result, cbt->command_name, cbt->command_length
            );

            FREETMPS;
            LEAVE;
        }
    } else {
        if (c->c.flags & REDIS_FREEING) {
             DEBUG_MSG("%s", "redis freeing");
        } else {
            DEBUG_MSG("connect error: %s", c->errstr);
        }

        {
            redis_fast_reply_t result;
            const char *msg;

            dSP;

            ENTER;
            SAVETMPS;

            result.result = &PL_sv_undef;
            if (c->c.flags & REDIS_FREEING) {
                msg = "redis freeing";
            } else {
                msg = c->errstr;
            }
            DEBUG_MSG("error: %s", msg);
            result.error = sv_2mortal(newSVpvn(msg, strlen(msg)));

            PUSHMARK(SP);
            XPUSHs(result.result);
            XPUSHs(result.error);
            PUTBACK;

            call_sv(cbt->cb, G_DISCARD);

            FREETMPS;
            LEAVE;
        }
    }

    SvREFCNT_dec(cbt->cb);
    Safefree(cbt);
}
661 
/*
 * hiredis callback for pub/sub traffic.
 *
 * With a reply: the first element names the message kind (an optional
 * leading 'p' marks the pattern variant). "subscribe"/"unsubscribe" replies
 * update is_subscriber from the third element and decrement expected_subs;
 * every other kind counts as a delivered message. The decoded reply is then
 * passed to the Perl callback as (result, error).
 *
 * The callback record (and its reference to the Perl callback) is released
 * after an "unsubscribe" reply or when no reply is supplied.
 */
static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
    int is_need_free = 0;
    Redis__Fast self = (Redis__Fast)c->data;
    redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
    redisReply* r = (redisReply*)reply;

    DEBUG_MSG("%s", "start");
    if(!cbt) {
        DEBUG_MSG("%s", "cbt is empty finished");
        return ;
    }

    if (r) {
        char* stype = r->element[0]->str;
        /* <ctype.h> functions need an unsigned char value; a plain char may
         * be negative. */
        int pvariant = (tolower((unsigned char)stype[0]) == 'p') ? 1 : 0;
        redis_fast_reply_t res;

        dSP;
        ENTER;
        SAVETMPS;

        res = Redis__Fast_decode_reply(self, r, 0);

        if (strcasecmp(stype+pvariant,"subscribe") == 0) {
            DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
            self->is_subscriber = r->element[2]->integer;
            self->expected_subs--;
        } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
            DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
            self->is_subscriber = r->element[2]->integer;
            is_need_free = 1;
            self->expected_subs--;
        } else {
            DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
            self->proccess_sub_count++;
        }

        if(res.result == NULL) res.result = &PL_sv_undef;
        if(res.error == NULL) res.error = &PL_sv_undef;

        PUSHMARK(SP);
        XPUSHs(res.result);
        XPUSHs(res.error);
        PUTBACK;

        call_sv(cbt->cb, G_DISCARD);

        FREETMPS;
        LEAVE;
    } else {
        DEBUG_MSG("connect error: %s", c->errstr);
        is_need_free = 1;
    }

    if(is_need_free) {
        // destroy private data
        DEBUG_MSG("destroy %p", (void*)cbt);   /* %p requires a void pointer */
        if(cbt->cb) {
            SvREFCNT_dec(cbt->cb);
            cbt->cb = NULL;
        }
        Safefree(cbt);
    }
    DEBUG_MSG("%s", "finish");
}
727 
/*
 * Send QUIT, ask hiredis to disconnect once outstanding replies are in, and
 * wait for that to happen. Does nothing when there is no connection.
 */
static void Redis__Fast_quit(Redis__Fast self) {
    redis_fast_sync_cb_t *cbt;
    int res;

    if(!self->ac) {
        return;
    }

    /* One zero-filled record (Newxz takes an element count, not a byte
     * size). Zero-filling also gives collect_errors and custom_decode
     * defined values, and leaves on_flags/off_flags at 0 on purpose: the
     * reply callback computes (flags | on_flags) & off_flags, so the QUIT
     * reply resets self->flags instead of corrupting it. */
    Newxz(cbt, 1, redis_fast_sync_cb_t);

    if(redisAsyncCommand(
           self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
           ) != REDIS_OK) {
        /* The command was not queued, so the reply callback will never see
         * this record; it is still ours to release. */
        Safefree(cbt);
        cbt = NULL;
    }
    redisAsyncDisconnect(self->ac);

    res = _wait_all_responses(self);
    if(res == WAIT_FOR_EVENT_OK) {
        DEBUG_MSG("%s", "wait_all_responses ok");
    } else {
        DEBUG_MSG("%s", "wait_all_responses not ok");
    }
    /* A record that received a reply or an error was left for us to free. */
    if(cbt != NULL && (cbt->ret.result || cbt->ret.error)) Safefree(cbt);
    DEBUG_MSG("%s", "finish");
}
757 
Redis__Fast_run_cmd(Redis__Fast self,int collect_errors,CUSTOM_DECODE custom_decode,SV * cb,int argc,const char ** argv,size_t * argvlen)758 static redis_fast_reply_t  Redis__Fast_run_cmd(Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
759     redis_fast_reply_t ret = {NULL, NULL};
760     int on_flags = 0, off_flags = ~0;
761 
762     DEBUG_MSG("start %s", argv[0]);
763 
764     DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
765     if(self->pid != getpid()) {
766         DEBUG_MSG("%s", "pid changed. create new connection..");
767         Redis__Fast_connect(self);
768     }
769 
770     if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
771         on_flags = FLAG_INSIDE_TRANSACTION;
772     } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
773               EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
774         off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
775     } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
776         on_flags = FLAG_INSIDE_WATCH;
777     } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
778         off_flags = ~FLAG_INSIDE_WATCH;
779     }
780 
781     if(cb) {
782         redis_fast_async_cb_t *cbt;
783         Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
784         cbt->cb = SvREFCNT_inc(cb);
785         cbt->custom_decode = custom_decode;
786         cbt->collect_errors = collect_errors;
787         cbt->on_flags = on_flags;
788         cbt->off_flags = off_flags;
789         cbt->command_name = argv[0];
790         cbt->command_length = argvlen[0];
791         redisAsyncCommandArgv(
792             self->ac, Redis__Fast_async_reply_cb, cbt,
793             argc, argv, argvlen
794             );
795         ret.result = sv_2mortal(newSViv(1));
796     } else {
797         redis_fast_sync_cb_t *cbt;
798         int i, cnt = (self->reconnect == 0 ? 1 : 2);
799         int res = WAIT_FOR_EVENT_OK;
800         for(i = 0; i < cnt; i++) {
801             Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
802             self->need_reconnect = 0;
803             cbt->ret.result = NULL;
804             cbt->ret.error = NULL;
805             cbt->custom_decode = custom_decode;
806             cbt->collect_errors = collect_errors;
807             cbt->on_flags = on_flags;
808             cbt->off_flags = off_flags;
809             DEBUG_MSG("%s", "send command in sync mode");
810             redisAsyncCommandArgv(
811                 self->ac, Redis__Fast_sync_reply_cb, cbt,
812                 argc, argv, argvlen
813                 );
814             DEBUG_MSG("%s", "waiting response");
815             res = _wait_all_responses(self);
816             if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
817                 int _need_reconnect = 0;
818                 if (1 < cnt - i) {
819                     _need_reconnect = Redis__Fast_call_reconnect_on_error(
820                         self, cbt->ret, argv[0], argvlen[0]
821                     );
822                     // Should be quit before reconnect
823                     if (_need_reconnect) {
824                         Redis__Fast_quit(self);
825                     }
826                 }
827                 if (!_need_reconnect) {
828                     ret = cbt->ret;
829                     if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
830                     DEBUG_MSG("finish %s", argv[0]);
831                     return ret;
832                 }
833             }
834 
835             if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
836 
837             if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
838                 char *msg = "reconnect disabled inside transaction or watch";
839                 DEBUG_MSG("error: %s", msg);
840                 ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
841                 return ret;
842             }
843 
844             Redis__Fast_reconnect(self);
845             if(!self->ac) {
846                 char *msg = "Not connected to any server";
847                 DEBUG_MSG("error: %s", msg);
848                 ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
849                 return ret;
850             }
851         }
852 
853         if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
854         // else destructor will release cbt
855 
856         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
857             snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
858             errno = EAGAIN;
859             DEBUG_MSG("error: %s", self->error);
860             ret.error = sv_2mortal(newSVpvn(self->error, strlen(self->error)));
861             return ret;
862         }
863         if(!self->ac) {
864             char *msg = "Not connected to any server";
865             DEBUG_MSG("error: %s", msg);
866             ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
867             return ret;
868         }
869     }
870     DEBUG_MSG("Finish %s", argv[0]);
871     return ret;
872 }
873 
Redis__Fast_keys_custom_decode(Redis__Fast self,redisReply * reply,int collect_errors)874 static redis_fast_reply_t Redis__Fast_keys_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
875     // TODO: Support redis <= 1.2.6
876     return Redis__Fast_decode_reply(self, reply, collect_errors);
877 }
878 
Redis__Fast_info_custom_decode(Redis__Fast self,redisReply * reply,int collect_errors)879 static redis_fast_reply_t Redis__Fast_info_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
880     redis_fast_reply_t res = {NULL, NULL};
881 
882     if(reply->type == REDIS_REPLY_STRING ||
883        reply->type == REDIS_REPLY_STATUS) {
884 
885         HV* hv = newHV();
886         char* str = reply->str;
887         size_t len = reply->len;
888         res.result = sv_2mortal(newRV_noinc((SV*)hv));
889 
890         while(len != 0) {
891             const char* line = (char*)memchr(str, '\r', len);
892             const char* sep;
893             size_t linelen;
894             if(line == NULL) {
895                 linelen = len;
896             } else {
897                 linelen = line - str;
898             }
899             sep = (char*)memchr(str, ':', linelen);
900             if(str[0] != '#' && sep != NULL) {
901                 SV* val;
902                 SV** ret;
903                 size_t keylen;
904                 keylen = sep - str;
905                 val = sv_2mortal(newSVpvn(sep + 1, linelen - keylen - 1));
906                 ret = hv_store(hv, str, keylen, SvREFCNT_inc(val), 0);
907                 if (ret == NULL) {
908                     SvREFCNT_dec(val);
909                     croak("failed to hv_store");
910                 }
911             }
912             if(line == NULL) {
913                 break;
914             } else {
915                 len -= linelen + 2;
916                 str += linelen + 2;
917             }
918         }
919     } else {
920         res = Redis__Fast_decode_reply(self, reply, collect_errors);
921     }
922 
923     return res;
924 }
925 
926 MODULE = Redis::Fast		PACKAGE = Redis::Fast
927 
SV*
_new(char* cls);
PREINIT:
redis_fast_t* self;
CODE:
{
    // Constructor: allocates a zero-filled redis_fast_t plus its error
    // buffer and returns it as a mortal reference blessed into cls.
    //
    // Newxz takes an element count, not a byte count, so exactly one
    // struct is requested here.
    Newxz(self, 1, redis_fast_t);
    DEBUG_MSG("%s", "start");
    // The error buffer is released with free() in DESTROY, so it has to
    // come from malloc() rather than from the Perl allocator.
    self->error = (char*)malloc(MAX_ERROR_SIZE);
    if(self->error == NULL) {
        Safefree(self);
        croak("failed to allocate memory for the error buffer");
    }
    self->error[0] = '\0';
    self->reconnect_on_error = NULL;
    self->next_reconnect_on_error_at = -1;
    ST(0) = sv_newmortal();
    sv_setref_pv(ST(0), cls, (void*)self);
    DEBUG_MSG("return %p", (void*)ST(0));
    XSRETURN(1);
}
OUTPUT:
    RETVAL
946 
double
__set_reconnect(Redis::Fast self, double val)
CODE:
{
    // Store val in self->reconnect and hand the stored value back.
    self->reconnect = val;
    RETVAL = self->reconnect;
}
OUTPUT:
    RETVAL
955 
956 
double
__get_reconnect(Redis::Fast self)
CODE:
{
    // Read back the current value of self->reconnect.
    const double current = self->reconnect;
    RETVAL = current;
}
OUTPUT:
    RETVAL
965 
966 
int
__set_every(Redis::Fast self, int val)
CODE:
{
    // Store val in self->every and hand the stored value back.
    self->every = val;
    RETVAL = self->every;
}
OUTPUT:
    RETVAL
975 
976 
int
__get_every(Redis::Fast self)
CODE:
{
    // Read back the current value of self->every.
    const int current = self->every;
    RETVAL = current;
}
OUTPUT:
    RETVAL
985 
int
__set_debug(Redis::Fast self, int val)
CODE:
{
    // Store val in self->debug (the flag tested by DEBUG_MSG) and
    // hand the stored value back.
    self->debug = val;
    RETVAL = self->debug;
}
OUTPUT:
    RETVAL
994 
double
__set_cnx_timeout(Redis::Fast self, double val)
CODE:
{
    // Store val in self->cnx_timeout and hand the stored value back.
    self->cnx_timeout = val;
    RETVAL = self->cnx_timeout;
}
OUTPUT:
    RETVAL
1003 
double
__get_cnx_timeout(Redis::Fast self)
CODE:
{
    // Read back the current value of self->cnx_timeout.
    const double current = self->cnx_timeout;
    RETVAL = current;
}
OUTPUT:
    RETVAL
1012 
1013 
double
__set_read_timeout(Redis::Fast self, double val)
CODE:
{
    // Store val in self->read_timeout and hand the stored value back.
    self->read_timeout = val;
    RETVAL = self->read_timeout;
}
OUTPUT:
    RETVAL
1022 
double
__get_read_timeout(Redis::Fast self)
CODE:
{
    // Read back the current value of self->read_timeout.
    const double current = self->read_timeout;
    RETVAL = current;
}
OUTPUT:
    RETVAL
1031 
1032 
double
__set_write_timeout(Redis::Fast self, double val)
CODE:
{
    // Store val in self->write_timeout and hand the stored value back.
    self->write_timeout = val;
    RETVAL = self->write_timeout;
}
OUTPUT:
    RETVAL
1041 
double
__get_write_timeout(Redis::Fast self)
CODE:
{
    // Read back the current value of self->write_timeout.
    const double current = self->write_timeout;
    RETVAL = current;
}
OUTPUT:
    RETVAL
1050 
1051 
int
__set_current_database(Redis::Fast self, int val)
CODE:
{
    // Store val in self->current_database and hand the stored value back.
    self->current_database = val;
    RETVAL = self->current_database;
}
OUTPUT:
    RETVAL
1060 
1061 
int
__get_current_database(Redis::Fast self)
CODE:
{
    // Read back the current value of self->current_database.
    const int current = self->current_database;
    RETVAL = current;
}
OUTPUT:
    RETVAL
1070 
1071 
int
__sock(Redis::Fast self)
CODE:
{
    // File descriptor of the hiredis context, or 0 when there is no
    // async context at all.
    if(self->ac) {
        RETVAL = self->ac->c.fd;
    } else {
        RETVAL = 0;
    }
}
OUTPUT:
    RETVAL
1080 
1081 
int
__get_port(Redis::Fast self)
CODE:
{
    // Returns the sin_port field reported by getsockname() for the
    // connection's socket.  The value is passed through exactly as
    // stored in the structure (no byte-order conversion), as before.
    //
    // Returns 0 when there is no connection or getsockname() fails,
    // instead of dereferencing a NULL context or reading an
    // uninitialised address structure.
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);

    memset(&addr, 0, sizeof(addr));
    RETVAL = 0;
    if(self->ac && getsockname(self->ac->c.fd, (struct sockaddr*)&addr, &len) == 0) {
        RETVAL = addr.sin_port;
    }
}
OUTPUT:
    RETVAL
1094 
1095 
void
__set_on_connect(Redis::Fast self, SV* func)
CODE:
{
    // Keeps a counted reference to func in self->on_connect.
    //
    // The previously stored SV (if any) is released so that calling the
    // setter repeatedly does not leak.  The new reference is taken first
    // so that re-installing the same SV cannot free it in between.
    SV* previous = self->on_connect;
    self->on_connect = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1102 
void
__set_on_build_sock(Redis::Fast self, SV* func)
CODE:
{
    // Keeps a counted reference to func in self->on_build_sock.
    //
    // The previously stored SV (if any) is released so that calling the
    // setter repeatedly does not leak.  The new reference is taken first
    // so that re-installing the same SV cannot free it in between.
    SV* previous = self->on_build_sock;
    self->on_build_sock = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1109 
void
__set_data(Redis::Fast self, SV* data)
CODE:
{
    // Keeps a counted reference to data in self->data.
    //
    // The previously stored SV (if any) is released so that calling the
    // setter repeatedly does not leak.  The new reference is taken first
    // so that re-installing the same SV cannot free it in between.
    SV* previous = self->data;
    self->data = SvREFCNT_inc(data);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1116 
void
__get_data(Redis::Fast self)
CODE:
{
    // Returns the SV stored by __set_data.  When nothing has been
    // stored yet self->data is NULL, which must never be placed on the
    // Perl stack, so undef is returned in that case.
    ST(0) = self->data ? self->data : &PL_sv_undef;
    XSRETURN(1);
}
1124 
void
__set_reconnect_on_error(Redis::Fast self, SV* func)
CODE:
{
    // Keeps a counted reference to func in self->reconnect_on_error.
    //
    // The previously stored SV (if any) is released so that calling the
    // setter repeatedly does not leak.  The new reference is taken first
    // so that re-installing the same SV cannot free it in between.
    SV* previous = self->reconnect_on_error;
    self->reconnect_on_error = SvREFCNT_inc(func);
    if(previous) {
        SvREFCNT_dec(previous);
    }
}
1131 
double
__set_next_reconnect_on_error_at(Redis::Fast self, double val)
CODE:
{
    // Values greater than -1 are treated as an offset from the current
    // wall-clock time (seconds with microsecond fraction); anything else
    // is stored as given.  The stored value is also returned.
    double deadline = val;

    if(val > -1) {
        struct timeval now;
        gettimeofday(&now, NULL);
        deadline = val + (now.tv_sec + 1E-6 * now.tv_usec);
    }

    self->next_reconnect_on_error_at = deadline;
    RETVAL = deadline;
}
OUTPUT:
    RETVAL
1149 
1150 void
is_subscriber(Redis::Fast self)1151 is_subscriber(Redis::Fast self)
1152 CODE:
1153 {
1154     ST(0) = sv_2mortal(newSViv(self->is_subscriber));
1155     XSRETURN(1);
1156 }
1157 
1158 
void
DESTROY(Redis::Fast self);
CODE:
{
    // Destructor: tears down the hiredis context, frees the malloc()ed
    // strings, drops the references held on callback/data SVs, and
    // finally releases the struct itself.
    DEBUG_MSG("%s", "start");
    if (self->ac) {
        DEBUG_MSG("%s", "free ac");
        // NOTE(review): _wait_all_responses runs after redisAsyncFree;
        // whether self->ac is cleared in between depends on code outside
        // this block - confirm.
        redisAsyncFree(self->ac);
        _wait_all_responses(self);
    }

    if(self->hostname) {
        DEBUG_MSG("%s", "free hostname");
        free(self->hostname);
        self->hostname = NULL;
    }

    if(self->path) {
        DEBUG_MSG("%s", "free path");
        free(self->path);
        self->path = NULL;
    }

    if(self->error) {
        DEBUG_MSG("%s", "free error");
        free(self->error);
        self->error = NULL;
    }

    if(self->on_connect) {
        DEBUG_MSG("%s", "free on_connect");
        SvREFCNT_dec(self->on_connect);
        self->on_connect = NULL;
    }

    if(self->on_build_sock) {
        DEBUG_MSG("%s", "free on_build_sock");
        SvREFCNT_dec(self->on_build_sock);
        self->on_build_sock = NULL;
    }

    if(self->data) {
        DEBUG_MSG("%s", "free data");
        SvREFCNT_dec(self->data);
        self->data = NULL;
    }

    if(self->reconnect_on_error) {
        DEBUG_MSG("%s", "free reconnect_on_error");
        SvREFCNT_dec(self->reconnect_on_error);
        self->reconnect_on_error = NULL;
    }

    // DEBUG_MSG reads self->debug, so the final message has to be
    // emitted before the struct is released.
    DEBUG_MSG("%s", "finish");
    Safefree(self);
}
1215 
1216 
void
__connection_info(Redis::Fast self, char* hostname, int port = 6379)
CODE:
{
    // Records a hostname/port target: any previously stored hostname or
    // unix socket path is discarded, then a private copy of hostname
    // and the port number are stored.  Croaks if the copy cannot be
    // allocated.
    if(self->hostname) {
        free(self->hostname);
        self->hostname = NULL;
    }

    if(self->path) {
        free(self->path);
        self->path = NULL;
    }

    if(hostname) {
        // Released with free() elsewhere, so allocate with malloc().
        const size_t size = strlen(hostname) + 1;
        self->hostname = (char*)malloc(size);
        if(self->hostname == NULL) {
            croak("failed to allocate memory for hostname");
        }
        memcpy(self->hostname, hostname, size);
    }

    self->port = port;
}
1238 
void
__connection_info_unix(Redis::Fast self, char* path)
CODE:
{
    // Records a unix socket target: any previously stored hostname or
    // path is discarded, then a private copy of path is stored.
    // Croaks if the copy cannot be allocated.
    if(self->hostname) {
        free(self->hostname);
        self->hostname = NULL;
    }

    if(self->path) {
        free(self->path);
        self->path = NULL;
    }

    if(path) {
        // Released with free() elsewhere, so allocate with malloc().
        const size_t size = strlen(path) + 1;
        self->path = (char*)malloc(size);
        if(self->path == NULL) {
            croak("failed to allocate memory for path");
        }
        memcpy(self->path, path, size);
    }
}
1258 
1259 
void
connect(Redis::Fast self)
CODE:
{
    // Perl-visible entry point; all of the work is done by the C helper.
    Redis__Fast_connect(self);
}
1266 
void
wait_all_responses(Redis::Fast self)
CODE:
{
    // Waits for the outstanding replies and croaks unless the wait
    // finished with WAIT_FOR_EVENT_OK.  Afterwards, if a reconnect was
    // requested and reconnecting is enabled (self->reconnect > 0), the
    // connection is quit and re-established.
    if(_wait_all_responses(self) != WAIT_FOR_EVENT_OK) {
        croak("Error while reading from Redis server");
    }

    if(self->need_reconnect && self->reconnect > 0) {
        // Should be quit before reconnect
        Redis__Fast_quit(self);
        Redis__Fast_reconnect(self);
        self->need_reconnect = 0;
    }
}
1283 
void
wait_one_response(Redis::Fast self)
CODE:
{
    // NOTE(review): despite its name this calls the same helper as
    // wait_all_responses - confirm that is intended.
    const int status = _wait_all_responses(self);
    const int should_reconnect = (0 < self->reconnect) && self->need_reconnect;

    if(status != WAIT_FOR_EVENT_OK) {
        croak("Error while reading from Redis server");
    }

    if(should_reconnect) {
        // Should be quit before reconnect
        Redis__Fast_quit(self);
        Redis__Fast_reconnect(self);
        self->need_reconnect = 0;
    }
}
1300 
void
__std_cmd(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i, collect_errors;
CODE:
{
    // Generic command runner.  The arguments after self are the command
    // words; a trailing code reference, if present, is taken as the
    // callback.  Every word is downgraded from UTF-8 in place and the
    // call croaks if one cannot be.  Returns the pair (result, error),
    // with undef standing in for whichever is missing.
    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 2;
    } else {
        cb = NULL;
        argc = items - 1;
    }
    // argv[0] is inspected below, so at least the command name is
    // required.
    if(argc < 1) {
        croak("no command given");
    }

    // Newx takes an element count, not a byte count.
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    for (i = 0; i < argc; i++) {
        if(!sv_utf8_downgrade(ST(i + 1), 1)) {
            // croak does not return: release the buffers first.
            Safefree(argv);
            Safefree(argvlen);
            croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
        }
        argv[i] = SvPV(ST(i + 1), len);
        argvlen[i] = len;
    }

    // Errors are collected only for EXEC issued with a callback.
    collect_errors = 0;
    if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
        collect_errors = 1;

    ret = Redis__Fast_run_cmd(self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);

    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1348 
1349 
void
__quit(Redis::Fast self)
CODE:
{
    // Returns 1 after quitting an existing connection, or an empty list
    // when there was no connection to quit.
    DEBUG_MSG("%s", "start QUIT");
    if(!self->ac) {
        DEBUG_MSG("%s", "finish. there is no connection.");
        XSRETURN(0);
    }

    Redis__Fast_quit(self);
    ST(0) = sv_2mortal(newSViv(1));
    XSRETURN(1);
}
1364 
1365 
void
__shutdown(Redis::Fast self)
CODE:
{
    // Queues SHUTDOWN, asks hiredis to disconnect, waits for pending
    // replies and marks the object as not connected.  Returns 1 in that
    // case, or an empty list when there was no connection.
    DEBUG_MSG("%s", "start SHUTDOWN");
    if(!self->ac) {
        DEBUG_MSG("%s", "redis server has alread shutdown");
        XSRETURN(0);
    }

    redisAsyncCommand(self->ac, NULL, NULL, "SHUTDOWN");
    redisAsyncDisconnect(self->ac);
    _wait_all_responses(self);
    self->is_connected = 0;

    ST(0) = sv_2mortal(newSViv(1));
    XSRETURN(1);
}
1385 
1386 
void
__keys(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
CODE:
{
    // Runs KEYS with the given arguments; a trailing code reference, if
    // present, is taken as the callback.  Returns the pair
    // (result, error), with undef standing in for whichever is missing.
    //
    // Two values are returned even when only self was passed in, so make
    // sure the stack has room for them.
    EXTEND(SP, 2);

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 1;
    } else {
        cb = NULL;
        argc = items;
    }
    // Newx takes an element count, not a byte count.  Slot 0 holds the
    // command name, so ST(i) lines up with argv[i] for i >= 1.
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    argv[0] = "KEYS";
    argvlen[0] = 4;
    for (i = 1; i < argc; i++) {
        argv[i] = SvPV(ST(i), len);
        argvlen[i] = len;
    }

    ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1428 
1429 
void
__info(Redis::Fast self, ...)
PREINIT:
    redis_fast_reply_t ret;
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
CODE:
{
    // Runs INFO with the given arguments; a trailing code reference, if
    // present, is taken as the callback.  Returns the pair
    // (result, error), with undef standing in for whichever is missing.
    // Unlike the other command runners, a missing connection is
    // reported through the error slot rather than by croaking.
    //
    // Two values are returned even when only self was passed in, so make
    // sure the stack has room for them.
    EXTEND(SP, 2);

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        char *msg = "Not connected to any server";
        ST(0) = &PL_sv_undef;
        ST(1) = sv_2mortal(newSVpvn(msg, strlen(msg)));
        XSRETURN(2);
    }

    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 1;
    } else {
        cb = NULL;
        argc = items;
    }
    // Newx takes an element count, not a byte count.  Slot 0 holds the
    // command name, so ST(i) lines up with argv[i] for i >= 1.
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    argv[0] = "INFO";
    argvlen[0] = 4;
    for (i = 1; i < argc; i++) {
        argv[i] = SvPV(ST(i), len);
        argvlen[i] = len;
    }

    ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
    Safefree(argv);
    Safefree(argvlen);

    ST(0) = ret.result ? ret.result : &PL_sv_undef;
    ST(1) = ret.error ? ret.error : &PL_sv_undef;
    XSRETURN(2);
}
1474 
1475 
void
__send_subscription_cmd(Redis::Fast self, ...)
PREINIT:
    SV* cb;
    char** argv;
    size_t* argvlen;
    STRLEN len;
    int argc, i;
    redis_fast_subscribe_cb_t* cbt;
    int pvariant;
CODE:
{
    // Sends a (p)subscribe / (p)unsubscribe style command.  The
    // arguments after self are the command words; a trailing code
    // reference, if present, is taken as the callback.  After sending,
    // events are processed until one confirmation per channel argument
    // has arrived.  If that fails and reconnecting is enabled, the
    // command is sent once more after a reconnect.  Returns nothing.
    int cnt = (self->reconnect == 0 ? 1 : 2);
    int is_unsubscribe;

    DEBUG_MSG("%s", "start");

    Redis__Fast_reconnect(self);
    if(!self->ac) {
        croak("Not connected to any server");
    }

    if(!self->is_subscriber) {
        _wait_all_responses(self);
    }
    cb = ST(items - 1);
    if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
        argc = items - 2;
    } else {
        cb = NULL;
        argc = items - 1;
    }
    // argv[0] is inspected below, so at least the command name is
    // required.
    if(argc < 1) {
        croak("no subscription command given");
    }

    // Newx takes an element count, not a byte count.
    Newx(argv, argc, char*);
    Newx(argvlen, argc, size_t);

    for (i = 0; i < argc; i++) {
        argv[i] = SvPV(ST(i+1), len);
        argvlen[i] = len;
        DEBUG_MSG("argv[%d] = %s", i, argv[i]);
    }

    // The command name does not change between attempts, so classify it
    // once.  A leading 'p'/'P' is skipped so that both the plain and
    // the pattern variant match.  The ctype functions need a value
    // representable as unsigned char.
    pvariant = tolower((unsigned char)argv[0][0]) == 'p';
    is_unsubscribe = strcasecmp(argv[0]+pvariant,"unsubscribe") == 0;

    for(i = 0; i < cnt; i++) {
        if (!is_unsubscribe) {
            DEBUG_MSG("%s", "command is not unsubscribe");
            Newx(cbt, 1, redis_fast_subscribe_cb_t);
            cbt->self = self;
            cbt->cb = SvREFCNT_inc(cb);
        } else {
            DEBUG_MSG("%s", "command is unsubscribe");
            cbt = NULL;
        }
        redisAsyncCommandArgv(
            self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
            argc, (const char**)argv, argvlen
            );
        // One confirmation is expected per argument after the command
        // name.
        self->expected_subs = argc - 1;
        while(self->expected_subs > 0 && wait_for_event(self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
        if(self->expected_subs == 0) break;
        Redis__Fast_reconnect(self);
        if(!self->ac) {
            Safefree(argv);
            Safefree(argvlen);
            croak("Not connected to any server");
        }
    }

    Safefree(argv);
    Safefree(argvlen);
    DEBUG_MSG("%s", "finish");
    XSRETURN(0);
}
1547 
void
wait_for_messages(Redis::Fast self, double timeout = -1)
CODE:
{
    // Processes events until a read/write timeout occurs and returns
    // self->proccess_sub_count, which is reset to 0 on entry.  When the
    // wait ends for any other reason a reconnect is attempted; with
    // reconnecting enabled the wait is then retried once.  Croaks if
    // there is no connection after a reconnect attempt, or if the final
    // wait ended with an exception.
    const int max_attempts = (self->reconnect == 0) ? 1 : 2;
    int attempt;
    int res = WAIT_FOR_EVENT_OK;

    DEBUG_MSG("%s", "start");
    self->proccess_sub_count = 0;

    for(attempt = 0; attempt < max_attempts; attempt++) {
        // Keep pumping events for as long as they are handled cleanly.
        do {
            res = wait_for_event(self, timeout, timeout);
        } while(res == WAIT_FOR_EVENT_OK);

        if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
            break;
        }

        Redis__Fast_reconnect(self);
        if(!self->ac) {
            croak("Not connected to any server");
        }
    }

    if(res == WAIT_FOR_EVENT_EXCEPTION) {
        if(!self->ac) {
            DEBUG_MSG("%s", "Connection not found");
            croak("EOF from server");
        }
        if(self->ac->c.err == REDIS_ERR_EOF) {
            DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
            croak("EOF from server");
        }
        // Any other hiredis error is reported with its own message.
        DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
        snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
        croak("%s", self->error);
    }

    ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
    DEBUG_MSG("finish with %d", res);
    XSRETURN(1);
}
1581 
void
__wait_for_event(Redis::Fast self, double timeout = -1)
CODE:
{
    // Runs a single wait_for_event pass, using timeout for both the
    // read and the write limit.  The outcome is deliberately discarded
    // and nothing is returned to Perl.
    DEBUG_MSG("%s", "start");
    (void)wait_for_event(self, timeout, timeout);
    DEBUG_MSG("%s", "finish");
    XSRETURN(0);
}
1591