1 /*
2  * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  *
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  *   * Redistributions of source code must retain the above copyright notice,
11  *     this list of conditions and the following disclaimer.
12  *   * Redistributions in binary form must reproduce the above copyright
13  *     notice, this list of conditions and the following disclaimer in the
14  *     documentation and/or other materials provided with the distribution.
15  *   * Neither the name of Redis nor the names of its contributors may be used
16  *     to endorse or promote products derived from this software without
17  *     specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include "fmacros.h"
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <assert.h>
37 #include <ctype.h>
38 #include <errno.h>
39 #include "async.h"
40 #include "net.h"
41 #include "dict.c"
42 #include "sds.h"
43 
44 #define _EL_ADD_READ(ctx) do { \
45         if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
46     } while(0)
47 #define _EL_DEL_READ(ctx) do { \
48         if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
49     } while(0)
50 #define _EL_ADD_WRITE(ctx) do { \
51         if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
52     } while(0)
53 #define _EL_DEL_WRITE(ctx) do { \
54         if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
55     } while(0)
56 #define _EL_CLEANUP(ctx) do { \
57         if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
58     } while(0);
59 
60 /* Forward declaration of function in hiredis.c */
61 int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
62 
63 /* Functions managing dictionary of callbacks for pub/sub. */
callbackHash(const void * key)64 static unsigned int callbackHash(const void *key) {
65     return dictGenHashFunction((const unsigned char *)key,
66                                sdslen((const sds)key));
67 }
68 
callbackValDup(void * privdata,const void * src)69 static void *callbackValDup(void *privdata, const void *src) {
70     ((void) privdata);
71     redisCallback *dup = malloc(sizeof(*dup));
72     memcpy(dup,src,sizeof(*dup));
73     return dup;
74 }
75 
callbackKeyCompare(void * privdata,const void * key1,const void * key2)76 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
77     int l1, l2;
78     ((void) privdata);
79 
80     l1 = sdslen((const sds)key1);
81     l2 = sdslen((const sds)key2);
82     if (l1 != l2) return 0;
83     return memcmp(key1,key2,l1) == 0;
84 }
85 
callbackKeyDestructor(void * privdata,void * key)86 static void callbackKeyDestructor(void *privdata, void *key) {
87     ((void) privdata);
88     sdsfree((sds)key);
89 }
90 
callbackValDestructor(void * privdata,void * val)91 static void callbackValDestructor(void *privdata, void *val) {
92     ((void) privdata);
93     free(val);
94 }
95 
96 static dictType callbackDict = {
97     callbackHash,
98     NULL,
99     callbackValDup,
100     callbackKeyCompare,
101     callbackKeyDestructor,
102     callbackValDestructor
103 };
104 
redisAsyncInitialize(redisContext * c)105 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
106     redisAsyncContext *ac;
107 
108     ac = realloc(c,sizeof(redisAsyncContext));
109     if (ac == NULL)
110         return NULL;
111 
112     c = &(ac->c);
113 
114     /* The regular connect functions will always set the flag REDIS_CONNECTED.
115      * For the async API, we want to wait until the first write event is
116      * received up before setting this flag, so reset it here. */
117     c->flags &= ~REDIS_CONNECTED;
118 
119     ac->err = 0;
120     ac->errstr = NULL;
121     ac->data = NULL;
122 
123     ac->ev.data = NULL;
124     ac->ev.addRead = NULL;
125     ac->ev.delRead = NULL;
126     ac->ev.addWrite = NULL;
127     ac->ev.delWrite = NULL;
128     ac->ev.cleanup = NULL;
129 
130     ac->onConnect = NULL;
131     ac->onDisconnect = NULL;
132 
133     ac->replies.head = NULL;
134     ac->replies.tail = NULL;
135     ac->sub.invalid.head = NULL;
136     ac->sub.invalid.tail = NULL;
137     ac->sub.channels = dictCreate(&callbackDict,NULL);
138     ac->sub.patterns = dictCreate(&callbackDict,NULL);
139     return ac;
140 }
141 
142 /* We want the error field to be accessible directly instead of requiring
143  * an indirection to the redisContext struct. */
__redisAsyncCopyError(redisAsyncContext * ac)144 static void __redisAsyncCopyError(redisAsyncContext *ac) {
145     if (!ac)
146         return;
147 
148     redisContext *c = &(ac->c);
149     ac->err = c->err;
150     ac->errstr = c->errstr;
151 }
152 
redisAsyncConnect(const char * ip,int port)153 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
154     redisContext *c;
155     redisAsyncContext *ac;
156 
157     c = redisConnectNonBlock(ip,port);
158     if (c == NULL)
159         return NULL;
160 
161     ac = redisAsyncInitialize(c);
162     if (ac == NULL) {
163         redisFree(c);
164         return NULL;
165     }
166 
167     __redisAsyncCopyError(ac);
168     return ac;
169 }
170 
redisAsyncConnectBind(const char * ip,int port,const char * source_addr)171 redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
172                                          const char *source_addr) {
173     redisContext *c = redisConnectBindNonBlock(ip,port,source_addr);
174     redisAsyncContext *ac = redisAsyncInitialize(c);
175     __redisAsyncCopyError(ac);
176     return ac;
177 }
178 
redisAsyncConnectBindWithReuse(const char * ip,int port,const char * source_addr)179 redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
180                                                   const char *source_addr) {
181     redisContext *c = redisConnectBindNonBlockWithReuse(ip,port,source_addr);
182     redisAsyncContext *ac = redisAsyncInitialize(c);
183     __redisAsyncCopyError(ac);
184     return ac;
185 }
186 
redisAsyncConnectUnix(const char * path)187 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
188     redisContext *c;
189     redisAsyncContext *ac;
190 
191     c = redisConnectUnixNonBlock(path);
192     if (c == NULL)
193         return NULL;
194 
195     ac = redisAsyncInitialize(c);
196     if (ac == NULL) {
197         redisFree(c);
198         return NULL;
199     }
200 
201     __redisAsyncCopyError(ac);
202     return ac;
203 }
204 
redisAsyncSetConnectCallback(redisAsyncContext * ac,redisConnectCallback * fn)205 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
206     if (ac->onConnect == NULL) {
207         ac->onConnect = fn;
208 
209         /* The common way to detect an established connection is to wait for
210          * the first write event to be fired. This assumes the related event
211          * library functions are already set. */
212         _EL_ADD_WRITE(ac);
213         return REDIS_OK;
214     }
215     return REDIS_ERR;
216 }
217 
redisAsyncSetDisconnectCallback(redisAsyncContext * ac,redisDisconnectCallback * fn)218 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
219     if (ac->onDisconnect == NULL) {
220         ac->onDisconnect = fn;
221         return REDIS_OK;
222     }
223     return REDIS_ERR;
224 }
225 
226 /* Helper functions to push/shift callbacks */
__redisPushCallback(redisCallbackList * list,redisCallback * source)227 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
228     redisCallback *cb;
229 
230     /* Copy callback from stack to heap */
231     cb = malloc(sizeof(*cb));
232     if (cb == NULL)
233         return REDIS_ERR_OOM;
234 
235     if (source != NULL) {
236         memcpy(cb,source,sizeof(*cb));
237         cb->next = NULL;
238     }
239 
240     /* Store callback in list */
241     if (list->head == NULL)
242         list->head = cb;
243     if (list->tail != NULL)
244         list->tail->next = cb;
245     list->tail = cb;
246     return REDIS_OK;
247 }
248 
__redisShiftCallback(redisCallbackList * list,redisCallback * target)249 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
250     redisCallback *cb = list->head;
251     if (cb != NULL) {
252         list->head = cb->next;
253         if (cb == list->tail)
254             list->tail = NULL;
255 
256         /* Copy callback from heap to stack */
257         if (target != NULL)
258             memcpy(target,cb,sizeof(*cb));
259         free(cb);
260         return REDIS_OK;
261     }
262     return REDIS_ERR;
263 }
264 
__redisRunCallback(redisAsyncContext * ac,redisCallback * cb,redisReply * reply)265 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
266     redisContext *c = &(ac->c);
267     if (cb->fn != NULL) {
268         c->flags |= REDIS_IN_CALLBACK;
269         cb->fn(ac,reply,cb->privdata);
270         c->flags &= ~REDIS_IN_CALLBACK;
271     }
272 }
273 
274 /* Helper function to free the context. */
__redisAsyncFree(redisAsyncContext * ac)275 static void __redisAsyncFree(redisAsyncContext *ac) {
276     redisContext *c = &(ac->c);
277     redisCallback cb;
278     dictIterator *it;
279     dictEntry *de;
280 
281     /* Execute pending callbacks with NULL reply. */
282     while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
283         __redisRunCallback(ac,&cb,NULL);
284 
285     /* Execute callbacks for invalid commands */
286     while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
287         __redisRunCallback(ac,&cb,NULL);
288 
289     /* Run subscription callbacks callbacks with NULL reply */
290     it = dictGetIterator(ac->sub.channels);
291     while ((de = dictNext(it)) != NULL)
292         __redisRunCallback(ac,dictGetEntryVal(de),NULL);
293     dictReleaseIterator(it);
294     dictRelease(ac->sub.channels);
295 
296     it = dictGetIterator(ac->sub.patterns);
297     while ((de = dictNext(it)) != NULL)
298         __redisRunCallback(ac,dictGetEntryVal(de),NULL);
299     dictReleaseIterator(it);
300     dictRelease(ac->sub.patterns);
301 
302     /* Signal event lib to clean up */
303     _EL_CLEANUP(ac);
304 
305     /* Execute disconnect callback. When redisAsyncFree() initiated destroying
306      * this context, the status will always be REDIS_OK. */
307     if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
308         if (c->flags & REDIS_FREEING) {
309             ac->onDisconnect(ac,REDIS_OK);
310         } else {
311             ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
312         }
313     }
314 
315     /* Cleanup self */
316     redisFree(c);
317 }
318 
319 /* Free the async context. When this function is called from a callback,
320  * control needs to be returned to redisProcessCallbacks() before actual
321  * free'ing. To do so, a flag is set on the context which is picked up by
322  * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
redisAsyncFree(redisAsyncContext * ac)323 void redisAsyncFree(redisAsyncContext *ac) {
324     redisContext *c = &(ac->c);
325     c->flags |= REDIS_FREEING;
326     if (!(c->flags & REDIS_IN_CALLBACK))
327         __redisAsyncFree(ac);
328 }
329 
330 /* Helper function to make the disconnect happen and clean up. */
__redisAsyncDisconnect(redisAsyncContext * ac)331 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
332     redisContext *c = &(ac->c);
333 
334     /* Make sure error is accessible if there is any */
335     __redisAsyncCopyError(ac);
336 
337     if (ac->err == 0) {
338         /* For clean disconnects, there should be no pending callbacks. */
339         assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
340     } else {
341         /* Disconnection is caused by an error, make sure that pending
342          * callbacks cannot call new commands. */
343         c->flags |= REDIS_DISCONNECTING;
344     }
345 
346     /* For non-clean disconnects, __redisAsyncFree() will execute pending
347      * callbacks with a NULL-reply. */
348     __redisAsyncFree(ac);
349 }
350 
351 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
352  * from being issued, but tries to flush the output buffer and execute
353  * callbacks for all remaining replies. When this function is called from a
354  * callback, there might be more replies and we can safely defer disconnecting
355  * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
356  * when there are no pending callbacks. */
redisAsyncDisconnect(redisAsyncContext * ac)357 void redisAsyncDisconnect(redisAsyncContext *ac) {
358     redisContext *c = &(ac->c);
359     c->flags |= REDIS_DISCONNECTING;
360     if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
361         __redisAsyncDisconnect(ac);
362 }
363 
__redisGetSubscribeCallback(redisAsyncContext * ac,redisReply * reply,redisCallback * dstcb)364 static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
365     redisContext *c = &(ac->c);
366     dict *callbacks;
367     dictEntry *de;
368     int pvariant;
369     char *stype;
370     sds sname;
371 
372     /* Custom reply functions are not supported for pub/sub. This will fail
373      * very hard when they are used... */
374     if (reply->type == REDIS_REPLY_ARRAY) {
375         assert(reply->elements >= 2);
376         assert(reply->element[0]->type == REDIS_REPLY_STRING);
377         stype = reply->element[0]->str;
378         pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
379 
380         if (pvariant)
381             callbacks = ac->sub.patterns;
382         else
383             callbacks = ac->sub.channels;
384 
385         /* Locate the right callback */
386         assert(reply->element[1]->type == REDIS_REPLY_STRING);
387         sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
388         de = dictFind(callbacks,sname);
389         if (de != NULL) {
390             memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
391 
392             /* If this is an unsubscribe message, remove it. */
393             if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
394                 dictDelete(callbacks,sname);
395 
396                 /* If this was the last unsubscribe message, revert to
397                  * non-subscribe mode. */
398                 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
399                 if (reply->element[2]->integer == 0)
400                     c->flags &= ~REDIS_SUBSCRIBED;
401             }
402         }
403         sdsfree(sname);
404     } else {
405         /* Shift callback for invalid commands. */
406         __redisShiftCallback(&ac->sub.invalid,dstcb);
407     }
408     return REDIS_OK;
409 }
410 
redisProcessCallbacks(redisAsyncContext * ac)411 void redisProcessCallbacks(redisAsyncContext *ac) {
412     redisContext *c = &(ac->c);
413     redisCallback cb = {NULL, NULL, NULL};
414     void *reply = NULL;
415     int status;
416 
417     while((status = redisGetReply(c,&reply)) == REDIS_OK) {
418         if (reply == NULL) {
419             /* When the connection is being disconnected and there are
420              * no more replies, this is the cue to really disconnect. */
421             if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
422                 && ac->replies.head == NULL) {
423                 __redisAsyncDisconnect(ac);
424                 return;
425             }
426 
427             /* If monitor mode, repush callback */
428             if(c->flags & REDIS_MONITORING) {
429                 __redisPushCallback(&ac->replies,&cb);
430             }
431 
432             /* When the connection is not being disconnected, simply stop
433              * trying to get replies and wait for the next loop tick. */
434             break;
435         }
436 
437         /* Even if the context is subscribed, pending regular callbacks will
438          * get a reply before pub/sub messages arrive. */
439         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
440             /*
441              * A spontaneous reply in a not-subscribed context can be the error
442              * reply that is sent when a new connection exceeds the maximum
443              * number of allowed connections on the server side.
444              *
445              * This is seen as an error instead of a regular reply because the
446              * server closes the connection after sending it.
447              *
448              * To prevent the error from being overwritten by an EOF error the
449              * connection is closed here. See issue #43.
450              *
451              * Another possibility is that the server is loading its dataset.
452              * In this case we also want to close the connection, and have the
453              * user wait until the server is ready to take our request.
454              */
455             if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
456                 c->err = REDIS_ERR_OTHER;
457                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
458                 c->reader->fn->freeObject(reply);
459                 __redisAsyncDisconnect(ac);
460                 return;
461             }
462             /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
463             assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
464             if(c->flags & REDIS_SUBSCRIBED)
465                 __redisGetSubscribeCallback(ac,reply,&cb);
466         }
467 
468         if (cb.fn != NULL) {
469             __redisRunCallback(ac,&cb,reply);
470             c->reader->fn->freeObject(reply);
471 
472             /* Proceed with free'ing when redisAsyncFree() was called. */
473             if (c->flags & REDIS_FREEING) {
474                 __redisAsyncFree(ac);
475                 return;
476             }
477         } else {
478             /* No callback for this reply. This can either be a NULL callback,
479              * or there were no callbacks to begin with. Either way, don't
480              * abort with an error, but simply ignore it because the client
481              * doesn't know what the server will spit out over the wire. */
482             c->reader->fn->freeObject(reply);
483         }
484     }
485 
486     /* Disconnect when there was an error reading the reply */
487     if (status != REDIS_OK)
488         __redisAsyncDisconnect(ac);
489 }
490 
491 /* Internal helper function to detect socket status the first time a read or
492  * write event fires. When connecting was not successful, the connect callback
493  * is called with a REDIS_ERR status and the context is free'd. */
__redisAsyncHandleConnect(redisAsyncContext * ac)494 static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
495     redisContext *c = &(ac->c);
496 
497     if (redisCheckSocketError(c) == REDIS_ERR) {
498         /* Try again later when connect(2) is still in progress. */
499         if (errno == EINPROGRESS)
500             return REDIS_OK;
501 
502         if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
503         __redisAsyncDisconnect(ac);
504         return REDIS_ERR;
505     }
506 
507     /* Mark context as connected. */
508     c->flags |= REDIS_CONNECTED;
509     if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
510     return REDIS_OK;
511 }
512 
513 /* This function should be called when the socket is readable.
514  * It processes all replies that can be read and executes their callbacks.
515  */
redisAsyncHandleRead(redisAsyncContext * ac)516 void redisAsyncHandleRead(redisAsyncContext *ac) {
517     redisContext *c = &(ac->c);
518 
519     if (!(c->flags & REDIS_CONNECTED)) {
520         /* Abort connect was not successful. */
521         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
522             return;
523         /* Try again later when the context is still not connected. */
524         if (!(c->flags & REDIS_CONNECTED))
525             return;
526     }
527 
528     if (redisBufferRead(c) == REDIS_ERR) {
529         __redisAsyncDisconnect(ac);
530     } else {
531         /* Always re-schedule reads */
532         _EL_ADD_READ(ac);
533         redisProcessCallbacks(ac);
534     }
535 }
536 
redisAsyncHandleWrite(redisAsyncContext * ac)537 void redisAsyncHandleWrite(redisAsyncContext *ac) {
538     redisContext *c = &(ac->c);
539     int done = 0;
540 
541     if (!(c->flags & REDIS_CONNECTED)) {
542         /* Abort connect was not successful. */
543         if (__redisAsyncHandleConnect(ac) != REDIS_OK)
544             return;
545         /* Try again later when the context is still not connected. */
546         if (!(c->flags & REDIS_CONNECTED))
547             return;
548     }
549 
550     if (redisBufferWrite(c,&done) == REDIS_ERR) {
551         __redisAsyncDisconnect(ac);
552     } else {
553         /* Continue writing when not done, stop writing otherwise */
554         if (!done)
555             _EL_ADD_WRITE(ac);
556         else
557             _EL_DEL_WRITE(ac);
558 
559         /* Always schedule reads after writes */
560         _EL_ADD_READ(ac);
561     }
562 }
563 
564 /* Sets a pointer to the first argument and its length starting at p. Returns
565  * the number of bytes to skip to get to the following argument. */
nextArgument(const char * start,const char ** str,size_t * len)566 static const char *nextArgument(const char *start, const char **str, size_t *len) {
567     const char *p = start;
568     if (p[0] != '$') {
569         p = strchr(p,'$');
570         if (p == NULL) return NULL;
571     }
572 
573     *len = (int)strtol(p+1,NULL,10);
574     p = strchr(p,'\r');
575     assert(p);
576     *str = p+2;
577     return p+2+(*len)+2;
578 }
579 
580 /* Helper function for the redisAsyncCommand* family of functions. Writes a
581  * formatted command to the output buffer and registers the provided callback
582  * function with the context. */
__redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)583 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
584     redisContext *c = &(ac->c);
585     redisCallback cb;
586     int pvariant, hasnext;
587     const char *cstr, *astr;
588     size_t clen, alen;
589     const char *p;
590     sds sname;
591     int ret;
592 
593     /* Don't accept new commands when the connection is about to be closed. */
594     if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
595 
596     /* Setup callback */
597     cb.fn = fn;
598     cb.privdata = privdata;
599 
600     /* Find out which command will be appended. */
601     p = nextArgument(cmd,&cstr,&clen);
602     assert(p != NULL);
603     hasnext = (p[0] == '$');
604     pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
605     cstr += pvariant;
606     clen -= pvariant;
607 
608     if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
609         c->flags |= REDIS_SUBSCRIBED;
610 
611         /* Add every channel/pattern to the list of subscription callbacks. */
612         while ((p = nextArgument(p,&astr,&alen)) != NULL) {
613             sname = sdsnewlen(astr,alen);
614             if (pvariant)
615                 ret = dictReplace(ac->sub.patterns,sname,&cb);
616             else
617                 ret = dictReplace(ac->sub.channels,sname,&cb);
618 
619             if (ret == 0) sdsfree(sname);
620         }
621     } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
622         /* It is only useful to call (P)UNSUBSCRIBE when the context is
623          * subscribed to one or more channels or patterns. */
624         if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
625 
626         /* (P)UNSUBSCRIBE does not have its own response: every channel or
627          * pattern that is unsubscribed will receive a message. This means we
628          * should not append a callback function for this command. */
629      } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
630          /* Set monitor flag and push callback */
631          c->flags |= REDIS_MONITORING;
632          __redisPushCallback(&ac->replies,&cb);
633     } else {
634         if (c->flags & REDIS_SUBSCRIBED)
635             /* This will likely result in an error reply, but it needs to be
636              * received and passed to the callback. */
637             __redisPushCallback(&ac->sub.invalid,&cb);
638         else
639             __redisPushCallback(&ac->replies,&cb);
640     }
641 
642     __redisAppendCommand(c,cmd,len);
643 
644     /* Always schedule a write when the write buffer is non-empty */
645     _EL_ADD_WRITE(ac);
646 
647     return REDIS_OK;
648 }
649 
redisvAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,va_list ap)650 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
651     char *cmd;
652     int len;
653     int status;
654     len = redisvFormatCommand(&cmd,format,ap);
655 
656     /* We don't want to pass -1 or -2 to future functions as a length. */
657     if (len < 0)
658         return REDIS_ERR;
659 
660     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
661     free(cmd);
662     return status;
663 }
664 
redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * format,...)665 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
666     va_list ap;
667     int status;
668     va_start(ap,format);
669     status = redisvAsyncCommand(ac,fn,privdata,format,ap);
670     va_end(ap);
671     return status;
672 }
673 
redisAsyncCommandArgv(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,int argc,const char ** argv,const size_t * argvlen)674 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
675     sds cmd;
676     int len;
677     int status;
678     len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
679     status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
680     sdsfree(cmd);
681     return status;
682 }
683 
redisAsyncFormattedCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)684 int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
685     int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
686     return status;
687 }
688