1 // SPDX-License-Identifier: GPL-2.0 2 3 #include "bcachefs.h" 4 #include "btree_locking.h" 5 #include "btree_update.h" 6 #include "btree_update_interior.h" 7 #include "btree_write_buffer.h" 8 #include "error.h" 9 #include "journal.h" 10 #include "journal_reclaim.h" 11 12 #include <linux/sort.h> 13 14 static int btree_write_buffered_key_cmp(const void *_l, const void *_r) 15 { 16 const struct btree_write_buffered_key *l = _l; 17 const struct btree_write_buffered_key *r = _r; 18 19 return cmp_int(l->btree, r->btree) ?: 20 bpos_cmp(l->k.k.p, r->k.k.p) ?: 21 cmp_int(l->journal_seq, r->journal_seq) ?: 22 cmp_int(l->journal_offset, r->journal_offset); 23 } 24 25 static int btree_write_buffered_journal_cmp(const void *_l, const void *_r) 26 { 27 const struct btree_write_buffered_key *l = _l; 28 const struct btree_write_buffered_key *r = _r; 29 30 return cmp_int(l->journal_seq, r->journal_seq); 31 } 32 33 static int bch2_btree_write_buffer_flush_one(struct btree_trans *trans, 34 struct btree_iter *iter, 35 struct btree_write_buffered_key *wb, 36 unsigned commit_flags, 37 bool *write_locked, 38 size_t *fast) 39 { 40 struct bch_fs *c = trans->c; 41 struct btree_path *path; 42 int ret; 43 44 ret = bch2_btree_iter_traverse(iter); 45 if (ret) 46 return ret; 47 48 path = iter->path; 49 50 if (!*write_locked) { 51 ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c); 52 if (ret) 53 return ret; 54 55 bch2_btree_node_prep_for_write(trans, path, path->l[0].b); 56 *write_locked = true; 57 } 58 59 if (!bch2_btree_node_insert_fits(c, path->l[0].b, wb->k.k.u64s)) { 60 bch2_btree_node_unlock_write(trans, path, path->l[0].b); 61 *write_locked = false; 62 goto trans_commit; 63 } 64 65 bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq); 66 (*fast)++; 67 68 if (path->ref > 1) { 69 /* 70 * We can't clone a path that has write locks: if the path is 71 * shared, unlock before set_pos(), traverse(): 72 */ 73 bch2_btree_node_unlock_write(trans, path, path->l[0].b); 74 *write_locked = false; 75 } 76 return 0; 77 trans_commit: 78 return bch2_trans_update_seq(trans, wb->journal_seq, iter, &wb->k, 79 BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE) ?: 80 bch2_trans_commit(trans, NULL, NULL, 81 commit_flags| 82 BTREE_INSERT_NOCHECK_RW| 83 BTREE_INSERT_NOFAIL| 84 BTREE_INSERT_JOURNAL_RECLAIM); 85 } 86 87 static union btree_write_buffer_state btree_write_buffer_switch(struct btree_write_buffer *wb) 88 { 89 union btree_write_buffer_state old, new; 90 u64 v = READ_ONCE(wb->state.v); 91 92 do { 93 old.v = new.v = v; 94 95 new.nr = 0; 96 new.idx++; 97 } while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v); 98 99 while (old.idx == 0 ? wb->state.ref0 : wb->state.ref1) 100 cpu_relax(); 101 102 smp_mb(); 103 104 return old; 105 } 106 107 /* 108 * Update a btree with a write buffered key using the journal seq of the 109 * original write buffer insert. 110 * 111 * It is not safe to rejournal the key once it has been inserted into the write 112 * buffer because that may break recovery ordering. For example, the key may 113 * have already been modified in the active write buffer in a seq that comes 114 * before the current transaction. If we were to journal this key again and 115 * crash, recovery would process updates in the wrong order. 116 */ 117 static int 118 btree_write_buffered_insert(struct btree_trans *trans, 119 struct btree_write_buffered_key *wb) 120 { 121 struct btree_iter iter; 122 int ret; 123 124 bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k), 125 BTREE_ITER_CACHED|BTREE_ITER_INTENT); 126 127 ret = bch2_btree_iter_traverse(&iter) ?: 128 bch2_trans_update_seq(trans, wb->journal_seq, &iter, &wb->k, 129 BTREE_UPDATE_INTERNAL_SNAPSHOT_NODE); 130 bch2_trans_iter_exit(trans, &iter); 131 return ret; 132 } 133 134 int __bch2_btree_write_buffer_flush(struct btree_trans *trans, unsigned commit_flags, 135 bool locked) 136 { 137 struct bch_fs *c = trans->c; 138 struct journal *j = &c->journal; 139 struct btree_write_buffer *wb = &c->btree_write_buffer; 140 struct journal_entry_pin pin; 141 struct btree_write_buffered_key *i, *keys; 142 struct btree_iter iter = { NULL }; 143 size_t nr = 0, skipped = 0, fast = 0, slowpath = 0; 144 bool write_locked = false; 145 union btree_write_buffer_state s; 146 int ret = 0; 147 148 memset(&pin, 0, sizeof(pin)); 149 150 if (!locked && !mutex_trylock(&wb->flush_lock)) 151 return 0; 152 153 bch2_journal_pin_copy(j, &pin, &wb->journal_pin, NULL); 154 bch2_journal_pin_drop(j, &wb->journal_pin); 155 156 s = btree_write_buffer_switch(wb); 157 keys = wb->keys[s.idx]; 158 nr = s.nr; 159 160 if (race_fault()) 161 goto slowpath; 162 163 /* 164 * We first sort so that we can detect and skip redundant updates, and 165 * then we attempt to flush in sorted btree order, as this is most 166 * efficient. 167 * 168 * However, since we're not flushing in the order they appear in the 169 * journal we won't be able to drop our journal pin until everything is 170 * flushed - which means this could deadlock the journal if we weren't 171 * passing BTREE_INSERT_JOURNAL_RECLAIM. This causes the update to fail 172 * if it would block taking a journal reservation. 173 * 174 * If that happens, simply skip the key so we can optimistically insert 175 * as many keys as possible in the fast path. 176 */ 177 sort(keys, nr, sizeof(keys[0]), 178 btree_write_buffered_key_cmp, NULL); 179 180 for (i = keys; i < keys + nr; i++) { 181 if (i + 1 < keys + nr && 182 i[0].btree == i[1].btree && 183 bpos_eq(i[0].k.k.p, i[1].k.k.p)) { 184 skipped++; 185 i->journal_seq = 0; 186 continue; 187 } 188 189 if (write_locked && 190 (iter.path->btree_id != i->btree || 191 bpos_gt(i->k.k.p, iter.path->l[0].b->key.k.p))) { 192 bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b); 193 write_locked = false; 194 } 195 196 if (!iter.path || iter.path->btree_id != i->btree) { 197 bch2_trans_iter_exit(trans, &iter); 198 bch2_trans_iter_init(trans, &iter, i->btree, i->k.k.p, 199 BTREE_ITER_INTENT|BTREE_ITER_ALL_SNAPSHOTS); 200 } 201 202 bch2_btree_iter_set_pos(&iter, i->k.k.p); 203 iter.path->preserve = false; 204 205 do { 206 ret = bch2_btree_write_buffer_flush_one(trans, &iter, i, 207 commit_flags, &write_locked, &fast); 208 if (!write_locked) 209 bch2_trans_begin(trans); 210 } while (bch2_err_matches(ret, BCH_ERR_transaction_restart)); 211 212 if (ret == -BCH_ERR_journal_reclaim_would_deadlock) { 213 slowpath++; 214 continue; 215 } 216 if (ret) 217 break; 218 219 i->journal_seq = 0; 220 } 221 222 if (write_locked) 223 bch2_btree_node_unlock_write(trans, iter.path, iter.path->l[0].b); 224 bch2_trans_iter_exit(trans, &iter); 225 226 trace_write_buffer_flush(trans, nr, skipped, fast, wb->size); 227 228 if (slowpath) 229 goto slowpath; 230 231 bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret)); 232 out: 233 bch2_journal_pin_drop(j, &pin); 234 mutex_unlock(&wb->flush_lock); 235 return ret; 236 slowpath: 237 trace_write_buffer_flush_slowpath(trans, i - keys, nr); 238 239 /* 240 * Now sort the rest by journal seq and bump the journal pin as we go. 241 * The slowpath zapped the seq of keys that were successfully flushed so 242 * we can skip those here. 243 */ 244 sort(keys, nr, sizeof(keys[0]), 245 btree_write_buffered_journal_cmp, 246 NULL); 247 248 commit_flags &= ~BCH_WATERMARK_MASK; 249 commit_flags |= BCH_WATERMARK_reclaim; 250 251 for (i = keys; i < keys + nr; i++) { 252 if (!i->journal_seq) 253 continue; 254 255 if (i->journal_seq > pin.seq) { 256 struct journal_entry_pin pin2; 257 258 memset(&pin2, 0, sizeof(pin2)); 259 260 bch2_journal_pin_add(j, i->journal_seq, &pin2, NULL); 261 bch2_journal_pin_drop(j, &pin); 262 bch2_journal_pin_copy(j, &pin, &pin2, NULL); 263 bch2_journal_pin_drop(j, &pin2); 264 } 265 266 ret = commit_do(trans, NULL, NULL, 267 commit_flags| 268 BTREE_INSERT_NOFAIL| 269 BTREE_INSERT_JOURNAL_RECLAIM, 270 btree_write_buffered_insert(trans, i)); 271 if (bch2_fs_fatal_err_on(ret, c, "%s: insert error %s", __func__, bch2_err_str(ret))) 272 break; 273 } 274 275 goto out; 276 } 277 278 int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans) 279 { 280 bch2_trans_unlock(trans); 281 mutex_lock(&trans->c->btree_write_buffer.flush_lock); 282 return __bch2_btree_write_buffer_flush(trans, 0, true); 283 } 284 285 int bch2_btree_write_buffer_flush(struct btree_trans *trans) 286 { 287 return __bch2_btree_write_buffer_flush(trans, 0, false); 288 } 289 290 static int bch2_btree_write_buffer_journal_flush(struct journal *j, 291 struct journal_entry_pin *_pin, u64 seq) 292 { 293 struct bch_fs *c = container_of(j, struct bch_fs, journal); 294 struct btree_write_buffer *wb = &c->btree_write_buffer; 295 296 mutex_lock(&wb->flush_lock); 297 298 return bch2_trans_run(c, 299 __bch2_btree_write_buffer_flush(trans, BTREE_INSERT_NOCHECK_RW, true)); 300 } 301 302 static inline u64 btree_write_buffer_ref(int idx) 303 { 304 return ((union btree_write_buffer_state) { 305 .ref0 = idx == 0, 306 .ref1 = idx == 1, 307 }).v; 308 } 309 310 int bch2_btree_insert_keys_write_buffer(struct btree_trans *trans) 311 { 312 struct bch_fs *c = trans->c; 313 struct btree_write_buffer *wb = &c->btree_write_buffer; 314 struct btree_write_buffered_key *i; 315 union btree_write_buffer_state old, new; 316 int ret = 0; 317 u64 v; 318 319 trans_for_each_wb_update(trans, i) { 320 EBUG_ON(i->k.k.u64s > BTREE_WRITE_BUFERED_U64s_MAX); 321 322 i->journal_seq = trans->journal_res.seq; 323 i->journal_offset = trans->journal_res.offset; 324 } 325 326 preempt_disable(); 327 v = READ_ONCE(wb->state.v); 328 do { 329 old.v = new.v = v; 330 331 new.v += btree_write_buffer_ref(new.idx); 332 new.nr += trans->nr_wb_updates; 333 if (new.nr > wb->size) { 334 ret = -BCH_ERR_btree_insert_need_flush_buffer; 335 goto out; 336 } 337 } while ((v = atomic64_cmpxchg_acquire(&wb->state.counter, old.v, new.v)) != old.v); 338 339 memcpy(wb->keys[new.idx] + old.nr, 340 trans->wb_updates, 341 sizeof(trans->wb_updates[0]) * trans->nr_wb_updates); 342 343 bch2_journal_pin_add(&c->journal, trans->journal_res.seq, &wb->journal_pin, 344 bch2_btree_write_buffer_journal_flush); 345 346 atomic64_sub_return_release(btree_write_buffer_ref(new.idx), &wb->state.counter); 347 out: 348 preempt_enable(); 349 return ret; 350 } 351 352 void bch2_fs_btree_write_buffer_exit(struct bch_fs *c) 353 { 354 struct btree_write_buffer *wb = &c->btree_write_buffer; 355 356 BUG_ON(wb->state.nr && !bch2_journal_error(&c->journal)); 357 358 kvfree(wb->keys[1]); 359 kvfree(wb->keys[0]); 360 } 361 362 int bch2_fs_btree_write_buffer_init(struct bch_fs *c) 363 { 364 struct btree_write_buffer *wb = &c->btree_write_buffer; 365 366 mutex_init(&wb->flush_lock); 367 wb->size = c->opts.btree_write_buffer_size; 368 369 wb->keys[0] = kvmalloc_array(wb->size, sizeof(*wb->keys[0]), GFP_KERNEL); 370 wb->keys[1] = kvmalloc_array(wb->size, sizeof(*wb->keys[1]), GFP_KERNEL); 371 if (!wb->keys[0] || !wb->keys[1]) 372 return -BCH_ERR_ENOMEM_fs_btree_write_buffer_init; 373 374 return 0; 375 } 376