1 // Observable Library
2 // Copyright (c) 2016-2017 David Capello
3 //
4 // This file is released under the terms of the MIT license.
5 // Read LICENSE.txt for more information.
6 
7 #ifndef OBS_SAFE_LIST_H_INCLUDED
8 #define OBS_SAFE_LIST_H_INCLUDED
9 #pragma once
10 
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iterator>
#include <mutex>
#include <thread>
#include <vector>
18 
19 namespace obs {
20 
// A STL-like singly linked list of T* values which is safe to
// remove/add items from multiple threads while it's being iterated by
// multiple threads too.
//
// The list never owns the pointed values, it only owns its internal
// nodes. Erased items are first disabled (node::value = nullptr) and
// their nodes are physically deleted later, when nobody is iterating
// the list (m_ref == 0).
template<typename T>
class safe_list {
public:
  class iterator;

private:
  // A node in the linked list.
  struct node {
    // Pointer to a slot or an observer instance.
    //
    // As we cannot modify the list when we are in for-loops iterating
    // the list, we can temporarily mark nodes as disabled changing
    // this "value" member to nullptr when we use erase(). Then when
    // the list is not iterated anymore (m_ref==0), we call
    // delete_nodes() to remove all disabled nodes from the list.
    //
    // It isn't possible to ensure that an iterator will always return
    // a non-nullptr value (so the client has to check the return
    // value from iterator::operator*()).
    T* value;

    // Number of locks for this node, it means the number of iterators
    // being used and currently pointing to this node. (I.e. number of
    // times an iterator::operator*() was called for this node and the
    // iterator is still alive.)
    //
    // This variable is incremented/decremented only when
    // m_mutex_nodes is locked.
    int locks;

    // Next node in the list. It's nullptr for the last node in the list.
    node* next;

    // Thread used to add the node to the list (i.e. the thread where
    // safe_list::push_back() was used). We suppose that the same
    // thread will remove the node.
    std::thread::id creator_thread;

    // Head of the linked list of iterators (chained through
    // iterator::m_next_iterator) that locked this node from the
    // thread where the node was created. It is used to unlock the
    // node when erase() is called in the same iterator loop/call.
    iterator* creator_thread_iterator;

    // Creates an unlocked, unlinked node for the given value and
    // remembers the calling thread as the "creator thread".
    explicit node(T* value = nullptr)
      : value(value),
        locks(0),
        next(nullptr),
        creator_thread(std::this_thread::get_id()),
        creator_thread_iterator(nullptr) {
    }

    node(const node&) = delete;
    node& operator=(const node&) = delete;

    // Returns true if we are using this node from the same thread
    // where it was created. (i.e. the thread where
    // safe_list::push_back() was used.)
    //
    // This function is used to know if an iterator that locks/unlocks
    // a node belongs to the same "creator thread," so when we erase()
    // the node, we can (must) unlock all those iterators.
    bool in_creator_thread() const {
      return (creator_thread == std::this_thread::get_id());
    }

    // Locks the node by the given iterator. It means that
    // iterator::operator*() is going to return the node's value so we
    // can use it. (E.g. in case of a slot, we can call the slot
    // function.)
    void lock(iterator* it);

    // Indicates that the node is not being used by the given iterator
    // anymore. So we could delete it in case that erase() is
    // called.
    void unlock(iterator* it);

    // Notify to all iterators in the "creator thread" that they don't
    // own a node lock anymore. It's used to erase() the node.
    void unlock_all();
  };

  // Mutex used to modify the linked-list (m_first/m_last and node::next).
  mutable std::mutex m_mutex_nodes;

  // Used to iterate the list from the first element to the last one.
  node* m_first = nullptr;

  // Used to add new items at the end of the list (with push_back()).
  node* m_last = nullptr;

  // "m_ref" indicates the number of times this list is being iterated
  // simultaneously.  When "m_ref" reaches 0, the delete_nodes()
  // function is called to delete all unused nodes (unlocked nodes
  // with value=nullptr). While "m_ref" > 0 it means that we shouldn't
  // remove nodes (so we can ensure that an actual node reference is
  // still valid until the next unref()).
  std::mutex m_mutex_ref;
  int m_ref = 0;

  // Flag that indicates if some node was erased and delete_nodes()
  // should iterate the whole list to clean disabled nodes (nodes with
  // value = nullptr).
  bool m_delete_nodes = false;

  // Used to notify when a node's locks is zero so erase() can continue.
  std::condition_variable m_delete_cv;

public:

  // A STL-like iterator for safe_list. It is not a fully working
  // iterator, and shouldn't be used directly, it's expected to be
  // used only in range-based for loops.
  //
  // The iterator works in the following way:
  //
  // 1. It adds a new reference (ref()/unref()) to the safe_list so
  //    nodes are not deleted while the iterator is alive
  // 2. operator*() locks the node and returns its value, when a node
  //    is locked we can use its "value" (call a slot/observer)
  // 3. When the iterator is incremented (operator++) it unlocks the
  //    previous node and goes to the next one (the next node is not
  //    locked until we use operator*() again)
  // 4. When the iterator is destroyed while it still holds a lock
  //    (e.g. the for-loop was left with break/return/an exception)
  //    the lock is released too.
  class iterator {
  public:
    // Same member types that the (deprecated since C++17)
    // std::iterator<std::forward_iterator_tag, T*> base used to give.
    using iterator_category = std::forward_iterator_tag;
    using value_type = T*;
    using difference_type = std::ptrdiff_t;
    using pointer = T**;
    using reference = T*&;

    friend struct node;

    // Creates an iterator pointing to the given node (which can be
    // nullptr for an empty list) and adds a reference to the list.
    iterator(safe_list& list, node* node)
      : m_list(list),
        m_node(node),
        m_locked(false),
        m_next_iterator(nullptr) {
      m_list.ref();
    }

    // Cannot copy iterators
    iterator(const iterator&) = delete;
    iterator& operator=(const iterator&) = delete;

    // We can only move iterators. The new iterator adds its own
    // reference to the list (the moved-from one still releases its
    // reference in its destructor), and a node lock is never
    // transferred: the source must be unlocked.
    iterator(iterator&& other)
      : m_list(other.m_list),
        m_node(other.m_node),
        m_locked(false),
        m_next_iterator(nullptr) {
      assert(!other.m_locked);
      m_list.ref();
    }

    // Releases the node lock (if this iterator still owns one) and
    // the list reference.
    ~iterator() {
      // m_locked is checked before taking the mutex on purpose:
      // begin()/end() can destroy a (never locked) temporary iterator
      // while they hold m_mutex_nodes, and std::mutex isn't recursive.
      if (m_locked) {
        // The loop was abandoned with the node still locked. If we
        // leave the lock in place, erase() from other threads would
        // wait forever and the node would keep a dangling pointer to
        // this iterator in its "creator thread iterators" list.
        std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
        unlock_node();
      }
      m_list.unref();
    }

    // Called when erase() is used from the iterators created in the
    // "creator thread". The node's lock counter is adjusted by the
    // caller (node::unlock_all()), here we only forget the lock.
    void notify_unlock(const node* node) {
      if (m_locked) {
        assert(m_node == node);
        m_locked = false;
      }
    }

    // Unlocks the current m_node and goes to the next node. It
    // doesn't lock the new node, operator*() is the member function
    // that locks the node.
    iterator& operator++() {
      std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
      assert(m_node);
      if (m_node) {
        unlock_node();
        m_node = m_node->next;
      }
      return *this;
    }

    // Tries to lock the node and returns its value. If the node was
    // already erased, it will return nullptr and the client will
    // need to call operator++() again. We cannot guarantee that this
    // function will return a value != nullptr.
    T* operator*() {
      std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
      assert(m_node);
      if (m_node->value) {
        // Add a lock to m_node before we access to its value. It's
        // used to keep track of how many iterators are using the node
        // in the list.
        if (!m_locked) {
          m_node->lock(this);
          m_locked = true;
        }

        assert(m_node->value);
        return m_node->value;
      }
      else {
        // We might try to iterate to the following nodes to get a
        // m_node->value != nullptr, but we might reach the last node
        // and should return nullptr anyway (also the next
        // operator++() call would be an invalid "++it" call using the
        // end of the list).
        return nullptr;
      }
    }

    // This can be used only to compare an iterator created from
    // begin() (in "this" pointer) with end() ("other" argument).
    //
    // As end() points to the last node (not one past it), the
    // iteration continues until we reach the node that follows the
    // last one. If any of both iterators has no node (empty list, or
    // we went beyond the last node) the iteration is finished.
    bool operator!=(const iterator& other) const {
      std::lock_guard<std::mutex> lock(m_list.m_mutex_nodes);
      if (m_node && other.m_node)
        return (m_node != other.m_node->next);
      else
        return false;
    }

  private:
    // Releases the lock that this iterator holds on m_node (if any)
    // and wakes up erase() callers when the node gets completely
    // unlocked. m_list.m_mutex_nodes must be locked by the caller.
    void unlock_node() {
      if (m_locked) {
        assert(m_node);
        m_node->unlock(this);
        m_locked = false;

        // node's locks count is zero
        if (m_node->locks == 0)
          m_list.m_delete_cv.notify_all();
      }
    }

    safe_list& m_list;

    // Current node being iterated. It's nullptr when the list was
    // empty at begin()/end() time or when operator++() went beyond
    // the last node.
    node* m_node;

    // True if this iterator has added a lock to the "m_node"
    bool m_locked;

    // Next iterator locking the same "m_node" from its creator
    // thread.
    iterator* m_next_iterator;
  };

  safe_list() = default;

  // The list cannot be copied (it owns mutexes and raw nodes).
  safe_list(const safe_list&) = delete;
  safe_list& operator=(const safe_list&) = delete;

  // Deletes all nodes. The list must not be iterated at this point.
  ~safe_list() {
    assert(m_ref == 0);
#if _DEBUG
    {
      std::lock_guard<std::mutex> lock(m_mutex_nodes);
      for (node* n=m_first; n; n=n->next) {
        assert(!n->locks);
      }
    }
#endif
    delete_nodes(true);

    assert(m_first == m_last);
    assert(m_first == nullptr);
  }

  // Returns true if the list has no usable item, i.e. there are no
  // nodes, or all remaining nodes were disabled by erase() and are
  // just waiting to be deleted.
  //
  // (Comparing m_first with m_last is not enough: both point to the
  // same node when the list has exactly one item.)
  bool empty() const {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    for (const node* n=m_first; n; n=n->next) {
      if (n->value)
        return false;
    }
    return true;
  }

  // Adds a new node for the given value at the end of the list. The
  // calling thread is registered as the "creator thread" of the node.
  void push_back(T* value) {
    node* n = new node(value);

    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    if (!m_first)
      m_first = m_last = n;
    else {
      m_last->next = n;
      m_last = n;
    }
  }

  // Disables the first node that contains the given value and waits
  // until no iterator is using it. Does nothing if the value is not
  // in the list.
  void erase(T* value) {
    // We add a ref to avoid calling delete_nodes().
    ref();
    {
      std::unique_lock<std::mutex> lock(m_mutex_nodes);

      for (node* n=m_first; n; n=n->next) {
        if (n->value == value) {
          // We disable the node so it isn't used anymore by other
          // iterators.
          assert(n->value);
          n->unlock_all();
          n->value = nullptr;
          m_delete_nodes = true;

          // We should wait until the node is completely unlocked by
          // other threads, because after erase() the client could be
          // deleting the value that we are using in other thread.
          // (The predicate is checked before blocking, so this
          // returns immediately when the node has no locks.)
          m_delete_cv.wait(lock, [n]{ return n->locks == 0; });

          assert(n->locks == 0);

          // The node will be finally deleted when we leave the
          // iteration loop (m_ref==0, i.e. the end() iterator is
          // destroyed)
          break;
        }
      }
    }
    unref();
  }

  // Returns an iterator to the first node (or to no node at all if
  // the list is empty).
  iterator begin() {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    return iterator(*this, m_first);
  }

  // Returns an iterator to the *last* node (not one past it), see
  // iterator::operator!=() to know how it's used to stop iterating.
  iterator end() {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    return iterator(*this, m_last);
  }

  // Adds a reference to the list so nodes are not deleted.
  void ref() {
    std::lock_guard<std::mutex> lock(m_mutex_ref);
    ++m_ref;
    assert(m_ref > 0);
  }

  // Removes a reference from the list. When the last reference is
  // removed, nodes disabled by erase() are finally deleted.
  void unref() {
    std::lock_guard<std::mutex> lock(m_mutex_ref);
    assert(m_ref > 0);
    --m_ref;
    if (m_ref == 0 && m_delete_nodes) {
      delete_nodes(false);
      m_delete_nodes = false;
    }
  }

private:
  // Deletes nodes from the list. If "all" is true, deletes all nodes,
  // if it's false, it deletes only nodes with value == nullptr, which
  // are nodes that were disabled. In both cases nodes that are still
  // locked are kept.
  //
  // This function doesn't lock m_mutex_nodes by itself: it can be
  // reached (through unref()) from places where the calling thread
  // already holds that mutex.
  void delete_nodes(bool all) {
    node* prev = nullptr;
    node* next = nullptr;

    for (node* cur=m_first; cur; cur=next) {
      next = cur->next;

      if ((all || !cur->value) && !cur->locks) {
        // Unlink "cur" fixing m_first/m_last when it's an extreme of
        // the list.
        if (prev) {
          prev->next = next;
          if (cur == m_last)
            m_last = prev;
        }
        else {
          m_first = next;
          if (cur == m_last)
            m_last = m_first;
        }

        delete cur;
      }
      else
        prev = cur;
    }
  }

};
389 
390 template<typename T>
lock(iterator * it)391 void safe_list<T>::node::lock(iterator* it) {
392   ++locks;
393   assert(locks > 0);
394 
395   // If we are in the creator thread, we add this iterator in the
396   // "creator thread iterators" linked-list so the iterator is
397   // notified in case that the node is erased.
398   if (in_creator_thread()) {
399     it->m_next_iterator = creator_thread_iterator;
400     creator_thread_iterator = it;
401   }
402 }
403 
404 template<typename T>
unlock(iterator * it)405 void safe_list<T>::node::unlock(iterator* it) {
406   assert(it);
407 
408   // In this case we are unlocking just one iterator, if we are in the
409   // creator thread, we've to remove this iterator from the "creator
410   // thread iterators" linked-list.
411   if (in_creator_thread()) {
412     iterator* prev = nullptr;
413     iterator* next = nullptr;
414     for (auto it2=creator_thread_iterator; it2; it2=next) {
415       next = it2->m_next_iterator;
416       if (it2 == it) {
417         if (prev)
418           prev->m_next_iterator = next;
419         else
420           creator_thread_iterator = next;
421 
422         break;
423       }
424       prev = it2;
425     }
426   }
427 
428   assert(locks > 0);
429   --locks;
430 }
431 
432 // In this case we've called erase() to delete this node, so we have
433 // to unlock the node from the creator thread if we are in the
434 // creator thread.
435 template<typename T>
unlock_all()436 void safe_list<T>::node::unlock_all() {
437   if (in_creator_thread()) {
438     // Notify to all iterators in the creator thread that they don't
439     // have the node locked anymore. In this way we can continue the
440     // erase() call.
441     for (auto it=creator_thread_iterator; it; it=it->m_next_iterator) {
442       it->notify_unlock(this);
443 
444       assert(locks > 0);
445       --locks;
446     }
447     creator_thread_iterator = nullptr;
448   }
449 }
450 
451 } // namespace obs
452 
453 #endif
454