1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <sys/types.h>
6 #include <netinet/in.h>
7 #include <inttypes.h>
8
9 #include "protocol_extension.h"
10 #include <memcached/util.h>
11 #include "fragment_rw.h"
12
13 static uint8_t read_command = PROTOCOL_BINARY_CMD_READ;
14 static uint8_t write_command = PROTOCOL_BINARY_CMD_WRITE;
15
16 GET_SERVER_API server_api;
17
18 static const char *get_name(void);
19 static void setup(void (*add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
20 uint8_t cmd,
21 BINARY_COMMAND_CALLBACK new_handler));
22
23 static ENGINE_ERROR_CODE handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
24 ENGINE_HANDLE* handle,
25 const void* cookie,
26 protocol_binary_request_header *request,
27 ADD_RESPONSE response);
28
29 static EXTENSION_BINARY_PROTOCOL_DESCRIPTOR descriptor = {
30 .get_name = get_name,
31 .setup = setup
32 };
33
get_name(void)34 static const char *get_name(void) {
35 return "fragment read/write";
36 }
37
setup(void (* add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR * descriptor,uint8_t cmd,BINARY_COMMAND_CALLBACK new_handler))38 static void setup(void (*add)(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
39 uint8_t cmd,
40 BINARY_COMMAND_CALLBACK new_handler))
41 {
42 add(&descriptor, read_command, handle_fragment_rw);
43 add(&descriptor, write_command, handle_fragment_rw);
44 }
45
create_object(ENGINE_HANDLE_V1 * v1,ENGINE_HANDLE * v,const void * cookie,const item_info * org,uint16_t vbucket,const void * data,uint64_t offset,uint64_t len,uint64_t * cas)46 static ENGINE_ERROR_CODE create_object(ENGINE_HANDLE_V1 *v1,
47 ENGINE_HANDLE *v,
48 const void *cookie,
49 const item_info *org,
50 uint16_t vbucket,
51 const void *data,
52 uint64_t offset,
53 uint64_t len,
54 uint64_t *cas)
55 {
56 ENGINE_ERROR_CODE r;
57 item *item = NULL;
58
59 r = v1->allocate(v, cookie, &item, org->key, org->nkey, org->nbytes,
60 org->flags, vbucket);
61 if (r != ENGINE_SUCCESS) {
62 return r;
63 }
64
65 item_info i2 = { .nvalue = 1 };
66 if (!v1->get_item_info(v, cookie, item, &i2)) {
67 v1->release(v, cookie, item);
68 return ENGINE_DISCONNECT;
69 }
70
71 uint8_t *dest = (void*)i2.value[0].iov_base;
72 memcpy(dest, org->value[0].iov_base, org->nbytes);
73 memcpy(dest + offset, data, len);
74
75 v1->item_set_cas(v, cookie, item, org->cas);
76 r = v1->store(v, cookie, item, cas, OPERATION_CAS, vbucket);
77 v1->release(v, cookie, item);
78 return r;
79 }
80
handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR * descriptor,ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)81 static ENGINE_ERROR_CODE handle_fragment_rw(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
82 ENGINE_HANDLE* handle,
83 const void* cookie,
84 protocol_binary_request_header *request,
85 ADD_RESPONSE response)
86 {
87 if (request->request.extlen != 8 || request->request.keylen == 0) {
88 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
89 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
90 }
91
92 protocol_binary_request_read *req = (void*)request;
93 uint8_t *key = req->bytes + sizeof(request->bytes) +
94 request->request.extlen;
95 uint16_t nkey = ntohs(request->request.keylen);
96 uint64_t offset = ntohl(req->message.body.offset);
97 uint64_t len = ntohl(req->message.body.length);
98 uint16_t vbucket = ntohs(request->request.vbucket);
99 uint64_t cas = ntohll(request->request.cas);
100 ENGINE_HANDLE_V1 *v1 = (void*)handle;
101 item *item = NULL;
102 uint8_t *data = key + nkey;
103
104 ENGINE_ERROR_CODE r = v1->get(handle, cookie, &item, key, nkey, vbucket);
105 if (r == ENGINE_SUCCESS) {
106 item_info item_info = { .nvalue = 1 };
107
108 if (!v1->get_item_info(handle, NULL, item, &item_info)) {
109 r = ENGINE_FAILED;
110 } else if (cas != 0 && item_info.cas != cas) {
111 r = ENGINE_KEY_EEXISTS;
112 } else if (offset + len > (uint64_t)item_info.nbytes) {
113 r = ENGINE_ERANGE;
114 }
115
116 if (r == ENGINE_SUCCESS) {
117 if (request->request.opcode == read_command) {
118 uint8_t *ptr;
119 ptr = ((uint8_t*)item_info.value[0].iov_base) + offset;
120 if (!response(NULL, 0, NULL, 0, ptr,
121 (uint32_t)len, PROTOCOL_BINARY_RAW_BYTES,
122 PROTOCOL_BINARY_RESPONSE_SUCCESS,
123 item_info.cas, cookie)) {
124 return ENGINE_DISCONNECT;
125 }
126 } else {
127 r = create_object(v1, handle, cookie, &item_info,
128 vbucket, data, offset, len, &cas);
129 if (r == ENGINE_SUCCESS) {
130 if (!response(NULL, 0, NULL, 0, NULL, 0,
131 PROTOCOL_BINARY_RAW_BYTES,
132 PROTOCOL_BINARY_RESPONSE_SUCCESS,
133 cas, cookie)) {
134 return ENGINE_DISCONNECT;
135 }
136 }
137 }
138 }
139
140 v1->release(handle, cookie, item);
141 }
142
143 return r;
144 }
145
146 MEMCACHED_PUBLIC_API
memcached_extensions_initialize(const char * config,GET_SERVER_API get_server_api)147 EXTENSION_ERROR_CODE memcached_extensions_initialize(const char *config,
148 GET_SERVER_API get_server_api) {
149
150 server_api = get_server_api;
151 SERVER_HANDLE_V1 *server = get_server_api();
152 if (server == NULL) {
153 return EXTENSION_FATAL;
154 }
155
156 if (config != NULL) {
157 size_t rop, wop;
158 struct config_item items[] = {
159 { .key = "r",
160 .datatype = DT_SIZE,
161 .value.dt_size = &rop },
162 { .key = "w",
163 .datatype = DT_SIZE,
164 .value.dt_size = &wop },
165 { .key = NULL}
166 };
167
168 if (server->core->parse_config(config, items, stderr) != 0) {
169 return EXTENSION_FATAL;
170 }
171
172 if (items[0].found) {
173 read_command = (uint8_t)(rop & 0xff);
174 }
175
176 if (items[1].found) {
177 write_command = (uint8_t)(wop & 0xff);
178 }
179 }
180
181 if (!server->extension->register_extension(EXTENSION_BINARY_PROTOCOL,
182 &descriptor)) {
183 return EXTENSION_FATAL;
184 }
185
186 return EXTENSION_SUCCESS;
187 }
188