1 // 2 // Copyright 2016 Pixar 3 // 4 // Licensed under the Apache License, Version 2.0 (the "Apache License") 5 // with the following modification; you may not use this file except in 6 // compliance with the Apache License and the following modification to it: 7 // Section 6. Trademarks. is deleted and replaced with: 8 // 9 // 6. Trademarks. This License does not grant permission to use the trade 10 // names, trademarks, service marks, or product names of the Licensor 11 // and its affiliates, except as required to comply with Section 4(c) of 12 // the License and to reproduce the content of the NOTICE file. 13 // 14 // You may obtain a copy of the Apache License at 15 // 16 // http://www.apache.org/licenses/LICENSE-2.0 17 // 18 // Unless required by applicable law or agreed to in writing, software 19 // distributed under the Apache License with the above modification is 20 // distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 21 // KIND, either express or implied. See the Apache License for the specific 22 // language governing permissions and limitations under the Apache License. 23 // 24 #ifndef PXR_BASE_WORK_SINGULAR_TASK_H 25 #define PXR_BASE_WORK_SINGULAR_TASK_H 26 27 /// \file work/singularTask.h 28 29 #include "pxr/pxr.h" 30 31 #include <atomic> 32 #include <functional> 33 #include <type_traits> 34 35 PXR_NAMESPACE_OPEN_SCOPE 36 37 class WorkDispatcher; 38 39 /// \class WorkSingularTask 40 /// 41 /// A WorkSingularTask runs a task in a WorkDispatcher, but never concurrently 42 /// with itself. That is, the function provided to the WorkSingularTask runs 43 /// concurrently with other tasks in the WorkDispatcher, but never with another 44 /// invocation of itself. 45 /// 46 /// This is useful if there is single-threaded work to do that can be overlapped 47 /// with other parallel tasks in a dispatcher. For example, a 48 /// multiple-producer, single-consumer problem can be tackled this way. Run the 49 /// producer tasks as usual in a WorkDispatcher and create a WorkSingularTask 50 /// for the consumer. When a producer task has generated a result to consume, 51 /// it invokes Wake() on the consumer task. This ensures that the consumer runs 52 /// only when there are results to consume, and it lets the consumer operate 53 /// single-threaded. For example, the consumer could populate stl containers 54 /// without locking. 55 /// 56 class WorkSingularTask 57 { 58 public: 59 60 WorkSingularTask(WorkSingularTask const &) = delete; 61 WorkSingularTask &operator=(WorkSingularTask const &) = delete; 62 63 #ifdef doxygen 64 65 /// Create a singular task to be run in \p dispatcher. Callers must ensure 66 /// that \p dispatcher lives at least as long as this WorkSingularTask. 67 /// 68 /// A singular task is one that will not run concurrently with itself. See 69 /// the WorkSingularTask doc for more details. 70 /// 71 /// After constructing a WorkSingularTask, call Wake() to ensure that the 72 /// task runs at least once. 73 template <class Callable, class A1, class A2, ... class AN> 74 WorkSingularTask(WorkDispatcher &dispatcher, 75 Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN); 76 77 #else // doxygen 78 79 template <class Callable, class... Args> WorkSingularTask(WorkDispatcher & d,Callable && c,Args &&...args)80 WorkSingularTask(WorkDispatcher &d, Callable &&c, Args&&... args) 81 : _waker(_MakeWaker(d, std::bind(std::forward<Callable>(c), 82 std::forward<Args>(args)...))) 83 , _count(0) {} 84 85 #endif // doxygen 86 87 /// Ensure that this task runs at least once after this call. The task is 88 /// not guaranteed to run as many times as Wake() is invoked, only that it 89 /// run at least once after a call to Wake(). Wake()90 inline void Wake() { 91 if (++_count == 1) 92 _waker(_count); 93 } 94 95 private: 96 template <class Dispatcher, class Fn> 97 struct _Waker { _Waker_Waker98 explicit _Waker(Dispatcher &d, Fn &&fn) 99 : _dispatcher(d), _fn(std::move(fn)) {} 100 operator_Waker101 void operator()(std::atomic_size_t &count) const { 102 _dispatcher.Run( 103 [this, &count]() { 104 // We read the current refCount into oldCount, then we 105 // invoke the task function. Finally we try to CAS the 106 // refCount to zero. If we fail, it means some other 107 // clients have invoked Wake() in the meantime. In that 108 // case we go again to ensure the task can do whatever it 109 // was awakened to do. Once we successfully take the count 110 // to zero, we stop. 111 size_t old = count; 112 do { _fn(); } while ( 113 !count.compare_exchange_strong(old, 0)); 114 }); 115 } 116 Dispatcher &_dispatcher; 117 Fn _fn; 118 }; 119 120 template <class Dispatcher, class Fn> 121 static std::function<void (std::atomic_size_t &)> _MakeWaker(Dispatcher & d,Fn && fn)122 _MakeWaker(Dispatcher &d, Fn &&fn) { 123 return std::function<void (std::atomic_size_t &)>( 124 _Waker<Dispatcher, typename std::decay<Fn>::type>( 125 d, std::forward<Fn>(fn))); 126 } 127 128 std::function<void (std::atomic_size_t &)> _waker; 129 std::atomic_size_t _count; 130 }; 131 132 PXR_NAMESPACE_CLOSE_SCOPE 133 134 #endif // PXR_BASE_WORK_SINGULAR_TASK_H 135