//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"
#include <chrono>
#include <thread>
#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

WriteThread::WriteThread(const ImmutableDBOptions& db_options)
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
                          ? db_options.write_thread_max_yield_usec
                          : 0),
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
      allow_concurrent_memtable_write_(
          db_options.allow_concurrent_memtable_write),
      enable_pipelined_write_(db_options.enable_pipelined_write),
      max_write_batch_group_size_bytes(
          db_options.max_write_batch_group_size_bytes),
      newest_writer_(nullptr),
      newest_memtable_writer_(nullptr),
      last_sequence_(0),
      write_stall_dummy_(),
      stall_mu_(),
      stall_cv_(&stall_mu_) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block.  Lazily create the mutex.  We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one.  At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
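
// Illustrative sketch (not part of the implementation) of the handshake with
// SetState() below, which is what keeps the blocking path from losing wakeups:
//
//   waiter: CAS state -> STATE_LOCKED_WAITING, then wait on StateCV() under
//           StateMutex(), with a predicate that re-checks the state.
//   waker:  sees STATE_LOCKED_WAITING (directly, or via its failed CAS),
//           locks StateMutex(), stores the new state, then notify_one().
//
// Because the waker stores the new state while holding StateMutex(), and the
// waiter's wait predicate re-reads the state under the same mutex, the
// notification cannot fire between the waiter's check and its sleep.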

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state = 0;

  // 1. Busy loop using "pause" for 1 micro sec
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
  // 3. Else blocking wait

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond.  This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // This is below the fast path, so that the stat is zero when all writes are
  // from the same thread.
  PERF_TIMER_GUARD(write_thread_wait_nanos);

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex().  For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency from FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
  // drag on RocksDB's single-writer design.  Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time.  How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long.  If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad.  We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect.  sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block.  If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision.  Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2.  We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.
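  //
  // A rough end-to-end picture of the policy implemented below (illustrative,
  // not normative): spin with "pause" for ~1 usec; if the yield credit is
  // non-negative (or this is a sampled run), yield in a loop for up to
  // max_yield_usec_; escalate to BlockingAwaitState() either when that budget
  // runs out or after kMaxSlowYieldsWhileSpinning yields that each took at
  // least slow_yield_usec_, which suggests other runnable threads are
  // competing for the core.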

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  // Whether the yield approach has any credit in this context. Credit is
  // added when a yield is successful before timing out, and decreased
  // otherwise.
  auto& yield_credit = ctx->value;
  // Update the yield_credit based on sample runs or right after a hard failure
  bool update_ctx = false;
  // Should we reinforce the yield credit
  bool would_spin_again = false;
  // The sampling base for updating the yield credit. The sampling rate is
  // 1/sampling_base.
  const int sampling_base = 256;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several.  Immediately update yield_credit
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    // Since our update is sample-based, it is ok if a thread overwrites
    // updates made by other threads. Thus the update does not have to be
    // atomic.
    auto v = yield_credit.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    //
    // On each update the positive credit is decayed by a factor of 1/1024
    // (i.e., about 0.1%). If the sampled yield was successful, the credit is
    // also increased by X. Setting X=2^17 ensures that the credit never
    // exceeds 2^17*2^10=2^27, which is lower than 2^31, the upper bound of
    // int32_t. The same logic applies to negative credits.
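    //
    // A worked check of that bound (derived from the update below, not an
    // extra invariant): with v' = v - v/1024 + 2^17 applied on every
    // successful sample, the fixed point satisfies v*/1024 = 2^17, i.e.
    // v* = 2^27 = 134217728.  Starting from 0 the credit approaches 2^27
    // monotonically and never exceeds it; the symmetric argument bounds the
    // negative side at -2^27, comfortably inside int32_t.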
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  assert(w);
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If a write stall is in effect and w->no_slowdown is not true,
    // block here until the stall is cleared. If no_slowdown is true,
    // return immediately.
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}
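
// Illustrative note (not part of the implementation): LinkOne() is a lock-free
// prepend to a singly linked list whose head is `newest_writer` and whose
// links run from newer to older via `link_older`.  If two writers race and
// both read the same head, the loser's compare_exchange_weak fails, reloads
// the new head into `writers`, and simply retries.  The return value reports
// whether the list was empty before the insert, in which case the caller is
// the new group leader.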

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  Writer* w = last_writer;
  while (true) {
    // Unset link_newer pointers to make sure when we call
    // CreateMissingNewerLinks later it creates all missing links.
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      return (newest == nullptr);
    }
  }
}

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
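
// Illustrative example (not part of the implementation) of the writer list
// these helpers maintain.  Suppose three writers joined in order W1, W2, W3,
// so the atomic head points at the newest:
//
//   newest_writer_ -> W3 --link_older--> W2 --link_older--> W1 --> nullptr
//
// Joining writers only set link_older (LinkOne above), so link_newer may be
// missing.  CreateMissingNewerLinks(W3) walks older-ward and fills them in:
//
//   W1 <--link_newer-- W2 <--link_newer-- W3
//
// stopping as soon as it meets a writer whose link_newer is already set, which
// is why callers must keep the "nullptr link_newer" entries contiguous at the
// newest end of the list (see BeginWriteStall below).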

WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
                                                 Writer* boundary) {
  assert(from != nullptr && from != boundary);
  Writer* current = from;
  while (current->link_older != boundary) {
    current = current->link_older;
    assert(current != nullptr);
  }
  return current;
}

void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}

void WriteThread::BeginWriteStall() {
  LinkOne(&write_stall_dummy_, &newest_writer_);

  // Walk the writer list until w->write_group != nullptr. The current write
  // group will not have a mix of slowdown/no_slowdown, so it's ok to stop at
  // that point.
  Writer* w = write_stall_dummy_.link_older;
  Writer* prev = &write_stall_dummy_;
  while (w != nullptr && w->write_group == nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;
      w->status = Status::Incomplete("Write stall");
      SetState(w, STATE_COMPLETED);
      // Only update `link_newer` if it's already set.
      // `CreateMissingNewerLinks()` will fill in the nullptr `link_newer`
      // fields later; it assumes that the writers with nullptr `link_newer`
      // are contiguous at the newest end of the list, i.e. it stops at the
      // first non-nullptr `link_newer` it sees.
      // If `link_newer` were set here unconditionally,
      // `CreateMissingNewerLinks()` could stop before updating the whole list
      // when it sees that first non-nullptr link.
      if (prev->link_older && prev->link_older->link_newer) {
        prev->link_older->link_newer = prev;
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

void WriteThread::EndWriteStall() {
  MutexLock lock(&stall_mu_);

  // Unlink write_stall_dummy_ from the write queue. This will unblock
  // pending write threads to enqueue themselves
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
  assert(write_stall_dummy_.link_older != nullptr);
  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
  newest_writer_.exchange(write_stall_dummy_.link_older);

  // Wake up writers
  stall_cv_.SignalAll();
}
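
// Illustrative note (not part of the implementation): while a stall is in
// effect the queue looks like
//
//   newest_writer_ -> write_stall_dummy_ --link_older--> <writers queued
//                                                          before the stall>
//
// LinkOne() recognizes the dummy at the head: a writer with no_slowdown set
// fails fast with Status::Incomplete("Write stall"), and every other writer
// blocks on stall_cv_ under stall_mu_ until EndWriteStall() unlinks the dummy
// and calls SignalAll().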

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes, or
     * 2) An existing leader picks us as its follower and
     * 2.1) finishes the memtable writes on our behalf, or
     * 2.2) tells us to finish the memtable writes in parallel, or
     * 3) (pipelined write) An existing leader picks us as its follower,
     *    finishes book-keeping and the WAL write for us, enqueues us as a
     *    pending memtable writer, and
     * 3.1) we become the memtable writer group leader, or
     * 3.2) an existing memtable writer group leader tells us to finish
     *      memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}
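
// A condensed, hypothetical caller-side sketch (for orientation only; the
// real driver is DBImpl::WriteImpl, which handles many more cases, and the
// Writer constructor arguments are declared in write_thread.h, not here):
//
//   WriteThread::Writer w(/* write options, batch, callback, ... */);
//   write_thread_.JoinBatchGroup(&w);
//   if (w.state == WriteThread::STATE_GROUP_LEADER) {
//     WriteThread::WriteGroup wg;
//     write_thread_.EnterAsBatchGroupLeader(&w, &wg);
//     // ... write the WAL and memtable(s) for the whole group ...
//     write_thread_.ExitAsBatchGroupLeader(wg, status);
//   }
//   // Otherwise AwaitState() above parked us until a leader either completed
//   // our write for us or promoted us to one of the leader/parallel states.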

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }
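
  // Worked example (illustrative; the real limit comes from the
  // max_write_batch_group_size_bytes option): with a 1 MB group limit the
  // small-write threshold is 1 MB / 8 = 128 KB.  A 4 KB leader batch caps the
  // group at 4 KB + 128 KB = 132 KB rather than the full 1 MB, while a 256 KB
  // leader batch is above the threshold and may grow the group up to 1 MB.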

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsBatchGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourselves as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourselves,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    assert(w->link_newer);
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request failure on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL is disabled.
      break;
    }

    if (w->protection_bytes_per_key != leader->protection_bytes_per_key) {
      // Do not mix writes with different levels of integrity protection.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  assert(leader != nullptr);
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      if (!allow_concurrent_memtable_write_) {
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
        if (size + batch_size > max_size) {
          // Do not make batch too big
          break;
        }
        size += batch_size;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  // Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}

void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  write_group->running.store(write_group->size);
  for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

static WriteThread::AdaptationContext cpmtw_ctx(
    "CompleteParallelMemTableWriter");
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}
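
// Illustrative note (not part of the implementation): for a parallel group of
// size N, LaunchParallelMemTableWriters() stores running = N and marks every
// member STATE_PARALLEL_MEMTABLE_WRITER.  Each member, leader included, calls
// CompleteParallelMemTableWriter() after inserting its own batch; the first
// N-1 callers wait for STATE_COMPLETED and return false, while the caller that
// decrements running from 1 to 0 returns true and performs the exit duties,
// e.g. via ExitAsBatchGroupFollower() below.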

void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
  auto* write_group = w->write_group;

  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
  assert(write_group->status.ok());
  ExitAsBatchGroupLeader(*write_group, write_group->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(write_group->leader, STATE_COMPLETED);
}

static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // If status is non-ok already, then write_group.status won't have the chance
  // of being propagated to the caller.
  if (!status.ok()) {
    write_group.status.PermitUncheckedError();
  }

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  if (enable_pipelined_write_) {
    // Notify writers that don't write to memtable to exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Look for the next leader before we call LinkGroup. If there are no
    // pending writers, place a dummy writer at the tail of the queue
    // so we know the boundary of the current write group.
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // We found at least one pending writer when we tried to insert the
      // dummy. Search for the next leader from there.
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // Link the remainder of the group to the memtable writer list.
    //
    // We have to link our group to the memtable writer queue before waking up
    // the next leader or setting newest_writer_ to null, otherwise the next
    // leader can run ahead of us and link to the memtable writer queue before
    // we do.
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from the current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If we inserted the dummy in the queue, remove it now and check whether
    // any pending writers joined the queue since we inserted it. If so,
    // look for the next leader again.
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                           STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      // Either w wasn't the head during the load(), or it was the head
      // during the load() but somebody else pushed onto the list before
      // we did the compare_exchange_strong (causing it to fail).  In the
      // latter case compare_exchange_strong has the effect of re-reading
      // its first param (head).  No need to retry a failing CAS, because
      // only a departing leader (which we are at the moment) can remove
      // nodes from the list.
      assert(head != last_writer);

      // After walking link_older starting from head (if not already done)
      // we will be able to traverse w->link_newer below. This function
      // can only be called from an active leader, only a leader can
      // clear newest_writer_, we didn't, and only a clear newest_writer_
      // could cause the next leader to start their work without a call
      // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer->link_older == last_writer);
      last_writer->link_newer->link_older = nullptr;

      // Next leader didn't self-identify, because newest_writer_ wasn't
      // nullptr when they enqueued (we were definitely enqueued before them
      // and are still in the list).  That means leader handoff occurs when
      // we call MarkJoined
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
    }
    // else nobody else was waiting, although there might already be a new
    // leader now

    while (last_writer != leader) {
      assert(last_writer);
      last_writer->status = status;
      // we need to read link_older before calling SetState, because as soon
      // as it is marked committed the other thread's Await may return and
      // deallocate the Writer.
      auto next = last_writer->link_older;
      SetState(last_writer, STATE_COMPLETED);

      last_writer = next;
    }
  }
}

static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  assert(w != nullptr && w->batch == nullptr);
  mu->Unlock();
  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (!linked_as_leader) {
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    // Last leader will not pick us as a follower since our batch is nullptr
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
  }
  if (enable_pipelined_write_) {
    WaitForMemTableWriters();
  }
  mu->Lock();
}

void WriteThread::ExitUnbatched(Writer* w) {
  assert(w != nullptr);
  Writer* newest_writer = w;
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = w->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
  }
}

static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
void WriteThread::WaitForMemTableWriters() {
  assert(enable_pipelined_write_);
  if (newest_memtable_writer_.load() == nullptr) {
    return;
  }
  Writer w;
  if (!LinkOne(&w, &newest_memtable_writer_)) {
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
  }
  newest_memtable_writer_.store(nullptr);
}

}  // namespace ROCKSDB_NAMESPACE