1 #include <assert.h>
2 #include <stddef.h>
3 #include <stdint.h>
4 #include <limits.h>
5 #include <string.h>
6 #include <stdbool.h>
7 #include <sys/types.h>
8
9 #include <msgpuck/msgpuck.h>
10
11 #include <tarantool/tnt_mem.h>
12 #include <tarantool/tnt_reply.h>
13 #include <tarantool/tnt_stream.h>
14 #include <tarantool/tnt_net.h>
15 #include <tarantool/tnt_object.h>
16 #include <tarantool/tnt_buf.h>
17 #include <tarantool/tnt_proto.h>
18 #include <tarantool/tnt_schema.h>
19
20 #include <tarantool/tnt_request.h>
21
22 #include "tnt_proto_internal.h"
23
tnt_request_init(struct tnt_request * req)24 struct tnt_request *tnt_request_init(struct tnt_request *req) {
25 int alloc = (req == NULL);
26 if (req == NULL) {
27 req = tnt_mem_alloc(sizeof(struct tnt_request));
28 if (!req) return NULL;
29 }
30 memset(req, 0, sizeof(struct tnt_request));
31 req->limit = UINT32_MAX;
32 req->alloc = alloc;
33 return req;
34 };
35
tnt_request_free(struct tnt_request * req)36 void tnt_request_free(struct tnt_request *req) {
37 if (req->key_object)
38 tnt_stream_free(req->key_object);
39 req->key_object = NULL;
40 if (req->tuple_object)
41 tnt_stream_free(req->tuple_object);
42 req->tuple_object = NULL;
43 if (req->alloc) tnt_mem_free(req);
44 }
45
46 #define TNT_REQUEST_CUSTOM(NM, CNM) \
47 struct tnt_request *tnt_request_##NM(struct tnt_request *req) { \
48 req = tnt_request_init(req); \
49 if (req) { \
50 req->hdr.type = TNT_OP_##CNM; \
51 } \
52 return req; \
53 }
54
55 TNT_REQUEST_CUSTOM(select, SELECT);
56 TNT_REQUEST_CUSTOM(insert, INSERT);
57 TNT_REQUEST_CUSTOM(replace, REPLACE);
58 TNT_REQUEST_CUSTOM(update, UPDATE);
59 TNT_REQUEST_CUSTOM(delete, DELETE);
60 TNT_REQUEST_CUSTOM(call, CALL);
61 TNT_REQUEST_CUSTOM(auth, AUTH);
62 TNT_REQUEST_CUSTOM(eval, EVAL);
63 TNT_REQUEST_CUSTOM(upsert, UPSERT);
64 TNT_REQUEST_CUSTOM(ping, PING);
65
66 #undef TNT_REQUEST_CUSTOM
67
tnt_request_set_space(struct tnt_request * req,uint32_t space)68 int tnt_request_set_space(struct tnt_request *req, uint32_t space)
69 {
70 req->space_id = space;
71 return 0;
72 }
73
tnt_request_set_index(struct tnt_request * req,uint32_t index)74 int tnt_request_set_index(struct tnt_request *req, uint32_t index)
75 {
76 req->index_id = index;
77 return 0;
78 }
79
tnt_request_set_offset(struct tnt_request * req,uint32_t offset)80 int tnt_request_set_offset(struct tnt_request *req, uint32_t offset)
81 {
82 req->offset = offset;
83 return 0;
84 }
85
tnt_request_set_limit(struct tnt_request * req,uint32_t limit)86 int tnt_request_set_limit(struct tnt_request *req, uint32_t limit)
87 {
88 req->limit = limit;
89 return 0;
90 }
91
92 int
tnt_request_set_iterator(struct tnt_request * req,enum tnt_iterator_t iterator)93 tnt_request_set_iterator(struct tnt_request *req, enum tnt_iterator_t iterator)
94 {
95 req->iterator = iterator;
96 return 0;
97 }
98
tnt_request_set_index_base(struct tnt_request * req,uint32_t index_base)99 int tnt_request_set_index_base(struct tnt_request *req, uint32_t index_base)
100 {
101 req->index_base = index_base;
102 return 0;
103 }
104
tnt_request_set_key(struct tnt_request * req,struct tnt_stream * s)105 int tnt_request_set_key(struct tnt_request *req, struct tnt_stream *s)
106 {
107 req->key = TNT_SBUF_DATA(s);
108 req->key_end = req->key + TNT_SBUF_SIZE(s);
109 return 0;
110 }
111
tnt_request_set_key_format(struct tnt_request * req,const char * fmt,...)112 int tnt_request_set_key_format(struct tnt_request *req, const char *fmt, ...)
113 {
114 if (req->key_object)
115 tnt_object_reset(req->key_object);
116 else
117 req->key_object = tnt_object(NULL);
118 if (!req->key_object)
119 return -1;
120 va_list args;
121 va_start(args, fmt);
122 ssize_t res = tnt_object_vformat(req->key_object, fmt, args);
123 va_end(args);
124 if (res == -1)
125 return -1;
126 return tnt_request_set_key(req, req->key_object);
127 }
128
129 int
tnt_request_set_func(struct tnt_request * req,const char * func,uint32_t flen)130 tnt_request_set_func(struct tnt_request *req, const char *func,
131 uint32_t flen)
132 {
133 if (req->hdr.type != TNT_OP_CALL)
134 return -1;
135 if (!func)
136 return -1;
137 req->key = func; req->key_end = req->key + flen;
138 return 0;
139 }
140
141 int
tnt_request_set_funcz(struct tnt_request * req,const char * func)142 tnt_request_set_funcz(struct tnt_request *req, const char *func)
143 {
144 if (req->hdr.type != TNT_OP_CALL)
145 return -1;
146 if (!func)
147 return -1;
148 req->key = func; req->key_end = req->key + strlen(req->key);
149 return 0;
150 }
151
152 int
tnt_request_set_expr(struct tnt_request * req,const char * expr,uint32_t elen)153 tnt_request_set_expr(struct tnt_request *req, const char *expr,
154 uint32_t elen)
155 {
156 if (req->hdr.type != TNT_OP_EVAL)
157 return -1;
158 if (!expr)
159 return -1;
160 req->key = expr; req->key_end = req->key + elen;
161 return 0;
162 }
163
164 int
tnt_request_set_exprz(struct tnt_request * req,const char * expr)165 tnt_request_set_exprz(struct tnt_request *req, const char *expr)
166 {
167 if (req->hdr.type != TNT_OP_EVAL)
168 return -1;
169 if (!expr)
170 return -1;
171 req->key = expr; req->key_end = req->key + strlen(req->key);
172 return 0;
173 }
174
175 int
tnt_request_set_ops(struct tnt_request * req,struct tnt_stream * s)176 tnt_request_set_ops(struct tnt_request *req, struct tnt_stream *s)
177 {
178 if (req->hdr.type == TNT_OP_UPDATE) {
179 req->tuple = TNT_SBUF_DATA(s);
180 req->tuple_end = req->tuple + TNT_SBUF_SIZE(s);
181 return 0;
182 } else if (req->hdr.type == TNT_OP_UPSERT) {
183 req->key = TNT_SBUF_DATA(s);
184 req->key_end = req->key + TNT_SBUF_SIZE(s);
185 return 0;
186 }
187 return -1;
188 }
189
tnt_request_set_tuple(struct tnt_request * req,struct tnt_stream * s)190 int tnt_request_set_tuple(struct tnt_request *req, struct tnt_stream *s)
191 {
192 req->tuple = TNT_SBUF_DATA(s);
193 req->tuple_end = req->tuple + TNT_SBUF_SIZE(s);
194 return 0;
195 }
196
tnt_request_set_tuple_format(struct tnt_request * req,const char * fmt,...)197 int tnt_request_set_tuple_format(struct tnt_request *req, const char *fmt, ...)
198 {
199 if (req->tuple_object)
200 tnt_object_reset(req->tuple_object);
201 else
202 req->tuple_object = tnt_object(NULL);
203 if (!req->tuple_object)
204 return -1;
205 va_list args;
206 va_start(args, fmt);
207 ssize_t res = tnt_object_vformat(req->tuple_object, fmt, args);
208 va_end(args);
209 if (res == -1)
210 return -1;
211 return tnt_request_set_tuple(req, req->tuple_object);
212 }
213
214 int64_t
tnt_request_compile(struct tnt_stream * s,struct tnt_request * req)215 tnt_request_compile(struct tnt_stream *s, struct tnt_request *req)
216 {
217 enum tnt_request_t tp = req->hdr.type;
218 req->hdr.sync = s->reqid++;
219 /* header */
220 /* int (9) + 1 + sync + 1 + op */
221 struct iovec v[10]; int v_sz = 0;
222 char header[128];
223 char *pos = header + 9;
224 char *begin = pos;
225 v[v_sz].iov_base = begin;
226 v[v_sz++].iov_len = 0;
227 pos = mp_encode_map(pos, 2); /* 1 */
228 pos = mp_encode_uint(pos, TNT_CODE); /* 1 */
229 pos = mp_encode_uint(pos, req->hdr.type); /* 1 */
230 pos = mp_encode_uint(pos, TNT_SYNC); /* 1 */
231 pos = mp_encode_uint(pos, req->hdr.sync); /* 9 */
232 char *map = pos++; /* 1 */
233 size_t nd = 0;
234 if (tp < TNT_OP_CALL) {
235 pos = mp_encode_uint(pos, TNT_SPACE); /* 1 */
236 pos = mp_encode_uint(pos, req->space_id); /* 5 */
237 nd += 1;
238 }
239 if (req->index_id && (tp == TNT_OP_SELECT ||
240 tp == TNT_OP_UPDATE ||
241 tp == TNT_OP_DELETE)) {
242 pos = mp_encode_uint(pos, TNT_INDEX); /* 1 */
243 pos = mp_encode_uint(pos, req->index_id); /* 5 */
244 nd += 1;
245 }
246 if (tp == TNT_OP_SELECT) {
247 pos = mp_encode_uint(pos, TNT_LIMIT); /* 1 */
248 pos = mp_encode_uint(pos, req->limit); /* 5 */
249 nd += 1;
250 }
251 if (req->offset && tp == TNT_OP_SELECT) {
252 pos = mp_encode_uint(pos, TNT_OFFSET); /* 1 */
253 pos = mp_encode_uint(pos, req->offset); /* 5 */
254 nd += 1;
255 }
256 if (req->iterator && tp == TNT_OP_SELECT) {
257 pos = mp_encode_uint(pos, TNT_ITERATOR); /* 1 */
258 pos = mp_encode_uint(pos, req->iterator); /* 1 */
259 nd += 1;
260 }
261 if (req->key) {
262 switch (tp) {
263 case TNT_OP_EVAL:
264 pos = mp_encode_uint(pos, TNT_EXPRESSION); /* 1 */
265 pos = mp_encode_strl(pos, req->key_end - req->key); /* 5 */
266 break;
267 case TNT_OP_CALL:
268 pos = mp_encode_uint(pos, TNT_FUNCTION); /* 1 */
269 pos = mp_encode_strl(pos, req->key_end - req->key); /* 5 */
270 break;
271 case TNT_OP_SELECT:
272 case TNT_OP_UPDATE:
273 case TNT_OP_DELETE:
274 pos = mp_encode_uint(pos, TNT_KEY); /* 1 */
275 break;
276 case TNT_OP_UPSERT:
277 pos = mp_encode_uint(pos, TNT_OPS); /* 1 */
278 break;
279 default:
280 return -1;
281 }
282 v[v_sz].iov_base = begin;
283 v[v_sz++].iov_len = pos - begin;
284 begin = pos;
285 v[v_sz].iov_base = (void *)req->key;
286 v[v_sz++].iov_len = req->key_end - req->key;
287 nd += 1;
288 }
289 if (req->tuple) {
290 pos = mp_encode_uint(pos, TNT_TUPLE); /* 1 */
291 v[v_sz].iov_base = begin;
292 v[v_sz++].iov_len = pos - begin;
293 begin = pos;
294 v[v_sz].iov_base = (void *)req->tuple;
295 v[v_sz++].iov_len = req->tuple_end - req->tuple;
296 nd += 1;
297 }
298 if (req->index_base && (tp == TNT_OP_UPDATE || tp == TNT_OP_UPSERT)) {
299 pos = mp_encode_uint(pos, TNT_INDEX_BASE); /* 1 */
300 pos = mp_encode_uint(pos, req->index_base); /* 1 */
301 nd += 1;
302 }
303 assert(mp_sizeof_map(nd) == 1);
304 if (pos != begin) {
305 v[v_sz].iov_base = begin;
306 v[v_sz++].iov_len = pos - begin;
307 }
308 mp_encode_map(map, nd);
309 size_t plen = 0; nd = 0;
310 for (int i = 1; i < v_sz; ++i) plen += v[i].iov_len;
311 nd = mp_sizeof_luint32(plen);
312 v[0].iov_base -= nd;
313 v[0].iov_len += nd;
314 mp_encode_luint32(v[0].iov_base, plen);
315 ssize_t rv = s->writev(s, v, v_sz);
316 if (rv == -1)
317 return -1;
318 return req->hdr.sync;
319 }
320