1 /*-
2  * Copyright (c) 2014-2018 MongoDB, Inc.
3  * Copyright (c) 2008-2014 WiredTiger, Inc.
4  *	All rights reserved.
5  *
6  * See the file LICENSE for redistribution information.
7  */
8 
9 #include "wt_internal.h"
10 
11 /*
12  * __wt_page_modify_alloc --
13  *	Allocate a page's modification structure.
14  */
15 int
__wt_page_modify_alloc(WT_SESSION_IMPL * session,WT_PAGE * page)16 __wt_page_modify_alloc(WT_SESSION_IMPL *session, WT_PAGE *page)
17 {
18 	WT_DECL_RET;
19 	WT_PAGE_MODIFY *modify;
20 
21 	WT_RET(__wt_calloc_one(session, &modify));
22 
23 	/* Initialize the spinlock for the page. */
24 	WT_ERR(__wt_spin_init(session, &modify->page_lock, "btree page"));
25 
26 	/*
27 	 * Multiple threads of control may be searching and deciding to modify
28 	 * a page.  If our modify structure is used, update the page's memory
29 	 * footprint, else discard the modify structure, another thread did the
30 	 * work.
31 	 */
32 	if (__wt_atomic_cas_ptr(&page->modify, NULL, modify))
33 		__wt_cache_page_inmem_incr(session, page, sizeof(*modify));
34 	else
35 err:		__wt_free(session, modify);
36 	return (ret);
37 }
38 
39 /*
40  * __wt_row_modify --
41  *	Row-store insert, update and delete.
42  */
43 int
__wt_row_modify(WT_CURSOR_BTREE * cbt,const WT_ITEM * key,const WT_ITEM * value,WT_UPDATE * upd_arg,u_int modify_type,bool exclusive)44 __wt_row_modify(
45     WT_CURSOR_BTREE *cbt, const WT_ITEM *key, const WT_ITEM *value,
46     WT_UPDATE *upd_arg, u_int modify_type, bool exclusive)
47 {
48 	WT_DECL_RET;
49 	WT_INSERT *ins;
50 	WT_INSERT_HEAD *ins_head, **ins_headp;
51 	WT_PAGE *page;
52 	WT_PAGE_MODIFY *mod;
53 	WT_SESSION_IMPL *session;
54 	WT_UPDATE *old_upd, *upd, **upd_entry;
55 	size_t ins_size, upd_size;
56 	uint32_t ins_slot;
57 	u_int i, skipdepth;
58 	bool logged;
59 
60 	ins = NULL;
61 	page = cbt->ref->page;
62 	session = (WT_SESSION_IMPL *)cbt->iface.session;
63 	upd = upd_arg;
64 	logged = false;
65 
66 	/* We're going to modify the page, we should have loaded history. */
67 	WT_ASSERT(session, cbt->ref->state != WT_REF_LIMBO);
68 
69 	/* If we don't yet have a modify structure, we'll need one. */
70 	WT_RET(__wt_page_modify_init(session, page));
71 	mod = page->modify;
72 
73 	/*
74 	 * Modify: allocate an update array as necessary, build a WT_UPDATE
75 	 * structure, and call a serialized function to insert the WT_UPDATE
76 	 * structure.
77 	 *
78 	 * Insert: allocate an insert array as necessary, build a WT_INSERT
79 	 * and WT_UPDATE structure pair, and call a serialized function to
80 	 * insert the WT_INSERT structure.
81 	 */
82 	if (cbt->compare == 0) {
83 		if (cbt->ins == NULL) {
84 			/* Allocate an update array as necessary. */
85 			WT_PAGE_ALLOC_AND_SWAP(session, page,
86 			    mod->mod_row_update, upd_entry, page->entries);
87 
88 			/* Set the WT_UPDATE array reference. */
89 			upd_entry = &mod->mod_row_update[cbt->slot];
90 		} else
91 			upd_entry = &cbt->ins->upd;
92 
93 		if (upd_arg == NULL) {
94 			/* Make sure the update can proceed. */
95 			WT_ERR(__wt_txn_update_check(
96 			    session, old_upd = *upd_entry));
97 
98 			/* Allocate a WT_UPDATE structure and transaction ID. */
99 			WT_ERR(__wt_update_alloc(session,
100 			    value, &upd, &upd_size, modify_type));
101 			WT_ERR(__wt_txn_modify(session, cbt, upd));
102 			logged = true;
103 
104 			/* Avoid WT_CURSOR.update data copy. */
105 			cbt->modify_update = upd;
106 		} else {
107 			upd_size = __wt_update_list_memsize(upd);
108 
109 			/*
110 			 * We are restoring updates that couldn't be evicted,
111 			 * there should only be one update list per key.
112 			 */
113 			WT_ASSERT(session, *upd_entry == NULL);
114 
115 			/*
116 			 * Set the "old" entry to the second update in the list
117 			 * so that the serialization function succeeds in
118 			 * swapping the first update into place.
119 			 */
120 			old_upd = *upd_entry = upd->next;
121 		}
122 
123 		/*
124 		 * Point the new WT_UPDATE item to the next element in the list.
125 		 * If we get it right, the serialization function lock acts as
126 		 * our memory barrier to flush this write.
127 		 */
128 		upd->next = old_upd;
129 
130 		/* Serialize the update. */
131 		WT_ERR(__wt_update_serial(
132 		    session, page, upd_entry, &upd, upd_size, exclusive));
133 	} else {
134 		/*
135 		 * Allocate the insert array as necessary.
136 		 *
137 		 * We allocate an additional insert array slot for insert keys
138 		 * sorting less than any key on the page.  The test to select
139 		 * that slot is baroque: if the search returned the first page
140 		 * slot, we didn't end up processing an insert list, and the
141 		 * comparison value indicates the search key was smaller than
142 		 * the returned slot, then we're using the smallest-key insert
143 		 * slot.  That's hard, so we set a flag.
144 		 */
145 		WT_PAGE_ALLOC_AND_SWAP(session, page,
146 		    mod->mod_row_insert, ins_headp, page->entries + 1);
147 
148 		ins_slot = F_ISSET(cbt, WT_CBT_SEARCH_SMALLEST) ?
149 		    page->entries: cbt->slot;
150 		ins_headp = &mod->mod_row_insert[ins_slot];
151 
152 		/* Allocate the WT_INSERT_HEAD structure as necessary. */
153 		WT_PAGE_ALLOC_AND_SWAP(session, page, *ins_headp, ins_head, 1);
154 		ins_head = *ins_headp;
155 
156 		/* Choose a skiplist depth for this insert. */
157 		skipdepth = __wt_skip_choose_depth(session);
158 
159 		/*
160 		 * Allocate a WT_INSERT/WT_UPDATE pair and transaction ID, and
161 		 * update the cursor to reference it (the WT_INSERT_HEAD might
162 		 * be allocated, the WT_INSERT was allocated).
163 		 */
164 		WT_ERR(__wt_row_insert_alloc(
165 		    session, key, skipdepth, &ins, &ins_size));
166 		cbt->ins_head = ins_head;
167 		cbt->ins = ins;
168 
169 		if (upd_arg == NULL) {
170 			WT_ERR(__wt_update_alloc(session,
171 			    value, &upd, &upd_size, modify_type));
172 			WT_ERR(__wt_txn_modify(session, cbt, upd));
173 			logged = true;
174 
175 			/* Avoid WT_CURSOR.update data copy. */
176 			cbt->modify_update = upd;
177 		} else
178 			upd_size = __wt_update_list_memsize(upd);
179 
180 		ins->upd = upd;
181 		ins_size += upd_size;
182 
183 		/*
184 		 * If there was no insert list during the search, the cursor's
185 		 * information cannot be correct, search couldn't have
186 		 * initialized it.
187 		 *
188 		 * Otherwise, point the new WT_INSERT item's skiplist to the
189 		 * next elements in the insert list (which we will check are
190 		 * still valid inside the serialization function).
191 		 *
192 		 * The serial mutex acts as our memory barrier to flush these
193 		 * writes before inserting them into the list.
194 		 */
195 		if (cbt->ins_stack[0] == NULL)
196 			for (i = 0; i < skipdepth; i++) {
197 				cbt->ins_stack[i] = &ins_head->head[i];
198 				ins->next[i] = cbt->next_stack[i] = NULL;
199 			}
200 		else
201 			for (i = 0; i < skipdepth; i++)
202 				ins->next[i] = cbt->next_stack[i];
203 
204 		/* Insert the WT_INSERT structure. */
205 		WT_ERR(__wt_insert_serial(
206 		    session, page, cbt->ins_head, cbt->ins_stack,
207 		    &ins, ins_size, skipdepth, exclusive));
208 	}
209 
210 	if (logged && modify_type != WT_UPDATE_RESERVE)
211 		WT_ERR(__wt_txn_log_op(session, cbt));
212 
213 	if (0) {
214 err:		/*
215 		 * Remove the update from the current transaction, so we don't
216 		 * try to modify it on rollback.
217 		 */
218 		if (logged)
219 			__wt_txn_unmodify(session);
220 		__wt_free(session, ins);
221 		cbt->ins = NULL;
222 		if (upd_arg == NULL)
223 			__wt_free(session, upd);
224 	}
225 
226 	return (ret);
227 }
228 
229 /*
230  * __wt_row_insert_alloc --
231  *	Row-store insert: allocate a WT_INSERT structure and fill it in.
232  */
233 int
__wt_row_insert_alloc(WT_SESSION_IMPL * session,const WT_ITEM * key,u_int skipdepth,WT_INSERT ** insp,size_t * ins_sizep)234 __wt_row_insert_alloc(WT_SESSION_IMPL *session,
235     const WT_ITEM *key, u_int skipdepth, WT_INSERT **insp, size_t *ins_sizep)
236 {
237 	WT_INSERT *ins;
238 	size_t ins_size;
239 
240 	/*
241 	 * Allocate the WT_INSERT structure, next pointers for the skip list,
242 	 * and room for the key.  Then copy the key into place.
243 	 */
244 	ins_size = sizeof(WT_INSERT) +
245 	    skipdepth * sizeof(WT_INSERT *) + key->size;
246 	WT_RET(__wt_calloc(session, 1, ins_size, &ins));
247 
248 	ins->u.key.offset = WT_STORE_SIZE(ins_size - key->size);
249 	WT_INSERT_KEY_SIZE(ins) = WT_STORE_SIZE(key->size);
250 	memcpy(WT_INSERT_KEY(ins), key->data, key->size);
251 
252 	*insp = ins;
253 	if (ins_sizep != NULL)
254 		*ins_sizep = ins_size;
255 	return (0);
256 }
257 
258 /*
259  * __wt_update_alloc --
260  *	Allocate a WT_UPDATE structure and associated value and fill it in.
261  */
262 int
__wt_update_alloc(WT_SESSION_IMPL * session,const WT_ITEM * value,WT_UPDATE ** updp,size_t * sizep,u_int modify_type)263 __wt_update_alloc(WT_SESSION_IMPL *session, const WT_ITEM *value,
264     WT_UPDATE **updp, size_t *sizep, u_int modify_type)
265 {
266 	WT_UPDATE *upd;
267 
268 	*updp = NULL;
269 
270 	/*
271 	 * The code paths leading here are convoluted: assert we never attempt
272 	 * to allocate an update structure if only intending to insert one we
273 	 * already have.
274 	 */
275 	WT_ASSERT(session, modify_type != WT_UPDATE_INVALID);
276 
277 	/*
278 	 * Allocate the WT_UPDATE structure and room for the value, then copy
279 	 * the value into place.
280 	 */
281 	if (modify_type == WT_UPDATE_BIRTHMARK ||
282 	    modify_type == WT_UPDATE_RESERVE ||
283 	    modify_type == WT_UPDATE_TOMBSTONE)
284 		WT_RET(__wt_calloc(session, 1, WT_UPDATE_SIZE, &upd));
285 	else {
286 		WT_RET(__wt_calloc(
287 		    session, 1, WT_UPDATE_SIZE + value->size, &upd));
288 		if (value->size != 0) {
289 			upd->size = WT_STORE_SIZE(value->size);
290 			memcpy(upd->data, value->data, value->size);
291 		}
292 	}
293 	upd->type = (uint8_t)modify_type;
294 
295 	*updp = upd;
296 	*sizep = WT_UPDATE_MEMSIZE(upd);
297 	return (0);
298 }
299 
300 /*
301  * __wt_update_obsolete_check --
302  *	Check for obsolete updates.
303  */
304 WT_UPDATE *
__wt_update_obsolete_check(WT_SESSION_IMPL * session,WT_PAGE * page,WT_UPDATE * upd)305 __wt_update_obsolete_check(
306     WT_SESSION_IMPL *session, WT_PAGE *page, WT_UPDATE *upd)
307 {
308 	WT_TXN_GLOBAL *txn_global;
309 	WT_UPDATE *first, *next;
310 	u_int count;
311 
312 	txn_global = &S2C(session)->txn_global;
313 
314 	/*
315 	 * This function identifies obsolete updates, and truncates them from
316 	 * the rest of the chain; because this routine is called from inside
317 	 * a serialization function, the caller has responsibility for actually
318 	 * freeing the memory.
319 	 *
320 	 * Walk the list of updates, looking for obsolete updates at the end.
321 	 *
322 	 * Only updates with globally visible, self-contained data can terminate
323 	 * update chains.
324 	 */
325 	for (first = NULL, count = 0; upd != NULL; upd = upd->next, count++) {
326 		if (upd->txnid == WT_TXN_ABORTED)
327 			continue;
328 		if (!__wt_txn_upd_visible_all(session, upd))
329 			first = NULL;
330 		else if (first == NULL && (WT_UPDATE_DATA_VALUE(upd) ||
331 		    upd->type == WT_UPDATE_BIRTHMARK))
332 			first = upd;
333 	}
334 
335 	/*
336 	 * We cannot discard this WT_UPDATE structure, we can only discard
337 	 * WT_UPDATE structures subsequent to it, other threads of control will
338 	 * terminate their walk in this element.  Save a reference to the list
339 	 * we will discard, and terminate the list.
340 	 */
341 	if (first != NULL &&
342 	    (next = first->next) != NULL &&
343 	    __wt_atomic_cas_ptr(&first->next, next, NULL))
344 		return (next);
345 
346 	/*
347 	 * If the list is long, don't retry checks on this page until the
348 	 * transaction state has moved forwards. This function is used to
349 	 * trim update lists independently of the page state, ensure there
350 	 * is a modify structure.
351 	 */
352 	if (count > 20 && page->modify != NULL) {
353 		page->modify->obsolete_check_txn = txn_global->last_running;
354 #ifdef HAVE_TIMESTAMPS
355 		if (txn_global->has_pinned_timestamp)
356 			WT_WITH_TIMESTAMP_READLOCK(session, &txn_global->rwlock,
357 			    __wt_timestamp_set(
358 				&page->modify->obsolete_check_timestamp,
359 				&txn_global->pinned_timestamp));
360 #endif
361 	}
362 
363 	return (NULL);
364 }
365