1 /** @file
2
  Memcached protocol handling (binary and ASCII commands) layered on the cache processor.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "tsmemcache.h"
25
26 /*
27 TODO
28 - on OPEN_WRITE_FAIL don't poll, figure out another way, and timeout
29 - factor code better, particularly incr/set
30 - MIOBufferAccessor::reader_for
31 - cleanup creader dependency in stream_event
32 */
33
// Expiration values above this many seconds (30 days) are treated by the
// set/incr handlers as absolute times instead of offsets from "now".
// Parenthesized so the product stays intact inside any surrounding expression.
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)
// Non-zero when the text at [_s, _e) does NOT consist of the literal _c
// immediately followed by a whitespace character.
#define STRCMP_REST(_c, _s, _e) (((_e) - (_s)) < (int)sizeof(_c) || STRCMP(_s, _c) || !isspace((_s)[sizeof(_c) - 1]))
36
37 ClassAllocator<MC> theMCAllocator("MC");
38
39 static time_t base_day_time;
40
41 // These should be persistent.
42 int32_t MC::verbosity = 0;
43 ink_hrtime MC::last_flush = 0;
44 int64_t MC::next_cas = 1;
45
46 static void
tsmemcache_constants()47 tsmemcache_constants()
48 {
49 struct tm tm;
50 memset(&tm, 0, sizeof(tm));
51 // jan 1 2010
52 tm.tm_year = 110;
53 tm.tm_mon = 1;
54 tm.tm_mday = 1;
55 base_day_time = mktime(&tm);
56 ink_assert(base_day_time != (time_t)-1);
57 }
58
#ifdef DEBUG
// Scratch buffer used by mc_string(); every call overwrites the previous contents.
char debug_string_buffer[TSMEMCACHE_TMP_CMD_BUFFER_SIZE];

/**
  Produce a NUL terminated copy of @a s for debug output.

  Trailing CR / LF characters are dropped and the result is truncated to fit
  debug_string_buffer.

  @param s   start of the text (need not be NUL terminated).
  @param len number of bytes available at @a s.
  @return pointer to debug_string_buffer.
*/
static char *
mc_string(const char *s, int len)
{
  const int capacity = TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1;
  int n              = len;

  // Strip the line terminator, if any.
  for (; n != 0; --n) {
    const char last = s[n - 1];
    if (last != '\r' && last != '\n') {
      break;
    }
  }
  // Leave room for the terminating NUL.
  if (n > capacity) {
    n = capacity;
  }
  if (n != 0) {
    memcpy(debug_string_buffer, s, n);
  }
  debug_string_buffer[n] = 0;
  return debug_string_buffer;
}
#endif
78
#ifdef DEBUG
// Print a (possibly non NUL terminated) buffer when debug tag _t is enabled.
// The do/while (0) wrapper makes the macro a single statement, so it cannot
// capture an `else` that follows it at the call site.
#define MCDebugBuf(_t, _s, _l)                \
  do {                                        \
    if (is_debug_tag_set(_t)) {               \
      printf(_t ": %s\n", mc_string(_s, _l)); \
    }                                         \
  } while (0)
#define MCDebug Debug
#else
// Non-debug builds: buffer dumps expand to an empty statement.
#define MCDebugBuf(_t, _s, _l) \
  do {                         \
  } while (0)
// Non-debug builds: the call is still compiled but never executed.  This
// expands to a bare `if`, so only use it as a statement inside braces.
#define MCDebug \
  if (0)        \
  Debug
#endif
92
/**
  Convert a 64-bit value from host byte order to network (big-endian) order.

  The result, viewed as bytes in memory, holds the most significant byte of
  @a in first.  On a big-endian host this is the identity; on a little-endian
  host it is a full byte swap.  The conversion is its own inverse, which is why
  ink_ntoh64 is an alias for it.

  @param in value in host byte order.
  @return the value whose in-memory representation is @a in in network order.
*/
static uint64_t
ink_hton64(uint64_t in)
{
  // Build the big-endian byte sequence with shifts (independent of the host's
  // endianness) and copy it into the result; this avoids reading an inactive
  // union member.
  uint8_t be[sizeof(in)];
  for (size_t i = 0; i < sizeof(in); ++i) {
    be[i] = static_cast<uint8_t>(in >> (8 * (sizeof(in) - 1 - i)));
  }
  uint64_t out;
  memcpy(&out, be, sizeof(out));
  return out;
}
#define ink_ntoh64 ink_hton64
121
122 int
main_event(int event,void * data)123 MCAccept::main_event(int event, void *data)
124 {
125 if (event == NET_EVENT_ACCEPT) {
126 NetVConnection *netvc = (NetVConnection *)data;
127 MC *mc = theMCAllocator.alloc();
128 if (!mutex->thread_holding) {
129 mc->new_connection(netvc, netvc->thread);
130 } else {
131 mc->new_connection(netvc, mutex->thread_holding);
132 }
133 return EVENT_CONT;
134 } else {
135 Fatal("tsmemcache accept received fatal error: errno = %d", -(static_cast<int>((intptr_t)data)));
136 return EVENT_CONT;
137 }
138 }
139
140 void
new_connection(NetVConnection * netvc,EThread * thread)141 MC::new_connection(NetVConnection *netvc, EThread *thread)
142 {
143 nvc = netvc;
144 mutex = new_ProxyMutex();
145 rbuf = new_MIOBuffer(MAX_IOBUFFER_SIZE);
146 rbuf->water_mark = TSMEMCACHE_TMP_CMD_BUFFER_SIZE;
147 reader = rbuf->alloc_reader();
148 wbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
149 cbuf = 0;
150 writer = wbuf->alloc_reader();
151 SCOPED_MUTEX_LOCK(lock, mutex, thread);
152 rvio = nvc->do_io_read(this, INT64_MAX, rbuf);
153 wvio = nvc->do_io_write(this, 0, writer);
154 header.magic = TSMEMCACHE_HEADER_MAGIC;
155 read_from_client();
156 }
157
158 int
die()159 MC::die()
160 {
161 if (pending_action && pending_action != ACTION_RESULT_DONE) {
162 pending_action->cancel();
163 }
164 if (nvc) {
165 nvc->do_io_close(1); // abort
166 }
167 if (crvc) {
168 crvc->do_io_close(1); // abort
169 }
170 if (cwvc) {
171 cwvc->do_io_close(1); // abort
172 }
173 if (rbuf) {
174 free_MIOBuffer(rbuf);
175 }
176 if (wbuf) {
177 free_MIOBuffer(wbuf);
178 }
179 if (cbuf) {
180 free_MIOBuffer(cbuf);
181 }
182 if (tbuf) {
183 ats_free(tbuf);
184 }
185 mutex = NULL;
186 theMCAllocator.free(this);
187 return EVENT_DONE;
188 }
189
190 int
unexpected_event()191 MC::unexpected_event()
192 {
193 ink_assert(!"unexpected event");
194 return die();
195 }
196
197 int
write_then_close(int64_t ntowrite)198 MC::write_then_close(int64_t ntowrite)
199 {
200 SET_HANDLER(&MC::write_then_close_event);
201 return write_to_client(ntowrite);
202 }
203
204 int
write_then_read_from_client(int64_t ntowrite)205 MC::write_then_read_from_client(int64_t ntowrite)
206 {
207 SET_HANDLER(&MC::read_from_client_event);
208 return write_to_client(ntowrite);
209 }
210
211 int
stream_then_read_from_client(int64_t ntowrite)212 MC::stream_then_read_from_client(int64_t ntowrite)
213 {
214 SET_HANDLER(&MC::read_from_client_event);
215 creader = reader;
216 TS_PUSH_HANDLER(&MC::stream_event);
217 return write_to_client(ntowrite);
218 }
219
220 void
add_binary_header(uint16_t err,uint8_t hdr_len,uint16_t key_len,uint32_t body_len)221 MC::add_binary_header(uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len)
222 {
223 protocol_binary_response_header r;
224
225 r.response.magic = static_cast<uint8_t>(PROTOCOL_BINARY_RES);
226 r.response.opcode = binary_header.request.opcode;
227 r.response.keylen = (uint16_t)htons(key_len);
228 r.response.extlen = hdr_len;
229 r.response.datatype = static_cast<uint8_t>(PROTOCOL_BINARY_RAW_BYTES);
230 r.response.status = (uint16_t)htons(err);
231 r.response.bodylen = htonl(body_len);
232 r.response.opaque = binary_header.request.opaque;
233 r.response.cas = ink_hton64(header.cas);
234
235 wbuf->write(&r, sizeof(r));
236 }
237
238 int
write_binary_error(protocol_binary_response_status err,int swallow)239 MC::write_binary_error(protocol_binary_response_status err, int swallow)
240 {
241 const char *errstr = "Unknown error";
242 switch (err) {
243 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
244 errstr = "Out of memory";
245 break;
246 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
247 errstr = "Unknown command";
248 break;
249 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
250 errstr = "Not found";
251 break;
252 case PROTOCOL_BINARY_RESPONSE_EINVAL:
253 errstr = "Invalid arguments";
254 break;
255 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
256 errstr = "Data exists for key.";
257 break;
258 case PROTOCOL_BINARY_RESPONSE_E2BIG:
259 errstr = "Too large.";
260 break;
261 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
262 errstr = "Non-numeric server-side value for incr or decr";
263 break;
264 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
265 errstr = "Not stored.";
266 break;
267 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
268 errstr = "Auth failure.";
269 break;
270 default:
271 ink_assert(!"unhandled error");
272 errstr = "UNHANDLED ERROR";
273 Warning("tsmemcache: unhandled error: %d\n", err);
274 }
275
276 size_t len = strlen(errstr);
277 add_binary_header(err, 0, 0, len);
278 if (swallow > 0) {
279 int64_t avail = reader->read_avail();
280 if (avail >= swallow) {
281 reader->consume(swallow);
282 } else {
283 swallow_bytes = swallow - avail;
284 reader->consume(avail);
285 SET_HANDLER(&MC::swallow_then_read_event);
286 }
287 }
288 return 0;
289 }
290
291 int
swallow_then_read_event(int event,void * data)292 MC::swallow_then_read_event(int event, void *data)
293 {
294 rvio->nbytes = INT64_MAX;
295 int64_t avail = reader->read_avail();
296 if (avail >= swallow_bytes) {
297 reader->consume(swallow_bytes);
298 swallow_bytes = 0;
299 return read_from_client();
300 } else {
301 swallow_bytes -= avail;
302 reader->consume(avail);
303 return EVENT_CONT;
304 }
305 }
306
307 int
swallow_cmd_then_read_from_client_event(int event,void * data)308 MC::swallow_cmd_then_read_from_client_event(int event, void *data)
309 {
310 int64_t avail = reader->read_avail();
311 if (avail) {
312 int64_t n = reader->memchr('\n');
313 if (n >= 0) {
314 reader->consume(n + 1);
315 return read_from_client();
316 }
317 reader->consume(avail);
318 return EVENT_CONT;
319 }
320 return EVENT_CONT;
321 }
322
323 int
protocol_error()324 MC::protocol_error()
325 {
326 Warning("tsmemcache: protocol error");
327 return write_then_close(write_binary_error(PROTOCOL_BINARY_RESPONSE_EINVAL, 0));
328 }
329
330 int
read_from_client()331 MC::read_from_client()
332 {
333 if (swallow_bytes) {
334 return TS_SET_CALL(&MC::swallow_then_read_event, VC_EVENT_READ_READY, rvio);
335 }
336 read_offset = 0;
337 end_of_cmd = 0;
338 ngets = 0;
339 ff = 0;
340 if (crvc) {
341 crvc->do_io_close();
342 crvc = 0;
343 crvio = NULL;
344 }
345 if (cwvc) {
346 cwvc->do_io_close();
347 cwvc = 0;
348 cwvio = NULL;
349 }
350 if (cbuf) {
351 cbuf->clear();
352 }
353 ink_assert(!crvc && !cwvc);
354 if (tbuf) {
355 ats_free(tbuf);
356 }
357 return TS_SET_CALL(&MC::read_from_client_event, VC_EVENT_READ_READY, rvio);
358 }
359
360 int
write_to_client(int64_t towrite)361 MC::write_to_client(int64_t towrite)
362 {
363 (void)towrite;
364 wvio->nbytes = INT64_MAX;
365 wvio->reenable();
366 return EVENT_CONT;
367 }
368
369 int
write_binary_response(const void * d,int hlen,int keylen,int dlen)370 MC::write_binary_response(const void *d, int hlen, int keylen, int dlen)
371 {
372 if (!f.noreply || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
373 binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ) {
374 add_binary_header(0, hlen, keylen, dlen);
375 if (dlen) {
376 MCDebug("tsmemcache", "response dlen %d\n", dlen);
377 wbuf->write(d, dlen);
378 } else {
379 MCDebug("tsmemcache", "no response\n");
380 }
381 }
382 return writer->read_avail();
383 }
384
// Return from the enclosing event handler unless at least _n bytes of client
// input are buffered.  Expects `event` and `data` to be the handler's
// parameters.  With too little input:
//   - READ_READY, or EOS on a VIO other than rvio: return EVENT_CONT;
//   - WRITE_READY with output still pending:       return EVENT_CONT;
//   - WRITE_READY with nothing pending, WRITE_COMPLETE: return EVENT_DONE;
//   - anything else (including EOS on rvio):       return die().
// _h is accepted for call-site compatibility and is not used.
// _n is parenthesized so that any expression can be passed.
#define CHECK_READ_AVAIL(_n, _h)                      \
  do {                                                \
    if (reader->read_avail() < (_n)) {                \
      switch (event) {                                \
      case VC_EVENT_EOS:                              \
        if ((VIO *)data == rvio)                      \
          break;                                      \
        /* fallthrough */                             \
      case VC_EVENT_READ_READY:                       \
        return EVENT_CONT;                            \
      case VC_EVENT_WRITE_READY:                      \
        if (wvio->buffer.reader()->read_avail() > 0)  \
          return EVENT_CONT;                          \
        /* fallthrough */                             \
      case VC_EVENT_WRITE_COMPLETE:                   \
        return EVENT_DONE;                            \
      default:                                        \
        break;                                        \
      }                                               \
      return die();                                   \
    }                                                 \
  } while (0)
407
408 static char *
get_pointer(MC * mc,int start,int len)409 get_pointer(MC *mc, int start, int len)
410 {
411 if (mc->reader->block_read_avail() >= start + len) {
412 return mc->reader->start() + start;
413 }
414 // the block of data straddles an IOBufferBlock boundary, exceptional case, malloc
415 ink_assert(!mc->tbuf);
416 mc->tbuf = static_cast<char *>(ats_malloc(len));
417 mc->reader->memcpy(mc->tbuf, len, start);
418 return mc->tbuf;
419 }
420
421 static inline char *
binary_get_key(MC * mc)422 binary_get_key(MC *mc)
423 {
424 return get_pointer(mc, 0, mc->binary_header.request.keylen);
425 }
426
427 int
cache_read_event(int event,void * data)428 MC::cache_read_event(int event, void *data)
429 {
430 switch (event) {
431 case CACHE_EVENT_OPEN_READ: {
432 crvc = (CacheVConnection *)data;
433 int hlen = 0;
434 if (crvc->get_header((void **)&rcache_header, &hlen) < 0) {
435 goto Lfail;
436 }
437 if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || rcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
438 goto Lfail;
439 }
440 if (header.nkey != rcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + rcache_header->nkey)) {
441 goto Lfail;
442 }
443 if (memcmp(key, rcache_header->key(), header.nkey)) {
444 goto Lfail;
445 }
446 {
447 ink_hrtime t = Thread::get_hrtime();
448 if ((static_cast<ink_hrtime>(rcache_header->settime)) <= last_flush ||
449 t >= (static_cast<ink_hrtime>(rcache_header->settime)) + HRTIME_SECONDS(rcache_header->exptime)) {
450 goto Lfail;
451 }
452 }
453 break;
454 Lfail:
455 crvc->do_io_close();
456 crvc = 0;
457 crvio = NULL;
458 event = CACHE_EVENT_OPEN_READ_FAILED; // convert to failure
459 break;
460 }
461 case VC_EVENT_EOS:
462 case VC_EVENT_ERROR:
463 case CACHE_EVENT_OPEN_READ_FAILED:
464 break;
465 default:
466 return EVENT_CONT;
467 }
468 return TS_POP_CALL(event, data);
469 }
470
471 int
get_item()472 MC::get_item()
473 {
474 TS_PUSH_HANDLER(&MC::cache_read_event);
475 CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
476 pending_action = cacheProcessor.open_read(this, &cache_key);
477 return EVENT_CONT;
478 }
479
480 int
set_item()481 MC::set_item()
482 {
483 CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
484 pending_action = cacheProcessor.open_write(this, &cache_key, CACHE_FRAG_TYPE_NONE, header.nbytes,
485 CACHE_WRITE_OPT_OVERWRITE | TSMEMCACHE_WRITE_SYNC);
486 return EVENT_CONT;
487 }
488
489 int
delete_item()490 MC::delete_item()
491 {
492 CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
493 pending_action = cacheProcessor.remove(this, &cache_key, CACHE_FRAG_TYPE_NONE);
494 return EVENT_CONT;
495 }
496
497 int
binary_get_event(int event,void * data)498 MC::binary_get_event(int event, void *data)
499 {
500 ink_assert(!"EVENT_ITEM_GOT is incorrect here");
501 if (event != TSMEMCACHE_EVENT_GOT_ITEM) {
502 CHECK_READ_AVAIL(binary_header.request.keylen, &MC::binary_get);
503 key = binary_get_key(this);
504 header.nkey = binary_header.request.keylen;
505 return get_item();
506 } else if (event == CACHE_EVENT_OPEN_READ_FAILED) {
507 if (f.noreply) {
508 return read_from_client();
509 }
510 if (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK) {
511 add_binary_header(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, header.nkey, header.nkey);
512 wbuf->write(key, header.nkey);
513 return write_then_read_from_client();
514 } else {
515 return write_binary_error(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
516 }
517 } else if (event == CACHE_EVENT_OPEN_READ) {
518 protocol_binary_response_get *rsp = &res.get;
519 uint16_t keylen = 0;
520 uint32_t bodylen = sizeof(rsp->message.body) + (rcache_header->nbytes - 2);
521 bool getk =
522 (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ);
523 if (getk) {
524 bodylen += header.nkey;
525 keylen = header.nkey;
526 }
527 add_binary_header(0, sizeof(rsp->message.body), keylen, bodylen);
528 rsp->message.header.response.cas = ink_hton64(rcache_header->cas);
529 rsp->message.body.flags = htonl(rcache_header->flags);
530 wbuf->write(&rsp->message.body, sizeof(rsp->message.body));
531 if (getk) {
532 wbuf->write(key, header.nkey);
533 }
534 crvio = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
535 return stream_then_read_from_client(rcache_header->nbytes);
536 } else {
537 return unexpected_event();
538 }
539 return 0;
540 }
541
542 int
bin_read_key()543 MC::bin_read_key()
544 {
545 return -1;
546 }
547
548 int
read_binary_from_client_event(int event,void * data)549 MC::read_binary_from_client_event(int event, void *data)
550 {
551 if (reader->read_avail() < (int)sizeof(binary_header)) {
552 return EVENT_CONT;
553 }
554 reader->memcpy(&binary_header, sizeof(binary_header));
555 if (binary_header.request.magic != PROTOCOL_BINARY_REQ) {
556 Warning("tsmemcache: bad binary magic: %x", binary_header.request.magic);
557 return die();
558 }
559 int keylen = binary_header.request.keylen = ntohs(binary_header.request.keylen);
560 int bodylen = binary_header.request.bodylen = ntohl(binary_header.request.bodylen);
561 binary_header.request.cas = ink_ntoh64(binary_header.request.cas);
562 int extlen = binary_header.request.extlen;
563 end_of_cmd = sizeof(binary_header) + extlen;
564
565 #define CHECK_PROTOCOL(_e) \
566 if (!(_e)) \
567 return protocol_error();
568
569 MCDebug("tsmemcache", "bin cmd %d\n", binary_header.request.opcode);
570 switch (binary_header.request.opcode) {
571 case PROTOCOL_BINARY_CMD_VERSION:
572 CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
573 return write_to_client(write_binary_response(TSMEMCACHE_VERSION, 0, 0, STRLEN(TSMEMCACHE_VERSION)));
574 case PROTOCOL_BINARY_CMD_NOOP:
575 CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
576 return write_to_client(write_binary_response(nullptr, 0, 0, 0));
577 case PROTOCOL_BINARY_CMD_GETKQ:
578 f.noreply = 1; // fall through
579 case PROTOCOL_BINARY_CMD_GETQ:
580 f.noreply = 1; // fall through
581 case PROTOCOL_BINARY_CMD_GETK:
582 case PROTOCOL_BINARY_CMD_GET:
583 CHECK_PROTOCOL(extlen == 0 && (int)bodylen == keylen && keylen > 0);
584 return TS_SET_CALL(&MC::binary_get_event, event, data);
585 case PROTOCOL_BINARY_CMD_APPENDQ:
586 case PROTOCOL_BINARY_CMD_APPEND:
587 f.set_append = 1;
588 goto Lset;
589 case PROTOCOL_BINARY_CMD_PREPENDQ:
590 case PROTOCOL_BINARY_CMD_PREPEND:
591 f.set_prepend = 1;
592 goto Lset;
593 case PROTOCOL_BINARY_CMD_ADDQ:
594 f.noreply = 1; // fall through
595 case PROTOCOL_BINARY_CMD_ADD:
596 CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
597 f.set_add = 1;
598 goto Lset;
599 case PROTOCOL_BINARY_CMD_REPLACEQ:
600 f.noreply = 1; // fall through
601 case PROTOCOL_BINARY_CMD_REPLACE:
602 CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
603 f.set_replace = 1;
604 goto Lset;
605 case PROTOCOL_BINARY_CMD_SETQ:
606 f.noreply = 1; // fall through
607 case PROTOCOL_BINARY_CMD_SET: {
608 CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
609 Lset:
610 if (bin_read_key() < 0) {
611 return EVENT_CONT;
612 }
613 key = binary_get_key(this);
614 header.nkey = keylen;
615 protocol_binary_request_set *req = reinterpret_cast<protocol_binary_request_set *>(&binary_header);
616 req->message.body.flags = ntohl(req->message.body.flags);
617 req->message.body.expiration = ntohl(req->message.body.expiration);
618 nbytes = bodylen - (header.nkey + extlen);
619 break;
620 }
621 case PROTOCOL_BINARY_CMD_DELETEQ:
622 f.noreply = 1; // fall through
623 case PROTOCOL_BINARY_CMD_DELETE:
624 break;
625 case PROTOCOL_BINARY_CMD_INCREMENTQ:
626 f.noreply = 1; // fall through
627 case PROTOCOL_BINARY_CMD_INCREMENT:
628 break;
629 case PROTOCOL_BINARY_CMD_DECREMENTQ:
630 f.noreply = 1; // fall through
631 case PROTOCOL_BINARY_CMD_DECREMENT:
632 break;
633 case PROTOCOL_BINARY_CMD_QUITQ:
634 f.noreply = 1; // fall through
635 case PROTOCOL_BINARY_CMD_QUIT:
636 if (f.noreply) {
637 return die();
638 }
639 return write_then_close(write_binary_response(nullptr, 0, 0, 0));
640 case PROTOCOL_BINARY_CMD_FLUSHQ:
641 f.noreply = 1; // fall through
642 case PROTOCOL_BINARY_CMD_FLUSH:
643 break;
644 break;
645 case PROTOCOL_BINARY_CMD_STAT:
646 break;
647 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
648 case PROTOCOL_BINARY_CMD_SASL_AUTH:
649 case PROTOCOL_BINARY_CMD_SASL_STEP:
650 Warning("tsmemcache: sasl not (yet) supported");
651 return die();
652 case PROTOCOL_BINARY_CMD_RGET:
653 case PROTOCOL_BINARY_CMD_RSET:
654 case PROTOCOL_BINARY_CMD_RSETQ:
655 case PROTOCOL_BINARY_CMD_RAPPEND:
656 case PROTOCOL_BINARY_CMD_RAPPENDQ:
657 case PROTOCOL_BINARY_CMD_RPREPEND:
658 case PROTOCOL_BINARY_CMD_RPREPENDQ:
659 case PROTOCOL_BINARY_CMD_RDELETE:
660 case PROTOCOL_BINARY_CMD_RDELETEQ:
661 case PROTOCOL_BINARY_CMD_RINCR:
662 case PROTOCOL_BINARY_CMD_RINCRQ:
663 case PROTOCOL_BINARY_CMD_RDECR:
664 case PROTOCOL_BINARY_CMD_RDECRQ:
665 Warning("tsmemcache: range not (yet) supported");
666 return die();
667 default:
668 Warning("tsmemcache: unexpected binary opcode %x", binary_header.request.opcode);
669 return die();
670 }
671 return EVENT_CONT;
672 }
673
674 int
ascii_response(const char * s,int len)675 MC::ascii_response(const char *s, int len)
676 {
677 if (!f.noreply) {
678 wbuf->write(s, len);
679 wvio->nbytes = INT64_MAX;
680 wvio->reenable();
681 MCDebugBuf("tsmemcache_ascii_response", s, len);
682 }
683 if (end_of_cmd > 0) {
684 reader->consume(end_of_cmd);
685 return read_from_client();
686 } else if (end_of_cmd < 0) {
687 return read_from_client();
688 } else {
689 return TS_SET_CALL(&MC::swallow_cmd_then_read_from_client_event, EVENT_NONE, NULL);
690 }
691 }
692
693 char *
get_ascii_input(int n,int * end)694 MC::get_ascii_input(int n, int *end)
695 {
696 int block_read_avail = reader->block_read_avail();
697 if (block_read_avail >= n) {
698 Lblock:
699 *end = block_read_avail;
700 return reader->start();
701 }
702 int read_avail = reader->read_avail();
703 if (block_read_avail == read_avail) {
704 goto Lblock;
705 }
706 char *c = tmp_cmd_buffer;
707 int e = read_avail;
708 if (e > n) {
709 e = n;
710 }
711 reader->memcpy(c, e);
712 *end = e;
713 return c;
714 }
715
716 int
ascii_get_event(int event,void * data)717 MC::ascii_get_event(int event, void *data)
718 {
719 switch (event) {
720 case CACHE_EVENT_OPEN_READ_FAILED:
721 reader->consume(read_offset);
722 read_offset = 0;
723 break;
724 case CACHE_EVENT_OPEN_READ: {
725 wbuf->WRITE("VALUE ");
726 wbuf->write(key, header.nkey);
727 wbuf->WRITE(" ");
728 char t[32], *te = t + 32;
729 char *flags = xutoa(rcache_header->flags, te);
730 wbuf->write(flags, te - flags);
731 wbuf->WRITE(" ");
732 char *bytes = xutoa(rcache_header->nbytes, te);
733 wbuf->write(bytes, te - bytes);
734 if (f.return_cas) {
735 wbuf->WRITE(" ");
736 char *pcas = xutoa(rcache_header->cas, te);
737 wbuf->write(pcas, te - pcas);
738 }
739 wbuf->WRITE("\r\n");
740 int ntowrite = writer->read_avail() + rcache_header->nbytes;
741 crvio = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
742 creader = reader;
743 TS_PUSH_HANDLER(&MC::stream_event);
744 return write_to_client(ntowrite);
745 }
746 case TSMEMCACHE_STREAM_DONE:
747 crvc->do_io_close();
748 crvc = 0;
749 crvio = NULL;
750 reader->consume(read_offset);
751 read_offset = 0;
752 wbuf->WRITE("\r\n");
753 return ascii_gets();
754 default:
755 break;
756 }
757 return ascii_gets();
758 }
759
760 int
ascii_set_event(int event,void * data)761 MC::ascii_set_event(int event, void *data)
762 {
763 switch (event) {
764 case CACHE_EVENT_OPEN_WRITE_FAILED:
765 // another write currently in progress
766 mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
767 return EVENT_CONT;
768 case EVENT_INTERVAL:
769 return read_from_client();
770 case CACHE_EVENT_OPEN_WRITE: {
771 cwvc = (CacheVConnection *)data;
772 int hlen = 0;
773 if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
774 if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
775 goto Lfail;
776 }
777 if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
778 goto Lfail;
779 }
780 ink_hrtime t = Thread::get_hrtime();
781 if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
782 t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
783 goto Lstale;
784 }
785 if (f.set_add) {
786 return ASCII_RESPONSE("NOT_STORED");
787 }
788 } else {
789 Lstale:
790 if (f.set_replace) {
791 return ASCII_RESPONSE("NOT_STORED");
792 }
793 }
794 memcpy(tmp_cache_header_key, key, header.nkey);
795 header.settime = Thread::get_hrtime();
796 if (exptime) {
797 if (exptime > REALTIME_MAXDELTA) {
798 if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
799 header.exptime = 0;
800 } else {
801 header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
802 }
803 } else {
804 header.exptime = exptime;
805 }
806 } else {
807 header.exptime = UINT32_MAX; // 136 years
808 }
809 if (f.set_cas) {
810 if (!wcache_header) {
811 return ASCII_RESPONSE("NOT_FOUND");
812 }
813 if (header.cas && header.cas != wcache_header->cas) {
814 return ASCII_RESPONSE("EXISTS");
815 }
816 }
817 header.cas = ink_atomic_increment(&next_cas, 1);
818 if (f.set_append || f.set_prepend) {
819 header.nbytes = nbytes + rcache_header->nbytes;
820 } else {
821 header.nbytes = nbytes;
822 }
823 cwvc->set_header(&header, header.len());
824 reader->consume(end_of_cmd);
825 end_of_cmd = -1;
826 swallow_bytes = 2; // \r\n
827 if (f.set_append) {
828 TS_PUSH_HANDLER(&MC::tunnel_event);
829 if (!cbuf) {
830 cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
831 }
832 creader = cbuf->alloc_reader();
833 crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
834 cwvio = cwvc->do_io_write(this, header.nbytes, creader);
835 } else {
836 if (f.set_prepend) {
837 int64_t a = reader->read_avail();
838 if (a >= static_cast<int64_t>(nbytes)) {
839 a = static_cast<int64_t>(nbytes);
840 }
841 if (!cbuf) {
842 cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
843 }
844 creader = cbuf->alloc_reader();
845 if (a) {
846 cbuf->write(reader, a);
847 reader->consume(a);
848 }
849 if (a == static_cast<int64_t>(nbytes)) {
850 cwvio = cwvc->do_io_write(this, header.nbytes, creader);
851 goto Lstreamdone;
852 }
853 rvio->nbytes = rvio->ndone + (int64_t)nbytes - a;
854 } else {
855 creader = reader;
856 }
857 TS_PUSH_HANDLER(&MC::stream_event);
858 cwvio = cwvc->do_io_write(this, header.nbytes, creader);
859 }
860 return EVENT_CONT;
861 }
862 case TSMEMCACHE_STREAM_DONE:
863 rvio->nbytes = UINT64_MAX;
864 Lstreamdone:
865 if (f.set_prepend) {
866 TS_PUSH_HANDLER(&MC::tunnel_event);
867 crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
868 return EVENT_CONT;
869 }
870 return ASCII_RESPONSE("STORED");
871 case TSMEMCACHE_TUNNEL_DONE:
872 crvc->do_io_close();
873 crvc = 0;
874 crvio = NULL;
875 if (f.set_append) {
876 int64_t a = reader->read_avail();
877 if (a > static_cast<int64_t>(nbytes)) {
878 a = static_cast<int64_t>(nbytes);
879 }
880 if (a) {
881 cbuf->write(reader, a);
882 reader->consume(a);
883 }
884 TS_PUSH_HANDLER(&MC::stream_event);
885 return handleEvent(VC_EVENT_READ_READY, rvio);
886 }
887 ink_assert(f.set_prepend);
888 cwvc->do_io_close();
889 cwvc = 0;
890 return ASCII_RESPONSE("STORED");
891 case CACHE_EVENT_OPEN_READ_FAILED:
892 swallow_bytes = nbytes + 2;
893 return ASCII_RESPONSE("NOT_STORED");
894 case CACHE_EVENT_OPEN_READ:
895 crvc = (CacheVConnection *)data;
896 return set_item();
897 default:
898 break;
899 }
900 return EVENT_CONT;
901 Lfail:
902 Warning("tsmemcache: bad cache data");
903 return ASCII_SERVER_ERROR("");
904 }
905
906 int
ascii_delete_event(int event,void * data)907 MC::ascii_delete_event(int event, void *data)
908 {
909 switch (event) {
910 case CACHE_EVENT_REMOVE_FAILED:
911 return ASCII_RESPONSE("NOT_FOUND");
912 case CACHE_EVENT_REMOVE:
913 return ASCII_RESPONSE("DELETED");
914 default:
915 return EVENT_CONT;
916 }
917 }
918
919 int
ascii_incr_decr_event(int event,void * data)920 MC::ascii_incr_decr_event(int event, void *data)
921 {
922 switch (event) {
923 case CACHE_EVENT_OPEN_WRITE_FAILED:
924 // another write currently in progress
925 mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
926 return EVENT_CONT;
927 case EVENT_INTERVAL:
928 return read_from_client();
929 case CACHE_EVENT_OPEN_WRITE: {
930 int hlen = 0;
931 cwvc = (CacheVConnection *)data;
932 {
933 if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
934 if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
935 goto Lfail;
936 }
937 if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
938 goto Lfail;
939 }
940 ink_hrtime t = Thread::get_hrtime();
941 if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
942 t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
943 goto Lfail;
944 }
945 } else {
946 goto Lfail;
947 }
948 memcpy(tmp_cache_header_key, key, header.nkey);
949 header.settime = Thread::get_hrtime();
950 if (exptime) {
951 if (exptime > REALTIME_MAXDELTA) {
952 if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
953 header.exptime = 0;
954 } else {
955 header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
956 }
957 } else {
958 header.exptime = exptime;
959 }
960 } else {
961 header.exptime = UINT32_MAX; // 136 years
962 }
963 }
964 header.cas = ink_atomic_increment(&next_cas, 1);
965 {
966 char *localdata = nullptr;
967 int len = 0;
968 // must be huge, why convert to a counter ??
969 if (cwvc->get_single_data((void **)&localdata, &len) < 0) {
970 goto Lfail;
971 }
972 uint64_t new_value = xatoull(localdata, localdata + len);
973 if (f.set_incr) {
974 new_value += delta;
975 } else {
976 if (delta > new_value) {
977 new_value = 0;
978 } else {
979 new_value -= delta;
980 }
981 }
982 char new_value_str_buffer[32], *e = &new_value_str_buffer[30];
983 e[0] = '\r';
984 e[1] = '\n';
985 char *s = xutoa(new_value, e);
986 creader = wbuf->clone_reader(writer);
987 wbuf->write(s, e - s + 2);
988 if (f.noreply) {
989 writer->consume(e - s + 2);
990 } else {
991 wvio->reenable();
992 }
993 MCDebugBuf("tsmemcache_ascii_response", s, e - s + 2);
994 header.nbytes = e - s;
995 cwvc->set_header(&header, header.len());
996 TS_PUSH_HANDLER(&MC::stream_event);
997 cwvio = cwvc->do_io_write(this, header.nbytes, creader);
998 }
999 return EVENT_CONT;
1000 }
1001 case TSMEMCACHE_STREAM_DONE: {
1002 wbuf->dealloc_reader(creader);
1003 creader = 0;
1004 reader->consume(end_of_cmd);
1005 return read_from_client();
1006 }
1007 default:
1008 break;
1009 }
1010 return EVENT_CONT;
1011 Lfail:
1012 Warning("tsmemcache: bad cache data");
1013 return ASCII_RESPONSE("NOT_FOUND");
1014 }
1015
1016 int
get_ascii_key(char * as,char * e)1017 MC::get_ascii_key(char *as, char *e)
1018 {
1019 char *s = as;
1020 // skip space
1021 while (*s == ' ') {
1022 s++;
1023 if (s >= e) {
1024 if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1025 return ASCII_CLIENT_ERROR("bad command line");
1026 }
1027 return EVENT_CONT;
1028 }
1029 }
1030 // grab key
1031 key = s;
1032 while (!isspace(*s)) {
1033 if (s >= e) {
1034 if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1035 return ASCII_RESPONSE("key too large");
1036 }
1037 return EVENT_CONT;
1038 }
1039 s++;
1040 }
1041 if (s - key > TSMEMCACHE_MAX_KEY_LEN) {
1042 return ASCII_CLIENT_ERROR("bad command line");
1043 }
1044 header.nkey = s - key;
1045 if (!header.nkey) {
1046 if (e - s >= 2) {
1047 if (*s == '\r') {
1048 s++;
1049 }
1050 if (*s == '\n' && ngets) {
1051 return ASCII_RESPONSE("END");
1052 }
1053 return ASCII_CLIENT_ERROR("bad command line");
1054 }
1055 return EVENT_CONT; // get some more
1056 }
1057 read_offset = s - as;
1058 return TSMEMCACHE_EVENT_GOT_KEY;
1059 }
1060
1061 int
ascii_get(char * as,char * e)1062 MC::ascii_get(char *as, char *e)
1063 {
1064 SET_HANDLER(&MC::ascii_get_event);
1065 CHECK_RET(get_ascii_key(as, e), TSMEMCACHE_EVENT_GOT_KEY);
1066 ngets++;
1067 return get_item();
1068 }
1069
1070 int
ascii_gets()1071 MC::ascii_gets()
1072 {
1073 int len = 0;
1074 char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len);
1075 return ascii_get(c, c + len);
1076 }
1077
1078 #define SKIP_SPACE \
1079 do { \
1080 while (*s == ' ') { \
1081 s++; \
1082 if (s >= e) \
1083 return ASCII_CLIENT_ERROR("bad command line"); \
1084 } \
1085 } while (0)
1086
1087 #define SKIP_TOKEN \
1088 do { \
1089 while (!isspace(*s)) { \
1090 s++; \
1091 if (s >= e) \
1092 return ASCII_CLIENT_ERROR("bad command line"); \
1093 } \
1094 } while (0)
1095
// Parse an unsigned decimal number at the cursor `s` into `_n`.
// `_n` is set to 0 when the cursor is not on a digit (the cursor then does not move).
// Requires `s` (cursor) and `e` (one past the end of input) to be in scope in
// the enclosing function; if the digits run up to `e`, the enclosing function
// returns ASCII_CLIENT_ERROR("bad command line").
// No overflow detection is performed.
// Bytes are converted to unsigned char before isdigit(): passing a negative
// plain char to it is undefined.
#define GET_NUM(_n)                                    \
  do {                                                 \
    _n = 0;                                            \
    while (isdigit(static_cast<unsigned char>(*s))) {  \
      _n *= 10;                                        \
      _n += *s - '0';                                  \
      s++;                                             \
      if (s >= e)                                      \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                  \
  } while (0)
1113
// Parse an optionally '-'-prefixed decimal number at the cursor `s` into `_n`.
// `_n` is set to 0 when no digit follows (after the optional sign).
// Requires `s` (cursor) and `e` (one past the end of input) to be in scope in
// the enclosing function; if the sign or the digits run up to `e`, the
// enclosing function returns ASCII_CLIENT_ERROR("bad command line").
// No overflow detection is performed.
// Bytes are converted to unsigned char before isdigit(): passing a negative
// plain char to it is undefined.
#define GET_SNUM(_n)                                   \
  do {                                                 \
    int neg = 0;                                       \
    if (*s == '-') {                                   \
      s++;                                             \
      neg = 1;                                         \
      /* do not look at *s past the end of input */    \
      if (s >= e)                                      \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                  \
    _n = 0;                                            \
    while (isdigit(static_cast<unsigned char>(*s))) {  \
      _n *= 10;                                        \
      _n += *s - '0';                                  \
      s++;                                             \
      if (s >= e)                                      \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                  \
    if (neg)                                           \
      _n = -_n;                                        \
  } while (0)
1138
1139 int
ascii_set(char * s,char * e)1140 MC::ascii_set(char *s, char *e)
1141 {
1142 SKIP_SPACE;
1143 key = s;
1144 SKIP_TOKEN;
1145 header.nkey = s - key;
1146 SKIP_SPACE;
1147 GET_NUM(header.flags);
1148 SKIP_SPACE;
1149 GET_SNUM(exptime);
1150 SKIP_SPACE;
1151 GET_NUM(nbytes);
1152 swallow_bytes = nbytes + 2; // assume failure
1153 if (f.set_cas) {
1154 SKIP_SPACE;
1155 GET_NUM(header.cas);
1156 } else {
1157 header.cas = 0;
1158 }
1159 SKIP_SPACE;
1160 if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1161 f.noreply = 1;
1162 s += 7;
1163 if (s >= e) {
1164 return ASCII_CLIENT_ERROR("bad command line");
1165 }
1166 SKIP_SPACE;
1167 }
1168 if (*s == '\r') {
1169 s++;
1170 }
1171 if (*s == '\n') {
1172 s++;
1173 }
1174 if (s != e) {
1175 return ASCII_CLIENT_ERROR("bad command line");
1176 }
1177 SET_HANDLER(&MC::ascii_set_event);
1178 if (f.set_append || f.set_prepend) {
1179 return get_item();
1180 } else {
1181 return set_item();
1182 }
1183 }
1184
1185 int
ascii_delete(char * s,char * e)1186 MC::ascii_delete(char *s, char *e)
1187 {
1188 SKIP_SPACE;
1189 key = s;
1190 SKIP_TOKEN;
1191 header.nkey = s - key;
1192 SKIP_SPACE;
1193 if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1194 f.noreply = 1;
1195 s += 7;
1196 if (s >= e) {
1197 return ASCII_CLIENT_ERROR("bad command line");
1198 }
1199 SKIP_SPACE;
1200 }
1201 if (*s == '\r') {
1202 s++;
1203 }
1204 if (*s == '\n') {
1205 s++;
1206 }
1207 if (s != e) {
1208 return ASCII_CLIENT_ERROR("bad command line");
1209 }
1210 SET_HANDLER(&MC::ascii_delete_event);
1211 return delete_item();
1212 }
1213
1214 int
ascii_incr_decr(char * s,char * e)1215 MC::ascii_incr_decr(char *s, char *e)
1216 {
1217 SKIP_SPACE;
1218 key = s;
1219 SKIP_TOKEN;
1220 header.nkey = s - key;
1221 SKIP_SPACE;
1222 GET_NUM(delta);
1223 SKIP_SPACE;
1224 if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1225 f.noreply = 1;
1226 s += 7;
1227 if (s >= e) {
1228 return ASCII_CLIENT_ERROR("bad command line");
1229 }
1230 SKIP_SPACE;
1231 }
1232 if (*s == '\r') {
1233 s++;
1234 }
1235 if (*s == '\n') {
1236 s++;
1237 }
1238 if (s != e) {
1239 return ASCII_CLIENT_ERROR("bad command line");
1240 }
1241 SET_HANDLER(&MC::ascii_incr_decr_event);
1242 return set_item();
1243 }
1244
// Report whether only spaces and an optional '\r' separate `t` from the last
// byte of the command line.
//   t - current parse position
//   e - one past the last byte of the command line (callers in this file pass
//       the position just after the terminating '\n')
// Returns 1 when, after skipping spaces and at most one '\r', `t` sits exactly
// on the byte before `e`; 0 otherwise. Never reads at or beyond `e`.
static int
is_end_of_cmd(char *t, char *e)
{
  // test the bound first so *t is never read at or beyond e
  while (t < e && *t == ' ') {
    t++; // skip spaces
  }
  if (t < e && *t == '\r') {
    t++;
  }
  return t == e - 1 ? 1 : 0;
}
1259
1260 // moves *pt past the noreply if it is found
1261 static int
is_noreply(char ** pt,char * e)1262 is_noreply(char **pt, char *e)
1263 {
1264 char *t = *pt;
1265 if (t < e - 8) {
1266 while (*t == ' ') {
1267 if (t > e - 8) {
1268 return 0;
1269 }
1270 t++;
1271 }
1272 if (t[0] == 'n' && !STRCMP(t + 1, "oreply") && isspace(t[7])) {
1273 *pt = t + sizeof("noreply") - 1;
1274 return 1;
1275 }
1276 }
1277 return 0;
1278 }
1279
1280 int
read_ascii_from_client_event(int event,void * data)1281 MC::read_ascii_from_client_event(int event, void *data)
1282 {
1283 int len = 0;
1284 char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len), *s = c;
1285 MCDebugBuf("tsmemcache_ascii_cmd", c, len);
1286 char *e = c + len - 5; // at least 6 chars
1287 while (*s == ' ' && s < e) {
1288 s++; // skip leading spaces
1289 }
1290 if (s >= e) {
1291 if (len >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE || memchr(c, '\n', len)) {
1292 return ASCII_CLIENT_ERROR("bad command line");
1293 }
1294 return EVENT_CONT;
1295 }
1296 // gets can be large, so do not require the full cmd fit in the buffer
1297 e = c + len;
1298 switch (*s) {
1299 case 'g': // get gets
1300 if (s[3] == 's' && s[4] == ' ') {
1301 f.return_cas = 1;
1302 read_offset = 5;
1303 goto Lget;
1304 } else if (s[3] == ' ') {
1305 read_offset = 4;
1306 Lget:
1307 reader->consume(read_offset);
1308 if (c != tmp_cmd_buffer) { // all in the block
1309 return ascii_get(s + read_offset, e);
1310 } else {
1311 return ascii_gets();
1312 }
1313 }
1314 break;
1315 case 'b': // bget
1316 if (s[4] != ' ') {
1317 break;
1318 }
1319 read_offset = 5;
1320 goto Lget;
1321 break;
1322 default:
1323 break;
1324 }
1325 // find the end of the command
1326 e = static_cast<char *>(memchr(s, '\n', len));
1327 if (!e) {
1328 if (reader->read_avail() > TSMEMCACHE_MAX_CMD_SIZE) {
1329 return ASCII_CLIENT_ERROR("bad command line");
1330 }
1331 return EVENT_CONT;
1332 }
1333 e++; // skip nl
1334 end_of_cmd = e - c;
1335 switch (*s) {
1336 case 's': // set stats
1337 if (s[1] == 'e' && s[2] == 't' && s[3] == ' ') {
1338 return ascii_set(s + sizeof("set") - 1, e);
1339 }
1340 if (STRCMP_REST("tats", s + 1, e)) {
1341 break;
1342 }
1343 s += sizeof("stats") - 1;
1344 if (is_noreply(&s, e)) {
1345 break; // to please memcapable
1346 } else {
1347 return ASCII_RESPONSE("END");
1348 }
1349 case 'a': // add
1350 if (s[1] == 'd' && s[2] == 'd' && s[3] == ' ') {
1351 f.set_add = 1;
1352 return ascii_set(s + sizeof("add") - 1, e);
1353 }
1354 if (STRCMP_REST("ppend", s + 1, e)) {
1355 break;
1356 }
1357 f.set_append = 1;
1358 return ascii_set(s + sizeof("append") - 1, e);
1359 case 'p': // prepend
1360 if (STRCMP_REST("repend", s + 1, e)) {
1361 break;
1362 }
1363 f.set_prepend = 1;
1364 return ascii_set(s + sizeof("prepend") - 1, e);
1365 case 'c': // cas
1366 if (s[1] == 'a' && s[2] == 's' && s[3] == ' ') {
1367 f.set_cas = 1;
1368 return ascii_set(s + sizeof("cas") - 1, e);
1369 }
1370 break;
1371 case 'i': // incr
1372 if (s[1] == 'n' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') {
1373 f.set_incr = 1;
1374 return ascii_incr_decr(s + sizeof("incr") - 1, e);
1375 }
1376 break;
1377 case 'f': { // flush_all
1378 if (STRCMP_REST("lush_all", s + 1, e)) {
1379 break;
1380 }
1381 s += sizeof("flush_all") - 1;
1382 SKIP_SPACE;
1383 int32_t time_offset = 0;
1384 if (isdigit(*s)) {
1385 GET_NUM(time_offset);
1386 }
1387 f.noreply = is_noreply(&s, e);
1388 ink_hrtime new_last_flush = Thread::get_hrtime() + HRTIME_SECONDS(time_offset);
1389 #if __WORDSIZE == 64
1390 last_flush = new_last_flush; // this will be atomic for native word size
1391 #else
1392 ink_atomic_swap(&last_flush, new_last_flush);
1393 #endif
1394 if (!is_end_of_cmd(s, e)) {
1395 break;
1396 }
1397 return ASCII_RESPONSE("OK");
1398 }
1399 case 'd': // delete decr
1400 if (e - s < 5) {
1401 break;
1402 }
1403 if (s[2] == 'l') {
1404 if (s[1] == 'e' && s[3] == 'e' && s[4] == 't' && s[5] == 'e' && s[6] == ' ') {
1405 return ascii_delete(s + sizeof("delete") - 1, e);
1406 }
1407 } else if (s[1] == 'e' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') { // decr
1408 f.set_decr = 1;
1409 return ascii_incr_decr(s + sizeof("decr") - 1, e);
1410 }
1411 break;
1412 case 'r': // replace
1413 if (STRCMP_REST("eplace", s + 1, e)) {
1414 break;
1415 }
1416 f.set_replace = 1;
1417 return ascii_set(s + sizeof("replace") - 1, e);
1418 case 'q': // quit
1419 if (STRCMP_REST("uit", s + 1, e)) {
1420 break;
1421 }
1422 if (!is_end_of_cmd(s + sizeof("quit") - 1, e)) {
1423 break;
1424 }
1425 return die();
1426 case 'v': { // version
1427 if (s[3] == 's') {
1428 if (STRCMP_REST("ersion", s + 1, e)) {
1429 break;
1430 }
1431 if (!is_end_of_cmd(s + sizeof("version") - 1, e)) {
1432 break;
1433 }
1434 return ASCII_RESPONSE("VERSION " TSMEMCACHE_VERSION);
1435 } else if (s[3] == 'b') {
1436 if (STRCMP_REST("erbosity", s + 1, e)) {
1437 break;
1438 }
1439 s += sizeof("verbosity") - 1;
1440 SKIP_SPACE;
1441 if (!isdigit(*s)) {
1442 break;
1443 }
1444 GET_NUM(verbosity);
1445 f.noreply = is_noreply(&s, e);
1446 if (!is_end_of_cmd(s, e)) {
1447 break;
1448 }
1449 return ASCII_RESPONSE("OK");
1450 }
1451 break;
1452 }
1453 }
1454 return ASCII_ERROR();
1455 }
1456
1457 int
write_then_close_event(int event,void * data)1458 MC::write_then_close_event(int event, void *data)
1459 {
1460 switch (event) {
1461 case VC_EVENT_EOS:
1462 if ((VIO *)data == wvio) {
1463 break;
1464 }
1465 // fall through
1466 case VC_EVENT_READ_READY:
1467 return EVENT_DONE; // no more of that stuff
1468 case VC_EVENT_WRITE_READY:
1469 if (wvio->buffer.reader()->read_avail() > 0) {
1470 return EVENT_CONT;
1471 }
1472 break;
1473 default:
1474 break;
1475 }
1476 return die();
1477 }
1478
1479 int
read_from_client_event(int event,void * data)1480 MC::read_from_client_event(int event, void *data)
1481 {
1482 switch (event) {
1483 case TSMEMCACHE_STREAM_DONE:
1484 return read_from_client();
1485 case VC_EVENT_READ_READY:
1486 case VC_EVENT_EOS:
1487 if (reader->read_avail() < 1) {
1488 return EVENT_CONT;
1489 }
1490 if ((uint8_t)reader->start()[0] == (uint8_t)PROTOCOL_BINARY_REQ) {
1491 return TS_SET_CALL(&MC::read_binary_from_client_event, event, data);
1492 } else {
1493 return TS_SET_CALL(&MC::read_ascii_from_client_event, event, data);
1494 }
1495 case VC_EVENT_WRITE_READY:
1496 case VC_EVENT_WRITE_COMPLETE:
1497 break;
1498 default:
1499 return die();
1500 }
1501 return EVENT_CONT;
1502 }
1503
1504 // between client and cache
1505 int
stream_event(int event,void * data)1506 MC::stream_event(int event, void *data)
1507 {
1508 if (data == crvio || data == cwvio) {
1509 switch (event) {
1510 case VC_EVENT_READ_READY:
1511 wvio->reenable();
1512 break;
1513 case VC_EVENT_WRITE_READY:
1514 rvio->reenable();
1515 break;
1516 case VC_EVENT_WRITE_COMPLETE:
1517 case VC_EVENT_EOS:
1518 case VC_EVENT_READ_COMPLETE:
1519 return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
1520 default:
1521 return die();
1522 }
1523 } else {
1524 switch (event) {
1525 case VC_EVENT_READ_READY:
1526 if (cwvio) {
1527 if (creader != reader && creader->read_avail() < cwvio->nbytes) {
1528 int64_t a = reader->read_avail();
1529 if (a > static_cast<int64_t>(nbytes)) {
1530 a = static_cast<int64_t>(nbytes);
1531 }
1532 if (a) {
1533 cbuf->write(reader, a);
1534 reader->consume(a);
1535 }
1536 }
1537 cwvio->reenable();
1538 }
1539 break;
1540 case VC_EVENT_WRITE_READY:
1541 if (crvio) {
1542 crvio->reenable();
1543 }
1544 break;
1545 case VC_EVENT_WRITE_COMPLETE:
1546 case VC_EVENT_READ_COMPLETE:
1547 return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
1548 default:
1549 return die();
1550 }
1551 }
1552 return EVENT_CONT;
1553 }
1554
1555 // cache to cache
1556 int
tunnel_event(int event,void * data)1557 MC::tunnel_event(int event, void *data)
1558 {
1559 MCDebug("tsmemcache", "tunnel %d %p crvio %p cwvio %p", event, data, crvio, cwvio);
1560 if (data == crvio) {
1561 switch (event) {
1562 case VC_EVENT_READ_READY:
1563 cwvio->reenable();
1564 break;
1565 case VC_EVENT_EOS:
1566 case VC_EVENT_READ_COMPLETE:
1567 if (cwvio->nbytes == cwvio->ndone + cwvio->buffer.reader()->read_avail()) {
1568 cwvio->reenable();
1569 return EVENT_CONT;
1570 }
1571 return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1572 default:
1573 return die();
1574 }
1575 } else if (data == cwvio) {
1576 switch (event) {
1577 case VC_EVENT_WRITE_READY:
1578 crvio->reenable();
1579 break;
1580 case VC_EVENT_WRITE_COMPLETE:
1581 case VC_EVENT_EOS:
1582 return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1583 default:
1584 return die();
1585 }
1586 } else { // network I/O
1587 switch (event) {
1588 case VC_EVENT_READ_READY:
1589 case VC_EVENT_WRITE_READY:
1590 case VC_EVENT_WRITE_COMPLETE:
1591 case VC_EVENT_READ_COMPLETE:
1592 return EVENT_CONT;
1593 default:
1594 return die();
1595 }
1596 }
1597 return EVENT_CONT;
1598 }
1599
1600 int
init_tsmemcache(int port)1601 init_tsmemcache(int port)
1602 {
1603 tsmemcache_constants();
1604 MCAccept *a = new MCAccept;
1605 a->mutex = new_ProxyMutex();
1606 NetProcessor::AcceptOptions options(NetProcessor::DEFAULT_ACCEPT_OPTIONS);
1607 options.local_port = a->accept_port = port;
1608 netProcessor.accept(a, options);
1609 return 0;
1610 }
1611
1612 void
TSPluginInit(int argc,const char * argv[])1613 TSPluginInit(int argc, const char *argv[])
1614 {
1615 ink_assert(sizeof(protocol_binary_request_header) == 24);
1616
1617 TSPluginRegistrationInfo info;
1618 info.plugin_name = (char *)"tsmemcache";
1619 info.vendor_name = (char *)"ats";
1620 info.support_email = (char *)"jplevyak@apache.org";
1621
1622 int port = 11211;
1623
1624 if (TSPluginRegister(&info) != TS_SUCCESS) {
1625 TSError("[PluginInit] tsmemcache registration failed.\n");
1626 goto error;
1627 }
1628
1629 if (argc < 2) {
1630 TSError("[tsmemcache] Usage: tsmemcache.so [accept_port]\n");
1631 goto error;
1632 } else {
1633 int port = atoi(argv[1]);
1634 if (!port) {
1635 TSError("[tsmemcache] bad accept_port '%s'\n", argv[1]);
1636 goto error;
1637 }
1638 MCDebug("tsmemcache", "using accept_port %d", port);
1639 }
1640 init_tsmemcache(port);
1641 return;
1642
1643 error:
1644 TSError("[PluginInit] Plugin not initialized");
1645 }
1646