1 /*
2 * pubsub.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbclient/NativeAPI.actor.h"
22 #include "fdbserver/pubsub.h"
23 #include "flow/actorcompiler.h" // This must be the last #include.
24
uInt64ToValue(uint64_t v)25 Value uInt64ToValue( uint64_t v ) {
26 return StringRef(format("%016llx", v));
27 }
valueToUInt64(const StringRef & v)28 uint64_t valueToUInt64( const StringRef& v ) {
29 uint64_t x = 0;
30 sscanf( v.toString().c_str(), "%llx", &x );
31 return x;
32 }
33
keyForInbox(uint64_t inbox)34 Key keyForInbox(uint64_t inbox) {
35 return StringRef(format("i/%016llx", inbox));
36 }
keyForInboxSubcription(uint64_t inbox,uint64_t feed)37 Key keyForInboxSubcription(uint64_t inbox, uint64_t feed) {
38 return StringRef(format("i/%016llx/subs/%016llx", inbox, feed));
39 }
keyForInboxSubcriptionCount(uint64_t inbox)40 Key keyForInboxSubcriptionCount(uint64_t inbox) {
41 return StringRef(format("i/%016llx/subsCnt", inbox));
42 }
keyForInboxStalePrefix(uint64_t inbox)43 Key keyForInboxStalePrefix(uint64_t inbox) {
44 return StringRef(format("i/%016llx/stale/", inbox));
45 }
keyForInboxStaleFeed(uint64_t inbox,uint64_t feed)46 Key keyForInboxStaleFeed(uint64_t inbox, uint64_t feed) {
47 return StringRef(format("i/%016llx/stale/%016llx", inbox, feed));
48 }
keyForInboxCacheByIDPrefix(uint64_t inbox)49 Key keyForInboxCacheByIDPrefix(uint64_t inbox) {
50 return StringRef(format("i/%016llx/cid/", inbox));
51 }
keyForInboxCacheByID(uint64_t inbox,uint64_t messageId)52 Key keyForInboxCacheByID(uint64_t inbox, uint64_t messageId) {
53 return StringRef(format("i/%016llx/cid/%016llx", inbox, messageId));
54 }
keyForInboxCacheByFeedPrefix(uint64_t inbox)55 Key keyForInboxCacheByFeedPrefix(uint64_t inbox) {
56 return StringRef(format("i/%016llx/cf/", inbox));
57 }
keyForInboxCacheByFeed(uint64_t inbox,uint64_t feed)58 Key keyForInboxCacheByFeed(uint64_t inbox, uint64_t feed) {
59 return StringRef(format("i/%016llx/cf/%016llx", inbox, feed));
60 }
61
keyForFeed(uint64_t feed)62 Key keyForFeed(uint64_t feed) {
63 return StringRef(format("f/%016llx", feed));
64 }
keyForFeedSubcriber(uint64_t feed,uint64_t inbox)65 Key keyForFeedSubcriber(uint64_t feed, uint64_t inbox) {
66 return StringRef(format("f/%016llx/subs/%016llx", feed, inbox));
67 }
keyForFeedSubcriberCount(uint64_t feed)68 Key keyForFeedSubcriberCount(uint64_t feed) {
69 return StringRef(format("f/%016llx/subscCnt", feed));
70 }
keyForFeedMessage(uint64_t feed,uint64_t message)71 Key keyForFeedMessage(uint64_t feed, uint64_t message) {
72 return StringRef(format("f/%016llx/m/%016llx", feed, message));
73 }
keyForFeedMessagePrefix(uint64_t feed)74 Key keyForFeedMessagePrefix(uint64_t feed) {
75 return StringRef(format("f/%016llx/m/", feed));
76 }
keyForFeedMessageCount(uint64_t feed)77 Key keyForFeedMessageCount(uint64_t feed) {
78 return StringRef(format("f/%016llx/messCount", feed));
79 }
80 // the following should go at some point: change over to range query of count 1 from feed message list
keyForFeedLatestMessage(uint64_t feed)81 Key keyForFeedLatestMessage(uint64_t feed) {
82 return StringRef(format("f/%016llx/latestMessID", feed));
83 }
keyForFeedWatcherPrefix(uint64_t feed)84 Key keyForFeedWatcherPrefix(uint64_t feed) {
85 return StringRef(format("f/%016llx/watchers/", feed));
86 }
keyForFeedWatcher(uint64_t feed,uint64_t inbox)87 Key keyForFeedWatcher(uint64_t feed, uint64_t inbox) {
88 return StringRef(format("f/%016llx/watchers/%016llx", feed, inbox));
89 }
90
91 Standalone<StringRef> messagePrefix(LiteralStringRef("m/"));
92
keyForMessage(uint64_t message)93 Key keyForMessage(uint64_t message) {
94 return StringRef(format("m/%016llx", message));
95 }
96
keyForDisptchEntry(uint64_t message)97 Key keyForDisptchEntry(uint64_t message) {
98 return StringRef(format("d/%016llx", message));
99 }
100
PubSub(Database _cx)101 PubSub::PubSub(Database _cx)
102 : cx(_cx)
103 {
104 }
105
_createFeed(Database cx,Standalone<StringRef> metadata)106 ACTOR Future<uint64_t> _createFeed(Database cx, Standalone<StringRef> metadata) {
107 state uint64_t id(g_random->randomUniqueID().first()); // SOMEDAY: this should be an atomic increment
108 TraceEvent("PubSubCreateFeed").detail("Feed", id);
109 state Transaction tr(cx);
110 loop {
111 try {
112 state Optional<Value> val = wait(tr.get(keyForFeed(id)));
113 while(val.present()) {
114 id = id + g_random->randomInt(1, 100);
115 Optional<Value> v = wait(tr.get(keyForFeed(id)));
116 val = v;
117 }
118 tr.set(keyForFeed(id), metadata);
119 tr.set(keyForFeedSubcriberCount(id), uInt64ToValue(0));
120 tr.set(keyForFeedMessageCount(id), uInt64ToValue(0));
121 wait(tr.commit());
122 break;
123 } catch(Error& e) {
124 wait( tr.onError(e) );
125 }
126 }
127 return id;
128 }
129
createFeed(Standalone<StringRef> metadata)130 Future<uint64_t> PubSub::createFeed(Standalone<StringRef> metadata) {
131 return _createFeed(cx, metadata);
132 }
133
_createInbox(Database cx,Standalone<StringRef> metadata)134 ACTOR Future<uint64_t> _createInbox(Database cx, Standalone<StringRef> metadata) {
135 state uint64_t id = g_random->randomUniqueID().first();
136 TraceEvent("PubSubCreateInbox").detail("Inbox", id);
137 state Transaction tr(cx);
138 loop {
139 try {
140 state Optional<Value> val = wait(tr.get(keyForInbox(id)));
141 while(val.present()) {
142 id += g_random->randomInt(1, 100);
143 Optional<Value> v = wait(tr.get(keyForFeed(id)));
144 val = v;
145 }
146 tr.set(keyForInbox(id), metadata);
147 tr.set(keyForInboxSubcriptionCount(id), uInt64ToValue(0));
148 wait( tr.commit() );
149 break;
150 } catch(Error& e) {
151 wait( tr.onError(e) );
152 }
153 }
154 return id;
155 }
156
createInbox(Standalone<StringRef> metadata)157 Future<uint64_t> PubSub::createInbox(Standalone<StringRef> metadata) {
158 return _createInbox(cx, metadata);
159 }
160
_createSubcription(Database cx,uint64_t feed,uint64_t inbox)161 ACTOR Future<bool> _createSubcription(Database cx, uint64_t feed, uint64_t inbox) {
162 state Transaction tr(cx);
163 TraceEvent("PubSubCreateSubscription").detail("Feed", feed).detail("Inbox", inbox);
164 loop {
165 try {
166 Optional<Value> subcription = wait(tr.get(keyForInboxSubcription(inbox, feed)));
167 if(subcription.present()) {
168 // For idempotency, this could exist from a previous transaction from us that succeeded
169 return true;
170 }
171 Optional<Value> inboxVal = wait(tr.get(keyForInbox(inbox)));
172 if(!inboxVal.present()) {
173 return false;
174 }
175 Optional<Value> feedVal = wait(tr.get(keyForFeed(feed)));
176 if(!feedVal.present()) {
177 return false;
178 }
179
180 // Update the subscriptions of the inbox
181 Optional<Value> subcriptionCountVal = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
182 uint64_t subcriptionCount = valueToUInt64(subcriptionCountVal.get()); // throws if count not present
183 tr.set(keyForInboxSubcription(inbox, feed), StringRef());
184 tr.set(keyForInboxSubcriptionCount(inbox), uInt64ToValue(subcriptionCount + 1));
185
186 // Update the subcribers of the feed
187 Optional<Value> subcriberCountVal = wait(tr.get(keyForFeedSubcriberCount(feed)));
188 uint64_t subcriberCount = valueToUInt64(subcriberCountVal.get()); // throws if count not present
189 tr.set(keyForFeedSubcriber(feed, inbox), StringRef());
190 tr.set(keyForFeedSubcriberCount(inbox), uInt64ToValue(subcriberCount + 1));
191
192 // Add inbox as watcher of feed.
193 tr.set(keyForFeedWatcher(feed, inbox), StringRef());
194
195 wait( tr.commit() );
196 break;
197 } catch(Error& e) {
198 wait( tr.onError(e) );
199 }
200 }
201 return true;
202 }
203
createSubcription(uint64_t feed,uint64_t inbox)204 Future<bool> PubSub::createSubcription(uint64_t feed, uint64_t inbox) {
205 return _createSubcription(cx, feed, inbox);
206 }
207
208 // Since we are not relying on "read-your-own-writes", we need to keep track of
209 // the highest-numbered inbox that we've cleared from the watchers list and
210 // make sure that further requests start after this inbox.
updateFeedWatchers(Transaction * tr,uint64_t feed)211 ACTOR Future<Void> updateFeedWatchers(Transaction *tr, uint64_t feed) {
212 state StringRef watcherPrefix = keyForFeedWatcherPrefix(feed);
213 state uint64_t highestInbox;
214 state bool first = true;
215 loop {
216 // Grab watching inboxes in swaths of 100
217 state Standalone<RangeResultRef> watchingInboxes = wait( (*tr).getRange(
218 firstGreaterOrEqual(keyForFeedWatcher(feed, first ? 0 : highestInbox + 1)),
219 firstGreaterOrEqual(keyForFeedWatcher(feed, UINT64_MAX)), 100 ) ); // REVIEW: does 100 make sense?
220 if(!watchingInboxes.size())
221 // If there are no watchers, return.
222 return Void();
223 first = false;
224 state int idx = 0;
225 for(; idx < watchingInboxes.size(); idx++) {
226 KeyRef key = watchingInboxes[idx].key;
227 StringRef inboxStr = key.removePrefix(watcherPrefix);
228 uint64_t inbox = valueToUInt64(inboxStr);
229 // add this feed to the stale list of inbox
230 (*tr).set(keyForInboxStaleFeed(inbox, feed), StringRef());
231 // remove the inbox from the list of watchers on this feed
232 (*tr).clear(key);
233 highestInbox = inbox;
234 }
235 if(watchingInboxes.size() < 100)
236 // If there were fewer watchers returned that we asked for, we're done.
237 return Void();
238 }
239 }
240
241 /*
242 * Posts a message to a feed. This updates the list of stale feeds to all watchers.
243 * Return: a per-feed (non-global) message ID.
244 *
245 * This needs many additions to make it "real"
246 * SOMEDAY: create better global message table to enforce cross-feed ordering.
247 * SOMEDAY: create a global "dispatching" list for feeds that have yet to fully update inboxes.
248 * Move feed in and remove watchers in one transaction, possibly
249 * SOMEDAY: create a global list of the most-subscribed-to feeds that all inbox reads check
250 */
_postMessage(Database cx,uint64_t feed,Standalone<StringRef> data)251 ACTOR Future<uint64_t> _postMessage(Database cx, uint64_t feed, Standalone<StringRef> data) {
252 state Transaction tr(cx);
253 state uint64_t messageId = UINT64_MAX - (uint64_t)now();
254 TraceEvent("PubSubPost").detail("Feed", feed).detail("Message", messageId);
255 loop {
256 try {
257 Optional<Value> feedValue = wait(tr.get(keyForFeed(feed)));
258 if(!feedValue.present()) {
259 // No such feed!!
260 return uint64_t(0);
261 }
262
263 // Get globally latest message, set our ID to that less one
264 state Standalone<RangeResultRef> latestMessage = wait( tr.getRange(
265 firstGreaterOrEqual(keyForMessage(0)),
266 firstGreaterOrEqual(keyForMessage(UINT64_MAX)), 1 ) );
267 if(!latestMessage.size()) {
268 messageId = UINT64_MAX - 1;
269 } else {
270 StringRef messageStr = latestMessage[0].key.removePrefix(messagePrefix);
271 messageId = valueToUInt64(messageStr) - 1;
272 }
273
274 tr.set(keyForMessage(messageId), StringRef());
275 tr.set(keyForDisptchEntry(messageId), StringRef());
276
277 wait(tr.commit());
278 break;
279 } catch(Error& e) {
280 wait( tr.onError(e) );
281 }
282 }
283 tr = Transaction(cx);
284 loop {
285 try {
286 // Record this ID as the "latest message" for a feed
287 tr.set(keyForFeedLatestMessage(feed), uInt64ToValue(messageId));
288
289 // Store message in list of feed's messages
290 tr.set(keyForFeedMessage(feed, messageId), StringRef());
291
292 // Update the count of message that this feed has published
293 Optional<Value> cntValue = wait(tr.get(keyForFeedMessageCount(feed)));
294 uint64_t messageCount(valueToUInt64(cntValue.get()) + 1);
295 tr.set(keyForFeedMessageCount(feed), uInt64ToValue(messageCount));
296
297 // Go through the list of watching inboxes
298 wait(updateFeedWatchers(&tr, feed));
299
300 // Post the real message data; clear the "dispatching" entry
301 tr.set(keyForMessage(messageId), data);
302 tr.clear(keyForDisptchEntry(messageId));
303
304 wait(tr.commit());
305 break;
306 } catch(Error& e) {
307 wait( tr.onError(e) );
308 }
309 }
310 return messageId;
311 }
312
postMessage(uint64_t feed,Standalone<StringRef> data)313 Future<uint64_t> PubSub::postMessage(uint64_t feed, Standalone<StringRef> data) {
314 return _postMessage(cx, feed, data);
315 }
316
singlePassInboxCacheUpdate(Database cx,uint64_t inbox,int swath)317 ACTOR Future<int> singlePassInboxCacheUpdate(Database cx, uint64_t inbox, int swath) {
318 state Transaction tr(cx);
319 loop {
320 try {
321 // For each stale feed, update cache with latest message id
322 state Standalone<RangeResultRef> staleFeeds = wait( tr.getRange(
323 firstGreaterOrEqual(keyForInboxStaleFeed(inbox, 0)),
324 firstGreaterOrEqual(keyForInboxStaleFeed(inbox, UINT64_MAX)), swath ) ); //REVIEW: does 100 make sense?
325 //printf(" --> stale feeds list size: %d\n", staleFeeds.size());
326 if(!staleFeeds.size())
327 // If there are no stale feeds, return.
328 return 0;
329 state StringRef stalePrefix = keyForInboxStalePrefix(inbox);
330 state int idx = 0;
331 for(; idx < staleFeeds.size(); idx++) {
332 StringRef feedStr = staleFeeds[idx].key.removePrefix(stalePrefix);
333 //printf(" --> clearing stale entry: %s\n", feedStr.toString().c_str());
334 state uint64_t feed = valueToUInt64(feedStr);
335
336 // SOMEDAY: change this to be a range query for the highest #'ed message
337 Optional<Value> v = wait(tr.get(keyForFeedLatestMessage(feed)));
338 state Value latestMessageValue = v.get();
339 //printf(" --> latest message from feed: %s\n", latestMessageValue.toString().c_str());
340
341 // find the messageID which is currently cached for this feed
342 Optional<Value> lastCachedValue = wait(tr.get(keyForInboxCacheByFeed(inbox, feed)));
343 if(lastCachedValue.present()) {
344 uint64_t lastCachedId = valueToUInt64(lastCachedValue.get());
345 // clear out the cache entry in the "by-ID" list for this feed
346 // SOMEDAY: should we leave this in there in some way, or should we pull a better/more recent cache?
347 tr.clear(keyForInboxCacheByID(inbox, lastCachedId));
348 }
349 //printf(" --> caching message by ID: %s\n", keyForInboxCacheByID(inbox, valueToUInt64(latestMessageValue)).toString().c_str());
350 tr.set(keyForInboxCacheByID(inbox, valueToUInt64(latestMessageValue)), uInt64ToValue(feed));
351
352 // set the latest message
353 tr.set(keyForInboxCacheByFeed(inbox, feed), latestMessageValue);
354 tr.clear(staleFeeds[idx].key);
355 // place watch back on feed
356 tr.set(keyForFeedWatcher(feed, inbox), StringRef());
357 //printf(" --> adding watch to feed: %s\n", keyForFeedWatcher(feed, inbox).toString().c_str());
358 }
359 wait(tr.commit());
360 return staleFeeds.size();
361 } catch(Error& e) {
362 wait( tr.onError(e) );
363 }
364 }
365 }
366
367 // SOMEDAY: evaluate if this could lead to painful loop if there's one frequent feed.
updateInboxCache(Database cx,uint64_t inbox)368 ACTOR Future<Void> updateInboxCache(Database cx, uint64_t inbox) {
369 state int swath = 100;
370 state int updatedEntries = swath;
371 while(updatedEntries >= swath) {
372 int retVal = wait(singlePassInboxCacheUpdate(cx, inbox, swath));
373 updatedEntries = retVal;
374 }
375 return Void();
376 }
377
getFeedLatestAtOrAfter(Transaction * tr,Feed feed,MessageId position)378 ACTOR Future<MessageId> getFeedLatestAtOrAfter(Transaction *tr, Feed feed, MessageId position) {
379 state Standalone<RangeResultRef> lastMessageRange = wait( (*tr).getRange(
380 firstGreaterOrEqual(keyForFeedMessage(feed, position)),
381 firstGreaterOrEqual(keyForFeedMessage(feed, UINT64_MAX)), 1 ) );
382 if(!lastMessageRange.size())
383 return uint64_t(0);
384 KeyValueRef m = lastMessageRange[0];
385 StringRef prefix = keyForFeedMessagePrefix(feed);
386 StringRef mIdStr = m.key.removePrefix(prefix);
387 return valueToUInt64(mIdStr);
388 }
389
getMessage(Transaction * tr,Feed feed,MessageId id)390 ACTOR Future<Message> getMessage(Transaction *tr, Feed feed, MessageId id) {
391 state Message m;
392 m.originatorFeed = feed;
393 m.messageId = id;
394 Optional<Value> data = wait(tr->get(keyForMessage(id)));
395 m.data = data.get();
396 return m;
397 }
398
399 ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbox, int count, uint64_t cursor);
400
401 // inboxes with MANY fast feeds may be punished by the following checks
402 // SOMEDAY: add a check on global lists (or on dispatching list)
_listInboxMessages(Database cx,uint64_t inbox,int count,uint64_t cursor)403 ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbox, int count, uint64_t cursor)
404 {
405 TraceEvent("PubSubListInbox").detail("Inbox", inbox).detail("Count", count).detail("Cursor", cursor);
406 wait(updateInboxCache(cx, inbox));
407 state StringRef perIdPrefix = keyForInboxCacheByIDPrefix(inbox);
408 loop {
409 state Transaction tr(cx);
410 state std::vector<Message> messages;
411 state std::map<MessageId, Feed> feedLatest;
412 try {
413 // Fetch all cached entries for all the feeds to which we are subscribed
414 Optional<Value> cntValue = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
415 uint64_t subscriptions = valueToUInt64(cntValue.get());
416 state Standalone<RangeResultRef> feeds = wait( tr.getRange(
417 firstGreaterOrEqual(keyForInboxCacheByID(inbox, 0)),
418 firstGreaterOrEqual(keyForInboxCacheByID(inbox, UINT64_MAX)), subscriptions ) );
419 if(!feeds.size())
420 return messages;
421
422 // read cache into map, replace entries newer than cursor with the newest older than cursor
423 state int idx = 0;
424 for(; idx < feeds.size(); idx++) {
425 StringRef mIdStr = feeds[idx].key.removePrefix(perIdPrefix);
426 MessageId messageId = valueToUInt64(mIdStr);
427 state Feed feed = valueToUInt64(feeds[idx].value);
428 //printf(" -> cached message %016llx from feed %016llx\n", messageId, feed);
429 if(messageId >= cursor) {
430 //printf(" -> entering message %016llx from feed %016llx\n", messageId, feed);
431 feedLatest.insert(pair<MessageId, Feed>(messageId, feed));
432 } else {
433 // replace this with the first message older than the cursor
434 MessageId mId = wait(getFeedLatestAtOrAfter(&tr, feed, cursor));
435 if(mId) {
436 feedLatest.insert(pair<MessageId, Feed>(mId, feed));
437 }
438 }
439 }
440 // There were some cached feeds, but none with messages older than "cursor"
441 if(!feedLatest.size())
442 return messages;
443
444 // Check the list of dispatching messages to make sure there are no older ones than ours
445 state MessageId earliestMessage = feedLatest.begin()->first;
446 Standalone<RangeResultRef> dispatching = wait( tr.getRange(
447 firstGreaterOrEqual(keyForDisptchEntry(earliestMessage)),
448 firstGreaterOrEqual(keyForDisptchEntry(UINT64_MAX)), 1 ) );
449 // If there are messages "older" than ours, try this again
450 // (with a new transaction and a flush of the "stale" feeds
451 if(dispatching.size()) {
452 std::vector<Message> r = wait( _listInboxMessages(cx, inbox, count, earliestMessage) );
453 return r;
454 }
455
456 while (messages.size() < count && feedLatest.size() > 0) {
457 std::map<MessageId, Feed>::iterator latest = feedLatest.begin();
458 state MessageId id = latest->first;
459 state Feed f = latest->second;
460 feedLatest.erase(latest);
461
462 Message m = wait( getMessage( &tr, f, id ) );
463 messages.push_back( m );
464
465 MessageId nextMessage = wait(getFeedLatestAtOrAfter(&tr, f, id + 1));
466 if(nextMessage) {
467 feedLatest.insert(pair<MessageId, Feed>(nextMessage, f));
468 }
469 }
470
471 return messages;
472 } catch(Error& e) {
473 wait( tr.onError(e) );
474 }
475 }
476 }
477
listInboxMessages(uint64_t inbox,int count,uint64_t cursor)478 Future<std::vector<Message>> PubSub::listInboxMessages(uint64_t inbox, int count, uint64_t cursor) {
479 return _listInboxMessages(cx, inbox, count, cursor);
480 }
481
_listFeedMessages(Database cx,Feed feed,int count,uint64_t cursor)482 ACTOR Future<std::vector<Message>> _listFeedMessages(Database cx, Feed feed, int count, uint64_t cursor) {
483 state std::vector<Message> messages;
484 state Transaction tr(cx);
485 TraceEvent("PubSubListFeed").detail("Feed", feed).detail("Count", count).detail("Cursor", cursor);
486 loop {
487 try {
488 state Standalone<RangeResultRef> messageIds = wait( tr.getRange(
489 firstGreaterOrEqual(keyForFeedMessage(feed, cursor)),
490 firstGreaterOrEqual(keyForFeedMessage(feed, UINT64_MAX)), count ) );
491 if(!messageIds.size())
492 return messages;
493
494 state int idx = 0;
495 for(; idx < messageIds.size(); idx++) {
496 StringRef mIdStr = messageIds[idx].key.removePrefix(keyForFeedMessagePrefix(feed));
497 MessageId messageId = valueToUInt64(mIdStr);
498 Message m = wait( getMessage(&tr, feed, messageId) );
499 messages.push_back( m );
500 }
501 return messages;
502 } catch(Error& e) {
503 wait( tr.onError(e) );
504 }
505 }
506 }
507
listFeedMessages(Feed feed,int count,uint64_t cursor)508 Future<std::vector<Message>> PubSub::listFeedMessages(Feed feed, int count, uint64_t cursor) {
509 return _listFeedMessages(cx, feed, count, cursor);
510 }
511
512