xref: /linux/fs/bcachefs/btree_write_buffer.c (revision d642ef71)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include "bcachefs.h"
4 #include "btree_locking.h"
5 #include "btree_update.h"
6 #include "btree_update_interior.h"
7 #include "btree_write_buffer.h"
8 #include "error.h"
9 #include "journal.h"
10 #include "journal_reclaim.h"
11 
12 #include <linux/sort.h>
13 
14 static int btree_write_buffered_key_cmp(const void *_l, const void *_r)
15 {
16 	const struct btree_write_buffered_key *l = _l;
17 	const struct btree_write_buffered_key *r = _r;
18 
19 	return  cmp_int(l->btree, r->btree) ?:
20 		bpos_cmp(l->k.k.p, r->k.k.p) ?:
21 		cmp_int(l->journal_seq, r->journal_seq) ?:
22 		cmp_int(l->journal_offset, r->journal_offset);
23 }
24 
25 static int btree_write_buffered_journal_cmp(const void *_l, const void *_r)
26 {
27 	const struct btree_write_buffered_key *l = _l;
28 	const struct btree_write_buffered_key *r = _r;
29 
30 	return  cmp_int(l->journal_seq, r->journal_seq);
31 }
32 
33 static int bch2_btree_write_buffer_flush_one(struct btree_trans *trans,
34 					     struct btree_iter *iter,
35 					     struct btree_write_buffered_key *wb,
36 					     unsigned commit_flags,
37 					     bool *write_locked,
38 					     size_t *fast)
39 {
40 	struct bch_fs *c = trans->c;
41 	struct btree_path *path;
42 	int ret;
43 
44 	ret = bch2_btree_iter_traverse(iter);
45 	if (ret)
46 		return ret;
47 
48 	path = iter->path;
49 
50 	if (!*write_locked) {
51 		ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
52 		if (ret)
53 			return ret;
54 
55 		bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
56 		*write_locked = true;
57 	}
58 
59 	if (!bch2_btree_node_insert_fits(c, path->l[0].b, wb->k.k.u64s)) {
60 		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
61 		*write_locked = false;
62 		goto trans_commit;
63 	}
64 
65 	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
66 	(*fast)++;
67 
68 	if (path->ref > 1) {
69 		/*
70 		 * We can't clone a path that has write locks: if the path is
71 		 * shared, unlock before set_pos(), traverse():
72 		 */
73 		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
74 		*write_locked = false;
75 	}
76 	return 0;
77 trans_commit:
78 	return  bch2_trans_update_seq(trans, wb->journal_seq, iter, &wb->k,
79 				      BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE) ?:
80 		bch2_trans_commit(trans, NULL, NULL,
81 				  commit_flags|
82 				  BTREE_INSERT_NOCHECK_RW|
83 				  BTREE_INSERT_NOFAIL|
84 				  BTREE_INSERT_JOURNAL_RECLAIM);
85 }
86 
87 static union btree_write_buffer_state btree_write_buffer_switch(struct btree_write_buffer *wb)
88 {
89 	union btree_write_buffer_state old, new;
90 	u64 v = READ_ONCE(wb->state.v);
91 
92 	do {
93 		old.v = new.v = v;
94 
95 		new.nr = 0;
96 		new.idx++;
97 	} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
98 
99 	while (old.idx == 0 ? wb->state.ref0 : wb->state.ref1)
100 		cpu_relax();
101 
102 	smp_mb();
103 
104 	return old;
105 }
106 
107 /*
108  * Update a btree with a write buffered key using the journal seq of the
109  * original write buffer insert.
110  *
111  * It is not safe to rejournal the key once it has been inserted into the write
112  * buffer because that may break recovery ordering. For example, the key may
113  * have already been modified in the active write buffer in a seq that comes
114  * before the current transaction. If we were to journal this key again and
115  * crash, recovery would process updates in the wrong order.
116  */
117 static int
118 btree_write_buffered_insert(struct btree_trans *trans,
119 			  struct btree_write_buffered_key *wb)
120 {
121 	struct btree_iter iter;
122 	int ret;
123 
124 	bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
125 			     BTREE_ITER_CACHED|BTREE_ITER_INTENT);
126 
127 	ret   = bch2_btree_iter_traverse(&iter) ?:
128 		bch2_trans_update_seq(trans, wb->journal_seq, &iter, &wb->k,
129 				      BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE);
130 	bch2_trans_iter_exit(trans, &iter);
131 	return ret;
132 }
133 
134 int __bch2_btree_write_buffer_flush(struct btree_trans *trans, unsigned commit_flags,
135 				    bool locked)
136 {
137 	struct bch_fs *c = trans->c;
138 	struct journal *j = &c->journal;
139 	struct btree_write_buffer *wb = &c->btree_write_buffer;
140 	struct journal_entry_pin pin;
141 	struct btree_write_buffered_key *i, *keys;
142 	struct btree_iter iter = { NULL };
143 	size_t nr = 0, skipped = 0, fast = 0, slowpath = 0;
144 	bool write_locked = false;
145 	union btree_write_buffer_state s;
146 	int ret = 0;
147 
148 	memset(&pin, 0, sizeof(pin));
149 
150 	if (!locked && !mutex_trylock(&wb->flush_lock))
151 		return 0;
152 
153 	bch2_journal_pin_copy(j, &pin, &wb->journal_pin, NULL);
154 	bch2_journal_pin_drop(j, &wb->journal_pin);
155 
156 	s = btree_write_buffer_switch(wb);
157 	keys = wb->keys[s.idx];
158 	nr = s.nr;
159 
160 	if (race_fault())
161 		goto slowpath;
162 
163 	/*
164 	 * We first sort so that we can detect and skip redundant updates, and
165 	 * then we attempt to flush in sorted btree order, as this is most
166 	 * efficient.
167 	 *
168 	 * However, since we're not flushing in the order they appear in the
169 	 * journal we won't be able to drop our journal pin until everything is
170 	 * flushed - which means this could deadlock the journal if we weren't
171 	 * passing BTREE_INSERT_JOURNAL_RECLAIM. This causes the update to fail
172 	 * if it would block taking a journal reservation.
173 	 *
174 	 * If that happens, simply skip the key so we can optimistically insert
175 	 * as many keys as possible in the fast path.
176 	 */
177 	sort(keys, nr, sizeof(keys[0]),
178 	     btree_write_buffered_key_cmp, NULL);
179 
180 	for (i = keys; i < keys + nr; i++) {
181 		if (i + 1 < keys + nr &&
182 		    i[0].btree == i[1].btree &&
183 		    bpos_eq(i[0].k.k.p, i[1].k.k.p)) {
184 			skipped++;
185 			i->journal_seq = 0;
186 			continue;
187 		}
188 
189 		if (write_locked &&
190 		    (iter.path->btree_id != i->btree ||
191 		     bpos_gt(i->k.k.p, iter.path->l[0].b->key.k.p))) {
192 			bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
193 			write_locked = false;
194 		}
195 
196 		if (!iter.path || iter.path->btree_id != i->btree) {
197 			bch2_trans_iter_exit(trans, &iter);
198 			bch2_trans_iter_init(trans, &iter, i->btree, i->k.k.p,
199 					     BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS);
200 		}
201 
202 		bch2_btree_iter_set_pos(&iter, i->k.k.p);
203 		iter.path->preserve = false;
204 
205 		do {
206 			ret = bch2_btree_write_buffer_flush_one(trans, &iter, i,
207 						commit_flags, &write_locked, &fast);
208 			if (!write_locked)
209 				bch2_trans_begin(trans);
210 		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));
211 
212 		if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
213 			slowpath++;
214 			continue;
215 		}
216 		if (ret)
217 			break;
218 
219 		i->journal_seq = 0;
220 	}
221 
222 	if (write_locked)
223 		bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b);
224 	bch2_trans_iter_exit(trans, &iter);
225 
226 	trace_write_buffer_flush(trans, nr, skipped, fast, wb->size);
227 
228 	if (slowpath)
229 		goto slowpath;
230 
231 	bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret));
232 out:
233 	bch2_journal_pin_drop(j, &pin);
234 	mutex_unlock(&wb->flush_lock);
235 	return ret;
236 slowpath:
237 	trace_write_buffer_flush_slowpath(trans, i - keys, nr);
238 
239 	/*
240 	 * Now sort the rest by journal seq and bump the journal pin as we go.
241 	 * The slowpath zapped the seq of keys that were successfully flushed so
242 	 * we can skip those here.
243 	 */
244 	sort(keys, nr, sizeof(keys[0]),
245 	     btree_write_buffered_journal_cmp,
246 	     NULL);
247 
248 	commit_flags &= ~BCH_WATERMARK_MASK;
249 	commit_flags |= BCH_WATERMARK_reclaim;
250 
251 	for (i = keys; i < keys + nr; i++) {
252 		if (!i->journal_seq)
253 			continue;
254 
255 		if (i->journal_seq > pin.seq) {
256 			struct journal_entry_pin pin2;
257 
258 			memset(&pin2, 0, sizeof(pin2));
259 
260 			bch2_journal_pin_add(j, i->journal_seq, &pin2, NULL);
261 			bch2_journal_pin_drop(j, &pin);
262 			bch2_journal_pin_copy(j, &pin, &pin2, NULL);
263 			bch2_journal_pin_drop(j, &pin2);
264 		}
265 
266 		ret = commit_do(trans, NULL, NULL,
267 				commit_flags|
268 				BTREE_INSERT_NOFAIL|
269 				BTREE_INSERT_JOURNAL_RECLAIM,
270 				btree_write_buffered_insert(trans, i));
271 		if (bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret)))
272 			break;
273 	}
274 
275 	goto out;
276 }
277 
278 int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
279 {
280 	bch2_trans_unlock(trans);
281 	mutex_lock(&trans->c->btree_write_buffer.flush_lock);
282 	return __bch2_btree_write_buffer_flush(trans, 0, true);
283 }
284 
285 int bch2_btree_write_buffer_flush(struct btree_trans *trans)
286 {
287 	return __bch2_btree_write_buffer_flush(trans, 0, false);
288 }
289 
290 static int bch2_btree_write_buffer_journal_flush(struct journal *j,
291 				struct journal_entry_pin *_pin, u64 seq)
292 {
293 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
294 	struct btree_write_buffer *wb = &c->btree_write_buffer;
295 
296 	mutex_lock(&wb->flush_lock);
297 
298 	return bch2_trans_run(c,
299 			__bch2_btree_write_buffer_flush(trans, BTREE_INSERT_NOCHECK_RW, true));
300 }
301 
302 static inline u64 btree_write_buffer_ref(int idx)
303 {
304 	return ((union btree_write_buffer_state) {
305 		.ref0 = idx == 0,
306 		.ref1 = idx == 1,
307 	}).v;
308 }
309 
310 int bch2_btree_insert_keys_write_buffer(struct btree_trans *trans)
311 {
312 	struct bch_fs *c = trans->c;
313 	struct btree_write_buffer *wb = &c->btree_write_buffer;
314 	struct btree_write_buffered_key *i;
315 	union btree_write_buffer_state old, new;
316 	int ret = 0;
317 	u64 v;
318 
319 	trans_for_each_wb_update(trans, i) {
320 		EBUG_ON(i->k.k.u64s > BTREE_WRITE_BUFERED_U64s_MAX);
321 
322 		i->journal_seq		= trans->journal_res.seq;
323 		i->journal_offset	= trans->journal_res.offset;
324 	}
325 
326 	preempt_disable();
327 	v = READ_ONCE(wb->state.v);
328 	do {
329 		old.v = new.v = v;
330 
331 		new.v += btree_write_buffer_ref(new.idx);
332 		new.nr += trans->nr_wb_updates;
333 		if (new.nr > wb->size) {
334 			ret = -BCH_ERR_btree_insert_need_flush_buffer;
335 			goto out;
336 		}
337 	} while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v);
338 
339 	memcpy(wb->keys[new.idx] + old.nr,
340 	       trans->wb_updates,
341 	       sizeof(trans->wb_updates[0]) * trans->nr_wb_updates);
342 
343 	bch2_journal_pin_add(&c->journal, trans->journal_res.seq, &wb->journal_pin,
344 			     bch2_btree_write_buffer_journal_flush);
345 
346 	atomic64_sub_return_release(btree_write_buffer_ref(new.idx), &wb->state.counter);
347 out:
348 	preempt_enable();
349 	return ret;
350 }
351 
352 void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
353 {
354 	struct btree_write_buffer *wb = &c->btree_write_buffer;
355 
356 	BUG_ON(wb->state.nr && !bch2_journal_error(&c->journal));
357 
358 	kvfree(wb->keys[1]);
359 	kvfree(wb->keys[0]);
360 }
361 
362 int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
363 {
364 	struct btree_write_buffer *wb = &c->btree_write_buffer;
365 
366 	mutex_init(&wb->flush_lock);
367 	wb->size = c->opts.btree_write_buffer_size;
368 
369 	wb->keys[0] = kvmalloc_array(wb->size, sizeof(*wb->keys[0]), GFP_KERNEL);
370 	wb->keys[1] = kvmalloc_array(wb->size, sizeof(*wb->keys[1]), GFP_KERNEL);
371 	if (!wb->keys[0] || !wb->keys[1])
372 		return -BCH_ERR_ENOMEM_fs_btree_write_buffer_init;
373 
374 	return 0;
375 }
376