1 /*
2  * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  *
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  *   * Redistributions of source code must retain the above copyright notice,
11  *     this list of conditions and the following disclaimer.
12  *   * Redistributions in binary form must reproduce the above copyright
13  *     notice, this list of conditions and the following disclaimer in the
14  *     documentation and/or other materials provided with the distribution.
15  *   * Neither the name of Redis nor the names of its contributors may be used
16  *     to endorse or promote products derived from this software without
17  *     specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include "fmacros.h"
33 #include "alloc.h"
34 #include <stdlib.h>
35 #include <string.h>
36 #ifndef _MSC_VER
37 #include <strings.h>
38 #endif
39 #include <assert.h>
40 #include <ctype.h>
41 #include <errno.h>
42 #include "async.h"
43 #include "net.h"
44 #include "dict.c"
45 #include "sds.h"
46 #include "win32.h"
47 
48 #include "async_private.h"
49 
50 /* Forward declarations of hiredis.c functions */
51 int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
52 void __redisSetError(redisContext *c, int type, const char *str);
53 
54 /* Functions managing dictionary of callbacks for pub/sub. */
callbackHash(const void * key)55 static unsigned int callbackHash(const void *key) {
56     return dictGenHashFunction((const unsigned char *)key,
57                                sdslen((const sds)key));
58 }
59 
callbackValDup(void * privdata,const void * src)60 static void *callbackValDup(void *privdata, const void *src) {
61     ((void) privdata);
62     redisCallback *dup;
63 
64     dup = hi_malloc(sizeof(*dup));
65     if (dup == NULL)
66         return NULL;
67 
68     memcpy(dup,src,sizeof(*dup));
69     return dup;
70 }
71 
callbackKeyCompare(void * privdata,const void * key1,const void * key2)72 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
73     int l1, l2;
74     ((void) privdata);
75 
76     l1 = sdslen((const sds)key1);
77     l2 = sdslen((const sds)key2);
78     if (l1 != l2) return 0;
79     return memcmp(key1,key2,l1) == 0;
80 }
81 
callbackKeyDestructor(void * privdata,void * key)82 static void callbackKeyDestructor(void *privdata, void *key) {
83     ((void) privdata);
84     sdsfree((sds)key);
85 }
86 
callbackValDestructor(void * privdata,void * val)87 static void callbackValDestructor(void *privdata, void *val) {
88     ((void) privdata);
89     hi_free(val);
90 }
91 
92 static dictType callbackDict = {
93     callbackHash,
94     NULL,
95     callbackValDup,
96     callbackKeyCompare,
97     callbackKeyDestructor,
98     callbackValDestructor
99 };
100 
redisAsyncInitialize(redisContext * c)101 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
102     redisAsyncContext *ac;
103     dict *channels = NULL, *patterns = NULL;
104 
105     channels = dictCreate(&callbackDict,NULL);
106     if (channels == NULL)
107         goto oom;
108 
109     patterns = dictCreate(&callbackDict,NULL);
110     if (patterns == NULL)
111         goto oom;
112 
113     ac = hi_realloc(c,sizeof(redisAsyncContext));
114     if (ac == NULL)
115         goto oom;
116 
117     c = &(ac->c);
118 
119     /* The regular connect functions will always set the flag REDIS_CONNECTED.
120      * For the async API, we want to wait until the first write event is
121      * received up before setting this flag, so reset it here. */
122     c->flags &= ~REDIS_CONNECTED;
123 
124     ac->err = 0;
125     ac->errstr = NULL;
126     ac->data = NULL;
127     ac->dataCleanup = NULL;
128 
129     ac->ev.data = NULL;
130     ac->ev.addRead = NULL;
131     ac->ev.delRead = NULL;
132     ac->ev.addWrite = NULL;
133     ac->ev.delWrite = NULL;
134     ac->ev.cleanup = NULL;
135     ac->ev.scheduleTimer = NULL;
136 
137     ac->onConnect = NULL;
138     ac->onDisconnect = NULL;
139 
140     ac->replies.head = NULL;
141     ac->replies.tail = NULL;
142     ac->sub.invalid.head = NULL;
143     ac->sub.invalid.tail = NULL;
144     ac->sub.channels = channels;
145     ac->sub.patterns = patterns;
146 
147     return ac;
148 oom:
149     if (channels) dictRelease(channels);
150     if (patterns) dictRelease(patterns);
151     return NULL;
152 }
153 
154 /* We want the error field to be accessible directly instead of requiring
155  * an indirection to the redisContext struct. */
__redisAsyncCopyError(redisAsyncContext * ac)156 static void __redisAsyncCopyError(redisAsyncContext *ac) {
157     if (!ac)
158         return;
159 
160     redisContext *c = &(ac->c);
161     ac->err = c->err;
162     ac->errstr = c->errstr;
163 }
164 
redisAsyncConnectWithOptions(const redisOptions * options)165 redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
166     redisOptions myOptions = *options;
167     redisContext *c;
168     redisAsyncContext *ac;
169 
170     /* Clear any erroneously set sync callback and flag that we don't want to
171      * use freeReplyObject by default. */
172     myOptions.push_cb = NULL;
173     myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
174 
175     myOptions.options |= REDIS_OPT_NONBLOCK;
176     c = redisConnectWithOptions(&myOptions);
177     if (c == NULL) {
178         return NULL;
179     }
180 
181     ac = redisAsyncInitialize(c);
182     if (ac == NULL) {
183         redisFree(c);
184         return NULL;
185     }
186 
187     /* Set any configured async push handler */
188     redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
189 
190     __redisAsyncCopyError(ac);
191     return ac;
192 }
193 
redisAsyncConnect(const char * ip,int port)194 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
195     redisOptions options = {0};
196     REDIS_OPTIONS_SET_TCP(&options, ip, port);
197     return redisAsyncConnectWithOptions(&options);
198 }
199 
redisAsyncConnectBind(const char * ip,int port,const char * source_addr)200 redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
201                                          const char *source_addr) {
202     redisOptions options = {0};
203     REDIS_OPTIONS_SET_TCP(&options, ip, port);
204     options.endpoint.tcp.source_addr = source_addr;
205     return redisAsyncConnectWithOptions(&options);
206 }
207 
redisAsyncConnectBindWithReuse(const char * ip,int port,const char * source_addr)208 redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
209                                                   const char *source_addr) {
210     redisOptions options = {0};
211     REDIS_OPTIONS_SET_TCP(&options, ip, port);
212     options.options |= REDIS_OPT_REUSEADDR;
213     options.endpoint.tcp.source_addr = source_addr;
214     return redisAsyncConnectWithOptions(&options);
215 }
216 
redisAsyncConnectUnix(const char * path)217 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
218     redisOptions options = {0};
219     REDIS_OPTIONS_SET_UNIX(&options, path);
220     return redisAsyncConnectWithOptions(&options);
221 }
222 
redisAsyncSetConnectCallback(redisAsyncContext * ac,redisConnectCallback * fn)223 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
224     if (ac->onConnect == NULL) {
225         ac->onConnect = fn;
226 
227         /* The common way to detect an established connection is to wait for
228          * the first write event to be fired. This assumes the related event
229          * library functions are already set. */
230         _EL_ADD_WRITE(ac);
231         return REDIS_OK;
232     }
233     return REDIS_ERR;
234 }
235 
redisAsyncSetDisconnectCallback(redisAsyncContext * ac,redisDisconnectCallback * fn)236 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
237     if (ac->onDisconnect == NULL) {
238         ac->onDisconnect = fn;
239         return REDIS_OK;
240     }
241     return REDIS_ERR;
242 }
243 
244 /* Helper functions to push/shift callbacks */
__redisPushCallback(redisCallbackList * list,redisCallback * source)245 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
246     redisCallback *cb;
247 
248     /* Copy callback from stack to heap */
249     cb = hi_malloc(sizeof(*cb));
250     if (cb == NULL)
251         return REDIS_ERR_OOM;
252 
253     if (source != NULL) {
254         memcpy(cb,source,sizeof(*cb));
255         cb->next = NULL;
256     }
257 
258     /* Store callback in list */
259     if (list->head == NULL)
260         list->head = cb;
261     if (list->tail != NULL)
262         list->tail->next = cb;
263     list->tail = cb;
264     return REDIS_OK;
265 }
266 
__redisShiftCallback(redisCallbackList * list,redisCallback * target)267 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
268     redisCallback *cb = list->head;
269     if (cb != NULL) {
270         list->head = cb->next;
271         if (cb == list->tail)
272             list->tail = NULL;
273 
274         /* Copy callback from heap to stack */
275         if (target != NULL)
276             memcpy(target,cb,sizeof(*cb));
277         hi_free(cb);
278         return REDIS_OK;
279     }
280     return REDIS_ERR;
281 }
282 
__redisRunCallback(redisAsyncContext * ac,redisCallback * cb,redisReply * reply)283 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
284     redisContext *c = &(ac->c);
285     if (cb->fn != NULL) {
286         c->flags |= REDIS_IN_CALLBACK;
287         cb->fn(ac,reply,cb->privdata);
288         c->flags &= ~REDIS_IN_CALLBACK;
289     }
290 }
291 
__redisRunPushCallback(redisAsyncContext * ac,redisReply * reply)292 static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
293     if (ac->push_cb != NULL) {
294         ac->c.flags |= REDIS_IN_CALLBACK;
295         ac->push_cb(ac, reply);
296         ac->c.flags &= ~REDIS_IN_CALLBACK;
297     }
298 }
299 
300 /* Helper function to free the context. */
__redisAsyncFree(redisAsyncContext * ac)301 static void __redisAsyncFree(redisAsyncContext *ac) {
302     redisContext *c = &(ac->c);
303     redisCallback cb;
304     dictIterator *it;
305     dictEntry *de;
306 
307     /* Execute pending callbacks with NULL reply. */
308     while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
309         __redisRunCallback(ac,&cb,NULL);
310 
311     /* Execute callbacks for invalid commands */
312     while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
313         __redisRunCallback(ac,&cb,NULL);
314 
315     /* Run subscription callbacks with NULL reply */
316     if (ac->sub.channels) {
317         it = dictGetIterator(ac->sub.channels);
318         if (it != NULL) {
319             while ((de = dictNext(it)) != NULL)
320                 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
321             dictReleaseIterator(it);
322         }
323 
324         dictRelease(ac->sub.channels);
325     }
326 
327     if (ac->sub.patterns) {
328         it = dictGetIterator(ac->sub.patterns);
329         if (it != NULL) {
330             while ((de = dictNext(it)) != NULL)
331                 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
332             dictReleaseIterator(it);
333         }
334 
335         dictRelease(ac->sub.patterns);
336     }
337 
338     /* Signal event lib to clean up */
339     _EL_CLEANUP(ac);
340 
341     /* Execute disconnect callback. When redisAsyncFree() initiated destroying
342      * this context, the status will always be REDIS_OK. */
343     if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
344         if (c->flags & REDIS_FREEING) {
345             ac->onDisconnect(ac,REDIS_OK);
346         } else {
347             ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
348         }
349     }
350 
351     if (ac->dataCleanup) {
352         ac->dataCleanup(ac->data);
353     }
354 
355     /* Cleanup self */
356     redisFree(c);
357 }
358 
359 /* Free the async context. When this function is called from a callback,
360  * control needs to be returned to redisProcessCallbacks() before actual
361  * free'ing. To do so, a flag is set on the context which is picked up by
362  * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
redisAsyncFree(redisAsyncContext * ac)363 void redisAsyncFree(redisAsyncContext *ac) {
364     redisContext *c = &(ac->c);
365     c->flags |= REDIS_FREEING;
366     if (!(c->flags & REDIS_IN_CALLBACK))
367         __redisAsyncFree(ac);
368 }
369 
370 /* Helper function to make the disconnect happen and clean up. */
__redisAsyncDisconnect(redisAsyncContext * ac)371 void __redisAsyncDisconnect(redisAsyncContext *ac) {
372     redisContext *c = &(ac->c);
373 
374     /* Make sure error is accessible if there is any */
375     __redisAsyncCopyError(ac);
376 
377     if (ac->err == 0) {
378         /* For clean disconnects, there should be no pending callbacks. */
379         int ret = __redisShiftCallback(&ac->replies,NULL);
380         assert(ret == REDIS_ERR);
381     } else {
382         /* Disconnection is caused by an error, make sure that pending
383          * callbacks cannot call new commands. */
384         c->flags |= REDIS_DISCONNECTING;
385     }
386 
387     /* cleanup event library on disconnect.
388      * this is safe to call multiple times */
389     _EL_CLEANUP(ac);
390 
391     /* For non-clean disconnects, __redisAsyncFree() will execute pending
392      * callbacks with a NULL-reply. */
393     if (!(c->flags & REDIS_NO_AUTO_FREE)) {
394       __redisAsyncFree(ac);
395     }
396 }
397 
398 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
399  * from being issued, but tries to flush the output buffer and execute
400  * callbacks for all remaining replies. When this function is called from a
401  * callback, there might be more replies and we can safely defer disconnecting
402  * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
403  * when there are no pending callbacks. */
redisAsyncDisconnect(redisAsyncContext * ac)404 void redisAsyncDisconnect(redisAsyncContext *ac) {
405     redisContext *c = &(ac->c);
406     c->flags |= REDIS_DISCONNECTING;
407 
408     /** unset the auto-free flag here, because disconnect undoes this */
409     c->flags &= ~REDIS_NO_AUTO_FREE;
410     if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
411         __redisAsyncDisconnect(ac);
412 }
413 
__redisGetSubscribeCallback(redisAsyncContext * ac,redisReply * reply,redisCallback * dstcb)414 static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
415     redisContext *c = &(ac->c);
416     dict *callbacks;
417     redisCallback *cb;
418     dictEntry *de;
419     int pvariant;
420     char *stype;
421     sds sname;
422 
423     /* Custom reply functions are not supported for pub/sub. This will fail
424      * very hard when they are used... */
425     if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
426         assert(reply->elements >= 2);
427         assert(reply->element[0]->type == REDIS_REPLY_STRING);
428         stype = reply->element[0]->str;
429         pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
430 
431         if (pvariant)
432             callbacks = ac->sub.patterns;
433         else
434             callbacks = ac->sub.channels;
435 
436         /* Locate the right callback */
437         assert(reply->element[1]->type == REDIS_REPLY_STRING);
438         sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
439         if (sname == NULL)
440             goto oom;
441 
442         de = dictFind(callbacks,sname);
443         if (de != NULL) {
444             cb = dictGetEntryVal(de);
445 
446             /* If this is an subscribe reply decrease pending counter. */
447             if (strcasecmp(stype+pvariant,"subscribe") == 0) {
448                 cb->pending_subs -= 1;
449             }
450 
451             memcpy(dstcb,cb,sizeof(*dstcb));
452 
453             /* If this is an unsubscribe message, remove it. */
454             if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
455                 if (cb->pending_subs == 0)
456                     dictDelete(callbacks,sname);
457 
458                 /* If this was the last unsubscribe message, revert to
459                  * non-subscribe mode. */
460                 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
461 
462                 /* Unset subscribed flag only when no pipelined pending subscribe. */
463                 if (reply->element[2]->integer == 0
464                     && dictSize(ac->sub.channels) == 0
465                     && dictSize(ac->sub.patterns) == 0)
466                     c->flags &= ~REDIS_SUBSCRIBED;
467             }
468         }
469         sdsfree(sname);
470     } else {
471         /* Shift callback for invalid commands. */
472         __redisShiftCallback(&ac->sub.invalid,dstcb);
473     }
474     return REDIS_OK;
475 oom:
476     __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
477     return REDIS_ERR;
478 }
479 
480 #define redisIsSpontaneousPushReply(r) \
481     (redisIsPushReply(r) && !redisIsSubscribeReply(r))
482 
redisIsSubscribeReply(redisReply * reply)483 static int redisIsSubscribeReply(redisReply *reply) {
484     char *str;
485     size_t len, off;
486 
487     /* We will always have at least one string with the subscribe/message type */
488     if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
489         reply->element[0]->len < sizeof("message") - 1)
490     {
491         return 0;
492     }
493 
494     /* Get the string/len moving past 'p' if needed */
495     off = tolower(reply->element[0]->str[0]) == 'p';
496     str = reply->element[0]->str + off;
497     len = reply->element[0]->len - off;
498 
499     return !strncasecmp(str, "subscribe", len) ||
500            !strncasecmp(str, "message", len);
501 
502 }
503 
redisProcessCallbacks(redisAsyncContext * ac)504 void redisProcessCallbacks(redisAsyncContext *ac) {
505     redisContext *c = &(ac->c);
506     redisCallback cb = {NULL, NULL, 0, NULL};
507     void *reply = NULL;
508     int status;
509 
510     while((status = redisGetReply(c,&reply)) == REDIS_OK) {
511         if (reply == NULL) {
512             /* When the connection is being disconnected and there are
513              * no more replies, this is the cue to really disconnect. */
514             if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
515                 && ac->replies.head == NULL) {
516                 __redisAsyncDisconnect(ac);
517                 return;
518             }
519 
520             /* If monitor mode, repush callback */
521             if(c->flags & REDIS_MONITORING) {
522                 __redisPushCallback(&ac->replies,&cb);
523             }
524 
525             /* When the connection is not being disconnected, simply stop
526              * trying to get replies and wait for the next loop tick. */
527             break;
528         }
529 
530         /* Send any non-subscribe related PUSH messages to our PUSH handler
531          * while allowing subscribe related PUSH messages to pass through.
532          * This allows existing code to be backward compatible and work in
533          * either RESP2 or RESP3 mode. */
534         if (redisIsSpontaneousPushReply(reply)) {
535             __redisRunPushCallback(ac, reply);
536             c->reader->fn->freeObject(reply);
537             continue;
538         }
539 
540         /* Even if the context is subscribed, pending regular
541          * callbacks will get a reply before pub/sub messages arrive. */
542         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
543             /*
544              * A spontaneous reply in a not-subscribed context can be the error
545              * reply that is sent when a new connection exceeds the maximum
546              * number of allowed connections on the server side.
547              *
548              * This is seen as an error instead of a regular reply because the
549              * server closes the connection after sending it.
550              *
551              * To prevent the error from being overwritten by an EOF error the
552              * connection is closed here. See issue #43.
553              *
554              * Another possibility is that the server is loading its dataset.
555              * In this case we also want to close the connection, and have the
556              * user wait until the server is ready to take our request.
557              */
558             if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
559                 c->err = REDIS_ERR_OTHER;
560                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
561                 c->reader->fn->freeObject(reply);
562                 __redisAsyncDisconnect(ac);
563                 return;
564             }
565             /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
566             assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
567             if(c->flags & REDIS_SUBSCRIBED)
568                 __redisGetSubscribeCallback(ac,reply,&cb);
569         }
570 
571         if (cb.fn != NULL) {
572             __redisRunCallback(ac,&cb,reply);
573             c->reader->fn->freeObject(reply);
574 
575             /* Proceed with free'ing when redisAsyncFree() was called. */
576             if (c->flags & REDIS_FREEING) {
577                 __redisAsyncFree(ac);
578                 return;
579             }
580         } else {
581             /* No callback for this reply. This can either be a NULL callback,
582              * or there were no callbacks to begin with. Either way, don't
583              * abort with an error, but simply ignore it because the client
584              * doesn't know what the server will spit out over the wire. */
585             c->reader->fn->freeObject(reply);
586         }
587     }
588 
589     /* Disconnect when there was an error reading the reply */
590     if (status != REDIS_OK)
591         __redisAsyncDisconnect(ac);
592 }
593 
__redisAsyncHandleConnectFailure(redisAsyncContext * ac)594 static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
595     if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
596     __redisAsyncDisconnect(ac);
597 }
598 
599 /* Internal helper function to detect socket status the first time a read or
600  * write event fires. When connecting was not successful, the connect callback
601  * is called with a REDIS_ERR status and the context is free'd. */
__redisAsyncHandleConnect(redisAsyncContext * ac)602 static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
603     int completed = 0;
604     redisContext *c = &(ac->c);
605 
606     if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
607         /* Error! */
608         redisCheckSocketError(c);
609         __redisAsyncHandleConnectFailure(ac);
610         return REDIS_ERR;
611     } else if (completed == 1) {
612         /* connected! */
613         if (c->connection_type == REDIS_CONN_TCP &&
614             redisSetTcpNoDelay(c) == REDIS_ERR) {
615             __redisAsyncHandleConnectFailure(ac);
616             return REDIS_ERR;
617         }
618 
619         if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
620         c->flags |= REDIS_CONNECTED;
621         return REDIS_OK;
622     } else {
623         return REDIS_OK;
624     }
625 }
626 
redisAsyncRead(redisAsyncContext * ac)627 void redisAsyncRead(redisAsyncContext *ac) {
628     redisContext *c = &(ac->c);
629 
630     if (redisBufferRead(c) == REDIS_ERR) {
631         __redisAsyncDisconnect(ac);
632     } else {
633         /* Always re-schedule reads */
634         _EL_ADD_READ(ac);
635         redisProcessCallbacks(ac);
636     }
637 }
638 
639 /* This function should be called when the socket is readable.
640  * It processes all replies that can be read and executes their callbacks.
641  */
redisAsyncHandleRead(redisAsyncContext * ac)642 void redisAsyncHandleRead(redisAsyncContext *ac) {
643     redisContext *c = &(ac->c);
644 
645     if (!(c->flags & REDIS_CONNECTED)) {
646         /* Abort connect was not successful. */
647         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
648             return;
649         /* Try again later when the context is still not connected. */
650         if (!(c->flags & REDIS_CONNECTED))
651             return;
652     }
653 
654     c->funcs->async_read(ac);
655 }
656 
redisAsyncWrite(redisAsyncContext * ac)657 void redisAsyncWrite(redisAsyncContext *ac) {
658     redisContext *c = &(ac->c);
659     int done = 0;
660 
661     if (redisBufferWrite(c,&done) == REDIS_ERR) {
662         __redisAsyncDisconnect(ac);
663     } else {
664         /* Continue writing when not done, stop writing otherwise */
665         if (!done)
666             _EL_ADD_WRITE(ac);
667         else
668             _EL_DEL_WRITE(ac);
669 
670         /* Always schedule reads after writes */
671         _EL_ADD_READ(ac);
672     }
673 }
674 
redisAsyncHandleWrite(redisAsyncContext * ac)675 void redisAsyncHandleWrite(redisAsyncContext *ac) {
676     redisContext *c = &(ac->c);
677 
678     if (!(c->flags & REDIS_CONNECTED)) {
679         /* Abort connect was not successful. */
680         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
681             return;
682         /* Try again later when the context is still not connected. */
683         if (!(c->flags & REDIS_CONNECTED))
684             return;
685     }
686 
687     c->funcs->async_write(ac);
688 }
689 
redisAsyncHandleTimeout(redisAsyncContext * ac)690 void redisAsyncHandleTimeout(redisAsyncContext *ac) {
691     redisContext *c = &(ac->c);
692     redisCallback cb;
693 
694     if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) {
695         /* Nothing to do - just an idle timeout */
696         return;
697     }
698 
699     if (!c->err) {
700         __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
701     }
702 
703     if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
704         ac->onConnect(ac, REDIS_ERR);
705     }
706 
707     while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
708         __redisRunCallback(ac, &cb, NULL);
709     }
710 
711     /**
712      * TODO: Don't automatically sever the connection,
713      * rather, allow to ignore <x> responses before the queue is clear
714      */
715     __redisAsyncDisconnect(ac);
716 }
717 
718 /* Sets a pointer to the first argument and its length starting at p. Returns
719  * the number of bytes to skip to get to the following argument. */
nextArgument(const char * start,const char ** str,size_t * len)720 static const char *nextArgument(const char *start, const char **str, size_t *len) {
721     const char *p = start;
722     if (p[0] != '$') {
723         p = strchr(p,'$');
724         if (p == NULL) return NULL;
725     }
726 
727     *len = (int)strtol(p+1,NULL,10);
728     p = strchr(p,'\r');
729     assert(p);
730     *str = p+2;
731     return p+2+(*len)+2;
732 }
733 
734 /* Helper function for the redisAsyncCommand* family of functions. Writes a
735  * formatted command to the output buffer and registers the provided callback
736  * function with the context. */
__redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)737 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
738     redisContext *c = &(ac->c);
739     redisCallback cb;
740     struct dict *cbdict;
741     dictEntry *de;
742     redisCallback *existcb;
743     int pvariant, hasnext;
744     const char *cstr, *astr;
745     size_t clen, alen;
746     const char *p;
747     sds sname;
748     int ret;
749 
750     /* Don't accept new commands when the connection is about to be closed. */
751     if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
752 
753     /* Setup callback */
754     cb.fn = fn;
755     cb.privdata = privdata;
756     cb.pending_subs = 1;
757 
758     /* Find out which command will be appended. */
759     p = nextArgument(cmd,&cstr,&clen);
760     assert(p != NULL);
761     hasnext = (p[0] == '$');
762     pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
763     cstr += pvariant;
764     clen -= pvariant;
765 
766     if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
767         c->flags |= REDIS_SUBSCRIBED;
768 
769         /* Add every channel/pattern to the list of subscription callbacks. */
770         while ((p = nextArgument(p,&astr,&alen)) != NULL) {
771             sname = sdsnewlen(astr,alen);
772             if (sname == NULL)
773                 goto oom;
774 
775             if (pvariant)
776                 cbdict = ac->sub.patterns;
777             else
778                 cbdict = ac->sub.channels;
779 
780             de = dictFind(cbdict,sname);
781 
782             if (de != NULL) {
783                 existcb = dictGetEntryVal(de);
784                 cb.pending_subs = existcb->pending_subs + 1;
785             }
786 
787             ret = dictReplace(cbdict,sname,&cb);
788 
789             if (ret == 0) sdsfree(sname);
790         }
791     } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
792         /* It is only useful to call (P)UNSUBSCRIBE when the context is
793          * subscribed to one or more channels or patterns. */
794         if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
795 
796         /* (P)UNSUBSCRIBE does not have its own response: every channel or
797          * pattern that is unsubscribed will receive a message. This means we
798          * should not append a callback function for this command. */
799      } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
800          /* Set monitor flag and push callback */
801          c->flags |= REDIS_MONITORING;
802          __redisPushCallback(&ac->replies,&cb);
803     } else {
804         if (c->flags & REDIS_SUBSCRIBED)
805             /* This will likely result in an error reply, but it needs to be
806              * received and passed to the callback. */
807             __redisPushCallback(&ac->sub.invalid,&cb);
808         else
809             __redisPushCallback(&ac->replies,&cb);
810     }
811 
812     __redisAppendCommand(c,cmd,len);
813 
814     /* Always schedule a write when the write buffer is non-empty */
815     _EL_ADD_WRITE(ac);
816 
817     return REDIS_OK;
818 oom:
819     __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
820     return REDIS_ERR;
821 }
822 
redisvAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,va_list ap)823 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
824     char *cmd;
825     int len;
826     int status;
827     len = redisvFormatCommand(&cmd,format,ap);
828 
829     /* We don't want to pass -1 or -2 to future functions as a length. */
830     if (len < 0)
831         return REDIS_ERR;
832 
833     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
834     hi_free(cmd);
835     return status;
836 }
837 
redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,...)838 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
839     va_list ap;
840     int status;
841     va_start(ap,format);
842     status = redisvAsyncCommand(ac,fn,privdata,format,ap);
843     va_end(ap);
844     return status;
845 }
846 
redisAsyncCommandArgv(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,int argc,const char ** argv,const size_t * argvlen)847 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
848     sds cmd;
849     int len;
850     int status;
851     len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
852     if (len < 0)
853         return REDIS_ERR;
854     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
855     sdsfree(cmd);
856     return status;
857 }
858 
redisAsyncFormattedCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)859 int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
860     int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
861     return status;
862 }
863 
redisAsyncSetPushCallback(redisAsyncContext * ac,redisAsyncPushFn * fn)864 redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
865     redisAsyncPushFn *old = ac->push_cb;
866     ac->push_cb = fn;
867     return old;
868 }
869 
redisAsyncSetTimeout(redisAsyncContext * ac,struct timeval tv)870 int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
871     if (!ac->c.command_timeout) {
872         ac->c.command_timeout = hi_calloc(1, sizeof(tv));
873         if (ac->c.command_timeout == NULL) {
874             __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
875             __redisAsyncCopyError(ac);
876             return REDIS_ERR;
877         }
878     }
879 
880     if (tv.tv_sec != ac->c.command_timeout->tv_sec ||
881         tv.tv_usec != ac->c.command_timeout->tv_usec)
882     {
883         *ac->c.command_timeout = tv;
884     }
885 
886     return REDIS_OK;
887 }
888