/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.

For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
8 #include "data/data_sparse_ids.h"
9
10 #include <rpl/combine.h>
11 #include "storage/storage_sparse_ids_list.h"
12
SparseIdsMergedSlice(Key key)13 SparseIdsMergedSlice::SparseIdsMergedSlice(Key key)
14 : SparseIdsMergedSlice(
15 key,
16 SparseIdsSlice(),
17 MigratedSlice(key)) {
18 }
19
SparseIdsMergedSlice(Key key,SparseIdsSlice part,std::optional<SparseIdsSlice> migrated)20 SparseIdsMergedSlice::SparseIdsMergedSlice(
21 Key key,
22 SparseIdsSlice part,
23 std::optional<SparseIdsSlice> migrated)
24 : _key(key)
25 , _part(std::move(part))
26 , _migrated(std::move(migrated)) {
27 }
28
SparseIdsMergedSlice(Key key,SparseUnsortedIdsSlice scheduled)29 SparseIdsMergedSlice::SparseIdsMergedSlice(
30 Key key,
31 SparseUnsortedIdsSlice scheduled)
32 : _key(key)
33 , _scheduled(std::move(scheduled)) {
34 }
35
fullCount() const36 std::optional<int> SparseIdsMergedSlice::fullCount() const {
37 return _scheduled
38 ? _scheduled->fullCount()
39 : Add(
40 _part.fullCount(),
41 _migrated ? _migrated->fullCount() : 0);
42 }
43
skippedBefore() const44 std::optional<int> SparseIdsMergedSlice::skippedBefore() const {
45 return _scheduled
46 ? _scheduled->skippedBefore()
47 : Add(
48 isolatedInMigrated() ? 0 : _part.skippedBefore(),
49 _migrated
50 ? (isolatedInPart()
51 ? _migrated->fullCount()
52 : _migrated->skippedBefore())
53 : 0
54 );
55 }
56
skippedAfter() const57 std::optional<int> SparseIdsMergedSlice::skippedAfter() const {
58 return _scheduled
59 ? _scheduled->skippedAfter()
60 : Add(
61 isolatedInMigrated() ? _part.fullCount() : _part.skippedAfter(),
62 isolatedInPart() ? 0 : _migrated->skippedAfter()
63 );
64 }
65
indexOf(FullMsgId fullId) const66 std::optional<int> SparseIdsMergedSlice::indexOf(
67 FullMsgId fullId) const {
68 return _scheduled
69 ? _scheduled->indexOf(fullId.msg)
70 : isFromPart(fullId)
71 ? (_part.indexOf(fullId.msg) | func::add(migratedSize()))
72 : isolatedInPart()
73 ? std::nullopt
74 : isFromMigrated(fullId)
75 ? _migrated->indexOf(fullId.msg)
76 : std::nullopt;
77 }
78
size() const79 int SparseIdsMergedSlice::size() const {
80 return _scheduled
81 ? _scheduled->size()
82 : (isolatedInPart() ? 0 : migratedSize())
83 + (isolatedInMigrated() ? 0 : _part.size());
84 }
85
operator [](int index) const86 FullMsgId SparseIdsMergedSlice::operator[](int index) const {
87 Expects(index >= 0 && index < size());
88
89 if (_scheduled) {
90 return ComputeId(_key.peerId, (*_scheduled)[index]);
91 }
92
93 if (const auto size = migratedSize()) {
94 if (index < size) {
95 return ComputeId(_key.migratedPeerId, (*_migrated)[index]);
96 }
97 index -= size;
98 }
99 return ComputeId(_key.peerId, _part[index]);
100 }
101
distance(const Key & a,const Key & b) const102 std::optional<int> SparseIdsMergedSlice::distance(
103 const Key &a,
104 const Key &b) const {
105 if (const auto i = indexOf(ComputeId(a))) {
106 if (const auto j = indexOf(ComputeId(b))) {
107 return *j - *i;
108 }
109 }
110 return std::nullopt;
111 }
112
nearest(UniversalMsgId id) const113 auto SparseIdsMergedSlice::nearest(
114 UniversalMsgId id) const -> std::optional<FullMsgId> {
115 if (_scheduled) {
116 if (const auto nearestId = _scheduled->nearest(id)) {
117 return ComputeId(_key.peerId, *nearestId);
118 }
119 }
120 const auto convertFromPartNearest = [&](MsgId result) {
121 return ComputeId(_key.peerId, result);
122 };
123 const auto convertFromMigratedNearest = [&](MsgId result) {
124 return ComputeId(_key.migratedPeerId, result);
125 };
126 if (IsServerMsgId(id)) {
127 if (auto partNearestId = _part.nearest(id)) {
128 return partNearestId
129 | convertFromPartNearest;
130 } else if (isolatedInPart()) {
131 return std::nullopt;
132 }
133 return _migrated->nearest(ServerMaxMsgId - 1)
134 | convertFromMigratedNearest;
135 }
136 if (auto migratedNearestId = _migrated
137 ? _migrated->nearest(id + ServerMaxMsgId)
138 : std::nullopt) {
139 return migratedNearestId
140 | convertFromMigratedNearest;
141 } else if (isolatedInMigrated()) {
142 return std::nullopt;
143 }
144 return _part.nearest(0)
145 | convertFromPartNearest;
146 }
147
SparseIdsSliceBuilder(Key key,int limitBefore,int limitAfter)148 SparseIdsSliceBuilder::SparseIdsSliceBuilder(
149 Key key,
150 int limitBefore,
151 int limitAfter)
152 : _key(key)
153 , _limitBefore(limitBefore)
154 , _limitAfter(limitAfter) {
155 }
156
applyInitial(const Storage::SparseIdsListResult & result)157 bool SparseIdsSliceBuilder::applyInitial(
158 const Storage::SparseIdsListResult &result) {
159 mergeSliceData(
160 result.count,
161 result.messageIds,
162 result.skippedBefore,
163 result.skippedAfter);
164 return true;
165 }
166
applyUpdate(const Storage::SparseIdsSliceUpdate & update)167 bool SparseIdsSliceBuilder::applyUpdate(
168 const Storage::SparseIdsSliceUpdate &update) {
169 auto intersects = [](MsgRange range1, MsgRange range2) {
170 return (range1.from <= range2.till)
171 && (range2.from <= range1.till);
172 };
173 auto needMergeMessages = (update.messages != nullptr)
174 && intersects(update.range, {
175 _ids.empty() ? _key : _ids.front(),
176 _ids.empty() ? _key : _ids.back()
177 });
178 if (!needMergeMessages && !update.count) {
179 return false;
180 }
181 auto skippedBefore = (update.range.from == 0)
182 ? 0
183 : std::optional<int> {};
184 auto skippedAfter = (update.range.till == ServerMaxMsgId)
185 ? 0
186 : std::optional<int> {};
187 mergeSliceData(
188 update.count,
189 needMergeMessages
190 ? *update.messages
191 : base::flat_set<MsgId> {},
192 skippedBefore,
193 skippedAfter);
194 return true;
195 }
196
removeOne(MsgId messageId)197 bool SparseIdsSliceBuilder::removeOne(MsgId messageId) {
198 auto changed = false;
199 if (_fullCount && *_fullCount > 0) {
200 --*_fullCount;
201 changed = true;
202 }
203 if (_ids.contains(messageId)) {
204 _ids.remove(messageId);
205 changed = true;
206 } else if (!_ids.empty()) {
207 if (_ids.front() > messageId
208 && _skippedBefore
209 && *_skippedBefore > 0) {
210 --*_skippedBefore;
211 changed = true;
212 } else if (_ids.back() < messageId
213 && _skippedAfter
214 && *_skippedAfter > 0) {
215 --*_skippedAfter;
216 changed = true;
217 }
218 }
219 if (changed) {
220 checkInsufficient();
221 }
222 return changed;
223 }
224
removeAll()225 bool SparseIdsSliceBuilder::removeAll() {
226 _ids = {};
227 _fullCount = 0;
228 _skippedBefore = 0;
229 _skippedAfter = 0;
230 return true;
231 }
232
invalidateBottom()233 bool SparseIdsSliceBuilder::invalidateBottom() {
234 _fullCount = _skippedAfter = std::nullopt;
235 checkInsufficient();
236 return true;
237 }
238
checkInsufficient()239 void SparseIdsSliceBuilder::checkInsufficient() {
240 sliceToLimits();
241 }
242
mergeSliceData(std::optional<int> count,const base::flat_set<MsgId> & messageIds,std::optional<int> skippedBefore,std::optional<int> skippedAfter)243 void SparseIdsSliceBuilder::mergeSliceData(
244 std::optional<int> count,
245 const base::flat_set<MsgId> &messageIds,
246 std::optional<int> skippedBefore,
247 std::optional<int> skippedAfter) {
248 if (messageIds.empty()) {
249 if (count && _fullCount != count) {
250 _fullCount = count;
251 if (*_fullCount <= _ids.size()) {
252 _fullCount = _ids.size();
253 _skippedBefore = _skippedAfter = 0;
254 }
255 }
256 fillSkippedAndSliceToLimits();
257 return;
258 }
259 if (count) {
260 _fullCount = count;
261 }
262 auto wasMinId = _ids.empty() ? -1 : _ids.front();
263 auto wasMaxId = _ids.empty() ? -1 : _ids.back();
264 _ids.merge(messageIds.begin(), messageIds.end());
265
266 auto adjustSkippedBefore = [&](MsgId oldId, int oldSkippedBefore) {
267 auto it = _ids.find(oldId);
268 Assert(it != _ids.end());
269 _skippedBefore = oldSkippedBefore - (it - _ids.begin());
270 accumulate_max(*_skippedBefore, 0);
271 };
272 if (skippedBefore) {
273 adjustSkippedBefore(messageIds.front(), *skippedBefore);
274 } else if (wasMinId >= 0 && _skippedBefore) {
275 adjustSkippedBefore(wasMinId, *_skippedBefore);
276 } else {
277 _skippedBefore = std::nullopt;
278 }
279
280 auto adjustSkippedAfter = [&](MsgId oldId, int oldSkippedAfter) {
281 auto it = _ids.find(oldId);
282 Assert(it != _ids.end());
283 _skippedAfter = oldSkippedAfter - (_ids.end() - it - 1);
284 accumulate_max(*_skippedAfter, 0);
285 };
286 if (skippedAfter) {
287 adjustSkippedAfter(messageIds.back(), *skippedAfter);
288 } else if (wasMaxId >= 0 && _skippedAfter) {
289 adjustSkippedAfter(wasMaxId, *_skippedAfter);
290 } else {
291 _skippedAfter = std::nullopt;
292 }
293 fillSkippedAndSliceToLimits();
294 }
295
fillSkippedAndSliceToLimits()296 void SparseIdsSliceBuilder::fillSkippedAndSliceToLimits() {
297 if (_fullCount) {
298 if (_skippedBefore && !_skippedAfter) {
299 _skippedAfter = *_fullCount
300 - *_skippedBefore
301 - int(_ids.size());
302 } else if (_skippedAfter && !_skippedBefore) {
303 _skippedBefore = *_fullCount
304 - *_skippedAfter
305 - int(_ids.size());
306 }
307 }
308 sliceToLimits();
309 }
310
sliceToLimits()311 void SparseIdsSliceBuilder::sliceToLimits() {
312 if (!_key) {
313 if (!_fullCount) {
314 requestMessagesCount();
315 }
316 return;
317 }
318 auto requestedSomething = false;
319 auto aroundIt = ranges::lower_bound(_ids, _key);
320 auto removeFromBegin = (aroundIt - _ids.begin() - _limitBefore);
321 auto removeFromEnd = (_ids.end() - aroundIt - _limitAfter - 1);
322 if (removeFromBegin > 0) {
323 _ids.erase(_ids.begin(), _ids.begin() + removeFromBegin);
324 if (_skippedBefore) {
325 *_skippedBefore += removeFromBegin;
326 }
327 } else if (removeFromBegin < 0
328 && (!_skippedBefore || *_skippedBefore > 0)) {
329 requestedSomething = true;
330 requestMessages(RequestDirection::Before);
331 }
332 if (removeFromEnd > 0) {
333 _ids.erase(_ids.end() - removeFromEnd, _ids.end());
334 if (_skippedAfter) {
335 *_skippedAfter += removeFromEnd;
336 }
337 } else if (removeFromEnd < 0
338 && (!_skippedAfter || *_skippedAfter > 0)) {
339 requestedSomething = true;
340 requestMessages(RequestDirection::After);
341 }
342 if (!_fullCount && !requestedSomething) {
343 requestMessagesCount();
344 }
345 }
346
requestMessages(RequestDirection direction)347 void SparseIdsSliceBuilder::requestMessages(
348 RequestDirection direction) {
349 auto requestAroundData = [&]() -> AroundData {
350 if (_ids.empty()) {
351 return { _key, Data::LoadDirection::Around };
352 } else if (direction == RequestDirection::Before) {
353 return { _ids.front(), Data::LoadDirection::Before };
354 }
355 return { _ids.back(), Data::LoadDirection::After };
356 };
357 _insufficientAround.fire(requestAroundData());
358 }
359
requestMessagesCount()360 void SparseIdsSliceBuilder::requestMessagesCount() {
361 _insufficientAround.fire({ 0, Data::LoadDirection::Around });
362 }
363
snapshot() const364 SparseIdsSlice SparseIdsSliceBuilder::snapshot() const {
365 return SparseIdsSlice(
366 _ids,
367 _fullCount,
368 _skippedBefore,
369 _skippedAfter);
370 }
371
CreateViewer(SparseIdsMergedSlice::Key key,int limitBefore,int limitAfter,Fn<SimpleViewerFunction> simpleViewer)372 rpl::producer<SparseIdsMergedSlice> SparseIdsMergedSlice::CreateViewer(
373 SparseIdsMergedSlice::Key key,
374 int limitBefore,
375 int limitAfter,
376 Fn<SimpleViewerFunction> simpleViewer) {
377 Expects(IsServerMsgId(key.universalId)
378 || (key.universalId == 0)
379 || (IsServerMsgId(ServerMaxMsgId + key.universalId) && key.migratedPeerId != 0));
380 Expects((key.universalId != 0)
381 || (limitBefore == 0 && limitAfter == 0));
382
383 return [=](auto consumer) {
384 auto partViewer = simpleViewer(
385 key.peerId,
386 SparseIdsMergedSlice::PartKey(key),
387 limitBefore,
388 limitAfter
389 );
390 if (!key.migratedPeerId) {
391 return std::move(
392 partViewer
393 ) | rpl::start_with_next([=](SparseIdsSlice &&part) {
394 consumer.put_next(SparseIdsMergedSlice(
395 key,
396 std::move(part),
397 std::nullopt));
398 });
399 }
400 auto migratedViewer = simpleViewer(
401 key.migratedPeerId,
402 SparseIdsMergedSlice::MigratedKey(key),
403 limitBefore,
404 limitAfter);
405 return rpl::combine(
406 std::move(partViewer),
407 std::move(migratedViewer)
408 ) | rpl::start_with_next([=](
409 SparseIdsSlice &&part,
410 SparseIdsSlice &&migrated) {
411 consumer.put_next(SparseIdsMergedSlice(
412 key,
413 std::move(part),
414 std::move(migrated)));
415 });
416 };
417 }
418