// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}
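
// Note: a zero max_yield_usec_ doubles as the "adaptive yield disabled"
// flag; AwaitState() below only enters its yield loop when it is positive.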

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block.  Lazily create the mutex.  We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one.  At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
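
// A sketch of the handshake with SetState() below: the waiter CASes its
// state to STATE_LOCKED_WAITING and then sleeps on StateCV(); a waker that
// observes STATE_LOCKED_WAITING takes StateMutex(), stores the new state,
// and calls notify_one().  Because the waker only touches the mutex/condvar
// after seeing STATE_LOCKED_WAITING, the lazy CreateMutex() above is safe.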

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 microsecond
  // 2. Else SOMETIMES busy loop using "yield" for 100 microseconds (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond.  This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex().  For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
  // drag on RocksDB's single-writer design.  Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time.  How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long.  If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad.  We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect.  sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block.  If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision.  Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2.  We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context.  The credit
  // is increased when a yield succeeds before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sampled runs or right after a hard
  // failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit.  The sampling rate is
  // 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several.  Immediately update
            // yield_credit and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample based, it is ok if one thread overwrites
    // another thread's update.  Thus the update does not have to be atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., ~0.1%).  If the sampled yield was successful, the credit is
    // also increased by X.  Setting X = 2^17 ensures that the credit never
    // exceeds 2^17 * 2^10 = 2^27, which is below int32_t's upper bound of
    // 2^31.  The same logic applies to negative credits.
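    //
    // Numeric sketch: starting from v == 0, one successful sample moves the
    // credit to +131072 and one hard failure moves it to -131072.  Since the
    // spin-eligibility test above only requires yield_credit >= 0, a single
    // hard failure disables spinning until a later sampled run succeeds.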
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  assert(w);
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

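// Links w to the tail (newest end) of the writer list.  Returns true iff the
// list was empty beforehand, in which case the caller has become the leader
// of a new write group.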
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not set, block
    // here until the stall clears.  If w->no_slowdown is set, return
    // immediately with an Incomplete status.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
          stall_cv_.Wait();
          // Load newest_writer again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
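
// Illustration (a sketch): writers enqueue themselves with only link_older
// set, so the list may look like
//   newest_writer_ -> C -> B -> A -> nullptr        (link_older only)
// After CreateMissingNewerLinks(C) the backward links exist as well:
//   A <-> B <-> C, i.e. A->link_newer == B and B->link_newer == C.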

WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk the writer list until w->write_group != nullptr.  The current write
  // group will not have a mix of slowdown/no_slowdown, so it's ok to stop at
  // that point
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      // Only update `link_newer` if it's already set.
      // `CreateMissingNewerLinks()` will update the nullptr `link_newer`
      // later, which assumes that the first non-nullptr `link_newer` is the
      // last nullptr link in the writer list.
      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
      // updating the whole list when it sees the first non-nullptr link.
      if (prev->link_older && prev->link_older->link_newer) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue.  This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) ==
         &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf
     *    2.2) Or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and WAL write for us, enqueues us as a
     *    pending memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
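
  // Sketch with the default max_write_batch_group_size_bytes of 1 << 20
  // (1 MB): min_batch_size_bytes is 128 KB, so a 4 KB leader batch caps the
  // group at about 132 KB rather than the full 1 MB.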

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller.  Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky.  Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive.  Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL is disabled.
      break;
    }

    if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
      // Do not mix writes with different levels of integrity protection.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include writes with a nullptr batch.  Those are not writes;
      // they are something else and want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make the batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make the batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
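  // Sketch: if last_writer->sequence == 100 and its batch contains 3 key
  // entries, the group's last_sequence becomes 100 + 3 - 1 == 102.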
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) -
      1;
}

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
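  // Every writer in the group, leader included, later calls
  // CompleteParallelMemTableWriter(), which decrements `running`; the
  // writer that brings the count to zero performs the exit duties.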
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // If status is non-ok already, then write_group.status won't have the
  // chance of being propagated to the caller.
  if (!status.ok()) {
    write_group.status.PermitUncheckedError();
  }

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that don't write to memtable that they can exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for the next leader before we call LinkGroup.  If there are no
    // pending writers, place a dummy writer at the tail of the queue
    // so we know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We found at least one pending writer when we tried to insert dummy.
      // We search for the next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }
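
    // Sketch of the two outcomes above: if newest_writer_ still pointed at
    // last_writer, the CAS replaced it with &dummy, so writers arriving
    // later queue up behind the dummy and are discovered when the dummy is
    // removed below.  Otherwise `expected` now holds the true newest
    // writer, and the next leader is the oldest writer newer than
    // last_writer.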

    // Link the remaining writers of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking
    // up the next leader or setting newest_writer_ to null, otherwise the
    // next leader can run ahead of us and link to the memtable writer queue
    // before we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from the current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we have inserted dummy in the queue, remove it now and check if
    // there are pending writers that joined the queue since we inserted the
    // dummy.  If so, look for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail).  In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head).  No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below.  This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a cleared newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // The next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list).  That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      assert(last_writer);
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE