1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/experimental/observer/detail/Core.h>
18 
19 #include <folly/ExceptionString.h>
20 #include <folly/experimental/observer/detail/ObserverManager.h>
21 
22 namespace folly {
23 namespace observer_detail {
24 
getData()25 Core::VersionedData Core::getData() {
26   if (!ObserverManager::DependencyRecorder::isActive()) {
27     return data_.copy();
28   }
29 
30   ObserverManager::DependencyRecorder::markDependency(shared_from_this());
31 
32   auto version = ObserverManager::getVersion();
33 
34   if (version_ >= version) {
35     return data_.copy();
36   }
37 
38   refresh(version);
39 
40   DCHECK_GE(version_, version);
41   return data_.copy();
42 }
43 
refresh(size_t version)44 size_t Core::refresh(size_t version) {
45   CHECK(ObserverManager::inManagerThread());
46 
47   ObserverManager::DependencyRecorder::markRefreshDependency(*this);
48   SCOPE_EXIT {
49     ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
50   };
51 
52   if (version_ >= version) {
53     return versionLastChange_;
54   }
55 
56   {
57     std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
58 
59     // Recheck in case this code was already refreshed
60     if (version_ >= version) {
61       return versionLastChange_;
62     }
63 
64     bool needRefresh = std::exchange(forceRefresh_, false) || version_ == 0;
65 
66     ObserverManager::DependencyRecorder dependencyRecorder(*this);
67 
68     // This can be run in parallel, but we expect most updates to propagate
69     // bottom to top.
70     dependencies_.withRLock([&](const Dependencies& dependencies) {
71       for (const auto& dependency : dependencies) {
72         try {
73           if (dependency->refresh(version) > version_) {
74             needRefresh = true;
75             break;
76           }
77         } catch (...) {
78           LOG(ERROR) << "Exception while checking dependencies for updates: "
79                      << exceptionStr(std::current_exception());
80 
81           needRefresh = true;
82           break;
83         }
84       }
85     });
86 
87     if (!needRefresh) {
88       version_ = version;
89       return versionLastChange_;
90     }
91 
92     try {
93       VersionedData newData{creator_(), version};
94       if (!newData.data) {
95         throw std::logic_error("Observer creator returned nullptr.");
96       }
97       if (data_.copy().data != newData.data) {
98         data_.swap(newData);
99         versionLastChange_ = version;
100       }
101     } catch (...) {
102       LOG(ERROR) << "Exception while refreshing Observer: "
103                  << exceptionStr(std::current_exception());
104 
105       if (version_ == 0) {
106         // Re-throw exception if this is the first time we run creator
107         throw;
108       }
109     }
110 
111     version_ = version;
112 
113     if (versionLastChange_ != version) {
114       return versionLastChange_;
115     }
116 
117     auto newDependencies = dependencyRecorder.release();
118     dependencies_.withWLock([&](Dependencies& dependencies) {
119       for (const auto& dependency : newDependencies) {
120         if (!dependencies.count(dependency)) {
121           dependency->addDependent(this->shared_from_this());
122         }
123       }
124 
125       for (const auto& dependency : dependencies) {
126         if (!newDependencies.count(dependency)) {
127           dependency->removeStaleDependents();
128         }
129       }
130 
131       dependencies = std::move(newDependencies);
132     });
133   }
134 
135   auto dependents = dependents_.copy();
136 
137   for (const auto& dependentWeak : dependents) {
138     if (auto dependent = dependentWeak.lock()) {
139       ObserverManager::scheduleRefresh(std::move(dependent), version);
140     }
141   }
142 
143   return versionLastChange_;
144 }
145 
setForceRefresh()146 void Core::setForceRefresh() {
147   forceRefresh_ = true;
148 }
149 
Core(folly::Function<std::shared_ptr<const void> ()> creator)150 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
151     : creator_(std::move(creator)) {}
152 
~Core()153 Core::~Core() {
154   dependencies_.withWLock([](const Dependencies& dependencies) {
155     for (const auto& dependecy : dependencies) {
156       dependecy->removeStaleDependents();
157     }
158   });
159 }
160 
create(folly::Function<std::shared_ptr<const void> ()> creator)161 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
162   auto core = Core::Ptr(new Core(std::move(creator)));
163   return core;
164 }
165 
addDependent(Core::WeakPtr dependent)166 void Core::addDependent(Core::WeakPtr dependent) {
167   dependents_.withWLock([&](Dependents& dependents) {
168     dependents.push_back(std::move(dependent));
169   });
170 }
171 
removeStaleDependents()172 void Core::removeStaleDependents() {
173   // This is inefficient, the assumption is that we won't have many dependents
174   dependents_.withWLock([](Dependents& deps) {
175     auto const pred = [](auto const& d) { return d.expired(); };
176     deps.erase(std::remove_if(deps.begin(), deps.end(), pred), deps.end());
177   });
178 }
179 } // namespace observer_detail
180 } // namespace folly
181