1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "components/feed/core/v2/tasks/load_stream_from_store_task.h"
6 
7 #include <algorithm>
8 #include <utility>
9 
10 #include "base/time/clock.h"
11 #include "components/feed/core/proto/v2/store.pb.h"
12 #include "components/feed/core/v2/feed_store.h"
13 #include "components/feed/core/v2/proto_util.h"
14 #include "components/feed/core/v2/public/feed_stream_api.h"
15 #include "components/feed/core/v2/scheduling.h"
16 #include "components/feed/core/v2/stream_model_update_request.h"
17 
18 namespace feed {
19 
20 LoadStreamFromStoreTask::Result::Result() = default;
21 LoadStreamFromStoreTask::Result::~Result() = default;
22 LoadStreamFromStoreTask::Result::Result(Result&&) = default;
23 LoadStreamFromStoreTask::Result& LoadStreamFromStoreTask::Result::operator=(
24     Result&&) = default;
25 
LoadStreamFromStoreTask(FeedStore * store,const base::Clock * clock,UserClass user_class,base::OnceCallback<void (Result)> callback)26 LoadStreamFromStoreTask::LoadStreamFromStoreTask(
27     FeedStore* store,
28     const base::Clock* clock,
29     UserClass user_class,
30     base::OnceCallback<void(Result)> callback)
31     : store_(store),
32       clock_(clock),
33       user_class_(user_class),
34       result_callback_(std::move(callback)),
35       update_request_(std::make_unique<StreamModelUpdateRequest>()) {}
36 
37 LoadStreamFromStoreTask::~LoadStreamFromStoreTask() = default;
38 
Run()39 void LoadStreamFromStoreTask::Run() {
40   store_->LoadStream(
41       base::BindOnce(&LoadStreamFromStoreTask::LoadStreamDone, GetWeakPtr()));
42 }
43 
LoadStreamDone(FeedStore::LoadStreamResult result)44 void LoadStreamFromStoreTask::LoadStreamDone(
45     FeedStore::LoadStreamResult result) {
46   if (result.read_error) {
47     Complete(LoadStreamStatus::kFailedWithStoreError);
48     return;
49   }
50   if (result.stream_structures.empty()) {
51     Complete(LoadStreamStatus::kNoStreamDataInStore);
52     return;
53   }
54   if (!ignore_staleness_) {
55     const base::TimeDelta content_age =
56         clock_->Now() - feedstore::GetLastAddedTime(result.stream_data);
57     if (content_age < base::TimeDelta()) {
58       Complete(LoadStreamStatus::kDataInStoreIsStaleTimestampInFuture);
59       return;
60     } else if (ShouldWaitForNewContent(user_class_, true, content_age)) {
61       Complete(LoadStreamStatus::kDataInStoreIsStale);
62       return;
63     }
64   }
65 
66   // TODO(harringtond): Add other failure cases?
67 
68   std::vector<ContentId> referenced_content_ids;
69   for (const feedstore::StreamStructureSet& structure_set :
70        result.stream_structures) {
71     for (const feedstore::StreamStructure& structure :
72          structure_set.structures()) {
73       if (structure.type() == feedstore::StreamStructure::CONTENT) {
74         referenced_content_ids.push_back(structure.content_id());
75       }
76     }
77   }
78 
79   store_->ReadContent(
80       std::move(referenced_content_ids), {result.stream_data.shared_state_id()},
81       base::BindOnce(&LoadStreamFromStoreTask::LoadContentDone, GetWeakPtr()));
82 
83   update_request_->stream_data = std::move(result.stream_data);
84 
85   // Move stream structures into the update request.
86   // These need sorted by sequence number, and then inserted into
87   // |update_request_->stream_structures|.
88   std::sort(result.stream_structures.begin(), result.stream_structures.end(),
89             [](const feedstore::StreamStructureSet& a,
90                const feedstore::StreamStructureSet& b) {
91               return a.sequence_number() < b.sequence_number();
92             });
93 
94   for (feedstore::StreamStructureSet& structure_set :
95        result.stream_structures) {
96     update_request_->max_structure_sequence_number =
97         structure_set.sequence_number();
98     for (feedstore::StreamStructure& structure :
99          *structure_set.mutable_structures()) {
100       update_request_->stream_structures.push_back(std::move(structure));
101     }
102   }
103 }
104 
LoadContentDone(std::vector<feedstore::Content> content,std::vector<feedstore::StreamSharedState> shared_states)105 void LoadStreamFromStoreTask::LoadContentDone(
106     std::vector<feedstore::Content> content,
107     std::vector<feedstore::StreamSharedState> shared_states) {
108   update_request_->content = std::move(content);
109   update_request_->shared_states = std::move(shared_states);
110 
111   update_request_->source =
112       StreamModelUpdateRequest::Source::kInitialLoadFromStore;
113 
114   Complete(LoadStreamStatus::kLoadedFromStore);
115 }
116 
Complete(LoadStreamStatus status)117 void LoadStreamFromStoreTask::Complete(LoadStreamStatus status) {
118   Result task_result;
119   task_result.status = status;
120   if (status == LoadStreamStatus::kLoadedFromStore) {
121     task_result.update_request = std::move(update_request_);
122   }
123   std::move(result_callback_).Run(std::move(task_result));
124   TaskComplete();
125 }
126 
127 }  // namespace feed
128