1 /*-
2  * Public Domain 2014-2018 MongoDB, Inc.
3  * Public Domain 2008-2014 WiredTiger, Inc.
4  *
5  * This is free and unencumbered software released into the public domain.
6  *
7  * Anyone is free to copy, modify, publish, use, compile, sell, or
8  * distribute this software, either in source code form or as a compiled
9  * binary, for any purpose, commercial or non-commercial, and by any
10  * means.
11  *
12  * In jurisdictions that recognize copyright laws, the author or authors
13  * of this software dedicate any and all copyright interest in the
14  * software to the public domain. We make this dedication for the benefit
15  * of the public at large and to the detriment of our heirs and
16  * successors. We intend this dedication to be an overt act of
17  * relinquishment in perpetuity of all present and future rights to this
18  * software under copyright law.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23  * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
24  * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
25  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26  * OTHER DEALINGS IN THE SOFTWARE.
27  */
28 #include <sys/select.h>
29 
30 #include <ctype.h>
31 #include <errno.h>
32 #include <inttypes.h>
33 #include <pthread.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 
38 #include <he.h>
39 
40 #include <wiredtiger.h>
41 #include <wiredtiger_ext.h>
42 
/* Local shorthand for the Helium library's structure types. */
typedef struct he_env	HE_ENV;
typedef struct he_item	HE_ITEM;

/* Verbosity level: VMSG prints a message when this is >= the message level. */
static int verbose = 0;
47 
/*
 * Error-propagation helpers.
 *
 * WT_ERR: evaluate an expression, store the result in the caller's local
 *	   "ret" variable and jump to the caller's "err" label if non-zero.
 * WT_RET: evaluate an expression and, if the result is non-zero, return it
 *	   from the calling function.
 */
#define	WT_ERR(a) do {							\
	ret = (a);							\
	if (ret != 0)							\
		goto err;						\
} while (0)
#define	WT_RET(a) do {							\
	int wt_ret_ = (a);						\
	if (wt_ret_ != 0)						\
		return (wt_ret_);					\
} while (0)
57 
/*
 * Macros to output error and verbose messages, and set or return an error.
 * The error macros require a local "int ret" variable in the calling function;
 * it must be initialized, because ESET reads it before (possibly) writing it.
 *
 * ESET: update an error value, handling more/less important errors.
 * ERET: output a message, update the local error value, return it.
 * EMSG: output a message, update the local error value.
 * EMSG_ERR:
 *	 output a message, update the local error value, jump to the caller's
 *	 "err" label.
 * VMSG: output a message if the verbose level is at least the given level.
 *
 * In ERET, EMSG, EMSG_ERR and VMSG the first variadic argument must be a
 * string literal, it is concatenated with the "helium: " prefix.  The message
 * is printed before the error value is folded into "ret", so the message
 * arguments may reference "ret" itself.
 *
 * Macro arguments are parenthesized in the expansions so arbitrary
 * expressions can be passed safely.
 */
#undef	ESET
#define	ESET(a) do {							\
	int eset_v_;							\
	if ((eset_v_ = (a)) != 0) {					\
		/*							\
		 * On error, check for a panic (it overrides all other	\
		 * returns).  Else, if there's no return value or the	\
		 * return value is not strictly an error, override it	\
		 * with the error.					\
		 */							\
		if (eset_v_ == WT_PANIC ||				\
		    ret == 0 ||						\
		    ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND)	\
			ret = eset_v_;					\
		/*							\
		 * We don't want to return Helium errors to our caller.	\
		 * Map non-system errors (indicated by a negative	\
		 * value), outside the WiredTiger error name space, to a\
		 * generic WiredTiger error.				\
		 */							\
		if (ret < -31999 || (ret > -31800 && ret < 0))		\
			ret = WT_ERROR;					\
	}								\
} while (0)
#undef	ERET
#define	ERET(wt_api, session, v, ...) do {				\
	(void)(wt_api)->err_printf(					\
	    (wt_api), (session), "helium: " __VA_ARGS__);		\
	ESET(v);							\
	return (ret);							\
} while (0)
#undef	EMSG
#define	EMSG(wt_api, session, v, ...) do {				\
	(void)(wt_api)->err_printf(					\
	    (wt_api), (session), "helium: " __VA_ARGS__);		\
	ESET(v);							\
} while (0)
#undef	EMSG_ERR
#define	EMSG_ERR(wt_api, session, v, ...) do {				\
	(void)(wt_api)->err_printf(					\
	    (wt_api), (session), "helium: " __VA_ARGS__);		\
	ESET(v);							\
	goto err;							\
} while (0)
#undef	VERBOSE_L1
#define	VERBOSE_L1	1
#undef	VERBOSE_L2
#define	VERBOSE_L2	2
#undef	VMSG
#define	VMSG(wt_api, session, v, ...) do {				\
	if (verbose >= (v))						\
		(void)(wt_api)->msg_printf(				\
		    (wt_api), (session), "helium: " __VA_ARGS__);	\
} while (0)
123 
/*
 * OVERWRITE_AND_FREE --
 *	Fill the structure a pointer references with the 0xab byte pattern and
 * then free it, so any use of the structure after it's dead is easy to spot.
 * The argument is evaluated more than once.
 */
#undef	OVERWRITE_AND_FREE
#define	OVERWRITE_AND_FREE(p) do {					\
	(void)memset((p), 0xab, sizeof(*(p)));				\
	free((p));							\
} while (0)
133 
/*
 * Every object is versioned, out of sheer raging paranoia: these are the
 * major and minor version numbers.
 */
#define	WIREDTIGER_HELIUM_MAJOR	1
#define	WIREDTIGER_HELIUM_MINOR	0

/*
 * WiredTiger name space on the Helium store.  We don't require the Helium
 * store be exclusive to our files, so all objects are named with the
 * WiredTiger prefix:
 *
 *	"WiredTiger.[name]"		primary object
 *	"WiredTiger.[name].cache"	cache object associated with a primary
 *	"WiredTiger.WiredTigerTxn"	per-connection transaction object
 *	"WiredTiger.WiredTigerInit"	file opened/closed when a Helium volume
 *					is first opened, in order to apply flags
 *					for the first open of the volume
 */
#define	WT_NAME_PREFIX	"WiredTiger."
#define	WT_NAME_INIT	"WiredTiger.WiredTigerInit"
#define	WT_NAME_TXN	"WiredTiger.WiredTigerTxn"
#define	WT_NAME_CACHE	".cache"
153 
154 /*
155  * WT_SOURCE --
156  *	A WiredTiger source, supporting one or more cursors.
157  */
158 typedef struct __wt_source {
159 	char *uri;				/* Unique name */
160 
161 	pthread_rwlock_t lock;			/* Lock */
162 	bool		 lockinit;		/* Lock created */
163 
164 	bool	configured;			/* If structure configured */
165 	u_int	ref;				/* Active reference count */
166 
167 	uint64_t append_recno;			/* Allocation record number */
168 
169 	bool	 config_bitfield;		/* config "value_format=#t" */
170 	bool	 config_recno;			/* config "key_format=r" */
171 
172 	/*
173 	 * Each WiredTiger object has a "primary" namespace in a Helium store
174 	 * plus a "cache" namespace, which has not-yet-resolved updates.  There
175 	 * is a dirty flag so read-only data sets can ignore the cache.
176 	 */
177 	he_t	he;				/* Underlying Helium object */
178 	he_t	he_cache;			/* Underlying Helium cache */
179 	bool	he_cache_inuse;			/* Cache is in use */
180 	int	he_cache_ops;			/* Operations since cleaning */
181 
182 	struct __he_source *hs;			/* Underlying Helium source */
183 	struct __wt_source *next;		/* List of WiredTiger objects */
184 } WT_SOURCE;
185 
186 /*
187  * HELIUM_SOURCE --
188  *	A Helium volume, supporting one or more WT_SOURCE objects.
189  */
190 typedef struct __he_source {
191 	/* The transaction commit handler must appear first in the structure. */
192 	WT_TXN_NOTIFY txn_notify;		/* Transaction commit handler */
193 
194 	WT_EXTENSION_API *wt_api;		/* Extension functions */
195 
196 	char *name;				/* Unique WiredTiger name */
197 	char *device;				/* Unique Helium volume name */
198 
199 	/*
200 	 * Maintain a handle for each underlying Helium source so checkpoint is
201 	 * faster, we can "commit" a single handle per source, regardless of the
202 	 * number of objects.
203 	 */
204 	he_t he_volume;
205 
206 	struct __wt_source *ws_head;		/* List of WiredTiger sources */
207 
208 	/*
209 	 * Each Helium source has a cleaner thread to migrate WiredTiger source
210 	 * updates from the cache namespace to the primary namespace, based on
211 	 * the number of bytes or the number of operations.  (There's a cleaner
212 	 * thread per Helium store so migration operations can overlap.)  We
213 	 * read these fields without a lock, but serialize writes to minimize
214 	 * races (and because it costs us nothing).
215 	 */
216 	pthread_t cleaner_id;			/* Cleaner thread ID */
217 	volatile int cleaner_stop;		/* Cleaner thread quit flag */
218 
219 	/*
220 	 * Each WiredTiger connection has a transaction namespace which lists
221 	 * resolved transactions with their committed or aborted state as a
222 	 * value.  That namespace appears in a single Helium store (the first
223 	 * one created, if it doesn't already exist), and then it's referenced
224 	 * from other Helium stores.
225 	 */
226 #define	TXN_ABORTED	'A'
227 #define	TXN_COMMITTED	'C'
228 #define	TXN_UNRESOLVED	0
229 	he_t	he_txn;				/* Helium txn store */
230 	bool	he_owner;			/* Owns transaction store */
231 
232 	struct __he_source *next;		/* List of Helium sources */
233 } HELIUM_SOURCE;
234 
235 /*
236  * DATA_SOURCE --
237  *	A WiredTiger data source, supporting one or more HELIUM_SOURCE objects.
238  */
239 typedef struct __data_source {
240 	WT_DATA_SOURCE wtds;			/* Must come first */
241 
242 	WT_EXTENSION_API *wt_api;		/* Extension functions */
243 
244 	pthread_rwlock_t global_lock;		/* Global lock */
245 	bool		 lockinit;		/* Lock created */
246 
247 	struct __he_source *hs_head;		/* List of Helium sources */
248 } DATA_SOURCE;
249 
/*
 * CACHE_RECORD --
 *	One update from the cache object; cursors hold an array of them.
 *
 * Values in the cache store are marshalled/unmarshalled to/from the store,
 * using a simple encoding:
 *	{N records: 4B}
 *	{record#1 TxnID: 8B}
 *	{record#1 remove tombstone: 1B}
 *	{record#1 data length: 4B}
 *	{record#1 data}
 *	...
 *
 * Each cursor potentially has a single set of these values.
 */
typedef struct __cache_record {
	/* Value, and the value's length. */
	uint8_t *v;
	uint32_t len;

	/* Transaction ID. */
	uint64_t txnid;

	/* 1/0 remove flag; REMOVE_TOMBSTONE is the marshalled tombstone byte. */
#define	REMOVE_TOMBSTONE	'R'
	int remove;
} CACHE_RECORD;
272 
273 /*
274  * CURSOR --
275  *	A cursor, supporting a single WiredTiger cursor.
276  */
277 typedef struct __cursor {
278 	WT_CURSOR wtcursor;			/* Must come first */
279 
280 	WT_EXTENSION_API *wt_api;		/* Extension functions */
281 
282 	WT_SOURCE *ws;				/* Underlying source */
283 
284 	HE_ITEM record;				/* Record */
285 	uint8_t  __key[HE_MAX_KEY_LEN];		/* Record.key, Record.value */
286 	uint8_t *v;
287 	size_t   len;
288 	size_t	 mem_len;
289 
290 	struct {
291 		uint8_t *v;			/* Temporary buffers */
292 		size_t   len;
293 		size_t   mem_len;
294 	} t1, t2, t3;
295 
296 	int	 config_append;			/* config "append" */
297 	int	 config_overwrite;		/* config "overwrite" */
298 
299 	CACHE_RECORD	*cache;			/* unmarshalled cache records */
300 	uint32_t	 cache_entries;		/* cache records */
301 	uint32_t	 cache_slots;		/* cache total record slots */
302 } CURSOR;
303 
/*
 * prefix_match --
 *	Return 1 if a string starts with a prefix, 0 if it does not (an empty
 * prefix matches any string).
 */
static inline int
prefix_match(const char *str, const char *pfx)
{
	const size_t pfx_len = strlen(pfx);

	if (strncmp(str, pfx, pfx_len) != 0)
		return (0);
	return (1);
}
313 
/*
 * string_match --
 *	Return 1 if a nul-terminated string is exactly equal to a byte string
 * of len bytes, 0 if it is not.
 *
 * The byte string does not need to be nul-terminated, exactly len bytes of it
 * are compared.  The string's length is checked first so we never read past
 * the string's terminating nul byte (comparing with strncmp and then testing
 * str[len] reads out of bounds if the comparison stopped early at a nul).
 */
static inline int
string_match(const char *str, const char *bytes, size_t len)
{
	return (strlen(str) == len && memcmp(str, bytes, len) == 0);
}
323 
324 /*
325  * cursor_destroy --
326  *	Free a cursor's memory, and optionally the cursor itself.
327  */
328 static void
cursor_destroy(CURSOR * cursor)329 cursor_destroy(CURSOR *cursor)
330 {
331 	if (cursor != NULL) {
332 		free(cursor->v);
333 		free(cursor->t1.v);
334 		free(cursor->t2.v);
335 		free(cursor->t3.v);
336 		free(cursor->cache);
337 		OVERWRITE_AND_FREE(cursor);
338 	}
339 }
340 
/*
 * os_errno --
 *	Return the current errno value; all reads of errno go through this
 * function so they are easy to find/remove.
 */
static int
os_errno(void)
{
	const int saved_errno = errno;

	return (saved_errno);
}
350 
351 /*
352  * lock_init --
353  *	Initialize a lock.
354  */
355 static int
lock_init(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)356 lock_init(
357     WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
358 {
359 	int ret = 0;
360 
361 	if ((ret = pthread_rwlock_init(lockp, NULL)) != 0)
362 		ERET(wt_api, session, WT_PANIC,
363 		    "pthread_rwlock_init: %s", strerror(ret));
364 	return (0);
365 }
366 
367 /*
368  * lock_destroy --
369  *	Destroy a lock.
370  */
371 static int
lock_destroy(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)372 lock_destroy(
373     WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
374 {
375 	int ret = 0;
376 
377 	if ((ret = pthread_rwlock_destroy(lockp)) != 0)
378 		ERET(wt_api, session, WT_PANIC,
379 		    "pthread_rwlock_destroy: %s", strerror(ret));
380 	return (0);
381 }
382 
383 /*
384  * writelock --
385  *	Acquire a write lock.
386  */
387 static inline int
writelock(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)388 writelock(
389     WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
390 {
391 	int ret = 0;
392 
393 	if ((ret = pthread_rwlock_wrlock(lockp)) != 0)
394 		ERET(wt_api, session, WT_PANIC,
395 		    "pthread_rwlock_wrlock: %s", strerror(ret));
396 	return (0);
397 }
398 
399 /*
400  * unlock --
401  *	Release a lock.
402  */
403 static inline int
unlock(WT_EXTENSION_API * wt_api,WT_SESSION * session,pthread_rwlock_t * lockp)404 unlock(WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp)
405 {
406 	int ret = 0;
407 
408 	if ((ret = pthread_rwlock_unlock(lockp)) != 0)
409 		ERET(wt_api, session, WT_PANIC,
410 		    "pthread_rwlock_unlock: %s", strerror(ret));
411 	return (0);
412 }
413 
414 #if 0
/*
 * helium_dump_kv --
 *	Dump a Helium record to a stream: the prefix and the item's length,
 * then the item's bytes, then a newline.  Printable, non-space bytes are
 * written as characters, a single trailing nul byte is skipped, and anything
 * else is written in hexadecimal.
 *
 * All output, including the prefix, is written to the fp stream.
 */
static void
helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp)
{
	(void)fprintf(fp, "%s %3zu: ", pfx, len);
	for (; len > 0; --len, ++p) {
		if (!isspace(*p) && isprint(*p))
			(void)putc(*p, fp);
		else if (len == 1 && *p == '\0')	/* Skip string nuls. */
			continue;
		else
			(void)fprintf(fp, "%#x", *p);
	}
	(void)putc('\n', fp);
}
432 
433 /*
434  * helium_dump --
435  *	Dump the records in a Helium store.
436  */
437 static int
438 helium_dump(WT_EXTENSION_API *wt_api, he_t he, const char *tag)
439 {
440 	HE_ITEM *r, _r;
441 	uint8_t k[4 * 1024], v[4 * 1024];
442 	int ret = 0;
443 
444 	r = &_r;
445 	memset(r, 0, sizeof(*r));
446 	r->key = k;
447 	r->val = v;
448 
449 	(void)fprintf(stderr, "== %s\n", tag);
450 	while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) {
451 #if 0
452 		uint64_t recno;
453 		WT_RET(wt_api->struct_unpack(wt_api,
454 		    NULL, r->key, r->key_len, "r", &recno));
455 		fprintf(stderr, "K: %" PRIu64, recno);
456 #else
457 		helium_dump_kv("K: ", r->key, r->key_len, stderr);
458 #endif
459 		helium_dump_kv("V: ", r->val, r->val_len, stderr);
460 	}
461 	if (ret != HE_ERR_ITEM_NOT_FOUND) {
462 		fprintf(stderr, "he_next: %s\n", he_strerror(ret));
463 		ret = WT_ERROR;
464 	}
465 	return (ret);
466 }
467 
468 /*
469  * helium_stats --
470  *	Display Helium statistics for a datastore.
471  */
472 static int
473 helium_stats(
474     WT_EXTENSION_API *wt_api, WT_SESSION *session, he_t he, const char *tag)
475 {
476 	HE_STATS stats;
477 	int ret = 0;
478 
479 	if ((ret = he_stats(he, &stats)) != 0)
480 		ERET(wt_api, session, ret, "he_stats: %s", he_strerror(ret));
481 	fprintf(stderr, "== %s\n", tag);
482 	fprintf(stderr, "name=%s\n", stats.name);
483 	fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items);
484 	fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items);
485 	fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items);
486 	fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity);
487 	fprintf(stderr, "size=%" PRIu64 "B\n", stats.size);
488 	return (0);
489 }
490 #endif
491 
492 /*
493  * helium_call --
494  *	Call a Helium key retrieval function, handling overflow.
495  */
496 static inline int
helium_call(WT_CURSOR * wtcursor,const char * fname,he_t he,int (* f)(he_t,HE_ITEM *,size_t,size_t))497 helium_call(WT_CURSOR *wtcursor, const char *fname,
498     he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t))
499 {
500 	CURSOR *cursor;
501 	HE_ITEM *r;
502 	WT_EXTENSION_API *wt_api;
503 	WT_SESSION *session;
504 	int ret = 0;
505 	char *p;
506 
507 	session = wtcursor->session;
508 	cursor = (CURSOR *)wtcursor;
509 	wt_api = cursor->wt_api;
510 
511 	r = &cursor->record;
512 	r->val = cursor->v;
513 
514 restart:
515 	if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) {
516 		if (ret == HE_ERR_ITEM_NOT_FOUND)
517 			return (WT_NOTFOUND);
518 		ERET(wt_api, session, ret, "%s: %s", fname, he_strerror(ret));
519 	}
520 
521 	/*
522 	 * If the returned length is larger than our passed-in length, we didn't
523 	 * get the complete value.  Grow the buffer and use he_lookup to do the
524 	 * retrieval (he_lookup because the call succeeded and the key was
525 	 * copied out, so calling he_next/he_prev again would skip key/value
526 	 * pairs).
527 	 *
528 	 * We have to loop, another thread of control might change the length of
529 	 * the value, requiring we grow our buffer multiple times.
530 	 *
531 	 * We have to potentially restart the entire call in case the underlying
532 	 * key/value disappears.
533 	 */
534 	for (;;) {
535 		if (cursor->mem_len >= r->val_len) {
536 			cursor->len = r->val_len;
537 			return (0);
538 		}
539 
540 		/* Grow the value buffer. */
541 		if ((p = realloc(cursor->v, r->val_len + 32)) == NULL)
542 			return (os_errno());
543 		cursor->v = r->val = p;
544 		cursor->mem_len = r->val_len + 32;
545 
546 		if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) {
547 			if (ret == HE_ERR_ITEM_NOT_FOUND)
548 				goto restart;
549 			ERET(wt_api,
550 			    session, ret, "he_lookup: %s", he_strerror(ret));
551 		}
552 	}
553 	/* NOTREACHED */
554 }
555 
556 /*
557  * txn_state_set --
558  *	Resolve a transaction.
559  */
560 static int
txn_state_set(WT_EXTENSION_API * wt_api,WT_SESSION * session,HELIUM_SOURCE * hs,uint64_t txnid,int commit)561 txn_state_set(WT_EXTENSION_API *wt_api,
562     WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit)
563 {
564 	HE_ITEM txn;
565 	uint8_t val;
566 	int ret = 0;
567 
568 	/*
569 	 * Update the store -- commits must be durable, flush the volume.
570 	 *
571 	 * XXX
572 	 * Not endian-portable, we're writing a native transaction ID to the
573 	 * store.
574 	 */
575 	memset(&txn, 0, sizeof(txn));
576 	txn.key = &txnid;
577 	txn.key_len = sizeof(txnid);
578 	val = commit ? TXN_COMMITTED : TXN_ABORTED;
579 	txn.val = &val;
580 	txn.val_len = sizeof(val);
581 
582 	if ((ret = he_update(hs->he_txn, &txn)) != 0)
583 		ERET(wt_api, session, ret, "he_update: %s", he_strerror(ret));
584 
585 	if (commit && (ret = he_commit(hs->he_txn)) != 0)
586 		ERET(wt_api, session, ret, "he_commit: %s", he_strerror(ret));
587 	return (0);
588 }
589 
590 /*
591  * txn_notify --
592  *	Resolve a transaction; called from WiredTiger during commit/abort.
593  */
594 static int
txn_notify(WT_TXN_NOTIFY * handler,WT_SESSION * session,uint64_t txnid,int committed)595 txn_notify(WT_TXN_NOTIFY *handler,
596     WT_SESSION *session, uint64_t txnid, int committed)
597 {
598 	HELIUM_SOURCE *hs;
599 
600 	hs = (HELIUM_SOURCE *)handler;
601 	return (txn_state_set(hs->wt_api, session, hs, txnid, committed));
602 }
603 
604 /*
605  * txn_state --
606  *	Return a transaction's state.
607  */
608 static int
txn_state(WT_CURSOR * wtcursor,uint64_t txnid)609 txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
610 {
611 	CURSOR *cursor;
612 	HE_ITEM txn;
613 	HELIUM_SOURCE *hs;
614 	uint8_t val_buf[16];
615 
616 	cursor = (CURSOR *)wtcursor;
617 	hs = cursor->ws->hs;
618 
619 	memset(&txn, 0, sizeof(txn));
620 	txn.key = &txnid;
621 	txn.key_len = sizeof(txnid);
622 	txn.val = val_buf;
623 	txn.val_len = sizeof(val_buf);
624 
625 	if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0)
626 		return (val_buf[0]);
627 	return (TXN_UNRESOLVED);
628 }
629 
630 /*
631  * cache_value_append --
632  *	Append the current WiredTiger cursor's value to a cache record.
633  */
634 static int
cache_value_append(WT_CURSOR * wtcursor,int remove_op)635 cache_value_append(WT_CURSOR *wtcursor, int remove_op)
636 {
637 	CURSOR *cursor;
638 	HE_ITEM *r;
639 	WT_EXTENSION_API *wt_api;
640 	WT_SESSION *session;
641 	uint64_t txnid;
642 	size_t len;
643 	uint32_t entries;
644 	uint8_t *p;
645 
646 	session = wtcursor->session;
647 	cursor = (CURSOR *)wtcursor;
648 	wt_api = cursor->wt_api;
649 
650 	r = &cursor->record;
651 
652 	/*
653 	 * A cache update is 4B that counts the number of entries in the update,
654 	 * followed by sets of: 8B of txn ID then either a remove tombstone or a
655 	 * 4B length and variable-length data pair.  Grow the value buffer, then
656 	 * append the cursor's information.
657 	 */
658 	len = cursor->len +				/* current length */
659 	    sizeof(uint32_t) +				/* entries */
660 	    sizeof(uint64_t) +				/* txn ID */
661 	    1 +						/* remove byte */
662 	    (remove_op ? 0 :				/* optional data */
663 	    sizeof(uint32_t) + wtcursor->value.size) +
664 	    32;						/* slop */
665 
666 	if (len > cursor->mem_len) {
667 		if ((p = realloc(cursor->v, len)) == NULL)
668 			return (os_errno());
669 		cursor->v = p;
670 		cursor->mem_len = len;
671 	}
672 
673 	/* Get the transaction ID. */
674 	txnid = wt_api->transaction_id(wt_api, session);
675 
676 	/* Update the number of records in this value. */
677 	if (cursor->len == 0) {
678 		entries = 1;
679 		cursor->len = sizeof(uint32_t);
680 	} else {
681 		memcpy(&entries, cursor->v, sizeof(uint32_t));
682 		++entries;
683 	}
684 	memcpy(cursor->v, &entries, sizeof(uint32_t));
685 
686 	/*
687 	 * Copy the WiredTiger cursor's data into place: txn ID, remove
688 	 * tombstone, data length, data.
689 	 *
690 	 * XXX
691 	 * Not endian-portable, we're writing a native transaction ID to the
692 	 * store.
693 	 */
694 	p = cursor->v + cursor->len;
695 	memcpy(p, &txnid, sizeof(uint64_t));
696 	p += sizeof(uint64_t);
697 	if (remove_op)
698 		*p++ = REMOVE_TOMBSTONE;
699 	else {
700 		*p++ = ' ';
701 		memcpy(p, &wtcursor->value.size, sizeof(uint32_t));
702 		p += sizeof(uint32_t);
703 		memcpy(p, wtcursor->value.data, wtcursor->value.size);
704 		p += wtcursor->value.size;
705 	}
706 	cursor->len = (size_t)(p - cursor->v);
707 
708 	/* Update the underlying Helium record. */
709 	r->val = cursor->v;
710 	r->val_len = cursor->len;
711 
712 	return (0);
713 }
714 
715 /*
716  * cache_value_unmarshall --
717  *	Unmarshall a cache value into a set of records.
718  */
719 static int
cache_value_unmarshall(WT_CURSOR * wtcursor)720 cache_value_unmarshall(WT_CURSOR *wtcursor)
721 {
722 	CACHE_RECORD *cp;
723 	CURSOR *cursor;
724 	uint32_t entries, i;
725 	uint8_t *p;
726 	int ret = 0;
727 
728 	cursor = (CURSOR *)wtcursor;
729 
730 	/* If we don't have enough record slots, allocate some more. */
731 	memcpy(&entries, cursor->v, sizeof(uint32_t));
732 	if (entries > cursor->cache_slots) {
733 		if ((p = realloc(cursor->cache,
734 		    (entries + 20) * sizeof(cursor->cache[0]))) == NULL)
735 			return (os_errno());
736 
737 		cursor->cache = (CACHE_RECORD *)p;
738 		cursor->cache_slots = entries + 20;
739 	}
740 
741 	/* Walk the value, splitting it up into records. */
742 	p = cursor->v + sizeof(uint32_t);
743 	for (i = 0, cp = cursor->cache; i < entries; ++i, ++cp) {
744 		memcpy(&cp->txnid, p, sizeof(uint64_t));
745 		p += sizeof(uint64_t);
746 		cp->remove = *p++ == REMOVE_TOMBSTONE ? 1 : 0;
747 		if (!cp->remove) {
748 			memcpy(&cp->len, p, sizeof(uint32_t));
749 			p += sizeof(uint32_t);
750 			cp->v = p;
751 			p += cp->len;
752 		}
753 	}
754 	cursor->cache_entries = entries;
755 
756 	return (ret);
757 }
758 
759 /*
760  * cache_value_aborted --
761  *	Return if a transaction has been aborted.
762  */
763 static inline int
cache_value_aborted(WT_CURSOR * wtcursor,CACHE_RECORD * cp)764 cache_value_aborted(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
765 {
766 	/*
767 	 * This function exists as a place to hang this comment.
768 	 *
769 	 * WiredTiger resets updated entry transaction IDs to an aborted state
770 	 * on rollback; to do that here would require tracking updated entries
771 	 * for a transaction or scanning the cache for updates made on behalf
772 	 * of the transaction during rollback, expensive stuff.  Instead, check
773 	 * if the transaction has been aborted before calling the underlying
774 	 * WiredTiger visibility function.
775 	 */
776 	return (txn_state(wtcursor, cp->txnid) == TXN_ABORTED ? 1 : 0);
777 }
778 
779 /*
780  * cache_value_committed --
781  *	Return if a transaction has been committed.
782  */
783 static inline int
cache_value_committed(WT_CURSOR * wtcursor,CACHE_RECORD * cp)784 cache_value_committed(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
785 {
786 	return (txn_state(wtcursor, cp->txnid) == TXN_COMMITTED ? 1 : 0);
787 }
788 
789 /*
790  * cache_value_update_check --
791  *	Return if an update can proceed based on the previous updates made to
792  * the cache entry.
793  */
794 static int
cache_value_update_check(WT_CURSOR * wtcursor)795 cache_value_update_check(WT_CURSOR *wtcursor)
796 {
797 	CACHE_RECORD *cp;
798 	CURSOR *cursor;
799 	WT_EXTENSION_API *wt_api;
800 	WT_SESSION *session;
801 	u_int i;
802 
803 	session = wtcursor->session;
804 	cursor = (CURSOR *)wtcursor;
805 	wt_api = cursor->wt_api;
806 
807 	/* Only interesting for snapshot isolation. */
808 	if (wt_api->
809 	    transaction_isolation_level(wt_api, session) != WT_TXN_ISO_SNAPSHOT)
810 		return (0);
811 
812 	/*
813 	 * If there's an entry that's not visible and hasn't been aborted,
814 	 * return a deadlock.
815 	 */
816 	for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
817 		if (!cache_value_aborted(wtcursor, cp) &&
818 		    !wt_api->transaction_visible(wt_api, session, cp->txnid))
819 			return (WT_ROLLBACK);
820 	return (0);
821 }
822 
823 /*
824  * cache_value_visible --
825  *	Return the most recent cache entry update visible to the running
826  * transaction.
827  */
828 static int
cache_value_visible(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)829 cache_value_visible(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
830 {
831 	CACHE_RECORD *cp;
832 	CURSOR *cursor;
833 	WT_EXTENSION_API *wt_api;
834 	WT_SESSION *session;
835 	u_int i;
836 
837 	*cpp = NULL;
838 
839 	session = wtcursor->session;
840 	cursor = (CURSOR *)wtcursor;
841 	wt_api = cursor->wt_api;
842 
843 	/*
844 	 * We want the most recent cache entry update; the cache entries are
845 	 * in update order, walk from the end to the beginning.
846 	 */
847 	cp = cursor->cache + cursor->cache_entries;
848 	for (i = 0; i < cursor->cache_entries; ++i) {
849 		--cp;
850 		if (!cache_value_aborted(wtcursor, cp) &&
851 		    wt_api->transaction_visible(wt_api, session, cp->txnid)) {
852 			*cpp = cp;
853 			return (1);
854 		}
855 	}
856 	return (0);
857 }
858 
859 /*
860  * cache_value_visible_all --
861  *	Return if a cache entry has no updates that aren't globally visible.
862  */
863 static int
cache_value_visible_all(WT_CURSOR * wtcursor,uint64_t oldest)864 cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest)
865 {
866 	CACHE_RECORD *cp;
867 	CURSOR *cursor;
868 	u_int i;
869 
870 	cursor = (CURSOR *)wtcursor;
871 
872 	/*
873 	 * Compare the update's transaction ID and the oldest transaction ID
874 	 * not yet visible to a running transaction.  If there's an update a
875 	 * running transaction might want, the entry must remain in the cache.
876 	 * (We could tighten this requirement: if the only update required is
877 	 * also the update we'd migrate to the primary, it would still be OK
878 	 * to migrate it.)
879 	 */
880 	for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
881 		if (cp->txnid >= oldest)
882 			return (0);
883 	return (1);
884 }
885 
886 /*
887  * cache_value_last_committed --
888  *	Find the most recent update in a cache entry, recovery processing.
889  */
890 static void
cache_value_last_committed(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)891 cache_value_last_committed(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
892 {
893 	CACHE_RECORD *cp;
894 	CURSOR *cursor;
895 	u_int i;
896 
897 	*cpp = NULL;
898 
899 	cursor = (CURSOR *)wtcursor;
900 
901 	/*
902 	 * Find the most recent update in the cache record, we're going to try
903 	 * and migrate it into the primary, recovery version.
904 	 *
905 	 * We know the entry is visible, but it must have been committed before
906 	 * the failure to be migrated.
907 	 *
908 	 * Cache entries are in update order, walk from end to beginning.
909 	 */
910 	cp = cursor->cache + cursor->cache_entries;
911 	for (i = 0; i < cursor->cache_entries; ++i) {
912 		--cp;
913 		if (cache_value_committed(wtcursor, cp)) {
914 			*cpp = cp;
915 			return;
916 		}
917 	}
918 }
919 
920 /*
921  * cache_value_last_not_aborted --
922  *	Find the most recent update in a cache entry, normal processing.
923  */
924 static void
cache_value_last_not_aborted(WT_CURSOR * wtcursor,CACHE_RECORD ** cpp)925 cache_value_last_not_aborted(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
926 {
927 	CACHE_RECORD *cp;
928 	CURSOR *cursor;
929 	u_int i;
930 
931 	*cpp = NULL;
932 
933 	cursor = (CURSOR *)wtcursor;
934 
935 	/*
936 	 * Find the most recent update in the cache record, we're going to try
937 	 * and migrate it into the primary, normal processing version.
938 	 *
939 	 * We don't have to check if the entry was committed, we've already
940 	 * confirmed all entries for this cache key are globally visible, which
941 	 * means they must be either committed or aborted.
942 	 *
943 	 * Cache entries are in update order, walk from end to beginning.
944 	 */
945 	cp = cursor->cache + cursor->cache_entries;
946 	for (i = 0; i < cursor->cache_entries; ++i) {
947 		--cp;
948 		if (!cache_value_aborted(wtcursor, cp)) {
949 			*cpp = cp;
950 			return;
951 		}
952 	}
953 }
954 
955 /*
956  * cache_value_txnmin --
957  *	Return the oldest transaction ID involved in a cache update.
958  */
959 static void
cache_value_txnmin(WT_CURSOR * wtcursor,uint64_t * txnminp)960 cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp)
961 {
962 	CACHE_RECORD *cp;
963 	CURSOR *cursor;
964 	uint64_t txnmin;
965 	u_int i;
966 
967 	cursor = (CURSOR *)wtcursor;
968 
969 	/* Return the oldest transaction ID for in the cache entry. */
970 	txnmin = UINT64_MAX;
971 	for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
972 		if (txnmin > cp->txnid)
973 			txnmin = cp->txnid;
974 	*txnminp = txnmin;
975 }
976 
977 /*
978  * key_max_err --
979  *	Common error when a WiredTiger key is too large.
980  */
981 static int
key_max_err(WT_EXTENSION_API * wt_api,WT_SESSION * session,size_t len)982 key_max_err(WT_EXTENSION_API *wt_api, WT_SESSION *session, size_t len)
983 {
984 	int ret = 0;
985 
986 	ERET(wt_api, session, EINVAL,
987 	    "key length (%zu bytes) larger than the maximum Helium "
988 	    "key length of %d bytes",
989 	    len, HE_MAX_KEY_LEN);
990 }
991 
992 /*
993  * copyin_key --
994  *	Copy a WT_CURSOR key to a HE_ITEM key.
995  */
996 static inline int
copyin_key(WT_CURSOR * wtcursor,int allocate_key)997 copyin_key(WT_CURSOR *wtcursor, int allocate_key)
998 {
999 	CURSOR *cursor;
1000 	HE_ITEM *r;
1001 	WT_EXTENSION_API *wt_api;
1002 	WT_SESSION *session;
1003 	WT_SOURCE *ws;
1004 	size_t size;
1005 
1006 	session = wtcursor->session;
1007 	cursor = (CURSOR *)wtcursor;
1008 	ws = cursor->ws;
1009 	wt_api = cursor->wt_api;
1010 
1011 	r = &cursor->record;
1012 	if (ws->config_recno) {
1013 		/*
1014 		 * Allocate a new record for append operations.
1015 		 *
1016 		 * A specified record number could potentially be larger than
1017 		 * the maximum known record number, update the maximum number
1018 		 * as necessary.
1019 		 *
1020 		 * Assume we can compare 8B values without locking them, and
1021 		 * test again after acquiring the lock.
1022 		 *
1023 		 * XXX
1024 		 * If the put fails for some reason, we'll have incremented the
1025 		 * maximum record number past the correct point.  I can't think
1026 		 * of a reason any application would care or notice, but it's
1027 		 * not quite right.
1028 		 */
1029 		if (allocate_key && cursor->config_append) {
1030 			WT_RET(writelock(wt_api, session, &ws->lock));
1031 			wtcursor->recno = ++ws->append_recno;
1032 			WT_RET(unlock(wt_api, session, &ws->lock));
1033 		} else if (wtcursor->recno > ws->append_recno) {
1034 			WT_RET(writelock(wt_api, session, &ws->lock));
1035 			if (wtcursor->recno > ws->append_recno)
1036 				ws->append_recno = wtcursor->recno;
1037 			WT_RET(unlock(wt_api, session, &ws->lock));
1038 		}
1039 
1040 		WT_RET(wt_api->struct_size(wt_api,
1041 		    session, &size, "r", wtcursor->recno));
1042 		WT_RET(wt_api->struct_pack(wt_api,
1043 		    session, r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno));
1044 		r->key_len = size;
1045 	} else {
1046 		/* I'm not sure this test is necessary, but it's cheap. */
1047 		if (wtcursor->key.size > HE_MAX_KEY_LEN)
1048 			return (
1049 			    key_max_err(wt_api, session, wtcursor->key.size));
1050 
1051 		/*
1052 		 * A set cursor key might reference application memory, which
1053 		 * is only OK until the cursor operation has been called (in
1054 		 * other words, we can only reference application memory from
1055 		 * the WT_CURSOR.set_key call until the WT_CURSOR.op call).
1056 		 * For this reason, do a full copy, don't just reference the
1057 		 * WT_CURSOR key's data.
1058 		 */
1059 		memcpy(r->key, wtcursor->key.data, wtcursor->key.size);
1060 		r->key_len = wtcursor->key.size;
1061 	}
1062 	return (0);
1063 }
1064 
1065 /*
1066  * copyout_key --
1067  *	Copy a HE_ITEM key to a WT_CURSOR key.
1068  */
1069 static inline int
copyout_key(WT_CURSOR * wtcursor)1070 copyout_key(WT_CURSOR *wtcursor)
1071 {
1072 	CURSOR *cursor;
1073 	HE_ITEM *r;
1074 	WT_EXTENSION_API *wt_api;
1075 	WT_SESSION *session;
1076 	WT_SOURCE *ws;
1077 
1078 	session = wtcursor->session;
1079 	cursor = (CURSOR *)wtcursor;
1080 	wt_api = cursor->wt_api;
1081 	ws = cursor->ws;
1082 
1083 	r = &cursor->record;
1084 	if (ws->config_recno)
1085 		WT_RET(wt_api->struct_unpack(wt_api,
1086 		    session, r->key, r->key_len, "r", &wtcursor->recno));
1087 	else {
1088 		wtcursor->key.data = r->key;
1089 		wtcursor->key.size = (size_t)r->key_len;
1090 	}
1091 	return (0);
1092 }
1093 
1094 /*
1095  * copyout_val --
1096  *	Copy a Helium store's HE_ITEM value to a WT_CURSOR value.
1097  */
1098 static inline int
copyout_val(WT_CURSOR * wtcursor,CACHE_RECORD * cp)1099 copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
1100 {
1101 	CURSOR *cursor;
1102 
1103 	cursor = (CURSOR *)wtcursor;
1104 
1105 	if (cp == NULL) {
1106 		wtcursor->value.data = cursor->v;
1107 		wtcursor->value.size = cursor->len;
1108 	} else {
1109 		wtcursor->value.data = cp->v;
1110 		wtcursor->value.size = cp->len;
1111 	}
1112 	return (0);
1113 }
1114 
/*
 * nextprev --
 *	Cursor next/prev.
 *
 *	Shared body of the WT_CURSOR.next and WT_CURSOR.prev methods: step from
 * the key currently held in the cursor's record in both the cache store and
 * the primary store, then return whichever candidate comes first in the
 * direction of travel.
 *
 *	fname is the Helium function name handed to helium_call; f is the Helium
 * function itself and is compared against he_next to pick the direction.
 * Returns 0 with the cursor's key and value set, WT_NOTFOUND when neither
 * store yields an entry, or another non-zero error code.
 */
static int
nextprev(WT_CURSOR *wtcursor, const char *fname,
    int (*f)(he_t, HE_ITEM *, size_t, size_t))
{
	CACHE_RECORD *cp;
	CURSOR *cursor;
	HE_ITEM *r;
	WT_EXTENSION_API *wt_api;
	WT_ITEM a, b;
	WT_SESSION *session;
	WT_SOURCE *ws;
	int cache_ret, cache_rm, cmp, ret = 0;
	void *p;

	session = wtcursor->session;
	cursor = (CURSOR *)wtcursor;
	ws = cursor->ws;
	wt_api = cursor->wt_api;
	r = &cursor->record;

	cache_rm = 0;

	/*
	 * If the cache isn't yet in use, it's a simpler problem, just check
	 * the store.  We don't care if we race, we're not guaranteeing any
	 * special behavior with respect to phantoms.
	 */
	if (!ws->he_cache_inuse) {
		/* Treat the cache as having no candidate at all. */
		cache_ret = WT_NOTFOUND;
		goto cache_clean;
	}

skip_deleted:
	/*
	 * The next/prev key/value pair might be in the cache, which means we
	 * are making two calls and returning the best choice.  As each call
	 * overwrites both key and value, we have to have a copy of the key
	 * for the second call plus the returned key and value from the first
	 * call. That's why each cursor has 3 temporary buffers.
	 *
	 * First, copy the key (t1 holds the starting key, growing the buffer
	 * if it is too small).
	 */
	if (cursor->t1.mem_len < r->key_len) {
		if ((p = realloc(cursor->t1.v, r->key_len)) == NULL)
			return (os_errno());
		cursor->t1.v = p;
		cursor->t1.mem_len = r->key_len;
	}
	memcpy(cursor->t1.v, r->key, r->key_len);
	cursor->t1.len = r->key_len;

	/*
	 * Move through the cache until we either find a record with a visible
	 * entry, or we reach the end/beginning.
	 */
	for (cache_rm = 0;;) {
		/* Any non-zero return (including WT_NOTFOUND) ends the walk. */
		if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0)
			break;
		WT_RET(cache_value_unmarshall(wtcursor));

		/* If there's no visible entry, move to the next one. */
		if (!cache_value_visible(wtcursor, &cp))
			continue;

		/*
		 * If the entry has been deleted, remember that and continue.
		 * We can't just skip the entry because it might be a delete
		 * of an entry in the primary store, which means the cache
		 * entry stops us from returning the primary store's entry.
		 */
		if (cp->remove)
			cache_rm = 1;

		/*
		 * Copy the cache key. If the cache's entry wasn't a delete,
		 * copy the value as well, we may return the cache entry.
		 * (t2 holds the cache candidate's key, t3 its value.)
		 */
		if (cursor->t2.mem_len < r->key_len) {
			if ((p = realloc(cursor->t2.v, r->key_len)) == NULL)
				return (os_errno());
			cursor->t2.v = p;
			cursor->t2.mem_len = r->key_len;
		}
		memcpy(cursor->t2.v, r->key, r->key_len);
		cursor->t2.len = r->key_len;

		/* A deleted entry has no value worth saving. */
		if (cache_rm)
			break;

		if (cursor->t3.mem_len < cp->len) {
			if ((p = realloc(cursor->t3.v, cp->len)) == NULL)
				return (os_errno());
			cursor->t3.v = p;
			cursor->t3.mem_len = cp->len;
		}
		memcpy(cursor->t3.v, cp->v, cp->len);
		cursor->t3.len = cp->len;

		break;
	}
	/* Only success and WT_NOTFOUND are carried forward as cache_ret. */
	if (ret != 0 && ret != WT_NOTFOUND)
		return (ret);
	cache_ret = ret;

	/* Copy the original key back into place. */
	memcpy(r->key, cursor->t1.v, cursor->t1.len);
	r->key_len = cursor->t1.len;

cache_clean:
	/* Get the next/prev entry from the store. */
	ret = helium_call(wtcursor, fname, ws->he, f);
	if (ret != 0 && ret != WT_NOTFOUND)
		return (ret);

	/* If no entries in either the cache or the primary, we're done. */
	if (cache_ret == WT_NOTFOUND && ret == WT_NOTFOUND)
		return (WT_NOTFOUND);

	/*
	 * If both the cache and the primary had entries, decide which is a
	 * better choice and pretend we didn't find the other one.
	 *
	 * Moving forward the smaller key wins, moving backward the larger key
	 * wins; when the keys compare equal the cache entry is the one kept.
	 */
	if (cache_ret == 0 && ret == 0) {
		a.data = r->key;		/* a is the primary */
		a.size = (uint32_t)r->key_len;
		b.data = cursor->t2.v;		/* b is the cache */
		b.size = (uint32_t)cursor->t2.len;
		WT_RET(wt_api->collate(wt_api, session, NULL, &a, &b, &cmp));

		if (f == he_next) {
			if (cmp >= 0)
				ret = WT_NOTFOUND;
			else
				cache_ret = WT_NOTFOUND;
		} else {
			if (cmp <= 0)
				ret = WT_NOTFOUND;
			else
				cache_ret = WT_NOTFOUND;
		}
	}

	/*
	 * If the cache is the key we'd choose, but it's a delete, skip past it
	 * by moving from the deleted key to the next/prev item in either the
	 * primary or the cache.
	 */
	if (cache_ret == 0 && cache_rm) {
		/* Restart the whole search from the deleted key. */
		memcpy(r->key, cursor->t2.v, cursor->t2.len);
		r->key_len = cursor->t2.len;
		goto skip_deleted;
	}

	/*
	 * If taking the cache's entry, copy the saved key and value into place
	 * (the primary-store call above overwrote the record).
	 */
	if (cache_ret == 0) {
		memcpy(r->key, cursor->t2.v, cursor->t2.len);
		r->key_len = cursor->t2.len;

		memcpy(cursor->v, cursor->t3.v, cursor->t3.len);
		cursor->len = cursor->t3.len;
	}

	/* Copy out the chosen key/value pair. */
	WT_RET(copyout_key(wtcursor));
	WT_RET(copyout_val(wtcursor, NULL));
	return (0);
}
1286 
1287 /*
1288  * helium_cursor_next --
1289  *	WT_CURSOR.next method.
1290  */
1291 static int
helium_cursor_next(WT_CURSOR * wtcursor)1292 helium_cursor_next(WT_CURSOR *wtcursor)
1293 {
1294 	return (nextprev(wtcursor, "he_next", he_next));
1295 }
1296 
1297 /*
1298  * helium_cursor_prev --
1299  *	WT_CURSOR.prev method.
1300  */
1301 static int
helium_cursor_prev(WT_CURSOR * wtcursor)1302 helium_cursor_prev(WT_CURSOR *wtcursor)
1303 {
1304 	return (nextprev(wtcursor, "he_prev", he_prev));
1305 }
1306 
1307 /*
1308  * helium_cursor_reset --
1309  *	WT_CURSOR.reset method.
1310  */
1311 static int
helium_cursor_reset(WT_CURSOR * wtcursor)1312 helium_cursor_reset(WT_CURSOR *wtcursor)
1313 {
1314 	CURSOR *cursor;
1315 	HE_ITEM *r;
1316 
1317 	cursor = (CURSOR *)wtcursor;
1318 	r = &cursor->record;
1319 
1320 	/*
1321 	 * Reset the cursor by setting the key length to 0, causing subsequent
1322 	 * next/prev operations to return the first/last record of the object.
1323 	 */
1324 	r->key_len = 0;
1325 	return (0);
1326 }
1327 
1328 /*
1329  * helium_cursor_search --
1330  *	WT_CURSOR.search method.
1331  */
1332 static int
helium_cursor_search(WT_CURSOR * wtcursor)1333 helium_cursor_search(WT_CURSOR *wtcursor)
1334 {
1335 	CACHE_RECORD *cp;
1336 	CURSOR *cursor;
1337 	WT_SOURCE *ws;
1338 	int ret = 0;
1339 
1340 	cursor = (CURSOR *)wtcursor;
1341 	ws = cursor->ws;
1342 
1343 	/* Copy in the WiredTiger cursor's key. */
1344 	WT_RET(copyin_key(wtcursor, 0));
1345 
1346 	/*
1347 	 * Check for an entry in the cache.  If we find one, unmarshall it
1348 	 * and check for a visible entry we can return.
1349 	 */
1350 	if ((ret =
1351 	    helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) {
1352 		WT_RET(cache_value_unmarshall(wtcursor));
1353 		if (cache_value_visible(wtcursor, &cp))
1354 			return (cp->remove ?
1355 			    WT_NOTFOUND : copyout_val(wtcursor, cp));
1356 	} else if (ret != WT_NOTFOUND)
1357 		return (ret);
1358 
1359 	/* Check for an entry in the primary store. */
1360 	WT_RET(helium_call(wtcursor, "he_lookup", ws->he, he_lookup));
1361 	WT_RET(copyout_val(wtcursor, NULL));
1362 
1363 	return (0);
1364 }
1365 
1366 /*
1367  * helium_cursor_search_near --
1368  *	WT_CURSOR.search_near method.
1369  */
1370 static int
helium_cursor_search_near(WT_CURSOR * wtcursor,int * exact)1371 helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
1372 {
1373 	int ret = 0;
1374 
1375 	/*
1376 	 * XXX
1377 	 * I'm not confident this is sufficient: if there are multiple threads
1378 	 * of control, it's possible for the search for an exact match to fail,
1379 	 * another thread of control to insert (and commit) an exact match, and
1380 	 * then it's possible we'll return the wrong value.  This needs to be
1381 	 * revisited once the transactional code is in place.
1382 	 */
1383 
1384 	/* Search for an exact match. */
1385 	if ((ret = helium_cursor_search(wtcursor)) == 0) {
1386 		*exact = 0;
1387 		return (0);
1388 	}
1389 	if (ret != WT_NOTFOUND)
1390 		return (ret);
1391 
1392 	/* Search for a key that's larger. */
1393 	if ((ret = helium_cursor_next(wtcursor)) == 0) {
1394 		*exact = 1;
1395 		return (0);
1396 	}
1397 	if (ret != WT_NOTFOUND)
1398 		return (ret);
1399 
1400 	/* Search for a key that's smaller. */
1401 	if ((ret = helium_cursor_prev(wtcursor)) == 0) {
1402 		*exact = -1;
1403 		return (0);
1404 	}
1405 
1406 	return (ret);
1407 }
1408 
1409 /*
1410  * helium_cursor_insert --
1411  *	WT_CURSOR.insert method.
1412  */
1413 static int
helium_cursor_insert(WT_CURSOR * wtcursor)1414 helium_cursor_insert(WT_CURSOR *wtcursor)
1415 {
1416 	CACHE_RECORD *cp;
1417 	CURSOR *cursor;
1418 	HE_ITEM *r;
1419 	HELIUM_SOURCE *hs;
1420 	WT_EXTENSION_API *wt_api;
1421 	WT_SESSION *session;
1422 	WT_SOURCE *ws;
1423 	int ret = 0;
1424 
1425 	session = wtcursor->session;
1426 	cursor = (CURSOR *)wtcursor;
1427 	wt_api = cursor->wt_api;
1428 	ws = cursor->ws;
1429 	hs = ws->hs;
1430 	r = &cursor->record;
1431 
1432 	/* Get the WiredTiger cursor's key. */
1433 	WT_RET(copyin_key(wtcursor, 1));
1434 
1435 	VMSG(wt_api, session, VERBOSE_L2,
1436 	    "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val);
1437 
1438 	/* Clear the value, assume we're adding the first cache entry. */
1439 	cursor->len = 0;
1440 
1441 	/* Updates are read-modify-writes, lock the underlying cache. */
1442 	WT_RET(writelock(wt_api, session, &ws->lock));
1443 
1444 	/* Read the record from the cache store. */
1445 	switch (ret = helium_call(
1446 	    wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
1447 	case 0:
1448 		/* Crack the record. */
1449 		WT_ERR(cache_value_unmarshall(wtcursor));
1450 
1451 		/* Check if the update can proceed. */
1452 		WT_ERR(cache_value_update_check(wtcursor));
1453 
1454 		if (cursor->config_overwrite)
1455 			break;
1456 
1457 		/*
1458 		 * If overwrite is false, a visible entry (that's not a removed
1459 		 * entry), is an error.  We're done checking if there is a
1460 		 * visible entry in the cache, otherwise repeat the check on the
1461 		 * primary store.
1462 		 */
1463 		if (cache_value_visible(wtcursor, &cp)) {
1464 			if (cp->remove)
1465 				break;
1466 
1467 			ret = WT_DUPLICATE_KEY;
1468 			goto err;
1469 		}
1470 		/* FALLTHROUGH */
1471 	case WT_NOTFOUND:
1472 		if (cursor->config_overwrite)
1473 			break;
1474 
1475 		/* If overwrite is false, an entry is an error. */
1476 		if ((ret = helium_call(
1477 		    wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) {
1478 			if (ret == 0)
1479 				ret = WT_DUPLICATE_KEY;
1480 			goto err;
1481 		}
1482 		ret = 0;
1483 		break;
1484 	default:
1485 		goto err;
1486 	}
1487 
1488 	/*
1489 	 * Create a new value using the current cache record plus the WiredTiger
1490 	 * cursor's value, and update the cache.
1491 	 */
1492 	WT_ERR(cache_value_append(wtcursor, 0));
1493 	if ((ret = he_update(ws->he_cache, r)) != 0)
1494 		EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret));
1495 
1496 	/* Update the state while still holding the lock. */
1497 	if (!ws->he_cache_inuse)
1498 		ws->he_cache_inuse = true;
1499 	++ws->he_cache_ops;
1500 
1501 	/* Discard the lock. */
1502 err:	ESET(unlock(wt_api, session, &ws->lock));
1503 
1504 	/* If successful, request notification at transaction resolution. */
1505 	if (ret == 0)
1506 		ESET(wt_api->transaction_notify(
1507 		    wt_api, session, &hs->txn_notify));
1508 
1509 	return (ret);
1510 }
1511 
1512 /*
1513  * update --
1514  *	Update or remove an entry.
1515  */
1516 static int
update(WT_CURSOR * wtcursor,int remove_op)1517 update(WT_CURSOR *wtcursor, int remove_op)
1518 {
1519 	CACHE_RECORD *cp;
1520 	CURSOR *cursor;
1521 	HE_ITEM *r;
1522 	HELIUM_SOURCE *hs;
1523 	WT_EXTENSION_API *wt_api;
1524 	WT_SESSION *session;
1525 	WT_SOURCE *ws;
1526 	int ret = 0;
1527 
1528 	session = wtcursor->session;
1529 	cursor = (CURSOR *)wtcursor;
1530 	wt_api = cursor->wt_api;
1531 	ws = cursor->ws;
1532 	hs = ws->hs;
1533 	r = &cursor->record;
1534 
1535 	/* Get the WiredTiger cursor's key. */
1536 	WT_RET(copyin_key(wtcursor, 0));
1537 
1538 	VMSG(wt_api, session, VERBOSE_L2,
1539 	    "%c %.*s.%.*s",
1540 	    remove_op ? 'R' : 'U',
1541 	    (int)r->key_len, r->key, (int)r->val_len, r->val);
1542 
1543 	/* Clear the value, assume we're adding the first cache entry. */
1544 	cursor->len = 0;
1545 
1546 	/* Updates are read-modify-writes, lock the underlying cache. */
1547 	WT_RET(writelock(wt_api, session, &ws->lock));
1548 
1549 	/* Read the record from the cache store. */
1550 	switch (ret = helium_call(
1551 	    wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
1552 	case 0:
1553 		/* Crack the record. */
1554 		WT_ERR(cache_value_unmarshall(wtcursor));
1555 
1556 		/* Check if the update can proceed. */
1557 		WT_ERR(cache_value_update_check(wtcursor));
1558 
1559 		if (cursor->config_overwrite)
1560 			break;
1561 
1562 		/*
1563 		 * If overwrite is false, no entry (or a removed entry), is an
1564 		 * error. We're done checking if there is a visible entry in
1565 		 * the cache, otherwise repeat the check on the primary store.
1566 		 */
1567 		if (cache_value_visible(wtcursor, &cp)) {
1568 			if (!cp->remove)
1569 				break;
1570 
1571 			ret = WT_NOTFOUND;
1572 			goto err;
1573 		}
1574 		/* FALLTHROUGH */
1575 	case WT_NOTFOUND:
1576 		if (cursor->config_overwrite)
1577 			break;
1578 
1579 		/* If overwrite is false, no entry is an error. */
1580 		WT_ERR(helium_call(wtcursor, "he_lookup", ws->he, he_lookup));
1581 
1582 		/*
1583 		 * All we care about is the cache entry, which didn't exist;
1584 		 * clear the returned value, we're about to "append" to it.
1585 		 */
1586 		cursor->len = 0;
1587 		break;
1588 	default:
1589 		goto err;
1590 	}
1591 
1592 	/*
1593 	 * Create a new cache value based on the current cache record plus the
1594 	 * WiredTiger cursor's value.
1595 	 */
1596 	WT_ERR(cache_value_append(wtcursor, remove_op));
1597 
1598 	/* Push the record into the cache. */
1599 	if ((ret = he_update(ws->he_cache, r)) != 0)
1600 		EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret));
1601 
1602 	/* Update the state while still holding the lock. */
1603 	if (!ws->he_cache_inuse)
1604 		ws->he_cache_inuse = true;
1605 	++ws->he_cache_ops;
1606 
1607 	/* Discard the lock. */
1608 err:	ESET(unlock(wt_api, session, &ws->lock));
1609 
1610 	/* If successful, request notification at transaction resolution. */
1611 	if (ret == 0)
1612 		ESET(wt_api->transaction_notify(
1613 		    wt_api, session, &hs->txn_notify));
1614 
1615 	return (ret);
1616 }
1617 
1618 /*
1619  * helium_cursor_update --
1620  *	WT_CURSOR.update method.
1621  */
1622 static int
helium_cursor_update(WT_CURSOR * wtcursor)1623 helium_cursor_update(WT_CURSOR *wtcursor)
1624 {
1625 	return (update(wtcursor, 0));
1626 }
1627 
1628 /*
1629  * helium_cursor_reserve --
1630  *	WT_CURSOR.reserve method.
1631  */
1632 static int
helium_cursor_reserve(WT_CURSOR * wtcursor)1633 helium_cursor_reserve(WT_CURSOR *wtcursor)
1634 {
1635 	(void)wtcursor;
1636 
1637 	/*
1638 	 * XXX
1639 	 * We don't currently support reserve, this will require some work.
1640 	 * The test programs don't currently detect it, so return success.
1641 	 */
1642 	return (0);
1643 }
1644 
1645 /*
1646  * helium_cursor_remove --
1647  *	WT_CURSOR.remove method.
1648  */
1649 static int
helium_cursor_remove(WT_CURSOR * wtcursor)1650 helium_cursor_remove(WT_CURSOR *wtcursor)
1651 {
1652 	CURSOR *cursor;
1653 	WT_SOURCE *ws;
1654 
1655 	cursor = (CURSOR *)wtcursor;
1656 	ws = cursor->ws;
1657 
1658 	/*
1659 	 * WiredTiger's "remove" of a bitfield is really an update with a value
1660 	 * of zero.
1661 	 */
1662 	if (ws->config_bitfield) {
1663 		wtcursor->value.size = 1;
1664 		wtcursor->value.data = "";
1665 		return (update(wtcursor, 0));
1666 	}
1667 	return (update(wtcursor, 1));
1668 }
1669 
1670 /*
1671  * helium_cursor_close --
1672  *	WT_CURSOR.close method.
1673  */
1674 static int
helium_cursor_close(WT_CURSOR * wtcursor)1675 helium_cursor_close(WT_CURSOR *wtcursor)
1676 {
1677 	CURSOR *cursor;
1678 	WT_EXTENSION_API *wt_api;
1679 	WT_SESSION *session;
1680 	WT_SOURCE *ws;
1681 	int ret = 0;
1682 
1683 	session = wtcursor->session;
1684 	cursor = (CURSOR *)wtcursor;
1685 	wt_api = cursor->wt_api;
1686 	ws = cursor->ws;
1687 
1688 	if ((ret = writelock(wt_api, session, &ws->lock)) == 0) {
1689 		--ws->ref;
1690 		ret = unlock(wt_api, session, &ws->lock);
1691 	}
1692 	cursor_destroy(cursor);
1693 
1694 	return (ret);
1695 }
1696 
1697 /*
1698  * ws_source_name --
1699  *	Build a namespace name.
1700  */
1701 static int
ws_source_name(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * suffix,char ** pp)1702 ws_source_name(WT_DATA_SOURCE *wtds,
1703     WT_SESSION *session, const char *uri, const char *suffix, char **pp)
1704 {
1705 	DATA_SOURCE *ds;
1706 	WT_EXTENSION_API *wt_api;
1707 	size_t len;
1708 	int ret = 0;
1709 	const char *p;
1710 
1711 	ds = (DATA_SOURCE *)wtds;
1712 	wt_api = ds->wt_api;
1713 
1714 	/*
1715 	 * Create the store's name.  Application URIs are "helium:device/name";
1716 	 * we want the names on the Helium device to be obviously WiredTiger's,
1717 	 * and the device name isn't interesting.  Convert to "WiredTiger:name",
1718 	 * and add an optional suffix.
1719 	 */
1720 	if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL)
1721 		ERET(wt_api, session, EINVAL, "%s: illegal Helium URI", uri);
1722 	++p;
1723 
1724 	len = strlen(WT_NAME_PREFIX) +
1725 	    strlen(p) + (suffix == NULL ? 0 : strlen(suffix)) + 5;
1726 	if ((*pp = malloc(len)) == NULL)
1727 		return (os_errno());
1728 	(void)snprintf(*pp, len, "%s%s%s",
1729 	    WT_NAME_PREFIX, p, suffix == NULL ? "" : suffix);
1730 	return (0);
1731 }
1732 
1733 /*
1734  * ws_source_close --
1735  *	Close a WT_SOURCE reference.
1736  */
1737 static int
ws_source_close(WT_EXTENSION_API * wt_api,WT_SESSION * session,WT_SOURCE * ws)1738 ws_source_close(WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_SOURCE *ws)
1739 {
1740 	int ret = 0, tret;
1741 
1742 	/*
1743 	 * Warn if open cursors: it shouldn't happen because the upper layers of
1744 	 * WiredTiger prevent it, so we don't do anything more than warn.
1745 	 */
1746 	if (ws->ref != 0)
1747 		EMSG(wt_api, session, WT_ERROR,
1748 		    "%s: open object with %u open cursors being closed",
1749 		    ws->uri, ws->ref);
1750 
1751 	if (ws->he != NULL) {
1752 		if ((tret = he_commit(ws->he)) != 0)
1753 			EMSG(wt_api, session, tret,
1754 			    "he_commit: %s: %s", ws->uri, he_strerror(tret));
1755 		if ((tret = he_close(ws->he)) != 0)
1756 			EMSG(wt_api, session, tret,
1757 			    "he_close: %s: %s", ws->uri, he_strerror(tret));
1758 		ws->he = NULL;
1759 	}
1760 	if (ws->he_cache != NULL) {
1761 		if ((tret = he_close(ws->he_cache)) != 0)
1762 			EMSG(wt_api, session, tret,
1763 			    "he_close: %s(cache): %s",
1764 			    ws->uri, he_strerror(tret));
1765 		ws->he_cache = NULL;
1766 	}
1767 
1768 	if (ws->lockinit)
1769 		ESET(lock_destroy(wt_api, session, &ws->lock));
1770 
1771 	free(ws->uri);
1772 	OVERWRITE_AND_FREE(ws);
1773 
1774 	return (ret);
1775 }
1776 
1777 /*
1778  * ws_source_open_object --
1779  *	Open an object in the Helium store.
1780  */
1781 static int
ws_source_open_object(WT_DATA_SOURCE * wtds,WT_SESSION * session,HELIUM_SOURCE * hs,const char * uri,const char * suffix,int flags,he_t * hep)1782 ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session,
1783     HELIUM_SOURCE *hs,
1784     const char *uri, const char *suffix, int flags, he_t *hep)
1785 {
1786 	DATA_SOURCE *ds;
1787 	WT_EXTENSION_API *wt_api;
1788 	he_t he;
1789 	char *p;
1790 	int ret = 0;
1791 
1792 	*hep = NULL;
1793 
1794 	ds = (DATA_SOURCE *)wtds;
1795 	wt_api = ds->wt_api;
1796 	p = NULL;
1797 
1798 	/* Open the underlying Helium object. */
1799 	WT_RET(ws_source_name(wtds, session, uri, suffix, &p));
1800 	VMSG(wt_api, session, VERBOSE_L1, "open %s/%s", hs->name, p);
1801 	if ((he = he_open(hs->device, p, flags, NULL)) == NULL) {
1802 		ret = os_errno();
1803 		EMSG(wt_api, session, ret,
1804 		    "he_open: %s/%s: %s", hs->name, p, he_strerror(ret));
1805 	}
1806 	*hep = he;
1807 
1808 	free(p);
1809 	return (ret);
1810 }
1811 
1812 #define	WS_SOURCE_OPEN_BUSY	0x01		/* Fail if source busy */
1813 #define	WS_SOURCE_OPEN_GLOBAL	0x02		/* Keep the global lock */
1814 
1815 /*
1816  * ws_source_open --
1817  *	Return a locked WiredTiger source, allocating and opening if it doesn't
1818  * already exist.
1819  */
1820 static int
ws_source_open(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config,u_int flags,WT_SOURCE ** refp)1821 ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
1822     const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp)
1823 {
1824 	DATA_SOURCE *ds;
1825 	HELIUM_SOURCE *hs;
1826 	WT_CONFIG_ITEM a;
1827 	WT_EXTENSION_API *wt_api;
1828 	WT_SOURCE *ws;
1829 	size_t len;
1830 	int oflags, ret = 0;
1831 	const char *p, *t;
1832 
1833 	*refp = NULL;
1834 
1835 	ds = (DATA_SOURCE *)wtds;
1836 	wt_api = ds->wt_api;
1837 	ws = NULL;
1838 
1839 	/*
1840 	 * The URI will be "helium:" followed by a Helium name and object name
1841 	 * pair separated by a slash, for example, "helium:volume/object".
1842 	 */
1843 	if (!prefix_match(uri, "helium:"))
1844 		goto bad_name;
1845 	p = uri + strlen("helium:");
1846 	if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0')
1847 bad_name:	ERET(wt_api, session, EINVAL, "%s: illegal name format", uri);
1848 	len = (size_t)(t - p);
1849 
1850 	/* Find a matching Helium device. */
1851 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
1852 		if (string_match(hs->name, p, len))
1853 			break;
1854 	if (hs == NULL)
1855 		ERET(wt_api, NULL,
1856 		    EINVAL, "%s: no matching Helium store found", uri);
1857 
1858 	/*
1859 	 * We're about to walk the Helium device's list of files, acquire the
1860 	 * global lock.
1861 	 */
1862 	WT_RET(writelock(wt_api, session, &ds->global_lock));
1863 
1864 	/*
1865 	 * Check for a match: if we find one, optionally trade the global lock
1866 	 * for the object's lock, optionally check if the object is busy, and
1867 	 * return.
1868 	 */
1869 	for (ws = hs->ws_head; ws != NULL; ws = ws->next)
1870 		if (strcmp(ws->uri, uri) == 0) {
1871 			/* Check to see if the object is busy. */
1872 			if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) {
1873 				ret = EBUSY;
1874 				ESET(unlock(wt_api, session, &ds->global_lock));
1875 				return (ret);
1876 			}
1877 			/* Swap the global lock for an object lock. */
1878 			if (!(flags & WS_SOURCE_OPEN_GLOBAL)) {
1879 				ret = writelock(wt_api, session, &ws->lock);
1880 				ESET(unlock(wt_api, session, &ds->global_lock));
1881 				if (ret != 0)
1882 					return (ret);
1883 			}
1884 			*refp = ws;
1885 			return (0);
1886 		}
1887 
1888 	/* Allocate and initialize a new underlying WiredTiger source object. */
1889 	if ((ws = calloc(1, sizeof(*ws))) == NULL ||
1890 	    (ws->uri = strdup(uri)) == NULL) {
1891 		ret = os_errno();
1892 		goto err;
1893 	}
1894 	WT_ERR(lock_init(wt_api, session, &ws->lock));
1895 	ws->lockinit = true;
1896 	ws->hs = hs;
1897 
1898 	/*
1899 	 * Open the underlying Helium objects, then push the change.
1900 	 *
1901 	 * The naming scheme is simple: the URI names the primary store, and the
1902 	 * URI with a trailing suffix names the associated caching store.
1903 	 *
1904 	 * We always set the create flag, our caller handles attempts to create
1905 	 * existing objects.
1906 	 */
1907 	oflags = HE_O_CREATE;
1908 	if ((ret = wt_api->config_get(wt_api,
1909 	    session, config, "helium_o_compress", &a)) == 0 && a.val != 0)
1910 		oflags |= HE_O_COMPRESS;
1911 	if (ret != 0 && ret != WT_NOTFOUND)
1912 		EMSG_ERR(wt_api, session, ret,
1913 		    "helium_o_compress configuration: %s",
1914 		    wt_api->strerror(wt_api, session, ret));
1915 	if ((ret = wt_api->config_get(wt_api,
1916 	    session, config, "helium_o_truncate", &a)) == 0 && a.val != 0)
1917 		oflags |= HE_O_TRUNCATE;
1918 	if (ret != 0 && ret != WT_NOTFOUND)
1919 		EMSG_ERR(wt_api, session, ret,
1920 		    "helium_o_truncate configuration: %s",
1921 		    wt_api->strerror(wt_api, session, ret));
1922 
1923 	WT_ERR(ws_source_open_object(
1924 	    wtds, session, hs, uri, NULL, oflags, &ws->he));
1925 	WT_ERR(ws_source_open_object(
1926 	    wtds, session, hs, uri, WT_NAME_CACHE, HE_O_CREATE, &ws->he_cache));
1927 	if ((ret = he_commit(ws->he)) != 0)
1928 		EMSG_ERR(wt_api, session, ret,
1929 		    "he_commit: %s", he_strerror(ret));
1930 
1931 	/* Optionally trade the global lock for the object lock. */
1932 	if (!(flags & WS_SOURCE_OPEN_GLOBAL))
1933 		WT_ERR(writelock(wt_api, session, &ws->lock));
1934 
1935 	/* Insert the new entry at the head of the list. */
1936 	ws->next = hs->ws_head;
1937 	hs->ws_head = ws;
1938 
1939 	*refp = ws;
1940 	ws = NULL;
1941 
1942 	if (0) {
1943 err:		if (ws != NULL)
1944 			ESET(ws_source_close(wt_api, session, ws));
1945 	}
1946 
1947 	/*
1948 	 * If there was an error or our caller doesn't need the global lock,
1949 	 * release the global lock.
1950 	 */
1951 	if (!(flags & WS_SOURCE_OPEN_GLOBAL) || ret != 0)
1952 		ESET(unlock(wt_api, session, &ds->global_lock));
1953 
1954 	return (ret);
1955 }
1956 
1957 /*
1958  * master_uri_get --
1959  *	Get the Helium master record for a URI.
1960  */
1961 static int
master_uri_get(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,char ** valuep)1962 master_uri_get(WT_DATA_SOURCE *wtds,
1963     WT_SESSION *session, const char *uri, char **valuep)
1964 {
1965 	DATA_SOURCE *ds;
1966 	WT_EXTENSION_API *wt_api;
1967 
1968 	ds = (DATA_SOURCE *)wtds;
1969 	wt_api = ds->wt_api;
1970 
1971 	return (wt_api->metadata_search(wt_api, session, uri, valuep));
1972 }
1973 
1974 /*
1975  * master_uri_drop --
1976  *	Drop the Helium master record for a URI.
1977  */
1978 static int
master_uri_drop(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri)1979 master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri)
1980 {
1981 	DATA_SOURCE *ds;
1982 	WT_EXTENSION_API *wt_api;
1983 
1984 	ds = (DATA_SOURCE *)wtds;
1985 	wt_api = ds->wt_api;
1986 
1987 	return (wt_api->metadata_remove(wt_api, session, uri));
1988 }
1989 
1990 /*
1991  * master_uri_rename --
1992  *	Rename the Helium master record for a URI.
1993  */
1994 static int
master_uri_rename(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * newuri)1995 master_uri_rename(WT_DATA_SOURCE *wtds,
1996     WT_SESSION *session, const char *uri, const char *newuri)
1997 {
1998 	DATA_SOURCE *ds;
1999 	WT_EXTENSION_API *wt_api;
2000 	int ret = 0;
2001 	char *value;
2002 
2003 	ds = (DATA_SOURCE *)wtds;
2004 	wt_api = ds->wt_api;
2005 	value = NULL;
2006 
2007 	/* Insert the record under a new name. */
2008 	WT_ERR(master_uri_get(wtds, session, uri, &value));
2009 	WT_ERR(wt_api->metadata_insert(wt_api, session, newuri, value));
2010 
2011 	/*
2012 	 * Remove the original record, and if that fails, attempt to remove
2013 	 * the new record.
2014 	 */
2015 	if ((ret = wt_api->metadata_remove(wt_api, session, uri)) != 0)
2016 		(void)wt_api->metadata_remove(wt_api, session, newuri);
2017 
2018 err:	free((void *)value);
2019 	return (ret);
2020 }
2021 
2022 /*
2023  * master_uri_set --
2024  *	Set the Helium master record for a URI.
2025  */
2026 static int
master_uri_set(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2027 master_uri_set(WT_DATA_SOURCE *wtds,
2028     WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2029 {
2030 	DATA_SOURCE *ds;
2031 	WT_CONFIG_ITEM a, b, c;
2032 	WT_EXTENSION_API *wt_api;
2033 	int exclusive, ret = 0;
2034 	char value[1024];
2035 
2036 	ds = (DATA_SOURCE *)wtds;
2037 	wt_api = ds->wt_api;
2038 
2039 	exclusive = 0;
2040 	if ((ret =
2041 	    wt_api->config_get(wt_api, session, config, "exclusive", &a)) == 0)
2042 		exclusive = a.val != 0;
2043 	else if (ret != WT_NOTFOUND)
2044 		ERET(wt_api, session, ret,
2045 		    "exclusive configuration: %s",
2046 		    wt_api->strerror(wt_api, session, ret));
2047 
2048 	/* Get the key/value format strings. */
2049 	if ((ret = wt_api->config_get(
2050 	    wt_api, session, config, "key_format", &a)) != 0) {
2051 		if (ret == WT_NOTFOUND) {
2052 			a.str = "u";
2053 			a.len = 1;
2054 		} else
2055 			ERET(wt_api, session, ret,
2056 			    "key_format configuration: %s",
2057 			    wt_api->strerror(wt_api, session, ret));
2058 	}
2059 	if ((ret = wt_api->config_get(
2060 	    wt_api, session, config, "value_format", &b)) != 0) {
2061 		if (ret == WT_NOTFOUND) {
2062 			b.str = "u";
2063 			b.len = 1;
2064 		} else
2065 			ERET(wt_api, session, ret,
2066 			    "value_format configuration: %s",
2067 			    wt_api->strerror(wt_api, session, ret));
2068 	}
2069 
2070 	/* Get the compression configuration. */
2071 	if ((ret = wt_api->config_get(
2072 	    wt_api, session, config, "helium_o_compress", &c)) != 0) {
2073 		if (ret == WT_NOTFOUND)
2074 			c.val = 0;
2075 		else
2076 			ERET(wt_api, session, ret,
2077 			    "helium_o_compress configuration: %s",
2078 			    wt_api->strerror(wt_api, session, ret));
2079 	}
2080 
2081 	/*
2082 	 * Create a new reference using insert (which fails if the record
2083 	 * already exists).
2084 	 */
2085 	(void)snprintf(value, sizeof(value),
2086 	    "wiredtiger_helium_version=(major=%d,minor=%d),"
2087 	    "key_format=%.*s,value_format=%.*s,"
2088 	    "helium_o_compress=%d",
2089 	    WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR,
2090 	    (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0);
2091 	if ((ret = wt_api->metadata_insert(wt_api, session, uri, value)) == 0)
2092 		return (0);
2093 	if (ret == WT_DUPLICATE_KEY)
2094 		return (exclusive ? EEXIST : 0);
2095 	ERET(wt_api, session,
2096 	    ret, "%s: %s", uri, wt_api->strerror(wt_api, session, ret));
2097 }
2098 
2099 /*
2100  * helium_session_open_cursor --
2101  *	WT_SESSION.open_cursor method.
2102  */
2103 static int
helium_session_open_cursor(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config,WT_CURSOR ** new_cursor)2104 helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
2105     const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor)
2106 {
2107 	CURSOR *cursor;
2108 	DATA_SOURCE *ds;
2109 	WT_CONFIG_ITEM v;
2110 	WT_CONFIG_PARSER *config_parser;
2111 	WT_CURSOR *wtcursor;
2112 	WT_EXTENSION_API *wt_api;
2113 	WT_SOURCE *ws;
2114 	int locked, own, ret, tret;
2115 	char *value;
2116 
2117 	*new_cursor = NULL;
2118 
2119 	config_parser = NULL;
2120 	cursor = NULL;
2121 	ds = (DATA_SOURCE *)wtds;
2122 	wt_api = ds->wt_api;
2123 	ws = NULL;
2124 	locked = 0;
2125 	ret = tret = 0;
2126 	value = NULL;
2127 
2128 	/* Allocate and initialize a cursor. */
2129 	if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
2130 		return (os_errno());
2131 
2132 	if ((ret = wt_api->config_get(		/* Parse configuration */
2133 	    wt_api, session, config, "append", &v)) != 0)
2134 		EMSG_ERR(wt_api, session, ret,
2135 		    "append configuration: %s",
2136 		    wt_api->strerror(wt_api, session, ret));
2137 	cursor->config_append = v.val != 0;
2138 
2139 	if ((ret = wt_api->config_get(
2140 	    wt_api, session, config, "overwrite", &v)) != 0)
2141 		EMSG_ERR(wt_api, session, ret,
2142 		    "overwrite configuration: %s",
2143 		    wt_api->strerror(wt_api, session, ret));
2144 	cursor->config_overwrite = v.val != 0;
2145 
2146 	if ((ret = wt_api->collator_config(
2147 	    wt_api, session, uri, config, NULL, &own)) != 0)
2148 		EMSG_ERR(wt_api, session, ret,
2149 		    "collator configuration: %s",
2150 		    wt_api->strerror(wt_api, session, ret));
2151 
2152 	/* Finish initializing the cursor. */
2153 	cursor->wtcursor.close = helium_cursor_close;
2154 	cursor->wtcursor.insert = helium_cursor_insert;
2155 	cursor->wtcursor.next = helium_cursor_next;
2156 	cursor->wtcursor.prev = helium_cursor_prev;
2157 	cursor->wtcursor.remove = helium_cursor_remove;
2158 	cursor->wtcursor.reserve = helium_cursor_reserve;
2159 	cursor->wtcursor.reset = helium_cursor_reset;
2160 	cursor->wtcursor.search = helium_cursor_search;
2161 	cursor->wtcursor.search_near = helium_cursor_search_near;
2162 	cursor->wtcursor.update = helium_cursor_update;
2163 
2164 	cursor->wt_api = wt_api;
2165 	cursor->record.key = cursor->__key;
2166 	if ((cursor->v = malloc(128)) == NULL)
2167 		goto err;
2168 	cursor->mem_len = 128;
2169 
2170 	/* Get a locked reference to the WiredTiger source. */
2171 	WT_ERR(ws_source_open(wtds, session, uri, config, 0, &ws));
2172 	locked = 1;
2173 	cursor->ws = ws;
2174 
2175 	/*
2176 	 * If this is the first access to the URI, we have to configure it
2177 	 * using information stored in the master record.
2178 	 */
2179 	if (!ws->configured) {
2180 		WT_ERR(master_uri_get(wtds, session, uri, &value));
2181 
2182 		if ((ret = wt_api->config_parser_open(wt_api,
2183 		    session, value, strlen(value), &config_parser)) != 0)
2184 			EMSG_ERR(wt_api, session, ret,
2185 			    "Configuration string parser: %s",
2186 			    wt_api->strerror(wt_api, session, ret));
2187 		if ((ret = config_parser->get(
2188 		    config_parser, "key_format", &v)) != 0)
2189 			EMSG_ERR(wt_api, session, ret,
2190 			    "key_format configuration: %s",
2191 			    wt_api->strerror(wt_api, session, ret));
2192 		ws->config_recno = v.len == 1 && v.str[0] == 'r';
2193 
2194 		if ((ret = config_parser->get(
2195 		    config_parser, "value_format", &v)) != 0)
2196 			EMSG_ERR(wt_api, session, ret,
2197 			    "value_format configuration: %s",
2198 			    wt_api->strerror(wt_api, session, ret));
2199 		ws->config_bitfield = v.len == 2 &&
2200 		    isdigit((u_char)v.str[0]) && v.str[1] == 't';
2201 
2202 		/*
2203 		 * If it's a record-number key, read the last record from the
2204 		 * object and set the allocation record value.
2205 		 */
2206 		if (ws->config_recno) {
2207 			wtcursor = (WT_CURSOR *)cursor;
2208 			WT_ERR(helium_cursor_reset(wtcursor));
2209 
2210 			if ((ret = helium_cursor_prev(wtcursor)) == 0)
2211 				ws->append_recno = wtcursor->recno;
2212 			else if (ret != WT_NOTFOUND)
2213 				goto err;
2214 
2215 			WT_ERR(helium_cursor_reset(wtcursor));
2216 		}
2217 
2218 		ws->configured = true;
2219 	}
2220 
2221 	/* Increment the open reference count to pin the URI and unlock it. */
2222 	++ws->ref;
2223 	WT_ERR(unlock(wt_api, session, &ws->lock));
2224 
2225 	*new_cursor = (WT_CURSOR *)cursor;
2226 
2227 	if (0) {
2228 err:		if (ws != NULL && locked)
2229 			ESET(unlock(wt_api, session, &ws->lock));
2230 		cursor_destroy(cursor);
2231 	}
2232 	if (config_parser != NULL &&
2233 	    (tret = config_parser->close(config_parser)) != 0)
2234 		EMSG(wt_api, session, tret,
2235 		    "WT_CONFIG_PARSER.close: %s",
2236 		    wt_api->strerror(wt_api, session, tret));
2237 
2238 	free((void *)value);
2239 	return (ret);
2240 }
2241 
2242 /*
2243  * helium_session_create --
2244  *	WT_SESSION.create method.
2245  */
2246 static int
helium_session_create(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2247 helium_session_create(WT_DATA_SOURCE *wtds,
2248     WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2249 {
2250 	DATA_SOURCE *ds;
2251 	WT_EXTENSION_API *wt_api;
2252 	WT_SOURCE *ws;
2253 
2254 	ds = (DATA_SOURCE *)wtds;
2255 	wt_api = ds->wt_api;
2256 
2257 	/*
2258 	 * Get a locked reference to the WiredTiger source, then immediately
2259 	 * unlock it, we aren't doing anything else.
2260 	 */
2261 	WT_RET(ws_source_open(wtds, session, uri, config, 0, &ws));
2262 	WT_RET(unlock(wt_api, session, &ws->lock));
2263 
2264 	/*
2265 	 * Create the URI master record if it doesn't already exist.
2266 	 *
2267 	 * We've discarded the lock, but that's OK, creates are single-threaded
2268 	 * at the WiredTiger level, it's not our problem to solve.
2269 	 *
2270 	 * If unable to enter a WiredTiger record, leave the Helium store alone.
2271 	 * A subsequent create should do the right thing, we aren't leaving
2272 	 * anything in an inconsistent state.
2273 	 */
2274 	return (master_uri_set(wtds, session, uri, config));
2275 }
2276 
2277 /*
2278  * helium_session_drop --
2279  *	WT_SESSION.drop method.
2280  */
2281 static int
helium_session_drop(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2282 helium_session_drop(WT_DATA_SOURCE *wtds,
2283     WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2284 {
2285 	DATA_SOURCE *ds;
2286 	HELIUM_SOURCE *hs;
2287 	WT_EXTENSION_API *wt_api;
2288 	WT_SOURCE **p, *ws;
2289 	int ret = 0;
2290 
2291 	ds = (DATA_SOURCE *)wtds;
2292 	wt_api = ds->wt_api;
2293 
2294 	/*
2295 	 * Get a locked reference to the data source: hold the global lock,
2296 	 * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects.
2297 	 *
2298 	 * Remove the entry from the WT_SOURCE list -- it's a singly-linked
2299 	 * list, find the reference to it.
2300 	 */
2301 	WT_RET(ws_source_open(wtds, session, uri, config,
2302 	    WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws));
2303 	hs = ws->hs;
2304 	for (p = &hs->ws_head; *p != NULL; p = &(*p)->next)
2305 		if (*p == ws) {
2306 			*p = (*p)->next;
2307 			break;
2308 		}
2309 
2310 	/* Drop the underlying Helium objects. */
2311 	ESET(he_remove(ws->he));
2312 	ws->he = NULL;				/* The handle is dead. */
2313 	ESET(he_remove(ws->he_cache));
2314 	ws->he_cache = NULL;			/* The handle is dead. */
2315 
2316 	/* Close the source, discarding the structure. */
2317 	ESET(ws_source_close(wt_api, session, ws));
2318 	ws = NULL;
2319 
2320 	/* Discard the metadata entry. */
2321 	ESET(master_uri_drop(wtds, session, uri));
2322 
2323 	/*
2324 	 * If we have an error at this point, panic -- there's an inconsistency
2325 	 * in what WiredTiger knows about and the underlying store.
2326 	 */
2327 	if (ret != 0)
2328 		ret = WT_PANIC;
2329 
2330 	ESET(unlock(wt_api, session, &ds->global_lock));
2331 	return (ret);
2332 }
2333 
2334 /*
2335  * helium_session_rename --
2336  *	WT_SESSION.rename method.
2337  */
2338 static int
helium_session_rename(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,const char * newuri,WT_CONFIG_ARG * config)2339 helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
2340     const char *uri, const char *newuri, WT_CONFIG_ARG *config)
2341 {
2342 	DATA_SOURCE *ds;
2343 	WT_EXTENSION_API *wt_api;
2344 	WT_SOURCE *ws;
2345 	int ret = 0;
2346 	char *p;
2347 
2348 	ds = (DATA_SOURCE *)wtds;
2349 	wt_api = ds->wt_api;
2350 
2351 	/*
2352 	 * Get a locked reference to the data source; hold the global lock,
2353 	 * we are going to change the object's name, and we can't allow
2354 	 * other threads walking the list and comparing against the name.
2355 	 */
2356 	WT_RET(ws_source_open(wtds, session, uri, config,
2357 	    WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws));
2358 
2359 	/* Get a copy of the new name for the WT_SOURCE structure. */
2360 	if ((p = strdup(newuri)) == NULL) {
2361 		ret = os_errno();
2362 		goto err;
2363 	}
2364 	free(ws->uri);
2365 	ws->uri = p;
2366 
2367 	/* Rename the underlying Helium objects. */
2368 	ESET(ws_source_name(wtds, session, newuri, NULL, &p));
2369 	if (ret == 0) {
2370 		ESET(he_rename(ws->he, p));
2371 		free(p);
2372 	}
2373 	ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p));
2374 	if (ret == 0) {
2375 		ESET(he_rename(ws->he_cache, p));
2376 		free(p);
2377 	}
2378 
2379 	/* Update the metadata record. */
2380 	ESET(master_uri_rename(wtds, session, uri, newuri));
2381 
2382 	/*
2383 	 * If we have an error at this point, panic -- there's an inconsistency
2384 	 * in what WiredTiger knows about and the underlying store.
2385 	 */
2386 	if (ret != 0)
2387 		ret = WT_PANIC;
2388 
2389 err:	ESET(unlock(wt_api, session, &ds->global_lock));
2390 
2391 	return (ret);
2392 }
2393 
2394 /*
2395  * helium_session_truncate --
2396  *	WT_SESSION.truncate method.
2397  */
2398 static int
helium_session_truncate(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2399 helium_session_truncate(WT_DATA_SOURCE *wtds,
2400     WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2401 {
2402 	DATA_SOURCE *ds;
2403 	WT_EXTENSION_API *wt_api;
2404 	int ret = 0;
2405 
2406 	(void)config;
2407 
2408 	ds = (DATA_SOURCE *)wtds;
2409 	wt_api = ds->wt_api;
2410 
2411 	/*
2412 	 * XXX
2413 	 * Fail URI truncation for now. (Truncation based on a cursor range is
2414 	 * handled by the upper-levels of WiredTiger, this is just support for
2415 	 * URI truncation.) The problem is there's no way to truncate an open
2416 	 * object in Helium without closing handles, and we can't close/re-open
2417 	 * handles because we don't have the configuration information from the
2418 	 * open.
2419 	 */
2420 	ERET(wt_api, session, ENOTSUP, "WT_SESSION.truncate: %s", uri);
2421 }
2422 
2423 /*
2424  * helium_session_verify --
2425  *	WT_SESSION.verify method.
2426  */
2427 static int
helium_session_verify(WT_DATA_SOURCE * wtds,WT_SESSION * session,const char * uri,WT_CONFIG_ARG * config)2428 helium_session_verify(WT_DATA_SOURCE *wtds,
2429     WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
2430 {
2431 	(void)wtds;
2432 	(void)session;
2433 	(void)uri;
2434 	(void)config;
2435 	return (0);
2436 }
2437 
2438 /*
2439  * helium_session_checkpoint --
2440  *	WT_SESSION.checkpoint method.
2441  */
2442 static int
helium_session_checkpoint(WT_DATA_SOURCE * wtds,WT_SESSION * session,WT_CONFIG_ARG * config)2443 helium_session_checkpoint(
2444     WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config)
2445 {
2446 	DATA_SOURCE *ds;
2447 	HELIUM_SOURCE *hs;
2448 	WT_EXTENSION_API *wt_api;
2449 	int ret = 0;
2450 
2451 	(void)config;
2452 
2453 	ds = (DATA_SOURCE *)wtds;
2454 	wt_api = ds->wt_api;
2455 
2456 	/* Flush all volumes. */
2457 	if ((hs = ds->hs_head) != NULL &&
2458 	    (ret = he_commit(hs->he_volume)) != 0)
2459 		ERET(wt_api, session, ret,
2460 		    "he_commit: %s: %s", hs->device, he_strerror(ret));
2461 
2462 	return (0);
2463 }
2464 
2465 /*
2466  * helium_source_close --
2467  *	Discard a HELIUM_SOURCE.
2468  */
2469 static int
helium_source_close(WT_EXTENSION_API * wt_api,WT_SESSION * session,HELIUM_SOURCE * hs)2470 helium_source_close(
2471     WT_EXTENSION_API *wt_api, WT_SESSION *session, HELIUM_SOURCE *hs)
2472 {
2473 	WT_SOURCE *ws;
2474 	int ret = 0, tret;
2475 
2476 	/* Resolve the cache into the primary one last time and quit. */
2477 	if (hs->cleaner_id != 0) {
2478 		hs->cleaner_stop = 1;
2479 
2480 		if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0)
2481 			EMSG(wt_api, session, tret,
2482 			    "pthread_join: %s", strerror(tret));
2483 		hs->cleaner_id = 0;
2484 	}
2485 
2486 	/* Close the underlying WiredTiger sources. */
2487 	while ((ws = hs->ws_head) != NULL) {
2488 		hs->ws_head = ws->next;
2489 		ESET(ws_source_close(wt_api, session, ws));
2490 	}
2491 
2492 	/* If the owner, close the database transaction store. */
2493 	if (hs->he_txn != NULL && hs->he_owner) {
2494 		if ((tret = he_close(hs->he_txn)) != 0)
2495 			EMSG(wt_api, session, tret,
2496 			    "he_close: %s: %s: %s",
2497 			    hs->name, WT_NAME_TXN, he_strerror(tret));
2498 		hs->he_txn = NULL;
2499 		hs->he_owner = false;
2500 	}
2501 
2502 	/* Flush and close the Helium source. */
2503 	if (hs->he_volume != NULL) {
2504 		if ((tret = he_commit(hs->he_volume)) != 0)
2505 			EMSG(wt_api, session, tret,
2506 			    "he_commit: %s: %s",
2507 			    hs->device, he_strerror(tret));
2508 
2509 		if ((tret = he_close(hs->he_volume)) != 0)
2510 			EMSG(wt_api, session, tret,
2511 			    "he_close: %s: %s: %s",
2512 			    hs->name, WT_NAME_INIT, he_strerror(tret));
2513 		hs->he_volume = NULL;
2514 	}
2515 
2516 	free(hs->name);
2517 	free(hs->device);
2518 	OVERWRITE_AND_FREE(hs);
2519 
2520 	return (ret);
2521 }
2522 
2523 /*
2524  * cache_cleaner --
2525  *	Migrate information from the cache to the primary store.
2526  */
2527 static int
cache_cleaner(WT_EXTENSION_API * wt_api,WT_CURSOR * wtcursor,uint64_t oldest,uint64_t * txnminp)2528 cache_cleaner(WT_EXTENSION_API *wt_api,
2529     WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp)
2530 {
2531 	CACHE_RECORD *cp;
2532 	CURSOR *cursor;
2533 	HE_ITEM *r;
2534 	WT_SOURCE *ws;
2535 	uint64_t txnid;
2536 	int locked, pushed, recovery, ret = 0;
2537 
2538 	/*
2539 	 * Called in two ways: in normal processing mode where we're supplied a
2540 	 * value for the oldest transaction ID not yet visible to a running
2541 	 * transaction, and we're tracking the smallest transaction ID
2542 	 * referenced by any cache entry, and in recovery mode where neither of
2543 	 * those are true.
2544 	 */
2545 	if (txnminp == NULL)
2546 		recovery = 1;
2547 	else {
2548 		recovery = 0;
2549 		*txnminp = UINT64_MAX;
2550 	}
2551 
2552 	cursor = (CURSOR *)wtcursor;
2553 	ws = cursor->ws;
2554 	r = &cursor->record;
2555 	locked = pushed = 0;
2556 
2557 	/*
2558 	 * For every cache key where all updates are globally visible:
2559 	 *	Migrate the most recent update value to the primary store.
2560 	 */
2561 	for (r->key_len = 0; (ret =
2562 	    helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
2563 		/*
2564 		 * Unmarshall the value, and if all of the updates are globally
2565 		 * visible, update the primary with the last committed update.
2566 		 * In normal processing, the last committed update test is for
2567 		 * a globally visible update that's not explicitly aborted.  In
2568 		 * recovery processing, the last committed update test is for
2569 		 * an explicitly committed update.  See the underlying functions
2570 		 * for more information.
2571 		 */
2572 		if ((ret = cache_value_unmarshall(wtcursor)) != 0)
2573 			goto err;
2574 		if (!recovery && !cache_value_visible_all(wtcursor, oldest))
2575 			continue;
2576 		if (recovery)
2577 			cache_value_last_committed(wtcursor, &cp);
2578 		else
2579 			cache_value_last_not_aborted(wtcursor, &cp);
2580 		if (cp == NULL)
2581 			continue;
2582 
2583 		pushed = 1;
2584 		if (cp->remove) {
2585 			if ((ret = he_delete(ws->he, r)) == 0)
2586 				continue;
2587 
2588 			/*
2589 			 * Updates confined to the cache may not appear in the
2590 			 * primary at all, that is, an insert and remove pair
2591 			 * may be confined to the cache.
2592 			 */
2593 			if (ret == HE_ERR_ITEM_NOT_FOUND) {
2594 				ret = 0;
2595 				continue;
2596 			}
2597 			ERET(wt_api, NULL, ret,
2598 			    "he_delete: %s", he_strerror(ret));
2599 		} else {
2600 			r->val = cp->v;
2601 			r->val_len = cp->len;
2602 			ret = he_update(ws->he, r);
2603 			if (ret == 0)
2604 				continue;
2605 
2606 			ERET(wt_api, NULL, ret,
2607 			    "he_update: %s", he_strerror(ret));
2608 		}
2609 	}
2610 
2611 	if (ret == WT_NOTFOUND)
2612 		ret = 0;
2613 	if (ret != 0)
2614 		ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2615 
2616 	/*
2617 	 * If we didn't move any keys from the cache to the primary, quit.  It's
2618 	 * possible we could still remove values from the cache, but not likely,
2619 	 * and another pass would probably be wasted effort (especially locked).
2620 	 */
2621 	if (!pushed)
2622 		return (0);
2623 
2624 	/*
2625 	 * Push the store to stable storage for correctness.  (It doesn't matter
2626 	 * what Helium handle we commit, so we just commit one of them.)
2627 	 */
2628 	if ((ret = he_commit(ws->he)) != 0)
2629 		ERET(wt_api, NULL, ret, "he_commit: %s", he_strerror(ret));
2630 
2631 	/*
2632 	 * If we're performing recovery, that's all we need to do, we're going
2633 	 * to simply discard the cache, there's no reason to remove entries one
2634 	 * at a time.
2635 	 */
2636 	if (recovery)
2637 		return (0);
2638 
2639 	/*
2640 	 * For every cache key where all updates are globally visible:
2641 	 *	Remove the cache key.
2642 	 *
2643 	 * We're updating the cache, which requires a lock during normal
2644 	 * cleaning.
2645 	 */
2646 	WT_ERR(writelock(wt_api, NULL, &ws->lock));
2647 	locked = 1;
2648 
2649 	for (r->key_len = 0; (ret =
2650 	    helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
2651 		/*
2652 		 * Unmarshall the value, and if all of the updates are globally
2653 		 * visible, remove the cache entry.
2654 		 */
2655 		WT_ERR(cache_value_unmarshall(wtcursor));
2656 		if (cache_value_visible_all(wtcursor, oldest)) {
2657 			if ((ret = he_delete(ws->he_cache, r)) != 0)
2658 				EMSG_ERR(wt_api, NULL, ret,
2659 				    "he_delete: %s", he_strerror(ret));
2660 			continue;
2661 		}
2662 
2663 		/*
2664 		 * If the entry will remain in the cache, figure out the oldest
2665 		 * transaction for which it contains an update (which might be
2666 		 * different from the oldest transaction in the system).  We
2667 		 * need the oldest transaction ID that appears anywhere in any
2668 		 * cache, it limits the records we can discard from the
2669 		 * transaction store.
2670 		 */
2671 		cache_value_txnmin(wtcursor, &txnid);
2672 		if (txnid < *txnminp)
2673 			*txnminp = txnid;
2674 	}
2675 
2676 	locked = 0;
2677 	WT_ERR(unlock(wt_api, NULL, &ws->lock));
2678 	if (ret == WT_NOTFOUND)
2679 		ret = 0;
2680 	if (ret != 0)
2681 		EMSG_ERR(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2682 
2683 err:	if (locked)
2684 		ESET(unlock(wt_api, NULL, &ws->lock));
2685 
2686 	return (ret);
2687 }
2688 
2689 /*
2690  * txn_cleaner --
2691  *	Discard no longer needed entries from the transaction store.
2692  */
2693 static int
txn_cleaner(WT_CURSOR * wtcursor,he_t he_txn,uint64_t txnmin)2694 txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin)
2695 {
2696 	CURSOR *cursor;
2697 	HE_ITEM *r;
2698 	WT_EXTENSION_API *wt_api;
2699 	uint64_t txnid;
2700 	int ret = 0;
2701 
2702 	cursor = (CURSOR *)wtcursor;
2703 	wt_api = cursor->wt_api;
2704 	r = &cursor->record;
2705 
2706 	/*
2707 	 * Remove all entries from the transaction store that are before the
2708 	 * oldest transaction ID that appears anywhere in any cache.
2709 	 */
2710 	for (r->key_len = 0;
2711 	    (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) {
2712 		memcpy(&txnid, r->key, sizeof(txnid));
2713 		if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0)
2714 			ERET(wt_api, NULL, ret,
2715 			    "he_delete: %s", he_strerror(ret));
2716 	}
2717 	if (ret == WT_NOTFOUND)
2718 		ret = 0;
2719 	if (ret != 0)
2720 		ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret));
2721 
2722 	return (0);
2723 }
2724 
2725 /*
2726  * fake_cursor --
2727  *	Fake up enough of a cursor to do Helium operations.
2728  */
2729 static int
fake_cursor(WT_EXTENSION_API * wt_api,WT_CURSOR ** wtcursorp)2730 fake_cursor(WT_EXTENSION_API *wt_api, WT_CURSOR **wtcursorp)
2731 {
2732 	CURSOR *cursor;
2733 	WT_CURSOR *wtcursor;
2734 
2735 	/*
2736 	 * Fake a cursor.
2737 	 */
2738 	if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
2739 		return (os_errno());
2740 	cursor->wt_api = wt_api;
2741 	cursor->record.key = cursor->__key;
2742 	if ((cursor->v = malloc(128)) == NULL) {
2743 		free(cursor);
2744 		return (os_errno());
2745 	}
2746 	cursor->mem_len = 128;
2747 
2748 	/*
2749 	 * !!!
2750 	 * Fake cursors don't have WT_SESSION handles.
2751 	 */
2752 	wtcursor = (WT_CURSOR *)cursor;
2753 	wtcursor->session = NULL;
2754 
2755 	*wtcursorp = wtcursor;
2756 	return (0);
2757 }
2758 
2759 /*
2760  * cache_cleaner_worker --
2761  *	Thread to migrate data from the cache to the primary.
2762  */
2763 static void *
cache_cleaner_worker(void * arg)2764 cache_cleaner_worker(void *arg)
2765 {
2766 	struct timeval t;
2767 	CURSOR *cursor;
2768 	HELIUM_SOURCE *hs;
2769 	WT_CURSOR *wtcursor;
2770 	WT_EXTENSION_API *wt_api;
2771 	WT_SOURCE *ws;
2772 	uint64_t oldest, txnmin, txntmp;
2773 	int cleaner_stop, delay, ret = 0;
2774 
2775 	hs = (HELIUM_SOURCE *)arg;
2776 
2777 	cursor = NULL;
2778 	wt_api = hs->wt_api;
2779 
2780 	if ((ret = fake_cursor(wt_api, &wtcursor)) != 0)
2781 		EMSG_ERR(wt_api, NULL, ret, "cleaner: %s", strerror(ret));
2782 	cursor = (CURSOR *)wtcursor;
2783 
2784 	for (cleaner_stop = delay = 0; !cleaner_stop;) {
2785 		/*
2786 		 * Check if this will be the final run; cleaner_stop is declared
2787 		 * volatile, and so the read will happen.  We don't much care if
2788 		 * there's extra loops, it's enough if a read eventually happens
2789 		 * and finds the variable set.  Store the read locally, reading
2790 		 * the variable twice might race.
2791 		 */
2792 		cleaner_stop = hs->cleaner_stop;
2793 
2794 		/*
2795 		 * Delay if this isn't the final run and the last pass didn't
2796 		 * find any work to do.
2797 		 */
2798 		if (!cleaner_stop && delay != 0) {
2799 			t.tv_sec = delay;
2800 			t.tv_usec = 0;
2801 			(void)select(0, NULL, NULL, NULL, &t);
2802 		}
2803 
2804 		/* Run at least every 5 seconds. */
2805 		if (delay < 5)
2806 			++delay;
2807 
2808 		/*
2809 		 * Clean the datastore caches. It's both more and less expensive
2810 		 * to return values from the cache: more because we have to
2811 		 * marshall/unmarshall the values, less because there's only a
2812 		 * single lookup to the cache store rather than a lookup into
2813 		 * the cache and then the primary. I have no tuning information,
2814 		 * for now, just clean if there have been 1K operations.
2815 		 */
2816 #undef	CACHE_SIZE_TRIGGER
2817 #define	CACHE_SIZE_TRIGGER	(1000)
2818 		for (ws = hs->ws_head; ws != NULL; ws = ws->next)
2819 			if (ws->he_cache_ops > CACHE_SIZE_TRIGGER)
2820 				break;
2821 		if (!cleaner_stop && ws == NULL)
2822 			continue;
2823 
2824 		/* There was work to do, don't delay before checking again. */
2825 		delay = 0;
2826 
2827 		/*
2828 		 * Get the oldest transaction ID not yet visible to a running
2829 		 * transaction.  Do this before doing anything else, avoiding
2830 		 * any race with creating new WT_SOURCE handles.
2831 		 */
2832 		oldest = wt_api->transaction_oldest(wt_api);
2833 
2834 		/*
2835 		 * If any cache needs cleaning, clean them all, because we have
2836 		 * to know the minimum transaction ID referenced by any cache.
2837 		 *
2838 		 * For each cache/primary pair, migrate whatever records we can,
2839 		 * tracking the lowest transaction ID of any entry in any cache.
2840 		 */
2841 		txnmin = UINT64_MAX;
2842 		for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
2843 			/* Reset the operations counter. */
2844 			ws->he_cache_ops = 0;
2845 
2846 			cursor->ws = ws;
2847 			WT_ERR(cache_cleaner(
2848 			    wt_api, wtcursor, oldest, &txntmp));
2849 			if (txntmp < txnmin)
2850 				txnmin = txntmp;
2851 		}
2852 
2853 		/*
2854 		 * Discard any transactions less than the minimum transaction ID
2855 		 * referenced in any cache.
2856 		 *
2857 		 * !!!
2858 		 * I'm playing fast-and-loose with whether or not the cursor
2859 		 * references an underlying WT_SOURCE, there's a structural
2860 		 * problem here.
2861 		 */
2862 		cursor->ws = NULL;
2863 		WT_ERR(txn_cleaner(wtcursor, hs->he_txn, txnmin));
2864 	}
2865 
2866 err:	cursor_destroy(cursor);
2867 	return (NULL);
2868 }
2869 
2870 /*
2871  * helium_config_read --
2872  *	Parse the Helium configuration.
2873  */
2874 static int
helium_config_read(WT_EXTENSION_API * wt_api,WT_CONFIG_ITEM * config,char ** devicep,HE_ENV * envp,int * env_setp,int * flagsp)2875 helium_config_read(WT_EXTENSION_API *wt_api, WT_CONFIG_ITEM *config,
2876     char **devicep, HE_ENV *envp, int *env_setp, int *flagsp)
2877 {
2878 	WT_CONFIG_ITEM k, v;
2879 	WT_CONFIG_PARSER *config_parser;
2880 	int ret = 0, tret;
2881 
2882 	*env_setp = 0;
2883 	*flagsp = 0;
2884 
2885 	/* Traverse the configuration arguments list. */
2886 	if ((ret = wt_api->config_parser_open(
2887 	    wt_api, NULL, config->str, config->len, &config_parser)) != 0)
2888 		ERET(wt_api, NULL, ret,
2889 		    "WT_EXTENSION_API.config_parser_open: %s",
2890 		    wt_api->strerror(wt_api, NULL, ret));
2891 	while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
2892 		if (string_match("helium_devices", k.str, k.len)) {
2893 			if ((*devicep = calloc(1, v.len + 1)) == NULL)
2894 				return (os_errno());
2895 			memcpy(*devicep, v.str, v.len);
2896 			continue;
2897 		}
2898 		if (string_match("helium_read_cache", k.str, k.len)) {
2899 			envp->read_cache = (uint64_t)v.val;
2900 			*env_setp = 1;
2901 			continue;
2902 		}
2903 		if (string_match("helium_write_cache", k.str, k.len)) {
2904 			envp->write_cache = (uint64_t)v.val;
2905 			*env_setp = 1;
2906 			continue;
2907 		}
2908 		if (string_match("helium_o_volume_truncate", k.str, k.len)) {
2909 			if (v.val != 0)
2910 				*flagsp |= HE_O_VOLUME_TRUNCATE;
2911 			continue;
2912 		}
2913 		EMSG_ERR(wt_api, NULL, EINVAL,
2914 		    "unknown configuration key value pair %.*s=%.*s",
2915 		    (int)k.len, k.str, (int)v.len, v.str);
2916 	}
2917 	if (ret == WT_NOTFOUND)
2918 		ret = 0;
2919 	if (ret != 0)
2920 		EMSG_ERR(wt_api, NULL, ret,
2921 		    "WT_CONFIG_PARSER.next: %s",
2922 		    wt_api->strerror(wt_api, NULL, ret));
2923 
2924 err:	if ((tret = config_parser->close(config_parser)) != 0)
2925 		EMSG(wt_api, NULL, tret,
2926 		    "WT_CONFIG_PARSER.close: %s",
2927 		    wt_api->strerror(wt_api, NULL, tret));
2928 
2929 	return (ret);
2930 }
2931 
2932 /*
2933  * helium_source_open --
2934  *	Allocate and open a Helium source.
2935  */
2936 static int
helium_source_open(DATA_SOURCE * ds,WT_CONFIG_ITEM * k,WT_CONFIG_ITEM * v)2937 helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v)
2938 {
2939 	struct he_env env;
2940 	HELIUM_SOURCE *hs;
2941 	WT_EXTENSION_API *wt_api;
2942 	int env_set, flags, ret = 0;
2943 
2944 	wt_api = ds->wt_api;
2945 	hs = NULL;
2946 
2947 	VMSG(wt_api, NULL, VERBOSE_L1, "volume %.*s=%.*s",
2948 	    (int)k->len, k->str, (int)v->len, v->str);
2949 
2950 	/*
2951 	 * Check for a Helium source we've already opened: we don't check the
2952 	 * value (which implies you can open the same underlying stores using
2953 	 * more than one name, but I don't know of any problems that causes),
2954 	 * we only check the key, that is, the top-level WiredTiger name.
2955 	 */
2956 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
2957 		if (string_match(hs->name, k->str, k->len))
2958 			ERET(wt_api, NULL,
2959 			    EINVAL, "%s: device already open", hs->name);
2960 
2961 	/* Allocate and initialize a new underlying Helium source object. */
2962 	if ((hs = calloc(1, sizeof(*hs))) == NULL ||
2963 	    (hs->name = calloc(1, k->len + 1)) == NULL) {
2964 		free(hs);
2965 		return (os_errno());
2966 	}
2967 	memcpy(hs->name, k->str, k->len);
2968 	hs->txn_notify.notify = txn_notify;
2969 	hs->wt_api = wt_api;
2970 
2971 	/* Read the configuration, require a device naming the Helium store. */
2972 	memset(&env, 0, sizeof(env));
2973 	WT_ERR(helium_config_read(
2974 	    wt_api, v, &hs->device, &env, &env_set, &flags));
2975 	if (hs->device == NULL)
2976 		EMSG_ERR(wt_api, NULL,
2977 		    EINVAL, "%s: no Helium volumes specified", hs->name);
2978 
2979 	/*
2980 	 * Open the Helium volume, creating it if necessary.  We have to open
2981 	 * an object at the same time, that's why we have object flags as well
2982 	 * as volume flags.
2983 	 */
2984 	flags |= HE_O_CREATE | HE_O_TRUNCATE | HE_O_CLEAN | HE_O_VOLUME_CREATE;
2985 	if ((hs->he_volume = he_open(
2986 	    hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) {
2987 		ret = os_errno();
2988 		EMSG_ERR(wt_api, NULL, ret,
2989 		    "he_open: %s: %s: %s",
2990 		    hs->name, WT_NAME_INIT, he_strerror(ret));
2991 	}
2992 
2993 	/* Insert the new entry at the head of the list. */
2994 	hs->next = ds->hs_head;
2995 	ds->hs_head = hs;
2996 
2997 	if (0) {
2998 err:		if (hs != NULL)
2999 			ESET(helium_source_close(wt_api, NULL, hs));
3000 	}
3001 	return (ret);
3002 }
3003 
3004 /*
3005  * helium_source_txn_open --
3006  *	Open the database-wide transaction store.
3007  */
3008 static int
helium_source_txn_open(DATA_SOURCE * ds)3009 helium_source_txn_open(DATA_SOURCE *ds)
3010 {
3011 	HELIUM_SOURCE *hs, *hs_txn;
3012 	WT_EXTENSION_API *wt_api;
3013 	he_t he_txn, t;
3014 	int ret = 0;
3015 
3016 	wt_api = ds->wt_api;
3017 
3018 	/*
3019 	 * The global txn namespace is per connection, it spans multiple Helium
3020 	 * sources.
3021 	 *
3022 	 * We've opened the Helium sources: check to see if any of them already
3023 	 * have a transaction store, and make sure we only find one.
3024 	 */
3025 	hs_txn = NULL;
3026 	he_txn = NULL;
3027 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3028 		if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) {
3029 			if (hs_txn != NULL) {
3030 				(void)he_close(t);
3031 				(void)he_close(hs_txn);
3032 				ERET(wt_api, NULL, WT_PANIC,
3033 				    "found multiple transaction stores, "
3034 				    "unable to proceed");
3035 			}
3036 			he_txn = t;
3037 			hs_txn = hs;
3038 		}
3039 
3040 	/*
3041 	 * If we didn't find a transaction store, open a transaction store in
3042 	 * the first Helium source we loaded. (It could just as easily be the
3043 	 * last one we loaded, we're just picking one, but picking the first
3044 	 * seems slightly less likely to make people wonder.)
3045 	 */
3046 	if ((hs = hs_txn) == NULL) {
3047 		for (hs = ds->hs_head; hs->next != NULL; hs = hs->next)
3048 			;
3049 		if ((he_txn = he_open(
3050 		    hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) {
3051 			ret = os_errno();
3052 			ERET(wt_api, NULL, ret,
3053 			    "he_open: %s: %s: %s",
3054 			    hs->name, WT_NAME_TXN, he_strerror(ret));
3055 		}
3056 
3057 		/* Push the change. */
3058 		if ((ret = he_commit(he_txn)) != 0)
3059 			ERET(wt_api, NULL, ret,
3060 			    "he_commit: %s", he_strerror(ret));
3061 	}
3062 	VMSG(wt_api, NULL, VERBOSE_L1, "%s" "transactional store on %s",
3063 	    hs_txn == NULL ? "creating " : "", hs->name);
3064 
3065 	/* Set the owner field, this Helium source has to be closed last. */
3066 	hs->he_owner = true;
3067 
3068 	/* Add a reference to the transaction store in each Helium source. */
3069 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3070 		hs->he_txn = he_txn;
3071 
3072 	return (0);
3073 }
3074 
3075 /*
3076  * helium_source_txn_truncate --
3077  *	Truncate the database-wide transaction store.
3078  */
3079 static int
helium_source_txn_truncate(DATA_SOURCE * ds)3080 helium_source_txn_truncate(DATA_SOURCE *ds)
3081 {
3082 	HELIUM_SOURCE *hs;
3083 	WT_EXTENSION_API *wt_api;
3084 	int ret = 0;
3085 
3086 	wt_api = ds->wt_api;
3087 
3088 	/*
3089 	 * We want to truncate the transaction store after recovery, but there
3090 	 * isn't a Helium truncate operation. Remove/re-open the store instead.
3091 	 */
3092 	hs = ds->hs_head;
3093 	if (hs->he_txn != NULL && (ret = he_remove(hs->he_txn)) != 0)
3094 		ERET(wt_api, NULL, ret,
3095 		    "he_remove: %s: %s: %s",
3096 		    hs->name, WT_NAME_TXN, he_strerror(ret));
3097 
3098 	/* The handle is dead, clear any references. */
3099 	for (hs = ds->hs_head; hs != NULL; hs = hs->next) {
3100 		hs->he_txn = NULL;
3101 		hs->he_owner = false;
3102 	}
3103 
3104 	return (helium_source_txn_open(ds));
3105 }
3106 
3107 /*
3108  * helium_source_recover_namespace --
3109  *	Recover a single cache/primary pair in a Helium namespace.
3110  */
3111 static int
helium_source_recover_namespace(WT_DATA_SOURCE * wtds,HELIUM_SOURCE * hs,const char * name,WT_CONFIG_ARG * config)3112 helium_source_recover_namespace(WT_DATA_SOURCE *wtds,
3113     HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config)
3114 {
3115 	CURSOR *cursor;
3116 	DATA_SOURCE *ds;
3117 	WT_CURSOR *wtcursor;
3118 	WT_EXTENSION_API *wt_api;
3119 	WT_SOURCE *ws;
3120 	size_t len;
3121 	int ret = 0;
3122 	const char *p;
3123 	char *uri;
3124 
3125 	ds = (DATA_SOURCE *)wtds;
3126 	wt_api = ds->wt_api;
3127 	cursor = NULL;
3128 	ws = NULL;
3129 	uri = NULL;
3130 
3131 	/*
3132 	 * The name we store on the Helium device is a translation of the
3133 	 * WiredTiger name: do the reverse process here so we can use the
3134 	 * standard source-open function.
3135 	 */
3136 	p = name + strlen(WT_NAME_PREFIX);
3137 	len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10;
3138 	if ((uri = malloc(len)) == NULL) {
3139 		ret = os_errno();
3140 		goto err;
3141 	}
3142 	(void)snprintf(uri, len, "helium:%s/%s", hs->name, p);
3143 
3144 	/*
3145 	 * Open the cache/primary pair by going through the full open process,
3146 	 * instantiating the underlying WT_SOURCE object.
3147 	 */
3148 	WT_ERR(ws_source_open(wtds, NULL, uri, config, 0, &ws));
3149 	WT_ERR(unlock(wt_api, NULL, &ws->lock));
3150 
3151 	/* Fake up a cursor. */
3152 	if ((ret = fake_cursor(wt_api, &wtcursor)) != 0)
3153 		EMSG_ERR(wt_api, NULL, ret, "recovery: %s", strerror(ret));
3154 	cursor = (CURSOR *)wtcursor;
3155 	cursor->ws = ws;
3156 
3157 	/* Process, then remove, the cache. */
3158 	WT_ERR(cache_cleaner(wt_api, wtcursor, 0, NULL));
3159 
3160 	if ((ret = he_remove(ws->he_cache)) != 0)
3161 		EMSG(wt_api, NULL, ret,
3162 		    "he_remove: %s(cache): %s", ws->uri, he_strerror(ret));
3163 	ws->he_cache = NULL;			/* The handle is dead. */
3164 
3165 	/* Close the underlying WiredTiger sources. */
3166 err:	while ((ws = hs->ws_head) != NULL) {
3167 		hs->ws_head = ws->next;
3168 		ESET(ws_source_close(wt_api, NULL, ws));
3169 	}
3170 
3171 	cursor_destroy(cursor);
3172 	free(uri);
3173 
3174 	return (ret);
3175 }
3176 
/*
 * Cookie handed to helium_namespace_list through he_enumerate: a growable,
 * NULL-terminated array of the object names to recover.
 */
struct helium_namespace_cookie {
	char **list;		/* Allocated array of strdup'd names */
	u_int  list_cnt;	/* Number of names stored in the array */
	u_int  list_max;	/* Number of slots allocated in the array */
};
3182 
3183 /*
3184  * helium_namespace_list --
3185  *	Get a list of the objects we're going to recover.
3186  */
3187 static int
helium_namespace_list(void * cookie,const char * name)3188 helium_namespace_list(void *cookie, const char *name)
3189 {
3190 	struct helium_namespace_cookie *names;
3191 	void *allocp;
3192 
3193 	names = cookie;
3194 
3195 	/*
3196 	 * Ignore any files without a WiredTiger prefix.
3197 	 * Ignore the metadata and cache files.
3198 	 */
3199 	if (!prefix_match(name, WT_NAME_PREFIX))
3200 		return (0);
3201 	if (strcmp(name, WT_NAME_INIT) == 0)
3202 		return (0);
3203 	if (strcmp(name, WT_NAME_TXN) == 0)
3204 		return (0);
3205 	if (string_match(
3206 	    strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE)))
3207 		return (0);
3208 
3209 	if (names->list_cnt + 1 >= names->list_max) {
3210 		if ((allocp = realloc(names->list,
3211 		    (names->list_max + 20) * sizeof(names->list[0]))) == NULL)
3212 			return (os_errno());
3213 		names->list = allocp;
3214 		names->list_max += 20;
3215 	}
3216 	if ((names->list[names->list_cnt] = strdup(name)) == NULL)
3217 		return (os_errno());
3218 	++names->list_cnt;
3219 	names->list[names->list_cnt] = NULL;
3220 	return (0);
3221 }
3222 
3223 /*
3224  * helium_source_recover --
3225  *	Recover the HELIUM_SOURCE.
3226  */
3227 static int
helium_source_recover(WT_DATA_SOURCE * wtds,HELIUM_SOURCE * hs,WT_CONFIG_ARG * config)3228 helium_source_recover(
3229     WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config)
3230 {
3231 	struct helium_namespace_cookie names;
3232 	DATA_SOURCE *ds;
3233 	WT_EXTENSION_API *wt_api;
3234 	u_int i;
3235 	int ret = 0;
3236 
3237 	ds = (DATA_SOURCE *)wtds;
3238 	wt_api = ds->wt_api;
3239 	memset(&names, 0, sizeof(names));
3240 
3241 	VMSG(wt_api, NULL, VERBOSE_L1, "recover %s", hs->name);
3242 
3243 	/* Get a list of the cache/primary object pairs in the Helium source. */
3244 	if ((ret = he_enumerate(
3245 	    hs->device, helium_namespace_list, &names)) != 0)
3246 		ERET(wt_api, NULL, ret,
3247 		    "he_enumerate: %s: %s", hs->name, he_strerror(ret));
3248 
3249 	/* Recover the objects. */
3250 	for (i = 0; i < names.list_cnt; ++i)
3251 		WT_ERR(helium_source_recover_namespace(
3252 		    wtds, hs, names.list[i], config));
3253 
3254 err:	for (i = 0; i < names.list_cnt; ++i)
3255 		free(names.list[i]);
3256 	free(names.list);
3257 
3258 	return (ret);
3259 }
3260 
3261 /*
3262  * helium_terminate --
3263  *	Unload the data-source.
3264  */
3265 static int
helium_terminate(WT_DATA_SOURCE * wtds,WT_SESSION * session)3266 helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
3267 {
3268 	DATA_SOURCE *ds;
3269 	HELIUM_SOURCE *hs, *last;
3270 	WT_EXTENSION_API *wt_api;
3271 	int ret = 0;
3272 
3273 	ds = (DATA_SOURCE *)wtds;
3274 	wt_api = ds->wt_api;
3275 
3276 	/* Lock the system down. */
3277 	if (ds->lockinit)
3278 		ret = writelock(wt_api, session, &ds->global_lock);
3279 
3280 	/*
3281 	 * Close the Helium sources, close the Helium source that "owns" the
3282 	 * database transaction store last.
3283 	 */
3284 	last = NULL;
3285 	while ((hs = ds->hs_head) != NULL) {
3286 		ds->hs_head = hs->next;
3287 		if (hs->he_owner) {
3288 			last = hs;
3289 			continue;
3290 		}
3291 		ESET(helium_source_close(wt_api, session, hs));
3292 	}
3293 	if (last != NULL)
3294 		ESET(helium_source_close(wt_api, session, last));
3295 
3296 	/* Unlock and destroy the system. */
3297 	if (ds->lockinit) {
3298 		ESET(unlock(wt_api, session, &ds->global_lock));
3299 		ESET(lock_destroy(wt_api, NULL, &ds->global_lock));
3300 	}
3301 
3302 	OVERWRITE_AND_FREE(ds);
3303 
3304 	return (ret);
3305 }
3306 
3307 /*
3308  * wiredtiger_extension_init --
3309  *	Initialize the Helium connector code.
3310  */
3311 int
wiredtiger_extension_init(WT_CONNECTION * connection,WT_CONFIG_ARG * config)3312 wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
3313 {
3314 	/*
3315 	 * List of the WT_DATA_SOURCE methods -- it's static so it breaks at
3316 	 * compile-time should the structure change underneath us.
3317 	 */
3318 	static const WT_DATA_SOURCE wtds = {
3319 		NULL,				/* No session.alter */
3320 		helium_session_create,		/* session.create */
3321 		NULL,				/* No session.compaction */
3322 		helium_session_drop,		/* session.drop */
3323 		helium_session_open_cursor,	/* session.open_cursor */
3324 		helium_session_rename,		/* session.rename */
3325 		NULL,				/* No session.salvage */
3326 		helium_session_truncate,	/* session.truncate */
3327 		NULL,				/* No session.range_truncate */
3328 		helium_session_verify,		/* session.verify */
3329 		helium_session_checkpoint,	/* session.checkpoint */
3330 		helium_terminate		/* termination */
3331 	};
3332 	static const char *session_create_opts[] = {
3333 		"helium_o_compress=0",		/* HE_O_COMPRESS */
3334 		"helium_o_truncate=0",		/* HE_O_TRUNCATE */
3335 		NULL
3336 	};
3337 	DATA_SOURCE *ds;
3338 	HELIUM_SOURCE *hs;
3339 	WT_CONFIG_ITEM k, v;
3340 	WT_CONFIG_PARSER *config_parser;
3341 	WT_EXTENSION_API *wt_api;
3342 	int vmajor, vminor, vpatch, ret = 0;
3343 	const char **p;
3344 
3345 	config_parser = NULL;
3346 	ds = NULL;
3347 
3348 	wt_api = connection->get_extension_api(connection);
3349 
3350 						/* Check the library version */
3351 #if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 12
3352 	ERET(wt_api, NULL, EINVAL,
3353 	    "unsupported Helium header file %d.%d, expected version 2.12",
3354 	    HE_VERSION_MAJOR, HE_VERSION_MINOR);
3355 #endif
3356 	(void)he_version(&vmajor, &vminor, &vpatch);
3357 	if (vmajor != 2 || vminor != 12)
3358 		ERET(wt_api, NULL, EINVAL,
3359 		    "unsupported Helium library version %d.%d, expected "
3360 		    "version 2.12", vmajor, vminor);
3361 
3362 	/* Allocate and initialize the local data-source structure. */
3363 	if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL)
3364 		return (os_errno());
3365 	ds->wtds = wtds;
3366 	ds->wt_api = wt_api;
3367 	WT_ERR(lock_init(wt_api, NULL, &ds->global_lock));
3368 	ds->lockinit = true;
3369 
3370 	/* Step through the list of Helium sources, opening each one. */
3371 	if ((ret = wt_api->config_parser_open_arg(
3372 	    wt_api, NULL, config, &config_parser)) != 0)
3373 		EMSG_ERR(wt_api, NULL, ret,
3374 		    "WT_EXTENSION_API.config_parser_open: config: %s",
3375 		    wt_api->strerror(wt_api, NULL, ret));
3376 	while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
3377 		if (string_match("helium_verbose", k.str, k.len)) {
3378 			verbose = v.val == 0 ? 0 : 1;
3379 			continue;
3380 		}
3381 		if ((ret = helium_source_open(ds, &k, &v)) != 0)
3382 			goto err;
3383 	}
3384 	if (ret != WT_NOTFOUND)
3385 		EMSG_ERR(wt_api, NULL, ret,
3386 		    "WT_CONFIG_PARSER.next: config: %s",
3387 		    wt_api->strerror(wt_api, NULL, ret));
3388 	if ((ret = config_parser->close(config_parser)) != 0)
3389 		EMSG_ERR(wt_api, NULL, ret,
3390 		    "WT_CONFIG_PARSER.close: config: %s",
3391 		    wt_api->strerror(wt_api, NULL, ret));
3392 	config_parser = NULL;
3393 
3394 	/*
3395 	 * Find and open the database transaction store, recover each Helium
3396 	 * source, then discard the transaction store's contents.
3397 	 */
3398 	WT_ERR(helium_source_txn_open(ds));
3399 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3400 		WT_ERR(helium_source_recover(&ds->wtds, hs, config));
3401 	WT_ERR(helium_source_txn_truncate(ds));
3402 
3403 	/* Start each Helium source cleaner thread. */
3404 	for (hs = ds->hs_head; hs != NULL; hs = hs->next)
3405 		if ((ret = pthread_create(
3406 		    &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0)
3407 			EMSG_ERR(wt_api, NULL, ret,
3408 			    "%s: pthread_create: cleaner thread: %s",
3409 			    hs->name, strerror(ret));
3410 
3411 	/* Add Helium-specific WT_SESSION.create configuration options.  */
3412 	for (p = session_create_opts; *p != NULL; ++p)
3413 		if ((ret = connection->configure_method(connection,
3414 		    "WT_SESSION.create", "helium:", *p, "boolean", NULL)) != 0)
3415 			EMSG_ERR(wt_api, NULL, ret,
3416 			    "WT_CONNECTION.configure_method: session.create: "
3417 			    "%s: %s",
3418 			    *p, wt_api->strerror(wt_api, NULL, ret));
3419 
3420 	/* Add the data source */
3421 	if ((ret = connection->add_data_source(
3422 	    connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0)
3423 		EMSG_ERR(wt_api, NULL, ret,
3424 		    "WT_CONNECTION.add_data_source: %s",
3425 		    wt_api->strerror(wt_api, NULL, ret));
3426 	return (0);
3427 
3428 err:	if (ds != NULL)
3429 		ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL));
3430 	if (config_parser != NULL)
3431 		(void)config_parser->close(config_parser);
3432 	return (ret);
3433 }
3434 
3435 /*
3436  * wiredtiger_extension_terminate --
3437  *	Shutdown the Helium connector code.
3438  */
3439 int
wiredtiger_extension_terminate(WT_CONNECTION * connection)3440 wiredtiger_extension_terminate(WT_CONNECTION *connection)
3441 {
3442 	(void)connection;			/* Unused parameters */
3443 
3444 	return (0);
3445 }
3446