1 /*-
2 * Public Domain 2014-2018 MongoDB, Inc.
3 * Public Domain 2008-2014 WiredTiger, Inc.
4 *
5 * This is free and unencumbered software released into the public domain.
6 *
7 * Anyone is free to copy, modify, publish, use, compile, sell, or
8 * distribute this software, either in source code form or as a compiled
9 * binary, for any purpose, commercial or non-commercial, and by any
10 * means.
11 *
12 * In jurisdictions that recognize copyright laws, the author or authors
13 * of this software dedicate any and all copyright interest in the
14 * software to the public domain. We make this dedication for the benefit
15 * of the public at large and to the detriment of our heirs and
16 * successors. We intend this dedication to be an overt act of
17 * relinquishment in perpetuity of all present and future rights to this
18 * software under copyright law.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
24 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
25 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26 * OTHER DEALINGS IN THE SOFTWARE.
27 */
28 #include <sys/select.h>
29
30 #include <ctype.h>
31 #include <errno.h>
32 #include <inttypes.h>
33 #include <pthread.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37
38 #include <he.h>
39
40 #include <wiredtiger.h>
41 #include <wiredtiger_ext.h>
42
/*
 * Shorter aliases for the Helium environment and item structures (the
 * structure tags come from <he.h>).
 */
typedef struct he_env	HE_ENV;
typedef struct he_item	HE_ITEM;

/*
 * Verbosity level: VMSG prints a message when this is greater than or
 * equal to the message's level (VERBOSE_L1, VERBOSE_L2).
 */
static int verbose = 0;					/* Verbose messages */

/*
 * WT_ERR --
 *	Evaluate an expression, storing the result in the caller's local
 * "ret"; if it is non-zero, jump to the caller's "err" label.
 *
 * WT_RET --
 *	Evaluate an expression; if it is non-zero, return that value from the
 * calling function.  The caller's "ret" (if any) is not touched.
 *
 * NOTE(review): "__ret" is a reserved identifier (double underscore); it is
 * local to the macro block so it works, but a non-reserved name would be
 * cleaner.
 */
#define	WT_ERR(a) do {							\
	if ((ret = (a)) != 0)						\
		goto err;						\
} while (0)
#define	WT_RET(a) do {							\
	int __ret;							\
	if ((__ret = (a)) != 0)						\
		return (__ret);						\
} while (0)
57
/*
 * Macros to output error and verbose messages, and set or return an error.
 * Error macros require a local "ret" variable; EMSG_ERR also requires an
 * "err" label.
 *
 * ESET: update an error value, handling more/less important errors.
 * ERET: output a message, return the error.
 * EMSG: output a message, set the local error value.
 * EMSG_ERR:
 *	output a message, set the local error value, jump to the err label.
 * VMSG: verbose message.
 *
 * All messages are prefixed with "helium: ".  The message is printed before
 * ESET runs, so the message sees the caller's values unchanged.
 */

/*
 * ESET --
 *	If "a" is non-zero, fold it into the local "ret":
 *	- WT_PANIC always wins;
 *	- otherwise "a" replaces "ret" only if "ret" is 0 or one of the
 *	  "not strictly an error" codes WT_DUPLICATE_KEY/WT_NOTFOUND.
 *	Afterwards, any negative "ret" outside the range [-31999, -31800]
 *	(the WiredTiger error name space, per the comment below) is replaced
 *	with WT_ERROR, so Helium's own negative error codes never leak to
 *	WiredTiger.  Positive (system errno) values are left alone.
 */
#undef	ESET
#define	ESET(a) do {							\
	int __v;							\
	if ((__v = (a)) != 0) {						\
		/*							\
		 * On error, check for a panic (it overrides all other	\
		 * returns).  Else, if there's no return value or the	\
		 * return value is not strictly an error, override it	\
		 * with the error.					\
		 */							\
		if (__v == WT_PANIC ||					\
		    ret == 0 ||						\
		    ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND)	\
			ret = __v;					\
		/*							\
		 * We don't want to return Helium errors to our caller.	\
		 * Map non-system errors (indicated by a negative	\
		 * value), outside the WiredTiger error name space, to a\
		 * generic WiredTiger error.				\
		 */							\
		if (ret < -31999 || (ret > -31800 && ret < 0))		\
			ret = WT_ERROR;					\
	}								\
} while (0)

/*
 * ERET --
 *	Print an error message through the extension API's err_printf,
 * update "ret" with ESET, then return "ret" from the calling function.
 */
#undef	ERET
#define	ERET(wt_api, session, v, ...) do {				\
	(void)								\
	    wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\
	ESET(v);							\
	return (ret);							\
} while (0)

/*
 * EMSG --
 *	Print an error message and update "ret" with ESET; execution
 * continues in the caller.
 */
#undef	EMSG
#define	EMSG(wt_api, session, v, ...) do {				\
	(void)								\
	    wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\
	ESET(v);							\
} while (0)

/*
 * EMSG_ERR --
 *	Print an error message, update "ret" with ESET, and jump to the
 * caller's "err" label.
 */
#undef	EMSG_ERR
#define	EMSG_ERR(wt_api, session, v, ...) do {				\
	(void)								\
	    wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\
	ESET(v);							\
	goto err;							\
} while (0)

/* Verbosity levels compared against the file-level "verbose" setting. */
#undef	VERBOSE_L1
#define	VERBOSE_L1	1
#undef	VERBOSE_L2
#define	VERBOSE_L2	2

/*
 * VMSG --
 *	Print an informational message through msg_printf when "verbose" is
 * at least level "v".
 *
 * NOTE(review): "v" is not parenthesized in the comparison; callers should
 * pass a simple constant such as VERBOSE_L1.
 */
#undef	VMSG
#define	VMSG(wt_api, session, v, ...) do {				\
	if (verbose >= v)						\
		(void)wt_api->						\
		    msg_printf(wt_api, session, "helium: " __VA_ARGS__);\
} while (0)
123
/*
 * OVERWRITE_AND_FREE --
 *	Make sure we don't re-use a structure after it's dead: fill the
 * structure with 0xab bytes so stale uses are easier to spot, then free it.
 *
 * The argument is evaluated more than once, so it must not have side
 * effects.  It must not be NULL: the memset happens before free, and memset
 * on a NULL pointer is undefined behavior.  Only sizeof(*(p)) bytes are
 * overwritten, so p must have the structure's actual pointer type.
 *
 * NOTE(review): an optimizer may treat a memset immediately followed by
 * free as a dead store and remove it; this is a debugging aid, not a
 * guaranteed scrub.
 */
#undef	OVERWRITE_AND_FREE
#define	OVERWRITE_AND_FREE(p) do {					\
	memset(p, 0xab, sizeof(*(p)));                         		\
	free(p);							\
} while (0)
133
/*
 * Version each object, out of sheer raging paranoia.
 *
 * NOTE(review): where these values are recorded or checked is not visible
 * in this part of the file.
 */
#define	WIREDTIGER_HELIUM_MAJOR	1		/* Major, minor version */
#define	WIREDTIGER_HELIUM_MINOR	0

/*
 * WiredTiger name space on the Helium store: all objects are named with the
 * WiredTiger prefix (we don't require the Helium store to be exclusive to
 * our files).  Primary objects are named "WiredTiger.[name]", associated
 * cache objects are "WiredTiger.[name].cache".  The per-connection
 * transaction object is "WiredTiger.WiredTigerTxn".  When we first open a
 * Helium volume, we open/close a file in order to apply flags for the first
 * open of the volume, that's "WiredTiger.WiredTigerInit".
 *
 * WT_NAME_CACHE is a suffix appended to a primary object's name, not a
 * complete name.
 */
#define	WT_NAME_PREFIX	"WiredTiger."
#define	WT_NAME_INIT	"WiredTiger.WiredTigerInit"
#define	WT_NAME_TXN	"WiredTiger.WiredTigerTxn"
#define	WT_NAME_CACHE	".cache"
153
/*
 * WT_SOURCE --
 *	A WiredTiger source, supporting one or more cursors.
 *
 * One of these exists per WiredTiger object; sources belonging to the same
 * Helium volume are chained through "next" from HELIUM_SOURCE.ws_head.
 */
typedef struct __wt_source {
	char *uri;				/* Unique name */

	/*
	 * Lock; copyin_key takes it for writing while updating append_recno.
	 */
	pthread_rwlock_t lock;			/* Lock */
	bool	lockinit;			/* Lock created */

	bool	configured;			/* If structure configured */
	u_int	ref;				/* Active reference count */

	/*
	 * Largest record number allocated or seen so far (record-number
	 * objects only); new append records get ++append_recno.
	 */
	uint64_t append_recno;			/* Allocation record number */

	bool	config_bitfield;		/* config "value_format=#t" */
	bool	config_recno;			/* config "key_format=r" */

	/*
	 * Each WiredTiger object has a "primary" namespace in a Helium store
	 * plus a "cache" namespace, which has not-yet-resolved updates.  There
	 * is a dirty flag so read-only data sets can ignore the cache.
	 */
	he_t	he;				/* Underlying Helium object */
	he_t	he_cache;			/* Underlying Helium cache */
	bool	he_cache_inuse;			/* Cache is in use */
	int	he_cache_ops;			/* Operations since cleaning */

	struct __he_source *hs;			/* Underlying Helium source */
	struct __wt_source *next;		/* List of WiredTiger objects */
} WT_SOURCE;
185
/*
 * HELIUM_SOURCE --
 *	A Helium volume, supporting one or more WT_SOURCE objects.
 */
typedef struct __he_source {
	/*
	 * The transaction commit handler must appear first in the structure:
	 * txn_notify() casts the WT_TXN_NOTIFY pointer it is handed back to a
	 * HELIUM_SOURCE pointer.
	 */
	WT_TXN_NOTIFY txn_notify;		/* Transaction commit handler */

	WT_EXTENSION_API *wt_api;		/* Extension functions */

	char	*name;				/* Unique WiredTiger name */
	char	*device;			/* Unique Helium volume name */

	/*
	 * Maintain a handle for each underlying Helium source so checkpoint is
	 * faster, we can "commit" a single handle per source, regardless of the
	 * number of objects.
	 */
	he_t he_volume;

	struct __wt_source *ws_head;		/* List of WiredTiger sources */

	/*
	 * Each Helium source has a cleaner thread to migrate WiredTiger source
	 * updates from the cache namespace to the primary namespace, based on
	 * the number of bytes or the number of operations.  (There's a cleaner
	 * thread per Helium store so migration operations can overlap.)  We
	 * read these fields without a lock, but serialize writes to minimize
	 * races (and because it costs us nothing).
	 *
	 * NOTE(review): "volatile" does not provide inter-thread ordering or
	 * atomicity in C11; confirm the stop flag is adequately synchronized
	 * where the cleaner thread is started/stopped.
	 */
	pthread_t cleaner_id;			/* Cleaner thread ID */
	volatile int cleaner_stop;		/* Cleaner thread quit flag */

	/*
	 * Each WiredTiger connection has a transaction namespace which lists
	 * resolved transactions with their committed or aborted state as a
	 * value.  That namespace appears in a single Helium store (the first
	 * one created, if it doesn't already exist), and then it's referenced
	 * from other Helium stores.
	 *
	 * The stored value is a single byte, TXN_ABORTED or TXN_COMMITTED;
	 * txn_state() reports TXN_UNRESOLVED when no record exists.
	 */
#define	TXN_ABORTED	'A'
#define	TXN_COMMITTED	'C'
#define	TXN_UNRESOLVED	0
	he_t	he_txn;				/* Helium txn store */
	bool	he_owner;			/* Owns transaction store */

	struct __he_source *next;		/* List of Helium sources */
} HELIUM_SOURCE;
234
/*
 * DATA_SOURCE --
 *	A WiredTiger data source, supporting one or more HELIUM_SOURCE objects.
 *
 * The WT_DATA_SOURCE must be the first member so a pointer to it can be
 * converted back into a DATA_SOURCE pointer.
 */
typedef struct __data_source {
	WT_DATA_SOURCE wtds;			/* Must come first */

	WT_EXTENSION_API *wt_api;		/* Extension functions */

	pthread_rwlock_t global_lock;		/* Global lock */
	bool	lockinit;			/* Lock created */

	struct __he_source *hs_head;		/* List of Helium sources */
} DATA_SOURCE;
249
/*
 * CACHE_RECORD --
 *	An array of updates from the cache object.
 *
 * Values in the cache store are marshalled/unmarshalled to/from the store,
 * using a simple encoding (all integers in native byte order):
 *	{N records: 4B}
 *	{record#1 TxnID: 8B}
 *	{record#1 remove byte: 1B}	REMOVE_TOMBSTONE ('R') for a remove,
 *					otherwise ' '
 *	{record#1 data length: 4B}	present only if not a remove
 *	{record#1 data}			present only if not a remove
 *	...
 *
 * Records appear in update order, oldest first.  After unmarshalling, "v"
 * points into the cursor's value buffer rather than owning its own memory.
 *
 * Each cursor potentially has a single set of these values.
 */
typedef struct __cache_record {
	uint8_t	*v;				/* Value */
	uint32_t len;				/* Value length */
	uint64_t txnid;				/* Transaction ID */
#define	REMOVE_TOMBSTONE	'R'
	int	 remove;			/* 1/0 remove flag */
} CACHE_RECORD;
272
/*
 * CURSOR --
 *	A cursor, supporting a single WiredTiger cursor.
 *
 * The WT_CURSOR must be the first member: cursor methods receive a
 * WT_CURSOR pointer and cast it to a CURSOR pointer.
 */
typedef struct __cursor {
	WT_CURSOR wtcursor;			/* Must come first */

	WT_EXTENSION_API *wt_api;		/* Extension functions */

	WT_SOURCE *ws;				/* Underlying source */

	/*
	 * The Helium record used for lookups and updates.  __key is presumably
	 * the storage record.key points to (set up outside this chunk, TODO
	 * confirm).  v/len/mem_len is the value buffer: len bytes in use out of
	 * mem_len allocated; helium_call and cache_value_append grow it with
	 * realloc.
	 *
	 * NOTE(review): "__key" is a reserved identifier (leading double
	 * underscore).
	 */
	HE_ITEM record;				/* Record */
	uint8_t  __key[HE_MAX_KEY_LEN];		/* Record.key, Record.value */
	uint8_t *v;
	size_t   len;
	size_t	 mem_len;

	struct {
		uint8_t *v;			/* Temporary buffers */
		size_t   len;
		size_t	 mem_len;
	} t1, t2, t3;

	int	 config_append;			/* config "append" */
	int	 config_overwrite;		/* config "overwrite" */

	/*
	 * Records parsed from a cache value by cache_value_unmarshall: "cache"
	 * has cache_slots allocated entries, the first cache_entries of which
	 * are valid.
	 */
	CACHE_RECORD	*cache;			/* unmarshalled cache records */
	uint32_t	 cache_entries;		/* cache records */
	uint32_t	 cache_slots;		/* cache total record slots */
} CURSOR;
303
/*
 * prefix_match --
 *	Return 1 if the string str begins with the string pfx, else 0.  An
 * empty prefix matches every string.
 */
static inline int
prefix_match(const char *str, const char *pfx)
{
	size_t pfx_len;

	pfx_len = strlen(pfx);
	return (strncmp(str, pfx, pfx_len) == 0 ? 1 : 0);
}
313
/*
 * string_match --
 *	Return 1 if the nul-terminated string str is exactly the len bytes
 * starting at bytes (that is, it has the same contents and is len bytes
 * long), else 0.
 */
static inline int
string_match(const char *str, const char *bytes, size_t len)
{
	/* Differing contents: str[len] may not even be in bounds, don't look. */
	if (strncmp(str, bytes, len) != 0)
		return (0);

	/* Same first len bytes; it's a match only if str ends right there. */
	return (str[len] == '\0');
}
323
324 /*
325 * cursor_destroy --
326 * Free a cursor's memory, and optionally the cursor itself.
327 */
328 static void
cursor_destroy(CURSOR * cursor)329 cursor_destroy(CURSOR *cursor)
330 {
331 if (cursor != NULL) {
332 free(cursor->v);
333 free(cursor->t1.v);
334 free(cursor->t2.v);
335 free(cursor->t3.v);
336 free(cursor->cache);
337 OVERWRITE_AND_FREE(cursor);
338 }
339 }
340
/*
 * os_errno --
 *	Return the current value of errno; all reads of errno go through here
 * so they are easy to find and replace.
 */
static int
os_errno(void)
{
	int saved;

	saved = errno;
	return (saved);
}
350
351 /*
352 * lock_init --
353 * Initialize a lock.
354 */
355 static int
lock_init(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)356 lock_init(
357 WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
358 {
359 int ret = 0;
360
361 if ((ret = pthread_rwlock_init(lockp, NULL)) != 0)
362 ERET(wt_api, session, WT_PANIC,
363 "pthread_rwlock_init: %s", strerror(ret));
364 return (0);
365 }
366
367 /*
368 * lock_destroy --
369 * Destroy a lock.
370 */
371 static int
lock_destroy(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)372 lock_destroy(
373 WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
374 {
375 int ret = 0;
376
377 if ((ret = pthread_rwlock_destroy(lockp)) != 0)
378 ERET(wt_api, session, WT_PANIC,
379 "pthread_rwlock_destroy: %s", strerror(ret));
380 return (0);
381 }
382
383 /*
384 * writelock --
385 * Acquire a write lock.
386 */
387 static inline int
writelock(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)388 writelock(
389 WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
390 {
391 int ret = 0;
392
393 if ((ret = pthread_rwlock_wrlock(lockp)) != 0)
394 ERET(wt_api, session, WT_PANIC,
395 "pthread_rwlock_wrlock: %s", strerror(ret));
396 return (0);
397 }
398
399 /*
400 * unlock --
401 * Release a lock.
402 */
403 static inline int
unlock(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)404 unlock(WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
405 {
406 int ret = 0;
407
408 if ((ret = pthread_rwlock_unlock(lockp)) != 0)
409 ERET(wt_api, session, WT_PANIC,
410 "pthread_rwlock_unlock: %s", strerror(ret));
411 return (0);
412 }
413
#if 0
/*
 * helium_dump_kv --
 *	Dump a Helium record to fp: printable, non-space bytes are written
 * as-is, other bytes in hex; a single trailing nul byte (a string
 * terminator) is skipped.
 */
static void
helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp)
{
	/* The prefix goes to the caller's stream, like the rest of the line. */
	(void)fprintf(fp, "%s %3zu: ", pfx, len);
	for (; len > 0; --len, ++p)
		if (!isspace(*p) && isprint(*p))
			(void)putc(*p, fp);
		else if (len == 1 && *p == '\0')	/* Skip string nuls. */
			continue;
		else
			(void)fprintf(fp, "%#x", *p);
	(void)putc('\n', fp);
}

/*
 * helium_dump --
 *	Dump the records in a Helium store to stderr.
 *
 *	Returns 0 once every record has been dumped, WT_ERROR if he_next fails
 * for any reason other than reaching the end of the store.  Keys and values
 * larger than the 4KB local buffers are not supported.
 */
static int
helium_dump(WT_EXTENSION_API *wt_api, he_t he, const char *tag)
{
	HE_ITEM *r, _r;
	uint8_t k[4 * 1024], v[4 * 1024];
	int ret = 0;

	(void)wt_api;		/* Only used by the record-number dump below. */

	r = &_r;
	memset(r, 0, sizeof(*r));
	r->key = k;
	r->val = v;

	(void)fprintf(stderr, "== %s\n", tag);
	while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) {
#if 0
		uint64_t recno;
		WT_RET(wt_api->struct_unpack(wt_api,
		    NULL, r->key, r->key_len, "r", &recno));
		fprintf(stderr, "K: %" PRIu64, recno);
#else
		helium_dump_kv("K: ", r->key, r->key_len, stderr);
#endif
		helium_dump_kv("V: ", r->val, r->val_len, stderr);
	}

	/* Running off the end of the store is the normal loop exit. */
	if (ret == HE_ERR_ITEM_NOT_FOUND)
		return (0);

	(void)fprintf(stderr, "he_next: %s\n", he_strerror(ret));
	return (WT_ERROR);
}

/*
 * helium_stats --
 *	Display Helium statistics for a datastore on stderr.  Returns 0, or
 * the (mapped) Helium error if the statistics can't be retrieved.
 */
static int
helium_stats(
    WT_EXTENSION_API *wt_api, WT_SESSION *session, he_t he, const char *tag)
{
	HE_STATS stats;
	int ret = 0;

	if ((ret = he_stats(he, &stats)) != 0)
		ERET(wt_api, session, ret, "he_stats: %s", he_strerror(ret));
	fprintf(stderr, "== %s\n", tag);
	fprintf(stderr, "name=%s\n", stats.name);
	fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items);
	fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items);
	fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items);
	fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity);
	fprintf(stderr, "size=%" PRIu64 "B\n", stats.size);
	return (0);
}
#endif
491
492 /*
493 * helium_call --
494 * Call a Helium key retrieval function, handling overflow.
495 */
496 static inline int
helium_call(WT_CURSOR * wtcursor,const char * fname,he_t he,int (* f)(he_t,HE_ITEM *,size_t,size_t))497 helium_call(WT_CURSOR *wtcursor, const char *fname,
498 he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t))
499 {
500 CURSOR *cursor;
501 HE_ITEM *r;
502 WT_EXTENSION_API *wt_api;
503 WT_SESSION *session;
504 int ret = 0;
505 char *p;
506
507 session = wtcursor->session;
508 cursor = (CURSOR *)wtcursor;
509 wt_api = cursor->wt_api;
510
511 r = &cursor->record;
512 r->val = cursor->v;
513
514 restart:
515 if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) {
516 if (ret == HE_ERR_ITEM_NOT_FOUND)
517 return (WT_NOTFOUND);
518 ERET(wt_api, session, ret, "%s: %s", fname, he_strerror(ret));
519 }
520
521 /*
522 * If the returned length is larger than our passed-in length, we didn't
523 * get the complete value. Grow the buffer and use he_lookup to do the
524 * retrieval (he_lookup because the call succeeded and the key was
525 * copied out, so calling he_next/he_prev again would skip key/value
526 * pairs).
527 *
528 * We have to loop, another thread of control might change the length of
529 * the value, requiring we grow our buffer multiple times.
530 *
531 * We have to potentially restart the entire call in case the underlying
532 * key/value disappears.
533 */
534 for (;;) {
535 if (cursor->mem_len >= r->val_len) {
536 cursor->len = r->val_len;
537 return (0);
538 }
539
540 /* Grow the value buffer. */
541 if ((p = realloc(cursor->v, r->val_len + 32)) == NULL)
542 return (os_errno());
543 cursor->v = r->val = p;
544 cursor->mem_len = r->val_len + 32;
545
546 if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) {
547 if (ret == HE_ERR_ITEM_NOT_FOUND)
548 goto restart;
549 ERET(wt_api,
550 session, ret, "he_lookup: %s", he_strerror(ret));
551 }
552 }
553 /* NOTREACHED */
554 }
555
556 /*
557 * txn_state_set --
558 * Resolve a transaction.
559 */
560 static int
txn_state_set(WT_EXTENSION_API * wt_api,WT_SESSION * session,HELIUM_SOURCE * hs,uint64_t txnid,int commit)561 txn_state_set(WT_EXTENSION_API *wt_api,
562 WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit)
563 {
564 HE_ITEM txn;
565 uint8_t val;
566 int ret = 0;
567
568 /*
569 * Update the store -- commits must be durable, flush the volume.
570 *
571 * XXX
572 * Not endian-portable, we're writing a native transaction ID to the
573 * store.
574 */
575 memset(&txn, 0, sizeof(txn));
576 txn.key = &txnid;
577 txn.key_len = sizeof(txnid);
578 val = commit ? TXN_COMMITTED : TXN_ABORTED;
579 txn.val = &val;
580 txn.val_len = sizeof(val);
581
582 if ((ret = he_update(hs->he_txn, &txn)) != 0)
583 ERET(wt_api, session, ret, "he_update: %s", he_strerror(ret));
584
585 if (commit && (ret = he_commit(hs->he_txn)) != 0)
586 ERET(wt_api, session, ret, "he_commit: %s", he_strerror(ret));
587 return (0);
588 }
589
590 /*
591 * txn_notify --
592 * Resolve a transaction; called from WiredTiger during commit/abort.
593 */
594 static int
txn_notify(WT_TXN_NOTIFY * handler,WT_SESSION * session,uint64_t txnid,int committed)595 txn_notify(WT_TXN_NOTIFY *handler,
596 WT_SESSION *session, uint64_t txnid, int committed)
597 {
598 HELIUM_SOURCE *hs;
599
600 hs = (HELIUM_SOURCE *)handler;
601 return (txn_state_set(hs->wt_api, session, hs, txnid, committed));
602 }
603
604 /*
605 * txn_state --
606 * Return a transaction's state.
607 */
608 static int
txn_state(WT_CURSOR * wtcursor,uint64_t txnid)609 txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
610 {
611 CURSOR *cursor;
612 HE_ITEM txn;
613 HELIUM_SOURCE *hs;
614 uint8_t val_buf[16];
615
616 cursor = (CURSOR *)wtcursor;
617 hs = cursor->ws->hs;
618
619 memset(&txn, 0, sizeof(txn));
620 txn.key = &txnid;
621 txn.key_len = sizeof(txnid);
622 txn.val = val_buf;
623 txn.val_len = sizeof(val_buf);
624
625 if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0)
626 return (val_buf[0]);
627 return (TXN_UNRESOLVED);
628 }
629
630 /*
631 * cache_value_append --
632 * Append the current WiredTiger cursor's value to a cache record.
633 */
634 static int
cache_value_append(WT_CURSOR * wtcursor,int remove_op)635 cache_value_append(WT_CURSOR *wtcursor, int remove_op)
636 {
637 CURSOR *cursor;
638 HE_ITEM *r;
639 WT_EXTENSION_API *wt_api;
640 WT_SESSION *session;
641 uint64_t txnid;
642 size_t len;
643 uint32_t entries;
644 uint8_t *p;
645
646 session = wtcursor->session;
647 cursor = (CURSOR *)wtcursor;
648 wt_api = cursor->wt_api;
649
650 r = &cursor->record;
651
652 /*
653 * A cache update is 4B that counts the number of entries in the update,
654 * followed by sets of: 8B of txn ID then either a remove tombstone or a
655 * 4B length and variable-length data pair. Grow the value buffer, then
656 * append the cursor's information.
657 */
658 len = cursor->len + /* current length */
659 sizeof(uint32_t) + /* entries */
660 sizeof(uint64_t) + /* txn ID */
661 1 + /* remove byte */
662 (remove_op ? 0 : /* optional data */
663 sizeof(uint32_t) + wtcursor->value.size) +
664 32; /* slop */
665
666 if (len > cursor->mem_len) {
667 if ((p = realloc(cursor->v, len)) == NULL)
668 return (os_errno());
669 cursor->v = p;
670 cursor->mem_len = len;
671 }
672
673 /* Get the transaction ID. */
674 txnid = wt_api->transaction_id(wt_api, session);
675
676 /* Update the number of records in this value. */
677 if (cursor->len == 0) {
678 entries = 1;
679 cursor->len = sizeof(uint32_t);
680 } else {
681 memcpy(&entries, cursor->v, sizeof(uint32_t));
682 ++entries;
683 }
684 memcpy(cursor->v, &entries, sizeof(uint32_t));
685
686 /*
687 * Copy the WiredTiger cursor's data into place: txn ID, remove
688 * tombstone, data length, data.
689 *
690 * XXX
691 * Not endian-portable, we're writing a native transaction ID to the
692 * store.
693 */
694 p = cursor->v + cursor->len;
695 memcpy(p, &txnid, sizeof(uint64_t));
696 p += sizeof(uint64_t);
697 if (remove_op)
698 *p++ = REMOVE_TOMBSTONE;
699 else {
700 *p++ = ' ';
701 memcpy(p, &wtcursor->value.size, sizeof(uint32_t));
702 p += sizeof(uint32_t);
703 memcpy(p, wtcursor->value.data, wtcursor->value.size);
704 p += wtcursor->value.size;
705 }
706 cursor->len = (size_t)(p - cursor->v);
707
708 /* Update the underlying Helium record. */
709 r->val = cursor->v;
710 r->val_len = cursor->len;
711
712 return (0);
713 }
714
715 /*
716 * cache_value_unmarshall --
717 * Unmarshall a cache value into a set of records.
718 */
719 static int
cache_value_unmarshall(WT_CURSOR * wtcursor)720 cache_value_unmarshall(WT_CURSOR *wtcursor)
721 {
722 CACHE_RECORD *cp;
723 CURSOR *cursor;
724 uint32_t entries, i;
725 uint8_t *p;
726 int ret = 0;
727
728 cursor = (CURSOR *)wtcursor;
729
730 /* If we don't have enough record slots, allocate some more. */
731 memcpy(&entries, cursor->v, sizeof(uint32_t));
732 if (entries > cursor->cache_slots) {
733 if ((p = realloc(cursor->cache,
734 (entries + 20) * sizeof(cursor->cache[0]))) == NULL)
735 return (os_errno());
736
737 cursor->cache = (CACHE_RECORD *)p;
738 cursor->cache_slots = entries + 20;
739 }
740
741 /* Walk the value, splitting it up into records. */
742 p = cursor->v + sizeof(uint32_t);
743 for (i = 0, cp = cursor->cache; i < entries; ++i, ++cp) {
744 memcpy(&cp->txnid, p, sizeof(uint64_t));
745 p += sizeof(uint64_t);
746 cp->remove = *p++ == REMOVE_TOMBSTONE ? 1 : 0;
747 if (!cp->remove) {
748 memcpy(&cp->len, p, sizeof(uint32_t));
749 p += sizeof(uint32_t);
750 cp->v = p;
751 p += cp->len;
752 }
753 }
754 cursor->cache_entries = entries;
755
756 return (ret);
757 }
758
759 /*
760 * cache_value_aborted --
761 * Return if a transaction has been aborted.
762 */
763 static inline int
cache_value_aborted(WT_CURSOR * wtcursor,CACHE_RECORD * cp)764 cache_value_aborted(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
765 {
766 /*
767 * This function exists as a place to hang this comment.
768 *
769 * WiredTiger resets updated entry transaction IDs to an aborted state
770 * on rollback; to do that here would require tracking updated entries
771 * for a transaction or scanning the cache for updates made on behalf
772 * of the transaction during rollback, expensive stuff. Instead, check
773 * if the transaction has been aborted before calling the underlying
774 * WiredTiger visibility function.
775 */
776 return (txn_state(wtcursor, cp->txnid) == TXN_ABORTED ? 1 : 0);
777 }
778
779 /*
780 * cache_value_committed --
781 * Return if a transaction has been committed.
782 */
783 static inline int
cache_value_committed(WT_CURSOR * wtcursor,CACHE_RECORD * cp)784 cache_value_committed(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
785 {
786 return (txn_state(wtcursor, cp->txnid) == TXN_COMMITTED ? 1 : 0);
787 }
788
789 /*
790 * cache_value_update_check --
791 * Return if an update can proceed based on the previous updates made to
792 * the cache entry.
793 */
794 static int
cache_value_update_check(WT_CURSOR * wtcursor)795 cache_value_update_check(WT_CURSOR *wtcursor)
796 {
797 CACHE_RECORD *cp;
798 CURSOR *cursor;
799 WT_EXTENSION_API *wt_api;
800 WT_SESSION *session;
801 u_int i;
802
803 session = wtcursor->session;
804 cursor = (CURSOR *)wtcursor;
805 wt_api = cursor->wt_api;
806
807 /* Only interesting for snapshot isolation. */
808 if (wt_api->
809 transaction_isolation_level(wt_api, session) != WT_TXN_ISO_SNAPSHOT)
810 return (0);
811
812 /*
813 * If there's an entry that's not visible and hasn't been aborted,
814 * return a deadlock.
815 */
816 for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
817 if (!cache_value_aborted(wtcursor, cp) &&
818 !wt_api->transaction_visible(wt_api, session, cp->txnid))
819 return (WT_ROLLBACK);
820 return (0);
821 }
822
823 /*
824 * cache_value_visible --
825 * Return the most recent cache entry update visible to the running
826 * transaction.
827 */
828 static int
cache_value_visible(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)829 cache_value_visible(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
830 {
831 CACHE_RECORD *cp;
832 CURSOR *cursor;
833 WT_EXTENSION_API *wt_api;
834 WT_SESSION *session;
835 u_int i;
836
837 *cpp = NULL;
838
839 session = wtcursor->session;
840 cursor = (CURSOR *)wtcursor;
841 wt_api = cursor->wt_api;
842
843 /*
844 * We want the most recent cache entry update; the cache entries are
845 * in update order, walk from the end to the beginning.
846 */
847 cp = cursor->cache + cursor->cache_entries;
848 for (i = 0; i < cursor->cache_entries; ++i) {
849 --cp;
850 if (!cache_value_aborted(wtcursor, cp) &&
851 wt_api->transaction_visible(wt_api, session, cp->txnid)) {
852 *cpp = cp;
853 return (1);
854 }
855 }
856 return (0);
857 }
858
859 /*
860 * cache_value_visible_all --
861 * Return if a cache entry has no updates that aren't globally visible.
862 */
863 static int
cache_value_visible_all(WT_CURSOR * wtcursor,uint64_t oldest)864 cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest)
865 {
866 CACHE_RECORD *cp;
867 CURSOR *cursor;
868 u_int i;
869
870 cursor = (CURSOR *)wtcursor;
871
872 /*
873 * Compare the update's transaction ID and the oldest transaction ID
874 * not yet visible to a running transaction. If there's an update a
875 * running transaction might want, the entry must remain in the cache.
876 * (We could tighten this requirement: if the only update required is
877 * also the update we'd migrate to the primary, it would still be OK
878 * to migrate it.)
879 */
880 for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
881 if (cp->txnid >= oldest)
882 return (0);
883 return (1);
884 }
885
886 /*
887 * cache_value_last_committed --
888 * Find the most recent update in a cache entry, recovery processing.
889 */
890 static void
cache_value_last_committed(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)891 cache_value_last_committed(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
892 {
893 CACHE_RECORD *cp;
894 CURSOR *cursor;
895 u_int i;
896
897 *cpp = NULL;
898
899 cursor = (CURSOR *)wtcursor;
900
901 /*
902 * Find the most recent update in the cache record, we're going to try
903 * and migrate it into the primary, recovery version.
904 *
905 * We know the entry is visible, but it must have been committed before
906 * the failure to be migrated.
907 *
908 * Cache entries are in update order, walk from end to beginning.
909 */
910 cp = cursor->cache + cursor->cache_entries;
911 for (i = 0; i < cursor->cache_entries; ++i) {
912 --cp;
913 if (cache_value_committed(wtcursor, cp)) {
914 *cpp = cp;
915 return;
916 }
917 }
918 }
919
920 /*
921 * cache_value_last_not_aborted --
922 * Find the most recent update in a cache entry, normal processing.
923 */
924 static void
cache_value_last_not_aborted(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)925 cache_value_last_not_aborted(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
926 {
927 CACHE_RECORD *cp;
928 CURSOR *cursor;
929 u_int i;
930
931 *cpp = NULL;
932
933 cursor = (CURSOR *)wtcursor;
934
935 /*
936 * Find the most recent update in the cache record, we're going to try
937 * and migrate it into the primary, normal processing version.
938 *
939 * We don't have to check if the entry was committed, we've already
940 * confirmed all entries for this cache key are globally visible, which
941 * means they must be either committed or aborted.
942 *
943 * Cache entries are in update order, walk from end to beginning.
944 */
945 cp = cursor->cache + cursor->cache_entries;
946 for (i = 0; i < cursor->cache_entries; ++i) {
947 --cp;
948 if (!cache_value_aborted(wtcursor, cp)) {
949 *cpp = cp;
950 return;
951 }
952 }
953 }
954
955 /*
956 * cache_value_txnmin --
957 * Return the oldest transaction ID involved in a cache update.
958 */
959 static void
cache_value_txnmin(WT_CURSOR * wtcursor,uint64_t * txnminp)960 cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp)
961 {
962 CACHE_RECORD *cp;
963 CURSOR *cursor;
964 uint64_t txnmin;
965 u_int i;
966
967 cursor = (CURSOR *)wtcursor;
968
969 /* Return the oldest transaction ID for in the cache entry. */
970 txnmin = UINT64_MAX;
971 for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
972 if (txnmin > cp->txnid)
973 txnmin = cp->txnid;
974 *txnminp = txnmin;
975 }
976
977 /*
978 * key_max_err --
979 * Common error when a WiredTiger key is too large.
980 */
981 static int
key_max_err(WT_EXTENSION_API * wt_api,WT_SESSION * session,size_t len)982 key_max_err(WT_EXTENSION_API *wt_api, WT_SESSION *session, size_t len)
983 {
984 int ret = 0;
985
986 ERET(wt_api, session, EINVAL,
987 "key length (%zu bytes) larger than the maximum Helium "
988 "key length of %d bytes",
989 len, HE_MAX_KEY_LEN);
990 }
991
992 /*
993 * copyin_key --
994 * Copy a WT_CURSOR key to a HE_ITEM key.
995 */
996 static inline int
copyin_key(WT_CURSOR * wtcursor,int allocate_key)997 copyin_key(WT_CURSOR *wtcursor, int allocate_key)
998 {
999 CURSOR *cursor;
1000 HE_ITEM *r;
1001 WT_EXTENSION_API *wt_api;
1002 WT_SESSION *session;
1003 WT_SOURCE *ws;
1004 size_t size;
1005
1006 session = wtcursor->session;
1007 cursor = (CURSOR *)wtcursor;
1008 ws = cursor->ws;
1009 wt_api = cursor->wt_api;
1010
1011 r = &cursor->record;
1012 if (ws->config_recno) {
1013 /*
1014 * Allocate a new record for append operations.
1015 *
1016 * A specified record number could potentially be larger than
1017 * the maximum known record number, update the maximum number
1018 * as necessary.
1019 *
1020 * Assume we can compare 8B values without locking them, and
1021 * test again after acquiring the lock.
1022 *
1023 * XXX
1024 * If the put fails for some reason, we'll have incremented the
1025 * maximum record number past the correct point. I can't think
1026 * of a reason any application would care or notice, but it's
1027 * not quite right.
1028 */
1029 if (allocate_key && cursor->config_append) {
1030 WT_RET(writelock(wt_api, session, &ws->lock));
1031 wtcursor->recno = ++ws->append_recno;
1032 WT_RET(unlock(wt_api, session, &ws->lock));
1033 } else if (wtcursor->recno > ws->append_recno) {
1034 WT_RET(writelock(wt_api, session, &ws->lock));
1035 if (wtcursor->recno > ws->append_recno)
1036 ws->append_recno = wtcursor->recno;
1037 WT_RET(unlock(wt_api, session, &ws->lock));
1038 }
1039
1040 WT_RET(wt_api->struct_size(wt_api,
1041 session, &size, "r", wtcursor->recno));
1042 WT_RET(wt_api->struct_pack(wt_api,
1043 session, r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno));
1044 r->key_len = size;
1045 } else {
1046 /* I'm not sure this test is necessary, but it's cheap. */
1047 if (wtcursor->key.size > HE_MAX_KEY_LEN)
1048 return (
1049 key_max_err(wt_api, session, wtcursor->key.size));
1050
1051 /*
1052 * A set cursor key might reference application memory, which
1053 * is only OK until the cursor operation has been called (in
1054 * other words, we can only reference application memory from
1055 * the WT_CURSOR.set_key call until the WT_CURSOR.op call).
1056 * For this reason, do a full copy, don't just reference the
1057 * WT_CURSOR key's data.
1058 */
1059 memcpy(r->key, wtcursor->key.data, wtcursor->key.size);
1060 r->key_len = wtcursor->key.size;
1061 }
1062 return (0);
1063 }
1064
1065 /*
1066 * copyout_key --
1067 * Copy a HE_ITEM key to a WT_CURSOR key.
1068 */
1069 static inline int
copyout_key(WT_CURSOR * wtcursor)1070 copyout_key(WT_CURSOR *wtcursor)
1071 {
1072 CURSOR *cursor;
1073 HE_ITEM *r;
1074 WT_EXTENSION_API *wt_api;
1075 WT_SESSION *session;
1076 WT_SOURCE *ws;
1077
1078 session = wtcursor->session;
1079 cursor = (CURSOR *)wtcursor;
1080 wt_api = cursor->wt_api;
1081 ws = cursor->ws;
1082
1083 r = &cursor->record;
1084 if (ws->config_recno)
1085 WT_RET(wt_api->struct_unpack(wt_api,
1086 session, r->key, r->key_len, "r", &wtcursor->recno));
1087 else {
1088 wtcursor->key.data = r->key;
1089 wtcursor->key.size = (size_t)r->key_len;
1090 }
1091 return (0);
1092 }
1093
1094 /*
1095 * copyout_val --
1096 * Copy a Helium store's HE_ITEM value to a WT_CURSOR value.
1097 */
1098 static inline int
copyout_val(WT_CURSOR * wtcursor,CACHE_RECORD * cp)1099 copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
1100 {
1101 CURSOR *cursor;
1102
1103 cursor = (CURSOR *)wtcursor;
1104
1105 if (cp == NULL) {
1106 wtcursor->value.data = cursor->v;
1107 wtcursor->value.size = cursor->len;
1108 } else {
1109 wtcursor->value.data = cp->v;
1110 wtcursor->value.size = cp->len;
1111 }
1112 return (0);
1113 }
1114
/*
 * nextprev --
 *	Cursor next/prev.
 *
 *	Move the cursor to the next (f == he_next) or previous (f == he_prev)
 *	key by merging the cache store (ws->he_cache) with the primary store
 *	(ws->he). Both stores are stepped from the cursor's current key. If
 *	both return an entry, the one closer in the direction of travel is
 *	chosen, and the cache entry wins a tie. If the chosen cache entry is a
 *	remove, the walk restarts from the removed key so the deleted item is
 *	skipped.
 *
 *	fname is the Helium function name handed to helium_call (presumably
 *	used in error messages — confirm against helium_call). Returns 0 with
 *	the key/value copied out to the WT_CURSOR, WT_NOTFOUND when neither
 *	store has a further entry, or an error.
 */
static int
nextprev(WT_CURSOR *wtcursor, const char *fname,
    int (*f)(he_t, HE_ITEM *, size_t, size_t))
{
	CACHE_RECORD *cp;
	CURSOR *cursor;
	HE_ITEM *r;
	WT_EXTENSION_API *wt_api;
	WT_ITEM a, b;
	WT_SESSION *session;
	WT_SOURCE *ws;
	int cache_ret, cache_rm, cmp, ret = 0;
	void *p;

	session = wtcursor->session;
	cursor = (CURSOR *)wtcursor;
	ws = cursor->ws;
	wt_api = cursor->wt_api;
	r = &cursor->record;

	cache_rm = 0;

	/*
	 * If the cache isn't yet in use, it's a simpler problem, just check
	 * the store. We don't care if we race, we're not guaranteeing any
	 * special behavior with respect to phantoms.
	 */
	if (!ws->he_cache_inuse) {
		cache_ret = WT_NOTFOUND;
		goto cache_clean;
	}

skip_deleted:
	/*
	 * The next/prev key/value pair might be in the cache, which means we
	 * are making two calls and returning the best choice. As each call
	 * overwrites both key and value, we have to have a copy of the key
	 * for the second call plus the returned key and value from the first
	 * call. That's why each cursor has 3 temporary buffers:
	 *	t1: the starting key (restored before stepping the primary),
	 *	t2: the key found in the cache,
	 *	t3: the value found in the cache (only if not a remove).
	 *
	 * First, copy the key.
	 */
	if (cursor->t1.mem_len < r->key_len) {
		if ((p = realloc(cursor->t1.v, r->key_len)) == NULL)
			return (os_errno());
		cursor->t1.v = p;
		cursor->t1.mem_len = r->key_len;
	}
	memcpy(cursor->t1.v, r->key, r->key_len);
	cursor->t1.len = r->key_len;

	/*
	 * Move through the cache until we either find a record with a visible
	 * entry, or we reach the end/beginning. Each helium_call advances
	 * from the key it last returned, because r->key is updated in place.
	 */
	for (cache_rm = 0;;) {
		if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0)
			break;
		WT_RET(cache_value_unmarshall(wtcursor));

		/* If there's no visible entry, move to the next one. */
		if (!cache_value_visible(wtcursor, &cp))
			continue;

		/*
		 * If the entry has been deleted, remember that and continue.
		 * We can't just skip the entry because it might be a delete
		 * of an entry in the primary store, which means the cache
		 * entry stops us from returning the primary store's entry.
		 */
		if (cp->remove)
			cache_rm = 1;

		/*
		 * Copy the cache key. If the cache's entry wasn't a delete,
		 * copy the value as well, we may return the cache entry.
		 */
		if (cursor->t2.mem_len < r->key_len) {
			if ((p = realloc(cursor->t2.v, r->key_len)) == NULL)
				return (os_errno());
			cursor->t2.v = p;
			cursor->t2.mem_len = r->key_len;
		}
		memcpy(cursor->t2.v, r->key, r->key_len);
		cursor->t2.len = r->key_len;

		if (cache_rm)
			break;

		if (cursor->t3.mem_len < cp->len) {
			if ((p = realloc(cursor->t3.v, cp->len)) == NULL)
				return (os_errno());
			cursor->t3.v = p;
			cursor->t3.mem_len = cp->len;
		}
		memcpy(cursor->t3.v, cp->v, cp->len);
		cursor->t3.len = cp->len;

		break;
	}
	if (ret != 0 && ret != WT_NOTFOUND)
		return (ret);
	cache_ret = ret;

	/* Copy the original key back into place. */
	memcpy(r->key, cursor->t1.v, cursor->t1.len);
	r->key_len = cursor->t1.len;

cache_clean:
	/* Get the next/prev entry from the store. */
	ret = helium_call(wtcursor, fname, ws->he, f);
	if (ret != 0 && ret != WT_NOTFOUND)
		return (ret);

	/* If no entries in either the cache or the primary, we're done. */
	if (cache_ret == WT_NOTFOUND && ret == WT_NOTFOUND)
		return (WT_NOTFOUND);

	/*
	 * If both the cache and the primary had entries, decide which is a
	 * better choice and pretend we didn't find the other one. On equal
	 * keys the cache entry is chosen, since it shadows the primary.
	 */
	if (cache_ret == 0 && ret == 0) {
		a.data = r->key;			/* a is the primary */
		a.size = (uint32_t)r->key_len;
		b.data = cursor->t2.v;			/* b is the cache */
		b.size = (uint32_t)cursor->t2.len;
		WT_RET(wt_api->collate(wt_api, session, NULL, &a, &b, &cmp));

		if (f == he_next) {
			if (cmp >= 0)
				ret = WT_NOTFOUND;
			else
				cache_ret = WT_NOTFOUND;
		} else {
			if (cmp <= 0)
				ret = WT_NOTFOUND;
			else
				cache_ret = WT_NOTFOUND;
		}
	}

	/*
	 * If the cache is the key we'd choose, but it's a delete, skip past it
	 * by moving from the deleted key to the next/prev item in either the
	 * primary or the cache.
	 */
	if (cache_ret == 0 && cache_rm) {
		memcpy(r->key, cursor->t2.v, cursor->t2.len);
		r->key_len = cursor->t2.len;
		goto skip_deleted;
	}

	/* If taking the cache's entry, copy the value into place. */
	if (cache_ret == 0) {
		memcpy(r->key, cursor->t2.v, cursor->t2.len);
		r->key_len = cursor->t2.len;

		/*
		 * NOTE(review): no capacity check on cursor->v here; this
		 * assumes cursor->mem_len >= t3.len, presumably because
		 * helium_call already grew cursor->v to hold the whole cache
		 * record that t3 was copied from — confirm.
		 */
		memcpy(cursor->v, cursor->t3.v, cursor->t3.len);
		cursor->len = cursor->t3.len;
	}

	/* Copy out the chosen key/value pair. */
	WT_RET(copyout_key(wtcursor));
	WT_RET(copyout_val(wtcursor, NULL));
	return (0);
}
1286
1287 /*
1288 * helium_cursor_next --
1289 * WT_CURSOR.next method.
1290 */
1291 static int
helium_cursor_next(WT_CURSOR * wtcursor)1292 helium_cursor_next(WT_CURSOR *wtcursor)
1293 {
1294 return (nextprev(wtcursor, "he_next", he_next));
1295 }
1296
1297 /*
1298 * helium_cursor_prev --
1299 * WT_CURSOR.prev method.
1300 */
1301 static int
helium_cursor_prev(WT_CURSOR * wtcursor)1302 helium_cursor_prev(WT_CURSOR *wtcursor)
1303 {
1304 return (nextprev(wtcursor, "he_prev", he_prev));
1305 }
1306
1307 /*
1308 * helium_cursor_reset --
1309 * WT_CURSOR.reset method.
1310 */
1311 static int
helium_cursor_reset(WT_CURSOR * wtcursor)1312 helium_cursor_reset(WT_CURSOR *wtcursor)
1313 {
1314 CURSOR *cursor;
1315 HE_ITEM *r;
1316
1317 cursor = (CURSOR *)wtcursor;
1318 r = &cursor->record;
1319
1320 /*
1321 * Reset the cursor by setting the key length to 0, causing subsequent
1322 * next/prev operations to return the first/last record of the object.
1323 */
1324 r->key_len = 0;
1325 return (0);
1326 }
1327
1328 /*
1329 * helium_cursor_search --
1330 * WT_CURSOR.search method.
1331 */
1332 static int
helium_cursor_search(WT_CURSOR * wtcursor)1333 helium_cursor_search(WT_CURSOR *wtcursor)
1334 {
1335 CACHE_RECORD *cp;
1336 CURSOR *cursor;
1337 WT_SOURCE *ws;
1338 int ret = 0;
1339
1340 cursor = (CURSOR *)wtcursor;
1341 ws = cursor->ws;
1342
1343 /* Copy in the WiredTiger cursor's key. */
1344 WT_RET(copyin_key(wtcursor, 0));
1345
1346 /*
1347 * Check for an entry in the cache. If we find one, unmarshall it
1348 * and check for a visible entry we can return.
1349 */
1350 if ((ret =
1351 helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) {
1352 WT_RET(cache_value_unmarshall(wtcursor));
1353 if (cache_value_visible(wtcursor, &cp))
1354 return (cp->remove ?
1355 WT_NOTFOUND : copyout_val(wtcursor, cp));
1356 } else if (ret != WT_NOTFOUND)
1357 return (ret);
1358
1359 /* Check for an entry in the primary store. */
1360 WT_RET(helium_call(wtcursor, "he_lookup", ws->he, he_lookup));
1361 WT_RET(copyout_val(wtcursor, NULL));
1362
1363 return (0);
1364 }
1365
1366 /*
1367 * helium_cursor_search_near --
1368 * WT_CURSOR.search_near method.
1369 */
1370 static int
helium_cursor_search_near(WT_CURSOR * wtcursor,int * exact)1371 helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
1372 {
1373 int ret = 0;
1374
1375 /*
1376 * XXX
1377 * I'm not confident this is sufficient: if there are multiple threads
1378 * of control, it's possible for the search for an exact match to fail,
1379 * another thread of control to insert (and commit) an exact match, and
1380 * then it's possible we'll return the wrong value. This needs to be
1381 * revisited once the transactional code is in place.
1382 */
1383
1384 /* Search for an exact match. */
1385 if ((ret = helium_cursor_search(wtcursor)) == 0) {
1386 *exact = 0;
1387 return (0);
1388 }
1389 if (ret != WT_NOTFOUND)
1390 return (ret);
1391
1392 /* Search for a key that's larger. */
1393 if ((ret = helium_cursor_next(wtcursor)) == 0) {
1394 *exact = 1;
1395 return (0);
1396 }
1397 if (ret != WT_NOTFOUND)
1398 return (ret);
1399
1400 /* Search for a key that's smaller. */
1401 if ((ret = helium_cursor_prev(wtcursor)) == 0) {
1402 *exact = -1;
1403 return (0);
1404 }
1405
1406 return (ret);
1407 }
1408
1409 /*
1410 * helium_cursor_insert --
1411 * WT_CURSOR.insert method.
1412 */
1413 static int
helium_cursor_insert(WT_CURSOR * wtcursor)1414 helium_cursor_insert(WT_CURSOR *wtcursor)
1415 {
1416 CACHE_RECORD *cp;
1417 CURSOR *cursor;
1418 HE_ITEM *r;
1419 HELIUM_SOURCE *hs;
1420 WT_EXTENSION_API *wt_api;
1421 WT_SESSION *session;
1422 WT_SOURCE *ws;
1423 int ret = 0;
1424
1425 session = wtcursor->session;
1426 cursor = (CURSOR *)wtcursor;
1427 wt_api = cursor->wt_api;
1428 ws = cursor->ws;
1429 hs = ws->hs;
1430 r = &cursor->record;
1431
1432 /* Get the WiredTiger cursor's key. */
1433 WT_RET(copyin_key(wtcursor, 1));
1434
1435 VMSG(wt_api, session, VERBOSE_L2,
1436 "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val);
1437
1438 /* Clear the value, assume we're adding the first cache entry. */
1439 cursor->len = 0;
1440
1441 /* Updates are read-modify-writes, lock the underlying cache. */
1442 WT_RET(writelock(wt_api, session, &ws->lock));
1443
1444 /* Read the record from the cache store. */
1445 switch (ret = helium_call(
1446 wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
1447 case 0:
1448 /* Crack the record. */
1449 WT_ERR(cache_value_unmarshall(wtcursor));
1450
1451 /* Check if the update can proceed. */
1452 WT_ERR(cache_value_update_check(wtcursor));
1453
1454 if (cursor->config_overwrite)
1455 break;
1456
1457 /*
1458 * If overwrite is false, a visible entry (that's not a removed
1459 * entry), is an error. We're done checking if there is a
1460 * visible entry in the cache, otherwise repeat the check on the
1461 * primary store.
1462 */
1463 if (cache_value_visible(wtcursor, &cp)) {
1464 if (cp->remove)
1465 break;
1466
1467 ret = WT_DUPLICATE_KEY;
1468 goto err;
1469 }
1470 /* FALLTHROUGH */
1471 case WT_NOTFOUND:
1472 if (cursor->config_overwrite)
1473 break;
1474
1475 /* If overwrite is false, an entry is an error. */
1476 if ((ret = helium_call(
1477 wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) {
1478 if (ret == 0)
1479 ret = WT_DUPLICATE_KEY;
1480 goto err;
1481 }
1482 ret = 0;
1483 break;
1484 default:
1485 goto err;
1486 }
1487
1488 /*
1489 * Create a new value using the current cache record plus the WiredTiger
1490 * cursor's value, and update the cache.
1491 */
1492 WT_ERR(cache_value_append(wtcursor, 0));
1493 if ((ret = he_update(ws->he_cache, r)) != 0)
1494 EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret));
1495
1496 /* Update the state while still holding the lock. */
1497 if (!ws->he_cache_inuse)
1498 ws->he_cache_inuse = true;
1499 ++ws->he_cache_ops;
1500
1501 /* Discard the lock. */
1502 err: ESET(unlock(wt_api, session, &ws->lock));
1503
1504 /* If successful, request notification at transaction resolution. */
1505 if (ret == 0)
1506 ESET(wt_api->transaction_notify(
1507 wt_api, session, &hs->txn_notify));
1508
1509 return (ret);
1510 }
1511
1512 /*
1513 * update --
1514 * Update or remove an entry.
1515 */
1516 static int
update(WT_CURSOR * wtcursor,int remove_op)1517 update(WT_CURSOR *wtcursor, int remove_op)
1518 {
1519 CACHE_RECORD *cp;
1520 CURSOR *cursor;
1521 HE_ITEM *r;
1522 HELIUM_SOURCE *hs;
1523 WT_EXTENSION_API *wt_api;
1524 WT_SESSION *session;
1525 WT_SOURCE *ws;
1526 int ret = 0;
1527
1528 session = wtcursor->session;
1529 cursor = (CURSOR *)wtcursor;
1530 wt_api = cursor->wt_api;
1531 ws = cursor->ws;
1532 hs = ws->hs;
1533 r = &cursor->record;
1534
1535 /* Get the WiredTiger cursor's key. */
1536 WT_RET(copyin_key(wtcursor, 0));
1537
1538 VMSG(wt_api, session, VERBOSE_L2,
1539 "%c %.*s.%.*s",
1540 remove_op ? 'R' : 'U',
1541 (int)r->key_len, r->key, (int)r->val_len, r->val);
1542
1543 /* Clear the value, assume we're adding the first cache entry. */
1544 cursor->len = 0;
1545
1546 /* Updates are read-modify-writes, lock the underlying cache. */
1547 WT_RET(writelock(wt_api, session, &ws->lock));
1548
1549 /* Read the record from the cache store. */
1550 switch (ret = helium_call(
1551 wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
1552 case 0:
1553 /* Crack the record. */
1554 WT_ERR(cache_value_unmarshall(wtcursor));
1555
1556 /* Check if the update can proceed. */
1557 WT_ERR(cache_value_update_check(wtcursor));
1558
1559 if (cursor->config_overwrite)
1560 break;
1561
1562 /*
1563 * If overwrite is false, no entry (or a removed entry), is an
1564 * error. We're done checking if there is a visible entry in
1565 * the cache, otherwise repeat the check on the primary store.
1566 */
1567 if (cache_value_visible(wtcursor, &cp)) {
1568 if (!cp->remove)
1569 break;
1570
1571 ret = WT_NOTFOUND;
1572 goto err;
1573 }
1574 /* FALLTHROUGH */
1575 case WT_NOTFOUND:
1576 if (cursor->config_overwrite)
1577 break;
1578
1579 /* If overwrite is false, no entry is an error. */
1580 WT_ERR(helium_call(wtcursor, "he_lookup", ws->he, he_lookup));
1581
1582 /*
1583 * All we care about is the cache entry, which didn't exist;
1584 * clear the returned value, we're about to "append" to it.
1585 */
1586 cursor->len = 0;
1587 break;
1588 default:
1589 goto err;
1590 }
1591
1592 /*
1593 * Create a new cache value based on the current cache record plus the
1594 * WiredTiger cursor's value.
1595 */
1596 WT_ERR(cache_value_append(wtcursor, remove_op));
1597
1598 /* Push the record into the cache. */
1599 if ((ret = he_update(ws->he_cache, r)) != 0)
1600 EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret));
1601
1602 /* Update the state while still holding the lock. */
1603 if (!ws->he_cache_inuse)
1604 ws->he_cache_inuse = true;
1605 ++ws->he_cache_ops;
1606
1607 /* Discard the lock. */
1608 err: ESET(unlock(wt_api, session, &ws->lock));
1609
1610 /* If successful, request notification at transaction resolution. */
1611 if (ret == 0)
1612 ESET(wt_api->transaction_notify(
1613 wt_api, session, &hs->txn_notify));
1614
1615 return (ret);
1616 }
1617
1618 /*
1619 * helium_cursor_update --
1620 * WT_CURSOR.update method.
1621 */
1622 static int
helium_cursor_update(WT_CURSOR * wtcursor)1623 helium_cursor_update(WT_CURSOR *wtcursor)
1624 {
1625 return (update(wtcursor, 0));
1626 }
1627
1628 /*
1629 * helium_cursor_reserve --
1630 * WT_CURSOR.reserve method.
1631 */
1632 static int
helium_cursor_reserve(WT_CURSOR * wtcursor)1633 helium_cursor_reserve(WT_CURSOR *wtcursor)
1634 {
1635 (void)wtcursor;
1636
1637 /*
1638 * XXX
1639 * We don't currently support reserve, this will require some work.
1640 * The test programs don't currently detect it, so return success.
1641 */
1642 return (0);
1643 }
1644
1645 /*
1646 * helium_cursor_remove --
1647 * WT_CURSOR.remove method.
1648 */
1649 static int
helium_cursor_remove(WT_CURSOR * wtcursor)1650 helium_cursor_remove(WT_CURSOR *wtcursor)
1651 {
1652 CURSOR *cursor;
1653 WT_SOURCE *ws;
1654
1655 cursor = (CURSOR *)wtcursor;
1656 ws = cursor->ws;
1657
1658 /*
1659 * WiredTiger's "remove" of a bitfield is really an update with a value
1660 * of zero.
1661 */
1662 if (ws->config_bitfield) {
1663 wtcursor->value.size = 1;
1664 wtcursor->value.data = "";
1665 return (update(wtcursor, 0));
1666 }
1667 return (update(wtcursor, 1));
1668 }
1669
1670 /*
1671 * helium_cursor_close --
1672 * WT_CURSOR.close method.
1673 */
1674 static int
helium_cursor_close(WT_CURSOR * wtcursor)1675 helium_cursor_close(WT_CURSOR *wtcursor)
1676 {
1677 CURSOR *cursor;
1678 WT_EXTENSION_API *wt_api;
1679 WT_SESSION *session;
1680 WT_SOURCE *ws;
1681 int ret = 0;
1682
1683 session = wtcursor->session;
1684 cursor = (CURSOR *)wtcursor;
1685 wt_api = cursor->wt_api;
1686 ws = cursor->ws;
1687
1688 if ((ret = writelock(wt_api, session, &ws->lock)) == 0) {
1689 --ws->ref;
1690 ret = unlock(wt_api, session, &ws->lock);
1691 }
1692 cursor_destroy(cursor);
1693
1694 return (ret);
1695 }
1696
1697 /*
1698 * ws_source_name --
1699 * Build a namespace name.
1700 */
1701 static int
ws_source_name(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * suffix,char ** pp)1702 ws_source_name(WT_DATA_SOURCE *wtds,
1703 WT_SESSION *session, const char *uri, const char *suffix, char **pp)
1704 {
1705 DATA_SOURCE *ds;
1706 WT_EXTENSION_API *wt_api;
1707 size_t len;
1708 int ret = 0;
1709 const char *p;
1710
1711 ds = (DATA_SOURCE *)wtds;
1712 wt_api = ds->wt_api;
1713
1714 /*
1715 * Create the store's name. Application URIs are "helium:device/name";
1716 * we want the names on the Helium device to be obviously WiredTiger's,
1717 * and the device name isn't interesting. Convert to "WiredTiger:name",
1718 * and add an optional suffix.
1719 */
1720 if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL)
1721 ERET(wt_api, session, EINVAL, "%s: illegal Helium URI", uri);
1722 ++p;
1723
1724 len = strlen(WT_NAME_PREFIX) +
1725 strlen(p) + (suffix == NULL ? 0 : strlen(suffix)) + 5;
1726 if ((*pp = malloc(len)) == NULL)
1727 return (os_errno());
1728 (void)snprintf(*pp, len, "%s%s%s",
1729 WT_NAME_PREFIX, p, suffix == NULL ? "" : suffix);
1730 return (0);
1731 }
1732
1733 /*
1734 * ws_source_close --
1735 * Close a WT_SOURCE reference.
1736 */
1737 static int
ws_source_close(WT_EXTENSION_API * wt_api,WT_SESSION * session,WT_SOURCE * ws)1738 ws_source_close(WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_SOURCE *ws)
1739 {
1740 int ret = 0, tret;
1741
1742 /*
1743 * Warn if open cursors: it shouldn't happen because the upper layers of
1744 * WiredTiger prevent it, so we don't do anything more than warn.
1745 */
1746 if (ws->ref != 0)
1747 EMSG(wt_api, session, WT_ERROR,
1748 "%s: open object with %u open cursors being closed",
1749 ws->uri, ws->ref);
1750
1751 if (ws->he != NULL) {
1752 if ((tret = he_commit(ws->he)) != 0)
1753 EMSG(wt_api, session, tret,
1754 "he_commit: %s: %s", ws->uri, he_strerror(tret));
1755 if ((tret = he_close(ws->he)) != 0)
1756 EMSG(wt_api, session, tret,
1757 "he_close: %s: %s", ws->uri, he_strerror(tret));
1758 ws->he = NULL;
1759 }
1760 if (ws->he_cache != NULL) {
1761 if ((tret = he_close(ws->he_cache)) != 0)
1762 EMSG(wt_api, session, tret,
1763 "he_close: %s(cache): %s",
1764 ws->uri, he_strerror(tret));
1765 ws->he_cache = NULL;
1766 }
1767
1768 if (ws->lockinit)
1769 ESET(lock_destroy(wt_api, session, &ws->lock));
1770
1771 free(ws->uri);
1772 OVERWRITE_AND_FREE(ws);
1773
1774 return (ret);
1775 }
1776
1777 /*
1778 * ws_source_open_object --
1779 * Open an object in the Helium store.
1780 */
1781 static int
ws_source_open_object(WT_DATA_SOURCE * wtds,WT_SESSION * session,HELIUM_SOURCE * hs,const char * uri,const char * suffix,int flags,he_t * hep)1782 ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session,
1783 HELIUM_SOURCE *hs,
1784 const char *uri, const char *suffix, int flags, he_t *hep)
1785 {
1786 DATA_SOURCE *ds;
1787 WT_EXTENSION_API *wt_api;
1788 he_t he;
1789 char *p;
1790 int ret = 0;
1791
1792 *hep = NULL;
1793
1794 ds = (DATA_SOURCE *)wtds;
1795 wt_api = ds->wt_api;
1796 p = NULL;
1797
1798 /* Open the underlying Helium object. */
1799 WT_RET(ws_source_name(wtds, session, uri, suffix, &p));
1800 VMSG(wt_api, session, VERBOSE_L1, "open %s/%s", hs->name, p);
1801 if ((he = he_open(hs->device, p, flags, NULL)) == NULL) {
1802 ret = os_errno();
1803 EMSG(wt_api, session, ret,
1804 "he_open: %s/%s: %s", hs->name, p, he_strerror(ret));
1805 }
1806 *hep = he;
1807
1808 free(p);
1809 return (ret);
1810 }
1811
1812 #define WS_SOURCE_OPEN_BUSY 0x01 /* Fail if source busy */
1813 #define WS_SOURCE_OPEN_GLOBAL 0x02 /* Keep the global lock */
1814
1815 /*
1816 * ws_source_open --
1817 * Return a locked WiredTiger source, allocating and opening if it doesn't
1818 * already exist.
1819 */
1820 static int
ws_source_open(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config,u_int flags,WT_SOURCE ** refp)1821 ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
1822 const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp)
1823 {
1824 DATA_SOURCE *ds;
1825 HELIUM_SOURCE *hs;
1826 WT_CONFIG_ITEM a;
1827 WT_EXTENSION_API *wt_api;
1828 WT_SOURCE *ws;
1829 size_t len;
1830 int oflags, ret = 0;
1831 const char *p, *t;
1832
1833 *refp = NULL;
1834
1835 ds = (DATA_SOURCE *)wtds;
1836 wt_api = ds->wt_api;
1837 ws = NULL;
1838
1839 /*
1840 * The URI will be "helium:" followed by a Helium name and object name
1841 * pair separated by a slash, for example, "helium:volume/object".
1842 */
1843 if (!prefix_match(uri, "helium:"))
1844 goto bad_name;
1845 p = uri + strlen("helium:");
1846 if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0')
1847 bad_name: ERET(wt_api, session, EINVAL, "%s: illegal name format", uri);
1848 len = (size_t)(t - p);
1849
1850 /* Find a matching Helium device. */
1851 for (hs = ds->hs_head; hs != NULL; hs = hs->next)
1852 if (string_match(hs->name, p, len))
1853 break;
1854 if (hs == NULL)
1855 ERET(wt_api, NULL,
1856 EINVAL, "%s: no matching Helium store found", uri);
1857
1858 /*
1859 * We're about to walk the Helium device's list of files, acquire the
1860 * global lock.
1861 */
1862 WT_RET(writelock(wt_api, session, &ds->global_lock));
1863
1864 /*
1865 * Check for a match: if we find one, optionally trade the global lock
1866 * for the object's lock, optionally check if the object is busy, and
1867 * return.
1868 */
1869 for (ws = hs->ws_head; ws != NULL; ws = ws->next)
1870 if (strcmp(ws->uri, uri) == 0) {
1871 /* Check to see if the object is busy. */
1872 if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) {
1873 ret = EBUSY;
1874 ESET(unlock(wt_api, session, &ds->global_lock));
1875 return (ret);
1876 }
1877 /* Swap the global lock for an object lock. */
1878 if (!(flags & WS_SOURCE_OPEN_GLOBAL)) {
1879 ret = writelock(wt_api, session, &ws->lock);
1880 ESET(unlock(wt_api, session, &ds->global_lock));
1881 if (ret != 0)
1882 return (ret);
1883 }
1884 *refp = ws;
1885 return (0);
1886 }
1887
1888 /* Allocate and initialize a new underlying WiredTiger source object. */
1889 if ((ws = calloc(1, sizeof(*ws))) == NULL ||
1890 (ws->uri = strdup(uri)) == NULL) {
1891 ret = os_errno();
1892 goto err;
1893 }
1894 WT_ERR(lock_init(wt_api, session, &ws->lock));
1895 ws->lockinit = true;
1896 ws->hs = hs;
1897
1898 /*
1899 * Open the underlying Helium objects, then push the change.
1900 *
1901 * The naming scheme is simple: the URI names the primary store, and the
1902 * URI with a trailing suffix names the associated caching store.
1903 *
1904 * We always set the create flag, our caller handles attempts to create
1905 * existing objects.
1906 */
1907 oflags = HE_O_CREATE;
1908 if ((ret = wt_api->config_get(wt_api,
1909 session, config, "helium_o_compress", &a)) == 0 && a.val != 0)
1910 oflags |= HE_O_COMPRESS;
1911 if (ret != 0 && ret != WT_NOTFOUND)
1912 EMSG_ERR(wt_api, session, ret,
1913 "helium_o_compress configuration: %s",
1914 wt_api->strerror(wt_api, session, ret));
1915 if ((ret = wt_api->config_get(wt_api,
1916 session, config, "helium_o_truncate", &a)) == 0 && a.val != 0)
1917 oflags |= HE_O_TRUNCATE;
1918 if (ret != 0 && ret != WT_NOTFOUND)
1919 EMSG_ERR(wt_api, session, ret,
1920 "helium_o_truncate configuration: %s",
1921 wt_api->strerror(wt_api, session, ret));
1922
1923 WT_ERR(ws_source_open_object(
1924 wtds, session, hs, uri, NULL, oflags, &ws->he));
1925 WT_ERR(ws_source_open_object(
1926 wtds, session, hs, uri, WT_NAME_CACHE, HE_O_CREATE, &ws->he_cache));
1927 if ((ret = he_commit(ws->he)) != 0)
1928 EMSG_ERR(wt_api, session, ret,
1929 "he_commit: %s", he_strerror(ret));
1930
1931 /* Optionally trade the global lock for the object lock. */
1932 if (!(flags & WS_SOURCE_OPEN_GLOBAL))
1933 WT_ERR(writelock(wt_api, session, &ws->lock));
1934
1935 /* Insert the new entry at the head of the list. */
1936 ws->next = hs->ws_head;
1937 hs->ws_head = ws;
1938
1939 *refp = ws;
1940 ws = NULL;
1941
1942 if (0) {
1943 err: if (ws != NULL)
1944 ESET(ws_source_close(wt_api, session, ws));
1945 }
1946
1947 /*
1948 * If there was an error or our caller doesn't need the global lock,
1949 * release the global lock.
1950 */
1951 if (!(flags & WS_SOURCE_OPEN_GLOBAL) || ret != 0)
1952 ESET(unlock(wt_api, session, &ds->global_lock));
1953
1954 return (ret);
1955 }
1956
1957 /*
1958 * master_uri_get --
1959 * Get the Helium master record for a URI.
1960 */
1961 static int
master_uri_get(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,char ** valuep)1962 master_uri_get(WT_DATA_SOURCE *wtds,
1963 WT_SESSION *session, const char *uri, char **valuep)
1964 {
1965 DATA_SOURCE *ds;
1966 WT_EXTENSION_API *wt_api;
1967
1968 ds = (DATA_SOURCE *)wtds;
1969 wt_api = ds->wt_api;
1970
1971 return (wt_api->metadata_search(wt_api, session, uri, valuep));
1972 }
1973
1974 /*
1975 * master_uri_drop --
1976 * Drop the Helium master record for a URI.
1977 */
1978 static int
master_uri_drop(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri)1979 master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri)
1980 {
1981 DATA_SOURCE *ds;
1982 WT_EXTENSION_API *wt_api;
1983
1984 ds = (DATA_SOURCE *)wtds;
1985 wt_api = ds->wt_api;
1986
1987 return (wt_api->metadata_remove(wt_api, session, uri));
1988 }
1989
1990 /*
1991 * master_uri_rename --
1992 * Rename the Helium master record for a URI.
1993 */
1994 static int
master_uri_rename(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * newuri)1995 master_uri_rename(WT_DATA_SOURCE *wtds,
1996 WT_SESSION *session, const char *uri, const char *newuri)
1997 {
1998 DATA_SOURCE *ds;
1999 WT_EXTENSION_API *wt_api;
2000 int ret = 0;
2001 char *value;
2002
2003 ds = (DATA_SOURCE *)wtds;
2004 wt_api = ds->wt_api;
2005 value = NULL;
2006
2007 /* Insert the record under a new name. */
2008 WT_ERR(master_uri_get(wtds, session, uri, &value));
2009 WT_ERR(wt_api->metadata_insert(wt_api, session, newuri, value));
2010
2011 /*
2012 * Remove the original record, and if that fails, attempt to remove
2013 * the new record.
2014 */
2015 if ((ret = wt_api->metadata_remove(wt_api, session, uri)) != 0)
2016 (void)wt_api->metadata_remove(wt_api, session, newuri);
2017
2018 err: free((void *)value);
2019 return (ret);
2020 }
2021
2022 /*
2023 * master_uri_set --
2024 * Set the Helium master record for a URI.
2025 */
2026 static int
master_uri_set(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2027 master_uri_set(WT_DATA_SOURCE *wtds,
2028 WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2029 {
2030 DATA_SOURCE *ds;
2031 WT_CONFIG_ITEM a, b, c;
2032 WT_EXTENSION_API *wt_api;
2033 int exclusive, ret = 0;
2034 char value[1024];
2035
2036 ds = (DATA_SOURCE *)wtds;
2037 wt_api = ds->wt_api;
2038
2039 exclusive = 0;
2040 if ((ret =
2041 wt_api->config_get(wt_api, session, config, "exclusive", &a)) == 0)
2042 exclusive = a.val != 0;
2043 else if (ret != WT_NOTFOUND)
2044 ERET(wt_api, session, ret,
2045 "exclusive configuration: %s",
2046 wt_api->strerror(wt_api, session, ret));
2047
2048 /* Get the key/value format strings. */
2049 if ((ret = wt_api->config_get(
2050 wt_api, session, config, "key_format", &a)) != 0) {
2051 if (ret == WT_NOTFOUND) {
2052 a.str = "u";
2053 a.len = 1;
2054 } else
2055 ERET(wt_api, session, ret,
2056 "key_format configuration: %s",
2057 wt_api->strerror(wt_api, session, ret));
2058 }
2059 if ((ret = wt_api->config_get(
2060 wt_api, session, config, "value_format", &b)) != 0) {
2061 if (ret == WT_NOTFOUND) {
2062 b.str = "u";
2063 b.len = 1;
2064 } else
2065 ERET(wt_api, session, ret,
2066 "value_format configuration: %s",
2067 wt_api->strerror(wt_api, session, ret));
2068 }
2069
2070 /* Get the compression configuration. */
2071 if ((ret = wt_api->config_get(
2072 wt_api, session, config, "helium_o_compress", &c)) != 0) {
2073 if (ret == WT_NOTFOUND)
2074 c.val = 0;
2075 else
2076 ERET(wt_api, session, ret,
2077 "helium_o_compress configuration: %s",
2078 wt_api->strerror(wt_api, session, ret));
2079 }
2080
2081 /*
2082 * Create a new reference using insert (which fails if the record
2083 * already exists).
2084 */
2085 (void)snprintf(value, sizeof(value),
2086 "wiredtiger_helium_version=(major=%d,minor=%d),"
2087 "key_format=%.*s,value_format=%.*s,"
2088 "helium_o_compress=%d",
2089 WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR,
2090 (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0);
2091 if ((ret = wt_api->metadata_insert(wt_api, session, uri, value)) == 0)
2092 return (0);
2093 if (ret == WT_DUPLICATE_KEY)
2094 return (exclusive ? EEXIST : 0);
2095 ERET(wt_api, session,
2096 ret, "%s: %s", uri, wt_api->strerror(wt_api, session, ret));
2097 }
2098
2099 /*
2100 * helium_session_open_cursor --
2101 * WT_SESSION.open_cursor method.
2102 */
2103 static int
helium_session_open_cursor(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config,WT_CURSOR ** new_cursor)2104 helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
2105 const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor)
2106 {
2107 CURSOR *cursor;
2108 DATA_SOURCE *ds;
2109 WT_CONFIG_ITEM v;
2110 WT_CONFIG_PARSER *config_parser;
2111 WT_CURSOR *wtcursor;
2112 WT_EXTENSION_API *wt_api;
2113 WT_SOURCE *ws;
2114 int locked, own, ret, tret;
2115 char *value;
2116
2117 *new_cursor = NULL;
2118
2119 config_parser = NULL;
2120 cursor = NULL;
2121 ds = (DATA_SOURCE *)wtds;
2122 wt_api = ds->wt_api;
2123 ws = NULL;
2124 locked = 0;
2125 ret = tret = 0;
2126 value = NULL;
2127
2128 /* Allocate and initialize a cursor. */
2129 if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
2130 return (os_errno());
2131
2132 if ((ret = wt_api->config_get( /* Parse configuration */
2133 wt_api, session, config, "append", &v)) != 0)
2134 EMSG_ERR(wt_api, session, ret,
2135 "append configuration: %s",
2136 wt_api->strerror(wt_api, session, ret));
2137 cursor->config_append = v.val != 0;
2138
2139 if ((ret = wt_api->config_get(
2140 wt_api, session, config, "overwrite", &v)) != 0)
2141 EMSG_ERR(wt_api, session, ret,
2142 "overwrite configuration: %s",
2143 wt_api->strerror(wt_api, session, ret));
2144 cursor->config_overwrite = v.val != 0;
2145
2146 if ((ret = wt_api->collator_config(
2147 wt_api, session, uri, config, NULL, &own)) != 0)
2148 EMSG_ERR(wt_api, session, ret,
2149 "collator configuration: %s",
2150 wt_api->strerror(wt_api, session, ret));
2151
2152 /* Finish initializing the cursor. */
2153 cursor->wtcursor.close = helium_cursor_close;
2154 cursor->wtcursor.insert = helium_cursor_insert;
2155 cursor->wtcursor.next = helium_cursor_next;
2156 cursor->wtcursor.prev = helium_cursor_prev;
2157 cursor->wtcursor.remove = helium_cursor_remove;
2158 cursor->wtcursor.reserve = helium_cursor_reserve;
2159 cursor->wtcursor.reset = helium_cursor_reset;
2160 cursor->wtcursor.search = helium_cursor_search;
2161 cursor->wtcursor.search_near = helium_cursor_search_near;
2162 cursor->wtcursor.update = helium_cursor_update;
2163
2164 cursor->wt_api = wt_api;
2165 cursor->record.key = cursor->__key;
2166 if ((cursor->v = malloc(128)) == NULL)
2167 goto err;
2168 cursor->mem_len = 128;
2169
2170 /* Get a locked reference to the WiredTiger source. */
2171 WT_ERR(ws_source_open(wtds, session, uri, config, 0, &ws));
2172 locked = 1;
2173 cursor->ws = ws;
2174
2175 /*
2176 * If this is the first access to the URI, we have to configure it
2177 * using information stored in the master record.
2178 */
2179 if (!ws->configured) {
2180 WT_ERR(master_uri_get(wtds, session, uri, &value));
2181
2182 if ((ret = wt_api->config_parser_open(wt_api,
2183 session, value, strlen(value), &config_parser)) != 0)
2184 EMSG_ERR(wt_api, session, ret,
2185 "Configuration string parser: %s",
2186 wt_api->strerror(wt_api, session, ret));
2187 if ((ret = config_parser->get(
2188 config_parser, "key_format", &v)) != 0)
2189 EMSG_ERR(wt_api, session, ret,
2190 "key_format configuration: %s",
2191 wt_api->strerror(wt_api, session, ret));
2192 ws->config_recno = v.len == 1 && v.str[0] == 'r';
2193
2194 if ((ret = config_parser->get(
2195 config_parser, "value_format", &v)) != 0)
2196 EMSG_ERR(wt_api, session, ret,
2197 "value_format configuration: %s",
2198 wt_api->strerror(wt_api, session, ret));
2199 ws->config_bitfield = v.len == 2 &&
2200 isdigit((u_char)v.str[0]) && v.str[1] == 't';
2201
2202 /*
2203 * If it's a record-number key, read the last record from the
2204 * object and set the allocation record value.
2205 */
2206 if (ws->config_recno) {
2207 wtcursor = (WT_CURSOR *)cursor;
2208 WT_ERR(helium_cursor_reset(wtcursor));
2209
2210 if ((ret = helium_cursor_prev(wtcursor)) == 0)
2211 ws->append_recno = wtcursor->recno;
2212 else if (ret != WT_NOTFOUND)
2213 goto err;
2214
2215 WT_ERR(helium_cursor_reset(wtcursor));
2216 }
2217
2218 ws->configured = true;
2219 }
2220
2221 /* Increment the open reference count to pin the URI and unlock it. */
2222 ++ws->ref;
2223 WT_ERR(unlock(wt_api, session, &ws->lock));
2224
2225 *new_cursor = (WT_CURSOR *)cursor;
2226
2227 if (0) {
2228 err: if (ws != NULL && locked)
2229 ESET(unlock(wt_api, session, &ws->lock));
2230 cursor_destroy(cursor);
2231 }
2232 if (config_parser != NULL &&
2233 (tret = config_parser->close(config_parser)) != 0)
2234 EMSG(wt_api, session, tret,
2235 "WT_CONFIG_PARSER.close: %s",
2236 wt_api->strerror(wt_api, session, tret));
2237
2238 free((void *)value);
2239 return (ret);
2240 }
2241
2242 /*
2243 * helium_session_create --
2244 * WT_SESSION.create method.
2245 */
2246 static int
helium_session_create(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2247 helium_session_create(WT_DATA_SOURCE *wtds,
2248 WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2249 {
2250 DATA_SOURCE *ds;
2251 WT_EXTENSION_API *wt_api;
2252 WT_SOURCE *ws;
2253
2254 ds = (DATA_SOURCE *)wtds;
2255 wt_api = ds->wt_api;
2256
2257 /*
2258 * Get a locked reference to the WiredTiger source, then immediately
2259 * unlock it, we aren't doing anything else.
2260 */
2261 WT_RET(ws_source_open(wtds, session, uri, config, 0, &ws));
2262 WT_RET(unlock(wt_api, session, &ws->lock));
2263
2264 /*
2265 * Create the URI master record if it doesn't already exist.
2266 *
2267 * We've discarded the lock, but that's OK, creates are single-threaded
2268 * at the WiredTiger level, it's not our problem to solve.
2269 *
2270 * If unable to enter a WiredTiger record, leave the Helium store alone.
2271 * A subsequent create should do the right thing, we aren't leaving
2272 * anything in an inconsistent state.
2273 */
2274 return (master_uri_set(wtds, session, uri, config));
2275 }
2276
2277 /*
2278 * helium_session_drop --
2279 * WT_SESSION.drop method.
2280 */
2281 static int
helium_session_drop(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2282 helium_session_drop(WT_DATA_SOURCE *wtds,
2283 WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2284 {
2285 DATA_SOURCE *ds;
2286 HELIUM_SOURCE *hs;
2287 WT_EXTENSION_API *wt_api;
2288 WT_SOURCE **p, *ws;
2289 int ret = 0;
2290
2291 ds = (DATA_SOURCE *)wtds;
2292 wt_api = ds->wt_api;
2293
2294 /*
2295 * Get a locked reference to the data source: hold the global lock,
2296 * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects.
2297 *
2298 * Remove the entry from the WT_SOURCE list -- it's a singly-linked
2299 * list, find the reference to it.
2300 */
2301 WT_RET(ws_source_open(wtds, session, uri, config,
2302 WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws));
2303 hs = ws->hs;
2304 for (p = &hs->ws_head; *p != NULL; p = &(*p)->next)
2305 if (*p == ws) {
2306 *p = (*p)->next;
2307 break;
2308 }
2309
2310 /* Drop the underlying Helium objects. */
2311 ESET(he_remove(ws->he));
2312 ws->he = NULL; /* The handle is dead. */
2313 ESET(he_remove(ws->he_cache));
2314 ws->he_cache = NULL; /* The handle is dead. */
2315
2316 /* Close the source, discarding the structure. */
2317 ESET(ws_source_close(wt_api, session, ws));
2318 ws = NULL;
2319
2320 /* Discard the metadata entry. */
2321 ESET(master_uri_drop(wtds, session, uri));
2322
2323 /*
2324 * If we have an error at this point, panic -- there's an inconsistency
2325 * in what WiredTiger knows about and the underlying store.
2326 */
2327 if (ret != 0)
2328 ret = WT_PANIC;
2329
2330 ESET(unlock(wt_api, session, &ds->global_lock));
2331 return (ret);
2332 }
2333
2334 /*
2335 * helium_session_rename --
2336 * WT_SESSION.rename method.
2337 */
2338 static int
helium_session_rename(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * newuri,WT_CONFIG_ARG * config)2339 helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
2340 const char *uri, const char *newuri, WT_CONFIG_ARG *config)
2341 {
2342 DATA_SOURCE *ds;
2343 WT_EXTENSION_API *wt_api;
2344 WT_SOURCE *ws;
2345 int ret = 0;
2346 char *p;
2347
2348 ds = (DATA_SOURCE *)wtds;
2349 wt_api = ds->wt_api;
2350
2351 /*
2352 * Get a locked reference to the data source; hold the global lock,
2353 * we are going to change the object's name, and we can't allow
2354 * other threads walking the list and comparing against the name.
2355 */
2356 WT_RET(ws_source_open(wtds, session, uri, config,
2357 WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws));
2358
2359 /* Get a copy of the new name for the WT_SOURCE structure. */
2360 if ((p = strdup(newuri)) == NULL) {
2361 ret = os_errno();
2362 goto err;
2363 }
2364 free(ws->uri);
2365 ws->uri = p;
2366
2367 /* Rename the underlying Helium objects. */
2368 ESET(ws_source_name(wtds, session, newuri, NULL, &p));
2369 if (ret == 0) {
2370 ESET(he_rename(ws->he, p));
2371 free(p);
2372 }
2373 ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p));
2374 if (ret == 0) {
2375 ESET(he_rename(ws->he_cache, p));
2376 free(p);
2377 }
2378
2379 /* Update the metadata record. */
2380 ESET(master_uri_rename(wtds, session, uri, newuri));
2381
2382 /*
2383 * If we have an error at this point, panic -- there's an inconsistency
2384 * in what WiredTiger knows about and the underlying store.
2385 */
2386 if (ret != 0)
2387 ret = WT_PANIC;
2388
2389 err: ESET(unlock(wt_api, session, &ds->global_lock));
2390
2391 return (ret);
2392 }
2393
2394 /*
2395 * helium_session_truncate --
2396 * WT_SESSION.truncate method.
2397 */
2398 static int
helium_session_truncate(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2399 helium_session_truncate(WT_DATA_SOURCE *wtds,
2400 WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2401 {
2402 DATA_SOURCE *ds;
2403 WT_EXTENSION_API *wt_api;
2404 int ret = 0;
2405
2406 (void)config;
2407
2408 ds = (DATA_SOURCE *)wtds;
2409 wt_api = ds->wt_api;
2410
2411 /*
2412 * XXX
2413 * Fail URI truncation for now. (Truncation based on a cursor range is
2414 * handled by the upper-levels of WiredTiger, this is just support for
2415 * URI truncation.) The problem is there's no way to truncate an open
2416 * object in Helium without closing handles, and we can't close/re-open
2417 * handles because we don't have the configuration information from the
2418 * open.
2419 */
2420 ERET(wt_api, session, ENOTSUP, "WT_SESSION.truncate: %s", uri);
2421 }
2422
2423 /*
2424 * helium_session_verify --
2425 * WT_SESSION.verify method.
2426 */
2427 static int
helium_session_verify(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2428 helium_session_verify(WT_DATA_SOURCE *wtds,
2429 WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2430 {
2431 (void)wtds;
2432 (void)session;
2433 (void)uri;
2434 (void)config;
2435 return (0);
2436 }
2437
2438 /*
2439 * helium_session_checkpoint --
2440 * WT_SESSION.checkpoint method.
2441 */
2442 static int
helium_session_checkpoint(WT_DATA_SOURCE * wtds,WT_SESSION * session,WT_CONFIG_ARG * config)2443 helium_session_checkpoint(
2444 WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config)
2445 {
2446 DATA_SOURCE *ds;
2447 HELIUM_SOURCE *hs;
2448 WT_EXTENSION_API *wt_api;
2449 int ret = 0;
2450
2451 (void)config;
2452
2453 ds = (DATA_SOURCE *)wtds;
2454 wt_api = ds->wt_api;
2455
2456 /* Flush all volumes. */
2457 if ((hs = ds->hs_head) != NULL &&
2458 (ret = he_commit(hs->he_volume)) != 0)
2459 ERET(wt_api, session, ret,
2460 "he_commit: %s: %s", hs->device, he_strerror(ret));
2461
2462 return (0);
2463 }
2464
2465 /*
2466 * helium_source_close --
2467 * Discard a HELIUM_SOURCE.
2468 */
2469 static int
helium_source_close(WT_EXTENSION_API * wt_api,WT_SESSION * session,HELIUM_SOURCE * hs)2470 helium_source_close(
2471 WT_EXTENSION_API *wt_api, WT_SESSION *session, HELIUM_SOURCE *hs)
2472 {
2473 WT_SOURCE *ws;
2474 int ret = 0, tret;
2475
2476 /* Resolve the cache into the primary one last time and quit. */
2477 if (hs->cleaner_id != 0) {
2478 hs->cleaner_stop = 1;
2479
2480 if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0)
2481 EMSG(wt_api, session, tret,
2482 "pthread_join: %s", strerror(tret));
2483 hs->cleaner_id = 0;
2484 }
2485
2486 /* Close the underlying WiredTiger sources. */
2487 while ((ws = hs->ws_head) != NULL) {
2488 hs->ws_head = ws->next;
2489 ESET(ws_source_close(wt_api, session, ws));
2490 }
2491
2492 /* If the owner, close the database transaction store. */
2493 if (hs->he_txn != NULL && hs->he_owner) {
2494 if ((tret = he_close(hs->he_txn)) != 0)
2495 EMSG(wt_api, session, tret,
2496 "he_close: %s: %s: %s",
2497 hs->name, WT_NAME_TXN, he_strerror(tret));
2498 hs->he_txn = NULL;
2499 hs->he_owner = false;
2500 }
2501
2502 /* Flush and close the Helium source. */
2503 if (hs->he_volume != NULL) {
2504 if ((tret = he_commit(hs->he_volume)) != 0)
2505 EMSG(wt_api, session, tret,
2506 "he_commit: %s: %s",
2507 hs->device, he_strerror(tret));
2508
2509 if ((tret = he_close(hs->he_volume)) != 0)
2510 EMSG(wt_api, session, tret,
2511 "he_close: %s: %s: %s",
2512 hs->name, WT_NAME_INIT, he_strerror(tret));
2513 hs->he_volume = NULL;
2514 }
2515
2516 free(hs->name);
2517 free(hs->device);
2518 OVERWRITE_AND_FREE(hs);
2519
2520 return (ret);
2521 }
2522
2523 /*
2524 * cache_cleaner --
2525 * Migrate information from the cache to the primary store.
2526 */
2527 static int
cache_cleaner(WT_EXTENSION_API * wt_api,WT_CURSOR * wtcursor,uint64_t oldest,uint64_t * txnminp)2528 cache_cleaner(WT_EXTENSION_API *wt_api,
2529 WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp)
2530 {
2531 CACHE_RECORD *cp;
2532 CURSOR *cursor;
2533 HE_ITEM *r;
2534 WT_SOURCE *ws;
2535 uint64_t txnid;
2536 int locked, pushed, recovery, ret = 0;
2537
2538 /*
2539 * Called in two ways: in normal processing mode where we're supplied a
2540 * value for the oldest transaction ID not yet visible to a running
2541 * transaction, and we're tracking the smallest transaction ID
2542 * referenced by any cache entry, and in recovery mode where neither of
2543 * those are true.
2544 */
2545 if (txnminp == NULL)
2546 recovery = 1;
2547 else {
2548 recovery = 0;
2549 *txnminp = UINT64_MAX;
2550 }
2551
2552 cursor = (CURSOR *)wtcursor;
2553 ws = cursor->ws;
2554 r = &cursor->record;
2555 locked = pushed = 0;
2556
2557 /*
2558 * For every cache key where all updates are globally visible:
2559 * Migrate the most recent update value to the primary store.
2560 */
2561 for (r->key_len = 0; (ret =
2562 helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
2563 /*
2564 * Unmarshall the value, and if all of the updates are globally
2565 * visible, update the primary with the last committed update.
2566 * In normal processing, the last committed update test is for
2567 * a globally visible update that's not explicitly aborted. In
2568 * recovery processing, the last committed update test is for
2569 * an explicitly committed update. See the underlying functions
2570 * for more information.
2571 */
2572 if ((ret = cache_value_unmarshall(wtcursor)) != 0)
2573 goto err;
2574 if (!recovery && !cache_value_visible_all(wtcursor, oldest))
2575 continue;
2576 if (recovery)
2577 cache_value_last_committed(wtcursor, &cp);
2578 else
2579 cache_value_last_not_aborted(wtcursor, &cp);
2580 if (cp == NULL)
2581 continue;
2582
2583 pushed = 1;
2584 if (cp->remove) {
2585 if ((ret = he_delete(ws->he, r)) == 0)
2586 continue;
2587
2588 /*
2589 * Updates confined to the cache may not appear in the
2590 * primary at all, that is, an insert and remove pair
2591 * may be confined to the cache.
2592 */
2593 if (ret == HE_ERR_ITEM_NOT_FOUND) {
2594 ret = 0;
2595 continue;
2596 }
2597 ERET(wt_api, NULL, ret,
2598 "he_delete: %s", he_strerror(ret));
2599 } else {
2600 r->val = cp->v;
2601 r->val_len = cp->len;
2602 ret = he_update(ws->he, r);
2603 if (ret == 0)
2604 continue;
2605
2606 ERET(wt_api, NULL, ret,
2607 "he_update: %s", he_strerror(ret));
2608 }
2609 }
2610
2611 if (ret == WT_NOTFOUND)
2612 ret = 0;
2613 if (ret != 0)
2614 ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2615
2616 /*
2617 * If we didn't move any keys from the cache to the primary, quit. It's
2618 * possible we could still remove values from the cache, but not likely,
2619 * and another pass would probably be wasted effort (especially locked).
2620 */
2621 if (!pushed)
2622 return (0);
2623
2624 /*
2625 * Push the store to stable storage for correctness. (It doesn't matter
2626 * what Helium handle we commit, so we just commit one of them.)
2627 */
2628 if ((ret = he_commit(ws->he)) != 0)
2629 ERET(wt_api, NULL, ret, "he_commit: %s", he_strerror(ret));
2630
2631 /*
2632 * If we're performing recovery, that's all we need to do, we're going
2633 * to simply discard the cache, there's no reason to remove entries one
2634 * at a time.
2635 */
2636 if (recovery)
2637 return (0);
2638
2639 /*
2640 * For every cache key where all updates are globally visible:
2641 * Remove the cache key.
2642 *
2643 * We're updating the cache, which requires a lock during normal
2644 * cleaning.
2645 */
2646 WT_ERR(writelock(wt_api, NULL, &ws->lock));
2647 locked = 1;
2648
2649 for (r->key_len = 0; (ret =
2650 helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
2651 /*
2652 * Unmarshall the value, and if all of the updates are globally
2653 * visible, remove the cache entry.
2654 */
2655 WT_ERR(cache_value_unmarshall(wtcursor));
2656 if (cache_value_visible_all(wtcursor, oldest)) {
2657 if ((ret = he_delete(ws->he_cache, r)) != 0)
2658 EMSG_ERR(wt_api, NULL, ret,
2659 "he_delete: %s", he_strerror(ret));
2660 continue;
2661 }
2662
2663 /*
2664 * If the entry will remain in the cache, figure out the oldest
2665 * transaction for which it contains an update (which might be
2666 * different from the oldest transaction in the system). We
2667 * need the oldest transaction ID that appears anywhere in any
2668 * cache, it limits the records we can discard from the
2669 * transaction store.
2670 */
2671 cache_value_txnmin(wtcursor, &txnid);
2672 if (txnid < *txnminp)
2673 *txnminp = txnid;
2674 }
2675
2676 locked = 0;
2677 WT_ERR(unlock(wt_api, NULL, &ws->lock));
2678 if (ret == WT_NOTFOUND)
2679 ret = 0;
2680 if (ret != 0)
2681 EMSG_ERR(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2682
2683 err: if (locked)
2684 ESET(unlock(wt_api, NULL, &ws->lock));
2685
2686 return (ret);
2687 }
2688
2689 /*
2690 * txn_cleaner --
2691 * Discard no longer needed entries from the transaction store.
2692 */
2693 static int
txn_cleaner(WT_CURSOR * wtcursor,he_t he_txn,uint64_t txnmin)2694 txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin)
2695 {
2696 CURSOR *cursor;
2697 HE_ITEM *r;
2698 WT_EXTENSION_API *wt_api;
2699 uint64_t txnid;
2700 int ret = 0;
2701
2702 cursor = (CURSOR *)wtcursor;
2703 wt_api = cursor->wt_api;
2704 r = &cursor->record;
2705
2706 /*
2707 * Remove all entries from the transaction store that are before the
2708 * oldest transaction ID that appears anywhere in any cache.
2709 */
2710 for (r->key_len = 0;
2711 (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) {
2712 memcpy(&txnid, r->key, sizeof(txnid));
2713 if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0)
2714 ERET(wt_api, NULL, ret,
2715 "he_delete: %s", he_strerror(ret));
2716 }
2717 if (ret == WT_NOTFOUND)
2718 ret = 0;
2719 if (ret != 0)
2720 ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2721
2722 return (0);
2723 }
2724
2725 /*
2726 * fake_cursor --
2727 * Fake up enough of a cursor to do Helium operations.
2728 */
2729 static int
fake_cursor(WT_EXTENSION_API * wt_api,WT_CURSOR ** wtcursorp)2730 fake_cursor(WT_EXTENSION_API *wt_api, WT_CURSOR **wtcursorp)
2731 {
2732 CURSOR *cursor;
2733 WT_CURSOR *wtcursor;
2734
2735 /*
2736 * Fake a cursor.
2737 */
2738 if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
2739 return (os_errno());
2740 cursor->wt_api = wt_api;
2741 cursor->record.key = cursor->__key;
2742 if ((cursor->v = malloc(128)) == NULL) {
2743 free(cursor);
2744 return (os_errno());
2745 }
2746 cursor->mem_len = 128;
2747
2748 /*
2749 * !!!
2750 * Fake cursors don't have WT_SESSION handles.
2751 */
2752 wtcursor = (WT_CURSOR *)cursor;
2753 wtcursor->session = NULL;
2754
2755 *wtcursorp = wtcursor;
2756 return (0);
2757 }
2758
2759 /*
2760 * cache_cleaner_worker --
2761 * Thread to migrate data from the cache to the primary.
2762 */
2763 static void *
cache_cleaner_worker(void * arg)2764 cache_cleaner_worker(void *arg)
2765 {
2766 struct timeval t;
2767 CURSOR *cursor;
2768 HELIUM_SOURCE *hs;
2769 WT_CURSOR *wtcursor;
2770 WT_EXTENSION_API *wt_api;
2771 WT_SOURCE *ws;
2772 uint64_t oldest, txnmin, txntmp;
2773 int cleaner_stop, delay, ret = 0;
2774
2775 hs = (HELIUM_SOURCE *)arg;
2776
2777 cursor = NULL;
2778 wt_api = hs->wt_api;
2779
2780 if ((ret = fake_cursor(wt_api, &wtcursor)) != 0)
2781 EMSG_ERR(wt_api, NULL, ret, "cleaner: %s", strerror(ret));
2782 cursor = (CURSOR *)wtcursor;
2783
2784 for (cleaner_stop = delay = 0; !cleaner_stop;) {
2785 /*
2786 * Check if this will be the final run; cleaner_stop is declared
2787 * volatile, and so the read will happen. We don't much care if
2788 * there's extra loops, it's enough if a read eventually happens
2789 * and finds the variable set. Store the read locally, reading
2790 * the variable twice might race.
2791 */
2792 cleaner_stop = hs->cleaner_stop;
2793
2794 /*
2795 * Delay if this isn't the final run and the last pass didn't
2796 * find any work to do.
2797 */
2798 if (!cleaner_stop && delay != 0) {
2799 t.tv_sec = delay;
2800 t.tv_usec = 0;
2801 (void)select(0, NULL, NULL, NULL, &t);
2802 }
2803
2804 /* Run at least every 5 seconds. */
2805 if (delay < 5)
2806 ++delay;
2807
2808 /*
2809 * Clean the datastore caches. It's both more and less expensive
2810 * to return values from the cache: more because we have to
2811 * marshall/unmarshall the values, less because there's only a
2812 * single lookup to the cache store rather than a lookup into
2813 * the cache and then the primary. I have no tuning information,
2814 * for now, just clean if there have been 1K operations.
2815 */
2816 #undef CACHE_SIZE_TRIGGER
2817 #define CACHE_SIZE_TRIGGER (1000)
2818 for (ws = hs->ws_head; ws != NULL; ws = ws->next)
2819 if (ws->he_cache_ops > CACHE_SIZE_TRIGGER)
2820 break;
2821 if (!cleaner_stop && ws == NULL)
2822 continue;
2823
2824 /* There was work to do, don't delay before checking again. */
2825 delay = 0;
2826
2827 /*
2828 * Get the oldest transaction ID not yet visible to a running
2829 * transaction. Do this before doing anything else, avoiding
2830 * any race with creating new WT_SOURCE handles.
2831 */
2832 oldest = wt_api->transaction_oldest(wt_api);
2833
2834 /*
2835 * If any cache needs cleaning, clean them all, because we have
2836 * to know the minimum transaction ID referenced by any cache.
2837 *
2838 * For each cache/primary pair, migrate whatever records we can,
2839 * tracking the lowest transaction ID of any entry in any cache.
2840 */
2841 txnmin = UINT64_MAX;
2842 for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
2843 /* Reset the operations counter. */
2844 ws->he_cache_ops = 0;
2845
2846 cursor->ws = ws;
2847 WT_ERR(cache_cleaner(
2848 wt_api, wtcursor, oldest, &txntmp));
2849 if (txntmp < txnmin)
2850 txnmin = txntmp;
2851 }
2852
2853 /*
2854 * Discard any transactions less than the minimum transaction ID
2855 * referenced in any cache.
2856 *
2857 * !!!
2858 * I'm playing fast-and-loose with whether or not the cursor
2859 * references an underlying WT_SOURCE, there's a structural
2860 * problem here.
2861 */
2862 cursor->ws = NULL;
2863 WT_ERR(txn_cleaner(wtcursor, hs->he_txn, txnmin));
2864 }
2865
2866 err: cursor_destroy(cursor);
2867 return (NULL);
2868 }
2869
2870 /*
2871 * helium_config_read --
2872 * Parse the Helium configuration.
2873 */
2874 static int
helium_config_read(WT_EXTENSION_API * wt_api,WT_CONFIG_ITEM * config,char ** devicep,HE_ENV * envp,int * env_setp,int * flagsp)2875 helium_config_read(WT_EXTENSION_API *wt_api, WT_CONFIG_ITEM *config,
2876 char **devicep, HE_ENV *envp, int *env_setp, int *flagsp)
2877 {
2878 WT_CONFIG_ITEM k, v;
2879 WT_CONFIG_PARSER *config_parser;
2880 int ret = 0, tret;
2881
2882 *env_setp = 0;
2883 *flagsp = 0;
2884
2885 /* Traverse the configuration arguments list. */
2886 if ((ret = wt_api->config_parser_open(
2887 wt_api, NULL, config->str, config->len, &config_parser)) != 0)
2888 ERET(wt_api, NULL, ret,
2889 "WT_EXTENSION_API.config_parser_open: %s",
2890 wt_api->strerror(wt_api, NULL, ret));
2891 while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
2892 if (string_match("helium_devices", k.str, k.len)) {
2893 if ((*devicep = calloc(1, v.len + 1)) == NULL)
2894 return (os_errno());
2895 memcpy(*devicep, v.str, v.len);
2896 continue;
2897 }
2898 if (string_match("helium_read_cache", k.str, k.len)) {
2899 envp->read_cache = (uint64_t)v.val;
2900 *env_setp = 1;
2901 continue;
2902 }
2903 if (string_match("helium_write_cache", k.str, k.len)) {
2904 envp->write_cache = (uint64_t)v.val;
2905 *env_setp = 1;
2906 continue;
2907 }
2908 if (string_match("helium_o_volume_truncate", k.str, k.len)) {
2909 if (v.val != 0)
2910 *flagsp |= HE_O_VOLUME_TRUNCATE;
2911 continue;
2912 }
2913 EMSG_ERR(wt_api, NULL, EINVAL,
2914 "unknown configuration key value pair %.*s=%.*s",
2915 (int)k.len, k.str, (int)v.len, v.str);
2916 }
2917 if (ret == WT_NOTFOUND)
2918 ret = 0;
2919 if (ret != 0)
2920 EMSG_ERR(wt_api, NULL, ret,
2921 "WT_CONFIG_PARSER.next: %s",
2922 wt_api->strerror(wt_api, NULL, ret));
2923
2924 err: if ((tret = config_parser->close(config_parser)) != 0)
2925 EMSG(wt_api, NULL, tret,
2926 "WT_CONFIG_PARSER.close: %s",
2927 wt_api->strerror(wt_api, NULL, tret));
2928
2929 return (ret);
2930 }
2931
2932 /*
2933 * helium_source_open --
2934 * Allocate and open a Helium source.
2935 */
2936 static int
helium_source_open(DATA_SOURCE * ds,WT_CONFIG_ITEM * k,WT_CONFIG_ITEM * v)2937 helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v)
2938 {
2939 struct he_env env;
2940 HELIUM_SOURCE *hs;
2941 WT_EXTENSION_API *wt_api;
2942 int env_set, flags, ret = 0;
2943
2944 wt_api = ds->wt_api;
2945 hs = NULL;
2946
2947 VMSG(wt_api, NULL, VERBOSE_L1, "volume %.*s=%.*s",
2948 (int)k->len, k->str, (int)v->len, v->str);
2949
2950 /*
2951 * Check for a Helium source we've already opened: we don't check the
2952 * value (which implies you can open the same underlying stores using
2953 * more than one name, but I don't know of any problems that causes),
2954 * we only check the key, that is, the top-level WiredTiger name.
2955 */
2956 for (hs = ds->hs_head; hs != NULL; hs = hs->next)
2957 if (string_match(hs->name, k->str, k->len))
2958 ERET(wt_api, NULL,
2959 EINVAL, "%s: device already open", hs->name);
2960
2961 /* Allocate and initialize a new underlying Helium source object. */
2962 if ((hs = calloc(1, sizeof(*hs))) == NULL ||
2963 (hs->name = calloc(1, k->len + 1)) == NULL) {
2964 free(hs);
2965 return (os_errno());
2966 }
2967 memcpy(hs->name, k->str, k->len);
2968 hs->txn_notify.notify = txn_notify;
2969 hs->wt_api = wt_api;
2970
2971 /* Read the configuration, require a device naming the Helium store. */
2972 memset(&env, 0, sizeof(env));
2973 WT_ERR(helium_config_read(
2974 wt_api, v, &hs->device, &env, &env_set, &flags));
2975 if (hs->device == NULL)
2976 EMSG_ERR(wt_api, NULL,
2977 EINVAL, "%s: no Helium volumes specified", hs->name);
2978
2979 /*
2980 * Open the Helium volume, creating it if necessary. We have to open
2981 * an object at the same time, that's why we have object flags as well
2982 * as volume flags.
2983 */
2984 flags |= HE_O_CREATE | HE_O_TRUNCATE | HE_O_CLEAN | HE_O_VOLUME_CREATE;
2985 if ((hs->he_volume = he_open(
2986 hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) {
2987 ret = os_errno();
2988 EMSG_ERR(wt_api, NULL, ret,
2989 "he_open: %s: %s: %s",
2990 hs->name, WT_NAME_INIT, he_strerror(ret));
2991 }
2992
2993 /* Insert the new entry at the head of the list. */
2994 hs->next = ds->hs_head;
2995 ds->hs_head = hs;
2996
2997 if (0) {
2998 err: if (hs != NULL)
2999 ESET(helium_source_close(wt_api, NULL, hs));
3000 }
3001 return (ret);
3002 }
3003
3004 /*
3005 * helium_source_txn_open --
3006 * Open the database-wide transaction store.
3007 */
3008 static int
helium_source_txn_open(DATA_SOURCE * ds)3009 helium_source_txn_open(DATA_SOURCE *ds)
3010 {
3011 HELIUM_SOURCE *hs, *hs_txn;
3012 WT_EXTENSION_API *wt_api;
3013 he_t he_txn, t;
3014 int ret = 0;
3015
3016 wt_api = ds->wt_api;
3017
3018 /*
3019 * The global txn namespace is per connection, it spans multiple Helium
3020 * sources.
3021 *
3022 * We've opened the Helium sources: check to see if any of them already
3023 * have a transaction store, and make sure we only find one.
3024 */
3025 hs_txn = NULL;
3026 he_txn = NULL;
3027 for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3028 if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) {
3029 if (hs_txn != NULL) {
3030 (void)he_close(t);
3031 (void)he_close(hs_txn);
3032 ERET(wt_api, NULL, WT_PANIC,
3033 "found multiple transaction stores, "
3034 "unable to proceed");
3035 }
3036 he_txn = t;
3037 hs_txn = hs;
3038 }
3039
3040 /*
3041 * If we didn't find a transaction store, open a transaction store in
3042 * the first Helium source we loaded. (It could just as easily be the
3043 * last one we loaded, we're just picking one, but picking the first
3044 * seems slightly less likely to make people wonder.)
3045 */
3046 if ((hs = hs_txn) == NULL) {
3047 for (hs = ds->hs_head; hs->next != NULL; hs = hs->next)
3048 ;
3049 if ((he_txn = he_open(
3050 hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) {
3051 ret = os_errno();
3052 ERET(wt_api, NULL, ret,
3053 "he_open: %s: %s: %s",
3054 hs->name, WT_NAME_TXN, he_strerror(ret));
3055 }
3056
3057 /* Push the change. */
3058 if ((ret = he_commit(he_txn)) != 0)
3059 ERET(wt_api, NULL, ret,
3060 "he_commit: %s", he_strerror(ret));
3061 }
3062 VMSG(wt_api, NULL, VERBOSE_L1, "%s" "transactional store on %s",
3063 hs_txn == NULL ? "creating " : "", hs->name);
3064
3065 /* Set the owner field, this Helium source has to be closed last. */
3066 hs->he_owner = true;
3067
3068 /* Add a reference to the transaction store in each Helium source. */
3069 for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3070 hs->he_txn = he_txn;
3071
3072 return (0);
3073 }
3074
3075 /*
3076 * helium_source_txn_truncate --
3077 * Truncate the database-wide transaction store.
3078 */
3079 static int
helium_source_txn_truncate(DATA_SOURCE * ds)3080 helium_source_txn_truncate(DATA_SOURCE *ds)
3081 {
3082 HELIUM_SOURCE *hs;
3083 WT_EXTENSION_API *wt_api;
3084 int ret = 0;
3085
3086 wt_api = ds->wt_api;
3087
3088 /*
3089 * We want to truncate the transaction store after recovery, but there
3090 * isn't a Helium truncate operation. Remove/re-open the store instead.
3091 */
3092 hs = ds->hs_head;
3093 if (hs->he_txn != NULL && (ret = he_remove(hs->he_txn)) != 0)
3094 ERET(wt_api, NULL, ret,
3095 "he_remove: %s: %s: %s",
3096 hs->name, WT_NAME_TXN, he_strerror(ret));
3097
3098 /* The handle is dead, clear any references. */
3099 for (hs = ds->hs_head; hs != NULL; hs = hs->next) {
3100 hs->he_txn = NULL;
3101 hs->he_owner = false;
3102 }
3103
3104 return (helium_source_txn_open(ds));
3105 }
3106
3107 /*
3108 * helium_source_recover_namespace --
3109 * Recover a single cache/primary pair in a Helium namespace.
3110 */
3111 static int
helium_source_recover_namespace(WT_DATA_SOURCE * wtds,HELIUM_SOURCE * hs,const char * name,WT_CONFIG_ARG * config)3112 helium_source_recover_namespace(WT_DATA_SOURCE *wtds,
3113 HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config)
3114 {
3115 CURSOR *cursor;
3116 DATA_SOURCE *ds;
3117 WT_CURSOR *wtcursor;
3118 WT_EXTENSION_API *wt_api;
3119 WT_SOURCE *ws;
3120 size_t len;
3121 int ret = 0;
3122 const char *p;
3123 char *uri;
3124
3125 ds = (DATA_SOURCE *)wtds;
3126 wt_api = ds->wt_api;
3127 cursor = NULL;
3128 ws = NULL;
3129 uri = NULL;
3130
3131 /*
3132 * The name we store on the Helium device is a translation of the
3133 * WiredTiger name: do the reverse process here so we can use the
3134 * standard source-open function.
3135 */
3136 p = name + strlen(WT_NAME_PREFIX);
3137 len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10;
3138 if ((uri = malloc(len)) == NULL) {
3139 ret = os_errno();
3140 goto err;
3141 }
3142 (void)snprintf(uri, len, "helium:%s/%s", hs->name, p);
3143
3144 /*
3145 * Open the cache/primary pair by going through the full open process,
3146 * instantiating the underlying WT_SOURCE object.
3147 */
3148 WT_ERR(ws_source_open(wtds, NULL, uri, config, 0, &ws));
3149 WT_ERR(unlock(wt_api, NULL, &ws->lock));
3150
3151 /* Fake up a cursor. */
3152 if ((ret = fake_cursor(wt_api, &wtcursor)) != 0)
3153 EMSG_ERR(wt_api, NULL, ret, "recovery: %s", strerror(ret));
3154 cursor = (CURSOR *)wtcursor;
3155 cursor->ws = ws;
3156
3157 /* Process, then remove, the cache. */
3158 WT_ERR(cache_cleaner(wt_api, wtcursor, 0, NULL));
3159
3160 if ((ret = he_remove(ws->he_cache)) != 0)
3161 EMSG(wt_api, NULL, ret,
3162 "he_remove: %s(cache): %s", ws->uri, he_strerror(ret));
3163 ws->he_cache = NULL; /* The handle is dead. */
3164
3165 /* Close the underlying WiredTiger sources. */
3166 err: while ((ws = hs->ws_head) != NULL) {
3167 hs->ws_head = ws->next;
3168 ESET(ws_source_close(wt_api, NULL, ws));
3169 }
3170
3171 cursor_destroy(cursor);
3172 free(uri);
3173
3174 return (ret);
3175 }
3176
/*
 * helium_namespace_cookie --
 *	Accumulator that he_enumerate hands to helium_namespace_list while
 * collecting the object names to recover.
 */
struct helium_namespace_cookie {
	char **list;		/* Copied names; NULL-terminated once non-empty */
	u_int list_cnt;		/* Names stored in list */
	u_int list_max;		/* Slots allocated in list */
};
3182
3183 /*
3184 * helium_namespace_list --
3185 * Get a list of the objects we're going to recover.
3186 */
3187 static int
helium_namespace_list(void * cookie,const char * name)3188 helium_namespace_list(void *cookie, const char *name)
3189 {
3190 struct helium_namespace_cookie *names;
3191 void *allocp;
3192
3193 names = cookie;
3194
3195 /*
3196 * Ignore any files without a WiredTiger prefix.
3197 * Ignore the metadata and cache files.
3198 */
3199 if (!prefix_match(name, WT_NAME_PREFIX))
3200 return (0);
3201 if (strcmp(name, WT_NAME_INIT) == 0)
3202 return (0);
3203 if (strcmp(name, WT_NAME_TXN) == 0)
3204 return (0);
3205 if (string_match(
3206 strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE)))
3207 return (0);
3208
3209 if (names->list_cnt + 1 >= names->list_max) {
3210 if ((allocp = realloc(names->list,
3211 (names->list_max + 20) * sizeof(names->list[0]))) == NULL)
3212 return (os_errno());
3213 names->list = allocp;
3214 names->list_max += 20;
3215 }
3216 if ((names->list[names->list_cnt] = strdup(name)) == NULL)
3217 return (os_errno());
3218 ++names->list_cnt;
3219 names->list[names->list_cnt] = NULL;
3220 return (0);
3221 }
3222
3223 /*
3224 * helium_source_recover --
3225 * Recover the HELIUM_SOURCE.
3226 */
3227 static int
helium_source_recover(WT_DATA_SOURCE * wtds,HELIUM_SOURCE * hs,WT_CONFIG_ARG * config)3228 helium_source_recover(
3229 WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config)
3230 {
3231 struct helium_namespace_cookie names;
3232 DATA_SOURCE *ds;
3233 WT_EXTENSION_API *wt_api;
3234 u_int i;
3235 int ret = 0;
3236
3237 ds = (DATA_SOURCE *)wtds;
3238 wt_api = ds->wt_api;
3239 memset(&names, 0, sizeof(names));
3240
3241 VMSG(wt_api, NULL, VERBOSE_L1, "recover %s", hs->name);
3242
3243 /* Get a list of the cache/primary object pairs in the Helium source. */
3244 if ((ret = he_enumerate(
3245 hs->device, helium_namespace_list, &names)) != 0)
3246 ERET(wt_api, NULL, ret,
3247 "he_enumerate: %s: %s", hs->name, he_strerror(ret));
3248
3249 /* Recover the objects. */
3250 for (i = 0; i < names.list_cnt; ++i)
3251 WT_ERR(helium_source_recover_namespace(
3252 wtds, hs, names.list[i], config));
3253
3254 err: for (i = 0; i < names.list_cnt; ++i)
3255 free(names.list[i]);
3256 free(names.list);
3257
3258 return (ret);
3259 }
3260
3261 /*
3262 * helium_terminate --
3263 * Unload the data-source.
3264 */
3265 static int
helium_terminate(WT_DATA_SOURCE * wtds,WT_SESSION * session)3266 helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
3267 {
3268 DATA_SOURCE *ds;
3269 HELIUM_SOURCE *hs, *last;
3270 WT_EXTENSION_API *wt_api;
3271 int ret = 0;
3272
3273 ds = (DATA_SOURCE *)wtds;
3274 wt_api = ds->wt_api;
3275
3276 /* Lock the system down. */
3277 if (ds->lockinit)
3278 ret = writelock(wt_api, session, &ds->global_lock);
3279
3280 /*
3281 * Close the Helium sources, close the Helium source that "owns" the
3282 * database transaction store last.
3283 */
3284 last = NULL;
3285 while ((hs = ds->hs_head) != NULL) {
3286 ds->hs_head = hs->next;
3287 if (hs->he_owner) {
3288 last = hs;
3289 continue;
3290 }
3291 ESET(helium_source_close(wt_api, session, hs));
3292 }
3293 if (last != NULL)
3294 ESET(helium_source_close(wt_api, session, last));
3295
3296 /* Unlock and destroy the system. */
3297 if (ds->lockinit) {
3298 ESET(unlock(wt_api, session, &ds->global_lock));
3299 ESET(lock_destroy(wt_api, NULL, &ds->global_lock));
3300 }
3301
3302 OVERWRITE_AND_FREE(ds);
3303
3304 return (ret);
3305 }
3306
/*
 * wiredtiger_extension_init --
 *	Initialize the Helium connector code.
 *
 *	Checks that the Helium header and library are version 2.12, allocates
 *	the DATA_SOURCE, opens every Helium source named in the extension's
 *	configuration, runs recovery, starts each source's cache cleaner
 *	thread, and registers the "helium:" data source with WiredTiger. On a
 *	failure after the DATA_SOURCE is allocated, everything opened so far is
 *	torn down with helium_terminate and the error is returned.
 */
int
wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
{
	/*
	 * List of the WT_DATA_SOURCE methods -- it's static so it breaks at
	 * compile-time should the structure change underneath us. The
	 * initializers are positional, so their order must match the field
	 * order of WT_DATA_SOURCE.
	 */
	static const WT_DATA_SOURCE wtds = {
		NULL,				/* No session.alter */
		helium_session_create,		/* session.create */
		NULL,				/* No session.compaction */
		helium_session_drop,		/* session.drop */
		helium_session_open_cursor,	/* session.open_cursor */
		helium_session_rename,		/* session.rename */
		NULL,				/* No session.salvage */
		helium_session_truncate,	/* session.truncate */
		NULL,				/* No session.range_truncate */
		helium_session_verify,		/* session.verify */
		helium_session_checkpoint,	/* session.checkpoint */
		helium_terminate		/* termination */
	};
	/*
	 * Boolean WT_SESSION.create options for "helium:" objects, registered
	 * below through WT_CONNECTION.configure_method.
	 */
	static const char *session_create_opts[] = {
		"helium_o_compress=0",		/* HE_O_COMPRESS */
		"helium_o_truncate=0",		/* HE_O_TRUNCATE */
		NULL
	};
	DATA_SOURCE *ds;
	HELIUM_SOURCE *hs;
	WT_CONFIG_ITEM k, v;
	WT_CONFIG_PARSER *config_parser;
	WT_EXTENSION_API *wt_api;
	int vmajor, vminor, vpatch, ret = 0;
	const char **p;

	/* Start NULL so the error path knows what needs cleaning up. */
	config_parser = NULL;
	ds = NULL;

	wt_api = connection->get_extension_api(connection);

	/*
	 * Check the library version. The header version is checked at compile
	 * time: a mismatch compiles in an unconditional ERET with EINVAL. The
	 * run-time library version is then checked against the same 2.12.
	 */
#if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 12
	ERET(wt_api, NULL, EINVAL,
	    "unsupported Helium header file %d.%d, expected version 2.12",
	    HE_VERSION_MAJOR, HE_VERSION_MINOR);
#endif
	(void)he_version(&vmajor, &vminor, &vpatch);
	if (vmajor != 2 || vminor != 12)
		ERET(wt_api, NULL, EINVAL,
		    "unsupported Helium library version %d.%d, expected "
		    "version 2.12", vmajor, vminor);

	/* Allocate and initialize the local data-source structure. */
	if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL)
		return (os_errno());
	ds->wtds = wtds;
	ds->wt_api = wt_api;
	WT_ERR(lock_init(wt_api, NULL, &ds->global_lock));
	/* helium_terminate only touches the global lock when this is set. */
	ds->lockinit = true;

	/*
	 * Step through the list of Helium sources, opening each one. The
	 * "helium_verbose" key sets the file-level verbose flag; every other
	 * key/value pair is handed to helium_source_open. The parser returns
	 * WT_NOTFOUND at the end of the list; any other error is a failure.
	 */
	if ((ret = wt_api->config_parser_open_arg(
	    wt_api, NULL, config, &config_parser)) != 0)
		EMSG_ERR(wt_api, NULL, ret,
		    "WT_EXTENSION_API.config_parser_open: config: %s",
		    wt_api->strerror(wt_api, NULL, ret));
	while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
		if (string_match("helium_verbose", k.str, k.len)) {
			verbose = v.val == 0 ? 0 : 1;
			continue;
		}
		if ((ret = helium_source_open(ds, &k, &v)) != 0)
			goto err;
	}
	if (ret != WT_NOTFOUND)
		EMSG_ERR(wt_api, NULL, ret,
		    "WT_CONFIG_PARSER.next: config: %s",
		    wt_api->strerror(wt_api, NULL, ret));
	if ((ret = config_parser->close(config_parser)) != 0)
		EMSG_ERR(wt_api, NULL, ret,
		    "WT_CONFIG_PARSER.close: config: %s",
		    wt_api->strerror(wt_api, NULL, ret));
	/* Already closed: keep the error path from closing it again. */
	config_parser = NULL;

	/*
	 * Find and open the database transaction store, recover each Helium
	 * source, then discard the transaction store's contents.
	 */
	WT_ERR(helium_source_txn_open(ds));
	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
		WT_ERR(helium_source_recover(&ds->wtds, hs, config));
	WT_ERR(helium_source_txn_truncate(ds));

	/* Start each Helium source cleaner thread. */
	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
		if ((ret = pthread_create(
		    &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0)
			EMSG_ERR(wt_api, NULL, ret,
			    "%s: pthread_create: cleaner thread: %s",
			    hs->name, strerror(ret));

	/* Add Helium-specific WT_SESSION.create configuration options. */
	for (p = session_create_opts; *p != NULL; ++p)
		if ((ret = connection->configure_method(connection,
		    "WT_SESSION.create", "helium:", *p, "boolean", NULL)) != 0)
			EMSG_ERR(wt_api, NULL, ret,
			    "WT_CONNECTION.configure_method: session.create: "
			    "%s: %s",
			    *p, wt_api->strerror(wt_api, NULL, ret));

	/*
	 * Add the data source. The DATA_SOURCE is passed where a
	 * WT_DATA_SOURCE is expected. NOTE(review): this presumably relies on
	 * the wtds method table being the first member of DATA_SOURCE --
	 * confirm against the structure definition.
	 */
	if ((ret = connection->add_data_source(
	    connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0)
		EMSG_ERR(wt_api, NULL, ret,
		    "WT_CONNECTION.add_data_source: %s",
		    wt_api->strerror(wt_api, NULL, ret));
	return (0);

err:	if (ds != NULL)
		ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL));
	if (config_parser != NULL)
		(void)config_parser->close(config_parser);
	return (ret);
}
3434
3435 /*
3436 * wiredtiger_extension_terminate --
3437 * Shutdown the Helium connector code.
3438 */
3439 int
wiredtiger_extension_terminate(WT_CONNECTION * connection)3440 wiredtiger_extension_terminate(WT_CONNECTION *connection)
3441 {
3442 (void)connection; /* Unused parameters */
3443
3444 return (0);
3445 }
3446