1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <sys/types.h>
6 #include <netinet/in.h>
7 #include <inttypes.h>
8 
9 #include "protocol_extension.h"
10 #include <memcached/util.h>
11 #include "fragment_rw.h"
12 
13 static uint8_t read_command = PROTOCOL_BINARY_CMD_READ;
14 static uint8_t write_command = PROTOCOL_BINARY_CMD_WRITE;
15 
16 GET_SERVER_API server_api;
17 
18 static const char *get_name(void);
19 static void setup(void (*add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
20                               uint8_t cmd,
21                               BINARY_COMMAND_CALLBACK new_handler));
22 
23 static ENGINE_ERROR_CODE handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
24                                             ENGINE_HANDLE* handle,
25                                             const void* cookie,
26                                             protocol_binary_request_header *request,
27                                             ADD_RESPONSE response);
28 
29 static EXTENSION_BINARY_PROTOCOL_DESCRIPTOR descriptor = {
30     .get_name = get_name,
31     .setup = setup
32 };
33 
get_name(void)34 static const char *get_name(void) {
35     return "fragment read/write";
36 }
37 
setup(void (* add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR * descriptor,uint8_t cmd,BINARY_COMMAND_CALLBACK new_handler))38 static void setup(void (*add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
39                               uint8_t cmd,
40                               BINARY_COMMAND_CALLBACK new_handler))
41 {
42     add(&descriptor, read_command, handle_fragment_rw);
43     add(&descriptor, write_command, handle_fragment_rw);
44 }
45 
create_object(ENGINE_HANDLE_V1 * v1,ENGINE_HANDLE * v,const void * cookie,const item_info * org,uint16_t vbucket,const void * data,uint64_t offset,uint64_t len,uint64_t * cas)46 static ENGINE_ERROR_CODE create_object(ENGINE_HANDLE_V1 *v1,
47                                        ENGINE_HANDLE *v,
48                                        const void *cookie,
49                                        const item_info *org,
50                                        uint16_t vbucket,
51                                        const void *data,
52                                        uint64_t offset,
53                                        uint64_t len,
54                                        uint64_t *cas)
55 {
56     ENGINE_ERROR_CODE r;
57     item *item = NULL;
58 
59     r = v1->allocate(v, cookie, &item, org->key, org->nkey, org->nbytes,
60                      org->flags, vbucket);
61     if (r != ENGINE_SUCCESS) {
62         return r;
63     }
64 
65     item_info i2 = { .nvalue = 1 };
66     if (!v1->get_item_info(v, cookie, item, &i2)) {
67         v1->release(v, cookie, item);
68         return ENGINE_DISCONNECT;
69     }
70 
71     uint8_t *dest = (void*)i2.value[0].iov_base;
72     memcpy(dest, org->value[0].iov_base, org->nbytes);
73     memcpy(dest + offset, data, len);
74 
75     v1->item_set_cas(v, cookie, item, org->cas);
76     r = v1->store(v, cookie, item, cas, OPERATION_CAS, vbucket);
77     v1->release(v, cookie, item);
78     return r;
79 }
80 
handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR * descriptor,ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)81 static ENGINE_ERROR_CODE handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
82                                             ENGINE_HANDLE* handle,
83                                             const void* cookie,
84                                             protocol_binary_request_header *request,
85                                             ADD_RESPONSE response)
86 {
87     if (request->request.extlen != 8 || request->request.keylen == 0) {
88         return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
89                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
90     }
91 
92     protocol_binary_request_read *req = (void*)request;
93     uint8_t *key = req->bytes + sizeof(request->bytes) +
94         request->request.extlen;
95     uint16_t nkey = ntohs(request->request.keylen);
96     uint64_t offset = ntohl(req->message.body.offset);
97     uint64_t len = ntohl(req->message.body.length);
98     uint16_t vbucket = ntohs(request->request.vbucket);
99     uint64_t cas = ntohll(request->request.cas);
100     ENGINE_HANDLE_V1 *v1 = (void*)handle;
101     item *item = NULL;
102     uint8_t *data = key + nkey;
103 
104     ENGINE_ERROR_CODE r = v1->get(handle, cookie, &item, key, nkey, vbucket);
105     if (r == ENGINE_SUCCESS) {
106         item_info item_info = { .nvalue = 1 };
107 
108         if (!v1->get_item_info(handle, NULL, item, &item_info)) {
109             r = ENGINE_FAILED;
110         } else if (cas != 0 && item_info.cas != cas) {
111             r = ENGINE_KEY_EEXISTS;
112         } else if (offset + len > (uint64_t)item_info.nbytes) {
113             r = ENGINE_ERANGE;
114         }
115 
116         if (r == ENGINE_SUCCESS) {
117             if (request->request.opcode == read_command) {
118                 uint8_t *ptr;
119                 ptr =  ((uint8_t*)item_info.value[0].iov_base) + offset;
120                 if (!response(NULL, 0, NULL, 0, ptr,
121                               (uint32_t)len, PROTOCOL_BINARY_RAW_BYTES,
122                               PROTOCOL_BINARY_RESPONSE_SUCCESS,
123                               item_info.cas, cookie)) {
124                     return ENGINE_DISCONNECT;
125                 }
126             } else {
127                 r = create_object(v1, handle, cookie, &item_info,
128                                   vbucket, data, offset, len, &cas);
129                 if (r == ENGINE_SUCCESS) {
130                     if (!response(NULL, 0, NULL, 0, NULL, 0,
131                                   PROTOCOL_BINARY_RAW_BYTES,
132                                   PROTOCOL_BINARY_RESPONSE_SUCCESS,
133                                   cas, cookie)) {
134                         return ENGINE_DISCONNECT;
135                     }
136                 }
137             }
138         }
139 
140         v1->release(handle, cookie, item);
141     }
142 
143     return r;
144 }
145 
146 MEMCACHED_PUBLIC_API
memcached_extensions_initialize(const char * config,GET_SERVER_API get_server_api)147 EXTENSION_ERROR_CODE memcached_extensions_initialize(const char *config,
148                                                      GET_SERVER_API get_server_api) {
149 
150     server_api = get_server_api;
151     SERVER_HANDLE_V1 *server = get_server_api();
152     if (server == NULL) {
153         return EXTENSION_FATAL;
154     }
155 
156     if (config != NULL) {
157         size_t rop, wop;
158         struct config_item items[] = {
159             { .key = "r",
160               .datatype = DT_SIZE,
161               .value.dt_size = &rop },
162             { .key = "w",
163               .datatype = DT_SIZE,
164               .value.dt_size = &wop },
165             { .key = NULL}
166         };
167 
168         if (server->core->parse_config(config, items, stderr) != 0) {
169             return EXTENSION_FATAL;
170         }
171 
172         if (items[0].found) {
173             read_command = (uint8_t)(rop & 0xff);
174         }
175 
176         if (items[1].found) {
177             write_command = (uint8_t)(wop & 0xff);
178         }
179     }
180 
181     if (!server->extension->register_extension(EXTENSION_BINARY_PROTOCOL,
182                                                &descriptor)) {
183         return EXTENSION_FATAL;
184     }
185 
186     return EXTENSION_SUCCESS;
187 }
188