1 /*
2  * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  *
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  *   * Redistributions of source code must retain the above copyright notice,
11  *     this list of conditions and the following disclaimer.
12  *   * Redistributions in binary form must reproduce the above copyright
13  *     notice, this list of conditions and the following disclaimer in the
14  *     documentation and/or other materials provided with the distribution.
15  *   * Neither the name of Redis nor the names of its contributors may be used
16  *     to endorse or promote products derived from this software without
17  *     specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include "fmacros.h"
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <assert.h>
37 #include <ctype.h>
38 #include <errno.h>
39 #include "async.h"
40 #include "net.h"
41 #include "dict.c"
42 #include "sds.h"
43 
44 #define _EL_ADD_READ(ctx) do { \
45         if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
46     } while(0)
47 #define _EL_DEL_READ(ctx) do { \
48         if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
49     } while(0)
50 #define _EL_ADD_WRITE(ctx) do { \
51         if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
52     } while(0)
53 #define _EL_DEL_WRITE(ctx) do { \
54         if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
55     } while(0)
56 #define _EL_CLEANUP(ctx) do { \
57         if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
58     } while(0);
59 
60 /* Forward declaration of function in hiredis.c */
61 int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
62 
63 /* Functions managing dictionary of callbacks for pub/sub. */
callbackHash(const void * key)64 static unsigned int callbackHash(const void *key) {
65     return dictGenHashFunction((const unsigned char *)key,
66                                sdslen((const sds)key));
67 }
68 
callbackValDup(void * privdata,const void * src)69 static void *callbackValDup(void *privdata, const void *src) {
70     ((void) privdata);
71     redisCallback *dup = malloc(sizeof(*dup));
72     memcpy(dup,src,sizeof(*dup));
73     return dup;
74 }
75 
callbackKeyCompare(void * privdata,const void * key1,const void * key2)76 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
77     int l1, l2;
78     ((void) privdata);
79 
80     l1 = sdslen((const sds)key1);
81     l2 = sdslen((const sds)key2);
82     if (l1 != l2) return 0;
83     return memcmp(key1,key2,l1) == 0;
84 }
85 
callbackKeyDestructor(void * privdata,void * key)86 static void callbackKeyDestructor(void *privdata, void *key) {
87     ((void) privdata);
88     sdsfree((sds)key);
89 }
90 
callbackValDestructor(void * privdata,void * val)91 static void callbackValDestructor(void *privdata, void *val) {
92     ((void) privdata);
93     free(val);
94 }
95 
96 static dictType callbackDict = {
97     callbackHash,
98     NULL,
99     callbackValDup,
100     callbackKeyCompare,
101     callbackKeyDestructor,
102     callbackValDestructor
103 };
104 
redisAsyncInitialize(redisContext * c)105 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
106     redisAsyncContext *ac;
107 
108     ac = realloc(c,sizeof(redisAsyncContext));
109     if (ac == NULL)
110         return NULL;
111 
112     c = &(ac->c);
113 
114     /* The regular connect functions will always set the flag REDIS_CONNECTED.
115      * For the async API, we want to wait until the first write event is
116      * received up before setting this flag, so reset it here. */
117     c->flags &= ~REDIS_CONNECTED;
118 
119     ac->err = 0;
120     ac->errstr = NULL;
121     ac->data = NULL;
122 
123     ac->ev.data = NULL;
124     ac->ev.addRead = NULL;
125     ac->ev.delRead = NULL;
126     ac->ev.addWrite = NULL;
127     ac->ev.delWrite = NULL;
128     ac->ev.cleanup = NULL;
129 
130     ac->onConnect = NULL;
131     ac->onDisconnect = NULL;
132 
133     ac->replies.head = NULL;
134     ac->replies.tail = NULL;
135     ac->sub.invalid.head = NULL;
136     ac->sub.invalid.tail = NULL;
137     ac->sub.channels = dictCreate(&callbackDict,NULL);
138     ac->sub.patterns = dictCreate(&callbackDict,NULL);
139     return ac;
140 }
141 
142 /* We want the error field to be accessible directly instead of requiring
143  * an indirection to the redisContext struct. */
__redisAsyncCopyError(redisAsyncContext * ac)144 static void __redisAsyncCopyError(redisAsyncContext *ac) {
145     if (!ac)
146         return;
147 
148     redisContext *c = &(ac->c);
149     ac->err = c->err;
150     ac->errstr = c->errstr;
151 }
152 
redisAsyncConnect(const char * ip,int port)153 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
154     redisContext *c;
155     redisAsyncContext *ac;
156 
157     c = redisConnectNonBlock(ip,port);
158     if (c == NULL)
159         return NULL;
160 
161     ac = redisAsyncInitialize(c);
162     if (ac == NULL) {
163         redisFree(c);
164         return NULL;
165     }
166 
167     __redisAsyncCopyError(ac);
168     return ac;
169 }
170 
redisAsyncConnectBind(const char * ip,int port,const char * source_addr)171 redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
172                                          const char *source_addr) {
173     redisContext *c = redisConnectBindNonBlock(ip,port,source_addr);
174     redisAsyncContext *ac = redisAsyncInitialize(c);
175     __redisAsyncCopyError(ac);
176     return ac;
177 }
178 
redisAsyncConnectBindWithReuse(const char * ip,int port,const char * source_addr)179 redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
180                                                   const char *source_addr) {
181     redisContext *c = redisConnectBindNonBlockWithReuse(ip,port,source_addr);
182     redisAsyncContext *ac = redisAsyncInitialize(c);
183     __redisAsyncCopyError(ac);
184     return ac;
185 }
186 
redisAsyncConnectUnix(const char * path)187 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
188     redisContext *c;
189     redisAsyncContext *ac;
190 
191     c = redisConnectUnixNonBlock(path);
192     if (c == NULL)
193         return NULL;
194 
195     ac = redisAsyncInitialize(c);
196     if (ac == NULL) {
197         redisFree(c);
198         return NULL;
199     }
200 
201     __redisAsyncCopyError(ac);
202     return ac;
203 }
204 
redisAsyncSetConnectCallback(redisAsyncContext * ac,redisConnectCallback * fn)205 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
206     if (ac->onConnect == NULL) {
207         ac->onConnect = fn;
208 
209         /* The common way to detect an established connection is to wait for
210          * the first write event to be fired. This assumes the related event
211          * library functions are already set. */
212         _EL_ADD_WRITE(ac);
213         return REDIS_OK;
214     }
215     return REDIS_ERR;
216 }
217 
redisAsyncSetDisconnectCallback(redisAsyncContext * ac,redisDisconnectCallback * fn)218 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
219     if (ac->onDisconnect == NULL) {
220         ac->onDisconnect = fn;
221         return REDIS_OK;
222     }
223     return REDIS_ERR;
224 }
225 
226 /* Helper functions to push/shift callbacks */
__redisPushCallback(redisCallbackList * list,redisCallback * source)227 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
228     redisCallback *cb;
229 
230     /* Copy callback from stack to heap */
231     cb = malloc(sizeof(*cb));
232     if (cb == NULL)
233         return REDIS_ERR_OOM;
234 
235     if (source != NULL) {
236         memcpy(cb,source,sizeof(*cb));
237         cb->next = NULL;
238     }
239 
240     /* Store callback in list */
241     if (list->head == NULL)
242         list->head = cb;
243     if (list->tail != NULL)
244         list->tail->next = cb;
245     list->tail = cb;
246     return REDIS_OK;
247 }
248 
__redisShiftCallback(redisCallbackList * list,redisCallback * target)249 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
250     redisCallback *cb = list->head;
251     if (cb != NULL) {
252         list->head = cb->next;
253         if (cb == list->tail)
254             list->tail = NULL;
255 
256         /* Copy callback from heap to stack */
257         if (target != NULL)
258             memcpy(target,cb,sizeof(*cb));
259         free(cb);
260         return REDIS_OK;
261     }
262     return REDIS_ERR;
263 }
264 
__redisRunCallback(redisAsyncContext * ac,redisCallback * cb,redisReply * reply)265 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
266     redisContext *c = &(ac->c);
267     if (cb->fn != NULL) {
268         c->flags |= REDIS_IN_CALLBACK;
269         cb->fn(ac,reply,cb->privdata);
270         c->flags &= ~REDIS_IN_CALLBACK;
271     }
272 }
273 
274 /* Helper function to free the context. */
__redisAsyncFree(redisAsyncContext * ac)275 static void __redisAsyncFree(redisAsyncContext *ac) {
276     redisContext *c = &(ac->c);
277     redisCallback cb;
278     dictIterator *it;
279     dictEntry *de;
280 
281     /* Execute pending callbacks with NULL reply. */
282     while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
283         __redisRunCallback(ac,&cb,NULL);
284 
285     /* Execute callbacks for invalid commands */
286     while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
287         __redisRunCallback(ac,&cb,NULL);
288 
289     /* Run subscription callbacks callbacks with NULL reply */
290     it = dictGetIterator(ac->sub.channels);
291     while ((de = dictNext(it)) != NULL)
292         __redisRunCallback(ac,dictGetEntryVal(de),NULL);
293     dictReleaseIterator(it);
294     dictRelease(ac->sub.channels);
295 
296     it = dictGetIterator(ac->sub.patterns);
297     while ((de = dictNext(it)) != NULL)
298         __redisRunCallback(ac,dictGetEntryVal(de),NULL);
299     dictReleaseIterator(it);
300     dictRelease(ac->sub.patterns);
301 
302     /* Signal event lib to clean up */
303     _EL_CLEANUP(ac);
304 
305     /* Execute disconnect callback. When redisAsyncFree() initiated destroying
306      * this context, the status will always be REDIS_OK. */
307     if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
308         if (c->flags & REDIS_FREEING) {
309             ac->onDisconnect(ac,REDIS_OK);
310         } else {
311             ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
312         }
313     }
314 
315     /* Cleanup self */
316     redisFree(c);
317 }
318 
319 /* Free the async context. When this function is called from a callback,
320  * control needs to be returned to redisProcessCallbacks() before actual
321  * free'ing. To do so, a flag is set on the context which is picked up by
322  * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
redisAsyncFree(redisAsyncContext * ac)323 void redisAsyncFree(redisAsyncContext *ac) {
324     redisContext *c = &(ac->c);
325     c->flags |= REDIS_FREEING;
326     if (!(c->flags & REDIS_IN_CALLBACK))
327         __redisAsyncFree(ac);
328 }
329 
330 /* Helper function to make the disconnect happen and clean up. */
__redisAsyncDisconnect(redisAsyncContext * ac)331 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
332     redisContext *c = &(ac->c);
333 
334     /* Make sure error is accessible if there is any */
335     __redisAsyncCopyError(ac);
336 
337     if (ac->err == 0) {
338         /* For clean disconnects, there should be no pending callbacks. */
339         int ret = __redisShiftCallback(&ac->replies,NULL);
340         assert(ret == REDIS_ERR);
341     } else {
342         /* Disconnection is caused by an error, make sure that pending
343          * callbacks cannot call new commands. */
344         c->flags |= REDIS_DISCONNECTING;
345     }
346 
347     /* For non-clean disconnects, __redisAsyncFree() will execute pending
348      * callbacks with a NULL-reply. */
349     __redisAsyncFree(ac);
350 }
351 
352 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
353  * from being issued, but tries to flush the output buffer and execute
354  * callbacks for all remaining replies. When this function is called from a
355  * callback, there might be more replies and we can safely defer disconnecting
356  * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
357  * when there are no pending callbacks. */
redisAsyncDisconnect(redisAsyncContext * ac)358 void redisAsyncDisconnect(redisAsyncContext *ac) {
359     redisContext *c = &(ac->c);
360     c->flags |= REDIS_DISCONNECTING;
361     if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
362         __redisAsyncDisconnect(ac);
363 }
364 
__redisGetSubscribeCallback(redisAsyncContext * ac,redisReply * reply,redisCallback * dstcb)365 static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
366     redisContext *c = &(ac->c);
367     dict *callbacks;
368     dictEntry *de;
369     int pvariant;
370     char *stype;
371     sds sname;
372 
373     /* Custom reply functions are not supported for pub/sub. This will fail
374      * very hard when they are used... */
375     if (reply->type == REDIS_REPLY_ARRAY) {
376         assert(reply->elements >= 2);
377         assert(reply->element[0]->type == REDIS_REPLY_STRING);
378         stype = reply->element[0]->str;
379         pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
380 
381         if (pvariant)
382             callbacks = ac->sub.patterns;
383         else
384             callbacks = ac->sub.channels;
385 
386         /* Locate the right callback */
387         assert(reply->element[1]->type == REDIS_REPLY_STRING);
388         sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
389         de = dictFind(callbacks,sname);
390         if (de != NULL) {
391             memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
392 
393             /* If this is an unsubscribe message, remove it. */
394             if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
395                 dictDelete(callbacks,sname);
396 
397                 /* If this was the last unsubscribe message, revert to
398                  * non-subscribe mode. */
399                 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
400                 if (reply->element[2]->integer == 0)
401                     c->flags &= ~REDIS_SUBSCRIBED;
402             }
403         }
404         sdsfree(sname);
405     } else {
406         /* Shift callback for invalid commands. */
407         __redisShiftCallback(&ac->sub.invalid,dstcb);
408     }
409     return REDIS_OK;
410 }
411 
redisProcessCallbacks(redisAsyncContext * ac)412 void redisProcessCallbacks(redisAsyncContext *ac) {
413     redisContext *c = &(ac->c);
414     redisCallback cb = {NULL, NULL, NULL};
415     void *reply = NULL;
416     int status;
417 
418     while((status = redisGetReply(c,&reply)) == REDIS_OK) {
419         if (reply == NULL) {
420             /* When the connection is being disconnected and there are
421              * no more replies, this is the cue to really disconnect. */
422             if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
423                 && ac->replies.head == NULL) {
424                 __redisAsyncDisconnect(ac);
425                 return;
426             }
427 
428             /* If monitor mode, repush callback */
429             if(c->flags & REDIS_MONITORING) {
430                 __redisPushCallback(&ac->replies,&cb);
431             }
432 
433             /* When the connection is not being disconnected, simply stop
434              * trying to get replies and wait for the next loop tick. */
435             break;
436         }
437 
438         /* Even if the context is subscribed, pending regular callbacks will
439          * get a reply before pub/sub messages arrive. */
440         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
441             /*
442              * A spontaneous reply in a not-subscribed context can be the error
443              * reply that is sent when a new connection exceeds the maximum
444              * number of allowed connections on the server side.
445              *
446              * This is seen as an error instead of a regular reply because the
447              * server closes the connection after sending it.
448              *
449              * To prevent the error from being overwritten by an EOF error the
450              * connection is closed here. See issue #43.
451              *
452              * Another possibility is that the server is loading its dataset.
453              * In this case we also want to close the connection, and have the
454              * user wait until the server is ready to take our request.
455              */
456             if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
457                 c->err = REDIS_ERR_OTHER;
458                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
459                 c->reader->fn->freeObject(reply);
460                 __redisAsyncDisconnect(ac);
461                 return;
462             }
463             /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
464             assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
465             if(c->flags & REDIS_SUBSCRIBED)
466                 __redisGetSubscribeCallback(ac,reply,&cb);
467         }
468 
469         if (cb.fn != NULL) {
470             __redisRunCallback(ac,&cb,reply);
471             c->reader->fn->freeObject(reply);
472 
473             /* Proceed with free'ing when redisAsyncFree() was called. */
474             if (c->flags & REDIS_FREEING) {
475                 __redisAsyncFree(ac);
476                 return;
477             }
478         } else {
479             /* No callback for this reply. This can either be a NULL callback,
480              * or there were no callbacks to begin with. Either way, don't
481              * abort with an error, but simply ignore it because the client
482              * doesn't know what the server will spit out over the wire. */
483             c->reader->fn->freeObject(reply);
484         }
485     }
486 
487     /* Disconnect when there was an error reading the reply */
488     if (status != REDIS_OK)
489         __redisAsyncDisconnect(ac);
490 }
491 
492 /* Internal helper function to detect socket status the first time a read or
493  * write event fires. When connecting was not successful, the connect callback
494  * is called with a REDIS_ERR status and the context is free'd. */
__redisAsyncHandleConnect(redisAsyncContext * ac)495 static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
496     redisContext *c = &(ac->c);
497 
498     if (redisCheckSocketError(c) == REDIS_ERR) {
499         /* Try again later when connect(2) is still in progress. */
500         if (errno == EINPROGRESS)
501             return REDIS_OK;
502 
503         if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
504         __redisAsyncDisconnect(ac);
505         return REDIS_ERR;
506     }
507 
508     /* Mark context as connected. */
509     c->flags |= REDIS_CONNECTED;
510     if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
511     return REDIS_OK;
512 }
513 
514 /* This function should be called when the socket is readable.
515  * It processes all replies that can be read and executes their callbacks.
516  */
redisAsyncHandleRead(redisAsyncContext * ac)517 void redisAsyncHandleRead(redisAsyncContext *ac) {
518     redisContext *c = &(ac->c);
519 
520     if (!(c->flags & REDIS_CONNECTED)) {
521         /* Abort connect was not successful. */
522         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
523             return;
524         /* Try again later when the context is still not connected. */
525         if (!(c->flags & REDIS_CONNECTED))
526             return;
527     }
528 
529     if (redisBufferRead(c) == REDIS_ERR) {
530         __redisAsyncDisconnect(ac);
531     } else {
532         /* Always re-schedule reads */
533         _EL_ADD_READ(ac);
534         redisProcessCallbacks(ac);
535     }
536 }
537 
redisAsyncHandleWrite(redisAsyncContext * ac)538 void redisAsyncHandleWrite(redisAsyncContext *ac) {
539     redisContext *c = &(ac->c);
540     int done = 0;
541 
542     if (!(c->flags & REDIS_CONNECTED)) {
543         /* Abort connect was not successful. */
544         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
545             return;
546         /* Try again later when the context is still not connected. */
547         if (!(c->flags & REDIS_CONNECTED))
548             return;
549     }
550 
551     if (redisBufferWrite(c,&done) == REDIS_ERR) {
552         __redisAsyncDisconnect(ac);
553     } else {
554         /* Continue writing when not done, stop writing otherwise */
555         if (!done)
556             _EL_ADD_WRITE(ac);
557         else
558             _EL_DEL_WRITE(ac);
559 
560         /* Always schedule reads after writes */
561         _EL_ADD_READ(ac);
562     }
563 }
564 
565 /* Sets a pointer to the first argument and its length starting at p. Returns
566  * the number of bytes to skip to get to the following argument. */
nextArgument(const char * start,const char ** str,size_t * len)567 static const char *nextArgument(const char *start, const char **str, size_t *len) {
568     const char *p = start;
569     if (p[0] != '$') {
570         p = strchr(p,'$');
571         if (p == NULL) return NULL;
572     }
573 
574     *len = (int)strtol(p+1,NULL,10);
575     p = strchr(p,'\r');
576     assert(p);
577     *str = p+2;
578     return p+2+(*len)+2;
579 }
580 
581 /* Helper function for the redisAsyncCommand* family of functions. Writes a
582  * formatted command to the output buffer and registers the provided callback
583  * function with the context. */
__redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)584 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
585     redisContext *c = &(ac->c);
586     redisCallback cb;
587     int pvariant, hasnext;
588     const char *cstr, *astr;
589     size_t clen, alen;
590     const char *p;
591     sds sname;
592     int ret;
593 
594     /* Don't accept new commands when the connection is about to be closed. */
595     if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
596 
597     /* Setup callback */
598     cb.fn = fn;
599     cb.privdata = privdata;
600 
601     /* Find out which command will be appended. */
602     p = nextArgument(cmd,&cstr,&clen);
603     assert(p != NULL);
604     hasnext = (p[0] == '$');
605     pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
606     cstr += pvariant;
607     clen -= pvariant;
608 
609     if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
610         c->flags |= REDIS_SUBSCRIBED;
611 
612         /* Add every channel/pattern to the list of subscription callbacks. */
613         while ((p = nextArgument(p,&astr,&alen)) != NULL) {
614             sname = sdsnewlen(astr,alen);
615             if (pvariant)
616                 ret = dictReplace(ac->sub.patterns,sname,&cb);
617             else
618                 ret = dictReplace(ac->sub.channels,sname,&cb);
619 
620             if (ret == 0) sdsfree(sname);
621         }
622     } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
623         /* It is only useful to call (P)UNSUBSCRIBE when the context is
624          * subscribed to one or more channels or patterns. */
625         if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
626 
627         /* (P)UNSUBSCRIBE does not have its own response: every channel or
628          * pattern that is unsubscribed will receive a message. This means we
629          * should not append a callback function for this command. */
630      } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
631          /* Set monitor flag and push callback */
632          c->flags |= REDIS_MONITORING;
633          __redisPushCallback(&ac->replies,&cb);
634     } else {
635         if (c->flags & REDIS_SUBSCRIBED)
636             /* This will likely result in an error reply, but it needs to be
637              * received and passed to the callback. */
638             __redisPushCallback(&ac->sub.invalid,&cb);
639         else
640             __redisPushCallback(&ac->replies,&cb);
641     }
642 
643     __redisAppendCommand(c,cmd,len);
644 
645     /* Always schedule a write when the write buffer is non-empty */
646     _EL_ADD_WRITE(ac);
647 
648     return REDIS_OK;
649 }
650 
redisvAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,va_list ap)651 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
652     char *cmd;
653     int len;
654     int status;
655     len = redisvFormatCommand(&cmd,format,ap);
656 
657     /* We don't want to pass -1 or -2 to future functions as a length. */
658     if (len < 0)
659         return REDIS_ERR;
660 
661     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
662     free(cmd);
663     return status;
664 }
665 
redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,...)666 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
667     va_list ap;
668     int status;
669     va_start(ap,format);
670     status = redisvAsyncCommand(ac,fn,privdata,format,ap);
671     va_end(ap);
672     return status;
673 }
674 
redisAsyncCommandArgv(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,int argc,const char ** argv,const size_t * argvlen)675 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
676     sds cmd;
677     int len;
678     int status;
679     len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
680     if (len < 0)
681         return REDIS_ERR;
682     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
683     sdsfree(cmd);
684     return status;
685 }
686 
redisAsyncFormattedCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)687 int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
688     int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
689     return status;
690 }
691