1 /*
2 This file is part of Telegram Desktop,
3 the official desktop application for the Telegram messaging service.
4 
5 For license and copyright information please follow this link:
6 https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
7 */
8 #include "data/data_sparse_ids.h"
9 
10 #include <rpl/combine.h>
11 #include "storage/storage_sparse_ids_list.h"
12 
SparseIdsMergedSlice(Key key)13 SparseIdsMergedSlice::SparseIdsMergedSlice(Key key)
14 : SparseIdsMergedSlice(
15 	key,
16 	SparseIdsSlice(),
17 	MigratedSlice(key)) {
18 }
19 
SparseIdsMergedSlice(Key key,SparseIdsSlice part,std::optional<SparseIdsSlice> migrated)20 SparseIdsMergedSlice::SparseIdsMergedSlice(
21 	Key key,
22 	SparseIdsSlice part,
23 	std::optional<SparseIdsSlice> migrated)
24 : _key(key)
25 , _part(std::move(part))
26 , _migrated(std::move(migrated)) {
27 }
28 
SparseIdsMergedSlice(Key key,SparseUnsortedIdsSlice scheduled)29 SparseIdsMergedSlice::SparseIdsMergedSlice(
30 	Key key,
31 	SparseUnsortedIdsSlice scheduled)
32 : _key(key)
33 , _scheduled(std::move(scheduled)) {
34 }
35 
fullCount() const36 std::optional<int> SparseIdsMergedSlice::fullCount() const {
37 	return _scheduled
38 		? _scheduled->fullCount()
39 		: Add(
40 			_part.fullCount(),
41 			_migrated ? _migrated->fullCount() : 0);
42 }
43 
skippedBefore() const44 std::optional<int> SparseIdsMergedSlice::skippedBefore() const {
45 	return _scheduled
46 		? _scheduled->skippedBefore()
47 		: Add(
48 			isolatedInMigrated() ? 0 : _part.skippedBefore(),
49 			_migrated
50 				? (isolatedInPart()
51 					? _migrated->fullCount()
52 					: _migrated->skippedBefore())
53 				: 0
54 		);
55 }
56 
skippedAfter() const57 std::optional<int> SparseIdsMergedSlice::skippedAfter() const {
58 	return _scheduled
59 		? _scheduled->skippedAfter()
60 		: Add(
61 			isolatedInMigrated() ? _part.fullCount() : _part.skippedAfter(),
62 			isolatedInPart() ? 0 : _migrated->skippedAfter()
63 		);
64 }
65 
indexOf(FullMsgId fullId) const66 std::optional<int> SparseIdsMergedSlice::indexOf(
67 		FullMsgId fullId) const {
68 	return _scheduled
69 		? _scheduled->indexOf(fullId.msg)
70 		: isFromPart(fullId)
71 		? (_part.indexOf(fullId.msg) | func::add(migratedSize()))
72 		: isolatedInPart()
73 			? std::nullopt
74 			: isFromMigrated(fullId)
75 				? _migrated->indexOf(fullId.msg)
76 				: std::nullopt;
77 }
78 
size() const79 int SparseIdsMergedSlice::size() const {
80 	return _scheduled
81 		? _scheduled->size()
82 		: (isolatedInPart() ? 0 : migratedSize())
83 			+ (isolatedInMigrated() ? 0 : _part.size());
84 }
85 
operator [](int index) const86 FullMsgId SparseIdsMergedSlice::operator[](int index) const {
87 	Expects(index >= 0 && index < size());
88 
89 	if (_scheduled) {
90 		return ComputeId(_key.peerId, (*_scheduled)[index]);
91 	}
92 
93 	if (const auto size = migratedSize()) {
94 		if (index < size) {
95 			return ComputeId(_key.migratedPeerId, (*_migrated)[index]);
96 		}
97 		index -= size;
98 	}
99 	return ComputeId(_key.peerId, _part[index]);
100 }
101 
distance(const Key & a,const Key & b) const102 std::optional<int> SparseIdsMergedSlice::distance(
103 		const Key &a,
104 		const Key &b) const {
105 	if (const auto i = indexOf(ComputeId(a))) {
106 		if (const auto j = indexOf(ComputeId(b))) {
107 			return *j - *i;
108 		}
109 	}
110 	return std::nullopt;
111 }
112 
nearest(UniversalMsgId id) const113 auto SparseIdsMergedSlice::nearest(
114 		UniversalMsgId id) const -> std::optional<FullMsgId> {
115 	if (_scheduled) {
116 		if (const auto nearestId = _scheduled->nearest(id)) {
117 			return ComputeId(_key.peerId, *nearestId);
118 		}
119 	}
120 	const auto convertFromPartNearest = [&](MsgId result) {
121 		return ComputeId(_key.peerId, result);
122 	};
123 	const auto convertFromMigratedNearest = [&](MsgId result) {
124 		return ComputeId(_key.migratedPeerId, result);
125 	};
126 	if (IsServerMsgId(id)) {
127 		if (auto partNearestId = _part.nearest(id)) {
128 			return partNearestId
129 				| convertFromPartNearest;
130 		} else if (isolatedInPart()) {
131 			return std::nullopt;
132 		}
133 		return _migrated->nearest(ServerMaxMsgId - 1)
134 			| convertFromMigratedNearest;
135 	}
136 	if (auto migratedNearestId = _migrated
137 		? _migrated->nearest(id + ServerMaxMsgId)
138 		: std::nullopt) {
139 		return migratedNearestId
140 			| convertFromMigratedNearest;
141 	} else if (isolatedInMigrated()) {
142 		return std::nullopt;
143 	}
144 	return _part.nearest(0)
145 		| convertFromPartNearest;
146 }
147 
SparseIdsSliceBuilder(Key key,int limitBefore,int limitAfter)148 SparseIdsSliceBuilder::SparseIdsSliceBuilder(
149 	Key key,
150 	int limitBefore,
151 	int limitAfter)
152 : _key(key)
153 , _limitBefore(limitBefore)
154 , _limitAfter(limitAfter) {
155 }
156 
applyInitial(const Storage::SparseIdsListResult & result)157 bool SparseIdsSliceBuilder::applyInitial(
158 		const Storage::SparseIdsListResult &result) {
159 	mergeSliceData(
160 		result.count,
161 		result.messageIds,
162 		result.skippedBefore,
163 		result.skippedAfter);
164 	return true;
165 }
166 
applyUpdate(const Storage::SparseIdsSliceUpdate & update)167 bool SparseIdsSliceBuilder::applyUpdate(
168 		const Storage::SparseIdsSliceUpdate &update) {
169 	auto intersects = [](MsgRange range1, MsgRange range2) {
170 		return (range1.from <= range2.till)
171 			&& (range2.from <= range1.till);
172 	};
173 	auto needMergeMessages = (update.messages != nullptr)
174 		&& intersects(update.range, {
175 			_ids.empty() ? _key : _ids.front(),
176 			_ids.empty() ? _key : _ids.back()
177 		});
178 	if (!needMergeMessages && !update.count) {
179 		return false;
180 	}
181 	auto skippedBefore = (update.range.from == 0)
182 		? 0
183 		: std::optional<int> {};
184 	auto skippedAfter = (update.range.till == ServerMaxMsgId)
185 		? 0
186 		: std::optional<int> {};
187 	mergeSliceData(
188 		update.count,
189 		needMergeMessages
190 			? *update.messages
191 			: base::flat_set<MsgId> {},
192 		skippedBefore,
193 		skippedAfter);
194 	return true;
195 }
196 
removeOne(MsgId messageId)197 bool SparseIdsSliceBuilder::removeOne(MsgId messageId) {
198 	auto changed = false;
199 	if (_fullCount && *_fullCount > 0) {
200 		--*_fullCount;
201 		changed = true;
202 	}
203 	if (_ids.contains(messageId)) {
204 		_ids.remove(messageId);
205 		changed = true;
206 	} else if (!_ids.empty()) {
207 		if (_ids.front() > messageId
208 			&& _skippedBefore
209 			&& *_skippedBefore > 0) {
210 			--*_skippedBefore;
211 			changed = true;
212 		} else if (_ids.back() < messageId
213 			&& _skippedAfter
214 			&& *_skippedAfter > 0) {
215 			--*_skippedAfter;
216 			changed = true;
217 		}
218 	}
219 	if (changed) {
220 		checkInsufficient();
221 	}
222 	return changed;
223 }
224 
removeAll()225 bool SparseIdsSliceBuilder::removeAll() {
226 	_ids = {};
227 	_fullCount = 0;
228 	_skippedBefore = 0;
229 	_skippedAfter = 0;
230 	return true;
231 }
232 
invalidateBottom()233 bool SparseIdsSliceBuilder::invalidateBottom() {
234 	_fullCount = _skippedAfter = std::nullopt;
235 	checkInsufficient();
236 	return true;
237 }
238 
checkInsufficient()239 void SparseIdsSliceBuilder::checkInsufficient() {
240 	sliceToLimits();
241 }
242 
mergeSliceData(std::optional<int> count,const base::flat_set<MsgId> & messageIds,std::optional<int> skippedBefore,std::optional<int> skippedAfter)243 void SparseIdsSliceBuilder::mergeSliceData(
244 		std::optional<int> count,
245 		const base::flat_set<MsgId> &messageIds,
246 		std::optional<int> skippedBefore,
247 		std::optional<int> skippedAfter) {
248 	if (messageIds.empty()) {
249 		if (count && _fullCount != count) {
250 			_fullCount = count;
251 			if (*_fullCount <= _ids.size()) {
252 				_fullCount = _ids.size();
253 				_skippedBefore = _skippedAfter = 0;
254 			}
255 		}
256 		fillSkippedAndSliceToLimits();
257 		return;
258 	}
259 	if (count) {
260 		_fullCount = count;
261 	}
262 	auto wasMinId = _ids.empty() ? -1 : _ids.front();
263 	auto wasMaxId = _ids.empty() ? -1 : _ids.back();
264 	_ids.merge(messageIds.begin(), messageIds.end());
265 
266 	auto adjustSkippedBefore = [&](MsgId oldId, int oldSkippedBefore) {
267 		auto it = _ids.find(oldId);
268 		Assert(it != _ids.end());
269 		_skippedBefore = oldSkippedBefore - (it - _ids.begin());
270 		accumulate_max(*_skippedBefore, 0);
271 	};
272 	if (skippedBefore) {
273 		adjustSkippedBefore(messageIds.front(), *skippedBefore);
274 	} else if (wasMinId >= 0 && _skippedBefore) {
275 		adjustSkippedBefore(wasMinId, *_skippedBefore);
276 	} else {
277 		_skippedBefore = std::nullopt;
278 	}
279 
280 	auto adjustSkippedAfter = [&](MsgId oldId, int oldSkippedAfter) {
281 		auto it = _ids.find(oldId);
282 		Assert(it != _ids.end());
283 		_skippedAfter = oldSkippedAfter - (_ids.end() - it - 1);
284 		accumulate_max(*_skippedAfter, 0);
285 	};
286 	if (skippedAfter) {
287 		adjustSkippedAfter(messageIds.back(), *skippedAfter);
288 	} else if (wasMaxId >= 0 && _skippedAfter) {
289 		adjustSkippedAfter(wasMaxId, *_skippedAfter);
290 	} else {
291 		_skippedAfter = std::nullopt;
292 	}
293 	fillSkippedAndSliceToLimits();
294 }
295 
fillSkippedAndSliceToLimits()296 void SparseIdsSliceBuilder::fillSkippedAndSliceToLimits() {
297 	if (_fullCount) {
298 		if (_skippedBefore && !_skippedAfter) {
299 			_skippedAfter = *_fullCount
300 				- *_skippedBefore
301 				- int(_ids.size());
302 		} else if (_skippedAfter && !_skippedBefore) {
303 			_skippedBefore = *_fullCount
304 				- *_skippedAfter
305 				- int(_ids.size());
306 		}
307 	}
308 	sliceToLimits();
309 }
310 
sliceToLimits()311 void SparseIdsSliceBuilder::sliceToLimits() {
312 	if (!_key) {
313 		if (!_fullCount) {
314 			requestMessagesCount();
315 		}
316 		return;
317 	}
318 	auto requestedSomething = false;
319 	auto aroundIt = ranges::lower_bound(_ids, _key);
320 	auto removeFromBegin = (aroundIt - _ids.begin() - _limitBefore);
321 	auto removeFromEnd = (_ids.end() - aroundIt - _limitAfter - 1);
322 	if (removeFromBegin > 0) {
323 		_ids.erase(_ids.begin(), _ids.begin() + removeFromBegin);
324 		if (_skippedBefore) {
325 			*_skippedBefore += removeFromBegin;
326 		}
327 	} else if (removeFromBegin < 0
328 		&& (!_skippedBefore || *_skippedBefore > 0)) {
329 		requestedSomething = true;
330 		requestMessages(RequestDirection::Before);
331 	}
332 	if (removeFromEnd > 0) {
333 		_ids.erase(_ids.end() - removeFromEnd, _ids.end());
334 		if (_skippedAfter) {
335 			*_skippedAfter += removeFromEnd;
336 		}
337 	} else if (removeFromEnd < 0
338 		&& (!_skippedAfter || *_skippedAfter > 0)) {
339 		requestedSomething = true;
340 		requestMessages(RequestDirection::After);
341 	}
342 	if (!_fullCount && !requestedSomething) {
343 		requestMessagesCount();
344 	}
345 }
346 
requestMessages(RequestDirection direction)347 void SparseIdsSliceBuilder::requestMessages(
348 		RequestDirection direction) {
349 	auto requestAroundData = [&]() -> AroundData {
350 		if (_ids.empty()) {
351 			return { _key, Data::LoadDirection::Around };
352 		} else if (direction == RequestDirection::Before) {
353 			return { _ids.front(), Data::LoadDirection::Before };
354 		}
355 		return { _ids.back(), Data::LoadDirection::After };
356 	};
357 	_insufficientAround.fire(requestAroundData());
358 }
359 
requestMessagesCount()360 void SparseIdsSliceBuilder::requestMessagesCount() {
361 	_insufficientAround.fire({ 0, Data::LoadDirection::Around });
362 }
363 
snapshot() const364 SparseIdsSlice SparseIdsSliceBuilder::snapshot() const {
365 	return SparseIdsSlice(
366 		_ids,
367 		_fullCount,
368 		_skippedBefore,
369 		_skippedAfter);
370 }
371 
CreateViewer(SparseIdsMergedSlice::Key key,int limitBefore,int limitAfter,Fn<SimpleViewerFunction> simpleViewer)372 rpl::producer<SparseIdsMergedSlice> SparseIdsMergedSlice::CreateViewer(
373 		SparseIdsMergedSlice::Key key,
374 		int limitBefore,
375 		int limitAfter,
376 		Fn<SimpleViewerFunction> simpleViewer) {
377 	Expects(IsServerMsgId(key.universalId)
378 		|| (key.universalId == 0)
379 		|| (IsServerMsgId(ServerMaxMsgId + key.universalId) && key.migratedPeerId != 0));
380 	Expects((key.universalId != 0)
381 		|| (limitBefore == 0 && limitAfter == 0));
382 
383 	return [=](auto consumer) {
384 		auto partViewer = simpleViewer(
385 			key.peerId,
386 			SparseIdsMergedSlice::PartKey(key),
387 			limitBefore,
388 			limitAfter
389 		);
390 		if (!key.migratedPeerId) {
391 			return std::move(
392 				partViewer
393 			) | rpl::start_with_next([=](SparseIdsSlice &&part) {
394 				consumer.put_next(SparseIdsMergedSlice(
395 					key,
396 					std::move(part),
397 					std::nullopt));
398 			});
399 		}
400 		auto migratedViewer = simpleViewer(
401 			key.migratedPeerId,
402 			SparseIdsMergedSlice::MigratedKey(key),
403 			limitBefore,
404 			limitAfter);
405 		return rpl::combine(
406 			std::move(partViewer),
407 			std::move(migratedViewer)
408 		) | rpl::start_with_next([=](
409 				SparseIdsSlice &&part,
410 				SparseIdsSlice &&migrated) {
411 			consumer.put_next(SparseIdsMergedSlice(
412 				key,
413 				std::move(part),
414 				std::move(migrated)));
415 		});
416 	};
417 }
418