1 /* XXX: strace indicates that we are using O_LARGEFILE? */
2 /* XXX: need to handle files that have to spend a while finishing
3    up after the last entry is added. */
/* XXX: test scripts (while N; do Nq; done) still occasionally fail with
   "expected value but not found" errors; see also the "setup" script for tests */
6 /* NOTE: see 'setup' script for tests (etc) */
7 /* POSSIBLE TODO: support key_fixed_length, value_fixed_length */
8 /* POSSIBLE TODO: support disabling prefix compression */
9 /* POSSIBLE TODO: support altering compression level */
10 /* POSSIBLE TODO: support LZO compression */
11 
12 #include <string.h>
13 #include <errno.h>
14 #include <sys/mman.h>
15 #include <fcntl.h>
16 #include <sys/stat.h>
17 #include <sys/types.h>
18 #include <unistd.h>
19 #include <stdio.h>              /* for rename() */
20 
21 #include "gskrbtreemacros.h"
22 #include "gsklistmacros.h"
23 #include "gskghelpers.h"
24 #include "gskutils.h"
25 #include "gskmemorybarrier.h"
26 #include "gsktable.h"
27 #include "gsktable-file.h"
28 #include "gskerror.h"
29 
typedef struct _TableUserData TableUserData;
typedef struct _MergeTask MergeTask;
typedef struct _FileInfo FileInfo;
typedef struct _TreeNode TreeNode;

/* printf-style format for the 64-bit file ids */
#define ID_FMT  "%" G_GUINT64_FORMAT

/* first 32-bit word of every journal file (stored little-endian) */
#define JOURNAL_FILE_MAGIC              0x1143eeab

/* compile-time debug switches; set to 1 to enable the g_message() tracing */
#define DEBUG_MERGE_TASKS 0
#define DEBUG_PRINT_QUERIES 0
#define DEBUG_JOURNAL_WRITING 0
#define DEBUG_JOURNAL_REPLAY 0

/* kill_unknown_files() tests this switch with #if; define it explicitly like
   the others (instead of relying on "undefined means 0", which -Wundef flags).
   Guarded so a build may still override it, e.g. with -DDEBUG_OLD_FILE_DELETION=1. */
#ifndef DEBUG_OLD_FILE_DELETION
#define DEBUG_OLD_FILE_DELETION 0
#endif

/* TRUE if 'task' is absent (NULL) or exists but has not been started. */
#define TASK_IS_UNSTARTED(task) \
  ((task) == NULL || !(task)->is_started)
46 
47 struct _TableUserData
48 {
49   guint ref_count;
50   GskTableMergeFunc merge;
51   gpointer user_data;
52   GDestroyNotify destroy;
53 };
54 
55 struct _MergeTask
56 {
57   gboolean is_started;
58   FileInfo *inputs[2];
59 
60   union
61   {
62     struct {
63       /* ratio of inputs[0]->n_data / inputs[1]->n_data * (2^16) */
64       /* we want the smallest ratio possible;
65          the "max_merge_ratio" specifies the limit
66          for merge-task creation. */
67       guint32 input_size_ratio_b16;
68 
69       /* tree of unstarted merge tasks, sorted by input_size_ratio_b16
70          then n_entries */
71       MergeTask *left, *right, *parent;
72       gboolean is_red;
73     } unstarted;
74     struct {
75       GskTableFile *output;
76       gboolean has_last_queryable_key;
77       GskTableBuffer last_queryable_key;
78       struct {
79         GskTableReader *reader;
80       } inputs[2];
81       MergeTask *next_run;
82     } started;
83   } info;
84 };
85 
86 struct _FileInfo
87 {
88   GskTableFile *file;
89   guint ref_count;
90   guint64 first_input_entry, n_input_entries;
91   MergeTask *prev_task;         /* possible merge task with prior file */
92   MergeTask *next_task;         /* possible merge task with next file */
93   FileInfo *prev_file, *next_file;
94 };
95 
96 struct _TreeNode
97 {
98   GskTableBuffer key;
99   GskTableBuffer value;
100   TreeNode *left, *right, *parent;
101   guint is_red : 1;
102 };
103 
/* Argument packs for the gsklistmacros.h list/stack macros.
   GET_FILE_INFO_LIST: the table's doubly-linked file list
     (first_file .. last_file, linked through prev_file/next_file).
   GET_RUN_STACK: the stack of started merge tasks
     (table->run_list, linked through info.started.next_run).
   'table' is parenthesized in both so any pointer expression can be passed. */
#define GET_FILE_INFO_LIST(table) \
  FileInfo *, (table)->first_file, (table)->last_file, prev_file, next_file
#define GET_RUN_STACK(table) \
  MergeTask *, (table)->run_list, info.started.next_run
108 
109 /* always runs the table->run_list task */
110 typedef gboolean (*RunTaskFunc) (GskTable   *table,
111                                  guint       iterations,
112                                  GError    **error);
113 typedef struct _RunTaskFuncs RunTaskFuncs;
114 struct _RunTaskFuncs
115 {
116   RunTaskFunc simplify_flush;
117   RunTaskFunc simplify_noflush;
118   RunTaskFunc nosimplify_flush;
119   RunTaskFunc nosimplify_noflush;
120 };
121 
122 /* optimized variants of RunTaskFuncs:
123       len_memcmp_nomerge
124       len_memcmp_merge
125       nolen_memcmp_merge
126       len_memcmp_merge
127       len_nomerge
128       nolen_nomerge
129       nolen_merge
130       len_merge      */
131 static RunTaskFuncs *
132               table_options_get_run_funcs    (const GskTableOptions *options,
133                                               gboolean        *has_len_out,
134                                               GError         **error);
135 static GskTableFileFactory *
136               table_options_get_file_factory (const GskTableOptions *,
137                                               GError **error);
138 
139 typedef TreeNode *(*InMemoryTreeLookupFunc) (GskTable     *table,
140                                              guint         key_len,
141                                              const guint8 *key_data);
142 typedef int       (*TreeNodeCompareFunc)    (GskTable     *table,
143                                              TreeNode     *a,
144                                              TreeNode     *b);
145 struct _GskTable
146 {
147   char *dir;
148   int lock_fd;
149 
150   /* data handling functions */
151   gboolean has_len;
152   union
153   {
154     GskTableCompareFunc with_len;
155     GskTableCompareFuncNoLen no_len;
156   } compare;
157   union
158   {
159     GskTableMergeFunc with_len;
160     GskTableMergeFuncNoLen no_len;
161   } merge;
162   union
163   {
164     GskTableSimplifyFunc with_len;
165     GskTableSimplifyFuncNoLen no_len;
166   } simplify;
167   GskTableValueIsStableFunc is_stable_func;
168   RunTaskFuncs *run_funcs;
169   InMemoryTreeLookupFunc in_memory_tree_lookup;
170   TreeNodeCompareFunc tree_node_compare;
171 
172   guint query_reverse_chronologically : 1;
173 
174   gpointer user_data;
175   TableUserData *table_user_data; /* for ref-counting */
176 
177   guint64 n_input_entries;
178 
179   /* journalling */
180   int journal_fd;
181   char *journal_cur_fname;
182   char *journal_tmp_fname;              /* actual file to write to */
183   guint8 *journal_mmap;
184   guint journal_len;                    /* current offset in journal */
185   guint journal_size;                   /* size of journal data */
186   GskTableJournalMode journal_mode;
187   guint journal_flush_index;
188 
189   /* files */
190   guint n_files;
191   FileInfo *first_file, *last_file;
192   guint64 last_file_id;
193 
194   /* old files */
195   guint n_old_files;
196   FileInfo **old_files;          /* as in the beginning of the journal file */
197 
198   /* tree of merge tasks, sorted by input file ratio */
199   MergeTask *unstarted_merges;
200 
201   /* heap of merge tasks (sorted by n_entries ascending) */
202   MergeTask *run_list;
203   guint n_running_tasks;
204 
205   /* tree of in-memory entries */
206   TreeNode *in_memory_tree;
207   guint in_memory_bytes;
208   guint in_memory_entry_count;
209 
210   /* fixed pool of tree nodes to used */
211   TreeNode *tree_node_pool;
212   guint tree_node_pool_used;            /* out of max_in_memory_entries */
213 
214   /* fixed length keys and values can be optimized by not storing the length. */
215   gssize key_fixed_length;               /* or -1 */
216   gssize value_fixed_length;             /* or -1 */
217 
218   /* buffers */
219   GskTableBuffer result_buffers[2];
220   GskTableBuffer merge_buffer;
221   GskTableBuffer simplify_buffer;
222   GskTableFileQuery file_query;
223   guint file_query_key_len;
224   const guint8 *file_query_key_data;
225 
226   /* file factory */
227   GskTableFileFactory *file_factory;
228 
229   /* tunables */
230   guint max_running_tasks;
231   guint max_merge_ratio_b16;
232   guint max_in_memory_bytes;
233   guint max_in_memory_entries;
234   guint journal_flush_period;
235 
236 };
237 
/* Accessors and comparator handed to the gskrbtreemacros.h tree macros for
   the tree of unstarted merge tasks (table->unstarted_merges).  Tasks are
   ordered by input_size_ratio_b16, then by address, so rv is 0 only when
   a and b are the same task.
   NOTE(review): relational comparison of pointers to distinct objects is
   not defined by ISO C, though it behaves on flat-memory platforms. */
#define UNSTARTED_MERGE_TASK_IS_RED(task)         (task)->info.unstarted.is_red
#define UNSTARTED_MERGE_TASK_SET_IS_RED(task,v)   (task)->info.unstarted.is_red = v
#define COMPARE_UNSTARTED_MERGE_TASKS(a,b, rv) \
  if (a->info.unstarted.input_size_ratio_b16 < b->info.unstarted.input_size_ratio_b16) \
    rv = -1; \
  else if (a->info.unstarted.input_size_ratio_b16 > b->info.unstarted.input_size_ratio_b16) \
    rv = 1; \
  else \
    rv = (a<b) ? -1 : (a>b) ? 1 : 0;
#define GET_UNSTARTED_MERGE_TASK_TREE(table) \
  (table)->unstarted_merges, MergeTask *, \
  UNSTARTED_MERGE_TASK_IS_RED, \
  UNSTARTED_MERGE_TASK_SET_IS_RED, \
  info.unstarted.parent, \
  info.unstarted.left, \
  info.unstarted.right, \
  COMPARE_UNSTARTED_MERGE_TASKS

/* Same for the tree of in-memory entries (table->in_memory_tree).
   NOTE: TREE_NODE_COMPARE expands to a reference to a variable literally
   named 'table' (it calls table->tree_node_compare), so these macros only
   work where a local variable or parameter called 'table' is in scope. */
#define TREE_NODE_IS_RED(node)         node->is_red
#define TREE_NODE_SET_IS_RED(node,v)   node->is_red = v
#define TREE_NODE_COMPARE(a,b,rv)      rv = table->tree_node_compare(table, a,b)
#define GET_IN_MEMORY_TREE(table) \
  (table)->in_memory_tree, TreeNode *, \
  TREE_NODE_IS_RED, TREE_NODE_SET_IS_RED, \
  parent, left, right, \
  TREE_NODE_COMPARE
266 
267 
268 static inline void
set_buffer(GskTableBuffer * buffer,guint len,const guint8 * data)269 set_buffer (GskTableBuffer *buffer,
270             guint           len,
271             const guint8   *data)
272 {
273   memcpy (gsk_table_buffer_set_len (buffer, len), data, len);
274 }
275 static inline void
copy_buffer(GskTableBuffer * buffer,const GskTableBuffer * src)276 copy_buffer (GskTableBuffer *buffer,
277              const GskTableBuffer *src)
278 {
279   set_buffer (buffer, src->len, src->data);
280 }
281 static void create_unstarted_merge_task (GskTable *table,
282                                          FileInfo *prev,
283                                          FileInfo *next);
284 
285 /* --- file-info ref-counting --- */
286 static inline FileInfo *
file_info_ref(FileInfo * fi)287 file_info_ref (FileInfo *fi)
288 {
289   g_assert (fi->ref_count > 0);
290   ++(fi->ref_count);
291   return fi;
292 }
293 static inline FileInfo *
file_info_unref(FileInfo * fi,const char * dir,gboolean erase)294 file_info_unref (FileInfo *fi, const char *dir, gboolean erase)
295 {
296   g_assert (fi->ref_count > 0);
297   if (--(fi->ref_count) == 0)
298     {
299       GError *error = NULL;
300       if (!gsk_table_file_destroy (fi->file, dir, erase, &error))
301         {
302           g_warning ("gsk_table_file_destroy "ID_FMT" (erase=%u) failed: %s",
303                      fi->file->id, erase, error->message);
304           g_error_free (error);
305         }
306       g_slice_free (FileInfo, fi);
307     }
308   return fi;
309 }
310 
311 /* --- data structure invariant checking --- */
312 static gboolean
are_files_contiguous(GskTable * table)313 are_files_contiguous (GskTable *table)
314 {
315   guint64 last_end = 0;
316   FileInfo *fi;
317   for (fi = table->first_file; fi != NULL; fi = fi->next_file)
318     {
319       if (last_end != fi->first_input_entry)
320         return FALSE;
321       last_end += fi->n_input_entries;
322     }
323   return TRUE;
324 }
325 #define CHECK_FILES_CONTIGUOUS(table) g_assert (are_files_contiguous (table))
326 
327 /* --- journal management --- */
328 static gboolean read_journal  (GskTable    *table,
329                                GError     **error);
330 static gboolean reset_journal (GskTable    *table,
331                                GError     **error);
332 
333 static inline void
kill_unstarted_merge_task(GskTable * table,MergeTask * to_kill)334 kill_unstarted_merge_task (GskTable *table,
335                            MergeTask *to_kill)
336 {
337   g_assert (to_kill->inputs[0]->next_task == to_kill);
338   g_assert (to_kill->inputs[1]->prev_task == to_kill);
339   GSK_RBTREE_REMOVE (GET_UNSTARTED_MERGE_TASK_TREE (table), to_kill);
340   to_kill->inputs[0]->next_task = NULL;
341   to_kill->inputs[1]->prev_task = NULL;
342   g_slice_free (MergeTask, to_kill);
343 }
344 
345 static void
create_unstarted_merge_task(GskTable * table,FileInfo * prev,FileInfo * next)346 create_unstarted_merge_task (GskTable *table,
347                              FileInfo *prev,
348                              FileInfo *next)
349 {
350   MergeTask *task = g_slice_new (MergeTask);
351   MergeTask *unused;
352   guint32 ratio_b16;
353   g_assert (TASK_IS_UNSTARTED (prev->prev_task));
354   g_assert (prev->next_task == NULL);
355   g_assert (next->prev_task == NULL);
356   g_assert (TASK_IS_UNSTARTED (next->next_task));
357   task->is_started = FALSE;
358   task->inputs[0] = prev;
359   task->inputs[1] = next;
360   if (prev->file->n_entries == 0 && next->file->n_entries == 0)
361     ratio_b16 = 1<<16;
362   else if (next->file->n_entries == 0)
363     ratio_b16 = 0xffffffff;
364   else
365     {
366       gdouble ratio_d_b16 = (double) prev->file->n_entries / next->file->n_entries
367                           * (double) (1<<16);
368       if (ratio_d_b16 >= 0xffffffff)
369         ratio_b16 = 0xffffffff;
370       else
371         ratio_b16 = (guint) ratio_d_b16;
372     }
373 
374   prev->next_task = next->prev_task = task;
375   task->info.unstarted.input_size_ratio_b16 = ratio_b16;
376   GSK_RBTREE_INSERT (GET_UNSTARTED_MERGE_TASK_TREE (table),
377                      task, unused);
378   g_assert (unused == NULL);
379 }
380 
381 static guint
uint64_hash(gconstpointer a)382 uint64_hash (gconstpointer a)
383 {
384   guint64 ai = * (guint64 *) a;
385   guint ai_high = ai>>32;
386   guint ai_low = ai;
387   return ai_low * 33 + ai_high;
388 }
389 static gboolean
uint64_equal(gconstpointer a,gconstpointer b)390 uint64_equal (gconstpointer a, gconstpointer b)
391 {
392   return (* (guint64 *) a) == (* (guint64 *) b);
393 }
394 
395 static gboolean
kill_unknown_files(GskTable * table,GError ** error)396 kill_unknown_files (GskTable *table,
397                     GError  **error)
398 {
399   GDir *dirlist;
400   GHashTable *known_ids;
401   GPtrArray *to_delete;
402   FileInfo *fi;
403   const char *name;
404   const char *at;
405 
406 
407   /* move aside unused files */
408   dirlist = g_dir_open (table->dir, 0, error);
409   if (dirlist == NULL)
410     {
411       g_warning ("g_dir_open failed on existing db dir?: %s", table->dir);
412       /* TODO: cleanup (or maybe just return table anyway?) */
413       return FALSE;
414     }
415   known_ids = g_hash_table_new (uint64_hash, uint64_equal);
416   for (fi = table->first_file; fi; fi = fi->next_file)
417     {
418 #if DEBUG_OLD_FILE_DELETION
419       g_message ("note complete file "ID_FMT"", fi->file->id);
420 #endif
421       g_hash_table_insert (known_ids, &fi->file->id, fi->file);
422       if (fi->next_task != NULL && fi->next_task->is_started)
423         {
424           GskTableFile *output = fi->next_task->info.started.output;
425 #if DEBUG_OLD_FILE_DELETION
426           g_message ("note merge output file "ID_FMT"", output->id);
427 #endif
428           g_hash_table_insert (known_ids, &output->id, output);
429         }
430     }
431   to_delete = g_ptr_array_new ();
432   while ((name=g_dir_read_name (dirlist)) != NULL)
433     {
434       guint64 id;
435       if (name[0] == '.'
436        && ((strcmp (name, ".") == 0 || strcmp (name, "..") == 0)))
437         continue;           /* ignore . and .. */
438 
439       if ('A' <= name[0] && name[0] <= 'Z')
440         continue;           /* ignore user files */
441 
442       if (strcmp (name, "journal") == 0
443        || strcmp (name, "journal.tmp") == 0)
444         continue;           /* ignore journal files */
445 
446       /* find file-id and extension, since it's possible we want to
447          delete this file. */
448       for (at = name; g_ascii_isxdigit (*at); at++)
449         ;
450       if (at == name || *at != '.')
451         {
452           g_warning ("unrecognized file '%s' in dir.. skipping", name);
453           continue;
454         }
455       id = g_ascii_strtoull (name, NULL, 16);
456       /* TODO: verify we know the extension? */
457       if (g_hash_table_lookup (known_ids, &id) == NULL)
458         {
459 #if DEBUG_OLD_FILE_DELETION
460           g_message ("unknown id for file %s [id="ID_FMT"]: deleting it", name, id);
461 #endif
462           g_ptr_array_add (to_delete, g_strdup_printf ("%s/%s", table->dir, name));
463         }
464     }
465   g_hash_table_destroy (known_ids);
466   g_ptr_array_foreach (to_delete, (GFunc) unlink, NULL);    /* eep! */
467   g_ptr_array_foreach (to_delete, (GFunc) g_free, NULL);
468   g_ptr_array_free (to_delete, TRUE);
469   g_dir_close (dirlist);
470   return TRUE;
471 }
472 
473 static gboolean
read_journal(GskTable * table,GError ** error)474 read_journal (GskTable  *table,
475               GError   **error)
476 {
477   int fd = open (table->journal_cur_fname, O_RDWR);
478   struct stat stat_buf;
479   guint i;
480   guint magic, n_files, n_merge_tasks;
481   guint32 tmp32_le;
482   guint64 tmp64_le;
483   guint journal_size;
484   guint8 *mmapped_journal;
485   const guint8 *at;
486   guint64 n_input_entries;
487   FileInfo **file_infos;
488   GskTableFileFactory *file_factory = table->file_factory;
489   guint file_index;
490   guint64 max_file_id = 0;
491   GskTableJournalMode old_journal_mode;
492   if (fd < 0)
493     {
494       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_OPEN,
495                    "error opening journal file %s: %s",
496                    table->journal_cur_fname,
497                    g_strerror (errno));
498       return FALSE;
499     }
500   if (fstat (fd, &stat_buf) < 0)
501     {
502       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_STAT,
503                    "error statting journal file %s: %s",
504                    table->journal_cur_fname,
505                    g_strerror (errno));
506       close (fd);
507       return FALSE;
508     }
509   journal_size = stat_buf.st_size;
510   mmapped_journal = mmap (NULL, journal_size, PROT_READ|PROT_WRITE,
511                           MAP_SHARED, fd, 0);
512   if (mmapped_journal == MAP_FAILED || mmapped_journal == NULL)
513     {
514       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
515                    "error mmapping journal file %s: %s",
516                    table->journal_cur_fname,
517                    g_strerror (errno));
518       close (fd);
519       return FALSE;
520     }
521 
522   /* parse the journal */
523   at = mmapped_journal;
524 
525   tmp32_le = * (guint32 *) at;
526   magic = GUINT32_FROM_LE (tmp32_le);
527   if (magic != JOURNAL_FILE_MAGIC)
528     {
529       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
530                    "invalid magic on journal file (0x%08x, not 0x%08x)",
531                    magic, JOURNAL_FILE_MAGIC);
532       goto error_cleanup;
533     }
534   at += 4;
535 
536   tmp32_le = * (guint32 *) at;
537   n_files = GUINT32_FROM_LE (tmp32_le);
538   at += 4;
539 
540   tmp32_le = * (guint32 *) at;
541   n_merge_tasks = GUINT32_FROM_LE (tmp32_le);
542   at += 4;
543 
544   tmp32_le = * (guint32 *) at;
545   at += 4;
546   if (tmp32_le != 0)
547     {
548       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
549                    "reserved word in journal nonzero");
550       goto error_cleanup;
551     }
552 
553   tmp64_le = * (guint64 *) at;
554   at += 8;
555   n_input_entries = GUINT64_FROM_LE (tmp64_le);
556 
557 #if DEBUG_JOURNAL_REPLAY
558   g_message ("read_journal: n_files=%u, n_merge_tasks=%u, n_input_entries=%"G_GUINT64_FORMAT,
559              n_files, n_merge_tasks, n_input_entries);
560 #endif
561 
562   file_infos = g_new0 (FileInfo *, n_files);
563 
564   /* parse files */
565   for (i = 0; i < n_files; i++)
566     {
567       guint64 file_id, first_input_entry, n_input_entries, n_entries;
568       FileInfo *file_info;
569 
570       memcpy (&tmp64_le, at, 8);
571       file_id = GUINT64_FROM_LE (tmp64_le);
572       at += 8;
573       max_file_id = MAX (max_file_id, file_id);
574 
575       memcpy (&tmp64_le, at, 8);
576       first_input_entry = GUINT64_FROM_LE (tmp64_le);
577       at += 8;
578 
579       memcpy (&tmp64_le, at, 8);
580       n_input_entries = GUINT64_FROM_LE (tmp64_le);
581       at += 8;
582 
583       memcpy (&tmp64_le, at, 8);
584       n_entries = GUINT64_FROM_LE (tmp64_le);
585       at += 8;
586       file_info = g_slice_new0 (FileInfo);
587       file_info->first_input_entry = first_input_entry;
588       file_info->n_input_entries = n_input_entries;
589       file_info->ref_count = 1;
590 
591       file_info->file
592         = gsk_table_file_factory_open_file (file_factory, table->dir,
593                                             file_id, error);
594       if (file_info->file == NULL)
595         goto error_cleanup;
596       file_info->file->n_entries = n_entries;
597       g_assert (file_info->file);
598       file_infos[i] = file_info;
599 
600       if (i > 0)
601         {
602           FileInfo *prev = file_infos[i-1];
603           guint64 prev_end = prev->first_input_entry + prev->n_input_entries;
604           if (prev_end != file_info->first_input_entry)
605             {
606               g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
607                            "inconsistency: files "ID_FMT" and "ID_FMT" are not continguous (prev_end=%"G_GUINT64_FORMAT"; cur_start=%"G_GUINT64_FORMAT")",
608                            prev->file->id, file_info->file->id, prev_end, file_info->first_input_entry);
609               goto error_cleanup;
610             }
611           file_info->prev_file = prev;
612           prev->next_file = file_info;
613         }
614       else
615         file_info->prev_file = NULL;
616     }
617   if (n_files > 0)
618     file_infos[n_files-1]->next_file = NULL;
619 
620   /* parse merge tasks */
621   file_index = 0;
622   for (i = 0; i < n_merge_tasks; i++)
623     {
624       /* format:
625            - for each input:
626              - uint64 for the file-id
627              - reader state
628            - output file-id
629            - output file build state
630        */
631       struct {
632         guint64 file_id;
633         guint reader_state_len;
634         const guint8 *reader_state;
635         FileInfo *file_info;
636       } inputs[2];
637       guint64 output_file_id;
638       guint build_state_len;
639       const guint8 *build_state;
640       MergeTask *merge_task;
641       guint j;
642 
643       for (j = 0; j < 2; j++)
644         {
645           memcpy (&tmp64_le, at, 8);
646           at += 8;
647           inputs[j].file_id = GUINT64_FROM_LE (tmp64_le);
648 
649           memcpy (&tmp32_le, at, 4);
650           at += 4;
651           inputs[j].reader_state_len = GUINT32_FROM_LE (tmp32_le);
652 
653           inputs[j].reader_state = at;
654           at += inputs[j].reader_state_len;
655         }
656 
657       memcpy (&tmp64_le, at, 8);
658       at += 8;
659       output_file_id = GUINT64_FROM_LE (tmp64_le);
660       max_file_id = MAX (max_file_id, output_file_id);
661 
662       memcpy (&tmp32_le, at, 4);
663       at += 4;
664       build_state_len = GUINT32_FROM_LE (tmp32_le);
665 
666       build_state = at;
667       at += build_state_len;
668 
669       /* link merge data into file infos */
670       while (file_index + 1 < n_files
671           && file_infos[file_index]->file->id != inputs[0].file_id)
672         file_index++;
673       if (file_index + 1 == n_files)
674         {
675           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
676                        "merge task's input[0] refers to input file "ID_FMT" which was not found, in the non-tail portion of the files list",
677                        inputs[0].file_id);
678           goto error_cleanup;
679         }
680       inputs[0].file_info = file_infos[file_index];
681       inputs[1].file_info = file_infos[file_index+1];
682       g_assert (inputs[0].file_info->file->id == inputs[0].file_id);
683       if (inputs[1].file_info->file->id != inputs[1].file_id)
684         {
685           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
686                        "second input to merge task nonconsecutive");
687           goto error_cleanup;
688         }
689       g_assert (inputs[0].file_info->next_task == NULL);
690       g_assert (inputs[1].file_info->prev_task == NULL);
691 
692       merge_task = g_slice_new (MergeTask);
693       merge_task->is_started = TRUE;
694       merge_task->inputs[0] = inputs[0].file_info;
695       merge_task->inputs[1] = inputs[1].file_info;
696 
697       for (j = 0; j < 2; j++)
698         {
699           /* restore reader */
700           GskTableReader *reader;
701           reader = gsk_table_file_recreate_reader (inputs[j].file_info->file,
702                                                    table->dir,
703                                                    inputs[j].reader_state_len,
704                                                    inputs[j].reader_state,
705                                                    error);
706           if (reader == NULL)
707             {
708               if (j == 1)
709                 gsk_table_reader_destroy (merge_task->info.started.inputs[0].reader);
710               g_slice_free (MergeTask, merge_task);
711               goto error_cleanup;
712             }
713           merge_task->info.started.inputs[j].reader = reader;
714         }
715 
716       merge_task->info.started.output
717         = gsk_table_file_factory_open_building_file (file_factory,
718                                                      table->dir, output_file_id,
719                                                      build_state_len,
720                                                      build_state, error);
721       if (merge_task->info.started.output == NULL)
722         {
723           gsk_g_error_add_prefix (error,
724                                 "instantiating merge-task between files "ID_FMT" and "ID_FMT,
725                                 inputs[0].file_id,
726                                 inputs[1].file_id);
727           gsk_table_reader_destroy (merge_task->info.started.inputs[0].reader);
728           gsk_table_reader_destroy (merge_task->info.started.inputs[1].reader);
729           g_slice_free (MergeTask, merge_task);
730           goto error_cleanup;
731         }
732       inputs[0].file_info->next_task = merge_task;
733       inputs[1].file_info->prev_task = merge_task;
734 
735       /* neither of these inputs can be involved in another merge-task,
736          so advance the pointer to prevent reuse. */
737       file_index++;
738 
739       GSK_STACK_PUSH (GET_RUN_STACK (table), merge_task);
740       table->n_running_tasks++;
741     }
742 #define COMPARE_RUNNING_TASKS_BY_N_INPUTS(a,b, rv)              \
743     {                                                           \
744       guint64 total_a_inputs = a->inputs[0]->file->n_entries    \
745                              + a->inputs[1]->file->n_entries;   \
746       guint64 total_b_inputs = b->inputs[0]->file->n_entries    \
747                              + b->inputs[1]->file->n_entries;   \
748       if (total_a_inputs < total_b_inputs)                      \
749         rv = 1;                                                 \
750       else if (total_a_inputs > total_b_inputs)                 \
751         rv = -1;                                                \
752       else                                                      \
753         rv = 0;                                                 \
754     }
755   GSK_STACK_SORT (GET_RUN_STACK (table), COMPARE_RUNNING_TASKS_BY_N_INPUTS);
756 #undef COMPARE_RUNNING_TASKS_BY_N_INPUTS
757 #if DEBUG_JOURNAL_REPLAY
758   g_message ("read_journal: header length %u", (guint)(at-mmapped_journal));
759 #endif
760 
761   /* --- do consistency checks --- */
762 
763   /* TODO: other checks not already done during parsing??? */
764 
765 
766   /* setup various bits of the table;
767      after doing this, do not goto error_cleanup,
768      since it will free some of this data */
769   table->journal_mmap = mmapped_journal;
770   table->journal_size = journal_size;
771   table->n_files = n_files;
772   if (n_files > 0)
773     {
774       table->first_file = file_infos[0];
775       table->last_file = file_infos[n_files-1];
776     }
777   else
778     {
779       /* since the GskTable object is zeroed, the list is already empty */
780     }
781   table->n_old_files = n_files;
782   table->old_files = file_infos;
783   for (i = 0; i < n_files; i++)
784     {
785       table->old_files[i] = file_info_ref (file_infos[i]);
786     }
787 
788   /* create unstarted merge tasks */
789   for (i = 0; i + 1 < n_files; i++)
790     if (file_infos[i]->next_task == NULL
791      && TASK_IS_UNSTARTED (file_infos[i]->prev_task)
792      && TASK_IS_UNSTARTED (file_infos[i+1]->next_task))
793       {
794         g_assert (file_infos[i+1]->prev_task == NULL);
795         create_unstarted_merge_task (table, file_infos[i], file_infos[i+1]);
796       }
797 
798   /* delete old extraneous garbage */
799   if (!kill_unknown_files (table, error))
800     return FALSE;
801 
802   table->n_input_entries = n_input_entries;
803   table->last_file_id = max_file_id;
804 
805   /* --- process existing journal entries --- */
806   /* disable journalling and replay the journal */
807   old_journal_mode = table->journal_mode;
808   table->journal_mode = GSK_TABLE_JOURNAL_NONE;
809   for (;;)
810     {
811       guint align_offset = ((gsize)at) & 3;
812       guint32 key_len, value_len;
813       if (align_offset)
814         at += 4 - align_offset;
815       tmp32_le = ((guint32 *) at)[0];
816       key_len = GUINT32_FROM_LE (tmp32_le);
817       if (key_len == 0)
818         break;
819       key_len--;            /* journal key lengths are offset by 1 */
820       tmp32_le = ((guint32 *) at)[1];
821       value_len = GUINT32_FROM_LE (tmp32_le);
822 #if DEBUG_JOURNAL_REPLAY
823       g_message ("replay journal: offset=%u, key,value_len=%u,%u",
824                  (guint)(at - table->journal_mmap),
825                  key_len, value_len);
826 #endif
827       at += 8;
828       if (!gsk_table_add (table, key_len, at, value_len, at + key_len, error))
829         {
830           gsk_g_error_add_prefix (error, "error replaying journal");
831           table->journal_mode = old_journal_mode;
832           /* do not use error_cleanup, let gsk_table_destroy() do the work */
833           return FALSE;
834         }
835       at += key_len + value_len;
836     }
837   table->journal_mode = old_journal_mode;
838   table->journal_len = at - mmapped_journal;
839 
840   return TRUE;
841 
842 error_cleanup:
843   if (file_infos)
844     {
845       for (i = 0; i < n_files && file_infos[i] != NULL; i++)
846         file_info_unref (file_infos[i], table->dir, FALSE);
847       g_free (file_infos);
848     }
849   munmap (mmapped_journal, journal_size);
850   return FALSE;
851 }
852 
853 static gboolean
resize_journal(gint journal_fd,guint8 ** journal_mmap_inout,guint * journal_size_inout,guint new_min_size,GError ** error)854 resize_journal (gint       journal_fd,
855                 guint8   **journal_mmap_inout,
856                 guint     *journal_size_inout,
857                 guint      new_min_size,
858                 GError   **error)
859 {
860   guint new_size;
861   guint8 *tmp;
862   if (new_min_size <= *journal_size_inout)
863     return TRUE;
864   new_size = *journal_size_inout;
865   while (new_size < new_min_size)
866     new_size += new_size;
867 
868   /* un-mmap file */
869   munmap (*journal_mmap_inout, *journal_size_inout);
870 
871   /* resize */
872   if (ftruncate (journal_fd, new_size) < 0)
873     {
874       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
875                    "error resizing journal: %s",
876                    g_strerror (errno));
877       return FALSE;
878     }
879 
880   /* mmap file */
881   tmp = mmap (NULL, new_size, PROT_READ|PROT_WRITE, MAP_SHARED, journal_fd, 0);
882   if (tmp == NULL || tmp == MAP_FAILED)
883     {
884       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
885                    "error mmapping resized journal");
886       return FALSE;
887     }
888   *journal_size_inout = new_size;
889   *journal_mmap_inout = tmp;
890   return TRUE;
891 }
892 
893 static gboolean
reset_journal(GskTable * table,GError ** error)894 reset_journal (GskTable   *table,
895                GError    **error)
896 {
897   guint i;
898   FileInfo *fi;
899   int journal_fd;
900   guint8 *journal_mmap;
901   guint at;
902   guint n_merge_tasks_written;
903 
904   g_assert (table->in_memory_tree == NULL);
905 
906   if (table->journal_mmap)
907     munmap (table->journal_mmap, table->journal_size);
908   if (table->journal_fd >= 0)
909     close (table->journal_fd);
910 
911   /* write temporary new journal which
912      is the state dumped out,
913      and no journalled adds. */
914   {
915     journal_fd = open (table->journal_tmp_fname, O_CREAT|O_RDWR|O_TRUNC, 0644);
916     if (journal_fd < 0)
917       {
918         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_OPEN,
919                      "error creating new journal file %s: %s",
920                      table->journal_tmp_fname, g_strerror (errno));
921         return FALSE;
922       }
923     if (ftruncate (journal_fd, table->journal_size) < 0)
924       {
925         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
926                      "error sizing journal file: %s",
927                      g_strerror (errno));
928         goto failed_writing_journal;
929       }
930     journal_mmap = mmap (NULL, table->journal_size, PROT_READ|PROT_WRITE,
931                          MAP_SHARED, journal_fd, 0);
932     if (journal_mmap == NULL || journal_mmap == MAP_FAILED)
933       {
934         g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
935                      "mmap failed on tmp journal: %s",
936                      g_strerror (errno));
937         close (journal_fd);
938         unlink (table->journal_tmp_fname);
939         return FALSE;
940       }
941   }
942 
943   {
944     guint32 header[6];
945     guint32 tmp;
946     header[0] = GUINT32_TO_LE (JOURNAL_FILE_MAGIC);
947     header[1] = GUINT32_TO_LE (table->n_files);
948     header[2] = GUINT32_TO_LE (table->n_running_tasks);
949     header[3] = 0;              /* reserved */
950     tmp = table->n_input_entries;
951     header[4] = GUINT32_TO_LE (tmp);
952     tmp = table->n_input_entries>>32;
953     header[5] = GUINT32_TO_LE (tmp);
954     memcpy (journal_mmap, header, 24);
955   }
956   at = 24;
957   for (fi = table->first_file; fi; fi = fi->next_file)
958     {
959       guint64 file_header[4];
960       file_header[0] = GUINT64_TO_LE (fi->file->id);
961       file_header[1] = GUINT64_TO_LE (fi->first_input_entry);
962       file_header[2] = GUINT64_TO_LE (fi->n_input_entries);
963       file_header[3] = GUINT64_TO_LE (fi->file->n_entries);
964       if (at + sizeof (file_header) > table->journal_size)
965         {
966           if (!resize_journal (journal_fd,
967                                &journal_mmap, &table->journal_size,
968                                at + sizeof (file_header),
969                                error))
970             return FALSE;
971         }
972       memcpy (journal_mmap + at, file_header, sizeof (file_header));
973       at += sizeof (file_header);
974     }
975   n_merge_tasks_written = 0;
976   for (fi = table->first_file; fi; fi = fi->next_file)
977     if (fi->next_task != NULL && fi->next_task->is_started)
978       {
979         MergeTask *task = fi->next_task;
980         guint reader_state_lens[2];
981         guint8 *reader_states[2];
982         guint build_state_len;
983         guint8 *build_state;
984         guint input;
985         for (input = 0; input < 2; input++)
986           {
987             if (!gsk_table_file_get_reader_state (task->inputs[input]->file,
988                                                   task->info.started.inputs[input].reader,
989                                                   &reader_state_lens[input],
990                                                   &reader_states[input],
991                                                   error))
992               {
993                 gsk_g_error_add_prefix (error, "reset_journal: input %u", input);
994                 goto failed_writing_journal;
995               }
996           }
997         if (!gsk_table_file_get_build_state (task->info.started.output,
998                                              &build_state_len,
999                                              &build_state,
1000                                              error))
1001           {
1002             gsk_g_error_add_prefix (error, "reset_journal: build state");
1003             goto failed_writing_journal;
1004           }
1005 
1006         guint total_len;
1007         total_len = (8 + 4 + reader_state_lens[0])
1008                   + (8 + 4 + reader_state_lens[1])
1009                   + (8 + 4 + build_state_len);
1010         if (at + total_len > table->journal_size)
1011           {
1012             if (!resize_journal (journal_fd,
1013                                  &journal_mmap, &table->journal_size,
1014                                  at + total_len,
1015                                  error))
1016               return FALSE;
1017           }
1018         for (input = 0; input < 2; input++)
1019           {
1020             guint64 id = task->inputs[input]->file->id;
1021             guint32 len = reader_state_lens[input];
1022             guint64 id_le = GUINT64_TO_LE (id);
1023             guint32 len_le = GUINT32_TO_LE (len);
1024             memcpy (journal_mmap + at, &id_le, 8);
1025             at += 8;
1026             memcpy (journal_mmap + at, &len_le, 4);
1027             at += 4;
1028             memcpy (journal_mmap + at, reader_states[input], len);
1029             at += len;
1030             g_free (reader_states[input]);
1031           }
1032         {
1033           guint64 id = task->info.started.output->id;
1034           guint32 len = build_state_len;
1035           guint64 id_le = GUINT64_TO_LE (id);
1036           guint32 len_le = GUINT32_TO_LE (len);
1037           memcpy (journal_mmap + at, &id_le, 8);
1038           at += 8;
1039           memcpy (journal_mmap + at, &len_le, 4);
1040           at += 4;
1041           memcpy (journal_mmap + at, build_state, len);
1042           at += len;
1043           g_free (build_state);
1044         }
1045 
1046         n_merge_tasks_written++;
1047       }
1048   g_assert (n_merge_tasks_written == table->n_running_tasks);
1049 
1050   /* move the journal into place */
1051   if (rename (table->journal_tmp_fname, table->journal_cur_fname) < 0)
1052     {
1053       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_RENAME,
1054                    "error moving journal into place: %s",
1055                    g_strerror (errno));
1056       goto failed_writing_journal;
1057     }
1058 
1059 #if DEBUG_JOURNAL_WRITING
1060   g_message ("reset-journal: header length %u", at);
1061 #endif
1062 
1063 
1064   /* align journal pointer */
1065   if (at % 4 != 0)
1066     at += 4 - (at % 4);
1067 
1068   table->journal_len = at;
1069   table->journal_mmap = journal_mmap;
1070 
1071   /* preserve old files */
1072   FileInfo **new_old_files;
1073   new_old_files = g_new (FileInfo *, table->n_files);
1074   i = 0;
1075   for (fi = table->first_file; fi; fi = fi->next_file)
1076     new_old_files[i++] = file_info_ref (fi);
1077   g_assert (i == table->n_files);
1078 
1079   /* blow away old files */
1080   for (i = 0; i < table->n_old_files; i++)
1081     file_info_unref (table->old_files[i], table->dir, TRUE);
1082   g_free (table->old_files);
1083 
1084   table->n_old_files = table->n_files;
1085   table->old_files = new_old_files;
1086 
1087   return TRUE;
1088 
1089 failed_writing_journal:
1090   close (journal_fd);
1091   unlink (table->journal_tmp_fname);
1092   return FALSE;
1093 }
1094 
1095 
1096 /* --- in-memory tree lookup (implementations) --- */
compare_memory(guint a_len,const guint8 * a_data,guint b_len,const guint8 * b_data)1097 static inline gint compare_memory (guint a_len, const guint8 *a_data,
1098                                    guint b_len, const guint8 *b_data)
1099 {
1100   int rv;
1101   if (a_len < b_len)
1102     {
1103       rv = memcmp (a_data, b_data, a_len);
1104       if (rv == 0)
1105         rv = -1;
1106     }
1107   else if (a_len > b_len)
1108     {
1109       rv = memcmp (a_data, b_data, b_len);
1110       if (rv == 0)
1111         rv = 1;
1112     }
1113   else
1114     rv = memcmp (a_data, b_data, a_len);
1115   return rv;
1116 }
1117 
1118 static TreeNode *
in_memory_tree_lookup_memcmp(GskTable * table,guint key_len,const guint8 * key_data)1119 in_memory_tree_lookup_memcmp (GskTable     *table,
1120                               guint         key_len,
1121                               const guint8 *key_data)
1122 {
1123   TreeNode *found = NULL;
1124 #define TMP_KEY_COMPARATOR(unused, at, rv) \
1125   rv = compare_memory (key_len, key_data, at->key.len, at->key.data)
1126   GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
1127                                 unused, TMP_KEY_COMPARATOR, found);
1128 #undef TMP_KEY_COMPARATOR
1129   return found;
1130 }
1131 static TreeNode *
in_memory_tree_lookup_with_len(GskTable * table,guint key_len,const guint8 * key_data)1132 in_memory_tree_lookup_with_len (GskTable     *table,
1133                                 guint         key_len,
1134                                 const guint8 *key_data)
1135 {
1136   TreeNode *found = NULL;
1137 #define TMP_KEY_COMPARATOR(unused, at, rv) \
1138   rv = table->compare.with_len (key_len, key_data, \
1139                                 at->key.len, at->key.data, \
1140                                 table->user_data)
1141   GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
1142                                 unused, TMP_KEY_COMPARATOR, found);
1143 #undef TMP_KEY_COMPARATOR
1144   return found;
1145 }
1146 
1147 static TreeNode *
in_memory_tree_lookup_no_len(GskTable * table,guint key_len,const guint8 * key_data)1148 in_memory_tree_lookup_no_len (GskTable     *table,
1149                               guint         key_len,
1150                               const guint8 *key_data)
1151 {
1152   TreeNode *found = NULL;
1153 #define TMP_KEY_COMPARATOR(unused, at, rv) \
1154   rv = table->compare.no_len (key_data, at->key.data, table->user_data)
1155   GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
1156                                 unused, TMP_KEY_COMPARATOR, found);
1157 #undef TMP_KEY_COMPARATOR
1158   return found;
1159 }
1160 
/* Tree-node ordering used when a merge function exists: nodes with
   equal keys compare equal (so they can be merged).  Each variant
   orders by key using bytewise comparison, the user's length-aware
   comparator, or the user's comparator without lengths. */
static int
tree_node_compare_memcmp (GskTable *table,
                          TreeNode *a,
                          TreeNode *b)
{
  const guint8 *a_key = a->key.data;
  const guint8 *b_key = b->key.data;
  return compare_memory (a->key.len, a_key, b->key.len, b_key);
}
static int
tree_node_compare_with_len (GskTable *table,
                            TreeNode *a,
                            TreeNode *b)
{
  gpointer user_data = table->user_data;
  return table->compare.with_len (a->key.len, a->key.data,
                                  b->key.len, b->key.data,
                                  user_data);
}
static int
tree_node_compare_no_len (GskTable *table,
                          TreeNode *a,
                          TreeNode *b)
{
  gpointer user_data = table->user_data;
  return table->compare.no_len (a->key.data, b->key.data, user_data);
}
/* NOTE on nomerge variants.  These never return 0 unless the two
   TreeNodes are the same object.  Nodes with equal keys are ordered
   by address; because nodes are handed out from the pool in forward
   order, address order equals insertion order, so this is both the
   appropriate and the cheapest tie-breaker. */

/* Tie-breaker shared by the nomerge comparators: order by address. */
static inline int
tree_node_address_order (const TreeNode *a,
                         const TreeNode *b)
{
  if (a == b)
    return 0;
  return (a < b) ? -1 : 1;
}
static int tree_node_compare_memcmp_nomerge (GskTable *table,
                                             TreeNode *a,
                                             TreeNode *b)
{
  int key_cmp = compare_memory (a->key.len, a->key.data,
                                b->key.len, b->key.data);
  return key_cmp != 0 ? key_cmp : tree_node_address_order (a, b);
}
static int tree_node_compare_with_len_nomerge (GskTable *table,
                                               TreeNode *a,
                                               TreeNode *b)
{
  int key_cmp = table->compare.with_len (a->key.len, a->key.data,
                                         b->key.len, b->key.data,
                                         table->user_data);
  return key_cmp != 0 ? key_cmp : tree_node_address_order (a, b);
}
static int tree_node_compare_no_len_nomerge (GskTable *table,
                                             TreeNode *a,
                                             TreeNode *b)
{
  int key_cmp = table->compare.no_len (a->key.data, b->key.data,
                                       table->user_data);
  return key_cmp != 0 ? key_cmp : tree_node_address_order (a, b);
}
1216 
1217 static int
file_query_compare_memcmp(guint test_key_len,const guint8 * test_key,gpointer compare_data)1218 file_query_compare_memcmp (guint         test_key_len,
1219                            const guint8 *test_key,
1220                            gpointer      compare_data)
1221 {
1222   GskTable *table = compare_data;
1223   guint a_len = table->file_query_key_len;
1224   const guint8 *a = table->file_query_key_data;
1225   guint b_len = test_key_len;
1226   const guint8 *b = test_key;
1227 #if 0
1228   {
1229     char *a_hex = gsk_escape_memory_hex (a, a_len);
1230     char *b_hex = gsk_escape_memory_hex (b, b_len);
1231     g_message ("compare_memory: '%s' v '%s'", a_hex, b_hex);
1232     g_free (a_hex);
1233     g_free (b_hex);
1234   }
1235 #endif
1236   return compare_memory (a_len, a, b_len, b);
1237 }
1238 static int
file_query_compare_no_len(guint test_key_len,const guint8 * test_key,gpointer compare_data)1239 file_query_compare_no_len (guint         test_key_len,
1240                            const guint8 *test_key,
1241                            gpointer      compare_data)
1242 {
1243   GskTable *table = compare_data;
1244   const guint8 *a = table->file_query_key_data;
1245   const guint8 *b = test_key;
1246   return table->compare.no_len (a, b, table->user_data);
1247 }
1248 static int
file_query_compare_with_len(guint test_key_len,const guint8 * test_key,gpointer compare_data)1249 file_query_compare_with_len (guint         test_key_len,
1250                              const guint8 *test_key,
1251                              gpointer      compare_data)
1252 {
1253   GskTable *table = compare_data;
1254   guint a_len = table->file_query_key_len;
1255   const guint8 *a = table->file_query_key_data;
1256   guint b_len = test_key_len;
1257   const guint8 *b = test_key;
1258   return table->compare.with_len (a_len, a, b_len, b, table->user_data);
1259 }
1260 
1261 
1262 /**
1263  * gsk_table_new:
1264  * @dir: the directory where the table will store its data.
1265  * @options: configuration and optimization hints.
1266  * @new_flags: whether to create or open an existing table.
1267  * @error: place to put the error if something goes wrong.
1268  *
1269  * Create a new GskTable object.
1270  * Only one table may use a directory at a time.
1271  *
1272  * @options gives both compare, merge and delete
1273  * semantics, and hints on the sizes of the data.
1274  *
1275  * @new_flags determines whether we are allowed
1276  * to create a new table or open an existing table.
1277  * If @new_flags is 0, we will permit either,
1278  * equivalent to GSK_TABLE_MAY_CREATE|GSK_TABLE_MAY_EXIST.
1279  *
1280  * returns: the newly created table object, or NULL on error.
1281  */
1282 GskTable *
gsk_table_new(const char * dir,const GskTableOptions * options,GskTableNewFlags new_flags,GError ** error)1283 gsk_table_new         (const char            *dir,
1284                        const GskTableOptions *options,
1285                        GskTableNewFlags       new_flags,
1286                        GError               **error)
1287 {
1288   gboolean did_mkdir;
1289   GskTable *table;
1290   RunTaskFuncs *run_funcs;
1291   gboolean has_len;
1292   int lock_fd;
1293   GskTableFileFactory *factory;
1294 
1295   run_funcs = table_options_get_run_funcs (options, &has_len, error);
1296   if (run_funcs == NULL)
1297     return NULL;
1298   factory = table_options_get_file_factory (options, error);
1299   if (factory == NULL)
1300     return NULL;
1301 
1302   if (g_file_test (dir, G_FILE_TEST_IS_DIR))
1303     {
1304       if ((new_flags & GSK_TABLE_MAY_EXIST) == 0)
1305         {
1306           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_EXISTS,
1307                        "table dir %s already exists", dir);
1308           return NULL;
1309         }
1310       did_mkdir = FALSE;
1311     }
1312   else
1313     {
1314       if ((new_flags & GSK_TABLE_MAY_CREATE) == 0)
1315         {
1316           g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_EXISTS,
1317                        "table dir %s already exists", dir);
1318           return NULL;
1319         }
1320       did_mkdir = TRUE;
1321       if (!gsk_mkdir_p (dir, 0755, error))
1322         return FALSE;
1323     }
1324 
1325   lock_fd = gsk_lock_dir (dir, FALSE, error);
1326   if (lock_fd < 0)
1327     return FALSE;
1328 
1329   table = g_new0 (GskTable, 1);
1330   table->dir = g_strdup (dir);
1331   table->lock_fd = lock_fd;
1332   table->run_funcs = run_funcs;
1333   table->in_memory_tree_lookup
1334     = table->compare.no_len == NULL ? in_memory_tree_lookup_memcmp
1335              : has_len ? in_memory_tree_lookup_with_len
1336              : in_memory_tree_lookup_no_len;
1337   if (options->merge == NULL && options->merge_no_len == NULL)
1338     table->tree_node_compare
1339       = table->compare.no_len == NULL ? tree_node_compare_memcmp_nomerge
1340                : has_len ? tree_node_compare_with_len_nomerge
1341                : tree_node_compare_no_len_nomerge;
1342   else
1343     table->tree_node_compare
1344       = table->compare.no_len == NULL ? tree_node_compare_memcmp
1345                : has_len ? tree_node_compare_with_len
1346                : tree_node_compare_no_len;
1347   table->journal_mode = options->journal_mode;
1348   table->max_running_tasks = 4;
1349   table->max_merge_ratio_b16 = 3<<16;
1350   table->max_in_memory_bytes = options->max_in_memory_bytes;
1351   table->max_in_memory_entries = options->max_in_memory_entries;
1352   table->journal_flush_period = 3;
1353   table->tree_node_pool = g_new0 (TreeNode, table->max_in_memory_entries);
1354   table->journal_cur_fname = g_strdup_printf ("%s/journal", dir);
1355   table->journal_tmp_fname = g_strdup_printf ("%s/journal.tmp", dir);
1356   table->key_fixed_length = -1;
1357   table->value_fixed_length = -1;
1358   table->file_factory = factory;
1359   // INIT? query_inout ???
1360   table->file_query.compare
1361       = table->compare.no_len == NULL ? file_query_compare_memcmp
1362                : has_len ? file_query_compare_with_len
1363                : file_query_compare_no_len;
1364   table->file_query.compare_data = table;
1365 
1366   table->has_len = has_len;
1367   if (has_len)
1368     {
1369       table->compare.with_len = options->compare;
1370       table->merge.with_len = options->merge;
1371       table->simplify.with_len = options->simplify;
1372     }
1373   else
1374     {
1375       table->compare.no_len = options->compare_no_len;
1376       table->merge.no_len = options->merge_no_len;
1377       table->simplify.no_len = options->simplify_no_len;
1378     }
1379   table->is_stable_func = options->is_stable;
1380 
1381   if (did_mkdir)
1382     {
1383       /* make an empty journal file */
1384       guint journal_size = 1024;
1385       guint min_journal_size = options->max_in_memory_bytes + 8 * options->max_in_memory_bytes;
1386       while (journal_size < min_journal_size)
1387         journal_size += journal_size;
1388       table->journal_size = journal_size;
1389       table->journal_fd = -1;
1390       if (!reset_journal (table, error))
1391         {
1392           gsk_table_destroy (table);
1393           gsk_rm_rf (table->dir, NULL);
1394           return NULL;
1395         }
1396     }
1397   else
1398     {
1399       /* load existing table */
1400       if (!read_journal (table, error))
1401         {
1402           GError *e = NULL;
1403           g_free (table->dir);
1404           if (!gsk_unlock_dir (table->lock_fd, &e))
1405             {
1406               g_warning ("error unlocking dir on failure reading journal: %s",
1407                          e->message);
1408               g_clear_error (&e);
1409             }
1410           g_free (table);
1411           return NULL;
1412         }
1413     }
1414 
1415   return table;
1416 }
1417 
1418 
1419 /* --- starting a merge-task */
1420 static gboolean
start_merge_task(GskTable * table,MergeTask * merge_task,GError ** error)1421 start_merge_task (GskTable   *table,
1422                   MergeTask  *merge_task,
1423                   GError    **error)
1424 {
1425   FileInfo *prev = merge_task->inputs[0];
1426   FileInfo *next = merge_task->inputs[1];
1427   GskTableFileHints file_hints = GSK_TABLE_FILE_HINTS_DEFAULTS;
1428   guint64 output_file_id;
1429   GskTableFile *output;
1430   guint input;
1431   GskTableReader *readers[2];
1432   guint64 n_input_entries = prev->file->n_entries + next->file->n_entries;
1433 #if DEBUG_MERGE_TASKS
1434   g_message ("starting mergetask between "ID_FMT" and "ID_FMT" [%"G_GUINT64_FORMAT" input entries]",
1435              prev->file->id, next->file->id, n_input_entries);
1436 #endif
1437   g_assert (!merge_task->is_started);
1438   g_assert (prev->prev_task == NULL || !prev->prev_task->is_started);
1439   g_assert (prev->next_task == merge_task);
1440   g_assert (next->prev_task == merge_task);
1441   g_assert (next->next_task == NULL || !next->next_task->is_started);
1442   if (prev->prev_task)
1443     {
1444       MergeTask *to_kill = prev->prev_task;
1445       g_assert (!to_kill->is_started);
1446       g_assert (to_kill->inputs[1] == prev);
1447       kill_unstarted_merge_task (table, to_kill);
1448     }
1449   if (next->next_task)
1450     {
1451       MergeTask *to_kill = next->next_task;
1452       g_assert (!to_kill->is_started);
1453       g_assert (to_kill->inputs[0] == next);
1454       kill_unstarted_merge_task (table, to_kill);
1455     }
1456 
1457   GSK_RBTREE_REMOVE (GET_UNSTARTED_MERGE_TASK_TREE (table), merge_task);
1458 
1459   for (input = 0; input < 2; input++)
1460     {
1461       readers[input] = gsk_table_file_create_reader (merge_task->inputs[input]->file,
1462                                                      table->dir,
1463                                                      error);
1464       if (readers[input] == NULL)
1465         {
1466           gsk_g_error_add_prefix (error, "creating merge job: error making reader %u", input);
1467           if (input == 1)
1468             gsk_table_reader_destroy (readers[0]);
1469           return FALSE;
1470         }
1471     }
1472 
1473   output_file_id = ++(table->last_file_id);
1474   output = gsk_table_file_factory_create_file (table->file_factory,
1475                                                table->dir,
1476                                                output_file_id,
1477                                                &file_hints,
1478                                                error);
1479   if (output == NULL)
1480     {
1481       gsk_g_error_add_prefix (error, "creating merge-task output");
1482       gsk_table_reader_destroy (readers[0]);
1483       gsk_table_reader_destroy (readers[1]);
1484       return FALSE;
1485     }
1486 
1487   merge_task->is_started = TRUE;
1488   merge_task->info.started.output = output;
1489   merge_task->info.started.inputs[0].reader = readers[0];
1490   merge_task->info.started.inputs[1].reader = readers[1];
1491   merge_task->info.started.has_last_queryable_key = FALSE;
1492   gsk_table_buffer_init (&merge_task->info.started.last_queryable_key);
1493 
1494   /* insert into run-list */
1495   MergeTask **p_next = &table->run_list;
1496   for (;;)
1497     {
1498       MergeTask *next = *p_next;
1499       guint64 next_n_input_entries;
1500       if (next == NULL)
1501         break;
1502       next_n_input_entries = next->inputs[0]->file->n_entries
1503                            + next->inputs[1]->file->n_entries;
1504       if (next_n_input_entries > n_input_entries)
1505         break;
1506 
1507       p_next = &next->info.started.next_run;
1508     }
1509   merge_task->info.started.next_run = *p_next;
1510   *p_next = merge_task;
1511   table->n_running_tasks++;
1512   return TRUE;
1513 }
1514 
1515 static gboolean
maybe_start_tasks(GskTable * table,GError ** error)1516 maybe_start_tasks (GskTable *table,
1517                    GError  **error)
1518 {
1519   while (table->n_running_tasks < table->max_running_tasks
1520       && table->unstarted_merges != NULL)
1521     {
1522       MergeTask *bottom_heaviest;
1523       GSK_RBTREE_FIRST (GET_UNSTARTED_MERGE_TASK_TREE (table), bottom_heaviest);
1524       if (bottom_heaviest->info.unstarted.input_size_ratio_b16
1525           > table->max_merge_ratio_b16)
1526         break;
1527       if (!start_merge_task (table, bottom_heaviest, error))
1528         return FALSE;
1529     }
1530   return TRUE;
1531 }
1532 
1533 static inline gboolean
run_merge_task(GskTable * table,guint count,gboolean flush,GError ** error)1534 run_merge_task (GskTable   *table,
1535                 guint       count,
1536                 gboolean    flush,
1537                 GError    **error)
1538 {
1539   MergeTask *merge_task = table->run_list;
1540   RunTaskFuncs *run_funcs = table->run_funcs;
1541   RunTaskFunc func;
1542   gboolean use_simplify;
1543   g_assert (merge_task != NULL);
1544 
1545 #if DEBUG_MERGE_TASKS
1546   g_message ("run_merge_task between "ID_FMT" and "ID_FMT"; count=%u, flush=%d",
1547              merge_task->inputs[0]->file->id,
1548              merge_task->inputs[1]->file->id,
1549              count, flush);
1550 #endif
1551 
1552   use_simplify = merge_task->inputs[0]->first_input_entry == 0
1553               && table->simplify.no_len != NULL;
1554   func = use_simplify ? (flush ? run_funcs->simplify_flush
1555                                : run_funcs->simplify_noflush)
1556                       : (flush ? run_funcs->nosimplify_flush
1557                                : run_funcs->nosimplify_noflush);
1558   return (*func) (table, count, error);
1559 }
1560 
1561 static gboolean
dump_tree_recursively(TreeNode * node,GskTableFile * file,GError ** error)1562 dump_tree_recursively (TreeNode    *node,
1563                        GskTableFile *file,
1564                        GError     **error)
1565 {
1566   if (node->left != NULL
1567    && !dump_tree_recursively (node->left, file, error))
1568     return FALSE;
1569   if (gsk_table_file_feed_entry (file, node->key.len, node->key.data,
1570                                  node->value.len, node->value.data,
1571                                  error) == GSK_TABLE_FEED_ENTRY_ERROR)
1572     return FALSE;
1573   if (node->right != NULL
1574    && !dump_tree_recursively (node->right, file, error))
1575     return FALSE;
1576   return TRUE;
1577 }
1578 
1579 static gboolean
flush_tree(GskTable * table,GError ** error)1580 flush_tree (GskTable   *table,
1581             GError    **error)
1582 {
1583   guint64 id = ++(table->last_file_id);
1584   GskTableFileHints file_hints = GSK_TABLE_FILE_HINTS_DEFAULTS;
1585   GskTableFile *file = gsk_table_file_factory_create_file (table->file_factory,
1586                                                            table->dir,
1587                                                            id,
1588                                                            &file_hints,
1589                                                            error);
1590   FileInfo *fi;
1591   gboolean done;
1592   if (file == NULL)
1593     {
1594       gsk_g_error_add_prefix (error, "flushing in-memory tree");
1595       return FALSE;
1596     }
1597   if (!dump_tree_recursively (table->in_memory_tree, file, error))
1598     {
1599       gsk_g_error_add_prefix (error, "dumping in-memory tree");
1600       gsk_table_file_destroy (file, table->dir, TRUE, NULL);
1601       return FALSE;
1602     }
1603   if (!gsk_table_file_done_feeding (file, &done, error))
1604     {
1605       gsk_g_error_add_prefix (error, "finishing flushing in-memory tree");
1606       gsk_table_file_destroy (file, table->dir, TRUE, NULL);
1607       return FALSE;
1608     }
1609   if (done == FALSE)
1610     {
1611       g_error ("TODO: handle files that require a bit of baking at end");
1612     }
1613 
1614   fi = g_slice_new0 (FileInfo);
1615   fi->ref_count = 1;
1616   fi->first_input_entry = table->n_input_entries - table->in_memory_entry_count;
1617   fi->n_input_entries = table->in_memory_entry_count;
1618   fi->file = file;
1619   table->n_files++;
1620   GSK_LIST_APPEND (GET_FILE_INFO_LIST (table), fi);
1621   if (fi->prev_file && TASK_IS_UNSTARTED (fi->prev_file->prev_task))
1622     create_unstarted_merge_task (table, fi->prev_file, fi);
1623   CHECK_FILES_CONTIGUOUS (table);
1624 
1625   /* reset tree */
1626   table->in_memory_entry_count = 0;
1627   table->in_memory_bytes = 0;
1628   table->in_memory_tree = NULL;
1629   table->tree_node_pool_used = 0;
1630   return TRUE;
1631 }
1632 
1633 /**
1634  * gsk_table_add:
1635  * @table: the table to add data to.
 * @key_len: length of @key_data, in bytes.
 * @key_data: the key to add.
 * @value_len: length of @value_data, in bytes.
 * @value_data: the value to associate with the key.
1640  * @error: place to put the error if something goes wrong.
1641  *
1642  * Add a new key/value pair to a GskTable.
1643  * If the key already exists, the semantics are dependent
1644  * on the merge function; if no merge function is given,
1645  * then both rows will exist in the table.
1646  *
1647  * returns: whether the addition was successful.
1648  */
1649 gboolean
gsk_table_add(GskTable * table,guint key_len,const guint8 * key_data,guint value_len,const guint8 * value_data,GError ** error)1650 gsk_table_add         (GskTable              *table,
1651                        guint                  key_len,
1652                        const guint8          *key_data,
1653                        guint                  value_len,
1654                        const guint8          *value_data,
1655                        GError               **error)
1656 {
1657   TreeNode *found;
1658   gboolean must_write_journal = table->journal_mode == GSK_TABLE_JOURNAL_DEFAULT;
1659   g_assert (table->key_fixed_length < 0
1660          || (guint) table->key_fixed_length == key_len);
1661   g_assert (table->value_fixed_length < 0
1662          || (guint) table->value_fixed_length == value_len);
1663 
1664   table->n_input_entries++;
1665 
1666   if (table->merge.no_len == NULL)
1667     found = NULL;
1668   else
1669     found = table->in_memory_tree_lookup (table, key_len, key_data);
1670   if (found)
1671     {
1672       /* Merge the old data with the new data. */
1673       GskTableMergeResult merge_result;
1674       if (table->has_len)
1675         merge_result = table->merge.with_len (key_len, key_data,
1676                                        found->value.len, found->value.data,
1677                                        value_len, value_data,
1678                                        &table->merge_buffer, table->user_data);
1679       else
1680         merge_result = table->merge.no_len (key_data, found->value.data,
1681                                        value_data,
1682                                        &table->merge_buffer, table->user_data);
1683 
1684       switch (merge_result)
1685         {
1686         case GSK_TABLE_MERGE_RETURN_A:
1687           /* nothing to do */
1688           break;
1689         case GSK_TABLE_MERGE_RETURN_B:
1690           table->in_memory_bytes -= found->value.len;
1691           table->in_memory_bytes += value_len;
1692           set_buffer (&found->value, value_len, value_data);
1693           break;
1694         case GSK_TABLE_MERGE_SUCCESS:
1695           table->in_memory_bytes -= found->value.len;
1696           table->in_memory_bytes += table->merge_buffer.len;
1697           copy_buffer (&found->value, &table->merge_buffer);
1698           break;
1699         case GSK_TABLE_MERGE_DROP:
1700           GSK_RBTREE_REMOVE (GET_IN_MEMORY_TREE (table), found);
1701           /* TreeNode itself will be auto-recycled */
1702           break;
1703         }
1704     }
1705   else
1706     {
1707       TreeNode *new = table->tree_node_pool + table->tree_node_pool_used++;
1708 
1709       /* write key and value into the new node */
1710       set_buffer (&new->key, key_len, key_data);
1711       set_buffer (&new->value, value_len, value_data);
1712 
1713       /* TODO: this repeats the work of the lookup.
1714          maybe we need a GSK_RBTREE_INSERT_NO_REPLACE()
1715          which can be used at the outset instead of LOOKUP_COMPARATOR? */
1716       GSK_RBTREE_INSERT (GET_IN_MEMORY_TREE (table), new, found);
1717       g_assert (found == NULL);
1718 
1719       table->in_memory_bytes += key_len + value_len;
1720     }
1721 
1722   table->in_memory_entry_count++;
1723   if (table->in_memory_entry_count == table->max_in_memory_entries
1724    || table->in_memory_bytes >= table->max_in_memory_bytes)
1725     {
1726       /* flush the tree */
1727       if (!flush_tree (table, error))
1728         {
1729           gsk_g_error_add_prefix (error, "flushing tree");
1730           return FALSE;
1731         }
1732 
1733       /* maybe flush journal */
1734       if (table->journal_mode != GSK_TABLE_JOURNAL_NONE
1735        && (++(table->journal_flush_index) == table->journal_flush_period))
1736         {
1737           /* write new journal */
1738           if (!reset_journal (table, error))
1739             {
1740               gsk_g_error_add_prefix (error, "error flushing journal");
1741               return FALSE;
1742             }
1743 
1744           table->journal_flush_index = 0;
1745           must_write_journal = FALSE;
1746         }
1747 
1748       if (!maybe_start_tasks (table, error))
1749         return FALSE;
1750     }
1751 
1752   if (table->run_list != NULL)
1753     {
1754       if (!run_merge_task (table, 32, FALSE, error))
1755         return FALSE;
1756     }
1757 
1758   if (must_write_journal)
1759     {
1760       guint new_journal_len = 4 + key_len + 4 + value_len + table->journal_len;
1761       if (new_journal_len % 4 != 0)
1762         new_journal_len += (4 - new_journal_len % 4);
1763       if (new_journal_len + 4 > table->journal_size)
1764         {
1765           if (!resize_journal (table->journal_fd,
1766                                &table->journal_mmap,
1767                                &table->journal_size,
1768                                new_journal_len + 4,
1769                                error))
1770             {
1771               gsk_g_error_add_prefix (error, "expanding journal");
1772               return FALSE;
1773             }
1774         }
1775 #if DEBUG_JOURNAL_WRITING
1776       g_message ("writing journal at %u: key/value_len=%u/%u", (guint) table->journal_len, key_len,value_len);
1777 #endif
1778       memset (table->journal_mmap + new_journal_len, 0, 4);
1779       memcpy (table->journal_mmap + 8 + table->journal_len,
1780               key_data, key_len);
1781       memcpy (table->journal_mmap + 8 + table->journal_len + key_len,
1782               value_data, value_len);
1783       ((guint32*)(table->journal_mmap + table->journal_len))[1]
1784          = GUINT32_TO_LE (value_len);
1785       GSK_MEMORY_BARRIER ();
1786       ((guint32*)(table->journal_mmap + table->journal_len))[0]
1787          = GUINT32_TO_LE (key_len + 1);
1788       table->journal_len = new_journal_len;
1789     }
1790   return TRUE;
1791 }
1792 
1793 static inline int
do_compare(GskTable * table,guint a_len,const guint8 * a_data,guint b_len,const guint8 * b_data)1794 do_compare (GskTable *table,
1795             guint     a_len,
1796             const guint8 *a_data,
1797             guint     b_len,
1798             const guint8 *b_data)
1799 {
1800   if (table->compare.no_len == NULL)
1801     return compare_memory (a_len, a_data, b_len, b_data);
1802   else if (table->has_len)
1803     return table->compare.with_len (a_len, a_data, b_len, b_data, table->user_data);
1804   else
1805     return table->compare.no_len (a_data, b_data, table->user_data);
1806 }
1807 
1808 gboolean
gsk_table_query(GskTable * table,guint key_len,const guint8 * key_data,gboolean * found_value_out,guint * value_len_out,guint8 ** value_data_out,GError ** error)1809 gsk_table_query       (GskTable              *table,
1810                        guint                  key_len,
1811                        const guint8          *key_data,
1812                        gboolean              *found_value_out,
1813                        guint                 *value_len_out,
1814                        guint8               **value_data_out,
1815                        GError               **error)
1816 {
1817   gboolean reverse = table->query_reverse_chronologically;
1818   gboolean has_result = FALSE;
1819   GskTableBuffer *result_buffers = table->result_buffers;       /* [2] */
1820   GskTableBuffer *result = result_buffers;
1821   GskTableBuffer *other_result = result_buffers+1;
1822   GskTableFileQuery *query = &table->file_query;
1823   FileInfo *fi;
1824   gboolean use_merge_tasks = TRUE;
1825   gpointer user_data = table->user_data;
1826 
1827   table->file_query_key_len = key_len;
1828   table->file_query_key_data = key_data;
1829 
1830 #if DEBUG_PRINT_QUERIES
1831   {
1832     char *hex = gsk_escape_memory_hex (key_data, key_len);
1833     g_message ("lookup '%s'", hex);
1834     g_free (hex);
1835   }
1836 #endif
1837 
1838   /* first query rbtree (if in reverse-chronological mode (default)) */
1839   if (reverse)
1840     {
1841       TreeNode *node = table->in_memory_tree_lookup (table, key_len, key_data);
1842       if (node != NULL)
1843         {
1844           has_result = TRUE;
1845           copy_buffer (result, &node->value);
1846 
1847           /* are we done? */
1848           if (table->is_stable_func != NULL
1849            && table->is_stable_func (key_len, key_data,
1850                                      result->len, result->data,
1851                                      table->user_data))
1852             goto done_querying_copy_result;
1853         }
1854     }
1855 
1856   /* walk through files, using merge-jobs as appropriate */
1857   for (fi = reverse ? table->last_file : table->first_file;
1858        fi != NULL;
1859        fi = reverse ? fi->prev_file : fi->next_file)
1860     {
1861       MergeTask *mt = reverse ? fi->prev_task : fi->next_task;
1862       gboolean used_merge_output = FALSE;
1863       if (use_merge_tasks && mt != NULL && mt->is_started)
1864         {
1865           if (mt->info.started.has_last_queryable_key)
1866             {
1867               /* compare last_queryable_key to key */
1868               int rv = do_compare (table,
1869                                    mt->info.started.last_queryable_key.len,
1870                                    mt->info.started.last_queryable_key.data,
1871                                    key_len, key_data);
1872               if (rv < 0)
1873                 goto cannot_use_merge_output;
1874 
1875 
1876               if (!gsk_table_file_query (mt->info.started.output,
1877                                          query, error))
1878                 {
1879                   gsk_g_error_add_prefix (error, "querying merge-task output");
1880                   goto failed;
1881                 }
1882               used_merge_output = TRUE;
1883               goto handle_file_query_result;
1884             }
1885         }
1886 cannot_use_merge_output:
1887 
1888       /* query file */
1889       if (!gsk_table_file_query (fi->file, query, error))
1890         {
1891           gsk_g_error_add_prefix (error, "querying merge-task output");
1892           goto failed;
1893         }
1894 
1895 handle_file_query_result:
1896       if (query->found)
1897         {
1898           if (has_result)
1899             {
1900               /* merge values */
1901               GskTableMergeResult merge_result;
1902               if (reverse)
1903                 merge_result
1904                   = table->has_len ?
1905                        table->merge.with_len (key_len, key_data,
1906                                               query->value.len,
1907                                               query->value.data,
1908                                               result->len, result->data,
1909                                               other_result,
1910                                               user_data)
1911                      : table->merge.no_len (key_data,
1912                                             query->value.data,
1913                                             result->data,
1914                                             other_result,
1915                                             user_data);
1916               else
1917                 merge_result
1918                   = table->has_len ?
1919                        table->merge.with_len (key_len, key_data,
1920                                               result->len, result->data,
1921                                               query->value.len,
1922                                               query->value.data,
1923                                               other_result,
1924                                               user_data)
1925                      : table->merge.no_len (key_data,
1926                                             result->data,
1927                                             query->value.data,
1928                                             other_result,
1929                                             user_data);
1930               switch (merge_result)
1931                 {
1932                 case GSK_TABLE_MERGE_RETURN_A:
1933                   if (reverse)
1934                     copy_buffer (result, &query->value);
1935                   else
1936                     (void) 0; /* do nothing: result is already correct */
1937                   break;
1938                 case GSK_TABLE_MERGE_RETURN_B:
1939                   if (!reverse)
1940                     copy_buffer (result, &query->value);
1941                   else
1942                     (void) 0; /* do nothing: result is already correct */
1943                   break;
1944                 case GSK_TABLE_MERGE_SUCCESS:
1945                   {
1946                     GskTableBuffer *tmp = result;
1947                     result = other_result;
1948                     other_result = tmp;
1949                     break;
1950                   }
1951                 case GSK_TABLE_MERGE_DROP:
1952                   has_result = FALSE;
1953                   break;
1954                 default:
1955                   g_assert_not_reached ();
1956                 }
1957             }
1958           else if (table->merge.no_len == NULL)
1959             {
1960               *found_value_out = TRUE;
1961               copy_buffer (result, &query->value);
1962               goto done_querying_copy_result;
1963             }
1964           else
1965             {
1966               has_result = TRUE;
1967               copy_buffer (result, &query->value);
1968             }
1969 
1970           /* are we done? */
1971           if (table->is_stable_func != NULL
1972            && table->is_stable_func (key_len, key_data,
1973                                      result->len, result->data,
1974                                      table->user_data))
1975             goto done_querying_copy_result;
1976         }
1977 
1978       /* skip one extra file */
1979       if (used_merge_output)
1980         fi = reverse ? fi->prev_file : fi->next_file;
1981     }
1982 
1983   /* last query rbtree (if in chronological mode) */
1984   if (!reverse)
1985     {
1986       TreeNode *node = table->in_memory_tree_lookup (table, key_len, key_data);
1987       if (node != NULL)
1988         {
1989           if (has_result)
1990             {
1991               /* merge */
1992               GskTableMergeResult merge_result;
1993               merge_result
1994                 = table->has_len
1995                      ? table->merge.with_len (key_len, key_data,
1996                                               result->len, result->data,
1997                                               node->value.len, node->value.data,
1998                                               other_result,
1999                                               user_data)
2000                      : table->merge.no_len (key_data,
2001                                             result->data,
2002                                             node->value.data,
2003                                             other_result,
2004                                             user_data);
2005               switch (merge_result)
2006                 {
2007                 case GSK_TABLE_MERGE_RETURN_A:
2008                   break;
2009                 case GSK_TABLE_MERGE_RETURN_B:
2010                   copy_buffer (result, &node->value);
2011                   break;
2012                 case GSK_TABLE_MERGE_SUCCESS:
2013                   {
2014                     GskTableBuffer *tmp = result;
2015                     result = other_result;
2016                     other_result = tmp;
2017                     break;
2018                   }
2019                 case GSK_TABLE_MERGE_DROP:
2020                   has_result = FALSE;
2021                   break;
2022                 default:
2023                   g_assert_not_reached ();
2024                 }
2025             }
2026           else
2027             {
2028               has_result = TRUE;
2029               copy_buffer (result, &node->value);
2030             }
2031 
2032           /* note: no need to check stability,
2033              since there's nothing more to do anyways. */
2034         }
2035     }
2036 
2037 done_querying_copy_result:
2038   *found_value_out = has_result;
2039   if (has_result)
2040     {
2041       *value_len_out = result->len;
2042       *value_data_out = g_memdup (result->data, result->len);
2043     }
2044   /* fall through */
2045 
2046 //done_querying:
2047   return TRUE;
2048 
2049 failed:
2050   return FALSE;
2051 }
2052 
2053 const char *
gsk_table_peek_dir(GskTable * table)2054 gsk_table_peek_dir    (GskTable              *table)
2055 {
2056   return table->dir;
2057 }
2058 
2059 void
gsk_table_destroy(GskTable * table)2060 gsk_table_destroy     (GskTable              *table)
2061 {
2062   guint i;
2063   FileInfo *fi, *next=NULL;
2064   for (fi = table->first_file; fi != NULL; fi = next)
2065     {
2066       next = fi->next_file;
2067       file_info_unref (fi, table->dir, FALSE);           /* may free fi */
2068     }
2069   for (i = 0; i < table->n_old_files; i++)
2070     file_info_unref (table->old_files[i], table->dir, FALSE);
2071   g_free (table->old_files);
2072   g_free (table->journal_cur_fname);
2073   g_free (table->journal_tmp_fname);
2074   munmap (table->journal_mmap, table->journal_size);
2075   gsk_table_buffer_clear (&table->result_buffers[0]);
2076   gsk_table_buffer_clear (&table->result_buffers[1]);
2077   gsk_table_buffer_clear (&table->merge_buffer);
2078   gsk_table_buffer_clear (&table->simplify_buffer);
2079   g_slice_free (GskTable, table);
2080 }
2081 
2082 
2083 
/* Finish the merge task at the head of the run list.
 *
 * Unlinks @task from table->run_list and finalizes its output file.
 * The two input files are replaced in the file list by a single new
 * FileInfo wrapping the output.  Unstarted merge tasks are created
 * between the new file and its neighbours where possible, and @task
 * is freed.
 *
 * Returns FALSE (with @error presumably set by
 * gsk_table_file_done_feeding()) if finishing the output fails.
 *
 * NOTE(review): task->inputs[0] and task->inputs[1] are removed from the
 * file list but not file_info_unref()'d here -- confirm their release
 * happens elsewhere, otherwise they (and their files) leak.
 * NOTE(review): task->info.started.last_queryable_key may own heap
 * memory that is not released before g_slice_free() -- confirm.
 * NOTE(review): on the error return the task has already been unlinked
 * from run_list but is not freed. */
static gboolean
merge_task_done (GskTable    *table,
                 MergeTask   *task,
                 GError     **error)
{
  gboolean done;
  FileInfo *new_file;

  /* only the head of the run list can be finished, and neither input
     may also belong to a neighbouring task */
  g_assert (task == table->run_list);
  g_assert (task->inputs[0]->prev_task == NULL);
  g_assert (task->inputs[1]->next_task == NULL);

  /* remove this task from the run list */
  table->run_list = task->info.started.next_run;
  table->n_running_tasks--;

#if DEBUG_MERGE_TASKS
  g_message ("finished mergetask between "ID_FMT" and "ID_FMT, task->inputs[0]->file->id, task->inputs[1]->file->id);
#endif

  /* finish the output file */
  if (!gsk_table_file_done_feeding (task->info.started.output, &done, error))
    return FALSE;
  /* asynchronous completion of the output file is not supported yet:
     abort rather than continue with an unfinished file */
  if (done == FALSE)
    g_error ("gsk_table_file_done_feeding not ready not handled yet");

  /* destroy the input readers */
  gsk_table_reader_destroy (task->info.started.inputs[0].reader);
  gsk_table_reader_destroy (task->info.started.inputs[1].reader);

  /* create a new FileInfo covering the input entries of both inputs */
  new_file = g_slice_new0 (FileInfo);
  new_file->ref_count = 1;
  new_file->first_input_entry = task->inputs[0]->first_input_entry;
  new_file->n_input_entries = task->inputs[0]->n_input_entries
                            + task->inputs[1]->n_input_entries;
  new_file->file = task->info.started.output;

  /* replace task->inputs[0,1] with the new file
     (two files out, one in: net change of -1) */
  CHECK_FILES_CONTIGUOUS (table);
  GSK_LIST_INSERT_BEFORE (GET_FILE_INFO_LIST (table), task->inputs[0], new_file);
  GSK_LIST_REMOVE (GET_FILE_INFO_LIST (table), task->inputs[0]);
  GSK_LIST_REMOVE (GET_FILE_INFO_LIST (table), task->inputs[1]);
  table->n_files -= 1;
  CHECK_FILES_CONTIGUOUS (table);

  /* possibly create more unstarted merge-tasks, pairing the new file
     with a neighbour whose other side has no started task */
  if (new_file->prev_file != NULL
   && TASK_IS_UNSTARTED (new_file->prev_file->prev_task))
    create_unstarted_merge_task (table, new_file->prev_file, new_file);
  if (new_file->next_file != NULL
   && TASK_IS_UNSTARTED (new_file->next_file->next_task))
    create_unstarted_merge_task (table, new_file, new_file->next_file);

  g_slice_free (MergeTask, task);

  return TRUE;
}
2142 
2143 /* --- optimizing run_merge_task variants --- */
2144 #include "gsktable-implementations-generated.c"
2145 
2146 #define DEFINE_RUN_TASK_FUNCS(suffix) \
2147 { run_merge_task__simplify_flush_##suffix, \
2148   run_merge_task__simplify_noflush_##suffix, \
2149   run_merge_task__nosimplify_flush_##suffix, \
2150   run_merge_task__nosimplify_noflush_##suffix }
2151 static RunTaskFuncs all_run_task_funcs[2][2][2] = /* has_len, has_compare, has_merge */
2152 { { { DEFINE_RUN_TASK_FUNCS(nolen_memcmp_nomerge),
2153       DEFINE_RUN_TASK_FUNCS(nolen_memcmp_merge) },
2154     { DEFINE_RUN_TASK_FUNCS(nolen_compare_nomerge),
2155       DEFINE_RUN_TASK_FUNCS(nolen_compare_merge) } },
2156   { { DEFINE_RUN_TASK_FUNCS(haslen_memcmp_nomerge),
2157       DEFINE_RUN_TASK_FUNCS(haslen_memcmp_merge) },
2158     { DEFINE_RUN_TASK_FUNCS(haslen_compare_nomerge),
2159       DEFINE_RUN_TASK_FUNCS(haslen_compare_merge) } } };
2160 #undef DEFINE_RUN_TASK_FUNCS
2161 
table_options_get_run_funcs(const GskTableOptions * options,gboolean * has_len_out,GError ** error)2162 static RunTaskFuncs *table_options_get_run_funcs (const GskTableOptions *options,
2163                                                   gboolean        *has_len_out,
2164                                                   GError         **error)
2165 {
2166   gboolean has_len     = options->compare != NULL
2167                       || options->merge != NULL
2168                       || options->simplify != NULL;
2169   gboolean has_compare = options->compare != NULL
2170                       || options->compare_no_len != NULL;
2171   gboolean has_merge   = options->merge != NULL
2172                       || options->merge_no_len != NULL;
2173 #define TEST_MEMBER_NULL(member) \
2174   if (options->member != NULL) \
2175     { \
2176       g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_INVALID_ARGUMENT, \
2177                    "length and no-length function pointers mixed:  did not expect %s to be non-NULL", \
2178                    #member); \
2179       return NULL; \
2180     }
2181   if (has_len)
2182     {
2183       TEST_MEMBER_NULL (compare_no_len);
2184       TEST_MEMBER_NULL (merge_no_len);
2185       TEST_MEMBER_NULL (simplify_no_len);
2186     }
2187   else
2188     {
2189       TEST_MEMBER_NULL (compare);
2190       TEST_MEMBER_NULL (merge);
2191       TEST_MEMBER_NULL (simplify);
2192     }
2193   *has_len_out = has_len;
2194   return &all_run_task_funcs[has_len?1:0][has_compare?1:0][has_merge?1:0];
2195 }
2196 
2197 /* placeholder until there's actually some tunable stuff
2198    in flat file-factory (there certainly could be:
2199    chunk_size, compression_level) */
2200 static GskTableFileFactory *
table_options_get_file_factory(const GskTableOptions * options,GError ** error)2201 table_options_get_file_factory (const GskTableOptions *options,
2202                                 GError               **error)
2203 {
2204   return gsk_table_file_factory_new_flat ();
2205 }
2206