1 /* 2 * Copyright (C) by Olivier Goffart <ogoffart@owncloud.com> 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2 of the License, or 7 * (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, but 10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * for more details. 13 */ 14 15 #ifndef OWNCLOUDPROPAGATOR_H 16 #define OWNCLOUDPROPAGATOR_H 17 18 #include <QHash> 19 #include <QObject> 20 #include <QMap> 21 #include <QLinkedList> 22 #include <QElapsedTimer> 23 #include <QTimer> 24 #include <QPointer> 25 #include <QIODevice> 26 #include <QMutex> 27 28 #include "csync_util.h" 29 #include "syncfileitem.h" 30 #include "common/syncjournaldb.h" 31 #include "bandwidthmanager.h" 32 #include "accountfwd.h" 33 #include "syncoptions.h" 34 35 namespace OCC { 36 37 Q_DECLARE_LOGGING_CATEGORY(lcPropagator) 38 39 /** Free disk space threshold below which syncs will abort and not even start. 40 */ 41 qint64 criticalFreeSpaceLimit(); 42 43 /** The client will not intentionally reduce the available free disk space below 44 * this limit. 45 * 46 * Uploads will still run and downloads that are small enough will continue too. 47 */ 48 qint64 freeSpaceLimit(); 49 50 class SyncJournalDb; 51 class OwncloudPropagator; 52 class PropagatorCompositeJob; 53 54 /** 55 * @brief the base class of propagator jobs 56 * 57 * This can either be a job, or a container for jobs. 58 * If it is a composite job, it then inherits from PropagateDirectory 59 * 60 * @ingroup libsync 61 */ 62 class PropagatorJob : public QObject 63 { 64 Q_OBJECT 65 66 public: 67 explicit PropagatorJob(OwncloudPropagator *propagator); 68 69 enum AbortType { 70 Synchronous, 71 Asynchronous 72 }; 73 74 enum JobState { 75 NotYetStarted, 76 Running, 77 Finished 78 }; 79 JobState _state; 80 81 enum JobParallelism { 82 83 /** Jobs can be run in parallel to this job */ 84 FullParallelism, 85 86 /** No other job shall be started until this one has finished. 87 So this job is guaranteed to finish before any jobs below it 88 are executed. */ 89 WaitForFinished, 90 }; 91 parallelism()92 virtual JobParallelism parallelism() { return FullParallelism; } 93 94 /** 95 * For "small" jobs 96 */ isLikelyFinishedQuickly()97 virtual bool isLikelyFinishedQuickly() { return false; } 98 99 /** The space that the running jobs need to complete but don't actually use yet. 100 * 101 * Note that this does *not* include the disk space that's already 102 * in use by running jobs for things like a download-in-progress. 103 */ committedDiskSpace()104 virtual qint64 committedDiskSpace() const { return 0; } 105 106 /** Set the associated composite job 107 * 108 * Used only from PropagatorCompositeJob itself, when a job is added 109 * and from PropagateDirectory to associate the subJobs with the first 110 * job. 111 */ setAssociatedComposite(PropagatorCompositeJob * job)112 void setAssociatedComposite(PropagatorCompositeJob *job) { _associatedComposite = job; } 113 114 public slots: 115 /* 116 * Asynchronous abort requires emit of abortFinished() signal, 117 * while synchronous is expected to abort immedietaly. 118 */ abort(PropagatorJob::AbortType abortType)119 virtual void abort(PropagatorJob::AbortType abortType) { 120 if (abortType == AbortType::Asynchronous) 121 emit abortFinished(); 122 } 123 124 /** Starts this job, or a new subjob 125 * returns true if a job was started. 126 */ 127 virtual bool scheduleSelfOrChild() = 0; 128 signals: 129 /** 130 * Emitted when the job is fully finished 131 */ 132 void finished(SyncFileItem::Status); 133 134 /** 135 * Emitted when the abort is fully finished 136 */ 137 void abortFinished(SyncFileItem::Status status = SyncFileItem::NormalError); 138 protected: 139 OwncloudPropagator *propagator() const; 140 141 /** If this job gets added to a composite job, this will point to the parent. 142 * 143 * For the PropagateDirectory::_firstJob it will point to 144 * PropagateDirectory::_subJobs. 145 * 146 * That can be useful for jobs that want to spawn follow-up jobs without 147 * becoming composite jobs themselves. 148 */ 149 PropagatorCompositeJob *_associatedComposite = nullptr; 150 }; 151 152 /* 153 * Abstract class to propagate a single item 154 */ 155 class PropagateItemJob : public PropagatorJob 156 { 157 Q_OBJECT 158 protected: 159 virtual void done(SyncFileItem::Status status, const QString &errorString = QString()); 160 161 /* 162 * set a custom restore job message that is used if the restore job succeeded. 163 * It is displayed in the activity view. 164 */ restoreJobMsg()165 QString restoreJobMsg() const 166 { 167 return _item->_isRestoration ? _item->_errorString : QString(); 168 } 169 void setRestoreJobMsg(const QString &msg = QString()) 170 { 171 _item->_isRestoration = true; 172 _item->_errorString = msg; 173 } 174 175 protected slots: 176 void slotRestoreJobFinished(SyncFileItem::Status status); 177 178 private: 179 QScopedPointer<PropagateItemJob> _restoreJob; 180 181 public: PropagateItemJob(OwncloudPropagator * propagator,const SyncFileItemPtr & item)182 PropagateItemJob(OwncloudPropagator *propagator, const SyncFileItemPtr &item) 183 : PropagatorJob(propagator) 184 , _item(item) 185 { 186 } 187 ~PropagateItemJob() override; 188 scheduleSelfOrChild()189 bool scheduleSelfOrChild() override 190 { 191 if (_state != NotYetStarted) { 192 return false; 193 } 194 qCInfo(lcPropagator) << "Starting" << _item->_instruction << "propagation of" << _item->destination() << "by" << this; 195 196 _state = Running; 197 QMetaObject::invokeMethod(this, "start"); // We could be in a different thread (neon jobs) 198 return true; 199 } 200 201 SyncFileItemPtr _item; 202 203 public slots: 204 virtual void start() = 0; 205 }; 206 207 /** 208 * @brief Job that runs subjobs. It becomes finished only when all subjobs are finished. 209 * @ingroup libsync 210 */ 211 class PropagatorCompositeJob : public PropagatorJob 212 { 213 Q_OBJECT 214 public: 215 QVector<PropagatorJob *> _jobsToDo; 216 SyncFileItemVector _tasksToDo; 217 QVector<PropagatorJob *> _runningJobs; 218 SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error 219 quint64 _abortsCount; 220 PropagatorCompositeJob(OwncloudPropagator * propagator)221 explicit PropagatorCompositeJob(OwncloudPropagator *propagator) 222 : PropagatorJob(propagator) 223 , _hasError(SyncFileItem::NoStatus), _abortsCount(0) 224 { 225 } 226 ~PropagatorCompositeJob()227 ~PropagatorCompositeJob() override 228 { 229 // Don't delete jobs in _jobsToDo and _runningJobs: they have parents 230 // that will be responsible for cleanup. Deleting them here would risk 231 // deleting something that has already been deleted by a shared parent. 232 } 233 234 void appendJob(PropagatorJob *job); appendTask(const SyncFileItemPtr & item)235 void appendTask(const SyncFileItemPtr &item) 236 { 237 _tasksToDo.append(item); 238 } 239 240 bool scheduleSelfOrChild() override; 241 JobParallelism parallelism() override; 242 243 /* 244 * Abort synchronously or asynchronously - some jobs 245 * require to be finished without immediete abort (abort on job might 246 * cause conflicts/duplicated files - owncloud/client/issues/5949) 247 */ abort(PropagatorJob::AbortType abortType)248 void abort(PropagatorJob::AbortType abortType) override 249 { 250 if (!_runningJobs.empty()) { 251 _abortsCount = _runningJobs.size(); 252 foreach (PropagatorJob *j, _runningJobs) { 253 if (abortType == AbortType::Asynchronous) { 254 connect(j, &PropagatorJob::abortFinished, 255 this, &PropagatorCompositeJob::slotSubJobAbortFinished); 256 } 257 j->abort(abortType); 258 } 259 } else if (abortType == AbortType::Asynchronous){ 260 emit abortFinished(); 261 } 262 } 263 264 qint64 committedDiskSpace() const override; 265 266 private slots: 267 void slotSubJobAbortFinished(); possiblyRunNextJob(PropagatorJob * next)268 bool possiblyRunNextJob(PropagatorJob *next) 269 { 270 if (next->_state == NotYetStarted) { 271 connect(next, &PropagatorJob::finished, this, &PropagatorCompositeJob::slotSubJobFinished); 272 } 273 return next->scheduleSelfOrChild(); 274 } 275 276 void slotSubJobFinished(SyncFileItem::Status status); 277 void finalize(); 278 }; 279 280 /** 281 * @brief Propagate a directory, and all its sub entries. 282 * @ingroup libsync 283 */ 284 class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob 285 { 286 Q_OBJECT 287 public: 288 SyncFileItemPtr _item; 289 // e.g: create the directory 290 QScopedPointer<PropagateItemJob> _firstJob; 291 292 PropagatorCompositeJob _subJobs; 293 294 explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item); 295 appendJob(PropagatorJob * job)296 void appendJob(PropagatorJob *job) 297 { 298 _subJobs.appendJob(job); 299 } 300 appendTask(const SyncFileItemPtr & item)301 void appendTask(const SyncFileItemPtr &item) 302 { 303 _subJobs.appendTask(item); 304 } 305 306 bool scheduleSelfOrChild() override; 307 JobParallelism parallelism() override; abort(PropagatorJob::AbortType abortType)308 void abort(PropagatorJob::AbortType abortType) override 309 { 310 if (_firstJob) 311 // Force first job to abort synchronously 312 // even if caller allows async abort (asyncAbort) 313 _firstJob->abort(AbortType::Synchronous); 314 315 if (abortType == AbortType::Asynchronous){ 316 connect(&_subJobs, &PropagatorCompositeJob::abortFinished, this, &PropagateDirectory::abortFinished); 317 } 318 _subJobs.abort(abortType); 319 } 320 increaseAffectedCount()321 void increaseAffectedCount() 322 { 323 _firstJob->_item->_affectedItems++; 324 } 325 326 committedDiskSpace()327 qint64 committedDiskSpace() const override 328 { 329 return _subJobs.committedDiskSpace(); 330 } 331 332 private slots: 333 334 void slotFirstJobFinished(SyncFileItem::Status status); 335 virtual void slotSubJobsFinished(SyncFileItem::Status status); 336 337 }; 338 339 /** 340 * @brief Propagate the root directory, and all its sub entries. 341 * @ingroup libsync 342 * 343 * Primary difference to PropagateDirectory is that it keeps track of directory 344 * deletions that must happen at the very end. 345 */ 346 class OWNCLOUDSYNC_EXPORT PropagateRootDirectory : public PropagateDirectory 347 { 348 Q_OBJECT 349 public: 350 PropagatorCompositeJob _dirDeletionJobs; 351 352 explicit PropagateRootDirectory(OwncloudPropagator *propagator); 353 354 bool scheduleSelfOrChild() override; 355 JobParallelism parallelism() override; 356 void abort(PropagatorJob::AbortType abortType) override; 357 358 qint64 committedDiskSpace() const override; 359 360 private slots: 361 void slotSubJobsFinished(SyncFileItem::Status status) override; 362 void slotDirDeletionJobsFinished(SyncFileItem::Status status); 363 }; 364 365 /** 366 * @brief Dummy job that just mark it as completed and ignored 367 * @ingroup libsync 368 */ 369 class PropagateIgnoreJob : public PropagateItemJob 370 { 371 Q_OBJECT 372 public: PropagateIgnoreJob(OwncloudPropagator * propagator,const SyncFileItemPtr & item)373 PropagateIgnoreJob(OwncloudPropagator *propagator, const SyncFileItemPtr &item) 374 : PropagateItemJob(propagator, item) 375 { 376 } start()377 void start() override 378 { 379 SyncFileItem::Status status = _item->_status; 380 if (status == SyncFileItem::NoStatus) { 381 if (_item->_instruction == CSYNC_INSTRUCTION_ERROR) { 382 status = SyncFileItem::NormalError; 383 } else { 384 status = SyncFileItem::FileIgnored; 385 OC_ASSERT(_item->_instruction == CSYNC_INSTRUCTION_IGNORE); 386 } 387 } 388 done(status, _item->_errorString); 389 } 390 }; 391 392 class OWNCLOUDSYNC_EXPORT OwncloudPropagator : public QObject 393 { 394 Q_OBJECT 395 public: 396 SyncJournalDb *const _journal; 397 bool _finishedEmited; // used to ensure that finished is only emitted once 398 399 public: OwncloudPropagator(AccountPtr account,const QString & localDir,const QString & remoteFolder,SyncJournalDb * progressDb)400 OwncloudPropagator(AccountPtr account, const QString &localDir, 401 const QString &remoteFolder, SyncJournalDb *progressDb) 402 : _localDir((localDir.endsWith(QLatin1Char('/'))) ? localDir : localDir + QLatin1Char('/')) 403 , _remoteFolder((remoteFolder.endsWith(QLatin1Char('/'))) ? remoteFolder : remoteFolder + QLatin1Char('/')) 404 , _journal(progressDb) 405 , _finishedEmited(false) 406 , _bandwidthManager(this) 407 , _anotherSyncNeeded(false) 408 , _chunkSize(10 * 1000 * 1000) // 10 MB, overridden in setSyncOptions 409 , _account(account) 410 { 411 qRegisterMetaType<PropagatorJob::AbortType>("PropagatorJob::AbortType"); 412 } 413 414 ~OwncloudPropagator() override; 415 416 void start(SyncFileItemVector &&_syncedItems); 417 418 const SyncOptions &syncOptions() const; 419 void setSyncOptions(const SyncOptions &syncOptions); 420 421 int _downloadLimit = 0; 422 int _uploadLimit = 0; 423 BandwidthManager _bandwidthManager; 424 425 bool _abortRequested = false; 426 427 /** The list of currently active jobs. 428 This list contains the jobs that are currently using ressources and is used purely to 429 know how many jobs there is currently running for the scheduler. 430 Jobs add themself to the list when they do an assynchronous operation. 431 Jobs can be several time on the list (example, when several chunks are uploaded in parallel) 432 */ 433 QList<PropagateItemJob *> _activeJobList; 434 435 /** We detected that another sync is required after this one */ 436 bool _anotherSyncNeeded; 437 438 /** Per-folder quota guesses. 439 * 440 * This starts out empty. When an upload in a folder fails due to insufficent 441 * remote quota, the quota guess is updated to be attempted_size-1 at maximum. 442 * 443 * Note that it will usually just an upper limit for the actual quota - but 444 * since the quota on the server might change at any time it can sometimes be 445 * wrong in the other direction as well. 446 * 447 * This allows skipping of uploads that have a very high likelihood of failure. 448 */ 449 QHash<QString, qint64> _folderQuota; 450 451 /* the maximum number of jobs using bandwidth (uploads or downloads, in parallel) */ 452 int maximumActiveTransferJob(); 453 454 /** The size to use for upload chunks. 455 * 456 * Will be dynamically adjusted after each chunk upload finishes 457 * if Capabilities::desiredChunkUploadDuration has a target 458 * chunk-upload duration set. 459 */ 460 qint64 _chunkSize; 461 qint64 smallFileSize(); 462 463 /* The maximum number of active jobs in parallel */ 464 int hardMaximumActiveJob(); 465 466 /** Check whether a download would clash with an existing file 467 * in filesystems that are only case-preserving. 468 */ 469 bool localFileNameClash(const QString &relfile); 470 471 /** Check whether a file is properly accessible for upload. 472 * 473 * It is possible to create files with filenames that differ 474 * only by case in NTFS, but most operations such as stat and 475 * open only target one of these by default. 476 * 477 * When that happens, we want to avoid uploading incorrect data 478 * and give up on the file. 479 */ 480 bool hasCaseClashAccessibilityProblem(const QString &relfile); 481 482 Q_REQUIRED_RESULT QString fullLocalPath(const QString &tmp_file_name) const; 483 QString localPath() const; 484 485 /** 486 * Returns the full remote path including the folder root of a 487 * folder sync path. 488 */ 489 Q_REQUIRED_RESULT QString fullRemotePath(const QString &tmp_file_name) const; 490 QString remotePath() const; 491 492 /** Creates the job for an item. 493 */ 494 PropagateItemJob *createJob(const SyncFileItemPtr &item); 495 496 void scheduleNextJob(); 497 void reportProgress(const SyncFileItem &, qint64 bytes); 498 void reportFileTotal(const SyncFileItem &item, qint64 newSize); 499 abort()500 void abort() 501 { 502 if (_abortRequested) 503 return; 504 if (_rootJob) { 505 // Connect to abortFinished which signals that abort has been asynchronously finished 506 connect(_rootJob.data(), &PropagateDirectory::abortFinished, this, &OwncloudPropagator::emitFinished); 507 508 // Use Queued Connection because we're possibly already in an item's finished stack 509 QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection, 510 Q_ARG(PropagatorJob::AbortType, PropagatorJob::AbortType::Asynchronous)); 511 512 // Give asynchronous abort 5000 msec to finish on its own 513 QTimer::singleShot(5000, this, SLOT(abortTimeout())); 514 } else { 515 // No root job, call emitFinished 516 emitFinished(SyncFileItem::NormalError); 517 } 518 } 519 520 AccountPtr account() const; 521 522 enum DiskSpaceResult { 523 DiskSpaceOk, 524 DiskSpaceFailure, 525 DiskSpaceCritical 526 }; 527 528 /** Checks whether there's enough disk space available to complete 529 * all jobs that are currently running. 530 */ 531 DiskSpaceResult diskSpaceCheck() const; 532 533 /** Handles a conflict by renaming the file 'item'. 534 * 535 * Sets up conflict records. 536 * 537 * It also creates a new upload job in composite if the item that's 538 * moved away is a file and conflict uploads are requested. 539 * 540 * Returns true on success, false and error on error. 541 */ 542 bool createConflict(const SyncFileItemPtr &item, 543 PropagatorCompositeJob *composite, QString *error); 544 545 // Map original path (as in the DB) to target final path 546 QMap<QString, QString> _renamedDirectories; 547 QString adjustRenamedPath(const QString &original) const; 548 549 /** Update the database for an item. 550 * 551 * Typically after a sync operation succeeded. Updates the inode from 552 * the filesystem. 553 * 554 * Will also trigger a Vfs::convertToPlaceholder. 555 */ 556 static bool updateMetadata(const SyncFileItem &item, const QString &localFolderPath, SyncJournalDb &journal, Vfs &vfs); 557 bool updateMetadata(const SyncFileItem &item); // convenience for the above 558 559 private slots: 560 abortTimeout()561 void abortTimeout() 562 { 563 // Abort synchronously and finish 564 _rootJob.data()->abort(PropagatorJob::AbortType::Synchronous); 565 emitFinished(SyncFileItem::NormalError); 566 } 567 568 /** Emit the finished signal and make sure it is only emitted once */ emitFinished(SyncFileItem::Status status)569 void emitFinished(SyncFileItem::Status status) 570 { 571 if (!_finishedEmited) 572 emit finished(status == SyncFileItem::Success); 573 _finishedEmited = true; 574 } 575 576 void scheduleNextJobImpl(); 577 578 signals: 579 void newItem(const SyncFileItemPtr &); 580 void itemCompleted(const SyncFileItemPtr &); 581 void progress(const SyncFileItem &, qint64 bytes); 582 void updateFileTotal(const SyncFileItem &, qint64 newSize); 583 void finished(bool success); 584 585 /** Emitted when propagation has problems with a locked file. */ 586 void seenLockedFile(const QString &fileName); 587 588 /** Emitted when propagation touches a file. 589 * 590 * Used to track our own file modifications such that notifications 591 * from the file watcher about these can be ignored. 592 */ 593 void touchedFile(const QString &fileName); 594 595 void insufficientLocalStorage(); 596 void insufficientRemoteStorage(); 597 598 private: 599 AccountPtr _account; 600 QScopedPointer<PropagateRootDirectory> _rootJob; 601 SyncOptions _syncOptions; 602 bool _jobScheduled = false; 603 604 const QString _localDir; // absolute path to the local directory. ends with '/' 605 const QString _remoteFolder; // remote folder, ends with '/' 606 }; 607 608 609 /** 610 * @brief Job that wait for all the poll jobs to be completed 611 * @ingroup libsync 612 */ 613 class CleanupPollsJob : public QObject 614 { 615 Q_OBJECT 616 QVector<SyncJournalDb::PollInfo> _pollInfos; 617 AccountPtr _account; 618 SyncJournalDb *_journal; 619 QString _localPath; 620 QSharedPointer<Vfs> _vfs; 621 622 public: 623 explicit CleanupPollsJob(const QVector<SyncJournalDb::PollInfo> &pollInfos, AccountPtr account, 624 SyncJournalDb *journal, const QString &localPath, const QSharedPointer<Vfs> &vfs, QObject *parent = nullptr) QObject(parent)625 : QObject(parent) 626 , _pollInfos(pollInfos) 627 , _account(account) 628 , _journal(journal) 629 , _localPath(localPath) 630 , _vfs(vfs) 631 { 632 } 633 634 ~CleanupPollsJob() override; 635 636 /** 637 * Start the job. After the job is completed, it will emit either finished or aborted, and it 638 * will destroy itself. 639 */ 640 void start(); 641 signals: 642 void finished(); 643 void aborted(const QString &error); 644 private slots: 645 void slotPollFinished(); 646 }; 647 } 648 649 #endif 650