1 /*===========================================================================
2 *
3 * PUBLIC DOMAIN NOTICE
4 * National Center for Biotechnology Information
5 *
6 * This software/database is a "United States Government Work" under the
7 * terms of the United States Copyright Act. It was written as part of
8 * the author's official duties as a United States Government employee and
9 * thus cannot be copyrighted. This software/database is freely available
10 * to the public for use. The National Library of Medicine and the U.S.
11 * Government have not placed any restriction on its use or reproduction.
12 *
13 * Although all reasonable efforts have been taken to ensure the accuracy
14 * and reliability of the software and data, the NLM and the U.S.
15 * Government do not and cannot warrant the performance or results that
16 * may be obtained by using this software or data. The NLM and the U.S.
17 * Government disclaim all warranties, express or implied, including
18 * warranties of performance, merchantability or fitness for any particular
19 * purpose.
20 *
21 * Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 #define BYTECODE 0
27
28 #define USE_EUGENE 1
29
30
31 #define TRACK_REFERENCES 0
32
33 #include <vdb/extern.h>
34
35 #define KONST const
36 #include "prod-priv.h"
37 #include "prod-expr.h"
38 #include "schema-priv.h"
39 #include "schema-parse.h"
40 #include "schema-expr.h"
41 #include "table-priv.h"
42 #include "cursor-priv.h"
43 #include "linker-priv.h"
44 #include "column-priv.h"
45 #include "phys-priv.h"
46 #include "blob-priv.h"
47 #include "blob.h"
48 #include "page-map.h"
49 #include "blob-headers.h"
50 #undef KONST
51
52 #include <vdb/schema.h>
53 #include <vdb/cursor.h>
54 #include <vdb/xform.h>
55 #include <klib/symbol.h>
56 #include <klib/log.h>
57 #include <klib/debug.h>
58 #include <klib/rc.h>
59 #include <sysalloc.h>
60
61 #include <ctype.h>
62 #include <os-native.h>
63 #include <stdlib.h>
64 #include <string.h>
65 #include <assert.h>
66 #include <bitstr.h>
67 #include <stdio.h>
68 #include <limits.h>
69
70 #include "bytecode.h"
71
72 #if !defined(WINDOWS) && !defined(_WIN32) && !defined(NCBI_WITHOUT_MT)
73 #define LAUNCH_PAGEMAP_THREAD 1
74 #endif
75
76
77 /*--------------------------------------------------------------------------
78 * VBlob
79 */
80
81
82 static
vblob_release(void * item,void * ignore)83 void CC vblob_release ( void *item, void *ignore )
84 {
85 TRACK_BLOB ( VBlobRelease, ( VBlob* ) item );
86 VBlobRelease ( ( VBlob* ) item );
87 }
88
89 /*--------------------------------------------------------------------------
90 * VProduction
91 */
92
93
94 /* Make
95 * allocation and parent initialization
96 * called from the "Make" functions below
97 *
98 * "prod" [ OUT ] - returned production
99 *
100 * "size" [ IN ] - sizeof sub-type
101 *
102 * "owned" [ IN ] - vector to contain productions
103 *
104 * "var" [ IN ] - variant code, e.g. prodSimple, prodFunc
105 *
106 * "sub" [ IN ] - sub-variant code, e.g. prodSimplePage2Blob
107 *
108 * "name" [ IN, NULL OKAY ] - optional object name, derived
109 * from schema if possible
110 *
111 * "fd" [ IN ] and "desc" [ IN ] - production type description
112 *
113 * "cid" [ IN ] - contextual ( two-level ) id
114 *
115 * "chain" [ IN ] - which chain(s) are supported
116 * chainEncoding - when going from input to physical
117 * chainDecoding - when pulling from physical to output
118 * chainUncommitted - when resolving trigger and validation expressions
119 */
VProductionMake(VProduction ** prodp,Vector * owned,size_t size,int var,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,const VCtxId * cid,uint8_t chain)120 rc_t VProductionMake ( VProduction **prodp, Vector *owned, size_t size,
121 int var, int sub, const char *name, const VFormatdecl *fd,
122 const VTypedesc *desc, const VCtxId *cid, uint8_t chain )
123 {
124 rc_t rc;
125 VProduction *prod;
126
127 #if PROD_NAME
128 size_t psize = size;
129 #endif
130
131 assert ( size >= sizeof * prod );
132
133 #if PROD_NAME
134 if ( name != NULL )
135 size += strlen ( name );
136 size += 1;
137 #endif
138
139 prod = calloc ( 1, size );
140 if ( prod == NULL )
141 rc = RC ( rcVDB, rcProduction, rcResolving, rcMemory, rcExhausted );
142 else
143 {
144 rc = VectorAppend ( owned, & prod -> oid, prod );
145 if ( rc != 0 )
146 {
147 free ( prod );
148 prod = NULL;
149 }
150 else
151 {
152 #if PROD_NAME
153 prod -> name = ( ( const char* ) prod ) + psize;
154 strcpy ( ( char* ) prod -> name, name ? name : "" );
155 #endif
156
157 if ( fd != NULL )
158 prod -> fd = * fd;
159 if ( desc != NULL )
160 prod -> desc = * desc;
161 if ( cid != NULL )
162 prod -> cid = * cid;
163
164 prod -> var = ( uint8_t ) var;
165 prod -> sub = ( uint8_t ) sub;
166 prod -> chain = chain;
167 }
168 }
169
170 * prodp = prod;
171 return rc;
172 }
173
174 #if PROD_CACHE
175 static
VProductionFlushCacheDeep(VProduction * self,const char * context)176 void VProductionFlushCacheDeep ( VProduction *self, const char *context )
177 {
178 int i;
179 for ( i = 0; i < self -> cache_cnt; ++ i )
180 {
181 #if TRACKING_BLOBS
182 if ( self -> cache [ i ] != NULL )
183 {
184 fprintf( stderr, "%p->%p(%d) dropped from cache on %s *** %s\n"
185 , self
186 , self -> cache [ i ]
187 , atomic32_read ( & self -> cache -> refcount )
188 , context
189 , self -> name
190 );
191 }
192 #endif
193 vblob_release ( self -> cache [ i ], NULL );
194 self -> cache [ i ] = NULL;
195 }
196 }
197 #endif
198
199
200 /*--------------------------------------------------------------------------
201 * VSimpleProd
202 * single input param
203 */
204
205 /* Make
206 */
VSimpleProdMake(VProduction ** prodp,Vector * owned,struct VCursor const * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,const VCtxId * cid,VProduction * in,uint8_t chain)207 rc_t VSimpleProdMake ( VProduction **prodp, Vector *owned, struct VCursor const *curs,
208 int sub, const char *name, const VFormatdecl *fd, const VTypedesc *desc,
209 const VCtxId *cid, VProduction *in, uint8_t chain )
210 {
211 VSimpleProd *prod;
212 rc_t rc = VProductionMake ( prodp, owned, sizeof * prod,
213 prodSimple, sub, name, fd, desc, cid, chain );
214 if ( rc == 0 )
215 {
216 prod = ( VSimpleProd* ) * prodp;
217 prod -> in = in;
218 prod -> curs = curs;
219 }
220 return rc;
221 }
222
223 static
VSimpleProdPage2Blob(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)224 rc_t VSimpleProdPage2Blob ( VSimpleProd *self, VBlob **rslt, int64_t id ,uint32_t cnt)
225 {
226 return VProductionReadBlob(self->in, rslt, & id, cnt, NULL);
227 }
228
229 static
VSimpleProdSerial2Blob(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)230 rc_t VSimpleProdSerial2Blob ( VSimpleProd *self, VBlob **rslt, int64_t id, uint32_t cnt )
231 {
232 /* read serialized blob */
233 VBlob *sblob;
234 rc_t rc = VProductionReadBlob ( self -> in, &sblob, & id, cnt, NULL );
235 if ( rc == 0 )
236 {
237 /* recast data to 8 bit */
238 KDataBuffer buffer;
239 rc = KDataBufferCast ( & sblob -> data, & buffer, 8, false );
240 if (rc == 0)
241 {
242 /* create a new, fluffy blob having rowmap and headers */
243 VBlob *y;
244 #if LAUNCH_PAGEMAP_THREAD
245 VCursorLaunchPagemapThread ( (VCursor*) self -> curs );
246 #endif
247
248 rc = VBlobCreateFromData ( & y, sblob -> start_id, sblob -> stop_id,
249 & buffer, VTypedescSizeof ( & self -> dad . desc ), VCursorPageMapProcessRequest( self -> curs ) );
250 KDataBufferWhack ( & buffer );
251
252 /* return on success */
253 if ( rc == 0 )
254 * rslt = y;
255 }
256
257 vblob_release(sblob, NULL);
258 }
259
260 return rc;
261 }
262
263
264 static
VSimpleProdBlob2Serial(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)265 rc_t VSimpleProdBlob2Serial( VSimpleProd *self, VBlob **rslt, int64_t id, uint32_t cnt )
266 {
267 rc_t rc;
268 VBlob *sblob;
269
270 rc = VProductionReadBlob(self->in, &sblob, & id, cnt, NULL);
271 if (rc == 0) {
272 VBlob *y;
273
274 rc = VBlobNew(&y, sblob->start_id, sblob->stop_id, "blob2serial");
275 TRACK_BLOB (VBlobNew, y);
276 if (rc == 0) {
277 rc = KDataBufferMakeBytes(&y->data, 0);
278 if (rc == 0) {
279 /* save a reference to the page map so that fixed row-length can be determined */
280 y->pm = sblob->pm;
281 PageMapAddRef(y->pm);
282
283 rc = VBlobSerialize(sblob, &y->data);
284 if (rc == 0)
285 * rslt = y;
286 }
287 if (rc)
288 vblob_release(y, NULL);
289 }
290
291 vblob_release(sblob, NULL);
292 }
293 return rc;
294 }
295
296 /* Read
297 * return a blob containing row id
298 */
VSimpleProdRead(VSimpleProd * self,VBlob ** vblob,int64_t * id,uint32_t cnt,VBlobMRUCacheCursorContext * cctx)299 rc_t VSimpleProdRead ( VSimpleProd *self, VBlob **vblob, int64_t * id, uint32_t cnt, VBlobMRUCacheCursorContext *cctx)
300 {
301 rc_t rc;
302
303 switch ( self -> dad . sub )
304 {
305 case prodSimpleCast:
306 rc = VProductionReadBlob ( self -> in, vblob, id, cnt, cctx ); /* *id may change here */
307 break;
308 case prodSimplePage2Blob:
309 return VSimpleProdPage2Blob(self, vblob, * id, cnt);
310 case prodSimpleSerial2Blob:
311 return VSimpleProdSerial2Blob(self, vblob, * id, cnt);
312 case prodSimpleBlob2Serial:
313 return VSimpleProdBlob2Serial(self, vblob, * id, cnt);
314 default:
315 * vblob = NULL;
316 return RC ( rcVDB, rcProduction, rcReading, rcProduction, rcCorrupt );
317 }
318
319 if ( rc == 0 )
320 {
321 VBlob *blob = * vblob;
322
323 /* force data buffer to reflect element size */
324 if ( self -> dad . fd . fmt == 0 &&
325 self -> dad . fd . td . type_id > 2 )
326 {
327 uint32_t elem_bits = VTypedescSizeof ( & self -> dad . desc );
328 if ( elem_bits != 0 && blob -> data . elem_bits != elem_bits )
329 {
330 rc = KDataBufferCast ( & blob -> data, & blob -> data, elem_bits, true );
331 if ( rc != 0 )
332 {
333 vblob_release ( blob, NULL );
334 * vblob = NULL;
335 }
336 }
337 }
338 }
339
340 return rc;
341 }
342
343
344 /*--------------------------------------------------------------------------
345 * VFunctionProd
346 * function input params are VProduction*
347 * extern C function pointer and self object
348 */
349
VFunctionProdMake(VFunctionProd ** prodp,Vector * owned,const VCursor * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,uint8_t chain)350 rc_t VFunctionProdMake ( VFunctionProd **prodp, Vector *owned,
351 const VCursor *curs, int sub, const char *name,
352 const VFormatdecl *fd, const VTypedesc *desc, uint8_t chain )
353 {
354 VFunctionProd *prod;
355 rc_t rc = VProductionMake ( ( VProduction** ) prodp, owned,
356 sizeof * prod, prodFunc, sub, name, fd, desc, NULL, chain );
357 if ( rc == 0 )
358 {
359 prod = * prodp;
360 prod -> curs = curs;
361
362 if ( sub != prodFuncByteswap )
363 VectorInit ( & prod -> parms, 0, 4 );
364 else
365 {
366 SDatatype *dt = VSchemaFindTypeid ( VCursorGetSchema ( curs ), fd -> td . type_id );
367 assert ( dt != NULL );
368 prod -> u . bswap = dt -> byte_swap;
369 VectorInit ( & prod -> parms, 0, 1 );
370 }
371 }
372 return rc;
373 }
374
VFunctionProdDestroy(VFunctionProd * self)375 void VFunctionProdDestroy ( VFunctionProd *self )
376 {
377 /* release input parameters */
378 VectorWhack ( & self -> parms, NULL, NULL );
379 if ( self -> whack != NULL )
380 ( * self -> whack ) ( self -> fself );
381 }
382
383
384
385 /* Read
386 */
387
388 #define VECTOR_ALLOC_ARRAY( ARGC, ARRAY, S_ARRAY, H_ARRAY ) \
389 do { \
390 size_t const needed = (ARGC) * sizeof((ARRAY)[0]); \
391 (H_ARRAY) = NULL; \
392 (ARRAY) = &((S_ARRAY)[0]); \
393 if (needed > sizeof((S_ARRAY))) { \
394 (H_ARRAY) = malloc(needed); \
395 if ((H_ARRAY) == NULL) \
396 return RC(rcVDB, rcProduction, rcReading, rcMemory, rcExhausted); \
397 (ARRAY) = &((H_ARRAY)[0]); \
398 } \
399 memset((ARRAY), 0, needed); \
400 } while (0)
401
402 #define VECTOR_COPY_TO_ARRAY( VECTOR, ARRAY ) \
403 do { \
404 int i, n; \
405 for (n = (i = VectorStart((VECTOR))) + VectorLength((VECTOR)); i != n; ++i) \
406 (ARRAY)[i] = VectorGet((VECTOR), i); \
407 } while (0)
408
409 #define VECTOR_TO_ARRAY( ARGC, ARRAY, S_ARRAY, H_ARRAY, VECTOR ) \
410 do { \
411 VECTOR_ALLOC_ARRAY((ARGC), (ARRAY), (S_ARRAY), (H_ARRAY)); \
412 VECTOR_COPY_TO_ARRAY((VECTOR), (ARRAY)); \
413 } while (0)
414
415 static
VFunctionProdCallNDRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,const VXformInfo * info,Vector * args)416 rc_t VFunctionProdCallNDRowFunc(
417 VFunctionProd *self,
418 VBlob **prslt,
419 int64_t row_id,
420 const VXformInfo *info,
421 Vector *args
422 )
423 {
424 rc_t rc;
425
426 /* create output blob
427 TBD - try to used cached blob if available */
428 #if PROD_NAME
429 rc = VBlobNew ( prslt, row_id, row_id, self->dad.name );
430 #else
431 rc = VBlobNew ( prslt, row_id, row_id, "VFunctionProdCallNDRowFunc" );
432 #endif
433 TRACK_BLOB ( VBlobNew, *prslt );
434 if ( rc == 0 )
435 {
436 VRowResult rslt;
437 VRowData on_stack [ 16 ], *on_heap, *argv;
438
439 VBlob *blob = * prslt;
440 uint32_t i, argc = VectorLength ( args );
441
442 /* create and populate array of input parameters */
443 VECTOR_ALLOC_ARRAY(argc, argv, on_stack, on_heap);
444 for ( i = 0; i < argc; ++ i )
445 {
446 const VBlob *in = VectorGet(args, i);
447 uint32_t first_elem;
448
449 /* always point to page base address */
450 argv [ i ] . u . data . base = in -> data . base;
451
452 /* get row length and starting element in one pass */
453 argv [ i ] . u . data . elem_count = PageMapGetIdxRowInfo ( in -> pm, (uint32_t)( row_id - in -> start_id ), & first_elem, NULL );
454 argv [ i ] . u . data . first_elem = first_elem;
455
456 /* finally set the element size */
457 argv [ i ] . u . data . elem_bits = in -> data . elem_bits;
458 }
459
460 /* fill out return param block
461 NB - the initially passed-in buffer
462 may be reallocated by external function */
463 rslt . data = & blob -> data;
464 rslt . elem_count = 0;
465 rslt . elem_bits = blob -> data . elem_bits =
466 VTypedescSizeof ( & self -> dad . desc );
467 rslt.no_cache = 0;
468
469 blob -> byte_order = vboNative;
470
471 /* invoke the row function */
472 rc = self -> u . ndf ( self -> fself, info, row_id, & rslt, argc, argv );
473 blob->no_cache = (rslt.no_cache ? true : false);
474
475 /* clean up input arguments */
476 if ( on_heap != NULL )
477 free ( on_heap );
478
479 /* take reallocated buffer */
480 if ( rslt . data != & blob -> data )
481 {
482 KDataBufferWhack ( & blob -> data );
483 KDataBufferSub ( rslt . data, & blob -> data, 0, UINT64_MAX );
484 KDataBufferWhack ( rslt . data );
485 }
486 blob->data.elem_count = rslt.elem_count;
487
488 /* if the function was successful incorporate row length */
489 if (rc == 0)
490 {
491 assert(rslt . elem_count >> 32 == 0);
492 rc = PageMapNewFixedRowLength ( & blob -> pm, 1, (uint32_t)rslt . elem_count );
493 if ( rc == 0 )
494 return 0;
495 }
496
497 vblob_release ( blob, NULL );
498
499 *prslt = NULL;
500 }
501
502 return rc;
503 }
504
505 /* TODO: enable in next release */
506 #define PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX 0
507 static
VFunctionProdCallNullaryRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info)508 rc_t VFunctionProdCallNullaryRowFunc(VFunctionProd *self,
509 VBlob **prslt,
510 int64_t row_id,
511 uint32_t row_count,
512 const VXformInfo *info)
513 {
514 rc_t rc = 0;
515 KDataBuffer scratch;
516 VRowData args[1];
517 VRowResult rslt;
518
519 memset(&scratch, 0, sizeof(scratch));
520 memset(&args[0], 0, sizeof(args[0]));
521
522 rslt.data = &scratch;
523 rslt.elem_count = 0;
524 rslt.elem_bits = scratch.elem_bits = VTypedescSizeof(&self->dad.desc);
525
526 rc = self->u.rf(self->fself, info, row_id, &rslt, 0, args);
527 if (rc == 0) {
528 VBlob *blob = NULL;
529
530 #if PROD_NAME
531 rc = VBlobNew ( &blob, -INT64_MAX - 1, INT64_MAX, self->dad.name );
532 #else
533 rc = VBlobNew ( &blob, -INT64_MAX - 1, INT64_MAX, "VFunctionProdCallDetRowFunc" );
534 #endif
535 if (rc == 0) {
536 blob->byte_order = vboNative;
537 assert(rslt.elem_count <= UINT32_MAX);
538 KDataBufferSub(rslt.data, &blob->data, 0, rslt.elem_count);
539 if ( rslt.data != & scratch )
540 KDataBufferWhack(rslt.data);
541 rc = PageMapNewSingle(&blob->pm, UINT32_MAX, (uint32_t)rslt.elem_count);
542 if (rc == 0)
543 *prslt = blob;
544 else
545 vblob_release(blob, NULL);
546 }
547 }
548 KDataBufferWhack(&scratch);
549 return rc;
550 }
551
computeWindow(uint32_t * const pwindow,int64_t const start_id,int64_t const stop_id,int64_t const row_id,uint32_t const max_blob_regroup)552 static bool computeWindow(uint32_t *const pwindow, int64_t const start_id, int64_t const stop_id, int64_t const row_id, uint32_t const max_blob_regroup)
553 {
554 int64_t window = stop_id - start_id + 1;
555 bool window_resized = false;
556
557 /*** from previous fetch **/
558
559 /** detect sequentual io ***/
560 if (row_id == stop_id + 1)
561 {
562 if (window > max_blob_regroup)
563 {
564 window = max_blob_regroup;
565 window_resized = true;
566 }
567 else if (row_id % (4 * window) == 1)
568 {
569 if (window < max_blob_regroup)
570 {
571 if (4 * window <= max_blob_regroup)
572 window *= 4;
573 else
574 window = max_blob_regroup;
575 window_resized = true;
576 }
577 /* we know that row_id lands on the first row of the new window */
578 }
579 }
580 else
581 {
582 /* random access - use tiny blob window */
583 window = 1;
584 window_resized = true;
585 }
586 assert(window <= UINT32_MAX);
587 *pwindow = window;
588 return window_resized;
589 }
590
591 #if UNIT_TEST_VDB_3058
UnitTest_VDB_3058(void)592 static void UnitTest_VDB_3058(void)
593 {
594 /* test for case of integer overflow to zero causing divide-by-zero error in computeWindow */
595 uint32_t max_blob_regroup = 1024;
596 uint32_t window = 0;
597 int64_t start_id = 1;
598 int64_t stop_id = UINT32_MAX / 4 + 1;
599 int64_t row_id = stop_id + 1;
600 bool window_resized = computeWindow(&window, start_id, stop_id, row_id, max_blob_regroup);
601
602 /* this output isn't important, it's just to make sure that
603 * the compiler doesn't optimize out the relevant computations */
604 printf("window: %u\nwindow_resized: %s\n", window, window_resized ? "true" : "false");
605 }
606 #endif
607
608 static
VFunctionProdCallRowFunc1(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info,Vector * args,int64_t param_start_id,int64_t param_stop_id)609 rc_t VFunctionProdCallRowFunc1( VFunctionProd *self, VBlob **prslt, int64_t row_id,
610 uint32_t row_count, const VXformInfo *info, Vector *args,int64_t param_start_id,int64_t param_stop_id)
611 {
612 rc_t rc;
613 uint32_t i, argc = VectorLength ( args );
614 VRowResult rslt;
615 VRowData args_os[16], *args_oh, *argv;
616 KDataBuffer scratch;
617 VBlob *blob;
618 const VBlob *in;
619 PageMapIterator iter_os[16], *iter_oh, *iter;
620 uint64_t last = 0;
621 uint32_t last_len = 0;
622 uint32_t window;
623 uint32_t min_row_count=UINT32_MAX; /* will increase row_count due to larger common repeat count of parameters */
624 int64_t row_id_max=0;
625 uint32_t max_blob_regroup; /** max rows in blob for regrouping ***/
626 bool function_failed = false;
627 bool window_resized = false;
628
629 if ( VCursorCacheActive ( self -> curs, & row_id_max ) )
630 { /*** since cache_cursor exist, trying to avoid prefetching data which is in cache cursor ***/
631 max_blob_regroup=256;
632 } else
633 {
634 max_blob_regroup=1024;
635 }
636
637 if(self->dad.sub == vftRowFast)
638 {
639 window = max_blob_regroup;
640 }
641 else
642 {
643 window_resized = computeWindow(&window, self->start_id, self->stop_id, row_id, max_blob_regroup);
644 }
645
646 assert ( 0 < window && window <= max_blob_regroup );
647
648 if(window == 1)
649 {
650 /* random access or initial blob - create blob with initial row-count */
651 self->start_id=self->stop_id=row_id;
652 if(row_count > 0)
653 self->stop_id += row_count-1;
654 }
655 else
656 {
657 /* start out with supplied row range */
658 self->start_id=param_start_id;
659 self->stop_id =param_stop_id;
660 assert(row_id >= self->start_id && row_id + row_count -1 <= self->stop_id);
661
662 /* special code to detect an old-style static column with infinite range */
663 if(self->start_id==-INT64_MAX - 1 || self->stop_id==INT64_MAX)
664 {
665 /* same logic as above */
666 self->start_id=self->stop_id=row_id;
667 if(row_count > 0)
668 self->stop_id += row_count-1;
669 }
670
671 /* this code only executes if requested row-count is 1 */
672 else if ( row_count == 1 )
673 {
674 /* the blob itself has to be at least twice the window size.
675 this may be a problem because once "window" becomes large enough,
676 it could cause us to switch from "reblobbing" back to whole blob. */
677 if ( self->stop_id - self->start_id > 2*window)
678 {
679 /* determine which "window" ( relative to start of TABLE ) contains row_id */
680 int64_t n=(row_id-1)/window;
681
682 /* look at blob left edge, move up to window left edge if possible */
683 if(self->start_id <= n*window)
684 self->start_id=n*window+1;
685
686 /* look at blob right edge, move down to window right edge if possible */
687 if(self->stop_id > (n+1) * window)
688 self->stop_id = (n+1)*window;
689
690 /* eventual window from self->start_id..self->stop_id must be <= "window" size */
691 assert ( self -> start_id <= self -> start_id );
692 assert ( self -> stop_id - self -> start_id + 1 <= window );
693 }
694 else if ( window_resized )
695 {
696 self -> start_id = row_id;
697 }
698 else
699 {
700 /* handle case when the window has grown large enough
701 to disable reblobbing after blobbing was once in place */
702 }
703 }
704 }
705
706 /* create and populate array of input parameters */
707 VECTOR_ALLOC_ARRAY(argc, argv, args_os, args_oh);
708 VECTOR_ALLOC_ARRAY(argc, iter, iter_os, iter_oh);
709 for (i = 0; i != argc; ++ i){
710
711 in = VectorGet(args, i);
712
713 if(in->start_id == -INT64_MAX - 1 ) {
714 rc = PageMapNewIterator(in->pm, &iter[i],0,-1);
715 } else if(param_stop_id>self->stop_id && param_stop_id < INT64_MAX){
716 rc = PageMapNewIterator(in->pm, &iter[i], self->start_id-in->start_id, param_stop_id - self->start_id + 1);
717 if(rc == 0){
718 uint32_t n = PageMapIteratorRepeatCount(&iter[i]);
719 if(n < min_row_count) min_row_count=n;
720 }
721 } else {
722 rc = PageMapNewIterator(in->pm, &iter[i], self->start_id-in->start_id, self->stop_id - self->start_id + 1);
723 }
724 if ( rc ) break;
725 argv[i].variant = vrdData;
726 argv[i].blob_stop_id = in->stop_id;
727 argv[i].u.data.elem_bits = in->data.elem_bits;
728 argv[i].u.data.base = in->data.base;
729 argv[i].u.data.base_elem_count = in->data.elem_count;
730 }
731 if(min_row_count < UINT32_MAX && self->start_id + min_row_count -1 > self->stop_id ){
732 self->stop_id = self->start_id + min_row_count - 1;
733 }
734 if(row_id_max >= row_id && self->stop_id > row_id_max) self->stop_id = row_id_max;
735
736
737
738
739 #if PROD_NAME
740 rc = VBlobNew ( &blob, self->start_id, self->stop_id, self->dad.name );
741 #else
742 rc = VBlobNew ( &blob, self->start_id, self->stop_id, "VFunctionProdCallDetRowFunc" );
743 #endif
744 TRACK_BLOB ( VBlobNew, blob );
745 if (rc)
746 return rc;
747
748 #if PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX
749 #else
750 rc = PageMapNew(&blob->pm, row_count /**BlobRowCount(blob)**/);
751 #if 0
752 /* disabled for causing problems with accumulating static columns */
753 if (rc == 0)
754 rc = PageMapPreExpandFull(blob->pm, BlobRowCount(blob));
755 #endif
756 if (rc) {
757 vblob_release(blob, NULL);
758 return rc;
759 }
760 #endif
761
762 memset(&scratch, 0, sizeof(scratch));
763 rslt.data = &scratch;
764 rslt.elem_bits = scratch.elem_bits = blob->data.elem_bits = VTypedescSizeof(&self->dad.desc);
765 blob->byte_order = vboNative;
766
767
768
769 for (row_id = self->start_id; row_id <= self->stop_id && rc == 0; ) {
770 uint32_t row_count = 1;
771 if(self->dad.sub == vftRow || self->dad.sub ==vftRowFast ){
772 row_count = PageMapIteratorRepeatCount(&iter[0]);
773
774 for (i = 1; i != argc; ++i) {
775 uint32_t j = PageMapIteratorRepeatCount(&iter[i]);
776 if (row_count > j)
777 row_count = j;
778 }
779 if (row_id + row_count > self->stop_id + 1)
780 row_count = (uint32_t)( self->stop_id + 1 - row_id );
781 }
782
783 for (i = 0; i != argc; ++i) {
784 argv[i].u.data.elem_count = PageMapIteratorDataLength(&iter[i]);
785 argv[i].u.data.first_elem = PageMapIteratorDataOffset(&iter[i]);
786 }
787
788 rslt.elem_count = 0;
789 rc = self->u.rf(self->fself, info, row_id, &rslt, argc, argv);
790 if (rc) {
791 function_failed = true;
792 break;
793 }
794
795 assert(rslt.elem_count >> 32 == 0);
796
797 if (row_id == self->start_id) {
798 #if PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX
799 if (blob->start_id + row_count > blob->stop_id) {
800 last = blob->data.elem_count;
801 rc = KDataBufferResize(&blob->data, blob->data.elem_count + rslt.elem_count);
802 if (rc == 0) {
803 bitcpy(blob->data.base, last * rslt.elem_bits,
804 rslt.data->base, 0, rslt.elem_count * rslt.elem_bits);
805 rc = PageMapNewSingle(&blob->pm, row_count, rslt.elem_count);
806 }
807 }
808 else
809 goto ADD_ROW_TO_BLOB;
810 #else
811 goto ADD_ROW_TO_BLOB;
812 #endif
813 }
814 else if (last_len != rslt.elem_count ||
815 bitcmp(blob->data.base, last * rslt.elem_bits,
816 rslt.data->base, 0, rslt.elem_count * rslt.elem_bits) != 0)
817 {
818 ADD_ROW_TO_BLOB:
819 last = blob->data.elem_count;
820 rc = KDataBufferResize(&blob->data, blob->data.elem_count + rslt.elem_count);
821 if (rc == 0) {
822 bitcpy(blob->data.base, last * rslt.elem_bits,
823 rslt.data->base, 0, rslt.elem_count * rslt.elem_bits);
824 rc = PageMapAppendRows(blob->pm, rslt.elem_count, row_count, false);
825 }
826 }
827 else
828 rc = PageMapAppendRows(blob->pm, rslt.elem_count, row_count, true);
829
830 /* drop any new buffer that was returned to us */
831 if (rslt.data != &scratch) {
832 KDataBufferWhack(rslt.data);
833 }
834
835 if (rc) break;
836
837 last_len = (uint32_t)rslt.elem_count;
838
839 for (i = 0; i != argc; ++i)
840 PageMapIteratorAdvance(&iter[i], row_count);
841 row_id += row_count;
842 }
843 KDataBufferWhack(&scratch);
844 if (args_oh) free(args_oh);
845 if (iter_oh) free(iter_oh);
846
847 if (rc == 0 || (function_failed && row_id > self->start_id)) {
848 *prslt = blob;
849 return 0;
850 }
851 vblob_release(blob, NULL);
852
853 return rc;
854 }
855
856 static
VFunctionProdCallRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info,Vector * args,int64_t param_start_id,int64_t param_stop_id)857 rc_t VFunctionProdCallRowFunc( VFunctionProd *self, VBlob **prslt, int64_t row_id,
858 uint32_t row_count, const VXformInfo *info, Vector *args,int64_t param_start_id,int64_t param_stop_id)
859 {
860 uint32_t const argc = VectorLength(args);
861
862 if (argc == 0)
863 return VFunctionProdCallNullaryRowFunc(self, prslt, row_id, row_count, info);
864 else
865 return VFunctionProdCallRowFunc1(self, prslt, row_id, row_count, info, args, param_start_id, param_stop_id);
866 }
867
868 static
VFunctionProdCallArrayFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)869 rc_t VFunctionProdCallArrayFunc( VFunctionProd *self, VBlob **prslt,
870 int64_t id, const VXformInfo *info, Vector *args ) {
871 VBlob *rslt = 0;
872 VBlob *sblob;
873 rc_t rc;
874
875 sblob = VectorGet(args, 0);
876 assert(sblob);
877
878 #if PROD_NAME
879 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
880 #else
881 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallArrayFunc");
882 #endif
883 TRACK_BLOB( VBlobNew, rslt );
884 if (rc == 0) {
885 rslt->pm = sblob->pm;
886 PageMapAddRef(rslt->pm);
887
888 if (sblob->headers) {
889 if ( self -> dad . chain == chainEncoding )
890 rc = BlobHeadersCreateChild(sblob->headers, &rslt->headers);
891 else {
892 rslt->headers = (BlobHeaders *)BlobHeadersGetNextFrame(sblob->headers);
893 BlobHeadersAddRef(rslt->headers);
894 }
895 }
896 if (rc == 0) {
897 rc = KDataBufferMake(&rslt->data, VTypedescSizeof(&self->dad.desc), sblob->data.elem_count);
898 if (rc == 0) {
899 rc = self->u.af(
900 self->fself,
901 info,
902 rslt->data.base,
903 sblob->data.base,
904 sblob->data.elem_count
905 );
906 if (rc == 0) {
907 *prslt = rslt;
908 return 0;
909 }
910 }
911 }
912
913 vblob_release( rslt, NULL );
914 }
915
916 return rc;
917 }
918
919 static
VFunctionProdCallPageFunc(VFunctionProd * self,VBlob ** rslt,int64_t id,const VXformInfo * info,Vector * args)920 rc_t VFunctionProdCallPageFunc( VFunctionProd *self, VBlob **rslt, int64_t id,
921 const VXformInfo *info, Vector *args )
922 {
923 struct input_t {
924 const VBlob *blob;
925 bool sb_input;
926 PageMapIterator cur_row;
927 bool at_end;
928 };
929 struct input_t on_stack[8];
930 struct input_t *on_heap;
931 struct input_t *argv;
932
933 VRowData pb_stack[8];
934 VRowData *pb_heap;
935 VRowData *param;
936
937 rc_t rc=0;
938 uint32_t i, argc = VectorLength(args);
939 int64_t start_id;
940 int64_t stop_id;
941 uint32_t elem_count;
942 uint32_t row_count;
943 int first_non_control_input;
944 int allInputsAreSingleRow;
945 VBlob *blob = NULL;
946
947 VECTOR_ALLOC_ARRAY(argc, argv, on_stack, on_heap);
948 VECTOR_ALLOC_ARRAY(argc, param, pb_stack, pb_heap);
949
950 for (start_id = stop_id = 0,
951 first_non_control_input=-1,
952 allInputsAreSingleRow = true,
953 i = 0; i != argc ; ++i) {
954 const VBlob *b = (const VBlob *)VectorGet(args, i);
955 const VProduction *prod = (const VProduction *)VectorGet(&self->parms, i);
956
957
958 if(b->pm == NULL){
959 rc=PageMapProcessGetPagemap( VCursorPageMapProcessRequest ( self -> curs ) ,(PageMap**)(&b->pm));
960 if(rc != 0) return rc;
961 }
962
963 if (prod->control) {
964 param[i].variant = vrdControl;
965 assert(0); /*** TBD: Not implemented ???? ***/
966 } else {
967 param[i].variant = vrdData;
968 argv[i].blob = b;
969 argv[i].sb_input = VBlobIsSingleRow(argv[i].blob);
970 allInputsAreSingleRow &= argv[i].sb_input;
971
972 if(first_non_control_input < 0){
973 first_non_control_input = i;
974 start_id = argv[i].blob->start_id;
975 stop_id = argv[i].blob->stop_id;
976 } else {
977 if(start_id < argv[i].blob->start_id)
978 start_id = argv[i].blob->start_id;
979 if(stop_id > argv[i].blob->stop_id)
980 stop_id = argv[i].blob->stop_id;
981 }
982 }
983 }
984 if ( allInputsAreSingleRow ) {
985 row_count = stop_id - start_id + 1;
986 if(row_count == 0 ) /*** case of static column **/
987 row_count=1;
988 } else {
989 row_count = stop_id - start_id + 1;
990 }
991 if (first_non_control_input < 0) /* no non-control inputs */
992 rc = RC(rcVDB, rcFunction, rcExecuting, rcParam, rcInvalid);
993 else if (start_id > stop_id )
994 rc = RC(rcVDB, rcFunction, rcExecuting, rcRange, rcEmpty);
995
996 for ( elem_count = 0, i = 0; i != argc && rc ==0; ++i) {
997 rc = PageMapNewIterator(argv[i].blob->pm, &argv[i].cur_row, start_id - argv[i].blob->start_id ,row_count);
998 if(rc == 0){
999 PageMapIterator temp = argv[i].cur_row;
1000 uint32_t ec = 0;
1001
1002 if ( argv[i].sb_input ){
1003 ec = PageMapIteratorDataLength(&temp) * row_count;
1004 } else do {
1005 ec+=PageMapIteratorDataLength(&temp);
1006 } while (PageMapIteratorNext(&temp));
1007 if(ec == 0){
1008 rc = RC(rcVDB, rcFunction, rcExecuting, rcPagemap, rcInvalid); /* bad page map */
1009 } else if (elem_count == 0){
1010 elem_count=ec;
1011 } else if (ec != elem_count){
1012 rc = RC(rcVDB, rcFunction, rcExecuting, rcParam, rcInvalid); /* Pages have to have the same number of elements*/
1013 }
1014 }
1015 }
1016
1017 while (rc == 0) /* not really while */ {
1018 #if PROD_NAME
1019 rc = VBlobNew(&blob, start_id, stop_id, self->dad.name);
1020 #else
1021 rc = VBlobNew(&blob, start_id, stop_id, "VFunctionProdCallPageFunc");
1022 #endif
1023 if (rc) break;
1024
1025 TRACK_BLOB(VBlobNew,blob);
1026
1027 if (allInputsAreSingleRow) {
1028 VFixedRowResult rslt;
1029 uint32_t row_element_count = PageMapIteratorDataLength(&argv[first_non_control_input].cur_row);
1030
1031 rc = PageMapNewSingle(&blob->pm, row_count, row_element_count);
1032 if (rc) break;
1033
1034 rc = KDataBufferMake(&blob->data, VTypedescSizeof(&self->dad.desc), row_element_count);
1035 if (rc) break;
1036
1037 for (i = 0; i != argc; ++i) {
1038 if (param[i].variant == vrdControl)
1039 continue;
1040 if (argv[i].at_end) {
1041 rc = RC(rcVDB, rcFunction, rcExecuting, rcRow, rcNotFound);
1042 break;
1043 }
1044
1045 param[i].u.data.base = argv[i].blob->data.base;
1046 param[i].u.data.elem_count = row_element_count;
1047 param[i].u.data.first_elem = PageMapIteratorDataOffset(&argv[i].cur_row);
1048 param[i].u.data.elem_bits = argv[i].blob->data.elem_bits;
1049
1050 argv[i].at_end = PageMapIteratorNext(&argv[i].cur_row) ? false : true;
1051 }
1052 if (rc) break;
1053
1054 rslt.base = blob->data.base;
1055 rslt.first_elem = 0;
1056 rslt.elem_count = row_element_count;
1057 rslt.elem_bits = blob->data.elem_bits;
1058
1059 rc = self->u.pf(self->fself, info, start_id, &rslt, argc, param);
1060 } else {
1061 uint32_t first_write;
1062 int64_t row_id;
1063 uint32_t last = 0;
1064 uint32_t last_rowlen = 0;
1065
1066 rc = PageMapNew(&blob->pm, row_count); /*** max number of rows - it may collapse some **/
1067 if (rc) break;
1068 rc = KDataBufferMake(&blob->data, VTypedescSizeof(&self->dad.desc), elem_count);
1069 if (rc) break;
1070
1071 for (first_write = 0, row_id = start_id; row_id <= stop_id; ++row_id) {
1072 VFixedRowResult rslt;
1073
1074 for (i = 0; i != argc; ++i) {
1075 if (param[i].variant == vrdControl)
1076 continue;
1077 if (argv[i].at_end) {
1078 rc = RC(rcVDB, rcFunction, rcExecuting, rcRow, rcNotFound);
1079 break;
1080 }
1081
1082 param[i].u.data.base = argv[i].blob->data.base;
1083 param[i].u.data.elem_count = PageMapIteratorDataLength(&argv[i].cur_row);
1084 param[i].u.data.first_elem = PageMapIteratorDataOffset(&argv[i].cur_row);
1085 param[i].u.data.elem_bits = argv[i].blob->data.elem_bits;
1086
1087 argv[i].at_end = PageMapIteratorNext(&argv[i].cur_row) ? false : true;
1088 }
1089 if (rc)
1090 break;
1091
1092 rslt.base = blob->data.base;
1093 rslt.first_elem = first_write;
1094 rslt.elem_count = param[first_non_control_input].u.data.elem_count;
1095 rslt.elem_bits = blob->data.elem_bits;
1096
1097 rc = self->u.pf(self->fself, info, row_id, &rslt, argc, param);
1098 if (rc)
1099 break;
1100
1101 assert(rslt.elem_count >> 32 == 0);
1102 if ( row_id != start_id && last_rowlen == rslt.elem_count &&
1103 memcmp(((char*)blob->data.base) + (last*rslt.elem_bits)/8,
1104 ((char*)blob->data.base) + (first_write*rslt.elem_bits)/8,
1105 (rslt.elem_count*rslt.elem_bits)/8) == 0)
1106 {
1107 rc = PageMapAppendRow(blob->pm, (uint32_t)rslt.elem_count, true);
1108 }
1109 else {
1110 last = first_write;
1111 first_write += rslt.elem_count;
1112 rc = PageMapAppendRow(blob->pm, (uint32_t)rslt.elem_count, false);
1113 }
1114 if (rc)
1115 break;
1116 last_rowlen = (uint32_t)rslt.elem_count;
1117 }
1118 if (rc)
1119 break;
1120 KDataBufferSub(&blob->data, &blob->data, 0, first_write);
1121 }
1122 *rslt = blob;
1123 break;
1124 }
1125 if (rc != 0 && blob != NULL) vblob_release(blob, NULL);
1126 if (on_heap) free(on_heap);
1127 if (pb_heap) free(pb_heap);
1128 return rc;
1129 }
1130
1131 static
VFunctionProdCallBlobFuncEncoding(VFunctionProd * self,VBlob * rslt,int64_t id,const VXformInfo * info,const VBlob * sblob)1132 rc_t VFunctionProdCallBlobFuncEncoding( VFunctionProd *self, VBlob *rslt, int64_t id,
1133 const VXformInfo *info, const VBlob *sblob ) {
1134 VBlobData src;
1135 VBlobResult dst;
1136 VBlobHeader *hdr;
1137 rc_t rc;
1138 uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1139
1140 rc = BlobHeadersCreateChild(sblob->headers, &rslt->headers);
1141 if (rc == 0) {
1142 hdr = BlobHeadersGetHdrWrite(rslt->headers);
1143 if (hdr) {
1144 bitsz_t sz = KDataBufferBits(&sblob->data);
1145
1146 VBlobHeaderSetSourceSize(hdr, KDataBufferBytes(&sblob->data));
1147 sz = (sz + elem_size - 1) / elem_size;
1148 rc = KDataBufferMake( &rslt->data, elem_size, sz );
1149 }
1150 else
1151 rc = RC(rcVDB, rcFunction, rcExecuting, rcMemory, rcExhausted);
1152 }
1153 if (rc)
1154 return rc;
1155
1156 dst.header = NULL;
1157
1158 if ( sblob -> data.elem_count == 0)
1159 goto SKIP_COMPRESSION;
1160
1161 src.data = sblob -> data.base;
1162 src.elem_count = sblob -> data.elem_count;
1163 src.elem_bits = sblob -> data.elem_bits;
1164 src.byte_order = sblob -> byte_order;
1165
1166 dst.data = rslt -> data.base;
1167 dst.elem_count = rslt -> data.elem_count;
1168 dst.elem_bits = rslt -> data.elem_bits;
1169 dst.byte_order = sblob -> byte_order;
1170
1171 rc = self->u.bf(self->fself, info, &dst, &src, hdr);
1172
1173 if (rc == 0) {
1174 if ( dst.header != NULL && dst.header != hdr ) {
1175 VBlobHeaderReplace ( hdr, dst.header );
1176 VBlobHeaderRelease ( dst.header );
1177 }
1178 rslt->data.elem_bits = dst.elem_bits;
1179 rslt->data.elem_count = dst.elem_count;
1180 rslt->byte_order = dst.byte_order;
1181 }
1182 else if (GetRCObject(rc) == (enum RCObject)rcBuffer && GetRCState(rc) == rcInsufficient) {
1183 SKIP_COMPRESSION:
1184 VBlobHeaderSetFlags(hdr, 1);
1185
1186 KDataBufferWhack(&rslt->data);
1187 if ( dst.header != NULL && dst.header != hdr )
1188 VBlobHeaderRelease ( dst.header );
1189
1190 /* compressors usually produce bits (elem_size == 1) or bytes (elem_size == 8)
1191 * the cast to bits can never fail, so we will force the cast to bytes to also be
1192 * infallible; casts to other sizes are allowed to fail to prevent data loss */
1193 if (elem_size == 8) {
1194 KDataBufferSub(&sblob->data, &rslt->data, 0, UINT64_MAX);
1195 /* We can't shrink the data and KDataBufferCast won't increase the number of bits
1196 * but we know that KDataBuffer can't allocate anything other than whole bytes
1197 * so we're forcing the conversion to bytes manually. */
1198 rslt->data.elem_count = KDataBufferBytes(&rslt->data);
1199 rslt->data.elem_bits = 8;
1200 rc = 0;
1201 }
1202 else /* if elem_size == 1 this will always work */
1203 rc = KDataBufferCast(&sblob->data, &rslt->data, elem_size, false);
1204 }
1205 VBlobHeaderRelease(hdr);
1206
1207 return rc;
1208 }
1209
1210 static
VFunctionProdCallBlobFuncDecoding(VFunctionProd * self,VBlob * rslt,int64_t id,const VXformInfo * info,const VBlob * sblob)1211 rc_t VFunctionProdCallBlobFuncDecoding( VFunctionProd *self, VBlob *rslt,
1212 int64_t id, const VXformInfo *info, const VBlob *sblob ) {
1213 VBlobHeader *hdr;
1214 rc_t rc;
1215 uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1216
1217 if (sblob->headers == NULL) {
1218 /* v1 blobs don't have headers, but v1 blobs
1219 * are fixed row-length so we know the data size
1220 * we are relying on the blob deserialization code
1221 * to set the page map up correctly */
1222 if (sblob->pm == NULL) {
1223 hdr = BlobHeadersCreateDummyHeader(0, 0, 0, (sblob->data.elem_bits * sblob->data.elem_count + 7) >> 3);
1224 }
1225 else
1226 hdr = BlobHeadersCreateDummyHeader(0, 0, 0, BlobRowCount(sblob) * PageMapGetIdxRowInfo(sblob->pm, 0, 0, NULL));
1227 /* leave rslt->headers null so that the next
1228 * stage will also create a dummy header */
1229 }
1230 else {
1231 /* rslt gets the headers for the next stage in decoding */
1232 rslt->headers = (BlobHeaders *)BlobHeadersGetNextFrame(sblob->headers);
1233 BlobHeadersAddRef(rslt->headers);
1234
1235 /* get the headers for this stage in decoding */
1236 hdr = BlobHeadersGetHeader(sblob->headers);
1237 }
1238 if ( hdr == NULL )
1239 rc = RC(rcVDB, rcFunction, rcExecuting, rcMemory, rcExhausted);
1240 else if ((VBlobHeaderFlags(hdr) & 1) != 0)
1241 {
1242 /* compression was skipped */
1243 VBlobHeaderRelease(hdr);
1244 return KDataBufferCast(&sblob->data, &rslt->data, elem_size, true);
1245 }
1246 else
1247 {
1248 rc = KDataBufferMakeBytes(&rslt->data, VBlobHeaderSourceSize(hdr));
1249 if (rc == 0) {
1250 VBlobData src;
1251 VBlobResult dst;
1252
1253 dst.header = NULL;
1254
1255 src.data = sblob -> data.base;
1256 src.elem_count = sblob -> data.elem_count;
1257 src.elem_bits = sblob -> data.elem_bits;
1258 src.byte_order = sblob -> byte_order;
1259
1260 dst.data = rslt -> data.base;
1261 dst.elem_count = (rslt -> data.elem_count << 3) / elem_size;
1262 dst.elem_bits = elem_size;
1263 dst.byte_order = sblob -> byte_order;
1264
1265 rc = self->u.bf(self->fself, info, &dst, &src, hdr);
1266
1267 if (rc == 0) {
1268 if ( dst.header != NULL && dst.header != hdr ) {
1269 /* only allow replacement of headers when encoding */
1270 VBlobHeaderRelease ( dst.header );
1271 }
1272
1273 rslt->data.elem_bits = dst.elem_bits;
1274 rslt->data.elem_count = dst.elem_count;
1275 rslt->byte_order = dst.byte_order;
1276
1277 rc = KDataBufferCast(&rslt->data, &rslt->data, elem_size, true);
1278 }
1279 }
1280 VBlobHeaderRelease(hdr);
1281 }
1282
1283 return rc;
1284 }
1285
1286 static
VFunctionProdCallBlobFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)1287 rc_t VFunctionProdCallBlobFunc( VFunctionProd *self, VBlob **prslt,
1288 int64_t id, const VXformInfo *info, Vector *args ) {
1289 VBlob *rslt = 0;
1290 VBlob *sblob;
1291 rc_t rc;
1292
1293 sblob = VectorGet(args, 0);
1294 assert(sblob);
1295 if(self->dad.chain == chainEncoding){
1296 VBlobAddRef(sblob);
1297 if(sblob->headers==NULL)/**first in encryption chain***/
1298 VBlobPageMapOptimize(&sblob); /** try to optimize the blob **/
1299 }
1300
1301 #if PROD_NAME
1302 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
1303 #else
1304 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallBlobFunc");
1305 #endif
1306 if (rc)
1307 return rc;
1308
1309 TRACK_BLOB(VBlobNew,rslt);
1310
1311 /* blob funcs are not allowed to change page maps */
1312 rslt->pm = sblob->pm;
1313 PageMapAddRef(rslt->pm);
1314
1315 rslt->byte_order = sblob->byte_order;
1316
1317 if (self->dad.chain == chainEncoding){
1318 rc = VFunctionProdCallBlobFuncEncoding(self, rslt, id, info, sblob);
1319 vblob_release( sblob, NULL );
1320 } else {
1321 rc = VFunctionProdCallBlobFuncDecoding(self, rslt, id, info, sblob);
1322 }
1323
1324 if (rc == 0) {
1325 *prslt = rslt;
1326 return 0;
1327 }
1328 vblob_release( rslt, NULL );
1329 return rc;
1330 }
1331
1332 static
VFunctionProdCallBlobNFunc(VFunctionProd * self,VBlob ** rslt,int64_t id,const VXformInfo * info,Vector * args)1333 rc_t VFunctionProdCallBlobNFunc( VFunctionProd *self, VBlob **rslt,
1334 int64_t id, const VXformInfo *info, Vector *args ) {
1335 const VBlob *on_stack[16];
1336 const VBlob **on_heap;
1337 const VBlob **argv;
1338 int argc = VectorLength(args);
1339 rc_t rc;
1340
1341 VECTOR_TO_ARRAY(argc, argv, on_stack, on_heap, args);
1342 {
1343 int i;
1344 for(i=0;i<argc;i++){
1345 VBlob const *vb=argv[i];
1346 if(vb->pm == NULL){
1347 rc=PageMapProcessGetPagemap( VCursorPageMapProcessRequest ( self -> curs ),(PageMap**)(&vb->pm));
1348 if(rc != 0) return rc;
1349 }
1350 }
1351 }
1352 rc = self->u.bfN(self->fself, info, id, rslt, argc, argv);
1353 if ( on_heap )
1354 free( (void*) on_heap );
1355 return rc;
1356 }
1357
1358 static
VFunctionProdCallLegacyBlobFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)1359 rc_t VFunctionProdCallLegacyBlobFunc( VFunctionProd *self, VBlob **prslt,
1360 int64_t id, const VXformInfo *info, Vector *args ) {
1361 VBlob *rslt = 0;
1362 VBlob *sblob;
1363 VNoHdrBlobFunc func = (VNoHdrBlobFunc)self->u.bf;
1364 rc_t rc;
1365 uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1366
1367 sblob = VectorGet(args, 0);
1368 assert(sblob);
1369
1370 #if PROD_NAME
1371 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
1372 #else
1373 rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallLegacyBlobFunc");
1374 #endif
1375 TRACK_BLOB(VBlobNew,rslt);
1376 if (rc == 0) {
1377 rc = KDataBufferMakeBytes(&rslt->data, 0);
1378 if (rc == 0) {
1379 VLegacyBlobResult dst;
1380 dst.dst = & rslt -> data;
1381 dst.byte_order = vboLittleEndian;
1382 rc = func(self->fself,
1383 info,
1384 &dst,
1385 &sblob->data
1386 );
1387
1388 if (rc == 0)
1389 {
1390 rslt->byte_order = dst.byte_order;
1391
1392 rc = KDataBufferCast(&rslt->data, &rslt->data, elem_size, true);
1393 if (rc == 0) {
1394 rslt->pm = sblob->pm;
1395 PageMapAddRef(rslt->pm);
1396
1397 *prslt = rslt;
1398 return 0;
1399 }
1400 }
1401 }
1402
1403 vblob_release( rslt, NULL );
1404 }
1405 return rc;
1406 }
1407
1408 static
VFunctionProdCallByteswap(VFunctionProd * self,VBlob ** vblob,int64_t id,const VXformInfo * info,Vector * args)1409 rc_t VFunctionProdCallByteswap ( VFunctionProd *self, VBlob **vblob,
1410 int64_t id, const VXformInfo *info, Vector *args )
1411 {
1412 /* get single input blob */
1413 VBlob *blob = VectorFirst ( args );
1414 rc_t rc;
1415
1416 assert ( blob != NULL );
1417
1418 #if PROD_CACHE
1419 VProductionFlushCacheDeep ( & self -> dad, "byteswap" );
1420 #endif
1421
1422 /* CAST */
1423 rc = KDataBufferCast ( & blob->data, & blob->data,
1424 self->dad.desc.intrinsic_bits * self->dad.desc.intrinsic_dim,
1425 false );
1426 if ( rc == 0 )
1427 {
1428 /* legacy blob check
1429 * repair missing pagemap
1430 */
1431 if (blob->pm == NULL) {
1432 uint64_t row_count = BlobRowCount ( blob );
1433 if ( row_count == 0 || blob->data.elem_count % row_count != 0)
1434 rc = RC(rcVDB, rcBlob, rcReading, rcBlob, rcCorrupt);
1435 else {
1436 uint64_t row_len = blob->data.elem_count / row_count;
1437 rc = PageMapNewFixedRowLength(&blob->pm, row_count, row_len);
1438 }
1439 }
1440 }
1441
1442 if ( rc != 0)
1443 return rc;
1444
1445 /* check for byteswapping function */
1446 if ( self -> u.bswap != NULL )
1447 {
1448
1449 if ( blob -> byte_order ==
1450 #if __BYTE_ORDER == __LITTLE_ENDIAN
1451 vboBigEndian
1452 #else
1453 vboLittleEndian
1454 #endif
1455 )
1456 {
1457 uint32_t int_size;
1458 uint64_t blob_bits;
1459
1460 /* make writable */
1461 KDataBuffer buffer;
1462
1463 rc = KDataBufferMakeWritable ( & blob -> data, & buffer );
1464 if ( rc != 0 )
1465 return rc;
1466
1467 /* invoke byte-swap function on input */
1468 blob_bits = KDataBufferBits ( & buffer );
1469 int_size = self -> dad.desc .intrinsic_bits;
1470 ( * self -> u.bswap ) ( buffer.base, buffer.base,
1471 ( uint32_t ) ( blob_bits / int_size ) );
1472
1473 /* poke bytes back into blob */
1474 KDataBufferWhack ( & blob -> data );
1475 blob -> data = buffer;
1476 }
1477 }
1478
1479 blob -> byte_order = vboNative;
1480 *vblob = blob;
1481
1482 (void)VBlobAddRef ( blob );
1483 TRACK_BLOB( VBlobAddRef, blob );
1484
1485 return 0;
1486 }
1487
1488 #if _DEBUGGING
1489
1490 static
VFunctionProdCallCompare1(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1491 rc_t VFunctionProdCallCompare1(VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt) {
1492 VBlob *orig;
1493 rc_t rc;
1494
1495 *vblob = NULL;
1496 assert(VectorLength(&self->parms) == 2);
1497 rc = VProductionReadBlob((const VProduction *)VectorGet(&self->parms, 0), &orig, &id, cnt, NULL);
1498 if (rc == 0) {
1499 int64_t i;
1500 PageMapIterator oi;
1501 VRowData orig_data;
1502 const VProduction *test_prod = VectorGet(&self->parms, 1);
1503
1504 memset(&orig_data, 0, sizeof(orig_data));
1505 orig_data.u.data.base = orig->data.base;
1506 orig_data.u.data.elem_bits = orig->data.elem_bits;
1507
1508 PageMapNewIterator(orig->pm, &oi, 0, -1);
1509
1510 for (i = orig->start_id; i <= orig->stop_id; ++i) {
1511 VBlob *test;
1512 uint32_t j;
1513
1514 j = PageMapIteratorDataLength(&oi);
1515
1516 rc = VProductionReadBlob(test_prod, &test, &i, 1, NULL);
1517 if (rc == 0) {
1518 if (orig->data.elem_bits != test->data.elem_bits || orig->byte_order != test->byte_order)
1519 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1520 else {
1521 PageMapIterator ti;
1522 VRowData test_data;
1523
1524 memset(&test_data, 0, sizeof(test_data));
1525 test_data.u.data.base = test->data.base;
1526 test_data.u.data.elem_bits = test->data.elem_bits;
1527
1528 PageMapNewIterator(test->pm, &ti, 0, -1);
1529
1530 if (!PageMapIteratorAdvance(&ti, (uint32_t)(i - test->start_id))) {
1531 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1532 }
1533 else {
1534 uint32_t k = PageMapIteratorDataLength(&ti);
1535 orig_data.u.data.elem_count = test_data.u.data.elem_count = j;
1536
1537 orig_data.u.data.first_elem = (orig->data.bit_offset / orig->data.elem_bits) + PageMapIteratorDataOffset(&oi);
1538 test_data.u.data.first_elem = (test->data.bit_offset / test->data.elem_bits) + PageMapIteratorDataOffset(&ti);
1539
1540 if (j != k) {
1541 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1542 } else {
1543 rc = self->u.cf(self->fself, &orig_data, &test_data);
1544 if (rc) {
1545 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1546 }
1547 }
1548 if (rc) {
1549 const uint8_t *a = orig_data.u.data.base;
1550 const uint8_t *b = test_data.u.data.base;
1551 unsigned count;
1552 unsigned k;
1553 unsigned m;
1554 char f, ax[9 + 16 * 4 + 1], bx[9 + 16 * 4 + 1];
1555 char av[16], bv[16];
1556
1557 a += (orig_data.u.data.first_elem * orig_data.u.data.elem_bits) >> 3;
1558 b += (test_data.u.data.first_elem * test_data.u.data.elem_bits) >> 3;
1559 /* show up to a row of data before */
1560 count = a - (const uint8_t*)orig_data.u.data.base;
1561 count = count < b - (const uint8_t*)orig_data.u.data.base ? count : b - (const uint8_t*)orig_data.u.data.base;
1562 count = count > 16 ? 16 : count;
1563 a -= count;
1564 b -= count;
1565
1566 count += (j * orig->data.elem_bits + 7) >> 3;
1567
1568 for (k = 0, m = 0; k != count; ++k) {
1569 if (m == 0) {
1570 sprintf(ax, "%08X>", k);
1571 sprintf(bx, "%08X<", k);
1572 }
1573 f = a[k] == b[k] ? ' ': '*';
1574 sprintf(ax + m * 4 + 9, " %02x%c", a[k], f);
1575 av[m] = isprint(a[k]) ? a[k] : '.';
1576 sprintf(bx + m * 4 + 9, " %02x%c", b[k], f);
1577 bv[m] = isprint(b[k]) ? b[k] : '.';
1578 m++;
1579 if(m == 16 || k == count - 1) {
1580 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%-73s '%.*s'\n%-73s '%.*s'\n\n", ax, m, av, bx, m, bv));
1581 m = 0;
1582 }
1583 }
1584 }
1585 }
1586 }
1587 vblob_release(test, NULL);
1588 if (rc)
1589 break; }
1590 else
1591 break;
1592 PageMapIteratorAdvance(&oi, 1);
1593 }
1594 vblob_release(orig, NULL);
1595 }
1596 return rc;
1597 }
1598 #endif
1599
1600 static
VFunctionProdCallCompare(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1601 rc_t VFunctionProdCallCompare( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
1602 VBlob *orig;
1603 rc_t rc;
1604 VProduction const *orig_prod;
1605
1606 *vblob = NULL;
1607 assert(VectorLength(&self->parms) == 2);
1608 orig_prod = (const VProduction *)VectorGet(&self->parms, 0);
1609 rc = VProductionReadBlob(orig_prod, &orig, &id, cnt, NULL);
1610 if (rc == 0) {
1611 VBlob *test;
1612 const VProduction *test_prod = VectorGet(&self->parms, 1);
1613
1614 rc = VProductionReadBlob(test_prod, &test, &id, cnt, NULL);
1615 if (rc == 0) {
1616 if (orig->data.elem_bits != test->data.elem_bits || orig->byte_order != test->byte_order){
1617 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1618 } else if( orig->pm->data_recs != 1 /*** catching static **/
1619 || test->pm->data_recs != 1 /*** trying quick comparison **/
1620 || orig->data.elem_count != test->data.elem_count
1621 || memcmp( orig->data.base, test->data.base, (orig->data.elem_bits*orig->data.elem_count+7)/8)){
1622 uint64_t i;
1623 PageMapIterator oi;
1624 PageMapIterator ti;
1625 VRowData orig_data;
1626 VRowData test_data;
1627
1628 memset(&orig_data, 0, sizeof(orig_data));
1629 orig_data.u.data.base = orig->data.base;
1630 orig_data.u.data.elem_bits = orig->data.elem_bits;
1631
1632 memset(&test_data, 0, sizeof(test_data));
1633 test_data.u.data.base = test->data.base;
1634 test_data.u.data.elem_bits = test->data.elem_bits;
1635
1636 PageMapNewIterator(orig->pm, &oi, 0, -1);
1637 PageMapNewIterator(test->pm, &ti, 0, -1);
1638 if (test->start_id < orig->start_id) {
1639 if ( !PageMapIteratorAdvance( &ti, (uint32_t)( orig->start_id - test->start_id ) ) ) {
1640 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1641 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map mismatch at row %li\n", self->dad.name, id));
1642 }
1643 }
1644
1645 for (i = orig->start_id; rc == 0; ) {
1646 uint32_t elem_count;
1647 uint64_t prev_i = i;
1648
1649 orig_data.u.data.first_elem = (orig->data.bit_offset / orig->data.elem_bits) + PageMapIteratorDataOffset(&oi);
1650 test_data.u.data.first_elem = (test->data.bit_offset / test->data.elem_bits) + PageMapIteratorDataOffset(&ti);
1651
1652 for (elem_count = 0; ; ) {
1653 bool done = false;
1654 uint32_t j;
1655 uint32_t k;
1656
1657 j = PageMapIteratorDataLength(&oi);
1658 k = PageMapIteratorDataLength(&ti);
1659 if (j != k) {
1660 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1661 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: length mismatch at row %li ( original=%u, test=%u )\n", self->dad.name, i, j, k));
1662 break;
1663 }
1664 elem_count += j;
1665
1666 j = PageMapIteratorRepeatCount(&oi);
1667 k = PageMapIteratorRepeatCount(&ti);
1668 if (j != k) {
1669 done = true;
1670 if (j > k)
1671 j = k;
1672 }
1673 if (PageMapIteratorAdvance(&ti, j) != PageMapIteratorAdvance(&oi, j)) {
1674 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1675 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map corrupt at row %li\n", self->dad.name, i));
1676 break;
1677 }
1678 i += j;
1679 if ( done || (int64_t)i > orig->stop_id || test->pm->random_access || orig->pm->random_access || !PageMapIteratorAdvance( &ti, 0 ) )
1680 break;
1681 }
1682 if (rc)
1683 break;
1684 if ( (int64_t)i > ( orig->stop_id + 1 ) ) {
1685 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1686 (void)prev_i; /* shut up warning when not printing debug msg */
1687 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map has too many rows at row %li\n", self->dad.name, prev_i));
1688 break;
1689 }
1690
1691 orig_data.u.data.elem_count = test_data.u.data.elem_count = elem_count;
1692
1693 rc = self->u.cf(self->fself, &orig_data, &test_data);
1694 if (rc) {
1695 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1696 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: data mismatch at row %li\n", self->dad.name, prev_i));
1697 break;
1698 }
1699 if ((int64_t)i > orig->stop_id )
1700 break;
1701
1702 /* check to see if the test iterator is at end
1703 * and if so, fetch next blob */
1704 if (!PageMapIteratorAdvance(&ti, 0)) {
1705 VBlob *temp;
1706 int64_t row = i;
1707 rc = VProductionReadBlob(test_prod, &temp, &row, orig->stop_id - row, NULL);
1708 if (rc == 0) {
1709 vblob_release(test, NULL);
1710 test = temp;
1711 test_data.u.data.base = test->data.base;
1712 PageMapNewIterator(test->pm, &ti, 0, -1);
1713 if ( test->start_id < row ) {
1714 if ( !PageMapIteratorAdvance( &ti, (uint32_t)( row - test->start_id ) ) ) {
1715 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1716 DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map mismatch at row %li\n", self->dad.name, row));
1717 }
1718 }
1719 }
1720 }
1721 }
1722 }
1723 vblob_release(test, NULL);
1724 }
1725 vblob_release(orig, NULL);
1726 }
1727 return rc;
1728 }
1729
1730 typedef struct fetch_param_blob_data fetch_param_blob_data;
1731 struct fetch_param_blob_data
1732 {
1733 int64_t id;
1734 uint32_t cnt;
1735 int64_t range_start_id;
1736 int64_t range_stop_id;
1737 Vector *inputs;
1738 VBlob *vblob;
1739 rc_t rc;
1740 bool no_cache;
1741 };
1742
fetch_param_blob_data_init(fetch_param_blob_data * pb,int64_t id,uint32_t cnt,Vector * inputs)1743 static void fetch_param_blob_data_init(fetch_param_blob_data *pb,int64_t id,uint32_t cnt,Vector *inputs)
1744 {
1745 pb->id = id;
1746 pb->cnt = cnt;
1747 pb->inputs = inputs;
1748 pb->rc = 0;
1749 pb->vblob = NULL;
1750 pb->range_start_id=INT64_MIN;
1751 pb->range_stop_id =INT64_MAX;
1752 pb->no_cache = false;
1753 }
1754 static
fetch_param_blob(void * item,void * data)1755 bool CC fetch_param_blob ( void *item, void *data )
1756 {
1757 fetch_param_blob_data *pb = data;
1758 VBlob *blob;
1759
1760 pb -> rc = VProductionReadBlob ( item, & blob, & pb -> id , pb -> cnt, NULL);
1761 if ( pb -> rc == 0 )
1762 {
1763 pb -> rc = VectorAppend ( pb -> inputs, NULL, blob );
1764 if ( pb -> rc == 0 ) {
1765 pb->no_cache |= blob->no_cache;
1766 if(blob->start_id > pb->range_start_id) pb->range_start_id=blob->start_id;
1767 if(blob->stop_id < pb->range_stop_id) pb->range_stop_id =blob->stop_id;
1768 return false;
1769 }
1770 vblob_release ( blob, NULL );
1771 }
1772
1773 return true;
1774 }
1775
1776 static
fetch_first_param_blob(void * item,void * data)1777 bool CC fetch_first_param_blob ( void *item, void *data )
1778 {
1779 fetch_param_blob_data *pb = data;
1780
1781 pb -> rc = VProductionReadBlob ( item, &pb->vblob, &pb -> id , pb -> cnt, NULL);
1782 if (GetRCState(pb->rc) == rcNotFound)
1783 return false;
1784 if ( pb -> vblob -> data.elem_count == 0 )
1785 return false;
1786 pb->range_start_id=pb->vblob->start_id;
1787 pb->range_stop_id =pb->vblob->stop_id;
1788 return true;
1789 }
1790
1791 static
VFunctionProdSelect(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1792 rc_t VFunctionProdSelect ( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
1793 fetch_param_blob_data pb;
1794 fetch_param_blob_data_init(&pb,id,cnt,NULL);
1795 VectorDoUntil ( & self -> parms, false, fetch_first_param_blob, & pb );
1796 * vblob = pb.vblob;
1797 return pb.rc;
1798 }
1799
1800 static
VFunctionProdPassThrough(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1801 rc_t VFunctionProdPassThrough ( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
1802 assert(VectorLength(&self->parms) == 1);
1803 return VProductionReadBlob(VectorGet(&self->parms, 0), vblob, &id, cnt, NULL);
1804 }
1805
VFunctionProdReadNormal(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1806 static rc_t VFunctionProdReadNormal ( VFunctionProd *self, VBlob **vblob, int64_t id ,uint32_t cnt)
1807 {
1808 rc_t rc;
1809 Vector inputs;
1810 fetch_param_blob_data pb;
1811 VBlob *vb=NULL;
1812 int64_t id_run;
1813 int64_t cnt_run;
1814
1815 /* fill out information for function to use */
1816 const VCursor *curs = self -> curs;
1817 VXformInfo info;
1818
1819 if(cnt == 0) cnt = 1;
1820
1821 #if VMGR_PASSED_TO_XFORM
1822 info . mgr = curs -> tbl -> mgr;
1823 #endif
1824 #if VSCHEMA_PASSED_TO_XFORM
1825 info . schema = curs -> schema;
1826 #endif
1827 #if VTABLE_PASSED_TO_XFORM
1828 info . tbl = VCursorGetTable ( curs );
1829 #endif
1830 #if VPRODUCTION_PASSED_TO_XFORM
1831 info . prod = & self -> dad;
1832 #endif
1833 info . fdesc . fd = self -> dad . fd;
1834 info . fdesc . desc = self -> dad . desc;
1835 *vblob = NULL;
1836
1837 if (self->dad.sub == prodFuncBuiltInCompare) {
1838 rc = VFunctionProdCallCompare(self, vblob, id, cnt);
1839 #if _DEBUGGING
1840 if (rc != 0)
1841 rc = VFunctionProdCallCompare1(self, vblob, id, cnt);
1842 #endif
1843 return rc;
1844 }
1845
1846 /* all other functions take some form of blob input */
1847 VectorInit ( & inputs, 0, VectorLength ( & self -> parms ) );
1848 fetch_param_blob_data_init(&pb,id,cnt,&inputs);
1849 if ( VectorDoUntil ( & self -> parms, false, fetch_param_blob, & pb ) )
1850 rc = pb . rc;
1851 else for( id_run=id, cnt_run=cnt, rc=0; cnt_run > 0 && rc==0;)
1852 {
1853 switch ( self -> dad . sub )
1854 {
1855 case vftLegacyBlob:
1856 rc = VFunctionProdCallLegacyBlobFunc ( self, &vb, id_run, & info, & inputs );
1857 break;
1858 case vftNonDetRow:
1859 rc = VFunctionProdCallNDRowFunc ( self, &vb, id_run, & info, & inputs );
1860 break;
1861 case vftRow:
1862 case vftRowFast:
1863 case vftIdDepRow:
1864 rc = VFunctionProdCallRowFunc ( self, &vb, id_run, cnt_run, & info, & inputs, pb.range_start_id,pb.range_stop_id );
1865 break;
1866 case vftArray:
1867 rc = VFunctionProdCallArrayFunc ( self, &vb, id_run, & info, & inputs );
1868 break;
1869 case vftFixedRow:
1870 rc = VFunctionProdCallPageFunc ( self, &vb, id_run, & info, & inputs );
1871 break;
1872 case vftBlob:
1873 rc = VFunctionProdCallBlobFunc ( self, &vb, id_run, & info, & inputs );
1874 break;
1875 case vftBlobN:
1876 rc = VFunctionProdCallBlobNFunc ( self, &vb, id_run, & info, & inputs );
1877 break;
1878 case prodFuncByteswap:
1879 rc = VFunctionProdCallByteswap ( self, &vb, id_run, & info, & inputs );
1880 break;
1881 default:
1882 rc = RC ( rcVDB, rcFunction, rcReading, rcProduction, rcCorrupt );
1883 }
1884 if (rc == 0) {
1885 if (vb == NULL) {
1886 rc = RC ( rcVDB, rcFunction, rcReading, rcProduction, rcNull );
1887 }
1888 else {
1889 if (vb -> start_id > id_run || vb -> stop_id < id_run) { /*** shoudn't happen ***/
1890 rc = RC ( rcVDB, rcBlob, rcReading, rcRange, rcInsufficient );
1891 }
1892 if (*vblob == NULL) {
1893 *vblob=vb;
1894 }
1895 else {
1896 if (vb -> start_id <= id) {/** new blob is not appendable, but can replace the current one **/
1897 vblob_release(*vblob, NULL);
1898 *vblob = vb;
1899 }
1900 else {
1901 /*** append here **/
1902 rc = VBlobAppend(*vblob, vb);
1903 vblob_release(vb, NULL);
1904 }
1905 }
1906 /* propagate dirty flag */
1907 (*vblob)->no_cache |= pb.no_cache;
1908 if( (*vblob) -> stop_id >= id + cnt - 1)
1909 break;
1910
1911 id_run = (*vblob) -> stop_id + 1;
1912 cnt_run = id + cnt - id_run;
1913
1914 }
1915 }
1916 else if (id < id_run) {
1917 /* if there is reblobbing and our result blob already contains some
1918 * data, then return that data and not error out */
1919 rc = 0;
1920 break;
1921 }
1922 }
1923 /* drop input blobs */
1924 VectorWhack ( & inputs, vblob_release, NULL );
1925 return rc;
1926 }
1927
VFunctionProdRead(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1928 rc_t VFunctionProdRead ( VFunctionProd *self, VBlob **vblob, int64_t id , uint32_t cnt)
1929 {
1930 if ( self -> dad . sub == vftSelect )
1931 return VFunctionProdSelect ( self, vblob, id , cnt);
1932 if ( self -> dad . sub == vftPassThrough )
1933 return VFunctionProdPassThrough ( self, vblob, id , cnt);
1934 return VFunctionProdReadNormal(self, vblob, id, cnt);
1935 }
1936
1937 typedef struct fetch_param_IdRange_data fetch_param_IdRange_data;
1938 struct fetch_param_IdRange_data
1939 {
1940 int64_t first;
1941 int64_t last;
1942 rc_t rc;
1943 bool first_time;
1944 };
1945
1946 static
fetch_param_IdRange(void * item,void * data)1947 bool CC fetch_param_IdRange ( void *item, void *data )
1948 {
1949 fetch_param_IdRange_data *pb = data;
1950 int64_t first;
1951 int64_t last;
1952 rc_t rc;
1953
1954 rc = VProductionColumnIdRange(item, &first, &last);
1955 if (GetRCState(rc) == rcEmpty && GetRCObject(rc) == rcRange)
1956 return false;
1957
1958 pb->rc = rc;
1959 if (rc == 0 )
1960 {
1961 if (pb->first_time || first < pb->first)
1962 pb->first = first;
1963 if (pb->first_time || last > pb->last)
1964 pb->last = last;
1965 pb->first_time = false;
1966 return false;
1967 }
1968
1969 return true;
1970 }
1971
VFunctionProdColumnIdRange(const VFunctionProd * self,int64_t * first,int64_t * last)1972 LIB_EXPORT rc_t CC VFunctionProdColumnIdRange ( const VFunctionProd *self, int64_t *first, int64_t *last )
1973 {
1974 fetch_param_IdRange_data pb;
1975
1976 pb.first_time = true;
1977 pb . rc = 0;
1978 pb.first = 1;
1979 pb.last = 0;
1980
1981 VectorDoUntil ( & self -> parms, false, fetch_param_IdRange, & pb );
1982
1983 if (pb.rc == 0) {
1984 #if 0
1985 /* this causes problems in the loaders */
1986 if(pb.first_time){ /** no parameters - some function which generated data; f.e. meta_value() ***/
1987 pb.last = INT64_MAX;
1988 }
1989 #endif
1990 *first = pb.first;
1991 *last = pb.last;
1992 }
1993 return pb . rc;
1994 }
1995
1996 typedef struct fetch_param_FixedRowLength_data fetch_param_FixedRowLength_data;
1997 struct fetch_param_FixedRowLength_data
1998 {
1999 uint32_t length;
2000 int64_t row_id;
2001 bool first_time;
2002 };
2003
2004 static
fetch_param_FixedRowLength(void * item,void * data)2005 bool CC fetch_param_FixedRowLength ( void *item, void *data )
2006 {
2007 fetch_param_FixedRowLength_data *pb = data;
2008 uint32_t length;
2009
2010 if (((const VProduction *)item)->control == false) {
2011 length = VProductionFixedRowLength(item, pb->row_id, false);
2012
2013 if (pb->first_time)
2014 pb->length = length;
2015
2016 pb->first_time = false;
2017
2018 if (length == 0 || length != pb->length)
2019 return true;
2020 }
2021 return false;
2022 }
2023
2024 static
VFunctionProdFixedRowLength(const VFunctionProd * self,int64_t row_id,bool ignore_self)2025 uint32_t VFunctionProdFixedRowLength ( const VFunctionProd *self, int64_t row_id, bool ignore_self )
2026 {
2027 fetch_param_FixedRowLength_data pb;
2028
2029 if ( ! ignore_self )
2030 {
2031 switch ( self -> dad . sub )
2032 {
2033 case vftRow:
2034 case vftRowFast:
2035 case vftNonDetRow:
2036 case vftIdDepRow:
2037 return 0;
2038 }
2039 }
2040
2041 pb.first_time = true;
2042 pb.length = 0;
2043
2044 VectorDoUntil ( & self -> parms, false, fetch_param_FixedRowLength, & pb );
2045
2046 return pb.length;
2047 }
2048
2049
2050 /*--------------------------------------------------------------------------
2051 * VScriptProd
2052 */
2053
VScriptProdMake(VScriptProd ** prodp,Vector * owned,struct VCursor const * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,uint8_t chain)2054 rc_t VScriptProdMake ( VScriptProd **prodp, Vector *owned, struct VCursor const *curs,
2055 int sub, const char *name, const VFormatdecl *fd,
2056 const VTypedesc *desc, uint8_t chain )
2057 {
2058 VScriptProd *prod;
2059 rc_t rc = VProductionMake ( ( VProduction** ) prodp, owned, sizeof * prod,
2060 prodScript, sub, name, fd, desc, NULL, chain );
2061 if ( rc == 0 )
2062 {
2063 prod = * prodp;
2064 prod -> curs = curs;
2065 VectorInit ( & prod -> owned, 0, 4 );
2066 }
2067 return rc;
2068 }
2069
VScriptProdDestroy(VScriptProd * self)2070 void VScriptProdDestroy ( VScriptProd *self )
2071 {
2072 VectorWhack ( & self -> owned, VProductionWhack, NULL );
2073 }
2074
2075
2076 /* Read
2077 */
VScriptProdRead(VScriptProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)2078 rc_t VScriptProdRead ( VScriptProd *self, VBlob **vblob, int64_t id,uint32_t cnt )
2079 {
2080 return VProductionReadBlob ( self -> rtn, vblob, &id , cnt, NULL);
2081 }
2082
VScriptProdColumnIdRange(const VScriptProd * self,int64_t * first,int64_t * last)2083 static rc_t VScriptProdColumnIdRange ( const VScriptProd *self, int64_t *first, int64_t *last )
2084 {
2085 return VProductionColumnIdRange(self->rtn, first, last);
2086 }
2087
VScriptProdFixedRowLength(const VScriptProd * self,int64_t row_id)2088 static uint32_t VScriptProdFixedRowLength ( const VScriptProd *self, int64_t row_id )
2089 {
2090 return VProductionFixedRowLength(self->rtn, row_id, false);
2091 }
2092
2093 /*--------------------------------------------------------------------------
2094 * VPivotProd
2095 * potentially pivots to a new row-id space
2096 */
2097
VPivotProdMake(VPivotProd ** p_prodp,Vector * p_owned,VProduction * p_member,VProduction * p_row_id,const char * p_name,int p_chain)2098 rc_t VPivotProdMake ( VPivotProd ** p_prodp,
2099 Vector * p_owned,
2100 VProduction * p_member,
2101 VProduction * p_row_id,
2102 const char * p_name,
2103 int p_chain )
2104 {
2105 VPivotProd * prod;
2106 VFormatdecl fd = { { 0, 0 }, 0 };
2107 VTypedesc desc = { 64, 1, vtdInt };
2108 rc_t rc = VProductionMake ( ( VProduction** ) p_prodp, p_owned, sizeof * prod,
2109 prodPivot, 0, p_name, & fd, & desc, NULL, p_chain );
2110 if ( rc == 0 )
2111 {
2112 prod = * p_prodp;
2113 prod -> member = p_member;
2114 prod -> row_id = p_row_id;
2115 }
2116 return rc;
2117 }
2118
VPivotProdDestroy(VPivotProd * p_self)2119 void VPivotProdDestroy ( VPivotProd * p_self )
2120 {
2121 }
2122
2123 /* Read
2124 */
VPivotProdRead(VPivotProd * p_self,struct VBlob ** p_vblob,int64_t * p_id,uint32_t p_cnt)2125 rc_t VPivotProdRead ( VPivotProd * p_self, struct VBlob ** p_vblob, int64_t * p_id, uint32_t p_cnt )
2126 {
2127 struct VBlob * rowIdBlob;
2128 rc_t rc;
2129 assert ( p_id != NULL );
2130 rc = VProductionReadBlob ( p_self -> row_id, & rowIdBlob, p_id , p_cnt, NULL);
2131 if ( rc == 0 )
2132 {
2133 uint32_t elemNum;
2134 uint32_t repeat_count;
2135 uint32_t rowLen = PageMapGetIdxRowInfo ( rowIdBlob -> pm, ( uint32_t ) ( * p_id - rowIdBlob -> start_id ), & elemNum, & repeat_count );
2136 /* assume elem size is 64 */
2137 int64_t newRowId = * ( ( int64_t* ) rowIdBlob -> data . base + elemNum );
2138
2139 assert ( rowLen == 1);
2140 assert ( repeat_count == 1);
2141
2142 vblob_release ( rowIdBlob, NULL );
2143
2144 rc = VProductionReadBlob ( p_self -> member, p_vblob, & newRowId, p_cnt, NULL);
2145 if ( rc == 0 )
2146 {
2147 /* the cache mechanism will get confused by our overwriting p_id, so turn it off */
2148 ( * p_vblob ) -> no_cache = true;
2149 * p_id = newRowId;
2150 }
2151 }
2152 return rc;
2153 }
2154
2155 /*--------------------------------------------------------------------------
2156 * VProduction
2157 */
2158
2159 /* Init
2160 * parent initialization function
2161 * called from the "Make" functions below
2162 */
2163 #if 0
2164 static
2165 void VProductionInit ( VProduction *self, int var, int sub, const char *name,
2166 const VFormatdecl *fd, const VTypedesc *desc,
2167 const VCtxId *cid, uint8_t chain )
2168 {
2169 memset ( self, 0, sizeof * self );
2170
2171 if ( fd != NULL )
2172 self -> fd = * fd;
2173 if ( desc != NULL )
2174 self -> desc = * desc;
2175 if ( cid != NULL )
2176 self -> cid = * cid;
2177
2178 self -> var = ( uint8_t ) var;
2179 self -> sub = ( uint8_t ) sub;
2180 self -> chain = chain;
2181 }
2182 #endif
2183
VProductionWhack(void * item,void * owned)2184 void CC VProductionWhack ( void *item, void *owned )
2185 {
2186 VProduction * self = item;
2187
2188 if ( self != NULL )
2189 {
2190 if ( owned != NULL)
2191 {
2192 void *ignore;
2193 VectorSwap ( owned, self -> oid, NULL, & ignore );
2194 assert ( ( void* ) self == ignore );
2195 }
2196
2197
2198 #if PROD_CACHE
2199 VProductionFlushCacheDeep ( self, "whack" );
2200 #endif
2201 switch ( self -> var )
2202 {
2203 case prodSimple:
2204 #if TRACKING_BLOBS
2205 fprintf( stderr, "VSimpleProd %p being whacked *** %s\n", self, self->name );
2206 #endif
2207 VSimpleProdDestroy ( ( VSimpleProd* ) self );
2208 break;
2209
2210 case prodFunc:
2211 #if TRACKING_BLOBS
2212 fprintf( stderr, "VFunctionProd %p being whacked *** %s\n", self, self->name );
2213 #endif
2214 VFunctionProdDestroy ( ( VFunctionProd* ) self );
2215 break;
2216
2217 case prodScript:
2218 #if TRACKING_BLOBS
2219 fprintf( stderr, "VScriptProd %p being whacked *** %s\n", self, self->name );
2220 #endif
2221 VScriptProdDestroy ( ( VScriptProd* ) self );
2222 break;
2223
2224 case prodPhysical:
2225 #if TRACKING_BLOBS
2226 fprintf( stderr, "VPhysicalProd %p being whacked *** %s\n", self, self->name );
2227 #endif
2228 VPhysicalProdDestroy ( ( VPhysicalProd* ) self );
2229 break;
2230
2231 case prodColumn:
2232 #if TRACKING_BLOBS
2233 fprintf( stderr, "VColumnProd %p being whacked *** %s\n", self, self->name );
2234 #endif
2235 VColumnProdDestroy ( ( VColumnProd* ) self );
2236 break;
2237
2238 case prodPivot:
2239 #if TRACKING_BLOBS
2240 fprintf( stderr, "VPivotProd %p being whacked *** %s\n", self, self->name );
2241 #endif
2242 VPivotProdDestroy ( ( VPivotProd* ) self );
2243 break;
2244 }
2245
2246 free ( self );
2247 }
2248 }
2249
2250 /* Cmp
2251 * Sort
2252 * compare item is a VCtxId
2253 * sort item is a VProduction
2254 * n is always a VProduction
2255 */
VProductionCmp(const void * item,const void * n)2256 LIB_EXPORT int64_t CC VProductionCmp ( const void *item, const void *n )
2257 {
2258 const VCtxId *a = item;
2259 const VProduction *b = n;
2260 return VCtxIdCmp ( a, & b -> cid );
2261 }
2262
VProductionSort(const void * item,const void * n)2263 LIB_EXPORT int64_t CC VProductionSort ( const void *item, const void *n )
2264 {
2265 const VProduction *a = item;
2266 const VProduction *b = n;
2267 return VCtxIdCmp ( & a -> cid, & b -> cid );
2268 }
2269
2270
2271 /* IdRange
2272 * obtains intersection of all physical sources
2273 *
2274 * "first" [ IN/OUT ] and "last" [ IN/OUT ] - range to intersect
2275 */
VProductionColumnIdRange(const VProduction * self,int64_t * first,int64_t * last)2276 rc_t VProductionColumnIdRange ( const VProduction *self,
2277 int64_t *first, int64_t *last )
2278 {
2279 if ( self <= FAILED_PRODUCTION )
2280 return 0;
2281
2282 switch ( self -> var )
2283 {
2284 case prodSimple:
2285 return VProductionColumnIdRange ( ( ( const VSimpleProd* ) self ) -> in, first, last );
2286 case prodFunc:
2287 return VFunctionProdColumnIdRange((const VFunctionProd *)self, first, last);
2288 case prodScript:
2289 return VScriptProdColumnIdRange((const VScriptProd *)self, first, last);
2290 case prodPhysical:
2291 return VPhysicalProdColumnIdRange((const VPhysicalProd *)self, first, last);
2292 case prodPivot:
2293 return VProductionColumnIdRange( ( (const VPivotProd *)self ) -> member, first, last);
2294 case prodColumn:
2295 return RC ( rcVDB, rcColumn, rcAccessing, rcRange, rcEmpty );
2296 }
2297
2298 return RC ( rcVDB, rcColumn, rcAccessing, rcType, rcUnknown );
2299 }
2300
VProductionPageIdRange(VProduction * self,int64_t id,int64_t * first,int64_t * last)2301 rc_t VProductionPageIdRange ( VProduction *self,
2302 int64_t id, int64_t *first, int64_t *last )
2303 {
2304 VBlob *blob;
2305 rc_t rc = VProductionReadBlob ( self, & blob, & id , 1, NULL);
2306 if ( rc == 0 )
2307 {
2308 * first = blob -> start_id;
2309 * last = blob -> stop_id;
2310
2311 vblob_release ( blob, NULL );
2312 }
2313 return rc;
2314 }
2315
2316 /* RowLength
2317 * get row length for a particular row
2318 */
VProductionRowLength(const VProduction * self,int64_t row_id)2319 uint32_t VProductionRowLength ( const VProduction *self, int64_t row_id )
2320 {
2321 uint32_t row_len;
2322
2323 VBlob *blob;
2324 rc_t rc = VProductionReadBlob ( self, & blob, & row_id, 1, NULL );
2325 if ( rc != 0 )
2326 return 0;
2327
2328 row_len = PageMapGetIdxRowInfo ( blob -> pm, (uint32_t)( row_id - blob -> start_id) , NULL, NULL );
2329
2330 vblob_release ( blob, NULL );
2331
2332 return row_len;
2333 }
2334
2335 /* FixedRowLength
2336 * get fixed row length for entire column
2337 * returns 0 if not fixed
2338 */
VProductionFixedRowLength(const VProduction * self,int64_t row_id,bool ignore_self)2339 uint32_t VProductionFixedRowLength ( const VProduction *self, int64_t row_id,bool ignore_self )
2340 {
2341 switch ( self -> var )
2342 {
2343 case prodSimple:
2344 return VProductionFixedRowLength ( ( ( const VSimpleProd* ) self ) -> in, row_id, ignore_self );
2345 case prodFunc:
2346 return VFunctionProdFixedRowLength((const VFunctionProd *)self, row_id, ignore_self);
2347 case prodScript:
2348 return VScriptProdFixedRowLength((const VScriptProd *)self, row_id);
2349 case prodPhysical:
2350 return VPhysicalProdFixedRowLength((const VPhysicalProd *)self, row_id);
2351 case prodPivot:
2352 assert(false); /*TODO*/
2353 }
2354
2355 return RC ( rcVDB, rcColumn, rcAccessing, rcType, rcUnknown );
2356 }
2357
2358 /* ReadBlob
2359 */
VProductionReadBlob(const VProduction * cself,VBlob ** vblob,int64_t * p_id,uint32_t cnt,VBlobMRUCacheCursorContext * cctx)2360 rc_t VProductionReadBlob ( const VProduction *cself, VBlob **vblob, int64_t * p_id, uint32_t cnt, VBlobMRUCacheCursorContext *cctx )
2361 {
2362 #if BYTECODE
2363
2364 rc_t rc;
2365 VProduction *self = ( VProduction* ) cself;
2366 if ( self == NULL )
2367 return RC ( rcVDB, rcProduction, rcReading, rcSelf, rcNull );
2368
2369 struct ByteCodeContext ctx;
2370 ctx . id = * p_id;
2371 ctx . cnt = cnt;
2372 ctx . cctx = cctx;
2373 ctx . result = NULL;
2374 rc = ExecuteByteCode ( bcProductionReadBlob, self, & ctx );
2375 * vblob = ctx . result;
2376 if ( rc == 0 )
2377 {
2378 * p_id = ctx . id;
2379 }
2380 return rc;
2381
2382 #else
2383
2384 rc_t rc;
2385 VProduction *self = ( VProduction* ) cself;
2386
2387 #if PROD_CACHE
2388 int i;
2389 VBlob *blob;
2390 #endif
2391
2392 * vblob = NULL;
2393
2394 /* should not be possible, but safety is cheap */
2395 if ( self == NULL )
2396 return RC ( rcVDB, rcProduction, rcReading, rcSelf, rcNull );
2397
2398 /*** Cursor-level column blobs may be 1-to-1 with production blobs ***/
2399 if(cctx != NULL && self->cctx.cache == NULL ){ /*** we are connected to read cursor **/
2400 self->cctx = *cctx; /*** remember it ***/
2401 /** No need to do anything else here - we are on "direct line" to the column ***/
2402 } else if(self->cctx.cache != NULL){
2403 /** somewhere else this production is connected to a cursor **/
2404 /** lets try to get answers from the cursor **/
2405 blob=(VBlob*) VBlobMRUCacheFind(self->cctx.cache,self->cctx.col_idx,*p_id);
2406 if(blob){
2407 rc = VBlobAddRef ( blob );
2408 if ( rc != 0 ) return rc;
2409 *vblob=blob;
2410 return 0;
2411 }
2412 }
2413
2414 #if PROD_CACHE
2415 /* check cache */
2416 for ( i = 0; i < self -> cache_cnt; ++ i )
2417 {
2418 blob = self -> cache [ i ];
2419 if ( self -> cache [ i ] != NULL )
2420 {
2421 /* check id range */
2422 if (
2423 #if USE_EUGENE
2424 /* NB - this is an approach where we always cache
2425 a blob after a read in order to keep it alive,
2426 but never allow a cache hit on retrieval */
2427 ! blob -> no_cache &&
2428 #endif
2429 * p_id >= blob -> start_id &&
2430 * p_id <= blob -> stop_id )
2431 {
2432 rc = VBlobAddRef ( blob );
2433 if ( rc != 0 )
2434 return rc;
2435 #if TRACKING_BLOBS
2436 fprintf( stderr, "%p->%p(%d) new reference to cached blob *** %s\n"
2437 , self
2438 , blob
2439 , atomic32_read ( & blob -> refcount )
2440 , self->name
2441 );
2442 #endif
2443 /* return new reference */
2444 * vblob = blob;
2445 #if PROD_CACHE > 1
2446 #if PROD_CACHE > 2
2447 /* MRU cache */
2448 if ( i > 0 )
2449 {
2450 memmove(self -> cache +1,self -> cache,i*sizeof(*self->cache));
2451 self -> cache [ 0 ] = blob;
2452 }
2453 #else
2454 if(i > 0 ){ /** trivial case ***/
2455 self -> cache [ 1 ] = self -> cache [ 0 ];
2456 self -> cache [ 0 ] = blob;
2457 }
2458 #endif
2459 #endif
2460 return 0;
2461 }
2462 }
2463 }
2464 #endif /* PROD_CACHE */
2465
2466 /* dispatch */
2467 switch ( self -> var )
2468 {
2469 case prodSimple:
2470 rc = VSimpleProdRead ( ( VSimpleProd* ) self, vblob, p_id, cnt,cctx );
2471 break;
2472 case prodFunc:
2473 rc = VFunctionProdRead ( ( VFunctionProd* ) self, vblob, * p_id , cnt);
2474 #if _DEBUGGING && PROD_NAME
2475 if ( rc != 0 )
2476 DBGMSG ( DBG_VDB, DBG_VDB_FUNCTION, ( "%s: %R\n", self -> name, rc ) );
2477 #endif
2478 break;
2479 case prodScript:
2480 rc = VScriptProdRead ( ( VScriptProd* ) self, vblob, * p_id , cnt);
2481 break;
2482 case prodPhysical:
2483 rc = VPhysicalProdRead ( ( VPhysicalProd* ) self, vblob, * p_id, cnt );
2484 break;
2485 case prodColumn:
2486 rc = VColumnProdRead ( ( VColumnProd* ) self, vblob, * p_id );
2487 break;
2488 case prodPivot:
2489 rc = VPivotProdRead ( ( VPivotProd* ) self, vblob, p_id, cnt );
2490 break;
2491 default:
2492 return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2493 }
2494
2495 #if ! PROD_CACHE
2496 return rc;
2497 #else
2498 blob = * vblob;
2499
2500 if ( rc != 0 || * vblob == NULL )
2501 return rc;
2502
2503 #if ! USE_EUGENE
2504 /* NB - there is another caching mechanism on VColumn
2505 if a blob does not want to be cached, it is rejected here */
2506 if ( ! blob -> no_cache )
2507 return 0;
2508 #endif
2509 if(cctx == NULL && self->cctx.cache != NULL && blob->stop_id > blob->start_id + 4){/** we will benefit from caching here **/
2510 VBlobMRUCacheSave(self->cctx.cache,self->cctx.col_idx,blob);
2511 return 0;
2512 }
2513
2514 if(blob->pm == NULL) return 0;
2515
2516
2517 /* cache output */
2518 rc = VBlobAddRef ( blob );
2519 if ( rc == 0 )
2520 {
2521 VBlobCheckIntegrity ( blob );
2522 #if PROD_CACHE
2523 if(self -> cache_cnt < PROD_CACHE){
2524 #if PROD_CACHE > 1
2525 if(self -> cache_cnt > 0 ){
2526 #if PROD_CACHE > 2
2527 memmove(self -> cache + 1, self -> cache , self -> cache_cnt * sizeof(*self -> cache));
2528 #else
2529 self -> cache[1]=self -> cache[0];
2530 #endif
2531 }
2532 #endif
2533 self -> cache_cnt ++;
2534 } else {
2535 /* release whatever was there previously */
2536 /* drop LRU */
2537 vblob_release ( self -> cache [ self -> cache_cnt - 1 ], NULL );
2538 #if PROD_CACHE > 1
2539 #if PROD_CACHE > 2
2540 memmove(self -> cache + 1, self -> cache , (self -> cache_cnt -1) * sizeof(*self -> cache));
2541 #else
2542 self -> cache[1]=self -> cache[0];
2543 #endif
2544 #endif
2545 }
2546 /* insert a head of list */
2547 self -> cache [ 0 ] = blob;
2548 #endif
2549
2550 #if TRACKING_BLOBS
2551 fprintf( stderr, "%p->%p(%d) cached *** %s\n"
2552 , self
2553 , blob
2554 , atomic32_read ( & blob -> refcount )
2555 , self -> name
2556 );
2557 #endif
2558 }
2559
2560 #if USE_EUGENE
2561 /* this code requires the blob to be cached on the production */
2562 return rc;
2563 #else
2564 /* we don't care if the blob was not cached */
2565 return 0;
2566 #endif
2567
2568 #endif /* PROD_CACHE */
2569
2570 #endif
2571 }
2572
2573 /* IsStatic
2574 * trace all the way to a physical production
2575 */
VProductionIsStatic(const VProduction * self,bool * is_static)2576 rc_t VProductionIsStatic ( const VProduction *self, bool *is_static )
2577 {
2578 rc_t rc;
2579
2580 if ( self == NULL )
2581 rc = RC ( rcVDB, rcColumn, rcAccessing, rcSelf, rcNull );
2582 else
2583 {
2584 for ( rc = 0; self != NULL; )
2585 {
2586 switch ( self -> var )
2587 {
2588 case prodSimple:
2589 self = ( ( const VSimpleProd*) self ) -> in;
2590 break;
2591 case prodFunc:
2592 case prodScript:
2593 {
2594 const VFunctionProd *fp = ( const VFunctionProd* ) self;
2595 uint32_t start = VectorStart ( & fp -> parms );
2596 uint32_t end = VectorLength ( & fp -> parms );
2597 for ( end += start; start < end; ++ start )
2598 {
2599 self = ( const VProduction* ) VectorGet ( & fp -> parms, start );
2600 if ( self != NULL )
2601 {
2602 rc = VProductionIsStatic ( self, is_static );
2603 if ( rc != 0 || * is_static )
2604 break;
2605 }
2606 }
2607 return rc;
2608 }
2609 case prodPhysical:
2610 return VPhysicalIsStatic ( ( ( const VPhysicalProd* ) self ) -> phys, is_static );
2611 case prodColumn:
2612 self = NULL;
2613 break;
2614 case prodPivot:
2615 assert(false); /* TODO */
2616 break;
2617 default:
2618 return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2619 }
2620 }
2621 }
2622
2623 return rc;
2624 }
2625
2626 /* GetKColumn
2627 * drills down to physical production to get a KColumn,
2628 * and if that fails, indicate whether the column is static
2629 */
VProductionGetKColumn(const VProduction * self,struct KColumn ** kcol,bool * is_static)2630 rc_t VProductionGetKColumn ( const VProduction * self, struct KColumn ** kcol, bool * is_static )
2631 {
2632 rc_t rc;
2633
2634 if ( self == NULL )
2635 rc = RC ( rcVDB, rcColumn, rcAccessing, rcSelf, rcNull );
2636 else
2637 {
2638 for ( rc = 0; self != NULL; )
2639 {
2640 switch ( self -> var )
2641 {
2642 case prodSimple:
2643 self = ( ( const VSimpleProd*) self ) -> in;
2644 break;
2645 case prodFunc:
2646 case prodScript:
2647 {
2648 const VFunctionProd *fp = ( const VFunctionProd* ) self;
2649 uint32_t start = VectorStart ( & fp -> parms );
2650 uint32_t end = VectorLength ( & fp -> parms );
2651 for ( end += start; start < end; ++ start )
2652 {
2653 self = ( const VProduction* ) VectorGet ( & fp -> parms, start );
2654 if ( self != NULL )
2655 {
2656 rc = VProductionGetKColumn ( self, kcol, is_static );
2657 if ( rc != 0 || * kcol != NULL || * is_static )
2658 break;
2659 }
2660 }
2661 return rc;
2662 }
2663 case prodPhysical:
2664 return VPhysicalGetKColumn ( ( ( const VPhysicalProd* ) self ) -> phys, kcol, is_static );
2665 case prodColumn:
2666 self = NULL;
2667 break;
2668 case prodPivot:
2669 assert(false); /* TODO */
2670 break;
2671 default:
2672 return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2673 }
2674 }
2675 }
2676
2677 return rc;
2678 }
2679