1 /*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "fmacros.h"
33 #include "alloc.h"
34 #include <stdlib.h>
35 #include <string.h>
36 #ifndef _MSC_VER
37 #include <strings.h>
38 #endif
39 #include <assert.h>
40 #include <ctype.h>
41 #include <errno.h>
42 #include "async.h"
43 #include "net.h"
44 #include "dict.c"
45 #include "sds.h"
46 #include "win32.h"
47
48 #include "async_private.h"
49
/* Forward declarations of hiredis.c functions.
 * They are called from this file but not defined in it:
 *  - __redisAppendCommand: used by __redisAsyncCommand() to queue an already
 *    formatted command of `len` bytes on the context.
 *  - __redisSetError: used on error paths here to record an error type and
 *    message on the context. */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
void __redisSetError(redisContext *c, int type, const char *str);
53
54 /* Functions managing dictionary of callbacks for pub/sub. */
callbackHash(const void * key)55 static unsigned int callbackHash(const void *key) {
56 return dictGenHashFunction((const unsigned char *)key,
57 sdslen((const sds)key));
58 }
59
callbackValDup(void * privdata,const void * src)60 static void *callbackValDup(void *privdata, const void *src) {
61 ((void) privdata);
62 redisCallback *dup;
63
64 dup = hi_malloc(sizeof(*dup));
65 if (dup == NULL)
66 return NULL;
67
68 memcpy(dup,src,sizeof(*dup));
69 return dup;
70 }
71
callbackKeyCompare(void * privdata,const void * key1,const void * key2)72 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
73 int l1, l2;
74 ((void) privdata);
75
76 l1 = sdslen((const sds)key1);
77 l2 = sdslen((const sds)key2);
78 if (l1 != l2) return 0;
79 return memcmp(key1,key2,l1) == 0;
80 }
81
callbackKeyDestructor(void * privdata,void * key)82 static void callbackKeyDestructor(void *privdata, void *key) {
83 ((void) privdata);
84 sdsfree((sds)key);
85 }
86
/* Value destructor hook: releases the value with hi_free(), matching the
 * hi_malloc() done in callbackValDup(). privdata is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    (void)privdata;
    hi_free(val);
}
91
/* dictType shared by the two pub/sub callback dictionaries (ac->sub.channels
 * and ac->sub.patterns). Keys are handled as sds strings and values as
 * heap-allocated redisCallback copies, per the hooks listed below.
 * NOTE(review): the initializer is positional; the second slot is left NULL
 * and is presumably the key-duplication hook (keys stored as passed in) --
 * confirm against the dictType declaration. */
static dictType callbackDict = {
    callbackHash,
    NULL,
    callbackValDup,
    callbackKeyCompare,
    callbackKeyDestructor,
    callbackValDestructor
};
100
redisAsyncInitialize(redisContext * c)101 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
102 redisAsyncContext *ac;
103 dict *channels = NULL, *patterns = NULL;
104
105 channels = dictCreate(&callbackDict,NULL);
106 if (channels == NULL)
107 goto oom;
108
109 patterns = dictCreate(&callbackDict,NULL);
110 if (patterns == NULL)
111 goto oom;
112
113 ac = hi_realloc(c,sizeof(redisAsyncContext));
114 if (ac == NULL)
115 goto oom;
116
117 c = &(ac->c);
118
119 /* The regular connect functions will always set the flag REDIS_CONNECTED.
120 * For the async API, we want to wait until the first write event is
121 * received up before setting this flag, so reset it here. */
122 c->flags &= ~REDIS_CONNECTED;
123
124 ac->err = 0;
125 ac->errstr = NULL;
126 ac->data = NULL;
127 ac->dataCleanup = NULL;
128
129 ac->ev.data = NULL;
130 ac->ev.addRead = NULL;
131 ac->ev.delRead = NULL;
132 ac->ev.addWrite = NULL;
133 ac->ev.delWrite = NULL;
134 ac->ev.cleanup = NULL;
135 ac->ev.scheduleTimer = NULL;
136
137 ac->onConnect = NULL;
138 ac->onDisconnect = NULL;
139
140 ac->replies.head = NULL;
141 ac->replies.tail = NULL;
142 ac->sub.invalid.head = NULL;
143 ac->sub.invalid.tail = NULL;
144 ac->sub.channels = channels;
145 ac->sub.patterns = patterns;
146
147 return ac;
148 oom:
149 if (channels) dictRelease(channels);
150 if (patterns) dictRelease(patterns);
151 return NULL;
152 }
153
154 /* We want the error field to be accessible directly instead of requiring
155 * an indirection to the redisContext struct. */
__redisAsyncCopyError(redisAsyncContext * ac)156 static void __redisAsyncCopyError(redisAsyncContext *ac) {
157 if (!ac)
158 return;
159
160 redisContext *c = &(ac->c);
161 ac->err = c->err;
162 ac->errstr = c->errstr;
163 }
164
redisAsyncConnectWithOptions(const redisOptions * options)165 redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
166 redisOptions myOptions = *options;
167 redisContext *c;
168 redisAsyncContext *ac;
169
170 /* Clear any erroneously set sync callback and flag that we don't want to
171 * use freeReplyObject by default. */
172 myOptions.push_cb = NULL;
173 myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
174
175 myOptions.options |= REDIS_OPT_NONBLOCK;
176 c = redisConnectWithOptions(&myOptions);
177 if (c == NULL) {
178 return NULL;
179 }
180
181 ac = redisAsyncInitialize(c);
182 if (ac == NULL) {
183 redisFree(c);
184 return NULL;
185 }
186
187 /* Set any configured async push handler */
188 redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
189
190 __redisAsyncCopyError(ac);
191 return ac;
192 }
193
redisAsyncConnect(const char * ip,int port)194 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
195 redisOptions options = {0};
196 REDIS_OPTIONS_SET_TCP(&options, ip, port);
197 return redisAsyncConnectWithOptions(&options);
198 }
199
redisAsyncConnectBind(const char * ip,int port,const char * source_addr)200 redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
201 const char *source_addr) {
202 redisOptions options = {0};
203 REDIS_OPTIONS_SET_TCP(&options, ip, port);
204 options.endpoint.tcp.source_addr = source_addr;
205 return redisAsyncConnectWithOptions(&options);
206 }
207
redisAsyncConnectBindWithReuse(const char * ip,int port,const char * source_addr)208 redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
209 const char *source_addr) {
210 redisOptions options = {0};
211 REDIS_OPTIONS_SET_TCP(&options, ip, port);
212 options.options |= REDIS_OPT_REUSEADDR;
213 options.endpoint.tcp.source_addr = source_addr;
214 return redisAsyncConnectWithOptions(&options);
215 }
216
redisAsyncConnectUnix(const char * path)217 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
218 redisOptions options = {0};
219 REDIS_OPTIONS_SET_UNIX(&options, path);
220 return redisAsyncConnectWithOptions(&options);
221 }
222
/* Register the connect callback. It can only be set while none is installed:
 * returns REDIS_ERR if one is already present, REDIS_OK otherwise. */
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
    if (ac->onConnect != NULL)
        return REDIS_ERR;

    ac->onConnect = fn;

    /* An established connection is detected by waiting for the first write
     * event, so request one. This assumes the event library hooks have
     * already been attached. */
    _EL_ADD_WRITE(ac);
    return REDIS_OK;
}
235
/* Register the disconnect callback. It can only be set while none is
 * installed: returns REDIS_ERR if one is already present, REDIS_OK
 * otherwise. */
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
    if (ac->onDisconnect != NULL)
        return REDIS_ERR;

    ac->onDisconnect = fn;
    return REDIS_OK;
}
243
244 /* Helper functions to push/shift callbacks */
/* Append a heap copy of `source` to the tail of `list`.
 *
 * Returns REDIS_OK on success or REDIS_ERR_OOM when the node cannot be
 * allocated (the list is then unchanged).
 *
 * When `source` is NULL an all-zero callback is queued. Previously the node
 * was linked with every field, including `next`, left uninitialized, which
 * made any later walk or shift of the list undefined. */
static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
    redisCallback *cb;

    /* Copy callback from stack to heap */
    cb = hi_malloc(sizeof(*cb));
    if (cb == NULL)
        return REDIS_ERR_OOM;

    if (source != NULL)
        memcpy(cb,source,sizeof(*cb));
    else
        memset(cb,0,sizeof(*cb));

    /* The new node is always the last one, whatever `source->next` held. */
    cb->next = NULL;

    /* Store callback in list */
    if (list->head == NULL)
        list->head = cb;
    if (list->tail != NULL)
        list->tail->next = cb;
    list->tail = cb;
    return REDIS_OK;
}
266
/* Remove the first callback from `list`.
 *
 * Returns REDIS_ERR when the list is empty (`target` is then left untouched).
 * Otherwise the node is unlinked, copied into `target` when that is non-NULL,
 * freed, and REDIS_OK is returned. */
static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
    redisCallback *first = list->head;

    if (first == NULL)
        return REDIS_ERR;

    list->head = first->next;
    if (list->tail == first)
        list->tail = NULL;

    /* Hand the callback back by value; the heap node itself goes away. */
    if (target != NULL)
        memcpy(target, first, sizeof(*first));
    hi_free(first);
    return REDIS_OK;
}
282
/* Invoke cb->fn (if set) with the reply and the callback's privdata. The
 * REDIS_IN_CALLBACK flag is raised for the duration of the call and cleared
 * afterwards. */
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
    if (cb->fn == NULL)
        return;

    ac->c.flags |= REDIS_IN_CALLBACK;
    cb->fn(ac, reply, cb->privdata);
    ac->c.flags &= ~REDIS_IN_CALLBACK;
}
291
/* Invoke the async push handler (if one is installed) with the reply. The
 * REDIS_IN_CALLBACK flag is raised for the duration of the call and cleared
 * afterwards. */
static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
    redisContext *c = &(ac->c);

    if (ac->push_cb == NULL)
        return;

    c->flags |= REDIS_IN_CALLBACK;
    ac->push_cb(ac, reply);
    c->flags &= ~REDIS_IN_CALLBACK;
}
299
300 /* Helper function to free the context. */
__redisAsyncFree(redisAsyncContext * ac)301 static void __redisAsyncFree(redisAsyncContext *ac) {
302 redisContext *c = &(ac->c);
303 redisCallback cb;
304 dictIterator *it;
305 dictEntry *de;
306
307 /* Execute pending callbacks with NULL reply. */
308 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
309 __redisRunCallback(ac,&cb,NULL);
310
311 /* Execute callbacks for invalid commands */
312 while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
313 __redisRunCallback(ac,&cb,NULL);
314
315 /* Run subscription callbacks with NULL reply */
316 if (ac->sub.channels) {
317 it = dictGetIterator(ac->sub.channels);
318 if (it != NULL) {
319 while ((de = dictNext(it)) != NULL)
320 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
321 dictReleaseIterator(it);
322 }
323
324 dictRelease(ac->sub.channels);
325 }
326
327 if (ac->sub.patterns) {
328 it = dictGetIterator(ac->sub.patterns);
329 if (it != NULL) {
330 while ((de = dictNext(it)) != NULL)
331 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
332 dictReleaseIterator(it);
333 }
334
335 dictRelease(ac->sub.patterns);
336 }
337
338 /* Signal event lib to clean up */
339 _EL_CLEANUP(ac);
340
341 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
342 * this context, the status will always be REDIS_OK. */
343 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
344 if (c->flags & REDIS_FREEING) {
345 ac->onDisconnect(ac,REDIS_OK);
346 } else {
347 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
348 }
349 }
350
351 if (ac->dataCleanup) {
352 ac->dataCleanup(ac->data);
353 }
354
355 /* Cleanup self */
356 redisFree(c);
357 }
358
359 /* Free the async context. When this function is called from a callback,
360 * control needs to be returned to redisProcessCallbacks() before actual
361 * free'ing. To do so, a flag is set on the context which is picked up by
362 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
redisAsyncFree(redisAsyncContext * ac)363 void redisAsyncFree(redisAsyncContext *ac) {
364 redisContext *c = &(ac->c);
365 c->flags |= REDIS_FREEING;
366 if (!(c->flags & REDIS_IN_CALLBACK))
367 __redisAsyncFree(ac);
368 }
369
370 /* Helper function to make the disconnect happen and clean up. */
__redisAsyncDisconnect(redisAsyncContext * ac)371 void __redisAsyncDisconnect(redisAsyncContext *ac) {
372 redisContext *c = &(ac->c);
373
374 /* Make sure error is accessible if there is any */
375 __redisAsyncCopyError(ac);
376
377 if (ac->err == 0) {
378 /* For clean disconnects, there should be no pending callbacks. */
379 int ret = __redisShiftCallback(&ac->replies,NULL);
380 assert(ret == REDIS_ERR);
381 } else {
382 /* Disconnection is caused by an error, make sure that pending
383 * callbacks cannot call new commands. */
384 c->flags |= REDIS_DISCONNECTING;
385 }
386
387 /* cleanup event library on disconnect.
388 * this is safe to call multiple times */
389 _EL_CLEANUP(ac);
390
391 /* For non-clean disconnects, __redisAsyncFree() will execute pending
392 * callbacks with a NULL-reply. */
393 if (!(c->flags & REDIS_NO_AUTO_FREE)) {
394 __redisAsyncFree(ac);
395 }
396 }
397
398 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
399 * from being issued, but tries to flush the output buffer and execute
400 * callbacks for all remaining replies. When this function is called from a
401 * callback, there might be more replies and we can safely defer disconnecting
402 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
403 * when there are no pending callbacks. */
redisAsyncDisconnect(redisAsyncContext * ac)404 void redisAsyncDisconnect(redisAsyncContext *ac) {
405 redisContext *c = &(ac->c);
406 c->flags |= REDIS_DISCONNECTING;
407
408 /** unset the auto-free flag here, because disconnect undoes this */
409 c->flags &= ~REDIS_NO_AUTO_FREE;
410 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
411 __redisAsyncDisconnect(ac);
412 }
413
/* Find the callback that should receive a reply while in subscribed mode.
 *
 * For array/push replies the first element names the message kind (an
 * optional leading 'p' selects the pattern dictionary) and the second names
 * the channel or pattern. If an entry exists its callback is copied into
 * `dstcb`; if none exists `dstcb` is left unchanged. A "subscribe" reply
 * decrements the entry's pending counter; an "unsubscribe" reply removes the
 * entry once that counter is zero and clears REDIS_SUBSCRIBED when the server
 * reports zero remaining subscriptions and both dictionaries are empty.
 *
 * Any other reply type is matched with the next callback queued for invalid
 * commands.
 *
 * Returns REDIS_OK, or REDIS_ERR (with an OOM error set on the context) when
 * the lookup key cannot be allocated. */
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
    redisContext *c = &(ac->c);
    dict *callbacks;
    redisCallback *cb;
    dictEntry *de;
    int pvariant;
    char *stype;
    sds sname;

    /* Custom reply functions are not supported for pub/sub. This will fail
     * very hard when they are used... */
    if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
        assert(reply->elements >= 2);
        assert(reply->element[0]->type == REDIS_REPLY_STRING);
        stype = reply->element[0]->str;
        /* <ctype.h> functions need a value representable as unsigned char. */
        pvariant = (tolower((unsigned char)stype[0]) == 'p') ? 1 : 0;

        if (pvariant)
            callbacks = ac->sub.patterns;
        else
            callbacks = ac->sub.channels;

        /* Locate the right callback */
        assert(reply->element[1]->type == REDIS_REPLY_STRING);
        sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
        if (sname == NULL)
            goto oom;

        de = dictFind(callbacks,sname);
        if (de != NULL) {
            cb = dictGetEntryVal(de);

            /* If this is an subscribe reply decrease pending counter. */
            if (strcasecmp(stype+pvariant,"subscribe") == 0) {
                cb->pending_subs -= 1;
            }

            /* Copy before a possible dictDelete() below frees the entry. */
            memcpy(dstcb,cb,sizeof(*dstcb));

            /* If this is an unsubscribe message, remove it. */
            if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
                if (cb->pending_subs == 0)
                    dictDelete(callbacks,sname);

                /* If this was the last unsubscribe message, revert to
                 * non-subscribe mode. The remaining-subscriptions count is
                 * the third element, so make sure it is there. */
                assert(reply->elements >= 3);
                assert(reply->element[2]->type == REDIS_REPLY_INTEGER);

                /* Unset subscribed flag only when no pipelined pending subscribe. */
                if (reply->element[2]->integer == 0
                    && dictSize(ac->sub.channels) == 0
                    && dictSize(ac->sub.patterns) == 0)
                    c->flags &= ~REDIS_SUBSCRIBED;
            }
        }
        sdsfree(sname);
    } else {
        /* Shift callback for invalid commands. */
        __redisShiftCallback(&ac->sub.invalid,dstcb);
    }
    return REDIS_OK;
oom:
    __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
    return REDIS_ERR;
}
479
/* True when `r` is a push reply (per redisIsPushReply) that is not pub/sub
 * related (per redisIsSubscribeReply). redisProcessCallbacks() hands such
 * replies to the push callback instead of the regular callback queue.
 * Note that `r` is expanded twice, so pass an expression without side
 * effects. */
#define redisIsSpontaneousPushReply(r) \
    (redisIsPushReply(r) && !redisIsSubscribeReply(r))
482
/* Return non-zero when the first element of `reply` names a pub/sub
 * notification: "subscribe", "unsubscribe" or "message", each optionally
 * prefixed with 'p' for the pattern variant. Matching is case-insensitive.
 *
 * "unsubscribe" has to be recognised here as well: otherwise an unsubscribe
 * confirmation delivered as a push reply is classified as a spontaneous push
 * message, goes to the push handler, and never reaches
 * __redisGetSubscribeCallback(), so the subscription entry is not removed
 * and the subscribed flag is not cleared. */
static int redisIsSubscribeReply(redisReply *reply) {
    char *str;
    size_t len, off;

    /* We will always have at least one string with the subscribe/message type */
    if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
        reply->element[0]->len < sizeof("message") - 1)
    {
        return 0;
    }

    /* Get the string/len moving past 'p' if needed. <ctype.h> functions
     * need a value representable as unsigned char. */
    off = tolower((unsigned char)reply->element[0]->str[0]) == 'p';
    str = reply->element[0]->str + off;
    len = reply->element[0]->len - off;

    return !strncasecmp(str, "subscribe", len) ||
           !strncasecmp(str, "message", len) ||
           !strncasecmp(str, "unsubscribe", len);
}
503
/* Drain every reply that can currently be parsed from the input buffer and
 * dispatch each one:
 *  - spontaneous push replies go to the push callback;
 *  - otherwise the next queued regular callback is used;
 *  - with no queued callback, an error reply closes the connection, and in
 *    subscribed mode the pub/sub callback is looked up.
 * The function returns right after any call that disconnects or frees the
 * context; nothing may touch `ac` after such a call. */
void redisProcessCallbacks(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    /* NOTE(review): cb lives outside the loop, so when neither
     * __redisShiftCallback() nor __redisGetSubscribeCallback() fills it in,
     * it still holds the callback from the previous iteration. The monitor
     * re-push below depends on this persistence. */
    redisCallback cb = {NULL, NULL, 0, NULL};
    void *reply = NULL;
    int status;

    while((status = redisGetReply(c,&reply)) == REDIS_OK) {
        if (reply == NULL) {
            /* When the connection is being disconnected and there are
             * no more replies, this is the cue to really disconnect. */
            if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
                && ac->replies.head == NULL) {
                __redisAsyncDisconnect(ac);
                return;
            }

            /* If monitor mode, repush callback */
            /* NOTE(review): the return value of __redisPushCallback() (OOM)
             * is not checked here. */
            if(c->flags & REDIS_MONITORING) {
                __redisPushCallback(&ac->replies,&cb);
            }

            /* When the connection is not being disconnected, simply stop
             * trying to get replies and wait for the next loop tick. */
            break;
        }

        /* Send any non-subscribe related PUSH messages to our PUSH handler
         * while allowing subscribe related PUSH messages to pass through.
         * This allows existing code to be backward compatible and work in
         * either RESP2 or RESP3 mode. */
        if (redisIsSpontaneousPushReply(reply)) {
            __redisRunPushCallback(ac, reply);
            c->reader->fn->freeObject(reply);
            continue;
        }

        /* Even if the context is subscribed, pending regular
         * callbacks will get a reply before pub/sub messages arrive. */
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
            /*
             * A spontaneous reply in a not-subscribed context can be the error
             * reply that is sent when a new connection exceeds the maximum
             * number of allowed connections on the server side.
             *
             * This is seen as an error instead of a regular reply because the
             * server closes the connection after sending it.
             *
             * To prevent the error from being overwritten by an EOF error the
             * connection is closed here. See issue #43.
             *
             * Another possibility is that the server is loading its dataset.
             * In this case we also want to close the connection, and have the
             * user wait until the server is ready to take our request.
             */
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }
            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
            /* NOTE(review): the REDIS_ERR (OOM) result of the lookup is
             * ignored; the loop carries on with whatever cb holds. */
            if(c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac,reply,&cb);
        }

        if (cb.fn != NULL) {
            __redisRunCallback(ac,&cb,reply);
            c->reader->fn->freeObject(reply);

            /* Proceed with free'ing when redisAsyncFree() was called. */
            if (c->flags & REDIS_FREEING) {
                __redisAsyncFree(ac);
                return;
            }
        } else {
            /* No callback for this reply. This can either be a NULL callback,
             * or there were no callbacks to begin with. Either way, don't
             * abort with an error, but simply ignore it because the client
             * doesn't know what the server will spit out over the wire. */
            c->reader->fn->freeObject(reply);
        }
    }

    /* Disconnect when there was an error reading the reply */
    if (status != REDIS_OK)
        __redisAsyncDisconnect(ac);
}
593
/* Report a failed connection attempt: the connect callback (if any) is called
 * with REDIS_ERR, then the context goes through the disconnect path. */
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
    if (ac->onConnect != NULL) {
        ac->onConnect(ac, REDIS_ERR);
    }
    __redisAsyncDisconnect(ac);
}
598
599 /* Internal helper function to detect socket status the first time a read or
600 * write event fires. When connecting was not successful, the connect callback
601 * is called with a REDIS_ERR status and the context is free'd. */
__redisAsyncHandleConnect(redisAsyncContext * ac)602 static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
603 int completed = 0;
604 redisContext *c = &(ac->c);
605
606 if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
607 /* Error! */
608 redisCheckSocketError(c);
609 __redisAsyncHandleConnectFailure(ac);
610 return REDIS_ERR;
611 } else if (completed == 1) {
612 /* connected! */
613 if (c->connection_type == REDIS_CONN_TCP &&
614 redisSetTcpNoDelay(c) == REDIS_ERR) {
615 __redisAsyncHandleConnectFailure(ac);
616 return REDIS_ERR;
617 }
618
619 if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
620 c->flags |= REDIS_CONNECTED;
621 return REDIS_OK;
622 } else {
623 return REDIS_OK;
624 }
625 }
626
/* Read from the socket into the context buffer. On a read error the context
 * is disconnected; otherwise another read is scheduled and the replies that
 * are now available are dispatched. */
void redisAsyncRead(redisAsyncContext *ac) {
    if (redisBufferRead(&(ac->c)) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
        return;
    }

    /* Always re-schedule reads */
    _EL_ADD_READ(ac);
    redisProcessCallbacks(ac);
}
638
639 /* This function should be called when the socket is readable.
640 * It processes all replies that can be read and executes their callbacks.
641 */
redisAsyncHandleRead(redisAsyncContext * ac)642 void redisAsyncHandleRead(redisAsyncContext *ac) {
643 redisContext *c = &(ac->c);
644
645 if (!(c->flags & REDIS_CONNECTED)) {
646 /* Abort connect was not successful. */
647 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
648 return;
649 /* Try again later when the context is still not connected. */
650 if (!(c->flags & REDIS_CONNECTED))
651 return;
652 }
653
654 c->funcs->async_read(ac);
655 }
656
/* Flush the output buffer to the socket. On a write error the context is
 * disconnected. Otherwise write events stay enabled while data remains and
 * are disabled once everything is written; a read is always scheduled. */
void redisAsyncWrite(redisAsyncContext *ac) {
    int done = 0;

    if (redisBufferWrite(&(ac->c), &done) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
        return;
    }

    /* Stop writing when done, continue otherwise */
    if (done) {
        _EL_DEL_WRITE(ac);
    } else {
        _EL_ADD_WRITE(ac);
    }

    /* Always schedule reads after writes */
    _EL_ADD_READ(ac);
}
674
/* To be called when the socket is writable. While the context is not yet
 * marked connected, the connection attempt is checked first; the write is
 * only performed once the connection is established. */
void redisAsyncHandleWrite(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    if (!(c->flags & REDIS_CONNECTED)) {
        /* A failed connect has already disposed of the context: stop here
         * without touching it again. */
        int rc = __redisAsyncHandleConnect(ac);
        if (rc != REDIS_OK)
            return;

        /* Still connecting: wait for the next event. */
        if ((c->flags & REDIS_CONNECTED) == 0)
            return;
    }

    c->funcs->async_write(ac);
}
689
/* Timer expiry handler. A connected context with no outstanding replies just
 * had an idle timeout and is left alone. Otherwise a timeout error is
 * recorded (unless an error is already set), a still-unconnected context gets
 * its connect callback called with REDIS_ERR, every pending reply callback is
 * run with a NULL reply, and the context is disconnected. */
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisCallback pending;
    int idle = (c->flags & REDIS_CONNECTED) && ac->replies.head == NULL;

    /* Nothing to do - just an idle timeout */
    if (idle)
        return;

    if (c->err == 0)
        __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");

    if (ac->onConnect && !(c->flags & REDIS_CONNECTED))
        ac->onConnect(ac, REDIS_ERR);

    for (;;) {
        if (__redisShiftCallback(&ac->replies, &pending) != REDIS_OK)
            break;
        __redisRunCallback(ac, &pending, NULL);
    }

    /**
     * TODO: Don't automatically sever the connection,
     * rather, allow to ignore <x> responses before the queue is clear
     */
    __redisAsyncDisconnect(ac);
}
717
/* Parse the next bulk argument ("$<len>\r\n<data>\r\n") of an already
 * formatted command, searching forward from `start` for the next '$' if
 * `start` does not point at one.
 *
 * On success *str is set to the first byte of the argument data, *len to its
 * length, and a pointer to the byte just past the argument's trailing CRLF is
 * returned (i.e. where the following argument header would start).
 *
 * Returns NULL, leaving *str and *len untouched, when there is no further
 * '$', when the length is negative, or when the header has no terminating
 * CR. The latter two cases used to be guarded only by an assert or not at
 * all, which is undefined in NDEBUG builds. The data itself is not validated
 * against the real buffer size; the caller must pass a well-formed command. */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
    const char *p = start;
    const char *cr;
    long arglen;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    arglen = strtol(p+1,NULL,10);
    if (arglen < 0) return NULL;

    cr = strchr(p,'\r');
    if (cr == NULL) return NULL;

    *len = (size_t)arglen;
    *str = cr+2;                 /* skip "\r\n" after the length */
    return cr+2+arglen+2;        /* skip data and its trailing "\r\n" */
}
733
734 /* Helper function for the redisAsyncCommand* family of functions. Writes a
735 * formatted command to the output buffer and registers the provided callback
736 * function with the context. */
__redisAsyncCommand(redisAsyncContext * ac,redisCallbackFn * fn,void * privdata,const char * cmd,size_t len)737 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
738 redisContext *c = &(ac->c);
739 redisCallback cb;
740 struct dict *cbdict;
741 dictEntry *de;
742 redisCallback *existcb;
743 int pvariant, hasnext;
744 const char *cstr, *astr;
745 size_t clen, alen;
746 const char *p;
747 sds sname;
748 int ret;
749
750 /* Don't accept new commands when the connection is about to be closed. */
751 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
752
753 /* Setup callback */
754 cb.fn = fn;
755 cb.privdata = privdata;
756 cb.pending_subs = 1;
757
758 /* Find out which command will be appended. */
759 p = nextArgument(cmd,&cstr,&clen);
760 assert(p != NULL);
761 hasnext = (p[0] == '$');
762 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
763 cstr += pvariant;
764 clen -= pvariant;
765
766 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
767 c->flags |= REDIS_SUBSCRIBED;
768
769 /* Add every channel/pattern to the list of subscription callbacks. */
770 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
771 sname = sdsnewlen(astr,alen);
772 if (sname == NULL)
773 goto oom;
774
775 if (pvariant)
776 cbdict = ac->sub.patterns;
777 else
778 cbdict = ac->sub.channels;
779
780 de = dictFind(cbdict,sname);
781
782 if (de != NULL) {
783 existcb = dictGetEntryVal(de);
784 cb.pending_subs = existcb->pending_subs + 1;
785 }
786
787 ret = dictReplace(cbdict,sname,&cb);
788
789 if (ret == 0) sdsfree(sname);
790 }
791 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
792 /* It is only useful to call (P)UNSUBSCRIBE when the context is
793 * subscribed to one or more channels or patterns. */
794 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
795
796 /* (P)UNSUBSCRIBE does not have its own response: every channel or
797 * pattern that is unsubscribed will receive a message. This means we
798 * should not append a callback function for this command. */
799 } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
800 /* Set monitor flag and push callback */
801 c->flags |= REDIS_MONITORING;
802 __redisPushCallback(&ac->replies,&cb);
803 } else {
804 if (c->flags & REDIS_SUBSCRIBED)
805 /* This will likely result in an error reply, but it needs to be
806 * received and passed to the callback. */
807 __redisPushCallback(&ac->sub.invalid,&cb);
808 else
809 __redisPushCallback(&ac->replies,&cb);
810 }
811
812 __redisAppendCommand(c,cmd,len);
813
814 /* Always schedule a write when the write buffer is non-empty */
815 _EL_ADD_WRITE(ac);
816
817 return REDIS_OK;
818 oom:
819 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
820 return REDIS_ERR;
821 }
822
/* Format a command from a printf-style format and a va_list, then queue it
 * with the given callback. Returns REDIS_ERR when formatting fails,
 * otherwise the result of queueing the command. The formatted buffer is
 * released before returning. */
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
    char *formatted;
    int result;
    int nbytes = redisvFormatCommand(&formatted, format, ap);

    /* A negative length signals a formatting failure and must never be
     * passed on as a size. */
    if (nbytes < 0)
        return REDIS_ERR;

    result = __redisAsyncCommand(ac, fn, privdata, formatted, nbytes);
    hi_free(formatted);
    return result;
}
837
/* Variadic front end for redisvAsyncCommand(): collects the arguments that
 * follow `format` and returns that function's result. */
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
    va_list args;
    int result;

    va_start(args, format);
    result = redisvAsyncCommand(ac, fn, privdata, format, args);
    va_end(args);

    return result;
}
846
/* Format a command from an argument vector, then queue it with the given
 * callback. Returns REDIS_ERR when formatting fails, otherwise the result of
 * queueing the command. The formatted sds buffer is released before
 * returning. */
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
    sds formatted;
    int result;
    int nbytes = redisFormatSdsCommandArgv(&formatted, argc, argv, argvlen);

    if (nbytes < 0)
        return REDIS_ERR;

    result = __redisAsyncCommand(ac, fn, privdata, formatted, nbytes);
    sdsfree(formatted);
    return result;
}
858
/* Queue a command that the caller has already formatted (`len` bytes at
 * `cmd`) together with its callback; returns the result of queueing it. */
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
    return __redisAsyncCommand(ac, fn, privdata, cmd, len);
}
863
/* Install `fn` as the async push handler and return the handler that was
 * installed before. */
redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
    redisAsyncPushFn *previous;

    previous = ac->push_cb;
    ac->push_cb = fn;
    return previous;
}
869
/* Set the command timeout of the context to `tv`. The timeout storage is
 * allocated on first use. Returns REDIS_OK, or REDIS_ERR when that allocation
 * fails (an OOM error is then set on the context and mirrored into ac). */
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
    redisContext *c = &(ac->c);

    if (c->command_timeout == NULL) {
        c->command_timeout = hi_calloc(1, sizeof(tv));
        if (c->command_timeout == NULL) {
            __redisSetError(c, REDIS_ERR_OOM, "Out of memory");
            __redisAsyncCopyError(ac);
            return REDIS_ERR;
        }
    }

    /* Only store the value when it actually differs. */
    if (c->command_timeout->tv_sec != tv.tv_sec ||
        c->command_timeout->tv_usec != tv.tv_usec)
    {
        *c->command_timeout = tv;
    }

    return REDIS_OK;
}
888