1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0uins.cc
22 Fresh insert undo
23 
24 Created 2/25/1997 Heikki Tuuri
25 *******************************************************/
26 
27 #include "row0uins.h"
28 #include "dict0dict.h"
29 #include "dict0stats.h"
30 #include "dict0boot.h"
31 #include "dict0crea.h"
32 #include "trx0undo.h"
33 #include "trx0roll.h"
34 #include "btr0btr.h"
35 #include "mach0data.h"
36 #include "row0undo.h"
37 #include "row0vers.h"
38 #include "row0log.h"
39 #include "trx0trx.h"
40 #include "trx0rec.h"
41 #include "row0row.h"
42 #include "row0upd.h"
43 #include "que0que.h"
44 #include "ibuf0ibuf.h"
45 #include "log0log.h"
46 #include "fil0fil.h"
47 
48 /*************************************************************************
49 IMPORTANT NOTE: Any operation that generates redo MUST check that there
50 is enough space in the redo log before for that operation. This is
51 done by calling log_free_check(). The reason for checking the
52 availability of the redo log space before the start of the operation is
53 that we MUST not hold any synchonization objects when performing the
54 check.
55 If you make a change in this module make sure that no codepath is
56 introduced where a call to log_free_check() is bypassed. */
57 
58 /***************************************************************//**
59 Removes a clustered index record. The pcur in node was positioned on the
60 record, now it is detached.
61 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
62 static  MY_ATTRIBUTE((nonnull, warn_unused_result))
63 dberr_t
row_undo_ins_remove_clust_rec(undo_node_t * node)64 row_undo_ins_remove_clust_rec(
65 /*==========================*/
66 	undo_node_t*	node)	/*!< in: undo node */
67 {
68 	btr_cur_t*	btr_cur;
69 	ibool		success;
70 	dberr_t		err;
71 	ulint		n_tries	= 0;
72 	mtr_t		mtr;
73 	dict_index_t*	index	= node->pcur.btr_cur.index;
74 	bool		online;
75 
76 	ut_ad(dict_index_is_clust(index));
77 	ut_ad(node->trx->in_rollback);
78 
79 	mtr.start();
80 	if (index->table->is_temporary()) {
81 		ut_ad(node->rec_type == TRX_UNDO_INSERT_REC);
82 		mtr.set_log_mode(MTR_LOG_NO_REDO);
83 	} else {
84 		index->set_modified(mtr);
85 		ut_ad(lock_table_has_locks(index->table));
86 	}
87 
88 	/* This is similar to row_undo_mod_clust(). The DDL thread may
89 	already have copied this row from the log to the new table.
90 	We must log the removal, so that the row will be correctly
91 	purged. However, we can log the removal out of sync with the
92 	B-tree modification. */
93 
94 	online = dict_index_is_online_ddl(index);
95 	if (online) {
96 		ut_ad(node->trx->dict_operation_lock_mode
97 		      != RW_X_LATCH);
98 		ut_ad(node->table->id != DICT_INDEXES_ID);
99 		mtr_s_lock_index(index, &mtr);
100 	}
101 
102 	success = btr_pcur_restore_position(
103 		online
104 		? BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED
105 		: BTR_MODIFY_LEAF, &node->pcur, &mtr);
106 	ut_a(success);
107 
108 	btr_cur = btr_pcur_get_btr_cur(&node->pcur);
109 
110 	ut_ad(rec_get_trx_id(btr_cur_get_rec(btr_cur), btr_cur->index)
111 	      == node->trx->id || node->table->is_temporary());
112 	ut_ad(!rec_get_deleted_flag(
113 		      btr_cur_get_rec(btr_cur),
114 		      dict_table_is_comp(btr_cur->index->table)));
115 
116 	if (online && dict_index_is_online_ddl(index)) {
117 		const rec_t*	rec	= btr_cur_get_rec(btr_cur);
118 		mem_heap_t*	heap	= NULL;
119 		const rec_offs*	offsets	= rec_get_offsets(
120 			rec, index, NULL, index->n_core_fields,
121 			ULINT_UNDEFINED, &heap);
122 		row_log_table_delete(rec, index, offsets, NULL);
123 		mem_heap_free(heap);
124 	}
125 
126 	switch (node->table->id) {
127 	case DICT_INDEXES_ID:
128 		ut_ad(!online);
129 		ut_ad(node->trx->dict_operation_lock_mode == RW_X_LATCH);
130 		ut_ad(node->rec_type == TRX_UNDO_INSERT_REC);
131 
132 		dict_drop_index_tree(
133 			btr_pcur_get_rec(&node->pcur), &node->pcur, node->trx,
134 			&mtr);
135 
136 		mtr.commit();
137 
138 		mtr.start();
139 
140 		success = btr_pcur_restore_position(
141 			BTR_MODIFY_LEAF, &node->pcur, &mtr);
142 		ut_a(success);
143 		break;
144 	case DICT_COLUMNS_ID:
145 		/* This is rolling back an INSERT into SYS_COLUMNS.
146 		If it was part of an instant ADD COLUMN operation, we
147 		must modify the table definition. At this point, any
148 		corresponding operation to the metadata record will have
149 		been rolled back. */
150 		ut_ad(!online);
151 		ut_ad(node->trx->dict_operation_lock_mode == RW_X_LATCH);
152 		ut_ad(node->rec_type == TRX_UNDO_INSERT_REC);
153 		const rec_t* rec = btr_pcur_get_rec(&node->pcur);
154 		if (rec_get_n_fields_old(rec)
155 		    != DICT_NUM_FIELDS__SYS_COLUMNS) {
156 			break;
157 		}
158 		ulint len;
159 		const byte* data = rec_get_nth_field_old(
160 			rec, DICT_FLD__SYS_COLUMNS__TABLE_ID, &len);
161 		if (len != 8) {
162 			break;
163 		}
164 		const table_id_t table_id = mach_read_from_8(data);
165 		data = rec_get_nth_field_old(rec, DICT_FLD__SYS_COLUMNS__POS,
166 					     &len);
167 		if (len != 4) {
168 			break;
169 		}
170 		const unsigned pos = mach_read_from_4(data);
171 		if (pos == 0 || pos >= (1U << 16)) {
172 			break;
173 		}
174 		dict_table_t* table = dict_table_open_on_id(
175 			table_id, true, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED);
176 		if (!table) {
177 			break;
178 		}
179 
180 		dict_index_t* index = dict_table_get_first_index(table);
181 
182 		if (index && index->is_instant()
183 		    && DATA_N_SYS_COLS + 1 + pos == table->n_cols) {
184 			/* This is the rollback of an instant ADD COLUMN.
185 			Remove the column from the dictionary cache,
186 			but keep the system columns. */
187 			table->rollback_instant(pos);
188 		}
189 
190 		dict_table_close(table, true, false);
191 	}
192 
193 	if (btr_cur_optimistic_delete(btr_cur, 0, &mtr)) {
194 		err = DB_SUCCESS;
195 		goto func_exit;
196 	}
197 
198 	btr_pcur_commit_specify_mtr(&node->pcur, &mtr);
199 retry:
200 	/* If did not succeed, try pessimistic descent to tree */
201 	mtr.start();
202 	if (index->table->is_temporary()) {
203 		mtr.set_log_mode(MTR_LOG_NO_REDO);
204 	} else {
205 		index->set_modified(mtr);
206 	}
207 
208 	success = btr_pcur_restore_position(
209 			BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
210 			&node->pcur, &mtr);
211 	ut_a(success);
212 
213 	btr_cur_pessimistic_delete(&err, FALSE, btr_cur, 0, true, &mtr);
214 
215 	/* The delete operation may fail if we have little
216 	file space left: TODO: easiest to crash the database
217 	and restart with more file space */
218 
219 	if (err == DB_OUT_OF_FILE_SPACE
220 	    && n_tries < BTR_CUR_RETRY_DELETE_N_TIMES) {
221 
222 		btr_pcur_commit_specify_mtr(&(node->pcur), &mtr);
223 
224 		n_tries++;
225 
226 		os_thread_sleep(BTR_CUR_RETRY_SLEEP_TIME);
227 
228 		goto retry;
229 	}
230 
231 func_exit:
232 	btr_pcur_commit_specify_mtr(&node->pcur, &mtr);
233 	if (err == DB_SUCCESS && node->rec_type == TRX_UNDO_INSERT_METADATA) {
234 		/* When rolling back the very first instant ADD COLUMN
235 		operation, reset the root page to the basic state. */
236 		ut_ad(!index->table->is_temporary());
237 		mtr.start();
238 		if (page_t* root = btr_root_get(index, &mtr)) {
239 			byte* page_type = root + FIL_PAGE_TYPE;
240 			ut_ad(mach_read_from_2(page_type)
241 			      == FIL_PAGE_TYPE_INSTANT
242 			      || mach_read_from_2(page_type)
243 			      == FIL_PAGE_INDEX);
244 			index->set_modified(mtr);
245 			mlog_write_ulint(page_type, FIL_PAGE_INDEX,
246 					 MLOG_2BYTES, &mtr);
247 			byte* instant = PAGE_INSTANT + PAGE_HEADER + root;
248 			mlog_write_ulint(instant,
249 					 page_ptr_get_direction(instant + 1),
250 					 MLOG_2BYTES, &mtr);
251 		}
252 		mtr.commit();
253 	}
254 
255 	return(err);
256 }
257 
258 /***************************************************************//**
259 Removes a secondary index entry if found.
260 @return DB_SUCCESS, DB_FAIL, or DB_OUT_OF_FILE_SPACE */
261 static MY_ATTRIBUTE((nonnull, warn_unused_result))
262 dberr_t
row_undo_ins_remove_sec_low(ulint mode,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)263 row_undo_ins_remove_sec_low(
264 /*========================*/
265 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
266 				depending on whether we wish optimistic or
267 				pessimistic descent down the index tree */
268 	dict_index_t*	index,	/*!< in: index */
269 	dtuple_t*	entry,	/*!< in: index entry to remove */
270 	que_thr_t*	thr)	/*!< in: query thread */
271 {
272 	btr_pcur_t		pcur;
273 	dberr_t			err	= DB_SUCCESS;
274 	mtr_t			mtr;
275 	const bool		modify_leaf = mode == BTR_MODIFY_LEAF;
276 
277 	row_mtr_start(&mtr, index, !modify_leaf);
278 
279 	if (modify_leaf) {
280 		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
281 		mtr_s_lock_index(index, &mtr);
282 	} else {
283 		ut_ad(mode == (BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE));
284 		mtr_sx_lock_index(index, &mtr);
285 	}
286 
287 	if (row_log_online_op_try(index, entry, 0)) {
288 		goto func_exit_no_pcur;
289 	}
290 
291 	if (dict_index_is_spatial(index)) {
292 		if (modify_leaf) {
293 			mode |= BTR_RTREE_DELETE_MARK;
294 		}
295 		btr_pcur_get_btr_cur(&pcur)->thr = thr;
296 		mode |= BTR_RTREE_UNDO_INS;
297 	}
298 
299 	switch (row_search_index_entry(index, entry, mode, &pcur, &mtr)) {
300 	case ROW_BUFFERED:
301 	case ROW_NOT_DELETED_REF:
302 		/* These are invalid outcomes, because the mode passed
303 		to row_search_index_entry() did not include any of the
304 		flags BTR_INSERT, BTR_DELETE, or BTR_DELETE_MARK. */
305 		ut_error;
306 	case ROW_NOT_FOUND:
307 		break;
308 	case ROW_FOUND:
309 		if (dict_index_is_spatial(index)
310 		    && rec_get_deleted_flag(
311 			    btr_pcur_get_rec(&pcur),
312 			    dict_table_is_comp(index->table))) {
313 			ib::error() << "Record found in index " << index->name
314 				<< " is deleted marked on insert rollback.";
315 			ut_ad(0);
316 		}
317 
318 		btr_cur_t* btr_cur = btr_pcur_get_btr_cur(&pcur);
319 
320 		if (modify_leaf) {
321 			err = btr_cur_optimistic_delete(btr_cur, 0, &mtr)
322 				? DB_SUCCESS : DB_FAIL;
323 		} else {
324 			/* Passing rollback=false here, because we are
325 			deleting a secondary index record: the distinction
326 			only matters when deleting a record that contains
327 			externally stored columns. */
328 			btr_cur_pessimistic_delete(&err, FALSE, btr_cur, 0,
329 						   false, &mtr);
330 		}
331 	}
332 
333 	btr_pcur_close(&pcur);
334 func_exit_no_pcur:
335 	mtr_commit(&mtr);
336 
337 	return(err);
338 }
339 
340 /***************************************************************//**
341 Removes a secondary index entry from the index if found. Tries first
342 optimistic, then pessimistic descent down the tree.
343 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
344 static MY_ATTRIBUTE((nonnull, warn_unused_result))
345 dberr_t
row_undo_ins_remove_sec(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)346 row_undo_ins_remove_sec(
347 /*====================*/
348 	dict_index_t*	index,	/*!< in: index */
349 	dtuple_t*	entry,	/*!< in: index entry to insert */
350 	que_thr_t*	thr)	/*!< in: query thread */
351 {
352 	dberr_t	err;
353 	ulint	n_tries	= 0;
354 
355 	/* Try first optimistic descent to the B-tree */
356 
357 	err = row_undo_ins_remove_sec_low(BTR_MODIFY_LEAF, index, entry, thr);
358 
359 	if (err == DB_SUCCESS) {
360 
361 		return(err);
362 	}
363 
364 	/* Try then pessimistic descent to the B-tree */
365 retry:
366 	err = row_undo_ins_remove_sec_low(
367 		BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
368 		index, entry, thr);
369 
370 	/* The delete operation may fail if we have little
371 	file space left: TODO: easiest to crash the database
372 	and restart with more file space */
373 
374 	if (err != DB_SUCCESS && n_tries < BTR_CUR_RETRY_DELETE_N_TIMES) {
375 
376 		n_tries++;
377 
378 		os_thread_sleep(BTR_CUR_RETRY_SLEEP_TIME);
379 
380 		goto retry;
381 	}
382 
383 	return(err);
384 }
385 
386 /***********************************************************//**
387 Parses the row reference and other info in a fresh insert undo record. */
388 static
389 void
row_undo_ins_parse_undo_rec(undo_node_t * node,ibool dict_locked)390 row_undo_ins_parse_undo_rec(
391 /*========================*/
392 	undo_node_t*	node,		/*!< in/out: row undo node */
393 	ibool		dict_locked)	/*!< in: TRUE if own dict_sys->mutex */
394 {
395 	dict_index_t*	clust_index;
396 	byte*		ptr;
397 	undo_no_t	undo_no;
398 	table_id_t	table_id;
399 	ulint		dummy;
400 	bool		dummy_extern;
401 
402 	ut_ad(node);
403 
404 	ptr = trx_undo_rec_get_pars(node->undo_rec, &node->rec_type, &dummy,
405 				    &dummy_extern, &undo_no, &table_id);
406 
407 	node->update = NULL;
408 	node->table = dict_table_open_on_id(
409 		table_id, dict_locked, DICT_TABLE_OP_NORMAL);
410 
411 	/* Skip the UNDO if we can't find the table or the .ibd file. */
412 	if (UNIV_UNLIKELY(node->table == NULL)) {
413 		return;
414 	}
415 
416 	switch (node->rec_type) {
417 	default:
418 		ut_ad(!"wrong undo record type");
419 		goto close_table;
420 	case TRX_UNDO_INSERT_METADATA:
421 	case TRX_UNDO_INSERT_REC:
422 		break;
423 	case TRX_UNDO_RENAME_TABLE:
424 		dict_table_t* table = node->table;
425 		ut_ad(!table->is_temporary());
426 		ut_ad(dict_table_is_file_per_table(table)
427 		      == !is_system_tablespace(table->space_id));
428 		size_t len = mach_read_from_2(node->undo_rec)
429 			+ size_t(node->undo_rec - ptr) - 2;
430 		ptr[len] = 0;
431 		const char* name = reinterpret_cast<char*>(ptr);
432 		if (strcmp(table->name.m_name, name)) {
433 			dict_table_rename_in_cache(table, name, false,
434 						   table_id != 0);
435 		}
436 		goto close_table;
437 	}
438 
439 	if (UNIV_UNLIKELY(!fil_table_accessible(node->table))) {
440 close_table:
441 		/* Normally, tables should not disappear or become
442 		unaccessible during ROLLBACK, because they should be
443 		protected by InnoDB table locks. Corruption could be
444 		a valid exception.
445 
446 		FIXME: When running out of temporary tablespace, it
447 		would probably be better to just drop all temporary
448 		tables (and temporary undo log records) of the current
449 		connection, instead of doing this rollback. */
450 		dict_table_close(node->table, dict_locked, FALSE);
451 		node->table = NULL;
452 	} else {
453 		ut_ad(!node->table->skip_alter_undo);
454 		clust_index = dict_table_get_first_index(node->table);
455 
456 		if (clust_index != NULL) {
457 			if (node->rec_type == TRX_UNDO_INSERT_REC) {
458 				ptr = trx_undo_rec_get_row_ref(
459 					ptr, clust_index, &node->ref,
460 					node->heap);
461 			} else {
462 				node->ref = &trx_undo_metadata;
463 				if (!row_undo_search_clust_to_pcur(node)) {
464 					/* An error probably occurred during
465 					an insert into the clustered index,
466 					after we wrote the undo log record. */
467 					goto close_table;
468 				}
469 				return;
470 			}
471 
472 			if (!row_undo_search_clust_to_pcur(node)) {
473 				/* An error probably occurred during
474 				an insert into the clustered index,
475 				after we wrote the undo log record. */
476 				goto close_table;
477 			}
478 			if (node->table->n_v_cols) {
479 				trx_undo_read_v_cols(node->table, ptr,
480 						     node->row, false);
481 			}
482 
483 		} else {
484 			ib::warn() << "Table " << node->table->name
485 				 << " has no indexes,"
486 				" ignoring the table";
487 			goto close_table;
488 		}
489 	}
490 }
491 
492 /***************************************************************//**
493 Removes secondary index records.
494 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
495 static MY_ATTRIBUTE((nonnull, warn_unused_result))
496 dberr_t
row_undo_ins_remove_sec_rec(undo_node_t * node,que_thr_t * thr)497 row_undo_ins_remove_sec_rec(
498 /*========================*/
499 	undo_node_t*	node,	/*!< in/out: row undo node */
500 	que_thr_t*	thr)	/*!< in: query thread */
501 {
502 	dberr_t		err	= DB_SUCCESS;
503 	dict_index_t*	index	= node->index;
504 	mem_heap_t*	heap;
505 
506 	heap = mem_heap_create(1024);
507 
508 	while (index != NULL) {
509 		dtuple_t*	entry;
510 
511 		if (index->type & DICT_FTS) {
512 			dict_table_next_uncorrupted_index(index);
513 			continue;
514 		}
515 
516 		/* An insert undo record TRX_UNDO_INSERT_REC will
517 		always contain all fields of the index. It does not
518 		matter if any indexes were created afterwards; all
519 		index entries can be reconstructed from the row. */
520 		entry = row_build_index_entry(
521 			node->row, node->ext, index, heap);
522 		if (UNIV_UNLIKELY(!entry)) {
523 			/* The database must have crashed after
524 			inserting a clustered index record but before
525 			writing all the externally stored columns of
526 			that record, or a statement is being rolled
527 			back because an error occurred while storing
528 			off-page columns.
529 
530 			Because secondary index entries are inserted
531 			after the clustered index record, we may
532 			assume that the secondary index record does
533 			not exist. */
534 		} else {
535 			err = row_undo_ins_remove_sec(index, entry, thr);
536 
537 			if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
538 				goto func_exit;
539 			}
540 		}
541 
542 		mem_heap_empty(heap);
543 		dict_table_next_uncorrupted_index(index);
544 	}
545 
546 func_exit:
547 	node->index = index;
548 	mem_heap_free(heap);
549 	return(err);
550 }
551 
552 /***********************************************************//**
553 Undoes a fresh insert of a row to a table. A fresh insert means that
554 the same clustered index unique key did not have any record, even delete
555 marked, at the time of the insert.  InnoDB is eager in a rollback:
556 if it figures out that an index record will be removed in the purge
557 anyway, it will remove it in the rollback.
558 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
559 dberr_t
row_undo_ins(undo_node_t * node,que_thr_t * thr)560 row_undo_ins(
561 /*=========*/
562 	undo_node_t*	node,	/*!< in: row undo node */
563 	que_thr_t*	thr)	/*!< in: query thread */
564 {
565 	dberr_t	err;
566 	ibool	dict_locked;
567 
568 	ut_ad(node->state == UNDO_NODE_INSERT);
569 	ut_ad(node->trx->in_rollback);
570 	ut_ad(trx_undo_roll_ptr_is_insert(node->roll_ptr));
571 
572 	dict_locked = node->trx->dict_operation_lock_mode == RW_X_LATCH;
573 
574 	row_undo_ins_parse_undo_rec(node, dict_locked);
575 
576 	if (node->table == NULL) {
577 		return(DB_SUCCESS);
578 	}
579 
580 	/* Iterate over all the indexes and undo the insert.*/
581 
582 	node->index = dict_table_get_first_index(node->table);
583 	ut_ad(dict_index_is_clust(node->index));
584 
585 	switch (node->rec_type) {
586 	default:
587 		ut_ad(!"wrong undo record type");
588 		/* fall through */
589 	case TRX_UNDO_INSERT_REC:
590 		/* Skip the clustered index (the first index) */
591 		node->index = dict_table_get_next_index(node->index);
592 
593 		dict_table_skip_corrupt_index(node->index);
594 
595 		err = row_undo_ins_remove_sec_rec(node, thr);
596 
597 		if (err != DB_SUCCESS) {
598 			break;
599 		}
600 
601 		/* fall through */
602 	case TRX_UNDO_INSERT_METADATA:
603 		log_free_check();
604 
605 		if (node->table->id == DICT_INDEXES_ID) {
606 			ut_ad(node->rec_type == TRX_UNDO_INSERT_REC);
607 
608 			if (!dict_locked) {
609 				mutex_enter(&dict_sys->mutex);
610 			}
611 		}
612 
613 		// FIXME: We need to update the dict_index_t::space and
614 		// page number fields too.
615 		err = row_undo_ins_remove_clust_rec(node);
616 
617 		if (node->table->id == DICT_INDEXES_ID
618 		    && !dict_locked) {
619 
620 			mutex_exit(&dict_sys->mutex);
621 		}
622 
623 		if (err == DB_SUCCESS && node->table->stat_initialized) {
624 			/* Not protected by dict_sys->mutex for
625 			performance reasons, we would rather get garbage
626 			in stat_n_rows (which is just an estimate anyway)
627 			than protecting the following code with a latch. */
628 			dict_table_n_rows_dec(node->table);
629 
630 			/* Do not attempt to update statistics when
631 			executing ROLLBACK in the InnoDB SQL
632 			interpreter, because in that case we would
633 			already be holding dict_sys->mutex, which
634 			would be acquired when updating statistics. */
635 			if (!dict_locked) {
636 				dict_stats_update_if_needed(node->table,
637 							    *node->trx);
638 			}
639 		}
640 	}
641 
642 	dict_table_close(node->table, dict_locked, FALSE);
643 
644 	node->table = NULL;
645 
646 	return(err);
647 }
648