1 // Observable Library
2 // Copyright (c) 2016-2017 David Capello
3 //
4 // This file is released under the terms of the MIT license.
5 // Read LICENSE.txt for more information.
6
7 #ifndef OBS_SAFE_LIST_H_INCLUDED
8 #define OBS_SAFE_LIST_H_INCLUDED
9 #pragma once
10
11 #include <cassert>
12 #include <chrono>
13 #include <condition_variable>
14 #include <iterator>
15 #include <mutex>
16 #include <thread>
17 #include <vector>
18
19 namespace obs {
20
21 // A STL-like list which is safe to remove/add items from multiple
22 // threads while it's being iterated by multiple threads too.
23 template<typename T>
24 class safe_list {
25 public:
26 class iterator;
27
28 private:
29 // A node in the linked list.
30 struct node {
31 // Pointer to a slot or an observer instance.
32 //
33 // As we cannot modify the list when we are in for-loops iterating
34 // the list, we can temporally mark nodes as disabled changing
35 // this "value" member to nullptr when we use erase(), Then when
36 // the list is not iterated anymore (m_ref==0), we call
37 // delete_nodes() to remove all disabled nodes from the list.
38 //
39 // We try to skip nodes with value=nullptr in the iteration
40 // process (iterator::operator++). But it isn't possible to ensure
41 // that an iterator will always return a non-nullptr value (so the
42 // client have to check the return value from iterators).
43 T* value;
44
45 // Number of locks for this node, it means the number of iterators
46 // being used and currently pointing to this node. (I.e. number of
47 // times an iterator::operator*() was called for this node and the
48 // iterator is still alive.)
49 //
50 // This variable is incremented/decremented only when
51 // m_mutex_nodes is locked.
52 int locks;
53
54 // Next node in the list. It's nullptr for the last node in the list.
55 node* next;
56
57 // Thread used to add the node to the list (i.e. the thread where
58 // safe_list::push_back() was used). We suppose that the same
59 // thread will remove the node.
60 std::thread::id creator_thread;
61
62 // Pointer to the first iterator that locked this node in the same
63 // thread it was created. It is used to unlock() the node when
64 // erase() is called in the same iterator loop/call.
65 iterator* creator_thread_iterator;
66
67 node(T* value = nullptr)
valuenode68 : value(value),
69 locks(0),
70 next(nullptr),
71 creator_thread(std::this_thread::get_id()),
72 creator_thread_iterator(nullptr) {
73 }
74
75 node(const node&) = delete;
76 node& operator=(const node&) = delete;
77
78 // Returns true if we are using this node from the same thread
79 // where it was created. (i.e. the thread where
80 // safe_list::push_back() was used.)
81 //
82 // This function is used to know if an iterator that locks/unlocks
83 // a node belongs to the same "creator thread," so when we erase()
84 // the node, we can (must) unlock all those iterators.
in_creator_threadnode85 bool in_creator_thread() const {
86 return (creator_thread == std::this_thread::get_id());
87 }
88
89 // Locks the node by the given iterator. It means that
90 // iterator::operator*() is going to return the node's value so we
91 // can use it. (E.g. in case of a slot, we can call the slot
92 // function.)
93 void lock(iterator* it);
94
95 // Indicates that the node is not being used by the given iterator
96 // anymore. So we could delete it in case that erase() is
97 // called.
98 void unlock(iterator* it);
99
100 // Notify to all iterators in the "creator thread" that they don't
101 // own a node lock anymore. It's used to erase() the node.
102 void unlock_all();
103 };
104
105 // Mutex used to modify the linked-list (m_first/m_last and node::next).
106 mutable std::mutex m_mutex_nodes;
107
108 // Used to iterate the list from the first element to the last one.
109 node* m_first = nullptr;
110
111 // Used to add new items at the end of the list (with push_back()).
112 node* m_last = nullptr;
113
114 // "m_ref" indicates the number of times this list is being iterated
115 // simultaneously. When "m_ref" reaches 0, the delete_nodes()
116 // function is called to delete all unused nodes (unlocked nodes
117 // with value=nullptr). While "m_ref" > 0 it means that we shouldn't
118 // remove nodes (so we can ensure that an actual node reference is
119 // still valid until the next unref()).
120 std::mutex m_mutex_ref;
121 int m_ref = 0;
122
123 // Flag that indicates if some node was erased and delete_nodes()
124 // should iterate the whole list to clean disabled nodes (nodes with
125 // value = nullptr).
126 bool m_delete_nodes = false;
127
128 // Used to notify when a node's locks is zero so erase() can continue.
129 std::condition_variable m_delete_cv;
130
131 public:
132
133 // A STL-like iterator for safe_list. It is not a fully working
134 // iterator, and shouldn't be used directly, it's expected to be
135 // used only in range-based for loops.
136 //
137 // The iterator works in the following way:
138 //
139 // 1. It adds a new reference (ref()/unref()) to the safe_list so
140 // nodes are not deleted while the iterator is alive
141 // 2. operator*() locks the node and returns its value, when a node
142 // is locked we can use it's "value" (call a slot/observer)
143 // 3. When the iterator is incremented (operator++) it unlocks the
144 // previous node and goes to the next one (the next node is not
145 // locked until we use operator*() again)
146 class iterator : public std::iterator<std::forward_iterator_tag, T*> {
147 public:
148 friend struct node;
149
iterator(safe_list & list,node * node)150 iterator(safe_list& list, node* node)
151 : m_list(list),
152 m_node(node),
153 m_locked(false),
154 m_next_iterator(nullptr) {
155 m_list.ref();
156 }
157
158 // Cannot copy iterators
159 iterator(const iterator&) = delete;
160 iterator& operator=(const iterator&) = delete;
161
162 // We can only move iterators
iterator(iterator && other)163 iterator(iterator&& other)
164 : m_list(other.m_list),
165 m_node(other.m_node),
166 m_locked(false),
167 m_next_iterator(nullptr) {
168 assert(!other.m_locked);
169 m_list.ref();
170 }
171
~iterator()172 ~iterator() {
173 assert(!m_locked);
174 m_list.unref();
175 }
176
177 // Called when erase() is used from the iterators created in the
178 // "creator thread".
notify_unlock(const node * node)179 void notify_unlock(const node* node) {
180 if (m_locked) {
181 assert(m_node == node);
182 assert(m_locked);
183 m_locked = false;
184 }
185 }
186
187 // Unlocks the current m_node and goes to the next enabled
188 // (node::value != nullptr) node. It doesn't lock the new found
189 // node, operator*() is the member function that locks the node.
190 iterator& operator++() {
191 std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
192 assert(m_node);
193 if (m_node) {
194 if (m_locked) {
195 m_node->unlock(this);
196 m_locked = false;
197
198 // node's locks count is zero
199 if (m_node->locks == 0)
200 m_list.m_delete_cv.notify_all();
201 }
202 m_node = m_node->next;
203 }
204 return *this;
205 }
206
207 // Tries to lock the node and returns it's value. If the node was
208 // already deleted, it will return nullptr and the client will
209 // need to call operator++() again. We cannot guarantee that this
210 // function will return a value != nullptr.
211 T* operator*() {
212 std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
213 assert(m_node);
214 if (m_node->value) {
215 // Add a lock to m_node before we access to its value. It's
216 // used to keep track of how many iterators are using the node
217 // in the list.
218 if (!m_locked) {
219 m_node->lock(this);
220 m_locked = true;
221 }
222
223 assert(m_node->value);
224 return m_node->value;
225 }
226 else {
227 // We might try to iterate to the following nodes to get a
228 // m_node->value != nullptr, but we might reach the last node
229 // and should return nullptr anyway (also the next
230 // operator++() call would be an invalid "++it" call using the
231 // end of the list).
232 return nullptr;
233 }
234 }
235
236 // This can be used only to compare an iterator created from
237 // begin() (in "this" pointer) with end() ("other" argument).
238 bool operator!=(const iterator& other) const {
239 std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
240 if (m_node && other.m_node)
241 return (m_node != other.m_node->next);
242 else
243 return false;
244 }
245
246 private:
247 safe_list& m_list;
248
249 // Current node being iterated. It is never nullptr.
250 node* m_node;
251
252 // True if this iterator has added a lock to the "m_node"
253 bool m_locked;
254
255 // Next iterator locking the same "m_node" from its creator
256 // thread.
257 iterator* m_next_iterator;
258 };
259
safe_list()260 safe_list() {
261 }
262
~safe_list()263 ~safe_list() {
264 assert(m_ref == 0);
265 #if _DEBUG
266 {
267 std::lock_guard<std::mutex> lock(m_mutex_nodes);
268 for (node* node=m_first; node; node=node->next) {
269 assert(!node->locks);
270 }
271 }
272 #endif
273 delete_nodes(true);
274
275 assert(m_first == m_last);
276 assert(m_first == nullptr);
277 }
278
empty()279 bool empty() const {
280 std::lock_guard<std::mutex> lock(m_mutex_nodes);
281 return (m_first == m_last);
282 }
283
push_back(T * value)284 void push_back(T* value) {
285 node* n = new node(value);
286
287 std::lock_guard<std::mutex> lock(m_mutex_nodes);
288 if (!m_first)
289 m_first = m_last = n;
290 else {
291 m_last->next = n;
292 m_last = n;
293 }
294 }
295
erase(T * value)296 void erase(T* value) {
297 // We add a ref to avoid calling delete_nodes().
298 ref();
299 {
300 std::unique_lock<std::mutex> lock(m_mutex_nodes);
301
302 for (node* node=m_first; node; node=node->next) {
303 if (node->value == value) {
304 // We disable the node so it isn't used anymore by other
305 // iterators.
306 assert(node->value);
307 node->unlock_all();
308 node->value = nullptr;
309 m_delete_nodes = true;
310
311 // In this case we should wait until the node is unlocked,
312 // because after erase() the client could be deleting the
313 // value that we are using in other thread.
314 if (node->locks) {
315 // Wait until the node is completely unlocked by other
316 // threads.
317 m_delete_cv.wait(lock, [node]{ return node->locks == 0; });
318 }
319
320 assert(node->locks == 0);
321
322 // The node will be finally deleted when we leave the
323 // iteration loop (m_ref==0, i.e. the end() iterator is
324 // destroyed)
325 break;
326 }
327 }
328 }
329 unref();
330 }
331
begin()332 iterator begin() {
333 std::lock_guard<std::mutex> lock(m_mutex_nodes);
334 return iterator(*this, m_first);
335 }
336
end()337 iterator end() {
338 std::lock_guard<std::mutex> lock(m_mutex_nodes);
339 return iterator(*this, m_last);
340 }
341
ref()342 void ref() {
343 std::lock_guard<std::mutex> lock(m_mutex_ref);
344 ++m_ref;
345 assert(m_ref > 0);
346 }
347
unref()348 void unref() {
349 std::lock_guard<std::mutex> lock(m_mutex_ref);
350 assert(m_ref > 0);
351 --m_ref;
352 if (m_ref == 0 && m_delete_nodes) {
353 delete_nodes(false);
354 m_delete_nodes = false;
355 }
356 }
357
358 private:
359 // Deletes nodes from the list. If "all" is true, deletes all nodes,
360 // if it's false, it deletes only nodes with value == nullptr, which
361 // are nodes that were disabled
delete_nodes(bool all)362 void delete_nodes(bool all) {
363 node* prev = nullptr;
364 node* next = nullptr;
365
366 for (node* node=m_first; node; node=next) {
367 next = node->next;
368
369 if ((all || !node->value) && !node->locks) {
370 if (prev) {
371 prev->next = next;
372 if (node == m_last)
373 m_last = prev;
374 }
375 else {
376 m_first = next;
377 if (node == m_last)
378 m_last = m_first;
379 }
380
381 delete node;
382 }
383 else
384 prev = node;
385 }
386 }
387
388 };
389
390 template<typename T>
lock(iterator * it)391 void safe_list<T>::node::lock(iterator* it) {
392 ++locks;
393 assert(locks > 0);
394
395 // If we are in the creator thread, we add this iterator in the
396 // "creator thread iterators" linked-list so the iterator is
397 // notified in case that the node is erased.
398 if (in_creator_thread()) {
399 it->m_next_iterator = creator_thread_iterator;
400 creator_thread_iterator = it;
401 }
402 }
403
404 template<typename T>
unlock(iterator * it)405 void safe_list<T>::node::unlock(iterator* it) {
406 assert(it);
407
408 // In this case we are unlocking just one iterator, if we are in the
409 // creator thread, we've to remove this iterator from the "creator
410 // thread iterators" linked-list.
411 if (in_creator_thread()) {
412 iterator* prev = nullptr;
413 iterator* next = nullptr;
414 for (auto it2=creator_thread_iterator; it2; it2=next) {
415 next = it2->m_next_iterator;
416 if (it2 == it) {
417 if (prev)
418 prev->m_next_iterator = next;
419 else
420 creator_thread_iterator = next;
421
422 break;
423 }
424 prev = it2;
425 }
426 }
427
428 assert(locks > 0);
429 --locks;
430 }
431
432 // In this case we've called erase() to delete this node, so we have
433 // to unlock the node from the creator thread if we are in the
434 // creator thread.
435 template<typename T>
unlock_all()436 void safe_list<T>::node::unlock_all() {
437 if (in_creator_thread()) {
438 // Notify to all iterators in the creator thread that they don't
439 // have the node locked anymore. In this way we can continue the
440 // erase() call.
441 for (auto it=creator_thread_iterator; it; it=it->m_next_iterator) {
442 it->notify_unlock(this);
443
444 assert(locks > 0);
445 --locks;
446 }
447 creator_thread_iterator = nullptr;
448 }
449 }
450
451 } // namespace obs
452
453 #endif
454