1 /*
2 * twemproxy - A fast and lightweight proxy for memcached protocol.
3 * Copyright (C) 2011 Twitter, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <ctype.h>
19
20 #include <nc_core.h>
21 #include <nc_proto.h>
22
/*
 * From memcache protocol specification:
 *
 * Data stored by memcached is identified with the help of a key. A key
 * is a text string which should uniquely identify the data for clients
 * that are interested in storing and retrieving it. Currently the
 * length limit of a key is set at 250 characters (of course, normally
 * clients wouldn't need to use such long keys); the key must not include
 * control characters or whitespace.
 *
 * memcache_parse_req() rejects a request whose key is longer than this
 * limit.
 */
#define MEMCACHE_MAX_KEY_LENGTH 250
34
35 /*
36 * Return true, if the memcache command is a storage command, otherwise
37 * return false
38 */
39 static bool
memcache_storage(const struct msg * r)40 memcache_storage(const struct msg *r)
41 {
42 switch (r->type) {
43 case MSG_REQ_MC_SET:
44 case MSG_REQ_MC_CAS:
45 case MSG_REQ_MC_ADD:
46 case MSG_REQ_MC_REPLACE:
47 case MSG_REQ_MC_APPEND:
48 case MSG_REQ_MC_PREPEND:
49 return true;
50
51 default:
52 break;
53 }
54
55 return false;
56 }
57
58 /*
59 * Return true, if the memcache command is a cas command, otherwise
60 * return false
61 */
62 static bool
memcache_cas(const struct msg * r)63 memcache_cas(const struct msg *r)
64 {
65 if (r->type == MSG_REQ_MC_CAS) {
66 return true;
67 }
68
69 return false;
70 }
71
72 /*
73 * Return true, if the memcache command is a retrieval command, otherwise
74 * return false
75 */
76 static bool
memcache_retrieval(const struct msg * r)77 memcache_retrieval(const struct msg *r)
78 {
79 switch (r->type) {
80 case MSG_REQ_MC_GET:
81 case MSG_REQ_MC_GETS:
82 return true;
83
84 default:
85 break;
86 }
87
88 return false;
89 }
90
91 /*
92 * Return true, if the memcache command should be fragmented,
93 * otherwise return false.
94 *
95 * The only supported memcache commands that can have multiple keys
96 * are get/gets. Both are multigets, and the latter returns CAS token with the
97 * value.
98 *
99 * Fragmented requests are assumed to be slower due to the fact that they need
100 * to allocate an array to track which key went to which server,
101 * so avoid them when possible.
102 */
103 static bool
memcache_should_fragment(const struct msg * r)104 memcache_should_fragment(const struct msg *r)
105 {
106 switch (r->type) {
107 case MSG_REQ_MC_GET:
108 case MSG_REQ_MC_GETS:
109 /*
110 * A memcache get for a single key is only sent to one server.
111 * Fragmenting it would work but be less efficient.
112 */
113 return array_n(r->keys) != 1;
114
115 default:
116 break;
117 }
118
119 return false;
120 }
121
122 /*
123 * Return true, if the memcache command is a arithmetic command, otherwise
124 * return false
125 */
126 static bool
memcache_arithmetic(const struct msg * r)127 memcache_arithmetic(const struct msg *r)
128 {
129 switch (r->type) {
130 case MSG_REQ_MC_INCR:
131 case MSG_REQ_MC_DECR:
132 return true;
133
134 default:
135 break;
136 }
137
138 return false;
139 }
140
141 /*
142 * Return true, if the memcache command is a delete command, otherwise
143 * return false
144 */
145 static bool
memcache_delete(const struct msg * r)146 memcache_delete(const struct msg *r)
147 {
148 if (r->type == MSG_REQ_MC_DELETE) {
149 return true;
150 }
151
152 return false;
153 }
154
155 /*
156 * Return true, if the memcache command is a touch command, otherwise
157 * return false
158 */
159 static bool
memcache_touch(const struct msg * r)160 memcache_touch(const struct msg *r)
161 {
162 if (r->type == MSG_REQ_MC_TOUCH) {
163 return true;
164 }
165
166 return false;
167 }
168
169 void
memcache_parse_req(struct msg * r)170 memcache_parse_req(struct msg *r)
171 {
172 struct mbuf *b;
173 uint8_t *p, *m;
174 uint8_t ch;
175 enum {
176 SW_START,
177 SW_REQ_TYPE,
178 SW_SPACES_BEFORE_KEY,
179 SW_KEY,
180 SW_SPACES_BEFORE_KEYS,
181 SW_SPACES_BEFORE_FLAGS,
182 SW_FLAGS,
183 SW_SPACES_BEFORE_EXPIRY,
184 SW_EXPIRY,
185 SW_SPACES_BEFORE_VLEN,
186 SW_VLEN,
187 SW_SPACES_BEFORE_CAS,
188 SW_CAS,
189 SW_RUNTO_VAL,
190 SW_VAL,
191 SW_SPACES_BEFORE_NUM,
192 SW_NUM,
193 SW_RUNTO_CRLF,
194 SW_CRLF,
195 SW_NOREPLY,
196 SW_AFTER_NOREPLY,
197 SW_ALMOST_DONE,
198 SW_SENTINEL
199 } state;
200
201 state = r->state;
202 b = STAILQ_LAST(&r->mhdr, mbuf, next);
203
204 ASSERT(r->request);
205 ASSERT(!r->redis);
206 ASSERT(state >= SW_START && state < SW_SENTINEL);
207 ASSERT(b != NULL);
208 ASSERT(b->pos <= b->last);
209
210 /* validate the parsing maker */
211 ASSERT(r->pos != NULL);
212 ASSERT(r->pos >= b->pos && r->pos <= b->last);
213
214 for (p = r->pos; p < b->last; p++) {
215 ch = *p;
216
217 switch (state) {
218
219 case SW_START:
220 if (ch == ' ') {
221 break;
222 }
223
224 if (!islower(ch)) {
225 goto error;
226 }
227
228 /* req_start <- p; type_start <- p */
229 r->token = p;
230 state = SW_REQ_TYPE;
231
232 break;
233
234 case SW_REQ_TYPE:
235 if (ch == ' ' || ch == CR) {
236 /* type_end = p - 1 */
237 m = r->token;
238 r->token = NULL;
239 r->type = MSG_UNKNOWN;
240 r->narg++;
241
242 switch (p - m) {
243
244 case 3:
245 if (str4cmp(m, 'g', 'e', 't', ' ')) {
246 r->type = MSG_REQ_MC_GET;
247 break;
248 }
249
250 if (str4cmp(m, 's', 'e', 't', ' ')) {
251 r->type = MSG_REQ_MC_SET;
252 break;
253 }
254
255 if (str4cmp(m, 'a', 'd', 'd', ' ')) {
256 r->type = MSG_REQ_MC_ADD;
257 break;
258 }
259
260 if (str4cmp(m, 'c', 'a', 's', ' ')) {
261 r->type = MSG_REQ_MC_CAS;
262 break;
263 }
264
265 break;
266
267 case 4:
268 if (str4cmp(m, 'g', 'e', 't', 's')) {
269 r->type = MSG_REQ_MC_GETS;
270 break;
271 }
272
273 if (str4cmp(m, 'i', 'n', 'c', 'r')) {
274 r->type = MSG_REQ_MC_INCR;
275 break;
276 }
277
278 if (str4cmp(m, 'd', 'e', 'c', 'r')) {
279 r->type = MSG_REQ_MC_DECR;
280 break;
281 }
282
283 if (str4cmp(m, 'q', 'u', 'i', 't')) {
284 r->type = MSG_REQ_MC_QUIT;
285 r->quit = 1;
286 break;
287 }
288
289 break;
290
291 case 5:
292 if (str5cmp(m, 't', 'o', 'u', 'c', 'h')) {
293 r->type = MSG_REQ_MC_TOUCH;
294 break;
295 }
296
297 break;
298
299 case 6:
300 if (str6cmp(m, 'a', 'p', 'p', 'e', 'n', 'd')) {
301 r->type = MSG_REQ_MC_APPEND;
302 break;
303 }
304
305 if (str6cmp(m, 'd', 'e', 'l', 'e', 't', 'e')) {
306 r->type = MSG_REQ_MC_DELETE;
307 break;
308 }
309
310 break;
311
312 case 7:
313 if (str7cmp(m, 'p', 'r', 'e', 'p', 'e', 'n', 'd')) {
314 r->type = MSG_REQ_MC_PREPEND;
315 break;
316 }
317
318 if (str7cmp(m, 'r', 'e', 'p', 'l', 'a', 'c', 'e')) {
319 r->type = MSG_REQ_MC_REPLACE;
320 break;
321 }
322
323 if (str7cmp(m, 'v', 'e', 'r', 's', 'i', 'o', 'n')) {
324 r->type = MSG_REQ_MC_VERSION;
325 if (!msg_set_placeholder_key(r)) {
326 goto enomem;
327 }
328 break;
329 }
330
331 break;
332 }
333
334 switch (r->type) {
335 case MSG_REQ_MC_GET:
336 case MSG_REQ_MC_GETS:
337 case MSG_REQ_MC_DELETE:
338 case MSG_REQ_MC_CAS:
339 case MSG_REQ_MC_SET:
340 case MSG_REQ_MC_ADD:
341 case MSG_REQ_MC_REPLACE:
342 case MSG_REQ_MC_APPEND:
343 case MSG_REQ_MC_PREPEND:
344 case MSG_REQ_MC_INCR:
345 case MSG_REQ_MC_DECR:
346 case MSG_REQ_MC_TOUCH:
347 if (ch == CR) {
348 goto error;
349 }
350 state = SW_SPACES_BEFORE_KEY;
351 break;
352
353 case MSG_REQ_MC_VERSION:
354 case MSG_REQ_MC_QUIT:
355 p = p - 1; /* go back by 1 byte */
356 state = SW_CRLF;
357 break;
358
359 case MSG_UNKNOWN:
360 goto error;
361
362 default:
363 NOT_REACHED();
364 }
365
366 } else if (!islower(ch)) {
367 goto error;
368 }
369
370 break;
371
372 case SW_SPACES_BEFORE_KEY:
373 if (ch != ' ') {
374 p = p - 1; /* go back by 1 byte */
375 r->token = NULL;
376 state = SW_KEY;
377 }
378 break;
379
380 case SW_KEY:
381 if (r->token == NULL) {
382 r->token = p;
383 }
384 if (ch == ' ' || ch == CR) {
385 struct keypos *kpos;
386 int keylen = p - r->token;
387 if (keylen > MEMCACHE_MAX_KEY_LENGTH) {
388 log_error("parsed bad req %"PRIu64" of type %d with key "
389 "prefix '%.*s...' and length %d that exceeds "
390 "maximum key length", r->id, r->type, 16,
391 r->token, (int)(p - r->token));
392 goto error;
393 } else if (keylen == 0) {
394 log_error("parsed bad req %"PRIu64" of type %d with an "
395 "empty key", r->id, r->type);
396 goto error;
397 }
398
399 kpos = array_push(r->keys);
400 if (kpos == NULL) {
401 goto enomem;
402 }
403 kpos->start = r->token;
404 kpos->end = p;
405
406 r->narg++;
407 r->token = NULL;
408
409 /* get next state */
410 if (memcache_storage(r)) {
411 state = SW_SPACES_BEFORE_FLAGS;
412 } else if (memcache_arithmetic(r) || memcache_touch(r) ) {
413 state = SW_SPACES_BEFORE_NUM;
414 } else if (memcache_retrieval(r)) {
415 state = SW_SPACES_BEFORE_KEYS;
416 } else {
417 /* delete, etc. */
418 state = SW_RUNTO_CRLF;
419 }
420
421 if (ch == CR) {
422 if (memcache_storage(r) || memcache_arithmetic(r)) {
423 goto error;
424 }
425 p = p - 1; /* go back by 1 byte */
426 }
427 }
428
429 break;
430
431 case SW_SPACES_BEFORE_KEYS:
432 ASSERT(memcache_retrieval(r));
433 switch (ch) {
434 case ' ':
435 break;
436
437 case CR:
438 state = SW_ALMOST_DONE;
439 break;
440
441 default:
442 r->token = NULL;
443 p = p - 1; /* go back by 1 byte */
444 state = SW_KEY;
445 }
446
447 break;
448
449 case SW_SPACES_BEFORE_FLAGS:
450 if (ch != ' ') {
451 if (!isdigit(ch)) {
452 goto error;
453 }
454 /* flags_start <- p; flags <- ch - '0' */
455 r->token = p;
456 state = SW_FLAGS;
457 }
458
459 break;
460
461 case SW_FLAGS:
462 if (isdigit(ch)) {
463 /* flags <- flags * 10 + (ch - '0') */
464 ;
465 } else if (ch == ' ') {
466 /* flags_end <- p - 1 */
467 r->token = NULL;
468 state = SW_SPACES_BEFORE_EXPIRY;
469 } else {
470 goto error;
471 }
472
473 break;
474
475 case SW_SPACES_BEFORE_EXPIRY:
476 if (ch != ' ') {
477 if (!isdigit(ch)) {
478 goto error;
479 }
480 /* expiry_start <- p; expiry <- ch - '0' */
481 r->token = p;
482 state = SW_EXPIRY;
483 }
484
485 break;
486
487 case SW_EXPIRY:
488 if (isdigit(ch)) {
489 /* expiry <- expiry * 10 + (ch - '0') */
490 ;
491 } else if (ch == ' ') {
492 /* expiry_end <- p - 1 */
493 r->token = NULL;
494 state = SW_SPACES_BEFORE_VLEN;
495 } else {
496 goto error;
497 }
498
499 break;
500
501 case SW_SPACES_BEFORE_VLEN:
502 if (ch != ' ') {
503 if (!isdigit(ch)) {
504 goto error;
505 }
506 /* vlen_start <- p */
507 r->vlen = (uint32_t)(ch - '0');
508 state = SW_VLEN;
509 }
510
511 break;
512
513 case SW_VLEN:
514 if (isdigit(ch)) {
515 r->vlen = r->vlen * 10 + (uint32_t)(ch - '0');
516 } else if (memcache_cas(r)) {
517 if (ch != ' ') {
518 goto error;
519 }
520 /* vlen_end <- p - 1 */
521 p = p - 1; /* go back by 1 byte */
522 r->token = NULL;
523 state = SW_SPACES_BEFORE_CAS;
524 } else if (ch == ' ' || ch == CR) {
525 /* vlen_end <- p - 1 */
526 p = p - 1; /* go back by 1 byte */
527 r->token = NULL;
528 state = SW_RUNTO_CRLF;
529 } else {
530 goto error;
531 }
532
533 break;
534
535 case SW_SPACES_BEFORE_CAS:
536 if (ch != ' ') {
537 if (!isdigit(ch)) {
538 goto error;
539 }
540 /* cas_start <- p; cas <- ch - '0' */
541 r->token = p;
542 state = SW_CAS;
543 }
544
545 break;
546
547 case SW_CAS:
548 if (isdigit(ch)) {
549 /* cas <- cas * 10 + (ch - '0') */
550 ;
551 } else if (ch == ' ' || ch == CR) {
552 /* cas_end <- p - 1 */
553 p = p - 1; /* go back by 1 byte */
554 r->token = NULL;
555 state = SW_RUNTO_CRLF;
556 } else {
557 goto error;
558 }
559
560 break;
561
562
563 case SW_RUNTO_VAL:
564 switch (ch) {
565 case LF:
566 /* val_start <- p + 1 */
567 state = SW_VAL;
568 break;
569
570 default:
571 goto error;
572 }
573
574 break;
575
576 case SW_VAL:
577 m = p + r->vlen;
578 if (m >= b->last) {
579 ASSERT(r->vlen >= (uint32_t)(b->last - p));
580 r->vlen -= (uint32_t)(b->last - p);
581 m = b->last - 1;
582 p = m; /* move forward by vlen bytes */
583 break;
584 }
585 switch (*m) {
586 case CR:
587 /* val_end <- p - 1 */
588 p = m; /* move forward by vlen bytes */
589 state = SW_ALMOST_DONE;
590 break;
591
592 default:
593 goto error;
594 }
595
596 break;
597
598 case SW_SPACES_BEFORE_NUM:
599 if (ch != ' ') {
600 if (!(isdigit(ch) || ch == '-')) {
601 goto error;
602 }
603 /* num_start <- p; num <- ch - '0' */
604 r->token = p;
605 state = SW_NUM;
606 }
607
608 break;
609
610 case SW_NUM:
611 if (isdigit(ch)) {
612 /* num <- num * 10 + (ch - '0') */
613 ;
614 } else if (ch == ' ' || ch == CR) {
615 r->token = NULL;
616 /* num_end <- p - 1 */
617 p = p - 1; /* go back by 1 byte */
618 state = SW_RUNTO_CRLF;
619 } else {
620 goto error;
621 }
622
623 break;
624
625 case SW_RUNTO_CRLF:
626 switch (ch) {
627 case ' ':
628 break;
629
630 case 'n':
631 if (memcache_storage(r) || memcache_arithmetic(r) || memcache_delete(r) || memcache_touch(r)) {
632 /* noreply_start <- p */
633 r->token = p;
634 state = SW_NOREPLY;
635 } else {
636 goto error;
637 }
638
639 break;
640
641 case CR:
642 if (memcache_storage(r)) {
643 state = SW_RUNTO_VAL;
644 } else {
645 state = SW_ALMOST_DONE;
646 }
647
648 break;
649
650 default:
651 goto error;
652 }
653
654 break;
655
656 case SW_NOREPLY:
657 switch (ch) {
658 case ' ':
659 case CR:
660 m = r->token;
661 if (((p - m) == 7) && str7cmp(m, 'n', 'o', 'r', 'e', 'p', 'l', 'y')) {
662 ASSERT(memcache_storage(r) || memcache_arithmetic(r) || memcache_delete(r) || memcache_touch(r));
663 r->token = NULL;
664 /* noreply_end <- p - 1 */
665 r->noreply = 1;
666 state = SW_AFTER_NOREPLY;
667 p = p - 1; /* go back by 1 byte */
668 } else {
669 goto error;
670 }
671 }
672
673 break;
674
675 case SW_AFTER_NOREPLY:
676 switch (ch) {
677 case ' ':
678 break;
679
680 case CR:
681 if (memcache_storage(r)) {
682 state = SW_RUNTO_VAL;
683 } else {
684 state = SW_ALMOST_DONE;
685 }
686 break;
687
688 default:
689 goto error;
690 }
691
692 break;
693
694 case SW_CRLF:
695 switch (ch) {
696 case ' ':
697 break;
698
699 case CR:
700 state = SW_ALMOST_DONE;
701 break;
702
703 default:
704 goto error;
705 }
706
707 break;
708
709 case SW_ALMOST_DONE:
710 switch (ch) {
711 case LF:
712 /* req_end <- p */
713 goto done;
714
715 default:
716 goto error;
717 }
718
719 break;
720
721 case SW_SENTINEL:
722 default:
723 NOT_REACHED();
724 break;
725
726 }
727 }
728
729 /*
730 * At this point, buffer from b->pos to b->last has been parsed completely
731 * but we haven't been able to reach to any conclusion. Normally, this
732 * means that we have to parse again starting from the state we are in
733 * after more data has been read. The newly read data is either read into
734 * a new mbuf, if existing mbuf is full (b->last == b->end) or into the
735 * existing mbuf.
736 *
737 * The only exception to this is when the existing mbuf is full (b->last
738 * is at b->end) and token marker is set, which means that we have to
739 * copy the partial token into a new mbuf and parse again with more data
740 * read into new mbuf.
741 */
742 ASSERT(p == b->last);
743 r->pos = p;
744 r->state = state;
745
746 if (b->last == b->end && r->token != NULL) {
747 r->pos = r->token;
748 r->token = NULL;
749 r->result = MSG_PARSE_REPAIR;
750 } else {
751 r->result = MSG_PARSE_AGAIN;
752 }
753
754 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed req %"PRIu64" res %d "
755 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
756 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
757 return;
758
759 done:
760 ASSERT(r->type > MSG_UNKNOWN && r->type < MSG_SENTINEL);
761 r->pos = p + 1;
762 ASSERT(r->pos <= b->last);
763 r->state = SW_START;
764 r->result = MSG_PARSE_OK;
765
766 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed req %"PRIu64" res %d "
767 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
768 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
769 return;
770
771 enomem:
772 r->result = MSG_PARSE_ERROR;
773 r->state = state;
774
775 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "out of memory on parse req %"PRIu64" "
776 "res %d type %d state %d", r->id, r->result, r->type, r->state);
777
778 return;
779
780 error:
781 r->result = MSG_PARSE_ERROR;
782 r->state = state;
783 errno = EINVAL;
784
785 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "parsed bad req %"PRIu64" "
786 "res %d type %d state %d", r->id, r->result, r->type,
787 r->state);
788 }
789
790 void
memcache_parse_rsp(struct msg * r)791 memcache_parse_rsp(struct msg *r)
792 {
793 struct mbuf *b;
794 uint8_t *p, *m;
795 uint8_t ch;
796 enum {
797 SW_START,
798 SW_RSP_NUM,
799 SW_RSP_STR,
800 SW_SPACES_BEFORE_KEY,
801 SW_KEY,
802 SW_SPACES_BEFORE_FLAGS, /* 5 */
803 SW_FLAGS,
804 SW_SPACES_BEFORE_VLEN,
805 SW_VLEN,
806 SW_RUNTO_VAL,
807 SW_VAL, /* 10 */
808 SW_VAL_LF,
809 SW_END,
810 SW_RUNTO_CRLF,
811 SW_CRLF,
812 SW_ALMOST_DONE, /* 15 */
813 SW_SENTINEL
814 } state;
815
816 state = r->state;
817 b = STAILQ_LAST(&r->mhdr, mbuf, next);
818
819 ASSERT(!r->request);
820 ASSERT(!r->redis);
821 ASSERT(state >= SW_START && state < SW_SENTINEL);
822 ASSERT(b != NULL);
823 ASSERT(b->pos <= b->last);
824
825 /* validate the parsing marker */
826 ASSERT(r->pos != NULL);
827 ASSERT(r->pos >= b->pos && r->pos <= b->last);
828
829 for (p = r->pos; p < b->last; p++) {
830 ch = *p;
831
832 switch (state) {
833 case SW_START:
834 if (isdigit(ch)) {
835 state = SW_RSP_NUM;
836 } else {
837 state = SW_RSP_STR;
838 }
839 p = p - 1; /* go back by 1 byte */
840
841 break;
842
843 case SW_RSP_NUM:
844 if (r->token == NULL) {
845 /* rsp_start <- p; type_start <- p */
846 r->token = p;
847 }
848
849 if (isdigit(ch)) {
850 /* num <- num * 10 + (ch - '0') */
851 ;
852 } else if (ch == ' ' || ch == CR) {
853 /* type_end <- p - 1 */
854 r->token = NULL;
855 r->type = MSG_RSP_MC_NUM;
856 p = p - 1; /* go back by 1 byte */
857 state = SW_CRLF;
858 } else {
859 goto error;
860 }
861
862 break;
863
864 case SW_RSP_STR:
865 if (r->token == NULL) {
866 /* rsp_start <- p; type_start <- p */
867 r->token = p;
868 }
869
870 if (ch == ' ' || ch == CR) {
871 /* type_end <- p - 1 */
872 m = r->token;
873 /* r->token = NULL; */
874 r->type = MSG_UNKNOWN;
875
876 switch (p - m) {
877 case 3:
878 if (str4cmp(m, 'E', 'N', 'D', '\r')) {
879 r->type = MSG_RSP_MC_END;
880 /* end_start <- m; end_end <- p - 1 */
881 r->end = m;
882 break;
883 }
884
885 break;
886
887 case 5:
888 if (str5cmp(m, 'V', 'A', 'L', 'U', 'E')) {
889 /*
890 * Encompasses responses for 'get', 'gets' and
891 * 'cas' command.
892 */
893 r->type = MSG_RSP_MC_VALUE;
894 break;
895 }
896
897 if (str5cmp(m, 'E', 'R', 'R', 'O', 'R')) {
898 r->type = MSG_RSP_MC_ERROR;
899 break;
900 }
901
902 break;
903
904 case 6:
905 if (str6cmp(m, 'S', 'T', 'O', 'R', 'E', 'D')) {
906 r->type = MSG_RSP_MC_STORED;
907 break;
908 }
909
910 if (str6cmp(m, 'E', 'X', 'I', 'S', 'T', 'S')) {
911 r->type = MSG_RSP_MC_EXISTS;
912 break;
913 }
914
915 break;
916
917 case 7:
918 if (str7cmp(m, 'D', 'E', 'L', 'E', 'T', 'E', 'D')) {
919 r->type = MSG_RSP_MC_DELETED;
920 break;
921 }
922
923 if (str7cmp(m, 'T', 'O', 'U', 'C', 'H', 'E', 'D')) {
924 r->type = MSG_RSP_MC_TOUCHED;
925 break;
926 }
927
928 if (str7cmp(m, 'V', 'E', 'R', 'S', 'I', 'O', 'N')) {
929 r->type = MSG_RSP_MC_VERSION;
930 break;
931 }
932
933 break;
934
935 case 9:
936 if (str9cmp(m, 'N', 'O', 'T', '_', 'F', 'O', 'U', 'N', 'D')) {
937 r->type = MSG_RSP_MC_NOT_FOUND;
938 break;
939 }
940
941 break;
942
943 case 10:
944 if (str10cmp(m, 'N', 'O', 'T', '_', 'S', 'T', 'O', 'R', 'E', 'D')) {
945 r->type = MSG_RSP_MC_NOT_STORED;
946 break;
947 }
948
949 break;
950
951 case 12:
952 if (str12cmp(m, 'C', 'L', 'I', 'E', 'N', 'T', '_', 'E', 'R', 'R', 'O', 'R')) {
953 r->type = MSG_RSP_MC_CLIENT_ERROR;
954 break;
955 }
956
957 if (str12cmp(m, 'S', 'E', 'R', 'V', 'E', 'R', '_', 'E', 'R', 'R', 'O', 'R')) {
958 r->type = MSG_RSP_MC_SERVER_ERROR;
959 break;
960 }
961
962 break;
963 }
964
965 switch (r->type) {
966 case MSG_UNKNOWN:
967 goto error;
968
969 case MSG_RSP_MC_STORED:
970 case MSG_RSP_MC_NOT_STORED:
971 case MSG_RSP_MC_EXISTS:
972 case MSG_RSP_MC_NOT_FOUND:
973 case MSG_RSP_MC_DELETED:
974 case MSG_RSP_MC_TOUCHED:
975 state = SW_CRLF;
976 break;
977
978 case MSG_RSP_MC_END:
979 state = SW_CRLF;
980 break;
981
982 case MSG_RSP_MC_VALUE:
983 state = SW_SPACES_BEFORE_KEY;
984 break;
985
986 case MSG_RSP_MC_ERROR:
987 state = SW_CRLF;
988 break;
989
990 case MSG_RSP_MC_CLIENT_ERROR:
991 case MSG_RSP_MC_SERVER_ERROR:
992 case MSG_RSP_MC_VERSION:
993 state = SW_RUNTO_CRLF;
994 break;
995
996 default:
997 NOT_REACHED();
998 }
999
1000 p = p - 1; /* go back by 1 byte */
1001 }
1002
1003 break;
1004
1005 case SW_SPACES_BEFORE_KEY:
1006 if (ch != ' ') {
1007 state = SW_KEY;
1008 p = p - 1; /* go back by 1 byte */
1009 }
1010
1011 break;
1012
1013 case SW_KEY:
1014 if (ch == ' ') {
1015 /* r->token = NULL; */
1016 state = SW_SPACES_BEFORE_FLAGS;
1017 }
1018
1019 break;
1020
1021 case SW_SPACES_BEFORE_FLAGS:
1022 if (ch != ' ') {
1023 if (!isdigit(ch)) {
1024 goto error;
1025 }
1026 state = SW_FLAGS;
1027 p = p - 1; /* go back by 1 byte */
1028 }
1029
1030 break;
1031
1032 case SW_FLAGS:
1033 if (r->token == NULL) {
1034 /* flags_start <- p */
1035 /* r->token = p; */
1036 }
1037
1038 if (isdigit(ch)) {
1039 /* flags <- flags * 10 + (ch - '0') */
1040 ;
1041 } else if (ch == ' ') {
1042 /* flags_end <- p - 1 */
1043 /* r->token = NULL; */
1044 state = SW_SPACES_BEFORE_VLEN;
1045 } else {
1046 goto error;
1047 }
1048
1049 break;
1050
1051 case SW_SPACES_BEFORE_VLEN:
1052 if (ch != ' ') {
1053 if (!isdigit(ch)) {
1054 goto error;
1055 }
1056 p = p - 1; /* go back by 1 byte */
1057 state = SW_VLEN;
1058 r->vlen = 0;
1059 }
1060
1061 break;
1062
1063 case SW_VLEN:
1064 if (isdigit(ch)) {
1065 r->vlen = r->vlen * 10 + (uint32_t)(ch - '0');
1066 } else if (ch == ' ' || ch == CR) {
1067 /* vlen_end <- p - 1 */
1068 p = p - 1; /* go back by 1 byte */
1069 /* r->token = NULL; */
1070 state = SW_RUNTO_CRLF;
1071 } else {
1072 goto error;
1073 }
1074
1075 break;
1076
1077 case SW_RUNTO_VAL:
1078 switch (ch) {
1079 case LF:
1080 /* val_start <- p + 1 */
1081 state = SW_VAL;
1082 r->token = NULL;
1083 break;
1084
1085 default:
1086 goto error;
1087 }
1088
1089 break;
1090
1091 case SW_VAL:
1092 m = p + r->vlen;
1093 if (m >= b->last) {
1094 ASSERT(r->vlen >= (uint32_t)(b->last - p));
1095 r->vlen -= (uint32_t)(b->last - p);
1096 m = b->last - 1;
1097 p = m; /* move forward by vlen bytes */
1098 break;
1099 }
1100 switch (*m) {
1101 case CR:
1102 /* val_end <- p - 1 */
1103 p = m; /* move forward by vlen bytes */
1104 state = SW_VAL_LF;
1105 break;
1106
1107 default:
1108 goto error;
1109 }
1110
1111 break;
1112
1113 case SW_VAL_LF:
1114 switch (ch) {
1115 case LF:
1116 /* state = SW_END; */
1117 state = SW_RSP_STR;
1118 break;
1119
1120 default:
1121 goto error;
1122 }
1123
1124 break;
1125
1126 case SW_END:
1127 if (r->token == NULL) {
1128 if (ch != 'E') {
1129 goto error;
1130 }
1131 /* end_start <- p */
1132 r->token = p;
1133 } else if (ch == CR) {
1134 /* end_end <- p */
1135 m = r->token;
1136 r->token = NULL;
1137
1138 switch (p - m) {
1139 case 3:
1140 if (str4cmp(m, 'E', 'N', 'D', '\r')) {
1141 r->end = m;
1142 state = SW_ALMOST_DONE;
1143 }
1144 break;
1145
1146 default:
1147 goto error;
1148 }
1149 }
1150
1151 break;
1152
1153 case SW_RUNTO_CRLF:
1154 switch (ch) {
1155 case CR:
1156 if (r->type == MSG_RSP_MC_VALUE) {
1157 state = SW_RUNTO_VAL;
1158 } else {
1159 state = SW_ALMOST_DONE;
1160 }
1161
1162 break;
1163
1164 default:
1165 break;
1166 }
1167
1168 break;
1169
1170 case SW_CRLF:
1171 switch (ch) {
1172 case ' ':
1173 break;
1174
1175 case CR:
1176 state = SW_ALMOST_DONE;
1177 break;
1178
1179 default:
1180 goto error;
1181 }
1182
1183 break;
1184
1185 case SW_ALMOST_DONE:
1186 switch (ch) {
1187 case LF:
1188 /* rsp_end <- p */
1189 goto done;
1190
1191 default:
1192 goto error;
1193 }
1194
1195 break;
1196
1197 case SW_SENTINEL:
1198 default:
1199 NOT_REACHED();
1200 break;
1201
1202 }
1203 }
1204
1205 ASSERT(p == b->last);
1206 r->pos = p;
1207 r->state = state;
1208
1209 if (b->last == b->end && r->token != NULL) {
1210 if (state <= SW_RUNTO_VAL || state == SW_CRLF || state == SW_ALMOST_DONE) {
1211 r->state = SW_START;
1212 }
1213 r->pos = r->token;
1214 r->token = NULL;
1215 r->result = MSG_PARSE_REPAIR;
1216 } else {
1217 r->result = MSG_PARSE_AGAIN;
1218 }
1219
1220 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed rsp %"PRIu64" res %d "
1221 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
1222 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
1223 return;
1224
1225 done:
1226 ASSERT(r->type > MSG_UNKNOWN && r->type < MSG_SENTINEL);
1227 r->pos = p + 1;
1228 ASSERT(r->pos <= b->last);
1229 r->state = SW_START;
1230 r->token = NULL;
1231 r->result = MSG_PARSE_OK;
1232
1233 log_hexdump(LOG_VERB, b->pos, mbuf_length(b), "parsed rsp %"PRIu64" res %d "
1234 "type %d state %d rpos %d of %d", r->id, r->result, r->type,
1235 r->state, (int)(r->pos - b->pos), (int)(b->last - b->pos));
1236 return;
1237
1238 error:
1239 r->result = MSG_PARSE_ERROR;
1240 r->state = state;
1241 errno = EINVAL;
1242
1243 log_hexdump(LOG_INFO, b->pos, mbuf_length(b), "parsed bad rsp %"PRIu64" "
1244 "res %d type %d state %d", r->id, r->result, r->type,
1245 r->state);
1246 }
1247
/*
 * Failure predicate for memcache messages.
 *
 * The message is not inspected: the function returns false for every r.
 */
bool
memcache_failure(const struct msg *r)
{
    (void)r; /* intentionally unused */

    return false;
}
1253
1254 static rstatus_t
memcache_append_key(struct msg * r,const uint8_t * key,uint32_t keylen)1255 memcache_append_key(struct msg *r, const uint8_t *key, uint32_t keylen)
1256 {
1257 struct mbuf *mbuf;
1258 struct keypos *kpos;
1259
1260 mbuf = msg_ensure_mbuf(r, keylen + 2);
1261 if (mbuf == NULL) {
1262 return NC_ENOMEM;
1263 }
1264
1265 kpos = array_push(r->keys);
1266 if (kpos == NULL) {
1267 return NC_ENOMEM;
1268 }
1269
1270 kpos->start = mbuf->last;
1271 kpos->end = mbuf->last + keylen;
1272 mbuf_copy(mbuf, key, keylen);
1273 r->mlen += keylen;
1274
1275 mbuf_copy(mbuf, (const uint8_t *)" ", 1);
1276 r->mlen += 1;
1277 return NC_OK;
1278 }
1279
1280 /*
1281 * read the comment in proto/nc_redis.c
1282 */
1283 static rstatus_t
memcache_fragment_retrieval(struct msg * r,uint32_t nserver,struct msg_tqh * frag_msgq,uint32_t key_step)1284 memcache_fragment_retrieval(struct msg *r, uint32_t nserver,
1285 struct msg_tqh *frag_msgq,
1286 uint32_t key_step)
1287 {
1288 struct mbuf *mbuf;
1289 struct msg **sub_msgs;
1290 uint32_t i;
1291 rstatus_t status;
1292
1293 sub_msgs = nc_zalloc(nserver * sizeof(*sub_msgs));
1294 if (sub_msgs == NULL) {
1295 return NC_ENOMEM;
1296 }
1297
1298 ASSERT(r->frag_seq == NULL);
1299 r->frag_seq = nc_alloc(array_n(r->keys) * sizeof(*r->frag_seq));
1300 if (r->frag_seq == NULL) {
1301 nc_free(sub_msgs);
1302 return NC_ENOMEM;
1303 }
1304
1305 mbuf = STAILQ_FIRST(&r->mhdr);
1306 mbuf->pos = mbuf->start;
1307
1308 /*
1309 * This code is based on the assumption that 'gets ' is located
1310 * in a contiguous location.
1311 * This is always true because we have capped our MBUF_MIN_SIZE at 512 and
1312 * whenever we have multiple messages, we copy the tail message into a new mbuf
1313 */
1314 for (; *(mbuf->pos) != ' ';) { /* eat get/gets */
1315 mbuf->pos++;
1316 }
1317 mbuf->pos++;
1318
1319 r->frag_id = msg_gen_frag_id();
1320 r->nfrag = 0;
1321 r->frag_owner = r;
1322
1323 /* Build up the key1 key2 ... to be sent to a given server at index idx */
1324 for (i = 0; i < array_n(r->keys); i++) { /* for each key */
1325 struct msg *sub_msg;
1326 struct keypos *kpos = array_get(r->keys, i);
1327 uint32_t idx = msg_backend_idx(r, kpos->start, kpos->end - kpos->start);
1328 ASSERT(idx < nserver);
1329
1330 if (sub_msgs[idx] == NULL) {
1331 sub_msgs[idx] = msg_get(r->owner, r->request, r->redis);
1332 if (sub_msgs[idx] == NULL) {
1333 nc_free(sub_msgs);
1334 return NC_ENOMEM;
1335 }
1336 }
1337 r->frag_seq[i] = sub_msg = sub_msgs[idx];
1338
1339 sub_msg->narg++;
1340 status = memcache_append_key(sub_msg, kpos->start, kpos->end - kpos->start);
1341 if (status != NC_OK) {
1342 nc_free(sub_msgs);
1343 return status;
1344 }
1345 }
1346 /*
1347 * prepend mget header, and forward the get[s] key1 key2\r\n
1348 * to the corresponding server(s)
1349 */
1350 for (i = 0; i < nserver; i++) {
1351 struct msg *sub_msg = sub_msgs[i];
1352 if (sub_msg == NULL) {
1353 continue;
1354 }
1355
1356 /* prepend get/gets */
1357 if (r->type == MSG_REQ_MC_GET) {
1358 status = msg_prepend(sub_msg, (const uint8_t *)"get ", 4);
1359 } else if (r->type == MSG_REQ_MC_GETS) {
1360 status = msg_prepend(sub_msg, (const uint8_t *)"gets ", 5);
1361 }
1362 if (status != NC_OK) {
1363 nc_free(sub_msgs);
1364 return status;
1365 }
1366
1367 /* append \r\n */
1368 status = msg_append(sub_msg, (const uint8_t *)CRLF, CRLF_LEN);
1369 if (status != NC_OK) {
1370 nc_free(sub_msgs);
1371 return status;
1372 }
1373
1374 sub_msg->type = r->type;
1375 sub_msg->frag_id = r->frag_id;
1376 sub_msg->frag_owner = r->frag_owner;
1377
1378 TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
1379 r->nfrag++;
1380 }
1381
1382 nc_free(sub_msgs);
1383 return NC_OK;
1384 }
1385
1386 rstatus_t
memcache_fragment(struct msg * r,uint32_t nserver,struct msg_tqh * frag_msgq)1387 memcache_fragment(struct msg *r, uint32_t nserver, struct msg_tqh *frag_msgq)
1388 {
1389 if (memcache_should_fragment(r)) {
1390 return memcache_fragment_retrieval(r, nserver, frag_msgq, 1);
1391 }
1392 return NC_OK;
1393 }
1394
1395 /*
1396 * Pre-coalesce handler is invoked when the message is a response to
1397 * the fragmented multi vector request - 'get' or 'gets' and all the
1398 * responses to the fragmented request vector hasn't been received
1399 */
1400 void
memcache_pre_coalesce(struct msg * r)1401 memcache_pre_coalesce(struct msg *r)
1402 {
1403 struct msg *pr = r->peer; /* peer request */
1404 struct mbuf *mbuf;
1405
1406 ASSERT(!r->request);
1407 ASSERT(pr->request);
1408
1409 if (pr->frag_id == 0) {
1410 /* do nothing, if not a response to a fragmented request */
1411 return;
1412 }
1413
1414 pr->frag_owner->nfrag_done++;
1415 switch (r->type) {
1416
1417 case MSG_RSP_MC_VALUE:
1418 case MSG_RSP_MC_END:
1419
1420 /*
1421 * Readjust responses of the fragmented message vector by not
1422 * including the end marker for all
1423 */
1424
1425 ASSERT(r->end != NULL);
1426
1427 for (;;) {
1428 mbuf = STAILQ_LAST(&r->mhdr, mbuf, next);
1429 ASSERT(mbuf != NULL);
1430
1431 /*
1432 * We cannot assert that end marker points to the last mbuf
1433 * Consider a scenario where end marker points to the
1434 * penultimate mbuf and the last mbuf only contains spaces
1435 * and CRLF: mhdr -> [...END] -> [\r\n]
1436 */
1437
1438 if (r->end >= mbuf->pos && r->end < mbuf->last) {
1439 /* end marker is within this mbuf */
1440 r->mlen -= (uint32_t)(mbuf->last - r->end);
1441 mbuf->last = r->end;
1442 break;
1443 }
1444
1445 /* end marker is not in this mbuf */
1446 r->mlen -= mbuf_length(mbuf);
1447 mbuf_remove(&r->mhdr, mbuf);
1448 mbuf_put(mbuf);
1449 }
1450
1451 break;
1452
1453 default:
1454 /*
1455 * Valid responses for a fragmented requests are MSG_RSP_MC_VALUE or,
1456 * MSG_RSP_MC_END. For an invalid response, we send out SERVER_ERRROR
1457 * with EINVAL errno
1458 */
1459 mbuf = STAILQ_FIRST(&r->mhdr);
1460 log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
1461 "with unknown type %d", r->type);
1462 pr->error = 1;
1463 pr->err = EINVAL;
1464 break;
1465 }
1466 }
1467
1468 /*
1469 * Copy one response from src to dst and return bytes copied
1470 */
1471 static rstatus_t
memcache_copy_bulk(struct msg * dst,struct msg * src)1472 memcache_copy_bulk(struct msg *dst, struct msg *src)
1473 {
1474 struct mbuf *mbuf, *nbuf;
1475 const uint8_t *p;
1476 const uint8_t *last;
1477 uint32_t len = 0;
1478 uint32_t bytes = 0;
1479 uint32_t i = 0;
1480
1481 for (mbuf = STAILQ_FIRST(&src->mhdr);
1482 mbuf && mbuf_empty(mbuf);
1483 mbuf = STAILQ_FIRST(&src->mhdr)) {
1484
1485 mbuf_remove(&src->mhdr, mbuf);
1486 mbuf_put(mbuf);
1487 }
1488
1489 mbuf = STAILQ_FIRST(&src->mhdr);
1490 if (mbuf == NULL) {
1491 return NC_OK; /* key not exists */
1492 }
1493 p = mbuf->pos;
1494 last = mbuf->last;
1495
1496 /*
1497 * get : VALUE key flags len\r\nval\r\n
1498 * gets: VALUE key flags len cas\r\nval\r\n
1499 */
1500 ASSERT(*p == 'V');
1501 i = 0;
1502 while (p < last) { /* eat 'VALUE key flags ' */
1503 if (*p == ' ') {
1504 i++;
1505 if (i >= 3) {
1506 p++;
1507 break;
1508 }
1509 }
1510 p++;
1511 }
1512
1513 len = 0;
1514 for (; p < last && isdigit(*p); p++) {
1515 len = len * 10 + (uint32_t)(*p - '0');
1516 }
1517
1518 for (; p < last && ('\r' != *p); p++) { /* eat cas for gets */
1519 ;
1520 }
1521 /* "*p" should be pointing to '\r' */
1522
1523 len += CRLF_LEN * 2;
1524 len += (p - mbuf->pos);
1525
1526 if (p >= last) {
1527 log_error("Saw memcache value response where header was not "
1528 "parsed or header length %d unexpectedly exceeded mbuf size limit",
1529 (int)(p - mbuf->pos));
1530 return NC_ERROR;
1531 }
1532
1533
1534 bytes = len;
1535
1536 /* copy len bytes to dst */
1537 for (; mbuf;) {
1538 if (mbuf_length(mbuf) <= len) { /* steal this mbuf from src to dst */
1539 nbuf = STAILQ_NEXT(mbuf, next);
1540 mbuf_remove(&src->mhdr, mbuf);
1541 mbuf_insert(&dst->mhdr, mbuf);
1542 len -= mbuf_length(mbuf);
1543 mbuf = nbuf;
1544 } else { /* split it */
1545 nbuf = mbuf_get();
1546 if (nbuf == NULL) {
1547 return NC_ENOMEM;
1548 }
1549 mbuf_copy(nbuf, mbuf->pos, len);
1550 mbuf_insert(&dst->mhdr, nbuf);
1551 mbuf->pos += len;
1552 break;
1553 }
1554 }
1555
1556 dst->mlen += bytes;
1557 src->mlen -= bytes;
1558 log_debug(LOG_VVERB, "memcache_copy_bulk copy bytes: %d", bytes);
1559 return NC_OK;
1560 }
1561
1562 /*
1563 * Post-coalesce handler is invoked when the message is a response to
1564 * the fragmented multi vector request - 'get' or 'gets' and all the
1565 * responses to the fragmented request vector has been received and
1566 * the fragmented request is consider to be done
1567 */
1568 void
memcache_post_coalesce(struct msg * request)1569 memcache_post_coalesce(struct msg *request)
1570 {
1571 struct msg *response = request->peer;
1572 struct msg *sub_msg;
1573 uint32_t i;
1574 rstatus_t status;
1575
1576 ASSERT(!response->request);
1577 ASSERT(request->request && (request->frag_owner == request));
1578 if (request->error || request->ferror) {
1579 response->owner->err = 1;
1580 return;
1581 }
1582
1583 for (i = 0; i < array_n(request->keys); i++) { /* for each key */
1584 sub_msg = request->frag_seq[i]->peer; /* get its peer response */
1585 if (sub_msg == NULL) {
1586 response->owner->err = 1;
1587 return;
1588 }
1589 status = memcache_copy_bulk(response, sub_msg);
1590 if (status != NC_OK) {
1591 response->owner->err = 1;
1592 return;
1593 }
1594 }
1595
1596 /* append END\r\n */
1597 status = msg_append(response, (const uint8_t *)"END\r\n", 5);
1598 if (status != NC_OK) {
1599 response->owner->err = 1;
1600 return;
1601 }
1602 }
1603
/*
 * Post-connect hook for the memcache protocol: a deliberate no-op; none of
 * the arguments is used.
 */
void
memcache_post_connect(struct context *ctx, struct conn *conn, struct server *server)
{
    (void)ctx;
    (void)conn;
    (void)server;
}
1608
/*
 * Swallow-message hook for the memcache protocol: a deliberate no-op; none
 * of the arguments is used.
 */
void
memcache_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg)
{
    (void)conn;
    (void)pmsg;
    (void)msg;
}
1613
1614 rstatus_t
memcache_add_auth(struct context * ctx,struct conn * c_conn,struct conn * s_conn)1615 memcache_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn)
1616 {
1617 NOT_REACHED();
1618 return NC_OK;
1619 }
1620
1621 rstatus_t
memcache_reply(struct msg * r)1622 memcache_reply(struct msg *r)
1623 {
1624 NOT_REACHED();
1625 return NC_OK;
1626 }
1627
1628