1 /*===========================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 #define BYTECODE 0
27 
28 #define USE_EUGENE 1
29 
30 
31 #define TRACK_REFERENCES 0
32 
33 #include <vdb/extern.h>
34 
35 #define KONST const
36 #include "prod-priv.h"
37 #include "prod-expr.h"
38 #include "schema-priv.h"
39 #include "schema-parse.h"
40 #include "schema-expr.h"
41 #include "table-priv.h"
42 #include "cursor-priv.h"
43 #include "linker-priv.h"
44 #include "column-priv.h"
45 #include "phys-priv.h"
46 #include "blob-priv.h"
47 #include "blob.h"
48 #include "page-map.h"
49 #include "blob-headers.h"
50 #undef KONST
51 
52 #include <vdb/schema.h>
53 #include <vdb/cursor.h>
54 #include <vdb/xform.h>
55 #include <klib/symbol.h>
56 #include <klib/log.h>
57 #include <klib/debug.h>
58 #include <klib/rc.h>
59 #include <sysalloc.h>
60 
61 #include <ctype.h>
62 #include <os-native.h>
63 #include <stdlib.h>
64 #include <string.h>
65 #include <assert.h>
66 #include <bitstr.h>
67 #include <stdio.h>
68 #include <limits.h>
69 
70 #include "bytecode.h"
71 
72 #if !defined(WINDOWS)  &&  !defined(_WIN32)  &&  !defined(NCBI_WITHOUT_MT)
73 #define LAUNCH_PAGEMAP_THREAD 1
74 #endif
75 
76 
77 /*--------------------------------------------------------------------------
78  * VBlob
79  */
80 
81 
82 static
vblob_release(void * item,void * ignore)83 void CC vblob_release ( void *item, void *ignore )
84 {
85     TRACK_BLOB ( VBlobRelease, ( VBlob* ) item );
86     VBlobRelease ( ( VBlob* ) item );
87 }
88 
89 /*--------------------------------------------------------------------------
90  * VProduction
91  */
92 
93 
94 /* Make
95  *  allocation and parent initialization
96  *  called from the "Make" functions below
97  *
98  *  "prod" [ OUT ] - returned production
99  *
100  *  "size" [ IN ] - sizeof sub-type
101  *
102  *  "owned" [ IN ] - vector to contain productions
103  *
104  *  "var" [ IN ] - variant code, e.g. prodSimple, prodFunc
105  *
106  *  "sub" [ IN ] - sub-variant code, e.g. prodSimplePage2Blob
107  *
108  *  "name" [ IN, NULL OKAY ] - optional object name, derived
109  *  from schema if possible
110  *
111  *  "fd" [ IN ] and "desc" [ IN ] - production type description
112  *
113  *  "cid" [ IN ] - contextual ( two-level ) id
114  *
115  *  "chain" [ IN ] - which chain(s) are supported
116  *    chainEncoding    - when going from input to physical
117  *    chainDecoding    - when pulling from physical to output
118  *    chainUncommitted - when resolving trigger and validation expressions
119  */
VProductionMake(VProduction ** prodp,Vector * owned,size_t size,int var,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,const VCtxId * cid,uint8_t chain)120 rc_t VProductionMake ( VProduction **prodp, Vector *owned, size_t size,
121     int var, int sub, const char *name, const VFormatdecl *fd,
122     const VTypedesc *desc, const VCtxId *cid, uint8_t chain )
123 {
124     rc_t rc;
125     VProduction *prod;
126 
127 #if PROD_NAME
128     size_t psize = size;
129 #endif
130 
131     assert ( size >= sizeof * prod );
132 
133 #if PROD_NAME
134     if ( name != NULL )
135         size += strlen ( name );
136     size += 1;
137 #endif
138 
139     prod = calloc ( 1, size );
140     if ( prod == NULL )
141         rc = RC ( rcVDB, rcProduction, rcResolving, rcMemory, rcExhausted );
142     else
143     {
144         rc = VectorAppend ( owned, & prod -> oid, prod );
145         if ( rc != 0 )
146         {
147             free ( prod );
148             prod = NULL;
149         }
150         else
151         {
152 #if PROD_NAME
153             prod -> name = ( ( const char* ) prod ) + psize;
154             strcpy ( ( char* ) prod -> name, name ? name : "" );
155 #endif
156 
157             if ( fd != NULL )
158                 prod -> fd = * fd;
159             if ( desc != NULL )
160                 prod -> desc = * desc;
161             if ( cid != NULL )
162                 prod -> cid = * cid;
163 
164             prod -> var = ( uint8_t ) var;
165             prod -> sub = ( uint8_t ) sub;
166             prod -> chain = chain;
167         }
168     }
169 
170     * prodp = prod;
171     return rc;
172 }
173 
#if PROD_CACHE
/* FlushCacheDeep
 *  release every blob held in the first "cache_cnt" slots of this
 *  production's cache and reset each slot to NULL
 *
 *  "context" [ IN ] - text inserted into the diagnostic line; only used
 *  when TRACKING_BLOBS is enabled
 */
static
void VProductionFlushCacheDeep ( VProduction *self, const char *context )
{
    int i;
    for ( i = 0; i < self -> cache_cnt; ++ i )
    {
#if TRACKING_BLOBS
        if ( self -> cache [ i ] != NULL )
        {
            /* report the refcount of the entry being dropped, i.e. slot "i";
               "%p" requires void pointers */
            fprintf( stderr, "%p->%p(%d) dropped from cache on %s *** %s\n"
                     , ( void* ) self
                     , ( void* ) self -> cache [ i ]
                     , atomic32_read ( & self -> cache [ i ] -> refcount )
                     , context
                     , self -> name
                );
        }
#endif
        vblob_release ( self -> cache [ i ], NULL );
        self -> cache [ i ] = NULL;
    }
}
#endif
198 
199 
200 /*--------------------------------------------------------------------------
201  * VSimpleProd
202  *  single input param
203  */
204 
205 /* Make
206  */
VSimpleProdMake(VProduction ** prodp,Vector * owned,struct VCursor const * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,const VCtxId * cid,VProduction * in,uint8_t chain)207 rc_t VSimpleProdMake ( VProduction **prodp, Vector *owned, struct VCursor const *curs,
208    int sub, const char *name, const VFormatdecl *fd, const VTypedesc *desc,
209     const VCtxId *cid, VProduction *in, uint8_t chain )
210 {
211     VSimpleProd *prod;
212     rc_t rc = VProductionMake ( prodp, owned, sizeof * prod,
213         prodSimple, sub, name, fd, desc, cid, chain );
214     if ( rc == 0 )
215     {
216         prod = ( VSimpleProd* ) * prodp;
217         prod -> in = in;
218         prod -> curs = curs;
219     }
220     return rc;
221 }
222 
223 static
VSimpleProdPage2Blob(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)224 rc_t VSimpleProdPage2Blob ( VSimpleProd *self, VBlob **rslt, int64_t id ,uint32_t cnt)
225 {
226     return VProductionReadBlob(self->in, rslt, & id, cnt, NULL);
227 }
228 
229 static
VSimpleProdSerial2Blob(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)230 rc_t VSimpleProdSerial2Blob ( VSimpleProd *self, VBlob **rslt, int64_t id, uint32_t cnt )
231 {
232     /* read serialized blob */
233     VBlob *sblob;
234     rc_t rc = VProductionReadBlob ( self -> in, &sblob, & id, cnt, NULL );
235     if ( rc == 0 )
236     {
237         /* recast data to 8 bit */
238         KDataBuffer buffer;
239         rc = KDataBufferCast ( & sblob -> data, & buffer, 8, false );
240         if (rc == 0)
241         {
242             /* create a new, fluffy blob having rowmap and headers */
243             VBlob *y;
244 #if LAUNCH_PAGEMAP_THREAD
245             VCursorLaunchPagemapThread ( (VCursor*) self -> curs );
246 #endif
247 
248             rc = VBlobCreateFromData ( & y, sblob -> start_id, sblob -> stop_id,
249                 & buffer, VTypedescSizeof ( & self -> dad . desc ), VCursorPageMapProcessRequest( self -> curs ) );
250             KDataBufferWhack ( & buffer );
251 
252             /* return on success */
253             if ( rc == 0 )
254                 * rslt = y;
255         }
256 
257 	vblob_release(sblob, NULL);
258     }
259 
260     return rc;
261 }
262 
263 
264 static
VSimpleProdBlob2Serial(VSimpleProd * self,VBlob ** rslt,int64_t id,uint32_t cnt)265 rc_t VSimpleProdBlob2Serial( VSimpleProd *self, VBlob **rslt, int64_t id, uint32_t cnt )
266 {
267     rc_t rc;
268     VBlob *sblob;
269 
270     rc = VProductionReadBlob(self->in, &sblob, & id, cnt, NULL);
271     if (rc == 0) {
272         VBlob *y;
273 
274         rc = VBlobNew(&y, sblob->start_id, sblob->stop_id, "blob2serial");
275         TRACK_BLOB (VBlobNew, y);
276         if (rc == 0) {
277             rc = KDataBufferMakeBytes(&y->data, 0);
278             if (rc == 0) {
279                 /* save a reference to the page map so that fixed row-length can be determined */
280                 y->pm = sblob->pm;
281                 PageMapAddRef(y->pm);
282 
283                 rc = VBlobSerialize(sblob, &y->data);
284                 if (rc == 0)
285                     * rslt = y;
286             }
287             if (rc)
288 	      vblob_release(y, NULL);
289         }
290 
291 	vblob_release(sblob, NULL);
292     }
293     return rc;
294 }
295 
296 /* Read
297  *  return a blob containing row id
298  */
VSimpleProdRead(VSimpleProd * self,VBlob ** vblob,int64_t * id,uint32_t cnt,VBlobMRUCacheCursorContext * cctx)299 rc_t VSimpleProdRead ( VSimpleProd *self, VBlob **vblob, int64_t * id, uint32_t cnt, VBlobMRUCacheCursorContext *cctx)
300 {
301     rc_t rc;
302 
303     switch ( self -> dad . sub )
304     {
305     case prodSimpleCast:
306         rc = VProductionReadBlob ( self -> in, vblob, id, cnt, cctx ); /* *id may change here */
307         break;
308     case prodSimplePage2Blob:
309         return VSimpleProdPage2Blob(self, vblob, * id, cnt);
310     case prodSimpleSerial2Blob:
311         return VSimpleProdSerial2Blob(self, vblob, * id, cnt);
312     case prodSimpleBlob2Serial:
313         return VSimpleProdBlob2Serial(self, vblob, * id, cnt);
314     default:
315         * vblob = NULL;
316         return RC ( rcVDB, rcProduction, rcReading, rcProduction, rcCorrupt );
317     }
318 
319     if ( rc == 0 )
320     {
321         VBlob *blob = * vblob;
322 
323         /* force data buffer to reflect element size */
324         if ( self -> dad . fd . fmt == 0 &&
325              self -> dad . fd . td . type_id > 2 )
326         {
327             uint32_t elem_bits = VTypedescSizeof ( & self -> dad . desc );
328             if ( elem_bits != 0 && blob -> data . elem_bits != elem_bits )
329             {
330                 rc = KDataBufferCast ( & blob -> data, & blob -> data, elem_bits, true );
331                 if ( rc != 0 )
332                 {
333                     vblob_release ( blob, NULL );
334                     * vblob = NULL;
335                 }
336             }
337         }
338     }
339 
340     return rc;
341 }
342 
343 
344 /*--------------------------------------------------------------------------
345  * VFunctionProd
346  *  function input params are VProduction*
347  *  extern C function pointer and self object
348  */
349 
VFunctionProdMake(VFunctionProd ** prodp,Vector * owned,const VCursor * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,uint8_t chain)350 rc_t VFunctionProdMake ( VFunctionProd **prodp, Vector *owned,
351     const VCursor *curs, int sub, const char *name,
352     const VFormatdecl *fd, const VTypedesc *desc, uint8_t chain )
353 {
354     VFunctionProd *prod;
355     rc_t rc = VProductionMake ( ( VProduction** ) prodp, owned,
356         sizeof * prod, prodFunc, sub, name, fd, desc, NULL, chain );
357     if ( rc == 0 )
358     {
359         prod = * prodp;
360         prod -> curs = curs;
361 
362         if ( sub != prodFuncByteswap )
363             VectorInit ( & prod -> parms, 0, 4 );
364         else
365         {
366             SDatatype *dt = VSchemaFindTypeid ( VCursorGetSchema ( curs ), fd -> td . type_id );
367             assert ( dt != NULL );
368             prod -> u . bswap = dt -> byte_swap;
369             VectorInit ( & prod -> parms, 0, 1 );
370         }
371     }
372     return rc;
373 }
374 
/* Destroy
 *  tears down the parameter vector ( no per-element callback is supplied )
 *  and, when a "whack" callback is installed, invokes it on "fself"
 */
void VFunctionProdDestroy ( VFunctionProd *self )
{
    /* release input parameters */
    VectorWhack ( & self -> parms, NULL, NULL );

    if ( self -> whack == NULL )
        return;

    ( * self -> whack ) ( self -> fself );
}
382 
383 
384 
/* Read
 */

/* VECTOR_ALLOC_ARRAY
 *  points ARRAY at zero-filled storage for ARGC elements
 *
 *  S_ARRAY is a caller-declared array used when it is large enough;
 *  otherwise the storage comes from malloc and H_ARRAY receives it.
 *  H_ARRAY is NULL when S_ARRAY was used, so the caller may pass it to
 *  free() unconditionally once done.
 *
 *  NB - on malloc failure this macro executes a "return" of a
 *  memory-exhausted rc from the ENCLOSING function; anything the caller
 *  acquired before invoking the macro is not released by it.
 */
#define VECTOR_ALLOC_ARRAY( ARGC, ARRAY, S_ARRAY, H_ARRAY )	\
    do { \
        size_t const needed = (ARGC) * sizeof((ARRAY)[0]); \
        (H_ARRAY) = NULL; \
        (ARRAY) = &((S_ARRAY)[0]); \
        if (needed > sizeof((S_ARRAY))) { \
            (H_ARRAY) = malloc(needed); \
            if ((H_ARRAY) == NULL) \
                return RC(rcVDB, rcProduction, rcReading, rcMemory, rcExhausted); \
            (ARRAY) = &((H_ARRAY)[0]); \
        } \
        memset((ARRAY), 0, needed); \
    } while (0)

/* VECTOR_COPY_TO_ARRAY
 *  copies every element of VECTOR into ARRAY
 *
 *  elements are fetched and stored under the same index, which runs from
 *  VectorStart ( VECTOR ) for VectorLength ( VECTOR ) entries; the store
 *  index is NOT rebased to zero
 */
#define VECTOR_COPY_TO_ARRAY( VECTOR, ARRAY )	\
    do { \
        int i, n; \
        for (n = (i = VectorStart((VECTOR))) + VectorLength((VECTOR)); i != n; ++i) \
            (ARRAY)[i] = VectorGet((VECTOR), i); \
    } while (0)

/* VECTOR_TO_ARRAY
 *  VECTOR_ALLOC_ARRAY followed by VECTOR_COPY_TO_ARRAY; inherits the
 *  early "return" on allocation failure from VECTOR_ALLOC_ARRAY
 */
#define VECTOR_TO_ARRAY( ARGC, ARRAY, S_ARRAY, H_ARRAY, VECTOR )	\
    do { \
        VECTOR_ALLOC_ARRAY((ARGC), (ARRAY), (S_ARRAY), (H_ARRAY)); \
        VECTOR_COPY_TO_ARRAY((VECTOR), (ARRAY)); \
    } while (0)
414 
415 static
VFunctionProdCallNDRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,const VXformInfo * info,Vector * args)416 rc_t VFunctionProdCallNDRowFunc(
417                                 VFunctionProd *self,
418                                 VBlob **prslt,
419                                 int64_t row_id,
420                                 const VXformInfo *info,
421                                 Vector *args
422                                 )
423 {
424     rc_t rc;
425 
426     /* create output blob
427        TBD - try to used cached blob if available */
428 #if PROD_NAME
429     rc = VBlobNew ( prslt, row_id, row_id, self->dad.name );
430 #else
431     rc = VBlobNew ( prslt, row_id, row_id, "VFunctionProdCallNDRowFunc" );
432 #endif
433     TRACK_BLOB ( VBlobNew, *prslt );
434     if ( rc == 0 )
435     {
436         VRowResult rslt;
437         VRowData on_stack [ 16 ], *on_heap, *argv;
438 
439         VBlob *blob = * prslt;
440         uint32_t i, argc = VectorLength ( args );
441 
442         /* create and populate array of input parameters */
443         VECTOR_ALLOC_ARRAY(argc, argv, on_stack, on_heap);
444         for ( i = 0; i < argc; ++ i )
445         {
446             const VBlob *in = VectorGet(args, i);
447             uint32_t first_elem;
448 
449             /* always point to page base address */
450             argv [ i ] . u . data . base = in -> data . base;
451 
452             /* get row length and starting element in one pass */
453             argv [ i ] . u . data . elem_count = PageMapGetIdxRowInfo ( in -> pm, (uint32_t)( row_id - in -> start_id ), & first_elem, NULL );
454             argv [ i ] . u . data . first_elem = first_elem;
455 
456             /* finally set the element size */
457             argv [ i ] . u . data . elem_bits = in -> data . elem_bits;
458         }
459 
460         /* fill out return param block
461            NB - the initially passed-in buffer
462            may be reallocated by external function */
463         rslt . data =  & blob -> data;
464         rslt . elem_count = 0;
465         rslt . elem_bits = blob -> data . elem_bits =
466             VTypedescSizeof ( & self -> dad . desc );
467         rslt.no_cache = 0;
468 
469         blob -> byte_order = vboNative;
470 
471         /* invoke the row function */
472         rc = self -> u . ndf ( self -> fself, info, row_id, & rslt, argc, argv );
473         blob->no_cache = (rslt.no_cache ? true : false);
474 
475         /* clean up input arguments */
476         if ( on_heap != NULL )
477             free ( on_heap );
478 
479         /* take reallocated buffer */
480         if ( rslt . data != & blob -> data )
481         {
482             KDataBufferWhack ( & blob -> data );
483             KDataBufferSub ( rslt . data, & blob -> data, 0, UINT64_MAX );
484             KDataBufferWhack ( rslt . data );
485         }
486         blob->data.elem_count = rslt.elem_count;
487 
488         /* if the function was successful incorporate row length */
489         if (rc == 0)
490         {
491             assert(rslt . elem_count >> 32 == 0);
492             rc = PageMapNewFixedRowLength ( & blob -> pm, 1, (uint32_t)rslt . elem_count );
493             if ( rc == 0 )
494                 return 0;
495         }
496 
497         vblob_release ( blob, NULL );
498 
499         *prslt = NULL;
500     }
501 
502     return rc;
503 }
504 
505 /* TODO: enable in next release */
506 #define PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX 0
507 static
VFunctionProdCallNullaryRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info)508 rc_t VFunctionProdCallNullaryRowFunc(VFunctionProd *self,
509                                      VBlob **prslt,
510                                      int64_t row_id,
511                                      uint32_t row_count,
512                                      const VXformInfo *info)
513 {
514     rc_t rc = 0;
515     KDataBuffer scratch;
516     VRowData args[1];
517     VRowResult rslt;
518 
519     memset(&scratch, 0, sizeof(scratch));
520     memset(&args[0], 0, sizeof(args[0]));
521 
522     rslt.data = &scratch;
523     rslt.elem_count = 0;
524     rslt.elem_bits = scratch.elem_bits = VTypedescSizeof(&self->dad.desc);
525 
526     rc = self->u.rf(self->fself, info, row_id, &rslt, 0, args);
527     if (rc == 0) {
528         VBlob *blob = NULL;
529 
530 #if PROD_NAME
531         rc = VBlobNew ( &blob, -INT64_MAX - 1, INT64_MAX, self->dad.name );
532 #else
533         rc = VBlobNew ( &blob, -INT64_MAX - 1, INT64_MAX, "VFunctionProdCallDetRowFunc" );
534 #endif
535         if (rc == 0) {
536             blob->byte_order = vboNative;
537             assert(rslt.elem_count <= UINT32_MAX);
538             KDataBufferSub(rslt.data, &blob->data, 0, rslt.elem_count);
539             if ( rslt.data != & scratch )
540                 KDataBufferWhack(rslt.data);
541             rc = PageMapNewSingle(&blob->pm, UINT32_MAX, (uint32_t)rslt.elem_count);
542             if (rc == 0)
543                 *prslt = blob;
544             else
545                 vblob_release(blob, NULL);
546         }
547     }
548     KDataBufferWhack(&scratch);
549     return rc;
550 }
551 
/* computeWindow
 *  choose the blob window size for the next fetch from the row range of
 *  the previous one
 *
 *  "pwindow" [ OUT ] - chosen window size
 *
 *  "start_id" [ IN ] and "stop_id" [ IN ] - inclusive row range produced
 *  by the previous fetch
 *
 *  "row_id" [ IN ] - row being requested now
 *
 *  "max_blob_regroup" [ IN ] - upper bound for the window
 *
 *  returns true when the window differs from the previous range size
 *
 *  the request counts as sequential when "row_id" directly follows
 *  "stop_id". In that case a previous range larger than the bound is
 *  clamped to it; otherwise the window is quadrupled ( up to the bound )
 *  when row_id % ( 4 * window ) == 1, and left alone in every other case.
 *  Anything else is random access and yields a window of 1.
 *
 *  an empty or inverted previous range ( start_id > stop_id ) and a range
 *  ending at INT64_MAX are also treated as random access: they have no
 *  meaningful size or successor row, and the former would otherwise lead
 *  to a remainder-by-zero or a negative window.
 */
static bool computeWindow(uint32_t *const pwindow, int64_t const start_id, int64_t const stop_id, int64_t const row_id, uint32_t const max_blob_regroup)
{
    uint64_t span;
    int64_t window;
    bool window_resized = false;

    if (start_id > stop_id || stop_id == INT64_MAX || row_id != stop_id + 1)
    {
        /* random access - use tiny blob window */
        *pwindow = 1;
        return true;
    }

    /* sequential io detected */

    /* row count of the previous range; unsigned arithmetic cannot overflow
       and, given the checks above, the result is at least 1 */
    span = (uint64_t)stop_id - (uint64_t)start_id + 1u;

    if (span > max_blob_regroup)
    {
        *pwindow = max_blob_regroup;
        return true;
    }

    /* 1 <= span <= max_blob_regroup, so 4 * window fits easily in int64 */
    window = (int64_t)span;

    if (row_id % (4 * window) == 1 && window < max_blob_regroup)
    {
        /* row_id lands on the first row of the enlarged window */
        window = (4 * window <= max_blob_regroup) ? 4 * window : max_blob_regroup;
        window_resized = true;
    }

    assert(window <= UINT32_MAX);
    *pwindow = (uint32_t)window;
    return window_resized;
}
590 
#if UNIT_TEST_VDB_3058
/* UnitTest_VDB_3058
 *  exercises computeWindow with a previous range of UINT32_MAX / 4 + 1
 *  rows followed by the next sequential row - the case of integer
 *  overflow to zero causing a divide-by-zero error in computeWindow
 */
static void UnitTest_VDB_3058(void)
{
    int64_t const first_row = 1;
    int64_t const last_row = UINT32_MAX / 4 + 1;
    int64_t const next_row = last_row + 1;
    uint32_t const regroup_limit = 1024;
    uint32_t chosen = 0;
    bool const resized = computeWindow(&chosen, first_row, last_row, next_row, regroup_limit);

    /* this output isn't important, it's just to make sure that
     * the compiler doesn't optimize out the relevant computations */
    printf("window: %u\nwindow_resized: %s\n", chosen, resized ? "true" : "false");
}
#endif
607 
608 static
VFunctionProdCallRowFunc1(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info,Vector * args,int64_t param_start_id,int64_t param_stop_id)609 rc_t VFunctionProdCallRowFunc1( VFunctionProd *self, VBlob **prslt, int64_t row_id,
610     uint32_t row_count, const VXformInfo *info, Vector *args,int64_t param_start_id,int64_t param_stop_id)
611 {
612     rc_t rc;
613     uint32_t i, argc = VectorLength ( args );
614     VRowResult rslt;
615     VRowData args_os[16], *args_oh, *argv;
616     KDataBuffer scratch;
617     VBlob *blob;
618     const VBlob *in;
619     PageMapIterator iter_os[16], *iter_oh, *iter;
620     uint64_t last = 0;
621     uint32_t last_len = 0;
622     uint32_t window;
623 	uint32_t min_row_count=UINT32_MAX; /* will increase row_count due to larger common repeat count of parameters */
624     int64_t  row_id_max=0;
625     uint32_t max_blob_regroup; /** max rows in blob for regrouping ***/
626     bool function_failed = false;
627     bool window_resized = false;
628 
629     if ( VCursorCacheActive ( self -> curs, & row_id_max ) )
630     {   /*** since cache_cursor exist, trying to avoid prefetching data which is in cache cursor ***/
631 		max_blob_regroup=256;
632     } else
633     {
634 		max_blob_regroup=1024;
635     }
636 
637 	if(self->dad.sub == vftRowFast)
638     {
639 		window = max_blob_regroup;
640 	}
641     else
642     {
643         window_resized = computeWindow(&window, self->start_id, self->stop_id, row_id, max_blob_regroup);
644 	}
645 
646     assert ( 0 < window && window <= max_blob_regroup );
647 
648     if(window == 1)
649     {
650         /* random access or initial blob - create blob with initial row-count */
651 		self->start_id=self->stop_id=row_id;
652 		if(row_count > 0)
653             self->stop_id += row_count-1;
654     }
655     else
656     {
657         /* start out with supplied row range */
658         self->start_id=param_start_id;
659         self->stop_id =param_stop_id;
660         assert(row_id >= self->start_id && row_id  + row_count -1 <= self->stop_id);
661 
662         /* special code to detect an old-style static column with infinite range */
663         if(self->start_id==-INT64_MAX - 1 || self->stop_id==INT64_MAX)
664         {
665             /* same logic as above */
666             self->start_id=self->stop_id=row_id;
667             if(row_count > 0)
668                 self->stop_id += row_count-1;
669         }
670 
671         /* this code only executes if requested row-count is 1 */
672         else if ( row_count == 1 )
673         {
674             /* the blob itself has to be at least twice the window size.
675               this may be a problem because once "window" becomes large enough,
676               it could cause us to switch from "reblobbing" back to whole blob. */
677             if ( self->stop_id - self->start_id > 2*window)
678             {
679                 /* determine which "window" ( relative to start of TABLE ) contains row_id */
680                 int64_t	n=(row_id-1)/window;
681 
682                 /* look at blob left edge, move up to window left edge if possible */
683                 if(self->start_id <= n*window)
684                     self->start_id=n*window+1;
685 
686                 /* look at blob right edge, move down to window right edge if possible */
687                 if(self->stop_id > (n+1) * window)
688                     self->stop_id = (n+1)*window;
689 
690                 /* eventual window from self->start_id..self->stop_id must be <= "window" size */
691                 assert ( self -> start_id <= self -> start_id );
692                 assert ( self -> stop_id - self -> start_id + 1 <= window );
693             }
694             else if ( window_resized )
695             {
696                 self -> start_id = row_id;
697             }
698             else
699             {
700                 /* handle case when the window has grown large enough
701                    to disable reblobbing after blobbing was once in place */
702             }
703         }
704     }
705 
706     /* create and populate array of input parameters */
707     VECTOR_ALLOC_ARRAY(argc, argv, args_os, args_oh);
708     VECTOR_ALLOC_ARRAY(argc, iter, iter_os, iter_oh);
709     for (i = 0; i != argc; ++ i){
710 
711         in = VectorGet(args, i);
712 
713 		if(in->start_id == -INT64_MAX - 1 ) {
714 			rc = PageMapNewIterator(in->pm, &iter[i],0,-1);
715 		} else if(param_stop_id>self->stop_id && param_stop_id < INT64_MAX){
716 			rc = PageMapNewIterator(in->pm, &iter[i], self->start_id-in->start_id, param_stop_id - self->start_id + 1);
717 			if(rc == 0){
718 				uint32_t n = PageMapIteratorRepeatCount(&iter[i]);
719 				if(n < min_row_count) min_row_count=n;
720 			}
721 		} else {
722 			rc = PageMapNewIterator(in->pm, &iter[i], self->start_id-in->start_id, self->stop_id - self->start_id + 1);
723 		}
724         if ( rc ) break;
725         argv[i].variant = vrdData;
726         argv[i].blob_stop_id = in->stop_id;
727         argv[i].u.data.elem_bits = in->data.elem_bits;
728         argv[i].u.data.base = in->data.base;
729         argv[i].u.data.base_elem_count = in->data.elem_count;
730     }
731 	if(min_row_count < UINT32_MAX && self->start_id + min_row_count -1 > self->stop_id ){
732 		self->stop_id = self->start_id + min_row_count - 1;
733 	}
734 	if(row_id_max >= row_id && self->stop_id > row_id_max)     self->stop_id = row_id_max;
735 
736 
737 
738 
739 #if PROD_NAME
740     rc = VBlobNew ( &blob, self->start_id, self->stop_id, self->dad.name );
741 #else
742     rc = VBlobNew ( &blob, self->start_id, self->stop_id, "VFunctionProdCallDetRowFunc" );
743 #endif
744     TRACK_BLOB ( VBlobNew, blob );
745     if (rc)
746         return rc;
747 
748 #if PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX
749 #else
750     rc = PageMapNew(&blob->pm, row_count /**BlobRowCount(blob)**/);
751 #if 0
752     /* disabled for causing problems with accumulating static columns */
753     if (rc == 0)
754 		rc = PageMapPreExpandFull(blob->pm, BlobRowCount(blob));
755 #endif
756     if (rc) {
757       vblob_release(blob, NULL);
758         return rc;
759     }
760 #endif
761 
762     memset(&scratch, 0, sizeof(scratch));
763     rslt.data = &scratch;
764     rslt.elem_bits = scratch.elem_bits = blob->data.elem_bits = VTypedescSizeof(&self->dad.desc);
765     blob->byte_order = vboNative;
766 
767 
768 
769     for (row_id = self->start_id; row_id <= self->stop_id && rc == 0; ) {
770         uint32_t row_count = 1;
771         if(self->dad.sub == vftRow || self->dad.sub ==vftRowFast ){
772             row_count = PageMapIteratorRepeatCount(&iter[0]);
773 
774             for (i = 1; i != argc; ++i) {
775                 uint32_t j = PageMapIteratorRepeatCount(&iter[i]);
776                 if (row_count > j)
777                 row_count = j;
778             }
779             if (row_id + row_count > self->stop_id + 1)
780             row_count = (uint32_t)( self->stop_id + 1 - row_id );
781         }
782 
783         for (i = 0; i != argc; ++i) {
784             argv[i].u.data.elem_count = PageMapIteratorDataLength(&iter[i]);
785             argv[i].u.data.first_elem = PageMapIteratorDataOffset(&iter[i]);
786         }
787 
788         rslt.elem_count = 0;
789         rc = self->u.rf(self->fself, info, row_id, &rslt, argc, argv);
790         if (rc) {
791             function_failed = true;
792             break;
793         }
794 
795         assert(rslt.elem_count >> 32 == 0);
796 
797         if (row_id == self->start_id) {
798 #if PAGEMAP_PRE_EXPANDING_SINGLE_ROW_FIX
799             if (blob->start_id + row_count > blob->stop_id) {
800                 last = blob->data.elem_count;
801                 rc = KDataBufferResize(&blob->data, blob->data.elem_count + rslt.elem_count);
802                 if (rc == 0) {
803                     bitcpy(blob->data.base, last * rslt.elem_bits,
804                            rslt.data->base, 0, rslt.elem_count * rslt.elem_bits);
805                     rc = PageMapNewSingle(&blob->pm, row_count, rslt.elem_count);
806                 }
807             }
808             else
809             goto ADD_ROW_TO_BLOB;
810 #else
811             goto ADD_ROW_TO_BLOB;
812 #endif
813         }
814         else if (last_len != rslt.elem_count ||
815                  bitcmp(blob->data.base, last * rslt.elem_bits,
816                         rslt.data->base, 0, rslt.elem_count * rslt.elem_bits) != 0)
817         {
818         ADD_ROW_TO_BLOB:
819             last = blob->data.elem_count;
820             rc = KDataBufferResize(&blob->data, blob->data.elem_count + rslt.elem_count);
821             if (rc == 0) {
822                 bitcpy(blob->data.base, last * rslt.elem_bits,
823                        rslt.data->base, 0, rslt.elem_count * rslt.elem_bits);
824                 rc = PageMapAppendRows(blob->pm, rslt.elem_count, row_count, false);
825             }
826         }
827         else
828             rc = PageMapAppendRows(blob->pm, rslt.elem_count, row_count, true);
829 
830         /* drop any new buffer that was returned to us */
831         if (rslt.data != &scratch) {
832             KDataBufferWhack(rslt.data);
833         }
834 
835         if (rc) break;
836 
837         last_len = (uint32_t)rslt.elem_count;
838 
839         for (i = 0; i != argc; ++i)
840             PageMapIteratorAdvance(&iter[i], row_count);
841         row_id += row_count;
842     }
843     KDataBufferWhack(&scratch);
844     if (args_oh) free(args_oh);
845     if (iter_oh) free(iter_oh);
846 
847     if (rc == 0 || (function_failed && row_id > self->start_id)) {
848         *prslt = blob;
849         return 0;
850     }
851     vblob_release(blob, NULL);
852 
853     return rc;
854 }
855 
856 static
VFunctionProdCallRowFunc(VFunctionProd * self,VBlob ** prslt,int64_t row_id,uint32_t row_count,const VXformInfo * info,Vector * args,int64_t param_start_id,int64_t param_stop_id)857 rc_t VFunctionProdCallRowFunc( VFunctionProd *self, VBlob **prslt, int64_t row_id,
858     uint32_t row_count, const VXformInfo *info, Vector *args,int64_t param_start_id,int64_t param_stop_id)
859 {
860     uint32_t const argc = VectorLength(args);
861 
862     if (argc == 0)
863         return VFunctionProdCallNullaryRowFunc(self, prslt, row_id, row_count, info);
864     else
865         return VFunctionProdCallRowFunc1(self, prslt, row_id, row_count, info, args, param_start_id, param_stop_id);
866 }
867 
868 static
VFunctionProdCallArrayFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)869 rc_t VFunctionProdCallArrayFunc( VFunctionProd *self, VBlob **prslt,
870     int64_t id, const VXformInfo *info, Vector *args ) {
871     VBlob *rslt = 0;
872     VBlob *sblob;
873     rc_t rc;
874 
875     sblob = VectorGet(args, 0);
876     assert(sblob);
877 
878 #if PROD_NAME
879     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
880 #else
881     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallArrayFunc");
882 #endif
883     TRACK_BLOB( VBlobNew, rslt );
884     if (rc == 0) {
885         rslt->pm = sblob->pm;
886         PageMapAddRef(rslt->pm);
887 
888         if (sblob->headers) {
889             if ( self -> dad . chain == chainEncoding )
890                 rc = BlobHeadersCreateChild(sblob->headers, &rslt->headers);
891             else {
892                 rslt->headers = (BlobHeaders *)BlobHeadersGetNextFrame(sblob->headers);
893                 BlobHeadersAddRef(rslt->headers);
894             }
895         }
896         if (rc == 0) {
897 	    rc = KDataBufferMake(&rslt->data, VTypedescSizeof(&self->dad.desc), sblob->data.elem_count);
898             if (rc == 0) {
899                 rc = self->u.af(
900                                 self->fself,
901                                 info,
902                                 rslt->data.base,
903                                 sblob->data.base,
904                                 sblob->data.elem_count
905                                 );
906                 if (rc == 0) {
907                     *prslt = rslt;
908                     return 0;
909                 }
910             }
911         }
912 
913         vblob_release( rslt, NULL );
914     }
915 
916     return rc;
917 }
918 
919 static
VFunctionProdCallPageFunc(VFunctionProd * self,VBlob ** rslt,int64_t id,const VXformInfo * info,Vector * args)920 rc_t VFunctionProdCallPageFunc( VFunctionProd *self, VBlob **rslt, int64_t id,
921     const VXformInfo *info, Vector *args )
922 {
923     struct input_t {
924         const VBlob *blob;
925         bool	sb_input;
926         PageMapIterator cur_row;
927         bool at_end;
928     };
929     struct input_t on_stack[8];
930     struct input_t *on_heap;
931     struct input_t *argv;
932 
933     VRowData pb_stack[8];
934     VRowData *pb_heap;
935     VRowData *param;
936 
937     rc_t rc=0;
938     uint32_t i, argc = VectorLength(args);
939     int64_t start_id;
940     int64_t stop_id;
941     uint32_t elem_count;
942     uint32_t row_count;
943     int first_non_control_input;
944     int allInputsAreSingleRow;
945     VBlob *blob = NULL;
946 
947     VECTOR_ALLOC_ARRAY(argc, argv, on_stack, on_heap);
948     VECTOR_ALLOC_ARRAY(argc, param, pb_stack, pb_heap);
949 
950     for (start_id = stop_id = 0,
951          first_non_control_input=-1,
952          allInputsAreSingleRow = true,
953          i = 0; i != argc ; ++i) {
954         const VBlob *b = (const VBlob *)VectorGet(args, i);
955         const VProduction *prod = (const VProduction *)VectorGet(&self->parms, i);
956 
957 
958         if(b->pm == NULL){
959             rc=PageMapProcessGetPagemap( VCursorPageMapProcessRequest ( self -> curs ) ,(PageMap**)(&b->pm));
960             if(rc != 0) return rc;
961         }
962 
963         if (prod->control) {
964             param[i].variant = vrdControl;
965             assert(0); /*** TBD: Not implemented ???? ***/
966         } else {
967             param[i].variant = vrdData;
968             argv[i].blob = b;
969             argv[i].sb_input = VBlobIsSingleRow(argv[i].blob);
970             allInputsAreSingleRow &= argv[i].sb_input;
971 
972             if(first_non_control_input < 0){
973                 first_non_control_input = i;
974                 start_id = argv[i].blob->start_id;
975                 stop_id = argv[i].blob->stop_id;
976             } else {
977                 if(start_id < argv[i].blob->start_id)
978                     start_id = argv[i].blob->start_id;
979                 if(stop_id > argv[i].blob->stop_id)
980                     stop_id  = argv[i].blob->stop_id;
981             }
982         }
983     }
984     if ( allInputsAreSingleRow ) {
985 	row_count = stop_id - start_id + 1;
986 	if(row_count == 0 ) /*** case of static column **/
987 		row_count=1;
988     } else {
989 	row_count = stop_id - start_id + 1;
990     }
991     if (first_non_control_input < 0) /* no non-control inputs */
992         rc = RC(rcVDB, rcFunction, rcExecuting, rcParam, rcInvalid);
993     else if (start_id > stop_id )
994         rc = RC(rcVDB, rcFunction, rcExecuting, rcRange, rcEmpty);
995 
996     for ( elem_count = 0, i = 0; i != argc && rc ==0; ++i) {
997         rc = PageMapNewIterator(argv[i].blob->pm, &argv[i].cur_row, start_id - argv[i].blob->start_id ,row_count);
998         if(rc == 0){
999             PageMapIterator temp = argv[i].cur_row;
1000             uint32_t ec = 0;
1001 
1002             if ( argv[i].sb_input ){
1003                 ec = PageMapIteratorDataLength(&temp) * row_count;
1004             } else do {
1005                 ec+=PageMapIteratorDataLength(&temp);
1006             } while (PageMapIteratorNext(&temp));
1007             if(ec == 0){
1008                 rc = RC(rcVDB, rcFunction, rcExecuting, rcPagemap, rcInvalid); /* bad page map */
1009             } else if (elem_count == 0){
1010                 elem_count=ec;
1011             } else if (ec != elem_count){
1012                 rc = RC(rcVDB, rcFunction, rcExecuting, rcParam, rcInvalid); /* Pages have to have the same number of elements*/
1013             }
1014     	}
1015     }
1016 
1017     while (rc == 0) /* not really while */ {
1018 #if PROD_NAME
1019         rc = VBlobNew(&blob, start_id, stop_id, self->dad.name);
1020 #else
1021         rc = VBlobNew(&blob, start_id, stop_id, "VFunctionProdCallPageFunc");
1022 #endif
1023         if (rc) break;
1024 
1025         TRACK_BLOB(VBlobNew,blob);
1026 
1027         if (allInputsAreSingleRow) {
1028             VFixedRowResult rslt;
1029             uint32_t	row_element_count = PageMapIteratorDataLength(&argv[first_non_control_input].cur_row);
1030 
1031             rc = PageMapNewSingle(&blob->pm, row_count, row_element_count);
1032             if (rc) break;
1033 
1034             rc = KDataBufferMake(&blob->data, VTypedescSizeof(&self->dad.desc), row_element_count);
1035             if (rc) break;
1036 
1037             for (i = 0; i != argc; ++i) {
1038                 if (param[i].variant == vrdControl)
1039                     continue;
1040                 if (argv[i].at_end) {
1041                     rc = RC(rcVDB, rcFunction, rcExecuting, rcRow, rcNotFound);
1042                     break;
1043                 }
1044 
1045                 param[i].u.data.base = argv[i].blob->data.base;
1046                 param[i].u.data.elem_count = row_element_count;
1047                 param[i].u.data.first_elem = PageMapIteratorDataOffset(&argv[i].cur_row);
1048                 param[i].u.data.elem_bits = argv[i].blob->data.elem_bits;
1049 
1050                 argv[i].at_end = PageMapIteratorNext(&argv[i].cur_row) ? false : true;
1051             }
1052             if (rc) break;
1053 
1054             rslt.base = blob->data.base;
1055             rslt.first_elem = 0;
1056             rslt.elem_count = row_element_count;
1057             rslt.elem_bits = blob->data.elem_bits;
1058 
1059             rc = self->u.pf(self->fself, info, start_id, &rslt, argc, param);
1060         } else {
1061             uint32_t first_write;
1062             int64_t row_id;
1063             uint32_t last = 0;
1064             uint32_t last_rowlen = 0;
1065 
1066             rc = PageMapNew(&blob->pm, row_count); /*** max number of rows - it may collapse some **/
1067             if (rc) break;
1068             rc = KDataBufferMake(&blob->data, VTypedescSizeof(&self->dad.desc), elem_count);
1069             if (rc) break;
1070 
1071             for (first_write = 0, row_id = start_id; row_id <= stop_id; ++row_id) {
1072                 VFixedRowResult rslt;
1073 
1074                 for (i = 0; i != argc; ++i) {
1075                     if (param[i].variant == vrdControl)
1076                         continue;
1077                     if (argv[i].at_end) {
1078                         rc = RC(rcVDB, rcFunction, rcExecuting, rcRow, rcNotFound);
1079                         break;
1080                     }
1081 
1082                     param[i].u.data.base = argv[i].blob->data.base;
1083                     param[i].u.data.elem_count = PageMapIteratorDataLength(&argv[i].cur_row);
1084                     param[i].u.data.first_elem = PageMapIteratorDataOffset(&argv[i].cur_row);
1085                     param[i].u.data.elem_bits = argv[i].blob->data.elem_bits;
1086 
1087                     argv[i].at_end = PageMapIteratorNext(&argv[i].cur_row) ? false : true;
1088                 }
1089                 if (rc)
1090                     break;
1091 
1092                 rslt.base = blob->data.base;
1093                 rslt.first_elem = first_write;
1094                 rslt.elem_count = param[first_non_control_input].u.data.elem_count;
1095                 rslt.elem_bits = blob->data.elem_bits;
1096 
1097                 rc = self->u.pf(self->fself, info, row_id, &rslt, argc, param);
1098                 if (rc)
1099                     break;
1100 
1101                 assert(rslt.elem_count >> 32 == 0);
1102                 if ( row_id != start_id && last_rowlen == rslt.elem_count &&
1103                     memcmp(((char*)blob->data.base) + (last*rslt.elem_bits)/8,
1104                            ((char*)blob->data.base) + (first_write*rslt.elem_bits)/8,
1105                            (rslt.elem_count*rslt.elem_bits)/8) == 0)
1106                 {
1107                     rc = PageMapAppendRow(blob->pm, (uint32_t)rslt.elem_count, true);
1108                 }
1109                 else {
1110                     last = first_write;
1111                     first_write += rslt.elem_count;
1112                     rc = PageMapAppendRow(blob->pm, (uint32_t)rslt.elem_count, false);
1113                 }
1114                 if (rc)
1115                     break;
1116                 last_rowlen = (uint32_t)rslt.elem_count;
1117             }
1118             if (rc)
1119                 break;
1120             KDataBufferSub(&blob->data, &blob->data, 0, first_write);
1121         }
1122         *rslt = blob;
1123         break;
1124     }
1125     if (rc != 0 && blob != NULL) vblob_release(blob, NULL);
1126     if (on_heap) free(on_heap);
1127     if (pb_heap) free(pb_heap);
1128     return rc;
1129 }
1130 
1131 static
VFunctionProdCallBlobFuncEncoding(VFunctionProd * self,VBlob * rslt,int64_t id,const VXformInfo * info,const VBlob * sblob)1132 rc_t VFunctionProdCallBlobFuncEncoding( VFunctionProd *self, VBlob *rslt, int64_t id,
1133     const VXformInfo *info, const VBlob *sblob ) {
1134     VBlobData src;
1135     VBlobResult dst;
1136     VBlobHeader *hdr;
1137     rc_t rc;
1138     uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1139 
1140     rc = BlobHeadersCreateChild(sblob->headers, &rslt->headers);
1141     if (rc == 0) {
1142         hdr = BlobHeadersGetHdrWrite(rslt->headers);
1143         if (hdr) {
1144             bitsz_t sz = KDataBufferBits(&sblob->data);
1145 
1146             VBlobHeaderSetSourceSize(hdr, KDataBufferBytes(&sblob->data));
1147             sz = (sz + elem_size - 1) / elem_size;
1148             rc = KDataBufferMake( &rslt->data, elem_size, sz );
1149         }
1150         else
1151             rc = RC(rcVDB, rcFunction, rcExecuting, rcMemory, rcExhausted);
1152     }
1153     if (rc)
1154         return rc;
1155 
1156     dst.header = NULL;
1157 
1158     if ( sblob -> data.elem_count == 0)
1159         goto SKIP_COMPRESSION;
1160 
1161     src.data = sblob -> data.base;
1162     src.elem_count = sblob -> data.elem_count;
1163     src.elem_bits = sblob -> data.elem_bits;
1164     src.byte_order = sblob -> byte_order;
1165 
1166     dst.data = rslt -> data.base;
1167     dst.elem_count = rslt -> data.elem_count;
1168     dst.elem_bits = rslt -> data.elem_bits;
1169     dst.byte_order = sblob -> byte_order;
1170 
1171     rc = self->u.bf(self->fself, info, &dst, &src, hdr);
1172 
1173     if (rc == 0) {
1174         if ( dst.header != NULL && dst.header != hdr ) {
1175             VBlobHeaderReplace ( hdr, dst.header );
1176             VBlobHeaderRelease ( dst.header );
1177         }
1178         rslt->data.elem_bits = dst.elem_bits;
1179         rslt->data.elem_count = dst.elem_count;
1180         rslt->byte_order = dst.byte_order;
1181     }
1182     else if (GetRCObject(rc) == (enum RCObject)rcBuffer && GetRCState(rc) == rcInsufficient) {
1183     SKIP_COMPRESSION:
1184         VBlobHeaderSetFlags(hdr, 1);
1185 
1186         KDataBufferWhack(&rslt->data);
1187         if ( dst.header != NULL && dst.header != hdr )
1188             VBlobHeaderRelease ( dst.header );
1189 
1190         /* compressors usually produce bits (elem_size == 1) or bytes (elem_size == 8)
1191          * the cast to bits can never fail, so we will force the cast to bytes to also be
1192          * infallible; casts to other sizes are allowed to fail to prevent data loss */
1193         if (elem_size == 8) {
1194             KDataBufferSub(&sblob->data, &rslt->data, 0, UINT64_MAX);
1195             /* We can't shrink the data and KDataBufferCast won't increase the number of bits
1196              * but we know that KDataBuffer can't allocate anything other than whole bytes
1197              * so we're forcing the conversion to bytes manually. */
1198             rslt->data.elem_count = KDataBufferBytes(&rslt->data);
1199             rslt->data.elem_bits = 8;
1200             rc = 0;
1201         }
1202         else /* if elem_size == 1 this will always work */
1203             rc = KDataBufferCast(&sblob->data, &rslt->data, elem_size, false);
1204     }
1205     VBlobHeaderRelease(hdr);
1206 
1207     return rc;
1208 }
1209 
1210 static
VFunctionProdCallBlobFuncDecoding(VFunctionProd * self,VBlob * rslt,int64_t id,const VXformInfo * info,const VBlob * sblob)1211 rc_t VFunctionProdCallBlobFuncDecoding( VFunctionProd *self, VBlob *rslt,
1212     int64_t id, const VXformInfo *info, const VBlob *sblob ) {
1213     VBlobHeader *hdr;
1214     rc_t rc;
1215     uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1216 
1217     if (sblob->headers == NULL) {
1218         /* v1 blobs don't have headers, but v1 blobs
1219          * are fixed row-length so we know the data size
1220          * we are relying on the blob deserialization code
1221          * to set the page map up correctly */
1222         if (sblob->pm == NULL) {
1223             hdr = BlobHeadersCreateDummyHeader(0, 0, 0, (sblob->data.elem_bits * sblob->data.elem_count + 7) >> 3);
1224         }
1225         else
1226             hdr = BlobHeadersCreateDummyHeader(0, 0, 0, BlobRowCount(sblob) * PageMapGetIdxRowInfo(sblob->pm, 0, 0, NULL));
1227         /* leave rslt->headers null so that the next
1228          * stage will also create a dummy header */
1229     }
1230     else {
1231         /* rslt gets the headers for the next stage in decoding */
1232         rslt->headers = (BlobHeaders *)BlobHeadersGetNextFrame(sblob->headers);
1233         BlobHeadersAddRef(rslt->headers);
1234 
1235         /* get the headers for this stage in decoding */
1236         hdr = BlobHeadersGetHeader(sblob->headers);
1237     }
1238     if ( hdr == NULL )
1239         rc = RC(rcVDB, rcFunction, rcExecuting, rcMemory, rcExhausted);
1240     else if ((VBlobHeaderFlags(hdr) & 1) != 0)
1241     {
1242         /* compression was skipped */
1243         VBlobHeaderRelease(hdr);
1244         return KDataBufferCast(&sblob->data, &rslt->data, elem_size, true);
1245     }
1246     else
1247     {
1248         rc = KDataBufferMakeBytes(&rslt->data, VBlobHeaderSourceSize(hdr));
1249         if (rc == 0) {
1250             VBlobData src;
1251             VBlobResult dst;
1252 
1253             dst.header = NULL;
1254 
1255             src.data = sblob -> data.base;
1256             src.elem_count = sblob -> data.elem_count;
1257             src.elem_bits = sblob -> data.elem_bits;
1258             src.byte_order = sblob -> byte_order;
1259 
1260             dst.data = rslt -> data.base;
1261             dst.elem_count = (rslt -> data.elem_count << 3) / elem_size;
1262             dst.elem_bits = elem_size;
1263             dst.byte_order = sblob -> byte_order;
1264 
1265             rc = self->u.bf(self->fself, info, &dst, &src, hdr);
1266 
1267             if (rc == 0) {
1268                 if ( dst.header != NULL && dst.header != hdr ) {
1269                     /* only allow replacement of headers when encoding */
1270                     VBlobHeaderRelease ( dst.header );
1271                 }
1272 
1273                 rslt->data.elem_bits = dst.elem_bits;
1274                 rslt->data.elem_count = dst.elem_count;
1275                 rslt->byte_order = dst.byte_order;
1276 
1277                 rc = KDataBufferCast(&rslt->data, &rslt->data, elem_size, true);
1278             }
1279         }
1280         VBlobHeaderRelease(hdr);
1281     }
1282 
1283     return rc;
1284 }
1285 
1286 static
VFunctionProdCallBlobFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)1287 rc_t VFunctionProdCallBlobFunc( VFunctionProd *self, VBlob **prslt,
1288     int64_t id, const VXformInfo *info, Vector *args ) {
1289     VBlob *rslt = 0;
1290     VBlob *sblob;
1291     rc_t rc;
1292 
1293     sblob = VectorGet(args, 0);
1294     assert(sblob);
1295     if(self->dad.chain == chainEncoding){
1296 	VBlobAddRef(sblob);
1297 	if(sblob->headers==NULL)/**first in encryption chain***/
1298 		VBlobPageMapOptimize(&sblob); /** try to optimize the blob **/
1299     }
1300 
1301 #if PROD_NAME
1302     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
1303 #else
1304     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallBlobFunc");
1305 #endif
1306     if (rc)
1307         return rc;
1308 
1309     TRACK_BLOB(VBlobNew,rslt);
1310 
1311     /* blob funcs are not allowed to change page maps */
1312     rslt->pm = sblob->pm;
1313     PageMapAddRef(rslt->pm);
1314 
1315     rslt->byte_order = sblob->byte_order;
1316 
1317     if (self->dad.chain == chainEncoding){
1318         rc = VFunctionProdCallBlobFuncEncoding(self, rslt, id, info, sblob);
1319 	vblob_release( sblob, NULL );
1320     } else {
1321         rc = VFunctionProdCallBlobFuncDecoding(self, rslt, id, info, sblob);
1322     }
1323 
1324     if (rc == 0) {
1325         *prslt = rslt;
1326         return 0;
1327     }
1328     vblob_release( rslt, NULL );
1329     return rc;
1330 }
1331 
1332 static
VFunctionProdCallBlobNFunc(VFunctionProd * self,VBlob ** rslt,int64_t id,const VXformInfo * info,Vector * args)1333 rc_t VFunctionProdCallBlobNFunc( VFunctionProd *self, VBlob **rslt,
1334     int64_t id, const VXformInfo *info, Vector *args ) {
1335     const VBlob *on_stack[16];
1336     const VBlob **on_heap;
1337     const VBlob **argv;
1338     int argc = VectorLength(args);
1339     rc_t rc;
1340 
1341     VECTOR_TO_ARRAY(argc, argv, on_stack, on_heap, args);
1342     {
1343 	int i;
1344 	for(i=0;i<argc;i++){
1345 		VBlob const *vb=argv[i];
1346 		if(vb->pm == NULL){
1347 			rc=PageMapProcessGetPagemap( VCursorPageMapProcessRequest ( self -> curs ),(PageMap**)(&vb->pm));
1348 			if(rc != 0) return rc;
1349 		}
1350 	}
1351     }
1352     rc = self->u.bfN(self->fself, info, id, rslt, argc, argv);
1353     if ( on_heap )
1354         free( (void*) on_heap );
1355     return rc;
1356 }
1357 
1358 static
VFunctionProdCallLegacyBlobFunc(VFunctionProd * self,VBlob ** prslt,int64_t id,const VXformInfo * info,Vector * args)1359 rc_t VFunctionProdCallLegacyBlobFunc( VFunctionProd *self, VBlob **prslt,
1360     int64_t id, const VXformInfo *info, Vector *args ) {
1361     VBlob *rslt = 0;
1362     VBlob *sblob;
1363     VNoHdrBlobFunc func = (VNoHdrBlobFunc)self->u.bf;
1364     rc_t rc;
1365     uint32_t elem_size = VTypedescSizeof(&self->dad.desc);
1366 
1367     sblob = VectorGet(args, 0);
1368     assert(sblob);
1369 
1370 #if PROD_NAME
1371     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, self->dad.name);
1372 #else
1373     rc = VBlobNew(&rslt, sblob->start_id, sblob->stop_id, "VFunctionProdCallLegacyBlobFunc");
1374 #endif
1375     TRACK_BLOB(VBlobNew,rslt);
1376     if (rc == 0) {
1377         rc = KDataBufferMakeBytes(&rslt->data, 0);
1378         if (rc == 0) {
1379             VLegacyBlobResult dst;
1380             dst.dst = & rslt -> data;
1381             dst.byte_order = vboLittleEndian;
1382             rc = func(self->fself,
1383                       info,
1384                       &dst,
1385                       &sblob->data
1386                       );
1387 
1388             if (rc == 0)
1389             {
1390                 rslt->byte_order = dst.byte_order;
1391 
1392                 rc = KDataBufferCast(&rslt->data, &rslt->data, elem_size, true);
1393                 if (rc == 0) {
1394                     rslt->pm = sblob->pm;
1395                     PageMapAddRef(rslt->pm);
1396 
1397                     *prslt = rslt;
1398                     return 0;
1399                 }
1400             }
1401         }
1402 
1403         vblob_release( rslt, NULL );
1404     }
1405     return rc;
1406 }
1407 
1408 static
VFunctionProdCallByteswap(VFunctionProd * self,VBlob ** vblob,int64_t id,const VXformInfo * info,Vector * args)1409 rc_t VFunctionProdCallByteswap ( VFunctionProd *self, VBlob **vblob,
1410     int64_t id, const VXformInfo *info, Vector *args )
1411 {
1412     /* get single input blob */
1413     VBlob *blob = VectorFirst ( args );
1414     rc_t rc;
1415 
1416     assert ( blob != NULL );
1417 
1418 #if PROD_CACHE
1419     VProductionFlushCacheDeep ( & self -> dad, "byteswap" );
1420 #endif
1421 
1422     /* CAST */
1423     rc = KDataBufferCast ( & blob->data, & blob->data,
1424                            self->dad.desc.intrinsic_bits * self->dad.desc.intrinsic_dim,
1425                            false );
1426     if ( rc == 0 )
1427     {
1428         /* legacy blob check
1429          * repair missing pagemap
1430          */
1431         if (blob->pm == NULL) {
1432             uint64_t row_count = BlobRowCount ( blob );
1433             if ( row_count == 0 || blob->data.elem_count % row_count != 0)
1434                 rc = RC(rcVDB, rcBlob, rcReading, rcBlob, rcCorrupt);
1435             else {
1436                 uint64_t row_len = blob->data.elem_count / row_count;
1437                 rc = PageMapNewFixedRowLength(&blob->pm, row_count, row_len);
1438             }
1439         }
1440     }
1441 
1442     if ( rc != 0)
1443         return rc;
1444 
1445     /* check for byteswapping function */
1446     if ( self -> u.bswap != NULL )
1447     {
1448 
1449         if ( blob -> byte_order ==
1450 #if __BYTE_ORDER == __LITTLE_ENDIAN
1451              vboBigEndian
1452 #else
1453              vboLittleEndian
1454 #endif
1455             )
1456         {
1457             uint32_t int_size;
1458             uint64_t blob_bits;
1459 
1460             /* make writable */
1461             KDataBuffer buffer;
1462 
1463             rc = KDataBufferMakeWritable ( & blob -> data, & buffer );
1464             if ( rc != 0 )
1465                 return rc;
1466 
1467             /* invoke byte-swap function on input */
1468             blob_bits = KDataBufferBits ( & buffer );
1469             int_size = self -> dad.desc .intrinsic_bits;
1470             ( * self -> u.bswap ) ( buffer.base, buffer.base,
1471                 ( uint32_t ) ( blob_bits / int_size ) );
1472 
1473             /* poke bytes back into blob */
1474             KDataBufferWhack ( & blob -> data );
1475             blob -> data = buffer;
1476         }
1477     }
1478 
1479     blob -> byte_order = vboNative;
1480     *vblob = blob;
1481 
1482     (void)VBlobAddRef ( blob );
1483     TRACK_BLOB( VBlobAddRef, blob );
1484 
1485     return 0;
1486 }
1487 
1488 #if _DEBUGGING
1489 
1490 static
VFunctionProdCallCompare1(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1491 rc_t VFunctionProdCallCompare1(VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt) {
1492     VBlob *orig;
1493     rc_t rc;
1494 
1495     *vblob = NULL;
1496     assert(VectorLength(&self->parms) == 2);
1497     rc = VProductionReadBlob((const VProduction *)VectorGet(&self->parms, 0), &orig, &id, cnt, NULL);
1498     if (rc == 0) {
1499         int64_t i;
1500         PageMapIterator oi;
1501         VRowData orig_data;
1502         const VProduction *test_prod = VectorGet(&self->parms, 1);
1503 
1504         memset(&orig_data, 0, sizeof(orig_data));
1505         orig_data.u.data.base = orig->data.base;
1506         orig_data.u.data.elem_bits = orig->data.elem_bits;
1507 
1508         PageMapNewIterator(orig->pm, &oi, 0, -1);
1509 
1510         for (i = orig->start_id; i <= orig->stop_id; ++i) {
1511             VBlob *test;
1512             uint32_t j;
1513 
1514             j = PageMapIteratorDataLength(&oi);
1515 
1516             rc = VProductionReadBlob(test_prod, &test, &i, 1, NULL);
1517             if (rc == 0) {
1518                 if (orig->data.elem_bits != test->data.elem_bits || orig->byte_order != test->byte_order)
1519                     rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1520                 else {
1521                     PageMapIterator ti;
1522                     VRowData test_data;
1523 
1524                     memset(&test_data, 0, sizeof(test_data));
1525                     test_data.u.data.base = test->data.base;
1526                     test_data.u.data.elem_bits = test->data.elem_bits;
1527 
1528                     PageMapNewIterator(test->pm, &ti, 0, -1);
1529 
1530                     if (!PageMapIteratorAdvance(&ti, (uint32_t)(i - test->start_id))) {
1531                         rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1532                     }
1533                     else {
1534                         uint32_t k = PageMapIteratorDataLength(&ti);
1535                         orig_data.u.data.elem_count = test_data.u.data.elem_count = j;
1536 
1537                         orig_data.u.data.first_elem = (orig->data.bit_offset / orig->data.elem_bits) + PageMapIteratorDataOffset(&oi);
1538                         test_data.u.data.first_elem = (test->data.bit_offset / test->data.elem_bits) + PageMapIteratorDataOffset(&ti);
1539 
1540                         if (j != k) {
1541                             rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1542                         } else {
1543                             rc = self->u.cf(self->fself, &orig_data, &test_data);
1544                             if (rc) {
1545                                 rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
1546                             }
1547                         }
1548                         if (rc) {
1549                             const uint8_t *a = orig_data.u.data.base;
1550                             const uint8_t *b = test_data.u.data.base;
1551                             unsigned count;
1552                             unsigned k;
1553                             unsigned m;
1554                             char f, ax[9 + 16 * 4 + 1], bx[9 + 16 * 4 + 1];
1555                             char av[16], bv[16];
1556 
1557                             a += (orig_data.u.data.first_elem * orig_data.u.data.elem_bits) >> 3;
1558                             b += (test_data.u.data.first_elem * test_data.u.data.elem_bits) >> 3;
1559                             /* show up to a row of data before */
1560                             count = a - (const uint8_t*)orig_data.u.data.base;
1561                             count = count < b - (const uint8_t*)orig_data.u.data.base ? count : b - (const uint8_t*)orig_data.u.data.base;
1562                             count = count > 16 ? 16 : count;
1563                             a -= count;
1564                             b -= count;
1565 
1566                             count += (j * orig->data.elem_bits + 7) >> 3;
1567 
1568                             for (k = 0, m = 0; k != count; ++k) {
1569                                 if (m == 0) {
1570                                     sprintf(ax, "%08X>", k);
1571                                     sprintf(bx, "%08X<", k);
1572                                 }
1573                                 f = a[k] == b[k] ? ' ': '*';
1574                                 sprintf(ax + m * 4 + 9, " %02x%c", a[k], f);
1575                                 av[m] = isprint(a[k]) ? a[k] : '.';
1576                                 sprintf(bx + m * 4 + 9, " %02x%c", b[k], f);
1577                                 bv[m] = isprint(b[k]) ? b[k] : '.';
1578                                 m++;
1579                                 if(m == 16 || k == count - 1) {
1580                                     DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%-73s '%.*s'\n%-73s '%.*s'\n\n", ax, m, av, bx, m, bv));
1581                                     m = 0;
1582                                 }
1583                             }
1584                         }
1585                     }
1586                 }
1587                 vblob_release(test, NULL);
1588                 if (rc)
1589                     break; }
1590             else
1591                 break;
1592             PageMapIteratorAdvance(&oi, 1);
1593         }
1594         vblob_release(orig, NULL);
1595     }
1596     return rc;
1597 }
1598 #endif
1599 
1600 static
/* VFunctionProdCallCompare
 *  built-in comparison: reads a blob from each of the two parameter
 *  productions and checks that they describe the same rows.
 *
 *  "self" [ IN ] - function production with exactly two parameters;
 *  parameter 0 is the original, parameter 1 is the data under test.
 *
 *  "vblob" [ OUT ] - always set to NULL; this function produces no blob.
 *
 *  "id" [ IN ] and "cnt" [ IN ] - row id and row count handed to
 *  VProductionReadBlob for both parameters. "id" is passed by address,
 *  so the second read sees whatever value the first read left in it.
 *
 *  Returns 0 when no difference was detected, the rc of a failed blob
 *  read, or rcVDB/rcBlob/rcValidating/rcBlob/rcCorrupt on any mismatch.
 */
rc_t VFunctionProdCallCompare( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
    VBlob *orig;
    rc_t rc;
    VProduction const *orig_prod;

    *vblob = NULL;
    assert(VectorLength(&self->parms) == 2);
    orig_prod = (const VProduction *)VectorGet(&self->parms, 0);
    rc = VProductionReadBlob(orig_prod, &orig, &id, cnt, NULL);
    if (rc == 0) {
        VBlob *test;
        const VProduction *test_prod = VectorGet(&self->parms, 1);

        rc = VProductionReadBlob(test_prod, &test, &id, cnt, NULL);
        if (rc == 0) {
            /* Element size and byte order must agree before anything else is compared.
             * Otherwise, the row-by-row comparison below is skipped only when both
             * page maps have exactly one data record, the element counts match and
             * the raw data bytes are identical. */
            if (orig->data.elem_bits != test->data.elem_bits || orig->byte_order != test->byte_order){
                rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
            } else if(   orig->pm->data_recs != 1				/*** catching static **/
                      || test->pm->data_recs != 1				/*** trying quick comparison **/
		      || orig->data.elem_count != test->data.elem_count
                      || memcmp( orig->data.base, test->data.base, (orig->data.elem_bits*orig->data.elem_count+7)/8)){
                uint64_t i;
                PageMapIterator oi;
                PageMapIterator ti;
                VRowData orig_data;
                VRowData test_data;

                /* row-data descriptors handed to the compare callback;
                 * first_elem and elem_count are filled in per chunk below */
                memset(&orig_data, 0, sizeof(orig_data));
                orig_data.u.data.base = orig->data.base;
                orig_data.u.data.elem_bits = orig->data.elem_bits;

                memset(&test_data, 0, sizeof(test_data));
                test_data.u.data.base = test->data.base;
                test_data.u.data.elem_bits = test->data.elem_bits;

                PageMapNewIterator(orig->pm, &oi, 0, -1);
                PageMapNewIterator(test->pm, &ti, 0, -1);
                /* when the test blob starts earlier, skip its leading rows
                 * so that both iterators sit on orig->start_id */
                if (test->start_id < orig->start_id) {
                    if ( !PageMapIteratorAdvance( &ti, (uint32_t)( orig->start_id - test->start_id ) ) ) {
                        rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                        DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map mismatch at row %li\n", self->dad.name, id));
                    }
                }

                /* "i" is the current row id; each outer iteration compares one chunk */
                for (i = orig->start_id; rc == 0; ) {
                    uint32_t elem_count;
                    uint64_t prev_i = i;

                    orig_data.u.data.first_elem = (orig->data.bit_offset / orig->data.elem_bits) + PageMapIteratorDataOffset(&oi);
                    test_data.u.data.first_elem = (test->data.bit_offset / test->data.elem_bits) + PageMapIteratorDataOffset(&ti);

                    /* inner loop: accumulate rows into one chunk, checking that
                     * row lengths agree and advancing both iterators in step */
                    for (elem_count = 0; ; ) {
                        bool done = false;
                        uint32_t j;
                        uint32_t k;

                        j = PageMapIteratorDataLength(&oi);
                        k = PageMapIteratorDataLength(&ti);
                        if (j != k) {
                            rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                            DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: length mismatch at row %li ( original=%u, test=%u )\n", self->dad.name, i, j, k));
                            break;
                        }
                        elem_count += j;

                        /* differing repeat counts end the chunk after advancing
                         * both sides by the smaller of the two counts */
                        j = PageMapIteratorRepeatCount(&oi);
                        k = PageMapIteratorRepeatCount(&ti);
                        if (j != k) {
                            done = true;
                            if (j > k)
                                j = k;
                        }
                        if (PageMapIteratorAdvance(&ti, j) != PageMapIteratorAdvance(&oi, j)) {
                            rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                            DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map corrupt at row %li\n", self->dad.name, i));
                            break;
                        }
                        i += j;
                        /* the chunk also ends past the last original row, when either
                         * page map is marked random_access, or when the test iterator
                         * reports it cannot advance */
			if ( done || (int64_t)i > orig->stop_id || test->pm->random_access || orig->pm->random_access || !PageMapIteratorAdvance( &ti, 0 ) )
                            break;
                    }
                    if (rc)
                        break;
                    if ( (int64_t)i > ( orig->stop_id + 1 ) ) {
                        rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                        (void)prev_i; /* shut up warning when not printing debug msg */
                        DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map has too many rows at row %li\n", self->dad.name, prev_i));
                        break;
                    }

                    orig_data.u.data.elem_count = test_data.u.data.elem_count = elem_count;

                    /* any non-zero result from the compare callback is reported as corrupt */
                    rc = self->u.cf(self->fself, &orig_data, &test_data);
                    if (rc) {
                        rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                        DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: data mismatch at row %li\n", self->dad.name, prev_i));
                        break;
                    }
                    if ((int64_t)i > orig->stop_id )
                        break;

                    /* check to see if the test iterator is at end
                     * and if so, fetch next blob */
                    if (!PageMapIteratorAdvance(&ti, 0)) {
                        VBlob *temp;
                        int64_t row = i;
                        rc = VProductionReadBlob(test_prod, &temp, &row, orig->stop_id - row, NULL);
                        if (rc == 0) {
                            /* swap in the new test blob and restart its iterator.
                             * NOTE(review): only test_data.u.data.base is refreshed here;
                             * elem_bits and byte_order of the new blob are not re-checked
                             * against the original - confirm that is intended. */
                            vblob_release(test, NULL);
                            test = temp;
                            test_data.u.data.base = test->data.base;
                            PageMapNewIterator(test->pm, &ti, 0, -1);
                            if ( test->start_id < row ) {
                                if ( !PageMapIteratorAdvance( &ti, (uint32_t)( row - test->start_id ) ) ) {
                                    rc = RC(rcVDB, rcBlob, rcValidating, rcBlob, rcCorrupt);
                                    DBGMSG(DBG_VDB, DBG_VDB_COMPARE, ("%s: page map mismatch at row %li\n", self->dad.name, row));
                                }
                            }
                        }
                    }
                }
            }
            vblob_release(test, NULL);
        }
        vblob_release(orig, NULL);
    }
    return rc;
}
1729 
/* fetch_param_blob_data
 *  state shared with the VectorDoUntil callbacks that read the
 *  parameter blobs of a function production
 */
typedef struct fetch_param_blob_data fetch_param_blob_data;
struct fetch_param_blob_data
{
    int64_t id;               /* row id handed (by address) to VProductionReadBlob */
    uint32_t cnt;             /* row count handed to VProductionReadBlob */
    int64_t range_start_id;   /* largest start_id among the blobs read so far */
    int64_t range_stop_id;    /* smallest stop_id among the blobs read so far */
    Vector *inputs;           /* vector the fetched blobs are appended to; may be NULL */
    VBlob *vblob;             /* blob read by fetch_first_param_blob */
    rc_t rc;                  /* result of the most recent read or append */
    bool no_cache;            /* OR of the no_cache flags of all fetched blobs */
};
1742 
/* fetch_param_blob_data_init
 *  prepares "pb" for a VectorDoUntil pass over a parameter list
 *
 *  "id" [ IN ] and "cnt" [ IN ] - row request to forward to each parameter
 *  "inputs" [ IN, NULL OKAY ] - vector that will collect the fetched blobs
 *
 *  The row range starts out as wide as possible ( INT64_MIN .. INT64_MAX )
 *  and no error, blob or no_cache flag is recorded yet.
 */
static void fetch_param_blob_data_init(fetch_param_blob_data *pb,int64_t id,uint32_t cnt,Vector *inputs)
{
    /* request being forwarded */
    pb -> id = id;
    pb -> cnt = cnt;

    /* widest possible range */
    pb -> range_start_id = INT64_MIN;
    pb -> range_stop_id = INT64_MAX;

    /* outputs */
    pb -> inputs = inputs;
    pb -> vblob = NULL;
    pb -> rc = 0;
    pb -> no_cache = false;
}
1754 static
fetch_param_blob(void * item,void * data)1755 bool CC fetch_param_blob ( void *item, void *data )
1756 {
1757     fetch_param_blob_data *pb = data;
1758     VBlob *blob;
1759 
1760     pb -> rc = VProductionReadBlob ( item, & blob, & pb -> id , pb -> cnt, NULL);
1761     if ( pb -> rc == 0 )
1762     {
1763         pb -> rc = VectorAppend ( pb -> inputs, NULL, blob );
1764         if ( pb -> rc == 0 ) {
1765             pb->no_cache |= blob->no_cache;
1766 	    if(blob->start_id > pb->range_start_id) pb->range_start_id=blob->start_id;
1767 	    if(blob->stop_id  < pb->range_stop_id)  pb->range_stop_id =blob->stop_id;
1768             return false;
1769         }
1770         vblob_release ( blob, NULL );
1771     }
1772 
1773     return true;
1774 }
1775 
1776 static
fetch_first_param_blob(void * item,void * data)1777 bool CC fetch_first_param_blob ( void *item, void *data )
1778 {
1779     fetch_param_blob_data *pb = data;
1780 
1781     pb -> rc = VProductionReadBlob ( item, &pb->vblob, &pb -> id , pb -> cnt, NULL);
1782     if (GetRCState(pb->rc) == rcNotFound)
1783         return false;
1784     if ( pb -> vblob -> data.elem_count == 0 )
1785         return false;
1786     pb->range_start_id=pb->vblob->start_id;
1787     pb->range_stop_id =pb->vblob->stop_id;
1788     return true;
1789 }
1790 
1791 static
VFunctionProdSelect(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1792 rc_t VFunctionProdSelect ( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
1793     fetch_param_blob_data pb;
1794     fetch_param_blob_data_init(&pb,id,cnt,NULL);
1795     VectorDoUntil ( & self -> parms, false, fetch_first_param_blob, & pb );
1796     * vblob = pb.vblob;
1797     return pb.rc;
1798 }
1799 
1800 static
VFunctionProdPassThrough(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1801 rc_t VFunctionProdPassThrough ( VFunctionProd *self, VBlob **vblob, int64_t id, uint32_t cnt ) {
1802     assert(VectorLength(&self->parms) == 1);
1803     return VProductionReadBlob(VectorGet(&self->parms, 0), vblob, &id, cnt, NULL);
1804 }
1805 
/* VFunctionProdReadNormal
 *  reads the input blobs of a function production and invokes the
 *  transform selected by self->dad.sub to build the output blob.
 *
 *  "self" [ IN ] - the function production
 *  "vblob" [ OUT ] - receives the resulting blob; set to NULL on entry
 *  "id" [ IN ] - first requested row
 *  "cnt" [ IN ] - number of requested rows; 0 is treated as 1
 *
 *  The transform may be invoked more than once: while the accumulated
 *  blob stops before row id + cnt - 1, the next call starts at the row
 *  after its stop_id and the new result is appended ( or replaces the
 *  accumulated blob when it starts at or before "id" ).
 */
static rc_t VFunctionProdReadNormal ( VFunctionProd *self, VBlob **vblob, int64_t id ,uint32_t cnt)
{
    rc_t rc;
    Vector inputs;
    fetch_param_blob_data pb;
    VBlob *vb=NULL;
    int64_t	id_run;
    int64_t     cnt_run;

    /* fill out information for function to use */
    const VCursor *curs = self -> curs;
    VXformInfo info;

    if(cnt == 0) cnt = 1;

#if VMGR_PASSED_TO_XFORM
    info . mgr = curs -> tbl -> mgr;
#endif
#if VSCHEMA_PASSED_TO_XFORM
    info . schema = curs -> schema;
#endif
#if VTABLE_PASSED_TO_XFORM
    info . tbl = VCursorGetTable ( curs );
#endif
#if VPRODUCTION_PASSED_TO_XFORM
    info . prod = & self -> dad;
#endif
    info . fdesc . fd = self -> dad . fd;
    info . fdesc . desc = self -> dad . desc;
    *vblob = NULL;

    /* the built-in compare reads its own inputs and returns before
       the "inputs" vector is initialized; in debugging builds a failed
       compare is re-run through VFunctionProdCallCompare1 */
    if (self->dad.sub == prodFuncBuiltInCompare) {
        rc = VFunctionProdCallCompare(self, vblob, id, cnt);
#if _DEBUGGING
        if (rc != 0)
            rc = VFunctionProdCallCompare1(self, vblob, id, cnt);
#endif
        return rc;
    }

    /* all other functions take some form of blob input */
    VectorInit ( & inputs, 0, VectorLength ( & self -> parms ) );
    fetch_param_blob_data_init(&pb,id,cnt,&inputs);
    /* VectorDoUntil returns true when a callback stopped the walk,
       i.e. fetching one of the parameter blobs failed */
    if ( VectorDoUntil ( & self -> parms, false, fetch_param_blob, & pb ) )
        rc = pb . rc;
    else for( id_run=id, cnt_run=cnt, rc=0; cnt_run > 0 && rc==0;)
    {
        /* dispatch on the function variant; each call leaves its result in "vb" */
        switch ( self -> dad . sub )
        {
        case vftLegacyBlob:
            rc = VFunctionProdCallLegacyBlobFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case vftNonDetRow:
            rc = VFunctionProdCallNDRowFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case vftRow:
	    case vftRowFast:
        case vftIdDepRow:
            rc = VFunctionProdCallRowFunc ( self, &vb, id_run, cnt_run, & info, & inputs, pb.range_start_id,pb.range_stop_id );
            break;
        case vftArray:
            rc = VFunctionProdCallArrayFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case vftFixedRow:
            rc = VFunctionProdCallPageFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case vftBlob:
            rc = VFunctionProdCallBlobFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case vftBlobN:
            rc = VFunctionProdCallBlobNFunc ( self, &vb, id_run, & info, & inputs );
            break;
        case prodFuncByteswap:
            rc = VFunctionProdCallByteswap ( self, &vb, id_run, & info, & inputs );
            break;
        default:
            rc = RC ( rcVDB, rcFunction, rcReading, rcProduction, rcCorrupt );
        }
        if (rc == 0) {
            if (vb == NULL) {
                rc = RC ( rcVDB, rcFunction, rcReading, rcProduction, rcNull );
            }
            else {
                /* NOTE(review): rc is set here but the blob is still stored or
                   appended below, so an error can be returned together with a
                   non-NULL *vblob - confirm callers cope with that */
                if (vb -> start_id > id_run || vb -> stop_id < id_run) { /*** shouldn't happen ***/
                    rc = RC ( rcVDB, rcBlob, rcReading, rcRange, rcInsufficient );
                }
                if (*vblob == NULL) {
                    *vblob=vb;
                }
                else {
                    if (vb -> start_id <= id) {/** new blob is not appendable, but can replace the current one **/
                        vblob_release(*vblob, NULL);
                        *vblob = vb;
                    }
                    else {
                        /*** append here **/
                        rc = VBlobAppend(*vblob, vb);
                        vblob_release(vb, NULL);
                    }
                }
                /* propagate dirty flag */
                (*vblob)->no_cache |= pb.no_cache;
                /* done once the accumulated blob reaches the last requested row */
                if( (*vblob) -> stop_id >= id + cnt - 1)
                    break;

                /* otherwise continue with the row after the accumulated blob */
                id_run  = (*vblob) -> stop_id + 1;
                cnt_run = id + cnt - id_run;

            }
        }
        else if (id < id_run) {
            /* if there is reblobbing and our result blob already contains some
             * data, then return that data and not error out */
            rc = 0;
            break;
        }
    }
    /* drop input blobs */
    VectorWhack ( & inputs, vblob_release, NULL );
    return rc;
}
1927 
VFunctionProdRead(VFunctionProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)1928 rc_t VFunctionProdRead ( VFunctionProd *self, VBlob **vblob, int64_t id , uint32_t cnt)
1929 {
1930     if ( self -> dad . sub == vftSelect )
1931         return VFunctionProdSelect ( self, vblob, id , cnt);
1932     if ( self -> dad . sub == vftPassThrough )
1933         return VFunctionProdPassThrough ( self, vblob, id , cnt);
1934     return VFunctionProdReadNormal(self, vblob, id, cnt);
1935 }
1936 
/* fetch_param_IdRange_data
 *  accumulator used by fetch_param_IdRange
 */
typedef struct fetch_param_IdRange_data fetch_param_IdRange_data;
struct fetch_param_IdRange_data
{
    int64_t first;      /* smallest "first" row id reported so far */
    int64_t last;       /* largest "last" row id reported so far */
    rc_t rc;            /* result of the most recent range query that was not skipped */
    bool first_time;    /* true until one parameter has reported a range */
};
1945 
1946 static
fetch_param_IdRange(void * item,void * data)1947 bool CC fetch_param_IdRange ( void *item, void *data )
1948 {
1949     fetch_param_IdRange_data *pb = data;
1950     int64_t first;
1951     int64_t last;
1952     rc_t rc;
1953 
1954     rc = VProductionColumnIdRange(item, &first, &last);
1955     if (GetRCState(rc) == rcEmpty && GetRCObject(rc) == rcRange)
1956         return false;
1957 
1958     pb->rc = rc;
1959     if (rc == 0 )
1960     {
1961         if (pb->first_time || first < pb->first)
1962             pb->first = first;
1963         if (pb->first_time || last > pb->last)
1964             pb->last = last;
1965         pb->first_time = false;
1966         return false;
1967     }
1968 
1969     return true;
1970 }
1971 
VFunctionProdColumnIdRange(const VFunctionProd * self,int64_t * first,int64_t * last)1972 LIB_EXPORT rc_t CC VFunctionProdColumnIdRange ( const VFunctionProd *self, int64_t *first, int64_t *last )
1973 {
1974     fetch_param_IdRange_data pb;
1975 
1976     pb.first_time = true;
1977     pb . rc = 0;
1978     pb.first = 1;
1979     pb.last = 0;
1980 
1981     VectorDoUntil ( & self -> parms, false, fetch_param_IdRange, & pb );
1982 
1983     if (pb.rc == 0) {
1984 #if 0
1985 /* this causes problems in the loaders */
1986         if(pb.first_time){ /** no parameters - some function which generated data; f.e.  meta_value() ***/
1987               pb.last = INT64_MAX;
1988         }
1989 #endif
1990         *first = pb.first;
1991         *last = pb.last;
1992     }
1993     return pb . rc;
1994 }
1995 
/* fetch_param_FixedRowLength_data
 *  state used by fetch_param_FixedRowLength
 */
typedef struct fetch_param_FixedRowLength_data fetch_param_FixedRowLength_data;
struct fetch_param_FixedRowLength_data
{
    uint32_t length;    /* row length reported by the first non-control parameter */
    int64_t row_id;     /* row id passed on to VProductionFixedRowLength */
    bool first_time;    /* true until a non-control parameter has been examined */
};
2003 
2004 static
fetch_param_FixedRowLength(void * item,void * data)2005 bool CC fetch_param_FixedRowLength ( void *item, void *data )
2006 {
2007     fetch_param_FixedRowLength_data *pb = data;
2008     uint32_t length;
2009 
2010     if (((const VProduction *)item)->control == false) {
2011         length = VProductionFixedRowLength(item, pb->row_id, false);
2012 
2013         if (pb->first_time)
2014             pb->length = length;
2015 
2016         pb->first_time = false;
2017 
2018         if (length == 0 || length != pb->length)
2019             return true;
2020     }
2021     return false;
2022 }
2023 
2024 static
VFunctionProdFixedRowLength(const VFunctionProd * self,int64_t row_id,bool ignore_self)2025 uint32_t VFunctionProdFixedRowLength ( const VFunctionProd *self, int64_t row_id, bool ignore_self )
2026 {
2027     fetch_param_FixedRowLength_data pb;
2028 
2029     if ( ! ignore_self )
2030     {
2031         switch ( self -> dad . sub )
2032         {
2033         case vftRow:
2034 		case vftRowFast:
2035         case vftNonDetRow:
2036         case vftIdDepRow:
2037             return 0;
2038         }
2039     }
2040 
2041     pb.first_time = true;
2042     pb.length = 0;
2043 
2044     VectorDoUntil ( & self -> parms, false, fetch_param_FixedRowLength, & pb );
2045 
2046     return pb.length;
2047 }
2048 
2049 
2050 /*--------------------------------------------------------------------------
2051  * VScriptProd
2052  */
2053 
VScriptProdMake(VScriptProd ** prodp,Vector * owned,struct VCursor const * curs,int sub,const char * name,const VFormatdecl * fd,const VTypedesc * desc,uint8_t chain)2054 rc_t VScriptProdMake ( VScriptProd **prodp, Vector *owned, struct VCursor const *curs,
2055     int sub, const char *name, const VFormatdecl *fd,
2056     const VTypedesc *desc, uint8_t chain )
2057 {
2058     VScriptProd *prod;
2059     rc_t rc = VProductionMake ( ( VProduction** ) prodp, owned, sizeof * prod,
2060         prodScript, sub, name, fd, desc, NULL, chain );
2061     if ( rc == 0 )
2062     {
2063         prod = * prodp;
2064         prod -> curs = curs;
2065         VectorInit ( & prod -> owned, 0, 4 );
2066     }
2067     return rc;
2068 }
2069 
/* VScriptProdDestroy
 *  whacks every production held in the script's "owned" vector
 */
void VScriptProdDestroy ( VScriptProd *self )
{
    Vector *held = & self -> owned;

    VectorWhack ( held, VProductionWhack, NULL );
}
2074 
2075 
2076 /* Read
2077  */
VScriptProdRead(VScriptProd * self,VBlob ** vblob,int64_t id,uint32_t cnt)2078 rc_t VScriptProdRead ( VScriptProd *self, VBlob **vblob, int64_t id,uint32_t cnt )
2079 {
2080     return VProductionReadBlob ( self -> rtn, vblob, &id , cnt, NULL);
2081 }
2082 
VScriptProdColumnIdRange(const VScriptProd * self,int64_t * first,int64_t * last)2083 static rc_t VScriptProdColumnIdRange ( const VScriptProd *self, int64_t *first, int64_t *last )
2084 {
2085     return VProductionColumnIdRange(self->rtn, first, last);
2086 }
2087 
VScriptProdFixedRowLength(const VScriptProd * self,int64_t row_id)2088 static uint32_t VScriptProdFixedRowLength ( const VScriptProd *self, int64_t row_id )
2089 {
2090     return VProductionFixedRowLength(self->rtn, row_id, false);
2091 }
2092 
2093 /*--------------------------------------------------------------------------
2094  * VPivotProd
2095  *  potentially pivots to a new row-id space
2096  */
2097 
VPivotProdMake(VPivotProd ** p_prodp,Vector * p_owned,VProduction * p_member,VProduction * p_row_id,const char * p_name,int p_chain)2098 rc_t VPivotProdMake ( VPivotProd ** p_prodp,
2099                       Vector *      p_owned,
2100                       VProduction * p_member,
2101                       VProduction * p_row_id,
2102                       const char *  p_name,
2103                       int           p_chain )
2104 {
2105     VPivotProd * prod;
2106     VFormatdecl fd = { { 0, 0 }, 0 };
2107     VTypedesc desc = { 64, 1, vtdInt };
2108     rc_t rc = VProductionMake ( ( VProduction** ) p_prodp, p_owned, sizeof * prod,
2109         prodPivot, 0, p_name, & fd, & desc, NULL, p_chain );
2110     if ( rc == 0 )
2111     {
2112         prod = * p_prodp;
2113         prod -> member = p_member;
2114         prod -> row_id = p_row_id;
2115     }
2116     return rc;
2117 }
2118 
/* VPivotProdDestroy
 *  intentionally empty: this function releases nothing
 */
void VPivotProdDestroy ( VPivotProd * p_self )
{
    ( void ) p_self;
}
2122 
2123 /* Read
2124  */
VPivotProdRead(VPivotProd * p_self,struct VBlob ** p_vblob,int64_t * p_id,uint32_t p_cnt)2125 rc_t VPivotProdRead ( VPivotProd * p_self, struct VBlob ** p_vblob, int64_t * p_id, uint32_t p_cnt )
2126 {
2127     struct VBlob * rowIdBlob;
2128     rc_t rc;
2129     assert ( p_id != NULL );
2130     rc = VProductionReadBlob ( p_self -> row_id, & rowIdBlob, p_id , p_cnt, NULL);
2131     if ( rc == 0 )
2132     {
2133         uint32_t elemNum;
2134         uint32_t repeat_count;
2135         uint32_t rowLen = PageMapGetIdxRowInfo ( rowIdBlob -> pm, ( uint32_t ) ( * p_id - rowIdBlob -> start_id ), & elemNum, & repeat_count );
2136         /* assume elem size is 64 */
2137         int64_t newRowId = * ( ( int64_t* ) rowIdBlob -> data . base + elemNum );
2138 
2139 		assert ( rowLen == 1);
2140         assert ( repeat_count == 1);
2141 
2142 		vblob_release ( rowIdBlob, NULL );
2143 
2144 		rc = VProductionReadBlob ( p_self -> member, p_vblob, & newRowId, p_cnt, NULL);
2145         if ( rc == 0 )
2146         {
2147             /* the cache mechanism will get confused by our overwriting p_id, so turn it off */
2148             ( * p_vblob ) -> no_cache = true;
2149             * p_id = newRowId;
2150         }
2151     }
2152     return rc;
2153 }
2154 
2155 /*--------------------------------------------------------------------------
2156  * VProduction
2157  */
2158 
/* Init
 *  parent initialization function
 *  called from the "Make" functions below
 *
 *  NOTE: this definition sits inside "#if 0" and is not compiled.
 *  It zeroes the production, copies "fd", "desc" and "cid" when they
 *  are not NULL, and stores "var", "sub" and "chain".
 */
#if 0
static
void VProductionInit ( VProduction *self, int var, int sub, const char *name,
                       const VFormatdecl *fd, const VTypedesc *desc,
                       const VCtxId *cid, uint8_t chain )
{
    memset ( self, 0, sizeof * self );

    if ( fd != NULL )
        self -> fd = * fd;
    if ( desc != NULL )
        self -> desc = * desc;
    if ( cid != NULL )
        self -> cid = * cid;

    self -> var = ( uint8_t ) var;
    self -> sub = ( uint8_t ) sub;
    self -> chain = chain;
}
#endif
2183 
VProductionWhack(void * item,void * owned)2184 void CC VProductionWhack ( void *item, void *owned )
2185 {
2186     VProduction * self = item;
2187 
2188     if ( self != NULL )
2189     {
2190         if ( owned != NULL)
2191         {
2192             void *ignore;
2193             VectorSwap ( owned, self -> oid, NULL, & ignore );
2194             assert ( ( void* ) self == ignore );
2195         }
2196 
2197 
2198 #if PROD_CACHE
2199         VProductionFlushCacheDeep ( self, "whack" );
2200 #endif
2201         switch ( self -> var )
2202         {
2203         case prodSimple:
2204 #if TRACKING_BLOBS
2205             fprintf( stderr, "VSimpleProd %p being whacked *** %s\n", self, self->name );
2206 #endif
2207             VSimpleProdDestroy ( ( VSimpleProd* ) self );
2208             break;
2209 
2210         case prodFunc:
2211 #if TRACKING_BLOBS
2212             fprintf( stderr, "VFunctionProd %p being whacked *** %s\n", self, self->name );
2213 #endif
2214             VFunctionProdDestroy ( ( VFunctionProd* ) self );
2215             break;
2216 
2217         case prodScript:
2218 #if TRACKING_BLOBS
2219             fprintf( stderr, "VScriptProd %p being whacked *** %s\n", self, self->name );
2220 #endif
2221             VScriptProdDestroy ( ( VScriptProd* ) self );
2222             break;
2223 
2224         case prodPhysical:
2225 #if TRACKING_BLOBS
2226             fprintf( stderr, "VPhysicalProd %p being whacked *** %s\n", self, self->name );
2227 #endif
2228             VPhysicalProdDestroy ( ( VPhysicalProd* ) self );
2229             break;
2230 
2231         case prodColumn:
2232 #if TRACKING_BLOBS
2233             fprintf( stderr, "VColumnProd %p being whacked *** %s\n", self, self->name );
2234 #endif
2235             VColumnProdDestroy ( ( VColumnProd* ) self );
2236             break;
2237 
2238         case prodPivot:
2239 #if TRACKING_BLOBS
2240             fprintf( stderr, "VPivotProd %p being whacked *** %s\n", self, self->name );
2241 #endif
2242             VPivotProdDestroy ( ( VPivotProd* ) self );
2243             break;
2244         }
2245 
2246         free ( self );
2247     }
2248 }
2249 
2250 /* Cmp
2251  * Sort
2252  *  compare item is a VCtxId
2253  *  sort item is a VProduction
2254  *  n is always a VProduction
2255  */
VProductionCmp(const void * item,const void * n)2256 LIB_EXPORT int64_t CC VProductionCmp ( const void *item, const void *n )
2257 {
2258     const VCtxId *a = item;
2259     const VProduction *b = n;
2260     return VCtxIdCmp ( a, & b -> cid );
2261 }
2262 
VProductionSort(const void * item,const void * n)2263 LIB_EXPORT int64_t CC VProductionSort ( const void *item, const void *n )
2264 {
2265     const VProduction *a = item;
2266     const VProduction *b = n;
2267     return VCtxIdCmp ( & a -> cid, & b -> cid );
2268 }
2269 
2270 
2271 /* IdRange
2272  *  obtains intersection of all physical sources
2273  *
2274  *  "first" [ IN/OUT ] and "last" [ IN/OUT ] - range to intersect
2275  */
VProductionColumnIdRange(const VProduction * self,int64_t * first,int64_t * last)2276 rc_t VProductionColumnIdRange ( const VProduction *self,
2277     int64_t *first, int64_t *last )
2278 {
2279     if ( self <= FAILED_PRODUCTION )
2280         return 0;
2281 
2282     switch ( self -> var )
2283     {
2284     case prodSimple:
2285         return VProductionColumnIdRange ( ( ( const VSimpleProd* ) self ) -> in, first, last );
2286     case prodFunc:
2287         return VFunctionProdColumnIdRange((const VFunctionProd *)self, first, last);
2288     case prodScript:
2289         return VScriptProdColumnIdRange((const VScriptProd *)self, first, last);
2290     case prodPhysical:
2291         return VPhysicalProdColumnIdRange((const VPhysicalProd *)self, first, last);
2292     case prodPivot:
2293         return VProductionColumnIdRange( ( (const VPivotProd *)self ) -> member, first, last);
2294     case prodColumn:
2295         return RC ( rcVDB, rcColumn, rcAccessing, rcRange, rcEmpty );
2296     }
2297 
2298     return RC ( rcVDB, rcColumn, rcAccessing, rcType, rcUnknown );
2299 }
2300 
VProductionPageIdRange(VProduction * self,int64_t id,int64_t * first,int64_t * last)2301 rc_t VProductionPageIdRange ( VProduction *self,
2302     int64_t id, int64_t *first, int64_t *last )
2303 {
2304     VBlob *blob;
2305     rc_t rc = VProductionReadBlob ( self, & blob, & id , 1, NULL);
2306     if ( rc == 0 )
2307     {
2308         * first = blob -> start_id;
2309         * last = blob -> stop_id;
2310 
2311         vblob_release ( blob, NULL );
2312     }
2313     return rc;
2314 }
2315 
2316 /* RowLength
2317  *  get row length for a particular row
2318  */
VProductionRowLength(const VProduction * self,int64_t row_id)2319 uint32_t VProductionRowLength ( const VProduction *self, int64_t row_id )
2320 {
2321     uint32_t row_len;
2322 
2323     VBlob *blob;
2324     rc_t rc = VProductionReadBlob ( self, & blob, & row_id, 1, NULL );
2325     if ( rc != 0 )
2326         return 0;
2327 
2328     row_len = PageMapGetIdxRowInfo ( blob -> pm, (uint32_t)( row_id - blob -> start_id) , NULL, NULL );
2329 
2330     vblob_release ( blob, NULL );
2331 
2332     return row_len;
2333 }
2334 
2335 /* FixedRowLength
2336  *  get fixed row length for entire column
2337  *  returns 0 if not fixed
2338  */
VProductionFixedRowLength(const VProduction * self,int64_t row_id,bool ignore_self)2339 uint32_t VProductionFixedRowLength ( const VProduction *self, int64_t row_id,bool ignore_self )
2340 {
2341     switch ( self -> var )
2342     {
2343     case prodSimple:
2344         return VProductionFixedRowLength ( ( ( const VSimpleProd* ) self ) -> in, row_id, ignore_self );
2345     case prodFunc:
2346         return VFunctionProdFixedRowLength((const VFunctionProd *)self, row_id, ignore_self);
2347     case prodScript:
2348         return VScriptProdFixedRowLength((const VScriptProd *)self, row_id);
2349     case prodPhysical:
2350         return VPhysicalProdFixedRowLength((const VPhysicalProd *)self, row_id);
2351     case prodPivot:
2352         assert(false); /*TODO*/
2353     }
2354 
2355     return RC ( rcVDB, rcColumn, rcAccessing, rcType, rcUnknown );
2356 }
2357 
2358 /* ReadBlob
2359  */
VProductionReadBlob(const VProduction * cself,VBlob ** vblob,int64_t * p_id,uint32_t cnt,VBlobMRUCacheCursorContext * cctx)2360 rc_t VProductionReadBlob ( const VProduction *cself, VBlob **vblob, int64_t * p_id, uint32_t cnt, VBlobMRUCacheCursorContext *cctx )
2361 {
2362 #if BYTECODE
2363 
2364     rc_t rc;
2365     VProduction *self = ( VProduction* ) cself;
2366     if ( self == NULL )
2367         return RC ( rcVDB, rcProduction, rcReading, rcSelf, rcNull );
2368 
2369     struct ByteCodeContext ctx;
2370     ctx . id = * p_id;
2371     ctx . cnt = cnt;
2372     ctx . cctx = cctx;
2373     ctx . result = NULL;
2374     rc = ExecuteByteCode ( bcProductionReadBlob, self, & ctx );
2375     * vblob = ctx . result;
2376     if ( rc == 0 )
2377     {
2378         * p_id = ctx . id;
2379     }
2380     return rc;
2381 
2382 #else
2383 
2384     rc_t rc;
2385     VProduction *self = ( VProduction* ) cself;
2386 
2387 #if PROD_CACHE
2388     int i;
2389     VBlob *blob;
2390 #endif
2391 
2392     * vblob = NULL;
2393 
2394     /* should not be possible, but safety is cheap */
2395     if ( self == NULL )
2396         return RC ( rcVDB, rcProduction, rcReading, rcSelf, rcNull );
2397 
2398     /*** Cursor-level column blobs may be 1-to-1 with production blobs ***/
2399     if(cctx != NULL && self->cctx.cache == NULL ){ /*** we are connected to read cursor **/
2400 	self->cctx = *cctx; /*** remember it ***/
2401 	/** No need to do anything else here - we are on "direct line" to the column ***/
2402     } else if(self->cctx.cache != NULL){
2403 	/** somewhere else this production is connected to a cursor **/
2404 	/** lets try to get answers from the cursor **/
2405 	blob=(VBlob*) VBlobMRUCacheFind(self->cctx.cache,self->cctx.col_idx,*p_id);
2406 	if(blob){
2407 		rc = VBlobAddRef ( blob );
2408                 if ( rc != 0 ) return rc;
2409 		*vblob=blob;
2410 		return 0;
2411 	}
2412     }
2413 
2414 #if PROD_CACHE
2415     /* check cache */
2416     for ( i = 0; i < self -> cache_cnt; ++ i )
2417     {
2418         blob = self -> cache [ i ];
2419         if ( self -> cache [ i ] != NULL )
2420         {
2421             /* check id range */
2422             if (
2423 #if USE_EUGENE
2424                 /* NB - this is an approach where we always cache
2425                    a blob after a read in order to keep it alive,
2426                    but never allow a cache hit on retrieval */
2427                 ! blob -> no_cache &&
2428 #endif
2429                 * p_id >= blob -> start_id &&
2430                 * p_id <= blob -> stop_id )
2431             {
2432                 rc = VBlobAddRef ( blob );
2433                 if ( rc != 0 )
2434                     return rc;
2435 #if TRACKING_BLOBS
2436                 fprintf( stderr, "%p->%p(%d) new reference to cached blob *** %s\n"
2437                          , self
2438                          , blob
2439                          , atomic32_read ( & blob -> refcount )
2440                          , self->name
2441                     );
2442 #endif
2443                 /* return new reference */
2444                 * vblob = blob;
2445 #if PROD_CACHE > 1
2446    #if PROD_CACHE > 2
2447                 /* MRU cache */
2448                 if ( i > 0 )
2449                 {
2450 		    memmove(self -> cache +1,self -> cache,i*sizeof(*self->cache));
2451                     self -> cache [ 0 ] = blob;
2452                 }
2453    #else
2454 		if(i > 0 ){  /** trivial case ***/
2455 		    self -> cache [ 1 ] =  self -> cache [ 0 ];
2456 		    self -> cache [ 0 ] = blob;
2457 		}
2458    #endif
2459 #endif
2460                 return 0;
2461             }
2462         }
2463     }
2464 #endif /* PROD_CACHE */
2465 
2466     /* dispatch */
2467     switch ( self -> var )
2468     {
2469     case prodSimple:
2470         rc = VSimpleProdRead ( ( VSimpleProd* ) self, vblob, p_id, cnt,cctx );
2471         break;
2472     case prodFunc:
2473         rc = VFunctionProdRead ( ( VFunctionProd* ) self, vblob, * p_id , cnt);
2474 #if _DEBUGGING && PROD_NAME
2475         if ( rc != 0 )
2476             DBGMSG ( DBG_VDB, DBG_VDB_FUNCTION, ( "%s: %R\n", self -> name, rc ) );
2477 #endif
2478         break;
2479     case prodScript:
2480         rc = VScriptProdRead ( ( VScriptProd* ) self, vblob, * p_id , cnt);
2481         break;
2482     case prodPhysical:
2483         rc = VPhysicalProdRead ( ( VPhysicalProd* ) self, vblob, * p_id, cnt );
2484         break;
2485     case prodColumn:
2486         rc = VColumnProdRead ( ( VColumnProd* ) self, vblob, * p_id );
2487         break;
2488     case prodPivot:
2489         rc = VPivotProdRead ( ( VPivotProd* ) self, vblob, p_id, cnt );
2490         break;
2491     default:
2492         return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2493     }
2494 
2495 #if ! PROD_CACHE
2496     return rc;
2497 #else
2498     blob = * vblob;
2499 
2500     if ( rc != 0 || * vblob == NULL )
2501         return rc;
2502 
2503 #if ! USE_EUGENE
2504          /* NB - there is another caching mechanism on VColumn
2505             if a blob does not want to be cached, it is rejected here */
2506     if ( ! blob -> no_cache )
2507         return 0;
2508 #endif
2509     if(cctx == NULL && self->cctx.cache != NULL && blob->stop_id > blob->start_id + 4){/** we will benefit from caching here **/
2510 		VBlobMRUCacheSave(self->cctx.cache,self->cctx.col_idx,blob);
2511 		return 0;
2512     }
2513 
2514     if(blob->pm == NULL) return 0;
2515 
2516 
2517     /* cache output */
2518     rc = VBlobAddRef ( blob );
2519     if ( rc == 0 )
2520     {
2521         VBlobCheckIntegrity ( blob );
2522 #if PROD_CACHE
2523 	if(self -> cache_cnt < PROD_CACHE){
2524 #if PROD_CACHE > 1
2525 		if(self -> cache_cnt > 0 ){
2526 #if PROD_CACHE > 2
2527 			memmove(self -> cache + 1, self -> cache , self -> cache_cnt * sizeof(*self -> cache));
2528 #else
2529 			self -> cache[1]=self -> cache[0];
2530 #endif
2531 		}
2532 #endif
2533 		self -> cache_cnt ++;
2534 	} else {
2535 		/* release whatever was there previously */
2536         	/* drop LRU */
2537 		vblob_release ( self -> cache [ self -> cache_cnt - 1 ], NULL );
2538 #if PROD_CACHE > 1
2539 #if PROD_CACHE > 2
2540 		memmove(self -> cache + 1, self -> cache , (self -> cache_cnt -1) * sizeof(*self -> cache));
2541 #else
2542 		self -> cache[1]=self -> cache[0];
2543 #endif
2544 #endif
2545         }
2546         /* insert a head of list */
2547         self -> cache [ 0 ] = blob;
2548 #endif
2549 
2550 #if TRACKING_BLOBS
2551         fprintf( stderr, "%p->%p(%d) cached *** %s\n"
2552                  , self
2553                  , blob
2554                  , atomic32_read ( & blob -> refcount )
2555                  , self -> name
2556             );
2557 #endif
2558     }
2559 
2560 #if USE_EUGENE
2561     /* this code requires the blob to be cached on the production */
2562     return rc;
2563 #else
2564     /* we don't care if the blob was not cached */
2565     return 0;
2566 #endif
2567 
2568 #endif /* PROD_CACHE */
2569 
2570 #endif
2571 }
2572 
2573 /* IsStatic
2574  *  trace all the way to a physical production
2575  */
VProductionIsStatic(const VProduction * self,bool * is_static)2576 rc_t VProductionIsStatic ( const VProduction *self, bool *is_static )
2577 {
2578     rc_t rc;
2579 
2580     if ( self == NULL )
2581         rc = RC ( rcVDB, rcColumn, rcAccessing, rcSelf, rcNull );
2582     else
2583     {
2584         for ( rc = 0; self != NULL; )
2585         {
2586             switch ( self -> var )
2587             {
2588             case prodSimple:
2589                 self = ( ( const VSimpleProd*) self ) -> in;
2590                 break;
2591             case prodFunc:
2592             case prodScript:
2593             {
2594                 const VFunctionProd *fp = ( const VFunctionProd* ) self;
2595                 uint32_t start = VectorStart ( & fp -> parms );
2596                 uint32_t end = VectorLength ( & fp -> parms );
2597                 for ( end += start; start < end; ++ start )
2598                 {
2599                     self = ( const VProduction* ) VectorGet ( & fp -> parms, start );
2600                     if ( self != NULL )
2601                     {
2602                         rc = VProductionIsStatic ( self, is_static );
2603                         if ( rc != 0 || * is_static )
2604                             break;
2605                     }
2606                 }
2607                 return rc;
2608             }
2609             case prodPhysical:
2610                 return VPhysicalIsStatic ( ( ( const VPhysicalProd* ) self ) -> phys, is_static );
2611             case prodColumn:
2612                 self = NULL;
2613                 break;
2614             case prodPivot:
2615                 assert(false); /* TODO */
2616                 break;
2617             default:
2618                 return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2619             }
2620         }
2621     }
2622 
2623     return rc;
2624 }
2625 
2626 /* GetKColumn
2627  *  drills down to physical production to get a KColumn,
2628  *  and if that fails, indicate whether the column is static
2629  */
VProductionGetKColumn(const VProduction * self,struct KColumn ** kcol,bool * is_static)2630 rc_t VProductionGetKColumn ( const VProduction * self, struct KColumn ** kcol, bool * is_static )
2631 {
2632     rc_t rc;
2633 
2634     if ( self == NULL )
2635         rc = RC ( rcVDB, rcColumn, rcAccessing, rcSelf, rcNull );
2636     else
2637     {
2638         for ( rc = 0; self != NULL; )
2639         {
2640             switch ( self -> var )
2641             {
2642             case prodSimple:
2643                 self = ( ( const VSimpleProd*) self ) -> in;
2644                 break;
2645             case prodFunc:
2646             case prodScript:
2647             {
2648                 const VFunctionProd *fp = ( const VFunctionProd* ) self;
2649                 uint32_t start = VectorStart ( & fp -> parms );
2650                 uint32_t end = VectorLength ( & fp -> parms );
2651                 for ( end += start; start < end; ++ start )
2652                 {
2653                     self = ( const VProduction* ) VectorGet ( & fp -> parms, start );
2654                     if ( self != NULL )
2655                     {
2656                         rc = VProductionGetKColumn ( self, kcol, is_static );
2657                         if ( rc != 0 || * kcol != NULL || * is_static )
2658                             break;
2659                     }
2660                 }
2661                 return rc;
2662             }
2663             case prodPhysical:
2664                 return VPhysicalGetKColumn ( ( ( const VPhysicalProd* ) self ) -> phys, kcol, is_static );
2665             case prodColumn:
2666                 self = NULL;
2667                 break;
2668             case prodPivot:
2669                 assert(false); /* TODO */
2670                 break;
2671             default:
2672                 return RC ( rcVDB, rcProduction, rcReading, rcType, rcUnknown );
2673             }
2674         }
2675     }
2676 
2677     return rc;
2678 }
2679