1 #ifndef _CALLBACK_REGISTRY_TABLE_H_
2 #define _CALLBACK_REGISTRY_TABLE_H_
3 
4 #include <Rcpp.h>
5 #include <memory>
6 #include "threadutils.h"
7 #include "debug.h"
8 #include "callback_registry.h"
9 #include "later.h"
10 
11 using std::shared_ptr;
12 using std::make_shared;
13 
14 // ============================================================================
15 // Callback registry table
16 // ============================================================================
17 //
18 // This class is used for accessing a registry by ID. The CallbackRegistries
19 // also have a tree structure. The global loop/registry is the root. However,
20 // there can also be trees that are independent of the global loop, if a loop
21 // is created without a parent.
22 //
// The operations on this class are thread-safe, because they might be called
// from another thread.
25 //
26 class CallbackRegistryTable {
27 
28   // Basically a struct that keeps track of a registry and whether or an R loop
29   // object references it.
30   class RegistryHandle {
31   public:
RegistryHandle(std::shared_ptr<CallbackRegistry> registry,bool r_ref_exists)32     RegistryHandle(std::shared_ptr<CallbackRegistry> registry, bool r_ref_exists)
33       : registry(registry), r_ref_exists(r_ref_exists) {
34     };
35     // Need to declare a copy constructor. Needed because pre-C++11 std::map
36     // doesn't have an .emplace() method.
37     RegistryHandle() = default;
38 
39     std::shared_ptr<CallbackRegistry> registry;
40     bool r_ref_exists;
41   };
42 
43 public:
CallbackRegistryTable()44   CallbackRegistryTable() : mutex(tct_mtx_plain | tct_mtx_recursive), condvar(mutex)  {
45   }
46 
exists(int id)47   bool exists(int id) {
48     Guard guard(&mutex);
49     return (registries.find(id) != registries.end());
50   }
51 
52   // Create a new CallbackRegistry. If parent_id is -1, then there is no parent.
create(int id,int parent_id)53   void create(int id, int parent_id) {
54     ASSERT_MAIN_THREAD()
55     Guard guard(&mutex);
56 
57     if (exists(id)) {
58       Rcpp::stop("Can't create event loop %d because it already exists.", id);
59     }
60 
61     // Each new registry is passed our mutex and condvar. These serve as a
62     // shared lock across all CallbackRegistries and this
63     // CallbackRegistryTable. If each registry had a separate lock, some
64     // routines would recursively acquire a lock downward in the
65     // CallbackRegistry tree, and some recursively acquire a lock upward;
66     // without a shared lock, if these things happen at the same time from
67     // different threads, it could deadlock.
68     shared_ptr<CallbackRegistry> registry = make_shared<CallbackRegistry>(id, &mutex, &condvar);
69 
70     if (parent_id != -1) {
71       shared_ptr<CallbackRegistry> parent = getRegistry(parent_id);
72       if (parent == nullptr) {
73         Rcpp::stop("Can't create registry. Parent with id %d does not exist.", parent_id);
74       }
75       registry->parent = parent;
76       parent->children.push_back(registry);
77     }
78 
79     // Would be better to use .emplace() to avoid copy-constructor, but that
80     // requires C++11.
81     registries[id] = RegistryHandle(registry, true);
82   }
83 
84   // Returns a shared_ptr to the registry. If the registry is not present in
85   // the table, or if the target CallbackRegistry has already been deleted,
86   // then the shared_ptr is empty.
getRegistry(int id)87   shared_ptr<CallbackRegistry> getRegistry(int id) {
88     Guard guard(&mutex);
89     if (!exists(id)) {
90       return shared_ptr<CallbackRegistry>();
91     }
92     // If the target of the shared_ptr has been deleted, then this is an empty
93     // shared_ptr.
94     return registries[id].registry;
95   }
96 
scheduleCallback(void (* func)(void *),void * data,double delaySecs,int loop_id)97   uint64_t scheduleCallback(void (*func)(void*), void* data, double delaySecs, int loop_id) {
98     // This method can be called from any thread
99     Guard guard(&mutex);
100 
101     shared_ptr<CallbackRegistry> registry = getRegistry(loop_id);
102     if (registry == nullptr) {
103       return 0;
104     }
105     return doExecLater(registry, func, data, delaySecs, true);
106   }
107 
108   // This is called when the R loop handle referring to a CallbackRegistry is
109   // destroyed. Returns true if the CallbackRegistry exists and this function
110   // has not previously been called on it; false otherwise.
notifyRRefDeleted(int id)111   bool notifyRRefDeleted(int id) {
112     ASSERT_MAIN_THREAD()
113     Guard guard(&mutex);
114 
115     if (!exists(id)) {
116       return false;
117     }
118 
119     if (registries[id].r_ref_exists) {
120       registries[id].r_ref_exists = false;
121       this->pruneRegistries();
122       return true;
123     } else {
124       return false;
125     }
126   }
127 
128   // Iterate over all registries, and remove a registry when:
129   // * If the loop has a parent:
130   //   * There's no R loop object referring to it, AND the registry is empty.
131   // * If the loop does not have a parent:
132   //   * There's no R loop object referring to it. (Dont' need the registry to
133   //     be empty, because if there's no parent and no R reference to the loop,
134   //     there is no way to execute callbacks in the registry.)
pruneRegistries()135   void pruneRegistries() {
136     ASSERT_MAIN_THREAD()
137     Guard guard(&mutex);
138 
139     std::map<int, RegistryHandle>::iterator it = registries.begin();
140 
141     // Iterate over all registries. Remove under the following conditions:
142     // * There are no more R loop handle references to the registry, AND
143     // * The registry is empty, OR the registry has no parent.
144     // This logic is equivalent to the logic describing the function, just in
145     // a different order.
146     //
147     // std::map are sorted, and children always have a larger ID than their
148     // parents. Because of this, if there is a case where initially a child does
149     // not have any R refs, but the parent does have an R ref, then the parent's
150     // R ref is deleted, both will removed in a single pass.
151     while (it != registries.end()) {
152       if (!it->second.r_ref_exists &&
153           (it->second.registry->empty() || it->second.registry->parent == nullptr))
154       {
155         // Need to increment iterator before removing the registry; otherwise
156         // the iterator will be invalid.
157         int id = it->first;
158         it++;
159         remove(id);
160       } else {
161         it++;
162       }
163     }
164   }
165 
166   // Remove a callback registry from the table
remove(int id)167   bool remove(int id) {
168     // Removal is always called from the main thread.
169     ASSERT_MAIN_THREAD()
170     Guard guard(&mutex);
171 
172     shared_ptr<CallbackRegistry> registry = getRegistry(id);
173     if (registry == nullptr) {
174       return false;
175     }
176 
177     // Deregister this object from its parent. Do it here instead of the in the
178     // CallbackRegistry destructor, for two reasons: One is that we can be 100%
179     // sure that the deregistration happens right now (it's possible that the
180     // object's destructor won't run yet, because someone else has a shared_ptr
181     // to it). Second, we can't reliably use a shared_ptr to the object from
182     // inside its destructor; we need to some pointer comparison, but by the
183     // time the destructor runs, you can't run shared_from_this() in the object,
184     // because there are no more shared_ptrs to it.
185     shared_ptr<CallbackRegistry> parent = registry->parent;
186     if (parent != nullptr) {
187       // Remove this registry from the parent's list of children.
188       for (std::vector<shared_ptr<CallbackRegistry> >::iterator it = parent->children.begin();
189            it != parent->children.end();
190           )
191       {
192         if (*it == registry) {
193           parent->children.erase(it);
194           break;
195         } else {
196           ++it;
197         }
198       }
199     }
200 
201     // Tell the children that they no longer have a parent.
202     for (std::vector<std::shared_ptr<CallbackRegistry> >::iterator it = registry->children.begin();
203          it != registry->children.end();
204          ++it)
205     {
206       (*it)->parent.reset();
207     }
208 
209     registries.erase(id);
210 
211     return true;
212   }
213 
214 private:
215   std::map<int, RegistryHandle> registries;
216   Mutex mutex;
217   ConditionVariable condvar;
218 
219 };
220 
221 
222 #endif
223