1 /*
2 * twemproxy - A fast and lightweight proxy for memcached protocol.
3 * Copyright (C) 2011 Twitter, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdio.h>
19 #include <ctype.h>
20 #include <math.h>
21
22 #include <nc_core.h>
23 #include <nc_proto.h>
24
25 #define RSP_STRING(ACTION) \
26 ACTION( ok, "+OK\r\n" ) \
27 ACTION( pong, "+PONG\r\n" ) \
28 ACTION( invalid_password, "-ERR invalid password\r\n" ) \
29 ACTION( auth_required, "-NOAUTH Authentication required\r\n" ) \
30 ACTION( no_password, "-ERR Client sent AUTH, but no password is set\r\n" ) \
31
32 #define DEFINE_ACTION(_var, _str) static struct string rsp_##_var = string(_str);
33 RSP_STRING( DEFINE_ACTION )
34 #undef DEFINE_ACTION
35
36 static rstatus_t redis_handle_auth_req(struct msg *request, struct msg *response);
37
38 /*
39 * Return true, if the redis command take no key, otherwise
40 * return false
41 */
42 static bool
redis_argz(const struct msg * r)43 redis_argz(const struct msg *r)
44 {
45 switch (r->type) {
46 /* TODO: PING has an optional argument, emulate that? */
47 case MSG_REQ_REDIS_PING:
48 case MSG_REQ_REDIS_QUIT:
49 case MSG_REQ_REDIS_COMMAND:
50 return true;
51
52 default:
53 break;
54 }
55
56 return false;
57 }
58
59 /*
60 * Return true, if the redis command accepts no arguments, otherwise
61 * return false
62 */
63 static bool
redis_arg0(const struct msg * r)64 redis_arg0(const struct msg *r)
65 {
66 switch (r->type) {
67 case MSG_REQ_REDIS_PERSIST:
68 case MSG_REQ_REDIS_PTTL:
69 case MSG_REQ_REDIS_TTL:
70 case MSG_REQ_REDIS_TYPE:
71 case MSG_REQ_REDIS_DUMP:
72
73 case MSG_REQ_REDIS_DECR:
74 case MSG_REQ_REDIS_GET:
75 case MSG_REQ_REDIS_GETDEL:
76 case MSG_REQ_REDIS_INCR:
77 case MSG_REQ_REDIS_STRLEN:
78
79 case MSG_REQ_REDIS_HGETALL:
80 case MSG_REQ_REDIS_HKEYS:
81 case MSG_REQ_REDIS_HLEN:
82 case MSG_REQ_REDIS_HVALS:
83
84 case MSG_REQ_REDIS_LLEN:
85
86 case MSG_REQ_REDIS_SCARD:
87 case MSG_REQ_REDIS_SMEMBERS:
88
89 case MSG_REQ_REDIS_ZCARD:
90 /* TODO: Support emulating 2-arg username+password auth by just checking password? */
91 case MSG_REQ_REDIS_AUTH:
92 return true;
93
94 default:
95 break;
96 }
97
98 return false;
99 }
100
101 /*
102 * Return true, if the redis command accepts exactly 1 argument, otherwise
103 * return false
104 */
105 static bool
redis_arg1(const struct msg * r)106 redis_arg1(const struct msg *r)
107 {
108 switch (r->type) {
109 case MSG_REQ_REDIS_EXPIRE:
110 case MSG_REQ_REDIS_EXPIREAT:
111 case MSG_REQ_REDIS_PEXPIRE:
112 case MSG_REQ_REDIS_PEXPIREAT:
113 case MSG_REQ_REDIS_MOVE:
114
115 case MSG_REQ_REDIS_APPEND:
116 case MSG_REQ_REDIS_DECRBY:
117 case MSG_REQ_REDIS_GETBIT:
118 case MSG_REQ_REDIS_GETSET:
119 case MSG_REQ_REDIS_INCRBY:
120 case MSG_REQ_REDIS_INCRBYFLOAT:
121 case MSG_REQ_REDIS_SETNX:
122
123 case MSG_REQ_REDIS_HEXISTS:
124 case MSG_REQ_REDIS_HGET:
125 case MSG_REQ_REDIS_HSTRLEN:
126
127 case MSG_REQ_REDIS_LINDEX:
128 case MSG_REQ_REDIS_RPOPLPUSH:
129
130 case MSG_REQ_REDIS_SISMEMBER:
131
132 case MSG_REQ_REDIS_ZRANK:
133 case MSG_REQ_REDIS_ZREVRANK:
134 case MSG_REQ_REDIS_ZSCORE:
135 return true;
136
137 default:
138 break;
139 }
140
141 return false;
142 }
143
144 /*
145 * Return true, if the redis command accepts exactly 2 arguments, otherwise
146 * return false
147 */
148 static bool
redis_arg2(const struct msg * r)149 redis_arg2(const struct msg *r)
150 {
151 switch (r->type) {
152 case MSG_REQ_REDIS_GETRANGE:
153 case MSG_REQ_REDIS_PSETEX:
154 case MSG_REQ_REDIS_SETBIT:
155 case MSG_REQ_REDIS_SETEX:
156 case MSG_REQ_REDIS_SETRANGE:
157
158 case MSG_REQ_REDIS_HINCRBY:
159 case MSG_REQ_REDIS_HINCRBYFLOAT:
160 case MSG_REQ_REDIS_HSETNX:
161
162 case MSG_REQ_REDIS_LRANGE:
163 case MSG_REQ_REDIS_LREM:
164 case MSG_REQ_REDIS_LSET:
165 case MSG_REQ_REDIS_LTRIM:
166
167 case MSG_REQ_REDIS_SMOVE:
168
169 case MSG_REQ_REDIS_ZCOUNT:
170 case MSG_REQ_REDIS_ZLEXCOUNT:
171 case MSG_REQ_REDIS_ZINCRBY:
172 case MSG_REQ_REDIS_ZREMRANGEBYLEX:
173 case MSG_REQ_REDIS_ZREMRANGEBYRANK:
174 case MSG_REQ_REDIS_ZREMRANGEBYSCORE:
175 return true;
176
177 default:
178 break;
179 }
180
181 return false;
182 }
183
184 /*
185 * Return true, if the redis command accepts exactly 3 arguments, otherwise
186 * return false
187 */
188 static bool
redis_arg3(const struct msg * r)189 redis_arg3(const struct msg *r)
190 {
191 switch (r->type) {
192 case MSG_REQ_REDIS_LINSERT:
193 case MSG_REQ_REDIS_LMOVE:
194 return true;
195
196 default:
197 break;
198 }
199
200 return false;
201 }
202
203 /*
204 * Return true, if the redis command operates on one key and accepts 0 or more arguments, otherwise
205 * return false
206 */
207 static bool
redis_argn(const struct msg * r)208 redis_argn(const struct msg *r)
209 {
210 switch (r->type) {
211 case MSG_REQ_REDIS_SORT:
212 case MSG_REQ_REDIS_COPY:
213
214 case MSG_REQ_REDIS_BITCOUNT:
215 case MSG_REQ_REDIS_BITPOS:
216 case MSG_REQ_REDIS_BITFIELD:
217 /* TODO: Support REDIS_BITOP operation destkey key ... and add tests - this requires handling key in a position other than the first one */
218
219 case MSG_REQ_REDIS_EXISTS:
220 case MSG_REQ_REDIS_GETEX:
221 case MSG_REQ_REDIS_SET:
222
223 case MSG_REQ_REDIS_HDEL:
224 case MSG_REQ_REDIS_HMGET:
225 case MSG_REQ_REDIS_HMSET:
226 case MSG_REQ_REDIS_HSCAN:
227 case MSG_REQ_REDIS_HSET:
228 case MSG_REQ_REDIS_HRANDFIELD:
229
230 case MSG_REQ_REDIS_LPUSH:
231 case MSG_REQ_REDIS_LPUSHX:
232 case MSG_REQ_REDIS_RPUSH:
233 case MSG_REQ_REDIS_RPUSHX:
234 case MSG_REQ_REDIS_LPOP:
235 case MSG_REQ_REDIS_RPOP:
236 case MSG_REQ_REDIS_LPOS:
237
238 case MSG_REQ_REDIS_SADD:
239 case MSG_REQ_REDIS_SDIFF:
240 case MSG_REQ_REDIS_SDIFFSTORE:
241 case MSG_REQ_REDIS_SINTER:
242 case MSG_REQ_REDIS_SINTERSTORE:
243 case MSG_REQ_REDIS_SREM:
244 case MSG_REQ_REDIS_SUNION:
245 case MSG_REQ_REDIS_SUNIONSTORE:
246 case MSG_REQ_REDIS_SRANDMEMBER:
247 case MSG_REQ_REDIS_SSCAN:
248 case MSG_REQ_REDIS_SPOP:
249 case MSG_REQ_REDIS_SMISMEMBER:
250
251 case MSG_REQ_REDIS_PFADD:
252 case MSG_REQ_REDIS_PFMERGE:
253 case MSG_REQ_REDIS_PFCOUNT:
254
255 case MSG_REQ_REDIS_ZADD:
256 case MSG_REQ_REDIS_ZDIFF:
257 case MSG_REQ_REDIS_ZDIFFSTORE:
258 case MSG_REQ_REDIS_ZINTER:
259 case MSG_REQ_REDIS_ZINTERSTORE:
260 case MSG_REQ_REDIS_ZMSCORE:
261 case MSG_REQ_REDIS_ZPOPMAX:
262 case MSG_REQ_REDIS_ZPOPMIN:
263 case MSG_REQ_REDIS_ZRANDMEMBER:
264 case MSG_REQ_REDIS_ZRANGE:
265 case MSG_REQ_REDIS_ZRANGEBYLEX:
266 case MSG_REQ_REDIS_ZRANGEBYSCORE:
267 case MSG_REQ_REDIS_ZRANGESTORE:
268 case MSG_REQ_REDIS_ZREM:
269 case MSG_REQ_REDIS_ZREVRANGE:
270 case MSG_REQ_REDIS_ZREVRANGEBYLEX:
271 case MSG_REQ_REDIS_ZREVRANGEBYSCORE:
272 case MSG_REQ_REDIS_ZSCAN:
273 case MSG_REQ_REDIS_ZUNION:
274 case MSG_REQ_REDIS_ZUNIONSTORE:
275
276 case MSG_REQ_REDIS_GEODIST:
277 case MSG_REQ_REDIS_GEOPOS:
278 case MSG_REQ_REDIS_GEOHASH:
279 case MSG_REQ_REDIS_GEOADD:
280 case MSG_REQ_REDIS_GEORADIUS:
281 case MSG_REQ_REDIS_GEORADIUSBYMEMBER:
282 case MSG_REQ_REDIS_GEOSEARCH:
283 case MSG_REQ_REDIS_GEOSEARCHSTORE:
284
285 case MSG_REQ_REDIS_RESTORE:
286 return true;
287
288 default:
289 break;
290 }
291
292 return false;
293 }
294
295 /*
296 * Return true, if the redis command is a vector command accepting one or
297 * more keys, otherwise return false
298 */
299 static bool
redis_argx(const struct msg * r)300 redis_argx(const struct msg *r)
301 {
302 switch (r->type) {
303 case MSG_REQ_REDIS_MGET:
304 case MSG_REQ_REDIS_DEL:
305 case MSG_REQ_REDIS_UNLINK:
306 case MSG_REQ_REDIS_TOUCH:
307 return true;
308
309 default:
310 break;
311 }
312
313 return false;
314 }
315
316 /*
317 * Return true, if the redis command is a vector command accepting one or
318 * more key-value pairs, otherwise return false
319 */
320 static bool
redis_argkvx(const struct msg * r)321 redis_argkvx(const struct msg *r)
322 {
323 switch (r->type) {
324 case MSG_REQ_REDIS_MSET:
325 return true;
326
327 default:
328 break;
329 }
330
331 return false;
332 }
333
334 /*
335 * Return true, if the redis command is either EVAL or EVALSHA. These commands
336 * have a special format with exactly 2 arguments, followed by one or more keys,
337 * followed by zero or more arguments (the documentation online seems to suggest
338 * that at least one argument is required, but that shouldn't be the case).
339 */
340 static bool
redis_argeval(const struct msg * r)341 redis_argeval(const struct msg *r)
342 {
343 switch (r->type) {
344 case MSG_REQ_REDIS_EVAL:
345 case MSG_REQ_REDIS_EVALSHA:
346 return true;
347
348 default:
349 break;
350 }
351
352 return false;
353 }
354
355 static bool
redis_nokey(const struct msg * r)356 redis_nokey(const struct msg *r)
357 {
358 switch (r->type) {
359 case MSG_REQ_REDIS_LOLWUT:
360 return true;
361
362 default:
363 break;
364 }
365
366 return false;
367 }
368
369 /*
370 * Return true, if the redis response is an error response i.e. a simple
371 * string whose first character is '-', otherwise return false.
372 */
373 static bool
redis_error(const struct msg * r)374 redis_error(const struct msg *r)
375 {
376 switch (r->type) {
377 case MSG_RSP_REDIS_ERROR:
378 case MSG_RSP_REDIS_ERROR_ERR:
379 case MSG_RSP_REDIS_ERROR_OOM:
380 case MSG_RSP_REDIS_ERROR_BUSY:
381 case MSG_RSP_REDIS_ERROR_NOAUTH:
382 case MSG_RSP_REDIS_ERROR_LOADING:
383 case MSG_RSP_REDIS_ERROR_BUSYKEY:
384 case MSG_RSP_REDIS_ERROR_MISCONF:
385 case MSG_RSP_REDIS_ERROR_NOSCRIPT:
386 case MSG_RSP_REDIS_ERROR_READONLY:
387 case MSG_RSP_REDIS_ERROR_WRONGTYPE:
388 case MSG_RSP_REDIS_ERROR_EXECABORT:
389 case MSG_RSP_REDIS_ERROR_MASTERDOWN:
390 case MSG_RSP_REDIS_ERROR_NOREPLICAS:
391 return true;
392
393 default:
394 break;
395 }
396
397 return false;
398 }
399
400 /*
401 * Reference: http://redis.io/topics/protocol
402 *
403 * Redis >= 1.2 uses the unified protocol to send requests to the Redis
404 * server. In the unified protocol all the arguments sent to the server
405 * are binary safe and every request has the following general form:
406 *
407 * *<number of arguments> CR LF
408 * $<number of bytes of argument 1> CR LF
409 * <argument data> CR LF
410 * ...
411 * $<number of bytes of argument N> CR LF
412 * <argument data> CR LF
413 *
414 * Before the unified request protocol, redis protocol for requests supported
415 * the following commands
416 * 1). Inline commands: simple commands where arguments are just space
417 * separated strings. No binary safeness is possible.
418 * 2). Bulk commands: bulk commands are exactly like inline commands, but
419 * the last argument is handled in a special way in order to allow for
420 * a binary-safe last argument.
421 *
422 * Nutcracker only supports the Redis unified protocol for requests.
423 */
424 void
redis_parse_req(struct msg * r)425 redis_parse_req(struct msg *r)
426 {
427 struct mbuf *b;
428 uint8_t *p, *m;
429 uint8_t ch;
430 enum {
431 SW_START,
432 SW_NARG,
433 SW_NARG_LF,
434 SW_REQ_TYPE_LEN,
435 SW_REQ_TYPE_LEN_LF,
436 SW_REQ_TYPE,
437 SW_REQ_TYPE_LF,
438 SW_KEY_LEN,
439 SW_KEY_LEN_LF,
440 SW_KEY,
441 SW_KEY_LF,
442 SW_ARG1_LEN,
443 SW_ARG1_LEN_LF,
444 SW_ARG1,
445 SW_ARG1_LF,
446 SW_ARG2_LEN,
447 SW_ARG2_LEN_LF,
448 SW_ARG2,
449 SW_ARG2_LF,
450 SW_ARG3_LEN,
451 SW_ARG3_LEN_LF,
452 SW_ARG3,
453 SW_ARG3_LF,
454 SW_ARGN_LEN,
455 SW_ARGN_LEN_LF,
456 SW_ARGN,
457 SW_ARGN_LF,
458 SW_SENTINEL
459 } state;
460
461 state = r->state;
462 b = STAILQ_LAST(&r->mhdr, mbuf, next);
463
464 ASSERT(r->request);
465 ASSERT(state >= SW_START && state < SW_SENTINEL);
466 ASSERT(b != NULL);
467 ASSERT(b->pos <= b->last);
468
469 /* validate the parsing maker */
470 ASSERT(r->pos != NULL);
471 ASSERT(r->pos >= b->pos && r->pos <= b->last);
472
473 for (p = r->pos; p < b->last; p++) {
474 ch = *p;
475
476 switch (state) {
477
478 case SW_START:
479 ASSERT(r->token == NULL);
480 if (ch != '*') {
481 /* redis commands are always arrays */
482 goto error;
483 }
484 r->token = p;
485 /* req_start <- p */
486 r->narg_start = p;
487 r->rnarg = 0;
488 state = SW_NARG;
489
490 break;
491
492 case SW_NARG:
493 /* SW_NARG: The number of arguments in the redis command array */
494 ASSERT(r->token != NULL);
495 if (isdigit(ch)) {
496 r->rnarg = r->rnarg * 10 + (uint32_t)(ch - '0');
497 } else if (ch == CR) {
498 if (r->rnarg == 0) {
499 goto error;
500 }
501 r->narg = r->rnarg;
502 r->narg_end = p;
503 r->token = NULL;
504 state = SW_NARG_LF;
505 } else {
506 goto error;
507 }
508
509 break;
510
511 case SW_NARG_LF:
512 switch (ch) {
513 case LF:
514 state = SW_REQ_TYPE_LEN;
515 break;
516
517 default:
518 goto error;
519 }
520
521 break;
522
523 case SW_REQ_TYPE_LEN:
524 if (r->token == NULL) {
525 if (ch != '$') {
526 goto error;
527 }
528 r->token = p;
529 r->rlen = 0;
530 } else if (isdigit(ch)) {
531 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
532 } else if (ch == CR) {
533 if (r->rlen == 0 || r->rnarg == 0) {
534 goto error;
535 }
536 r->rnarg--;
537 r->token = NULL;
538 state = SW_REQ_TYPE_LEN_LF;
539 } else {
540 goto error;
541 }
542
543 break;
544
545 case SW_REQ_TYPE_LEN_LF:
546 switch (ch) {
547 case LF:
548 state = SW_REQ_TYPE;
549 break;
550
551 default:
552 goto error;
553 }
554
555 break;
556
557 case SW_REQ_TYPE:
558 if (r->token == NULL) {
559 r->token = p;
560 }
561
562 m = r->token + r->rlen;
563 if (m >= b->last) {
564 m = b->last - 1;
565 p = m;
566 break;
567 }
568
569 if (*m != CR) {
570 goto error;
571 }
572
573 p = m; /* move forward by rlen bytes */
574 r->rlen = 0;
575 m = r->token;
576 r->token = NULL;
577 r->type = MSG_UNKNOWN;
578
579 switch (p - m) {
580
581 case 3:
582 if (str3icmp(m, 'g', 'e', 't')) {
583 r->type = MSG_REQ_REDIS_GET;
584 break;
585 }
586
587 if (str3icmp(m, 's', 'e', 't')) {
588 r->type = MSG_REQ_REDIS_SET;
589 break;
590 }
591
592 if (str3icmp(m, 't', 't', 'l')) {
593 r->type = MSG_REQ_REDIS_TTL;
594 break;
595 }
596
597 if (str3icmp(m, 'd', 'e', 'l')) {
598 r->type = MSG_REQ_REDIS_DEL;
599 break;
600 }
601
602 break;
603
604 case 4:
605 if (str4icmp(m, 'p', 't', 't', 'l')) {
606 r->type = MSG_REQ_REDIS_PTTL;
607 break;
608 }
609
610 if (str4icmp(m, 'd', 'e', 'c', 'r')) {
611 r->type = MSG_REQ_REDIS_DECR;
612 break;
613 }
614
615 if (str4icmp(m, 'd', 'u', 'm', 'p')) {
616 r->type = MSG_REQ_REDIS_DUMP;
617 break;
618 }
619
620 if (str4icmp(m, 'h', 'd', 'e', 'l')) {
621 r->type = MSG_REQ_REDIS_HDEL;
622 break;
623 }
624
625 if (str4icmp(m, 'h', 'g', 'e', 't')) {
626 r->type = MSG_REQ_REDIS_HGET;
627 break;
628 }
629
630 if (str4icmp(m, 'h', 'l', 'e', 'n')) {
631 r->type = MSG_REQ_REDIS_HLEN;
632 break;
633 }
634
635 if (str4icmp(m, 'h', 's', 'e', 't')) {
636 r->type = MSG_REQ_REDIS_HSET;
637 break;
638 }
639
640 if (str4icmp(m, 'i', 'n', 'c', 'r')) {
641 r->type = MSG_REQ_REDIS_INCR;
642 break;
643 }
644
645 if (str4icmp(m, 'l', 'l', 'e', 'n')) {
646 r->type = MSG_REQ_REDIS_LLEN;
647 break;
648 }
649
650 if (str4icmp(m, 'l', 'p', 'o', 'p')) {
651 r->type = MSG_REQ_REDIS_LPOP;
652 break;
653 }
654
655 if (str4icmp(m, 'l', 'p', 'o', 's')) {
656 r->type = MSG_REQ_REDIS_LPOS;
657 break;
658 }
659
660 if (str4icmp(m, 'l', 'r', 'e', 'm')) {
661 r->type = MSG_REQ_REDIS_LREM;
662 break;
663 }
664
665 if (str4icmp(m, 'l', 's', 'e', 't')) {
666 r->type = MSG_REQ_REDIS_LSET;
667 break;
668 }
669
670 if (str4icmp(m, 'r', 'p', 'o', 'p')) {
671 r->type = MSG_REQ_REDIS_RPOP;
672 break;
673 }
674
675 if (str4icmp(m, 's', 'a', 'd', 'd')) {
676 r->type = MSG_REQ_REDIS_SADD;
677 break;
678 }
679
680 if (str4icmp(m, 's', 'p', 'o', 'p')) {
681 r->type = MSG_REQ_REDIS_SPOP;
682 break;
683 }
684
685 if (str4icmp(m, 's', 'r', 'e', 'm')) {
686 r->type = MSG_REQ_REDIS_SREM;
687 break;
688 }
689
690 if (str4icmp(m, 't', 'y', 'p', 'e')) {
691 r->type = MSG_REQ_REDIS_TYPE;
692 break;
693 }
694
695 if (str4icmp(m, 'm', 'g', 'e', 't')) {
696 r->type = MSG_REQ_REDIS_MGET;
697 break;
698 }
699 if (str4icmp(m, 'm', 's', 'e', 't')) {
700 r->type = MSG_REQ_REDIS_MSET;
701 break;
702 }
703
704 if (str4icmp(m, 'z', 'a', 'd', 'd')) {
705 r->type = MSG_REQ_REDIS_ZADD;
706 break;
707 }
708
709 if (str4icmp(m, 'z', 'r', 'e', 'm')) {
710 r->type = MSG_REQ_REDIS_ZREM;
711 break;
712 }
713
714 if (str4icmp(m, 'e', 'v', 'a', 'l')) {
715 r->type = MSG_REQ_REDIS_EVAL;
716 break;
717 }
718
719 if (str4icmp(m, 's', 'o', 'r', 't')) {
720 r->type = MSG_REQ_REDIS_SORT;
721 break;
722 }
723
724 if (str4icmp(m, 'p', 'i', 'n', 'g')) {
725 r->type = MSG_REQ_REDIS_PING;
726 r->noforward = 1;
727 break;
728 }
729
730 if (str4icmp(m, 'q', 'u', 'i', 't')) {
731 r->type = MSG_REQ_REDIS_QUIT;
732 r->quit = 1;
733 break;
734 }
735
736 if (str4icmp(m, 'a', 'u', 't', 'h')) {
737 r->type = MSG_REQ_REDIS_AUTH;
738 r->noforward = 1;
739 break;
740 }
741
742 if (str4icmp(m, 'm', 'o', 'v', 'e')) {
743 r->type = MSG_REQ_REDIS_MOVE;
744 r->noforward = 1;
745 break;
746 }
747
748 if (str4icmp(m, 'c', 'o', 'p', 'y')) {
749 r->type = MSG_REQ_REDIS_COPY;
750 break;
751 }
752
753 break;
754
755 case 5:
756 if (str5icmp(m, 'h', 'k', 'e', 'y', 's')) {
757 r->type = MSG_REQ_REDIS_HKEYS;
758 break;
759 }
760
761 if (str5icmp(m, 'h', 'm', 'g', 'e', 't')) {
762 r->type = MSG_REQ_REDIS_HMGET;
763 break;
764 }
765
766 if (str5icmp(m, 'h', 'm', 's', 'e', 't')) {
767 r->type = MSG_REQ_REDIS_HMSET;
768 break;
769 }
770
771 if (str5icmp(m, 'h', 'v', 'a', 'l', 's')) {
772 r->type = MSG_REQ_REDIS_HVALS;
773 break;
774 }
775
776 if (str5icmp(m, 'h', 's', 'c', 'a', 'n')) {
777 r->type = MSG_REQ_REDIS_HSCAN;
778 break;
779 }
780
781 if (str5icmp(m, 'l', 'p', 'u', 's', 'h')) {
782 r->type = MSG_REQ_REDIS_LPUSH;
783 break;
784 }
785
786 if (str5icmp(m, 'l', 't', 'r', 'i', 'm')) {
787 r->type = MSG_REQ_REDIS_LTRIM;
788 break;
789 }
790
791 if (str5icmp(m, 'r', 'p', 'u', 's', 'h')) {
792 r->type = MSG_REQ_REDIS_RPUSH;
793 break;
794 }
795
796 if (str5icmp(m, 's', 'c', 'a', 'r', 'd')) {
797 r->type = MSG_REQ_REDIS_SCARD;
798 break;
799 }
800
801 if (str5icmp(m, 's', 'd', 'i', 'f', 'f')) {
802 r->type = MSG_REQ_REDIS_SDIFF;
803 break;
804 }
805
806 if (str5icmp(m, 's', 'e', 't', 'e', 'x')) {
807 r->type = MSG_REQ_REDIS_SETEX;
808 break;
809 }
810
811 if (str5icmp(m, 's', 'e', 't', 'n', 'x')) {
812 r->type = MSG_REQ_REDIS_SETNX;
813 break;
814 }
815
816 if (str5icmp(m, 's', 'm', 'o', 'v', 'e')) {
817 r->type = MSG_REQ_REDIS_SMOVE;
818 break;
819 }
820
821 if (str5icmp(m, 's', 's', 'c', 'a', 'n')) {
822 r->type = MSG_REQ_REDIS_SSCAN;
823 break;
824 }
825
826 if (str5icmp(m, 'z', 'c', 'a', 'r', 'd')) {
827 r->type = MSG_REQ_REDIS_ZCARD;
828 break;
829 }
830
831 if (str5icmp(m, 'z', 'd', 'i', 'f', 'f')) {
832 r->type = MSG_REQ_REDIS_ZDIFF;
833 break;
834 }
835
836 if (str5icmp(m, 'z', 'r', 'a', 'n', 'k')) {
837 r->type = MSG_REQ_REDIS_ZRANK;
838 break;
839 }
840
841 if (str5icmp(m, 'z', 's', 'c', 'a', 'n')) {
842 r->type = MSG_REQ_REDIS_ZSCAN;
843 break;
844 }
845
846 if (str5icmp(m, 'p', 'f', 'a', 'd', 'd')) {
847 r->type = MSG_REQ_REDIS_PFADD;
848 break;
849 }
850
851 if (str5icmp(m, 'g', 'e', 't', 'e', 'x')) {
852 r->type = MSG_REQ_REDIS_GETEX;
853 break;
854 }
855
856 if (str5icmp(m, 't', 'o', 'u', 'c', 'h')) {
857 r->type = MSG_REQ_REDIS_TOUCH;
858 break;
859 }
860
861 if (str5icmp(m, 'l', 'm', 'o', 'v', 'e')) {
862 r->type = MSG_REQ_REDIS_LMOVE;
863 break;
864 }
865
866 break;
867
868 case 6:
869 if (str6icmp(m, 'a', 'p', 'p', 'e', 'n', 'd')) {
870 r->type = MSG_REQ_REDIS_APPEND;
871 break;
872 }
873
874 if (str6icmp(m, 'b', 'i', 't', 'p', 'o', 's')) {
875 r->type = MSG_REQ_REDIS_BITPOS;
876 break;
877 }
878
879 if (str6icmp(m, 'd', 'e', 'c', 'r', 'b', 'y')) {
880 r->type = MSG_REQ_REDIS_DECRBY;
881 break;
882 }
883
884 if (str6icmp(m, 'e', 'x', 'i', 's', 't', 's')) {
885 r->type = MSG_REQ_REDIS_EXISTS;
886 break;
887 }
888
889 if (str6icmp(m, 'e', 'x', 'p', 'i', 'r', 'e')) {
890 r->type = MSG_REQ_REDIS_EXPIRE;
891 break;
892 }
893
894 if (str6icmp(m, 'g', 'e', 't', 'b', 'i', 't')) {
895 r->type = MSG_REQ_REDIS_GETBIT;
896 break;
897 }
898
899 if (str6icmp(m, 'g', 'e', 't', 's', 'e', 't')) {
900 r->type = MSG_REQ_REDIS_GETSET;
901 break;
902 }
903
904 if (str6icmp(m, 'p', 's', 'e', 't', 'e', 'x')) {
905 r->type = MSG_REQ_REDIS_PSETEX;
906 break;
907 }
908
909 if (str6icmp(m, 'h', 's', 'e', 't', 'n', 'x')) {
910 r->type = MSG_REQ_REDIS_HSETNX;
911 break;
912 }
913
914 if (str6icmp(m, 'i', 'n', 'c', 'r', 'b', 'y')) {
915 r->type = MSG_REQ_REDIS_INCRBY;
916 break;
917 }
918
919 if (str6icmp(m, 'l', 'i', 'n', 'd', 'e', 'x')) {
920 r->type = MSG_REQ_REDIS_LINDEX;
921 break;
922 }
923
924 if (str6icmp(m, 'l', 'p', 'u', 's', 'h', 'x')) {
925 r->type = MSG_REQ_REDIS_LPUSHX;
926 break;
927 }
928
929 if (str6icmp(m, 'l', 'r', 'a', 'n', 'g', 'e')) {
930 r->type = MSG_REQ_REDIS_LRANGE;
931 break;
932 }
933
934 if (str6icmp(m, 'r', 'p', 'u', 's', 'h', 'x')) {
935 r->type = MSG_REQ_REDIS_RPUSHX;
936 break;
937 }
938
939 if (str6icmp(m, 's', 'e', 't', 'b', 'i', 't')) {
940 r->type = MSG_REQ_REDIS_SETBIT;
941 break;
942 }
943
944 if (str6icmp(m, 's', 'i', 'n', 't', 'e', 'r')) {
945 r->type = MSG_REQ_REDIS_SINTER;
946 break;
947 }
948
949 if (str6icmp(m, 's', 't', 'r', 'l', 'e', 'n')) {
950 r->type = MSG_REQ_REDIS_STRLEN;
951 break;
952 }
953
954 if (str6icmp(m, 's', 'u', 'n', 'i', 'o', 'n')) {
955 r->type = MSG_REQ_REDIS_SUNION;
956 break;
957 }
958
959 if (str6icmp(m, 'z', 'c', 'o', 'u', 'n', 't')) {
960 r->type = MSG_REQ_REDIS_ZCOUNT;
961 break;
962 }
963
964 if (str6icmp(m, 'z', 'r', 'a', 'n', 'g', 'e')) {
965 r->type = MSG_REQ_REDIS_ZRANGE;
966 break;
967 }
968
969 if (str6icmp(m, 'z', 's', 'c', 'o', 'r', 'e')) {
970 r->type = MSG_REQ_REDIS_ZSCORE;
971 break;
972 }
973
974 if (str6icmp(m, 'g', 'e', 'o', 'p', 'o', 's')) {
975 r->type = MSG_REQ_REDIS_GEOPOS;
976 break;
977 }
978
979 if (str6icmp(m, 'g', 'e', 'o', 'a', 'd', 'd')) {
980 r->type = MSG_REQ_REDIS_GEOADD;
981 break;
982 }
983
984 if (str6icmp(m, 'g', 'e', 't', 'd', 'e', 'l')) {
985 r->type = MSG_REQ_REDIS_GETDEL;
986 break;
987 }
988
989 if (str6icmp(m, 'z', 'u', 'n', 'i', 'o', 'n')) {
990 r->type = MSG_REQ_REDIS_ZUNION;
991 break;
992 }
993
994 if (str6icmp(m, 'z', 'i', 'n', 't', 'e', 'r')) {
995 r->type = MSG_REQ_REDIS_ZINTER;
996 break;
997 }
998
999 if (str6icmp(m, 'u', 'n', 'l', 'i', 'n', 'k')) {
1000 r->type = MSG_REQ_REDIS_UNLINK;
1001 break;
1002 }
1003
1004 if (str6icmp(m, 'l', 'o', 'l', 'w', 'u', 't')) {
1005 r->type = MSG_REQ_REDIS_LOLWUT;
1006 if (!msg_set_placeholder_key(r)) {
1007 goto enomem;
1008 }
1009 break;
1010 }
1011
1012 break;
1013
1014 case 7:
1015 if (str7icmp(m, 'p', 'e', 'r', 's', 'i', 's', 't')) {
1016 r->type = MSG_REQ_REDIS_PERSIST;
1017 break;
1018 }
1019
1020 if (str7icmp(m, 'p', 'e', 'x', 'p', 'i', 'r', 'e')) {
1021 r->type = MSG_REQ_REDIS_PEXPIRE;
1022 break;
1023 }
1024
1025 if (str7icmp(m, 'h', 'e', 'x', 'i', 's', 't', 's')) {
1026 r->type = MSG_REQ_REDIS_HEXISTS;
1027 break;
1028 }
1029
1030 if (str7icmp(m, 'h', 'g', 'e', 't', 'a', 'l', 'l')) {
1031 r->type = MSG_REQ_REDIS_HGETALL;
1032 break;
1033 }
1034
1035 if (str7icmp(m, 'h', 'i', 'n', 'c', 'r', 'b', 'y')) {
1036 r->type = MSG_REQ_REDIS_HINCRBY;
1037 break;
1038 }
1039
1040 if (str7icmp(m, 'l', 'i', 'n', 's', 'e', 'r', 't')) {
1041 r->type = MSG_REQ_REDIS_LINSERT;
1042 break;
1043 }
1044
1045 if (str7icmp(m, 'z', 'i', 'n', 'c', 'r', 'b', 'y')) {
1046 r->type = MSG_REQ_REDIS_ZINCRBY;
1047 break;
1048 }
1049
1050 if (str7icmp(m, 'e', 'v', 'a', 'l', 's', 'h', 'a')) {
1051 r->type = MSG_REQ_REDIS_EVALSHA;
1052 break;
1053 }
1054
1055 if (str7icmp(m, 'r', 'e', 's', 't', 'o', 'r', 'e')) {
1056 r->type = MSG_REQ_REDIS_RESTORE;
1057 break;
1058 }
1059
1060 if (str7icmp(m, 'p', 'f', 'c', 'o', 'u', 'n', 't')) {
1061 r->type = MSG_REQ_REDIS_PFCOUNT;
1062 break;
1063 }
1064
1065 if (str7icmp(m, 'p', 'f', 'm', 'e', 'r', 'g', 'e')) {
1066 r->type = MSG_REQ_REDIS_PFMERGE;
1067 break;
1068 }
1069
1070 if (str7icmp(m, 'z', 'm', 's', 'c', 'o', 'r', 'e')) {
1071 r->type = MSG_REQ_REDIS_ZMSCORE;
1072 break;
1073 }
1074
1075 if (str7icmp(m, 'z', 'p', 'o', 'p', 'm', 'i', 'n')) {
1076 r->type = MSG_REQ_REDIS_ZPOPMIN;
1077 break;
1078 }
1079
1080 if (str7icmp(m, 'z', 'p', 'o', 'p', 'm', 'a', 'x')) {
1081 r->type = MSG_REQ_REDIS_ZPOPMAX;
1082 break;
1083 }
1084
1085 if (str7icmp(m, 'g', 'e', 'o', 'd', 'i', 's', 't')) {
1086 r->type = MSG_REQ_REDIS_GEODIST;
1087 break;
1088 }
1089
1090 if (str7icmp(m, 'g', 'e', 'o', 'h', 'a', 's', 'h')) {
1091 r->type = MSG_REQ_REDIS_GEOHASH;
1092 break;
1093 }
1094
1095 if (str7icmp(m, 'h', 's', 't', 'r', 'l', 'e', 'n')) {
1096 r->type = MSG_REQ_REDIS_HSTRLEN;
1097 break;
1098 }
1099
1100 if (str7icmp(m, 'c', 'o', 'm', 'm', 'a', 'n', 'd')) {
1101 r->type = MSG_REQ_REDIS_COMMAND;
1102 if (!msg_set_placeholder_key(r)) {
1103 goto enomem;
1104 }
1105 break;
1106 }
1107
1108 break;
1109
1110 case 8:
1111 if (str8icmp(m, 'e', 'x', 'p', 'i', 'r', 'e', 'a', 't')) {
1112 r->type = MSG_REQ_REDIS_EXPIREAT;
1113 break;
1114 }
1115
1116 if (str8icmp(m, 'b', 'i', 't', 'c', 'o', 'u', 'n', 't')) {
1117 r->type = MSG_REQ_REDIS_BITCOUNT;
1118 break;
1119 }
1120
1121 if (str8icmp(m, 'g', 'e', 't', 'r', 'a', 'n', 'g', 'e')) {
1122 r->type = MSG_REQ_REDIS_GETRANGE;
1123 break;
1124 }
1125
1126 if (str8icmp(m, 's', 'e', 't', 'r', 'a', 'n', 'g', 'e')) {
1127 r->type = MSG_REQ_REDIS_SETRANGE;
1128 break;
1129 }
1130
1131 if (str8icmp(m, 's', 'm', 'e', 'm', 'b', 'e', 'r', 's')) {
1132 r->type = MSG_REQ_REDIS_SMEMBERS;
1133 break;
1134 }
1135
1136 if (str8icmp(m, 'z', 'r', 'e', 'v', 'r', 'a', 'n', 'k')) {
1137 r->type = MSG_REQ_REDIS_ZREVRANK;
1138 break;
1139 }
1140
1141 if (str8icmp(m, 'b', 'i', 't', 'f', 'i', 'e', 'l', 'd')) {
1142 r->type = MSG_REQ_REDIS_BITFIELD;
1143 break;
1144 }
1145
1146 break;
1147
1148 case 9:
1149 if (str9icmp(m, 'p', 'e', 'x', 'p', 'i', 'r', 'e', 'a', 't')) {
1150 r->type = MSG_REQ_REDIS_PEXPIREAT;
1151 break;
1152 }
1153
1154 if (str9icmp(m, 'r', 'p', 'o', 'p', 'l', 'p', 'u', 's', 'h')) {
1155 r->type = MSG_REQ_REDIS_RPOPLPUSH;
1156 break;
1157 }
1158
1159 if (str9icmp(m, 's', 'i', 's', 'm', 'e', 'm', 'b', 'e', 'r')) {
1160 r->type = MSG_REQ_REDIS_SISMEMBER;
1161 break;
1162 }
1163
1164 if (str9icmp(m, 'z', 'r', 'e', 'v', 'r', 'a', 'n', 'g', 'e')) {
1165 r->type = MSG_REQ_REDIS_ZREVRANGE;
1166 break;
1167 }
1168
1169 if (str9icmp(m, 'z', 'l', 'e', 'x', 'c', 'o', 'u', 'n', 't')) {
1170 r->type = MSG_REQ_REDIS_ZLEXCOUNT;
1171 break;
1172 }
1173
1174 if (str9icmp(m, 'g', 'e', 'o', 's', 'e', 'a', 'r', 'c', 'h')) {
1175 r->type = MSG_REQ_REDIS_GEOSEARCH;
1176 break;
1177 }
1178
1179 if (str9icmp(m, 'g', 'e', 'o', 'r', 'a', 'd', 'i', 'u', 's')) {
1180 r->type = MSG_REQ_REDIS_GEORADIUS;
1181 break;
1182 }
1183
1184 break;
1185
1186 case 10:
1187 if (str10icmp(m, 's', 'd', 'i', 'f', 'f', 's', 't', 'o', 'r', 'e')) {
1188 r->type = MSG_REQ_REDIS_SDIFFSTORE;
1189 break;
1190 }
1191
1192 if (str10icmp(m, 'h', 'r', 'a', 'n', 'd', 'f', 'i', 'e', 'l', 'd')) {
1193 r->type = MSG_REQ_REDIS_HRANDFIELD;
1194 break;
1195 }
1196
1197 if (str10icmp(m, 's', 'm', 'i', 's', 'm', 'e', 'm', 'b', 'e', 'r')) {
1198 r->type = MSG_REQ_REDIS_SMISMEMBER;
1199 break;
1200 }
1201
1202 if (str10icmp(m, 'z', 'd', 'i', 'f', 'f', 's', 't', 'o', 'r', 'e')) {
1203 r->type = MSG_REQ_REDIS_ZDIFFSTORE;
1204 break;
1205 }
1206
1207
1208 break;
1209
1210 case 11:
1211 if (str11icmp(m, 'i', 'n', 'c', 'r', 'b', 'y', 'f', 'l', 'o', 'a', 't')) {
1212 r->type = MSG_REQ_REDIS_INCRBYFLOAT;
1213 break;
1214 }
1215
1216 if (str11icmp(m, 's', 'i', 'n', 't', 'e', 'r', 's', 't', 'o', 'r', 'e')) {
1217 r->type = MSG_REQ_REDIS_SINTERSTORE;
1218 break;
1219 }
1220
1221 if (str11icmp(m, 's', 'r', 'a', 'n', 'd', 'm', 'e', 'm', 'b', 'e', 'r')) {
1222 r->type = MSG_REQ_REDIS_SRANDMEMBER;
1223 break;
1224 }
1225
1226 if (str11icmp(m, 's', 'u', 'n', 'i', 'o', 'n', 's', 't', 'o', 'r', 'e')) {
1227 r->type = MSG_REQ_REDIS_SUNIONSTORE;
1228 break;
1229 }
1230
1231 if (str11icmp(m, 'z', 'i', 'n', 't', 'e', 'r', 's', 't', 'o', 'r', 'e')) {
1232 r->type = MSG_REQ_REDIS_ZINTERSTORE;
1233 break;
1234 }
1235
1236 if (str11icmp(m, 'z', 'u', 'n', 'i', 'o', 'n', 's', 't', 'o', 'r', 'e')) {
1237 r->type = MSG_REQ_REDIS_ZUNIONSTORE;
1238 break;
1239 }
1240
1241 if (str11icmp(m, 'z', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 'l', 'e', 'x')) {
1242 r->type = MSG_REQ_REDIS_ZRANGEBYLEX;
1243 break;
1244 }
1245
1246 if (str11icmp(m, 'z', 'r', 'a', 'n', 'd', 'm', 'e', 'm', 'b', 'e', 'r')) {
1247 r->type = MSG_REQ_REDIS_ZRANDMEMBER;
1248 break;
1249 }
1250
1251 if (str11icmp(m, 'z', 'r', 'a', 'n', 'g', 'e', 's', 't', 'o', 'r', 'e')) {
1252 r->type = MSG_REQ_REDIS_ZRANGESTORE;
1253 break;
1254 }
1255
1256 break;
1257
1258 case 12:
1259 if (str12icmp(m, 'h', 'i', 'n', 'c', 'r', 'b', 'y', 'f', 'l', 'o', 'a', 't')) {
1260 r->type = MSG_REQ_REDIS_HINCRBYFLOAT;
1261 break;
1262 }
1263
1264
1265 break;
1266
1267 case 13:
1268 if (str13icmp(m, 'z', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 's', 'c', 'o', 'r', 'e')) {
1269 r->type = MSG_REQ_REDIS_ZRANGEBYSCORE;
1270 break;
1271 }
1272
1273 break;
1274
1275 case 14:
1276 if (str14icmp(m, 'z', 'r', 'e', 'm', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 'l', 'e', 'x')) {
1277 r->type = MSG_REQ_REDIS_ZREMRANGEBYLEX;
1278 break;
1279 }
1280
1281 if (str14icmp(m, 'z', 'r', 'e', 'v', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 'l', 'e', 'x')) {
1282 r->type = MSG_REQ_REDIS_ZREVRANGEBYLEX;
1283 break;
1284 }
1285 if (str14icmp(m, 'g', 'e', 'o', 's', 'e', 'a', 'r', 'c', 'h', 's', 't', 'o', 'r', 'e')) {
1286 r->type = MSG_REQ_REDIS_GEOSEARCHSTORE;
1287 break;
1288 }
1289
1290
1291 break;
1292
1293 case 15:
1294 if (str15icmp(m, 'z', 'r', 'e', 'm', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 'r', 'a', 'n', 'k')) {
1295 r->type = MSG_REQ_REDIS_ZREMRANGEBYRANK;
1296 break;
1297 }
1298
1299 break;
1300
1301 case 16:
1302 if (str16icmp(m, 'z', 'r', 'e', 'm', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 's', 'c', 'o', 'r', 'e')) {
1303 r->type = MSG_REQ_REDIS_ZREMRANGEBYSCORE;
1304 break;
1305 }
1306
1307 if (str16icmp(m, 'z', 'r', 'e', 'v', 'r', 'a', 'n', 'g', 'e', 'b', 'y', 's', 'c', 'o', 'r', 'e')) {
1308 r->type = MSG_REQ_REDIS_ZREVRANGEBYSCORE;
1309 break;
1310 }
1311 break;
1312
1313 case 17:
1314 if (str17icmp(m, 'g', 'e', 'o', 'r', 'a', 'd', 'i', 'u', 's', 'b', 'y', 'm', 'e', 'm', 'b', 'e', 'r')) {
1315 r->type = MSG_REQ_REDIS_GEORADIUSBYMEMBER;
1316 break;
1317 }
1318
1319 default:
1320 break;
1321 }
1322
1323 if (r->type == MSG_UNKNOWN) {
1324 log_error("parsed unsupported command '%.*s'", (int)(p - m), m);
1325 goto error;
1326 }
1327
1328 log_debug(LOG_VERB, "parsed command '%.*s'", (int)(p - m), m);
1329
1330 state = SW_REQ_TYPE_LF;
1331 break;
1332
1333 case SW_REQ_TYPE_LF:
1334 switch (ch) {
1335 case LF:
1336 if (redis_argz(r)) {
1337 if (r->narg != 1) {
1338 /* It's an error to provide more than one argument. */
1339 goto error;
1340 }
1341 goto done;
1342 } else if (redis_nokey(r)) {
1343 if (r->narg == 1) {
1344 goto done;
1345 }
1346 state = SW_ARGN_LEN;
1347 } else if (r->narg == 1) {
1348 goto error;
1349 } else if (redis_argeval(r)) {
1350 state = SW_ARG1_LEN;
1351 } else {
1352 state = SW_KEY_LEN;
1353 }
1354 break;
1355
1356 default:
1357 goto error;
1358 }
1359
1360 break;
1361
1362 case SW_KEY_LEN:
1363 if (r->token == NULL) {
1364 if (ch != '$') {
1365 goto error;
1366 }
1367 r->token = p;
1368 r->rlen = 0;
1369 } else if (isdigit(ch)) {
1370 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
1371 } else if (ch == CR) {
1372 if (r->rlen >= mbuf_data_size()) {
1373 log_error("parsed bad req %"PRIu64" of type %d with key "
1374 "length %d that greater than or equal to maximum"
1375 " redis key length of %zu", r->id, r->type,
1376 r->rlen, mbuf_data_size());
1377 goto error;
1378 }
1379 if (r->rnarg == 0) {
1380 goto error;
1381 }
1382 r->rnarg--;
1383 r->token = NULL;
1384 state = SW_KEY_LEN_LF;
1385 } else {
1386 goto error;
1387 }
1388
1389 break;
1390
1391 case SW_KEY_LEN_LF:
1392 switch (ch) {
1393 case LF:
1394 state = SW_KEY;
1395 break;
1396
1397 default:
1398 goto error;
1399 }
1400
1401 break;
1402
1403 case SW_KEY:
1404 if (r->token == NULL) {
1405 r->token = p;
1406 }
1407
1408 m = r->token + r->rlen;
1409 if (m >= b->last) {
1410 m = b->last - 1;
1411 p = m;
1412 break;
1413 }
1414
1415 if (*m != CR) {
1416 goto error;
1417 } else { /* got a key */
1418 struct keypos *kpos;
1419
1420 p = m; /* move forward by rlen bytes */
1421 r->rlen = 0;
1422 m = r->token;
1423 r->token = NULL;
1424
1425 kpos = array_push(r->keys);
1426 if (kpos == NULL) {
1427 goto enomem;
1428 }
1429 kpos->start = m;
1430 kpos->end = p;
1431
1432 state = SW_KEY_LF;
1433 }
1434
1435 break;
1436
1437 case SW_KEY_LF:
1438 switch (ch) {
1439 case LF:
1440 if (redis_arg0(r)) {
1441 if (r->rnarg != 0) {
1442 goto error;
1443 }
1444 goto done;
1445 } else if (redis_arg1(r)) {
1446 if (r->rnarg != 1) {
1447 goto error;
1448 }
1449 state = SW_ARG1_LEN;
1450 } else if (redis_arg2(r)) {
1451 if (r->rnarg != 2) {
1452 goto error;
1453 }
1454 state = SW_ARG1_LEN;
1455 } else if (redis_arg3(r)) {
1456 if (r->rnarg != 3) {
1457 goto error;
1458 }
1459 state = SW_ARG1_LEN;
1460 } else if (redis_argn(r)) {
1461 if (r->rnarg == 0) {
1462 goto done;
1463 }
1464 state = SW_ARG1_LEN;
1465 } else if (redis_argx(r)) {
1466 if (r->rnarg == 0) {
1467 goto done;
1468 }
1469 state = SW_KEY_LEN;
1470 } else if (redis_argkvx(r)) {
1471 if (r->narg % 2 == 0) {
1472 goto error;
1473 }
1474 state = SW_ARG1_LEN;
1475 } else if (redis_argeval(r)) {
1476 if (r->rnarg == 0) {
1477 goto done;
1478 }
1479 state = SW_ARGN_LEN;
1480 } else {
1481 goto error;
1482 }
1483
1484 break;
1485
1486 default:
1487 goto error;
1488 }
1489
1490 break;
1491
1492 case SW_ARG1_LEN:
1493 if (r->token == NULL) {
1494 if (ch != '$') {
1495 goto error;
1496 }
1497 r->rlen = 0;
1498 r->token = p;
1499 } else if (isdigit(ch)) {
1500 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
1501 } else if (ch == CR) {
1502 if ((p - r->token) <= 1 || r->rnarg == 0) {
1503 goto error;
1504 }
1505 r->rnarg--;
1506 r->token = NULL;
1507 state = SW_ARG1_LEN_LF;
1508 } else {
1509 goto error;
1510 }
1511
1512 break;
1513
1514 case SW_ARG1_LEN_LF:
1515 switch (ch) {
1516 case LF:
1517 state = SW_ARG1;
1518 break;
1519
1520 default:
1521 goto error;
1522 }
1523
1524 break;
1525
1526 case SW_ARG1:
1527 m = p + r->rlen;
1528 if (m >= b->last) {
1529 r->rlen -= (uint32_t)(b->last - p);
1530 m = b->last - 1;
1531 p = m;
1532 break;
1533 }
1534
1535 if (*m != CR) {
1536 goto error;
1537 }
1538
1539 p = m; /* move forward by rlen bytes */
1540 r->rlen = 0;
1541
1542 state = SW_ARG1_LF;
1543
1544 break;
1545
1546 case SW_ARG1_LF:
1547 switch (ch) {
1548 case LF:
1549 if (redis_arg1(r)) {
1550 if (r->rnarg != 0) {
1551 goto error;
1552 }
1553 goto done;
1554 } else if (redis_arg2(r)) {
1555 if (r->rnarg != 1) {
1556 goto error;
1557 }
1558 state = SW_ARG2_LEN;
1559 } else if (redis_arg3(r)) {
1560 if (r->rnarg != 2) {
1561 goto error;
1562 }
1563 state = SW_ARG2_LEN;
1564 } else if (redis_argn(r)) {
1565 if (r->rnarg == 0) {
1566 goto done;
1567 }
1568 state = SW_ARGN_LEN;
1569 } else if (redis_argeval(r)) {
1570 if (r->rnarg < 2) {
1571 goto error;
1572 }
1573 state = SW_ARG2_LEN;
1574 } else if (redis_argkvx(r)) {
1575 if (r->rnarg == 0) {
1576 goto done;
1577 }
1578 state = SW_KEY_LEN;
1579 } else {
1580 goto error;
1581 }
1582
1583 break;
1584
1585 default:
1586 goto error;
1587 }
1588
1589 break;
1590
1591 case SW_ARG2_LEN:
1592 if (r->token == NULL) {
1593 if (ch != '$') {
1594 goto error;
1595 }
1596 r->rlen = 0;
1597 r->token = p;
1598 } else if (isdigit(ch)) {
1599 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
1600 } else if (ch == CR) {
1601 if ((p - r->token) <= 1 || r->rnarg == 0) {
1602 goto error;
1603 }
1604 r->rnarg--;
1605 r->token = NULL;
1606 state = SW_ARG2_LEN_LF;
1607 } else {
1608 goto error;
1609 }
1610
1611 break;
1612
1613 case SW_ARG2_LEN_LF:
1614 switch (ch) {
1615 case LF:
1616 state = SW_ARG2;
1617 break;
1618
1619 default:
1620 goto error;
1621 }
1622
1623 break;
1624
1625 case SW_ARG2:
1626 if (r->token == NULL && redis_argeval(r)) {
1627 /*
1628 * For EVAL/EVALSHA, ARG2 represents the # key/arg pairs which must
1629 * be tokenized and stored in contiguous memory.
1630 */
1631 r->token = p;
1632 }
1633
1634 m = p + r->rlen;
1635 if (m >= b->last) {
1636 r->rlen -= (uint32_t)(b->last - p);
1637 m = b->last - 1;
1638 p = m;
1639 break;
1640 }
1641
1642 if (*m != CR) {
1643 goto error;
1644 }
1645
1646 p = m; /* move forward by rlen bytes */
1647 r->rlen = 0;
1648
1649 if (redis_argeval(r)) {
1650 uint32_t nkey;
1651 uint8_t *chp;
1652
1653 /*
1654 * For EVAL/EVALSHA, we need to find the integer value of this
1655 * argument. It tells us the number of keys in the script, and
1656 * we need to error out if number of keys is 0. At this point,
1657 * both p and m point to the end of the argument and r->token
1658 * points to the start.
1659 */
1660 if (p - r->token < 1) {
1661 goto error;
1662 }
1663
1664 for (nkey = 0, chp = r->token; chp < p; chp++) {
1665 if (isdigit(*chp)) {
1666 nkey = nkey * 10 + (uint32_t)(*chp - '0');
1667 } else {
1668 goto error;
1669 }
1670 }
1671 if (nkey == 0) {
1672 goto error;
1673 }
1674
1675 r->token = NULL;
1676 }
1677
1678 state = SW_ARG2_LF;
1679
1680 break;
1681
1682 case SW_ARG2_LF:
1683 switch (ch) {
1684 case LF:
1685 if (redis_arg2(r)) {
1686 if (r->rnarg != 0) {
1687 goto error;
1688 }
1689 goto done;
1690 } else if (redis_arg3(r)) {
1691 if (r->rnarg != 1) {
1692 goto error;
1693 }
1694 state = SW_ARG3_LEN;
1695 } else if (redis_argn(r)) {
1696 if (r->rnarg == 0) {
1697 goto done;
1698 }
1699 state = SW_ARGN_LEN;
1700 } else if (redis_argeval(r)) {
1701 if (r->rnarg < 1) {
1702 goto error;
1703 }
1704 state = SW_KEY_LEN;
1705 } else {
1706 goto error;
1707 }
1708
1709 break;
1710
1711 default:
1712 goto error;
1713 }
1714
1715 break;
1716
1717 case SW_ARG3_LEN:
1718 if (r->token == NULL) {
1719 if (ch != '$') {
1720 goto error;
1721 }
1722 r->rlen = 0;
1723 r->token = p;
1724 } else if (isdigit(ch)) {
1725 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
1726 } else if (ch == CR) {
1727 if ((p - r->token) <= 1 || r->rnarg == 0) {
1728 goto error;
1729 }
1730 r->rnarg--;
1731 r->token = NULL;
1732 state = SW_ARG3_LEN_LF;
1733 } else {
1734 goto error;
1735 }
1736
1737 break;
1738
1739 case SW_ARG3_LEN_LF:
1740 switch (ch) {
1741 case LF:
1742 state = SW_ARG3;
1743 break;
1744
1745 default:
1746 goto error;
1747 }
1748
1749 break;
1750
1751 case SW_ARG3:
1752 m = p + r->rlen;
1753 if (m >= b->last) {
1754 r->rlen -= (uint32_t)(b->last - p);
1755 m = b->last - 1;
1756 p = m;
1757 break;
1758 }
1759
1760 if (*m != CR) {
1761 goto error;
1762 }
1763
1764 p = m; /* move forward by rlen bytes */
1765 r->rlen = 0;
1766 state = SW_ARG3_LF;
1767
1768 break;
1769
1770 case SW_ARG3_LF:
1771 switch (ch) {
1772 case LF:
1773 if (redis_arg3(r)) {
1774 if (r->rnarg != 0) {
1775 goto error;
1776 }
1777 goto done;
1778 } else if (redis_argn(r)) {
1779 if (r->rnarg == 0) {
1780 goto done;
1781 }
1782 state = SW_ARGN_LEN;
1783 } else {
1784 goto error;
1785 }
1786
1787 break;
1788
1789 default:
1790 goto error;
1791 }
1792
1793 break;
1794
1795 case SW_ARGN_LEN:
1796 if (r->token == NULL) {
1797 if (ch != '$') {
1798 goto error;
1799 }
1800 r->rlen = 0;
1801 r->token = p;
1802 } else if (isdigit(ch)) {
1803 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
1804 } else if (ch == CR) {
1805 if ((p - r->token) <= 1 || r->rnarg == 0) {
1806 goto error;
1807 }
1808 r->rnarg--;
1809 r->token = NULL;
1810 state = SW_ARGN_LEN_LF;
1811 } else {
1812 goto error;
1813 }
1814
1815 break;
1816
1817 case SW_ARGN_LEN_LF:
1818 switch (ch) {
1819 case LF:
1820 state = SW_ARGN;
1821 break;
1822
1823 default:
1824 goto error;
1825 }
1826
1827 break;
1828
1829 case SW_ARGN:
1830 m = p + r->rlen;
1831 if (m >= b->last) {
1832 r->rlen -= (uint32_t)(b->last - p);
1833 m = b->last - 1;
1834 p = m;
1835 break;
1836 }
1837
1838 if (*m != CR) {
1839 goto error;
1840 }
1841
1842 p = m; /* move forward by rlen bytes */
1843 r->rlen = 0;
1844 state = SW_ARGN_LF;
1845
1846 break;
1847
1848 case SW_ARGN_LF:
1849 switch (ch) {
1850 case LF:
1851 if (redis_argn(r) || redis_argeval(r) || redis_nokey(r)) {
1852 if (r->rnarg == 0) {
1853 goto done;
1854 }
1855 state = SW_ARGN_LEN;
1856 } else {
1857 goto error;
1858 }
1859
1860 break;
1861
1862 default:
1863 goto error;
1864 }
1865
1866 break;
1867
1868 case SW_SENTINEL:
1869 default:
1870 NOT_REACHED();
1871 break;
1872 }
1873 }
1874
1875 ASSERT(p == b->last);
1876 r->pos = p;
1877 r->state = state;
1878
1879 if (b->last == b->end && r->token != NULL) {
1880 r->pos = r->token;
1881 r->token = NULL;
1882 r->result = MSG_PARSE_REPAIR;
1883 } else {
1884 r->result = MSG_PARSE_AGAIN;
1885 }
1886
1887 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed req %"PRIu64" res %d "
1888 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
1889 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
1890 return;
1891
1892 done:
1893 ASSERT(r->type > MSG_UNKNOWN && r->type < MSG_SENTINEL);
1894 r->pos = p + 1;
1895 ASSERT(r->pos <= b->last);
1896 r->state = SW_START;
1897 r->token = NULL;
1898 r->result = MSG_PARSE_OK;
1899
1900 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed req %"PRIu64" res %d "
1901 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
1902 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
1903 return;
1904
1905 enomem:
1906 r->result = MSG_PARSE_ERROR;
1907 r->state = state;
1908
1909 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "out of memory on parse req %"PRIu64" "
1910 "res %d type %d state %d", r->id, r->result, r->type, r->state);
1911
1912 return;
1913
1914 error:
1915 r->result = MSG_PARSE_ERROR;
1916 r->state = state;
1917 errno = EINVAL;
1918
1919 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "parsed bad req %"PRIu64" "
1920 "res %d type %d state %d", r->id, r->result, r->type,
1921 r->state);
1922 }
1923
1924 /*
1925 * Reference: http://redis.io/topics/protocol
1926 *
1927 * Redis will reply to commands with different kinds of replies. It is
1928 * possible to check the kind of reply from the first byte sent by the
1929 * server:
1930 * - with a single line reply the first byte of the reply will be "+"
1931 * - with an error message the first byte of the reply will be "-"
1932 * - with an integer number the first byte of the reply will be ":"
1933 * - with bulk reply the first byte of the reply will be "$"
1934 * - with multi-bulk reply the first byte of the reply will be "*"
1935 *
1936 * 1). Status reply (or single line reply) is in the form of a single line
1937 * string starting with "+" terminated by "\r\n".
1938 * 2). Error reply are similar to status replies. The only difference is
1939 * that the first byte is "-" instead of "+".
1940 * 3). Integer reply is just a CRLF terminated string representing an
1941 * integer, and prefixed by a ":" byte.
1942 * 4). Bulk reply is used by server to return a single binary safe string.
1943 * The first reply line is a "$" byte followed by the number of bytes
1944 * of the actual reply, followed by CRLF, then the actual data bytes,
1945 * followed by additional two bytes for the final CRLF. If the requested
1946 * value does not exist the bulk reply will use the special value '-1'
1947 * as the data length.
1948 * 5). Multi-bulk reply is used by the server to return many binary safe
1949 * strings (bulks) with the initial line indicating how many bulks that
1950 * will follow. The first byte of a multi bulk reply is always *.
1951 */
1952 void
redis_parse_rsp(struct msg * r)1953 redis_parse_rsp(struct msg *r)
1954 {
1955 struct mbuf *b;
1956 uint8_t *p, *m;
1957 uint8_t ch;
1958
1959 enum {
1960 SW_START,
1961 SW_STATUS,
1962 SW_ERROR,
1963 SW_INTEGER_START,
1964 SW_SIMPLE,
1965 SW_BULK,
1966 SW_BULK_LF,
1967 SW_BULK_ARG,
1968 SW_BULK_ARG_LF,
1969 SW_MULTIBULK,
1970 SW_MULTIBULK_NARG_LF,
1971 SW_MULTIBULK_ARGN_LEN,
1972 SW_MULTIBULK_ARGN_LEN_LF,
1973 SW_MULTIBULK_ARGN,
1974 SW_MULTIBULK_ARGN_LF,
1975 SW_RUNTO_CRLF,
1976 SW_ALMOST_DONE,
1977 SW_SENTINEL
1978 } state;
1979
1980 state = r->state;
1981 b = STAILQ_LAST(&r->mhdr, mbuf, next);
1982
1983 ASSERT(!r->request);
1984 ASSERT(state >= SW_START && state < SW_SENTINEL);
1985 ASSERT(b != NULL);
1986 ASSERT(b->pos <= b->last);
1987
1988 /* validate the parsing marker */
1989 ASSERT(r->pos != NULL);
1990 ASSERT(r->pos >= b->pos && r->pos <= b->last);
1991
1992 for (p = r->pos; p < b->last; p++) {
1993 ch = *p;
1994
1995 switch (state) {
1996 case SW_START:
1997 r->type = MSG_UNKNOWN;
1998 r->rnarg = 1;
1999 r->is_top_level = 1;
2000
2001 switch (ch) {
2002 case '+':
2003 p = p - 1; /* go back by 1 byte */
2004 r->type = MSG_RSP_REDIS_STATUS;
2005 state = SW_STATUS;
2006 break;
2007
2008 case '-':
2009 r->type = MSG_RSP_REDIS_ERROR;
2010 p = p - 1; /* go back by 1 byte */
2011 state = SW_ERROR;
2012 break;
2013
2014 case ':':
2015 r->type = MSG_RSP_REDIS_INTEGER;
2016 r->integer = 0;
2017 state = SW_INTEGER_START;
2018 break;
2019
2020 case '$':
2021 r->type = MSG_RSP_REDIS_BULK;
2022 p = p - 1; /* go back by 1 byte */
2023 state = SW_BULK;
2024 break;
2025
2026 case '*':
2027 r->type = MSG_RSP_REDIS_MULTIBULK;
2028 p = p - 1; /* go back by 1 byte */
2029 state = SW_MULTIBULK;
2030 break;
2031
2032 default:
2033 goto error;
2034 }
2035
2036 break;
2037
2038 case SW_STATUS:
2039 /* rsp_start <- p */
2040 state = SW_RUNTO_CRLF;
2041 break;
2042
2043 case SW_ERROR:
2044 if (r->token == NULL) {
2045 if (ch != '-') {
2046 goto error;
2047 }
2048 /* rsp_start <- p */
2049 r->token = p;
2050 }
2051 if (ch == ' ' || ch == CR) {
2052 m = r->token;
2053 r->token = NULL;
2054 switch (p - m) {
2055
2056 case 4:
2057 /*
2058 * -ERR no such key\r\n
2059 * -ERR syntax error\r\n
2060 * -ERR source and destination objects are the same\r\n
2061 * -ERR index out of range\r\n
2062 */
2063 if (str4cmp(m, '-', 'E', 'R', 'R')) {
2064 r->type = MSG_RSP_REDIS_ERROR_ERR;
2065 break;
2066 }
2067
2068 /* -OOM command not allowed when used memory > 'maxmemory'.\r\n */
2069 if (str4cmp(m, '-', 'O', 'O', 'M')) {
2070 r->type = MSG_RSP_REDIS_ERROR_OOM;
2071 break;
2072 }
2073
2074 break;
2075
2076 case 5:
2077 /* -BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n" */
2078 if (str5cmp(m, '-', 'B', 'U', 'S', 'Y')) {
2079 r->type = MSG_RSP_REDIS_ERROR_BUSY;
2080 break;
2081 }
2082
2083 break;
2084
2085 case 7:
2086 /* -NOAUTH Authentication required.\r\n */
2087 if (str7cmp(m, '-', 'N', 'O', 'A', 'U', 'T', 'H')) {
2088 r->type = MSG_RSP_REDIS_ERROR_NOAUTH;
2089 break;
2090 }
2091
2092 break;
2093
2094 case 8:
2095 /* rsp: "-LOADING Redis is loading the dataset in memory\r\n" */
2096 if (str8cmp(m, '-', 'L', 'O', 'A', 'D', 'I', 'N', 'G')) {
2097 r->type = MSG_RSP_REDIS_ERROR_LOADING;
2098 break;
2099 }
2100
2101 /* -BUSYKEY Target key name already exists.\r\n */
2102 if (str8cmp(m, '-', 'B', 'U', 'S', 'Y', 'K', 'E', 'Y')) {
2103 r->type = MSG_RSP_REDIS_ERROR_BUSYKEY;
2104 break;
2105 }
2106
2107 /* "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n" */
2108 if (str8cmp(m, '-', 'M', 'I', 'S', 'C', 'O', 'N', 'F')) {
2109 r->type = MSG_RSP_REDIS_ERROR_MISCONF;
2110 break;
2111 }
2112
2113 break;
2114
2115 case 9:
2116 /* -NOSCRIPT No matching script. Please use EVAL.\r\n */
2117 if (str9cmp(m, '-', 'N', 'O', 'S', 'C', 'R', 'I', 'P', 'T')) {
2118 r->type = MSG_RSP_REDIS_ERROR_NOSCRIPT;
2119 break;
2120 }
2121
2122 /* -READONLY You can't write against a read only slave.\r\n */
2123 if (str9cmp(m, '-', 'R', 'E', 'A', 'D', 'O', 'N', 'L', 'Y')) {
2124 r->type = MSG_RSP_REDIS_ERROR_READONLY;
2125 break;
2126 }
2127
2128 break;
2129
2130 case 10:
2131 /* -WRONGTYPE Operation against a key holding the wrong kind of value\r\n */
2132 if (str10cmp(m, '-', 'W', 'R', 'O', 'N', 'G', 'T', 'Y', 'P', 'E')) {
2133 r->type = MSG_RSP_REDIS_ERROR_WRONGTYPE;
2134 break;
2135 }
2136
2137 /* -EXECABORT Transaction discarded because of previous errors.\r\n" */
2138 if (str10cmp(m, '-', 'E', 'X', 'E', 'C', 'A', 'B', 'O', 'R', 'T')) {
2139 r->type = MSG_RSP_REDIS_ERROR_EXECABORT;
2140 break;
2141 }
2142
2143 break;
2144
2145 case 11:
2146 /* -MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n */
2147 if (str11cmp(m, '-', 'M', 'A', 'S', 'T', 'E', 'R', 'D', 'O', 'W', 'N')) {
2148 r->type = MSG_RSP_REDIS_ERROR_MASTERDOWN;
2149 break;
2150 }
2151
2152 /* -NOREPLICAS Not enough good slaves to write.\r\n */
2153 if (str11cmp(m, '-', 'N', 'O', 'R', 'E', 'P', 'L', 'I', 'C', 'A', 'S')) {
2154 r->type = MSG_RSP_REDIS_ERROR_NOREPLICAS;
2155 break;
2156 }
2157
2158 break;
2159 }
2160 if (ch == '\r') {
2161 state = SW_ALMOST_DONE;
2162 } else {
2163 /* Read remaining characters until '\r' */
2164 state = SW_RUNTO_CRLF;
2165 }
2166 }
2167
2168 break;
2169
2170 case SW_SIMPLE:
2171 if (ch == CR) {
2172 ASSERT(r->rnarg > 0);
2173 r->rnarg--;
2174 state = SW_MULTIBULK_ARGN_LF;
2175 }
2176 break;
2177
2178 case SW_INTEGER_START:
2179 if (ch == CR) {
2180 state = SW_ALMOST_DONE;
2181 } else if (ch == '-') {
2182 ;
2183 } else if (isdigit(ch)) {
2184 r->integer = r->integer * 10 + (uint32_t)(ch - '0');
2185 } else {
2186 goto error;
2187 }
2188 break;
2189
2190 case SW_RUNTO_CRLF:
2191 switch (ch) {
2192 case CR:
2193 state = SW_ALMOST_DONE;
2194 break;
2195
2196 default:
2197 break;
2198 }
2199
2200 break;
2201
2202 case SW_ALMOST_DONE:
2203 switch (ch) {
2204 case LF:
2205 /* rsp_end <- p */
2206 goto done;
2207
2208 default:
2209 goto error;
2210 }
2211
2212 break;
2213
2214 case SW_BULK:
2215 /*
2216 * SW_BULK is used for top-level bulk string replies.
2217 * Within an array, SW_MULTIBULK_ARG... helpers are used
2218 * to parse bulk strings instead.
2219 */
2220 if (r->token == NULL) {
2221 if (ch != '$') {
2222 goto error;
2223 }
2224 /* rsp_start <- p */
2225 r->token = p;
2226 r->rlen = 0;
2227 } else if (ch == '-') {
2228 /* handles null bulk reply = '$-1' */
2229 state = SW_RUNTO_CRLF;
2230 } else if (isdigit(ch)) {
2231 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
2232 } else if (ch == CR) {
2233 if ((p - r->token) <= 1) {
2234 goto error;
2235 }
2236 r->token = NULL;
2237 state = SW_BULK_LF;
2238 } else {
2239 goto error;
2240 }
2241
2242 break;
2243
2244 case SW_BULK_LF:
2245 switch (ch) {
2246 case LF:
2247 state = SW_BULK_ARG;
2248 break;
2249
2250 default:
2251 goto error;
2252 }
2253
2254 break;
2255
2256 case SW_BULK_ARG:
2257 m = p + r->rlen;
2258 if (m >= b->last) {
2259 r->rlen -= (uint32_t)(b->last - p);
2260 m = b->last - 1;
2261 p = m;
2262 break;
2263 }
2264
2265 if (*m != CR) {
2266 goto error;
2267 }
2268
2269 p = m; /* move forward by rlen bytes */
2270 r->rlen = 0;
2271
2272 state = SW_BULK_ARG_LF;
2273
2274 break;
2275
2276 case SW_BULK_ARG_LF:
2277 switch (ch) {
2278 case LF:
2279 goto done;
2280
2281 default:
2282 goto error;
2283 }
2284
2285 break;
2286
2287 case SW_MULTIBULK:
2288 if (r->token == NULL) {
2289 if (ch != '*') {
2290 goto error;
2291 }
2292 r->vlen = 0;
2293 r->token = p;
2294 /* rsp_start <- p */
2295 if (r->is_top_level) {
2296 r->narg_start = p;
2297 }
2298 } else if (ch == '-') {
2299 p = p-1;
2300 r->token = NULL;
2301 /*
2302 * This is a null array (e.g. from BLPOP). Don't increment rnarg
2303 * https://redis.io/topics/protocol
2304 */
2305 r->vlen = 1;
2306 state = SW_MULTIBULK_ARGN_LEN;
2307 } else if (isdigit(ch)) {
2308 r->vlen = r->vlen * 10 + (uint32_t)(ch - '0');
2309 } else if (ch == CR) {
2310 if ((p - r->token) <= 1) {
2311 goto error;
2312 }
2313
2314 if (r->is_top_level) {
2315 /* For multiget responses, we may need to know the number of responses to combine them. */
2316 r->narg = r->vlen;
2317 r->narg_end = p;
2318 }
2319 r->is_top_level = 0;
2320 ASSERT(r->rnarg > 0);
2321 r->rnarg += r->vlen - 1;
2322 r->token = NULL;
2323
2324 /*
2325 * The stack is always initialized before transitioning
2326 * to another state.
2327 */
2328 state = SW_MULTIBULK_NARG_LF;
2329 } else {
2330 goto error;
2331 }
2332
2333 break;
2334
2335 case SW_MULTIBULK_NARG_LF:
2336 switch (ch) {
2337 case LF:
2338 if (r->rnarg == 0) {
2339 /* response is '*0\r\n' */
2340 goto done;
2341 }
2342
2343 state = SW_MULTIBULK_ARGN_LEN;
2344 break;
2345
2346 default:
2347 goto error;
2348 }
2349
2350 break;
2351
2352 case SW_MULTIBULK_ARGN_LEN:
2353 if (r->token == NULL) {
2354 /*
2355 * From: http://redis.io/topics/protocol, a multi bulk reply
2356 * is used to return an array of other replies. Every element
2357 * of a multi bulk reply can be of any kind, including a
2358 * nested multi bulk reply.
2359 *
2360 * Here, we only handle a multi bulk reply element that
2361 * are either integer reply or bulk reply.
2362 *
2363 * there is a special case for sscan/hscan/zscan, these command
2364 * replay a nested multi-bulk with a number and a multi bulk like this:
2365 *
2366 * - multi-bulk
2367 * - cursor
2368 * - multi-bulk
2369 * - val1
2370 * - val2
2371 * - val3
2372 *
2373 * in this case, there is only one sub-multi-bulk,
2374 * and it's the last element of parent,
2375 * we can handle it like tail-recursive.
2376 *
2377 */
2378 if (ch == '*') { /* for sscan/hscan/zscan only */
2379 p = p - 1; /* go back by 1 byte */
2380 state = SW_MULTIBULK;
2381 break;
2382 }
2383
2384 if (ch == ':' || ch == '+' || ch == '-') {
2385 /* handles not-found reply = '$-1' or integer reply = ':<num>' */
2386 /* and *2\r\n$2\r\nr0\r\n+OK\r\n or *1\r\n+OK\r\n */
2387 state = SW_SIMPLE;
2388 break;
2389 }
2390
2391 if (ch != '$') {
2392 goto error;
2393 }
2394
2395 r->token = p;
2396 r->rlen = 0;
2397 } else if (isdigit(ch)) {
2398 r->rlen = r->rlen * 10 + (uint32_t)(ch - '0');
2399 } else if (ch == '-') {
2400 ;
2401 } else if (ch == CR) {
2402 if ((p - r->token) <= 1 || r->rnarg == 0) {
2403 goto error;
2404 }
2405
2406 if ((r->rlen == 1 && (p - r->token) == 3)) {
2407 r->rlen = 0;
2408 state = SW_MULTIBULK_ARGN_LF;
2409 } else {
2410 state = SW_MULTIBULK_ARGN_LEN_LF;
2411 }
2412 ASSERT(r->rnarg > 0);
2413 r->rnarg--;
2414 r->token = NULL;
2415 } else {
2416 goto error;
2417 }
2418
2419 break;
2420
2421 case SW_MULTIBULK_ARGN_LEN_LF:
2422 switch (ch) {
2423 case LF:
2424 state = SW_MULTIBULK_ARGN;
2425 break;
2426
2427 default:
2428 goto error;
2429 }
2430
2431 break;
2432
2433 case SW_MULTIBULK_ARGN:
2434 m = p + r->rlen;
2435 if (m >= b->last) {
2436 r->rlen -= (uint32_t)(b->last - p);
2437 m = b->last - 1;
2438 p = m;
2439 break;
2440 }
2441
2442 if (*m != CR) {
2443 goto error;
2444 }
2445
2446 p += r->rlen; /* move forward by rlen bytes */
2447 r->rlen = 0;
2448
2449 state = SW_MULTIBULK_ARGN_LF;
2450
2451 break;
2452
2453 case SW_MULTIBULK_ARGN_LF:
2454 switch (ch) {
2455 case LF:
2456 if (r->rnarg == 0) {
2457 goto done;
2458 }
2459
2460 state = SW_MULTIBULK_ARGN_LEN;
2461 break;
2462
2463 default:
2464 goto error;
2465 }
2466
2467 break;
2468
2469 case SW_SENTINEL:
2470 default:
2471 NOT_REACHED();
2472 break;
2473 }
2474 }
2475
2476 ASSERT(p == b->last);
2477 r->pos = p;
2478 r->state = state;
2479
2480 if (b->last == b->end && r->token != NULL) {
2481 r->pos = r->token;
2482 r->token = NULL;
2483 r->result = MSG_PARSE_REPAIR;
2484 } else {
2485 r->result = MSG_PARSE_AGAIN;
2486 }
2487
2488 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed rsp %"PRIu64" res %d "
2489 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
2490 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
2491 return;
2492
2493 done:
2494 ASSERT(r->type > MSG_UNKNOWN && r->type < MSG_SENTINEL);
2495 r->pos = p + 1;
2496 ASSERT(r->pos <= b->last);
2497 r->state = SW_START;
2498 r->token = NULL;
2499 r->result = MSG_PARSE_OK;
2500
2501 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed rsp %"PRIu64" res %d "
2502 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
2503 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
2504 return;
2505
2506 error:
2507 r->result = MSG_PARSE_ERROR;
2508 r->state = state;
2509 errno = EINVAL;
2510
2511 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "parsed bad rsp %"PRIu64" "
2512 "res %d type %d state %d", r->id, r->result, r->type,
2513 r->state);
2514 }
2515
2516 /*
2517 * Return true, if redis replies with a transient server failure response,
2518 * otherwise return false
2519 *
2520 * Transient failures on redis are scenarios when it is temporarily
2521 * unresponsive and responds with the following protocol specific error
2522 * reply:
2523 * -OOM, when redis is out-of-memory
2524 * -BUSY, when redis is busy
2525 * -LOADING when redis is loading dataset into memory
2526 *
2527 * See issue: https://github.com/twitter/twemproxy/issues/369
2528 */
2529 bool
redis_failure(const struct msg * r)2530 redis_failure(const struct msg *r)
2531 {
2532 ASSERT(!r->request);
2533
2534 switch (r->type) {
2535 case MSG_RSP_REDIS_ERROR_OOM:
2536 case MSG_RSP_REDIS_ERROR_BUSY:
2537 case MSG_RSP_REDIS_ERROR_LOADING:
2538 return true;
2539
2540 default:
2541 break;
2542 }
2543
2544 return false;
2545 }
2546
2547 /*
2548 * copy one bulk from src to dst
2549 *
2550 * if dst == NULL, we just eat the bulk
2551 *
2552 * */
2553 static rstatus_t
redis_copy_bulk(struct msg * dst,struct msg * src)2554 redis_copy_bulk(struct msg *dst, struct msg *src)
2555 {
2556 struct mbuf *mbuf, *nbuf;
2557 uint8_t *p;
2558 uint32_t len = 0;
2559 uint32_t bytes = 0;
2560 rstatus_t status;
2561
2562 for (mbuf = STAILQ_FIRST(&src->mhdr);
2563 mbuf && mbuf_empty(mbuf);
2564 mbuf = STAILQ_FIRST(&src->mhdr)) {
2565
2566 mbuf_remove(&src->mhdr, mbuf);
2567 mbuf_put(mbuf);
2568 }
2569
2570 mbuf = STAILQ_FIRST(&src->mhdr);
2571 if (mbuf == NULL) {
2572 return NC_ERROR;
2573 }
2574
2575 p = mbuf->pos;
2576 ASSERT(*p == '$');
2577 p++;
2578
2579 if (p[0] == '-' && p[1] == '1') {
2580 len = 1 + 2 + CRLF_LEN; /* $-1\r\n */
2581 p = mbuf->pos + len;
2582 } else {
2583 len = 0;
2584 for (; p < mbuf->last && isdigit(*p); p++) {
2585 len = len * 10 + (uint32_t)(*p - '0');
2586 }
2587 len += CRLF_LEN * 2;
2588 len += (p - mbuf->pos);
2589 }
2590 bytes = len;
2591
2592 /* copy len bytes to dst */
2593 for (; mbuf;) {
2594 if (mbuf_length(mbuf) <= len) { /* steal this buf from src to dst */
2595 nbuf = STAILQ_NEXT(mbuf, next);
2596 mbuf_remove(&src->mhdr, mbuf);
2597 if (dst != NULL) {
2598 mbuf_insert(&dst->mhdr, mbuf);
2599 } else {
2600 mbuf_put(mbuf);
2601 }
2602 len -= mbuf_length(mbuf);
2603 mbuf = nbuf;
2604 } else { /* split it */
2605 if (dst != NULL) {
2606 status = msg_append(dst, mbuf->pos, len);
2607 if (status != NC_OK) {
2608 return status;
2609 }
2610 }
2611 mbuf->pos += len;
2612 break;
2613 }
2614 }
2615
2616 if (dst != NULL) {
2617 dst->mlen += bytes;
2618 }
2619 src->mlen -= bytes;
2620 log_debug(LOG_VVERB, "redis_copy_bulk copy bytes: %d", bytes);
2621 return NC_OK;
2622 }
2623
2624 /*
2625 * Pre-coalesce handler is invoked when the message is a response to
2626 * the fragmented multi vector request - 'mget' or 'del' and all the
2627 * responses to the fragmented request vector hasn't been received
2628 */
2629 void
redis_pre_coalesce(struct msg * r)2630 redis_pre_coalesce(struct msg *r)
2631 {
2632 struct msg *pr = r->peer; /* peer request */
2633 struct mbuf *mbuf;
2634
2635 ASSERT(!r->request);
2636 ASSERT(pr->request);
2637
2638 if (pr->frag_id == 0) {
2639 /* do nothing, if not a response to a fragmented request */
2640 return;
2641 }
2642 pr->frag_owner->nfrag_done++;
2643
2644 switch (r->type) {
2645 case MSG_RSP_REDIS_INTEGER:
2646 /* only redis 'del' fragmented request sends back integer reply */
2647 ASSERT(pr->type == MSG_REQ_REDIS_DEL || pr->type == MSG_REQ_REDIS_TOUCH || pr->type == MSG_REQ_REDIS_UNLINK);
2648
2649 mbuf = STAILQ_FIRST(&r->mhdr);
2650 /*
2651 * Our response parser guarantees that the integer reply will be
2652 * completely encapsulated in a single mbuf and we should skip over
2653 * all the mbuf contents and discard it as the parser has already
2654 * parsed the integer reply and stored it in msg->integer
2655 */
2656 ASSERT(mbuf == STAILQ_LAST(&r->mhdr, mbuf, next));
2657 ASSERT(r->mlen == mbuf_length(mbuf));
2658
2659 r->mlen -= mbuf_length(mbuf);
2660 mbuf_rewind(mbuf);
2661
2662 /* accumulate the integer value in frag_owner of peer request */
2663 pr->frag_owner->integer += r->integer;
2664 break;
2665
2666 case MSG_RSP_REDIS_MULTIBULK:
2667 /* only redis 'mget' fragmented request sends back multi-bulk reply */
2668 ASSERT(pr->type == MSG_REQ_REDIS_MGET);
2669
2670 mbuf = STAILQ_FIRST(&r->mhdr);
2671 /*
2672 * Muti-bulk reply can span over multiple mbufs and in each reply
2673 * we should skip over the narg token. Our response parser
2674 * guarantees that the narg token and the immediately following
2675 * '\r\n' will exist in a contiguous region in the first mbuf
2676 */
2677 ASSERT(r->narg_start == mbuf->pos);
2678 ASSERT(r->narg_start < r->narg_end);
2679
2680 r->narg_end += CRLF_LEN;
2681 r->mlen -= (uint32_t)(r->narg_end - r->narg_start);
2682 mbuf->pos = r->narg_end;
2683
2684 break;
2685
2686 case MSG_RSP_REDIS_STATUS:
2687 if (pr->type == MSG_REQ_REDIS_MSET) { /* MSET segments */
2688 mbuf = STAILQ_FIRST(&r->mhdr);
2689 r->mlen -= mbuf_length(mbuf);
2690 mbuf_rewind(mbuf);
2691 }
2692 break;
2693
2694 default:
2695 /*
2696 * Valid responses for a fragmented request are MSG_RSP_REDIS_INTEGER or,
2697 * MSG_RSP_REDIS_MULTIBULK. For an invalid response, we send out -ERR
2698 * with EINVAL errno
2699 */
2700 mbuf = STAILQ_FIRST(&r->mhdr);
2701 log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
2702 "with unknown type %d", r->type);
2703 pr->error = 1;
2704 pr->err = EINVAL;
2705 break;
2706 }
2707 }
2708
2709 static rstatus_t
redis_append_key(struct msg * r,const uint8_t * key,uint32_t keylen)2710 redis_append_key(struct msg *r, const uint8_t *key, uint32_t keylen)
2711 {
2712 uint32_t len;
2713 struct mbuf *mbuf;
2714 uint8_t printbuf[32];
2715 struct keypos *kpos;
2716
2717 /* 1. keylen */
2718 len = (uint32_t)nc_snprintf(printbuf, sizeof(printbuf), "$%d\r\n", keylen);
2719 mbuf = msg_ensure_mbuf(r, len);
2720 if (mbuf == NULL) {
2721 return NC_ENOMEM;
2722 }
2723 mbuf_copy(mbuf, printbuf, len);
2724 r->mlen += len;
2725
2726 /* 2. key */
2727 mbuf = msg_ensure_mbuf(r, keylen);
2728 if (mbuf == NULL) {
2729 return NC_ENOMEM;
2730 }
2731
2732 kpos = array_push(r->keys);
2733 if (kpos == NULL) {
2734 return NC_ENOMEM;
2735 }
2736
2737 kpos->start = mbuf->last;
2738 kpos->end = mbuf->last + keylen;
2739 mbuf_copy(mbuf, key, keylen);
2740 r->mlen += keylen;
2741
2742 /* 3. CRLF */
2743 mbuf = msg_ensure_mbuf(r, CRLF_LEN);
2744 if (mbuf == NULL) {
2745 return NC_ENOMEM;
2746 }
2747 mbuf_copy(mbuf, (uint8_t *)CRLF, CRLF_LEN);
2748 r->mlen += (uint32_t)CRLF_LEN;
2749
2750 return NC_OK;
2751 }
2752
2753 /*
2754 * input a msg, return a msg chain.
2755 * nserver is the number of backend redis/memcache server
2756 *
2757 * the original msg will be fragmented into at most nserver fragments.
2758 * all the keys map to the same backend will group into one fragment.
2759 *
2760 * frag_id:
2761 * a unique fragment id for all fragments of the message vector. including the orig msg.
2762 *
2763 * frag_owner:
2764 * All fragments of the message use frag_owner point to the orig msg
2765 *
2766 * frag_seq:
2767 * the map from each key to it's fragment, (only in the orig msg)
2768 *
2769 * For example, a message vector with 3 keys:
2770 *
2771 * get key1 key2 key3
2772 *
2773 * suppose we have 2 backend server, and the map is:
2774 *
2775 * key1 => backend 0
2776 * key2 => backend 1
2777 * key3 => backend 0
2778 *
2779 * it will fragment like this:
2780 *
2781 * +-----------------+
2782 * | msg vector |
2783 * |(original msg) |
2784 * |key1, key2, key3 |
2785 * +-----------------+
2786 *
2787 * frag_owner
2788 * /--------------------------------------+
2789 * frag_owner / |
2790 * /-----------+ | /------------+ frag_owner |
2791 * | | | | | |
2792 * | v v v | |
2793 * +--------------------+ +---------------------+ +----+----------------+
2794 * | frag_id = 10 | | frag_id = 10 | | frag_id = 10 |
2795 * | nfrag = 3 | | nfrag = 0 | | nfrag = 0 |
2796 * | frag_seq = x x x | | key1, key3 | | key2 |
2797 * +------------|-|-|---+ +---------------------+ +---------------------+
2798 * | | | ^ ^ ^
2799 * | \ \ | | |
2800 * | \ ----------+ | |
2801 * +---\---------------+ |
2802 * ------------------------------------------+
2803 *
2804 */
2805 static rstatus_t
redis_fragment_argx(struct msg * r,uint32_t nserver,struct msg_tqh * frag_msgq,uint32_t key_step)2806 redis_fragment_argx(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq,
2807 uint32_t key_step)
2808 {
2809 struct mbuf *mbuf;
2810 struct msg **sub_msgs;
2811 uint32_t i;
2812 rstatus_t status;
2813 struct array *keys = r->keys;
2814
2815 ASSERT(array_n(keys) == (r->narg - 1) / key_step);
2816
2817 sub_msgs = nc_zalloc(nserver * sizeof(*sub_msgs));
2818 if (sub_msgs == NULL) {
2819 return NC_ENOMEM;
2820 }
2821
2822 ASSERT(r->frag_seq == NULL);
2823 r->frag_seq = nc_alloc(array_n(keys) * sizeof(*r->frag_seq));
2824 if (r->frag_seq == NULL) {
2825 nc_free(sub_msgs);
2826 return NC_ENOMEM;
2827 }
2828
2829 mbuf = STAILQ_FIRST(&r->mhdr);
2830 mbuf->pos = mbuf->start;
2831
2832 /*
2833 * This code is based on the assumption that '*narg\r\n$4\r\nMGET\r\n' is located
2834 * in a contiguous location.
2835 * This is always true because we have capped our MBUF_MIN_SIZE at 512 and
2836 * whenever we have multiple messages, we copy the tail message into a new mbuf
2837 */
2838 for (i = 0; i < 3; i++) { /* eat *narg\r\n$4\r\nMGET\r\n */
2839 for (; *(mbuf->pos) != '\n';) {
2840 mbuf->pos++;
2841 }
2842 mbuf->pos++;
2843 }
2844
2845 r->frag_id = msg_gen_frag_id();
2846 r->nfrag = 0;
2847 r->frag_owner = r;
2848
2849 /* Build up the key1 key2 ... to be sent to a given server at index idx */
2850 for (i = 0; i < array_n(keys); i++) { /* for each key */
2851 struct msg *sub_msg;
2852 struct keypos *kpos = array_get(keys, i);
2853 uint32_t idx = msg_backend_idx(r, kpos->start, kpos->end - kpos->start);
2854 ASSERT(idx < nserver);
2855
2856 if (sub_msgs[idx] == NULL) {
2857 sub_msgs[idx] = msg_get(r->owner, r->request, r->redis);
2858 if (sub_msgs[idx] == NULL) {
2859 nc_free(sub_msgs);
2860 return NC_ENOMEM;
2861 }
2862 }
2863 r->frag_seq[i] = sub_msg = sub_msgs[idx];
2864
2865 sub_msg->narg++;
2866 status = redis_append_key(sub_msg, kpos->start, kpos->end - kpos->start);
2867 if (status != NC_OK) {
2868 nc_free(sub_msgs);
2869 return status;
2870 }
2871
2872 if (key_step == 1) { /* mget,del */
2873 continue;
2874 } else { /* mset */
2875 status = redis_copy_bulk(NULL, r); /* eat key */
2876 if (status != NC_OK) {
2877 nc_free(sub_msgs);
2878 return status;
2879 }
2880
2881 status = redis_copy_bulk(sub_msg, r);
2882 if (status != NC_OK) {
2883 nc_free(sub_msgs);
2884 return status;
2885 }
2886
2887 sub_msg->narg++;
2888 }
2889 }
2890
2891 /*
2892 * prepend mget header, and forward the command (command type+key(s)+suffix)
2893 * to the corresponding server(s)
2894 */
2895 for (i = 0; i < nserver; i++) {
2896 struct msg *sub_msg = sub_msgs[i];
2897 if (sub_msg == NULL) {
2898 continue;
2899 }
2900
2901 if (r->type == MSG_REQ_REDIS_MGET) {
2902 status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmget\r\n",
2903 sub_msg->narg + 1);
2904 } else if (r->type == MSG_REQ_REDIS_DEL) {
2905 status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n",
2906 sub_msg->narg + 1);
2907 } else if (r->type == MSG_REQ_REDIS_MSET) {
2908 status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n",
2909 sub_msg->narg + 1);
2910 } else if (r->type == MSG_REQ_REDIS_TOUCH) {
2911 status = msg_prepend_format(sub_msg, "*%d\r\n$5\r\ntouch\r\n",
2912 sub_msg->narg + 1);
2913 } else if (r->type == MSG_REQ_REDIS_UNLINK) {
2914 status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nunlink\r\n",
2915 sub_msg->narg + 1);
2916 } else {
2917 NOT_REACHED();
2918 }
2919 if (status != NC_OK) {
2920 nc_free(sub_msgs);
2921 return status;
2922 }
2923
2924 sub_msg->type = r->type;
2925 sub_msg->frag_id = r->frag_id;
2926 sub_msg->frag_owner = r->frag_owner;
2927
2928 TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
2929 r->nfrag++;
2930 }
2931
2932 nc_free(sub_msgs);
2933 return NC_OK;
2934 }
2935
2936 rstatus_t
redis_fragment(struct msg * r,uint32_t nserver,struct msg_tqh * frag_msgq)2937 redis_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq)
2938 {
2939 if (1 == array_n(r->keys)){
2940 return NC_OK;
2941 }
2942
2943 switch (r->type) {
2944 case MSG_REQ_REDIS_MGET:
2945 case MSG_REQ_REDIS_DEL:
2946 case MSG_REQ_REDIS_TOUCH:
2947 case MSG_REQ_REDIS_UNLINK:
2948 return redis_fragment_argx(r, nserver, frag_msgq, 1);
2949
2950 /* TODO: MSETNX - instead of responding with OK, respond with 1 if all fragments respond with 1 */
2951 case MSG_REQ_REDIS_MSET:
2952 return redis_fragment_argx(r, nserver, frag_msgq, 2);
2953
2954 default:
2955 return NC_OK;
2956 }
2957 }
2958
2959 rstatus_t
redis_reply(struct msg * r)2960 redis_reply(struct msg *r)
2961 {
2962 struct conn *c_conn;
2963 struct msg *response = r->peer;
2964
2965 ASSERT(response != NULL && response->owner != NULL);
2966
2967 c_conn = response->owner;
2968 if (r->type == MSG_REQ_REDIS_AUTH) {
2969 return redis_handle_auth_req(r, response);
2970 }
2971
2972 if (!conn_authenticated(c_conn)) {
2973 return msg_append(response, rsp_auth_required.data, rsp_auth_required.len);
2974 }
2975
2976 switch (r->type) {
2977 case MSG_REQ_REDIS_PING:
2978 return msg_append(response, rsp_pong.data, rsp_pong.len);
2979
2980 default:
2981 NOT_REACHED();
2982 return NC_ERROR;
2983 }
2984 }
2985
2986 void
redis_post_coalesce_mset(struct msg * request)2987 redis_post_coalesce_mset(struct msg *request)
2988 {
2989 rstatus_t status;
2990 struct msg *response = request->peer;
2991
2992 status = msg_append(response, rsp_ok.data, rsp_ok.len);
2993 if (status != NC_OK) {
2994 response->error = 1; /* mark this msg as err */
2995 response->err = errno;
2996 }
2997 }
2998
2999 void
redis_post_coalesce_del_or_touch(struct msg * request)3000 redis_post_coalesce_del_or_touch(struct msg *request)
3001 {
3002 struct msg *response = request->peer;
3003 rstatus_t status;
3004
3005 status = msg_prepend_format(response, ":%d\r\n", request->integer);
3006 if (status != NC_OK) {
3007 response->error = 1;
3008 response->err = errno;
3009 }
3010 }
3011
3012 static void
redis_post_coalesce_mget(struct msg * request)3013 redis_post_coalesce_mget(struct msg *request)
3014 {
3015 struct msg *response = request->peer;
3016 struct msg *sub_msg;
3017 rstatus_t status;
3018 uint32_t i;
3019
3020 status = msg_prepend_format(response, "*%d\r\n", request->narg - 1);
3021 if (status != NC_OK) {
3022 /*
3023 * the fragments is still in c_conn->omsg_q, we have to discard all of them,
3024 * we just close the conn here
3025 */
3026 response->owner->err = 1;
3027 return;
3028 }
3029
3030 for (i = 0; i < array_n(request->keys); i++) { /* for each key */
3031 sub_msg = request->frag_seq[i]->peer; /* get it's peer response */
3032 if (sub_msg == NULL) {
3033 response->owner->err = 1;
3034 return;
3035 }
3036 status = redis_copy_bulk(response, sub_msg);
3037 if (status != NC_OK) {
3038 response->owner->err = 1;
3039 return;
3040 }
3041 }
3042 }
3043
3044 /*
3045 * Post-coalesce handler is invoked when the message is a response to
3046 * the fragmented multi vector request - 'mget' or 'del' and all the
3047 * responses to the fragmented request vector has been received and
3048 * the fragmented request is consider to be done
3049 */
3050 void
redis_post_coalesce(struct msg * r)3051 redis_post_coalesce(struct msg *r)
3052 {
3053 struct msg *pr = r->peer; /* peer response */
3054
3055 ASSERT(!pr->request);
3056 ASSERT(r->request && (r->frag_owner == r));
3057 if (r->error || r->ferror) {
3058 /* do nothing, if msg is in error */
3059 return;
3060 }
3061
3062 switch (r->type) {
3063 case MSG_REQ_REDIS_MGET:
3064 return redis_post_coalesce_mget(r);
3065
3066 case MSG_REQ_REDIS_DEL:
3067 case MSG_REQ_REDIS_TOUCH:
3068 case MSG_REQ_REDIS_UNLINK:
3069 return redis_post_coalesce_del_or_touch(r);
3070
3071 case MSG_REQ_REDIS_MSET:
3072 return redis_post_coalesce_mset(r);
3073
3074 default:
3075 NOT_REACHED();
3076 }
3077 }
3078
3079 static rstatus_t
redis_handle_auth_req(struct msg * req,struct msg * rsp)3080 redis_handle_auth_req(struct msg *req, struct msg *rsp)
3081 {
3082 struct conn *conn = (struct conn *)rsp->owner;
3083 const struct server_pool *pool;
3084 const struct keypos *kpos;
3085 const uint8_t *key;
3086 uint32_t keylen;
3087 bool valid;
3088
3089 ASSERT(conn->client && !conn->proxy);
3090
3091 pool = (const struct server_pool *)conn->owner;
3092
3093 if (!pool->require_auth) {
3094 /*
3095 * AUTH command from the client in absence of a redis_auth:
3096 * directive should be treated as an error
3097 */
3098 return msg_append(rsp, rsp_no_password.data, rsp_no_password.len);
3099 }
3100
3101 kpos = array_get(req->keys, 0);
3102 key = kpos->start;
3103 keylen = (uint32_t)(kpos->end - kpos->start);
3104 valid = (keylen == pool->redis_auth.len) &&
3105 (memcmp(pool->redis_auth.data, key, keylen) == 0);
3106 if (valid) {
3107 conn->authenticated = 1;
3108 return msg_append(rsp, rsp_ok.data, rsp_ok.len);
3109 }
3110
3111 /*
3112 * Password in the AUTH command doesn't match the one configured in
3113 * redis_auth: directive
3114 *
3115 * We mark the connection has unauthenticated until the client
3116 * reauthenticates with the correct password
3117 */
3118 conn->authenticated = 0;
3119 return msg_append(rsp, rsp_invalid_password.data, rsp_invalid_password.len);
3120 }
3121
3122 rstatus_t
redis_add_auth(struct context * ctx,struct conn * c_conn,struct conn * s_conn)3123 redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn)
3124 {
3125 rstatus_t status;
3126 struct msg *msg;
3127 struct server_pool *pool;
3128
3129 ASSERT(!s_conn->client && !s_conn->proxy);
3130 ASSERT(!conn_authenticated(s_conn));
3131
3132 pool = c_conn->owner;
3133
3134 msg = msg_get(c_conn, true, c_conn->redis);
3135 if (msg == NULL) {
3136 c_conn->err = errno;
3137 return NC_ENOMEM;
3138 }
3139
3140 status = msg_prepend_format(msg, "*2\r\n$4\r\nAUTH\r\n$%d\r\n%s\r\n",
3141 pool->redis_auth.len, pool->redis_auth.data);
3142 if (status != NC_OK) {
3143 msg_put(msg);
3144 return status;
3145 }
3146
3147 msg->swallow = 1;
3148 s_conn->enqueue_inq(ctx, s_conn, msg);
3149 s_conn->authenticated = 1;
3150
3151 return NC_OK;
3152 }
3153
3154 void
redis_post_connect(struct context * ctx,struct conn * conn,struct server * server)3155 redis_post_connect(struct context *ctx, struct conn *conn, struct server *server)
3156 {
3157 rstatus_t status;
3158 struct server_pool *pool = server->owner;
3159 struct msg *msg;
3160 int digits;
3161
3162 ASSERT(!conn->client && conn->connected);
3163 ASSERT(conn->redis);
3164
3165 /*
3166 * By default, every connection to redis uses the database DB 0. You
3167 * can select a different one on a per-connection basis by sending
3168 * a request 'SELECT <redis_db>', where <redis_db> is the configured
3169 * on a per pool basis in the configuration
3170 */
3171 if (pool->redis_db <= 0) {
3172 return;
3173 }
3174
3175 /*
3176 * Create a fake client message and add it to the pipeline. We force this
3177 * message to be head of queue as it might already contain a command
3178 * that triggered the connect.
3179 */
3180 msg = msg_get(conn, true, conn->redis);
3181 if (msg == NULL) {
3182 return;
3183 }
3184
3185 digits = (pool->redis_db >= 10) ? (int)log10(pool->redis_db) + 1 : 1;
3186 status = msg_prepend_format(msg, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%d\r\n", digits, pool->redis_db);
3187 if (status != NC_OK) {
3188 msg_put(msg);
3189 return;
3190 }
3191 msg->type = MSG_REQ_REDIS_SELECT;
3192 msg->result = MSG_PARSE_OK;
3193 msg->swallow = 1;
3194 msg->owner = NULL;
3195
3196 /* enqueue as head and send */
3197 req_server_enqueue_imsgq_head(ctx, conn, msg);
3198 msg_send(ctx, conn);
3199
3200 log_debug(LOG_NOTICE, "sent 'SELECT %d' to %s | %s", pool->redis_db,
3201 pool->name.data, server->name.data);
3202 }
3203
3204 void
redis_swallow_msg(struct conn * conn,struct msg * pmsg,struct msg * msg)3205 redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg)
3206 {
3207 if (pmsg != NULL && pmsg->type == MSG_REQ_REDIS_SELECT &&
3208 msg != NULL && redis_error(msg)) {
3209 struct server* conn_server;
3210 struct server_pool* conn_pool;
3211 struct mbuf* rsp_buffer;
3212 uint8_t message[128];
3213 size_t copy_len;
3214
3215 /*
3216 * Get a substring from the message so that the initial - and the trailing
3217 * \r\n is removed.
3218 */
3219 conn_server = (struct server*)conn->owner;
3220 conn_pool = conn_server->owner;
3221 rsp_buffer = STAILQ_LAST(&msg->mhdr, mbuf, next);
3222 copy_len = MIN(mbuf_length(rsp_buffer) - 3, sizeof(message) - 1);
3223
3224 nc_memcpy(message, &rsp_buffer->start[1], copy_len);
3225 message[copy_len] = 0;
3226
3227 log_warn("SELECT %d failed on %s | %s: %s",
3228 conn_pool->redis_db, conn_pool->name.data,
3229 conn_server->name.data, message);
3230 }
3231 }
3232