1 /* for pread() */
2
3 #define _XOPEN_SOURCE 500
4 #define _GNU_SOURCE /* for unlocked_stdio */
5 #include "config.h"
6
7 /* TODO: handle EINTR everywhere. */
8
9 #include <string.h>
10 #include <stdio.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <sys/mman.h>
14 #include <zlib.h>
15 #include <errno.h>
16
17 #include "gskutils.h"
18 #include "gskerror.h"
19 #include "gsklistmacros.h"
20 #include "gsktable.h"
21 #include "gsktable-file.h"
22 #include "gsktable-helpers.h"
23
24 #define FTELLO ftello /* TODO: track the offsets ourselves */
25 #define FSEEKO fseeko
26 #define FREAD fread
27
28 /* debug invocation of cache-entry force */
29 #define DEBUG_CACHE_ENTRY_FORCE 0
30
31 /* debug compressed data contents */
32 #define DEBUG_DUMP_COMPRESSED_DATA 0
33
34 /* debug chunk sizes and offsets while reading */
35 #define DEBUG_READ_CHUNK 0
36
37 /* debug serialization and deserialization of cache entries */
38 #define DEBUG_ENTRY_SERIALIZATION 0
39
40 /* debug flushing the current chunk of data */
41 #define DEBUG_FLUSH 0
42
43 /* debug serialization of fixed-length index-entry objects */
44 #define DEBUG_INDEX_ENTRIES 0
45
46 typedef struct _FlatFactory FlatFactory;
47 typedef struct _MmapReader MmapReader;
48 typedef struct _MmapWriter MmapWriter;
49 typedef struct _FlatFileBuilder FlatFileBuilder;
50 typedef struct _FlatFile FlatFile;
51 typedef struct _FlatFileReader FlatFileReader;
52
53 typedef enum
54 {
55 FILE_INDEX,
56 FILE_FIRSTKEYS,
57 FILE_DATA
58 } WhichFile;
59 #define N_FILES 3
60
61 /* MUST be a power-of-two */
62 #define MMAP_WRITER_SIZE (512*1024)
63
64 #define MAX_MMAP (1024*1024)
65
66 static const char *file_extensions[N_FILES] = { "index", "firstkeys", "data" };
67
68 struct _FlatFactory
69 {
70 GskTableFileFactory base_factory;
71 guint bytes_per_chunk;
72 guint compression_level;
73 guint n_recycled_builders;
74 guint max_recycled_builders;
75 FlatFileBuilder *recycled_builders;
76 guint max_cache_entries;
77 };
78
79
80 struct _MmapReader
81 {
82 gint fd;
83 guint64 file_size;
84 guint8 *mmapped;
85 GskTableBuffer tmp_buf; /* only if !mmapped */
86 };
87
88 struct _MmapWriter
89 {
90 gint fd;
91 guint64 file_size;
92 guint64 mmap_offset;
93 guint8 *mmapped;
94 guint cur_offset; /* in mmapped */
95 GskTableBuffer tmp_buf;
96 };
97
98
99 struct _FlatFileBuilder
100 {
101 GskTableBuffer input;
102
103 gboolean has_last_key;
104 GskTableBuffer first_key;
105 GskTableBuffer last_key;
106
107 GskTableBuffer uncompressed;
108 GskTableBuffer compressed;
109
110 guint n_compressed_entries;
111 guint uncompressed_data_len;
112
113 MmapWriter writers[N_FILES];
114
115 z_stream compressor;
116 GskMemPool compressor_allocator;
117 guint8 *compressor_allocator_scratchpad;
118 gsize compressor_allocator_scratchpad_len;
119
120 FlatFileBuilder *next_recycled_builder;
121 };
122
123 typedef struct _CacheEntry CacheEntry;
124 typedef struct _CacheEntryRecord CacheEntryRecord;
125 struct _CacheEntryRecord
126 {
127 guint key_len;
128 const guint8 *key_data;
129 guint value_len;
130 const guint8 *value_data;
131 };
132 struct _CacheEntry
133 {
134 guint n_entries;
135 guint64 index;
136 CacheEntry *prev_lru, *next_lru;
137 CacheEntry *bin_next;
138 CacheEntryRecord records[1]; /* must be last! */
139 };
140
141 struct _FlatFile
142 {
143 GskTableFile base_file;
144 gint fds[N_FILES];
145 FlatFileBuilder *builder;
146
147 gboolean has_readers; /* builder and has_readers are exclusive: they
148 cannot be set at the same time */
149 MmapReader readers[N_FILES];
150
151 guint cache_entries_len;
152 CacheEntry **cache_entries;
153 guint cache_entries_count;
154 guint max_cache_entries;
155 CacheEntry *most_recently_used, *least_recently_used;
156 };
157
158 struct _FlatFileReader
159 {
160 GskTableReader base_reader;
161 FILE *fps[N_FILES];
162 guint64 chunk_file_offsets[N_FILES];
163 CacheEntry *cache_entry;
164 guint record_index;
165 guint64 index_entry_index;
166 };
167
168 #define GET_LRU_LIST(file) \
169 CacheEntry *, (file)->most_recently_used, (file)->least_recently_used, \
170 prev_lru, next_lru
171
172 typedef struct _IndexEntry IndexEntry;
173 struct _IndexEntry
174 {
175 guint64 firstkeys_offset;
176 guint32 firstkeys_len;
177 guint64 compressed_data_offset;
178 guint32 compressed_data_len;
179 };
180
181 static gboolean
182 do_pread (FlatFile *ffile,
183 WhichFile f,
184 guint64 offset,
185 guint length,
186 guint8 *ptr_out,
187 GError **error);
188
189 static guint
uint32_vli_encode(guint32 to_encode,guint8 * buf)190 uint32_vli_encode (guint32 to_encode,
191 guint8 *buf) /* min length 5 */
192 {
193 if (to_encode < 0x80)
194 {
195 buf[0] = to_encode;
196 return 1;
197 }
198 else if (to_encode < (1<<14))
199 {
200 buf[0] = 0x80 | (to_encode >> 7);
201 buf[1] = to_encode & 0x7f;
202 return 2;
203 }
204 else if (to_encode < (1<<21))
205 {
206 buf[0] = 0x80 | (to_encode >> 14);
207 buf[1] = 0x80 | (to_encode >> 7);
208 buf[2] = to_encode & 0x7f;
209 return 3;
210 }
211 else if (to_encode < (1<<28))
212 {
213 buf[0] = 0x80 | (to_encode >> 21);
214 buf[1] = 0x80 | (to_encode >> 14);
215 buf[2] = 0x80 | (to_encode >> 7);
216 buf[3] = to_encode & 0x7f;
217 return 4;
218 }
219 else
220 {
221 buf[0] = 0x80 | (to_encode >> 28);
222 buf[1] = 0x80 | (to_encode >> 21);
223 buf[2] = 0x80 | (to_encode >> 14);
224 buf[3] = 0x80 | (to_encode >> 7);
225 buf[4] = to_encode & 0x7f;
226 return 5;
227 }
228 }
229 static guint
uint32_vli_decode(const guint8 * input,guint32 * decoded)230 uint32_vli_decode (const guint8 *input,
231 guint32 *decoded)
232 {
233 guint32 val = input[0] & 0x7f;
234 if (input[0] & 0x80)
235 {
236 guint used = 1;
237 do
238 {
239 val <<= 7;
240 val |= (input[used] & 0x7f);
241 }
242 while ((input[used++] & 0x80) != 0);
243 *decoded = val;
244 return used;
245 }
246 else
247 {
248 *decoded = val;
249 return 1;
250 }
251 }
252
253 typedef struct _CacheEntryTmpRecord CacheEntryTmpRecord;
254 struct _CacheEntryTmpRecord
255 {
256 guint prefix_len;
257 guint key_len;
258 guint value_len;
259 const guint8 *keydata; /* key without prefix */
260 const guint8 *value;
261 };
262 static CacheEntry *
cache_entry_deserialize(guint64 index,guint firstkey_len,const guint8 * firstkey_data,guint compressed_data_len,const guint8 * compressed_data,GError ** error)263 cache_entry_deserialize (guint64 index,
264 guint firstkey_len,
265 const guint8 *firstkey_data,
266 guint compressed_data_len,
267 const guint8 *compressed_data,
268 GError **error)
269 {
270 guint used, tmp;
271 guint n_compressed_entries, uncompressed_data_len;
272 CacheEntryTmpRecord *records, *to_free = NULL;
273 guint8 *uncompressed_data, *to_free2 = NULL;
274 guint i;
275 int zrv;
276 guint8 *uc_at;
277 guint data_size;
278 CacheEntry *rv;
279 const guint8 *last_key;
280 guint8 *heap_at;
281 used = uint32_vli_decode (compressed_data, &n_compressed_entries);
282 used += uint32_vli_decode (compressed_data + used, &uncompressed_data_len);
283
284 #if DEBUG_ENTRY_SERIALIZATION
285 g_message ("deserialize %llu: n_compressed_entry=%u, uncompressed_data_len=%u, compressed_header_len=%u, actual zlib compressed_len=%u", index, n_compressed_entries, uncompressed_data_len, used, compressed_data_len - used);
286 #if DEBUG_DUMP_COMPRESSED_DATA
287 {
288 char *hex = gsk_escape_memory_hex (compressed_data + used, compressed_data_len - used);
289 g_message (" compressed_data=%s", hex);
290 g_free (hex);
291 }
292 #endif
293 #endif
294
295 /* uncompress */
296 if (uncompressed_data_len < 32*1024)
297 uncompressed_data = g_alloca (uncompressed_data_len);
298 else
299 uncompressed_data = to_free2 = g_malloc (uncompressed_data_len);
300
301 z_stream uncompress_buf;
302 memset (&uncompress_buf, 0, sizeof (uncompress_buf));
303 inflateInit (&uncompress_buf);
304 uncompress_buf.avail_in = compressed_data_len - used;
305 uncompress_buf.next_in = (guint8 *) compressed_data + used;
306 uncompress_buf.avail_out = uncompressed_data_len;
307 uncompress_buf.next_out = uncompressed_data;
308 zrv = inflate (&uncompress_buf, Z_SYNC_FLUSH);
309 if (zrv != Z_OK)
310 {
311 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_CORRUPT,
312 "error uncompressing zlib compressed data (zrv=%d)",
313 zrv);
314 g_free (to_free2);
315 return NULL;
316 }
317
318 /* parse data */
319 if (n_compressed_entries < 512)
320 records = g_newa (CacheEntryTmpRecord, n_compressed_entries);
321 else
322 records = to_free = g_new (CacheEntryTmpRecord, n_compressed_entries);
323
324 uc_at = uncompressed_data;
325 data_size = 0;
326 #if DEBUG_ENTRY_SERIALIZATION
327 g_message ("uncompressed_data_len=%u, n_compressed_entries=%u",uncompressed_data_len,n_compressed_entries);
328 #endif
329 for (i = 0; i < n_compressed_entries; i++)
330 {
331 if (i > 0)
332 {
333 uc_at += uint32_vli_decode (uc_at, &tmp);
334 records[i].prefix_len = tmp;
335 uc_at += uint32_vli_decode (uc_at, &tmp);
336 records[i].key_len = records[i].prefix_len + tmp;
337 records[i].keydata = uc_at;
338 uc_at += tmp; /* skip keydata for now */
339 }
340 else
341 {
342 records[i].key_len = firstkey_len;
343 records[i].keydata = firstkey_data;
344 records[i].prefix_len = 0;
345 }
346 uc_at += uint32_vli_decode (uc_at, &tmp);
347 records[i].value_len = tmp;
348 records[i].value = uc_at;
349 uc_at += tmp;
350
351 data_size += records[i].key_len + records[i].value_len;
352 }
353 if (uc_at - uncompressed_data != (gssize) uncompressed_data_len)
354 {
355 g_set_error (error, GSK_G_ERROR_DOMAIN,
356 GSK_ERROR_CORRUPT,
357 "data corrupt uncompressing block (distance %d)",
358 (int)(uc_at-uncompressed_data+uncompressed_data_len));
359 g_free (to_free);
360 g_free (to_free2);
361 return NULL;
362 }
363
364 rv = g_malloc (sizeof (CacheEntry)
365 + (n_compressed_entries-1) * sizeof (CacheEntryRecord)
366 + data_size);
367 last_key = NULL;
368 heap_at = (guint8 *) (rv->records + n_compressed_entries);
369
370 rv->n_entries = n_compressed_entries;
371 rv->index = index;
372
373 for (i = 0; i < n_compressed_entries; i++)
374 {
375 guint key_len = records[i].key_len, pref_len = records[i].prefix_len;
376 guint val_len = records[i].value_len;
377 rv->records[i].key_len = key_len;
378 rv->records[i].value_len = val_len;
379 rv->records[i].key_data = heap_at;
380 memcpy (heap_at, last_key, pref_len);
381 memcpy (heap_at + pref_len, records[i].keydata, key_len - pref_len);
382 heap_at += key_len;
383 memcpy (heap_at, records[i].value, val_len);
384 rv->records[i].value_data = heap_at;
385 heap_at += val_len;
386 last_key = rv->records[i].key_data;
387 }
388 g_free (to_free);
389 g_free (to_free2);
390 return rv;
391 }
392
393 static CacheEntry *
cache_entry_force(FlatFile * ffile,guint64 index,IndexEntry * index_entry,guint8 * firstkey_data,GError ** error)394 cache_entry_force (FlatFile *ffile,
395 guint64 index,
396 IndexEntry *index_entry,
397 guint8 *firstkey_data,
398 GError **error)
399 {
400 guint bin;
401 CacheEntry *entry;
402 #if DEBUG_CACHE_ENTRY_FORCE
403 g_message ("cache_entry_force: index=%llu [key offset/length=%llu/%u; data offset/length=%llu/%u]", index,index_entry->firstkeys_offset,index_entry->firstkeys_len, index_entry->compressed_data_offset, index_entry->compressed_data_len);
404 #endif
405 if (ffile->cache_entries_len == 0)
406 {
407 ffile->cache_entries_len = g_spaced_primes_closest (ffile->max_cache_entries);
408 ffile->cache_entries = g_new0 (CacheEntry *, ffile->cache_entries_len);
409 }
410 bin = (guint) index % ffile->cache_entries_len;
411 for (entry = ffile->cache_entries[bin]; entry != NULL; entry = entry->bin_next)
412 if (entry->index == index)
413 {
414 if (entry->prev_lru != NULL)
415 {
416 GSK_LIST_REMOVE (GET_LRU_LIST (ffile), entry);
417 GSK_LIST_PREPEND (GET_LRU_LIST (ffile), entry);
418 }
419 return entry;
420 }
421
422 /* possibly evict old cache entry */
423 if (ffile->cache_entries_count == ffile->max_cache_entries)
424 {
425 CacheEntry *evicted = ffile->least_recently_used;
426 guint bin = (guint) evicted->index % ffile->cache_entries_len;
427 CacheEntry **pprev;
428 GSK_LIST_REMOVE_LAST (GET_LRU_LIST (ffile));
429
430 /* remove from hash-table */
431 for (pprev = ffile->cache_entries + bin;
432 *pprev != evicted;
433 pprev = &((*pprev)->bin_next))
434 ;
435 *pprev = evicted->bin_next;
436
437 ffile->cache_entries_count--;
438 g_free (evicted);
439 }
440
441 /* create new entry */
442 guint8 *compressed_data;
443 compressed_data = g_malloc (index_entry->compressed_data_len);
444 if (!do_pread (ffile, FILE_DATA,
445 index_entry->compressed_data_offset,
446 index_entry->compressed_data_len,
447 compressed_data, error))
448 {
449 g_free (compressed_data);
450 return NULL;
451 }
452
453 /* deserialize the cache entry */
454 entry = cache_entry_deserialize (index,
455 index_entry->firstkeys_len, firstkey_data,
456 index_entry->compressed_data_len,
457 compressed_data,
458 error);
459 if (entry == NULL)
460 {
461 g_free (compressed_data);
462 return NULL;
463 }
464 entry->bin_next = ffile->cache_entries[bin];
465 ffile->cache_entries[bin] = entry;
466 ffile->cache_entries_count++;
467 g_free (compressed_data);
468 GSK_LIST_PREPEND (GET_LRU_LIST (ffile), entry);
469
470 return entry;
471 }
472
473 /* --- mmap reading implementation --- */
474 static gboolean
mmap_reader_init(MmapReader * reader,gint fd,GError ** error)475 mmap_reader_init (MmapReader *reader,
476 gint fd,
477 GError **error)
478 {
479 struct stat stat_buf;
480 reader->fd = fd;
481 if (fstat (fd, &stat_buf) < 0)
482 {
483 g_set_error (error, GSK_G_ERROR_DOMAIN,
484 GSK_ERROR_FILE_STAT,
485 "error stating fd %d: %s",
486 fd, g_strerror (errno));
487 return FALSE;
488 }
489 reader->file_size = stat_buf.st_size;
490
491 if (reader->file_size < MAX_MMAP)
492 {
493 reader->mmapped = mmap (NULL, reader->file_size, PROT_READ, MAP_SHARED, fd, 0);
494 if (reader->mmapped == NULL || reader->mmapped == MAP_FAILED)
495 {
496 reader->mmapped = NULL;
497 g_set_error (error, GSK_G_ERROR_DOMAIN,
498 GSK_ERROR_FILE_MMAP,
499 "error mmapping fd %d: %s",
500 fd, g_strerror (errno));
501 return FALSE;
502 }
503 }
504 else
505 {
506 reader->mmapped = NULL;
507 gsk_table_buffer_init (&reader->tmp_buf);
508 }
509 return TRUE;
510 }
511
512 static gboolean
mmap_reader_pread(MmapReader * reader,guint64 offset,guint length,guint8 * data_out,GError ** error)513 mmap_reader_pread (MmapReader *reader,
514 guint64 offset,
515 guint length,
516 guint8 *data_out,
517 GError **error)
518 {
519 g_assert (offset + length <= reader->file_size);
520 if (reader->mmapped)
521 {
522 memcpy (data_out, reader->mmapped + (gsize)offset, length);
523 return TRUE;
524 }
525 else
526 {
527 gssize rv = pread (reader->fd, data_out, length, offset);
528 if (rv < 0)
529 {
530 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
531 "error calling pread(): %s",
532 g_strerror (errno));
533 return FALSE;
534 }
535 else if (rv < (gssize) length)
536 {
537 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
538 "premature end-of-file calling pread() (mmap reader pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u)",
539 offset, length, (guint) rv);
540 return FALSE;
541 }
542 return TRUE;
543 }
544 }
545
546 static void
mmap_reader_clear(MmapReader * reader)547 mmap_reader_clear (MmapReader *reader)
548 {
549 if (reader->mmapped)
550 munmap (reader->mmapped, reader->file_size);
551 else
552 gsk_table_buffer_clear (&reader->tmp_buf);
553 }
554
555 static gboolean
mmap_writer_init_at(MmapWriter * writer,gint fd,guint64 offset,GError ** error)556 mmap_writer_init_at (MmapWriter *writer,
557 gint fd,
558 guint64 offset,
559 GError **error)
560 {
561 guint64 mmap_offset = offset & (~(guint64)(MMAP_WRITER_SIZE-1));
562 struct stat stat_buf;
563 guint64 file_size;
564 writer->fd = fd;
565 if (fstat (fd, &stat_buf) < 0)
566 {
567 g_set_error (error, GSK_G_ERROR_DOMAIN,
568 GSK_ERROR_FILE_STAT,
569 "error getting size of file-descriptor %d: %s",
570 fd, g_strerror (errno));
571 return FALSE;
572 }
573 file_size = stat_buf.st_size;
574 if (mmap_offset + MMAP_WRITER_SIZE > file_size)
575 {
576 if (ftruncate (fd, mmap_offset + MMAP_WRITER_SIZE) < 0)
577 {
578 g_set_error (error, GSK_G_ERROR_DOMAIN,
579 GSK_ERROR_FILE_STAT,
580 "error expanding mmap writer file size: %s",
581 g_strerror (errno));
582 return FALSE;
583 }
584 file_size = mmap_offset + MMAP_WRITER_SIZE;
585 }
586 writer->file_size = file_size;
587 writer->mmap_offset = mmap_offset;
588 writer->mmapped = mmap (NULL, MMAP_WRITER_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, mmap_offset);
589 if (writer->mmapped == MAP_FAILED)
590 {
591 writer->mmapped = NULL;
592 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
593 "error mmapping for writing: %s",
594 g_strerror (errno));
595 return FALSE;
596 }
597 writer->cur_offset = offset - mmap_offset;
598 gsk_table_buffer_init (&writer->tmp_buf);
599 return TRUE;
600 }
601
602 static inline guint64
mmap_writer_offset(MmapWriter * writer)603 mmap_writer_offset (MmapWriter *writer)
604 {
605 return writer->mmap_offset + writer->cur_offset;
606 }
607
608 static gboolean
writer_advance_to_next_page(MmapWriter * writer,GError ** error)609 writer_advance_to_next_page (MmapWriter *writer,
610 GError **error)
611 {
612 munmap (writer->mmapped, MMAP_WRITER_SIZE);
613
614 writer->mmap_offset += MMAP_WRITER_SIZE;
615
616 if (writer->mmap_offset + MMAP_WRITER_SIZE > writer->file_size)
617 {
618 if (ftruncate (writer->fd, writer->mmap_offset + MMAP_WRITER_SIZE) < 0)
619 {
620 g_set_error (error, GSK_G_ERROR_DOMAIN,
621 GSK_ERROR_FILE_STAT,
622 "error expanding mmap writer file size: %s",
623 g_strerror (errno));
624 return FALSE;
625 }
626 writer->file_size = writer->mmap_offset + MMAP_WRITER_SIZE;
627 }
628 writer->cur_offset = 0;
629 writer->mmapped = mmap (NULL, MMAP_WRITER_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, writer->fd, writer->mmap_offset);
630 if (writer->mmapped == MAP_FAILED)
631 {
632 writer->mmapped = NULL;
633 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
634 "mmap failed on writer: %s",
635 g_strerror (errno));
636 return FALSE;
637 }
638
639 return TRUE;
640 }
641
642 static gboolean
mmap_writer_write(MmapWriter * writer,guint len,const guint8 * data,GError ** error)643 mmap_writer_write (MmapWriter *writer,
644 guint len,
645 const guint8 *data,
646 GError **error)
647 {
648 if (G_LIKELY (writer->cur_offset + len < MMAP_WRITER_SIZE))
649 {
650 memcpy (writer->mmapped + writer->cur_offset, data, len);
651 writer->cur_offset += len;
652 }
653 else
654 {
655 guint n_written = MMAP_WRITER_SIZE - writer->cur_offset;
656 memcpy (writer->mmapped + writer->cur_offset, data, n_written);
657
658 /* advance to next page */
659 if (!writer_advance_to_next_page (writer, error))
660 return FALSE;
661
662 while (G_UNLIKELY (n_written + MMAP_WRITER_SIZE <= len))
663 {
664 /* write a full page */
665 memcpy (writer->mmapped, data + n_written, MMAP_WRITER_SIZE);
666 n_written += MMAP_WRITER_SIZE;
667
668 /* advance to next page */
669 if (!writer_advance_to_next_page (writer, error))
670 return FALSE;
671 }
672 if (G_LIKELY (n_written < len))
673 {
674 memcpy (writer->mmapped, data + n_written, len - n_written);
675 writer->cur_offset = len - n_written;
676 }
677 }
678 return TRUE;
679 }
680
681 static gboolean
mmap_writer_pread(MmapWriter * writer,guint64 offset,guint length,guint8 * data_out,GError ** error)682 mmap_writer_pread (MmapWriter *writer,
683 guint64 offset,
684 guint length,
685 guint8 *data_out,
686 GError **error)
687 {
688 g_assert (offset + length <= writer->mmap_offset + writer->cur_offset);
689 if (offset + length <= writer->mmap_offset)
690 {
691 /* pure pread() */
692 gssize rv = pread (writer->fd, data_out, length, offset);
693 if (rv < 0)
694 {
695 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
696 "error calling pread(): %s",
697 g_strerror (errno));
698 return FALSE;
699 }
700 else if (rv < (gssize) length)
701 {
702 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
703 "premature end-of-file calling pread() (mmap writer pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u; case 0)",
704 offset, length, (guint) rv);
705 return FALSE;
706 }
707 return TRUE;
708 }
709 else if (offset < writer->mmap_offset)
710 {
711 /* pread() + memcpy() */
712 guint pread_len = writer->mmap_offset - offset;
713 gssize rv = pread (writer->fd, data_out, pread_len, offset);
714 if (rv < 0)
715 {
716 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
717 "error calling pread(): %s",
718 g_strerror (errno));
719 return FALSE;
720 }
721 else if (rv < (gssize) pread_len)
722 {
723 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
724 "premature end-of-file calling pread() (mmap writer pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u; case 1)",
725 offset, length, (guint) rv);
726 return FALSE;
727 }
728 memcpy (data_out + pread_len, writer->mmapped, length - pread_len);
729 return TRUE;
730 }
731 else
732 {
733 /* pure memcpy() */
734 guint buf_offset = offset - writer->mmap_offset;
735 memcpy (data_out, writer->mmapped + buf_offset, length);
736 return TRUE;
737 }
738 }
739
740 static void
mmap_writer_clear(MmapWriter * writer)741 mmap_writer_clear (MmapWriter *writer)
742 {
743 munmap (writer->mmapped, MMAP_WRITER_SIZE);
744 gsk_table_buffer_clear (&writer->tmp_buf);
745 }
746
747
748 /* --- index entry serialization --- */
749 #define SIZEOF_INDEX_ENTRY 24
750 #define INDEX_HEADER_SIZE 8 /* the number of records, as a LE int64 */
751 /* each entry in index file is:
752 8 bytes -- initial key offset
753 4 bytes -- initial key length
754 8 bytes -- data offset
755 4 bytes -- data length
756 */
757
758 static void
index_entry_serialize(const IndexEntry * index_entry,guint8 * data_out)759 index_entry_serialize (const IndexEntry *index_entry,
760 guint8 *data_out)
761 {
762 guint32 tmp32, tmp32_le;
763 guint64 tmp64, tmp64_le;
764
765 tmp64 = index_entry->firstkeys_offset;
766 tmp64_le = GUINT64_TO_LE (tmp64);
767 memcpy (data_out + 0, &tmp64_le, 8);
768 tmp32 = index_entry->firstkeys_len;
769 tmp32_le = GUINT32_TO_LE (tmp32);
770 memcpy (data_out + 8, &tmp32_le, 4);
771
772 tmp64 = index_entry->compressed_data_offset;
773 tmp64_le = GUINT64_TO_LE (tmp64);
774 memcpy (data_out + 12, &tmp64_le, 8);
775 tmp32 = index_entry->compressed_data_len;
776 tmp32_le = GUINT32_TO_LE (tmp32);
777 memcpy (data_out + 20, &tmp32_le, 4);
778 }
779
780 static void
index_entry_deserialize(const guint8 * data_in,IndexEntry * index_entry_out)781 index_entry_deserialize (const guint8 *data_in,
782 IndexEntry *index_entry_out)
783 {
784 guint32 tmp32_le;
785 guint64 tmp64_le;
786
787 memcpy (&tmp64_le, data_in + 0, 8);
788 index_entry_out->firstkeys_offset = GUINT64_FROM_LE (tmp64_le);
789 memcpy (&tmp32_le, data_in + 8, 4);
790 index_entry_out->firstkeys_len = GUINT32_FROM_LE (tmp32_le);
791 memcpy (&tmp64_le, data_in + 12, 8);
792 index_entry_out->compressed_data_offset = GUINT64_FROM_LE (tmp64_le);
793 memcpy (&tmp32_le, data_in + 20, 4);
794 index_entry_out->compressed_data_len = GUINT32_FROM_LE (tmp32_le);
795 }
796
797
my_mem_pool_alloc(voidpf opaque,uInt items,uInt size)798 static voidpf my_mem_pool_alloc (voidpf opaque, uInt items, uInt size)
799 {
800 FlatFileBuilder *builder = opaque;
801 /* TODO: hack: use alloc0 to avoid uninitialized warnings in valgrind */
802 return gsk_mem_pool_alloc0 (&builder->compressor_allocator, items*size);
803 }
my_mem_pool_free(voidpf opaque,voidpf address)804 static void my_mem_pool_free (voidpf opaque, voidpf address)
805 {
806 }
807
808 static inline void
reinit_compressor(FlatFileBuilder * builder,guint compression_level,gboolean preowned_mempool)809 reinit_compressor (FlatFileBuilder *builder,
810 guint compression_level,
811 gboolean preowned_mempool)
812 {
813 if (preowned_mempool)
814 {
815 if (builder->compressor_allocator.all_chunk_list != NULL)
816 {
817 gsk_mem_pool_destruct (&builder->compressor_allocator);
818 builder->compressor_allocator_scratchpad_len *= 2;
819 builder->compressor_allocator_scratchpad = g_realloc (builder->compressor_allocator_scratchpad,
820 builder->compressor_allocator_scratchpad_len);
821 }
822 }
823 gsk_mem_pool_construct_with_scratch_buf (&builder->compressor_allocator,
824 builder->compressor_allocator_scratchpad,
825 builder->compressor_allocator_scratchpad_len);
826 memset (&builder->compressor, 0, sizeof (z_stream));
827 builder->compressor.zalloc = my_mem_pool_alloc;
828 builder->compressor.zfree = my_mem_pool_free;
829 builder->compressor.opaque = builder;
830 deflateInit (&builder->compressor, compression_level);
831 builder->n_compressed_entries = 0;
832 builder->uncompressed_data_len = 0;
833 builder->has_last_key = FALSE;
834 gsk_table_buffer_set_len (&builder->compressed, 0);
835 }
836
837 static FlatFileBuilder *
flat_file_builder_new(FlatFactory * factory)838 flat_file_builder_new (FlatFactory *factory)
839 {
840 if (factory->recycled_builders)
841 {
842 FlatFileBuilder *builder = factory->recycled_builders;
843 factory->recycled_builders = builder->next_recycled_builder;
844 factory->n_recycled_builders--;
845 g_assert (builder->n_compressed_entries == 0
846 && builder->uncompressed_data_len == 0);
847 return builder;
848 }
849 else
850 {
851 FlatFileBuilder *builder = g_slice_new (FlatFileBuilder);
852 gsk_table_buffer_init (&builder->input);
853 gsk_table_buffer_init (&builder->first_key);
854 gsk_table_buffer_init (&builder->last_key);
855 gsk_table_buffer_init (&builder->compressed);
856 gsk_table_buffer_init (&builder->uncompressed);
857 builder->compressor_allocator_scratchpad_len = 1024;
858 builder->compressor_allocator_scratchpad = g_malloc (builder->compressor_allocator_scratchpad_len);
859 reinit_compressor (builder, factory->compression_level, FALSE);
860 return builder;
861 }
862 }
863
864 static void
builder_recycle(FlatFactory * ffactory,FlatFileBuilder * builder)865 builder_recycle (FlatFactory *ffactory,
866 FlatFileBuilder *builder)
867 {
868 if (ffactory->n_recycled_builders == ffactory->max_recycled_builders)
869 {
870 gsk_table_buffer_clear (&builder->input);
871 gsk_table_buffer_clear (&builder->first_key);
872 gsk_table_buffer_clear (&builder->last_key);
873 gsk_table_buffer_clear (&builder->compressed);
874 gsk_table_buffer_clear (&builder->uncompressed);
875 gsk_mem_pool_destruct (&builder->compressor_allocator);
876 g_free (builder->compressor_allocator_scratchpad);
877 g_slice_free (FlatFileBuilder, builder);
878 }
879 else
880 {
881 reinit_compressor (builder, ffactory->compression_level, TRUE);
882 builder->next_recycled_builder = ffactory->recycled_builders;
883 ffactory->recycled_builders = builder;
884 ffactory->n_recycled_builders++;
885 }
886 }
887
888 typedef enum
889 {
890 OPEN_MODE_CREATE,
891 OPEN_MODE_CONTINUE_CREATE,
892 OPEN_MODE_READONLY
893 } OpenMode;
894
895 static gboolean
open_3_files(FlatFile * file,const char * dir,guint64 id,OpenMode open_mode,GError ** error)896 open_3_files (FlatFile *file,
897 const char *dir,
898 guint64 id,
899 OpenMode open_mode,
900 GError **error)
901 {
902 char fname_buf[GSK_TABLE_MAX_PATH];
903 guint open_flags;
904 const char *participle;
905 guint f;
906 switch (open_mode)
907 {
908 case OPEN_MODE_CREATE:
909 open_flags = O_RDWR | O_CREAT | O_TRUNC;
910 participle = "creating";
911 break;
912 case OPEN_MODE_CONTINUE_CREATE:
913 open_flags = O_RDWR;
914 participle = "opening for writing";
915 break;
916 case OPEN_MODE_READONLY:
917 open_flags = O_RDONLY;
918 participle = "opening for reading";
919 break;
920 default:
921 g_assert_not_reached ();
922 }
923
924 for (f = 0; f < N_FILES; f++)
925 {
926 gsk_table_mk_fname (fname_buf, dir, id, file_extensions[f]);
927 file->fds[f] = open (fname_buf, open_flags, 0644);
928 if (file->fds[f] < 0)
929 {
930 guint tmp_f;
931 g_set_error (error, GSK_G_ERROR_DOMAIN,
932 GSK_ERROR_FILE_CREATE,
933 "error %s %s: %s",
934 participle, fname_buf, g_strerror (errno));
935 for (tmp_f = 0; tmp_f < f; tmp_f++)
936 close (file->fds[tmp_f]);
937 return FALSE;
938 }
939 }
940 return TRUE;
941 }
942
943 static GskTableFile *
flat__create_file(GskTableFileFactory * factory,const char * dir,guint64 id,const GskTableFileHints * hints,GError ** error)944 flat__create_file (GskTableFileFactory *factory,
945 const char *dir,
946 guint64 id,
947 const GskTableFileHints *hints,
948 GError **error)
949 {
950 FlatFactory *ffactory = (FlatFactory *) factory;
951 FlatFile *rv = g_slice_new (FlatFile);
952 guint f;
953 rv->base_file.factory = factory;
954 rv->base_file.id = id;
955 rv->base_file.n_entries = 0;
956
957 if (!open_3_files (rv, dir, id, OPEN_MODE_CREATE, error))
958 {
959 g_slice_free (FlatFile, rv);
960 return NULL;
961 }
962 rv->builder = flat_file_builder_new (ffactory);
963 for (f = 0; f < N_FILES; f++)
964 {
965 if (!mmap_writer_init_at (&rv->builder->writers[f], rv->fds[f], 0, error))
966 {
967 guint tmp_f;
968 for (tmp_f = 0; tmp_f < f; tmp_f++)
969 mmap_writer_clear (&rv->builder->writers[tmp_f]);
970 for (f = 0; f < N_FILES; f++)
971 close (rv->fds[f]);
972 builder_recycle (ffactory, rv->builder);
973 g_slice_free (FlatFile, rv);
974 return NULL;
975 }
976 }
977
978 /* write the index file's header */
979 {
980 guint64 zero_le = 0;
981 if (!mmap_writer_write (&rv->builder->writers[FILE_INDEX], 8,
982 (guint8 *) &zero_le, error))
983 {
984 for (f = 0; f < N_FILES; f++)
985 {
986 mmap_writer_clear (&rv->builder->writers[f]);
987 close (rv->fds[f]);
988 }
989 builder_recycle (ffactory, rv->builder);
990 g_slice_free (FlatFile, rv);
991 return NULL;
992 }
993 }
994
995
996 rv->has_readers = FALSE;
997 rv->cache_entries_len = 0;
998 rv->cache_entries = NULL;
999 rv->cache_entries_count = 0;
1000 rv->max_cache_entries = ffactory->max_cache_entries;
1001 return &rv->base_file;
1002 }
1003
1004 static GskTableFile *
flat__open_building_file(GskTableFileFactory * factory,const char * dir,guint64 id,guint state_len,const guint8 * state_data,GError ** error)1005 flat__open_building_file(GskTableFileFactory *factory,
1006 const char *dir,
1007 guint64 id,
1008 guint state_len,
1009 const guint8 *state_data,
1010 GError **error)
1011 {
1012 FlatFactory *ffactory = (FlatFactory *) factory;
1013 FlatFile *rv = g_slice_new (FlatFile);
1014 rv->base_file.factory = factory;
1015 rv->base_file.id = id;
1016 if (!open_3_files (rv, dir, id, OPEN_MODE_CONTINUE_CREATE, error))
1017 {
1018 g_slice_free (FlatFile, rv);
1019 return NULL;
1020 }
1021
1022 rv->builder = flat_file_builder_new (ffactory);
1023
1024 /* seek according to 'state_data' */
1025 g_assert (state_len == 33);
1026 g_assert (state_data[0] == 0);
1027 {
1028 guint f;
1029 for (f = 0; f < N_FILES; f++)
1030 {
1031 guint64 offset_le;
1032 guint64 offset;
1033 memcpy (&offset_le, state_data + 8 * f + 1, 8);
1034 offset = GUINT64_FROM_LE (offset_le);
1035 if (!mmap_writer_init_at (&rv->builder->writers[f], rv->fds[f], offset, error))
1036 {
1037 guint tmp_f;
1038 for (tmp_f = 0; tmp_f < f; tmp_f++)
1039 mmap_writer_clear (&rv->builder->writers[tmp_f]);
1040 for (tmp_f = 0; tmp_f < N_FILES; tmp_f++)
1041 close (rv->fds[tmp_f]);
1042 builder_recycle (ffactory, rv->builder);
1043 g_slice_free (FlatFile, rv);
1044 return NULL;
1045 }
1046 }
1047 {
1048 guint64 n_entries_le, n_entries;
1049 memcpy (&n_entries_le, state_data + 1 + 3*8, 8);
1050 n_entries = GUINT64_FROM_LE (n_entries_le);
1051 rv->base_file.n_entries = n_entries;
1052 }
1053 }
1054 rv->has_readers = FALSE;
1055
1056 rv->cache_entries_len = 0;
1057 rv->cache_entries = NULL;
1058 rv->cache_entries_count = 0;
1059 rv->max_cache_entries = ffactory->max_cache_entries;
1060
1061 return &rv->base_file;
1062 }
1063
1064 GskTableFile *
flat__open_file(GskTableFileFactory * factory,const char * dir,guint64 id,GError ** error)1065 flat__open_file (GskTableFileFactory *factory,
1066 const char *dir,
1067 guint64 id,
1068 GError **error)
1069 {
1070 FlatFactory *ffactory = (FlatFactory *) factory;
1071 FlatFile *rv = g_slice_new (FlatFile);
1072 guint f;
1073 rv->base_file.factory = factory;
1074 rv->base_file.id = id;
1075 if (!open_3_files (rv, dir, id, OPEN_MODE_READONLY, error))
1076 {
1077 g_slice_free (FlatFile, rv);
1078 return NULL;
1079 }
1080 rv->builder = NULL;
1081
1082 /* pread() to get the number of records */
1083 {
1084 guint64 n_entries_le;
1085 int prv = pread (rv->fds[FILE_INDEX], &n_entries_le, 8, 0);
1086 if (prv < 0)
1087 {
1088 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
1089 "error reading nrecords from index file: %s",
1090 g_strerror (errno));
1091 for (f = 0; f < N_FILES; f++)
1092 close (rv->fds[f]);
1093 g_slice_free (FlatFile, rv);
1094 return NULL;
1095 }
1096 if (prv < 8)
1097 {
1098 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
1099 "premature eof reading nrecords from index file: %s",
1100 g_strerror (errno));
1101 for (f = 0; f < N_FILES; f++)
1102 close (rv->fds[f]);
1103 g_slice_free (FlatFile, rv);
1104 return NULL;
1105 }
1106 rv->base_file.n_entries = GUINT64_FROM_LE (n_entries_le);
1107 }
1108
1109 /* mmap small files for reading */
1110 for (f = 0; f < N_FILES; f++)
1111 {
1112 if (!mmap_reader_init (&rv->readers[f], rv->fds[f], error))
1113 {
1114 guint tmp_f;
1115 for (tmp_f = 0; tmp_f < f; tmp_f++)
1116 mmap_reader_clear (&rv->readers[tmp_f]);
1117 for (f = 0; f < N_FILES; f++)
1118 close (rv->fds[f]);
1119 g_slice_free (FlatFile, rv);
1120 return NULL;
1121 }
1122 }
1123 rv->has_readers = TRUE;
1124
1125 rv->cache_entries_len = 0;
1126 rv->cache_entries = NULL;
1127 rv->cache_entries_count = 0;
1128 rv->max_cache_entries = ffactory->max_cache_entries;
1129
1130 return &rv->base_file;
1131 }
1132
1133 static inline void
do_compress(FlatFileBuilder * builder,guint len,const guint8 * data)1134 do_compress (FlatFileBuilder *builder,
1135 guint len,
1136 const guint8 *data)
1137 {
1138 //g_message ("do_compress: len=%u",len);
1139 builder->uncompressed_data_len += len;
1140
1141 /* ensure there is enough data at the end of 'compressed' */
1142 gsk_table_buffer_ensure_extra (&builder->compressed, len / 2 + 16);
1143
1144 /* initialize input and output buffers */
1145 builder->compressor.next_in = (Bytef *) data;
1146 builder->compressor.avail_in = len;
1147 builder->compressor.next_out = builder->compressed.data
1148 + builder->compressed.len;
1149 builder->compressor.avail_out = builder->compressed.alloced
1150 - builder->compressed.len;
1151
1152 /* deflate until all input consumed */
1153 while (builder->compressor.avail_in > 0)
1154 {
1155
1156 int zrv;
1157 retry_deflate:
1158 zrv = deflate (&builder->compressor, 0);
1159 g_assert (zrv == Z_OK);
1160 builder->compressed.len = (guint8 *) builder->compressor.next_out
1161 - (guint8 *) builder->compressed.data;
1162
1163 if (builder->compressor.avail_out == 0)
1164 {
1165 gsk_table_buffer_ensure_extra (&builder->compressed,
1166 builder->compressor.avail_in / 2 + 16);
1167 builder->compressor.next_out = builder->compressed.data
1168 + builder->compressed.len;
1169 builder->compressor.avail_out = builder->compressed.alloced
1170 - builder->compressed.len;
1171 goto retry_deflate;
1172 }
1173 }
1174 }
1175
1176 static void
do_compress_flush(FlatFileBuilder * builder)1177 do_compress_flush (FlatFileBuilder *builder)
1178 {
1179 /* 6 bytes is sufficient according to the zlib header file docs;
1180 add 10 for good measure. */
1181 gsk_table_buffer_ensure_extra (&builder->compressed, 6 + 10);
1182 builder->compressor.next_in = NULL;
1183 builder->compressor.avail_in = 0;
1184 builder->compressor.next_out = builder->compressed.data
1185 + builder->compressed.len;
1186 builder->compressor.avail_out = builder->compressed.alloced
1187 - builder->compressed.len;
1188 for (;;)
1189 {
1190 if (deflate (&builder->compressor, Z_SYNC_FLUSH) != Z_OK)
1191 g_assert_not_reached ();
1192 builder->compressed.len = builder->compressor.next_out
1193 - builder->compressed.data;
1194 if (builder->compressor.avail_out > 0)
1195 break;
1196
1197 gsk_table_buffer_ensure_extra (&builder->compressed, 64);
1198
1199 builder->compressor.next_out = builder->compressed.data
1200 + builder->compressed.len;
1201 builder->compressor.avail_out = builder->compressed.alloced
1202 - builder->compressed.len;
1203 }
1204 }
1205
1206 static gboolean
flush_to_files(FlatFileBuilder * builder,GError ** error)1207 flush_to_files (FlatFileBuilder *builder,
1208 GError **error)
1209 {
1210 /* emit index, keyfile and data file stuff */
1211 guint8 header[SIZEOF_INDEX_ENTRY];
1212 guint8 compressed_header[5 + 5];
1213 guint compressed_header_len = 0;
1214 IndexEntry index_entry;
1215 guint tmp;
1216
1217 /* flush compressor */
1218 do_compress_flush (builder);
1219
1220 /* write uncompressed_data_len and n_compressed_entries
1221 to the compressed_header */
1222 compressed_header_len = uint32_vli_encode (builder->n_compressed_entries,
1223 compressed_header);
1224 tmp = uint32_vli_encode (builder->uncompressed_data_len,
1225 compressed_header + compressed_header_len);
1226 compressed_header_len += tmp;
1227
1228 #if DEBUG_FLUSH
1229 g_message ("flush_to_files: n_compressed_entry=%u, uncompressed_data_len=%u, compressed_header_len=%u, compressed_len=%u", builder->n_compressed_entries, builder->uncompressed_data_len, compressed_header_len, builder->compressed.len);
1230 #if DEBUG_DUMP_COMPRESSED_DATA
1231 {
1232 char *hex = gsk_escape_memory_hex (builder->compressed.data, builder->compressed.len);
1233 g_message (" compressed_data=%s", hex);
1234 g_free (hex);
1235 }
1236 #endif
1237 #endif
1238
1239 /* encode index entry */
1240 index_entry.firstkeys_offset = mmap_writer_offset (&builder->writers[FILE_FIRSTKEYS]);
1241 index_entry.firstkeys_len = builder->first_key.len;
1242 index_entry.compressed_data_offset = mmap_writer_offset (&builder->writers[FILE_DATA]);
1243 index_entry.compressed_data_len = compressed_header_len + builder->compressed.len;
1244 #if DEBUG_INDEX_ENTRIES
1245 g_message ("writing index entry firstkey offset/len=%llu/%u; compressed %llu/%u", index_entry.firstkeys_offset, index_entry.firstkeys_len, index_entry.compressed_data_offset, index_entry.compressed_data_len);
1246 #endif
1247 index_entry_serialize (&index_entry, header);
1248
1249 /* write data to files */
1250 if (!mmap_writer_write (builder->writers + FILE_INDEX, SIZEOF_INDEX_ENTRY, header, error)
1251 || !mmap_writer_write (builder->writers + FILE_FIRSTKEYS, builder->first_key.len, builder->first_key.data, error)
1252 || !mmap_writer_write (builder->writers + FILE_DATA, compressed_header_len, compressed_header, error)
1253 || !mmap_writer_write (builder->writers + FILE_DATA, builder->compressed.len, builder->compressed.data, error))
1254 return FALSE;
1255 return TRUE;
1256 }
1257
1258 /* methods for a file which is being built */
1259 static GskTableFeedEntryResult
flat__feed_entry(GskTableFile * file,guint key_len,const guint8 * key_data,guint value_len,const guint8 * value_data,GError ** error)1260 flat__feed_entry (GskTableFile *file,
1261 guint key_len,
1262 const guint8 *key_data,
1263 guint value_len,
1264 const guint8 *value_data,
1265 GError **error)
1266 {
1267 FlatFile *ffile = (FlatFile *) file;
1268 FlatFactory *ffactory = (FlatFactory *) file->factory;
1269 FlatFileBuilder *builder = ffile->builder;
1270 guint8 enc_buf[5+5];
1271 guint encoded_len, tmp;
1272
1273 g_assert (builder != NULL);
1274
1275 file->n_entries++;
1276
1277 if (builder->has_last_key)
1278 {
1279 /* compute prefix length */
1280 guint prefix_len = 0;
1281 guint max_prefix_len = MIN (key_len, builder->last_key.len);
1282 while (prefix_len < max_prefix_len
1283 && key_data[prefix_len] == builder->last_key.data[prefix_len])
1284 prefix_len++;
1285
1286 /* encode prefix_length, and (key_len-prefix_length) */
1287 encoded_len = uint32_vli_encode (prefix_len, enc_buf);
1288 tmp = uint32_vli_encode (key_len - prefix_len, enc_buf + encoded_len);
1289 encoded_len += tmp;
1290 memcpy (gsk_table_buffer_set_len (&builder->uncompressed, encoded_len),
1291 enc_buf, encoded_len);
1292
1293 /* copy non-prefix portion of key */
1294 memcpy (gsk_table_buffer_append (&builder->uncompressed, key_len - prefix_len),
1295 key_data + prefix_len, key_len - prefix_len);
1296 }
1297 else
1298 {
1299 /* the key's length will be in the index file
1300 (no prefix-compression possible on the first key);
1301 the key's data will be in the firstkeys file */
1302 builder->has_last_key = TRUE;
1303 memcpy (gsk_table_buffer_set_len (&builder->first_key,
1304 key_len), key_data, key_len);
1305 gsk_table_buffer_set_len (&builder->uncompressed, 0);
1306 }
1307
1308 builder->n_compressed_entries++;
1309
1310 /* encode value length */
1311 encoded_len = uint32_vli_encode (value_len, enc_buf);
1312 memcpy (gsk_table_buffer_append (&builder->uncompressed, encoded_len),
1313 enc_buf, encoded_len);
1314
1315 /* compress the non-value portion */
1316 do_compress (builder, builder->uncompressed.len, builder->uncompressed.data);
1317
1318 /* compress the value portion */
1319 do_compress (builder, value_len, value_data);
1320
1321 if (builder->compressed.len >= ffactory->bytes_per_chunk)
1322 {
1323 if (!flush_to_files (builder, error))
1324 return GSK_TABLE_FEED_ENTRY_ERROR;
1325
1326 reinit_compressor (builder, ffactory->compression_level, TRUE);
1327 builder->has_last_key = FALSE;
1328 }
1329 else
1330 {
1331 builder->has_last_key = TRUE;
1332 memcpy (gsk_table_buffer_set_len (&builder->last_key,
1333 key_len), key_data, key_len);
1334 }
1335 return builder->has_last_key ? GSK_TABLE_FEED_ENTRY_WANT_MORE
1336 : GSK_TABLE_FEED_ENTRY_SUCCESS;
1337 }
1338
1339 static gboolean
flat__done_feeding(GskTableFile * file,gboolean * ready_out,GError ** error)1340 flat__done_feeding (GskTableFile *file,
1341 gboolean *ready_out,
1342 GError **error)
1343 {
1344 FlatFile *ffile = (FlatFile *) file;
1345 FlatFactory *ffactory = (FlatFactory *) file->factory;
1346 FlatFileBuilder *builder = ffile->builder;
1347 guint f;
1348 if (builder->has_last_key)
1349 {
1350 if (!flush_to_files (builder, error))
1351 return FALSE;
1352 }
1353
1354 /* unmap and ftruncate all files */
1355 for (f = 0; f < N_FILES; f++)
1356 {
1357 guint64 offset = mmap_writer_offset (&builder->writers[f]);
1358 mmap_writer_clear (&builder->writers[f]);
1359 if (ftruncate (ffile->fds[f], offset) < 0)
1360 {
1361 g_set_error (error, GSK_G_ERROR_DOMAIN,
1362 GSK_ERROR_FILE_TRUNCATE,
1363 "error truncating %s file: %s",
1364 file_extensions[f], g_strerror (errno));
1365 return FALSE;
1366 }
1367 }
1368
1369 /* write the number of records to the front */
1370 {
1371 guint64 n_entries = file->n_entries;
1372 guint64 n_entries_le = GUINT64_TO_LE (n_entries);
1373 int pwrite_rv = pwrite (ffile->fds[FILE_INDEX], &n_entries_le, 8, 0);
1374 if (pwrite_rv < 0)
1375 {
1376 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PWRITE,
1377 "pwrite failed writing n_entries: %s",
1378 g_strerror (errno));
1379 return FALSE;
1380 }
1381 if (pwrite_rv < 8)
1382 {
1383 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PWRITE,
1384 "pwrite partial data write???");
1385 return FALSE;
1386 }
1387 }
1388
1389 /* mmap for reading small files */
1390 for (f = 0; f < N_FILES; f++)
1391 if (!mmap_reader_init (&ffile->readers[f], ffile->fds[f], error))
1392 {
1393 guint tmp_f;
1394 for (tmp_f = 0; tmp_f < f; tmp_f++)
1395 mmap_reader_clear (&ffile->readers[tmp_f]);
1396 return FALSE;
1397 }
1398 ffile->has_readers = TRUE;
1399
1400 /* recycle/free the builder object */
1401 ffile->builder = NULL;
1402 builder_recycle (ffactory, builder);
1403 *ready_out = TRUE;
1404
1405 return TRUE;
1406 }
1407
1408 static gboolean
flat__get_build_state(GskTableFile * file,guint * state_len_out,guint8 ** state_data_out,GError ** error)1409 flat__get_build_state (GskTableFile *file,
1410 guint *state_len_out,
1411 guint8 **state_data_out,
1412 GError **error)
1413 {
1414 guint f;
1415 FlatFile *ffile = (FlatFile *) file;
1416 FlatFileBuilder *builder = ffile->builder;
1417 guint8 *data;
1418 g_assert (builder != NULL);
1419 *state_len_out = 1 + 3 * 8 + 8;
1420 data = *state_data_out = g_malloc (*state_len_out);
1421 data[0] = 0; /* phase 0; reserved to allow multiphase processing in future */
1422 for (f = 0; f < N_FILES; f++)
1423 {
1424 guint64 offset = mmap_writer_offset (builder->writers + f);
1425 guint64 offset_le = GUINT64_TO_LE (offset);
1426 memcpy (data + 8 * f + 1, &offset_le, 8);
1427 }
1428 {
1429 guint64 n_entries = file->n_entries;
1430 guint64 n_entries_le = GUINT64_TO_LE (n_entries);
1431 memcpy (data + 1 + 3*8, &n_entries_le, 8);
1432 }
1433 return TRUE;
1434 }
1435
1436 static gboolean
flat__build_file(GskTableFile * file,gboolean * ready_out,GError ** error)1437 flat__build_file (GskTableFile *file,
1438 gboolean *ready_out,
1439 GError **error)
1440 {
1441 *ready_out = TRUE;
1442 return TRUE;
1443 }
1444
1445 static void
flat__release_build_data(GskTableFile * file)1446 flat__release_build_data(GskTableFile *file)
1447 {
1448 /* nothing to do, since we finish building immediately */
1449 }
1450
1451 /* --- query api --- */
1452 static gboolean
do_pread(FlatFile * ffile,WhichFile f,guint64 offset,guint length,guint8 * ptr_out,GError ** error)1453 do_pread (FlatFile *ffile,
1454 WhichFile f,
1455 guint64 offset,
1456 guint length,
1457 guint8 *ptr_out,
1458 GError **error)
1459 {
1460 if (ffile->builder)
1461 {
1462 return mmap_writer_pread (ffile->builder->writers + f,
1463 offset, length, ptr_out, error);
1464 }
1465 else
1466 {
1467 g_assert (ffile->has_readers);
1468 return mmap_reader_pread (ffile->readers + f,
1469 offset, length, ptr_out, error);
1470 }
1471 }
1472
1473 static gboolean
flat__query_file(GskTableFile * file,GskTableFileQuery * query_inout,GError ** error)1474 flat__query_file (GskTableFile *file,
1475 GskTableFileQuery *query_inout,
1476 GError **error)
1477 {
1478 FlatFile *ffile = (FlatFile *) file;
1479 guint64 n_index_records, first, n;
1480 CacheEntry *cache_entry;
1481 IndexEntry index_entry;
1482 gboolean index_entry_up_to_date = FALSE;
1483 guint8 index_entry_data[SIZEOF_INDEX_ENTRY];
1484 if (ffile->builder != NULL)
1485 n_index_records = (mmap_writer_offset (&ffile->builder->writers[FILE_INDEX]) - INDEX_HEADER_SIZE)
1486 / SIZEOF_INDEX_ENTRY;
1487 else if (ffile->has_readers)
1488 n_index_records = (ffile->readers[FILE_INDEX].file_size - INDEX_HEADER_SIZE)
1489 / SIZEOF_INDEX_ENTRY;
1490 else
1491 {
1492 g_set_error (error, GSK_G_ERROR_DOMAIN,
1493 GSK_ERROR_INVALID_STATE,
1494 "flat file in error state");
1495 return FALSE;
1496 }
1497
1498 if (n_index_records == 0)
1499 {
1500 query_inout->found = FALSE;
1501 return TRUE;
1502 }
1503 first = 0;
1504 n = n_index_records;
1505 GskTableBuffer firstkey;
1506 gsk_table_buffer_init (&firstkey);
1507 while (n > 1)
1508 {
1509 guint64 mid = first + n / 2;
1510 gint compare_rv;
1511
1512 /* read index entry */
1513 if (!do_pread (ffile, FILE_INDEX, mid * SIZEOF_INDEX_ENTRY + INDEX_HEADER_SIZE, SIZEOF_INDEX_ENTRY, index_entry_data, error))
1514 {
1515 gsk_table_buffer_clear (&firstkey);
1516 return FALSE;
1517 }
1518 index_entry_deserialize (index_entry_data, &index_entry);
1519
1520 /* read key */
1521 gsk_table_buffer_set_len (&firstkey, index_entry.firstkeys_len);
1522 if (!do_pread (ffile, FILE_FIRSTKEYS, index_entry.firstkeys_offset, index_entry.firstkeys_len,
1523 firstkey.data, error))
1524 {
1525 gsk_table_buffer_clear (&firstkey);
1526 return FALSE;
1527 }
1528
1529 /* invoke comparator */
1530 compare_rv = query_inout->compare (index_entry.firstkeys_len,
1531 firstkey.data,
1532 query_inout->compare_data);
1533
1534 if (compare_rv < 0)
1535 {
1536 n = mid - first;
1537 index_entry_up_to_date = FALSE;
1538 }
1539 else if (compare_rv > 0)
1540 {
1541 n = first + n - mid;
1542 first = mid;
1543 index_entry_up_to_date = TRUE;
1544 }
1545 else
1546 {
1547 CacheEntryRecord *record;
1548 cache_entry = cache_entry_force (ffile, mid,
1549 &index_entry, firstkey.data,
1550 error);
1551 if (cache_entry == NULL)
1552 {
1553 gsk_table_buffer_clear (&firstkey);
1554 return FALSE;
1555 }
1556 record = cache_entry->records + 0;
1557 memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1558 record->value_data, record->value_len);
1559 query_inout->found = TRUE;
1560 gsk_table_buffer_clear (&firstkey);
1561 return TRUE;
1562 }
1563 }
1564
1565 if (!index_entry_up_to_date)
1566 {
1567 /* read index entry */
1568 if (!do_pread (ffile, FILE_INDEX, first * SIZEOF_INDEX_ENTRY + INDEX_HEADER_SIZE, SIZEOF_INDEX_ENTRY, index_entry_data, error))
1569 return FALSE;
1570 index_entry_deserialize (index_entry_data, &index_entry);
1571
1572 /* read firstkey */
1573 gsk_table_buffer_set_len (&firstkey, index_entry.firstkeys_len);
1574 if (!do_pread (ffile, FILE_FIRSTKEYS, index_entry.firstkeys_offset, index_entry.firstkeys_len,
1575 firstkey.data, error))
1576 {
1577 gsk_table_buffer_clear (&firstkey);
1578 return FALSE;
1579 }
1580 }
1581
1582 /* uncompress block, cache */
1583 cache_entry = cache_entry_force (ffile, first,
1584 &index_entry, firstkey.data,
1585 error);
1586 if (cache_entry == NULL)
1587 {
1588 gsk_table_buffer_clear (&firstkey);
1589 return FALSE;
1590 }
1591
1592 /* bsearch the uncompressed block */
1593 {
1594 guint first = 0;
1595 guint n = cache_entry->n_entries;
1596 while (n > 1)
1597 {
1598 guint mid = first + n / 2;
1599 CacheEntryRecord *record = cache_entry->records + mid;
1600 int compare_rv = query_inout->compare (record->key_len, record->key_data,
1601 query_inout->compare_data);
1602 if (compare_rv < 0)
1603 {
1604 n = mid - first;
1605 }
1606 else if (compare_rv > 0)
1607 {
1608 n = first + n - mid;
1609 first = mid;
1610 }
1611 else
1612 {
1613 memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1614 record->value_data, record->value_len);
1615 query_inout->found = TRUE;
1616 return TRUE;
1617 }
1618 }
1619 if (n == 1 && first < cache_entry->n_entries)
1620 {
1621 CacheEntryRecord *record = cache_entry->records + first;
1622 int compare_rv = query_inout->compare (record->key_len, record->key_data,
1623 query_inout->compare_data);
1624 if (compare_rv == 0)
1625 {
1626 memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1627 record->value_data, record->value_len);
1628 query_inout->found = TRUE;
1629 return TRUE;
1630 }
1631 }
1632 }
1633 query_inout->found = FALSE;
1634 return TRUE;
1635 }
1636
1637 /* --- reader api --- */
1638 static void
read_and_uncompress_chunk(FlatFileReader * freader)1639 read_and_uncompress_chunk (FlatFileReader *freader)
1640 {
1641 /* read index fp record, or set eof flag or error */
1642 guint8 index_data[SIZEOF_INDEX_ENTRY];
1643 IndexEntry index_entry;
1644 guint8 *firstkey;
1645 guint f;
1646
1647 /* set up state before reading */
1648 for (f = 0; f < 3; f++)
1649 freader->chunk_file_offsets[f] = FTELLO (freader->fps[f]);
1650
1651 if (FREAD (index_data, SIZEOF_INDEX_ENTRY, 1, freader->fps[FILE_INDEX]) != 1)
1652 {
1653 freader->base_reader.eof = 1;
1654 return;
1655 }
1656 index_entry_deserialize (index_data, &index_entry);
1657
1658 #if DEBUG_READ_CHUNK
1659 g_message ("chunk offsets=%llu,%llu,%llu; ie.compressed_len=%u",
1660 freader->chunk_file_offsets[0],
1661 freader->chunk_file_offsets[1],
1662 freader->chunk_file_offsets[2],
1663 index_entry.compressed_data_len);
1664 #endif
1665
1666 /* allocate buffers in one piece */
1667 firstkey = g_malloc (index_entry.firstkeys_len + index_entry.compressed_data_len);
1668 guint8 *compressed_data;
1669 compressed_data = firstkey + index_entry.firstkeys_len;
1670
1671 /* read firstkey */
1672 if (index_entry.firstkeys_len != 0
1673 && FREAD (firstkey, index_entry.firstkeys_len, 1, freader->fps[FILE_FIRSTKEYS]) != 1)
1674 {
1675 freader->base_reader.error = g_error_new (GSK_G_ERROR_DOMAIN,
1676 GSK_ERROR_PREMATURE_EOF,
1677 "premature eof in firstkey file [firstkey len=%u]", index_entry.firstkeys_len);
1678 g_free (firstkey);
1679 return;
1680 }
1681
1682 /* read data */
1683 if (FREAD (compressed_data, index_entry.compressed_data_len, 1, freader->fps[FILE_DATA]) != 1)
1684 {
1685 freader->base_reader.error = g_error_new (GSK_G_ERROR_DOMAIN,
1686 GSK_ERROR_PREMATURE_EOF,
1687 "premature eof in compressed-data file [compressed_data_len=%u]",
1688 index_entry.compressed_data_len);
1689 g_free (firstkey);
1690 return;
1691 }
1692
1693 /* do the actual un-gzipping and scanning */
1694 freader->cache_entry = cache_entry_deserialize (freader->index_entry_index++,
1695 index_entry.firstkeys_len, firstkey,
1696 index_entry.compressed_data_len, compressed_data,
1697 &freader->base_reader.error);
1698 g_free (firstkey);
1699 }
1700
1701 static inline void
init_base_reader_record(FlatFileReader * freader)1702 init_base_reader_record (FlatFileReader *freader)
1703 {
1704 CacheEntryRecord *record = freader->cache_entry->records + freader->record_index;
1705 freader->base_reader.key_len = record->key_len;
1706 freader->base_reader.key_data = record->key_data;
1707 freader->base_reader.value_len = record->value_len;
1708 freader->base_reader.value_data = record->value_data;
1709 }
1710
1711 static void
reader_advance(GskTableReader * reader)1712 reader_advance (GskTableReader *reader)
1713 {
1714 FlatFileReader *freader = (FlatFileReader *) reader;
1715 if (freader->base_reader.eof || freader->base_reader.error)
1716 return;
1717 if (++freader->record_index == freader->cache_entry->n_entries)
1718 {
1719 g_free (freader->cache_entry);
1720 freader->cache_entry = NULL;
1721 read_and_uncompress_chunk (freader);
1722 if (reader->eof || reader->error != NULL)
1723 return;
1724 freader->record_index = 0;
1725 }
1726 init_base_reader_record (freader);
1727 }
1728 static void
reader_destroy(GskTableReader * reader)1729 reader_destroy (GskTableReader *reader)
1730 {
1731 guint f;
1732 FlatFileReader *freader = (FlatFileReader *) reader;
1733 if (freader->cache_entry)
1734 g_free (freader->cache_entry);
1735 for (f = 0; f < N_FILES; f++)
1736 if (freader->fps[f] != NULL)
1737 fclose (freader->fps[f]);
1738 g_slice_free (FlatFileReader, freader);
1739 }
1740
1741 static FlatFileReader *
reader_open_fps(GskTableFile * file,const char * dir,GError ** error)1742 reader_open_fps (GskTableFile *file,
1743 const char *dir,
1744 GError **error)
1745 {
1746 FlatFileReader *freader = g_slice_new (FlatFileReader);
1747 guint f;
1748 freader->base_reader.eof = FALSE;
1749 freader->base_reader.error = NULL;
1750 for (f = 0; f < N_FILES; f++)
1751 {
1752 char fname_buf[GSK_TABLE_MAX_PATH];
1753 gsk_table_mk_fname (fname_buf, dir, file->id, file_extensions[f]);
1754 freader->fps[f] = fopen (fname_buf, "rb");
1755 if (freader->fps[f] == NULL)
1756 {
1757 g_set_error (error, GSK_G_ERROR_DOMAIN,
1758 GSK_ERROR_FILE_OPEN,
1759 "error opening %s for reading: %s",
1760 fname_buf, g_strerror (errno));
1761 g_slice_free (FlatFileReader, freader);
1762 return NULL;
1763 }
1764 }
1765 freader->base_reader.advance = reader_advance;
1766 freader->base_reader.destroy = reader_destroy;
1767 return freader;
1768 }
1769 static FlatFileReader *
reader_open_eof(void)1770 reader_open_eof (void)
1771 {
1772 FlatFileReader *freader = g_slice_new (FlatFileReader);
1773 guint f;
1774 freader->base_reader.eof = TRUE;
1775 freader->base_reader.error = NULL;
1776 for (f = 0; f < N_FILES; f++)
1777 freader->fps[f] = NULL;
1778 freader->base_reader.advance = reader_advance;
1779 freader->base_reader.destroy = reader_destroy;
1780 return freader;
1781 }
1782
1783 static GskTableReader *
flat__create_reader(GskTableFile * file,const char * dir,GError ** error)1784 flat__create_reader (GskTableFile *file,
1785 const char *dir,
1786 GError **error)
1787 {
1788 FlatFileReader *freader = reader_open_fps (file, dir, error);
1789 guint64 ief_header;
1790 if (freader == NULL)
1791 return NULL;
1792
1793 if (FREAD (&ief_header, 8, 1, freader->fps[FILE_INDEX]) != 1)
1794 {
1795 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_READ,
1796 "premature eof reading index file header");
1797 return NULL;
1798 }
1799
1800 read_and_uncompress_chunk (freader);
1801 if (!freader->base_reader.eof && freader->base_reader.error == NULL)
1802 {
1803 freader->record_index = 0;
1804 init_base_reader_record (freader);
1805 }
1806
1807 return &freader->base_reader;
1808 }
1809
1810 /* you must always be able to get reader state */
1811 static gboolean
flat__get_reader_state(GskTableFile * file,GskTableReader * reader,guint * state_len_out,guint8 ** state_data_out,GError ** error)1812 flat__get_reader_state (GskTableFile *file,
1813 GskTableReader *reader,
1814 guint *state_len_out,
1815 guint8 **state_data_out,
1816 GError **error)
1817 {
1818 FlatFileReader *freader = (FlatFileReader *) reader;
1819 guint8 *data;
1820 guint f;
1821 /* state is:
1822 1 byte state -- 0=in progress; 1=eof
1823 if state==0:
1824 8 bytes index file offset
1825 8 bytes firstkeys file offset
1826 8 bytes data offset
1827 4 bytes index into the compressed byte to return next
1828 if state==1:
1829 no other data
1830 */
1831 g_assert (reader->error == NULL);
1832 if (reader->eof)
1833 {
1834 *state_len_out = 1;
1835 *state_data_out = g_malloc (1);
1836 (*state_data_out)[0] = 1;
1837 return TRUE;
1838 }
1839 *state_len_out = 1 + 8 + 8 + 8 + 4;
1840 data = *state_data_out = g_malloc (*state_len_out);
1841 data[0] = 0;
1842 for (f = 0; f < N_FILES; f++)
1843 {
1844 guint64 tmp64 = freader->chunk_file_offsets[f];
1845 guint64 tmp_le = GUINT64_TO_LE (tmp64);
1846 memcpy (data + 1 + 8 * f, &tmp_le, 8);
1847 }
1848 {
1849 guint32 tmp32 = freader->record_index;
1850 guint32 tmp32_le = GUINT32_TO_LE (tmp32);
1851 memcpy (data + 1 + 8*3, &tmp32_le, 4);
1852 }
1853 return TRUE;
1854 }
1855
1856 static GskTableReader *
flat__recreate_reader(GskTableFile * file,const char * dir,guint state_len,const guint8 * state_data,GError ** error)1857 flat__recreate_reader (GskTableFile *file,
1858 const char *dir,
1859 guint state_len,
1860 const guint8 *state_data,
1861 GError **error)
1862 {
1863 FlatFileReader *freader;
1864 guint f;
1865 if (freader == NULL)
1866 return NULL;
1867 switch (state_data[0])
1868 {
1869 case 0: /* in progress */
1870 freader = reader_open_fps (file, dir, error);
1871 if (freader == NULL)
1872 return NULL;
1873
1874 /* seek */
1875 for (f = 0; f < 3; f++)
1876 {
1877 guint64 tmp_le, tmp;
1878 memcpy (&tmp_le, state_data + 1 + 8*f, 8);
1879 tmp = GUINT64_FROM_LE (tmp_le);
1880 if (FSEEKO (freader->fps[f], tmp, SEEK_SET) < 0)
1881 {
1882 guint tmp_f;
1883 g_set_error (error, GSK_G_ERROR_DOMAIN,
1884 GSK_ERROR_FILE_SEEK,
1885 "error seeking %s file: %s",
1886 file_extensions[f], g_strerror (errno));
1887 for (tmp_f = 0; tmp_f < N_FILES; tmp_f++)
1888 fclose (freader->fps[tmp_f]);
1889 g_slice_free (FlatFileReader, freader);
1890 return NULL;
1891 }
1892 }
1893
1894 read_and_uncompress_chunk (freader);
1895
1896 if (freader->base_reader.eof
1897 || freader->base_reader.error != NULL)
1898 {
1899 if (freader->base_reader.error)
1900 g_propagate_error (error, freader->base_reader.error);
1901 else
1902 g_set_error (error, GSK_G_ERROR_DOMAIN,
1903 GSK_ERROR_PREMATURE_EOF,
1904 "unexpected eof restoring file reader");
1905 for (f = 0; f < N_FILES; f++)
1906 fclose (freader->fps[f]);
1907 g_slice_free (FlatFileReader, freader);
1908 return NULL;
1909 }
1910 {
1911 guint32 tmp_le;
1912 memcpy (&tmp_le, state_data + 1 + 3*8, 4);
1913 freader->record_index = GUINT32_FROM_LE (tmp_le);
1914 if (freader->record_index >= freader->cache_entry->n_entries)
1915 {
1916 g_set_error (error, GSK_G_ERROR_DOMAIN,
1917 GSK_ERROR_PREMATURE_EOF,
1918 "record index out-of-bounds in state-data");
1919 for (f = 0; f < N_FILES; f++)
1920 fclose (freader->fps[f]);
1921 g_slice_free (FlatFileReader, freader);
1922 return NULL;
1923 }
1924 }
1925 init_base_reader_record (freader);
1926
1927 break;
1928 case 1: /* eof */
1929 g_assert (state_len == 1);
1930 freader = reader_open_eof ();
1931 break;
1932 default:
1933 g_set_error (error, GSK_G_ERROR_DOMAIN,
1934 GSK_ERROR_PARSE,
1935 "unknown state for reader");
1936 return NULL;
1937 }
1938 return &freader->base_reader;
1939 }
1940
1941
1942 /* destroying files and factories */
1943 static gboolean
flat__destroy_file(GskTableFile * file,const char * dir,gboolean erase,GError ** error)1944 flat__destroy_file (GskTableFile *file,
1945 const char *dir,
1946 gboolean erase,
1947 GError **error)
1948 {
1949 FlatFactory *ffactory = (FlatFactory *) file->factory;
1950 FlatFile *ffile = (FlatFile *) file;
1951 FlatFileBuilder *builder = ffile->builder;
1952 guint f;
1953 if (builder != NULL)
1954 {
1955 for (f = 0; f < N_FILES; f++)
1956 mmap_writer_clear (builder->writers + f);
1957 builder_recycle (ffactory, builder);
1958 }
1959 else if (ffile->has_readers)
1960 {
1961 for (f = 0; f < N_FILES; f++)
1962 mmap_reader_clear (ffile->readers + f);
1963 }
1964 for (f = 0; f < N_FILES; f++)
1965 close (ffile->fds[f]);
1966 if (erase)
1967 {
1968 for (f = 0; f < N_FILES; f++)
1969 {
1970 char fname_buf[GSK_TABLE_MAX_PATH];
1971 gsk_table_mk_fname (fname_buf, dir, file->id, file_extensions[f]);
1972 unlink (fname_buf);
1973 }
1974 }
1975 g_slice_free (FlatFile, ffile);
1976 return TRUE;
1977 }
1978
1979 static void
flat__destroy_factory(GskTableFileFactory * factory)1980 flat__destroy_factory (GskTableFileFactory *factory)
1981 {
1982 /* static factory */
1983 }
1984
1985
1986 /* for now, return a static factory object */
gsk_table_file_factory_new_flat(void)1987 GskTableFileFactory *gsk_table_file_factory_new_flat (void)
1988 {
1989 static FlatFactory the_factory =
1990 {
1991 {
1992 flat__create_file,
1993 flat__open_building_file,
1994 flat__open_file,
1995 flat__feed_entry,
1996 flat__done_feeding,
1997 flat__get_build_state,
1998 flat__build_file,
1999 flat__release_build_data,
2000 flat__query_file,
2001 flat__create_reader,
2002 flat__get_reader_state,
2003 flat__recreate_reader,
2004 flat__destroy_file,
2005 flat__destroy_factory
2006 },
2007 16384,
2008 3, /* zlib compression level */
2009 0, /* n recycled builders */
2010 8, /* max recycled builders */
2011 NULL, /* recycled builder list */
2012 24 /* max cache entries */
2013 };
2014
2015 return &the_factory.base_factory;
2016 }
2017