1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "lzf.h"    /* LZF compression library */
32 #include "zipmap.h"
33 #include "endianconv.h"
34 #include "stream.h"
35 
36 #include <math.h>
37 #include <fcntl.h>
38 #include <sys/types.h>
39 #include <sys/time.h>
40 #include <sys/resource.h>
41 #include <sys/wait.h>
42 #include <arpa/inet.h>
43 #include <sys/stat.h>
44 #include <sys/param.h>
45 
/* This macro is called when the internal RDB structure is corrupt. It reports
 * with corruption_error set to 1, in which case rdbReportError() always ends
 * by terminating the process. */
#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
/* This macro is called when RDB read failed (possibly a short read). It
 * reports with corruption_error set to 0: outside rdb-check mode, when no
 * file is being loaded, rdbReportError() only logs and returns. */
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)

/* When non-NULL, rdbReportError() passes it as the file argument to
 * redis_check_rdb_main() before exiting. */
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
/* When non-zero, rdbReportError() routes errors to rdbCheckError() instead of
 * the server log (presumably set by the RDB checker tool). Defined elsewhere. */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);

/* Let GCC-compatible compilers check the printf-style 'reason' format string
 * (argument 3) against the variadic arguments (from argument 4). */
#ifdef __GNUC__
void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__ ((format (printf, 3, 4)));
#endif
rdbReportError(int corruption_error,int linenum,char * reason,...)59 void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
60     va_list ap;
61     char msg[1024];
62     int len;
63 
64     len = snprintf(msg,sizeof(msg),
65         "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
66         (unsigned long long)server.loading_loaded_bytes, linenum);
67     va_start(ap,reason);
68     vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
69     va_end(ap);
70 
71     if (!rdbCheckMode) {
72         if (rdbFileBeingLoaded || corruption_error) {
73             serverLog(LL_WARNING, "%s", msg);
74             char *argv[2] = {"",rdbFileBeingLoaded};
75             redis_check_rdb_main(2,argv,NULL);
76         } else {
77             serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
78             return;
79         }
80     } else {
81         rdbCheckError("%s",msg);
82     }
83     serverLog(LL_WARNING, "Terminating server after rdb file reading failure.");
84     exit(1);
85 }
86 
/* Write 'len' raw bytes from 'p' to 'rdb'. A NULL 'rdb' performs no I/O and
 * is treated as success. Returns 'len' on success, -1 if rioWrite() fails. */
static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb == NULL) return len;
    return rioWrite(rdb,p,len) ? (ssize_t)len : -1;
}
92 
/* Emit a single type/opcode byte. Returns 1 on success, -1 on write error. */
int rdbSaveType(rio *rdb, unsigned char type) {
    unsigned char byte = type;
    return rdbWriteRaw(rdb,&byte,sizeof(byte));
}
96 
97 /* Load a "type" in RDB format, that is a one byte unsigned integer.
98  * This function is not only used to load object types, but also special
99  * "types" like the end-of-file type, the EXPIRE type, and so forth. */
rdbLoadType(rio * rdb)100 int rdbLoadType(rio *rdb) {
101     unsigned char type;
102     if (rioRead(rdb,&type,1) == 0) return -1;
103     return type;
104 }
105 
106 /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
107  * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
108  * opcode. On error -1 is returned, however this could be a valid time, so
109  * to check for loading errors the caller should call rioGetReadError() after
110  * calling this function. */
rdbLoadTime(rio * rdb)111 time_t rdbLoadTime(rio *rdb) {
112     int32_t t32;
113     if (rioRead(rdb,&t32,4) == 0) return -1;
114     return (time_t)t32;
115 }
116 
/* Write a millisecond timestamp as a 64-bit little endian integer.
 * Returns 8 on success, -1 on write error. */
int rdbSaveMillisecondTime(rio *rdb, long long t) {
    int64_t le = (int64_t)t;
    memrev64ifbe(&le); /* Byte-swap on big endian hosts only. */
    return rdbWriteRaw(rdb,&le,sizeof(le));
}
122 
123 /* This function loads a time from the RDB file. It gets the version of the
124  * RDB because, unfortunately, before Redis 5 (RDB version 9), the function
125  * failed to convert data to/from little endian, so RDB files with keys having
126  * expires could not be shared between big endian and little endian systems
127  * (because the expire time will be totally wrong). The fix for this is just
128  * to call memrev64ifbe(), however if we fix this for all the RDB versions,
129  * this call will introduce an incompatibility for big endian systems:
130  * after upgrading to Redis version 5 they will no longer be able to load their
131  * own old RDB files. Because of that, we instead fix the function only for new
132  * RDB versions, and load older RDB versions as we used to do in the past,
133  * allowing big endian systems to load their own old RDB files.
134  *
135  * On I/O error the function returns LLONG_MAX, however if this is also a
136  * valid stored value, the caller should use rioGetReadError() to check for
137  * errors after calling this function. */
rdbLoadMillisecondTime(rio * rdb,int rdbver)138 long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
139     int64_t t64;
140     if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
141     if (rdbver >= 9) /* Check the top comment of this function. */
142         memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
143     return (long long)t64;
144 }
145 
146 /* Saves an encoded length. The first two bits in the first byte are used to
147  * hold the encoding type. See the RDB_* definitions for more information
148  * on the types of encoding. */
rdbSaveLen(rio * rdb,uint64_t len)149 int rdbSaveLen(rio *rdb, uint64_t len) {
150     unsigned char buf[2];
151     size_t nwritten;
152 
153     if (len < (1<<6)) {
154         /* Save a 6 bit len */
155         buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
156         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
157         nwritten = 1;
158     } else if (len < (1<<14)) {
159         /* Save a 14 bit len */
160         buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
161         buf[1] = len&0xFF;
162         if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
163         nwritten = 2;
164     } else if (len <= UINT32_MAX) {
165         /* Save a 32 bit len */
166         buf[0] = RDB_32BITLEN;
167         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
168         uint32_t len32 = htonl(len);
169         if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
170         nwritten = 1+4;
171     } else {
172         /* Save a 64 bit len */
173         buf[0] = RDB_64BITLEN;
174         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
175         len = htonu64(len);
176         if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
177         nwritten = 1+8;
178     }
179     return nwritten;
180 }
181 
182 
183 /* Load an encoded length. If the loaded length is a normal length as stored
184  * with rdbSaveLen(), the read length is set to '*lenptr'. If instead the
185  * loaded length describes a special encoding that follows, then '*isencoded'
186  * is set to 1 and the encoding format is stored at '*lenptr'.
187  *
188  * See the RDB_ENC_* definitions in rdb.h for more information on special
189  * encodings.
190  *
191  * The function returns -1 on error, 0 on success. */
rdbLoadLenByRef(rio * rdb,int * isencoded,uint64_t * lenptr)192 int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
193     unsigned char buf[2];
194     int type;
195 
196     if (isencoded) *isencoded = 0;
197     if (rioRead(rdb,buf,1) == 0) return -1;
198     type = (buf[0]&0xC0)>>6;
199     if (type == RDB_ENCVAL) {
200         /* Read a 6 bit encoding type. */
201         if (isencoded) *isencoded = 1;
202         *lenptr = buf[0]&0x3F;
203     } else if (type == RDB_6BITLEN) {
204         /* Read a 6 bit len. */
205         *lenptr = buf[0]&0x3F;
206     } else if (type == RDB_14BITLEN) {
207         /* Read a 14 bit len. */
208         if (rioRead(rdb,buf+1,1) == 0) return -1;
209         *lenptr = ((buf[0]&0x3F)<<8)|buf[1];
210     } else if (buf[0] == RDB_32BITLEN) {
211         /* Read a 32 bit len. */
212         uint32_t len;
213         if (rioRead(rdb,&len,4) == 0) return -1;
214         *lenptr = ntohl(len);
215     } else if (buf[0] == RDB_64BITLEN) {
216         /* Read a 64 bit len. */
217         uint64_t len;
218         if (rioRead(rdb,&len,8) == 0) return -1;
219         *lenptr = ntohu64(len);
220     } else {
221         rdbExitReportCorruptRDB(
222             "Unknown length encoding %d in rdbLoadLen()",type);
223         return -1; /* Never reached. */
224     }
225     return 0;
226 }
227 
228 /* This is like rdbLoadLenByRef() but directly returns the value read
229  * from the RDB stream, signaling an error by returning RDB_LENERR
230  * (since it is a too large count to be applicable in any Redis data
231  * structure). */
rdbLoadLen(rio * rdb,int * isencoded)232 uint64_t rdbLoadLen(rio *rdb, int *isencoded) {
233     uint64_t len;
234 
235     if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR;
236     return len;
237 }
238 
239 /* Encodes the "value" argument as integer when it fits in the supported ranges
240  * for encoded types. If the function successfully encodes the integer, the
241  * representation is stored in the buffer pointer to by "enc" and the string
242  * length is returned. Otherwise 0 is returned. */
rdbEncodeInteger(long long value,unsigned char * enc)243 int rdbEncodeInteger(long long value, unsigned char *enc) {
244     if (value >= -(1<<7) && value <= (1<<7)-1) {
245         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8;
246         enc[1] = value&0xFF;
247         return 2;
248     } else if (value >= -(1<<15) && value <= (1<<15)-1) {
249         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
250         enc[1] = value&0xFF;
251         enc[2] = (value>>8)&0xFF;
252         return 3;
253     } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
254         enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
255         enc[1] = value&0xFF;
256         enc[2] = (value>>8)&0xFF;
257         enc[3] = (value>>16)&0xFF;
258         enc[4] = (value>>24)&0xFF;
259         return 5;
260     } else {
261         return 0;
262     }
263 }
264 
265 /* Loads an integer-encoded object with the specified encoding type "enctype".
266  * The returned value changes according to the flags, see
267  * rdbGenericLoadStringObject() for more info. */
rdbLoadIntegerObject(rio * rdb,int enctype,int flags,size_t * lenptr)268 void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
269     int plain = flags & RDB_LOAD_PLAIN;
270     int sds = flags & RDB_LOAD_SDS;
271     int encode = flags & RDB_LOAD_ENC;
272     unsigned char enc[4];
273     long long val;
274 
275     if (enctype == RDB_ENC_INT8) {
276         if (rioRead(rdb,enc,1) == 0) return NULL;
277         val = (signed char)enc[0];
278     } else if (enctype == RDB_ENC_INT16) {
279         uint16_t v;
280         if (rioRead(rdb,enc,2) == 0) return NULL;
281         v = enc[0]|(enc[1]<<8);
282         val = (int16_t)v;
283     } else if (enctype == RDB_ENC_INT32) {
284         uint32_t v;
285         if (rioRead(rdb,enc,4) == 0) return NULL;
286         v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
287         val = (int32_t)v;
288     } else {
289         rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
290         return NULL; /* Never reached. */
291     }
292     if (plain || sds) {
293         char buf[LONG_STR_SIZE], *p;
294         int len = ll2string(buf,sizeof(buf),val);
295         if (lenptr) *lenptr = len;
296         p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
297         memcpy(p,buf,len);
298         return p;
299     } else if (encode) {
300         return createStringObjectFromLongLongForValue(val);
301     } else {
302         return createObject(OBJ_STRING,sdsfromlonglong(val));
303     }
304 }
305 
/* Strings of the form "2391" or "-100" (no spaces, no leading zeros or '+',
 * i.e. exactly what ll2string() would print) whose value fits in a signed
 * 8, 16 or 32 bit integer can be stored in the compact integer encoding.
 *
 * Only the first 'len' bytes of 's' are examined: 's' does not need to be
 * NUL-terminated. Some callers pass raw buffers such as rax keys, and reading
 * past 'len' would be an over-read and could change the outcome.
 *
 * On success the encoding is written to 'enc' (at least 5 bytes) and its
 * length is returned; otherwise 0 is returned. */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
    long long value;
    char *endptr, buf[32], copy[32];

    /* ll2string() never produces more than 20 characters, so anything that
     * does not fit in 'copy' cannot round-trip and is rejected up front. */
    if (len == 0 || len >= sizeof(copy)) return 0;
    memcpy(copy,s,len);
    copy[len] = '\0';

    /* Check if it's possible to encode this value as a number */
    value = strtoll(copy,&endptr,10);
    if (endptr[0] != '\0') return 0;
    ll2string(buf,sizeof(buf),value);

    /* If the number converted back into a string is not identical
     * then it's not possible to encode the string as integer */
    if (strlen(buf) != len || memcmp(buf,copy,len)) return 0;

    return rdbEncodeInteger(value,enc);
}
324 
/* Write an already LZF-compressed blob in RDB form. The layout is:
 * the RDB_ENC_LZF marker byte, the compressed length, the uncompressed
 * length, then the 'compress_len' bytes of 'data'.
 * Returns the total number of bytes written, or -1 on write error. */
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
                       size_t original_len) {
    unsigned char marker = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
    ssize_t hdr, clen_bytes, olen_bytes, body;

    if ((hdr = rdbWriteRaw(rdb,&marker,1)) == -1) return -1;
    if ((clen_bytes = rdbSaveLen(rdb,compress_len)) == -1) return -1;
    if ((olen_bytes = rdbSaveLen(rdb,original_len)) == -1) return -1;
    if ((body = rdbWriteRaw(rdb,data,compress_len)) == -1) return -1;
    return hdr + clen_bytes + olen_bytes + body;
}
349 
/* Try to store the 'len' bytes at 's' LZF-compressed. Compression must save
 * at least four bytes to be worth it: the output buffer is capped at len-4,
 * and a 0 result from lzf_compress() is treated as "not compressible".
 * Returns the bytes written, 0 if nothing was written (the caller should
 * store the string verbatim), or -1 on write error. */
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
    size_t cap, comprlen;
    ssize_t written = 0;
    void *out;

    if (len <= 4) return 0;
    cap = len-4;
    if ((out = zmalloc(cap+1)) == NULL) return 0;
    comprlen = lzf_compress(s,len,out,cap);
    if (comprlen != 0)
        written = rdbSaveLzfBlob(rdb,out,comprlen,len);
    zfree(out);
    return written;
}
367 
368 /* Load an LZF compressed string in RDB format. The returned value
369  * changes according to 'flags'. For more info check the
370  * rdbGenericLoadStringObject() function. */
rdbLoadLzfStringObject(rio * rdb,int flags,size_t * lenptr)371 void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
372     int plain = flags & RDB_LOAD_PLAIN;
373     int sds = flags & RDB_LOAD_SDS;
374     uint64_t len, clen;
375     unsigned char *c = NULL;
376     char *val = NULL;
377 
378     if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
379     if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
380     if ((c = zmalloc(clen)) == NULL) goto err;
381 
382     /* Allocate our target according to the uncompressed size. */
383     if (plain) {
384         val = zmalloc(len);
385     } else {
386         val = sdsnewlen(SDS_NOINIT,len);
387     }
388     if (lenptr) *lenptr = len;
389 
390     /* Load the compressed representation and uncompress it to target. */
391     if (rioRead(rdb,c,clen) == 0) goto err;
392     if (lzf_decompress(c,clen,val,len) == 0) {
393         rdbExitReportCorruptRDB("Invalid LZF compressed string");
394     }
395     zfree(c);
396 
397     if (plain || sds) {
398         return val;
399     } else {
400         return createObject(OBJ_STRING,val);
401     }
402 err:
403     zfree(c);
404     if (plain)
405         zfree(val);
406     else
407         sdsfree(val);
408     return NULL;
409 }
410 
411 /* Save a string object as [len][data] on disk. If the object is a string
412  * representation of an integer value we try to save it in a special form */
rdbSaveRawString(rio * rdb,unsigned char * s,size_t len)413 ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
414     int enclen;
415     ssize_t n, nwritten = 0;
416 
417     /* Try integer encoding */
418     if (len <= 11) {
419         unsigned char buf[5];
420         if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
421             if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
422             return enclen;
423         }
424     }
425 
426     /* Try LZF compression - under 20 bytes it's unable to compress even
427      * aaaaaaaaaaaaaaaaaa so skip it */
428     if (server.rdb_compression && len > 20) {
429         n = rdbSaveLzfStringObject(rdb,s,len);
430         if (n == -1) return -1;
431         if (n > 0) return n;
432         /* Return value of 0 means data can't be compressed, save the old way */
433     }
434 
435     /* Store verbatim */
436     if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
437     nwritten += n;
438     if (len > 0) {
439         if (rdbWriteRaw(rdb,s,len) == -1) return -1;
440         nwritten += len;
441     }
442     return nwritten;
443 }
444 
445 /* Save a long long value as either an encoded string or a string. */
rdbSaveLongLongAsStringObject(rio * rdb,long long value)446 ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
447     unsigned char buf[32];
448     ssize_t n, nwritten = 0;
449     int enclen = rdbEncodeInteger(value,buf);
450     if (enclen > 0) {
451         return rdbWriteRaw(rdb,buf,enclen);
452     } else {
453         /* Encode as string */
454         enclen = ll2string((char*)buf,32,value);
455         serverAssert(enclen < 32);
456         if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
457         nwritten += n;
458         if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
459         nwritten += n;
460     }
461     return nwritten;
462 }
463 
464 /* Like rdbSaveRawString() gets a Redis object instead. */
rdbSaveStringObject(rio * rdb,robj * obj)465 ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
466     /* Avoid to decode the object, then encode it again, if the
467      * object is already integer encoded. */
468     if (obj->encoding == OBJ_ENCODING_INT) {
469         return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
470     } else {
471         serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
472         return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
473     }
474 }
475 
476 /* Load a string object from an RDB file according to flags:
477  *
478  * RDB_LOAD_NONE (no flags): load an RDB object, unencoded.
479  * RDB_LOAD_ENC: If the returned type is a Redis object, try to
480  *               encode it in a special way to be more memory
481  *               efficient. When this flag is passed the function
482  *               no longer guarantees that obj->ptr is an SDS string.
483  * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()
484  *                 instead of a Redis object with an sds in it.
485  * RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
486  *
487  * On I/O error NULL is returned.
488  */
rdbGenericLoadStringObject(rio * rdb,int flags,size_t * lenptr)489 void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
490     int encode = flags & RDB_LOAD_ENC;
491     int plain = flags & RDB_LOAD_PLAIN;
492     int sds = flags & RDB_LOAD_SDS;
493     int isencoded;
494     unsigned long long len;
495 
496     len = rdbLoadLen(rdb,&isencoded);
497     if (isencoded) {
498         switch(len) {
499         case RDB_ENC_INT8:
500         case RDB_ENC_INT16:
501         case RDB_ENC_INT32:
502             return rdbLoadIntegerObject(rdb,len,flags,lenptr);
503         case RDB_ENC_LZF:
504             return rdbLoadLzfStringObject(rdb,flags,lenptr);
505         default:
506             rdbExitReportCorruptRDB("Unknown RDB string encoding type %llu",len);
507             return NULL;
508         }
509     }
510 
511     if (len == RDB_LENERR) return NULL;
512     if (plain || sds) {
513         void *buf = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
514         if (lenptr) *lenptr = len;
515         if (len && rioRead(rdb,buf,len) == 0) {
516             if (plain)
517                 zfree(buf);
518             else
519                 sdsfree(buf);
520             return NULL;
521         }
522         return buf;
523     } else {
524         robj *o = encode ? createStringObject(SDS_NOINIT,len) :
525                            createRawStringObject(SDS_NOINIT,len);
526         if (len && rioRead(rdb,o->ptr,len) == 0) {
527             decrRefCount(o);
528             return NULL;
529         }
530         return o;
531     }
532 }
533 
rdbLoadStringObject(rio * rdb)534 robj *rdbLoadStringObject(rio *rdb) {
535     return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
536 }
537 
rdbLoadEncodedStringObject(rio * rdb)538 robj *rdbLoadEncodedStringObject(rio *rdb) {
539     return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL);
540 }
541 
/* Save a double value in the textual format. Doubles are saved as strings
 * prefixed by an unsigned 8 bit integer holding the length of the text.
 * Three length values are reserved for special values, with no text
 * following:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Integral values are printed as integers; everything else is printed with
 * "%.17g". See rdbSaveBinaryDoubleValue() for the fixed-size binary form.
 * Returns the number of bytes written, or -1 on write error.
 */
int rdbSaveDoubleValue(rio *rdb, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
        /* Check if the float is in a safe range to be casted into a
         * long long. We are assuming that long long is 64 bit here.
         * Also we are assuming that there are no implementations around where
         * double has precision < 52 bit.
         *
         * Under these assumptions we test if a double is inside an interval
         * where casting to long long is safe. Then using two castings we
         * make sure the decimal part is zero. If all this is true we use
         * the integer printing function, which is much faster. */
        double min = -4503599627370495; /* -((2^52)-1) */
        double max = 4503599627370496; /* 2^52 */
        if (val > min && val < max && val == ((double)((long long)val)))
            ll2string((char*)buf+1,sizeof(buf)-1,(long long)val);
        else
#endif
            snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* The text length becomes the prefix byte. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    return rdbWriteRaw(rdb,buf,len);
}
583 
584 /* For information about double serialization check rdbSaveDoubleValue() */
rdbLoadDoubleValue(rio * rdb,double * val)585 int rdbLoadDoubleValue(rio *rdb, double *val) {
586     char buf[256];
587     unsigned char len;
588 
589     if (rioRead(rdb,&len,1) == 0) return -1;
590     switch(len) {
591     case 255: *val = R_NegInf; return 0;
592     case 254: *val = R_PosInf; return 0;
593     case 253: *val = R_Nan; return 0;
594     default:
595         if (rioRead(rdb,buf,len) == 0) return -1;
596         buf[len] = '\0';
597         sscanf(buf, "%lg", val);
598         return 0;
599     }
600 }
601 
602 /* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed.
603  * We just make sure the integer is always stored in little endian, otherwise
604  * the value is copied verbatim from memory to disk.
605  *
606  * Return -1 on error, the size of the serialized value on success. */
rdbSaveBinaryDoubleValue(rio * rdb,double val)607 int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
608     memrev64ifbe(&val);
609     return rdbWriteRaw(rdb,&val,sizeof(val));
610 }
611 
612 /* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for
613  * more info. On error -1 is returned, otherwise 0. */
rdbLoadBinaryDoubleValue(rio * rdb,double * val)614 int rdbLoadBinaryDoubleValue(rio *rdb, double *val) {
615     if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
616     memrev64ifbe(val);
617     return 0;
618 }
619 
620 /* Like rdbSaveBinaryDoubleValue() but single precision. */
rdbSaveBinaryFloatValue(rio * rdb,float val)621 int rdbSaveBinaryFloatValue(rio *rdb, float val) {
622     memrev32ifbe(&val);
623     return rdbWriteRaw(rdb,&val,sizeof(val));
624 }
625 
626 /* Like rdbLoadBinaryDoubleValue() but single precision. */
rdbLoadBinaryFloatValue(rio * rdb,float * val)627 int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
628     if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
629     memrev32ifbe(val);
630     return 0;
631 }
632 
633 /* Save the object type of object "o". */
rdbSaveObjectType(rio * rdb,robj * o)634 int rdbSaveObjectType(rio *rdb, robj *o) {
635     switch (o->type) {
636     case OBJ_STRING:
637         return rdbSaveType(rdb,RDB_TYPE_STRING);
638     case OBJ_LIST:
639         if (o->encoding == OBJ_ENCODING_QUICKLIST)
640             return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
641         else
642             serverPanic("Unknown list encoding");
643     case OBJ_SET:
644         if (o->encoding == OBJ_ENCODING_INTSET)
645             return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
646         else if (o->encoding == OBJ_ENCODING_HT)
647             return rdbSaveType(rdb,RDB_TYPE_SET);
648         else
649             serverPanic("Unknown set encoding");
650     case OBJ_ZSET:
651         if (o->encoding == OBJ_ENCODING_ZIPLIST)
652             return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
653         else if (o->encoding == OBJ_ENCODING_SKIPLIST)
654             return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
655         else
656             serverPanic("Unknown sorted set encoding");
657     case OBJ_HASH:
658         if (o->encoding == OBJ_ENCODING_ZIPLIST)
659             return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
660         else if (o->encoding == OBJ_ENCODING_HT)
661             return rdbSaveType(rdb,RDB_TYPE_HASH);
662         else
663             serverPanic("Unknown hash encoding");
664     case OBJ_STREAM:
665         return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
666     case OBJ_MODULE:
667         return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
668     default:
669         serverPanic("Unknown object type");
670     }
671     return -1; /* avoid warning */
672 }
673 
674 /* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the
675  * type is not specifically a valid Object Type. */
rdbLoadObjectType(rio * rdb)676 int rdbLoadObjectType(rio *rdb) {
677     int type;
678     if ((type = rdbLoadType(rdb)) == -1) return -1;
679     if (!rdbIsObjectType(type)) return -1;
680     return type;
681 }
682 
683 /* This helper function serializes a consumer group Pending Entries List (PEL)
684  * into the RDB file. The 'nacks' argument tells the function if also persist
685  * the informations about the not acknowledged message, or if to persist
686  * just the IDs: this is useful because for the global consumer group PEL
687  * we serialized the NACKs as well, but when serializing the local consumer
688  * PELs we just add the ID, that will be resolved inside the global PEL to
689  * put a reference to the same structure. */
rdbSaveStreamPEL(rio * rdb,rax * pel,int nacks)690 ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
691     ssize_t n, nwritten = 0;
692 
693     /* Number of entries in the PEL. */
694     if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
695     nwritten += n;
696 
697     /* Save each entry. */
698     raxIterator ri;
699     raxStart(&ri,pel);
700     raxSeek(&ri,"^",NULL,0);
701     while(raxNext(&ri)) {
702         /* We store IDs in raw form as 128 big big endian numbers, like
703          * they are inside the radix tree key. */
704         if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) {
705             raxStop(&ri);
706             return -1;
707         }
708         nwritten += n;
709 
710         if (nacks) {
711             streamNACK *nack = ri.data;
712             if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) {
713                 raxStop(&ri);
714                 return -1;
715             }
716             nwritten += n;
717             if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) {
718                 raxStop(&ri);
719                 return -1;
720             }
721             nwritten += n;
722             /* We don't save the consumer name: we'll save the pending IDs
723              * for each consumer in the consumer PEL, and resolve the consumer
724              * at loading time. */
725         }
726     }
727     raxStop(&ri);
728     return nwritten;
729 }
730 
731 /* Serialize the consumers of a stream consumer group into the RDB. Helper
732  * function for the stream data type serialization. What we do here is to
733  * persist the consumer metadata, and it's PEL, for each consumer. */
rdbSaveStreamConsumers(rio * rdb,streamCG * cg)734 size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
735     ssize_t n, nwritten = 0;
736 
737     /* Number of consumers in this consumer group. */
738     if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1;
739     nwritten += n;
740 
741     /* Save each consumer. */
742     raxIterator ri;
743     raxStart(&ri,cg->consumers);
744     raxSeek(&ri,"^",NULL,0);
745     while(raxNext(&ri)) {
746         streamConsumer *consumer = ri.data;
747 
748         /* Consumer name. */
749         if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
750             raxStop(&ri);
751             return -1;
752         }
753         nwritten += n;
754 
755         /* Last seen time. */
756         if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) {
757             raxStop(&ri);
758             return -1;
759         }
760         nwritten += n;
761 
762         /* Consumer PEL, without the ACKs (see last parameter of the function
763          * passed with value of 0), at loading time we'll lookup the ID
764          * in the consumer group global PEL and will put a reference in the
765          * consumer local PEL. */
766         if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) {
767             raxStop(&ri);
768             return -1;
769         }
770         nwritten += n;
771     }
772     raxStop(&ri);
773     return nwritten;
774 }
775 
776 /* Save a Redis object.
777  * Returns -1 on error, number of bytes written on success. */
rdbSaveObject(rio * rdb,robj * o,robj * key)778 ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
779     ssize_t n = 0, nwritten = 0;
780 
781     if (o->type == OBJ_STRING) {
782         /* Save a string value */
783         if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
784         nwritten += n;
785     } else if (o->type == OBJ_LIST) {
786         /* Save a list value */
787         if (o->encoding == OBJ_ENCODING_QUICKLIST) {
788             quicklist *ql = o->ptr;
789             quicklistNode *node = ql->head;
790 
791             if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
792             nwritten += n;
793 
794             while(node) {
795                 if (quicklistNodeIsCompressed(node)) {
796                     void *data;
797                     size_t compress_len = quicklistGetLzf(node, &data);
798                     if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
799                     nwritten += n;
800                 } else {
801                     if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
802                     nwritten += n;
803                 }
804                 node = node->next;
805             }
806         } else {
807             serverPanic("Unknown list encoding");
808         }
809     } else if (o->type == OBJ_SET) {
810         /* Save a set value */
811         if (o->encoding == OBJ_ENCODING_HT) {
812             dict *set = o->ptr;
813             dictIterator *di = dictGetIterator(set);
814             dictEntry *de;
815 
816             if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
817                 dictReleaseIterator(di);
818                 return -1;
819             }
820             nwritten += n;
821 
822             while((de = dictNext(di)) != NULL) {
823                 sds ele = dictGetKey(de);
824                 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
825                     == -1)
826                 {
827                     dictReleaseIterator(di);
828                     return -1;
829                 }
830                 nwritten += n;
831             }
832             dictReleaseIterator(di);
833         } else if (o->encoding == OBJ_ENCODING_INTSET) {
834             size_t l = intsetBlobLen((intset*)o->ptr);
835 
836             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
837             nwritten += n;
838         } else {
839             serverPanic("Unknown set encoding");
840         }
841     } else if (o->type == OBJ_ZSET) {
842         /* Save a sorted set value */
843         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
844             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
845 
846             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
847             nwritten += n;
848         } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
849             zset *zs = o->ptr;
850             zskiplist *zsl = zs->zsl;
851 
852             if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
853             nwritten += n;
854 
855             /* We save the skiplist elements from the greatest to the smallest
856              * (that's trivial since the elements are already ordered in the
857              * skiplist): this improves the load process, since the next loaded
858              * element will always be the smaller, so adding to the skiplist
859              * will always immediately stop at the head, making the insertion
860              * O(1) instead of O(log(N)). */
861             zskiplistNode *zn = zsl->tail;
862             while (zn != NULL) {
863                 if ((n = rdbSaveRawString(rdb,
864                     (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
865                 {
866                     return -1;
867                 }
868                 nwritten += n;
869                 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
870                     return -1;
871                 nwritten += n;
872                 zn = zn->backward;
873             }
874         } else {
875             serverPanic("Unknown sorted set encoding");
876         }
877     } else if (o->type == OBJ_HASH) {
878         /* Save a hash value */
879         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
880             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
881 
882             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
883             nwritten += n;
884 
885         } else if (o->encoding == OBJ_ENCODING_HT) {
886             dictIterator *di = dictGetIterator(o->ptr);
887             dictEntry *de;
888 
889             if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
890                 dictReleaseIterator(di);
891                 return -1;
892             }
893             nwritten += n;
894 
895             while((de = dictNext(di)) != NULL) {
896                 sds field = dictGetKey(de);
897                 sds value = dictGetVal(de);
898 
899                 if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
900                         sdslen(field))) == -1)
901                 {
902                     dictReleaseIterator(di);
903                     return -1;
904                 }
905                 nwritten += n;
906                 if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
907                         sdslen(value))) == -1)
908                 {
909                     dictReleaseIterator(di);
910                     return -1;
911                 }
912                 nwritten += n;
913             }
914             dictReleaseIterator(di);
915         } else {
916             serverPanic("Unknown hash encoding");
917         }
918     } else if (o->type == OBJ_STREAM) {
919         /* Store how many listpacks we have inside the radix tree. */
920         stream *s = o->ptr;
921         rax *rax = s->rax;
922         if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
923         nwritten += n;
924 
925         /* Serialize all the listpacks inside the radix tree as they are,
926          * when loading back, we'll use the first entry of each listpack
927          * to insert it back into the radix tree. */
928         raxIterator ri;
929         raxStart(&ri,rax);
930         raxSeek(&ri,"^",NULL,0);
931         while (raxNext(&ri)) {
932             unsigned char *lp = ri.data;
933             size_t lp_bytes = lpBytes(lp);
934             if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
935                 raxStop(&ri);
936                 return -1;
937             }
938             nwritten += n;
939             if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) {
940                 raxStop(&ri);
941                 return -1;
942             }
943             nwritten += n;
944         }
945         raxStop(&ri);
946 
947         /* Save the number of elements inside the stream. We cannot obtain
948          * this easily later, since our macro nodes should be checked for
949          * number of items: not a great CPU / space tradeoff. */
950         if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
951         nwritten += n;
952         /* Save the last entry ID. */
953         if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
954         nwritten += n;
955         if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
956         nwritten += n;
957 
958         /* The consumer groups and their clients are part of the stream
959          * type, so serialize every consumer group. */
960 
961         /* Save the number of groups. */
962         size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
963         if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
964         nwritten += n;
965 
966         if (num_cgroups) {
967             /* Serialize each consumer group. */
968             raxStart(&ri,s->cgroups);
969             raxSeek(&ri,"^",NULL,0);
970             while(raxNext(&ri)) {
971                 streamCG *cg = ri.data;
972 
973                 /* Save the group name. */
974                 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
975                     raxStop(&ri);
976                     return -1;
977                 }
978                 nwritten += n;
979 
980                 /* Last ID. */
981                 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) {
982                     raxStop(&ri);
983                     return -1;
984                 }
985                 nwritten += n;
986                 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) {
987                     raxStop(&ri);
988                     return -1;
989                 }
990                 nwritten += n;
991 
992                 /* Save the global PEL. */
993                 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
994                     raxStop(&ri);
995                     return -1;
996                 }
997                 nwritten += n;
998 
999                 /* Save the consumers of this group. */
1000                 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) {
1001                     raxStop(&ri);
1002                     return -1;
1003                 }
1004                 nwritten += n;
1005             }
1006             raxStop(&ri);
1007         }
1008     } else if (o->type == OBJ_MODULE) {
1009         /* Save a module-specific value. */
1010         RedisModuleIO io;
1011         moduleValue *mv = o->ptr;
1012         moduleType *mt = mv->type;
1013 
1014         /* Write the "module" identifier as prefix, so that we'll be able
1015          * to call the right module during loading. */
1016         int retval = rdbSaveLen(rdb,mt->id);
1017         if (retval == -1) return -1;
1018         io.bytes += retval;
1019 
1020         /* Then write the module-specific representation + EOF marker. */
1021         moduleInitIOContext(io,mt,rdb,key);
1022         mt->rdb_save(&io,mv->value);
1023         retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
1024         if (retval == -1)
1025             io.error = 1;
1026         else
1027             io.bytes += retval;
1028 
1029         if (io.ctx) {
1030             moduleFreeContext(io.ctx);
1031             zfree(io.ctx);
1032         }
1033         return io.error ? -1 : (ssize_t)io.bytes;
1034     } else {
1035         serverPanic("Unknown object type");
1036     }
1037     return nwritten;
1038 }
1039 
1040 /* Return the length the object will have on disk if saved with
1041  * the rdbSaveObject() function. Currently we use a trick to get
1042  * this length with very little changes to the code. In the future
1043  * we could switch to a faster solution. */
rdbSavedObjectLen(robj * o,robj * key)1044 size_t rdbSavedObjectLen(robj *o, robj *key) {
1045     ssize_t len = rdbSaveObject(NULL,o,key);
1046     serverAssertWithInfo(NULL,o,len != -1);
1047     return len;
1048 }
1049 
1050 /* Save a key-value pair, with expire time, type, key, value.
1051  * On error -1 is returned.
1052  * On success if the key was actually saved 1 is returned, otherwise 0
1053  * is returned (the key was already expired). */
rdbSaveKeyValuePair(rio * rdb,robj * key,robj * val,long long expiretime)1054 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
1055     int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
1056     int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
1057 
1058     /* Save the expire time */
1059     if (expiretime != -1) {
1060         if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
1061         if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
1062     }
1063 
1064     /* Save the LRU info. */
1065     if (savelru) {
1066         uint64_t idletime = estimateObjectIdleTime(val);
1067         idletime /= 1000; /* Using seconds is enough and requires less space.*/
1068         if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
1069         if (rdbSaveLen(rdb,idletime) == -1) return -1;
1070     }
1071 
1072     /* Save the LFU info. */
1073     if (savelfu) {
1074         uint8_t buf[1];
1075         buf[0] = LFUDecrAndReturn(val);
1076         /* We can encode this in exactly two bytes: the opcode and an 8
1077          * bit counter, since the frequency is logarithmic with a 0-255 range.
1078          * Note that we do not store the halving time because to reset it
1079          * a single time when loading does not affect the frequency much. */
1080         if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
1081         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
1082     }
1083 
1084     /* Save type, key, value */
1085     if (rdbSaveObjectType(rdb,val) == -1) return -1;
1086     if (rdbSaveStringObject(rdb,key) == -1) return -1;
1087     if (rdbSaveObject(rdb,val,key) == -1) return -1;
1088 
1089     /* Delay return if required (for testing) */
1090     if (server.rdb_key_save_delay)
1091         usleep(server.rdb_key_save_delay);
1092 
1093     return 1;
1094 }
1095 
1096 /* Save an AUX field. */
rdbSaveAuxField(rio * rdb,void * key,size_t keylen,void * val,size_t vallen)1097 ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
1098     ssize_t ret, len = 0;
1099     if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
1100     len += ret;
1101     if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
1102     len += ret;
1103     if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
1104     len += ret;
1105     return len;
1106 }
1107 
1108 /* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
1109  * with strlen(). */
rdbSaveAuxFieldStrStr(rio * rdb,char * key,char * val)1110 ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
1111     return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
1112 }
1113 
1114 /* Wrapper for strlen(key) + integer type (up to long long range). */
rdbSaveAuxFieldStrInt(rio * rdb,char * key,long long val)1115 ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
1116     char buf[LONG_STR_SIZE];
1117     int vlen = ll2string(buf,sizeof(buf),val);
1118     return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
1119 }
1120 
1121 /* Save a few default AUX fields with information about the RDB generated. */
rdbSaveInfoAuxFields(rio * rdb,int rdbflags,rdbSaveInfo * rsi)1122 int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
1123     int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
1124     int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
1125 
1126     /* Add a few fields about the state when the RDB was created. */
1127     if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
1128     if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
1129     if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
1130     if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
1131 
1132     /* Handle saving options that generate aux fields. */
1133     if (rsi) {
1134         if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
1135             == -1) return -1;
1136         if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
1137             == -1) return -1;
1138         if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
1139             == -1) return -1;
1140     }
1141     if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
1142     return 1;
1143 }
1144 
rdbSaveSingleModuleAux(rio * rdb,int when,moduleType * mt)1145 ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
1146     /* Save a module-specific aux value. */
1147     RedisModuleIO io;
1148     int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
1149     if (retval == -1) return -1;
1150     io.bytes += retval;
1151 
1152     /* Write the "module" identifier as prefix, so that we'll be able
1153      * to call the right module during loading. */
1154     retval = rdbSaveLen(rdb,mt->id);
1155     if (retval == -1) return -1;
1156     io.bytes += retval;
1157 
1158     /* write the 'when' so that we can provide it on loading. add a UINT opcode
1159      * for backwards compatibility, everything after the MT needs to be prefixed
1160      * by an opcode. */
1161     retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT);
1162     if (retval == -1) return -1;
1163     io.bytes += retval;
1164     retval = rdbSaveLen(rdb,when);
1165     if (retval == -1) return -1;
1166     io.bytes += retval;
1167 
1168     /* Then write the module-specific representation + EOF marker. */
1169     moduleInitIOContext(io,mt,rdb,NULL);
1170     mt->aux_save(&io,when);
1171     retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
1172     if (retval == -1)
1173         io.error = 1;
1174     else
1175         io.bytes += retval;
1176 
1177     if (io.ctx) {
1178         moduleFreeContext(io.ctx);
1179         zfree(io.ctx);
1180     }
1181     if (io.error)
1182         return -1;
1183     return io.bytes;
1184 }
1185 
1186 /* Produces a dump of the database in RDB format sending it to the specified
1187  * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
1188  * is returned and part of the output, or all the output, can be
1189  * missing because of I/O errors.
1190  *
1191  * When the function returns C_ERR and if 'error' is not NULL, the
1192  * integer pointed by 'error' is set to the value of errno just after the I/O
1193  * error. */
rdbSaveRio(rio * rdb,int * error,int rdbflags,rdbSaveInfo * rsi)1194 int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
1195     dictIterator *di = NULL;
1196     dictEntry *de;
1197     char magic[10];
1198     int j;
1199     uint64_t cksum;
1200     size_t processed = 0;
1201 
1202     if (server.rdb_checksum)
1203         rdb->update_cksum = rioGenericUpdateChecksum;
1204     snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
1205     if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
1206     if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
1207     if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
1208 
1209     for (j = 0; j < server.dbnum; j++) {
1210         redisDb *db = server.db+j;
1211         dict *d = db->dict;
1212         if (dictSize(d) == 0) continue;
1213         di = dictGetSafeIterator(d);
1214 
1215         /* Write the SELECT DB opcode */
1216         if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
1217         if (rdbSaveLen(rdb,j) == -1) goto werr;
1218 
1219         /* Write the RESIZE DB opcode. */
1220         uint64_t db_size, expires_size;
1221         db_size = dictSize(db->dict);
1222         expires_size = dictSize(db->expires);
1223         if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
1224         if (rdbSaveLen(rdb,db_size) == -1) goto werr;
1225         if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
1226 
1227         /* Iterate this DB writing every entry */
1228         while((de = dictNext(di)) != NULL) {
1229             sds keystr = dictGetKey(de);
1230             robj key, *o = dictGetVal(de);
1231             long long expire;
1232 
1233             initStaticStringObject(key,keystr);
1234             expire = getExpire(db,&key);
1235             if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
1236 
1237             /* When this RDB is produced as part of an AOF rewrite, move
1238              * accumulated diff from parent to child while rewriting in
1239              * order to have a smaller final write. */
1240             if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
1241                 rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
1242             {
1243                 processed = rdb->processed_bytes;
1244                 aofReadDiffFromParent();
1245             }
1246         }
1247         dictReleaseIterator(di);
1248         di = NULL; /* So that we don't release it again on error. */
1249     }
1250 
1251     /* If we are storing the replication information on disk, persist
1252      * the script cache as well: on successful PSYNC after a restart, we need
1253      * to be able to process any EVALSHA inside the replication backlog the
1254      * master will send us. */
1255     if (rsi && dictSize(server.lua_scripts)) {
1256         di = dictGetIterator(server.lua_scripts);
1257         while((de = dictNext(di)) != NULL) {
1258             robj *body = dictGetVal(de);
1259             if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
1260                 goto werr;
1261         }
1262         dictReleaseIterator(di);
1263         di = NULL; /* So that we don't release it again on error. */
1264     }
1265 
1266     if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
1267 
1268     /* EOF opcode */
1269     if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
1270 
1271     /* CRC64 checksum. It will be zero if checksum computation is disabled, the
1272      * loading code skips the check in this case. */
1273     cksum = rdb->cksum;
1274     memrev64ifbe(&cksum);
1275     if (rioWrite(rdb,&cksum,8) == 0) goto werr;
1276     return C_OK;
1277 
1278 werr:
1279     if (error) *error = errno;
1280     if (di) dictReleaseIterator(di);
1281     return C_ERR;
1282 }
1283 
1284 /* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
1285  * and a suffix to the generated RDB dump. The prefix is:
1286  *
1287  * $EOF:<40 bytes unguessable hex string>\r\n
1288  *
1289  * While the suffix is the 40 bytes hex string we announced in the prefix.
1290  * This way processes receiving the payload can understand when it ends
1291  * without doing any processing of the content. */
rdbSaveRioWithEOFMark(rio * rdb,int * error,rdbSaveInfo * rsi)1292 int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
1293     char eofmark[RDB_EOF_MARK_SIZE];
1294 
1295     startSaving(RDBFLAGS_REPLICATION);
1296     getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
1297     if (error) *error = 0;
1298     if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
1299     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1300     if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
1301     if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
1302     if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
1303     stopSaving(1);
1304     return C_OK;
1305 
1306 werr: /* Write error. */
1307     /* Set 'error' only if not already set by rdbSaveRio() call. */
1308     if (error && *error == 0) *error = errno;
1309     stopSaving(0);
1310     return C_ERR;
1311 }
1312 
1313 /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
rdbSave(char * filename,rdbSaveInfo * rsi)1314 int rdbSave(char *filename, rdbSaveInfo *rsi) {
1315     char tmpfile[256];
1316     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
1317     FILE *fp = NULL;
1318     rio rdb;
1319     int error = 0;
1320 
1321     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
1322     fp = fopen(tmpfile,"w");
1323     if (!fp) {
1324         char *cwdp = getcwd(cwd,MAXPATHLEN);
1325         serverLog(LL_WARNING,
1326             "Failed opening the RDB file %s (in server root dir %s) "
1327             "for saving: %s",
1328             filename,
1329             cwdp ? cwdp : "unknown",
1330             strerror(errno));
1331         return C_ERR;
1332     }
1333 
1334     rioInitWithFile(&rdb,fp);
1335     startSaving(RDBFLAGS_NONE);
1336 
1337     if (server.rdb_save_incremental_fsync)
1338         rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
1339 
1340     if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
1341         errno = error;
1342         goto werr;
1343     }
1344 
1345     /* Make sure data will not remain on the OS's output buffers */
1346     if (fflush(fp)) goto werr;
1347     if (fsync(fileno(fp))) goto werr;
1348     if (fclose(fp)) { fp = NULL; goto werr; }
1349     fp = NULL;
1350 
1351     /* Use RENAME to make sure the DB file is changed atomically only
1352      * if the generate DB file is ok. */
1353     if (rename(tmpfile,filename) == -1) {
1354         char *cwdp = getcwd(cwd,MAXPATHLEN);
1355         serverLog(LL_WARNING,
1356             "Error moving temp DB file %s on the final "
1357             "destination %s (in server root dir %s): %s",
1358             tmpfile,
1359             filename,
1360             cwdp ? cwdp : "unknown",
1361             strerror(errno));
1362         unlink(tmpfile);
1363         stopSaving(0);
1364         return C_ERR;
1365     }
1366 
1367     serverLog(LL_NOTICE,"DB saved on disk");
1368     server.dirty = 0;
1369     server.lastsave = time(NULL);
1370     server.lastbgsave_status = C_OK;
1371     stopSaving(1);
1372     return C_OK;
1373 
1374 werr:
1375     serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1376     if (fp) fclose(fp);
1377     unlink(tmpfile);
1378     stopSaving(0);
1379     return C_ERR;
1380 }
1381 
rdbSaveBackground(char * filename,rdbSaveInfo * rsi)1382 int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
1383     pid_t childpid;
1384 
1385     if (hasActiveChildProcess()) return C_ERR;
1386 
1387     server.dirty_before_bgsave = server.dirty;
1388     server.lastbgsave_try = time(NULL);
1389     openChildInfoPipe();
1390 
1391     if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
1392         int retval;
1393 
1394         /* Child */
1395         redisSetProcTitle("redis-rdb-bgsave");
1396         redisSetCpuAffinity(server.bgsave_cpulist);
1397         retval = rdbSave(filename,rsi);
1398         if (retval == C_OK) {
1399             sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
1400         }
1401         exitFromChild((retval == C_OK) ? 0 : 1);
1402     } else {
1403         /* Parent */
1404         if (childpid == -1) {
1405             closeChildInfoPipe();
1406             server.lastbgsave_status = C_ERR;
1407             serverLog(LL_WARNING,"Can't save in background: fork: %s",
1408                 strerror(errno));
1409             return C_ERR;
1410         }
1411         serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
1412         server.rdb_save_time_start = time(NULL);
1413         server.rdb_child_pid = childpid;
1414         server.rdb_child_type = RDB_CHILD_TYPE_DISK;
1415         updateDictResizePolicy();
1416         return C_OK;
1417     }
1418     return C_OK; /* unreached */
1419 }
1420 
/* Remove the temporary RDB file "temp-<childpid>.rdb" of the child process
 * 'childpid'.
 *
 * Note that this function may be called from the signal handler
 * 'sigShutdownHandler', so every function it calls must be
 * async-signal-safe. When 'from_signal' is set we therefore avoid
 * bg_unlink(), which is not async-signal-safe. */
void rdbRemoveTempFile(pid_t childpid, int from_signal) {
    char tmpfile[256];
    char pid[32];

    /* Generate the temp rdb file name using async-signal-safe functions
     * only (no snprintf). memcpy() is used rather than strncpy(), which
     * does not NUL-terminate: the digit count is known, and the strcpy()
     * of ".rdb" right after appends the terminator. */
    int pid_len = ll2string(pid, sizeof(pid), childpid);
    strcpy(tmpfile, "temp-");
    memcpy(tmpfile+5, pid, pid_len);
    strcpy(tmpfile+5+pid_len, ".rdb");

    if (from_signal) {
        /* bg_unlink is not async-signal-safe, so unlink directly. The file
         * is opened first and the descriptor is deliberately never closed:
         * it is released when the process exits (presumably so that freeing
         * the file's data is deferred until then instead of happening
         * inside unlink()). */
        int fd = open(tmpfile, O_RDONLY|O_NONBLOCK);
        UNUSED(fd);
        unlink(tmpfile);
    } else {
        bg_unlink(tmpfile);
    }
}
1445 
1446 /* This function is called by rdbLoadObject() when the code is in RDB-check
1447  * mode and we find a module value of type 2 that can be parsed without
1448  * the need of the actual module. The value is parsed for errors, finally
1449  * a dummy redis object is returned just to conform to the API. */
rdbLoadCheckModuleValue(rio * rdb,char * modulename)1450 robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
1451     uint64_t opcode;
1452     while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
1453         if (opcode == RDB_MODULE_OPCODE_SINT ||
1454             opcode == RDB_MODULE_OPCODE_UINT)
1455         {
1456             uint64_t len;
1457             if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
1458                 rdbExitReportCorruptRDB(
1459                     "Error reading integer from module %s value", modulename);
1460             }
1461         } else if (opcode == RDB_MODULE_OPCODE_STRING) {
1462             robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
1463             if (o == NULL) {
1464                 rdbExitReportCorruptRDB(
1465                     "Error reading string from module %s value", modulename);
1466             }
1467             decrRefCount(o);
1468         } else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
1469             float val;
1470             if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
1471                 rdbExitReportCorruptRDB(
1472                     "Error reading float from module %s value", modulename);
1473             }
1474         } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
1475             double val;
1476             if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
1477                 rdbExitReportCorruptRDB(
1478                     "Error reading double from module %s value", modulename);
1479             }
1480         }
1481     }
1482     return createStringObject("module-dummy-value",18);
1483 }
1484 
1485 /* Load a Redis object of the specified type from the specified file.
1486  * On success a newly allocated object is returned, otherwise NULL. */
rdbLoadObject(int rdbtype,rio * rdb,sds key)1487 robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
1488     robj *o = NULL, *ele, *dec;
1489     uint64_t len;
1490     unsigned int i;
1491 
1492     if (rdbtype == RDB_TYPE_STRING) {
1493         /* Read string value */
1494         if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
1495         o = tryObjectEncoding(o);
1496     } else if (rdbtype == RDB_TYPE_LIST) {
1497         /* Read list value */
1498         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1499 
1500         o = createQuicklistObject();
1501         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
1502                             server.list_compress_depth);
1503 
1504         /* Load every single element of the list */
1505         while(len--) {
1506             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {
1507                 decrRefCount(o);
1508                 return NULL;
1509             }
1510             dec = getDecodedObject(ele);
1511             size_t len = sdslen(dec->ptr);
1512             quicklistPushTail(o->ptr, dec->ptr, len);
1513             decrRefCount(dec);
1514             decrRefCount(ele);
1515         }
1516     } else if (rdbtype == RDB_TYPE_SET) {
1517         /* Read Set value */
1518         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1519 
1520         /* Use a regular set when there are too many entries. */
1521         size_t max_entries = server.set_max_intset_entries;
1522         if (max_entries >= 1<<30) max_entries = 1<<30;
1523         if (len > max_entries) {
1524             o = createSetObject();
1525             /* It's faster to expand the dict to the right size asap in order
1526              * to avoid rehashing */
1527             if (len > DICT_HT_INITIAL_SIZE)
1528                 dictExpand(o->ptr,len);
1529         } else {
1530             o = createIntsetObject();
1531         }
1532 
1533         /* Load every single element of the set */
1534         for (i = 0; i < len; i++) {
1535             long long llval;
1536             sds sdsele;
1537 
1538             if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1539                 decrRefCount(o);
1540                 return NULL;
1541             }
1542 
1543             if (o->encoding == OBJ_ENCODING_INTSET) {
1544                 /* Fetch integer value from element. */
1545                 if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
1546                     o->ptr = intsetAdd(o->ptr,llval,NULL);
1547                 } else {
1548                     setTypeConvert(o,OBJ_ENCODING_HT);
1549                     dictExpand(o->ptr,len);
1550                 }
1551             }
1552 
1553             /* This will also be called when the set was just converted
1554              * to a regular hash table encoded set. */
1555             if (o->encoding == OBJ_ENCODING_HT) {
1556                 dictAdd((dict*)o->ptr,sdsele,NULL);
1557             } else {
1558                 sdsfree(sdsele);
1559             }
1560         }
1561     } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {
1562         /* Read list/set value. */
1563         uint64_t zsetlen;
1564         size_t maxelelen = 0, totelelen = 0;
1565         zset *zs;
1566 
1567         if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1568         o = createZsetObject();
1569         zs = o->ptr;
1570 
1571         if (zsetlen > DICT_HT_INITIAL_SIZE)
1572             dictExpand(zs->dict,zsetlen);
1573 
1574         /* Load every single element of the sorted set. */
1575         while(zsetlen--) {
1576             sds sdsele;
1577             double score;
1578             zskiplistNode *znode;
1579 
1580             if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1581                 decrRefCount(o);
1582                 return NULL;
1583             }
1584 
1585             if (rdbtype == RDB_TYPE_ZSET_2) {
1586                 if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
1587                     decrRefCount(o);
1588                     sdsfree(sdsele);
1589                     return NULL;
1590                 }
1591             } else {
1592                 if (rdbLoadDoubleValue(rdb,&score) == -1) {
1593                     decrRefCount(o);
1594                     sdsfree(sdsele);
1595                     return NULL;
1596                 }
1597             }
1598 
1599             /* Don't care about integer-encoded strings. */
1600             if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
1601             totelelen += sdslen(sdsele);
1602 
1603             znode = zslInsert(zs->zsl,score,sdsele);
1604             dictAdd(zs->dict,sdsele,&znode->score);
1605         }
1606 
1607         /* Convert *after* loading, since sorted sets are not stored ordered. */
1608         if (zsetLength(o) <= server.zset_max_ziplist_entries &&
1609             maxelelen <= server.zset_max_ziplist_value &&
1610             ziplistSafeToAdd(NULL, totelelen))
1611         {
1612             zsetConvert(o,OBJ_ENCODING_ZIPLIST);
1613         }
1614     } else if (rdbtype == RDB_TYPE_HASH) {
1615         uint64_t len;
1616         int ret;
1617         sds field, value;
1618 
1619         len = rdbLoadLen(rdb, NULL);
1620         if (len == RDB_LENERR) return NULL;
1621 
1622         o = createHashObject();
1623 
1624         /* Too many entries? Use a hash table. */
1625         if (len > server.hash_max_ziplist_entries)
1626             hashTypeConvert(o, OBJ_ENCODING_HT);
1627 
1628         /* Load every field and value into the ziplist */
1629         while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
1630             len--;
1631             /* Load raw strings */
1632             if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1633                 decrRefCount(o);
1634                 return NULL;
1635             }
1636             if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1637                 sdsfree(field);
1638                 decrRefCount(o);
1639                 return NULL;
1640             }
1641 
1642             /* Convert to hash table if size threshold is exceeded */
1643             if (sdslen(field) > server.hash_max_ziplist_value ||
1644                 sdslen(value) > server.hash_max_ziplist_value ||
1645                 !ziplistSafeToAdd(o->ptr, sdslen(field)+sdslen(value)))
1646             {
1647                 hashTypeConvert(o, OBJ_ENCODING_HT);
1648                 ret = dictAdd((dict*)o->ptr, field, value);
1649                 if (ret == DICT_ERR) {
1650                     rdbExitReportCorruptRDB("Duplicate hash fields detected");
1651                 }
1652                 break;
1653             }
1654 
1655             /* Add pair to ziplist */
1656             o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
1657                     sdslen(field), ZIPLIST_TAIL);
1658             o->ptr = ziplistPush(o->ptr, (unsigned char*)value,
1659                     sdslen(value), ZIPLIST_TAIL);
1660 
1661             sdsfree(field);
1662             sdsfree(value);
1663         }
1664 
1665         if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE)
1666             dictExpand(o->ptr,len);
1667 
1668         /* Load remaining fields and values into the hash table */
1669         while (o->encoding == OBJ_ENCODING_HT && len > 0) {
1670             len--;
1671             /* Load encoded strings */
1672             if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1673                 decrRefCount(o);
1674                 return NULL;
1675             }
1676             if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
1677                 sdsfree(field);
1678                 decrRefCount(o);
1679                 return NULL;
1680             }
1681 
1682             /* Add pair to hash table */
1683             ret = dictAdd((dict*)o->ptr, field, value);
1684             if (ret == DICT_ERR) {
1685                 rdbExitReportCorruptRDB("Duplicate keys detected");
1686             }
1687         }
1688 
1689         /* All pairs should be read by now */
1690         serverAssert(len == 0);
1691     } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
1692         if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
1693         o = createQuicklistObject();
1694         quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
1695                             server.list_compress_depth);
1696 
1697         while (len--) {
1698             unsigned char *zl =
1699                 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1700             if (zl == NULL) {
1701                 decrRefCount(o);
1702                 return NULL;
1703             }
1704             quicklistAppendZiplist(o->ptr, zl);
1705         }
1706     } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP  ||
1707                rdbtype == RDB_TYPE_LIST_ZIPLIST ||
1708                rdbtype == RDB_TYPE_SET_INTSET   ||
1709                rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
1710                rdbtype == RDB_TYPE_HASH_ZIPLIST)
1711     {
1712         unsigned char *encoded =
1713             rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1714         if (encoded == NULL) return NULL;
1715         o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */
1716 
1717         /* Fix the object encoding, and make sure to convert the encoded
1718          * data type into the base type if accordingly to the current
1719          * configuration there are too many elements in the encoded data
1720          * type. Note that we only check the length and not max element
1721          * size as this is an O(N) scan. Eventually everything will get
1722          * converted. */
1723         switch(rdbtype) {
1724             case RDB_TYPE_HASH_ZIPMAP:
1725                 /* Convert to ziplist encoded hash. This must be deprecated
1726                  * when loading dumps created by Redis 2.4 gets deprecated. */
1727                 {
1728                     unsigned char *zl = ziplistNew();
1729                     unsigned char *zi = zipmapRewind(o->ptr);
1730                     unsigned char *fstr, *vstr;
1731                     unsigned int flen, vlen;
1732                     unsigned int maxlen = 0;
1733 
1734                     while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
1735                         if (flen > maxlen) maxlen = flen;
1736                         if (vlen > maxlen) maxlen = vlen;
1737                         if (!ziplistSafeToAdd(zl, (size_t)flen + vlen)) {
1738                             rdbExitReportCorruptRDB("Hash zipmap too big (%u)", flen);
1739                         }
1740 
1741                         zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
1742                         zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
1743                     }
1744 
1745                     zfree(o->ptr);
1746                     o->ptr = zl;
1747                     o->type = OBJ_HASH;
1748                     o->encoding = OBJ_ENCODING_ZIPLIST;
1749 
1750                     if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
1751                         maxlen > server.hash_max_ziplist_value)
1752                     {
1753                         hashTypeConvert(o, OBJ_ENCODING_HT);
1754                     }
1755                 }
1756                 break;
1757             case RDB_TYPE_LIST_ZIPLIST:
1758                 o->type = OBJ_LIST;
1759                 o->encoding = OBJ_ENCODING_ZIPLIST;
1760                 listTypeConvert(o,OBJ_ENCODING_QUICKLIST);
1761                 break;
1762             case RDB_TYPE_SET_INTSET:
1763                 o->type = OBJ_SET;
1764                 o->encoding = OBJ_ENCODING_INTSET;
1765                 if (intsetLen(o->ptr) > server.set_max_intset_entries)
1766                     setTypeConvert(o,OBJ_ENCODING_HT);
1767                 break;
1768             case RDB_TYPE_ZSET_ZIPLIST:
1769                 o->type = OBJ_ZSET;
1770                 o->encoding = OBJ_ENCODING_ZIPLIST;
1771                 if (zsetLength(o) > server.zset_max_ziplist_entries)
1772                     zsetConvert(o,OBJ_ENCODING_SKIPLIST);
1773                 break;
1774             case RDB_TYPE_HASH_ZIPLIST:
1775                 o->type = OBJ_HASH;
1776                 o->encoding = OBJ_ENCODING_ZIPLIST;
1777                 if (hashTypeLength(o) > server.hash_max_ziplist_entries)
1778                     hashTypeConvert(o, OBJ_ENCODING_HT);
1779                 break;
1780             default:
1781                 /* totally unreachable */
1782                 rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
1783                 break;
1784         }
1785     } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
1786         o = createStreamObject();
1787         stream *s = o->ptr;
1788         uint64_t listpacks = rdbLoadLen(rdb,NULL);
1789         if (listpacks == RDB_LENERR) {
1790             rdbReportReadError("Stream listpacks len loading failed.");
1791             decrRefCount(o);
1792             return NULL;
1793         }
1794 
1795         while(listpacks--) {
1796             /* Get the master ID, the one we'll use as key of the radix tree
1797              * node: the entries inside the listpack itself are delta-encoded
1798              * relatively to this ID. */
1799             sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1800             if (nodekey == NULL) {
1801                 rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
1802                 decrRefCount(o);
1803                 return NULL;
1804             }
1805             if (sdslen(nodekey) != sizeof(streamID)) {
1806                 rdbExitReportCorruptRDB("Stream node key entry is not the "
1807                                         "size of a stream ID");
1808             }
1809 
1810             /* Load the listpack. */
1811             unsigned char *lp =
1812                 rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
1813             if (lp == NULL) {
1814                 rdbReportReadError("Stream listpacks loading failed.");
1815                 sdsfree(nodekey);
1816                 decrRefCount(o);
1817                 return NULL;
1818             }
1819             unsigned char *first = lpFirst(lp);
1820             if (first == NULL) {
1821                 /* Serialized listpacks should never be empty, since on
1822                  * deletion we should remove the radix tree key if the
1823                  * resulting listpack is empty. */
1824                 rdbExitReportCorruptRDB("Empty listpack inside stream");
1825             }
1826 
1827             /* Insert the key in the radix tree. */
1828             int retval = raxInsert(s->rax,
1829                 (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
1830             sdsfree(nodekey);
1831             if (!retval)
1832                 rdbExitReportCorruptRDB("Listpack re-added with existing key");
1833         }
1834         /* Load total number of items inside the stream. */
1835         s->length = rdbLoadLen(rdb,NULL);
1836 
1837         /* Load the last entry ID. */
1838         s->last_id.ms = rdbLoadLen(rdb,NULL);
1839         s->last_id.seq = rdbLoadLen(rdb,NULL);
1840 
1841         if (rioGetReadError(rdb)) {
1842             rdbReportReadError("Stream object metadata loading failed.");
1843             decrRefCount(o);
1844             return NULL;
1845         }
1846 
1847         /* Consumer groups loading */
1848         uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
1849         if (cgroups_count == RDB_LENERR) {
1850             rdbReportReadError("Stream cgroup count loading failed.");
1851             decrRefCount(o);
1852             return NULL;
1853         }
1854         while(cgroups_count--) {
1855             /* Get the consumer group name and ID. We can then create the
1856              * consumer group ASAP and populate its structure as
1857              * we read more data. */
1858             streamID cg_id;
1859             sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1860             if (cgname == NULL) {
1861                 rdbReportReadError(
1862                     "Error reading the consumer group name from Stream");
1863                 decrRefCount(o);
1864                 return NULL;
1865             }
1866 
1867             cg_id.ms = rdbLoadLen(rdb,NULL);
1868             cg_id.seq = rdbLoadLen(rdb,NULL);
1869             if (rioGetReadError(rdb)) {
1870                 rdbReportReadError("Stream cgroup ID loading failed.");
1871                 sdsfree(cgname);
1872                 decrRefCount(o);
1873                 return NULL;
1874             }
1875 
1876             streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
1877             if (cgroup == NULL)
1878                 rdbExitReportCorruptRDB("Duplicated consumer group name %s",
1879                                          cgname);
1880             sdsfree(cgname);
1881 
1882             /* Load the global PEL for this consumer group, however we'll
1883              * not yet populate the NACK structures with the message
1884              * owner, since consumers for this group and their messages will
1885              * be read as a next step. So for now leave them not resolved
1886              * and later populate it. */
1887             uint64_t pel_size = rdbLoadLen(rdb,NULL);
1888             if (pel_size == RDB_LENERR) {
1889                 rdbReportReadError("Stream PEL size loading failed.");
1890                 decrRefCount(o);
1891                 return NULL;
1892             }
1893             while(pel_size--) {
1894                 unsigned char rawid[sizeof(streamID)];
1895                 if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
1896                     rdbReportReadError("Stream PEL ID loading failed.");
1897                     decrRefCount(o);
1898                     return NULL;
1899                 }
1900                 streamNACK *nack = streamCreateNACK(NULL);
1901                 nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
1902                 nack->delivery_count = rdbLoadLen(rdb,NULL);
1903                 if (rioGetReadError(rdb)) {
1904                     rdbReportReadError("Stream PEL NACK loading failed.");
1905                     decrRefCount(o);
1906                     streamFreeNACK(nack);
1907                     return NULL;
1908                 }
1909                 if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
1910                     rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
1911                                             "loading stream consumer group");
1912             }
1913 
1914             /* Now that we loaded our global PEL, we need to load the
1915              * consumers and their local PELs. */
1916             uint64_t consumers_num = rdbLoadLen(rdb,NULL);
1917             if (consumers_num == RDB_LENERR) {
1918                 rdbReportReadError("Stream consumers num loading failed.");
1919                 decrRefCount(o);
1920                 return NULL;
1921             }
1922             while(consumers_num--) {
1923                 sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
1924                 if (cname == NULL) {
1925                     rdbReportReadError(
1926                         "Error reading the consumer name from Stream group.");
1927                     decrRefCount(o);
1928                     return NULL;
1929                 }
1930                 streamConsumer *consumer =
1931                     streamLookupConsumer(cgroup,cname,SLC_NONE);
1932                 sdsfree(cname);
1933                 consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
1934                 if (rioGetReadError(rdb)) {
1935                     rdbReportReadError("Stream short read reading seen time.");
1936                     decrRefCount(o);
1937                     return NULL;
1938                 }
1939 
1940                 /* Load the PEL about entries owned by this specific
1941                  * consumer. */
1942                 pel_size = rdbLoadLen(rdb,NULL);
1943                 if (pel_size == RDB_LENERR) {
1944                     rdbReportReadError(
1945                         "Stream consumer PEL num loading failed.");
1946                     decrRefCount(o);
1947                     return NULL;
1948                 }
1949                 while(pel_size--) {
1950                     unsigned char rawid[sizeof(streamID)];
1951                     if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
1952                         rdbReportReadError(
1953                             "Stream short read reading PEL streamID.");
1954                         decrRefCount(o);
1955                         return NULL;
1956                     }
1957                     streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
1958                     if (nack == raxNotFound)
1959                         rdbExitReportCorruptRDB("Consumer entry not found in "
1960                                                 "group global PEL");
1961 
1962                     /* Set the NACK consumer, that was left to NULL when
1963                      * loading the global PEL. Then set the same shared
1964                      * NACK structure also in the consumer-specific PEL. */
1965                     nack->consumer = consumer;
1966                     if (!raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL))
1967                         rdbExitReportCorruptRDB("Duplicated consumer PEL entry "
1968                                                 " loading a stream consumer "
1969                                                 "group");
1970                 }
1971             }
1972         }
1973     } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
1974         uint64_t moduleid = rdbLoadLen(rdb,NULL);
1975         if (rioGetReadError(rdb)) {
1976             rdbReportReadError("Short read module id");
1977             return NULL;
1978         }
1979         moduleType *mt = moduleTypeLookupModuleByID(moduleid);
1980         char name[10];
1981 
1982         if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2) {
1983             moduleTypeNameByID(name,moduleid);
1984             return rdbLoadCheckModuleValue(rdb,name);
1985         }
1986 
1987         if (mt == NULL) {
1988             moduleTypeNameByID(name,moduleid);
1989             serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name);
1990             exit(1);
1991         }
1992         RedisModuleIO io;
1993         robj keyobj;
1994         initStaticStringObject(keyobj,key);
1995         moduleInitIOContext(io,mt,rdb,&keyobj);
1996         io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
1997         /* Call the rdb_load method of the module providing the 10 bit
1998          * encoding version in the lower 10 bits of the module ID. */
1999         void *ptr = mt->rdb_load(&io,moduleid&1023);
2000         if (io.ctx) {
2001             moduleFreeContext(io.ctx);
2002             zfree(io.ctx);
2003         }
2004 
2005         /* Module v2 serialization has an EOF mark at the end. */
2006         if (io.ver == 2) {
2007             uint64_t eof = rdbLoadLen(rdb,NULL);
2008             if (eof == RDB_LENERR) {
2009                 o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
2010                 decrRefCount(o);
2011                 return NULL;
2012             }
2013             if (eof != RDB_MODULE_OPCODE_EOF) {
2014                 serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
2015                 exit(1);
2016             }
2017         }
2018 
2019         if (ptr == NULL) {
2020             moduleTypeNameByID(name,moduleid);
2021             serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
2022             exit(1);
2023         }
2024         o = createModuleObject(mt,ptr);
2025     } else {
2026         rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
2027         return NULL;
2028     }
2029     return o;
2030 }
2031 
2032 /* Mark that we are loading in the global state and setup the fields
2033  * needed to provide loading stats. */
startLoading(size_t size,int rdbflags)2034 void startLoading(size_t size, int rdbflags) {
2035     /* Load the DB */
2036     server.loading = 1;
2037     server.loading_start_time = time(NULL);
2038     server.loading_loaded_bytes = 0;
2039     server.loading_total_bytes = size;
2040 
2041     /* Fire the loading modules start event. */
2042     int subevent;
2043     if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
2044         subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
2045     else if(rdbflags & RDBFLAGS_REPLICATION)
2046         subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
2047     else
2048         subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
2049     moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
2050 }
2051 
2052 /* Mark that we are loading in the global state and setup the fields
2053  * needed to provide loading stats.
2054  * 'filename' is optional and used for rdb-check on error */
startLoadingFile(FILE * fp,char * filename,int rdbflags)2055 void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
2056     struct stat sb;
2057     if (fstat(fileno(fp), &sb) == -1)
2058         sb.st_size = 0;
2059     rdbFileBeingLoaded = filename;
2060     startLoading(sb.st_size, rdbflags);
2061 }
2062 
2063 /* Refresh the loading progress info */
loadingProgress(off_t pos)2064 void loadingProgress(off_t pos) {
2065     server.loading_loaded_bytes = pos;
2066     if (server.stat_peak_memory < zmalloc_used_memory())
2067         server.stat_peak_memory = zmalloc_used_memory();
2068 }
2069 
2070 /* Loading finished */
stopLoading(int success)2071 void stopLoading(int success) {
2072     server.loading = 0;
2073     rdbFileBeingLoaded = NULL;
2074 
2075     /* Fire the loading modules end event. */
2076     moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
2077                           success?
2078                             REDISMODULE_SUBEVENT_LOADING_ENDED:
2079                             REDISMODULE_SUBEVENT_LOADING_FAILED,
2080                           NULL);
2081 }
2082 
startSaving(int rdbflags)2083 void startSaving(int rdbflags) {
2084     /* Fire the persistence modules end event. */
2085     int subevent;
2086     if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
2087         subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
2088     else if (getpid()!=server.pid)
2089         subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
2090     else
2091         subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
2092     moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
2093 }
2094 
stopSaving(int success)2095 void stopSaving(int success) {
2096     /* Fire the persistence modules end event. */
2097     moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
2098                           success?
2099                             REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
2100                             REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
2101                           NULL);
2102 }
2103 
2104 /* Track loading progress in order to serve client's from time to time
2105    and if needed calculate rdb checksum  */
rdbLoadProgressCallback(rio * r,const void * buf,size_t len)2106 void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
2107     if (server.rdb_checksum)
2108         rioGenericUpdateChecksum(r, buf, len);
2109     if (server.loading_process_events_interval_bytes &&
2110         (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
2111     {
2112         /* The DB can take some non trivial amount of time to load. Update
2113          * our cached time since it is used to create and update the last
2114          * interaction time with clients and for other important things. */
2115         updateCachedTime(0);
2116         if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
2117             replicationSendNewlineToMaster();
2118         loadingProgress(r->processed_bytes);
2119         processEventsWhileBlocked();
2120         processModuleLoadingProgressEvent(0);
2121     }
2122 }
2123 
2124 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
2125  * otherwise C_ERR is returned and 'errno' is set accordingly. */
rdbLoadRio(rio * rdb,int rdbflags,rdbSaveInfo * rsi)2126 int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
2127     uint64_t dbid;
2128     int type, rdbver;
2129     redisDb *db = server.db+0;
2130     char buf[1024];
2131 
2132     rdb->update_cksum = rdbLoadProgressCallback;
2133     rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
2134     if (rioRead(rdb,buf,9) == 0) goto eoferr;
2135     buf[9] = '\0';
2136     if (memcmp(buf,"REDIS",5) != 0) {
2137         serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
2138         errno = EINVAL;
2139         return C_ERR;
2140     }
2141     rdbver = atoi(buf+5);
2142     if (rdbver < 1 || rdbver > RDB_VERSION) {
2143         serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
2144         errno = EINVAL;
2145         return C_ERR;
2146     }
2147 
2148     /* Key-specific attributes, set by opcodes before the key type. */
2149     long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
2150     long long lru_clock = LRU_CLOCK();
2151 
2152     while(1) {
2153         sds key;
2154         robj *val;
2155 
2156         /* Read type. */
2157         if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
2158 
2159         /* Handle special types. */
2160         if (type == RDB_OPCODE_EXPIRETIME) {
2161             /* EXPIRETIME: load an expire associated with the next key
2162              * to load. Note that after loading an expire we need to
2163              * load the actual type, and continue. */
2164             expiretime = rdbLoadTime(rdb);
2165             expiretime *= 1000;
2166             if (rioGetReadError(rdb)) goto eoferr;
2167             continue; /* Read next opcode. */
2168         } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
2169             /* EXPIRETIME_MS: milliseconds precision expire times introduced
2170              * with RDB v3. Like EXPIRETIME but no with more precision. */
2171             expiretime = rdbLoadMillisecondTime(rdb,rdbver);
2172             if (rioGetReadError(rdb)) goto eoferr;
2173             continue; /* Read next opcode. */
2174         } else if (type == RDB_OPCODE_FREQ) {
2175             /* FREQ: LFU frequency. */
2176             uint8_t byte;
2177             if (rioRead(rdb,&byte,1) == 0) goto eoferr;
2178             lfu_freq = byte;
2179             continue; /* Read next opcode. */
2180         } else if (type == RDB_OPCODE_IDLE) {
2181             /* IDLE: LRU idle time. */
2182             uint64_t qword;
2183             if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
2184             lru_idle = qword;
2185             continue; /* Read next opcode. */
2186         } else if (type == RDB_OPCODE_EOF) {
2187             /* EOF: End of file, exit the main loop. */
2188             break;
2189         } else if (type == RDB_OPCODE_SELECTDB) {
2190             /* SELECTDB: Select the specified database. */
2191             if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
2192             if (dbid >= (unsigned)server.dbnum) {
2193                 serverLog(LL_WARNING,
2194                     "FATAL: Data file was created with a Redis "
2195                     "server configured to handle more than %d "
2196                     "databases. Exiting\n", server.dbnum);
2197                 exit(1);
2198             }
2199             db = server.db+dbid;
2200             continue; /* Read next opcode. */
2201         } else if (type == RDB_OPCODE_RESIZEDB) {
2202             /* RESIZEDB: Hint about the size of the keys in the currently
2203              * selected data base, in order to avoid useless rehashing. */
2204             uint64_t db_size, expires_size;
2205             if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
2206                 goto eoferr;
2207             if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
2208                 goto eoferr;
2209             dictExpand(db->dict,db_size);
2210             dictExpand(db->expires,expires_size);
2211             continue; /* Read next opcode. */
2212         } else if (type == RDB_OPCODE_AUX) {
2213             /* AUX: generic string-string fields. Use to add state to RDB
2214              * which is backward compatible. Implementations of RDB loading
2215              * are required to skip AUX fields they don't understand.
2216              *
2217              * An AUX field is composed of two strings: key and value. */
2218             robj *auxkey, *auxval;
2219             if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
2220             if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
2221 
2222             if (((char*)auxkey->ptr)[0] == '%') {
2223                 /* All the fields with a name staring with '%' are considered
2224                  * information fields and are logged at startup with a log
2225                  * level of NOTICE. */
2226                 serverLog(LL_NOTICE,"RDB '%s': %s",
2227                     (char*)auxkey->ptr,
2228                     (char*)auxval->ptr);
2229             } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
2230                 if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
2231             } else if (!strcasecmp(auxkey->ptr,"repl-id")) {
2232                 if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
2233                     memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
2234                     rsi->repl_id_is_set = 1;
2235                 }
2236             } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
2237                 if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
2238             } else if (!strcasecmp(auxkey->ptr,"lua")) {
2239                 /* Load the script back in memory. */
2240                 if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
2241                     rdbExitReportCorruptRDB(
2242                         "Can't load Lua script from RDB file! "
2243                         "BODY: %s", (char*)auxval->ptr);
2244                 }
2245             } else if (!strcasecmp(auxkey->ptr,"redis-ver")) {
2246                 serverLog(LL_NOTICE,"Loading RDB produced by version %s",
2247                     (char*)auxval->ptr);
2248             } else if (!strcasecmp(auxkey->ptr,"ctime")) {
2249                 time_t age = time(NULL)-strtol(auxval->ptr,NULL,10);
2250                 if (age < 0) age = 0;
2251                 serverLog(LL_NOTICE,"RDB age %ld seconds",
2252                     (unsigned long) age);
2253             } else if (!strcasecmp(auxkey->ptr,"used-mem")) {
2254                 long long usedmem = strtoll(auxval->ptr,NULL,10);
2255                 serverLog(LL_NOTICE,"RDB memory usage when created %.2f Mb",
2256                     (double) usedmem / (1024*1024));
2257             } else if (!strcasecmp(auxkey->ptr,"aof-preamble")) {
2258                 long long haspreamble = strtoll(auxval->ptr,NULL,10);
2259                 if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail");
2260             } else if (!strcasecmp(auxkey->ptr,"redis-bits")) {
2261                 /* Just ignored. */
2262             } else {
2263                 /* We ignore fields we don't understand, as by AUX field
2264                  * contract. */
2265                 serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
2266                     (char*)auxkey->ptr);
2267             }
2268 
2269             decrRefCount(auxkey);
2270             decrRefCount(auxval);
2271             continue; /* Read type again. */
2272         } else if (type == RDB_OPCODE_MODULE_AUX) {
2273             /* Load module data that is not related to the Redis key space.
2274              * Such data can be potentially be stored both before and after the
2275              * RDB keys-values section. */
2276             uint64_t moduleid = rdbLoadLen(rdb,NULL);
2277             int when_opcode = rdbLoadLen(rdb,NULL);
2278             int when = rdbLoadLen(rdb,NULL);
2279             if (rioGetReadError(rdb)) goto eoferr;
2280             if (when_opcode != RDB_MODULE_OPCODE_UINT) {
2281                 rdbReportReadError("bad when_opcode");
2282                 goto eoferr;
2283             }
2284             moduleType *mt = moduleTypeLookupModuleByID(moduleid);
2285             char name[10];
2286             moduleTypeNameByID(name,moduleid);
2287 
2288             if (!rdbCheckMode && mt == NULL) {
2289                 /* Unknown module. */
2290                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
2291                 exit(1);
2292             } else if (!rdbCheckMode && mt != NULL) {
2293                 if (!mt->aux_load) {
2294                     /* Module doesn't support AUX. */
2295                     serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
2296                     exit(1);
2297                 }
2298 
2299                 RedisModuleIO io;
2300                 moduleInitIOContext(io,mt,rdb,NULL);
2301                 io.ver = 2;
2302                 /* Call the rdb_load method of the module providing the 10 bit
2303                  * encoding version in the lower 10 bits of the module ID. */
2304                 if (mt->aux_load(&io,moduleid&1023, when) != REDISMODULE_OK || io.error) {
2305                     moduleTypeNameByID(name,moduleid);
2306                     serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
2307                     goto eoferr;
2308                 }
2309                 if (io.ctx) {
2310                     moduleFreeContext(io.ctx);
2311                     zfree(io.ctx);
2312                 }
2313                 uint64_t eof = rdbLoadLen(rdb,NULL);
2314                 if (eof != RDB_MODULE_OPCODE_EOF) {
2315                     serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
2316                     goto eoferr;
2317                 }
2318                 continue;
2319             } else {
2320                 /* RDB check mode. */
2321                 robj *aux = rdbLoadCheckModuleValue(rdb,name);
2322                 decrRefCount(aux);
2323                 continue; /* Read next opcode. */
2324             }
2325         }
2326 
2327         /* Read key */
2328         if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
2329             goto eoferr;
2330         /* Read value */
2331         if ((val = rdbLoadObject(type,rdb,key)) == NULL) {
2332             sdsfree(key);
2333             goto eoferr;
2334         }
2335 
2336         /* Check if the key already expired. This function is used when loading
2337          * an RDB file from disk, either at startup, or when an RDB was
2338          * received from the master. In the latter case, the master is
2339          * responsible for key expiry. If we would expire keys here, the
2340          * snapshot taken by the master may not be reflected on the slave.
2341          * Similarly if the RDB is the preamble of an AOF file, we want to
2342          * load all the keys as they are, since the log of operations later
2343          * assume to work in an exact keyspace state. */
2344         if (iAmMaster() &&
2345             !(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
2346             expiretime != -1 && expiretime < now)
2347         {
2348             sdsfree(key);
2349             decrRefCount(val);
2350         } else {
2351             robj keyobj;
2352             initStaticStringObject(keyobj,key);
2353 
2354             /* Add the new object in the hash table */
2355             int added = dbAddRDBLoad(db,key,val);
2356             if (!added) {
2357                 if (rdbflags & RDBFLAGS_ALLOW_DUP) {
2358                     /* This flag is useful for DEBUG RELOAD special modes.
2359                      * When it's set we allow new keys to replace the current
2360                      * keys with the same name. */
2361                     dbSyncDelete(db,&keyobj);
2362                     dbAddRDBLoad(db,key,val);
2363                 } else {
2364                     serverLog(LL_WARNING,
2365                         "RDB has duplicated key '%s' in DB %d",key,db->id);
2366                     serverPanic("Duplicated key found in RDB file");
2367                 }
2368             }
2369 
2370             /* Set the expire time if needed */
2371             if (expiretime != -1) {
2372                 setExpire(NULL,db,&keyobj,expiretime);
2373             }
2374 
2375             /* Set usage information (for eviction). */
2376             objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
2377 
2378             /* call key space notification on key loaded for modules only */
2379             moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
2380         }
2381 
2382         /* Loading the database more slowly is useful in order to test
2383          * certain edge cases. */
2384         if (server.key_load_delay) usleep(server.key_load_delay);
2385 
2386         /* Reset the state that is key-specified and is populated by
2387          * opcodes before the key, so that we start from scratch again. */
2388         expiretime = -1;
2389         lfu_freq = -1;
2390         lru_idle = -1;
2391     }
2392     /* Verify the checksum if RDB version is >= 5 */
2393     if (rdbver >= 5) {
2394         uint64_t cksum, expected = rdb->cksum;
2395 
2396         if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
2397         if (server.rdb_checksum) {
2398             memrev64ifbe(&cksum);
2399             if (cksum == 0) {
2400                 serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
2401             } else if (cksum != expected) {
2402                 serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but "
2403                     "got (%llx). Aborting now.",
2404                         (unsigned long long)expected,
2405                         (unsigned long long)cksum);
2406                 rdbExitReportCorruptRDB("RDB CRC error");
2407             }
2408         }
2409     }
2410     return C_OK;
2411 
2412     /* Unexpected end of file is handled here calling rdbReportReadError():
2413      * this will in turn either abort Redis in most cases, or if we are loading
2414      * the RDB file from a socket during initial SYNC (diskless replica mode),
2415      * we'll report the error to the caller, so that we can retry. */
2416 eoferr:
2417     serverLog(LL_WARNING,
2418         "Short read or OOM loading DB. Unrecoverable error, aborting now.");
2419     rdbReportReadError("Unexpected EOF reading RDB file");
2420     return C_ERR;
2421 }
2422 
2423 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
2424  * filename is open for reading and a rio stream object created in order
2425  * to do the actual loading. Moreover the ETA displayed in the INFO
2426  * output is initialized and finalized.
2427  *
2428  * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
2429  * loading code will fiil the information fields in the structure. */
rdbLoad(char * filename,rdbSaveInfo * rsi,int rdbflags)2430 int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
2431     FILE *fp;
2432     rio rdb;
2433     int retval;
2434 
2435     if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
2436     startLoadingFile(fp, filename,rdbflags);
2437     rioInitWithFile(&rdb,fp);
2438     retval = rdbLoadRio(&rdb,rdbflags,rsi);
2439     fclose(fp);
2440     stopLoading(retval==C_OK);
2441     return retval;
2442 }
2443 
2444 /* A background saving child (BGSAVE) terminated its work. Handle this.
2445  * This function covers the case of actual BGSAVEs. */
backgroundSaveDoneHandlerDisk(int exitcode,int bysignal)2446 static void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
2447     if (!bysignal && exitcode == 0) {
2448         serverLog(LL_NOTICE,
2449             "Background saving terminated with success");
2450         server.dirty = server.dirty - server.dirty_before_bgsave;
2451         server.lastsave = time(NULL);
2452         server.lastbgsave_status = C_OK;
2453     } else if (!bysignal && exitcode != 0) {
2454         serverLog(LL_WARNING, "Background saving error");
2455         server.lastbgsave_status = C_ERR;
2456     } else {
2457         mstime_t latency;
2458 
2459         serverLog(LL_WARNING,
2460             "Background saving terminated by signal %d", bysignal);
2461         latencyStartMonitor(latency);
2462         rdbRemoveTempFile(server.rdb_child_pid, 0);
2463         latencyEndMonitor(latency);
2464         latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
2465         /* SIGUSR1 is whitelisted, so we have a way to kill a child without
2466          * triggering an error condition. */
2467         if (bysignal != SIGUSR1)
2468             server.lastbgsave_status = C_ERR;
2469     }
2470 }
2471 
2472 /* A background saving child (BGSAVE) terminated its work. Handle this.
2473  * This function covers the case of RDB -> Slaves socket transfers for
2474  * diskless replication. */
backgroundSaveDoneHandlerSocket(int exitcode,int bysignal)2475 static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
2476     if (!bysignal && exitcode == 0) {
2477         serverLog(LL_NOTICE,
2478             "Background RDB transfer terminated with success");
2479     } else if (!bysignal && exitcode != 0) {
2480         serverLog(LL_WARNING, "Background transfer error");
2481     } else {
2482         serverLog(LL_WARNING,
2483             "Background transfer terminated by signal %d", bysignal);
2484     }
2485     if (server.rdb_child_exit_pipe!=-1)
2486         close(server.rdb_child_exit_pipe);
2487     aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
2488     close(server.rdb_pipe_read);
2489     server.rdb_child_exit_pipe = -1;
2490     server.rdb_pipe_read = -1;
2491     zfree(server.rdb_pipe_conns);
2492     server.rdb_pipe_conns = NULL;
2493     server.rdb_pipe_numconns = 0;
2494     server.rdb_pipe_numconns_writing = 0;
2495     zfree(server.rdb_pipe_buff);
2496     server.rdb_pipe_buff = NULL;
2497     server.rdb_pipe_bufflen = 0;
2498 }
2499 
2500 /* When a background RDB saving/transfer terminates, call the right handler. */
backgroundSaveDoneHandler(int exitcode,int bysignal)2501 void backgroundSaveDoneHandler(int exitcode, int bysignal) {
2502     int type = server.rdb_child_type;
2503     switch(server.rdb_child_type) {
2504     case RDB_CHILD_TYPE_DISK:
2505         backgroundSaveDoneHandlerDisk(exitcode,bysignal);
2506         break;
2507     case RDB_CHILD_TYPE_SOCKET:
2508         backgroundSaveDoneHandlerSocket(exitcode,bysignal);
2509         break;
2510     default:
2511         serverPanic("Unknown RDB child type.");
2512         break;
2513     }
2514 
2515     server.rdb_child_pid = -1;
2516     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
2517     server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
2518     server.rdb_save_time_start = -1;
2519     /* Possibly there are slaves waiting for a BGSAVE in order to be served
2520      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
2521     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, type);
2522 }
2523 
2524 /* Kill the RDB saving child using SIGUSR1 (so that the parent will know
2525  * the child did not exit for an error, but because we wanted), and performs
2526  * the cleanup needed. */
killRDBChild(void)2527 void killRDBChild(void) {
2528     kill(server.rdb_child_pid,SIGUSR1);
2529     rdbRemoveTempFile(server.rdb_child_pid, 0);
2530     closeChildInfoPipe();
2531     updateDictResizePolicy();
2532 }
2533 
2534 /* Spawn an RDB child that writes the RDB to the sockets of the slaves
2535  * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
rdbSaveToSlavesSockets(rdbSaveInfo * rsi)2536 int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
2537     listNode *ln;
2538     listIter li;
2539     pid_t childpid;
2540     int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
2541 
2542     if (hasActiveChildProcess()) return C_ERR;
2543 
2544     /* Even if the previous fork child exited, don't start a new one until we
2545      * drained the pipe. */
2546     if (server.rdb_pipe_conns) return C_ERR;
2547 
2548     /* Before to fork, create a pipe that is used to transfer the rdb bytes to
2549      * the parent, we can't let it write directly to the sockets, since in case
2550      * of TLS we must let the parent handle a continuous TLS state when the
2551      * child terminates and parent takes over. */
2552     if (pipe(pipefds) == -1) return C_ERR;
2553     server.rdb_pipe_read = pipefds[0]; /* read end */
2554     rdb_pipe_write = pipefds[1]; /* write end */
2555     anetNonBlock(NULL, server.rdb_pipe_read);
2556 
2557     /* create another pipe that is used by the parent to signal to the child
2558      * that it can exit. */
2559     if (pipe(pipefds) == -1) {
2560         close(rdb_pipe_write);
2561         close(server.rdb_pipe_read);
2562         return C_ERR;
2563     }
2564     safe_to_exit_pipe = pipefds[0]; /* read end */
2565     server.rdb_child_exit_pipe = pipefds[1]; /* write end */
2566 
2567     /* Collect the connections of the replicas we want to transfer
2568      * the RDB to, which are i WAIT_BGSAVE_START state. */
2569     server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
2570     server.rdb_pipe_numconns = 0;
2571     server.rdb_pipe_numconns_writing = 0;
2572     listRewind(server.slaves,&li);
2573     while((ln = listNext(&li))) {
2574         client *slave = ln->value;
2575         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2576             server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
2577             replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
2578         }
2579     }
2580 
2581     /* Create the child process. */
2582     openChildInfoPipe();
2583     if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
2584         /* Child */
2585         int retval, dummy;
2586         rio rdb;
2587 
2588         rioInitWithFd(&rdb,rdb_pipe_write);
2589 
2590         redisSetProcTitle("redis-rdb-to-slaves");
2591         redisSetCpuAffinity(server.bgsave_cpulist);
2592 
2593         retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
2594         if (retval == C_OK && rioFlush(&rdb) == 0)
2595             retval = C_ERR;
2596 
2597         if (retval == C_OK) {
2598             sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
2599         }
2600 
2601         rioFreeFd(&rdb);
2602         /* wake up the reader, tell it we're done. */
2603         close(rdb_pipe_write);
2604         close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
2605         /* hold exit until the parent tells us it's safe. we're not expecting
2606          * to read anything, just get the error when the pipe is closed. */
2607         dummy = read(safe_to_exit_pipe, pipefds, 1);
2608         UNUSED(dummy);
2609         exitFromChild((retval == C_OK) ? 0 : 1);
2610     } else {
2611         /* Parent */
2612         close(safe_to_exit_pipe);
2613         if (childpid == -1) {
2614             serverLog(LL_WARNING,"Can't save in background: fork: %s",
2615                 strerror(errno));
2616 
2617             /* Undo the state change. The caller will perform cleanup on
2618              * all the slaves in BGSAVE_START state, but an early call to
2619              * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
2620             listRewind(server.slaves,&li);
2621             while((ln = listNext(&li))) {
2622                 client *slave = ln->value;
2623                 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
2624                     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
2625                 }
2626             }
2627             close(rdb_pipe_write);
2628             close(server.rdb_pipe_read);
2629             zfree(server.rdb_pipe_conns);
2630             server.rdb_pipe_conns = NULL;
2631             server.rdb_pipe_numconns = 0;
2632             server.rdb_pipe_numconns_writing = 0;
2633             closeChildInfoPipe();
2634         } else {
2635             serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
2636                 childpid);
2637             server.rdb_save_time_start = time(NULL);
2638             server.rdb_child_pid = childpid;
2639             server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
2640             updateDictResizePolicy();
2641             close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
2642             if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
2643                 serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
2644             }
2645         }
2646         return (childpid == -1) ? C_ERR : C_OK;
2647     }
2648     return C_OK; /* Unreached. */
2649 }
2650 
saveCommand(client * c)2651 void saveCommand(client *c) {
2652     if (server.rdb_child_pid != -1) {
2653         addReplyError(c,"Background save already in progress");
2654         return;
2655     }
2656     rdbSaveInfo rsi, *rsiptr;
2657     rsiptr = rdbPopulateSaveInfo(&rsi);
2658     if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
2659         addReply(c,shared.ok);
2660     } else {
2661         addReply(c,shared.err);
2662     }
2663 }
2664 
2665 /* BGSAVE [SCHEDULE] */
bgsaveCommand(client * c)2666 void bgsaveCommand(client *c) {
2667     int schedule = 0;
2668 
2669     /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
2670      * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
2671     if (c->argc > 1) {
2672         if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
2673             schedule = 1;
2674         } else {
2675             addReply(c,shared.syntaxerr);
2676             return;
2677         }
2678     }
2679 
2680     rdbSaveInfo rsi, *rsiptr;
2681     rsiptr = rdbPopulateSaveInfo(&rsi);
2682 
2683     if (server.rdb_child_pid != -1) {
2684         addReplyError(c,"Background save already in progress");
2685     } else if (hasActiveChildProcess()) {
2686         if (schedule) {
2687             server.rdb_bgsave_scheduled = 1;
2688             addReplyStatus(c,"Background saving scheduled");
2689         } else {
2690             addReplyError(c,
2691             "Another child process is active (AOF?): can't BGSAVE right now. "
2692             "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
2693             "possible.");
2694         }
2695     } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
2696         addReplyStatus(c,"Background saving started");
2697     } else {
2698         addReply(c,shared.err);
2699     }
2700 }
2701 
2702 /* Populate the rdbSaveInfo structure used to persist the replication
2703  * information inside the RDB file. Currently the structure explicitly
2704  * contains just the currently selected DB from the master stream, however
2705  * if the rdbSave*() family functions receive a NULL rsi structure also
2706  * the Replication ID/offset is not saved. The function popultes 'rsi'
2707  * that is normally stack-allocated in the caller, returns the populated
2708  * pointer if the instance has a valid master client, otherwise NULL
2709  * is returned, and the RDB saving will not persist any replication related
2710  * information. */
rdbPopulateSaveInfo(rdbSaveInfo * rsi)2711 rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
2712     rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
2713     *rsi = rsi_init;
2714 
2715     /* If the instance is a master, we can populate the replication info
2716      * only when repl_backlog is not NULL. If the repl_backlog is NULL,
2717      * it means that the instance isn't in any replication chains. In this
2718      * scenario the replication info is useless, because when a slave
2719      * connects to us, the NULL repl_backlog will trigger a full
2720      * synchronization, at the same time we will use a new replid and clear
2721      * replid2. */
2722     if (!server.masterhost && server.repl_backlog) {
2723         /* Note that when server.slaveseldb is -1, it means that this master
2724          * didn't apply any write commands after a full synchronization.
2725          * So we can let repl_stream_db be 0, this allows a restarted slave
2726          * to reload replication ID/offset, it's safe because the next write
2727          * command must generate a SELECT statement. */
2728         rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
2729         return rsi;
2730     }
2731 
2732     /* If the instance is a slave we need a connected master
2733      * in order to fetch the currently selected DB. */
2734     if (server.master) {
2735         rsi->repl_stream_db = server.master->db->id;
2736         return rsi;
2737     }
2738 
2739     /* If we have a cached master we can use it in order to populate the
2740      * replication selected DB info inside the RDB file: the slave can
2741      * increment the master_repl_offset only from data arriving from the
2742      * master, so if we are disconnected the offset in the cached master
2743      * is valid. */
2744     if (server.cached_master) {
2745         rsi->repl_stream_db = server.cached_master->db->id;
2746         return rsi;
2747     }
2748     return NULL;
2749 }
2750