1 /*===========================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 
27 #include <vdb/extern.h>
28 
29 #include <va_copy.h>
30 
31 #define TRACK_REFERENCES 0
32 
33 #define KONST const
34 #define SKONST
35 #include "cursor-table.h"
36 #include "dbmgr-priv.h"
37 #include "linker-priv.h"
38 #include "table-priv.h"
39 #include "schema-priv.h"
40 #include "schema-parse.h"
41 #include "column-priv.h"
42 #include "prod-expr.h"
43 #undef KONST
44 #undef SKONST
45 #include "blob-priv.h"
46 #include "page-map.h"
47 
48 #include <vdb/cursor.h>
49 #include <vdb/table.h>
50 #include <vdb/vdb-priv.h>
51 #include <kdb/table.h>
52 #include <kdb/column.h>
53 #include <kdb/meta.h>
54 #include <kdb/namelist.h>
55 #include <kfs/dyload.h>
56 #include <klib/symbol.h>
57 #include <klib/symtab.h>
58 #include <klib/namelist.h>
59 #include <klib/log.h>
60 #include <klib/rc.h>
61 #include <klib/printf.h>
62 #include <klib/sort.h>
63 #include <bitstr.h>
64 #include <os-native.h>
65 #include <sysalloc.h>
66 
67 #include <kproc/lock.h>
68 #include <kproc/cond.h>
69 #include <kproc/thread.h>
70 
71 
72 #include <stdlib.h>
73 #include <stdio.h>
74 #include <string.h>
75 #include <ctype.h>
76 #include <assert.h>
77 
78 #define ToConstVCursor(self)    ((const VCursor*)(self))
79 
80 #define PERMIT_POST_OPEN_ADD 1
81 #if _ARCH_BITS == 32
82 #define DISABLE_READ_CACHE 1
83 #else
84 #define DISABLE_READ_CACHE 0
85 #endif
86 
87 /* normally false
88    can be set for certain applications using VDBManagerDisablePagemapThread
89 */
90 static bool s_disable_pagemap_thread;
91 
92 /*--------------------------------------------------------------------------
93  * NamedParamNode
94  */
95 
96 typedef struct NamedParamNode NamedParamNode;
97 struct NamedParamNode
98 {
99     BSTNode n;
100     String name;
101     KDataBuffer value;
102 };
103 
104 static
NamedParamNodeWhack(BSTNode * n,void * ignore)105 void CC NamedParamNodeWhack ( BSTNode *n, void *ignore )
106 {
107     NamedParamNode *self = ( NamedParamNode* ) n;
108     KDataBufferWhack ( & self -> value );
109     free ( self );
110 }
111 
112 static
NamedParamComp(const void * item,const BSTNode * n)113 int64_t CC NamedParamComp ( const void *item, const BSTNode *n )
114 {
115     const String *name = item;
116     const NamedParamNode *node = ( const NamedParamNode* ) n;
117 
118     return StringOrderNoNullCheck ( name, & node -> name );
119 }
120 
121 static
NamedParamNodeComp(const BSTNode * A,const BSTNode * B)122 int64_t CC NamedParamNodeComp ( const BSTNode *A, const BSTNode *B )
123 {
124     const NamedParamNode *a = (const NamedParamNode *) A;
125     const NamedParamNode *b = (const NamedParamNode *) B;
126 
127     return StringOrderNoNullCheck ( & a -> name, & b -> name );
128 }
129 /*--------------------------------------------------------------------------
130  * LinkedCursorNode
131  */
132 
133 typedef struct LinkedCursorNode LinkedCursorNode;
134 struct LinkedCursorNode
135 {
136     BSTNode n;
137     char tbl[64];
138     VCursor *curs;
139 };
140 
141 static
LinkedCursorNodeWhack(BSTNode * n,void * ignore)142 void CC LinkedCursorNodeWhack ( BSTNode *n, void *ignore )
143 {
144     LinkedCursorNode *self = ( LinkedCursorNode* ) n;
145     VCursorRelease (  self -> curs );
146     free ( self );
147 }
148 
149 static
LinkedCursorComp(const void * item,const BSTNode * n)150 int64_t CC LinkedCursorComp ( const void *item, const BSTNode *n )
151 {
152     const char *tbl = item;
153     const LinkedCursorNode *node = ( const LinkedCursorNode* ) n;
154 
155     return strncmp ( tbl, node -> tbl, sizeof(node -> tbl) );
156 }
157 
158 static
LinkedCursorNodeComp(const BSTNode * A,const BSTNode * B)159 int64_t CC LinkedCursorNodeComp ( const BSTNode *A, const BSTNode *B )
160 {
161     const LinkedCursorNode *a = (const LinkedCursorNode *) A;
162     const LinkedCursorNode *b = (const LinkedCursorNode *) B;
163 
164     return strncmp ( a -> tbl, b -> tbl,sizeof(a->tbl) );
165 }
166 
167 
168 
169 /*--------------------------------------------------------------------------
170  * VTableCursor
171  *  a row cursor onto a VTable
172  */
173 
174 /* Whack
175  */
VTableCursorWhack(const VCURSOR_IMPL * p_self)176 rc_t VTableCursorWhack ( const VCURSOR_IMPL * p_self )
177 {
178     VCURSOR_IMPL * self = ( VCURSOR_IMPL * ) p_self;
179     if ( self -> cache_curs )
180     {
181         VTableCursorWhack ( ( VTableCursor * ) ( self -> cache_curs ) );
182     }
183     VBlobMRUCacheDestroy ( self->blob_mru_cache);
184 
185     BSTreeWhack ( & self -> named_params, NamedParamNodeWhack, NULL );
186     BSTreeWhack ( & self -> linked_cursors, LinkedCursorNodeWhack, NULL );
187     VectorWhack ( & self -> trig, NULL, NULL );
188     VectorWhack ( & self -> v_cache_curs, NULL, NULL );
189     VectorWhack ( & self -> v_cache_cidx, NULL, NULL );
190 
191     VSchemaRelease ( self -> schema );
192 
193     { /* some references from the base class may dangle if we Sever() before VCursorWhackInt() is done */
194 	    const VTable * tbl = self -> tbl;
195 		VCursorWhackInt ( & self -> dad );
196 		VTableSever ( tbl );
197 	}
198 
199     return 0;
200 }
201 
202 /* Make - PRIVATE
203  */
VTableCursorMake(VCURSOR_IMPL ** cursp,const VTable * tbl,VCursor_vt * vt)204 rc_t VTableCursorMake ( VCURSOR_IMPL **cursp, const VTable *tbl, VCursor_vt *vt )
205 {
206     rc_t rc;
207     VTableCursor *curs;
208 
209     /* must have return param */
210     assert ( cursp != NULL );
211 
212     /* must have parent tbl */
213     assert ( tbl != NULL );
214 
215     /* create a structure */
216     curs = calloc ( 1, sizeof * curs );
217     if ( curs == NULL )
218         rc = RC ( rcVDB, rcCursor, rcConstructing, rcMemory, rcExhausted );
219     else
220     {
221         /* create a separate schema object */
222         rc = VSchemaMake ( & curs -> schema, tbl -> schema );
223         if ( rc == 0 )
224         {
225             /* extend table schema to populate with implicits */
226             rc = STableCloneExtend ( tbl -> stbl, & curs -> stbl, curs -> schema );
227             if ( rc == 0 )
228             {
229                 curs -> dad . vt = vt;
230                 curs -> tbl = VTableAttach ( tbl );
231                 VectorInit ( & curs -> dad . row, 1, 16 );
232                 VectorInit ( & curs -> v_cache_curs, 1, 16 );
233                 VectorInit ( & curs -> v_cache_cidx, 1, 16 );
234                 VCursorCacheInit ( & curs -> dad . col, 0, 16 );
235                 VCursorCacheInit ( & curs -> dad . phys, 0, 16 );
236                 VCursorCacheInit ( & curs -> dad . prod, 0, 16 );
237                 VectorInit ( & curs -> dad . owned, 0, 64 );
238                 VectorInit ( & curs -> trig, 0, 64 );
239                 KRefcountInit ( & curs -> dad . refcount, 1, "VCursor", "make", "vcurs" );
240                 curs -> dad . state = vcConstruct;
241                 curs -> permit_add_column = true;
242                 curs -> suspend_triggers  = false;
243                 * cursp = curs;
244                 return 0;
245             }
246 
247             VSchemaRelease ( curs -> schema );
248         }
249 
250         free ( curs );
251     }
252 
253     * cursp = NULL;
254 
255     return rc;
256 }
257 
258 /* SupplementSchema
259  *  scan table for physical column names
260  *  create transparent yet incomplete (untyped) columns for unknown names
261  *  create incomplete (untyped) physical columns for forwarded names
262  *  repeat process on static columns, except create complete (fully typed) objects
263  */
264 static
VCursorSupplementName(const KSymTable * tbl,STable * stbl,const VTypedecl * td,const char * name)265 rc_t VCursorSupplementName ( const KSymTable *tbl,
266     STable *stbl, const VTypedecl *td, const char *name )
267 {
268     rc_t rc = 0;
269     char buffer [ 256 ];
270 
271     /* create physical name string */
272     int len = snprintf ( buffer, sizeof buffer, ".%s", name );
273     if ( len < 0 || len >= sizeof buffer )
274         rc = RC ( rcVDB, rcCursor, rcConstructing, rcName, rcExcessive );
275     else
276     {
277         KSymbol *sym;
278 
279         String pname, cname;
280         StringInit ( & pname, buffer, len, string_len ( buffer, len ) );
281 
282         /* if physical name is known */
283         sym = KSymTableFind ( tbl, & pname );
284         if ( sym != NULL )
285         {
286             /* if it is being implemented here */
287             if ( sym -> type == eVirtual )
288                 rc = STableImplicitPhysMember ( stbl, td, sym, & pname );
289             return rc;
290         }
291 
292         /* if simple name is unknown, add implicit */
293         sym = KSymTableFind ( tbl, StringSubstr ( & pname, & cname, 1, 0 ) );
294         if ( sym == NULL )
295         {
296             /* create implicit physical */
297             rc = STableImplicitPhysMember ( stbl, td, sym, & pname );
298             if ( rc == 0 )
299                 rc = STableImplicitColMember ( stbl, & cname, & pname );
300         }
301     }
302     return rc;
303 }
304 
305 static
VCursorSupplementPhysical(const KSymTable * tbl,const VTableCursor * self)306 rc_t VCursorSupplementPhysical ( const KSymTable *tbl, const VTableCursor *self )
307 {
308     KNamelist *names;
309     rc_t rc = KTableListCol ( self -> tbl -> ktbl, & names );
310     if ( rc == 0 )
311     {
312         uint32_t i, count;
313         rc = KNamelistCount ( names, & count );
314         for ( i = 0; rc == 0 && i < count; ++ i )
315         {
316             const char *name;
317             rc = KNamelistGet ( names, i, & name );
318             if ( rc == 0 )
319                 rc = VCursorSupplementName ( tbl, self -> stbl, NULL, name );
320         }
321         KNamelistRelease ( names );
322     }
323     return rc;
324 }
325 
326 static
VCursorSupplementStatic(const KSymTable * tbl,const VTableCursor * self)327 rc_t VCursorSupplementStatic ( const KSymTable *tbl, const VTableCursor *self )
328 {
329     rc_t rc;
330     KNamelist *names;
331 
332     const KMDataNode *root = self -> tbl -> col_node;
333     if ( root == NULL )
334         return 0;
335 
336     rc = KMDataNodeListChild ( root, & names );
337     if ( rc == 0 )
338     {
339         uint32_t i, count;
340         rc = KNamelistCount ( names, & count );
341         for ( i = 0; rc == 0 && i < count; ++ i )
342         {
343             const char *name;
344             rc = KNamelistGet ( names, i, & name );
345             if ( rc == 0 )
346             {
347                 const KMDataNode *node;
348                 rc = KMDataNodeOpenNodeRead ( root, & node, "%s", name );
349                 if ( rc == 0 )
350                 {
351                     size_t size;
352                     char typedecl [ 256 ];
353                     rc = KMDataNodeReadAttr ( node, "type", typedecl, sizeof typedecl, & size );
354                     if ( rc == 0 && size != 0 )
355                     {
356                         VTypedecl td;
357                         rc = VSchemaResolveTypedecl ( self -> schema, & td, "%s", typedecl );
358                         if ( rc == 0 )
359                             rc = VCursorSupplementName ( tbl, self -> stbl, & td, name );
360 
361                         rc = 0; /*** don't care if name is not in the schema ***/
362 
363                     }
364 
365                     KMDataNodeRelease ( node );
366                 }
367             }
368         }
369 
370         KNamelistRelease ( names );
371     }
372 
373     return rc;
374 }
375 
VCursorSupplementSchema(const VCURSOR_IMPL * self)376 rc_t VCursorSupplementSchema ( const VCURSOR_IMPL *self )
377 {
378     KSymTable tbl;
379     rc_t rc = init_tbl_symtab ( & tbl, self -> schema, self -> stbl );
380     if ( rc == 0 )
381     {
382         rc = VCursorSupplementPhysical ( & tbl, self );
383         if ( rc == 0 )
384             rc = VCursorSupplementStatic ( & tbl, self );
385         KSymTableWhack ( & tbl );
386     }
387     return rc;
388 }
389 
390 /* CreateCachedCursorRead
391  *  creates a read cursor object onto table with a cache limit in bytes
392  *
393  *  AVAILABILITY: version 2.1
394  *
395  *  "curs" [ OUT ] - return parameter for newly created cursor
396  *
397  *  "capacity" [ IN ] - the maximum bytes to cache on the cursor before
398  *  dropping least recently used blobs
399  */
VTableCreateCachedCursorReadImpl(const VTable * self,const VTableCursor ** cursp,size_t capacity,bool create_pagemap_thread)400 static rc_t VTableCreateCachedCursorReadImpl ( const VTable *self,
401     const VTableCursor **cursp, size_t capacity, bool create_pagemap_thread  )
402 {
403     rc_t rc;
404 #if DISABLE_READ_CACHE
405     capacity = 0;
406 #endif
407     if ( cursp == NULL )
408         rc = RC ( rcVDB, rcTable, rcOpening, rcParam, rcNull );
409     else {
410         VTableCursor *curs;
411 #if LAZY_OPEN_COL_NODE
412         if ( self -> col_node == NULL )
413             KMetadataOpenNodeRead ( self -> meta, & ( ( VTable* ) self ) -> col_node, "col" );
414 #endif
415         rc = VCursorMakeFromTable ( & curs, self );
416         if ( rc == 0 ) {
417             curs -> blob_mru_cache = VBlobMRUCacheMake(capacity);
418             curs -> read_only = true;
419             rc = VCursorSupplementSchema ( curs );
420 
421 #if 0
422             if ( create_pagemap_thread && capacity > 0 && rc == 0 )
423             {
424                 rc = VCursorLaunchPagemapThread ( curs );
425                 if ( rc != 0 )
426                 {
427                     if ( GetRCState( rc ) == rcNotAvailable )
428                         rc = 0;
429                 }
430             }
431 #endif
432             if ( rc == 0 )
433             {
434                 if(capacity > 0)
435                     curs->launch_cnt = 5;
436                 else
437                     curs->launch_cnt=200;
438                 * cursp = curs;
439                 if(rc==0 && self->cache_tbl)
440                 {
441                     rc_t rc2;
442                     const VTableCursor * cache_curs;
443                     rc2 = VTableCreateCachedCursorReadImpl(self->cache_tbl,&cache_curs,64*1024*1024,create_pagemap_thread);
444                     DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VTableCreateCachedCursorReadImpl(vdbcache) = %d\n", rc2));
445                     if(rc2 == 0)
446                     {
447                         ((VTableCursor*)(*cursp)) -> cache_curs = & cache_curs -> dad;
448                     }
449                 }
450                 return 0;
451             }
452             VCursorRelease ( & curs -> dad );
453         }
454         * cursp = NULL;
455     }
456     return rc;
457 }
458 
VTableCreateCachedCursorRead(const VTable * self,const VCursor ** cursp,size_t capacity)459 LIB_EXPORT rc_t CC VTableCreateCachedCursorRead ( const VTable *self,
460     const VCursor **cursp, size_t capacity )
461 {
462 	return VTableCreateCachedCursorReadImpl ( self, (const VTableCursor **)cursp, capacity, true );
463 }
464 
465 /**
466 *** VTableCreateCursorReadInternal is only visible in vdb and needed for schema resolutions
467 ****/
VTableCreateCursorReadInternal(const VTable * self,const VTableCursor ** cursp)468 rc_t  VTableCreateCursorReadInternal(const VTable *self, const VTableCursor **cursp)
469 {
470 	return VTableCreateCachedCursorReadImpl(self,cursp,0,false);
471 }
472 
473 /* CreateCursor
474  *  creates a cursor object onto table
475  *  multiple read cursors are allowed
476  *  only a single write cursor is allowed
477  *
478  *  "curs" [ OUT ] - return parameter for newly created cursor
479  */
VTableCreateCursorRead(const VTable * self,const VCursor ** curs)480 LIB_EXPORT rc_t CC VTableCreateCursorRead ( const VTable *self, const VCursor **curs )
481 {
482     /* will be deprecated in the future */
483     return VTableCreateCachedCursorRead ( self, curs, 0 );
484 }
485 
486 /* PermitPostOpenAdd
487  *  allows columns to be added to open cursor
488  *  for write cursor, the effect lasts until the first row commit
489  */
VTableCursorPermitPostOpenAdd(const VCURSOR_IMPL * cself)490 rc_t CC VTableCursorPermitPostOpenAdd ( const VCURSOR_IMPL *cself )
491 {
492     rc_t rc;
493     VTableCursor *self = ( VCURSOR_IMPL* ) cself;
494 
495     if ( self == NULL )
496         rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
497     else if ( self -> dad . state == vcFailed )
498         rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcInvalid );
499     else if ( self -> dad . state != vcConstruct )
500         rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcOpen );
501     else
502     {
503         self -> permit_post_open_add = true;
504         rc = 0;
505     }
506     if(self->cache_curs){
507 	VCursorPermitPostOpenAdd(self->cache_curs);
508     }
509 
510     return rc;
511 }
512 /*  SuspendTriggers
513  *  blocks resolution of schema-based triggers
514  *
515  */
VTableCursorSuspendTriggers(const VCURSOR_IMPL * cself)516 rc_t CC VTableCursorSuspendTriggers ( const VCURSOR_IMPL *cself )
517 {
518     rc_t rc;
519     VTableCursor *self = ( VCURSOR_IMPL* ) cself;
520 
521     if ( self == NULL )
522         rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
523     else
524     {
525         self -> suspend_triggers = true;
526         rc = 0;
527     }
528 
529     return rc;
530 }
531 
532 
533 /* AddSColumn
534  */
535 static
VTableCursorAddSColumn(VCURSOR_IMPL * self,uint32_t * idx,const SColumn * scol,const VTypedecl * cast,Vector * cx_bind)536 rc_t VTableCursorAddSColumn ( VCURSOR_IMPL *self, uint32_t *idx,
537     const SColumn *scol, const VTypedecl *cast, Vector *cx_bind )
538 {
539     rc_t rc;
540     VColumn *col;
541 
542     if ( self -> read_only )
543     {
544         /* must be readable */
545         if ( scol -> read == NULL )
546             return RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcWriteonly );
547     }
548     else
549     {
550         /* must be writable */
551         if ( scol -> read_only || ( scol -> read == NULL && scol -> validate == NULL ) )
552             return RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcReadonly );
553     }
554 
555     /* must not already be there - benign error */
556     col = VCursorCacheGet ( & self -> dad . col, & scol -> cid );
557     if ( col != NULL )
558     {
559         * idx = col -> ord;
560         return SILENT_RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcExists );
561     }
562 
563     /* make object */
564     rc = VCursorMakeColumn ( & self -> dad, & col, scol, cx_bind );
565     if ( rc == 0 )
566     {
567         /* insert it into vectors */
568         rc = VectorAppend ( & self -> dad . row, & col -> ord, col );
569         if ( rc == 0 )
570         {
571             void *ignore;
572             rc = VCursorCacheSet ( & self -> dad . col, & scol -> cid, col );
573             if ( rc == 0 )
574             {
575                 /* open column if cursor open or type unknown */
576                 if ( self -> dad . state >= vcReady || scol -> td . type_id == 0 )
577                 {
578                     rc = VCursorPostOpenAdd ( self, col );
579                     assert ( rc != 0 || scol -> td . type_id != 0 );
580                 }
581                 if ( rc == 0 )
582                 {
583                     /* check cast of SColumn against requested type
584                        this is to handle the case where the column
585                        was created incomplete, i.e. with unknown type */
586                     if ( cast == NULL || VTypedeclToTypedecl ( & scol -> td,
587                              self -> schema, cast, & col -> td, NULL ) )
588                     {
589                         /* has been entered */
590                         * idx = col -> ord;
591                         return 0;
592                     }
593                 }
594 
595                 /* bail out */
596                 VCursorCacheSwap ( & self -> dad . col, & scol -> cid, NULL, & ignore );
597             }
598 
599             VectorSwap ( & self -> dad . row, col -> ord, NULL, & ignore );
600         }
601 
602         VColumnWhack ( col, NULL );
603     }
604 
605     return rc;
606 }
607 
608 
609 /* AddColspec
610  *  a "colspec" is either a simple column name or a typed name expression
611  *  uses STable to evaluate colspec and find an SColumn
612  */
613 static
VCursorAddColspec(VCURSOR_IMPL * self,uint32_t * idx,const char * colspec)614 rc_t VCursorAddColspec ( VCURSOR_IMPL *self, uint32_t *idx, const char *colspec )
615 {
616     rc_t rc;
617 
618     /* find an appropriate column in schema */
619     uint32_t type;
620     VTypedecl cast;
621     const SNameOverload *name;
622     const SColumn *scol = STableFind ( self -> tbl -> stbl, self -> schema,
623         & cast, & name, & type, colspec, "VCursorAddColspec", true );
624     if ( scol == NULL || type != eColumn )
625         rc = SILENT_RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcNotFound );
626     else
627     {
628         Vector cx_bind;
629         VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
630         rc = VTableCursorAddSColumn ( self, idx, scol, & cast, & cx_bind );
631         VectorWhack ( & cx_bind, NULL, NULL );
632         if(rc == 0)
633         {
634             char ccolspec[1024];
635             size_t n;
636             rc_t rc2=string_printf(ccolspec,sizeof(ccolspec),&n,"%s_CACHE",colspec);
637 
638             VectorSet(&self->v_cache_curs,*idx,NULL);
639             VectorSet(&self->v_cache_cidx,*idx,(const void*)0);
640             if(rc2==0)
641             {
642                 uint32_t cidx;
643                 rc2=VCursorAddColumn ( ToConstVCursor ( self ), & cidx, ccolspec ); /** see if column exists in the same table **/
644                 DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VCursorAddColspec(%s,vdbcache,sametable) = %d\n", ccolspec,rc2));
645                 if(rc2==0 || GetRCState ( rc2 ) == rcExists )
646                 {
647                     VectorSet(&self->v_cache_curs,*idx,self);
648                     VectorSet(&self->v_cache_cidx,*idx,(const void*)(uint64_t)cidx);
649                 }
650                 else if(self->cache_curs)
651                 {
652                     rc2=VCursorAddColumn(self->cache_curs,&cidx,ccolspec); /** see if column exists in external table **/
653                         DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VCursorAddColspec(%s,vdbcache,remotetable) = %d\n", ccolspec,rc2));
654                     if(rc2==0 || GetRCState ( rc2 ) == rcExists ){
655                         VectorSet(&self->v_cache_curs,*idx,self->cache_curs);
656                         VectorSet(&self->v_cache_cidx,*idx,(const void*)(uint64_t)cidx);
657                     }
658                 }
659             }
660         }
661     }
662 
663     return rc;
664 }
665 
666 
667 /* AddColumn
668  *  add a column to an unopened cursor
669  *
670  *  "idx" [ OUT ] - return parameter for column index
671  *
672  *  "name" [ IN ] - NUL terminated column name spec.
673  *  to identify a column by name, provide the column name
674  *  by itself. if there are multiple types available under
675  *  that name, the default type for that column will be
676  *  selected. to select a specific type, the name may
677  *  be cast to that type using a cast expression, e.g.
678  *    "( type ) name"
679  *  the special name "*" may be added to a read cursor.
680  */
VTableCursorVAddColumn(const VCURSOR_IMPL * cself,uint32_t * idx,const char * name,va_list args)681 rc_t VTableCursorVAddColumn ( const VCURSOR_IMPL *cself,
682     uint32_t *idx, const char *name, va_list args )
683 {
684     rc_t rc;
685     VTableCursor *self = ( VTableCursor* ) cself;
686 
687     if ( idx == NULL )
688         rc = RC ( rcVDB, rcCursor, rcUpdating, rcParam, rcNull );
689     else
690     {
691         * idx = 0;
692 
693         if ( self == NULL )
694             rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
695         else if ( name == NULL )
696             rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcNull );
697         else if ( name [ 0 ] == 0 )
698             rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcEmpty );
699         else if ( self -> dad . state == vcFailed )
700             rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcInvalid );
701         else if ( self -> dad . state != vcConstruct && ! self -> permit_add_column )
702             rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcLocked );
703         else
704         {
705             char colspec [ 1024 ];
706             int len = vsnprintf ( colspec, sizeof colspec, name, args );
707             if ( len < 0 || len >= sizeof colspec )
708                 rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcExcessive );
709             else
710             {
711                 rc = VCursorAddColspec ( self, idx, colspec );
712                 if ( rc == 0 || GetRCState ( rc ) == rcExists )
713                     return rc;
714             }
715 
716             if ( ! self -> permit_add_column )
717             {
718                 PLOGERR ( klogErr, ( klogErr, rc, "failed to add column '$(spec)' to cursor",
719                                      "spec=%s", colspec ));
720             }
721 
722             return rc;
723         }
724     }
725 
726     LOGERR ( klogErr, rc, "failed to add column" );
727 
728     return rc;
729 }
730 
731 /* GetColumnIdx
732  *  retrieve column index by name spec
733  *
734  *  "idx" [ OUT ] - return parameter for column index
735  *
736  *  "name" [ IN ] - NUL terminated column name spec.
737  */
VTableCursorVGetColumnIdx(const VCURSOR_IMPL * self,uint32_t * idx,const char * name,va_list args)738 rc_t CC VTableCursorVGetColumnIdx ( const VCURSOR_IMPL *self,
739     uint32_t *idx, const char *name, va_list args )
740 {
741     rc_t rc;
742 
743     if ( idx == NULL )
744         rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
745     else
746     {
747         * idx = 0;
748 
749         if ( name == NULL )
750             rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcNull );
751         else if ( name [ 0 ] == 0 )
752             rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcEmpty );
753         else if ( self -> dad . state == vcFailed )
754             rc = RC ( rcVDB, rcCursor, rcAccessing, rcCursor, rcInvalid );
755         else
756         {
757             char colspec [ 1024 ];
758             int len = vsnprintf ( colspec, sizeof colspec, name, args );
759             if ( len < 0 || len >= sizeof colspec )
760                 rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcExcessive );
761             else
762             {
763                 /* find an appropriate column in schema */
764                 uint32_t type;
765                 VTypedecl cast;
766                 const SNameOverload *name;
767                 const SColumn *scol = STableFind ( self -> tbl -> stbl,
768                                                    self -> schema,
769                                                    & cast,
770                                                    & name,
771                                                    & type,
772                                                    colspec,
773                                                    "VTableCursorVGetColumnIdx",
774                                                    true );
775             rc = VCursorGetColidx ( & self -> dad, scol, name, type, idx );
776             }
777         }
778     }
779 
780     return rc;
781 }
782 
783 /* Open
784  *  open cursor, resolving schema
785  *  for the set of opened columns
786  *
787  *  NB - there is no corresponding "Close"
788  *  use "Release" instead.
789  */
790 typedef struct VProdResolveData VProdResolveData;
791 struct VProdResolveData
792 {
793     VProdResolve pr;
794     rc_t rc;
795 };
796 
797 
798 static
VCursorResolveColumn(void * item,void * data)799 bool CC VCursorResolveColumn ( void *item, void *data )
800 {
801     if ( item != NULL )
802     {
803         void *ignore;
804         VTableCursor *self;
805 
806         VColumn *col = item;
807         VProdResolveData *pb = data;
808         SColumn *scol = ( SColumn* ) col -> scol;
809 
810         VProduction *src = NULL;
811         pb -> rc = VProdResolveColumnRoot ( & pb -> pr, & src, scol );
812         if ( pb -> rc == 0 )
813         {
814             if ( src > FAILED_PRODUCTION )
815             {
816                 /* repair for incomplete implicit column decl */
817                 if ( scol -> td . type_id == 0 )
818                     scol -> td = src -> fd . td;
819 
820                 return false;
821             }
822 
823             pb -> rc = RC ( rcVDB, rcCursor, rcOpening, rcColumn, rcUndefined );
824         }
825 
826         /* check for tolerance */
827         self = ( VTableCursor * ) pb -> pr . curs;
828         if ( ! pb -> pr . ignore_column_errors )
829         {
830             if ( ! self -> permit_post_open_add )
831             {
832                 PLOGERR ( klogErr, ( klogErr, pb -> rc, "failed to resolve column '$(name)' idx '$(idx)'",
833                                      "name=%.*s,idx=%u"
834                                      , ( int ) scol -> name -> name . size
835                                      , scol -> name -> name . addr
836                                      , col -> ord ));
837             }
838 
839             return true;
840         }
841 
842         /* remove from row and cache */
843         VectorSwap ( & self -> dad . row, col -> ord, NULL, & ignore );
844         VCursorCacheSwap ( & self -> dad . col, & scol -> cid, NULL, & ignore );
845 
846         /* dump the VColumn */
847         VColumnWhack ( col, NULL );
848 
849         /* return no-error */
850         pb -> rc = 0;
851     }
852 
853     return false;
854 }
855 
856 static
VCursorOpenColumn(const VTableCursor * cself,VColumn * col)857 rc_t VCursorOpenColumn ( const VTableCursor *cself, VColumn *col )
858 {
859     KDlset *libs;
860     VTableCursor *self = ( VTableCursor* ) cself;
861 
862     Vector cx_bind;
863     VProdResolveData pb;
864     pb . pr . schema = self -> schema;
865     pb . pr . ld = self -> tbl -> linker;
866     pb . pr . name = & self -> stbl -> name -> name;
867     pb . pr . primary_table = VCursorGetTable ( & self -> dad );
868     pb . pr . curs = & self -> dad;
869     pb . pr . cache = & self -> dad . prod;
870     pb . pr . owned = & self -> dad . owned;
871     pb . pr . cx_bind = & cx_bind;
872     pb . pr . chain = chainDecoding;
873     pb . pr . blobbing = false;
874     pb . pr . ignore_column_errors = false;
875     pb . pr . discover_writable_columns = false;
876 
877     VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
878 
879     pb . rc = VLinkerOpen ( pb . pr . ld, & libs );
880     if ( pb . rc == 0 )
881     {
882         pb . pr . libs = libs;
883         VCursorResolveColumn ( col, & pb );
884         KDlsetRelease ( libs );
885     }
886 
887     VectorWhack ( & cx_bind, NULL, NULL );
888 
889     return pb . rc;
890 }
891 
892 /* PostOpenAdd
893  *  handle opening of a column after the cursor is opened
894  */
VCursorPostOpenAddRead(VTableCursor * self,VColumn * col)895 rc_t VCursorPostOpenAddRead ( VTableCursor *self, VColumn *col )
896 {
897     return VCursorOpenColumn ( self, col );
898 }
899 
900 
901 static
VCursorResolveColumnProductions(VTableCursor * self,const KDlset * libs,bool ignore_failures)902 rc_t VCursorResolveColumnProductions ( VTableCursor *self,
903     const KDlset *libs, bool ignore_failures )
904 {
905     Vector cx_bind;
906     VProdResolveData pb;
907     pb . pr . schema = self -> schema;
908     pb . pr . ld = self -> tbl -> linker;
909     pb . pr . libs = libs;
910     pb . pr . name = & self -> stbl -> name -> name;
911     pb . pr . primary_table = VCursorGetTable ( & self -> dad ) ;
912     pb . pr . curs = & self -> dad ;
913     pb . pr . cache = & self -> dad . prod;
914     pb . pr . owned = & self -> dad . owned;
915     pb . pr . cx_bind = & cx_bind;
916     pb . pr . chain = chainDecoding;
917     pb . pr . blobbing = false;
918     pb . pr . ignore_column_errors = ignore_failures;
919     pb . pr . discover_writable_columns = false;
920     pb . rc = 0;
921 
922     VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
923 
924     if ( ! VectorDoUntil ( & self -> dad . row, false, VCursorResolveColumn, & pb ) )
925         pb . rc = 0;
926 
927     VectorWhack ( & cx_bind, NULL, NULL );
928 
929     return pb . rc;
930 }
931 
VCursorOpenRead(VCURSOR_IMPL * self,const KDlset * libs)932 rc_t VCursorOpenRead ( VCURSOR_IMPL *self, const KDlset *libs )
933 {
934     rc_t rc;
935 
936     if ( self -> dad . state >= vcReady )
937         rc = 0;
938     else if ( self -> dad . state == vcFailed )
939         rc = RC ( rcVDB, rcCursor, rcOpening, rcCursor, rcInvalid );
940     else
941     {
942         rc = VCursorResolveColumnProductions ( self, libs, false );
943         if ( rc == 0 )
944         {
945             self -> dad . row_id =
946             self -> dad . start_id =
947             self -> dad . end_id = 1;
948             self -> dad . state = vcReady;
949             if( self -> cache_curs )
950             {
951                 VCursorOpenRead ( ( VTableCursor * ) self -> cache_curs, libs );
952             }
953             return rc;
954         }
955         else
956         {
957             /* in case the column is not defined ( rcColumn, rcUndefined )
958                 we want to check if the table is empty, and report that instead
959             */
960             if ( GetRCState( rc ) == rcUndefined &&
961                  GetRCObject( rc ) == ( enum RCObject )rcColumn )
962             {
963                 bool empty;
964                 if ( ( VTableIsEmpty ( self -> tbl, &empty ) == 0 ) && empty )
965                 {
966                     rc = RC ( rcVDB, rcCursor, rcOpening, rcTable, rcEmpty );
967                 }
968             }
969         }
970         self -> dad . state = vcFailed;
971     }
972 
973     return rc;
974 }
975 
976 static
VCursorOpenForListing(const VCursor * cself)977 rc_t VCursorOpenForListing ( const VCursor *cself )
978 {
979     rc_t rc;
980     VTableCursor *self = ( VTableCursor* ) cself;
981 
982     VLinker *ld = self -> tbl -> linker;
983 
984     KDlset *libs;
985     rc = VLinkerOpen ( ld, & libs );
986     if ( rc == 0 )
987     {
988         rc = VCursorResolveColumnProductions ( self, libs, true );
989         KDlsetRelease ( libs );
990     }
991     return rc;
992 }
993 
994 /* Read
995  *  read entire single row of byte-aligned data into a buffer
996  *
997  *  "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
998  *
999  *  "elem_bits" [ IN ] - expected element size in bits, required
1000  *  to be compatible with the actual element size, and be a multiple
1001  *  of 8 ( byte-aligned ). for non-byte-aligned data, see ReadBits
1002  *
1003  *  "buffer" [ OUT ] and "blen" [ IN ] - return buffer for row data
1004  *  where "blen" gives buffer capacity in elements. the total buffer
1005  *  size in bytes == ( "elem_bits" * "blen" + 7 ) / 8.
1006  *
1007  *  "row_len" [ OUT ] - return parameter for the number of elements
1008  *  in the requested row.
1009  *
1010  *  when the return code is 0, "row_len" will contain the number of
1011  *  elements read into buffer. if the return code indicates that the
1012  *  buffer is too small, "row_len" will give the required buffer length.
1013  */
1014 static
VCursorReadColumnDirectInt(const VCURSOR_IMPL * cself,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len,uint32_t * repeat_count,const VBlob ** rslt)1015 rc_t VCursorReadColumnDirectInt ( const VCURSOR_IMPL *cself, int64_t row_id, uint32_t col_idx,
1016     uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len, uint32_t *repeat_count,
1017     const VBlob **rslt )
1018 {
1019     rc_t rc,rc_cache=0;
1020     const VColumn *col;
1021     const VBlob *blob;
1022 
1023     col = ( const void* ) VectorGet ( & cself -> dad . row, col_idx );
1024     if ( col == NULL )
1025         return RC ( rcVDB, rcCursor, rcReading, rcColumn, rcInvalid );
1026 
1027     /* 2.0 behavior if not caching */
1028     if ( cself -> blob_mru_cache == NULL )
1029         return VColumnRead ( col, row_id, elem_bits, base, boff, row_len, (VBlob**) rslt );
1030 
1031     /* check MRU blob */
1032     blob = VBlobMRUCacheFind(cself->blob_mru_cache,col_idx,row_id);
1033     if(blob){
1034         assert(row_id >= blob->start_id && row_id <= blob->stop_id);
1035         /* if the caller wants the blob back... */
1036         if ( rslt != NULL )
1037                 * rslt = blob;
1038         /* ask column to read from blob */
1039         return VColumnReadCachedBlob ( col, blob, row_id, elem_bits, base, boff, row_len, repeat_count);
1040     }
1041     { /* ask column to produce a blob to be cached */
1042 	VBlobMRUCacheCursorContext cctx;
1043 	cctx.cache=cself -> blob_mru_cache;
1044 	cctx.col_idx = col_idx;
1045 	rc = VColumnReadBlob(col,&blob,row_id,elem_bits,base,boff,row_len,repeat_count,&cctx);
1046     }
1047     if ( rc != 0 || blob == NULL ){
1048         if(rslt) *rslt = NULL;
1049         return rc;
1050     }
1051     if(blob->stop_id > blob->start_id + 4)
1052 	    rc_cache=VBlobMRUCacheSave(cself->blob_mru_cache, col_idx, blob);
1053     if(rslt==NULL){ /** user does not care about the blob ***/
1054         if( rc_cache == 0){
1055             VBlobRelease((VBlob*)blob);
1056         } /** else the memory will leak **/
1057     } else {
1058         *rslt=blob;
1059     }
1060     return 0;
1061 }
1062 
1063 /* GetBlob
1064  *  retrieve a blob of data containing the current row id
1065  * GetBlobDirect
1066  *  retrieve a blob of data containing the requested row id
1067  *
1068  *  "blob" [ OUT ] - return parameter for a new reference
1069  *  to VBlob containing requested cell. NB - must be released
1070  *  via VBlobRelease when no longer needed.
1071  *
1072  *  "row_id" [ IN ] - allows ReadDirect random access to any cell
1073  *  in column
1074  *
1075  *  "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1076  */
VTableCursorGetBlob(const VCURSOR_IMPL * self,const VBlob ** blob,uint32_t col_idx)1077 rc_t VTableCursorGetBlob ( const VCURSOR_IMPL *self,
1078     const VBlob **blob, uint32_t col_idx )
1079 {
1080     rc_t rc;
1081 
1082     if ( blob == NULL )
1083         rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1084     else
1085     {
1086         if ( ! self -> read_only )
1087             rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1088         else
1089         {
1090             const void *base;
1091             uint32_t elem_bits, boff, row_len;
1092 
1093             switch ( self -> dad . state )
1094             {
1095             case vcConstruct:
1096                 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1097                 break;
1098             case vcReady:
1099                 rc = RC ( rcVDB, rcCursor, rcReading, rcRow, rcNotOpen );
1100                 break;
1101             case vcRowOpen:
1102                 rc = VCursorReadColumnDirectInt( self, self -> dad . row_id, col_idx, &elem_bits, &base, &boff, &row_len, NULL, blob );
1103                 if ( rc == 0 )
1104                 {
1105                     rc = VBlobAddRef ( ( VBlob* ) *blob );
1106                     if ( rc == 0 )
1107                         return 0;
1108                 }
1109                 break;
1110             default:
1111                 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1112             }
1113         }
1114 
1115         * blob = NULL;
1116     }
1117     return rc;
1118 }
1119 
VTableCursorGetBlobDirect(const VCURSOR_IMPL * self,const VBlob ** blob,int64_t row_id,uint32_t col_idx)1120 rc_t VTableCursorGetBlobDirect ( const VCURSOR_IMPL *self,
1121     const VBlob **blob, int64_t row_id, uint32_t col_idx )
1122 {
1123     rc_t rc;
1124 
1125     if ( blob == NULL )
1126         rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1127     else
1128     {
1129         if ( ! self -> read_only )
1130             rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1131         else
1132         {
1133             const void *base;
1134             uint32_t elem_bits, boff, row_len;
1135 
1136             switch ( self -> dad . state )
1137             {
1138             case vcConstruct:
1139                 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1140                 break;
1141             case vcReady:
1142             case vcRowOpen:
1143                 rc = VCursorReadColumnDirectInt ( self, row_id, col_idx, &elem_bits, &base, &boff, &row_len, NULL, blob );
1144                 if ( rc == 0 )
1145                 {
1146                     rc = VBlobAddRef ( ( VBlob* ) *blob );
1147                     if ( rc == 0 )
1148                         return 0;
1149                 }
1150                 break;
1151             default:
1152                 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1153             }
1154         }
1155 
1156         * blob = NULL;
1157     }
1158     return rc;
1159 }
1160 
1161 static
VCursorReadColumnDirect(const VTableCursor * self,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1162 rc_t VCursorReadColumnDirect ( const VTableCursor *self, int64_t row_id, uint32_t col_idx,
1163     uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1164 {
1165     bool cache_col_active_save;
1166     if ( ! self -> read_only )
1167         return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1168 
1169     switch ( self -> dad . state )
1170     {
1171 		case vcConstruct : return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1172 		case vcReady :
1173 		case vcRowOpen :  break;
1174 		default : return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1175     }
1176 
1177     cache_col_active_save = self->cache_col_active;
1178     ( ( VTableCursor* ) self )->cache_col_active = false;
1179     if ( self->cache_curs != NULL )
1180 	{
1181 		const VCursor *curs = VectorGet( &self->v_cache_curs, col_idx );
1182 		if ( curs != NULL )
1183 		{
1184 			( ( VTableCursor* ) self )->cache_col_active = true;
1185 			if ( self->cache_empty_start == 0 ||
1186 			     row_id < self->cache_empty_start ||
1187 				 row_id > self->cache_empty_end )
1188 			{
1189 				uint32_t repeat_count;
1190 				uint32_t cidx = ( uint32_t )( uint64_t )VectorGet( &self->v_cache_cidx, col_idx );
1191 				rc_t rc2 = VCursorReadColumnDirectInt( ( const VTableCursor * ) curs, row_id, cidx, elem_bits, base, boff, row_len, &repeat_count, NULL );
1192 				if ( rc2 == 0 )
1193 				{
1194 					if ( *row_len > 0 )
1195 					{
1196 						( ( VTableCursor* )self )->cache_col_active = cache_col_active_save;
1197 						return 0;
1198 					}
1199 					else
1200 					{
1201 						/*** save window where cache is useless */
1202 						( ( VTableCursor* )self )->cache_empty_start = row_id;
1203 						( ( VTableCursor* )self )->cache_empty_end = row_id + repeat_count - 1;
1204 					}
1205 				}
1206 			}
1207 		}
1208 	}
1209 
1210     {
1211 		rc_t rc = VCursorReadColumnDirectInt( self, row_id, col_idx, elem_bits, base, boff, row_len, NULL, NULL );
1212 		( ( VTableCursor* )self )->cache_col_active = cache_col_active_save;
1213 		return rc;
1214     }
1215 }
1216 
1217 
1218 static
VCursorReadColumn(const VTableCursor * self,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1219 rc_t VCursorReadColumn ( const VTableCursor *self, uint32_t col_idx,
1220     uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1221 {
1222     rc_t rc = 0;
1223     int64_t row_id = self -> dad . row_id;
1224     bool cache_col_active_save;
1225     if ( ! self -> read_only )
1226         return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1227 
1228     switch ( self -> dad . state )
1229     {
1230     case vcConstruct:
1231         return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1232     case vcReady:
1233         return RC ( rcVDB, rcCursor, rcReading, rcRow, rcNotOpen );
1234     case vcRowOpen:
1235         break;
1236     default:
1237         return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1238     }
1239     cache_col_active_save = self->cache_col_active;
1240     ((VTableCursor*)self)->cache_col_active=false;
1241     if(self->cache_curs)
1242     {
1243         const VCursor *curs=VectorGet(&self->v_cache_curs,col_idx);
1244         if(curs)
1245         {
1246             ((VTableCursor*)self)->cache_col_active=true;
1247             if(self->cache_empty_start == 0 ||  row_id < self->cache_empty_start || row_id > self->cache_empty_end)
1248             {
1249                 uint32_t repeat_count;
1250                 uint32_t cidx =  (uint32_t)(uint64_t)VectorGet(&self->v_cache_cidx,col_idx);
1251                 rc_t rc2 = VCursorReadColumnDirectInt( ( const VTableCursor * ) curs,row_id, cidx, elem_bits, base, boff, row_len, &repeat_count,NULL );
1252                 if(rc2==0)
1253                 {
1254                     if(*row_len > 0)
1255                     {
1256                         ((VTableCursor*)self)->cache_col_active=cache_col_active_save;
1257                         return 0;
1258                     }
1259                     else
1260                     {
1261                         /*** save window where cache is useless */
1262                         ((VTableCursor*)self)->cache_empty_start = row_id;
1263                         ((VTableCursor*)self)->cache_empty_end = row_id + repeat_count -1;
1264                     }
1265                 }
1266             }
1267         }
1268     }
1269 
1270     rc=VCursorReadColumnDirectInt ( self, row_id, col_idx, elem_bits, base, boff, row_len, NULL, NULL );
1271     ((VTableCursor*)self)->cache_col_active=cache_col_active_save;
1272     return rc;
1273 }
1274 
1275 static __inline__
bad_elem_bits(uint32_t elem_size,uint32_t elem_bits)1276 bool bad_elem_bits ( uint32_t elem_size, uint32_t elem_bits )
1277 {
1278     if ( elem_size != elem_bits )
1279     {
1280         if ( elem_size < elem_bits && elem_bits % elem_size != 0 )
1281             return true;
1282         return ( elem_size % elem_bits != 0 );
1283     }
1284     return false;
1285 }
1286 
VTableCursorRead(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t elem_bits,void * buffer,uint32_t blen,uint32_t * row_len)1287 rc_t VTableCursorRead ( const VCURSOR_IMPL *self, uint32_t col_idx,
1288     uint32_t elem_bits, void *buffer, uint32_t blen, uint32_t *row_len )
1289 {
1290     rc_t rc;
1291 
1292     if ( row_len == NULL )
1293         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1294     else
1295     {
1296         if ( elem_bits == 0 || ( elem_bits & 7 ) != 0 )
1297             rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1298         else
1299         {
1300             uint32_t elem_size; const void *base; uint32_t boff;
1301             rc = VCursorReadColumn ( self, col_idx, & elem_size,
1302                 & base, & boff, row_len );
1303             if ( rc == 0 )
1304             {
1305                 if ( bad_elem_bits ( elem_size, elem_bits ) )
1306                     rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1307                 else if ( * row_len != 0 )
1308                 {
1309                     if ( blen == 0 )
1310                         return RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1311                     if ( buffer == NULL )
1312                         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1313                     else
1314                     {
1315                         uint64_t to_read = * row_len * elem_size;
1316                         uint64_t bsize = blen * elem_bits;
1317 
1318                         /* always return the required buffer size */
1319                         * row_len = ( uint32_t ) ( to_read / elem_bits );
1320 
1321                         /* detect buffer too small */
1322                         if ( to_read > bsize )
1323                         {
1324                             rc = RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1325                             to_read = bsize;
1326                         }
1327 
1328                         /* copy out data up to limit */
1329                         assert ( boff == 0 );
1330                         memmove ( buffer, base, ( size_t ) ( to_read >> 3 ) );
1331 
1332                         return rc;
1333                     }
1334                 }
1335             }
1336         }
1337 
1338         * row_len = 0;
1339     }
1340 
1341     return rc;
1342 }
1343 
VTableCursorReadDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t elem_bits,void * buffer,uint32_t blen,uint32_t * row_len)1344 rc_t VTableCursorReadDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1345     uint32_t elem_bits, void *buffer, uint32_t blen, uint32_t *row_len )
1346 {
1347     rc_t rc;
1348 
1349     if ( row_len == NULL )
1350         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1351     else
1352     {
1353         if ( elem_bits == 0 || ( elem_bits & 7 ) != 0 )
1354             rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1355         else
1356         {
1357             uint32_t elem_size; const void *base; uint32_t boff;
1358             rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1359                 & elem_size, & base, & boff, row_len );
1360             if ( rc == 0 )
1361             {
1362                 if ( bad_elem_bits ( elem_size, elem_bits ) )
1363                     rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1364                 else if ( * row_len != 0 )
1365                 {
1366                     if ( blen == 0 )
1367                         return RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1368                     if ( buffer == NULL )
1369                         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1370                     else
1371                     {
1372                         uint64_t to_read = * row_len * elem_size;
1373                         uint64_t bsize = blen * elem_bits;
1374 
1375                         /* always return the required buffer size */
1376                         * row_len = ( uint32_t ) ( to_read / elem_bits );
1377 
1378                         /* detect buffer too small */
1379                         if ( to_read > bsize )
1380                         {
1381                             rc = RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1382                             to_read = bsize;
1383                         }
1384 
1385                         /* copy out data up to limit */
1386                         assert ( boff == 0 );
1387                         memmove ( buffer, base, ( size_t ) ( to_read >> 3 ) );
1388 
1389                         return rc;
1390                     }
1391                 }
1392             }
1393         }
1394 
1395         * row_len = 0;
1396     }
1397 
1398     return rc;
1399 }
1400 
1401 
1402 /* ReadBits
1403  *  read single row of potentially bit-aligned column data into a buffer
1404  *
1405  *  "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1406  *
1407  *  "elem_bits" [ IN ] - expected element size in bits, required to be
1408  *  compatible with the actual element size, and may ( or may not ) be
1409  *  a multiple of 8 ( byte aligned ).
1410  *
1411  *  "start" [ IN ] - zero-based starting index to first element,
1412  *  valid from 0 .. row_len - 1
1413  *
1414  *  "buffer" [ IN ], "boff" [ IN ] and "blen" [ IN ] -
1415  *  return buffer for row data, where "boff" is in BITS
1416  *  and "blen" is in ELEMENTS.
1417  *
1418  *  "num_read" [ OUT ] - return parameter for the number of elements
1419  *  read, which is <= "blen"
1420  *
1421  *  "remaining" [ OUT, NULL OKAY ] - optional return parameter for
1422  *  the number of elements remaining to be read. specifically,
1423  *  "start" + "num_read" + "remaining" == row length, assuming that
1424  *  "start" <= row length.
1425  */
VTableCursorReadBits(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t elem_bits,uint32_t start,void * buffer,uint32_t off,uint32_t blen,uint32_t * num_read,uint32_t * remaining)1426 rc_t VTableCursorReadBits ( const VCURSOR_IMPL *self, uint32_t col_idx,
1427     uint32_t elem_bits, uint32_t start, void *buffer, uint32_t off,
1428     uint32_t blen, uint32_t *num_read, uint32_t *remaining )
1429 {
1430     rc_t rc;
1431 
1432     uint32_t dummy;
1433     if ( remaining == NULL )
1434         remaining = & dummy;
1435 
1436     if ( num_read == NULL )
1437         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1438     else
1439     {
1440         if ( elem_bits == 0 )
1441             rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1442         else
1443         {
1444             uint32_t elem_size; const void *base; uint32_t boff;
1445             rc = VCursorReadColumn ( self, col_idx, & elem_size,
1446                 & base, & boff, num_read );
1447             if ( rc == 0 )
1448             {
1449                 if ( bad_elem_bits ( elem_size, elem_bits ) )
1450                     rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1451                 else if ( * num_read != 0 )
1452                 {
1453                     uint64_t to_read = * num_read * elem_size;
1454                     uint64_t doff = start * elem_bits;
1455                     to_read = to_read > doff ? to_read - doff : 0;
1456                     if ( blen == 0 )
1457                     {
1458                         * num_read = 0;
1459                         * remaining = ( uint32_t ) ( to_read / elem_bits );
1460                         return 0;
1461                     }
1462 
1463                     if ( buffer == NULL )
1464                         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1465                     else
1466                     {
1467                         uint64_t bsize = blen * elem_size;
1468                         if ( to_read <= bsize )
1469                             * remaining = 0;
1470                         else
1471                         {
1472                             * remaining = (uint32_t)( ( to_read - bsize ) / elem_bits );
1473                             to_read = bsize;
1474                         }
1475                         bitcpy ( buffer, off, base, boff + doff, ( bitsz_t ) to_read );
1476                         * num_read = ( uint32_t ) ( to_read / elem_bits );
1477                         return 0;
1478                     }
1479                 }
1480             }
1481         }
1482 
1483         * num_read = 0;
1484     }
1485 
1486     * remaining = 0;
1487 
1488     return rc;
1489 }
1490 
VTableCursorReadBitsDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t elem_bits,uint32_t start,void * buffer,uint32_t off,uint32_t blen,uint32_t * num_read,uint32_t * remaining)1491 rc_t VTableCursorReadBitsDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1492     uint32_t elem_bits, uint32_t start, void *buffer, uint32_t off,
1493     uint32_t blen, uint32_t *num_read, uint32_t *remaining )
1494 {
1495     rc_t rc;
1496 
1497     uint32_t dummy;
1498     if ( remaining == NULL )
1499         remaining = & dummy;
1500 
1501     if ( num_read == NULL )
1502         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1503     else
1504     {
1505         if ( elem_bits == 0 )
1506             rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1507         else
1508         {
1509             uint32_t elem_size; const void *base; uint32_t boff;
1510             rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1511                 & elem_size, & base, & boff, num_read );
1512             if ( rc == 0 )
1513             {
1514                 if ( bad_elem_bits ( elem_size, elem_bits ) )
1515                     rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1516                 else if ( * num_read != 0 )
1517                 {
1518                     uint64_t to_read = (uint64_t)(* num_read) * elem_size;
1519                     uint64_t doff = (uint64_t)start * elem_bits;
1520                     to_read = to_read > doff ? to_read - doff : 0;
1521                     if ( blen == 0 )
1522                     {
1523                         * num_read = 0;
1524                         * remaining = ( uint32_t ) ( to_read / elem_bits );
1525                         return 0;
1526                     }
1527 
1528                     if ( buffer == NULL )
1529                         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1530                     else
1531                     {
1532                         uint64_t bsize = (uint64_t)blen * elem_size;
1533                         if ( to_read <= bsize )
1534                             * remaining = 0;
1535                         else
1536                         {
1537                             * remaining = (uint32_t)( ( to_read - bsize ) / elem_bits );
1538                             to_read = bsize;
1539                         }
1540                         bitcpy ( buffer, off, base, boff + doff, ( bitsz_t ) to_read );
1541                         * num_read = ( uint32_t ) ( to_read / elem_bits );
1542                         return 0;
1543                     }
1544                 }
1545             }
1546         }
1547 
1548         * num_read = 0;
1549     }
1550 
1551     * remaining = 0;
1552 
1553     return rc;
1554 }
1555 
1556 
1557 /* CellData
1558  *  access pointer to single cell of potentially bit-aligned column data
1559  *
1560  *  "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1561  *
1562  *  "elem_bits" [ OUT, NULL OKAY ] - optional return parameter for
1563  *  element size in bits
1564  *
1565  *  "base" [ OUT ] and "boff" [ OUT, NULL OKAY ] -
1566  *  compound return parameter for pointer to row starting bit
1567  *  where "boff" is in BITS
1568  *
1569  *  "row_len" [ OUT, NULL OKAY ] - the number of elements in row
1570  */
VTableCursorCellData(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1571 rc_t VTableCursorCellData ( const VCURSOR_IMPL *self, uint32_t col_idx,
1572     uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1573 {
1574     rc_t rc;
1575 
1576     uint32_t dummy [ 3 ];
1577     if ( row_len == NULL )
1578         row_len = & dummy [ 0 ];
1579     if ( boff == NULL )
1580         boff = & dummy [ 1 ];
1581     if ( elem_bits == NULL )
1582         elem_bits = & dummy [ 2 ];
1583 
1584     if ( base == NULL )
1585         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1586     else
1587     {
1588         rc = VCursorReadColumn ( self, col_idx,
1589             elem_bits, base, boff, row_len );
1590         if ( rc == 0 )
1591             return 0;
1592 
1593         * base = NULL;
1594     }
1595 
1596     * elem_bits = 0;
1597     * boff = 0;
1598     * row_len = 0;
1599 
1600     return rc;
1601 }
1602 
VTableCursorCellDataDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1603 rc_t VTableCursorCellDataDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1604     uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1605 {
1606     rc_t rc;
1607 
1608     uint32_t dummy [ 3 ];
1609     if ( row_len == NULL )
1610         row_len = & dummy[ 0 ];
1611     if ( boff == NULL )
1612         boff = & dummy [ 1 ];
1613     if ( elem_bits == NULL )
1614         elem_bits = & dummy [ 2 ];
1615 
1616     if ( base == NULL )
1617         rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1618     else
1619     {
1620         rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1621                 elem_bits, base, boff, row_len );
1622         if ( rc == 0 )
1623             return 0;
1624 
1625         * base = NULL;
1626     }
1627 
1628     * elem_bits = 0;
1629     * boff = 0;
1630     * row_len = 0;
1631 
1632     return rc;
1633 }
1634 
VTableCursorDataPrefetch(const VCURSOR_IMPL * cself,const int64_t * row_ids,uint32_t col_idx,uint32_t num_rows,int64_t min_valid_row_id,int64_t max_valid_row_id,bool continue_on_error)1635 rc_t VTableCursorDataPrefetch ( const VCURSOR_IMPL *cself,
1636 								const int64_t *row_ids,
1637 								uint32_t col_idx,
1638 								uint32_t num_rows,
1639 								int64_t min_valid_row_id,
1640 								int64_t max_valid_row_id,
1641 								bool continue_on_error )
1642 {
1643 	rc_t rc=0;
1644 	const VColumn *col = ( const void* ) VectorGet ( & cself -> dad . row, col_idx );
1645 	if ( col == NULL )
1646 	{
1647 		return RC ( rcVDB, rcCursor, rcReading, rcColumn, rcInvalid );
1648 	}
1649 
1650 	if ( cself->blob_mru_cache && num_rows > 0 )
1651 	{
1652 		int64_t *row_ids_sorted = malloc( num_rows * sizeof( *row_ids_sorted ) );
1653 		if ( row_ids_sorted != NULL )
1654 		{
1655 			uint32_t i, num_rows_sorted;
1656 			for( i = 0, num_rows_sorted = 0; i < num_rows; i++ )
1657 			{
1658 				int64_t row_id = row_ids[ i ];
1659 				if ( row_id >= min_valid_row_id && row_id <= max_valid_row_id )
1660 				{
1661 					row_ids_sorted[ num_rows_sorted++ ] = row_id;
1662 				}
1663 			}
1664 			if ( num_rows_sorted > 0 )
1665 			{
1666 				int64_t last_cached_row_id = INT64_MIN;
1667 				bool first_time = true;
1668 				ksort_int64_t( row_ids_sorted, num_rows_sorted );
1669 				for	( i = 0; rc==0 && i < num_rows_sorted; i++ )
1670 				{
1671 					int64_t row_id = row_ids_sorted[ i ];
1672 					if ( last_cached_row_id < row_id )
1673 					{
1674 						VBlob * blob = ( VBlob* )VBlobMRUCacheFind( cself->blob_mru_cache, col_idx, row_id );
1675 						if ( blob != NULL )
1676 						{
1677 							last_cached_row_id = blob->stop_id;
1678 						}
1679 						else
1680 						{
1681 							/* prefetch it **/
1682 							/** ask production for the blob **/
1683 							VBlobMRUCacheCursorContext cctx;
1684 
1685 							cctx.cache = cself -> blob_mru_cache;
1686 							cctx.col_idx = col_idx;
1687 							rc = VProductionReadBlob ( col->in, & blob, & row_id, 1, &cctx );
1688 							if ( rc == 0 )
1689 							{
1690 								rc_t rc_cache;
1691 								/** always cache prefetch requests **/
1692 								if ( first_time )
1693 								{
1694 									VBlobMRUCacheResumeFlush( cself->blob_mru_cache ); /** next call will clean cache if too big **/
1695 									rc_cache = VBlobMRUCacheSave( cself->blob_mru_cache, col_idx, blob );
1696 									VBlobMRUCacheSuspendFlush( cself->blob_mru_cache ); /** suspending for the rest **/
1697 									first_time = false;
1698 								}
1699 								else
1700 								{
1701 									rc_cache = VBlobMRUCacheSave( cself->blob_mru_cache, col_idx, blob );
1702 								}
1703 
1704 								if ( rc_cache == 0 )
1705 								{
1706 									VBlobRelease( blob );
1707 									last_cached_row_id = blob->stop_id;
1708 								}
1709 							}
1710 							else if ( continue_on_error )
1711 							{
1712 								rc = 0; /** reset failed row ***/
1713 								last_cached_row_id = row_id; /*** and skip it **/
1714 							}
1715 						}
1716 					}
1717 				}
1718 			}
1719 			free( row_ids_sorted );
1720 		}
1721 		else
1722 		{
1723 			rc= RC( rcVDB, rcCursor, rcReading, rcMemory, rcExhausted );
1724 		}
1725 	}
1726 	return rc;
1727 }
1728 
1729 
1730 /* OpenParent
1731  *  duplicate reference to parent table
1732  *  NB - returned reference must be released
1733  */
VTableCursorOpenParentRead(const VCURSOR_IMPL * self,const VTable ** tbl)1734 rc_t VTableCursorOpenParentRead ( const VCURSOR_IMPL *self, const VTable **tbl )
1735 {
1736     rc_t rc;
1737 
1738     if ( tbl == NULL )
1739         rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1740     else
1741     {
1742         rc = VTableAddRef ( self -> tbl );
1743         if ( rc == 0 )
1744         {
1745             * tbl = self -> tbl;
1746             return 0;
1747         }
1748 
1749         * tbl = NULL;
1750     }
1751 
1752     return rc;
1753 }
1754 
1755 struct insert_overloaded_pb
1756 {
1757     VCursor *curs;
1758     Vector *cx_bind;
1759 };
1760 
1761 static
insert_overloaded_scolumns(void * item,void * data)1762 void CC insert_overloaded_scolumns ( void *item, void *data )
1763 {
1764     struct insert_overloaded_pb *pb = data;
1765     const SColumn *scol = ( const void* ) item;
1766 
1767     uint32_t ignore;
1768     VTableCursorAddSColumn ( ( VTableCursor * ) pb -> curs, & ignore, scol, NULL, pb -> cx_bind );
1769 }
1770 
1771 static
VCursorListCol_walk_through_columns_and_add_to_cursor(VTableCursor * self)1772 void VCursorListCol_walk_through_columns_and_add_to_cursor ( VTableCursor *self )
1773 {
1774     uint32_t idx = VectorStart ( & self -> stbl -> cname );
1775     uint32_t end = VectorLength ( & self -> stbl -> cname );
1776 
1777     Vector cx_bind;
1778     struct insert_overloaded_pb pb;
1779     pb . curs = & self -> dad;
1780     pb . cx_bind = & cx_bind;
1781     VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
1782 
1783     for ( end += idx; idx < end; ++idx )
1784     {
1785         /* look at the table column name guy */
1786         const SNameOverload* ol_entry = ( const SNameOverload* ) VectorGet ( & self -> stbl -> cname, idx );
1787         if ( ol_entry != NULL )
1788             VectorForEach ( & ol_entry -> items, false, insert_overloaded_scolumns, & pb );
1789     }
1790 
1791     VectorWhack ( & cx_bind, NULL, NULL );
1792 }
1793 
1794 static
VCursorListCol_consolidate_and_insert(const VTableCursor * self,BSTree * columns)1795 rc_t VCursorListCol_consolidate_and_insert( const VTableCursor *self, BSTree *columns )
1796 {
1797     rc_t rc = VCursorOpenForListing ( ToConstVCursor ( self ) );
1798     if ( rc == 0 )
1799     {
1800         uint32_t idx = VectorStart ( & self -> dad . row );
1801         uint32_t end = VectorLength ( & self -> dad . row );
1802 
1803         for ( end += idx; idx < end; ++idx )
1804         {
1805             const VColumn* vcol = ( const VColumn* ) VectorGet ( & self -> dad . row, idx );
1806             if ( vcol != NULL )
1807             {
1808                 VColumnRef *cref;
1809                 rc = VColumnRefMake ( & cref, self -> schema, vcol -> scol );
1810                 if ( rc != 0 )
1811                     break;
1812 
1813                 rc = BSTreeInsert ( columns, & cref -> n, VColumnRefSort );
1814                 assert ( rc == 0 );
1815             }
1816         }
1817     }
1818 
1819     return rc;
1820 }
1821 
1822 
1823 /* ListReadableColumns
1824  *  performs an insert of '*' to cursor
1825  *  attempts to resolve all read rules
1826  *  records all SColumns that successfully resolved
1827  *  populates BTree with VColumnRef objects
1828  */
VCursorListReadableColumns(VCURSOR_IMPL * self,BSTree * columns)1829 rc_t VCursorListReadableColumns ( VCURSOR_IMPL *self, BSTree *columns )
1830 {
1831     /* add '*' to cursor */
1832     VCursorListCol_walk_through_columns_and_add_to_cursor ( self );
1833 
1834     /* insert all columns into tree */
1835     return VCursorListCol_consolidate_and_insert ( self, columns );
1836 }
1837 
VTableCursorLinkedCursorGet(const VCURSOR_IMPL * cself,const char * tbl,VCursor const ** curs)1838 rc_t VTableCursorLinkedCursorGet(const VCURSOR_IMPL * cself,const char *tbl,VCursor const **curs)
1839 {
1840     rc_t rc;
1841 
1842     if ( curs == NULL )
1843         rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1844     else
1845     {
1846         if ( cself == NULL )
1847             rc = RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1848         else if ( tbl == NULL )
1849             rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1850         else if ( tbl [ 0 ] == 0 )
1851             rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1852         else
1853         {
1854             LinkedCursorNode *node = (LinkedCursorNode *)
1855                 BSTreeFind( & cself -> linked_cursors, tbl, LinkedCursorComp);
1856 
1857             if (node == NULL)
1858                 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNotFound);
1859             else
1860             {
1861                 rc = VCursorAddRef ( node -> curs );
1862                 if ( rc == 0 )
1863                 {
1864                     * curs = node -> curs;
1865                     return 0;
1866                 }
1867             }
1868         }
1869 
1870         * curs = NULL;
1871     }
1872 
1873     return rc;
1874 }
1875 
VTableCursorLinkedCursorSet(const VCURSOR_IMPL * cself,const char * tbl,VCursor const * curs)1876 rc_t VTableCursorLinkedCursorSet(const VCURSOR_IMPL *cself,const char *tbl,VCursor const *curs)
1877 {
1878     rc_t rc;
1879     VTableCursor *self = (VCURSOR_IMPL *)cself;
1880 
1881     if(cself == NULL)
1882         rc = RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1883     else if(tbl == NULL)
1884         rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1885     else if(tbl[0] == '\0')
1886         rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1887     else
1888     {
1889         rc = VCursorAddRef ( curs );
1890         if ( rc == 0 )
1891         {
1892             LinkedCursorNode *node = malloc ( sizeof * node );
1893             if (node == NULL)
1894                 rc = RC(rcVDB, rcCursor, rcAccessing, rcMemory, rcExhausted);
1895             else
1896             {
1897                 strncpy ( node->tbl, tbl, sizeof node->tbl );
1898                 node->curs = (VCursor*) curs;
1899                 rc = BSTreeInsertUnique( & self -> linked_cursors, (BSTNode *)node, NULL, LinkedCursorNodeComp);
1900                 if ( rc == 0 )
1901                 {
1902                     ( ( VTableCursor * ) curs )->is_sub_cursor = true;
1903                     return 0;
1904                 }
1905 
1906                 free ( node );
1907             }
1908 
1909             VCursorRelease ( curs );
1910         }
1911     }
1912 
1913     return rc;
1914 }
1915 
1916 
1917 /* private */
VCursorParamsGet(struct VCursorParams const * cself,const char * Name,KDataBuffer ** value)1918 LIB_EXPORT rc_t CC VCursorParamsGet( struct VCursorParams const *cself,
1919     const char *Name, KDataBuffer **value )
1920 {
1921     NamedParamNode *node;
1922     String name;
1923     VTableCursor *self = (VTableCursor *)cself; /*TODO: this looks very wrong. Cover with tests and fix */
1924 
1925     if (cself == NULL)
1926         return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1927 
1928     if (Name == NULL)
1929         return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1930 
1931     if (Name[0] == '\0')
1932         return RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1933 
1934     StringInitCString(&name, Name);
1935     node = (NamedParamNode *)BSTreeFind(&self->named_params, &name, NamedParamComp);
1936     if (node == NULL)
1937         return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNotFound);
1938 
1939     *value = &node->value;
1940     return 0;
1941 }
1942 
1943 /* private */
VCursorParamsLookupOrCreate(struct VCursorParams const * cself,const char * Name,KDataBuffer ** value)1944 static rc_t VCursorParamsLookupOrCreate(struct VCursorParams const *cself,
1945                          const char *Name, KDataBuffer **value)
1946 {
1947     NamedParamNode *node;
1948     String name;
1949     VTableCursor *self = (VTableCursor *)cself; /*TODO: this looks very wrong. Cover with tests and fix */
1950     rc_t rc;
1951 
1952     StringInitCString(&name, Name);
1953     node = (NamedParamNode *)BSTreeFind(&self->named_params, &name, NamedParamComp);
1954     if (node == NULL) {
1955         node = malloc(sizeof(*node) + StringSize(&name) + 1);
1956         if (node == NULL)
1957             return RC(rcVDB, rcCursor, rcAccessing, rcMemory, rcExhausted);
1958 
1959         strcpy((char *)(&node[1]), Name);
1960         StringInit ( & node -> name, (const char *)(&node[1]), name . size, name . len );
1961 
1962         memset ( & node -> value, 0, sizeof node -> value );
1963         node -> value . elem_bits = 8;
1964 
1965         rc = BSTreeInsertUnique(&self->named_params, (BSTNode *)node, NULL, NamedParamNodeComp);
1966         assert(rc == 0);
1967     }
1968     *value = &node->value;
1969     return 0;
1970 }
1971 
VCursorParamsVSet(struct VCursorParams const * cself,const char * Name,const char * fmt,va_list args)1972 LIB_EXPORT rc_t CC VCursorParamsVSet(struct VCursorParams const *cself,
1973     const char *Name, const char *fmt, va_list args )
1974 {
1975     KDataBuffer *value;
1976     rc_t rc;
1977 
1978     if (cself == NULL)
1979         return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1980 
1981     if (Name == NULL)
1982         return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1983 
1984     if (Name[0] == '\0')
1985         return RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1986 
1987     rc = VCursorParamsLookupOrCreate(cself, Name, &value);
1988     if (rc == 0) {
1989         int n;
1990         char dummy[1], * buffer = dummy;
1991         size_t bsize = sizeof dummy;
1992 
1993         va_list copy;
1994         va_copy(copy, args);
1995 
1996         if ( value -> base != NULL )
1997         {
1998             buffer = value -> base;
1999             bsize = KDataBufferBytes ( value );
2000         }
2001 
2002         /* optimistic printf */
2003         n = vsnprintf ( buffer, bsize, fmt, copy );
2004         va_end(copy);
2005 
2006         if ( n < 0 || ( size_t ) n >= bsize )
2007         {
2008             rc = KDataBufferResize ( value, ( n < 0 ) ? 4096 : n + 1 );
2009             if (rc == 0)
2010             {
2011                 bsize = KDataBufferBytes ( value );
2012                 n = vsnprintf(value->base, bsize, fmt, args);
2013                 if ( n < 0 || ( size_t ) n >= bsize )
2014                 {
2015                     rc = RC ( rcVDB, rcCursor, rcUpdating, rcParam, rcInvalid );
2016                     KDataBufferWhack ( value );
2017                 }
2018             }
2019         }
2020 
2021         if ( rc == 0 )
2022             value -> elem_count = n;
2023     }
2024     return rc;
2025 }
2026 
VCursorParamsSet(struct VCursorParams const * cself,const char * name,const char * fmt,...)2027 LIB_EXPORT rc_t CC VCursorParamsSet( struct VCursorParams const *cself,
2028     const char *name, const char *fmt, ... )
2029 {
2030     va_list va;
2031     rc_t rc;
2032 
2033     va_start(va, fmt);
2034     rc = VCursorParamsVSet(cself, name, fmt, va);
2035     va_end(va);
2036     return rc;
2037 }
2038 
VCursorParamsUnset(struct VCursorParams const * cself,const char * Name)2039 LIB_EXPORT rc_t CC VCursorParamsUnset( struct VCursorParams const *cself, const char *Name ) {
2040     KDataBuffer *value;
2041     rc_t rc;
2042 
2043     if (cself == NULL)
2044         return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
2045 
2046     if (Name == NULL)
2047         return RC(rcVDB, rcCursor, rcAccessing, rcParam, rcNull);
2048 
2049     if (Name[0] == '\0')
2050         return RC(rcVDB, rcCursor, rcAccessing, rcParam, rcInvalid);
2051 
2052     rc = VCursorParamsGet(cself, Name, &value);
2053     if (rc == 0)
2054         KDataBufferWhack(value);
2055 
2056     return rc;
2057 }
2058 
VTableCursorGetSchema(const VTableCursor * self)2059 const struct VSchema * VTableCursorGetSchema ( const VTableCursor * self )
2060 {
2061     return self -> schema;
2062 }
2063 
run_pagemap_thread(const KThread * t,void * data)2064 static rc_t run_pagemap_thread ( const KThread *t, void *data )
2065 {
2066     rc_t rc;
2067     VTableCursor *self = data;
2068     /* acquire lock */
2069     MTCURSOR_DBG (( "run_pagemap_thread: acquiring lock\n" ));
2070     while((rc = KLockAcquire ( self -> pmpr.lock ))==0){
2071 CHECK_AGAIN:
2072 	switch(self->pmpr.state){
2073 	 case ePMPR_STATE_NONE: 		/* wait for new request */
2074 	 case ePMPR_STATE_SERIALIZE_DONE: 	/* wait for result pickup **/
2075 	 case ePMPR_STATE_DESERIALIZE_DONE:	/* wait for result pickup **/
2076 		MTCURSOR_DBG (( "run_pagemap_thread: waiting for new request\n" ));
2077 		rc = KConditionWait ( self -> pmpr.cond, self -> pmpr.lock );
2078 		goto CHECK_AGAIN;
2079 	 case ePMPR_STATE_EXIT: /** exit requested ***/
2080 		MTCURSOR_DBG (( "run_pagemap_thread: exit by request\n" ));
2081 		KLockUnlock(self -> pmpr.lock);
2082 		return 0;
2083 	 case ePMPR_STATE_DESERIALIZE_REQUESTED:
2084 		MTCURSOR_DBG (( "run_pagemap_thread: request to deserialize\n" ));
2085 		self->pmpr.rc = PageMapDeserialize(&self->pmpr.pm,self->pmpr.data.base,self->pmpr.data.elem_count,self->pmpr.row_count);
2086 		if(self->pmpr.rc == 0){
2087 			self->pmpr.rc=PageMapExpandFull(self->pmpr.pm);
2088 			/*self->pmpr.rc=PageMapExpand(self->pmpr.pm,self->pmpr.row_count<2048?self->pmpr.row_count-1:2048);*/
2089 			assert(self->pmpr.rc == 0);
2090 		}
2091 		self->pmpr.state = ePMPR_STATE_DESERIALIZE_DONE;
2092 		/*fprintf(stderr,"Pagemap %p Done R:%6d|DR:%d|LR:%d\n",self->pmpr.lock, self->pmpr.pm->row_count,self->pmpr.pm->data_recs,self->pmpr.pm->leng_recs);*/
2093 		KConditionSignal ( self -> pmpr.cond );
2094 		KLockUnlock(self -> pmpr.lock);
2095 		break;
2096 	 case ePMPR_STATE_SERIALIZE_REQUESTED:
2097 		MTCURSOR_DBG (( "run_pagemap_thread: request to serialize\n" ));
2098 		self->pmpr.rc = PageMapSerialize(self->pmpr.pm,&self->pmpr.data,0,&self->pmpr.elem_count);
2099 		self->pmpr.state = ePMPR_STATE_SERIALIZE_DONE;
2100 		KConditionSignal ( self -> pmpr.cond );
2101 		KLockUnlock(self -> pmpr.lock);
2102 		break;
2103 	 default:
2104 		assert(0);
2105 		KLockUnlock(self -> pmpr.lock);
2106 		return RC(rcVDB, rcPagemap, rcConverting, rcParam, rcInvalid );
2107 
2108 	}
2109     }
2110     MTCURSOR_DBG (( "run_pagemap_thread: exit\n" ));
2111     return rc;
2112 }
2113 
VTableCursorLaunchPagemapThread(VCURSOR_IMPL * curs)2114 rc_t VTableCursorLaunchPagemapThread(VCURSOR_IMPL *curs)
2115 {
2116 	rc_t rc = 0;
2117     assert ( curs != NULL );
2118     if(curs->pagemap_thread == NULL)
2119     {
2120         if(--curs->launch_cnt<=0)
2121         {
2122             /* ignoring errors because we operate with or without thread */
2123             curs -> pagemap_thread = NULL; /** if fails - will not use **/
2124 
2125             if ( s_disable_pagemap_thread )
2126                 return RC ( rcVDB, rcCursor, rcExecuting, rcThread, rcNotAvailable );
2127 
2128             rc = KLockMake ( & curs -> pmpr.lock );
2129             if(rc == 0)
2130             {
2131                 rc = KConditionMake ( & curs -> pmpr.cond );
2132                 if(rc == 0)
2133                 {
2134                     rc = KThreadMake ( & curs -> pagemap_thread, run_pagemap_thread, curs );
2135                     if ( rc == 0 )
2136                         return 0;
2137 
2138                     KConditionRelease ( curs -> pmpr . cond );
2139                     curs -> pmpr . cond = NULL;
2140                 }
2141 
2142                 KLockRelease ( curs -> pmpr . lock );
2143                 curs -> pmpr . lock = NULL;
2144             }
2145         }
2146     }
2147 	return rc;
2148 }
2149 
VTableCursorTerminatePagemapThread(VCURSOR_IMPL * self)2150 rc_t VTableCursorTerminatePagemapThread(VCURSOR_IMPL *self)
2151 {
2152 	rc_t rc=0;
2153 
2154     assert ( self != NULL );
2155 
2156 	if(self -> pagemap_thread != NULL)
2157     {
2158 		rc = KLockAcquire ( self -> pmpr.lock );
2159 		if ( rc == 0 )
2160         {
2161 			self -> pmpr.state = ePMPR_STATE_EXIT;
2162 			KConditionSignal ( self -> pmpr.cond );
2163 			KLockUnlock ( self -> pmpr.lock );
2164 		}
2165 		KThreadWait ( self -> pagemap_thread, NULL );
2166 	}
2167 
2168 	KThreadRelease ( self -> pagemap_thread );
2169 	KConditionRelease ( self -> pmpr.cond );
2170 	KLockRelease ( self -> pmpr.lock );
2171 
2172     self -> pagemap_thread = NULL;
2173     self -> pmpr . cond = NULL;
2174     self -> pmpr . lock = NULL;
2175 
2176 	return rc;
2177 }
2178 
2179 /* DisablePagemapThread
2180  *  this can cause difficulties for some clients
2181  */
VDBManagerDisablePagemapThread(struct VDBManager const * self)2182 LIB_EXPORT rc_t CC VDBManagerDisablePagemapThread ( struct VDBManager const *self )
2183 {
2184     if ( self == NULL )
2185         return RC ( rcVDB, rcMgr, rcUpdating, rcSelf, rcNull );
2186     s_disable_pagemap_thread = true;
2187     return 0;
2188 }
2189 
VTableCursorSetCacheCapacity(VCURSOR_IMPL * self,uint64_t capacity)2190 uint64_t CC VTableCursorSetCacheCapacity(VCURSOR_IMPL *self,uint64_t capacity)
2191 {
2192 	if(self) return VBlobMRUCacheSetCapacity(self->blob_mru_cache,capacity);
2193         return 0;
2194 }
VTableCursorGetCacheCapacity(const VCURSOR_IMPL * self)2195 uint64_t CC VTableCursorGetCacheCapacity(const VCURSOR_IMPL *self)
2196 {
2197 	if(self) return VBlobMRUCacheGetCapacity(self->blob_mru_cache);
2198 	return 0;
2199 }
2200 
VTableCursorPageMapProcessRequest(const struct VCURSOR_IMPL * self)2201 const PageMapProcessRequest* VTableCursorPageMapProcessRequest(const struct VCURSOR_IMPL *self)
2202 {
2203     assert ( self != NULL );
2204     return & self -> pmpr;
2205 }
2206 
VTableCursorGetTable(const struct VCURSOR_IMPL * self)2207 const struct VTable * VTableCursorGetTable ( const struct VCURSOR_IMPL * self )
2208 {
2209     assert ( self != NULL );
2210     return self -> tbl;
2211 }
2212 
VTableCursorCacheActive(const struct VCURSOR_IMPL * self,int64_t * cache_empty_end)2213 bool VTableCursorCacheActive ( const struct VCURSOR_IMPL * self, int64_t * cache_empty_end )
2214 {
2215     assert ( self != NULL );
2216     assert ( cache_empty_end != NULL );
2217     if ( self -> cache_curs && self -> cache_col_active )
2218     {
2219         * cache_empty_end = self -> cache_empty_end;
2220         return true;
2221     }
2222     * cache_empty_end = 0;
2223     return false;
2224 }
2225 
VTableCursorIsReadOnly(const struct VCURSOR_IMPL * self)2226 bool VTableCursorIsReadOnly ( const struct VCURSOR_IMPL * self )
2227 {
2228     return self -> read_only;
2229 }
2230 
VTableCursorGetBlobMruCache(struct VCURSOR_IMPL * self)2231 VBlobMRUCache * VTableCursorGetBlobMruCache ( struct VCURSOR_IMPL * self )
2232 {
2233     assert ( self != NULL );
2234     return self -> blob_mru_cache;
2235 }
2236 
VTableCursorIncrementPhysicalProductionCount(struct VCURSOR_IMPL * self)2237 uint32_t VTableCursorIncrementPhysicalProductionCount ( struct VCURSOR_IMPL * self )
2238 {
2239     assert ( self != NULL );
2240     return ++ self -> phys_cnt;
2241 }
2242 
VTableCursorFindOverride(const VCURSOR_IMPL * self,const struct VCtxId * cid)2243 const struct KSymbol * VTableCursorFindOverride ( const VCURSOR_IMPL * self, const struct VCtxId * cid )
2244 {
2245     return STableFindOverride ( self -> stbl, cid );
2246 }
2247 
VTableCursorGetBoundTable(const struct VCURSOR_IMPL * self,const struct String * name)2248 const struct VTable * VTableCursorGetBoundTable( const struct VCURSOR_IMPL * self, const struct String * name )
2249 {
2250     return NULL;
2251 }
2252 
2253