1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/experimental/observer/detail/Core.h>
18
19 #include <folly/ExceptionString.h>
20 #include <folly/experimental/observer/detail/ObserverManager.h>
21
22 namespace folly {
23 namespace observer_detail {
24
getData()25 Core::VersionedData Core::getData() {
26 if (!ObserverManager::DependencyRecorder::isActive()) {
27 return data_.copy();
28 }
29
30 ObserverManager::DependencyRecorder::markDependency(shared_from_this());
31
32 auto version = ObserverManager::getVersion();
33
34 if (version_ >= version) {
35 return data_.copy();
36 }
37
38 refresh(version);
39
40 DCHECK_GE(version_, version);
41 return data_.copy();
42 }
43
refresh(size_t version)44 size_t Core::refresh(size_t version) {
45 CHECK(ObserverManager::inManagerThread());
46
47 ObserverManager::DependencyRecorder::markRefreshDependency(*this);
48 SCOPE_EXIT {
49 ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
50 };
51
52 if (version_ >= version) {
53 return versionLastChange_;
54 }
55
56 {
57 std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
58
59 // Recheck in case this code was already refreshed
60 if (version_ >= version) {
61 return versionLastChange_;
62 }
63
64 bool needRefresh = std::exchange(forceRefresh_, false) || version_ == 0;
65
66 ObserverManager::DependencyRecorder dependencyRecorder(*this);
67
68 // This can be run in parallel, but we expect most updates to propagate
69 // bottom to top.
70 dependencies_.withRLock([&](const Dependencies& dependencies) {
71 for (const auto& dependency : dependencies) {
72 try {
73 if (dependency->refresh(version) > version_) {
74 needRefresh = true;
75 break;
76 }
77 } catch (...) {
78 LOG(ERROR) << "Exception while checking dependencies for updates: "
79 << exceptionStr(std::current_exception());
80
81 needRefresh = true;
82 break;
83 }
84 }
85 });
86
87 if (!needRefresh) {
88 version_ = version;
89 return versionLastChange_;
90 }
91
92 try {
93 VersionedData newData{creator_(), version};
94 if (!newData.data) {
95 throw std::logic_error("Observer creator returned nullptr.");
96 }
97 if (data_.copy().data != newData.data) {
98 data_.swap(newData);
99 versionLastChange_ = version;
100 }
101 } catch (...) {
102 LOG(ERROR) << "Exception while refreshing Observer: "
103 << exceptionStr(std::current_exception());
104
105 if (version_ == 0) {
106 // Re-throw exception if this is the first time we run creator
107 throw;
108 }
109 }
110
111 version_ = version;
112
113 if (versionLastChange_ != version) {
114 return versionLastChange_;
115 }
116
117 auto newDependencies = dependencyRecorder.release();
118 dependencies_.withWLock([&](Dependencies& dependencies) {
119 for (const auto& dependency : newDependencies) {
120 if (!dependencies.count(dependency)) {
121 dependency->addDependent(this->shared_from_this());
122 }
123 }
124
125 for (const auto& dependency : dependencies) {
126 if (!newDependencies.count(dependency)) {
127 dependency->removeStaleDependents();
128 }
129 }
130
131 dependencies = std::move(newDependencies);
132 });
133 }
134
135 auto dependents = dependents_.copy();
136
137 for (const auto& dependentWeak : dependents) {
138 if (auto dependent = dependentWeak.lock()) {
139 ObserverManager::scheduleRefresh(std::move(dependent), version);
140 }
141 }
142
143 return versionLastChange_;
144 }
145
setForceRefresh()146 void Core::setForceRefresh() {
147 forceRefresh_ = true;
148 }
149
Core(folly::Function<std::shared_ptr<const void> ()> creator)150 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
151 : creator_(std::move(creator)) {}
152
~Core()153 Core::~Core() {
154 dependencies_.withWLock([](const Dependencies& dependencies) {
155 for (const auto& dependecy : dependencies) {
156 dependecy->removeStaleDependents();
157 }
158 });
159 }
160
create(folly::Function<std::shared_ptr<const void> ()> creator)161 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
162 auto core = Core::Ptr(new Core(std::move(creator)));
163 return core;
164 }
165
addDependent(Core::WeakPtr dependent)166 void Core::addDependent(Core::WeakPtr dependent) {
167 dependents_.withWLock([&](Dependents& dependents) {
168 dependents.push_back(std::move(dependent));
169 });
170 }
171
removeStaleDependents()172 void Core::removeStaleDependents() {
173 // This is inefficient, the assumption is that we won't have many dependents
174 dependents_.withWLock([](Dependents& deps) {
175 auto const pred = [](auto const& d) { return d.expired(); };
176 deps.erase(std::remove_if(deps.begin(), deps.end(), pred), deps.end());
177 });
178 }
179 } // namespace observer_detail
180 } // namespace folly
181