/*
 * pubsub.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/pubsub.h"
#include "flow/actorcompiler.h"  // This must be the last #include.

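// Keyspace layout used by this module (derived from the key helpers below):
//
//   i/<inbox>                   inbox metadata
//   i/<inbox>/subs/<feed>       subscription of <inbox> to <feed>
//   i/<inbox>/subsCnt           number of subscriptions of <inbox>
//   i/<inbox>/stale/<feed>      <feed> has posted since <inbox> last caught up
//   i/<inbox>/cid/<messageId>   cache: latest known message, keyed by message ID
//   i/<inbox>/cf/<feed>         cache: latest known message, keyed by feed
//   f/<feed>                    feed metadata
//   f/<feed>/subs/<inbox>       subscriber <inbox> of <feed>
//   f/<feed>/subscCnt           number of subscribers of <feed>
//   f/<feed>/m/<messageId>      message posted by <feed>
//   f/<feed>/messCount          number of messages posted by <feed>
//   f/<feed>/latestMessID       ID of the feed's most recent message
//   f/<feed>/watchers/<inbox>   <inbox> is watching <feed>
//   m/<messageId>               global message body
//   d/<messageId>               message is still being dispatched (see _postMessage)
//
// All numeric components are fixed-width, zero-padded hex (see uInt64ToValue).
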
Value uInt64ToValue( uint64_t v ) {
	return StringRef(format("%016llx", v));
}
uint64_t valueToUInt64( const StringRef& v ) {
	uint64_t x = 0;
	sscanf( v.toString().c_str(), "%llx", &x );
	return x;
}
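// Fixed-width, zero-padded hex means lexicographic key order matches numeric
// order (e.g. 10 encodes as "000000000000000a", which sorts before
// "000000000000000b" for 11), so range reads over these keys visit IDs in
// numeric order.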

Key keyForInbox(uint64_t inbox) {
	return StringRef(format("i/%016llx", inbox));
}
Key keyForInboxSubcription(uint64_t inbox, uint64_t feed) {
	return StringRef(format("i/%016llx/subs/%016llx", inbox, feed));
}
Key keyForInboxSubcriptionCount(uint64_t inbox) {
	return StringRef(format("i/%016llx/subsCnt", inbox));
}
Key keyForInboxStalePrefix(uint64_t inbox) {
	return StringRef(format("i/%016llx/stale/", inbox));
}
Key keyForInboxStaleFeed(uint64_t inbox, uint64_t feed) {
	return StringRef(format("i/%016llx/stale/%016llx", inbox, feed));
}
Key keyForInboxCacheByIDPrefix(uint64_t inbox) {
	return StringRef(format("i/%016llx/cid/", inbox));
}
Key keyForInboxCacheByID(uint64_t inbox, uint64_t messageId) {
	return StringRef(format("i/%016llx/cid/%016llx", inbox, messageId));
}
Key keyForInboxCacheByFeedPrefix(uint64_t inbox) {
	return StringRef(format("i/%016llx/cf/", inbox));
}
Key keyForInboxCacheByFeed(uint64_t inbox, uint64_t feed) {
	return StringRef(format("i/%016llx/cf/%016llx", inbox, feed));
}

Key keyForFeed(uint64_t feed) {
	return StringRef(format("f/%016llx", feed));
}
Key keyForFeedSubcriber(uint64_t feed, uint64_t inbox) {
	return StringRef(format("f/%016llx/subs/%016llx", feed, inbox));
}
Key keyForFeedSubcriberCount(uint64_t feed) {
	return StringRef(format("f/%016llx/subscCnt", feed));
}
Key keyForFeedMessage(uint64_t feed, uint64_t message) {
	return StringRef(format("f/%016llx/m/%016llx", feed, message));
}
Key keyForFeedMessagePrefix(uint64_t feed) {
	return StringRef(format("f/%016llx/m/", feed));
}
Key keyForFeedMessageCount(uint64_t feed) {
	return StringRef(format("f/%016llx/messCount", feed));
}
// SOMEDAY: remove this key; replace it with a range query of count 1 over the feed's message list
Key keyForFeedLatestMessage(uint64_t feed) {
	return StringRef(format("f/%016llx/latestMessID", feed));
}
Key keyForFeedWatcherPrefix(uint64_t feed) {
	return StringRef(format("f/%016llx/watchers/", feed));
}
Key keyForFeedWatcher(uint64_t feed, uint64_t inbox) {
	return StringRef(format("f/%016llx/watchers/%016llx", feed, inbox));
}

Standalone<StringRef> messagePrefix(LiteralStringRef("m/"));

Key keyForMessage(uint64_t message) {
	return StringRef(format("m/%016llx", message));
}

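// A "d/<messageId>" entry exists while a message has been allocated but not yet
// fully dispatched to watching inboxes; readers use it to detect in-flight
// messages (see _postMessage and _listInboxMessages).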
Key keyForDisptchEntry(uint64_t message) {
	return StringRef(format("d/%016llx", message));
}

PubSub::PubSub(Database _cx)
	: cx(_cx)
{
}

ACTOR Future<uint64_t> _createFeed(Database cx, Standalone<StringRef> metadata) {
	state uint64_t id(g_random->randomUniqueID().first());  // SOMEDAY: this should be an atomic increment
	TraceEvent("PubSubCreateFeed").detail("Feed", id);
	state Transaction tr(cx);
	loop {
		try {
			state Optional<Value> val = wait(tr.get(keyForFeed(id)));
			while(val.present()) {
				id = id + g_random->randomInt(1, 100);
				Optional<Value> v = wait(tr.get(keyForFeed(id)));
				val = v;
			}
			tr.set(keyForFeed(id), metadata);
			tr.set(keyForFeedSubcriberCount(id), uInt64ToValue(0));
			tr.set(keyForFeedMessageCount(id), uInt64ToValue(0));
			wait(tr.commit());
			break;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
	return id;
}

Future<uint64_t> PubSub::createFeed(Standalone<StringRef> metadata) {
	return _createFeed(cx, metadata);
}

ACTOR Future<uint64_t> _createInbox(Database cx, Standalone<StringRef> metadata) {
	state uint64_t id = g_random->randomUniqueID().first();
	TraceEvent("PubSubCreateInbox").detail("Inbox", id);
	state Transaction tr(cx);
	loop {
		try {
			state Optional<Value> val = wait(tr.get(keyForInbox(id)));
			while(val.present()) {
				id += g_random->randomInt(1, 100);
				Optional<Value> v = wait(tr.get(keyForInbox(id)));
				val = v;
			}
			tr.set(keyForInbox(id), metadata);
			tr.set(keyForInboxSubcriptionCount(id), uInt64ToValue(0));
			wait( tr.commit() );
			break;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
	return id;
}

Future<uint64_t> PubSub::createInbox(Standalone<StringRef> metadata) {
	return _createInbox(cx, metadata);
}

ACTOR Future<bool> _createSubcription(Database cx, uint64_t feed, uint64_t inbox) {
	state Transaction tr(cx);
	TraceEvent("PubSubCreateSubscription").detail("Feed", feed).detail("Inbox", inbox);
	loop {
		try {
			Optional<Value> subcription = wait(tr.get(keyForInboxSubcription(inbox, feed)));
			if(subcription.present()) {
				// For idempotency: the subscription could already exist from a previous transaction of ours that succeeded
				return true;
			}
			Optional<Value> inboxVal = wait(tr.get(keyForInbox(inbox)));
			if(!inboxVal.present()) {
				return false;
			}
			Optional<Value> feedVal = wait(tr.get(keyForFeed(feed)));
			if(!feedVal.present()) {
				return false;
			}

			// Update the subscriptions of the inbox
			Optional<Value> subcriptionCountVal = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
			uint64_t subcriptionCount = valueToUInt64(subcriptionCountVal.get()); // throws if count not present
			tr.set(keyForInboxSubcription(inbox, feed), StringRef());
			tr.set(keyForInboxSubcriptionCount(inbox), uInt64ToValue(subcriptionCount + 1));

			// Update the subscribers of the feed
			Optional<Value> subcriberCountVal = wait(tr.get(keyForFeedSubcriberCount(feed)));
			uint64_t subcriberCount = valueToUInt64(subcriberCountVal.get()); // throws if count not present
			tr.set(keyForFeedSubcriber(feed, inbox), StringRef());
			tr.set(keyForFeedSubcriberCount(feed), uInt64ToValue(subcriberCount + 1));

			// Add inbox as watcher of feed.
			tr.set(keyForFeedWatcher(feed, inbox), StringRef());

			wait( tr.commit() );
			break;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
	return true;
}

Future<bool> PubSub::createSubcription(uint64_t feed, uint64_t inbox) {
	return _createSubcription(cx, feed, inbox);
}

// Since we are not relying on "read-your-own-writes", we need to keep track of
//  the highest-numbered inbox that we've cleared from the watchers list and
//  make sure that further requests start after this inbox.
ACTOR Future<Void> updateFeedWatchers(Transaction *tr, uint64_t feed) {
	state StringRef watcherPrefix = keyForFeedWatcherPrefix(feed);
	state uint64_t highestInbox;
	state bool first = true;
	loop {
		// Grab watching inboxes in swaths of 100
		state Standalone<RangeResultRef> watchingInboxes = wait( (*tr).getRange(
			firstGreaterOrEqual(keyForFeedWatcher(feed, first ? 0 : highestInbox + 1)),
			firstGreaterOrEqual(keyForFeedWatcher(feed, UINT64_MAX)), 100 ) );  // REVIEW: does 100 make sense?
		if(!watchingInboxes.size())
			// If there are no watchers, return.
			return Void();
		first = false;
		state int idx = 0;
		for(; idx < watchingInboxes.size(); idx++) {
			KeyRef key = watchingInboxes[idx].key;
			StringRef inboxStr = key.removePrefix(watcherPrefix);
			uint64_t inbox = valueToUInt64(inboxStr);
			// add this feed to the stale list of the inbox
			(*tr).set(keyForInboxStaleFeed(inbox, feed), StringRef());
			// remove the inbox from the list of watchers on this feed
			(*tr).clear(key);
			highestInbox = inbox;
		}
		if(watchingInboxes.size() < 100)
			// If fewer watchers were returned than we asked for, we're done.
			return Void();
	}
}

/*
 * Posts a message to a feed; the feed is marked stale in every watching inbox.
 * Return: a per-feed (non-global) message ID.
 *
 * This needs many additions to make it "real"
 * SOMEDAY: create a better global message table to enforce cross-feed ordering.
 * SOMEDAY: create a global "dispatching" list for feeds that have yet to fully update inboxes.
 *        Move the feed in and remove its watchers in one transaction, possibly
 * SOMEDAY: create a global list of the most-subscribed-to feeds that all inbox reads check
 */
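// Posting takes two transactions. The first allocates a globally ordered
// message ID and writes both an empty message key and a "dispatching" entry for
// it; the second fills in the feed's bookkeeping, marks the feed stale in every
// watching inbox, writes the message payload, and clears the dispatching entry.
// If the second transaction never completes, the dispatching entry survives and
// _listInboxMessages uses it to detect the in-flight message.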
ACTOR Future<uint64_t> _postMessage(Database cx, uint64_t feed, Standalone<StringRef> data) {
	state Transaction tr(cx);
	state uint64_t messageId = UINT64_MAX - (uint64_t)now();
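	// Initial guess only (used by the trace event below); the real ID is
	// computed inside the transaction from the globally latest message. IDs
	// count down from UINT64_MAX - 1, so smaller IDs are newer.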
	TraceEvent("PubSubPost").detail("Feed", feed).detail("Message", messageId);
	loop {
		try {
			Optional<Value> feedValue = wait(tr.get(keyForFeed(feed)));
			if(!feedValue.present()) {
				// No such feed!!
				return uint64_t(0);
			}

			// Get the globally latest message and set our ID to one less than it
			state Standalone<RangeResultRef> latestMessage = wait( tr.getRange(
				firstGreaterOrEqual(keyForMessage(0)),
				firstGreaterOrEqual(keyForMessage(UINT64_MAX)), 1 ) );
			if(!latestMessage.size()) {
				messageId = UINT64_MAX - 1;
			} else {
				StringRef messageStr = latestMessage[0].key.removePrefix(messagePrefix);
				messageId = valueToUInt64(messageStr) - 1;
			}

			tr.set(keyForMessage(messageId), StringRef());
			tr.set(keyForDisptchEntry(messageId), StringRef());

			wait(tr.commit());
			break;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
	tr = Transaction(cx);
	loop {
		try {
			// Record this ID as the "latest message" for the feed
			tr.set(keyForFeedLatestMessage(feed), uInt64ToValue(messageId));

			// Store the message in the list of the feed's messages
			tr.set(keyForFeedMessage(feed, messageId), StringRef());

			// Update the count of messages that this feed has published
			Optional<Value> cntValue = wait(tr.get(keyForFeedMessageCount(feed)));
			uint64_t messageCount(valueToUInt64(cntValue.get()) + 1);
			tr.set(keyForFeedMessageCount(feed), uInt64ToValue(messageCount));

			// Go through the list of watching inboxes
			wait(updateFeedWatchers(&tr, feed));

			// Post the real message data; clear the "dispatching" entry
			tr.set(keyForMessage(messageId), data);
			tr.clear(keyForDisptchEntry(messageId));

			wait(tr.commit());
			break;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
	return messageId;
}

Future<uint64_t> PubSub::postMessage(uint64_t feed, Standalone<StringRef> data) {
	return _postMessage(cx, feed, data);
}

ACTOR Future<int> singlePassInboxCacheUpdate(Database cx, uint64_t inbox, int swath) {
	state Transaction tr(cx);
	loop {
		try {
			// For each stale feed, update the cache with the latest message ID
			state Standalone<RangeResultRef> staleFeeds = wait( tr.getRange(
				firstGreaterOrEqual(keyForInboxStaleFeed(inbox, 0)),
				firstGreaterOrEqual(keyForInboxStaleFeed(inbox, UINT64_MAX)), swath ) );  // REVIEW: does the swath size (100 by default) make sense?
			//printf("  --> stale feeds list size: %d\n", staleFeeds.size());
			if(!staleFeeds.size())
				// If there are no stale feeds, return.
				return 0;
			state StringRef stalePrefix = keyForInboxStalePrefix(inbox);
			state int idx = 0;
			for(; idx < staleFeeds.size(); idx++) {
				StringRef feedStr = staleFeeds[idx].key.removePrefix(stalePrefix);
				//printf("  --> clearing stale entry: %s\n", feedStr.toString().c_str());
				state uint64_t feed = valueToUInt64(feedStr);

				// SOMEDAY: change this to be a range query for the highest #'ed message
				Optional<Value> v = wait(tr.get(keyForFeedLatestMessage(feed)));
				state Value latestMessageValue = v.get();
				//printf("  --> latest message from feed: %s\n", latestMessageValue.toString().c_str());

				// find the messageID which is currently cached for this feed
				Optional<Value> lastCachedValue = wait(tr.get(keyForInboxCacheByFeed(inbox, feed)));
				if(lastCachedValue.present()) {
					uint64_t lastCachedId = valueToUInt64(lastCachedValue.get());
					// clear out the cache entry in the "by-ID" list for this feed
					// SOMEDAY: should we leave this in there in some way, or should we pull a better/more recent cache?
					tr.clear(keyForInboxCacheByID(inbox, lastCachedId));
				}
				//printf("  --> caching message by ID: %s\n", keyForInboxCacheByID(inbox, valueToUInt64(latestMessageValue)).toString().c_str());
				tr.set(keyForInboxCacheByID(inbox, valueToUInt64(latestMessageValue)), uInt64ToValue(feed));

				// set the latest message
				tr.set(keyForInboxCacheByFeed(inbox, feed), latestMessageValue);
				tr.clear(staleFeeds[idx].key);
				// place watch back on feed
				tr.set(keyForFeedWatcher(feed, inbox), StringRef());
				//printf("  --> adding watch to feed: %s\n", keyForFeedWatcher(feed, inbox).toString().c_str());
			}
			wait(tr.commit());
			return staleFeeds.size();
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
}

// SOMEDAY: evaluate if this could lead to a painful loop if there's one frequent feed.
ACTOR Future<Void> updateInboxCache(Database cx, uint64_t inbox) {
	state int swath = 100;
	state int updatedEntries = swath;
	while(updatedEntries >= swath) {
		int retVal = wait(singlePassInboxCacheUpdate(cx, inbox, swath));
		updatedEntries = retVal;
	}
	return Void();
}

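// Returns the first message of the feed with ID >= position. Because message
// IDs count down from UINT64_MAX - 1, this is the most recent message at or
// older than the given position. Returns 0 (never a real message ID in
// practice) when the feed has no such message.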
ACTOR Future<MessageId> getFeedLatestAtOrAfter(Transaction *tr, Feed feed, MessageId position) {
	state Standalone<RangeResultRef> lastMessageRange = wait( (*tr).getRange(
			firstGreaterOrEqual(keyForFeedMessage(feed, position)),
			firstGreaterOrEqual(keyForFeedMessage(feed, UINT64_MAX)), 1 ) );
	if(!lastMessageRange.size())
		return uint64_t(0);
	KeyValueRef m = lastMessageRange[0];
	StringRef prefix = keyForFeedMessagePrefix(feed);
	StringRef mIdStr = m.key.removePrefix(prefix);
	return valueToUInt64(mIdStr);
}

ACTOR Future<Message> getMessage(Transaction *tr, Feed feed, MessageId id) {
	state Message m;
	m.originatorFeed = feed;
	m.messageId = id;
	Optional<Value> data = wait(tr->get(keyForMessage(id)));
	m.data = data.get();
	return m;
}

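// Forward declaration: _listInboxMessages calls itself when it finds messages
// that are still being dispatched.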
ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbox, int count, uint64_t cursor);

// inboxes with MANY fast feeds may be punished by the following checks
// SOMEDAY: add a check on global lists (or on dispatching list)
ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbox, int count, uint64_t cursor)
{
	TraceEvent("PubSubListInbox").detail("Inbox", inbox).detail("Count", count).detail("Cursor", cursor);
	wait(updateInboxCache(cx, inbox));
	state StringRef perIdPrefix = keyForInboxCacheByIDPrefix(inbox);
	loop {
		state Transaction tr(cx);
		state std::vector<Message> messages;
		state std::map<MessageId, Feed> feedLatest;
		try {
			// Fetch all cached entries for all the feeds to which we are subscribed
			Optional<Value> cntValue = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
			uint64_t subscriptions = valueToUInt64(cntValue.get());
			state Standalone<RangeResultRef> feeds = wait( tr.getRange(
				firstGreaterOrEqual(keyForInboxCacheByID(inbox, 0)),
				firstGreaterOrEqual(keyForInboxCacheByID(inbox, UINT64_MAX)), subscriptions ) );
			if(!feeds.size())
				return messages;

			// read cache into map, replace entries newer than cursor with the newest older than cursor
			state int idx = 0;
			for(; idx < feeds.size(); idx++) {
				StringRef mIdStr = feeds[idx].key.removePrefix(perIdPrefix);
				MessageId messageId = valueToUInt64(mIdStr);
				state Feed feed = valueToUInt64(feeds[idx].value);
				//printf(" -> cached message %016llx from feed %016llx\n", messageId, feed);
				if(messageId >= cursor) {
					//printf(" -> entering message %016llx from feed %016llx\n", messageId, feed);
					feedLatest.insert(std::pair<MessageId, Feed>(messageId, feed));
				} else {
					// replace this with the first message older than the cursor
					MessageId mId = wait(getFeedLatestAtOrAfter(&tr, feed, cursor));
					if(mId) {
						feedLatest.insert(std::pair<MessageId, Feed>(mId, feed));
					}
				}
			}
			// There were some cached feeds, but none with messages older than "cursor"
			if(!feedLatest.size())
				return messages;

			// Check the list of dispatching messages to make sure there are none older than ours
			state MessageId earliestMessage = feedLatest.begin()->first;
			Standalone<RangeResultRef> dispatching = wait( tr.getRange(
				firstGreaterOrEqual(keyForDisptchEntry(earliestMessage)),
				firstGreaterOrEqual(keyForDisptchEntry(UINT64_MAX)), 1 ) );
			// If there are messages "older" than ours, try this again
			//  (with a new transaction and a flush of the "stale" feeds)
			if(dispatching.size()) {
				std::vector<Message> r = wait( _listInboxMessages(cx, inbox, count, earliestMessage) );
				return r;
			}

			while (messages.size() < count && feedLatest.size() > 0) {
				std::map<MessageId, Feed>::iterator latest = feedLatest.begin();
				state MessageId id = latest->first;
				state Feed f = latest->second;
				feedLatest.erase(latest);

				Message m = wait( getMessage( &tr, f, id ) );
				messages.push_back( m );

				MessageId nextMessage = wait(getFeedLatestAtOrAfter(&tr, f, id + 1));
				if(nextMessage) {
					feedLatest.insert(std::pair<MessageId, Feed>(nextMessage, f));
				}
			}

			return messages;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
}

Future<std::vector<Message>> PubSub::listInboxMessages(uint64_t inbox, int count, uint64_t cursor) {
	return _listInboxMessages(cx, inbox, count, cursor);
}

ACTOR Future<std::vector<Message>> _listFeedMessages(Database cx, Feed feed, int count, uint64_t cursor) {
	state std::vector<Message> messages;
	state Transaction tr(cx);
	TraceEvent("PubSubListFeed").detail("Feed", feed).detail("Count", count).detail("Cursor", cursor);
	loop {
		try {
			state Standalone<RangeResultRef> messageIds = wait( tr.getRange(
				firstGreaterOrEqual(keyForFeedMessage(feed, cursor)),
				firstGreaterOrEqual(keyForFeedMessage(feed, UINT64_MAX)), count ) );
			if(!messageIds.size())
				return messages;

			state int idx = 0;
			for(; idx < messageIds.size(); idx++) {
				StringRef mIdStr = messageIds[idx].key.removePrefix(keyForFeedMessagePrefix(feed));
				MessageId messageId = valueToUInt64(mIdStr);
				Message m = wait( getMessage(&tr, feed, messageId) );
				messages.push_back( m );
			}
			return messages;
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
}

Future<std::vector<Message>> PubSub::listFeedMessages(Feed feed, int count, uint64_t cursor) {
	return _listFeedMessages(cx, feed, count, cursor);
}
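
// A minimal usage sketch (hypothetical client code, not part of this module;
// error handling elided). Inside an ACTOR function with a Database cx in scope:
//
//   state PubSub ps(cx);
//   state uint64_t feed = wait(ps.createFeed(LiteralStringRef("feed metadata")));
//   state uint64_t inbox = wait(ps.createInbox(LiteralStringRef("inbox metadata")));
//   bool subscribed = wait(ps.createSubcription(feed, inbox));
//   uint64_t posted = wait(ps.postMessage(feed, LiteralStringRef("hello")));
//   // A cursor of 0 lists from the newest messages, since IDs count down from UINT64_MAX.
//   std::vector<Message> recent = wait(ps.listInboxMessages(inbox, 10, 0));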