1 /* blocked.c - generic support for blocking operations like BLPOP & WAIT.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * ---------------------------------------------------------------------------
31  *
32  * API:
33  *
 * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the
 * client's block type field ('btype') to one of the BLOCKED_* macros.
36  *
37  * unblockClient() unblocks the client doing the following:
38  * 1) It calls the btype-specific function to cleanup the state.
39  * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
40  * 3) It puts the client into a list of just unblocked clients that are
41  *    processed ASAP in the beforeSleep() event loop callback, so that
42  *    if there is some query buffer to process, we do it. This is also
43  *    required because otherwise there is no 'readable' event fired, we
44  *    already read the pending commands. We also set the CLIENT_UNBLOCKED
45  *    flag to remember the client is in the unblocked_clients list.
46  *
 * processUnblockedClients() is called inside the beforeSleep() function
 * to process the query buffer of unblocked clients and remove them from
 * the unblocked_clients list.
50  *
51  * replyToBlockedClientTimedOut() is called by the cron function when
52  * a client blocked reaches the specified timeout (if the timeout is set
53  * to 0, no timeout is processed).
54  * It usually just needs to send a reply to the client.
55  *
56  * When implementing a new type of blocking operation, the implementation
57  * should modify unblockClient() and replyToBlockedClientTimedOut() in order
 * to handle the btype-specific behavior of these two functions.
59  * If the blocking operation waits for certain keys to change state, the
60  * clusterRedirectBlockedClientIfNeeded() function should also be updated.
61  */
62 
63 #include "server.h"
64 #include "slowlog.h"
65 #include "latency.h"
66 #include "monotonic.h"
67 
68 void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted);
69 int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
70 
71 /* This structure represents the blocked key information that we store
72  * in the client structure. Each client blocked on keys, has a
73  * client->bpop.keys hash table. The keys of the hash table are Redis
74  * keys pointers to 'robj' structures. The value is this structure.
75  * The structure has two goals: firstly we store the list node that this
76  * client uses to be listed in the database "blocked clients for this key"
77  * list, so we can later unblock in O(1) without a list scan.
78  * Secondly for certain blocking types, we have additional info. Right now
79  * the only use for additional info we have is when clients are blocked
80  * on streams, as we have to remember the ID it blocked for. */
81 typedef struct bkinfo {
82     listNode *listnode;     /* List node for db->blocking_keys[key] list. */
83     streamID stream_id;     /* Stream ID if we blocked in a stream. */
84 } bkinfo;
85 
86 /* Block a client for the specific operation type. Once the CLIENT_BLOCKED
87  * flag is set client query buffer is not longer processed, but accumulated,
88  * and will be processed when the client is unblocked. */
blockClient(client * c,int btype)89 void blockClient(client *c, int btype) {
90     /* Master client should never be blocked unless pause or module */
91     serverAssert(!(c->flags & CLIENT_MASTER &&
92                    btype != BLOCKED_MODULE &&
93                    btype != BLOCKED_PAUSE));
94 
95     c->flags |= CLIENT_BLOCKED;
96     c->btype = btype;
97     server.blocked_clients++;
98     server.blocked_clients_by_type[btype]++;
99     addClientToTimeoutTable(c);
100     if (btype == BLOCKED_PAUSE) {
101         listAddNodeTail(server.paused_clients, c);
102         c->paused_list_node = listLast(server.paused_clients);
103         /* Mark this client to execute its command */
104         c->flags |= CLIENT_PENDING_COMMAND;
105     }
106 }
107 
108 /* This function is called after a client has finished a blocking operation
109  * in order to update the total command duration, log the command into
110  * the Slow log if needed, and log the reply duration event if needed. */
updateStatsOnUnblock(client * c,long blocked_us,long reply_us)111 void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
112     const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
113     c->lastcmd->microseconds += total_cmd_duration;
114 
115     /* Log the command into the Slow log if needed. */
116     slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
117     /* Log the reply duration event. */
118     latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
119 }
120 
121 /* This function is called in the beforeSleep() function of the event loop
122  * in order to process the pending input buffer of clients that were
123  * unblocked after a blocking operation. */
processUnblockedClients(void)124 void processUnblockedClients(void) {
125     listNode *ln;
126     client *c;
127 
128     while (listLength(server.unblocked_clients)) {
129         ln = listFirst(server.unblocked_clients);
130         serverAssert(ln != NULL);
131         c = ln->value;
132         listDelNode(server.unblocked_clients,ln);
133         c->flags &= ~CLIENT_UNBLOCKED;
134 
135         /* Process remaining data in the input buffer, unless the client
136          * is blocked again. Actually processInputBuffer() checks that the
137          * client is not blocked before to proceed, but things may change and
138          * the code is conceptually more correct this way. */
139         if (!(c->flags & CLIENT_BLOCKED)) {
140             /* If we have a queued command, execute it now. */
141             if (processPendingCommandsAndResetClient(c) == C_OK) {
142                 /* Now process client if it has more data in it's buffer. */
143                 if (c->querybuf && sdslen(c->querybuf) > 0) {
144                     if (processInputBuffer(c) == C_ERR) c = NULL;
145                 }
146             } else {
147                 c = NULL;
148             }
149         }
150         beforeNextClient(c);
151     }
152 }
153 
154 /* This function will schedule the client for reprocessing at a safe time.
155  *
156  * This is useful when a client was blocked for some reason (blocking operation,
157  * CLIENT PAUSE, or whatever), because it may end with some accumulated query
158  * buffer that needs to be processed ASAP:
159  *
160  * 1. When a client is blocked, its readable handler is still active.
161  * 2. However in this case it only gets data into the query buffer, but the
162  *    query is not parsed or executed once there is enough to proceed as
163  *    usually (because the client is blocked... so we can't execute commands).
164  * 3. When the client is unblocked, without this function, the client would
165  *    have to write some query in order for the readable handler to finally
166  *    call processQueryBuffer*() on it.
167  * 4. With this function instead we can put the client in a queue that will
168  *    process it for queries ready to be executed at a safe time.
169  */
queueClientForReprocessing(client * c)170 void queueClientForReprocessing(client *c) {
171     /* The client may already be into the unblocked list because of a previous
172      * blocking operation, don't add back it into the list multiple times. */
173     if (!(c->flags & CLIENT_UNBLOCKED)) {
174         c->flags |= CLIENT_UNBLOCKED;
175         listAddNodeTail(server.unblocked_clients,c);
176     }
177 }
178 
179 /* Unblock a client calling the right function depending on the kind
180  * of operation the client is blocking for. */
unblockClient(client * c)181 void unblockClient(client *c) {
182     if (c->btype == BLOCKED_LIST ||
183         c->btype == BLOCKED_ZSET ||
184         c->btype == BLOCKED_STREAM) {
185         unblockClientWaitingData(c);
186     } else if (c->btype == BLOCKED_WAIT) {
187         unblockClientWaitingReplicas(c);
188     } else if (c->btype == BLOCKED_MODULE) {
189         if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
190         unblockClientFromModule(c);
191     } else if (c->btype == BLOCKED_PAUSE) {
192         listDelNode(server.paused_clients,c->paused_list_node);
193         c->paused_list_node = NULL;
194     } else {
195         serverPanic("Unknown btype in unblockClient().");
196     }
197 
198     /* Reset the client for a new query since, for blocking commands
199      * we do not do it immediately after the command returns (when the
200      * client got blocked) in order to be still able to access the argument
201      * vector from module callbacks and updateStatsOnUnblock. */
202     if (c->btype != BLOCKED_PAUSE) {
203         freeClientOriginalArgv(c);
204         resetClient(c);
205     }
206 
207     /* Clear the flags, and put the client in the unblocked list so that
208      * we'll process new commands in its query buffer ASAP. */
209     server.blocked_clients--;
210     server.blocked_clients_by_type[c->btype]--;
211     c->flags &= ~CLIENT_BLOCKED;
212     c->btype = BLOCKED_NONE;
213     removeClientFromTimeoutTable(c);
214     queueClientForReprocessing(c);
215 }
216 
217 /* This function gets called when a blocked client timed out in order to
218  * send it a reply of some kind. After this function is called,
219  * unblockClient() will be called with the same client as argument. */
replyToBlockedClientTimedOut(client * c)220 void replyToBlockedClientTimedOut(client *c) {
221     if (c->btype == BLOCKED_LIST ||
222         c->btype == BLOCKED_ZSET ||
223         c->btype == BLOCKED_STREAM) {
224         addReplyNullArray(c);
225     } else if (c->btype == BLOCKED_WAIT) {
226         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
227     } else if (c->btype == BLOCKED_MODULE) {
228         moduleBlockedClientTimedOut(c);
229     } else {
230         serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
231     }
232 }
233 
234 /* Mass-unblock clients because something changed in the instance that makes
235  * blocking no longer safe. For example clients blocked in list operations
236  * in an instance which turns from master to slave is unsafe, so this function
237  * is called when a master turns into a slave.
238  *
239  * The semantics is to send an -UNBLOCKED error to the client, disconnecting
240  * it at the same time. */
disconnectAllBlockedClients(void)241 void disconnectAllBlockedClients(void) {
242     listNode *ln;
243     listIter li;
244 
245     listRewind(server.clients,&li);
246     while((ln = listNext(&li))) {
247         client *c = listNodeValue(ln);
248 
249         if (c->flags & CLIENT_BLOCKED) {
250             /* PAUSED clients are an exception, when they'll be unblocked, the
251              * command processing will start from scratch, and the command will
252              * be either executed or rejected. (unlike LIST blocked clients for
253              * which the command is already in progress in a way. */
254             if (c->btype == BLOCKED_PAUSE)
255                 continue;
256 
257             addReplyError(c,
258                 "-UNBLOCKED force unblock from blocking operation, "
259                 "instance state changed (master -> replica?)");
260             unblockClient(c);
261             c->flags |= CLIENT_CLOSE_AFTER_REPLY;
262         }
263     }
264 }
265 
266 /* Helper function for handleClientsBlockedOnKeys(). This function is called
267  * when there may be clients blocked on a list key, and there may be new
268  * data to fetch (the key is ready). */
serveClientsBlockedOnListKey(robj * o,readyList * rl)269 void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
270     /* We serve clients in the same order they blocked for
271      * this key, from the first blocked to the last. */
272     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
273     if (de) {
274         list *clients = dictGetVal(de);
275         int numclients = listLength(clients);
276         int deleted = 0;
277 
278         while(numclients--) {
279             listNode *clientnode = listFirst(clients);
280             client *receiver = clientnode->value;
281 
282             if (receiver->btype != BLOCKED_LIST) {
283                 /* Put at the tail, so that at the next call
284                  * we'll not run into it again. */
285                 listRotateHeadToTail(clients);
286                 continue;
287             }
288 
289             robj *dstkey = receiver->bpop.target;
290             int wherefrom = receiver->bpop.blockpos.wherefrom;
291             int whereto = receiver->bpop.blockpos.whereto;
292 
293             /* Protect receiver->bpop.target, that will be
294              * freed by the next unblockClient()
295              * call. */
296             if (dstkey) incrRefCount(dstkey);
297 
298             client *old_client = server.current_client;
299             server.current_client = receiver;
300             monotime replyTimer;
301             elapsedStart(&replyTimer);
302             serveClientBlockedOnList(receiver, o,
303                                      rl->key, dstkey, rl->db,
304                                      wherefrom, whereto,
305                                      &deleted);
306             updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
307             unblockClient(receiver);
308             afterCommand(receiver);
309             server.current_client = old_client;
310 
311             if (dstkey) decrRefCount(dstkey);
312 
313             /* The list is empty and has been deleted. */
314             if (deleted) break;
315         }
316     }
317 }
318 
319 /* Helper function for handleClientsBlockedOnKeys(). This function is called
320  * when there may be clients blocked on a sorted set key, and there may be new
321  * data to fetch (the key is ready). */
serveClientsBlockedOnSortedSetKey(robj * o,readyList * rl)322 void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
323     /* We serve clients in the same order they blocked for
324      * this key, from the first blocked to the last. */
325     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
326     if (de) {
327         list *clients = dictGetVal(de);
328         int numclients = listLength(clients);
329         int deleted = 0;
330 
331         while (numclients--) {
332             listNode *clientnode = listFirst(clients);
333             client *receiver = clientnode->value;
334 
335             if (receiver->btype != BLOCKED_ZSET) {
336                 /* Put at the tail, so that at the next call
337                  * we'll not run into it again. */
338                 listRotateHeadToTail(clients);
339                 continue;
340             }
341 
342             long llen = zsetLength(o);
343             long count = receiver->bpop.count;
344             int where = receiver->bpop.blockpos.wherefrom;
345             int use_nested_array = (receiver->lastcmd &&
346                                     receiver->lastcmd->proc == bzmpopCommand)
347                                     ? 1 : 0;
348             int reply_nil_when_empty = use_nested_array;
349 
350             client *old_client = server.current_client;
351             server.current_client = receiver;
352             monotime replyTimer;
353             elapsedStart(&replyTimer);
354             genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
355             updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
356             unblockClient(receiver);
357             afterCommand(receiver);
358             server.current_client = old_client;
359 
360             /* Replicate the command. */
361             int argc = 2;
362             robj *argv[3];
363             argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax;
364             argv[1] = rl->key;
365             incrRefCount(rl->key);
366             if (count != -1) {
367                 /* Replicate it as command with COUNT. */
368                 robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
369                 argv[2] = count_obj;
370                 argc++;
371             }
372             propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL);
373             decrRefCount(argv[1]);
374             if (count != -1) decrRefCount(argv[2]);
375 
376             /* The zset is empty and has been deleted. */
377             if (deleted) break;
378         }
379     }
380 }
381 
382 /* Helper function for handleClientsBlockedOnKeys(). This function is called
383  * when there may be clients blocked on a stream key, and there may be new
384  * data to fetch (the key is ready). */
serveClientsBlockedOnStreamKey(robj * o,readyList * rl)385 void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
386     dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
387     stream *s = o->ptr;
388 
389     /* We need to provide the new data arrived on the stream
390      * to all the clients that are waiting for an offset smaller
391      * than the current top item. */
392     if (de) {
393         list *clients = dictGetVal(de);
394         listNode *ln;
395         listIter li;
396         listRewind(clients,&li);
397 
398         while((ln = listNext(&li))) {
399             client *receiver = listNodeValue(ln);
400             if (receiver->btype != BLOCKED_STREAM) continue;
401             bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
402             streamID *gt = &bki->stream_id;
403 
404             /* If we blocked in the context of a consumer
405              * group, we need to resolve the group and update the
406              * last ID the client is blocked for: this is needed
407              * because serving other clients in the same consumer
408              * group will alter the "last ID" of the consumer
409              * group, and clients blocked in a consumer group are
410              * always blocked for the ">" ID: we need to deliver
411              * only new messages and avoid unblocking the client
412              * otherwise. */
413             streamCG *group = NULL;
414             if (receiver->bpop.xread_group) {
415                 group = streamLookupCG(s,
416                         receiver->bpop.xread_group->ptr);
417                 /* If the group was not found, send an error
418                  * to the consumer. */
419                 if (!group) {
420                     addReplyError(receiver,
421                         "-NOGROUP the consumer group this client "
422                         "was blocked on no longer exists");
423                     unblockClient(receiver);
424                     continue;
425                 } else {
426                     *gt = group->last_id;
427                 }
428             }
429 
430             if (streamCompareID(&s->last_id, gt) > 0) {
431                 streamID start = *gt;
432                 streamIncrID(&start);
433 
434                 /* Lookup the consumer for the group, if any. */
435                 streamConsumer *consumer = NULL;
436                 int noack = 0;
437 
438                 if (group) {
439                     noack = receiver->bpop.xread_group_noack;
440                     sds name = receiver->bpop.xread_consumer->ptr;
441                     consumer = streamLookupConsumer(group,name,SLC_DEFAULT);
442                     if (consumer == NULL) {
443                         consumer = streamCreateConsumer(group,name,rl->key,
444                                                         rl->db->id,SCC_DEFAULT);
445                         if (noack) {
446                             streamPropagateConsumerCreation(receiver,rl->key,
447                                                             receiver->bpop.xread_group,
448                                                             consumer->name);
449                         }
450                     }
451                 }
452 
453                 client *old_client = server.current_client;
454                 server.current_client = receiver;
455                 monotime replyTimer;
456                 elapsedStart(&replyTimer);
457                 /* Emit the two elements sub-array consisting of
458                  * the name of the stream and the data we
459                  * extracted from it. Wrapped in a single-item
460                  * array, since we have just one key. */
461                 if (receiver->resp == 2) {
462                     addReplyArrayLen(receiver,1);
463                     addReplyArrayLen(receiver,2);
464                 } else {
465                     addReplyMapLen(receiver,1);
466                 }
467                 addReplyBulk(receiver,rl->key);
468 
469                 streamPropInfo pi = {
470                     rl->key,
471                     receiver->bpop.xread_group
472                 };
473                 streamReplyWithRange(receiver,s,&start,NULL,
474                                      receiver->bpop.xread_count,
475                                      0, group, consumer, noack, &pi);
476                 updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
477 
478                 /* Note that after we unblock the client, 'gt'
479                  * and other receiver->bpop stuff are no longer
480                  * valid, so we must do the setup above before
481                  * this call. */
482                 unblockClient(receiver);
483                 afterCommand(receiver);
484                 server.current_client = old_client;
485             }
486         }
487     }
488 }
489 
490 /* Helper function for handleClientsBlockedOnKeys(). This function is called
491  * in order to check if we can serve clients blocked by modules using
492  * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
493  * our goal here is to call the RedisModuleBlockedClient reply() callback to
494  * see if the key is really able to serve the client, and in that case,
495  * unblock it. */
serveClientsBlockedOnKeyByModule(readyList * rl)496 void serveClientsBlockedOnKeyByModule(readyList *rl) {
497     dictEntry *de;
498 
499     /* Optimization: If no clients are in type BLOCKED_MODULE,
500      * we can skip this loop. */
501     if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
502 
503     /* We serve clients in the same order they blocked for
504      * this key, from the first blocked to the last. */
505     de = dictFind(rl->db->blocking_keys,rl->key);
506     if (de) {
507         list *clients = dictGetVal(de);
508         int numclients = listLength(clients);
509 
510         while(numclients--) {
511             listNode *clientnode = listFirst(clients);
512             client *receiver = clientnode->value;
513 
514             /* Put at the tail, so that at the next call
515              * we'll not run into it again: clients here may not be
516              * ready to be served, so they'll remain in the list
517              * sometimes. We want also be able to skip clients that are
518              * not blocked for the MODULE type safely. */
519             listRotateHeadToTail(clients);
520 
521             if (receiver->btype != BLOCKED_MODULE) continue;
522 
523             /* Note that if *this* client cannot be served by this key,
524              * it does not mean that another client that is next into the
525              * list cannot be served as well: they may be blocked by
526              * different modules with different triggers to consider if a key
527              * is ready or not. This means we can't exit the loop but need
528              * to continue after the first failure. */
529             client *old_client = server.current_client;
530             server.current_client = receiver;
531             monotime replyTimer;
532             elapsedStart(&replyTimer);
533             if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
534             updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
535 
536             moduleUnblockClient(receiver);
537             afterCommand(receiver);
538             server.current_client = old_client;
539         }
540     }
541 }
542 
543 /* This function should be called by Redis every time a single command,
544  * a MULTI/EXEC block, or a Lua script, terminated its execution after
545  * being called by a client. It handles serving clients blocked in
546  * lists, streams, and sorted sets, via a blocking commands.
547  *
548  * All the keys with at least one client blocked that received at least
549  * one new element via some write operation are accumulated into
550  * the server.ready_keys list. This function will run the list and will
551  * serve clients accordingly. Note that the function will iterate again and
552  * again as a result of serving BLMOVE we can have new blocking clients
553  * to serve because of the PUSH side of BLMOVE.
554  *
555  * This function is normally "fair", that is, it will server clients
556  * using a FIFO behavior. However this fairness is violated in certain
557  * edge cases, that is, when we have clients blocked at the same time
558  * in a sorted set and in a list, for the same key (a very odd thing to
559  * do client side, indeed!). Because mismatching clients (blocking for
560  * a different type compared to the current key type) are moved in the
561  * other side of the linked list. However as long as the key starts to
562  * be used only for a single type, like virtually any Redis application will
563  * do, the function is already fair. */
handleClientsBlockedOnKeys(void)564 void handleClientsBlockedOnKeys(void) {
565     while(listLength(server.ready_keys) != 0) {
566         list *l;
567 
568         /* Point server.ready_keys to a fresh list and save the current one
569          * locally. This way as we run the old list we are free to call
570          * signalKeyAsReady() that may push new elements in server.ready_keys
571          * when handling clients blocked into BLMOVE. */
572         l = server.ready_keys;
573         server.ready_keys = listCreate();
574 
575         while(listLength(l) != 0) {
576             listNode *ln = listFirst(l);
577             readyList *rl = ln->value;
578 
579             /* First of all remove this key from db->ready_keys so that
580              * we can safely call signalKeyAsReady() against this key. */
581             dictDelete(rl->db->ready_keys,rl->key);
582 
583             /* Even if we are not inside call(), increment the call depth
584              * in order to make sure that keys are expired against a fixed
585              * reference time, and not against the wallclock time. This
586              * way we can lookup an object multiple times (BLMOVE does
587              * that) without the risk of it being freed in the second
588              * lookup, invalidating the first one.
589              * See https://github.com/redis/redis/pull/6554. */
590             server.fixed_time_expire++;
591             updateCachedTime(0);
592 
593             /* Serve clients blocked on the key. */
594             robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
595             if (o != NULL) {
596                 if (o->type == OBJ_LIST)
597                     serveClientsBlockedOnListKey(o,rl);
598                 else if (o->type == OBJ_ZSET)
599                     serveClientsBlockedOnSortedSetKey(o,rl);
600                 else if (o->type == OBJ_STREAM)
601                     serveClientsBlockedOnStreamKey(o,rl);
602                 /* We want to serve clients blocked on module keys
603                  * regardless of the object type: we don't know what the
604                  * module is trying to accomplish right now. */
605                 serveClientsBlockedOnKeyByModule(rl);
606             }
607             server.fixed_time_expire--;
608 
609             /* Free this item. */
610             decrRefCount(rl->key);
611             zfree(rl);
612             listDelNode(l,ln);
613         }
614         listRelease(l); /* We have the new list on place at this point. */
615     }
616 }
617 
618 /* This is how the current blocking lists/sorted sets/streams work, we use
619  * BLPOP as example, but the concept is the same for other list ops, sorted
620  * sets and XREAD.
621  * - If the user calls BLPOP and the key exists and contains a non empty list
622  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
623  *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. From then on the client's query buffer is no
 *   longer processed: new data is still read from the socket and
 *   accumulated, but no further commands run until the blocking request is
 *   served. We also put the client in a dictionary (db->blocking_keys)
 *   mapping keys to the list of clients blocked on each of them.
630  * - If a PUSH operation against a key with blocked clients waiting is
631  *   performed, we mark this key as "ready", and after the current command,
632  *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this key, from the one that blocked first to the last, for as long
 *   as the key still has elements to serve them.
635  */
636 
637 /* Set a client in blocking mode for the specified key (list, zset or stream),
638  * with the specified timeout. The 'type' argument is BLOCKED_LIST,
639  * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
640  * waiting for an empty key in order to awake the client. The client is blocked
641  * for all the 'numkeys' keys as in the 'keys' argument. When we block for
642  * stream keys, we also provide an array of streamID structures: clients will
643  * be unblocked only when items with an ID greater or equal to the specified
644  * one is appended to the stream.
645  *
646  * 'count' for those commands that support the optional count argument.
647  * Otherwise the value is 0. */
blockForKeys(client * c,int btype,robj ** keys,int numkeys,long count,mstime_t timeout,robj * target,struct blockPos * blockpos,streamID * ids)648 void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) {
649     dictEntry *de;
650     list *l;
651     int j;
652 
653     c->bpop.count = count;
654     c->bpop.timeout = timeout;
655     c->bpop.target = target;
656 
657     if (blockpos != NULL) c->bpop.blockpos = *blockpos;
658 
659     if (target != NULL) incrRefCount(target);
660 
661     for (j = 0; j < numkeys; j++) {
662         /* Allocate our bkinfo structure, associated to each key the client
663          * is blocked for. */
664         bkinfo *bki = zmalloc(sizeof(*bki));
665         if (btype == BLOCKED_STREAM)
666             bki->stream_id = ids[j];
667 
668         /* If the key already exists in the dictionary ignore it. */
669         if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
670             zfree(bki);
671             continue;
672         }
673         incrRefCount(keys[j]);
674 
675         /* And in the other "side", to map keys -> clients */
676         de = dictFind(c->db->blocking_keys,keys[j]);
677         if (de == NULL) {
678             int retval;
679 
680             /* For every key we take a list of clients blocked for it */
681             l = listCreate();
682             retval = dictAdd(c->db->blocking_keys,keys[j],l);
683             incrRefCount(keys[j]);
684             serverAssertWithInfo(c,keys[j],retval == DICT_OK);
685         } else {
686             l = dictGetVal(de);
687         }
688         listAddNodeTail(l,c);
689         bki->listnode = listLast(l);
690     }
691     blockClient(c,btype);
692 }
693 
694 /* Unblock a client that's waiting in a blocking operation such as BLPOP.
695  * You should never call this function directly, but unblockClient() instead. */
unblockClientWaitingData(client * c)696 void unblockClientWaitingData(client *c) {
697     dictEntry *de;
698     dictIterator *di;
699     list *l;
700 
701     serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
702     di = dictGetIterator(c->bpop.keys);
703     /* The client may wait for multiple keys, so unblock it for every key. */
704     while((de = dictNext(di)) != NULL) {
705         robj *key = dictGetKey(de);
706         bkinfo *bki = dictGetVal(de);
707 
708         /* Remove this client from the list of clients waiting for this key. */
709         l = dictFetchValue(c->db->blocking_keys,key);
710         serverAssertWithInfo(c,key,l != NULL);
711         listDelNode(l,bki->listnode);
712         /* If the list is empty we need to remove it to avoid wasting memory */
713         if (listLength(l) == 0)
714             dictDelete(c->db->blocking_keys,key);
715     }
716     dictReleaseIterator(di);
717 
718     /* Cleanup the client structure */
719     dictEmpty(c->bpop.keys,NULL);
720     if (c->bpop.target) {
721         decrRefCount(c->bpop.target);
722         c->bpop.target = NULL;
723     }
724     if (c->bpop.xread_group) {
725         decrRefCount(c->bpop.xread_group);
726         decrRefCount(c->bpop.xread_consumer);
727         c->bpop.xread_group = NULL;
728         c->bpop.xread_consumer = NULL;
729     }
730 }
731 
getBlockedTypeByType(int type)732 static int getBlockedTypeByType(int type) {
733     switch (type) {
734         case OBJ_LIST: return BLOCKED_LIST;
735         case OBJ_ZSET: return BLOCKED_ZSET;
736         case OBJ_MODULE: return BLOCKED_MODULE;
737         case OBJ_STREAM: return BLOCKED_STREAM;
738         default: return BLOCKED_NONE;
739     }
740 }
741 
742 /* If the specified key has clients blocked waiting for list pushes, this
743  * function will put the key reference into the server.ready_keys list.
744  * Note that db->ready_keys is a hash table that allows us to avoid putting
745  * the same key again and again in the list in case of multiple pushes
746  * made by a script or in the context of MULTI/EXEC.
747  *
748  * The list will be finally processed by handleClientsBlockedOnKeys() */
signalKeyAsReady(redisDb * db,robj * key,int type)749 void signalKeyAsReady(redisDb *db, robj *key, int type) {
750     readyList *rl;
751 
752     /* Quick returns. */
753     int btype = getBlockedTypeByType(type);
754     if (btype == BLOCKED_NONE) {
755         /* The type can never block. */
756         return;
757     }
758     if (!server.blocked_clients_by_type[btype] &&
759         !server.blocked_clients_by_type[BLOCKED_MODULE]) {
760         /* No clients block on this type. Note: Blocked modules are represented
761          * by BLOCKED_MODULE, even if the intention is to wake up by normal
762          * types (list, zset, stream), so we need to check that there are no
763          * blocked modules before we do a quick return here. */
764         return;
765     }
766 
767     /* No clients blocking for this key? No need to queue it. */
768     if (dictFind(db->blocking_keys,key) == NULL) return;
769 
770     /* Key was already signaled? No need to queue it again. */
771     if (dictFind(db->ready_keys,key) != NULL) return;
772 
773     /* Ok, we need to queue this key into server.ready_keys. */
774     rl = zmalloc(sizeof(*rl));
775     rl->key = key;
776     rl->db = db;
777     incrRefCount(key);
778     listAddNodeTail(server.ready_keys,rl);
779 
780     /* We also add the key in the db->ready_keys dictionary in order
781      * to avoid adding it multiple times into a list with a simple O(1)
782      * check. */
783     incrRefCount(key);
784     serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
785 }
786 
787