1 /* Copyright 2016-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 #include "PubSub.h"
4 #include <algorithm>
5 #include <iterator>
6 
7 namespace watchman {
8 
Subscriber(std::shared_ptr<Publisher> pub,Notifier notify,const w_string & info)9 Publisher::Subscriber::Subscriber(
10     std::shared_ptr<Publisher> pub,
11     Notifier notify,
12     const w_string& info)
13     : serial_(0),
14       publisher_(std::move(pub)),
15       notify_(notify),
16       info_(std::move(info)) {}
17 
~Subscriber()18 Publisher::Subscriber::~Subscriber() {
19   auto wlock = publisher_->state_.wlock();
20   auto it = wlock->subscribers.begin();
21   while (it != wlock->subscribers.end()) {
22     auto sub = it->lock();
23     // Prune vacated weak_ptr's or those that point to us
24     if (!sub || sub.get() == this) {
25       it = wlock->subscribers.erase(it);
26     } else {
27       ++it;
28     }
29   }
30   // Take this opportunity to reap anything that is no longer
31   // referenced now that we've removed some subscriber(s)
32   wlock->collectGarbage();
33 }
34 
getPending(std::vector<std::shared_ptr<const Item>> & pending)35 void Publisher::Subscriber::getPending(
36     std::vector<std::shared_ptr<const Item>>& pending) {
37   {
38     auto rlock = publisher_->state_.rlock();
39     auto& items = rlock->items;
40 
41     if (items.empty()) {
42       return;
43     }
44 
45     // First we walk back to find the end of the range that
46     // we have seen previously.
47     int firstIndex;
48     for (firstIndex = int(items.size()) - 1; firstIndex >= 0; --firstIndex) {
49       if (items[firstIndex]->serial <= serial_) {
50         break;
51       }
52     }
53 
54     // We found the item before the one we really want, so
55     // increment the index; we'll copy the remaining items.
56     ++firstIndex;
57     bool updated = false;
58 
59     while (firstIndex < int(items.size())) {
60       pending.push_back(items[firstIndex]);
61       ++firstIndex;
62       updated = true;
63     }
64 
65     if (updated) {
66       serial_ = pending.back()->serial;
67     }
68 
69     return;
70   }
71 }
72 
getPending(std::vector<std::shared_ptr<const Publisher::Item>> & items,const std::shared_ptr<Publisher::Subscriber> & sub1,const std::shared_ptr<Publisher::Subscriber> & sub2)73 void getPending(
74     std::vector<std::shared_ptr<const Publisher::Item>>& items,
75     const std::shared_ptr<Publisher::Subscriber>& sub1,
76     const std::shared_ptr<Publisher::Subscriber>& sub2) {
77   if (sub1) {
78     sub1->getPending(items);
79   }
80   if (sub2) {
81     sub2->getPending(items);
82   }
83 }
84 
subscribe(Notifier notify,const w_string & info)85 std::shared_ptr<Publisher::Subscriber> Publisher::subscribe(
86     Notifier notify,
87     const w_string& info) {
88   auto sub =
89       std::make_shared<Publisher::Subscriber>(shared_from_this(), notify, info);
90   state_.wlock()->subscribers.emplace_back(sub);
91   return sub;
92 }
93 
hasSubscribers() const94 bool Publisher::hasSubscribers() const {
95   return !state_.rlock()->subscribers.empty();
96 }
97 
collectGarbage()98 void Publisher::state::collectGarbage() {
99   if (items.empty()) {
100     return;
101   }
102 
103   uint64_t minSerial = std::numeric_limits<uint64_t>::max();
104   for (auto& it : subscribers) {
105     auto sub = it.lock();
106     if (sub) {
107       minSerial = std::min(minSerial, sub->getSerial());
108     }
109   }
110 
111   while (!items.empty() && items.front()->serial < minSerial) {
112     items.pop_front();
113   }
114 }
115 
enqueue(json_ref && payload)116 bool Publisher::enqueue(json_ref&& payload) {
117   std::vector<std::shared_ptr<Subscriber>> subscribers;
118 
119   {
120     auto wlock = state_.wlock();
121 
122     // We need to collect live references for the notify portion,
123     // but since we're holding the wlock, take this opportunity to
124     // detect and prune dead subscribers and clean up some garbage.
125     auto it = wlock->subscribers.begin();
126     while (it != wlock->subscribers.end()) {
127       auto sub = it->lock();
128       // Prune vacated weak_ptr's
129       if (!sub) {
130         it = wlock->subscribers.erase(it);
131       } else {
132         ++it;
133         // Remember that live reference so that we can notify
134         // outside of the lock below.
135         subscribers.emplace_back(std::move(sub));
136       }
137     }
138 
139     wlock->collectGarbage();
140 
141     if (subscribers.empty()) {
142       return false;
143     }
144 
145     auto item = std::make_shared<Item>();
146     item->payload = std::move(payload);
147     item->serial = wlock->nextSerial++;
148     wlock->items.emplace_back(std::move(item));
149   }
150 
151   // and notify them outside of the lock
152   for (auto& sub : subscribers) {
153     auto& n = sub->getNotify();
154     if (n) {
155       n();
156     }
157   }
158   return true;
159 }
160 
getDebugInfo() const161 json_ref Publisher::getDebugInfo() const {
162   auto ret = json_object();
163 
164   auto rlock = state_.rlock();
165   ret.set("next_serial", json_integer(rlock->nextSerial));
166 
167   auto subscribers = json_array();
168   auto& subscribers_arr = subscribers.array();
169 
170   for (auto& sub_ref : rlock->subscribers) {
171     auto sub = sub_ref.lock();
172     if (sub) {
173       auto sub_json = json_object({{"serial", json_integer(sub->getSerial())},
174                                    {"info", w_string_to_json(sub->getInfo())}});
175       subscribers_arr.emplace_back(sub_json);
176     } else {
177       // This is a subscriber that is now dead. It will be cleaned up the next
178       // time enqueue is called.
179     }
180   }
181 
182   ret.set("subscribers", std::move(subscribers));
183 
184   auto items = json_array();
185   auto& items_arr = items.array();
186 
187   for (auto& item : rlock->items) {
188     auto item_json = json_object(
189         {{"serial", json_integer(item->serial)}, {"payload", item->payload}});
190     items_arr.emplace_back(item_json);
191   }
192 
193   ret.set("items", std::move(items));
194 
195   return ret;
196 }
197 }
198