1 /** @file
2 
  A memcached-protocol (ascii and binary) front end that stores items in the Traffic Server cache.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tsmemcache.h"
25 
26 /*
27   TODO
  - on OPEN_WRITE_FAILED, don't poll (reschedule); find another way to wait, and add a timeout
  - factor the code better, particularly incr/set
  - MIOBufferAccessor::reader_for
  - clean up the creader dependency in stream_event
32  */
33 
// Expiration values larger than this (30 days, in seconds) are interpreted as absolute unix
// times rather than as relative offsets. Parenthesized so the macro is safe inside any expression.
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)
// STRCMP_REST(_c, _s, _e): nonzero unless the text in [_s, _e) starts with the string literal _c
// immediately followed by a whitespace character (sizeof(_c) counts the literal's NUL, so
// (_s)[sizeof(_c) - 1] is the character right after the literal).
// NOTE(review): assumes STRCMP(_s, _c) is a prefix comparison returning 0 on match — confirm in tsmemcache.h.
#define STRCMP_REST(_c, _s, _e) (((_e) - (_s)) < (int)sizeof(_c) || STRCMP(_s, _c) || !isspace((_s)[sizeof(_c) - 1]))
36 
37 ClassAllocator<MC> theMCAllocator("MC");
38 
39 static time_t base_day_time;
40 
41 // These should be persistent.
42 int32_t MC::verbosity     = 0;
43 ink_hrtime MC::last_flush = 0;
44 int64_t MC::next_cas      = 1;
45 
46 static void
tsmemcache_constants()47 tsmemcache_constants()
48 {
49   struct tm tm;
50   memset(&tm, 0, sizeof(tm));
51   // jan 1 2010
52   tm.tm_year    = 110;
53   tm.tm_mon     = 1;
54   tm.tm_mday    = 1;
55   base_day_time = mktime(&tm);
56   ink_assert(base_day_time != (time_t)-1);
57 }
58 
#ifdef DEBUG
char debug_string_buffer[TSMEMCACHE_TMP_CMD_BUFFER_SIZE];

/*
  Make a printable, NUL-terminated copy of s[0..len) for debug output.

  Trailing '\r' / '\n' characters are dropped and the copy is truncated to
  TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1 bytes. The result lives in the shared
  debug_string_buffer, so each call overwrites the previous result.
*/
static char *
mc_string(const char *s, int len)
{
  const int limit = TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1;
  int n           = len;
  // Strip trailing line terminators.
  while (n != 0 && (s[n - 1] == '\n' || s[n - 1] == '\r')) {
    --n;
  }
  n = (n > limit) ? limit : n;
  if (n != 0) {
    memcpy(debug_string_buffer, s, n);
  }
  debug_string_buffer[n] = '\0';
  return debug_string_buffer;
}
#endif
78 
// Debug helpers.
//   MCDebugBuf(tag, s, len): when debug tag `tag` (a string literal) is enabled, print the
//     command text s[0..len) (trailing CR/LF stripped by mc_string) to stdout.
//   MCDebug(...): forwards to Debug() in DEBUG builds and compiles to nothing otherwise.
// Each expands to exactly one statement (do { } while (0)), so an unbraced
//   if (x) MCDebugBuf(...); else ...
// binds its `else` to the caller's `if`, not to an `if` hidden inside the macro.
#ifdef DEBUG
#define MCDebugBuf(_t, _s, _l)                  \
  do {                                          \
    if (is_debug_tag_set(_t)) {                 \
      printf(_t ": %s\n", mc_string(_s, _l));   \
    }                                           \
  } while (0)
#define MCDebug Debug
#else
#define MCDebugBuf(_t, _s, _l) \
  do {                         \
  } while (0)
// Arguments are still type-checked by the compiler, but the call is never executed.
#define MCDebug(...)        \
  do {                      \
    if (0) {                \
      Debug(__VA_ARGS__);   \
    }                       \
  } while (0)
#endif
92 
/*
  Convert a 64-bit integer between host byte order and network (big-endian) byte order.

  Endianness is probed at run time. On a little-endian host the eight bytes are reversed;
  on a big-endian host the value is returned unchanged. Byte reversal is its own inverse,
  so the same function also serves as ink_ntoh64.
*/
static uint64_t
ink_hton64(uint64_t in)
{
  const int32_t probe = 1;
  if (*reinterpret_cast<const uint8_t *>(&probe) != 1) {
    return in; // big-endian host: already in network order
  }
  uint64_t swapped = 0;
  // Peel bytes off the low end of `in` and push them onto the low end of `swapped`,
  // so the least significant input byte ends up most significant.
  for (int shift = 0; shift < 64; shift += 8) {
    swapped = (swapped << 8) | ((in >> shift) & 0xFF);
  }
  return swapped;
}
#define ink_ntoh64 ink_hton64
121 
122 int
main_event(int event,void * data)123 MCAccept::main_event(int event, void *data)
124 {
125   if (event == NET_EVENT_ACCEPT) {
126     NetVConnection *netvc = (NetVConnection *)data;
127     MC *mc                = theMCAllocator.alloc();
128     if (!mutex->thread_holding) {
129       mc->new_connection(netvc, netvc->thread);
130     } else {
131       mc->new_connection(netvc, mutex->thread_holding);
132     }
133     return EVENT_CONT;
134   } else {
135     Fatal("tsmemcache accept received fatal error: errno = %d", -(static_cast<int>((intptr_t)data)));
136     return EVENT_CONT;
137   }
138 }
139 
140 void
new_connection(NetVConnection * netvc,EThread * thread)141 MC::new_connection(NetVConnection *netvc, EThread *thread)
142 {
143   nvc              = netvc;
144   mutex            = new_ProxyMutex();
145   rbuf             = new_MIOBuffer(MAX_IOBUFFER_SIZE);
146   rbuf->water_mark = TSMEMCACHE_TMP_CMD_BUFFER_SIZE;
147   reader           = rbuf->alloc_reader();
148   wbuf             = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
149   cbuf             = 0;
150   writer           = wbuf->alloc_reader();
151   SCOPED_MUTEX_LOCK(lock, mutex, thread);
152   rvio         = nvc->do_io_read(this, INT64_MAX, rbuf);
153   wvio         = nvc->do_io_write(this, 0, writer);
154   header.magic = TSMEMCACHE_HEADER_MAGIC;
155   read_from_client();
156 }
157 
158 int
die()159 MC::die()
160 {
161   if (pending_action && pending_action != ACTION_RESULT_DONE) {
162     pending_action->cancel();
163   }
164   if (nvc) {
165     nvc->do_io_close(1); // abort
166   }
167   if (crvc) {
168     crvc->do_io_close(1); // abort
169   }
170   if (cwvc) {
171     cwvc->do_io_close(1); // abort
172   }
173   if (rbuf) {
174     free_MIOBuffer(rbuf);
175   }
176   if (wbuf) {
177     free_MIOBuffer(wbuf);
178   }
179   if (cbuf) {
180     free_MIOBuffer(cbuf);
181   }
182   if (tbuf) {
183     ats_free(tbuf);
184   }
185   mutex = NULL;
186   theMCAllocator.free(this);
187   return EVENT_DONE;
188 }
189 
190 int
unexpected_event()191 MC::unexpected_event()
192 {
193   ink_assert(!"unexpected event");
194   return die();
195 }
196 
197 int
write_then_close(int64_t ntowrite)198 MC::write_then_close(int64_t ntowrite)
199 {
200   SET_HANDLER(&MC::write_then_close_event);
201   return write_to_client(ntowrite);
202 }
203 
204 int
write_then_read_from_client(int64_t ntowrite)205 MC::write_then_read_from_client(int64_t ntowrite)
206 {
207   SET_HANDLER(&MC::read_from_client_event);
208   return write_to_client(ntowrite);
209 }
210 
211 int
stream_then_read_from_client(int64_t ntowrite)212 MC::stream_then_read_from_client(int64_t ntowrite)
213 {
214   SET_HANDLER(&MC::read_from_client_event);
215   creader = reader;
216   TS_PUSH_HANDLER(&MC::stream_event);
217   return write_to_client(ntowrite);
218 }
219 
220 void
add_binary_header(uint16_t err,uint8_t hdr_len,uint16_t key_len,uint32_t body_len)221 MC::add_binary_header(uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len)
222 {
223   protocol_binary_response_header r;
224 
225   r.response.magic    = static_cast<uint8_t>(PROTOCOL_BINARY_RES);
226   r.response.opcode   = binary_header.request.opcode;
227   r.response.keylen   = (uint16_t)htons(key_len);
228   r.response.extlen   = hdr_len;
229   r.response.datatype = static_cast<uint8_t>(PROTOCOL_BINARY_RAW_BYTES);
230   r.response.status   = (uint16_t)htons(err);
231   r.response.bodylen  = htonl(body_len);
232   r.response.opaque   = binary_header.request.opaque;
233   r.response.cas      = ink_hton64(header.cas);
234 
235   wbuf->write(&r, sizeof(r));
236 }
237 
238 int
write_binary_error(protocol_binary_response_status err,int swallow)239 MC::write_binary_error(protocol_binary_response_status err, int swallow)
240 {
241   const char *errstr = "Unknown error";
242   switch (err) {
243   case PROTOCOL_BINARY_RESPONSE_ENOMEM:
244     errstr = "Out of memory";
245     break;
246   case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
247     errstr = "Unknown command";
248     break;
249   case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
250     errstr = "Not found";
251     break;
252   case PROTOCOL_BINARY_RESPONSE_EINVAL:
253     errstr = "Invalid arguments";
254     break;
255   case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
256     errstr = "Data exists for key.";
257     break;
258   case PROTOCOL_BINARY_RESPONSE_E2BIG:
259     errstr = "Too large.";
260     break;
261   case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
262     errstr = "Non-numeric server-side value for incr or decr";
263     break;
264   case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
265     errstr = "Not stored.";
266     break;
267   case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
268     errstr = "Auth failure.";
269     break;
270   default:
271     ink_assert(!"unhandled error");
272     errstr = "UNHANDLED ERROR";
273     Warning("tsmemcache: unhandled error: %d\n", err);
274   }
275 
276   size_t len = strlen(errstr);
277   add_binary_header(err, 0, 0, len);
278   if (swallow > 0) {
279     int64_t avail = reader->read_avail();
280     if (avail >= swallow) {
281       reader->consume(swallow);
282     } else {
283       swallow_bytes = swallow - avail;
284       reader->consume(avail);
285       SET_HANDLER(&MC::swallow_then_read_event);
286     }
287   }
288   return 0;
289 }
290 
291 int
swallow_then_read_event(int event,void * data)292 MC::swallow_then_read_event(int event, void *data)
293 {
294   rvio->nbytes  = INT64_MAX;
295   int64_t avail = reader->read_avail();
296   if (avail >= swallow_bytes) {
297     reader->consume(swallow_bytes);
298     swallow_bytes = 0;
299     return read_from_client();
300   } else {
301     swallow_bytes -= avail;
302     reader->consume(avail);
303     return EVENT_CONT;
304   }
305 }
306 
307 int
swallow_cmd_then_read_from_client_event(int event,void * data)308 MC::swallow_cmd_then_read_from_client_event(int event, void *data)
309 {
310   int64_t avail = reader->read_avail();
311   if (avail) {
312     int64_t n = reader->memchr('\n');
313     if (n >= 0) {
314       reader->consume(n + 1);
315       return read_from_client();
316     }
317     reader->consume(avail);
318     return EVENT_CONT;
319   }
320   return EVENT_CONT;
321 }
322 
323 int
protocol_error()324 MC::protocol_error()
325 {
326   Warning("tsmemcache: protocol error");
327   return write_then_close(write_binary_error(PROTOCOL_BINARY_RESPONSE_EINVAL, 0));
328 }
329 
330 int
read_from_client()331 MC::read_from_client()
332 {
333   if (swallow_bytes) {
334     return TS_SET_CALL(&MC::swallow_then_read_event, VC_EVENT_READ_READY, rvio);
335   }
336   read_offset = 0;
337   end_of_cmd  = 0;
338   ngets       = 0;
339   ff          = 0;
340   if (crvc) {
341     crvc->do_io_close();
342     crvc  = 0;
343     crvio = NULL;
344   }
345   if (cwvc) {
346     cwvc->do_io_close();
347     cwvc  = 0;
348     cwvio = NULL;
349   }
350   if (cbuf) {
351     cbuf->clear();
352   }
353   ink_assert(!crvc && !cwvc);
354   if (tbuf) {
355     ats_free(tbuf);
356   }
357   return TS_SET_CALL(&MC::read_from_client_event, VC_EVENT_READ_READY, rvio);
358 }
359 
360 int
write_to_client(int64_t towrite)361 MC::write_to_client(int64_t towrite)
362 {
363   (void)towrite;
364   wvio->nbytes = INT64_MAX;
365   wvio->reenable();
366   return EVENT_CONT;
367 }
368 
369 int
write_binary_response(const void * d,int hlen,int keylen,int dlen)370 MC::write_binary_response(const void *d, int hlen, int keylen, int dlen)
371 {
372   if (!f.noreply || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETQ ||
373       binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ) {
374     add_binary_header(0, hlen, keylen, dlen);
375     if (dlen) {
376       MCDebug("tsmemcache", "response dlen %d\n", dlen);
377       wbuf->write(d, dlen);
378     } else {
379       MCDebug("tsmemcache", "no response\n");
380     }
381   }
382   return writer->read_avail();
383 }
384 
// CHECK_READ_AVAIL(_n, _h): for use inside an event handler; relies on `event`, `data`,
// `reader`, `rvio` and `wvio` being in scope.
// Does nothing when at least _n bytes are readable from `reader`. Otherwise it leaves the
// handler according to `event`:
//   - VC_EVENT_EOS on the client read VIO (data == rvio): die();
//   - VC_EVENT_EOS on any other VIO, or VC_EVENT_READ_READY: return EVENT_CONT (wait for more);
//   - VC_EVENT_WRITE_READY: return EVENT_CONT while the write VIO's buffer still has unread
//     data, otherwise fall through to the WRITE_COMPLETE case;
//   - VC_EVENT_WRITE_COMPLETE: return EVENT_DONE;
//   - any other event: die().
// _h is currently unused.
#define CHECK_READ_AVAIL(_n, _h)                     \
  do {                                               \
    if (reader->read_avail() < _n) {                 \
      switch (event) {                               \
      case VC_EVENT_EOS:                             \
        if ((VIO *)data == rvio)                     \
          break;                                     \
      /* fallthrough */                              \
      case VC_EVENT_READ_READY:                      \
        return EVENT_CONT;                           \
      case VC_EVENT_WRITE_READY:                     \
        if (wvio->buffer.reader()->read_avail() > 0) \
          return EVENT_CONT;                         \
      /* fallthrough */                              \
      case VC_EVENT_WRITE_COMPLETE:                  \
        return EVENT_DONE;                           \
      default:                                       \
        break;                                       \
      }                                              \
      return die();                                  \
    }                                                \
  } while (0)
407 
408 static char *
get_pointer(MC * mc,int start,int len)409 get_pointer(MC *mc, int start, int len)
410 {
411   if (mc->reader->block_read_avail() >= start + len) {
412     return mc->reader->start() + start;
413   }
414   // the block of data straddles an IOBufferBlock boundary, exceptional case, malloc
415   ink_assert(!mc->tbuf);
416   mc->tbuf = static_cast<char *>(ats_malloc(len));
417   mc->reader->memcpy(mc->tbuf, len, start);
418   return mc->tbuf;
419 }
420 
421 static inline char *
binary_get_key(MC * mc)422 binary_get_key(MC *mc)
423 {
424   return get_pointer(mc, 0, mc->binary_header.request.keylen);
425 }
426 
/*
  Intermediate handler pushed by get_item() (TS_PUSH_HANDLER) around cacheProcessor.open_read().

  On CACHE_EVENT_OPEN_READ the cached document's MCCacheHeader is validated:
  - the header can be fetched and is at least sizeof(MCCacheHeader);
  - the magic matches;
  - the key length and key bytes equal the requested key;
  - the item was set after the last flush and has not expired.
  If any check fails, the cache read VC is closed and the event is converted to
  CACHE_EVENT_OPEN_READ_FAILED.

  The (possibly converted) cache result, as well as VC_EVENT_EOS, VC_EVENT_ERROR and
  CACHE_EVENT_OPEN_READ_FAILED, are handed back to the previously pushed handler with
  TS_POP_CALL. Any other event is ignored (EVENT_CONT).
*/
int
MC::cache_read_event(int event, void *data)
{
  switch (event) {
  case CACHE_EVENT_OPEN_READ: {
    crvc     = (CacheVConnection *)data;
    int hlen = 0;
    if (crvc->get_header((void **)&rcache_header, &hlen) < 0) {
      goto Lfail;
    }
    if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || rcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
      goto Lfail;
    }
    if (header.nkey != rcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + rcache_header->nkey)) {
      goto Lfail;
    }
    // The cache is addressed by a hash of the key (see get_item), so compare the stored key bytes.
    if (memcmp(key, rcache_header->key(), header.nkey)) {
      goto Lfail;
    }
    {
      // Items set at or before the last flush, or whose settime + exptime has passed, count as missing.
      ink_hrtime t = Thread::get_hrtime();
      if ((static_cast<ink_hrtime>(rcache_header->settime)) <= last_flush ||
          t >= (static_cast<ink_hrtime>(rcache_header->settime)) + HRTIME_SECONDS(rcache_header->exptime)) {
        goto Lfail;
      }
    }
    break;
  Lfail:
    crvc->do_io_close();
    crvc  = 0;
    crvio = NULL;
    event = CACHE_EVENT_OPEN_READ_FAILED; // convert to failure
    break;
  }
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case CACHE_EVENT_OPEN_READ_FAILED:
    break;
  default:
    return EVENT_CONT;
  }
  return TS_POP_CALL(event, data);
}
470 
471 int
get_item()472 MC::get_item()
473 {
474   TS_PUSH_HANDLER(&MC::cache_read_event);
475   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
476   pending_action = cacheProcessor.open_read(this, &cache_key);
477   return EVENT_CONT;
478 }
479 
480 int
set_item()481 MC::set_item()
482 {
483   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
484   pending_action = cacheProcessor.open_write(this, &cache_key, CACHE_FRAG_TYPE_NONE, header.nbytes,
485                                              CACHE_WRITE_OPT_OVERWRITE | TSMEMCACHE_WRITE_SYNC);
486   return EVENT_CONT;
487 }
488 
489 int
delete_item()490 MC::delete_item()
491 {
492   CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey);
493   pending_action = cacheProcessor.remove(this, &cache_key, CACHE_FRAG_TYPE_NONE);
494   return EVENT_CONT;
495 }
496 
497 int
binary_get_event(int event,void * data)498 MC::binary_get_event(int event, void *data)
499 {
500   ink_assert(!"EVENT_ITEM_GOT is incorrect here");
501   if (event != TSMEMCACHE_EVENT_GOT_ITEM) {
502     CHECK_READ_AVAIL(binary_header.request.keylen, &MC::binary_get);
503     key         = binary_get_key(this);
504     header.nkey = binary_header.request.keylen;
505     return get_item();
506   } else if (event == CACHE_EVENT_OPEN_READ_FAILED) {
507     if (f.noreply) {
508       return read_from_client();
509     }
510     if (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK) {
511       add_binary_header(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, header.nkey, header.nkey);
512       wbuf->write(key, header.nkey);
513       return write_then_read_from_client();
514     } else {
515       return write_binary_error(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
516     }
517   } else if (event == CACHE_EVENT_OPEN_READ) {
518     protocol_binary_response_get *rsp = &res.get;
519     uint16_t keylen                   = 0;
520     uint32_t bodylen                  = sizeof(rsp->message.body) + (rcache_header->nbytes - 2);
521     bool getk =
522       (binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETK || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ);
523     if (getk) {
524       bodylen += header.nkey;
525       keylen = header.nkey;
526     }
527     add_binary_header(0, sizeof(rsp->message.body), keylen, bodylen);
528     rsp->message.header.response.cas = ink_hton64(rcache_header->cas);
529     rsp->message.body.flags          = htonl(rcache_header->flags);
530     wbuf->write(&rsp->message.body, sizeof(rsp->message.body));
531     if (getk) {
532       wbuf->write(key, header.nkey);
533     }
534     crvio = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
535     return stream_then_read_from_client(rcache_header->nbytes);
536   } else {
537     return unexpected_event();
538   }
539   return 0;
540 }
541 
542 int
bin_read_key()543 MC::bin_read_key()
544 {
545   return -1;
546 }
547 
548 int
read_binary_from_client_event(int event,void * data)549 MC::read_binary_from_client_event(int event, void *data)
550 {
551   if (reader->read_avail() < (int)sizeof(binary_header)) {
552     return EVENT_CONT;
553   }
554   reader->memcpy(&binary_header, sizeof(binary_header));
555   if (binary_header.request.magic != PROTOCOL_BINARY_REQ) {
556     Warning("tsmemcache: bad binary magic: %x", binary_header.request.magic);
557     return die();
558   }
559   int keylen = binary_header.request.keylen = ntohs(binary_header.request.keylen);
560   int bodylen = binary_header.request.bodylen = ntohl(binary_header.request.bodylen);
561   binary_header.request.cas                   = ink_ntoh64(binary_header.request.cas);
562   int extlen                                  = binary_header.request.extlen;
563   end_of_cmd                                  = sizeof(binary_header) + extlen;
564 
565 #define CHECK_PROTOCOL(_e) \
566   if (!(_e))               \
567     return protocol_error();
568 
569   MCDebug("tsmemcache", "bin cmd %d\n", binary_header.request.opcode);
570   switch (binary_header.request.opcode) {
571   case PROTOCOL_BINARY_CMD_VERSION:
572     CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
573     return write_to_client(write_binary_response(TSMEMCACHE_VERSION, 0, 0, STRLEN(TSMEMCACHE_VERSION)));
574   case PROTOCOL_BINARY_CMD_NOOP:
575     CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0);
576     return write_to_client(write_binary_response(nullptr, 0, 0, 0));
577   case PROTOCOL_BINARY_CMD_GETKQ:
578     f.noreply = 1; // fall through
579   case PROTOCOL_BINARY_CMD_GETQ:
580     f.noreply = 1; // fall through
581   case PROTOCOL_BINARY_CMD_GETK:
582   case PROTOCOL_BINARY_CMD_GET:
583     CHECK_PROTOCOL(extlen == 0 && (int)bodylen == keylen && keylen > 0);
584     return TS_SET_CALL(&MC::binary_get_event, event, data);
585   case PROTOCOL_BINARY_CMD_APPENDQ:
586   case PROTOCOL_BINARY_CMD_APPEND:
587     f.set_append = 1;
588     goto Lset;
589   case PROTOCOL_BINARY_CMD_PREPENDQ:
590   case PROTOCOL_BINARY_CMD_PREPEND:
591     f.set_prepend = 1;
592     goto Lset;
593   case PROTOCOL_BINARY_CMD_ADDQ:
594     f.noreply = 1; // fall through
595   case PROTOCOL_BINARY_CMD_ADD:
596     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
597     f.set_add = 1;
598     goto Lset;
599   case PROTOCOL_BINARY_CMD_REPLACEQ:
600     f.noreply = 1; // fall through
601   case PROTOCOL_BINARY_CMD_REPLACE:
602     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
603     f.set_replace = 1;
604     goto Lset;
605   case PROTOCOL_BINARY_CMD_SETQ:
606     f.noreply = 1; // fall through
607   case PROTOCOL_BINARY_CMD_SET: {
608     CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8);
609   Lset:
610     if (bin_read_key() < 0) {
611       return EVENT_CONT;
612     }
613     key                              = binary_get_key(this);
614     header.nkey                      = keylen;
615     protocol_binary_request_set *req = reinterpret_cast<protocol_binary_request_set *>(&binary_header);
616     req->message.body.flags          = ntohl(req->message.body.flags);
617     req->message.body.expiration     = ntohl(req->message.body.expiration);
618     nbytes                           = bodylen - (header.nkey + extlen);
619     break;
620   }
621   case PROTOCOL_BINARY_CMD_DELETEQ:
622     f.noreply = 1; // fall through
623   case PROTOCOL_BINARY_CMD_DELETE:
624     break;
625   case PROTOCOL_BINARY_CMD_INCREMENTQ:
626     f.noreply = 1; // fall through
627   case PROTOCOL_BINARY_CMD_INCREMENT:
628     break;
629   case PROTOCOL_BINARY_CMD_DECREMENTQ:
630     f.noreply = 1; // fall through
631   case PROTOCOL_BINARY_CMD_DECREMENT:
632     break;
633   case PROTOCOL_BINARY_CMD_QUITQ:
634     f.noreply = 1; // fall through
635   case PROTOCOL_BINARY_CMD_QUIT:
636     if (f.noreply) {
637       return die();
638     }
639     return write_then_close(write_binary_response(nullptr, 0, 0, 0));
640   case PROTOCOL_BINARY_CMD_FLUSHQ:
641     f.noreply = 1; // fall through
642   case PROTOCOL_BINARY_CMD_FLUSH:
643     break;
644     break;
645   case PROTOCOL_BINARY_CMD_STAT:
646     break;
647   case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
648   case PROTOCOL_BINARY_CMD_SASL_AUTH:
649   case PROTOCOL_BINARY_CMD_SASL_STEP:
650     Warning("tsmemcache: sasl not (yet) supported");
651     return die();
652   case PROTOCOL_BINARY_CMD_RGET:
653   case PROTOCOL_BINARY_CMD_RSET:
654   case PROTOCOL_BINARY_CMD_RSETQ:
655   case PROTOCOL_BINARY_CMD_RAPPEND:
656   case PROTOCOL_BINARY_CMD_RAPPENDQ:
657   case PROTOCOL_BINARY_CMD_RPREPEND:
658   case PROTOCOL_BINARY_CMD_RPREPENDQ:
659   case PROTOCOL_BINARY_CMD_RDELETE:
660   case PROTOCOL_BINARY_CMD_RDELETEQ:
661   case PROTOCOL_BINARY_CMD_RINCR:
662   case PROTOCOL_BINARY_CMD_RINCRQ:
663   case PROTOCOL_BINARY_CMD_RDECR:
664   case PROTOCOL_BINARY_CMD_RDECRQ:
665     Warning("tsmemcache: range not (yet) supported");
666     return die();
667   default:
668     Warning("tsmemcache: unexpected binary opcode %x", binary_header.request.opcode);
669     return die();
670   }
671   return EVENT_CONT;
672 }
673 
674 int
ascii_response(const char * s,int len)675 MC::ascii_response(const char *s, int len)
676 {
677   if (!f.noreply) {
678     wbuf->write(s, len);
679     wvio->nbytes = INT64_MAX;
680     wvio->reenable();
681     MCDebugBuf("tsmemcache_ascii_response", s, len);
682   }
683   if (end_of_cmd > 0) {
684     reader->consume(end_of_cmd);
685     return read_from_client();
686   } else if (end_of_cmd < 0) {
687     return read_from_client();
688   } else {
689     return TS_SET_CALL(&MC::swallow_cmd_then_read_from_client_event, EVENT_NONE, NULL);
690   }
691 }
692 
693 char *
get_ascii_input(int n,int * end)694 MC::get_ascii_input(int n, int *end)
695 {
696   int block_read_avail = reader->block_read_avail();
697   if (block_read_avail >= n) {
698   Lblock:
699     *end = block_read_avail;
700     return reader->start();
701   }
702   int read_avail = reader->read_avail();
703   if (block_read_avail == read_avail) {
704     goto Lblock;
705   }
706   char *c = tmp_cmd_buffer;
707   int e   = read_avail;
708   if (e > n) {
709     e = n;
710   }
711   reader->memcpy(c, e);
712   *end = e;
713   return c;
714 }
715 
716 int
ascii_get_event(int event,void * data)717 MC::ascii_get_event(int event, void *data)
718 {
719   switch (event) {
720   case CACHE_EVENT_OPEN_READ_FAILED:
721     reader->consume(read_offset);
722     read_offset = 0;
723     break;
724   case CACHE_EVENT_OPEN_READ: {
725     wbuf->WRITE("VALUE ");
726     wbuf->write(key, header.nkey);
727     wbuf->WRITE(" ");
728     char t[32], *te = t + 32;
729     char *flags = xutoa(rcache_header->flags, te);
730     wbuf->write(flags, te - flags);
731     wbuf->WRITE(" ");
732     char *bytes = xutoa(rcache_header->nbytes, te);
733     wbuf->write(bytes, te - bytes);
734     if (f.return_cas) {
735       wbuf->WRITE(" ");
736       char *pcas = xutoa(rcache_header->cas, te);
737       wbuf->write(pcas, te - pcas);
738     }
739     wbuf->WRITE("\r\n");
740     int ntowrite = writer->read_avail() + rcache_header->nbytes;
741     crvio        = crvc->do_io_read(this, rcache_header->nbytes, wbuf);
742     creader      = reader;
743     TS_PUSH_HANDLER(&MC::stream_event);
744     return write_to_client(ntowrite);
745   }
746   case TSMEMCACHE_STREAM_DONE:
747     crvc->do_io_close();
748     crvc  = 0;
749     crvio = NULL;
750     reader->consume(read_offset);
751     read_offset = 0;
752     wbuf->WRITE("\r\n");
753     return ascii_gets();
754   default:
755     break;
756   }
757   return ascii_gets();
758 }
759 
760 int
ascii_set_event(int event,void * data)761 MC::ascii_set_event(int event, void *data)
762 {
763   switch (event) {
764   case CACHE_EVENT_OPEN_WRITE_FAILED:
765     // another write currently in progress
766     mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
767     return EVENT_CONT;
768   case EVENT_INTERVAL:
769     return read_from_client();
770   case CACHE_EVENT_OPEN_WRITE: {
771     cwvc     = (CacheVConnection *)data;
772     int hlen = 0;
773     if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
774       if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
775         goto Lfail;
776       }
777       if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
778         goto Lfail;
779       }
780       ink_hrtime t = Thread::get_hrtime();
781       if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
782           t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
783         goto Lstale;
784       }
785       if (f.set_add) {
786         return ASCII_RESPONSE("NOT_STORED");
787       }
788     } else {
789     Lstale:
790       if (f.set_replace) {
791         return ASCII_RESPONSE("NOT_STORED");
792       }
793     }
794     memcpy(tmp_cache_header_key, key, header.nkey);
795     header.settime = Thread::get_hrtime();
796     if (exptime) {
797       if (exptime > REALTIME_MAXDELTA) {
798         if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
799           header.exptime = 0;
800         } else {
801           header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
802         }
803       } else {
804         header.exptime = exptime;
805       }
806     } else {
807       header.exptime = UINT32_MAX; // 136 years
808     }
809     if (f.set_cas) {
810       if (!wcache_header) {
811         return ASCII_RESPONSE("NOT_FOUND");
812       }
813       if (header.cas && header.cas != wcache_header->cas) {
814         return ASCII_RESPONSE("EXISTS");
815       }
816     }
817     header.cas = ink_atomic_increment(&next_cas, 1);
818     if (f.set_append || f.set_prepend) {
819       header.nbytes = nbytes + rcache_header->nbytes;
820     } else {
821       header.nbytes = nbytes;
822     }
823     cwvc->set_header(&header, header.len());
824     reader->consume(end_of_cmd);
825     end_of_cmd    = -1;
826     swallow_bytes = 2; // \r\n
827     if (f.set_append) {
828       TS_PUSH_HANDLER(&MC::tunnel_event);
829       if (!cbuf) {
830         cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
831       }
832       creader = cbuf->alloc_reader();
833       crvio   = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
834       cwvio   = cwvc->do_io_write(this, header.nbytes, creader);
835     } else {
836       if (f.set_prepend) {
837         int64_t a = reader->read_avail();
838         if (a >= static_cast<int64_t>(nbytes)) {
839           a = static_cast<int64_t>(nbytes);
840         }
841         if (!cbuf) {
842           cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
843         }
844         creader = cbuf->alloc_reader();
845         if (a) {
846           cbuf->write(reader, a);
847           reader->consume(a);
848         }
849         if (a == static_cast<int64_t>(nbytes)) {
850           cwvio = cwvc->do_io_write(this, header.nbytes, creader);
851           goto Lstreamdone;
852         }
853         rvio->nbytes = rvio->ndone + (int64_t)nbytes - a;
854       } else {
855         creader = reader;
856       }
857       TS_PUSH_HANDLER(&MC::stream_event);
858       cwvio = cwvc->do_io_write(this, header.nbytes, creader);
859     }
860     return EVENT_CONT;
861   }
862   case TSMEMCACHE_STREAM_DONE:
863     rvio->nbytes = UINT64_MAX;
864   Lstreamdone:
865     if (f.set_prepend) {
866       TS_PUSH_HANDLER(&MC::tunnel_event);
867       crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf);
868       return EVENT_CONT;
869     }
870     return ASCII_RESPONSE("STORED");
871   case TSMEMCACHE_TUNNEL_DONE:
872     crvc->do_io_close();
873     crvc  = 0;
874     crvio = NULL;
875     if (f.set_append) {
876       int64_t a = reader->read_avail();
877       if (a > static_cast<int64_t>(nbytes)) {
878         a = static_cast<int64_t>(nbytes);
879       }
880       if (a) {
881         cbuf->write(reader, a);
882         reader->consume(a);
883       }
884       TS_PUSH_HANDLER(&MC::stream_event);
885       return handleEvent(VC_EVENT_READ_READY, rvio);
886     }
887     ink_assert(f.set_prepend);
888     cwvc->do_io_close();
889     cwvc = 0;
890     return ASCII_RESPONSE("STORED");
891   case CACHE_EVENT_OPEN_READ_FAILED:
892     swallow_bytes = nbytes + 2;
893     return ASCII_RESPONSE("NOT_STORED");
894   case CACHE_EVENT_OPEN_READ:
895     crvc = (CacheVConnection *)data;
896     return set_item();
897   default:
898     break;
899   }
900   return EVENT_CONT;
901 Lfail:
902   Warning("tsmemcache: bad cache data");
903   return ASCII_SERVER_ERROR("");
904 }
905 
906 int
ascii_delete_event(int event,void * data)907 MC::ascii_delete_event(int event, void *data)
908 {
909   switch (event) {
910   case CACHE_EVENT_REMOVE_FAILED:
911     return ASCII_RESPONSE("NOT_FOUND");
912   case CACHE_EVENT_REMOVE:
913     return ASCII_RESPONSE("DELETED");
914   default:
915     return EVENT_CONT;
916   }
917 }
918 
// Event handler for ascii "incr"/"decr", installed by ascii_incr_decr() before
// it calls set_item().
//
//   CACHE_EVENT_OPEN_WRITE_FAILED: another writer holds the object; retry after
//       TSMEMCACHE_RETRY_WRITE_INTERVAL.
//   EVENT_INTERVAL: the retry timer fired; re-parse the command from the client
//       (it has not been consumed yet -- consume(end_of_cmd) only happens on
//       TSMEMCACHE_STREAM_DONE below).
//   CACHE_EVENT_OPEN_WRITE: validate the existing cached header (magic, key
//       length, not flushed, not expired), read the stored value, apply `delta`,
//       send "<value>\r\n" to the client and stream the same digits into the
//       cache via stream_event().
//   TSMEMCACHE_STREAM_DONE: release the cloned reader, drop the command line
//       from the client input and read the next command.
//
// Any validation/read failure replies "NOT_FOUND".
int
MC::ascii_incr_decr_event(int event, void *data)
{
  switch (event) {
  case CACHE_EVENT_OPEN_WRITE_FAILED:
    // another write currently in progress
    mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL);
    return EVENT_CONT;
  case EVENT_INTERVAL:
    return read_from_client();
  case CACHE_EVENT_OPEN_WRITE: {
    int hlen = 0;
    cwvc     = (CacheVConnection *)data;
    {
      // The existing item's header must be present, well-formed, match the
      // requested key length, and be neither flushed nor expired.
      if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) {
        if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) {
          goto Lfail;
        }
        if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) {
          goto Lfail;
        }
        ink_hrtime t = Thread::get_hrtime();
        if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush ||
            t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) {
          goto Lfail;
        }
      } else {
        goto Lfail;
      }
      memcpy(tmp_cache_header_key, key, header.nkey);
      header.settime = Thread::get_hrtime();
      // NOTE(review): flags are not copied from wcache_header and exptime is
      // derived from the `exptime` member, which ascii_incr_decr() does not
      // parse; memcached preserves an item's flags/expiry across incr/decr.
      // Confirm whether this reset is intended.
      if (exptime) {
        if (exptime > REALTIME_MAXDELTA) {
          // absolute unix time: convert to seconds relative to settime
          if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) {
            header.exptime = 0;
          } else {
            header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND));
          }
        } else {
          header.exptime = exptime;
        }
      } else {
        header.exptime = UINT32_MAX; // 136 years
      }
    }
    header.cas = ink_atomic_increment(&next_cas, 1);
    {
      char *localdata = nullptr;
      int len         = 0;
      // must be huge, why convert to a counter ??
      if (cwvc->get_single_data((void **)&localdata, &len) < 0) {
        goto Lfail;
      }
      // presumably parses the decimal digits in [localdata, localdata + len)
      uint64_t new_value = xatoull(localdata, localdata + len);
      if (f.set_incr) {
        new_value += delta; // wraps on uint64_t overflow
      } else {
        // decr clamps at 0 instead of wrapping
        if (delta > new_value) {
          new_value = 0;
        } else {
          new_value -= delta;
        }
      }
      // Digits are formatted so they end at e; "\r\n" occupies e[0], e[1]
      // (indices 30 and 31 of the 32-byte buffer).
      char new_value_str_buffer[32], *e = &new_value_str_buffer[30];
      e[0]    = '\r';
      e[1]    = '\n';
      char *s = xutoa(new_value, e);
      // creader sees the bytes about to be written so the cache write can use them.
      creader = wbuf->clone_reader(writer);
      wbuf->write(s, e - s + 2);
      if (f.noreply) {
        // discard the reply for the client; creader still sees the bytes
        writer->consume(e - s + 2);
      } else {
        wvio->reenable();
      }
      MCDebugBuf("tsmemcache_ascii_response", s, e - s + 2);
      // the cached value is the digits only, without the trailing "\r\n"
      header.nbytes = e - s;
      cwvc->set_header(&header, header.len());
      TS_PUSH_HANDLER(&MC::stream_event);
      cwvio = cwvc->do_io_write(this, header.nbytes, creader);
    }
    return EVENT_CONT;
  }
  case TSMEMCACHE_STREAM_DONE: {
    wbuf->dealloc_reader(creader);
    creader = 0;
    reader->consume(end_of_cmd);
    return read_from_client();
  }
  default:
    break;
  }
  return EVENT_CONT;
Lfail:
  Warning("tsmemcache: bad cache data");
  return ASCII_RESPONSE("NOT_FOUND");
}
1015 
1016 int
get_ascii_key(char * as,char * e)1017 MC::get_ascii_key(char *as, char *e)
1018 {
1019   char *s = as;
1020   // skip space
1021   while (*s == ' ') {
1022     s++;
1023     if (s >= e) {
1024       if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1025         return ASCII_CLIENT_ERROR("bad command line");
1026       }
1027       return EVENT_CONT;
1028     }
1029   }
1030   // grab key
1031   key = s;
1032   while (!isspace(*s)) {
1033     if (s >= e) {
1034       if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) {
1035         return ASCII_RESPONSE("key too large");
1036       }
1037       return EVENT_CONT;
1038     }
1039     s++;
1040   }
1041   if (s - key > TSMEMCACHE_MAX_KEY_LEN) {
1042     return ASCII_CLIENT_ERROR("bad command line");
1043   }
1044   header.nkey = s - key;
1045   if (!header.nkey) {
1046     if (e - s >= 2) {
1047       if (*s == '\r') {
1048         s++;
1049       }
1050       if (*s == '\n' && ngets) {
1051         return ASCII_RESPONSE("END");
1052       }
1053       return ASCII_CLIENT_ERROR("bad command line");
1054     }
1055     return EVENT_CONT; // get some more
1056   }
1057   read_offset = s - as;
1058   return TSMEMCACHE_EVENT_GOT_KEY;
1059 }
1060 
// Process the next key of an ascii get/gets/bget whose key list is at [as, e).
// Installs ascii_get_event as the handler and extracts the key with
// get_ascii_key(). CHECK_RET presumably returns that result early unless it is
// TSMEMCACHE_EVENT_GOT_KEY -- confirm against the macro definition.
// The function then counts the key in ngets and starts the cache lookup with
// get_item().
int
MC::ascii_get(char *as, char *e)
{
  SET_HANDLER(&MC::ascii_get_event);
  CHECK_RET(get_ascii_key(as, e), TSMEMCACHE_EVENT_GOT_KEY);
  ngets++;
  return get_item();
}
1069 
1070 int
ascii_gets()1071 MC::ascii_gets()
1072 {
1073   int len = 0;
1074   char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len);
1075   return ascii_get(c, c + len);
1076 }
1077 
// Cursor helpers for the ascii command parsers below. Each expects locals
// `char *s` (cursor, with s < e on entry) and `char *e` (one past the last
// byte). The parsers in this file pass e just past the command's '\n'.
// The helpers advance `s` and return ASCII_CLIENT_ERROR("bad command line")
// from the *enclosing function* if the cursor reaches `e`.
// ctype functions receive the byte as unsigned char: passing a negative plain
// `char` (bytes >= 0x80) to isspace/isdigit is undefined behavior.

// Skip ' ' characters (only spaces, not other whitespace).
#define SKIP_SPACE                                     \
  do {                                                 \
    while (*s == ' ') {                                \
      s++;                                             \
      if (s >= e)                                      \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                  \
  } while (0)

// Advance past a run of non-whitespace bytes.
#define SKIP_TOKEN                                       \
  do {                                                   \
    while (!isspace(static_cast<unsigned char>(*s))) {  \
      s++;                                               \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    }                                                    \
  } while (0)

// Parse an unsigned decimal number into _n (0 if no digits). Overflow is not detected.
#define GET_NUM(_n)                                      \
  do {                                                   \
    if (isdigit(static_cast<unsigned char>(*s))) {       \
      _n = *s - '0';                                     \
      s++;                                               \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    } else                                               \
      _n = 0;                                            \
    while (isdigit(static_cast<unsigned char>(*s))) {    \
      _n *= 10;                                          \
      _n += *s - '0';                                    \
      s++;                                               \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    }                                                    \
  } while (0)

// Parse an optionally '-'-prefixed decimal number into _n. Overflow is not detected.
#define GET_SNUM(_n)                                     \
  do {                                                   \
    int neg = 0;                                         \
    if (*s == '-') {                                     \
      s++;                                               \
      neg = 1;                                           \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    }                                                    \
    if (isdigit(static_cast<unsigned char>(*s))) {       \
      _n = *s - '0';                                     \
      s++;                                               \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    } else                                               \
      _n = 0;                                            \
    while (isdigit(static_cast<unsigned char>(*s))) {    \
      _n *= 10;                                          \
      _n += *s - '0';                                    \
      s++;                                               \
      if (s >= e)                                        \
        return ASCII_CLIENT_ERROR("bad command line");   \
    }                                                    \
    if (neg)                                             \
      _n = -_n;                                          \
  } while (0)
1138 
1139 int
ascii_set(char * s,char * e)1140 MC::ascii_set(char *s, char *e)
1141 {
1142   SKIP_SPACE;
1143   key = s;
1144   SKIP_TOKEN;
1145   header.nkey = s - key;
1146   SKIP_SPACE;
1147   GET_NUM(header.flags);
1148   SKIP_SPACE;
1149   GET_SNUM(exptime);
1150   SKIP_SPACE;
1151   GET_NUM(nbytes);
1152   swallow_bytes = nbytes + 2; // assume failure
1153   if (f.set_cas) {
1154     SKIP_SPACE;
1155     GET_NUM(header.cas);
1156   } else {
1157     header.cas = 0;
1158   }
1159   SKIP_SPACE;
1160   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1161     f.noreply = 1;
1162     s += 7;
1163     if (s >= e) {
1164       return ASCII_CLIENT_ERROR("bad command line");
1165     }
1166     SKIP_SPACE;
1167   }
1168   if (*s == '\r') {
1169     s++;
1170   }
1171   if (*s == '\n') {
1172     s++;
1173   }
1174   if (s != e) {
1175     return ASCII_CLIENT_ERROR("bad command line");
1176   }
1177   SET_HANDLER(&MC::ascii_set_event);
1178   if (f.set_append || f.set_prepend) {
1179     return get_item();
1180   } else {
1181     return set_item();
1182   }
1183 }
1184 
1185 int
ascii_delete(char * s,char * e)1186 MC::ascii_delete(char *s, char *e)
1187 {
1188   SKIP_SPACE;
1189   key = s;
1190   SKIP_TOKEN;
1191   header.nkey = s - key;
1192   SKIP_SPACE;
1193   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1194     f.noreply = 1;
1195     s += 7;
1196     if (s >= e) {
1197       return ASCII_CLIENT_ERROR("bad command line");
1198     }
1199     SKIP_SPACE;
1200   }
1201   if (*s == '\r') {
1202     s++;
1203   }
1204   if (*s == '\n') {
1205     s++;
1206   }
1207   if (s != e) {
1208     return ASCII_CLIENT_ERROR("bad command line");
1209   }
1210   SET_HANDLER(&MC::ascii_delete_event);
1211   return delete_item();
1212 }
1213 
1214 int
ascii_incr_decr(char * s,char * e)1215 MC::ascii_incr_decr(char *s, char *e)
1216 {
1217   SKIP_SPACE;
1218   key = s;
1219   SKIP_TOKEN;
1220   header.nkey = s - key;
1221   SKIP_SPACE;
1222   GET_NUM(delta);
1223   SKIP_SPACE;
1224   if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) {
1225     f.noreply = 1;
1226     s += 7;
1227     if (s >= e) {
1228       return ASCII_CLIENT_ERROR("bad command line");
1229     }
1230     SKIP_SPACE;
1231   }
1232   if (*s == '\r') {
1233     s++;
1234   }
1235   if (*s == '\n') {
1236     s++;
1237   }
1238   if (s != e) {
1239     return ASCII_CLIENT_ERROR("bad command line");
1240   }
1241   SET_HANDLER(&MC::ascii_incr_decr_event);
1242   return set_item();
1243 }
1244 
// Returns 1 when [t, e) holds only optional ' ' characters and an optional '\r',
// followed by exactly one final byte (e[-1]); otherwise returns 0. The final
// byte is not inspected. Callers in this file pass e just past the line's '\n'.
// Every dereference is bounds-checked against e.
static int
is_end_of_cmd(char *t, char *e)
{
  while (t < e && *t == ' ') {
    t++; // skip spaces
  }
  if (t < e && *t == '\r') {
    t++;
  }
  return t == e - 1 ? 1 : 0;
}
1259 
// Checks whether the next token in [*pt, e), after any ' ' characters, is the
// word "noreply" followed by one whitespace byte.
// On a match, moves *pt just past "noreply" and returns 1.
// Otherwise leaves *pt unchanged and returns 0.
// Only bytes inside [*pt, e) are read.
static int
is_noreply(char **pt, char *e)
{
  static const char word[] = "noreply";
  // "noreply" plus the whitespace byte that must follow it
  const int need = static_cast<int>(sizeof(word));
  char *t        = *pt;
  while (t < e && *t == ' ') {
    t++;
  }
  if (e - t < need || memcmp(t, word, sizeof(word) - 1) != 0 || !isspace(static_cast<unsigned char>(t[sizeof(word) - 1]))) {
    return 0;
  }
  *pt = t + sizeof(word) - 1;
  return 1;
}
1279 
1280 int
read_ascii_from_client_event(int event,void * data)1281 MC::read_ascii_from_client_event(int event, void *data)
1282 {
1283   int len = 0;
1284   char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len), *s = c;
1285   MCDebugBuf("tsmemcache_ascii_cmd", c, len);
1286   char *e = c + len - 5; // at least 6 chars
1287   while (*s == ' ' && s < e) {
1288     s++; // skip leading spaces
1289   }
1290   if (s >= e) {
1291     if (len >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE || memchr(c, '\n', len)) {
1292       return ASCII_CLIENT_ERROR("bad command line");
1293     }
1294     return EVENT_CONT;
1295   }
1296   // gets can be large, so do not require the full cmd fit in the buffer
1297   e = c + len;
1298   switch (*s) {
1299   case 'g': // get gets
1300     if (s[3] == 's' && s[4] == ' ') {
1301       f.return_cas = 1;
1302       read_offset  = 5;
1303       goto Lget;
1304     } else if (s[3] == ' ') {
1305       read_offset = 4;
1306     Lget:
1307       reader->consume(read_offset);
1308       if (c != tmp_cmd_buffer) { // all in the block
1309         return ascii_get(s + read_offset, e);
1310       } else {
1311         return ascii_gets();
1312       }
1313     }
1314     break;
1315   case 'b': // bget
1316     if (s[4] != ' ') {
1317       break;
1318     }
1319     read_offset = 5;
1320     goto Lget;
1321     break;
1322   default:
1323     break;
1324   }
1325   // find the end of the command
1326   e = static_cast<char *>(memchr(s, '\n', len));
1327   if (!e) {
1328     if (reader->read_avail() > TSMEMCACHE_MAX_CMD_SIZE) {
1329       return ASCII_CLIENT_ERROR("bad command line");
1330     }
1331     return EVENT_CONT;
1332   }
1333   e++; // skip nl
1334   end_of_cmd = e - c;
1335   switch (*s) {
1336   case 's': // set stats
1337     if (s[1] == 'e' && s[2] == 't' && s[3] == ' ') {
1338       return ascii_set(s + sizeof("set") - 1, e);
1339     }
1340     if (STRCMP_REST("tats", s + 1, e)) {
1341       break;
1342     }
1343     s += sizeof("stats") - 1;
1344     if (is_noreply(&s, e)) {
1345       break; // to please memcapable
1346     } else {
1347       return ASCII_RESPONSE("END");
1348     }
1349   case 'a': // add
1350     if (s[1] == 'd' && s[2] == 'd' && s[3] == ' ') {
1351       f.set_add = 1;
1352       return ascii_set(s + sizeof("add") - 1, e);
1353     }
1354     if (STRCMP_REST("ppend", s + 1, e)) {
1355       break;
1356     }
1357     f.set_append = 1;
1358     return ascii_set(s + sizeof("append") - 1, e);
1359   case 'p': // prepend
1360     if (STRCMP_REST("repend", s + 1, e)) {
1361       break;
1362     }
1363     f.set_prepend = 1;
1364     return ascii_set(s + sizeof("prepend") - 1, e);
1365   case 'c': // cas
1366     if (s[1] == 'a' && s[2] == 's' && s[3] == ' ') {
1367       f.set_cas = 1;
1368       return ascii_set(s + sizeof("cas") - 1, e);
1369     }
1370     break;
1371   case 'i': // incr
1372     if (s[1] == 'n' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') {
1373       f.set_incr = 1;
1374       return ascii_incr_decr(s + sizeof("incr") - 1, e);
1375     }
1376     break;
1377   case 'f': { // flush_all
1378     if (STRCMP_REST("lush_all", s + 1, e)) {
1379       break;
1380     }
1381     s += sizeof("flush_all") - 1;
1382     SKIP_SPACE;
1383     int32_t time_offset = 0;
1384     if (isdigit(*s)) {
1385       GET_NUM(time_offset);
1386     }
1387     f.noreply                 = is_noreply(&s, e);
1388     ink_hrtime new_last_flush = Thread::get_hrtime() + HRTIME_SECONDS(time_offset);
1389 #if __WORDSIZE == 64
1390     last_flush = new_last_flush; // this will be atomic for native word size
1391 #else
1392     ink_atomic_swap(&last_flush, new_last_flush);
1393 #endif
1394     if (!is_end_of_cmd(s, e)) {
1395       break;
1396     }
1397     return ASCII_RESPONSE("OK");
1398   }
1399   case 'd': // delete decr
1400     if (e - s < 5) {
1401       break;
1402     }
1403     if (s[2] == 'l') {
1404       if (s[1] == 'e' && s[3] == 'e' && s[4] == 't' && s[5] == 'e' && s[6] == ' ') {
1405         return ascii_delete(s + sizeof("delete") - 1, e);
1406       }
1407     } else if (s[1] == 'e' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') { // decr
1408       f.set_decr = 1;
1409       return ascii_incr_decr(s + sizeof("decr") - 1, e);
1410     }
1411     break;
1412   case 'r': // replace
1413     if (STRCMP_REST("eplace", s + 1, e)) {
1414       break;
1415     }
1416     f.set_replace = 1;
1417     return ascii_set(s + sizeof("replace") - 1, e);
1418   case 'q': // quit
1419     if (STRCMP_REST("uit", s + 1, e)) {
1420       break;
1421     }
1422     if (!is_end_of_cmd(s + sizeof("quit") - 1, e)) {
1423       break;
1424     }
1425     return die();
1426   case 'v': { // version
1427     if (s[3] == 's') {
1428       if (STRCMP_REST("ersion", s + 1, e)) {
1429         break;
1430       }
1431       if (!is_end_of_cmd(s + sizeof("version") - 1, e)) {
1432         break;
1433       }
1434       return ASCII_RESPONSE("VERSION " TSMEMCACHE_VERSION);
1435     } else if (s[3] == 'b') {
1436       if (STRCMP_REST("erbosity", s + 1, e)) {
1437         break;
1438       }
1439       s += sizeof("verbosity") - 1;
1440       SKIP_SPACE;
1441       if (!isdigit(*s)) {
1442         break;
1443       }
1444       GET_NUM(verbosity);
1445       f.noreply = is_noreply(&s, e);
1446       if (!is_end_of_cmd(s, e)) {
1447         break;
1448       }
1449       return ASCII_RESPONSE("OK");
1450     }
1451     break;
1452   }
1453   }
1454   return ASCII_ERROR();
1455 }
1456 
1457 int
write_then_close_event(int event,void * data)1458 MC::write_then_close_event(int event, void *data)
1459 {
1460   switch (event) {
1461   case VC_EVENT_EOS:
1462     if ((VIO *)data == wvio) {
1463       break;
1464     }
1465   // fall through
1466   case VC_EVENT_READ_READY:
1467     return EVENT_DONE; // no more of that stuff
1468   case VC_EVENT_WRITE_READY:
1469     if (wvio->buffer.reader()->read_avail() > 0) {
1470       return EVENT_CONT;
1471     }
1472     break;
1473   default:
1474     break;
1475   }
1476   return die();
1477 }
1478 
1479 int
read_from_client_event(int event,void * data)1480 MC::read_from_client_event(int event, void *data)
1481 {
1482   switch (event) {
1483   case TSMEMCACHE_STREAM_DONE:
1484     return read_from_client();
1485   case VC_EVENT_READ_READY:
1486   case VC_EVENT_EOS:
1487     if (reader->read_avail() < 1) {
1488       return EVENT_CONT;
1489     }
1490     if ((uint8_t)reader->start()[0] == (uint8_t)PROTOCOL_BINARY_REQ) {
1491       return TS_SET_CALL(&MC::read_binary_from_client_event, event, data);
1492     } else {
1493       return TS_SET_CALL(&MC::read_ascii_from_client_event, event, data);
1494     }
1495   case VC_EVENT_WRITE_READY:
1496   case VC_EVENT_WRITE_COMPLETE:
1497     break;
1498   default:
1499     return die();
1500   }
1501   return EVENT_CONT;
1502 }
1503 
// between client and cache
//
// Pumps data between the client connection and a cache VConnection.
//
// Events whose data is the cache read VIO (crvio) or cache write VIO (cwvio):
//   - READ_READY reenables the client write VIO (wvio);
//   - WRITE_READY reenables the client read VIO (rvio);
//   - WRITE_COMPLETE / EOS / READ_COMPLETE pop back to the pushed handler
//     with TSMEMCACHE_STREAM_DONE;
//   - anything else ends the session via die().
// All other events are treated as client-side network I/O:
//   - READ_READY: while a cache write is active, copy up to `nbytes` of client
//     input into cbuf, but only when creader is not the client reader itself
//     (presumably in that case the cache consumes client input directly).
//     Then reenable cwvio;
//   - WRITE_READY: reenable the cache read VIO, if any;
//   - WRITE_COMPLETE / READ_COMPLETE pop with TSMEMCACHE_STREAM_DONE;
//   - anything else (including client EOS) ends the session.
int
MC::stream_event(int event, void *data)
{
  if (data == crvio || data == cwvio) {
    switch (event) {
    case VC_EVENT_READ_READY:
      wvio->reenable();
      break;
    case VC_EVENT_WRITE_READY:
      rvio->reenable();
      break;
    case VC_EVENT_WRITE_COMPLETE:
    case VC_EVENT_EOS:
    case VC_EVENT_READ_COMPLETE:
      return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
    default:
      return die();
    }
  } else {
    switch (event) {
    case VC_EVENT_READ_READY:
      if (cwvio) {
        if (creader != reader && creader->read_avail() < cwvio->nbytes) {
          int64_t a = reader->read_avail();
          // NOTE(review): `a` is capped by `nbytes` but not by the bytes already
          // copied into cbuf on earlier READ_READY events. If one value arrives
          // across several reads, this may copy more than cwvio->nbytes (e.g. the
          // trailing "\r\n" or the next command). Confirm against the callers.
          if (a > static_cast<int64_t>(nbytes)) {
            a = static_cast<int64_t>(nbytes);
          }
          if (a) {
            cbuf->write(reader, a);
            reader->consume(a);
          }
        }
        cwvio->reenable();
      }
      break;
    case VC_EVENT_WRITE_READY:
      if (crvio) {
        crvio->reenable();
      }
      break;
    case VC_EVENT_WRITE_COMPLETE:
    case VC_EVENT_READ_COMPLETE:
      return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0);
    default:
      return die();
    }
  }
  return EVENT_CONT;
}
1554 
1555 // cache to cache
1556 int
tunnel_event(int event,void * data)1557 MC::tunnel_event(int event, void *data)
1558 {
1559   MCDebug("tsmemcache", "tunnel %d %p crvio %p cwvio %p", event, data, crvio, cwvio);
1560   if (data == crvio) {
1561     switch (event) {
1562     case VC_EVENT_READ_READY:
1563       cwvio->reenable();
1564       break;
1565     case VC_EVENT_EOS:
1566     case VC_EVENT_READ_COMPLETE:
1567       if (cwvio->nbytes == cwvio->ndone + cwvio->buffer.reader()->read_avail()) {
1568         cwvio->reenable();
1569         return EVENT_CONT;
1570       }
1571       return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1572     default:
1573       return die();
1574     }
1575   } else if (data == cwvio) {
1576     switch (event) {
1577     case VC_EVENT_WRITE_READY:
1578       crvio->reenable();
1579       break;
1580     case VC_EVENT_WRITE_COMPLETE:
1581     case VC_EVENT_EOS:
1582       return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0);
1583     default:
1584       return die();
1585     }
1586   } else { // network I/O
1587     switch (event) {
1588     case VC_EVENT_READ_READY:
1589     case VC_EVENT_WRITE_READY:
1590     case VC_EVENT_WRITE_COMPLETE:
1591     case VC_EVENT_READ_COMPLETE:
1592       return EVENT_CONT;
1593     default:
1594       return die();
1595     }
1596   }
1597   return EVENT_CONT;
1598 }
1599 
1600 int
init_tsmemcache(int port)1601 init_tsmemcache(int port)
1602 {
1603   tsmemcache_constants();
1604   MCAccept *a = new MCAccept;
1605   a->mutex    = new_ProxyMutex();
1606   NetProcessor::AcceptOptions options(NetProcessor::DEFAULT_ACCEPT_OPTIONS);
1607   options.local_port = a->accept_port = port;
1608   netProcessor.accept(a, options);
1609   return 0;
1610 }
1611 
1612 void
TSPluginInit(int argc,const char * argv[])1613 TSPluginInit(int argc, const char *argv[])
1614 {
1615   ink_assert(sizeof(protocol_binary_request_header) == 24);
1616 
1617   TSPluginRegistrationInfo info;
1618   info.plugin_name   = (char *)"tsmemcache";
1619   info.vendor_name   = (char *)"ats";
1620   info.support_email = (char *)"jplevyak@apache.org";
1621 
1622   int port = 11211;
1623 
1624   if (TSPluginRegister(&info) != TS_SUCCESS) {
1625     TSError("[PluginInit] tsmemcache registration failed.\n");
1626     goto error;
1627   }
1628 
1629   if (argc < 2) {
1630     TSError("[tsmemcache] Usage: tsmemcache.so [accept_port]\n");
1631     goto error;
1632   } else {
1633     int port = atoi(argv[1]);
1634     if (!port) {
1635       TSError("[tsmemcache] bad accept_port '%s'\n", argv[1]);
1636       goto error;
1637     }
1638     MCDebug("tsmemcache", "using accept_port %d", port);
1639   }
1640   init_tsmemcache(port);
1641   return;
1642 
1643 error:
1644   TSError("[PluginInit] Plugin not initialized");
1645 }
1646