1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 package org.mozilla.gecko.sync.repositories.uploaders;
6 
7 import android.support.annotation.Nullable;
8 import android.support.annotation.VisibleForTesting;
9 
10 import org.mozilla.gecko.background.common.log.Logger;
11 import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
12 import org.mozilla.gecko.sync.Server15RecordPostFailedException;
13 import org.mozilla.gecko.sync.net.SyncResponse;
14 import org.mozilla.gecko.sync.net.SyncStorageResponse;
15 
16 import java.util.ArrayList;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.concurrent.atomic.AtomicLong;
20 
21 /**
22  * Dispatches payload runnables and handles their results.
23  *
24  * All of the methods, except for `queue` and `finalizeQueue`, will be called from the thread(s)
25  * running sequentially on the SingleThreadExecutor `executor`.
26  */
27 class PayloadDispatcher {
28     private static final String LOG_TAG = "PayloadDispatcher";
29 
30     // All payload runnables share the same whiteboard.
31     // It's accessed directly by the runnables; tests also make use of this direct access.
32     volatile BatchMeta batchWhiteboard;
33     private final AtomicLong uploadTimestamp = new AtomicLong(0);
34 
35     private final Executor executor;
36     private final BatchingUploader uploader;
37 
38     // Written from sequentially running thread(s) on the SingleThreadExecutor `executor`.
39     // Read by many threads running concurrently on the records consumer thread pool.
40     final AtomicBoolean storeFailed = new AtomicBoolean(false);
41 
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified)42     PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
43         // Initially we don't know if we're in a batching mode.
44         this.batchWhiteboard = new BatchMeta(initialLastModified, null);
45         this.uploader = uploader;
46         this.executor = executor;
47     }
48 
queue( final ArrayList<byte[]> outgoing, final ArrayList<String> outgoingGuids, final long byteCount, final boolean isCommit, final boolean isLastPayload)49     void queue(
50             final ArrayList<byte[]> outgoing,
51             final ArrayList<String> outgoingGuids,
52             final long byteCount,
53             final boolean isCommit, final boolean isLastPayload) {
54 
55         // Note that `executor` is expected to be a SingleThreadExecutor.
56         executor.execute(new BatchContextRunnable(isCommit) {
57             @Override
58             public void run() {
59                 createRecordUploadRunnable(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload).run();
60             }
61         });
62     }
63 
setInBatchingMode(boolean inBatchingMode)64     void setInBatchingMode(boolean inBatchingMode) {
65         batchWhiteboard.setInBatchingMode(inBatchingMode);
66         uploader.setUnlimitedMode(!inBatchingMode);
67     }
68 
69     /**
70      * We've been told by our upload delegate that a payload succeeded.
71      * Depending on the type of payload and batch mode status, inform our delegate of progress.
72      *
73      * @param response success response to our commit post
74      * @param isCommit was this a commit upload?
75      * @param isLastPayload was this a very last payload we'll upload?
76      */
payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload)77     void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
78         // Sanity check.
79         if (batchWhiteboard.getInBatchingMode() == null) {
80             throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
81         }
82 
83         final int recordsSucceeded = batchWhiteboard.getSuccessRecordCount();
84         // We consider records to have been committed if we're not in a batching mode or this was a commit.
85         // If records have been committed, notify our store delegate.
86         if (!batchWhiteboard.getInBatchingMode() || isCommit) {
87             uploader.sessionStoreDelegate.onRecordStoreSucceeded(recordsSucceeded);
88 
89             // If we're not in a batching mode, or just committed a batch, uploaded records have
90             // been applied to the server storage and are now visible to other clients.
91             // Therefore, we bump our local "last store" timestamp.
92             bumpTimestampTo(uploadTimestamp, response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
93             uploader.setLastStoreTimestamp(uploadTimestamp);
94             batchWhiteboard.clearSuccessRecordCounter();
95         }
96 
97         if (isCommit || !batchWhiteboard.getInBatchingMode()) {
98             uploader.sessionStoreDelegate.onBatchCommitted();
99         }
100 
101         // If this was our very last commit, we're done storing records.
102         // Get Last-Modified timestamp from the response, and pass it upstream.
103         if (isLastPayload) {
104             uploader.finished();
105         }
106     }
107 
payloadFailed(Exception e)108     void payloadFailed(Exception e) {
109         doStoreFailed(e);
110     }
111 
finalizeQueue(final boolean needToCommit, final Runnable finalRunnable)112     void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) {
113         executor.execute(new NonPayloadContextRunnable() {
114             @Override
115             public void run() {
116                 // Must be called after last payload upload finishes.
117                 if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) {
118                     finalRunnable.run();
119 
120                     // Otherwise, we're done.
121                 } else {
122                     uploader.finished();
123                 }
124             }
125         });
126     }
127 
recordFailed(final String recordGuid)128     void recordFailed(final String recordGuid) {
129         recordFailed(new Server15RecordPostFailedException(), recordGuid);
130     }
131 
recordFailed(final Exception e, final String recordGuid)132     void recordFailed(final Exception e, final String recordGuid) {
133         Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
134         uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
135     }
136 
concurrentModificationDetected()137     void concurrentModificationDetected() {
138         doStoreFailed(new CollectionConcurrentModificationException());
139     }
140 
prepareForNextBatch()141     void prepareForNextBatch() {
142         batchWhiteboard = batchWhiteboard.nextBatchMeta();
143     }
144 
doStoreFailed(Exception reason)145     /* package-local */ void doStoreFailed(Exception reason) {
146         if (storeFailed.getAndSet(true)) {
147             return;
148         }
149         uploader.abort();
150         uploader.sessionStoreDelegate.onStoreFailed(reason);
151     }
152 
bumpTimestampTo(final AtomicLong current, long newValue)153     private static void bumpTimestampTo(final AtomicLong current, long newValue) {
154         while (true) {
155             long existing = current.get();
156             if (existing > newValue) {
157                 return;
158             }
159             if (current.compareAndSet(existing, newValue)) {
160                 return;
161             }
162         }
163     }
164 
165     /**
166      * Allows tests to define their own RecordUploadRunnable.
167      */
168     @VisibleForTesting
createRecordUploadRunnable(final ArrayList<byte[]> outgoing, final ArrayList<String> outgoingGuids, final long byteCount, final boolean isCommit, final boolean isLastPayload)169     Runnable createRecordUploadRunnable(final ArrayList<byte[]> outgoing,
170                                                   final ArrayList<String> outgoingGuids,
171                                                   final long byteCount,
172                                                   final boolean isCommit, final boolean isLastPayload) {
173         return new RecordUploadRunnable(
174                 new BatchingAtomicUploaderMayUploadProvider(),
175                 uploader.collectionUri,
176                 batchWhiteboard.getToken(),
177                 new PayloadUploadDelegate(
178                         uploader.authHeaderProvider,
179                         PayloadDispatcher.this,
180                         outgoingGuids,
181                         isCommit,
182                         isLastPayload
183                 ),
184                 outgoing,
185                 byteCount,
186                 isCommit
187         );
188     }
189 
190     /**
191      * Allows tests to easily peek into the flow of upload tasks.
192      */
193     @VisibleForTesting
194     abstract static class BatchContextRunnable implements Runnable {
195         boolean isCommit;
196 
BatchContextRunnable(boolean isCommit)197         BatchContextRunnable(boolean isCommit) {
198             this.isCommit = isCommit;
199         }
200     }
201 
202     /**
203      * Allows tests to tell apart non-payload runnables going through the executor.
204      */
205     @VisibleForTesting
206     abstract static class NonPayloadContextRunnable implements Runnable {}
207 
208     // Instances of this class must be accessed from threads running on the `executor`.
209     private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
mayUpload()210         public boolean mayUpload() {
211             return !storeFailed.get();
212         }
213     }
214 }
215