1 /*===========================================================================
2 *
3 * PUBLIC DOMAIN NOTICE
4 * National Center for Biotechnology Information
5 *
6 * This software/database is a "United States Government Work" under the
7 * terms of the United States Copyright Act. It was written as part of
8 * the author's official duties as a United States Government employee and
9 * thus cannot be copyrighted. This software/database is freely available
10 * to the public for use. The National Library of Medicine and the U.S.
11 * Government have not placed any restriction on its use or reproduction.
12 *
13 * Although all reasonable efforts have been taken to ensure the accuracy
14 * and reliability of the software and data, the NLM and the U.S.
15 * Government do not and cannot warrant the performance or results that
16 * may be obtained by using this software or data. The NLM and the U.S.
17 * Government disclaim all warranties, express or implied, including
18 * warranties of performance, merchantability or fitness for any particular
19 * purpose.
20 *
21 * Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26
27 #include <vdb/extern.h>
28
29 #include <va_copy.h>
30
31 #define TRACK_REFERENCES 0
32
33 #define KONST const
34 #define SKONST
35 #include "cursor-table.h"
36 #include "dbmgr-priv.h"
37 #include "linker-priv.h"
38 #include "table-priv.h"
39 #include "schema-priv.h"
40 #include "schema-parse.h"
41 #include "column-priv.h"
42 #include "prod-expr.h"
43 #undef KONST
44 #undef SKONST
45 #include "blob-priv.h"
46 #include "page-map.h"
47
48 #include <vdb/cursor.h>
49 #include <vdb/table.h>
50 #include <vdb/vdb-priv.h>
51 #include <kdb/table.h>
52 #include <kdb/column.h>
53 #include <kdb/meta.h>
54 #include <kdb/namelist.h>
55 #include <kfs/dyload.h>
56 #include <klib/symbol.h>
57 #include <klib/symtab.h>
58 #include <klib/namelist.h>
59 #include <klib/log.h>
60 #include <klib/rc.h>
61 #include <klib/printf.h>
62 #include <klib/sort.h>
63 #include <bitstr.h>
64 #include <os-native.h>
65 #include <sysalloc.h>
66
67 #include <kproc/lock.h>
68 #include <kproc/cond.h>
69 #include <kproc/thread.h>
70
71
72 #include <stdlib.h>
73 #include <stdio.h>
74 #include <string.h>
75 #include <ctype.h>
76 #include <assert.h>
77
/* casts its argument to a pointer to const VCursor */
#define ToConstVCursor(self) ((const VCursor*)(self))

#define PERMIT_POST_OPEN_ADD 1

/* DISABLE_READ_CACHE is 1 only for 32-bit builds; when non-zero,
   VTableCreateCachedCursorReadImpl forces the cache capacity to 0 */
#if _ARCH_BITS != 32
#define DISABLE_READ_CACHE 0
#else
#define DISABLE_READ_CACHE 1
#endif

/* normally false
   can be set for certain applications using VDBManagerDisablePagemapThread
*/
static bool s_disable_pagemap_thread = false;
91
92 /*--------------------------------------------------------------------------
93 * NamedParamNode
94 */
95
96 typedef struct NamedParamNode NamedParamNode;
97 struct NamedParamNode
98 {
99 BSTNode n;
100 String name;
101 KDataBuffer value;
102 };
103
104 static
NamedParamNodeWhack(BSTNode * n,void * ignore)105 void CC NamedParamNodeWhack ( BSTNode *n, void *ignore )
106 {
107 NamedParamNode *self = ( NamedParamNode* ) n;
108 KDataBufferWhack ( & self -> value );
109 free ( self );
110 }
111
112 static
NamedParamComp(const void * item,const BSTNode * n)113 int64_t CC NamedParamComp ( const void *item, const BSTNode *n )
114 {
115 const String *name = item;
116 const NamedParamNode *node = ( const NamedParamNode* ) n;
117
118 return StringOrderNoNullCheck ( name, & node -> name );
119 }
120
121 static
NamedParamNodeComp(const BSTNode * A,const BSTNode * B)122 int64_t CC NamedParamNodeComp ( const BSTNode *A, const BSTNode *B )
123 {
124 const NamedParamNode *a = (const NamedParamNode *) A;
125 const NamedParamNode *b = (const NamedParamNode *) B;
126
127 return StringOrderNoNullCheck ( & a -> name, & b -> name );
128 }
129 /*--------------------------------------------------------------------------
130 * LinkedCursorNode
131 */
132
133 typedef struct LinkedCursorNode LinkedCursorNode;
134 struct LinkedCursorNode
135 {
136 BSTNode n;
137 char tbl[64];
138 VCursor *curs;
139 };
140
141 static
LinkedCursorNodeWhack(BSTNode * n,void * ignore)142 void CC LinkedCursorNodeWhack ( BSTNode *n, void *ignore )
143 {
144 LinkedCursorNode *self = ( LinkedCursorNode* ) n;
145 VCursorRelease ( self -> curs );
146 free ( self );
147 }
148
149 static
LinkedCursorComp(const void * item,const BSTNode * n)150 int64_t CC LinkedCursorComp ( const void *item, const BSTNode *n )
151 {
152 const char *tbl = item;
153 const LinkedCursorNode *node = ( const LinkedCursorNode* ) n;
154
155 return strncmp ( tbl, node -> tbl, sizeof(node -> tbl) );
156 }
157
158 static
LinkedCursorNodeComp(const BSTNode * A,const BSTNode * B)159 int64_t CC LinkedCursorNodeComp ( const BSTNode *A, const BSTNode *B )
160 {
161 const LinkedCursorNode *a = (const LinkedCursorNode *) A;
162 const LinkedCursorNode *b = (const LinkedCursorNode *) B;
163
164 return strncmp ( a -> tbl, b -> tbl,sizeof(a->tbl) );
165 }
166
167
168
169 /*--------------------------------------------------------------------------
170 * VTableCursor
171 * a row cursor onto a VTable
172 */
173
174 /* Whack
175 */
VTableCursorWhack(const VCURSOR_IMPL * p_self)176 rc_t VTableCursorWhack ( const VCURSOR_IMPL * p_self )
177 {
178 VCURSOR_IMPL * self = ( VCURSOR_IMPL * ) p_self;
179 if ( self -> cache_curs )
180 {
181 VTableCursorWhack ( ( VTableCursor * ) ( self -> cache_curs ) );
182 }
183 VBlobMRUCacheDestroy ( self->blob_mru_cache);
184
185 BSTreeWhack ( & self -> named_params, NamedParamNodeWhack, NULL );
186 BSTreeWhack ( & self -> linked_cursors, LinkedCursorNodeWhack, NULL );
187 VectorWhack ( & self -> trig, NULL, NULL );
188 VectorWhack ( & self -> v_cache_curs, NULL, NULL );
189 VectorWhack ( & self -> v_cache_cidx, NULL, NULL );
190
191 VSchemaRelease ( self -> schema );
192
193 { /* some references from the base class may dangle if we Sever() before VCursorWhackInt() is done */
194 const VTable * tbl = self -> tbl;
195 VCursorWhackInt ( & self -> dad );
196 VTableSever ( tbl );
197 }
198
199 return 0;
200 }
201
202 /* Make - PRIVATE
203 */
VTableCursorMake(VCURSOR_IMPL ** cursp,const VTable * tbl,VCursor_vt * vt)204 rc_t VTableCursorMake ( VCURSOR_IMPL **cursp, const VTable *tbl, VCursor_vt *vt )
205 {
206 rc_t rc;
207 VTableCursor *curs;
208
209 /* must have return param */
210 assert ( cursp != NULL );
211
212 /* must have parent tbl */
213 assert ( tbl != NULL );
214
215 /* create a structure */
216 curs = calloc ( 1, sizeof * curs );
217 if ( curs == NULL )
218 rc = RC ( rcVDB, rcCursor, rcConstructing, rcMemory, rcExhausted );
219 else
220 {
221 /* create a separate schema object */
222 rc = VSchemaMake ( & curs -> schema, tbl -> schema );
223 if ( rc == 0 )
224 {
225 /* extend table schema to populate with implicits */
226 rc = STableCloneExtend ( tbl -> stbl, & curs -> stbl, curs -> schema );
227 if ( rc == 0 )
228 {
229 curs -> dad . vt = vt;
230 curs -> tbl = VTableAttach ( tbl );
231 VectorInit ( & curs -> dad . row, 1, 16 );
232 VectorInit ( & curs -> v_cache_curs, 1, 16 );
233 VectorInit ( & curs -> v_cache_cidx, 1, 16 );
234 VCursorCacheInit ( & curs -> dad . col, 0, 16 );
235 VCursorCacheInit ( & curs -> dad . phys, 0, 16 );
236 VCursorCacheInit ( & curs -> dad . prod, 0, 16 );
237 VectorInit ( & curs -> dad . owned, 0, 64 );
238 VectorInit ( & curs -> trig, 0, 64 );
239 KRefcountInit ( & curs -> dad . refcount, 1, "VCursor", "make", "vcurs" );
240 curs -> dad . state = vcConstruct;
241 curs -> permit_add_column = true;
242 curs -> suspend_triggers = false;
243 * cursp = curs;
244 return 0;
245 }
246
247 VSchemaRelease ( curs -> schema );
248 }
249
250 free ( curs );
251 }
252
253 * cursp = NULL;
254
255 return rc;
256 }
257
258 /* SupplementSchema
259 * scan table for physical column names
260 * create transparent yet incomplete (untyped) columns for unknown names
261 * create incomplete (untyped) physical columns for forwarded names
262 * repeat process on static columns, except create complete (fully typed) objects
263 */
264 static
VCursorSupplementName(const KSymTable * tbl,STable * stbl,const VTypedecl * td,const char * name)265 rc_t VCursorSupplementName ( const KSymTable *tbl,
266 STable *stbl, const VTypedecl *td, const char *name )
267 {
268 rc_t rc = 0;
269 char buffer [ 256 ];
270
271 /* create physical name string */
272 int len = snprintf ( buffer, sizeof buffer, ".%s", name );
273 if ( len < 0 || len >= sizeof buffer )
274 rc = RC ( rcVDB, rcCursor, rcConstructing, rcName, rcExcessive );
275 else
276 {
277 KSymbol *sym;
278
279 String pname, cname;
280 StringInit ( & pname, buffer, len, string_len ( buffer, len ) );
281
282 /* if physical name is known */
283 sym = KSymTableFind ( tbl, & pname );
284 if ( sym != NULL )
285 {
286 /* if it is being implemented here */
287 if ( sym -> type == eVirtual )
288 rc = STableImplicitPhysMember ( stbl, td, sym, & pname );
289 return rc;
290 }
291
292 /* if simple name is unknown, add implicit */
293 sym = KSymTableFind ( tbl, StringSubstr ( & pname, & cname, 1, 0 ) );
294 if ( sym == NULL )
295 {
296 /* create implicit physical */
297 rc = STableImplicitPhysMember ( stbl, td, sym, & pname );
298 if ( rc == 0 )
299 rc = STableImplicitColMember ( stbl, & cname, & pname );
300 }
301 }
302 return rc;
303 }
304
305 static
VCursorSupplementPhysical(const KSymTable * tbl,const VTableCursor * self)306 rc_t VCursorSupplementPhysical ( const KSymTable *tbl, const VTableCursor *self )
307 {
308 KNamelist *names;
309 rc_t rc = KTableListCol ( self -> tbl -> ktbl, & names );
310 if ( rc == 0 )
311 {
312 uint32_t i, count;
313 rc = KNamelistCount ( names, & count );
314 for ( i = 0; rc == 0 && i < count; ++ i )
315 {
316 const char *name;
317 rc = KNamelistGet ( names, i, & name );
318 if ( rc == 0 )
319 rc = VCursorSupplementName ( tbl, self -> stbl, NULL, name );
320 }
321 KNamelistRelease ( names );
322 }
323 return rc;
324 }
325
326 static
VCursorSupplementStatic(const KSymTable * tbl,const VTableCursor * self)327 rc_t VCursorSupplementStatic ( const KSymTable *tbl, const VTableCursor *self )
328 {
329 rc_t rc;
330 KNamelist *names;
331
332 const KMDataNode *root = self -> tbl -> col_node;
333 if ( root == NULL )
334 return 0;
335
336 rc = KMDataNodeListChild ( root, & names );
337 if ( rc == 0 )
338 {
339 uint32_t i, count;
340 rc = KNamelistCount ( names, & count );
341 for ( i = 0; rc == 0 && i < count; ++ i )
342 {
343 const char *name;
344 rc = KNamelistGet ( names, i, & name );
345 if ( rc == 0 )
346 {
347 const KMDataNode *node;
348 rc = KMDataNodeOpenNodeRead ( root, & node, "%s", name );
349 if ( rc == 0 )
350 {
351 size_t size;
352 char typedecl [ 256 ];
353 rc = KMDataNodeReadAttr ( node, "type", typedecl, sizeof typedecl, & size );
354 if ( rc == 0 && size != 0 )
355 {
356 VTypedecl td;
357 rc = VSchemaResolveTypedecl ( self -> schema, & td, "%s", typedecl );
358 if ( rc == 0 )
359 rc = VCursorSupplementName ( tbl, self -> stbl, & td, name );
360
361 rc = 0; /*** don't care if name is not in the schema ***/
362
363 }
364
365 KMDataNodeRelease ( node );
366 }
367 }
368 }
369
370 KNamelistRelease ( names );
371 }
372
373 return rc;
374 }
375
VCursorSupplementSchema(const VCURSOR_IMPL * self)376 rc_t VCursorSupplementSchema ( const VCURSOR_IMPL *self )
377 {
378 KSymTable tbl;
379 rc_t rc = init_tbl_symtab ( & tbl, self -> schema, self -> stbl );
380 if ( rc == 0 )
381 {
382 rc = VCursorSupplementPhysical ( & tbl, self );
383 if ( rc == 0 )
384 rc = VCursorSupplementStatic ( & tbl, self );
385 KSymTableWhack ( & tbl );
386 }
387 return rc;
388 }
389
390 /* CreateCachedCursorRead
391 * creates a read cursor object onto table with a cache limit in bytes
392 *
393 * AVAILABILITY: version 2.1
394 *
395 * "curs" [ OUT ] - return parameter for newly created cursor
396 *
397 * "capacity" [ IN ] - the maximum bytes to cache on the cursor before
398 * dropping least recently used blobs
399 */
VTableCreateCachedCursorReadImpl(const VTable * self,const VTableCursor ** cursp,size_t capacity,bool create_pagemap_thread)400 static rc_t VTableCreateCachedCursorReadImpl ( const VTable *self,
401 const VTableCursor **cursp, size_t capacity, bool create_pagemap_thread )
402 {
403 rc_t rc;
404 #if DISABLE_READ_CACHE
405 capacity = 0;
406 #endif
407 if ( cursp == NULL )
408 rc = RC ( rcVDB, rcTable, rcOpening, rcParam, rcNull );
409 else {
410 VTableCursor *curs;
411 #if LAZY_OPEN_COL_NODE
412 if ( self -> col_node == NULL )
413 KMetadataOpenNodeRead ( self -> meta, & ( ( VTable* ) self ) -> col_node, "col" );
414 #endif
415 rc = VCursorMakeFromTable ( & curs, self );
416 if ( rc == 0 ) {
417 curs -> blob_mru_cache = VBlobMRUCacheMake(capacity);
418 curs -> read_only = true;
419 rc = VCursorSupplementSchema ( curs );
420
421 #if 0
422 if ( create_pagemap_thread && capacity > 0 && rc == 0 )
423 {
424 rc = VCursorLaunchPagemapThread ( curs );
425 if ( rc != 0 )
426 {
427 if ( GetRCState( rc ) == rcNotAvailable )
428 rc = 0;
429 }
430 }
431 #endif
432 if ( rc == 0 )
433 {
434 if(capacity > 0)
435 curs->launch_cnt = 5;
436 else
437 curs->launch_cnt=200;
438 * cursp = curs;
439 if(rc==0 && self->cache_tbl)
440 {
441 rc_t rc2;
442 const VTableCursor * cache_curs;
443 rc2 = VTableCreateCachedCursorReadImpl(self->cache_tbl,&cache_curs,64*1024*1024,create_pagemap_thread);
444 DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VTableCreateCachedCursorReadImpl(vdbcache) = %d\n", rc2));
445 if(rc2 == 0)
446 {
447 ((VTableCursor*)(*cursp)) -> cache_curs = & cache_curs -> dad;
448 }
449 }
450 return 0;
451 }
452 VCursorRelease ( & curs -> dad );
453 }
454 * cursp = NULL;
455 }
456 return rc;
457 }
458
VTableCreateCachedCursorRead(const VTable * self,const VCursor ** cursp,size_t capacity)459 LIB_EXPORT rc_t CC VTableCreateCachedCursorRead ( const VTable *self,
460 const VCursor **cursp, size_t capacity )
461 {
462 return VTableCreateCachedCursorReadImpl ( self, (const VTableCursor **)cursp, capacity, true );
463 }
464
465 /**
466 *** VTableCreateCursorReadInternal is only visible in vdb and needed for schema resolutions
467 ****/
VTableCreateCursorReadInternal(const VTable * self,const VTableCursor ** cursp)468 rc_t VTableCreateCursorReadInternal(const VTable *self, const VTableCursor **cursp)
469 {
470 return VTableCreateCachedCursorReadImpl(self,cursp,0,false);
471 }
472
473 /* CreateCursor
474 * creates a cursor object onto table
475 * multiple read cursors are allowed
476 * only a single write cursor is allowed
477 *
478 * "curs" [ OUT ] - return parameter for newly created cursor
479 */
VTableCreateCursorRead(const VTable * self,const VCursor ** curs)480 LIB_EXPORT rc_t CC VTableCreateCursorRead ( const VTable *self, const VCursor **curs )
481 {
482 /* will be deprecated in the future */
483 return VTableCreateCachedCursorRead ( self, curs, 0 );
484 }
485
486 /* PermitPostOpenAdd
487 * allows columns to be added to open cursor
488 * for write cursor, the effect lasts until the first row commit
489 */
VTableCursorPermitPostOpenAdd(const VCURSOR_IMPL * cself)490 rc_t CC VTableCursorPermitPostOpenAdd ( const VCURSOR_IMPL *cself )
491 {
492 rc_t rc;
493 VTableCursor *self = ( VCURSOR_IMPL* ) cself;
494
495 if ( self == NULL )
496 rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
497 else if ( self -> dad . state == vcFailed )
498 rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcInvalid );
499 else if ( self -> dad . state != vcConstruct )
500 rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcOpen );
501 else
502 {
503 self -> permit_post_open_add = true;
504 rc = 0;
505 }
506 if(self->cache_curs){
507 VCursorPermitPostOpenAdd(self->cache_curs);
508 }
509
510 return rc;
511 }
512 /* SuspendTriggers
513 * blocks resolution of schema-based triggers
514 *
515 */
VTableCursorSuspendTriggers(const VCURSOR_IMPL * cself)516 rc_t CC VTableCursorSuspendTriggers ( const VCURSOR_IMPL *cself )
517 {
518 rc_t rc;
519 VTableCursor *self = ( VCURSOR_IMPL* ) cself;
520
521 if ( self == NULL )
522 rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
523 else
524 {
525 self -> suspend_triggers = true;
526 rc = 0;
527 }
528
529 return rc;
530 }
531
532
533 /* AddSColumn
534 */
535 static
VTableCursorAddSColumn(VCURSOR_IMPL * self,uint32_t * idx,const SColumn * scol,const VTypedecl * cast,Vector * cx_bind)536 rc_t VTableCursorAddSColumn ( VCURSOR_IMPL *self, uint32_t *idx,
537 const SColumn *scol, const VTypedecl *cast, Vector *cx_bind )
538 {
539 rc_t rc;
540 VColumn *col;
541
542 if ( self -> read_only )
543 {
544 /* must be readable */
545 if ( scol -> read == NULL )
546 return RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcWriteonly );
547 }
548 else
549 {
550 /* must be writable */
551 if ( scol -> read_only || ( scol -> read == NULL && scol -> validate == NULL ) )
552 return RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcReadonly );
553 }
554
555 /* must not already be there - benign error */
556 col = VCursorCacheGet ( & self -> dad . col, & scol -> cid );
557 if ( col != NULL )
558 {
559 * idx = col -> ord;
560 return SILENT_RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcExists );
561 }
562
563 /* make object */
564 rc = VCursorMakeColumn ( & self -> dad, & col, scol, cx_bind );
565 if ( rc == 0 )
566 {
567 /* insert it into vectors */
568 rc = VectorAppend ( & self -> dad . row, & col -> ord, col );
569 if ( rc == 0 )
570 {
571 void *ignore;
572 rc = VCursorCacheSet ( & self -> dad . col, & scol -> cid, col );
573 if ( rc == 0 )
574 {
575 /* open column if cursor open or type unknown */
576 if ( self -> dad . state >= vcReady || scol -> td . type_id == 0 )
577 {
578 rc = VCursorPostOpenAdd ( self, col );
579 assert ( rc != 0 || scol -> td . type_id != 0 );
580 }
581 if ( rc == 0 )
582 {
583 /* check cast of SColumn against requested type
584 this is to handle the case where the column
585 was created incomplete, i.e. with unknown type */
586 if ( cast == NULL || VTypedeclToTypedecl ( & scol -> td,
587 self -> schema, cast, & col -> td, NULL ) )
588 {
589 /* has been entered */
590 * idx = col -> ord;
591 return 0;
592 }
593 }
594
595 /* bail out */
596 VCursorCacheSwap ( & self -> dad . col, & scol -> cid, NULL, & ignore );
597 }
598
599 VectorSwap ( & self -> dad . row, col -> ord, NULL, & ignore );
600 }
601
602 VColumnWhack ( col, NULL );
603 }
604
605 return rc;
606 }
607
608
609 /* AddColspec
610 * a "colspec" is either a simple column name or a typed name expression
611 * uses STable to evaluate colspec and find an SColumn
612 */
613 static
VCursorAddColspec(VCURSOR_IMPL * self,uint32_t * idx,const char * colspec)614 rc_t VCursorAddColspec ( VCURSOR_IMPL *self, uint32_t *idx, const char *colspec )
615 {
616 rc_t rc;
617
618 /* find an appropriate column in schema */
619 uint32_t type;
620 VTypedecl cast;
621 const SNameOverload *name;
622 const SColumn *scol = STableFind ( self -> tbl -> stbl, self -> schema,
623 & cast, & name, & type, colspec, "VCursorAddColspec", true );
624 if ( scol == NULL || type != eColumn )
625 rc = SILENT_RC ( rcVDB, rcCursor, rcUpdating, rcColumn, rcNotFound );
626 else
627 {
628 Vector cx_bind;
629 VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
630 rc = VTableCursorAddSColumn ( self, idx, scol, & cast, & cx_bind );
631 VectorWhack ( & cx_bind, NULL, NULL );
632 if(rc == 0)
633 {
634 char ccolspec[1024];
635 size_t n;
636 rc_t rc2=string_printf(ccolspec,sizeof(ccolspec),&n,"%s_CACHE",colspec);
637
638 VectorSet(&self->v_cache_curs,*idx,NULL);
639 VectorSet(&self->v_cache_cidx,*idx,(const void*)0);
640 if(rc2==0)
641 {
642 uint32_t cidx;
643 rc2=VCursorAddColumn ( ToConstVCursor ( self ), & cidx, ccolspec ); /** see if column exists in the same table **/
644 DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VCursorAddColspec(%s,vdbcache,sametable) = %d\n", ccolspec,rc2));
645 if(rc2==0 || GetRCState ( rc2 ) == rcExists )
646 {
647 VectorSet(&self->v_cache_curs,*idx,self);
648 VectorSet(&self->v_cache_cidx,*idx,(const void*)(uint64_t)cidx);
649 }
650 else if(self->cache_curs)
651 {
652 rc2=VCursorAddColumn(self->cache_curs,&cidx,ccolspec); /** see if column exists in external table **/
653 DBGMSG(DBG_VDB, DBG_FLAG(DBG_VDB_VDB), ("VCursorAddColspec(%s,vdbcache,remotetable) = %d\n", ccolspec,rc2));
654 if(rc2==0 || GetRCState ( rc2 ) == rcExists ){
655 VectorSet(&self->v_cache_curs,*idx,self->cache_curs);
656 VectorSet(&self->v_cache_cidx,*idx,(const void*)(uint64_t)cidx);
657 }
658 }
659 }
660 }
661 }
662
663 return rc;
664 }
665
666
667 /* AddColumn
668 * add a column to an unopened cursor
669 *
670 * "idx" [ OUT ] - return parameter for column index
671 *
672 * "name" [ IN ] - NUL terminated column name spec.
673 * to identify a column by name, provide the column name
674 * by itself. if there are multiple types available under
675 * that name, the default type for that column will be
676 * selected. to select a specific type, the name may
677 * be cast to that type using a cast expression, e.g.
678 * "( type ) name"
679 * the special name "*" may be added to a read cursor.
680 */
VTableCursorVAddColumn(const VCURSOR_IMPL * cself,uint32_t * idx,const char * name,va_list args)681 rc_t VTableCursorVAddColumn ( const VCURSOR_IMPL *cself,
682 uint32_t *idx, const char *name, va_list args )
683 {
684 rc_t rc;
685 VTableCursor *self = ( VTableCursor* ) cself;
686
687 if ( idx == NULL )
688 rc = RC ( rcVDB, rcCursor, rcUpdating, rcParam, rcNull );
689 else
690 {
691 * idx = 0;
692
693 if ( self == NULL )
694 rc = RC ( rcVDB, rcCursor, rcUpdating, rcSelf, rcNull );
695 else if ( name == NULL )
696 rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcNull );
697 else if ( name [ 0 ] == 0 )
698 rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcEmpty );
699 else if ( self -> dad . state == vcFailed )
700 rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcInvalid );
701 else if ( self -> dad . state != vcConstruct && ! self -> permit_add_column )
702 rc = RC ( rcVDB, rcCursor, rcUpdating, rcCursor, rcLocked );
703 else
704 {
705 char colspec [ 1024 ];
706 int len = vsnprintf ( colspec, sizeof colspec, name, args );
707 if ( len < 0 || len >= sizeof colspec )
708 rc = RC ( rcVDB, rcCursor, rcUpdating, rcName, rcExcessive );
709 else
710 {
711 rc = VCursorAddColspec ( self, idx, colspec );
712 if ( rc == 0 || GetRCState ( rc ) == rcExists )
713 return rc;
714 }
715
716 if ( ! self -> permit_add_column )
717 {
718 PLOGERR ( klogErr, ( klogErr, rc, "failed to add column '$(spec)' to cursor",
719 "spec=%s", colspec ));
720 }
721
722 return rc;
723 }
724 }
725
726 LOGERR ( klogErr, rc, "failed to add column" );
727
728 return rc;
729 }
730
731 /* GetColumnIdx
732 * retrieve column index by name spec
733 *
734 * "idx" [ OUT ] - return parameter for column index
735 *
736 * "name" [ IN ] - NUL terminated column name spec.
737 */
VTableCursorVGetColumnIdx(const VCURSOR_IMPL * self,uint32_t * idx,const char * name,va_list args)738 rc_t CC VTableCursorVGetColumnIdx ( const VCURSOR_IMPL *self,
739 uint32_t *idx, const char *name, va_list args )
740 {
741 rc_t rc;
742
743 if ( idx == NULL )
744 rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
745 else
746 {
747 * idx = 0;
748
749 if ( name == NULL )
750 rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcNull );
751 else if ( name [ 0 ] == 0 )
752 rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcEmpty );
753 else if ( self -> dad . state == vcFailed )
754 rc = RC ( rcVDB, rcCursor, rcAccessing, rcCursor, rcInvalid );
755 else
756 {
757 char colspec [ 1024 ];
758 int len = vsnprintf ( colspec, sizeof colspec, name, args );
759 if ( len < 0 || len >= sizeof colspec )
760 rc = RC ( rcVDB, rcCursor, rcAccessing, rcName, rcExcessive );
761 else
762 {
763 /* find an appropriate column in schema */
764 uint32_t type;
765 VTypedecl cast;
766 const SNameOverload *name;
767 const SColumn *scol = STableFind ( self -> tbl -> stbl,
768 self -> schema,
769 & cast,
770 & name,
771 & type,
772 colspec,
773 "VTableCursorVGetColumnIdx",
774 true );
775 rc = VCursorGetColidx ( & self -> dad, scol, name, type, idx );
776 }
777 }
778 }
779
780 return rc;
781 }
782
783 /* Open
784 * open cursor, resolving schema
785 * for the set of opened columns
786 *
787 * NB - there is no corresponding "Close"
788 * use "Release" instead.
789 */
790 typedef struct VProdResolveData VProdResolveData;
791 struct VProdResolveData
792 {
793 VProdResolve pr;
794 rc_t rc;
795 };
796
797
798 static
VCursorResolveColumn(void * item,void * data)799 bool CC VCursorResolveColumn ( void *item, void *data )
800 {
801 if ( item != NULL )
802 {
803 void *ignore;
804 VTableCursor *self;
805
806 VColumn *col = item;
807 VProdResolveData *pb = data;
808 SColumn *scol = ( SColumn* ) col -> scol;
809
810 VProduction *src = NULL;
811 pb -> rc = VProdResolveColumnRoot ( & pb -> pr, & src, scol );
812 if ( pb -> rc == 0 )
813 {
814 if ( src > FAILED_PRODUCTION )
815 {
816 /* repair for incomplete implicit column decl */
817 if ( scol -> td . type_id == 0 )
818 scol -> td = src -> fd . td;
819
820 return false;
821 }
822
823 pb -> rc = RC ( rcVDB, rcCursor, rcOpening, rcColumn, rcUndefined );
824 }
825
826 /* check for tolerance */
827 self = ( VTableCursor * ) pb -> pr . curs;
828 if ( ! pb -> pr . ignore_column_errors )
829 {
830 if ( ! self -> permit_post_open_add )
831 {
832 PLOGERR ( klogErr, ( klogErr, pb -> rc, "failed to resolve column '$(name)' idx '$(idx)'",
833 "name=%.*s,idx=%u"
834 , ( int ) scol -> name -> name . size
835 , scol -> name -> name . addr
836 , col -> ord ));
837 }
838
839 return true;
840 }
841
842 /* remove from row and cache */
843 VectorSwap ( & self -> dad . row, col -> ord, NULL, & ignore );
844 VCursorCacheSwap ( & self -> dad . col, & scol -> cid, NULL, & ignore );
845
846 /* dump the VColumn */
847 VColumnWhack ( col, NULL );
848
849 /* return no-error */
850 pb -> rc = 0;
851 }
852
853 return false;
854 }
855
856 static
VCursorOpenColumn(const VTableCursor * cself,VColumn * col)857 rc_t VCursorOpenColumn ( const VTableCursor *cself, VColumn *col )
858 {
859 KDlset *libs;
860 VTableCursor *self = ( VTableCursor* ) cself;
861
862 Vector cx_bind;
863 VProdResolveData pb;
864 pb . pr . schema = self -> schema;
865 pb . pr . ld = self -> tbl -> linker;
866 pb . pr . name = & self -> stbl -> name -> name;
867 pb . pr . primary_table = VCursorGetTable ( & self -> dad );
868 pb . pr . curs = & self -> dad;
869 pb . pr . cache = & self -> dad . prod;
870 pb . pr . owned = & self -> dad . owned;
871 pb . pr . cx_bind = & cx_bind;
872 pb . pr . chain = chainDecoding;
873 pb . pr . blobbing = false;
874 pb . pr . ignore_column_errors = false;
875 pb . pr . discover_writable_columns = false;
876
877 VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
878
879 pb . rc = VLinkerOpen ( pb . pr . ld, & libs );
880 if ( pb . rc == 0 )
881 {
882 pb . pr . libs = libs;
883 VCursorResolveColumn ( col, & pb );
884 KDlsetRelease ( libs );
885 }
886
887 VectorWhack ( & cx_bind, NULL, NULL );
888
889 return pb . rc;
890 }
891
892 /* PostOpenAdd
893 * handle opening of a column after the cursor is opened
894 */
VCursorPostOpenAddRead(VTableCursor * self,VColumn * col)895 rc_t VCursorPostOpenAddRead ( VTableCursor *self, VColumn *col )
896 {
897 return VCursorOpenColumn ( self, col );
898 }
899
900
901 static
VCursorResolveColumnProductions(VTableCursor * self,const KDlset * libs,bool ignore_failures)902 rc_t VCursorResolveColumnProductions ( VTableCursor *self,
903 const KDlset *libs, bool ignore_failures )
904 {
905 Vector cx_bind;
906 VProdResolveData pb;
907 pb . pr . schema = self -> schema;
908 pb . pr . ld = self -> tbl -> linker;
909 pb . pr . libs = libs;
910 pb . pr . name = & self -> stbl -> name -> name;
911 pb . pr . primary_table = VCursorGetTable ( & self -> dad ) ;
912 pb . pr . curs = & self -> dad ;
913 pb . pr . cache = & self -> dad . prod;
914 pb . pr . owned = & self -> dad . owned;
915 pb . pr . cx_bind = & cx_bind;
916 pb . pr . chain = chainDecoding;
917 pb . pr . blobbing = false;
918 pb . pr . ignore_column_errors = ignore_failures;
919 pb . pr . discover_writable_columns = false;
920 pb . rc = 0;
921
922 VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
923
924 if ( ! VectorDoUntil ( & self -> dad . row, false, VCursorResolveColumn, & pb ) )
925 pb . rc = 0;
926
927 VectorWhack ( & cx_bind, NULL, NULL );
928
929 return pb . rc;
930 }
931
VCursorOpenRead(VCURSOR_IMPL * self,const KDlset * libs)932 rc_t VCursorOpenRead ( VCURSOR_IMPL *self, const KDlset *libs )
933 {
934 rc_t rc;
935
936 if ( self -> dad . state >= vcReady )
937 rc = 0;
938 else if ( self -> dad . state == vcFailed )
939 rc = RC ( rcVDB, rcCursor, rcOpening, rcCursor, rcInvalid );
940 else
941 {
942 rc = VCursorResolveColumnProductions ( self, libs, false );
943 if ( rc == 0 )
944 {
945 self -> dad . row_id =
946 self -> dad . start_id =
947 self -> dad . end_id = 1;
948 self -> dad . state = vcReady;
949 if( self -> cache_curs )
950 {
951 VCursorOpenRead ( ( VTableCursor * ) self -> cache_curs, libs );
952 }
953 return rc;
954 }
955 else
956 {
957 /* in case the column is not defined ( rcColumn, rcUndefined )
958 we want to check if the table is empty, and report that instead
959 */
960 if ( GetRCState( rc ) == rcUndefined &&
961 GetRCObject( rc ) == ( enum RCObject )rcColumn )
962 {
963 bool empty;
964 if ( ( VTableIsEmpty ( self -> tbl, &empty ) == 0 ) && empty )
965 {
966 rc = RC ( rcVDB, rcCursor, rcOpening, rcTable, rcEmpty );
967 }
968 }
969 }
970 self -> dad . state = vcFailed;
971 }
972
973 return rc;
974 }
975
976 static
VCursorOpenForListing(const VCursor * cself)977 rc_t VCursorOpenForListing ( const VCursor *cself )
978 {
979 rc_t rc;
980 VTableCursor *self = ( VTableCursor* ) cself;
981
982 VLinker *ld = self -> tbl -> linker;
983
984 KDlset *libs;
985 rc = VLinkerOpen ( ld, & libs );
986 if ( rc == 0 )
987 {
988 rc = VCursorResolveColumnProductions ( self, libs, true );
989 KDlsetRelease ( libs );
990 }
991 return rc;
992 }
993
994 /* Read
995 * read entire single row of byte-aligned data into a buffer
996 *
997 * "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
998 *
999 * "elem_bits" [ IN ] - expected element size in bits, required
1000 * to be compatible with the actual element size, and be a multiple
1001 * of 8 ( byte-aligned ). for non-byte-aligned data, see ReadBits
1002 *
1003 * "buffer" [ OUT ] and "blen" [ IN ] - return buffer for row data
1004 * where "blen" gives buffer capacity in elements. the total buffer
1005 * size in bytes == ( "elem_bits" * "blen" + 7 ) / 8.
1006 *
1007 * "row_len" [ OUT ] - return parameter for the number of elements
1008 * in the requested row.
1009 *
1010 * when the return code is 0, "row_len" will contain the number of
1011 * elements read into buffer. if the return code indicates that the
1012 * buffer is too small, "row_len" will give the required buffer length.
1013 */
1014 static
VCursorReadColumnDirectInt(const VCURSOR_IMPL * cself,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len,uint32_t * repeat_count,const VBlob ** rslt)1015 rc_t VCursorReadColumnDirectInt ( const VCURSOR_IMPL *cself, int64_t row_id, uint32_t col_idx,
1016 uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len, uint32_t *repeat_count,
1017 const VBlob **rslt )
1018 {
1019 rc_t rc,rc_cache=0;
1020 const VColumn *col;
1021 const VBlob *blob;
1022
1023 col = ( const void* ) VectorGet ( & cself -> dad . row, col_idx );
1024 if ( col == NULL )
1025 return RC ( rcVDB, rcCursor, rcReading, rcColumn, rcInvalid );
1026
1027 /* 2.0 behavior if not caching */
1028 if ( cself -> blob_mru_cache == NULL )
1029 return VColumnRead ( col, row_id, elem_bits, base, boff, row_len, (VBlob**) rslt );
1030
1031 /* check MRU blob */
1032 blob = VBlobMRUCacheFind(cself->blob_mru_cache,col_idx,row_id);
1033 if(blob){
1034 assert(row_id >= blob->start_id && row_id <= blob->stop_id);
1035 /* if the caller wants the blob back... */
1036 if ( rslt != NULL )
1037 * rslt = blob;
1038 /* ask column to read from blob */
1039 return VColumnReadCachedBlob ( col, blob, row_id, elem_bits, base, boff, row_len, repeat_count);
1040 }
1041 { /* ask column to produce a blob to be cached */
1042 VBlobMRUCacheCursorContext cctx;
1043 cctx.cache=cself -> blob_mru_cache;
1044 cctx.col_idx = col_idx;
1045 rc = VColumnReadBlob(col,&blob,row_id,elem_bits,base,boff,row_len,repeat_count,&cctx);
1046 }
1047 if ( rc != 0 || blob == NULL ){
1048 if(rslt) *rslt = NULL;
1049 return rc;
1050 }
1051 if(blob->stop_id > blob->start_id + 4)
1052 rc_cache=VBlobMRUCacheSave(cself->blob_mru_cache, col_idx, blob);
1053 if(rslt==NULL){ /** user does not care about the blob ***/
1054 if( rc_cache == 0){
1055 VBlobRelease((VBlob*)blob);
1056 } /** else the memory will leak **/
1057 } else {
1058 *rslt=blob;
1059 }
1060 return 0;
1061 }
1062
1063 /* GetBlob
1064 * retrieve a blob of data containing the current row id
1065 * GetBlobDirect
1066 * retrieve a blob of data containing the requested row id
1067 *
1068 * "blob" [ OUT ] - return parameter for a new reference
1069 * to VBlob containing requested cell. NB - must be released
1070 * via VBlobRelease when no longer needed.
1071 *
1072 * "row_id" [ IN ] - allows ReadDirect random access to any cell
1073 * in column
1074 *
1075 * "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1076 */
VTableCursorGetBlob(const VCURSOR_IMPL * self,const VBlob ** blob,uint32_t col_idx)1077 rc_t VTableCursorGetBlob ( const VCURSOR_IMPL *self,
1078 const VBlob **blob, uint32_t col_idx )
1079 {
1080 rc_t rc;
1081
1082 if ( blob == NULL )
1083 rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1084 else
1085 {
1086 if ( ! self -> read_only )
1087 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1088 else
1089 {
1090 const void *base;
1091 uint32_t elem_bits, boff, row_len;
1092
1093 switch ( self -> dad . state )
1094 {
1095 case vcConstruct:
1096 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1097 break;
1098 case vcReady:
1099 rc = RC ( rcVDB, rcCursor, rcReading, rcRow, rcNotOpen );
1100 break;
1101 case vcRowOpen:
1102 rc = VCursorReadColumnDirectInt( self, self -> dad . row_id, col_idx, &elem_bits, &base, &boff, &row_len, NULL, blob );
1103 if ( rc == 0 )
1104 {
1105 rc = VBlobAddRef ( ( VBlob* ) *blob );
1106 if ( rc == 0 )
1107 return 0;
1108 }
1109 break;
1110 default:
1111 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1112 }
1113 }
1114
1115 * blob = NULL;
1116 }
1117 return rc;
1118 }
1119
VTableCursorGetBlobDirect(const VCURSOR_IMPL * self,const VBlob ** blob,int64_t row_id,uint32_t col_idx)1120 rc_t VTableCursorGetBlobDirect ( const VCURSOR_IMPL *self,
1121 const VBlob **blob, int64_t row_id, uint32_t col_idx )
1122 {
1123 rc_t rc;
1124
1125 if ( blob == NULL )
1126 rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1127 else
1128 {
1129 if ( ! self -> read_only )
1130 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1131 else
1132 {
1133 const void *base;
1134 uint32_t elem_bits, boff, row_len;
1135
1136 switch ( self -> dad . state )
1137 {
1138 case vcConstruct:
1139 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1140 break;
1141 case vcReady:
1142 case vcRowOpen:
1143 rc = VCursorReadColumnDirectInt ( self, row_id, col_idx, &elem_bits, &base, &boff, &row_len, NULL, blob );
1144 if ( rc == 0 )
1145 {
1146 rc = VBlobAddRef ( ( VBlob* ) *blob );
1147 if ( rc == 0 )
1148 return 0;
1149 }
1150 break;
1151 default:
1152 rc = RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1153 }
1154 }
1155
1156 * blob = NULL;
1157 }
1158 return rc;
1159 }
1160
1161 static
VCursorReadColumnDirect(const VTableCursor * self,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1162 rc_t VCursorReadColumnDirect ( const VTableCursor *self, int64_t row_id, uint32_t col_idx,
1163 uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1164 {
1165 bool cache_col_active_save;
1166 if ( ! self -> read_only )
1167 return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1168
1169 switch ( self -> dad . state )
1170 {
1171 case vcConstruct : return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1172 case vcReady :
1173 case vcRowOpen : break;
1174 default : return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1175 }
1176
1177 cache_col_active_save = self->cache_col_active;
1178 ( ( VTableCursor* ) self )->cache_col_active = false;
1179 if ( self->cache_curs != NULL )
1180 {
1181 const VCursor *curs = VectorGet( &self->v_cache_curs, col_idx );
1182 if ( curs != NULL )
1183 {
1184 ( ( VTableCursor* ) self )->cache_col_active = true;
1185 if ( self->cache_empty_start == 0 ||
1186 row_id < self->cache_empty_start ||
1187 row_id > self->cache_empty_end )
1188 {
1189 uint32_t repeat_count;
1190 uint32_t cidx = ( uint32_t )( uint64_t )VectorGet( &self->v_cache_cidx, col_idx );
1191 rc_t rc2 = VCursorReadColumnDirectInt( ( const VTableCursor * ) curs, row_id, cidx, elem_bits, base, boff, row_len, &repeat_count, NULL );
1192 if ( rc2 == 0 )
1193 {
1194 if ( *row_len > 0 )
1195 {
1196 ( ( VTableCursor* )self )->cache_col_active = cache_col_active_save;
1197 return 0;
1198 }
1199 else
1200 {
1201 /*** save window where cache is useless */
1202 ( ( VTableCursor* )self )->cache_empty_start = row_id;
1203 ( ( VTableCursor* )self )->cache_empty_end = row_id + repeat_count - 1;
1204 }
1205 }
1206 }
1207 }
1208 }
1209
1210 {
1211 rc_t rc = VCursorReadColumnDirectInt( self, row_id, col_idx, elem_bits, base, boff, row_len, NULL, NULL );
1212 ( ( VTableCursor* )self )->cache_col_active = cache_col_active_save;
1213 return rc;
1214 }
1215 }
1216
1217
1218 static
VCursorReadColumn(const VTableCursor * self,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1219 rc_t VCursorReadColumn ( const VTableCursor *self, uint32_t col_idx,
1220 uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1221 {
1222 rc_t rc = 0;
1223 int64_t row_id = self -> dad . row_id;
1224 bool cache_col_active_save;
1225 if ( ! self -> read_only )
1226 return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcWriteonly );
1227
1228 switch ( self -> dad . state )
1229 {
1230 case vcConstruct:
1231 return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcNotOpen );
1232 case vcReady:
1233 return RC ( rcVDB, rcCursor, rcReading, rcRow, rcNotOpen );
1234 case vcRowOpen:
1235 break;
1236 default:
1237 return RC ( rcVDB, rcCursor, rcReading, rcCursor, rcInvalid );
1238 }
1239 cache_col_active_save = self->cache_col_active;
1240 ((VTableCursor*)self)->cache_col_active=false;
1241 if(self->cache_curs)
1242 {
1243 const VCursor *curs=VectorGet(&self->v_cache_curs,col_idx);
1244 if(curs)
1245 {
1246 ((VTableCursor*)self)->cache_col_active=true;
1247 if(self->cache_empty_start == 0 || row_id < self->cache_empty_start || row_id > self->cache_empty_end)
1248 {
1249 uint32_t repeat_count;
1250 uint32_t cidx = (uint32_t)(uint64_t)VectorGet(&self->v_cache_cidx,col_idx);
1251 rc_t rc2 = VCursorReadColumnDirectInt( ( const VTableCursor * ) curs,row_id, cidx, elem_bits, base, boff, row_len, &repeat_count,NULL );
1252 if(rc2==0)
1253 {
1254 if(*row_len > 0)
1255 {
1256 ((VTableCursor*)self)->cache_col_active=cache_col_active_save;
1257 return 0;
1258 }
1259 else
1260 {
1261 /*** save window where cache is useless */
1262 ((VTableCursor*)self)->cache_empty_start = row_id;
1263 ((VTableCursor*)self)->cache_empty_end = row_id + repeat_count -1;
1264 }
1265 }
1266 }
1267 }
1268 }
1269
1270 rc=VCursorReadColumnDirectInt ( self, row_id, col_idx, elem_bits, base, boff, row_len, NULL, NULL );
1271 ((VTableCursor*)self)->cache_col_active=cache_col_active_save;
1272 return rc;
1273 }
1274
/* bad_elem_bits
 *  tell whether the element width requested by a caller is unusable
 *  for a column whose actual element width is "elem_size"
 *
 *  "elem_size" [ IN ] - actual element size in bits
 *
 *  "elem_bits" [ IN ] - element size in bits expected by the caller
 *
 *  returns false when the two are equal, or when "elem_size" is a whole
 *  multiple of "elem_bits". returns true otherwise, which includes every
 *  request wider than the actual element, and any case where exactly one
 *  of the two widths is zero ( guarded so no modulo by zero happens ).
 *
 *  NOTE(review): a request wider than the actual element is rejected
 *  even when it is a whole multiple of it - confirm this is intended.
 */
static __inline__
bool bad_elem_bits ( uint32_t elem_size, uint32_t elem_bits )
{
    if ( elem_size == elem_bits )
        return false;

    /* a zero width can never be compatible with a non-zero one */
    if ( elem_size == 0 || elem_bits == 0 )
        return true;

    /* for elem_size < elem_bits the remainder is elem_size itself,
       so wider requests are always reported as bad */
    return ( elem_size % elem_bits ) != 0;
}
1286
VTableCursorRead(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t elem_bits,void * buffer,uint32_t blen,uint32_t * row_len)1287 rc_t VTableCursorRead ( const VCURSOR_IMPL *self, uint32_t col_idx,
1288 uint32_t elem_bits, void *buffer, uint32_t blen, uint32_t *row_len )
1289 {
1290 rc_t rc;
1291
1292 if ( row_len == NULL )
1293 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1294 else
1295 {
1296 if ( elem_bits == 0 || ( elem_bits & 7 ) != 0 )
1297 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1298 else
1299 {
1300 uint32_t elem_size; const void *base; uint32_t boff;
1301 rc = VCursorReadColumn ( self, col_idx, & elem_size,
1302 & base, & boff, row_len );
1303 if ( rc == 0 )
1304 {
1305 if ( bad_elem_bits ( elem_size, elem_bits ) )
1306 rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1307 else if ( * row_len != 0 )
1308 {
1309 if ( blen == 0 )
1310 return RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1311 if ( buffer == NULL )
1312 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1313 else
1314 {
1315 uint64_t to_read = * row_len * elem_size;
1316 uint64_t bsize = blen * elem_bits;
1317
1318 /* always return the required buffer size */
1319 * row_len = ( uint32_t ) ( to_read / elem_bits );
1320
1321 /* detect buffer too small */
1322 if ( to_read > bsize )
1323 {
1324 rc = RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1325 to_read = bsize;
1326 }
1327
1328 /* copy out data up to limit */
1329 assert ( boff == 0 );
1330 memmove ( buffer, base, ( size_t ) ( to_read >> 3 ) );
1331
1332 return rc;
1333 }
1334 }
1335 }
1336 }
1337
1338 * row_len = 0;
1339 }
1340
1341 return rc;
1342 }
1343
VTableCursorReadDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t elem_bits,void * buffer,uint32_t blen,uint32_t * row_len)1344 rc_t VTableCursorReadDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1345 uint32_t elem_bits, void *buffer, uint32_t blen, uint32_t *row_len )
1346 {
1347 rc_t rc;
1348
1349 if ( row_len == NULL )
1350 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1351 else
1352 {
1353 if ( elem_bits == 0 || ( elem_bits & 7 ) != 0 )
1354 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1355 else
1356 {
1357 uint32_t elem_size; const void *base; uint32_t boff;
1358 rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1359 & elem_size, & base, & boff, row_len );
1360 if ( rc == 0 )
1361 {
1362 if ( bad_elem_bits ( elem_size, elem_bits ) )
1363 rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1364 else if ( * row_len != 0 )
1365 {
1366 if ( blen == 0 )
1367 return RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1368 if ( buffer == NULL )
1369 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1370 else
1371 {
1372 uint64_t to_read = * row_len * elem_size;
1373 uint64_t bsize = blen * elem_bits;
1374
1375 /* always return the required buffer size */
1376 * row_len = ( uint32_t ) ( to_read / elem_bits );
1377
1378 /* detect buffer too small */
1379 if ( to_read > bsize )
1380 {
1381 rc = RC ( rcVDB, rcCursor, rcReading, rcBuffer, rcInsufficient );
1382 to_read = bsize;
1383 }
1384
1385 /* copy out data up to limit */
1386 assert ( boff == 0 );
1387 memmove ( buffer, base, ( size_t ) ( to_read >> 3 ) );
1388
1389 return rc;
1390 }
1391 }
1392 }
1393 }
1394
1395 * row_len = 0;
1396 }
1397
1398 return rc;
1399 }
1400
1401
1402 /* ReadBits
1403 * read single row of potentially bit-aligned column data into a buffer
1404 *
1405 * "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1406 *
1407 * "elem_bits" [ IN ] - expected element size in bits, required to be
1408 * compatible with the actual element size, and may ( or may not ) be
1409 * a multiple of 8 ( byte aligned ).
1410 *
1411 * "start" [ IN ] - zero-based starting index to first element,
1412 * valid from 0 .. row_len - 1
1413 *
 *  "buffer" [ OUT ], "off" [ IN ] and "blen" [ IN ] -
 *  return buffer for row data, where "off" is the starting bit
 *  offset into "buffer" in BITS and "blen" is in ELEMENTS.
1417 *
1418 * "num_read" [ OUT ] - return parameter for the number of elements
1419 * read, which is <= "blen"
1420 *
1421 * "remaining" [ OUT, NULL OKAY ] - optional return parameter for
1422 * the number of elements remaining to be read. specifically,
1423 * "start" + "num_read" + "remaining" == row length, assuming that
1424 * "start" <= row length.
1425 */
VTableCursorReadBits(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t elem_bits,uint32_t start,void * buffer,uint32_t off,uint32_t blen,uint32_t * num_read,uint32_t * remaining)1426 rc_t VTableCursorReadBits ( const VCURSOR_IMPL *self, uint32_t col_idx,
1427 uint32_t elem_bits, uint32_t start, void *buffer, uint32_t off,
1428 uint32_t blen, uint32_t *num_read, uint32_t *remaining )
1429 {
1430 rc_t rc;
1431
1432 uint32_t dummy;
1433 if ( remaining == NULL )
1434 remaining = & dummy;
1435
1436 if ( num_read == NULL )
1437 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1438 else
1439 {
1440 if ( elem_bits == 0 )
1441 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1442 else
1443 {
1444 uint32_t elem_size; const void *base; uint32_t boff;
1445 rc = VCursorReadColumn ( self, col_idx, & elem_size,
1446 & base, & boff, num_read );
1447 if ( rc == 0 )
1448 {
1449 if ( bad_elem_bits ( elem_size, elem_bits ) )
1450 rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1451 else if ( * num_read != 0 )
1452 {
1453 uint64_t to_read = * num_read * elem_size;
1454 uint64_t doff = start * elem_bits;
1455 to_read = to_read > doff ? to_read - doff : 0;
1456 if ( blen == 0 )
1457 {
1458 * num_read = 0;
1459 * remaining = ( uint32_t ) ( to_read / elem_bits );
1460 return 0;
1461 }
1462
1463 if ( buffer == NULL )
1464 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1465 else
1466 {
1467 uint64_t bsize = blen * elem_size;
1468 if ( to_read <= bsize )
1469 * remaining = 0;
1470 else
1471 {
1472 * remaining = (uint32_t)( ( to_read - bsize ) / elem_bits );
1473 to_read = bsize;
1474 }
1475 bitcpy ( buffer, off, base, boff + doff, ( bitsz_t ) to_read );
1476 * num_read = ( uint32_t ) ( to_read / elem_bits );
1477 return 0;
1478 }
1479 }
1480 }
1481 }
1482
1483 * num_read = 0;
1484 }
1485
1486 * remaining = 0;
1487
1488 return rc;
1489 }
1490
VTableCursorReadBitsDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t elem_bits,uint32_t start,void * buffer,uint32_t off,uint32_t blen,uint32_t * num_read,uint32_t * remaining)1491 rc_t VTableCursorReadBitsDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1492 uint32_t elem_bits, uint32_t start, void *buffer, uint32_t off,
1493 uint32_t blen, uint32_t *num_read, uint32_t *remaining )
1494 {
1495 rc_t rc;
1496
1497 uint32_t dummy;
1498 if ( remaining == NULL )
1499 remaining = & dummy;
1500
1501 if ( num_read == NULL )
1502 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1503 else
1504 {
1505 if ( elem_bits == 0 )
1506 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcInvalid );
1507 else
1508 {
1509 uint32_t elem_size; const void *base; uint32_t boff;
1510 rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1511 & elem_size, & base, & boff, num_read );
1512 if ( rc == 0 )
1513 {
1514 if ( bad_elem_bits ( elem_size, elem_bits ) )
1515 rc = RC ( rcVDB, rcCursor, rcReading, rcType, rcInconsistent );
1516 else if ( * num_read != 0 )
1517 {
1518 uint64_t to_read = (uint64_t)(* num_read) * elem_size;
1519 uint64_t doff = (uint64_t)start * elem_bits;
1520 to_read = to_read > doff ? to_read - doff : 0;
1521 if ( blen == 0 )
1522 {
1523 * num_read = 0;
1524 * remaining = ( uint32_t ) ( to_read / elem_bits );
1525 return 0;
1526 }
1527
1528 if ( buffer == NULL )
1529 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1530 else
1531 {
1532 uint64_t bsize = (uint64_t)blen * elem_size;
1533 if ( to_read <= bsize )
1534 * remaining = 0;
1535 else
1536 {
1537 * remaining = (uint32_t)( ( to_read - bsize ) / elem_bits );
1538 to_read = bsize;
1539 }
1540 bitcpy ( buffer, off, base, boff + doff, ( bitsz_t ) to_read );
1541 * num_read = ( uint32_t ) ( to_read / elem_bits );
1542 return 0;
1543 }
1544 }
1545 }
1546 }
1547
1548 * num_read = 0;
1549 }
1550
1551 * remaining = 0;
1552
1553 return rc;
1554 }
1555
1556
1557 /* CellData
1558 * access pointer to single cell of potentially bit-aligned column data
1559 *
1560 * "col_idx" [ IN ] - index of column to be read, returned by "AddColumn"
1561 *
1562 * "elem_bits" [ OUT, NULL OKAY ] - optional return parameter for
1563 * element size in bits
1564 *
1565 * "base" [ OUT ] and "boff" [ OUT, NULL OKAY ] -
1566 * compound return parameter for pointer to row starting bit
1567 * where "boff" is in BITS
1568 *
1569 * "row_len" [ OUT, NULL OKAY ] - the number of elements in row
1570 */
VTableCursorCellData(const VCURSOR_IMPL * self,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1571 rc_t VTableCursorCellData ( const VCURSOR_IMPL *self, uint32_t col_idx,
1572 uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1573 {
1574 rc_t rc;
1575
1576 uint32_t dummy [ 3 ];
1577 if ( row_len == NULL )
1578 row_len = & dummy [ 0 ];
1579 if ( boff == NULL )
1580 boff = & dummy [ 1 ];
1581 if ( elem_bits == NULL )
1582 elem_bits = & dummy [ 2 ];
1583
1584 if ( base == NULL )
1585 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1586 else
1587 {
1588 rc = VCursorReadColumn ( self, col_idx,
1589 elem_bits, base, boff, row_len );
1590 if ( rc == 0 )
1591 return 0;
1592
1593 * base = NULL;
1594 }
1595
1596 * elem_bits = 0;
1597 * boff = 0;
1598 * row_len = 0;
1599
1600 return rc;
1601 }
1602
VTableCursorCellDataDirect(const VCURSOR_IMPL * self,int64_t row_id,uint32_t col_idx,uint32_t * elem_bits,const void ** base,uint32_t * boff,uint32_t * row_len)1603 rc_t VTableCursorCellDataDirect ( const VCURSOR_IMPL *self, int64_t row_id, uint32_t col_idx,
1604 uint32_t *elem_bits, const void **base, uint32_t *boff, uint32_t *row_len )
1605 {
1606 rc_t rc;
1607
1608 uint32_t dummy [ 3 ];
1609 if ( row_len == NULL )
1610 row_len = & dummy[ 0 ];
1611 if ( boff == NULL )
1612 boff = & dummy [ 1 ];
1613 if ( elem_bits == NULL )
1614 elem_bits = & dummy [ 2 ];
1615
1616 if ( base == NULL )
1617 rc = RC ( rcVDB, rcCursor, rcReading, rcParam, rcNull );
1618 else
1619 {
1620 rc = VCursorReadColumnDirect ( self, row_id, col_idx,
1621 elem_bits, base, boff, row_len );
1622 if ( rc == 0 )
1623 return 0;
1624
1625 * base = NULL;
1626 }
1627
1628 * elem_bits = 0;
1629 * boff = 0;
1630 * row_len = 0;
1631
1632 return rc;
1633 }
1634
VTableCursorDataPrefetch(const VCURSOR_IMPL * cself,const int64_t * row_ids,uint32_t col_idx,uint32_t num_rows,int64_t min_valid_row_id,int64_t max_valid_row_id,bool continue_on_error)1635 rc_t VTableCursorDataPrefetch ( const VCURSOR_IMPL *cself,
1636 const int64_t *row_ids,
1637 uint32_t col_idx,
1638 uint32_t num_rows,
1639 int64_t min_valid_row_id,
1640 int64_t max_valid_row_id,
1641 bool continue_on_error )
1642 {
1643 rc_t rc=0;
1644 const VColumn *col = ( const void* ) VectorGet ( & cself -> dad . row, col_idx );
1645 if ( col == NULL )
1646 {
1647 return RC ( rcVDB, rcCursor, rcReading, rcColumn, rcInvalid );
1648 }
1649
1650 if ( cself->blob_mru_cache && num_rows > 0 )
1651 {
1652 int64_t *row_ids_sorted = malloc( num_rows * sizeof( *row_ids_sorted ) );
1653 if ( row_ids_sorted != NULL )
1654 {
1655 uint32_t i, num_rows_sorted;
1656 for( i = 0, num_rows_sorted = 0; i < num_rows; i++ )
1657 {
1658 int64_t row_id = row_ids[ i ];
1659 if ( row_id >= min_valid_row_id && row_id <= max_valid_row_id )
1660 {
1661 row_ids_sorted[ num_rows_sorted++ ] = row_id;
1662 }
1663 }
1664 if ( num_rows_sorted > 0 )
1665 {
1666 int64_t last_cached_row_id = INT64_MIN;
1667 bool first_time = true;
1668 ksort_int64_t( row_ids_sorted, num_rows_sorted );
1669 for ( i = 0; rc==0 && i < num_rows_sorted; i++ )
1670 {
1671 int64_t row_id = row_ids_sorted[ i ];
1672 if ( last_cached_row_id < row_id )
1673 {
1674 VBlob * blob = ( VBlob* )VBlobMRUCacheFind( cself->blob_mru_cache, col_idx, row_id );
1675 if ( blob != NULL )
1676 {
1677 last_cached_row_id = blob->stop_id;
1678 }
1679 else
1680 {
1681 /* prefetch it **/
1682 /** ask production for the blob **/
1683 VBlobMRUCacheCursorContext cctx;
1684
1685 cctx.cache = cself -> blob_mru_cache;
1686 cctx.col_idx = col_idx;
1687 rc = VProductionReadBlob ( col->in, & blob, & row_id, 1, &cctx );
1688 if ( rc == 0 )
1689 {
1690 rc_t rc_cache;
1691 /** always cache prefetch requests **/
1692 if ( first_time )
1693 {
1694 VBlobMRUCacheResumeFlush( cself->blob_mru_cache ); /** next call will clean cache if too big **/
1695 rc_cache = VBlobMRUCacheSave( cself->blob_mru_cache, col_idx, blob );
1696 VBlobMRUCacheSuspendFlush( cself->blob_mru_cache ); /** suspending for the rest **/
1697 first_time = false;
1698 }
1699 else
1700 {
1701 rc_cache = VBlobMRUCacheSave( cself->blob_mru_cache, col_idx, blob );
1702 }
1703
1704 if ( rc_cache == 0 )
1705 {
1706 VBlobRelease( blob );
1707 last_cached_row_id = blob->stop_id;
1708 }
1709 }
1710 else if ( continue_on_error )
1711 {
1712 rc = 0; /** reset failed row ***/
1713 last_cached_row_id = row_id; /*** and skip it **/
1714 }
1715 }
1716 }
1717 }
1718 }
1719 free( row_ids_sorted );
1720 }
1721 else
1722 {
1723 rc= RC( rcVDB, rcCursor, rcReading, rcMemory, rcExhausted );
1724 }
1725 }
1726 return rc;
1727 }
1728
1729
1730 /* OpenParent
1731 * duplicate reference to parent table
1732 * NB - returned reference must be released
1733 */
VTableCursorOpenParentRead(const VCURSOR_IMPL * self,const VTable ** tbl)1734 rc_t VTableCursorOpenParentRead ( const VCURSOR_IMPL *self, const VTable **tbl )
1735 {
1736 rc_t rc;
1737
1738 if ( tbl == NULL )
1739 rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1740 else
1741 {
1742 rc = VTableAddRef ( self -> tbl );
1743 if ( rc == 0 )
1744 {
1745 * tbl = self -> tbl;
1746 return 0;
1747 }
1748
1749 * tbl = NULL;
1750 }
1751
1752 return rc;
1753 }
1754
/* insert_overloaded_pb
 *  parameter block handed to "insert_overloaded_scolumns" through the
 *  "data" argument of VectorForEach
 */
struct insert_overloaded_pb
{
    /* cursor the columns are added to */
    VCursor *curs;
    /* forwarded as the last argument of VTableCursorAddSColumn */
    Vector *cx_bind;
};
1760
1761 static
insert_overloaded_scolumns(void * item,void * data)1762 void CC insert_overloaded_scolumns ( void *item, void *data )
1763 {
1764 struct insert_overloaded_pb *pb = data;
1765 const SColumn *scol = ( const void* ) item;
1766
1767 uint32_t ignore;
1768 VTableCursorAddSColumn ( ( VTableCursor * ) pb -> curs, & ignore, scol, NULL, pb -> cx_bind );
1769 }
1770
1771 static
VCursorListCol_walk_through_columns_and_add_to_cursor(VTableCursor * self)1772 void VCursorListCol_walk_through_columns_and_add_to_cursor ( VTableCursor *self )
1773 {
1774 uint32_t idx = VectorStart ( & self -> stbl -> cname );
1775 uint32_t end = VectorLength ( & self -> stbl -> cname );
1776
1777 Vector cx_bind;
1778 struct insert_overloaded_pb pb;
1779 pb . curs = & self -> dad;
1780 pb . cx_bind = & cx_bind;
1781 VectorInit ( & cx_bind, 1, self -> schema -> num_indirect );
1782
1783 for ( end += idx; idx < end; ++idx )
1784 {
1785 /* look at the table column name guy */
1786 const SNameOverload* ol_entry = ( const SNameOverload* ) VectorGet ( & self -> stbl -> cname, idx );
1787 if ( ol_entry != NULL )
1788 VectorForEach ( & ol_entry -> items, false, insert_overloaded_scolumns, & pb );
1789 }
1790
1791 VectorWhack ( & cx_bind, NULL, NULL );
1792 }
1793
1794 static
VCursorListCol_consolidate_and_insert(const VTableCursor * self,BSTree * columns)1795 rc_t VCursorListCol_consolidate_and_insert( const VTableCursor *self, BSTree *columns )
1796 {
1797 rc_t rc = VCursorOpenForListing ( ToConstVCursor ( self ) );
1798 if ( rc == 0 )
1799 {
1800 uint32_t idx = VectorStart ( & self -> dad . row );
1801 uint32_t end = VectorLength ( & self -> dad . row );
1802
1803 for ( end += idx; idx < end; ++idx )
1804 {
1805 const VColumn* vcol = ( const VColumn* ) VectorGet ( & self -> dad . row, idx );
1806 if ( vcol != NULL )
1807 {
1808 VColumnRef *cref;
1809 rc = VColumnRefMake ( & cref, self -> schema, vcol -> scol );
1810 if ( rc != 0 )
1811 break;
1812
1813 rc = BSTreeInsert ( columns, & cref -> n, VColumnRefSort );
1814 assert ( rc == 0 );
1815 }
1816 }
1817 }
1818
1819 return rc;
1820 }
1821
1822
1823 /* ListReadableColumns
1824 * performs an insert of '*' to cursor
1825 * attempts to resolve all read rules
1826 * records all SColumns that successfully resolved
 *  populates BSTree with VColumnRef objects
1828 */
VCursorListReadableColumns(VCURSOR_IMPL * self,BSTree * columns)1829 rc_t VCursorListReadableColumns ( VCURSOR_IMPL *self, BSTree *columns )
1830 {
1831 /* add '*' to cursor */
1832 VCursorListCol_walk_through_columns_and_add_to_cursor ( self );
1833
1834 /* insert all columns into tree */
1835 return VCursorListCol_consolidate_and_insert ( self, columns );
1836 }
1837
VTableCursorLinkedCursorGet(const VCURSOR_IMPL * cself,const char * tbl,VCursor const ** curs)1838 rc_t VTableCursorLinkedCursorGet(const VCURSOR_IMPL * cself,const char *tbl,VCursor const **curs)
1839 {
1840 rc_t rc;
1841
1842 if ( curs == NULL )
1843 rc = RC ( rcVDB, rcCursor, rcAccessing, rcParam, rcNull );
1844 else
1845 {
1846 if ( cself == NULL )
1847 rc = RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1848 else if ( tbl == NULL )
1849 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1850 else if ( tbl [ 0 ] == 0 )
1851 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1852 else
1853 {
1854 LinkedCursorNode *node = (LinkedCursorNode *)
1855 BSTreeFind( & cself -> linked_cursors, tbl, LinkedCursorComp);
1856
1857 if (node == NULL)
1858 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNotFound);
1859 else
1860 {
1861 rc = VCursorAddRef ( node -> curs );
1862 if ( rc == 0 )
1863 {
1864 * curs = node -> curs;
1865 return 0;
1866 }
1867 }
1868 }
1869
1870 * curs = NULL;
1871 }
1872
1873 return rc;
1874 }
1875
VTableCursorLinkedCursorSet(const VCURSOR_IMPL * cself,const char * tbl,VCursor const * curs)1876 rc_t VTableCursorLinkedCursorSet(const VCURSOR_IMPL *cself,const char *tbl,VCursor const *curs)
1877 {
1878 rc_t rc;
1879 VTableCursor *self = (VCURSOR_IMPL *)cself;
1880
1881 if(cself == NULL)
1882 rc = RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1883 else if(tbl == NULL)
1884 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1885 else if(tbl[0] == '\0')
1886 rc = RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1887 else
1888 {
1889 rc = VCursorAddRef ( curs );
1890 if ( rc == 0 )
1891 {
1892 LinkedCursorNode *node = malloc ( sizeof * node );
1893 if (node == NULL)
1894 rc = RC(rcVDB, rcCursor, rcAccessing, rcMemory, rcExhausted);
1895 else
1896 {
1897 strncpy ( node->tbl, tbl, sizeof node->tbl );
1898 node->curs = (VCursor*) curs;
1899 rc = BSTreeInsertUnique( & self -> linked_cursors, (BSTNode *)node, NULL, LinkedCursorNodeComp);
1900 if ( rc == 0 )
1901 {
1902 ( ( VTableCursor * ) curs )->is_sub_cursor = true;
1903 return 0;
1904 }
1905
1906 free ( node );
1907 }
1908
1909 VCursorRelease ( curs );
1910 }
1911 }
1912
1913 return rc;
1914 }
1915
1916
1917 /* private */
VCursorParamsGet(struct VCursorParams const * cself,const char * Name,KDataBuffer ** value)1918 LIB_EXPORT rc_t CC VCursorParamsGet( struct VCursorParams const *cself,
1919 const char *Name, KDataBuffer **value )
1920 {
1921 NamedParamNode *node;
1922 String name;
1923 VTableCursor *self = (VTableCursor *)cself; /*TODO: this looks very wrong. Cover with tests and fix */
1924
1925 if (cself == NULL)
1926 return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1927
1928 if (Name == NULL)
1929 return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1930
1931 if (Name[0] == '\0')
1932 return RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1933
1934 StringInitCString(&name, Name);
1935 node = (NamedParamNode *)BSTreeFind(&self->named_params, &name, NamedParamComp);
1936 if (node == NULL)
1937 return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNotFound);
1938
1939 *value = &node->value;
1940 return 0;
1941 }
1942
1943 /* private */
VCursorParamsLookupOrCreate(struct VCursorParams const * cself,const char * Name,KDataBuffer ** value)1944 static rc_t VCursorParamsLookupOrCreate(struct VCursorParams const *cself,
1945 const char *Name, KDataBuffer **value)
1946 {
1947 NamedParamNode *node;
1948 String name;
1949 VTableCursor *self = (VTableCursor *)cself; /*TODO: this looks very wrong. Cover with tests and fix */
1950 rc_t rc;
1951
1952 StringInitCString(&name, Name);
1953 node = (NamedParamNode *)BSTreeFind(&self->named_params, &name, NamedParamComp);
1954 if (node == NULL) {
1955 node = malloc(sizeof(*node) + StringSize(&name) + 1);
1956 if (node == NULL)
1957 return RC(rcVDB, rcCursor, rcAccessing, rcMemory, rcExhausted);
1958
1959 strcpy((char *)(&node[1]), Name);
1960 StringInit ( & node -> name, (const char *)(&node[1]), name . size, name . len );
1961
1962 memset ( & node -> value, 0, sizeof node -> value );
1963 node -> value . elem_bits = 8;
1964
1965 rc = BSTreeInsertUnique(&self->named_params, (BSTNode *)node, NULL, NamedParamNodeComp);
1966 assert(rc == 0);
1967 }
1968 *value = &node->value;
1969 return 0;
1970 }
1971
VCursorParamsVSet(struct VCursorParams const * cself,const char * Name,const char * fmt,va_list args)1972 LIB_EXPORT rc_t CC VCursorParamsVSet(struct VCursorParams const *cself,
1973 const char *Name, const char *fmt, va_list args )
1974 {
1975 KDataBuffer *value;
1976 rc_t rc;
1977
1978 if (cself == NULL)
1979 return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
1980
1981 if (Name == NULL)
1982 return RC(rcVDB, rcCursor, rcAccessing, rcName, rcNull);
1983
1984 if (Name[0] == '\0')
1985 return RC(rcVDB, rcCursor, rcAccessing, rcName, rcEmpty);
1986
1987 rc = VCursorParamsLookupOrCreate(cself, Name, &value);
1988 if (rc == 0) {
1989 int n;
1990 char dummy[1], * buffer = dummy;
1991 size_t bsize = sizeof dummy;
1992
1993 va_list copy;
1994 va_copy(copy, args);
1995
1996 if ( value -> base != NULL )
1997 {
1998 buffer = value -> base;
1999 bsize = KDataBufferBytes ( value );
2000 }
2001
2002 /* optimistic printf */
2003 n = vsnprintf ( buffer, bsize, fmt, copy );
2004 va_end(copy);
2005
2006 if ( n < 0 || ( size_t ) n >= bsize )
2007 {
2008 rc = KDataBufferResize ( value, ( n < 0 ) ? 4096 : n + 1 );
2009 if (rc == 0)
2010 {
2011 bsize = KDataBufferBytes ( value );
2012 n = vsnprintf(value->base, bsize, fmt, args);
2013 if ( n < 0 || ( size_t ) n >= bsize )
2014 {
2015 rc = RC ( rcVDB, rcCursor, rcUpdating, rcParam, rcInvalid );
2016 KDataBufferWhack ( value );
2017 }
2018 }
2019 }
2020
2021 if ( rc == 0 )
2022 value -> elem_count = n;
2023 }
2024 return rc;
2025 }
2026
VCursorParamsSet(struct VCursorParams const * cself,const char * name,const char * fmt,...)2027 LIB_EXPORT rc_t CC VCursorParamsSet( struct VCursorParams const *cself,
2028 const char *name, const char *fmt, ... )
2029 {
2030 va_list va;
2031 rc_t rc;
2032
2033 va_start(va, fmt);
2034 rc = VCursorParamsVSet(cself, name, fmt, va);
2035 va_end(va);
2036 return rc;
2037 }
2038
VCursorParamsUnset(struct VCursorParams const * cself,const char * Name)2039 LIB_EXPORT rc_t CC VCursorParamsUnset( struct VCursorParams const *cself, const char *Name ) {
2040 KDataBuffer *value;
2041 rc_t rc;
2042
2043 if (cself == NULL)
2044 return RC(rcVDB, rcCursor, rcAccessing, rcSelf, rcNull);
2045
2046 if (Name == NULL)
2047 return RC(rcVDB, rcCursor, rcAccessing, rcParam, rcNull);
2048
2049 if (Name[0] == '\0')
2050 return RC(rcVDB, rcCursor, rcAccessing, rcParam, rcInvalid);
2051
2052 rc = VCursorParamsGet(cself, Name, &value);
2053 if (rc == 0)
2054 KDataBufferWhack(value);
2055
2056 return rc;
2057 }
2058
VTableCursorGetSchema(const VTableCursor * self)2059 const struct VSchema * VTableCursorGetSchema ( const VTableCursor * self )
2060 {
2061 return self -> schema;
2062 }
2063
/* run_pagemap_thread
 *  thread entry point that services the page-map request block "pmpr"
 *  of a table cursor
 *
 *  "t" [ IN ] - thread handle; not referenced in the body
 *
 *  "data" [ IN ] - the VTableCursor whose "pmpr" member is serviced
 *
 *  each pass of the outer loop acquires pmpr.lock and dispatches on
 *  pmpr.state:
 *    NONE / SERIALIZE_DONE / DESERIALIZE_DONE - wait on pmpr.cond and
 *      re-examine the state
 *    DESERIALIZE_REQUESTED / SERIALIZE_REQUESTED - perform the work,
 *      store its result in pmpr.rc, move to the matching *_DONE state,
 *      signal pmpr.cond, unlock and start another pass
 *    EXIT - unlock and return 0
 *    anything else - assert, unlock and return rcParam, rcInvalid
 *
 *  if KLockAcquire fails the loop ends and its return code is returned
 */
static rc_t run_pagemap_thread ( const KThread *t, void *data )
{
    rc_t rc;
    VTableCursor *self = data;
    /* acquire lock */
    MTCURSOR_DBG (( "run_pagemap_thread: acquiring lock\n" ));
    while((rc = KLockAcquire ( self -> pmpr.lock ))==0){
CHECK_AGAIN:
        /* reached with pmpr.lock held, either from the loop head or after a wait */
        switch(self->pmpr.state){
        case ePMPR_STATE_NONE: /* wait for new request */
        case ePMPR_STATE_SERIALIZE_DONE: /* wait for result pickup **/
        case ePMPR_STATE_DESERIALIZE_DONE: /* wait for result pickup **/
            MTCURSOR_DBG (( "run_pagemap_thread: waiting for new request\n" ));
            /* NOTE(review): the result of KConditionWait is stored in rc but
               never examined; the state is simply checked again - confirm
               that a failing wait cannot turn this into a busy loop */
            rc = KConditionWait ( self -> pmpr.cond, self -> pmpr.lock );
            goto CHECK_AGAIN;
        case ePMPR_STATE_EXIT: /** exit requested ***/
            MTCURSOR_DBG (( "run_pagemap_thread: exit by request\n" ));
            KLockUnlock(self -> pmpr.lock);
            return 0;
        case ePMPR_STATE_DESERIALIZE_REQUESTED:
            MTCURSOR_DBG (( "run_pagemap_thread: request to deserialize\n" ));
            /* build pmpr.pm from the bytes in pmpr.data, then fully expand it */
            self->pmpr.rc = PageMapDeserialize(&self->pmpr.pm,self->pmpr.data.base,self->pmpr.data.elem_count,self->pmpr.row_count);
            if(self->pmpr.rc == 0){
                self->pmpr.rc=PageMapExpandFull(self->pmpr.pm);
                /*self->pmpr.rc=PageMapExpand(self->pmpr.pm,self->pmpr.row_count<2048?self->pmpr.row_count-1:2048);*/
                /* NOTE(review): with assertions enabled an expansion failure
                   aborts here; otherwise it is only reported through pmpr.rc */
                assert(self->pmpr.rc == 0);
            }
            /* publish the result: the state changes even when pmpr.rc is non-zero */
            self->pmpr.state = ePMPR_STATE_DESERIALIZE_DONE;
            /*fprintf(stderr,"Pagemap %p Done R:%6d|DR:%d|LR:%d\n",self->pmpr.lock, self->pmpr.pm->row_count,self->pmpr.pm->data_recs,self->pmpr.pm->leng_recs);*/
            KConditionSignal ( self -> pmpr.cond );
            KLockUnlock(self -> pmpr.lock);
            break;
        case ePMPR_STATE_SERIALIZE_REQUESTED:
            MTCURSOR_DBG (( "run_pagemap_thread: request to serialize\n" ));
            /* serialize pmpr.pm into pmpr.data; pmpr.elem_count receives an output value */
            self->pmpr.rc = PageMapSerialize(self->pmpr.pm,&self->pmpr.data,0,&self->pmpr.elem_count);
            self->pmpr.state = ePMPR_STATE_SERIALIZE_DONE;
            KConditionSignal ( self -> pmpr.cond );
            KLockUnlock(self -> pmpr.lock);
            break;
        default:
            /* unknown state: abort when assertions are enabled, otherwise
               drop the lock and end the thread with an error */
            assert(0);
            KLockUnlock(self -> pmpr.lock);
            return RC(rcVDB, rcPagemap, rcConverting, rcParam, rcInvalid );

        }
    }
    /* only reached when KLockAcquire failed */
    MTCURSOR_DBG (( "run_pagemap_thread: exit\n" ));
    return rc;
}
2113
VTableCursorLaunchPagemapThread(VCURSOR_IMPL * curs)2114 rc_t VTableCursorLaunchPagemapThread(VCURSOR_IMPL *curs)
2115 {
2116 rc_t rc = 0;
2117 assert ( curs != NULL );
2118 if(curs->pagemap_thread == NULL)
2119 {
2120 if(--curs->launch_cnt<=0)
2121 {
2122 /* ignoring errors because we operate with or without thread */
2123 curs -> pagemap_thread = NULL; /** if fails - will not use **/
2124
2125 if ( s_disable_pagemap_thread )
2126 return RC ( rcVDB, rcCursor, rcExecuting, rcThread, rcNotAvailable );
2127
2128 rc = KLockMake ( & curs -> pmpr.lock );
2129 if(rc == 0)
2130 {
2131 rc = KConditionMake ( & curs -> pmpr.cond );
2132 if(rc == 0)
2133 {
2134 rc = KThreadMake ( & curs -> pagemap_thread, run_pagemap_thread, curs );
2135 if ( rc == 0 )
2136 return 0;
2137
2138 KConditionRelease ( curs -> pmpr . cond );
2139 curs -> pmpr . cond = NULL;
2140 }
2141
2142 KLockRelease ( curs -> pmpr . lock );
2143 curs -> pmpr . lock = NULL;
2144 }
2145 }
2146 }
2147 return rc;
2148 }
2149
VTableCursorTerminatePagemapThread(VCURSOR_IMPL * self)2150 rc_t VTableCursorTerminatePagemapThread(VCURSOR_IMPL *self)
2151 {
2152 rc_t rc=0;
2153
2154 assert ( self != NULL );
2155
2156 if(self -> pagemap_thread != NULL)
2157 {
2158 rc = KLockAcquire ( self -> pmpr.lock );
2159 if ( rc == 0 )
2160 {
2161 self -> pmpr.state = ePMPR_STATE_EXIT;
2162 KConditionSignal ( self -> pmpr.cond );
2163 KLockUnlock ( self -> pmpr.lock );
2164 }
2165 KThreadWait ( self -> pagemap_thread, NULL );
2166 }
2167
2168 KThreadRelease ( self -> pagemap_thread );
2169 KConditionRelease ( self -> pmpr.cond );
2170 KLockRelease ( self -> pmpr.lock );
2171
2172 self -> pagemap_thread = NULL;
2173 self -> pmpr . cond = NULL;
2174 self -> pmpr . lock = NULL;
2175
2176 return rc;
2177 }
2178
2179 /* DisablePagemapThread
2180 * this can cause difficulties for some clients
2181 */
VDBManagerDisablePagemapThread(struct VDBManager const * self)2182 LIB_EXPORT rc_t CC VDBManagerDisablePagemapThread ( struct VDBManager const *self )
2183 {
2184 if ( self == NULL )
2185 return RC ( rcVDB, rcMgr, rcUpdating, rcSelf, rcNull );
2186 s_disable_pagemap_thread = true;
2187 return 0;
2188 }
2189
VTableCursorSetCacheCapacity(VCURSOR_IMPL * self,uint64_t capacity)2190 uint64_t CC VTableCursorSetCacheCapacity(VCURSOR_IMPL *self,uint64_t capacity)
2191 {
2192 if(self) return VBlobMRUCacheSetCapacity(self->blob_mru_cache,capacity);
2193 return 0;
2194 }
VTableCursorGetCacheCapacity(const VCURSOR_IMPL * self)2195 uint64_t CC VTableCursorGetCacheCapacity(const VCURSOR_IMPL *self)
2196 {
2197 if(self) return VBlobMRUCacheGetCapacity(self->blob_mru_cache);
2198 return 0;
2199 }
2200
VTableCursorPageMapProcessRequest(const struct VCURSOR_IMPL * self)2201 const PageMapProcessRequest* VTableCursorPageMapProcessRequest(const struct VCURSOR_IMPL *self)
2202 {
2203 assert ( self != NULL );
2204 return & self -> pmpr;
2205 }
2206
VTableCursorGetTable(const struct VCURSOR_IMPL * self)2207 const struct VTable * VTableCursorGetTable ( const struct VCURSOR_IMPL * self )
2208 {
2209 assert ( self != NULL );
2210 return self -> tbl;
2211 }
2212
VTableCursorCacheActive(const struct VCURSOR_IMPL * self,int64_t * cache_empty_end)2213 bool VTableCursorCacheActive ( const struct VCURSOR_IMPL * self, int64_t * cache_empty_end )
2214 {
2215 assert ( self != NULL );
2216 assert ( cache_empty_end != NULL );
2217 if ( self -> cache_curs && self -> cache_col_active )
2218 {
2219 * cache_empty_end = self -> cache_empty_end;
2220 return true;
2221 }
2222 * cache_empty_end = 0;
2223 return false;
2224 }
2225
VTableCursorIsReadOnly(const struct VCURSOR_IMPL * self)2226 bool VTableCursorIsReadOnly ( const struct VCURSOR_IMPL * self )
2227 {
2228 return self -> read_only;
2229 }
2230
VTableCursorGetBlobMruCache(struct VCURSOR_IMPL * self)2231 VBlobMRUCache * VTableCursorGetBlobMruCache ( struct VCURSOR_IMPL * self )
2232 {
2233 assert ( self != NULL );
2234 return self -> blob_mru_cache;
2235 }
2236
VTableCursorIncrementPhysicalProductionCount(struct VCURSOR_IMPL * self)2237 uint32_t VTableCursorIncrementPhysicalProductionCount ( struct VCURSOR_IMPL * self )
2238 {
2239 assert ( self != NULL );
2240 return ++ self -> phys_cnt;
2241 }
2242
VTableCursorFindOverride(const VCURSOR_IMPL * self,const struct VCtxId * cid)2243 const struct KSymbol * VTableCursorFindOverride ( const VCURSOR_IMPL * self, const struct VCtxId * cid )
2244 {
2245 return STableFindOverride ( self -> stbl, cid );
2246 }
2247
/* VTableCursorGetBoundTable
 *  this implementation never resolves a bound table
 *
 *  "self" [ IN ] and "name" [ IN ] - both ignored
 *
 *  always returns NULL
 */
const struct VTable * VTableCursorGetBoundTable( const struct VCURSOR_IMPL * self, const struct String * name )
{
    const struct VTable * bound = NULL;

    return bound;
}
2252
2253