1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 namespace folly {
20 namespace observer {
21 
22 namespace detail {
23 
24 template <typename Observable, typename Traits>
25 class ObserverCreatorContext {
26   using T = typename Traits::element_type;
27 
28  public:
29   template <typename... Args>
ObserverCreatorContext(Args &&...args)30   ObserverCreatorContext(Args&&... args)
31       : observable_(std::forward<Args>(args)...) {
32     state_.unsafeGetUnlocked().updateValue(Traits::get(observable_));
33   }
34 
~ObserverCreatorContext()35   ~ObserverCreatorContext() {
36     if (state_.unsafeGetUnlocked().value) {
37       Traits::unsubscribe(observable_);
38     }
39   }
40 
setCore(observer_detail::Core::WeakPtr coreWeak)41   void setCore(observer_detail::Core::WeakPtr coreWeak) {
42     coreWeak_ = std::move(coreWeak);
43   }
44 
get()45   std::shared_ptr<const T> get() {
46     auto state = state_.lock();
47     state->updateRequested = false;
48     return state->value;
49   }
50 
update()51   observer_detail::Core::Ptr update() noexcept {
52     try {
53       // This mutex ensures there's no race condition between initial update()
54       // call and update() calls from the subsciption callback.
55       //
56       // Additionally it helps avoid races between two different subscription
57       // callbacks (getting new value from observable and storing it into value_
58       // is not atomic).
59       //
60       // Note that state_ lock is acquired only after Traits::get. Traits::get
61       // is running application code (that may acquire locks) and so it's
62       // important to not hold state_ lock while running it to avoid possible
63       // lock inversion with another code path that needs state_ lock (e.g.
64       // get()).
65       std::lock_guard<std::mutex> updateLockGuard(updateLock_);
66       auto newValue = Traits::get(observable_);
67 
68       auto state = state_.lock();
69       if (!state->updateValue(std::move(newValue))) {
70         // Value didn't change, so we can skip the version update.
71         return nullptr;
72       }
73 
74       if (!std::exchange(state->updateRequested, true)) {
75         return coreWeak_.lock();
76       }
77     } catch (...) {
78       LOG(ERROR) << "Observer update failed: "
79                  << folly::exceptionStr(std::current_exception());
80     }
81 
82     return nullptr;
83   }
84 
85   template <typename F>
subscribe(F && callback)86   void subscribe(F&& callback) {
87     Traits::subscribe(observable_, std::forward<F>(callback));
88   }
89 
90  private:
91   std::mutex updateLock_;
92   struct State {
updateValueState93     bool updateValue(std::shared_ptr<const T> newValue) {
94       auto newValuePtr = newValue.get();
95       if (!newValue) {
96         throw std::logic_error("Observable returned nullptr.");
97       }
98       value.swap(newValue);
99       return newValuePtr != newValue.get();
100     }
101 
102     std::shared_ptr<const T> value;
103     bool updateRequested{false};
104   };
105   folly::Synchronized<State, std::mutex> state_;
106 
107   observer_detail::Core::WeakPtr coreWeak_;
108 
109   Observable observable_;
110 };
111 
112 } // namespace detail
113 
114 // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
115 // the same Context object, but using a separate reference count. Primary
116 // shared_ptr destructor then blocks until all shared_ptrs obtained from
117 // derived weak_ptrs are released.
118 template <typename Observable, typename Traits>
119 class ObserverCreator<Observable, Traits>::ContextPrimaryPtr {
120  public:
ContextPrimaryPtr(std::shared_ptr<Context> context)121   explicit ContextPrimaryPtr(std::shared_ptr<Context> context)
122       : contextPrimary_(std::move(context)),
123         context_(
124             contextPrimary_.get(), [destroyBaton = destroyBaton_](Context*) {
125               destroyBaton->post();
126             }) {}
~ContextPrimaryPtr()127   ~ContextPrimaryPtr() {
128     if (context_) {
129       context_.reset();
130       destroyBaton_->wait();
131     }
132   }
133   ContextPrimaryPtr(const ContextPrimaryPtr&) = delete;
134   ContextPrimaryPtr(ContextPrimaryPtr&&) = default;
135   ContextPrimaryPtr& operator=(const ContextPrimaryPtr&) = delete;
136   ContextPrimaryPtr& operator=(ContextPrimaryPtr&&) = default;
137 
138   Context* operator->() const { return contextPrimary_.get(); }
139 
get_weak()140   std::weak_ptr<Context> get_weak() { return context_; }
141 
142  private:
143   std::shared_ptr<folly::Baton<>> destroyBaton_{
144       std::make_shared<folly::Baton<>>()};
145   std::shared_ptr<Context> contextPrimary_;
146   std::shared_ptr<Context> context_;
147 };
148 
149 template <typename Observable, typename Traits>
150 template <typename... Args>
ObserverCreator(Args &&...args)151 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
152     : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
153 
154 template <typename Observable, typename Traits>
155 Observer<typename ObserverCreator<Observable, Traits>::T>
getObserver()156 ObserverCreator<Observable, Traits>::getObserver() && {
157   // We want to make sure that Context can only be destroyed when Core is
158   // destroyed. So we have to avoid the situation when subscribe callback is
159   // locking Context shared_ptr and remains the last to release it.
160   // We solve this by having Core hold the master shared_ptr and subscription
161   // callback gets derived weak_ptr.
162   ContextPrimaryPtr contextPrimary(context_);
163   auto contextWeak = contextPrimary.get_weak();
164   auto observer = makeObserver(
165       [context = std::move(contextPrimary)]() { return context->get(); });
166 
167   context_->setCore(observer.core_);
168 
169   auto scheduleUpdate = [contextWeak = std::move(contextWeak)] {
170     observer_detail::ObserverManager::scheduleRefreshNewVersion(
171         [contextWeak]() -> observer_detail::Core::Ptr {
172           if (auto context = contextWeak.lock()) {
173             return context->update();
174           }
175           return nullptr;
176         });
177   };
178 
179   context_->subscribe(scheduleUpdate);
180 
181   // Do an extra update in case observable was updated between observer creation
182   // and setting updates callback.
183   scheduleUpdate();
184 
185   return observer;
186 }
187 } // namespace observer
188 } // namespace folly
189