1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2  * vim: sw=2 ts=4 et :
3  */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 
8 #ifndef mozilla_ipc_ProducerConsumerQueue_h
9 #define mozilla_ipc_ProducerConsumerQueue_h 1
10 
11 #include <atomic>
12 #include <tuple>
13 #include <type_traits>
14 #include <utility>
15 #include <vector>
16 #include "mozilla/dom/QueueParamTraits.h"
17 #include "CrossProcessSemaphore.h"
18 
19 namespace IPC {
20 template <typename T>
21 struct ParamTraits;
22 }  // namespace IPC
23 
24 namespace mozilla {
25 namespace webgl {
26 
27 using IPC::PcqTypeInfo;
28 using IPC::PcqTypeInfoID;
29 
30 extern LazyLogModule gPCQLog;
31 #define PCQ_LOG_(lvl, ...) MOZ_LOG(mozilla::webgl::gPCQLog, lvl, (__VA_ARGS__))
32 #define PCQ_LOGD(...) PCQ_LOG_(LogLevel::Debug, __VA_ARGS__)
33 #define PCQ_LOGE(...) PCQ_LOG_(LogLevel::Error, __VA_ARGS__)
34 
35 class ProducerConsumerQuue;
36 class PcqProducer;
37 class PcqConsumer;
38 
39 }  // namespace webgl
40 
41 // NB: detail is in mozilla instead of mozilla::webgl because many points in
42 // existing code get confused if mozilla::detail and mozilla::webgl::detail
43 // exist.
44 namespace detail {
45 using IPC::PcqTypeInfo;
46 using IPC::PcqTypeInfoID;
47 
48 using mozilla::ipc::Shmem;
49 using mozilla::webgl::IsSuccess;
50 using mozilla::webgl::ProducerConsumerQueue;
51 using mozilla::webgl::QueueStatus;
52 
GetCacheLineSize()53 constexpr size_t GetCacheLineSize() { return 64; }
54 
55 // NB: The header may end up consuming fewer bytes than this.  This value
56 // guarantees that we can always byte-align the header contents.
GetMaxHeaderSize()57 constexpr size_t GetMaxHeaderSize() {
58   // Recall that the Shmem contents are laid out like this:
59   // -----------------------------------------------------------------------
60   // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
61   // -----------------------------------------------------------------------
62 
63   constexpr size_t alignment =
64       std::max(std::alignment_of<size_t>::value, GetCacheLineSize());
65   static_assert(alignment >= sizeof(size_t),
66                 "alignment expected to be large enough to hold a size_t");
67 
68   // We may need up to this many bytes to properly align mRead
69   constexpr size_t maxAlign1 = alignment - 1;
70   constexpr size_t readAndAlign2 = alignment;
71   constexpr size_t writeAndAlign3 = alignment;
72   return maxAlign1 + readAndAlign2 + writeAndAlign3;
73 }
74 
75 template <typename View, typename Arg, typename... Args>
MinSizeofArgs(View & aView,const Arg * aArg,const Args * ...aArgs)76 size_t MinSizeofArgs(View& aView, const Arg* aArg, const Args*... aArgs) {
77   return aView.MinSizeParam(aArg) + MinSizeofArgs(aView, aArgs...);
78 }
79 
80 template <typename View>
MinSizeofArgs(View &)81 size_t MinSizeofArgs(View&) {
82   return 0;
83 }
84 
85 template <typename View, typename Arg1, typename Arg2, typename... Args>
MinSizeofArgs(View & aView)86 size_t MinSizeofArgs(View& aView) {
87   return aView.template MinSizeParam<Arg1>(nullptr) +
88          MinSizeofArgs<Arg2, Args...>(aView);
89 }
90 
91 template <typename View, typename Arg>
MinSizeofArgs(View & aView)92 size_t MinSizeofArgs(View& aView) {
93   return aView.MinSizeParam(nullptr);
94 }
95 
96 class PcqRCSemaphore {
97  public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PcqRCSemaphore)98   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PcqRCSemaphore)
99   explicit PcqRCSemaphore(CrossProcessSemaphore* aSem) : mSem(aSem) {
100     MOZ_ASSERT(mSem);
101   }
102 
Wait(const Maybe<TimeDuration> & aTime)103   bool Wait(const Maybe<TimeDuration>& aTime) { return mSem->Wait(aTime); }
Signal()104   void Signal() { mSem->Signal(); }
IsAvailable()105   bool IsAvailable() {
106     MOZ_ASSERT_UNREACHABLE("Unimplemented");
107     return false;
108   }
ShareToProcess(base::ProcessId aTargetPid)109   CrossProcessSemaphoreHandle ShareToProcess(base::ProcessId aTargetPid) {
110     return mSem->ShareToProcess(aTargetPid);
111   }
CloseHandle()112   void CloseHandle() { mSem->CloseHandle(); }
113 
114  private:
~PcqRCSemaphore()115   ~PcqRCSemaphore() { delete mSem; }
116   CrossProcessSemaphore* mSem;
117 };
118 
119 /**
120  * Common base class for PcqProducer and Consumer.
121  */
122 class PcqBase {
123  public:
124   /**
125    * Bytes used in the queue if the parameters are the read/write heads.
126    */
UsedBytes(size_t aRead,size_t aWrite)127   size_t UsedBytes(size_t aRead, size_t aWrite) {
128     MOZ_ASSERT(ValidState(aRead, aWrite));
129     return mozilla::webgl::UsedBytes(QueueBufferSize(), aRead, aWrite);
130   }
131 
132   /**
133    * Bytes free in the queue if the parameters are the read/write heads.
134    */
FreeBytes(size_t aRead,size_t aWrite)135   size_t FreeBytes(size_t aRead, size_t aWrite) {
136     MOZ_ASSERT(ValidState(aRead, aWrite));
137     return mozilla::webgl::FreeBytes(QueueBufferSize(), aRead, aWrite);
138   }
139 
140   /**
141    * True when this queue is valid with the parameters as the read/write heads.
142    */
ValidState(size_t aRead,size_t aWrite)143   bool ValidState(size_t aRead, size_t aWrite) {
144     return (aRead < QueueBufferSize()) && (aWrite < QueueBufferSize());
145   }
146 
147   /**
148    * True when this queue is empty with the parameters as the read/write heads.
149    */
IsEmpty(size_t aRead,size_t aWrite)150   bool IsEmpty(size_t aRead, size_t aWrite) {
151     MOZ_ASSERT(ValidState(aRead, aWrite));
152     return UsedBytes(aRead, aWrite) == 0;
153   }
154 
155   /**
156    * True when this queue is full with the parameters as the read/write heads.
157    */
IsFull(size_t aRead,size_t aWrite)158   bool IsFull(size_t aRead, size_t aWrite) {
159     MOZ_ASSERT(ValidState(aRead, aWrite));
160     return FreeBytes(aRead, aWrite) == 0;
161   }
162 
163   // Cheaply get the used size of the current queue.  This does no
164   // synchronization so the information may be stale.  On the PcqProducer
165   // side, it will never underestimate the number of bytes used and,
166   // on the Consumer side, it will never overestimate them.
167   // (The reciprocal is true of FreeBytes.)
UsedBytes()168   size_t UsedBytes() {
169     size_t write = mWrite->load(std::memory_order_relaxed);
170     size_t read = mRead->load(std::memory_order_relaxed);
171     return UsedBytes(read, write);
172   }
173 
174   // This does no synchronization so the information may be stale.
FreeBytes()175   size_t FreeBytes() { return QueueSize() - UsedBytes(); }
176 
177   // This does no synchronization so the information may be stale.
IsEmpty()178   bool IsEmpty() { return IsEmpty(GetReadRelaxed(), GetWriteRelaxed()); }
179 
180   // This does no synchronization so the information may be stale.
IsFull()181   bool IsFull() { return IsFull(GetReadRelaxed(), GetWriteRelaxed()); }
182 
183  protected:
184   friend struct mozilla::ipc::IPDLParamTraits<PcqBase>;
185   friend ProducerConsumerQueue;
186 
187   PcqBase()
188       : mOtherPid(0),
189         mQueue(nullptr),
190         mQueueBufferSize(0),
191         mUserReservedMemory(nullptr),
192         mUserReservedSize(0),
193         mRead(nullptr),
194         mWrite(nullptr) {}
195 
196   PcqBase(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
197           RefPtr<PcqRCSemaphore> aMaybeNotEmptySem,
198           RefPtr<PcqRCSemaphore> aMaybeNotFullSem) {
199     Set(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem, aMaybeNotFullSem);
200   }
201 
202   PcqBase(const PcqBase&) = delete;
203   PcqBase(PcqBase&&) = default;
204   PcqBase& operator=(const PcqBase&) = delete;
205   PcqBase& operator=(PcqBase&&) = default;
206 
207   void Set(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
208            RefPtr<PcqRCSemaphore> aMaybeNotEmptySem,
209            RefPtr<PcqRCSemaphore> aMaybeNotFullSem) {
210     mOtherPid = aOtherPid;
211     mShmem = aShmem;
212     mQueue = aShmem.get<uint8_t>();
213 
214     // NB: The buffer needs one extra byte for the queue contents
215     mQueueBufferSize = aQueueSize + 1;
216 
217     // Recall that the Shmem contents are laid out like this:
218     // -----------------------------------------------------------------------
219     // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
220     // -----------------------------------------------------------------------
221 
222     size_t shmemSize = aShmem.Size<uint8_t>();
223     uint8_t* header = mQueue + mQueueBufferSize;
224 
225     constexpr size_t alignment =
226         std::max(std::alignment_of<size_t>::value, GetCacheLineSize());
227     static_assert(alignment >= sizeof(size_t),
228                   "alignment expected to be large enough to hold a size_t");
229 
230     static_assert((alignment & (alignment - 1)) == 0,
231                   "alignment must be a power of 2");
232 
233     // We may need up to this many bytes to properly align mRead
234     constexpr size_t maxAlign1 = alignment - 1;
235 
236     // Find the lowest value of align1 that assures proper byte-alignment.
237     uintptr_t alignValue = reinterpret_cast<uintptr_t>(header + maxAlign1);
238     alignValue &= ~(alignment - 1);
239     uint8_t* metadata = reinterpret_cast<uint8_t*>(alignValue);
240 
241     // NB: We do not call the nontrivial constructor here (we do not write
242     // `new std::atomic_size_t()`) because it would zero the read/write values
243     // in the shared memory, which may already represent data in the queue.
244     mRead = new (metadata) std::atomic_size_t;
245     mWrite = new (metadata + alignment) std::atomic_size_t;
246 
247     // The actual number of bytes we needed to properly align mRead
248     size_t align1 = metadata - header;
249     MOZ_ASSERT(align1 <= maxAlign1);
250 
251     // The rest of the memory is the user reserved memory
252     size_t headerSize = align1 + 2 * alignment;
253     size_t userSize = shmemSize - mQueueBufferSize - headerSize;
254     if (userSize > 0) {
255       mUserReservedMemory = mQueue + mQueueBufferSize + headerSize;
256       mUserReservedSize = userSize;
257     } else {
258       mUserReservedMemory = nullptr;
259       mUserReservedSize = 0;
260     }
261 
262     // We use Monitors to wait for data when reading from an empty queue
263     // and to wait for free space when writing to a full one.
264     MOZ_ASSERT(aMaybeNotEmptySem && aMaybeNotFullSem);
265     mMaybeNotEmptySem = aMaybeNotEmptySem;
266     mMaybeNotFullSem = aMaybeNotFullSem;
267 
268     PCQ_LOGD("Created queue (%p) with size: %zu, alignment: %zu, align1: %zu",
269              this, aQueueSize, alignment, align1);
270   }
271 
272   ~PcqBase() {
273     PCQ_LOGD("Destroying queue (%p).", this);
274     // NB: We would call the destructors for mRead and mWrite here (but not
275     // delete since their memory belongs to the shmem) but the std library's
276     // type aliases make this tricky and, by the spec for std::atomic, their
277     // destructors are trivial (i.e. no-ops) anyway.
278   }
279 
280   size_t GetReadRelaxed() { return mRead->load(std::memory_order_relaxed); }
281 
282   size_t GetWriteRelaxed() { return mWrite->load(std::memory_order_relaxed); }
283 
284   /**
285    * The QueueSize is the number of bytes the queue can hold.  The queue is
286    * backed by a buffer that is one byte larger than this, meaning that one
287    * byte of the buffer is always wasted.
288    * This is usually the right method to use when testing queue capacity.
289    */
290   size_t QueueSize() { return QueueBufferSize() - 1; }
291 
292   /**
293    * The QueueBufferSize is the number of bytes in the buffer that the queue
294    * uses for storage.
295    * This is usually the right method to use when calculating read/write head
296    * positions.
297    */
298   size_t QueueBufferSize() { return mQueueBufferSize; }
299 
300   // PID of process on the other end.  Both ends may run on the same process.
301   base::ProcessId mOtherPid;
302 
303   uint8_t* mQueue;
304   size_t mQueueBufferSize;
305 
306   // Pointer to memory reserved for use by the user, or null if none
307   uint8_t* mUserReservedMemory;
308   size_t mUserReservedSize;
309 
310   // These std::atomics are in shared memory so DO NOT DELETE THEM!  We should,
311   // however, call their destructors.
312   std::atomic_size_t* mRead;
313   std::atomic_size_t* mWrite;
314 
315   // The Shmem contents are laid out like this:
316   // -----------------------------------------------------------------------
317   // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
318   // -----------------------------------------------------------------------
319   // where align1 is chosen so that mRead is properly aligned for a
320   // std_atomic_size_t and is on a cache line separate from the queue contents
321   // align2 and align3 is chosen to separate mRead/mWrite and mWrite/User Data
322   // similarly.
323   Shmem mShmem;
324 
325   // Two semaphores that are signaled when the queue goes from a state
326   // where it definitely is empty/full to a state where it "may not be".
327   // Therefore, we can wait on them and know that we will be awakened if
328   // there may be work to do.
329   // Our use of these semaphores leans heavily on the assumption that
330   // the queue is used by one producer and one consumer.
331   RefPtr<PcqRCSemaphore> mMaybeNotEmptySem;
332   RefPtr<PcqRCSemaphore> mMaybeNotFullSem;
333 };
334 
335 }  // namespace detail
336 
337 namespace webgl {
338 
339 using mozilla::ipc::Shmem;
340 
341 /**
342  * The PcqProducer is the endpoint that inserts elements into the queue.  It
343  * should only be used from one thread at a time.
344  */
345 class PcqProducer : public detail::PcqBase {
346  public:
347   PcqProducer(PcqProducer&& aOther) = default;
348   PcqProducer& operator=(PcqProducer&&) = default;
349   PcqProducer() = default;  // for IPDL
350 
351   /**
352    * The number of bytes that the queue can hold.
353    */
354   size_t Size() { return QueueSize(); }
355 
356   /**
357    * Attempts to insert aArgs into the queue.  If the operation does not
358    * succeed then the queue is unchanged.
359    */
360   template <typename... Args>
361   QueueStatus TryInsert(Args&&... aArgs) {
362     size_t write = mWrite->load(std::memory_order_relaxed);
363     const size_t initWrite = write;
364     size_t read = mRead->load(std::memory_order_acquire);
365 
366     if (!ValidState(read, write)) {
367       PCQ_LOGE(
368           "Queue was found in an invalid state.  Queue Size: %zu.  "
369           "Read: %zu.  Write: %zu",
370           Size(), read, write);
371       return QueueStatus::kFatalError;
372     }
373 
374     ProducerView view(this, read, &write);
375 
376     // Check that the queue has enough unoccupied room for all Args types.
377     // This is based on the user's size estimate for args from QueueParamTraits.
378     size_t bytesNeeded = detail::MinSizeofArgs(view, &aArgs...);
379 
380     if (Size() < bytesNeeded) {
381       PCQ_LOGE(
382           "Queue is too small for objects.  Queue Size: %zu.  "
383           "Needed: %zu",
384           Size(), bytesNeeded);
385       return QueueStatus::kTooSmall;
386     }
387 
388     if (FreeBytes(read, write) < bytesNeeded) {
389       PCQ_LOGD(
390           "Not enough room to insert.  Has: %zu (%zu,%zu).  "
391           "Needed: %zu",
392           FreeBytes(read, write), read, write, bytesNeeded);
393       return QueueStatus::kNotReady;
394     }
395 
396     // Try to insert args in sequence.  Only update the queue if the
397     // operation was successful.  We already checked all normal means of
398     // failure but we can expect occasional failure here if the user's
399     // QueueParamTraits::MinSize method was inexact.
400     QueueStatus status = TryInsertHelper(view, aArgs...);
401     if (!status) {
402       PCQ_LOGD(
403           "Failed to insert with error (%d).  Has: %zu (%zu,%zu).  "
404           "Estimate of bytes needed: %zu",
405           (int)status, FreeBytes(read, write), read, write, bytesNeeded);
406       return status;
407     }
408 
409     MOZ_ASSERT(ValidState(read, write));
410 
411     // Check that at least bytesNeeded were produced.  Failing this means
412     // that some QueueParamTraits::MinSize estimated too many bytes.
413     bool enoughBytes =
414         UsedBytes(read, write) >=
415         UsedBytes(read, (initWrite + bytesNeeded) % QueueBufferSize());
416     MOZ_ASSERT(enoughBytes);
417     if (!enoughBytes) {
418       return QueueStatus::kFatalError;
419     }
420 
421     // Commit the transaction.
422     PCQ_LOGD(
423         "Successfully inserted.  PcqProducer used %zu bytes total.  "
424         "Write index: %zu -> %zu",
425         bytesNeeded, initWrite, write);
426     mWrite->store(write, std::memory_order_release);
427 
428     // Set the semaphore (unless it is already set) to let the consumer know
429     // that the queue may not be empty.  We just need to guarantee that it
430     // was set (i.e. non-zero) at some time after mWrite was updated.
431     if (!mMaybeNotEmptySem->IsAvailable()) {
432       mMaybeNotEmptySem->Signal();
433     }
434     return status;
435   }
436 
437   /**
438    * Attempts to insert aArgs into the queue.  If the operation does not
439    * succeed in the time allotted then the queue is unchanged.
440    */
441   template <typename... Args>
442   QueueStatus TryWaitInsert(const Maybe<TimeDuration>& aDuration,
443                             Args&&... aArgs) {
444     return TryWaitInsertImpl(false, aDuration, std::forward<Args>(aArgs)...);
445   }
446 
447   template <typename... Args>
448   QueueStatus TryTypedInsert(Args&&... aArgs) {
449     return TryInsert(PcqTypedArg<Args>(aArgs)...);
450   }
451 
452  protected:
453   friend ProducerConsumerQueue;
454   friend ProducerView<PcqProducer>;
455 
456   template <typename Arg, typename... Args>
457   QueueStatus TryInsertHelper(ProducerView<PcqProducer>& aView, const Arg& aArg,
458                               const Args&... aArgs) {
459     QueueStatus status = TryInsertItem(aView, aArg);
460     return IsSuccess(status) ? TryInsertHelper(aView, aArgs...) : status;
461   }
462 
463   QueueStatus TryInsertHelper(ProducerView<PcqProducer>&) {
464     return QueueStatus::kSuccess;
465   }
466 
467   template <typename Arg>
468   QueueStatus TryInsertItem(ProducerView<PcqProducer>& aView, const Arg& aArg) {
469     return QueueParamTraits<typename RemoveCVR<Arg>::Type>::Write(aView, aArg);
470   }
471 
472   template <typename... Args>
473   QueueStatus TryWaitInsertImpl(bool aRecursed,
474                                 const Maybe<TimeDuration>& aDuration,
475                                 Args&&... aArgs) {
476     // Wait up to aDuration for the not-full semaphore to be signaled.
477     // If we run out of time then quit.
478     TimeStamp start(TimeStamp::Now());
479     if (aRecursed && (!mMaybeNotFullSem->Wait(aDuration))) {
480       return QueueStatus::kNotReady;
481     }
482 
483     // Attempt to insert all args.  No waiting is done here.
484     QueueStatus status = TryInsert(std::forward<Args>(aArgs)...);
485 
486     TimeStamp now;
487     if (aRecursed && IsSuccess(status)) {
488       // If our local view of the queue is that it is still not full then
489       // we know it won't get full without us (we are the only producer).
490       // So re-set the not-full semaphore unless it's already set.
491       // (We are also the only not-full semaphore decrementer so it can't
492       // become 0.)
493       if ((!IsFull()) && (!mMaybeNotFullSem->IsAvailable())) {
494         mMaybeNotFullSem->Signal();
495       }
496     } else if ((status == QueueStatus::kNotReady) &&
497                (aDuration.isNothing() ||
498                 ((now = TimeStamp::Now()) - start) < aDuration.value())) {
499       // We don't have enough room but still have time, e.g. because
500       // the consumer read some data but not enough or because the
501       // not-full semaphore gave a false positive.  Either way, retry.
502       status =
503           aDuration.isNothing()
504               ? TryWaitInsertImpl(true, aDuration, std::forward<Args>(aArgs)...)
505               : TryWaitInsertImpl(true, Some(aDuration.value() - (now - start)),
506                                   std::forward<Args>(aArgs)...);
507     }
508 
509     return status;
510   }
511 
512   template <typename Arg>
513   QueueStatus WriteObject(size_t aRead, size_t* aWrite, const Arg& arg,
514                           size_t aArgSize) {
515     return Marshaller::WriteObject(mQueue, QueueBufferSize(), aRead, aWrite,
516                                    arg, aArgSize);
517   }
518 
519   // Currently, the PCQ requires any parameters expected to need more than
520   // 1/16 the total number of bytes in the command queue to use their own
521   // SharedMemory.
522   bool NeedsSharedMemory(size_t aRequested) {
523     return (Size() / 16) < aRequested;
524   }
525 
526   PcqProducer(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
527               RefPtr<detail::PcqRCSemaphore> aMaybeNotEmptySem,
528               RefPtr<detail::PcqRCSemaphore> aMaybeNotFullSem)
529       : PcqBase(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem,
530                 aMaybeNotFullSem) {
531     // Since they are shared, this initializes mRead/mWrite in the PcqConsumer
532     // as well.
533     *mRead = 0;
534     *mWrite = 0;
535   }
536 
537   PcqProducer(const PcqProducer&) = delete;
538   PcqProducer& operator=(const PcqProducer&) = delete;
539 };
540 
541 class PcqConsumer : public detail::PcqBase {
542  public:
543   PcqConsumer(PcqConsumer&& aOther) = default;
544   PcqConsumer& operator=(PcqConsumer&&) = default;
545   PcqConsumer() = default;  // for IPDL
546 
547   /**
548    * The number of bytes that the queue can hold.
549    */
550   size_t Size() { return QueueSize(); }
551 
552   /**
553    * Attempts to copy aArgs in the queue.  The queue remains unchanged.
554    */
555   template <typename... Args>
556   QueueStatus TryPeek(Args&... aArgs) {
557     return TryPeekOrRemove<false, Args...>(
558         [&](ConsumerView<PcqConsumer>& aView) -> QueueStatus {
559           return TryPeekRemoveHelper(aView, &aArgs...);
560         });
561   }
562 
563   template <typename... Args>
564   QueueStatus TryTypedPeek(Args&... aArgs) {
565     return TryPeek(PcqTypedArg<Args>(aArgs)...);
566   }
567 
568   /**
569    * Attempts to copy and remove aArgs from the queue.  If the operation does
570    * not succeed then the queue is unchanged.
571    */
572   template <typename... Args>
573   QueueStatus TryRemove(Args&... aArgs) {
574     return TryPeekOrRemove<true, Args...>(
575         [&](ConsumerView<PcqConsumer>& aView) -> QueueStatus {
576           return TryPeekRemoveHelper(aView, &aArgs...);
577         });
578   }
579 
580   template <typename... Args>
581   QueueStatus TryTypedRemove(Args&... aArgs) {
582     return TryRemove(PcqTypedArg<Args>(&aArgs)...);
583   }
584 
585   /**
586    * Attempts to remove Args from the queue without copying them.  If the
587    * operation does not succeed then the queue is unchanged.
588    */
589   template <typename... Args>
590   QueueStatus TryRemove() {
591     using seq = std::index_sequence_for<Args...>;
592     return TryRemove<Args...>(seq{});
593   }
594 
595   template <typename... Args>
596   QueueStatus TryTypedRemove() {
597     return TryRemove<PcqTypedArg<Args>...>();
598   }
599 
600   /**
601    * Wait for up to aDuration to get a copy of the requested data.  If the
602    * operation does not succeed in the time allotted then the queue is
603    * unchanged. Pass Nothing to wait until peek succeeds.
604    */
605   template <typename... Args>
606   QueueStatus TryWaitPeek(const Maybe<TimeDuration>& aDuration,
607                           Args&... aArgs) {
608     return TryWaitPeekOrRemove<false>(aDuration, aArgs...);
609   }
610 
611   /**
612    * Wait for up to aDuration to remove the requested data from the queue.
613    * Pass Nothing to wait until removal succeeds.
614    */
615   template <typename... Args>
616   QueueStatus TryWaitRemove(const Maybe<TimeDuration>& aDuration,
617                             Args&... aArgs) {
618     return TryWaitPeekOrRemove<true>(aDuration, aArgs...);
619   }
620 
621   /**
622    * Wait for up to aDuration to remove the requested data.  No copy
623    * of the data is returned.  If the operation does not succeed in the
624    * time allotted then the queue is unchanged.
625    * Pass Nothing to wait until removal succeeds.
626    */
627   template <typename... Args>
628   QueueStatus TryWaitRemove(const Maybe<TimeDuration>& aDuration) {
629     // Wait up to aDuration for the not-empty semaphore to be signaled.
630     // If we run out of time then quit.
631     TimeStamp start(TimeStamp::Now());
632     if (!mMaybeNotEmptySem->Wait(aDuration)) {
633       return QueueStatus::kNotReady;
634     }
635 
636     // Attempt to remove all args.  No waiting is done here.
637     QueueStatus status = TryRemove<Args...>();
638 
639     TimeStamp now;
640     if (IsSuccess(status)) {
641       // If our local view of the queue is that it is still not empty then
642       // we know it won't get empty without us (we are the only consumer).
643       // So re-set the not-empty semaphore unless it's already set.
644       // (We are also the only not-empty semaphore decrementer so it can't
645       // become 0.)
646       if ((!IsEmpty()) && (!mMaybeNotEmptySem->IsAvailable())) {
647         mMaybeNotEmptySem->Signal();
648       }
649     } else if ((status == QueueStatus::kNotReady) &&
650                (aDuration.isNothing() ||
651                 ((now = TimeStamp::Now()) - start) < aDuration.value())) {
652       // We don't have enough data but still have time, e.g. because
653       // the producer wrote some data but not enough or because the
654       // not-empty semaphore gave a false positive.  Either way, retry.
655       status =
656           aDuration.isNothing()
657               ? TryWaitRemove<Args...>(aDuration)
658               : TryWaitRemove<Args...>(Some(aDuration.value() - (now - start)));
659     }
660 
661     return status;
662   }
663 
664  protected:
665   friend ProducerConsumerQueue;
666   friend ConsumerView<PcqConsumer>;
667 
668   // PeekOrRemoveOperation takes a read pointer and a write index.
669   using PeekOrRemoveOperation =
670       std::function<QueueStatus(ConsumerView<PcqConsumer>&)>;
671 
672   template <bool isRemove, typename... Args>
673   QueueStatus TryPeekOrRemove(const PeekOrRemoveOperation& aOperation) {
674     size_t write = mWrite->load(std::memory_order_acquire);
675     size_t read = mRead->load(std::memory_order_relaxed);
676     const size_t initRead = read;
677 
678     if (!ValidState(read, write)) {
679       PCQ_LOGE(
680           "Queue was found in an invalid state.  Queue Size: %zu.  "
681           "Read: %zu.  Write: %zu",
682           Size(), read, write);
683       return QueueStatus::kFatalError;
684     }
685 
686     ConsumerView<PcqConsumer> view(this, &read, write);
687 
688     // Check that the queue has enough unoccupied room for all Args types.
689     // This is based on the user's size estimate for Args from QueueParamTraits.
690     size_t bytesNeeded = detail::MinSizeofArgs(view);
691 
692     if (Size() < bytesNeeded) {
693       PCQ_LOGE(
694           "Queue is too small for objects.  Queue Size: %zu.  "
695           "Bytes needed: %zu.",
696           Size(), bytesNeeded);
697       return QueueStatus::kTooSmall;
698     }
699 
700     if (UsedBytes(read, write) < bytesNeeded) {
701       PCQ_LOGD(
702           "Not enough data in queue.  Has: %zu (%zu,%zu).  "
703           "Bytes needed: %zu",
704           UsedBytes(read, write), read, write, bytesNeeded);
705       return QueueStatus::kNotReady;
706     }
707 
708     // Only update the queue if the operation was successful and we aren't
709     // peeking.
710     QueueStatus status = aOperation(view);
711     if (!status) {
712       return status;
713     }
714 
715     // Check that at least bytesNeeded were consumed.  Failing this means
716     // that some QueueParamTraits::MinSize estimated too many bytes.
717     bool enoughBytes =
718         FreeBytes(read, write) >=
719         FreeBytes((initRead + bytesNeeded) % QueueBufferSize(), write);
720     MOZ_ASSERT(enoughBytes);
721     if (!enoughBytes) {
722       return QueueStatus::kFatalError;
723     }
724 
725     MOZ_ASSERT(ValidState(read, write));
726 
727     PCQ_LOGD(
728         "Successfully %s.  PcqConsumer used %zu bytes total.  "
729         "Read index: %zu -> %zu",
730         isRemove ? "removed" : "peeked", bytesNeeded, initRead, read);
731 
732     // Commit the transaction... unless we were just peeking.
733     if (isRemove) {
734       mRead->store(read, std::memory_order_release);
735       // Set the semaphore (unless it is already set) to let the producer know
736       // that the queue may not be full.  We just need to guarantee that it
737       // was set (i.e. non-zero) at some time after mRead was updated.
738       if (!mMaybeNotFullSem->IsAvailable()) {
739         mMaybeNotFullSem->Signal();
740       }
741     }
742     return status;
743   }
744 
745   // Helper that passes nulls for all Args*
746   template <typename... Args, size_t... Is>
747   QueueStatus TryRemove(std::index_sequence<Is...>) {
748     std::tuple<Args*...> nullArgs;
749     return TryPeekOrRemove<true, Args...>(
750         [&](ConsumerView<PcqConsumer>& aView) {
751           return TryPeekRemoveHelper(aView, std::get<Is>(nullArgs)...);
752         });
753   }
754 
755   template <bool isRemove, typename... Args>
756   QueueStatus TryWaitPeekOrRemove(const Maybe<TimeDuration>& aDuration,
757                                   Args&&... aArgs) {
758     return TryWaitPeekOrRemoveImpl<isRemove>(false, aDuration,
759                                              std::forward<Args>(aArgs)...);
760   }
761 
762   template <bool isRemove, typename... Args>
763   QueueStatus TryWaitPeekOrRemoveImpl(bool aRecursed,
764                                       const Maybe<TimeDuration>& aDuration,
765                                       Args&... aArgs) {
766     // Wait up to aDuration for the not-empty semaphore to be signaled.
767     // If we run out of time then quit.
768     TimeStamp start(TimeStamp::Now());
769     if (aRecursed && (!mMaybeNotEmptySem->Wait(aDuration))) {
770       return QueueStatus::kNotReady;
771     }
772 
773     // Attempt to read all args.  No waiting is done here.
774     QueueStatus status = isRemove ? TryRemove(aArgs...) : TryPeek(aArgs...);
775 
776     TimeStamp now;
777     if (aRecursed && IsSuccess(status)) {
778       // If our local view of the queue is that it is still not empty then
779       // we know it won't get empty without us (we are the only consumer).
780       // So re-set the not-empty semaphore unless it's already set.
781       // (We are also the only not-empty semaphore decrementer so it can't
782       // become 0.)
783       if ((!IsEmpty()) && (!mMaybeNotEmptySem->IsAvailable())) {
784         mMaybeNotEmptySem->Signal();
785       }
786     } else if ((status == QueueStatus::kNotReady) &&
787                (aDuration.isNothing() ||
788                 ((now = TimeStamp::Now()) - start) < aDuration.value())) {
789       // We don't have enough data but still have time, e.g. because
790       // the producer wrote some data but not enough or because the
791       // not-empty semaphore gave a false positive.  Either way, retry.
792       status =
793           aDuration.isNothing()
794               ? TryWaitPeekOrRemoveImpl<isRemove>(true, aDuration, aArgs...)
795               : TryWaitPeekOrRemoveImpl<isRemove>(
796                     true, Some(aDuration.value() - (now - start)), aArgs...);
797     }
798 
799     return status;
800   }
801 
802   // Version of the helper for copying values out of the queue.
803   template <typename... Args>
804   QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>& aView,
805                                   Args*... aArgs);
806 
807   template <typename Arg, typename... Args>
808   QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>& aView, Arg* aArg,
809                                   Args*... aArgs) {
810     QueueStatus status = TryCopyOrSkipItem<Arg>(aView, aArg);
811     return IsSuccess(status) ? TryPeekRemoveHelper<Args...>(aView, aArgs...)
812                              : status;
813   }
814 
815   QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>&) {
816     return QueueStatus::kSuccess;
817   }
818 
819   // If an item is available then it is copied into aArg.  The item is skipped
820   // over if aArg is null.
821   template <typename Arg>
822   QueueStatus TryCopyOrSkipItem(ConsumerView<PcqConsumer>& aView, Arg* aArg) {
823     return QueueParamTraits<typename RemoveCVR<Arg>::Type>::Read(
824         aView, const_cast<std::remove_cv_t<Arg>*>(aArg));
825   }
826 
827   template <typename Arg>
828   QueueStatus ReadObject(size_t* aRead, size_t aWrite, Arg* arg,
829                          size_t aArgSize) {
830     return Marshaller::ReadObject(mQueue, QueueBufferSize(), aRead, aWrite, arg,
831                                   aArgSize);
832   }
833 
834   // Currently, the PCQ requires any parameters expected to need more than
835   // 1/16 the total number of bytes in the command queue to use their own
836   // SharedMemory.
837   bool NeedsSharedMemory(size_t aRequested) {
838     return (Size() / 16) < aRequested;
839   }
840 
841   PcqConsumer(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
842               RefPtr<detail::PcqRCSemaphore> aMaybeNotEmptySem,
843               RefPtr<detail::PcqRCSemaphore> aMaybeNotFullSem)
844       : PcqBase(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem,
845                 aMaybeNotFullSem) {}
846 
847   PcqConsumer(const PcqConsumer&) = delete;
848   PcqConsumer& operator=(const PcqConsumer&) = delete;
849 };
850 
851 using mozilla::detail::GetCacheLineSize;
852 using mozilla::detail::GetMaxHeaderSize;
853 
854 /**
855  * A single producer + single consumer queue, implemented as a
856  * circular queue.  The object is backed with a Shmem, which allows
857  * it to be used across processes.
858  *
859  * This is a single-producer/single-consumer queue.  Another way of saying that
860  * is to say that the PcqProducer and PcqConsumer objects are not thread-safe.
861  */
862 class ProducerConsumerQueue {
863  public:
864   /**
865    * Create a queue whose endpoints are the same as those of aProtocol.
866    * In choosing a queueSize, be aware that both the queue and the Shmem will
867    * allocate additional shared memory for internal accounting (see
868    * GetMaxHeaderSize) and that Shmem sizes are a multiple of the operating
869    * system's page sizes.
870    *
871    * aAdditionalBytes of shared memory will also be allocated.
872    * Clients may use this shared memory for their own purposes.
873    * See GetUserReservedMemory() and GetUserReservedMemorySize()
874    */
875   static UniquePtr<ProducerConsumerQueue> Create(
876       mozilla::ipc::IProtocol* aProtocol, size_t aQueueSize,
877       size_t aAdditionalBytes = 0) {
878     MOZ_ASSERT(aProtocol);
879     Shmem shmem;
880 
881     // NB: We need one extra byte for the queue contents (hence the "+1").
882     uint32_t totalShmemSize =
883         aQueueSize + 1 + GetMaxHeaderSize() + aAdditionalBytes;
884 
885     if (!aProtocol->AllocUnsafeShmem(
886             totalShmemSize, mozilla::ipc::SharedMemory::TYPE_BASIC, &shmem)) {
887       return nullptr;
888     }
889 
890     UniquePtr<ProducerConsumerQueue> ret =
891         Create(shmem, aProtocol->OtherPid(), aQueueSize);
892     if (!ret) {
893       return ret;
894     }
895 
896     // The system may have reserved more bytes than the user asked for.
897     // Make sure they aren't given access to the extra.
898     MOZ_ASSERT(ret->mProducer->mUserReservedSize >= aAdditionalBytes);
899     ret->mProducer->mUserReservedSize = aAdditionalBytes;
900     ret->mConsumer->mUserReservedSize = aAdditionalBytes;
901     if (aAdditionalBytes == 0) {
902       ret->mProducer->mUserReservedMemory = nullptr;
903       ret->mConsumer->mUserReservedMemory = nullptr;
904     }
905     return ret;
906   }
907 
908   /**
909    * Create a queue that is backed by aShmem, which must be:
910    * (1) unsafe
911    * (2) made for use with aOtherPid (may be this process' PID)
912    * (3) large enough to hold the queue contents and the shared meta-data of
913    *     the queue (see GetMaxHeaderSize).  Any room left over will be available
914    *     as user reserved memory.
915    *     See GetUserReservedMemory() and GetUserReservedMemorySize()
916    */
917   static UniquePtr<ProducerConsumerQueue> Create(Shmem& aShmem,
918                                                  base::ProcessId aOtherPid,
919                                                  size_t aQueueSize) {
920     uint32_t totalShmemSize = aShmem.Size<uint8_t>();
921 
922     // NB: We need one extra byte for the queue contents (hence the "+1").
923     if ((!aShmem.IsWritable()) || (!aShmem.IsReadable()) ||
924         ((GetMaxHeaderSize() + aQueueSize + 1) > totalShmemSize)) {
925       return nullptr;
926     }
927 
928     auto notempty = MakeRefPtr<detail::PcqRCSemaphore>(
929         CrossProcessSemaphore::Create("webgl-notempty", 0));
930     auto notfull = MakeRefPtr<detail::PcqRCSemaphore>(
931         CrossProcessSemaphore::Create("webgl-notfull", 1));
932     return WrapUnique(new ProducerConsumerQueue(aShmem, aOtherPid, aQueueSize,
933                                                 notempty, notfull));
934   }
935 
936   /**
937    * The queue needs a few bytes for 2 shared counters.  It takes these from the
938    * underlying Shmem.  This will still work if the cache line size is incorrect
939    * for some architecture but operations may be less efficient.
940    */
941   static constexpr size_t GetMaxHeaderSize() {
942     return mozilla::detail::GetMaxHeaderSize();
943   }
944 
945   /**
946    * Cache line size for the machine.  We assume a 64-byte cache line size.
947    */
948   static constexpr size_t GetCacheLineSize() {
949     return mozilla::detail::GetCacheLineSize();
950   }
951 
952   using Producer = PcqProducer;
953   using Consumer = PcqConsumer;
954 
955   UniquePtr<Producer> TakeProducer() { return std::move(mProducer); }
956   UniquePtr<Consumer> TakeConsumer() { return std::move(mConsumer); }
957 
958  private:
959   ProducerConsumerQueue(Shmem& aShmem, base::ProcessId aOtherPid,
960                         size_t aQueueSize,
961                         RefPtr<detail::PcqRCSemaphore>& aMaybeNotEmptySem,
962                         RefPtr<detail::PcqRCSemaphore>& aMaybeNotFullSem)
963       : mProducer(
964             WrapUnique(new Producer(aShmem, aOtherPid, aQueueSize,
965                                     aMaybeNotEmptySem, aMaybeNotFullSem))),
966         mConsumer(
967             WrapUnique(new Consumer(aShmem, aOtherPid, aQueueSize,
968                                     aMaybeNotEmptySem, aMaybeNotFullSem))) {
969     PCQ_LOGD(
970         "Constructed PCQ (%p).  Shmem Size = %zu. Queue Size = %zu.  "
971         "Other process ID: %08x.",
972         this, aShmem.Size<uint8_t>(), aQueueSize, (uint32_t)aOtherPid);
973   }
974 
975   UniquePtr<Producer> mProducer;
976   UniquePtr<Consumer> mConsumer;
977 };
978 
979 }  // namespace webgl
980 
981 namespace ipc {
982 
983 template <>
984 struct IPDLParamTraits<mozilla::detail::PcqBase> {
985   typedef mozilla::detail::PcqBase paramType;
986 
987   static void Write(IPC::Message* aMsg, IProtocol* aActor, paramType& aParam) {
988     WriteIPDLParam(aMsg, aActor, aParam.QueueSize());
989     WriteIPDLParam(aMsg, aActor, std::move(aParam.mShmem));
990 
991     // May not currently share a PcqProducer or PcqConsumer with a process that
992     // it's Shmem is not related to.
993     MOZ_ASSERT(aActor->OtherPid() == aParam.mOtherPid);
994     WriteIPDLParam(
995         aMsg, aActor,
996         aParam.mMaybeNotEmptySem->ShareToProcess(aActor->OtherPid()));
997 
998     WriteIPDLParam(aMsg, aActor,
999                    aParam.mMaybeNotFullSem->ShareToProcess(aActor->OtherPid()));
1000   }
1001 
1002   static bool Read(const IPC::Message* aMsg, PickleIterator* aIter,
1003                    IProtocol* aActor, paramType* aResult) {
1004     size_t queueSize;
1005     Shmem shmem;
1006     CrossProcessSemaphoreHandle notEmptyHandle;
1007     CrossProcessSemaphoreHandle notFullHandle;
1008 
1009     if (!ReadIPDLParam(aMsg, aIter, aActor, &queueSize) ||
1010         !ReadIPDLParam(aMsg, aIter, aActor, &shmem) ||
1011         !ReadIPDLParam(aMsg, aIter, aActor, &notEmptyHandle) ||
1012         !ReadIPDLParam(aMsg, aIter, aActor, &notFullHandle)) {
1013       return false;
1014     }
1015 
1016     MOZ_ASSERT(IsHandleValid(notEmptyHandle) && IsHandleValid(notFullHandle));
1017     aResult->Set(shmem, aActor->OtherPid(), queueSize,
1018                  MakeRefPtr<detail::PcqRCSemaphore>(
1019                      CrossProcessSemaphore::Create(notEmptyHandle)),
1020                  MakeRefPtr<detail::PcqRCSemaphore>(
1021                      CrossProcessSemaphore::Create(notFullHandle)));
1022     return true;
1023   }
1024 
1025   static void Log(const paramType& aParam, std::wstring* aLog) {
1026     IPDLParamTraits<Shmem>::Log(aParam.mShmem, aLog);
1027   }
1028 };
1029 
1030 template <>
1031 struct IPDLParamTraits<mozilla::webgl::PcqProducer>
1032     : public IPDLParamTraits<mozilla::detail::PcqBase> {
1033   typedef mozilla::webgl::PcqProducer paramType;
1034 };
1035 
1036 template <>
1037 struct IPDLParamTraits<mozilla::webgl::PcqConsumer>
1038     : public IPDLParamTraits<mozilla::detail::PcqBase> {
1039   typedef mozilla::webgl::PcqConsumer paramType;
1040 };
1041 
1042 }  // namespace ipc
1043 }  // namespace mozilla
1044 
1045 #endif  // mozilla_ipc_ProducerConsumerQueue_h
1046