1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/feed/core/v2/tasks/load_stream_from_store_task.h"
6
7 #include <algorithm>
8 #include <utility>
9
10 #include "base/time/clock.h"
11 #include "components/feed/core/proto/v2/store.pb.h"
12 #include "components/feed/core/v2/feed_store.h"
13 #include "components/feed/core/v2/proto_util.h"
14 #include "components/feed/core/v2/public/feed_stream_api.h"
15 #include "components/feed/core/v2/scheduling.h"
16 #include "components/feed/core/v2/stream_model_update_request.h"
17
18 namespace feed {
19
20 LoadStreamFromStoreTask::Result::Result() = default;
21 LoadStreamFromStoreTask::Result::~Result() = default;
22 LoadStreamFromStoreTask::Result::Result(Result&&) = default;
23 LoadStreamFromStoreTask::Result& LoadStreamFromStoreTask::Result::operator=(
24 Result&&) = default;
25
LoadStreamFromStoreTask(FeedStore * store,const base::Clock * clock,UserClass user_class,base::OnceCallback<void (Result)> callback)26 LoadStreamFromStoreTask::LoadStreamFromStoreTask(
27 FeedStore* store,
28 const base::Clock* clock,
29 UserClass user_class,
30 base::OnceCallback<void(Result)> callback)
31 : store_(store),
32 clock_(clock),
33 user_class_(user_class),
34 result_callback_(std::move(callback)),
35 update_request_(std::make_unique<StreamModelUpdateRequest>()) {}
36
37 LoadStreamFromStoreTask::~LoadStreamFromStoreTask() = default;
38
Run()39 void LoadStreamFromStoreTask::Run() {
40 store_->LoadStream(
41 base::BindOnce(&LoadStreamFromStoreTask::LoadStreamDone, GetWeakPtr()));
42 }
43
LoadStreamDone(FeedStore::LoadStreamResult result)44 void LoadStreamFromStoreTask::LoadStreamDone(
45 FeedStore::LoadStreamResult result) {
46 if (result.read_error) {
47 Complete(LoadStreamStatus::kFailedWithStoreError);
48 return;
49 }
50 if (result.stream_structures.empty()) {
51 Complete(LoadStreamStatus::kNoStreamDataInStore);
52 return;
53 }
54 if (!ignore_staleness_) {
55 const base::TimeDelta content_age =
56 clock_->Now() - feedstore::GetLastAddedTime(result.stream_data);
57 if (content_age < base::TimeDelta()) {
58 Complete(LoadStreamStatus::kDataInStoreIsStaleTimestampInFuture);
59 return;
60 } else if (ShouldWaitForNewContent(user_class_, true, content_age)) {
61 Complete(LoadStreamStatus::kDataInStoreIsStale);
62 return;
63 }
64 }
65
66 // TODO(harringtond): Add other failure cases?
67
68 std::vector<ContentId> referenced_content_ids;
69 for (const feedstore::StreamStructureSet& structure_set :
70 result.stream_structures) {
71 for (const feedstore::StreamStructure& structure :
72 structure_set.structures()) {
73 if (structure.type() == feedstore::StreamStructure::CONTENT) {
74 referenced_content_ids.push_back(structure.content_id());
75 }
76 }
77 }
78
79 store_->ReadContent(
80 std::move(referenced_content_ids), {result.stream_data.shared_state_id()},
81 base::BindOnce(&LoadStreamFromStoreTask::LoadContentDone, GetWeakPtr()));
82
83 update_request_->stream_data = std::move(result.stream_data);
84
85 // Move stream structures into the update request.
86 // These need sorted by sequence number, and then inserted into
87 // |update_request_->stream_structures|.
88 std::sort(result.stream_structures.begin(), result.stream_structures.end(),
89 [](const feedstore::StreamStructureSet& a,
90 const feedstore::StreamStructureSet& b) {
91 return a.sequence_number() < b.sequence_number();
92 });
93
94 for (feedstore::StreamStructureSet& structure_set :
95 result.stream_structures) {
96 update_request_->max_structure_sequence_number =
97 structure_set.sequence_number();
98 for (feedstore::StreamStructure& structure :
99 *structure_set.mutable_structures()) {
100 update_request_->stream_structures.push_back(std::move(structure));
101 }
102 }
103 }
104
LoadContentDone(std::vector<feedstore::Content> content,std::vector<feedstore::StreamSharedState> shared_states)105 void LoadStreamFromStoreTask::LoadContentDone(
106 std::vector<feedstore::Content> content,
107 std::vector<feedstore::StreamSharedState> shared_states) {
108 update_request_->content = std::move(content);
109 update_request_->shared_states = std::move(shared_states);
110
111 update_request_->source =
112 StreamModelUpdateRequest::Source::kInitialLoadFromStore;
113
114 Complete(LoadStreamStatus::kLoadedFromStore);
115 }
116
Complete(LoadStreamStatus status)117 void LoadStreamFromStoreTask::Complete(LoadStreamStatus status) {
118 Result task_result;
119 task_result.status = status;
120 if (status == LoadStreamStatus::kLoadedFromStore) {
121 task_result.update_request = std::move(update_request_);
122 }
123 std::move(result_callback_).Run(std::move(task_result));
124 TaskComplete();
125 }
126
127 } // namespace feed
128