1 #ifndef _CALLBACK_REGISTRY_TABLE_H_ 2 #define _CALLBACK_REGISTRY_TABLE_H_ 3 4 #include <Rcpp.h> 5 #include <memory> 6 #include "threadutils.h" 7 #include "debug.h" 8 #include "callback_registry.h" 9 #include "later.h" 10 11 using std::shared_ptr; 12 using std::make_shared; 13 14 // ============================================================================ 15 // Callback registry table 16 // ============================================================================ 17 // 18 // This class is used for accessing a registry by ID. The CallbackRegistries 19 // also have a tree structure. The global loop/registry is the root. However, 20 // there can also be trees that are independent of the global loop, if a loop 21 // is created without a parent. 22 // 23 // The operations on this class are thread-safe, because they might be used to 24 // from another thread. 25 // 26 class CallbackRegistryTable { 27 28 // Basically a struct that keeps track of a registry and whether or an R loop 29 // object references it. 30 class RegistryHandle { 31 public: RegistryHandle(std::shared_ptr<CallbackRegistry> registry,bool r_ref_exists)32 RegistryHandle(std::shared_ptr<CallbackRegistry> registry, bool r_ref_exists) 33 : registry(registry), r_ref_exists(r_ref_exists) { 34 }; 35 // Need to declare a copy constructor. Needed because pre-C++11 std::map 36 // doesn't have an .emplace() method. 37 RegistryHandle() = default; 38 39 std::shared_ptr<CallbackRegistry> registry; 40 bool r_ref_exists; 41 }; 42 43 public: CallbackRegistryTable()44 CallbackRegistryTable() : mutex(tct_mtx_plain | tct_mtx_recursive), condvar(mutex) { 45 } 46 exists(int id)47 bool exists(int id) { 48 Guard guard(&mutex); 49 return (registries.find(id) != registries.end()); 50 } 51 52 // Create a new CallbackRegistry. If parent_id is -1, then there is no parent. create(int id,int parent_id)53 void create(int id, int parent_id) { 54 ASSERT_MAIN_THREAD() 55 Guard guard(&mutex); 56 57 if (exists(id)) { 58 Rcpp::stop("Can't create event loop %d because it already exists.", id); 59 } 60 61 // Each new registry is passed our mutex and condvar. These serve as a 62 // shared lock across all CallbackRegistries and this 63 // CallbackRegistryTable. If each registry had a separate lock, some 64 // routines would recursively acquire a lock downward in the 65 // CallbackRegistry tree, and some recursively acquire a lock upward; 66 // without a shared lock, if these things happen at the same time from 67 // different threads, it could deadlock. 68 shared_ptr<CallbackRegistry> registry = make_shared<CallbackRegistry>(id, &mutex, &condvar); 69 70 if (parent_id != -1) { 71 shared_ptr<CallbackRegistry> parent = getRegistry(parent_id); 72 if (parent == nullptr) { 73 Rcpp::stop("Can't create registry. Parent with id %d does not exist.", parent_id); 74 } 75 registry->parent = parent; 76 parent->children.push_back(registry); 77 } 78 79 // Would be better to use .emplace() to avoid copy-constructor, but that 80 // requires C++11. 81 registries[id] = RegistryHandle(registry, true); 82 } 83 84 // Returns a shared_ptr to the registry. If the registry is not present in 85 // the table, or if the target CallbackRegistry has already been deleted, 86 // then the shared_ptr is empty. getRegistry(int id)87 shared_ptr<CallbackRegistry> getRegistry(int id) { 88 Guard guard(&mutex); 89 if (!exists(id)) { 90 return shared_ptr<CallbackRegistry>(); 91 } 92 // If the target of the shared_ptr has been deleted, then this is an empty 93 // shared_ptr. 94 return registries[id].registry; 95 } 96 scheduleCallback(void (* func)(void *),void * data,double delaySecs,int loop_id)97 uint64_t scheduleCallback(void (*func)(void*), void* data, double delaySecs, int loop_id) { 98 // This method can be called from any thread 99 Guard guard(&mutex); 100 101 shared_ptr<CallbackRegistry> registry = getRegistry(loop_id); 102 if (registry == nullptr) { 103 return 0; 104 } 105 return doExecLater(registry, func, data, delaySecs, true); 106 } 107 108 // This is called when the R loop handle referring to a CallbackRegistry is 109 // destroyed. Returns true if the CallbackRegistry exists and this function 110 // has not previously been called on it; false otherwise. notifyRRefDeleted(int id)111 bool notifyRRefDeleted(int id) { 112 ASSERT_MAIN_THREAD() 113 Guard guard(&mutex); 114 115 if (!exists(id)) { 116 return false; 117 } 118 119 if (registries[id].r_ref_exists) { 120 registries[id].r_ref_exists = false; 121 this->pruneRegistries(); 122 return true; 123 } else { 124 return false; 125 } 126 } 127 128 // Iterate over all registries, and remove a registry when: 129 // * If the loop has a parent: 130 // * There's no R loop object referring to it, AND the registry is empty. 131 // * If the loop does not have a parent: 132 // * There's no R loop object referring to it. (Dont' need the registry to 133 // be empty, because if there's no parent and no R reference to the loop, 134 // there is no way to execute callbacks in the registry.) pruneRegistries()135 void pruneRegistries() { 136 ASSERT_MAIN_THREAD() 137 Guard guard(&mutex); 138 139 std::map<int, RegistryHandle>::iterator it = registries.begin(); 140 141 // Iterate over all registries. Remove under the following conditions: 142 // * There are no more R loop handle references to the registry, AND 143 // * The registry is empty, OR the registry has no parent. 144 // This logic is equivalent to the logic describing the function, just in 145 // a different order. 146 // 147 // std::map are sorted, and children always have a larger ID than their 148 // parents. Because of this, if there is a case where initially a child does 149 // not have any R refs, but the parent does have an R ref, then the parent's 150 // R ref is deleted, both will removed in a single pass. 151 while (it != registries.end()) { 152 if (!it->second.r_ref_exists && 153 (it->second.registry->empty() || it->second.registry->parent == nullptr)) 154 { 155 // Need to increment iterator before removing the registry; otherwise 156 // the iterator will be invalid. 157 int id = it->first; 158 it++; 159 remove(id); 160 } else { 161 it++; 162 } 163 } 164 } 165 166 // Remove a callback registry from the table remove(int id)167 bool remove(int id) { 168 // Removal is always called from the main thread. 169 ASSERT_MAIN_THREAD() 170 Guard guard(&mutex); 171 172 shared_ptr<CallbackRegistry> registry = getRegistry(id); 173 if (registry == nullptr) { 174 return false; 175 } 176 177 // Deregister this object from its parent. Do it here instead of the in the 178 // CallbackRegistry destructor, for two reasons: One is that we can be 100% 179 // sure that the deregistration happens right now (it's possible that the 180 // object's destructor won't run yet, because someone else has a shared_ptr 181 // to it). Second, we can't reliably use a shared_ptr to the object from 182 // inside its destructor; we need to some pointer comparison, but by the 183 // time the destructor runs, you can't run shared_from_this() in the object, 184 // because there are no more shared_ptrs to it. 185 shared_ptr<CallbackRegistry> parent = registry->parent; 186 if (parent != nullptr) { 187 // Remove this registry from the parent's list of children. 188 for (std::vector<shared_ptr<CallbackRegistry> >::iterator it = parent->children.begin(); 189 it != parent->children.end(); 190 ) 191 { 192 if (*it == registry) { 193 parent->children.erase(it); 194 break; 195 } else { 196 ++it; 197 } 198 } 199 } 200 201 // Tell the children that they no longer have a parent. 202 for (std::vector<std::shared_ptr<CallbackRegistry> >::iterator it = registry->children.begin(); 203 it != registry->children.end(); 204 ++it) 205 { 206 (*it)->parent.reset(); 207 } 208 209 registries.erase(id); 210 211 return true; 212 } 213 214 private: 215 std::map<int, RegistryHandle> registries; 216 Mutex mutex; 217 ConditionVariable condvar; 218 219 }; 220 221 222 #endif 223