1 //-< BTREE.CPP >-----------------------------------------------------*--------*
2 // GigaBASE Version 1.0 (c) 1999 GARRET * ? *
3 // (Post Relational Database Management System) * /\| *
4 // * / \ *
5 // Created: 1-Jan-99 K.A. Knizhnik * / [] \ *
6 // Last update: 25-Oct-99 K.A. Knizhnik * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // B-Tree implementation
9 //-------------------------------------------------------------------*--------*
10
11 #define INSIDE_GIGABASE
12
13 #include "gigabase.h"
14 #include "btree.h"
15
16
17 BEGIN_GIGABASE_NAMESPACE
18
19 #ifdef USE_LOCALE_SETTINGS
20 #define SCMP(x,y) STRCOLL(x,y)
21 #else
22 #define SCMP(x,y) STRCMP(x,y)
23 #endif
24
25
26 static const int keySize[] = {
27 sizeof(bool), // tpBool
28 sizeof(int1), // tpInt1
29 sizeof(int2), // tpInt2
30 sizeof(int4), // tpInt4
31 sizeof(db_int8), // tpInt8
32 sizeof(real4), // tpReal4
33 sizeof(real8), // tpReal8
34 8, // tpString
35 sizeof(oid_t), // tpReference
36 0, // tpArray,
37 0, // tpMethodBool,
38 0, // tpMethodInt1,
39 0, // tpMethodInt2,
40 0, // tpMethodInt4,
41 0, // tpMethodInt8,
42 0, // tpMethodReal4,
43 0, // tpMethodReal8,
44 0, // tpMethodString,
45 0, // tpMethodReference,
46 0, // tpStructure,
47 0, // tpRawBinary,
48 0, // tpStdString,
49 0, // tpMfcString,
50 0, // tpRectangle,
51 0 // tpUnknown
52 };
53
find(dbDatabase * db,oid_t treeId,dbSearchContext & sc,dbUDTComparator comparator)54 void dbBtree::find(dbDatabase* db, oid_t treeId, dbSearchContext& sc, dbUDTComparator comparator)
55 {
56 dbGetTie tie;
57 dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
58 oid_t rootId = tree->root;
59 int height = tree->height;
60 char_t firstKeyBuf[dbBtreePage::dbMaxKeyLen];
61 char_t lastKeyBuf[dbBtreePage::dbMaxKeyLen];
62 if (tree->type == dbField::tpString) {
63 bool tmpKeys = sc.tmpKeys;
64 if (sc.firstKey != NULL) {
65 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
66 strlower(firstKeyBuf, sc.firstKey);
67 if (tmpKeys) {
68 delete[] sc.firstKey;
69 sc.tmpKeys = false;
70 }
71 sc.firstKey = firstKeyBuf;
72 }
73 }
74 if (sc.lastKey != NULL) {
75 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
76 strlower(lastKeyBuf, sc.lastKey);
77 if (tmpKeys) {
78 delete[] sc.lastKey;
79 sc.tmpKeys = false;
80 }
81 sc.lastKey = lastKeyBuf;
82 }
83 }
84
85 }
86 if (rootId != 0) {
87 byte* page = db->get(rootId);
88 if (tree->flags & FLAGS_THICK) {
89 ((dbThickBtreePage*)page)->find(db, sc, tree->type, tree->sizeofType, comparator, height);
90 } else {
91 ((dbBtreePage*)page)->find(db, sc, tree->type, tree->sizeofType, comparator, height);
92 }
93 db->pool.unfix(page);
94 }
95 }
96
allocate(dbDatabase * db,int type,int sizeofType,int flags)97 oid_t dbBtree::allocate(dbDatabase* db, int type, int sizeofType, int flags)
98 {
99 oid_t oid = db->allocateId();
100 offs_t pos = db->allocate(sizeof(dbBtree));
101 db->setPos(oid, pos | dbModifiedFlag);
102 dbPutTie tie;
103 tie.set(db->pool, oid, pos, sizeof(dbBtree));
104 dbBtree* tree = (dbBtree*)tie.get();
105 tree->size = sizeof(dbBtree);
106 tree->root = 0;
107 tree->height = 0;
108 tree->type = type;
109 tree->sizeofType = sizeofType;
110 tree->flags = (int1)flags;
111 return oid;
112 }
113
insert(dbDatabase * db,oid_t treeId,oid_t recordId,int offs,dbUDTComparator comparator)114 bool dbBtree::insert(dbDatabase* db, oid_t treeId, oid_t recordId, int offs, dbUDTComparator comparator)
115 {
116 dbGetTie tie;
117 byte* p = (byte*)db->getRow(tie, recordId);
118 return insert(db, treeId, recordId, p, offs, comparator);
119 }
120
insert(dbDatabase * db,oid_t treeId,oid_t recordId,byte * p,int offs,dbUDTComparator comparator)121 bool dbBtree::insert(dbDatabase* db, oid_t treeId, oid_t recordId, byte* p, int offs, dbUDTComparator comparator)
122 {
123 dbGetTie treeTie;
124 dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
125 oid_t rootId = tree->root;
126 int height = tree->height;
127
128 if (tree->flags & FLAGS_THICK) {
129 dbThickBtreePage::item ins;
130 if (tree->type == dbField::tpString) {
131 ins.keyLen = ((dbVarying*)(p + offs))->size;
132 assert(ins.keyLen <= dbThickBtreePage::dbMaxKeyLen);
133 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
134 strlower(ins.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
135 } else {
136 memcpy(ins.keyChar, p + ((dbVarying*)(p + offs))->offs, ins.keyLen*sizeof(char_t));
137 }
138 } else if (tree->type == dbField::tpRawBinary) {
139 memcpy(ins.keyChar, p + offs, tree->sizeofType);
140 } else {
141 memcpy(ins.keyChar, p + offs, keySize[tree->type]);
142 }
143 ins.oid = ins.recId = recordId;
144
145 if (rootId == 0) {
146 dbPutTie tie;
147 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
148 t->root = dbThickBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
149 t->height = 1;
150 } else {
151 int result;
152 result = dbThickBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height);
153 assert(result != not_found);
154 if (result == overflow) {
155 dbPutTie tie;
156 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
157 t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
158 t->height += 1;
159 }
160 }
161 } else { // normal Btree
162 bool unique = (tree->flags & FLAGS_UNIQUE) != 0;
163 dbBtreePage::item ins;
164 if (tree->type == dbField::tpString) {
165 ins.keyLen = ((dbVarying*)(p + offs))->size;
166 assert(ins.keyLen <= dbBtreePage::dbMaxKeyLen);
167 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
168 strlower(ins.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
169 } else {
170 memcpy(ins.keyChar, p + ((dbVarying*)(p + offs))->offs, ins.keyLen*sizeof(char_t));
171 }
172 } else if (tree->type == dbField::tpRawBinary) {
173 memcpy(ins.keyChar, p + offs, tree->sizeofType);
174 } else {
175 memcpy(ins.keyChar, p + offs, keySize[tree->type]);
176 }
177 ins.oid = recordId;
178
179 if (rootId == 0) {
180 dbPutTie tie;
181 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
182 t->root = dbBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
183 t->height = 1;
184 } else {
185 int result;
186 result = dbBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height, unique);
187 assert(result != not_found);
188 if (result == overflow) {
189 dbPutTie tie;
190 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
191 t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
192 t->height += 1;
193 } else if (result == not_unique) {
194 return false;
195 }
196 }
197 }
198 return true;
199 }
200
201
insert(dbDatabase * db,oid_t treeId,dbBtreePage::item & ins,dbUDTComparator comparator)202 bool dbBtree::insert(dbDatabase* db, oid_t treeId, dbBtreePage::item& ins, dbUDTComparator comparator)
203 {
204 dbGetTie treeTie;
205 dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
206 oid_t rootId = tree->root;
207 int height = tree->height;
208
209 if (tree->flags & FLAGS_THICK) {
210 dbThickBtreePage::item thickIns;
211 thickIns.oid = thickIns.recId = ins.oid;
212 thickIns.keyLen = ins.keyLen;
213 if (tree->type == dbField::tpString) {
214 memcpy(thickIns.keyChar, ins.keyChar, ins.keyLen*sizeof(char_t));
215 assert(thickIns.keyLen <= dbThickBtreePage::dbMaxKeyLen);
216 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
217 strlower(thickIns.keyChar, thickIns.keyChar);
218 }
219 } else {
220 thickIns.keyInt8 = ins.keyInt8;
221 }
222
223 if (rootId == 0) {
224 dbPutTie tie;
225 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
226 t->root = dbThickBtreePage::allocate(db, 0, tree->type, tree->sizeofType, thickIns);
227 t->height = 1;
228 } else {
229 int result;
230 result = dbThickBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, thickIns, height);
231 assert(result != not_found);
232 if (result == overflow) {
233 dbPutTie tie;
234 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
235 t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, thickIns);
236 t->height += 1;
237 }
238 }
239 } else { // normal Btree
240 bool unique = (tree->flags & FLAGS_UNIQUE) != 0;
241 if (tree->type == dbField::tpString) {
242 assert(ins.keyLen <= dbBtreePage::dbMaxKeyLen);
243 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
244 strlower(ins.keyChar, ins.keyChar);
245 }
246 }
247
248 if (rootId == 0) {
249 dbPutTie tie;
250 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
251 t->root = dbBtreePage::allocate(db, 0, tree->type, tree->sizeofType, ins);
252 t->height = 1;
253 } else {
254 int result;
255 result = dbBtreePage::insert(db, rootId, tree->type, tree->sizeofType, comparator, ins, height, unique);
256 assert(result != not_found);
257 if (result == overflow) {
258 dbPutTie tie;
259 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
260 t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, ins);
261 t->height += 1;
262 } else if (result == not_unique) {
263 return false;
264 }
265 }
266 }
267 return true;
268 }
269
270
remove(dbDatabase * db,oid_t treeId,oid_t recordId,int offs,dbUDTComparator comparator)271 void dbBtree::remove(dbDatabase* db, oid_t treeId, oid_t recordId, int offs, dbUDTComparator comparator)
272 {
273 dbGetTie tie;
274 byte* p = (byte*)db->getRow(tie, recordId);
275 remove(db, treeId, recordId, p, offs, comparator);
276 }
277
remove(dbDatabase * db,oid_t treeId,oid_t recordId,byte * p,int offs,dbUDTComparator comparator)278 void dbBtree::remove(dbDatabase* db, oid_t treeId, oid_t recordId, byte* p, int offs, dbUDTComparator comparator)
279 {
280 dbGetTie treeTie;
281 dbBtree* tree = (dbBtree*)db->getRow(treeTie, treeId);
282 oid_t rootId = tree->root;
283 int height = tree->height;
284
285 if (rootId == 0) {
286 return;
287 }
288 if (tree->flags & FLAGS_THICK) {
289 dbThickBtreePage::item rem;
290 if (tree->type == dbField::tpString) {
291 rem.keyLen = ((dbVarying*)(p + offs))->size;
292 assert(rem.keyLen <= dbThickBtreePage::dbMaxKeyLen);
293 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
294 strlower(rem.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
295 } else {
296 memcpy(rem.keyChar, p + ((dbVarying*)(p + offs))->offs, rem.keyLen*sizeof(char_t));
297 }
298 } else if (tree->type == dbField::tpRawBinary) {
299 memcpy(rem.keyChar, p + offs, tree->sizeofType);
300 } else {
301 memcpy(rem.keyChar, p + offs, keySize[tree->type]);
302 }
303 rem.oid = rem.recId = recordId;
304 int result = dbThickBtreePage::remove(db, rootId, tree->type, tree->sizeofType, comparator, rem, height);
305 assert(result != not_found);
306 if (result == underflow) {
307 dbThickBtreePage* page = (dbThickBtreePage*)db->get(rootId);
308 if (page->nItems == 0) {
309 dbPutTie tie;
310 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
311 if (height != 1) {
312 if (tree->type == dbField::tpString) {
313 t->root = page->keyStr[0].oid;
314 } else {
315 t->root = page->ref[dbThickBtreePage::maxItems-1].oid;
316 }
317 } else {
318 t->root = 0;
319 }
320 t->height -= 1;
321 db->freePage(rootId);
322 }
323 db->pool.unfix(page);
324 } else if (result == dbBtree::overflow) {
325 dbPutTie tie;
326 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
327 t->root = dbThickBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, rem);
328 t->height += 1;
329 }
330 } else { // normal B-Tree
331 dbBtreePage::item rem;
332 if (tree->type == dbField::tpString) {
333 rem.keyLen = ((dbVarying*)(p + offs))->size;
334 assert(rem.keyLen <= dbBtreePage::dbMaxKeyLen);
335 if (tree->flags & FLAGS_CASE_INSENSITIVE) {
336 strlower(rem.keyChar, (char_t*)(p + ((dbVarying*)(p + offs))->offs));
337 } else {
338 memcpy(rem.keyChar, p + ((dbVarying*)(p + offs))->offs, rem.keyLen*sizeof(char_t));
339 }
340 } else if (tree->type == dbField::tpRawBinary) {
341 memcpy(rem.keyChar, p + offs, tree->sizeofType);
342 } else {
343 memcpy(rem.keyChar, p + offs, keySize[tree->type]);
344 }
345 rem.oid = recordId;
346 int result = dbBtreePage::remove(db, rootId, tree->type, tree->sizeofType, comparator, rem, height);
347 assert(result != not_found);
348 if (result == underflow && height != 1) {
349 dbBtreePage* page = (dbBtreePage*)db->get(rootId);
350 if (page->nItems == 0) {
351 dbPutTie tie;
352 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
353 if (tree->type == dbField::tpString) {
354 t->root = page->keyStr[0].oid;
355 } else {
356 t->root = page->record[dbBtreePage::maxItems-1];
357 }
358 t->height -= 1;
359 db->freePage(rootId);
360 }
361 db->pool.unfix(page);
362 } else if (result == dbBtree::overflow) {
363 dbPutTie tie;
364 dbBtree* t = (dbBtree*)db->putRow(tie, treeId);
365 t->root = dbBtreePage::allocate(db, rootId, tree->type, tree->sizeofType, rem);
366 t->height += 1;
367 }
368 }
369 }
370
purge(dbDatabase * db,oid_t treeId)371 void dbBtree::purge(dbDatabase* db, oid_t treeId)
372 {
373 dbPutTie tie;
374 dbBtree* tree = (dbBtree*)db->putRow(tie, treeId);
375 if (tree->root != 0) {
376 if (tree->flags & FLAGS_THICK) {
377 dbThickBtreePage::purge(db, tree->root, tree->type, tree->height);
378 } else {
379 dbBtreePage::purge(db, tree->root, tree->type, tree->height);
380 }
381 tree->root = 0;
382 tree->height = 0;
383 }
384 }
385
drop(dbDatabase * db,oid_t treeId)386 void dbBtree::drop(dbDatabase* db, oid_t treeId)
387 {
388 purge(db, treeId);
389 db->free(db->getPos(treeId) & ~dbFlagsMask, sizeof(dbBtree));
390 db->freeId(treeId);
391 }
392
traverseForward(dbDatabase * db,oid_t treeId,dbAnyCursor * cursor,dbExprNode * condition)393 void dbBtree::traverseForward(dbDatabase* db, oid_t treeId,
394 dbAnyCursor* cursor, dbExprNode* condition)
395 {
396 dbGetTie tie;
397 dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
398 if (tree->root != 0) {
399 byte* page = db->get(tree->root);
400 if (tree->flags & FLAGS_THICK) {
401 ((dbThickBtreePage*)page)->traverseForward(db, cursor, condition, tree->type, tree->height);
402 } else {
403 ((dbBtreePage*)page)->traverseForward(db, cursor, condition, tree->type, tree->height);
404 }
405 db->pool.unfix(page);
406 }
407 }
408
409
traverseBackward(dbDatabase * db,oid_t treeId,dbAnyCursor * cursor,dbExprNode * condition)410 void dbBtree::traverseBackward(dbDatabase* db, oid_t treeId,
411 dbAnyCursor* cursor, dbExprNode* condition)
412 {
413 dbGetTie tie;
414 dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
415 if (tree->root != 0) {
416 byte* page = db->get(tree->root);
417 if (tree->flags & FLAGS_THICK) {
418 ((dbThickBtreePage*)page)->traverseBackward(db, cursor, condition, tree->type, tree->height);
419 } else {
420 ((dbBtreePage*)page)->traverseBackward(db, cursor, condition, tree->type, tree->height);
421 }
422 db->pool.unfix(page);
423 }
424 }
425
426 #define FIND(KEY, TYPE) \
427 if (sc.firstKey != NULL) { \
428 TYPE key = CAST(TYPE, sc.firstKey); \
429 while (l < r) { \
430 int i = (l+r) >> 1; \
431 if (CHECK(key, EXTRACT(KEY,i), firstKeyInclusion)) { \
432 l = i+1; \
433 } else { \
434 r = i; \
435 } \
436 } \
437 assert(r == l); \
438 } \
439 if (sc.lastKey != NULL) { \
440 TYPE key = CAST(TYPE, sc.lastKey); \
441 if (height == 0) { \
442 while (l < n) { \
443 if (CHECK(EXTRACT(KEY,l), key, lastKeyInclusion)) { \
444 return false; \
445 } \
446 if (!sc.condition \
447 || db->evaluateBoolean(sc.condition, record[maxItems-1-l],\
448 table, sc.cursor)) \
449 { \
450 if (!sc.cursor->add(record[maxItems-1-l])) { \
451 return false; \
452 } \
453 } \
454 l += 1; \
455 } \
456 return true; \
457 } else { \
458 do { \
459 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-l]);\
460 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {\
461 db->pool.unfix(pg); \
462 return false; \
463 } \
464 db->pool.unfix(pg); \
465 if (l == n) { \
466 return true; \
467 } \
468 } while(LESS_OR_EQUAL(EXTRACT(KEY,l++), key)); \
469 return false; \
470 } \
471 } \
472 break
473
474 #define FIND_REVERSE(KEY, TYPE) \
475 if (sc.lastKey != NULL) { \
476 TYPE key = CAST(TYPE, sc.lastKey); \
477 while (l < r) { \
478 int i = (l+r) >> 1; \
479 if (CHECK(key, EXTRACT(KEY,i), !lastKeyInclusion)) { \
480 l = i+1; \
481 } else { \
482 r = i; \
483 } \
484 } \
485 assert(r == l); \
486 } \
487 if (sc.firstKey != NULL) { \
488 TYPE key = CAST(TYPE, sc.firstKey); \
489 if (height == 0) { \
490 while (--r >= 0) { \
491 if (CHECK(key, EXTRACT(KEY,r), firstKeyInclusion)) { \
492 return false; \
493 } \
494 if (!sc.condition \
495 || db->evaluateBoolean(sc.condition, record[maxItems-1-r],\
496 table, sc.cursor)) \
497 { \
498 if (!sc.cursor->add(record[maxItems-1-r])) { \
499 return false; \
500 } \
501 } \
502 } \
503 return true; \
504 } else { \
505 do { \
506 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-r]);\
507 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {\
508 db->pool.unfix(pg); \
509 return false; \
510 } \
511 db->pool.unfix(pg); \
512 } while(--r >= 0 && !CHECK(key, EXTRACT(KEY,r), firstKeyInclusion)); \
513 return r < 0; \
514 } \
515 } \
516 break
517
find(dbDatabase * db,dbSearchContext & sc,int type,int sizeofType,dbUDTComparator comparator,int height)518 bool dbBtreePage::find(dbDatabase* db, dbSearchContext& sc, int type, int sizeofType,
519 dbUDTComparator comparator, int height)
520 {
521 int l = 0, n = nItems, r = n;
522 int diff;
523 dbTableDescriptor* table = sc.cursor->table;
524 sc.probes += 1;
525 height -= 1;
526 int firstKeyInclusion = sc.firstKeyInclusion;
527 int lastKeyInclusion = sc.lastKeyInclusion;
528
529 if (sc.ascent) {
530 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
531 #define EXTRACT(array, index) array[index]
532 #define CAST(TYPE, expr) *(TYPE*)(expr)
533 #define LESS_OR_EQUAL(a, b) (a <= b)
534 switch (type) {
535 case dbField::tpBool:
536 case dbField::tpInt1:
537 FIND(keyInt1, int1);
538 case dbField::tpInt2:
539 FIND(keyInt2, int2);
540 case dbField::tpInt4:
541 FIND(keyInt4, int4);
542 case dbField::tpInt8:
543 FIND(keyInt8, db_int8);
544 case dbField::tpReference:
545 FIND(keyOid, oid_t);
546 case dbField::tpReal4:
547 FIND(keyReal4, real4);
548 case dbField::tpReal8:
549 FIND(keyReal8, real8);
550 case dbField::tpRawBinary:
551 #undef CHECK
552 #undef EXTRACT
553 #undef CAST
554 #undef LESS_OR_EQUAL
555 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
556 #define EXTRACT(array, index) (array + (index)*sizeofType)
557 #define CAST(TYPE, expr) (TYPE)(expr)
558 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
559 FIND(keyChar, void*);
560
561 case dbField::tpString:
562 if (sc.firstKey != NULL) {
563 char_t* firstKey = sc.firstKey;
564 while (l < r) {
565 int i = (l+r) >> 1;
566 if (SCMP(firstKey, (char_t*)&keyChar[keyStr[i].offs])
567 >= sc.firstKeyInclusion)
568 {
569 l = i + 1;
570 } else {
571 r = i;
572 }
573 }
574 assert(r == l);
575 }
576 if (sc.lastKey != NULL) {
577 char_t* lastKey = sc.lastKey;
578 if (height == 0) {
579 while (l < n) {
580 if (SCMP((char_t*)&keyChar[keyStr[l].offs],
581 lastKey) >= sc.lastKeyInclusion)
582 {
583 return false;
584 }
585 if (!sc.condition
586 || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
587 {
588 if (!sc.cursor->add(keyStr[l].oid)) {
589 return false;
590 }
591 }
592 l += 1;
593 }
594 } else {
595 do {
596 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
597 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
598 db->pool.unfix(pg);
599 return false;
600 }
601 db->pool.unfix(pg);
602 if (l == n) {
603 return true;
604 }
605 } while (SCMP((char_t*)&keyChar[keyStr[l++].offs], lastKey) <= 0);
606 return false;
607 }
608 } else if (sc.prefixLength != 0) {
609 char_t* prefix = sc.firstKey;
610 if (height == 0) {
611 while (l < n) {
612 if (memcmp(&keyChar[keyStr[l].offs], prefix, sc.prefixLength*sizeof(char_t)) != 0) {
613 return false;
614 }
615 if (!sc.condition
616 || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
617 {
618 if (!sc.cursor->add(keyStr[l].oid)) {
619 return false;
620 }
621 }
622 l += 1;
623 }
624 } else {
625 do {
626 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
627 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
628 db->pool.unfix(pg);
629 return false;
630 }
631 db->pool.unfix(pg);
632 if (l == n) {
633 return true;
634 }
635 } while (memcmp(&keyChar[keyStr[l++].offs], prefix, sc.prefixLength*sizeof(char_t)) == 0);
636 return false;
637 }
638 } else {
639 if (height == 0) {
640 if (sc.condition) {
641 while (l < n) {
642 if (db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor)) {
643 if (!sc.cursor->add(keyStr[l].oid)) {
644 return false;
645 }
646 }
647 l += 1;
648 }
649 } else {
650 while (l < n) {
651 if (!sc.cursor->add(keyStr[l].oid)) {
652 return false;
653 }
654 l += 1;
655 }
656 }
657 } else {
658 do {
659 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[l].oid);
660 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
661 db->pool.unfix(pg);
662 return false;
663 }
664 db->pool.unfix(pg);
665 } while (++l <= n);
666 }
667 }
668 return true;
669 default:
670 assert(false);
671 }
672 if (height == 0) {
673 if (sc.condition) {
674 while (l < n) {
675 if (db->evaluateBoolean(sc.condition, record[maxItems-1-l], table, sc.cursor)) {
676 if (!sc.cursor->add(record[maxItems-1-l])) {
677 return false;
678 }
679 }
680 l += 1;
681 }
682 } else {
683 while (l < n) {
684 if (!sc.cursor->add(record[maxItems-1-l])) {
685 return false;
686 }
687 l += 1;
688 }
689 }
690 } else {
691 do {
692 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-l]);
693 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
694 db->pool.unfix(pg);
695 return false;
696 }
697 db->pool.unfix(pg);
698 } while (++l <= n);
699 }
700 } else { // descent ordering
701 #undef CHECK
702 #undef EXTRACT
703 #undef CAST
704 #undef LESS_OR_EQUAL
705 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
706 #define EXTRACT(array, index) array[index]
707 #define CAST(TYPE, expr) *(TYPE*)(expr)
708 #define LESS_OR_EQUAL(a, b) (a <= b)
709 switch (type) {
710 case dbField::tpBool:
711 case dbField::tpInt1:
712 FIND_REVERSE(keyInt1, int1);
713 case dbField::tpInt2:
714 FIND_REVERSE(keyInt2, int2);
715 case dbField::tpInt4:
716 FIND_REVERSE(keyInt4, int4);
717 case dbField::tpInt8:
718 FIND_REVERSE(keyInt8, db_int8);
719 case dbField::tpReference:
720 FIND_REVERSE(keyOid, oid_t);
721 case dbField::tpReal4:
722 FIND_REVERSE(keyReal4, real4);
723 case dbField::tpReal8:
724 FIND_REVERSE(keyReal8, real8);
725
726 case dbField::tpRawBinary:
727 #undef CHECK
728 #undef EXTRACT
729 #undef CAST
730 #undef LESS_OR_EQUAL
731 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
732 #define EXTRACT(array, index) (array + (index)*sizeofType)
733 #define CAST(TYPE, expr) (TYPE)(expr)
734 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
735 FIND_REVERSE(keyChar, void*);
736
737
738 case dbField::tpString:
739 if (sc.lastKey != NULL) {
740 char_t* key = sc.lastKey;
741 while (l < r) {
742 int i = (l+r) >> 1;
743 if (SCMP(key, (char_t*)&keyChar[keyStr[i].offs]) >= 1-lastKeyInclusion) {
744 l = i + 1;
745 } else {
746 r = i;
747 }
748 }
749 assert(r == l);
750 }
751 if (sc.firstKey != NULL) {
752 char_t* key = sc.firstKey;
753 if (height == 0) {
754 while (--r >= 0) {
755 if (SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) >= firstKeyInclusion) {
756 return false;
757 }
758 if (!sc.condition
759 || db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor))
760 {
761 if (!sc.cursor->add(keyStr[r].oid)) {
762 return false;
763 }
764 }
765 }
766 } else {
767 do {
768 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[r].oid);
769 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
770 db->pool.unfix(pg);
771 return false;
772 }
773 db->pool.unfix(pg);
774 } while (--r >= 0 && SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) < firstKeyInclusion);
775 return r < 0;
776 }
777 } else {
778 if (height == 0) {
779 if (sc.condition) {
780 while (--r >= 0) {
781 if (db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor)) {
782 if (!sc.cursor->add(keyStr[r].oid)) {
783 return false;
784 }
785 }
786 }
787 } else {
788 while (--r >= 0) {
789 if (!sc.cursor->add(keyStr[r].oid)) {
790 return false;
791 }
792 }
793 }
794 } else {
795 do {
796 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[r].oid);
797 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
798 db->pool.unfix(pg);
799 return false;
800 }
801 db->pool.unfix(pg);
802 } while (--r >= 0);
803 }
804 }
805 return true;
806 default:
807 assert(false);
808 }
809 if (height == 0) {
810 if (sc.condition) {
811 while (--r >= 0) {
812 if (db->evaluateBoolean(sc.condition, record[maxItems-1-r], table, sc.cursor)) {
813 if (!sc.cursor->add(record[maxItems-1-r])) {
814 return false;
815 }
816 }
817 }
818 } else {
819 while (--r >= 0) {
820 if (!sc.cursor->add(record[maxItems-1-r])) {
821 return false;
822 }
823 }
824 }
825 } else {
826 do {
827 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-1-r]);
828 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
829 db->pool.unfix(pg);
830 return false;
831 }
832 db->pool.unfix(pg);
833 } while (--r >= 0);
834 }
835 }
836 return true;
837 }
838
839
allocate(dbDatabase * db,oid_t root,int type,int sizeofType,item & ins)840 oid_t dbBtreePage::allocate(dbDatabase* db, oid_t root, int type, int sizeofType, item& ins)
841 {
842 oid_t pageId = db->allocatePage();
843 dbBtreePage* page = (dbBtreePage*)db->put(pageId);
844 page->nItems = 1;
845 if (type == dbField::tpString) {
846 page->size = ins.keyLen*sizeof(char_t);
847 page->keyStr[0].offs = (nat2)(sizeof(page->keyChar) - ins.keyLen*sizeof(char_t));
848 page->keyStr[0].size = ins.keyLen;
849 page->keyStr[0].oid = ins.oid;
850 page->keyStr[1].oid = root;
851 memcpy(&page->keyChar[page->keyStr[0].offs], ins.keyChar, ins.keyLen*sizeof(char_t));
852 } else if (type == dbField::tpRawBinary) {
853 memcpy(page->keyChar, ins.keyChar, sizeofType);
854 page->record[maxItems-1] = ins.oid;
855 page->record[maxItems-2] = root;
856 } else {
857 memcpy(page->keyChar, ins.keyChar, keySize[type]);
858 page->record[maxItems-1] = ins.oid;
859 page->record[maxItems-2] = root;
860 }
861 db->pool.unfix(page);
862 return pageId;
863 }
864
865
866 #define INSERT(KEY, TYPE) { \
867 TYPE key = ins.KEY; \
868 while (l < r) { \
869 int i = (l+r) >> 1; \
870 if (key > pg->KEY[i]) l = i+1; else r = i; \
871 } \
872 assert(l == r); \
873 /* insert before e[r] */ \
874 if (--height != 0) { \
875 result = insert(db, pg->record[maxItems-r-1], type, sizeofType, comparator, ins, height, unique); \
876 assert(result == dbBtree::not_unique || result == dbBtree::done || result == dbBtree::overflow); \
877 if (result != dbBtree::overflow) { \
878 db->pool.unfix(pg); \
879 return result; \
880 } \
881 n += 1; \
882 } else if (unique && r < n && key == pg->KEY[r]) { \
883 db->pool.unfix(pg); \
884 return dbBtree::not_unique; \
885 } \
886 db->pool.unfix(pg); \
887 pg = (dbBtreePage*)db->put(tie, pageId); \
888 const int max = sizeof(pg->KEY) / (sizeof(oid_t) + sizeof(TYPE)); \
889 if (n < max) { \
890 memmove(&pg->KEY[r+1], &pg->KEY[r], (n - r)*sizeof(TYPE)); \
891 memmove(&pg->record[maxItems-n-1], &pg->record[maxItems-n], \
892 (n-r)*sizeof(oid_t)); \
893 pg->KEY[r] = ins.KEY; \
894 pg->record[maxItems-r-1] = ins.oid; \
895 pg->nItems += 1; \
896 return dbBtree::done; \
897 } else { /* page is full then divide page */ \
898 oid_t pageId = db->allocatePage(); \
899 dbBtreePage* b = (dbBtreePage*)db->put(pageId); \
900 assert(n == max); \
901 const int m = (max+1)/2; \
902 if (r < m) { \
903 memcpy(b->KEY, pg->KEY, r*sizeof(TYPE)); \
904 b->KEY[r] = ins.KEY; \
905 memcpy(&b->KEY[r+1], &pg->KEY[r], (m-r-1)*sizeof(TYPE)); \
906 memmove(pg->KEY, &pg->KEY[m-1], (max-m+1)*sizeof(TYPE)); \
907 memcpy(&b->record[maxItems-r], &pg->record[maxItems-r], \
908 r*sizeof(oid_t)); \
909 b->record[maxItems-r-1] = ins.oid; \
910 memcpy(&b->record[maxItems-m], &pg->record[maxItems-m+1], \
911 (m-r-1)*sizeof(oid_t)); \
912 memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],\
913 (max-m+1)*sizeof(oid_t)); \
914 } else { \
915 memcpy(b->KEY, pg->KEY, m*sizeof(TYPE)); \
916 memmove(pg->KEY, &pg->KEY[m], (r-m)*sizeof(TYPE)); \
917 pg->KEY[r-m] = ins.KEY; \
918 memmove(&pg->KEY[r-m+1], &pg->KEY[r], (max-r)*sizeof(TYPE)); \
919 memcpy(&b->record[maxItems-m], &pg->record[maxItems-m], \
920 m*sizeof(oid_t)); \
921 memmove(&pg->record[maxItems-r+m], &pg->record[maxItems-r], \
922 (r-m)*sizeof(oid_t)); \
923 pg->record[maxItems-r+m-1] = ins.oid; \
924 memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],\
925 (max-r)*sizeof(oid_t)); \
926 } \
927 ins.oid = pageId; \
928 ins.KEY = b->KEY[m-1]; \
929 if (height == 0) { \
930 pg->nItems = max - m + 1; \
931 b->nItems = m; \
932 } else { \
933 pg->nItems = max - m; \
934 b->nItems = m - 1; \
935 } \
936 db->pool.unfix(b); \
937 return dbBtree::overflow; \
938 } \
939 }
940
941
insert(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & ins,int height,bool unique)942 int dbBtreePage::insert(dbDatabase* db, oid_t pageId, int type, int sizeofType,
943 dbUDTComparator comparator, item& ins, int height, bool unique)
944 {
945 dbPutTie tie;
946 dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
947 int result;
948 int l = 0, n = pg->nItems, r = n;
949
950 switch (type) {
951 case dbField::tpBool:
952 case dbField::tpInt1:
953 INSERT(keyInt1, int1);
954 case dbField::tpInt2:
955 INSERT(keyInt2, int2);
956 case dbField::tpInt4:
957 INSERT(keyInt4, int4);
958 case dbField::tpInt8:
959 INSERT(keyInt8, db_int8);
960 case dbField::tpReference:
961 INSERT(keyOid, oid_t);
962 case dbField::tpReal4:
963 INSERT(keyReal4, real4);
964 case dbField::tpReal8:
965 INSERT(keyReal8, real8);
966 case dbField::tpRawBinary:
967 {
968 char* key = (char*)ins.keyChar;
969 while (l < r) {
970 int i = (l+r) >> 1;
971 if (comparator(key, pg->keyChar + i*sizeofType, sizeofType) > 0) l = i+1; else r = i;
972 }
973 assert(l == r);
974 /* insert before e[r] */
975 if (--height != 0) {
976 result = insert(db, pg->record[maxItems-r-1], type, sizeofType, comparator, ins, height, unique);
977 assert(result == dbBtree::not_unique || result == dbBtree::done || result == dbBtree::overflow);
978 if (result != dbBtree::overflow) {
979 db->pool.unfix(pg);
980 return result;
981 }
982 n += 1;
983 } else if (unique && r < n && comparator(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
984 db->pool.unfix(pg);
985 return dbBtree::not_unique;
986 }
987 db->pool.unfix(pg);
988 pg = (dbBtreePage*)db->put(tie, pageId);
989 const int max = sizeof(pg->keyChar) / (sizeof(oid_t) + sizeofType);
990 if (n < max) {
991 memmove(pg->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (n - r)*sizeofType);
992 memmove(&pg->record[maxItems-n-1], &pg->record[maxItems-n],
993 (n-r)*sizeof(oid_t));
994 memcpy(pg->keyChar + r*sizeofType, ins.keyChar, sizeofType);
995 pg->record[maxItems-r-1] = ins.oid;
996 pg->nItems += 1;
997 return dbBtree::done;
998 } else { /* page is full then divide page */
999 oid_t pageId = db->allocatePage();
1000 dbBtreePage* b = (dbBtreePage*)db->put(pageId);
1001 assert(n == max);
1002 const int m = (max+1)/2;
1003 if (r < m) {
1004 memcpy(b->keyChar, pg->keyChar, r*sizeofType);
1005 memcpy(b->keyChar + r*sizeofType, ins.keyChar, sizeofType);
1006 memcpy(b->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (m-r-1)*sizeofType);
1007 memmove(pg->keyChar, pg->keyChar + (m-1)*sizeofType, (max-m+1)*sizeofType);
1008 memcpy(&b->record[maxItems-r], &pg->record[maxItems-r],
1009 r*sizeof(oid_t));
1010 b->record[maxItems-r-1] = ins.oid;
1011 memcpy(&b->record[maxItems-m], &pg->record[maxItems-m+1],
1012 (m-r-1)*sizeof(oid_t));
1013 memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],
1014 (max-m+1)*sizeof(oid_t));
1015 } else {
1016 memcpy(b->keyChar, pg->keyChar, m*sizeofType);
1017 memmove(pg->keyChar, pg->keyChar + m*sizeofType, (r-m)*sizeofType);
1018 memcpy(pg->keyChar + (r-m)*sizeofType, ins.keyChar, sizeofType);
1019 memmove(pg->keyChar + (r-m+1)*sizeofType, pg->keyChar + r*sizeofType,
1020 (max-r)*sizeofType);
1021 memcpy(&b->record[maxItems-m], &pg->record[maxItems-m],
1022 m*sizeof(oid_t));
1023 memmove(&pg->record[maxItems-r+m], &pg->record[maxItems-r],
1024 (r-m)*sizeof(oid_t));
1025 pg->record[maxItems-r+m-1] = ins.oid;
1026 memmove(&pg->record[maxItems-max+m-1],&pg->record[maxItems-max],
1027 (max-r)*sizeof(oid_t));
1028 }
1029 ins.oid = pageId;
1030 memcpy(ins.keyChar, b->keyChar + (m-1)*sizeofType, sizeofType);
1031 if (height == 0) {
1032 pg->nItems = max - m + 1;
1033 b->nItems = m;
1034 } else {
1035 pg->nItems = max - m;
1036 b->nItems = m - 1;
1037 }
1038 db->pool.unfix(b);
1039 return dbBtree::overflow;
1040 }
1041 }
1042 case dbField::tpString:
1043 {
1044 char_t* key = ins.keyChar;
1045 while (l < r) {
1046 int i = (l+r) >> 1;
1047 if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]) > 0) {
1048 l = i+1;
1049 } else {
1050 r = i;
1051 }
1052 }
1053 if (--height != 0) {
1054 result = insert(db, pg->keyStr[r].oid, type, sizeofType, comparator, ins, height, unique);
1055 assert (result != dbBtree::not_found);
1056 if (result != dbBtree::overflow) {
1057 db->pool.unfix(pg);
1058 return result;
1059 }
1060 } else if (unique && r < n && SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
1061 db->pool.unfix(pg);
1062 return dbBtree::not_unique;
1063 }
1064 db->pool.unfix(pg);
1065 pg = (dbBtreePage*)db->put(tie, pageId);
1066 return pg->insertStrKey(db, r, ins, height);
1067 }
1068 default:
1069 assert(false);
1070 }
1071 return dbBtree::done;
1072 }
1073
insertStrKey(dbDatabase * db,int r,item & ins,int height)1074 int dbBtreePage::insertStrKey(dbDatabase* db, int r, item& ins, int height)
1075 {
1076 int n = height != 0 ? nItems+1 : nItems;
1077 // insert before e[r]
1078 int len = ins.keyLen;
1079 if (size + len*sizeof(char_t) + (n+1)*sizeof(str) <= sizeof(keyChar)) {
1080 memmove(&keyStr[r+1], &keyStr[r], (n-r)*sizeof(str));
1081 size += len*sizeof(char_t);
1082 keyStr[r].offs = nat2(sizeof(keyChar) - size);
1083 keyStr[r].size = len;
1084 keyStr[r].oid = ins.oid;
1085 memcpy(&keyChar[sizeof(keyChar) - size], ins.keyChar, len*sizeof(char_t));
1086 nItems += 1;
1087 } else { // page is full then divide page
1088 oid_t pageId = db->allocatePage();
1089 dbBtreePage* b = (dbBtreePage*)db->put(pageId);
1090 size_t moved = 0;
1091 size_t inserted = len*sizeof(char_t) + sizeof(str);
1092 long prevDelta = (1L << (sizeof(long)*8-1)) + 1;
1093 for (int bn = 0, i = 0; ; bn += 1) {
1094 size_t addSize, subSize;
1095 int j = nItems - i - 1;
1096 size_t keyLen = keyStr[i].size;
1097 if (bn == r) {
1098 keyLen = len;
1099 inserted = 0;
1100 addSize = len;
1101 if (height == 0) {
1102 subSize = 0;
1103 j += 1;
1104 } else {
1105 subSize = keyStr[i].size;
1106 }
1107 } else {
1108 addSize = subSize = keyLen;
1109 if (height != 0) {
1110 if (i + 1 != r) {
1111 subSize += keyStr[i+1].size;
1112 j -= 1;
1113 } else {
1114 inserted = 0;
1115 }
1116 }
1117 }
1118 long delta = (long)((moved + addSize*sizeof(char_t) + (bn+1)*sizeof(str))
1119 - (j*sizeof(str) + size - subSize*sizeof(char_t) + inserted));
1120 if (delta >= -prevDelta) {
1121 char_t insKey[dbBtreePage::dbMaxKeyLen];
1122 if (bn <= r) {
1123 memcpy(insKey, ins.keyChar, len*sizeof(char_t));
1124 }
1125 if (height == 0) {
1126 ins.keyLen = b->keyStr[bn-1].size;
1127 memcpy(ins.keyChar, &b->keyChar[b->keyStr[bn-1].offs], ins.keyLen*sizeof(char_t));
1128 } else {
1129 assert(((void)"String fits in the B-Tree page",
1130 moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
1131 if (bn != r) {
1132 ins.keyLen = (int)keyLen;
1133 memcpy(ins.keyChar, &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
1134 b->keyStr[bn].oid = keyStr[i].oid;
1135 size -= (nat4)(keyLen*sizeof(char_t));
1136 i += 1;
1137 } else {
1138 b->keyStr[bn].oid = ins.oid;
1139 }
1140 }
1141 compactify(db, i);
1142 if (bn < r || (bn == r && height == 0)) {
1143 memmove(&keyStr[r-i+1], &keyStr[r-i],
1144 (n - r)*sizeof(str));
1145 size += len*sizeof(char_t);
1146 nItems += 1;
1147 assert(((void)"String fits in the B-Tree page",
1148 size + (n-i+1)*sizeof(str) <= sizeof(keyChar)));
1149 keyStr[r-i].offs = (nat2)(sizeof(keyChar) - size);
1150 keyStr[r-i].size = len;
1151 keyStr[r-i].oid = ins.oid;
1152 memcpy(&keyChar[keyStr[r-i].offs], insKey, len*sizeof(char_t));
1153 }
1154 b->nItems = bn;
1155 b->size = (nat4)moved;
1156 ins.oid = pageId;
1157 db->pool.unfix(b);
1158 return dbBtree::overflow;
1159 }
1160 prevDelta = delta;
1161 moved += keyLen*sizeof(char_t);
1162 assert(((void)"String fits in the B-Tree page",
1163 moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
1164 b->keyStr[bn].size = (nat2)keyLen;
1165 b->keyStr[bn].offs = (nat2)(sizeof(keyChar) - moved);
1166 if (bn == r) {
1167 b->keyStr[bn].oid = ins.oid;
1168 memcpy(&b->keyChar[b->keyStr[bn].offs], ins.keyChar, keyLen*sizeof(char_t));
1169 } else {
1170 b->keyStr[bn].oid = keyStr[i].oid;
1171 memcpy(&b->keyChar[b->keyStr[bn].offs],
1172 &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
1173 size -= (nat4)(keyLen*sizeof(char_t));
1174 i += 1;
1175 }
1176 }
1177 }
1178 return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1179 ? dbBtree::underflow : dbBtree::done;
1180 }
1181
compactify(dbDatabase * db,int m)1182 void dbBtreePage::compactify(dbDatabase* db, int m)
1183 {
1184 int i, j, offs, len, n = nItems;
1185 // int size[dbPageSize]; // for 8Kb default page size, total size used by these buffers will be 64kb which is too much for many environments
1186 // int index[dbPageSize];
1187 int* size = db->btreeBuf;
1188 int* index = size + dbPageSize;
1189 if (m == 0) {
1190 return;
1191 }
1192 if (m < 0) {
1193 m = -m;
1194 for (i = 0; i < n-m; i++) {
1195 len = keyStr[i].size;
1196 size[keyStr[i].offs + len*sizeof(char_t)] = len;
1197 index[keyStr[i].offs + len*sizeof(char_t)] = i;
1198 }
1199 for (; i < n; i++) {
1200 len = keyStr[i].size;
1201 size[keyStr[i].offs + len*sizeof(char_t)] = len;
1202 index[keyStr[i].offs + len*sizeof(char_t)] = -1;
1203 }
1204 } else {
1205 for (i = 0; i < m; i++) {
1206 len = keyStr[i].size;
1207 size[keyStr[i].offs + len*sizeof(char_t)] = len;
1208 index[keyStr[i].offs + len*sizeof(char_t)] = -1;
1209 }
1210 for (; i < n; i++) {
1211 len = keyStr[i].size;
1212 size[keyStr[i].offs + len*sizeof(char_t)] = len;
1213 index[keyStr[i].offs + len*sizeof(char_t)] = i - m;
1214 keyStr[i-m].oid = keyStr[i].oid;
1215 keyStr[i-m].size = len;
1216 }
1217 keyStr[i-m].oid = keyStr[i].oid;
1218 }
1219 nItems = n -= m;
1220 for (offs = sizeof(keyChar), i = offs; n != 0; i -= len) {
1221 len = size[i]*sizeof(char_t);
1222 j = index[i];
1223 if (j >= 0) {
1224 offs -= len;
1225 n -= 1;
1226 keyStr[j].offs = offs;
1227 if (offs != i - len) {
1228 memmove(&keyChar[offs], &keyChar[(i - len)], len);
1229 }
1230 }
1231 }
1232 }
1233
removeStrKey(dbDatabase * db,int r)1234 int dbBtreePage::removeStrKey(dbDatabase* db, int r)
1235 {
1236 int len = keyStr[r].size;
1237 int offs = keyStr[r].offs;
1238 memmove(keyChar + sizeof(keyChar) - size + len*sizeof(char_t),
1239 keyChar + sizeof(keyChar) - size,
1240 size - sizeof(keyChar) + offs);
1241 memmove(&keyStr[r], &keyStr[r+1], (nItems-r)*sizeof(str));
1242 nItems -= 1;
1243 size -= len*sizeof(char_t);
1244 for (int i = nItems; --i >= 0; ) {
1245 if (keyStr[i].offs < offs) {
1246 keyStr[i].offs += (nat2)(len*sizeof(char_t));
1247 }
1248 }
1249 return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1250 ? dbBtree::underflow : dbBtree::done;
1251 }
1252
replaceStrKey(dbDatabase * db,int r,item & ins,int height)1253 int dbBtreePage::replaceStrKey(dbDatabase* db, int r, item& ins, int height)
1254 {
1255 ins.oid = keyStr[r].oid;
1256 removeStrKey(db, r);
1257 return insertStrKey(db, r, ins, height);
1258 }
1259
handlePageUnderflow(dbDatabase * db,int r,int type,int sizeofType,item & rem,int height)1260 int dbBtreePage::handlePageUnderflow(dbDatabase* db, int r, int type, int sizeofType, item& rem,
1261 int height)
1262 {
1263 dbPutTie tie;
1264 if (type == dbField::tpString) {
1265 dbBtreePage* a = (dbBtreePage*)db->put(tie, keyStr[r].oid);
1266 int an = a->nItems;
1267 if (r < (int)nItems) { // exists greater page
1268 dbBtreePage* b = (dbBtreePage*)db->get(keyStr[r+1].oid);
1269 int bn = b->nItems;
1270 size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
1271 if (height != 1) {
1272 merged_size += keyStr[r].size*sizeof(char_t) + sizeof(str)*2;
1273 }
1274 if (merged_size > sizeof(keyChar)) {
1275 // reallocation of nodes between pages a and b
1276 int i, j, k;
1277 dbPutTie tie;
1278 db->pool.unfix(b);
1279 b = (dbBtreePage*)db->put(tie, keyStr[r+1].oid);
1280 size_t size_a = a->size;
1281 size_t size_b = b->size;
1282 size_t addSize, subSize;
1283 if (height != 1) {
1284 addSize = keyStr[r].size;
1285 subSize = b->keyStr[0].size;
1286 } else {
1287 addSize = subSize = b->keyStr[0].size;
1288 }
1289 i = 0;
1290 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
1291 while (true) {
1292 i += 1;
1293 long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
1294 - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
1295 if (delta >= 0) {
1296 if (delta >= -prevDelta) {
1297 i -= 1;
1298 }
1299 break;
1300 }
1301 size_a += addSize*sizeof(char_t);
1302 size_b -= subSize*sizeof(char_t);
1303 prevDelta = delta;
1304 if (height != 1) {
1305 addSize = subSize;
1306 subSize = b->keyStr[i].size;
1307 } else {
1308 addSize = subSize = b->keyStr[i].size;
1309 }
1310 }
1311 int result = dbBtree::done;
1312 if (i > 0) {
1313 k = i;
1314 if (height != 1) {
1315 int len = keyStr[r].size;
1316 a->size += len*sizeof(char_t);
1317 a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
1318 a->keyStr[an].size = len;
1319 memcpy(a->keyChar + a->keyStr[an].offs,
1320 keyChar + keyStr[r].offs, len*sizeof(char_t));
1321 k -= 1;
1322 an += 1;
1323 a->keyStr[an+k].oid = b->keyStr[k].oid;
1324 b->size -= b->keyStr[k].size*sizeof(char_t);
1325 }
1326 for (j = 0; j < k; j++) {
1327 int len = b->keyStr[j].size;
1328 a->size += len*sizeof(char_t);
1329 b->size -= len*sizeof(char_t);
1330 a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
1331 a->keyStr[an].size = len;
1332 a->keyStr[an].oid = b->keyStr[j].oid;
1333 memcpy(a->keyChar + a->keyStr[an].offs,
1334 b->keyChar + b->keyStr[j].offs, len*sizeof(char_t));
1335 an += 1;
1336 }
1337 memcpy(rem.keyChar, b->keyChar + b->keyStr[i-1].offs,
1338 b->keyStr[i-1].size*sizeof(char_t));
1339 rem.keyLen = b->keyStr[i-1].size;
1340 result = replaceStrKey(db, r, rem, height);
1341 a->nItems = an;
1342 b->compactify(db, i);
1343 }
1344 assert(a->nItems > 0 && b->nItems > 0);
1345 return result;
1346 } else { // merge page b to a
1347 if (height != 1) {
1348 a->size += (a->keyStr[an].size = keyStr[r].size)*sizeof(char_t);
1349 a->keyStr[an].offs = (nat2)(sizeof(keyChar) - a->size);
1350 memcpy(&a->keyChar[a->keyStr[an].offs],
1351 &keyChar[keyStr[r].offs], keyStr[r].size*sizeof(char_t));
1352 an += 1;
1353 a->keyStr[an+bn].oid = b->keyStr[bn].oid;
1354 }
1355 for (int i = 0; i < bn; i++, an++) {
1356 a->keyStr[an] = b->keyStr[i];
1357 a->keyStr[an].offs -= a->size;
1358 }
1359 a->size += b->size;
1360 a->nItems = an;
1361 memcpy(a->keyChar + sizeof(keyChar) - a->size,
1362 b->keyChar + sizeof(keyChar) - b->size,
1363 b->size);
1364 db->pool.unfix(b);
1365 db->freePage(keyStr[r+1].oid);
1366 keyStr[r+1].oid = keyStr[r].oid;
1367 return removeStrKey(db, r);
1368 }
1369 } else { // page b is before a
1370 dbBtreePage* b = (dbBtreePage*)db->get(keyStr[r-1].oid);
1371 int bn = b->nItems;
1372 size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
1373 if (height != 1) {
1374 merged_size += keyStr[r-1].size*sizeof(char_t) + sizeof(str)*2;
1375 }
1376 if (merged_size > sizeof(keyChar)) {
1377 // reallocation of nodes between pages a and b
1378 dbPutTie tie;
1379 int i, j, k;
1380 db->pool.unfix(b);
1381 b = (dbBtreePage*)db->put(tie, keyStr[r-1].oid);
1382 size_t size_a = a->size;
1383 size_t size_b = b->size;
1384 size_t addSize, subSize;
1385 if (height != 1) {
1386 addSize = keyStr[r-1].size;
1387 subSize = b->keyStr[bn-1].size;
1388 } else {
1389 addSize = subSize = b->keyStr[bn-1].size;
1390 }
1391 i = 0;
1392 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
1393 while (true) {
1394 i += 1;
1395 long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
1396 - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
1397 if (delta >= 0) {
1398 if (delta >= -prevDelta) {
1399 i -= 1;
1400 }
1401 break;
1402 }
1403 prevDelta = delta;
1404 size_a += addSize*sizeof(char_t);
1405 size_b -= subSize*sizeof(char_t);
1406 if (height != 1) {
1407 addSize = subSize;
1408 subSize = b->keyStr[bn-i-1].size;
1409 } else {
1410 addSize = subSize = b->keyStr[bn-i-1].size;
1411 }
1412 }
1413 int result = dbBtree::done;
1414 if (i > 0) {
1415 k = i;
1416 assert(i < bn);
1417 if (height != 1) {
1418 memmove(&a->keyStr[i], a->keyStr, (an+1)*sizeof(str));
1419 b->size -= b->keyStr[bn-k].size*sizeof(char_t);
1420 k -= 1;
1421 a->keyStr[k].oid = b->keyStr[bn].oid;
1422 int len = keyStr[r-1].size;
1423 a->keyStr[k].size = len;
1424 a->size += len*sizeof(char_t);
1425 a->keyStr[k].offs = (nat2)(sizeof(keyChar) - a->size);
1426 memcpy(&a->keyChar[a->keyStr[k].offs],
1427 &keyChar[keyStr[r-1].offs], len*sizeof(char_t));
1428 } else {
1429 memmove(&a->keyStr[i], a->keyStr, an*sizeof(str));
1430 }
1431 for (j = 0; j < k; j++) {
1432 int len = b->keyStr[bn-k+j].size;
1433 a->size += len*sizeof(char_t);
1434 b->size -= len*sizeof(char_t);
1435 a->keyStr[j].offs = (nat2)(sizeof(a->keyChar) - a->size);
1436 a->keyStr[j].size = len;
1437 a->keyStr[j].oid = b->keyStr[bn-k+j].oid;
1438 memcpy(a->keyChar + a->keyStr[j].offs,
1439 b->keyChar + b->keyStr[bn-k+j].offs, len*sizeof(char_t));
1440 }
1441 an += i;
1442 a->nItems = an;
1443 memcpy(rem.keyChar, b->keyChar + b->keyStr[bn-k-1].offs,
1444 b->keyStr[bn-k-1].size*sizeof(char_t));
1445 rem.keyLen = b->keyStr[bn-k-1].size;
1446 result = replaceStrKey(db, r-1, rem, height);
1447 b->compactify(db, -i);
1448 }
1449 assert(a->nItems > 0 && b->nItems > 0);
1450 return result;
1451 } else { // merge page b to a
1452 if (height != 1) {
1453 memmove(a->keyStr + bn + 1, a->keyStr, (an+1)*sizeof(str));
1454 a->size += (a->keyStr[bn].size = keyStr[r-1].size)*sizeof(char_t);
1455 a->keyStr[bn].offs = (nat2)(sizeof(keyChar) - a->size);
1456 a->keyStr[bn].oid = b->keyStr[bn].oid;
1457 memcpy(&a->keyChar[a->keyStr[bn].offs],
1458 &keyChar[keyStr[r-1].offs], keyStr[r-1].size*sizeof(char_t));
1459 an += 1;
1460 } else {
1461 memmove(a->keyStr + bn, a->keyStr, an*sizeof(str));
1462 }
1463 for (int i = 0; i < bn; i++) {
1464 a->keyStr[i] = b->keyStr[i];
1465 a->keyStr[i].offs -= a->size;
1466 }
1467 an += bn;
1468 a->nItems = an;
1469 a->size += b->size;
1470 memcpy(a->keyChar + sizeof(keyChar) - a->size,
1471 b->keyChar + sizeof(keyChar) - b->size,
1472 b->size);
1473 db->pool.unfix(b);
1474 db->freePage(keyStr[r-1].oid);
1475 return removeStrKey(db, r-1);
1476 }
1477 }
1478 } else {
1479 dbBtreePage* a = (dbBtreePage*)db->put(tie, record[maxItems-r-1]);
1480 int an = a->nItems;
1481 if (r < int(nItems)) { // exists greater page
1482 dbBtreePage* b = (dbBtreePage*)db->get(record[maxItems-r-2]);
1483 int bn = b->nItems;
1484 assert(bn >= an);
1485 if (height != 1) {
1486 memcpy(a->keyChar + an*sizeofType, keyChar + r*sizeofType,
1487 sizeofType);
1488 an += 1;
1489 bn += 1;
1490 }
1491 size_t merged_size = (an+bn)*(sizeof(oid_t) + sizeofType);
1492 if (merged_size > sizeof(keyChar)) {
1493 // reallocation of nodes between pages a and b
1494 int i = bn - ((an + bn) >> 1);
1495 dbPutTie tie;
1496 db->pool.unfix(b);
1497 b = (dbBtreePage*)db->put(tie, record[maxItems-r-2]);
1498 memcpy(a->keyChar + an*sizeofType, b->keyChar, i*sizeofType);
1499 memmove(b->keyChar, b->keyChar + i*sizeofType, (bn-i)*sizeofType);
1500 memcpy(&a->record[maxItems-an-i], &b->record[maxItems-i],
1501 i*sizeof(oid_t));
1502 memmove(&b->record[maxItems-bn+i], &b->record[maxItems-bn],
1503 (bn-i)*sizeof(oid_t));
1504 memcpy(keyChar + r*sizeofType, a->keyChar + (an+i-1)*sizeofType,
1505 sizeofType);
1506 b->nItems -= i;
1507 a->nItems += i;
1508 return dbBtree::done;
1509 } else { // merge page b to a
1510 memcpy(a->keyChar + an*sizeofType, b->keyChar, bn*sizeofType);
1511 memcpy(&a->record[maxItems-an-bn], &b->record[maxItems-bn],
1512 bn*sizeof(oid_t));
1513 db->pool.unfix(b);
1514 db->freePage(record[maxItems-r-2]);
1515 memmove(&record[maxItems-nItems], &record[maxItems-nItems-1],
1516 (nItems - r - 1)*sizeof(oid_t));
1517 memmove(keyChar + r*sizeofType, keyChar + (r+1)*sizeofType,
1518 (nItems - r - 1)*sizeofType);
1519 a->nItems += bn;
1520 nItems -= 1;
1521 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(oid_t))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1522 ? dbBtree::underflow : dbBtree::done;
1523 }
1524 } else { // page b is before a
1525 dbBtreePage* b = (dbBtreePage*)db->get(record[maxItems-r]);
1526 int bn = b->nItems;
1527 assert(bn >= an);
1528 if (height != 1) {
1529 an += 1;
1530 bn += 1;
1531 }
1532 size_t merged_size = (an+bn)*(sizeof(oid_t) + sizeofType);
1533 if (merged_size > sizeof(keyChar)) {
1534 // reallocation of nodes between pages a and b
1535 int i = bn - ((an + bn) >> 1);
1536 dbPutTie tie;
1537 db->pool.unfix(b);
1538 b = (dbBtreePage*)db->put(tie, record[maxItems-r]);
1539 memmove(a->keyChar + i*sizeofType, a->keyChar, an*sizeofType);
1540 memcpy(a->keyChar, b->keyChar + (bn-i)*sizeofType, i*sizeofType);
1541 memmove(&a->record[maxItems-an-i], &a->record[maxItems-an],
1542 an*sizeof(oid_t));
1543 memcpy(&a->record[maxItems-i], &b->record[maxItems-bn],
1544 i*sizeof(oid_t));
1545 if (height != 1) {
1546 memcpy(a->keyChar + (i-1)*sizeofType, keyChar + (r-1)*sizeofType,
1547 sizeofType);
1548 }
1549 memcpy(keyChar + (r-1)*sizeofType, b->keyChar + (bn-i-1)*sizeofType,
1550 sizeofType);
1551 b->nItems -= i;
1552 a->nItems += i;
1553 return dbBtree::done;
1554 } else { // merge page b to a
1555 memmove(a->keyChar + bn*sizeofType, a->keyChar, an*sizeofType);
1556 memcpy(a->keyChar, b->keyChar, bn*sizeofType);
1557 memmove(&a->record[maxItems-an-bn], &a->record[maxItems-an],
1558 an*sizeof(oid_t));
1559 memcpy(&a->record[maxItems-bn], &b->record[maxItems-bn],
1560 bn*sizeof(oid_t));
1561 if (height != 1) {
1562 memcpy(a->keyChar + (bn-1)*sizeofType, keyChar + (r-1)*sizeofType,
1563 sizeofType);
1564 }
1565 db->pool.unfix(b);
1566 db->freePage(record[maxItems-r]);
1567 record[maxItems-r] = record[maxItems-r-1];
1568 a->nItems += bn;
1569 nItems -= 1;
1570 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(oid_t))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
1571 ? dbBtree::underflow : dbBtree::done;
1572 }
1573 }
1574 }
1575 }
1576
1577 #define REMOVE(KEY,TYPE) { \
1578 TYPE key = rem.KEY; \
1579 while (l < r) { \
1580 i = (l+r) >> 1; \
1581 if (key > pg->KEY[i]) l = i+1; else r = i; \
1582 } \
1583 if (--height == 0) { \
1584 oid_t oid = rem.oid; \
1585 while (r < n) { \
1586 if (key == pg->KEY[r]) { \
1587 if (pg->record[maxItems-r-1] == oid) { \
1588 db->pool.unfix(pg); \
1589 pg = (dbBtreePage*)db->put(tie, pageId); \
1590 memmove(&pg->KEY[r], &pg->KEY[r+1], (n - r - 1)*sizeof(TYPE)); \
1591 memmove(&pg->record[maxItems-n+1], &pg->record[maxItems-n], \
1592 (n - r - 1)*sizeof(oid_t)); \
1593 pg->nItems = --n; \
1594 return n == 0 || n*(sizeof(TYPE) + sizeof(oid_t))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent \
1595 ? dbBtree::underflow : dbBtree::done; \
1596 } \
1597 } else { \
1598 break; \
1599 } \
1600 r += 1; \
1601 } \
1602 db->pool.unfix(pg); \
1603 return dbBtree::not_found; \
1604 } \
1605 break; \
1606 }
1607
remove(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & rem,int height)1608 int dbBtreePage::remove(dbDatabase* db, oid_t pageId, int type, int sizeofType,
1609 dbUDTComparator comparator, item& rem, int height)
1610 {
1611 dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
1612 dbPutTie tie;
1613 int i, n = pg->nItems, l = 0, r = n;
1614
1615 switch (type) {
1616 case dbField::tpBool:
1617 case dbField::tpInt1:
1618 REMOVE(keyInt1, int1);
1619 case dbField::tpInt2:
1620 REMOVE(keyInt2, int2);
1621 case dbField::tpInt4:
1622 REMOVE(keyInt4, int4);
1623 case dbField::tpInt8:
1624 REMOVE(keyInt8, db_int8);
1625 case dbField::tpReference:
1626 REMOVE(keyOid, oid_t);
1627 case dbField::tpReal4:
1628 REMOVE(keyReal4, real4);
1629 case dbField::tpReal8:
1630 REMOVE(keyReal8, real8);
1631 case dbField::tpRawBinary:
1632 {
1633 char* key = (char*)rem.keyChar;
1634 while (l < r) {
1635 i = (l+r) >> 1;
1636 if (comparator(key, pg->keyChar + i*sizeofType, sizeofType) > 0) l = i+1; else r = i;
1637 }
1638 if (--height == 0) {
1639 oid_t oid = rem.oid;
1640 while (r < n) {
1641 if (memcmp(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
1642 if (pg->record[maxItems-r-1] == oid) {
1643 db->pool.unfix(pg);
1644 pg = (dbBtreePage*)db->put(tie, pageId);
1645 memmove(pg->keyChar + r*sizeofType, pg->keyChar + (r+1)*sizeofType,
1646 (n - r - 1)*sizeofType);
1647 memmove(&pg->record[maxItems-n+1], &pg->record[maxItems-n],
1648 (n - r - 1)*sizeof(oid_t));
1649 pg->nItems = --n;
1650 return n == 0 || n*(sizeofType + sizeof(oid_t))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent
1651 ? dbBtree::underflow : dbBtree::done;
1652 }
1653 } else {
1654 break;
1655 }
1656 r += 1;
1657 }
1658 db->pool.unfix(pg);
1659 return dbBtree::not_found;
1660 }
1661 break;
1662 }
1663 case dbField::tpString:
1664 {
1665 char_t* key = rem.keyChar;
1666 while (l < r) {
1667 i = (l+r) >> 1;
1668 if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]) > 0) {
1669 l = i+1;
1670 } else {
1671 r = i;
1672 }
1673 }
1674 if (--height != 0) {
1675 do {
1676 switch (remove(db, pg->keyStr[r].oid, type, sizeofType, comparator, rem, height)) {
1677 case dbBtree::underflow:
1678 db->pool.unfix(pg);
1679 pg = (dbBtreePage*)db->put(tie, pageId);
1680 return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
1681 case dbBtree::done:
1682 db->pool.unfix(pg);
1683 return dbBtree::done;
1684 case dbBtree::overflow:
1685 db->pool.unfix(pg);
1686 pg = (dbBtreePage*)db->put(tie, pageId);
1687 return pg->insertStrKey(db, r, rem, height);
1688 }
1689 } while (++r <= n);
1690 } else {
1691 while (r < n) {
1692 if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
1693 if (pg->keyStr[r].oid == rem.oid) {
1694 db->pool.unfix(pg);
1695 pg = (dbBtreePage*)db->put(tie, pageId);
1696 return pg->removeStrKey(db, r);
1697 }
1698 } else {
1699 break;
1700 }
1701 r += 1;
1702 }
1703 }
1704 db->pool.unfix(pg);
1705 return dbBtree::not_found;
1706 }
1707 default:
1708 assert(false);
1709 }
1710 do {
1711 switch (remove(db, pg->record[maxItems-r-1], type, sizeofType, comparator, rem, height)) {
1712 case dbBtree::underflow:
1713 db->pool.unfix(pg);
1714 pg = (dbBtreePage*)db->put(tie, pageId);
1715 return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
1716 case dbBtree::done:
1717 db->pool.unfix(pg);
1718 return dbBtree::done;
1719 }
1720 } while (++r <= n);
1721
1722 db->pool.unfix(pg);
1723 return dbBtree::not_found;
1724 }
1725
1726
purge(dbDatabase * db,oid_t pageId,int type,int height)1727 void dbBtreePage::purge(dbDatabase* db, oid_t pageId, int type, int height)
1728 {
1729 if (--height != 0) {
1730 dbBtreePage* pg = (dbBtreePage*)db->get(pageId);
1731 int n = pg->nItems+1;
1732 if (type == dbField::tpString) { // page of strings
1733 while (--n >= 0) {
1734 purge(db, pg->keyStr[n].oid, type, height);
1735 }
1736 } else {
1737 while (--n >= 0) {
1738 purge(db, pg->record[maxItems-n-1], type, height);
1739 }
1740 }
1741 db->pool.unfix(pg);
1742 }
1743 db->freePage(pageId);
1744 }
1745
traverseForward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)1746 bool dbBtreePage::traverseForward(dbDatabase* db, dbAnyCursor* cursor,
1747 dbExprNode* condition, int type, int height)
1748 {
1749 int n = nItems;
1750 if (--height != 0) {
1751 if (type == dbField::tpString) { // page of strings
1752 for (int i = 0; i <= n; i++) {
1753 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[i].oid);
1754 if (!pg->traverseForward(db, cursor, condition, type, height)) {
1755 db->pool.unfix(pg);
1756 return false;
1757 }
1758 db->pool.unfix(pg);
1759 }
1760 } else {
1761 for (int i = 0; i <= n; i++) {
1762 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-i-1]);
1763 if (!pg->traverseForward(db, cursor, condition, type, height)) {
1764 db->pool.unfix(pg);
1765 return false;
1766 }
1767 db->pool.unfix(pg);
1768 }
1769 }
1770 } else {
1771 if (type != dbField::tpString) { // page of scalars
1772 if (condition == NULL) {
1773 for (int i = 0; i < n; i++) {
1774 if (!cursor->add(record[maxItems-1-i])) {
1775 return false;
1776 }
1777 }
1778 } else {
1779 dbTableDescriptor* table = cursor->table;
1780 for (int i = 0; i < n; i++) {
1781 if (db->evaluateBoolean(condition, record[maxItems-1-i], table, cursor)) {
1782 if (!cursor->add(record[maxItems-1-i])) {
1783 return false;
1784 }
1785 }
1786 }
1787 }
1788 } else { // page of strings
1789 if (condition == NULL) {
1790 for (int i = 0; i < n; i++) {
1791 if (!cursor->add(keyStr[i].oid)) {
1792 return false;
1793 }
1794 }
1795 } else {
1796 dbTableDescriptor* table = cursor->table;
1797 for (int i = 0; i < n; i++) {
1798 if (db->evaluateBoolean(condition, keyStr[i].oid, table, cursor)) {
1799 if (!cursor->add(keyStr[i].oid)) {
1800 return false;
1801 }
1802 }
1803 }
1804 }
1805 }
1806 }
1807 return true;
1808 }
1809
1810
traverseBackward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)1811 bool dbBtreePage::traverseBackward(dbDatabase* db, dbAnyCursor* cursor,
1812 dbExprNode* condition, int type, int height)
1813 {
1814 int n = nItems;
1815 if (--height != 0) {
1816 if (type == dbField::tpString) { // page of strings
1817 do {
1818 dbBtreePage* pg = (dbBtreePage*)db->get(keyStr[n].oid);
1819 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
1820 db->pool.unfix(pg);
1821 return false;
1822 }
1823 db->pool.unfix(pg);
1824 } while (--n >= 0);
1825 } else {
1826 do {
1827 dbBtreePage* pg = (dbBtreePage*)db->get(record[maxItems-n-1]);
1828 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
1829 db->pool.unfix(pg);
1830 return false;
1831 }
1832 db->pool.unfix(pg);
1833 } while (--n >= 0);
1834 }
1835 } else {
1836 if (type != dbField::tpString) { // page of scalars
1837 if (condition == NULL) {
1838 while (--n >= 0) {
1839 if (!cursor->add(record[maxItems-1-n])) {
1840 return false;
1841 }
1842 }
1843 } else {
1844 dbTableDescriptor* table = cursor->table;
1845 while (--n >= 0) {
1846 if (db->evaluateBoolean(condition, record[maxItems-1-n], table, cursor)) {
1847 if (!cursor->add(record[maxItems-1-n])) {
1848 return false;
1849 }
1850 }
1851 }
1852 }
1853 } else { // page of strings
1854 if (condition == NULL) {
1855 while (--n >= 0) {
1856 if (!cursor->add(keyStr[n].oid)) {
1857 return false;
1858 }
1859 }
1860 } else {
1861 dbTableDescriptor* table = cursor->table;
1862 while (--n >= 0) {
1863 if (db->evaluateBoolean(condition, keyStr[n].oid, table, cursor)) {
1864 if (!cursor->add(keyStr[n].oid)) {
1865 return false;
1866 }
1867 }
1868 }
1869 }
1870 }
1871 }
1872 return true;
1873 }
1874
1875 //
1876 // Btree optimized for handling duplicates
1877 //
1878
1879 #undef FIND
1880 #define FIND(KEY, TYPE) \
1881 if (sc.firstKey != NULL) { \
1882 TYPE key = CAST(TYPE, sc.firstKey); \
1883 while (l < r) { \
1884 int i = (l+r) >> 1; \
1885 if (CHECK(key, EXTRACT(KEY,i), firstKeyInclusion)) { \
1886 l = i+1; \
1887 } else { \
1888 r = i; \
1889 } \
1890 } \
1891 assert(r == l); \
1892 } \
1893 if (sc.lastKey != NULL) { \
1894 TYPE key = CAST(TYPE, sc.lastKey); \
1895 if (height == 0) { \
1896 while (l < n) { \
1897 if (CHECK(EXTRACT(KEY,l), key, lastKeyInclusion)) { \
1898 return false; \
1899 } \
1900 if (!sc.condition \
1901 || db->evaluateBoolean(sc.condition, ref[maxItems-1-l].oid,\
1902 table, sc.cursor)) \
1903 { \
1904 if (!sc.cursor->add(ref[maxItems-1-l].oid)) { \
1905 return false; \
1906 } \
1907 } \
1908 l += 1; \
1909 } \
1910 return true; \
1911 } else { \
1912 do { \
1913 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-l].oid);\
1914 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {\
1915 db->pool.unfix(pg); \
1916 return false; \
1917 } \
1918 db->pool.unfix(pg); \
1919 if (l == n) { \
1920 return true; \
1921 } \
1922 } while(LESS_OR_EQUAL(EXTRACT(KEY,l++), key)); \
1923 return false; \
1924 } \
1925 } \
1926 break
1927
1928 #undef FIND_REVERSE
1929 #define FIND_REVERSE(KEY, TYPE) \
1930 if (sc.lastKey != NULL) { \
1931 TYPE key = CAST(TYPE, sc.lastKey); \
1932 while (l < r) { \
1933 int i = (l+r) >> 1; \
1934 if (CHECK(key, EXTRACT(KEY,i), !lastKeyInclusion)) { \
1935 l = i+1; \
1936 } else { \
1937 r = i; \
1938 } \
1939 } \
1940 assert(r == l); \
1941 } \
1942 if (sc.firstKey != NULL) { \
1943 TYPE key = CAST(TYPE, sc.firstKey); \
1944 if (height == 0) { \
1945 while (--r >= 0) { \
1946 if (CHECK(key, EXTRACT(KEY,r), firstKeyInclusion)) { \
1947 return false; \
1948 } \
1949 if (!sc.condition \
1950 || db->evaluateBoolean(sc.condition, ref[maxItems-1-r].oid,\
1951 table, sc.cursor)) \
1952 { \
1953 if (!sc.cursor->add(ref[maxItems-1-r].oid)) { \
1954 return false; \
1955 } \
1956 } \
1957 } \
1958 return true; \
1959 } else { \
1960 do { \
1961 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-r].oid);\
1962 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {\
1963 db->pool.unfix(pg); \
1964 return false; \
1965 } \
1966 db->pool.unfix(pg); \
1967 } while(--r >= 0 && !CHECK(key, EXTRACT(KEY,r), firstKeyInclusion)); \
1968 return r < 0; \
1969 } \
1970 } \
1971 break
1972
find(dbDatabase * db,dbSearchContext & sc,int type,int sizeofType,dbUDTComparator comparator,int height)1973 bool dbThickBtreePage::find(dbDatabase* db, dbSearchContext& sc, int type, int sizeofType,
1974 dbUDTComparator comparator, int height)
1975 {
1976 int l = 0, n = nItems, r = n;
1977 int diff;
1978 dbTableDescriptor* table = sc.cursor->table;
1979 sc.probes += 1;
1980 height -= 1;
1981 int firstKeyInclusion = sc.firstKeyInclusion;
1982 int lastKeyInclusion = sc.lastKeyInclusion;
1983
1984 if (sc.ascent) {
1985 #undef CHECK
1986 #undef EXTRACT
1987 #undef CAST
1988 #undef LESS_OR_EQUAL
1989 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
1990 #define EXTRACT(array, index) array[index]
1991 #define CAST(TYPE, expr) *(TYPE*)(expr)
1992 #define LESS_OR_EQUAL(a, b) (a <= b)
1993 switch (type) {
1994 case dbField::tpBool:
1995 case dbField::tpInt1:
1996 FIND(keyInt1, int1);
1997 case dbField::tpInt2:
1998 FIND(keyInt2, int2);
1999 case dbField::tpInt4:
2000 FIND(keyInt4, int4);
2001 case dbField::tpInt8:
2002 FIND(keyInt8, db_int8);
2003 case dbField::tpReference:
2004 FIND(keyOid, oid_t);
2005 case dbField::tpReal4:
2006 FIND(keyReal4, real4);
2007 case dbField::tpReal8:
2008 FIND(keyReal8, real8);
2009
2010 case dbField::tpRawBinary:
2011 #undef CHECK
2012 #undef EXTRACT
2013 #undef CAST
2014 #undef LESS_OR_EQUAL
2015 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
2016 #define EXTRACT(array, index) (array + (index)*sizeofType)
2017 #define CAST(TYPE, expr) (TYPE)(expr)
2018 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
2019 FIND(keyChar, void*);
2020
2021 case dbField::tpString:
2022 if (sc.firstKey != NULL) {
2023 char_t* firstKey = sc.firstKey;
2024 while (l < r) {
2025 int i = (l+r) >> 1;
2026 if (SCMP(firstKey, (char_t*)&keyChar[keyStr[i].offs])
2027 >= sc.firstKeyInclusion)
2028 {
2029 l = i + 1;
2030 } else {
2031 r = i;
2032 }
2033 }
2034 assert(r == l);
2035 }
2036 if (sc.lastKey != NULL) {
2037 char_t* lastKey = sc.lastKey;
2038 if (height == 0) {
2039 while (l < n) {
2040 if (SCMP((char_t*)&keyChar[keyStr[l].offs],
2041 lastKey) >= sc.lastKeyInclusion)
2042 {
2043 return false;
2044 }
2045 if (!sc.condition
2046 || db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor))
2047 {
2048 if (!sc.cursor->add(keyStr[l].oid)) {
2049 return false;
2050 }
2051 }
2052 l += 1;
2053 }
2054 } else {
2055 do {
2056 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[l].oid);
2057 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2058 db->pool.unfix(pg);
2059 return false;
2060 }
2061 db->pool.unfix(pg);
2062 if (l == n) {
2063 return true;
2064 }
2065 } while (SCMP((char_t*)&keyChar[keyStr[l++].offs], lastKey) <= 0);
2066 return false;
2067 }
2068 } else {
2069 if (height == 0) {
2070 if (sc.condition) {
2071 while (l < n) {
2072 if (db->evaluateBoolean(sc.condition, keyStr[l].oid, table, sc.cursor)) {
2073 if (!sc.cursor->add(keyStr[l].oid)) {
2074 return false;
2075 }
2076 }
2077 l += 1;
2078 }
2079 } else {
2080 while (l < n) {
2081 if (!sc.cursor->add(keyStr[l].oid)) {
2082 return false;
2083 }
2084 l += 1;
2085 }
2086 }
2087 } else {
2088 do {
2089 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[l].oid);
2090 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2091 db->pool.unfix(pg);
2092 return false;
2093 }
2094 db->pool.unfix(pg);
2095 } while (++l <= n);
2096 }
2097 }
2098 return true;
2099 default:
2100 assert(false);
2101 }
2102 if (height == 0) {
2103 if (sc.condition) {
2104 while (l < n) {
2105 if (db->evaluateBoolean(sc.condition, ref[maxItems-1-l].oid, table, sc.cursor)) {
2106 if (!sc.cursor->add(ref[maxItems-1-l].oid)) {
2107 return false;
2108 }
2109 }
2110 l += 1;
2111 }
2112 } else {
2113 while (l < n) {
2114 if (!sc.cursor->add(ref[maxItems-1-l].oid)) {
2115 return false;
2116 }
2117 l += 1;
2118 }
2119 }
2120 } else {
2121 do {
2122 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-l].oid);
2123 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2124 db->pool.unfix(pg);
2125 return false;
2126 }
2127 db->pool.unfix(pg);
2128 } while (++l <= n);
2129 }
2130 } else { // descent ordering
2131 #undef CHECK
2132 #undef EXTRACT
2133 #undef CAST
2134 #undef LESS_OR_EQUAL
2135 #define CHECK(a, b, inclusion) (a > b || (a == b && !inclusion))
2136 #define EXTRACT(array, index) array[index]
2137 #define CAST(TYPE, expr) *(TYPE*)(expr)
2138 #define LESS_OR_EQUAL(a, b) (a <= b)
2139 switch (type) {
2140 case dbField::tpBool:
2141 case dbField::tpInt1:
2142 FIND_REVERSE(keyInt1, int1);
2143 case dbField::tpInt2:
2144 FIND_REVERSE(keyInt2, int2);
2145 case dbField::tpInt4:
2146 FIND_REVERSE(keyInt4, int4);
2147 case dbField::tpInt8:
2148 FIND_REVERSE(keyInt8, db_int8);
2149 case dbField::tpReference:
2150 FIND_REVERSE(keyOid, oid_t);
2151 case dbField::tpReal4:
2152 FIND_REVERSE(keyReal4, real4);
2153 case dbField::tpReal8:
2154 FIND_REVERSE(keyReal8, real8);
2155
2156 case dbField::tpRawBinary:
2157 #undef CHECK
2158 #undef EXTRACT
2159 #undef CAST
2160 #undef LESS_OR_EQUAL
2161 #define CHECK(a, b, inclusion) ((diff = comparator(a, b, sizeofType)) > 0 || (diff == 0 && !inclusion))
2162 #define EXTRACT(array, index) (array + (index)*sizeofType)
2163 #define CAST(TYPE, expr) (TYPE)(expr)
2164 #define LESS_OR_EQUAL(a, b) (comparator(a, b, sizeofType) <= 0)
2165 FIND_REVERSE(keyChar, void*);
2166
2167
2168 case dbField::tpString:
2169 if (sc.lastKey != NULL) {
2170 char_t* key = sc.lastKey;
2171 while (l < r) {
2172 int i = (l+r) >> 1;
2173 if (SCMP(key, (char_t*)&keyChar[keyStr[i].offs]) >= 1-lastKeyInclusion) {
2174 l = i + 1;
2175 } else {
2176 r = i;
2177 }
2178 }
2179 assert(r == l);
2180 }
2181 if (sc.firstKey != NULL) {
2182 char_t* key = sc.firstKey;
2183 if (height == 0) {
2184 while (--r >= 0) {
2185 if (SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) >= firstKeyInclusion) {
2186 return false;
2187 }
2188 if (!sc.condition
2189 || db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor))
2190 {
2191 if (!sc.cursor->add(keyStr[r].oid)) {
2192 return false;
2193 }
2194 }
2195 }
2196 } else {
2197 do {
2198 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[r].oid);
2199 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2200 db->pool.unfix(pg);
2201 return false;
2202 }
2203 db->pool.unfix(pg);
2204 } while (--r >= 0 && SCMP(key, (char_t*)&keyChar[keyStr[r].offs]) < firstKeyInclusion);
2205 return r < 0;
2206 }
2207 } else {
2208 if (height == 0) {
2209 if (sc.condition) {
2210 while (--r >= 0) {
2211 if (db->evaluateBoolean(sc.condition, keyStr[r].oid, table, sc.cursor)) {
2212 if (!sc.cursor->add(keyStr[r].oid)) {
2213 return false;
2214 }
2215 }
2216 }
2217 } else {
2218 while (--r >= 0) {
2219 if (!sc.cursor->add(keyStr[r].oid)) {
2220 return false;
2221 }
2222 }
2223 }
2224 } else {
2225 do {
2226 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[r].oid);
2227 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2228 db->pool.unfix(pg);
2229 return false;
2230 }
2231 db->pool.unfix(pg);
2232 } while (--r >= 0);
2233 }
2234 }
2235 return true;
2236 default:
2237 assert(false);
2238 }
2239 if (height == 0) {
2240 if (sc.condition) {
2241 while (--r >= 0) {
2242 if (db->evaluateBoolean(sc.condition, ref[maxItems-1-r].oid, table, sc.cursor)) {
2243 if (!sc.cursor->add(ref[maxItems-1-r].oid)) {
2244 return false;
2245 }
2246 }
2247 }
2248 } else {
2249 while (--r >= 0) {
2250 if (!sc.cursor->add(ref[maxItems-1-r].oid)) {
2251 return false;
2252 }
2253 }
2254 }
2255 } else {
2256 do {
2257 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-1-r].oid);
2258 if (!pg->find(db, sc, type, sizeofType, comparator, height)) {
2259 db->pool.unfix(pg);
2260 return false;
2261 }
2262 db->pool.unfix(pg);
2263 } while (--r >= 0);
2264 }
2265 }
2266 return true;
2267 }
2268
2269
allocate(dbDatabase * db,oid_t root,int type,int sizeofType,item & ins)2270 oid_t dbThickBtreePage::allocate(dbDatabase* db, oid_t root, int type, int sizeofType, item& ins)
2271 {
2272 oid_t pageId = db->allocatePage();
2273 dbThickBtreePage* page = (dbThickBtreePage*)db->put(pageId);
2274 page->nItems = 1;
2275 if (type == dbField::tpString) {
2276 page->size = ins.keyLen*sizeof(char_t);
2277 page->keyStr[0].offs = (nat2)(sizeof(page->keyChar) - ins.keyLen*sizeof(char_t));
2278 page->keyStr[0].size = ins.keyLen;
2279 page->keyStr[0].oid = ins.oid;
2280 page->keyStr[0].recId = ins.recId;
2281 page->keyStr[1].oid = root;
2282 memcpy(&page->keyChar[page->keyStr[0].offs], ins.keyChar, ins.keyLen*sizeof(char_t));
2283 } else if (type == dbField::tpRawBinary) {
2284 memcpy(page->keyChar, ins.keyChar, sizeofType);
2285 page->ref[maxItems-1].oid = ins.oid;
2286 page->ref[maxItems-1].recId = ins.recId;
2287 page->ref[maxItems-2].oid = root;
2288 } else {
2289 memcpy(page->keyChar, ins.keyChar, keySize[type]);
2290 page->ref[maxItems-1].oid = ins.oid;
2291 page->ref[maxItems-1].recId = ins.recId;
2292 page->ref[maxItems-2].oid = root;
2293 }
2294 db->pool.unfix(page);
2295 return pageId;
2296 }
2297
2298
2299
2300 #undef INSERT
2301 #define INSERT(KEY, TYPE) { \
2302 TYPE key = ins.KEY; \
2303 while (l < r) { \
2304 int i = (l+r) >> 1; \
2305 if (key > pg->KEY[i] || (key == pg->KEY[i] && ins.recId > pg->ref[maxItems-i-1].recId)) { \
2306 l = i+1; \
2307 } else { \
2308 r = i; \
2309 } \
2310 } \
2311 assert(l == r); \
2312 /* insert before e[r] */ \
2313 if (--height != 0) { \
2314 result = insert(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, ins, height); \
2315 assert(result == dbBtree::done || result == dbBtree::overflow); \
2316 if (result == dbBtree::done) { \
2317 db->pool.unfix(pg); \
2318 return result; \
2319 } \
2320 n += 1; \
2321 } \
2322 db->pool.unfix(pg); \
2323 pg = (dbThickBtreePage*)db->put(tie, pageId); \
2324 const int max = sizeof(pg->KEY) / (sizeof(reference) + sizeof(TYPE)); \
2325 if (n < max) { \
2326 memmove(&pg->KEY[r+1], &pg->KEY[r], (n - r)*sizeof(TYPE)); \
2327 memmove(&pg->ref[maxItems-n-1], &pg->ref[maxItems-n], \
2328 (n-r)*sizeof(reference)); \
2329 pg->KEY[r] = ins.KEY; \
2330 pg->ref[maxItems-r-1].oid = ins.oid; \
2331 pg->ref[maxItems-r-1].recId = ins.recId; \
2332 pg->nItems += 1; \
2333 return dbBtree::done; \
2334 } else { /* page is full then divide page */ \
2335 oid_t pageId = db->allocatePage(); \
2336 dbThickBtreePage* b = (dbThickBtreePage*)db->put(pageId); \
2337 assert(n == max); \
2338 const int m = (max+1)/2; \
2339 if (r < m) { \
2340 memcpy(b->KEY, pg->KEY, r*sizeof(TYPE)); \
2341 b->KEY[r] = ins.KEY; \
2342 memcpy(&b->KEY[r+1], &pg->KEY[r], (m-r-1)*sizeof(TYPE)); \
2343 memmove(pg->KEY, &pg->KEY[m-1], (max-m+1)*sizeof(TYPE)); \
2344 memcpy(&b->ref[maxItems-r], &pg->ref[maxItems-r], \
2345 r*sizeof(reference)); \
2346 b->ref[maxItems-r-1].oid = ins.oid; \
2347 b->ref[maxItems-r-1].recId = ins.recId; \
2348 memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m+1], \
2349 (m-r-1)*sizeof(reference)); \
2350 memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], \
2351 (max-m+1)*sizeof(reference)); \
2352 } else { \
2353 memcpy(b->KEY, pg->KEY, m*sizeof(TYPE)); \
2354 memmove(pg->KEY, &pg->KEY[m], (r-m)*sizeof(TYPE)); \
2355 pg->KEY[r-m] = ins.KEY; \
2356 memmove(&pg->KEY[r-m+1], &pg->KEY[r], (max-r)*sizeof(TYPE)); \
2357 memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m], \
2358 m*sizeof(reference)); \
2359 memmove(&pg->ref[maxItems-r+m], &pg->ref[maxItems-r], \
2360 (r-m)*sizeof(reference)); \
2361 pg->ref[maxItems-r+m-1].oid = ins.oid; \
2362 pg->ref[maxItems-r+m-1].recId = ins.recId; \
2363 memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], \
2364 (max-r)*sizeof(reference)); \
2365 } \
2366 ins.oid = pageId; \
2367 ins.recId = b->ref[maxItems-m].recId; \
2368 ins.KEY = b->KEY[m-1]; \
2369 if (height == 0) { \
2370 pg->nItems = max - m + 1; \
2371 b->nItems = m; \
2372 } else { \
2373 pg->nItems = max - m; \
2374 b->nItems = m - 1; \
2375 } \
2376 db->pool.unfix(b); \
2377 return dbBtree::overflow; \
2378 } \
2379 }
2380
2381
insert(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & ins,int height)2382 int dbThickBtreePage::insert(dbDatabase* db, oid_t pageId, int type, int sizeofType,
2383 dbUDTComparator comparator, item& ins, int height)
2384 {
2385 dbPutTie tie;
2386 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
2387 int result;
2388 int l = 0, n = pg->nItems, r = n;
2389
2390 switch (type) {
2391 case dbField::tpBool:
2392 case dbField::tpInt1:
2393 INSERT(keyInt1, int1);
2394 case dbField::tpInt2:
2395 INSERT(keyInt2, int2);
2396 case dbField::tpInt4:
2397 INSERT(keyInt4, int4);
2398 case dbField::tpInt8:
2399 INSERT(keyInt8, db_int8);
2400 case dbField::tpReference:
2401 INSERT(keyOid, oid_t);
2402 case dbField::tpReal4:
2403 INSERT(keyReal4, real4);
2404 case dbField::tpReal8:
2405 INSERT(keyReal8, real8);
2406 case dbField::tpRawBinary:
2407 {
2408 char* key = (char*)ins.keyChar;
2409 while (l < r) {
2410 int i = (l+r) >> 1;
2411 int diff = comparator(key, pg->keyChar + i*sizeofType, sizeofType);
2412 if (diff > 0 || (diff == 0 && ins.recId > pg->ref[maxItems-i-1].recId)) {
2413 l = i+1;
2414 } else {
2415 r = i;
2416 }
2417 }
2418 assert(l == r);
2419 /* insert before e[r] */
2420 if (--height != 0) {
2421 result = insert(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, ins, height);
2422 assert(result == dbBtree::done || result == dbBtree::overflow);
2423 if (result == dbBtree::done) {
2424 db->pool.unfix(pg);
2425 return result;
2426 }
2427 n += 1;
2428 }
2429 db->pool.unfix(pg);
2430 pg = (dbThickBtreePage*)db->put(tie, pageId);
2431 const int max = sizeof(pg->keyChar) / (sizeof(reference) + sizeofType);
2432 if (n < max) {
2433 memmove(pg->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (n - r)*sizeofType);
2434 memmove(&pg->ref[maxItems-n-1], &pg->ref[maxItems-n], (n-r)*sizeof(reference));
2435 memcpy(pg->keyChar + r*sizeofType, ins.keyChar, sizeofType);
2436 pg->ref[maxItems-r-1].oid = ins.oid;
2437 pg->ref[maxItems-r-1].recId = ins.recId;
2438 pg->nItems += 1;
2439 return dbBtree::done;
2440 } else { /* page is full then divide page */
2441 oid_t pageId = db->allocatePage();
2442 dbThickBtreePage* b = (dbThickBtreePage*)db->put(pageId);
2443 assert(n == max);
2444 const int m = (max+1)/2;
2445 if (r < m) {
2446 memcpy(b->keyChar, pg->keyChar, r*sizeofType);
2447 memcpy(b->keyChar + r*sizeofType, ins.keyChar, sizeofType);
2448 memcpy(b->keyChar + (r+1)*sizeofType, pg->keyChar + r*sizeofType, (m-r-1)*sizeofType);
2449 memmove(pg->keyChar, pg->keyChar + (m-1)*sizeofType, (max-m+1)*sizeofType);
2450 memcpy(&b->ref[maxItems-r], &pg->ref[maxItems-r], r*sizeof(reference));
2451 b->ref[maxItems-r-1].oid = ins.oid;
2452 b->ref[maxItems-r-1].recId = ins.recId;
2453 memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m+1], (m-r-1)*sizeof(reference));
2454 memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], (max-m+1)*sizeof(reference));
2455 } else {
2456 memcpy(b->keyChar, pg->keyChar, m*sizeofType);
2457 memmove(pg->keyChar, pg->keyChar + m*sizeofType, (r-m)*sizeofType);
2458 memcpy(pg->keyChar + (r-m)*sizeofType, ins.keyChar, sizeofType);
2459 memmove(pg->keyChar + (r-m+1)*sizeofType, pg->keyChar + r*sizeofType,
2460 (max-r)*sizeofType);
2461 memcpy(&b->ref[maxItems-m], &pg->ref[maxItems-m], m*sizeof(reference));
2462 memmove(&pg->ref[maxItems-r+m], &pg->ref[maxItems-r], (r-m)*sizeof(reference));
2463 pg->ref[maxItems-r+m-1].oid = ins.oid;
2464 pg->ref[maxItems-r+m-1].recId = ins.recId;
2465 memmove(&pg->ref[maxItems-max+m-1], &pg->ref[maxItems-max], (max-r)*sizeof(reference));
2466 }
2467 ins.oid = pageId;
2468 ins.recId = b->ref[maxItems-m].recId;
2469 memcpy(ins.keyChar, b->keyChar + (m-1)*sizeofType, sizeofType);
2470 if (height == 0) {
2471 pg->nItems = max - m + 1;
2472 b->nItems = m;
2473 } else {
2474 pg->nItems = max - m;
2475 b->nItems = m - 1;
2476 }
2477 db->pool.unfix(b);
2478 return dbBtree::overflow;
2479 }
2480 }
2481 case dbField::tpString:
2482 {
2483 char_t* key = ins.keyChar;
2484 while (l < r) {
2485 int i = (l+r) >> 1;
2486 int diff = SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]);
2487 if (diff > 0 || (diff == 0 && ins.recId > pg->keyStr[i].recId)) {
2488 l = i+1;
2489 } else {
2490 r = i;
2491 }
2492 }
2493 if (--height != 0) {
2494 result = insert(db, pg->keyStr[r].oid, type, sizeofType, comparator, ins, height);
2495 assert (result != dbBtree::not_found);
2496 if (result != dbBtree::overflow) {
2497 db->pool.unfix(pg);
2498 return result;
2499 }
2500 }
2501 db->pool.unfix(pg);
2502 pg = (dbThickBtreePage*)db->put(tie, pageId);
2503 return pg->insertStrKey(db, r, ins, height);
2504 }
2505 default:
2506 assert(false);
2507 }
2508 return dbBtree::done;
2509 }
2510
insertStrKey(dbDatabase * db,int r,item & ins,int height)2511 int dbThickBtreePage::insertStrKey(dbDatabase* db, int r, item& ins, int height)
2512 {
2513 int n = height != 0 ? nItems+1 : nItems;
2514 // insert before e[r]
2515 int len = ins.keyLen;
2516 if (size + len*sizeof(char_t) + (n+1)*sizeof(str) <= sizeof(keyChar)) {
2517 memmove(&keyStr[r+1], &keyStr[r], (n-r)*sizeof(str));
2518 size += len*sizeof(char_t);
2519 keyStr[r].offs = (nat2)(sizeof(keyChar) - size);
2520 keyStr[r].size = len;
2521 keyStr[r].oid = ins.oid;
2522 keyStr[r].recId = ins.recId;
2523 memcpy(&keyChar[sizeof(keyChar) - size], ins.keyChar, len*sizeof(char_t));
2524 nItems += 1;
2525 } else { // page is full then divide page
2526 oid_t pageId = db->allocatePage();
2527 dbThickBtreePage* b = (dbThickBtreePage*)db->put(pageId);
2528 size_t moved = 0;
2529 size_t inserted = len*sizeof(char_t) + sizeof(str);
2530 long prevDelta = (1L << (sizeof(long)*8-1)) + 1;
2531 for (int bn = 0, i = 0; ; bn += 1) {
2532 size_t addSize, subSize;
2533 int j = nItems - i - 1;
2534 size_t keyLen = keyStr[i].size;
2535 if (bn == r) {
2536 keyLen = len;
2537 inserted = 0;
2538 addSize = len;
2539 if (height == 0) {
2540 subSize = 0;
2541 j += 1;
2542 } else {
2543 subSize = keyStr[i].size;
2544 }
2545 } else {
2546 addSize = subSize = keyLen;
2547 if (height != 0) {
2548 if (i + 1 != r) {
2549 subSize += keyStr[i+1].size;
2550 j -= 1;
2551 } else {
2552 inserted = 0;
2553 }
2554 }
2555 }
2556 long delta = (long)((moved + addSize*sizeof(char_t) + (bn+1)*sizeof(str))
2557 - (j*sizeof(str) + size - subSize*sizeof(char_t) + inserted));
2558 if (delta >= -prevDelta) {
2559 char_t insKey[dbThickBtreePage::dbMaxKeyLen];
2560 oid_t insRecId = 0;
2561 if (bn <= r) {
2562 memcpy(insKey, ins.keyChar, len*sizeof(char_t));
2563 insRecId = ins.recId;
2564 }
2565 if (height == 0) {
2566 ins.keyLen = b->keyStr[bn-1].size;
2567 memcpy(ins.keyChar, &b->keyChar[b->keyStr[bn-1].offs], ins.keyLen*sizeof(char_t));
2568 ins.recId = b->keyStr[bn-1].recId;
2569 } else {
2570 assert(((void)"String fits in the B-Tree page",
2571 moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
2572 if (bn != r) {
2573 ins.keyLen = (int)keyLen;
2574 memcpy(ins.keyChar, &keyChar[keyStr[i].offs],
2575 keyLen*sizeof(char_t));
2576 b->keyStr[bn].oid = keyStr[i].oid;
2577 ins.recId = keyStr[i].recId;
2578 size -= (int)(keyLen*sizeof(char_t));
2579 i += 1;
2580 } else {
2581 b->keyStr[bn].oid = ins.oid;
2582 }
2583 }
2584 compactify(db, i);
2585 if (bn < r || (bn == r && height == 0)) {
2586 memmove(&keyStr[r-i+1], &keyStr[r-i],
2587 (n - r)*sizeof(str));
2588 size += len*sizeof(char_t);
2589 nItems += 1;
2590 assert(((void)"String fits in the B-Tree page",
2591 size + (n-i+1)*sizeof(str) <= sizeof(keyChar)));
2592 keyStr[r-i].offs = (nat2)(sizeof(keyChar) - size);
2593 keyStr[r-i].size = len;
2594 keyStr[r-i].oid = ins.oid;
2595 keyStr[r-i].recId = insRecId;
2596 memcpy(&keyChar[keyStr[r-i].offs], insKey, len*sizeof(char_t));
2597 }
2598 b->nItems = bn;
2599 b->size = (nat4)moved;
2600 ins.oid = pageId;
2601 db->pool.unfix(b);
2602 assert(nItems > 0 && b->nItems > 0);
2603 return dbBtree::overflow;
2604 }
2605 prevDelta = delta;
2606 moved += keyLen*sizeof(char_t);
2607 assert(((void)"String fits in the B-Tree page",
2608 moved + (bn+1)*sizeof(str) <= sizeof(keyChar)));
2609 b->keyStr[bn].size = (nat2)keyLen;
2610 b->keyStr[bn].offs = (nat2)(sizeof(keyChar) - moved);
2611 if (bn == r) {
2612 b->keyStr[bn].oid = ins.oid;
2613 b->keyStr[bn].recId = ins.recId;
2614 memcpy(&b->keyChar[b->keyStr[bn].offs], ins.keyChar, keyLen*sizeof(char_t));
2615 } else {
2616 b->keyStr[bn].oid = keyStr[i].oid;
2617 b->keyStr[bn].recId = keyStr[i].recId;
2618 memcpy(&b->keyChar[b->keyStr[bn].offs],
2619 &keyChar[keyStr[i].offs], keyLen*sizeof(char_t));
2620 size -= (nat4)(keyLen*sizeof(char_t));
2621 i += 1;
2622 }
2623 }
2624 }
2625 return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
2626 ? dbBtree::underflow : dbBtree::done;
2627 }
2628
compactify(dbDatabase * db,int m)2629 void dbThickBtreePage::compactify(dbDatabase* db, int m)
2630 {
2631 int i, j, offs, len, n = nItems;
2632 // int size[dbPageSize]; // for 8Kb default page size, total size used by these buffers will be 64kb which is too much for many environments
2633 // int index[dbPageSize];
2634 int* size = db->btreeBuf;
2635 int* index = size + dbPageSize;
2636 if (m == 0) {
2637 return;
2638 }
2639 if (m < 0) {
2640 m = -m;
2641 for (i = 0; i < n-m; i++) {
2642 len = keyStr[i].size;
2643 size[keyStr[i].offs + len*sizeof(char_t)] = len;
2644 index[keyStr[i].offs + len*sizeof(char_t)] = i;
2645 }
2646 for (; i < n; i++) {
2647 len = keyStr[i].size;
2648 size[keyStr[i].offs + len*sizeof(char_t)] = len;
2649 index[keyStr[i].offs + len*sizeof(char_t)] = -1;
2650 }
2651 } else {
2652 for (i = 0; i < m; i++) {
2653 len = keyStr[i].size;
2654 size[keyStr[i].offs + len*sizeof(char_t)] = len;
2655 index[keyStr[i].offs + len*sizeof(char_t)] = -1;
2656 }
2657 for (; i < n; i++) {
2658 len = keyStr[i].size;
2659 size[keyStr[i].offs + len*sizeof(char_t)] = len;
2660 index[keyStr[i].offs + len*sizeof(char_t)] = i - m;
2661 keyStr[i-m].oid = keyStr[i].oid;
2662 keyStr[i-m].recId = keyStr[i].recId;
2663 keyStr[i-m].size = len;
2664 }
2665 keyStr[i-m].oid = keyStr[i].oid;
2666 keyStr[i-m].recId = keyStr[i].recId;
2667 }
2668 nItems = n -= m;
2669 for (offs = sizeof(keyChar), i = offs; n != 0; i -= len) {
2670 len = size[i]*sizeof(char_t);
2671 j = index[i];
2672 if (j >= 0) {
2673 offs -= len;
2674 n -= 1;
2675 keyStr[j].offs = offs;
2676 if (offs != i - len) {
2677 memmove(&keyChar[offs], &keyChar[(i - len)], len);
2678 }
2679 }
2680 }
2681 }
2682
removeStrKey(dbDatabase * db,int r)2683 int dbThickBtreePage::removeStrKey(dbDatabase* db, int r)
2684 {
2685 int len = keyStr[r].size;
2686 int offs = keyStr[r].offs;
2687 memmove(keyChar + sizeof(keyChar) - size + len*sizeof(char_t),
2688 keyChar + sizeof(keyChar) - size,
2689 size - sizeof(keyChar) + offs);
2690 memmove(&keyStr[r], &keyStr[r+1], (nItems-r)*sizeof(str));
2691 nItems -= 1;
2692 size -= len*sizeof(char_t);
2693 for (int i = nItems; --i >= 0; ) {
2694 if (keyStr[i].offs < offs) {
2695 keyStr[i].offs += (nat2)(len*sizeof(char_t));
2696 }
2697 }
2698 return nItems == 0 || (size + sizeof(str)*(nItems+1))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
2699 ? dbBtree::underflow : dbBtree::done;
2700 }
2701
replaceStrKey(dbDatabase * db,int r,item & ins,int height)2702 int dbThickBtreePage::replaceStrKey(dbDatabase* db, int r, item& ins, int height)
2703 {
2704 ins.oid = keyStr[r].oid;
2705 removeStrKey(db, r);
2706 return insertStrKey(db, r, ins, height);
2707 }
2708
handlePageUnderflow(dbDatabase * db,int r,int type,int sizeofType,item & rem,int height)2709 int dbThickBtreePage::handlePageUnderflow(dbDatabase* db, int r, int type, int sizeofType, item& rem,
2710 int height)
2711 {
2712 dbPutTie tie;
2713 if (type == dbField::tpString) {
2714 dbThickBtreePage* a = (dbThickBtreePage*)db->put(tie, keyStr[r].oid);
2715 int an = a->nItems;
2716 if (r < (int)nItems) { // exists greater page
2717 dbThickBtreePage* b = (dbThickBtreePage*)db->get(keyStr[r+1].oid);
2718 int bn = b->nItems;
2719 size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
2720 if (height != 1) {
2721 merged_size += keyStr[r].size*sizeof(char_t) + sizeof(str)*2;
2722 }
2723 if (merged_size > sizeof(keyChar)) {
2724 // reallocation of nodes between pages a and b
2725 int i, j, k;
2726 dbPutTie tie;
2727 db->pool.unfix(b);
2728 b = (dbThickBtreePage*)db->put(tie, keyStr[r+1].oid);
2729 size_t size_a = a->size;
2730 size_t size_b = b->size;
2731 size_t addSize, subSize;
2732 if (height != 1) {
2733 addSize = keyStr[r].size;
2734 subSize = b->keyStr[0].size;
2735 } else {
2736 addSize = subSize = b->keyStr[0].size;
2737 }
2738 i = 0;
2739 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
2740 while (true) {
2741 i += 1;
2742 long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
2743 - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
2744 if (delta >= 0) {
2745 if (delta >= -prevDelta) {
2746 i -= 1;
2747 }
2748 break;
2749 }
2750 size_a += addSize*sizeof(char_t);
2751 size_b -= subSize*sizeof(char_t);
2752 prevDelta = delta;
2753 if (height != 1) {
2754 addSize = subSize;
2755 subSize = b->keyStr[i].size;
2756 } else {
2757 addSize = subSize = b->keyStr[i].size;
2758 }
2759 }
2760 int result = dbBtree::done;
2761 if (i > 0) {
2762 k = i;
2763 if (height != 1) {
2764 int len = keyStr[r].size;
2765 a->size += len*sizeof(char_t);
2766 a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
2767 a->keyStr[an].size = len;
2768 a->keyStr[an].recId = keyStr[r].recId;
2769 memcpy(a->keyChar + a->keyStr[an].offs,
2770 keyChar + keyStr[r].offs, len*sizeof(char_t));
2771 k -= 1;
2772 an += 1;
2773 a->keyStr[an+k].oid = b->keyStr[k].oid;
2774 b->size -= b->keyStr[k].size*sizeof(char_t);
2775 }
2776 for (j = 0; j < k; j++) {
2777 int len = b->keyStr[j].size;
2778 a->size += len*sizeof(char_t);
2779 b->size -= len*sizeof(char_t);
2780 a->keyStr[an].offs = (nat2)(sizeof(a->keyChar) - a->size);
2781 a->keyStr[an].size = len;
2782 a->keyStr[an].oid = b->keyStr[j].oid;
2783 a->keyStr[an].recId = b->keyStr[j].recId;
2784 memcpy(a->keyChar + a->keyStr[an].offs,
2785 b->keyChar + b->keyStr[j].offs, len*sizeof(char_t));
2786 an += 1;
2787 }
2788 memcpy(rem.keyChar, b->keyChar + b->keyStr[i-1].offs,
2789 b->keyStr[i-1].size*sizeof(char_t));
2790 rem.keyLen = b->keyStr[i-1].size;
2791 rem.recId = b->keyStr[i-1].recId;
2792 result = replaceStrKey(db, r, rem, height);
2793 a->nItems = an;
2794 b->compactify(db, i);
2795 }
2796 assert(a->nItems > 0 && b->nItems > 0);
2797 return result;
2798 } else { // merge page b to a
2799 if (height != 1) {
2800 a->size += (a->keyStr[an].size = keyStr[r].size)*sizeof(char_t);
2801 a->keyStr[an].offs = (nat2)(sizeof(keyChar) - a->size);
2802 memcpy(&a->keyChar[a->keyStr[an].offs],
2803 &keyChar[keyStr[r].offs], keyStr[r].size*sizeof(char_t));
2804 a->keyStr[an].recId = keyStr[r].recId;
2805 an += 1;
2806 a->keyStr[an+bn].oid = b->keyStr[bn].oid;
2807 }
2808 for (int i = 0; i < bn; i++, an++) {
2809 a->keyStr[an] = b->keyStr[i];
2810 a->keyStr[an].offs -= a->size;
2811 }
2812 a->size += b->size;
2813 a->nItems = an;
2814 memcpy(a->keyChar + sizeof(keyChar) - a->size,
2815 b->keyChar + sizeof(keyChar) - b->size,
2816 b->size);
2817 db->pool.unfix(b);
2818 db->freePage(keyStr[r+1].oid);
2819 keyStr[r+1].oid = keyStr[r].oid;
2820 return removeStrKey(db, r);
2821 }
2822 } else { // page b is before a
2823 dbThickBtreePage* b = (dbThickBtreePage*)db->get(keyStr[r-1].oid);
2824 int bn = b->nItems;
2825 size_t merged_size = (an+bn)*sizeof(str) + a->size + b->size;
2826 if (height != 1) {
2827 merged_size += keyStr[r-1].size*sizeof(char_t) + sizeof(str)*2;
2828 }
2829 if (merged_size > sizeof(keyChar)) {
2830 // reallocation of nodes between pages a and b
2831 dbPutTie tie;
2832 int i, j, k;
2833 db->pool.unfix(b);
2834 b = (dbThickBtreePage*)db->put(tie, keyStr[r-1].oid);
2835 size_t size_a = a->size;
2836 size_t size_b = b->size;
2837 size_t addSize, subSize;
2838 if (height != 1) {
2839 addSize = keyStr[r-1].size;
2840 subSize = b->keyStr[bn-1].size;
2841 } else {
2842 addSize = subSize = b->keyStr[bn-1].size;
2843 }
2844 i = 0;
2845 long prevDelta = (long)((an*sizeof(str) + size_a) - (bn*sizeof(str) + size_b));
2846 while (true) {
2847 i += 1;
2848 long delta = (long)(((an+i)*sizeof(str) + size_a + addSize*sizeof(char_t))
2849 - ((bn-i)*sizeof(str) + size_b - subSize*sizeof(char_t)));
2850 if (delta >= 0) {
2851 if (delta >= -prevDelta) {
2852 i -= 1;
2853 }
2854 break;
2855 }
2856 prevDelta = delta;
2857 size_a += addSize*sizeof(char_t);
2858 size_b -= subSize*sizeof(char_t);
2859 if (height != 1) {
2860 addSize = subSize;
2861 subSize = b->keyStr[bn-i-1].size;
2862 } else {
2863 addSize = subSize = b->keyStr[bn-i-1].size;
2864 }
2865 }
2866 int result = dbBtree::done;
2867 if (i > 0) {
2868 k = i;
2869 assert(i < bn);
2870 if (height != 1) {
2871 memmove(&a->keyStr[i], a->keyStr, (an+1)*sizeof(str));
2872 b->size -= b->keyStr[bn-k].size*sizeof(char_t);
2873 k -= 1;
2874 a->keyStr[k].oid = b->keyStr[bn].oid;
2875 a->keyStr[k].recId = keyStr[r-1].recId;
2876 int len = keyStr[r-1].size;
2877 a->keyStr[k].size = len;
2878 a->size += len*sizeof(char_t);
2879 a->keyStr[k].offs = (nat2)(sizeof(keyChar) - a->size);
2880 memcpy(&a->keyChar[a->keyStr[k].offs],
2881 &keyChar[keyStr[r-1].offs], len*sizeof(char_t));
2882 } else {
2883 memmove(&a->keyStr[i], a->keyStr, an*sizeof(str));
2884 }
2885 for (j = 0; j < k; j++) {
2886 int len = b->keyStr[bn-k+j].size;
2887 a->size += len*sizeof(char_t);
2888 b->size -= len*sizeof(char_t);
2889 a->keyStr[j].offs = (nat2)(sizeof(a->keyChar) - a->size);
2890 a->keyStr[j].size = len;
2891 a->keyStr[j].oid = b->keyStr[bn-k+j].oid;
2892 a->keyStr[j].recId = b->keyStr[bn-k+j].recId;
2893 memcpy(a->keyChar + a->keyStr[j].offs,
2894 b->keyChar + b->keyStr[bn-k+j].offs, len*sizeof(char_t));
2895 }
2896 an += i;
2897 a->nItems = an;
2898 memcpy(rem.keyChar, b->keyChar + b->keyStr[bn-k-1].offs,
2899 b->keyStr[bn-k-1].size*sizeof(char_t));
2900 rem.keyLen = b->keyStr[bn-k-1].size;
2901 rem.recId = b->keyStr[bn-k-1].recId;
2902 result = replaceStrKey(db, r-1, rem, height);
2903 b->compactify(db, -i);
2904 }
2905 assert(a->nItems > 0 && b->nItems > 0);
2906 return result;
2907 } else { // merge page b to a
2908 if (height != 1) {
2909 memmove(a->keyStr + bn + 1, a->keyStr, (an+1)*sizeof(str));
2910 a->size += (a->keyStr[bn].size = keyStr[r-1].size)*sizeof(char_t);
2911 a->keyStr[bn].offs = (nat2)(sizeof(keyChar) - a->size);
2912 a->keyStr[bn].oid = b->keyStr[bn].oid;
2913 a->keyStr[bn].recId = keyStr[r-1].recId;
2914 memcpy(&a->keyChar[a->keyStr[bn].offs],
2915 &keyChar[keyStr[r-1].offs], keyStr[r-1].size*sizeof(char_t));
2916 an += 1;
2917 } else {
2918 memmove(a->keyStr + bn, a->keyStr, an*sizeof(str));
2919 }
2920 for (int i = 0; i < bn; i++) {
2921 a->keyStr[i] = b->keyStr[i];
2922 a->keyStr[i].offs -= a->size;
2923 }
2924 an += bn;
2925 a->nItems = an;
2926 a->size += b->size;
2927 memcpy(a->keyChar + sizeof(keyChar) - a->size,
2928 b->keyChar + sizeof(keyChar) - b->size,
2929 b->size);
2930 db->pool.unfix(b);
2931 db->freePage(keyStr[r-1].oid);
2932 return removeStrKey(db, r-1);
2933 }
2934 }
2935 } else {
2936 dbThickBtreePage* a = (dbThickBtreePage*)db->put(tie, ref[maxItems-r-1].oid);
2937 int an = a->nItems;
2938 if (r < int(nItems)) { // exists greater page
2939 dbThickBtreePage* b = (dbThickBtreePage*)db->get(ref[maxItems-r-2].oid);
2940 int bn = b->nItems;
2941 assert(bn >= an);
2942 if (height != 1) {
2943 memcpy(a->keyChar + an*sizeofType, keyChar + r*sizeofType,
2944 sizeofType);
2945 a->ref[maxItems-an-1].recId = ref[maxItems-r-1].recId;
2946 an += 1;
2947 bn += 1;
2948 }
2949 size_t merged_size = (an+bn)*(sizeof(reference) + sizeofType);
2950 if (merged_size > sizeof(keyChar)) {
2951 // reallocation of nodes between pages a and b
2952 int i = bn - ((an + bn) >> 1);
2953 dbPutTie tie;
2954 db->pool.unfix(b);
2955 b = (dbThickBtreePage*)db->put(tie, ref[maxItems-r-2].oid);
2956 memcpy(a->keyChar + an*sizeofType, b->keyChar, i*sizeofType);
2957 memmove(b->keyChar, b->keyChar + i*sizeofType, (bn-i)*sizeofType);
2958 memcpy(&a->ref[maxItems-an-i], &b->ref[maxItems-i], i*sizeof(reference));
2959 memmove(&b->ref[maxItems-bn+i], &b->ref[maxItems-bn], (bn-i)*sizeof(reference));
2960 memcpy(keyChar + r*sizeofType, a->keyChar + (an+i-1)*sizeofType, sizeofType);
2961 ref[maxItems-r-1].recId = a->ref[maxItems-an-i].recId;
2962 b->nItems -= i;
2963 a->nItems += i;
2964 return dbBtree::done;
2965 } else { // merge page b to a
2966 memcpy(a->keyChar + an*sizeofType, b->keyChar, bn*sizeofType);
2967 memcpy(&a->ref[maxItems-an-bn], &b->ref[maxItems-bn], bn*sizeof(reference));
2968 db->pool.unfix(b);
2969 db->freePage(ref[maxItems-r-2].oid);
2970 ref[maxItems-r-1].recId = ref[maxItems-r-2].recId;
2971 memmove(&ref[maxItems-nItems], &ref[maxItems-nItems-1],
2972 (nItems - r - 1)*sizeof(reference));
2973 memmove(keyChar + r*sizeofType, keyChar + (r+1)*sizeofType,
2974 (nItems - r - 1)*sizeofType);
2975 a->nItems += bn;
2976 nItems -= 1;
2977 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(reference))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
2978 ? dbBtree::underflow : dbBtree::done;
2979 }
2980 } else { // page b is before a
2981 dbThickBtreePage* b = (dbThickBtreePage*)db->get(ref[maxItems-r].oid);
2982 int bn = b->nItems;
2983 assert(bn >= an);
2984 if (height != 1) {
2985 an += 1;
2986 bn += 1;
2987 }
2988 size_t merged_size = (an+bn)*(sizeof(reference) + sizeofType);
2989 if (merged_size > sizeof(keyChar)) {
2990 // reallocation of nodes between pages a and b
2991 int i = bn - ((an + bn) >> 1);
2992 dbPutTie tie;
2993 db->pool.unfix(b);
2994 b = (dbThickBtreePage*)db->put(tie, ref[maxItems-r].oid);
2995 memmove(a->keyChar + i*sizeofType, a->keyChar, an*sizeofType);
2996 memcpy(a->keyChar, b->keyChar + (bn-i)*sizeofType, i*sizeofType);
2997 memmove(&a->ref[maxItems-an-i], &a->ref[maxItems-an], an*sizeof(reference));
2998 memcpy(&a->ref[maxItems-i], &b->ref[maxItems-bn], i*sizeof(reference));
2999 if (height != 1) {
3000 memcpy(a->keyChar + (i-1)*sizeofType, keyChar + (r-1)*sizeofType,
3001 sizeofType);
3002 a->ref[maxItems-i].recId = ref[maxItems-r].recId;
3003 }
3004 memcpy(keyChar + (r-1)*sizeofType, b->keyChar + (bn-i-1)*sizeofType,
3005 sizeofType);
3006 ref[maxItems-r].recId = b->ref[maxItems-bn+i].recId;
3007 b->nItems -= i;
3008 a->nItems += i;
3009 return dbBtree::done;
3010 } else { // merge page b to a
3011 memmove(a->keyChar + bn*sizeofType, a->keyChar, an*sizeofType);
3012 memcpy(a->keyChar, b->keyChar, bn*sizeofType);
3013 memmove(&a->ref[maxItems-an-bn], &a->ref[maxItems-an], an*sizeof(reference));
3014 memcpy(&a->ref[maxItems-bn], &b->ref[maxItems-bn], bn*sizeof(reference));
3015 if (height != 1) {
3016 memcpy(a->keyChar + (bn-1)*sizeofType, keyChar + (r-1)*sizeofType,
3017 sizeofType);
3018 a->ref[maxItems-bn].recId = ref[maxItems-r].recId;
3019 }
3020 db->pool.unfix(b);
3021 db->freePage(ref[maxItems-r].oid);
3022 ref[maxItems-r].oid = ref[maxItems-r-1].oid;
3023 a->nItems += bn;
3024 nItems -= 1;
3025 return nItems == 0 || (nItems+1)*(sizeofType + sizeof(reference))*100 < sizeof(keyChar)*db->btreeUnderflowPercent
3026 ? dbBtree::underflow : dbBtree::done;
3027 }
3028 }
3029 }
3030 }
3031
3032 #undef REMOVE
3033 #define REMOVE(KEY,TYPE) { \
3034 TYPE key = rem.KEY; \
3035 while (l < r) { \
3036 i = (l+r) >> 1; \
3037 if (key > pg->KEY[i] || (key == pg->KEY[i] && recId > pg->ref[maxItems-i-1].recId)) { \
3038 l = i+1; \
3039 } else { \
3040 r = i; \
3041 } \
3042 } \
3043 if (--height == 0) { \
3044 while (r < n) { \
3045 if (key == pg->KEY[r]) { \
3046 if (pg->ref[maxItems-r-1].oid == oid) { \
3047 db->pool.unfix(pg); \
3048 pg = (dbThickBtreePage*)db->put(tie, pageId); \
3049 memmove(&pg->KEY[r], &pg->KEY[r+1], (n - r - 1)*sizeof(TYPE)); \
3050 memmove(&pg->ref[maxItems-n+1], &pg->ref[maxItems-n], \
3051 (n - r - 1)*sizeof(reference)); \
3052 pg->nItems = --n; \
3053 return n == 0 || n*(sizeof(TYPE) + sizeof(reference))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent \
3054 ? dbBtree::underflow : dbBtree::done; \
3055 } \
3056 } else { \
3057 break; \
3058 } \
3059 r += 1; \
3060 } \
3061 db->pool.unfix(pg); \
3062 return dbBtree::not_found; \
3063 } \
3064 break; \
3065 }
3066
remove(dbDatabase * db,oid_t pageId,int type,int sizeofType,dbUDTComparator comparator,item & rem,int height)3067 int dbThickBtreePage::remove(dbDatabase* db, oid_t pageId, int type, int sizeofType,
3068 dbUDTComparator comparator, item& rem, int height)
3069 {
3070 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
3071 dbPutTie tie;
3072 oid_t oid = rem.oid;
3073 oid_t recId = rem.recId;
3074 int i, n = pg->nItems, l = 0, r = n;
3075
3076 switch (type) {
3077 case dbField::tpBool:
3078 case dbField::tpInt1:
3079 REMOVE(keyInt1, int1);
3080 case dbField::tpInt2:
3081 REMOVE(keyInt2, int2);
3082 case dbField::tpInt4:
3083 REMOVE(keyInt4, int4);
3084 case dbField::tpInt8:
3085 REMOVE(keyInt8, db_int8);
3086 case dbField::tpReference:
3087 REMOVE(keyOid, oid_t);
3088 case dbField::tpReal4:
3089 REMOVE(keyReal4, real4);
3090 case dbField::tpReal8:
3091 REMOVE(keyReal8, real8);
3092 case dbField::tpRawBinary:
3093 {
3094 char* key = (char*)rem.keyChar;
3095 while (l < r) {
3096 i = (l+r) >> 1;
3097 int diff = comparator(key, pg->keyChar + i*sizeofType, sizeofType);
3098 if (diff > 0 || (diff == 0 && recId > pg->ref[maxItems-i-1].recId)) {
3099 l = i+1;
3100 } else {
3101 r = i;
3102 }
3103 }
3104 if (--height == 0) {
3105 while (r < n) {
3106 if (memcmp(key, pg->keyChar + r*sizeofType, sizeofType) == 0) {
3107 if (pg->ref[maxItems-r-1].oid == oid) {
3108 db->pool.unfix(pg);
3109 pg = (dbThickBtreePage*)db->put(tie, pageId);
3110 memmove(pg->keyChar + r*sizeofType, pg->keyChar + (r+1)*sizeofType,
3111 (n - r - 1)*sizeofType);
3112 memmove(&pg->ref[maxItems-n+1], &pg->ref[maxItems-n],
3113 (n - r - 1)*sizeof(reference));
3114 pg->nItems = --n;
3115 return n == 0 || n*(sizeofType + sizeof(reference))*100 < sizeof(pg->keyChar)*db->btreeUnderflowPercent
3116 ? dbBtree::underflow : dbBtree::done;
3117 }
3118 } else {
3119 break;
3120 }
3121 r += 1;
3122 }
3123 db->pool.unfix(pg);
3124 return dbBtree::not_found;
3125 }
3126 break;
3127 }
3128 case dbField::tpString:
3129 {
3130 char_t* key = rem.keyChar;
3131 while (l < r) {
3132 i = (l+r) >> 1;
3133 int diff = SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[i].offs]);
3134 if (diff > 0 || (diff == 0 && recId > pg->keyStr[i].recId)) {
3135 l = i+1;
3136 } else {
3137 r = i;
3138 }
3139 }
3140 if (--height != 0) {
3141 do {
3142 switch (remove(db, pg->keyStr[r].oid, type, sizeofType, comparator, rem, height)) {
3143 case dbBtree::underflow:
3144 db->pool.unfix(pg);
3145 pg = (dbThickBtreePage*)db->put(tie, pageId);
3146 return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
3147 case dbBtree::done:
3148 db->pool.unfix(pg);
3149 return dbBtree::done;
3150 case dbBtree::overflow:
3151 db->pool.unfix(pg);
3152 pg = (dbThickBtreePage*)db->put(tie, pageId);
3153 return pg->insertStrKey(db, r, rem, height);
3154 }
3155 } while (++r <= n);
3156 } else {
3157 while (r < n) {
3158 if (SCMP(key, (char_t*)&pg->keyChar[pg->keyStr[r].offs]) == 0) {
3159 if (pg->keyStr[r].oid == oid) {
3160 db->pool.unfix(pg);
3161 pg = (dbThickBtreePage*)db->put(tie, pageId);
3162 return pg->removeStrKey(db, r);
3163 }
3164 } else {
3165 break;
3166 }
3167 r += 1;
3168 }
3169 }
3170 db->pool.unfix(pg);
3171 return dbBtree::not_found;
3172 }
3173 default:
3174 assert(false);
3175 }
3176 do {
3177 switch (remove(db, pg->ref[maxItems-r-1].oid, type, sizeofType, comparator, rem, height)) {
3178 case dbBtree::underflow:
3179 db->pool.unfix(pg);
3180 pg = (dbThickBtreePage*)db->put(tie, pageId);
3181 return pg->handlePageUnderflow(db, r, type, sizeofType, rem, height);
3182 case dbBtree::done:
3183 db->pool.unfix(pg);
3184 return dbBtree::done;
3185 }
3186 } while (++r <= n);
3187
3188 db->pool.unfix(pg);
3189 return dbBtree::not_found;
3190 }
3191
3192
purge(dbDatabase * db,oid_t pageId,int type,int height)3193 void dbThickBtreePage::purge(dbDatabase* db, oid_t pageId, int type, int height)
3194 {
3195 if (--height != 0) {
3196 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(pageId);
3197 int n = pg->nItems+1;
3198 if (type == dbField::tpString) { // page of strings
3199 while (--n >= 0) {
3200 purge(db, pg->keyStr[n].oid, type, height);
3201 }
3202 } else {
3203 while (--n >= 0) {
3204 purge(db, pg->ref[maxItems-n-1].oid, type, height);
3205 }
3206 }
3207 db->pool.unfix(pg);
3208 }
3209 db->freePage(pageId);
3210 }
3211
traverseForward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)3212 bool dbThickBtreePage::traverseForward(dbDatabase* db, dbAnyCursor* cursor,
3213 dbExprNode* condition, int type, int height)
3214 {
3215 int n = nItems;
3216 if (--height != 0) {
3217 if (type == dbField::tpString) { // page of strings
3218 for (int i = 0; i <= n; i++) {
3219 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[i].oid);
3220 if (!pg->traverseForward(db, cursor, condition, type, height)) {
3221 db->pool.unfix(pg);
3222 return false;
3223 }
3224 db->pool.unfix(pg);
3225 }
3226 } else {
3227 for (int i = 0; i <= n; i++) {
3228 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-i-1].oid);
3229 if (!pg->traverseForward(db, cursor, condition, type, height)) {
3230 db->pool.unfix(pg);
3231 return false;
3232 }
3233 db->pool.unfix(pg);
3234 }
3235 }
3236 } else {
3237 if (type != dbField::tpString) { // page of scalars
3238 if (condition == NULL) {
3239 for (int i = 0; i < n; i++) {
3240 if (!cursor->add(ref[maxItems-1-i].oid)) {
3241 return false;
3242 }
3243 }
3244 } else {
3245 dbTableDescriptor* table = cursor->table;
3246 for (int i = 0; i < n; i++) {
3247 if (db->evaluateBoolean(condition, ref[maxItems-1-i].oid, table, cursor)) {
3248 if (!cursor->add(ref[maxItems-1-i].oid)) {
3249 return false;
3250 }
3251 }
3252 }
3253 }
3254 } else { // page of strings
3255 if (condition == NULL) {
3256 for (int i = 0; i < n; i++) {
3257 if (!cursor->add(keyStr[i].oid)) {
3258 return false;
3259 }
3260 }
3261 } else {
3262 dbTableDescriptor* table = cursor->table;
3263 for (int i = 0; i < n; i++) {
3264 if (db->evaluateBoolean(condition, keyStr[i].oid, table, cursor)) {
3265 if (!cursor->add(keyStr[i].oid)) {
3266 return false;
3267 }
3268 }
3269 }
3270 }
3271 }
3272 }
3273 return true;
3274 }
3275
3276
traverseBackward(dbDatabase * db,dbAnyCursor * cursor,dbExprNode * condition,int type,int height)3277 bool dbThickBtreePage::traverseBackward(dbDatabase* db, dbAnyCursor* cursor,
3278 dbExprNode* condition, int type, int height)
3279 {
3280 int n = nItems;
3281 if (--height != 0) {
3282 if (type == dbField::tpString) { // page of strings
3283 do {
3284 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(keyStr[n].oid);
3285 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
3286 db->pool.unfix(pg);
3287 return false;
3288 }
3289 db->pool.unfix(pg);
3290 } while (--n >= 0);
3291 } else {
3292 do {
3293 dbThickBtreePage* pg = (dbThickBtreePage*)db->get(ref[maxItems-n-1].oid);
3294 if (!pg->traverseBackward(db, cursor, condition, type, height)) {
3295 db->pool.unfix(pg);
3296 return false;
3297 }
3298 db->pool.unfix(pg);
3299 } while (--n >= 0);
3300 }
3301 } else {
3302 if (type != dbField::tpString) { // page of scalars
3303 if (condition == NULL) {
3304 while (--n >= 0) {
3305 if (!cursor->add(ref[maxItems-1-n].oid)) {
3306 return false;
3307 }
3308 }
3309 } else {
3310 dbTableDescriptor* table = cursor->table;
3311 while (--n >= 0) {
3312 if (db->evaluateBoolean(condition, ref[maxItems-1-n].oid, table, cursor)) {
3313 if (!cursor->add(ref[maxItems-1-n].oid)) {
3314 return false;
3315 }
3316 }
3317 }
3318 }
3319 } else { // page of strings
3320 if (condition == NULL) {
3321 while (--n >= 0) {
3322 if (!cursor->add(keyStr[n].oid)) {
3323 return false;
3324 }
3325 }
3326 } else {
3327 dbTableDescriptor* table = cursor->table;
3328 while (--n >= 0) {
3329 if (db->evaluateBoolean(condition, keyStr[n].oid, table, cursor)) {
3330 if (!cursor->add(keyStr[n].oid)) {
3331 return false;
3332 }
3333 }
3334 }
3335 }
3336 }
3337 }
3338 return true;
3339 }
3340
boolComparator(void * p,void * q,size_t)3341 static int boolComparator(void* p, void* q, size_t)
3342 {
3343 return (int)*(bool*)p - (int)*(bool*)q;
3344 }
3345
int1Comparator(void * p,void * q,size_t)3346 static int int1Comparator(void* p, void* q, size_t)
3347 {
3348 return *(int1*)p - *(int1*)q;
3349 }
3350
int2Comparator(void * p,void * q,size_t)3351 static int int2Comparator(void* p, void* q, size_t)
3352 {
3353 return *(int2*)p - *(int2*)q;
3354 }
3355
int4Comparator(void * p,void * q,size_t)3356 static int int4Comparator(void* p, void* q, size_t)
3357 {
3358 return *(int4*)p < *(int4*)q ? -1 : *(int4*)p == *(int4*)q ? 0 : 1;
3359 }
3360
int8Comparator(void * p,void * q,size_t)3361 static int int8Comparator(void* p, void* q, size_t)
3362 {
3363 return *(db_int8*)p < *(db_int8*)q ? -1 : *(db_int8*)p == *(db_int8*)q ? 0 : 1;
3364 }
3365
real4Comparator(void * p,void * q,size_t)3366 static int real4Comparator(void* p, void* q, size_t)
3367 {
3368 return *(real4*)p < *(real4*)q ? -1 : *(real4*)p == *(real4*)q ? 0 : 1;
3369 }
3370
real8Comparator(void * p,void * q,size_t)3371 static int real8Comparator(void* p, void* q, size_t)
3372 {
3373 return *(real8*)p < *(real8*)q ? -1 : *(real8*)p == *(real8*)q ? 0 : 1;
3374 }
3375
stringComparator(void * p,void * q,size_t)3376 static int stringComparator(void* p, void* q, size_t)
3377 {
3378 return SCMP((char_t*)p, (char_t*)q);
3379 }
3380
3381
3382
3383 static dbUDTComparator const scalarComparators[] =
3384 {
3385 &boolComparator,
3386 &int1Comparator,
3387 &int2Comparator,
3388 &int4Comparator,
3389 &int8Comparator,
3390 &real4Comparator,
3391 &real8Comparator,
3392 &stringComparator
3393 };
3394
init(dbDatabase * db,oid_t treeId,dbSearchContext & sc,dbUDTComparator comparator)3395 void dbBtreeIterator::init(dbDatabase* db, oid_t treeId, dbSearchContext& sc, dbUDTComparator comparator)
3396 {
3397 dbGetTie tie;
3398 dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
3399 type = tree->type;
3400 sizeofType = tree->sizeofType;
3401 this->db = db;
3402 this->sc = sc;
3403 this->treeId = treeId;
3404 this->comparator = (type <= dbField::tpString) ? scalarComparators[type] : comparator;
3405 if (tree->flags & dbBtree::FLAGS_THICK) {
3406 getOid = (type == dbField::tpString) ? &dbBtreeIterator::getStringThickBtreePageOid : &dbBtreeIterator::getScalarThickBtreePageOid;
3407 getKey = (type == dbField::tpString) ? &dbBtreeIterator::getStringThickBtreePageKey : &dbBtreeIterator::getScalarThickBtreePageKey;
3408 } else {
3409 getOid = (type == dbField::tpString) ? &dbBtreeIterator::getStringBtreePageOid : &dbBtreeIterator::getScalarBtreePageOid;
3410 getKey = (type == dbField::tpString) ? &dbBtreeIterator::getStringBtreePageKey : &dbBtreeIterator::getScalarBtreePageKey;
3411 }
3412 }
3413
3414
getStringBtreePageOid(byte * pg,int i)3415 oid_t dbBtreeIterator::getStringBtreePageOid(byte* pg, int i)
3416 {
3417 return ((dbBtreePage*)pg)->keyStr[i].oid;
3418 }
3419
getScalarBtreePageOid(byte * pg,int i)3420 oid_t dbBtreeIterator::getScalarBtreePageOid(byte* pg, int i)
3421 {
3422 return ((dbBtreePage*)pg)->record[dbBtreePage::maxItems-i-1];
3423 }
3424
getStringThickBtreePageOid(byte * pg,int i)3425 oid_t dbBtreeIterator::getStringThickBtreePageOid(byte* pg, int i)
3426 {
3427 return ((dbThickBtreePage*)pg)->keyStr[i].oid;
3428 }
3429
getScalarThickBtreePageOid(byte * pg,int i)3430 oid_t dbBtreeIterator::getScalarThickBtreePageOid(byte* pg, int i)
3431 {
3432 return ((dbThickBtreePage*)pg)->ref[dbThickBtreePage::maxItems-i-1].oid;
3433 }
3434
getStringBtreePageKey(byte * pg,int i)3435 void* dbBtreeIterator::getStringBtreePageKey(byte* pg, int i)
3436 {
3437 return &((dbBtreePage*)pg)->keyChar[((dbBtreePage*)pg)->keyStr[i].offs];
3438 }
3439
getScalarBtreePageKey(byte * pg,int i)3440 void* dbBtreeIterator::getScalarBtreePageKey(byte* pg, int i)
3441 {
3442 return ((dbBtreePage*)pg)->keyChar + i*sizeofType;
3443 }
3444
getStringThickBtreePageKey(byte * pg,int i)3445 void* dbBtreeIterator::getStringThickBtreePageKey(byte* pg, int i)
3446 {
3447 return &((dbThickBtreePage*)pg)->keyChar[((dbThickBtreePage*)pg)->keyStr[i].offs];
3448 }
3449
getScalarThickBtreePageKey(byte * pg,int i)3450 void* dbBtreeIterator::getScalarThickBtreePageKey(byte* pg, int i)
3451 {
3452 return ((dbThickBtreePage*)pg)->keyChar + i*sizeofType;
3453 }
3454
3455
3456
reset(bool ascent)3457 oid_t dbBtreeIterator::reset(bool ascent)
3458 {
3459 oid_t curr = 0;
3460 if (sc.prefixLength != 0 && !ascent) {
3461 oid_t last = reset(true);
3462 while ((curr = next()) != 0) {
3463 last = curr;
3464 }
3465 return last;
3466 }
3467 int i, l, r;
3468 dbGetTie tie;
3469 dbBtree* tree = (dbBtree*)db->getRow(tie, treeId);
3470 int h = tree->height;
3471 height = h;
3472 if (h == 0) {
3473 return 0;
3474 }
3475 int sp = 0;
3476 oid_t pageId = tree->root;
3477
3478 if (ascent) {
3479 char_t* firstKey = sc.firstKey;
3480 if (firstKey == NULL) {
3481 while (--h > 0) {
3482 posStack[sp] = 0;
3483 pageStack[sp] = pageId;
3484 byte* pg = db->get(pageId);
3485 pageId = (this->*getOid)(pg, 0);
3486 db->pool.unfix(pg);
3487 sp += 1;
3488 }
3489 pageStack[sp++] = pageId;
3490 byte* pg = db->get(pageId);
3491 curr = gotoNextItem(pg, -1, true);
3492 } else {
3493 while (--h > 0) {
3494 pageStack[sp] = pageId;
3495 byte* pg = db->get(pageId);
3496 l = 0;
3497 r = nItems(pg);
3498 while (l < r) {
3499 i = (l+r) >> 1;
3500 if (comparator(firstKey, (this->*getKey)(pg, i), sizeofType) >= sc.firstKeyInclusion) {
3501 l = i + 1;
3502 } else {
3503 r = i;
3504 }
3505 }
3506 assert(r == l);
3507 posStack[sp] = r;
3508 pageId = (this->*getOid)(pg, r);
3509 db->pool.unfix(pg);
3510 sp += 1;
3511 }
3512 pageStack[sp++] = pageId;
3513 byte* pg = db->get(pageId);
3514 l = 0;
3515 r = nItems(pg);
3516 while (l < r) {
3517 i = (l+r) >> 1;
3518 if (comparator(firstKey, (this->*getKey)(pg, i), sizeofType) >= sc.firstKeyInclusion) {
3519 l = i + 1;
3520 } else {
3521 r = i;
3522 }
3523 }
3524 assert(r == l);
3525 curr = gotoNextItem(pg, r-1, true);
3526 }
3527 } else { // descent order
3528 char_t* lastKey = sc.lastKey;
3529 if (lastKey == NULL) {
3530 while (--h > 0) {
3531 pageStack[sp] = pageId;
3532 byte* pg = db->get(pageId);
3533 posStack[sp] = nItems(pg);
3534 pageId = (this->*getOid)(pg, posStack[sp]);
3535 db->pool.unfix(pg);
3536 sp += 1;
3537 }
3538 pageStack[sp++] = pageId;
3539 byte* pg = db->get(pageId);
3540 curr = gotoNextItem(pg, nItems(pg), false);
3541 } else {
3542 while (--h > 0) {
3543 pageStack[sp] = pageId;
3544 byte* pg = db->get(pageId);
3545 l = 0;
3546 r = nItems(pg);
3547 while (l < r) {
3548 i = (l+r) >> 1;
3549 if (comparator(lastKey, (this->*getKey)(pg, i), sizeofType) >= 1-sc.lastKeyInclusion) {
3550 l = i + 1;
3551 } else {
3552 r = i;
3553 }
3554 }
3555 assert(r == l);
3556 posStack[sp] = r;
3557 pageId = (this->*getOid)(pg, r);
3558 db->pool.unfix(pg);
3559 sp += 1;
3560 }
3561 pageStack[sp++] = pageId;
3562 byte* pg = db->get(pageId);
3563 l = 0;
3564 r = nItems(pg);
3565 while (l < r) {
3566 i = (l+r) >> 1;
3567 if (comparator(lastKey, (this->*getKey)(pg, i), sizeofType) >= 1-sc.lastKeyInclusion) {
3568 l = i + 1;
3569 } else {
3570 r = i;
3571 }
3572 }
3573 curr = gotoNextItem(pg, r, false);
3574 }
3575 }
3576 if (curr == 0) {
3577 height = 0;
3578 }
3579 return curr;
3580 }
3581
first()3582 oid_t dbBtreeIterator::first()
3583 {
3584 return reset(sc.ascent);
3585 }
3586
last()3587 oid_t dbBtreeIterator::last()
3588 {
3589 return reset(!sc.ascent);
3590 }
3591
next()3592 oid_t dbBtreeIterator::next()
3593 {
3594 if (height == 0) {
3595 return 0;
3596 }
3597 byte* pg = db->get(pageStack[height-1]);
3598 return gotoNextItem(pg, posStack[height-1], sc.ascent);
3599 }
3600
prev()3601 oid_t dbBtreeIterator::prev()
3602 {
3603 if (height == 0) {
3604 return 0;
3605 }
3606 byte* pg = db->get(pageStack[height-1]);
3607 return gotoNextItem(pg, posStack[height-1], !sc.ascent);
3608 }
3609
gotoNextItem(byte * pg,int pos,bool forward)3610 oid_t dbBtreeIterator::gotoNextItem(byte* pg, int pos, bool forward)
3611 {
3612 dbTableDescriptor* table = sc.cursor->table;
3613 int sp = height;
3614 oid_t oldPageStack[MAX_BTREE_HEIGHT];
3615 int oldPosStack[MAX_BTREE_HEIGHT];
3616
3617 memcpy(oldPageStack, pageStack, sp*sizeof(oid_t));
3618 memcpy(oldPosStack, posStack, sp*sizeof(int));
3619
3620 oid_t curr = 0;
3621 while (true) {
3622 if (forward) {
3623 if (++pos == nItems(pg)) {
3624 while (--sp != 0) {
3625 db->pool.unfix(pg);
3626 pos = posStack[sp-1];
3627 pg = db->get(pageStack[sp-1]);
3628 if (++pos <= nItems(pg)) {
3629 posStack[sp-1] = pos;
3630 do {
3631 oid_t pageId = (this->*getOid)(pg, pos);
3632 db->pool.unfix(pg);
3633 pg = db->get(pageId);
3634 pageStack[sp] = pageId;
3635 posStack[sp] = pos = 0;
3636 } while (++sp < height);
3637
3638 break;
3639 }
3640 }
3641 } else {
3642 posStack[sp-1] = pos;
3643 }
3644 if (sp == 0
3645 || (sc.lastKey != NULL
3646 && comparator((this->*getKey)(pg, pos), sc.lastKey, sizeofType) >= sc.lastKeyInclusion)
3647 || (sc.prefixLength != 0 && memcmp((this->*getKey)(pg, pos), sc.firstKey, sc.prefixLength*sizeof(char_t)) != 0))
3648 {
3649 // memcpy(pageStack, oldPageStack, sp*sizeof(oid_t));
3650 // memcpy(posStack, oldPosStack, sp*sizeof(int));
3651 break;
3652 }
3653 } else { // backward
3654 if (--pos < 0) {
3655 while (--sp != 0) {
3656 db->pool.unfix(pg);
3657 pos = posStack[sp-1];
3658 pg = db->get(pageStack[sp-1]);
3659 if (--pos >= 0) {
3660 posStack[sp-1] = pos;
3661 do {
3662 oid_t pageId = (this->*getOid)(pg, pos);
3663 db->pool.unfix(pg);
3664 pg = db->get(pageId);
3665 pageStack[sp] = pageId;
3666 posStack[sp] = pos = nItems(pg);
3667 } while (++sp < height);
3668
3669 posStack[sp-1] = --pos;
3670 break;
3671 }
3672 }
3673 } else {
3674 posStack[sp-1] = pos;
3675 }
3676 if (sp == 0
3677 || (sc.firstKey != NULL && comparator(sc.firstKey, (this->*getKey)(pg, pos), sizeofType) >= sc.firstKeyInclusion))
3678 {
3679 memcpy(pageStack, oldPageStack, sp*sizeof(oid_t));
3680 memcpy(posStack, oldPosStack, sp*sizeof(int));
3681 break;
3682 }
3683 }
3684 if (sc.condition == NULL || db->evaluateBoolean(sc.condition, (this->*getOid)(pg, pos), table, sc.cursor)) {
3685 curr = (this->*getOid)(pg, pos);
3686 break;
3687 }
3688 }
3689 db->pool.unfix(pg);
3690 return curr;
3691 }
3692
3693
3694
3695 END_GIGABASE_NAMESPACE
3696