1 /* rio.c is a simple stream-oriented I/O abstraction that provides an interface
2  * to write code that can consume/produce data using different concrete input
3  * and output devices. For instance the same rdb.c code using the rio
4  * abstraction can be used to read and write the RDB format using in-memory
5  * buffers or files.
6  *
7  * A rio object provides the following methods:
8  *  read: read from stream.
9  *  write: write to stream.
10  *  tell: get the current offset.
11  *
12  * It is also possible to set a 'checksum' method that is used by rio.c in order
13  * to compute a checksum of the data written or read, or to query the rio object
14  * for the current checksum.
15  *
16  * ----------------------------------------------------------------------------
17  *
18  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
19  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
20  * All rights reserved.
21  *
22  * Redistribution and use in source and binary forms, with or without
23  * modification, are permitted provided that the following conditions are met:
24  *
25  *   * Redistributions of source code must retain the above copyright notice,
26  *     this list of conditions and the following disclaimer.
27  *   * Redistributions in binary form must reproduce the above copyright
28  *     notice, this list of conditions and the following disclaimer in the
29  *     documentation and/or other materials provided with the distribution.
30  *   * Neither the name of Redis nor the names of its contributors may be used
31  *     to endorse or promote products derived from this software without
32  *     specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
35  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
38  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
39  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
40  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
41  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
42  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
43  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44  * POSSIBILITY OF SUCH DAMAGE.
45  */
46 
47 
48 #include "fmacros.h"
49 #include <string.h>
50 #include <stdio.h>
51 #include <unistd.h>
52 #include "rio.h"
53 #include "util.h"
54 #include "crc64.h"
55 #include "config.h"
56 #include "server.h"
57 
58 /* ------------------------- Buffer I/O implementation ----------------------- */
59 
60 /* Returns 1 or 0 for success/failure. */
rioBufferWrite(rio * r,const void * buf,size_t len)61 static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
62     r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
63     r->io.buffer.pos += len;
64     return 1;
65 }
66 
67 /* Returns 1 or 0 for success/failure. */
rioBufferRead(rio * r,void * buf,size_t len)68 static size_t rioBufferRead(rio *r, void *buf, size_t len) {
69     if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
70         return 0; /* not enough buffer to return len bytes. */
71     memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
72     r->io.buffer.pos += len;
73     return 1;
74 }
75 
76 /* Returns read/write position in buffer. */
rioBufferTell(rio * r)77 static off_t rioBufferTell(rio *r) {
78     return r->io.buffer.pos;
79 }
80 
81 /* Flushes any buffer to target device if applicable. Returns 1 on success
82  * and 0 on failures. */
rioBufferFlush(rio * r)83 static int rioBufferFlush(rio *r) {
84     UNUSED(r);
85     return 1; /* Nothing to do, our write just appends to the buffer. */
86 }
87 
/* Template for buffer-backed rio objects. rioInitWithBuffer() copies this
 * structure into the caller's rio and then fills in the buffer-specific
 * fields. The first four entries are the buffer target callbacks defined
 * above (read, write, tell and flush, in that order). */
static const rio rioBufferIO = {
    rioBufferRead,
    rioBufferWrite,
    rioBufferTell,
    rioBufferFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
100 
/* Initialize 'r' as a buffer-backed rio operating on the sds string 's',
 * with the read/write position set to the start of the buffer. */
void rioInitWithBuffer(rio *r, sds s) {
    rio tmpl = rioBufferIO;

    tmpl.io.buffer.ptr = s;
    tmpl.io.buffer.pos = 0;
    *r = tmpl;
}
106 
107 /* --------------------- Stdio file pointer implementation ------------------- */
108 
109 /* Returns 1 or 0 for success/failure. */
rioFileWrite(rio * r,const void * buf,size_t len)110 static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
111     size_t retval;
112 
113     retval = fwrite(buf,len,1,r->io.file.fp);
114     r->io.file.buffered += len;
115 
116     if (r->io.file.autosync &&
117         r->io.file.buffered >= r->io.file.autosync)
118     {
119         fflush(r->io.file.fp);
120         redis_fsync(fileno(r->io.file.fp));
121         r->io.file.buffered = 0;
122     }
123     return retval;
124 }
125 
126 /* Returns 1 or 0 for success/failure. */
rioFileRead(rio * r,void * buf,size_t len)127 static size_t rioFileRead(rio *r, void *buf, size_t len) {
128     return fread(buf,len,1,r->io.file.fp);
129 }
130 
131 /* Returns read/write position in file. */
rioFileTell(rio * r)132 static off_t rioFileTell(rio *r) {
133     return ftello(r->io.file.fp);
134 }
135 
136 /* Flushes any buffer to target device if applicable. Returns 1 on success
137  * and 0 on failures. */
rioFileFlush(rio * r)138 static int rioFileFlush(rio *r) {
139     return (fflush(r->io.file.fp) == 0) ? 1 : 0;
140 }
141 
/* Template for stdio-file-backed rio objects. rioInitWithFile() copies this
 * structure into the caller's rio and then fills in the file-specific
 * fields; rioSetAutoSync() compares a rio's write method against the one
 * stored here to recognize file-based rio objects. */
static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
154 
/* Initialize 'r' as a file-backed rio operating on the stdio stream 'fp'.
 * The buffered-bytes counter starts at zero and auto-sync is disabled
 * (autosync == 0). */
void rioInitWithFile(rio *r, FILE *fp) {
    rio tmpl = rioFileIO;

    tmpl.io.file.fp = fp;
    tmpl.io.file.buffered = 0;
    tmpl.io.file.autosync = 0;
    *r = tmpl;
}
161 
/* ------------------- Connection implementation -------------------
 * We use this RIO implementation when reading an RDB file directly from
 * the connection to the memory via rdbLoadRio(), thus this implementation
 * only implements reading from a connection that is, normally,
 * just a socket. */
167 
/* Connection target write method. Writing is not supported by this target
 * yet, so all the arguments are ignored and 0 (error) is always returned. */
static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
    const size_t unsupported = 0;

    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return unsupported;
}
174 
175 /* Returns 1 or 0 for success/failure. */
rioConnRead(rio * r,void * buf,size_t len)176 static size_t rioConnRead(rio *r, void *buf, size_t len) {
177     size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
178 
179     /* If the buffer is too small for the entire request: realloc. */
180     if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
181         r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
182 
183     /* If the remaining unused buffer is not large enough: memmove so that we
184      * can read the rest. */
185     if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
186         sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
187         r->io.conn.pos = 0;
188     }
189 
190     /* Make sure the caller didn't request to read past the limit.
191      * If they didn't we'll buffer till the limit, if they did, we'll
192      * return an error. */
193     if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
194         errno = EOVERFLOW;
195         return 0;
196     }
197 
198     /* If we don't already have all the data in the sds, read more */
199     while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
200         size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
201         size_t needs = len - buffered;
202         /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
203          * the two. */
204         size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
205         if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
206         if (r->io.conn.read_limit != 0 &&
207             r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
208         {
209             toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
210         }
211         int retval = connRead(r->io.conn.conn,
212                           (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
213                           toread);
214         if (retval <= 0) {
215             if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
216             return 0;
217         }
218         sdsIncrLen(r->io.conn.buf, retval);
219     }
220 
221     memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
222     r->io.conn.read_so_far += len;
223     r->io.conn.pos += len;
224     return len;
225 }
226 
227 /* Returns read/write position in file. */
rioConnTell(rio * r)228 static off_t rioConnTell(rio *r) {
229     return r->io.conn.read_so_far;
230 }
231 
232 /* Flushes any buffer to target device if applicable. Returns 1 on success
233  * and 0 on failures. */
rioConnFlush(rio * r)234 static int rioConnFlush(rio *r) {
235     /* Our flush is implemented by the write method, that recognizes a
236      * buffer set to NULL with a count of zero as a flush request. */
237     return rioConnWrite(r,NULL,0);
238 }
239 
/* Template for connection-backed rio objects. rioInitWithConn() copies this
 * structure into the caller's rio and then fills in the connection-specific
 * fields. */
static const rio rioConnIO = {
    rioConnRead,
    rioConnWrite,
    rioConnTell,
    rioConnFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
252 
253 /* Create an RIO that implements a buffered read from an fd
254  * read_limit argument stops buffering when the reaching the limit. */
rioInitWithConn(rio * r,connection * conn,size_t read_limit)255 void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
256     *r = rioConnIO;
257     r->io.conn.conn = conn;
258     r->io.conn.pos = 0;
259     r->io.conn.read_limit = read_limit;
260     r->io.conn.read_so_far = 0;
261     r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
262     sdsclear(r->io.conn.buf);
263 }
264 
265 /* Release the RIO tream. Optionally returns the unread buffered data
266  * when the SDS pointer 'remaining' is passed. */
rioFreeConn(rio * r,sds * remaining)267 void rioFreeConn(rio *r, sds *remaining) {
268     if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
269         if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
270         *remaining = r->io.conn.buf;
271     } else {
272         sdsfree(r->io.conn.buf);
273         if (remaining) *remaining = NULL;
274     }
275     r->io.conn.buf = NULL;
276 }
277 
278 /* ------------------- File descriptor implementation ------------------
279  * This target is used to write the RDB file to pipe, when the master just
280  * streams the data to the replicas without creating an RDB on-disk image
281  * (diskless replication option).
282  * It only implements writes. */
283 
284 /* Returns 1 or 0 for success/failure.
285  *
286  * When buf is NULL and len is 0, the function performs a flush operation
287  * if there is some pending buffer, so this function is also used in order
288  * to implement rioFdFlush(). */
rioFdWrite(rio * r,const void * buf,size_t len)289 static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
290     ssize_t retval;
291     unsigned char *p = (unsigned char*) buf;
292     int doflush = (buf == NULL && len == 0);
293 
294     /* For small writes, we rather keep the data in user-space buffer, and flush
295      * it only when it grows. however for larger writes, we prefer to flush
296      * any pre-existing buffer, and write the new one directly without reallocs
297      * and memory copying. */
298     if (len > PROTO_IOBUF_LEN) {
299         /* First, flush any pre-existing buffered data. */
300         if (sdslen(r->io.fd.buf)) {
301             if (rioFdWrite(r, NULL, 0) == 0)
302                 return 0;
303         }
304         /* Write the new data, keeping 'p' and 'len' from the input. */
305     } else {
306         if (len) {
307             r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
308             if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
309                 doflush = 1;
310             if (!doflush)
311                 return 1;
312         }
313         /* Flusing the buffered data. set 'p' and 'len' accordintly. */
314         p = (unsigned char*) r->io.fd.buf;
315         len = sdslen(r->io.fd.buf);
316     }
317 
318     size_t nwritten = 0;
319     while(nwritten != len) {
320         retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
321         if (retval <= 0) {
322             /* With blocking io, which is the sole user of this
323              * rio target, EWOULDBLOCK is returned only because of
324              * the SO_SNDTIMEO socket option, so we translate the error
325              * into one more recognizable by the user. */
326             if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
327             return 0; /* error. */
328         }
329         nwritten += retval;
330     }
331 
332     r->io.fd.pos += len;
333     sdsclear(r->io.fd.buf);
334     return 1;
335 }
336 
337 /* Returns 1 or 0 for success/failure. */
rioFdRead(rio * r,void * buf,size_t len)338 static size_t rioFdRead(rio *r, void *buf, size_t len) {
339     UNUSED(r);
340     UNUSED(buf);
341     UNUSED(len);
342     return 0; /* Error, this target does not support reading. */
343 }
344 
345 /* Returns read/write position in file. */
rioFdTell(rio * r)346 static off_t rioFdTell(rio *r) {
347     return r->io.fd.pos;
348 }
349 
350 /* Flushes any buffer to target device if applicable. Returns 1 on success
351  * and 0 on failures. */
rioFdFlush(rio * r)352 static int rioFdFlush(rio *r) {
353     /* Our flush is implemented by the write method, that recognizes a
354      * buffer set to NULL with a count of zero as a flush request. */
355     return rioFdWrite(r,NULL,0);
356 }
357 
/* Template for file-descriptor-backed rio objects. rioInitWithFd() copies
 * this structure into the caller's rio and then fills in the fd-specific
 * fields. */
static const rio rioFdIO = {
    rioFdRead,
    rioFdWrite,
    rioFdTell,
    rioFdFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
370 
/* Initialize 'r' as a rio writing to the file descriptor 'fd'. The position
 * starts at zero and an empty sds string is allocated as the write buffer;
 * release it with rioFreeFd() when done. */
void rioInitWithFd(rio *r, int fd) {
    *r = rioFdIO;
    r->io.fd.buf = sdsempty();
    r->io.fd.pos = 0;
    r->io.fd.fd = fd;
}
377 
378 /* release the rio stream. */
rioFreeFd(rio * r)379 void rioFreeFd(rio *r) {
380     sdsfree(r->io.fd.buf);
381 }
382 
383 /* ---------------------------- Generic functions ---------------------------- */
384 
385 /* This function can be installed both in memory and file streams when checksum
386  * computation is needed. */
rioGenericUpdateChecksum(rio * r,const void * buf,size_t len)387 void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
388     r->cksum = crc64(r->cksum,buf,len);
389 }
390 
391 /* Set the file-based rio object to auto-fsync every 'bytes' file written.
392  * By default this is set to zero that means no automatic file sync is
393  * performed.
394  *
395  * This feature is useful in a few contexts since when we rely on OS write
396  * buffers sometimes the OS buffers way too much, resulting in too many
397  * disk I/O concentrated in very little time. When we fsync in an explicit
398  * way instead the I/O pressure is more distributed across time. */
rioSetAutoSync(rio * r,off_t bytes)399 void rioSetAutoSync(rio *r, off_t bytes) {
400     if(r->write != rioFileIO.write) return;
401     r->io.file.autosync = bytes;
402 }
403 
404 /* --------------------------- Higher level interface --------------------------
405  *
406  * The following higher level functions use lower level rio.c functions to help
407  * generating the Redis protocol for the Append Only File. */
408 
409 /* Write multi bulk count in the format: "*<count>\r\n". */
rioWriteBulkCount(rio * r,char prefix,long count)410 size_t rioWriteBulkCount(rio *r, char prefix, long count) {
411     char cbuf[128];
412     int clen;
413 
414     cbuf[0] = prefix;
415     clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
416     cbuf[clen++] = '\r';
417     cbuf[clen++] = '\n';
418     if (rioWrite(r,cbuf,clen) == 0) return 0;
419     return clen;
420 }
421 
422 /* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkString(rio * r,const char * buf,size_t len)423 size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
424     size_t nwritten;
425 
426     if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
427     if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
428     if (rioWrite(r,"\r\n",2) == 0) return 0;
429     return nwritten+len+2;
430 }
431 
432 /* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkLongLong(rio * r,long long l)433 size_t rioWriteBulkLongLong(rio *r, long long l) {
434     char lbuf[32];
435     unsigned int llen;
436 
437     llen = ll2string(lbuf,sizeof(lbuf),l);
438     return rioWriteBulkString(r,lbuf,llen);
439 }
440 
441 /* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
rioWriteBulkDouble(rio * r,double d)442 size_t rioWriteBulkDouble(rio *r, double d) {
443     char dbuf[128];
444     unsigned int dlen;
445 
446     dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
447     return rioWriteBulkString(r,dbuf,dlen);
448 }
449