1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__aggregator_impl_H 18 #define __TBB__aggregator_impl_H 19 20 #include "../atomic.h" 21 #if !__TBBMALLOC_BUILD 22 #include "../tbb_profiling.h" 23 #endif 24 25 namespace tbb { 26 namespace interface6 { 27 namespace internal { 28 29 using namespace tbb::internal; 30 31 //! aggregated_operation base class 32 template <typename Derived> 33 class aggregated_operation { 34 public: 35 //! Zero value means "wait" status, all other values are "user" specified values and are defined into the scope of a class which uses "status". 36 uintptr_t status; 37 38 Derived *next; aggregated_operation()39 aggregated_operation() : status(0), next(NULL) {} 40 }; 41 42 //! Aggregator base class 43 /** An aggregator for collecting operations coming from multiple sources and executing 44 them serially on a single thread. operation_type must be derived from 45 aggregated_operation. The parameter handler_type is a functor that will be passed the 46 list of operations and is expected to handle each operation appropriately, setting the 47 status of each operation to non-zero.*/ 48 template < typename operation_type > 49 class aggregator_generic { 50 public: aggregator_generic()51 aggregator_generic() : handler_busy(false) { pending_operations = NULL; } 52 53 //! Execute an operation 54 /** Places an operation into the waitlist (pending_operations), and either handles the list, 55 or waits for the operation to complete, or returns. 56 The long_life_time parameter specifies the life time of the given operation object. 57 Operations with long_life_time == true may be accessed after execution. 58 A "short" life time operation (long_life_time == false) can be destroyed 59 during execution, and so any access to it after it was put into the waitlist, 60 including status check, is invalid. As a consequence, waiting for completion 61 of such operation causes undefined behavior. 62 */ 63 template < typename handler_type > 64 void execute(operation_type *op, handler_type &handle_operations, bool long_life_time = true) { 65 operation_type *res; 66 // op->status should be read before inserting the operation into the 67 // aggregator waitlist since it can become invalid after executing a 68 // handler (if the operation has 'short' life time.) 69 const uintptr_t status = op->status; 70 71 // ITT note: &(op->status) tag is used to cover accesses to this op node. This 72 // thread has created the operation, and now releases it so that the handler 73 // thread may handle the associated operation w/o triggering a race condition; 74 // thus this tag will be acquired just before the operation is handled in the 75 // handle_operations functor. 76 call_itt_notify(releasing, &(op->status)); 77 // insert the operation in the queue. 78 do { 79 // Tools may flag the following line as a race; it is a false positive: 80 // This is an atomic read; we don't provide itt_hide_load_word for atomics 81 op->next = res = pending_operations; // NOT A RACE 82 } while (pending_operations.compare_and_swap(op, res) != res); 83 if (!res) { // first in the list; handle the operations. 84 // ITT note: &pending_operations tag covers access to the handler_busy flag, 85 // which this waiting handler thread will try to set before entering 86 // handle_operations. 87 call_itt_notify(acquired, &pending_operations); 88 start_handle_operations(handle_operations); 89 // The operation with 'short' life time can already be destroyed. 90 if (long_life_time) 91 __TBB_ASSERT(op->status, NULL); 92 } 93 // not first; wait for op to be ready. 94 else if (!status) { // operation is blocking here. 95 __TBB_ASSERT(long_life_time, "Waiting for an operation object that might be destroyed during processing."); 96 call_itt_notify(prepare, &(op->status)); 97 spin_wait_while_eq(op->status, uintptr_t(0)); 98 itt_load_word_with_acquire(op->status); 99 } 100 } 101 102 private: 103 //! An atomically updated list (aka mailbox) of pending operations 104 atomic<operation_type *> pending_operations; 105 //! Controls thread access to handle_operations 106 uintptr_t handler_busy; 107 108 //! Trigger the handling of operations when the handler is free 109 template < typename handler_type > start_handle_operations(handler_type & handle_operations)110 void start_handle_operations( handler_type &handle_operations ) { 111 operation_type *op_list; 112 113 // ITT note: &handler_busy tag covers access to pending_operations as it is passed 114 // between active and waiting handlers. Below, the waiting handler waits until 115 // the active handler releases, and the waiting handler acquires &handler_busy as 116 // it becomes the active_handler. The release point is at the end of this 117 // function, when all operations in pending_operations have been handled by the 118 // owner of this aggregator. 119 call_itt_notify(prepare, &handler_busy); 120 // get the handler_busy: 121 // only one thread can possibly spin here at a time 122 spin_wait_until_eq(handler_busy, uintptr_t(0)); 123 call_itt_notify(acquired, &handler_busy); 124 // acquire fence not necessary here due to causality rule and surrounding atomics 125 __TBB_store_with_release(handler_busy, uintptr_t(1)); 126 127 // ITT note: &pending_operations tag covers access to the handler_busy flag 128 // itself. Capturing the state of the pending_operations signifies that 129 // handler_busy has been set and a new active handler will now process that list's 130 // operations. 131 call_itt_notify(releasing, &pending_operations); 132 // grab pending_operations 133 op_list = pending_operations.fetch_and_store(NULL); 134 135 // handle all the operations 136 handle_operations(op_list); 137 138 // release the handler 139 itt_store_word_with_release(handler_busy, uintptr_t(0)); 140 } 141 }; 142 143 template < typename handler_type, typename operation_type > 144 class aggregator : public aggregator_generic<operation_type> { 145 handler_type handle_operations; 146 public: aggregator()147 aggregator() {} aggregator(handler_type h)148 explicit aggregator(handler_type h) : handle_operations(h) {} 149 initialize_handler(handler_type h)150 void initialize_handler(handler_type h) { handle_operations = h; } 151 execute(operation_type * op)152 void execute(operation_type *op) { 153 aggregator_generic<operation_type>::execute(op, handle_operations); 154 } 155 }; 156 157 // the most-compatible friend declaration (vs, gcc, icc) is 158 // template<class U, class V> friend class aggregating_functor; 159 template<typename aggregating_class, typename operation_list> 160 class aggregating_functor { 161 aggregating_class *fi; 162 public: aggregating_functor()163 aggregating_functor() : fi() {} aggregating_functor(aggregating_class * fi_)164 aggregating_functor(aggregating_class *fi_) : fi(fi_) {} operator()165 void operator()(operation_list* op_list) { fi->handle_operations(op_list); } 166 }; 167 168 } // namespace internal 169 } // namespace interface6 170 171 namespace internal { 172 using interface6::internal::aggregated_operation; 173 using interface6::internal::aggregator_generic; 174 using interface6::internal::aggregator; 175 using interface6::internal::aggregating_functor; 176 } // namespace internal 177 178 } // namespace tbb 179 180 #endif // __TBB__aggregator_impl_H 181