1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.chrome.browser.feed.library.feedrequestmanager; 6 7 import org.chromium.base.Consumer; 8 import org.chromium.chrome.browser.feed.library.api.host.config.Configuration; 9 import org.chromium.chrome.browser.feed.library.api.host.config.Configuration.ConfigKey; 10 import org.chromium.chrome.browser.feed.library.api.host.logging.Task; 11 import org.chromium.chrome.browser.feed.library.api.host.network.HttpRequest; 12 import org.chromium.chrome.browser.feed.library.api.host.network.HttpRequest.HttpMethod; 13 import org.chromium.chrome.browser.feed.library.api.host.network.NetworkClient; 14 import org.chromium.chrome.browser.feed.library.api.host.storage.CommitResult; 15 import org.chromium.chrome.browser.feed.library.api.internal.actionmanager.ViewActionManager; 16 import org.chromium.chrome.browser.feed.library.api.internal.common.SemanticPropertiesWithId; 17 import org.chromium.chrome.browser.feed.library.api.internal.common.ThreadUtils; 18 import org.chromium.chrome.browser.feed.library.api.internal.protocoladapter.ProtocolAdapter; 19 import org.chromium.chrome.browser.feed.library.api.internal.requestmanager.ActionUploadRequestManager; 20 import org.chromium.chrome.browser.feed.library.api.internal.store.Store; 21 import org.chromium.chrome.browser.feed.library.api.internal.store.UploadableActionMutation; 22 import org.chromium.chrome.browser.feed.library.common.Result; 23 import org.chromium.chrome.browser.feed.library.common.concurrent.MainThreadRunner; 24 import org.chromium.chrome.browser.feed.library.common.concurrent.TaskQueue; 25 import org.chromium.chrome.browser.feed.library.common.concurrent.TaskQueue.TaskType; 26 import org.chromium.chrome.browser.feed.library.common.logging.Logger; 27 import org.chromium.chrome.browser.feed.library.common.protoextensions.FeedExtensionRegistry; 28 import org.chromium.chrome.browser.feed.library.common.time.Clock; 29 import org.chromium.components.feed.core.proto.libraries.api.internal.StreamDataProto.StreamUploadableAction; 30 import org.chromium.components.feed.core.proto.wire.ConsistencyTokenProto.ConsistencyToken; 31 import org.chromium.components.feed.core.proto.wire.FeedActionResponseProto.FeedActionResponse; 32 import org.chromium.components.feed.core.proto.wire.ResponseProto.Response; 33 34 import java.io.IOException; 35 import java.io.UnsupportedEncodingException; 36 import java.util.ArrayList; 37 import java.util.HashSet; 38 import java.util.List; 39 import java.util.Set; 40 import java.util.concurrent.TimeUnit; 41 42 /** Default implementation of ActionUploadRequestManager. */ 43 public final class FeedActionUploadRequestManager implements ActionUploadRequestManager { 44 private static final String TAG = "ActionUploadRequest"; 45 46 private final ViewActionManager mViewActionManager; 47 private final Configuration mConfiguration; 48 private final NetworkClient mNetworkClient; 49 private final ProtocolAdapter mProtocolAdapter; 50 private final FeedExtensionRegistry mExtensionRegistry; 51 private final MainThreadRunner mMainThreadRunner; 52 private final TaskQueue mTaskQueue; 53 private final ThreadUtils mThreadUtils; 54 private final Store mStore; 55 private final Clock mClock; 56 private final long mMaxActionUploadAttempts; 57 // Total number of actions that can be uploaded in a chained batch request. 58 private final long mMaxActionsUploadsPerBatchedRequest; 59 // Maximum bytes of StreamUploadableAction that can be uploaded in a single request. The actual 60 // request is made of ActionPayloads and SemanticProperties, not StreamUploadableActions, so 61 // this serves as a proxy. 62 private final long mMaxBytesPerRequest; 63 private final long mMaxActionUploadTtl; 64 FeedActionUploadRequestManager(ViewActionManager viewActionManager, Configuration configuration, NetworkClient networkClient, ProtocolAdapter protocolAdapter, FeedExtensionRegistry extensionRegistry, MainThreadRunner mainThreadRunner, TaskQueue taskQueue, ThreadUtils threadUtils, Store store, Clock clock)65 public FeedActionUploadRequestManager(ViewActionManager viewActionManager, 66 Configuration configuration, NetworkClient networkClient, 67 ProtocolAdapter protocolAdapter, FeedExtensionRegistry extensionRegistry, 68 MainThreadRunner mainThreadRunner, TaskQueue taskQueue, ThreadUtils threadUtils, 69 Store store, Clock clock) { 70 this.mViewActionManager = viewActionManager; 71 this.mConfiguration = configuration; 72 this.mNetworkClient = networkClient; 73 this.mProtocolAdapter = protocolAdapter; 74 this.mExtensionRegistry = extensionRegistry; 75 this.mMainThreadRunner = mainThreadRunner; 76 this.mTaskQueue = taskQueue; 77 this.mThreadUtils = threadUtils; 78 this.mStore = store; 79 this.mClock = clock; 80 mMaxBytesPerRequest = configuration.getValueOrDefault( 81 ConfigKey.FEED_ACTION_SERVER_MAX_SIZE_PER_REQUEST, 4000L); 82 mMaxActionsUploadsPerBatchedRequest = configuration.getValueOrDefault( 83 ConfigKey.FEED_ACTION_SERVER_MAX_ACTIONS_PER_REQUEST, 10L); 84 mMaxActionUploadAttempts = 85 configuration.getValueOrDefault(ConfigKey.FEED_ACTION_MAX_UPLOAD_ATTEMPTS, 1L); 86 mMaxActionUploadTtl = configuration.getValueOrDefault( 87 ConfigKey.FEED_ACTION_TTL_SECONDS, TimeUnit.DAYS.toSeconds(2)); 88 } 89 90 @Override triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer)91 public void triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, 92 Consumer<Result<ConsistencyToken>> consumer) { 93 mThreadUtils.checkNotMainThread(); 94 if (mMaxActionUploadAttempts == 0 || mMaxBytesPerRequest == 0 95 || mMaxActionsUploadsPerBatchedRequest == 0) { 96 consumer.accept(Result.success(token)); 97 return; 98 } 99 triggerUploadActions(actions, token, consumer, /* uploadCount= */ 0); 100 } 101 102 @Override triggerUploadAllActions( ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer)103 public void triggerUploadAllActions( 104 ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer) { 105 mThreadUtils.checkNotMainThread(); 106 mMainThreadRunner.execute("Store view actions and", () -> { 107 mViewActionManager.storeViewActions(() -> { 108 mTaskQueue.execute(Task.UPLOAD_ALL_ACTIONS, TaskType.IMMEDIATE, () -> { 109 Result<Set<StreamUploadableAction>> actionsResult = 110 mStore.getAllUploadableActions(); 111 if (actionsResult.isSuccessful()) { 112 triggerUploadActions(actionsResult.getValue(), token, consumer); 113 } 114 }); 115 }); 116 }); 117 } 118 triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, Consumer<Result<ConsistencyToken>> consumer, int uploadCount)119 private void triggerUploadActions(Set<StreamUploadableAction> actions, ConsistencyToken token, 120 Consumer<Result<ConsistencyToken>> consumer, int uploadCount) { 121 mThreadUtils.checkNotMainThread(); 122 // Return the token if there are no actions to upload. 123 if (actions.isEmpty() || uploadCount >= mMaxActionsUploadsPerBatchedRequest) { 124 consumer.accept(Result.success(token)); 125 return; 126 } 127 UploadableActionsRequestBuilder requestBuilder = 128 new UploadableActionsRequestBuilder(mProtocolAdapter); 129 int actionPayloadBytes = 0; 130 131 Set<StreamUploadableAction> actionsToUpload = new HashSet<>(); 132 Set<StreamUploadableAction> actionsRemaining = new HashSet<>(); 133 ArrayList<String> contentIds = new ArrayList<>(); 134 135 UploadableActionMutation actionMutation = mStore.editUploadableActions(); 136 137 // Select which actions to send in this request, and update their attempt count. Remaining 138 // actions are collected and sent next. Stale actions are removed from the store. 139 for (StreamUploadableAction action : actions) { 140 if (isStale(action)) { 141 actionMutation.remove(action, action.getFeatureContentId()); 142 continue; 143 } 144 145 int actionBytes = action.toByteArray().length; 146 if (actionPayloadBytes + actionBytes < mMaxBytesPerRequest) { 147 actionPayloadBytes += actionBytes; 148 actionMutation.remove(action, action.getFeatureContentId()); 149 StreamUploadableAction actionToUpload = 150 action.toBuilder() 151 .setUploadAttempts(action.getUploadAttempts() + 1) 152 .build(); 153 actionMutation.upsert(actionToUpload, actionToUpload.getFeatureContentId()); 154 actionsToUpload.add(actionToUpload); 155 156 contentIds.add(actionToUpload.getFeatureContentId()); 157 } else { 158 actionsRemaining.add(action); 159 } 160 } 161 162 CommitResult commitResult = actionMutation.commit(); 163 if (commitResult != CommitResult.SUCCESS) { 164 Logger.e(TAG, "Upserting uploaded actions failed"); 165 consumer.accept(Result.failure()); 166 return; 167 } 168 169 if (actionsToUpload.isEmpty()) { 170 if (actionsRemaining.isEmpty()) { 171 // All actions were too stale to be uploaded. 172 consumer.accept(Result.success(token)); 173 } else { 174 Logger.e(TAG, "No action fitted in the request."); 175 consumer.accept(Result.failure()); 176 } 177 return; 178 } 179 180 Result<List<SemanticPropertiesWithId>> semanticPropertiesResult = 181 mStore.getSemanticProperties(contentIds); 182 List<SemanticPropertiesWithId> semanticPropertiesList = new ArrayList<>(); 183 if (semanticPropertiesResult.isSuccessful() 184 && !semanticPropertiesResult.getValue().isEmpty()) { 185 semanticPropertiesList = semanticPropertiesResult.getValue(); 186 } 187 188 Consumer<Result<ConsistencyToken>> tokenConsumer = result -> { 189 mThreadUtils.checkNotMainThread(); 190 if (result.isSuccessful()) { 191 if (!actionsRemaining.isEmpty()) { 192 triggerUploadActions(actionsRemaining, result.getValue(), consumer, 193 uploadCount + actionsToUpload.size()); 194 } else { 195 consumer.accept(Result.success(result.getValue())); 196 } 197 } else { 198 consumer.accept(uploadCount == 0 ? Result.failure() : Result.success(token)); 199 } 200 }; 201 requestBuilder.setConsistencyToken(token) 202 .setActions(actionsToUpload) 203 .setSemanticProperties(semanticPropertiesList); 204 executeUploadActionRequest(actionsToUpload, requestBuilder, tokenConsumer); 205 } 206 executeUploadActionRequest(Set<StreamUploadableAction> actions, UploadableActionsRequestBuilder requestBuilder, Consumer<Result<ConsistencyToken>> consumer)207 private void executeUploadActionRequest(Set<StreamUploadableAction> actions, 208 UploadableActionsRequestBuilder requestBuilder, 209 Consumer<Result<ConsistencyToken>> consumer) { 210 mThreadUtils.checkNotMainThread(); 211 212 String endpoint = 213 mConfiguration.getValueOrDefault(ConfigKey.FEED_ACTION_SERVER_ENDPOINT, ""); 214 @HttpMethod 215 String httpMethod = mConfiguration.getValueOrDefault( 216 ConfigKey.FEED_ACTION_SERVER_METHOD, HttpMethod.POST); 217 HttpRequest httpRequest = 218 RequestHelper.buildHttpRequest(httpMethod, requestBuilder.build().toByteArray(), 219 endpoint, /* locale= */ "", /* priorityParamValue= */ ""); 220 221 Logger.i(TAG, "Making Request: %s", httpRequest.getUri().getPath()); 222 mNetworkClient.send(httpRequest, input -> { 223 Logger.i(TAG, "Request: %s completed with response code: %s", 224 httpRequest.getUri().getPath(), input.getResponseCode()); 225 if (input.getResponseCode() != 200) { 226 String errorBody = null; 227 try { 228 errorBody = new String(input.getResponseBody(), "UTF-8"); 229 } catch (UnsupportedEncodingException e) { 230 Logger.e(TAG, "Error handling http error logging", e); 231 } 232 Logger.e(TAG, "errorCode: %d", input.getResponseCode()); 233 Logger.e(TAG, "errorResponse: %s", errorBody); 234 mTaskQueue.execute(Task.EXECUTE_UPLOAD_ACTION_REQUEST, TaskType.IMMEDIATE, 235 () -> { consumer.accept(Result.failure()); }); 236 return; 237 } 238 handleUploadableActionResponseBytes(actions, input.getResponseBody(), consumer); 239 }); 240 } 241 handleUploadableActionResponseBytes(Set<StreamUploadableAction> actions, final byte[] responseBytes, final Consumer<Result<ConsistencyToken>> consumer)242 private void handleUploadableActionResponseBytes(Set<StreamUploadableAction> actions, 243 final byte[] responseBytes, final Consumer<Result<ConsistencyToken>> consumer) { 244 mTaskQueue.execute(Task.HANDLE_UPLOADABLE_ACTION_RESPONSE_BYTES, TaskType.IMMEDIATE, () -> { 245 Response response; 246 boolean isLengthPrefixed = mConfiguration.getValueOrDefault( 247 ConfigKey.FEED_ACTION_SERVER_RESPONSE_LENGTH_PREFIXED, true); 248 try { 249 response = Response.parseFrom(isLengthPrefixed 250 ? RequestHelper.getLengthPrefixedValue(responseBytes) 251 : responseBytes, 252 mExtensionRegistry.getExtensionRegistry()); 253 } catch (IOException e) { 254 Logger.e(TAG, e, "Response parse failed"); 255 consumer.accept(Result.failure()); 256 return; 257 } 258 FeedActionResponse feedActionResponse = 259 response.getExtension(FeedActionResponse.feedActionResponse); 260 final Result<ConsistencyToken> contextResult; 261 if (feedActionResponse.hasConsistencyToken()) { 262 contextResult = Result.success(feedActionResponse.getConsistencyToken()); 263 UploadableActionMutation actionMutation = mStore.editUploadableActions(); 264 for (StreamUploadableAction action : actions) { 265 actionMutation.remove(action, action.getFeatureContentId()); 266 } 267 CommitResult commitResult = actionMutation.commit(); 268 if (commitResult != CommitResult.SUCCESS) { 269 // TODO:log failure to the basicLoggingApi 270 Logger.e(TAG, "Removing actions on success failed"); 271 } 272 } else { 273 contextResult = Result.failure(); 274 } 275 consumer.accept(contextResult); 276 }); 277 } 278 isStale(StreamUploadableAction action)279 private boolean isStale(StreamUploadableAction action) { 280 int uploadAttempts = action.getUploadAttempts(); 281 long currentTime = TimeUnit.MILLISECONDS.toSeconds(mClock.currentTimeMillis()); 282 long timeSinceUpload = currentTime - action.getTimestampSeconds(); 283 return uploadAttempts >= mMaxActionUploadAttempts || timeSinceUpload > mMaxActionUploadTtl; 284 } 285 } 286