1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "atomicvar.h"
32 #include "cluster.h"
33 #include <sys/socket.h>
34 #include <sys/uio.h>
35 #include <math.h>
36 #include <ctype.h>
37 
/* Forward declarations. setProtocolError() is static, so it is defined later
 * in this file; postponeClientRead() is declared here so it can be used
 * before its definition (NOTE(review): presumably defined further down in
 * this file too — confirm). */
static void setProtocolError(const char *errstr, client *c);
int postponeClientRead(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
41 
42 /* Return the size consumed from the allocator, for the specified SDS string,
43  * including internal fragmentation. This function is used in order to compute
44  * the client output buffer size. */
sdsZmallocSize(sds s)45 size_t sdsZmallocSize(sds s) {
46     void *sh = sdsAllocPtr(s);
47     return zmalloc_size(sh);
48 }
49 
50 /* Return the amount of memory used by the sds string at object->ptr
51  * for a string object. This includes internal fragmentation. */
getStringObjectSdsUsedMemory(robj * o)52 size_t getStringObjectSdsUsedMemory(robj *o) {
53     serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
54     switch(o->encoding) {
55     case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
56     case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
57     default: return 0; /* Just integer encoding for now. */
58     }
59 }
60 
61 /* Return the length of a string object.
62  * This does NOT includes internal fragmentation or sds unused space. */
getStringObjectLen(robj * o)63 size_t getStringObjectLen(robj *o) {
64     serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
65     switch(o->encoding) {
66     case OBJ_ENCODING_RAW: return sdslen(o->ptr);
67     case OBJ_ENCODING_EMBSTR: return sdslen(o->ptr);
68     default: return 0; /* Just integer encoding for now. */
69     }
70 }
71 
72 /* Client.reply list dup and free methods. */
dupClientReplyValue(void * o)73 void *dupClientReplyValue(void *o) {
74     clientReplyBlock *old = o;
75     clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
76     memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
77     return buf;
78 }
79 
/* Client.reply list 'free' method: release one clientReplyBlock. */
void freeClientReplyValue(void *o) {
    zfree(o);
}
83 
listMatchObjects(void * a,void * b)84 int listMatchObjects(void *a, void *b) {
85     return equalStringObjects(a,b);
86 }
87 
88 /* This function links the client to the global linked list of clients.
89  * unlinkClient() does the opposite, among other things. */
linkClient(client * c)90 void linkClient(client *c) {
91     listAddNodeTail(server.clients,c);
92     /* Note that we remember the linked list node where the client is stored,
93      * this way removing the client in unlinkClient() will not require
94      * a linear scan, but just a constant time operation. */
95     c->client_list_node = listLast(server.clients);
96     uint64_t id = htonu64(c->id);
97     raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
98 }
99 
100 /* Initialize client authentication state.
101  */
clientSetDefaultAuth(client * c)102 static void clientSetDefaultAuth(client *c) {
103     /* If the default user does not require authentication, the user is
104      * directly authenticated. */
105     c->user = DefaultUser;
106     c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
107                        !(c->user->flags & USER_FLAG_DISABLED);
108 }
109 
/* Return nonzero when client 'c' still has to authenticate: it is not
 * authenticated yet and the default user is not an active "nopass" user. */
int authRequired(client *c) {
    if (c->authenticated) return 0;
    int default_user_open = (DefaultUser->flags & USER_FLAG_NOPASS) &&
                            !(DefaultUser->flags & USER_FLAG_DISABLED);
    return !default_user_open;
}
118 
/* Allocate and initialize a new client.
 *
 * When 'conn' is non-NULL the connection is configured (TCP_NODELAY,
 * keepalive when server.tcpkeepalive is set, readQueryFromClient() as read
 * handler, the client itself as private data) and the client is linked into
 * the global client list with linkClient(). When 'conn' is NULL a client
 * without a connection is created and it is NOT linked.
 *
 * Every client gets an ID from server.next_client_id, starts on DB 0 with
 * RESP2, and is authenticated according to the default user (see
 * clientSetDefaultAuth()). Returns the new client. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }

    selectDb(c,0);
    uint64_t client_id;
    /* Fetch-and-increment the global ID counter via atomicGetIncr(). */
    atomicGetIncr(server.next_client_id, client_id, 1);
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    c->bufpos = 0;
    /* The static reply buffer 'buf' extends to the end of the allocation, so
     * its capacity is the allocator's usable size minus the offset of 'buf'
     * (see the sanitizer note on _addReplyToBuffer()). */
    c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
    c->ref_repl_buf_node = NULL;
    c->ref_block_pos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->argv_len = 0;
    c->argv_len_sum = 0;
    c->original_argc = 0;
    c->original_argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    clientSetDefaultAuth(c);
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->repl_last_partial_write = 0;
    c->slave_listening_port = 0;
    c->slave_addr = NULL;
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    /* Reply list values are clientReplyBlock allocations (or NULL deferred
     * length placeholders, see addReplyDeferredLen()). */
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->sockname = NULL;
    c->client_list_node = NULL;
    c->paused_list_node = NULL;
    c->pending_read_list_node = NULL;
    c->client_tracking_redirection = 0;
    c->client_tracking_prefixes = NULL;
    c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
    c->last_memory_type = CLIENT_TYPE_NORMAL;
    c->auth_callback = NULL;
    c->auth_callback_privdata = NULL;
    c->auth_module = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    c->mem_usage_bucket = NULL;
    c->mem_usage_bucket_node = NULL;
    /* Only connected clients are registered in the global client list. */
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
211 
212 /* This function puts the client in the queue of clients that should write
213  * their output buffers to the socket. Note that it does not *yet* install
214  * the write handler, to start clients are put in a queue of clients that need
215  * to write, so we try to do that before returning in the event loop (see the
216  * handleClientsWithPendingWrites() function).
217  * If we fail and there is more data to write, compared to what the socket
218  * buffers can hold, then we'll really install the handler. */
clientInstallWriteHandler(client * c)219 void clientInstallWriteHandler(client *c) {
220     /* Schedule the client to write the output buffers to the socket only
221      * if not already done and, for slaves, if the slave can actually receive
222      * writes at this stage. */
223     if (!(c->flags & CLIENT_PENDING_WRITE) &&
224         (c->replstate == REPL_STATE_NONE ||
225          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
226     {
227         /* Here instead of installing the write handler, we just flag the
228          * client and put it into a list of clients that have something
229          * to write to the socket. This way before re-entering the event
230          * loop, we can try to directly write to the client sockets avoiding
231          * a system call. We'll only really install the write handler if
232          * we'll not be able to write the whole reply at once. */
233         c->flags |= CLIENT_PENDING_WRITE;
234         listAddNodeHead(server.clients_pending_write,c);
235     }
236 }
237 
238 /* This function is called every time we are going to transmit new data
239  * to the client. The behavior is the following:
240  *
241  * If the client should receive new data (normal clients will) the function
242  * returns C_OK, and make sure to install the write handler in our event
243  * loop so that when the socket is writable new data gets written.
244  *
245  * If the client should not receive new data, because it is a fake client
246  * (used to load AOF in memory), a master or because the setup of the write
247  * handler failed, the function returns C_ERR.
248  *
249  * The function may return C_OK without actually installing the write
250  * event handler in the following cases:
251  *
252  * 1) The event handler should already be installed since the output buffer
253  *    already contains something.
254  * 2) The client is a slave but not yet online, so we want to just accumulate
255  *    writes in the buffer but not actually sending them yet.
256  *
257  * Typically gets called every time a reply is built, before adding more
258  * data to the clients output buffers. If the function returns C_ERR no
259  * data should be appended to the output buffers. */
prepareClientToWrite(client * c)260 int prepareClientToWrite(client *c) {
261     /* If it's the Lua client we always return ok without installing any
262      * handler since there is no socket at all. */
263     if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
264 
265     /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
266     if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
267 
268     /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
269     if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
270 
271     /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
272      * is set. */
273     if ((c->flags & CLIENT_MASTER) &&
274         !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
275 
276     if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
277 
278     /* Schedule the client to write the output buffers to the socket, unless
279      * it should already be setup to do so (it has already pending data).
280      *
281      * If CLIENT_PENDING_READ is set, we're in an IO thread and should
282      * not install a write handler. Instead, it will be done by
283      * handleClientsWithPendingReadsUsingThreads() upon return.
284      */
285     if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
286             clientInstallWriteHandler(c);
287 
288     /* Authorize the caller to queue in the output buffer of this client. */
289     return C_OK;
290 }
291 
292 /* -----------------------------------------------------------------------------
293  * Low level functions to add more data to output buffers.
294  * -------------------------------------------------------------------------- */
295 
296 /* Attempts to add the reply to the static buffer in the client struct.
297  * Returns the length of data that is added to the reply buffer.
298  *
299  * Sanitizer suppression: client->buf_usable_size determined by
300  * zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses
301  * sanitizer and generates a false positive out-of-bounds error */
302 REDIS_NO_SANITIZE("bounds")
_addReplyToBuffer(client * c,const char * s,size_t len)303 size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
304     size_t available = c->buf_usable_size - c->bufpos;
305 
306     /* If there already are entries in the reply list, we cannot
307      * add anything more to the static buffer. */
308     if (listLength(c->reply) > 0) return 0;
309 
310     size_t reply_len = len > available ? available : len;
311     memcpy(c->buf+c->bufpos,s,reply_len);
312     c->bufpos+=reply_len;
313     return reply_len;
314 }
315 
316 /* Adds the reply to the reply linked list.
317  * Note: some edits to this function need to be relayed to AddReplyFromClient. */
_addReplyProtoToList(client * c,const char * s,size_t len)318 void _addReplyProtoToList(client *c, const char *s, size_t len) {
319     listNode *ln = listLast(c->reply);
320     clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
321 
322     /* Note that 'tail' may be NULL even if we have a tail node, because when
323      * addReplyDeferredLen() is used, it sets a dummy node to NULL just
324      * to fill it later, when the size of the bulk length is set. */
325 
326     /* Append to tail string when possible. */
327     if (tail) {
328         /* Copy the part we can fit into the tail, and leave the rest for a
329          * new node */
330         size_t avail = tail->size - tail->used;
331         size_t copy = avail >= len? len: avail;
332         memcpy(tail->buf + tail->used, s, copy);
333         tail->used += copy;
334         s += copy;
335         len -= copy;
336     }
337     if (len) {
338         /* Create a new node, make sure it is allocated to at
339          * least PROTO_REPLY_CHUNK_BYTES */
340         size_t usable_size;
341         size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
342         tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
343         /* take over the allocation's internal fragmentation */
344         tail->size = usable_size - sizeof(clientReplyBlock);
345         tail->used = len;
346         memcpy(tail->buf, s, len);
347         listAddNodeTail(c->reply, tail);
348         c->reply_bytes += tail->size;
349 
350         closeClientOnOutputBufferLimitReached(c, 1);
351     }
352 }
353 
/* Queue 'len' bytes from 's' on the client's output. The static buffer is
 * filled first and the rest spills into the reply list. Nothing is queued
 * once the client is flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    size_t in_buf = _addReplyToBuffer(c,s,len);
    size_t rest = len - in_buf;
    if (rest > 0) _addReplyProtoToList(c,s+in_buf,rest);
}
360 
361 /* -----------------------------------------------------------------------------
362  * Higher level functions to queue data on the client output buffer.
363  * The following functions are the ones that commands implementations will call.
364  * -------------------------------------------------------------------------- */
365 
366 /* Add the object 'obj' string representation to the client output buffer. */
addReply(client * c,robj * obj)367 void addReply(client *c, robj *obj) {
368     if (prepareClientToWrite(c) != C_OK) return;
369 
370     if (sdsEncodedObject(obj)) {
371         _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
372     } else if (obj->encoding == OBJ_ENCODING_INT) {
373         /* For integer encoded strings we just convert it into a string
374          * using our optimized function, and attach the resulting string
375          * to the output buffer. */
376         char buf[32];
377         size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
378         _addReplyToBufferOrList(c,buf,len);
379     } else {
380         serverPanic("Wrong obj->encoding in addReply()");
381     }
382 }
383 
384 /* Add the SDS 's' string to the client output buffer, as a side effect
385  * the SDS string is freed. */
addReplySds(client * c,sds s)386 void addReplySds(client *c, sds s) {
387     if (prepareClientToWrite(c) != C_OK) {
388         /* The caller expects the sds to be free'd. */
389         sdsfree(s);
390         return;
391     }
392     _addReplyToBufferOrList(c,s,sdslen(s));
393     sdsfree(s);
394 }
395 
396 /* This low level function just adds whatever protocol you send it to the
397  * client buffer, trying the static buffer initially, and using the string
398  * of objects if not possible.
399  *
400  * It is efficient because does not create an SDS object nor an Redis object
401  * if not needed. The object will only be created by calling
402  * _addReplyProtoToList() if we fail to extend the existing tail object
403  * in the list of objects. */
addReplyProto(client * c,const char * s,size_t len)404 void addReplyProto(client *c, const char *s, size_t len) {
405     if (prepareClientToWrite(c) != C_OK) return;
406     _addReplyToBufferOrList(c,s,len);
407 }
408 
409 /* Low level function called by the addReplyError...() functions.
410  * It emits the protocol for a Redis error, in the form:
411  *
412  * -ERRORCODE Error Message<CR><LF>
413  *
414  * If the error code is already passed in the string 's', the error
415  * code provided is used, otherwise the string "-ERR " for the generic
416  * error code is automatically added.
417  * Note that 's' must NOT end with \r\n. */
addReplyErrorLength(client * c,const char * s,size_t len)418 void addReplyErrorLength(client *c, const char *s, size_t len) {
419     /* If the string already starts with "-..." then the error code
420      * is provided by the caller. Otherwise we use "-ERR". */
421     if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
422     addReplyProto(c,s,len);
423     addReplyProto(c,"\r\n",2);
424 }
425 
426 /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
afterErrorReply(client * c,const char * s,size_t len)427 void afterErrorReply(client *c, const char *s, size_t len) {
428     /* Increment the global error counter */
429     server.stat_total_error_replies++;
430     /* Increment the error stats
431      * If the string already starts with "-..." then the error prefix
432      * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
433     if (s[0] != '-') {
434         incrementErrorCount("ERR", 3);
435     } else {
436         char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
437         if (spaceloc) {
438             const size_t errEndPos = (size_t)(spaceloc - s);
439             incrementErrorCount(s+1, errEndPos-1);
440         } else {
441             /* Fallback to ERR if we can't retrieve the error prefix */
442             incrementErrorCount("ERR", 3);
443         }
444     }
445 
446     /* Sometimes it could be normal that a slave replies to a master with
447      * an error and this function gets called. Actually the error will never
448      * be sent because addReply*() against master clients has no effect...
449      * A notable example is:
450      *
451      *    EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
452      *
453      * Where the master must propagate the first change even if the second
454      * will produce an error. However it is useful to log such events since
455      * they are rare and may hint at errors in a script or a bug in Redis. */
456     int ctype = getClientType(c);
457     if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
458         char *to, *from;
459 
460         if (c->id == CLIENT_ID_AOF) {
461             to = "AOF-loading-client";
462             from = "server";
463         } else if (ctype == CLIENT_TYPE_MASTER) {
464             to = "master";
465             from = "replica";
466         } else {
467             to = "replica";
468             from = "master";
469         }
470 
471         if (len > 4096) len = 4096;
472         char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
473         serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
474                              "to its %s: '%.*s' after processing the command "
475                              "'%s'", from, to, (int)len, s, cmdname);
476         if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
477             server.repl_backlog->histlen > 0)
478         {
479             showLatestBacklog();
480         }
481         server.stat_unexpected_error_replies++;
482     }
483 }
484 
485 /* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
486  * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
addReplyErrorObject(client * c,robj * err)487 void addReplyErrorObject(client *c, robj *err) {
488     addReply(c, err);
489     afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */
490 }
491 
492 /* Sends either a reply or an error reply by checking the first char.
493  * If the first char is '-' the reply is considered an error.
494  * In any case the given reply is sent, if the reply is also recognize
495  * as an error we also perform some post reply operations such as
496  * logging and stats update. */
addReplyOrErrorObject(client * c,robj * reply)497 void addReplyOrErrorObject(client *c, robj *reply) {
498     serverAssert(sdsEncodedObject(reply));
499     sds rep = reply->ptr;
500     if (sdslen(rep) > 1 && rep[0] == '-') {
501         addReplyErrorObject(c, reply);
502     } else {
503         addReply(c, reply);
504     }
505 }
506 
507 /* See addReplyErrorLength for expectations from the input string. */
addReplyError(client * c,const char * err)508 void addReplyError(client *c, const char *err) {
509     addReplyErrorLength(c,err,strlen(err));
510     afterErrorReply(c,err,strlen(err));
511 }
512 
513 /* See addReplyErrorLength for expectations from the input string. */
514 /* As a side effect the SDS string is freed. */
addReplyErrorSds(client * c,sds err)515 void addReplyErrorSds(client *c, sds err) {
516     addReplyErrorLength(c,err,sdslen(err));
517     afterErrorReply(c,err,sdslen(err));
518     sdsfree(err);
519 }
520 
521 /* See addReplyErrorLength for expectations from the formatted string.
522  * The formatted string is safe to contain \r and \n anywhere. */
addReplyErrorFormat(client * c,const char * fmt,...)523 void addReplyErrorFormat(client *c, const char *fmt, ...) {
524     va_list ap;
525     va_start(ap,fmt);
526     sds s = sdscatvprintf(sdsempty(),fmt,ap);
527     va_end(ap);
528     /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
529     s = sdstrim(s, "\r\n");
530     /* Make sure there are no newlines in the middle of the string, otherwise
531      * invalid protocol is emitted. */
532     s = sdsmapchars(s, "\r\n", "  ",  2);
533     addReplyErrorLength(c,s,sdslen(s));
534     afterErrorReply(c,s,sdslen(s));
535     sdsfree(s);
536 }
537 
/* Emit a status (simple string) reply: "+", then the 'len' bytes of 's',
 * then "\r\n". 's' is copied verbatim, with no escaping. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    addReplyProto(c,"+",1);     /* status reply marker */
    addReplyProto(c,s,len);
    addReplyProto(c,"\r\n",2);  /* terminator */
}
543 
/* Status reply built from the NUL-terminated string 'status'. */
void addReplyStatus(client *c, const char *status) {
    size_t len = strlen(status);
    addReplyStatusLength(c,status,len);
}
547 
/* printf-style variant of addReplyStatus().
 *
 * A status reply is terminated by the first \r\n. Any \r or \n in the
 * formatted text is therefore replaced with a space. Otherwise the text
 * (which may include caller- or user-supplied arguments) could end the
 * reply early and inject arbitrary protocol. addReplyErrorFormat() applies
 * the same sanitization to error replies. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);
    s = sdsmapchars(s, "\r\n", "  ",  2);
    addReplyStatusLength(c,s,sdslen(s));
    sdsfree(s);
}
556 
557 /* Sometimes we are forced to create a new reply node, and we can't append to
558  * the previous one, when that happens, we wanna try to trim the unused space
559  * at the end of the last reply node which we won't use anymore. */
trimReplyUnusedTailSpace(client * c)560 void trimReplyUnusedTailSpace(client *c) {
561     listNode *ln = listLast(c->reply);
562     clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
563 
564     /* Note that 'tail' may be NULL even if we have a tail node, because when
565      * addReplyDeferredLen() is used */
566     if (!tail) return;
567 
568     /* We only try to trim the space is relatively high (more than a 1/4 of the
569      * allocation), otherwise there's a high chance realloc will NOP.
570      * Also, to avoid large memmove which happens as part of realloc, we only do
571      * that if the used part is small.  */
572     if (tail->size - tail->used > tail->size / 4 &&
573         tail->used < PROTO_REPLY_CHUNK_BYTES)
574     {
575         size_t old_size = tail->size;
576         tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
577         /* take over the allocation's internal fragmentation (at least for
578          * memory usage tracking) */
579         tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
580         c->reply_bytes = c->reply_bytes + tail->size - old_size;
581         listNodeValue(ln) = tail;
582     }
583 }
584 
585 /* Adds an empty object to the reply list that will contain the multi bulk
586  * length, which is not known when this function is called. */
addReplyDeferredLen(client * c)587 void *addReplyDeferredLen(client *c) {
588     /* Note that we install the write event here even if the object is not
589      * ready to be sent, since we are sure that before returning to the
590      * event loop setDeferredAggregateLen() will be called. */
591     if (prepareClientToWrite(c) != C_OK) return NULL;
592     trimReplyUnusedTailSpace(c);
593     listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
594     return listLast(c->reply);
595 }
596 
/* Fill the placeholder reply node 'node' (returned by addReplyDeferredLen())
 * with 'length' bytes from 's'. Does nothing when 'node' is NULL, which
 * addReplyDeferredLen() returns for clients that don't accept writes.
 * The node must still hold its NULL placeholder value (asserted). */
void setDeferredReply(client *c, void *node, const char *s, size_t length) {
    listNode *ln = (listNode*)node;
    clientReplyBlock *next, *prev;

    /* Abort when *node is NULL: when the client should not accept writes
     * we return NULL in addReplyDeferredLen() */
    if (node == NULL) return;
    serverAssert(!listNodeValue(ln));

    /* Normally the NULL placeholder is replaced by a new buffer holding the
     * data. To save a write(2) syscall later, the data is instead merged into
     * the neighbouring nodes when possible:
     *
     * 1) If the previous node is non-NULL and has free space, as much data as
     *    fits is appended to it. If everything fits, the placeholder is
     *    deleted and we are done.
     * 2) Otherwise the remaining data is prepended to the next node. This
     *    requires that node to be non-NULL, to have room for all of the
     *    remaining data already allocated, and to have a used part below
     *    PROTO_REPLY_CHUNK_BYTES*4, which avoids a large memmove.
     * 3) Failing that, a new block replaces the placeholder. */
    if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) &&
        prev->size - prev->used > 0)
    {
        size_t len_to_copy = prev->size - prev->used;
        if (len_to_copy > length)
            len_to_copy = length;
        memcpy(prev->buf + prev->used, s, len_to_copy);
        prev->used += len_to_copy;
        length -= len_to_copy;
        if (length == 0) {
            listDelNode(c->reply, ln);
            return;
        }
        s += len_to_copy;
    }

    if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
        next->size - next->used >= length &&
        next->used < PROTO_REPLY_CHUNK_BYTES * 4)
    {
        /* Shift the next node's content right and put our data in front. */
        memmove(next->buf + length, next->buf, next->used);
        memcpy(next->buf, s, length);
        next->used += length;
        listDelNode(c->reply,ln);
    } else {
        /* Create a new node */
        clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
        /* Take over the allocation's internal fragmentation */
        buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
        buf->used = length;
        memcpy(buf->buf, s, length);
        listNodeValue(ln) = buf;
        c->reply_bytes += buf->size;

        /* Only this branch grows reply_bytes, so only it checks limits. */
        closeClientOnOutputBufferLimitReached(c, 1);
    }
}
654 
655 /* Populate the length object and try gluing it to the next chunk. */
setDeferredAggregateLen(client * c,void * node,long length,char prefix)656 void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
657     serverAssert(length >= 0);
658 
659     /* Abort when *node is NULL: when the client should not accept writes
660      * we return NULL in addReplyDeferredLen() */
661     if (node == NULL) return;
662 
663     char lenstr[128];
664     size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
665     setDeferredReply(c, node, lenstr, lenstr_len);
666 }
667 
/* Set a deferred array length ('*' aggregate, same in RESP2 and RESP3). */
void setDeferredArrayLen(client *c, void *node, long length) {
    const char array_prefix = '*';
    setDeferredAggregateLen(c,node,length,array_prefix);
}
671 
/* Set a deferred map length of 'length' entries. RESP3 uses the '%' map
 * type. In RESP2 the map is sent as a flat '*' array with twice as many
 * elements. */
void setDeferredMapLen(client *c, void *node, long length) {
    if (c->resp == 2) {
        setDeferredAggregateLen(c,node,length*2,'*');
    } else {
        setDeferredAggregateLen(c,node,length,'%');
    }
}
677 
/* Set a deferred set length: '~' in RESP3, a plain '*' array in RESP2. */
void setDeferredSetLen(client *c, void *node, long length) {
    char set_prefix = (c->resp == 2) ? '*' : '~';
    setDeferredAggregateLen(c,node,length,set_prefix);
}
682 
/* Set a deferred attribute length ('|'). Only valid for RESP3 clients
 * (asserted). */
void setDeferredAttributeLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    const char attribute_prefix = '|';
    setDeferredAggregateLen(c,node,length,attribute_prefix);
}
687 
/* Set a deferred push length ('>'). Only valid for RESP3 clients
 * (asserted). */
void setDeferredPushLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    const char push_prefix = '>';
    setDeferredAggregateLen(c,node,length,push_prefix);
}
692 
693 /* Add a double as a bulk reply */
addReplyDouble(client * c,double d)694 void addReplyDouble(client *c, double d) {
695     if (isinf(d)) {
696         /* Libc in odd systems (Hi Solaris!) will format infinite in a
697          * different way, so better to handle it in an explicit way. */
698         if (c->resp == 2) {
699             addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
700         } else {
701             addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
702                               d > 0 ? 6 : 7);
703         }
704     } else {
705         char dbuf[MAX_LONG_DOUBLE_CHARS+3],
706              sbuf[MAX_LONG_DOUBLE_CHARS+32];
707         int dlen, slen;
708         if (c->resp == 2) {
709             dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
710             slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
711             addReplyProto(c,sbuf,slen);
712         } else {
713             dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
714             addReplyProto(c,dbuf,dlen);
715         }
716     }
717 }
718 
/* Reply with an arbitrary-precision number given as 'len' bytes of text.
 * RESP3 has a dedicated big number type ("(<digits>\r\n"); RESP2 clients
 * receive the same text as a bulk string. */
void addReplyBigNum(client *c, const char* num, size_t len) {
    if (c->resp != 2) {
        addReplyProto(c,"(",1);
        addReplyProto(c,num,len);
        addReply(c,shared.crlf);
        return;
    }
    addReplyBulkCBuffer(c, num, len);
}
728 
729 /* Add a long double as a bulk reply, but uses a human readable formatting
730  * of the double instead of exposing the crude behavior of doubles to the
731  * dear user. */
addReplyHumanLongDouble(client * c,long double d)732 void addReplyHumanLongDouble(client *c, long double d) {
733     if (c->resp == 2) {
734         robj *o = createStringObjectFromLongDouble(d,1);
735         addReplyBulk(c,o);
736         decrRefCount(o);
737     } else {
738         char buf[MAX_LONG_DOUBLE_CHARS];
739         int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
740         addReplyProto(c,",",1);
741         addReplyProto(c,buf,len);
742         addReplyProto(c,"\r\n",2);
743     }
744 }
745 
746 /* Add a long long as integer reply or bulk len / multi bulk count.
747  * Basically this is used to output <prefix><long long><crlf>. */
addReplyLongLongWithPrefix(client * c,long long ll,char prefix)748 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
749     char buf[128];
750     int len;
751 
752     /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
753      * so we have a few shared objects to use if the integer is small
754      * like it is most of the times. */
755     if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
756         addReply(c,shared.mbulkhdr[ll]);
757         return;
758     } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
759         addReply(c,shared.bulkhdr[ll]);
760         return;
761     }
762 
763     buf[0] = prefix;
764     len = ll2string(buf+1,sizeof(buf)-1,ll);
765     buf[len+1] = '\r';
766     buf[len+2] = '\n';
767     addReplyProto(c,buf,len+3);
768 }
769 
/* Integer reply (":<ll>\r\n"); 0 and 1 reuse shared objects. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
778 
/* Emit an aggregate header "<prefix><length>\r\n". A negative length is a
 * caller bug and trips the assertion. */
void addReplyAggregateLen(client *c, long length, int prefix) {
    serverAssert(length >= 0);
    addReplyLongLongWithPrefix(c, length, prefix);
}
783 
/* Array header: "*<length>\r\n". */
void addReplyArrayLen(client *c, long length) {
    addReplyAggregateLen(c, length, '*');
}
787 
/* Map header. RESP3 uses the native '%' type; RESP2 has no maps, so the
 * pairs go out as a flat array of 2*length elements. */
void addReplyMapLen(client *c, long length) {
    if (c->resp == 2)
        addReplyAggregateLen(c, length * 2, '*');
    else
        addReplyAggregateLen(c, length, '%');
}
793 
/* Set header: native '~' in RESP3, plain array in RESP2. */
void addReplySetLen(client *c, long length) {
    addReplyAggregateLen(c, length, c->resp == 2 ? '*' : '~');
}
798 
/* Attribute header ('|'); RESP3-only. */
void addReplyAttributeLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c, length, '|');
}
803 
/* Push header ('>'); RESP3-only. */
void addReplyPushLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c, length, '>');
}
808 
/* Null reply: the native "_" type in RESP3, a null bulk string in RESP2. */
void addReplyNull(client *c) {
    if (c->resp != 2) {
        addReplyProto(c,"_\r\n",3);
        return;
    }
    addReplyProto(c,"$-1\r\n",5);
}
816 
/* Boolean reply: "#t"/"#f" in RESP3, integer 1/0 in RESP2. */
void addReplyBool(client *c, int b) {
    if (c->resp == 2) {
        addReply(c, b ? shared.cone : shared.czero);
        return;
    }
    addReplyProto(c, b ? "#t\r\n" : "#f\r\n",4);
}
824 
825 /* A null array is a concept that no longer exists in RESP3. However
826  * RESP2 had it, so API-wise we have this call, that will emit the correct
827  * RESP2 protocol, however for RESP3 the reply will always be just the
828  * Null type "_\r\n". */
addReplyNullArray(client * c)829 void addReplyNullArray(client *c) {
830     if (c->resp == 2) {
831         addReplyProto(c,"*-1\r\n",5);
832     } else {
833         addReplyProto(c,"_\r\n",3);
834     }
835 }
836 
837 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)838 void addReplyBulkLen(client *c, robj *obj) {
839     size_t len = stringObjectLen(obj);
840 
841     addReplyLongLongWithPrefix(c,len,'$');
842 }
843 
844 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)845 void addReplyBulk(client *c, robj *obj) {
846     addReplyBulkLen(c,obj);
847     addReply(c,obj);
848     addReply(c,shared.crlf);
849 }
850 
851 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)852 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
853     addReplyLongLongWithPrefix(c,len,'$');
854     addReplyProto(c,p,len);
855     addReply(c,shared.crlf);
856 }
857 
858 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)859 void addReplyBulkSds(client *c, sds s)  {
860     addReplyLongLongWithPrefix(c,sdslen(s),'$');
861     addReplySds(c,s);
862     addReply(c,shared.crlf);
863 }
864 
865 /* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
setDeferredReplyBulkSds(client * c,void * node,sds s)866 void setDeferredReplyBulkSds(client *c, void *node, sds s) {
867     sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
868     setDeferredReply(c, node, reply, sdslen(reply));
869     sdsfree(reply);
870     sdsfree(s);
871 }
872 
873 /* Add a C null term string as bulk reply */
addReplyBulkCString(client * c,const char * s)874 void addReplyBulkCString(client *c, const char *s) {
875     if (s == NULL) {
876         addReplyNull(c);
877     } else {
878         addReplyBulkCBuffer(c,s,strlen(s));
879     }
880 }
881 
882 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)883 void addReplyBulkLongLong(client *c, long long ll) {
884     char buf[64];
885     int len;
886 
887     len = ll2string(buf,64,ll);
888     addReplyBulkCBuffer(c,buf,len);
889 }
890 
891 /* Reply with a verbatim type having the specified extension.
892  *
893  * The 'ext' is the "extension" of the file, actually just a three
894  * character type that describes the format of the verbatim string.
895  * For instance "txt" means it should be interpreted as a text only
896  * file by the receiver, "md " as markdown, and so forth. Only the
897  * three first characters of the extension are used, and if the
898  * provided one is shorter than that, the remaining is filled with
899  * spaces. */
addReplyVerbatim(client * c,const char * s,size_t len,const char * ext)900 void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
901     if (c->resp == 2) {
902         addReplyBulkCBuffer(c,s,len);
903     } else {
904         char buf[32];
905         size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
906         char *p = buf+preflen-4;
907         for (int i = 0; i < 3; i++) {
908             if (*ext == '\0') {
909                 p[i] = ' ';
910             } else {
911                 p[i] = *ext++;
912             }
913         }
914         addReplyProto(c,buf,preflen);
915         addReplyProto(c,s,len);
916         addReplyProto(c,"\r\n",2);
917     }
918 }
919 
920 /* Add an array of C strings as status replies with a heading.
921  * This function is typically invoked by from commands that support
922  * subcommands in response to the 'help' subcommand. The help array
923  * is terminated by NULL sentinel. */
addReplyHelp(client * c,const char ** help)924 void addReplyHelp(client *c, const char **help) {
925     sds cmd = sdsnew((char*) c->argv[0]->ptr);
926     void *blenp = addReplyDeferredLen(c);
927     int blen = 0;
928 
929     sdstoupper(cmd);
930     addReplyStatusFormat(c,
931         "%s <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",cmd);
932     sdsfree(cmd);
933 
934     while (help[blen]) addReplyStatus(c,help[blen++]);
935 
936     addReplyStatus(c,"HELP");
937     addReplyStatus(c,"    Prints this help.");
938 
939     blen += 1;  /* Account for the header. */
940     blen += 2;  /* Account for the footer. */
941     setDeferredArrayLen(c,blenp,blen);
942 }
943 
944 /* Add a suggestive error reply.
945  * This function is typically invoked by from commands that support
946  * subcommands in response to an unknown subcommand or argument error. */
addReplySubcommandSyntaxError(client * c)947 void addReplySubcommandSyntaxError(client *c) {
948     sds cmd = sdsnew((char*) c->argv[0]->ptr);
949     sdstoupper(cmd);
950     addReplyErrorFormat(c,
951         "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.",
952         (char*)c->argv[1]->ptr,cmd);
953     sdsfree(cmd);
954 }
955 
/* Append 'src' client output buffers into 'dst' client output buffers.
 *
 * The static buffer of 'src' is copied via addReplyProto(). The reply list
 * of 'src' is spliced into 'dst' with listJoin(), and the byte accounting
 * is transferred. On that success path, 'src' ends up with bufpos == 0 and
 * reply_bytes == 0.
 *
 * NOTE(review): on the early-return paths below, 'src' is left untouched
 * and its buffers are not cleared. */
void AddReplyFromClient(client *dst, client *src) {
    /* If the source client contains a partial response due to client output
     * buffer limits, propagate that to the dest rather than copy a partial
     * reply. We don't wanna run the risk of copying partial response in case
     * for some reason the output limits don't reach the same decision (maybe
     * they changed) */
    if (src->flags & CLIENT_CLOSE_ASAP) {
        sds client = catClientInfoString(sdsempty(),dst);
        freeClientAsync(dst);
        serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
        return;
    }

    /* First add the static buffer (either into the static buffer or reply list) */
    addReplyProto(dst,src->buf, src->bufpos);

    /* We need to check with prepareClientToWrite again (after addReplyProto)
     * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
    if (prepareClientToWrite(dst) != C_OK)
        return;

    /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
     * checks in it. */
    if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Concatenate the reply list into the dest */
    if (listLength(src->reply))
        listJoin(dst->reply,src->reply);
    /* Move the byte accounting along with the list nodes, and mark the
     * source static buffer as consumed. */
    dst->reply_bytes += src->reply_bytes;
    src->reply_bytes = 0;
    src->bufpos = 0;

    /* Check output buffer limits */
    closeClientOnOutputBufferLimitReached(dst, 1);
}
994 
995 /* Logically copy 'src' replica client buffers info to 'dst' replica.
996  * Basically increase referenced buffer block node reference count. */
copyReplicaOutputBuffer(client * dst,client * src)997 void copyReplicaOutputBuffer(client *dst, client *src) {
998     serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
999 
1000     if (src->ref_repl_buf_node == NULL) return;
1001     dst->ref_repl_buf_node = src->ref_repl_buf_node;
1002     dst->ref_block_pos = src->ref_block_pos;
1003     ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
1004 }
1005 
1006 /* Return true if the specified client has pending reply buffers to write to
1007  * the socket. */
clientHasPendingReplies(client * c)1008 int clientHasPendingReplies(client *c) {
1009     if (getClientType(c) == CLIENT_TYPE_SLAVE) {
1010         /* Replicas use global shared replication buffer instead of
1011          * private output buffer. */
1012         serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
1013         if (c->ref_repl_buf_node == NULL) return 0;
1014 
1015         /* If the last replication buffer block content is totally sent,
1016          * we have nothing to send. */
1017         listNode *ln = listLast(server.repl_buffer_blocks);
1018         replBufBlock *tail = listNodeValue(ln);
1019         if (ln == c->ref_repl_buf_node &&
1020             c->ref_block_pos == tail->used) return 0;
1021 
1022         return 1;
1023     } else {
1024         return c->bufpos || listLength(c->reply);
1025     }
1026 }
1027 
/* Completion callback for connAccept(). If the connection did not reach the
 * connected state, the client is freed asynchronously. Otherwise protected
 * mode is enforced, and finally the connection is counted and modules are
 * notified with a CLIENT_CHANGE/CONNECTED event. */
void clientAcceptHandler(connection *conn) {
    client *c = connGetPrivateData(conn);

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING,
                "Error accepting a client connection: %s",
                connGetLastError(conn));
        freeClientAsync(c);
        return;
    }

    /* Protected mode: when it is enabled and the default user has no
     * password, non-Unix-socket clients are only accepted from the loopback
     * interface. Rejected clients receive an explanation of how to fix the
     * configuration before being closed. */
    int enforce_loopback = server.protected_mode &&
                           (DefaultUser->flags & USER_FLAG_NOPASS) &&
                           !(c->flags & CLIENT_UNIX_SOCKET);
    if (enforce_loopback) {
        char peer_ip[NET_IP_STR_LEN+1] = { 0 };
        connPeerToString(conn, peer_ip, sizeof(peer_ip)-1, NULL);

        int from_loopback = !strcmp(peer_ip,"127.0.0.1") ||
                            !strcmp(peer_ip,"::1");
        if (!from_loopback) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled and no password is set for the default user. "
                "In this mode connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a an authentication password for the default user. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            /* Best effort: the write result is deliberately ignored. */
            if (connWrite(c->conn,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClientAsync(c);
            return;
        }
    }

    server.stat_numconnections++;
    moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                          REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
                          c);
}
1085 
1086 #define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(connection * conn,int flags,char * ip)1087 static void acceptCommonHandler(connection *conn, int flags, char *ip) {
1088     client *c;
1089     char conninfo[100];
1090     UNUSED(ip);
1091 
1092     if (connGetState(conn) != CONN_STATE_ACCEPTING) {
1093         serverLog(LL_VERBOSE,
1094             "Accepted client connection in error state: %s (conn: %s)",
1095             connGetLastError(conn),
1096             connGetInfo(conn, conninfo, sizeof(conninfo)));
1097         connClose(conn);
1098         return;
1099     }
1100 
1101     /* Limit the number of connections we take at the same time.
1102      *
1103      * Admission control will happen before a client is created and connAccept()
1104      * called, because we don't want to even start transport-level negotiation
1105      * if rejected. */
1106     if (listLength(server.clients) + getClusterConnectionsCount()
1107         >= server.maxclients)
1108     {
1109         char *err;
1110         if (server.cluster_enabled)
1111             err = "-ERR max number of clients + cluster "
1112                   "connections reached\r\n";
1113         else
1114             err = "-ERR max number of clients reached\r\n";
1115 
1116         /* That's a best effort error message, don't check write errors.
1117          * Note that for TLS connections, no handshake was done yet so nothing
1118          * is written and the connection will just drop. */
1119         if (connWrite(conn,err,strlen(err)) == -1) {
1120             /* Nothing to do, Just to avoid the warning... */
1121         }
1122         server.stat_rejected_conn++;
1123         connClose(conn);
1124         return;
1125     }
1126 
1127     /* Create connection and client */
1128     if ((c = createClient(conn)) == NULL) {
1129         serverLog(LL_WARNING,
1130             "Error registering fd event for the new client: %s (conn: %s)",
1131             connGetLastError(conn),
1132             connGetInfo(conn, conninfo, sizeof(conninfo)));
1133         connClose(conn); /* May be already closed, just ignore errors */
1134         return;
1135     }
1136 
1137     /* Last chance to keep flags */
1138     c->flags |= flags;
1139 
1140     /* Initiate accept.
1141      *
1142      * Note that connAccept() is free to do two things here:
1143      * 1. Call clientAcceptHandler() immediately;
1144      * 2. Schedule a future call to clientAcceptHandler().
1145      *
1146      * Because of that, we must do nothing else afterwards.
1147      */
1148     if (connAccept(conn, clientAcceptHandler) == C_ERR) {
1149         char conninfo[100];
1150         if (connGetState(conn) == CONN_STATE_ERROR)
1151             serverLog(LL_WARNING,
1152                     "Error accepting a client connection: %s (conn: %s)",
1153                     connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
1154         freeClient(connGetPrivateData(conn));
1155         return;
1156     }
1157 }
1158 
/* Readable-event handler for a plain TCP listening socket. It accepts up to
 * MAX_ACCEPTS_PER_CALL pending connections per invocation and hands each one
 * to acceptCommonHandler(). It stops early when anetTcpAccept() fails, and
 * logs unless the failure is EWOULDBLOCK (nothing more to accept). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cport;
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
1178 
/* Readable-event handler for a TLS listening socket. It behaves like
 * acceptTcpHandler(), but wraps each accepted fd in a TLS connection that
 * honours server.tls_auth_clients. */
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cport;
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip);
    }
}
1198 
/* Readable-event handler for the Unix domain listening socket. Each accepted
 * client is flagged CLIENT_UNIX_SOCKET. There is no peer IP to report, so
 * NULL is passed as the address. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
    }
}
1217 
/* Release the saved pre-rewrite argv of 'c', if any, and reset the fields
 * so the call is idempotent. */
void freeClientOriginalArgv(client *c) {
    robj **saved = c->original_argv;
    if (saved == NULL) return;   /* argv was never rewritten */

    for (int idx = 0; idx < c->original_argc; idx++)
        decrRefCount(saved[idx]);
    zfree(saved);

    c->original_argv = NULL;
    c->original_argc = 0;
}
1228 
/* Drop every argument object of the current command, free the argv array
 * and reset all argv-related bookkeeping (argc, cmd, length counters). */
void freeClientArgv(client *c) {
    for (int idx = 0; idx < c->argc; idx++)
        decrRefCount(c->argv[idx]);
    zfree(c->argv);

    c->argv = NULL;
    c->argc = 0;
    c->argv_len = 0;
    c->argv_len_sum = 0;
    c->cmd = NULL;
}
1240 
1241 /* Close all the slaves connections. This is useful in chained replication
1242  * when we resync with our own master and want to force all our slaves to
1243  * resync with us as well. */
disconnectSlaves(void)1244 void disconnectSlaves(void) {
1245     listIter li;
1246     listNode *ln;
1247     listRewind(server.slaves,&li);
1248     while((ln = listNext(&li))) {
1249         freeClient((client*)ln->value);
1250     }
1251 }
1252 
1253 /* Check if there is any other slave waiting dumping RDB finished expect me.
1254  * This function is useful to judge current dumping RDB can be used for full
1255  * synchronization or not. */
anyOtherSlaveWaitRdb(client * except_me)1256 int anyOtherSlaveWaitRdb(client *except_me) {
1257     listIter li;
1258     listNode *ln;
1259 
1260     listRewind(server.slaves, &li);
1261     while((ln = listNext(&li))) {
1262         client *slave = ln->value;
1263         if (slave != except_me &&
1264             slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
1265         {
1266             return 1;
1267         }
1268     }
1269     return 0;
1270 }
1271 
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster().
 *
 * Steps, in order:
 * - clear server.current_client if it points to 'c';
 * - if 'c' still has a connection: drop it from server.clients and
 *   server.clients_index, detach it from the diskless-replication RDB pipe
 *   if it is a replica waiting on one, then close the connection and set
 *   c->conn to NULL;
 * - remove it from the pending-write, pending-read and unblocked lists;
 * - disable client-side tracking if it was enabled. */
void unlinkClient(client *c) {
    listNode *ln;

    /* If this is marked as current client unset it. */
    if (server.current_client == c) server.current_client = NULL;

    /* Certain operations must be done only if the client has an active connection.
     * If the client was already unlinked or if it's a "fake client" the
     * conn is already set to NULL. */
    if (c->conn) {
        /* Remove from the list of active clients. */
        if (c->client_list_node) {
            /* clients_index is keyed by the client id converted with
             * htonu64(), so the same transformation is applied here. */
            uint64_t id = htonu64(c->id);
            raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
            listDelNode(server.clients,c->client_list_node);
            c->client_list_node = NULL;
        }

        /* Check if this is a replica waiting for diskless replication (rdb pipe),
         * in which case it needs to be cleaned from that list */
        if (c->flags & CLIENT_SLAVE &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.rdb_pipe_conns)
        {
            int i;
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                if (server.rdb_pipe_conns[i] == c->conn) {
                    rdbPipeWriteHandlerConnRemoved(c->conn);
                    server.rdb_pipe_conns[i] = NULL;
                    break;
                }
            }
        }
        /* Closing the connection and clearing c->conn also makes a second
         * call to unlinkClient() skip this whole section. */
        connClose(c->conn);
        c->conn = NULL;
    }

    /* Remove from the list of pending writes if needed. */
    if (c->flags & CLIENT_PENDING_WRITE) {
        /* Linear search: the flag guarantees the client is in the list. */
        ln = listSearchKey(server.clients_pending_write,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_pending_write,ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
    }

    /* Remove from the list of pending reads if needed. */
    /* The pending-read list must only be modified while the I/O threads are
     * idle, which the assertion enforces. */
    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
    if (c->pending_read_list_node != NULL) {
        listDelNode(server.clients_pending_read,c->pending_read_list_node);
        c->pending_read_list_node = NULL;
    }


    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients. */
    if (c->flags & CLIENT_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;
    }

    /* Clear the tracking status. */
    if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
1340 
/* Synchronously release a client and everything it owns.
 *
 * Special cases:
 * - a CLIENT_PROTECTED client is instead scheduled with freeClientAsync();
 * - our master client is normally not freed at all but cached through
 *   replicationCacheMaster() for a later partial resync, unless it is in a
 *   protocol-error or blocked state.
 *
 * The statement order below matters (e.g. removal from clients_to_close must
 * happen before replicationCacheMaster()), so keep it when editing. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* For connected clients, call the disconnection event of modules hooks. */
    if (c->conn) {
        moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                              REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED,
                              c);
    }

    /* Notify module system that this client auth status changed. */
    moduleNotifyUserChanged(c);

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. Note that we need to do this here, because later
     * we may call replicationCacheMaster() and the client should already
     * be removed from the list of clients to free. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
            /* The cached master must not carry pending-close flags. */
            c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeReplicaReferencedReplBuffer(c);
    freeClientArgv(c);
    freeClientOriginalArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        /* If no other replica is waiting for the RDB dump to finish, the
         * current child process does not need to continue dumping it, so we
         * kill it. That way the child won't use more memory, and we can fork
         * a new child asap to dump an RDB for the next full synchronization or
         * bgsave. But we also need to check whether the user enabled 'save'
         * RDB. If so, we should not kill the child, since that means the RDB
         * is important for keeping the user's data safe, and we may delay the
         * configured 'save' for full sync. */
        if (server.saveparamslen == 0 &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.child_type == CHILD_TYPE_RDB &&
            server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
            anyOtherSlaveWaitRdb(c) == 0)
        {
            killRDBChild();
        }
        /* A replica in the middle of a bulk transfer owns the RDB fd and the
         * preamble buffer. */
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* MONITOR clients also carry CLIENT_SLAVE but live in server.monitors. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
        /* Fire the replica change modules event. */
        if (c->replstate == SLAVE_STATE_ONLINE)
            moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                                  REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
                                  NULL);
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* Remove the contribution that this client gave to our
     * incrementally computed memory usage. */
    server.stat_clients_type_memory[c->last_memory_type] -=
        c->last_memory_usage;
    /* Remove client from memory usage buckets */
    if (c->mem_usage_bucket) {
        c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
        listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    sdsfree(c->sockname);
    sdsfree(c->slave_addr);
    zfree(c);
}
1483 
1484 /* Schedule a client to free it at a safe time in the serverCron() function.
1485  * This function is useful when we need to terminate a client but we are in
1486  * a context where calling freeClient() is not possible, because the client
1487  * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)1488 void freeClientAsync(client *c) {
1489     /* We need to handle concurrent access to the server.clients_to_close list
1490      * only in the freeClientAsync() function, since it's the only function that
1491      * may access the list while Redis uses I/O threads. All the other accesses
1492      * are in the context of the main thread while the other threads are
1493      * idle. */
1494     if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
1495     c->flags |= CLIENT_CLOSE_ASAP;
1496     if (server.io_threads_num == 1) {
1497         /* no need to bother with locking if there's just one thread (the main thread) */
1498         listAddNodeTail(server.clients_to_close,c);
1499         return;
1500     }
1501     static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
1502     pthread_mutex_lock(&async_free_queue_mutex);
1503     listAddNodeTail(server.clients_to_close,c);
1504     pthread_mutex_unlock(&async_free_queue_mutex);
1505 }
1506 
1507 /* Perform processing of the client before moving on to processing the next client
1508  * this is useful for performing operations that affect the global state but can't
1509  * wait until we're done with all clients. In other words can't wait until beforeSleep()
1510  * return C_ERR in case client is no longer valid after call.
1511  * The input client argument: c, may be NULL in case the previous client was
1512  * freed before the call. */
beforeNextClient(client * c)1513 int beforeNextClient(client *c) {
1514     /* Skip the client processing if we're in an IO thread, in that case we'll perform
1515        this operation later (this function is called again) in the fan-in stage of the threading mechanism */
1516     if (io_threads_op != IO_THREADS_OP_IDLE)
1517         return C_OK;
1518     /* Handle async frees */
1519     /* Note: this doesn't make the server.clients_to_close list redundant because of
1520      * cases where we want an async free of a client other than myself. For example
1521      * in ACL modifications we disconnect clients authenticated to non-existent
1522      * users (see ACL LOAD). */
1523     if (c && (c->flags & CLIENT_CLOSE_ASAP)) {
1524         freeClient(c);
1525         return C_ERR;
1526     }
1527     return C_OK;
1528 }
1529 
1530 /* Free the clients marked as CLOSE_ASAP, return the number of clients
1531  * freed. */
freeClientsInAsyncFreeQueue(void)1532 int freeClientsInAsyncFreeQueue(void) {
1533     int freed = 0;
1534     listIter li;
1535     listNode *ln;
1536 
1537     listRewind(server.clients_to_close,&li);
1538     while ((ln = listNext(&li)) != NULL) {
1539         client *c = listNodeValue(ln);
1540 
1541         if (c->flags & CLIENT_PROTECTED) continue;
1542 
1543         c->flags &= ~CLIENT_CLOSE_ASAP;
1544         freeClient(c);
1545         listDelNode(server.clients_to_close,ln);
1546         freed++;
1547     }
1548     return freed;
1549 }
1550 
1551 /* Return a client by ID, or NULL if the client ID is not in the set
1552  * of registered clients. Note that "fake clients", created with -1 as FD,
1553  * are not registered clients. */
lookupClientByID(uint64_t id)1554 client *lookupClientByID(uint64_t id) {
1555     id = htonu64(id);
1556     client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id));
1557     return (c == raxNotFound) ? NULL : c;
1558 }
1559 
/* Perform a single write step of pending output for client 'c'. It is called
 * in a loop by writeToClient().
 *
 * Replicas are served from the shared replication buffer. The client holds a
 * reference to its current block (c->ref_repl_buf_node) and the number of
 * bytes of that block already sent (c->ref_block_pos).
 *
 * Every other client is served from c->buf (up to c->bufpos) when it holds
 * data; otherwise it is served from the first block of the c->reply list.
 *
 * Returns C_OK on success, including steps where nothing had to be written.
 * Returns C_ERR when connWrite() returned 0 or a negative value.
 * '*nwritten' is an output parameter: it is set to the value returned by the
 * connWrite() call made in this step, or 0 if no write was attempted. */
int _writeToClient(client *c, ssize_t *nwritten) {
    *nwritten = 0;
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        /* Replicas never have data in c->buf or c->reply: all of their
         * output comes from the shared replication buffer blocks. */
        serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

        replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
        serverAssert(o->used >= c->ref_block_pos);
        /* Send current block if it is not fully sent. */
        if (o->used > c->ref_block_pos) {
            *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
                                  o->used-c->ref_block_pos);
            if (*nwritten <= 0) return C_ERR;
            c->ref_block_pos += *nwritten;
        }

        /* If we fully sent the object on head, go to the next one.
         * The reference moves only when a next block exists: a fully sent
         * last block stays referenced (presumably so data appended to it
         * later can still be sent). Moving the reference decrements this
         * block's refcount and increments the next one's. */
        listNode *next = listNextNode(c->ref_repl_buf_node);
        if (next && c->ref_block_pos == o->used) {
            o->refcount--;
            ((replBufBlock *)(listNodeValue(next)))->refcount++;
            c->ref_repl_buf_node = next;
            c->ref_block_pos = 0;
            /* Presumably lets the backlog release blocks that are no longer
             * referenced, a bounded number per call. */
            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
        }
        return C_OK;
    }

    if (c->bufpos > 0) {
        /* Write the unsent part of c->buf: bytes [sentlen, bufpos). */
        *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
    } else {
        /* c->buf is empty: continue from the head of the reply list. The
         * caller only invokes this while replies are pending, so the list
         * is assumed to be non-empty here. */
        clientReplyBlock *o = listNodeValue(listFirst(c->reply));
        size_t objlen = o->used;

        /* An empty block is dropped without writing anything. Note that
         * reply_bytes is accounted in terms of the block's o->size, not
         * o->used. */
        if (objlen == 0) {
            c->reply_bytes -= o->size;
            listDelNode(c->reply,listFirst(c->reply));
            return C_OK;
        }

        *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            c->reply_bytes -= o->size;
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
            /* If there are no longer objects in the list, we expect
             * the count of reply bytes to be exactly zero. */
            if (listLength(c->reply) == 0)
                serverAssert(c->reply_bytes == 0);
        }
    }
    return C_OK;
}
1630 
1631 /* Write data in output buffers to client. Return C_OK if the client
1632  * is still valid after the call, C_ERR if it was freed because of some
1633  * error.  If handler_installed is set, it will attempt to clear the
1634  * write event.
1635  *
1636  * This function is called by threads, but always with handler_installed
1637  * set to 0. So when handler_installed is set to 0 the function must be
1638  * thread safe. */
writeToClient(client * c,int handler_installed)1639 int writeToClient(client *c, int handler_installed) {
1640     /* Update total number of writes on server */
1641     atomicIncr(server.stat_total_writes_processed, 1);
1642 
1643     ssize_t nwritten = 0, totwritten = 0;
1644 
1645     while(clientHasPendingReplies(c)) {
1646         int ret = _writeToClient(c, &nwritten);
1647         if (ret == C_ERR) break;
1648         totwritten += nwritten;
1649         /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
1650          * bytes, in a single threaded server it's a good idea to serve
1651          * other clients as well, even if a very large request comes from
1652          * super fast link that is always able to accept data (in real world
1653          * scenario think about 'KEYS *' against the loopback interface).
1654          *
1655          * However if we are over the maxmemory limit we ignore that and
1656          * just deliver as much data as it is possible to deliver.
1657          *
1658          * Moreover, we also send as much as possible if the client is
1659          * a slave or a monitor (otherwise, on high-speed traffic, the
1660          * replication/output buffer will grow indefinitely) */
1661         if (totwritten > NET_MAX_WRITES_PER_EVENT &&
1662             (server.maxmemory == 0 ||
1663              zmalloc_used_memory() < server.maxmemory) &&
1664             !(c->flags & CLIENT_SLAVE)) break;
1665     }
1666     atomicIncr(server.stat_net_output_bytes, totwritten);
1667     if (nwritten == -1) {
1668         if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
1669             serverLog(LL_VERBOSE,
1670                 "Error writing to client: %s", connGetLastError(c->conn));
1671             freeClientAsync(c);
1672             return C_ERR;
1673         }
1674     }
1675     if (totwritten > 0) {
1676         /* For clients representing masters we don't count sending data
1677          * as an interaction, since we always send REPLCONF ACK commands
1678          * that take some time to just fill the socket output buffer.
1679          * We just rely on data / pings received for timeout detection. */
1680         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
1681     }
1682     if (!clientHasPendingReplies(c)) {
1683         c->sentlen = 0;
1684         /* Note that writeToClient() is called in a threaded way, but
1685          * adDeleteFileEvent() is not thread safe: however writeToClient()
1686          * is always called with handler_installed set to 0 from threads
1687          * so we are fine. */
1688         if (handler_installed) {
1689             serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
1690             connSetWriteHandler(c->conn, NULL);
1691         }
1692 
1693         /* Close connection after entire reply has been sent. */
1694         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
1695             freeClientAsync(c);
1696             return C_ERR;
1697         }
1698     }
1699     updateClientMemUsage(c);
1700     return C_OK;
1701 }
1702 
1703 /* Write event handler. Just send data to the client. */
sendReplyToClient(connection * conn)1704 void sendReplyToClient(connection *conn) {
1705     client *c = connGetPrivateData(conn);
1706     writeToClient(c,1);
1707 }
1708 
1709 /* This function is called just before entering the event loop, in the hope
1710  * we can just write the replies to the client output buffer without any
1711  * need to use a syscall in order to install the writable event handler,
1712  * get it called, and so forth. */
handleClientsWithPendingWrites(void)1713 int handleClientsWithPendingWrites(void) {
1714     listIter li;
1715     listNode *ln;
1716     int processed = listLength(server.clients_pending_write);
1717 
1718     listRewind(server.clients_pending_write,&li);
1719     while((ln = listNext(&li))) {
1720         client *c = listNodeValue(ln);
1721         c->flags &= ~CLIENT_PENDING_WRITE;
1722         listDelNode(server.clients_pending_write,ln);
1723 
1724         /* If a client is protected, don't do anything,
1725          * that may trigger write error or recreate handler. */
1726         if (c->flags & CLIENT_PROTECTED) continue;
1727 
1728         /* Don't write to clients that are going to be closed anyway. */
1729         if (c->flags & CLIENT_CLOSE_ASAP) continue;
1730 
1731         /* Try to write buffers to the client socket. */
1732         if (writeToClient(c,0) == C_ERR) continue;
1733 
1734         /* If after the synchronous writes above we still have data to
1735          * output to the client, we need to install the writable handler. */
1736         if (clientHasPendingReplies(c)) {
1737             int ae_barrier = 0;
1738             /* For the fsync=always policy, we want that a given FD is never
1739              * served for reading and writing in the same event loop iteration,
1740              * so that in the middle of receiving the query, and serving it
1741              * to the client, we'll call beforeSleep() that will do the
1742              * actual fsync of AOF to disk. the write barrier ensures that. */
1743             if (server.aof_state == AOF_ON &&
1744                 server.aof_fsync == AOF_FSYNC_ALWAYS)
1745             {
1746                 ae_barrier = 1;
1747             }
1748             if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
1749                 freeClientAsync(c);
1750             }
1751         }
1752     }
1753     return processed;
1754 }
1755 
1756 /* resetClient prepare the client to process the next command */
resetClient(client * c)1757 void resetClient(client *c) {
1758     redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1759 
1760     freeClientArgv(c);
1761     c->reqtype = 0;
1762     c->multibulklen = 0;
1763     c->bulklen = -1;
1764 
1765     /* We clear the ASKING flag as well if we are not inside a MULTI, and
1766      * if what we just executed is not the ASKING command itself. */
1767     if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1768         c->flags &= ~CLIENT_ASKING;
1769 
1770     /* We do the same for the CACHING command as well. It also affects
1771      * the next command or transaction executed, in a way very similar
1772      * to ASKING. */
1773     if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
1774         c->flags &= ~CLIENT_TRACKING_CACHING;
1775 
1776     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1777      * to the next command will be sent, but set the flag if the command
1778      * we just processed was "CLIENT REPLY SKIP". */
1779     c->flags &= ~CLIENT_REPLY_SKIP;
1780     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1781         c->flags |= CLIENT_REPLY_SKIP;
1782         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1783     }
1784 }
1785 
1786 /* This function is used when we want to re-enter the event loop but there
1787  * is the risk that the client we are dealing with will be freed in some
1788  * way. This happens for instance in:
1789  *
1790  * * DEBUG RELOAD and similar.
1791  * * When a Lua script is in -BUSY state.
1792  *
1793  * So the function will protect the client by doing two things:
1794  *
1795  * 1) It removes the file events. This way it is not possible that an
1796  *    error is signaled on the socket, freeing the client.
1797  * 2) Moreover it makes sure that if the client is freed in a different code
1798  *    path, it is not really released, but only marked for later release. */
protectClient(client * c)1799 void protectClient(client *c) {
1800     c->flags |= CLIENT_PROTECTED;
1801     if (c->conn) {
1802         connSetReadHandler(c->conn,NULL);
1803         connSetWriteHandler(c->conn,NULL);
1804     }
1805 }
1806 
1807 /* This will undo the client protection done by protectClient() */
unprotectClient(client * c)1808 void unprotectClient(client *c) {
1809     if (c->flags & CLIENT_PROTECTED) {
1810         c->flags &= ~CLIENT_PROTECTED;
1811         if (c->conn) {
1812             connSetReadHandler(c->conn,readQueryFromClient);
1813             if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
1814         }
1815     }
1816 }
1817 
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
 * this function consumes the client query buffer and creates a command ready
 * to be executed inside the client structure. Returns C_OK if the command
 * is ready to be executed, or C_ERR if there is still protocol to read to
 * have a well formed command. The function also returns C_ERR when there is
 * a protocol error: in such a case the client structure is setup to reply
 * with the error and close the connection.
 *
 * An inline request is one line, starting at c->querybuf + c->qb_pos and
 * terminated by "\n" or "\r\n". It is split into arguments by sdssplitargs(),
 * which rejects unbalanced quotes. When the line holds no arguments,
 * c->argc ends up 0 and the existing c->argv array is left in place. */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n. The request is simply incomplete,
     * unless the unparsed part already exceeds PROTO_INLINE_MAX_SIZE; then
     * it is rejected as a protocol error. */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case: if the '\n' is preceded by a '\r' (and is not
     * the first unparsed byte), exclude the '\r' from the line and count two
     * terminator bytes. */
    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n. A copy of the line, without its
     * terminator, is what gets split into arguments. */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        sdsfreesplitres(argv,argc);
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
        setProtocolError("Master using the inline protocol. Desync?",c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure. It is replaced only when the
     * line produced at least one argument. */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv_len = argc;
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    /* Create redis objects for all arguments. Each sds returned by
     * sdssplitargs() is handed to createObject() and not freed here; only
     * the array that held them is released. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
        c->argv_len_sum += sdslen(argv[j]);
    }
    zfree(argv);
    return C_OK;
}
1898 
1899 /* Helper function. Record protocol error details in server log,
1900  * and set the client as CLIENT_CLOSE_AFTER_REPLY and
1901  * CLIENT_PROTOCOL_ERROR. */
1902 #define PROTO_DUMP_LEN 128
setProtocolError(const char * errstr,client * c)1903 static void setProtocolError(const char *errstr, client *c) {
1904     if (server.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
1905         sds client = catClientInfoString(sdsempty(),c);
1906 
1907         /* Sample some protocol to given an idea about what was inside. */
1908         char buf[256];
1909         if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
1910             snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
1911         } else {
1912             snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
1913         }
1914 
1915         /* Remove non printable chars. */
1916         char *p = buf;
1917         while (*p != '\0') {
1918             if (!isprint(*p)) *p = '.';
1919             p++;
1920         }
1921 
1922         /* Log all the client and protocol info. */
1923         int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
1924                                                     LL_VERBOSE;
1925         serverLog(loglevel,
1926             "Protocol error (%s) from client: %s. %s", errstr, client, buf);
1927         sdsfree(client);
1928     }
1929     c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
1930 }
1931 
/* Process the query buffer for client 'c', setting up the client argument
 * vector for command execution. Returns C_OK if after running the function
 * the client has a well-formed ready to be processed command, otherwise
 * C_ERR if there is still to read more buffer to get the full command.
 * The function also returns C_ERR when there is a protocol error: in such a
 * case the client structure is setup to reply with the error and close
 * the connection.
 *
 * This function is called if processInputBuffer() detects that the next
 * command is in RESP format, so the first byte in the command is found
 * to be '*'. Otherwise for inline commands processInlineBuffer() is called.
 *
 * Expected input: "*<count>\r\n" followed by <count> bulk strings, each
 * encoded as "$<len>\r\n<len bytes>\r\n".
 *
 * Parsing is incremental, so the function can resume when more data
 * arrives:
 * - c->multibulklen holds how many bulk arguments are still to be read;
 * - c->bulklen holds the length of the current bulk argument, or -1 while
 *   its "$<len>" header has not been parsed yet;
 * - c->qb_pos is the offset of the first unparsed byte in c->querybuf. */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n. This only checks that one more
         * byte follows the '\r'; that byte is assumed, not verified, to be
         * '\n' (it is skipped by the "+2" below). */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        /* Reject counts that do not fit an int. Clients that still need to
         * authenticate are limited to at most 10 arguments. */
        if (!ok || ll > INT_MAX) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        c->qb_pos = (newline-c->querybuf)+2;

        /* A zero or negative count: nothing to parse, and c->argc stays 0. */
        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure. At most 1024 slots are
         * preallocated; the array grows on demand below (presumably so a
         * large declared count alone cannot force a large allocation). */
        if (c->argv) zfree(c->argv);
        c->argv_len = min(c->multibulklen, 1024);
        c->argv = zmalloc(sizeof(robj*)*c->argv_len);
        c->argv_len_sum = 0;
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            /* Reject the bulk length if it is:
             * - negative;
             * - above proto_max_bulk_len, unless the client is our master;
             * - above 16384 while the client still needs to authenticate. */
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Check if we have space in argv, grow if needed. The capacity
             * doubles (saturating at INT_MAX) but never exceeds the number
             * of arguments still expected. */
            if (c->argc >= c->argv_len) {
                c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen);
                c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
            }

            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
2096 
2097 /* Perform necessary tasks after a command was executed:
2098  *
2099  * 1. The client is reset unless there are reasons to avoid doing it.
2100  * 2. In the case of master clients, the replication offset is updated.
2101  * 3. Propagate commands we got from our master to replicas down the line. */
commandProcessed(client * c)2102 void commandProcessed(client *c) {
2103     /* If client is blocked(including paused), just return avoid reset and replicate.
2104      *
2105      * 1. Don't reset the client structure for blocked clients, so that the reply
2106      *    callback will still be able to access the client argv and argc fields.
2107      *    The client will be reset in unblockClient().
2108      * 2. Don't update replication offset or propagate commands to replicas,
2109      *    since we have not applied the command. */
2110     if (c->flags & CLIENT_BLOCKED) return;
2111 
2112     resetClient(c);
2113 
2114     long long prev_offset = c->reploff;
2115     if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
2116         /* Update the applied replication offset of our master. */
2117         c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
2118     }
2119 
2120     /* If the client is a master we need to compute the difference
2121      * between the applied offset before and after processing the buffer,
2122      * to understand how much of the replication stream was actually
2123      * applied to the master state: this quantity, and its corresponding
2124      * part of the replication stream, will be propagated to the
2125      * sub-replicas and to the replication backlog. */
2126     if (c->flags & CLIENT_MASTER) {
2127         long long applied = c->reploff - prev_offset;
2128         if (applied) {
2129             replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
2130             sdsrange(c->pending_querybuf,applied,-1);
2131         }
2132     }
2133 }
2134 
2135 /* This function calls processCommand(), but also performs a few sub tasks
2136  * for the client that are useful in that context:
2137  *
2138  * 1. It sets the current client to the client 'c'.
2139  * 2. calls commandProcessed() if the command was handled.
2140  *
2141  * The function returns C_ERR in case the client was freed as a side effect
2142  * of processing the command, otherwise C_OK is returned. */
processCommandAndResetClient(client * c)2143 int processCommandAndResetClient(client *c) {
2144     int deadclient = 0;
2145     client *old_client = server.current_client;
2146     server.current_client = c;
2147     if (processCommand(c) == C_OK) {
2148         commandProcessed(c);
2149         /* Update the client's memory to include output buffer growth following the
2150          * processed command. */
2151         updateClientMemUsage(c);
2152     }
2153 
2154     if (server.current_client == NULL) deadclient = 1;
2155     /*
2156      * Restore the old client, this is needed because when a script
2157      * times out, we will get into this code from processEventsWhileBlocked.
2158      * Which will cause to set the server.current_client. If not restored
2159      * we will return 1 to our caller which will falsely indicate the client
2160      * is dead and will stop reading from its buffer.
2161      */
2162     server.current_client = old_client;
2163     /* performEvictions may flush slave output buffers. This may
2164      * result in a slave, that may be the active client, to be
2165      * freed. */
2166     return deadclient ? C_ERR : C_OK;
2167 }
2168 
2169 
2170 /* This function will execute any fully parsed commands pending on
2171  * the client. Returns C_ERR if the client is no longer valid after executing
2172  * the command, and C_OK for all other cases. */
processPendingCommandsAndResetClient(client * c)2173 int processPendingCommandsAndResetClient(client *c) {
2174     if (c->flags & CLIENT_PENDING_COMMAND) {
2175         c->flags &= ~CLIENT_PENDING_COMMAND;
2176         if (processCommandAndResetClient(c) == C_ERR) {
2177             return C_ERR;
2178         }
2179     }
2180     return C_OK;
2181 }
2182 
2183 /* This function is called every time, in the client structure 'c', there is
2184  * more query buffer to process, because we read more data from the socket
2185  * or because a client was blocked and later reactivated, so there could be
2186  * pending query buffer, already representing a full command, to process.
2187  * return C_ERR in case the client was freed during the processing */
processInputBuffer(client * c)2188 int processInputBuffer(client *c) {
2189     /* Keep processing while there is something in the input buffer */
2190     while(c->qb_pos < sdslen(c->querybuf)) {
2191         /* Immediately abort if the client is in the middle of something. */
2192         if (c->flags & CLIENT_BLOCKED) break;
2193 
2194         /* Don't process more buffers from clients that have already pending
2195          * commands to execute in c->argv. */
2196         if (c->flags & CLIENT_PENDING_COMMAND) break;
2197 
2198         /* Don't process input from the master while there is a busy script
2199          * condition on the slave. We want just to accumulate the replication
2200          * stream (instead of replying -BUSY like we do with other clients) and
2201          * later resume the processing. */
2202         if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
2203 
2204         /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
2205          * written to the client. Make sure to not let the reply grow after
2206          * this flag has been set (i.e. don't process more commands).
2207          *
2208          * The same applies for clients we want to terminate ASAP. */
2209         if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
2210 
2211         /* Determine request type when unknown. */
2212         if (!c->reqtype) {
2213             if (c->querybuf[c->qb_pos] == '*') {
2214                 c->reqtype = PROTO_REQ_MULTIBULK;
2215             } else {
2216                 c->reqtype = PROTO_REQ_INLINE;
2217             }
2218         }
2219 
2220         if (c->reqtype == PROTO_REQ_INLINE) {
2221             if (processInlineBuffer(c) != C_OK) break;
2222         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
2223             if (processMultibulkBuffer(c) != C_OK) break;
2224         } else {
2225             serverPanic("Unknown request type");
2226         }
2227 
2228         /* Multibulk processing could see a <= 0 length. */
2229         if (c->argc == 0) {
2230             resetClient(c);
2231         } else {
2232             /* If we are in the context of an I/O thread, we can't really
2233              * execute the command here. All we can do is to flag the client
2234              * as one that needs to process the command. */
2235             if (io_threads_op != IO_THREADS_OP_IDLE) {
2236                 serverAssert(io_threads_op == IO_THREADS_OP_READ);
2237                 c->flags |= CLIENT_PENDING_COMMAND;
2238                 break;
2239             }
2240 
2241             /* We are finally ready to execute the command. */
2242             if (processCommandAndResetClient(c) == C_ERR) {
2243                 /* If the client is no longer valid, we avoid exiting this
2244                  * loop and trimming the client buffer later. So we return
2245                  * ASAP in that case. */
2246                 return C_ERR;
2247             }
2248         }
2249     }
2250 
2251     /* Trim to pos */
2252     if (c->qb_pos) {
2253         sdsrange(c->querybuf,c->qb_pos,-1);
2254         c->qb_pos = 0;
2255     }
2256 
2257     /* Update client memory usage after processing the query buffer, this is
2258      * important in case the query buffer is big and wasn't drained during
2259      * the above loop (because of partially sent big commands). */
2260     updateClientMemUsage(c);
2261 
2262     return C_OK;
2263 }
2264 
readQueryFromClient(connection * conn)2265 void readQueryFromClient(connection *conn) {
2266     client *c = connGetPrivateData(conn);
2267     int nread, big_arg = 0;
2268     size_t qblen, readlen;
2269 
2270     /* Check if we want to read from the client later when exiting from
2271      * the event loop. This is the case if threaded I/O is enabled. */
2272     if (postponeClientRead(c)) return;
2273 
2274     /* Update total number of reads on server */
2275     atomicIncr(server.stat_total_reads_processed, 1);
2276 
2277     readlen = PROTO_IOBUF_LEN;
2278     /* If this is a multi bulk request, and we are processing a bulk reply
2279      * that is large enough, try to maximize the probability that the query
2280      * buffer contains exactly the SDS string representing the object, even
2281      * at the risk of requiring more read(2) calls. This way the function
2282      * processMultiBulkBuffer() can avoid copying buffers to create the
2283      * Redis Object representing the argument. */
2284     if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
2285         && c->bulklen >= PROTO_MBULK_BIG_ARG)
2286     {
2287         ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
2288         big_arg = 1;
2289 
2290         /* Note that the 'remaining' variable may be zero in some edge case,
2291          * for example once we resume a blocked client after CLIENT PAUSE. */
2292         if (remaining > 0) readlen = remaining;
2293     }
2294 
2295     qblen = sdslen(c->querybuf);
2296     if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) {
2297         /* When reading a BIG_ARG we won't be reading more than that one arg
2298          * into the query buffer, so we don't need to pre-allocate more than we
2299          * need, so using the non-greedy growing. For an initial allocation of
2300          * the query buffer, we also don't wanna use the greedy growth, in order
2301          * to avoid collision with the RESIZE_THRESHOLD mechanism. */
2302         c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
2303     } else {
2304         c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
2305 
2306         /* Read as much as possible from the socket to save read(2) system calls. */
2307         readlen = sdsavail(c->querybuf);
2308     }
2309     nread = connRead(c->conn, c->querybuf+qblen, readlen);
2310     if (nread == -1) {
2311         if (connGetState(conn) == CONN_STATE_CONNECTED) {
2312             return;
2313         } else {
2314             serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
2315             freeClientAsync(c);
2316             goto done;
2317         }
2318     } else if (nread == 0) {
2319         if (server.verbosity <= LL_VERBOSE) {
2320             sds info = catClientInfoString(sdsempty(), c);
2321             serverLog(LL_VERBOSE, "Client closed connection %s", info);
2322             sdsfree(info);
2323         }
2324         freeClientAsync(c);
2325         goto done;
2326     } else if (c->flags & CLIENT_MASTER) {
2327         /* Append the query buffer to the pending (not applied) buffer
2328          * of the master. We'll use this buffer later in order to have a
2329          * copy of the string applied by the last command executed. */
2330         c->pending_querybuf = sdscatlen(c->pending_querybuf,
2331                                         c->querybuf+qblen,nread);
2332     }
2333 
2334     sdsIncrLen(c->querybuf,nread);
2335     qblen = sdslen(c->querybuf);
2336     if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
2337 
2338     c->lastinteraction = server.unixtime;
2339     if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
2340     atomicIncr(server.stat_net_input_bytes, nread);
2341     if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
2342         sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
2343 
2344         bytes = sdscatrepr(bytes,c->querybuf,64);
2345         serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
2346         sdsfree(ci);
2347         sdsfree(bytes);
2348         freeClientAsync(c);
2349         goto done;
2350     }
2351 
2352     /* There is more data in the client input buffer, continue parsing it
2353      * and check if there is a full command to execute. */
2354      if (processInputBuffer(c) == C_ERR)
2355          c = NULL;
2356 
2357 done:
2358     beforeNextClient(c);
2359 }
2360 
2361 /* A Redis "Address String" is a colon separated ip:port pair.
2362  * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
2363  * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
2364  * For Unix sockets we use path:0, like in "/tmp/redis:0".
2365  *
2366  * An Address String always fits inside a buffer of NET_ADDR_STR_LEN bytes,
2367  * including the null term.
2368  *
2369  * On failure the function still populates 'addr' with the "?:0" string in case
2370  * you want to relax error checking or need to display something anyway (see
2371  * anetFdToString implementation for more info). */
genClientAddrString(client * client,char * addr,size_t addr_len,int fd_to_str_type)2372 void genClientAddrString(client *client, char *addr,
2373                          size_t addr_len, int fd_to_str_type) {
2374     if (client->flags & CLIENT_UNIX_SOCKET) {
2375         /* Unix socket client. */
2376         snprintf(addr,addr_len,"%s:0",server.unixsocket);
2377     } else {
2378         /* TCP client. */
2379         connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type);
2380     }
2381 }
2382 
2383 /* This function returns the client peer id, by creating and caching it
2384  * if client->peerid is NULL, otherwise returning the cached value.
2385  * The Peer ID never changes during the life of the client, however it
2386  * is expensive to compute. */
getClientPeerId(client * c)2387 char *getClientPeerId(client *c) {
2388     char peerid[NET_ADDR_STR_LEN];
2389 
2390     if (c->peerid == NULL) {
2391         genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME);
2392         c->peerid = sdsnew(peerid);
2393     }
2394     return c->peerid;
2395 }
2396 
2397 /* This function returns the client bound socket name, by creating and caching
2398  * it if client->sockname is NULL, otherwise returning the cached value.
2399  * The Socket Name never changes during the life of the client, however it
2400  * is expensive to compute. */
getClientSockname(client * c)2401 char *getClientSockname(client *c) {
2402     char sockname[NET_ADDR_STR_LEN];
2403 
2404     if (c->sockname == NULL) {
2405         genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME);
2406         c->sockname = sdsnew(sockname);
2407     }
2408     return c->sockname;
2409 }
2410 
2411 /* Concatenate a string representing the state of a client in a human
2412  * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)2413 sds catClientInfoString(sds s, client *client) {
2414     char flags[16], events[3], conninfo[CONN_INFO_LEN], *p;
2415 
2416     p = flags;
2417     if (client->flags & CLIENT_SLAVE) {
2418         if (client->flags & CLIENT_MONITOR)
2419             *p++ = 'O';
2420         else
2421             *p++ = 'S';
2422     }
2423     if (client->flags & CLIENT_MASTER) *p++ = 'M';
2424     if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
2425     if (client->flags & CLIENT_MULTI) *p++ = 'x';
2426     if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
2427     if (client->flags & CLIENT_TRACKING) *p++ = 't';
2428     if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
2429     if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
2430     if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
2431     if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
2432     if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
2433     if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
2434     if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
2435     if (client->flags & CLIENT_READONLY) *p++ = 'r';
2436     if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
2437     if (p == flags) *p++ = 'N';
2438     *p++ = '\0';
2439 
2440     p = events;
2441     if (client->conn) {
2442         if (connHasReadHandler(client->conn)) *p++ = 'r';
2443         if (connHasWriteHandler(client->conn)) *p++ = 'w';
2444     }
2445     *p = '\0';
2446 
2447     /* Compute the total memory consumed by this client. */
2448     size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
2449 
2450     size_t used_blocks_of_repl_buf = 0;
2451     if (client->ref_repl_buf_node) {
2452         replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
2453         replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
2454         used_blocks_of_repl_buf = last->id - cur->id + 1;
2455     }
2456 
2457     sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL;
2458     sds ret = sdscatfmt(s,
2459         "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
2460         (unsigned long long) client->id,
2461         getClientPeerId(client),
2462         getClientSockname(client),
2463         connGetInfo(client->conn, conninfo, sizeof(conninfo)),
2464         client->name ? (char*)client->name->ptr : "",
2465         (long long)(server.unixtime - client->ctime),
2466         (long long)(server.unixtime - client->lastinteraction),
2467         flags,
2468         client->db->id,
2469         (int) dictSize(client->pubsub_channels),
2470         (int) listLength(client->pubsub_patterns),
2471         (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
2472         (unsigned long long) sdslen(client->querybuf),
2473         (unsigned long long) sdsavail(client->querybuf),
2474         (unsigned long long) client->argv_len_sum,
2475         (unsigned long long) client->mstate.argv_len_sums,
2476         (unsigned long long) client->bufpos,
2477         (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
2478         (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
2479         (unsigned long long) total_mem,
2480         events,
2481         cmdname ? cmdname : "NULL",
2482         client->user ? client->user->name : "(superuser)",
2483         (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
2484         client->resp);
2485     if (cmdname)
2486         sdsfree(cmdname);
2487     return ret;
2488 }
2489 
getAllClientsInfoString(int type)2490 sds getAllClientsInfoString(int type) {
2491     listNode *ln;
2492     listIter li;
2493     client *client;
2494     sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients));
2495     sdsclear(o);
2496     listRewind(server.clients,&li);
2497     while ((ln = listNext(&li)) != NULL) {
2498         client = listNodeValue(ln);
2499         if (type != -1 && getClientType(client) != type) continue;
2500         o = catClientInfoString(o,client);
2501         o = sdscatlen(o,"\n",1);
2502     }
2503     return o;
2504 }
2505 
2506 /* This function implements CLIENT SETNAME, including replying to the
2507  * user with an error if the charset is wrong (in that case C_ERR is
2508  * returned). If the function succeeded C_OK is returned, and it's up
2509  * to the caller to send a reply if needed.
2510  *
2511  * Setting an empty string as name has the effect of unsetting the
2512  * currently set name: the client will remain unnamed.
2513  *
2514  * This function is also used to implement the HELLO SETNAME option. */
clientSetNameOrReply(client * c,robj * name)2515 int clientSetNameOrReply(client *c, robj *name) {
2516     int len = sdslen(name->ptr);
2517     char *p = name->ptr;
2518 
2519     /* Setting the client name to an empty string actually removes
2520      * the current name. */
2521     if (len == 0) {
2522         if (c->name) decrRefCount(c->name);
2523         c->name = NULL;
2524         return C_OK;
2525     }
2526 
2527     /* Otherwise check if the charset is ok. We need to do this otherwise
2528      * CLIENT LIST format will break. You should always be able to
2529      * split by space to get the different fields. */
2530     for (int j = 0; j < len; j++) {
2531         if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
2532             addReplyError(c,
2533                 "Client names cannot contain spaces, "
2534                 "newlines or special characters.");
2535             return C_ERR;
2536         }
2537     }
2538     if (c->name) decrRefCount(c->name);
2539     c->name = name;
2540     incrRefCount(name);
2541     return C_OK;
2542 }
2543 
2544 /* Reset the client state to resemble a newly connected client.
2545  */
resetCommand(client * c)2546 void resetCommand(client *c) {
2547     listNode *ln;
2548 
2549     /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
2550      * distinguish between the two.
2551      */
2552     if (c->flags & CLIENT_MONITOR) {
2553         ln = listSearchKey(server.monitors,c);
2554         serverAssert(ln != NULL);
2555         listDelNode(server.monitors,ln);
2556 
2557         c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
2558     }
2559 
2560     if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
2561         addReplyError(c,"can only reset normal client connections");
2562         return;
2563     }
2564 
2565     if (c->flags & CLIENT_TRACKING) disableTracking(c);
2566     selectDb(c,0);
2567     c->resp = 2;
2568 
2569     clientSetDefaultAuth(c);
2570     moduleNotifyUserChanged(c);
2571     discardTransaction(c);
2572 
2573     pubsubUnsubscribeAllChannels(c,0);
2574     pubsubUnsubscribeAllPatterns(c,0);
2575 
2576     if (c->name) {
2577         decrRefCount(c->name);
2578         c->name = NULL;
2579     }
2580 
2581     /* Selectively clear state flags not covered above */
2582     c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
2583             CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
2584 
2585     addReplyStatus(c,"RESET");
2586 }
2587 
/* QUIT: disconnect the current client.
 *
 * Replies +OK and flags the client CLIENT_CLOSE_AFTER_REPLY so the connection
 * is closed once that reply has been written. The order matters: the reply
 * is queued first, because replies must not grow once the flag is set. */
void quitCommand(client *c) {
    addReply(c,shared.ok);
    c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
2593 
clientCommand(client * c)2594 void clientCommand(client *c) {
2595     listNode *ln;
2596     listIter li;
2597 
2598     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
2599         const char *help[] = {
2600 "CACHING (YES|NO)",
2601 "    Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.",
2602 "GETREDIR",
2603 "    Return the client ID we are redirecting to when tracking is enabled.",
2604 "GETNAME",
2605 "    Return the name of the current connection.",
2606 "ID",
2607 "    Return the ID of the current connection.",
2608 "INFO",
2609 "    Return information about the current client connection.",
2610 "KILL <ip:port>",
2611 "    Kill connection made from <ip:port>.",
2612 "KILL <option> <value> [<option> <value> [...]]",
2613 "    Kill connections. Options are:",
2614 "    * ADDR (<ip:port>|<unixsocket>:0)",
2615 "      Kill connections made from the specified address",
2616 "    * LADDR (<ip:port>|<unixsocket>:0)",
2617 "      Kill connections made to specified local address",
2618 "    * TYPE (normal|master|replica|pubsub)",
2619 "      Kill connections by type.",
2620 "    * USER <username>",
2621 "      Kill connections authenticated by <username>.",
2622 "    * SKIPME (YES|NO)",
2623 "      Skip killing current connection (default: yes).",
2624 "LIST [options ...]",
2625 "    Return information about client connections. Options:",
2626 "    * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
2627 "      Return clients of specified type.",
2628 "UNPAUSE",
2629 "    Stop the current client pause, resuming traffic.",
2630 "PAUSE <timeout> [WRITE|ALL]",
2631 "    Suspend all, or just write, clients for <timeout> milliseconds.",
2632 "REPLY (ON|OFF|SKIP)",
2633 "    Control the replies sent to the current connection.",
2634 "SETNAME <name>",
2635 "    Assign the name <name> to the current connection.",
2636 "GETNAME",
2637 "    Get the name of the current connection.",
2638 "UNBLOCK <clientid> [TIMEOUT|ERROR]",
2639 "    Unblock the specified blocked client.",
2640 "TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]",
2641 "         [OPTIN] [OPTOUT] [NOLOOP]",
2642 "    Control server assisted client side caching.",
2643 "TRACKINGINFO",
2644 "    Report tracking status for the current connection.",
2645 "NO-EVICT (ON|OFF)",
2646 "    Protect current client connection from eviction.",
2647 NULL
2648         };
2649         addReplyHelp(c, help);
2650     } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) {
2651         /* CLIENT ID */
2652         addReplyLongLong(c,c->id);
2653     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
2654         /* CLIENT INFO */
2655         sds o = catClientInfoString(sdsempty(), c);
2656         o = sdscatlen(o,"\n",1);
2657         addReplyVerbatim(c,o,sdslen(o),"txt");
2658         sdsfree(o);
2659     } else if (!strcasecmp(c->argv[1]->ptr,"list")) {
2660         /* CLIENT LIST */
2661         int type = -1;
2662         sds o = NULL;
2663         if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) {
2664             type = getClientTypeByName(c->argv[3]->ptr);
2665             if (type == -1) {
2666                 addReplyErrorFormat(c,"Unknown client type '%s'",
2667                     (char*) c->argv[3]->ptr);
2668                 return;
2669             }
2670         } else if (c->argc > 3 && !strcasecmp(c->argv[2]->ptr,"id")) {
2671             int j;
2672             o = sdsempty();
2673             for (j = 3; j < c->argc; j++) {
2674                 long long cid;
2675                 if (getLongLongFromObjectOrReply(c, c->argv[j], &cid,
2676                             "Invalid client ID")) {
2677                     sdsfree(o);
2678                     return;
2679                 }
2680                 client *cl = lookupClientByID(cid);
2681                 if (cl) {
2682                     o = catClientInfoString(o, cl);
2683                     o = sdscatlen(o, "\n", 1);
2684                 }
2685             }
2686         } else if (c->argc != 2) {
2687             addReplyErrorObject(c,shared.syntaxerr);
2688             return;
2689         }
2690 
2691         if (!o)
2692             o = getAllClientsInfoString(type);
2693         addReplyVerbatim(c,o,sdslen(o),"txt");
2694         sdsfree(o);
2695     } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
2696         /* CLIENT REPLY ON|OFF|SKIP */
2697         if (!strcasecmp(c->argv[2]->ptr,"on")) {
2698             c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
2699             addReply(c,shared.ok);
2700         } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
2701             c->flags |= CLIENT_REPLY_OFF;
2702         } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
2703             if (!(c->flags & CLIENT_REPLY_OFF))
2704                 c->flags |= CLIENT_REPLY_SKIP_NEXT;
2705         } else {
2706             addReplyErrorObject(c,shared.syntaxerr);
2707             return;
2708         }
2709     } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
2710         /* CLIENT NO-EVICT ON|OFF */
2711         if (!strcasecmp(c->argv[2]->ptr,"on")) {
2712             c->flags |= CLIENT_NO_EVICT;
2713             addReply(c,shared.ok);
2714         } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
2715             c->flags &= ~CLIENT_NO_EVICT;
2716             addReply(c,shared.ok);
2717         } else {
2718             addReplyErrorObject(c,shared.syntaxerr);
2719             return;
2720         }
2721     } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
2722         /* CLIENT KILL <ip:port>
2723          * CLIENT KILL <option> [value] ... <option> [value] */
2724         char *addr = NULL;
2725         char *laddr = NULL;
2726         user *user = NULL;
2727         int type = -1;
2728         uint64_t id = 0;
2729         int skipme = 1;
2730         int killed = 0, close_this_client = 0;
2731 
2732         if (c->argc == 3) {
2733             /* Old style syntax: CLIENT KILL <addr> */
2734             addr = c->argv[2]->ptr;
2735             skipme = 0; /* With the old form, you can kill yourself. */
2736         } else if (c->argc > 3) {
2737             int i = 2; /* Next option index. */
2738 
2739             /* New style syntax: parse options. */
2740             while(i < c->argc) {
2741                 int moreargs = c->argc > i+1;
2742 
2743                 if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
2744                     long tmp;
2745 
2746                     if (getRangeLongFromObjectOrReply(c, c->argv[i+1], 1, LONG_MAX, &tmp,
2747                                                       "client-id should be greater than 0") != C_OK)
2748                         return;
2749                     id = tmp;
2750                 } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
2751                     type = getClientTypeByName(c->argv[i+1]->ptr);
2752                     if (type == -1) {
2753                         addReplyErrorFormat(c,"Unknown client type '%s'",
2754                             (char*) c->argv[i+1]->ptr);
2755                         return;
2756                     }
2757                 } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
2758                     addr = c->argv[i+1]->ptr;
2759                 } else if (!strcasecmp(c->argv[i]->ptr,"laddr") && moreargs) {
2760                     laddr = c->argv[i+1]->ptr;
2761                 } else if (!strcasecmp(c->argv[i]->ptr,"user") && moreargs) {
2762                     user = ACLGetUserByName(c->argv[i+1]->ptr,
2763                                             sdslen(c->argv[i+1]->ptr));
2764                     if (user == NULL) {
2765                         addReplyErrorFormat(c,"No such user '%s'",
2766                             (char*) c->argv[i+1]->ptr);
2767                         return;
2768                     }
2769                 } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
2770                     if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
2771                         skipme = 1;
2772                     } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
2773                         skipme = 0;
2774                     } else {
2775                         addReplyErrorObject(c,shared.syntaxerr);
2776                         return;
2777                     }
2778                 } else {
2779                     addReplyErrorObject(c,shared.syntaxerr);
2780                     return;
2781                 }
2782                 i += 2;
2783             }
2784         } else {
2785             addReplyErrorObject(c,shared.syntaxerr);
2786             return;
2787         }
2788 
2789         /* Iterate clients killing all the matching clients. */
2790         listRewind(server.clients,&li);
2791         while ((ln = listNext(&li)) != NULL) {
2792             client *client = listNodeValue(ln);
2793             if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
2794             if (laddr && strcmp(getClientSockname(client),laddr) != 0) continue;
2795             if (type != -1 && getClientType(client) != type) continue;
2796             if (id != 0 && client->id != id) continue;
2797             if (user && client->user != user) continue;
2798             if (c == client && skipme) continue;
2799 
2800             /* Kill it. */
2801             if (c == client) {
2802                 close_this_client = 1;
2803             } else {
2804                 freeClient(client);
2805             }
2806             killed++;
2807         }
2808 
2809         /* Reply according to old/new format. */
2810         if (c->argc == 3) {
2811             if (killed == 0)
2812                 addReplyError(c,"No such client");
2813             else
2814                 addReply(c,shared.ok);
2815         } else {
2816             addReplyLongLong(c,killed);
2817         }
2818 
2819         /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
2820          * only after we queued the reply to its output buffers. */
2821         if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2822     } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 ||
2823                                                           c->argc == 4))
2824     {
2825         /* CLIENT UNBLOCK <id> [timeout|error] */
2826         long long id;
2827         int unblock_error = 0;
2828 
2829         if (c->argc == 4) {
2830             if (!strcasecmp(c->argv[3]->ptr,"timeout")) {
2831                 unblock_error = 0;
2832             } else if (!strcasecmp(c->argv[3]->ptr,"error")) {
2833                 unblock_error = 1;
2834             } else {
2835                 addReplyError(c,
2836                     "CLIENT UNBLOCK reason should be TIMEOUT or ERROR");
2837                 return;
2838             }
2839         }
2840         if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
2841             != C_OK) return;
2842         struct client *target = lookupClientByID(id);
2843         if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
2844             if (unblock_error)
2845                 addReplyError(target,
2846                     "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
2847             else
2848                 replyToBlockedClientTimedOut(target);
2849             unblockClient(target);
2850             addReply(c,shared.cone);
2851         } else {
2852             addReply(c,shared.czero);
2853         }
2854     } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
2855         /* CLIENT SETNAME */
2856         if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
2857             addReply(c,shared.ok);
2858     } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
2859         /* CLIENT GETNAME */
2860         if (c->name)
2861             addReplyBulk(c,c->name);
2862         else
2863             addReplyNull(c);
2864     } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
2865         /* CLIENT UNPAUSE */
2866         unpauseClients();
2867         addReply(c,shared.ok);
2868     } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
2869                                                         c->argc == 4))
2870     {
2871         /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
2872         mstime_t end;
2873         int type = CLIENT_PAUSE_ALL;
2874         if (c->argc == 4) {
2875             if (!strcasecmp(c->argv[3]->ptr,"write")) {
2876                 type = CLIENT_PAUSE_WRITE;
2877             } else if (!strcasecmp(c->argv[3]->ptr,"all")) {
2878                 type = CLIENT_PAUSE_ALL;
2879             } else {
2880                 addReplyError(c,
2881                     "CLIENT PAUSE mode must be WRITE or ALL");
2882                 return;
2883             }
2884         }
2885 
2886         if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
2887             UNIT_MILLISECONDS) != C_OK) return;
2888         pauseClients(end, type);
2889         addReply(c,shared.ok);
2890     } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
2891         /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
2892          *                          [PREFIX second] [OPTIN] [OPTOUT] [NOLOOP]... */
2893         long long redir = 0;
2894         uint64_t options = 0;
2895         robj **prefix = NULL;
2896         size_t numprefix = 0;
2897 
2898         /* Parse the options. */
2899         for (int j = 3; j < c->argc; j++) {
2900             int moreargs = (c->argc-1) - j;
2901 
2902             if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
2903                 j++;
2904                 if (redir != 0) {
2905                     addReplyError(c,"A client can only redirect to a single "
2906                                     "other client");
2907                     zfree(prefix);
2908                     return;
2909                 }
2910 
2911                 if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
2912                     C_OK)
2913                 {
2914                     zfree(prefix);
2915                     return;
2916                 }
2917                 /* We will require the client with the specified ID to exist
2918                  * right now, even if it is possible that it gets disconnected
2919                  * later. Still a valid sanity check. */
2920                 if (lookupClientByID(redir) == NULL) {
2921                     addReplyError(c,"The client ID you want redirect to "
2922                                     "does not exist");
2923                     zfree(prefix);
2924                     return;
2925                 }
2926             } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
2927                 options |= CLIENT_TRACKING_BCAST;
2928             } else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
2929                 options |= CLIENT_TRACKING_OPTIN;
2930             } else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
2931                 options |= CLIENT_TRACKING_OPTOUT;
2932             } else if (!strcasecmp(c->argv[j]->ptr,"noloop")) {
2933                 options |= CLIENT_TRACKING_NOLOOP;
2934             } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
2935                 j++;
2936                 prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
2937                 prefix[numprefix++] = c->argv[j];
2938             } else {
2939                 zfree(prefix);
2940                 addReplyErrorObject(c,shared.syntaxerr);
2941                 return;
2942             }
2943         }
2944 
2945         /* Options are ok: enable or disable the tracking for this client. */
2946         if (!strcasecmp(c->argv[2]->ptr,"on")) {
2947             /* Before enabling tracking, make sure options are compatible
2948              * among each other and with the current state of the client. */
2949             if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
2950                 addReplyError(c,
2951                     "PREFIX option requires BCAST mode to be enabled");
2952                 zfree(prefix);
2953                 return;
2954             }
2955 
2956             if (c->flags & CLIENT_TRACKING) {
2957                 int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
2958                 int newbcast = !!(options & CLIENT_TRACKING_BCAST);
2959                 if (oldbcast != newbcast) {
2960                     addReplyError(c,
2961                     "You can't switch BCAST mode on/off before disabling "
2962                     "tracking for this client, and then re-enabling it with "
2963                     "a different mode.");
2964                     zfree(prefix);
2965                     return;
2966                 }
2967             }
2968 
2969             if (options & CLIENT_TRACKING_BCAST &&
2970                 options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
2971             {
2972                 addReplyError(c,
2973                 "OPTIN and OPTOUT are not compatible with BCAST");
2974                 zfree(prefix);
2975                 return;
2976             }
2977 
2978             if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT)
2979             {
2980                 addReplyError(c,
2981                 "You can't specify both OPTIN mode and OPTOUT mode");
2982                 zfree(prefix);
2983                 return;
2984             }
2985 
2986             if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) ||
2987                 (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN))
2988             {
2989                 addReplyError(c,
2990                 "You can't switch OPTIN/OPTOUT mode before disabling "
2991                 "tracking for this client, and then re-enabling it with "
2992                 "a different mode.");
2993                 zfree(prefix);
2994                 return;
2995             }
2996 
2997             if (options & CLIENT_TRACKING_BCAST) {
2998                 if (!checkPrefixCollisionsOrReply(c,prefix,numprefix)) {
2999                     zfree(prefix);
3000                     return;
3001                 }
3002             }
3003 
3004             enableTracking(c,redir,options,prefix,numprefix);
3005         } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
3006             disableTracking(c);
3007         } else {
3008             zfree(prefix);
3009             addReplyErrorObject(c,shared.syntaxerr);
3010             return;
3011         }
3012         zfree(prefix);
3013         addReply(c,shared.ok);
3014     } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
3015         if (!(c->flags & CLIENT_TRACKING)) {
3016             addReplyError(c,"CLIENT CACHING can be called only when the "
3017                             "client is in tracking mode with OPTIN or "
3018                             "OPTOUT mode enabled");
3019             return;
3020         }
3021 
3022         char *opt = c->argv[2]->ptr;
3023         if (!strcasecmp(opt,"yes")) {
3024             if (c->flags & CLIENT_TRACKING_OPTIN) {
3025                 c->flags |= CLIENT_TRACKING_CACHING;
3026             } else {
3027                 addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
3028                 return;
3029             }
3030         } else if (!strcasecmp(opt,"no")) {
3031             if (c->flags & CLIENT_TRACKING_OPTOUT) {
3032                 c->flags |= CLIENT_TRACKING_CACHING;
3033             } else {
3034                 addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
3035                 return;
3036             }
3037         } else {
3038             addReplyErrorObject(c,shared.syntaxerr);
3039             return;
3040         }
3041 
3042         /* Common reply for when we succeeded. */
3043         addReply(c,shared.ok);
3044     } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
3045         /* CLIENT GETREDIR */
3046         if (c->flags & CLIENT_TRACKING) {
3047             addReplyLongLong(c,c->client_tracking_redirection);
3048         } else {
3049             addReplyLongLong(c,-1);
3050         }
3051     } else if (!strcasecmp(c->argv[1]->ptr,"trackinginfo") && c->argc == 2) {
3052         addReplyMapLen(c,3);
3053 
3054         /* Flags */
3055         addReplyBulkCString(c,"flags");
3056         void *arraylen_ptr = addReplyDeferredLen(c);
3057         int numflags = 0;
3058         addReplyBulkCString(c,c->flags & CLIENT_TRACKING ? "on" : "off");
3059         numflags++;
3060         if (c->flags & CLIENT_TRACKING_BCAST) {
3061             addReplyBulkCString(c,"bcast");
3062             numflags++;
3063         }
3064         if (c->flags & CLIENT_TRACKING_OPTIN) {
3065             addReplyBulkCString(c,"optin");
3066             numflags++;
3067             if (c->flags & CLIENT_TRACKING_CACHING) {
3068                 addReplyBulkCString(c,"caching-yes");
3069                 numflags++;
3070             }
3071         }
3072         if (c->flags & CLIENT_TRACKING_OPTOUT) {
3073             addReplyBulkCString(c,"optout");
3074             numflags++;
3075             if (c->flags & CLIENT_TRACKING_CACHING) {
3076                 addReplyBulkCString(c,"caching-no");
3077                 numflags++;
3078             }
3079         }
3080         if (c->flags & CLIENT_TRACKING_NOLOOP) {
3081             addReplyBulkCString(c,"noloop");
3082             numflags++;
3083         }
3084         if (c->flags & CLIENT_TRACKING_BROKEN_REDIR) {
3085             addReplyBulkCString(c,"broken_redirect");
3086             numflags++;
3087         }
3088         setDeferredSetLen(c,arraylen_ptr,numflags);
3089 
3090         /* Redirect */
3091         addReplyBulkCString(c,"redirect");
3092         if (c->flags & CLIENT_TRACKING) {
3093             addReplyLongLong(c,c->client_tracking_redirection);
3094         } else {
3095             addReplyLongLong(c,-1);
3096         }
3097 
3098         /* Prefixes */
3099         addReplyBulkCString(c,"prefixes");
3100         if (c->client_tracking_prefixes) {
3101             addReplyArrayLen(c,raxSize(c->client_tracking_prefixes));
3102             raxIterator ri;
3103             raxStart(&ri,c->client_tracking_prefixes);
3104             raxSeek(&ri,"^",NULL,0);
3105             while(raxNext(&ri)) {
3106                 addReplyBulkCBuffer(c,ri.key,ri.key_len);
3107             }
3108             raxStop(&ri);
3109         } else {
3110             addReplyArrayLen(c,0);
3111         }
3112     } else {
3113         addReplySubcommandSyntaxError(c);
3114     }
3115 }
3116 
3117 /* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */
helloCommand(client * c)3118 void helloCommand(client *c) {
3119     long long ver = 0;
3120     int next_arg = 1;
3121 
3122     if (c->argc >= 2) {
3123         if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver,
3124             "Protocol version is not an integer or out of range") != C_OK) {
3125             return;
3126         }
3127 
3128         if (ver < 2 || ver > 3) {
3129             addReplyError(c,"-NOPROTO unsupported protocol version");
3130             return;
3131         }
3132     }
3133 
3134     for (int j = next_arg; j < c->argc; j++) {
3135         int moreargs = (c->argc-1) - j;
3136         const char *opt = c->argv[j]->ptr;
3137         if (!strcasecmp(opt,"AUTH") && moreargs >= 2) {
3138             redactClientCommandArgument(c, j+1);
3139             redactClientCommandArgument(c, j+2);
3140             if (ACLAuthenticateUser(c, c->argv[j+1], c->argv[j+2]) == C_ERR) {
3141                 addReplyError(c,"-WRONGPASS invalid username-password pair or user is disabled.");
3142                 return;
3143             }
3144             j += 2;
3145         } else if (!strcasecmp(opt,"SETNAME") && moreargs) {
3146             if (clientSetNameOrReply(c, c->argv[j+1]) == C_ERR) return;
3147             j++;
3148         } else {
3149             addReplyErrorFormat(c,"Syntax error in HELLO option '%s'",opt);
3150             return;
3151         }
3152     }
3153 
3154     /* At this point we need to be authenticated to continue. */
3155     if (!c->authenticated) {
3156         addReplyError(c,"-NOAUTH HELLO must be called with the client already "
3157                         "authenticated, otherwise the HELLO AUTH <user> <pass> "
3158                         "option can be used to authenticate the client and "
3159                         "select the RESP protocol version at the same time");
3160         return;
3161     }
3162 
3163     /* Let's switch to the specified RESP mode. */
3164     if (ver) c->resp = ver;
3165     addReplyMapLen(c,6 + !server.sentinel_mode);
3166 
3167     addReplyBulkCString(c,"server");
3168     addReplyBulkCString(c,"redis");
3169 
3170     addReplyBulkCString(c,"version");
3171     addReplyBulkCString(c,REDIS_VERSION);
3172 
3173     addReplyBulkCString(c,"proto");
3174     addReplyLongLong(c,c->resp);
3175 
3176     addReplyBulkCString(c,"id");
3177     addReplyLongLong(c,c->id);
3178 
3179     addReplyBulkCString(c,"mode");
3180     if (server.sentinel_mode) addReplyBulkCString(c,"sentinel");
3181     else if (server.cluster_enabled) addReplyBulkCString(c,"cluster");
3182     else addReplyBulkCString(c,"standalone");
3183 
3184     if (!server.sentinel_mode) {
3185         addReplyBulkCString(c,"role");
3186         addReplyBulkCString(c,server.masterhost ? "replica" : "master");
3187     }
3188 
3189     addReplyBulkCString(c,"modules");
3190     addReplyLoadedModules(c);
3191 }
3192 
3193 /* This callback is bound to POST and "Host:" command names. Those are not
3194  * really commands, but are used in security attacks in order to talk to
3195  * Redis instances via HTTP, with a technique called "cross protocol scripting"
3196  * which exploits the fact that services like Redis will discard invalid
3197  * HTTP headers and will process what follows.
3198  *
3199  * As a protection against this attack, Redis will terminate the connection
3200  * when a POST or "Host:" header is seen, and will log the event from
3201  * time to time (to avoid creating a DOS as a result of too many logs). */
securityWarningCommand(client * c)3202 void securityWarningCommand(client *c) {
3203     static time_t logged_time = 0;
3204     time_t now = time(NULL);
3205 
3206     if (llabs(now-logged_time) > 60) {
3207         serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted.");
3208         logged_time = now;
3209     }
3210     freeClientAsync(c);
3211 }
3212 
3213 /* Keep track of the original command arguments so that we can generate
3214  * an accurate slowlog entry after the command has been executed. */
retainOriginalCommandVector(client * c)3215 static void retainOriginalCommandVector(client *c) {
3216     /* We already rewrote this command, so don't rewrite it again */
3217     if (c->original_argv) return;
3218     c->original_argc = c->argc;
3219     c->original_argv = zmalloc(sizeof(robj*)*(c->argc));
3220     for (int j = 0; j < c->argc; j++) {
3221         c->original_argv[j] = c->argv[j];
3222         incrRefCount(c->argv[j]);
3223     }
3224 }
3225 
3226 /* Redact a given argument to prevent it from being shown
3227  * in the slowlog. This information is stored in the
3228  * original_argv array. */
redactClientCommandArgument(client * c,int argc)3229 void redactClientCommandArgument(client *c, int argc) {
3230     retainOriginalCommandVector(c);
3231     decrRefCount(c->argv[argc]);
3232     c->original_argv[argc] = shared.redacted;
3233 }
3234 
3235 /* Rewrite the command vector of the client. All the new objects ref count
3236  * is incremented. The old command vector is freed, and the old objects
3237  * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)3238 void rewriteClientCommandVector(client *c, int argc, ...) {
3239     va_list ap;
3240     int j;
3241     robj **argv; /* The new argument vector */
3242 
3243     argv = zmalloc(sizeof(robj*)*argc);
3244     va_start(ap,argc);
3245     for (j = 0; j < argc; j++) {
3246         robj *a;
3247 
3248         a = va_arg(ap, robj*);
3249         argv[j] = a;
3250         incrRefCount(a);
3251     }
3252     replaceClientCommandVector(c, argc, argv);
3253     va_end(ap);
3254 }
3255 
3256 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)3257 void replaceClientCommandVector(client *c, int argc, robj **argv) {
3258     int j;
3259     retainOriginalCommandVector(c);
3260     freeClientArgv(c);
3261     zfree(c->argv);
3262     c->argv = argv;
3263     c->argc = argc;
3264     c->argv_len_sum = 0;
3265     for (j = 0; j < c->argc; j++)
3266         if (c->argv[j])
3267             c->argv_len_sum += getStringObjectLen(c->argv[j]);
3268     c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3269     serverAssertWithInfo(c,NULL,c->cmd != NULL);
3270 }
3271 
3272 /* Rewrite a single item in the command vector.
3273  * The new val ref count is incremented, and the old decremented.
3274  *
3275  * It is possible to specify an argument over the current size of the
3276  * argument vector: in this case the array of objects gets reallocated
3277  * and c->argc set to the max value. However it's up to the caller to
3278  *
3279  * 1. Make sure there are no "holes" and all the arguments are set.
3280  * 2. If the original argument vector was longer than the one we
3281  *    want to end with, it's up to the caller to set c->argc and
3282  *    free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)3283 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
3284     robj *oldval;
3285     retainOriginalCommandVector(c);
3286 
3287     /* We need to handle both extending beyond argc (just update it and
3288      * initialize the new element) or beyond argv_len (realloc is needed).
3289      */
3290     if (i >= c->argc) {
3291         if (i >= c->argv_len) {
3292             c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
3293             c->argv_len = i+1;
3294         }
3295         c->argc = i+1;
3296         c->argv[i] = NULL;
3297     }
3298     oldval = c->argv[i];
3299     if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
3300     if (newval) c->argv_len_sum += getStringObjectLen(newval);
3301     c->argv[i] = newval;
3302     incrRefCount(newval);
3303     if (oldval) decrRefCount(oldval);
3304 
3305     /* If this is the command name make sure to fix c->cmd. */
3306     if (i == 0) {
3307         c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3308         serverAssertWithInfo(c,NULL,c->cmd != NULL);
3309     }
3310 }
3311 
3312 /* This function returns the number of bytes that Redis is
3313  * using to store the reply still not read by the client.
3314  *
3315  * Note: this function is very fast so can be called as many time as
3316  * the caller wishes. The main usage of this function currently is
3317  * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)3318 size_t getClientOutputBufferMemoryUsage(client *c) {
3319     if (getClientType(c) == CLIENT_TYPE_SLAVE) {
3320         size_t repl_buf_size = 0;
3321         size_t repl_node_num = 0;
3322         size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
3323         if (c->ref_repl_buf_node) {
3324             replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
3325             replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
3326             repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
3327             repl_node_num = last->id - cur->id + 1;
3328         }
3329         return repl_buf_size + (repl_node_size*repl_node_num);
3330     } else {
3331         size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
3332         return c->reply_bytes + (list_item_size*listLength(c->reply));
3333     }
3334 }
3335 
3336 /* Returns the total client's memory usage.
3337  * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
3338  * the client output buffer memory usage portion of the total. */
getClientMemoryUsage(client * c,size_t * output_buffer_mem_usage)3339 size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
3340     size_t mem = getClientOutputBufferMemoryUsage(c);
3341     if (output_buffer_mem_usage != NULL)
3342         *output_buffer_mem_usage = mem;
3343     mem += sdsZmallocSize(c->querybuf);
3344     mem += zmalloc_size(c);
3345     /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
3346      * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
3347      * spot problematic clients. */
3348     mem += c->argv_len_sum + sizeof(robj*)*c->argc;
3349     mem += multiStateMemOverhead(c);
3350 
3351     /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
3352      * to the strings themselves because they aren't stored per client. */
3353     mem += listLength(c->pubsub_patterns) * sizeof(listNode);
3354     mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
3355            dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
3356 
3357     /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
3358     if (c->client_tracking_prefixes)
3359         mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
3360 
3361     return mem;
3362 }
3363 
3364 /* Get the class of a client, used in order to enforce limits to different
3365  * classes of clients.
3366  *
3367  * The function will return one of the following:
3368  * CLIENT_TYPE_NORMAL -> Normal client
3369  * CLIENT_TYPE_SLAVE  -> Slave
3370  * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
3371  * CLIENT_TYPE_MASTER -> The client representing our replication master.
3372  */
getClientType(client * c)3373 int getClientType(client *c) {
3374     if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
3375     /* Even though MONITOR clients are marked as replicas, we
3376      * want the expose them as normal clients. */
3377     if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
3378         return CLIENT_TYPE_SLAVE;
3379     if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
3380     return CLIENT_TYPE_NORMAL;
3381 }
3382 
getClientTypeByName(char * name)3383 int getClientTypeByName(char *name) {
3384     if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
3385     else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
3386     else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
3387     else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
3388     else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
3389     else return -1;
3390 }
3391 
getClientTypeName(int class)3392 char *getClientTypeName(int class) {
3393     switch(class) {
3394     case CLIENT_TYPE_NORMAL: return "normal";
3395     case CLIENT_TYPE_SLAVE:  return "slave";
3396     case CLIENT_TYPE_PUBSUB: return "pubsub";
3397     case CLIENT_TYPE_MASTER: return "master";
3398     default:                       return NULL;
3399     }
3400 }
3401 
3402 /* The function checks if the client reached output buffer soft or hard
3403  * limit, and also update the state needed to check the soft limit as
3404  * a side effect.
3405  *
3406  * Return value: non-zero if the client reached the soft or the hard limit.
3407  *               Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)3408 int checkClientOutputBufferLimits(client *c) {
3409     int soft = 0, hard = 0, class;
3410     unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
3411 
3412     class = getClientType(c);
3413     /* For the purpose of output buffer limiting, masters are handled
3414      * like normal clients. */
3415     if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
3416 
3417     /* Note that it doesn't make sense to set the replica clients output buffer
3418      * limit lower than the repl-backlog-size config (partial sync will succeed
3419      * and then replica will get disconnected).
3420      * Such a configuration is ignored (the size of repl-backlog-size will be used).
3421      * This doesn't have memory consumption implications since the replica client
3422      * will share the backlog buffers memory. */
3423     size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
3424     if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
3425         (long long)hard_limit_bytes < server.repl_backlog_size)
3426         hard_limit_bytes = server.repl_backlog_size;
3427     if (server.client_obuf_limits[class].hard_limit_bytes &&
3428         used_mem >= hard_limit_bytes)
3429         hard = 1;
3430     if (server.client_obuf_limits[class].soft_limit_bytes &&
3431         used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
3432         soft = 1;
3433 
3434     /* We need to check if the soft limit is reached continuously for the
3435      * specified amount of seconds. */
3436     if (soft) {
3437         if (c->obuf_soft_limit_reached_time == 0) {
3438             c->obuf_soft_limit_reached_time = server.unixtime;
3439             soft = 0; /* First time we see the soft limit reached */
3440         } else {
3441             time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
3442 
3443             if (elapsed <=
3444                 server.client_obuf_limits[class].soft_limit_seconds) {
3445                 soft = 0; /* The client still did not reached the max number of
3446                              seconds for the soft limit to be considered
3447                              reached. */
3448             }
3449         }
3450     } else {
3451         c->obuf_soft_limit_reached_time = 0;
3452     }
3453     return soft || hard;
3454 }
3455 
3456 /* Asynchronously close a client if soft or hard limit is reached on the
3457  * output buffer size. The caller can check if the client will be closed
3458  * checking if the client CLIENT_CLOSE_ASAP flag is set.
3459  *
3460  * Note: we need to close the client asynchronously because this function is
3461  * called from contexts where the client can't be freed safely, i.e. from the
3462  * lower level functions pushing data inside the client output buffers.
3463  * When `async` is set to 0, we close the client immediately, this is
3464  * useful when called from cron.
3465  *
3466  * Returns 1 if client was (flagged) closed. */
closeClientOnOutputBufferLimitReached(client * c,int async)3467 int closeClientOnOutputBufferLimitReached(client *c, int async) {
3468     if (!c->conn) return 0; /* It is unsafe to free fake clients. */
3469     serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
3470     /* Note that c->reply_bytes is irrelevant for replica clients
3471      * (they use the global repl buffers). */
3472     if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
3473         c->flags & CLIENT_CLOSE_ASAP) return 0;
3474     if (checkClientOutputBufferLimits(c)) {
3475         sds client = catClientInfoString(sdsempty(),c);
3476 
3477         if (async) {
3478             freeClientAsync(c);
3479             serverLog(LL_WARNING,
3480                       "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
3481                       client);
3482         } else {
3483             freeClient(c);
3484             serverLog(LL_WARNING,
3485                       "Client %s closed for overcoming of output buffer limits.",
3486                       client);
3487         }
3488         sdsfree(client);
3489         return  1;
3490     }
3491     return 0;
3492 }
3493 
3494 /* Helper function used by performEvictions() in order to flush slaves
3495  * output buffers without returning control to the event loop.
3496  * This is also called by SHUTDOWN for a best-effort attempt to send
3497  * slaves the latest writes. */
flushSlavesOutputBuffers(void)3498 void flushSlavesOutputBuffers(void) {
3499     listIter li;
3500     listNode *ln;
3501 
3502     listRewind(server.slaves,&li);
3503     while((ln = listNext(&li))) {
3504         client *slave = listNodeValue(ln);
3505         int can_receive_writes = connHasWriteHandler(slave->conn) ||
3506                                  (slave->flags & CLIENT_PENDING_WRITE);
3507 
3508         /* We don't want to send the pending data to the replica in a few
3509          * cases:
3510          *
3511          * 1. For some reason there is neither the write handler installed
3512          *    nor the client is flagged as to have pending writes: for some
3513          *    reason this replica may not be set to receive data. This is
3514          *    just for the sake of defensive programming.
3515          *
3516          * 2. The put_online_on_ack flag is true. To know why we don't want
3517          *    to send data to the replica in this case, please grep for the
3518          *    flag for this flag.
3519          *
3520          * 3. Obviously if the slave is not ONLINE.
3521          */
3522         if (slave->replstate == SLAVE_STATE_ONLINE &&
3523             can_receive_writes &&
3524             !slave->repl_put_online_on_ack &&
3525             clientHasPendingReplies(slave))
3526         {
3527             writeToClient(slave,0);
3528         }
3529     }
3530 }
3531 
3532 /* Pause clients up to the specified unixtime (in ms) for a given type of
3533  * commands.
3534  *
3535  * A main use case of this function is to allow pausing replication traffic
3536  * so that a failover without data loss to occur. Replicas will continue to receive
3537  * traffic to facilitate this functionality.
3538  *
3539  * This function is also internally used by Redis Cluster for the manual
3540  * failover procedure implemented by CLUSTER FAILOVER.
3541  *
3542  * The function always succeed, even if there is already a pause in progress.
3543  * In such a case, the duration is set to the maximum and new end time and the
3544  * type is set to the more restrictive type of pause. */
pauseClients(mstime_t end,pause_type type)3545 void pauseClients(mstime_t end, pause_type type) {
3546     if (type > server.client_pause_type) {
3547         server.client_pause_type = type;
3548     }
3549 
3550     if (end > server.client_pause_end_time) {
3551         server.client_pause_end_time = end;
3552     }
3553 
3554     /* We allow write commands that were queued
3555      * up before and after to execute. We need
3556      * to track this state so that we don't assert
3557      * in propagate(). */
3558     if (server.in_exec) {
3559         server.client_pause_in_transaction = 1;
3560     }
3561 }
3562 
3563 /* Unpause clients and queue them for reprocessing. */
unpauseClients(void)3564 void unpauseClients(void) {
3565     listNode *ln;
3566     listIter li;
3567     client *c;
3568 
3569     server.client_pause_type = CLIENT_PAUSE_OFF;
3570     server.client_pause_end_time = 0;
3571 
3572     /* Unblock all of the clients so they are reprocessed. */
3573     listRewind(server.paused_clients,&li);
3574     while ((ln = listNext(&li)) != NULL) {
3575         c = listNodeValue(ln);
3576         unblockClient(c);
3577     }
3578 }
3579 
3580 /* Returns true if clients are paused and false otherwise. */
areClientsPaused(void)3581 int areClientsPaused(void) {
3582     return server.client_pause_type != CLIENT_PAUSE_OFF;
3583 }
3584 
3585 /* Checks if the current client pause has elapsed and unpause clients
3586  * if it has. Also returns true if clients are now paused and false
3587  * otherwise. */
checkClientPauseTimeoutAndReturnIfPaused(void)3588 int checkClientPauseTimeoutAndReturnIfPaused(void) {
3589     if (!areClientsPaused())
3590         return 0;
3591     if (server.client_pause_end_time < server.mstime) {
3592         unpauseClients();
3593     }
3594     return areClientsPaused();
3595 }
3596 
3597 /* This function is called by Redis in order to process a few events from
3598  * time to time while blocked into some not interruptible operation.
3599  * This allows to reply to clients with the -LOADING error while loading the
3600  * data set at startup or after a full resynchronization with the master
3601  * and so forth.
3602  *
3603  * It calls the event loop in order to process a few events. Specifically we
3604  * try to call the event loop 4 times as long as we receive acknowledge that
3605  * some event was processed, in order to go forward with the accept, read,
3606  * write, close sequence needed to serve a client.
3607  *
3608  * The function returns the total number of events processed. */
processEventsWhileBlocked(void)3609 void processEventsWhileBlocked(void) {
3610     int iterations = 4; /* See the function top-comment. */
3611 
3612     /* Update our cached time since it is used to create and update the last
3613      * interaction time with clients and for other important things. */
3614     updateCachedTime(0);
3615 
3616     /* Note: when we are processing events while blocked (for instance during
3617      * busy Lua scripts), we set a global flag. When such flag is set, we
3618      * avoid handling the read part of clients using threaded I/O.
3619      * See https://github.com/redis/redis/issues/6988 for more info. */
3620     ProcessingEventsWhileBlocked = 1;
3621     while (iterations--) {
3622         long long startval = server.events_processed_while_blocked;
3623         long long ae_events = aeProcessEvents(server.el,
3624             AE_FILE_EVENTS|AE_DONT_WAIT|
3625             AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
3626         /* Note that server.events_processed_while_blocked will also get
3627          * incremented by callbacks called by the event loop handlers. */
3628         server.events_processed_while_blocked += ae_events;
3629         long long events = server.events_processed_while_blocked - startval;
3630         if (!events) break;
3631     }
3632 
3633     whileBlockedCron();
3634 
3635     ProcessingEventsWhileBlocked = 0;
3636 }
3637 
3638 /* ==========================================================================
3639  * Threaded I/O
3640  * ========================================================================== */
3641 
/* Upper bound on the configured number of I/O threads (main thread
 * included). It also sizes every per-thread array below. initThreadedIO()
 * exits if io_threads_num exceeds it. */
#define IO_THREADS_MAX_NUM 128

/* Handles of the spawned I/O threads. initThreadedIO() only fills slots
 * 1..io_threads_num-1, because slot 0 stands for the main thread. */
pthread_t io_threads[IO_THREADS_MAX_NUM];
/* Per-thread "parking" mutex. While the main thread holds
 * io_threads_mutex[i], thread i blocks whenever it finds no pending work
 * (see IOThreadMain(), startThreadedIO() and stopThreadedIO()). */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
/* Number of clients handed to each thread. It is accessed only through
 * getIOPendingCount()/setIOPendingCount(). The main thread sets it non-zero
 * to start a thread, and the thread sets it back to 0 when it is done. */
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
/* Operation the threads perform on their lists. The main thread writes it
 * before publishing the pending counts and resets it to IDLE after all
 * threads are done. Whether the atomic pending-count update is enough to
 * order this plain int is the open question in the TODO below. */
int io_threads_op;      /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??!

/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. io_threads_list[0] is the main thread's own slice. A thread's
 * list is only touched by that thread while its pending count is non-zero. */
list *io_threads_list[IO_THREADS_MAX_NUM];
3653 
getIOPendingCount(int i)3654 static inline unsigned long getIOPendingCount(int i) {
3655     unsigned long count = 0;
3656     atomicGetWithSync(io_threads_pending[i], count);
3657     return count;
3658 }
3659 
setIOPendingCount(int i,unsigned long count)3660 static inline void setIOPendingCount(int i, unsigned long count) {
3661     atomicSetWithSync(io_threads_pending[i], count);
3662 }
3663 
IOThreadMain(void * myid)3664 void *IOThreadMain(void *myid) {
3665     /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
3666      * used by the thread to just manipulate a single sub-array of clients. */
3667     long id = (unsigned long)myid;
3668     char thdname[16];
3669 
3670     snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
3671     redis_set_thread_title(thdname);
3672     redisSetCpuAffinity(server.server_cpulist);
3673     makeThreadKillable();
3674 
3675     while(1) {
3676         /* Wait for start */
3677         for (int j = 0; j < 1000000; j++) {
3678             if (getIOPendingCount(id) != 0) break;
3679         }
3680 
3681         /* Give the main thread a chance to stop this thread. */
3682         if (getIOPendingCount(id) == 0) {
3683             pthread_mutex_lock(&io_threads_mutex[id]);
3684             pthread_mutex_unlock(&io_threads_mutex[id]);
3685             continue;
3686         }
3687 
3688         serverAssert(getIOPendingCount(id) != 0);
3689 
3690         /* Process: note that the main thread will never touch our list
3691          * before we drop the pending count to 0. */
3692         listIter li;
3693         listNode *ln;
3694         listRewind(io_threads_list[id],&li);
3695         while((ln = listNext(&li))) {
3696             client *c = listNodeValue(ln);
3697             if (io_threads_op == IO_THREADS_OP_WRITE) {
3698                 writeToClient(c,0);
3699             } else if (io_threads_op == IO_THREADS_OP_READ) {
3700                 readQueryFromClient(c->conn);
3701             } else {
3702                 serverPanic("io_threads_op value is unknown");
3703             }
3704         }
3705         listEmpty(io_threads_list[id]);
3706         setIOPendingCount(id, 0);
3707     }
3708 }
3709 
3710 /* Initialize the data structures needed for threaded I/O. */
initThreadedIO(void)3711 void initThreadedIO(void) {
3712     server.io_threads_active = 0; /* We start with threads not active. */
3713 
3714     /* Indicate that io-threads are currently idle */
3715     io_threads_op = IO_THREADS_OP_IDLE;
3716 
3717     /* Don't spawn any thread if the user selected a single thread:
3718      * we'll handle I/O directly from the main thread. */
3719     if (server.io_threads_num == 1) return;
3720 
3721     if (server.io_threads_num > IO_THREADS_MAX_NUM) {
3722         serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
3723                              "The maximum number is %d.", IO_THREADS_MAX_NUM);
3724         exit(1);
3725     }
3726 
3727     /* Spawn and initialize the I/O threads. */
3728     for (int i = 0; i < server.io_threads_num; i++) {
3729         /* Things we do for all the threads including the main thread. */
3730         io_threads_list[i] = listCreate();
3731         if (i == 0) continue; /* Thread 0 is the main thread. */
3732 
3733         /* Things we do only for the additional threads. */
3734         pthread_t tid;
3735         pthread_mutex_init(&io_threads_mutex[i],NULL);
3736         setIOPendingCount(i, 0);
3737         pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
3738         if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
3739             serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
3740             exit(1);
3741         }
3742         io_threads[i] = tid;
3743     }
3744 }
3745 
killIOThreads(void)3746 void killIOThreads(void) {
3747     int err, j;
3748     for (j = 0; j < server.io_threads_num; j++) {
3749         if (io_threads[j] == pthread_self()) continue;
3750         if (io_threads[j] && pthread_cancel(io_threads[j]) == 0) {
3751             if ((err = pthread_join(io_threads[j],NULL)) != 0) {
3752                 serverLog(LL_WARNING,
3753                     "IO thread(tid:%lu) can not be joined: %s",
3754                         (unsigned long)io_threads[j], strerror(err));
3755             } else {
3756                 serverLog(LL_WARNING,
3757                     "IO thread(tid:%lu) terminated",(unsigned long)io_threads[j]);
3758             }
3759         }
3760     }
3761 }
3762 
startThreadedIO(void)3763 void startThreadedIO(void) {
3764     serverAssert(server.io_threads_active == 0);
3765     for (int j = 1; j < server.io_threads_num; j++)
3766         pthread_mutex_unlock(&io_threads_mutex[j]);
3767     server.io_threads_active = 1;
3768 }
3769 
stopThreadedIO(void)3770 void stopThreadedIO(void) {
3771     /* We may have still clients with pending reads when this function
3772      * is called: handle them before stopping the threads. */
3773     handleClientsWithPendingReadsUsingThreads();
3774     serverAssert(server.io_threads_active == 1);
3775     for (int j = 1; j < server.io_threads_num; j++)
3776         pthread_mutex_lock(&io_threads_mutex[j]);
3777     server.io_threads_active = 0;
3778 }
3779 
3780 /* This function checks if there are not enough pending clients to justify
3781  * taking the I/O threads active: in that case I/O threads are stopped if
3782  * currently active. We track the pending writes as a measure of clients
3783  * we need to handle in parallel, however the I/O threading is disabled
3784  * globally for reads as well if we have too little pending clients.
3785  *
3786  * The function returns 0 if the I/O threading should be used because there
3787  * are enough active threads, otherwise 1 is returned and the I/O threads
3788  * could be possibly stopped (if already active) as a side effect. */
stopThreadedIOIfNeeded(void)3789 int stopThreadedIOIfNeeded(void) {
3790     int pending = listLength(server.clients_pending_write);
3791 
3792     /* Return ASAP if IO threads are disabled (single threaded mode). */
3793     if (server.io_threads_num == 1) return 1;
3794 
3795     if (pending < (server.io_threads_num*2)) {
3796         if (server.io_threads_active) stopThreadedIO();
3797         return 1;
3798     } else {
3799         return 0;
3800     }
3801 }
3802 
/* Flush the output of every client in server.clients_pending_write, spreading
 * the work over the I/O threads. It falls back to the synchronous
 * handleClientsWithPendingWrites() when threads are disabled or there are too
 * few clients (see stopThreadedIOIfNeeded()).
 *
 * This function achieves thread safety using a fan-out -> fan-in paradigm:
 * Fan out: The main thread fans out work to the io-threads which block until
 * setIOPendingCount() is called with a value larger than 0 by the main thread.
 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
 * it can safely perform post-processing and return to normal synchronous
 * work.
 *
 * Returns the length of clients_pending_write on entry. Note that this
 * includes clients dropped below because they are CLIENT_CLOSE_ASAP. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. Normal clients are
     * assigned round-robin (slot 0 included); replicas always go to slot 0. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. io_threads_op is set first so the
     * threads see the right operation once their count becomes non-zero. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work (busy-wait until
     * every thread has reset its pending count to 0). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to install the write handler where
     * needed. Only the main thread is running from here on. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
        updateClientMemUsageBucket(c);

        /* Install the write handler if there are pending writes in some
         * of the clients. If that fails, the client is freed asynchronously. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
3902 
3903 /* Return 1 if we want to handle the client read later using threaded I/O.
3904  * This is called by the readable handler of the event loop.
3905  * As a side effect of calling this function the client is put in the
3906  * pending read clients and flagged as such. */
postponeClientRead(client * c)3907 int postponeClientRead(client *c) {
3908     if (server.io_threads_active &&
3909         server.io_threads_do_reads &&
3910         !ProcessingEventsWhileBlocked &&
3911         !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
3912         io_threads_op == IO_THREADS_OP_IDLE)
3913     {
3914         listAddNodeHead(server.clients_pending_read,c);
3915         c->pending_read_list_node = listFirst(server.clients_pending_read);
3916         return 1;
3917     } else {
3918         return 0;
3919     }
3920 }
3921 
/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler puts normal clients into server.clients_pending_read
 * (see postponeClientRead()) instead of serving them synchronously. This
 * function runs that queue using the I/O threads. Each thread reads into
 * its clients' buffers and parses the first available command into the
 * client structures. Pending commands are then processed here, on the
 * main thread, once every thread has finished.
 * This function achieves thread safety using a fan-out -> fan-in paradigm:
 * Fan out: The main thread fans out work to the io-threads which block until
 * setIOPendingCount() is called with a value larger than 0 by the main thread.
 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
 * it can safely perform post-processing and return to normal synchronous
 * work.
 *
 * Returns the number of clients that were queued for reading, or 0 when
 * threaded reads are not active or nothing was queued. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists (round-robin,
     * slot 0 being the main thread's own slice). */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. io_threads_op is set first so the
     * threads see the right operation once their count becomes non-zero. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work (busy-wait until
     * every thread has reset its pending count to 0). */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;

    /* Run the list of clients again to process the new buffers.
     * Clients are popped one at a time from the head instead of being
     * iterated. NOTE(review): presumably because processing one client can
     * free others, which unlinks them from this list through
     * pending_read_list_node — confirm against freeClient(). */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        listDelNode(server.clients_pending_read,ln);
        c->pending_read_list_node = NULL;

        /* postponeClientRead() never queues blocked clients. */
        serverAssert(!(c->flags & CLIENT_BLOCKED));

        if (beforeNextClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* Once io-threads are idle we can update the client in the mem usage buckets */
        updateClientMemUsageBucket(c);

        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        if (processInputBuffer(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}
4022 
4023 /* Returns the actual client eviction limit based on current configuration or
4024  * 0 if no limit. */
getClientEvictionLimit(void)4025 size_t getClientEvictionLimit(void) {
4026     size_t maxmemory_clients_actual = SIZE_MAX;
4027 
4028     /* Handle percentage of maxmemory*/
4029     if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
4030         unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
4031         if (maxmemory_clients_bytes <= SIZE_MAX)
4032             maxmemory_clients_actual = maxmemory_clients_bytes;
4033     }
4034     else if (server.maxmemory_clients > 0)
4035         maxmemory_clients_actual = server.maxmemory_clients;
4036     else
4037         return 0;
4038 
4039     /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate
4040      * at all with the server because of bad configuration */
4041     if (maxmemory_clients_actual < 1024*128)
4042         maxmemory_clients_actual = 1024*128;
4043 
4044     return maxmemory_clients_actual;
4045 }
4046 
evictClients(void)4047 void evictClients(void) {
4048     /* Start eviction from topmost bucket (largest clients) */
4049     int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
4050     listIter bucket_iter;
4051     listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4052     size_t client_eviction_limit = getClientEvictionLimit();
4053     if (client_eviction_limit == 0)
4054         return;
4055     while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
4056            server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
4057         listNode *ln = listNext(&bucket_iter);
4058         if (ln) {
4059             client *c = ln->value;
4060             sds ci = catClientInfoString(sdsempty(),c);
4061             serverLog(LL_NOTICE, "Evicting client: %s", ci);
4062             freeClient(c);
4063             sdsfree(ci);
4064             server.stat_evictedclients++;
4065         } else {
4066             curr_bucket--;
4067             if (curr_bucket < 0) {
4068                 serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
4069                 break;
4070             }
4071             listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4072         }
4073     }
4074 }
4075