1 /* This file is part of the Zebra server.
2    Copyright (C) 2004-2013 Index Data
3 
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 
18 */
19 
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29 
/* Debug switch; a build may predefine ISAMB_DEBUG to override the default.
   When non-zero, isamb_open2 logs a warning that things will be slower. */
#ifndef ISAMB_DEBUG
#define ISAMB_DEBUG 0
#endif

/* Version numbers stored in the textual file header.
   The major version must match when an existing file is opened. The minor
   version tells whether a root pointer is stored after the free list. */
#define ISAMB_MAJOR_VERSION 3
#define ISAMB_MINOR_VERSION_NO_ROOT 0
#define ISAMB_MINOR_VERSION_WITH_ROOT 1
38 
39 struct ISAMB_head {
40     zint first_block;
41     zint last_block;
42     zint free_list;
43     zint no_items;
44     int block_size;
45     int block_max;
46     int block_offset;
47 };
48 
/* 1: items in upper (internal) nodes are stored through the codec;
   0: they are stored as a length followed by the raw item */
#define INT_ENCODE 1

/* maximum size of an encoded item buffer */
#define DST_ITEM_MAX 5000

/* maximum page size for _any_ isamb use */
#define ISAMB_MAX_PAGE 32768

/* size of the per-level statistics arrays */
#define ISAMB_MAX_LEVEL 10

/* scratch buffer size: roughly 2 * max page + max size of one item */
#define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)

/* should be the maximum block size or a multiple thereof */
#define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE

/* CAT_MAX: _must_ be a power of 2; the low bits of a block position
   (position & CAT_MASK) select the category */
#define CAT_MAX 4
#define CAT_MASK (CAT_MAX-1)
/* CAT_NO: number of categories used by isamb_open; must be <= CAT_MAX */
#define CAT_NO 4

/* smallest block size (category 0) */
#define ISAMB_MIN_SIZE 32
/* growth factor of the block size from one category to the next */
#define ISAMB_FAC_SIZE 4

/* ISAMB_PTR_CODEC: 1 = variable-length pointers, 0 = fixed-size pointers */
#define ISAMB_PTR_CODEC 1
78 
79 struct ISAMB_cache_entry {
80     ISAM_P pos;
81     unsigned char *buf;
82     int dirty;
83     int hits;
84     struct ISAMB_cache_entry *next;
85 };
86 
87 struct ISAMB_file {
88     BFile bf;
89     int head_dirty;
90     struct ISAMB_head head;
91     struct ISAMB_cache_entry *cache_entries;
92 };
93 
94 struct ISAMB_s {
95     BFiles bfs;
96     ISAMC_M *method;
97 
98     struct ISAMB_file *file;
99     int no_cat;
100     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
101     int log_io;        /* log level for bf_read/bf_write calls */
102     int log_freelist;  /* log level for freelist handling */
103     zint skipped_numbers; /* on a leaf node */
104     zint returned_numbers;
105     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
106     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
107     zint number_of_int_splits;
108     zint number_of_leaf_splits;
109     int enable_int_count; /* whether we count nodes (or not) */
110     int cache_size; /* size of blocks to cache (if cache=1) */
111     int minor_version;
112     zint root_ptr;
113 };
114 
115 struct ISAMB_block {
116     ISAM_P pos;
117     int cat;
118     int size;
119     int leaf;
120     int dirty;
121     int deleted;
122     int offset;
123     zint no_items;  /* number of nodes in this + children */
124     char *bytes;
125     char *cbuf;
126     unsigned char *buf;
127     void *decodeClientData;
128     int log_rw;
129 };
130 
131 struct ISAMB_PP_s {
132     ISAMB isamb;
133     ISAM_P pos;
134     int level;
135     int maxlevel; /* total depth */
136     zint total_size;
137     zint no_blocks;
138     zint skipped_numbers; /* on a leaf node */
139     zint returned_numbers;
140     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
141     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
142     struct ISAMB_block **block;
143     int scope;  /* on what level we forward */
144 };
145 
146 
147 #define encode_item_len encode_ptr
148 #if ISAMB_PTR_CODEC
encode_ptr(char ** dst,zint pos)149 static void encode_ptr(char **dst, zint pos)
150 {
151     unsigned char *bp = (unsigned char*) *dst;
152 
153     while (pos > 127)
154     {
155         *bp++ = (unsigned char) (128 | (pos & 127));
156         pos = pos >> 7;
157     }
158     *bp++ = (unsigned char) pos;
159     *dst = (char *) bp;
160 }
161 #else
encode_ptr(char ** dst,zint pos)162 static void encode_ptr(char **dst, zint pos)
163 {
164     memcpy(*dst, &pos, sizeof(pos));
165     (*dst) += sizeof(pos);
166 }
167 #endif
168 
169 #define decode_item_len decode_ptr
170 #if ISAMB_PTR_CODEC
decode_ptr(const char ** src,zint * pos)171 static void decode_ptr(const char **src, zint *pos)
172 {
173     zint d = 0;
174     unsigned char c;
175     unsigned r = 0;
176 
177     while (((c = *(const unsigned char *)((*src)++)) & 128))
178     {
179         d += ((zint) (c & 127) << r);
180 	r += 7;
181     }
182     d += ((zint) c << r);
183     *pos = d;
184 }
185 #else
decode_ptr(const char ** src,zint * pos)186 static void decode_ptr(const char **src, zint *pos)
187 {
188     memcpy(pos, *src, sizeof(*pos));
189     (*src) += sizeof(*pos);
190 }
191 #endif
192 
193 
/* Store v as the enable_int_count flag of b. insert_int consults the flag
   when an internal node is split: if it is non-zero, the children of the
   first half are opened to sum up their item counts. */
void isamb_set_int_count(ISAMB b, int v)
{
    b->enable_int_count = v;
}
198 
/* Set the number of cache entries cache_block keeps per file before it
   evicts the last one.

   A value below 1 is raised to 1. With a limit of 0 (or less) cache_block
   would try to evict from an empty list on the very first miss, which ends
   in a failed assertion (or a NULL dereference when assertions are
   compiled out). */
void isamb_set_cache_size(ISAMB b, int v)
{
    b->cache_size = v > 0 ? v : 1;
}
203 
isamb_open2(BFiles bfs,const char * name,int writeflag,ISAMC_M * method,int cache,int no_cat,int * sizes,int use_root_ptr)204 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
205                   int cache, int no_cat, int *sizes, int use_root_ptr)
206 {
207     ISAMB isamb = xmalloc(sizeof(*isamb));
208     int i;
209 
210     assert(no_cat <= CAT_MAX);
211 
212     isamb->bfs = bfs;
213     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
214     memcpy(isamb->method, method, sizeof(*method));
215     isamb->no_cat = no_cat;
216     isamb->log_io = 0;
217     isamb->log_freelist = 0;
218     isamb->cache = cache;
219     isamb->skipped_numbers = 0;
220     isamb->returned_numbers = 0;
221     isamb->number_of_int_splits = 0;
222     isamb->number_of_leaf_splits = 0;
223     isamb->enable_int_count = 1;
224     isamb->cache_size = 40;
225 
226     if (use_root_ptr)
227         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
228     else
229         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
230 
231     isamb->root_ptr = 0;
232 
233     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
234 	isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
235 
236     if (cache == -1)
237     {
238         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
239     }
240     else
241     {
242         assert(cache == 0 || cache == 1);
243     }
244     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
245 
246     for (i = 0; i < isamb->no_cat; i++)
247     {
248         isamb->file[i].bf = 0;
249         isamb->file[i].head_dirty = 0;
250         isamb->file[i].cache_entries = 0;
251     }
252 
253     for (i = 0; i < isamb->no_cat; i++)
254     {
255         char fname[DST_BUF_SIZE];
256 	char hbuf[DST_BUF_SIZE];
257 
258         sprintf(fname, "%s%c", name, i+'A');
259         if (cache)
260             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
261                                         writeflag);
262         else
263             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
264 
265         if (!isamb->file[i].bf)
266         {
267             isamb_close(isamb);
268             return 0;
269         }
270 
271         /* fill-in default values (for empty isamb) */
272 	isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
273 	isamb->file[i].head.last_block = isamb->file[i].head.first_block;
274 	isamb->file[i].head.block_size = sizes[i];
275         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
276 #if ISAMB_PTR_CODEC
277 	if (i == isamb->no_cat-1 || sizes[i] > 128)
278 	    isamb->file[i].head.block_offset = 8;
279 	else
280 	    isamb->file[i].head.block_offset = 4;
281 #else
282 	isamb->file[i].head.block_offset = 11;
283 #endif
284 	isamb->file[i].head.block_max =
285 	    sizes[i] - isamb->file[i].head.block_offset;
286 	isamb->file[i].head.free_list = 0;
287 	if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
288 	{
289 	    /* got header assume "isamb"major minor len can fit in 16 bytes */
290 	    zint zint_tmp;
291 	    int major, minor, len, pos = 0;
292 	    int left;
293 	    const char *src = 0;
294 	    if (memcmp(hbuf, "isamb", 5))
295 	    {
296 		yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297                 isamb_close(isamb);
298 		return 0;
299 	    }
300 	    if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
301 	    {
302 		yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
303                 isamb_close(isamb);
304 		return 0;
305 	    }
306 	    if (major != ISAMB_MAJOR_VERSION)
307 	    {
308 		yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
309                         fname, major, ISAMB_MAJOR_VERSION);
310                 isamb_close(isamb);
311 		return 0;
312 	    }
313 	    for (left = len - sizes[i]; left > 0; left = left - sizes[i])
314 	    {
315 		pos++;
316 		if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
317 		{
318 		    yaz_log(YLOG_WARN, "truncated isamb header for "
319                             "file=%s len=%d pos=%d",
320                             fname, len, pos);
321                     isamb_close(isamb);
322 		    return 0;
323 		}
324 	    }
325 	    src = hbuf + 16;
326 	    decode_ptr(&src, &isamb->file[i].head.first_block);
327 	    decode_ptr(&src, &isamb->file[i].head.last_block);
328 	    decode_ptr(&src, &zint_tmp);
329 	    isamb->file[i].head.block_size = (int) zint_tmp;
330 	    decode_ptr(&src, &zint_tmp);
331 	    isamb->file[i].head.block_max = (int) zint_tmp;
332 	    decode_ptr(&src, &isamb->file[i].head.free_list);
333             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
334                 decode_ptr(&src, &isamb->root_ptr);
335 	}
336         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
337         /* must rewrite the header if root ptr is in use (bug #1017) */
338         if (use_root_ptr && writeflag)
339             isamb->file[i].head_dirty = 1;
340         else
341             isamb->file[i].head_dirty = 0;
342         assert(isamb->file[i].head.block_size == sizes[i]);
343     }
344 #if ISAMB_DEBUG
345     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
346 #endif
347     return isamb;
348 }
349 
isamb_open(BFiles bfs,const char * name,int writeflag,ISAMC_M * method,int cache)350 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
351 		 int cache)
352 {
353     int sizes[CAT_NO];
354     int i, b_size = ISAMB_MIN_SIZE;
355 
356     for (i = 0; i<CAT_NO; i++)
357     {
358         sizes[i] = b_size;
359         b_size = b_size * ISAMB_FAC_SIZE;
360     }
361     return isamb_open2(bfs, name, writeflag, method, cache,
362                        CAT_NO, sizes, 0);
363 }
364 
/* flush_blocks: empty the cache of category cat. Each entry is unlinked,
   written with bf_write if it is dirty, and then freed. */
static void flush_blocks(ISAMB b, int cat)
{
    struct ISAMB_file *file = &b->file[cat];
    struct ISAMB_cache_entry *entry;

    while ((entry = file->cache_entries) != 0)
    {
        file->cache_entries = entry->next;

        if (entry->dirty)
        {
            yaz_log(b->log_io, "bf_write: flush_blocks");
            bf_write(file->bf, entry->pos, 0, 0, entry->buf);
        }
        xfree(entry->buf);
        xfree(entry);
    }
}
381 
/* cache_block: transfer one block between userbuf and the cache.

   pos      block address (category in the low bits)
   userbuf  buffer of block_size bytes for that category
   wr       non-zero: copy userbuf into the cache and mark the entry dirty;
            zero: copy the cached block into userbuf

   Returns 0 without doing anything when caching is off (b->cache == 0), in
   which case the caller must do the I/O itself. Returns 1 otherwise.

   A cache entry holds ISAMB_CACHE_ENTRY_SIZE bytes. 'norm' identifies the
   entry and is also the number given to bf_read/bf_write for it. 'off' is
   the byte offset of the block inside the entry. The divisor for norm is
   built from CAT_MASK (not CAT_MAX). It is kept exactly as it is, because
   it decides where entries are read from and written to. */
static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
{
    const int cat = (int) (pos & CAT_MASK);
    struct ISAMB_file *file = &b->file[cat];
    const int block_size = file->head.block_size;
    const int off = (int) (((pos / CAT_MAX) &
                            (ISAMB_CACHE_ENTRY_SIZE / block_size - 1))
                           * block_size);
    const zint norm = pos / (CAT_MASK * ISAMB_CACHE_ENTRY_SIZE / block_size);
    struct ISAMB_cache_entry **link = &file->cache_entries;
    struct ISAMB_cache_entry **last_link = 0;
    struct ISAMB_cache_entry *entry;
    int count = 0;

    if (!b->cache)
        return 0;

    assert(ISAMB_CACHE_ENTRY_SIZE >= block_size);

    /* look for an entry that already holds the block */
    while (*link)
    {
        entry = *link;
        last_link = link;
        if (entry->pos == norm)
        {
            /* hit: unlink the entry and put it first in the list */
            *link = entry->next;
            entry->next = file->cache_entries;
            file->cache_entries = entry;

            if (wr)
            {
                memcpy(entry->buf + off, userbuf, block_size);
                entry->dirty = 1;
            }
            else
            {
                memcpy(userbuf, entry->buf + off, block_size);
            }
            return 1;
        }
        link = &entry->next;
        count++;
    }

    /* miss: if the list is full, drop its last entry (writing it first if
       it was modified) */
    if (count >= b->cache_size)
    {
        assert(last_link && *last_link);
        entry = *last_link;
        *last_link = 0;
        if (entry->dirty)
        {
            yaz_log(b->log_io, "bf_write: cache_block");
            bf_write(file->bf, entry->pos, 0, 0, entry->buf);
        }
        xfree(entry->buf);
        xfree(entry);
    }

    /* create a new entry at the front and load it; an unreadable chunk is
       treated as all zeros */
    entry = xmalloc(sizeof(*entry));
    entry->next = file->cache_entries;
    file->cache_entries = entry;
    entry->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
    entry->pos = norm;
    yaz_log(b->log_io, "bf_read: cache_block");
    if (!bf_read(file->bf, norm, 0, 0, entry->buf))
        memset(entry->buf, 0, ISAMB_CACHE_ENTRY_SIZE);

    if (wr)
    {
        memcpy(entry->buf + off, userbuf, block_size);
        entry->dirty = 1;
    }
    else
    {
        entry->dirty = 0;
        memcpy(userbuf, entry->buf + off, block_size);
    }
    return 1;
}
452 
453 
/* isamb_close: flush and release an ISAMB.

   Logs the access statistics, then for each category flushes the cache,
   rewrites the header if it is dirty and closes the block file. Finally the
   file array, the method copy and the handle itself are freed. Also used by
   isamb_open2 to clean up after a failed open, so categories whose file was
   never opened (bf == 0) are tolerated. */
void isamb_close(ISAMB isamb)
{
    int i;

    /* the per-level counters end at the first zero entry, but never run
       past the end of the array */
    for (i = 0; i < ISAMB_MAX_LEVEL && isamb->accessed_nodes[i]; i++)
        yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: " ZINT_FORMAT " read, "
                ZINT_FORMAT " skipped",
                i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
    yaz_log(YLOG_DEBUG, "isamb_close returned " ZINT_FORMAT " values, "
            "skipped " ZINT_FORMAT,
            isamb->returned_numbers, isamb->skipped_numbers);

    for (i = 0; i < isamb->no_cat; i++)
    {
        flush_blocks(isamb, i);
        if (isamb->file[i].head_dirty)
        {
            char hbuf[DST_BUF_SIZE];
            int major = ISAMB_MAJOR_VERSION;
            int len;
            char *dst;
            int pos = 0, left;
            int b_size = isamb->file[i].head.block_size;

            /* zero the whole staging buffer so that, however much bf_write
               copies out of it, no uninitialised bytes are written */
            memset(hbuf, 0, sizeof(hbuf));

            /* encoded values go after the 16 byte textual part */
            dst = hbuf + 16;
            encode_ptr(&dst, isamb->file[i].head.first_block);
            encode_ptr(&dst, isamb->file[i].head.last_block);
            encode_ptr(&dst, isamb->file[i].head.block_size);
            encode_ptr(&dst, isamb->file[i].head.block_max);
            encode_ptr(&dst, isamb->file[i].head.free_list);

            if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
                encode_ptr(&dst, isamb->root_ptr);

            len = (int) (dst - hbuf);

            /* print exactly 16 bytes (including trailing 0) */
            sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
                    isamb->minor_version, len);

            bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);

            /* write further blocks if the header is longer than one block */
            for (left = len - b_size; left > 0; left = left - b_size)
            {
                pos++;
                bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
            }
        }
        if (isamb->file[i].bf)
            bf_close (isamb->file[i].bf);
    }
    xfree(isamb->file);
    xfree(isamb->method);
    xfree(isamb);
}
509 
510 /* open_block: read one block at pos.
511    Decode leading sys bytes .. consisting of
512    Offset:Meaning
513    0: leader byte, != 0 leaf, == 0, non-leaf
514    1-2: used size of block
515    3-7*: number of items and all children
516 
517    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
518    of items. We can thus have at most 2^40 nodes.
519 */
open_block(ISAMB b,ISAM_P pos)520 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
521 {
522     int cat = (int) (pos&CAT_MASK);
523     const char *src;
524     int offset = b->file[cat].head.block_offset;
525     struct ISAMB_block *p;
526     if (!pos)
527         return 0;
528     p = xmalloc(sizeof(*p));
529     p->pos = pos;
530     p->cat = (int) (pos & CAT_MASK);
531     p->buf = xmalloc(b->file[cat].head.block_size);
532     p->cbuf = 0;
533 
534     if (!cache_block (b, pos, p->buf, 0))
535     {
536         yaz_log(b->log_io, "bf_read: open_block");
537         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
538         {
539             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
540                     (long) pos, (long) pos/CAT_MAX);
541             zebra_exit("isamb:open_block");
542         }
543     }
544     p->bytes = (char *)p->buf + offset;
545     p->leaf = p->buf[0];
546     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
547     if (p->size < 0)
548     {
549         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
550                 p->size, pos);
551     }
552     assert(p->size >= 0);
553     src = (char*) p->buf + 3;
554     decode_ptr(&src, &p->no_items);
555 
556     p->offset = 0;
557     p->dirty = 0;
558     p->deleted = 0;
559     p->decodeClientData = (*b->method->codec.start)();
560     return p;
561 }
562 
/* new_block: allocate an empty block in category cat.

   leaf  value stored as the block's leaf flag (non-zero = leaf)
   cat   category the block is taken from

   If the free list of the category is empty, the block number is taken from
   last_block, which is then incremented. Otherwise the first block on the
   free list is reused: it is read so that the link to the next free block
   (stored in its first sizeof(zint) bytes) can become the new list head.
   The header of the category is marked dirty in both cases.

   Returns a zero-filled, dirty block that must be released with
   close_block. A failed read of a free-list block ends the program via
   zebra_exit. */
struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
{
    struct ISAMB_block *p;

    p = xmalloc(sizeof(*p));
    p->buf = xmalloc(b->file[cat].head.block_size);
    p->cbuf = 0;
    p->log_rw = 0;

    if (!b->file[cat].head.free_list)
    {
        zint block_no;
        block_no = b->file[cat].head.last_block++;
        p->pos = block_no * CAT_MAX + cat;
        if (b->log_freelist)
            yaz_log(b->log_freelist, "got block "
                    ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
                    cat, p->pos/CAT_MAX);
    }
    else
    {
        p->pos = b->file[cat].head.free_list;
        assert((p->pos & CAT_MASK) == cat);
        if (!cache_block(b, p->pos, p->buf, 0))
        {
            yaz_log(b->log_io, "bf_read: new_block");
            if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
            {
                /* report the position and the block number, the same way
                   open_block does */
                yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
                        (long) p->pos, (long) p->pos/CAT_MAX);
                zebra_exit("isamb:new_block");
            }
        }
        if (b->log_freelist)
            yaz_log(b->log_freelist, "got block "
                    ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
                    cat, p->pos/CAT_MAX);
        /* the freed block starts with the position of the next free one */
        memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
    }
    p->cat = cat;
    b->file[cat].head_dirty = 1;
    memset(p->buf, 0, b->file[cat].head.block_size);
    p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
    p->leaf = leaf;
    p->size = 0;
    p->dirty = 1;
    p->deleted = 0;
    p->offset = 0;
    p->no_items = 0;
    p->decodeClientData = (*b->method->codec.start)();
    return p;
}
613 
/* new_leaf: allocate an empty leaf block in category cat
   (new_block with the leaf flag set to 1). */
struct ISAMB_block *new_leaf(ISAMB b, int cat)
{
    const int leaf_flag = 1;

    return new_block(b, leaf_flag, cat);
}
618 
619 
/* new_int: allocate an empty internal (non-leaf) block in category cat
   (new_block with the leaf flag set to 0). */
struct ISAMB_block *new_int(ISAMB b, int cat)
{
    const int leaf_flag = 0;

    return new_block(b, leaf_flag, cat);
}
624 
/* check_block: sanity check of a block before it is written.

   Leaf blocks are not inspected. For an internal block the payload is
   walked as pointer, (item, pointer)*. Every child pointer must carry the
   same category as the block itself, and the walk must end exactly at the
   end of the used payload. Violations trip an assertion. */
static void check_block(ISAMB b, struct ISAMB_block *p)
{
    assert(b);
    if (!p->leaf)
    {
        const char *src = p->bytes;
        const char *endp = p->bytes + p->size;
        ISAM_P pos;
        void *c1 = (*b->method->codec.start)();

        decode_ptr(&src, &pos);
        assert((pos & CAT_MASK) == p->cat);
        /* '<' rather than '!=': a decode that overshoots the payload stops
           the walk and is caught by the assertion after the loop */
        while (src < endp)
        {
#if INT_ENCODE
            char file_item_buf[DST_ITEM_MAX];
            char *file_item = file_item_buf;
            (*b->method->codec.reset)(c1);
            (*b->method->codec.decode)(c1, &file_item, &src);
#else
            zint item_len;
            decode_item_len(&src, &item_len);
            assert(item_len > 0 && item_len < 128);
            src += item_len;
#endif
            decode_ptr(&src, &pos);
            assert((pos & CAT_MASK) == p->cat);
        }
        assert(src == endp);
        (*b->method->codec.stop)(c1);
    }
}
665 
/* close_block: release a block obtained from open_block / new_block.

   A null p is ignored. A block marked deleted is linked into the free list
   of its category: the current list head is stored at the start of the
   block, the block becomes the new head, and the block is written. A block
   that is only dirty gets its header rebuilt (leaf flag, used size, item
   count), is checked with check_block and then written. Writes go through
   the cache when it is enabled, otherwise straight to bf_write. The codec
   handle and the buffers are released in every case. */
void close_block(ISAMB b, struct ISAMB_block *p)
{
    struct ISAMB_file *file;

    if (!p)
        return;

    file = &b->file[p->cat];
    if (p->deleted)
    {
        yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
                p->pos, p->cat, p->pos/CAT_MAX);
        /* chain the previous head of the free list behind this block */
        memcpy(p->buf, &file->head.free_list, sizeof(zint));
        file->head.free_list = p->pos;
        file->head_dirty = 1;
        if (!cache_block(b, p->pos, p->buf, 1))
        {
            yaz_log(b->log_io, "bf_write: close_block (deleted)");
            bf_write(file->bf, p->pos/CAT_MAX, 0, 0, p->buf);
        }
    }
    else if (p->dirty)
    {
        const int header_len = file->head.block_offset;
        const int stored_size = p->size + header_len;
        char *count_dst = (char *) p->buf + 3;

        assert(p->size >= 0);

        /* clear the header first, because encode_ptr usually does not
           write all of its bytes */
        memset(p->buf, 0, header_len);
        p->buf[0] = p->leaf;
        p->buf[1] = stored_size & 255;
        p->buf[2] = stored_size >> 8;
        encode_ptr(&count_dst, p->no_items);
        check_block(b, p);
        if (!cache_block(b, p->pos, p->buf, 1))
        {
            yaz_log(b->log_io, "bf_write: close_block");
            bf_write(file->bf, p->pos/CAT_MAX, 0, 0, p->buf);
        }
    }
    (*b->method->codec.stop)(p->decodeClientData);
    xfree(p->buf);
    xfree(p);
}
707 
708 int insert_sub(ISAMB b, struct ISAMB_block **p,
709                void *new_item, int *mode,
710                ISAMC_I *stream,
711                struct ISAMB_block **sp,
712                void *sub_item, int *sub_size,
713                const void *max_item);
714 
/* insert_int: insert into the subtree below internal node p.

   b               the ISAMB
   p               internal block; its payload is pointer, (item, pointer)*
   lookahead_item  item that decides which child is descended into
   mode, stream    passed on to insert_sub unchanged
   sp              receives a new block if p had to be split, else 0
   split_item      receives the item separating p and *sp after a split
   split_size      receives the size of split_item
   last_max_item   upper bound passed on when the last child is chosen

   The child in front of the first item that compares greater than
   lookahead_item is opened and handed to insert_sub. If no such item
   exists, the last child is used. The item count of p is adjusted by the
   change in the child (plus the count of a new sibling). If the child was
   split, the separating item and the pointer to the new sibling are
   inserted into p. If p then exceeds block_max, it is split as well.

   Returns the value returned by insert_sub. */
int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
               int *mode,
               ISAMC_I *stream, struct ISAMB_block **sp,
               void *split_item, int *split_size, const void *last_max_item)
{
    char *startp = p->bytes;
    const char *src = startp;
    char *endp = p->bytes + p->size;
    ISAM_P pos;
    struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
    char sub_item[DST_ITEM_MAX];
    int sub_size;
    int more = 0;
    zint diff_terms = 0;
    void *c1 = (*b->method->codec.start)();

    *sp = 0;

    assert(p->size >= 0);
    decode_ptr(&src, &pos);
    while (src != endp)
    {
        int d;
        const char *src0 = src;
#if INT_ENCODE
        char file_item_buf[DST_ITEM_MAX];
        char *file_item = file_item_buf;
        (*b->method->codec.reset)(c1);
        (*b->method->codec.decode)(c1, &file_item, &src);
        d = (*b->method->compare_item)(file_item_buf, lookahead_item);
        if (d > 0)
        {
            sub_p1 = open_block(b, pos);
            assert(sub_p1);
            diff_terms -= sub_p1->no_items;
            more = insert_sub(b, &sub_p1, lookahead_item, mode,
                              stream, &sub_p2,
                              sub_item, &sub_size, file_item_buf);
            diff_terms += sub_p1->no_items;
            /* step back so that src is where a new item would go */
            src = src0;
            break;
        }
#else
        zint item_len;
        decode_item_len(&src, &item_len);
        d = (*b->method->compare_item)(src, lookahead_item);
        if (d > 0)
        {
            sub_p1 = open_block(b, pos);
            assert(sub_p1);
            diff_terms -= sub_p1->no_items;
            more = insert_sub(b, &sub_p1, lookahead_item, mode,
                              stream, &sub_p2,
                              sub_item, &sub_size, src);
            diff_terms += sub_p1->no_items;
            /* step back so that src is where a new item would go */
            src = src0;
            break;
        }
        src += item_len;
#endif
        decode_ptr(&src, &pos);
    }
    if (!sub_p1)
    {
        /* we reached the end. So lookahead > last item */
        sub_p1 = open_block(b, pos);
        assert(sub_p1);
        diff_terms -= sub_p1->no_items;
        more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
                          sub_item, &sub_size, last_max_item);
        diff_terms += sub_p1->no_items;
    }
    if (sub_p2)
        diff_terms += sub_p2->no_items;
    if (diff_terms)
    {
        p->dirty = 1;
        p->no_items += diff_terms;
    }
    if (sub_p2)
    {
        /* there was a split - must insert pointer in this one */
        char dst_buf[DST_BUF_SIZE];
        char *dst = dst_buf;
#if INT_ENCODE
        const char *sub_item_ptr = sub_item;
#endif
        assert(sub_size < DST_ITEM_MAX && sub_size > 1);

        /* everything before the insertion point is copied unchanged */
        memcpy(dst, startp, src - startp);

        dst += src - startp;

#if INT_ENCODE
        (*b->method->codec.reset)(c1);
        (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
#else
        encode_item_len(&dst, sub_size);      /* sub length and item */
        memcpy(dst, sub_item, sub_size);
        dst += sub_size;
#endif

        encode_ptr(&dst, sub_p2->pos);   /* pos */

        if (endp - src)                   /* remaining data */
        {
            memcpy(dst, src, endp - src);
            dst += endp - src;
        }
        p->size = dst - dst_buf;
        assert(p->size >= 0);

        if (p->size <= b->file[p->cat].head.block_max)
        {
            /* it fits OK in this block */
            memcpy(startp, dst_buf, dst - dst_buf);

            close_block(b, sub_p2);
        }
        else
        {
            /* must split _this_ block as well .. */
            struct ISAMB_block *sub_p3;
#if INT_ENCODE
            char file_item_buf[DST_ITEM_MAX];
            char *file_item = file_item_buf;
#else
            zint split_size_tmp;
#endif
            zint no_items_first_half = 0;
            int p_new_size;
            const char *half;
            src = dst_buf;
            endp = dst;

            b->number_of_int_splits++;

            p->dirty = 1;
            close_block(b, sub_p2);

            half = src + b->file[p->cat].head.block_size/2;
            decode_ptr(&src, &pos);

            if (b->enable_int_count)
            {
                /* read sub block so we can get no_items for it */
                sub_p3 = open_block(b, pos);
                /* open_block yields 0 for a zero pointer; check it like at
                   the other call sites */
                assert(sub_p3);
                no_items_first_half += sub_p3->no_items;
                close_block(b, sub_p3);
            }

            /* walk items until the middle of the block has been passed */
            while (src <= half)
            {
#if INT_ENCODE
                file_item = file_item_buf;
                (*b->method->codec.reset)(c1);
                (*b->method->codec.decode)(c1, &file_item, &src);
#else
                decode_item_len(&src, &split_size_tmp);
                *split_size = (int) split_size_tmp;
                src += *split_size;
#endif
                decode_ptr(&src, &pos);

                if (b->enable_int_count)
                {
                    /* read sub block so we can get no_items for it */
                    sub_p3 = open_block(b, pos);
                    assert(sub_p3);
                    no_items_first_half += sub_p3->no_items;
                    close_block(b, sub_p3);
                }
            }
            /*  p is first half */
            p_new_size = src - dst_buf;
            memcpy(p->bytes, dst_buf, p_new_size);

            /* the next item becomes the separator handed to the caller; it
               is stored in neither half */
#if INT_ENCODE
            file_item = file_item_buf;
            (*b->method->codec.reset)(c1);
            (*b->method->codec.decode)(c1, &file_item, &src);
            *split_size = file_item - file_item_buf;
            memcpy(split_item, file_item_buf, *split_size);
#else
            decode_item_len(&src, &split_size_tmp);
            *split_size = (int) split_size_tmp;
            memcpy(split_item, src, *split_size);
            src += *split_size;
#endif
            /*  *sp is second half */
            *sp = new_int(b, p->cat);
            (*sp)->size = endp - src;
            memcpy((*sp)->bytes, src, (*sp)->size);

            p->size = p_new_size;

            /*  adjust no_items in first&second half */
            (*sp)->no_items = p->no_items - no_items_first_half;
            p->no_items = no_items_first_half;
        }
        p->dirty = 1;
    }
    close_block(b, sub_p1);
    (*b->method->codec.stop)(c1);
    return more;
}
920 
/*
 * Merge pending stream items into one leaf block, splitting it if the
 * result does not fit.
 *
 * b               ISAMB handle (block categories, codec and compare methods).
 * sp1             in: the leaf to update, or a pointer to 0 when there is
 *                 no leaf yet; out: the leaf that holds the result (the
 *                 first half when a split happened). The block may differ
 *                 from the one passed in.
 * lookahead_item  the next pending item from the stream, or 0 for none.
 *                 Its address is handed to read_item to fetch the next
 *                 item (presumably read_item stores the item there).
 * lookahead_mode  mode of the pending item: 0 is handled as a delete,
 *                 2 as an insert that replaces an equal existing item,
 *                 any other non-zero value as a plain insert.
 * stream          source of further items (read_item / clientData).
 * sp2             on split, receives the newly created second leaf;
 *                 not written otherwise.
 * sub_item        on split, receives a copy of the unencoded item that
 * sub_size        starts the second half, and its size in bytes.
 * max_item        if non-null, stream items comparing >= max_item are
 *                 left unconsumed.
 *
 * Returns 0 once read_item has reported end of stream, otherwise 1.
 */
int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
                int *lookahead_mode, ISAMC_I *stream,
                struct ISAMB_block **sp2,
                void *sub_item, int *sub_size,
                const void *max_item)
{
    struct ISAMB_block *p = *sp1;
    char *endp = 0;
    const char *src = 0;
    /* the complete new leaf contents are encoded into dst_buf first and
       copied into block(s) at the end */
    char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
    int new_size;
    void *c1 = (*b->method->codec.start)(); /* decodes the existing block */
    void *c2 = (*b->method->codec.start)(); /* encodes the result */
    int more = 1;
    /* a quarter of block_max of the last (b->no_cat-1) category */
    int quater = b->file[b->no_cat-1].head.block_max / 4;
    char *mid_cut = dst_buf + quater * 2;   /* split threshold, merge loop */
    char *tail_cut = dst_buf + quater * 3;  /* split threshold, append loop */
    /* this initial value is not read; maxp is reassigned before the
       append loop, which is the only place that uses it */
    char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
    char *half1 = 0;   /* end of first half (start of encoded cut item) */
    char *half2 = 0;   /* first byte after the encoded cut item */
    char cut_item_buf[DST_ITEM_MAX]; /* unencoded copy of the cut item */
    int cut_item_size = 0;
    int no_items = 0;    /* number of items (total) */
    int no_items_1 = 0;  /* number of items (first half) */
    int inserted_dst_bytes = 0; /* encoded bytes added by stream inserts */

    if (p && p->size)
    {
        char file_item_buf[DST_ITEM_MAX];
        char *file_item = file_item_buf;

        src = p->bytes;
        endp = p->bytes + p->size;
        /* decode the first existing item; file_item_buf always holds the
           current existing item while the loop runs */
        (*b->method->codec.decode)(c1, &file_item, &src);
        while (1)
        {
            const char *dst_item = 0; /* resulting item to be inserted */
            char *lookahead_next;
	    char *dst_0 = dst;
            /* without a lookahead item d stays -1, so the existing items
               are simply copied */
            int d = -1;

            if (lookahead_item)
                d = (*b->method->compare_item)(file_item_buf, lookahead_item);

	    /* d now holds comparison between existing file item and
	       lookahead item
	       d = 0: equal
               d > 0: lookahead before file
	       d < 0: lookahead after file
	    */
            if (d > 0)
            {
		/* lookahead must be inserted */
                dst_item = lookahead_item;
		/* if this is not an insertion, it's really bad .. */
                if (!*lookahead_mode)
                {
                    yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
                    assert(*lookahead_mode);
                }
            }
            else if (d == 0 && *lookahead_mode == 2)
            {
                /* For mode == 2, we insert the new key anyway - even
                   though the comparison is 0. */
                dst_item = lookahead_item;
                p->dirty = 1;
            }
            else
                dst_item = file_item_buf;

            if (d == 0 && !*lookahead_mode)
            {
                /* it's a deletion and they match, so nothing is written
                   to the output. Mark the block dirty: the file item was
                   part of the input but will not be part of the output */
                p->dirty = 1;
            }
            else if (!half1 && dst > mid_cut)
            {
		/* we have reached the splitting point for the first time */
                const char *dst_item_0 = dst_item;
                half1 = dst; /* candidate for splitting */

		/* encode the resulting item */
                (*b->method->codec.encode)(c2, &dst, &dst_item);

                /* encode advanced dst_item past the item; keep an
                   unencoded copy for a possible split */
                cut_item_size = dst_item - dst_item_0;
		assert(cut_item_size > 0);
                memcpy(cut_item_buf, dst_item_0, cut_item_size);

                half2 = dst;
		no_items_1 = no_items;
		no_items++;
            }
            else
	    {
		/* encode the resulting item */
                (*b->method->codec.encode)(c2, &dst, &dst_item);
		no_items++;
	    }

	    /* now move "pointers" .. result has been encoded .. */
            if (d > 0)
            {
		/* we must move the lookahead pointer */

		inserted_dst_bytes += (dst - dst_0);
                if (inserted_dst_bytes >=  quater)
		    /* no more room. Mark lookahead as "gone".. Note that
		       "more" is left unchanged here */
                    lookahead_item = 0;
                else
                {
		    /* move it really.. */
                    lookahead_next = lookahead_item;
                    if (!(*stream->read_item)(stream->clientData,
                                              &lookahead_next,
                                              lookahead_mode))
                    {
			/* end of stream reached: no "more" and no lookahead */
                        lookahead_item = 0;
                        more = 0;
                    }
                    if (lookahead_item && max_item &&
                        (*b->method->compare_item)(max_item, lookahead_item) <= 0)
                    {
			/* the lookahead goes beyond what we allow in this
			   leaf. Mark it as "gone" */
                        lookahead_item = 0;
                    }

                    p->dirty = 1;
                }
            }
            else if (d == 0)
            {
		/* exact match .. move both pointers */

                lookahead_next = lookahead_item;
                if (!(*stream->read_item)(stream->clientData,
                                          &lookahead_next, lookahead_mode))
                {
                    lookahead_item = 0;
                    more = 0;
                }
                if (src == endp)
                    break;  /* end of file stream reached .. */
                file_item = file_item_buf; /* move file pointer */
                (*b->method->codec.decode)(c1, &file_item, &src);
            }
            else
            {
		/* file pointer must be moved */
                if (src == endp)
                    break;
                file_item = file_item_buf;
                (*b->method->codec.decode)(c1, &file_item, &src);
            }
        }
    }

    /* this loop runs when we are "appending" to a leaf page. That is
       either it's empty (new) or all file items have been read in
       previous loop */

    /* while appending, the output may exceed block_max by up to a quarter */
    maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
    while (lookahead_item)
    {
        char *dst_item;
	const char *src = lookahead_item;
        char *dst_0 = dst;

	/* if a max item is given, stop once the lookahead reaches it */
        if (max_item &&
            (*b->method->compare_item)(max_item, lookahead_item) <= 0)
        {
            /* stop if we have reached the value of max item */
            break;
        }
        if (!*lookahead_mode)
        {
	    /* this is append. So a delete is bad */
            yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
            assert(*lookahead_mode);
            /* NOTE(review): if the assert is compiled out, nothing is
               encoded for this item but no_items is still incremented
               below - confirm this is intended */
        }
        else if (!half1 && dst > tail_cut)
        {
            /* first item past the three-quarter mark: record it as the
               split candidate, as in the merge loop above */
            const char *src_0 = src;
            half1 = dst; /* candidate for splitting */

            (*b->method->codec.encode)(c2, &dst, &src);

            cut_item_size = src - src_0;
	    assert(cut_item_size > 0);
            memcpy(cut_item_buf, src_0, cut_item_size);

	    no_items_1 = no_items;
            half2 = dst;
        }
        else
            (*b->method->codec.encode)(c2, &dst, &src);

        if (dst > maxp)
        {
            /* item does not fit: drop its encoding and leave the
               lookahead item unconsumed */
            dst = dst_0;
            break;
        }
	no_items++;
        if (p)
            p->dirty = 1;
        dst_item = lookahead_item;
        if (!(*stream->read_item)(stream->clientData, &dst_item,
                                  lookahead_mode))
        {
            lookahead_item = 0;
            more = 0;
        }
    }
    new_size = dst - dst_buf;
    if (p && p->cat != b->no_cat-1 &&
        new_size > b->file[p->cat].head.block_max)
    {
        /* the result has outgrown a block that is not of the last
           category: mark that block deleted and allocate another below */
        p->deleted = 1;
        close_block(b, p);
        /* delete it too!! */
        p = 0; /* make a new one anyway */
    }
    if (!p)
    {   /* must create a new one: use the first category whose block_max
           holds new_size, or the last category if none does */
        int i;
        for (i = 0; i < b->no_cat; i++)
            if (new_size <= b->file[i].head.block_max)
                break;
        if (i == b->no_cat)
            i = b->no_cat - 1;
        p = new_leaf(b, i);
    }
    if (new_size > b->file[p->cat].head.block_max)
    {
        /* result does not fit in one block: split at the recorded cut */
        char *first_dst;
        const char *cut_item = cut_item_buf;

        assert(half1);
        assert(half2);

	assert(cut_item_size > 0);

	/* first half */
        p->size = half1 - dst_buf;
	assert(p->size <=  b->file[p->cat].head.block_max);
        memcpy(p->bytes, dst_buf, half1 - dst_buf);
	p->no_items = no_items_1;

        /* second half */
        *sp2 = new_leaf(b, p->cat);

        /* the cut item becomes the first item of the new block, so it is
           encoded again with a reset codec; the bytes that followed it
           in dst_buf are copied unchanged after it */
        (*b->method->codec.reset)(c2);

        b->number_of_leaf_splits++;

        first_dst = (*sp2)->bytes;

        (*b->method->codec.encode)(c2, &first_dst, &cut_item);

        memcpy(first_dst, half2, dst - half2);

        (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
	assert((*sp2)->size <=  b->file[p->cat].head.block_max);
	(*sp2)->no_items = no_items - no_items_1;
        (*sp2)->dirty = 1;
        p->dirty = 1;
        /* hand the cut item back to the caller */
        memcpy(sub_item, cut_item_buf, cut_item_size);
        *sub_size = cut_item_size;
    }
    else
    {
        /* everything fits in p */
        memcpy(p->bytes, dst_buf, dst - dst_buf);
        p->size = new_size;
	p->no_items = no_items;
    }
    (*b->method->codec.stop)(c1);
    (*b->method->codec.stop)(c2);
    *sp1 = p;
    return more;
}
1208 
/*
 * Route an insertion to the handler that matches the node kind.
 *
 * A missing block (*p is 0) or a leaf block is handled by insert_leaf,
 * which receives p itself and may replace the block it points to. Any
 * other block is an internal node and is handled by insert_int. All
 * remaining arguments are forwarded untouched and the handler's return
 * value is passed back unchanged.
 */
int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
               int *mode,
               ISAMC_I *stream,
               struct ISAMB_block **sp,
               void *sub_item, int *sub_size,
               const void *max_item)
{
    struct ISAMB_block *node = *p;
    int is_internal = node && !node->leaf;

    if (is_internal)
        return insert_int(b, node, new_item, mode, stream, sp, sub_item,
                          sub_size, max_item);

    return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
                       sub_size, max_item);
}
1223 
/*
 * Mark the block at pos as deleted, together with every block reachable
 * through the child pointers stored in it (recursively).
 *
 * A pos of 0 is a no-op. For an internal node the content is walked as
 * pointer, item, pointer, item, ..., pointer; the items are decoded only
 * to step over them. Always returns 0.
 */
int isamb_unlink(ISAMB b, ISAM_P pos)
{
    struct ISAMB_block *blk;

    if (!pos)
        return 0;

    blk = open_block(b, pos);
    blk->deleted = 1;
    if (!blk->leaf)
    {
        zint child;
        const char *cur = blk->bytes + blk->offset;
#if INT_ENCODE
        char skipped_item[DST_ITEM_MAX];
        void *codec = (*b->method->codec.start)();
#endif

        for (;;)
        {
#if INT_ENCODE
            char *item_dst;
#else
            zint item_len;
#endif
            /* every pass starts at a child pointer */
            decode_ptr(&cur, &child);
            isamb_unlink(b, child);

            if (cur == blk->bytes + blk->size)
                break;

            /* step over the item that separates two child pointers */
#if INT_ENCODE
            item_dst = skipped_item;
            (*b->method->codec.reset)(codec);
            (*b->method->codec.decode)(codec, &item_dst, &cur);
#else
            decode_item_len(&cur, &item_len);
            cur += item_len;
#endif
        }
#if INT_ENCODE
        (*b->method->codec.stop)(codec);
#endif
    }
    close_block(b, blk);
    return 0;
}
1264 
/*
 * Create a new internal block one level above p and sp and return its
 * position. The new block holds: pointer to p, the separating item
 * (sub_item, sub_size bytes unencoded), pointer to sp. Its item count is
 * the sum of the two children. Closes sp and the new block; p is left
 * open for the caller.
 */
static ISAM_P merge_grow_root(ISAMB b, struct ISAMB_block *p,
                              struct ISAMB_block *sp,
                              const char *sub_item, int sub_size)
{
    struct ISAMB_block *root = new_int(b, p->cat);
    char *out = root->bytes + root->size;
    ISAM_P root_pos;
#if INT_ENCODE
    void *codec = (*b->method->codec.start)();
    const char *key = sub_item;
#endif

    encode_ptr(&out, p->pos);
    assert(sub_size < DST_ITEM_MAX && sub_size > 1);
#if INT_ENCODE
    (*b->method->codec.reset)(codec);
    (*b->method->codec.encode)(codec, &out, &key);
#else
    encode_item_len(&out, sub_size);
    memcpy(out, sub_item, sub_size);
    out += sub_size;
#endif
    encode_ptr(&out, sp->pos);

    root->size = out - root->bytes;
    root->no_items = p->no_items + sp->no_items;
    root_pos = root->pos;
    close_block(b, sp);
    close_block(b, root);
#if INT_ENCODE
    (*b->method->codec.stop)(codec);
#endif
    return root_pos;
}

/*
 * Merge all items delivered by stream into the tree rooted at *pos.
 *
 * *pos is the root position on entry (0 means no tree yet) and is
 * updated to the resulting root. When the root splits, a new root is
 * put on top. If the root has no items left after the last round, the
 * whole tree is unlinked and *pos is set to 0.
 *
 * When b->cache is negative the stream is merely read to its end and
 * *pos is set to 1.
 */
void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
{
    char item_buf[DST_ITEM_MAX];
    char *item_ptr;
    int i_mode;
    int more;
    int must_delete = 0;

    if (b->cache < 0)
    {
        /* consume the stream without storing anything */
        do
        {
            item_ptr = item_buf;
        }
        while ((*stream->read_item)(stream->clientData, &item_ptr, &i_mode));
        *pos = 1;
        return;
    }

    item_ptr = item_buf;
    for (more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
         more; )
    {
        struct ISAMB_block *p = 0;
        struct ISAMB_block *sp = 0;
        char sub_item[DST_ITEM_MAX];
        int sub_size;

        if (*pos)
            p = open_block(b, *pos);

        more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
                          sub_item, &sub_size, 0);
        if (!sp)
            *pos = p->pos;   /* root is unchanged or was replaced in place */
        else
            *pos = merge_grow_root(b, p, sp, sub_item, sub_size);

        /* only the outcome of the final round decides about deletion */
        must_delete = (p->no_items == 0);
        close_block(b, p);
    }

    if (must_delete)
    {
        isamb_unlink(b, *pos);
        *pos = 0;
    }
}
1343 
isamb_pp_open_x(ISAMB isamb,ISAM_P pos,int * level,int scope)1344 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1345 {
1346     ISAMB_PP pp = xmalloc(sizeof(*pp));
1347     int i;
1348 
1349     assert(pos);
1350 
1351     pp->isamb = isamb;
1352     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1353 
1354     pp->pos = pos;
1355     pp->level = 0;
1356     pp->maxlevel = 0;
1357     pp->total_size = 0;
1358     pp->no_blocks = 0;
1359     pp->skipped_numbers = 0;
1360     pp->returned_numbers = 0;
1361     pp->scope = scope;
1362     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1363         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1364     while (1)
1365     {
1366         struct ISAMB_block *p = open_block(isamb, pos);
1367         const char *src = p->bytes + p->offset;
1368         pp->block[pp->level] = p;
1369 
1370         pp->total_size += p->size;
1371         pp->no_blocks++;
1372         if (p->leaf)
1373             break;
1374         decode_ptr(&src, &pos);
1375         p->offset = src - p->bytes;
1376         pp->level++;
1377         pp->accessed_nodes[pp->level]++;
1378     }
1379     pp->block[pp->level+1] = 0;
1380     pp->maxlevel = pp->level;
1381     if (level)
1382         *level = pp->level;
1383     return pp;
1384 }
1385 
isamb_pp_open(ISAMB isamb,ISAM_P pos,int scope)1386 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1387 {
1388     return isamb_pp_open_x(isamb, pos, 0, scope);
1389 }
1390 
/*
 * Close a position pointer and release everything it holds.
 *
 * pp      the position pointer; a null pp is accepted and ignored.
 * size    if non-null, receives pp->total_size (sum of the sizes of
 *         the blocks opened through this PP).
 * blocks  if non-null, receives pp->no_blocks (number of blocks opened
 *         through this PP).
 *
 * Logs the PP's counters at debug level, adds them to the totals kept
 * in the ISAMB handle, closes the blocks still open at levels
 * 0 .. pp->level and frees the PP.
 */
void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
{
    int i;
    if (!pp)
        return;
    /* argument order follows the message: "returned" first, "skipped"
       second */
    yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
            "skipped "ZINT_FORMAT,
            pp->maxlevel, pp->returned_numbers, pp->skipped_numbers);
    for (i = pp->maxlevel; i>=0; i--)
        if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
            yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
                    ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
                    pp->accessed_nodes[i], pp->skipped_nodes[i]);
    /* fold this PP's statistics into the handle-wide totals */
    pp->isamb->skipped_numbers += pp->skipped_numbers;
    pp->isamb->returned_numbers += pp->returned_numbers;
    for (i = pp->maxlevel; i>=0; i--)
    {
        pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
        pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
    }
    if (size)
        *size = pp->total_size;
    if (blocks)
        *blocks = pp->no_blocks;
    /* only levels up to the current one are closed here */
    for (i = 0; i <= pp->level; i++)
        close_block(pp->isamb, pp->block[i]);
    xfree(pp->block);
    xfree(pp);
}
1420 
/*
 * Return the block size of category cat, or -1 when cat is not a valid
 * category index (negative, or not below isamb->no_cat).
 */
int isamb_block_info(ISAMB isamb, int cat)
{
    if (cat < 0 || cat >= isamb->no_cat)
        return -1;

    return isamb->file[cat].head.block_size;
}
1427 
/*
 * Close a position pointer without asking for its size and block
 * statistics.
 */
void isamb_pp_close(ISAMB_PP pp)
{
    zint *no_size = NULL;
    zint *no_blocks = NULL;

    isamb_pp_close_x(pp, no_size, no_blocks);
}
1432 
1433 /* simple recursive dumper .. */
/*
 * Dump the block at pos and, recursively, all blocks below it.
 *
 * b      ISAMB handle.
 * pos    block to dump; 0 means nothing to do.
 * pr     callback that receives one summary line per block (position,
 *        category, size, block_max of the category, item count).
 * level  nesting depth; each level indents the output by two columns.
 *
 * The items themselves are written with the method's log_item at debug
 * level, prefixed with the indented block position. Items are decoded
 * into a buffer of DST_ITEM_MAX bytes, the size used for decoded items
 * elsewhere in this file. The block's offset is advanced while its
 * content is walked.
 */
static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
                         int level)
{
    char item_buf[DST_ITEM_MAX]; /* decode target for leaf and node items */
    char prefix_str[1024];
    struct ISAMB_block *p;

    if (!pos)
        return;

    p = open_block(b, pos);
    sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
            ZINT_FORMAT, level*2, "",
            pos, p->cat, p->size, b->file[p->cat].head.block_max,
            p->no_items);
    (*pr)(prefix_str);
    /* from here on prefix_str is the per-item log prefix */
    sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
    if (p->leaf)
    {
        while (p->offset < p->size)
        {
            const char *src = p->bytes + p->offset;
            char *dst = item_buf;
            (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
            (*b->method->log_item)(YLOG_DEBUG, item_buf, prefix_str);
            p->offset = src - (char*) p->bytes;
        }
        assert(p->offset == p->size);
    }
    else
    {
        const char *src = p->bytes + p->offset;
        ISAM_P sub;

        /* an internal block starts with a child pointer ... */
        decode_ptr(&src, &sub);
        p->offset = src - (char*) p->bytes;

        isamb_dump_r(b, sub, pr, level+1);

        /* ... followed by (item, child pointer) pairs */
        while (p->offset < p->size)
        {
#if INT_ENCODE
            char *file_item = item_buf;
            void *c1 = (*b->method->codec.start)();
            (*b->method->codec.decode)(c1, &file_item, &src);
            (*b->method->codec.stop)(c1);
            (*b->method->log_item)(YLOG_DEBUG, item_buf, prefix_str);
#else
            zint item_len;
            decode_item_len(&src, &item_len);
            (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
            src += item_len;
#endif
            decode_ptr(&src, &sub);

            p->offset = src - (char*) p->bytes;

            isamb_dump_r(b, sub, pr, level+1);
        }
    }
    close_block(b, p);
}
1495 
/*
 * Dump the tree rooted at pos. Block summaries go to pr; the dump starts
 * at nesting level 0.
 */
void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
{
    const int top_level = 0;

    isamb_dump_r(b, pos, pr, top_level);
}
1500 
/*
 * Read the next item into buf. This is isamb_pp_forward without an
 * "until" item, so nothing is skipped; the result of that call is
 * returned unchanged.
 */
int isamb_pp_read(ISAMB_PP pp, void *buf)
{
    int got_item = isamb_pp_forward(pp, buf, NULL);

    return got_item;
}
1505 
1506 
/*
 * Report an estimate of the current position and of the total number of
 * occurrences for a position pointer.
 *
 * *total is the item count stored in the block at level 0; *current is
 * the number of items this PP has returned so far. Both output pointers
 * must be non-null (asserted).
 */
void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
{
    const struct ISAMB_block *top;

    assert(total);
    assert(current);

    /* the count is taken from level 0 rather than from the current
       block, because at end-of-stream the current block need not be a
       leaf */
    top = pp->block[0];
    *total = (double) top->no_items;
    *current = (double) pp->returned_numbers;
#if ISAMB_DEBUG
    yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
            ZINT_FORMAT, *current, *total, pp->returned_numbers);
#endif
}
1522 
/*
 * Read the next item from a position pointer, optionally skipping ahead.
 *
 * pp      the position pointer.
 * buf     receives the decoded item.
 * untilb  if null, the next item is returned. If non-null, items are
 *         skipped until one is found for which
 *         compare_item(untilb, item) < pp->scope; the items stored in
 *         internal blocks are used to pass over whole subtrees.
 *
 * Returns 1 when an item was stored in buf, 0 when there is no current
 * block or the block at level 0 is exhausted.
 */
int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
{
    char *dst = buf;
    const char *src;
    struct ISAMB_block *p = pp->block[pp->level];
    ISAMB b = pp->isamb;
    if (!p)
        return 0;
 again:
    /* the current block has been read to its end: move up to a block
       that has data left, then down again to a leaf */
    while (p->offset == p->size)
    {
        ISAM_P pos;
#if INT_ENCODE
	const char *src_0;
	void *c1;
	char file_item_buf[DST_ITEM_MAX];
	char *file_item = file_item_buf;
#else
	zint item_len;
#endif
        /* go up, closing exhausted blocks on the way */
        while (p->offset == p->size)
        {
            if (pp->level == 0)
                return 0;
            close_block(pp->isamb, pp->block[pp->level]);
            pp->block[pp->level] = 0;
            (pp->level)--;
            p = pp->block[pp->level];
            assert(!p->leaf);
        }

	assert(!p->leaf);
        src = p->bytes + p->offset;

        /* the offset of an internal block is positioned at an item:
           step over it and fetch the child pointer that follows */
#if INT_ENCODE
	c1 = (*b->method->codec.start)();
	(*b->method->codec.decode)(c1, &file_item, &src);
#else
        decode_ptr(&src, &item_len);
        src += item_len;
#endif
        decode_ptr(&src, &pos);
        p->offset = src - (char*) p->bytes;

	src = p->bytes + p->offset;

	/* with an until item: keep moving to the next (item, pointer)
	   pair while the item does not compare below scope; pos and
	   p->offset are updated for every pair passed */
	while(1)
	{
	    if (!untilb || p->offset == p->size)
		break;
	    assert(p->offset < p->size);
#if INT_ENCODE
	    src_0 = src;
	    file_item = file_item_buf;
	    (*b->method->codec.reset)(c1);
	    (*b->method->codec.decode)(c1, &file_item, &src);
	    if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
	    {
		src = src_0;
		break;
	    }
#else
	    decode_item_len(&src, &item_len);
	    if ((*b->method->compare_item)(untilb, src) < pp->scope)
		break;
	    src += item_len;
#endif
	    decode_ptr(&src, &pos);
	    p->offset = src - (char*) p->bytes;
	}

	pp->level++;

        /* go down from pos until a leaf is reached */
        while (1)
        {
            pp->block[pp->level] = p = open_block(pp->isamb, pos);

            pp->total_size += p->size;
            pp->no_blocks++;

            if (p->leaf)
            {
                break;
            }

            src = p->bytes + p->offset;
	    /* a newly opened internal block starts with a child pointer;
	       with an until item, later pointers are chosen the same way
	       as in the loop above */
	    while(1)
	    {
		decode_ptr(&src, &pos);
		p->offset = src - (char*) p->bytes;

		if (!untilb || p->offset == p->size)
		    break;
		assert(p->offset < p->size);
#if INT_ENCODE
		src_0 = src;
		file_item = file_item_buf;
		(*b->method->codec.reset)(c1);
		(*b->method->codec.decode)(c1, &file_item, &src);
		if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
		{
		    src = src_0;
		    break;
		}
#else
		decode_ptr(&src, &item_len);
		if ((*b->method->compare_item)(untilb, src) <= pp->scope)
		    break;
		src += item_len;
#endif
	    }
            pp->level++;
        }
#if INT_ENCODE
	(*b->method->codec.stop)(c1);
#endif
    }
    assert(p->offset < p->size);
    assert(p->leaf);
    /* read items from the leaf; with an until item, items that do not
       compare below scope are decoded and dropped */
    while(1)
    {
	char *dst0 = dst;
        src = p->bytes + p->offset;
        (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
        p->offset = src - (char*) p->bytes;
        if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
	    break;
	/* not far enough yet: reuse buf for the next item */
	dst = dst0;
	/* leaf exhausted without a match: continue in the next leaf */
	if (p->offset == p->size) goto again;
    }
    pp->returned_numbers++;
    return 1;
}
1656 
isamb_get_int_splits(ISAMB b)1657 zint isamb_get_int_splits(ISAMB b)
1658 {
1659     return b->number_of_int_splits;
1660 }
1661 
isamb_get_leaf_splits(ISAMB b)1662 zint isamb_get_leaf_splits(ISAMB b)
1663 {
1664     return b->number_of_leaf_splits;
1665 }
1666 
isamb_get_root_ptr(ISAMB b)1667 zint isamb_get_root_ptr(ISAMB b)
1668 {
1669     return b->root_ptr;
1670 }
1671 
/* Store root_ptr as the root pointer of the handle. */
void isamb_set_root_ptr(ISAMB b, zint root_ptr)
{
    ISAMB handle = b;

    handle->root_ptr = root_ptr;
}
1676 
1677 
1678 /*
1679  * Local variables:
1680  * c-basic-offset: 4
1681  * c-file-style: "Stroustrup"
1682  * indent-tabs-mode: nil
1683  * End:
1684  * vim: shiftwidth=4 tabstop=8 expandtab
1685  */
1686 
1687