1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 namespace folly {
20 namespace observer {
21
22 namespace detail {
23
24 template <typename Observable, typename Traits>
25 class ObserverCreatorContext {
26 using T = typename Traits::element_type;
27
28 public:
29 template <typename... Args>
ObserverCreatorContext(Args &&...args)30 ObserverCreatorContext(Args&&... args)
31 : observable_(std::forward<Args>(args)...) {
32 state_.unsafeGetUnlocked().updateValue(Traits::get(observable_));
33 }
34
~ObserverCreatorContext()35 ~ObserverCreatorContext() {
36 if (state_.unsafeGetUnlocked().value) {
37 Traits::unsubscribe(observable_);
38 }
39 }
40
setCore(observer_detail::Core::WeakPtr coreWeak)41 void setCore(observer_detail::Core::WeakPtr coreWeak) {
42 coreWeak_ = std::move(coreWeak);
43 }
44
get()45 std::shared_ptr<const T> get() {
46 auto state = state_.lock();
47 state->updateRequested = false;
48 return state->value;
49 }
50
update()51 observer_detail::Core::Ptr update() noexcept {
52 try {
53 // This mutex ensures there's no race condition between initial update()
54 // call and update() calls from the subsciption callback.
55 //
56 // Additionally it helps avoid races between two different subscription
57 // callbacks (getting new value from observable and storing it into value_
58 // is not atomic).
59 //
60 // Note that state_ lock is acquired only after Traits::get. Traits::get
61 // is running application code (that may acquire locks) and so it's
62 // important to not hold state_ lock while running it to avoid possible
63 // lock inversion with another code path that needs state_ lock (e.g.
64 // get()).
65 std::lock_guard<std::mutex> updateLockGuard(updateLock_);
66 auto newValue = Traits::get(observable_);
67
68 auto state = state_.lock();
69 if (!state->updateValue(std::move(newValue))) {
70 // Value didn't change, so we can skip the version update.
71 return nullptr;
72 }
73
74 if (!std::exchange(state->updateRequested, true)) {
75 return coreWeak_.lock();
76 }
77 } catch (...) {
78 LOG(ERROR) << "Observer update failed: "
79 << folly::exceptionStr(std::current_exception());
80 }
81
82 return nullptr;
83 }
84
85 template <typename F>
subscribe(F && callback)86 void subscribe(F&& callback) {
87 Traits::subscribe(observable_, std::forward<F>(callback));
88 }
89
90 private:
91 std::mutex updateLock_;
92 struct State {
updateValueState93 bool updateValue(std::shared_ptr<const T> newValue) {
94 auto newValuePtr = newValue.get();
95 if (!newValue) {
96 throw std::logic_error("Observable returned nullptr.");
97 }
98 value.swap(newValue);
99 return newValuePtr != newValue.get();
100 }
101
102 std::shared_ptr<const T> value;
103 bool updateRequested{false};
104 };
105 folly::Synchronized<State, std::mutex> state_;
106
107 observer_detail::Core::WeakPtr coreWeak_;
108
109 Observable observable_;
110 };
111
112 } // namespace detail
113
114 // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
115 // the same Context object, but using a separate reference count. Primary
116 // shared_ptr destructor then blocks until all shared_ptrs obtained from
117 // derived weak_ptrs are released.
118 template <typename Observable, typename Traits>
119 class ObserverCreator<Observable, Traits>::ContextPrimaryPtr {
120 public:
ContextPrimaryPtr(std::shared_ptr<Context> context)121 explicit ContextPrimaryPtr(std::shared_ptr<Context> context)
122 : contextPrimary_(std::move(context)),
123 context_(
124 contextPrimary_.get(), [destroyBaton = destroyBaton_](Context*) {
125 destroyBaton->post();
126 }) {}
~ContextPrimaryPtr()127 ~ContextPrimaryPtr() {
128 if (context_) {
129 context_.reset();
130 destroyBaton_->wait();
131 }
132 }
133 ContextPrimaryPtr(const ContextPrimaryPtr&) = delete;
134 ContextPrimaryPtr(ContextPrimaryPtr&&) = default;
135 ContextPrimaryPtr& operator=(const ContextPrimaryPtr&) = delete;
136 ContextPrimaryPtr& operator=(ContextPrimaryPtr&&) = default;
137
138 Context* operator->() const { return contextPrimary_.get(); }
139
get_weak()140 std::weak_ptr<Context> get_weak() { return context_; }
141
142 private:
143 std::shared_ptr<folly::Baton<>> destroyBaton_{
144 std::make_shared<folly::Baton<>>()};
145 std::shared_ptr<Context> contextPrimary_;
146 std::shared_ptr<Context> context_;
147 };
148
149 template <typename Observable, typename Traits>
150 template <typename... Args>
ObserverCreator(Args &&...args)151 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
152 : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
153
154 template <typename Observable, typename Traits>
155 Observer<typename ObserverCreator<Observable, Traits>::T>
getObserver()156 ObserverCreator<Observable, Traits>::getObserver() && {
157 // We want to make sure that Context can only be destroyed when Core is
158 // destroyed. So we have to avoid the situation when subscribe callback is
159 // locking Context shared_ptr and remains the last to release it.
160 // We solve this by having Core hold the master shared_ptr and subscription
161 // callback gets derived weak_ptr.
162 ContextPrimaryPtr contextPrimary(context_);
163 auto contextWeak = contextPrimary.get_weak();
164 auto observer = makeObserver(
165 [context = std::move(contextPrimary)]() { return context->get(); });
166
167 context_->setCore(observer.core_);
168
169 auto scheduleUpdate = [contextWeak = std::move(contextWeak)] {
170 observer_detail::ObserverManager::scheduleRefreshNewVersion(
171 [contextWeak]() -> observer_detail::Core::Ptr {
172 if (auto context = contextWeak.lock()) {
173 return context->update();
174 }
175 return nullptr;
176 });
177 };
178
179 context_->subscribe(scheduleUpdate);
180
181 // Do an extra update in case observable was updated between observer creation
182 // and setting updates callback.
183 scheduleUpdate();
184
185 return observer;
186 }
187 } // namespace observer
188 } // namespace folly
189