1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "atomicvar.h"
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <math.h>
35 #include <ctype.h>
36 
37 static void setProtocolError(const char *errstr, client *c);
38 
39 /* Return the size consumed from the allocator, for the specified SDS string,
40  * including internal fragmentation. This function is used in order to compute
41  * the client output buffer size. */
sdsZmallocSize(sds s)42 size_t sdsZmallocSize(sds s) {
43     void *sh = sdsAllocPtr(s);
44     return zmalloc_size(sh);
45 }
46 
47 /* Return the amount of memory used by the sds string at object->ptr
48  * for a string object. */
getStringObjectSdsUsedMemory(robj * o)49 size_t getStringObjectSdsUsedMemory(robj *o) {
50     serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
51     switch(o->encoding) {
52     case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
53     case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
54     default: return 0; /* Just integer encoding for now. */
55     }
56 }
57 
58 /* Client.reply list dup and free methods. */
dupClientReplyValue(void * o)59 void *dupClientReplyValue(void *o) {
60     clientReplyBlock *old = o;
61     clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
62     memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
63     return buf;
64 }
65 
/* Free method of the client reply list: release the reply block 'o'. */
void freeClientReplyValue(void *o) {
    zfree(o);
}
69 
listMatchObjects(void * a,void * b)70 int listMatchObjects(void *a, void *b) {
71     return equalStringObjects(a,b);
72 }
73 
74 /* This function links the client to the global linked list of clients.
75  * unlinkClient() does the opposite, among other things. */
linkClient(client * c)76 void linkClient(client *c) {
77     listAddNodeTail(server.clients,c);
78     /* Note that we remember the linked list node where the client is stored,
79      * this way removing the client in unlinkClient() will not require
80      * a linear scan, but just a constant time operation. */
81     c->client_list_node = listLast(server.clients);
82     uint64_t id = htonu64(c->id);
83     raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
84 }
85 
createClient(int fd)86 client *createClient(int fd) {
87     client *c = zmalloc(sizeof(client));
88 
89     /* passing -1 as fd it is possible to create a non connected client.
90      * This is useful since all the commands needs to be executed
91      * in the context of a client. When commands are executed in other
92      * contexts (for instance a Lua script) we need a non connected client. */
93     if (fd != -1) {
94         anetNonBlock(NULL,fd);
95         anetEnableTcpNoDelay(NULL,fd);
96         if (server.tcpkeepalive)
97             anetKeepAlive(NULL,fd,server.tcpkeepalive);
98         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
99             readQueryFromClient, c) == AE_ERR)
100         {
101             close(fd);
102             zfree(c);
103             return NULL;
104         }
105     }
106 
107     selectDb(c,0);
108     uint64_t client_id;
109     atomicGetIncr(server.next_client_id,client_id,1);
110     c->id = client_id;
111     c->fd = fd;
112     c->name = NULL;
113     c->bufpos = 0;
114     c->qb_pos = 0;
115     c->querybuf = sdsempty();
116     c->pending_querybuf = sdsempty();
117     c->querybuf_peak = 0;
118     c->reqtype = 0;
119     c->argc = 0;
120     c->argv = NULL;
121     c->cmd = c->lastcmd = NULL;
122     c->multibulklen = 0;
123     c->bulklen = -1;
124     c->sentlen = 0;
125     c->flags = 0;
126     c->ctime = c->lastinteraction = server.unixtime;
127     c->authenticated = 0;
128     c->replstate = REPL_STATE_NONE;
129     c->repl_put_online_on_ack = 0;
130     c->reploff = 0;
131     c->read_reploff = 0;
132     c->repl_ack_off = 0;
133     c->repl_ack_time = 0;
134     c->slave_listening_port = 0;
135     c->slave_ip[0] = '\0';
136     c->slave_capa = SLAVE_CAPA_NONE;
137     c->reply = listCreate();
138     c->reply_bytes = 0;
139     c->obuf_soft_limit_reached_time = 0;
140     listSetFreeMethod(c->reply,freeClientReplyValue);
141     listSetDupMethod(c->reply,dupClientReplyValue);
142     c->btype = BLOCKED_NONE;
143     c->bpop.timeout = 0;
144     c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
145     c->bpop.target = NULL;
146     c->bpop.xread_group = NULL;
147     c->bpop.xread_consumer = NULL;
148     c->bpop.xread_group_noack = 0;
149     c->bpop.numreplicas = 0;
150     c->bpop.reploffset = 0;
151     c->woff = 0;
152     c->watched_keys = listCreate();
153     c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
154     c->pubsub_patterns = listCreate();
155     c->peerid = NULL;
156     c->client_list_node = NULL;
157     listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
158     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
159     if (fd != -1) linkClient(c);
160     initClientMultiState(c);
161     return c;
162 }
163 
164 /* This funciton puts the client in the queue of clients that should write
165  * their output buffers to the socket. Note that it does not *yet* install
166  * the write handler, to start clients are put in a queue of clients that need
167  * to write, so we try to do that before returning in the event loop (see the
168  * handleClientsWithPendingWrites() function).
169  * If we fail and there is more data to write, compared to what the socket
170  * buffers can hold, then we'll really install the handler. */
clientInstallWriteHandler(client * c)171 void clientInstallWriteHandler(client *c) {
172     /* Schedule the client to write the output buffers to the socket only
173      * if not already done and, for slaves, if the slave can actually receive
174      * writes at this stage. */
175     if (!(c->flags & CLIENT_PENDING_WRITE) &&
176         (c->replstate == REPL_STATE_NONE ||
177          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
178     {
179         /* Here instead of installing the write handler, we just flag the
180          * client and put it into a list of clients that have something
181          * to write to the socket. This way before re-entering the event
182          * loop, we can try to directly write to the client sockets avoiding
183          * a system call. We'll only really install the write handler if
184          * we'll not be able to write the whole reply at once. */
185         c->flags |= CLIENT_PENDING_WRITE;
186         listAddNodeHead(server.clients_pending_write,c);
187     }
188 }
189 
190 /* This function is called every time we are going to transmit new data
191  * to the client. The behavior is the following:
192  *
193  * If the client should receive new data (normal clients will) the function
194  * returns C_OK, and make sure to install the write handler in our event
195  * loop so that when the socket is writable new data gets written.
196  *
197  * If the client should not receive new data, because it is a fake client
198  * (used to load AOF in memory), a master or because the setup of the write
199  * handler failed, the function returns C_ERR.
200  *
201  * The function may return C_OK without actually installing the write
202  * event handler in the following cases:
203  *
204  * 1) The event handler should already be installed since the output buffer
205  *    already contains something.
206  * 2) The client is a slave but not yet online, so we want to just accumulate
207  *    writes in the buffer but not actually sending them yet.
208  *
209  * Typically gets called every time a reply is built, before adding more
210  * data to the clients output buffers. If the function returns C_ERR no
211  * data should be appended to the output buffers. */
prepareClientToWrite(client * c)212 int prepareClientToWrite(client *c) {
213     /* If it's the Lua client we always return ok without installing any
214      * handler since there is no socket at all. */
215     if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
216 
217     /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
218     if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
219 
220     /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
221      * is set. */
222     if ((c->flags & CLIENT_MASTER) &&
223         !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
224 
225     if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
226 
227     /* Schedule the client to write the output buffers to the socket, unless
228      * it should already be setup to do so (it has already pending data). */
229     if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
230 
231     /* Authorize the caller to queue in the output buffer of this client. */
232     return C_OK;
233 }
234 
235 /* -----------------------------------------------------------------------------
236  * Low level functions to add more data to output buffers.
237  * -------------------------------------------------------------------------- */
238 
/* Try to append the 'len' bytes at 's' to the static output buffer of the
 * client.
 *
 * Returns C_OK if the data was copied, or if the client is flagged
 * CLIENT_CLOSE_AFTER_REPLY (the data is silently dropped in that case).
 * Returns C_ERR, without copying anything, when the reply list is not empty
 * or when the static buffer lacks room for the whole string. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* Once the reply list is in use, new data must go to the list. */
    if (listLength(c->reply) > 0) return C_ERR;

    size_t room = sizeof(c->buf)-c->bufpos;
    if (room < len) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos += len;
    return C_OK;
}
255 
/* Append the 'len' bytes at 's' to the reply list of the client. Nothing is
 * done when the client is flagged CLIENT_CLOSE_AFTER_REPLY. After queueing,
 * the output buffer limits of the client are checked. */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Note that the block may be NULL even if there is a tail node, because
     * addDeferredMultiBulkLength() adds a dummy node with a NULL value, to
     * fill it later when the length is finally known. */
    listNode *last = listLast(c->reply);
    clientReplyBlock *block = NULL;
    if (last != NULL) block = listNodeValue(last);

    if (block != NULL) {
        /* Fill the free space of the tail block first, what does not fit
         * is left for a new node. */
        size_t room = block->size - block->used;
        size_t n = (len < room) ? len : room;
        memcpy(block->buf + block->used, s, n);
        block->used += n;
        s += n;
        len -= n;
    }

    if (len > 0) {
        /* New node: allocate at least PROTO_REPLY_CHUNK_BYTES. */
        size_t payload = PROTO_REPLY_CHUNK_BYTES;
        if (len >= payload) payload = len;
        block = zmalloc(payload + sizeof(clientReplyBlock));
        /* Take over the allocation's internal fragmentation. */
        block->size = zmalloc_usable(block) - sizeof(clientReplyBlock);
        block->used = len;
        memcpy(block->buf, s, len);
        listAddNodeTail(c->reply, block);
        c->reply_bytes += block->size;
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
291 
292 /* -----------------------------------------------------------------------------
293  * Higher level functions to queue data on the client output buffer.
294  * The following functions are the ones that commands implementations will call.
295  * -------------------------------------------------------------------------- */
296 
297 /* Add the object 'obj' string representation to the client output buffer. */
addReply(client * c,robj * obj)298 void addReply(client *c, robj *obj) {
299     if (prepareClientToWrite(c) != C_OK) return;
300 
301     if (sdsEncodedObject(obj)) {
302         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
303             _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
304     } else if (obj->encoding == OBJ_ENCODING_INT) {
305         /* For integer encoded strings we just convert it into a string
306          * using our optimized function, and attach the resulting string
307          * to the output buffer. */
308         char buf[32];
309         size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
310         if (_addReplyToBuffer(c,buf,len) != C_OK)
311             _addReplyStringToList(c,buf,len);
312     } else {
313         serverPanic("Wrong obj->encoding in addReply()");
314     }
315 }
316 
317 /* Add the SDS 's' string to the client output buffer, as a side effect
318  * the SDS string is freed. */
addReplySds(client * c,sds s)319 void addReplySds(client *c, sds s) {
320     if (prepareClientToWrite(c) != C_OK) {
321         /* The caller expects the sds to be free'd. */
322         sdsfree(s);
323         return;
324     }
325     if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
326         _addReplyStringToList(c,s,sdslen(s));
327     sdsfree(s);
328 }
329 
330 /* This low level function just adds whatever protocol you send it to the
331  * client buffer, trying the static buffer initially, and using the string
332  * of objects if not possible.
333  *
334  * It is efficient because does not create an SDS object nor an Redis object
335  * if not needed. The object will only be created by calling
336  * _addReplyStringToList() if we fail to extend the existing tail object
337  * in the list of objects. */
addReplyString(client * c,const char * s,size_t len)338 void addReplyString(client *c, const char *s, size_t len) {
339     if (prepareClientToWrite(c) != C_OK) return;
340     if (_addReplyToBuffer(c,s,len) != C_OK)
341         _addReplyStringToList(c,s,len);
342 }
343 
344 /* Low level function called by the addReplyError...() functions.
345  * It emits the protocol for a Redis error, in the form:
346  *
347  * -ERRORCODE Error Message<CR><LF>
348  *
349  * If the error code is already passed in the string 's', the error
350  * code provided is used, otherwise the string "-ERR " for the generic
351  * error code is automatically added. */
addReplyErrorLength(client * c,const char * s,size_t len)352 void addReplyErrorLength(client *c, const char *s, size_t len) {
353     /* If the string already starts with "-..." then the error code
354      * is provided by the caller. Otherwise we use "-ERR". */
355     if (!len || s[0] != '-') addReplyString(c,"-ERR ",5);
356     addReplyString(c,s,len);
357     addReplyString(c,"\r\n",2);
358 
359     /* Sometimes it could be normal that a slave replies to a master with
360      * an error and this function gets called. Actually the error will never
361      * be sent because addReply*() against master clients has no effect...
362      * A notable example is:
363      *
364      *    EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
365      *
366      * Where the master must propagate the first change even if the second
367      * will produce an error. However it is useful to log such events since
368      * they are rare and may hint at errors in a script or a bug in Redis. */
369     if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
370         char* to = c->flags & CLIENT_MASTER? "master": "replica";
371         char* from = c->flags & CLIENT_MASTER? "replica": "master";
372         char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
373         serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
374                              "to its %s: '%s' after processing the command "
375                              "'%s'", from, to, s, cmdname);
376     }
377 }
378 
/* Emit an error reply whose text is the null terminated C string 'err'.
 * See addReplyErrorLength() for how the error code prefix is handled. */
void addReplyError(client *c, const char *err) {
    size_t len = strlen(err);

    addReplyErrorLength(c,err,len);
}
382 
/* Emit an error reply built from the printf-alike format 'fmt' and the
 * following arguments. Every CR and LF in the formatted text is replaced
 * with a space, since newlines would result in invalid protocol. */
void addReplyErrorFormat(client *c, const char *fmt, ...) {
    va_list ap;
    va_start(ap,fmt);
    sds msg = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    char *p = msg;
    char *end = msg + sdslen(msg);
    while (p != end) {
        if (*p == '\r' || *p == '\n') *p = ' ';
        p++;
    }
    addReplyErrorLength(c,msg,sdslen(msg));
    sdsfree(msg);
}
398 
/* Emit a status reply: a "+" followed by the 'len' bytes at 's' and a
 * final CR LF terminator. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    addReplyString(c,"+",1);    /* Status reply marker. */
    addReplyString(c,s,len);    /* Payload. */
    addReplyString(c,"\r\n",2); /* Terminator. */
}
404 
/* Emit a status reply whose text is the null terminated C string
 * 'status'. */
void addReplyStatus(client *c, const char *status) {
    size_t len = strlen(status);

    addReplyStatusLength(c,status,len);
}
408 
/* Emit a status reply built from the printf-alike format 'fmt' and the
 * following arguments. The formatted text is emitted as it is. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    va_list ap;
    sds status;

    va_start(ap,fmt);
    status = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    addReplyStatusLength(c,status,sdslen(status));
    sdsfree(status);
}
417 
418 /* Adds an empty object to the reply list that will contain the multi bulk
419  * length, which is not known when this function is called. */
addDeferredMultiBulkLength(client * c)420 void *addDeferredMultiBulkLength(client *c) {
421     /* Note that we install the write event here even if the object is not
422      * ready to be sent, since we are sure that before returning to the
423      * event loop setDeferredMultiBulkLength() will be called. */
424     if (prepareClientToWrite(c) != C_OK) return NULL;
425     listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
426     return listLast(c->reply);
427 }
428 
429 /* Populate the length object and try gluing it to the next chunk. */
setDeferredMultiBulkLength(client * c,void * node,long length)430 void setDeferredMultiBulkLength(client *c, void *node, long length) {
431     listNode *ln = (listNode*)node;
432     clientReplyBlock *next;
433     char lenstr[128];
434     size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
435 
436     /* Abort when *node is NULL: when the client should not accept writes
437      * we return NULL in addDeferredMultiBulkLength() */
438     if (node == NULL) return;
439     serverAssert(!listNodeValue(ln));
440 
441     /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(),
442      * with a new buffer structure containing the protocol needed to specify
443      * the length of the array following. However sometimes when there is
444      * little memory to move, we may instead remove this NULL node, and prefix
445      * our protocol in the node immediately after to it, in order to save a
446      * write(2) syscall later. Conditions needed to do it:
447      *
448      * - The next node is non-NULL,
449      * - It has enough room already allocated
450      * - And not too large (avoid large memmove) */
451     if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
452         next->size - next->used >= lenstr_len &&
453         next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
454         memmove(next->buf + lenstr_len, next->buf, next->used);
455         memcpy(next->buf, lenstr, lenstr_len);
456         next->used += lenstr_len;
457         listDelNode(c->reply,ln);
458     } else {
459         /* Create a new node */
460         clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
461         /* Take over the allocation's internal fragmentation */
462         buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock);
463         buf->used = lenstr_len;
464         memcpy(buf->buf, lenstr, lenstr_len);
465         listNodeValue(ln) = buf;
466         c->reply_bytes += buf->size;
467     }
468     asyncCloseClientOnOutputBufferLimitReached(c);
469 }
470 
471 /* Add a double as a bulk reply */
addReplyDouble(client * c,double d)472 void addReplyDouble(client *c, double d) {
473     char dbuf[128], sbuf[128];
474     int dlen, slen;
475     if (isinf(d)) {
476         /* Libc in odd systems (Hi Solaris!) will format infinite in a
477          * different way, so better to handle it in an explicit way. */
478         addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
479     } else {
480         dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
481         slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
482         addReplyString(c,sbuf,slen);
483     }
484 }
485 
486 /* Add a long double as a bulk reply, but uses a human readable formatting
487  * of the double instead of exposing the crude behavior of doubles to the
488  * dear user. */
addReplyHumanLongDouble(client * c,long double d)489 void addReplyHumanLongDouble(client *c, long double d) {
490     robj *o = createStringObjectFromLongDouble(d,1);
491     addReplyBulk(c,o);
492     decrRefCount(o);
493 }
494 
495 /* Add a long long as integer reply or bulk len / multi bulk count.
496  * Basically this is used to output <prefix><long long><crlf>. */
addReplyLongLongWithPrefix(client * c,long long ll,char prefix)497 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
498     char buf[128];
499     int len;
500 
501     /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
502      * so we have a few shared objects to use if the integer is small
503      * like it is most of the times. */
504     if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
505         addReply(c,shared.mbulkhdr[ll]);
506         return;
507     } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
508         addReply(c,shared.bulkhdr[ll]);
509         return;
510     }
511 
512     buf[0] = prefix;
513     len = ll2string(buf+1,sizeof(buf)-1,ll);
514     buf[len+1] = '\r';
515     buf[len+2] = '\n';
516     addReplyString(c,buf,len+3);
517 }
518 
/* Add 'll' as an integer reply. The values 0 and 1 are served using the
 * shared objects, everything else is formatted with the ':' prefix. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
527 
/* Emit the multi bulk header for 'length' elements, that is '*', the
 * count, and the final CR LF.
 *
 * The work is delegated to addReplyLongLongWithPrefix(), which uses the
 * shared header objects only when the count is inside the range
 * [0, OBJ_SHARED_BULKHDR_LEN). This way a negative 'length' is formatted
 * as a number instead of being used as a (out of bounds) index into the
 * shared headers array. */
void addReplyMultiBulkLen(client *c, long length) {
    addReplyLongLongWithPrefix(c,length,'*');
}
534 
535 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)536 void addReplyBulkLen(client *c, robj *obj) {
537     size_t len;
538 
539     if (sdsEncodedObject(obj)) {
540         len = sdslen(obj->ptr);
541     } else {
542         long n = (long)obj->ptr;
543 
544         /* Compute how many bytes will take this integer as a radix 10 string */
545         len = 1;
546         if (n < 0) {
547             len++;
548             n = -n;
549         }
550         while((n = n/10) != 0) {
551             len++;
552         }
553     }
554 
555     if (len < OBJ_SHARED_BULKHDR_LEN)
556         addReply(c,shared.bulkhdr[len]);
557     else
558         addReplyLongLongWithPrefix(c,len,'$');
559 }
560 
561 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)562 void addReplyBulk(client *c, robj *obj) {
563     addReplyBulkLen(c,obj);
564     addReply(c,obj);
565     addReply(c,shared.crlf);
566 }
567 
568 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)569 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
570     addReplyLongLongWithPrefix(c,len,'$');
571     addReplyString(c,p,len);
572     addReply(c,shared.crlf);
573 }
574 
575 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)576 void addReplyBulkSds(client *c, sds s)  {
577     addReplyLongLongWithPrefix(c,sdslen(s),'$');
578     addReplySds(c,s);
579     addReply(c,shared.crlf);
580 }
581 
582 /* Add a C null term string as bulk reply */
addReplyBulkCString(client * c,const char * s)583 void addReplyBulkCString(client *c, const char *s) {
584     if (s == NULL) {
585         addReply(c,shared.nullbulk);
586     } else {
587         addReplyBulkCBuffer(c,s,strlen(s));
588     }
589 }
590 
591 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)592 void addReplyBulkLongLong(client *c, long long ll) {
593     char buf[64];
594     int len;
595 
596     len = ll2string(buf,64,ll);
597     addReplyBulkCBuffer(c,buf,len);
598 }
599 
600 /* Add an array of C strings as status replies with a heading.
601  * This function is typically invoked by from commands that support
602  * subcommands in response to the 'help' subcommand. The help array
603  * is terminated by NULL sentinel. */
addReplyHelp(client * c,const char ** help)604 void addReplyHelp(client *c, const char **help) {
605     sds cmd = sdsnew((char*) c->argv[0]->ptr);
606     void *blenp = addDeferredMultiBulkLength(c);
607     int blen = 0;
608 
609     sdstoupper(cmd);
610     addReplyStatusFormat(c,
611         "%s <subcommand> arg arg ... arg. Subcommands are:",cmd);
612     sdsfree(cmd);
613 
614     while (help[blen]) addReplyStatus(c,help[blen++]);
615 
616     blen++;  /* Account for the header line(s). */
617     setDeferredMultiBulkLength(c,blenp,blen);
618 }
619 
620 /* Add a suggestive error reply.
621  * This function is typically invoked by from commands that support
622  * subcommands in response to an unknown subcommand or argument error. */
addReplySubcommandSyntaxError(client * c)623 void addReplySubcommandSyntaxError(client *c) {
624     sds cmd = sdsnew((char*) c->argv[0]->ptr);
625     sdstoupper(cmd);
626     addReplyErrorFormat(c,
627         "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.",
628         (char*)c->argv[1]->ptr,cmd);
629     sdsfree(cmd);
630 }
631 
632 /* Append 'src' client output buffers into 'dst' client output buffers.
633  * This function clears the output buffers of 'src' */
AddReplyFromClient(client * dst,client * src)634 void AddReplyFromClient(client *dst, client *src) {
635     if (prepareClientToWrite(dst) != C_OK)
636         return;
637     addReplyString(dst,src->buf, src->bufpos);
638     if (listLength(src->reply))
639         listJoin(dst->reply,src->reply);
640     dst->reply_bytes += src->reply_bytes;
641     src->reply_bytes = 0;
642     src->bufpos = 0;
643 }
644 
645 /* Copy 'src' client output buffers into 'dst' client output buffers.
646  * The function takes care of freeing the old output buffers of the
647  * destination client. */
copyClientOutputBuffer(client * dst,client * src)648 void copyClientOutputBuffer(client *dst, client *src) {
649     listRelease(dst->reply);
650     dst->sentlen = 0;
651     dst->reply = listDup(src->reply);
652     memcpy(dst->buf,src->buf,src->bufpos);
653     dst->bufpos = src->bufpos;
654     dst->reply_bytes = src->reply_bytes;
655 }
656 
657 /* Return true if the specified client has pending reply buffers to write to
658  * the socket. */
clientHasPendingReplies(client * c)659 int clientHasPendingReplies(client *c) {
660     return c->bufpos || listLength(c->reply);
661 }
662 
663 #define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(int fd,int flags,char * ip)664 static void acceptCommonHandler(int fd, int flags, char *ip) {
665     client *c;
666     if ((c = createClient(fd)) == NULL) {
667         serverLog(LL_WARNING,
668             "Error registering fd event for the new client: %s (fd=%d)",
669             strerror(errno),fd);
670         close(fd); /* May be already closed, just ignore errors */
671         return;
672     }
673     /* If maxclient directive is set and this is one client more... close the
674      * connection. Note that we create the client instead to check before
675      * for this condition, since now the socket is already set in non-blocking
676      * mode and we can send an error for free using the Kernel I/O */
677     if (listLength(server.clients) > server.maxclients) {
678         char *err = "-ERR max number of clients reached\r\n";
679 
680         /* That's a best effort error message, don't check write errors */
681         if (write(c->fd,err,strlen(err)) == -1) {
682             /* Nothing to do, Just to avoid the warning... */
683         }
684         server.stat_rejected_conn++;
685         freeClient(c);
686         return;
687     }
688 
689     /* If the server is running in protected mode (the default) and there
690      * is no password set, nor a specific interface is bound, we don't accept
691      * requests from non loopback interfaces. Instead we try to explain the
692      * user what to do to fix it if needed. */
693     if (server.protected_mode &&
694         server.bindaddr_count == 0 &&
695         server.requirepass == NULL &&
696         !(flags & CLIENT_UNIX_SOCKET) &&
697         ip != NULL)
698     {
699         if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
700             char *err =
701                 "-DENIED Redis is running in protected mode because protected "
702                 "mode is enabled, no bind address was specified, no "
703                 "authentication password is requested to clients. In this mode "
704                 "connections are only accepted from the loopback interface. "
705                 "If you want to connect from external computers to Redis you "
706                 "may adopt one of the following solutions: "
707                 "1) Just disable protected mode sending the command "
708                 "'CONFIG SET protected-mode no' from the loopback interface "
709                 "by connecting to Redis from the same host the server is "
710                 "running, however MAKE SURE Redis is not publicly accessible "
711                 "from internet if you do so. Use CONFIG REWRITE to make this "
712                 "change permanent. "
713                 "2) Alternatively you can just disable the protected mode by "
714                 "editing the Redis configuration file, and setting the protected "
715                 "mode option to 'no', and then restarting the server. "
716                 "3) If you started the server manually just for testing, restart "
717                 "it with the '--protected-mode no' option. "
718                 "4) Setup a bind address or an authentication password. "
719                 "NOTE: You only need to do one of the above things in order for "
720                 "the server to start accepting connections from the outside.\r\n";
721             if (write(c->fd,err,strlen(err)) == -1) {
722                 /* Nothing to do, Just to avoid the warning... */
723             }
724             server.stat_rejected_conn++;
725             freeClient(c);
726             return;
727         }
728     }
729 
730     server.stat_numconnections++;
731     c->flags |= flags;
732 }
733 
/* Readable-event handler for a listening TCP socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL connections per invocation and hands
 * every new descriptor, together with the peer IP string, to
 * acceptCommonHandler(). The loop ends at the first failed accept; the
 * failure is logged unless errno is EWOULDBLOCK. The 'el', 'privdata' and
 * 'mask' arguments are not used. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char peer_ip[NET_IP_STR_LEN];
    int peer_port;
    int remaining;

    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (remaining = MAX_ACCEPTS_PER_CALL; remaining != 0; remaining--) {
        int newfd = anetTcpAccept(server.neterr, fd, peer_ip,
                                  sizeof(peer_ip), &peer_port);

        if (newfd != ANET_ERR) {
            serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
            acceptCommonHandler(newfd,0,peer_ip);
            continue;
        }

        /* Accept failed: report anything other than EWOULDBLOCK and stop. */
        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
753 
/* Readable-event handler for the listening Unix domain socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL connections per invocation and hands
 * every new descriptor to acceptCommonHandler() with the CLIENT_UNIX_SOCKET
 * flag and no IP string. The loop ends at the first failed accept; the
 * failure is logged unless errno is EWOULDBLOCK. The 'el', 'privdata' and
 * 'mask' arguments are not used. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int remaining;

    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (remaining = MAX_ACCEPTS_PER_CALL; remaining != 0; remaining--) {
        int newfd = anetUnixAccept(server.neterr, fd);

        if (newfd != ANET_ERR) {
            serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
            acceptCommonHandler(newfd,CLIENT_UNIX_SOCKET,NULL);
            continue;
        }

        /* Accept failed: report anything other than EWOULDBLOCK and stop. */
        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
772 
/* Drop one reference from every object in the client argument vector, then
 * mark the vector as empty and forget the current command. The c->argv array
 * itself is not released here. */
static void freeClientArgv(client *c) {
    int idx = 0;

    while (idx < c->argc) {
        decrRefCount(c->argv[idx]);
        idx++;
    }
    c->cmd = NULL;
    c->argc = 0;
}
780 
781 /* Close all the slaves connections. This is useful in chained replication
782  * when we resync with our own master and want to force all our slaves to
783  * resync with us as well. */
disconnectSlaves(void)784 void disconnectSlaves(void) {
785     while (listLength(server.slaves)) {
786         listNode *ln = listFirst(server.slaves);
787         freeClient((client*)ln->value);
788     }
789 }
790 
791 /* Remove the specified client from global lists where the client could
792  * be referenced, not including the Pub/Sub channels.
793  * This is used by freeClient() and replicationCacheMaster(). */
unlinkClient(client * c)794 void unlinkClient(client *c) {
795     listNode *ln;
796 
797     /* If this is marked as current client unset it. */
798     if (server.current_client == c) server.current_client = NULL;
799 
800     /* Certain operations must be done only if the client has an active socket.
801      * If the client was already unlinked or if it's a "fake client" the
802      * fd is already set to -1. */
803     if (c->fd != -1) {
804         /* Remove from the list of active clients. */
805         if (c->client_list_node) {
806             uint64_t id = htonu64(c->id);
807             raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
808             listDelNode(server.clients,c->client_list_node);
809             c->client_list_node = NULL;
810         }
811 
812         /* In the case of diskless replication the fork is writing to the
813          * sockets and just closing the fd isn't enough, if we don't also
814          * shutdown the socket the fork will continue to write to the slave
815          * and the salve will only find out that it was disconnected when
816          * it will finish reading the rdb. */
817         if ((c->flags & CLIENT_SLAVE) &&
818             (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) {
819             shutdown(c->fd, SHUT_RDWR);
820         }
821 
822         /* Unregister async I/O handlers and close the socket. */
823         aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
824         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
825         close(c->fd);
826         c->fd = -1;
827     }
828 
829     /* Remove from the list of pending writes if needed. */
830     if (c->flags & CLIENT_PENDING_WRITE) {
831         ln = listSearchKey(server.clients_pending_write,c);
832         serverAssert(ln != NULL);
833         listDelNode(server.clients_pending_write,ln);
834         c->flags &= ~CLIENT_PENDING_WRITE;
835     }
836 
837     /* When client was just unblocked because of a blocking operation,
838      * remove it from the list of unblocked clients. */
839     if (c->flags & CLIENT_UNBLOCKED) {
840         ln = listSearchKey(server.unblocked_clients,c);
841         serverAssert(ln != NULL);
842         listDelNode(server.unblocked_clients,ln);
843         c->flags &= ~CLIENT_UNBLOCKED;
844     }
845 }
846 
/* Release client 'c' together with the resources it references.
 *
 * Two cases return early without releasing anything here:
 *
 * 1) A CLIENT_PROTECTED client is only queued through freeClientAsync().
 * 2) When server.master is set and 'c' is flagged CLIENT_MASTER, with none of
 *    CLIENT_CLOSE_AFTER_REPLY, CLIENT_CLOSE_ASAP, CLIENT_BLOCKED set, the
 *    client is handed to replicationCacheMaster().
 *
 * In every other case the client structure is released with zfree() before
 * returning, so 'c' must not be used afterwards. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Clients flagged CLIENT_MONITOR are searched in server.monitors,
         * all the others in server.slaves. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}
946 
947 /* Schedule a client to free it at a safe time in the serverCron() function.
948  * This function is useful when we need to terminate a client but we are in
949  * a context where calling freeClient() is not possible, because the client
950  * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)951 void freeClientAsync(client *c) {
952     if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
953     c->flags |= CLIENT_CLOSE_ASAP;
954     listAddNodeTail(server.clients_to_close,c);
955 }
956 
freeClientsInAsyncFreeQueue(void)957 void freeClientsInAsyncFreeQueue(void) {
958     while (listLength(server.clients_to_close)) {
959         listNode *ln = listFirst(server.clients_to_close);
960         client *c = listNodeValue(ln);
961 
962         c->flags &= ~CLIENT_CLOSE_ASAP;
963         freeClient(c);
964         listDelNode(server.clients_to_close,ln);
965     }
966 }
967 
968 /* Return a client by ID, or NULL if the client ID is not in the set
969  * of registered clients. Note that "fake clients", created with -1 as FD,
970  * are not registered clients. */
lookupClientByID(uint64_t id)971 client *lookupClientByID(uint64_t id) {
972     id = htonu64(id);
973     client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id));
974     return (c == raxNotFound) ? NULL : c;
975 }
976 
/* Write data in output buffers to client. Return C_OK if the client
 * is still valid after the call, C_ERR if it was freed.
 *
 * 'fd' is the descriptor the data is written to and 'c' is the client owning
 * the output buffers: the static buffer c->buf is sent first, then the
 * blocks queued in the c->reply list. When 'handler_installed' is non-zero
 * the AE_WRITABLE file event for c->fd is deleted once there is nothing
 * left to send.
 *
 * The client is freed (and C_ERR returned) on a write error other than
 * EAGAIN, or when the whole reply was sent and the client is flagged
 * CLIENT_CLOSE_AFTER_REPLY. */
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            /* c->sentlen is the number of bytes of the current buffer
             * that were already written. */
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            /* Stop on error or when nothing was written: the -1 case is
             * examined after the loop. */
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            /* Blocks without any used byte are just dropped. */
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Not an error: the write would have blocked. */
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}
1070 
1071 /* Write event handler. Just send data to the client. */
sendReplyToClient(aeEventLoop * el,int fd,void * privdata,int mask)1072 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1073     UNUSED(el);
1074     UNUSED(mask);
1075     writeToClient(fd,privdata,1);
1076 }
1077 
1078 /* This function is called just before entering the event loop, in the hope
1079  * we can just write the replies to the client output buffer without any
1080  * need to use a syscall in order to install the writable event handler,
1081  * get it called, and so forth. */
handleClientsWithPendingWrites(void)1082 int handleClientsWithPendingWrites(void) {
1083     listIter li;
1084     listNode *ln;
1085     int processed = listLength(server.clients_pending_write);
1086 
1087     listRewind(server.clients_pending_write,&li);
1088     while((ln = listNext(&li))) {
1089         client *c = listNodeValue(ln);
1090         c->flags &= ~CLIENT_PENDING_WRITE;
1091         listDelNode(server.clients_pending_write,ln);
1092 
1093         /* If a client is protected, don't do anything,
1094          * that may trigger write error or recreate handler. */
1095         if (c->flags & CLIENT_PROTECTED) continue;
1096 
1097         /* Try to write buffers to the client socket. */
1098         if (writeToClient(c->fd,c,0) == C_ERR) continue;
1099 
1100         /* If after the synchronous writes above we still have data to
1101          * output to the client, we need to install the writable handler. */
1102         if (clientHasPendingReplies(c)) {
1103             int ae_flags = AE_WRITABLE;
1104             /* For the fsync=always policy, we want that a given FD is never
1105              * served for reading and writing in the same event loop iteration,
1106              * so that in the middle of receiving the query, and serving it
1107              * to the client, we'll call beforeSleep() that will do the
1108              * actual fsync of AOF to disk. AE_BARRIER ensures that. */
1109             if (server.aof_state == AOF_ON &&
1110                 server.aof_fsync == AOF_FSYNC_ALWAYS)
1111             {
1112                 ae_flags |= AE_BARRIER;
1113             }
1114             if (aeCreateFileEvent(server.el, c->fd, ae_flags,
1115                 sendReplyToClient, c) == AE_ERR)
1116             {
1117                     freeClientAsync(c);
1118             }
1119         }
1120     }
1121     return processed;
1122 }
1123 
1124 /* resetClient prepare the client to process the next command */
resetClient(client * c)1125 void resetClient(client *c) {
1126     redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1127 
1128     freeClientArgv(c);
1129     c->reqtype = 0;
1130     c->multibulklen = 0;
1131     c->bulklen = -1;
1132 
1133     /* We clear the ASKING flag as well if we are not inside a MULTI, and
1134      * if what we just executed is not the ASKING command itself. */
1135     if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1136         c->flags &= ~CLIENT_ASKING;
1137 
1138     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1139      * to the next command will be sent, but set the flag if the command
1140      * we just processed was "CLIENT REPLY SKIP". */
1141     c->flags &= ~CLIENT_REPLY_SKIP;
1142     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1143         c->flags |= CLIENT_REPLY_SKIP;
1144         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1145     }
1146 }
1147 
1148 /* This funciton is used when we want to re-enter the event loop but there
1149  * is the risk that the client we are dealing with will be freed in some
1150  * way. This happens for instance in:
1151  *
1152  * * DEBUG RELOAD and similar.
1153  * * When a Lua script is in -BUSY state.
1154  *
1155  * So the function will protect the client by doing two things:
1156  *
1157  * 1) It removes the file events. This way it is not possible that an
1158  *    error is signaled on the socket, freeing the client.
1159  * 2) Moreover it makes sure that if the client is freed in a different code
1160  *    path, it is not really released, but only marked for later release. */
protectClient(client * c)1161 void protectClient(client *c) {
1162     c->flags |= CLIENT_PROTECTED;
1163     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1164     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1165 }
1166 
1167 /* This will undo the client protection done by protectClient() */
unprotectClient(client * c)1168 void unprotectClient(client *c) {
1169     if (c->flags & CLIENT_PROTECTED) {
1170         c->flags &= ~CLIENT_PROTECTED;
1171         aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c);
1172         if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
1173     }
1174 }
1175 
1176 /* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
1177  * this function consumes the client query buffer and creates a command ready
1178  * to be executed inside the client structure. Returns C_OK if the command
1179  * is ready to be executed, or C_ERR if there is still protocol to read to
1180  * have a well formed command. The function also returns C_ERR when there is
1181  * a protocol error: in such a case the client structure is setup to reply
1182  * with the error and close the connection. */
processInlineBuffer(client * c)1183 int processInlineBuffer(client *c) {
1184     char *newline;
1185     int argc, j, linefeed_chars = 1;
1186     sds *argv, aux;
1187     size_t querylen;
1188 
1189     /* Search for end of line */
1190     newline = strchr(c->querybuf+c->qb_pos,'\n');
1191 
1192     /* Nothing to do without a \r\n */
1193     if (newline == NULL) {
1194         if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
1195             addReplyError(c,"Protocol error: too big inline request");
1196             setProtocolError("too big inline request",c);
1197         }
1198         return C_ERR;
1199     }
1200 
1201     /* Handle the \r\n case. */
1202     if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
1203         newline--, linefeed_chars++;
1204 
1205     /* Split the input buffer up to the \r\n */
1206     querylen = newline-(c->querybuf+c->qb_pos);
1207     aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
1208     argv = sdssplitargs(aux,&argc);
1209     sdsfree(aux);
1210     if (argv == NULL) {
1211         addReplyError(c,"Protocol error: unbalanced quotes in request");
1212         setProtocolError("unbalanced quotes in inline request",c);
1213         return C_ERR;
1214     }
1215 
1216     /* Newline from slaves can be used to refresh the last ACK time.
1217      * This is useful for a slave to ping back while loading a big
1218      * RDB file. */
1219     if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
1220         c->repl_ack_time = server.unixtime;
1221 
1222     /* Move querybuffer position to the next query in the buffer. */
1223     c->qb_pos += querylen+linefeed_chars;
1224 
1225     /* Setup argv array on client structure */
1226     if (argc) {
1227         if (c->argv) zfree(c->argv);
1228         c->argv = zmalloc(sizeof(robj*)*argc);
1229     }
1230 
1231     /* Create redis objects for all arguments. */
1232     for (c->argc = 0, j = 0; j < argc; j++) {
1233         c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
1234         c->argc++;
1235     }
1236     zfree(argv);
1237     return C_OK;
1238 }
1239 
1240 /* Helper function. Record protocol erro details in server log,
1241  * and set the client as CLIENT_CLOSE_AFTER_REPLY. */
1242 #define PROTO_DUMP_LEN 128
setProtocolError(const char * errstr,client * c)1243 static void setProtocolError(const char *errstr, client *c) {
1244     if (server.verbosity <= LL_VERBOSE) {
1245         sds client = catClientInfoString(sdsempty(),c);
1246 
1247         /* Sample some protocol to given an idea about what was inside. */
1248         char buf[256];
1249         if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
1250             snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
1251         } else {
1252             snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
1253         }
1254 
1255         /* Remove non printable chars. */
1256         char *p = buf;
1257         while (*p != '\0') {
1258             if (!isprint(*p)) *p = '.';
1259             p++;
1260         }
1261 
1262         /* Log all the client and protocol info. */
1263         serverLog(LL_VERBOSE,
1264             "Protocol error (%s) from client: %s. %s", errstr, client, buf);
1265         sdsfree(client);
1266     }
1267     c->flags |= CLIENT_CLOSE_AFTER_REPLY;
1268 }
1269 
/* Process the query buffer for client 'c', setting up the client argument
 * vector for command execution. Returns C_OK if after running the function
 * the client has a well-formed ready to be processed command, otherwise
 * C_ERR if there is still to read more buffer to get the full command.
 * The function also returns C_ERR when there is a protocol error: in such a
 * case the client structure is setup to reply with the error and close
 * the connection.
 *
 * This function is called if processInputBuffer() detects that the next
 * command is in RESP format, so the first byte in the command is found
 * to be '*'. Otherwise for inline commands processInlineBuffer() is called.
 *
 * The parsing state is kept in the client structure, so the function can be
 * called again when more data arrives: c->multibulklen is the number of
 * arguments still to read (0 means the "*<count>" header was not parsed
 * yet), and c->bulklen is the length of the argument being read (-1 means
 * its "$<len>" header was not parsed yet). */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && server.requirepass && !c->authenticated) {
            /* When a password is configured, clients that are not
             * authenticated may announce at most 10 arguments. */
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        /* Skip the header line, including the trailing \r\n. */
        c->qb_pos = (newline-c->querybuf)+2;

        /* A count <= 0 is consumed and reported as OK with c->argc left
         * to zero. */
        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && server.requirepass && !c->authenticated) {
                /* When a password is configured, clients that are not
                 * authenticated may announce bulks of at most 16384
                 * bytes. */
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            /* Skip the "$<len>" line, including the trailing \r\n. */
            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->qb_pos += c->bulklen+2;
            }
            /* Argument completed: the next one needs its own length. */
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulklen == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
1423 
1424 /* This function is called every time, in the client structure 'c', there is
1425  * more query buffer to process, because we read more data from the socket
1426  * or because a client was blocked and later reactivated, so there could be
1427  * pending query buffer, already representing a full command, to process. */
processInputBuffer(client * c)1428 void processInputBuffer(client *c) {
1429     server.current_client = c;
1430 
1431     /* Keep processing while there is something in the input buffer */
1432     while(c->qb_pos < sdslen(c->querybuf)) {
1433         /* Return if clients are paused. */
1434         if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
1435 
1436         /* Immediately abort if the client is in the middle of something. */
1437         if (c->flags & CLIENT_BLOCKED) break;
1438 
1439         /* Don't process input from the master while there is a busy script
1440          * condition on the slave. We want just to accumulate the replication
1441          * stream (instead of replying -BUSY like we do with other clients) and
1442          * later resume the processing. */
1443         if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
1444 
1445         /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
1446          * written to the client. Make sure to not let the reply grow after
1447          * this flag has been set (i.e. don't process more commands).
1448          *
1449          * The same applies for clients we want to terminate ASAP. */
1450         if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
1451 
1452         /* Determine request type when unknown. */
1453         if (!c->reqtype) {
1454             if (c->querybuf[c->qb_pos] == '*') {
1455                 c->reqtype = PROTO_REQ_MULTIBULK;
1456             } else {
1457                 c->reqtype = PROTO_REQ_INLINE;
1458             }
1459         }
1460 
1461         if (c->reqtype == PROTO_REQ_INLINE) {
1462             if (processInlineBuffer(c) != C_OK) break;
1463         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
1464             if (processMultibulkBuffer(c) != C_OK) break;
1465         } else {
1466             serverPanic("Unknown request type");
1467         }
1468 
1469         /* Multibulk processing could see a <= 0 length. */
1470         if (c->argc == 0) {
1471             resetClient(c);
1472         } else {
1473             /* Only reset the client when the command was executed. */
1474             if (processCommand(c) == C_OK) {
1475                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1476                     /* Update the applied replication offset of our master. */
1477                     c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
1478                 }
1479 
1480                 /* Don't reset the client structure for clients blocked in a
1481                  * module blocking command, so that the reply callback will
1482                  * still be able to access the client argv and argc field.
1483                  * The client will be reset in unblockClientFromModule(). */
1484                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
1485                     resetClient(c);
1486             }
1487             /* freeMemoryIfNeeded may flush slave output buffers. This may
1488              * result into a slave, that may be the active client, to be
1489              * freed. */
1490             if (server.current_client == NULL) break;
1491         }
1492     }
1493 
1494     /* Trim to pos */
1495     if (server.current_client != NULL && c->qb_pos) {
1496         sdsrange(c->querybuf,c->qb_pos,-1);
1497         c->qb_pos = 0;
1498     }
1499 
1500     server.current_client = NULL;
1501 }
1502 
1503 /* This is a wrapper for processInputBuffer that also cares about handling
1504  * the replication forwarding to the sub-slaves, in case the client 'c'
1505  * is flagged as master. Usually you want to call this instead of the
1506  * raw processInputBuffer(). */
processInputBufferAndReplicate(client * c)1507 void processInputBufferAndReplicate(client *c) {
1508     if (!(c->flags & CLIENT_MASTER)) {
1509         processInputBuffer(c);
1510     } else {
1511         size_t prev_offset = c->reploff;
1512         processInputBuffer(c);
1513         size_t applied = c->reploff - prev_offset;
1514         if (applied) {
1515             replicationFeedSlavesFromMasterStream(server.slaves,
1516                     c->pending_querybuf, applied);
1517             sdsrange(c->pending_querybuf,applied,-1);
1518         }
1519     }
1520 }
1521 
/* Readable-event handler of a client socket: read from 'fd' into the query
 * buffer of the client passed as 'privdata', then process the buffer.
 *
 * 'el' and 'mask' are unused. The client is freed when read(2) fails with
 * an error other than EAGAIN, when the peer closed the connection, or when
 * the query buffer grows over server.client_max_querybuf_len. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int want = PROTO_IOBUF_LEN; /* Bytes we ask read(2) for. */
    int got;                    /* Bytes read(2) returned. */
    size_t used;                /* Query buffer length before reading. */
    UNUSED(el);
    UNUSED(mask);

    /* When we are in the middle of a big enough bulk argument of a multi
     * bulk request, try to read no more than what is missing to complete
     * it: this maximizes the chances that the query buffer ends up holding
     * exactly the SDS string of the argument, so that
     * processMultiBulkBuffer() can create the object without copying the
     * buffer. The price is possibly more read(2) calls. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen &&
        c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t missing = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* 'missing' may be zero in some edge case, for instance when a
         * blocked client is resumed after CLIENT PAUSE. */
        if (missing > 0 && missing < want) want = missing;
    }

    used = sdslen(c->querybuf);
    if (c->querybuf_peak < used) c->querybuf_peak = used;
    c->querybuf = sdsMakeRoomFor(c->querybuf, want);
    got = read(fd, c->querybuf+used, want);

    if (got == -1) {
        /* EAGAIN: nothing to do for now. Any other error closes the
         * client. */
        if (errno == EAGAIN) return;
        serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
        freeClient(c);
        return;
    }
    if (got == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (c->flags & CLIENT_MASTER) {
        /* Also append what we just read to the pending (not yet applied)
         * buffer of the master: it is used later to obtain a copy of the
         * stream applied by the last executed command. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+used,got);
    }

    /* Account for the new data. */
    sdsIncrLen(c->querybuf,got);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += got;
    server.stat_net_input_bytes += got;

    /* Close clients whose query buffer went over the configured limit,
     * logging the client info and the first bytes of the buffer. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c);
        sds bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Finally process the buffer. For a master client the callee computes
     * how much of the replication stream was actually applied, comparing
     * the applied offset before and after the processing: that part of
     * the stream is what gets propagated to the sub-slaves and to the
     * replication backlog. */
    processInputBufferAndReplicate(c);
}
1593 
getClientsMaxBuffers(unsigned long * longest_output_list,unsigned long * biggest_input_buffer)1594 void getClientsMaxBuffers(unsigned long *longest_output_list,
1595                           unsigned long *biggest_input_buffer) {
1596     client *c;
1597     listNode *ln;
1598     listIter li;
1599     unsigned long lol = 0, bib = 0;
1600 
1601     listRewind(server.clients,&li);
1602     while ((ln = listNext(&li)) != NULL) {
1603         c = listNodeValue(ln);
1604 
1605         if (listLength(c->reply) > lol) lol = listLength(c->reply);
1606         if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
1607     }
1608     *longest_output_list = lol;
1609     *biggest_input_buffer = bib;
1610 }
1611 
1612 /* A Redis "Peer ID" is a colon separated ip:port pair.
1613  * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
1614  * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
1615  * For Unix sockets we use path:0, like in "/tmp/redis:0".
1616  *
1617  * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including
1618  * the null term.
1619  *
1620  * On failure the function still populates 'peerid' with the "?:0" string
1621  * in case you want to relax error checking or need to display something
1622  * anyway (see anetPeerToString implementation for more info). */
genClientPeerId(client * client,char * peerid,size_t peerid_len)1623 void genClientPeerId(client *client, char *peerid,
1624                             size_t peerid_len) {
1625     if (client->flags & CLIENT_UNIX_SOCKET) {
1626         /* Unix socket client. */
1627         snprintf(peerid,peerid_len,"%s:0",server.unixsocket);
1628     } else {
1629         /* TCP client. */
1630         anetFormatPeer(client->fd,peerid,peerid_len);
1631     }
1632 }
1633 
1634 /* This function returns the client peer id, by creating and caching it
1635  * if client->peerid is NULL, otherwise returning the cached value.
1636  * The Peer ID never changes during the life of the client, however it
1637  * is expensive to compute. */
getClientPeerId(client * c)1638 char *getClientPeerId(client *c) {
1639     char peerid[NET_PEER_ID_LEN];
1640 
1641     if (c->peerid == NULL) {
1642         genClientPeerId(c,peerid,sizeof(peerid));
1643         c->peerid = sdsnew(peerid);
1644     }
1645     return c->peerid;
1646 }
1647 
1648 /* Concatenate a string representing the state of a client in an human
1649  * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)1650 sds catClientInfoString(sds s, client *client) {
1651     char flags[16], events[3], *p;
1652     int emask;
1653 
1654     p = flags;
1655     if (client->flags & CLIENT_SLAVE) {
1656         if (client->flags & CLIENT_MONITOR)
1657             *p++ = 'O';
1658         else
1659             *p++ = 'S';
1660     }
1661     if (client->flags & CLIENT_MASTER) *p++ = 'M';
1662     if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
1663     if (client->flags & CLIENT_MULTI) *p++ = 'x';
1664     if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
1665     if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
1666     if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
1667     if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
1668     if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
1669     if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
1670     if (client->flags & CLIENT_READONLY) *p++ = 'r';
1671     if (p == flags) *p++ = 'N';
1672     *p++ = '\0';
1673 
1674     emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
1675     p = events;
1676     if (emask & AE_READABLE) *p++ = 'r';
1677     if (emask & AE_WRITABLE) *p++ = 'w';
1678     *p = '\0';
1679     return sdscatfmt(s,
1680         "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
1681         (unsigned long long) client->id,
1682         getClientPeerId(client),
1683         client->fd,
1684         client->name ? (char*)client->name->ptr : "",
1685         (long long)(server.unixtime - client->ctime),
1686         (long long)(server.unixtime - client->lastinteraction),
1687         flags,
1688         client->db->id,
1689         (int) dictSize(client->pubsub_channels),
1690         (int) listLength(client->pubsub_patterns),
1691         (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
1692         (unsigned long long) sdslen(client->querybuf),
1693         (unsigned long long) sdsavail(client->querybuf),
1694         (unsigned long long) client->bufpos,
1695         (unsigned long long) listLength(client->reply),
1696         (unsigned long long) getClientOutputBufferMemoryUsage(client),
1697         events,
1698         client->lastcmd ? client->lastcmd->name : "NULL");
1699 }
1700 
getAllClientsInfoString(int type)1701 sds getAllClientsInfoString(int type) {
1702     listNode *ln;
1703     listIter li;
1704     client *client;
1705     sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients));
1706     sdsclear(o);
1707     listRewind(server.clients,&li);
1708     while ((ln = listNext(&li)) != NULL) {
1709         client = listNodeValue(ln);
1710         if (type != -1 && getClientType(client) != type) continue;
1711         o = catClientInfoString(o,client);
1712         o = sdscatlen(o,"\n",1);
1713     }
1714     return o;
1715 }
1716 
/* Implementation of the CLIENT command.
 *
 * The subcommand name in c->argv[1] is matched case-insensitively, together
 * with the expected number of arguments, against: HELP, ID, LIST, REPLY,
 * KILL, UNBLOCK, SETNAME, GETNAME and PAUSE. Everything else gets an error
 * reply suggesting CLIENT HELP.
 *
 * NOTE(review): c->argv[1] is accessed without checking c->argc, so the
 * function relies on the caller to guarantee at least two arguments
 * (presumably enforced by the command arity) -- confirm. */
void clientCommand(client *c) {
    listNode *ln;
    listIter li;
    client *client;

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"id                     -- Return the ID of the current connection.",
"getname                -- Return the name of the current connection.",
"kill <ip:port>         -- Kill connection made from <ip:port>.",
"kill <option> <value> [option value ...] -- Kill connections. Options are:",
"     addr <ip:port>                      -- Kill connection made from <ip:port>",
"     type (normal|master|replica|pubsub) -- Kill connections by type.",
"     skipme (yes|no)   -- Skip killing current connection (default: yes).",
"list [options ...]     -- Return information about client connections. Options:",
"     type (normal|master|replica|pubsub) -- Return clients of specified type.",
"pause <timeout>        -- Suspend all Redis clients for <timeout> milliseconds.",
"reply (on|off|skip)    -- Control the replies sent to the current connection.",
"setname <name>         -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) {
        /* CLIENT ID */
        addReplyLongLong(c,c->id);
    } else if (!strcasecmp(c->argv[1]->ptr,"list")) {
        /* CLIENT LIST [TYPE <type>] */
        int type = -1; /* -1 means: no filtering by type. */
        if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) {
            type = getClientTypeByName(c->argv[3]->ptr);
            if (type == -1) {
                addReplyErrorFormat(c,"Unknown client type '%s'",
                    (char*) c->argv[3]->ptr);
                return;
            }
        } else if (c->argc != 2) {
            addReply(c,shared.syntaxerr);
            return;
        }
        sds o = getAllClientsInfoString(type);
        addReplyBulkCBuffer(c,o,sdslen(o));
        sdsfree(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
        /* CLIENT REPLY ON|OFF|SKIP
         *
         * Only ON queues a reply here: OFF and SKIP just update the
         * client flags. */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            c->flags |= CLIENT_REPLY_OFF;
        } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
            /* SKIP changes nothing when replies are already off. */
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        char *addr = NULL;  /* NULL: don't filter by address. */
        int type = -1;      /* -1: don't filter by type. */
        uint64_t id = 0;    /* 0: don't filter by client ID. */
        int skipme = 1;
        int killed = 0, close_this_client = 0;

        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr> */
            addr = c->argv[2]->ptr;
            skipme = 0; /* With the old form, you can kill yourself. */
        } else if (c->argc > 3) {
            int i = 2; /* Next option index. */

            /* New style syntax: parse options. Every option takes exactly
             * one value, so we advance two arguments at a time. */
            while(i < c->argc) {
                int moreargs = c->argc > i+1;

                if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
                    long long tmp;

                    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL)
                        != C_OK) return;
                    id = tmp;
                } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
                    type = getClientTypeByName(c->argv[i+1]->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
                    addr = c->argv[i+1]->ptr;
                } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
                    if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
                        skipme = 1;
                    } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
                        skipme = 0;
                    } else {
                        addReply(c,shared.syntaxerr);
                        return;
                    }
                } else {
                    /* Unknown option, or option without its value. */
                    addReply(c,shared.syntaxerr);
                    return;
                }
                i += 2;
            }
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }

        /* Iterate clients killing all the matching clients. */
        listRewind(server.clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            client = listNodeValue(ln);
            if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
            if (type != -1 && getClientType(client) != type) continue;
            if (id != 0 && client->id != id) continue;
            if (c == client && skipme) continue;

            /* Kill it. The calling client is not freed here: it is only
             * marked, so that the reply can still be queued below. */
            if (c == client) {
                close_this_client = 1;
            } else {
                freeClient(client);
            }
            killed++;
        }

        /* Reply according to old/new format. */
        if (c->argc == 3) {
            if (killed == 0)
                addReplyError(c,"No such client");
            else
                addReply(c,shared.ok);
        } else {
            addReplyLongLong(c,killed);
        }

        /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
         * only after we queued the reply to its output buffers. */
        if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 ||
                                                          c->argc == 4))
    {
        /* CLIENT UNBLOCK <id> [timeout|error] */
        long long id;
        int unblock_error = 0; /* Default reason is TIMEOUT. */

        if (c->argc == 4) {
            if (!strcasecmp(c->argv[3]->ptr,"timeout")) {
                unblock_error = 0;
            } else if (!strcasecmp(c->argv[3]->ptr,"error")) {
                unblock_error = 1;
            } else {
                addReplyError(c,
                    "CLIENT UNBLOCK reason should be TIMEOUT or ERROR");
                return;
            }
        }
        if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
            != C_OK) return;
        struct client *target = lookupClientByID(id);
        if (target && target->flags & CLIENT_BLOCKED) {
            if (unblock_error)
                addReplyError(target,
                    "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
            else
                replyToBlockedClientTimedOut(target);
            unblockClient(target);
            addReply(c,shared.cone);
        } else {
            /* No such client, or the client is not blocked. */
            addReply(c,shared.czero);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
        /* CLIENT SETNAME <name> */
        int j, len = sdslen(c->argv[2]->ptr);
        char *p = c->argv[2]->ptr;

        /* Setting the client name to an empty string actually removes
         * the current name. */
        if (len == 0) {
            if (c->name) decrRefCount(c->name);
            c->name = NULL;
            addReply(c,shared.ok);
            return;
        }

        /* Otherwise check if the charset is ok. We need to do this otherwise
         * CLIENT LIST format will break. You should always be able to
         * split by space to get the different fields. */
        for (j = 0; j < len; j++) {
            if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
                addReplyError(c,
                    "Client names cannot contain spaces, "
                    "newlines or special characters.");
                return;
            }
        }
        /* The name shares the argument object: take a reference to it. */
        if (c->name) decrRefCount(c->name);
        c->name = c->argv[2];
        incrRefCount(c->name);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
        /* CLIENT GETNAME */
        if (c->name)
            addReplyBulk(c,c->name);
        else
            addReply(c,shared.nullbulk);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
        /* CLIENT PAUSE <timeout>: the timeout is parsed as milliseconds. */
        long long duration;

        if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
                                        != C_OK) return;
        pauseClients(duration);
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr);
    }
}
1936 
1937 /* This callback is bound to POST and "Host:" command names. Those are not
1938  * really commands, but are used in security attacks in order to talk to
1939  * Redis instances via HTTP, with a technique called "cross protocol scripting"
1940  * which exploits the fact that services like Redis will discard invalid
1941  * HTTP headers and will process what follows.
1942  *
1943  * As a protection against this attack, Redis will terminate the connection
1944  * when a POST or "Host:" header is seen, and will log the event from
1945  * time to time (to avoid creating a DOS as a result of too many logs). */
securityWarningCommand(client * c)1946 void securityWarningCommand(client *c) {
1947     static time_t logged_time;
1948     time_t now = time(NULL);
1949 
1950     if (labs(now-logged_time) > 60) {
1951         serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted.");
1952         logged_time = now;
1953     }
1954     freeClientAsync(c);
1955 }
1956 
1957 /* Rewrite the command vector of the client. All the new objects ref count
1958  * is incremented. The old command vector is freed, and the old objects
1959  * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)1960 void rewriteClientCommandVector(client *c, int argc, ...) {
1961     va_list ap;
1962     int j;
1963     robj **argv; /* The new argument vector */
1964 
1965     argv = zmalloc(sizeof(robj*)*argc);
1966     va_start(ap,argc);
1967     for (j = 0; j < argc; j++) {
1968         robj *a;
1969 
1970         a = va_arg(ap, robj*);
1971         argv[j] = a;
1972         incrRefCount(a);
1973     }
1974     /* We free the objects in the original vector at the end, so we are
1975      * sure that if the same objects are reused in the new vector the
1976      * refcount gets incremented before it gets decremented. */
1977     for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
1978     zfree(c->argv);
1979     /* Replace argv and argc with our new versions. */
1980     c->argv = argv;
1981     c->argc = argc;
1982     c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1983     serverAssertWithInfo(c,NULL,c->cmd != NULL);
1984     va_end(ap);
1985 }
1986 
1987 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)1988 void replaceClientCommandVector(client *c, int argc, robj **argv) {
1989     freeClientArgv(c);
1990     zfree(c->argv);
1991     c->argv = argv;
1992     c->argc = argc;
1993     c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1994     serverAssertWithInfo(c,NULL,c->cmd != NULL);
1995 }
1996 
1997 /* Rewrite a single item in the command vector.
1998  * The new val ref count is incremented, and the old decremented.
1999  *
2000  * It is possible to specify an argument over the current size of the
2001  * argument vector: in this case the array of objects gets reallocated
2002  * and c->argc set to the max value. However it's up to the caller to
2003  *
2004  * 1. Make sure there are no "holes" and all the arguments are set.
2005  * 2. If the original argument vector was longer than the one we
2006  *    want to end with, it's up to the caller to set c->argc and
2007  *    free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)2008 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
2009     robj *oldval;
2010 
2011     if (i >= c->argc) {
2012         c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
2013         c->argc = i+1;
2014         c->argv[i] = NULL;
2015     }
2016     oldval = c->argv[i];
2017     c->argv[i] = newval;
2018     incrRefCount(newval);
2019     if (oldval) decrRefCount(oldval);
2020 
2021     /* If this is the command name make sure to fix c->cmd. */
2022     if (i == 0) {
2023         c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
2024         serverAssertWithInfo(c,NULL,c->cmd != NULL);
2025     }
2026 }
2027 
2028 /* This function returns the number of bytes that Redis is
2029  * using to store the reply still not read by the client.
2030  *
2031  * Note: this function is very fast so can be called as many time as
2032  * the caller wishes. The main usage of this function currently is
2033  * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)2034 unsigned long getClientOutputBufferMemoryUsage(client *c) {
2035     unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
2036     return c->reply_bytes + (list_item_size*listLength(c->reply));
2037 }
2038 
2039 /* Get the class of a client, used in order to enforce limits to different
2040  * classes of clients.
2041  *
2042  * The function will return one of the following:
2043  * CLIENT_TYPE_NORMAL -> Normal client
2044  * CLIENT_TYPE_SLAVE  -> Slave
2045  * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
2046  * CLIENT_TYPE_MASTER -> The client representing our replication master.
2047  */
getClientType(client * c)2048 int getClientType(client *c) {
2049     if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
2050     /* Even though MONITOR clients are marked as replicas, we
2051      * want the expose them as normal clients. */
2052     if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
2053         return CLIENT_TYPE_SLAVE;
2054     if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
2055     return CLIENT_TYPE_NORMAL;
2056 }
2057 
getClientTypeByName(char * name)2058 int getClientTypeByName(char *name) {
2059     if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
2060     else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
2061     else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
2062     else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
2063     else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
2064     else return -1;
2065 }
2066 
getClientTypeName(int class)2067 char *getClientTypeName(int class) {
2068     switch(class) {
2069     case CLIENT_TYPE_NORMAL: return "normal";
2070     case CLIENT_TYPE_SLAVE:  return "slave";
2071     case CLIENT_TYPE_PUBSUB: return "pubsub";
2072     case CLIENT_TYPE_MASTER: return "master";
2073     default:                       return NULL;
2074     }
2075 }
2076 
2077 /* The function checks if the client reached output buffer soft or hard
2078  * limit, and also update the state needed to check the soft limit as
2079  * a side effect.
2080  *
2081  * Return value: non-zero if the client reached the soft or the hard limit.
2082  *               Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)2083 int checkClientOutputBufferLimits(client *c) {
2084     int soft = 0, hard = 0, class;
2085     unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
2086 
2087     class = getClientType(c);
2088     /* For the purpose of output buffer limiting, masters are handled
2089      * like normal clients. */
2090     if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
2091 
2092     if (server.client_obuf_limits[class].hard_limit_bytes &&
2093         used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
2094         hard = 1;
2095     if (server.client_obuf_limits[class].soft_limit_bytes &&
2096         used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
2097         soft = 1;
2098 
2099     /* We need to check if the soft limit is reached continuously for the
2100      * specified amount of seconds. */
2101     if (soft) {
2102         if (c->obuf_soft_limit_reached_time == 0) {
2103             c->obuf_soft_limit_reached_time = server.unixtime;
2104             soft = 0; /* First time we see the soft limit reached */
2105         } else {
2106             time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
2107 
2108             if (elapsed <=
2109                 server.client_obuf_limits[class].soft_limit_seconds) {
2110                 soft = 0; /* The client still did not reached the max number of
2111                              seconds for the soft limit to be considered
2112                              reached. */
2113             }
2114         }
2115     } else {
2116         c->obuf_soft_limit_reached_time = 0;
2117     }
2118     return soft || hard;
2119 }
2120 
2121 /* Asynchronously close a client if soft or hard limit is reached on the
2122  * output buffer size. The caller can check if the client will be closed
2123  * checking if the client CLIENT_CLOSE_ASAP flag is set.
2124  *
2125  * Note: we need to close the client asynchronously because this function is
2126  * called from contexts where the client can't be freed safely, i.e. from the
2127  * lower level functions pushing data inside the client output buffers. */
asyncCloseClientOnOutputBufferLimitReached(client * c)2128 void asyncCloseClientOnOutputBufferLimitReached(client *c) {
2129     if (c->fd == -1) return; /* It is unsafe to free fake clients. */
2130     serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
2131     if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
2132     if (checkClientOutputBufferLimits(c)) {
2133         sds client = catClientInfoString(sdsempty(),c);
2134 
2135         freeClientAsync(c);
2136         serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
2137         sdsfree(client);
2138     }
2139 }
2140 
2141 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
2142  * output buffers without returning control to the event loop.
2143  * This is also called by SHUTDOWN for a best-effort attempt to send
2144  * slaves the latest writes. */
flushSlavesOutputBuffers(void)2145 void flushSlavesOutputBuffers(void) {
2146     listIter li;
2147     listNode *ln;
2148 
2149     listRewind(server.slaves,&li);
2150     while((ln = listNext(&li))) {
2151         client *slave = listNodeValue(ln);
2152         int events = aeGetFileEvents(server.el,slave->fd);
2153         int can_receive_writes = (events & AE_WRITABLE) ||
2154                                  (slave->flags & CLIENT_PENDING_WRITE);
2155 
2156         /* We don't want to send the pending data to the replica in a few
2157          * cases:
2158          *
2159          * 1. For some reason there is neither the write handler installed
2160          *    nor the client is flagged as to have pending writes: for some
2161          *    reason this replica may not be set to receive data. This is
2162          *    just for the sake of defensive programming.
2163          *
2164          * 2. The put_online_on_ack flag is true. To know why we don't want
2165          *    to send data to the replica in this case, please grep for the
2166          *    flag for this flag.
2167          *
2168          * 3. Obviously if the slave is not ONLINE.
2169          */
2170         if (slave->replstate == SLAVE_STATE_ONLINE &&
2171             can_receive_writes &&
2172             !slave->repl_put_online_on_ack &&
2173             clientHasPendingReplies(slave))
2174         {
2175             writeToClient(slave->fd,slave,0);
2176         }
2177     }
2178 }
2179 
2180 /* Pause clients up to the specified unixtime (in ms). While clients
2181  * are paused no command is processed from clients, so the data set can't
2182  * change during that time.
2183  *
2184  * However while this function pauses normal and Pub/Sub clients, slaves are
2185  * still served, so this function can be used on server upgrades where it is
2186  * required that slaves process the latest bytes from the replication stream
2187  * before being turned to masters.
2188  *
2189  * This function is also internally used by Redis Cluster for the manual
2190  * failover procedure implemented by CLUSTER FAILOVER.
2191  *
2192  * The function always succeed, even if there is already a pause in progress.
2193  * In such a case, the pause is extended if the duration is more than the
2194  * time left for the previous duration. However if the duration is smaller
2195  * than the time left for the previous pause, no change is made to the
2196  * left duration. */
pauseClients(mstime_t end)2197 void pauseClients(mstime_t end) {
2198     if (!server.clients_paused || end > server.clients_pause_end_time)
2199         server.clients_pause_end_time = end;
2200     server.clients_paused = 1;
2201 }
2202 
2203 /* Return non-zero if clients are currently paused. As a side effect the
2204  * function checks if the pause time was reached and clear it. */
clientsArePaused(void)2205 int clientsArePaused(void) {
2206     if (server.clients_paused &&
2207         server.clients_pause_end_time < server.mstime)
2208     {
2209         listNode *ln;
2210         listIter li;
2211         client *c;
2212 
2213         server.clients_paused = 0;
2214 
2215         /* Put all the clients in the unblocked clients queue in order to
2216          * force the re-processing of the input buffer if any. */
2217         listRewind(server.clients,&li);
2218         while ((ln = listNext(&li)) != NULL) {
2219             c = listNodeValue(ln);
2220 
2221             /* Don't touch slaves and blocked clients.
2222              * The latter pending requests will be processed when unblocked. */
2223             if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
2224             queueClientForReprocessing(c);
2225         }
2226     }
2227     return server.clients_paused;
2228 }
2229 
2230 /* This function is called by Redis in order to process a few events from
2231  * time to time while blocked into some not interruptible operation.
2232  * This allows to reply to clients with the -LOADING error while loading the
2233  * data set at startup or after a full resynchronization with the master
2234  * and so forth.
2235  *
2236  * It calls the event loop in order to process a few events. Specifically we
2237  * try to call the event loop 4 times as long as we receive acknowledge that
2238  * some event was processed, in order to go forward with the accept, read,
2239  * write, close sequence needed to serve a client.
2240  *
2241  * The function returns the total number of events processed. */
processEventsWhileBlocked(void)2242 int processEventsWhileBlocked(void) {
2243     int iterations = 4; /* See the function top-comment. */
2244     int count = 0;
2245     while (iterations--) {
2246         int events = 0;
2247         events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
2248         events += handleClientsWithPendingWrites();
2249         if (!events) break;
2250         count += events;
2251     }
2252     return count;
2253 }
2254