1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31
32 /*-----------------------------------------------------------------------------
33 * List API
34 *----------------------------------------------------------------------------*/
35
36 /* The function pushes an element to the specified list object 'subject',
37 * at head or tail position as specified by 'where'.
38 *
39 * There is no need for the caller to increment the refcount of 'value' as
40 * the function takes care of it if needed. */
listTypePush(robj * subject,robj * value,int where)41 void listTypePush(robj *subject, robj *value, int where) {
42 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
43 int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44 if (value->encoding == OBJ_ENCODING_INT) {
45 char buf[32];
46 ll2string(buf, 32, (long)value->ptr);
47 quicklistPush(subject->ptr, buf, strlen(buf), pos);
48 } else {
49 quicklistPush(subject->ptr, value->ptr, sdslen(value->ptr), pos);
50 }
51 } else {
52 serverPanic("Unknown list encoding");
53 }
54 }
55
listPopSaver(unsigned char * data,size_t sz)56 void *listPopSaver(unsigned char *data, size_t sz) {
57 return createStringObject((char*)data,sz);
58 }
59
listTypePop(robj * subject,int where)60 robj *listTypePop(robj *subject, int where) {
61 long long vlong;
62 robj *value = NULL;
63
64 int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
65 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
66 if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
67 NULL, &vlong, listPopSaver)) {
68 if (!value)
69 value = createStringObjectFromLongLong(vlong);
70 }
71 } else {
72 serverPanic("Unknown list encoding");
73 }
74 return value;
75 }
76
listTypeLength(const robj * subject)77 unsigned long listTypeLength(const robj *subject) {
78 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
79 return quicklistCount(subject->ptr);
80 } else {
81 serverPanic("Unknown list encoding");
82 }
83 }
84
85 /* Initialize an iterator at the specified index. */
listTypeInitIterator(robj * subject,long index,unsigned char direction)86 listTypeIterator *listTypeInitIterator(robj *subject, long index,
87 unsigned char direction) {
88 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
89 li->subject = subject;
90 li->encoding = subject->encoding;
91 li->direction = direction;
92 li->iter = NULL;
93 /* LIST_HEAD means start at TAIL and move *towards* head.
94 * LIST_TAIL means start at HEAD and move *towards tail. */
95 int iter_direction =
96 direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
97 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
98 li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
99 iter_direction, index);
100 } else {
101 serverPanic("Unknown list encoding");
102 }
103 return li;
104 }
105
106 /* Sets the direction of an iterator. */
listTypeSetIteratorDirection(listTypeIterator * li,unsigned char direction)107 void listTypeSetIteratorDirection(listTypeIterator *li, unsigned char direction) {
108 li->direction = direction;
109 int dir = direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
110 quicklistSetDirection(li->iter, dir);
111 }
112
113 /* Clean up the iterator. */
listTypeReleaseIterator(listTypeIterator * li)114 void listTypeReleaseIterator(listTypeIterator *li) {
115 quicklistReleaseIterator(li->iter);
116 zfree(li);
117 }
118
119 /* Stores pointer to current the entry in the provided entry structure
120 * and advances the position of the iterator. Returns 1 when the current
121 * entry is in fact an entry, 0 otherwise. */
listTypeNext(listTypeIterator * li,listTypeEntry * entry)122 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
123 /* Protect from converting when iterating */
124 serverAssert(li->subject->encoding == li->encoding);
125
126 entry->li = li;
127 if (li->encoding == OBJ_ENCODING_QUICKLIST) {
128 return quicklistNext(li->iter, &entry->entry);
129 } else {
130 serverPanic("Unknown list encoding");
131 }
132 return 0;
133 }
134
135 /* Return entry or NULL at the current position of the iterator. */
listTypeGet(listTypeEntry * entry)136 robj *listTypeGet(listTypeEntry *entry) {
137 robj *value = NULL;
138 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
139 if (entry->entry.value) {
140 value = createStringObject((char *)entry->entry.value,
141 entry->entry.sz);
142 } else {
143 value = createStringObjectFromLongLong(entry->entry.longval);
144 }
145 } else {
146 serverPanic("Unknown list encoding");
147 }
148 return value;
149 }
150
listTypeInsert(listTypeEntry * entry,robj * value,int where)151 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
152 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
153 value = getDecodedObject(value);
154 sds str = value->ptr;
155 size_t len = sdslen(str);
156 if (where == LIST_TAIL) {
157 quicklistInsertAfter(entry->li->iter, &entry->entry, str, len);
158 } else if (where == LIST_HEAD) {
159 quicklistInsertBefore(entry->li->iter, &entry->entry, str, len);
160 }
161 decrRefCount(value);
162 } else {
163 serverPanic("Unknown list encoding");
164 }
165 }
166
167 /* Replaces entry at the current position of the iterator. */
listTypeReplace(listTypeEntry * entry,robj * value)168 void listTypeReplace(listTypeEntry *entry, robj *value) {
169 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
170 value = getDecodedObject(value);
171 sds str = value->ptr;
172 size_t len = sdslen(str);
173 quicklistReplaceEntry(entry->li->iter, &entry->entry, str, len);
174 decrRefCount(value);
175 } else {
176 serverPanic("Unknown list encoding");
177 }
178 }
179
180 /* Compare the given object with the entry at the current position. */
listTypeEqual(listTypeEntry * entry,robj * o)181 int listTypeEqual(listTypeEntry *entry, robj *o) {
182 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
183 serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
184 return quicklistCompare(&entry->entry,o->ptr,sdslen(o->ptr));
185 } else {
186 serverPanic("Unknown list encoding");
187 }
188 }
189
190 /* Delete the element pointed to. */
listTypeDelete(listTypeIterator * iter,listTypeEntry * entry)191 void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
192 if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
193 quicklistDelEntry(iter->iter, &entry->entry);
194 } else {
195 serverPanic("Unknown list encoding");
196 }
197 }
198
199 /* This is a helper function for the COPY command.
200 * Duplicate a list object, with the guarantee that the returned object
201 * has the same encoding as the original one.
202 *
203 * The resulting object always has refcount set to 1 */
listTypeDup(robj * o)204 robj *listTypeDup(robj *o) {
205 robj *lobj;
206
207 serverAssert(o->type == OBJ_LIST);
208
209 switch (o->encoding) {
210 case OBJ_ENCODING_QUICKLIST:
211 lobj = createObject(OBJ_LIST, quicklistDup(o->ptr));
212 lobj->encoding = o->encoding;
213 break;
214 default:
215 serverPanic("Unknown list encoding");
216 break;
217 }
218 return lobj;
219 }
220
221 /* Delete a range of elements from the list. */
listTypeDelRange(robj * subject,long start,long count)222 int listTypeDelRange(robj *subject, long start, long count) {
223 if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
224 return quicklistDelRange(subject->ptr, start, count);
225 } else {
226 serverPanic("Unknown list encoding");
227 }
228 }
229
230 /*-----------------------------------------------------------------------------
231 * List Commands
232 *----------------------------------------------------------------------------*/
233
234 /* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
235 * 'xx': push if key exists. */
pushGenericCommand(client * c,int where,int xx)236 void pushGenericCommand(client *c, int where, int xx) {
237 int j;
238
239 robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
240 if (checkType(c,lobj,OBJ_LIST)) return;
241 if (!lobj) {
242 if (xx) {
243 addReply(c, shared.czero);
244 return;
245 }
246
247 lobj = createQuicklistObject();
248 quicklistSetOptions(lobj->ptr, server.list_max_listpack_size,
249 server.list_compress_depth);
250 dbAdd(c->db,c->argv[1],lobj);
251 }
252
253 for (j = 2; j < c->argc; j++) {
254 listTypePush(lobj,c->argv[j],where);
255 server.dirty++;
256 }
257
258 addReplyLongLong(c, listTypeLength(lobj));
259
260 char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
261 signalModifiedKey(c,c->db,c->argv[1]);
262 notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
263 }
264
265 /* LPUSH <key> <element> [<element> ...] */
lpushCommand(client * c)266 void lpushCommand(client *c) {
267 pushGenericCommand(c,LIST_HEAD,0);
268 }
269
270 /* RPUSH <key> <element> [<element> ...] */
rpushCommand(client * c)271 void rpushCommand(client *c) {
272 pushGenericCommand(c,LIST_TAIL,0);
273 }
274
275 /* LPUSHX <key> <element> [<element> ...] */
lpushxCommand(client * c)276 void lpushxCommand(client *c) {
277 pushGenericCommand(c,LIST_HEAD,1);
278 }
279
280 /* RPUSH <key> <element> [<element> ...] */
rpushxCommand(client * c)281 void rpushxCommand(client *c) {
282 pushGenericCommand(c,LIST_TAIL,1);
283 }
284
285 /* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */
linsertCommand(client * c)286 void linsertCommand(client *c) {
287 int where;
288 robj *subject;
289 listTypeIterator *iter;
290 listTypeEntry entry;
291 int inserted = 0;
292
293 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
294 where = LIST_TAIL;
295 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
296 where = LIST_HEAD;
297 } else {
298 addReplyErrorObject(c,shared.syntaxerr);
299 return;
300 }
301
302 if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
303 checkType(c,subject,OBJ_LIST)) return;
304
305 /* Seek pivot from head to tail */
306 iter = listTypeInitIterator(subject,0,LIST_TAIL);
307 while (listTypeNext(iter,&entry)) {
308 if (listTypeEqual(&entry,c->argv[3])) {
309 listTypeInsert(&entry,c->argv[4],where);
310 inserted = 1;
311 break;
312 }
313 }
314 listTypeReleaseIterator(iter);
315
316 if (inserted) {
317 signalModifiedKey(c,c->db,c->argv[1]);
318 notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
319 c->argv[1],c->db->id);
320 server.dirty++;
321 } else {
322 /* Notify client of a failed insert */
323 addReplyLongLong(c,-1);
324 return;
325 }
326
327 addReplyLongLong(c,listTypeLength(subject));
328 }
329
330 /* LLEN <key> */
llenCommand(client * c)331 void llenCommand(client *c) {
332 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
333 if (o == NULL || checkType(c,o,OBJ_LIST)) return;
334 addReplyLongLong(c,listTypeLength(o));
335 }
336
337 /* LINDEX <key> <index> */
lindexCommand(client * c)338 void lindexCommand(client *c) {
339 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
340 if (o == NULL || checkType(c,o,OBJ_LIST)) return;
341 long index;
342
343 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
344 return;
345
346 if (o->encoding == OBJ_ENCODING_QUICKLIST) {
347 quicklistEntry entry;
348 quicklistIter *iter = quicklistGetIteratorEntryAtIdx(o->ptr, index, &entry);
349 if (iter) {
350 if (entry.value) {
351 addReplyBulkCBuffer(c, entry.value, entry.sz);
352 } else {
353 addReplyBulkLongLong(c, entry.longval);
354 }
355 } else {
356 addReplyNull(c);
357 }
358 quicklistReleaseIterator(iter);
359 } else {
360 serverPanic("Unknown list encoding");
361 }
362 }
363
364 /* LSET <key> <index> <element> */
lsetCommand(client * c)365 void lsetCommand(client *c) {
366 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
367 if (o == NULL || checkType(c,o,OBJ_LIST)) return;
368 long index;
369 robj *value = c->argv[3];
370
371 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
372 return;
373
374 if (o->encoding == OBJ_ENCODING_QUICKLIST) {
375 quicklist *ql = o->ptr;
376 int replaced = quicklistReplaceAtIndex(ql, index,
377 value->ptr, sdslen(value->ptr));
378 if (!replaced) {
379 addReplyErrorObject(c,shared.outofrangeerr);
380 } else {
381 addReply(c,shared.ok);
382 signalModifiedKey(c,c->db,c->argv[1]);
383 notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
384 server.dirty++;
385 }
386 } else {
387 serverPanic("Unknown list encoding");
388 }
389 }
390
391 /* A helper function like addListRangeReply, more details see below.
392 * The difference is that here we are returning nested arrays, like:
393 * 1) keyname
394 * 2) 1) element1
395 * 2) element2
396 *
397 * And also actually pop out from the list by calling listElementsRemoved.
398 * We maintain the server.dirty and notifications there.
399 *
400 * 'deleted' is an optional output argument to get an indication
401 * if the key got deleted by this function. */
listPopRangeAndReplyWithKey(client * c,robj * o,robj * key,int where,long count,int * deleted)402 void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long count, int *deleted) {
403 long llen = listTypeLength(o);
404 long rangelen = (count > llen) ? llen : count;
405 long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
406 long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
407 int reverse = (where == LIST_HEAD) ? 0 : 1;
408
409 /* We return key-name just once, and an array of elements */
410 addReplyArrayLen(c, 2);
411 addReplyBulk(c, key);
412 addListRangeReply(c, o, rangestart, rangeend, reverse);
413
414 /* Pop these elements. */
415 listTypeDelRange(o, rangestart, rangelen);
416 /* Maintain the notifications and dirty. */
417 listElementsRemoved(c, key, where, o, rangelen, deleted);
418 }
419
420 /* A helper for replying with a list's range between the inclusive start and end
421 * indexes as multi-bulk, with support for negative indexes. Note that start
422 * must be less than end or an empty array is returned. When the reverse
423 * argument is set to a non-zero value, the reply is reversed so that elements
424 * are returned from end to start. */
addListRangeReply(client * c,robj * o,long start,long end,int reverse)425 void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
426 long rangelen, llen = listTypeLength(o);
427
428 /* Convert negative indexes. */
429 if (start < 0) start = llen+start;
430 if (end < 0) end = llen+end;
431 if (start < 0) start = 0;
432
433 /* Invariant: start >= 0, so this test will be true when end < 0.
434 * The range is empty when start > end or start >= length. */
435 if (start > end || start >= llen) {
436 addReply(c,shared.emptyarray);
437 return;
438 }
439 if (end >= llen) end = llen-1;
440 rangelen = (end-start)+1;
441
442 /* Return the result in form of a multi-bulk reply */
443 addReplyArrayLen(c,rangelen);
444 if (o->encoding == OBJ_ENCODING_QUICKLIST) {
445 int from = reverse ? end : start;
446 int direction = reverse ? LIST_HEAD : LIST_TAIL;
447 listTypeIterator *iter = listTypeInitIterator(o,from,direction);
448
449 while(rangelen--) {
450 listTypeEntry entry;
451 serverAssert(listTypeNext(iter, &entry)); /* fail on corrupt data */
452 quicklistEntry *qe = &entry.entry;
453 if (qe->value) {
454 addReplyBulkCBuffer(c,qe->value,qe->sz);
455 } else {
456 addReplyBulkLongLong(c,qe->longval);
457 }
458 }
459 listTypeReleaseIterator(iter);
460 } else {
461 serverPanic("Unknown list encoding");
462 }
463 }
464
465 /* A housekeeping helper for list elements popping tasks.
466 *
467 * 'deleted' is an optional output argument to get an indication
468 * if the key got deleted by this function. */
listElementsRemoved(client * c,robj * key,int where,robj * o,long count,int * deleted)469 void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) {
470 char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
471
472 notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
473 if (listTypeLength(o) == 0) {
474 if (deleted) *deleted = 1;
475
476 dbDelete(c->db, key);
477 notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
478 } else {
479 if (deleted) *deleted = 0;
480 }
481 signalModifiedKey(c, c->db, key);
482 server.dirty += count;
483 }
484
485 /* Implements the generic list pop operation for LPOP/RPOP.
486 * The where argument specifies which end of the list is operated on. An
487 * optional count may be provided as the third argument of the client's
488 * command. */
popGenericCommand(client * c,int where)489 void popGenericCommand(client *c, int where) {
490 int hascount = (c->argc == 3);
491 long count = 0;
492 robj *value;
493
494 if (c->argc > 3) {
495 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
496 c->cmd->name);
497 return;
498 } else if (hascount) {
499 /* Parse the optional count argument. */
500 if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK)
501 return;
502 }
503
504 robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
505 if (o == NULL || checkType(c, o, OBJ_LIST))
506 return;
507
508 if (hascount && !count) {
509 /* Fast exit path. */
510 addReply(c,shared.emptyarray);
511 return;
512 }
513
514 if (!count) {
515 /* Pop a single element. This is POP's original behavior that replies
516 * with a bulk string. */
517 value = listTypePop(o,where);
518 serverAssert(value != NULL);
519 addReplyBulk(c,value);
520 decrRefCount(value);
521 listElementsRemoved(c,c->argv[1],where,o,1,NULL);
522 } else {
523 /* Pop a range of elements. An addition to the original POP command,
524 * which replies with a multi-bulk. */
525 long llen = listTypeLength(o);
526 long rangelen = (count > llen) ? llen : count;
527 long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
528 long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
529 int reverse = (where == LIST_HEAD) ? 0 : 1;
530
531 addListRangeReply(c,o,rangestart,rangeend,reverse);
532 listTypeDelRange(o,rangestart,rangelen);
533 listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);
534 }
535 }
536
537 /* Like popGenericCommand but work with multiple keys.
538 * Take multiple keys and return multiple elements from just one key.
539 *
540 * 'numkeys' the number of keys.
541 * 'count' is the number of elements requested to pop.
542 *
543 * Always reply with array. */
mpopGenericCommand(client * c,robj ** keys,int numkeys,int where,long count)544 void mpopGenericCommand(client *c, robj **keys, int numkeys, int where, long count) {
545 int j;
546 robj *o;
547 robj *key;
548
549 for (j = 0; j < numkeys; j++) {
550 key = keys[j];
551 o = lookupKeyWrite(c->db, key);
552
553 /* Non-existing key, move to next key. */
554 if (o == NULL) continue;
555
556 if (checkType(c, o, OBJ_LIST)) return;
557
558 long llen = listTypeLength(o);
559 /* Empty list, move to next key. */
560 if (llen == 0) continue;
561
562 /* Pop a range of elements in a nested arrays way. */
563 listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
564
565 /* Replicate it as [LR]POP COUNT. */
566 robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
567 rewriteClientCommandVector(c, 3,
568 (where == LIST_HEAD) ? shared.lpop : shared.rpop,
569 key, count_obj);
570 decrRefCount(count_obj);
571 return;
572 }
573
574 /* Look like we are not able to pop up any elements. */
575 addReplyNullArray(c);
576 }
577
578 /* LPOP <key> [count] */
lpopCommand(client * c)579 void lpopCommand(client *c) {
580 popGenericCommand(c,LIST_HEAD);
581 }
582
583 /* RPOP <key> [count] */
rpopCommand(client * c)584 void rpopCommand(client *c) {
585 popGenericCommand(c,LIST_TAIL);
586 }
587
588 /* LRANGE <key> <start> <stop> */
lrangeCommand(client * c)589 void lrangeCommand(client *c) {
590 robj *o;
591 long start, end;
592
593 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
594 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
595
596 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
597 || checkType(c,o,OBJ_LIST)) return;
598
599 addListRangeReply(c,o,start,end,0);
600 }
601
602 /* LTRIM <key> <start> <stop> */
ltrimCommand(client * c)603 void ltrimCommand(client *c) {
604 robj *o;
605 long start, end, llen, ltrim, rtrim;
606
607 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
608 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
609
610 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
611 checkType(c,o,OBJ_LIST)) return;
612 llen = listTypeLength(o);
613
614 /* convert negative indexes */
615 if (start < 0) start = llen+start;
616 if (end < 0) end = llen+end;
617 if (start < 0) start = 0;
618
619 /* Invariant: start >= 0, so this test will be true when end < 0.
620 * The range is empty when start > end or start >= length. */
621 if (start > end || start >= llen) {
622 /* Out of range start or start > end result in empty list */
623 ltrim = llen;
624 rtrim = 0;
625 } else {
626 if (end >= llen) end = llen-1;
627 ltrim = start;
628 rtrim = llen-end-1;
629 }
630
631 /* Remove list elements to perform the trim */
632 if (o->encoding == OBJ_ENCODING_QUICKLIST) {
633 quicklistDelRange(o->ptr,0,ltrim);
634 quicklistDelRange(o->ptr,-rtrim,rtrim);
635 } else {
636 serverPanic("Unknown list encoding");
637 }
638
639 notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
640 if (listTypeLength(o) == 0) {
641 dbDelete(c->db,c->argv[1]);
642 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
643 }
644 signalModifiedKey(c,c->db,c->argv[1]);
645 server.dirty += (ltrim + rtrim);
646 addReply(c,shared.ok);
647 }
648
649 /* LPOS key element [RANK rank] [COUNT num-matches] [MAXLEN len]
650 *
651 * The "rank" is the position of the match, so if it is 1, the first match
652 * is returned, if it is 2 the second match is returned and so forth.
653 * It is 1 by default. If negative has the same meaning but the search is
654 * performed starting from the end of the list.
655 *
656 * If COUNT is given, instead of returning the single element, a list of
657 * all the matching elements up to "num-matches" are returned. COUNT can
658 * be combined with RANK in order to returning only the element starting
659 * from the Nth. If COUNT is zero, all the matching elements are returned.
660 *
661 * MAXLEN tells the command to scan a max of len elements. If zero (the
662 * default), all the elements in the list are scanned if needed.
663 *
664 * The returned elements indexes are always referring to what LINDEX
665 * would return. So first element from head is 0, and so forth. */
lposCommand(client * c)666 void lposCommand(client *c) {
667 robj *o, *ele;
668 ele = c->argv[2];
669 int direction = LIST_TAIL;
670 long rank = 1, count = -1, maxlen = 0; /* Count -1: option not given. */
671
672 /* Parse the optional arguments. */
673 for (int j = 3; j < c->argc; j++) {
674 char *opt = c->argv[j]->ptr;
675 int moreargs = (c->argc-1)-j;
676
677 if (!strcasecmp(opt,"RANK") && moreargs) {
678 j++;
679 if (getLongFromObjectOrReply(c, c->argv[j], &rank, NULL) != C_OK)
680 return;
681 if (rank == 0) {
682 addReplyError(c,"RANK can't be zero: use 1 to start from "
683 "the first match, 2 from the second, ...");
684 return;
685 }
686 } else if (!strcasecmp(opt,"COUNT") && moreargs) {
687 j++;
688 if (getPositiveLongFromObjectOrReply(c, c->argv[j], &count,
689 "COUNT can't be negative") != C_OK)
690 return;
691 } else if (!strcasecmp(opt,"MAXLEN") && moreargs) {
692 j++;
693 if (getPositiveLongFromObjectOrReply(c, c->argv[j], &maxlen,
694 "MAXLEN can't be negative") != C_OK)
695 return;
696 } else {
697 addReplyErrorObject(c,shared.syntaxerr);
698 return;
699 }
700 }
701
702 /* A negative rank means start from the tail. */
703 if (rank < 0) {
704 rank = -rank;
705 direction = LIST_HEAD;
706 }
707
708 /* We return NULL or an empty array if there is no such key (or
709 * if we find no matches, depending on the presence of the COUNT option. */
710 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
711 if (count != -1)
712 addReply(c,shared.emptyarray);
713 else
714 addReply(c,shared.null[c->resp]);
715 return;
716 }
717 if (checkType(c,o,OBJ_LIST)) return;
718
719 /* If we got the COUNT option, prepare to emit an array. */
720 void *arraylenptr = NULL;
721 if (count != -1) arraylenptr = addReplyDeferredLen(c);
722
723 /* Seek the element. */
724 listTypeIterator *li;
725 li = listTypeInitIterator(o,direction == LIST_HEAD ? -1 : 0,direction);
726 listTypeEntry entry;
727 long llen = listTypeLength(o);
728 long index = 0, matches = 0, matchindex = -1, arraylen = 0;
729 while (listTypeNext(li,&entry) && (maxlen == 0 || index < maxlen)) {
730 if (listTypeEqual(&entry,ele)) {
731 matches++;
732 matchindex = (direction == LIST_TAIL) ? index : llen - index - 1;
733 if (matches >= rank) {
734 if (arraylenptr) {
735 arraylen++;
736 addReplyLongLong(c,matchindex);
737 if (count && matches-rank+1 >= count) break;
738 } else {
739 break;
740 }
741 }
742 }
743 index++;
744 matchindex = -1; /* Remember if we exit the loop without a match. */
745 }
746 listTypeReleaseIterator(li);
747
748 /* Reply to the client. Note that arraylenptr is not NULL only if
749 * the COUNT option was selected. */
750 if (arraylenptr != NULL) {
751 setDeferredArrayLen(c,arraylenptr,arraylen);
752 } else {
753 if (matchindex != -1)
754 addReplyLongLong(c,matchindex);
755 else
756 addReply(c,shared.null[c->resp]);
757 }
758 }
759
760 /* LREM <key> <count> <element> */
lremCommand(client * c)761 void lremCommand(client *c) {
762 robj *subject, *obj;
763 obj = c->argv[3];
764 long toremove;
765 long removed = 0;
766
767 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
768 return;
769
770 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
771 if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;
772
773 listTypeIterator *li;
774 if (toremove < 0) {
775 toremove = -toremove;
776 li = listTypeInitIterator(subject,-1,LIST_HEAD);
777 } else {
778 li = listTypeInitIterator(subject,0,LIST_TAIL);
779 }
780
781 listTypeEntry entry;
782 while (listTypeNext(li,&entry)) {
783 if (listTypeEqual(&entry,obj)) {
784 listTypeDelete(li, &entry);
785 server.dirty++;
786 removed++;
787 if (toremove && removed == toremove) break;
788 }
789 }
790 listTypeReleaseIterator(li);
791
792 if (removed) {
793 signalModifiedKey(c,c->db,c->argv[1]);
794 notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
795 }
796
797 if (listTypeLength(subject) == 0) {
798 dbDelete(c->db,c->argv[1]);
799 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
800 }
801
802 addReplyLongLong(c,removed);
803 }
804
lmoveHandlePush(client * c,robj * dstkey,robj * dstobj,robj * value,int where)805 void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value,
806 int where) {
807 /* Create the list if the key does not exist */
808 if (!dstobj) {
809 dstobj = createQuicklistObject();
810 quicklistSetOptions(dstobj->ptr, server.list_max_listpack_size,
811 server.list_compress_depth);
812 dbAdd(c->db,dstkey,dstobj);
813 }
814 signalModifiedKey(c,c->db,dstkey);
815 listTypePush(dstobj,value,where);
816 notifyKeyspaceEvent(NOTIFY_LIST,
817 where == LIST_HEAD ? "lpush" : "rpush",
818 dstkey,
819 c->db->id);
820 /* Always send the pushed value to the client. */
821 addReplyBulk(c,value);
822 }
823
getListPositionFromObjectOrReply(client * c,robj * arg,int * position)824 int getListPositionFromObjectOrReply(client *c, robj *arg, int *position) {
825 if (strcasecmp(arg->ptr,"right") == 0) {
826 *position = LIST_TAIL;
827 } else if (strcasecmp(arg->ptr,"left") == 0) {
828 *position = LIST_HEAD;
829 } else {
830 addReplyErrorObject(c,shared.syntaxerr);
831 return C_ERR;
832 }
833 return C_OK;
834 }
835
getStringObjectFromListPosition(int position)836 robj *getStringObjectFromListPosition(int position) {
837 if (position == LIST_HEAD) {
838 return shared.left;
839 } else {
840 // LIST_TAIL
841 return shared.right;
842 }
843 }
844
lmoveGenericCommand(client * c,int wherefrom,int whereto)845 void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
846 robj *sobj, *value;
847 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
848 == NULL || checkType(c,sobj,OBJ_LIST)) return;
849
850 if (listTypeLength(sobj) == 0) {
851 /* This may only happen after loading very old RDB files. Recent
852 * versions of Redis delete keys of empty lists. */
853 addReplyNull(c);
854 } else {
855 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
856 robj *touchedkey = c->argv[1];
857
858 if (checkType(c,dobj,OBJ_LIST)) return;
859 value = listTypePop(sobj,wherefrom);
860 serverAssert(value); /* assertion for valgrind (avoid NPD) */
861 lmoveHandlePush(c,c->argv[2],dobj,value,whereto);
862
863 /* listTypePop returns an object with its refcount incremented */
864 decrRefCount(value);
865
866 /* Delete the source list when it is empty */
867 notifyKeyspaceEvent(NOTIFY_LIST,
868 wherefrom == LIST_HEAD ? "lpop" : "rpop",
869 touchedkey,
870 c->db->id);
871 if (listTypeLength(sobj) == 0) {
872 dbDelete(c->db,touchedkey);
873 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
874 touchedkey,c->db->id);
875 }
876 signalModifiedKey(c,c->db,touchedkey);
877 server.dirty++;
878 if (c->cmd->proc == blmoveCommand) {
879 rewriteClientCommandVector(c,5,shared.lmove,
880 c->argv[1],c->argv[2],c->argv[3],c->argv[4]);
881 } else if (c->cmd->proc == brpoplpushCommand) {
882 rewriteClientCommandVector(c,3,shared.rpoplpush,
883 c->argv[1],c->argv[2]);
884 }
885 }
886 }
887
888 /* LMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) */
lmoveCommand(client * c)889 void lmoveCommand(client *c) {
890 int wherefrom, whereto;
891 if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
892 != C_OK) return;
893 if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto)
894 != C_OK) return;
895 lmoveGenericCommand(c, wherefrom, whereto);
896 }
897
898 /* This is the semantic of this command:
899 * RPOPLPUSH srclist dstlist:
900 * IF LLEN(srclist) > 0
901 * element = RPOP srclist
902 * LPUSH dstlist element
903 * RETURN element
904 * ELSE
905 * RETURN nil
906 * END
907 * END
908 *
909 * The idea is to be able to get an element from a list in a reliable way
910 * since the element is not just returned but pushed against another list
911 * as well. This command was originally proposed by Ezra Zygmuntowicz.
912 */
rpoplpushCommand(client * c)913 void rpoplpushCommand(client *c) {
914 lmoveGenericCommand(c, LIST_TAIL, LIST_HEAD);
915 }
916
917 /*-----------------------------------------------------------------------------
918 * Blocking POP operations
919 *----------------------------------------------------------------------------*/
920
921 /* This is a helper function for handleClientsBlockedOnKeys(). Its work
922 * is to serve a specific client (receiver) that is blocked on 'key'
923 * in the context of the specified 'db', doing the following:
924 *
925 * 1) Provide the client with the 'value' element or a range of elements.
926 * We will do the pop in here and caller does not need to bother the return.
927 * 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the
928 * 'value' element on the destination list (the "push" side of the command).
929 * 3) Propagate the resulting BRPOP, BLPOP, BLMPOP and additional xPUSH if any into
930 * the AOF and replication channel.
931 *
932 * The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the
933 * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
934 * we can propagate the command properly.
935 *
936 * The argument 'whereto' is LIST_TAIL or LIST_HEAD, and indicates if the
937 * 'value' element is to be pushed to the head or tail so that we can
938 * propagate the command properly.
939 *
940 * 'deleted' is an optional output argument to get an indication
941 * if the key got deleted by this function. */
serveClientBlockedOnList(client * receiver,robj * o,robj * key,robj * dstkey,redisDb * db,int wherefrom,int whereto,int * deleted)942 void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted)
943 {
944 robj *argv[5];
945 robj *value = NULL;
946
947 if (deleted) *deleted = 0;
948
949 if (dstkey == NULL) {
950 /* Propagate the [LR]POP operation. */
951 argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
952 shared.rpop;
953 argv[1] = key;
954
955 if (receiver->lastcmd->proc == blmpopCommand) {
956 /* Propagate the [LR]POP COUNT operation. */
957 long count = receiver->bpop.count;
958 serverAssert(count > 0);
959 long llen = listTypeLength(o);
960 serverAssert(llen > 0);
961
962 argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count);
963 propagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
964 decrRefCount(argv[2]);
965
966 /* Pop a range of elements in a nested arrays way. */
967 listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, deleted);
968 return;
969 }
970
971 propagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
972
973 /* BRPOP/BLPOP */
974 value = listTypePop(o, wherefrom);
975 serverAssert(value != NULL);
976
977 addReplyArrayLen(receiver,2);
978 addReplyBulk(receiver,key);
979 addReplyBulk(receiver,value);
980
981 /* Notify event. */
982 char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop";
983 notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
984 } else {
985 /* BLMOVE */
986 robj *dstobj =
987 lookupKeyWrite(receiver->db,dstkey);
988 if (!(dstobj &&
989 checkType(receiver,dstobj,OBJ_LIST)))
990 {
991 value = listTypePop(o, wherefrom);
992 serverAssert(value != NULL);
993
994 lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
995 /* Propagate the LMOVE/RPOPLPUSH operation. */
996 int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
997 argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove;
998 argv[1] = key;
999 argv[2] = dstkey;
1000 argv[3] = getStringObjectFromListPosition(wherefrom);
1001 argv[4] = getStringObjectFromListPosition(whereto);
1002 propagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL);
1003
1004 /* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
1005 notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
1006 key,receiver->db->id);
1007 }
1008 }
1009
1010 if (value) decrRefCount(value);
1011
1012 if (listTypeLength(o) == 0) {
1013 if (deleted) *deleted = 1;
1014
1015 dbDelete(receiver->db, key);
1016 notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, receiver->db->id);
1017 }
1018 /* We don't call signalModifiedKey() as it was already called
1019 * when an element was pushed on the list. */
1020 }
1021
1022 /* Blocking RPOP/LPOP/LMPOP
1023 *
1024 * 'numkeys' is the number of keys.
1025 * 'timeout_idx' parameter position of block timeout.
1026 * 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT.
1027 * 'count' is the number of elements requested to pop, or -1 for plain single pop.
1028 *
1029 * When count is -1, a reply of a single bulk-string will be used.
1030 * When count > 0, an array reply will be used. */
blockingPopGenericCommand(client * c,robj ** keys,int numkeys,int where,int timeout_idx,long count)1031 void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) {
1032 robj *o;
1033 robj *key;
1034 mstime_t timeout;
1035 int j;
1036
1037 if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
1038 != C_OK) return;
1039
1040 /* Traverse all input keys, we take action only based on one key. */
1041 for (j = 0; j < numkeys; j++) {
1042 key = keys[j];
1043 o = lookupKeyWrite(c->db, key);
1044
1045 /* Non-existing key, move to next key. */
1046 if (o == NULL) continue;
1047
1048 if (checkType(c, o, OBJ_LIST)) return;
1049
1050 long llen = listTypeLength(o);
1051 /* Empty list, move to next key. */
1052 if (llen == 0) continue;
1053
1054 if (count != -1) {
1055 /* BLMPOP, non empty list, like a normal [LR]POP with count option.
1056 * The difference here we pop a range of elements in a nested arrays way. */
1057 listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
1058
1059 /* Replicate it as [LR]POP COUNT. */
1060 robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
1061 rewriteClientCommandVector(c, 3,
1062 (where == LIST_HEAD) ? shared.lpop : shared.rpop,
1063 key, count_obj);
1064 decrRefCount(count_obj);
1065 return;
1066 }
1067
1068 /* Non empty list, this is like a normal [LR]POP. */
1069 robj *value = listTypePop(o,where);
1070 serverAssert(value != NULL);
1071
1072 addReplyArrayLen(c,2);
1073 addReplyBulk(c,key);
1074 addReplyBulk(c,value);
1075 decrRefCount(value);
1076 listElementsRemoved(c,key,where,o,1,NULL);
1077
1078 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1079 rewriteClientCommandVector(c,2,
1080 (where == LIST_HEAD) ? shared.lpop : shared.rpop,
1081 key);
1082 return;
1083 }
1084
1085 /* If we are not allowed to block the client, the only thing
1086 * we can do is treating it as a timeout (even with timeout 0). */
1087 if (c->flags & CLIENT_DENY_BLOCKING) {
1088 addReplyNullArray(c);
1089 return;
1090 }
1091
1092 /* If the keys do not exist we must block */
1093 struct blockPos pos = {where};
1094 blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
1095 }
1096
1097 /* BLPOP <key> [<key> ...] <timeout> */
blpopCommand(client * c)1098 void blpopCommand(client *c) {
1099 blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_HEAD,c->argc-1,-1);
1100 }
1101
1102 /* BRPOP <key> [<key> ...] <timeout> */
brpopCommand(client * c)1103 void brpopCommand(client *c) {
1104 blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_TAIL,c->argc-1,-1);
1105 }
1106
blmoveGenericCommand(client * c,int wherefrom,int whereto,mstime_t timeout)1107 void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) {
1108 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1109 if (checkType(c,key,OBJ_LIST)) return;
1110
1111 if (key == NULL) {
1112 if (c->flags & CLIENT_DENY_BLOCKING) {
1113 /* Blocking against an empty list when blocking is not allowed
1114 * returns immediately. */
1115 addReplyNull(c);
1116 } else {
1117 /* The list is empty and the client blocks. */
1118 struct blockPos pos = {wherefrom, whereto};
1119 blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL);
1120 }
1121 } else {
1122 /* The list exists and has elements, so
1123 * the regular lmoveCommand is executed. */
1124 serverAssertWithInfo(c,key,listTypeLength(key) > 0);
1125 lmoveGenericCommand(c,wherefrom,whereto);
1126 }
1127 }
1128
1129 /* BLMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) <timeout> */
blmoveCommand(client * c)1130 void blmoveCommand(client *c) {
1131 mstime_t timeout;
1132 int wherefrom, whereto;
1133 if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
1134 != C_OK) return;
1135 if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto)
1136 != C_OK) return;
1137 if (getTimeoutFromObjectOrReply(c,c->argv[5],&timeout,UNIT_SECONDS)
1138 != C_OK) return;
1139 blmoveGenericCommand(c,wherefrom,whereto,timeout);
1140 }
1141
1142 /* BRPOPLPUSH <source> <destination> <timeout> */
brpoplpushCommand(client * c)1143 void brpoplpushCommand(client *c) {
1144 mstime_t timeout;
1145 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
1146 != C_OK) return;
1147 blmoveGenericCommand(c, LIST_TAIL, LIST_HEAD, timeout);
1148 }
1149
1150 /* LMPOP/BLMPOP
1151 *
1152 * 'numkeys_idx' parameter position of key number.
1153 * 'is_block' this indicates whether it is a blocking variant. */
lmpopGenericCommand(client * c,int numkeys_idx,int is_block)1154 void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
1155 long j;
1156 long numkeys = 0; /* Number of keys. */
1157 int where = 0; /* HEAD for LEFT, TAIL for RIGHT. */
1158 long count = -1; /* Reply will consist of up to count elements, depending on the list's length. */
1159
1160 /* Parse the numkeys. */
1161 if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
1162 &numkeys, "numkeys should be greater than 0") != C_OK)
1163 return;
1164
1165 /* Parse the where. where_idx: the index of where in the c->argv. */
1166 long where_idx = numkeys_idx + numkeys + 1;
1167 if (where_idx >= c->argc) {
1168 addReplyErrorObject(c, shared.syntaxerr);
1169 return;
1170 }
1171 if (getListPositionFromObjectOrReply(c, c->argv[where_idx], &where) != C_OK)
1172 return;
1173
1174 /* Parse the optional arguments. */
1175 for (j = where_idx + 1; j < c->argc; j++) {
1176 char *opt = c->argv[j]->ptr;
1177 int moreargs = (c->argc - 1) - j;
1178
1179 if (count == -1 && !strcasecmp(opt, "COUNT") && moreargs) {
1180 j++;
1181 if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
1182 &count,"count should be greater than 0") != C_OK)
1183 return;
1184 } else {
1185 addReplyErrorObject(c, shared.syntaxerr);
1186 return;
1187 }
1188 }
1189
1190 if (count == -1) count = 1;
1191
1192 if (is_block) {
1193 /* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingPopGenericCommand. */
1194 blockingPopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count);
1195 } else {
1196 /* NON-BLOCK */
1197 mpopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, count);
1198 }
1199 }
1200
1201 /* LMPOP numkeys [<key> ...] LEFT|RIGHT [COUNT count] */
lmpopCommand(client * c)1202 void lmpopCommand(client *c) {
1203 lmpopGenericCommand(c, 1, 0);
1204 }
1205
1206 /* BLMPOP timeout numkeys [<key> ...] LEFT|RIGHT [COUNT count] */
blmpopCommand(client * c)1207 void blmpopCommand(client *c) {
1208 lmpopGenericCommand(c, 2, 1);
1209 }
1210