1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 **     http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_TASK_QUEUE_H
17 #define BLOOMBERG_QUANTUM_TASK_QUEUE_H
18 
19 #include <quantum/interface/quantum_itask_continuation.h>
20 #include <quantum/interface/quantum_iterminate.h>
21 #include <quantum/interface/quantum_iqueue.h>
22 #include <quantum/quantum_spinlock.h>
23 #include <quantum/quantum_task.h>
24 #include <quantum/quantum_yielding_thread.h>
25 #include <quantum/quantum_queue_statistics.h>
26 #include <quantum/quantum_configuration.h>
27 #include <list>
28 #include <atomic>
29 #include <functional>
30 #include <algorithm>
31 #include <mutex>
32 #include <condition_variable>
33 #include <thread>
34 #include <pthread.h>
35 #include <iostream>
36 
37 namespace Bloomberg {
38 namespace quantum {
39 
40 //==============================================================================================
41 //                                 class TaskQueue
42 //==============================================================================================
43 /// @class TaskQueue.
44 /// @brief Thread queue for running coroutines.
45 /// @note For internal use only.
46 class TaskQueue : public IQueue
47 {
48 public:
49     using TaskList = std::list<Task::Ptr, ContiguousPoolManager<ITask::Ptr>>;
50     using TaskListIter = TaskList::iterator;
51 
52     TaskQueue();
53 
54     TaskQueue(const Configuration& config,
55               std::shared_ptr<TaskQueue> sharedQueue);
56 
57     TaskQueue(const TaskQueue& other);
58 
59     TaskQueue(TaskQueue&& other) = default;
60 
61     ~TaskQueue();
62 
63     void pinToCore(int coreId) final;
64 
65     void run() final;
66 
67     void enqueue(ITask::Ptr task) final;
68 
69     bool tryEnqueue(ITask::Ptr task) final;
70 
71     ITask::Ptr dequeue(std::atomic_bool& hint) final;
72 
73     ITask::Ptr tryDequeue(std::atomic_bool& hint) final;
74 
75     size_t size() const final;
76 
77     bool empty() const final;
78 
79     void terminate() final;
80 
81     IQueueStatistics& stats() final;
82 
83     SpinLock& getLock() final;
84 
85     void signalEmptyCondition(bool value) final;
86 
87     bool isIdle() const final;
88 
89     const std::shared_ptr<std::thread>& getThread() const final;
90 
91 private:
92     struct WorkItem
93     {
94         WorkItem(TaskPtr task,
95                  TaskListIter iter,
96                  bool isBlocked,
97                  unsigned int blockedQueueRound);
98 
99         TaskPtr         _task;              // task pointer
100         TaskListIter    _iter;              // task iterator
101         bool            _isBlocked;         // true if the entire queue is blocked
102         unsigned int    _blockedQueueRound; // blocked queue round id
103     };
104     struct ProcessTaskResult
105     {
106         ProcessTaskResult(bool isBlocked,
107                           unsigned int blockedQueueRound);
108 
109         bool            _isBlocked;         // true if the entire queue is blocked
110         unsigned int    _blockedQueueRound; // blocked queue round id
111     };
112     //Coroutine result handlers
113     bool handleNotCallable(const WorkItem& entry);
114     bool handleAlreadyResumed(WorkItem& entry);
115     bool handleRunning(WorkItem& entry);
116     bool handleSuccess(const WorkItem& entry);
117     bool handleBlocked(WorkItem& entry);
118     bool handleSleeping(WorkItem& entry);
119     bool handleError(const WorkItem& entry);
120     bool handleException(const WorkItem& workItem,
121                          const std::exception* ex = nullptr);
122 
123     void onBlockedTask(WorkItem& entry);
124     void onActiveTask(WorkItem& entry);
125 
126     bool isInterrupted();
127     void signalSharedQueueEmptyCondition(bool value);
128     ProcessTaskResult processTask();
129     WorkItem grabWorkItem();
130     void doEnqueue(ITask::Ptr task);
131     ITask::Ptr doDequeue(std::atomic_bool& hint,
132                          TaskListIter iter);
133     void acquireWaiting();
134     void sleepOnBlockedQueue(const ProcessTaskResult& mainQueueResult);
135     void sleepOnBlockedQueue(const ProcessTaskResult& mainQueueResult,
136                              const ProcessTaskResult& sharedQueueResult);
137 
138 
139     QueueListAllocator                  _alloc;
140     std::shared_ptr<std::thread>        _thread;
141     TaskList                            _runQueue;
142     TaskList                            _waitQueue;
143     TaskListIter                        _queueIt;
144     TaskListIter                        _blockedIt;
145     bool                                _isBlocked;
146     mutable SpinLock                    _runQueueLock;
147     mutable SpinLock                    _waitQueueLock;
148     std::mutex                          _notEmptyMutex; //for accessing the condition variable
149     std::condition_variable             _notEmptyCond;
150     std::atomic_bool                    _isEmpty;
151     std::atomic_bool                    _isSharedQueueEmpty;
152     std::atomic_bool                    _isInterrupted;
153     std::atomic_bool                    _isIdle;
154     std::atomic_bool                    _terminated;
155     bool                                _isAdvanced;
156     QueueStatistics                     _stats;
157     std::shared_ptr<TaskQueue>          _sharedQueue;
158     std::vector<TaskQueue*>             _helpers;
159     unsigned int                        _queueRound;
160     unsigned int                        _lastSleptQueueRound;
161     unsigned int                        _lastSleptSharedQueueRound;
162 };
163 
164 }}
165 
166 #include <quantum/impl/quantum_task_queue_impl.h>
167 
168 #endif //BLOOMBERG_QUANTUM_TASK_QUEUE_H
169