1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.chrome.browser.feed.library.feedrequestmanager;
6 
7 import org.chromium.base.Consumer;
8 import org.chromium.chrome.browser.feed.library.api.host.config.Configuration;
9 import org.chromium.chrome.browser.feed.library.api.host.config.Configuration.ConfigKey;
10 import org.chromium.chrome.browser.feed.library.api.host.logging.Task;
11 import org.chromium.chrome.browser.feed.library.api.host.network.HttpRequest;
12 import org.chromium.chrome.browser.feed.library.api.host.network.HttpRequest.HttpMethod;
13 import org.chromium.chrome.browser.feed.library.api.host.network.NetworkClient;
14 import org.chromium.chrome.browser.feed.library.api.host.storage.CommitResult;
15 import org.chromium.chrome.browser.feed.library.api.internal.actionmanager.ViewActionManager;
16 import org.chromium.chrome.browser.feed.library.api.internal.common.SemanticPropertiesWithId;
17 import org.chromium.chrome.browser.feed.library.api.internal.common.ThreadUtils;
18 import org.chromium.chrome.browser.feed.library.api.internal.protocoladapter.ProtocolAdapter;
19 import org.chromium.chrome.browser.feed.library.api.internal.requestmanager.ActionUploadRequestManager;
20 import org.chromium.chrome.browser.feed.library.api.internal.store.Store;
21 import org.chromium.chrome.browser.feed.library.api.internal.store.UploadableActionMutation;
22 import org.chromium.chrome.browser.feed.library.common.Result;
23 import org.chromium.chrome.browser.feed.library.common.concurrent.MainThreadRunner;
24 import org.chromium.chrome.browser.feed.library.common.concurrent.TaskQueue;
25 import org.chromium.chrome.browser.feed.library.common.concurrent.TaskQueue.TaskType;
26 import org.chromium.chrome.browser.feed.library.common.logging.Logger;
27 import org.chromium.chrome.browser.feed.library.common.protoextensions.FeedExtensionRegistry;
28 import org.chromium.chrome.browser.feed.library.common.time.Clock;
29 import org.chromium.components.feed.core.proto.libraries.api.internal.StreamDataProto.StreamUploadableAction;
30 import org.chromium.components.feed.core.proto.wire.ConsistencyTokenProto.ConsistencyToken;
31 import org.chromium.components.feed.core.proto.wire.FeedActionResponseProto.FeedActionResponse;
32 import org.chromium.components.feed.core.proto.wire.ResponseProto.Response;
33 
34 import java.io.IOException;
35 import java.io.UnsupportedEncodingException;
36 import java.util.ArrayList;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Set;
40 import java.util.concurrent.TimeUnit;
41 
42 /** Default implementation of ActionUploadRequestManager. */
43 public final class FeedActionUploadRequestManager implements ActionUploadRequestManager {
44     private static final String TAG = "ActionUploadRequest";
45 
46     private final ViewActionManager mViewActionManager;
47     private final Configuration mConfiguration;
48     private final NetworkClient mNetworkClient;
49     private final ProtocolAdapter mProtocolAdapter;
50     private final FeedExtensionRegistry mExtensionRegistry;
51     private final MainThreadRunner mMainThreadRunner;
52     private final TaskQueue mTaskQueue;
53     private final ThreadUtils mThreadUtils;
54     private final Store mStore;
55     private final Clock mClock;
56     private final long mMaxActionUploadAttempts;
57     // Total number of actions that can be uploaded in a chained batch request.
58     private final long mMaxActionsUploadsPerBatchedRequest;
59     // Maximum bytes of StreamUploadableAction that can be uploaded in a single request. The actual
60     // request is made of ActionPayloads and SemanticProperties, not StreamUploadableActions, so
61     // this serves as a proxy.
62     private final long mMaxBytesPerRequest;
63     private final long mMaxActionUploadTtl;
64 
FeedActionUploadRequestManager(ViewActionManager viewActionManager, Configuration configuration, NetworkClient networkClient, ProtocolAdapter protocolAdapter, FeedExtensionRegistry extensionRegistry, MainThreadRunner mainThreadRunner, TaskQueue taskQueue, ThreadUtils threadUtils, Store store, Clock clock)65     public FeedActionUploadRequestManager(ViewActionManager viewActionManager,
66             Configuration configuration, NetworkClient networkClient,
67             ProtocolAdapter protocolAdapter, FeedExtensionRegistry extensionRegistry,
68             MainThreadRunner mainThreadRunner, TaskQueue taskQueue, ThreadUtils threadUtils,
69             Store store, Clock clock) {
70         this.mViewActionManager = viewActionManager;
71         this.mConfiguration = configuration;
72         this.mNetworkClient = networkClient;
73         this.mProtocolAdapter = protocolAdapter;
74         this.mExtensionRegistry = extensionRegistry;
75         this.mMainThreadRunner = mainThreadRunner;
76         this.mTaskQueue = taskQueue;
77         this.mThreadUtils = threadUtils;
78         this.mStore = store;
79         this.mClock = clock;
80         mMaxBytesPerRequest = configuration.getValueOrDefault(
81                 ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 4000L);
82         mMaxActionsUploadsPerBatchedRequest = configuration.getValueOrDefault(
83                 ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 10L);
84         mMaxActionUploadAttempts =
85                 configuration.getValueOrDefault(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 1L);
86         mMaxActionUploadTtl = configuration.getValueOrDefault(
87                 ConfigKey.FEED_ACTION_TTL_SECONDS, TimeUnit.DAYS.toSeconds(2));
88     }
89 
90     @Override
triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer)91     public void triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token,
92             Consumer<Result<ConsistencyToken>> consumer) {
93         mThreadUtils.checkNotMainThread();
94         if (mMaxActionUploadAttempts == 0 || mMaxBytesPerRequest == 0
95                 || mMaxActionsUploadsPerBatchedRequest == 0) {
96             consumer.accept(Result.success(token));
97             return;
98         }
99         triggerUploadActions(actions, token, consumer, /* uploadCount= */ 0);
100     }
101 
102     @Override
triggerUploadAllActions( ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer)103     public void triggerUploadAllActions(
104             ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer) {
105         mThreadUtils.checkNotMainThread();
106         mMainThreadRunner.execute("Store view actions and", () -> {
107             mViewActionManager.storeViewActions(() -> {
108                 mTaskQueue.execute(Task.UPLOAD_ALL_ACTIONS, TaskType.IMMEDIATE, () -> {
109                     Result<Set<StreamUploadableAction>> actionsResult =
110                             mStore.getAllUploadableActions();
111                     if (actionsResult.isSuccessful()) {
112                         triggerUploadActions(actionsResult.getValue(), token, consumer);
113                     }
114                 });
115             });
116         });
117     }
118 
triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer, int uploadCount)119     private void triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token,
120             Consumer<Result<ConsistencyToken>> consumer, int uploadCount) {
121         mThreadUtils.checkNotMainThread();
122         // Return the token if there are no actions to upload.
123         if (actions.isEmpty() || uploadCount >= mMaxActionsUploadsPerBatchedRequest) {
124             consumer.accept(Result.success(token));
125             return;
126         }
127         UploadableActionsRequestBuilder requestBuilder =
128                 new UploadableActionsRequestBuilder(mProtocolAdapter);
129         int actionPayloadBytes = 0;
130 
131         Set<StreamUploadableAction> actionsToUpload = new HashSet<>();
132         Set<StreamUploadableAction> actionsRemaining = new HashSet<>();
133         ArrayList<String> contentIds = new ArrayList<>();
134 
135         UploadableActionMutation actionMutation = mStore.editUploadableActions();
136 
137         // Select which actions to send in this request, and update their attempt count. Remaining
138         // actions are collected and sent next. Stale actions are removed from the store.
139         for (StreamUploadableAction action : actions) {
140             if (isStale(action)) {
141                 actionMutation.remove(action, action.getFeatureContentId());
142                 continue;
143             }
144 
145             int actionBytes = action.toByteArray().length;
146             if (actionPayloadBytes + actionBytes < mMaxBytesPerRequest) {
147                 actionPayloadBytes += actionBytes;
148                 actionMutation.remove(action, action.getFeatureContentId());
149                 StreamUploadableAction actionToUpload =
150                         action.toBuilder()
151                                 .setUploadAttempts(action.getUploadAttempts() + 1)
152                                 .build();
153                 actionMutation.upsert(actionToUpload, actionToUpload.getFeatureContentId());
154                 actionsToUpload.add(actionToUpload);
155 
156                 contentIds.add(actionToUpload.getFeatureContentId());
157             } else {
158                 actionsRemaining.add(action);
159             }
160         }
161 
162         CommitResult commitResult = actionMutation.commit();
163         if (commitResult != CommitResult.SUCCESS) {
164             Logger.e(TAG, "Upserting uploaded actions failed");
165             consumer.accept(Result.failure());
166             return;
167         }
168 
169         if (actionsToUpload.isEmpty()) {
170             if (actionsRemaining.isEmpty()) {
171                 // All actions were too stale to be uploaded.
172                 consumer.accept(Result.success(token));
173             } else {
174                 Logger.e(TAG, "No action fitted in the request.");
175                 consumer.accept(Result.failure());
176             }
177             return;
178         }
179 
180         Result<List<SemanticPropertiesWithId>> semanticPropertiesResult =
181                 mStore.getSemanticProperties(contentIds);
182         List<SemanticPropertiesWithId> semanticPropertiesList = new ArrayList<>();
183         if (semanticPropertiesResult.isSuccessful()
184                 && !semanticPropertiesResult.getValue().isEmpty()) {
185             semanticPropertiesList = semanticPropertiesResult.getValue();
186         }
187 
188         Consumer<Result<ConsistencyToken>> tokenConsumer = result -> {
189             mThreadUtils.checkNotMainThread();
190             if (result.isSuccessful()) {
191                 if (!actionsRemaining.isEmpty()) {
192                     triggerUploadActions(actionsRemaining, result.getValue(), consumer,
193                             uploadCount + actionsToUpload.size());
194                 } else {
195                     consumer.accept(Result.success(result.getValue()));
196                 }
197             } else {
198                 consumer.accept(uploadCount == 0 ? Result.failure() : Result.success(token));
199             }
200         };
201         requestBuilder.setConsistencyToken(token)
202                 .setActions(actionsToUpload)
203                 .setSemanticProperties(semanticPropertiesList);
204         executeUploadActionRequest(actionsToUpload, requestBuilder, tokenConsumer);
205     }
206 
executeUploadActionRequest(Set<StreamUploadableAction> actions, UploadableActionsRequestBuilder requestBuilder, Consumer<Result<ConsistencyToken>> consumer)207     private void executeUploadActionRequest(Set<StreamUploadableAction> actions,
208             UploadableActionsRequestBuilder requestBuilder,
209             Consumer<Result<ConsistencyToken>> consumer) {
210         mThreadUtils.checkNotMainThread();
211 
212         String endpoint =
213                 mConfiguration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_ENDPOINT, "");
214         @HttpMethod
215         String httpMethod = mConfiguration.getValueOrDefault(
216                 ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.POST);
217         HttpRequest httpRequest =
218                 RequestHelper.buildHttpRequest(httpMethod, requestBuilder.build().toByteArray(),
219                         endpoint, /* locale= */ "", /* priorityParamValue= */ "");
220 
221         Logger.i(TAG, "Making Request: %s", httpRequest.getUri().getPath());
222         mNetworkClient.send(httpRequest, input -> {
223             Logger.i(TAG, "Request: %s completed with response code: %s",
224                     httpRequest.getUri().getPath(), input.getResponseCode());
225             if (input.getResponseCode() != 200) {
226                 String errorBody = null;
227                 try {
228                     errorBody = new String(input.getResponseBody(), "UTF-8");
229                 } catch (UnsupportedEncodingException e) {
230                     Logger.e(TAG, "Error handling http error logging", e);
231                 }
232                 Logger.e(TAG, "errorCode: %d", input.getResponseCode());
233                 Logger.e(TAG, "errorResponse: %s", errorBody);
234                 mTaskQueue.execute(Task.EXECUTE_UPLOAD_ACTION_REQUEST, TaskType.IMMEDIATE,
235                         () -> { consumer.accept(Result.failure()); });
236                 return;
237             }
238             handleUploadableActionResponseBytes(actions, input.getResponseBody(), consumer);
239         });
240     }
241 
handleUploadableActionResponseBytes(Set<StreamUploadableAction> actions, final byte[] responseBytes, final Consumer<Result<ConsistencyToken>> consumer)242     private void handleUploadableActionResponseBytes(Set<StreamUploadableAction> actions,
243             final byte[] responseBytes, final Consumer<Result<ConsistencyToken>> consumer) {
244         mTaskQueue.execute(Task.HANDLE_UPLOADABLE_ACTION_RESPONSE_BYTES, TaskType.IMMEDIATE, () -> {
245             Response response;
246             boolean isLengthPrefixed = mConfiguration.getValueOrDefault(
247                     ConfigKey.FEED_ACTION_SERVER_RESPONSE_LENGTH_PREFIXED, true);
248             try {
249                 response = Response.parseFrom(isLengthPrefixed
250                                 ? RequestHelper.getLengthPrefixedValue(responseBytes)
251                                 : responseBytes,
252                         mExtensionRegistry.getExtensionRegistry());
253             } catch (IOException e) {
254                 Logger.e(TAG, e, "Response parse failed");
255                 consumer.accept(Result.failure());
256                 return;
257             }
258             FeedActionResponse feedActionResponse =
259                     response.getExtension(FeedActionResponse.feedActionResponse);
260             final Result<ConsistencyToken> contextResult;
261             if (feedActionResponse.hasConsistencyToken()) {
262                 contextResult = Result.success(feedActionResponse.getConsistencyToken());
263                 UploadableActionMutation actionMutation = mStore.editUploadableActions();
264                 for (StreamUploadableAction action : actions) {
265                     actionMutation.remove(action, action.getFeatureContentId());
266                 }
267                 CommitResult commitResult = actionMutation.commit();
268                 if (commitResult != CommitResult.SUCCESS) {
269                     // TODO:log failure to the basicLoggingApi
270                     Logger.e(TAG, "Removing actions on success failed");
271                 }
272             } else {
273                 contextResult = Result.failure();
274             }
275             consumer.accept(contextResult);
276         });
277     }
278 
isStale(StreamUploadableAction action)279     private boolean isStale(StreamUploadableAction action) {
280         int uploadAttempts = action.getUploadAttempts();
281         long currentTime = TimeUnit.MILLISECONDS.toSeconds(mClock.currentTimeMillis());
282         long timeSinceUpload = currentTime - action.getTimestampSeconds();
283         return uploadAttempts >= mMaxActionUploadAttempts || timeSinceUpload > mMaxActionUploadTtl;
284     }
285 }
286