1 /* vim: set ft=c */
2 /*
3
4 Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
5 Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>
6
7 This program is free software, you can redistribute it and/or
8 modify it under the terms of the Artistic License.
9
10 */
11 #include "EXTERN.h"
12 #include "perl.h"
13 #include "XSUB.h"
14 #include "tp.h"
15 #include "msgpuck.h"
16
17 extern void _mpack_item(SV *res, SV *o);
18 extern const char *_munpack_item(const char *p,
19 size_t len, SV **res, HV *ext, int utf);
20
21 #define PREALLOC_SCALAR_SIZE 0
22
hash_ssave(HV * h,const char * k,const char * v)23 inline static void hash_ssave(HV *h, const char *k, const char *v) {
24 hv_store( h, k, strlen(k), newSVpvn( v, strlen(v) ), 0 );
25 }
26
hash_scsave(HV * h,const char * k,SV * sv)27 inline static void hash_scsave(HV *h, const char *k, SV *sv) {
28 hv_store( h, k, strlen(k), sv, 0);
29 }
30
hash_isave(HV * h,const char * k,uint32_t v)31 inline static void hash_isave(HV *h, const char *k, uint32_t v) {
32 hv_store( h, k, strlen(k), newSViv( v ), 0 );
33 }
34
sv_resizer(struct tp * p,size_t req,size_t * size)35 static char * sv_resizer(struct tp *p, size_t req, size_t *size) {
36 SV *sv = p->obj;
37 STRLEN new_len = tp_size(p) + req;
38 char *new_str = SvGROW(sv, new_len);
39 if (!new_str)
40 croak("Cannot allocate memory");
41 // SvCUR_set(sv, new_len);
42 *size = new_len;
43 return new_str;
44 }
45
46
tp_av_tuple(struct tp * req,AV * tuple)47 inline static void tp_av_tuple(struct tp *req, AV *tuple) {
48 int i;
49 tp_tuple(req);
50 for (i = 0; i <= av_len(tuple); i++) {
51 SV *field = *av_fetch(tuple, i, 0);
52 char *fd;
53 STRLEN fl;
54 fd = SvPV(field, fl);
55 tp_field(req, fd, fl);
56 }
57 }
58
59
fetch_tuples(HV * ret,struct tp * rep)60 inline static int fetch_tuples( HV * ret, struct tp * rep ) {
61 AV * tuples = newAV();
62 hv_store( ret, "tuples", 6, newRV_noinc( ( SV * ) tuples ), 0 );
63
64 int code;
65 while ( (code = tp_next(rep)) ) {
66
67 if (code < 0)
68 return code;
69
70 AV * t = newAV();
71 av_push( tuples, newRV_noinc( ( SV * ) t ) );
72
73 while((code = tp_nextfield(rep))) {
74 if (code < 0)
75 return code;
76 SV * f = newSVpvn(
77 tp_getfield(rep), tp_getfieldsize(rep)
78 );
79 av_push( t, f );
80 }
81 }
82 return 0;
83 }
84
85
86 #define ALLOC_RET_SV(__name, __ptr, __len, __size) \
87 SV *__name = newSVpvn("", 0); \
88 RETVAL = __name; \
89 if (__size) SvGROW(__name, __size); \
90 STRLEN __len; \
91 char *__ptr = SvPV(__name, __len);
92
93 MODULE = DR::Tarantool PACKAGE = DR::Tarantool
94 PROTOTYPES: ENABLE
95
96
97
98 SV * _pkt_select( req_id, ns, idx, offset, limit, keys )
99 unsigned req_id
100 unsigned ns
101 unsigned idx
102 unsigned offset
103 unsigned limit
104 AV * keys
105
106
107 CODE:
108 ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
109 int k;
110
111 struct tp req;
112 tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
113 tp_select(&req, ns, idx, offset, limit);
114
115 for (k = 0; k <= av_len(keys); k++) {
116 SV *t = *av_fetch(keys, k, 0);
117 if (!SvROK(t) || (SvTYPE(SvRV(t)) != SVt_PVAV))
118 croak("keys must be ARRAYREF of ARRAYREF");
119 AV *tuple = (AV *)SvRV(t);
120 tp_av_tuple(&req, (AV *)SvRV(t));
121 }
122 tp_reqid(&req, req_id);
123 SvCUR_set(ret, tp_used(&req));
124 OUTPUT:
125 RETVAL
126
127
128 SV * _pkt_ping( req_id )
129 unsigned req_id
130
131 CODE:
132 ALLOC_RET_SV(ret, b, len, 0);
133
134 struct tp req;
135 tp_init(&req, b, len, sv_resizer, ret);
136 tp_ping(&req);
137 tp_reqid(&req, req_id);
138 SvCUR_set(ret, tp_used(&req));
139
140 OUTPUT:
141 RETVAL
142
143
144
145 SV * _pkt_insert( req_id, ns, flags, tuple )
146 unsigned req_id
147 unsigned ns
148 unsigned flags
149 AV * tuple
150
151 CODE:
152 ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
153
154 struct tp req;
155 tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
156 tp_insert(&req, ns, flags);
157 tp_av_tuple(&req, tuple);
158 tp_reqid(&req, req_id);
159
160 SvCUR_set(ret, tp_used(&req));
161
162 OUTPUT:
163 RETVAL
164
165 SV * _pkt_delete( req_id, ns, flags, tuple )
166 unsigned req_id
167 unsigned ns
168 unsigned flags
169 AV *tuple
170
171 CODE:
172 ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
173
174 struct tp req;
175 tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
176 tp_delete(&req, ns, flags);
177 tp_av_tuple(&req, tuple);
178 tp_reqid(&req, req_id);
179
180 SvCUR_set(ret, tp_used(&req));
181
182 OUTPUT:
183 RETVAL
184
185
186 SV * _pkt_call_lua( req_id, flags, proc, tuple )
187 unsigned req_id
188 unsigned flags
189 SV *proc
190 AV *tuple
191
192 CODE:
193 STRLEN name_len;
194 char *name = SvPV(proc, name_len);
195
196 ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
197
198 struct tp req;
199 tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
200 tp_call(&req, flags, name, name_len);
201 tp_av_tuple(&req, tuple);
202 tp_reqid(&req, req_id);
203
204 SvCUR_set(ret, tp_used(&req));
205
206 OUTPUT:
207 RETVAL
208
209
210 SV * _pkt_update( req_id, ns, flags, tuple, operations )
211 unsigned req_id
212 unsigned ns
213 unsigned flags
214 AV *tuple
215 AV *operations
216
217 CODE:
218 ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
219 struct tp req;
220 int i;
221 tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
222 tp_update(&req, ns, flags);
223 tp_reqid(&req, req_id);
224 tp_av_tuple(&req, tuple);
225 tp_updatebegin(&req);
226
227
228
229 for (i = 0; i <= av_len( operations ); i++) {
230 uint8_t opcode;
231
232 SV *op = *av_fetch( operations, i, 0 );
233 if (!SvROK(op) || SvTYPE( SvRV(op) ) != SVt_PVAV)
234 croak("Wrong update operation format");
235 AV *aop = (AV *)SvRV(op);
236
237 int asize = av_len( aop ) + 1;
238 if ( asize < 2 )
239 croak("Too short operation argument list");
240
241 unsigned fno = SvIV( *av_fetch( aop, 0, 0 ) );
242 STRLEN size;
243 char *opname = SvPV( *av_fetch( aop, 1, 0 ), size );
244
245
246 /* delete */
247 if ( strcmp(opname, "delete") == 0 ) {
248 tp_op(&req, fno, TP_OPDELETE, "", 0);
249 continue;
250 }
251
252
253 if (asize < 3)
254 croak("Too short operation argument list");
255
256 /* assign */
257 if ( strcmp(opname, "set") == 0 ) {
258
259 char *data =
260 SvPV( *av_fetch( aop, 2, 0 ), size );
261 tp_op(&req, fno, TP_OPSET, data, size);
262 continue;
263 }
264
265 /* insert */
266 if ( strcmp(opname, "insert") == 0 ) {
267 char *data =
268 SvPV( *av_fetch( aop, 2, 0 ), size );
269 tp_op(&req, fno, TP_OPINSERT, data, size);
270 continue;
271 }
272
273
274 /* arithmetic operations */
275 if ( strcmp(opname, "add") == 0 ) {
276 opcode = TP_OPADD;
277 goto ARITH;
278 }
279 if ( strcmp(opname, "and") == 0 ) {
280 opcode = TP_OPAND;
281 goto ARITH;
282 }
283 if ( strcmp(opname, "or") == 0 ) {
284 opcode = TP_OPOR;
285 goto ARITH;
286 }
287 if ( strcmp(opname, "xor") == 0 ) {
288 opcode = TP_OPXOR;
289 goto ARITH;
290 }
291
292
293 /* substr */
294 if ( strcmp(opname, "substr") == 0 ) {
295 if (asize < 4)
296 croak("Too short argument "
297 "list for substr");
298 unsigned offset =
299 SvIV( *av_fetch( aop, 2, 0 ) );
300 unsigned length =
301 SvIV( *av_fetch( aop, 3, 0 ) );
302 char * data;
303 if ( asize > 4 && SvOK( *av_fetch( aop, 4, 0 ) ) ) {
304 data =
305 SvPV( *av_fetch( aop, 4, 0 ), size );
306 } else {
307 data = "";
308 size = 0;
309 }
310
311 tp_opsplice(&req, fno, offset, length,
312 data, size);
313
314 continue;
315 }
316
317 /* unknown command */
318 croak("unknown update operation: `%s'", opname);
319
320 ARITH: {
321 char *data =
322 SvPV( *av_fetch( aop, 2, 0 ), size );
323 if (sizeof(unsigned long long) < size)
324 size = sizeof(unsigned long long);
325 tp_op(&req, fno, opcode, data, size);
326 continue;
327 }
328
329 }
330
331 SvCUR_set(ret, tp_used(&req));
332 OUTPUT:
333 RETVAL
334
335
336
337 HV * _pkt_parse_response( response )
338 SV *response
339
340 INIT:
341 RETVAL = newHV();
342 sv_2mortal((SV *)RETVAL);
343
344 CODE:
345 /* asm("break"); */
346 if ( !SvOK(response) )
347 croak( "response is undefined" );
348 STRLEN size;
349 char *data = SvPV( response, size );
350
351 struct tp rep;
352 tp_init(&rep, data, size, NULL, 0);
353 // tp_use(&rep, size);
354
355 ssize_t code = tp_reply(&rep);
356
357 if (code == -1) {
358 hash_ssave(RETVAL, "status", "buffer");
359 hash_ssave(RETVAL, "errstr", "Input data too short");
360 } else if (code >= 0) {
361 uint32_t type = tp_replyop(&rep);
362 hash_isave(RETVAL, "code", tp_replycode(&rep) );
363 hash_isave(RETVAL, "req_id", tp_getreqid(&rep) );
364 hash_isave(RETVAL, "type", type );
365 hash_isave(RETVAL, "count", tp_replycount(&rep) );
366 if (code == 0) {
367 if (type != TP_PING)
368 code = fetch_tuples(RETVAL, &rep);
369 if (code != 0) {
370 hash_ssave(RETVAL, "status", "buffer");
371 hash_ssave(RETVAL, "errstr",
372 "Broken response");
373 } else {
374 hash_ssave(RETVAL, "status", "ok");
375 }
376 } else {
377 hash_ssave(RETVAL, "status", "error");
378 size_t el = tp_replyerrorlen(&rep);
379 SV *err;
380 if (el) {
381 char *s = tp_replyerror(&rep);
382 if (s[el - 1] == 0)
383 el--;
384 err = newSVpvn(s, el);
385 } else {
386 err = newSVpvn("", 0);
387 }
388
389 hash_scsave(RETVAL, "errstr", err);
390 }
391 }
392 OUTPUT:
393 RETVAL
394
395
396
397 unsigned TNT_PING()
398 CODE:
399 RETVAL = TP_PING;
400 OUTPUT:
401 RETVAL
402
403
404 unsigned TNT_CALL()
405 CODE:
406 RETVAL = TP_CALL;
407 OUTPUT:
408 RETVAL
409
410 unsigned TNT_INSERT()
411 CODE:
412 RETVAL = TP_INSERT;
413 OUTPUT:
414 RETVAL
415
416 unsigned TNT_UPDATE()
417 CODE:
418 RETVAL = TP_UPDATE;
419 OUTPUT:
420 RETVAL
421
422 unsigned TNT_DELETE()
423 CODE:
424 RETVAL = TP_DELETE;
425 OUTPUT:
426 RETVAL
427
428 unsigned TNT_SELECT()
429 CODE:
430 RETVAL = TP_SELECT;
431 OUTPUT:
432 RETVAL
433
434
435 unsigned TNT_FLAG_RETURN()
436 CODE:
437 RETVAL = TP_BOX_RETURN_TUPLE;
438 OUTPUT:
439 RETVAL
440
441 unsigned TNT_FLAG_ADD()
442 CODE:
443 RETVAL = TP_BOX_ADD;
444 OUTPUT:
445 RETVAL
446
447 unsigned TNT_FLAG_REPLACE()
448 CODE:
449 RETVAL = TP_BOX_REPLACE;
450 OUTPUT:
451 RETVAL
452
453
454
455 SV * _msgpack(o)
456 SV *o
457 CODE:
458 SV *res = newSVpvn("", 0);
459 RETVAL = res;
460
461 _mpack_item(res, o);
462 OUTPUT:
463 RETVAL
464
465 SV * _msgunpack(str, utf)
466 SV *str;
467 SV *utf;
468 PROTOTYPE: $$
469 CODE:
470 SV *sv = 0;
471 size_t len;
472 const char *s = SvPV(str, len);
473 if (items > 1)
474 _munpack_item(s, len, &sv, (HV *)ST(1), SvIV(utf));
475 else
476 _munpack_item(s, len, &sv, NULL, SvIV(utf));
477 RETVAL = sv;
478
479 OUTPUT:
480 RETVAL
481
482 size_t _msgcheck(str)
483 SV *str
484 PROTOTYPE: $
485 CODE:
486 int res;
487 size_t len;
488 if (SvOK(str)) {
489 const char *p = SvPV(str, len);
490 if (len > 0) {
491 const char *pe = p + len;
492 const char *begin = p;
493 if (mp_check(&p, pe) == 0) {
494 RETVAL = p - begin;
495 } else {
496 RETVAL = 0;
497 }
498 } else {
499 RETVAL = 0;
500 }
501 } else {
502 RETVAL = 0;
503 }
504 OUTPUT:
505 RETVAL
506
507
508
509