1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "atomicvar.h"
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <math.h>
35 #include <ctype.h>
36
/* Forward declaration: setProtocolError() is file-local (static) and its
 * definition appears later in this translation unit. */
static void setProtocolError(const char *errstr, client *c);
38
39 /* Return the size consumed from the allocator, for the specified SDS string,
40 * including internal fragmentation. This function is used in order to compute
41 * the client output buffer size. */
sdsZmallocSize(sds s)42 size_t sdsZmallocSize(sds s) {
43 void *sh = sdsAllocPtr(s);
44 return zmalloc_size(sh);
45 }
46
47 /* Return the amount of memory used by the sds string at object->ptr
48 * for a string object. */
getStringObjectSdsUsedMemory(robj * o)49 size_t getStringObjectSdsUsedMemory(robj *o) {
50 serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
51 switch(o->encoding) {
52 case OBJ_ENCODING_RAW: return sdsZmallocSize(o->ptr);
53 case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
54 default: return 0; /* Just integer encoding for now. */
55 }
56 }
57
58 /* Client.reply list dup and free methods. */
dupClientReplyValue(void * o)59 void *dupClientReplyValue(void *o) {
60 clientReplyBlock *old = o;
61 clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
62 memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
63 return buf;
64 }
65
/* Free method for the client->reply list: each node value is a single
 * zmalloc'ed clientReplyBlock, released in one call. */
void freeClientReplyValue(void *o) {
    void *block = o;
    zfree(block);
}
69
listMatchObjects(void * a,void * b)70 int listMatchObjects(void *a, void *b) {
71 return equalStringObjects(a,b);
72 }
73
74 /* This function links the client to the global linked list of clients.
75 * unlinkClient() does the opposite, among other things. */
linkClient(client * c)76 void linkClient(client *c) {
77 listAddNodeTail(server.clients,c);
78 /* Note that we remember the linked list node where the client is stored,
79 * this way removing the client in unlinkClient() will not require
80 * a linear scan, but just a constant time operation. */
81 c->client_list_node = listLast(server.clients);
82 uint64_t id = htonu64(c->id);
83 raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
84 }
85
/* Allocate and initialize a new client.
 *
 * 'fd' is the connected socket, or -1 to create a client with no
 * connection. Non connected clients are useful because every command must
 * be executed in the context of a client, also when commands are executed
 * from other contexts (for instance a Lua script).
 *
 * For a real socket, the fd is set non-blocking, anetEnableTcpNoDelay() is
 * applied, keepalive is configured when server.tcpkeepalive is non-zero,
 * and readQueryFromClient() is registered as its readable event handler.
 * If that registration fails, the fd is closed, the client is freed and
 * NULL is returned; this is the only path that returns NULL.
 *
 * The client starts on DB 0 with a fresh, unique ID. Only clients with a
 * real socket are linked into the global client list and ID index. */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            /* On failure the fd is closed here: callers must not close it
             * again. */
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    /* Client IDs come from an atomic fetch-and-increment of
     * server.next_client_id. */
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    /* Query buffer / request parsing state. */
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    /* Replication state. */
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    /* Output list: its values are clientReplyBlock pointers, so install the
     * matching free/dup methods. */
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* Blocking operations state. */
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    /* WATCH and Pub/Sub state. */
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Fake (fd == -1) clients are not added to the global client list. */
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}
163
164 /* This funciton puts the client in the queue of clients that should write
165 * their output buffers to the socket. Note that it does not *yet* install
166 * the write handler, to start clients are put in a queue of clients that need
167 * to write, so we try to do that before returning in the event loop (see the
168 * handleClientsWithPendingWrites() function).
169 * If we fail and there is more data to write, compared to what the socket
170 * buffers can hold, then we'll really install the handler. */
clientInstallWriteHandler(client * c)171 void clientInstallWriteHandler(client *c) {
172 /* Schedule the client to write the output buffers to the socket only
173 * if not already done and, for slaves, if the slave can actually receive
174 * writes at this stage. */
175 if (!(c->flags & CLIENT_PENDING_WRITE) &&
176 (c->replstate == REPL_STATE_NONE ||
177 (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
178 {
179 /* Here instead of installing the write handler, we just flag the
180 * client and put it into a list of clients that have something
181 * to write to the socket. This way before re-entering the event
182 * loop, we can try to directly write to the client sockets avoiding
183 * a system call. We'll only really install the write handler if
184 * we'll not be able to write the whole reply at once. */
185 c->flags |= CLIENT_PENDING_WRITE;
186 listAddNodeHead(server.clients_pending_write,c);
187 }
188 }
189
190 /* This function is called every time we are going to transmit new data
191 * to the client. The behavior is the following:
192 *
193 * If the client should receive new data (normal clients will) the function
194 * returns C_OK, and make sure to install the write handler in our event
195 * loop so that when the socket is writable new data gets written.
196 *
197 * If the client should not receive new data, because it is a fake client
198 * (used to load AOF in memory), a master or because the setup of the write
199 * handler failed, the function returns C_ERR.
200 *
201 * The function may return C_OK without actually installing the write
202 * event handler in the following cases:
203 *
204 * 1) The event handler should already be installed since the output buffer
205 * already contains something.
206 * 2) The client is a slave but not yet online, so we want to just accumulate
207 * writes in the buffer but not actually sending them yet.
208 *
209 * Typically gets called every time a reply is built, before adding more
210 * data to the clients output buffers. If the function returns C_ERR no
211 * data should be appended to the output buffers. */
prepareClientToWrite(client * c)212 int prepareClientToWrite(client *c) {
213 /* If it's the Lua client we always return ok without installing any
214 * handler since there is no socket at all. */
215 if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
216
217 /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
218 if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
219
220 /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
221 * is set. */
222 if ((c->flags & CLIENT_MASTER) &&
223 !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
224
225 if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
226
227 /* Schedule the client to write the output buffers to the socket, unless
228 * it should already be setup to do so (it has already pending data). */
229 if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
230
231 /* Authorize the caller to queue in the output buffer of this client. */
232 return C_OK;
233 }
234
235 /* -----------------------------------------------------------------------------
236 * Low level functions to add more data to output buffers.
237 * -------------------------------------------------------------------------- */
238
/* Try to append 'len' bytes from 's' to the client's fixed-size static
 * buffer c->buf.
 *
 * Returns C_ERR, copying nothing, when the reply list already has entries
 * (new data must go after them) or when the buffer lacks room; the caller
 * then falls back to _addReplyStringToList(). Returns C_OK after a copy,
 * and also when the client is flagged CLIENT_CLOSE_AFTER_REPLY, in which
 * case the data is silently discarded. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    if (listLength(c->reply) > 0) return C_ERR;

    size_t room = sizeof(c->buf) - c->bufpos;
    if (len > room) return C_ERR;

    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return C_OK;
}
255
/* Append 'len' bytes from 's' to the client reply list.
 *
 * Whatever fits is copied into the spare room of the tail block. The
 * remainder goes into a new block of at least PROTO_REPLY_CHUNK_BYTES,
 * whose usable size (allocator fragmentation included) is added to
 * c->reply_bytes. Output buffer limits are checked afterwards. Data is
 * dropped when the client is flagged CLIENT_CLOSE_AFTER_REPLY. */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    listNode *last = listLast(c->reply);
    /* The tail value may be NULL even if a tail node exists: it is the
     * placeholder added by addDeferredMultiBulkLength(), to be filled
     * later when the length of the multi bulk is known. */
    clientReplyBlock *tail = last ? listNodeValue(last) : NULL;

    if (tail != NULL) {
        /* Fill the spare room of the tail block first. */
        size_t room = tail->size - tail->used;
        size_t chunk = len <= room ? len : room;
        memcpy(tail->buf + tail->used, s, chunk);
        tail->used += chunk;
        s += chunk;
        len -= chunk;
    }

    if (len > 0) {
        /* Spill the rest into a new block of at least
         * PROTO_REPLY_CHUNK_BYTES. */
        size_t want = len > PROTO_REPLY_CHUNK_BYTES ? len : PROTO_REPLY_CHUNK_BYTES;
        clientReplyBlock *block = zmalloc(want + sizeof(clientReplyBlock));
        /* Take over the allocation's internal fragmentation. */
        block->size = zmalloc_usable(block) - sizeof(clientReplyBlock);
        block->used = len;
        memcpy(block->buf, s, len);
        listAddNodeTail(c->reply, block);
        c->reply_bytes += block->size;
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}
291
292 /* -----------------------------------------------------------------------------
293 * Higher level functions to queue data on the client output buffer.
294 * The following functions are the ones that commands implementations will call.
295 * -------------------------------------------------------------------------- */
296
297 /* Add the object 'obj' string representation to the client output buffer. */
addReply(client * c,robj * obj)298 void addReply(client *c, robj *obj) {
299 if (prepareClientToWrite(c) != C_OK) return;
300
301 if (sdsEncodedObject(obj)) {
302 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
303 _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
304 } else if (obj->encoding == OBJ_ENCODING_INT) {
305 /* For integer encoded strings we just convert it into a string
306 * using our optimized function, and attach the resulting string
307 * to the output buffer. */
308 char buf[32];
309 size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
310 if (_addReplyToBuffer(c,buf,len) != C_OK)
311 _addReplyStringToList(c,buf,len);
312 } else {
313 serverPanic("Wrong obj->encoding in addReply()");
314 }
315 }
316
317 /* Add the SDS 's' string to the client output buffer, as a side effect
318 * the SDS string is freed. */
addReplySds(client * c,sds s)319 void addReplySds(client *c, sds s) {
320 if (prepareClientToWrite(c) != C_OK) {
321 /* The caller expects the sds to be free'd. */
322 sdsfree(s);
323 return;
324 }
325 if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
326 _addReplyStringToList(c,s,sdslen(s));
327 sdsfree(s);
328 }
329
330 /* This low level function just adds whatever protocol you send it to the
331 * client buffer, trying the static buffer initially, and using the string
332 * of objects if not possible.
333 *
334 * It is efficient because does not create an SDS object nor an Redis object
335 * if not needed. The object will only be created by calling
336 * _addReplyStringToList() if we fail to extend the existing tail object
337 * in the list of objects. */
addReplyString(client * c,const char * s,size_t len)338 void addReplyString(client *c, const char *s, size_t len) {
339 if (prepareClientToWrite(c) != C_OK) return;
340 if (_addReplyToBuffer(c,s,len) != C_OK)
341 _addReplyStringToList(c,s,len);
342 }
343
344 /* Low level function called by the addReplyError...() functions.
345 * It emits the protocol for a Redis error, in the form:
346 *
347 * -ERRORCODE Error Message<CR><LF>
348 *
349 * If the error code is already passed in the string 's', the error
350 * code provided is used, otherwise the string "-ERR " for the generic
351 * error code is automatically added. */
addReplyErrorLength(client * c,const char * s,size_t len)352 void addReplyErrorLength(client *c, const char *s, size_t len) {
353 /* If the string already starts with "-..." then the error code
354 * is provided by the caller. Otherwise we use "-ERR". */
355 if (!len || s[0] != '-') addReplyString(c,"-ERR ",5);
356 addReplyString(c,s,len);
357 addReplyString(c,"\r\n",2);
358
359 /* Sometimes it could be normal that a slave replies to a master with
360 * an error and this function gets called. Actually the error will never
361 * be sent because addReply*() against master clients has no effect...
362 * A notable example is:
363 *
364 * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
365 *
366 * Where the master must propagate the first change even if the second
367 * will produce an error. However it is useful to log such events since
368 * they are rare and may hint at errors in a script or a bug in Redis. */
369 if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
370 char* to = c->flags & CLIENT_MASTER? "master": "replica";
371 char* from = c->flags & CLIENT_MASTER? "replica": "master";
372 char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
373 serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
374 "to its %s: '%s' after processing the command "
375 "'%s'", from, to, s, cmdname);
376 }
377 }
378
/* Emit an error reply from the NUL-terminated string 'err'. The "-ERR "
 * prefix is added unless 'err' already starts with '-'. */
void addReplyError(client *c, const char *err) {
    size_t errlen = strlen(err);
    addReplyErrorLength(c,err,errlen);
}
382
/* printf-style variant of addReplyError(). Any CR or LF in the formatted
 * message is replaced with a space: a newline inside an error reply would
 * produce invalid protocol. */
void addReplyErrorFormat(client *c, const char *fmt, ...) {
    va_list args;
    va_start(args,fmt);
    sds msg = sdscatvprintf(sdsempty(),fmt,args);
    va_end(args);

    char *end = msg + sdslen(msg);
    for (char *p = msg; p < end; p++) {
        if (*p == '\r' || *p == '\n') *p = ' ';
    }

    addReplyErrorLength(c,msg,sdslen(msg));
    sdsfree(msg);
}
398
/* Emit a status reply "+<s>\r\n" built from the 'len' bytes at 's'.
 * Nothing here filters CR/LF out of 's'. */
void addReplyStatusLength(client *c, const char *s, size_t len) {
    addReplyString(c,"+",1);      /* status type marker */
    addReplyString(c,s,len);      /* payload */
    addReplyString(c,"\r\n",2);   /* terminator */
}
404
/* Emit a status reply from the NUL-terminated string 'status'. */
void addReplyStatus(client *c, const char *status) {
    size_t statuslen = strlen(status);
    addReplyStatusLength(c,status,statuslen);
}
408
/* printf-style variant of addReplyStatus().
 *
 * Like addReplyErrorFormat(), every CR or LF in the formatted text is
 * replaced with a space. A newline inside a status reply would terminate
 * the reply early and leave the remainder to be parsed as garbage
 * protocol, e.g. when the arguments include user-supplied text. */
void addReplyStatusFormat(client *c, const char *fmt, ...) {
    size_t l, j;
    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);

    /* Make sure there are no newlines in the string, otherwise invalid
     * protocol is emitted. */
    l = sdslen(s);
    for (j = 0; j < l; j++) {
        if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
    }
    addReplyStatusLength(c,s,l);
    sdsfree(s);
}
417
418 /* Adds an empty object to the reply list that will contain the multi bulk
419 * length, which is not known when this function is called. */
addDeferredMultiBulkLength(client * c)420 void *addDeferredMultiBulkLength(client *c) {
421 /* Note that we install the write event here even if the object is not
422 * ready to be sent, since we are sure that before returning to the
423 * event loop setDeferredMultiBulkLength() will be called. */
424 if (prepareClientToWrite(c) != C_OK) return NULL;
425 listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
426 return listLast(c->reply);
427 }
428
429 /* Populate the length object and try gluing it to the next chunk. */
setDeferredMultiBulkLength(client * c,void * node,long length)430 void setDeferredMultiBulkLength(client *c, void *node, long length) {
431 listNode *ln = (listNode*)node;
432 clientReplyBlock *next;
433 char lenstr[128];
434 size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
435
436 /* Abort when *node is NULL: when the client should not accept writes
437 * we return NULL in addDeferredMultiBulkLength() */
438 if (node == NULL) return;
439 serverAssert(!listNodeValue(ln));
440
441 /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(),
442 * with a new buffer structure containing the protocol needed to specify
443 * the length of the array following. However sometimes when there is
444 * little memory to move, we may instead remove this NULL node, and prefix
445 * our protocol in the node immediately after to it, in order to save a
446 * write(2) syscall later. Conditions needed to do it:
447 *
448 * - The next node is non-NULL,
449 * - It has enough room already allocated
450 * - And not too large (avoid large memmove) */
451 if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
452 next->size - next->used >= lenstr_len &&
453 next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
454 memmove(next->buf + lenstr_len, next->buf, next->used);
455 memcpy(next->buf, lenstr, lenstr_len);
456 next->used += lenstr_len;
457 listDelNode(c->reply,ln);
458 } else {
459 /* Create a new node */
460 clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
461 /* Take over the allocation's internal fragmentation */
462 buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock);
463 buf->used = lenstr_len;
464 memcpy(buf->buf, lenstr, lenstr_len);
465 listNodeValue(ln) = buf;
466 c->reply_bytes += buf->size;
467 }
468 asyncCloseClientOnOutputBufferLimitReached(c);
469 }
470
471 /* Add a double as a bulk reply */
addReplyDouble(client * c,double d)472 void addReplyDouble(client *c, double d) {
473 char dbuf[128], sbuf[128];
474 int dlen, slen;
475 if (isinf(d)) {
476 /* Libc in odd systems (Hi Solaris!) will format infinite in a
477 * different way, so better to handle it in an explicit way. */
478 addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
479 } else {
480 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
481 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
482 addReplyString(c,sbuf,slen);
483 }
484 }
485
486 /* Add a long double as a bulk reply, but uses a human readable formatting
487 * of the double instead of exposing the crude behavior of doubles to the
488 * dear user. */
addReplyHumanLongDouble(client * c,long double d)489 void addReplyHumanLongDouble(client *c, long double d) {
490 robj *o = createStringObjectFromLongDouble(d,1);
491 addReplyBulk(c,o);
492 decrRefCount(o);
493 }
494
495 /* Add a long long as integer reply or bulk len / multi bulk count.
496 * Basically this is used to output <prefix><long long><crlf>. */
addReplyLongLongWithPrefix(client * c,long long ll,char prefix)497 void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
498 char buf[128];
499 int len;
500
501 /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
502 * so we have a few shared objects to use if the integer is small
503 * like it is most of the times. */
504 if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
505 addReply(c,shared.mbulkhdr[ll]);
506 return;
507 } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
508 addReply(c,shared.bulkhdr[ll]);
509 return;
510 }
511
512 buf[0] = prefix;
513 len = ll2string(buf+1,sizeof(buf)-1,ll);
514 buf[len+1] = '\r';
515 buf[len+2] = '\n';
516 addReplyString(c,buf,len+3);
517 }
518
/* Emit an integer reply ":<ll>\r\n". The very common values 0 and 1 use
 * the shared objects shared.czero / shared.cone. */
void addReplyLongLong(client *c, long long ll) {
    switch (ll) {
    case 0:
        addReply(c,shared.czero);
        break;
    case 1:
        addReply(c,shared.cone);
        break;
    default:
        addReplyLongLongWithPrefix(c,ll,':');
        break;
    }
}
527
/* Emit a multi bulk (array) header "*<length>\r\n".
 *
 * This delegates to addReplyLongLongWithPrefix(), which already uses the
 * shared header objects for 0 <= length < OBJ_SHARED_BULKHDR_LEN. The old
 * code indexed shared.mbulkhdr[length] after checking only the upper
 * bound, so a negative length read out of bounds. A negative length now
 * gets a formatted header such as "*-1\r\n". */
void addReplyMultiBulkLen(client *c, long length) {
    addReplyLongLongWithPrefix(c,length,'*');
}
534
535 /* Create the length prefix of a bulk reply, example: $2234 */
addReplyBulkLen(client * c,robj * obj)536 void addReplyBulkLen(client *c, robj *obj) {
537 size_t len;
538
539 if (sdsEncodedObject(obj)) {
540 len = sdslen(obj->ptr);
541 } else {
542 long n = (long)obj->ptr;
543
544 /* Compute how many bytes will take this integer as a radix 10 string */
545 len = 1;
546 if (n < 0) {
547 len++;
548 n = -n;
549 }
550 while((n = n/10) != 0) {
551 len++;
552 }
553 }
554
555 if (len < OBJ_SHARED_BULKHDR_LEN)
556 addReply(c,shared.bulkhdr[len]);
557 else
558 addReplyLongLongWithPrefix(c,len,'$');
559 }
560
561 /* Add a Redis Object as a bulk reply */
addReplyBulk(client * c,robj * obj)562 void addReplyBulk(client *c, robj *obj) {
563 addReplyBulkLen(c,obj);
564 addReply(c,obj);
565 addReply(c,shared.crlf);
566 }
567
568 /* Add a C buffer as bulk reply */
addReplyBulkCBuffer(client * c,const void * p,size_t len)569 void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
570 addReplyLongLongWithPrefix(c,len,'$');
571 addReplyString(c,p,len);
572 addReply(c,shared.crlf);
573 }
574
575 /* Add sds to reply (takes ownership of sds and frees it) */
addReplyBulkSds(client * c,sds s)576 void addReplyBulkSds(client *c, sds s) {
577 addReplyLongLongWithPrefix(c,sdslen(s),'$');
578 addReplySds(c,s);
579 addReply(c,shared.crlf);
580 }
581
582 /* Add a C null term string as bulk reply */
addReplyBulkCString(client * c,const char * s)583 void addReplyBulkCString(client *c, const char *s) {
584 if (s == NULL) {
585 addReply(c,shared.nullbulk);
586 } else {
587 addReplyBulkCBuffer(c,s,strlen(s));
588 }
589 }
590
591 /* Add a long long as a bulk reply */
addReplyBulkLongLong(client * c,long long ll)592 void addReplyBulkLongLong(client *c, long long ll) {
593 char buf[64];
594 int len;
595
596 len = ll2string(buf,64,ll);
597 addReplyBulkCBuffer(c,buf,len);
598 }
599
600 /* Add an array of C strings as status replies with a heading.
601 * This function is typically invoked by from commands that support
602 * subcommands in response to the 'help' subcommand. The help array
603 * is terminated by NULL sentinel. */
addReplyHelp(client * c,const char ** help)604 void addReplyHelp(client *c, const char **help) {
605 sds cmd = sdsnew((char*) c->argv[0]->ptr);
606 void *blenp = addDeferredMultiBulkLength(c);
607 int blen = 0;
608
609 sdstoupper(cmd);
610 addReplyStatusFormat(c,
611 "%s <subcommand> arg arg ... arg. Subcommands are:",cmd);
612 sdsfree(cmd);
613
614 while (help[blen]) addReplyStatus(c,help[blen++]);
615
616 blen++; /* Account for the header line(s). */
617 setDeferredMultiBulkLength(c,blenp,blen);
618 }
619
620 /* Add a suggestive error reply.
621 * This function is typically invoked by from commands that support
622 * subcommands in response to an unknown subcommand or argument error. */
addReplySubcommandSyntaxError(client * c)623 void addReplySubcommandSyntaxError(client *c) {
624 sds cmd = sdsnew((char*) c->argv[0]->ptr);
625 sdstoupper(cmd);
626 addReplyErrorFormat(c,
627 "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.",
628 (char*)c->argv[1]->ptr,cmd);
629 sdsfree(cmd);
630 }
631
632 /* Append 'src' client output buffers into 'dst' client output buffers.
633 * This function clears the output buffers of 'src' */
AddReplyFromClient(client * dst,client * src)634 void AddReplyFromClient(client *dst, client *src) {
635 if (prepareClientToWrite(dst) != C_OK)
636 return;
637 addReplyString(dst,src->buf, src->bufpos);
638 if (listLength(src->reply))
639 listJoin(dst->reply,src->reply);
640 dst->reply_bytes += src->reply_bytes;
641 src->reply_bytes = 0;
642 src->bufpos = 0;
643 }
644
645 /* Copy 'src' client output buffers into 'dst' client output buffers.
646 * The function takes care of freeing the old output buffers of the
647 * destination client. */
copyClientOutputBuffer(client * dst,client * src)648 void copyClientOutputBuffer(client *dst, client *src) {
649 listRelease(dst->reply);
650 dst->sentlen = 0;
651 dst->reply = listDup(src->reply);
652 memcpy(dst->buf,src->buf,src->bufpos);
653 dst->bufpos = src->bufpos;
654 dst->reply_bytes = src->reply_bytes;
655 }
656
657 /* Return true if the specified client has pending reply buffers to write to
658 * the socket. */
clientHasPendingReplies(client * c)659 int clientHasPendingReplies(client *c) {
660 return c->bufpos || listLength(c->reply);
661 }
662
/* Maximum number of connections accepted by a single invocation of
 * acceptTcpHandler() / acceptUnixHandler(). Presumably it bounds how long
 * one burst of incoming connections can occupy the event loop. */
#define MAX_ACCEPTS_PER_CALL 1000
acceptCommonHandler(int fd,int flags,char * ip)664 static void acceptCommonHandler(int fd, int flags, char *ip) {
665 client *c;
666 if ((c = createClient(fd)) == NULL) {
667 serverLog(LL_WARNING,
668 "Error registering fd event for the new client: %s (fd=%d)",
669 strerror(errno),fd);
670 close(fd); /* May be already closed, just ignore errors */
671 return;
672 }
673 /* If maxclient directive is set and this is one client more... close the
674 * connection. Note that we create the client instead to check before
675 * for this condition, since now the socket is already set in non-blocking
676 * mode and we can send an error for free using the Kernel I/O */
677 if (listLength(server.clients) > server.maxclients) {
678 char *err = "-ERR max number of clients reached\r\n";
679
680 /* That's a best effort error message, don't check write errors */
681 if (write(c->fd,err,strlen(err)) == -1) {
682 /* Nothing to do, Just to avoid the warning... */
683 }
684 server.stat_rejected_conn++;
685 freeClient(c);
686 return;
687 }
688
689 /* If the server is running in protected mode (the default) and there
690 * is no password set, nor a specific interface is bound, we don't accept
691 * requests from non loopback interfaces. Instead we try to explain the
692 * user what to do to fix it if needed. */
693 if (server.protected_mode &&
694 server.bindaddr_count == 0 &&
695 server.requirepass == NULL &&
696 !(flags & CLIENT_UNIX_SOCKET) &&
697 ip != NULL)
698 {
699 if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
700 char *err =
701 "-DENIED Redis is running in protected mode because protected "
702 "mode is enabled, no bind address was specified, no "
703 "authentication password is requested to clients. In this mode "
704 "connections are only accepted from the loopback interface. "
705 "If you want to connect from external computers to Redis you "
706 "may adopt one of the following solutions: "
707 "1) Just disable protected mode sending the command "
708 "'CONFIG SET protected-mode no' from the loopback interface "
709 "by connecting to Redis from the same host the server is "
710 "running, however MAKE SURE Redis is not publicly accessible "
711 "from internet if you do so. Use CONFIG REWRITE to make this "
712 "change permanent. "
713 "2) Alternatively you can just disable the protected mode by "
714 "editing the Redis configuration file, and setting the protected "
715 "mode option to 'no', and then restarting the server. "
716 "3) If you started the server manually just for testing, restart "
717 "it with the '--protected-mode no' option. "
718 "4) Setup a bind address or an authentication password. "
719 "NOTE: You only need to do one of the above things in order for "
720 "the server to start accepting connections from the outside.\r\n";
721 if (write(c->fd,err,strlen(err)) == -1) {
722 /* Nothing to do, Just to avoid the warning... */
723 }
724 server.stat_rejected_conn++;
725 freeClient(c);
726 return;
727 }
728 }
729
730 server.stat_numconnections++;
731 c->flags |= flags;
732 }
733
/* Readable-event handler for a listening TCP socket 'fd'. It accepts up to
 * MAX_ACCEPTS_PER_CALL connections per call and hands each one to
 * acceptCommonHandler(). It stops early when accept fails; the failure is
 * logged unless it is just EWOULDBLOCK (no more pending connections). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport, cfd;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}
753
/* Readable-event handler for the listening Unix socket 'fd'. It behaves
 * like acceptTcpHandler(), but admitted clients get CLIENT_UNIX_SOCKET and
 * no peer IP. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL);
    }
}
772
/* Release the references held by the current argument vector of 'c' and
 * reset argc and the current command. The argv array itself is not
 * freed. */
static void freeClientArgv(client *c) {
    for (int i = 0; i < c->argc; i++)
        decrRefCount(c->argv[i]);
    c->argc = 0;
    c->cmd = NULL;
}
780
781 /* Close all the slaves connections. This is useful in chained replication
782 * when we resync with our own master and want to force all our slaves to
783 * resync with us as well. */
disconnectSlaves(void)784 void disconnectSlaves(void) {
785 while (listLength(server.slaves)) {
786 listNode *ln = listFirst(server.slaves);
787 freeClient((client*)ln->value);
788 }
789 }
790
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster().
 *
 * Steps:
 * - clears server.current_client if it points to 'c';
 * - if the client has a socket (fd != -1): removes it from server.clients
 *   and server.clients_index, shuts the socket down for slaves waiting for
 *   BGSAVE end, deletes its file events, closes the fd and sets fd = -1;
 * - removes it from server.clients_pending_write and
 *   server.unblocked_clients when the corresponding flags are set, and
 *   clears those flags.
 *
 * Because fd is reset to -1 and the flags are cleared, a second call does
 * not repeat the socket teardown or the list removals. */
void unlinkClient(client *c) {
    listNode *ln;

    /* If this is marked as current client unset it. */
    if (server.current_client == c) server.current_client = NULL;

    /* Certain operations must be done only if the client has an active socket.
     * If the client was already unlinked or if it's a "fake client" the
     * fd is already set to -1. */
    if (c->fd != -1) {
        /* Remove from the list of active clients, and from the ID index
         * (keyed by the client ID converted with htonu64()). */
        if (c->client_list_node) {
            uint64_t id = htonu64(c->id);
            raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL);
            listDelNode(server.clients,c->client_list_node);
            c->client_list_node = NULL;
        }

        /* In the case of diskless replication the fork is writing to the
         * sockets and just closing the fd isn't enough, if we don't also
         * shutdown the socket the fork will continue to write to the slave
         * and the slave will only find out that it was disconnected when
         * it will finish reading the rdb. */
        if ((c->flags & CLIENT_SLAVE) &&
            (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) {
            shutdown(c->fd, SHUT_RDWR);
        }

        /* Unregister async I/O handlers and close the socket. */
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        close(c->fd);
        c->fd = -1;
    }

    /* Remove from the list of pending writes if needed (linear search). */
    if (c->flags & CLIENT_PENDING_WRITE) {
        ln = listSearchKey(server.clients_pending_write,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_pending_write,ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
    }

    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients (linear search). */
    if (c->flags & CLIENT_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;
    }
}
846
/* Release a client: close its connection and free the resources it owns.
 *
 * Two cases return early without releasing the structure:
 * - a CLIENT_PROTECTED client is handed to freeClientAsync() instead;
 * - our master connection (server.master set and CLIENT_MASTER flag), when
 *   it is not flagged CLIENT_CLOSE_AFTER_REPLY, CLIENT_CLOSE_ASAP or
 *   CLIENT_BLOCKED, is passed to replicationCacheMaster() so a partial
 *   resynchronization can be attempted later.
 *
 * Otherwise the structure is zfree()d at the end, so callers must not touch
 * 'c' after this call. */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        freeClientAsync(c);
        return;
    }

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        /* While streaming an on-disk RDB the replica owns the open RDB fd
         * and a preamble string; release them. */
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Monitors carry CLIENT_SLAVE too but live in server.monitors. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. (freeClientsInAsyncFreeQueue() clears the flag before
     * calling us, so this only triggers for direct calls.) */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}
946
947 /* Schedule a client to free it at a safe time in the serverCron() function.
948 * This function is useful when we need to terminate a client but we are in
949 * a context where calling freeClient() is not possible, because the client
950 * should be valid for the continuation of the flow of the program. */
freeClientAsync(client * c)951 void freeClientAsync(client *c) {
952 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
953 c->flags |= CLIENT_CLOSE_ASAP;
954 listAddNodeTail(server.clients_to_close,c);
955 }
956
freeClientsInAsyncFreeQueue(void)957 void freeClientsInAsyncFreeQueue(void) {
958 while (listLength(server.clients_to_close)) {
959 listNode *ln = listFirst(server.clients_to_close);
960 client *c = listNodeValue(ln);
961
962 c->flags &= ~CLIENT_CLOSE_ASAP;
963 freeClient(c);
964 listDelNode(server.clients_to_close,ln);
965 }
966 }
967
968 /* Return a client by ID, or NULL if the client ID is not in the set
969 * of registered clients. Note that "fake clients", created with -1 as FD,
970 * are not registered clients. */
lookupClientByID(uint64_t id)971 client *lookupClientByID(uint64_t id) {
972 id = htonu64(id);
973 client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id));
974 return (c == raxNotFound) ? NULL : c;
975 }
976
/* Write data in output buffers to client. Return C_OK if the client
 * is still valid after the call, C_ERR if it was freed.
 *
 * The static buffer c->buf (c->bufpos bytes) is sent first, then the nodes
 * of the c->reply list in order. c->sentlen counts how many bytes of the
 * chunk currently being sent were already written. 'handler_installed'
 * says whether an AE_WRITABLE handler is registered for this fd; if so it
 * is removed once no replies are pending. */
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            /* Drop empty reply blocks without issuing a write. */
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    /* A failed write is only fatal when it is not EAGAIN (socket buffer
     * full): in that case the client is freed and C_ERR returned. */
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}
1070
1071 /* Write event handler. Just send data to the client. */
sendReplyToClient(aeEventLoop * el,int fd,void * privdata,int mask)1072 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1073 UNUSED(el);
1074 UNUSED(mask);
1075 writeToClient(fd,privdata,1);
1076 }
1077
1078 /* This function is called just before entering the event loop, in the hope
1079 * we can just write the replies to the client output buffer without any
1080 * need to use a syscall in order to install the writable event handler,
1081 * get it called, and so forth. */
handleClientsWithPendingWrites(void)1082 int handleClientsWithPendingWrites(void) {
1083 listIter li;
1084 listNode *ln;
1085 int processed = listLength(server.clients_pending_write);
1086
1087 listRewind(server.clients_pending_write,&li);
1088 while((ln = listNext(&li))) {
1089 client *c = listNodeValue(ln);
1090 c->flags &= ~CLIENT_PENDING_WRITE;
1091 listDelNode(server.clients_pending_write,ln);
1092
1093 /* If a client is protected, don't do anything,
1094 * that may trigger write error or recreate handler. */
1095 if (c->flags & CLIENT_PROTECTED) continue;
1096
1097 /* Try to write buffers to the client socket. */
1098 if (writeToClient(c->fd,c,0) == C_ERR) continue;
1099
1100 /* If after the synchronous writes above we still have data to
1101 * output to the client, we need to install the writable handler. */
1102 if (clientHasPendingReplies(c)) {
1103 int ae_flags = AE_WRITABLE;
1104 /* For the fsync=always policy, we want that a given FD is never
1105 * served for reading and writing in the same event loop iteration,
1106 * so that in the middle of receiving the query, and serving it
1107 * to the client, we'll call beforeSleep() that will do the
1108 * actual fsync of AOF to disk. AE_BARRIER ensures that. */
1109 if (server.aof_state == AOF_ON &&
1110 server.aof_fsync == AOF_FSYNC_ALWAYS)
1111 {
1112 ae_flags |= AE_BARRIER;
1113 }
1114 if (aeCreateFileEvent(server.el, c->fd, ae_flags,
1115 sendReplyToClient, c) == AE_ERR)
1116 {
1117 freeClientAsync(c);
1118 }
1119 }
1120 }
1121 return processed;
1122 }
1123
1124 /* resetClient prepare the client to process the next command */
resetClient(client * c)1125 void resetClient(client *c) {
1126 redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1127
1128 freeClientArgv(c);
1129 c->reqtype = 0;
1130 c->multibulklen = 0;
1131 c->bulklen = -1;
1132
1133 /* We clear the ASKING flag as well if we are not inside a MULTI, and
1134 * if what we just executed is not the ASKING command itself. */
1135 if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1136 c->flags &= ~CLIENT_ASKING;
1137
1138 /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1139 * to the next command will be sent, but set the flag if the command
1140 * we just processed was "CLIENT REPLY SKIP". */
1141 c->flags &= ~CLIENT_REPLY_SKIP;
1142 if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1143 c->flags |= CLIENT_REPLY_SKIP;
1144 c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1145 }
1146 }
1147
1148 /* This funciton is used when we want to re-enter the event loop but there
1149 * is the risk that the client we are dealing with will be freed in some
1150 * way. This happens for instance in:
1151 *
1152 * * DEBUG RELOAD and similar.
1153 * * When a Lua script is in -BUSY state.
1154 *
1155 * So the function will protect the client by doing two things:
1156 *
1157 * 1) It removes the file events. This way it is not possible that an
1158 * error is signaled on the socket, freeing the client.
1159 * 2) Moreover it makes sure that if the client is freed in a different code
1160 * path, it is not really released, but only marked for later release. */
protectClient(client * c)1161 void protectClient(client *c) {
1162 c->flags |= CLIENT_PROTECTED;
1163 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1164 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1165 }
1166
1167 /* This will undo the client protection done by protectClient() */
unprotectClient(client * c)1168 void unprotectClient(client *c) {
1169 if (c->flags & CLIENT_PROTECTED) {
1170 c->flags &= ~CLIENT_PROTECTED;
1171 aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c);
1172 if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
1173 }
1174 }
1175
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
 * this function consumes the client query buffer and creates a command ready
 * to be executed inside the client structure. Returns C_OK if the command
 * is ready to be executed, or C_ERR if there is still protocol to read to
 * have a well formed command. The function also returns C_ERR when there is
 * a protocol error: in such a case the client structure is setup to reply
 * with the error and close the connection.
 *
 * One line (terminated by "\n" or "\r\n") starting at c->qb_pos is split
 * with sdssplitargs(), and qb_pos is advanced past the line terminator. */
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line. Note that strchr() stops at the first NUL
     * byte, so a NUL before the '\n' hides the newline. */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. (newline is known to be non-NULL here.) */
    if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*argc);
    }

    /* Create redis objects for all arguments. The sds strings from
     * sdssplitargs() are adopted by the objects, so only the vector
     * itself is freed below. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
    }
    zfree(argv);
    return C_OK;
}
1239
1240 /* Helper function. Record protocol erro details in server log,
1241 * and set the client as CLIENT_CLOSE_AFTER_REPLY. */
1242 #define PROTO_DUMP_LEN 128
setProtocolError(const char * errstr,client * c)1243 static void setProtocolError(const char *errstr, client *c) {
1244 if (server.verbosity <= LL_VERBOSE) {
1245 sds client = catClientInfoString(sdsempty(),c);
1246
1247 /* Sample some protocol to given an idea about what was inside. */
1248 char buf[256];
1249 if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
1250 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
1251 } else {
1252 snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
1253 }
1254
1255 /* Remove non printable chars. */
1256 char *p = buf;
1257 while (*p != '\0') {
1258 if (!isprint(*p)) *p = '.';
1259 p++;
1260 }
1261
1262 /* Log all the client and protocol info. */
1263 serverLog(LL_VERBOSE,
1264 "Protocol error (%s) from client: %s. %s", errstr, client, buf);
1265 sdsfree(client);
1266 }
1267 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
1268 }
1269
/* Process the query buffer for client 'c', setting up the client argument
 * vector for command execution. Returns C_OK if after running the function
 * the client has a well-formed ready to be processed command, otherwise
 * C_ERR if there is still to read more buffer to get the full command.
 * The function also returns C_ERR when there is a protocol error: in such a
 * case the client structure is setup to reply with the error and close
 * the connection.
 *
 * This function is called if processInputBuffer() detects that the next
 * command is in RESP format, so the first byte in the command is found
 * to be '*'. Otherwise for inline commands processInlineBuffer() is called.
 *
 * Parsing is resumable across calls: c->multibulklen holds the number of
 * bulk arguments still expected, and c->bulklen holds the length of the
 * bulk currently being read, or -1 when its "$<len>\r\n" header has not
 * been parsed yet. */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n. When fewer than 2 bytes remain,
         * the unsigned subtraction wraps; the (ssize_t) cast turns it into
         * a negative bound so we still return C_ERR and wait for more. */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && server.requirepass && !c->authenticated) {
            /* Unauthenticated clients may only send small commands. */
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        c->qb_pos = (newline-c->querybuf)+2;

        /* "*0" or a negative count: an empty command; the caller resets
         * the client since argc stays 0. */
        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }

            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && server.requirepass && !c->authenticated) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }

            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
                }
            }
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulklen == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}
1423
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process.
 *
 * server.current_client points at 'c' for the duration of the call and is
 * reset to NULL on exit. If it becomes NULL during processing, the client
 * is treated as freed: the loop stops and the query buffer is not touched
 * again. Otherwise the consumed prefix (up to c->qb_pos) is trimmed from
 * c->querybuf before returning. */
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master:
                     * bytes read so far minus the bytes not yet consumed. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (server.current_client != NULL && c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}
1502
1503 /* This is a wrapper for processInputBuffer that also cares about handling
1504 * the replication forwarding to the sub-slaves, in case the client 'c'
1505 * is flagged as master. Usually you want to call this instead of the
1506 * raw processInputBuffer(). */
processInputBufferAndReplicate(client * c)1507 void processInputBufferAndReplicate(client *c) {
1508 if (!(c->flags & CLIENT_MASTER)) {
1509 processInputBuffer(c);
1510 } else {
1511 size_t prev_offset = c->reploff;
1512 processInputBuffer(c);
1513 size_t applied = c->reploff - prev_offset;
1514 if (applied) {
1515 replicationFeedSlavesFromMasterStream(server.slaves,
1516 c->pending_querybuf, applied);
1517 sdsrange(c->pending_querybuf,applied,-1);
1518 }
1519 }
1520 }
1521
/* AE_READABLE handler for client sockets: read available bytes into
 * c->querybuf, then parse and execute the complete commands it now holds
 * via processInputBufferAndReplicate().
 *
 * freeClient() is called on the client (so 'c' must be treated as gone)
 * when read() fails with an error other than EAGAIN, when it returns 0
 * (peer closed the connection), or when the query buffer exceeds
 * server.client_max_querybuf_len. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        /* NOTE(review): this subtracts the whole sdslen(), not the unparsed
         * part (sdslen - qb_pos). It relies on processInputBuffer() having
         * trimmed querybuf so qb_pos is 0 here; confirm that holds on every
         * path that leaves a big bulk partially read. */
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    /* The bytes were written past the sds length by read(); make them part
     * of the string now. */
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}
1593
getClientsMaxBuffers(unsigned long * longest_output_list,unsigned long * biggest_input_buffer)1594 void getClientsMaxBuffers(unsigned long *longest_output_list,
1595 unsigned long *biggest_input_buffer) {
1596 client *c;
1597 listNode *ln;
1598 listIter li;
1599 unsigned long lol = 0, bib = 0;
1600
1601 listRewind(server.clients,&li);
1602 while ((ln = listNext(&li)) != NULL) {
1603 c = listNodeValue(ln);
1604
1605 if (listLength(c->reply) > lol) lol = listLength(c->reply);
1606 if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
1607 }
1608 *longest_output_list = lol;
1609 *biggest_input_buffer = bib;
1610 }
1611
1612 /* A Redis "Peer ID" is a colon separated ip:port pair.
1613 * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
1614 * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
1615 * For Unix sockets we use path:0, like in "/tmp/redis:0".
1616 *
1617 * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including
1618 * the null term.
1619 *
1620 * On failure the function still populates 'peerid' with the "?:0" string
1621 * in case you want to relax error checking or need to display something
1622 * anyway (see anetPeerToString implementation for more info). */
genClientPeerId(client * client,char * peerid,size_t peerid_len)1623 void genClientPeerId(client *client, char *peerid,
1624 size_t peerid_len) {
1625 if (client->flags & CLIENT_UNIX_SOCKET) {
1626 /* Unix socket client. */
1627 snprintf(peerid,peerid_len,"%s:0",server.unixsocket);
1628 } else {
1629 /* TCP client. */
1630 anetFormatPeer(client->fd,peerid,peerid_len);
1631 }
1632 }
1633
1634 /* This function returns the client peer id, by creating and caching it
1635 * if client->peerid is NULL, otherwise returning the cached value.
1636 * The Peer ID never changes during the life of the client, however it
1637 * is expensive to compute. */
getClientPeerId(client * c)1638 char *getClientPeerId(client *c) {
1639 char peerid[NET_PEER_ID_LEN];
1640
1641 if (c->peerid == NULL) {
1642 genClientPeerId(c,peerid,sizeof(peerid));
1643 c->peerid = sdsnew(peerid);
1644 }
1645 return c->peerid;
1646 }
1647
1648 /* Concatenate a string representing the state of a client in an human
1649 * readable format, into the sds string 's'. */
catClientInfoString(sds s,client * client)1650 sds catClientInfoString(sds s, client *client) {
1651 char flags[16], events[3], *p;
1652 int emask;
1653
1654 p = flags;
1655 if (client->flags & CLIENT_SLAVE) {
1656 if (client->flags & CLIENT_MONITOR)
1657 *p++ = 'O';
1658 else
1659 *p++ = 'S';
1660 }
1661 if (client->flags & CLIENT_MASTER) *p++ = 'M';
1662 if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
1663 if (client->flags & CLIENT_MULTI) *p++ = 'x';
1664 if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
1665 if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
1666 if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
1667 if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
1668 if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
1669 if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
1670 if (client->flags & CLIENT_READONLY) *p++ = 'r';
1671 if (p == flags) *p++ = 'N';
1672 *p++ = '\0';
1673
1674 emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
1675 p = events;
1676 if (emask & AE_READABLE) *p++ = 'r';
1677 if (emask & AE_WRITABLE) *p++ = 'w';
1678 *p = '\0';
1679 return sdscatfmt(s,
1680 "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
1681 (unsigned long long) client->id,
1682 getClientPeerId(client),
1683 client->fd,
1684 client->name ? (char*)client->name->ptr : "",
1685 (long long)(server.unixtime - client->ctime),
1686 (long long)(server.unixtime - client->lastinteraction),
1687 flags,
1688 client->db->id,
1689 (int) dictSize(client->pubsub_channels),
1690 (int) listLength(client->pubsub_patterns),
1691 (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
1692 (unsigned long long) sdslen(client->querybuf),
1693 (unsigned long long) sdsavail(client->querybuf),
1694 (unsigned long long) client->bufpos,
1695 (unsigned long long) listLength(client->reply),
1696 (unsigned long long) getClientOutputBufferMemoryUsage(client),
1697 events,
1698 client->lastcmd ? client->lastcmd->name : "NULL");
1699 }
1700
getAllClientsInfoString(int type)1701 sds getAllClientsInfoString(int type) {
1702 listNode *ln;
1703 listIter li;
1704 client *client;
1705 sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients));
1706 sdsclear(o);
1707 listRewind(server.clients,&li);
1708 while ((ln = listNext(&li)) != NULL) {
1709 client = listNodeValue(ln);
1710 if (type != -1 && getClientType(client) != type) continue;
1711 o = catClientInfoString(o,client);
1712 o = sdscatlen(o,"\n",1);
1713 }
1714 return o;
1715 }
1716
/* Implementation of the CLIENT command and its subcommands:
 * HELP, ID, LIST, REPLY, KILL, UNBLOCK, SETNAME, GETNAME and PAUSE.
 * The help[] array below documents the syntax of each subcommand. */
void clientCommand(client *c) {
    listNode *ln;
    listIter li;
    client *client;

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"id -- Return the ID of the current connection.",
"getname -- Return the name of the current connection.",
"kill <ip:port> -- Kill connection made from <ip:port>.",
"kill <option> <value> [option value ...] -- Kill connections. Options are:",
" addr <ip:port> -- Kill connection made from <ip:port>",
" id <client-id> -- Kill connection by client id.",
" type (normal|master|replica|pubsub) -- Kill connections by type.",
" skipme (yes|no) -- Skip killing current connection (default: yes).",
"list [options ...] -- Return information about client connections. Options:",
" type (normal|master|replica|pubsub) -- Return clients of specified type.",
"pause <timeout> -- Suspend all Redis clients for <timeout> milliseconds.",
"reply (on|off|skip) -- Control the replies sent to the current connection.",
"setname <name> -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) {
        /* CLIENT ID */
        addReplyLongLong(c,c->id);
    } else if (!strcasecmp(c->argv[1]->ptr,"list")) {
        /* CLIENT LIST [TYPE <type>] */
        int type = -1;
        if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) {
            type = getClientTypeByName(c->argv[3]->ptr);
            if (type == -1) {
                addReplyErrorFormat(c,"Unknown client type '%s'",
                    (char*) c->argv[3]->ptr);
                return;
            }
        } else if (c->argc != 2) {
            addReply(c,shared.syntaxerr);
            return;
        }
        sds o = getAllClientsInfoString(type);
        addReplyBulkCBuffer(c,o,sdslen(o));
        sdsfree(o);
    } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
        /* CLIENT REPLY ON|OFF|SKIP
         * Only ON replies (+OK): OFF and SKIP suppress replies, including
         * their own. SKIP has no effect while replies are already OFF. */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            c->flags |= CLIENT_REPLY_OFF;
        } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        char *addr = NULL;
        int type = -1;
        uint64_t id = 0;    /* 0 means "do not filter by ID". */
        int skipme = 1;
        int killed = 0, close_this_client = 0;

        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr> */
            addr = c->argv[2]->ptr;
            skipme = 0; /* With the old form, you can kill yourself. */
        } else if (c->argc > 3) {
            int i = 2; /* Next option index. */

            /* New style syntax: parse options. */
            while(i < c->argc) {
                int moreargs = c->argc > i+1;

                if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
                    long long tmp;

                    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL)
                        != C_OK) return;
                    /* An ID of 0 is the "no ID filter" sentinel used by the
                     * kill loop below, so accepting it would kill every
                     * other client; a negative ID would wrap to a huge
                     * unsigned value. Reject both explicitly. */
                    if (tmp <= 0) {
                        addReplyError(c,"client-id should be greater than 0");
                        return;
                    }
                    id = tmp;
                } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
                    type = getClientTypeByName(c->argv[i+1]->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
                    addr = c->argv[i+1]->ptr;
                } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) {
                    if (!strcasecmp(c->argv[i+1]->ptr,"yes")) {
                        skipme = 1;
                    } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) {
                        skipme = 0;
                    } else {
                        addReply(c,shared.syntaxerr);
                        return;
                    }
                } else {
                    addReply(c,shared.syntaxerr);
                    return;
                }
                i += 2;
            }
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }

        /* Iterate clients killing all the matching clients. */
        listRewind(server.clients,&li);
        while ((ln = listNext(&li)) != NULL) {
            client = listNodeValue(ln);
            if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
            if (type != -1 && getClientType(client) != type) continue;
            if (id != 0 && client->id != id) continue;
            if (c == client && skipme) continue;

            /* Kill it. The calling client cannot be freed while its own
             * command is executing, so it is only flagged (see below). */
            if (c == client) {
                close_this_client = 1;
            } else {
                freeClient(client);
            }
            killed++;
        }

        /* Reply according to old/new format. */
        if (c->argc == 3) {
            if (killed == 0)
                addReplyError(c,"No such client");
            else
                addReply(c,shared.ok);
        } else {
            addReplyLongLong(c,killed);
        }

        /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
         * only after we queued the reply to its output buffers. */
        if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 ||
                                                          c->argc == 4))
    {
        /* CLIENT UNBLOCK <id> [timeout|error] */
        long long id;
        int unblock_error = 0;

        if (c->argc == 4) {
            if (!strcasecmp(c->argv[3]->ptr,"timeout")) {
                unblock_error = 0;
            } else if (!strcasecmp(c->argv[3]->ptr,"error")) {
                unblock_error = 1;
            } else {
                addReplyError(c,
                    "CLIENT UNBLOCK reason should be TIMEOUT or ERROR");
                return;
            }
        }
        if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
            != C_OK) return;
        struct client *target = lookupClientByID(id);
        if (target && target->flags & CLIENT_BLOCKED) {
            if (unblock_error)
                addReplyError(target,
                    "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
            else
                replyToBlockedClientTimedOut(target);
            unblockClient(target);
            addReply(c,shared.cone);
        } else {
            addReply(c,shared.czero);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
        /* CLIENT SETNAME <name> */
        int j, len = sdslen(c->argv[2]->ptr);
        char *p = c->argv[2]->ptr;

        /* Setting the client name to an empty string actually removes
         * the current name. */
        if (len == 0) {
            if (c->name) decrRefCount(c->name);
            c->name = NULL;
            addReply(c,shared.ok);
            return;
        }

        /* Otherwise check if the charset is ok. We need to do this otherwise
         * CLIENT LIST format will break. You should always be able to
         * split by space to get the different fields. */
        for (j = 0; j < len; j++) {
            if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */
                addReplyError(c,
                    "Client names cannot contain spaces, "
                    "newlines or special characters.");
                return;
            }
        }
        if (c->name) decrRefCount(c->name);
        c->name = c->argv[2];
        incrRefCount(c->name);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
        /* CLIENT GETNAME: null bulk reply when no name is set. */
        if (c->name)
            addReplyBulk(c,c->name);
        else
            addReply(c,shared.nullbulk);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
        /* CLIENT PAUSE <timeout> */
        long long duration;

        if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
                                        != C_OK) return;
        pauseClients(duration);
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr);
    }
}
1936
1937 /* This callback is bound to POST and "Host:" command names. Those are not
1938 * really commands, but are used in security attacks in order to talk to
1939 * Redis instances via HTTP, with a technique called "cross protocol scripting"
1940 * which exploits the fact that services like Redis will discard invalid
1941 * HTTP headers and will process what follows.
1942 *
1943 * As a protection against this attack, Redis will terminate the connection
1944 * when a POST or "Host:" header is seen, and will log the event from
1945 * time to time (to avoid creating a DOS as a result of too many logs). */
securityWarningCommand(client * c)1946 void securityWarningCommand(client *c) {
1947 static time_t logged_time;
1948 time_t now = time(NULL);
1949
1950 if (labs(now-logged_time) > 60) {
1951 serverLog(LL_WARNING,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted.");
1952 logged_time = now;
1953 }
1954 freeClientAsync(c);
1955 }
1956
1957 /* Rewrite the command vector of the client. All the new objects ref count
1958 * is incremented. The old command vector is freed, and the old objects
1959 * ref count is decremented. */
rewriteClientCommandVector(client * c,int argc,...)1960 void rewriteClientCommandVector(client *c, int argc, ...) {
1961 va_list ap;
1962 int j;
1963 robj **argv; /* The new argument vector */
1964
1965 argv = zmalloc(sizeof(robj*)*argc);
1966 va_start(ap,argc);
1967 for (j = 0; j < argc; j++) {
1968 robj *a;
1969
1970 a = va_arg(ap, robj*);
1971 argv[j] = a;
1972 incrRefCount(a);
1973 }
1974 /* We free the objects in the original vector at the end, so we are
1975 * sure that if the same objects are reused in the new vector the
1976 * refcount gets incremented before it gets decremented. */
1977 for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
1978 zfree(c->argv);
1979 /* Replace argv and argc with our new versions. */
1980 c->argv = argv;
1981 c->argc = argc;
1982 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1983 serverAssertWithInfo(c,NULL,c->cmd != NULL);
1984 va_end(ap);
1985 }
1986
1987 /* Completely replace the client command vector with the provided one. */
replaceClientCommandVector(client * c,int argc,robj ** argv)1988 void replaceClientCommandVector(client *c, int argc, robj **argv) {
1989 freeClientArgv(c);
1990 zfree(c->argv);
1991 c->argv = argv;
1992 c->argc = argc;
1993 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
1994 serverAssertWithInfo(c,NULL,c->cmd != NULL);
1995 }
1996
1997 /* Rewrite a single item in the command vector.
1998 * The new val ref count is incremented, and the old decremented.
1999 *
2000 * It is possible to specify an argument over the current size of the
2001 * argument vector: in this case the array of objects gets reallocated
2002 * and c->argc set to the max value. However it's up to the caller to
2003 *
2004 * 1. Make sure there are no "holes" and all the arguments are set.
2005 * 2. If the original argument vector was longer than the one we
2006 * want to end with, it's up to the caller to set c->argc and
2007 * free the no longer used objects on c->argv. */
rewriteClientCommandArgument(client * c,int i,robj * newval)2008 void rewriteClientCommandArgument(client *c, int i, robj *newval) {
2009 robj *oldval;
2010
2011 if (i >= c->argc) {
2012 c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
2013 c->argc = i+1;
2014 c->argv[i] = NULL;
2015 }
2016 oldval = c->argv[i];
2017 c->argv[i] = newval;
2018 incrRefCount(newval);
2019 if (oldval) decrRefCount(oldval);
2020
2021 /* If this is the command name make sure to fix c->cmd. */
2022 if (i == 0) {
2023 c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
2024 serverAssertWithInfo(c,NULL,c->cmd != NULL);
2025 }
2026 }
2027
2028 /* This function returns the number of bytes that Redis is
2029 * using to store the reply still not read by the client.
2030 *
2031 * Note: this function is very fast so can be called as many time as
2032 * the caller wishes. The main usage of this function currently is
2033 * enforcing the client output length limits. */
getClientOutputBufferMemoryUsage(client * c)2034 unsigned long getClientOutputBufferMemoryUsage(client *c) {
2035 unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
2036 return c->reply_bytes + (list_item_size*listLength(c->reply));
2037 }
2038
2039 /* Get the class of a client, used in order to enforce limits to different
2040 * classes of clients.
2041 *
2042 * The function will return one of the following:
2043 * CLIENT_TYPE_NORMAL -> Normal client
2044 * CLIENT_TYPE_SLAVE -> Slave
2045 * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
2046 * CLIENT_TYPE_MASTER -> The client representing our replication master.
2047 */
getClientType(client * c)2048 int getClientType(client *c) {
2049 if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
2050 /* Even though MONITOR clients are marked as replicas, we
2051 * want the expose them as normal clients. */
2052 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
2053 return CLIENT_TYPE_SLAVE;
2054 if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
2055 return CLIENT_TYPE_NORMAL;
2056 }
2057
getClientTypeByName(char * name)2058 int getClientTypeByName(char *name) {
2059 if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
2060 else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
2061 else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
2062 else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
2063 else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
2064 else return -1;
2065 }
2066
getClientTypeName(int class)2067 char *getClientTypeName(int class) {
2068 switch(class) {
2069 case CLIENT_TYPE_NORMAL: return "normal";
2070 case CLIENT_TYPE_SLAVE: return "slave";
2071 case CLIENT_TYPE_PUBSUB: return "pubsub";
2072 case CLIENT_TYPE_MASTER: return "master";
2073 default: return NULL;
2074 }
2075 }
2076
2077 /* The function checks if the client reached output buffer soft or hard
2078 * limit, and also update the state needed to check the soft limit as
2079 * a side effect.
2080 *
2081 * Return value: non-zero if the client reached the soft or the hard limit.
2082 * Otherwise zero is returned. */
checkClientOutputBufferLimits(client * c)2083 int checkClientOutputBufferLimits(client *c) {
2084 int soft = 0, hard = 0, class;
2085 unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
2086
2087 class = getClientType(c);
2088 /* For the purpose of output buffer limiting, masters are handled
2089 * like normal clients. */
2090 if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
2091
2092 if (server.client_obuf_limits[class].hard_limit_bytes &&
2093 used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
2094 hard = 1;
2095 if (server.client_obuf_limits[class].soft_limit_bytes &&
2096 used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
2097 soft = 1;
2098
2099 /* We need to check if the soft limit is reached continuously for the
2100 * specified amount of seconds. */
2101 if (soft) {
2102 if (c->obuf_soft_limit_reached_time == 0) {
2103 c->obuf_soft_limit_reached_time = server.unixtime;
2104 soft = 0; /* First time we see the soft limit reached */
2105 } else {
2106 time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
2107
2108 if (elapsed <=
2109 server.client_obuf_limits[class].soft_limit_seconds) {
2110 soft = 0; /* The client still did not reached the max number of
2111 seconds for the soft limit to be considered
2112 reached. */
2113 }
2114 }
2115 } else {
2116 c->obuf_soft_limit_reached_time = 0;
2117 }
2118 return soft || hard;
2119 }
2120
2121 /* Asynchronously close a client if soft or hard limit is reached on the
2122 * output buffer size. The caller can check if the client will be closed
2123 * checking if the client CLIENT_CLOSE_ASAP flag is set.
2124 *
2125 * Note: we need to close the client asynchronously because this function is
2126 * called from contexts where the client can't be freed safely, i.e. from the
2127 * lower level functions pushing data inside the client output buffers. */
asyncCloseClientOnOutputBufferLimitReached(client * c)2128 void asyncCloseClientOnOutputBufferLimitReached(client *c) {
2129 if (c->fd == -1) return; /* It is unsafe to free fake clients. */
2130 serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
2131 if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
2132 if (checkClientOutputBufferLimits(c)) {
2133 sds client = catClientInfoString(sdsempty(),c);
2134
2135 freeClientAsync(c);
2136 serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
2137 sdsfree(client);
2138 }
2139 }
2140
2141 /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
2142 * output buffers without returning control to the event loop.
2143 * This is also called by SHUTDOWN for a best-effort attempt to send
2144 * slaves the latest writes. */
flushSlavesOutputBuffers(void)2145 void flushSlavesOutputBuffers(void) {
2146 listIter li;
2147 listNode *ln;
2148
2149 listRewind(server.slaves,&li);
2150 while((ln = listNext(&li))) {
2151 client *slave = listNodeValue(ln);
2152 int events = aeGetFileEvents(server.el,slave->fd);
2153 int can_receive_writes = (events & AE_WRITABLE) ||
2154 (slave->flags & CLIENT_PENDING_WRITE);
2155
2156 /* We don't want to send the pending data to the replica in a few
2157 * cases:
2158 *
2159 * 1. For some reason there is neither the write handler installed
2160 * nor the client is flagged as to have pending writes: for some
2161 * reason this replica may not be set to receive data. This is
2162 * just for the sake of defensive programming.
2163 *
2164 * 2. The put_online_on_ack flag is true. To know why we don't want
2165 * to send data to the replica in this case, please grep for the
2166 * flag for this flag.
2167 *
2168 * 3. Obviously if the slave is not ONLINE.
2169 */
2170 if (slave->replstate == SLAVE_STATE_ONLINE &&
2171 can_receive_writes &&
2172 !slave->repl_put_online_on_ack &&
2173 clientHasPendingReplies(slave))
2174 {
2175 writeToClient(slave->fd,slave,0);
2176 }
2177 }
2178 }
2179
2180 /* Pause clients up to the specified unixtime (in ms). While clients
2181 * are paused no command is processed from clients, so the data set can't
2182 * change during that time.
2183 *
2184 * However while this function pauses normal and Pub/Sub clients, slaves are
2185 * still served, so this function can be used on server upgrades where it is
2186 * required that slaves process the latest bytes from the replication stream
2187 * before being turned to masters.
2188 *
2189 * This function is also internally used by Redis Cluster for the manual
2190 * failover procedure implemented by CLUSTER FAILOVER.
2191 *
2192 * The function always succeed, even if there is already a pause in progress.
2193 * In such a case, the pause is extended if the duration is more than the
2194 * time left for the previous duration. However if the duration is smaller
2195 * than the time left for the previous pause, no change is made to the
2196 * left duration. */
pauseClients(mstime_t end)2197 void pauseClients(mstime_t end) {
2198 if (!server.clients_paused || end > server.clients_pause_end_time)
2199 server.clients_pause_end_time = end;
2200 server.clients_paused = 1;
2201 }
2202
2203 /* Return non-zero if clients are currently paused. As a side effect the
2204 * function checks if the pause time was reached and clear it. */
clientsArePaused(void)2205 int clientsArePaused(void) {
2206 if (server.clients_paused &&
2207 server.clients_pause_end_time < server.mstime)
2208 {
2209 listNode *ln;
2210 listIter li;
2211 client *c;
2212
2213 server.clients_paused = 0;
2214
2215 /* Put all the clients in the unblocked clients queue in order to
2216 * force the re-processing of the input buffer if any. */
2217 listRewind(server.clients,&li);
2218 while ((ln = listNext(&li)) != NULL) {
2219 c = listNodeValue(ln);
2220
2221 /* Don't touch slaves and blocked clients.
2222 * The latter pending requests will be processed when unblocked. */
2223 if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
2224 queueClientForReprocessing(c);
2225 }
2226 }
2227 return server.clients_paused;
2228 }
2229
2230 /* This function is called by Redis in order to process a few events from
2231 * time to time while blocked into some not interruptible operation.
2232 * This allows to reply to clients with the -LOADING error while loading the
2233 * data set at startup or after a full resynchronization with the master
2234 * and so forth.
2235 *
2236 * It calls the event loop in order to process a few events. Specifically we
2237 * try to call the event loop 4 times as long as we receive acknowledge that
2238 * some event was processed, in order to go forward with the accept, read,
2239 * write, close sequence needed to serve a client.
2240 *
2241 * The function returns the total number of events processed. */
processEventsWhileBlocked(void)2242 int processEventsWhileBlocked(void) {
2243 int iterations = 4; /* See the function top-comment. */
2244 int count = 0;
2245 while (iterations--) {
2246 int events = 0;
2247 events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
2248 events += handleClientsWithPendingWrites();
2249 if (!events) break;
2250 count += events;
2251 }
2252 return count;
2253 }
2254