1 /* 2 * Copyright (C) by Olivier Goffart <ogoffart@owncloud.com> 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2 of the License, or 7 * (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, but 10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * for more details. 13 */ 14 #pragma once 15 16 #include "owncloudpropagator.h" 17 #include "networkjobs.h" 18 19 #include <QBuffer> 20 #include <QFile> 21 #include <QElapsedTimer> 22 23 24 namespace OCC { 25 26 Q_DECLARE_LOGGING_CATEGORY(lcPutJob) 27 Q_DECLARE_LOGGING_CATEGORY(lcPropagateUpload) 28 Q_DECLARE_LOGGING_CATEGORY(lcPropagateUploadV1) 29 Q_DECLARE_LOGGING_CATEGORY(lcPropagateUploadNG) 30 31 class BandwidthManager; 32 33 /** 34 * @brief The UploadDevice class 35 * @ingroup libsync 36 */ 37 class UploadDevice : public QIODevice 38 { 39 Q_OBJECT 40 public: 41 UploadDevice(const QString &fileName, qint64 start, qint64 size, BandwidthManager *bwm); 42 ~UploadDevice() override; 43 44 bool open(QIODevice::OpenMode mode) override; 45 void close() override; 46 47 qint64 writeData(const char *, qint64) override; 48 qint64 readData(char *data, qint64 maxlen) override; 49 bool atEnd() const override; 50 qint64 size() const override; 51 qint64 bytesAvailable() const override; 52 bool isSequential() const override; 53 bool seek(qint64 pos) override; 54 55 void setBandwidthLimited(bool); isBandwidthLimited()56 bool isBandwidthLimited() { return _bandwidthLimited; } 57 void setChoked(bool); isChoked()58 bool isChoked() { return _choked; } 59 void giveBandwidthQuota(qint64 bwq); 60 61 signals: 62 63 private: 64 /// The local file to read data from 65 QFile _file; 66 67 /// Start of the file data to use 68 qint64 _start = 0; 69 /// Amount of file data after _start to use 70 qint64 _size = 0; 71 /// Position between _start and _start+_size 72 qint64 _read = 0; 73 74 // Bandwidth manager related 75 QPointer<BandwidthManager> _bandwidthManager; 76 qint64 _bandwidthQuota = 0; 77 qint64 _readWithProgress = 0; 78 bool _bandwidthLimited = false; // if _bandwidthQuota will be used 79 bool _choked = false; // if upload is paused (readData() will return 0) 80 friend class BandwidthManager; 81 public slots: 82 void slotJobUploadProgress(qint64 sent, qint64 t); 83 }; 84 85 /** 86 * @brief The PUTFileJob class 87 * @ingroup libsync 88 */ 89 class PUTFileJob : public AbstractNetworkJob 90 { 91 Q_OBJECT 92 93 private: 94 QIODevice *_device; 95 QMap<QByteArray, QByteArray> _headers; 96 QString _errorString; 97 QUrl _url; 98 QElapsedTimer _requestTimer; 99 100 public: 101 // Takes ownership of the device 102 explicit PUTFileJob(AccountPtr account, const QString &path, std::unique_ptr<QIODevice> device, 103 const QMap<QByteArray, QByteArray> &headers, int chunk, QObject *parent = nullptr) AbstractNetworkJob(account,path,parent)104 : AbstractNetworkJob(account, path, parent) 105 , _device(device.release()) 106 , _headers(headers) 107 , _chunk(chunk) 108 { 109 _device->setParent(this); 110 } 111 explicit PUTFileJob(AccountPtr account, const QUrl &url, std::unique_ptr<QIODevice> device, 112 const QMap<QByteArray, QByteArray> &headers, int chunk, QObject *parent = nullptr) AbstractNetworkJob(account,QString (),parent)113 : AbstractNetworkJob(account, QString(), parent) 114 , _device(device.release()) 115 , _headers(headers) 116 , _url(url) 117 , _chunk(chunk) 118 { 119 _device->setParent(this); 120 } 121 ~PUTFileJob() override; 122 123 int _chunk; 124 125 void start() override; 126 127 bool finished() override; 128 device()129 QIODevice *device() 130 { 131 return _device; 132 } 133 errorString()134 QString errorString() const override 135 { 136 return _errorString.isEmpty() ? AbstractNetworkJob::errorString() : _errorString; 137 } 138 msSinceStart()139 std::chrono::milliseconds msSinceStart() const 140 { 141 return std::chrono::milliseconds(_requestTimer.elapsed()); 142 } 143 144 signals: 145 void finishedSignal(); 146 void uploadProgress(qint64, qint64); 147 148 }; 149 150 /** 151 * @brief This job implements the asynchronous PUT 152 * 153 * If the server replies to a PUT with a OC-JobStatus-Location path, we will query this url until the server 154 * replies with an etag. 155 * @ingroup libsync 156 */ 157 class PollJob : public AbstractNetworkJob 158 { 159 Q_OBJECT 160 SyncJournalDb *_journal; 161 QString _localPath; 162 163 public: 164 SyncFileItemPtr _item; 165 // Takes ownership of the device PollJob(AccountPtr account,const QString & path,const SyncFileItemPtr & item,SyncJournalDb * journal,const QString & localPath,QObject * parent)166 explicit PollJob(AccountPtr account, const QString &path, const SyncFileItemPtr &item, 167 SyncJournalDb *journal, const QString &localPath, QObject *parent) 168 : AbstractNetworkJob(account, path, parent) 169 , _journal(journal) 170 , _localPath(localPath) 171 , _item(item) 172 { 173 } 174 175 void start() override; 176 bool finished() override; 177 178 signals: 179 void finishedSignal(); 180 }; 181 182 class PropagateUploadEncrypted; 183 184 /** 185 * @brief The PropagateUploadFileCommon class is the code common between all chunking algorithms 186 * @ingroup libsync 187 * 188 * State Machine: 189 * 190 * +---> start() --> (delete job) -------+ 191 * | | 192 * +--> slotComputeContentChecksum() <---+ 193 * | 194 * v 195 * slotComputeTransmissionChecksum() 196 * | 197 * v 198 * slotStartUpload() -> doStartUpload() 199 * . 200 * . 201 * v 202 * finalize() or abortWithError() or startPollJob() 203 */ 204 class PropagateUploadFileCommon : public PropagateItemJob 205 { 206 Q_OBJECT 207 208 struct UploadStatus { 209 SyncFileItem::Status status = SyncFileItem::NoStatus; 210 QString message; 211 }; 212 213 protected: 214 QVector<AbstractNetworkJob *> _jobs; /// network jobs that are currently in transit 215 bool _finished BITFIELD(1); /// Tells that all the jobs have been finished 216 bool _deleteExisting BITFIELD(1); 217 218 /** Whether an abort is currently ongoing. 219 * 220 * Important to avoid duplicate aborts since each finishing PUTFileJob might 221 * trigger an abort on error. 222 */ 223 bool _aborting BITFIELD(1); 224 225 /* This is a minified version of the SyncFileItem, 226 * that holds only the specifics about the file that's 227 * being uploaded. 228 * 229 * This is needed if we wanna apply changes on the file 230 * that's being uploaded while keeping the original on disk. 231 */ 232 struct UploadFileInfo { 233 QString _file; /// I'm still unsure if I should use a SyncFilePtr here. 234 QString _path; /// the full path on disk. 235 qint64 _size; 236 }; 237 UploadFileInfo _fileToUpload; 238 QByteArray _transmissionChecksumHeader; 239 240 public: 241 PropagateUploadFileCommon(OwncloudPropagator *propagator, const SyncFileItemPtr &item); 242 243 /** 244 * Whether an existing entity with the same name may be deleted before 245 * the upload. 246 * 247 * Default: false. 248 */ 249 void setDeleteExisting(bool enabled); 250 251 /* start should setup the file, path and size that will be send to the server */ 252 void start() override; 253 void setupEncryptedFile(const QString& path, const QString& filename, quint64 size); 254 void setupUnencryptedFile(); 255 void startUploadFile(); 256 void callUnlockFolder(); isLikelyFinishedQuickly()257 bool isLikelyFinishedQuickly() override { return _item->_size < propagator()->smallFileSize(); } 258 259 private slots: 260 void slotComputeContentChecksum(); 261 // Content checksum computed, compute the transmission checksum 262 void slotComputeTransmissionChecksum(const QByteArray &contentChecksumType, const QByteArray &contentChecksum); 263 // transmission checksum computed, prepare the upload 264 void slotStartUpload(const QByteArray &transmissionChecksumType, const QByteArray &transmissionChecksum); 265 // invoked when encrypted folder lock has been released 266 void slotFolderUnlocked(const QByteArray &folderId, int httpReturnCode); 267 // invoked on internal error to unlock a folder and faile 268 void slotOnErrorStartFolderUnlock(SyncFileItem::Status status, const QString &errorString); 269 270 public: 271 virtual void doStartUpload() = 0; 272 273 void startPollJob(const QString &path); 274 void finalize(); 275 void abortWithError(SyncFileItem::Status status, const QString &error); 276 277 public slots: 278 void slotJobDestroyed(QObject *job); 279 280 private slots: 281 void slotPollFinished(); 282 283 protected: 284 void done(SyncFileItem::Status status, const QString &errorString = QString()) override; 285 286 /** 287 * Aborts all running network jobs, except for the ones that mayAbortJob 288 * returns false on and, for async aborts, emits abortFinished when done. 289 */ 290 void abortNetworkJobs( 291 AbortType abortType, 292 const std::function<bool(AbstractNetworkJob *job)> &mayAbortJob); 293 294 /** 295 * Checks whether the current error is one that should reset the whole 296 * transfer if it happens too often. If so: Bump UploadInfo::errorCount 297 * and maybe perform the reset. 298 */ 299 void checkResettingErrors(); 300 301 /** 302 * Error handling functionality that is shared between jobs. 303 */ 304 void commonErrorHandling(AbstractNetworkJob *job); 305 306 /** 307 * Increases the timeout for the final MOVE/PUT for large files. 308 * 309 * This is an unfortunate workaround since the drawback is not being able to 310 * detect real disconnects in a timely manner. Shall go away when the server 311 * response starts coming quicker, or there is some sort of async api. 312 * 313 * See #6527, enterprise#2480 314 */ 315 static void adjustLastJobTimeout(AbstractNetworkJob *job, qint64 fileSize); 316 317 /** Bases headers that need to be sent on the PUT, or in the MOVE for chunking-ng */ 318 QMap<QByteArray, QByteArray> headers(); 319 private: 320 PropagateUploadEncrypted *_uploadEncryptedHelper; 321 bool _uploadingEncrypted; 322 UploadStatus _uploadStatus; 323 }; 324 325 /** 326 * @ingroup libsync 327 * 328 * Propagation job, impementing the old chunking agorithm 329 * 330 */ 331 class PropagateUploadFileV1 : public PropagateUploadFileCommon 332 { 333 Q_OBJECT 334 335 private: 336 /** 337 * That's the start chunk that was stored in the database for resuming. 338 * In the non-resuming case it is 0. 339 * If we are resuming, this is the first chunk we need to send 340 */ 341 int _startChunk = 0; 342 /** 343 * This is the next chunk that we need to send. Starting from 0 even if _startChunk != 0 344 * (In other words, _startChunk + _currentChunk is really the number of the chunk we need to send next) 345 * (In other words, _currentChunk is the number of the chunk that we already sent or started sending) 346 */ 347 int _currentChunk = 0; 348 int _chunkCount = 0; /// Total number of chunks for this file 349 uint _transferId = 0; /// transfer id (part of the url) 350 chunkSize()351 qint64 chunkSize() const { 352 // Old chunking does not use dynamic chunking algorithm, and does not adjusts the chunk size respectively, 353 // thus this value should be used as the one classifing item to be chunked 354 return propagator()->syncOptions()._initialChunkSize; 355 } 356 357 public: PropagateUploadFileV1(OwncloudPropagator * propagator,const SyncFileItemPtr & item)358 PropagateUploadFileV1(OwncloudPropagator *propagator, const SyncFileItemPtr &item) 359 : PropagateUploadFileCommon(propagator, item) 360 { 361 } 362 363 void doStartUpload() override; 364 public slots: 365 void abort(PropagatorJob::AbortType abortType) override; 366 private slots: 367 void startNextChunk(); 368 void slotPutFinished(); 369 void slotUploadProgress(qint64, qint64); 370 }; 371 372 /** 373 * @ingroup libsync 374 * 375 * Propagation job, impementing the new chunking agorithm 376 * 377 */ 378 class PropagateUploadFileNG : public PropagateUploadFileCommon 379 { 380 Q_OBJECT 381 private: 382 qint64 _sent = 0; /// amount of data (bytes) that was already sent 383 uint _transferId = 0; /// transfer id (part of the url) 384 int _currentChunk = 0; /// Id of the next chunk that will be sent 385 qint64 _currentChunkSize = 0; /// current chunk size 386 bool _removeJobError = false; /// If not null, there was an error removing the job 387 388 // Map chunk number with its size from the PROPFIND on resume. 389 // (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.) 390 struct ServerChunkInfo 391 { 392 qint64 size; 393 QString originalName; 394 }; 395 QMap<qint64, ServerChunkInfo> _serverChunks; 396 397 /** 398 * Return the URL of a chunk. 399 * If chunk == -1, returns the URL of the parent folder containing the chunks 400 */ 401 QUrl chunkUrl(int chunk = -1); 402 403 public: PropagateUploadFileNG(OwncloudPropagator * propagator,const SyncFileItemPtr & item)404 PropagateUploadFileNG(OwncloudPropagator *propagator, const SyncFileItemPtr &item) 405 : PropagateUploadFileCommon(propagator, item) 406 { 407 } 408 409 void doStartUpload() override; 410 411 private: 412 void startNewUpload(); 413 void startNextChunk(); 414 public slots: 415 void abort(AbortType abortType) override; 416 private slots: 417 void slotPropfindFinished(); 418 void slotPropfindFinishedWithError(); 419 void slotPropfindIterate(const QString &name, const QMap<QString, QString> &properties); 420 void slotDeleteJobFinished(); 421 void slotMkColFinished(); 422 void slotPutFinished(); 423 void slotMoveJobFinished(); 424 void slotUploadProgress(qint64, qint64); 425 }; 426 } 427