1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 * vim: sw=2 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7
8 #ifndef mozilla_ipc_ProducerConsumerQueue_h
9 #define mozilla_ipc_ProducerConsumerQueue_h 1
10
#include <atomic>
#include <cstdint>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include "mozilla/dom/QueueParamTraits.h"
#include "CrossProcessSemaphore.h"
18
19 namespace IPC {
20 template <typename T>
21 struct ParamTraits;
22 } // namespace IPC
23
24 namespace mozilla {
25 namespace webgl {
26
27 using IPC::PcqTypeInfo;
28 using IPC::PcqTypeInfoID;
29
// Log module used by the producer/consumer queue.  It is only declared here;
// the definition lives elsewhere.
extern LazyLogModule gPCQLog;
// Forwards a printf-style message to MOZ_LOG on gPCQLog at level `lvl`.
#define PCQ_LOG_(lvl, ...) MOZ_LOG(mozilla::webgl::gPCQLog, lvl, (__VA_ARGS__))
// Shorthands for logging at LogLevel::Debug and LogLevel::Error.
#define PCQ_LOGD(...) PCQ_LOG_(LogLevel::Debug, __VA_ARGS__)
#define PCQ_LOGE(...) PCQ_LOG_(LogLevel::Error, __VA_ARGS__)
34
// Forward declarations of the queue and its two endpoints.  They must precede
// namespace mozilla::detail, which names ProducerConsumerQueue in a
// using-declaration and befriends it from PcqBase.
class ProducerConsumerQueue;
class PcqProducer;
class PcqConsumer;
38
39 } // namespace webgl
40
41 // NB: detail is in mozilla instead of mozilla::webgl because many points in
42 // existing code get confused if mozilla::detail and mozilla::webgl::detail
43 // exist.
44 namespace detail {
45 using IPC::PcqTypeInfo;
46 using IPC::PcqTypeInfoID;
47
48 using mozilla::ipc::Shmem;
49 using mozilla::webgl::IsSuccess;
50 using mozilla::webgl::ProducerConsumerQueue;
51 using mozilla::webgl::QueueStatus;
52
// Size, in bytes, that this file assumes for a cache line.  It is used as the
// alignment unit when laying out the queue's header in shared memory.
constexpr size_t GetCacheLineSize() {
  constexpr size_t kCacheLineBytes = 64;
  return kCacheLineBytes;
}
54
55 // NB: The header may end up consuming fewer bytes than this. This value
56 // guarantees that we can always byte-align the header contents.
GetMaxHeaderSize()57 constexpr size_t GetMaxHeaderSize() {
58 // Recall that the Shmem contents are laid out like this:
59 // -----------------------------------------------------------------------
60 // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
61 // -----------------------------------------------------------------------
62
63 constexpr size_t alignment =
64 std::max(std::alignment_of<size_t>::value, GetCacheLineSize());
65 static_assert(alignment >= sizeof(size_t),
66 "alignment expected to be large enough to hold a size_t");
67
68 // We may need up to this many bytes to properly align mRead
69 constexpr size_t maxAlign1 = alignment - 1;
70 constexpr size_t readAndAlign2 = alignment;
71 constexpr size_t writeAndAlign3 = alignment;
72 return maxAlign1 + readAndAlign2 + writeAndAlign3;
73 }
74
// Adds up the minimum number of queue bytes that aView.MinSizeParam reports
// for each of the given arguments, in argument order.
template <typename View, typename Arg, typename... Args>
size_t MinSizeofArgs(View& aView, const Arg* aArg, const Args*... aArgs) {
  size_t total = aView.MinSizeParam(aArg);
  // Fold over the remaining arguments instead of recursing.
  ((total += aView.MinSizeParam(aArgs)), ...);
  return total;
}

// No arguments: nothing needs to be stored.
template <typename View>
size_t MinSizeofArgs(View&) {
  return 0;
}
84
// Type-only variants of MinSizeofArgs: the argument types are given as
// explicit template arguments and no objects are available, so a null pointer
// of each type is handed to aView.MinSizeParam.
//
// Minimum size of a single argument of type Arg.  The null pointer is typed so
// that MinSizeParam is selected for Arg (a bare nullptr carries no type).
// This overload is declared first so that the recursive overload below can
// find it by ordinary lookup.
template <typename View, typename Arg>
size_t MinSizeofArgs(View& aView) {
  return aView.MinSizeParam(static_cast<const Arg*>(nullptr));
}

// Minimum total size of two or more arguments: the size of the first type
// plus the size of the rest.  View must be repeated in the explicit template
// argument list of the recursive call; otherwise Arg2 would be taken for View.
template <typename View, typename Arg1, typename Arg2, typename... Args>
size_t MinSizeofArgs(View& aView) {
  return MinSizeofArgs<View, Arg1>(aView) +
         MinSizeofArgs<View, Arg2, Args...>(aView);
}
95
96 class PcqRCSemaphore {
97 public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PcqRCSemaphore)98 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PcqRCSemaphore)
99 explicit PcqRCSemaphore(CrossProcessSemaphore* aSem) : mSem(aSem) {
100 MOZ_ASSERT(mSem);
101 }
102
Wait(const Maybe<TimeDuration> & aTime)103 bool Wait(const Maybe<TimeDuration>& aTime) { return mSem->Wait(aTime); }
Signal()104 void Signal() { mSem->Signal(); }
IsAvailable()105 bool IsAvailable() {
106 MOZ_ASSERT_UNREACHABLE("Unimplemented");
107 return false;
108 }
ShareToProcess(base::ProcessId aTargetPid)109 CrossProcessSemaphoreHandle ShareToProcess(base::ProcessId aTargetPid) {
110 return mSem->ShareToProcess(aTargetPid);
111 }
CloseHandle()112 void CloseHandle() { mSem->CloseHandle(); }
113
114 private:
~PcqRCSemaphore()115 ~PcqRCSemaphore() { delete mSem; }
116 CrossProcessSemaphore* mSem;
117 };
118
/**
 * Common base class for PcqProducer and PcqConsumer.
 */
// Holds the state shared by both queue endpoints: the Shmem that backs the
// queue, pointers to the read/write indices stored inside that Shmem, the
// optional user-reserved region, and the two semaphores used for waiting.
class PcqBase {
 public:
  /**
   * Bytes used in the queue if the parameters are the read/write heads.
   * Delegates to mozilla::webgl::UsedBytes using the size of the backing
   * buffer.
   */
  size_t UsedBytes(size_t aRead, size_t aWrite) {
    MOZ_ASSERT(ValidState(aRead, aWrite));
    return mozilla::webgl::UsedBytes(QueueBufferSize(), aRead, aWrite);
  }

  /**
   * Bytes free in the queue if the parameters are the read/write heads.
   * Delegates to mozilla::webgl::FreeBytes using the size of the backing
   * buffer.
   */
  size_t FreeBytes(size_t aRead, size_t aWrite) {
    MOZ_ASSERT(ValidState(aRead, aWrite));
    return mozilla::webgl::FreeBytes(QueueBufferSize(), aRead, aWrite);
  }

  /**
   * True when this queue is valid with the parameters as the read/write heads,
   * i.e. when both are indices inside the backing buffer.
   */
  bool ValidState(size_t aRead, size_t aWrite) {
    return (aRead < QueueBufferSize()) && (aWrite < QueueBufferSize());
  }

  /**
   * True when this queue is empty with the parameters as the read/write heads.
   */
  bool IsEmpty(size_t aRead, size_t aWrite) {
    MOZ_ASSERT(ValidState(aRead, aWrite));
    return UsedBytes(aRead, aWrite) == 0;
  }

  /**
   * True when this queue is full with the parameters as the read/write heads.
   */
  bool IsFull(size_t aRead, size_t aWrite) {
    MOZ_ASSERT(ValidState(aRead, aWrite));
    return FreeBytes(aRead, aWrite) == 0;
  }

  // Cheaply get the used size of the current queue.  This does no
  // synchronization so the information may be stale.  On the PcqProducer
  // side, it will never underestimate the number of bytes used and,
  // on the Consumer side, it will never overestimate them.
  // (The reciprocal is true of FreeBytes.)
  size_t UsedBytes() {
    size_t write = mWrite->load(std::memory_order_relaxed);
    size_t read = mRead->load(std::memory_order_relaxed);
    return UsedBytes(read, write);
  }

  // This does no synchronization so the information may be stale.
  // Computed as the queue capacity minus the (relaxed) used byte count.
  size_t FreeBytes() { return QueueSize() - UsedBytes(); }

  // This does no synchronization so the information may be stale.
  bool IsEmpty() { return IsEmpty(GetReadRelaxed(), GetWriteRelaxed()); }

  // This does no synchronization so the information may be stale.
  bool IsFull() { return IsFull(GetReadRelaxed(), GetWriteRelaxed()); }

 protected:
  friend struct mozilla::ipc::IPDLParamTraits<PcqBase>;
  friend ProducerConsumerQueue;

  // Creates an unattached object: no buffer, no indices, no user memory.
  // Note that mRead/mWrite are null, so the accessors above must not be used
  // until the object has been attached to a Shmem.
  PcqBase()
      : mOtherPid(0),
        mQueue(nullptr),
        mQueueBufferSize(0),
        mUserReservedMemory(nullptr),
        mUserReservedSize(0),
        mRead(nullptr),
        mWrite(nullptr) {}

  // Creates an object attached to aShmem.  All members are assigned by Set().
  PcqBase(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
          RefPtr<PcqRCSemaphore> aMaybeNotEmptySem,
          RefPtr<PcqRCSemaphore> aMaybeNotFullSem) {
    Set(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem, aMaybeNotFullSem);
  }

  // Move-only.
  PcqBase(const PcqBase&) = delete;
  PcqBase(PcqBase&&) = default;
  PcqBase& operator=(const PcqBase&) = delete;
  PcqBase& operator=(PcqBase&&) = default;

  // Attaches this object to aShmem and computes where the queue contents, the
  // read/write indices and the user-reserved memory live inside it.
  //
  // aShmem            shared memory backing the queue
  // aOtherPid         PID of the process on the other end
  // aQueueSize        number of bytes the queue can hold (the backing buffer
  //                   is one byte larger)
  // aMaybeNotEmptySem semaphore the consumer waits on; must be non-null
  // aMaybeNotFullSem  semaphore the producer waits on; must be non-null
  //
  // This does not write to the read/write indices themselves.
  void Set(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
           RefPtr<PcqRCSemaphore> aMaybeNotEmptySem,
           RefPtr<PcqRCSemaphore> aMaybeNotFullSem) {
    mOtherPid = aOtherPid;
    mShmem = aShmem;
    mQueue = aShmem.get<uint8_t>();

    // NB: The buffer needs one extra byte for the queue contents
    mQueueBufferSize = aQueueSize + 1;

    // Recall that the Shmem contents are laid out like this:
    // -----------------------------------------------------------------------
    // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
    // -----------------------------------------------------------------------

    size_t shmemSize = aShmem.Size<uint8_t>();
    // First byte after the queue contents; the header starts here.
    uint8_t* header = mQueue + mQueueBufferSize;

    constexpr size_t alignment =
        std::max(std::alignment_of<size_t>::value, GetCacheLineSize());
    static_assert(alignment >= sizeof(size_t),
                  "alignment expected to be large enough to hold a size_t");

    // Required by the mask-based rounding below.
    static_assert((alignment & (alignment - 1)) == 0,
                  "alignment must be a power of 2");

    // We may need up to this many bytes to properly align mRead
    constexpr size_t maxAlign1 = alignment - 1;

    // Find the lowest value of align1 that assures proper byte-alignment:
    // round the header address up to the next multiple of `alignment`.
    uintptr_t alignValue = reinterpret_cast<uintptr_t>(header + maxAlign1);
    alignValue &= ~(alignment - 1);
    uint8_t* metadata = reinterpret_cast<uint8_t*>(alignValue);

    // NB: We do not call the nontrivial constructor here (we do not write
    // `new std::atomic_size_t()`) because it would zero the read/write values
    // in the shared memory, which may already represent data in the queue.
    mRead = new (metadata) std::atomic_size_t;
    mWrite = new (metadata + alignment) std::atomic_size_t;

    // The actual number of bytes we needed to properly align mRead
    size_t align1 = metadata - header;
    MOZ_ASSERT(align1 <= maxAlign1);

    // The rest of the memory is the user reserved memory
    size_t headerSize = align1 + 2 * alignment;
    // NOTE(review): this is unsigned arithmetic; it would wrap if the Shmem
    // were smaller than the buffer plus the header.  Presumably callers
    // validate the Shmem size before getting here -- confirm.
    size_t userSize = shmemSize - mQueueBufferSize - headerSize;
    if (userSize > 0) {
      mUserReservedMemory = mQueue + mQueueBufferSize + headerSize;
      mUserReservedSize = userSize;
    } else {
      mUserReservedMemory = nullptr;
      mUserReservedSize = 0;
    }

    // We use semaphores to wait for data when reading from an empty queue
    // and to wait for free space when writing to a full one.
    MOZ_ASSERT(aMaybeNotEmptySem && aMaybeNotFullSem);
    mMaybeNotEmptySem = aMaybeNotEmptySem;
    mMaybeNotFullSem = aMaybeNotFullSem;

    PCQ_LOGD("Created queue (%p) with size: %zu, alignment: %zu, align1: %zu",
             this, aQueueSize, alignment, align1);
  }

  ~PcqBase() {
    PCQ_LOGD("Destroying queue (%p).", this);
    // NB: We would call the destructors for mRead and mWrite here (but not
    // delete since their memory belongs to the shmem) but the std library's
    // type aliases make this tricky and, by the spec for std::atomic, their
    // destructors are trivial (i.e. no-ops) anyway.
  }

  // Current read index, loaded without any ordering guarantees.
  size_t GetReadRelaxed() { return mRead->load(std::memory_order_relaxed); }

  // Current write index, loaded without any ordering guarantees.
  size_t GetWriteRelaxed() { return mWrite->load(std::memory_order_relaxed); }

  /**
   * The QueueSize is the number of bytes the queue can hold.  The queue is
   * backed by a buffer that is one byte larger than this, meaning that one
   * byte of the buffer is always wasted.
   * This is usually the right method to use when testing queue capacity.
   */
  size_t QueueSize() { return QueueBufferSize() - 1; }

  /**
   * The QueueBufferSize is the number of bytes in the buffer that the queue
   * uses for storage.
   * This is usually the right method to use when calculating read/write head
   * positions.
   */
  size_t QueueBufferSize() { return mQueueBufferSize; }

  // PID of process on the other end.  Both ends may run on the same process.
  base::ProcessId mOtherPid;

  // Start of the queue contents (the start of the Shmem) and the size of the
  // backing buffer in bytes.
  uint8_t* mQueue;
  size_t mQueueBufferSize;

  // Pointer to memory reserved for use by the user, or null if none
  uint8_t* mUserReservedMemory;
  size_t mUserReservedSize;

  // These std::atomics are in shared memory so DO NOT DELETE THEM!  Their
  // destructors are not called either (see ~PcqBase for the reasoning).
  std::atomic_size_t* mRead;
  std::atomic_size_t* mWrite;

  // The Shmem contents are laid out like this:
  // -----------------------------------------------------------------------
  // queue contents | align1 | mRead | align2 | mWrite | align3 | User Data
  // -----------------------------------------------------------------------
  // where align1 is chosen so that mRead is properly aligned for a
  // std_atomic_size_t and is on a cache line separate from the queue contents
  // align2 and align3 is chosen to separate mRead/mWrite and mWrite/User Data
  // similarly.
  Shmem mShmem;

  // Two semaphores that are signaled when the queue goes from a state
  // where it definitely is empty/full to a state where it "may not be".
  // Therefore, we can wait on them and know that we will be awakened if
  // there may be work to do.
  // Our use of these semaphores leans heavily on the assumption that
  // the queue is used by one producer and one consumer.
  RefPtr<PcqRCSemaphore> mMaybeNotEmptySem;
  RefPtr<PcqRCSemaphore> mMaybeNotFullSem;
};
334
335 } // namespace detail
336
337 namespace webgl {
338
339 using mozilla::ipc::Shmem;
340
341 /**
342 * The PcqProducer is the endpoint that inserts elements into the queue. It
343 * should only be used from one thread at a time.
344 */
class PcqProducer : public detail::PcqBase {
 public:
  // Move-only (copying is deleted at the bottom of the class).
  PcqProducer(PcqProducer&& aOther) = default;
  PcqProducer& operator=(PcqProducer&&) = default;
  PcqProducer() = default;  // for IPDL

  /**
   * The number of bytes that the queue can hold.
   */
  size_t Size() { return QueueSize(); }

  /**
   * Attempts to insert aArgs into the queue.  If the operation does not
   * succeed then the queue is unchanged.
   *
   * Never waits.  Returns:
   *  - kFatalError if the shared indices are out of range, or if fewer bytes
   *    were written than the arguments' minimum size estimate,
   *  - kTooSmall if the estimate exceeds the capacity of the queue,
   *  - kNotReady if there is currently not enough free room,
   *  - otherwise the status produced by writing the arguments.
   */
  template <typename... Args>
  QueueStatus TryInsert(Args&&... aArgs) {
    // All writing is done against a local copy of the write index; the shared
    // index is only stored once everything has succeeded.  mRead is loaded
    // with acquire, pairing with the release store in the consumer.
    size_t write = mWrite->load(std::memory_order_relaxed);
    const size_t initWrite = write;
    size_t read = mRead->load(std::memory_order_acquire);

    if (!ValidState(read, write)) {
      PCQ_LOGE(
          "Queue was found in an invalid state. Queue Size: %zu. "
          "Read: %zu. Write: %zu",
          Size(), read, write);
      return QueueStatus::kFatalError;
    }

    // The view is given the address of the local `write`, which is advanced
    // as items are written through it.
    ProducerView view(this, read, &write);

    // Check that the queue has enough unoccupied room for all Args types.
    // This is based on the user's size estimate for args from QueueParamTraits.
    size_t bytesNeeded = detail::MinSizeofArgs(view, &aArgs...);

    if (Size() < bytesNeeded) {
      PCQ_LOGE(
          "Queue is too small for objects. Queue Size: %zu. "
          "Needed: %zu",
          Size(), bytesNeeded);
      return QueueStatus::kTooSmall;
    }

    if (FreeBytes(read, write) < bytesNeeded) {
      PCQ_LOGD(
          "Not enough room to insert. Has: %zu (%zu,%zu). "
          "Needed: %zu",
          FreeBytes(read, write), read, write, bytesNeeded);
      return QueueStatus::kNotReady;
    }

    // Try to insert args in sequence.  Only update the queue if the
    // operation was successful.  We already checked all normal means of
    // failure but we can expect occasional failure here if the user's
    // QueueParamTraits::MinSize method was inexact.
    QueueStatus status = TryInsertHelper(view, aArgs...);
    if (!status) {
      PCQ_LOGD(
          "Failed to insert with error (%d). Has: %zu (%zu,%zu). "
          "Estimate of bytes needed: %zu",
          (int)status, FreeBytes(read, write), read, write, bytesNeeded);
      return status;
    }

    MOZ_ASSERT(ValidState(read, write));

    // Check that at least bytesNeeded were produced.  Failing this means
    // that some QueueParamTraits::MinSize estimated too many bytes.
    bool enoughBytes =
        UsedBytes(read, write) >=
        UsedBytes(read, (initWrite + bytesNeeded) % QueueBufferSize());
    MOZ_ASSERT(enoughBytes);
    if (!enoughBytes) {
      return QueueStatus::kFatalError;
    }

    // Commit the transaction.
    // NB: the byte count logged here is the estimate (bytesNeeded), not the
    // distance the write index actually moved.
    PCQ_LOGD(
        "Successfully inserted. PcqProducer used %zu bytes total. "
        "Write index: %zu -> %zu",
        bytesNeeded, initWrite, write);
    mWrite->store(write, std::memory_order_release);

    // Set the semaphore (unless it is already set) to let the consumer know
    // that the queue may not be empty.  We just need to guarantee that it
    // was set (i.e. non-zero) at some time after mWrite was updated.
    // NOTE(review): PcqRCSemaphore::IsAvailable() is a stub in this file (it
    // asserts unreachable and returns false) -- confirm this call is intended.
    if (!mMaybeNotEmptySem->IsAvailable()) {
      mMaybeNotEmptySem->Signal();
    }
    return status;
  }

  /**
   * Attempts to insert aArgs into the queue.  If the operation does not
   * succeed in the time allotted then the queue is unchanged.
   * Passing Nothing as aDuration retries without a time limit.
   */
  template <typename... Args>
  QueueStatus TryWaitInsert(const Maybe<TimeDuration>& aDuration,
                            Args&&... aArgs) {
    return TryWaitInsertImpl(false, aDuration, std::forward<Args>(aArgs)...);
  }

  // Same as TryInsert, but each argument is wrapped in a PcqTypedArg first.
  template <typename... Args>
  QueueStatus TryTypedInsert(Args&&... aArgs) {
    return TryInsert(PcqTypedArg<Args>(aArgs)...);
  }

 protected:
  friend ProducerConsumerQueue;
  friend ProducerView<PcqProducer>;

  // Writes the arguments through aView one at a time, in order, stopping at
  // the first one that does not succeed and returning its status.
  template <typename Arg, typename... Args>
  QueueStatus TryInsertHelper(ProducerView<PcqProducer>& aView, const Arg& aArg,
                              const Args&... aArgs) {
    QueueStatus status = TryInsertItem(aView, aArg);
    return IsSuccess(status) ? TryInsertHelper(aView, aArgs...) : status;
  }

  // End of the recursion: nothing left to write.
  QueueStatus TryInsertHelper(ProducerView<PcqProducer>&) {
    return QueueStatus::kSuccess;
  }

  // Writes one argument using the QueueParamTraits of its type with
  // const/volatile/reference removed.
  template <typename Arg>
  QueueStatus TryInsertItem(ProducerView<PcqProducer>& aView, const Arg& aArg) {
    return QueueParamTraits<typename RemoveCVR<Arg>::Type>::Write(aView, aArg);
  }

  // Implementation of TryWaitInsert.  The first call (aRecursed == false)
  // attempts the insert immediately; every retry (aRecursed == true) first
  // waits on the not-full semaphore for the time that remains.
  template <typename... Args>
  QueueStatus TryWaitInsertImpl(bool aRecursed,
                                const Maybe<TimeDuration>& aDuration,
                                Args&&... aArgs) {
    // Wait up to aDuration for the not-full semaphore to be signaled.
    // If we run out of time then quit.
    TimeStamp start(TimeStamp::Now());
    if (aRecursed && (!mMaybeNotFullSem->Wait(aDuration))) {
      return QueueStatus::kNotReady;
    }

    // Attempt to insert all args.  No waiting is done here.
    QueueStatus status = TryInsert(std::forward<Args>(aArgs)...);

    TimeStamp now;
    if (aRecursed && IsSuccess(status)) {
      // If our local view of the queue is that it is still not full then
      // we know it won't get full without us (we are the only producer).
      // So re-set the not-full semaphore unless it's already set.
      // (We are also the only not-full semaphore decrementer so it can't
      // become 0.)
      if ((!IsFull()) && (!mMaybeNotFullSem->IsAvailable())) {
        mMaybeNotFullSem->Signal();
      }
    } else if ((status == QueueStatus::kNotReady) &&
               (aDuration.isNothing() ||
                ((now = TimeStamp::Now()) - start) < aDuration.value())) {
      // We don't have enough room but still have time, e.g. because
      // the consumer read some data but not enough or because the
      // not-full semaphore gave a false positive.  Either way, retry.
      // With a time limit, the retry only gets the time that is left.
      status =
          aDuration.isNothing()
              ? TryWaitInsertImpl(true, aDuration, std::forward<Args>(aArgs)...)
              : TryWaitInsertImpl(true, Some(aDuration.value() - (now - start)),
                                  std::forward<Args>(aArgs)...);
    }

    return status;
  }

  // Forwards to Marshaller::WriteObject for this queue's buffer.
  template <typename Arg>
  QueueStatus WriteObject(size_t aRead, size_t* aWrite, const Arg& arg,
                          size_t aArgSize) {
    return Marshaller::WriteObject(mQueue, QueueBufferSize(), aRead, aWrite,
                                   arg, aArgSize);
  }

  // Currently, the PCQ requires any parameters expected to need more than
  // 1/16 the total number of bytes in the command queue to use their own
  // SharedMemory.
  bool NeedsSharedMemory(size_t aRequested) {
    return (Size() / 16) < aRequested;
  }

  // Attaches to aShmem (see PcqBase::Set) and resets both shared indices to
  // zero, i.e. the queue starts out empty.
  PcqProducer(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
              RefPtr<detail::PcqRCSemaphore> aMaybeNotEmptySem,
              RefPtr<detail::PcqRCSemaphore> aMaybeNotFullSem)
      : PcqBase(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem,
                aMaybeNotFullSem) {
    // Since they are shared, this initializes mRead/mWrite in the PcqConsumer
    // as well.
    *mRead = 0;
    *mWrite = 0;
  }

  PcqProducer(const PcqProducer&) = delete;
  PcqProducer& operator=(const PcqProducer&) = delete;
};
540
/**
 * The PcqConsumer is the endpoint that reads (peeks at or removes) elements
 * from the queue.
 */
class PcqConsumer : public detail::PcqBase {
 public:
  // Move-only (copying is deleted at the bottom of the class).
  PcqConsumer(PcqConsumer&& aOther) = default;
  PcqConsumer& operator=(PcqConsumer&&) = default;
  PcqConsumer() = default;  // for IPDL

  /**
   * The number of bytes that the queue can hold.
   */
  size_t Size() { return QueueSize(); }

  /**
   * Attempts to copy aArgs in the queue.  The queue remains unchanged.
   */
  template <typename... Args>
  QueueStatus TryPeek(Args&... aArgs) {
    return TryPeekOrRemove<false, Args...>(
        [&](ConsumerView<PcqConsumer>& aView) -> QueueStatus {
          return TryPeekRemoveHelper(aView, &aArgs...);
        });
  }

  // Same as TryPeek, but each argument is wrapped in a PcqTypedArg first.
  // NOTE(review): this wraps aArgs by reference while TryTypedRemove wraps
  // &aArgs, and the wrappers are temporaries passed to a function that takes
  // non-const lvalue references -- confirm this is intended and instantiable.
  template <typename... Args>
  QueueStatus TryTypedPeek(Args&... aArgs) {
    return TryPeek(PcqTypedArg<Args>(aArgs)...);
  }

  /**
   * Attempts to copy and remove aArgs from the queue.  If the operation does
   * not succeed then the queue is unchanged.
   */
  template <typename... Args>
  QueueStatus TryRemove(Args&... aArgs) {
    return TryPeekOrRemove<true, Args...>(
        [&](ConsumerView<PcqConsumer>& aView) -> QueueStatus {
          return TryPeekRemoveHelper(aView, &aArgs...);
        });
  }

  // Same as TryRemove(aArgs...), but each argument's address is wrapped in a
  // PcqTypedArg first.
  template <typename... Args>
  QueueStatus TryTypedRemove(Args&... aArgs) {
    return TryRemove(PcqTypedArg<Args>(&aArgs)...);
  }

  /**
   * Attempts to remove Args from the queue without copying them.  If the
   * operation does not succeed then the queue is unchanged.
   */
  template <typename... Args>
  QueueStatus TryRemove() {
    using seq = std::index_sequence_for<Args...>;
    return TryRemove<Args...>(seq{});
  }

  // Same as TryRemove<Args...>(), with each type wrapped in PcqTypedArg.
  template <typename... Args>
  QueueStatus TryTypedRemove() {
    return TryRemove<PcqTypedArg<Args>...>();
  }

  /**
   * Wait for up to aDuration to get a copy of the requested data.  If the
   * operation does not succeed in the time allotted then the queue is
   * unchanged.  Pass Nothing to wait until peek succeeds.
   */
  template <typename... Args>
  QueueStatus TryWaitPeek(const Maybe<TimeDuration>& aDuration,
                          Args&... aArgs) {
    return TryWaitPeekOrRemove<false>(aDuration, aArgs...);
  }

  /**
   * Wait for up to aDuration to remove the requested data from the queue.
   * Pass Nothing to wait until removal succeeds.
   */
  template <typename... Args>
  QueueStatus TryWaitRemove(const Maybe<TimeDuration>& aDuration,
                            Args&... aArgs) {
    return TryWaitPeekOrRemove<true>(aDuration, aArgs...);
  }

  /**
   * Wait for up to aDuration to remove the requested data.  No copy
   * of the data is returned.  If the operation does not succeed in the
   * time allotted then the queue is unchanged.
   * Pass Nothing to wait until removal succeeds.
   *
   * Unlike TryWaitPeekOrRemoveImpl, this waits on the not-empty semaphore
   * before the very first attempt as well.
   */
  template <typename... Args>
  QueueStatus TryWaitRemove(const Maybe<TimeDuration>& aDuration) {
    // Wait up to aDuration for the not-empty semaphore to be signaled.
    // If we run out of time then quit.
    TimeStamp start(TimeStamp::Now());
    if (!mMaybeNotEmptySem->Wait(aDuration)) {
      return QueueStatus::kNotReady;
    }

    // Attempt to remove all args.  No waiting is done here.
    QueueStatus status = TryRemove<Args...>();

    TimeStamp now;
    if (IsSuccess(status)) {
      // If our local view of the queue is that it is still not empty then
      // we know it won't get empty without us (we are the only consumer).
      // So re-set the not-empty semaphore unless it's already set.
      // (We are also the only not-empty semaphore decrementer so it can't
      // become 0.)
      if ((!IsEmpty()) && (!mMaybeNotEmptySem->IsAvailable())) {
        mMaybeNotEmptySem->Signal();
      }
    } else if ((status == QueueStatus::kNotReady) &&
               (aDuration.isNothing() ||
                ((now = TimeStamp::Now()) - start) < aDuration.value())) {
      // We don't have enough data but still have time, e.g. because
      // the producer wrote some data but not enough or because the
      // not-empty semaphore gave a false positive.  Either way, retry.
      // With a time limit, the retry only gets the time that is left.
      status =
          aDuration.isNothing()
              ? TryWaitRemove<Args...>(aDuration)
              : TryWaitRemove<Args...>(Some(aDuration.value() - (now - start)));
    }

    return status;
  }

 protected:
  friend ProducerConsumerQueue;
  friend ConsumerView<PcqConsumer>;

  // A PeekOrRemoveOperation reads items through the ConsumerView it is given
  // and reports whether that succeeded.
  using PeekOrRemoveOperation =
      std::function<QueueStatus(ConsumerView<PcqConsumer>&)>;

  // Shared implementation of peeking and removing.  Runs aOperation against a
  // view over a local copy of the read index and, only when isRemove is true
  // and everything succeeded, publishes the advanced index.  Never waits.
  // Returns kFatalError if the shared indices are out of range or too few
  // bytes were consumed, kTooSmall/kNotReady if the size checks fail, and
  // otherwise the status of aOperation.
  template <bool isRemove, typename... Args>
  QueueStatus TryPeekOrRemove(const PeekOrRemoveOperation& aOperation) {
    // mWrite is loaded with acquire, pairing with the release store in the
    // producer.  Reading is done against a local copy of the read index.
    size_t write = mWrite->load(std::memory_order_acquire);
    size_t read = mRead->load(std::memory_order_relaxed);
    const size_t initRead = read;

    if (!ValidState(read, write)) {
      PCQ_LOGE(
          "Queue was found in an invalid state. Queue Size: %zu. "
          "Read: %zu. Write: %zu",
          Size(), read, write);
      return QueueStatus::kFatalError;
    }

    // The view is given the address of the local `read`, which is advanced as
    // items are read through it.
    ConsumerView<PcqConsumer> view(this, &read, write);

    // Check that the queue holds enough data for all Args types.
    // This is based on the user's size estimate for Args from QueueParamTraits.
    // NOTE(review): called with only `view` and no explicit template
    // arguments, this can only select the zero-argument MinSizeofArgs(View&)
    // overload, which returns 0.  Presumably Args... was meant to be taken
    // into account -- confirm.
    size_t bytesNeeded = detail::MinSizeofArgs(view);

    if (Size() < bytesNeeded) {
      PCQ_LOGE(
          "Queue is too small for objects. Queue Size: %zu. "
          "Bytes needed: %zu.",
          Size(), bytesNeeded);
      return QueueStatus::kTooSmall;
    }

    if (UsedBytes(read, write) < bytesNeeded) {
      PCQ_LOGD(
          "Not enough data in queue. Has: %zu (%zu,%zu). "
          "Bytes needed: %zu",
          UsedBytes(read, write), read, write, bytesNeeded);
      return QueueStatus::kNotReady;
    }

    // Only update the queue if the operation was successful and we aren't
    // peeking.
    QueueStatus status = aOperation(view);
    if (!status) {
      return status;
    }

    // Check that at least bytesNeeded were consumed.  Failing this means
    // that some QueueParamTraits::MinSize estimated too many bytes.
    bool enoughBytes =
        FreeBytes(read, write) >=
        FreeBytes((initRead + bytesNeeded) % QueueBufferSize(), write);
    MOZ_ASSERT(enoughBytes);
    if (!enoughBytes) {
      return QueueStatus::kFatalError;
    }

    MOZ_ASSERT(ValidState(read, write));

    // NB: the byte count logged here is the estimate (bytesNeeded), not the
    // distance the read index actually moved.
    PCQ_LOGD(
        "Successfully %s. PcqConsumer used %zu bytes total. "
        "Read index: %zu -> %zu",
        isRemove ? "removed" : "peeked", bytesNeeded, initRead, read);

    // Commit the transaction... unless we were just peeking.
    if (isRemove) {
      mRead->store(read, std::memory_order_release);
      // Set the semaphore (unless it is already set) to let the producer know
      // that the queue may not be full.  We just need to guarantee that it
      // was set (i.e. non-zero) at some time after mRead was updated.
      // NOTE(review): PcqRCSemaphore::IsAvailable() is a stub in this file (it
      // asserts unreachable and returns false) -- confirm this call is
      // intended.
      if (!mMaybeNotFullSem->IsAvailable()) {
        mMaybeNotFullSem->Signal();
      }
    }
    return status;
  }

  // Helper that passes nulls for all Args*
  // (a default-constructed std::tuple value-initializes its elements, so
  // every pointer in nullArgs is null).
  template <typename... Args, size_t... Is>
  QueueStatus TryRemove(std::index_sequence<Is...>) {
    std::tuple<Args*...> nullArgs;
    return TryPeekOrRemove<true, Args...>(
        [&](ConsumerView<PcqConsumer>& aView) {
          return TryPeekRemoveHelper(aView, std::get<Is>(nullArgs)...);
        });
  }

  // Entry point shared by TryWaitPeek and TryWaitRemove: starts the retry
  // loop with an immediate first attempt.
  template <bool isRemove, typename... Args>
  QueueStatus TryWaitPeekOrRemove(const Maybe<TimeDuration>& aDuration,
                                  Args&&... aArgs) {
    return TryWaitPeekOrRemoveImpl<isRemove>(false, aDuration,
                                             std::forward<Args>(aArgs)...);
  }

  // The first call (aRecursed == false) attempts the read immediately; every
  // retry (aRecursed == true) first waits on the not-empty semaphore for the
  // time that remains.
  template <bool isRemove, typename... Args>
  QueueStatus TryWaitPeekOrRemoveImpl(bool aRecursed,
                                      const Maybe<TimeDuration>& aDuration,
                                      Args&... aArgs) {
    // Wait up to aDuration for the not-empty semaphore to be signaled.
    // If we run out of time then quit.
    TimeStamp start(TimeStamp::Now());
    if (aRecursed && (!mMaybeNotEmptySem->Wait(aDuration))) {
      return QueueStatus::kNotReady;
    }

    // Attempt to read all args.  No waiting is done here.
    QueueStatus status = isRemove ? TryRemove(aArgs...) : TryPeek(aArgs...);

    TimeStamp now;
    if (aRecursed && IsSuccess(status)) {
      // If our local view of the queue is that it is still not empty then
      // we know it won't get empty without us (we are the only consumer).
      // So re-set the not-empty semaphore unless it's already set.
      // (We are also the only not-empty semaphore decrementer so it can't
      // become 0.)
      if ((!IsEmpty()) && (!mMaybeNotEmptySem->IsAvailable())) {
        mMaybeNotEmptySem->Signal();
      }
    } else if ((status == QueueStatus::kNotReady) &&
               (aDuration.isNothing() ||
                ((now = TimeStamp::Now()) - start) < aDuration.value())) {
      // We don't have enough data but still have time, e.g. because
      // the producer wrote some data but not enough or because the
      // not-empty semaphore gave a false positive.  Either way, retry.
      // With a time limit, the retry only gets the time that is left.
      status =
          aDuration.isNothing()
              ? TryWaitPeekOrRemoveImpl<isRemove>(true, aDuration, aArgs...)
              : TryWaitPeekOrRemoveImpl<isRemove>(
                    true, Some(aDuration.value() - (now - start)), aArgs...);
    }

    return status;
  }

  // Version of the helper for copying values out of the queue.
  // NOTE(review): this overload is only declared in the visible source, and
  // the recursive call below uses explicit template arguments, which a
  // non-template overload cannot satisfy.  Confirm that a definition exists
  // or that this overload is never the one selected.
  template <typename... Args>
  QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>& aView,
                                  Args*... aArgs);

  // Reads the items through aView one at a time, in order, stopping at the
  // first one that does not succeed and returning its status.
  template <typename Arg, typename... Args>
  QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>& aView, Arg* aArg,
                                  Args*... aArgs) {
    QueueStatus status = TryCopyOrSkipItem<Arg>(aView, aArg);
    return IsSuccess(status) ? TryPeekRemoveHelper<Args...>(aView, aArgs...)
                             : status;
  }

  // Nothing to read.
  QueueStatus TryPeekRemoveHelper(ConsumerView<PcqConsumer>&) {
    return QueueStatus::kSuccess;
  }

  // If an item is available then it is copied into aArg.  The item is skipped
  // over if aArg is null.
  // Reading goes through the QueueParamTraits of Arg with
  // const/volatile/reference removed; const/volatile is cast away from aArg.
  template <typename Arg>
  QueueStatus TryCopyOrSkipItem(ConsumerView<PcqConsumer>& aView, Arg* aArg) {
    return QueueParamTraits<typename RemoveCVR<Arg>::Type>::Read(
        aView, const_cast<std::remove_cv_t<Arg>*>(aArg));
  }

  // Forwards to Marshaller::ReadObject for this queue's buffer.
  template <typename Arg>
  QueueStatus ReadObject(size_t* aRead, size_t aWrite, Arg* arg,
                         size_t aArgSize) {
    return Marshaller::ReadObject(mQueue, QueueBufferSize(), aRead, aWrite, arg,
                                  aArgSize);
  }

  // Currently, the PCQ requires any parameters expected to need more than
  // 1/16 the total number of bytes in the command queue to use their own
  // SharedMemory.
  bool NeedsSharedMemory(size_t aRequested) {
    return (Size() / 16) < aRequested;
  }

  // Attaches to aShmem (see PcqBase::Set).  Unlike the PcqProducer
  // constructor, this does not reset the shared read/write indices.
  PcqConsumer(Shmem& aShmem, base::ProcessId aOtherPid, size_t aQueueSize,
              RefPtr<detail::PcqRCSemaphore> aMaybeNotEmptySem,
              RefPtr<detail::PcqRCSemaphore> aMaybeNotFullSem)
      : PcqBase(aShmem, aOtherPid, aQueueSize, aMaybeNotEmptySem,
                aMaybeNotFullSem) {}

  PcqConsumer(const PcqConsumer&) = delete;
  PcqConsumer& operator=(const PcqConsumer&) = delete;
};
850
851 using mozilla::detail::GetCacheLineSize;
852 using mozilla::detail::GetMaxHeaderSize;
853
854 /**
855 * A single producer + single consumer queue, implemented as a
856 * circular queue. The object is backed with a Shmem, which allows
857 * it to be used across processes.
858 *
859 * This is a single-producer/single-consumer queue. Another way of saying that
860 * is to say that the PcqProducer and PcqConsumer objects are not thread-safe.
861 */
862 class ProducerConsumerQueue {
863 public:
864 /**
865 * Create a queue whose endpoints are the same as those of aProtocol.
866 * In choosing a queueSize, be aware that both the queue and the Shmem will
867 * allocate additional shared memory for internal accounting (see
868 * GetMaxHeaderSize) and that Shmem sizes are a multiple of the operating
869 * system's page sizes.
870 *
871 * aAdditionalBytes of shared memory will also be allocated.
872 * Clients may use this shared memory for their own purposes.
873 * See GetUserReservedMemory() and GetUserReservedMemorySize()
874 */
875 static UniquePtr<ProducerConsumerQueue> Create(
876 mozilla::ipc::IProtocol* aProtocol, size_t aQueueSize,
877 size_t aAdditionalBytes = 0) {
878 MOZ_ASSERT(aProtocol);
879 Shmem shmem;
880
881 // NB: We need one extra byte for the queue contents (hence the "+1").
882 uint32_t totalShmemSize =
883 aQueueSize + 1 + GetMaxHeaderSize() + aAdditionalBytes;
884
885 if (!aProtocol->AllocUnsafeShmem(
886 totalShmemSize, mozilla::ipc::SharedMemory::TYPE_BASIC, &shmem)) {
887 return nullptr;
888 }
889
890 UniquePtr<ProducerConsumerQueue> ret =
891 Create(shmem, aProtocol->OtherPid(), aQueueSize);
892 if (!ret) {
893 return ret;
894 }
895
896 // The system may have reserved more bytes than the user asked for.
897 // Make sure they aren't given access to the extra.
898 MOZ_ASSERT(ret->mProducer->mUserReservedSize >= aAdditionalBytes);
899 ret->mProducer->mUserReservedSize = aAdditionalBytes;
900 ret->mConsumer->mUserReservedSize = aAdditionalBytes;
901 if (aAdditionalBytes == 0) {
902 ret->mProducer->mUserReservedMemory = nullptr;
903 ret->mConsumer->mUserReservedMemory = nullptr;
904 }
905 return ret;
906 }
907
908 /**
909 * Create a queue that is backed by aShmem, which must be:
910 * (1) unsafe
911 * (2) made for use with aOtherPid (may be this process' PID)
912 * (3) large enough to hold the queue contents and the shared meta-data of
913 * the queue (see GetMaxHeaderSize). Any room left over will be available
914 * as user reserved memory.
915 * See GetUserReservedMemory() and GetUserReservedMemorySize()
916 */
917 static UniquePtr<ProducerConsumerQueue> Create(Shmem& aShmem,
918 base::ProcessId aOtherPid,
919 size_t aQueueSize) {
920 uint32_t totalShmemSize = aShmem.Size<uint8_t>();
921
922 // NB: We need one extra byte for the queue contents (hence the "+1").
923 if ((!aShmem.IsWritable()) || (!aShmem.IsReadable()) ||
924 ((GetMaxHeaderSize() + aQueueSize + 1) > totalShmemSize)) {
925 return nullptr;
926 }
927
928 auto notempty = MakeRefPtr<detail::PcqRCSemaphore>(
929 CrossProcessSemaphore::Create("webgl-notempty", 0));
930 auto notfull = MakeRefPtr<detail::PcqRCSemaphore>(
931 CrossProcessSemaphore::Create("webgl-notfull", 1));
932 return WrapUnique(new ProducerConsumerQueue(aShmem, aOtherPid, aQueueSize,
933 notempty, notfull));
934 }
935
936 /**
937 * The queue needs a few bytes for 2 shared counters. It takes these from the
938 * underlying Shmem. This will still work if the cache line size is incorrect
939 * for some architecture but operations may be less efficient.
940 */
941 static constexpr size_t GetMaxHeaderSize() {
942 return mozilla::detail::GetMaxHeaderSize();
943 }
944
945 /**
946 * Cache line size for the machine. We assume a 64-byte cache line size.
947 */
948 static constexpr size_t GetCacheLineSize() {
949 return mozilla::detail::GetCacheLineSize();
950 }
951
952 using Producer = PcqProducer;
953 using Consumer = PcqConsumer;
954
955 UniquePtr<Producer> TakeProducer() { return std::move(mProducer); }
956 UniquePtr<Consumer> TakeConsumer() { return std::move(mConsumer); }
957
958 private:
959 ProducerConsumerQueue(Shmem& aShmem, base::ProcessId aOtherPid,
960 size_t aQueueSize,
961 RefPtr<detail::PcqRCSemaphore>& aMaybeNotEmptySem,
962 RefPtr<detail::PcqRCSemaphore>& aMaybeNotFullSem)
963 : mProducer(
964 WrapUnique(new Producer(aShmem, aOtherPid, aQueueSize,
965 aMaybeNotEmptySem, aMaybeNotFullSem))),
966 mConsumer(
967 WrapUnique(new Consumer(aShmem, aOtherPid, aQueueSize,
968 aMaybeNotEmptySem, aMaybeNotFullSem))) {
969 PCQ_LOGD(
970 "Constructed PCQ (%p). Shmem Size = %zu. Queue Size = %zu. "
971 "Other process ID: %08x.",
972 this, aShmem.Size<uint8_t>(), aQueueSize, (uint32_t)aOtherPid);
973 }
974
975 UniquePtr<Producer> mProducer;
976 UniquePtr<Consumer> mConsumer;
977 };
978
979 } // namespace webgl
980
981 namespace ipc {
982
983 template <>
984 struct IPDLParamTraits<mozilla::detail::PcqBase> {
985 typedef mozilla::detail::PcqBase paramType;
986
987 static void Write(IPC::Message* aMsg, IProtocol* aActor, paramType& aParam) {
988 WriteIPDLParam(aMsg, aActor, aParam.QueueSize());
989 WriteIPDLParam(aMsg, aActor, std::move(aParam.mShmem));
990
991 // May not currently share a PcqProducer or PcqConsumer with a process that
992 // it's Shmem is not related to.
993 MOZ_ASSERT(aActor->OtherPid() == aParam.mOtherPid);
994 WriteIPDLParam(
995 aMsg, aActor,
996 aParam.mMaybeNotEmptySem->ShareToProcess(aActor->OtherPid()));
997
998 WriteIPDLParam(aMsg, aActor,
999 aParam.mMaybeNotFullSem->ShareToProcess(aActor->OtherPid()));
1000 }
1001
1002 static bool Read(const IPC::Message* aMsg, PickleIterator* aIter,
1003 IProtocol* aActor, paramType* aResult) {
1004 size_t queueSize;
1005 Shmem shmem;
1006 CrossProcessSemaphoreHandle notEmptyHandle;
1007 CrossProcessSemaphoreHandle notFullHandle;
1008
1009 if (!ReadIPDLParam(aMsg, aIter, aActor, &queueSize) ||
1010 !ReadIPDLParam(aMsg, aIter, aActor, &shmem) ||
1011 !ReadIPDLParam(aMsg, aIter, aActor, ¬EmptyHandle) ||
1012 !ReadIPDLParam(aMsg, aIter, aActor, ¬FullHandle)) {
1013 return false;
1014 }
1015
1016 MOZ_ASSERT(IsHandleValid(notEmptyHandle) && IsHandleValid(notFullHandle));
1017 aResult->Set(shmem, aActor->OtherPid(), queueSize,
1018 MakeRefPtr<detail::PcqRCSemaphore>(
1019 CrossProcessSemaphore::Create(notEmptyHandle)),
1020 MakeRefPtr<detail::PcqRCSemaphore>(
1021 CrossProcessSemaphore::Create(notFullHandle)));
1022 return true;
1023 }
1024
1025 static void Log(const paramType& aParam, std::wstring* aLog) {
1026 IPDLParamTraits<Shmem>::Log(aParam.mShmem, aLog);
1027 }
1028 };
1029
1030 template <>
1031 struct IPDLParamTraits<mozilla::webgl::PcqProducer>
1032 : public IPDLParamTraits<mozilla::detail::PcqBase> {
1033 typedef mozilla::webgl::PcqProducer paramType;
1034 };
1035
1036 template <>
1037 struct IPDLParamTraits<mozilla::webgl::PcqConsumer>
1038 : public IPDLParamTraits<mozilla::detail::PcqBase> {
1039 typedef mozilla::webgl::PcqConsumer paramType;
1040 };
1041
1042 } // namespace ipc
1043 } // namespace mozilla
1044
1045 #endif // mozilla_ipc_ProducerConsumerQueue_h
1046