1 /* Copyright (C) 2012-2021 Free Software Foundation, Inc.
2    Contributed by Torvald Riegel <triegel@redhat.com>.
3 
4    This file is part of the GNU Transactional Memory Library (libitm).
5 
6    Libitm is free software; you can redistribute it and/or modify it
7    under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10 
11    Libitm is distributed in the hope that it will be useful, but WITHOUT ANY
12    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13    FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
14    more details.
15 
16    Under Section 7 of GPL version 3, you are granted additional
17    permissions described in the GCC Runtime Library Exception, version
18    3.1, as published by the Free Software Foundation.
19 
20    You should have received a copy of the GNU General Public License and
21    a copy of the GCC Runtime Library Exception along with this program;
22    see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
23    <http://www.gnu.org/licenses/>.  */
24 
25 #include "libitm_i.h"
26 
27 using namespace GTM;
28 
29 namespace {
30 
31 // This group consists of all TM methods that synchronize via multiple locks
32 // (or ownership records).
33 struct ml_mg : public method_group
34 {
35   static const gtm_word LOCK_BIT = (~(gtm_word)0 >> 1) + 1;
36   static const gtm_word INCARNATION_BITS = 3;
37   static const gtm_word INCARNATION_MASK = 7;
38   // Maximum time is all bits except the lock bit, the overflow reserve bit,
39   // and the incarnation bits).
40   static const gtm_word TIME_MAX = (~(gtm_word)0 >> (2 + INCARNATION_BITS));
41   // The overflow reserve bit is the MSB of the timestamp part of an orec,
42   // so we can have TIME_MAX+1 pending timestamp increases before we overflow.
43   static const gtm_word OVERFLOW_RESERVE = TIME_MAX + 1;
44 
is_locked__anon8c4d5b480111::ml_mg45   static bool is_locked(gtm_word o) { return o & LOCK_BIT; }
set_locked__anon8c4d5b480111::ml_mg46   static gtm_word set_locked(gtm_thread *tx)
47   {
48     return ((uintptr_t)tx >> 1) | LOCK_BIT;
49   }
50   // Returns a time that includes the lock bit, which is required by both
51   // validate() and is_more_recent_or_locked().
get_time__anon8c4d5b480111::ml_mg52   static gtm_word get_time(gtm_word o) { return o >> INCARNATION_BITS; }
set_time__anon8c4d5b480111::ml_mg53   static gtm_word set_time(gtm_word time) { return time << INCARNATION_BITS; }
is_more_recent_or_locked__anon8c4d5b480111::ml_mg54   static bool is_more_recent_or_locked(gtm_word o, gtm_word than_time)
55   {
56     // LOCK_BIT is the MSB; thus, if O is locked, it is larger than TIME_MAX.
57     return get_time(o) > than_time;
58   }
has_incarnation_left__anon8c4d5b480111::ml_mg59   static bool has_incarnation_left(gtm_word o)
60   {
61     return (o & INCARNATION_MASK) < INCARNATION_MASK;
62   }
inc_incarnation__anon8c4d5b480111::ml_mg63   static gtm_word inc_incarnation(gtm_word o) { return o + 1; }
64 
65   // The shared time base.
66   atomic<gtm_word> time __attribute__((aligned(HW_CACHELINE_SIZE)));
67 
68   // The array of ownership records.
69   atomic<gtm_word>* orecs __attribute__((aligned(HW_CACHELINE_SIZE)));
70   char tailpadding[HW_CACHELINE_SIZE - sizeof(atomic<gtm_word>*)];
71 
72   // Location-to-orec mapping.  Stripes of 32B mapped to 2^16 orecs using
73   // multiplicative hashing.  See Section 5.2.2 of Torvald Riegel's PhD thesis
74   // for the background on this choice of hash function and parameters:
75   // http://nbn-resolving.de/urn:nbn:de:bsz:14-qucosa-115596
76   // We pick the Mult32 hash because it works well with fewer orecs (i.e.,
77   // less space overhead and just 32b multiplication).
78   // We may want to check and potentially change these settings once we get
79   // better or just more benchmarks.
80   static const gtm_word L2O_ORECS_BITS = 16;
81   static const gtm_word L2O_ORECS = 1 << L2O_ORECS_BITS;
82   // An iterator over the orecs covering the region [addr,addr+len).
83   struct orec_iterator
84   {
85     static const gtm_word L2O_SHIFT = 5;
86     static const uint32_t L2O_MULT32 = 81007;
87     uint32_t mult;
88     size_t orec;
89     size_t orec_end;
orec_iterator__anon8c4d5b480111::ml_mg::orec_iterator90     orec_iterator (const void* addr, size_t len)
91     {
92       uint32_t a = (uintptr_t) addr >> L2O_SHIFT;
93       uint32_t ae = ((uintptr_t) addr + len + (1 << L2O_SHIFT) - 1)
94 	  >> L2O_SHIFT;
95       mult = a * L2O_MULT32;
96       orec = mult >> (32 - L2O_ORECS_BITS);
97       // We can't really avoid this second multiplication unless we use a
98       // branch instead or know more about the alignment of addr.  (We often
99       // know len at compile time because of instantiations of functions
100       // such as _ITM_RU* for accesses of specific lengths.
101       orec_end = (ae * L2O_MULT32) >> (32 - L2O_ORECS_BITS);
102     }
get__anon8c4d5b480111::ml_mg::orec_iterator103     size_t get() { return orec; }
advance__anon8c4d5b480111::ml_mg::orec_iterator104     void advance()
105     {
106       // We cannot simply increment orec because L2O_MULT32 is larger than
107       // 1 << (32 - L2O_ORECS_BITS), and thus an increase of the stripe (i.e.,
108       // addr >> L2O_SHIFT) could increase the resulting orec index by more
109       // than one; with the current parameters, we would roughly acquire a
110       // fourth more orecs than necessary for regions covering more than orec.
111       // Keeping mult around as extra state shouldn't matter much.
112       mult += L2O_MULT32;
113       orec = mult >> (32 - L2O_ORECS_BITS);
114     }
reached_end__anon8c4d5b480111::ml_mg::orec_iterator115     bool reached_end() { return orec == orec_end; }
116   };
117 
init__anon8c4d5b480111::ml_mg118   virtual void init()
119   {
120     // We assume that an atomic<gtm_word> is backed by just a gtm_word, so
121     // starting with zeroed memory is fine.
122     orecs = (atomic<gtm_word>*) xcalloc(
123         sizeof(atomic<gtm_word>) * L2O_ORECS, true);
124     // This store is only executed while holding the serial lock, so relaxed
125     // memory order is sufficient here.
126     time.store(0, memory_order_relaxed);
127   }
128 
fini__anon8c4d5b480111::ml_mg129   virtual void fini()
130   {
131     free(orecs);
132   }
133 
134   // We only re-initialize when our time base overflows.  Thus, only reset
135   // the time base and the orecs but do not re-allocate the orec array.
reinit__anon8c4d5b480111::ml_mg136   virtual void reinit()
137   {
138     // This store is only executed while holding the serial lock, so relaxed
139     // memory order is sufficient here.  Same holds for the memset.
140     time.store(0, memory_order_relaxed);
141     // The memset below isn't strictly kosher because it bypasses
142     // the non-trivial assignment operator defined by std::atomic.  Using
143     // a local void* is enough to prevent GCC from warning for this.
144     void *p = orecs;
145     memset(p, 0, sizeof(atomic<gtm_word>) * L2O_ORECS);
146   }
147 };
148 
// The one ml_mg instance of this translation unit.  ml_wt_dispatch reads the
// global time base and the orec array through it and passes its address to
// the abi_dispatch constructor.
static ml_mg o_ml_mg;
150 
151 
152 // The multiple lock, write-through TM method.
153 // Maps each memory location to one of the orecs in the orec array, and then
154 // acquires the associated orec eagerly before writing through.
155 // Writes require undo-logging because we are dealing with several locks/orecs
156 // and need to resolve deadlocks if necessary by aborting one of the
157 // transactions.
158 // Reads do time-based validation with snapshot time extensions.  Incarnation
159 // numbers are used to decrease contention on the time base (with those,
160 // aborted transactions do not need to acquire a new version number for the
161 // data that has been previously written in the transaction and needs to be
162 // rolled back).
163 // gtm_thread::shared_state is used to store a transaction's current
164 // snapshot time (or commit time). The serial lock uses ~0 for inactive
165 // transactions and 0 for active ones. Thus, we always have a meaningful
166 // timestamp in shared_state that can be used to implement quiescence-based
167 // privatization safety.
168 class ml_wt_dispatch : public abi_dispatch
169 {
170 protected:
  // Acquires, on behalf of TX, every orec that covers [ADDR, ADDR+LEN) and
  // afterwards undo-logs the whole region.  The previous value of each newly
  // acquired orec is recorded in TX's write log.  TX is restarted if an orec
  // is locked by someone else or if the acquiring CAS fails.
  // NOTE(review): the loop body runs before the end-of-range test.  For
  // LEN == 0 and a stripe-aligned ADDR the iterator starts out with
  // orec == orec_end, so the loop would keep advancing instead of stopping
  // after one orec; callers presumably have to pass LEN > 0 -- confirm.
  static void pre_write(gtm_thread *tx, const void *addr, size_t len)
  {
    gtm_word snapshot = tx->shared_state.load(memory_order_relaxed);
    gtm_word locked_by_tx = ml_mg::set_locked(tx);

    // Lock all orecs that cover the region.
    ml_mg::orec_iterator oi(addr, len);
    do
      {
        // Load the orec.  Relaxed memory order is sufficient here because
        // either we have acquired the orec or we will try to acquire it with
        // a CAS with stronger memory order.
        gtm_word o = o_ml_mg.orecs[oi.get()].load(memory_order_relaxed);

        // Check whether we have acquired the orec already.
        if (likely (locked_by_tx != o))
          {
            // If not, acquire.  Make sure that our snapshot time is greater
            // than or equal to the orec's version to avoid masking
            // invalidations of our snapshot with our own writes.
            if (unlikely (ml_mg::is_locked(o)))
              tx->restart(RESTART_LOCKED_WRITE);

            if (unlikely (ml_mg::get_time(o) > snapshot))
              {
                // We only need to extend the snapshot if we have indeed read
                // from this orec before.  Given that we are an update
                // transaction, we will have to extend anyway during commit.
                // ??? Scan the read log instead, aborting if we have read
                // from data covered by this orec before?
                snapshot = extend(tx);
              }

            // We need acquire memory order here to synchronize with other
            // (ownership) releases of the orec.  We do not need acq_rel order
            // because whenever another thread reads from this CAS'
            // modification, then it will abort anyway and does not rely on
            // any further happens-before relation to be established.
            if (unlikely (!o_ml_mg.orecs[oi.get()].compare_exchange_strong(
                o, locked_by_tx, memory_order_acquire)))
              tx->restart(RESTART_LOCKED_WRITE);

            // We use an explicit fence here to avoid having to use release
            // memory order for all subsequent data stores.  This fence will
            // synchronize with loads of the data with acquire memory order.
            // See post_load() for why this is necessary.
            // Adding release memory order to the prior CAS is not sufficient,
            // at least according to the Batty et al. formalization of the
            // memory model.
            atomic_thread_fence(memory_order_release);

            // We log the previous value here to be able to use incarnation
            // numbers when we have to roll back.
            // ??? Reserve capacity early to avoid capacity checks here?
            gtm_rwlog_entry *e = tx->writelog.push();
            e->orec = o_ml_mg.orecs + oi.get();
            e->value = o;
          }
        oi.advance();
      }
    while (!oi.reached_end());

    // Do undo logging.  We do not know which region prior writes logged
    // (even if orecs have been acquired), so just log everything.
    tx->undolog.log(addr, len);
  }
237 
pre_write(const void * addr,size_t len)238   static void pre_write(const void *addr, size_t len)
239   {
240     gtm_thread *tx = gtm_thr();
241     pre_write(tx, addr, len);
242   }
243 
244   // Returns true iff all the orecs in our read log still have the same time
245   // or have been locked by the transaction itself.
validate(gtm_thread * tx)246   static bool validate(gtm_thread *tx)
247   {
248     gtm_word locked_by_tx = ml_mg::set_locked(tx);
249     // ??? This might get called from pre_load() via extend().  In that case,
250     // we don't really need to check the new entries that pre_load() is
251     // adding.  Stop earlier?
252     for (gtm_rwlog_entry *i = tx->readlog.begin(), *ie = tx->readlog.end();
253         i != ie; i++)
254       {
255 	// Relaxed memory order is sufficient here because we do not need to
256 	// establish any new synchronizes-with relationships.  We only need
257 	// to read a value that is as least as current as enforced by the
258 	// callers: extend() loads global time with acquire, and trycommit()
259 	// increments global time with acquire.  Therefore, we will see the
260 	// most recent orec updates before the global time that we load.
261         gtm_word o = i->orec->load(memory_order_relaxed);
262         // We compare only the time stamp and the lock bit here.  We know that
263         // we have read only committed data before, so we can ignore
264         // intermediate yet rolled-back updates presented by the incarnation
265         // number bits.
266         if (ml_mg::get_time(o) != ml_mg::get_time(i->value)
267             && o != locked_by_tx)
268           return false;
269       }
270     return true;
271   }
272 
273   // Tries to extend the snapshot to a more recent time.  Returns the new
274   // snapshot time and updates TX->SHARED_STATE.  If the snapshot cannot be
275   // extended to the current global time, TX is restarted.
extend(gtm_thread * tx)276   static gtm_word extend(gtm_thread *tx)
277   {
278     // We read global time here, even if this isn't strictly necessary
279     // because we could just return the maximum of the timestamps that
280     // validate sees.  However, the potential cache miss on global time is
281     // probably a reasonable price to pay for avoiding unnecessary extensions
282     // in the future.
283     // We need acquire memory oder because we have to synchronize with the
284     // increment of global time by update transactions, whose lock
285     // acquisitions we have to observe (also see trycommit()).
286     gtm_word snapshot = o_ml_mg.time.load(memory_order_acquire);
287     if (!validate(tx))
288       tx->restart(RESTART_VALIDATE_READ);
289 
290     // Update our public snapshot time.  Probably useful to decrease waiting
291     // due to quiescence-based privatization safety.
292     // Use release memory order to establish synchronizes-with with the
293     // privatizers; prior data loads should happen before the privatizers
294     // potentially modify anything.
295     tx->shared_state.store(snapshot, memory_order_release);
296     return snapshot;
297   }
298 
  // First pass over orecs.  Load and check all orecs that cover the region
  // [ADDR, ADDR+LEN).  Write to read log, extend snapshot time if necessary.
  // Orecs locked by TX itself are skipped (not logged); an orec locked by
  // anyone else restarts TX.
  // Returns the address of the read-log slot at the index the log had on
  // entry, i.e., the position where entries added by this call start.  The
  // result is meant to be passed to post_load().
  // NOTE(review): as in pre_write(), the loop body runs before the
  // end-of-range test, so LEN is presumably expected to be nonzero --
  // confirm against callers.
  static gtm_rwlog_entry* pre_load(gtm_thread *tx, const void* addr,
      size_t len)
  {
    // Don't obtain an iterator yet because the log might get resized.
    size_t log_start = tx->readlog.size();
    gtm_word snapshot = tx->shared_state.load(memory_order_relaxed);
    gtm_word locked_by_tx = ml_mg::set_locked(tx);

    ml_mg::orec_iterator oi(addr, len);
    do
      {
        // We need acquire memory order here so that this load will
        // synchronize with the store that releases the orec in trycommit().
        // In turn, this makes sure that subsequent data loads will read from
        // a visible sequence of side effects that starts with the most recent
        // store to the data right before the release of the orec.
        gtm_word o = o_ml_mg.orecs[oi.get()].load(memory_order_acquire);

        if (likely (!ml_mg::is_more_recent_or_locked(o, snapshot)))
          {
            // Also the jump target of the snapshot-extension branch below:
            // record the orec and the value we observed in the read log.
            success:
            gtm_rwlog_entry *e = tx->readlog.push();
            e->orec = o_ml_mg.orecs + oi.get();
            e->value = o;
          }
        else if (!ml_mg::is_locked(o))
          {
            // We cannot read this part of the region because it has been
            // updated more recently than our snapshot time.  If we can extend
            // our snapshot, then we can read.
            snapshot = extend(tx);
            goto success;
          }
        else
          {
            // If the orec is locked by us, just skip it because we can just
            // read from it.  Otherwise, restart the transaction.
            if (o != locked_by_tx)
              tx->restart(RESTART_LOCKED_READ);
          }
        oi.advance();
      }
    while (!oi.reached_end());
    return &tx->readlog[log_start];
  }
346 
347   // Second pass over orecs, verifying that the we had a consistent read.
348   // Restart the transaction if any of the orecs is locked by another
349   // transaction.
post_load(gtm_thread * tx,gtm_rwlog_entry * log)350   static void post_load(gtm_thread *tx, gtm_rwlog_entry* log)
351   {
352     for (gtm_rwlog_entry *end = tx->readlog.end(); log != end; log++)
353       {
354         // Check that the snapshot is consistent.  We expect the previous data
355         // load to have acquire memory order, or be atomic and followed by an
356         // acquire fence.
357         // As a result, the data load will synchronize with the release fence
358         // issued by the transactions whose data updates the data load has read
359         // from.  This forces the orec load to read from a visible sequence of
360         // side effects that starts with the other updating transaction's
361         // store that acquired the orec and set it to locked.
362         // We therefore either read a value with the locked bit set (and
363         // restart) or read an orec value that was written after the data had
364         // been written.  Either will allow us to detect inconsistent reads
365         // because it will have a higher/different value.
366 	// Also note that differently to validate(), we compare the raw value
367 	// of the orec here, including incarnation numbers.  We must prevent
368 	// returning uncommitted data from loads (whereas when validating, we
369 	// already performed a consistent load).
370         gtm_word o = log->orec->load(memory_order_relaxed);
371         if (log->value != o)
372           tx->restart(RESTART_VALIDATE_READ);
373       }
374   }
375 
load(const V * addr,ls_modifier mod)376   template <typename V> static V load(const V* addr, ls_modifier mod)
377   {
378     // Read-for-write should be unlikely, but we need to handle it or will
379     // break later WaW optimizations.
380     if (unlikely(mod == RfW))
381       {
382 	pre_write(addr, sizeof(V));
383 	return *addr;
384       }
385     if (unlikely(mod == RaW))
386       return *addr;
387     // ??? Optimize for RaR?
388 
389     gtm_thread *tx = gtm_thr();
390     gtm_rwlog_entry* log = pre_load(tx, addr, sizeof(V));
391 
392     // Load the data.
393     // This needs to have acquire memory order (see post_load()).
394     // Alternatively, we can put an acquire fence after the data load but this
395     // is probably less efficient.
396     // FIXME We would need an atomic load with acquire memory order here but
397     // we can't just forge an atomic load for nonatomic data because this
398     // might not work on all implementations of atomics.  However, we need
399     // the acquire memory order and we can only establish this if we link
400     // it to the matching release using a reads-from relation between atomic
401     // loads.  Also, the compiler is allowed to optimize nonatomic accesses
402     // differently than atomic accesses (e.g., if the load would be moved to
403     // after the fence, we potentially don't synchronize properly anymore).
404     // Instead of the following, just use an ordinary load followed by an
405     // acquire fence, and hope that this is good enough for now:
406     // V v = atomic_load_explicit((atomic<V>*)addr, memory_order_acquire);
407     V v = *addr;
408     atomic_thread_fence(memory_order_acquire);
409 
410     // ??? Retry the whole load if it wasn't consistent?
411     post_load(tx, log);
412 
413     return v;
414   }
415 
store(V * addr,const V value,ls_modifier mod)416   template <typename V> static void store(V* addr, const V value,
417       ls_modifier mod)
418   {
419     if (likely(mod != WaW))
420       pre_write(addr, sizeof(V));
421     // FIXME We would need an atomic store here but we can't just forge an
422     // atomic load for nonatomic data because this might not work on all
423     // implementations of atomics.  However, we need this store to link the
424     // release fence in pre_write() to the acquire operation in load, which
425     // is only guaranteed if we have a reads-from relation between atomic
426     // accesses.  Also, the compiler is allowed to optimize nonatomic accesses
427     // differently than atomic accesses (e.g., if the store would be moved
428     // to before the release fence in pre_write(), things could go wrong).
429     // atomic_store_explicit((atomic<V>*)addr, value, memory_order_relaxed);
430     *addr = value;
431   }
432 
433 public:
memtransfer_static(void * dst,const void * src,size_t size,bool may_overlap,ls_modifier dst_mod,ls_modifier src_mod)434   static void memtransfer_static(void *dst, const void* src, size_t size,
435       bool may_overlap, ls_modifier dst_mod, ls_modifier src_mod)
436   {
437     gtm_rwlog_entry* log = 0;
438     gtm_thread *tx = 0;
439 
440     if (src_mod == RfW)
441       {
442         tx = gtm_thr();
443         pre_write(tx, src, size);
444       }
445     else if (src_mod != RaW && src_mod != NONTXNAL)
446       {
447         tx = gtm_thr();
448         log = pre_load(tx, src, size);
449       }
450     // ??? Optimize for RaR?
451 
452     if (dst_mod != NONTXNAL && dst_mod != WaW)
453       {
454         if (src_mod != RfW && (src_mod == RaW || src_mod == NONTXNAL))
455           tx = gtm_thr();
456         pre_write(tx, dst, size);
457       }
458 
459     // FIXME We should use atomics here (see store()).  Let's just hope that
460     // memcpy/memmove are good enough.
461     if (!may_overlap)
462       ::memcpy(dst, src, size);
463     else
464       ::memmove(dst, src, size);
465 
466     // ??? Retry the whole memtransfer if it wasn't consistent?
467     if (src_mod != RfW && src_mod != RaW && src_mod != NONTXNAL)
468       {
469 	// See load() for why we need the acquire fence here.
470 	atomic_thread_fence(memory_order_acquire);
471 	post_load(tx, log);
472       }
473   }
474 
memset_static(void * dst,int c,size_t size,ls_modifier mod)475   static void memset_static(void *dst, int c, size_t size, ls_modifier mod)
476   {
477     if (mod != WaW)
478       pre_write(dst, size);
479     // FIXME We should use atomics here (see store()).  Let's just hope that
480     // memset is good enough.
481     ::memset(dst, c, size);
482   }
483 
begin_or_restart()484   virtual gtm_restart_reason begin_or_restart()
485   {
486     // We don't need to do anything for nested transactions.
487     gtm_thread *tx = gtm_thr();
488     if (tx->parent_txns.size() > 0)
489       return NO_RESTART;
490 
491     // Read the current time, which becomes our snapshot time.
492     // Use acquire memory oder so that we see the lock acquisitions by update
493     // transcations that incremented the global time (see trycommit()).
494     gtm_word snapshot = o_ml_mg.time.load(memory_order_acquire);
495     // Re-initialize method group on time overflow.
496     if (snapshot >= o_ml_mg.TIME_MAX)
497       return RESTART_INIT_METHOD_GROUP;
498 
499     // We don't need to enforce any ordering for the following store. There
500     // are no earlier data loads in this transaction, so the store cannot
501     // become visible before those (which could lead to the violation of
502     // privatization safety). The store can become visible after later loads
503     // but this does not matter because the previous value will have been
504     // smaller or equal (the serial lock will set shared_state to zero when
505     // marking the transaction as active, and restarts enforce immediate
506     // visibility of a smaller or equal value with a barrier (see
507     // rollback()).
508     tx->shared_state.store(snapshot, memory_order_relaxed);
509     return NO_RESTART;
510   }
511 
  // Tries to commit the current transaction.
  // Read-only transactions (empty write log) always commit; PRIV_TIME is set
  // to their snapshot time.  Update transactions obtain a commit time from
  // the global time base, validate their read log if other commits may have
  // happened since their snapshot, release all acquired orecs with the
  // commit time, and set PRIV_TIME to that commit time.
  // Returns false iff validation failed; in that case the logs and orecs are
  // left untouched by this function.
  virtual bool trycommit(gtm_word& priv_time)
  {
    gtm_thread* tx = gtm_thr();

    // If we haven't updated anything, we can commit.
    if (!tx->writelog.size())
      {
        tx->readlog.clear();
        // We still need to ensure privatization safety, unfortunately.  While
        // we cannot have privatized anything by ourselves (because we are not
        // an update transaction), we can have observed the commits of
        // another update transaction that privatized something.  Because any
        // commit happens before ensuring privatization, our snapshot and
        // commit can thus have happened before ensuring privatization safety
        // for this commit/snapshot time.  Therefore, before we can return to
        // nontransactional code that might use the privatized data, we must
        // ensure privatization safety for our snapshot time.
        // This still seems to be better than not allowing use of the
        // snapshot time before privatization safety has been ensured because
        // we at least can run transactions such as this one, and in the
        // meantime the transaction producing this commit time might have
        // finished ensuring privatization safety for it.
        priv_time = tx->shared_state.load(memory_order_relaxed);
        return true;
      }

    // Get a commit time.
    // Overflow of o_ml_mg.time is prevented in begin_or_restart().
    // We need acq_rel here because (1) the acquire part is required for our
    // own subsequent call to validate(), and (2) the release part is
    // necessary to make other threads' validate() work as explained there
    // and in extend().
    gtm_word ct = o_ml_mg.time.fetch_add(1, memory_order_acq_rel) + 1;

    // Extend our snapshot time to at least our commit time.
    // Note that we do not need to validate if our snapshot time is right
    // before the commit time because we are never sharing the same commit
    // time with other transactions.
    // No need to reset shared_state, which will be modified by the serial
    // lock right after our commit anyway.
    gtm_word snapshot = tx->shared_state.load(memory_order_relaxed);
    if (snapshot < ct - 1 && !validate(tx))
      return false;

    // Release orecs.
    // See pre_load() / post_load() for why we need release memory order.
    // ??? Can we use a release fence and relaxed stores?
    // Every released orec gets the commit time with incarnation number zero.
    gtm_word v = ml_mg::set_time(ct);
    for (gtm_rwlog_entry *i = tx->writelog.begin(), *ie = tx->writelog.end();
        i != ie; i++)
      i->orec->store(v, memory_order_release);

    // We're done, clear the logs.
    tx->writelog.clear();
    tx->readlog.clear();

    // Need to ensure privatization safety. Every other transaction must
    // have a snapshot time that is at least as high as our commit time
    // (i.e., our commit must be visible to them).
    priv_time = ct;
    return true;
  }
573 
  // Rolls back the orec state of the current transaction.  Only acts on a
  // rollback of the outermost transaction (CP == 0).  Every orec in the
  // write log is released, either by storing its logged value with the
  // incarnation number increased by one or, if the incarnation number is
  // exhausted, by storing a fresh timestamp taken from the global time base
  // (at most one fresh timestamp is taken per call).  Both logs are cleared
  // afterwards.
  virtual void rollback(gtm_transaction_cp *cp)
  {
    // We don't do anything for rollbacks of nested transactions.
    // ??? We could release locks here if we snapshot writelog size.  readlog
    // is similar.  This is just a performance optimization though.  Nested
    // aborts should be rather infrequent, so the additional save/restore
    // overhead for the checkpoints could be higher.
    if (cp != 0)
      return;

    gtm_thread *tx = gtm_thr();
    // Zero means that no fresh timestamp has been acquired yet.
    gtm_word overflow_value = 0;

    // Release orecs.
    for (gtm_rwlog_entry *i = tx->writelog.begin(), *ie = tx->writelog.end();
        i != ie; i++)
      {
        // If possible, just increase the incarnation number.
        // See pre_load() / post_load() for why we need release memory order.
        // ??? Can we use a release fence and relaxed stores?  (Same below.)
        if (ml_mg::has_incarnation_left(i->value))
          i->orec->store(ml_mg::inc_incarnation(i->value),
              memory_order_release);
        else
          {
            // We have an incarnation overflow.  Acquire a new timestamp, and
            // use it from now on as value for each orec whose incarnation
            // number cannot be increased.
            // Overflow of o_ml_mg.time is prevented in begin_or_restart().
            // See pre_load() / post_load() for why we need release memory
            // order.
            if (!overflow_value)
              // Release memory order is sufficient but required here.
              // In contrast to the increment in trycommit(), we need release
              // for the same reason but do not need the acquire because we
              // do not validate subsequently.
              overflow_value = ml_mg::set_time(
                  o_ml_mg.time.fetch_add(1, memory_order_release) + 1);
            i->orec->store(overflow_value, memory_order_release);
          }
      }

    // We need this release fence to ensure that privatizers see the
    // rolled-back original state (not any uncommitted values) when they read
    // the new snapshot time that we write in begin_or_restart().
    atomic_thread_fence(memory_order_release);

    // We're done, clear the logs.
    tx->writelog.clear();
    tx->readlog.clear();
  }
625 
snapshot_most_recent()626   virtual bool snapshot_most_recent()
627   {
628     // This is the same code as in extend() except that we do not restart
629     // on failure but simply return the result, and that we don't validate
630     // if our snapshot is already most recent.
631     gtm_thread* tx = gtm_thr();
632     gtm_word snapshot = o_ml_mg.time.load(memory_order_acquire);
633     if (snapshot == tx->shared_state.load(memory_order_relaxed))
634       return true;
635     if (!validate(tx))
636       return false;
637 
638     // Update our public snapshot time.  Necessary so that we do not prevent
639     // other transactions from ensuring privatization safety.
640     tx->shared_state.store(snapshot, memory_order_release);
641     return true;
642   }
643 
supports(unsigned number_of_threads)644   virtual bool supports(unsigned number_of_threads)
645   {
646     // Each txn can commit and fail and rollback once before checking for
647     // overflow, so this bounds the number of threads that we can support.
648     // In practice, this won't be a problem but we check it anyway so that
649     // we never break in the occasional weird situation.
650     return (number_of_threads * 2 <= ml_mg::OVERFLOW_RESERVE);
651   }
652 
653   CREATE_DISPATCH_METHODS(virtual, )
CREATE_DISPATCH_METHODS_MEM()654   CREATE_DISPATCH_METHODS_MEM()
655 
  // Constructs the dispatch object and hands the shared ml_mg instance
  // (last argument) to the abi_dispatch base class.
  // NOTE(review): the meaning of the four boolean flags and the integer
  // argument is defined by abi_dispatch's constructor, which is not visible
  // in this file -- check its declaration before changing them.
  ml_wt_dispatch() : abi_dispatch(false, true, false, false, 0, &o_ml_mg)
  { }
658 };
659 
660 } // anon namespace
661 
// The single ml_wt_dispatch instance of this translation unit; handed out by
// GTM::dispatch_ml_wt().
static const ml_wt_dispatch o_ml_wt_dispatch;
663 
664 abi_dispatch *
dispatch_ml_wt()665 GTM::dispatch_ml_wt ()
666 {
667   return const_cast<ml_wt_dispatch *>(&o_ml_wt_dispatch);
668 }
669