1 /* 2 ** Copyright 2018 Bloomberg Finance L.P. 3 ** 4 ** Licensed under the Apache License, Version 2.0 (the "License"); 5 ** you may not use this file except in compliance with the License. 6 ** You may obtain a copy of the License at 7 ** 8 ** http://www.apache.org/licenses/LICENSE-2.0 9 ** 10 ** Unless required by applicable law or agreed to in writing, software 11 ** distributed under the License is distributed on an "AS IS" BASIS, 12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 ** See the License for the specific language governing permissions and 14 ** limitations under the License. 15 */ 16 #ifndef BLOOMBERG_QUANTUM_TASK_QUEUE_H 17 #define BLOOMBERG_QUANTUM_TASK_QUEUE_H 18 19 #include <quantum/interface/quantum_itask_continuation.h> 20 #include <quantum/interface/quantum_iterminate.h> 21 #include <quantum/interface/quantum_iqueue.h> 22 #include <quantum/quantum_spinlock.h> 23 #include <quantum/quantum_task.h> 24 #include <quantum/quantum_yielding_thread.h> 25 #include <quantum/quantum_queue_statistics.h> 26 #include <quantum/quantum_configuration.h> 27 #include <list> 28 #include <atomic> 29 #include <functional> 30 #include <algorithm> 31 #include <mutex> 32 #include <condition_variable> 33 #include <thread> 34 #include <pthread.h> 35 #include <iostream> 36 37 namespace Bloomberg { 38 namespace quantum { 39 40 //============================================================================================== 41 // class TaskQueue 42 //============================================================================================== 43 /// @class TaskQueue. 44 /// @brief Thread queue for running coroutines. 45 /// @note For internal use only. 46 class TaskQueue : public IQueue 47 { 48 public: 49 using TaskList = std::list<Task::Ptr, ContiguousPoolManager<ITask::Ptr>>; 50 using TaskListIter = TaskList::iterator; 51 52 TaskQueue(); 53 54 TaskQueue(const Configuration& config, 55 std::shared_ptr<TaskQueue> sharedQueue); 56 57 TaskQueue(const TaskQueue& other); 58 59 TaskQueue(TaskQueue&& other) = default; 60 61 ~TaskQueue(); 62 63 void pinToCore(int coreId) final; 64 65 void run() final; 66 67 void enqueue(ITask::Ptr task) final; 68 69 bool tryEnqueue(ITask::Ptr task) final; 70 71 ITask::Ptr dequeue(std::atomic_bool& hint) final; 72 73 ITask::Ptr tryDequeue(std::atomic_bool& hint) final; 74 75 size_t size() const final; 76 77 bool empty() const final; 78 79 void terminate() final; 80 81 IQueueStatistics& stats() final; 82 83 SpinLock& getLock() final; 84 85 void signalEmptyCondition(bool value) final; 86 87 bool isIdle() const final; 88 89 const std::shared_ptr<std::thread>& getThread() const final; 90 91 private: 92 struct WorkItem 93 { 94 WorkItem(TaskPtr task, 95 TaskListIter iter, 96 bool isBlocked, 97 unsigned int blockedQueueRound); 98 99 TaskPtr _task; // task pointer 100 TaskListIter _iter; // task iterator 101 bool _isBlocked; // true if the entire queue is blocked 102 unsigned int _blockedQueueRound; // blocked queue round id 103 }; 104 struct ProcessTaskResult 105 { 106 ProcessTaskResult(bool isBlocked, 107 unsigned int blockedQueueRound); 108 109 bool _isBlocked; // true if the entire queue is blocked 110 unsigned int _blockedQueueRound; // blocked queue round id 111 }; 112 //Coroutine result handlers 113 bool handleNotCallable(const WorkItem& entry); 114 bool handleAlreadyResumed(WorkItem& entry); 115 bool handleRunning(WorkItem& entry); 116 bool handleSuccess(const WorkItem& entry); 117 bool handleBlocked(WorkItem& entry); 118 bool handleSleeping(WorkItem& entry); 119 bool handleError(const WorkItem& entry); 120 bool handleException(const WorkItem& workItem, 121 const std::exception* ex = nullptr); 122 123 void onBlockedTask(WorkItem& entry); 124 void onActiveTask(WorkItem& entry); 125 126 bool isInterrupted(); 127 void signalSharedQueueEmptyCondition(bool value); 128 ProcessTaskResult processTask(); 129 WorkItem grabWorkItem(); 130 void doEnqueue(ITask::Ptr task); 131 ITask::Ptr doDequeue(std::atomic_bool& hint, 132 TaskListIter iter); 133 void acquireWaiting(); 134 void sleepOnBlockedQueue(const ProcessTaskResult& mainQueueResult); 135 void sleepOnBlockedQueue(const ProcessTaskResult& mainQueueResult, 136 const ProcessTaskResult& sharedQueueResult); 137 138 139 QueueListAllocator _alloc; 140 std::shared_ptr<std::thread> _thread; 141 TaskList _runQueue; 142 TaskList _waitQueue; 143 TaskListIter _queueIt; 144 TaskListIter _blockedIt; 145 bool _isBlocked; 146 mutable SpinLock _runQueueLock; 147 mutable SpinLock _waitQueueLock; 148 std::mutex _notEmptyMutex; //for accessing the condition variable 149 std::condition_variable _notEmptyCond; 150 std::atomic_bool _isEmpty; 151 std::atomic_bool _isSharedQueueEmpty; 152 std::atomic_bool _isInterrupted; 153 std::atomic_bool _isIdle; 154 std::atomic_bool _terminated; 155 bool _isAdvanced; 156 QueueStatistics _stats; 157 std::shared_ptr<TaskQueue> _sharedQueue; 158 std::vector<TaskQueue*> _helpers; 159 unsigned int _queueRound; 160 unsigned int _lastSleptQueueRound; 161 unsigned int _lastSleptSharedQueueRound; 162 }; 163 164 }} 165 166 #include <quantum/impl/quantum_task_queue_impl.h> 167 168 #endif //BLOOMBERG_QUANTUM_TASK_QUEUE_H 169