1 /* vim: set ft=c */
2 /*
3 
4  Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
5  Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>
6 
7  This program is free software, you can redistribute it and/or
8  modify it under the terms of the Artistic License.
9 
10 */
11 #include "EXTERN.h"
12 #include "perl.h"
13 #include "XSUB.h"
14 #include "tp.h"
15 #include "msgpuck.h"
16 
17 extern void _mpack_item(SV *res, SV *o);
18 extern const char *_munpack_item(const char *p,
19     size_t len, SV **res, HV *ext, int utf);
20 
21 #define PREALLOC_SCALAR_SIZE		0
22 
/*
 * Store a copy of the NUL-terminated string `v` in hash `h` under the
 * NUL-terminated key `k`.
 */
static inline void hash_ssave(HV *h, const char *k, const char *v) {
	SV *value = newSVpvn(v, strlen(v));

	hv_store(h, k, strlen(k), value, 0);
}
26 
/*
 * Store the scalar `sv` itself (not a copy) in hash `h` under the
 * NUL-terminated key `k`.
 */
static inline void hash_scsave(HV *h, const char *k, SV *sv) {
	const size_t key_len = strlen(k);

	hv_store(h, k, key_len, sv, 0);
}
30 
/*
 * Store the unsigned 32-bit value `v` in hash `h` under the
 * NUL-terminated key `k`.
 *
 * The scalar is created with newSVuv() rather than newSViv(): the value
 * is unsigned, and on a perl whose IV is 32 bits wide newSViv() would
 * turn anything >= 2^31 into a negative number.
 */
inline static void hash_isave(HV *h, const char *k, uint32_t v) {
	hv_store( h, k, strlen(k), newSVuv( v ), 0 );
}
34 
/*
 * Buffer-growth callback passed to tp_init().
 *
 * The request's backing storage is the string buffer of the SV kept in
 * p->obj.  Grow that buffer so it can hold tp_size(p) + req bytes, report
 * the requested capacity through *size and return the (possibly moved)
 * buffer pointer.  The SV's current length is not touched here; the
 * XSUBs set it with SvCUR_set() once the packet is complete.
 */
static char * sv_resizer(struct tp *p, size_t req, size_t *size) {
	SV *backing = p->obj;
	STRLEN wanted = tp_size(p) + req;
	char *buf = SvGROW(backing, wanted);

	if (buf == NULL)
		croak("Cannot allocate memory");

	*size = wanted;
	return buf;
}
45 
46 
/*
 * Append the Perl array `tuple` to the request `req` as one tuple:
 * tp_tuple() opens the tuple, then every element is stringified with
 * SvPV() and added with tp_field().
 *
 * av_fetch() returns NULL for slots that were never assigned (holes in a
 * sparse array).  Such a slot is encoded as an empty field instead of
 * dereferencing the NULL pointer.
 */
inline static void tp_av_tuple(struct tp *req, AV *tuple) {
	int i;
	tp_tuple(req);
	for (i = 0; i <= av_len(tuple); i++) {
		SV **slot = av_fetch(tuple, i, 0);
		char *fd = "";
		STRLEN fl = 0;

		if (slot != NULL)
			fd = SvPV(*slot, fl);
		tp_field(req, fd, fl);
	}
}
58 
59 
/*
 * Read every tuple of the reply `rep` into a fresh array of arrays and
 * store a reference to it in `ret` under the key "tuples".
 *
 * Each tuple becomes an inner array whose elements are copies of the raw
 * field bytes.  Returns 0 once tp_next() reports the end of the tuples,
 * or the negative code returned by tp_next()/tp_nextfield() as soon as
 * one of them fails; tuples collected up to that point stay in `ret`.
 */
inline static int fetch_tuples( HV * ret, struct tp * rep ) {
	AV *rows = newAV();
	hv_store(ret, "tuples", 6, newRV_noinc((SV *)rows), 0);

	for (;;) {
		int rc = tp_next(rep);
		if (rc < 0)
			return rc;
		if (rc == 0)
			break;

		AV *row = newAV();
		av_push(rows, newRV_noinc((SV *)row));

		for (;;) {
			rc = tp_nextfield(rep);
			if (rc < 0)
				return rc;
			if (rc == 0)
				break;
			av_push(row,
				newSVpvn(tp_getfield(rep), tp_getfieldsize(rep)));
		}
	}

	return 0;
}
84 
85 
/*
 * Declare and initialise the scalar returned by a packet-building XSUB.
 *
 *   sv_name   - name of the SV* variable to declare; it starts out as an
 *               empty string and is also assigned to RETVAL
 *   buf_name  - name of the char* variable that receives the string buffer
 *   len_name  - name of the STRLEN variable that receives the current length
 *   prealloc  - if non-zero, the buffer is grown to this many bytes first
 *
 * The macro expands to declarations, so it must be used where a
 * declaration is allowed and only once per scope.
 */
#define ALLOC_RET_SV(sv_name, buf_name, len_name, prealloc)	\
	SV *sv_name = newSVpvn("", 0);				\
	RETVAL = sv_name;					\
	if (prealloc) SvGROW(sv_name, prealloc);		\
	STRLEN len_name;					\
	char *buf_name = SvPV(sv_name, len_name);
92 
93 MODULE = DR::Tarantool		PACKAGE = DR::Tarantool
94 PROTOTYPES: ENABLE
95 
96 
97 
98 SV * _pkt_select( req_id, ns, idx, offset, limit, keys )
99 	unsigned req_id
100 	unsigned ns
101 	unsigned idx
102 	unsigned offset
103 	unsigned limit
104 	AV * keys
105 
106 
107 	CODE:
108 		ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
109 		int k;
110 
111 		struct tp req;
112 		tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
113 		tp_select(&req, ns, idx, offset, limit);
114 
115 		for (k = 0; k <= av_len(keys); k++) {
116 			SV *t = *av_fetch(keys, k, 0);
117 			if (!SvROK(t) || (SvTYPE(SvRV(t)) != SVt_PVAV))
118 				croak("keys must be ARRAYREF of ARRAYREF");
119 			AV *tuple = (AV *)SvRV(t);
120 			tp_av_tuple(&req, (AV *)SvRV(t));
121 		}
122 		tp_reqid(&req, req_id);
123 		SvCUR_set(ret, tp_used(&req));
124 	OUTPUT:
125 		RETVAL
126 
127 
128 SV * _pkt_ping( req_id )
129 	unsigned req_id
130 
131 	CODE:
132 		ALLOC_RET_SV(ret, b, len, 0);
133 
134 		struct tp req;
135 		tp_init(&req, b, len, sv_resizer, ret);
136 		tp_ping(&req);
137 		tp_reqid(&req, req_id);
138 		SvCUR_set(ret, tp_used(&req));
139 
140 	OUTPUT:
141 		RETVAL
142 
143 
144 
145 SV * _pkt_insert( req_id, ns, flags, tuple )
146 	unsigned req_id
147 	unsigned ns
148 	unsigned flags
149 	AV * tuple
150 
151 	CODE:
152 		ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
153 
154 		struct tp req;
155 		tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
156 		tp_insert(&req, ns, flags);
157 		tp_av_tuple(&req, tuple);
158 		tp_reqid(&req, req_id);
159 
160 		SvCUR_set(ret, tp_used(&req));
161 
162 	OUTPUT:
163 		RETVAL
164 
165 SV * _pkt_delete( req_id, ns, flags, tuple )
166 	unsigned req_id
167 	unsigned ns
168 	unsigned flags
169 	AV *tuple
170 
171 	CODE:
172 		ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
173 
174 		struct tp req;
175 		tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
176 		tp_delete(&req, ns, flags);
177 		tp_av_tuple(&req, tuple);
178 		tp_reqid(&req, req_id);
179 
180 		SvCUR_set(ret, tp_used(&req));
181 
182 	OUTPUT:
183 		RETVAL
184 
185 
186 SV * _pkt_call_lua( req_id, flags, proc, tuple )
187 	unsigned req_id
188 	unsigned flags
189 	SV *proc
190 	AV *tuple
191 
192 	CODE:
193 		STRLEN name_len;
194 		char *name = SvPV(proc, name_len);
195 
196 		ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
197 
198 		struct tp req;
199 		tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
200 		tp_call(&req, flags, name, name_len);
201 		tp_av_tuple(&req, tuple);
202 		tp_reqid(&req, req_id);
203 
204 		SvCUR_set(ret, tp_used(&req));
205 
206 	OUTPUT:
207 		RETVAL
208 
209 
210 SV * _pkt_update( req_id, ns, flags, tuple, operations )
211 	unsigned req_id
212 	unsigned ns
213 	unsigned flags
214 	AV *tuple
215 	AV *operations
216 
217 	CODE:
218 		ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
219 		struct tp req;
220 		int i;
221 		tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
222 		tp_update(&req, ns, flags);
223 		tp_reqid(&req, req_id);
224 		tp_av_tuple(&req, tuple);
225 		tp_updatebegin(&req);
226 
227 
228 
229 		for (i = 0; i <= av_len( operations ); i++) {
230 			uint8_t opcode;
231 
232 			SV *op = *av_fetch( operations, i, 0 );
233 			if (!SvROK(op) || SvTYPE( SvRV(op) ) != SVt_PVAV)
234 				croak("Wrong update operation format");
235 			AV *aop = (AV *)SvRV(op);
236 
237 			int asize = av_len( aop ) + 1;
238 			if ( asize < 2 )
239 				croak("Too short operation argument list");
240 
241 			unsigned fno = SvIV( *av_fetch( aop, 0, 0 ) );
242 			STRLEN size;
243 			char *opname = SvPV( *av_fetch( aop, 1, 0 ), size );
244 
245 
246 			/* delete */
247 			if ( strcmp(opname, "delete") == 0 ) {
248 				tp_op(&req, fno, TP_OPDELETE, "", 0);
249 				continue;
250 			}
251 
252 
253 			if (asize < 3)
254 				croak("Too short operation argument list");
255 
256 			/* assign */
257 			if ( strcmp(opname, "set") == 0 ) {
258 
259 				char *data =
260 					SvPV( *av_fetch( aop, 2, 0 ), size );
261 				tp_op(&req, fno, TP_OPSET, data, size);
262 				continue;
263 			}
264 
265 			/* insert */
266 			if ( strcmp(opname, "insert") == 0 ) {
267 				char *data =
268 					 SvPV( *av_fetch( aop, 2, 0 ), size );
269 				tp_op(&req, fno, TP_OPINSERT, data, size);
270 				continue;
271 			}
272 
273 
274 			/* arithmetic operations */
275 			if ( strcmp(opname, "add") == 0 ) {
276 				opcode = TP_OPADD;
277 				goto ARITH;
278 			}
279 			if ( strcmp(opname, "and") == 0 ) {
280 				opcode = TP_OPAND;
281 				goto ARITH;
282 			}
283 			if ( strcmp(opname, "or") == 0 ) {
284 				opcode = TP_OPOR;
285 				goto ARITH;
286 			}
287 			if ( strcmp(opname, "xor") == 0 ) {
288 				opcode = TP_OPXOR;
289 				goto ARITH;
290 			}
291 
292 
293 			/* substr */
294 			if ( strcmp(opname, "substr") == 0 ) {
295 				if (asize < 4)
296 					croak("Too short argument "
297 						"list for substr");
298 				unsigned offset =
299 					SvIV( *av_fetch( aop, 2, 0 ) );
300 				unsigned length =
301 					SvIV( *av_fetch( aop, 3, 0 ) );
302 				char * data;
303 				if ( asize > 4 && SvOK( *av_fetch( aop, 4, 0 ) ) ) {
304 				    data =
305 					SvPV( *av_fetch( aop, 4, 0 ), size );
306 				} else {
307 				    data = "";
308 				    size = 0;
309 				}
310 
311 				tp_opsplice(&req, fno, offset, length,
312 					 data, size);
313 
314 				continue;
315 			}
316 
317 			/* unknown command */
318 			croak("unknown update operation: `%s'", opname);
319 
320 			ARITH: {
321 				char *data =
322 					 SvPV( *av_fetch( aop, 2, 0 ), size );
323 				if (sizeof(unsigned long long) < size)
324 				    size = sizeof(unsigned long long);
325 				tp_op(&req, fno, opcode, data, size);
326 				continue;
327 			}
328 
329 		}
330 
331 		SvCUR_set(ret, tp_used(&req));
332 	OUTPUT:
333 		RETVAL
334 
335 
336 
337 HV * _pkt_parse_response( response )
338 	SV *response
339 
340 	INIT:
341 		RETVAL = newHV();
342 		sv_2mortal((SV *)RETVAL);
343 
344 	CODE:
345 		/* asm("break"); */
346 		if ( !SvOK(response) )
347 			croak( "response is undefined" );
348 		STRLEN size;
349 		char *data = SvPV( response, size );
350 
351 		struct tp rep;
352 		tp_init(&rep, data, size, NULL, 0);
353 		// tp_use(&rep, size);
354 
355 		ssize_t code = tp_reply(&rep);
356 
357 		if (code == -1) {
358 			hash_ssave(RETVAL, "status", "buffer");
359 			hash_ssave(RETVAL, "errstr", "Input data too short");
360 		} else if (code >= 0) {
361 			uint32_t type = tp_replyop(&rep);
362 			hash_isave(RETVAL, "code", tp_replycode(&rep) );
363 			hash_isave(RETVAL, "req_id", tp_getreqid(&rep) );
364 			hash_isave(RETVAL, "type", type );
365 			hash_isave(RETVAL, "count", tp_replycount(&rep) );
366 			if (code == 0) {
367 			    if (type != TP_PING)
368 				code = fetch_tuples(RETVAL, &rep);
369 				if (code != 0) {
370 					hash_ssave(RETVAL, "status", "buffer");
371 					hash_ssave(RETVAL, "errstr",
372 						"Broken response");
373 				} else {
374 					hash_ssave(RETVAL, "status", "ok");
375 				}
376 			} else {
377 				hash_ssave(RETVAL, "status", "error");
378 				size_t el = tp_replyerrorlen(&rep);
379 				SV *err;
380 				if (el) {
381 					char *s = tp_replyerror(&rep);
382 					if (s[el - 1] == 0)
383 						el--;
384 					err = newSVpvn(s, el);
385 				} else {
386 					err = newSVpvn("", 0);
387 				}
388 
389 				hash_scsave(RETVAL, "errstr", err);
390 			}
391 		}
392 	OUTPUT:
393 		RETVAL
394 
395 
396 
unsigned TNT_PING()
	CODE:
		/* Expose the TP_PING constant to Perl. */
		RETVAL = TP_PING;
	OUTPUT:
		RETVAL
402 
403 
unsigned TNT_CALL()
	CODE:
		/* Expose the TP_CALL constant to Perl. */
		RETVAL = TP_CALL;
	OUTPUT:
		RETVAL
409 
unsigned TNT_INSERT()
	CODE:
		/* Expose the TP_INSERT constant to Perl. */
		RETVAL = TP_INSERT;
	OUTPUT:
		RETVAL
415 
unsigned TNT_UPDATE()
	CODE:
		/* Expose the TP_UPDATE constant to Perl. */
		RETVAL = TP_UPDATE;
	OUTPUT:
		RETVAL
421 
unsigned TNT_DELETE()
	CODE:
		/* Expose the TP_DELETE constant to Perl. */
		RETVAL = TP_DELETE;
	OUTPUT:
		RETVAL
427 
unsigned TNT_SELECT()
	CODE:
		/* Expose the TP_SELECT constant to Perl. */
		RETVAL = TP_SELECT;
	OUTPUT:
		RETVAL
433 
434 
unsigned TNT_FLAG_RETURN()
	CODE:
		/* Expose the TP_BOX_RETURN_TUPLE flag constant to Perl. */
		RETVAL = TP_BOX_RETURN_TUPLE;
	OUTPUT:
		RETVAL
440 
unsigned TNT_FLAG_ADD()
	CODE:
		/* Expose the TP_BOX_ADD flag constant to Perl. */
		RETVAL = TP_BOX_ADD;
	OUTPUT:
		RETVAL
446 
unsigned TNT_FLAG_REPLACE()
	CODE:
		/* Expose the TP_BOX_REPLACE flag constant to Perl. */
		RETVAL = TP_BOX_REPLACE;
	OUTPUT:
		RETVAL
452 
453 
454 
455 SV * _msgpack(o)
456 	SV *o
457 	CODE:
458 		SV *res = newSVpvn("", 0);
459 		RETVAL = res;
460 
461 		_mpack_item(res, o);
462 	OUTPUT:
463 		RETVAL
464 
465 SV * _msgunpack(str, utf)
466 	SV *str;
467 	SV *utf;
468 	PROTOTYPE: $$
469 	CODE:
470 		SV *sv = 0;
471 		size_t len;
472 		const char *s = SvPV(str, len);
473 		if (items > 1)
474 			_munpack_item(s, len, &sv, (HV *)ST(1), SvIV(utf));
475 		else
476 			_munpack_item(s, len, &sv, NULL, SvIV(utf));
477 		RETVAL = sv;
478 
479 	OUTPUT:
480 		RETVAL
481 
482 size_t _msgcheck(str)
483         SV *str
484         PROTOTYPE: $
485         CODE:
486             int res;
487             size_t len;
488             if (SvOK(str)) {
489                 const char *p = SvPV(str, len);
490                 if (len > 0) {
491                     const char *pe = p + len;
492                     const char *begin = p;
493                     if (mp_check(&p, pe) == 0) {
494                         RETVAL = p - begin;
495                     } else {
496                         RETVAL = 0;
497                     }
498                 } else {
499                     RETVAL = 0;
500                 }
501             } else {
502                 RETVAL = 0;
503             }
504         OUTPUT:
505             RETVAL
506 
507 
508 
509