1 /* Copyright 2016-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3 #include "PubSub.h"
4 #include <algorithm>
5 #include <iterator>
6
7 namespace watchman {
8
Subscriber(std::shared_ptr<Publisher> pub,Notifier notify,const w_string & info)9 Publisher::Subscriber::Subscriber(
10 std::shared_ptr<Publisher> pub,
11 Notifier notify,
12 const w_string& info)
13 : serial_(0),
14 publisher_(std::move(pub)),
15 notify_(notify),
16 info_(std::move(info)) {}
17
~Subscriber()18 Publisher::Subscriber::~Subscriber() {
19 auto wlock = publisher_->state_.wlock();
20 auto it = wlock->subscribers.begin();
21 while (it != wlock->subscribers.end()) {
22 auto sub = it->lock();
23 // Prune vacated weak_ptr's or those that point to us
24 if (!sub || sub.get() == this) {
25 it = wlock->subscribers.erase(it);
26 } else {
27 ++it;
28 }
29 }
30 // Take this opportunity to reap anything that is no longer
31 // referenced now that we've removed some subscriber(s)
32 wlock->collectGarbage();
33 }
34
getPending(std::vector<std::shared_ptr<const Item>> & pending)35 void Publisher::Subscriber::getPending(
36 std::vector<std::shared_ptr<const Item>>& pending) {
37 {
38 auto rlock = publisher_->state_.rlock();
39 auto& items = rlock->items;
40
41 if (items.empty()) {
42 return;
43 }
44
45 // First we walk back to find the end of the range that
46 // we have seen previously.
47 int firstIndex;
48 for (firstIndex = int(items.size()) - 1; firstIndex >= 0; --firstIndex) {
49 if (items[firstIndex]->serial <= serial_) {
50 break;
51 }
52 }
53
54 // We found the item before the one we really want, so
55 // increment the index; we'll copy the remaining items.
56 ++firstIndex;
57 bool updated = false;
58
59 while (firstIndex < int(items.size())) {
60 pending.push_back(items[firstIndex]);
61 ++firstIndex;
62 updated = true;
63 }
64
65 if (updated) {
66 serial_ = pending.back()->serial;
67 }
68
69 return;
70 }
71 }
72
getPending(std::vector<std::shared_ptr<const Publisher::Item>> & items,const std::shared_ptr<Publisher::Subscriber> & sub1,const std::shared_ptr<Publisher::Subscriber> & sub2)73 void getPending(
74 std::vector<std::shared_ptr<const Publisher::Item>>& items,
75 const std::shared_ptr<Publisher::Subscriber>& sub1,
76 const std::shared_ptr<Publisher::Subscriber>& sub2) {
77 if (sub1) {
78 sub1->getPending(items);
79 }
80 if (sub2) {
81 sub2->getPending(items);
82 }
83 }
84
subscribe(Notifier notify,const w_string & info)85 std::shared_ptr<Publisher::Subscriber> Publisher::subscribe(
86 Notifier notify,
87 const w_string& info) {
88 auto sub =
89 std::make_shared<Publisher::Subscriber>(shared_from_this(), notify, info);
90 state_.wlock()->subscribers.emplace_back(sub);
91 return sub;
92 }
93
hasSubscribers() const94 bool Publisher::hasSubscribers() const {
95 return !state_.rlock()->subscribers.empty();
96 }
97
collectGarbage()98 void Publisher::state::collectGarbage() {
99 if (items.empty()) {
100 return;
101 }
102
103 uint64_t minSerial = std::numeric_limits<uint64_t>::max();
104 for (auto& it : subscribers) {
105 auto sub = it.lock();
106 if (sub) {
107 minSerial = std::min(minSerial, sub->getSerial());
108 }
109 }
110
111 while (!items.empty() && items.front()->serial < minSerial) {
112 items.pop_front();
113 }
114 }
115
enqueue(json_ref && payload)116 bool Publisher::enqueue(json_ref&& payload) {
117 std::vector<std::shared_ptr<Subscriber>> subscribers;
118
119 {
120 auto wlock = state_.wlock();
121
122 // We need to collect live references for the notify portion,
123 // but since we're holding the wlock, take this opportunity to
124 // detect and prune dead subscribers and clean up some garbage.
125 auto it = wlock->subscribers.begin();
126 while (it != wlock->subscribers.end()) {
127 auto sub = it->lock();
128 // Prune vacated weak_ptr's
129 if (!sub) {
130 it = wlock->subscribers.erase(it);
131 } else {
132 ++it;
133 // Remember that live reference so that we can notify
134 // outside of the lock below.
135 subscribers.emplace_back(std::move(sub));
136 }
137 }
138
139 wlock->collectGarbage();
140
141 if (subscribers.empty()) {
142 return false;
143 }
144
145 auto item = std::make_shared<Item>();
146 item->payload = std::move(payload);
147 item->serial = wlock->nextSerial++;
148 wlock->items.emplace_back(std::move(item));
149 }
150
151 // and notify them outside of the lock
152 for (auto& sub : subscribers) {
153 auto& n = sub->getNotify();
154 if (n) {
155 n();
156 }
157 }
158 return true;
159 }
160
getDebugInfo() const161 json_ref Publisher::getDebugInfo() const {
162 auto ret = json_object();
163
164 auto rlock = state_.rlock();
165 ret.set("next_serial", json_integer(rlock->nextSerial));
166
167 auto subscribers = json_array();
168 auto& subscribers_arr = subscribers.array();
169
170 for (auto& sub_ref : rlock->subscribers) {
171 auto sub = sub_ref.lock();
172 if (sub) {
173 auto sub_json = json_object({{"serial", json_integer(sub->getSerial())},
174 {"info", w_string_to_json(sub->getInfo())}});
175 subscribers_arr.emplace_back(sub_json);
176 } else {
177 // This is a subscriber that is now dead. It will be cleaned up the next
178 // time enqueue is called.
179 }
180 }
181
182 ret.set("subscribers", std::move(subscribers));
183
184 auto items = json_array();
185 auto& items_arr = items.array();
186
187 for (auto& item : rlock->items) {
188 auto item_json = json_object(
189 {{"serial", json_integer(item->serial)}, {"payload", item->payload}});
190 items_arr.emplace_back(item_json);
191 }
192
193 ret.set("items", std::move(items));
194
195 return ret;
196 }
197 }
198