1 /* Copyright 2016-present Facebook, Inc. 2 * Licensed under the Apache License, Version 2.0 */ 3 #pragma once 4 #include "watchman_system.h" 5 #include "watchman_synchronized.h" 6 #include "watchman_string.h" 7 #include "thirdparty/jansson/jansson.h" 8 9 #include <deque> 10 #include <functional> 11 #include <vector> 12 13 namespace watchman { 14 15 class Publisher : public std::enable_shared_from_this<Publisher> { 16 public: 17 struct Item { 18 // copy of nextSerial_ at the time this was created. 19 // The item can be released when all subscribers have 20 // observed this serial number. 21 uint64_t serial; 22 json_ref payload; 23 }; 24 25 // Generic callback that subscribers can register to arrange 26 // to be woken up when something is published 27 using Notifier = std::function<void()>; 28 29 // Each subscriber is represented by one of these 30 class Subscriber : public std::enable_shared_from_this<Subscriber> { 31 // The serial of the last Item to be consumed by 32 // this subscriber. 33 uint64_t serial_; 34 // Subscriber keeps the publisher alive so that no Items are lost 35 // if the Publisher is released before all of the subscribers. 36 std::shared_ptr<Publisher> publisher_; 37 // Advising the subscriber that there may be more items available 38 Notifier notify_; 39 // Information for debugging purposes 40 const w_string info_; 41 42 public: 43 ~Subscriber(); 44 Subscriber( 45 std::shared_ptr<Publisher> pub, 46 Notifier notify, 47 const w_string& info); 48 Subscriber(const Subscriber&) = delete; 49 50 // Returns all as yet unseen published items for this subscriber. 51 void getPending(std::vector<std::shared_ptr<const Item>>& pending); 52 getSerial()53 inline uint64_t getSerial() const { 54 return serial_; 55 } 56 getNotify()57 inline Notifier& getNotify() { 58 return notify_; 59 } 60 getInfo()61 inline const w_string& getInfo() { 62 return info_; 63 } 64 }; 65 66 // Register a new subscriber. 67 // When the Subscriber object is released, the registration is 68 // automatically removed. 69 std::shared_ptr<Subscriber> subscribe( 70 Notifier notify, 71 const w_string& info = nullptr); 72 73 // Returns true if there are any subscribers. 74 // This is racy and intended to be used to gate building a payload 75 // if there are no current subscribers. 76 bool hasSubscribers() const; 77 78 // Enqueue a new item, but only if there are subscribers. 79 // Returns true if the item was queued. 80 bool enqueue(json_ref&& payload); 81 82 // Return debugging info useful for state inspection. 83 json_ref getDebugInfo() const; 84 85 private: 86 struct state { 87 state() = default; 88 state(const state&) = delete; 89 // Serial number to use for the next Item 90 uint64_t nextSerial{1}; 91 // The stream of Items 92 std::deque<std::shared_ptr<const Item>> items; 93 // The subscribers 94 std::vector<std::weak_ptr<Subscriber>> subscribers; 95 96 void collectGarbage(); 97 void enqueue(json_ref&& payload); 98 }; 99 Synchronized<state> state_; 100 101 friend class Subscriber; 102 }; 103 104 // Equivalent to calling getPending on up to two Subscriber and 105 // joining the resultant vectors together. 106 void getPending( 107 std::vector<std::shared_ptr<const Publisher::Item>>& pending, 108 const std::shared_ptr<Publisher::Subscriber>& sub1, 109 const std::shared_ptr<Publisher::Subscriber>& sub2); 110 } 111