1 /* for pread() */
2 
3 #define _XOPEN_SOURCE 500
4 #define _GNU_SOURCE             /* for unlocked_stdio */
5 #include "config.h"
6 
7 /* TODO: handle EINTR everywhere. */
8 
9 #include <string.h>
10 #include <stdio.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <sys/mman.h>
14 #include <zlib.h>
15 #include <errno.h>
16 
17 #include "gskutils.h"
18 #include "gskerror.h"
19 #include "gsklistmacros.h"
20 #include "gsktable.h"
21 #include "gsktable-file.h"
22 #include "gsktable-helpers.h"
23 
/* Aliases for the stdio positioning/reading calls, so that they can be
   replaced in a single place.
   TODO: track the offsets ourselves instead of calling ftello(). */
#define FTELLO          ftello
#define FSEEKO          fseeko
#define FREAD           fread
27 
/* --- compile-time debugging switches (0 = disabled, 1 = enabled) --- */

/* trace each invocation of cache_entry_force() */
#define DEBUG_CACHE_ENTRY_FORCE                 0

/* hex-dump the compressed data (only consulted when
   DEBUG_ENTRY_SERIALIZATION is also enabled) */
#define DEBUG_DUMP_COMPRESSED_DATA              0

/* trace chunk sizes and offsets while reading */
#define DEBUG_READ_CHUNK                        0

/* trace serialization and deserialization of cache entries */
#define DEBUG_ENTRY_SERIALIZATION               0

/* trace flushing of the current chunk of data */
#define DEBUG_FLUSH                             0

/* trace serialization of the fixed-length index-entry objects */
#define DEBUG_INDEX_ENTRIES                     0
45 
/* Forward declarations of the structures private to this file. */
typedef struct _FlatFactory     FlatFactory;
typedef struct _MmapReader      MmapReader;
typedef struct _MmapWriter      MmapWriter;
typedef struct _FlatFileBuilder FlatFileBuilder;
typedef struct _FlatFile        FlatFile;
typedef struct _FlatFileReader  FlatFileReader;
52 
/* Names one of the on-disk files backing a flat table file.
   The values are used as indices into the per-file arrays of
   N_FILES elements (file descriptors, readers, writers, extensions). */
typedef enum
{
  FILE_INDEX     = 0,
  FILE_FIRSTKEYS = 1,
  FILE_DATA      = 2
} WhichFile;

/* number of WhichFile values */
#define N_FILES 3
60 
61 /* MUST be a power-of-two */
62 #define MMAP_WRITER_SIZE                (512*1024)
63 
64 #define MAX_MMAP                (1024*1024)
65 
66 static const char *file_extensions[N_FILES] = { "index", "firstkeys", "data" };
67 
68 struct _FlatFactory
69 {
70   GskTableFileFactory base_factory;
71   guint bytes_per_chunk;
72   guint compression_level;
73   guint n_recycled_builders;
74   guint max_recycled_builders;
75   FlatFileBuilder *recycled_builders;
76   guint max_cache_entries;
77 };
78 
79 
80 struct _MmapReader
81 {
82   gint fd;
83   guint64 file_size;
84   guint8 *mmapped;
85   GskTableBuffer tmp_buf;               /* only if !mmapped */
86 };
87 
88 struct _MmapWriter
89 {
90   gint fd;
91   guint64 file_size;
92   guint64 mmap_offset;
93   guint8 *mmapped;
94   guint cur_offset;             /* in mmapped */
95   GskTableBuffer tmp_buf;
96 };
97 
98 
99 struct _FlatFileBuilder
100 {
101   GskTableBuffer input;
102 
103   gboolean has_last_key;
104   GskTableBuffer first_key;
105   GskTableBuffer last_key;
106 
107   GskTableBuffer uncompressed;
108   GskTableBuffer compressed;
109 
110   guint n_compressed_entries;
111   guint uncompressed_data_len;
112 
113   MmapWriter writers[N_FILES];
114 
115   z_stream compressor;
116   GskMemPool compressor_allocator;
117   guint8 *compressor_allocator_scratchpad;
118   gsize compressor_allocator_scratchpad_len;
119 
120   FlatFileBuilder *next_recycled_builder;
121 };
122 
123 typedef struct _CacheEntry CacheEntry;
124 typedef struct _CacheEntryRecord CacheEntryRecord;
125 struct _CacheEntryRecord
126 {
127   guint key_len;
128   const guint8 *key_data;
129   guint value_len;
130   const guint8 *value_data;
131 };
132 struct _CacheEntry
133 {
134   guint n_entries;
135   guint64 index;
136   CacheEntry *prev_lru, *next_lru;
137   CacheEntry *bin_next;
138   CacheEntryRecord records[1];          /* must be last! */
139 };
140 
141 struct _FlatFile
142 {
143   GskTableFile base_file;
144   gint         fds[N_FILES];
145   FlatFileBuilder *builder;
146 
147   gboolean has_readers;         /* builder and has_readers are exclusive: they
148                                    cannot be set at the same time */
149   MmapReader readers[N_FILES];
150 
151   guint cache_entries_len;
152   CacheEntry **cache_entries;
153   guint cache_entries_count;
154   guint max_cache_entries;
155   CacheEntry *most_recently_used, *least_recently_used;
156 };
157 
158 struct _FlatFileReader
159 {
160   GskTableReader base_reader;
161   FILE *fps[N_FILES];
162   guint64 chunk_file_offsets[N_FILES];
163   CacheEntry *cache_entry;
164   guint record_index;
165   guint64 index_entry_index;
166 };
167 
/* Expands to the argument pack the GSK_LIST_* macros take to operate on
   the LRU list of FILE: the node type, the two end pointers and the
   names of the two link members. */
#define GET_LRU_LIST(file)                                      \
  CacheEntry *,                                                 \
  (file)->most_recently_used, (file)->least_recently_used,      \
  prev_lru, next_lru
171 
172 typedef struct _IndexEntry IndexEntry;
173 struct _IndexEntry
174 {
175   guint64 firstkeys_offset;
176   guint32 firstkeys_len;
177   guint64 compressed_data_offset;
178   guint32 compressed_data_len;
179 };
180 
181 static gboolean
182 do_pread (FlatFile *ffile,
183           WhichFile f,
184           guint64   offset,
185           guint     length,
186           guint8    *ptr_out,
187           GError   **error);
188 
189 static guint
uint32_vli_encode(guint32 to_encode,guint8 * buf)190 uint32_vli_encode (guint32 to_encode,
191                    guint8 *buf)         /* min length 5 */
192 {
193   if (to_encode < 0x80)
194     {
195       buf[0] = to_encode;
196       return 1;
197     }
198   else if (to_encode < (1<<14))
199     {
200       buf[0] = 0x80 | (to_encode >> 7);
201       buf[1] = to_encode & 0x7f;
202       return 2;
203     }
204   else if (to_encode < (1<<21))
205     {
206       buf[0] = 0x80 | (to_encode >> 14);
207       buf[1] = 0x80 | (to_encode >> 7);
208       buf[2] = to_encode & 0x7f;
209       return 3;
210     }
211   else if (to_encode < (1<<28))
212     {
213       buf[0] = 0x80 | (to_encode >> 21);
214       buf[1] = 0x80 | (to_encode >> 14);
215       buf[2] = 0x80 | (to_encode >> 7);
216       buf[3] = to_encode & 0x7f;
217       return 4;
218     }
219   else
220     {
221       buf[0] = 0x80 | (to_encode >> 28);
222       buf[1] = 0x80 | (to_encode >> 21);
223       buf[2] = 0x80 | (to_encode >> 14);
224       buf[3] = 0x80 | (to_encode >> 7);
225       buf[4] = to_encode & 0x7f;
226       return 5;
227     }
228 }
229 static guint
uint32_vli_decode(const guint8 * input,guint32 * decoded)230 uint32_vli_decode (const guint8 *input,
231                    guint32      *decoded)
232 {
233   guint32 val = input[0] & 0x7f;
234   if (input[0] & 0x80)
235     {
236       guint used = 1;
237       do
238         {
239           val <<= 7;
240           val |= (input[used] & 0x7f);
241         }
242       while ((input[used++] & 0x80) != 0);
243       *decoded = val;
244       return used;
245     }
246   else
247     {
248       *decoded = val;
249       return 1;
250     }
251 }
252 
253 typedef struct _CacheEntryTmpRecord CacheEntryTmpRecord;
254 struct _CacheEntryTmpRecord
255 {
256   guint prefix_len;
257   guint key_len;
258   guint value_len;
259   const guint8 *keydata;          /* key without prefix */
260   const guint8 *value;
261 };
262 static CacheEntry *
cache_entry_deserialize(guint64 index,guint firstkey_len,const guint8 * firstkey_data,guint compressed_data_len,const guint8 * compressed_data,GError ** error)263 cache_entry_deserialize (guint64       index,
264                          guint         firstkey_len,
265                          const guint8 *firstkey_data,
266                          guint         compressed_data_len,
267                          const guint8 *compressed_data,
268                          GError      **error)
269 {
270   guint used, tmp;
271   guint n_compressed_entries, uncompressed_data_len;
272   CacheEntryTmpRecord *records, *to_free = NULL;
273   guint8 *uncompressed_data, *to_free2 = NULL;
274   guint i;
275   int zrv;
276   guint8 *uc_at;
277   guint data_size;
278   CacheEntry *rv;
279   const guint8 *last_key;
280   guint8 *heap_at;
281   used = uint32_vli_decode (compressed_data, &n_compressed_entries);
282   used += uint32_vli_decode (compressed_data + used, &uncompressed_data_len);
283 
284 #if DEBUG_ENTRY_SERIALIZATION
285   g_message ("deserialize %llu: n_compressed_entry=%u, uncompressed_data_len=%u, compressed_header_len=%u, actual zlib compressed_len=%u", index, n_compressed_entries, uncompressed_data_len, used, compressed_data_len - used);
286 #if DEBUG_DUMP_COMPRESSED_DATA
287   {
288     char *hex = gsk_escape_memory_hex (compressed_data + used, compressed_data_len - used);
289     g_message ("  compressed_data=%s", hex);
290     g_free (hex);
291   }
292 #endif
293 #endif
294 
295   /* uncompress */
296   if (uncompressed_data_len < 32*1024)
297     uncompressed_data = g_alloca (uncompressed_data_len);
298   else
299     uncompressed_data = to_free2 = g_malloc (uncompressed_data_len);
300 
301   z_stream uncompress_buf;
302   memset (&uncompress_buf, 0, sizeof (uncompress_buf));
303   inflateInit (&uncompress_buf);
304   uncompress_buf.avail_in = compressed_data_len - used;
305   uncompress_buf.next_in = (guint8 *) compressed_data + used;
306   uncompress_buf.avail_out = uncompressed_data_len;
307   uncompress_buf.next_out = uncompressed_data;
308   zrv = inflate (&uncompress_buf, Z_SYNC_FLUSH);
309   if (zrv != Z_OK)
310     {
311       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_CORRUPT,
312                    "error uncompressing zlib compressed data (zrv=%d)",
313                    zrv);
314       g_free (to_free2);
315       return NULL;
316     }
317 
318   /* parse data */
319   if (n_compressed_entries < 512)
320     records = g_newa (CacheEntryTmpRecord, n_compressed_entries);
321   else
322     records = to_free = g_new (CacheEntryTmpRecord, n_compressed_entries);
323 
324   uc_at = uncompressed_data;
325   data_size = 0;
326 #if DEBUG_ENTRY_SERIALIZATION
327   g_message ("uncompressed_data_len=%u, n_compressed_entries=%u",uncompressed_data_len,n_compressed_entries);
328 #endif
329   for (i = 0; i < n_compressed_entries; i++)
330     {
331       if (i > 0)
332         {
333           uc_at += uint32_vli_decode (uc_at, &tmp);
334           records[i].prefix_len = tmp;
335           uc_at += uint32_vli_decode (uc_at, &tmp);
336           records[i].key_len = records[i].prefix_len + tmp;
337           records[i].keydata = uc_at;
338           uc_at += tmp;              /* skip keydata for now */
339         }
340       else
341         {
342           records[i].key_len = firstkey_len;
343           records[i].keydata = firstkey_data;
344           records[i].prefix_len = 0;
345         }
346       uc_at += uint32_vli_decode (uc_at, &tmp);
347       records[i].value_len = tmp;
348       records[i].value = uc_at;
349       uc_at += tmp;
350 
351       data_size += records[i].key_len + records[i].value_len;
352     }
353   if (uc_at - uncompressed_data != (gssize) uncompressed_data_len)
354     {
355       g_set_error (error, GSK_G_ERROR_DOMAIN,
356                    GSK_ERROR_CORRUPT,
357                    "data corrupt uncompressing block (distance %d)",
358                    (int)(uc_at-uncompressed_data+uncompressed_data_len));
359       g_free (to_free);
360       g_free (to_free2);
361       return NULL;
362     }
363 
364   rv = g_malloc (sizeof (CacheEntry)
365                  + (n_compressed_entries-1) * sizeof (CacheEntryRecord)
366                  + data_size);
367   last_key = NULL;
368   heap_at = (guint8 *) (rv->records + n_compressed_entries);
369 
370   rv->n_entries = n_compressed_entries;
371   rv->index = index;
372 
373   for (i = 0; i < n_compressed_entries; i++)
374     {
375       guint key_len = records[i].key_len, pref_len = records[i].prefix_len;
376       guint val_len = records[i].value_len;
377       rv->records[i].key_len = key_len;
378       rv->records[i].value_len = val_len;
379       rv->records[i].key_data = heap_at;
380       memcpy (heap_at, last_key, pref_len);
381       memcpy (heap_at + pref_len, records[i].keydata, key_len - pref_len);
382       heap_at += key_len;
383       memcpy (heap_at, records[i].value, val_len);
384       rv->records[i].value_data = heap_at;
385       heap_at += val_len;
386       last_key = rv->records[i].key_data;
387     }
388   g_free (to_free);
389   g_free (to_free2);
390   return rv;
391 }
392 
393 static CacheEntry *
cache_entry_force(FlatFile * ffile,guint64 index,IndexEntry * index_entry,guint8 * firstkey_data,GError ** error)394 cache_entry_force (FlatFile  *ffile,
395                    guint64    index,
396                    IndexEntry *index_entry,
397                    guint8     *firstkey_data,
398                    GError    **error)
399 {
400   guint bin;
401   CacheEntry *entry;
402 #if DEBUG_CACHE_ENTRY_FORCE
403   g_message ("cache_entry_force: index=%llu [key offset/length=%llu/%u; data offset/length=%llu/%u]", index,index_entry->firstkeys_offset,index_entry->firstkeys_len, index_entry->compressed_data_offset, index_entry->compressed_data_len);
404 #endif
405   if (ffile->cache_entries_len == 0)
406     {
407       ffile->cache_entries_len = g_spaced_primes_closest (ffile->max_cache_entries);
408       ffile->cache_entries = g_new0 (CacheEntry *, ffile->cache_entries_len);
409     }
410   bin = (guint) index % ffile->cache_entries_len;
411   for (entry = ffile->cache_entries[bin]; entry != NULL; entry = entry->bin_next)
412     if (entry->index == index)
413       {
414         if (entry->prev_lru != NULL)
415           {
416             GSK_LIST_REMOVE (GET_LRU_LIST (ffile), entry);
417             GSK_LIST_PREPEND (GET_LRU_LIST (ffile), entry);
418           }
419         return entry;
420       }
421 
422   /* possibly evict old cache entry */
423   if (ffile->cache_entries_count == ffile->max_cache_entries)
424     {
425       CacheEntry *evicted = ffile->least_recently_used;
426       guint bin = (guint) evicted->index % ffile->cache_entries_len;
427       CacheEntry **pprev;
428       GSK_LIST_REMOVE_LAST (GET_LRU_LIST (ffile));
429 
430       /* remove from hash-table */
431       for (pprev = ffile->cache_entries + bin;
432            *pprev != evicted;
433            pprev = &((*pprev)->bin_next))
434         ;
435       *pprev = evicted->bin_next;
436 
437       ffile->cache_entries_count--;
438       g_free (evicted);
439     }
440 
441   /* create new entry */
442   guint8 *compressed_data;
443   compressed_data = g_malloc (index_entry->compressed_data_len);
444   if (!do_pread (ffile, FILE_DATA,
445                  index_entry->compressed_data_offset,
446                  index_entry->compressed_data_len,
447                  compressed_data, error))
448     {
449       g_free (compressed_data);
450       return NULL;
451     }
452 
453   /* deserialize the cache entry */
454   entry = cache_entry_deserialize (index,
455                                    index_entry->firstkeys_len, firstkey_data,
456                                    index_entry->compressed_data_len,
457                                    compressed_data,
458                                    error);
459   if (entry == NULL)
460     {
461       g_free (compressed_data);
462       return NULL;
463     }
464   entry->bin_next = ffile->cache_entries[bin];
465   ffile->cache_entries[bin] = entry;
466   ffile->cache_entries_count++;
467   g_free (compressed_data);
468   GSK_LIST_PREPEND (GET_LRU_LIST (ffile), entry);
469 
470   return entry;
471 }
472 
473 /* --- mmap reading implementation --- */
474 static gboolean
mmap_reader_init(MmapReader * reader,gint fd,GError ** error)475 mmap_reader_init (MmapReader     *reader,
476                   gint            fd,
477                   GError        **error)
478 {
479   struct stat stat_buf;
480   reader->fd = fd;
481   if (fstat (fd, &stat_buf) < 0)
482     {
483       g_set_error (error, GSK_G_ERROR_DOMAIN,
484                    GSK_ERROR_FILE_STAT,
485                    "error stating fd %d: %s",
486                    fd, g_strerror (errno));
487       return FALSE;
488     }
489   reader->file_size = stat_buf.st_size;
490 
491   if (reader->file_size < MAX_MMAP)
492     {
493       reader->mmapped = mmap (NULL, reader->file_size, PROT_READ, MAP_SHARED, fd, 0);
494       if (reader->mmapped == NULL || reader->mmapped == MAP_FAILED)
495         {
496           reader->mmapped = NULL;
497           g_set_error (error, GSK_G_ERROR_DOMAIN,
498                        GSK_ERROR_FILE_MMAP,
499                        "error mmapping fd %d: %s",
500                        fd, g_strerror (errno));
501           return FALSE;
502         }
503     }
504   else
505     {
506       reader->mmapped = NULL;
507       gsk_table_buffer_init (&reader->tmp_buf);
508     }
509   return TRUE;
510 }
511 
512 static gboolean
mmap_reader_pread(MmapReader * reader,guint64 offset,guint length,guint8 * data_out,GError ** error)513 mmap_reader_pread (MmapReader     *reader,
514                    guint64         offset,
515                    guint           length,
516                    guint8         *data_out,
517                    GError        **error)
518 {
519   g_assert (offset + length <= reader->file_size);
520   if (reader->mmapped)
521     {
522       memcpy (data_out, reader->mmapped + (gsize)offset, length);
523       return TRUE;
524     }
525   else
526     {
527       gssize rv = pread (reader->fd, data_out, length, offset);
528       if (rv < 0)
529         {
530           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
531                        "error calling pread(): %s",
532                        g_strerror (errno));
533           return FALSE;
534         }
535       else if (rv < (gssize) length)
536         {
537           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
538                        "premature end-of-file calling pread() (mmap reader pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u)",
539                        offset, length, (guint) rv);
540           return FALSE;
541         }
542       return TRUE;
543     }
544 }
545 
546 static void
mmap_reader_clear(MmapReader * reader)547 mmap_reader_clear (MmapReader *reader)
548 {
549   if (reader->mmapped)
550     munmap (reader->mmapped, reader->file_size);
551   else
552     gsk_table_buffer_clear (&reader->tmp_buf);
553 }
554 
555 static gboolean
mmap_writer_init_at(MmapWriter * writer,gint fd,guint64 offset,GError ** error)556 mmap_writer_init_at (MmapWriter *writer,
557                      gint        fd,
558                      guint64     offset,
559                      GError    **error)
560 {
561   guint64 mmap_offset = offset & (~(guint64)(MMAP_WRITER_SIZE-1));
562   struct stat stat_buf;
563   guint64 file_size;
564   writer->fd = fd;
565   if (fstat (fd, &stat_buf) < 0)
566     {
567       g_set_error (error, GSK_G_ERROR_DOMAIN,
568                    GSK_ERROR_FILE_STAT,
569                    "error getting size of file-descriptor %d: %s",
570                    fd, g_strerror (errno));
571       return FALSE;
572     }
573   file_size = stat_buf.st_size;
574   if (mmap_offset + MMAP_WRITER_SIZE > file_size)
575     {
576       if (ftruncate (fd, mmap_offset + MMAP_WRITER_SIZE) < 0)
577         {
578           g_set_error (error, GSK_G_ERROR_DOMAIN,
579                        GSK_ERROR_FILE_STAT,
580                        "error expanding mmap writer file size: %s",
581                        g_strerror (errno));
582           return FALSE;
583         }
584       file_size = mmap_offset + MMAP_WRITER_SIZE;
585     }
586   writer->file_size = file_size;
587   writer->mmap_offset = mmap_offset;
588   writer->mmapped = mmap (NULL, MMAP_WRITER_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, mmap_offset);
589   if (writer->mmapped == MAP_FAILED)
590     {
591       writer->mmapped = NULL;
592       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
593                    "error mmapping for writing: %s",
594                    g_strerror (errno));
595       return FALSE;
596     }
597   writer->cur_offset = offset - mmap_offset;
598   gsk_table_buffer_init (&writer->tmp_buf);
599   return TRUE;
600 }
601 
602 static inline guint64
mmap_writer_offset(MmapWriter * writer)603 mmap_writer_offset (MmapWriter *writer)
604 {
605   return writer->mmap_offset + writer->cur_offset;
606 }
607 
608 static gboolean
writer_advance_to_next_page(MmapWriter * writer,GError ** error)609 writer_advance_to_next_page (MmapWriter *writer,
610                              GError    **error)
611 {
612   munmap (writer->mmapped, MMAP_WRITER_SIZE);
613 
614   writer->mmap_offset += MMAP_WRITER_SIZE;
615 
616   if (writer->mmap_offset + MMAP_WRITER_SIZE > writer->file_size)
617     {
618       if (ftruncate (writer->fd, writer->mmap_offset + MMAP_WRITER_SIZE) < 0)
619         {
620           g_set_error (error, GSK_G_ERROR_DOMAIN,
621                        GSK_ERROR_FILE_STAT,
622                        "error expanding mmap writer file size: %s",
623                        g_strerror (errno));
624           return FALSE;
625         }
626       writer->file_size = writer->mmap_offset + MMAP_WRITER_SIZE;
627     }
628   writer->cur_offset = 0;
629   writer->mmapped = mmap (NULL, MMAP_WRITER_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, writer->fd, writer->mmap_offset);
630   if (writer->mmapped == MAP_FAILED)
631     {
632       writer->mmapped = NULL;
633       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
634                    "mmap failed on writer: %s",
635                    g_strerror (errno));
636       return FALSE;
637     }
638 
639   return TRUE;
640 }
641 
642 static gboolean
mmap_writer_write(MmapWriter * writer,guint len,const guint8 * data,GError ** error)643 mmap_writer_write (MmapWriter   *writer,
644                    guint         len,
645                    const guint8 *data,
646                    GError      **error)
647 {
648   if (G_LIKELY (writer->cur_offset + len < MMAP_WRITER_SIZE))
649     {
650       memcpy (writer->mmapped + writer->cur_offset, data, len);
651       writer->cur_offset += len;
652     }
653   else
654     {
655       guint n_written = MMAP_WRITER_SIZE - writer->cur_offset;
656       memcpy (writer->mmapped + writer->cur_offset, data, n_written);
657 
658       /* advance to next page */
659       if (!writer_advance_to_next_page (writer, error))
660         return FALSE;
661 
662       while (G_UNLIKELY (n_written + MMAP_WRITER_SIZE <= len))
663         {
664           /* write a full page */
665           memcpy (writer->mmapped, data + n_written, MMAP_WRITER_SIZE);
666           n_written += MMAP_WRITER_SIZE;
667 
668           /* advance to next page */
669           if (!writer_advance_to_next_page (writer, error))
670             return FALSE;
671         }
672       if (G_LIKELY (n_written < len))
673         {
674           memcpy (writer->mmapped, data + n_written, len - n_written);
675           writer->cur_offset = len - n_written;
676         }
677     }
678   return TRUE;
679 }
680 
681 static gboolean
mmap_writer_pread(MmapWriter * writer,guint64 offset,guint length,guint8 * data_out,GError ** error)682 mmap_writer_pread (MmapWriter   *writer,
683                    guint64       offset,
684                    guint         length,
685                    guint8       *data_out,
686                    GError      **error)
687 {
688   g_assert (offset + length <= writer->mmap_offset + writer->cur_offset);
689   if (offset + length <= writer->mmap_offset)
690     {
691       /* pure pread() */
692       gssize rv = pread (writer->fd, data_out, length, offset);
693       if (rv < 0)
694         {
695           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
696                        "error calling pread(): %s",
697                        g_strerror (errno));
698           return FALSE;
699         }
700       else if (rv < (gssize) length)
701         {
702           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
703                        "premature end-of-file calling pread() (mmap writer pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u; case 0)",
704                        offset, length, (guint) rv);
705           return FALSE;
706         }
707       return TRUE;
708     }
709   else if (offset < writer->mmap_offset)
710     {
711       /* pread() + memcpy() */
712       guint pread_len = writer->mmap_offset - offset;
713       gssize rv = pread (writer->fd, data_out, pread_len, offset);
714       if (rv < 0)
715         {
716           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
717                        "error calling pread(): %s",
718                        g_strerror (errno));
719           return FALSE;
720         }
721       else if (rv < (gssize) pread_len)
722         {
723           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PREMATURE_EOF,
724                        "premature end-of-file calling pread() (mmap writer pread; offset=%"G_GUINT64_FORMAT"; length=%u, got=%u; case 1)",
725                        offset, length, (guint) rv);
726           return FALSE;
727         }
728       memcpy (data_out + pread_len, writer->mmapped, length - pread_len);
729       return TRUE;
730     }
731   else
732     {
733       /* pure memcpy() */
734       guint buf_offset = offset - writer->mmap_offset;
735       memcpy (data_out, writer->mmapped + buf_offset, length);
736       return TRUE;
737     }
738 }
739 
740 static void
mmap_writer_clear(MmapWriter * writer)741 mmap_writer_clear (MmapWriter *writer)
742 {
743   munmap (writer->mmapped, MMAP_WRITER_SIZE);
744   gsk_table_buffer_clear (&writer->tmp_buf);
745 }
746 
747 
748 /* --- index entry serialization --- */
/* Each entry in the index file is SIZEOF_INDEX_ENTRY bytes:
     8 bytes -- initial key offset
     4 bytes -- initial key length
     8 bytes -- data offset
     4 bytes -- data length
   all stored little-endian. */
#define SIZEOF_INDEX_ENTRY 24

/* the number of records, as a LE int64 */
#define INDEX_HEADER_SIZE   8
757 
758 static void
index_entry_serialize(const IndexEntry * index_entry,guint8 * data_out)759 index_entry_serialize (const IndexEntry *index_entry,
760                        guint8           *data_out)
761 {
762   guint32 tmp32, tmp32_le;
763   guint64 tmp64, tmp64_le;
764 
765   tmp64 = index_entry->firstkeys_offset;
766   tmp64_le = GUINT64_TO_LE (tmp64);
767   memcpy (data_out + 0, &tmp64_le, 8);
768   tmp32 = index_entry->firstkeys_len;
769   tmp32_le = GUINT32_TO_LE (tmp32);
770   memcpy (data_out + 8, &tmp32_le, 4);
771 
772   tmp64 = index_entry->compressed_data_offset;
773   tmp64_le = GUINT64_TO_LE (tmp64);
774   memcpy (data_out + 12, &tmp64_le, 8);
775   tmp32 = index_entry->compressed_data_len;
776   tmp32_le = GUINT32_TO_LE (tmp32);
777   memcpy (data_out + 20, &tmp32_le, 4);
778 }
779 
780 static void
index_entry_deserialize(const guint8 * data_in,IndexEntry * index_entry_out)781 index_entry_deserialize (const guint8     *data_in,
782                          IndexEntry       *index_entry_out)
783 {
784   guint32 tmp32_le;
785   guint64 tmp64_le;
786 
787   memcpy (&tmp64_le, data_in + 0, 8);
788   index_entry_out->firstkeys_offset = GUINT64_FROM_LE (tmp64_le);
789   memcpy (&tmp32_le, data_in + 8, 4);
790   index_entry_out->firstkeys_len = GUINT32_FROM_LE (tmp32_le);
791   memcpy (&tmp64_le, data_in + 12, 8);
792   index_entry_out->compressed_data_offset = GUINT64_FROM_LE (tmp64_le);
793   memcpy (&tmp32_le, data_in + 20, 4);
794   index_entry_out->compressed_data_len = GUINT32_FROM_LE (tmp32_le);
795 }
796 
797 
my_mem_pool_alloc(voidpf opaque,uInt items,uInt size)798 static voidpf my_mem_pool_alloc (voidpf opaque, uInt items, uInt size)
799 {
800   FlatFileBuilder *builder = opaque;
801   /* TODO: hack: use alloc0 to avoid uninitialized warnings in valgrind */
802   return gsk_mem_pool_alloc0 (&builder->compressor_allocator, items*size);
803 }
/* zlib deallocation hook: deliberately does nothing.  Individual blocks
   are never returned; the memory is reclaimed when the builder's pool
   is destructed or re-constructed. */
static void my_mem_pool_free (voidpf opaque, voidpf address)
{
  (void) opaque;
  (void) address;
}
807 
808 static inline void
reinit_compressor(FlatFileBuilder * builder,guint compression_level,gboolean preowned_mempool)809 reinit_compressor (FlatFileBuilder *builder,
810                    guint            compression_level,
811                    gboolean         preowned_mempool)
812 {
813   if (preowned_mempool)
814     {
815       if (builder->compressor_allocator.all_chunk_list != NULL)
816         {
817           gsk_mem_pool_destruct (&builder->compressor_allocator);
818           builder->compressor_allocator_scratchpad_len *= 2;
819           builder->compressor_allocator_scratchpad = g_realloc (builder->compressor_allocator_scratchpad,
820                                                                 builder->compressor_allocator_scratchpad_len);
821         }
822     }
823   gsk_mem_pool_construct_with_scratch_buf (&builder->compressor_allocator,
824                                            builder->compressor_allocator_scratchpad,
825                                            builder->compressor_allocator_scratchpad_len);
826   memset (&builder->compressor, 0, sizeof (z_stream));
827   builder->compressor.zalloc = my_mem_pool_alloc;
828   builder->compressor.zfree = my_mem_pool_free;
829   builder->compressor.opaque = builder;
830   deflateInit (&builder->compressor, compression_level);
831   builder->n_compressed_entries = 0;
832   builder->uncompressed_data_len = 0;
833   builder->has_last_key = FALSE;
834   gsk_table_buffer_set_len (&builder->compressed, 0);
835 }
836 
837 static FlatFileBuilder *
flat_file_builder_new(FlatFactory * factory)838 flat_file_builder_new (FlatFactory *factory)
839 {
840   if (factory->recycled_builders)
841     {
842       FlatFileBuilder *builder = factory->recycled_builders;
843       factory->recycled_builders = builder->next_recycled_builder;
844       factory->n_recycled_builders--;
845       g_assert (builder->n_compressed_entries == 0
846              && builder->uncompressed_data_len == 0);
847       return builder;
848     }
849   else
850     {
851       FlatFileBuilder *builder = g_slice_new (FlatFileBuilder);
852       gsk_table_buffer_init (&builder->input);
853       gsk_table_buffer_init (&builder->first_key);
854       gsk_table_buffer_init (&builder->last_key);
855       gsk_table_buffer_init (&builder->compressed);
856       gsk_table_buffer_init (&builder->uncompressed);
857       builder->compressor_allocator_scratchpad_len = 1024;
858       builder->compressor_allocator_scratchpad = g_malloc (builder->compressor_allocator_scratchpad_len);
859       reinit_compressor (builder, factory->compression_level, FALSE);
860       return builder;
861     }
862 }
863 
864 static void
builder_recycle(FlatFactory * ffactory,FlatFileBuilder * builder)865 builder_recycle (FlatFactory *ffactory,
866                  FlatFileBuilder *builder)
867 {
868   if (ffactory->n_recycled_builders == ffactory->max_recycled_builders)
869     {
870       gsk_table_buffer_clear (&builder->input);
871       gsk_table_buffer_clear (&builder->first_key);
872       gsk_table_buffer_clear (&builder->last_key);
873       gsk_table_buffer_clear (&builder->compressed);
874       gsk_table_buffer_clear (&builder->uncompressed);
875       gsk_mem_pool_destruct (&builder->compressor_allocator);
876       g_free (builder->compressor_allocator_scratchpad);
877       g_slice_free (FlatFileBuilder, builder);
878     }
879   else
880     {
881       reinit_compressor (builder, ffactory->compression_level, TRUE);
882       builder->next_recycled_builder = ffactory->recycled_builders;
883       ffactory->recycled_builders = builder;
884       ffactory->n_recycled_builders++;
885     }
886 }
887 
/* How open_3_files() opens the files. */
typedef enum
{
  OPEN_MODE_CREATE,             /* read/write; create and truncate */
  OPEN_MODE_CONTINUE_CREATE,    /* read/write; the files must exist */
  OPEN_MODE_READONLY            /* read-only */
} OpenMode;
894 
895 static gboolean
open_3_files(FlatFile * file,const char * dir,guint64 id,OpenMode open_mode,GError ** error)896 open_3_files (FlatFile                 *file,
897               const char               *dir,
898               guint64                   id,
899               OpenMode                  open_mode,
900               GError                  **error)
901 {
902   char fname_buf[GSK_TABLE_MAX_PATH];
903   guint open_flags;
904   const char *participle;
905   guint f;
906   switch (open_mode)
907     {
908     case OPEN_MODE_CREATE:
909       open_flags = O_RDWR | O_CREAT | O_TRUNC;
910       participle = "creating";
911       break;
912     case OPEN_MODE_CONTINUE_CREATE:
913       open_flags = O_RDWR;
914       participle = "opening for writing";
915       break;
916     case OPEN_MODE_READONLY:
917       open_flags = O_RDONLY;
918       participle = "opening for reading";
919       break;
920     default:
921       g_assert_not_reached ();
922     }
923 
924   for (f = 0; f < N_FILES; f++)
925     {
926       gsk_table_mk_fname (fname_buf, dir, id, file_extensions[f]);
927       file->fds[f] = open (fname_buf, open_flags, 0644);
928       if (file->fds[f] < 0)
929         {
930           guint tmp_f;
931           g_set_error (error, GSK_G_ERROR_DOMAIN,
932                        GSK_ERROR_FILE_CREATE,
933                        "error %s %s: %s",
934                        participle, fname_buf, g_strerror (errno));
935           for (tmp_f = 0; tmp_f < f; tmp_f++)
936             close (file->fds[tmp_f]);
937           return FALSE;
938         }
939     }
940   return TRUE;
941 }
942 
943 static GskTableFile *
flat__create_file(GskTableFileFactory * factory,const char * dir,guint64 id,const GskTableFileHints * hints,GError ** error)944 flat__create_file      (GskTableFileFactory      *factory,
945                         const char               *dir,
946                         guint64                   id,
947                         const GskTableFileHints  *hints,
948                         GError                  **error)
949 {
950   FlatFactory *ffactory = (FlatFactory *) factory;
951   FlatFile *rv = g_slice_new (FlatFile);
952   guint f;
953   rv->base_file.factory = factory;
954   rv->base_file.id = id;
955   rv->base_file.n_entries = 0;
956 
957   if (!open_3_files (rv, dir, id, OPEN_MODE_CREATE, error))
958     {
959       g_slice_free (FlatFile, rv);
960       return NULL;
961     }
962   rv->builder = flat_file_builder_new (ffactory);
963   for (f = 0; f < N_FILES; f++)
964     {
965       if (!mmap_writer_init_at (&rv->builder->writers[f], rv->fds[f], 0, error))
966         {
967           guint tmp_f;
968           for (tmp_f = 0; tmp_f < f; tmp_f++)
969             mmap_writer_clear (&rv->builder->writers[tmp_f]);
970           for (f = 0; f < N_FILES; f++)
971             close (rv->fds[f]);
972           builder_recycle (ffactory, rv->builder);
973           g_slice_free (FlatFile, rv);
974           return NULL;
975         }
976     }
977 
978   /* write the index file's header */
979   {
980     guint64 zero_le = 0;
981     if (!mmap_writer_write (&rv->builder->writers[FILE_INDEX], 8,
982                             (guint8 *) &zero_le, error))
983       {
984         for (f = 0; f < N_FILES; f++)
985           {
986             mmap_writer_clear (&rv->builder->writers[f]);
987             close (rv->fds[f]);
988           }
989         builder_recycle (ffactory, rv->builder);
990         g_slice_free (FlatFile, rv);
991         return NULL;
992       }
993   }
994 
995 
996   rv->has_readers = FALSE;
997   rv->cache_entries_len = 0;
998   rv->cache_entries = NULL;
999   rv->cache_entries_count = 0;
1000   rv->max_cache_entries = ffactory->max_cache_entries;
1001   return &rv->base_file;
1002 }
1003 
1004 static GskTableFile *
flat__open_building_file(GskTableFileFactory * factory,const char * dir,guint64 id,guint state_len,const guint8 * state_data,GError ** error)1005 flat__open_building_file(GskTableFileFactory     *factory,
1006                          const char               *dir,
1007                          guint64                   id,
1008                          guint                     state_len,
1009                          const guint8             *state_data,
1010                          GError                  **error)
1011 {
1012   FlatFactory *ffactory = (FlatFactory *) factory;
1013   FlatFile *rv = g_slice_new (FlatFile);
1014   rv->base_file.factory = factory;
1015   rv->base_file.id = id;
1016   if (!open_3_files (rv, dir, id, OPEN_MODE_CONTINUE_CREATE, error))
1017     {
1018       g_slice_free (FlatFile, rv);
1019       return NULL;
1020     }
1021 
1022   rv->builder = flat_file_builder_new (ffactory);
1023 
1024   /* seek according to 'state_data' */
1025   g_assert (state_len == 33);
1026   g_assert (state_data[0] == 0);
1027   {
1028     guint f;
1029     for (f = 0; f < N_FILES; f++)
1030       {
1031         guint64 offset_le;
1032         guint64 offset;
1033         memcpy (&offset_le, state_data + 8 * f + 1, 8);
1034         offset = GUINT64_FROM_LE (offset_le);
1035         if (!mmap_writer_init_at (&rv->builder->writers[f], rv->fds[f], offset, error))
1036           {
1037             guint tmp_f;
1038             for (tmp_f = 0; tmp_f < f; tmp_f++)
1039               mmap_writer_clear (&rv->builder->writers[tmp_f]);
1040             for (tmp_f = 0; tmp_f < N_FILES; tmp_f++)
1041               close (rv->fds[tmp_f]);
1042             builder_recycle (ffactory, rv->builder);
1043             g_slice_free (FlatFile, rv);
1044             return NULL;
1045           }
1046       }
1047     {
1048       guint64 n_entries_le, n_entries;
1049       memcpy (&n_entries_le, state_data + 1 + 3*8, 8);
1050       n_entries = GUINT64_FROM_LE (n_entries_le);
1051       rv->base_file.n_entries = n_entries;
1052     }
1053   }
1054   rv->has_readers = FALSE;
1055 
1056   rv->cache_entries_len = 0;
1057   rv->cache_entries = NULL;
1058   rv->cache_entries_count = 0;
1059   rv->max_cache_entries = ffactory->max_cache_entries;
1060 
1061   return &rv->base_file;
1062 }
1063 
1064 GskTableFile *
flat__open_file(GskTableFileFactory * factory,const char * dir,guint64 id,GError ** error)1065 flat__open_file        (GskTableFileFactory      *factory,
1066                         const char               *dir,
1067                         guint64                   id,
1068                         GError                  **error)
1069 {
1070   FlatFactory *ffactory = (FlatFactory *) factory;
1071   FlatFile *rv = g_slice_new (FlatFile);
1072   guint f;
1073   rv->base_file.factory = factory;
1074   rv->base_file.id = id;
1075   if (!open_3_files (rv, dir, id, OPEN_MODE_READONLY, error))
1076     {
1077       g_slice_free (FlatFile, rv);
1078       return NULL;
1079     }
1080   rv->builder = NULL;
1081 
1082   /* pread() to get the number of records */
1083   {
1084     guint64 n_entries_le;
1085     int prv = pread (rv->fds[FILE_INDEX], &n_entries_le, 8, 0);
1086     if (prv < 0)
1087       {
1088         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
1089                      "error reading nrecords from index file: %s",
1090                      g_strerror (errno));
1091         for (f = 0; f < N_FILES; f++)
1092           close (rv->fds[f]);
1093         g_slice_free (FlatFile, rv);
1094         return NULL;
1095       }
1096     if (prv < 8)
1097       {
1098         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PREAD,
1099                      "premature eof reading nrecords from index file: %s",
1100                      g_strerror (errno));
1101         for (f = 0; f < N_FILES; f++)
1102           close (rv->fds[f]);
1103         g_slice_free (FlatFile, rv);
1104         return NULL;
1105       }
1106     rv->base_file.n_entries = GUINT64_FROM_LE (n_entries_le);
1107   }
1108 
1109   /* mmap small files for reading */
1110   for (f = 0; f < N_FILES; f++)
1111     {
1112       if (!mmap_reader_init (&rv->readers[f], rv->fds[f], error))
1113         {
1114           guint tmp_f;
1115           for (tmp_f = 0; tmp_f < f; tmp_f++)
1116             mmap_reader_clear (&rv->readers[tmp_f]);
1117           for (f = 0; f < N_FILES; f++)
1118             close (rv->fds[f]);
1119           g_slice_free (FlatFile, rv);
1120           return NULL;
1121         }
1122     }
1123   rv->has_readers = TRUE;
1124 
1125   rv->cache_entries_len = 0;
1126   rv->cache_entries = NULL;
1127   rv->cache_entries_count = 0;
1128   rv->max_cache_entries = ffactory->max_cache_entries;
1129 
1130   return &rv->base_file;
1131 }
1132 
1133 static inline void
do_compress(FlatFileBuilder * builder,guint len,const guint8 * data)1134 do_compress (FlatFileBuilder *builder,
1135              guint            len,
1136              const guint8    *data)
1137 {
1138   //g_message ("do_compress: len=%u",len);
1139   builder->uncompressed_data_len += len;
1140 
1141   /* ensure there is enough data at the end of 'compressed' */
1142   gsk_table_buffer_ensure_extra (&builder->compressed, len / 2 + 16);
1143 
1144   /* initialize input and output buffers */
1145   builder->compressor.next_in = (Bytef *) data;
1146   builder->compressor.avail_in = len;
1147   builder->compressor.next_out = builder->compressed.data
1148                                + builder->compressed.len;
1149   builder->compressor.avail_out = builder->compressed.alloced
1150                                 - builder->compressed.len;
1151 
1152   /* deflate until all input consumed */
1153   while (builder->compressor.avail_in > 0)
1154     {
1155 
1156       int zrv;
1157 retry_deflate:
1158       zrv = deflate (&builder->compressor, 0);
1159       g_assert (zrv == Z_OK);
1160       builder->compressed.len = (guint8 *) builder->compressor.next_out
1161                               - (guint8 *) builder->compressed.data;
1162 
1163       if (builder->compressor.avail_out == 0)
1164         {
1165           gsk_table_buffer_ensure_extra (&builder->compressed,
1166                                          builder->compressor.avail_in / 2 + 16);
1167           builder->compressor.next_out = builder->compressed.data
1168                                        + builder->compressed.len;
1169           builder->compressor.avail_out = builder->compressed.alloced
1170                                         - builder->compressed.len;
1171           goto retry_deflate;
1172         }
1173     }
1174 }
1175 
1176 static void
do_compress_flush(FlatFileBuilder * builder)1177 do_compress_flush (FlatFileBuilder *builder)
1178 {
1179   /* 6 bytes is sufficient according to the zlib header file docs;
1180      add 10 for good measure. */
1181   gsk_table_buffer_ensure_extra (&builder->compressed, 6 + 10);
1182   builder->compressor.next_in = NULL;
1183   builder->compressor.avail_in = 0;
1184   builder->compressor.next_out = builder->compressed.data
1185                                + builder->compressed.len;
1186   builder->compressor.avail_out = builder->compressed.alloced
1187                                 - builder->compressed.len;
1188   for (;;)
1189     {
1190       if (deflate (&builder->compressor, Z_SYNC_FLUSH) != Z_OK)
1191         g_assert_not_reached ();
1192       builder->compressed.len = builder->compressor.next_out
1193                               - builder->compressed.data;
1194       if (builder->compressor.avail_out > 0)
1195         break;
1196 
1197       gsk_table_buffer_ensure_extra (&builder->compressed, 64);
1198 
1199       builder->compressor.next_out = builder->compressed.data
1200                                    + builder->compressed.len;
1201       builder->compressor.avail_out = builder->compressed.alloced
1202                                     - builder->compressed.len;
1203     }
1204 }
1205 
1206 static gboolean
flush_to_files(FlatFileBuilder * builder,GError ** error)1207 flush_to_files (FlatFileBuilder *builder,
1208                 GError **error)
1209 {
1210   /* emit index, keyfile and data file stuff */
1211   guint8 header[SIZEOF_INDEX_ENTRY];
1212   guint8 compressed_header[5 + 5];
1213   guint compressed_header_len = 0;
1214   IndexEntry index_entry;
1215   guint tmp;
1216 
1217   /* flush compressor */
1218   do_compress_flush (builder);
1219 
1220   /* write uncompressed_data_len and n_compressed_entries
1221      to the compressed_header */
1222   compressed_header_len = uint32_vli_encode (builder->n_compressed_entries,
1223                                              compressed_header);
1224   tmp = uint32_vli_encode (builder->uncompressed_data_len,
1225                            compressed_header + compressed_header_len);
1226   compressed_header_len += tmp;
1227 
1228 #if DEBUG_FLUSH
1229   g_message ("flush_to_files: n_compressed_entry=%u, uncompressed_data_len=%u, compressed_header_len=%u, compressed_len=%u", builder->n_compressed_entries, builder->uncompressed_data_len, compressed_header_len, builder->compressed.len);
1230 #if DEBUG_DUMP_COMPRESSED_DATA
1231   {
1232     char *hex = gsk_escape_memory_hex (builder->compressed.data, builder->compressed.len);
1233     g_message ("  compressed_data=%s", hex);
1234     g_free (hex);
1235   }
1236 #endif
1237 #endif
1238 
1239   /* encode index entry */
1240   index_entry.firstkeys_offset = mmap_writer_offset (&builder->writers[FILE_FIRSTKEYS]);
1241   index_entry.firstkeys_len = builder->first_key.len;
1242   index_entry.compressed_data_offset = mmap_writer_offset (&builder->writers[FILE_DATA]);
1243   index_entry.compressed_data_len = compressed_header_len + builder->compressed.len;
1244 #if DEBUG_INDEX_ENTRIES
1245   g_message ("writing index entry firstkey offset/len=%llu/%u; compressed %llu/%u", index_entry.firstkeys_offset, index_entry.firstkeys_len, index_entry.compressed_data_offset, index_entry.compressed_data_len);
1246 #endif
1247   index_entry_serialize (&index_entry, header);
1248 
1249   /* write data to files */
1250   if (!mmap_writer_write (builder->writers + FILE_INDEX, SIZEOF_INDEX_ENTRY, header, error)
1251    || !mmap_writer_write (builder->writers + FILE_FIRSTKEYS, builder->first_key.len, builder->first_key.data, error)
1252    || !mmap_writer_write (builder->writers + FILE_DATA, compressed_header_len, compressed_header, error)
1253    || !mmap_writer_write (builder->writers + FILE_DATA, builder->compressed.len, builder->compressed.data, error))
1254     return FALSE;
1255   return TRUE;
1256 }
1257 
1258 /* methods for a file which is being built */
1259 static GskTableFeedEntryResult
flat__feed_entry(GskTableFile * file,guint key_len,const guint8 * key_data,guint value_len,const guint8 * value_data,GError ** error)1260 flat__feed_entry      (GskTableFile             *file,
1261                        guint                     key_len,
1262                        const guint8             *key_data,
1263                        guint                     value_len,
1264                        const guint8             *value_data,
1265                        GError                  **error)
1266 {
1267   FlatFile *ffile = (FlatFile *) file;
1268   FlatFactory *ffactory = (FlatFactory *) file->factory;
1269   FlatFileBuilder *builder = ffile->builder;
1270   guint8 enc_buf[5+5];
1271   guint encoded_len, tmp;
1272 
1273   g_assert (builder != NULL);
1274 
1275   file->n_entries++;
1276 
1277   if (builder->has_last_key)
1278     {
1279       /* compute prefix length */
1280       guint prefix_len = 0;
1281       guint max_prefix_len = MIN (key_len, builder->last_key.len);
1282       while (prefix_len < max_prefix_len
1283           && key_data[prefix_len] == builder->last_key.data[prefix_len])
1284         prefix_len++;
1285 
1286       /* encode prefix_length, and (key_len-prefix_length) */
1287       encoded_len = uint32_vli_encode (prefix_len, enc_buf);
1288       tmp = uint32_vli_encode (key_len - prefix_len, enc_buf + encoded_len);
1289       encoded_len += tmp;
1290       memcpy (gsk_table_buffer_set_len (&builder->uncompressed, encoded_len),
1291               enc_buf, encoded_len);
1292 
1293       /* copy non-prefix portion of key */
1294       memcpy (gsk_table_buffer_append (&builder->uncompressed, key_len - prefix_len),
1295               key_data + prefix_len, key_len - prefix_len);
1296     }
1297   else
1298     {
1299       /* the key's length will be in the index file
1300          (no prefix-compression possible on the first key);
1301          the key's data will be in the firstkeys file */
1302       builder->has_last_key = TRUE;
1303       memcpy (gsk_table_buffer_set_len (&builder->first_key,
1304                                         key_len), key_data, key_len);
1305       gsk_table_buffer_set_len (&builder->uncompressed, 0);
1306     }
1307 
1308   builder->n_compressed_entries++;
1309 
1310   /* encode value length */
1311   encoded_len = uint32_vli_encode (value_len, enc_buf);
1312   memcpy (gsk_table_buffer_append (&builder->uncompressed, encoded_len),
1313           enc_buf, encoded_len);
1314 
1315   /* compress the non-value portion */
1316   do_compress (builder, builder->uncompressed.len, builder->uncompressed.data);
1317 
1318   /* compress the value portion */
1319   do_compress (builder, value_len, value_data);
1320 
1321   if (builder->compressed.len >= ffactory->bytes_per_chunk)
1322     {
1323       if (!flush_to_files (builder, error))
1324         return GSK_TABLE_FEED_ENTRY_ERROR;
1325 
1326       reinit_compressor (builder, ffactory->compression_level, TRUE);
1327       builder->has_last_key = FALSE;
1328     }
1329   else
1330     {
1331       builder->has_last_key = TRUE;
1332       memcpy (gsk_table_buffer_set_len (&builder->last_key,
1333                                         key_len), key_data, key_len);
1334     }
1335   return builder->has_last_key ? GSK_TABLE_FEED_ENTRY_WANT_MORE
1336                                : GSK_TABLE_FEED_ENTRY_SUCCESS;
1337 }
1338 
1339 static gboolean
flat__done_feeding(GskTableFile * file,gboolean * ready_out,GError ** error)1340 flat__done_feeding     (GskTableFile             *file,
1341                         gboolean                 *ready_out,
1342                         GError                  **error)
1343 {
1344   FlatFile *ffile = (FlatFile *) file;
1345   FlatFactory *ffactory = (FlatFactory *) file->factory;
1346   FlatFileBuilder *builder = ffile->builder;
1347   guint f;
1348   if (builder->has_last_key)
1349     {
1350       if (!flush_to_files (builder, error))
1351         return FALSE;
1352     }
1353 
1354   /* unmap and ftruncate all files */
1355   for (f = 0; f < N_FILES; f++)
1356     {
1357       guint64 offset = mmap_writer_offset (&builder->writers[f]);
1358       mmap_writer_clear (&builder->writers[f]);
1359       if (ftruncate (ffile->fds[f], offset) < 0)
1360         {
1361           g_set_error (error, GSK_G_ERROR_DOMAIN,
1362                        GSK_ERROR_FILE_TRUNCATE,
1363                        "error truncating %s file: %s",
1364                        file_extensions[f], g_strerror (errno));
1365           return FALSE;
1366         }
1367     }
1368 
1369   /* write the number of records to the front */
1370   {
1371     guint64 n_entries = file->n_entries;
1372     guint64 n_entries_le = GUINT64_TO_LE (n_entries);
1373     int pwrite_rv = pwrite (ffile->fds[FILE_INDEX], &n_entries_le, 8, 0);
1374     if (pwrite_rv < 0)
1375       {
1376         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PWRITE,
1377                      "pwrite failed writing n_entries: %s",
1378                      g_strerror (errno));
1379         return FALSE;
1380       }
1381     if (pwrite_rv < 8)
1382       {
1383         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_PWRITE,
1384                      "pwrite partial data write???");
1385         return FALSE;
1386       }
1387   }
1388 
1389   /* mmap for reading small files */
1390   for (f = 0; f < N_FILES; f++)
1391     if (!mmap_reader_init (&ffile->readers[f], ffile->fds[f], error))
1392       {
1393         guint tmp_f;
1394         for (tmp_f = 0; tmp_f < f; tmp_f++)
1395           mmap_reader_clear (&ffile->readers[tmp_f]);
1396         return FALSE;
1397       }
1398   ffile->has_readers = TRUE;
1399 
1400   /* recycle/free the builder object */
1401   ffile->builder = NULL;
1402   builder_recycle (ffactory, builder);
1403   *ready_out = TRUE;
1404 
1405   return TRUE;
1406 }
1407 
1408 static gboolean
flat__get_build_state(GskTableFile * file,guint * state_len_out,guint8 ** state_data_out,GError ** error)1409 flat__get_build_state  (GskTableFile             *file,
1410                         guint                    *state_len_out,
1411                         guint8                  **state_data_out,
1412                         GError                  **error)
1413 {
1414   guint f;
1415   FlatFile *ffile = (FlatFile *) file;
1416   FlatFileBuilder *builder = ffile->builder;
1417   guint8 *data;
1418   g_assert (builder != NULL);
1419   *state_len_out = 1 + 3 * 8 + 8;
1420   data = *state_data_out = g_malloc (*state_len_out);
1421   data[0] = 0;               /* phase 0; reserved to allow multiphase processing in future */
1422   for (f = 0; f < N_FILES; f++)
1423     {
1424       guint64 offset = mmap_writer_offset (builder->writers + f);
1425       guint64 offset_le = GUINT64_TO_LE (offset);
1426       memcpy (data + 8 * f + 1, &offset_le, 8);
1427     }
1428   {
1429     guint64 n_entries = file->n_entries;
1430     guint64 n_entries_le = GUINT64_TO_LE (n_entries);
1431     memcpy (data + 1 + 3*8, &n_entries_le, 8);
1432   }
1433   return TRUE;
1434 }
1435 
1436 static gboolean
flat__build_file(GskTableFile * file,gboolean * ready_out,GError ** error)1437 flat__build_file       (GskTableFile             *file,
1438                         gboolean                 *ready_out,
1439                         GError                  **error)
1440 {
1441   *ready_out = TRUE;
1442   return TRUE;
1443 }
1444 
1445 static void
flat__release_build_data(GskTableFile * file)1446 flat__release_build_data(GskTableFile            *file)
1447 {
1448   /* nothing to do, since we finish building immediately */
1449 }
1450 
1451 /* --- query api --- */
1452 static gboolean
do_pread(FlatFile * ffile,WhichFile f,guint64 offset,guint length,guint8 * ptr_out,GError ** error)1453 do_pread (FlatFile *ffile,
1454           WhichFile f,
1455           guint64   offset,
1456           guint     length,
1457           guint8    *ptr_out,
1458           GError   **error)
1459 {
1460   if (ffile->builder)
1461     {
1462       return mmap_writer_pread (ffile->builder->writers + f,
1463                                 offset, length, ptr_out, error);
1464     }
1465   else
1466     {
1467       g_assert (ffile->has_readers);
1468       return mmap_reader_pread (ffile->readers + f,
1469                                 offset, length, ptr_out, error);
1470     }
1471 }
1472 
1473 static gboolean
flat__query_file(GskTableFile * file,GskTableFileQuery * query_inout,GError ** error)1474 flat__query_file       (GskTableFile             *file,
1475                         GskTableFileQuery        *query_inout,
1476                         GError                  **error)
1477 {
1478   FlatFile *ffile = (FlatFile *) file;
1479   guint64 n_index_records, first, n;
1480   CacheEntry *cache_entry;
1481   IndexEntry index_entry;
1482   gboolean index_entry_up_to_date = FALSE;
1483   guint8 index_entry_data[SIZEOF_INDEX_ENTRY];
1484   if (ffile->builder != NULL)
1485     n_index_records = (mmap_writer_offset (&ffile->builder->writers[FILE_INDEX]) - INDEX_HEADER_SIZE)
1486                     / SIZEOF_INDEX_ENTRY;
1487   else if (ffile->has_readers)
1488     n_index_records = (ffile->readers[FILE_INDEX].file_size - INDEX_HEADER_SIZE)
1489                     / SIZEOF_INDEX_ENTRY;
1490   else
1491     {
1492       g_set_error (error, GSK_G_ERROR_DOMAIN,
1493                    GSK_ERROR_INVALID_STATE,
1494                    "flat file in error state");
1495       return FALSE;
1496     }
1497 
1498   if (n_index_records == 0)
1499     {
1500       query_inout->found = FALSE;
1501       return TRUE;
1502     }
1503   first = 0;
1504   n = n_index_records;
1505   GskTableBuffer firstkey;
1506   gsk_table_buffer_init (&firstkey);
1507   while (n > 1)
1508     {
1509       guint64 mid = first + n / 2;
1510       gint compare_rv;
1511 
1512       /* read index entry */
1513       if (!do_pread (ffile, FILE_INDEX, mid * SIZEOF_INDEX_ENTRY + INDEX_HEADER_SIZE, SIZEOF_INDEX_ENTRY, index_entry_data, error))
1514         {
1515           gsk_table_buffer_clear (&firstkey);
1516           return FALSE;
1517         }
1518       index_entry_deserialize (index_entry_data, &index_entry);
1519 
1520       /* read key */
1521       gsk_table_buffer_set_len (&firstkey, index_entry.firstkeys_len);
1522       if (!do_pread (ffile, FILE_FIRSTKEYS, index_entry.firstkeys_offset, index_entry.firstkeys_len,
1523                      firstkey.data, error))
1524         {
1525           gsk_table_buffer_clear (&firstkey);
1526           return FALSE;
1527         }
1528 
1529       /* invoke comparator */
1530       compare_rv = query_inout->compare (index_entry.firstkeys_len,
1531                                          firstkey.data,
1532                                          query_inout->compare_data);
1533 
1534       if (compare_rv < 0)
1535         {
1536           n = mid - first;
1537           index_entry_up_to_date = FALSE;
1538         }
1539       else if (compare_rv > 0)
1540         {
1541           n = first + n - mid;
1542           first = mid;
1543           index_entry_up_to_date = TRUE;
1544         }
1545       else
1546         {
1547           CacheEntryRecord *record;
1548           cache_entry = cache_entry_force (ffile, mid,
1549                                            &index_entry, firstkey.data,
1550                                            error);
1551           if (cache_entry == NULL)
1552             {
1553               gsk_table_buffer_clear (&firstkey);
1554               return FALSE;
1555             }
1556           record = cache_entry->records + 0;
1557           memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1558                   record->value_data, record->value_len);
1559           query_inout->found = TRUE;
1560           gsk_table_buffer_clear (&firstkey);
1561           return TRUE;
1562         }
1563     }
1564 
1565   if (!index_entry_up_to_date)
1566     {
1567       /* read index entry */
1568       if (!do_pread (ffile, FILE_INDEX, first * SIZEOF_INDEX_ENTRY + INDEX_HEADER_SIZE, SIZEOF_INDEX_ENTRY, index_entry_data, error))
1569         return FALSE;
1570       index_entry_deserialize (index_entry_data, &index_entry);
1571 
1572       /* read firstkey */
1573       gsk_table_buffer_set_len (&firstkey, index_entry.firstkeys_len);
1574       if (!do_pread (ffile, FILE_FIRSTKEYS, index_entry.firstkeys_offset, index_entry.firstkeys_len,
1575                      firstkey.data, error))
1576         {
1577           gsk_table_buffer_clear (&firstkey);
1578           return FALSE;
1579         }
1580     }
1581 
1582   /* uncompress block, cache */
1583   cache_entry = cache_entry_force (ffile, first,
1584                                    &index_entry, firstkey.data,
1585                                    error);
1586   if (cache_entry == NULL)
1587     {
1588       gsk_table_buffer_clear (&firstkey);
1589       return FALSE;
1590     }
1591 
1592   /* bsearch the uncompressed block */
1593   {
1594     guint first = 0;
1595     guint n = cache_entry->n_entries;
1596     while (n > 1)
1597       {
1598         guint mid = first + n / 2;
1599         CacheEntryRecord *record = cache_entry->records + mid;
1600         int compare_rv = query_inout->compare (record->key_len, record->key_data,
1601                                                query_inout->compare_data);
1602         if (compare_rv < 0)
1603           {
1604             n = mid - first;
1605           }
1606         else if (compare_rv > 0)
1607           {
1608             n = first + n - mid;
1609             first = mid;
1610           }
1611         else
1612           {
1613             memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1614                     record->value_data, record->value_len);
1615             query_inout->found = TRUE;
1616             return TRUE;
1617           }
1618       }
1619     if (n == 1 && first < cache_entry->n_entries)
1620       {
1621         CacheEntryRecord *record = cache_entry->records + first;
1622         int compare_rv = query_inout->compare (record->key_len, record->key_data,
1623                                                query_inout->compare_data);
1624         if (compare_rv == 0)
1625           {
1626             memcpy (gsk_table_buffer_set_len (&query_inout->value, record->value_len),
1627                     record->value_data, record->value_len);
1628             query_inout->found = TRUE;
1629             return TRUE;
1630           }
1631       }
1632   }
1633   query_inout->found = FALSE;
1634   return TRUE;
1635 }
1636 
1637 /* --- reader api --- */
1638 static void
read_and_uncompress_chunk(FlatFileReader * freader)1639 read_and_uncompress_chunk (FlatFileReader *freader)
1640 {
1641   /* read index fp record, or set eof flag or error */
1642   guint8 index_data[SIZEOF_INDEX_ENTRY];
1643   IndexEntry index_entry;
1644   guint8 *firstkey;
1645   guint f;
1646 
1647   /* set up state before reading */
1648   for (f = 0; f < 3; f++)
1649     freader->chunk_file_offsets[f] = FTELLO (freader->fps[f]);
1650 
1651   if (FREAD (index_data, SIZEOF_INDEX_ENTRY, 1, freader->fps[FILE_INDEX]) != 1)
1652     {
1653       freader->base_reader.eof = 1;
1654       return;
1655     }
1656   index_entry_deserialize (index_data, &index_entry);
1657 
1658 #if DEBUG_READ_CHUNK
1659   g_message ("chunk offsets=%llu,%llu,%llu; ie.compressed_len=%u",
1660              freader->chunk_file_offsets[0],
1661              freader->chunk_file_offsets[1],
1662              freader->chunk_file_offsets[2],
1663              index_entry.compressed_data_len);
1664 #endif
1665 
1666   /* allocate buffers in one piece */
1667   firstkey = g_malloc (index_entry.firstkeys_len + index_entry.compressed_data_len);
1668   guint8 *compressed_data;
1669   compressed_data = firstkey + index_entry.firstkeys_len;
1670 
1671   /* read firstkey */
1672   if (index_entry.firstkeys_len != 0
1673     && FREAD (firstkey, index_entry.firstkeys_len, 1, freader->fps[FILE_FIRSTKEYS]) != 1)
1674     {
1675       freader->base_reader.error = g_error_new (GSK_G_ERROR_DOMAIN,
1676                                     GSK_ERROR_PREMATURE_EOF,
1677                                     "premature eof in firstkey file [firstkey len=%u]", index_entry.firstkeys_len);
1678       g_free (firstkey);
1679       return;
1680     }
1681 
1682   /* read data */
1683   if (FREAD (compressed_data, index_entry.compressed_data_len, 1, freader->fps[FILE_DATA]) != 1)
1684     {
1685       freader->base_reader.error = g_error_new (GSK_G_ERROR_DOMAIN,
1686                                     GSK_ERROR_PREMATURE_EOF,
1687                                     "premature eof in compressed-data file [compressed_data_len=%u]",
1688                                     index_entry.compressed_data_len);
1689       g_free (firstkey);
1690       return;
1691     }
1692 
1693   /* do the actual un-gzipping and scanning */
1694   freader->cache_entry = cache_entry_deserialize (freader->index_entry_index++,
1695                                                   index_entry.firstkeys_len, firstkey,
1696                                                   index_entry.compressed_data_len, compressed_data,
1697                                                   &freader->base_reader.error);
1698   g_free (firstkey);
1699 }
1700 
1701 static inline void
init_base_reader_record(FlatFileReader * freader)1702 init_base_reader_record (FlatFileReader *freader)
1703 {
1704   CacheEntryRecord *record = freader->cache_entry->records + freader->record_index;
1705   freader->base_reader.key_len = record->key_len;
1706   freader->base_reader.key_data = record->key_data;
1707   freader->base_reader.value_len = record->value_len;
1708   freader->base_reader.value_data = record->value_data;
1709 }
1710 
1711 static void
reader_advance(GskTableReader * reader)1712 reader_advance (GskTableReader *reader)
1713 {
1714   FlatFileReader *freader = (FlatFileReader *) reader;
1715   if (freader->base_reader.eof || freader->base_reader.error)
1716     return;
1717   if (++freader->record_index == freader->cache_entry->n_entries)
1718     {
1719       g_free (freader->cache_entry);
1720       freader->cache_entry = NULL;
1721       read_and_uncompress_chunk (freader);
1722       if (reader->eof || reader->error != NULL)
1723         return;
1724       freader->record_index = 0;
1725     }
1726   init_base_reader_record (freader);
1727 }
1728 static void
reader_destroy(GskTableReader * reader)1729 reader_destroy (GskTableReader *reader)
1730 {
1731   guint f;
1732   FlatFileReader *freader = (FlatFileReader *) reader;
1733   if (freader->cache_entry)
1734     g_free (freader->cache_entry);
1735   for (f = 0; f < N_FILES; f++)
1736     if (freader->fps[f] != NULL)
1737       fclose (freader->fps[f]);
1738   g_slice_free (FlatFileReader, freader);
1739 }
1740 
1741 static FlatFileReader *
reader_open_fps(GskTableFile * file,const char * dir,GError ** error)1742 reader_open_fps (GskTableFile *file,
1743                  const char   *dir,
1744                  GError      **error)
1745 {
1746   FlatFileReader *freader = g_slice_new (FlatFileReader);
1747   guint f;
1748   freader->base_reader.eof = FALSE;
1749   freader->base_reader.error = NULL;
1750   for (f = 0; f < N_FILES; f++)
1751     {
1752       char fname_buf[GSK_TABLE_MAX_PATH];
1753       gsk_table_mk_fname (fname_buf, dir, file->id, file_extensions[f]);
1754       freader->fps[f] = fopen (fname_buf, "rb");
1755       if (freader->fps[f] == NULL)
1756         {
1757           g_set_error (error, GSK_G_ERROR_DOMAIN,
1758                        GSK_ERROR_FILE_OPEN,
1759                        "error opening %s for reading: %s",
1760                        fname_buf, g_strerror (errno));
1761           g_slice_free (FlatFileReader, freader);
1762           return NULL;
1763         }
1764     }
1765   freader->base_reader.advance = reader_advance;
1766   freader->base_reader.destroy = reader_destroy;
1767   return freader;
1768 }
1769 static FlatFileReader *
reader_open_eof(void)1770 reader_open_eof (void)
1771 {
1772   FlatFileReader *freader = g_slice_new (FlatFileReader);
1773   guint f;
1774   freader->base_reader.eof = TRUE;
1775   freader->base_reader.error = NULL;
1776   for (f = 0; f < N_FILES; f++)
1777     freader->fps[f] = NULL;
1778   freader->base_reader.advance = reader_advance;
1779   freader->base_reader.destroy = reader_destroy;
1780   return freader;
1781 }
1782 
1783 static GskTableReader *
flat__create_reader(GskTableFile * file,const char * dir,GError ** error)1784 flat__create_reader    (GskTableFile             *file,
1785                         const char               *dir,
1786                         GError                  **error)
1787 {
1788   FlatFileReader *freader = reader_open_fps (file, dir, error);
1789   guint64 ief_header;
1790   if (freader == NULL)
1791     return NULL;
1792 
1793   if (FREAD (&ief_header, 8, 1, freader->fps[FILE_INDEX]) != 1)
1794     {
1795       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_READ,
1796                    "premature eof reading index file header");
1797       return NULL;
1798     }
1799 
1800   read_and_uncompress_chunk (freader);
1801   if (!freader->base_reader.eof && freader->base_reader.error == NULL)
1802     {
1803       freader->record_index = 0;
1804       init_base_reader_record (freader);
1805     }
1806 
1807   return &freader->base_reader;
1808 }
1809 
1810 /* you must always be able to get reader state */
1811 static gboolean
flat__get_reader_state(GskTableFile * file,GskTableReader * reader,guint * state_len_out,guint8 ** state_data_out,GError ** error)1812 flat__get_reader_state (GskTableFile             *file,
1813                         GskTableReader           *reader,
1814                         guint                    *state_len_out,
1815                         guint8                  **state_data_out,
1816                         GError                  **error)
1817 {
1818   FlatFileReader *freader = (FlatFileReader *) reader;
1819   guint8 *data;
1820   guint f;
1821   /* state is:
1822        1 byte state -- 0=in progress;  1=eof
1823      if state==0:
1824        8 bytes index file offset
1825        8 bytes firstkeys file offset
1826        8 bytes data offset
1827        4 bytes index into the compressed byte to return next
1828      if state==1:
1829        no other data
1830    */
1831   g_assert (reader->error == NULL);
1832   if (reader->eof)
1833     {
1834       *state_len_out = 1;
1835       *state_data_out = g_malloc (1);
1836       (*state_data_out)[0] = 1;
1837       return TRUE;
1838     }
1839   *state_len_out = 1 + 8 + 8 + 8 + 4;
1840   data = *state_data_out = g_malloc (*state_len_out);
1841   data[0] = 0;
1842   for (f = 0; f < N_FILES; f++)
1843     {
1844       guint64 tmp64 = freader->chunk_file_offsets[f];
1845       guint64 tmp_le = GUINT64_TO_LE (tmp64);
1846       memcpy (data + 1 + 8 * f, &tmp_le, 8);
1847     }
1848   {
1849     guint32 tmp32 = freader->record_index;
1850     guint32 tmp32_le = GUINT32_TO_LE (tmp32);
1851     memcpy (data + 1 + 8*3, &tmp32_le, 4);
1852   }
1853   return TRUE;
1854 }
1855 
1856 static GskTableReader *
flat__recreate_reader(GskTableFile * file,const char * dir,guint state_len,const guint8 * state_data,GError ** error)1857 flat__recreate_reader  (GskTableFile             *file,
1858                         const char               *dir,
1859                         guint                     state_len,
1860                         const guint8             *state_data,
1861                         GError                  **error)
1862 {
1863   FlatFileReader *freader;
1864   guint f;
1865   if (freader == NULL)
1866     return NULL;
1867   switch (state_data[0])
1868     {
1869     case 0:             /* in progress */
1870       freader = reader_open_fps (file, dir, error);
1871       if (freader == NULL)
1872         return NULL;
1873 
1874       /* seek */
1875       for (f = 0; f < 3; f++)
1876         {
1877           guint64 tmp_le, tmp;
1878           memcpy (&tmp_le, state_data + 1 + 8*f, 8);
1879           tmp = GUINT64_FROM_LE (tmp_le);
1880           if (FSEEKO (freader->fps[f], tmp, SEEK_SET) < 0)
1881             {
1882               guint tmp_f;
1883               g_set_error (error, GSK_G_ERROR_DOMAIN,
1884                            GSK_ERROR_FILE_SEEK,
1885                            "error seeking %s file: %s",
1886                            file_extensions[f], g_strerror (errno));
1887               for (tmp_f = 0; tmp_f < N_FILES; tmp_f++)
1888                 fclose (freader->fps[tmp_f]);
1889               g_slice_free (FlatFileReader, freader);
1890               return NULL;
1891             }
1892         }
1893 
1894       read_and_uncompress_chunk (freader);
1895 
1896       if (freader->base_reader.eof
1897        || freader->base_reader.error != NULL)
1898         {
1899           if (freader->base_reader.error)
1900             g_propagate_error (error, freader->base_reader.error);
1901           else
1902             g_set_error (error, GSK_G_ERROR_DOMAIN,
1903                          GSK_ERROR_PREMATURE_EOF,
1904                          "unexpected eof restoring file reader");
1905           for (f = 0; f < N_FILES; f++)
1906             fclose (freader->fps[f]);
1907           g_slice_free (FlatFileReader, freader);
1908           return NULL;
1909         }
1910       {
1911         guint32 tmp_le;
1912         memcpy (&tmp_le, state_data + 1 + 3*8, 4);
1913         freader->record_index = GUINT32_FROM_LE (tmp_le);
1914         if (freader->record_index >= freader->cache_entry->n_entries)
1915           {
1916             g_set_error (error, GSK_G_ERROR_DOMAIN,
1917                          GSK_ERROR_PREMATURE_EOF,
1918                          "record index out-of-bounds in state-data");
1919             for (f = 0; f < N_FILES; f++)
1920               fclose (freader->fps[f]);
1921             g_slice_free (FlatFileReader, freader);
1922             return NULL;
1923           }
1924       }
1925       init_base_reader_record (freader);
1926 
1927       break;
1928     case 1:             /* eof */
1929       g_assert (state_len == 1);
1930       freader = reader_open_eof ();
1931       break;
1932     default:
1933       g_set_error (error, GSK_G_ERROR_DOMAIN,
1934                    GSK_ERROR_PARSE,
1935                    "unknown state for reader");
1936       return NULL;
1937     }
1938   return &freader->base_reader;
1939 }
1940 
1941 
1942 /* destroying files and factories */
1943 static gboolean
flat__destroy_file(GskTableFile * file,const char * dir,gboolean erase,GError ** error)1944 flat__destroy_file     (GskTableFile             *file,
1945                         const char               *dir,
1946                         gboolean                  erase,
1947                         GError                  **error)
1948 {
1949   FlatFactory *ffactory = (FlatFactory *) file->factory;
1950   FlatFile *ffile = (FlatFile *) file;
1951   FlatFileBuilder *builder = ffile->builder;
1952   guint f;
1953   if (builder != NULL)
1954     {
1955       for (f = 0; f < N_FILES; f++)
1956         mmap_writer_clear (builder->writers + f);
1957       builder_recycle (ffactory, builder);
1958     }
1959   else if (ffile->has_readers)
1960     {
1961       for (f = 0; f < N_FILES; f++)
1962         mmap_reader_clear (ffile->readers + f);
1963     }
1964   for (f = 0; f < N_FILES; f++)
1965     close (ffile->fds[f]);
1966   if (erase)
1967     {
1968       for (f = 0; f < N_FILES; f++)
1969         {
1970           char fname_buf[GSK_TABLE_MAX_PATH];
1971           gsk_table_mk_fname (fname_buf, dir, file->id, file_extensions[f]);
1972           unlink (fname_buf);
1973         }
1974     }
1975   g_slice_free (FlatFile, ffile);
1976   return TRUE;
1977 }
1978 
1979 static void
flat__destroy_factory(GskTableFileFactory * factory)1980 flat__destroy_factory  (GskTableFileFactory      *factory)
1981 {
1982   /* static factory */
1983 }
1984 
1985 
1986 /* for now, return a static factory object */
gsk_table_file_factory_new_flat(void)1987 GskTableFileFactory *gsk_table_file_factory_new_flat (void)
1988 {
1989   static FlatFactory the_factory =
1990     {
1991       {
1992         flat__create_file,
1993         flat__open_building_file,
1994         flat__open_file,
1995         flat__feed_entry,
1996         flat__done_feeding,
1997         flat__get_build_state,
1998         flat__build_file,
1999         flat__release_build_data,
2000         flat__query_file,
2001         flat__create_reader,
2002         flat__get_reader_state,
2003         flat__recreate_reader,
2004         flat__destroy_file,
2005         flat__destroy_factory
2006       },
2007       16384,
2008       3,                        /* zlib compression level */
2009       0,                        /* n recycled builders */
2010       8,                        /* max recycled builders */
2011       NULL,                     /* recycled builder list */
2012       24                        /* max cache entries */
2013     };
2014 
2015   return &the_factory.base_factory;
2016 }
2017