1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/files/PartsManager.h"
8 
9 #include "td/telegram/files/FileLoaderUtils.h"
10 
11 #include "td/utils/format.h"
12 #include "td/utils/logging.h"
13 #include "td/utils/misc.h"
14 #include "td/utils/SliceBuilder.h"
15 
16 #include <limits>
17 #include <numeric>
18 
19 namespace td {
20 
21 namespace {
calc_part_count(int64 size,int64 part_size)22 int64 calc_part_count(int64 size, int64 part_size) {
23   CHECK(part_size != 0);
24   return (size + part_size - 1) / part_size;
25 }
26 }  // namespace
27 
init_known_prefix(int64 known_prefix,size_t part_size,const std::vector<int> & ready_parts)28 Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, const std::vector<int> &ready_parts) {
29   known_prefix_flag_ = true;
30   known_prefix_size_ = known_prefix;
31   return init_no_size(part_size, ready_parts);
32 }
33 
set_streaming_offset(int64 offset,int64 limit)34 int32 PartsManager::set_streaming_offset(int64 offset, int64 limit) {
35   auto finish = [&] {
36     set_streaming_limit(limit);
37     update_first_not_ready_part();
38     return first_streaming_not_ready_part_;
39   };
40 
41   if (offset < 0 || need_check_ || (!unknown_size_flag_ && get_size() < offset)) {
42     streaming_offset_ = 0;
43     LOG_IF(ERROR, offset != 0) << "Ignore streaming_offset " << offset << ", need_check_ = " << need_check_
44                                << ", unknown_size_flag_ = " << unknown_size_flag_ << ", size = " << get_size();
45 
46     return finish();
47   }
48 
49   auto part_i = offset / part_size_;
50   if (use_part_count_limit_ && part_i >= MAX_PART_COUNT) {
51     streaming_offset_ = 0;
52     LOG(ERROR) << "Ignore streaming_offset " << offset << " in part " << part_i;
53 
54     return finish();
55   }
56 
57   streaming_offset_ = offset;
58   first_streaming_empty_part_ = narrow_cast<int>(part_i);
59   first_streaming_not_ready_part_ = narrow_cast<int>(part_i);
60   if (part_count_ < first_streaming_empty_part_) {
61     part_count_ = first_streaming_empty_part_;
62     part_status_.resize(part_count_, PartStatus::Empty);
63   }
64 
65   return finish();
66 }
67 
get_pending_count() const68 int32 PartsManager::get_pending_count() const {
69   return pending_count_;
70 }
71 
set_streaming_limit(int64 limit)72 void PartsManager::set_streaming_limit(int64 limit) {
73   streaming_limit_ = limit;
74   streaming_ready_size_ = 0;
75   if (streaming_limit_ == 0) {
76     return;
77   }
78   for (int part_i = 0; part_i < part_count_; part_i++) {
79     if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
80       streaming_ready_size_ += get_part(part_i).size;
81     }
82   }
83 }
84 
init_no_size(size_t part_size,const std::vector<int> & ready_parts)85 Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &ready_parts) {
86   unknown_size_flag_ = true;
87   size_ = 0;
88   min_size_ = 0;
89   max_size_ = std::numeric_limits<int64>::max();
90 
91   if (part_size != 0) {
92     part_size_ = part_size;
93   } else {
94     part_size_ = 32 << 10;
95     while (calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
96       part_size_ *= 2;
97       CHECK(part_size_ <= MAX_PART_SIZE);
98     }
99     // just in case if expected_size_ is wrong
100     if (part_size_ < MAX_PART_SIZE) {
101       part_size_ *= 2;
102     }
103   }
104   part_count_ =
105       std::accumulate(ready_parts.begin(), ready_parts.end(), 0, [](auto a, auto b) { return max(a, b + 1); });
106 
107   return init_common(ready_parts);
108 }
109 
init(int64 size,int64 expected_size,bool is_size_final,size_t part_size,const std::vector<int> & ready_parts,bool use_part_count_limit,bool is_upload)110 Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
111                           const std::vector<int> &ready_parts, bool use_part_count_limit, bool is_upload) {
112   CHECK(expected_size >= size);
113   is_upload_ = is_upload;
114   use_part_count_limit_ = use_part_count_limit;
115   expected_size_ = expected_size;
116   if (expected_size_ > MAX_FILE_SIZE) {
117     return Status::Error("Too big file");
118   }
119   if (!is_size_final) {
120     return init_known_prefix(size, part_size, ready_parts);
121   }
122   if (size == 0) {
123     return init_no_size(part_size, ready_parts);
124   }
125   LOG_CHECK(size > 0) << tag("size", size);
126   unknown_size_flag_ = false;
127   size_ = size;
128 
129   if (part_size != 0) {
130     part_size_ = part_size;
131     if (use_part_count_limit_ && calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
132       CHECK(is_upload_);
133       return Status::Error("FILE_UPLOAD_RESTART");
134     }
135   } else {
136     part_size_ = 64 << 10;
137     while (calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
138       part_size_ *= 2;
139       CHECK(part_size_ <= MAX_PART_SIZE);
140     }
141   }
142   LOG_CHECK(1 <= size_) << tag("size_", size_);
143   LOG_CHECK(!use_part_count_limit || calc_part_count(expected_size_, part_size_) <= MAX_PART_COUNT)
144       << tag("size_", size_) << tag("expected_size", size_) << tag("is_size_final", is_size_final)
145       << tag("part_size_", part_size_) << tag("ready_parts", ready_parts.size());
146   part_count_ = static_cast<int>(calc_part_count(size_, part_size_));
147 
148   return init_common(ready_parts);
149 }
150 
unchecked_ready()151 bool PartsManager::unchecked_ready() {
152   VLOG(file_loader) << "Check readiness. Ready size is " << ready_size_ << ", total size is " << size_
153                     << ", unknown_size_flag = " << unknown_size_flag_ << ", need_check = " << need_check_
154                     << ", checked_prefix_size = " << checked_prefix_size_;
155   return !unknown_size_flag_ && ready_size_ == size_;
156 }
157 
may_finish()158 bool PartsManager::may_finish() {
159   if (is_streaming_limit_reached()) {
160     return true;
161   }
162   return ready();
163 }
164 
ready()165 bool PartsManager::ready() {
166   return unchecked_ready() && (!need_check_ || checked_prefix_size_ == size_);
167 }
168 
finish()169 Status PartsManager::finish() {
170   if (ready()) {
171     return Status::OK();
172   }
173   if (is_streaming_limit_reached()) {
174     return Status::Error("FILE_DOWNLOAD_LIMIT");
175   }
176   return Status::Error("File transferring not finished");
177 }
178 
update_first_empty_part()179 void PartsManager::update_first_empty_part() {
180   while (first_empty_part_ < part_count_ && part_status_[first_empty_part_] != PartStatus::Empty) {
181     first_empty_part_++;
182   }
183 
184   if (streaming_offset_ == 0) {
185     first_streaming_empty_part_ = first_empty_part_;
186     return;
187   }
188   while (first_streaming_empty_part_ < part_count_ && part_status_[first_streaming_empty_part_] != PartStatus::Empty) {
189     first_streaming_empty_part_++;
190   }
191 }
192 
update_first_not_ready_part()193 void PartsManager::update_first_not_ready_part() {
194   while (first_not_ready_part_ < part_count_ && part_status_[first_not_ready_part_] == PartStatus::Ready) {
195     first_not_ready_part_++;
196   }
197   if (streaming_offset_ == 0) {
198     first_streaming_not_ready_part_ = first_not_ready_part_;
199     return;
200   }
201   while (first_streaming_not_ready_part_ < part_count_ &&
202          part_status_[first_streaming_not_ready_part_] == PartStatus::Ready) {
203     first_streaming_not_ready_part_++;
204   }
205 }
206 
get_unchecked_ready_prefix_count()207 int32 PartsManager::get_unchecked_ready_prefix_count() {
208   update_first_not_ready_part();
209   return first_not_ready_part_;
210 }
211 
get_ready_prefix_count()212 int32 PartsManager::get_ready_prefix_count() {
213   auto res = get_unchecked_ready_prefix_count();
214   if (need_check_) {
215     auto checked_parts = narrow_cast<int32>(checked_prefix_size_ / part_size_);
216     if (checked_parts < res) {
217       return checked_parts;
218     }
219   }
220   return res;
221 }
222 
get_streaming_offset() const223 int64 PartsManager::get_streaming_offset() const {
224   return streaming_offset_;
225 }
226 
get_bitmask()227 string PartsManager::get_bitmask() {
228   int32 prefix_count = -1;
229   if (need_check_) {
230     prefix_count = narrow_cast<int32>(checked_prefix_size_ / part_size_);
231   }
232   return bitmask_.encode(prefix_count);
233 }
234 
is_part_in_streaming_limit(int part_i) const235 bool PartsManager::is_part_in_streaming_limit(int part_i) const {
236   CHECK(part_i < part_count_);
237   auto offset_begin = static_cast<int64>(part_i) * static_cast<int64>(get_part_size());
238   auto offset_end = offset_begin + static_cast<int64>(get_part(part_i).size);
239 
240   if (offset_begin >= get_expected_size()) {
241     return false;
242   }
243 
244   if (streaming_limit_ == 0) {
245     return true;
246   }
247 
248   auto is_intersect_with = [&](int64 begin, int64 end) {
249     return max(begin, offset_begin) < min(end, offset_end);
250   };
251 
252   auto streaming_begin = streaming_offset_;
253   auto streaming_end = streaming_offset_ + streaming_limit_;
254   if (is_intersect_with(streaming_begin, streaming_end)) {
255     return true;
256   }
257   // wrap limit
258   if (!unknown_size_flag_ && streaming_end > get_size() && is_intersect_with(0, streaming_end - get_size())) {
259     return true;
260   }
261   return false;
262 }
263 
is_streaming_limit_reached()264 bool PartsManager::is_streaming_limit_reached() {
265   if (streaming_limit_ == 0) {
266     return false;
267   }
268   update_first_not_ready_part();
269   auto part_i = first_streaming_not_ready_part_;
270 
271   // wrap
272   if (!unknown_size_flag_ && part_i == part_count_) {
273     part_i = first_not_ready_part_;
274   }
275   if (part_i == part_count_) {
276     return false;
277   }
278   return !is_part_in_streaming_limit(part_i);
279 }
280 
start_part()281 Result<Part> PartsManager::start_part() {
282   update_first_empty_part();
283   auto part_i = first_streaming_empty_part_;
284   if (known_prefix_flag_ && part_i >= static_cast<int>(known_prefix_size_ / part_size_)) {
285     return Status::Error(1, "Wait for prefix to be known");
286   }
287   if (part_i == part_count_) {
288     if (unknown_size_flag_) {
289       part_count_++;
290       if (part_count_ > MAX_PART_COUNT + (use_part_count_limit_ ? 0 : 64)) {
291         if (!is_upload_) {
292           // Caller will try to increase part size if it is possible
293           return Status::Error("FILE_DOWNLOAD_RESTART_INCREASE_PART_SIZE");
294         }
295         return Status::Error("Too big file with unknown size");
296       }
297       part_status_.push_back(PartStatus::Empty);
298     } else {
299       if (first_empty_part_ < part_count_) {
300         part_i = first_empty_part_;
301       } else {
302         return get_empty_part();
303       }
304     }
305   }
306 
307   if (!is_part_in_streaming_limit(part_i)) {
308     return get_empty_part();
309   }
310   CHECK(part_status_[part_i] == PartStatus::Empty);
311   on_part_start(part_i);
312   return get_part(part_i);
313 }
314 
// Extends the known prefix of a file whose size is not final yet (see init_known_prefix()).
// size is the new known prefix length and must not be smaller than the previous one. If is_ready, size
// becomes the final file size and the file stops being treated as having an unknown size.
// Returns "FILE_UPLOAD_RESTART" when the prefix is not being tracked or shrinks, or when the part count
// limit would be exceeded. Both situations are CHECKed to occur only for uploads.
Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
  if (!known_prefix_flag_ || size < static_cast<size_t>(known_prefix_size_)) {
    CHECK(is_upload_);
    return Status::Error("FILE_UPLOAD_RESTART");
  }
  known_prefix_size_ = narrow_cast<int64>(size);
  expected_size_ = max(known_prefix_size_, expected_size_);

  CHECK(static_cast<size_t>(part_count_) == part_status_.size());
  if (is_ready) {
    // the final size is known: include the trailing partial part
    part_count_ = static_cast<int>(calc_part_count(size, part_size_));

    size_ = narrow_cast<int64>(size);
    unknown_size_flag_ = false;
    known_prefix_flag_ = false;
  } else {
    // only whole parts of the prefix can be transferred so far
    part_count_ = static_cast<int>(size / part_size_);
  }

  // the part list may only grow; resize() below keeps existing statuses
  LOG_CHECK(static_cast<size_t>(part_count_) >= part_status_.size())
      << size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size();
  part_status_.resize(part_count_);
  if (use_part_count_limit_ && calc_part_count(expected_size_, part_size_) > MAX_PART_COUNT) {
    CHECK(is_upload_);
    return Status::Error("FILE_UPLOAD_RESTART");
  }
  return Status::OK();
}
343 
// Records that Pending part id finished successfully.
//   part_size   - size of the part that was requested
//   actual_size - number of bytes actually transferred (may be smaller at the end of the file)
// Updates ready counters and the bitmask. For an unknown size it narrows [min_size_, max_size_] and fixes the
// size once the bounds meet. Returns an error if the transferred size contradicts the known or inferred size.
Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) {
  CHECK(part_status_[id] == PartStatus::Pending);
  pending_count_--;

  part_status_[id] = PartStatus::Ready;
  // only parts that actually contain data are marked in the bitmask
  if (actual_size != 0) {
    bitmask_.set(id);
  }
  ready_size_ += narrow_cast<int64>(actual_size);
  if (streaming_limit_ > 0 && is_part_in_streaming_limit(id)) {
    streaming_ready_size_ += narrow_cast<int64>(actual_size);
  }

  VLOG(file_loader) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;

  int64 offset = narrow_cast<int64>(part_size_) * id;
  int64 end_offset = offset + narrow_cast<int64>(actual_size);
  if (unknown_size_flag_) {
    CHECK(part_size == part_size_);
    // a short part means the file ends at end_offset at the latest
    if (actual_size < part_size_) {
      max_size_ = min(max_size_, end_offset);
    }
    // any data at all means the file is at least end_offset bytes long
    if (actual_size) {
      min_size_ = max(min_size_, end_offset);
    }
    if (min_size_ > max_size_) {
      auto status = Status::Error(PSLICE() << "Failed to transfer file: " << tag("min_size", min_size_)
                                           << tag("max_size", max_size_));
      LOG(ERROR) << status;
      return status;
    } else if (min_size_ == max_size_) {
      // the bounds met, so the size is now known exactly
      unknown_size_flag_ = false;
      size_ = min_size_;
    }
  } else {
    // a short part before the end of the file, or data beyond the end, contradicts the known size
    if ((actual_size < part_size && offset < size_) || (offset >= size_ && actual_size > 0)) {
      auto status = Status::Error(PSLICE() << "Failed to transfer file: " << tag("size", size_) << tag("offset", offset)
                                           << tag("transferred size", actual_size) << tag("part size", part_size));
      LOG(ERROR) << status;
      return status;
    }
  }
  return Status::OK();
}
388 
on_part_failed(int32 id)389 void PartsManager::on_part_failed(int32 id) {
390   CHECK(part_status_[id] == PartStatus::Pending);
391   pending_count_--;
392   part_status_[id] = PartStatus::Empty;
393   if (id < first_empty_part_) {
394     first_empty_part_ = id;
395   }
396   if (streaming_offset_ == 0) {
397     first_streaming_empty_part_ = id;
398     return;
399   }
400   auto part_i = narrow_cast<int>(streaming_offset_ / part_size_);
401   if (id >= part_i && id < first_streaming_empty_part_) {
402     first_streaming_empty_part_ = id;
403   }
404 }
405 
get_size() const406 int64 PartsManager::get_size() const {
407   CHECK(!unknown_size_flag_);
408   return size_;
409 }
410 
get_size_or_zero() const411 int64 PartsManager::get_size_or_zero() const {
412   return size_;
413 }
414 
get_estimated_extra() const415 int64 PartsManager::get_estimated_extra() const {
416   auto total_estimated_extra = get_expected_size() - get_ready_size();
417   if (streaming_limit_ != 0) {
418     int64 expected_size = get_expected_size();
419     int64 streaming_begin = streaming_offset_ / get_part_size() * get_part_size();
420     int64 streaming_end =
421         (streaming_offset_ + streaming_limit_ + get_part_size() - 1) / get_part_size() * get_part_size();
422     int64 streaming_size = streaming_end - streaming_begin;
423     if (unknown_size_flag_) {
424       if (streaming_begin < expected_size) {
425         streaming_size = min(expected_size - streaming_begin, streaming_size);
426       } else {
427         streaming_size = 0;
428       }
429     } else {
430       if (streaming_end > expected_size) {
431         int64 total = streaming_limit_;
432         int64 suffix = 0;
433         if (streaming_offset_ < expected_size_) {
434           suffix = expected_size_ - streaming_begin;
435           total -= expected_size_ - streaming_offset_;
436         }
437         int64 prefix = (total + get_part_size() - 1) / get_part_size() * get_part_size();
438         streaming_size = min(expected_size, prefix + suffix);
439       }
440     }
441     int64 res = streaming_size;
442 
443     //TODO: delete this block if CHECK won't fail
444     int64 sub = 0;
445     for (int part_i = 0; part_i < part_count_; part_i++) {
446       if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
447         sub += get_part(part_i).size;
448       }
449     }
450     CHECK(sub == streaming_ready_size_);
451 
452     res -= streaming_ready_size_;
453     CHECK(res >= 0);
454     return res;
455   }
456   return total_estimated_extra;
457 }
458 
get_ready_size() const459 int64 PartsManager::get_ready_size() const {
460   return ready_size_;
461 }
462 
get_expected_size() const463 int64 PartsManager::get_expected_size() const {
464   if (unknown_size_flag_) {
465     return max(static_cast<int64>(512 * (1 << 10)), get_ready_size() * 2);
466   }
467   return get_size();
468 }
469 
get_part_size() const470 size_t PartsManager::get_part_size() const {
471   return part_size_;
472 }
473 
get_part_count() const474 int32 PartsManager::get_part_count() const {
475   return part_count_;
476 }
477 
// Shared tail of all init paths: resets counters and cursors, creates part_count_ statuses, and marks
// ready_parts as Ready (both in part_status_ and in the bitmask). Also initializes checked_prefix_size_ to
// the size of the ready prefix in whole parts.
// For uploads, ready parts outside the known prefix or beyond part_count_ yield "FILE_UPLOAD_RESTART".
// Any other out-of-range index is a fatal LOG_CHECK.
Status PartsManager::init_common(const std::vector<int> &ready_parts) {
  ready_size_ = 0;
  streaming_ready_size_ = 0;
  pending_count_ = 0;
  first_empty_part_ = 0;
  first_not_ready_part_ = 0;
  part_status_ = vector<PartStatus>(part_count_);

  for (auto i : ready_parts) {
    if (known_prefix_flag_ && i >= static_cast<int>(known_prefix_size_ / part_size_)) {
      CHECK(is_upload_);
      return Status::Error("FILE_UPLOAD_RESTART");
    }
    if (is_upload_ && i >= part_count_) {
      return Status::Error("FILE_UPLOAD_RESTART");
    }
    // "real part_count" is one past the largest ready part index, for diagnostics only
    LOG_CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_) << tag("size", size_)
                                         << tag("part_size", part_size_) << tag("known_prefix_flag", known_prefix_flag_)
                                         << tag("known_prefix_size", known_prefix_size_)
                                         << tag("real part_count",
                                                std::accumulate(ready_parts.begin(), ready_parts.end(), 0,
                                                                [](auto a, auto b) { return max(a, b + 1); }));
    part_status_[i] = PartStatus::Ready;
    bitmask_.set(i);
    auto part = get_part(i);
    ready_size_ += narrow_cast<int64>(part.size);
  }

  checked_prefix_size_ = get_ready_prefix_count() * narrow_cast<int64>(part_size_);

  return Status::OK();
}
510 
set_need_check()511 void PartsManager::set_need_check() {
512   need_check_ = true;
513   set_streaming_offset(0, 0);
514 }
515 
set_checked_prefix_size(int64 size)516 void PartsManager::set_checked_prefix_size(int64 size) {
517   checked_prefix_size_ = size;
518 }
519 
get_checked_prefix_size() const520 int64 PartsManager::get_checked_prefix_size() const {
521   return checked_prefix_size_;
522 }
523 
get_unchecked_ready_prefix_size()524 int64 PartsManager::get_unchecked_ready_prefix_size() {
525   update_first_not_ready_part();
526   auto count = first_not_ready_part_;
527   if (count == 0) {
528     return 0;
529   }
530   auto part = get_part(count - 1);
531   int64 res = part.offset;
532   if (!unknown_size_flag_) {
533     res += narrow_cast<int64>(part.size);
534     res = min(res, get_size());
535   }
536   return res;
537 }
538 
get_part(int id) const539 Part PartsManager::get_part(int id) const {
540   auto size = narrow_cast<int64>(part_size_);
541   auto offset = size * id;
542   auto total_size = unknown_size_flag_ ? max_size_ : get_size();
543   if (total_size < offset) {
544     size = 0;
545   } else {
546     size = min(size, total_size - offset);
547   }
548   return Part{id, offset, static_cast<size_t>(size)};
549 }
550 
get_empty_part()551 Part PartsManager::get_empty_part() {
552   return Part{-1, 0, 0};
553 }
554 
on_part_start(int32 id)555 void PartsManager::on_part_start(int32 id) {
556   CHECK(part_status_[id] == PartStatus::Empty);
557   part_status_[id] = PartStatus::Pending;
558   pending_count_++;
559 }
560 
561 }  // namespace td
562