1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "atomicvar.h"
32 #include "cluster.h"
33 #include <sys/socket.h>
34 #include <sys/uio.h>
35 #include <math.h>
36 #include <ctype.h>
37
38 static void setProtocolError(const char *errstr, client *c);
39 int postponeClientRead(client *c);
40 int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
41
42 /* Return the size consumed from the allocator, for the specified SDS string,
43 * including internal fragmentation. This function is used in order to compute
44 * the client output buffer size. */
size_t sdsZmallocSize(sds s) {
    /* The allocator is queried with the pointer returned by sdsAllocPtr()
     * rather than with 's' itself. */
    return zmalloc_size(sdsAllocPtr(s));
}
49
50 /* Return the amount of memory used by the sds string at object->ptr
51 * for a string object. This includes internal fragmentation. */
size_t getStringObjectSdsUsedMemory(robj *o) {
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    if (o->encoding == OBJ_ENCODING_RAW) {
        /* The string is the sds referenced by o->ptr. */
        return sdsZmallocSize(o->ptr);
    }
    if (o->encoding == OBJ_ENCODING_EMBSTR) {
        /* Size of the object's own allocation, minus the robj header. */
        return zmalloc_size(o)-sizeof(robj);
    }
    /* Every other encoding (just integers for now) accounts for nothing. */
    return 0;
}
60
/* Return the length of a string object.
 * This does NOT include internal fragmentation or sds unused space. */
size_t getStringObjectLen(robj *o) {
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    switch(o->encoding) {
    case OBJ_ENCODING_RAW:
    case OBJ_ENCODING_EMBSTR:
        /* Both encodings are measured with sdslen() on o->ptr. */
        return sdslen(o->ptr);
    default:
        return 0; /* Just integer encoding for now. */
    }
}
71
72 /* Client.reply list dup and free methods. */
dupClientReplyValue(void * o)73 void *dupClientReplyValue(void *o) {
74 clientReplyBlock *old = o;
75 clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
76 memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
77 return buf;
78 }
79
/* Free method for the client reply list (installed by createClient()):
 * releases a single reply block with zfree(). */
void freeClientReplyValue(void *o) {
    zfree(o);
}
83
/* List match method (installed on client->pubsub_patterns by
 * createClient()): returns whatever equalStringObjects() reports for the
 * two values. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);
    return same;
}
87
88 /* This function links the client to the global linked list of clients.
89 * unlinkClient() does the opposite, among other things. */
void linkClient(client *c) {
    /* Append the client and immediately remember the list node holding it:
     * unlinkClient() can then remove the client in constant time instead
     * of scanning the whole list. */
    listAddNodeTail(server.clients,c);
    c->client_list_node = listLast(server.clients);

    /* Also index the client by ID. The key is the ID as converted by
     * htonu64(). */
    uint64_t key = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&key,sizeof(key),c,NULL);
}
99
100 /* Initialize client authentication state.
101 */
static void clientSetDefaultAuth(client *c) {
    c->user = DefaultUser;

    /* The client starts out authenticated only when the default user needs
     * no password and is not disabled. */
    int nopass = (c->user->flags & USER_FLAG_NOPASS) != 0;
    int disabled = (c->user->flags & USER_FLAG_DISABLED) != 0;
    c->authenticated = nopass && !disabled;
}
109
/* Return 1 if the client still has to authenticate, 0 otherwise. */
int authRequired(client *c) {
    /* An authenticated client never needs to authenticate again. */
    if (c->authenticated) return 0;

    /* For everybody else the check is skipped only when the default user
     * is flagged as "nopass" and is active (not disabled). */
    int default_user_open = (DefaultUser->flags & USER_FLAG_NOPASS) &&
                            !(DefaultUser->flags & USER_FLAG_DISABLED);
    return !default_user_open;
}
118
createClient(connection * conn)119 client *createClient(connection *conn) {
120 client *c = zmalloc(sizeof(client));
121
122 /* passing NULL as conn it is possible to create a non connected client.
123 * This is useful since all the commands needs to be executed
124 * in the context of a client. When commands are executed in other
125 * contexts (for instance a Lua script) we need a non connected client. */
126 if (conn) {
127 connEnableTcpNoDelay(conn);
128 if (server.tcpkeepalive)
129 connKeepAlive(conn,server.tcpkeepalive);
130 connSetReadHandler(conn, readQueryFromClient);
131 connSetPrivateData(conn, c);
132 }
133
134 selectDb(c,0);
135 uint64_t client_id;
136 atomicGetIncr(server.next_client_id, client_id, 1);
137 c->id = client_id;
138 c->resp = 2;
139 c->conn = conn;
140 c->name = NULL;
141 c->bufpos = 0;
142 c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
143 c->ref_repl_buf_node = NULL;
144 c->ref_block_pos = 0;
145 c->qb_pos = 0;
146 c->querybuf = sdsempty();
147 c->pending_querybuf = sdsempty();
148 c->querybuf_peak = 0;
149 c->reqtype = 0;
150 c->argc = 0;
151 c->argv = NULL;
152 c->argv_len = 0;
153 c->argv_len_sum = 0;
154 c->original_argc = 0;
155 c->original_argv = NULL;
156 c->cmd = c->lastcmd = NULL;
157 c->multibulklen = 0;
158 c->bulklen = -1;
159 c->sentlen = 0;
160 c->flags = 0;
161 c->ctime = c->lastinteraction = server.unixtime;
162 clientSetDefaultAuth(c);
163 c->replstate = REPL_STATE_NONE;
164 c->repl_put_online_on_ack = 0;
165 c->reploff = 0;
166 c->read_reploff = 0;
167 c->repl_ack_off = 0;
168 c->repl_ack_time = 0;
169 c->repl_last_partial_write = 0;
170 c->slave_listening_port = 0;
171 c->slave_addr = NULL;
172 c->slave_capa = SLAVE_CAPA_NONE;
173 c->reply = listCreate();
174 c->reply_bytes = 0;
175 c->obuf_soft_limit_reached_time = 0;
176 listSetFreeMethod(c->reply,freeClientReplyValue);
177 listSetDupMethod(c->reply,dupClientReplyValue);
178 c->btype = BLOCKED_NONE;
179 c->bpop.timeout = 0;
180 c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
181 c->bpop.target = NULL;
182 c->bpop.xread_group = NULL;
183 c->bpop.xread_consumer = NULL;
184 c->bpop.xread_group_noack = 0;
185 c->bpop.numreplicas = 0;
186 c->bpop.reploffset = 0;
187 c->woff = 0;
188 c->watched_keys = listCreate();
189 c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
190 c->pubsub_patterns = listCreate();
191 c->peerid = NULL;
192 c->sockname = NULL;
193 c->client_list_node = NULL;
194 c->paused_list_node = NULL;
195 c->pending_read_list_node = NULL;
196 c->client_tracking_redirection = 0;
197 c->client_tracking_prefixes = NULL;
198 c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
199 c->last_memory_type = CLIENT_TYPE_NORMAL;
200 c->auth_callback = NULL;
201 c->auth_callback_privdata = NULL;
202 c->auth_module = NULL;
203 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
204 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
205 c->mem_usage_bucket = NULL;
206 c->mem_usage_bucket_node = NULL;
207 if (conn) linkClient(c);
208 initClientMultiState(c);
209 return c;
210 }
211
/* This function puts the client in the queue of clients that should write
 * their output buffers to the socket. Note that it does not *yet* install
 * the write handler: initially clients are just put in a queue of clients
 * that need to write, so that we can try to write before returning to the
 * event loop (see the handleClientsWithPendingWrites() function).
 * If we fail and there is more data to write, compared to what the socket
 * buffers can hold, then we'll really install the handler. */
void clientInstallWriteHandler(client *c) {
    /* Already flagged as having pending writes: nothing to do. */
    if (c->flags & CLIENT_PENDING_WRITE) return;

    /* A client with a replication state is scheduled only when that state
     * is SLAVE_STATE_ONLINE and it is not waiting to be put online on the
     * next ACK. */
    int no_repl_state = (c->replstate == REPL_STATE_NONE);
    int online_replica = (c->replstate == SLAVE_STATE_ONLINE &&
                          !c->repl_put_online_on_ack);
    if (!no_repl_state && !online_replica) return;

    /* Instead of installing the write handler right now we only flag the
     * client and queue it among the clients with something to write. This
     * way, before re-entering the event loop, we can try to write directly
     * to the client sockets avoiding a system call. The write handler is
     * really installed only if the whole reply can't be written at once. */
    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write,c);
}
237
238 /* This function is called every time we are going to transmit new data
239 * to the client. The behavior is the following:
240 *
241 * If the client should receive new data (normal clients will) the function
242 * returns C_OK, and make sure to install the write handler in our event
243 * loop so that when the socket is writable new data gets written.
244 *
245 * If the client should not receive new data, because it is a fake client
246 * (used to load AOF in memory), a master or because the setup of the write
247 * handler failed, the function returns C_ERR.
248 *
249 * The function may return C_OK without actually installing the write
250 * event handler in the following cases:
251 *
252 * 1) The event handler should already be installed since the output buffer
253 * already contains something.
254 * 2) The client is a slave but not yet online, so we want to just accumulate
255 * writes in the buffer but not actually sending them yet.
256 *
257 * Typically gets called every time a reply is built, before adding more
258 * data to the clients output buffers. If the function returns C_ERR no
259 * data should be appended to the output buffers. */
int prepareClientToWrite(client *c) {
    /* Lua and module clients have no socket at all: always accept the
     * output and never install a handler. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    /* Nothing is written to clients that are about to be closed
     * (CLIENT_CLOSE_ASAP) nor to clients that asked for no replies
     * (CLIENT REPLY OFF / SKIP). */
    if (c->flags & (CLIENT_CLOSE_ASAP|CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP))
        return C_ERR;

    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY is
     * set. */
    int is_master = (c->flags & CLIENT_MASTER) != 0;
    int forced_reply = (c->flags & CLIENT_MASTER_FORCE_REPLY) != 0;
    if (is_master && !forced_reply) return C_ERR;

    /* Fake client for AOF loading: there is no connection to write to. */
    if (c->conn == NULL) return C_ERR;

    /* Schedule the client for writing, unless it already has pending data
     * (in which case it should be scheduled already). This is done only
     * while io_threads_op is IO_THREADS_OP_IDLE. */
    if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) {
        clientInstallWriteHandler(c);
    }

    /* The caller may now queue data in the output buffer of this client. */
    return C_OK;
}
291
292 /* -----------------------------------------------------------------------------
293 * Low level functions to add more data to output buffers.
294 * -------------------------------------------------------------------------- */
295
296 /* Attempts to add the reply to the static buffer in the client struct.
297 * Returns the length of data that is added to the reply buffer.
298 *
299 * Sanitizer suppression: client->buf_usable_size determined by
300 * zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses
301 * sanitizer and generates a false positive out-of-bounds error */
302 REDIS_NO_SANITIZE("bounds")
_addReplyToBuffer(client * c,const char * s,size_t len)303 size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
304 size_t available = c->buf_usable_size - c->bufpos;
305
306 /* If there already are entries in the reply list, we cannot
307 * add anything more to the static buffer. */
308 if (listLength(c->reply) > 0) return 0;
309
310 size_t reply_len = len > available ? available : len;
311 memcpy(c->buf+c->bufpos,s,reply_len);
312 c->bufpos+=reply_len;
313 return reply_len;
314 }
315
316 /* Adds the reply to the reply linked list.
317 * Note: some edits to this function need to be relayed to AddReplyFromClient. */
void _addReplyProtoToList(client *c, const char *s, size_t len) {
    listNode *last = listLast(c->reply);
    clientReplyBlock *blk = NULL;
    if (last != NULL) blk = listNodeValue(last);

    /* 'blk' can be NULL even when a tail node exists: addReplyDeferredLen()
     * appends a node with a NULL value as a placeholder to be filled later. */
    if (blk != NULL) {
        /* Put whatever fits into the tail block; the remainder, if any,
         * goes into a new node below. */
        size_t room = blk->size - blk->used;
        size_t n = (len <= room) ? len : room;
        memcpy(blk->buf + blk->used, s, n);
        blk->used += n;
        s += n;
        len -= n;
    }

    if (len == 0) return;

    /* Allocate a new block of at least PROTO_REPLY_CHUNK_BYTES. */
    size_t usable_size;
    size_t want = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
    blk = zmalloc_usable(want + sizeof(clientReplyBlock), &usable_size);
    /* The capacity is taken from the usable size reported by the allocator,
     * so internal fragmentation becomes usable buffer space. */
    blk->size = usable_size - sizeof(clientReplyBlock);
    blk->used = len;
    memcpy(blk->buf, s, len);
    listAddNodeTail(c->reply, blk);
    c->reply_bytes += blk->size;

    closeClientOnOutputBufferLimitReached(c, 1);
}
353
/* Queue 'len' bytes of 's': first in the static buffer, then whatever did
 * not fit in the reply list. Nothing is queued for clients flagged
 * CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    size_t buffered = _addReplyToBuffer(c,s,len);
    if (buffered < len) {
        _addReplyProtoToList(c,s+buffered,len-buffered);
    }
}
360
361 /* -----------------------------------------------------------------------------
362 * Higher level functions to queue data on the client output buffer.
363 * The following functions are the ones that commands implementations will call.
364 * -------------------------------------------------------------------------- */
365
366 /* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
        return;
    }

    if (obj->encoding == OBJ_ENCODING_INT) {
        /* Integer encoded strings store the value in the pointer itself:
         * render it as text and queue the resulting digits. */
        char digits[32];
        size_t ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);
        _addReplyToBufferOrList(c,digits,ndigits);
        return;
    }

    serverPanic("Wrong obj->encoding in addReply()");
}
383
384 /* Add the SDS 's' string to the client output buffer, as a side effect
385 * the SDS string is freed. */
void addReplySds(client *c, sds s) {
    if (prepareClientToWrite(c) == C_OK) {
        _addReplyToBufferOrList(c,s,sdslen(s));
    }
    /* The caller expects 's' to be freed whether or not it was queued. */
    sdsfree(s);
}
395
/* This low level function just adds whatever protocol you send it to the
 * client buffer, trying the static buffer initially, and using the list
 * of reply blocks if not possible.
 *
 * It is efficient because it does not create an SDS object nor a Redis
 * object if not needed. A new reply block will only be created by calling
 * _addReplyProtoToList() if we fail to extend the existing tail block
 * in the list. */
void addReplyProto(client *c, const char *s, size_t len) {
    /* Queue the bytes only if the client accepts output right now. */
    if (prepareClientToWrite(c) == C_OK) {
        _addReplyToBufferOrList(c,s,len);
    }
}
408
409 /* Low level function called by the addReplyError...() functions.
410 * It emits the protocol for a Redis error, in the form:
411 *
412 * -ERRORCODE Error Message<CR><LF>
413 *
414 * If the error code is already passed in the string 's', the error
415 * code provided is used, otherwise the string "-ERR " for the generic
416 * error code is automatically added.
417 * Note that 's' must NOT end with \r\n. */
void addReplyErrorLength(client *c, const char *s, size_t len) {
    /* A message starting with "-..." carries its own error code; any other
     * message (including an empty one) gets the generic "-ERR " prefix. */
    int has_code = (len > 0 && s[0] == '-');
    if (!has_code) addReplyProto(c,"-ERR ",5);

    addReplyProto(c,s,len);
    addReplyProto(c,"\r\n",2);
}
425
426 /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
/* Bookkeeping performed after an error reply was queued.
 *
 * c   - client the error was addressed to.
 * s   - error text, either "-CODE message" or a bare message, without the
 *       trailing \r\n.
 * len - number of bytes of 's' to consider.
 *
 * Updates the global and the per-code error counters. When the client is a
 * master, a replica or the AOF loading client, the event is also logged and
 * server.stat_unexpected_error_replies is incremented. */
void afterErrorReply(client *c, const char *s, size_t len) {
    /* Increment the global error counter */
    server.stat_total_error_replies++;

    /* Increment the error stats.
     * If the string starts with "-..." the error prefix is provided by the
     * caller (the search for its end is limited to 32 chars). Otherwise,
     * and for an empty string, "ERR" is used. The length is checked before
     * reading s[0], exactly as addReplyErrorLength() does, so that a zero
     * length buffer is never dereferenced. */
    if (len == 0 || s[0] != '-') {
        incrementErrorCount("ERR", 3);
    } else {
        const char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
        if (spaceloc) {
            /* The code is what sits between the leading '-' and the space. */
            const size_t errEndPos = (size_t)(spaceloc - s);
            incrementErrorCount(s+1, errEndPos-1);
        } else {
            /* Fallback to ERR if we can't retrieve the error prefix */
            incrementErrorCount("ERR", 3);
        }
    }

    /* Sometimes it could be normal that a slave replies to a master with
     * an error and this function gets called. Actually the error will never
     * be sent because addReply*() against master clients has no effect...
     * A notable example is:
     *
     *    EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
     *
     * Where the master must propagate the first change even if the second
     * will produce an error. However it is useful to log such events since
     * they are rare and may hint at errors in a script or a bug in Redis. */
    int ctype = getClientType(c);
    if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
        const char *to, *from;

        if (c->id == CLIENT_ID_AOF) {
            to = "AOF-loading-client";
            from = "server";
        } else if (ctype == CLIENT_TYPE_MASTER) {
            to = "master";
            from = "replica";
        } else {
            to = "replica";
            from = "master";
        }

        /* Cap the amount of error text that ends up in the log line. */
        if (len > 4096) len = 4096;
        const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
        serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
                             "to its %s: '%.*s' after processing the command "
                             "'%s'", from, to, (int)len, s, cmdname);
        if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
            server.repl_backlog->histlen > 0)
        {
            showLatestBacklog();
        }
        server.stat_unexpected_error_replies++;
    }
}
484
485 /* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
486 * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
void addReplyErrorObject(client *c, robj *err) {
    addReply(c, err);

    /* The stats/logging step gets the message without its trailing \r\n. */
    sds msg = err->ptr;
    afterErrorReply(c, msg, sdslen(msg)-2);
}
491
/* Sends either a reply or an error reply by checking the first char.
 * If the first char is '-' the reply is considered an error.
 * In any case the given reply is sent; if the reply is also recognized
 * as an error we also perform some post reply operations such as
 * logging and stats update. */
void addReplyOrErrorObject(client *c, robj *reply) {
    serverAssert(sdsEncodedObject(reply));

    sds rep = reply->ptr;
    /* Treated as an error only when longer than one byte and starting
     * with '-'. */
    int is_error = (sdslen(rep) > 1 && rep[0] == '-');
    if (!is_error) {
        addReply(c, reply);
        return;
    }
    addReplyErrorObject(c, reply);
}
506
507 /* See addReplyErrorLength for expectations from the input string. */
void addReplyError(client *c, const char *err) {
    /* Measure the C string once and use the length for both steps. */
    size_t errlen = strlen(err);
    addReplyErrorLength(c,err,errlen);
    afterErrorReply(c,err,errlen);
}
512
513 /* See addReplyErrorLength for expectations from the input string. */
514 /* As a side effect the SDS string is freed. */
void addReplyErrorSds(client *c, sds err) {
    size_t errlen = sdslen(err);
    addReplyErrorLength(c,err,errlen);
    afterErrorReply(c,err,errlen);
    /* The string is consumed: the caller must not use it afterwards. */
    sdsfree(err);
}
520
521 /* See addReplyErrorLength for expectations from the formatted string.
522 * The formatted string is safe to contain \r and \n anywhere. */
/* Build an error message printf-style and queue it as an error reply.
 *
 * c   - client receiving the error.
 * fmt - printf-like format, followed by its arguments. The resulting text
 *       follows the same rules as the input of addReplyErrorLength().
 *
 * The formatted text may contain \r and \n anywhere: trailing ones are
 * trimmed and the remaining ones are turned into spaces. */
void addReplyErrorFormat(client *c, const char *fmt, ...) {
    /* Replacement set for sdsmapchars(): one entry per character of
     * "\r\n". Both entries must be spaces: with a one character string
     * here, '\n' would be mapped to the string terminator and a NUL byte
     * would end up inside the reply. */
    static const char crlf_to_spaces[2] = {' ', ' '};

    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);
    /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
    s = sdstrim(s, "\r\n");
    /* Make sure there are no newlines in the middle of the string, otherwise
     * invalid protocol is emitted. */
    s = sdsmapchars(s, "\r\n", crlf_to_spaces, 2);

    size_t len = sdslen(s);
    addReplyErrorLength(c,s,len);
    afterErrorReply(c,s,len);
    sdsfree(s);
}
537
/* Queue a status reply: '+', then the 'len' bytes of 's', then \r\n. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    addReplyProto(c,"+",1);
    addReplyProto(c,s,len);
    addReplyProto(c,"\r\n",2);
}
543
/* Queue a status reply from a NUL terminated C string. */
void addReplyStatus(client *c, const char *status) {
    size_t status_len = strlen(status);
    addReplyStatusLength(c,status,status_len);
}
547
/* Queue a status reply whose text is built printf-style from 'fmt' and the
 * following arguments. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    va_list args;
    va_start(args,fmt);
    sds text = sdscatvprintf(sdsempty(),fmt,args);
    va_end(args);

    addReplyStatusLength(c,text,sdslen(text));
    /* The temporary string is only needed while queuing the reply. */
    sdsfree(text);
}
556
557 /* Sometimes we are forced to create a new reply node, and we can't append to
558 * the previous one, when that happens, we wanna try to trim the unused space
559 * at the end of the last reply node which we won't use anymore. */
void trimReplyUnusedTailSpace(client *c) {
    listNode *last = listLast(c->reply);
    if (last == NULL) return;

    /* The tail value may be NULL even if a tail node exists: this happens
     * when the tail is a placeholder added by addReplyDeferredLen(). */
    clientReplyBlock *blk = listNodeValue(last);
    if (blk == NULL) return;

    /* Trim only when the unused space is relatively large (more than 1/4
     * of the allocation), otherwise there's a high chance realloc will be
     * a no-op. */
    size_t unused = blk->size - blk->used;
    if (unused <= blk->size / 4) return;

    /* Also trim only when the used part is small, to avoid the large
     * memmove that may happen as part of realloc. */
    if (blk->used >= PROTO_REPLY_CHUNK_BYTES) return;

    size_t old_size = blk->size;
    blk = zrealloc(blk, blk->used + sizeof(clientReplyBlock));
    /* Take over the allocation's internal fragmentation (at least for
     * memory usage tracking). */
    blk->size = zmalloc_usable_size(blk) - sizeof(clientReplyBlock);
    c->reply_bytes = c->reply_bytes + blk->size - old_size;
    /* zrealloc() may have moved the block: refresh the node value. */
    listNodeValue(last) = blk;
}
584
585 /* Adds an empty object to the reply list that will contain the multi bulk
586 * length, which is not known when this function is called. */
void *addReplyDeferredLen(client *c) {
    /* The client is scheduled for writing here even if the object is not
     * ready to be sent yet, since setDeferredAggregateLen() is going to be
     * called before returning to the event loop. NULL is returned when the
     * client does not accept output. */
    if (prepareClientToWrite(c) != C_OK) return NULL;

    trimReplyUnusedTailSpace(c);

    /* A node with a NULL value is the placeholder for the length. */
    listAddNodeTail(c->reply,NULL);
    listNode *placeholder = listLast(c->reply);
    return placeholder;
}
596
/* Fill the placeholder 'node' returned by addReplyDeferredLen() with the
 * 'length' bytes of 's'. A NULL 'node' is silently ignored. */
void setDeferredReply(client *c, void *node, const char *s, size_t length) {
    /* addReplyDeferredLen() returns NULL when the client should not accept
     * writes: nothing to do in that case. */
    if (node == NULL) return;

    listNode *ln = node;
    serverAssert(!listNodeValue(ln));

    /* Normally the placeholder is filled with a new buffer holding the
     * protocol for the length of the aggregate that follows. However, to
     * save a write(2) syscall later, the data is glued to the neighbour
     * nodes when possible, and the placeholder is removed:
     *
     * - the previous node, if it has a value with free space, receives as
     *   many bytes as it can hold;
     * - the next node, if it has a value with enough room already allocated
     *   and is not too large (to avoid a large memmove), receives the
     *   remaining bytes as a prefix. */

    /* Step 1: append what fits to the previous node. */
    clientReplyBlock *prev = NULL;
    if (ln->prev != NULL) prev = listNodeValue(ln->prev);
    if (prev != NULL && prev->size != prev->used) {
        size_t room = prev->size - prev->used;
        size_t n = (length < room) ? length : room;
        memcpy(prev->buf + prev->used, s, n);
        prev->used += n;
        length -= n;
        if (length == 0) {
            /* Everything was absorbed: the placeholder is not needed. */
            listDelNode(c->reply, ln);
            return;
        }
        s += n;
    }

    /* Step 2: try to prepend the remaining bytes to the next node. */
    clientReplyBlock *next = NULL;
    if (ln->next != NULL) next = listNodeValue(ln->next);
    if (next != NULL &&
        next->size - next->used >= length &&
        next->used < PROTO_REPLY_CHUNK_BYTES * 4)
    {
        /* Shift the existing content to make room at the front. */
        memmove(next->buf + length, next->buf, next->used);
        memcpy(next->buf, s, length);
        next->used += length;
        listDelNode(c->reply,ln);
        return;
    }

    /* Step 3: no neighbour could take the data, turn the placeholder into
     * a real block. */
    clientReplyBlock *blk = zmalloc(length + sizeof(clientReplyBlock));
    /* Take over the allocation's internal fragmentation */
    blk->size = zmalloc_usable_size(blk) - sizeof(clientReplyBlock);
    blk->used = length;
    memcpy(blk->buf, s, length);
    listNodeValue(ln) = blk;
    c->reply_bytes += blk->size;

    closeClientOnOutputBufferLimitReached(c, 1);
}
654
655 /* Populate the length object and try gluing it to the next chunk. */
/* Render the aggregate header "<prefix><length>\r\n" and store it in the
 * placeholder 'node' previously returned by addReplyDeferredLen().
 *
 * c      - client owning the reply list.
 * node   - placeholder node, or NULL when the client does not accept
 *          writes (in which case nothing is done).
 * length - number of elements, must be >= 0 (asserted).
 * prefix - type character of the aggregate. */
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
    serverAssert(length >= 0);

    /* Abort when *node is NULL: when the client should not accept writes
     * we return NULL in addReplyDeferredLen() */
    if (node == NULL) return;

    char lenstr[128];
    /* Bounded formatting: the result is checked before being used as a
     * size, so an error or a truncation can never turn into a bogus
     * length. */
    int written = snprintf(lenstr, sizeof(lenstr), "%c%ld\r\n", prefix, length);
    serverAssert(written > 0 && (size_t)written < sizeof(lenstr));
    setDeferredReply(c, node, lenstr, (size_t)written);
}
667
/* Set a deferred length as an array header ('*'). */
void setDeferredArrayLen(client *c, void *node, long length) {
    setDeferredAggregateLen(c,node,length,'*');
}
671
/* Set a deferred length as a map header. 'length' is the number of
 * key-value pairs. */
void setDeferredMapLen(client *c, void *node, long length) {
    if (c->resp == 2) {
        /* With RESP2 the map is emitted as an array ('*') of twice the
         * length. */
        setDeferredAggregateLen(c,node,length*2,'*');
    } else {
        setDeferredAggregateLen(c,node,length,'%');
    }
}
677
/* Set a deferred length as a set header: '*' with RESP2, '~' otherwise. */
void setDeferredSetLen(client *c, void *node, long length) {
    setDeferredAggregateLen(c,node,length,(c->resp == 2) ? '*' : '~');
}
682
/* Set a deferred length as an attribute header ('|'). The client protocol
 * must be RESP3 or newer (asserted). */
void setDeferredAttributeLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    setDeferredAggregateLen(c,node,length,'|');
}
687
/* Set a deferred length as a push header ('>'). The client protocol must
 * be RESP3 or newer (asserted). */
void setDeferredPushLen(client *c, void *node, long length) {
    serverAssert(c->resp >= 3);
    setDeferredAggregateLen(c,node,length,'>');
}
692
693 /* Add a double as a bulk reply */
void addReplyDouble(client *c, double d) {
    const int resp2 = (c->resp == 2);

    if (isinf(d)) {
        /* Libc in odd systems (Hi Solaris!) will format infinite in a
         * different way, so it is handled explicitly. */
        const int positive = (d > 0);
        if (resp2) {
            addReplyBulkCString(c, positive ? "inf" : "-inf");
        } else if (positive) {
            addReplyProto(c, ",inf\r\n", 6);
        } else {
            addReplyProto(c, ",-inf\r\n", 7);
        }
        return;
    }

    char dbuf[MAX_LONG_DOUBLE_CHARS+3];
    if (resp2) {
        /* RESP2: the number travels as a bulk string, so format the digits
         * first and then wrap them with the bulk header. */
        char sbuf[MAX_LONG_DOUBLE_CHARS+32];
        int dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
        int slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
        addReplyProto(c,sbuf,slen);
    } else {
        /* Otherwise: ',' followed by the number and \r\n. */
        int dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
        addReplyProto(c,dbuf,dlen);
    }
}
718
/* Queue the 'len' bytes of 'num' as a big number: a bulk string with RESP2,
 * otherwise '(' followed by the digits and \r\n. */
void addReplyBigNum(client *c, const char* num, size_t len) {
    if (c->resp == 2) {
        addReplyBulkCBuffer(c, num, len);
        return;
    }

    addReplyProto(c,"(",1);
    addReplyProto(c,num,len);
    addReply(c,shared.crlf);
}
728
729 /* Add a long double as a bulk reply, but uses a human readable formatting
730 * of the double instead of exposing the crude behavior of doubles to the
731 * dear user. */
void addReplyHumanLongDouble(client *c, long double d) {
    if (c->resp != 2) {
        /* ',' followed by the LD_STR_HUMAN rendering of the number. */
        char digits[MAX_LONG_DOUBLE_CHARS];
        int ndigits = ld2string(digits,sizeof(digits),d,LD_STR_HUMAN);
        addReplyProto(c,",",1);
        addReplyProto(c,digits,ndigits);
        addReplyProto(c,"\r\n",2);
        return;
    }

    /* RESP2: go through a temporary string object sent as a bulk reply. */
    robj *tmp = createStringObjectFromLongDouble(d,1);
    addReplyBulk(c,tmp);
    decrRefCount(tmp);
}
745
746 /* Add a long long as integer reply or bulk len / multi bulk count.
747 * Basically this is used to output <prefix><long long><crlf>. */
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
    /* Things like $3\r\n or *2\r\n are emitted very often by the protocol,
     * so for small non negative integers the pre-built shared objects are
     * used instead of formatting a new header. */
    int shareable = (ll >= 0 && ll < OBJ_SHARED_BULKHDR_LEN);
    if (shareable && prefix == '*') {
        addReply(c,shared.mbulkhdr[ll]);
        return;
    }
    if (shareable && prefix == '$') {
        addReply(c,shared.bulkhdr[ll]);
        return;
    }

    /* Generic path: <prefix><digits>\r\n built on the stack. */
    char hdr[128];
    hdr[0] = prefix;
    int ndigits = ll2string(hdr+1,sizeof(hdr)-1,ll);
    hdr[1+ndigits] = '\r';
    hdr[2+ndigits] = '\n';
    addReplyProto(c,hdr,ndigits+3);
}
769
/* Queue an integer reply. The values 0 and 1 use shared objects. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
778
/* Emit the header of an aggregate type: 'prefix', then 'length', then CRLF.
 * A negative length trips an assertion. */
void addReplyAggregateLen(client *c, long length, int prefix) {
    serverAssert(length >= 0);

    long long count = length;
    char marker = prefix;
    addReplyLongLongWithPrefix(c,count,marker);
}
783
/* Emit an array header ('*') announcing 'length' elements. */
void addReplyArrayLen(client *c, long length) {
    const int array_prefix = '*';

    addReplyAggregateLen(c,length,array_prefix);
}
787
/* Emit a map header for 'length' key/value pairs. For RESP2 clients the
 * map is announced as an array ('*') of 2*length elements instead of using
 * the '%' prefix. */
void addReplyMapLen(client *c, long length) {
    if (c->resp == 2) {
        addReplyAggregateLen(c,length*2,'*');
    } else {
        addReplyAggregateLen(c,length,'%');
    }
}
793
/* Emit a set header for 'length' members: '~' in general, or a plain array
 * header ('*') when the client speaks RESP2. */
void addReplySetLen(client *c, long length) {
    int prefix = '~';

    if (c->resp == 2) prefix = '*';
    addReplyAggregateLen(c,length,prefix);
}
798
/* Emit an attribute header ('|') for 'length' entries. Asserts that the
 * client protocol version is at least 3. */
void addReplyAttributeLen(client *c, long length) {
    const int attribute_prefix = '|';

    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c,length,attribute_prefix);
}
803
/* Emit a push header ('>') for 'length' elements. Asserts that the client
 * protocol version is at least 3. */
void addReplyPushLen(client *c, long length) {
    const int push_prefix = '>';

    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c,length,push_prefix);
}
808
/* Reply with a null value: "$-1\r\n" for RESP2 clients, "_\r\n" for any
 * other protocol version. */
void addReplyNull(client *c) {
    const char *proto = "_\r\n";
    size_t protolen = 3;

    if (c->resp == 2) {
        proto = "$-1\r\n";
        protolen = 5;
    }
    addReplyProto(c,proto,protolen);
}
816
/* Reply with a boolean. RESP2 clients receive the shared integer objects
 * (cone for true, czero for false); other protocol versions receive
 * "#t\r\n" or "#f\r\n". */
void addReplyBool(client *c, int b) {
    if (c->resp != 2) {
        const char *proto = b ? "#t\r\n" : "#f\r\n";
        addReplyProto(c,proto,4);
        return;
    }

    if (b) {
        addReply(c,shared.cone);
    } else {
        addReply(c,shared.czero);
    }
}
824
825 /* A null array is a concept that no longer exists in RESP3. However
826 * RESP2 had it, so API-wise we have this call, that will emit the correct
827 * RESP2 protocol, however for RESP3 the reply will always be just the
828 * Null type "_\r\n". */
addReplyNullArray(client * c)829 void addReplyNullArray(client *c) {
830 if (c->resp == 2) {
831 addReplyProto(c,"*-1\r\n",5);
832 } else {
833 addReplyProto(c,"_\r\n",3);
834 }
835 }
836
837 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)838 void addReplyBulkLen(client *c, robj *obj) {
839 size_t len = stringObjectLen(obj);
840
841 addReplyLongLongWithPrefix(c,len,'$');
842 }
843
844 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)845 void addReplyBulk(client *c, robj *obj) {
846 addReplyBulkLen(c,obj);
847 addReply(c,obj);
848 addReply(c,shared.crlf);
849 }
850
851 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)852 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
853 addReplyLongLongWithPrefix(c,len,'$');
854 addReplyProto(c,p,len);
855 addReply(c,shared.crlf);
856 }
857
858 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)859 void addReplyBulkSds(client *c, sds s) {
860 addReplyLongLongWithPrefix(c,sdslen(s),'$');
861 addReplySds(c,s);
862 addReply(c,shared.crlf);
863 }
864
865 /* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
setDeferredReplyBulkSds(client * c,void * node,sds s)866 void setDeferredReplyBulkSds(client *c, void *node, sds s) {
867 sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
868 setDeferredReply(c, node, reply, sdslen(reply));
869 sdsfree(reply);
870 sdsfree(s);
871 }
872
873 /* Add a C null term string as bulk reply */
addReplyBulkCString(client * c,const char * s)874 void addReplyBulkCString(client *c, const char *s) {
875 if (s == NULL) {
876 addReplyNull(c);
877 } else {
878 addReplyBulkCBuffer(c,s,strlen(s));
879 }
880 }
881
882 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)883 void addReplyBulkLongLong(client *c, long long ll) {
884 char buf[64];
885 int len;
886
887 len = ll2string(buf,64,ll);
888 addReplyBulkCBuffer(c,buf,len);
889 }
890
891 /* Reply with a verbatim type having the specified extension.
892 *
893 * The 'ext' is the "extension" of the file, actually just a three
894 * character type that describes the format of the verbatim string.
895 * For instance "txt" means it should be interpreted as a text only
896 * file by the receiver, "md " as markdown, and so forth. Only the
897 * three first characters of the extension are used, and if the
898 * provided one is shorter than that, the remaining is filled with
899 * spaces. */
addReplyVerbatim(client * c,const char * s,size_t len,const char * ext)900 void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
901 if (c->resp == 2) {
902 addReplyBulkCBuffer(c,s,len);
903 } else {
904 char buf[32];
905 size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
906 char *p = buf+preflen-4;
907 for (int i = 0; i < 3; i++) {
908 if (*ext == '\0') {
909 p[i] = ' ';
910 } else {
911 p[i] = *ext++;
912 }
913 }
914 addReplyProto(c,buf,preflen);
915 addReplyProto(c,s,len);
916 addReplyProto(c,"\r\n",2);
917 }
918 }
919
920 /* Add an array of C strings as status replies with a heading.
921 * This function is typically invoked by from commands that support
922 * subcommands in response to the 'help' subcommand. The help array
923 * is terminated by NULL sentinel. */
addReplyHelp(client * c,const char ** help)924 void addReplyHelp(client *c, const char **help) {
925 sds cmd = sdsnew((char*) c->argv[0]->ptr);
926 void *blenp = addReplyDeferredLen(c);
927 int blen = 0;
928
929 sdstoupper(cmd);
930 addReplyStatusFormat(c,
931 "%s <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",cmd);
932 sdsfree(cmd);
933
934 while (help[blen]) addReplyStatus(c,help[blen++]);
935
936 addReplyStatus(c,"HELP");
937 addReplyStatus(c," Prints this help.");
938
939 blen += 1; /* Account for the header. */
940 blen += 2; /* Account for the footer. */
941 setDeferredArrayLen(c,blenp,blen);
942 }
943
944 /* Add a suggestive error reply.
945 * This function is typically invoked by from commands that support
946 * subcommands in response to an unknown subcommand or argument error. */
addReplySubcommandSyntaxError(client * c)947 void addReplySubcommandSyntaxError(client *c) {
948 sds cmd = sdsnew((char*) c->argv[0]->ptr);
949 sdstoupper(cmd);
950 addReplyErrorFormat(c,
951 "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.",
952 (char*)c->argv[1]->ptr,cmd);
953 sdsfree(cmd);
954 }
955
/* Append the output buffers of client 'src' to the output buffers of
 * client 'dst'. On the complete path the static buffer of 'src' is copied
 * into 'dst', the reply list of 'src' is joined into the one of 'dst', and
 * the reply byte count and static buffer position of 'src' are reset to 0.
 *
 * Early exits:
 * - 'src' is flagged CLIENT_CLOSE_ASAP: nothing is copied, 'dst' is
 *   scheduled for asynchronous freeing and a warning is logged.
 * - prepareClientToWrite(dst) fails, or 'dst' is flagged
 *   CLIENT_CLOSE_AFTER_REPLY: the function returns right after the static
 *   buffer step, without touching the reply list or counters of 'src'. */
void AddReplyFromClient(client *dst, client *src) {
    /* If the source client contains a partial response due to client output
     * buffer limits, propagate that to the dest rather than copy a partial
     * reply. We don't want to run the risk of copying a partial response in
     * case for some reason the output limits don't reach the same decision
     * (maybe they changed). */
    if (src->flags & CLIENT_CLOSE_ASAP) {
        /* Capture the client description before scheduling the free, then
         * log it. */
        sds client = catClientInfoString(sdsempty(),dst);
        freeClientAsync(dst);
        serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
        return;
    }

    /* First add the static buffer (either into the static buffer or reply list) */
    addReplyProto(dst,src->buf, src->bufpos);

    /* We need to check with prepareClientToWrite again (after addReplyProto)
     * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
    if (prepareClientToWrite(dst) != C_OK)
        return;

    /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
     * checks in it. */
    if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Concatenate the reply list into the dest, and move the byte
     * accounting along with it. */
    if (listLength(src->reply))
        listJoin(dst->reply,src->reply);
    dst->reply_bytes += src->reply_bytes;
    src->reply_bytes = 0;
    src->bufpos = 0;

    /* Check output buffer limits */
    closeClientOnOutputBufferLimitReached(dst, 1);
}
994
995 /* Logically copy 'src' replica client buffers info to 'dst' replica.
996 * Basically increase referenced buffer block node reference count. */
copyReplicaOutputBuffer(client * dst,client * src)997 void copyReplicaOutputBuffer(client *dst, client *src) {
998 serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
999
1000 if (src->ref_repl_buf_node == NULL) return;
1001 dst->ref_repl_buf_node = src->ref_repl_buf_node;
1002 dst->ref_block_pos = src->ref_block_pos;
1003 ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
1004 }
1005
1006 /* Return true if the specified client has pending reply buffers to write to
1007 * the socket. */
clientHasPendingReplies(client * c)1008 int clientHasPendingReplies(client *c) {
1009 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
1010 /* Replicas use global shared replication buffer instead of
1011 * private output buffer. */
1012 serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
1013 if (c->ref_repl_buf_node == NULL) return 0;
1014
1015 /* If the last replication buffer block content is totally sent,
1016 * we have nothing to send. */
1017 listNode *ln = listLast(server.repl_buffer_blocks);
1018 replBufBlock *tail = listNodeValue(ln);
1019 if (ln == c->ref_repl_buf_node &&
1020 c->ref_block_pos == tail->used) return 0;
1021
1022 return 1;
1023 } else {
1024 return c->bufpos || listLength(c->reply);
1025 }
1026 }
1027
/* Callback run once the accept of connection 'conn' has completed.
 *
 * - If the connection is not in the connected state the error is logged and
 *   the client is scheduled for asynchronous freeing.
 * - If protected mode is on, the default user has the NOPASS flag and the
 *   client did not connect through a Unix socket, peers whose address is
 *   neither 127.0.0.1 nor ::1 are sent an explanatory -DENIED error, counted
 *   as rejected and scheduled for freeing.
 * - Otherwise the connection is counted and the "client connected" module
 *   event is fired. */
void clientAcceptHandler(connection *conn) {
    client *c = connGetPrivateData(conn);

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING,
                "Error accepting a client connection: %s",
                connGetLastError(conn));
        freeClientAsync(c);
        return;
    }

    /* If the server is running in protected mode (the default) and there
     * is no password set, nor a specific interface is bound, we don't accept
     * requests from non loopback interfaces. Instead we try to explain to
     * the user what to do to fix it if needed. */
    int check_peer = server.protected_mode &&
                     (DefaultUser->flags & USER_FLAG_NOPASS) &&
                     !(c->flags & CLIENT_UNIX_SOCKET);
    if (check_peer) {
        char peer[NET_IP_STR_LEN+1] = { 0 };
        connPeerToString(conn, peer, sizeof(peer)-1, NULL);

        int loopback = !strcmp(peer,"127.0.0.1") || !strcmp(peer,"::1");
        if (!loopback) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled and no password is set for the default user. "
                "In this mode connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a an authentication password for the default user. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (connWrite(c->conn,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClientAsync(c);
            return;
        }
    }

    server.stat_numconnections++;
    moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                          REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
                          c);
}
1085
1086 #define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(connection * conn,int flags,char * ip)1087 static void acceptCommonHandler(connection *conn, int flags, char *ip) {
1088 client *c;
1089 char conninfo[100];
1090 UNUSED(ip);
1091
1092 if (connGetState(conn) != CONN_STATE_ACCEPTING) {
1093 serverLog(LL_VERBOSE,
1094 "Accepted client connection in error state: %s (conn: %s)",
1095 connGetLastError(conn),
1096 connGetInfo(conn, conninfo, sizeof(conninfo)));
1097 connClose(conn);
1098 return;
1099 }
1100
1101 /* Limit the number of connections we take at the same time.
1102 *
1103 * Admission control will happen before a client is created and connAccept()
1104 * called, because we don't want to even start transport-level negotiation
1105 * if rejected. */
1106 if (listLength(server.clients) + getClusterConnectionsCount()
1107 >= server.maxclients)
1108 {
1109 char *err;
1110 if (server.cluster_enabled)
1111 err = "-ERR max number of clients + cluster "
1112 "connections reached\r\n";
1113 else
1114 err = "-ERR max number of clients reached\r\n";
1115
1116 /* That's a best effort error message, don't check write errors.
1117 * Note that for TLS connections, no handshake was done yet so nothing
1118 * is written and the connection will just drop. */
1119 if (connWrite(conn,err,strlen(err)) == -1) {
1120 /* Nothing to do, Just to avoid the warning... */
1121 }
1122 server.stat_rejected_conn++;
1123 connClose(conn);
1124 return;
1125 }
1126
1127 /* Create connection and client */
1128 if ((c = createClient(conn)) == NULL) {
1129 serverLog(LL_WARNING,
1130 "Error registering fd event for the new client: %s (conn: %s)",
1131 connGetLastError(conn),
1132 connGetInfo(conn, conninfo, sizeof(conninfo)));
1133 connClose(conn); /* May be already closed, just ignore errors */
1134 return;
1135 }
1136
1137 /* Last chance to keep flags */
1138 c->flags |= flags;
1139
1140 /* Initiate accept.
1141 *
1142 * Note that connAccept() is free to do two things here:
1143 * 1. Call clientAcceptHandler() immediately;
1144 * 2. Schedule a future call to clientAcceptHandler().
1145 *
1146 * Because of that, we must do nothing else afterwards.
1147 */
1148 if (connAccept(conn, clientAcceptHandler) == C_ERR) {
1149 char conninfo[100];
1150 if (connGetState(conn) == CONN_STATE_ERROR)
1151 serverLog(LL_WARNING,
1152 "Error accepting a client connection: %s (conn: %s)",
1153 connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
1154 freeClient(connGetPrivateData(conn));
1155 return;
1156 }
1157 }
1158
/* Event loop callback for a readable TCP listening socket 'fd'. Accepts up
 * to MAX_ACCEPTS_PER_CALL connections per invocation and hands each one,
 * wrapped as a socket connection, to acceptCommonHandler(). The loop stops
 * at the first accept failure; failures other than EWOULDBLOCK are logged. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
1178
/* Event loop callback for a readable TLS listening socket 'fd'. Same flow
 * as the plain TCP handler, except each accepted descriptor is wrapped with
 * connCreateAcceptedTLS() using the server tls_auth_clients setting. */
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip);
    }
}
1198
/* Event loop callback for a readable Unix domain listening socket 'fd'.
 * Accepts up to MAX_ACCEPTS_PER_CALL connections per invocation; each new
 * client is created with the CLIENT_UNIX_SOCKET flag and no peer address.
 * The loop stops at the first accept failure; failures other than
 * EWOULDBLOCK are logged. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        int cfd = anetUnixAccept(server.neterr, fd);

        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
    }
}
1217
/* Release the saved original argument vector of the client, if any.
 * A NULL original_argv means the command was never rewritten and there is
 * nothing to do. Afterwards original_argv is NULL and original_argc is 0. */
void freeClientOriginalArgv(client *c) {
    if (c->original_argv == NULL) return;

    int idx = 0;
    while (idx < c->original_argc) {
        decrRefCount(c->original_argv[idx]);
        idx++;
    }

    zfree(c->original_argv);
    c->original_argv = NULL;
    c->original_argc = 0;
}
1228
/* Drop a reference to every argument of the current command, release the
 * argument vector and reset the related client fields (argc, cmd,
 * argv_len_sum, argv_len, argv). */
void freeClientArgv(client *c) {
    for (int idx = 0; idx < c->argc; idx++) {
        decrRefCount(c->argv[idx]);
    }

    zfree(c->argv);
    c->argv = NULL;
    c->argv_len = 0;
    c->argv_len_sum = 0;
    c->argc = 0;
    c->cmd = NULL;
}
1240
1241 /* Close all the slaves connections. This is useful in chained replication
1242 * when we resync with our own master and want to force all our slaves to
1243 * resync with us as well. */
disconnectSlaves(void)1244 void disconnectSlaves(void) {
1245 listIter li;
1246 listNode *ln;
1247 listRewind(server.slaves,&li);
1248 while((ln = listNext(&li))) {
1249 freeClient((client*)ln->value);
1250 }
1251 }
1252
1253 /* Check if there is any other slave waiting dumping RDB finished expect me.
1254 * This function is useful to judge current dumping RDB can be used for full
1255 * synchronization or not. */
anyOtherSlaveWaitRdb(client * except_me)1256 int anyOtherSlaveWaitRdb(client *except_me) {
1257 listIter li;
1258 listNode *ln;
1259
1260 listRewind(server.slaves, &li);
1261 while((ln = listNext(&li))) {
1262 client *slave = ln->value;
1263 if (slave != except_me &&
1264 slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
1265 {
1266 return 1;
1267 }
1268 }
1269 return 0;
1270 }
1271
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster().
 *
 * When the client still has a connection, the connection is closed and
 * c->conn is set to NULL. The client structure itself is not released
 * here. */
void unlinkClient(client *c) {
    listNode *ln;

    /* If this is marked as current client unset it. */
    if (server.current_client == c) server.current_client = NULL;

    /* Certain operations must be done only if the client has an active connection.
     * If the client was already unlinked or if it's a "fake client" the
     * conn is already set to NULL. */
    if (c->conn) {
        /* Remove from the list of active clients, and from the index that
         * maps client IDs to clients (keyed by the ID converted with
         * htonu64(), the same conversion lookupClientByID() applies). */
        if (c->client_list_node) {
            uint64_t id = htonu64(c->id);
            raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
            listDelNode(server.clients,c->client_list_node);
            c->client_list_node = NULL;
        }

        /* Check if this is a replica waiting for diskless replication (rdb pipe),
         * in which case it needs to be cleaned from that list */
        if (c->flags & CLIENT_SLAVE &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.rdb_pipe_conns)
        {
            int i;
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                if (server.rdb_pipe_conns[i] == c->conn) {
                    /* Notify the pipe writer, then clear the slot; only
                     * the first matching slot is handled. */
                    rdbPipeWriteHandlerConnRemoved(c->conn);
                    server.rdb_pipe_conns[i] = NULL;
                    break;
                }
            }
        }
        connClose(c->conn);
        c->conn = NULL;
    }

    /* Remove from the list of pending writes if needed. The flag and the
     * list membership must agree, hence the assertion. */
    if (c->flags & CLIENT_PENDING_WRITE) {
        ln = listSearchKey(server.clients_pending_write,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_pending_write,ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
    }

    /* Remove from the list of pending reads if needed. This function must
     * only run while the I/O threads are idle (asserted). */
    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
    if (c->pending_read_list_node != NULL) {
        listDelNode(server.clients_pending_read,c->pending_read_list_node);
        c->pending_read_list_node = NULL;
    }


    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients. */
    if (c->flags & CLIENT_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;
    }

    /* Clear the tracking status. */
    if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
1340
/* Release a client together with the resources attached to it.
 *
 * The structure is not always destroyed by this call:
 * - a client flagged CLIENT_PROTECTED is only scheduled for freeing through
 *   freeClientAsync();
 * - a client flagged CLIENT_MASTER (while server.master is set) that is
 *   neither in protocol error nor blocked is handed to
 *   replicationCacheMaster() instead.
 * In every other case 'c' is released with zfree() and must not be used
 * after this function returns. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* For connected clients, call the disconnection event of modules hooks. */
    if (c->conn) {
        moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                              REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED,
                              c);
    }

    /* Notify module system that this client auth status changed. */
    moduleNotifyUserChanged(c);

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. Note that we need to do this here, because later
     * we may call replicationCacheMaster() and the client should already
     * be removed from the list of clients to free. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
            /* The client survives as the cached master, so the pending
             * close flags are cleared before handing it over. */
            c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeReplicaReferencedReplBuffer(c);
    freeClientArgv(c);
    freeClientOriginalArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        /* If no other replica is waiting for the RDB dump to finish, the
         * current child process does not need to continue dumping the RDB,
         * so we kill it. This way the child won't use more memory, and we
         * can fork a new child process asap to dump the RDB for the next
         * full synchronization or bgsave.
         * However, if the user configured 'save' points we do not kill the
         * child: that means the RDB is important to keep data safe, and
         * killing it could delay the configured 'save' in favor of full
         * sync. */
        if (server.saveparamslen == 0 &&
            c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
            server.child_type == CHILD_TYPE_RDB &&
            server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
            anyOtherSlaveWaitRdb(c) == 0)
        {
            killRDBChild();
        }
        /* A replica in the bulk transfer state may still own an open RDB
         * file descriptor and a preamble string. */
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Clients flagged CLIENT_MONITOR live in server.monitors, the
         * others in server.slaves. The client must be in the selected list
         * (asserted). */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
        /* Fire the replica change modules event. */
        if (c->replstate == SLAVE_STATE_ONLINE)
            moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                                  REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
                                  NULL);
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* Remove the contribution that this client gave to our
     * incrementally computed memory usage. */
    server.stat_clients_type_memory[c->last_memory_type] -=
        c->last_memory_usage;
    /* Remove client from memory usage buckets */
    if (c->mem_usage_bucket) {
        c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
        listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    sdsfree(c->sockname);
    sdsfree(c->slave_addr);
    zfree(c);
}
1483
1484 /* Schedule a client to free it at a safe time in the serverCron() function.
1485 * This function is useful when we need to terminate a client but we are in
1486 * a context where calling freeClient() is not possible, because the client
1487 * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)1488 void freeClientAsync(client *c) {
1489 /* We need to handle concurrent access to the server.clients_to_close list
1490 * only in the freeClientAsync() function, since it's the only function that
1491 * may access the list while Redis uses I/O threads. All the other accesses
1492 * are in the context of the main thread while the other threads are
1493 * idle. */
1494 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
1495 c->flags |= CLIENT_CLOSE_ASAP;
1496 if (server.io_threads_num == 1) {
1497 /* no need to bother with locking if there's just one thread (the main thread) */
1498 listAddNodeTail(server.clients_to_close,c);
1499 return;
1500 }
1501 static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
1502 pthread_mutex_lock(&async_free_queue_mutex);
1503 listAddNodeTail(server.clients_to_close,c);
1504 pthread_mutex_unlock(&async_free_queue_mutex);
1505 }
1506
1507 /* Perform processing of the client before moving on to processing the next client
1508 * this is useful for performing operations that affect the global state but can't
1509 * wait until we're done with all clients. In other words can't wait until beforeSleep()
1510 * return C_ERR in case client is no longer valid after call.
1511 * The input client argument: c, may be NULL in case the previous client was
1512 * freed before the call. */
beforeNextClient(client * c)1513 int beforeNextClient(client *c) {
1514 /* Skip the client processing if we're in an IO thread, in that case we'll perform
1515 this operation later (this function is called again) in the fan-in stage of the threading mechanism */
1516 if (io_threads_op != IO_THREADS_OP_IDLE)
1517 return C_OK;
1518 /* Handle async frees */
1519 /* Note: this doesn't make the server.clients_to_close list redundant because of
1520 * cases where we want an async free of a client other than myself. For example
1521 * in ACL modifications we disconnect clients authenticated to non-existent
1522 * users (see ACL LOAD). */
1523 if (c && (c->flags & CLIENT_CLOSE_ASAP)) {
1524 freeClient(c);
1525 return C_ERR;
1526 }
1527 return C_OK;
1528 }
1529
1530 /* Free the clients marked as CLOSE_ASAP, return the number of clients
1531 * freed. */
freeClientsInAsyncFreeQueue(void)1532 int freeClientsInAsyncFreeQueue(void) {
1533 int freed = 0;
1534 listIter li;
1535 listNode *ln;
1536
1537 listRewind(server.clients_to_close,&li);
1538 while ((ln = listNext(&li)) != NULL) {
1539 client *c = listNodeValue(ln);
1540
1541 if (c->flags & CLIENT_PROTECTED) continue;
1542
1543 c->flags &= ~CLIENT_CLOSE_ASAP;
1544 freeClient(c);
1545 listDelNode(server.clients_to_close,ln);
1546 freed++;
1547 }
1548 return freed;
1549 }
1550
1551 /* Return a client by ID, or NULL if the client ID is not in the set
1552 * of registered clients. Note that "fake clients", created with -1 as FD,
1553 * are not registered clients. */
lookupClientByID(uint64_t id)1554 client *lookupClientByID(uint64_t id) {
1555 id = htonu64(id);
1556 client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id));
1557 return (c == raxNotFound) ? NULL : c;
1558 }
1559
/* Perform one write of pending output to the connection of a client. It is
 * called by writeToClient() and handles the different kinds of clients.
 *
 * Returns C_OK on success and C_ERR when connWrite() reports zero or fewer
 * bytes written. 'nwritten' is an output parameter: it is reset to 0 on
 * entry and receives the result of the connWrite() call, if one is made. */
int _writeToClient(client *c, ssize_t *nwritten) {
    *nwritten = 0;
    if (getClientType(c) == CLIENT_TYPE_SLAVE) {
        /* Replicas have no private output: their data lives in the
         * replication buffer block they reference. */
        serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

        replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
        serverAssert(o->used >= c->ref_block_pos);
        /* Send current block if it is not fully sent. */
        if (o->used > c->ref_block_pos) {
            *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
                                  o->used-c->ref_block_pos);
            if (*nwritten <= 0) return C_ERR;
            c->ref_block_pos += *nwritten;
        }

        /* If we fully sent the object on head, go to the next one: move
         * the reference (and the reference counts) from the current block
         * to the next, then trim the replication backlog incrementally. */
        listNode *next = listNextNode(c->ref_repl_buf_node);
        if (next && c->ref_block_pos == o->used) {
            o->refcount--;
            ((replBufBlock *)(listNodeValue(next)))->refcount++;
            c->ref_repl_buf_node = next;
            c->ref_block_pos = 0;
            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
        }
        return C_OK;
    }

    if (c->bufpos > 0) {
        /* Static buffer: 'sentlen' is how much of it was already written. */
        *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
    } else {
        /* Reply list: work on the block at the head of the list. */
        clientReplyBlock *o = listNodeValue(listFirst(c->reply));
        size_t objlen = o->used;

        /* An empty block is dropped without writing anything. */
        if (objlen == 0) {
            c->reply_bytes -= o->size;
            listDelNode(c->reply,listFirst(c->reply));
            return C_OK;
        }

        *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
        if (*nwritten <= 0) return C_ERR;
        c->sentlen += *nwritten;

        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            c->reply_bytes -= o->size;
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
            /* If there are no longer objects in the list, we expect
             * the count of reply bytes to be exactly zero. */
            if (listLength(c->reply) == 0)
                serverAssert(c->reply_bytes == 0);
        }
    }
    return C_OK;
}
1630
1631 /* Write data in output buffers to client. Return C_OK if the client
1632 * is still valid after the call, C_ERR if it was freed because of some
1633 * error. If handler_installed is set, it will attempt to clear the
1634 * write event.
1635 *
1636 * This function is called by threads, but always with handler_installed
1637 * set to 0. So when handler_installed is set to 0 the function must be
1638 * thread safe. */
writeToClient(client * c,int handler_installed)1639 int writeToClient(client *c, int handler_installed) {
1640 /* Update total number of writes on server */
1641 atomicIncr(server.stat_total_writes_processed, 1);
1642
1643 ssize_t nwritten = 0, totwritten = 0;
1644
1645 while(clientHasPendingReplies(c)) {
1646 int ret = _writeToClient(c, &nwritten);
1647 if (ret == C_ERR) break;
1648 totwritten += nwritten;
1649 /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
1650 * bytes, in a single threaded server it's a good idea to serve
1651 * other clients as well, even if a very large request comes from
1652 * super fast link that is always able to accept data (in real world
1653 * scenario think about 'KEYS *' against the loopback interface).
1654 *
1655 * However if we are over the maxmemory limit we ignore that and
1656 * just deliver as much data as it is possible to deliver.
1657 *
1658 * Moreover, we also send as much as possible if the client is
1659 * a slave or a monitor (otherwise, on high-speed traffic, the
1660 * replication/output buffer will grow indefinitely) */
1661 if (totwritten > NET_MAX_WRITES_PER_EVENT &&
1662 (server.maxmemory == 0 ||
1663 zmalloc_used_memory() < server.maxmemory) &&
1664 !(c->flags & CLIENT_SLAVE)) break;
1665 }
1666 atomicIncr(server.stat_net_output_bytes, totwritten);
1667 if (nwritten == -1) {
1668 if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
1669 serverLog(LL_VERBOSE,
1670 "Error writing to client: %s", connGetLastError(c->conn));
1671 freeClientAsync(c);
1672 return C_ERR;
1673 }
1674 }
1675 if (totwritten > 0) {
1676 /* For clients representing masters we don't count sending data
1677 * as an interaction, since we always send REPLCONF ACK commands
1678 * that take some time to just fill the socket output buffer.
1679 * We just rely on data / pings received for timeout detection. */
1680 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
1681 }
1682 if (!clientHasPendingReplies(c)) {
1683 c->sentlen = 0;
1684 /* Note that writeToClient() is called in a threaded way, but
1685 * adDeleteFileEvent() is not thread safe: however writeToClient()
1686 * is always called with handler_installed set to 0 from threads
1687 * so we are fine. */
1688 if (handler_installed) {
1689 serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
1690 connSetWriteHandler(c->conn, NULL);
1691 }
1692
1693 /* Close connection after entire reply has been sent. */
1694 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
1695 freeClientAsync(c);
1696 return C_ERR;
1697 }
1698 }
1699 updateClientMemUsage(c);
1700 return C_OK;
1701 }
1702
1703 /* Write event handler. Just send data to the client. */
sendReplyToClient(connection * conn)1704 void sendReplyToClient(connection *conn) {
1705 client *c = connGetPrivateData(conn);
1706 writeToClient(c,1);
1707 }
1708
1709 /* This function is called just before entering the event loop, in the hope
1710 * we can just write the replies to the client output buffer without any
1711 * need to use a syscall in order to install the writable event handler,
1712 * get it called, and so forth. */
handleClientsWithPendingWrites(void)1713 int handleClientsWithPendingWrites(void) {
1714 listIter li;
1715 listNode *ln;
1716 int processed = listLength(server.clients_pending_write);
1717
1718 listRewind(server.clients_pending_write,&li);
1719 while((ln = listNext(&li))) {
1720 client *c = listNodeValue(ln);
1721 c->flags &= ~CLIENT_PENDING_WRITE;
1722 listDelNode(server.clients_pending_write,ln);
1723
1724 /* If a client is protected, don't do anything,
1725 * that may trigger write error or recreate handler. */
1726 if (c->flags & CLIENT_PROTECTED) continue;
1727
1728 /* Don't write to clients that are going to be closed anyway. */
1729 if (c->flags & CLIENT_CLOSE_ASAP) continue;
1730
1731 /* Try to write buffers to the client socket. */
1732 if (writeToClient(c,0) == C_ERR) continue;
1733
1734 /* If after the synchronous writes above we still have data to
1735 * output to the client, we need to install the writable handler. */
1736 if (clientHasPendingReplies(c)) {
1737 int ae_barrier = 0;
1738 /* For the fsync=always policy, we want that a given FD is never
1739 * served for reading and writing in the same event loop iteration,
1740 * so that in the middle of receiving the query, and serving it
1741 * to the client, we'll call beforeSleep() that will do the
1742 * actual fsync of AOF to disk. the write barrier ensures that. */
1743 if (server.aof_state == AOF_ON &&
1744 server.aof_fsync == AOF_FSYNC_ALWAYS)
1745 {
1746 ae_barrier = 1;
1747 }
1748 if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
1749 freeClientAsync(c);
1750 }
1751 }
1752 }
1753 return processed;
1754 }
1755
1756 /* resetClient prepare the client to process the next command */
resetClient(client * c)1757 void resetClient(client *c) {
1758 redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1759
1760 freeClientArgv(c);
1761 c->reqtype = 0;
1762 c->multibulklen = 0;
1763 c->bulklen = -1;
1764
1765 /* We clear the ASKING flag as well if we are not inside a MULTI, and
1766 * if what we just executed is not the ASKING command itself. */
1767 if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1768 c->flags &= ~CLIENT_ASKING;
1769
1770 /* We do the same for the CACHING command as well. It also affects
1771 * the next command or transaction executed, in a way very similar
1772 * to ASKING. */
1773 if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
1774 c->flags &= ~CLIENT_TRACKING_CACHING;
1775
1776 /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1777 * to the next command will be sent, but set the flag if the command
1778 * we just processed was "CLIENT REPLY SKIP". */
1779 c->flags &= ~CLIENT_REPLY_SKIP;
1780 if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1781 c->flags |= CLIENT_REPLY_SKIP;
1782 c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1783 }
1784 }
1785
1786 /* This function is used when we want to re-enter the event loop but there
1787 * is the risk that the client we are dealing with will be freed in some
1788 * way. This happens for instance in:
1789 *
1790 * * DEBUG RELOAD and similar.
1791 * * When a Lua script is in -BUSY state.
1792 *
1793 * So the function will protect the client by doing two things:
1794 *
1795 * 1) It removes the file events. This way it is not possible that an
1796 * error is signaled on the socket, freeing the client.
1797 * 2) Moreover it makes sure that if the client is freed in a different code
1798 * path, it is not really released, but only marked for later release. */
protectClient(client * c)1799 void protectClient(client *c) {
1800 c->flags |= CLIENT_PROTECTED;
1801 if (c->conn) {
1802 connSetReadHandler(c->conn,NULL);
1803 connSetWriteHandler(c->conn,NULL);
1804 }
1805 }
1806
1807 /* This will undo the client protection done by protectClient() */
unprotectClient(client * c)1808 void unprotectClient(client *c) {
1809 if (c->flags & CLIENT_PROTECTED) {
1810 c->flags &= ~CLIENT_PROTECTED;
1811 if (c->conn) {
1812 connSetReadHandler(c->conn,readQueryFromClient);
1813 if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
1814 }
1815 }
1816 }
1817
1818 /* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
1819 * this function consumes the client query buffer and creates a command ready
1820 * to be executed inside the client structure. Returns C_OK if the command
1821 * is ready to be executed, or C_ERR if there is still protocol to read to
1822 * have a well formed command. The function also returns C_ERR when there is
1823 * a protocol error: in such a case the client structure is setup to reply
1824 * with the error and close the connection. */
processInlineBuffer(client * c)1825 int processInlineBuffer(client *c) {
1826 char *newline;
1827 int argc, j, linefeed_chars = 1;
1828 sds *argv, aux;
1829 size_t querylen;
1830
1831 /* Search for end of line */
1832 newline = strchr(c->querybuf+c->qb_pos,'\n');
1833
1834 /* Nothing to do without a \r\n */
1835 if (newline == NULL) {
1836 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
1837 addReplyError(c,"Protocol error: too big inline request");
1838 setProtocolError("too big inline request",c);
1839 }
1840 return C_ERR;
1841 }
1842
1843 /* Handle the \r\n case. */
1844 if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
1845 newline--, linefeed_chars++;
1846
1847 /* Split the input buffer up to the \r\n */
1848 querylen = newline-(c->querybuf+c->qb_pos);
1849 aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
1850 argv = sdssplitargs(aux,&argc);
1851 sdsfree(aux);
1852 if (argv == NULL) {
1853 addReplyError(c,"Protocol error: unbalanced quotes in request");
1854 setProtocolError("unbalanced quotes in inline request",c);
1855 return C_ERR;
1856 }
1857
1858 /* Newline from slaves can be used to refresh the last ACK time.
1859 * This is useful for a slave to ping back while loading a big
1860 * RDB file. */
1861 if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
1862 c->repl_ack_time = server.unixtime;
1863
1864 /* Masters should never send us inline protocol to run actual
1865 * commands. If this happens, it is likely due to a bug in Redis where
1866 * we got some desynchronization in the protocol, for example
1867 * because of a PSYNC gone bad.
1868 *
1869 * However the is an exception: masters may send us just a newline
1870 * to keep the connection active. */
1871 if (querylen != 0 && c->flags & CLIENT_MASTER) {
1872 sdsfreesplitres(argv,argc);
1873 serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
1874 setProtocolError("Master using the inline protocol. Desync?",c);
1875 return C_ERR;
1876 }
1877
1878 /* Move querybuffer position to the next query in the buffer. */
1879 c->qb_pos += querylen+linefeed_chars;
1880
1881 /* Setup argv array on client structure */
1882 if (argc) {
1883 if (c->argv) zfree(c->argv);
1884 c->argv_len = argc;
1885 c->argv = zmalloc(sizeof(robj*)*c->argv_len);
1886 c->argv_len_sum = 0;
1887 }
1888
1889 /* Create redis objects for all arguments. */
1890 for (c->argc = 0, j = 0; j < argc; j++) {
1891 c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
1892 c->argc++;
1893 c->argv_len_sum += sdslen(argv[j]);
1894 }
1895 zfree(argv);
1896 return C_OK;
1897 }
1898
1899 /* Helper function. Record protocol error details in server log,
1900 * and set the client as CLIENT_CLOSE_AFTER_REPLY and
1901 * CLIENT_PROTOCOL_ERROR. */
1902 #define PROTO_DUMP_LEN 128
setProtocolError(const char * errstr,client * c)1903 static void setProtocolError(const char *errstr, client *c) {
1904 if (server.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
1905 sds client = catClientInfoString(sdsempty(),c);
1906
1907 /* Sample some protocol to given an idea about what was inside. */
1908 char buf[256];
1909 if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
1910 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
1911 } else {
1912 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
1913 }
1914
1915 /* Remove non printable chars. */
1916 char *p = buf;
1917 while (*p != '\0') {
1918 if (!isprint(*p)) *p = '.';
1919 p++;
1920 }
1921
1922 /* Log all the client and protocol info. */
1923 int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
1924 LL_VERBOSE;
1925 serverLog(loglevel,
1926 "Protocol error (%s) from client: %s. %s", errstr, client, buf);
1927 sdsfree(client);
1928 }
1929 c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
1930 }
1931
1932 /* Process the query buffer for client 'c', setting up the client argument
1933 * vector for command execution. Returns C_OK if after running the function
1934 * the client has a well-formed ready to be processed command, otherwise
1935 * C_ERR if there is still to read more buffer to get the full command.
1936 * The function also returns C_ERR when there is a protocol error: in such a
1937 * case the client structure is setup to reply with the error and close
1938 * the connection.
1939 *
1940 * This function is called if processInputBuffer() detects that the next
1941 * command is in RESP format, so the first byte in the command is found
1942 * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
processMultibulkBuffer(client * c)1943 int processMultibulkBuffer(client *c) {
1944 char *newline = NULL;
1945 int ok;
1946 long long ll;
1947
1948 if (c->multibulklen == 0) {
1949 /* The client should have been reset */
1950 serverAssertWithInfo(c,NULL,c->argc == 0);
1951
1952 /* Multi bulk length cannot be read without a \r\n */
1953 newline = strchr(c->querybuf+c->qb_pos,'\r');
1954 if (newline == NULL) {
1955 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
1956 addReplyError(c,"Protocol error: too big mbulk count string");
1957 setProtocolError("too big mbulk count string",c);
1958 }
1959 return C_ERR;
1960 }
1961
1962 /* Buffer should also contain \n */
1963 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
1964 return C_ERR;
1965
1966 /* We know for sure there is a whole line since newline != NULL,
1967 * so go ahead and find out the multi bulk length. */
1968 serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
1969 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
1970 if (!ok || ll > INT_MAX) {
1971 addReplyError(c,"Protocol error: invalid multibulk length");
1972 setProtocolError("invalid mbulk count",c);
1973 return C_ERR;
1974 } else if (ll > 10 && authRequired(c)) {
1975 addReplyError(c, "Protocol error: unauthenticated multibulk length");
1976 setProtocolError("unauth mbulk count", c);
1977 return C_ERR;
1978 }
1979
1980 c->qb_pos = (newline-c->querybuf)+2;
1981
1982 if (ll <= 0) return C_OK;
1983
1984 c->multibulklen = ll;
1985
1986 /* Setup argv array on client structure */
1987 if (c->argv) zfree(c->argv);
1988 c->argv_len = min(c->multibulklen, 1024);
1989 c->argv = zmalloc(sizeof(robj*)*c->argv_len);
1990 c->argv_len_sum = 0;
1991 }
1992
1993 serverAssertWithInfo(c,NULL,c->multibulklen > 0);
1994 while(c->multibulklen) {
1995 /* Read bulk length if unknown */
1996 if (c->bulklen == -1) {
1997 newline = strchr(c->querybuf+c->qb_pos,'\r');
1998 if (newline == NULL) {
1999 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
2000 addReplyError(c,
2001 "Protocol error: too big bulk count string");
2002 setProtocolError("too big bulk count string",c);
2003 return C_ERR;
2004 }
2005 break;
2006 }
2007
2008 /* Buffer should also contain \n */
2009 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
2010 break;
2011
2012 if (c->querybuf[c->qb_pos] != '$') {
2013 addReplyErrorFormat(c,
2014 "Protocol error: expected '$', got '%c'",
2015 c->querybuf[c->qb_pos]);
2016 setProtocolError("expected $ but got something else",c);
2017 return C_ERR;
2018 }
2019
2020 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
2021 if (!ok || ll < 0 ||
2022 (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
2023 addReplyError(c,"Protocol error: invalid bulk length");
2024 setProtocolError("invalid bulk length",c);
2025 return C_ERR;
2026 } else if (ll > 16384 && authRequired(c)) {
2027 addReplyError(c, "Protocol error: unauthenticated bulk length");
2028 setProtocolError("unauth bulk length", c);
2029 return C_ERR;
2030 }
2031
2032 c->qb_pos = newline-c->querybuf+2;
2033 if (ll >= PROTO_MBULK_BIG_ARG) {
2034 /* If we are going to read a large object from network
2035 * try to make it likely that it will start at c->querybuf
2036 * boundary so that we can optimize object creation
2037 * avoiding a large copy of data.
2038 *
2039 * But only when the data we have not parsed is less than
2040 * or equal to ll+2. If the data length is greater than
2041 * ll+2, trimming querybuf is just a waste of time, because
2042 * at this time the querybuf contains not only our bulk. */
2043 if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
2044 sdsrange(c->querybuf,c->qb_pos,-1);
2045 c->qb_pos = 0;
2046 /* Hint the sds library about the amount of bytes this string is
2047 * going to contain. */
2048 c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf));
2049 }
2050 }
2051 c->bulklen = ll;
2052 }
2053
2054 /* Read bulk argument */
2055 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
2056 /* Not enough data (+2 == trailing \r\n) */
2057 break;
2058 } else {
2059 /* Check if we have space in argv, grow if needed */
2060 if (c->argc >= c->argv_len) {
2061 c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen);
2062 c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
2063 }
2064
2065 /* Optimization: if the buffer contains JUST our bulk element
2066 * instead of creating a new object by *copying* the sds we
2067 * just use the current sds string. */
2068 if (c->qb_pos == 0 &&
2069 c->bulklen >= PROTO_MBULK_BIG_ARG &&
2070 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
2071 {
2072 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
2073 c->argv_len_sum += c->bulklen;
2074 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
2075 /* Assume that if we saw a fat argument we'll see another one
2076 * likely... */
2077 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
2078 sdsclear(c->querybuf);
2079 } else {
2080 c->argv[c->argc++] =
2081 createStringObject(c->querybuf+c->qb_pos,c->bulklen);
2082 c->argv_len_sum += c->bulklen;
2083 c->qb_pos += c->bulklen+2;
2084 }
2085 c->bulklen = -1;
2086 c->multibulklen--;
2087 }
2088 }
2089
2090 /* We're done when c->multibulk == 0 */
2091 if (c->multibulklen == 0) return C_OK;
2092
2093 /* Still not ready to process the command */
2094 return C_ERR;
2095 }
2096
2097 /* Perform necessary tasks after a command was executed:
2098 *
2099 * 1. The client is reset unless there are reasons to avoid doing it.
2100 * 2. In the case of master clients, the replication offset is updated.
2101 * 3. Propagate commands we got from our master to replicas down the line. */
commandProcessed(client * c)2102 void commandProcessed(client *c) {
2103 /* If client is blocked(including paused), just return avoid reset and replicate.
2104 *
2105 * 1. Don't reset the client structure for blocked clients, so that the reply
2106 * callback will still be able to access the client argv and argc fields.
2107 * The client will be reset in unblockClient().
2108 * 2. Don't update replication offset or propagate commands to replicas,
2109 * since we have not applied the command. */
2110 if (c->flags & CLIENT_BLOCKED) return;
2111
2112 resetClient(c);
2113
2114 long long prev_offset = c->reploff;
2115 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
2116 /* Update the applied replication offset of our master. */
2117 c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
2118 }
2119
2120 /* If the client is a master we need to compute the difference
2121 * between the applied offset before and after processing the buffer,
2122 * to understand how much of the replication stream was actually
2123 * applied to the master state: this quantity, and its corresponding
2124 * part of the replication stream, will be propagated to the
2125 * sub-replicas and to the replication backlog. */
2126 if (c->flags & CLIENT_MASTER) {
2127 long long applied = c->reploff - prev_offset;
2128 if (applied) {
2129 replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
2130 sdsrange(c->pending_querybuf,applied,-1);
2131 }
2132 }
2133 }
2134
2135 /* This function calls processCommand(), but also performs a few sub tasks
2136 * for the client that are useful in that context:
2137 *
2138 * 1. It sets the current client to the client 'c'.
2139 * 2. calls commandProcessed() if the command was handled.
2140 *
2141 * The function returns C_ERR in case the client was freed as a side effect
2142 * of processing the command, otherwise C_OK is returned. */
processCommandAndResetClient(client * c)2143 int processCommandAndResetClient(client *c) {
2144 int deadclient = 0;
2145 client *old_client = server.current_client;
2146 server.current_client = c;
2147 if (processCommand(c) == C_OK) {
2148 commandProcessed(c);
2149 /* Update the client's memory to include output buffer growth following the
2150 * processed command. */
2151 updateClientMemUsage(c);
2152 }
2153
2154 if (server.current_client == NULL) deadclient = 1;
2155 /*
2156 * Restore the old client, this is needed because when a script
2157 * times out, we will get into this code from processEventsWhileBlocked.
2158 * Which will cause to set the server.current_client. If not restored
2159 * we will return 1 to our caller which will falsely indicate the client
2160 * is dead and will stop reading from its buffer.
2161 */
2162 server.current_client = old_client;
2163 /* performEvictions may flush slave output buffers. This may
2164 * result in a slave, that may be the active client, to be
2165 * freed. */
2166 return deadclient ? C_ERR : C_OK;
2167 }
2168
2169
2170 /* This function will execute any fully parsed commands pending on
2171 * the client. Returns C_ERR if the client is no longer valid after executing
2172 * the command, and C_OK for all other cases. */
processPendingCommandsAndResetClient(client * c)2173 int processPendingCommandsAndResetClient(client *c) {
2174 if (c->flags & CLIENT_PENDING_COMMAND) {
2175 c->flags &= ~CLIENT_PENDING_COMMAND;
2176 if (processCommandAndResetClient(c) == C_ERR) {
2177 return C_ERR;
2178 }
2179 }
2180 return C_OK;
2181 }
2182
2183 /* This function is called every time, in the client structure 'c', there is
2184 * more query buffer to process, because we read more data from the socket
2185 * or because a client was blocked and later reactivated, so there could be
2186 * pending query buffer, already representing a full command, to process.
2187 * return C_ERR in case the client was freed during the processing */
processInputBuffer(client * c)2188 int processInputBuffer(client *c) {
2189 /* Keep processing while there is something in the input buffer */
2190 while(c->qb_pos < sdslen(c->querybuf)) {
2191 /* Immediately abort if the client is in the middle of something. */
2192 if (c->flags & CLIENT_BLOCKED) break;
2193
2194 /* Don't process more buffers from clients that have already pending
2195 * commands to execute in c->argv. */
2196 if (c->flags & CLIENT_PENDING_COMMAND) break;
2197
2198 /* Don't process input from the master while there is a busy script
2199 * condition on the slave. We want just to accumulate the replication
2200 * stream (instead of replying -BUSY like we do with other clients) and
2201 * later resume the processing. */
2202 if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
2203
2204 /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
2205 * written to the client. Make sure to not let the reply grow after
2206 * this flag has been set (i.e. don't process more commands).
2207 *
2208 * The same applies for clients we want to terminate ASAP. */
2209 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
2210
2211 /* Determine request type when unknown. */
2212 if (!c->reqtype) {
2213 if (c->querybuf[c->qb_pos] == '*') {
2214 c->reqtype = PROTO_REQ_MULTIBULK;
2215 } else {
2216 c->reqtype = PROTO_REQ_INLINE;
2217 }
2218 }
2219
2220 if (c->reqtype == PROTO_REQ_INLINE) {
2221 if (processInlineBuffer(c) != C_OK) break;
2222 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
2223 if (processMultibulkBuffer(c) != C_OK) break;
2224 } else {
2225 serverPanic("Unknown request type");
2226 }
2227
2228 /* Multibulk processing could see a <= 0 length. */
2229 if (c->argc == 0) {
2230 resetClient(c);
2231 } else {
2232 /* If we are in the context of an I/O thread, we can't really
2233 * execute the command here. All we can do is to flag the client
2234 * as one that needs to process the command. */
2235 if (io_threads_op != IO_THREADS_OP_IDLE) {
2236 serverAssert(io_threads_op == IO_THREADS_OP_READ);
2237 c->flags |= CLIENT_PENDING_COMMAND;
2238 break;
2239 }
2240
2241 /* We are finally ready to execute the command. */
2242 if (processCommandAndResetClient(c) == C_ERR) {
2243 /* If the client is no longer valid, we avoid exiting this
2244 * loop and trimming the client buffer later. So we return
2245 * ASAP in that case. */
2246 return C_ERR;
2247 }
2248 }
2249 }
2250
2251 /* Trim to pos */
2252 if (c->qb_pos) {
2253 sdsrange(c->querybuf,c->qb_pos,-1);
2254 c->qb_pos = 0;
2255 }
2256
2257 /* Update client memory usage after processing the query buffer, this is
2258 * important in case the query buffer is big and wasn't drained during
2259 * the above loop (because of partially sent big commands). */
2260 updateClientMemUsage(c);
2261
2262 return C_OK;
2263 }
2264
/* Connection read handler: read new data from the client attached to 'conn'
 * into its query buffer, update the related statistics and limits, then
 * hand the buffer over to processInputBuffer(). */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen = PROTO_IOBUF_LEN;

    /* With threaded I/O the read may be deferred to when the event loop
     * is exited: in that case there is nothing to do now. */
    if (postponeClientRead(c)) return;

    /* Account this read in the server statistics. */
    atomicIncr(server.stat_total_reads_processed, 1);

    /* In the middle of a large bulk argument of a multi bulk request, read
     * only what is missing to complete it. This costs more read(2) calls
     * but makes it likely that the query buffer ends up holding exactly
     * the argument, which processMultibulkBuffer() can then turn into an
     * object without copying. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        big_arg = 1;

        /* 'remaining' may be zero in some edge case, for example when a
         * blocked client is resumed after CLIENT PAUSE. */
        if (remaining > 0) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (!big_arg && sdsalloc(c->querybuf) >= PROTO_IOBUF_LEN) {
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

        /* Use all the available room to save read(2) system calls. */
        readlen = sdsavail(c->querybuf);
    } else {
        /* Non-greedy growth in two cases: for a BIG_ARG, since nothing
         * beyond that argument is going to be read into the buffer; for
         * the initial allocation of the query buffer, to avoid colliding
         * with the RESIZE_THRESHOLD mechanism. */
        c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
    }

    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        /* Still connected: nothing was read this time, not a failure. */
        if (connGetState(conn) == CONN_STATE_CONNECTED) return;

        serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
        freeClientAsync(c);
        goto done;
    }
    if (nread == 0) {
        /* The peer closed the connection. */
        if (server.verbosity <= LL_VERBOSE) {
            sds info = catClientInfoString(sdsempty(), c);
            serverLog(LL_VERBOSE, "Client closed connection %s", info);
            sdsfree(info);
        }
        freeClientAsync(c);
        goto done;
    }
    if (c->flags & CLIENT_MASTER) {
        /* Keep a copy of what the master sent in the pending (not yet
         * applied) buffer: it is used later to obtain the exact bytes
         * applied by the last executed command. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    /* Make the new bytes part of the query buffer and track its peak. */
    sdsIncrLen(c->querybuf,nread);
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);

    /* Clients other than masters are closed when their query buffer
     * exceeds the configured maximum length. */
    if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c);
        sds bytes = sdscatrepr(sdsempty(),c->querybuf,64);

        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        goto done;
    }

    /* Parse the buffer and run any complete command found in it. If the
     * client was freed in the process, forget about it. */
    if (processInputBuffer(c) == C_ERR)
        c = NULL;

done:
    beforeNextClient(c);
}
2360
2361 /* A Redis "Address String" is a colon separated ip:port pair.
2362 * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
2363 * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
2364 * For Unix sockets we use path:0, like in "/tmp/redis:0".
2365 *
2366 * An Address String always fits inside a buffer of NET_ADDR_STR_LEN bytes,
2367 * including the null term.
2368 *
2369 * On failure the function still populates 'addr' with the "?:0" string in case
2370 * you want to relax error checking or need to display something anyway (see
2371 * anetFdToString implementation for more info). */
genClientAddrString(client * client,char * addr,size_t addr_len,int fd_to_str_type)2372 void genClientAddrString(client *client, char *addr,
2373 size_t addr_len, int fd_to_str_type) {
2374 if (client->flags & CLIENT_UNIX_SOCKET) {
2375 /* Unix socket client. */
2376 snprintf(addr,addr_len,"%s:0",server.unixsocket);
2377 } else {
2378 /* TCP client. */
2379 connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type);
2380 }
2381 }
2382
2383 /* This function returns the client peer id, by creating and caching it
2384 * if client->peerid is NULL, otherwise returning the cached value.
2385 * The Peer ID never changes during the life of the client, however it
2386 * is expensive to compute. */
getClientPeerId(client * c)2387 char *getClientPeerId(client *c) {
2388 char peerid[NET_ADDR_STR_LEN];
2389
2390 if (c->peerid == NULL) {
2391 genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME);
2392 c->peerid = sdsnew(peerid);
2393 }
2394 return c->peerid;
2395 }
2396
2397 /* This function returns the client bound socket name, by creating and caching
2398 * it if client->sockname is NULL, otherwise returning the cached value.
2399 * The Socket Name never changes during the life of the client, however it
2400 * is expensive to compute. */
getClientSockname(client * c)2401 char *getClientSockname(client *c) {
2402 char sockname[NET_ADDR_STR_LEN];
2403
2404 if (c->sockname == NULL) {
2405 genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME);
2406 c->sockname = sdsnew(sockname);
2407 }
2408 return c->sockname;
2409 }
2410
2411 /* Concatenate a string representing the state of a client in a human
2412 * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)2413 sds catClientInfoString(sds s, client *client) {
2414 char flags[16], events[3], conninfo[CONN_INFO_LEN], *p;
2415
2416 p = flags;
2417 if (client->flags & CLIENT_SLAVE) {
2418 if (client->flags & CLIENT_MONITOR)
2419 *p++ = 'O';
2420 else
2421 *p++ = 'S';
2422 }
2423 if (client->flags & CLIENT_MASTER) *p++ = 'M';
2424 if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
2425 if (client->flags & CLIENT_MULTI) *p++ = 'x';
2426 if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
2427 if (client->flags & CLIENT_TRACKING) *p++ = 't';
2428 if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
2429 if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
2430 if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
2431 if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
2432 if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
2433 if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
2434 if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
2435 if (client->flags & CLIENT_READONLY) *p++ = 'r';
2436 if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
2437 if (p == flags) *p++ = 'N';
2438 *p++ = '\0';
2439
2440 p = events;
2441 if (client->conn) {
2442 if (connHasReadHandler(client->conn)) *p++ = 'r';
2443 if (connHasWriteHandler(client->conn)) *p++ = 'w';
2444 }
2445 *p = '\0';
2446
2447 /* Compute the total memory consumed by this client. */
2448 size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
2449
2450 size_t used_blocks_of_repl_buf = 0;
2451 if (client->ref_repl_buf_node) {
2452 replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
2453 replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
2454 used_blocks_of_repl_buf = last->id - cur->id + 1;
2455 }
2456
2457 sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL;
2458 sds ret = sdscatfmt(s,
2459 "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
2460 (unsigned long long) client->id,
2461 getClientPeerId(client),
2462 getClientSockname(client),
2463 connGetInfo(client->conn, conninfo, sizeof(conninfo)),
2464 client->name ? (char*)client->name->ptr : "",
2465 (long long)(server.unixtime - client->ctime),
2466 (long long)(server.unixtime - client->lastinteraction),
2467 flags,
2468 client->db->id,
2469 (int) dictSize(client->pubsub_channels),
2470 (int) listLength(client->pubsub_patterns),
2471 (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
2472 (unsigned long long) sdslen(client->querybuf),
2473 (unsigned long long) sdsavail(client->querybuf),
2474 (unsigned long long) client->argv_len_sum,
2475 (unsigned long long) client->mstate.argv_len_sums,
2476 (unsigned long long) client->bufpos,
2477 (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
2478 (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
2479 (unsigned long long) total_mem,
2480 events,
2481 cmdname ? cmdname : "NULL",
2482 client->user ? client->user->name : "(superuser)",
2483 (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
2484 client->resp);
2485 if (cmdname)
2486 sdsfree(cmdname);
2487 return ret;
2488 }
2489
getAllClientsInfoString(int type)2490 sds getAllClientsInfoString(int type) {
2491 listNode *ln;
2492 listIter li;
2493 client *client;
2494 sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients));
2495 sdsclear(o);
2496 listRewind(server.clients,&li);
2497 while ((ln = listNext(&li)) != NULL) {
2498 client = listNodeValue(ln);
2499 if (type != -1 && getClientType(client) != type) continue;
2500 o = catClientInfoString(o,client);
2501 o = sdscatlen(o,"\n",1);
2502 }
2503 return o;
2504 }
2505
2506 /* This function implements CLIENT SETNAME, including replying to the
2507 * user with an error if the charset is wrong (in that case C_ERR is
2508 * returned). If the function succeeded C_OK is returned, and it's up
2509 * to the caller to send a reply if needed.
2510 *
2511 * Setting an empty string as name has the effect of unsetting the
2512 * currently set name: the client will remain unnamed.
2513 *
2514 * This function is also used to implement the HELLO SETNAME option. */
clientSetNameOrReply(client * c,robj * name)2515 int clientSetNameOrReply(client *c, robj *name) {
2516 int len = sdslen(name->ptr);
2517 char *p = name->ptr;
2518
2519 /* Setting the client name to an empty string actually removes
2520 * the current name. */
2521 if (len == 0) {
2522 if (c->name) decrRefCount(c->name);
2523 c->name = NULL;
2524 return C_OK;
2525 }
2526
2527 /* Otherwise check if the charset is ok. We need to do this otherwise
2528 * CLIENT LIST format will break. You should always be able to
2529 * split by space to get the different fields. */
2530 for (int j = 0; j < len; j++) {
2531 if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
2532 addReplyError(c,
2533 "Client names cannot contain spaces, "
2534 "newlines or special characters.");
2535 return C_ERR;
2536 }
2537 }
2538 if (c->name) decrRefCount(c->name);
2539 c->name = name;
2540 incrRefCount(name);
2541 return C_OK;
2542 }
2543
2544 /* Reset the client state to resemble a newly connected client.
2545 */
resetCommand(client * c)2546 void resetCommand(client *c) {
2547 listNode *ln;
2548
2549 /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
2550 * distinguish between the two.
2551 */
2552 if (c->flags & CLIENT_MONITOR) {
2553 ln = listSearchKey(server.monitors,c);
2554 serverAssert(ln != NULL);
2555 listDelNode(server.monitors,ln);
2556
2557 c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
2558 }
2559
2560 if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
2561 addReplyError(c,"can only reset normal client connections");
2562 return;
2563 }
2564
2565 if (c->flags & CLIENT_TRACKING) disableTracking(c);
2566 selectDb(c,0);
2567 c->resp = 2;
2568
2569 clientSetDefaultAuth(c);
2570 moduleNotifyUserChanged(c);
2571 discardTransaction(c);
2572
2573 pubsubUnsubscribeAllChannels(c,0);
2574 pubsubUnsubscribeAllPatterns(c,0);
2575
2576 if (c->name) {
2577 decrRefCount(c->name);
2578 c->name = NULL;
2579 }
2580
2581 /* Selectively clear state flags not covered above */
2582 c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
2583 CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
2584
2585 addReplyStatus(c,"RESET");
2586 }
2587
/* QUIT: disconnect the current client.
 *
 * The OK reply is queued first and only then the client is flagged with
 * CLIENT_CLOSE_AFTER_REPLY, the same ordering used by CLIENT KILL in this
 * file when a client kills itself. */
void quitCommand(client *c) {
    addReply(c,shared.ok);
    c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
2593
/* Implementation of the CLIENT command.
 *
 * The subcommand name is read from c->argv[1] (compared case-insensitively)
 * and, together with c->argc, selects one of the branches below: HELP, ID,
 * INFO, LIST, REPLY, NO-EVICT, KILL, UNBLOCK, SETNAME, GETNAME, UNPAUSE,
 * PAUSE, TRACKING, CACHING, GETREDIR and TRACKINGINFO. Any subcommand /
 * argument count combination not matched by a branch is answered with
 * addReplySubcommandSyntaxError(). */
void clientCommand(client *c) {
    listNode *ln;
    listIter li;

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"CACHING (YES|NO)",
" Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.",
"GETREDIR",
" Return the client ID we are redirecting to when tracking is enabled.",
"GETNAME",
" Return the name of the current connection.",
"ID",
" Return the ID of the current connection.",
"INFO",
" Return information about the current client connection.",
"KILL <ip:port>",
" Kill connection made from <ip:port>.",
"KILL <option> <value> [<option> <value> [...]]",
" Kill connections. Options are:",
" * ADDR (<ip:port>|<unixsocket>:0)",
" Kill connections made from the specified address",
" * LADDR (<ip:port>|<unixsocket>:0)",
" Kill connections made to specified local address",
" * TYPE (normal|master|replica|pubsub)",
" Kill connections by type.",
" * USER <username>",
" Kill connections authenticated by <username>.",
" * SKIPME (YES|NO)",
" Skip killing current connection (default: yes).",
"LIST [options ...]",
" Return information about client connections. Options:",
" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
" Return clients of specified type.",
"UNPAUSE",
" Stop the current client pause, resuming traffic.",
"PAUSE <timeout> [WRITE|ALL]",
" Suspend all, or just write, clients for <timeout> milliseconds.",
"REPLY (ON|OFF|SKIP)",
" Control the replies sent to the current connection.",
"SETNAME <name>",
" Assign the name <name> to the current connection.",
/* NOTE(review): GETNAME is already listed above; this second entry looks
 * redundant. */
"GETNAME",
" Get the name of the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR]",
" Unblock the specified blocked client.",
"TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]",
" [OPTIN] [OPTOUT] [NOLOOP]",
" Control server assisted client side caching.",
"TRACKINGINFO",
" Report tracking status for the current connection.",
"NO-EVICT (ON|OFF)",
" Protect current client connection from eviction.",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) {
        /* CLIENT ID */
        addReplyLongLong(c,c->id);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLIENT INFO: same line format as CLIENT LIST, for this client
         * only. */
        sds o = catClientInfoString(sdsempty(), c);
        o = sdscatlen(o,"\n",1);
        addReplyVerbatim(c,o,sdslen(o),"txt");
        sdsfree(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"list")) {
        /* CLIENT LIST
         * CLIENT LIST TYPE <type>
         * CLIENT LIST ID <id> [<id> ...] */
        int type = -1; /* -1 means: do not filter by type. */
        sds o = NULL;  /* Stays NULL unless the ID form builds the output. */
        if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) {
            type = getClientTypeByName(c->argv[3]->ptr);
            if (type == -1) {
                addReplyErrorFormat(c,"Unknown client type '%s'",
                    (char*) c->argv[3]->ptr);
                return;
            }
        } else if (c->argc > 3 && !strcasecmp(c->argv[2]->ptr,"id")) {
            int j;
            o = sdsempty();
            for (j = 3; j < c->argc; j++) {
                long long cid;
                if (getLongLongFromObjectOrReply(c, c->argv[j], &cid,
                            "Invalid client ID")) {
                    /* The error reply was already sent: just release the
                     * partial output. */
                    sdsfree(o);
                    return;
                }
                /* IDs that do not match any client are silently skipped. */
                client *cl = lookupClientByID(cid);
                if (cl) {
                    o = catClientInfoString(o, cl);
                    o = sdscatlen(o, "\n", 1);
                }
            }
        } else if (c->argc != 2) {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }

        /* Plain and TYPE forms: list all the clients (filtered by type). */
        if (!o)
            o = getAllClientsInfoString(type);
        addReplyVerbatim(c,o,sdslen(o),"txt");
        sdsfree(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
        /* CLIENT REPLY ON|OFF|SKIP
         * Only the ON variant queues a reply from this branch. */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            c->flags |= CLIENT_REPLY_OFF;
        } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
            /* SKIP changes nothing when replies are already off. */
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
        /* CLIENT NO-EVICT ON|OFF */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            c->flags |= CLIENT_NO_EVICT;
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            c->flags &= ~CLIENT_NO_EVICT;
            addReply(c,shared.ok);
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        /* Filters: a value left at its initial state (NULL, -1, 0) means
         * the corresponding filter is not applied. */
        char *addr = NULL;
        char *laddr = NULL;
        user *user = NULL;
        int type = -1;
        uint64_t id = 0;
        int skipme = 1;
        int killed = 0, close_this_client = 0;

        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr> */
            addr = c->argv[2]->ptr;
            skipme = 0; /* With the old form, you can kill yourself. */
        } else if (c->argc > 3) {
            int i = 2; /* Next option index. */

            /* New style syntax: parse options. Every option consumes
             * exactly two arguments: its name and its value. */
            while(i < c->argc) {
                int moreargs = c->argc > i+1;

                if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
                    long tmp;

                    if (getRangeLongFromObjectOrReply(c, c->argv[i+1], 1, LONG_MAX, &tmp,
                                                      "client-id should be greater than 0") != C_OK)
                        return;
                    id = tmp;
                } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
                    type = getClientTypeByName(c->argv[i+1]->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
                    addr = c->argv[i+1]->ptr;
                } else if (!strcasecmp(c->argv[i]->ptr,"laddr") && moreargs) {
                    laddr = c->argv[i+1]->ptr;
                } else if (!strcasecmp(c->argv[i]->ptr,"user") && moreargs) {
                    user = ACLGetUserByName(c->argv[i+1]->ptr,
                                            sdslen(c->argv[i+1]->ptr));
                    if (user == NULL) {
                        addReplyErrorFormat(c,"No such user '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
                    if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
                        skipme = 1;
                    } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
                        skipme = 0;
                    } else {
                        addReplyErrorObject(c,shared.syntaxerr);
                        return;
                    }
                } else {
                    /* Unknown option, or option without its value. */
                    addReplyErrorObject(c,shared.syntaxerr);
                    return;
                }
                i += 2;
            }
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }

        /* Iterate clients killing all the matching clients. A client must
         * pass every active filter to be killed. */
        listRewind(server.clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *client = listNodeValue(ln);
            if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
            if (laddr && strcmp(getClientSockname(client),laddr) != 0) continue;
            if (type != -1 && getClientType(client) != type) continue;
            if (id != 0 && client->id != id) continue;
            if (user && client->user != user) continue;
            if (c == client && skipme) continue;

            /* Kill it. The calling client is not freed here: it is only
             * remembered, so that the reply can still be delivered. */
            if (c == client) {
                close_this_client = 1;
            } else {
                freeClient(client);
            }
            killed++;
        }

        /* Reply according to old/new format: OK / error for the old form,
         * number of killed clients for the new one. */
        if (c->argc == 3) {
            if (killed == 0)
                addReplyError(c,"No such client");
            else
                addReply(c,shared.ok);
        } else {
            addReplyLongLong(c,killed);
        }

        /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
         * only after we queued the reply to its output buffers. */
        if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 ||
                                                          c->argc == 4))
    {
        /* CLIENT UNBLOCK <id> [timeout|error] */
        long long id;
        int unblock_error = 0; /* Default reason is TIMEOUT. */

        if (c->argc == 4) {
            if (!strcasecmp(c->argv[3]->ptr,"timeout")) {
                unblock_error = 0;
            } else if (!strcasecmp(c->argv[3]->ptr,"error")) {
                unblock_error = 1;
            } else {
                addReplyError(c,
                    "CLIENT UNBLOCK reason should be TIMEOUT or ERROR");
                return;
            }
        }
        if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
            != C_OK) return;
        struct client *target = lookupClientByID(id);
        /* The target must exist, be blocked, and
         * moduleBlockedClientMayTimeout() must allow it: otherwise
         * shared.czero is replied and nothing is unblocked. */
        if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
            if (unblock_error)
                addReplyError(target,
                    "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
            else
                replyToBlockedClientTimedOut(target);
            unblockClient(target);
            addReply(c,shared.cone);
        } else {
            addReply(c,shared.czero);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
        /* CLIENT SETNAME: on failure clientSetNameOrReply() already sent
         * the error, so we only reply here on success. */
        if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
            addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
        /* CLIENT GETNAME: null reply when the client has no name. */
        if (c->name)
            addReplyBulk(c,c->name);
        else
            addReplyNull(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
        /* CLIENT UNPAUSE */
        unpauseClients();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
                                                        c->argc == 4))
    {
        /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
        mstime_t end;
        int type = CLIENT_PAUSE_ALL; /* Mode used when none is given. */
        if (c->argc == 4) {
            if (!strcasecmp(c->argv[3]->ptr,"write")) {
                type = CLIENT_PAUSE_WRITE;
            } else if (!strcasecmp(c->argv[3]->ptr,"all")) {
                type = CLIENT_PAUSE_ALL;
            } else {
                addReplyError(c,
                    "CLIENT PAUSE mode must be WRITE or ALL");
                return;
            }
        }

        /* The timeout argument is parsed as milliseconds into 'end'. */
        if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
            UNIT_MILLISECONDS) != C_OK) return;
        pauseClients(end, type);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
        /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
         *                          [PREFIX second] [OPTIN] [OPTOUT] [NOLOOP]... */
        long long redir = 0;
        uint64_t options = 0;
        /* 'prefix' is a heap allocated array of pointers borrowed from
         * c->argv (no reference is taken here): it must be released with
         * zfree() on every exit path of this branch. */
        robj **prefix = NULL;
        size_t numprefix = 0;

        /* Parse the options. */
        for (int j = 3; j < c->argc; j++) {
            int moreargs = (c->argc-1) - j; /* Arguments left after argv[j]. */

            if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
                j++; /* Move to the option value. */
                if (redir != 0) {
                    addReplyError(c,"A client can only redirect to a single "
                                    "other client");
                    zfree(prefix);
                    return;
                }

                if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
                    C_OK)
                {
                    zfree(prefix);
                    return;
                }
                /* We will require the client with the specified ID to exist
                 * right now, even if it is possible that it gets disconnected
                 * later. Still a valid sanity check. */
                if (lookupClientByID(redir) == NULL) {
                    addReplyError(c,"The client ID you want redirect to "
                                    "does not exist");
                    zfree(prefix);
                    return;
                }
            } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
                options |= CLIENT_TRACKING_BCAST;
            } else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
                options |= CLIENT_TRACKING_OPTIN;
            } else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
                options |= CLIENT_TRACKING_OPTOUT;
            } else if (!strcasecmp(c->argv[j]->ptr,"noloop")) {
                options |= CLIENT_TRACKING_NOLOOP;
            } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
                j++; /* Move to the option value. */
                prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
                prefix[numprefix++] = c->argv[j];
            } else {
                zfree(prefix);
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
        }

        /* Options are ok: enable or disable the tracking for this client. */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            /* Before enabling tracking, make sure options are compatible
             * among each other and with the current state of the client. */
            if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
                addReplyError(c,
                    "PREFIX option requires BCAST mode to be enabled");
                zfree(prefix);
                return;
            }

            /* Tracking already enabled: the BCAST mode can't be changed. */
            if (c->flags & CLIENT_TRACKING) {
                int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
                int newbcast = !!(options & CLIENT_TRACKING_BCAST);
                if (oldbcast != newbcast) {
                    addReplyError(c,
                    "You can't switch BCAST mode on/off before disabling "
                    "tracking for this client, and then re-enabling it with "
                    "a different mode.");
                    zfree(prefix);
                    return;
                }
            }

            if (options & CLIENT_TRACKING_BCAST &&
                options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
            {
                addReplyError(c,
                "OPTIN and OPTOUT are not compatible with BCAST");
                zfree(prefix);
                return;
            }

            if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT)
            {
                addReplyError(c,
                "You can't specify both OPTIN mode and OPTOUT mode");
                zfree(prefix);
                return;
            }

            /* The requested OPTIN/OPTOUT mode must not conflict with the
             * one currently set in the client flags. */
            if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) ||
                (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN))
            {
                addReplyError(c,
                "You can't switch OPTIN/OPTOUT mode before disabling "
                "tracking for this client, and then re-enabling it with "
                "a different mode.");
                zfree(prefix);
                return;
            }

            /* On collision checkPrefixCollisionsOrReply() is expected to
             * have replied already (see its name): just clean up. */
            if (options & CLIENT_TRACKING_BCAST) {
                if (!checkPrefixCollisionsOrReply(c,prefix,numprefix)) {
                    zfree(prefix);
                    return;
                }
            }

            enableTracking(c,redir,options,prefix,numprefix);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            disableTracking(c);
        } else {
            zfree(prefix);
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
        zfree(prefix);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
        /* CLIENT CACHING (YES|NO)
         * Arguments after the third one are not inspected. */
        if (!(c->flags & CLIENT_TRACKING)) {
            addReplyError(c,"CLIENT CACHING can be called only when the "
                            "client is in tracking mode with OPTIN or "
                            "OPTOUT mode enabled");
            return;
        }

        /* YES (valid in OPTIN mode) and NO (valid in OPTOUT mode) both set
         * the same CLIENT_TRACKING_CACHING flag. */
        char *opt = c->argv[2]->ptr;
        if (!strcasecmp(opt,"yes")) {
            if (c->flags & CLIENT_TRACKING_OPTIN) {
                c->flags |= CLIENT_TRACKING_CACHING;
            } else {
                addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
                return;
            }
        } else if (!strcasecmp(opt,"no")) {
            if (c->flags & CLIENT_TRACKING_OPTOUT) {
                c->flags |= CLIENT_TRACKING_CACHING;
            } else {
                addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
                return;
            }
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }

        /* Common reply for when we succeeded. */
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
        /* CLIENT GETREDIR: -1 when tracking is not enabled. */
        if (c->flags & CLIENT_TRACKING) {
            addReplyLongLong(c,c->client_tracking_redirection);
        } else {
            addReplyLongLong(c,-1);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"trackinginfo") && c->argc == 2) {
        /* CLIENT TRACKINGINFO: a map with three entries, "flags",
         * "redirect" and "prefixes". */
        addReplyMapLen(c,3);

        /* Flags: the number of elements is only known at the end, so a
         * deferred length is used and set once all flags are emitted. */
        addReplyBulkCString(c,"flags");
        void *arraylen_ptr = addReplyDeferredLen(c);
        int numflags = 0;
        addReplyBulkCString(c,c->flags & CLIENT_TRACKING ? "on" : "off");
        numflags++;
        if (c->flags & CLIENT_TRACKING_BCAST) {
            addReplyBulkCString(c,"bcast");
            numflags++;
        }
        if (c->flags & CLIENT_TRACKING_OPTIN) {
            addReplyBulkCString(c,"optin");
            numflags++;
            if (c->flags & CLIENT_TRACKING_CACHING) {
                addReplyBulkCString(c,"caching-yes");
                numflags++;
            }
        }
        if (c->flags & CLIENT_TRACKING_OPTOUT) {
            addReplyBulkCString(c,"optout");
            numflags++;
            if (c->flags & CLIENT_TRACKING_CACHING) {
                addReplyBulkCString(c,"caching-no");
                numflags++;
            }
        }
        if (c->flags & CLIENT_TRACKING_NOLOOP) {
            addReplyBulkCString(c,"noloop");
            numflags++;
        }
        if (c->flags & CLIENT_TRACKING_BROKEN_REDIR) {
            addReplyBulkCString(c,"broken_redirect");
            numflags++;
        }
        setDeferredSetLen(c,arraylen_ptr,numflags);

        /* Redirect: -1 when tracking is not enabled. */
        addReplyBulkCString(c,"redirect");
        if (c->flags & CLIENT_TRACKING) {
            addReplyLongLong(c,c->client_tracking_redirection);
        } else {
            addReplyLongLong(c,-1);
        }

        /* Prefixes: every key stored in the client prefixes radix tree, or
         * an empty array when the client has none. */
        addReplyBulkCString(c,"prefixes");
        if (c->client_tracking_prefixes) {
            addReplyArrayLen(c,raxSize(c->client_tracking_prefixes));
            raxIterator ri;
            raxStart(&ri,c->client_tracking_prefixes);
            raxSeek(&ri,"^",NULL,0);
            while(raxNext(&ri)) {
                addReplyBulkCBuffer(c,ri.key,ri.key_len);
            }
            raxStop(&ri);
        } else {
            addReplyArrayLen(c,0);
        }
    } else {
        addReplySubcommandSyntaxError(c);
    }
}
3116
3117 /* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */
helloCommand(client * c)3118 void helloCommand(client *c) {
3119 long long ver = 0;
3120 int next_arg = 1;
3121
3122 if (c->argc >= 2) {
3123 if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver,
3124 "Protocol version is not an integer or out of range") != C_OK) {
3125 return;
3126 }
3127
3128 if (ver < 2 || ver > 3) {
3129 addReplyError(c,"-NOPROTO unsupported protocol version");
3130 return;
3131 }
3132 }
3133
3134 for (int j = next_arg; j < c->argc; j++) {
3135 int moreargs = (c->argc-1) - j;
3136 const char *opt = c->argv[j]->ptr;
3137 if (!strcasecmp(opt,"AUTH") && moreargs >= 2) {
3138 redactClientCommandArgument(c, j+1);
3139 redactClientCommandArgument(c, j+2);
3140 if (ACLAuthenticateUser(c, c->argv[j+1], c->argv[j+2]) == C_ERR) {
3141 addReplyError(c,"-WRONGPASS invalid username-password pair or user is disabled.");
3142 return;
3143 }
3144 j += 2;
3145 } else if (!strcasecmp(opt,"SETNAME") && moreargs) {
3146 if (clientSetNameOrReply(c, c->argv[j+1]) == C_ERR) return;
3147 j++;
3148 } else {
3149 addReplyErrorFormat(c,"Syntax error in HELLO option '%s'",opt);
3150 return;
3151 }
3152 }
3153
3154 /* At this point we need to be authenticated to continue. */
3155 if (!c->authenticated) {
3156 addReplyError(c,"-NOAUTH HELLO must be called with the client already "
3157 "authenticated, otherwise the HELLO AUTH <user> <pass> "
3158 "option can be used to authenticate the client and "
3159 "select the RESP protocol version at the same time");
3160 return;
3161 }
3162
3163 /* Let's switch to the specified RESP mode. */
3164 if (ver) c->resp = ver;
3165 addReplyMapLen(c,6 + !server.sentinel_mode);
3166
3167 addReplyBulkCString(c,"server");
3168 addReplyBulkCString(c,"redis");
3169
3170 addReplyBulkCString(c,"version");
3171 addReplyBulkCString(c,REDIS_VERSION);
3172
3173 addReplyBulkCString(c,"proto");
3174 addReplyLongLong(c,c->resp);
3175
3176 addReplyBulkCString(c,"id");
3177 addReplyLongLong(c,c->id);
3178
3179 addReplyBulkCString(c,"mode");
3180 if (server.sentinel_mode) addReplyBulkCString(c,"sentinel");
3181 else if (server.cluster_enabled) addReplyBulkCString(c,"cluster");
3182 else addReplyBulkCString(c,"standalone");
3183
3184 if (!server.sentinel_mode) {
3185 addReplyBulkCString(c,"role");
3186 addReplyBulkCString(c,server.masterhost ? "replica" : "master");
3187 }
3188
3189 addReplyBulkCString(c,"modules");
3190 addReplyLoadedModules(c);
3191 }
3192
3193 /* This callback is bound to POST and "Host:" command names. Those are not
3194 * really commands, but are used in security attacks in order to talk to
3195 * Redis instances via HTTP, with a technique called "cross protocol scripting"
3196 * which exploits the fact that services like Redis will discard invalid
3197 * HTTP headers and will process what follows.
3198 *
3199 * As a protection against this attack, Redis will terminate the connection
3200 * when a POST or "Host:" header is seen, and will log the event from
3201 * time to time (to avoid creating a DOS as a result of too many logs). */
securityWarningCommand(client * c)3202 void securityWarningCommand(client *c) {
3203 static time_t logged_time = 0;
3204 time_t now = time(NULL);
3205
3206 if (llabs(now-logged_time) > 60) {
3207 serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted.");
3208 logged_time = now;
3209 }
3210 freeClientAsync(c);
3211 }
3212
3213 /* Keep track of the original command arguments so that we can generate
3214 * an accurate slowlog entry after the command has been executed. */
retainOriginalCommandVector(client * c)3215 static void retainOriginalCommandVector(client *c) {
3216 /* We already rewrote this command, so don't rewrite it again */
3217 if (c->original_argv) return;
3218 c->original_argc = c->argc;
3219 c->original_argv = zmalloc(sizeof(robj*)*(c->argc));
3220 for (int j = 0; j < c->argc; j++) {
3221 c->original_argv[j] = c->argv[j];
3222 incrRefCount(c->argv[j]);
3223 }
3224 }
3225
3226 /* Redact a given argument to prevent it from being shown
3227 * in the slowlog. This information is stored in the
3228 * original_argv array. */
redactClientCommandArgument(client * c,int argc)3229 void redactClientCommandArgument(client *c, int argc) {
3230 retainOriginalCommandVector(c);
3231 decrRefCount(c->argv[argc]);
3232 c->original_argv[argc] = shared.redacted;
3233 }
3234
3235 /* Rewrite the command vector of the client. All the new objects ref count
3236 * is incremented. The old command vector is freed, and the old objects
3237 * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)3238 void rewriteClientCommandVector(client *c, int argc, ...) {
3239 va_list ap;
3240 int j;
3241 robj **argv; /* The new argument vector */
3242
3243 argv = zmalloc(sizeof(robj*)*argc);
3244 va_start(ap,argc);
3245 for (j = 0; j < argc; j++) {
3246 robj *a;
3247
3248 a = va_arg(ap, robj*);
3249 argv[j] = a;
3250 incrRefCount(a);
3251 }
3252 replaceClientCommandVector(c, argc, argv);
3253 va_end(ap);
3254 }
3255
3256 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)3257 void replaceClientCommandVector(client *c, int argc, robj **argv) {
3258 int j;
3259 retainOriginalCommandVector(c);
3260 freeClientArgv(c);
3261 zfree(c->argv);
3262 c->argv = argv;
3263 c->argc = argc;
3264 c->argv_len_sum = 0;
3265 for (j = 0; j < c->argc; j++)
3266 if (c->argv[j])
3267 c->argv_len_sum += getStringObjectLen(c->argv[j]);
3268 c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3269 serverAssertWithInfo(c,NULL,c->cmd != NULL);
3270 }
3271
3272 /* Rewrite a single item in the command vector.
3273 * The new val ref count is incremented, and the old decremented.
3274 *
3275 * It is possible to specify an argument over the current size of the
3276 * argument vector: in this case the array of objects gets reallocated
3277 * and c->argc set to the max value. However it's up to the caller to
3278 *
3279 * 1. Make sure there are no "holes" and all the arguments are set.
3280 * 2. If the original argument vector was longer than the one we
3281 * want to end with, it's up to the caller to set c->argc and
3282 * free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)3283 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
3284 robj *oldval;
3285 retainOriginalCommandVector(c);
3286
3287 /* We need to handle both extending beyond argc (just update it and
3288 * initialize the new element) or beyond argv_len (realloc is needed).
3289 */
3290 if (i >= c->argc) {
3291 if (i >= c->argv_len) {
3292 c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
3293 c->argv_len = i+1;
3294 }
3295 c->argc = i+1;
3296 c->argv[i] = NULL;
3297 }
3298 oldval = c->argv[i];
3299 if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
3300 if (newval) c->argv_len_sum += getStringObjectLen(newval);
3301 c->argv[i] = newval;
3302 incrRefCount(newval);
3303 if (oldval) decrRefCount(oldval);
3304
3305 /* If this is the command name make sure to fix c->cmd. */
3306 if (i == 0) {
3307 c->cmd = lookupCommandOrOriginal(c->argv,c->argc);
3308 serverAssertWithInfo(c,NULL,c->cmd != NULL);
3309 }
3310 }
3311
3312 /* This function returns the number of bytes that Redis is
3313 * using to store the reply still not read by the client.
3314 *
3315 * Note: this function is very fast so can be called as many time as
3316 * the caller wishes. The main usage of this function currently is
3317 * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)3318 size_t getClientOutputBufferMemoryUsage(client *c) {
3319 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
3320 size_t repl_buf_size = 0;
3321 size_t repl_node_num = 0;
3322 size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
3323 if (c->ref_repl_buf_node) {
3324 replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
3325 replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
3326 repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
3327 repl_node_num = last->id - cur->id + 1;
3328 }
3329 return repl_buf_size + (repl_node_size*repl_node_num);
3330 } else {
3331 size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
3332 return c->reply_bytes + (list_item_size*listLength(c->reply));
3333 }
3334 }
3335
3336 /* Returns the total client's memory usage.
3337 * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
3338 * the client output buffer memory usage portion of the total. */
getClientMemoryUsage(client * c,size_t * output_buffer_mem_usage)3339 size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
3340 size_t mem = getClientOutputBufferMemoryUsage(c);
3341 if (output_buffer_mem_usage != NULL)
3342 *output_buffer_mem_usage = mem;
3343 mem += sdsZmallocSize(c->querybuf);
3344 mem += zmalloc_size(c);
3345 /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
3346 * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
3347 * spot problematic clients. */
3348 mem += c->argv_len_sum + sizeof(robj*)*c->argc;
3349 mem += multiStateMemOverhead(c);
3350
3351 /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
3352 * to the strings themselves because they aren't stored per client. */
3353 mem += listLength(c->pubsub_patterns) * sizeof(listNode);
3354 mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
3355 dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
3356
3357 /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
3358 if (c->client_tracking_prefixes)
3359 mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
3360
3361 return mem;
3362 }
3363
3364 /* Get the class of a client, used in order to enforce limits to different
3365 * classes of clients.
3366 *
3367 * The function will return one of the following:
3368 * CLIENT_TYPE_NORMAL -> Normal client
3369 * CLIENT_TYPE_SLAVE -> Slave
3370 * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
3371 * CLIENT_TYPE_MASTER -> The client representing our replication master.
3372 */
getClientType(client * c)3373 int getClientType(client *c) {
3374 if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
3375 /* Even though MONITOR clients are marked as replicas, we
3376 * want the expose them as normal clients. */
3377 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
3378 return CLIENT_TYPE_SLAVE;
3379 if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
3380 return CLIENT_TYPE_NORMAL;
3381 }
3382
getClientTypeByName(char * name)3383 int getClientTypeByName(char *name) {
3384 if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
3385 else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
3386 else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
3387 else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
3388 else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
3389 else return -1;
3390 }
3391
getClientTypeName(int class)3392 char *getClientTypeName(int class) {
3393 switch(class) {
3394 case CLIENT_TYPE_NORMAL: return "normal";
3395 case CLIENT_TYPE_SLAVE: return "slave";
3396 case CLIENT_TYPE_PUBSUB: return "pubsub";
3397 case CLIENT_TYPE_MASTER: return "master";
3398 default: return NULL;
3399 }
3400 }
3401
3402 /* The function checks if the client reached output buffer soft or hard
3403 * limit, and also update the state needed to check the soft limit as
3404 * a side effect.
3405 *
3406 * Return value: non-zero if the client reached the soft or the hard limit.
3407 * Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)3408 int checkClientOutputBufferLimits(client *c) {
3409 int soft = 0, hard = 0, class;
3410 unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
3411
3412 class = getClientType(c);
3413 /* For the purpose of output buffer limiting, masters are handled
3414 * like normal clients. */
3415 if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
3416
3417 /* Note that it doesn't make sense to set the replica clients output buffer
3418 * limit lower than the repl-backlog-size config (partial sync will succeed
3419 * and then replica will get disconnected).
3420 * Such a configuration is ignored (the size of repl-backlog-size will be used).
3421 * This doesn't have memory consumption implications since the replica client
3422 * will share the backlog buffers memory. */
3423 size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
3424 if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
3425 (long long)hard_limit_bytes < server.repl_backlog_size)
3426 hard_limit_bytes = server.repl_backlog_size;
3427 if (server.client_obuf_limits[class].hard_limit_bytes &&
3428 used_mem >= hard_limit_bytes)
3429 hard = 1;
3430 if (server.client_obuf_limits[class].soft_limit_bytes &&
3431 used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
3432 soft = 1;
3433
3434 /* We need to check if the soft limit is reached continuously for the
3435 * specified amount of seconds. */
3436 if (soft) {
3437 if (c->obuf_soft_limit_reached_time == 0) {
3438 c->obuf_soft_limit_reached_time = server.unixtime;
3439 soft = 0; /* First time we see the soft limit reached */
3440 } else {
3441 time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
3442
3443 if (elapsed <=
3444 server.client_obuf_limits[class].soft_limit_seconds) {
3445 soft = 0; /* The client still did not reached the max number of
3446 seconds for the soft limit to be considered
3447 reached. */
3448 }
3449 }
3450 } else {
3451 c->obuf_soft_limit_reached_time = 0;
3452 }
3453 return soft || hard;
3454 }
3455
3456 /* Asynchronously close a client if soft or hard limit is reached on the
3457 * output buffer size. The caller can check if the client will be closed
3458 * checking if the client CLIENT_CLOSE_ASAP flag is set.
3459 *
3460 * Note: we need to close the client asynchronously because this function is
3461 * called from contexts where the client can't be freed safely, i.e. from the
3462 * lower level functions pushing data inside the client output buffers.
3463 * When `async` is set to 0, we close the client immediately, this is
3464 * useful when called from cron.
3465 *
3466 * Returns 1 if client was (flagged) closed. */
closeClientOnOutputBufferLimitReached(client * c,int async)3467 int closeClientOnOutputBufferLimitReached(client *c, int async) {
3468 if (!c->conn) return 0; /* It is unsafe to free fake clients. */
3469 serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
3470 /* Note that c->reply_bytes is irrelevant for replica clients
3471 * (they use the global repl buffers). */
3472 if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
3473 c->flags & CLIENT_CLOSE_ASAP) return 0;
3474 if (checkClientOutputBufferLimits(c)) {
3475 sds client = catClientInfoString(sdsempty(),c);
3476
3477 if (async) {
3478 freeClientAsync(c);
3479 serverLog(LL_WARNING,
3480 "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
3481 client);
3482 } else {
3483 freeClient(c);
3484 serverLog(LL_WARNING,
3485 "Client %s closed for overcoming of output buffer limits.",
3486 client);
3487 }
3488 sdsfree(client);
3489 return 1;
3490 }
3491 return 0;
3492 }
3493
3494 /* Helper function used by performEvictions() in order to flush slaves
3495 * output buffers without returning control to the event loop.
3496 * This is also called by SHUTDOWN for a best-effort attempt to send
3497 * slaves the latest writes. */
flushSlavesOutputBuffers(void)3498 void flushSlavesOutputBuffers(void) {
3499 listIter li;
3500 listNode *ln;
3501
3502 listRewind(server.slaves,&li);
3503 while((ln = listNext(&li))) {
3504 client *slave = listNodeValue(ln);
3505 int can_receive_writes = connHasWriteHandler(slave->conn) ||
3506 (slave->flags & CLIENT_PENDING_WRITE);
3507
3508 /* We don't want to send the pending data to the replica in a few
3509 * cases:
3510 *
3511 * 1. For some reason there is neither the write handler installed
3512 * nor the client is flagged as to have pending writes: for some
3513 * reason this replica may not be set to receive data. This is
3514 * just for the sake of defensive programming.
3515 *
3516 * 2. The put_online_on_ack flag is true. To know why we don't want
3517 * to send data to the replica in this case, please grep for the
3518 * flag for this flag.
3519 *
3520 * 3. Obviously if the slave is not ONLINE.
3521 */
3522 if (slave->replstate == SLAVE_STATE_ONLINE &&
3523 can_receive_writes &&
3524 !slave->repl_put_online_on_ack &&
3525 clientHasPendingReplies(slave))
3526 {
3527 writeToClient(slave,0);
3528 }
3529 }
3530 }
3531
3532 /* Pause clients up to the specified unixtime (in ms) for a given type of
3533 * commands.
3534 *
3535 * A main use case of this function is to allow pausing replication traffic
3536 * so that a failover without data loss to occur. Replicas will continue to receive
3537 * traffic to facilitate this functionality.
3538 *
3539 * This function is also internally used by Redis Cluster for the manual
3540 * failover procedure implemented by CLUSTER FAILOVER.
3541 *
3542 * The function always succeed, even if there is already a pause in progress.
3543 * In such a case, the duration is set to the maximum and new end time and the
3544 * type is set to the more restrictive type of pause. */
pauseClients(mstime_t end,pause_type type)3545 void pauseClients(mstime_t end, pause_type type) {
3546 if (type > server.client_pause_type) {
3547 server.client_pause_type = type;
3548 }
3549
3550 if (end > server.client_pause_end_time) {
3551 server.client_pause_end_time = end;
3552 }
3553
3554 /* We allow write commands that were queued
3555 * up before and after to execute. We need
3556 * to track this state so that we don't assert
3557 * in propagate(). */
3558 if (server.in_exec) {
3559 server.client_pause_in_transaction = 1;
3560 }
3561 }
3562
3563 /* Unpause clients and queue them for reprocessing. */
unpauseClients(void)3564 void unpauseClients(void) {
3565 listNode *ln;
3566 listIter li;
3567 client *c;
3568
3569 server.client_pause_type = CLIENT_PAUSE_OFF;
3570 server.client_pause_end_time = 0;
3571
3572 /* Unblock all of the clients so they are reprocessed. */
3573 listRewind(server.paused_clients,&li);
3574 while ((ln = listNext(&li)) != NULL) {
3575 c = listNodeValue(ln);
3576 unblockClient(c);
3577 }
3578 }
3579
3580 /* Returns true if clients are paused and false otherwise. */
areClientsPaused(void)3581 int areClientsPaused(void) {
3582 return server.client_pause_type != CLIENT_PAUSE_OFF;
3583 }
3584
3585 /* Checks if the current client pause has elapsed and unpause clients
3586 * if it has. Also returns true if clients are now paused and false
3587 * otherwise. */
checkClientPauseTimeoutAndReturnIfPaused(void)3588 int checkClientPauseTimeoutAndReturnIfPaused(void) {
3589 if (!areClientsPaused())
3590 return 0;
3591 if (server.client_pause_end_time < server.mstime) {
3592 unpauseClients();
3593 }
3594 return areClientsPaused();
3595 }
3596
3597 /* This function is called by Redis in order to process a few events from
3598 * time to time while blocked into some not interruptible operation.
3599 * This allows to reply to clients with the -LOADING error while loading the
3600 * data set at startup or after a full resynchronization with the master
3601 * and so forth.
3602 *
3603 * It calls the event loop in order to process a few events. Specifically we
3604 * try to call the event loop 4 times as long as we receive acknowledge that
3605 * some event was processed, in order to go forward with the accept, read,
3606 * write, close sequence needed to serve a client.
3607 *
3608 * The function returns the total number of events processed. */
processEventsWhileBlocked(void)3609 void processEventsWhileBlocked(void) {
3610 int iterations = 4; /* See the function top-comment. */
3611
3612 /* Update our cached time since it is used to create and update the last
3613 * interaction time with clients and for other important things. */
3614 updateCachedTime(0);
3615
3616 /* Note: when we are processing events while blocked (for instance during
3617 * busy Lua scripts), we set a global flag. When such flag is set, we
3618 * avoid handling the read part of clients using threaded I/O.
3619 * See https://github.com/redis/redis/issues/6988 for more info. */
3620 ProcessingEventsWhileBlocked = 1;
3621 while (iterations--) {
3622 long long startval = server.events_processed_while_blocked;
3623 long long ae_events = aeProcessEvents(server.el,
3624 AE_FILE_EVENTS|AE_DONT_WAIT|
3625 AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
3626 /* Note that server.events_processed_while_blocked will also get
3627 * incremented by callbacks called by the event loop handlers. */
3628 server.events_processed_while_blocked += ae_events;
3629 long long events = server.events_processed_while_blocked - startval;
3630 if (!events) break;
3631 }
3632
3633 whileBlockedCron();
3634
3635 ProcessingEventsWhileBlocked = 0;
3636 }
3637
3638 /* ==========================================================================
3639 * Threaded I/O
3640 * ========================================================================== */
3641
3642 #define IO_THREADS_MAX_NUM 128
3643
3644 pthread_t io_threads[IO_THREADS_MAX_NUM];
3645 pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
3646 redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
3647 int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??!
3648
3649 /* This is the list of clients each thread will serve when threaded I/O is
3650 * used. We spawn io_threads_num-1 threads, since one is the main thread
3651 * itself. */
3652 list *io_threads_list[IO_THREADS_MAX_NUM];
3653
getIOPendingCount(int i)3654 static inline unsigned long getIOPendingCount(int i) {
3655 unsigned long count = 0;
3656 atomicGetWithSync(io_threads_pending[i], count);
3657 return count;
3658 }
3659
setIOPendingCount(int i,unsigned long count)3660 static inline void setIOPendingCount(int i, unsigned long count) {
3661 atomicSetWithSync(io_threads_pending[i], count);
3662 }
3663
IOThreadMain(void * myid)3664 void *IOThreadMain(void *myid) {
3665 /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
3666 * used by the thread to just manipulate a single sub-array of clients. */
3667 long id = (unsigned long)myid;
3668 char thdname[16];
3669
3670 snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
3671 redis_set_thread_title(thdname);
3672 redisSetCpuAffinity(server.server_cpulist);
3673 makeThreadKillable();
3674
3675 while(1) {
3676 /* Wait for start */
3677 for (int j = 0; j < 1000000; j++) {
3678 if (getIOPendingCount(id) != 0) break;
3679 }
3680
3681 /* Give the main thread a chance to stop this thread. */
3682 if (getIOPendingCount(id) == 0) {
3683 pthread_mutex_lock(&io_threads_mutex[id]);
3684 pthread_mutex_unlock(&io_threads_mutex[id]);
3685 continue;
3686 }
3687
3688 serverAssert(getIOPendingCount(id) != 0);
3689
3690 /* Process: note that the main thread will never touch our list
3691 * before we drop the pending count to 0. */
3692 listIter li;
3693 listNode *ln;
3694 listRewind(io_threads_list[id],&li);
3695 while((ln = listNext(&li))) {
3696 client *c = listNodeValue(ln);
3697 if (io_threads_op == IO_THREADS_OP_WRITE) {
3698 writeToClient(c,0);
3699 } else if (io_threads_op == IO_THREADS_OP_READ) {
3700 readQueryFromClient(c->conn);
3701 } else {
3702 serverPanic("io_threads_op value is unknown");
3703 }
3704 }
3705 listEmpty(io_threads_list[id]);
3706 setIOPendingCount(id, 0);
3707 }
3708 }
3709
3710 /* Initialize the data structures needed for threaded I/O. */
initThreadedIO(void)3711 void initThreadedIO(void) {
3712 server.io_threads_active = 0; /* We start with threads not active. */
3713
3714 /* Indicate that io-threads are currently idle */
3715 io_threads_op = IO_THREADS_OP_IDLE;
3716
3717 /* Don't spawn any thread if the user selected a single thread:
3718 * we'll handle I/O directly from the main thread. */
3719 if (server.io_threads_num == 1) return;
3720
3721 if (server.io_threads_num > IO_THREADS_MAX_NUM) {
3722 serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
3723 "The maximum number is %d.", IO_THREADS_MAX_NUM);
3724 exit(1);
3725 }
3726
3727 /* Spawn and initialize the I/O threads. */
3728 for (int i = 0; i < server.io_threads_num; i++) {
3729 /* Things we do for all the threads including the main thread. */
3730 io_threads_list[i] = listCreate();
3731 if (i == 0) continue; /* Thread 0 is the main thread. */
3732
3733 /* Things we do only for the additional threads. */
3734 pthread_t tid;
3735 pthread_mutex_init(&io_threads_mutex[i],NULL);
3736 setIOPendingCount(i, 0);
3737 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
3738 if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
3739 serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
3740 exit(1);
3741 }
3742 io_threads[i] = tid;
3743 }
3744 }
3745
killIOThreads(void)3746 void killIOThreads(void) {
3747 int err, j;
3748 for (j = 0; j < server.io_threads_num; j++) {
3749 if (io_threads[j] == pthread_self()) continue;
3750 if (io_threads[j] && pthread_cancel(io_threads[j]) == 0) {
3751 if ((err = pthread_join(io_threads[j],NULL)) != 0) {
3752 serverLog(LL_WARNING,
3753 "IO thread(tid:%lu) can not be joined: %s",
3754 (unsigned long)io_threads[j], strerror(err));
3755 } else {
3756 serverLog(LL_WARNING,
3757 "IO thread(tid:%lu) terminated",(unsigned long)io_threads[j]);
3758 }
3759 }
3760 }
3761 }
3762
startThreadedIO(void)3763 void startThreadedIO(void) {
3764 serverAssert(server.io_threads_active == 0);
3765 for (int j = 1; j < server.io_threads_num; j++)
3766 pthread_mutex_unlock(&io_threads_mutex[j]);
3767 server.io_threads_active = 1;
3768 }
3769
stopThreadedIO(void)3770 void stopThreadedIO(void) {
3771 /* We may have still clients with pending reads when this function
3772 * is called: handle them before stopping the threads. */
3773 handleClientsWithPendingReadsUsingThreads();
3774 serverAssert(server.io_threads_active == 1);
3775 for (int j = 1; j < server.io_threads_num; j++)
3776 pthread_mutex_lock(&io_threads_mutex[j]);
3777 server.io_threads_active = 0;
3778 }
3779
3780 /* This function checks if there are not enough pending clients to justify
3781 * taking the I/O threads active: in that case I/O threads are stopped if
3782 * currently active. We track the pending writes as a measure of clients
3783 * we need to handle in parallel, however the I/O threading is disabled
3784 * globally for reads as well if we have too little pending clients.
3785 *
3786 * The function returns 0 if the I/O threading should be used because there
3787 * are enough active threads, otherwise 1 is returned and the I/O threads
3788 * could be possibly stopped (if already active) as a side effect. */
stopThreadedIOIfNeeded(void)3789 int stopThreadedIOIfNeeded(void) {
3790 int pending = listLength(server.clients_pending_write);
3791
3792 /* Return ASAP if IO threads are disabled (single threaded mode). */
3793 if (server.io_threads_num == 1) return 1;
3794
3795 if (pending < (server.io_threads_num*2)) {
3796 if (server.io_threads_active) stopThreadedIO();
3797 return 1;
3798 } else {
3799 return 0;
3800 }
3801 }
3802
3803 /* This function achieves thread safety using a fan-out -> fan-in paradigm:
3804 * Fan out: The main thread fans out work to the io-threads which block until
3805 * setIOPendingCount() is called with a value larger than 0 by the main thread.
3806 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
3807 * it can safely perform post-processing and return to normal synchronous
3808 * work. */
handleClientsWithPendingWritesUsingThreads(void)3809 int handleClientsWithPendingWritesUsingThreads(void) {
3810 int processed = listLength(server.clients_pending_write);
3811 if (processed == 0) return 0; /* Return ASAP if there are no clients. */
3812
3813 /* If I/O threads are disabled or we have few clients to serve, don't
3814 * use I/O threads, but the boring synchronous code. */
3815 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
3816 return handleClientsWithPendingWrites();
3817 }
3818
3819 /* Start threads if needed. */
3820 if (!server.io_threads_active) startThreadedIO();
3821
3822 /* Distribute the clients across N different lists. */
3823 listIter li;
3824 listNode *ln;
3825 listRewind(server.clients_pending_write,&li);
3826 int item_id = 0;
3827 while((ln = listNext(&li))) {
3828 client *c = listNodeValue(ln);
3829 c->flags &= ~CLIENT_PENDING_WRITE;
3830
3831 /* Remove clients from the list of pending writes since
3832 * they are going to be closed ASAP. */
3833 if (c->flags & CLIENT_CLOSE_ASAP) {
3834 listDelNode(server.clients_pending_write, ln);
3835 continue;
3836 }
3837
3838 /* Since all replicas and replication backlog use global replication
3839 * buffer, to guarantee data accessing thread safe, we must put all
3840 * replicas client into io_threads_list[0] i.e. main thread handles
3841 * sending the output buffer of all replicas. */
3842 if (getClientType(c) == CLIENT_TYPE_SLAVE) {
3843 listAddNodeTail(io_threads_list[0],c);
3844 continue;
3845 }
3846
3847 int target_id = item_id % server.io_threads_num;
3848 listAddNodeTail(io_threads_list[target_id],c);
3849 item_id++;
3850 }
3851
3852 /* Give the start condition to the waiting threads, by setting the
3853 * start condition atomic var. */
3854 io_threads_op = IO_THREADS_OP_WRITE;
3855 for (int j = 1; j < server.io_threads_num; j++) {
3856 int count = listLength(io_threads_list[j]);
3857 setIOPendingCount(j, count);
3858 }
3859
3860 /* Also use the main thread to process a slice of clients. */
3861 listRewind(io_threads_list[0],&li);
3862 while((ln = listNext(&li))) {
3863 client *c = listNodeValue(ln);
3864 writeToClient(c,0);
3865 }
3866 listEmpty(io_threads_list[0]);
3867
3868 /* Wait for all the other threads to end their work. */
3869 while(1) {
3870 unsigned long pending = 0;
3871 for (int j = 1; j < server.io_threads_num; j++)
3872 pending += getIOPendingCount(j);
3873 if (pending == 0) break;
3874 }
3875
3876 io_threads_op = IO_THREADS_OP_IDLE;
3877
3878 /* Run the list of clients again to install the write handler where
3879 * needed. */
3880 listRewind(server.clients_pending_write,&li);
3881 while((ln = listNext(&li))) {
3882 client *c = listNodeValue(ln);
3883
3884 /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
3885 updateClientMemUsageBucket(c);
3886
3887 /* Install the write handler if there are pending writes in some
3888 * of the clients. */
3889 if (clientHasPendingReplies(c) &&
3890 connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
3891 {
3892 freeClientAsync(c);
3893 }
3894 }
3895 listEmpty(server.clients_pending_write);
3896
3897 /* Update processed count on server */
3898 server.stat_io_writes_processed += processed;
3899
3900 return processed;
3901 }
3902
3903 /* Return 1 if we want to handle the client read later using threaded I/O.
3904 * This is called by the readable handler of the event loop.
3905 * As a side effect of calling this function the client is put in the
3906 * pending read clients and flagged as such. */
postponeClientRead(client * c)3907 int postponeClientRead(client *c) {
3908 if (server.io_threads_active &&
3909 server.io_threads_do_reads &&
3910 !ProcessingEventsWhileBlocked &&
3911 !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
3912 io_threads_op == IO_THREADS_OP_IDLE)
3913 {
3914 listAddNodeHead(server.clients_pending_read,c);
3915 c->pending_read_list_node = listFirst(server.clients_pending_read);
3916 return 1;
3917 } else {
3918 return 0;
3919 }
3920 }
3921
3922 /* When threaded I/O is also enabled for the reading + parsing side, the
3923 * readable handler will just put normal clients into a queue of clients to
3924 * process (instead of serving them synchronously). This function runs
3925 * the queue using the I/O threads, and process them in order to accumulate
3926 * the reads in the buffers, and also parse the first command available
3927 * rendering it in the client structures.
3928 * This function achieves thread safety using a fan-out -> fan-in paradigm:
3929 * Fan out: The main thread fans out work to the io-threads which block until
3930 * setIOPendingCount() is called with a value larger than 0 by the main thread.
3931 * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
3932 * it can safely perform post-processing and return to normal synchronous
3933 * work. */
handleClientsWithPendingReadsUsingThreads(void)3934 int handleClientsWithPendingReadsUsingThreads(void) {
3935 if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
3936 int processed = listLength(server.clients_pending_read);
3937 if (processed == 0) return 0;
3938
3939 /* Distribute the clients across N different lists. */
3940 listIter li;
3941 listNode *ln;
3942 listRewind(server.clients_pending_read,&li);
3943 int item_id = 0;
3944 while((ln = listNext(&li))) {
3945 client *c = listNodeValue(ln);
3946 int target_id = item_id % server.io_threads_num;
3947 listAddNodeTail(io_threads_list[target_id],c);
3948 item_id++;
3949 }
3950
3951 /* Give the start condition to the waiting threads, by setting the
3952 * start condition atomic var. */
3953 io_threads_op = IO_THREADS_OP_READ;
3954 for (int j = 1; j < server.io_threads_num; j++) {
3955 int count = listLength(io_threads_list[j]);
3956 setIOPendingCount(j, count);
3957 }
3958
3959 /* Also use the main thread to process a slice of clients. */
3960 listRewind(io_threads_list[0],&li);
3961 while((ln = listNext(&li))) {
3962 client *c = listNodeValue(ln);
3963 readQueryFromClient(c->conn);
3964 }
3965 listEmpty(io_threads_list[0]);
3966
3967 /* Wait for all the other threads to end their work. */
3968 while(1) {
3969 unsigned long pending = 0;
3970 for (int j = 1; j < server.io_threads_num; j++)
3971 pending += getIOPendingCount(j);
3972 if (pending == 0) break;
3973 }
3974
3975 io_threads_op = IO_THREADS_OP_IDLE;
3976
3977 /* Run the list of clients again to process the new buffers. */
3978 while(listLength(server.clients_pending_read)) {
3979 ln = listFirst(server.clients_pending_read);
3980 client *c = listNodeValue(ln);
3981 listDelNode(server.clients_pending_read,ln);
3982 c->pending_read_list_node = NULL;
3983
3984 serverAssert(!(c->flags & CLIENT_BLOCKED));
3985
3986 if (beforeNextClient(c) == C_ERR) {
3987 /* If the client is no longer valid, we avoid
3988 * processing the client later. So we just go
3989 * to the next. */
3990 continue;
3991 }
3992
3993 /* Once io-threads are idle we can update the client in the mem usage buckets */
3994 updateClientMemUsageBucket(c);
3995
3996 if (processPendingCommandsAndResetClient(c) == C_ERR) {
3997 /* If the client is no longer valid, we avoid
3998 * processing the client later. So we just go
3999 * to the next. */
4000 continue;
4001 }
4002
4003 if (processInputBuffer(c) == C_ERR) {
4004 /* If the client is no longer valid, we avoid
4005 * processing the client later. So we just go
4006 * to the next. */
4007 continue;
4008 }
4009
4010 /* We may have pending replies if a thread readQueryFromClient() produced
4011 * replies and did not install a write handler (it can't).
4012 */
4013 if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
4014 clientInstallWriteHandler(c);
4015 }
4016
4017 /* Update processed count on server */
4018 server.stat_io_reads_processed += processed;
4019
4020 return processed;
4021 }
4022
4023 /* Returns the actual client eviction limit based on current configuration or
4024 * 0 if no limit. */
getClientEvictionLimit(void)4025 size_t getClientEvictionLimit(void) {
4026 size_t maxmemory_clients_actual = SIZE_MAX;
4027
4028 /* Handle percentage of maxmemory*/
4029 if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
4030 unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
4031 if (maxmemory_clients_bytes <= SIZE_MAX)
4032 maxmemory_clients_actual = maxmemory_clients_bytes;
4033 }
4034 else if (server.maxmemory_clients > 0)
4035 maxmemory_clients_actual = server.maxmemory_clients;
4036 else
4037 return 0;
4038
4039 /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate
4040 * at all with the server because of bad configuration */
4041 if (maxmemory_clients_actual < 1024*128)
4042 maxmemory_clients_actual = 1024*128;
4043
4044 return maxmemory_clients_actual;
4045 }
4046
evictClients(void)4047 void evictClients(void) {
4048 /* Start eviction from topmost bucket (largest clients) */
4049 int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
4050 listIter bucket_iter;
4051 listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4052 size_t client_eviction_limit = getClientEvictionLimit();
4053 if (client_eviction_limit == 0)
4054 return;
4055 while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
4056 server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
4057 listNode *ln = listNext(&bucket_iter);
4058 if (ln) {
4059 client *c = ln->value;
4060 sds ci = catClientInfoString(sdsempty(),c);
4061 serverLog(LL_NOTICE, "Evicting client: %s", ci);
4062 freeClient(c);
4063 sdsfree(ci);
4064 server.stat_evictedclients++;
4065 } else {
4066 curr_bucket--;
4067 if (curr_bucket < 0) {
4068 serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
4069 break;
4070 }
4071 listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
4072 }
4073 }
4074 }
4075