1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 package org.mozilla.gecko.sync.repositories.uploaders; 6 7 import android.support.annotation.Nullable; 8 import android.support.annotation.VisibleForTesting; 9 10 import org.mozilla.gecko.background.common.log.Logger; 11 import org.mozilla.gecko.sync.CollectionConcurrentModificationException; 12 import org.mozilla.gecko.sync.Server15RecordPostFailedException; 13 import org.mozilla.gecko.sync.net.SyncResponse; 14 import org.mozilla.gecko.sync.net.SyncStorageResponse; 15 16 import java.util.ArrayList; 17 import java.util.concurrent.Executor; 18 import java.util.concurrent.atomic.AtomicBoolean; 19 import java.util.concurrent.atomic.AtomicLong; 20 21 /** 22 * Dispatches payload runnables and handles their results. 23 * 24 * All of the methods, except for `queue` and `finalizeQueue`, will be called from the thread(s) 25 * running sequentially on the SingleThreadExecutor `executor`. 26 */ 27 class PayloadDispatcher { 28 private static final String LOG_TAG = "PayloadDispatcher"; 29 30 // All payload runnables share the same whiteboard. 31 // It's accessed directly by the runnables; tests also make use of this direct access. 32 volatile BatchMeta batchWhiteboard; 33 private final AtomicLong uploadTimestamp = new AtomicLong(0); 34 35 private final Executor executor; 36 private final BatchingUploader uploader; 37 38 // Written from sequentially running thread(s) on the SingleThreadExecutor `executor`. 39 // Read by many threads running concurrently on the records consumer thread pool. 40 final AtomicBoolean storeFailed = new AtomicBoolean(false); 41 PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified)42 PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) { 43 // Initially we don't know if we're in a batching mode. 44 this.batchWhiteboard = new BatchMeta(initialLastModified, null); 45 this.uploader = uploader; 46 this.executor = executor; 47 } 48 queue( final ArrayList<byte[]> outgoing, final ArrayList<String> outgoingGuids, final long byteCount, final boolean isCommit, final boolean isLastPayload)49 void queue( 50 final ArrayList<byte[]> outgoing, 51 final ArrayList<String> outgoingGuids, 52 final long byteCount, 53 final boolean isCommit, final boolean isLastPayload) { 54 55 // Note that `executor` is expected to be a SingleThreadExecutor. 56 executor.execute(new BatchContextRunnable(isCommit) { 57 @Override 58 public void run() { 59 createRecordUploadRunnable(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload).run(); 60 } 61 }); 62 } 63 setInBatchingMode(boolean inBatchingMode)64 void setInBatchingMode(boolean inBatchingMode) { 65 batchWhiteboard.setInBatchingMode(inBatchingMode); 66 uploader.setUnlimitedMode(!inBatchingMode); 67 } 68 69 /** 70 * We've been told by our upload delegate that a payload succeeded. 71 * Depending on the type of payload and batch mode status, inform our delegate of progress. 72 * 73 * @param response success response to our commit post 74 * @param isCommit was this a commit upload? 75 * @param isLastPayload was this a very last payload we'll upload? 76 */ payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload)77 void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) { 78 // Sanity check. 79 if (batchWhiteboard.getInBatchingMode() == null) { 80 throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode"); 81 } 82 83 final int recordsSucceeded = batchWhiteboard.getSuccessRecordCount(); 84 // We consider records to have been committed if we're not in a batching mode or this was a commit. 85 // If records have been committed, notify our store delegate. 86 if (!batchWhiteboard.getInBatchingMode() || isCommit) { 87 uploader.sessionStoreDelegate.onRecordStoreSucceeded(recordsSucceeded); 88 89 // If we're not in a batching mode, or just committed a batch, uploaded records have 90 // been applied to the server storage and are now visible to other clients. 91 // Therefore, we bump our local "last store" timestamp. 92 bumpTimestampTo(uploadTimestamp, response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED)); 93 uploader.setLastStoreTimestamp(uploadTimestamp); 94 batchWhiteboard.clearSuccessRecordCounter(); 95 } 96 97 if (isCommit || !batchWhiteboard.getInBatchingMode()) { 98 uploader.sessionStoreDelegate.onBatchCommitted(); 99 } 100 101 // If this was our very last commit, we're done storing records. 102 // Get Last-Modified timestamp from the response, and pass it upstream. 103 if (isLastPayload) { 104 uploader.finished(); 105 } 106 } 107 payloadFailed(Exception e)108 void payloadFailed(Exception e) { 109 doStoreFailed(e); 110 } 111 finalizeQueue(final boolean needToCommit, final Runnable finalRunnable)112 void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) { 113 executor.execute(new NonPayloadContextRunnable() { 114 @Override 115 public void run() { 116 // Must be called after last payload upload finishes. 117 if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) { 118 finalRunnable.run(); 119 120 // Otherwise, we're done. 121 } else { 122 uploader.finished(); 123 } 124 } 125 }); 126 } 127 recordFailed(final String recordGuid)128 void recordFailed(final String recordGuid) { 129 recordFailed(new Server15RecordPostFailedException(), recordGuid); 130 } 131 recordFailed(final Exception e, final String recordGuid)132 void recordFailed(final Exception e, final String recordGuid) { 133 Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString()); 134 uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid); 135 } 136 concurrentModificationDetected()137 void concurrentModificationDetected() { 138 doStoreFailed(new CollectionConcurrentModificationException()); 139 } 140 prepareForNextBatch()141 void prepareForNextBatch() { 142 batchWhiteboard = batchWhiteboard.nextBatchMeta(); 143 } 144 doStoreFailed(Exception reason)145 /* package-local */ void doStoreFailed(Exception reason) { 146 if (storeFailed.getAndSet(true)) { 147 return; 148 } 149 uploader.abort(); 150 uploader.sessionStoreDelegate.onStoreFailed(reason); 151 } 152 bumpTimestampTo(final AtomicLong current, long newValue)153 private static void bumpTimestampTo(final AtomicLong current, long newValue) { 154 while (true) { 155 long existing = current.get(); 156 if (existing > newValue) { 157 return; 158 } 159 if (current.compareAndSet(existing, newValue)) { 160 return; 161 } 162 } 163 } 164 165 /** 166 * Allows tests to define their own RecordUploadRunnable. 167 */ 168 @VisibleForTesting createRecordUploadRunnable(final ArrayList<byte[]> outgoing, final ArrayList<String> outgoingGuids, final long byteCount, final boolean isCommit, final boolean isLastPayload)169 Runnable createRecordUploadRunnable(final ArrayList<byte[]> outgoing, 170 final ArrayList<String> outgoingGuids, 171 final long byteCount, 172 final boolean isCommit, final boolean isLastPayload) { 173 return new RecordUploadRunnable( 174 new BatchingAtomicUploaderMayUploadProvider(), 175 uploader.collectionUri, 176 batchWhiteboard.getToken(), 177 new PayloadUploadDelegate( 178 uploader.authHeaderProvider, 179 PayloadDispatcher.this, 180 outgoingGuids, 181 isCommit, 182 isLastPayload 183 ), 184 outgoing, 185 byteCount, 186 isCommit 187 ); 188 } 189 190 /** 191 * Allows tests to easily peek into the flow of upload tasks. 192 */ 193 @VisibleForTesting 194 abstract static class BatchContextRunnable implements Runnable { 195 boolean isCommit; 196 BatchContextRunnable(boolean isCommit)197 BatchContextRunnable(boolean isCommit) { 198 this.isCommit = isCommit; 199 } 200 } 201 202 /** 203 * Allows tests to tell apart non-payload runnables going through the executor. 204 */ 205 @VisibleForTesting 206 abstract static class NonPayloadContextRunnable implements Runnable {} 207 208 // Instances of this class must be accessed from threads running on the `executor`. 209 private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider { mayUpload()210 public boolean mayUpload() { 211 return !storeFailed.get(); 212 } 213 } 214 } 215