"""Provide helper classes used by other models."""
import random
import time
from collections import deque


class BoundedSet(object):
    """A set with a maximum size that evicts the oldest items when necessary.

    Items are evicted in insertion (FIFO) order. Adding an item that is
    already present is a no-op and does not change its position in the
    eviction order.

    This class does not implement the complete set interface.
    """

    def __init__(self, max_items):
        """Construct an instance of the BoundedSet.

        :param max_items: The number of items the set may hold before the
            oldest item is discarded to make room for a new one.
        """
        self.max_items = max_items
        # Insertion order of the items currently held in ``_set``; the left
        # end is the oldest item.
        self._fifo = deque()
        self._set = set()

    def __contains__(self, item):
        """Test if the BoundedSet contains item."""
        return item in self._set

    def add(self, item):
        """Add an item to the set discarding the oldest item if necessary.

        :param item: The hashable item to add.
        """
        # Each member must appear in the FIFO exactly once. Queuing a
        # duplicate would later evict the same item twice, and the second
        # ``set.remove`` would raise ``KeyError``.
        if item in self._set:
            return
        if len(self._set) == self.max_items:
            self._set.remove(self._fifo.popleft())
        self._fifo.append(item)
        self._set.add(item)


class ExponentialCounter(object):
    """A class to provide an exponential counter with jitter."""

    def __init__(self, max_counter):
        """Initialize an instance of ExponentialCounter.

        :param max_counter: The maximum base value. Note that the computed
            value may be 3.125% higher due to jitter.
        """
        self._base = 1
        self._max = max_counter

    def counter(self):
        """Increment the counter and return the current value with jitter.

        :returns: The current base value adjusted by a random offset of at
            most 1/32 of the base in either direction. The base is then
            doubled, capped at the maximum given at construction.
        """
        max_jitter = self._base / 16.0
        # Center the jitter on the base value: offset is in
        # [-max_jitter / 2, +max_jitter / 2).
        value = self._base + random.random() * max_jitter - max_jitter / 2
        self._base = min(self._base * 2, self._max)
        return value

    def reset(self):
        """Reset the counter to 1."""
        self._base = 1


def permissions_string(permissions, known_permissions):
    """Return a comma separated string of permission changes.

    :param permissions: A list of strings, or ``None``. When ``None``, the
        result is ``+all``. Otherwise every provided permission is emitted as
        an addition (``+name``), and every permission in the
        ``known_permissions`` set that isn't provided is emitted as a removal
        (``-name``), following a leading ``-all``. Entries are used verbatim:
        no special handling of ``+`` or ``-`` prefixed entries is performed.
    :param known_permissions: A set of strings representing the available
        permissions.

    :returns: The comma-joined string, with removals sorted alphabetically
        and additions in the order given.

    """
    if permissions is None:
        return "+all"
    omitted = sorted(known_permissions - set(permissions))
    to_set = ["-all"]
    to_set.extend("-{}".format(x) for x in omitted)
    to_set.extend("+{}".format(x) for x in permissions)
    return ",".join(to_set)


def stream_generator(
    function,
    pause_after=None,
    skip_existing=False,
    attribute_name="fullname",
    **function_kwargs
):
    """Yield new items from ListingGenerators and ``None`` when paused.

    :param function: A callable that returns a ListingGenerator, e.g.
       ``subreddit.comments`` or ``subreddit.new``.

    :param pause_after: An integer representing the number of requests that
        result in no new items before this function yields ``None``,
        effectively introducing a pause into the stream. A negative value
        yields ``None`` after items from a single response have been yielded,
        regardless of number of new items obtained in that response. A value of
        ``0`` yields ``None`` after every response resulting in no new items,
        and a value of ``None`` never introduces a pause (default: None).

    :param skip_existing: When True does not yield any results from the first
        request thereby skipping any items that existed in the stream prior to
        starting the stream (default: False).

    :param attribute_name: The field to use as an id (default: "fullname").

    Additional keyword arguments will be passed to ``function``.

    .. note:: This function internally uses an exponential delay with jitter
       between subsequent responses that contain no new results, up to a
       maximum delay of just over 16 seconds. In practice that means that the
       time before pause for ``pause_after=N+1`` is approximately twice the
       time before pause for ``pause_after=N``.

    For example, to create a stream of comment replies, try:

    .. code:: python

       reply_function = reddit.inbox.comment_replies
       for reply in praw.models.util.stream_generator(reply_function):
           print(reply)

    To pause a comment stream after six responses with no new
    comments, try:

    .. code:: python

       subreddit = reddit.subreddit('redditdev')
       for comment in subreddit.stream.comments(pause_after=6):
           if comment is None:
               break
           print(comment)

    To resume fetching comments after a pause, try:

    .. code:: python

       subreddit = reddit.subreddit('help')
       comment_stream = subreddit.stream.comments(pause_after=5)

       for comment in comment_stream:
           if comment is None:
               break
           print(comment)
       # Do any other processing, then try to fetch more data
       for comment in comment_stream:
           if comment is None:
               break
           print(comment)

    To bypass the internal exponential backoff, try the following. This
    approach is useful if you are monitoring a subreddit with infrequent
    activity, and you want to consistently learn about new items from the
    stream as soon as possible, rather than up to a delay of just over sixteen
    seconds.

    .. code:: python

       subreddit = reddit.subreddit('help')
       for comment in subreddit.stream.comments(pause_after=0):
           if comment is None:
               continue
           print(comment)

    """
    before_attribute = None
    exponential_counter = ExponentialCounter(max_counter=16)
    seen_attributes = BoundedSet(301)
    without_before_counter = 0
    responses_without_new = 0
    valid_pause_after = pause_after is not None
    while True:
        found = False
        newest_attribute = None
        limit = 100
        if before_attribute is None:
            # Without a ``before`` marker, shrink the limit by a counter that
            # cycles through 0..29 so consecutive requests differ.
            # NOTE(review): presumably this avoids receiving a cached
            # response for an identical request -- confirm.
            limit -= without_before_counter
            without_before_counter = (without_before_counter + 1) % 30
        # The response is reversed before iterating, so the last unseen item
        # processed is the one recorded as ``newest_attribute``.
        for item in reversed(
            list(
                function(
                    limit=limit,
                    params={"before": before_attribute},
                    **function_kwargs
                )
            )
        ):
            attribute = getattr(item, attribute_name)
            if attribute in seen_attributes:
                continue
            found = True
            seen_attributes.add(attribute)
            newest_attribute = attribute
            if not skip_existing:
                yield item
        # ``None`` when nothing new was seen, which makes the next request
        # start again without a ``before`` marker.
        before_attribute = newest_attribute
        # Only the first response is ever skipped.
        skip_existing = False
        if valid_pause_after and pause_after < 0:
            yield None
        elif found:
            exponential_counter.reset()
            responses_without_new = 0
        else:
            responses_without_new += 1
            if valid_pause_after and responses_without_new > pause_after:
                exponential_counter.reset()
                responses_without_new = 0
                yield None
            else:
                time.sleep(exponential_counter.counter())