1 /*
2  * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "endianconv.h"
32 #include "stream.h"
33 
#define STREAM_BYTES_PER_LISTPACK 2048

/* Each stream item stored in a listpack starts with a flags field. The
 * flags tell whether the entry was deleted and whether it has the same
 * fields as the "master" entry found at the start of the listpack. */
#define STREAM_ITEM_FLAG_NONE 0               /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1 << 0)     /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1 << 1)  /* Same fields as master entry. */

/* Hard cap on the listpack size, applied even if the user configuration
 * allows more. Growing past it can lead to an overflow (trying to store
 * more than a 32 bit length into the listpack header), or actually to an
 * assertion, since lpInsert will return NULL. */
#define STREAM_LISTPACK_MAX_SIZE (1 << 30)
48 
49 void streamFreeCG(streamCG *cg);
50 void streamFreeNACK(streamNACK *na);
51 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
52 
53 /* -----------------------------------------------------------------------
54  * Low level stream encoding: a radix tree of listpacks.
55  * ----------------------------------------------------------------------- */
56 
57 /* Create a new stream data structure. */
streamNew(void)58 stream *streamNew(void) {
59     stream *s = zmalloc(sizeof(*s));
60     s->rax = raxNew();
61     s->length = 0;
62     s->last_id.ms = 0;
63     s->last_id.seq = 0;
64     s->cgroups = NULL; /* Created on demand to save memory when not used. */
65     return s;
66 }
67 
68 /* Free a stream, including the listpacks stored inside the radix tree. */
freeStream(stream * s)69 void freeStream(stream *s) {
70     raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
71     if (s->cgroups)
72         raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
73     zfree(s);
74 }
75 
76 /* Set 'id' to be its successor streamID */
streamIncrID(streamID * id)77 void streamIncrID(streamID *id) {
78     if (id->seq == UINT64_MAX) {
79         if (id->ms == UINT64_MAX) {
80             /* Special case where 'id' is the last possible streamID... */
81             id->ms = id->seq = 0;
82         } else {
83             id->ms++;
84             id->seq = 0;
85         }
86     } else {
87         id->seq++;
88     }
89 }
90 
91 /* Return the length of a stream. */
streamLength(const robj * subject)92 unsigned long streamLength(const robj *subject) {
93     stream *s = subject->ptr;
94     return s->length;
95 }
96 
97 /* Generate the next stream item ID given the previous one. If the current
98  * milliseconds Unix time is greater than the previous one, just use this
99  * as time part and start with sequence part of zero. Otherwise we use the
100  * previous time (and never go backward) and increment the sequence. */
streamNextID(streamID * last_id,streamID * new_id)101 void streamNextID(streamID *last_id, streamID *new_id) {
102     uint64_t ms = mstime();
103     if (ms > last_id->ms) {
104         new_id->ms = ms;
105         new_id->seq = 0;
106     } else {
107         *new_id = *last_id;
108         streamIncrID(new_id);
109     }
110 }
111 
112 /* This is just a wrapper for lpAppend() to directly use a 64 bit integer
113  * instead of a string. */
lpAppendInteger(unsigned char * lp,int64_t value)114 unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
115     char buf[LONG_STR_SIZE];
116     int slen = ll2string(buf,sizeof(buf),value);
117     return lpAppend(lp,(unsigned char*)buf,slen);
118 }
119 
120 /* This is just a wrapper for lpReplace() to directly use a 64 bit integer
121  * instead of a string to replace the current element. The function returns
122  * the new listpack as return value, and also updates the current cursor
123  * by updating '*pos'. */
lpReplaceInteger(unsigned char * lp,unsigned char ** pos,int64_t value)124 unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
125     char buf[LONG_STR_SIZE];
126     int slen = ll2string(buf,sizeof(buf),value);
127     return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos);
128 }
129 
/* lpGet() wrapper returning the element 'ele' as an integer. If lpGet()
 * returns NULL the number it stored by reference is returned directly.
 * Otherwise the element is a string whose length was stored by lpGet():
 * it is parsed as a number, asserting that the conversion succeeds. */
int64_t lpGetInteger(unsigned char *ele) {
    int64_t value_or_len;
    unsigned char *str = lpGet(ele,&value_or_len,NULL);

    if (str != NULL) {
        /* For how listpacks work this path should never be taken, since
         * an int64_t can always be stored in integer encoded form.
         * However the implementation may change. */
        long long parsed;
        int ok = string2ll((char*)str,value_or_len,&parsed);
        serverAssert(ok != 0);
        return parsed;
    }
    return value_or_len;
}
146 
147 /* Debugging function to log the full content of a listpack. Useful
148  * for development and debugging. */
streamLogListpackContent(unsigned char * lp)149 void streamLogListpackContent(unsigned char *lp) {
150     unsigned char *p = lpFirst(lp);
151     while(p) {
152         unsigned char buf[LP_INTBUF_SIZE];
153         int64_t v;
154         unsigned char *ele = lpGet(p,&v,buf);
155         serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
156         p = lpNext(lp,p);
157     }
158 }
159 
160 /* Convert the specified stream entry ID as a 128 bit big endian number, so
161  * that the IDs can be sorted lexicographically. */
streamEncodeID(void * buf,streamID * id)162 void streamEncodeID(void *buf, streamID *id) {
163     uint64_t e[2];
164     e[0] = htonu64(id->ms);
165     e[1] = htonu64(id->seq);
166     memcpy(buf,e,sizeof(e));
167 }
168 
169 /* This is the reverse of streamEncodeID(): the decoded ID will be stored
170  * in the 'id' structure passed by reference. The buffer 'buf' must point
171  * to a 128 bit big-endian encoded ID. */
streamDecodeID(void * buf,streamID * id)172 void streamDecodeID(void *buf, streamID *id) {
173     uint64_t e[2];
174     memcpy(e,buf,sizeof(e));
175     id->ms = ntohu64(e[0]);
176     id->seq = ntohu64(e[1]);
177 }
178 
179 /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
streamCompareID(streamID * a,streamID * b)180 int streamCompareID(streamID *a, streamID *b) {
181     if (a->ms > b->ms) return 1;
182     else if (a->ms < b->ms) return -1;
183     /* The ms part is the same. Check the sequence part. */
184     else if (a->seq > b->seq) return 1;
185     else if (a->seq < b->seq) return -1;
186     /* Everything is the same: IDs are equal. */
187     return 0;
188 }
189 
190 /* Adds a new item into the stream 's' having the specified number of
191  * field-value pairs as specified in 'numfields' and stored into 'argv'.
192  * Returns the new entry ID populating the 'added_id' structure.
193  *
194  * If 'use_id' is not NULL, the ID is not auto-generated by the function,
195  * but instead the passed ID is uesd to add the new entry. In this case
196  * adding the entry may fail as specified later in this comment.
197  *
198  * The function returns C_OK if the item was added, this is always true
199  * if the ID was generated by the function. However the function may return
200  * C_ERR in several cases:
201  * 1. If an ID was given via 'use_id', but adding it failed since the
202  *    current top ID is greater or equal. errno will be set to EDOM.
203  * 2. If a size of a single element or the sum of the elements is too big to
204  *    be stored into the stream. errno will be set to ERANGE. */
streamAppendItem(stream * s,robj ** argv,int64_t numfields,streamID * added_id,streamID * use_id)205 int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
206 
207     /* Generate the new entry ID. */
208     streamID id;
209     if (use_id)
210         id = *use_id;
211     else
212         streamNextID(&s->last_id,&id);
213 
214     /* Check that the new ID is greater than the last entry ID
215      * or return an error. Automatically generated IDs might
216      * overflow (and wrap-around) when incrementing the sequence
217        part. */
218     if (streamCompareID(&id,&s->last_id) <= 0) {
219         errno = EDOM;
220         return C_ERR;
221     }
222 
223     /* Avoid overflow when trying to add an element to the stream (listpack
224      * can only host up to 32bit length sttrings, and also a total listpack size
225      * can't be bigger than 32bit length. */
226     size_t totelelen = 0;
227     for (int64_t i = 0; i < numfields*2; i++) {
228         sds ele = argv[i]->ptr;
229         totelelen += sdslen(ele);
230     }
231     if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
232         errno = ERANGE;
233         return C_ERR;
234     }
235 
236     /* Add the new entry. */
237     raxIterator ri;
238     raxStart(&ri,s->rax);
239     raxSeek(&ri,"$",NULL,0);
240 
241     size_t lp_bytes = 0;        /* Total bytes in the tail listpack. */
242     unsigned char *lp = NULL;   /* Tail listpack pointer. */
243 
244     /* Get a reference to the tail node listpack. */
245     if (raxNext(&ri)) {
246         lp = ri.data;
247         lp_bytes = lpBytes(lp);
248     }
249     raxStop(&ri);
250 
251     /* We have to add the key into the radix tree in lexicographic order,
252      * to do so we consider the ID as a single 128 bit number written in
253      * big endian, so that the most significant bytes are the first ones. */
254     uint64_t rax_key[2];    /* Key in the radix tree containing the listpack.*/
255     streamID master_id;     /* ID of the master entry in the listpack. */
256 
257     /* Create a new listpack and radix tree node if needed. Note that when
258      * a new listpack is created, we populate it with a "master entry". This
259      * is just a set of fields that is taken as references in order to compress
260      * the stream entries that we'll add inside the listpack.
261      *
262      * Note that while we use the first added entry fields to create
263      * the master entry, the first added entry is NOT represented in the master
264      * entry, which is a stand alone object. But of course, the first entry
265      * will compress well because it's used as reference.
266      *
267      * The master entry is composed like in the following example:
268      *
269      * +-------+---------+------------+---------+--/--+---------+---------+-+
270      * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
271      * +-------+---------+------------+---------+--/--+---------+---------+-+
272      *
273      * count and deleted just represent respectively the total number of
274      * entries inside the listpack that are valid, and marked as deleted
275      * (delted flag in the entry flags set). So the total number of items
276      * actually inside the listpack (both deleted and not) is count+deleted.
277      *
278      * The real entries will be encoded with an ID that is just the
279      * millisecond and sequence difference compared to the key stored at
280      * the radix tree node containing the listpack (delta encoding), and
281      * if the fields of the entry are the same as the master enty fields, the
282      * entry flags will specify this fact and the entry fields and number
283      * of fields will be omitted (see later in the code of this function).
284      *
285      * The "0" entry at the end is the same as the 'lp-count' entry in the
286      * regular stream entries (see below), and marks the fact that there are
287      * no more entries, when we scan the stream from right to left. */
288 
289     /* First of all, check if we can append to the current macro node or
290      * if we need to switch to the next one. 'lp' will be set to NULL if
291      * the current node is full. */
292     if (lp != NULL) {
293         size_t node_max_bytes = server.stream_node_max_bytes;
294         if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
295             node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
296         if (lp_bytes + totelelen >= node_max_bytes) {
297             lp = NULL;
298         } else if (server.stream_node_max_entries) {
299             int64_t count = lpGetInteger(lpFirst(lp));
300             if (count >= server.stream_node_max_entries) lp = NULL;
301         }
302     }
303 
304     int flags = STREAM_ITEM_FLAG_NONE;
305     if (lp == NULL || lp_bytes >= server.stream_node_max_bytes) {
306         master_id = id;
307         streamEncodeID(rax_key,&id);
308         /* Create the listpack having the master entry ID and fields. */
309         lp = lpNew();
310         lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
311         lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
312         lp = lpAppendInteger(lp,numfields);
313         for (int64_t i = 0; i < numfields; i++) {
314             sds field = argv[i*2]->ptr;
315             lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
316         }
317         lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
318         raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
319         /* The first entry we insert, has obviously the same fields of the
320          * master entry. */
321         flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
322     } else {
323         serverAssert(ri.key_len == sizeof(rax_key));
324         memcpy(rax_key,ri.key,sizeof(rax_key));
325 
326         /* Read the master ID from the radix tree key. */
327         streamDecodeID(rax_key,&master_id);
328         unsigned char *lp_ele = lpFirst(lp);
329 
330         /* Update count and skip the deleted fields. */
331         int64_t count = lpGetInteger(lp_ele);
332         lp = lpReplaceInteger(lp,&lp_ele,count+1);
333         lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
334         lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
335 
336         /* Check if the entry we are adding, have the same fields
337          * as the master entry. */
338         int64_t master_fields_count = lpGetInteger(lp_ele);
339         lp_ele = lpNext(lp,lp_ele);
340         if (numfields == master_fields_count) {
341             int64_t i;
342             for (i = 0; i < master_fields_count; i++) {
343                 sds field = argv[i*2]->ptr;
344                 int64_t e_len;
345                 unsigned char buf[LP_INTBUF_SIZE];
346                 unsigned char *e = lpGet(lp_ele,&e_len,buf);
347                 /* Stop if there is a mismatch. */
348                 if (sdslen(field) != (size_t)e_len ||
349                     memcmp(e,field,e_len) != 0) break;
350                 lp_ele = lpNext(lp,lp_ele);
351             }
352             /* All fields are the same! We can compress the field names
353              * setting a single bit in the flags. */
354             if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
355         }
356     }
357 
358     /* Populate the listpack with the new entry. We use the following
359      * encoding:
360      *
361      * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
362      * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
363      * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
364      *
365      * However if the SAMEFIELD flag is set, we have just to populate
366      * the entry with the values, so it becomes:
367      *
368      * +-----+--------+-------+-/-+-------+--------+
369      * |flags|entry-id|value-1|...|value-N|lp-count|
370      * +-----+--------+-------+-/-+-------+--------+
371      *
372      * The entry-id field is actually two separated fields: the ms
373      * and seq difference compared to the master entry.
374      *
375      * The lp-count field is a number that states the number of listpack pieces
376      * that compose the entry, so that it's possible to travel the entry
377      * in reverse order: we can just start from the end of the listpack, read
378      * the entry, and jump back N times to seek the "flags" field to read
379      * the stream full entry. */
380     lp = lpAppendInteger(lp,flags);
381     lp = lpAppendInteger(lp,id.ms - master_id.ms);
382     lp = lpAppendInteger(lp,id.seq - master_id.seq);
383     if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
384         lp = lpAppendInteger(lp,numfields);
385     for (int64_t i = 0; i < numfields; i++) {
386         sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
387         if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
388             lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
389         lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
390     }
391     /* Compute and store the lp-count field. */
392     int64_t lp_count = numfields;
393     lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
394     if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
395         /* If the item is not compressed, it also has the fields other than
396          * the values, and an additional num-fileds field. */
397         lp_count += numfields+1;
398     }
399     lp = lpAppendInteger(lp,lp_count);
400 
401     /* Insert back into the tree in order to update the listpack pointer. */
402     if (ri.data != lp)
403         raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
404     s->length++;
405     s->last_id = id;
406     if (added_id) *added_id = id;
407     return C_OK;
408 }
409 
410 /* Trim the stream 's' to have no more than maxlen elements, and return the
411  * number of elements removed from the stream. The 'approx' option, if non-zero,
412  * specifies that the trimming must be performed in a approximated way in
413  * order to maximize performances. This means that the stream may contain
414  * more elements than 'maxlen', and elements are only removed if we can remove
415  * a *whole* node of the radix tree. The elements are removed from the head
416  * of the stream (older elements).
417  *
418  * The function may return zero if:
419  *
420  * 1) The stream is already shorter or equal to the specified max length.
421  * 2) The 'approx' option is true and the head node had not enough elements
422  *    to be deleted, leaving the stream with a number of elements >= maxlen.
423  */
streamTrimByLength(stream * s,size_t maxlen,int approx)424 int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
425     if (s->length <= maxlen) return 0;
426 
427     raxIterator ri;
428     raxStart(&ri,s->rax);
429     raxSeek(&ri,"^",NULL,0);
430 
431     int64_t deleted = 0;
432     while(s->length > maxlen && raxNext(&ri)) {
433         unsigned char *lp = ri.data, *p = lpFirst(lp);
434         int64_t entries = lpGetInteger(p);
435 
436         /* Check if we can remove the whole node, and still have at
437          * least maxlen elements. */
438         if (s->length - entries >= maxlen) {
439             lpFree(lp);
440             raxRemove(s->rax,ri.key,ri.key_len,NULL);
441             raxSeek(&ri,">=",ri.key,ri.key_len);
442             s->length -= entries;
443             deleted += entries;
444             continue;
445         }
446 
447         /* If we cannot remove a whole element, and approx is true,
448          * stop here. */
449         if (approx) break;
450 
451         /* Otherwise, we have to mark single entries inside the listpack
452          * as deleted. We start by updating the entries/deleted counters. */
453         int64_t to_delete = s->length - maxlen;
454         serverAssert(to_delete < entries);
455         lp = lpReplaceInteger(lp,&p,entries-to_delete);
456         p = lpNext(lp,p); /* Seek deleted field. */
457         int64_t marked_deleted = lpGetInteger(p);
458         lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
459         p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
460 
461         /* Skip all the master fields. */
462         int64_t master_fields_count = lpGetInteger(p);
463         p = lpNext(lp,p); /* Seek the first field. */
464         for (int64_t j = 0; j < master_fields_count; j++)
465             p = lpNext(lp,p); /* Skip all master fields. */
466         p = lpNext(lp,p); /* Skip the zero master entry terminator. */
467 
468         /* 'p' is now pointing to the first entry inside the listpack.
469          * We have to run entry after entry, marking entries as deleted
470          * if they are already not deleted. */
471         while(p) {
472             int flags = lpGetInteger(p);
473             int to_skip;
474 
475             /* Mark the entry as deleted. */
476             if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
477                 flags |= STREAM_ITEM_FLAG_DELETED;
478                 lp = lpReplaceInteger(lp,&p,flags);
479                 deleted++;
480                 s->length--;
481                 if (s->length <= maxlen) break; /* Enough entries deleted. */
482             }
483 
484             p = lpNext(lp,p); /* Skip ID ms delta. */
485             p = lpNext(lp,p); /* Skip ID seq delta. */
486             p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
487             if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
488                 to_skip = master_fields_count;
489             } else {
490                 to_skip = lpGetInteger(p);
491                 to_skip = 1+(to_skip*2);
492             }
493 
494             while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
495             p = lpNext(lp,p); /* Skip the final lp-count field. */
496         }
497 
498         /* Here we should perform garbage collection in case at this point
499          * there are too many entries deleted inside the listpack. */
500         entries -= to_delete;
501         marked_deleted += to_delete;
502         if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
503             /* TODO: perform a garbage collection. */
504         }
505 
506         /* Update the listpack with the new pointer. */
507         raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
508 
509         break; /* If we are here, there was enough to delete in the current
510                   node, so no need to go to the next node. */
511     }
512 
513     raxStop(&ri);
514     return deleted;
515 }
516 
517 /* Initialize the stream iterator, so that we can call iterating functions
518  * to get the next items. This requires a corresponding streamIteratorStop()
519  * at the end. The 'rev' parameter controls the direction. If it's zero the
520  * iteration is from the start to the end element (inclusive), otherwise
521  * if rev is non-zero, the iteration is reversed.
522  *
523  * Once the iterator is initialized, we iterate like this:
524  *
525  *  streamIterator myiterator;
526  *  streamIteratorStart(&myiterator,...);
527  *  int64_t numfields;
528  *  while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
529  *      while(numfields--) {
530  *          unsigned char *key, *value;
531  *          size_t key_len, value_len;
532  *          streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
533  *
534  *          ... do what you want with key and value ...
535  *      }
536  *  }
537  *  streamIteratorStop(&myiterator); */
streamIteratorStart(streamIterator * si,stream * s,streamID * start,streamID * end,int rev)538 void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
539     /* Intialize the iterator and translates the iteration start/stop
540      * elements into a 128 big big-endian number. */
541     if (start) {
542         streamEncodeID(si->start_key,start);
543     } else {
544         si->start_key[0] = 0;
545         si->start_key[1] = 0;
546     }
547 
548     if (end) {
549         streamEncodeID(si->end_key,end);
550     } else {
551         si->end_key[0] = UINT64_MAX;
552         si->end_key[1] = UINT64_MAX;
553     }
554 
555     /* Seek the correct node in the radix tree. */
556     raxStart(&si->ri,s->rax);
557     if (!rev) {
558         if (start && (start->ms || start->seq)) {
559             raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
560                     sizeof(si->start_key));
561             if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
562         } else {
563             raxSeek(&si->ri,"^",NULL,0);
564         }
565     } else {
566         if (end && (end->ms || end->seq)) {
567             raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
568                     sizeof(si->end_key));
569             if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
570         } else {
571             raxSeek(&si->ri,"$",NULL,0);
572         }
573     }
574     si->stream = s;
575     si->lp = NULL; /* There is no current listpack right now. */
576     si->lp_ele = NULL; /* Current listpack cursor. */
577     si->rev = rev;  /* Direction, if non-zero reversed, from end to start. */
578 }
579 
580 /* Return 1 and store the current item ID at 'id' if there are still
581  * elements within the iteration range, otherwise return 0 in order to
582  * signal the iteration terminated. */
streamIteratorGetID(streamIterator * si,streamID * id,int64_t * numfields)583 int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
584     while(1) { /* Will stop when element > stop_key or end of radix tree. */
585         /* If the current listpack is set to NULL, this is the start of the
586          * iteration or the previous listpack was completely iterated.
587          * Go to the next node. */
588         if (si->lp == NULL || si->lp_ele == NULL) {
589             if (!si->rev && !raxNext(&si->ri)) return 0;
590             else if (si->rev && !raxPrev(&si->ri)) return 0;
591             serverAssert(si->ri.key_len == sizeof(streamID));
592             /* Get the master ID. */
593             streamDecodeID(si->ri.key,&si->master_id);
594             /* Get the master fields count. */
595             si->lp = si->ri.data;
596             si->lp_ele = lpFirst(si->lp);           /* Seek items count */
597             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
598             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
599             si->master_fields_count = lpGetInteger(si->lp_ele);
600             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
601             si->master_fields_start = si->lp_ele;
602             /* We are now pointing to the first field of the master entry.
603              * We need to seek either the first or the last entry depending
604              * on the direction of the iteration. */
605             if (!si->rev) {
606                 /* If we are iterating in normal order, skip the master fields
607                  * to seek the first actual entry. */
608                 for (uint64_t i = 0; i < si->master_fields_count; i++)
609                     si->lp_ele = lpNext(si->lp,si->lp_ele);
610             } else {
611                 /* If we are iterating in reverse direction, just seek the
612                  * last part of the last entry in the listpack (that is, the
613                  * fields count). */
614                 si->lp_ele = lpLast(si->lp);
615             }
616         } else if (si->rev) {
617             /* If we are itereating in the reverse order, and this is not
618              * the first entry emitted for this listpack, then we already
619              * emitted the current entry, and have to go back to the previous
620              * one. */
621             int lp_count = lpGetInteger(si->lp_ele);
622             while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
623             /* Seek lp-count of prev entry. */
624             si->lp_ele = lpPrev(si->lp,si->lp_ele);
625         }
626 
627         /* For every radix tree node, iterate the corresponding listpack,
628          * returning elements when they are within range. */
629         while(1) {
630             if (!si->rev) {
631                 /* If we are going forward, skip the previous entry
632                  * lp-count field (or in case of the master entry, the zero
633                  * term field) */
634                 si->lp_ele = lpNext(si->lp,si->lp_ele);
635                 if (si->lp_ele == NULL) break;
636             } else {
637                 /* If we are going backward, read the number of elements this
638                  * entry is composed of, and jump backward N times to seek
639                  * its start. */
640                 int64_t lp_count = lpGetInteger(si->lp_ele);
641                 if (lp_count == 0) { /* We reached the master entry. */
642                     si->lp = NULL;
643                     si->lp_ele = NULL;
644                     break;
645                 }
646                 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
647             }
648 
649             /* Get the flags entry. */
650             si->lp_flags = si->lp_ele;
651             int flags = lpGetInteger(si->lp_ele);
652             si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
653 
654             /* Get the ID: it is encoded as difference between the master
655              * ID and this entry ID. */
656             *id = si->master_id;
657             id->ms += lpGetInteger(si->lp_ele);
658             si->lp_ele = lpNext(si->lp,si->lp_ele);
659             id->seq += lpGetInteger(si->lp_ele);
660             si->lp_ele = lpNext(si->lp,si->lp_ele);
661             unsigned char buf[sizeof(streamID)];
662             streamEncodeID(buf,id);
663 
664             /* The number of entries is here or not depending on the
665              * flags. */
666             if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
667                 *numfields = si->master_fields_count;
668             } else {
669                 *numfields = lpGetInteger(si->lp_ele);
670                 si->lp_ele = lpNext(si->lp,si->lp_ele);
671             }
672 
673             /* If current >= start, and the entry is not marked as
674              * deleted, emit it. */
675             if (!si->rev) {
676                 if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
677                     !(flags & STREAM_ITEM_FLAG_DELETED))
678                 {
679                     if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
680                         return 0; /* We are already out of range. */
681                     si->entry_flags = flags;
682                     if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
683                         si->master_fields_ptr = si->master_fields_start;
684                     return 1; /* Valid item returned. */
685                 }
686             } else {
687                 if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
688                     !(flags & STREAM_ITEM_FLAG_DELETED))
689                 {
690                     if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
691                         return 0; /* We are already out of range. */
692                     si->entry_flags = flags;
693                     if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
694                         si->master_fields_ptr = si->master_fields_start;
695                     return 1; /* Valid item returned. */
696                 }
697             }
698 
699             /* If we do not emit, we have to discard if we are going
700              * forward, or seek the previous entry if we are going
701              * backward. */
702             if (!si->rev) {
703                 int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
704                                       *numfields : *numfields*2;
705                 for (int64_t i = 0; i < to_discard; i++)
706                     si->lp_ele = lpNext(si->lp,si->lp_ele);
707             } else {
708                 int64_t prev_times = 4; /* flag + id ms + id seq + one more to
709                                            go back to the previous entry "count"
710                                            field. */
711                 /* If the entry was not flagged SAMEFIELD we also read the
712                  * number of fields, so go back one more. */
713                 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
714                 while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
715             }
716         }
717 
718         /* End of listpack reached. Try the next/prev radix tree node. */
719     }
720 }
721 
722 /* Get the field and value of the current item we are iterating. This should
723  * be called immediately after streamIteratorGetID(), and for each field
724  * according to the number of fields returned by streamIteratorGetID().
725  * The function populates the field and value pointers and the corresponding
726  * lengths by reference, that are valid until the next iterator call, assuming
727  * no one touches the stream meanwhile. */
streamIteratorGetField(streamIterator * si,unsigned char ** fieldptr,unsigned char ** valueptr,int64_t * fieldlen,int64_t * valuelen)728 void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
729     if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
730         *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
731         si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
732     } else {
733         *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
734         si->lp_ele = lpNext(si->lp,si->lp_ele);
735     }
736     *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
737     si->lp_ele = lpNext(si->lp,si->lp_ele);
738 }
739 
740 /* Remove the current entry from the stream: can be called after the
741  * GetID() API or after any GetField() call, however we need to iterate
742  * a valid entry while calling this function. Moreover the function
743  * requires the entry ID we are currently iterating, that was previously
744  * returned by GetID().
745  *
746  * Note that after calling this function, next calls to GetField() can't
747  * be performed: the entry is now deleted. Instead the iterator will
748  * automatically re-seek to the next entry, so the caller should continue
749  * with GetID(). */
streamIteratorRemoveEntry(streamIterator * si,streamID * current)750 void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
751     unsigned char *lp = si->lp;
752     int64_t aux;
753 
754     /* We do not really delete the entry here. Instead we mark it as
755      * deleted flagging it, and also incrementing the count of the
756      * deleted entries in the listpack header.
757      *
758      * We start flagging: */
759     int flags = lpGetInteger(si->lp_flags);
760     flags |= STREAM_ITEM_FLAG_DELETED;
761     lp = lpReplaceInteger(lp,&si->lp_flags,flags);
762 
763     /* Change the valid/deleted entries count in the master entry. */
764     unsigned char *p = lpFirst(lp);
765     aux = lpGetInteger(p);
766 
767     if (aux == 1) {
768         /* If this is the last element in the listpack, we can remove the whole
769          * node. */
770         lpFree(lp);
771         raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
772     } else {
773         /* In the base case we alter the counters of valid/deleted entries. */
774         lp = lpReplaceInteger(lp,&p,aux-1);
775         p = lpNext(lp,p); /* Seek deleted field. */
776         aux = lpGetInteger(p);
777         lp = lpReplaceInteger(lp,&p,aux+1);
778 
779         /* Update the listpack with the new pointer. */
780         if (si->lp != lp)
781             raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
782     }
783 
784     /* Update the number of entries counter. */
785     si->stream->length--;
786 
787     /* Re-seek the iterator to fix the now messed up state. */
788     streamID start, end;
789     if (si->rev) {
790         streamDecodeID(si->start_key,&start);
791         end = *current;
792     } else {
793         start = *current;
794         streamDecodeID(si->end_key,&end);
795     }
796     streamIteratorStop(si);
797     streamIteratorStart(si,si->stream,&start,&end,si->rev);
798 
799     /* TODO: perform a garbage collection here if the ration between
800      * deleted and valid goes over a certain limit. */
801 }
802 
803 /* Stop the stream iterator. The only cleanup we need is to free the rax
804  * itereator, since the stream iterator itself is supposed to be stack
805  * allocated. */
streamIteratorStop(streamIterator * si)806 void streamIteratorStop(streamIterator *si) {
807     raxStop(&si->ri);
808 }
809 
810 /* Delete the specified item ID from the stream, returning 1 if the item
811  * was deleted 0 otherwise (if it does not exist). */
streamDeleteItem(stream * s,streamID * id)812 int streamDeleteItem(stream *s, streamID *id) {
813     int deleted = 0;
814     streamIterator si;
815     streamIteratorStart(&si,s,id,id,0);
816     streamID myid;
817     int64_t numfields;
818     if (streamIteratorGetID(&si,&myid,&numfields)) {
819         streamIteratorRemoveEntry(&si,&myid);
820         deleted = 1;
821     }
822     streamIteratorStop(&si);
823     return deleted;
824 }
825 
826 /* Get the last valid (non-tombstone) streamID of 's'. */
streamLastValidID(stream * s,streamID * maxid)827 void streamLastValidID(stream *s, streamID *maxid)
828 {
829     streamIterator si;
830     streamIteratorStart(&si,s,NULL,NULL,1);
831     int64_t numfields;
832     streamIteratorGetID(&si,maxid,&numfields);
833     streamIteratorStop(&si);
834 }
835 
836 /* Emit a reply in the client output buffer by formatting a Stream ID
837  * in the standard <ms>-<seq> format, using the simple string protocol
838  * of REPL. */
addReplyStreamID(client * c,streamID * id)839 void addReplyStreamID(client *c, streamID *id) {
840     sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
841     addReplyBulkSds(c,replyid);
842 }
843 
844 /* Similar to the above function, but just creates an object, usually useful
845  * for replication purposes to create arguments. */
createObjectFromStreamID(streamID * id)846 robj *createObjectFromStreamID(streamID *id) {
847     return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
848                         id->ms,id->seq));
849 }
850 
851 /* As a result of an explicit XCLAIM or XREADGROUP command, new entries
852  * are created in the pending list of the stream and consumers. We need
853  * to propagate this changes in the form of XCLAIM commands. */
streamPropagateXCLAIM(client * c,robj * key,streamCG * group,robj * groupname,robj * id,streamNACK * nack)854 void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
855     /* We need to generate an XCLAIM that will work in a idempotent fashion:
856      *
857      * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
858      *        RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
859      *
860      * Note that JUSTID is useful in order to avoid that XCLAIM will do
861      * useless work in the slave side, trying to fetch the stream item. */
862     robj *argv[14];
863     argv[0] = createStringObject("XCLAIM",6);
864     argv[1] = key;
865     argv[2] = groupname;
866     argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
867     argv[4] = createStringObjectFromLongLong(0);
868     argv[5] = id;
869     argv[6] = createStringObject("TIME",4);
870     argv[7] = createStringObjectFromLongLong(nack->delivery_time);
871     argv[8] = createStringObject("RETRYCOUNT",10);
872     argv[9] = createStringObjectFromLongLong(nack->delivery_count);
873     argv[10] = createStringObject("FORCE",5);
874     argv[11] = createStringObject("JUSTID",6);
875     argv[12] = createStringObject("LASTID",6);
876     argv[13] = createObjectFromStreamID(&group->last_id);
877 
878     /* We use progagate() because this code path is not always called from
879      * the command execution context. Moreover this will just alter the
880      * consumer group state, and we don't need MULTI/EXEC wrapping because
881      * there is no message state cross-message atomicity required. */
882     propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
883     decrRefCount(argv[0]);
884     decrRefCount(argv[3]);
885     decrRefCount(argv[4]);
886     decrRefCount(argv[6]);
887     decrRefCount(argv[7]);
888     decrRefCount(argv[8]);
889     decrRefCount(argv[9]);
890     decrRefCount(argv[10]);
891     decrRefCount(argv[11]);
892     decrRefCount(argv[12]);
893     decrRefCount(argv[13]);
894 }
895 
896 /* We need this when we want to propoagate the new last-id of a consumer group
897  * that was consumed by XREADGROUP with the NOACK option: in that case we can't
898  * propagate the last ID just using the XCLAIM LASTID option, so we emit
899  *
900  *  XGROUP SETID <key> <groupname> <id>
901  */
streamPropagateGroupID(client * c,robj * key,streamCG * group,robj * groupname)902 void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
903     robj *argv[5];
904     argv[0] = createStringObject("XGROUP",6);
905     argv[1] = createStringObject("SETID",5);
906     argv[2] = key;
907     argv[3] = groupname;
908     argv[4] = createObjectFromStreamID(&group->last_id);
909 
910     /* We use progagate() because this code path is not always called from
911      * the command execution context. Moreover this will just alter the
912      * consumer group state, and we don't need MULTI/EXEC wrapping because
913      * there is no message state cross-message atomicity required. */
914     propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
915     decrRefCount(argv[0]);
916     decrRefCount(argv[1]);
917     decrRefCount(argv[4]);
918 }
919 
920 /* Send the stream items in the specified range to the client 'c'. The range
921  * the client will receive is between start and end inclusive, if 'count' is
922  * non zero, no more than 'count' elements are sent.
923  *
924  * The 'end' pointer can be NULL to mean that we want all the elements from
925  * 'start' till the end of the stream. If 'rev' is non zero, elements are
926  * produced in reversed order from end to start.
927  *
928  * The function returns the number of entries emitted.
929  *
930  * If group and consumer are not NULL, the function performs additional work:
931  * 1. It updates the last delivered ID in the group in case we are
932  *    sending IDs greater than the current last ID.
933  * 2. If the requested IDs are already assigned to some other consumer, the
934  *    function will not return it to the client.
935  * 3. An entry in the pending list will be created for every entry delivered
936  *    for the first time to this consumer.
937  *
938  * The behavior may be modified passing non-zero flags:
939  *
940  * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
941  *                   is not performed.
942  * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
943  *                        and return the number of entries emitted as usually.
944  *                        This is used when the function is just used in order
945  *                        to emit data and there is some higher level logic.
946  *
947  * The final argument 'spi' (stream propagatino info pointer) is a structure
948  * filled with information needed to propagte the command execution to AOF
949  * and slaves, in the case a consumer group was passed: we need to generate
950  * XCLAIM commands to create the pending list into AOF/slaves in that case.
951  *
952  * If 'spi' is set to NULL no propagation will happen even if the group was
953  * given, but currently such a feature is never used by the code base that
954  * will always pass 'spi' and propagate when a group is passed.
955  *
956  * Note that this function is recursive in certain cases. When it's called
957  * with a non NULL group and consumer argument, it may call
958  * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
959  * consumer pending entries list. However such a function will then call
960  * streamReplyWithRange() in order to emit single entries (found in the
961  * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
962  * flag.
963  */
964 #define STREAM_RWR_NOACK (1<<0)         /* Do not create entries in the PEL. */
965 #define STREAM_RWR_RAWENTRIES (1<<1)    /* Do not emit protocol for array
966                                            boundaries, just the entries. */
967 #define STREAM_RWR_HISTORY (1<<2)       /* Only serve consumer local PEL. */
streamReplyWithRange(client * c,stream * s,streamID * start,streamID * end,size_t count,int rev,streamCG * group,streamConsumer * consumer,int flags,streamPropInfo * spi)968 size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
969     void *arraylen_ptr = NULL;
970     size_t arraylen = 0;
971     streamIterator si;
972     int64_t numfields;
973     streamID id;
974     int propagate_last_id = 0;
975 
976     /* If the client is asking for some history, we serve it using a
977      * different function, so that we return entries *solely* from its
978      * own PEL. This ensures each consumer will always and only see
979      * the history of messages delivered to it and not yet confirmed
980      * as delivered. */
981     if (group && (flags & STREAM_RWR_HISTORY)) {
982         return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
983                                                    consumer);
984     }
985 
986     if (!(flags & STREAM_RWR_RAWENTRIES))
987         arraylen_ptr = addDeferredMultiBulkLength(c);
988     streamIteratorStart(&si,s,start,end,rev);
989     while(streamIteratorGetID(&si,&id,&numfields)) {
990         /* Update the group last_id if needed. */
991         if (group && streamCompareID(&id,&group->last_id) > 0) {
992             group->last_id = id;
993             propagate_last_id = 1;
994         }
995 
996         /* Emit a two elements array for each item. The first is
997          * the ID, the second is an array of field-value pairs. */
998         addReplyMultiBulkLen(c,2);
999         addReplyStreamID(c,&id);
1000         addReplyMultiBulkLen(c,numfields*2);
1001 
1002         /* Emit the field-value pairs. */
1003         while(numfields--) {
1004             unsigned char *key, *value;
1005             int64_t key_len, value_len;
1006             streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
1007             addReplyBulkCBuffer(c,key,key_len);
1008             addReplyBulkCBuffer(c,value,value_len);
1009         }
1010 
1011         /* If a group is passed, we need to create an entry in the
1012          * PEL (pending entries list) of this group *and* this consumer.
1013          *
1014          * Note that we cannot be sure about the fact the message is not
1015          * already owned by another consumer, because the admin is able
1016          * to change the consumer group last delivered ID using the
1017          * XGROUP SETID command. So if we find that there is already
1018          * a NACK for the entry, we need to associate it to the new
1019          * consumer. */
1020         if (group && !(flags & STREAM_RWR_NOACK)) {
1021             unsigned char buf[sizeof(streamID)];
1022             streamEncodeID(buf,&id);
1023 
1024             /* Try to add a new NACK. Most of the time this will work and
1025              * will not require extra lookups. We'll fix the problem later
1026              * if we find that there is already a entry for this ID. */
1027             streamNACK *nack = streamCreateNACK(consumer);
1028             int group_inserted =
1029                 raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
1030             int consumer_inserted =
1031                 raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
1032 
1033             /* Now we can check if the entry was already busy, and
1034              * in that case reassign the entry to the new consumer,
1035              * or update it if the consumer is the same as before. */
1036             if (group_inserted == 0) {
1037                 streamFreeNACK(nack);
1038                 nack = raxFind(group->pel,buf,sizeof(buf));
1039                 serverAssert(nack != raxNotFound);
1040                 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
1041                 /* Update the consumer and NACK metadata. */
1042                 nack->consumer = consumer;
1043                 nack->delivery_time = mstime();
1044                 nack->delivery_count = 1;
1045                 /* Add the entry in the new consumer local PEL. */
1046                 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
1047             } else if (group_inserted == 1 && consumer_inserted == 0) {
1048                 serverPanic("NACK half-created. Should not be possible.");
1049             }
1050 
1051             /* Propagate as XCLAIM. */
1052             if (spi) {
1053                 robj *idarg = createObjectFromStreamID(&id);
1054                 streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
1055                 decrRefCount(idarg);
1056             }
1057         } else {
1058             if (propagate_last_id)
1059                 streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
1060         }
1061 
1062         arraylen++;
1063         if (count && count == arraylen) break;
1064     }
1065     streamIteratorStop(&si);
1066     if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
1067     return arraylen;
1068 }
1069 
1070 /* This is an helper function for streamReplyWithRange() when called with
1071  * group and consumer arguments, but with a range that is referring to already
1072  * delivered messages. In this case we just emit messages that are already
1073  * in the history of the consumer, fetching the IDs from its PEL.
1074  *
1075  * Note that this function does not have a 'rev' argument because it's not
1076  * possible to iterate in reverse using a group. Basically this function
1077  * is only called as a result of the XREADGROUP command.
1078  *
1079  * This function is more expensive because it needs to inspect the PEL and then
1080  * seek into the radix tree of the messages in order to emit the full message
1081  * to the client. However clients only reach this code path when they are
1082  * fetching the history of already retrieved messages, which is rare. */
streamReplyWithRangeFromConsumerPEL(client * c,stream * s,streamID * start,streamID * end,size_t count,streamConsumer * consumer)1083 size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
1084     raxIterator ri;
1085     unsigned char startkey[sizeof(streamID)];
1086     unsigned char endkey[sizeof(streamID)];
1087     streamEncodeID(startkey,start);
1088     if (end) streamEncodeID(endkey,end);
1089 
1090     size_t arraylen = 0;
1091     void *arraylen_ptr = addDeferredMultiBulkLength(c);
1092     raxStart(&ri,consumer->pel);
1093     raxSeek(&ri,">=",startkey,sizeof(startkey));
1094     while(raxNext(&ri) && (!count || arraylen < count)) {
1095         if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
1096         streamID thisid;
1097         streamDecodeID(ri.key,&thisid);
1098         if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
1099                                  STREAM_RWR_RAWENTRIES,NULL) == 0)
1100         {
1101             /* Note that we may have a not acknowledged entry in the PEL
1102              * about a message that's no longer here because was removed
1103              * by the user by other means. In that case we signal it emitting
1104              * the ID but then a NULL entry for the fields. */
1105             addReplyMultiBulkLen(c,2);
1106             streamID id;
1107             streamDecodeID(ri.key,&id);
1108             addReplyStreamID(c,&id);
1109             addReply(c,shared.nullmultibulk);
1110         } else {
1111             streamNACK *nack = ri.data;
1112             nack->delivery_time = mstime();
1113             nack->delivery_count++;
1114         }
1115         arraylen++;
1116     }
1117     raxStop(&ri);
1118     setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
1119     return arraylen;
1120 }
1121 
1122 /* -----------------------------------------------------------------------
1123  * Stream commands implementation
1124  * ----------------------------------------------------------------------- */
1125 
1126 /* Look the stream at 'key' and return the corresponding stream object.
1127  * The function creates a key setting it to an empty stream if needed. */
streamTypeLookupWriteOrCreate(client * c,robj * key)1128 robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
1129     robj *o = lookupKeyWrite(c->db,key);
1130     if (o == NULL) {
1131         o = createStreamObject();
1132         dbAdd(c->db,key,o);
1133     } else {
1134         if (o->type != OBJ_STREAM) {
1135             addReply(c,shared.wrongtypeerr);
1136             return NULL;
1137         }
1138     }
1139     return o;
1140 }
1141 
/* Helper function to convert a string to an unsigned long long value.
 * The function attempts to use the faster string2ll() function inside
 * Redis: if it fails, strtoull() is used instead. The function returns
 * 1 if the conversion happened successfully or 0 if the number is
 * invalid or out of range.
 *
 * Negative numbers are always rejected, including the ones string2ll()
 * cannot parse (values below the long long range, or a minus sign that
 * follows leading spaces). */
int string2ull(const char *s, unsigned long long *value) {
    long long ll;
    if (string2ll(s,strlen(s),&ll)) {
        if (ll < 0) return 0; /* Negative values are out of range. */
        *value = ll;
        return 1;
    }

    /* strtoull() accepts a leading minus sign and returns the negated
     * (wrapped around) value without reporting any error. A '-' can never
     * be part of a valid unsigned decimal number, so refuse it here. */
    if (strchr(s,'-') != NULL) return 0;

    errno = 0;
    char *endptr = NULL;
    *value = strtoull(s,&endptr,10);
    /* Fail on conversion errors, on the empty string, and on trailing
     * characters that are not part of the number. */
    if (errno == EINVAL || errno == ERANGE || !(*s != '\0' && *endptr == '\0'))
        return 0; /* strtoull() failed. */
    return 1; /* Conversion done! */
}
1161 
1162 /* Parse a stream ID in the format given by clients to Redis, that is
1163  * <ms>-<seq>, and converts it into a streamID structure. If
1164  * the specified ID is invalid C_ERR is returned and an error is reported
1165  * to the client, otherwise C_OK is returned. The ID may be in incomplete
1166  * form, just stating the milliseconds time part of the stream. In such a case
1167  * the missing part is set according to the value of 'missing_seq' parameter.
1168  *
1169  * The IDs "-" and "+" specify respectively the minimum and maximum IDs
1170  * that can be represented. If 'strict' is set to 1, "-" and "+" will be
1171  * treated as an invalid ID.
1172  *
1173  * If 'c' is set to NULL, no reply is sent to the client. */
streamGenericParseIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq,int strict)1174 int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
1175     char buf[128];
1176     if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
1177     memcpy(buf,o->ptr,sdslen(o->ptr)+1);
1178 
1179     if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
1180         goto invalid;
1181 
1182     /* Handle the "-" and "+" special cases. */
1183     if (buf[0] == '-' && buf[1] == '\0') {
1184         id->ms = 0;
1185         id->seq = 0;
1186         return C_OK;
1187     } else if (buf[0] == '+' && buf[1] == '\0') {
1188         id->ms = UINT64_MAX;
1189         id->seq = UINT64_MAX;
1190         return C_OK;
1191     }
1192 
1193     /* Parse <ms>-<seq> form. */
1194     char *dot = strchr(buf,'-');
1195     if (dot) *dot = '\0';
1196     unsigned long long ms, seq;
1197     if (string2ull(buf,&ms) == 0) goto invalid;
1198     if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
1199     if (!dot) seq = missing_seq;
1200     id->ms = ms;
1201     id->seq = seq;
1202     return C_OK;
1203 
1204 invalid:
1205     if (c) addReplyError(c,"Invalid stream ID specified as stream "
1206                            "command argument");
1207     return C_ERR;
1208 }
1209 
1210 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1211  * 0, to be used when - and + are accetable IDs. */
streamParseIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq)1212 int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1213     return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
1214 }
1215 
1216 /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1217  * 1, to be used when we want to return an error if the special IDs + or -
1218  * are provided. */
streamParseStrictIDOrReply(client * c,robj * o,streamID * id,uint64_t missing_seq)1219 int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1220     return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
1221 }
1222 
1223 /* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
1224  * otherwise trimming is no longer determinsitic on replicas / AOF. */
streamRewriteApproxMaxlen(client * c,stream * s,int maxlen_arg_idx)1225 void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
1226     robj *maxlen_obj = createStringObjectFromLongLong(s->length);
1227     robj *equal_obj = createStringObject("=",1);
1228 
1229     rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
1230     rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
1231 
1232     decrRefCount(equal_obj);
1233     decrRefCount(maxlen_obj);
1234 }
1235 
1236 /* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
xaddCommand(client * c)1237 void xaddCommand(client *c) {
1238     streamID id;
1239     int id_given = 0; /* Was an ID different than "*" specified? */
1240     long long maxlen = -1;  /* If left to -1 no trimming is performed. */
1241     int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
1242                                the maxium length is not applied verbatim. */
1243     int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
1244 
1245     /* Parse options. */
1246     int i = 2; /* This is the first argument position where we could
1247                   find an option, or the ID. */
1248     for (; i < c->argc; i++) {
1249         int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
1250         char *opt = c->argv[i]->ptr;
1251         if (opt[0] == '*' && opt[1] == '\0') {
1252             /* This is just a fast path for the common case of auto-ID
1253              * creation. */
1254             break;
1255         } else if (!strcasecmp(opt,"maxlen") && moreargs) {
1256             approx_maxlen = 0;
1257             char *next = c->argv[i+1]->ptr;
1258             /* Check for the form MAXLEN ~ <count>. */
1259             if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
1260                 approx_maxlen = 1;
1261                 i++;
1262             } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
1263                 i++;
1264             }
1265             if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
1266                 != C_OK) return;
1267 
1268             if (maxlen < 0) {
1269                 addReplyError(c,"The MAXLEN argument must be >= 0.");
1270                 return;
1271             }
1272             i++;
1273             maxlen_arg_idx = i;
1274         } else {
1275             /* If we are here is a syntax error or a valid ID. */
1276             if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
1277             id_given = 1;
1278             break;
1279         }
1280     }
1281     int field_pos = i+1;
1282 
1283     /* Check arity. */
1284     if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
1285         addReplyError(c,"wrong number of arguments for XADD");
1286         return;
1287     }
1288 
1289     /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
1290      * a new stream and have streamAppendItem fail, leaving an empty key in the
1291      * database. */
1292     if (id_given && id.ms == 0 && id.seq == 0) {
1293         addReplyError(c,"The ID specified in XADD must be greater than 0-0");
1294         return;
1295     }
1296 
1297     /* Lookup the stream at key. */
1298     robj *o;
1299     stream *s;
1300     if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
1301     s = o->ptr;
1302 
1303     /* Return ASAP if the stream has reached the last possible ID */
1304     if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
1305         addReplyError(c,"The stream has exhausted the last possible ID, "
1306                         "unable to add more items");
1307         return;
1308     }
1309 
1310     /* Append using the low level function and return the ID. */
1311     if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
1312         &id, id_given ? &id : NULL) == C_ERR)
1313     {
1314         if (errno == EDOM)
1315             addReplyError(c,"The ID specified in XADD is equal or smaller than "
1316                             "the target stream top item");
1317         else
1318             addReplyError(c,"Elements are too large to be stored");
1319         return;
1320     }
1321     addReplyStreamID(c,&id);
1322 
1323     signalModifiedKey(c->db,c->argv[1]);
1324     notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
1325     server.dirty++;
1326 
1327     if (maxlen >= 0) {
1328         /* Notify xtrim event if needed. */
1329         if (streamTrimByLength(s,maxlen,approx_maxlen)) {
1330             notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
1331         }
1332         if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
1333     }
1334 
1335     /* Let's rewrite the ID argument with the one actually generated for
1336      * AOF/replication propagation. */
1337     robj *idarg = createObjectFromStreamID(&id);
1338     rewriteClientCommandArgument(c,i,idarg);
1339     decrRefCount(idarg);
1340 
1341     /* We need to signal to blocked clients that there is new data on this
1342      * stream. */
1343     if (server.blocked_clients_by_type[BLOCKED_STREAM])
1344         signalKeyAsReady(c->db, c->argv[1]);
1345 }
1346 
1347 /* XRANGE/XREVRANGE actual implementation. */
xrangeGenericCommand(client * c,int rev)1348 void xrangeGenericCommand(client *c, int rev) {
1349     robj *o;
1350     stream *s;
1351     streamID startid, endid;
1352     long long count = -1;
1353     robj *startarg = rev ? c->argv[3] : c->argv[2];
1354     robj *endarg = rev ? c->argv[2] : c->argv[3];
1355 
1356     if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
1357     if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
1358 
1359     /* Parse the COUNT option if any. */
1360     if (c->argc > 4) {
1361         for (int j = 4; j < c->argc; j++) {
1362             int additional = c->argc-j-1;
1363             if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
1364                 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
1365                     != C_OK) return;
1366                 if (count < 0) count = 0;
1367                 j++; /* Consume additional arg. */
1368             } else {
1369                 addReply(c,shared.syntaxerr);
1370                 return;
1371             }
1372         }
1373     }
1374 
1375     /* Return the specified range to the user. */
1376     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
1377         || checkType(c,o,OBJ_STREAM)) return;
1378     s = o->ptr;
1379 
1380     if (count == 0) {
1381         addReply(c,shared.nullmultibulk);
1382     } else {
1383         if (count == -1) count = 0;
1384         streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
1385     }
1386 }
1387 
1388 /* XRANGE key start end [COUNT <n>] */
xrangeCommand(client * c)1389 void xrangeCommand(client *c) {
1390     xrangeGenericCommand(c,0);
1391 }
1392 
1393 /* XREVRANGE key end start [COUNT <n>] */
xrevrangeCommand(client * c)1394 void xrevrangeCommand(client *c) {
1395     xrangeGenericCommand(c,1);
1396 }
1397 
1398 /* XLEN */
xlenCommand(client * c)1399 void xlenCommand(client *c) {
1400     robj *o;
1401     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
1402         || checkType(c,o,OBJ_STREAM)) return;
1403     stream *s = o->ptr;
1404     addReplyLongLong(c,s->length);
1405 }
1406 
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
 *       ID_1 ID_2 ... ID_N
 *
 * This function also implements the XREADGROUP command, which is like XREAD
 * but accepting the [GROUP group-name consumer-name] and [NOACK] additional
 * options. This is useful because while XREAD is a read command and can be
 * called on slaves, XREADGROUP is not.
 *
 * The function works in four phases:
 *
 * 1. Option parsing, up to the mandatory STREAMS keyword.
 * 2. Resolution of every requested ID (including the special "$" and ">"
 *    IDs) and, for XREADGROUP, of the consumer group of every key.
 * 3. An attempt to serve the client synchronously from the streams that
 *    already have something to return.
 * 4. If nothing was served: block when BLOCK was given (unless we are inside
 *    MULTI/EXEC), otherwise reply with a null multi bulk.
 *
 * Every exit path taken after the 'ids' / 'groups' vectors may have been
 * allocated goes through the 'cleanup' label. */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) {
    long long timeout = -1; /* -1 means, no BLOCK argument given. */
    long long count = 0;
    int streams_count = 0;
    int streams_arg = 0;
    int noack = 0;          /* True if NOACK option was specified. */
    /* Up to this many IDs are stored on the stack; more than that and the
     * 'ids' vector is heap allocated. */
    #define STREAMID_STATIC_VECTOR_LEN 8
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    streamCG **groups = NULL;
    int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
    robj *groupname = NULL;
    robj *consumername = NULL;

    /* Parse arguments. The loop stops at STREAMS: everything after it is
     * handled below as the list of keys followed by the list of IDs. */
    for (int i = 1; i < c->argc; i++) {
        int moreargs = c->argc-i-1;
        char *o = c->argv[i]->ptr;
        if (!strcasecmp(o,"BLOCK") && moreargs) {
            i++;
            if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
                UNIT_MILLISECONDS) != C_OK) return;
        } else if (!strcasecmp(o,"COUNT") && moreargs) {
            i++;
            if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
                return;
            /* Negative counts are clamped to zero. */
            if (count < 0) count = 0;
        } else if (!strcasecmp(o,"STREAMS") && moreargs) {
            streams_arg = i+1;
            streams_count = (c->argc-streams_arg);
            if ((streams_count % 2) != 0) {
                addReplyError(c,"Unbalanced XREAD list of streams: "
                                "for each stream key an ID or '$' must be "
                                "specified.");
                return;
            }
            streams_count /= 2; /* We have two arguments for each stream. */
            break;
        } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
            if (!xreadgroup) {
                addReplyError(c,"The GROUP option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            groupname = c->argv[i+1];
            consumername = c->argv[i+2];
            i += 2;
        } else if (!strcasecmp(o,"NOACK")) {
            if (!xreadgroup) {
                addReplyError(c,"The NOACK option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            noack = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* STREAMS option is mandatory. */
    if (streams_arg == 0) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* If the user specified XREADGROUP then it must also
     * provide the GROUP option. */
    if (xreadgroup && groupname == NULL) {
        addReplyError(c,"Missing GROUP option for XREADGROUP");
        return;
    }

    /* Parse the IDs and resolve the group name. From now on every exit path
     * must go through 'cleanup' in order to release 'ids' and 'groups'. */
    if (streams_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*streams_count);
    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

    /* 'i' scans the ID arguments: the key matching the ID at position 'i'
     * is 'streams_count' arguments before it. */
    for (int i = streams_arg + streams_count; i < c->argc; i++) {
        /* Specifying "$" as last-known-id means that the client wants to be
         * served with just the messages that will arrive into the stream
         * starting from now. */
        int id_idx = i - streams_arg - streams_count;
        robj *key = c->argv[i-streams_count];
        robj *o = lookupKeyRead(c->db,key);
        if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
        streamCG *group = NULL;

        /* If a group was specified, then we need to be sure that the
         * key and group actually exist. */
        if (groupname) {
            if (o == NULL ||
                (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
            {
                addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                                       "group '%s' in XREADGROUP with GROUP "
                                       "option",
                                    (char*)key->ptr,(char*)groupname->ptr);
                goto cleanup;
            }
            groups[id_idx] = group;
        }

        if (strcmp(c->argv[i]->ptr,"$") == 0) {
            if (xreadgroup) {
                addReplyError(c,"The $ ID is meaningless in the context of "
                                "XREADGROUP: you want to read the history of "
                                "this consumer by specifying a proper ID, or "
                                "use the > ID to get new messages. The $ ID would "
                                "just return an empty result set.");
                goto cleanup;
            }
            /* "$" resolves to the current last ID of the stream, or to 0-0
             * if the key does not exist yet. */
            if (o) {
                stream *s = o->ptr;
                ids[id_idx] = s->last_id;
            } else {
                ids[id_idx].ms = 0;
                ids[id_idx].seq = 0;
            }
            continue;
        } else if (strcmp(c->argv[i]->ptr,">") == 0) {
            if (!xreadgroup) {
                addReplyError(c,"The > ID can be specified only when calling "
                                "XREADGROUP using the GROUP <group> "
                                "<consumer> option.");
                goto cleanup;
            }
            /* We use just the maximum ID to signal this is a ">" ID, anyway
             * the code handling the blocking clients will have to update the
             * ID later in order to match the changing consumer group last ID. */
            ids[id_idx].ms = UINT64_MAX;
            ids[id_idx].seq = UINT64_MAX;
            continue;
        }
        /* Any other ID must be a fully specified one. */
        if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
            goto cleanup;
    }

    /* Try to serve the client synchronously. The outer reply array is
     * created lazily, with a deferred length, the first time a stream
     * turns out to have data to return. */
    size_t arraylen = 0;
    void *arraylen_ptr = NULL;
    for (int i = 0; i < streams_count; i++) {
        robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
        if (o == NULL) continue;
        stream *s = o->ptr;
        streamID *gt = ids+i; /* ID must be greater than this. */
        int serve_synchronously = 0;
        int serve_history = 0; /* True for XREADGROUP with ID != ">". */

        /* Check if there are the conditions to serve the client
         * synchronously. */
        if (groups) {
            /* If the consumer is blocked on a group, we always serve it
             * synchronously (serving its local history) if the ID specified
             * was not the special ">" ID. */
            if (gt->ms != UINT64_MAX ||
                gt->seq != UINT64_MAX)
            {
                serve_synchronously = 1;
                serve_history = 1;
            } else if (s->length) {
                /* We also want to serve a consumer in a consumer group
                 * synchronously in case the group top item delivered is smaller
                 * than what the stream has inside. */
                streamID maxid, *last = &groups[i]->last_id;
                streamLastValidID(s, &maxid);
                if (streamCompareID(&maxid, last) > 0) {
                    serve_synchronously = 1;
                    /* Replace the ">" placeholder with the group last
                     * delivered ID, so the range starts right after it. */
                    *gt = *last;
                }
            }
        } else if (s->length) {
            /* For consumers without a group, we serve synchronously if we can
             * actually provide at least one item from the stream. */
            streamID maxid;
            streamLastValidID(s, &maxid);
            if (streamCompareID(&maxid, gt) > 0) {
                serve_synchronously = 1;
            }
        }

        if (serve_synchronously) {
            arraylen++;
            if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
            /* streamReplyWithRange() handles the 'start' ID as inclusive,
             * so start from the next ID, since we want only messages with
             * IDs greater than start. */
            streamID start = *gt;
            streamIncrID(&start);

            /* Emit the two elements sub-array consisting of the name
             * of the stream and the data we extracted from it. */
            addReplyMultiBulkLen(c,2);
            addReplyBulk(c,c->argv[streams_arg+i]);
            streamConsumer *consumer = NULL;
            if (groups) consumer = streamLookupConsumer(groups[i],
                                                        consumername->ptr,
                                                        SLC_NONE);
            /* Key name and group name, handed to streamReplyWithRange()
             * (presumably used to propagate the effects of the read --
             * see the note at the 'cleanup' label). */
            streamPropInfo spi = {c->argv[i+streams_arg],groupname};
            int flags = 0;
            if (noack) flags |= STREAM_RWR_NOACK;
            if (serve_history) flags |= STREAM_RWR_HISTORY;
            streamReplyWithRange(c,s,&start,NULL,count,0,
                                 groups ? groups[i] : NULL,
                                 consumer, flags, &spi);
            /* A read performed via a consumer group counts as a change. */
            if (groups) server.dirty++;
        }
    }

    /* We replied synchronously! Set the top array len and return to caller. */
    if (arraylen) {
        setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
        goto cleanup;
    }

    /* Block if needed. */
    if (timeout != -1) {
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_MULTI) {
            addReply(c,shared.nullmultibulk);
            goto cleanup;
        }
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
                     timeout, NULL, ids);
        /* If no COUNT is given and we block, set a relatively small count:
         * in case the ID provided is too low, we do not want the server to
         * block just to serve this client a huge stream of messages. */
        c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

        /* If this is a XREADGROUP + GROUP we need to remember for which
         * group and consumer name we are blocking, so later when one of the
         * keys receive more data, we can call streamReplyWithRange() passing
         * the right arguments. */
        if (groupname) {
            /* The blocking state keeps its own references to the names. */
            incrRefCount(groupname);
            incrRefCount(consumername);
            c->bpop.xread_group = groupname;
            c->bpop.xread_consumer = consumername;
            c->bpop.xread_group_noack = noack;
        } else {
            c->bpop.xread_group = NULL;
            c->bpop.xread_consumer = NULL;
        }
        goto cleanup;
    }

    /* No BLOCK option, nor any stream we can serve. Reply as with a
     * timeout happened. */
    addReply(c,shared.nullmultibulk);
    /* Continue to cleanup... */

cleanup: /* Cleanup. */

    /* The command is propagated (in the READGROUP form) as a side effect
     * of calling lower level APIs. So stop any implicit propagation. */
    preventCommandPropagation(c);
    /* 'ids' is released only if it was heap allocated; 'groups' is NULL
     * when no GROUP option was given. */
    if (ids != static_ids) zfree(ids);
    zfree(groups);
}
1675 
1676 /* -----------------------------------------------------------------------
1677  * Low level implementation of consumer groups
1678  * ----------------------------------------------------------------------- */
1679 
1680 /* Create a NACK entry setting the delivery count to 1 and the delivery
1681  * time to the current time. The NACK consumer will be set to the one
1682  * specified as argument of the function. */
streamCreateNACK(streamConsumer * consumer)1683 streamNACK *streamCreateNACK(streamConsumer *consumer) {
1684     streamNACK *nack = zmalloc(sizeof(*nack));
1685     nack->delivery_time = mstime();
1686     nack->delivery_count = 1;
1687     nack->consumer = consumer;
1688     return nack;
1689 }
1690 
1691 /* Free a NACK entry. */
streamFreeNACK(streamNACK * na)1692 void streamFreeNACK(streamNACK *na) {
1693     zfree(na);
1694 }
1695 
1696 /* Free a consumer and associated data structures. Note that this function
1697  * will not reassign the pending messages associated with this consumer
1698  * nor will delete them from the stream, so when this function is called
1699  * to delete a consumer, and not when the whole stream is destroyed, the caller
1700  * should do some work before. */
streamFreeConsumer(streamConsumer * sc)1701 void streamFreeConsumer(streamConsumer *sc) {
1702     raxFree(sc->pel); /* No value free callback: the PEL entries are shared
1703                          between the consumer and the main stream PEL. */
1704     sdsfree(sc->name);
1705     zfree(sc);
1706 }
1707 
1708 /* Create a new consumer group in the context of the stream 's', having the
1709  * specified name and last server ID. If a consumer group with the same name
1710  * already existed NULL is returned, otherwise the pointer to the consumer
1711  * group is returned. */
streamCreateCG(stream * s,char * name,size_t namelen,streamID * id)1712 streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
1713     if (s->cgroups == NULL) s->cgroups = raxNew();
1714     if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
1715         return NULL;
1716 
1717     streamCG *cg = zmalloc(sizeof(*cg));
1718     cg->pel = raxNew();
1719     cg->consumers = raxNew();
1720     cg->last_id = *id;
1721     raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
1722     return cg;
1723 }
1724 
1725 /* Free a consumer group and all its associated data. */
streamFreeCG(streamCG * cg)1726 void streamFreeCG(streamCG *cg) {
1727     raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
1728     raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
1729     zfree(cg);
1730 }
1731 
1732 /* Lookup the consumer group in the specified stream and returns its
1733  * pointer, otherwise if there is no such group, NULL is returned. */
streamLookupCG(stream * s,sds groupname)1734 streamCG *streamLookupCG(stream *s, sds groupname) {
1735     if (s->cgroups == NULL) return NULL;
1736     streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
1737                            sdslen(groupname));
1738     return (cg == raxNotFound) ? NULL : cg;
1739 }
1740 
1741 /* Lookup the consumer with the specified name in the group 'cg': if the
1742  * consumer does not exist it is automatically created as a side effect
1743  * of calling this function, otherwise its last seen time is updated and
1744  * the existing consumer reference returned. */
streamLookupConsumer(streamCG * cg,sds name,int flags)1745 streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
1746     int create = !(flags & SLC_NOCREAT);
1747     int refresh = !(flags & SLC_NOREFRESH);
1748     streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
1749                                sdslen(name));
1750     if (consumer == raxNotFound) {
1751         if (!create) return NULL;
1752         consumer = zmalloc(sizeof(*consumer));
1753         consumer->name = sdsdup(name);
1754         consumer->pel = raxNew();
1755         raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
1756                   consumer,NULL);
1757     }
1758     if (refresh) consumer->seen_time = mstime();
1759     return consumer;
1760 }
1761 
1762 /* Delete the consumer specified in the consumer group 'cg'. The consumer
1763  * may have pending messages: they are removed from the PEL, and the number
1764  * of pending messages "lost" is returned. */
streamDelConsumer(streamCG * cg,sds name)1765 uint64_t streamDelConsumer(streamCG *cg, sds name) {
1766     streamConsumer *consumer =
1767         streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
1768     if (consumer == NULL) return 0;
1769 
1770     uint64_t retval = raxSize(consumer->pel);
1771 
1772     /* Iterate all the consumer pending messages, deleting every corresponding
1773      * entry from the global entry. */
1774     raxIterator ri;
1775     raxStart(&ri,consumer->pel);
1776     raxSeek(&ri,"^",NULL,0);
1777     while(raxNext(&ri)) {
1778         streamNACK *nack = ri.data;
1779         raxRemove(cg->pel,ri.key,ri.key_len,NULL);
1780         streamFreeNACK(nack);
1781     }
1782     raxStop(&ri);
1783 
1784     /* Deallocate the consumer. */
1785     raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);
1786     streamFreeConsumer(consumer);
1787     return retval;
1788 }
1789 
1790 /* -----------------------------------------------------------------------
1791  * Consumer groups commands
1792  * ----------------------------------------------------------------------- */
1793 
1794 /* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
1795  * XGROUP SETID <key> <groupname> <id or $>
1796  * XGROUP DESTROY <key> <groupname>
1797  * XGROUP DELCONSUMER <key> <groupname> <consumername> */
xgroupCommand(client * c)1798 void xgroupCommand(client *c) {
1799     const char *help[] = {
1800 "CREATE      <key> <groupname> <id or $> [opt] -- Create a new consumer group.",
1801 "            option MKSTREAM: create the empty stream if it does not exist.",
1802 "SETID       <key> <groupname> <id or $>  -- Set the current group ID.",
1803 "DESTROY     <key> <groupname>            -- Remove the specified group.",
1804 "DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
1805 "HELP                                     -- Prints this help.",
1806 NULL
1807     };
1808     stream *s = NULL;
1809     sds grpname = NULL;
1810     streamCG *cg = NULL;
1811     char *opt = c->argv[1]->ptr; /* Subcommand name. */
1812     int mkstream = 0;
1813     robj *o;
1814 
1815     /* CREATE has an MKSTREAM option that creates the stream if it
1816      * does not exist. */
1817     if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
1818         if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
1819             addReplySubcommandSyntaxError(c);
1820             return;
1821         }
1822         mkstream = 1;
1823         grpname = c->argv[3]->ptr;
1824     }
1825 
1826     /* Everything but the "HELP" option requires a key and group name. */
1827     if (c->argc >= 4) {
1828         o = lookupKeyWrite(c->db,c->argv[2]);
1829         if (o) {
1830             if (checkType(c,o,OBJ_STREAM)) return;
1831             s = o->ptr;
1832         }
1833         grpname = c->argv[3]->ptr;
1834     }
1835 
1836     /* Check for missing key/group. */
1837     if (c->argc >= 4 && !mkstream) {
1838         /* At this point key must exist, or there is an error. */
1839         if (s == NULL) {
1840             addReplyError(c,
1841                 "The XGROUP subcommand requires the key to exist. "
1842                 "Note that for CREATE you may want to use the MKSTREAM "
1843                 "option to create an empty stream automatically.");
1844             return;
1845         }
1846 
1847         /* Certain subcommands require the group to exist. */
1848         if ((cg = streamLookupCG(s,grpname)) == NULL &&
1849             (!strcasecmp(opt,"SETID") ||
1850              !strcasecmp(opt,"DELCONSUMER")))
1851         {
1852             addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
1853                                    "for key name '%s'",
1854                                    (char*)grpname, (char*)c->argv[2]->ptr);
1855             return;
1856         }
1857     }
1858 
1859     /* Dispatch the different subcommands. */
1860     if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
1861         streamID id;
1862         if (!strcmp(c->argv[4]->ptr,"$")) {
1863             if (s) {
1864                 id = s->last_id;
1865             } else {
1866                 id.ms = 0;
1867                 id.seq = 0;
1868             }
1869         } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
1870             return;
1871         }
1872 
1873         /* Handle the MKSTREAM option now that the command can no longer fail. */
1874         if (s == NULL) {
1875             serverAssert(mkstream);
1876             o = createStreamObject();
1877             dbAdd(c->db,c->argv[2],o);
1878             s = o->ptr;
1879         }
1880 
1881         streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
1882         if (cg) {
1883             addReply(c,shared.ok);
1884             server.dirty++;
1885             notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
1886                                 c->argv[2],c->db->id);
1887         } else {
1888             addReplySds(c,
1889                 sdsnew("-BUSYGROUP Consumer Group name already exists\r\n"));
1890         }
1891     } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
1892         streamID id;
1893         if (!strcmp(c->argv[4]->ptr,"$")) {
1894             id = s->last_id;
1895         } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
1896             return;
1897         }
1898         cg->last_id = id;
1899         addReply(c,shared.ok);
1900         server.dirty++;
1901         notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
1902     } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
1903         if (cg) {
1904             raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
1905             streamFreeCG(cg);
1906             addReply(c,shared.cone);
1907             server.dirty++;
1908             notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
1909                                 c->argv[2],c->db->id);
1910         } else {
1911             addReply(c,shared.czero);
1912         }
1913     } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
1914         /* Delete the consumer and returns the number of pending messages
1915          * that were yet associated with such a consumer. */
1916         long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
1917         addReplyLongLong(c,pending);
1918         server.dirty++;
1919         notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
1920                             c->argv[2],c->db->id);
1921     } else if (!strcasecmp(opt,"HELP")) {
1922         addReplyHelp(c, help);
1923     } else {
1924         addReplySubcommandSyntaxError(c);
1925     }
1926 }
1927 
1928 /* XSETID <stream> <groupname> <id>
1929  *
1930  * Set the internal "last ID" of a stream. */
xsetidCommand(client * c)1931 void xsetidCommand(client *c) {
1932     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
1933     if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
1934 
1935     stream *s = o->ptr;
1936     streamID id;
1937     if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return;
1938 
1939     /* If the stream has at least one item, we want to check that the user
1940      * is setting a last ID that is equal or greater than the current top
1941      * item, otherwise the fundamental ID monotonicity assumption is violated. */
1942     if (s->length > 0) {
1943         streamID maxid;
1944         streamLastValidID(s,&maxid);
1945 
1946         if (streamCompareID(&id,&maxid) < 0) {
1947             addReplyError(c,"The ID specified in XSETID is smaller than the "
1948                             "target stream top item");
1949             return;
1950         }
1951     }
1952     s->last_id = id;
1953     addReply(c,shared.ok);
1954     server.dirty++;
1955     notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
1956 }
1957 
1958 /* XACK <key> <group> <id> <id> ... <id>
1959  *
1960  * Acknowledge a message as processed. In practical terms we just check the
1961  * pendine entries list (PEL) of the group, and delete the PEL entry both from
1962  * the group and the consumer (pending messages are referenced in both places).
1963  *
1964  * Return value of the command is the number of messages successfully
1965  * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
1966  */
xackCommand(client * c)1967 void xackCommand(client *c) {
1968     streamCG *group = NULL;
1969     robj *o = lookupKeyRead(c->db,c->argv[1]);
1970     if (o) {
1971         if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
1972         group = streamLookupCG(o->ptr,c->argv[2]->ptr);
1973     }
1974 
1975     /* No key or group? Nothing to ack. */
1976     if (o == NULL || group == NULL) {
1977         addReply(c,shared.czero);
1978         return;
1979     }
1980 
1981     int acknowledged = 0;
1982     for (int j = 3; j < c->argc; j++) {
1983         streamID id;
1984         unsigned char buf[sizeof(streamID)];
1985         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
1986         streamEncodeID(buf,&id);
1987 
1988         /* Lookup the ID in the group PEL: it will have a reference to the
1989          * NACK structure that will have a reference to the consumer, so that
1990          * we are able to remove the entry from both PELs. */
1991         streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
1992         if (nack != raxNotFound) {
1993             raxRemove(group->pel,buf,sizeof(buf),NULL);
1994             raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
1995             streamFreeNACK(nack);
1996             acknowledged++;
1997             server.dirty++;
1998         }
1999     }
2000     addReplyLongLong(c,acknowledged);
2001 }
2002 
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
 *
 * If start and stop are omitted, the command just outputs information about
 * the amount of pending messages for the key/group pair, together with
 * the minimum and maximum ID of pending messages.
 *
 * If start and stop are provided instead, the pending messages are returned
 * with information about the current owner, number of deliveries and last
 * delivery time and so forth. */
void xpendingCommand(client *c) {
    /* Accepted shapes:
     *   XPENDING <key> <group>                                     (argc 3)
     *   XPENDING <key> <group> <start> <stop> <count>              (argc 6)
     *   XPENDING <key> <group> <start> <stop> <count> <consumer>   (argc 7)
     * Any other argument count is a syntax error. */
    int summary_only = (c->argc == 3);
    if (!summary_only && c->argc != 6 && c->argc != 7) {
        addReply(c,shared.syntaxerr);
        return;
    }

    robj *key = c->argv[1];
    robj *groupname = c->argv[2];
    robj *consumername = NULL;
    if (c->argc == 7) consumername = c->argv[6];

    /* Validate <count>, <start> and <stop> before touching the keyspace so
     * that malformed arguments are reported ahead of any other error. */
    streamID startid, endid;
    long long count;
    if (!summary_only) {
        if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
            return;
        if (count < 0) count = 0; /* A negative count behaves like zero. */
        if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
            return;
        if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
            return;
    }

    /* Resolve the stream and the consumer group. A missing key and a
     * missing group are reported with the same NOGROUP error. */
    robj *o = lookupKeyRead(c->db,key);
    if (o != NULL && checkType(c,o,OBJ_STREAM)) return;

    streamCG *group = NULL;
    if (o != NULL) group = streamLookupCG(o->ptr,groupname->ptr);
    if (group == NULL) {
        addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                               "group '%s'",
                               (char*)key->ptr,(char*)groupname->ptr);
        return;
    }

    /* Summary form: [total, smallest ID, greatest ID, per-consumer counts]. */
    if (summary_only) {
        uint64_t total = raxSize(group->pel);

        addReplyMultiBulkLen(c,4);
        addReplyLongLong(c,total);

        if (total == 0) {
            /* Empty PEL: no boundaries and no consumers to report. */
            addReply(c,shared.nullbulk);      /* Start. */
            addReply(c,shared.nullbulk);      /* End. */
            addReply(c,shared.nullmultibulk); /* Clients. */
            return;
        }

        /* The group PEL is keyed by encoded stream ID, so its first and
         * last elements are the smallest and greatest pending IDs. */
        raxIterator ri;
        streamID first, last;
        raxStart(&ri,group->pel);
        raxSeek(&ri,"^",NULL,0);
        raxNext(&ri);
        streamDecodeID(ri.key,&first);
        addReplyStreamID(c,&first);

        raxSeek(&ri,"$",NULL,0);
        raxNext(&ri);
        streamDecodeID(ri.key,&last);
        addReplyStreamID(c,&last);
        raxStop(&ri);

        /* Emit only the consumers that own at least one pending entry;
         * the array length is patched in once we know how many they are. */
        void *replylen = addDeferredMultiBulkLength(c);
        size_t listed = 0;
        raxStart(&ri,group->consumers);
        raxSeek(&ri,"^",NULL,0);
        while (raxNext(&ri)) {
            streamConsumer *owner = ri.data;
            uint64_t owned = raxSize(owner->pel);
            if (owned == 0) continue;
            addReplyMultiBulkLen(c,2);
            addReplyBulkCBuffer(c,ri.key,ri.key_len);
            addReplyBulkLongLong(c,owned);
            listed++;
        }
        raxStop(&ri);
        setDeferredMultiBulkLength(c,replylen,listed);
        return;
    }

    /* Extended form: XPENDING <key> <group> <start> <stop> <count>
     * [<consumer>]. When a consumer is named we walk its private PEL,
     * otherwise the PEL of the whole group. */
    streamConsumer *consumer = NULL;
    if (consumername != NULL) {
        consumer = streamLookupConsumer(group,
                                        consumername->ptr,
                                        SLC_NOCREAT|SLC_NOREFRESH);

        /* An unknown consumer simply has nothing pending. */
        if (consumer == NULL) {
            addReplyMultiBulkLen(c,0);
            return;
        }
    }

    rax *pel = (consumer != NULL) ? consumer->pel : group->pel;
    mstime_t now = mstime();

    unsigned char startkey[sizeof(streamID)];
    unsigned char endkey[sizeof(streamID)];
    streamEncodeID(startkey,&startid);
    streamEncodeID(endkey,&endid);

    raxIterator ri;
    raxStart(&ri,pel);
    raxSeek(&ri,">=",startkey,sizeof(startkey));

    void *replylen = addDeferredMultiBulkLength(c);
    size_t emitted = 0;

    /* Stop at the first of: count exhausted, PEL exhausted, or an entry
     * whose encoded ID compares greater than the encoded <stop>. */
    while (count > 0) {
        if (!raxNext(&ri)) break;
        if (memcmp(ri.key,endkey,ri.key_len) > 0) break;

        streamNACK *nack = ri.data;
        addReplyMultiBulkLen(c,4);

        /* Entry ID. */
        streamID id;
        streamDecodeID(ri.key,&id);
        addReplyStreamID(c,&id);

        /* Name of the consumer currently owning the entry. */
        addReplyBulkCBuffer(c,nack->consumer->name,
                            sdslen(nack->consumer->name));

        /* Milliseconds elapsed since the last delivery, never negative. */
        mstime_t elapsed = now - nack->delivery_time;
        if (elapsed < 0) elapsed = 0;
        addReplyLongLong(c,elapsed);

        /* Number of deliveries. */
        addReplyLongLong(c,nack->delivery_count);

        emitted++;
        count--;
    }
    raxStop(&ri);
    setDeferredMultiBulkLength(c,replylen,emitted);
}
2153 
2154 /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
2155  *        [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
2156  *        [FORCE] [JUSTID]
2157  *
2158  * Gets ownership of one or multiple messages in the Pending Entries List
2159  * of a given stream consumer group.
2160  *
2161  * If the message ID (among the specified ones) exists, and its idle
2162  * time greater or equal to <min-idle-time>, then the message new owner
2163  * becomes the specified <consumer>. If the minimum idle time specified
2164  * is zero, messages are claimed regardless of their idle time.
2165  *
2166  * All the messages that cannot be found inside the pending entries list
2167  * are ignored, but in case the FORCE option is used. In that case we
2168  * create the NACK (representing a not yet acknowledged message) entry in
2169  * the consumer group PEL.
2170  *
2171  * This command creates the consumer as side effect if it does not yet
2172  * exists. Moreover the command reset the idle time of the message to 0,
2173  * even if by using the IDLE or TIME options, the user can control the
2174  * new idle time.
2175  *
2176  * The options at the end can be used in order to specify more attributes
2177  * to set in the representation of the pending message:
2178  *
2179  * 1. IDLE <ms>:
2180  *      Set the idle time (last time it was delivered) of the message.
2181  *      If IDLE is not specified, an IDLE of 0 is assumed, that is,
2182  *      the time count is reset because the message has now a new
2183  *      owner trying to process it.
2184  *
2185  * 2. TIME <ms-unix-time>:
2186  *      This is the same as IDLE but instead of a relative amount of
2187  *      milliseconds, it sets the idle time to a specific unix time
2188  *      (in milliseconds). This is useful in order to rewrite the AOF
2189  *      file generating XCLAIM commands.
2190  *
2191  * 3. RETRYCOUNT <count>:
2192  *      Set the retry counter to the specified value. This counter is
2193  *      incremented every time a message is delivered again. Normally
2194  *      XCLAIM does not alter this counter, which is just served to clients
2195  *      when the XPENDING command is called: this way clients can detect
2196  *      anomalies, like messages that are never processed for some reason
2197  *      after a big number of delivery attempts.
2198  *
2199  * 4. FORCE:
2200  *      Creates the pending message entry in the PEL even if certain
2201  *      specified IDs are not already in the PEL assigned to a different
2202  *      client. However the message must be exist in the stream, otherwise
2203  *      the IDs of non existing messages are ignored.
2204  *
2205  * 5. JUSTID:
2206  *      Return just an array of IDs of messages successfully claimed,
2207  *      without returning the actual message.
2208  *
2209  * 6. LASTID <id>:
2210  *      Update the consumer group last ID with the specified ID if the
2211  *      current last ID is smaller than the provided one.
2212  *      This is used for replication / AOF, so that when we read from a
2213  *      consumer group, the XCLAIM that gets propagated to give ownership
2214  *      to the consumer, is also used in order to update the group current
2215  *      ID.
2216  *
2217  * The command returns an array of messages that the user
2218  * successfully claimed, so that the caller is able to understand
2219  * what messages it is now in charge of. */
xclaimCommand(client * c)2220 void xclaimCommand(client *c) {
2221     streamCG *group = NULL;
2222     robj *o = lookupKeyRead(c->db,c->argv[1]);
2223     long long minidle; /* Minimum idle time argument. */
2224     long long retrycount = -1;   /* -1 means RETRYCOUNT option not given. */
2225     mstime_t deliverytime = -1;  /* -1 means IDLE/TIME options not given. */
2226     int force = 0;
2227     int justid = 0;
2228 
2229     if (o) {
2230         if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
2231         group = streamLookupCG(o->ptr,c->argv[2]->ptr);
2232     }
2233 
2234     /* No key or group? Send an error given that the group creation
2235      * is mandatory. */
2236     if (o == NULL || group == NULL) {
2237         addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
2238                               "consumer group '%s'", (char*)c->argv[1]->ptr,
2239                               (char*)c->argv[2]->ptr);
2240         return;
2241     }
2242 
2243     if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
2244         "Invalid min-idle-time argument for XCLAIM")
2245         != C_OK) return;
2246     if (minidle < 0) minidle = 0;
2247 
2248     /* Start parsing the IDs, so that we abort ASAP if there is a syntax
2249      * error: the return value of this command cannot be an error in case
2250      * the client successfully claimed some message, so it should be
2251      * executed in a "all or nothing" fashion. */
2252     int j;
2253     for (j = 5; j < c->argc; j++) {
2254         streamID id;
2255         if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
2256     }
2257     int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
2258 
2259     /* If we stopped because some IDs cannot be parsed, perhaps they
2260      * are trailing options. */
2261     mstime_t now = mstime();
2262     streamID last_id = {0,0};
2263     int propagate_last_id = 0;
2264     for (; j < c->argc; j++) {
2265         int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
2266         char *opt = c->argv[j]->ptr;
2267         if (!strcasecmp(opt,"FORCE")) {
2268             force = 1;
2269         } else if (!strcasecmp(opt,"JUSTID")) {
2270             justid = 1;
2271         } else if (!strcasecmp(opt,"IDLE") && moreargs) {
2272             j++;
2273             if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2274                 "Invalid IDLE option argument for XCLAIM")
2275                 != C_OK) return;
2276             deliverytime = now - deliverytime;
2277         } else if (!strcasecmp(opt,"TIME") && moreargs) {
2278             j++;
2279             if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2280                 "Invalid TIME option argument for XCLAIM")
2281                 != C_OK) return;
2282         } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
2283             j++;
2284             if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
2285                 "Invalid RETRYCOUNT option argument for XCLAIM")
2286                 != C_OK) return;
2287         } else if (!strcasecmp(opt,"LASTID") && moreargs) {
2288             j++;
2289             if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
2290         } else {
2291             addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
2292             return;
2293         }
2294     }
2295 
2296     if (streamCompareID(&last_id,&group->last_id) > 0) {
2297         group->last_id = last_id;
2298         propagate_last_id = 1;
2299     }
2300 
2301     if (deliverytime != -1) {
2302         /* If a delivery time was passed, either with IDLE or TIME, we
2303          * do some sanity check on it, and set the deliverytime to now
2304          * (which is a sane choice usually) if the value is bogus.
2305          * To raise an error here is not wise because clients may compute
2306          * the idle time doing some math starting from their local time,
2307          * and this is not a good excuse to fail in case, for instance,
2308          * the computer time is a bit in the future from our POV. */
2309         if (deliverytime < 0 || deliverytime > now) deliverytime = now;
2310     } else {
2311         /* If no IDLE/TIME option was passed, we want the last delivery
2312          * time to be now, so that the idle time of the message will be
2313          * zero. */
2314         deliverytime = now;
2315     }
2316 
2317     /* Do the actual claiming. */
2318     streamConsumer *consumer = NULL;
2319     void *arraylenptr = addDeferredMultiBulkLength(c);
2320     size_t arraylen = 0;
2321     for (int j = 5; j <= last_id_arg; j++) {
2322         streamID id;
2323         unsigned char buf[sizeof(streamID)];
2324         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
2325             serverPanic("StreamID invalid after check. Should not be possible.");
2326         streamEncodeID(buf,&id);
2327 
2328         /* Lookup the ID in the group PEL. */
2329         streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
2330 
2331         /* If FORCE is passed, let's check if at least the entry
2332          * exists in the Stream. In such case, we'll crate a new
2333          * entry in the PEL from scratch, so that XCLAIM can also
2334          * be used to create entries in the PEL. Useful for AOF
2335          * and replication of consumer groups. */
2336         if (force && nack == raxNotFound) {
2337             streamIterator myiterator;
2338             streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
2339             int64_t numfields;
2340             int found = 0;
2341             streamID item_id;
2342             if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
2343             streamIteratorStop(&myiterator);
2344 
2345             /* Item must exist for us to create a NACK for it. */
2346             if (!found) continue;
2347 
2348             /* Create the NACK. */
2349             nack = streamCreateNACK(NULL);
2350             raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
2351         }
2352 
2353         if (nack != raxNotFound) {
2354             /* We need to check if the minimum idle time requested
2355              * by the caller is satisfied by this entry.
2356              *
2357              * Note that the nack could be created by FORCE, in this
2358              * case there was no pre-existing entry and minidle should
2359              * be ignored, but in that case nick->consumer is NULL. */
2360             if (nack->consumer && minidle) {
2361                 mstime_t this_idle = now - nack->delivery_time;
2362                 if (this_idle < minidle) continue;
2363             }
2364             /* Remove the entry from the old consumer.
2365              * Note that nack->consumer is NULL if we created the
2366              * NACK above because of the FORCE option. */
2367             if (nack->consumer)
2368                 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
2369             /* Update the consumer and idle time. */
2370             if (consumer == NULL)
2371                 consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
2372             nack->consumer = consumer;
2373             nack->delivery_time = deliverytime;
2374             /* Set the delivery attempts counter if given, otherwise
2375              * autoincrement unless JUSTID option provided */
2376             if (retrycount >= 0) {
2377                 nack->delivery_count = retrycount;
2378             } else if (!justid) {
2379                 nack->delivery_count++;
2380             }
2381             /* Add the entry in the new consumer local PEL. */
2382             raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
2383             /* Send the reply for this entry. */
2384             if (justid) {
2385                 addReplyStreamID(c,&id);
2386             } else {
2387                 size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
2388                                     NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
2389                 if (!emitted) addReply(c,shared.nullbulk);
2390             }
2391             arraylen++;
2392 
2393             /* Propagate this change. */
2394             streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
2395             propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
2396             server.dirty++;
2397         }
2398     }
2399     if (propagate_last_id) {
2400         streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
2401         server.dirty++;
2402     }
2403     setDeferredMultiBulkLength(c,arraylenptr,arraylen);
2404     preventCommandPropagation(c);
2405 }
2406 
2407 
2408 /* XDEL <key> [<ID1> <ID2> ... <IDN>]
2409  *
2410  * Removes the specified entries from the stream. Returns the number
2411  * of items actually deleted, that may be different from the number
2412  * of IDs passed in case certain IDs do not exist. */
xdelCommand(client * c)2413 void xdelCommand(client *c) {
2414     robj *o;
2415 
2416     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
2417         || checkType(c,o,OBJ_STREAM)) return;
2418     stream *s = o->ptr;
2419 
2420     /* We need to sanity check the IDs passed to start. Even if not
2421      * a big issue, it is not great that the command is only partially
2422      * executed because at some point an invalid ID is parsed. */
2423     streamID id;
2424     for (int j = 2; j < c->argc; j++) {
2425         if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
2426     }
2427 
2428     /* Actually apply the command. */
2429     int deleted = 0;
2430     for (int j = 2; j < c->argc; j++) {
2431         streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
2432         deleted += streamDeleteItem(s,&id);
2433     }
2434 
2435     /* Propagate the write if needed. */
2436     if (deleted) {
2437         signalModifiedKey(c->db,c->argv[1]);
2438         notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
2439         server.dirty += deleted;
2440     }
2441     addReplyLongLong(c,deleted);
2442 }
2443 
2444 /* General form: XTRIM <key> [... options ...]
2445  *
2446  * List of options:
2447  *
2448  * MAXLEN [~|=] <count>     -- Trim so that the stream will be capped at
2449  *                             the specified length. Use ~ before the
2450  *                             count in order to demand approximated trimming
2451  *                             (like XADD MAXLEN option).
2452  */
2453 
2454 #define TRIM_STRATEGY_NONE 0
2455 #define TRIM_STRATEGY_MAXLEN 1
xtrimCommand(client * c)2456 void xtrimCommand(client *c) {
2457     robj *o;
2458 
2459     /* If the key does not exist, we are ok returning zero, that is, the
2460      * number of elements removed from the stream. */
2461     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
2462         || checkType(c,o,OBJ_STREAM)) return;
2463     stream *s = o->ptr;
2464 
2465     /* Argument parsing. */
2466     int trim_strategy = TRIM_STRATEGY_NONE;
2467     long long maxlen = -1;  /* If left to -1 no trimming is performed. */
2468     int approx_maxlen = 0;  /* If 1 only delete whole radix tree nodes, so
2469                                the maxium length is not applied verbatim. */
2470     int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
2471 
2472     /* Parse options. */
2473     int i = 2; /* Start of options. */
2474     for (; i < c->argc; i++) {
2475         int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
2476         char *opt = c->argv[i]->ptr;
2477         if (!strcasecmp(opt,"maxlen") && moreargs) {
2478             approx_maxlen = 0;
2479             trim_strategy = TRIM_STRATEGY_MAXLEN;
2480             char *next = c->argv[i+1]->ptr;
2481             /* Check for the form MAXLEN ~ <count>. */
2482             if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
2483                 approx_maxlen = 1;
2484                 i++;
2485             } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
2486                 i++;
2487             }
2488             if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
2489                 != C_OK) return;
2490 
2491             if (maxlen < 0) {
2492                 addReplyError(c,"The MAXLEN argument must be >= 0.");
2493                 return;
2494             }
2495             i++;
2496             maxlen_arg_idx = i;
2497         } else {
2498             addReply(c,shared.syntaxerr);
2499             return;
2500         }
2501     }
2502 
2503     /* Perform the trimming. */
2504     int64_t deleted = 0;
2505     if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
2506         deleted = streamTrimByLength(s,maxlen,approx_maxlen);
2507     } else {
2508         addReplyError(c,"XTRIM called without an option to trim the stream");
2509         return;
2510     }
2511 
2512     /* Propagate the write if needed. */
2513     if (deleted) {
2514         signalModifiedKey(c->db,c->argv[1]);
2515         notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
2516         server.dirty += deleted;
2517         if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
2518     }
2519     addReplyLongLong(c,deleted);
2520 }
2521 
2522 /* XINFO CONSUMERS <key> <group>
2523  * XINFO GROUPS <key>
2524  * XINFO STREAM <key>
2525  * XINFO HELP. */
xinfoCommand(client * c)2526 void xinfoCommand(client *c) {
2527     const char *help[] = {
2528 "CONSUMERS <key> <groupname>  -- Show consumer groups of group <groupname>.",
2529 "GROUPS <key>                 -- Show the stream consumer groups.",
2530 "STREAM <key>                 -- Show information about the stream.",
2531 "HELP                         -- Print this help.",
2532 NULL
2533     };
2534     stream *s = NULL;
2535     char *opt;
2536     robj *key;
2537 
2538     /* HELP is special. Handle it ASAP. */
2539     if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
2540         addReplyHelp(c, help);
2541         return;
2542     } else if (c->argc < 3) {
2543         addReplyError(c,"syntax error, try 'XINFO HELP'");
2544         return;
2545     }
2546 
2547     /* With the exception of HELP handled before any other sub commands, all
2548      * the ones are in the form of "<subcommand> <key>". */
2549     opt = c->argv[1]->ptr;
2550     key = c->argv[2];
2551 
2552     /* Lookup the key now, this is common for all the subcommands but HELP. */
2553     robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);
2554     if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
2555     s = o->ptr;
2556 
2557     /* Dispatch the different subcommands. */
2558     if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
2559         /* XINFO CONSUMERS <key> <group>. */
2560         streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
2561         if (cg == NULL) {
2562             addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
2563                                    "for key name '%s'",
2564                                    (char*)c->argv[3]->ptr, (char*)key->ptr);
2565             return;
2566         }
2567 
2568         addReplyMultiBulkLen(c,raxSize(cg->consumers));
2569         raxIterator ri;
2570         raxStart(&ri,cg->consumers);
2571         raxSeek(&ri,"^",NULL,0);
2572         mstime_t now = mstime();
2573         while(raxNext(&ri)) {
2574             streamConsumer *consumer = ri.data;
2575             mstime_t idle = now - consumer->seen_time;
2576             if (idle < 0) idle = 0;
2577 
2578             addReplyMultiBulkLen(c,6);
2579             addReplyBulkCString(c,"name");
2580             addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
2581             addReplyBulkCString(c,"pending");
2582             addReplyLongLong(c,raxSize(consumer->pel));
2583             addReplyBulkCString(c,"idle");
2584             addReplyLongLong(c,idle);
2585         }
2586         raxStop(&ri);
2587     } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
2588         /* XINFO GROUPS <key>. */
2589         if (s->cgroups == NULL) {
2590             addReplyMultiBulkLen(c,0);
2591             return;
2592         }
2593 
2594         addReplyMultiBulkLen(c,raxSize(s->cgroups));
2595         raxIterator ri;
2596         raxStart(&ri,s->cgroups);
2597         raxSeek(&ri,"^",NULL,0);
2598         while(raxNext(&ri)) {
2599             streamCG *cg = ri.data;
2600             addReplyMultiBulkLen(c,8);
2601             addReplyBulkCString(c,"name");
2602             addReplyBulkCBuffer(c,ri.key,ri.key_len);
2603             addReplyBulkCString(c,"consumers");
2604             addReplyLongLong(c,raxSize(cg->consumers));
2605             addReplyBulkCString(c,"pending");
2606             addReplyLongLong(c,raxSize(cg->pel));
2607             addReplyBulkCString(c,"last-delivered-id");
2608             addReplyStreamID(c,&cg->last_id);
2609         }
2610         raxStop(&ri);
2611     } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
2612         /* XINFO STREAM <key> (or the alias XINFO <key>). */
2613         addReplyMultiBulkLen(c,14);
2614         addReplyBulkCString(c,"length");
2615         addReplyLongLong(c,s->length);
2616         addReplyBulkCString(c,"radix-tree-keys");
2617         addReplyLongLong(c,raxSize(s->rax));
2618         addReplyBulkCString(c,"radix-tree-nodes");
2619         addReplyLongLong(c,s->rax->numnodes);
2620         addReplyBulkCString(c,"groups");
2621         addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
2622         addReplyBulkCString(c,"last-generated-id");
2623         addReplyStreamID(c,&s->last_id);
2624 
2625         /* To emit the first/last entry we us the streamReplyWithRange()
2626          * API. */
2627         int count;
2628         streamID start, end;
2629         start.ms = start.seq = 0;
2630         end.ms = end.seq = UINT64_MAX;
2631         addReplyBulkCString(c,"first-entry");
2632         count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
2633                                      STREAM_RWR_RAWENTRIES,NULL);
2634         if (!count) addReply(c,shared.nullbulk);
2635         addReplyBulkCString(c,"last-entry");
2636         count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
2637                                      STREAM_RWR_RAWENTRIES,NULL);
2638         if (!count) addReply(c,shared.nullbulk);
2639     } else {
2640         addReplySubcommandSyntaxError(c);
2641     }
2642 }
2643 
2644