1 //-< BTREE.CPP >-----------------------------------------------------*--------*
2 // GigaBASE                  Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Post Relational Database Management System)                      *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:      1-Jan-99    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 25-Oct-99    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // B-Tree implementation
9 //-------------------------------------------------------------------*--------*
10 
11 #define INSIDE_GIGABASE
12 
13 #include "gigabase.h"
14 #include "btree.h"
15 
16 
17 BEGIN_GIGABASE_NAMESPACE
18 
// SCMP compares two string keys. It expands to STRCMP unless
// USE_LOCALE_SETTINGS is defined, in which case it expands to STRCOLL.
#ifndef USE_LOCALE_SETTINGS
#define SCMP(lhs, rhs) STRCMP(lhs, rhs)
#else
#define SCMP(lhs, rhs) STRCOLL(lhs, rhs)
#endif
24 
25 
26 static const int keySize[] = {
27     sizeof(bool),  // tpBool
28     sizeof(int1),  // tpInt1
29     sizeof(int2),  // tpInt2
30     sizeof(int4),  // tpInt4
31     sizeof(db_int8),  // tpInt8
32     sizeof(real4), // tpReal4
33     sizeof(real8), // tpReal8
34     8,             // tpString
35     sizeof(oid_t), // tpReference
36     0,             // tpArray,
37     0,             // tpMethodBool,
38     0,             // tpMethodInt1,
39     0,             // tpMethodInt2,
40     0,             // tpMethodInt4,
41     0,             // tpMethodInt8,
42     0,             // tpMethodReal4,
43     0,             // tpMethodReal8,
44     0,             // tpMethodString,
45     0,             // tpMethodReference,
46     0,             // tpStructure,
47     0,             // tpRawBinary,
48     0,             // tpStdString,
49     0,             // tpMfcString,
50     0,             // tpRectangle,
51     0              // tpUnknown
52 };
53 
find(dbDatabase * db,oid_t treeId,dbSearchContext & sc,dbUDTComparator comparator)54 void dbBtree::find(dbDatabase* db, oid_t treeId, dbSearchContext& sc, dbUDTComparator comparator)
55 {
56     dbGetTie tie;
57     dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
58     oid_t rootId = tree->root;
59     int   height = tree->height;
60     char_t  firstKeyBuf[dbBtreePage::dbMaxKeyLen];
61     char_t  lastKeyBuf[dbBtreePage::dbMaxKeyLen];
62     if (tree->type == dbField::tpString) {
63         bool tmpKeys = sc.tmpKeys;
64         if (sc.firstKey != NULL) {
65             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
66                 strlower(firstKeyBuf, sc.firstKey);
67                 if (tmpKeys) {
68                     delete[] sc.firstKey;
69                     sc.tmpKeys = false;
70                 }
71                 sc.firstKey = firstKeyBuf;
72             }
73         }
74         if (sc.lastKey != NULL) {
75             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
76                 strlower(lastKeyBuf, sc.lastKey);
77                 if (tmpKeys) {
78                     delete[] sc.lastKey;
79                     sc.tmpKeys = false;
80                 }
81                 sc.lastKey = lastKeyBuf;
82             }
83         }
84 
85     }
86     if (rootId != 0) {
87         byte* page = db->get(rootId);
88         if (tree->flags & FLAGS_THICK) {
89             ((dbThickBtreePage*)page)->find(db, sc, tree->type, tree->sizeofType, comparator, height);
90         } else {
91             ((dbBtreePage*)page)->find(db, sc, tree->type, tree->sizeofType, comparator, height);
92         }
93         db->pool.unfix(page);
94     }
95 }
96 
allocate(dbDatabase * db,int type,int sizeofType,int flags)97 oid_t dbBtree::allocate(dbDatabase* db, int type, int sizeofType, int flags)
98 {
99     oid_t oid = db->allocateId();
100     offs_t pos = db->allocate(sizeof(dbBtree));
101     db->setPos(oid, pos | dbModifiedFlag);
102     dbPutTie tie;
103     tie.set(db->pool, oid, pos, sizeof(dbBtree));
104     dbBtree* tree = (dbBtree*)tie.get();
105     tree->size = sizeof(dbBtree);
106     tree->root = 0;
107     tree->height = 0;
108     tree->type = type;
109     tree->sizeofType = sizeofType;
110     tree->flags = (int1)flags;
111     return oid;
112 }
113 
insert(dbDatabase * db,oid_t treeId,oid_t recordId,int offs,dbUDTComparator comparator)114 bool dbBtree::insert(dbDatabase* db, oid_t treeId, oid_t recordId, int offs, dbUDTComparator comparator)
115 {
116     dbGetTie tie;
117     byte* p = (byte*)db->getRow(tie, recordId);
118     return insert(db, treeId, recordId, p, offs, comparator);
119 }
120 
insert(dbDatabase * db,oid_t treeId,oid_t recordId,byte * p,int offs,dbUDTComparator comparator)121 bool dbBtree::insert(dbDatabase* db, oid_t treeId, oid_t recordId, byte* p, int offs, dbUDTComparator comparator)
122 {
123     dbGetTie treeTie;
124     dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
125     oid_t rootId = tree->root;
126     int   height = tree->height;
127 
128     if (tree->flags & FLAGS_THICK) {
129         dbThickBtreePage::item ins;
130         if (tree->type == dbField::tpString) {
131             ins.keyLen = ((dbVarying*)(p + offs))->size;
132             assert(ins.keyLen <= dbThickBtreePage::dbMaxKeyLen);
133             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
134                 strlower(ins.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
135             } else {
136                 memcpy(ins.keyChar, p + ((dbVarying*)(p + offs))->offs, ins.keyLen*sizeof(char_t));
137             }
138         } else if (tree->type == dbField::tpRawBinary) {
139             memcpy(ins.keyChar, p + offs, tree->sizeofType);
140         } else {
141             memcpy(ins.keyChar, p + offs, keySize[tree->type]);
142         }
143         ins.oid = ins.recId = recordId;
144 
145         if (rootId == 0) {
146             dbPutTie tie;
147             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
148             t->root = dbThickBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
149             t->height = 1;
150         } else {
151             int result;
152             result = dbThickBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height);
153             assert(result != not_found);
154             if (result == overflow) {
155                 dbPutTie tie;
156                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
157                 t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
158                 t->height += 1;
159             }
160         }
161     } else { // normal Btree
162         bool unique = (tree->flags & FLAGS_UNIQUE) != 0;
163         dbBtreePage::item ins;
164         if (tree->type == dbField::tpString) {
165             ins.keyLen = ((dbVarying*)(p + offs))->size;
166             assert(ins.keyLen <= dbBtreePage::dbMaxKeyLen);
167             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
168                 strlower(ins.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
169             } else {
170                 memcpy(ins.keyChar, p + ((dbVarying*)(p + offs))->offs, ins.keyLen*sizeof(char_t));
171             }
172         } else if (tree->type == dbField::tpRawBinary) {
173             memcpy(ins.keyChar, p + offs, tree->sizeofType);
174         } else {
175             memcpy(ins.keyChar, p + offs, keySize[tree->type]);
176         }
177         ins.oid = recordId;
178 
179         if (rootId == 0) {
180             dbPutTie tie;
181             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
182             t->root = dbBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
183             t->height = 1;
184         } else {
185             int result;
186             result = dbBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height, unique);
187             assert(result != not_found);
188             if (result == overflow) {
189                 dbPutTie tie;
190                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
191                 t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
192                 t->height += 1;
193             } else if (result == not_unique) {
194                 return false;
195             }
196         }
197     }
198     return true;
199 }
200 
201 
insert(dbDatabase * db,oid_t treeId,dbBtreePage::item & ins,dbUDTComparator comparator)202 bool dbBtree::insert(dbDatabase* db, oid_t treeId, dbBtreePage::item& ins, dbUDTComparator comparator)
203 {
204     dbGetTie treeTie;
205     dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
206     oid_t rootId = tree->root;
207     int   height = tree->height;
208 
209     if (tree->flags & FLAGS_THICK) {
210         dbThickBtreePage::item thickIns;
211         thickIns.oid = thickIns.recId = ins.oid;
212         thickIns.keyLen = ins.keyLen;
213         if (tree->type == dbField::tpString) {
214             memcpy(thickIns.keyChar, ins.keyChar, ins.keyLen*sizeof(char_t));
215             assert(thickIns.keyLen <= dbThickBtreePage::dbMaxKeyLen);
216             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
217                 strlower(thickIns.keyChar, thickIns.keyChar);
218             }
219         } else {
220             thickIns.keyInt8 = ins.keyInt8;
221         }
222 
223         if (rootId == 0) {
224             dbPutTie tie;
225             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
226             t->root = dbThickBtreePage::allocate(db, 0, tree->type, tree->sizeofType, thickIns);
227             t->height = 1;
228         } else {
229             int result;
230             result = dbThickBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, thickIns, height);
231             assert(result != not_found);
232             if (result == overflow) {
233                 dbPutTie tie;
234                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
235                 t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, thickIns);
236                 t->height += 1;
237             }
238         }
239     } else { // normal Btree
240         bool unique = (tree->flags & FLAGS_UNIQUE) != 0;
241         if (tree->type == dbField::tpString) {
242             assert(ins.keyLen <= dbBtreePage::dbMaxKeyLen);
243             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
244                 strlower(ins.keyChar, ins.keyChar);
245             }
246         }
247 
248         if (rootId == 0) {
249             dbPutTie tie;
250             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
251             t->root = dbBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
252             t->height = 1;
253         } else {
254             int result;
255             result = dbBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height, unique);
256             assert(result != not_found);
257             if (result == overflow) {
258                 dbPutTie tie;
259                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
260                 t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
261                 t->height += 1;
262             } else if (result == not_unique) {
263                 return false;
264             }
265         }
266     }
267     return true;
268 }
269 
270 
remove(dbDatabase * db,oid_t treeId,oid_t recordId,int offs,dbUDTComparator comparator)271 void dbBtree::remove(dbDatabase* db, oid_t treeId, oid_t recordId, int offs, dbUDTComparator comparator)
272 {
273     dbGetTie tie;
274     byte* p = (byte*)db->getRow(tie, recordId);
275     remove(db, treeId, recordId, p, offs, comparator);
276 }
277 
remove(dbDatabase * db,oid_t treeId,oid_t recordId,byte * p,int offs,dbUDTComparator comparator)278 void dbBtree::remove(dbDatabase* db, oid_t treeId, oid_t recordId, byte* p, int offs, dbUDTComparator comparator)
279 {
280     dbGetTie treeTie;
281     dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
282     oid_t rootId = tree->root;
283     int   height = tree->height;
284 
285     if (rootId == 0) {
286         return;
287     }
288     if (tree->flags & FLAGS_THICK) {
289         dbThickBtreePage::item rem;
290         if (tree->type == dbField::tpString) {
291             rem.keyLen = ((dbVarying*)(p + offs))->size;
292             assert(rem.keyLen <= dbThickBtreePage::dbMaxKeyLen);
293             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
294                 strlower(rem.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
295             } else {
296                 memcpy(rem.keyChar, p + ((dbVarying*)(p + offs))->offs, rem.keyLen*sizeof(char_t));
297             }
298         } else if (tree->type == dbField::tpRawBinary) {
299             memcpy(rem.keyChar, p + offs, tree->sizeofType);
300         } else {
301             memcpy(rem.keyChar, p + offs, keySize[tree->type]);
302         }
303         rem.oid = rem.recId = recordId;
304         int result = dbThickBtreePage::remove(db, rootId, tree->type, tree->sizeofType, comparator, rem, height);
305         assert(result != not_found);
306         if (result == underflow) {
307             dbThickBtreePage* page = (dbThickBtreePage*)db->get(rootId);
308             if (page->nItems == 0) {
309                 dbPutTie tie;
310                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
311                 if (height != 1) {
312                     if (tree->type == dbField::tpString) {
313                         t->root = page->keyStr[0].oid;
314                     } else {
315                         t->root = page->ref[dbThickBtreePage::maxItems-1].oid;
316                     }
317                 } else {
318                     t->root = 0;
319                 }
320                 t->height -= 1;
321                 db->freePage(rootId);
322             }
323             db->pool.unfix(page);
324         } else if (result == dbBtree::overflow) {
325             dbPutTie tie;
326             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
327             t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, rem);
328             t->height += 1;
329         }
330     } else { // normal B-Tree
331         dbBtreePage::item rem;
332         if (tree->type == dbField::tpString) {
333             rem.keyLen = ((dbVarying*)(p + offs))->size;
334             assert(rem.keyLen <= dbBtreePage::dbMaxKeyLen);
335             if (tree->flags & FLAGS_CASE_INSENSITIVE) {
336                 strlower(rem.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
337             } else {
338                 memcpy(rem.keyChar, p + ((dbVarying*)(p + offs))->offs, rem.keyLen*sizeof(char_t));
339             }
340         } else if (tree->type == dbField::tpRawBinary) {
341             memcpy(rem.keyChar,  p + offs, tree->sizeofType);
342         } else {
343             memcpy(rem.keyChar, p + offs, keySize[tree->type]);
344         }
345         rem.oid = recordId;
346         int result = dbBtreePage::remove(db, rootId, tree->type, tree->sizeofType, comparator, rem, height);
347         assert(result != not_found);
348         if (result == underflow && height != 1) {
349             dbBtreePage* page = (dbBtreePage*)db->get(rootId);
350             if (page->nItems == 0) {
351                 dbPutTie tie;
352                 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
353                 if (tree->type == dbField::tpString) {
354                     t->root = page->keyStr[0].oid;
355                 } else {
356                     t->root = page->record[dbBtreePage::maxItems-1];
357                 }
358                 t->height -= 1;
359                 db->freePage(rootId);
360             }
361             db->pool.unfix(page);
362         } else if (result == dbBtree::overflow) {
363             dbPutTie tie;
364             dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
365             t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, rem);
366             t->height += 1;
367         }
368     }
369 }
370 
purge(dbDatabase * db,oid_t treeId)371 void dbBtree::purge(dbDatabase* db, oid_t treeId)
372 {
373     dbPutTie tie;
374     dbBtree* tree = (dbBtree*)db->putRow(tie, treeId);
375     if (tree->root != 0) {
376         if (tree->flags & FLAGS_THICK) {
377             dbThickBtreePage::purge(db, tree->root, tree->type, tree->height);
378         } else {
379             dbBtreePage::purge(db, tree->root, tree->type, tree->height);
380         }
381         tree->root = 0;
382         tree->height = 0;
383     }
384 }
385 
drop(dbDatabase * db,oid_t treeId)386 void dbBtree::drop(dbDatabase* db, oid_t treeId)
387 {
388     purge(db, treeId);
389     db->free(db->getPos(treeId) & ~dbFlagsMask, sizeof(dbBtree));
390     db->freeId(treeId);
391 }
392 
traverseForward(dbDatabase * db,oid_t treeId,dbAnyCursor * cursor,dbExprNode * condition)393 void dbBtree::traverseForward(dbDatabase* db, oid_t treeId,
394                               dbAnyCursor* cursor, dbExprNode* condition)
395 {
396     dbGetTie tie;
397     dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
398     if (tree->root != 0) {
399         byte* page = db->get(tree->root);
400         if (tree->flags & FLAGS_THICK) {
401             ((dbThickBtreePage*)page)->traverseForward(db, cursor, condition, tree->type, tree->height);
402         } else {
403             ((dbBtreePage*)page)->traverseForward(db, cursor, condition, tree->type, tree->height);
404         }
405         db->pool.unfix(page);
406     }
407 }
408 
409 
traverseBackward(dbDatabase * db,oid_t treeId,dbAnyCursor * cursor,dbExprNode * condition)410 void dbBtree::traverseBackward(dbDatabase* db, oid_t treeId,
411                                dbAnyCursor* cursor, dbExprNode* condition)
412 {
413     dbGetTie tie;
414     dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
415     if (tree->root != 0) {
416         byte* page = db->get(tree->root);
417         if (tree->flags & FLAGS_THICK) {
418             ((dbThickBtreePage*)page)->traverseBackward(db, cursor, condition, tree->type, tree->height);
419         } else {
420             ((dbBtreePage*)page)->traverseBackward(db, cursor, condition, tree->type, tree->height);
421         }
422         db->pool.unfix(page);
423     }
424 }
425 
// FIND(KEY, TYPE): ascending range scan of one page for a fixed-size key type.
// KEY names the page's key array and TYPE is the key's C++ type. The macro is
// expanded as a case body in the type switch of dbBtreePage::find and relies
// on that function's locals (l, r, n, height, table, firstKeyInclusion,
// lastKeyInclusion) and on the CHECK, EXTRACT, CAST and LESS_OR_EQUAL helper
// macros in effect at the point of expansion.
//   1. With a lower bound (sc.firstKey), a binary search narrows l and r to
//      the first slot that satisfies it.
//   2. With an upper bound (sc.lastKey), an inner page descends into its
//      children and a leaf (height == 0) adds the records to the cursor,
//      filtered by sc.condition when one is set. Every path through this part
//      returns; false is returned once a key beyond the upper bound is seen or
//      the cursor's add() returns false.
//   3. Without an upper bound the trailing `break` leaves the switch, so the
//      invocation has to be followed by ';'.
// CHECK may evaluate its arguments more than once, so they must stay free of
// side effects.
#define FIND(KEY, TYPE)                                                           \
        if (sc.firstKey != NULL) {                                                \
            TYPE lowKey = CAST(TYPE, sc.firstKey);                                \
            while (l < r)  {                                                      \
                 int mid = (l+r) >> 1;                                            \
                 if (!CHECK(lowKey, EXTRACT(KEY,mid), firstKeyInclusion)) {       \
                     r = mid;                                                     \
                 } else {                                                         \
                     l = mid+1;                                                   \
                 }                                                                \
            }                                                                     \
            assert(r == l);                                                       \
        }                                                                         \
        if (sc.lastKey != NULL) {                                                 \
            TYPE highKey = CAST(TYPE, sc.lastKey);                                \
            if (height != 0) {                                                    \
                do {                                                              \
                    dbBtreePage* child = (dbBtreePage*)db->get(record[maxItems-1-l]); \
                    bool proceed = child->find(db, sc, type, sizeofType, comparator, height); \
                    db->pool.unfix(child);                                        \
                    if (!proceed) {                                               \
                        return false;                                             \
                    }                                                             \
                    if (l == n) {                                                 \
                        return true;                                              \
                    }                                                             \
                } while (LESS_OR_EQUAL(EXTRACT(KEY,l++), highKey));               \
                return false;                                                     \
            }                                                                     \
            for (; l < n; l += 1) {                                               \
                if (CHECK(EXTRACT(KEY,l), highKey, lastKeyInclusion)) {           \
                    return false;                                                 \
                }                                                                 \
                if (sc.condition                                                  \
                    && !db->evaluateBoolean(sc.condition, record[maxItems-1-l],   \
                                            table, sc.cursor))                    \
                {                                                                 \
                    continue;                                                     \
                }                                                                 \
                if (!sc.cursor->add(record[maxItems-1-l])) {                      \
                    return false;                                                 \
                }                                                                 \
            }                                                                     \
            return true;                                                          \
        }                                                                         \
        break
473 
// FIND_REVERSE(KEY, TYPE): descending counterpart of FIND for a fixed-size key
// type. KEY names the page's key array and TYPE is the key's C++ type. Like
// FIND it relies on the locals l, r, height, table, firstKeyInclusion and
// lastKeyInclusion and on the CHECK, EXTRACT and CAST helper macros in effect
// where it is expanded (its use site lies outside this part of the file;
// presumably the descending branch of dbBtreePage::find - to be confirmed).
//   1. With an upper bound (sc.lastKey), a binary search narrows l and r to
//      the position just past the last slot that satisfies it.
//   2. With a lower bound (sc.firstKey), the page is walked from r downwards:
//      an inner page descends into its children, a leaf (height == 0) adds
//      the records to the cursor, filtered by sc.condition when one is set.
//      Every path through this part returns.
//   3. Without a lower bound the trailing `break` leaves the enclosing switch,
//      so the invocation has to be followed by ';'.
// CHECK may evaluate its arguments more than once, so they must stay free of
// side effects.
#define FIND_REVERSE(KEY, TYPE)                                                   \
        if (sc.lastKey != NULL) {                                                 \
            TYPE highKey = CAST(TYPE, sc.lastKey);                                \
            while (l < r)  {                                                      \
                 int mid = (l+r) >> 1;                                            \
                 if (!CHECK(highKey, EXTRACT(KEY,mid), !lastKeyInclusion)) {      \
                     r = mid;                                                     \
                 } else {                                                         \
                     l = mid+1;                                                   \
                 }                                                                \
            }                                                                     \
            assert(r == l);                                                       \
        }                                                                         \
        if (sc.firstKey != NULL) {                                                \
            TYPE lowKey = CAST(TYPE, sc.firstKey);                                \
            if (height != 0) {                                                    \
                do {                                                              \
                    dbBtreePage* child = (dbBtreePage*)db->get(record[maxItems-1-r]); \
                    bool proceed = child->find(db, sc, type, sizeofType, comparator, height); \
                    db->pool.unfix(child);                                        \
                    if (!proceed) {                                               \
                        return false;                                             \
                    }                                                             \
                } while (--r >= 0 && !CHECK(lowKey, EXTRACT(KEY,r), firstKeyInclusion)); \
                return r < 0;                                                     \
            }                                                                     \
            while (--r >= 0) {                                                    \
                if (CHECK(lowKey, EXTRACT(KEY,r), firstKeyInclusion)) {           \
                    return false;                                                 \
                }                                                                 \
                if (sc.condition                                                  \
                    && !db->evaluateBoolean(sc.condition, record[maxItems-1-r],   \
                                            table, sc.cursor))                    \
                {                                                                 \
                    continue;                                                     \
                }                                                                 \
                if (!sc.cursor->add(record[maxItems-1-r])) {                      \
                    return false;                                                 \
                }                                                                 \
            }                                                                     \
            return true;                                                          \
        }                                                                         \
        break
517 
find(dbDatabase * db,dbSearchContext & sc,int type,int sizeofType,dbUDTComparator comparator,int height)518 bool dbBtreePage::find(dbDatabase* db, dbSearchContext& sc, int type, int sizeofType,
519                        dbUDTComparator comparator, int height)
520 {
521     int l = 0, n = nItems, r = n;
522     int diff;
523     dbTableDescriptor* table = sc.cursor->table;
524     sc.probes += 1;
525     height -= 1;
526     int firstKeyInclusion = sc.firstKeyInclusion;
527     int lastKeyInclusion = sc.lastKeyInclusion;
528 
529     if (sc.ascent) {
530 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
531 #define EXTRACT(array, index) array[index]
532 #define CAST(TYPE, expr) *(TYPE*)(expr)
533 #define LESS_OR_EQUAL(a, b) (a <= b)
534         switch (type) {
535           case dbField::tpBool:
536           case dbField::tpInt1:
537             FIND(keyInt1, int1);
538           case dbField::tpInt2:
539             FIND(keyInt2, int2);
540           case dbField::tpInt4:
541             FIND(keyInt4, int4);
542           case dbField::tpInt8:
543             FIND(keyInt8, db_int8);
544           case dbField::tpReference:
545             FIND(keyOid, oid_t);
546           case dbField::tpReal4:
547             FIND(keyReal4, real4);
548           case dbField::tpReal8:
549             FIND(keyReal8, real8);
550           case dbField::tpRawBinary:
551 #undef CHECK
552 #undef EXTRACT
553 #undef CAST
554 #undef LESS_OR_EQUAL
555 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
556 #define EXTRACT(array, index) (array + (index)*sizeofType)
557 #define CAST(TYPE, expr) (TYPE)(expr)
558 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
559             FIND(keyChar, void*);
560 
561           case dbField::tpString:
562             if (sc.firstKey != NULL) {
563                 char_t* firstKey = sc.firstKey;
564                 while (l < r)  {
565                     int i = (l+r) >> 1;
566                     if (SCMP(firstKey, (char_t*)&keyChar[keyStr[i].offs])
567                         >= sc.firstKeyInclusion)
568                     {
569                         l = i + 1;
570                     } else {
571                         r = i;
572                     }
573                 }
574                 assert(r == l);
575             }
576             if (sc.lastKey != NULL) {
577                 char_t* lastKey = sc.lastKey;
578                 if (height == 0) {
579                     while (l < n) {
580                         if (SCMP((char_t*)&keyChar[keyStr[l].offs],
581                                    lastKey) >= sc.lastKeyInclusion)
582                         {
583                             return false;
584                         }
585                         if (!sc.condition
586                             || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
587                         {
588                             if (!sc.cursor->add(keyStr[l].oid)) {
589                                 return false;
590                             }
591                         }
592                         l += 1;
593                     }
594                 } else {
595                     do {
596                         dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
597                         if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
598                             db->pool.unfix(pg);
599                             return false;
600                         }
601                         db->pool.unfix(pg);
602                         if (l == n) {
603                             return true;
604                         }
605                     } while (SCMP((char_t*)&keyChar[keyStr[l++].offs], lastKey) <= 0);
606                     return false;
607                 }
608             } else if (sc.prefixLength != 0) {
609                 char_t*  prefix = sc.firstKey;
610                 if (height == 0) {
611                     while (l < n) {
612                         if (memcmp(&keyChar[keyStr[l].offs], prefix, sc.prefixLength*sizeof(char_t)) != 0) {
613                             return false;
614                         }
615                         if (!sc.condition
616                             || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
617                         {
618                             if (!sc.cursor->add(keyStr[l].oid)) {
619                                 return false;
620                             }
621                         }
622                         l += 1;
623                     }
624                 } else {
625                     do {
626                         dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
627                         if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
628                             db->pool.unfix(pg);
629                             return false;
630                         }
631                         db->pool.unfix(pg);
632                         if (l == n) {
633                             return true;
634                         }
635                     } while (memcmp(&keyChar[keyStr[l++].offs], prefix, sc.prefixLength*sizeof(char_t)) == 0);
636                     return false;
637                 }
638             } else {
639                 if (height == 0) {
640                     if (sc.condition) {
641                         while (l < n) {
642                             if (db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor)) {
643                                 if (!sc.cursor->add(keyStr[l].oid)) {
644                                     return false;
645                                 }
646                             }
647                             l += 1;
648                         }
649                     } else {
650                         while (l < n) {
651                             if (!sc.cursor->add(keyStr[l].oid)) {
652                                 return false;
653                             }
654                             l += 1;
655                         }
656                     }
657                 } else {
658                     do {
659                         dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
660                         if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
661                             db->pool.unfix(pg);
662                             return false;
663                         }
664                         db->pool.unfix(pg);
665                     } while (++l <= n);
666                 }
667             }
668             return true;
669           default:
670             assert(false);
671         }
672         if (height == 0) {
673             if (sc.condition) {
674                 while (l < n) {
675                     if (db->evaluateBoolean(sc.condition, record[maxItems-1-l], table, sc.cursor)) {
676                         if (!sc.cursor->add(record[maxItems-1-l])) {
677                             return false;
678                         }
679                     }
680                     l += 1;
681                 }
682             } else {
683                 while (l < n) {
684                     if (!sc.cursor->add(record[maxItems-1-l])) {
685                         return false;
686                     }
687                     l += 1;
688                 }
689             }
690         } else {
691             do {
692                 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-l]);
693                 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
694                     db->pool.unfix(pg);
695                     return false;
696                 }
697                 db->pool.unfix(pg);
698             } while (++l <= n);
699         }
700     } else { // descent ordering
701 #undef CHECK
702 #undef EXTRACT
703 #undef CAST
704 #undef LESS_OR_EQUAL
705 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
706 #define EXTRACT(array, index) array[index]
707 #define CAST(TYPE, expr) *(TYPE*)(expr)
708 #define LESS_OR_EQUAL(a, b) (a <= b)
709         switch (type) {
710           case dbField::tpBool:
711           case dbField::tpInt1:
712             FIND_REVERSE(keyInt1, int1);
713           case dbField::tpInt2:
714             FIND_REVERSE(keyInt2, int2);
715           case dbField::tpInt4:
716             FIND_REVERSE(keyInt4, int4);
717           case dbField::tpInt8:
718             FIND_REVERSE(keyInt8, db_int8);
719           case dbField::tpReference:
720             FIND_REVERSE(keyOid, oid_t);
721           case dbField::tpReal4:
722             FIND_REVERSE(keyReal4, real4);
723           case dbField::tpReal8:
724             FIND_REVERSE(keyReal8, real8);
725 
726           case dbField::tpRawBinary:
727 #undef CHECK
728 #undef EXTRACT
729 #undef CAST
730 #undef LESS_OR_EQUAL
731 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
732 #define EXTRACT(array, index) (array + (index)*sizeofType)
733 #define CAST(TYPE, expr) (TYPE)(expr)
734 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
735             FIND_REVERSE(keyChar, void*);
736 
737 
738           case dbField::tpString:
739             if (sc.lastKey != NULL) {
740                 char_t* key = sc.lastKey;
741                 while (l < r)  {
742                     int i = (l+r) >> 1;
743                     if (SCMP(key, (char_t*)&keyChar[keyStr[i].offs]) >= 1-lastKeyInclusion) {
744                         l = i + 1;
745                     } else {
746                         r = i;
747                     }
748                 }
749                 assert(r == l);
750             }
751             if (sc.firstKey != NULL) {
752                 char_t* key = sc.firstKey;
753                 if (height == 0) {
754                     while (--r >= 0) {
755                         if (SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) >= firstKeyInclusion) {
756                             return false;
757                         }
758                         if (!sc.condition
759                             || db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor))
760                         {
761                             if (!sc.cursor->add(keyStr[r].oid)) {
762                                 return false;
763                             }
764                         }
765                     }
766                 } else {
767                     do {
768                         dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[r].oid);
769                         if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
770                             db->pool.unfix(pg);
771                             return false;
772                         }
773                         db->pool.unfix(pg);
774                     } while (--r >= 0 && SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) < firstKeyInclusion);
775                     return r < 0;
776                 }
777             } else {
778                 if (height == 0) {
779                     if (sc.condition) {
780                         while (--r >= 0) {
781                             if (db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor)) {
782                                 if (!sc.cursor->add(keyStr[r].oid)) {
783                                     return false;
784                                 }
785                             }
786                         }
787                     } else {
788                         while (--r >= 0) {
789                             if (!sc.cursor->add(keyStr[r].oid)) {
790                                 return false;
791                             }
792                         }
793                     }
794                 } else {
795                     do {
796                         dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[r].oid);
797                         if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
798                             db->pool.unfix(pg);
799                             return false;
800                         }
801                         db->pool.unfix(pg);
802                     } while (--r >= 0);
803                 }
804             }
805             return true;
806           default:
807             assert(false);
808         }
809         if (height == 0) {
810             if (sc.condition) {
811                 while (--r >= 0) {
812                     if (db->evaluateBoolean(sc.condition, record[maxItems-1-r], table, sc.cursor)) {
813                         if (!sc.cursor->add(record[maxItems-1-r])) {
814                             return false;
815                         }
816                     }
817                 }
818             } else {
819                 while (--r >= 0) {
820                     if (!sc.cursor->add(record[maxItems-1-r])) {
821                         return false;
822                     }
823                 }
824             }
825         } else {
826             do {
827                 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-r]);
828                 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
829                     db->pool.unfix(pg);
830                     return false;
831                 }
832                 db->pool.unfix(pg);
833             } while (--r >= 0);
834         }
835     }
836     return true;
837 }
838 
839 
allocate(dbDatabase * db,oid_t root,int type,int sizeofType,item & ins)840 oid_t dbBtreePage::allocate(dbDatabase* db, oid_t root, int type, int sizeofType, item& ins)
841 {
842     oid_t pageId = db->allocatePage();
843     dbBtreePage* page = (dbBtreePage*)db->put(pageId);
844     page->nItems = 1;
845     if (type == dbField::tpString) {
846         page->size = ins.keyLen*sizeof(char_t);
847         page->keyStr[0].offs = (nat2)(sizeof(page->keyChar) - ins.keyLen*sizeof(char_t));
848         page->keyStr[0].size = ins.keyLen;
849         page->keyStr[0].oid = ins.oid;
850         page->keyStr[1].oid = root;
851         memcpy(&page->keyChar[page->keyStr[0].offs], ins.keyChar, ins.keyLen*sizeof(char_t));
852     } else if (type == dbField::tpRawBinary) {
853         memcpy(page->keyChar, ins.keyChar, sizeofType);
854         page->record[maxItems-1] = ins.oid;
855         page->record[maxItems-2] = root;
856     } else {
857         memcpy(page->keyChar, ins.keyChar, keySize[type]);
858         page->record[maxItems-1] = ins.oid;
859         page->record[maxItems-2] = root;
860     }
861     db->pool.unfix(page);
862     return pageId;
863 }
864 
865 
/*
 * INSERT(KEY, TYPE): body of one `case` of dbBtreePage::insert() for keys of
 * scalar type TYPE kept in the page array KEY.
 *
 * The expansion uses the variables of insert() directly: db, pageId, type,
 * sizeofType, comparator, ins, height, unique, tie, pg, result, l, n and r.
 * Every path through it returns from insert():
 *   dbBtree::not_unique  unique index and an equal key already sits in the leaf
 *   dbBtree::done        the entry was stored without splitting this page
 *   dbBtree::overflow    this page was split; `ins` now carries the key and
 *                        the oid of the new page to be inserted one level up
 * (a result other than overflow coming back from a child is passed through).
 *
 * Layout used below: key i is KEY[i], its oid is record[maxItems-1-i].
 * An inner page holds one more reference than nItems.
 */
#define INSERT(KEY, TYPE) {                                                 \
    const TYPE key = ins.KEY;                                               \
    /* binary search: r ends at the first entry whose key is >= key */      \
    while (l < r)  {                                                        \
        const int mid = (l+r) >> 1;                                         \
        if (key > pg->KEY[mid]) {                                           \
            l = mid+1;                                                      \
        } else {                                                            \
            r = mid;                                                        \
        }                                                                   \
    }                                                                       \
    assert(l == r);                                                         \
    /* the new entry goes in front of entry r */                            \
    height -= 1;                                                            \
    if (height == 0) {                                                      \
        /* leaf page: refuse a duplicate when the index is unique */        \
        if (unique && r < n && key == pg->KEY[r]) {                         \
            db->pool.unfix(pg);                                             \
            return dbBtree::not_unique;                                     \
        }                                                                   \
    } else {                                                                \
        /* inner page: descend; only a split of the child needs work here */\
        result = insert(db, pg->record[maxItems-r-1], type, sizeofType, comparator, ins, height, unique); \
        assert(result == dbBtree::not_unique || result == dbBtree::done || result == dbBtree::overflow); \
        if (result != dbBtree::overflow) {                                  \
            db->pool.unfix(pg);                                             \
            return result;                                                  \
        }                                                                   \
        /* from here on n counts references, one more than the keys */      \
        n += 1;                                                             \
    }                                                                       \
    /* give up the page from db->get() and take it again through */         \
    /* db->put() bound to `tie` before changing it */                       \
    db->pool.unfix(pg);                                                     \
    pg = (dbBtreePage*)db->put(tie, pageId);                                \
    const int capacity = sizeof(pg->KEY) / (sizeof(oid_t) + sizeof(TYPE));  \
    if (n < capacity) {                                                     \
        /* room left: open a gap at position r in both arrays */            \
        memmove(&pg->KEY[r+1], &pg->KEY[r], (n - r)*sizeof(TYPE));          \
        memmove(&pg->record[maxItems-n-1], &pg->record[maxItems-n],         \
               (n-r)*sizeof(oid_t));                                        \
        pg->KEY[r] = ins.KEY;                                               \
        pg->record[maxItems-r-1] = ins.oid;                                 \
        pg->nItems += 1;                                                    \
        return dbBtree::done;                                               \
    }                                                                       \
    /* page is full: the lower `half` entries move to a new page `lower`, */\
    /* the rest stays in pg */                                              \
    oid_t lowerId = db->allocatePage();                                     \
    dbBtreePage* lower = (dbBtreePage*)db->put(lowerId);                    \
    assert(n == capacity);                                                  \
    const int half = (capacity+1)/2;                                        \
    if (r < half) {                                                         \
        /* the new entry belongs to the lower page */                       \
        memcpy(lower->KEY, pg->KEY, r*sizeof(TYPE));                        \
        lower->KEY[r] = ins.KEY;                                            \
        memcpy(&lower->KEY[r+1], &pg->KEY[r], (half-r-1)*sizeof(TYPE));     \
        memmove(pg->KEY, &pg->KEY[half-1], (capacity-half+1)*sizeof(TYPE)); \
        memcpy(&lower->record[maxItems-r], &pg->record[maxItems-r],         \
               r*sizeof(oid_t));                                            \
        lower->record[maxItems-r-1] = ins.oid;                              \
        memcpy(&lower->record[maxItems-half], &pg->record[maxItems-half+1], \
               (half-r-1)*sizeof(oid_t));                                   \
        memmove(&pg->record[maxItems-capacity+half-1],                      \
                &pg->record[maxItems-capacity],                             \
                (capacity-half+1)*sizeof(oid_t));                           \
    } else {                                                                \
        /* the new entry stays in pg, at position r-half */                 \
        memcpy(lower->KEY, pg->KEY, half*sizeof(TYPE));                     \
        memmove(pg->KEY, &pg->KEY[half], (r-half)*sizeof(TYPE));            \
        pg->KEY[r-half] = ins.KEY;                                          \
        memmove(&pg->KEY[r-half+1], &pg->KEY[r], (capacity-r)*sizeof(TYPE));\
        memcpy(&lower->record[maxItems-half], &pg->record[maxItems-half],   \
               half*sizeof(oid_t));                                         \
        memmove(&pg->record[maxItems-r+half], &pg->record[maxItems-r],      \
                (r-half)*sizeof(oid_t));                                    \
        pg->record[maxItems-r+half-1] = ins.oid;                            \
        memmove(&pg->record[maxItems-capacity+half-1],                      \
                &pg->record[maxItems-capacity],                             \
                (capacity-r)*sizeof(oid_t));                                \
    }                                                                       \
    /* hand the new page and its last key up to the caller */               \
    ins.oid = lowerId;                                                      \
    ins.KEY = lower->KEY[half-1];                                           \
    /* on an inner page both counts are one lower than on a leaf */         \
    const int innerAdjust = (height == 0) ? 0 : 1;                          \
    pg->nItems = capacity - half + 1 - innerAdjust;                         \
    lower->nItems = half - innerAdjust;                                     \
    db->pool.unfix(lower);                                                  \
    return dbBtree::overflow;                                               \
}
940 
941 
insert(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & ins,int height,bool unique)942 int dbBtreePage::insert(dbDatabase* db, oid_t pageId, int type, int sizeofType,
943                         dbUDTComparator comparator, item& ins, int height, bool unique)
944 {
945     dbPutTie tie;
946     dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
947     int result;
948     int l = 0, n = pg->nItems, r = n;
949 
950     switch (type) {
951       case dbField::tpBool:
952       case dbField::tpInt1:
953         INSERT(keyInt1, int1);
954       case dbField::tpInt2:
955         INSERT(keyInt2, int2);
956       case dbField::tpInt4:
957         INSERT(keyInt4, int4);
958       case dbField::tpInt8:
959         INSERT(keyInt8, db_int8);
960       case dbField::tpReference:
961         INSERT(keyOid, oid_t);
962       case dbField::tpReal4:
963         INSERT(keyReal4, real4);
964       case dbField::tpReal8:
965         INSERT(keyReal8, real8);
966       case dbField::tpRawBinary:
967       {
968          char* key = (char*)ins.keyChar;
969          while (l < r)  {
970              int i = (l+r) >> 1;
971              if (comparator(key, pg->keyChar + i*sizeofType, sizeofType) > 0) l = i+1; else r = i;
972          }
973          assert(l == r);
974          /* insert before e[r] */
975          if (--height != 0) {
976              result = insert(db, pg->record[maxItems-r-1], type, sizeofType, comparator, ins, height, unique);
977              assert(result == dbBtree::not_unique || result == dbBtree::done || result == dbBtree::overflow);
978              if (result != dbBtree::overflow) {
979                  db->pool.unfix(pg);
980                  return result;
981              }
982              n += 1;
983          } else if (unique && r < n && comparator(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
984              db->pool.unfix(pg);
985              return dbBtree::not_unique;
986          }
987          db->pool.unfix(pg);
988          pg = (dbBtreePage*)db->put(tie, pageId);
989          const int max = sizeof(pg->keyChar) / (sizeof(oid_t) + sizeofType);
990          if (n < max) {
991              memmove(pg->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (n - r)*sizeofType);
992              memmove(&pg->record[maxItems-n-1], &pg->record[maxItems-n],
993                     (n-r)*sizeof(oid_t));
994              memcpy(pg->keyChar + r*sizeofType, ins.keyChar, sizeofType);
995              pg->record[maxItems-r-1] = ins.oid;
996              pg->nItems += 1;
997              return dbBtree::done;
998          } else { /* page is full then divide page */
999              oid_t pageId = db->allocatePage();
1000              dbBtreePage* b = (dbBtreePage*)db->put(pageId);
1001              assert(n == max);
1002              const int m = (max+1)/2;
1003              if (r < m) {
1004                  memcpy(b->keyChar, pg->keyChar, r*sizeofType);
1005                  memcpy(b->keyChar + r*sizeofType, ins.keyChar, sizeofType);
1006                  memcpy(b->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (m-r-1)*sizeofType);
1007                  memmove(pg->keyChar, pg->keyChar + (m-1)*sizeofType, (max-m+1)*sizeofType);
1008                  memcpy(&b->record[maxItems-r], &pg->record[maxItems-r],
1009                         r*sizeof(oid_t));
1010                  b->record[maxItems-r-1] = ins.oid;
1011                  memcpy(&b->record[maxItems-m], &pg->record[maxItems-m+1],
1012                         (m-r-1)*sizeof(oid_t));
1013                  memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],
1014                          (max-m+1)*sizeof(oid_t));
1015              } else {
1016                  memcpy(b->keyChar, pg->keyChar, m*sizeofType);
1017                  memmove(pg->keyChar, pg->keyChar + m*sizeofType, (r-m)*sizeofType);
1018                  memcpy(pg->keyChar + (r-m)*sizeofType, ins.keyChar, sizeofType);
1019                  memmove(pg->keyChar + (r-m+1)*sizeofType, pg->keyChar + r*sizeofType,
1020                         (max-r)*sizeofType);
1021                  memcpy(&b->record[maxItems-m], &pg->record[maxItems-m],
1022                         m*sizeof(oid_t));
1023                  memmove(&pg->record[maxItems-r+m], &pg->record[maxItems-r],
1024                          (r-m)*sizeof(oid_t));
1025                  pg->record[maxItems-r+m-1] = ins.oid;
1026                  memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],
1027                          (max-r)*sizeof(oid_t));
1028              }
1029              ins.oid = pageId;
1030              memcpy(ins.keyChar, b->keyChar + (m-1)*sizeofType, sizeofType);
1031              if (height == 0) {
1032                  pg->nItems = max - m + 1;
1033                  b->nItems = m;
1034              } else {
1035                  pg->nItems = max - m;
1036                  b->nItems = m - 1;
1037              }
1038              db->pool.unfix(b);
1039              return dbBtree::overflow;
1040          }
1041       }
1042       case dbField::tpString:
1043       {
1044         char_t* key = ins.keyChar;
1045         while (l < r)  {
1046             int i = (l+r) >> 1;
1047             if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]) > 0) {
1048                 l = i+1;
1049             } else {
1050                 r = i;
1051             }
1052         }
1053         if (--height != 0) {
1054             result = insert(db, pg->keyStr[r].oid, type, sizeofType, comparator, ins, height, unique);
1055             assert (result != dbBtree::not_found);
1056             if (result != dbBtree::overflow) {
1057                 db->pool.unfix(pg);
1058                 return result;
1059             }
1060         } else if (unique && r < n && SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
1061             db->pool.unfix(pg);
1062             return dbBtree::not_unique;
1063         }
1064         db->pool.unfix(pg);
1065         pg = (dbBtreePage*)db->put(tie, pageId);
1066         return pg->insertStrKey(db, r, ins, height);
1067       }
1068       default:
1069         assert(false);
1070     }
1071     return dbBtree::done;
1072 }
1073 
insertStrKey(dbDatabase * db,int r,item & ins,int height)1074 int dbBtreePage::insertStrKey(dbDatabase* db, int r, item& ins, int height)
1075 {
1076     int n = height != 0 ? nItems+1 : nItems;
1077     // insert before e[r]
1078     int len = ins.keyLen;
1079     if (size + len*sizeof(char_t) + (n+1)*sizeof(str) <= sizeof(keyChar)) {
1080         memmove(&keyStr[r+1], &keyStr[r], (n-r)*sizeof(str));
1081         size += len*sizeof(char_t);
1082         keyStr[r].offs = nat2(sizeof(keyChar) - size);
1083         keyStr[r].size = len;
1084         keyStr[r].oid = ins.oid;
1085         memcpy(&keyChar[sizeof(keyChar) - size], ins.keyChar, len*sizeof(char_t));
1086         nItems += 1;
1087     } else { // page is full then divide page
1088         oid_t pageId = db->allocatePage();
1089         dbBtreePage* b = (dbBtreePage*)db->put(pageId);
1090         size_t moved = 0;
1091         size_t inserted = len*sizeof(char_t) + sizeof(str);
1092         long prevDelta = (1L << (sizeof(long)*8-1)) + 1;
1093         for (int bn = 0, i = 0; ; bn += 1) {
1094             size_t addSize, subSize;
1095             int j = nItems - i - 1;
1096             size_t keyLen = keyStr[i].size;
1097             if (bn == r) {
1098                 keyLen = len;
1099                 inserted = 0;
1100                 addSize = len;
1101                 if (height == 0) {
1102                     subSize = 0;
1103                     j += 1;
1104                 } else {
1105                     subSize = keyStr[i].size;
1106                 }
1107             } else {
1108                 addSize = subSize = keyLen;
1109                 if (height != 0) {
1110                     if (i + 1 != r) {
1111                         subSize += keyStr[i+1].size;
1112                         j -= 1;
1113                     } else {
1114                         inserted = 0;
1115                     }
1116                 }
1117             }
1118             long delta = (long)((moved + addSize*sizeof(char_t) + (bn+1)*sizeof(str))
1119                 - (j*sizeof(str) + size - subSize*sizeof(char_t) + inserted));
1120             if (delta >= -prevDelta) {
1121                 char_t insKey[dbBtreePage::dbMaxKeyLen];
1122                 if (bn <= r) {
1123                     memcpy(insKey, ins.keyChar, len*sizeof(char_t));
1124                 }
1125                 if (height == 0) {
1126                     ins.keyLen = b->keyStr[bn-1].size;
1127                     memcpy(ins.keyChar, &b->keyChar[b->keyStr[bn-1].offs], ins.keyLen*sizeof(char_t));
1128                 } else {
1129                     assert(((void)"String fits in the B-Tree page",
1130                             moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
1131                     if (bn != r) {
1132                         ins.keyLen = (int)keyLen;
1133                         memcpy(ins.keyChar, &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
1134                         b->keyStr[bn].oid = keyStr[i].oid;
1135                         size -= (nat4)(keyLen*sizeof(char_t));
1136                         i += 1;
1137                     } else {
1138                         b->keyStr[bn].oid = ins.oid;
1139                     }
1140                 }
1141                 compactify(db, i);
1142                 if (bn < r || (bn == r && height == 0)) {
1143                     memmove(&keyStr[r-i+1], &keyStr[r-i],
1144                             (n - r)*sizeof(str));
1145                     size += len*sizeof(char_t);
1146                     nItems += 1;
1147                     assert(((void)"String fits in the B-Tree page",
1148                             size + (n-i+1)*sizeof(str) <= sizeof(keyChar)));
1149                     keyStr[r-i].offs = (nat2)(sizeof(keyChar) - size);
1150                     keyStr[r-i].size = len;
1151                     keyStr[r-i].oid = ins.oid;
1152                     memcpy(&keyChar[keyStr[r-i].offs], insKey, len*sizeof(char_t));
1153                 }
1154                 b->nItems = bn;
1155                 b->size = (nat4)moved;
1156                 ins.oid = pageId;
1157                 db->pool.unfix(b);
1158                 return dbBtree::overflow;
1159             }
1160             prevDelta = delta;
1161             moved += keyLen*sizeof(char_t);
1162             assert(((void)"String fits in the B-Tree page",
1163                     moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
1164             b->keyStr[bn].size = (nat2)keyLen;
1165             b->keyStr[bn].offs = (nat2)(sizeof(keyChar) - moved);
1166             if (bn == r) {
1167                 b->keyStr[bn].oid = ins.oid;
1168                 memcpy(&b->keyChar[b->keyStr[bn].offs], ins.keyChar, keyLen*sizeof(char_t));
1169             } else {
1170                 b->keyStr[bn].oid = keyStr[i].oid;
1171                 memcpy(&b->keyChar[b->keyStr[bn].offs],
1172                        &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
1173                 size -= (nat4)(keyLen*sizeof(char_t));
1174                 i += 1;
1175             }
1176         }
1177     }
1178     return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1179         ? dbBtree::underflow : dbBtree::done;
1180 }
1181 
compactify(dbDatabase * db,int m)1182 void dbBtreePage::compactify(dbDatabase* db, int m)
1183 {
1184     int i, j, offs, len, n = nItems;
1185     // int size[dbPageSize]; // for 8Kb default page size, total size used by these buffers will be 64kb which is too much for many environments
1186     // int index[dbPageSize];
1187     int* size = db->btreeBuf;
1188     int* index = size + dbPageSize;
1189     if (m == 0) {
1190         return;
1191     }
1192     if (m < 0) {
1193         m = -m;
1194         for (i = 0; i < n-m; i++) {
1195             len = keyStr[i].size;
1196             size[keyStr[i].offs + len*sizeof(char_t)] = len;
1197             index[keyStr[i].offs + len*sizeof(char_t)] = i;
1198         }
1199         for (; i < n; i++) {
1200             len = keyStr[i].size;
1201             size[keyStr[i].offs + len*sizeof(char_t)] = len;
1202             index[keyStr[i].offs + len*sizeof(char_t)] = -1;
1203         }
1204     } else {
1205         for (i = 0; i < m; i++) {
1206             len = keyStr[i].size;
1207             size[keyStr[i].offs + len*sizeof(char_t)] = len;
1208             index[keyStr[i].offs + len*sizeof(char_t)] = -1;
1209         }
1210         for (; i < n; i++) {
1211             len = keyStr[i].size;
1212             size[keyStr[i].offs + len*sizeof(char_t)] = len;
1213             index[keyStr[i].offs + len*sizeof(char_t)] = i - m;
1214             keyStr[i-m].oid = keyStr[i].oid;
1215             keyStr[i-m].size = len;
1216         }
1217         keyStr[i-m].oid = keyStr[i].oid;
1218     }
1219     nItems = n -= m;
1220     for (offs = sizeof(keyChar), i = offs; n != 0; i -= len) {
1221         len = size[i]*sizeof(char_t);
1222         j = index[i];
1223         if (j >= 0) {
1224             offs -= len;
1225             n -= 1;
1226             keyStr[j].offs = offs;
1227             if (offs != i - len) {
1228                 memmove(&keyChar[offs], &keyChar[(i - len)], len);
1229             }
1230         }
1231     }
1232 }
1233 
removeStrKey(dbDatabase * db,int r)1234 int dbBtreePage::removeStrKey(dbDatabase* db, int r)
1235 {
1236     int len = keyStr[r].size;
1237     int offs = keyStr[r].offs;
1238     memmove(keyChar + sizeof(keyChar) - size + len*sizeof(char_t),
1239             keyChar + sizeof(keyChar) - size,
1240             size - sizeof(keyChar) + offs);
1241     memmove(&keyStr[r], &keyStr[r+1], (nItems-r)*sizeof(str));
1242     nItems -= 1;
1243     size -= len*sizeof(char_t);
1244     for (int i = nItems; --i >= 0; ) {
1245         if (keyStr[i].offs < offs) {
1246             keyStr[i].offs += (nat2)(len*sizeof(char_t));
1247         }
1248     }
1249     return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1250         ? dbBtree::underflow : dbBtree::done;
1251 }
1252 
replaceStrKey(dbDatabase * db,int r,item & ins,int height)1253 int dbBtreePage::replaceStrKey(dbDatabase* db, int r, item& ins, int height)
1254 {
1255     ins.oid = keyStr[r].oid;
1256     removeStrKey(db, r);
1257     return insertStrKey(db, r, ins, height);
1258 }
1259 
handlePageUnderflow(dbDatabase * db,int r,int type,int sizeofType,item & rem,int height)1260 int dbBtreePage::handlePageUnderflow(dbDatabase* db, int r, int type, int sizeofType, item& rem,
1261                                      int height)
1262 {
1263     dbPutTie tie;
1264     if (type == dbField::tpString) {
1265         dbBtreePage* a = (dbBtreePage*)db->put(tie, keyStr[r].oid);
1266         int an = a->nItems;
1267         if (r < (int)nItems) { // exists greater page
1268             dbBtreePage* b = (dbBtreePage*)db->get(keyStr[r+1].oid);
1269             int bn = b->nItems;
1270             size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
1271             if (height != 1) {
1272                 merged_size += keyStr[r].size*sizeof(char_t) + sizeof(str)*2;
1273             }
1274             if (merged_size > sizeof(keyChar)) {
1275                 // reallocation of nodes between pages a and b
1276                 int i, j, k;
1277                 dbPutTie tie;
1278                 db->pool.unfix(b);
1279                 b = (dbBtreePage*)db->put(tie, keyStr[r+1].oid);
1280                 size_t size_a = a->size;
1281                 size_t size_b = b->size;
1282                 size_t addSize, subSize;
1283                 if (height != 1) {
1284                     addSize = keyStr[r].size;
1285                     subSize = b->keyStr[0].size;
1286                 } else {
1287                     addSize = subSize = b->keyStr[0].size;
1288                 }
1289                 i = 0;
1290                 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
1291                 while (true) {
1292                     i += 1;
1293                     long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
1294                          - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
1295                     if (delta >= 0) {
1296                         if (delta >= -prevDelta) {
1297                             i -= 1;
1298                         }
1299                         break;
1300                     }
1301                     size_a += addSize*sizeof(char_t);
1302                     size_b -= subSize*sizeof(char_t);
1303                     prevDelta = delta;
1304                     if (height != 1) {
1305                         addSize = subSize;
1306                         subSize = b->keyStr[i].size;
1307                     } else {
1308                         addSize = subSize = b->keyStr[i].size;
1309                     }
1310                 }
1311                 int result = dbBtree::done;
1312                 if (i > 0) {
1313                     k = i;
1314                     if (height != 1) {
1315                         int len = keyStr[r].size;
1316                         a->size += len*sizeof(char_t);
1317                         a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
1318                         a->keyStr[an].size = len;
1319                         memcpy(a->keyChar + a->keyStr[an].offs,
1320                                keyChar + keyStr[r].offs, len*sizeof(char_t));
1321                         k -= 1;
1322                         an += 1;
1323                         a->keyStr[an+k].oid = b->keyStr[k].oid;
1324                         b->size -= b->keyStr[k].size*sizeof(char_t);
1325                     }
1326                     for (j = 0; j < k; j++) {
1327                         int len = b->keyStr[j].size;
1328                         a->size += len*sizeof(char_t);
1329                         b->size -= len*sizeof(char_t);
1330                         a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
1331                         a->keyStr[an].size = len;
1332                         a->keyStr[an].oid = b->keyStr[j].oid;
1333                         memcpy(a->keyChar + a->keyStr[an].offs,
1334                                b->keyChar + b->keyStr[j].offs, len*sizeof(char_t));
1335                         an += 1;
1336                     }
1337                     memcpy(rem.keyChar, b->keyChar + b->keyStr[i-1].offs,
1338                            b->keyStr[i-1].size*sizeof(char_t));
1339                     rem.keyLen = b->keyStr[i-1].size;
1340                     result = replaceStrKey(db, r, rem, height);
1341                     a->nItems = an;
1342                     b->compactify(db, i);
1343                 }
1344                 assert(a->nItems > 0 && b->nItems > 0);
1345                 return result;
1346             } else { // merge page b to a
1347                 if (height != 1) {
1348                     a->size += (a->keyStr[an].size = keyStr[r].size)*sizeof(char_t);
1349                     a->keyStr[an].offs = (nat2)(sizeof(keyChar) - a->size);
1350                     memcpy(&a->keyChar[a->keyStr[an].offs],
1351                            &keyChar[keyStr[r].offs], keyStr[r].size*sizeof(char_t));
1352                     an += 1;
1353                     a->keyStr[an+bn].oid = b->keyStr[bn].oid;
1354                 }
1355                 for (int i = 0; i < bn; i++, an++) {
1356                     a->keyStr[an] = b->keyStr[i];
1357                     a->keyStr[an].offs -= a->size;
1358                 }
1359                 a->size += b->size;
1360                 a->nItems = an;
1361                 memcpy(a->keyChar + sizeof(keyChar) - a->size,
1362                        b->keyChar + sizeof(keyChar) - b->size,
1363                        b->size);
1364                 db->pool.unfix(b);
1365                 db->freePage(keyStr[r+1].oid);
1366                 keyStr[r+1].oid = keyStr[r].oid;
1367                 return removeStrKey(db, r);
1368             }
1369         } else { // page b is before a
1370             dbBtreePage* b = (dbBtreePage*)db->get(keyStr[r-1].oid);
1371             int bn = b->nItems;
1372             size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
1373             if (height != 1) {
1374                 merged_size += keyStr[r-1].size*sizeof(char_t) + sizeof(str)*2;
1375             }
1376             if (merged_size > sizeof(keyChar)) {
1377                 // reallocation of nodes between pages a and b
1378                 dbPutTie tie;
1379                 int i, j, k;
1380                 db->pool.unfix(b);
1381                 b = (dbBtreePage*)db->put(tie, keyStr[r-1].oid);
1382                 size_t size_a = a->size;
1383                 size_t size_b = b->size;
1384                 size_t addSize, subSize;
1385                 if (height != 1) {
1386                     addSize = keyStr[r-1].size;
1387                     subSize = b->keyStr[bn-1].size;
1388                 } else {
1389                     addSize = subSize = b->keyStr[bn-1].size;
1390                 }
1391                 i = 0;
1392                 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
1393                 while (true) {
1394                     i += 1;
1395                     long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
1396                          - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
1397                     if (delta >= 0) {
1398                         if (delta >= -prevDelta) {
1399                             i -= 1;
1400                         }
1401                         break;
1402                     }
1403                     prevDelta = delta;
1404                     size_a += addSize*sizeof(char_t);
1405                     size_b -= subSize*sizeof(char_t);
1406                     if (height != 1) {
1407                         addSize = subSize;
1408                         subSize = b->keyStr[bn-i-1].size;
1409                     } else {
1410                         addSize = subSize = b->keyStr[bn-i-1].size;
1411                     }
1412                 }
1413                 int result = dbBtree::done;
1414                 if (i > 0) {
1415                     k = i;
1416                     assert(i < bn);
1417                     if (height != 1) {
1418                         memmove(&a->keyStr[i], a->keyStr, (an+1)*sizeof(str));
1419                         b->size -= b->keyStr[bn-k].size*sizeof(char_t);
1420                         k -= 1;
1421                         a->keyStr[k].oid = b->keyStr[bn].oid;
1422                         int len = keyStr[r-1].size;
1423                         a->keyStr[k].size = len;
1424                         a->size += len*sizeof(char_t);
1425                         a->keyStr[k].offs = (nat2)(sizeof(keyChar) - a->size);
1426                         memcpy(&a->keyChar[a->keyStr[k].offs],
1427                                &keyChar[keyStr[r-1].offs], len*sizeof(char_t));
1428                     } else {
1429                         memmove(&a->keyStr[i], a->keyStr, an*sizeof(str));
1430                     }
1431                     for (j = 0; j < k; j++) {
1432                         int len = b->keyStr[bn-k+j].size;
1433                         a->size += len*sizeof(char_t);
1434                         b->size -= len*sizeof(char_t);
1435                         a->keyStr[j].offs = (nat2)(sizeof(a->keyChar) - a->size);
1436                         a->keyStr[j].size = len;
1437                         a->keyStr[j].oid = b->keyStr[bn-k+j].oid;
1438                         memcpy(a->keyChar + a->keyStr[j].offs,
1439                                b->keyChar + b->keyStr[bn-k+j].offs, len*sizeof(char_t));
1440                     }
1441                     an += i;
1442                     a->nItems = an;
1443                     memcpy(rem.keyChar, b->keyChar + b->keyStr[bn-k-1].offs,
1444                            b->keyStr[bn-k-1].size*sizeof(char_t));
1445                     rem.keyLen = b->keyStr[bn-k-1].size;
1446                     result = replaceStrKey(db, r-1, rem, height);
1447                     b->compactify(db, -i);
1448                 }
1449                 assert(a->nItems > 0 && b->nItems > 0);
1450                 return result;
1451             } else { // merge page b to a
1452                 if (height != 1) {
1453                     memmove(a->keyStr + bn + 1, a->keyStr, (an+1)*sizeof(str));
1454                     a->size += (a->keyStr[bn].size = keyStr[r-1].size)*sizeof(char_t);
1455                     a->keyStr[bn].offs = (nat2)(sizeof(keyChar) - a->size);
1456                     a->keyStr[bn].oid = b->keyStr[bn].oid;
1457                     memcpy(&a->keyChar[a->keyStr[bn].offs],
1458                            &keyChar[keyStr[r-1].offs], keyStr[r-1].size*sizeof(char_t));
1459                     an += 1;
1460                 } else {
1461                     memmove(a->keyStr + bn, a->keyStr, an*sizeof(str));
1462                 }
1463                 for (int i = 0; i < bn; i++) {
1464                     a->keyStr[i] = b->keyStr[i];
1465                     a->keyStr[i].offs -= a->size;
1466                 }
1467                 an += bn;
1468                 a->nItems = an;
1469                 a->size += b->size;
1470                 memcpy(a->keyChar + sizeof(keyChar) - a->size,
1471                        b->keyChar + sizeof(keyChar) - b->size,
1472                        b->size);
1473                 db->pool.unfix(b);
1474                 db->freePage(keyStr[r-1].oid);
1475                 return removeStrKey(db, r-1);
1476             }
1477         }
1478     } else {
1479         dbBtreePage* a = (dbBtreePage*)db->put(tie, record[maxItems-r-1]);
1480         int an = a->nItems;
1481         if (r < int(nItems)) { // exists greater page
1482             dbBtreePage* b = (dbBtreePage*)db->get(record[maxItems-r-2]);
1483             int bn = b->nItems;
1484             assert(bn >= an);
1485             if (height != 1) {
1486                 memcpy(a->keyChar + an*sizeofType, keyChar + r*sizeofType,
1487                        sizeofType);
1488                 an += 1;
1489                 bn += 1;
1490             }
1491             size_t merged_size = (an+bn)*(sizeof(oid_t) + sizeofType);
1492             if (merged_size > sizeof(keyChar)) {
1493                 // reallocation of nodes between pages a and b
1494                 int i = bn - ((an + bn) >> 1);
1495                 dbPutTie tie;
1496                 db->pool.unfix(b);
1497                 b = (dbBtreePage*)db->put(tie, record[maxItems-r-2]);
1498                 memcpy(a->keyChar + an*sizeofType, b->keyChar, i*sizeofType);
1499                 memmove(b->keyChar, b->keyChar + i*sizeofType, (bn-i)*sizeofType);
1500                 memcpy(&a->record[maxItems-an-i], &b->record[maxItems-i],
1501                        i*sizeof(oid_t));
1502                 memmove(&b->record[maxItems-bn+i], &b->record[maxItems-bn],
1503                         (bn-i)*sizeof(oid_t));
1504                 memcpy(keyChar + r*sizeofType,  a->keyChar + (an+i-1)*sizeofType,
1505                        sizeofType);
1506                 b->nItems -= i;
1507                 a->nItems += i;
1508                 return dbBtree::done;
1509             } else { // merge page b to a
1510                 memcpy(a->keyChar + an*sizeofType, b->keyChar, bn*sizeofType);
1511                 memcpy(&a->record[maxItems-an-bn], &b->record[maxItems-bn],
1512                        bn*sizeof(oid_t));
1513                 db->pool.unfix(b);
1514                 db->freePage(record[maxItems-r-2]);
1515                 memmove(&record[maxItems-nItems], &record[maxItems-nItems-1],
1516                         (nItems - r - 1)*sizeof(oid_t));
1517                 memmove(keyChar + r*sizeofType, keyChar + (r+1)*sizeofType,
1518                        (nItems - r - 1)*sizeofType);
1519                 a->nItems += bn;
1520                 nItems -= 1;
1521                 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(oid_t))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1522                     ? dbBtree::underflow : dbBtree::done;
1523             }
1524         } else { // page b is before a
1525             dbBtreePage* b = (dbBtreePage*)db->get(record[maxItems-r]);
1526             int bn = b->nItems;
1527             assert(bn >= an);
1528             if (height != 1) {
1529                 an += 1;
1530                 bn += 1;
1531             }
1532             size_t merged_size = (an+bn)*(sizeof(oid_t) + sizeofType);
1533             if (merged_size > sizeof(keyChar)) {
1534                 // reallocation of nodes between pages a and b
1535                 int i = bn - ((an + bn) >> 1);
1536                 dbPutTie tie;
1537                 db->pool.unfix(b);
1538                 b = (dbBtreePage*)db->put(tie, record[maxItems-r]);
1539                 memmove(a->keyChar + i*sizeofType, a->keyChar, an*sizeofType);
1540                 memcpy(a->keyChar, b->keyChar + (bn-i)*sizeofType, i*sizeofType);
1541                 memmove(&a->record[maxItems-an-i], &a->record[maxItems-an],
1542                        an*sizeof(oid_t));
1543                 memcpy(&a->record[maxItems-i], &b->record[maxItems-bn],
1544                        i*sizeof(oid_t));
1545                 if (height != 1) {
1546                     memcpy(a->keyChar + (i-1)*sizeofType, keyChar + (r-1)*sizeofType,
1547                            sizeofType);
1548                 }
1549                 memcpy(keyChar + (r-1)*sizeofType, b->keyChar + (bn-i-1)*sizeofType,
1550                        sizeofType);
1551                 b->nItems -= i;
1552                 a->nItems += i;
1553                 return dbBtree::done;
1554             } else { // merge page b to a
1555                 memmove(a->keyChar + bn*sizeofType, a->keyChar, an*sizeofType);
1556                 memcpy(a->keyChar, b->keyChar, bn*sizeofType);
1557                 memmove(&a->record[maxItems-an-bn], &a->record[maxItems-an],
1558                        an*sizeof(oid_t));
1559                 memcpy(&a->record[maxItems-bn], &b->record[maxItems-bn],
1560                        bn*sizeof(oid_t));
1561                 if (height != 1) {
1562                     memcpy(a->keyChar + (bn-1)*sizeofType, keyChar + (r-1)*sizeofType,
1563                            sizeofType);
1564                 }
1565                 db->pool.unfix(b);
1566                 db->freePage(record[maxItems-r]);
1567                 record[maxItems-r] = record[maxItems-r-1];
1568                 a->nItems += bn;
1569                 nItems -= 1;
1570                 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(oid_t))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1571                     ? dbBtree::underflow : dbBtree::done;
1572             }
1573         }
1574     }
1575 }
1576 
//
// REMOVE(KEY,TYPE): body of one scalar-key case of the switch in
// dbBtreePage::remove(). It expects the locals of that function to be in
// scope (db, pageId, rem, height, pg, tie, i, n, l, r).
//
//  - Bisects [l, r) so that r ends at the first slot whose key is not
//    smaller than rem.KEY.
//  - Decrements height. When it reaches 0 (leaf level) the run of slots with
//    an equal key is scanned for rem.oid; on a hit the page is re-fetched
//    with db->put(), the key and its record slot are squeezed out and
//    dbBtree::underflow or dbBtree::done is returned depending on the fill
//    factor. Without a hit the page is unfixed and dbBtree::not_found is
//    returned.
//  - On upper levels the trailing `break` leaves the switch at the
//    expansion site with r positioned at the first candidate child.
//
#define REMOVE(KEY,TYPE) {                                                        \
    const TYPE key = rem.KEY;                                                     \
    while (l < r) {                                                               \
        i = (l+r) >> 1;                                                           \
        if (key > pg->KEY[i]) {                                                   \
            l = i+1;                                                              \
        } else {                                                                  \
            r = i;                                                                \
        }                                                                         \
    }                                                                             \
    if (--height == 0) {                                                          \
        for (; r < n && key == pg->KEY[r]; r += 1) {                              \
            if (pg->record[maxItems-r-1] != rem.oid) {                            \
                continue;                                                         \
            }                                                                     \
            db->pool.unfix(pg);                                                   \
            pg = (dbBtreePage*)db->put(tie, pageId);                              \
            memmove(&pg->KEY[r], &pg->KEY[r+1], (n - r - 1)*sizeof(TYPE));        \
            memmove(&pg->record[maxItems-n+1], &pg->record[maxItems-n],           \
                    (n - r - 1)*sizeof(oid_t));                                   \
            pg->nItems = --n;                                                     \
            if (n == 0 || n*(sizeof(TYPE) + sizeof(oid_t))*100                    \
                          < sizeof(pg->keyChar)*db->btreeUnderflowPercent)        \
            {                                                                     \
                return dbBtree::underflow;                                        \
            }                                                                     \
            return dbBtree::done;                                                 \
        }                                                                         \
        db->pool.unfix(pg);                                                       \
        return dbBtree::not_found;                                                \
    }                                                                             \
    break;                                                                        \
}
1607 
remove(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & rem,int height)1608 int dbBtreePage::remove(dbDatabase* db, oid_t pageId, int type, int sizeofType,
1609                         dbUDTComparator comparator, item& rem,  int height)
1610 {
1611     dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
1612     dbPutTie tie;
1613     int i, n = pg->nItems, l = 0, r = n;
1614 
1615     switch (type) {
1616       case dbField::tpBool:
1617       case dbField::tpInt1:
1618         REMOVE(keyInt1, int1);
1619       case dbField::tpInt2:
1620         REMOVE(keyInt2, int2);
1621       case dbField::tpInt4:
1622         REMOVE(keyInt4, int4);
1623       case dbField::tpInt8:
1624         REMOVE(keyInt8, db_int8);
1625       case dbField::tpReference:
1626         REMOVE(keyOid, oid_t);
1627       case dbField::tpReal4:
1628         REMOVE(keyReal4, real4);
1629       case dbField::tpReal8:
1630         REMOVE(keyReal8, real8);
1631       case dbField::tpRawBinary:
1632       {
1633         char* key = (char*)rem.keyChar;
1634         while (l < r)  {
1635             i = (l+r) >> 1;
1636             if (comparator(key, pg->keyChar + i*sizeofType, sizeofType) > 0) l = i+1; else r = i;
1637         }
1638         if (--height == 0) {
1639             oid_t oid = rem.oid;
1640             while (r < n) {
1641                 if (memcmp(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
1642                     if (pg->record[maxItems-r-1] == oid) {
1643                         db->pool.unfix(pg);
1644                         pg = (dbBtreePage*)db->put(tie, pageId);
1645                         memmove(pg->keyChar + r*sizeofType, pg->keyChar + (r+1)*sizeofType,
1646                                (n - r - 1)*sizeofType);
1647                         memmove(&pg->record[maxItems-n+1], &pg->record[maxItems-n],
1648                                   (n - r - 1)*sizeof(oid_t));
1649                         pg->nItems = --n;
1650                         return n == 0 || n*(sizeofType + sizeof(oid_t))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent
1651                             ? dbBtree::underflow : dbBtree::done;
1652                     }
1653                 } else {
1654                     break;
1655                 }
1656                 r += 1;
1657             }
1658             db->pool.unfix(pg);
1659             return dbBtree::not_found;
1660         }
1661         break;
1662       }
1663       case dbField::tpString:
1664       {
1665         char_t* key = rem.keyChar;
1666         while (l < r)  {
1667             i = (l+r) >> 1;
1668             if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]) > 0) {
1669                 l = i+1;
1670             } else {
1671                 r = i;
1672             }
1673         }
1674         if (--height != 0) {
1675             do {
1676                 switch (remove(db, pg->keyStr[r].oid, type, sizeofType, comparator, rem, height)) {
1677                   case dbBtree::underflow:
1678                     db->pool.unfix(pg);
1679                     pg = (dbBtreePage*)db->put(tie, pageId);
1680                     return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
1681                   case dbBtree::done:
1682                     db->pool.unfix(pg);
1683                     return dbBtree::done;
1684                   case dbBtree::overflow:
1685                     db->pool.unfix(pg);
1686                     pg = (dbBtreePage*)db->put(tie, pageId);
1687                     return pg->insertStrKey(db, r, rem, height);
1688                 }
1689             } while (++r <= n);
1690         } else {
1691             while (r < n) {
1692                 if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
1693                     if (pg->keyStr[r].oid == rem.oid) {
1694                         db->pool.unfix(pg);
1695                         pg = (dbBtreePage*)db->put(tie, pageId);
1696                         return pg->removeStrKey(db, r);
1697                     }
1698                 } else {
1699                     break;
1700                 }
1701                 r += 1;
1702             }
1703         }
1704         db->pool.unfix(pg);
1705         return dbBtree::not_found;
1706       }
1707       default:
1708         assert(false);
1709     }
1710     do {
1711         switch (remove(db, pg->record[maxItems-r-1], type, sizeofType, comparator, rem, height)) {
1712           case dbBtree::underflow:
1713             db->pool.unfix(pg);
1714             pg = (dbBtreePage*)db->put(tie, pageId);
1715             return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
1716           case dbBtree::done:
1717             db->pool.unfix(pg);
1718             return dbBtree::done;
1719         }
1720     } while (++r <= n);
1721 
1722     db->pool.unfix(pg);
1723     return dbBtree::not_found;
1724 }
1725 
1726 
purge(dbDatabase * db,oid_t pageId,int type,int height)1727 void dbBtreePage::purge(dbDatabase* db, oid_t pageId, int type, int height)
1728 {
1729     if (--height != 0) {
1730         dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
1731         int n = pg->nItems+1;
1732         if (type == dbField::tpString) { // page of strings
1733             while (--n >= 0) {
1734                 purge(db, pg->keyStr[n].oid, type, height);
1735             }
1736         } else {
1737             while (--n >= 0) {
1738                 purge(db, pg->record[maxItems-n-1], type, height);
1739             }
1740         }
1741         db->pool.unfix(pg);
1742     }
1743     db->freePage(pageId);
1744 }
1745 
traverseForward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)1746 bool dbBtreePage::traverseForward(dbDatabase* db, dbAnyCursor* cursor,
1747                                   dbExprNode* condition, int type, int height)
1748 {
1749     int n = nItems;
1750     if (--height != 0) {
1751         if (type == dbField::tpString) { // page of strings
1752             for (int i = 0; i <= n; i++) {
1753                 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[i].oid);
1754                 if (!pg->traverseForward(db, cursor, condition, type, height)) {
1755                     db->pool.unfix(pg);
1756                     return false;
1757                 }
1758                 db->pool.unfix(pg);
1759             }
1760         } else {
1761             for (int i = 0; i <= n; i++) {
1762                 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-i-1]);
1763                 if (!pg->traverseForward(db, cursor, condition, type, height)) {
1764                     db->pool.unfix(pg);
1765                     return false;
1766                 }
1767                 db->pool.unfix(pg);
1768             }
1769         }
1770     } else {
1771         if (type != dbField::tpString) { // page of scalars
1772             if (condition == NULL) {
1773                 for (int i = 0; i < n; i++) {
1774                     if (!cursor->add(record[maxItems-1-i])) {
1775                         return false;
1776                     }
1777                 }
1778             } else {
1779                 dbTableDescriptor* table = cursor->table;
1780                 for (int i = 0; i < n; i++) {
1781                     if (db->evaluateBoolean(condition, record[maxItems-1-i], table, cursor)) {
1782                         if (!cursor->add(record[maxItems-1-i])) {
1783                             return false;
1784                         }
1785                     }
1786                 }
1787             }
1788         } else { // page of strings
1789             if (condition == NULL) {
1790                 for (int i = 0; i < n; i++) {
1791                     if (!cursor->add(keyStr[i].oid)) {
1792                         return false;
1793                     }
1794                 }
1795             } else {
1796                 dbTableDescriptor* table = cursor->table;
1797                 for (int i = 0; i < n; i++) {
1798                     if (db->evaluateBoolean(condition, keyStr[i].oid, table, cursor)) {
1799                         if (!cursor->add(keyStr[i].oid)) {
1800                             return false;
1801                         }
1802                     }
1803                 }
1804             }
1805         }
1806     }
1807     return true;
1808 }
1809 
1810 
traverseBackward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)1811 bool dbBtreePage::traverseBackward(dbDatabase* db, dbAnyCursor* cursor,
1812                                    dbExprNode* condition, int type, int height)
1813 {
1814     int n = nItems;
1815     if (--height != 0) {
1816         if (type == dbField::tpString) { // page of strings
1817             do {
1818                 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[n].oid);
1819                 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
1820                     db->pool.unfix(pg);
1821                     return false;
1822                 }
1823                 db->pool.unfix(pg);
1824             } while (--n >= 0);
1825         } else {
1826             do {
1827                 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-n-1]);
1828                 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
1829                     db->pool.unfix(pg);
1830                     return false;
1831                 }
1832                 db->pool.unfix(pg);
1833             } while (--n >= 0);
1834         }
1835     } else {
1836         if (type != dbField::tpString) { // page of scalars
1837             if (condition == NULL) {
1838                 while (--n >= 0) {
1839                     if (!cursor->add(record[maxItems-1-n])) {
1840                         return false;
1841                     }
1842                 }
1843             } else {
1844                 dbTableDescriptor* table = cursor->table;
1845                 while (--n >= 0) {
1846                     if (db->evaluateBoolean(condition, record[maxItems-1-n], table, cursor)) {
1847                         if (!cursor->add(record[maxItems-1-n])) {
1848                             return false;
1849                         }
1850                     }
1851                 }
1852             }
1853         } else { // page of strings
1854             if (condition == NULL) {
1855                 while (--n >= 0) {
1856                     if (!cursor->add(keyStr[n].oid)) {
1857                         return false;
1858                     }
1859                 }
1860             } else {
1861                 dbTableDescriptor* table = cursor->table;
1862                 while (--n >= 0) {
1863                     if (db->evaluateBoolean(condition, keyStr[n].oid, table, cursor)) {
1864                         if (!cursor->add(keyStr[n].oid)) {
1865                             return false;
1866                         }
1867                     }
1868                 }
1869             }
1870         }
1871     }
1872     return true;
1873 }
1874 
1875 //
1876 // Btree optimized for handling duplicates
1877 //
1878 
//
// FIND(KEY, TYPE): forward range search over one page of the B-tree variant
// optimized for duplicates, expanded once per key type.
//
// The expansion uses names from the enclosing scope (sc, l, r, n, height, db,
// table, ref, maxItems, type, sizeofType, comparator) and the helper macros
// CAST, CHECK, EXTRACT and LESS_OR_EQUAL, which are defined elsewhere.
//
//  1. When sc.firstKey is set, [l, r) is bisected with
//     CHECK(key, entry, firstKeyInclusion) until l == r.
//  2. When sc.lastKey is set:
//     - height == 0: entries l..n-1 are visited in order. The enclosing
//       function returns false as soon as
//       CHECK(entry, key, lastKeyInclusion) holds (presumably: the entry
//       lies beyond the upper bound) or sc.cursor->add() returns false. An
//       entry is added only when sc.condition is null or evaluates to true.
//       After the last entry the function returns true.
//     - otherwise: the child referenced by slot l is searched recursively; a
//       false result is propagated, true is returned once slot n has been
//       searched, and the loop moves to the next slot while
//       LESS_OR_EQUAL(entry l, key) holds (l is post-incremented inside that
//       test). When the test fails the function returns false.
//  3. When sc.lastKey is not set, control reaches the final `break`, which
//     leaves the enclosing switch or loop at the expansion site. The macro
//     ends without a semicolon; the expansion site supplies it.
//
#undef FIND
#define FIND(KEY, TYPE)                                                           \
        if (sc.firstKey != NULL) {                                                \
            TYPE key = CAST(TYPE, sc.firstKey);                                   \
            while (l < r)  {                                                      \
                 int i = (l+r) >> 1;                                              \
                 if (CHECK(key, EXTRACT(KEY,i), firstKeyInclusion)) {             \
                     l = i+1;                                                     \
                 } else {                                                         \
                     r = i;                                                       \
                 }                                                                \
            }                                                                     \
            assert(r == l);                                                       \
        }                                                                         \
        if (sc.lastKey != NULL) {                                                 \
            TYPE key = CAST(TYPE, sc.lastKey);                                    \
            if (height == 0) {                                                    \
                while (l < n) {                                                   \
                    if (CHECK(EXTRACT(KEY,l), key, lastKeyInclusion)) {           \
                        return false;                                             \
                    }                                                             \
                    if (!sc.condition                                             \
                        || db->evaluateBoolean(sc.condition, ref[maxItems-1-l].oid,\
                                        table, sc.cursor))                        \
                    {                                                             \
                        if (!sc.cursor->add(ref[maxItems-1-l].oid)) {              \
                            return false;                                         \
                        }                                                         \
                    }                                                             \
                    l += 1;                                                       \
                }                                                                 \
                return true;                                                      \
            } else {                                                              \
                do {                                                              \
                    dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-l].oid);\
                    if (!pg->find(db, sc, type, sizeofType, comparator, height)) {\
                        db->pool.unfix(pg);                                       \
                        return false;                                             \
                    }                                                             \
                    db->pool.unfix(pg);                                           \
                    if (l == n) {                                                 \
                        return true;                                              \
                    }                                                             \
                } while(LESS_OR_EQUAL(EXTRACT(KEY,l++), key));                    \
                return false;                                                     \
            }                                                                     \
        }                                                                         \
        break
1927 
/*
 * FIND_REVERSE(KEY, TYPE): descending-order search step for one fixed-size key
 * type, expanded inside a `case` of dbThickBtreePage::find().
 *
 * Uses from the expansion site: l, r, height, sc, db, table, type, sizeofType,
 * comparator, firstKeyInclusion, lastKeyInclusion, the page members ref/maxItems
 * and the currently defined CHECK / EXTRACT / CAST helper macros.
 *
 *  1. If sc.lastKey is set, a binary search collapses [l, r) to the boundary
 *     index selected by CHECK(lastKey, key[i], !lastKeyInclusion).
 *  2. If sc.firstKey is set, items (height == 0) or child pages (height != 0)
 *     are visited from index r downwards.  The macro returns false as soon as
 *     CHECK(firstKey, key[r], firstKeyInclusion) holds, cursor->add() fails or
 *     a child search returns false; it returns true when index 0 was passed.
 *  3. Without sc.firstKey the macro leaves the enclosing switch via `break`.
 */
#undef FIND_REVERSE
#define FIND_REVERSE(KEY, TYPE)                                                   \
        if (sc.lastKey != NULL) {                                                 \
            TYPE lastBound = CAST(TYPE, sc.lastKey);                              \
            while (l < r)  {                                                      \
                 const int mid = (l+r) >> 1;                                      \
                 if (CHECK(lastBound, EXTRACT(KEY,mid), !lastKeyInclusion)) {     \
                     l = mid+1;                                                   \
                 } else {                                                         \
                     r = mid;                                                     \
                 }                                                                \
            }                                                                     \
            assert(r == l);                                                       \
        }                                                                         \
        if (sc.firstKey != NULL) {                                                \
            TYPE firstBound = CAST(TYPE, sc.firstKey);                            \
            if (height == 0) {                                                    \
                /* leaf level: walk the items downwards from r-1 */               \
                for (r -= 1; r >= 0; r -= 1) {                                    \
                    if (CHECK(firstBound, EXTRACT(KEY,r), firstKeyInclusion)) {   \
                        return false;                                             \
                    }                                                             \
                    if ((!sc.condition                                            \
                         || db->evaluateBoolean(sc.condition, ref[maxItems-1-r].oid,\
                                         table, sc.cursor))                       \
                        && !sc.cursor->add(ref[maxItems-1-r].oid))                \
                    {                                                             \
                        return false;                                             \
                    }                                                             \
                }                                                                 \
                return true;                                                      \
            }                                                                     \
            /* inner level: search child r, then move on to lower children */     \
            for (;;) {                                                            \
                dbThickBtreePage* child = (dbThickBtreePage*)db->get(ref[maxItems-1-r].oid);\
                const bool goOn = child->find(db, sc, type, sizeofType, comparator, height);\
                db->pool.unfix(child);                                            \
                if (!goOn) {                                                      \
                    return false;                                                 \
                }                                                                 \
                if (--r < 0) {                                                    \
                    return true;                                                  \
                }                                                                 \
                if (CHECK(firstBound, EXTRACT(KEY,r), firstKeyInclusion)) {       \
                    return false;                                                 \
                }                                                                 \
            }                                                                     \
        }                                                                         \
        break
1972 
// Searches the subtree rooted at this page for items inside the key range of
// the search context and adds the matching object ids to sc.cursor.
//
//   db          database used to fetch child pages and to evaluate sc.condition
//   sc          search context: firstKey/lastKey bounds (either may be NULL),
//               their inclusion flags, the scan direction (sc.ascent), an
//               optional filter (sc.condition) and the receiving cursor
//   type        dbField type code of the indexed key
//   sizeofType  key size in bytes, used for tpRawBinary keys
//   comparator  comparison function used for tpRawBinary keys
//   height      height of this page; it is decremented on entry, and a value
//               of 0 after the decrement means the items of this page are
//               passed to the cursor, otherwise they are loaded as child pages
//
// Returns false when the traversal has to stop (sc.cursor->add() returned
// false, a key outside the requested bound was reached, or a child search
// returned false) and true otherwise.
bool dbThickBtreePage::find(dbDatabase* db, dbSearchContext& sc, int type, int sizeofType,
                       dbUDTComparator comparator, int height)
{
    // [l, r) is the index window narrowed by the binary searches; n is the
    // number of items stored in this page.
    int l = 0, n = nItems, r = n;
    // Scratch variable assigned by the raw-binary variant of CHECK.
    int diff;
    dbTableDescriptor* table = sc.cursor->table;
    // Count this page visit.
    sc.probes += 1;
    height -= 1;
    int firstKeyInclusion = sc.firstKeyInclusion;
    int lastKeyInclusion = sc.lastKeyInclusion;

    if (sc.ascent) {
// Helper macros consumed by FIND for scalar keys:
//   CHECK(a, b, inclusion)  true when a > b, or a == b and inclusion is false
//   EXTRACT(array, index)   the index-th key of the page array
//   CAST(TYPE, expr)        reads a TYPE value through the pointer expr
#undef CHECK
#undef EXTRACT
#undef CAST
#undef LESS_OR_EQUAL
#define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
#define EXTRACT(array, index) array[index]
#define CAST(TYPE, expr) *(TYPE*)(expr)
#define LESS_OR_EQUAL(a, b) (a <= b)
        // The visible tail of FIND ends with `break`, so a case either
        // returns from inside the macro or leaves the switch.
        switch (type) {
          case dbField::tpBool:
          case dbField::tpInt1:
            FIND(keyInt1, int1);
          case dbField::tpInt2:
            FIND(keyInt2, int2);
          case dbField::tpInt4:
            FIND(keyInt4, int4);
          case dbField::tpInt8:
            FIND(keyInt8, db_int8);
          case dbField::tpReference:
            FIND(keyOid, oid_t);
          case dbField::tpReal4:
            FIND(keyReal4, real4);
          case dbField::tpReal8:
            FIND(keyReal8, real8);

          case dbField::tpRawBinary:
// Raw binary keys: compare through `comparator`, address keys as byte offsets
// of sizeofType bytes each, and use the key pointer itself instead of a value.
#undef CHECK
#undef EXTRACT
#undef CAST
#undef LESS_OR_EQUAL
#define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
#define EXTRACT(array, index) (array + (index)*sizeofType)
#define CAST(TYPE, expr) (TYPE)(expr)
#define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
            FIND(keyChar, void*);

          case dbField::tpString:
            // String keys are reached through keyStr[i].offs into keyChar.
            if (sc.firstKey != NULL) {
                // Binary search for the first index whose key satisfies the
                // lower bound; the inclusion flag (0 or 1) is the threshold
                // the comparison result is tested against.
                char_t* firstKey = sc.firstKey;
                while (l < r)  {
                    int i = (l+r) >> 1;
                    if (SCMP(firstKey, (char_t*)&keyChar[keyStr[i].offs])
                        >= sc.firstKeyInclusion)
                    {
                        l = i + 1;
                    } else {
                        r = i;
                    }
                }
                assert(r == l);
            }
            if (sc.lastKey != NULL) {
                char_t* lastKey = sc.lastKey;
                if (height == 0) {
                    // Scan items upwards from l until the upper bound is passed.
                    while (l < n) {
                        if (SCMP((char_t*)&keyChar[keyStr[l].offs],
                                   lastKey) >= sc.lastKeyInclusion)
                        {
                            return false;
                        }
                        if (!sc.condition
                            || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
                        {
                            if (!sc.cursor->add(keyStr[l].oid)) {
                                return false;
                            }
                        }
                        l += 1;
                    }
                } else {
                    // Search child l, then continue with the next child while
                    // key l does not exceed lastKey; child n is the last one.
                    do {
                        dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[l].oid);
                        if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                            db->pool.unfix(pg);
                            return false;
                        }
                        db->pool.unfix(pg);
                        if (l == n) {
                            return true;
                        }
                    } while (SCMP((char_t*)&keyChar[keyStr[l++].offs], lastKey) <= 0);
                    return false;
                }
            } else {
                // No upper bound: take everything from index l onwards.
                if (height == 0) {
                    if (sc.condition) {
                        while (l < n) {
                            if (db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor)) {
                                if (!sc.cursor->add(keyStr[l].oid)) {
                                    return false;
                                }
                            }
                            l += 1;
                        }
                    } else {
                        while (l < n) {
                            if (!sc.cursor->add(keyStr[l].oid)) {
                                return false;
                            }
                            l += 1;
                        }
                    }
                } else {
                    // Children l..n are visited (one more child than keys).
                    do {
                        dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[l].oid);
                        if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                            db->pool.unfix(pg);
                            return false;
                        }
                        db->pool.unfix(pg);
                    } while (++l <= n);
                }
            }
            return true;
          default:
            // Unsupported key type; with assertions disabled control continues
            // with the scan below.
            assert(false);
        }
        // Reached when a fixed-size case left the switch through `break`:
        // scan from index l to the end of the page without an upper bound.
        // The reference of item i is stored at ref[maxItems-1-i].
        if (height == 0) {
            if (sc.condition) {
                while (l < n) {
                    if (db->evaluateBoolean(sc.condition, ref[maxItems-1-l].oid, table, sc.cursor)) {
                        if (!sc.cursor->add(ref[maxItems-1-l].oid)) {
                            return false;
                        }
                    }
                    l += 1;
                }
            } else {
                while (l < n) {
                    if (!sc.cursor->add(ref[maxItems-1-l].oid)) {
                        return false;
                    }
                    l += 1;
                }
            }
        } else {
            do {
                dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-l].oid);
                if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                    db->pool.unfix(pg);
                    return false;
                }
                db->pool.unfix(pg);
            } while (++l <= n);
        }
    } else { // descent ordering
// Scalar-key helper macros for FIND_REVERSE (same definitions as for the
// ascending scan above).
#undef CHECK
#undef EXTRACT
#undef CAST
#undef LESS_OR_EQUAL
#define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
#define EXTRACT(array, index) array[index]
#define CAST(TYPE, expr) *(TYPE*)(expr)
#define LESS_OR_EQUAL(a, b) (a <= b)
        // FIND_REVERSE returns when sc.firstKey is set and otherwise leaves
        // the switch through `break`.
        switch (type) {
          case dbField::tpBool:
          case dbField::tpInt1:
            FIND_REVERSE(keyInt1, int1);
          case dbField::tpInt2:
            FIND_REVERSE(keyInt2, int2);
          case dbField::tpInt4:
            FIND_REVERSE(keyInt4, int4);
          case dbField::tpInt8:
            FIND_REVERSE(keyInt8, db_int8);
          case dbField::tpReference:
            FIND_REVERSE(keyOid, oid_t);
          case dbField::tpReal4:
            FIND_REVERSE(keyReal4, real4);
          case dbField::tpReal8:
            FIND_REVERSE(keyReal8, real8);

          case dbField::tpRawBinary:
// Raw binary keys: comparator-based helper macros, as in the ascending scan.
#undef CHECK
#undef EXTRACT
#undef CAST
#undef LESS_OR_EQUAL
#define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
#define EXTRACT(array, index) (array + (index)*sizeofType)
#define CAST(TYPE, expr) (TYPE)(expr)
#define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
            FIND_REVERSE(keyChar, void*);


          case dbField::tpString:
            if (sc.lastKey != NULL) {
                // Binary search for the upper end of the window: with an
                // inclusive bound keys equal to lastKey stay below r.
                char_t* key = sc.lastKey;
                while (l < r)  {
                    int i = (l+r) >> 1;
                    if (SCMP(key, (char_t*)&keyChar[keyStr[i].offs]) >= 1-lastKeyInclusion) {
                        l = i + 1;
                    } else {
                        r = i;
                    }
                }
                assert(r == l);
            }
            if (sc.firstKey != NULL) {
                char_t* key = sc.firstKey;
                if (height == 0) {
                    // Walk items downwards from r-1 until the lower bound is passed.
                    while (--r >= 0) {
                        if (SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) >= firstKeyInclusion) {
                            return false;
                        }
                        if (!sc.condition
                            || db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor))
                        {
                            if (!sc.cursor->add(keyStr[r].oid)) {
                                return false;
                            }
                        }
                    }
                } else {
                    // Search child r, then lower children while their key is
                    // still inside the lower bound; true only if index 0 was passed.
                    do {
                        dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[r].oid);
                        if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                            db->pool.unfix(pg);
                            return false;
                        }
                        db->pool.unfix(pg);
                    } while (--r >= 0 && SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) < firstKeyInclusion);
                    return r < 0;
                }
            } else {
                // No lower bound: take everything below index r.
                if (height == 0) {
                    if (sc.condition) {
                        while (--r >= 0) {
                            if (db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor)) {
                                if (!sc.cursor->add(keyStr[r].oid)) {
                                    return false;
                                }
                            }
                        }
                    } else {
                        while (--r >= 0) {
                            if (!sc.cursor->add(keyStr[r].oid)) {
                                return false;
                            }
                        }
                    }
                } else {
                    // Children r..0 are visited.
                    do {
                        dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[r].oid);
                        if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                            db->pool.unfix(pg);
                            return false;
                        }
                        db->pool.unfix(pg);
                    } while (--r >= 0);
                }
            }
            return true;
          default:
            // Unsupported key type; with assertions disabled control continues
            // with the scan below.
            assert(false);
        }
        // Reached when a fixed-size case left the switch through `break`:
        // scan downwards from index r without a lower bound.
        if (height == 0) {
            if (sc.condition) {
                while (--r >= 0) {
                    if (db->evaluateBoolean(sc.condition, ref[maxItems-1-r].oid, table, sc.cursor)) {
                        if (!sc.cursor->add(ref[maxItems-1-r].oid)) {
                            return false;
                        }
                    }
                }
            } else {
                while (--r >= 0) {
                    if (!sc.cursor->add(ref[maxItems-1-r].oid)) {
                        return false;
                    }
                }
            }
        } else {
            do {
                dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-r].oid);
                if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
                    db->pool.unfix(pg);
                    return false;
                }
                db->pool.unfix(pg);
            } while (--r >= 0);
        }
    }
    return true;
}
2268 
2269 
allocate(dbDatabase * db,oid_t root,int type,int sizeofType,item & ins)2270 oid_t dbThickBtreePage::allocate(dbDatabase* db, oid_t root, int type, int sizeofType, item& ins)
2271 {
2272     oid_t pageId = db->allocatePage();
2273     dbThickBtreePage* page = (dbThickBtreePage*)db->put(pageId);
2274     page->nItems = 1;
2275     if (type == dbField::tpString) {
2276         page->size = ins.keyLen*sizeof(char_t);
2277         page->keyStr[0].offs = (nat2)(sizeof(page->keyChar) - ins.keyLen*sizeof(char_t));
2278         page->keyStr[0].size = ins.keyLen;
2279         page->keyStr[0].oid = ins.oid;
2280         page->keyStr[0].recId = ins.recId;
2281         page->keyStr[1].oid = root;
2282         memcpy(&page->keyChar[page->keyStr[0].offs], ins.keyChar, ins.keyLen*sizeof(char_t));
2283     } else if (type == dbField::tpRawBinary) {
2284         memcpy(page->keyChar, ins.keyChar, sizeofType);
2285         page->ref[maxItems-1].oid = ins.oid;
2286         page->ref[maxItems-1].recId = ins.recId;
2287         page->ref[maxItems-2].oid = root;
2288     } else {
2289         memcpy(page->keyChar, ins.keyChar, keySize[type]);
2290         page->ref[maxItems-1].oid = ins.oid;
2291         page->ref[maxItems-1].recId = ins.recId;
2292         page->ref[maxItems-2].oid = root;
2293     }
2294     db->pool.unfix(page);
2295     return pageId;
2296 }
2297 
2298 
2299 
/*
 * INSERT(KEY, TYPE): complete body of one scalar-key `case` in
 * dbThickBtreePage::insert().  KEY names both the page key array and the field
 * of `ins` holding the new key; TYPE is the C type of one key.
 *
 * Uses from the expansion site: db, pageId, pg, tie, ins, height, result,
 * l, r, n, type, sizeofType, comparator.  Every path ends in a return:
 *   dbBtree::done      the item was stored (here or in a descendant)
 *   dbBtree::overflow  this page was split; `ins` then carries the key, recId
 *                      and page oid that the caller has to insert
 */
#undef INSERT
#define INSERT(KEY, TYPE) {                                                 \
    TYPE probe = ins.KEY;                                                   \
    /* binary search ordered by (key, recId) */                             \
    while (l < r)  {                                                        \
        const int mid = (l+r) >> 1;                                         \
        const bool goRight = probe > pg->KEY[mid]                           \
            || (probe == pg->KEY[mid] && ins.recId > pg->ref[maxItems-mid-1].recId); \
        if (goRight) {                                                      \
            l = mid+1;                                                      \
        } else {                                                            \
            r = mid;                                                        \
        }                                                                   \
    }                                                                       \
    assert(l == r);                                                         \
    /* the new item goes before entry r; descend first on inner pages */    \
    if (--height != 0) {                                                    \
        result = insert(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, ins, height); \
        assert(result == dbBtree::done || result == dbBtree::overflow);     \
        if (result == dbBtree::done) {                                      \
            db->pool.unfix(pg);                                             \
            return result;                                                  \
        }                                                                   \
        n += 1;                                                             \
    }                                                                       \
    /* re-acquire the page for modification */                              \
    db->pool.unfix(pg);                                                     \
    pg = (dbThickBtreePage*)db->put(tie, pageId);                           \
    const int capacity = sizeof(pg->KEY) / (sizeof(reference) + sizeof(TYPE)); \
    if (n < capacity) {                                                     \
        /* room left: open a gap at r in both arrays */                     \
        memmove(&pg->KEY[r+1], &pg->KEY[r], (n - r)*sizeof(TYPE));          \
        memmove(&pg->ref[maxItems-n-1], &pg->ref[maxItems-n],               \
               (n-r)*sizeof(reference));                                    \
        pg->KEY[r] = ins.KEY;                                               \
        pg->ref[maxItems-r-1].oid = ins.oid;                                \
        pg->ref[maxItems-r-1].recId = ins.recId;                            \
        pg->nItems += 1;                                                    \
        return dbBtree::done;                                               \
    }                                                                       \
    /* page is full: the lower `half` entries go to a newly allocated page */ \
    const oid_t siblingId = db->allocatePage();                             \
    dbThickBtreePage* sibling = (dbThickBtreePage*)db->put(siblingId);      \
    assert(n == capacity);                                                  \
    const int half = (capacity+1)/2;                                        \
    if (r < half) {                                                         \
        /* the new item lands in the new page */                            \
        memcpy(sibling->KEY, pg->KEY, r*sizeof(TYPE));                      \
        sibling->KEY[r] = ins.KEY;                                          \
        memcpy(&sibling->KEY[r+1], &pg->KEY[r], (half-r-1)*sizeof(TYPE));   \
        memmove(pg->KEY, &pg->KEY[half-1], (capacity-half+1)*sizeof(TYPE)); \
        memcpy(&sibling->ref[maxItems-r], &pg->ref[maxItems-r],             \
               r*sizeof(reference));                                        \
        sibling->ref[maxItems-r-1].oid = ins.oid;                           \
        sibling->ref[maxItems-r-1].recId = ins.recId;                       \
        memcpy(&sibling->ref[maxItems-half], &pg->ref[maxItems-half+1],     \
               (half-r-1)*sizeof(reference));                               \
        memmove(&pg->ref[maxItems-capacity+half-1], &pg->ref[maxItems-capacity], \
                (capacity-half+1)*sizeof(reference));                       \
    } else {                                                                \
        /* the new item stays in the original page */                       \
        memcpy(sibling->KEY, pg->KEY, half*sizeof(TYPE));                   \
        memmove(pg->KEY, &pg->KEY[half], (r-half)*sizeof(TYPE));            \
        pg->KEY[r-half] = ins.KEY;                                          \
        memmove(&pg->KEY[r-half+1], &pg->KEY[r], (capacity-r)*sizeof(TYPE)); \
        memcpy(&sibling->ref[maxItems-half], &pg->ref[maxItems-half],       \
               half*sizeof(reference));                                     \
        memmove(&pg->ref[maxItems-r+half], &pg->ref[maxItems-r],            \
                (r-half)*sizeof(reference));                                \
        pg->ref[maxItems-r+half-1].oid = ins.oid;                           \
        pg->ref[maxItems-r+half-1].recId = ins.recId;                       \
        memmove(&pg->ref[maxItems-capacity+half-1], &pg->ref[maxItems-capacity], \
                (capacity-r)*sizeof(reference));                            \
    }                                                                       \
    /* hand the last entry of the new page up to the caller */              \
    ins.oid = siblingId;                                                    \
    ins.recId = sibling->ref[maxItems-half].recId;                          \
    ins.KEY = sibling->KEY[half-1];                                         \
    pg->nItems = (height == 0) ? capacity - half + 1 : capacity - half;     \
    sibling->nItems = (height == 0) ? half : half - 1;                      \
    db->pool.unfix(sibling);                                                \
    return dbBtree::overflow;                                               \
}
2380 
2381 
insert(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & ins,int height)2382 int dbThickBtreePage::insert(dbDatabase* db, oid_t pageId, int type, int sizeofType,
2383                              dbUDTComparator comparator, item& ins, int height)
2384 {
2385     dbPutTie tie;
2386     dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
2387     int result;
2388     int l = 0, n = pg->nItems, r = n;
2389 
2390     switch (type) {
2391       case dbField::tpBool:
2392       case dbField::tpInt1:
2393         INSERT(keyInt1, int1);
2394       case dbField::tpInt2:
2395         INSERT(keyInt2, int2);
2396       case dbField::tpInt4:
2397         INSERT(keyInt4, int4);
2398       case dbField::tpInt8:
2399         INSERT(keyInt8, db_int8);
2400       case dbField::tpReference:
2401         INSERT(keyOid, oid_t);
2402       case dbField::tpReal4:
2403         INSERT(keyReal4, real4);
2404       case dbField::tpReal8:
2405         INSERT(keyReal8, real8);
2406       case dbField::tpRawBinary:
2407       {
2408          char* key = (char*)ins.keyChar;
2409          while (l < r)  {
2410              int i = (l+r) >> 1;
2411              int diff = comparator(key, pg->keyChar + i*sizeofType, sizeofType);
2412              if (diff > 0 || (diff == 0 && ins.recId > pg->ref[maxItems-i-1].recId)) {
2413                  l = i+1;
2414              } else {
2415                  r = i;
2416              }
2417          }
2418          assert(l == r);
2419          /* insert before e[r] */
2420          if (--height != 0) {
2421              result = insert(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, ins, height);
2422              assert(result == dbBtree::done || result == dbBtree::overflow);
2423              if (result == dbBtree::done) {
2424                  db->pool.unfix(pg);
2425                  return result;
2426              }
2427              n += 1;
2428          }
2429          db->pool.unfix(pg);
2430          pg = (dbThickBtreePage*)db->put(tie, pageId);
2431          const int max = sizeof(pg->keyChar) / (sizeof(reference) + sizeofType);
2432          if (n < max) {
2433              memmove(pg->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (n - r)*sizeofType);
2434              memmove(&pg->ref[maxItems-n-1], &pg->ref[maxItems-n], (n-r)*sizeof(reference));
2435              memcpy(pg->keyChar + r*sizeofType, ins.keyChar, sizeofType);
2436              pg->ref[maxItems-r-1].oid = ins.oid;
2437              pg->ref[maxItems-r-1].recId = ins.recId;
2438              pg->nItems += 1;
2439              return dbBtree::done;
2440          } else { /* page is full then divide page */
2441              oid_t pageId = db->allocatePage();
2442              dbThickBtreePage* b = (dbThickBtreePage*)db->put(pageId);
2443              assert(n == max);
2444              const int m = (max+1)/2;
2445              if (r < m) {
2446                  memcpy(b->keyChar, pg->keyChar, r*sizeofType);
2447                  memcpy(b->keyChar + r*sizeofType, ins.keyChar, sizeofType);
2448                  memcpy(b->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (m-r-1)*sizeofType);
2449                  memmove(pg->keyChar, pg->keyChar + (m-1)*sizeofType, (max-m+1)*sizeofType);
2450                  memcpy(&b->ref[maxItems-r], &pg->ref[maxItems-r], r*sizeof(reference));
2451                  b->ref[maxItems-r-1].oid = ins.oid;
2452                  b->ref[maxItems-r-1].recId = ins.recId;
2453                  memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m+1], (m-r-1)*sizeof(reference));
2454                  memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], (max-m+1)*sizeof(reference));
2455              } else {
2456                  memcpy(b->keyChar, pg->keyChar, m*sizeofType);
2457                  memmove(pg->keyChar, pg->keyChar + m*sizeofType, (r-m)*sizeofType);
2458                  memcpy(pg->keyChar + (r-m)*sizeofType, ins.keyChar, sizeofType);
2459                  memmove(pg->keyChar + (r-m+1)*sizeofType, pg->keyChar + r*sizeofType,
2460                         (max-r)*sizeofType);
2461                  memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m], m*sizeof(reference));
2462                  memmove(&pg->ref[maxItems-r+m], &pg->ref[maxItems-r], (r-m)*sizeof(reference));
2463                  pg->ref[maxItems-r+m-1].oid = ins.oid;
2464                  pg->ref[maxItems-r+m-1].recId = ins.recId;
2465                  memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], (max-r)*sizeof(reference));
2466              }
2467              ins.oid = pageId;
2468              ins.recId = b->ref[maxItems-m].recId;
2469              memcpy(ins.keyChar, b->keyChar + (m-1)*sizeofType, sizeofType);
2470              if (height == 0) {
2471                  pg->nItems = max - m + 1;
2472                  b->nItems = m;
2473              } else {
2474                  pg->nItems = max - m;
2475                  b->nItems = m - 1;
2476              }
2477              db->pool.unfix(b);
2478              return dbBtree::overflow;
2479          }
2480       }
2481       case dbField::tpString:
2482       {
2483         char_t* key = ins.keyChar;
2484         while (l < r)  {
2485             int i = (l+r) >> 1;
2486             int diff = SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]);
2487             if (diff > 0 || (diff == 0 && ins.recId > pg->keyStr[i].recId)) {
2488                 l = i+1;
2489             } else {
2490                 r = i;
2491             }
2492         }
2493         if (--height != 0) {
2494             result = insert(db, pg->keyStr[r].oid, type, sizeofType, comparator, ins, height);
2495             assert (result != dbBtree::not_found);
2496             if (result != dbBtree::overflow) {
2497                 db->pool.unfix(pg);
2498                 return result;
2499             }
2500         }
2501         db->pool.unfix(pg);
2502         pg = (dbThickBtreePage*)db->put(tie, pageId);
2503         return pg->insertStrKey(db, r, ins, height);
2504       }
2505       default:
2506         assert(false);
2507     }
2508     return dbBtree::done;
2509 }
2510 
// Insert the string key carried by `ins` in front of entry `r` of this page.
//
//   db     - database object; used to allocate and pin a new page when this
//            page has to be split, and for its btreeUnderflowPercent setting
//   r      - index of the entry before which the new key is placed
//   ins    - in:  key text (keyChar/keyLen) plus the oid and recId to store
//            out: after a split, the key/recId handed back to the caller and,
//                 in ins.oid, the id of the newly allocated page
//   height - 0 is treated as a leaf page (nItems entries in keyStr); any other
//            value as an internal page that uses one extra keyStr entry
//
// Returns dbBtree::overflow when the page was split. Otherwise returns
// dbBtree::underflow if the page is empty or filled below
// db->btreeUnderflowPercent, and dbBtree::done in the remaining case.
int dbThickBtreePage::insertStrKey(dbDatabase* db, int r, item& ins, int height)
{
    // Number of keyStr entries currently in use on this page.
    int n = height != 0 ? nItems+1 : nItems;
    // insert before e[r]
    int len = ins.keyLen;
    // The fit test charges both the string bytes (`size`) and the keyStr
    // descriptors against sizeof(keyChar).
    if (size + len*sizeof(char_t) + (n+1)*sizeof(str) <= sizeof(keyChar)) {
        // Enough room: open a gap in the descriptor array and store the key
        // text at offset sizeof(keyChar) - size, i.e. below the strings that
        // are already accounted for in `size`.
        memmove(&keyStr[r+1], &keyStr[r], (n-r)*sizeof(str));
        size += len*sizeof(char_t);
        keyStr[r].offs = (nat2)(sizeof(keyChar) - size);
        keyStr[r].size = len;
        keyStr[r].oid = ins.oid;
        keyStr[r].recId = ins.recId;
        memcpy(&keyChar[sizeof(keyChar) - size], ins.keyChar, len*sizeof(char_t));
        nItems += 1;
    } else { // page is full then divide page
        // The new page `b` receives the leading entries of this page (and the
        // inserted key if its position falls among them); this page keeps the
        // rest.
        oid_t pageId = db->allocatePage();
        dbThickBtreePage* b = (dbThickBtreePage*)db->put(pageId);
        size_t moved = 0;                                  // string bytes already placed in b
        size_t inserted = len*sizeof(char_t) + sizeof(str); // space the new key still costs this page;
                                                            // cleared once the new key is accounted for
        // Most negative starting value so that the first comparison
        // `delta >= -prevDelta` is made against a very large positive bound.
        long prevDelta = (1L << (sizeof(long)*8-1)) + 1;
        // bn counts entries placed in b, i indexes the next source entry here.
        for (int bn = 0, i = 0; ; bn += 1) {
            size_t addSize, subSize; // characters the candidate adds to b / takes from this page
            int j = nItems - i - 1;  // entry count used for the remaining side of the estimate
            size_t keyLen = keyStr[i].size;
            if (bn == r) {
                // The candidate for slot bn is the key being inserted.
                keyLen = len;
                inserted = 0;
                addSize = len;
                if (height == 0) {
                    subSize = 0;
                    j += 1;
                } else {
                    subSize = keyStr[i].size;
                }
            } else {
                // The candidate is existing entry i of this page.
                addSize = subSize = keyLen;
                if (height != 0) {
                    if (i + 1 != r) {
                        subSize += keyStr[i+1].size;
                        j -= 1;
                    } else {
                        inserted = 0;
                    }
                }
            }
            // delta = (space b would use with the candidate) -
            //         (space that would remain in use on this page).
            long delta = (long)((moved + addSize*sizeof(char_t) + (bn+1)*sizeof(str))
                - (j*sizeof(str) + size - subSize*sizeof(char_t) + inserted));
            if (delta >= -prevDelta) {
                // Split point reached: the candidate is not added to b.
                char_t insKey[dbThickBtreePage::dbMaxKeyLen];
                oid_t insRecId = 0;
                if (bn <= r) {
                    // The inserted key has not been stored in b, so save it
                    // before `ins` is overwritten with the key returned to
                    // the caller.
                    memcpy(insKey, ins.keyChar, len*sizeof(char_t));
                    insRecId = ins.recId;
                }
                if (height == 0) {
                    // Leaf: hand back a copy of the last key placed in b.
                    ins.keyLen = b->keyStr[bn-1].size;
                    memcpy(ins.keyChar, &b->keyChar[b->keyStr[bn-1].offs], ins.keyLen*sizeof(char_t));
                    ins.recId = b->keyStr[bn-1].recId;
                } else {
                    assert(((void)"String fits in the B-Tree page",
                            moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
                    if (bn != r) {
                        // Internal page: entry i is handed back to the caller
                        // and only its oid is kept, in b's extra slot bn.
                        ins.keyLen = (int)keyLen;
                        memcpy(ins.keyChar, &keyChar[keyStr[i].offs],
                               keyLen*sizeof(char_t));
                        b->keyStr[bn].oid = keyStr[i].oid;
                        ins.recId = keyStr[i].recId;
                        size -= (int)(keyLen*sizeof(char_t));
                        i += 1;
                    } else {
                        // The inserted key itself is handed back unchanged;
                        // its oid goes into b's extra slot bn.
                        b->keyStr[bn].oid = ins.oid;
                    }
                }
                // Drop the first i entries (now owned by b or handed back)
                // from this page and repack its strings.
                compactify(db, i);
                if (bn < r || (bn == r && height == 0)) {
                    // The insertion point lies in the part kept by this page:
                    // store the saved key at its shifted position r-i.
                    memmove(&keyStr[r-i+1], &keyStr[r-i],
                            (n - r)*sizeof(str));
                    size += len*sizeof(char_t);
                    nItems += 1;
                    assert(((void)"String fits in the B-Tree page",
                            size + (n-i+1)*sizeof(str) <= sizeof(keyChar)));
                    keyStr[r-i].offs = (nat2)(sizeof(keyChar) - size);
                    keyStr[r-i].size = len;
                    keyStr[r-i].oid = ins.oid;
                    keyStr[r-i].recId = insRecId;
                    memcpy(&keyChar[keyStr[r-i].offs], insKey, len*sizeof(char_t));
                }
                b->nItems = bn;
                b->size = (nat4)moved;
                ins.oid = pageId; // caller receives the id of the new page
                db->pool.unfix(b);
                assert(nItems > 0 && b->nItems > 0);
                return dbBtree::overflow;
            }
            // Accept the candidate: append it to b as entry bn.
            prevDelta = delta;
            moved += keyLen*sizeof(char_t);
            assert(((void)"String fits in the B-Tree page",
                    moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
            b->keyStr[bn].size = (nat2)keyLen;
            b->keyStr[bn].offs = (nat2)(sizeof(keyChar) - moved);
            if (bn == r) {
                // The accepted candidate is the inserted key.
                b->keyStr[bn].oid = ins.oid;
                b->keyStr[bn].recId = ins.recId;
                memcpy(&b->keyChar[b->keyStr[bn].offs], ins.keyChar, keyLen*sizeof(char_t));
            } else {
                // The accepted candidate is existing entry i; its bytes are no
                // longer counted in this page's `size`.
                b->keyStr[bn].oid = keyStr[i].oid;
                b->keyStr[bn].recId = keyStr[i].recId;
                memcpy(&b->keyChar[b->keyStr[bn].offs],
                       &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
                size -= (nat4)(keyLen*sizeof(char_t));
                i += 1;
            }
        }
    }
    // No split happened: report whether the page is empty or filled below the
    // configured underflow percentage.
    return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
        ? dbBtree::underflow : dbBtree::done;
}
2628 
compactify(dbDatabase * db,int m)2629 void dbThickBtreePage::compactify(dbDatabase* db, int m)
2630 {
2631     int i, j, offs, len, n = nItems;
2632     // int size[dbPageSize]; // for 8Kb default page size, total size used by these buffers will be 64kb which is too much for many environments
2633     // int index[dbPageSize];
2634     int* size = db->btreeBuf;
2635     int* index = size + dbPageSize;
2636     if (m == 0) {
2637         return;
2638     }
2639     if (m < 0) {
2640         m = -m;
2641         for (i = 0; i < n-m; i++) {
2642             len = keyStr[i].size;
2643             size[keyStr[i].offs + len*sizeof(char_t)] = len;
2644             index[keyStr[i].offs + len*sizeof(char_t)] = i;
2645         }
2646         for (; i < n; i++) {
2647             len = keyStr[i].size;
2648             size[keyStr[i].offs + len*sizeof(char_t)] = len;
2649             index[keyStr[i].offs + len*sizeof(char_t)] = -1;
2650         }
2651     } else {
2652         for (i = 0; i < m; i++) {
2653             len = keyStr[i].size;
2654             size[keyStr[i].offs + len*sizeof(char_t)] = len;
2655             index[keyStr[i].offs + len*sizeof(char_t)] = -1;
2656         }
2657         for (; i < n; i++) {
2658             len = keyStr[i].size;
2659             size[keyStr[i].offs + len*sizeof(char_t)] = len;
2660             index[keyStr[i].offs + len*sizeof(char_t)] = i - m;
2661             keyStr[i-m].oid = keyStr[i].oid;
2662             keyStr[i-m].recId = keyStr[i].recId;
2663             keyStr[i-m].size = len;
2664         }
2665         keyStr[i-m].oid = keyStr[i].oid;
2666         keyStr[i-m].recId = keyStr[i].recId;
2667     }
2668     nItems = n -= m;
2669     for (offs = sizeof(keyChar), i = offs; n != 0; i -= len) {
2670         len = size[i]*sizeof(char_t);
2671         j = index[i];
2672         if (j >= 0) {
2673             offs -= len;
2674             n -= 1;
2675             keyStr[j].offs = offs;
2676             if (offs != i - len) {
2677                 memmove(&keyChar[offs], &keyChar[(i - len)], len);
2678             }
2679         }
2680     }
2681 }
2682 
removeStrKey(dbDatabase * db,int r)2683 int dbThickBtreePage::removeStrKey(dbDatabase* db, int r)
2684 {
2685     int len = keyStr[r].size;
2686     int offs = keyStr[r].offs;
2687     memmove(keyChar + sizeof(keyChar) - size + len*sizeof(char_t),
2688             keyChar + sizeof(keyChar) - size,
2689             size - sizeof(keyChar) + offs);
2690     memmove(&keyStr[r], &keyStr[r+1], (nItems-r)*sizeof(str));
2691     nItems -= 1;
2692     size -= len*sizeof(char_t);
2693     for (int i = nItems; --i >= 0; ) {
2694         if (keyStr[i].offs < offs) {
2695             keyStr[i].offs += (nat2)(len*sizeof(char_t));
2696         }
2697     }
2698     return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
2699         ? dbBtree::underflow : dbBtree::done;
2700 }
2701 
replaceStrKey(dbDatabase * db,int r,item & ins,int height)2702 int dbThickBtreePage::replaceStrKey(dbDatabase* db, int r, item& ins, int height)
2703 {
2704     ins.oid = keyStr[r].oid;
2705     removeStrKey(db, r);
2706     return insertStrKey(db, r, ins, height);
2707 }
2708 
// Repair an underflow in the child referenced by entry `r` of this page.
//
// The underflowed child `a` is combined with a neighbour `b`: the child of
// entry r+1 when r < nItems, otherwise the child of entry r-1. If the content
// of both does not fit into one page, entries are moved from b to a and the
// separating key in this page is rewritten; otherwise b is merged into a, b's
// page is freed and the separating key is removed from this page.
//
//   db         - database object (page access, page release, underflow percent)
//   r          - index in this page of the entry referencing the underflowed child
//   type       - key type; dbField::tpString selects the variable-length layout,
//                any other value the fixed-size layout
//   sizeofType - key size in bytes for the fixed-size layout
//   rem        - scratch item; in the string case it is overwritten with the
//                new separating key passed to replaceStrKey
//   height     - height != 1 is handled as "children are internal pages"
//                (the separating key is pulled down into a)
//
// Returns the status of the update made to this page: dbBtree::done,
// dbBtree::underflow, or whatever replaceStrKey reports in the string case.
int dbThickBtreePage::handlePageUnderflow(dbDatabase* db, int r, int type, int sizeofType, item& rem,
                                          int height)
{
    dbPutTie tie;
    if (type == dbField::tpString) {
        // ---- variable-length string keys ----
        dbThickBtreePage* a = (dbThickBtreePage*)db->put(tie, keyStr[r].oid);
        int an = a->nItems;
        if (r < (int)nItems) { // exists greater page
            dbThickBtreePage* b = (dbThickBtreePage*)db->get(keyStr[r+1].oid);
            int bn = b->nItems;
            // Space a single page would need to hold both children.
            size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
            if (height != 1) {
                // The separating key would move down as well.
                merged_size += keyStr[r].size*sizeof(char_t) + sizeof(str)*2;
            }
            if (merged_size > sizeof(keyChar)) {
                // reallocation of nodes between pages a and b
                int i, j, k;
                dbPutTie tie;
                // b was only fetched for reading; re-fetch it for update.
                db->pool.unfix(b);
                b = (dbThickBtreePage*)db->put(tie, keyStr[r+1].oid);
                size_t size_a = a->size;
                size_t size_b = b->size;
                // addSize: characters a gains with the next step,
                // subSize: characters b gives up with the next step.
                size_t addSize, subSize;
                if (height != 1) {
                    addSize = keyStr[r].size;
                    subSize = b->keyStr[0].size;
                } else {
                    addSize = subSize = b->keyStr[0].size;
                }
                // Choose i, the number of entries b gives up, by watching the
                // difference between the space used by a and by b.
                i = 0;
                long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
                while (true) {
                    i += 1;
                    long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
                         - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
                    if (delta >= 0) {
                        // a would become at least as full as b; take this
                        // step only if it leaves a smaller imbalance.
                        if (delta >= -prevDelta) {
                            i -= 1;
                        }
                        break;
                    }
                    size_a += addSize*sizeof(char_t);
                    size_b -= subSize*sizeof(char_t);
                    prevDelta = delta;
                    if (height != 1) {
                        addSize = subSize;
                        subSize = b->keyStr[i].size;
                    } else {
                        addSize = subSize = b->keyStr[i].size;
                    }
                }
                int result = dbBtree::done;
                if (i > 0) {
                    k = i;
                    if (height != 1) {
                        // Pull the separating key of this page down to the end
                        // of a; b's entry k then supplies the oid for a's
                        // extra slot and its key becomes the new separator.
                        int len = keyStr[r].size;
                        a->size += len*sizeof(char_t);
                        a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
                        a->keyStr[an].size = len;
                        a->keyStr[an].recId = keyStr[r].recId;
                        memcpy(a->keyChar + a->keyStr[an].offs,
                               keyChar + keyStr[r].offs, len*sizeof(char_t));
                        k -= 1;
                        an += 1;
                        a->keyStr[an+k].oid = b->keyStr[k].oid;
                        b->size -= b->keyStr[k].size*sizeof(char_t);
                    }
                    // Append the first k entries of b to a.
                    for (j = 0; j < k; j++) {
                        int len = b->keyStr[j].size;
                        a->size += len*sizeof(char_t);
                        b->size -= len*sizeof(char_t);
                        a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
                        a->keyStr[an].size = len;
                        a->keyStr[an].oid = b->keyStr[j].oid;
                        a->keyStr[an].recId = b->keyStr[j].recId;
                        memcpy(a->keyChar + a->keyStr[an].offs,
                               b->keyChar + b->keyStr[j].offs, len*sizeof(char_t));
                        an += 1;
                    }
                    // b's entry i-1 becomes the new separating key in this page.
                    memcpy(rem.keyChar, b->keyChar + b->keyStr[i-1].offs,
                           b->keyStr[i-1].size*sizeof(char_t));
                    rem.keyLen = b->keyStr[i-1].size;
                    rem.recId =  b->keyStr[i-1].recId;
                    result = replaceStrKey(db, r, rem, height);
                    a->nItems = an;
                    // Drop the i leading entries from b.
                    b->compactify(db, i);
                }
                assert(a->nItems > 0 && b->nItems > 0);
                return result;
            } else { // merge page b to a
                if (height != 1) {
                    // Pull the separating key down to the end of a and take
                    // b's extra-slot oid as the extra-slot oid of the merged page.
                    a->size += (a->keyStr[an].size = keyStr[r].size)*sizeof(char_t);
                    a->keyStr[an].offs = (nat2)(sizeof(keyChar) - a->size);
                    memcpy(&a->keyChar[a->keyStr[an].offs],
                           &keyChar[keyStr[r].offs], keyStr[r].size*sizeof(char_t));
                    a->keyStr[an].recId = keyStr[r].recId;
                    an += 1;
                    a->keyStr[an+bn].oid = b->keyStr[bn].oid;
                }
                // Append b's descriptors; b's string area is copied as one
                // block below a's, so every offset drops by a->size.
                for (int i = 0; i < bn; i++, an++) {
                    a->keyStr[an] = b->keyStr[i];
                    a->keyStr[an].offs -= a->size;
                }
                a->size += b->size;
                a->nItems = an;
                memcpy(a->keyChar + sizeof(keyChar) - a->size,
                       b->keyChar + sizeof(keyChar) - b->size,
                       b->size);
                db->pool.unfix(b);
                db->freePage(keyStr[r+1].oid);
                // Entry r+1 now references the merged page; entry r is removed.
                keyStr[r+1].oid = keyStr[r].oid;
                return removeStrKey(db, r);
            }
        } else { // page b is before a
            dbThickBtreePage* b = (dbThickBtreePage*)db->get(keyStr[r-1].oid);
            int bn = b->nItems;
            size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
            if (height != 1) {
                merged_size += keyStr[r-1].size*sizeof(char_t) + sizeof(str)*2;
            }
            if (merged_size > sizeof(keyChar)) {
                // reallocation of nodes between pages a and b
                dbPutTie tie;
                int i, j, k;
                // b was only fetched for reading; re-fetch it for update.
                db->pool.unfix(b);
                b = (dbThickBtreePage*)db->put(tie, keyStr[r-1].oid);
                size_t size_a = a->size;
                size_t size_b = b->size;
                size_t addSize, subSize;
                if (height != 1) {
                    addSize = keyStr[r-1].size;
                    subSize = b->keyStr[bn-1].size;
                } else {
                    addSize = subSize = b->keyStr[bn-1].size;
                }
                // Same balancing search as in the "greater page" case, but
                // entries are taken from the end of b.
                i = 0;
                long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
                while (true) {
                    i += 1;
                    long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
                         - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
                    if (delta >= 0) {
                        if (delta >= -prevDelta) {
                            i -= 1;
                        }
                        break;
                    }
                    prevDelta = delta;
                    size_a += addSize*sizeof(char_t);
                    size_b -= subSize*sizeof(char_t);
                    if (height != 1) {
                        addSize = subSize;
                        subSize = b->keyStr[bn-i-1].size;
                    } else {
                        addSize = subSize = b->keyStr[bn-i-1].size;
                    }
                }
                int result = dbBtree::done;
                if (i > 0) {
                    k = i;
                    assert(i < bn);
                    if (height != 1) {
                        // Make room for i entries at the front of a (including
                        // a's extra slot), then place the separating key of
                        // this page at index i-1 with b's extra-slot oid.
                        memmove(&a->keyStr[i], a->keyStr, (an+1)*sizeof(str));
                        b->size -= b->keyStr[bn-k].size*sizeof(char_t);
                        k -= 1;
                        a->keyStr[k].oid = b->keyStr[bn].oid;
                        a->keyStr[k].recId = keyStr[r-1].recId;
                        int len = keyStr[r-1].size;
                        a->keyStr[k].size = len;
                        a->size += len*sizeof(char_t);
                        a->keyStr[k].offs = (nat2)(sizeof(keyChar) - a->size);
                        memcpy(&a->keyChar[a->keyStr[k].offs],
                               &keyChar[keyStr[r-1].offs], len*sizeof(char_t));
                    } else {
                        memmove(&a->keyStr[i], a->keyStr, an*sizeof(str));
                    }
                    // Copy the last k entries of b to the front of a.
                    for (j = 0; j < k; j++) {
                        int len = b->keyStr[bn-k+j].size;
                        a->size += len*sizeof(char_t);
                        b->size -= len*sizeof(char_t);
                        a->keyStr[j].offs = (nat2)(sizeof(a->keyChar) - a->size);
                        a->keyStr[j].size = len;
                        a->keyStr[j].oid = b->keyStr[bn-k+j].oid;
                        a->keyStr[j].recId = b->keyStr[bn-k+j].recId;
                        memcpy(a->keyChar + a->keyStr[j].offs,
                               b->keyChar + b->keyStr[bn-k+j].offs, len*sizeof(char_t));
                    }
                    an += i;
                    a->nItems = an;
                    // b's entry bn-k-1 becomes the new separating key.
                    memcpy(rem.keyChar, b->keyChar + b->keyStr[bn-k-1].offs,
                           b->keyStr[bn-k-1].size*sizeof(char_t));
                    rem.keyLen = b->keyStr[bn-k-1].size;
                    rem.recId = b->keyStr[bn-k-1].recId;
                    result = replaceStrKey(db, r-1, rem, height);
                    // Drop the i trailing entries from b.
                    b->compactify(db, -i);
                }
                assert(a->nItems > 0 && b->nItems > 0);
                return result;
            } else { // merge page b to a
                if (height != 1) {
                    // Shift a's entries up by bn+1 and put the separating key
                    // of this page, with b's extra-slot oid, at index bn.
                    memmove(a->keyStr + bn + 1, a->keyStr, (an+1)*sizeof(str));
                    a->size += (a->keyStr[bn].size = keyStr[r-1].size)*sizeof(char_t);
                    a->keyStr[bn].offs = (nat2)(sizeof(keyChar) - a->size);
                    a->keyStr[bn].oid = b->keyStr[bn].oid;
                    a->keyStr[bn].recId = keyStr[r-1].recId;
                    memcpy(&a->keyChar[a->keyStr[bn].offs],
                           &keyChar[keyStr[r-1].offs], keyStr[r-1].size*sizeof(char_t));
                    an += 1;
                } else {
                    memmove(a->keyStr + bn, a->keyStr, an*sizeof(str));
                }
                // b's descriptors go to the front of a; b's string area is
                // copied as one block below a's, so offsets drop by a->size.
                for (int i = 0; i < bn; i++) {
                    a->keyStr[i] = b->keyStr[i];
                    a->keyStr[i].offs -= a->size;
                }
                an += bn;
                a->nItems = an;
                a->size += b->size;
                memcpy(a->keyChar + sizeof(keyChar) - a->size,
                       b->keyChar + sizeof(keyChar) - b->size,
                       b->size);
                db->pool.unfix(b);
                db->freePage(keyStr[r-1].oid);
                // Removing entry r-1 leaves the entry referencing a in its place.
                return removeStrKey(db, r-1);
            }
        }
    } else {
        // ---- fixed-size keys ----
        // Key i is stored at keyChar + i*sizeofType; its reference is stored
        // at ref[maxItems-i-1], i.e. the ref array is indexed from the end.
        dbThickBtreePage* a = (dbThickBtreePage*)db->put(tie, ref[maxItems-r-1].oid);
        int an = a->nItems;
        if (r < int(nItems)) { // exists greater page
            dbThickBtreePage* b = (dbThickBtreePage*)db->get(ref[maxItems-r-2].oid);
            int bn = b->nItems;
            assert(bn >= an);
            if (height != 1) {
                // Pull the separating key of this page down to the end of a;
                // from here on an/bn count reference slots.
                memcpy(a->keyChar + an*sizeofType, keyChar + r*sizeofType,
                       sizeofType);
                a->ref[maxItems-an-1].recId = ref[maxItems-r-1].recId;
                an += 1;
                bn += 1;
            }
            size_t merged_size = (an+bn)*(sizeof(reference) + sizeofType);
            if (merged_size > sizeof(keyChar)) {
                // reallocation of nodes between pages a and b
                // Move i slots so that b keeps (an+bn)/2 of them.
                int i = bn - ((an + bn) >> 1);
                dbPutTie tie;
                db->pool.unfix(b);
                b = (dbThickBtreePage*)db->put(tie, ref[maxItems-r-2].oid);
                // First i keys and references of b go to the end of a, the
                // remainder of b is shifted to the front.
                memcpy(a->keyChar + an*sizeofType, b->keyChar, i*sizeofType);
                memmove(b->keyChar, b->keyChar + i*sizeofType, (bn-i)*sizeofType);
                memcpy(&a->ref[maxItems-an-i], &b->ref[maxItems-i], i*sizeof(reference));
                memmove(&b->ref[maxItems-bn+i], &b->ref[maxItems-bn], (bn-i)*sizeof(reference));
                // The last key moved to a is copied up as the new separator.
                memcpy(keyChar + r*sizeofType,  a->keyChar + (an+i-1)*sizeofType, sizeofType);
                ref[maxItems-r-1].recId = a->ref[maxItems-an-i].recId;
                b->nItems -= i;
                a->nItems += i;
                return dbBtree::done;
            } else { // merge page b to a
                memcpy(a->keyChar + an*sizeofType, b->keyChar, bn*sizeofType);
                memcpy(&a->ref[maxItems-an-bn], &b->ref[maxItems-bn], bn*sizeof(reference));
                db->pool.unfix(b);
                db->freePage(ref[maxItems-r-2].oid);
                // Entry r keeps its oid (page a) but takes over the recId of
                // entry r+1; the entries after r+1 are then shifted down by one.
                ref[maxItems-r-1].recId = ref[maxItems-r-2].recId;
                memmove(&ref[maxItems-nItems], &ref[maxItems-nItems-1],
                        (nItems - r - 1)*sizeof(reference));
                memmove(keyChar + r*sizeofType, keyChar + (r+1)*sizeofType,
                       (nItems - r - 1)*sizeofType);
                a->nItems += bn;
                nItems -= 1;
                return nItems == 0 || (nItems+1)*(sizeofType + sizeof(reference))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
                    ? dbBtree::underflow : dbBtree::done;
            }
        } else { // page b is before a
            dbThickBtreePage* b = (dbThickBtreePage*)db->get(ref[maxItems-r].oid);
            int bn = b->nItems;
            assert(bn >= an);
            if (height != 1) {
                // Count reference slots rather than keys from here on.
                an += 1;
                bn += 1;
            }
            size_t merged_size = (an+bn)*(sizeof(reference) + sizeofType);
            if (merged_size > sizeof(keyChar)) {
                // reallocation of nodes between pages a and b
                // Move i slots so that b keeps (an+bn)/2 of them.
                int i = bn - ((an + bn) >> 1);
                dbPutTie tie;
                db->pool.unfix(b);
                b = (dbThickBtreePage*)db->put(tie, ref[maxItems-r].oid);
                // Open i slots at the front of a and fill them with the last
                // i keys and references of b.
                memmove(a->keyChar + i*sizeofType, a->keyChar, an*sizeofType);
                memcpy(a->keyChar, b->keyChar + (bn-i)*sizeofType, i*sizeofType);
                memmove(&a->ref[maxItems-an-i], &a->ref[maxItems-an], an*sizeof(reference));
                memcpy(&a->ref[maxItems-i], &b->ref[maxItems-bn], i*sizeof(reference));
                if (height != 1) {
                    // Slot i-1 of a receives the old separating key and recId
                    // of this page.
                    memcpy(a->keyChar + (i-1)*sizeofType, keyChar + (r-1)*sizeofType,
                           sizeofType);
                    a->ref[maxItems-i].recId = ref[maxItems-r].recId;
                }
                // b's key bn-i-1 is copied up as the new separator.
                memcpy(keyChar + (r-1)*sizeofType, b->keyChar + (bn-i-1)*sizeofType,
                       sizeofType);
                ref[maxItems-r].recId = b->ref[maxItems-bn+i].recId;
                b->nItems -= i;
                a->nItems += i;
                return dbBtree::done;
            } else { // merge page b to a
                // Shift a's content up by bn slots and copy b in front of it.
                memmove(a->keyChar + bn*sizeofType, a->keyChar, an*sizeofType);
                memcpy(a->keyChar, b->keyChar, bn*sizeofType);
                memmove(&a->ref[maxItems-an-bn], &a->ref[maxItems-an], an*sizeof(reference));
                memcpy(&a->ref[maxItems-bn], &b->ref[maxItems-bn], bn*sizeof(reference));
                if (height != 1) {
                    // Slot bn-1 of a receives the old separating key and recId
                    // of this page.
                    memcpy(a->keyChar + (bn-1)*sizeofType, keyChar + (r-1)*sizeofType,
                           sizeofType);
                    a->ref[maxItems-bn].recId = ref[maxItems-r].recId;
                }
                db->pool.unfix(b);
                db->freePage(ref[maxItems-r].oid);
                // Entry r-1 now references the merged page a.
                ref[maxItems-r].oid = ref[maxItems-r-1].oid;
                a->nItems += bn;
                nItems -= 1;
                return nItems == 0 || (nItems+1)*(sizeofType + sizeof(reference))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
                    ? dbBtree::underflow : dbBtree::done;
            }
        }
    }
}
3031 
// REMOVE(KEY,TYPE) - body of one scalar-key `case` in dbThickBtreePage::remove.
//
// KEY is the page member holding the key array (also the member of `rem` that
// holds the key to delete) and TYPE is its element type. The macro uses these
// names from the expanding function: db, pageId, rem, height, pg, tie, oid,
// recId, i, n, l, r.
//
// It first binary-searches pg->KEY[] for the first entry that is not less
// than (key, recId), with the recId stored in ref[maxItems-i-1] breaking ties
// between equal keys. It then decrements `height`:
//   - if height became 0, it scans forward over entries with an equal key for
//     one whose ref oid equals `oid`. On a match the page is re-fetched for
//     update, the key and its reference are removed, and the macro returns
//     dbBtree::underflow or dbBtree::done. Without a match the page is
//     released and it returns dbBtree::not_found;
//   - otherwise it `break`s out of the enclosing switch with `r` holding the
//     index of the child to descend into.
#undef REMOVE
#define REMOVE(KEY,TYPE) {                                                        \
    TYPE key = rem.KEY;                                                           \
    while (l < r)  {                                                              \
        i = (l+r) >> 1;                                                           \
        if (key > pg->KEY[i] || (key == pg->KEY[i] && recId > pg->ref[maxItems-i-1].recId)) { \
            l = i+1;                                                              \
        } else {                                                                  \
            r = i;                                                                \
        }                                                                         \
    }                                                                             \
    if (--height == 0) {                                                          \
        while (r < n) {                                                           \
            if (key == pg->KEY[r]) {                                              \
                if (pg->ref[maxItems-r-1].oid == oid) {                           \
                    db->pool.unfix(pg);                                           \
                    pg = (dbThickBtreePage*)db->put(tie, pageId);                 \
                    memmove(&pg->KEY[r], &pg->KEY[r+1], (n - r - 1)*sizeof(TYPE)); \
                    memmove(&pg->ref[maxItems-n+1], &pg->ref[maxItems-n],         \
                            (n - r - 1)*sizeof(reference));                       \
                    pg->nItems = --n;                                             \
                    return n == 0 || n*(sizeof(TYPE) + sizeof(reference))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent \
                        ? dbBtree::underflow : dbBtree::done;                     \
                }                                                                 \
            } else {                                                              \
                break;                                                            \
            }                                                                     \
            r += 1;                                                               \
        }                                                                         \
        db->pool.unfix(pg);                                                       \
        return dbBtree::not_found;                                                \
    }                                                                             \
    break;                                                                        \
}
3066 
// Remove the entry described by `rem` from the subtree rooted at `pageId`.
//
//   db         - database object used for page access
//   pageId     - page at the root of the subtree to search
//   type       - key type (dbField::tp*), selects the page layout and comparison
//   sizeofType - key size in bytes, used for dbField::tpRawBinary keys
//   comparator - ordering function used for dbField::tpRawBinary keys
//   rem        - key to remove; rem.oid identifies the entry among equal keys
//                and rem.recId is the tie-breaker in the binary search. It is
//                passed on to handlePageUnderflow / insertStrKey, which may
//                overwrite it.
//   height     - the page is searched as a leaf when this reaches 0 after
//                being decremented here, otherwise the search recurses
//
// Returns dbBtree::done, dbBtree::underflow, dbBtree::not_found, or (string
// keys only) whatever handlePageUnderflow / insertStrKey report, which can be
// dbBtree::overflow.
int dbThickBtreePage::remove(dbDatabase* db, oid_t pageId, int type, int sizeofType,
                             dbUDTComparator comparator, item& rem,     int height)
{
    // The page is fetched read-only first; it is re-fetched through `tie`
    // only on the paths that modify it.
    dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
    dbPutTie tie;
    oid_t oid = rem.oid;
    oid_t recId = rem.recId;
    int i, n = pg->nItems, l = 0, r = n;

    switch (type) {
      // Scalar keys: each REMOVE expansion either returns (leaf level) or
      // breaks out of the switch with `r` set to the child to descend into.
      case dbField::tpBool:
      case dbField::tpInt1:
        REMOVE(keyInt1, int1);
      case dbField::tpInt2:
        REMOVE(keyInt2, int2);
      case dbField::tpInt4:
        REMOVE(keyInt4, int4);
      case dbField::tpInt8:
        REMOVE(keyInt8, db_int8);
      case dbField::tpReference:
        REMOVE(keyOid, oid_t);
      case dbField::tpReal4:
        REMOVE(keyReal4, real4);
      case dbField::tpReal8:
        REMOVE(keyReal8, real8);
      case dbField::tpRawBinary:
      {
        // Fixed-size binary keys ordered by `comparator`.
        char* key = (char*)rem.keyChar;
        while (l < r)  {
            i = (l+r) >> 1;
            int diff = comparator(key, pg->keyChar + i*sizeofType, sizeofType);
            if (diff > 0 || (diff == 0 && recId > pg->ref[maxItems-i-1].recId)) {
                l = i+1;
            } else {
                r = i;
            }
        }
        if (--height == 0) {
            // Leaf: scan the run of equal keys for the entry with this oid.
            // NOTE(review): the search above orders keys with `comparator`,
            // while this scan tests equality with memcmp -- confirm this is
            // intended for comparators that are not byte-wise.
            while (r < n) {
                if (memcmp(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
                    if (pg->ref[maxItems-r-1].oid == oid) {
                        db->pool.unfix(pg);
                        pg = (dbThickBtreePage*)db->put(tie, pageId);
                        // Close the gap in the key array and in the
                        // end-indexed reference array.
                        memmove(pg->keyChar + r*sizeofType, pg->keyChar + (r+1)*sizeofType,
                               (n - r - 1)*sizeofType);
                        memmove(&pg->ref[maxItems-n+1], &pg->ref[maxItems-n],
                                (n - r - 1)*sizeof(reference));
                        pg->nItems = --n;
                        return n == 0 || n*(sizeofType + sizeof(reference))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent
                            ? dbBtree::underflow : dbBtree::done;
                    }
                } else {
                    break;
                }
                r += 1;
            }
            db->pool.unfix(pg);
            return dbBtree::not_found;
        }
        break;
      }
      case dbField::tpString:
      {
        // Variable-length string keys; this case handles both the leaf level
        // and the descent and always returns.
        char_t* key = rem.keyChar;
        while (l < r)  {
            i = (l+r) >> 1;
            int diff = SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]);
            if (diff > 0 || (diff == 0 && recId > pg->keyStr[i].recId)) {
                l = i+1;
            } else {
                r = i;
            }
        }
        if (--height != 0) {
            // Internal page: try child r and, while the child reports
            // not_found, the following children up to index n.
            do {
                switch (remove(db, pg->keyStr[r].oid, type, sizeofType, comparator, rem, height)) {
                  case dbBtree::underflow:
                    db->pool.unfix(pg);
                    pg = (dbThickBtreePage*)db->put(tie, pageId);
                    return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
                  case dbBtree::done:
                    db->pool.unfix(pg);
                    return dbBtree::done;
                  case dbBtree::overflow:
                    // The child reported overflow; insert the item it left in
                    // `rem` into this page.
                    db->pool.unfix(pg);
                    pg = (dbThickBtreePage*)db->put(tie, pageId);
                    return pg->insertStrKey(db, r, rem, height);
                }
            } while (++r <= n);
        } else {
            // Leaf: scan the run of equal keys for the entry with this oid.
            while (r < n) {
                if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
                    if (pg->keyStr[r].oid == oid) {
                        db->pool.unfix(pg);
                        pg = (dbThickBtreePage*)db->put(tie, pageId);
                        return pg->removeStrKey(db, r);
                    }
                } else {
                    break;
                }
                r += 1;
            }
        }
        db->pool.unfix(pg);
        return dbBtree::not_found;
      }
      default:
        assert(false);
    }
    // Internal page with fixed-size keys: try child r and, while the child
    // reports not_found, the following children up to index n.
    do {
        switch (remove(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, rem, height)) {
          case dbBtree::underflow:
            db->pool.unfix(pg);
            pg = (dbThickBtreePage*)db->put(tie, pageId);
            return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
          case dbBtree::done:
            db->pool.unfix(pg);
            return dbBtree::done;
        }
    } while (++r <= n);

    db->pool.unfix(pg);
    return dbBtree::not_found;
}
3191 
3192 
purge(dbDatabase * db,oid_t pageId,int type,int height)3193 void dbThickBtreePage::purge(dbDatabase* db, oid_t pageId, int type, int height)
3194 {
3195     if (--height != 0) {
3196         dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
3197         int n = pg->nItems+1;
3198         if (type == dbField::tpString) { // page of strings
3199             while (--n >= 0) {
3200                 purge(db, pg->keyStr[n].oid, type, height);
3201             }
3202         } else {
3203             while (--n >= 0) {
3204                 purge(db, pg->ref[maxItems-n-1].oid, type, height);
3205             }
3206         }
3207         db->pool.unfix(pg);
3208     }
3209     db->freePage(pageId);
3210 }
3211 
traverseForward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)3212 bool dbThickBtreePage::traverseForward(dbDatabase* db, dbAnyCursor* cursor,
3213                                   dbExprNode* condition, int type, int height)
3214 {
3215     int n = nItems;
3216     if (--height != 0) {
3217         if (type == dbField::tpString) { // page of strings
3218             for (int i = 0; i <= n; i++) {
3219                 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[i].oid);
3220                 if (!pg->traverseForward(db, cursor, condition, type, height)) {
3221                     db->pool.unfix(pg);
3222                     return false;
3223                 }
3224                 db->pool.unfix(pg);
3225             }
3226         } else {
3227             for (int i = 0; i <= n; i++) {
3228                 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-i-1].oid);
3229                 if (!pg->traverseForward(db, cursor, condition, type, height)) {
3230                     db->pool.unfix(pg);
3231                     return false;
3232                 }
3233                 db->pool.unfix(pg);
3234             }
3235         }
3236     } else {
3237         if (type != dbField::tpString) { // page of scalars
3238             if (condition == NULL) {
3239                 for (int i = 0; i < n; i++) {
3240                     if (!cursor->add(ref[maxItems-1-i].oid)) {
3241                         return false;
3242                     }
3243                 }
3244             } else {
3245                 dbTableDescriptor* table = cursor->table;
3246                 for (int i = 0; i < n; i++) {
3247                     if (db->evaluateBoolean(condition, ref[maxItems-1-i].oid, table, cursor)) {
3248                         if (!cursor->add(ref[maxItems-1-i].oid)) {
3249                             return false;
3250                         }
3251                     }
3252                 }
3253             }
3254         } else { // page of strings
3255             if (condition == NULL) {
3256                 for (int i = 0; i < n; i++) {
3257                     if (!cursor->add(keyStr[i].oid)) {
3258                         return false;
3259                     }
3260                 }
3261             } else {
3262                 dbTableDescriptor* table = cursor->table;
3263                 for (int i = 0; i < n; i++) {
3264                     if (db->evaluateBoolean(condition, keyStr[i].oid, table, cursor)) {
3265                         if (!cursor->add(keyStr[i].oid)) {
3266                             return false;
3267                         }
3268                     }
3269                 }
3270             }
3271         }
3272     }
3273     return true;
3274 }
3275 
3276 
traverseBackward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)3277 bool dbThickBtreePage::traverseBackward(dbDatabase* db, dbAnyCursor* cursor,
3278                                         dbExprNode* condition, int type, int height)
3279 {
3280     int n = nItems;
3281     if (--height != 0) {
3282         if (type == dbField::tpString) { // page of strings
3283             do {
3284                 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[n].oid);
3285                 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
3286                     db->pool.unfix(pg);
3287                     return false;
3288                 }
3289                 db->pool.unfix(pg);
3290             } while (--n >= 0);
3291         } else {
3292             do {
3293                 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-n-1].oid);
3294                 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
3295                     db->pool.unfix(pg);
3296                     return false;
3297                 }
3298                 db->pool.unfix(pg);
3299             } while (--n >= 0);
3300         }
3301     } else {
3302         if (type != dbField::tpString) { // page of scalars
3303             if (condition == NULL) {
3304                 while (--n >= 0) {
3305                     if (!cursor->add(ref[maxItems-1-n].oid)) {
3306                         return false;
3307                     }
3308                 }
3309             } else {
3310                 dbTableDescriptor* table = cursor->table;
3311                 while (--n >= 0) {
3312                     if (db->evaluateBoolean(condition, ref[maxItems-1-n].oid, table, cursor)) {
3313                         if (!cursor->add(ref[maxItems-1-n].oid)) {
3314                             return false;
3315                         }
3316                     }
3317                 }
3318             }
3319         } else { // page of strings
3320             if (condition == NULL) {
3321                 while (--n >= 0) {
3322                     if (!cursor->add(keyStr[n].oid)) {
3323                         return false;
3324                     }
3325                 }
3326             } else {
3327                 dbTableDescriptor* table = cursor->table;
3328                 while (--n >= 0) {
3329                     if (db->evaluateBoolean(condition, keyStr[n].oid, table, cursor)) {
3330                         if (!cursor->add(keyStr[n].oid)) {
3331                             return false;
3332                         }
3333                     }
3334                 }
3335             }
3336         }
3337     }
3338     return true;
3339 }
3340 
// Three-way comparison of two bool keys: false sorts before true.
// Returns -1, 0 or 1; the size argument is ignored.
static int boolComparator(void* p, void* q, size_t)
{
    const int lhs = *static_cast<bool*>(p) ? 1 : 0;
    const int rhs = *static_cast<bool*>(q) ? 1 : 0;
    return lhs - rhs;
}
3345 
int1Comparator(void * p,void * q,size_t)3346 static int int1Comparator(void* p, void* q, size_t)
3347 {
3348     return *(int1*)p - *(int1*)q;
3349 }
3350 
int2Comparator(void * p,void * q,size_t)3351 static int int2Comparator(void* p, void* q, size_t)
3352 {
3353     return *(int2*)p - *(int2*)q;
3354 }
3355 
int4Comparator(void * p,void * q,size_t)3356 static int int4Comparator(void* p, void* q, size_t)
3357 {
3358     return *(int4*)p < *(int4*)q ? -1 : *(int4*)p == *(int4*)q ? 0 : 1;
3359 }
3360 
int8Comparator(void * p,void * q,size_t)3361 static int int8Comparator(void* p, void* q, size_t)
3362 {
3363     return *(db_int8*)p < *(db_int8*)q ? -1 : *(db_int8*)p == *(db_int8*)q ? 0 : 1;
3364 }
3365 
real4Comparator(void * p,void * q,size_t)3366 static int real4Comparator(void* p, void* q, size_t)
3367 {
3368     return *(real4*)p < *(real4*)q ? -1 : *(real4*)p == *(real4*)q ? 0 : 1;
3369 }
3370 
real8Comparator(void * p,void * q,size_t)3371 static int real8Comparator(void* p, void* q, size_t)
3372 {
3373     return *(real8*)p < *(real8*)q ? -1 : *(real8*)p == *(real8*)q ? 0 : 1;
3374 }
3375 
stringComparator(void * p,void * q,size_t)3376 static int stringComparator(void* p, void* q, size_t)
3377 {
3378     return SCMP((char_t*)p, (char_t*)q);
3379 }
3380 
3381 
3382 
3383 static dbUDTComparator const scalarComparators[] =
3384 {
3385     &boolComparator,
3386     &int1Comparator,
3387     &int2Comparator,
3388     &int4Comparator,
3389     &int8Comparator,
3390     &real4Comparator,
3391     &real8Comparator,
3392     &stringComparator
3393 };
3394 
init(dbDatabase * db,oid_t treeId,dbSearchContext & sc,dbUDTComparator comparator)3395 void dbBtreeIterator::init(dbDatabase* db, oid_t treeId, dbSearchContext& sc, dbUDTComparator comparator)
3396 {
3397     dbGetTie tie;
3398     dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
3399     type = tree->type;
3400     sizeofType = tree->sizeofType;
3401     this->db = db;
3402     this->sc = sc;
3403     this->treeId = treeId;
3404     this->comparator = (type <= dbField::tpString) ? scalarComparators[type] : comparator;
3405     if (tree->flags & dbBtree::FLAGS_THICK) {
3406         getOid = (type == dbField::tpString) ? &dbBtreeIterator::getStringThickBtreePageOid : &dbBtreeIterator::getScalarThickBtreePageOid;
3407         getKey = (type == dbField::tpString) ? &dbBtreeIterator::getStringThickBtreePageKey : &dbBtreeIterator::getScalarThickBtreePageKey;
3408     } else {
3409         getOid = (type == dbField::tpString) ? &dbBtreeIterator::getStringBtreePageOid : &dbBtreeIterator::getScalarBtreePageOid;
3410         getKey = (type == dbField::tpString) ? &dbBtreeIterator::getStringBtreePageKey : &dbBtreeIterator::getScalarBtreePageKey;
3411     }
3412 }
3413 
3414 
getStringBtreePageOid(byte * pg,int i)3415 oid_t dbBtreeIterator::getStringBtreePageOid(byte* pg, int i)
3416 {
3417     return ((dbBtreePage*)pg)->keyStr[i].oid;
3418 }
3419 
getScalarBtreePageOid(byte * pg,int i)3420 oid_t dbBtreeIterator::getScalarBtreePageOid(byte* pg, int i)
3421 {
3422     return ((dbBtreePage*)pg)->record[dbBtreePage::maxItems-i-1];
3423 }
3424 
getStringThickBtreePageOid(byte * pg,int i)3425 oid_t dbBtreeIterator::getStringThickBtreePageOid(byte* pg, int i)
3426 {
3427     return ((dbThickBtreePage*)pg)->keyStr[i].oid;
3428 }
3429 
getScalarThickBtreePageOid(byte * pg,int i)3430 oid_t dbBtreeIterator::getScalarThickBtreePageOid(byte* pg, int i)
3431 {
3432     return ((dbThickBtreePage*)pg)->ref[dbThickBtreePage::maxItems-i-1].oid;
3433 }
3434 
getStringBtreePageKey(byte * pg,int i)3435 void* dbBtreeIterator::getStringBtreePageKey(byte* pg, int i)
3436 {
3437     return &((dbBtreePage*)pg)->keyChar[((dbBtreePage*)pg)->keyStr[i].offs];
3438 }
3439 
getScalarBtreePageKey(byte * pg,int i)3440 void* dbBtreeIterator::getScalarBtreePageKey(byte* pg, int i)
3441 {
3442     return ((dbBtreePage*)pg)->keyChar + i*sizeofType;
3443 }
3444 
getStringThickBtreePageKey(byte * pg,int i)3445 void* dbBtreeIterator::getStringThickBtreePageKey(byte* pg, int i)
3446 {
3447     return &((dbThickBtreePage*)pg)->keyChar[((dbThickBtreePage*)pg)->keyStr[i].offs];
3448 }
3449 
getScalarThickBtreePageKey(byte * pg,int i)3450 void* dbBtreeIterator::getScalarThickBtreePageKey(byte* pg, int i)
3451 {
3452     return ((dbThickBtreePage*)pg)->keyChar + i*sizeofType;
3453 }
3454 
3455 
3456 
reset(bool ascent)3457 oid_t dbBtreeIterator::reset(bool ascent)
3458 {
3459     oid_t curr = 0;
3460     if (sc.prefixLength != 0 && !ascent) {
3461         oid_t last = reset(true);
3462         while ((curr = next()) != 0) {
3463             last = curr;
3464         }
3465         return last;
3466     }
3467     int i, l, r;
3468     dbGetTie tie;
3469     dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
3470     int h = tree->height;
3471     height = h;
3472     if (h == 0) {
3473         return 0;
3474     }
3475     int sp = 0;
3476     oid_t pageId = tree->root;
3477 
3478     if (ascent) {
3479         char_t* firstKey = sc.firstKey;
3480         if (firstKey == NULL) {
3481             while (--h > 0) {
3482                 posStack[sp] = 0;
3483                 pageStack[sp] = pageId;
3484                 byte* pg = db->get(pageId);
3485                 pageId = (this->*getOid)(pg, 0);
3486                 db->pool.unfix(pg);
3487                 sp += 1;
3488             }
3489             pageStack[sp++] = pageId;
3490             byte* pg = db->get(pageId);
3491             curr = gotoNextItem(pg, -1, true);
3492         } else {
3493             while (--h > 0) {
3494                 pageStack[sp] = pageId;
3495                 byte* pg = db->get(pageId);
3496                 l = 0;
3497                 r = nItems(pg);
3498                 while (l < r)  {
3499                     i = (l+r) >> 1;
3500                     if (comparator(firstKey, (this->*getKey)(pg, i), sizeofType) >= sc.firstKeyInclusion) {
3501                         l = i + 1;
3502                     } else {
3503                         r = i;
3504                     }
3505                 }
3506                 assert(r == l);
3507                 posStack[sp] = r;
3508                 pageId = (this->*getOid)(pg, r);
3509                 db->pool.unfix(pg);
3510                 sp += 1;
3511             }
3512             pageStack[sp++] = pageId;
3513             byte* pg = db->get(pageId);
3514             l = 0;
3515             r = nItems(pg);
3516             while (l < r)  {
3517                 i = (l+r) >> 1;
3518                 if (comparator(firstKey, (this->*getKey)(pg, i), sizeofType) >= sc.firstKeyInclusion) {
3519                     l = i + 1;
3520                 } else {
3521                     r = i;
3522                 }
3523             }
3524             assert(r == l);
3525             curr = gotoNextItem(pg, r-1, true);
3526         }
3527     } else { // descent order
3528         char_t* lastKey = sc.lastKey;
3529         if (lastKey == NULL) {
3530             while (--h > 0) {
3531                 pageStack[sp] = pageId;
3532                 byte* pg = db->get(pageId);
3533                 posStack[sp] = nItems(pg);
3534                 pageId = (this->*getOid)(pg, posStack[sp]);
3535                 db->pool.unfix(pg);
3536                 sp += 1;
3537             }
3538             pageStack[sp++] = pageId;
3539             byte* pg = db->get(pageId);
3540             curr = gotoNextItem(pg, nItems(pg), false);
3541         } else {
3542             while (--h > 0) {
3543                 pageStack[sp] = pageId;
3544                 byte* pg = db->get(pageId);
3545                 l = 0;
3546                 r = nItems(pg);
3547                 while (l < r)  {
3548                     i = (l+r) >> 1;
3549                     if (comparator(lastKey, (this->*getKey)(pg, i), sizeofType) >= 1-sc.lastKeyInclusion) {
3550                         l = i + 1;
3551                     } else {
3552                         r = i;
3553                     }
3554                 }
3555                 assert(r == l);
3556                 posStack[sp] = r;
3557                 pageId = (this->*getOid)(pg, r);
3558                 db->pool.unfix(pg);
3559                 sp += 1;
3560             }
3561             pageStack[sp++] = pageId;
3562             byte* pg = db->get(pageId);
3563             l = 0;
3564             r = nItems(pg);
3565             while (l < r)  {
3566                 i = (l+r) >> 1;
3567                 if (comparator(lastKey, (this->*getKey)(pg, i), sizeofType) >= 1-sc.lastKeyInclusion) {
3568                     l = i + 1;
3569                 } else {
3570                     r = i;
3571                 }
3572             }
3573             curr = gotoNextItem(pg, r, false);
3574         }
3575     }
3576     if (curr == 0) {
3577         height = 0;
3578     }
3579     return curr;
3580 }
3581 
first()3582 oid_t dbBtreeIterator::first()
3583 {
3584     return reset(sc.ascent);
3585 }
3586 
last()3587 oid_t dbBtreeIterator::last()
3588 {
3589     return reset(!sc.ascent);
3590 }
3591 
next()3592 oid_t dbBtreeIterator::next()
3593 {
3594     if (height == 0) {
3595         return 0;
3596     }
3597     byte* pg = db->get(pageStack[height-1]);
3598     return gotoNextItem(pg, posStack[height-1], sc.ascent);
3599 }
3600 
prev()3601 oid_t dbBtreeIterator::prev()
3602 {
3603     if (height == 0) {
3604         return 0;
3605     }
3606     byte* pg = db->get(pageStack[height-1]);
3607     return gotoNextItem(pg, posStack[height-1], !sc.ascent);
3608 }
3609 
// Step from position `pos` on page `pg` in the given direction until an item
// accepted by the search context is found, and return its oid; return 0 when
// the scan runs off the tree or past a key bound.
//
// pg      - page obtained by the caller with db->get(); this page, or whichever
//           page replaces it while moving through the tree, is released with
//           db->pool.unfix() before the function returns
// pos     - position to step from; it is incremented (forward) or decremented
//           (backward) before any item is examined
// forward - true to move towards higher positions, false towards lower ones
//
// pageStack/posStack are updated as the walk proceeds so that a later call can
// continue from the item reached.
oid_t dbBtreeIterator::gotoNextItem(byte* pg, int pos, bool forward)
{
    dbTableDescriptor* table = sc.cursor->table;
    int sp = height;
    oid_t oldPageStack[MAX_BTREE_HEIGHT];
    int   oldPosStack[MAX_BTREE_HEIGHT];

    // Snapshot of both stacks as they were on entry; only the backward branch
    // below copies it back.
    memcpy(oldPageStack, pageStack, sp*sizeof(oid_t));
    memcpy(oldPosStack, posStack, sp*sizeof(int));

    oid_t curr = 0;
    while (true) {
        if (forward) {
            if (++pos == nItems(pg)) {
                // Past the last item of this page: climb until an ancestor has
                // a further child (child positions run 0..nItems inclusive) ...
                while (--sp != 0) {
                    db->pool.unfix(pg);
                    pos = posStack[sp-1];
                    pg = db->get(pageStack[sp-1]);
                    if (++pos <= nItems(pg)) {
                        posStack[sp-1] = pos;
                        // ... then descend through position 0 of every level
                        // until sp is back at `height`.
                        do {
                            oid_t pageId = (this->*getOid)(pg, pos);
                            db->pool.unfix(pg);
                            pg = db->get(pageId);
                            pageStack[sp] = pageId;
                            posStack[sp] = pos = 0;
                        } while (++sp < height);

                        break;
                    }
                }
            } else {
                posStack[sp-1] = pos;
            }
            // Stop when the climb went past the root (sp == 0), when the key
            // fails the sc.lastKey bound, or when it no longer matches the
            // first sc.prefixLength characters of sc.firstKey.
            if (sp == 0
                || (sc.lastKey != NULL
                    && comparator((this->*getKey)(pg, pos), sc.lastKey, sizeofType) >= sc.lastKeyInclusion)
                || (sc.prefixLength != 0 && memcmp((this->*getKey)(pg, pos), sc.firstKey, sc.prefixLength*sizeof(char_t)) != 0))
            {
                // Unlike the backward branch, the stacks are not restored here
                // (the restore is commented out).
//                memcpy(pageStack, oldPageStack, sp*sizeof(oid_t));
//                memcpy(posStack, oldPosStack, sp*sizeof(int));
                break;
            }
        } else { // backward
            if (--pos < 0) {
                // Before the first item of this page: climb until an ancestor
                // has an earlier child ...
                while (--sp != 0) {
                    db->pool.unfix(pg);
                    pos = posStack[sp-1];
                    pg = db->get(pageStack[sp-1]);
                    if (--pos >= 0) {
                        posStack[sp-1] = pos;
                        // ... then descend through position nItems(pg) of
                        // every level until sp is back at `height`.
                        do {
                            oid_t pageId = (this->*getOid)(pg, pos);
                            db->pool.unfix(pg);
                            pg = db->get(pageId);
                            pageStack[sp] = pageId;
                            posStack[sp] = pos = nItems(pg);
                        } while (++sp < height);

                        // On the last page reached the position is moved back
                        // by one, from nItems(pg) to nItems(pg)-1.
                        posStack[sp-1] = --pos;
                        break;
                    }
                }
            } else {
                posStack[sp-1] = pos;
            }
            // Stop when the climb went past the root (sp == 0) or when the key
            // fails the sc.firstKey bound.
            if (sp == 0
                || (sc.firstKey != NULL && comparator(sc.firstKey, (this->*getKey)(pg, pos), sizeofType) >= sc.firstKeyInclusion))
            {
                // Copy back the first sp entries of the entry snapshot; note
                // that sp is the current depth, so nothing is copied when
                // sp == 0.
                memcpy(pageStack, oldPageStack, sp*sizeof(oid_t));
                memcpy(posStack, oldPosStack, sp*sizeof(int));
                break;
            }
        }
        // Accept the item if there is no filter or the filter evaluates to
        // true; otherwise keep stepping in the same direction.
        if (sc.condition == NULL || db->evaluateBoolean(sc.condition, (this->*getOid)(pg, pos), table, sc.cursor)) {
            curr = (this->*getOid)(pg, pos);
            break;
        }
    }
    db->pool.unfix(pg);
    return curr;
}
3692 
3693 
3694 
3695 END_GIGABASE_NAMESPACE
3696