1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/files/PartsManager.h"
8
9 #include "td/telegram/files/FileLoaderUtils.h"
10
11 #include "td/utils/format.h"
12 #include "td/utils/logging.h"
13 #include "td/utils/misc.h"
14 #include "td/utils/SliceBuilder.h"
15
16 #include <limits>
17 #include <numeric>
18
19 namespace td {
20
21 namespace {
calc_part_count(int64 size,int64 part_size)22 int64 calc_part_count(int64 size, int64 part_size) {
23 CHECK(part_size != 0);
24 return (size + part_size - 1) / part_size;
25 }
26 } // namespace
27
init_known_prefix(int64 known_prefix,size_t part_size,const std::vector<int> & ready_parts)28 Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, const std::vector<int> &ready_parts) {
29 known_prefix_flag_ = true;
30 known_prefix_size_ = known_prefix;
31 return init_no_size(part_size, ready_parts);
32 }
33
set_streaming_offset(int64 offset,int64 limit)34 int32 PartsManager::set_streaming_offset(int64 offset, int64 limit) {
35 auto finish = [&] {
36 set_streaming_limit(limit);
37 update_first_not_ready_part();
38 return first_streaming_not_ready_part_;
39 };
40
41 if (offset < 0 || need_check_ || (!unknown_size_flag_ && get_size() < offset)) {
42 streaming_offset_ = 0;
43 LOG_IF(ERROR, offset != 0) << "Ignore streaming_offset " << offset << ", need_check_ = " << need_check_
44 << ", unknown_size_flag_ = " << unknown_size_flag_ << ", size = " << get_size();
45
46 return finish();
47 }
48
49 auto part_i = offset / part_size_;
50 if (use_part_count_limit_ && part_i >= MAX_PART_COUNT) {
51 streaming_offset_ = 0;
52 LOG(ERROR) << "Ignore streaming_offset " << offset << " in part " << part_i;
53
54 return finish();
55 }
56
57 streaming_offset_ = offset;
58 first_streaming_empty_part_ = narrow_cast<int>(part_i);
59 first_streaming_not_ready_part_ = narrow_cast<int>(part_i);
60 if (part_count_ < first_streaming_empty_part_) {
61 part_count_ = first_streaming_empty_part_;
62 part_status_.resize(part_count_, PartStatus::Empty);
63 }
64
65 return finish();
66 }
67
// Returns pending_count_, the counter that on_part_start() increments and
// on_part_ok()/on_part_failed() decrement.
int32 PartsManager::get_pending_count() const {
  return pending_count_;
}
71
set_streaming_limit(int64 limit)72 void PartsManager::set_streaming_limit(int64 limit) {
73 streaming_limit_ = limit;
74 streaming_ready_size_ = 0;
75 if (streaming_limit_ == 0) {
76 return;
77 }
78 for (int part_i = 0; part_i < part_count_; part_i++) {
79 if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
80 streaming_ready_size_ += get_part(part_i).size;
81 }
82 }
83 }
84
init_no_size(size_t part_size,const std::vector<int> & ready_parts)85 Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &ready_parts) {
86 unknown_size_flag_ = true;
87 size_ = 0;
88 min_size_ = 0;
89 max_size_ = std::numeric_limits<int64>::max();
90
91 if (part_size != 0) {
92 part_size_ = part_size;
93 } else {
94 part_size_ = 32 << 10;
95 while (calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
96 part_size_ *= 2;
97 CHECK(part_size_ <= MAX_PART_SIZE);
98 }
99 // just in case if expected_size_ is wrong
100 if (part_size_ < MAX_PART_SIZE) {
101 part_size_ *= 2;
102 }
103 }
104 part_count_ =
105 std::accumulate(ready_parts.begin(), ready_parts.end(), 0, [](auto a, auto b) { return max(a, b + 1); });
106
107 return init_common(ready_parts);
108 }
109
init(int64 size,int64 expected_size,bool is_size_final,size_t part_size,const std::vector<int> & ready_parts,bool use_part_count_limit,bool is_upload)110 Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
111 const std::vector<int> &ready_parts, bool use_part_count_limit, bool is_upload) {
112 CHECK(expected_size >= size);
113 is_upload_ = is_upload;
114 use_part_count_limit_ = use_part_count_limit;
115 expected_size_ = expected_size;
116 if (expected_size_ > MAX_FILE_SIZE) {
117 return Status::Error("Too big file");
118 }
119 if (!is_size_final) {
120 return init_known_prefix(size, part_size, ready_parts);
121 }
122 if (size == 0) {
123 return init_no_size(part_size, ready_parts);
124 }
125 LOG_CHECK(size > 0) << tag("size", size);
126 unknown_size_flag_ = false;
127 size_ = size;
128
129 if (part_size != 0) {
130 part_size_ = part_size;
131 if (use_part_count_limit_ && calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
132 CHECK(is_upload_);
133 return Status::Error("FILE_UPLOAD_RESTART");
134 }
135 } else {
136 part_size_ = 64 << 10;
137 while (calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
138 part_size_ *= 2;
139 CHECK(part_size_ <= MAX_PART_SIZE);
140 }
141 }
142 LOG_CHECK(1 <= size_) << tag("size_", size_);
143 LOG_CHECK(!use_part_count_limit || calc_part_count(expected_size_, part_size_) <= MAX_PART_COUNT)
144 << tag("size_", size_) << tag("expected_size", size_) << tag("is_size_final", is_size_final)
145 << tag("part_size_", part_size_) << tag("ready_parts", ready_parts.size());
146 part_count_ = static_cast<int>(calc_part_count(size_, part_size_));
147
148 return init_common(ready_parts);
149 }
150
unchecked_ready()151 bool PartsManager::unchecked_ready() {
152 VLOG(file_loader) << "Check readiness. Ready size is " << ready_size_ << ", total size is " << size_
153 << ", unknown_size_flag = " << unknown_size_flag_ << ", need_check = " << need_check_
154 << ", checked_prefix_size = " << checked_prefix_size_;
155 return !unknown_size_flag_ && ready_size_ == size_;
156 }
157
may_finish()158 bool PartsManager::may_finish() {
159 if (is_streaming_limit_reached()) {
160 return true;
161 }
162 return ready();
163 }
164
ready()165 bool PartsManager::ready() {
166 return unchecked_ready() && (!need_check_ || checked_prefix_size_ == size_);
167 }
168
finish()169 Status PartsManager::finish() {
170 if (ready()) {
171 return Status::OK();
172 }
173 if (is_streaming_limit_reached()) {
174 return Status::Error("FILE_DOWNLOAD_LIMIT");
175 }
176 return Status::Error("File transferring not finished");
177 }
178
update_first_empty_part()179 void PartsManager::update_first_empty_part() {
180 while (first_empty_part_ < part_count_ && part_status_[first_empty_part_] != PartStatus::Empty) {
181 first_empty_part_++;
182 }
183
184 if (streaming_offset_ == 0) {
185 first_streaming_empty_part_ = first_empty_part_;
186 return;
187 }
188 while (first_streaming_empty_part_ < part_count_ && part_status_[first_streaming_empty_part_] != PartStatus::Empty) {
189 first_streaming_empty_part_++;
190 }
191 }
192
update_first_not_ready_part()193 void PartsManager::update_first_not_ready_part() {
194 while (first_not_ready_part_ < part_count_ && part_status_[first_not_ready_part_] == PartStatus::Ready) {
195 first_not_ready_part_++;
196 }
197 if (streaming_offset_ == 0) {
198 first_streaming_not_ready_part_ = first_not_ready_part_;
199 return;
200 }
201 while (first_streaming_not_ready_part_ < part_count_ &&
202 part_status_[first_streaming_not_ready_part_] == PartStatus::Ready) {
203 first_streaming_not_ready_part_++;
204 }
205 }
206
// Refreshes first_not_ready_part_ and returns it, i.e. the number of consecutive Ready parts
// at the beginning of the file. The checked prefix is not taken into account.
int32 PartsManager::get_unchecked_ready_prefix_count() {
  update_first_not_ready_part();
  return first_not_ready_part_;
}
211
get_ready_prefix_count()212 int32 PartsManager::get_ready_prefix_count() {
213 auto res = get_unchecked_ready_prefix_count();
214 if (need_check_) {
215 auto checked_parts = narrow_cast<int32>(checked_prefix_size_ / part_size_);
216 if (checked_parts < res) {
217 return checked_parts;
218 }
219 }
220 return res;
221 }
222
// Returns streaming_offset_, the effective offset last stored by set_streaming_offset()
// (0 when the requested offset was ignored).
int64 PartsManager::get_streaming_offset() const {
  return streaming_offset_;
}
226
get_bitmask()227 string PartsManager::get_bitmask() {
228 int32 prefix_count = -1;
229 if (need_check_) {
230 prefix_count = narrow_cast<int32>(checked_prefix_size_ / part_size_);
231 }
232 return bitmask_.encode(prefix_count);
233 }
234
is_part_in_streaming_limit(int part_i) const235 bool PartsManager::is_part_in_streaming_limit(int part_i) const {
236 CHECK(part_i < part_count_);
237 auto offset_begin = static_cast<int64>(part_i) * static_cast<int64>(get_part_size());
238 auto offset_end = offset_begin + static_cast<int64>(get_part(part_i).size);
239
240 if (offset_begin >= get_expected_size()) {
241 return false;
242 }
243
244 if (streaming_limit_ == 0) {
245 return true;
246 }
247
248 auto is_intersect_with = [&](int64 begin, int64 end) {
249 return max(begin, offset_begin) < min(end, offset_end);
250 };
251
252 auto streaming_begin = streaming_offset_;
253 auto streaming_end = streaming_offset_ + streaming_limit_;
254 if (is_intersect_with(streaming_begin, streaming_end)) {
255 return true;
256 }
257 // wrap limit
258 if (!unknown_size_flag_ && streaming_end > get_size() && is_intersect_with(0, streaming_end - get_size())) {
259 return true;
260 }
261 return false;
262 }
263
is_streaming_limit_reached()264 bool PartsManager::is_streaming_limit_reached() {
265 if (streaming_limit_ == 0) {
266 return false;
267 }
268 update_first_not_ready_part();
269 auto part_i = first_streaming_not_ready_part_;
270
271 // wrap
272 if (!unknown_size_flag_ && part_i == part_count_) {
273 part_i = first_not_ready_part_;
274 }
275 if (part_i == part_count_) {
276 return false;
277 }
278 return !is_part_in_streaming_limit(part_i);
279 }
280
start_part()281 Result<Part> PartsManager::start_part() {
282 update_first_empty_part();
283 auto part_i = first_streaming_empty_part_;
284 if (known_prefix_flag_ && part_i >= static_cast<int>(known_prefix_size_ / part_size_)) {
285 return Status::Error(1, "Wait for prefix to be known");
286 }
287 if (part_i == part_count_) {
288 if (unknown_size_flag_) {
289 part_count_++;
290 if (part_count_ > MAX_PART_COUNT + (use_part_count_limit_ ? 0 : 64)) {
291 if (!is_upload_) {
292 // Caller will try to increase part size if it is possible
293 return Status::Error("FILE_DOWNLOAD_RESTART_INCREASE_PART_SIZE");
294 }
295 return Status::Error("Too big file with unknown size");
296 }
297 part_status_.push_back(PartStatus::Empty);
298 } else {
299 if (first_empty_part_ < part_count_) {
300 part_i = first_empty_part_;
301 } else {
302 return get_empty_part();
303 }
304 }
305 }
306
307 if (!is_part_in_streaming_limit(part_i)) {
308 return get_empty_part();
309 }
310 CHECK(part_status_[part_i] == PartStatus::Empty);
311 on_part_start(part_i);
312 return get_part(part_i);
313 }
314
set_known_prefix(size_t size,bool is_ready)315 Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
316 if (!known_prefix_flag_ || size < static_cast<size_t>(known_prefix_size_)) {
317 CHECK(is_upload_);
318 return Status::Error("FILE_UPLOAD_RESTART");
319 }
320 known_prefix_size_ = narrow_cast<int64>(size);
321 expected_size_ = max(known_prefix_size_, expected_size_);
322
323 CHECK(static_cast<size_t>(part_count_) == part_status_.size());
324 if (is_ready) {
325 part_count_ = static_cast<int>(calc_part_count(size, part_size_));
326
327 size_ = narrow_cast<int64>(size);
328 unknown_size_flag_ = false;
329 known_prefix_flag_ = false;
330 } else {
331 part_count_ = static_cast<int>(size / part_size_);
332 }
333
334 LOG_CHECK(static_cast<size_t>(part_count_) >= part_status_.size())
335 << size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size();
336 part_status_.resize(part_count_);
337 if (use_part_count_limit_ && calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
338 CHECK(is_upload_);
339 return Status::Error("FILE_UPLOAD_RESTART");
340 }
341 return Status::OK();
342 }
343
on_part_ok(int32 id,size_t part_size,size_t actual_size)344 Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) {
345 CHECK(part_status_[id] == PartStatus::Pending);
346 pending_count_--;
347
348 part_status_[id] = PartStatus::Ready;
349 if (actual_size != 0) {
350 bitmask_.set(id);
351 }
352 ready_size_ += narrow_cast<int64>(actual_size);
353 if (streaming_limit_ > 0 && is_part_in_streaming_limit(id)) {
354 streaming_ready_size_ += narrow_cast<int64>(actual_size);
355 }
356
357 VLOG(file_loader) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;
358
359 int64 offset = narrow_cast<int64>(part_size_) * id;
360 int64 end_offset = offset + narrow_cast<int64>(actual_size);
361 if (unknown_size_flag_) {
362 CHECK(part_size == part_size_);
363 if (actual_size < part_size_) {
364 max_size_ = min(max_size_, end_offset);
365 }
366 if (actual_size) {
367 min_size_ = max(min_size_, end_offset);
368 }
369 if (min_size_ > max_size_) {
370 auto status = Status::Error(PSLICE() << "Failed to transfer file: " << tag("min_size", min_size_)
371 << tag("max_size", max_size_));
372 LOG(ERROR) << status;
373 return status;
374 } else if (min_size_ == max_size_) {
375 unknown_size_flag_ = false;
376 size_ = min_size_;
377 }
378 } else {
379 if ((actual_size < part_size && offset < size_) || (offset >= size_ && actual_size > 0)) {
380 auto status = Status::Error(PSLICE() << "Failed to transfer file: " << tag("size", size_) << tag("offset", offset)
381 << tag("transferred size", actual_size) << tag("part size", part_size));
382 LOG(ERROR) << status;
383 return status;
384 }
385 }
386 return Status::OK();
387 }
388
on_part_failed(int32 id)389 void PartsManager::on_part_failed(int32 id) {
390 CHECK(part_status_[id] == PartStatus::Pending);
391 pending_count_--;
392 part_status_[id] = PartStatus::Empty;
393 if (id < first_empty_part_) {
394 first_empty_part_ = id;
395 }
396 if (streaming_offset_ == 0) {
397 first_streaming_empty_part_ = id;
398 return;
399 }
400 auto part_i = narrow_cast<int>(streaming_offset_ / part_size_);
401 if (id >= part_i && id < first_streaming_empty_part_) {
402 first_streaming_empty_part_ = id;
403 }
404 }
405
// Returns size_. Must only be called when the size is known: CHECK-fails if
// unknown_size_flag_ is set.
int64 PartsManager::get_size() const {
  CHECK(!unknown_size_flag_);
  return size_;
}
410
// Returns size_ without checking unknown_size_flag_ (unlike get_size()).
// init_no_size() sets size_ to 0 when it puts the manager into unknown-size mode.
int64 PartsManager::get_size_or_zero() const {
  return size_;
}
414
// Estimates how many bytes still have to be transferred.
//
// Without a streaming limit this is get_expected_size() - get_ready_size(). With a streaming
// limit it is the part-aligned size of the streaming window (clipped against the expected
// size, or wrapped around to the beginning of the file when the size is known) minus
// streaming_ready_size_. CHECK-fails if that difference is negative.
int64 PartsManager::get_estimated_extra() const {
  auto total_estimated_extra = get_expected_size() - get_ready_size();
  if (streaming_limit_ != 0) {
    int64 expected_size = get_expected_size();
    // Round the window start down and the window end up to part boundaries.
    int64 streaming_begin = streaming_offset_ / get_part_size() * get_part_size();
    int64 streaming_end =
        (streaming_offset_ + streaming_limit_ + get_part_size() - 1) / get_part_size() * get_part_size();
    int64 streaming_size = streaming_end - streaming_begin;
    if (unknown_size_flag_) {
      // Size unknown: clip the window to the expected size; nothing is expected beyond it.
      if (streaming_begin < expected_size) {
        streaming_size = min(expected_size - streaming_begin, streaming_size);
      } else {
        streaming_size = 0;
      }
    } else {
      if (streaming_end > expected_size) {
        // The window runs past the end of the file: split it into a suffix (from the aligned
        // window start to the end) and a part-aligned prefix holding the remaining bytes.
        // NOTE(review): this branch reads the member expected_size_ while the surrounding code
        // uses the local expected_size (== get_size() here) - confirm both are guaranteed to be
        // equal whenever the size is known.
        int64 total = streaming_limit_;
        int64 suffix = 0;
        if (streaming_offset_ < expected_size_) {
          suffix = expected_size_ - streaming_begin;
          total -= expected_size_ - streaming_offset_;
        }
        int64 prefix = (total + get_part_size() - 1) / get_part_size() * get_part_size();
        streaming_size = min(expected_size, prefix + suffix);
      }
    }
    int64 res = streaming_size;

    //TODO: delete this block if CHECK won't fail
    // Recomputes the ready size inside the streaming limit from scratch and verifies that the
    // incrementally maintained streaming_ready_size_ matches it.
    int64 sub = 0;
    for (int part_i = 0; part_i < part_count_; part_i++) {
      if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
        sub += get_part(part_i).size;
      }
    }
    CHECK(sub == streaming_ready_size_);

    res -= streaming_ready_size_;
    CHECK(res >= 0);
    return res;
  }
  return total_estimated_extra;
}
458
// Returns ready_size_, the byte total accumulated by init_common() and on_part_ok().
int64 PartsManager::get_ready_size() const {
  return ready_size_;
}
462
get_expected_size() const463 int64 PartsManager::get_expected_size() const {
464 if (unknown_size_flag_) {
465 return max(static_cast<int64>(512 * (1 << 10)), get_ready_size() * 2);
466 }
467 return get_size();
468 }
469
// Returns part_size_, the part size chosen during initialization.
size_t PartsManager::get_part_size() const {
  return part_size_;
}
473
// Returns part_count_, the current number of tracked parts.
int32 PartsManager::get_part_count() const {
  return part_count_;
}
477
init_common(const std::vector<int> & ready_parts)478 Status PartsManager::init_common(const std::vector<int> &ready_parts) {
479 ready_size_ = 0;
480 streaming_ready_size_ = 0;
481 pending_count_ = 0;
482 first_empty_part_ = 0;
483 first_not_ready_part_ = 0;
484 part_status_ = vector<PartStatus>(part_count_);
485
486 for (auto i : ready_parts) {
487 if (known_prefix_flag_ && i >= static_cast<int>(known_prefix_size_ / part_size_)) {
488 CHECK(is_upload_);
489 return Status::Error("FILE_UPLOAD_RESTART");
490 }
491 if (is_upload_ && i >= part_count_) {
492 return Status::Error("FILE_UPLOAD_RESTART");
493 }
494 LOG_CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_) << tag("size", size_)
495 << tag("part_size", part_size_) << tag("known_prefix_flag", known_prefix_flag_)
496 << tag("known_prefix_size", known_prefix_size_)
497 << tag("real part_count",
498 std::accumulate(ready_parts.begin(), ready_parts.end(), 0,
499 [](auto a, auto b) { return max(a, b + 1); }));
500 part_status_[i] = PartStatus::Ready;
501 bitmask_.set(i);
502 auto part = get_part(i);
503 ready_size_ += narrow_cast<int64>(part.size);
504 }
505
506 checked_prefix_size_ = get_ready_prefix_count() * narrow_cast<int64>(part_size_);
507
508 return Status::OK();
509 }
510
// Turns need_check_ on and resets streaming by calling set_streaming_offset(0, 0)
// (set_streaming_offset() ignores any offset while need_check_ is set).
void PartsManager::set_need_check() {
  need_check_ = true;
  set_streaming_offset(0, 0);
}
515
// Stores size as checked_prefix_size_; no validation is performed here.
void PartsManager::set_checked_prefix_size(int64 size) {
  checked_prefix_size_ = size;
}
519
// Returns checked_prefix_size_, as set by init_common() or set_checked_prefix_size().
int64 PartsManager::get_checked_prefix_size() const {
  return checked_prefix_size_;
}
523
get_unchecked_ready_prefix_size()524 int64 PartsManager::get_unchecked_ready_prefix_size() {
525 update_first_not_ready_part();
526 auto count = first_not_ready_part_;
527 if (count == 0) {
528 return 0;
529 }
530 auto part = get_part(count - 1);
531 int64 res = part.offset;
532 if (!unknown_size_flag_) {
533 res += narrow_cast<int64>(part.size);
534 res = min(res, get_size());
535 }
536 return res;
537 }
538
get_part(int id) const539 Part PartsManager::get_part(int id) const {
540 auto size = narrow_cast<int64>(part_size_);
541 auto offset = size * id;
542 auto total_size = unknown_size_flag_ ? max_size_ : get_size();
543 if (total_size < offset) {
544 size = 0;
545 } else {
546 size = min(size, total_size - offset);
547 }
548 return Part{id, offset, static_cast<size_t>(size)};
549 }
550
// Returns the placeholder part {-1, 0, 0} that start_part() hands out when there is nothing
// to start.
Part PartsManager::get_empty_part() {
  return Part{-1, 0, 0};
}
554
on_part_start(int32 id)555 void PartsManager::on_part_start(int32 id) {
556 CHECK(part_status_[id] == PartStatus::Empty);
557 part_status_[id] = PartStatus::Pending;
558 pending_count_++;
559 }
560
561 } // namespace td
562