1 /* Copyright 2016-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 #pragma once
4 #include "watchman_system.h"
5 #include "watchman_synchronized.h"
6 #include "watchman_string.h"
7 #include "thirdparty/jansson/jansson.h"
8 
9 #include <deque>
10 #include <functional>
11 #include <vector>
12 
13 namespace watchman {
14 
15 class Publisher : public std::enable_shared_from_this<Publisher> {
16  public:
17   struct Item {
18     // copy of nextSerial_ at the time this was created.
19     // The item can be released when all subscribers have
20     // observed this serial number.
21     uint64_t serial;
22     json_ref payload;
23   };
24 
25   // Generic callback that subscribers can register to arrange
26   // to be woken up when something is published
27   using Notifier = std::function<void()>;
28 
29   // Each subscriber is represented by one of these
30   class Subscriber : public std::enable_shared_from_this<Subscriber> {
31     // The serial of the last Item to be consumed by
32     // this subscriber.
33     uint64_t serial_;
34     // Subscriber keeps the publisher alive so that no Items are lost
35     // if the Publisher is released before all of the subscribers.
36     std::shared_ptr<Publisher> publisher_;
37     // Advising the subscriber that there may be more items available
38     Notifier notify_;
39     // Information for debugging purposes
40     const w_string info_;
41 
42    public:
43     ~Subscriber();
44     Subscriber(
45         std::shared_ptr<Publisher> pub,
46         Notifier notify,
47         const w_string& info);
48     Subscriber(const Subscriber&) = delete;
49 
50     // Returns all as yet unseen published items for this subscriber.
51     void getPending(std::vector<std::shared_ptr<const Item>>& pending);
52 
getSerial()53     inline uint64_t getSerial() const {
54       return serial_;
55     }
56 
getNotify()57     inline Notifier& getNotify() {
58       return notify_;
59     }
60 
getInfo()61     inline const w_string& getInfo() {
62       return info_;
63     }
64   };
65 
66   // Register a new subscriber.
67   // When the Subscriber object is released, the registration is
68   // automatically removed.
69   std::shared_ptr<Subscriber> subscribe(
70       Notifier notify,
71       const w_string& info = nullptr);
72 
73   // Returns true if there are any subscribers.
74   // This is racy and intended to be used to gate building a payload
75   // if there are no current subscribers.
76   bool hasSubscribers() const;
77 
78   // Enqueue a new item, but only if there are subscribers.
79   // Returns true if the item was queued.
80   bool enqueue(json_ref&& payload);
81 
82   // Return debugging info useful for state inspection.
83   json_ref getDebugInfo() const;
84 
85  private:
86   struct state {
87     state() = default;
88     state(const state&) = delete;
89     // Serial number to use for the next Item
90     uint64_t nextSerial{1};
91     // The stream of Items
92     std::deque<std::shared_ptr<const Item>> items;
93     // The subscribers
94     std::vector<std::weak_ptr<Subscriber>> subscribers;
95 
96     void collectGarbage();
97     void enqueue(json_ref&& payload);
98   };
99   Synchronized<state> state_;
100 
101   friend class Subscriber;
102 };
103 
104 // Equivalent to calling getPending on up to two Subscriber and
105 // joining the resultant vectors together.
106 void getPending(
107     std::vector<std::shared_ptr<const Publisher::Item>>& pending,
108     const std::shared_ptr<Publisher::Subscriber>& sub1,
109     const std::shared_ptr<Publisher::Subscriber>& sub2);
110 }
111