1 /*-
2 * Copyright (c) 2014-2018 MongoDB, Inc.
3 * Copyright (c) 2008-2014 WiredTiger, Inc.
4 * All rights reserved.
5 *
6 * See the file LICENSE for redistribution information.
7 */
8
9 #include "wt_internal.h"
10
11 /*
12 * __wt_page_modify_alloc --
13 * Allocate a page's modification structure.
14 */
15 int
__wt_page_modify_alloc(WT_SESSION_IMPL * session,WT_PAGE * page)16 __wt_page_modify_alloc(WT_SESSION_IMPL *session, WT_PAGE *page)
17 {
18 WT_DECL_RET;
19 WT_PAGE_MODIFY *modify;
20
21 WT_RET(__wt_calloc_one(session, &modify));
22
23 /* Initialize the spinlock for the page. */
24 WT_ERR(__wt_spin_init(session, &modify->page_lock, "btree page"));
25
26 /*
27 * Multiple threads of control may be searching and deciding to modify
28 * a page. If our modify structure is used, update the page's memory
29 * footprint, else discard the modify structure, another thread did the
30 * work.
31 */
32 if (__wt_atomic_cas_ptr(&page->modify, NULL, modify))
33 __wt_cache_page_inmem_incr(session, page, sizeof(*modify));
34 else
35 err: __wt_free(session, modify);
36 return (ret);
37 }
38
39 /*
40 * __wt_row_modify --
41 * Row-store insert, update and delete.
42 */
43 int
__wt_row_modify(WT_CURSOR_BTREE * cbt,const WT_ITEM * key,const WT_ITEM * value,WT_UPDATE * upd_arg,u_int modify_type,bool exclusive)44 __wt_row_modify(
45 WT_CURSOR_BTREE *cbt, const WT_ITEM *key, const WT_ITEM *value,
46 WT_UPDATE *upd_arg, u_int modify_type, bool exclusive)
47 {
48 WT_DECL_RET;
49 WT_INSERT *ins;
50 WT_INSERT_HEAD *ins_head, **ins_headp;
51 WT_PAGE *page;
52 WT_PAGE_MODIFY *mod;
53 WT_SESSION_IMPL *session;
54 WT_UPDATE *old_upd, *upd, **upd_entry;
55 size_t ins_size, upd_size;
56 uint32_t ins_slot;
57 u_int i, skipdepth;
58 bool logged;
59
60 ins = NULL;
61 page = cbt->ref->page;
62 session = (WT_SESSION_IMPL *)cbt->iface.session;
63 upd = upd_arg;
64 logged = false;
65
66 /* We're going to modify the page, we should have loaded history. */
67 WT_ASSERT(session, cbt->ref->state != WT_REF_LIMBO);
68
69 /* If we don't yet have a modify structure, we'll need one. */
70 WT_RET(__wt_page_modify_init(session, page));
71 mod = page->modify;
72
73 /*
74 * Modify: allocate an update array as necessary, build a WT_UPDATE
75 * structure, and call a serialized function to insert the WT_UPDATE
76 * structure.
77 *
78 * Insert: allocate an insert array as necessary, build a WT_INSERT
79 * and WT_UPDATE structure pair, and call a serialized function to
80 * insert the WT_INSERT structure.
81 */
82 if (cbt->compare == 0) {
83 if (cbt->ins == NULL) {
84 /* Allocate an update array as necessary. */
85 WT_PAGE_ALLOC_AND_SWAP(session, page,
86 mod->mod_row_update, upd_entry, page->entries);
87
88 /* Set the WT_UPDATE array reference. */
89 upd_entry = &mod->mod_row_update[cbt->slot];
90 } else
91 upd_entry = &cbt->ins->upd;
92
93 if (upd_arg == NULL) {
94 /* Make sure the update can proceed. */
95 WT_ERR(__wt_txn_update_check(
96 session, old_upd = *upd_entry));
97
98 /* Allocate a WT_UPDATE structure and transaction ID. */
99 WT_ERR(__wt_update_alloc(session,
100 value, &upd, &upd_size, modify_type));
101 WT_ERR(__wt_txn_modify(session, cbt, upd));
102 logged = true;
103
104 /* Avoid WT_CURSOR.update data copy. */
105 cbt->modify_update = upd;
106 } else {
107 upd_size = __wt_update_list_memsize(upd);
108
109 /*
110 * We are restoring updates that couldn't be evicted,
111 * there should only be one update list per key.
112 */
113 WT_ASSERT(session, *upd_entry == NULL);
114
115 /*
116 * Set the "old" entry to the second update in the list
117 * so that the serialization function succeeds in
118 * swapping the first update into place.
119 */
120 old_upd = *upd_entry = upd->next;
121 }
122
123 /*
124 * Point the new WT_UPDATE item to the next element in the list.
125 * If we get it right, the serialization function lock acts as
126 * our memory barrier to flush this write.
127 */
128 upd->next = old_upd;
129
130 /* Serialize the update. */
131 WT_ERR(__wt_update_serial(
132 session, page, upd_entry, &upd, upd_size, exclusive));
133 } else {
134 /*
135 * Allocate the insert array as necessary.
136 *
137 * We allocate an additional insert array slot for insert keys
138 * sorting less than any key on the page. The test to select
139 * that slot is baroque: if the search returned the first page
140 * slot, we didn't end up processing an insert list, and the
141 * comparison value indicates the search key was smaller than
142 * the returned slot, then we're using the smallest-key insert
143 * slot. That's hard, so we set a flag.
144 */
145 WT_PAGE_ALLOC_AND_SWAP(session, page,
146 mod->mod_row_insert, ins_headp, page->entries + 1);
147
148 ins_slot = F_ISSET(cbt, WT_CBT_SEARCH_SMALLEST) ?
149 page->entries: cbt->slot;
150 ins_headp = &mod->mod_row_insert[ins_slot];
151
152 /* Allocate the WT_INSERT_HEAD structure as necessary. */
153 WT_PAGE_ALLOC_AND_SWAP(session, page, *ins_headp, ins_head, 1);
154 ins_head = *ins_headp;
155
156 /* Choose a skiplist depth for this insert. */
157 skipdepth = __wt_skip_choose_depth(session);
158
159 /*
160 * Allocate a WT_INSERT/WT_UPDATE pair and transaction ID, and
161 * update the cursor to reference it (the WT_INSERT_HEAD might
162 * be allocated, the WT_INSERT was allocated).
163 */
164 WT_ERR(__wt_row_insert_alloc(
165 session, key, skipdepth, &ins, &ins_size));
166 cbt->ins_head = ins_head;
167 cbt->ins = ins;
168
169 if (upd_arg == NULL) {
170 WT_ERR(__wt_update_alloc(session,
171 value, &upd, &upd_size, modify_type));
172 WT_ERR(__wt_txn_modify(session, cbt, upd));
173 logged = true;
174
175 /* Avoid WT_CURSOR.update data copy. */
176 cbt->modify_update = upd;
177 } else
178 upd_size = __wt_update_list_memsize(upd);
179
180 ins->upd = upd;
181 ins_size += upd_size;
182
183 /*
184 * If there was no insert list during the search, the cursor's
185 * information cannot be correct, search couldn't have
186 * initialized it.
187 *
188 * Otherwise, point the new WT_INSERT item's skiplist to the
189 * next elements in the insert list (which we will check are
190 * still valid inside the serialization function).
191 *
192 * The serial mutex acts as our memory barrier to flush these
193 * writes before inserting them into the list.
194 */
195 if (cbt->ins_stack[0] == NULL)
196 for (i = 0; i < skipdepth; i++) {
197 cbt->ins_stack[i] = &ins_head->head[i];
198 ins->next[i] = cbt->next_stack[i] = NULL;
199 }
200 else
201 for (i = 0; i < skipdepth; i++)
202 ins->next[i] = cbt->next_stack[i];
203
204 /* Insert the WT_INSERT structure. */
205 WT_ERR(__wt_insert_serial(
206 session, page, cbt->ins_head, cbt->ins_stack,
207 &ins, ins_size, skipdepth, exclusive));
208 }
209
210 if (logged && modify_type != WT_UPDATE_RESERVE)
211 WT_ERR(__wt_txn_log_op(session, cbt));
212
213 if (0) {
214 err: /*
215 * Remove the update from the current transaction, so we don't
216 * try to modify it on rollback.
217 */
218 if (logged)
219 __wt_txn_unmodify(session);
220 __wt_free(session, ins);
221 cbt->ins = NULL;
222 if (upd_arg == NULL)
223 __wt_free(session, upd);
224 }
225
226 return (ret);
227 }
228
229 /*
230 * __wt_row_insert_alloc --
231 * Row-store insert: allocate a WT_INSERT structure and fill it in.
232 */
233 int
__wt_row_insert_alloc(WT_SESSION_IMPL * session,const WT_ITEM * key,u_int skipdepth,WT_INSERT ** insp,size_t * ins_sizep)234 __wt_row_insert_alloc(WT_SESSION_IMPL *session,
235 const WT_ITEM *key, u_int skipdepth, WT_INSERT **insp, size_t *ins_sizep)
236 {
237 WT_INSERT *ins;
238 size_t ins_size;
239
240 /*
241 * Allocate the WT_INSERT structure, next pointers for the skip list,
242 * and room for the key. Then copy the key into place.
243 */
244 ins_size = sizeof(WT_INSERT) +
245 skipdepth * sizeof(WT_INSERT *) + key->size;
246 WT_RET(__wt_calloc(session, 1, ins_size, &ins));
247
248 ins->u.key.offset = WT_STORE_SIZE(ins_size - key->size);
249 WT_INSERT_KEY_SIZE(ins) = WT_STORE_SIZE(key->size);
250 memcpy(WT_INSERT_KEY(ins), key->data, key->size);
251
252 *insp = ins;
253 if (ins_sizep != NULL)
254 *ins_sizep = ins_size;
255 return (0);
256 }
257
258 /*
259 * __wt_update_alloc --
260 * Allocate a WT_UPDATE structure and associated value and fill it in.
261 */
262 int
__wt_update_alloc(WT_SESSION_IMPL * session,const WT_ITEM * value,WT_UPDATE ** updp,size_t * sizep,u_int modify_type)263 __wt_update_alloc(WT_SESSION_IMPL *session, const WT_ITEM *value,
264 WT_UPDATE **updp, size_t *sizep, u_int modify_type)
265 {
266 WT_UPDATE *upd;
267
268 *updp = NULL;
269
270 /*
271 * The code paths leading here are convoluted: assert we never attempt
272 * to allocate an update structure if only intending to insert one we
273 * already have.
274 */
275 WT_ASSERT(session, modify_type != WT_UPDATE_INVALID);
276
277 /*
278 * Allocate the WT_UPDATE structure and room for the value, then copy
279 * the value into place.
280 */
281 if (modify_type == WT_UPDATE_BIRTHMARK ||
282 modify_type == WT_UPDATE_RESERVE ||
283 modify_type == WT_UPDATE_TOMBSTONE)
284 WT_RET(__wt_calloc(session, 1, WT_UPDATE_SIZE, &upd));
285 else {
286 WT_RET(__wt_calloc(
287 session, 1, WT_UPDATE_SIZE + value->size, &upd));
288 if (value->size != 0) {
289 upd->size = WT_STORE_SIZE(value->size);
290 memcpy(upd->data, value->data, value->size);
291 }
292 }
293 upd->type = (uint8_t)modify_type;
294
295 *updp = upd;
296 *sizep = WT_UPDATE_MEMSIZE(upd);
297 return (0);
298 }
299
300 /*
301 * __wt_update_obsolete_check --
302 * Check for obsolete updates.
303 */
304 WT_UPDATE *
__wt_update_obsolete_check(WT_SESSION_IMPL * session,WT_PAGE * page,WT_UPDATE * upd)305 __wt_update_obsolete_check(
306 WT_SESSION_IMPL *session, WT_PAGE *page, WT_UPDATE *upd)
307 {
308 WT_TXN_GLOBAL *txn_global;
309 WT_UPDATE *first, *next;
310 u_int count;
311
312 txn_global = &S2C(session)->txn_global;
313
314 /*
315 * This function identifies obsolete updates, and truncates them from
316 * the rest of the chain; because this routine is called from inside
317 * a serialization function, the caller has responsibility for actually
318 * freeing the memory.
319 *
320 * Walk the list of updates, looking for obsolete updates at the end.
321 *
322 * Only updates with globally visible, self-contained data can terminate
323 * update chains.
324 */
325 for (first = NULL, count = 0; upd != NULL; upd = upd->next, count++) {
326 if (upd->txnid == WT_TXN_ABORTED)
327 continue;
328 if (!__wt_txn_upd_visible_all(session, upd))
329 first = NULL;
330 else if (first == NULL && (WT_UPDATE_DATA_VALUE(upd) ||
331 upd->type == WT_UPDATE_BIRTHMARK))
332 first = upd;
333 }
334
335 /*
336 * We cannot discard this WT_UPDATE structure, we can only discard
337 * WT_UPDATE structures subsequent to it, other threads of control will
338 * terminate their walk in this element. Save a reference to the list
339 * we will discard, and terminate the list.
340 */
341 if (first != NULL &&
342 (next = first->next) != NULL &&
343 __wt_atomic_cas_ptr(&first->next, next, NULL))
344 return (next);
345
346 /*
347 * If the list is long, don't retry checks on this page until the
348 * transaction state has moved forwards. This function is used to
349 * trim update lists independently of the page state, ensure there
350 * is a modify structure.
351 */
352 if (count > 20 && page->modify != NULL) {
353 page->modify->obsolete_check_txn = txn_global->last_running;
354 #ifdef HAVE_TIMESTAMPS
355 if (txn_global->has_pinned_timestamp)
356 WT_WITH_TIMESTAMP_READLOCK(session, &txn_global->rwlock,
357 __wt_timestamp_set(
358 &page->modify->obsolete_check_timestamp,
359 &txn_global->pinned_timestamp));
360 #endif
361 }
362
363 return (NULL);
364 }
365