1 /* XXX: strace indicates that we are using O_LARGEFILE? */
2 /* XXX: need to handle files that have to spend a while finishing
3 up after the last entry is added. */
4 /* XXX: still get occasional expected value but not found errors
5 from test scripts (while N; do Nq; done) ... see also "setup" script for tests */
6 /* NOTE: see 'setup' script for tests (etc) */
7 /* POSSIBLE TODO: support key_fixed_length, value_fixed_length */
8 /* POSSIBLE TODO: support disabling prefix compression */
9 /* POSSIBLE TODO: support altering compression level */
10 /* POSSIBLE TODO: support LZO compression */
11
12 #include <string.h>
13 #include <errno.h>
14 #include <sys/mman.h>
15 #include <fcntl.h>
16 #include <sys/stat.h>
17 #include <sys/types.h>
18 #include <unistd.h>
19 #include <stdio.h> /* for rename() */
20
21 #include "gskrbtreemacros.h"
22 #include "gsklistmacros.h"
23 #include "gskghelpers.h"
24 #include "gskutils.h"
25 #include "gskmemorybarrier.h"
26 #include "gsktable.h"
27 #include "gsktable-file.h"
28 #include "gskerror.h"
29
typedef struct _TableUserData TableUserData;
typedef struct _MergeTask MergeTask;
typedef struct _FileInfo FileInfo;
typedef struct _TreeNode TreeNode;

/* printf-style conversion for 64-bit file ids */
#define ID_FMT "%" G_GUINT64_FORMAT

/* first little-endian 32-bit word of every journal file */
#define JOURNAL_FILE_MAGIC 0x1143eeab

/* compile-time debug switches (0 = off) */
#define DEBUG_MERGE_TASKS 0
#define DEBUG_PRINT_QUERIES 0
#define DEBUG_JOURNAL_WRITING 0
#define DEBUG_JOURNAL_REPLAY 0
/* tested with #if in kill_unknown_files(); define it explicitly instead
   of relying on an undefined identifier evaluating to 0 */
#ifndef DEBUG_OLD_FILE_DELETION
#define DEBUG_OLD_FILE_DELETION 0
#endif

/* TRUE if 'task' is NULL or has not been started yet.
   NOTE: evaluates 'task' twice. */
#define TASK_IS_UNSTARTED(task) \
  ((task) == NULL || !(task)->is_started)
46
/* Reference-counted bundle of a user merge callback and its closure data
   (GskTable.table_user_data points at one, "for ref-counting").
   'destroy' is presumably applied to 'user_data' when the last reference
   is dropped -- confirm at the unref site. */
struct _TableUserData
{
  guint ref_count;
  GskTableMergeFunc merge;
  gpointer user_data;
  GDestroyNotify destroy;
};
54
/* A merge of two adjacent files: inputs[0] directly precedes inputs[1]
   in the table's file list, and inputs[0]->next_task /
   inputs[1]->prev_task point back at this task.
   'is_started' selects which member of 'info' is valid. */
struct _MergeTask
{
  gboolean is_started;
  FileInfo *inputs[2];

  union
  {
    /* valid while !is_started */
    struct {
      /* ratio inputs[0]->file->n_entries / inputs[1]->file->n_entries,
         scaled by 2^16 and saturated at 0xffffffff
         (computed in create_unstarted_merge_task) */
      /* we want the smallest ratio possible;
         the "max_merge_ratio" specifies the limit
         for merge-task creation. */
      guint32 input_size_ratio_b16;

      /* red-black tree of unstarted merge tasks (GskTable.unstarted_merges),
         ordered by input_size_ratio_b16, ties broken by task address
         (see COMPARE_UNSTARTED_MERGE_TASKS) */
      MergeTask *left, *right, *parent;
      gboolean is_red;
    } unstarted;
    /* valid once is_started */
    struct {
      GskTableFile *output;          /* file being built from the inputs */
      gboolean has_last_queryable_key;
      GskTableBuffer last_queryable_key;
      struct {
        GskTableReader *reader;      /* reader over inputs[i]->file */
      } inputs[2];
      MergeTask *next_run;           /* next task on GskTable.run_list */
    } started;
  } info;
};
85
/* One entry of the table's ordered, doubly-linked file list. */
struct _FileInfo
{
  GskTableFile *file;
  guint ref_count;            /* see file_info_ref() / file_info_unref() */
  /* this file covers the input entries
     [first_input_entry, first_input_entry + n_input_entries);
     consecutive files must be contiguous (see are_files_contiguous) */
  guint64 first_input_entry, n_input_entries;
  MergeTask *prev_task; /* possible merge task with prior file */
  MergeTask *next_task; /* possible merge task with next file */
  FileInfo *prev_file, *next_file;
};
95
/* Node of the in-memory red-black tree (GskTable.in_memory_tree) holding
   one key/value pair; nodes are presumably taken from
   GskTable.tree_node_pool -- confirm at the allocation site. */
struct _TreeNode
{
  GskTableBuffer key;
  GskTableBuffer value;
  TreeNode *left, *right, *parent;
  guint is_red : 1;           /* red-black colour bit */
};
103
/* Argument bundles for the gsklistmacros.h list/stack macros:
   - GET_FILE_INFO_LIST: doubly-linked list of FileInfo
     (first_file .. last_file, linked via prev_file/next_file).
   - GET_RUN_STACK: singly-linked stack of started MergeTasks rooted at
     run_list and chained through info.started.next_run.
   'table' is parenthesized in both so any pointer expression works. */
#define GET_FILE_INFO_LIST(table) \
  FileInfo *, (table)->first_file, (table)->last_file, prev_file, next_file
#define GET_RUN_STACK(table) \
  MergeTask *, (table)->run_list, info.started.next_run
108
/* always runs the table->run_list task */
/* i.e. works on the task at the top of the run stack (GET_RUN_STACK).
   'iterations' presumably bounds the amount of work per call, and a
   FALSE return presumably means *error was set -- confirm in the
   implementations. */
typedef gboolean (*RunTaskFunc) (GskTable *table,
                                 guint iterations,
                                 GError **error);
typedef struct _RunTaskFuncs RunTaskFuncs;
/* Four specializations of RunTaskFunc; by their names, they differ in
   whether a simplify step is applied and whether the run is flushing. */
struct _RunTaskFuncs
{
  RunTaskFunc simplify_flush;
  RunTaskFunc simplify_noflush;
  RunTaskFunc nosimplify_flush;
  RunTaskFunc nosimplify_noflush;
};
121
/* optimized variants of RunTaskFuncs (presumably one per combination of
   {len,nolen} x {memcmp,generic compare} x {merge,nomerge}):
     len_memcmp_nomerge
     len_memcmp_merge
     nolen_memcmp_merge
     nolen_memcmp_nomerge
     len_nomerge
     nolen_nomerge
     nolen_merge
     len_merge */
/* Pick the RunTaskFuncs for 'options'; *has_len_out is set to whether the
   length-taking callback flavour is used (presumably; confirm in the
   definition). */
static RunTaskFuncs *
table_options_get_run_funcs (const GskTableOptions *options,
                             gboolean *has_len_out,
                             GError **error);
/* Pick the file factory used to open/create the table's files. */
static GskTableFileFactory *
table_options_get_file_factory (const GskTableOptions *,
                                GError **error);
138
/* Finds the node of table->in_memory_tree whose key equals
   (key_len, key_data); in_memory_tree_lookup_memcmp() is one
   implementation. */
typedef TreeNode *(*InMemoryTreeLookupFunc) (GskTable *table,
                                             guint key_len,
                                             const guint8 *key_data);
/* Orders two in-memory tree nodes (<0, 0, >0); invoked through
   TREE_NODE_COMPARE by the in-memory tree macros. */
typedef int (*TreeNodeCompareFunc) (GskTable *table,
                                    TreeNode *a,
                                    TreeNode *b);
/* Top-level table state. */
struct _GskTable
{
  char *dir;                  /* directory holding the journal and data files */
  int lock_fd;

  /* data handling functions; 'has_len' presumably selects the with_len
     or no_len member of each union below -- confirm at setup */
  gboolean has_len;
  union
  {
    GskTableCompareFunc with_len;
    GskTableCompareFuncNoLen no_len;
  } compare;
  union
  {
    GskTableMergeFunc with_len;
    GskTableMergeFuncNoLen no_len;
  } merge;
  union
  {
    GskTableSimplifyFunc with_len;
    GskTableSimplifyFuncNoLen no_len;
  } simplify;
  GskTableValueIsStableFunc is_stable_func;
  RunTaskFuncs *run_funcs;
  InMemoryTreeLookupFunc in_memory_tree_lookup;
  TreeNodeCompareFunc tree_node_compare;   /* used by TREE_NODE_COMPARE */

  guint query_reverse_chronologically : 1;

  gpointer user_data;
  TableUserData *table_user_data; /* for ref-counting */

  guint64 n_input_entries;    /* input-entry count stored in the journal header */

  /* journalling */
  int journal_fd;             /* descriptor of the current journal file */
  char *journal_cur_fname;    /* path of the current journal */
  char *journal_tmp_fname; /* actual file to write to */
                              /* (reset_journal writes it, then renames it
                                 over journal_cur_fname) */
  guint8 *journal_mmap;       /* shared read/write mapping of the journal */
  guint journal_len; /* current offset in journal */
  guint journal_size; /* size of journal data */
  GskTableJournalMode journal_mode;   /* GSK_TABLE_JOURNAL_NONE while replaying */
  guint journal_flush_index;

  /* files: ordered list, contiguous in input-entry space
     (see are_files_contiguous) */
  guint n_files;
  FileInfo *first_file, *last_file;
  guint64 last_file_id;       /* largest file id known to be in use */

  /* old files */
  guint n_old_files;
  FileInfo **old_files; /* as in the beginning of the journal file */
                        /* (each entry holds a FileInfo reference) */

  /* red-black tree of unstarted merge tasks, ordered by
     input_size_ratio_b16 (see COMPARE_UNSTARTED_MERGE_TASKS) */
  MergeTask *unstarted_merges;

  /* stack of started merge tasks linked via info.started.next_run
     (see GET_RUN_STACK); read_journal sorts it by the tasks' total input
     n_entries -- confirm the intended order */
  MergeTask *run_list;
  guint n_running_tasks;

  /* tree of in-memory entries */
  TreeNode *in_memory_tree;
  guint in_memory_bytes;
  guint in_memory_entry_count;

  /* fixed pool of tree nodes to use */
  TreeNode *tree_node_pool;
  guint tree_node_pool_used; /* out of max_in_memory_entries */

  /* fixed length keys and values can be optimized by not storing the length. */
  gssize key_fixed_length; /* or -1 */
  gssize value_fixed_length; /* or -1 */

  /* scratch buffers */
  GskTableBuffer result_buffers[2];
  GskTableBuffer merge_buffer;
  GskTableBuffer simplify_buffer;
  GskTableFileQuery file_query;
  guint file_query_key_len;
  const guint8 *file_query_key_data;

  /* file factory: opens the table's files */
  GskTableFileFactory *file_factory;

  /* tunables */
  guint max_running_tasks;
  guint max_merge_ratio_b16;  /* limit on input_size_ratio_b16 (x 2^16) */
  guint max_in_memory_bytes;
  guint max_in_memory_entries;
  guint journal_flush_period;

};
237
/* Adapters that expose MergeTask.info.unstarted to the
   gskrbtreemacros.h red-black tree macros (tree root:
   table->unstarted_merges).  Tasks are ordered by input_size_ratio_b16,
   ties broken by task address.
   NOTE: macro arguments are not parenthesized; pass plain identifiers. */
#define UNSTARTED_MERGE_TASK_IS_RED(task) (task)->info.unstarted.is_red
#define UNSTARTED_MERGE_TASK_SET_IS_RED(task,v) (task)->info.unstarted.is_red = v
#define COMPARE_UNSTARTED_MERGE_TASKS(a,b, rv) \
  if (a->info.unstarted.input_size_ratio_b16 < b->info.unstarted.input_size_ratio_b16) \
    rv = -1; \
  else if (a->info.unstarted.input_size_ratio_b16 > b->info.unstarted.input_size_ratio_b16) \
    rv = 1; \
  else \
    rv = (a<b) ? -1 : (a>b) ? 1 : 0;
#define GET_UNSTARTED_MERGE_TASK_TREE(table) \
  (table)->unstarted_merges, MergeTask *, \
  UNSTARTED_MERGE_TASK_IS_RED, \
  UNSTARTED_MERGE_TASK_SET_IS_RED, \
  info.unstarted.parent, \
  info.unstarted.left, \
  info.unstarted.right, \
  COMPARE_UNSTARTED_MERGE_TASKS

/* Adapters for the in-memory TreeNode red-black tree (root:
   table->in_memory_tree); ordering is delegated to
   table->tree_node_compare. */
/* NOTE: 'table' must be a local-variable or a parameter for
   these macros to work!!! blah. */
#define TREE_NODE_IS_RED(node) node->is_red
#define TREE_NODE_SET_IS_RED(node,v) node->is_red = v
#define TREE_NODE_COMPARE(a,b,rv) rv = table->tree_node_compare(table, a,b)
#define GET_IN_MEMORY_TREE(table) \
  (table)->in_memory_tree, TreeNode *, \
  TREE_NODE_IS_RED, TREE_NODE_SET_IS_RED, \
  parent, left, right, \
  TREE_NODE_COMPARE
266
267
268 static inline void
set_buffer(GskTableBuffer * buffer,guint len,const guint8 * data)269 set_buffer (GskTableBuffer *buffer,
270 guint len,
271 const guint8 *data)
272 {
273 memcpy (gsk_table_buffer_set_len (buffer, len), data, len);
274 }
275 static inline void
copy_buffer(GskTableBuffer * buffer,const GskTableBuffer * src)276 copy_buffer (GskTableBuffer *buffer,
277 const GskTableBuffer *src)
278 {
279 set_buffer (buffer, src->len, src->data);
280 }
/* Forward declaration; the definition appears further down this file. */
static void create_unstarted_merge_task (GskTable *table,
                                         FileInfo *prev,
                                         FileInfo *next);
284
285 /* --- file-info ref-counting --- */
286 static inline FileInfo *
file_info_ref(FileInfo * fi)287 file_info_ref (FileInfo *fi)
288 {
289 g_assert (fi->ref_count > 0);
290 ++(fi->ref_count);
291 return fi;
292 }
293 static inline FileInfo *
file_info_unref(FileInfo * fi,const char * dir,gboolean erase)294 file_info_unref (FileInfo *fi, const char *dir, gboolean erase)
295 {
296 g_assert (fi->ref_count > 0);
297 if (--(fi->ref_count) == 0)
298 {
299 GError *error = NULL;
300 if (!gsk_table_file_destroy (fi->file, dir, erase, &error))
301 {
302 g_warning ("gsk_table_file_destroy "ID_FMT" (erase=%u) failed: %s",
303 fi->file->id, erase, error->message);
304 g_error_free (error);
305 }
306 g_slice_free (FileInfo, fi);
307 }
308 return fi;
309 }
310
311 /* --- data structure invariant checking --- */
312 static gboolean
are_files_contiguous(GskTable * table)313 are_files_contiguous (GskTable *table)
314 {
315 guint64 last_end = 0;
316 FileInfo *fi;
317 for (fi = table->first_file; fi != NULL; fi = fi->next_file)
318 {
319 if (last_end != fi->first_input_entry)
320 return FALSE;
321 last_end += fi->n_input_entries;
322 }
323 return TRUE;
324 }
325 #define CHECK_FILES_CONTIGUOUS(table) g_assert (are_files_contiguous (table))
326
/* --- journal management --- */
/* Forward declarations: read_journal() loads table state from the
   current journal; reset_journal() writes a fresh one.  Both return
   FALSE on failure, with *error as the error-reporting parameter. */
static gboolean read_journal (GskTable *table,
                              GError **error);
static gboolean reset_journal (GskTable *table,
                               GError **error);
332
333 static inline void
kill_unstarted_merge_task(GskTable * table,MergeTask * to_kill)334 kill_unstarted_merge_task (GskTable *table,
335 MergeTask *to_kill)
336 {
337 g_assert (to_kill->inputs[0]->next_task == to_kill);
338 g_assert (to_kill->inputs[1]->prev_task == to_kill);
339 GSK_RBTREE_REMOVE (GET_UNSTARTED_MERGE_TASK_TREE (table), to_kill);
340 to_kill->inputs[0]->next_task = NULL;
341 to_kill->inputs[1]->prev_task = NULL;
342 g_slice_free (MergeTask, to_kill);
343 }
344
345 static void
create_unstarted_merge_task(GskTable * table,FileInfo * prev,FileInfo * next)346 create_unstarted_merge_task (GskTable *table,
347 FileInfo *prev,
348 FileInfo *next)
349 {
350 MergeTask *task = g_slice_new (MergeTask);
351 MergeTask *unused;
352 guint32 ratio_b16;
353 g_assert (TASK_IS_UNSTARTED (prev->prev_task));
354 g_assert (prev->next_task == NULL);
355 g_assert (next->prev_task == NULL);
356 g_assert (TASK_IS_UNSTARTED (next->next_task));
357 task->is_started = FALSE;
358 task->inputs[0] = prev;
359 task->inputs[1] = next;
360 if (prev->file->n_entries == 0 && next->file->n_entries == 0)
361 ratio_b16 = 1<<16;
362 else if (next->file->n_entries == 0)
363 ratio_b16 = 0xffffffff;
364 else
365 {
366 gdouble ratio_d_b16 = (double) prev->file->n_entries / next->file->n_entries
367 * (double) (1<<16);
368 if (ratio_d_b16 >= 0xffffffff)
369 ratio_b16 = 0xffffffff;
370 else
371 ratio_b16 = (guint) ratio_d_b16;
372 }
373
374 prev->next_task = next->prev_task = task;
375 task->info.unstarted.input_size_ratio_b16 = ratio_b16;
376 GSK_RBTREE_INSERT (GET_UNSTARTED_MERGE_TASK_TREE (table),
377 task, unused);
378 g_assert (unused == NULL);
379 }
380
381 static guint
uint64_hash(gconstpointer a)382 uint64_hash (gconstpointer a)
383 {
384 guint64 ai = * (guint64 *) a;
385 guint ai_high = ai>>32;
386 guint ai_low = ai;
387 return ai_low * 33 + ai_high;
388 }
389 static gboolean
uint64_equal(gconstpointer a,gconstpointer b)390 uint64_equal (gconstpointer a, gconstpointer b)
391 {
392 return (* (guint64 *) a) == (* (guint64 *) b);
393 }
394
395 static gboolean
kill_unknown_files(GskTable * table,GError ** error)396 kill_unknown_files (GskTable *table,
397 GError **error)
398 {
399 GDir *dirlist;
400 GHashTable *known_ids;
401 GPtrArray *to_delete;
402 FileInfo *fi;
403 const char *name;
404 const char *at;
405
406
407 /* move aside unused files */
408 dirlist = g_dir_open (table->dir, 0, error);
409 if (dirlist == NULL)
410 {
411 g_warning ("g_dir_open failed on existing db dir?: %s", table->dir);
412 /* TODO: cleanup (or maybe just return table anyway?) */
413 return FALSE;
414 }
415 known_ids = g_hash_table_new (uint64_hash, uint64_equal);
416 for (fi = table->first_file; fi; fi = fi->next_file)
417 {
418 #if DEBUG_OLD_FILE_DELETION
419 g_message ("note complete file "ID_FMT"", fi->file->id);
420 #endif
421 g_hash_table_insert (known_ids, &fi->file->id, fi->file);
422 if (fi->next_task != NULL && fi->next_task->is_started)
423 {
424 GskTableFile *output = fi->next_task->info.started.output;
425 #if DEBUG_OLD_FILE_DELETION
426 g_message ("note merge output file "ID_FMT"", output->id);
427 #endif
428 g_hash_table_insert (known_ids, &output->id, output);
429 }
430 }
431 to_delete = g_ptr_array_new ();
432 while ((name=g_dir_read_name (dirlist)) != NULL)
433 {
434 guint64 id;
435 if (name[0] == '.'
436 && ((strcmp (name, ".") == 0 || strcmp (name, "..") == 0)))
437 continue; /* ignore . and .. */
438
439 if ('A' <= name[0] && name[0] <= 'Z')
440 continue; /* ignore user files */
441
442 if (strcmp (name, "journal") == 0
443 || strcmp (name, "journal.tmp") == 0)
444 continue; /* ignore journal files */
445
446 /* find file-id and extension, since it's possible we want to
447 delete this file. */
448 for (at = name; g_ascii_isxdigit (*at); at++)
449 ;
450 if (at == name || *at != '.')
451 {
452 g_warning ("unrecognized file '%s' in dir.. skipping", name);
453 continue;
454 }
455 id = g_ascii_strtoull (name, NULL, 16);
456 /* TODO: verify we know the extension? */
457 if (g_hash_table_lookup (known_ids, &id) == NULL)
458 {
459 #if DEBUG_OLD_FILE_DELETION
460 g_message ("unknown id for file %s [id="ID_FMT"]: deleting it", name, id);
461 #endif
462 g_ptr_array_add (to_delete, g_strdup_printf ("%s/%s", table->dir, name));
463 }
464 }
465 g_hash_table_destroy (known_ids);
466 g_ptr_array_foreach (to_delete, (GFunc) unlink, NULL); /* eep! */
467 g_ptr_array_foreach (to_delete, (GFunc) g_free, NULL);
468 g_ptr_array_free (to_delete, TRUE);
469 g_dir_close (dirlist);
470 return TRUE;
471 }
472
473 static gboolean
read_journal(GskTable * table,GError ** error)474 read_journal (GskTable *table,
475 GError **error)
476 {
477 int fd = open (table->journal_cur_fname, O_RDWR);
478 struct stat stat_buf;
479 guint i;
480 guint magic, n_files, n_merge_tasks;
481 guint32 tmp32_le;
482 guint64 tmp64_le;
483 guint journal_size;
484 guint8 *mmapped_journal;
485 const guint8 *at;
486 guint64 n_input_entries;
487 FileInfo **file_infos;
488 GskTableFileFactory *file_factory = table->file_factory;
489 guint file_index;
490 guint64 max_file_id = 0;
491 GskTableJournalMode old_journal_mode;
492 if (fd < 0)
493 {
494 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_OPEN,
495 "error opening journal file %s: %s",
496 table->journal_cur_fname,
497 g_strerror (errno));
498 return FALSE;
499 }
500 if (fstat (fd, &stat_buf) < 0)
501 {
502 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_STAT,
503 "error statting journal file %s: %s",
504 table->journal_cur_fname,
505 g_strerror (errno));
506 close (fd);
507 return FALSE;
508 }
509 journal_size = stat_buf.st_size;
510 mmapped_journal = mmap (NULL, journal_size, PROT_READ|PROT_WRITE,
511 MAP_SHARED, fd, 0);
512 if (mmapped_journal == MAP_FAILED || mmapped_journal == NULL)
513 {
514 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
515 "error mmapping journal file %s: %s",
516 table->journal_cur_fname,
517 g_strerror (errno));
518 close (fd);
519 return FALSE;
520 }
521
522 /* parse the journal */
523 at = mmapped_journal;
524
525 tmp32_le = * (guint32 *) at;
526 magic = GUINT32_FROM_LE (tmp32_le);
527 if (magic != JOURNAL_FILE_MAGIC)
528 {
529 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
530 "invalid magic on journal file (0x%08x, not 0x%08x)",
531 magic, JOURNAL_FILE_MAGIC);
532 goto error_cleanup;
533 }
534 at += 4;
535
536 tmp32_le = * (guint32 *) at;
537 n_files = GUINT32_FROM_LE (tmp32_le);
538 at += 4;
539
540 tmp32_le = * (guint32 *) at;
541 n_merge_tasks = GUINT32_FROM_LE (tmp32_le);
542 at += 4;
543
544 tmp32_le = * (guint32 *) at;
545 at += 4;
546 if (tmp32_le != 0)
547 {
548 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
549 "reserved word in journal nonzero");
550 goto error_cleanup;
551 }
552
553 tmp64_le = * (guint64 *) at;
554 at += 8;
555 n_input_entries = GUINT64_FROM_LE (tmp64_le);
556
557 #if DEBUG_JOURNAL_REPLAY
558 g_message ("read_journal: n_files=%u, n_merge_tasks=%u, n_input_entries=%"G_GUINT64_FORMAT,
559 n_files, n_merge_tasks, n_input_entries);
560 #endif
561
562 file_infos = g_new0 (FileInfo *, n_files);
563
564 /* parse files */
565 for (i = 0; i < n_files; i++)
566 {
567 guint64 file_id, first_input_entry, n_input_entries, n_entries;
568 FileInfo *file_info;
569
570 memcpy (&tmp64_le, at, 8);
571 file_id = GUINT64_FROM_LE (tmp64_le);
572 at += 8;
573 max_file_id = MAX (max_file_id, file_id);
574
575 memcpy (&tmp64_le, at, 8);
576 first_input_entry = GUINT64_FROM_LE (tmp64_le);
577 at += 8;
578
579 memcpy (&tmp64_le, at, 8);
580 n_input_entries = GUINT64_FROM_LE (tmp64_le);
581 at += 8;
582
583 memcpy (&tmp64_le, at, 8);
584 n_entries = GUINT64_FROM_LE (tmp64_le);
585 at += 8;
586 file_info = g_slice_new0 (FileInfo);
587 file_info->first_input_entry = first_input_entry;
588 file_info->n_input_entries = n_input_entries;
589 file_info->ref_count = 1;
590
591 file_info->file
592 = gsk_table_file_factory_open_file (file_factory, table->dir,
593 file_id, error);
594 if (file_info->file == NULL)
595 goto error_cleanup;
596 file_info->file->n_entries = n_entries;
597 g_assert (file_info->file);
598 file_infos[i] = file_info;
599
600 if (i > 0)
601 {
602 FileInfo *prev = file_infos[i-1];
603 guint64 prev_end = prev->first_input_entry + prev->n_input_entries;
604 if (prev_end != file_info->first_input_entry)
605 {
606 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
607 "inconsistency: files "ID_FMT" and "ID_FMT" are not continguous (prev_end=%"G_GUINT64_FORMAT"; cur_start=%"G_GUINT64_FORMAT")",
608 prev->file->id, file_info->file->id, prev_end, file_info->first_input_entry);
609 goto error_cleanup;
610 }
611 file_info->prev_file = prev;
612 prev->next_file = file_info;
613 }
614 else
615 file_info->prev_file = NULL;
616 }
617 if (n_files > 0)
618 file_infos[n_files-1]->next_file = NULL;
619
620 /* parse merge tasks */
621 file_index = 0;
622 for (i = 0; i < n_merge_tasks; i++)
623 {
624 /* format:
625 - for each input:
626 - uint64 for the file-id
627 - reader state
628 - output file-id
629 - output file build state
630 */
631 struct {
632 guint64 file_id;
633 guint reader_state_len;
634 const guint8 *reader_state;
635 FileInfo *file_info;
636 } inputs[2];
637 guint64 output_file_id;
638 guint build_state_len;
639 const guint8 *build_state;
640 MergeTask *merge_task;
641 guint j;
642
643 for (j = 0; j < 2; j++)
644 {
645 memcpy (&tmp64_le, at, 8);
646 at += 8;
647 inputs[j].file_id = GUINT64_FROM_LE (tmp64_le);
648
649 memcpy (&tmp32_le, at, 4);
650 at += 4;
651 inputs[j].reader_state_len = GUINT32_FROM_LE (tmp32_le);
652
653 inputs[j].reader_state = at;
654 at += inputs[j].reader_state_len;
655 }
656
657 memcpy (&tmp64_le, at, 8);
658 at += 8;
659 output_file_id = GUINT64_FROM_LE (tmp64_le);
660 max_file_id = MAX (max_file_id, output_file_id);
661
662 memcpy (&tmp32_le, at, 4);
663 at += 4;
664 build_state_len = GUINT32_FROM_LE (tmp32_le);
665
666 build_state = at;
667 at += build_state_len;
668
669 /* link merge data into file infos */
670 while (file_index + 1 < n_files
671 && file_infos[file_index]->file->id != inputs[0].file_id)
672 file_index++;
673 if (file_index + 1 == n_files)
674 {
675 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
676 "merge task's input[0] refers to input file "ID_FMT" which was not found, in the non-tail portion of the files list",
677 inputs[0].file_id);
678 goto error_cleanup;
679 }
680 inputs[0].file_info = file_infos[file_index];
681 inputs[1].file_info = file_infos[file_index+1];
682 g_assert (inputs[0].file_info->file->id == inputs[0].file_id);
683 if (inputs[1].file_info->file->id != inputs[1].file_id)
684 {
685 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_PARSE,
686 "second input to merge task nonconsecutive");
687 goto error_cleanup;
688 }
689 g_assert (inputs[0].file_info->next_task == NULL);
690 g_assert (inputs[1].file_info->prev_task == NULL);
691
692 merge_task = g_slice_new (MergeTask);
693 merge_task->is_started = TRUE;
694 merge_task->inputs[0] = inputs[0].file_info;
695 merge_task->inputs[1] = inputs[1].file_info;
696
697 for (j = 0; j < 2; j++)
698 {
699 /* restore reader */
700 GskTableReader *reader;
701 reader = gsk_table_file_recreate_reader (inputs[j].file_info->file,
702 table->dir,
703 inputs[j].reader_state_len,
704 inputs[j].reader_state,
705 error);
706 if (reader == NULL)
707 {
708 if (j == 1)
709 gsk_table_reader_destroy (merge_task->info.started.inputs[0].reader);
710 g_slice_free (MergeTask, merge_task);
711 goto error_cleanup;
712 }
713 merge_task->info.started.inputs[j].reader = reader;
714 }
715
716 merge_task->info.started.output
717 = gsk_table_file_factory_open_building_file (file_factory,
718 table->dir, output_file_id,
719 build_state_len,
720 build_state, error);
721 if (merge_task->info.started.output == NULL)
722 {
723 gsk_g_error_add_prefix (error,
724 "instantiating merge-task between files "ID_FMT" and "ID_FMT,
725 inputs[0].file_id,
726 inputs[1].file_id);
727 gsk_table_reader_destroy (merge_task->info.started.inputs[0].reader);
728 gsk_table_reader_destroy (merge_task->info.started.inputs[1].reader);
729 g_slice_free (MergeTask, merge_task);
730 goto error_cleanup;
731 }
732 inputs[0].file_info->next_task = merge_task;
733 inputs[1].file_info->prev_task = merge_task;
734
735 /* neither of these inputs can be involved in another merge-task,
736 so advance the pointer to prevent reuse. */
737 file_index++;
738
739 GSK_STACK_PUSH (GET_RUN_STACK (table), merge_task);
740 table->n_running_tasks++;
741 }
742 #define COMPARE_RUNNING_TASKS_BY_N_INPUTS(a,b, rv) \
743 { \
744 guint64 total_a_inputs = a->inputs[0]->file->n_entries \
745 + a->inputs[1]->file->n_entries; \
746 guint64 total_b_inputs = b->inputs[0]->file->n_entries \
747 + b->inputs[1]->file->n_entries; \
748 if (total_a_inputs < total_b_inputs) \
749 rv = 1; \
750 else if (total_a_inputs > total_b_inputs) \
751 rv = -1; \
752 else \
753 rv = 0; \
754 }
755 GSK_STACK_SORT (GET_RUN_STACK (table), COMPARE_RUNNING_TASKS_BY_N_INPUTS);
756 #undef COMPARE_RUNNING_TASKS_BY_N_INPUTS
757 #if DEBUG_JOURNAL_REPLAY
758 g_message ("read_journal: header length %u", (guint)(at-mmapped_journal));
759 #endif
760
761 /* --- do consistency checks --- */
762
763 /* TODO: other checks not already done during parsing??? */
764
765
766 /* setup various bits of the table;
767 after doing this, do not goto error_cleanup,
768 since it will free some of this data */
769 table->journal_mmap = mmapped_journal;
770 table->journal_size = journal_size;
771 table->n_files = n_files;
772 if (n_files > 0)
773 {
774 table->first_file = file_infos[0];
775 table->last_file = file_infos[n_files-1];
776 }
777 else
778 {
779 /* since the GskTable object is zeroed, the list is already empty */
780 }
781 table->n_old_files = n_files;
782 table->old_files = file_infos;
783 for (i = 0; i < n_files; i++)
784 {
785 table->old_files[i] = file_info_ref (file_infos[i]);
786 }
787
788 /* create unstarted merge tasks */
789 for (i = 0; i + 1 < n_files; i++)
790 if (file_infos[i]->next_task == NULL
791 && TASK_IS_UNSTARTED (file_infos[i]->prev_task)
792 && TASK_IS_UNSTARTED (file_infos[i+1]->next_task))
793 {
794 g_assert (file_infos[i+1]->prev_task == NULL);
795 create_unstarted_merge_task (table, file_infos[i], file_infos[i+1]);
796 }
797
798 /* delete old extraneous garbage */
799 if (!kill_unknown_files (table, error))
800 return FALSE;
801
802 table->n_input_entries = n_input_entries;
803 table->last_file_id = max_file_id;
804
805 /* --- process existing journal entries --- */
806 /* disable journalling and replay the journal */
807 old_journal_mode = table->journal_mode;
808 table->journal_mode = GSK_TABLE_JOURNAL_NONE;
809 for (;;)
810 {
811 guint align_offset = ((gsize)at) & 3;
812 guint32 key_len, value_len;
813 if (align_offset)
814 at += 4 - align_offset;
815 tmp32_le = ((guint32 *) at)[0];
816 key_len = GUINT32_FROM_LE (tmp32_le);
817 if (key_len == 0)
818 break;
819 key_len--; /* journal key lengths are offset by 1 */
820 tmp32_le = ((guint32 *) at)[1];
821 value_len = GUINT32_FROM_LE (tmp32_le);
822 #if DEBUG_JOURNAL_REPLAY
823 g_message ("replay journal: offset=%u, key,value_len=%u,%u",
824 (guint)(at - table->journal_mmap),
825 key_len, value_len);
826 #endif
827 at += 8;
828 if (!gsk_table_add (table, key_len, at, value_len, at + key_len, error))
829 {
830 gsk_g_error_add_prefix (error, "error replaying journal");
831 table->journal_mode = old_journal_mode;
832 /* do not use error_cleanup, let gsk_table_destroy() do the work */
833 return FALSE;
834 }
835 at += key_len + value_len;
836 }
837 table->journal_mode = old_journal_mode;
838 table->journal_len = at - mmapped_journal;
839
840 return TRUE;
841
842 error_cleanup:
843 if (file_infos)
844 {
845 for (i = 0; i < n_files && file_infos[i] != NULL; i++)
846 file_info_unref (file_infos[i], table->dir, FALSE);
847 g_free (file_infos);
848 }
849 munmap (mmapped_journal, journal_size);
850 return FALSE;
851 }
852
853 static gboolean
resize_journal(gint journal_fd,guint8 ** journal_mmap_inout,guint * journal_size_inout,guint new_min_size,GError ** error)854 resize_journal (gint journal_fd,
855 guint8 **journal_mmap_inout,
856 guint *journal_size_inout,
857 guint new_min_size,
858 GError **error)
859 {
860 guint new_size;
861 guint8 *tmp;
862 if (new_min_size <= *journal_size_inout)
863 return TRUE;
864 new_size = *journal_size_inout;
865 while (new_size < new_min_size)
866 new_size += new_size;
867
868 /* un-mmap file */
869 munmap (*journal_mmap_inout, *journal_size_inout);
870
871 /* resize */
872 if (ftruncate (journal_fd, new_size) < 0)
873 {
874 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
875 "error resizing journal: %s",
876 g_strerror (errno));
877 return FALSE;
878 }
879
880 /* mmap file */
881 tmp = mmap (NULL, new_size, PROT_READ|PROT_WRITE, MAP_SHARED, journal_fd, 0);
882 if (tmp == NULL || tmp == MAP_FAILED)
883 {
884 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_MMAP,
885 "error mmapping resized journal");
886 return FALSE;
887 }
888 *journal_size_inout = new_size;
889 *journal_mmap_inout = tmp;
890 return TRUE;
891 }
892
893 static gboolean
reset_journal(GskTable * table,GError ** error)894 reset_journal (GskTable *table,
895 GError **error)
896 {
897 guint i;
898 FileInfo *fi;
899 int journal_fd;
900 guint8 *journal_mmap;
901 guint at;
902 guint n_merge_tasks_written;
903
904 g_assert (table->in_memory_tree == NULL);
905
906 if (table->journal_mmap)
907 munmap (table->journal_mmap, table->journal_size);
908 if (table->journal_fd >= 0)
909 close (table->journal_fd);
910
911 /* write temporary new journal which
912 is the state dumped out,
913 and no journalled adds. */
914 {
915 journal_fd = open (table->journal_tmp_fname, O_CREAT|O_RDWR|O_TRUNC, 0644);
916 if (journal_fd < 0)
917 {
918 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_OPEN,
919 "error creating new journal file %s: %s",
920 table->journal_tmp_fname, g_strerror (errno));
921 return FALSE;
922 }
923 if (ftruncate (journal_fd, table->journal_size) < 0)
924 {
925 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
926 "error sizing journal file: %s",
927 g_strerror (errno));
928 goto failed_writing_journal;
929 }
930 journal_mmap = mmap (NULL, table->journal_size, PROT_READ|PROT_WRITE,
931 MAP_SHARED, journal_fd, 0);
932 if (journal_mmap == NULL || journal_mmap == MAP_FAILED)
933 {
934 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_TRUNCATE,
935 "mmap failed on tmp journal: %s",
936 g_strerror (errno));
937 close (journal_fd);
938 unlink (table->journal_tmp_fname);
939 return FALSE;
940 }
941 }
942
943 {
944 guint32 header[6];
945 guint32 tmp;
946 header[0] = GUINT32_TO_LE (JOURNAL_FILE_MAGIC);
947 header[1] = GUINT32_TO_LE (table->n_files);
948 header[2] = GUINT32_TO_LE (table->n_running_tasks);
949 header[3] = 0; /* reserved */
950 tmp = table->n_input_entries;
951 header[4] = GUINT32_TO_LE (tmp);
952 tmp = table->n_input_entries>>32;
953 header[5] = GUINT32_TO_LE (tmp);
954 memcpy (journal_mmap, header, 24);
955 }
956 at = 24;
957 for (fi = table->first_file; fi; fi = fi->next_file)
958 {
959 guint64 file_header[4];
960 file_header[0] = GUINT64_TO_LE (fi->file->id);
961 file_header[1] = GUINT64_TO_LE (fi->first_input_entry);
962 file_header[2] = GUINT64_TO_LE (fi->n_input_entries);
963 file_header[3] = GUINT64_TO_LE (fi->file->n_entries);
964 if (at + sizeof (file_header) > table->journal_size)
965 {
966 if (!resize_journal (journal_fd,
967 &journal_mmap, &table->journal_size,
968 at + sizeof (file_header),
969 error))
970 return FALSE;
971 }
972 memcpy (journal_mmap + at, file_header, sizeof (file_header));
973 at += sizeof (file_header);
974 }
975 n_merge_tasks_written = 0;
976 for (fi = table->first_file; fi; fi = fi->next_file)
977 if (fi->next_task != NULL && fi->next_task->is_started)
978 {
979 MergeTask *task = fi->next_task;
980 guint reader_state_lens[2];
981 guint8 *reader_states[2];
982 guint build_state_len;
983 guint8 *build_state;
984 guint input;
985 for (input = 0; input < 2; input++)
986 {
987 if (!gsk_table_file_get_reader_state (task->inputs[input]->file,
988 task->info.started.inputs[input].reader,
989 &reader_state_lens[input],
990 &reader_states[input],
991 error))
992 {
993 gsk_g_error_add_prefix (error, "reset_journal: input %u", input);
994 goto failed_writing_journal;
995 }
996 }
997 if (!gsk_table_file_get_build_state (task->info.started.output,
998 &build_state_len,
999 &build_state,
1000 error))
1001 {
1002 gsk_g_error_add_prefix (error, "reset_journal: build state");
1003 goto failed_writing_journal;
1004 }
1005
1006 guint total_len;
1007 total_len = (8 + 4 + reader_state_lens[0])
1008 + (8 + 4 + reader_state_lens[1])
1009 + (8 + 4 + build_state_len);
1010 if (at + total_len > table->journal_size)
1011 {
1012 if (!resize_journal (journal_fd,
1013 &journal_mmap, &table->journal_size,
1014 at + total_len,
1015 error))
1016 return FALSE;
1017 }
1018 for (input = 0; input < 2; input++)
1019 {
1020 guint64 id = task->inputs[input]->file->id;
1021 guint32 len = reader_state_lens[input];
1022 guint64 id_le = GUINT64_TO_LE (id);
1023 guint32 len_le = GUINT32_TO_LE (len);
1024 memcpy (journal_mmap + at, &id_le, 8);
1025 at += 8;
1026 memcpy (journal_mmap + at, &len_le, 4);
1027 at += 4;
1028 memcpy (journal_mmap + at, reader_states[input], len);
1029 at += len;
1030 g_free (reader_states[input]);
1031 }
1032 {
1033 guint64 id = task->info.started.output->id;
1034 guint32 len = build_state_len;
1035 guint64 id_le = GUINT64_TO_LE (id);
1036 guint32 len_le = GUINT32_TO_LE (len);
1037 memcpy (journal_mmap + at, &id_le, 8);
1038 at += 8;
1039 memcpy (journal_mmap + at, &len_le, 4);
1040 at += 4;
1041 memcpy (journal_mmap + at, build_state, len);
1042 at += len;
1043 g_free (build_state);
1044 }
1045
1046 n_merge_tasks_written++;
1047 }
1048 g_assert (n_merge_tasks_written == table->n_running_tasks);
1049
1050 /* move the journal into place */
1051 if (rename (table->journal_tmp_fname, table->journal_cur_fname) < 0)
1052 {
1053 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_FILE_RENAME,
1054 "error moving journal into place: %s",
1055 g_strerror (errno));
1056 goto failed_writing_journal;
1057 }
1058
1059 #if DEBUG_JOURNAL_WRITING
1060 g_message ("reset-journal: header length %u", at);
1061 #endif
1062
1063
1064 /* align journal pointer */
1065 if (at % 4 != 0)
1066 at += 4 - (at % 4);
1067
1068 table->journal_len = at;
1069 table->journal_mmap = journal_mmap;
1070
1071 /* preserve old files */
1072 FileInfo **new_old_files;
1073 new_old_files = g_new (FileInfo *, table->n_files);
1074 i = 0;
1075 for (fi = table->first_file; fi; fi = fi->next_file)
1076 new_old_files[i++] = file_info_ref (fi);
1077 g_assert (i == table->n_files);
1078
1079 /* blow away old files */
1080 for (i = 0; i < table->n_old_files; i++)
1081 file_info_unref (table->old_files[i], table->dir, TRUE);
1082 g_free (table->old_files);
1083
1084 table->n_old_files = table->n_files;
1085 table->old_files = new_old_files;
1086
1087 return TRUE;
1088
1089 failed_writing_journal:
1090 close (journal_fd);
1091 unlink (table->journal_tmp_fname);
1092 return FALSE;
1093 }
1094
1095
1096 /* --- in-memory tree lookup (implementations) --- */
compare_memory(guint a_len,const guint8 * a_data,guint b_len,const guint8 * b_data)1097 static inline gint compare_memory (guint a_len, const guint8 *a_data,
1098 guint b_len, const guint8 *b_data)
1099 {
1100 int rv;
1101 if (a_len < b_len)
1102 {
1103 rv = memcmp (a_data, b_data, a_len);
1104 if (rv == 0)
1105 rv = -1;
1106 }
1107 else if (a_len > b_len)
1108 {
1109 rv = memcmp (a_data, b_data, b_len);
1110 if (rv == 0)
1111 rv = 1;
1112 }
1113 else
1114 rv = memcmp (a_data, b_data, a_len);
1115 return rv;
1116 }
1117
/* Find the node for @key_len/@key_data in the in-memory tree, ordering
 * keys bytewise with compare_memory() (a shorter equal prefix sorts first).
 * gsk_table_new() installs this when the table has no user compare function.
 * Returns the matching TreeNode; `found` starts as NULL, so presumably NULL
 * means "no match" (depends on GSK_RBTREE_LOOKUP_COMPARATOR — confirm). */
static TreeNode *
in_memory_tree_lookup_memcmp (GskTable     *table,
                              guint         key_len,
                              const guint8 *key_data)
{
  TreeNode *found = NULL;
  /* Comparator for the lookup macro: compares the search key with node
     `at` and stores the three-way result in `rv`.  The first macro
     argument is unused because the key is captured from the enclosing
     function's parameters. */
#define TMP_KEY_COMPARATOR(unused, at, rv) \
  rv = compare_memory (key_len, key_data, at->key.len, at->key.data)
  GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
                                unused, TMP_KEY_COMPARATOR, found);
#undef TMP_KEY_COMPARATOR
  return found;
}
/* Find the node for @key_len/@key_data in the in-memory tree, ordering
 * keys with the user's length-aware comparator (table->compare.with_len),
 * which receives table->user_data.
 * Returns the matching TreeNode; `found` starts as NULL, so presumably NULL
 * means "no match" (depends on GSK_RBTREE_LOOKUP_COMPARATOR — confirm). */
static TreeNode *
in_memory_tree_lookup_with_len (GskTable     *table,
                                guint         key_len,
                                const guint8 *key_data)
{
  TreeNode *found = NULL;
  /* Comparator for the lookup macro: search key versus node `at`,
     result stored in `rv`.  The first macro argument is unused. */
#define TMP_KEY_COMPARATOR(unused, at, rv) \
  rv = table->compare.with_len (key_len, key_data, \
                                at->key.len, at->key.data, \
                                table->user_data)
  GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
                                unused, TMP_KEY_COMPARATOR, found);
#undef TMP_KEY_COMPARATOR
  return found;
}
1146
/* Find the node for @key_data in the in-memory tree, ordering keys with
 * the user's length-less comparator (table->compare.no_len).  @key_len is
 * accepted only to share the signature of the other lookup variants; the
 * comparator is not given any lengths.
 * Returns the matching TreeNode; `found` starts as NULL, so presumably NULL
 * means "no match" (depends on GSK_RBTREE_LOOKUP_COMPARATOR — confirm). */
static TreeNode *
in_memory_tree_lookup_no_len (GskTable     *table,
                              guint         key_len,
                              const guint8 *key_data)
{
  TreeNode *found = NULL;
  /* Comparator for the lookup macro: search key versus node `at`,
     result stored in `rv`.  The first macro argument is unused. */
#define TMP_KEY_COMPARATOR(unused, at, rv) \
  rv = table->compare.no_len (key_data, at->key.data, table->user_data)
  GSK_RBTREE_LOOKUP_COMPARATOR (GET_IN_MEMORY_TREE (table),
                                unused, TMP_KEY_COMPARATOR, found);
#undef TMP_KEY_COMPARATOR
  return found;
}
1160
tree_node_compare_memcmp(GskTable * table,TreeNode * a,TreeNode * b)1161 static int tree_node_compare_memcmp (GskTable *table,
1162 TreeNode *a,
1163 TreeNode *b)
1164 {
1165 return compare_memory (a->key.len, a->key.data, b->key.len, b->key.data);
1166 }
tree_node_compare_with_len(GskTable * table,TreeNode * a,TreeNode * b)1167 static int tree_node_compare_with_len (GskTable *table,
1168 TreeNode *a,
1169 TreeNode *b)
1170 {
1171 return table->compare.with_len (a->key.len, a->key.data,
1172 b->key.len, b->key.data,
1173 table->user_data);
1174 }
tree_node_compare_no_len(GskTable * table,TreeNode * a,TreeNode * b)1175 static int tree_node_compare_no_len (GskTable *table,
1176 TreeNode *a,
1177 TreeNode *b)
1178 {
1179 return table->compare.no_len (a->key.data, b->key.data, table->user_data);
1180 }
1181 /* NOTE on nomerge variants. These do not return 0
1182 unless the treenodes are the same (pointerwise).
1183 Note that because the nodes are allocated from
1184 the pool in forward order, pointerwise comparisons
1185 are equivalent to timewise comparisons,
1186 so this comparison is the appropriate (and cheapest) test. */
tree_node_compare_memcmp_nomerge(GskTable * table,TreeNode * a,TreeNode * b)1187 static int tree_node_compare_memcmp_nomerge (GskTable *table,
1188 TreeNode *a,
1189 TreeNode *b)
1190 {
1191 int rv = compare_memory (a->key.len, a->key.data, b->key.len, b->key.data);
1192 if (rv == 0)
1193 rv = (a < b) ? -1 : (a > b) ? 1 : 0;
1194 return rv;
1195 }
tree_node_compare_with_len_nomerge(GskTable * table,TreeNode * a,TreeNode * b)1196 static int tree_node_compare_with_len_nomerge (GskTable *table,
1197 TreeNode *a,
1198 TreeNode *b)
1199 {
1200 int rv = table->compare.with_len (a->key.len, a->key.data,
1201 b->key.len, b->key.data,
1202 table->user_data);
1203 if (rv == 0)
1204 rv = (a < b) ? -1 : (a > b) ? 1 : 0;
1205 return rv;
1206 }
tree_node_compare_no_len_nomerge(GskTable * table,TreeNode * a,TreeNode * b)1207 static int tree_node_compare_no_len_nomerge (GskTable *table,
1208 TreeNode *a,
1209 TreeNode *b)
1210 {
1211 int rv = table->compare.no_len (a->key.data, b->key.data, table->user_data);
1212 if (rv == 0)
1213 rv = (a < b) ? -1 : (a > b) ? 1 : 0;
1214 return rv;
1215 }
1216
1217 static int
file_query_compare_memcmp(guint test_key_len,const guint8 * test_key,gpointer compare_data)1218 file_query_compare_memcmp (guint test_key_len,
1219 const guint8 *test_key,
1220 gpointer compare_data)
1221 {
1222 GskTable *table = compare_data;
1223 guint a_len = table->file_query_key_len;
1224 const guint8 *a = table->file_query_key_data;
1225 guint b_len = test_key_len;
1226 const guint8 *b = test_key;
1227 #if 0
1228 {
1229 char *a_hex = gsk_escape_memory_hex (a, a_len);
1230 char *b_hex = gsk_escape_memory_hex (b, b_len);
1231 g_message ("compare_memory: '%s' v '%s'", a_hex, b_hex);
1232 g_free (a_hex);
1233 g_free (b_hex);
1234 }
1235 #endif
1236 return compare_memory (a_len, a, b_len, b);
1237 }
1238 static int
file_query_compare_no_len(guint test_key_len,const guint8 * test_key,gpointer compare_data)1239 file_query_compare_no_len (guint test_key_len,
1240 const guint8 *test_key,
1241 gpointer compare_data)
1242 {
1243 GskTable *table = compare_data;
1244 const guint8 *a = table->file_query_key_data;
1245 const guint8 *b = test_key;
1246 return table->compare.no_len (a, b, table->user_data);
1247 }
1248 static int
file_query_compare_with_len(guint test_key_len,const guint8 * test_key,gpointer compare_data)1249 file_query_compare_with_len (guint test_key_len,
1250 const guint8 *test_key,
1251 gpointer compare_data)
1252 {
1253 GskTable *table = compare_data;
1254 guint a_len = table->file_query_key_len;
1255 const guint8 *a = table->file_query_key_data;
1256 guint b_len = test_key_len;
1257 const guint8 *b = test_key;
1258 return table->compare.with_len (a_len, a, b_len, b, table->user_data);
1259 }
1260
1261
1262 /**
1263 * gsk_table_new:
1264 * @dir: the directory where the table will store its data.
1265 * @options: configuration and optimization hints.
1266 * @new_flags: whether to create or open an existing table.
1267 * @error: place to put the error if something goes wrong.
1268 *
1269 * Create a new GskTable object.
1270 * Only one table may use a directory at a time.
1271 *
1272 * @options gives both compare, merge and delete
1273 * semantics, and hints on the sizes of the data.
1274 *
1275 * @new_flags determines whether we are allowed
1276 * to create a new table or open an existing table.
1277 * If @new_flags is 0, we will permit either,
1278 * equivalent to GSK_TABLE_MAY_CREATE|GSK_TABLE_MAY_EXIST.
1279 *
1280 * returns: the newly created table object, or NULL on error.
1281 */
1282 GskTable *
gsk_table_new(const char * dir,const GskTableOptions * options,GskTableNewFlags new_flags,GError ** error)1283 gsk_table_new (const char *dir,
1284 const GskTableOptions *options,
1285 GskTableNewFlags new_flags,
1286 GError **error)
1287 {
1288 gboolean did_mkdir;
1289 GskTable *table;
1290 RunTaskFuncs *run_funcs;
1291 gboolean has_len;
1292 int lock_fd;
1293 GskTableFileFactory *factory;
1294
1295 run_funcs = table_options_get_run_funcs (options, &has_len, error);
1296 if (run_funcs == NULL)
1297 return NULL;
1298 factory = table_options_get_file_factory (options, error);
1299 if (factory == NULL)
1300 return NULL;
1301
1302 if (g_file_test (dir, G_FILE_TEST_IS_DIR))
1303 {
1304 if ((new_flags & GSK_TABLE_MAY_EXIST) == 0)
1305 {
1306 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_EXISTS,
1307 "table dir %s already exists", dir);
1308 return NULL;
1309 }
1310 did_mkdir = FALSE;
1311 }
1312 else
1313 {
1314 if ((new_flags & GSK_TABLE_MAY_CREATE) == 0)
1315 {
1316 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_EXISTS,
1317 "table dir %s already exists", dir);
1318 return NULL;
1319 }
1320 did_mkdir = TRUE;
1321 if (!gsk_mkdir_p (dir, 0755, error))
1322 return FALSE;
1323 }
1324
1325 lock_fd = gsk_lock_dir (dir, FALSE, error);
1326 if (lock_fd < 0)
1327 return FALSE;
1328
1329 table = g_new0 (GskTable, 1);
1330 table->dir = g_strdup (dir);
1331 table->lock_fd = lock_fd;
1332 table->run_funcs = run_funcs;
1333 table->in_memory_tree_lookup
1334 = table->compare.no_len == NULL ? in_memory_tree_lookup_memcmp
1335 : has_len ? in_memory_tree_lookup_with_len
1336 : in_memory_tree_lookup_no_len;
1337 if (options->merge == NULL && options->merge_no_len == NULL)
1338 table->tree_node_compare
1339 = table->compare.no_len == NULL ? tree_node_compare_memcmp_nomerge
1340 : has_len ? tree_node_compare_with_len_nomerge
1341 : tree_node_compare_no_len_nomerge;
1342 else
1343 table->tree_node_compare
1344 = table->compare.no_len == NULL ? tree_node_compare_memcmp
1345 : has_len ? tree_node_compare_with_len
1346 : tree_node_compare_no_len;
1347 table->journal_mode = options->journal_mode;
1348 table->max_running_tasks = 4;
1349 table->max_merge_ratio_b16 = 3<<16;
1350 table->max_in_memory_bytes = options->max_in_memory_bytes;
1351 table->max_in_memory_entries = options->max_in_memory_entries;
1352 table->journal_flush_period = 3;
1353 table->tree_node_pool = g_new0 (TreeNode, table->max_in_memory_entries);
1354 table->journal_cur_fname = g_strdup_printf ("%s/journal", dir);
1355 table->journal_tmp_fname = g_strdup_printf ("%s/journal.tmp", dir);
1356 table->key_fixed_length = -1;
1357 table->value_fixed_length = -1;
1358 table->file_factory = factory;
1359 // INIT? query_inout ???
1360 table->file_query.compare
1361 = table->compare.no_len == NULL ? file_query_compare_memcmp
1362 : has_len ? file_query_compare_with_len
1363 : file_query_compare_no_len;
1364 table->file_query.compare_data = table;
1365
1366 table->has_len = has_len;
1367 if (has_len)
1368 {
1369 table->compare.with_len = options->compare;
1370 table->merge.with_len = options->merge;
1371 table->simplify.with_len = options->simplify;
1372 }
1373 else
1374 {
1375 table->compare.no_len = options->compare_no_len;
1376 table->merge.no_len = options->merge_no_len;
1377 table->simplify.no_len = options->simplify_no_len;
1378 }
1379 table->is_stable_func = options->is_stable;
1380
1381 if (did_mkdir)
1382 {
1383 /* make an empty journal file */
1384 guint journal_size = 1024;
1385 guint min_journal_size = options->max_in_memory_bytes + 8 * options->max_in_memory_bytes;
1386 while (journal_size < min_journal_size)
1387 journal_size += journal_size;
1388 table->journal_size = journal_size;
1389 table->journal_fd = -1;
1390 if (!reset_journal (table, error))
1391 {
1392 gsk_table_destroy (table);
1393 gsk_rm_rf (table->dir, NULL);
1394 return NULL;
1395 }
1396 }
1397 else
1398 {
1399 /* load existing table */
1400 if (!read_journal (table, error))
1401 {
1402 GError *e = NULL;
1403 g_free (table->dir);
1404 if (!gsk_unlock_dir (table->lock_fd, &e))
1405 {
1406 g_warning ("error unlocking dir on failure reading journal: %s",
1407 e->message);
1408 g_clear_error (&e);
1409 }
1410 g_free (table);
1411 return NULL;
1412 }
1413 }
1414
1415 return table;
1416 }
1417
1418
1419 /* --- starting a merge-task */
1420 static gboolean
start_merge_task(GskTable * table,MergeTask * merge_task,GError ** error)1421 start_merge_task (GskTable *table,
1422 MergeTask *merge_task,
1423 GError **error)
1424 {
1425 FileInfo *prev = merge_task->inputs[0];
1426 FileInfo *next = merge_task->inputs[1];
1427 GskTableFileHints file_hints = GSK_TABLE_FILE_HINTS_DEFAULTS;
1428 guint64 output_file_id;
1429 GskTableFile *output;
1430 guint input;
1431 GskTableReader *readers[2];
1432 guint64 n_input_entries = prev->file->n_entries + next->file->n_entries;
1433 #if DEBUG_MERGE_TASKS
1434 g_message ("starting mergetask between "ID_FMT" and "ID_FMT" [%"G_GUINT64_FORMAT" input entries]",
1435 prev->file->id, next->file->id, n_input_entries);
1436 #endif
1437 g_assert (!merge_task->is_started);
1438 g_assert (prev->prev_task == NULL || !prev->prev_task->is_started);
1439 g_assert (prev->next_task == merge_task);
1440 g_assert (next->prev_task == merge_task);
1441 g_assert (next->next_task == NULL || !next->next_task->is_started);
1442 if (prev->prev_task)
1443 {
1444 MergeTask *to_kill = prev->prev_task;
1445 g_assert (!to_kill->is_started);
1446 g_assert (to_kill->inputs[1] == prev);
1447 kill_unstarted_merge_task (table, to_kill);
1448 }
1449 if (next->next_task)
1450 {
1451 MergeTask *to_kill = next->next_task;
1452 g_assert (!to_kill->is_started);
1453 g_assert (to_kill->inputs[0] == next);
1454 kill_unstarted_merge_task (table, to_kill);
1455 }
1456
1457 GSK_RBTREE_REMOVE (GET_UNSTARTED_MERGE_TASK_TREE (table), merge_task);
1458
1459 for (input = 0; input < 2; input++)
1460 {
1461 readers[input] = gsk_table_file_create_reader (merge_task->inputs[input]->file,
1462 table->dir,
1463 error);
1464 if (readers[input] == NULL)
1465 {
1466 gsk_g_error_add_prefix (error, "creating merge job: error making reader %u", input);
1467 if (input == 1)
1468 gsk_table_reader_destroy (readers[0]);
1469 return FALSE;
1470 }
1471 }
1472
1473 output_file_id = ++(table->last_file_id);
1474 output = gsk_table_file_factory_create_file (table->file_factory,
1475 table->dir,
1476 output_file_id,
1477 &file_hints,
1478 error);
1479 if (output == NULL)
1480 {
1481 gsk_g_error_add_prefix (error, "creating merge-task output");
1482 gsk_table_reader_destroy (readers[0]);
1483 gsk_table_reader_destroy (readers[1]);
1484 return FALSE;
1485 }
1486
1487 merge_task->is_started = TRUE;
1488 merge_task->info.started.output = output;
1489 merge_task->info.started.inputs[0].reader = readers[0];
1490 merge_task->info.started.inputs[1].reader = readers[1];
1491 merge_task->info.started.has_last_queryable_key = FALSE;
1492 gsk_table_buffer_init (&merge_task->info.started.last_queryable_key);
1493
1494 /* insert into run-list */
1495 MergeTask **p_next = &table->run_list;
1496 for (;;)
1497 {
1498 MergeTask *next = *p_next;
1499 guint64 next_n_input_entries;
1500 if (next == NULL)
1501 break;
1502 next_n_input_entries = next->inputs[0]->file->n_entries
1503 + next->inputs[1]->file->n_entries;
1504 if (next_n_input_entries > n_input_entries)
1505 break;
1506
1507 p_next = &next->info.started.next_run;
1508 }
1509 merge_task->info.started.next_run = *p_next;
1510 *p_next = merge_task;
1511 table->n_running_tasks++;
1512 return TRUE;
1513 }
1514
1515 static gboolean
maybe_start_tasks(GskTable * table,GError ** error)1516 maybe_start_tasks (GskTable *table,
1517 GError **error)
1518 {
1519 while (table->n_running_tasks < table->max_running_tasks
1520 && table->unstarted_merges != NULL)
1521 {
1522 MergeTask *bottom_heaviest;
1523 GSK_RBTREE_FIRST (GET_UNSTARTED_MERGE_TASK_TREE (table), bottom_heaviest);
1524 if (bottom_heaviest->info.unstarted.input_size_ratio_b16
1525 > table->max_merge_ratio_b16)
1526 break;
1527 if (!start_merge_task (table, bottom_heaviest, error))
1528 return FALSE;
1529 }
1530 return TRUE;
1531 }
1532
1533 static inline gboolean
run_merge_task(GskTable * table,guint count,gboolean flush,GError ** error)1534 run_merge_task (GskTable *table,
1535 guint count,
1536 gboolean flush,
1537 GError **error)
1538 {
1539 MergeTask *merge_task = table->run_list;
1540 RunTaskFuncs *run_funcs = table->run_funcs;
1541 RunTaskFunc func;
1542 gboolean use_simplify;
1543 g_assert (merge_task != NULL);
1544
1545 #if DEBUG_MERGE_TASKS
1546 g_message ("run_merge_task between "ID_FMT" and "ID_FMT"; count=%u, flush=%d",
1547 merge_task->inputs[0]->file->id,
1548 merge_task->inputs[1]->file->id,
1549 count, flush);
1550 #endif
1551
1552 use_simplify = merge_task->inputs[0]->first_input_entry == 0
1553 && table->simplify.no_len != NULL;
1554 func = use_simplify ? (flush ? run_funcs->simplify_flush
1555 : run_funcs->simplify_noflush)
1556 : (flush ? run_funcs->nosimplify_flush
1557 : run_funcs->nosimplify_noflush);
1558 return (*func) (table, count, error);
1559 }
1560
1561 static gboolean
dump_tree_recursively(TreeNode * node,GskTableFile * file,GError ** error)1562 dump_tree_recursively (TreeNode *node,
1563 GskTableFile *file,
1564 GError **error)
1565 {
1566 if (node->left != NULL
1567 && !dump_tree_recursively (node->left, file, error))
1568 return FALSE;
1569 if (gsk_table_file_feed_entry (file, node->key.len, node->key.data,
1570 node->value.len, node->value.data,
1571 error) == GSK_TABLE_FEED_ENTRY_ERROR)
1572 return FALSE;
1573 if (node->right != NULL
1574 && !dump_tree_recursively (node->right, file, error))
1575 return FALSE;
1576 return TRUE;
1577 }
1578
/* Write the whole in-memory tree to a new table file and empty the tree.
 *
 * Steps:
 *  - allocate the next file id and create the file through the factory;
 *  - feed all nodes in key order via dump_tree_recursively();
 *  - wrap the file in a new FileInfo and append it to the file list;
 *  - if the new file has a predecessor whose own prev_task is not started,
 *    create an unstarted merge task between the two;
 *  - reset the in-memory counters, tree root and node-pool cursor.
 *
 * On a write error the partially written file is destroyed (the TRUE flag
 * presumably means "remove from disk" — confirm) and FALSE is returned
 * with @error set.  A file that is not done immediately after feeding is
 * not supported yet and aborts via g_error(). */
static gboolean
flush_tree (GskTable *table,
            GError  **error)
{
  guint64 id = ++(table->last_file_id);
  GskTableFileHints file_hints = GSK_TABLE_FILE_HINTS_DEFAULTS;
  GskTableFile *file = gsk_table_file_factory_create_file (table->file_factory,
                                                           table->dir,
                                                           id,
                                                           &file_hints,
                                                           error);
  FileInfo *fi;
  gboolean done;
  if (file == NULL)
    {
      gsk_g_error_add_prefix (error, "flushing in-memory tree");
      return FALSE;
    }
  if (!dump_tree_recursively (table->in_memory_tree, file, error))
    {
      gsk_g_error_add_prefix (error, "dumping in-memory tree");
      gsk_table_file_destroy (file, table->dir, TRUE, NULL);
      return FALSE;
    }
  if (!gsk_table_file_done_feeding (file, &done, error))
    {
      gsk_g_error_add_prefix (error, "finishing flushing in-memory tree");
      gsk_table_file_destroy (file, table->dir, TRUE, NULL);
      return FALSE;
    }
  if (done == FALSE)
    {
      g_error ("TODO: handle files that require a bit of baking at end");
    }

  /* The new file covers the most recent in_memory_entry_count inputs. */
  fi = g_slice_new0 (FileInfo);
  fi->ref_count = 1;
  fi->first_input_entry = table->n_input_entries - table->in_memory_entry_count;
  fi->n_input_entries = table->in_memory_entry_count;
  fi->file = file;
  table->n_files++;
  GSK_LIST_APPEND (GET_FILE_INFO_LIST (table), fi);
  /* Pair the new file with its predecessor as a merge candidate, unless
     the predecessor is already an input of a started merge. */
  if (fi->prev_file && TASK_IS_UNSTARTED (fi->prev_file->prev_task))
    create_unstarted_merge_task (table, fi->prev_file, fi);
  CHECK_FILES_CONTIGUOUS (table);

  /* reset tree; pool slots are reused from the start on the next add */
  table->in_memory_entry_count = 0;
  table->in_memory_bytes = 0;
  table->in_memory_tree = NULL;
  table->tree_node_pool_used = 0;
  return TRUE;
}
1632
1633 /**
1634 * gsk_table_add:
1635 * @table: the table to add data to.
1636 * @key_len:
1637 * @key_data:
1638 * @value_len:
1639 * @value_data:
1640 * @error: place to put the error if something goes wrong.
1641 *
1642 * Add a new key/value pair to a GskTable.
1643 * If the key already exists, the semantics are dependent
1644 * on the merge function; if no merge function is given,
1645 * then both rows will exist in the table.
1646 *
1647 * returns: whether the addition was successful.
1648 */
1649 gboolean
gsk_table_add(GskTable * table,guint key_len,const guint8 * key_data,guint value_len,const guint8 * value_data,GError ** error)1650 gsk_table_add (GskTable *table,
1651 guint key_len,
1652 const guint8 *key_data,
1653 guint value_len,
1654 const guint8 *value_data,
1655 GError **error)
1656 {
1657 TreeNode *found;
1658 gboolean must_write_journal = table->journal_mode == GSK_TABLE_JOURNAL_DEFAULT;
1659 g_assert (table->key_fixed_length < 0
1660 || (guint) table->key_fixed_length == key_len);
1661 g_assert (table->value_fixed_length < 0
1662 || (guint) table->value_fixed_length == value_len);
1663
1664 table->n_input_entries++;
1665
1666 if (table->merge.no_len == NULL)
1667 found = NULL;
1668 else
1669 found = table->in_memory_tree_lookup (table, key_len, key_data);
1670 if (found)
1671 {
1672 /* Merge the old data with the new data. */
1673 GskTableMergeResult merge_result;
1674 if (table->has_len)
1675 merge_result = table->merge.with_len (key_len, key_data,
1676 found->value.len, found->value.data,
1677 value_len, value_data,
1678 &table->merge_buffer, table->user_data);
1679 else
1680 merge_result = table->merge.no_len (key_data, found->value.data,
1681 value_data,
1682 &table->merge_buffer, table->user_data);
1683
1684 switch (merge_result)
1685 {
1686 case GSK_TABLE_MERGE_RETURN_A:
1687 /* nothing to do */
1688 break;
1689 case GSK_TABLE_MERGE_RETURN_B:
1690 table->in_memory_bytes -= found->value.len;
1691 table->in_memory_bytes += value_len;
1692 set_buffer (&found->value, value_len, value_data);
1693 break;
1694 case GSK_TABLE_MERGE_SUCCESS:
1695 table->in_memory_bytes -= found->value.len;
1696 table->in_memory_bytes += table->merge_buffer.len;
1697 copy_buffer (&found->value, &table->merge_buffer);
1698 break;
1699 case GSK_TABLE_MERGE_DROP:
1700 GSK_RBTREE_REMOVE (GET_IN_MEMORY_TREE (table), found);
1701 /* TreeNode itself will be auto-recycled */
1702 break;
1703 }
1704 }
1705 else
1706 {
1707 TreeNode *new = table->tree_node_pool + table->tree_node_pool_used++;
1708
1709 /* write key and value into the new node */
1710 set_buffer (&new->key, key_len, key_data);
1711 set_buffer (&new->value, value_len, value_data);
1712
1713 /* TODO: this repeats the work of the lookup.
1714 maybe we need a GSK_RBTREE_INSERT_NO_REPLACE()
1715 which can be used at the outset instead of LOOKUP_COMPARATOR? */
1716 GSK_RBTREE_INSERT (GET_IN_MEMORY_TREE (table), new, found);
1717 g_assert (found == NULL);
1718
1719 table->in_memory_bytes += key_len + value_len;
1720 }
1721
1722 table->in_memory_entry_count++;
1723 if (table->in_memory_entry_count == table->max_in_memory_entries
1724 || table->in_memory_bytes >= table->max_in_memory_bytes)
1725 {
1726 /* flush the tree */
1727 if (!flush_tree (table, error))
1728 {
1729 gsk_g_error_add_prefix (error, "flushing tree");
1730 return FALSE;
1731 }
1732
1733 /* maybe flush journal */
1734 if (table->journal_mode != GSK_TABLE_JOURNAL_NONE
1735 && (++(table->journal_flush_index) == table->journal_flush_period))
1736 {
1737 /* write new journal */
1738 if (!reset_journal (table, error))
1739 {
1740 gsk_g_error_add_prefix (error, "error flushing journal");
1741 return FALSE;
1742 }
1743
1744 table->journal_flush_index = 0;
1745 must_write_journal = FALSE;
1746 }
1747
1748 if (!maybe_start_tasks (table, error))
1749 return FALSE;
1750 }
1751
1752 if (table->run_list != NULL)
1753 {
1754 if (!run_merge_task (table, 32, FALSE, error))
1755 return FALSE;
1756 }
1757
1758 if (must_write_journal)
1759 {
1760 guint new_journal_len = 4 + key_len + 4 + value_len + table->journal_len;
1761 if (new_journal_len % 4 != 0)
1762 new_journal_len += (4 - new_journal_len % 4);
1763 if (new_journal_len + 4 > table->journal_size)
1764 {
1765 if (!resize_journal (table->journal_fd,
1766 &table->journal_mmap,
1767 &table->journal_size,
1768 new_journal_len + 4,
1769 error))
1770 {
1771 gsk_g_error_add_prefix (error, "expanding journal");
1772 return FALSE;
1773 }
1774 }
1775 #if DEBUG_JOURNAL_WRITING
1776 g_message ("writing journal at %u: key/value_len=%u/%u", (guint) table->journal_len, key_len,value_len);
1777 #endif
1778 memset (table->journal_mmap + new_journal_len, 0, 4);
1779 memcpy (table->journal_mmap + 8 + table->journal_len,
1780 key_data, key_len);
1781 memcpy (table->journal_mmap + 8 + table->journal_len + key_len,
1782 value_data, value_len);
1783 ((guint32*)(table->journal_mmap + table->journal_len))[1]
1784 = GUINT32_TO_LE (value_len);
1785 GSK_MEMORY_BARRIER ();
1786 ((guint32*)(table->journal_mmap + table->journal_len))[0]
1787 = GUINT32_TO_LE (key_len + 1);
1788 table->journal_len = new_journal_len;
1789 }
1790 return TRUE;
1791 }
1792
1793 static inline int
do_compare(GskTable * table,guint a_len,const guint8 * a_data,guint b_len,const guint8 * b_data)1794 do_compare (GskTable *table,
1795 guint a_len,
1796 const guint8 *a_data,
1797 guint b_len,
1798 const guint8 *b_data)
1799 {
1800 if (table->compare.no_len == NULL)
1801 return compare_memory (a_len, a_data, b_len, b_data);
1802 else if (table->has_len)
1803 return table->compare.with_len (a_len, a_data, b_len, b_data, table->user_data);
1804 else
1805 return table->compare.no_len (a_data, b_data, table->user_data);
1806 }
1807
1808 gboolean
gsk_table_query(GskTable * table,guint key_len,const guint8 * key_data,gboolean * found_value_out,guint * value_len_out,guint8 ** value_data_out,GError ** error)1809 gsk_table_query (GskTable *table,
1810 guint key_len,
1811 const guint8 *key_data,
1812 gboolean *found_value_out,
1813 guint *value_len_out,
1814 guint8 **value_data_out,
1815 GError **error)
1816 {
1817 gboolean reverse = table->query_reverse_chronologically;
1818 gboolean has_result = FALSE;
1819 GskTableBuffer *result_buffers = table->result_buffers; /* [2] */
1820 GskTableBuffer *result = result_buffers;
1821 GskTableBuffer *other_result = result_buffers+1;
1822 GskTableFileQuery *query = &table->file_query;
1823 FileInfo *fi;
1824 gboolean use_merge_tasks = TRUE;
1825 gpointer user_data = table->user_data;
1826
1827 table->file_query_key_len = key_len;
1828 table->file_query_key_data = key_data;
1829
1830 #if DEBUG_PRINT_QUERIES
1831 {
1832 char *hex = gsk_escape_memory_hex (key_data, key_len);
1833 g_message ("lookup '%s'", hex);
1834 g_free (hex);
1835 }
1836 #endif
1837
1838 /* first query rbtree (if in reverse-chronological mode (default)) */
1839 if (reverse)
1840 {
1841 TreeNode *node = table->in_memory_tree_lookup (table, key_len, key_data);
1842 if (node != NULL)
1843 {
1844 has_result = TRUE;
1845 copy_buffer (result, &node->value);
1846
1847 /* are we done? */
1848 if (table->is_stable_func != NULL
1849 && table->is_stable_func (key_len, key_data,
1850 result->len, result->data,
1851 table->user_data))
1852 goto done_querying_copy_result;
1853 }
1854 }
1855
1856 /* walk through files, using merge-jobs as appropriate */
1857 for (fi = reverse ? table->last_file : table->first_file;
1858 fi != NULL;
1859 fi = reverse ? fi->prev_file : fi->next_file)
1860 {
1861 MergeTask *mt = reverse ? fi->prev_task : fi->next_task;
1862 gboolean used_merge_output = FALSE;
1863 if (use_merge_tasks && mt != NULL && mt->is_started)
1864 {
1865 if (mt->info.started.has_last_queryable_key)
1866 {
1867 /* compare last_queryable_key to key */
1868 int rv = do_compare (table,
1869 mt->info.started.last_queryable_key.len,
1870 mt->info.started.last_queryable_key.data,
1871 key_len, key_data);
1872 if (rv < 0)
1873 goto cannot_use_merge_output;
1874
1875
1876 if (!gsk_table_file_query (mt->info.started.output,
1877 query, error))
1878 {
1879 gsk_g_error_add_prefix (error, "querying merge-task output");
1880 goto failed;
1881 }
1882 used_merge_output = TRUE;
1883 goto handle_file_query_result;
1884 }
1885 }
1886 cannot_use_merge_output:
1887
1888 /* query file */
1889 if (!gsk_table_file_query (fi->file, query, error))
1890 {
1891 gsk_g_error_add_prefix (error, "querying merge-task output");
1892 goto failed;
1893 }
1894
1895 handle_file_query_result:
1896 if (query->found)
1897 {
1898 if (has_result)
1899 {
1900 /* merge values */
1901 GskTableMergeResult merge_result;
1902 if (reverse)
1903 merge_result
1904 = table->has_len ?
1905 table->merge.with_len (key_len, key_data,
1906 query->value.len,
1907 query->value.data,
1908 result->len, result->data,
1909 other_result,
1910 user_data)
1911 : table->merge.no_len (key_data,
1912 query->value.data,
1913 result->data,
1914 other_result,
1915 user_data);
1916 else
1917 merge_result
1918 = table->has_len ?
1919 table->merge.with_len (key_len, key_data,
1920 result->len, result->data,
1921 query->value.len,
1922 query->value.data,
1923 other_result,
1924 user_data)
1925 : table->merge.no_len (key_data,
1926 result->data,
1927 query->value.data,
1928 other_result,
1929 user_data);
1930 switch (merge_result)
1931 {
1932 case GSK_TABLE_MERGE_RETURN_A:
1933 if (reverse)
1934 copy_buffer (result, &query->value);
1935 else
1936 (void) 0; /* do nothing: result is already correct */
1937 break;
1938 case GSK_TABLE_MERGE_RETURN_B:
1939 if (!reverse)
1940 copy_buffer (result, &query->value);
1941 else
1942 (void) 0; /* do nothing: result is already correct */
1943 break;
1944 case GSK_TABLE_MERGE_SUCCESS:
1945 {
1946 GskTableBuffer *tmp = result;
1947 result = other_result;
1948 other_result = tmp;
1949 break;
1950 }
1951 case GSK_TABLE_MERGE_DROP:
1952 has_result = FALSE;
1953 break;
1954 default:
1955 g_assert_not_reached ();
1956 }
1957 }
1958 else if (table->merge.no_len == NULL)
1959 {
1960 *found_value_out = TRUE;
1961 copy_buffer (result, &query->value);
1962 goto done_querying_copy_result;
1963 }
1964 else
1965 {
1966 has_result = TRUE;
1967 copy_buffer (result, &query->value);
1968 }
1969
1970 /* are we done? */
1971 if (table->is_stable_func != NULL
1972 && table->is_stable_func (key_len, key_data,
1973 result->len, result->data,
1974 table->user_data))
1975 goto done_querying_copy_result;
1976 }
1977
1978 /* skip one extra file */
1979 if (used_merge_output)
1980 fi = reverse ? fi->prev_file : fi->next_file;
1981 }
1982
1983 /* last query rbtree (if in chronological mode) */
1984 if (!reverse)
1985 {
1986 TreeNode *node = table->in_memory_tree_lookup (table, key_len, key_data);
1987 if (node != NULL)
1988 {
1989 if (has_result)
1990 {
1991 /* merge */
1992 GskTableMergeResult merge_result;
1993 merge_result
1994 = table->has_len
1995 ? table->merge.with_len (key_len, key_data,
1996 result->len, result->data,
1997 node->value.len, node->value.data,
1998 other_result,
1999 user_data)
2000 : table->merge.no_len (key_data,
2001 result->data,
2002 node->value.data,
2003 other_result,
2004 user_data);
2005 switch (merge_result)
2006 {
2007 case GSK_TABLE_MERGE_RETURN_A:
2008 break;
2009 case GSK_TABLE_MERGE_RETURN_B:
2010 copy_buffer (result, &node->value);
2011 break;
2012 case GSK_TABLE_MERGE_SUCCESS:
2013 {
2014 GskTableBuffer *tmp = result;
2015 result = other_result;
2016 other_result = tmp;
2017 break;
2018 }
2019 case GSK_TABLE_MERGE_DROP:
2020 has_result = FALSE;
2021 break;
2022 default:
2023 g_assert_not_reached ();
2024 }
2025 }
2026 else
2027 {
2028 has_result = TRUE;
2029 copy_buffer (result, &node->value);
2030 }
2031
2032 /* note: no need to check stability,
2033 since there's nothing more to do anyways. */
2034 }
2035 }
2036
2037 done_querying_copy_result:
2038 *found_value_out = has_result;
2039 if (has_result)
2040 {
2041 *value_len_out = result->len;
2042 *value_data_out = g_memdup (result->data, result->len);
2043 }
2044 /* fall through */
2045
2046 //done_querying:
2047 return TRUE;
2048
2049 failed:
2050 return FALSE;
2051 }
2052
2053 const char *
gsk_table_peek_dir(GskTable * table)2054 gsk_table_peek_dir (GskTable *table)
2055 {
2056 return table->dir;
2057 }
2058
2059 void
gsk_table_destroy(GskTable * table)2060 gsk_table_destroy (GskTable *table)
2061 {
2062 guint i;
2063 FileInfo *fi, *next=NULL;
2064 for (fi = table->first_file; fi != NULL; fi = next)
2065 {
2066 next = fi->next_file;
2067 file_info_unref (fi, table->dir, FALSE); /* may free fi */
2068 }
2069 for (i = 0; i < table->n_old_files; i++)
2070 file_info_unref (table->old_files[i], table->dir, FALSE);
2071 g_free (table->old_files);
2072 g_free (table->journal_cur_fname);
2073 g_free (table->journal_tmp_fname);
2074 munmap (table->journal_mmap, table->journal_size);
2075 gsk_table_buffer_clear (&table->result_buffers[0]);
2076 gsk_table_buffer_clear (&table->result_buffers[1]);
2077 gsk_table_buffer_clear (&table->merge_buffer);
2078 gsk_table_buffer_clear (&table->simplify_buffer);
2079 g_slice_free (GskTable, table);
2080 }
2081
2082
2083
/* Complete the merge task at the head of TABLE's run list.
 *
 * Preconditions (asserted):
 *   - TASK is table->run_list.
 *   - No task is attached on the outer sides of its inputs:
 *     inputs[0]->prev_task and inputs[1]->next_task are both NULL.
 *
 * Steps:
 *   1. Unlink TASK from the run list.
 *   2. Finish its output file.
 *   3. Destroy both input readers.
 *   4. Splice a new FileInfo wrapping the merged output into the file
 *      list in place of the two inputs.
 *   5. Create unstarted merge tasks between the new file and its
 *      neighbours where allowed.
 *   6. Free TASK.
 *
 * Returns FALSE with ERROR set (by gsk_table_file_done_feeding) if the
 * output file could not be finished; TRUE otherwise.
 */
static gboolean
merge_task_done (GskTable *table,
                 MergeTask *task,
                 GError **error)
{
  gboolean done;
  FileInfo *new_file;

  g_assert (task == table->run_list);
  g_assert (task->inputs[0]->prev_task == NULL);
  g_assert (task->inputs[1]->next_task == NULL);

  /* remove this task from the run list */
  table->run_list = task->info.started.next_run;
  table->n_running_tasks--;

#if DEBUG_MERGE_TASKS
  g_message ("finished mergetask between "ID_FMT" and "ID_FMT, task->inputs[0]->file->id, task->inputs[1]->file->id);
#endif

  /* finish the output file */
  /* NOTE(review): if this fails, TASK has already been unlinked from
     run_list and n_running_tasks decremented, but TASK, its input
     readers and its output are not released here -- confirm the
     caller copes with that state. */
  if (!gsk_table_file_done_feeding (task->info.started.output, &done, error))
    return FALSE;
  /* An output that still needs work after feeding ends is not
     supported yet; g_error() aborts the program. */
  if (done == FALSE)
    g_error ("gsk_table_file_done_feeding not ready not handled yet");

  /* destroy the input readers */
  gsk_table_reader_destroy (task->info.started.inputs[0].reader);
  gsk_table_reader_destroy (task->info.started.inputs[1].reader);

  /* create a new FileInfo covering the input entries of both inputs;
     it takes over the task's output file */
  new_file = g_slice_new0 (FileInfo);
  new_file->ref_count = 1;
  new_file->first_input_entry = task->inputs[0]->first_input_entry;
  new_file->n_input_entries = task->inputs[0]->n_input_entries
                            + task->inputs[1]->n_input_entries;
  new_file->file = task->info.started.output;

  /* replace task->inputs[0,1] with the new file */
  /* NOTE(review): the two input FileInfos are only unlinked from the
     list here; no file_info_unref() for them is visible in this
     function -- confirm they are released elsewhere. */
  CHECK_FILES_CONTIGUOUS (table);
  GSK_LIST_INSERT_BEFORE (GET_FILE_INFO_LIST (table), task->inputs[0], new_file);
  GSK_LIST_REMOVE (GET_FILE_INFO_LIST (table), task->inputs[0]);
  GSK_LIST_REMOVE (GET_FILE_INFO_LIST (table), task->inputs[1]);
  table->n_files -= 1;          /* two files replaced by one */
  CHECK_FILES_CONTIGUOUS (table);

  /* possibly create more unstarted merge-tasks: pair the new file with
     each neighbour, unless that neighbour is already an input of a
     started task on its far side (TASK_IS_UNSTARTED accepts NULL) */
  if (new_file->prev_file != NULL
   && TASK_IS_UNSTARTED (new_file->prev_file->prev_task))
    create_unstarted_merge_task (table, new_file->prev_file, new_file);
  if (new_file->next_file != NULL
   && TASK_IS_UNSTARTED (new_file->next_file->next_task))
    create_unstarted_merge_task (table, new_file, new_file->next_file);

  g_slice_free (MergeTask, task);

  return TRUE;
}
2142
2143 /* --- optimizing run_merge_task variants --- */
2144 #include "gsktable-implementations-generated.c"
2145
/* DEFINE_RUN_TASK_FUNCS(suffix) expands to one RunTaskFuncs initializer.
   It lists the four run_merge_task variants for a single configuration,
   in this order:
     simplify+flush, simplify+noflush, nosimplify+flush, nosimplify+noflush.
   The variant functions are presumably the ones provided by
   gsktable-implementations-generated.c, which is #included just
   before this table. */
#define DEFINE_RUN_TASK_FUNCS(suffix) \
  { run_merge_task__simplify_flush_##suffix, \
    run_merge_task__simplify_noflush_##suffix, \
    run_merge_task__nosimplify_flush_##suffix, \
    run_merge_task__nosimplify_noflush_##suffix }

/* Dispatch table indexed as [has_len][has_compare][has_merge].
   Each index is 0 or 1:
     has_len:     0 -> "nolen" variants,  1 -> "haslen" variants
     has_compare: 0 -> "memcmp" variants, 1 -> "compare" variants
     has_merge:   0 -> "nomerge" variants, 1 -> "merge" variants
   table_options_get_run_funcs() returns a pointer to one entry. */
static RunTaskFuncs all_run_task_funcs[2][2][2] = /* has_len, has_compare, has_merge */
{ { { DEFINE_RUN_TASK_FUNCS(nolen_memcmp_nomerge),
      DEFINE_RUN_TASK_FUNCS(nolen_memcmp_merge) },
    { DEFINE_RUN_TASK_FUNCS(nolen_compare_nomerge),
      DEFINE_RUN_TASK_FUNCS(nolen_compare_merge) } },
  { { DEFINE_RUN_TASK_FUNCS(haslen_memcmp_nomerge),
      DEFINE_RUN_TASK_FUNCS(haslen_memcmp_merge) },
    { DEFINE_RUN_TASK_FUNCS(haslen_compare_nomerge),
      DEFINE_RUN_TASK_FUNCS(haslen_compare_merge) } } };
#undef DEFINE_RUN_TASK_FUNCS
2161
table_options_get_run_funcs(const GskTableOptions * options,gboolean * has_len_out,GError ** error)2162 static RunTaskFuncs *table_options_get_run_funcs (const GskTableOptions *options,
2163 gboolean *has_len_out,
2164 GError **error)
2165 {
2166 gboolean has_len = options->compare != NULL
2167 || options->merge != NULL
2168 || options->simplify != NULL;
2169 gboolean has_compare = options->compare != NULL
2170 || options->compare_no_len != NULL;
2171 gboolean has_merge = options->merge != NULL
2172 || options->merge_no_len != NULL;
2173 #define TEST_MEMBER_NULL(member) \
2174 if (options->member != NULL) \
2175 { \
2176 g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_INVALID_ARGUMENT, \
2177 "length and no-length function pointers mixed: did not expect %s to be non-NULL", \
2178 #member); \
2179 return NULL; \
2180 }
2181 if (has_len)
2182 {
2183 TEST_MEMBER_NULL (compare_no_len);
2184 TEST_MEMBER_NULL (merge_no_len);
2185 TEST_MEMBER_NULL (simplify_no_len);
2186 }
2187 else
2188 {
2189 TEST_MEMBER_NULL (compare);
2190 TEST_MEMBER_NULL (merge);
2191 TEST_MEMBER_NULL (simplify);
2192 }
2193 *has_len_out = has_len;
2194 return &all_run_task_funcs[has_len?1:0][has_compare?1:0][has_merge?1:0];
2195 }
2196
2197 /* placeholder until there's actually some tunable stuff
2198 in flat file-factory (there certainly could be:
2199 chunk_size, compression_level) */
2200 static GskTableFileFactory *
table_options_get_file_factory(const GskTableOptions * options,GError ** error)2201 table_options_get_file_factory (const GskTableOptions *options,
2202 GError **error)
2203 {
2204 return gsk_table_file_factory_new_flat ();
2205 }
2206