1 /*
2  * Copyright (C) by Olivier Goffart <ogoffart@owncloud.com>
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12  * for more details.
13  */
14 
15 #ifndef OWNCLOUDPROPAGATOR_H
16 #define OWNCLOUDPROPAGATOR_H
17 
18 #include <QHash>
19 #include <QObject>
20 #include <QMap>
21 #include <QLinkedList>
22 #include <QElapsedTimer>
23 #include <QTimer>
24 #include <QPointer>
25 #include <QIODevice>
26 #include <QMutex>
27 
28 #include "csync_util.h"
29 #include "syncfileitem.h"
30 #include "common/syncjournaldb.h"
31 #include "bandwidthmanager.h"
32 #include "accountfwd.h"
33 #include "syncoptions.h"
34 
35 namespace OCC {
36 
37 Q_DECLARE_LOGGING_CATEGORY(lcPropagator)
38 
39 /** Free disk space threshold below which syncs will abort and not even start.
40  */
41 qint64 criticalFreeSpaceLimit();
42 
43 /** The client will not intentionally reduce the available free disk space below
44  *  this limit.
45  *
46  * Uploads will still run and downloads that are small enough will continue too.
47  */
48 qint64 freeSpaceLimit();
49 
50 class SyncJournalDb;
51 class OwncloudPropagator;
52 class PropagatorCompositeJob;
53 
54 /**
55  * @brief the base class of propagator jobs
56  *
57  * This can either be a job, or a container for jobs.
58  * If it is a composite job, it then inherits from PropagateDirectory
59  *
60  * @ingroup libsync
61  */
62 class PropagatorJob : public QObject
63 {
64     Q_OBJECT
65 
66 public:
67     explicit PropagatorJob(OwncloudPropagator *propagator);
68 
69     enum AbortType {
70         Synchronous,
71         Asynchronous
72     };
73 
74     enum JobState {
75         NotYetStarted,
76         Running,
77         Finished
78     };
79     JobState _state;
80 
81     enum JobParallelism {
82 
83         /** Jobs can be run in parallel to this job */
84         FullParallelism,
85 
86         /** No other job shall be started until this one has finished.
87             So this job is guaranteed to finish before any jobs below it
88             are executed. */
89         WaitForFinished,
90     };
91 
parallelism()92     virtual JobParallelism parallelism() { return FullParallelism; }
93 
94     /**
95      * For "small" jobs
96      */
isLikelyFinishedQuickly()97     virtual bool isLikelyFinishedQuickly() { return false; }
98 
99     /** The space that the running jobs need to complete but don't actually use yet.
100      *
101      * Note that this does *not* include the disk space that's already
102      * in use by running jobs for things like a download-in-progress.
103      */
committedDiskSpace()104     virtual qint64 committedDiskSpace() const { return 0; }
105 
106     /** Set the associated composite job
107      *
108      * Used only from PropagatorCompositeJob itself, when a job is added
109      * and from PropagateDirectory to associate the subJobs with the first
110      * job.
111      */
setAssociatedComposite(PropagatorCompositeJob * job)112     void setAssociatedComposite(PropagatorCompositeJob *job) { _associatedComposite = job; }
113 
114 public slots:
115     /*
116      * Asynchronous abort requires emit of abortFinished() signal,
117      * while synchronous is expected to abort immedietaly.
118     */
abort(PropagatorJob::AbortType abortType)119     virtual void abort(PropagatorJob::AbortType abortType) {
120         if (abortType == AbortType::Asynchronous)
121             emit abortFinished();
122     }
123 
124     /** Starts this job, or a new subjob
125      * returns true if a job was started.
126      */
127     virtual bool scheduleSelfOrChild() = 0;
128 signals:
129     /**
130      * Emitted when the job is fully finished
131      */
132     void finished(SyncFileItem::Status);
133 
134     /**
135      * Emitted when the abort is fully finished
136      */
137     void abortFinished(SyncFileItem::Status status = SyncFileItem::NormalError);
138 protected:
139     OwncloudPropagator *propagator() const;
140 
141     /** If this job gets added to a composite job, this will point to the parent.
142      *
143      * For the PropagateDirectory::_firstJob it will point to
144      * PropagateDirectory::_subJobs.
145      *
146      * That can be useful for jobs that want to spawn follow-up jobs without
147      * becoming composite jobs themselves.
148      */
149     PropagatorCompositeJob *_associatedComposite = nullptr;
150 };
151 
152 /*
153  * Abstract class to propagate a single item
154  */
155 class PropagateItemJob : public PropagatorJob
156 {
157     Q_OBJECT
158 protected:
159     virtual void done(SyncFileItem::Status status, const QString &errorString = QString());
160 
161     /*
162      * set a custom restore job message that is used if the restore job succeeded.
163      * It is displayed in the activity view.
164      */
restoreJobMsg()165     QString restoreJobMsg() const
166     {
167         return _item->_isRestoration ? _item->_errorString : QString();
168     }
169     void setRestoreJobMsg(const QString &msg = QString())
170     {
171         _item->_isRestoration = true;
172         _item->_errorString = msg;
173     }
174 
175 protected slots:
176     void slotRestoreJobFinished(SyncFileItem::Status status);
177 
178 private:
179     QScopedPointer<PropagateItemJob> _restoreJob;
180 
181 public:
PropagateItemJob(OwncloudPropagator * propagator,const SyncFileItemPtr & item)182     PropagateItemJob(OwncloudPropagator *propagator, const SyncFileItemPtr &item)
183         : PropagatorJob(propagator)
184         , _item(item)
185     {
186     }
187     ~PropagateItemJob() override;
188 
scheduleSelfOrChild()189     bool scheduleSelfOrChild() override
190     {
191         if (_state != NotYetStarted) {
192             return false;
193         }
194         qCInfo(lcPropagator) << "Starting" << _item->_instruction << "propagation of" << _item->destination() << "by" << this;
195 
196         _state = Running;
197         QMetaObject::invokeMethod(this, "start"); // We could be in a different thread (neon jobs)
198         return true;
199     }
200 
201     SyncFileItemPtr _item;
202 
203 public slots:
204     virtual void start() = 0;
205 };
206 
207 /**
208  * @brief Job that runs subjobs. It becomes finished only when all subjobs are finished.
209  * @ingroup libsync
210  */
211 class PropagatorCompositeJob : public PropagatorJob
212 {
213     Q_OBJECT
214 public:
215     QVector<PropagatorJob *> _jobsToDo;
216     SyncFileItemVector _tasksToDo;
217     QVector<PropagatorJob *> _runningJobs;
218     SyncFileItem::Status _hasError; // NoStatus,  or NormalError / SoftError if there was an error
219     quint64 _abortsCount;
220 
PropagatorCompositeJob(OwncloudPropagator * propagator)221     explicit PropagatorCompositeJob(OwncloudPropagator *propagator)
222         : PropagatorJob(propagator)
223         , _hasError(SyncFileItem::NoStatus), _abortsCount(0)
224     {
225     }
226 
~PropagatorCompositeJob()227     ~PropagatorCompositeJob() override
228     {
229         // Don't delete jobs in _jobsToDo and _runningJobs: they have parents
230         // that will be responsible for cleanup. Deleting them here would risk
231         // deleting something that has already been deleted by a shared parent.
232     }
233 
234     void appendJob(PropagatorJob *job);
appendTask(const SyncFileItemPtr & item)235     void appendTask(const SyncFileItemPtr &item)
236     {
237         _tasksToDo.append(item);
238     }
239 
240     bool scheduleSelfOrChild() override;
241     JobParallelism parallelism() override;
242 
243     /*
244      * Abort synchronously or asynchronously - some jobs
245      * require to be finished without immediete abort (abort on job might
246      * cause conflicts/duplicated files - owncloud/client/issues/5949)
247      */
abort(PropagatorJob::AbortType abortType)248     void abort(PropagatorJob::AbortType abortType) override
249     {
250         if (!_runningJobs.empty()) {
251             _abortsCount = _runningJobs.size();
252             foreach (PropagatorJob *j, _runningJobs) {
253                 if (abortType == AbortType::Asynchronous) {
254                     connect(j, &PropagatorJob::abortFinished,
255                             this, &PropagatorCompositeJob::slotSubJobAbortFinished);
256                 }
257                 j->abort(abortType);
258             }
259         } else if (abortType == AbortType::Asynchronous){
260             emit abortFinished();
261         }
262     }
263 
264     qint64 committedDiskSpace() const override;
265 
266 private slots:
267     void slotSubJobAbortFinished();
possiblyRunNextJob(PropagatorJob * next)268     bool possiblyRunNextJob(PropagatorJob *next)
269     {
270         if (next->_state == NotYetStarted) {
271             connect(next, &PropagatorJob::finished, this, &PropagatorCompositeJob::slotSubJobFinished);
272         }
273         return next->scheduleSelfOrChild();
274     }
275 
276     void slotSubJobFinished(SyncFileItem::Status status);
277     void finalize();
278 };
279 
280 /**
281  * @brief Propagate a directory, and all its sub entries.
282  * @ingroup libsync
283  */
284 class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob
285 {
286     Q_OBJECT
287 public:
288     SyncFileItemPtr _item;
289     // e.g: create the directory
290     QScopedPointer<PropagateItemJob> _firstJob;
291 
292     PropagatorCompositeJob _subJobs;
293 
294     explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item);
295 
appendJob(PropagatorJob * job)296     void appendJob(PropagatorJob *job)
297     {
298         _subJobs.appendJob(job);
299     }
300 
appendTask(const SyncFileItemPtr & item)301     void appendTask(const SyncFileItemPtr &item)
302     {
303         _subJobs.appendTask(item);
304     }
305 
306     bool scheduleSelfOrChild() override;
307     JobParallelism parallelism() override;
abort(PropagatorJob::AbortType abortType)308     void abort(PropagatorJob::AbortType abortType) override
309     {
310         if (_firstJob)
311             // Force first job to abort synchronously
312             // even if caller allows async abort (asyncAbort)
313             _firstJob->abort(AbortType::Synchronous);
314 
315         if (abortType == AbortType::Asynchronous){
316             connect(&_subJobs, &PropagatorCompositeJob::abortFinished, this, &PropagateDirectory::abortFinished);
317         }
318         _subJobs.abort(abortType);
319     }
320 
increaseAffectedCount()321     void increaseAffectedCount()
322     {
323         _firstJob->_item->_affectedItems++;
324     }
325 
326 
committedDiskSpace()327     qint64 committedDiskSpace() const override
328     {
329         return _subJobs.committedDiskSpace();
330     }
331 
332 private slots:
333 
334     void slotFirstJobFinished(SyncFileItem::Status status);
335     virtual void slotSubJobsFinished(SyncFileItem::Status status);
336 
337 };
338 
339 /**
340  * @brief Propagate the root directory, and all its sub entries.
341  * @ingroup libsync
342  *
343  * Primary difference to PropagateDirectory is that it keeps track of directory
344  * deletions that must happen at the very end.
345  */
346 class OWNCLOUDSYNC_EXPORT PropagateRootDirectory : public PropagateDirectory
347 {
348     Q_OBJECT
349 public:
350     PropagatorCompositeJob _dirDeletionJobs;
351 
352     explicit PropagateRootDirectory(OwncloudPropagator *propagator);
353 
354     bool scheduleSelfOrChild() override;
355     JobParallelism parallelism() override;
356     void abort(PropagatorJob::AbortType abortType) override;
357 
358     qint64 committedDiskSpace() const override;
359 
360 private slots:
361     void slotSubJobsFinished(SyncFileItem::Status status) override;
362     void slotDirDeletionJobsFinished(SyncFileItem::Status status);
363 };
364 
365 /**
366  * @brief Dummy job that just mark it as completed and ignored
367  * @ingroup libsync
368  */
369 class PropagateIgnoreJob : public PropagateItemJob
370 {
371     Q_OBJECT
372 public:
PropagateIgnoreJob(OwncloudPropagator * propagator,const SyncFileItemPtr & item)373     PropagateIgnoreJob(OwncloudPropagator *propagator, const SyncFileItemPtr &item)
374         : PropagateItemJob(propagator, item)
375     {
376     }
start()377     void start() override
378     {
379         SyncFileItem::Status status = _item->_status;
380         if (status == SyncFileItem::NoStatus) {
381             if (_item->_instruction == CSYNC_INSTRUCTION_ERROR) {
382                 status = SyncFileItem::NormalError;
383             } else {
384                 status = SyncFileItem::FileIgnored;
385                 OC_ASSERT(_item->_instruction == CSYNC_INSTRUCTION_IGNORE);
386             }
387         }
388         done(status, _item->_errorString);
389     }
390 };
391 
392 class OWNCLOUDSYNC_EXPORT OwncloudPropagator : public QObject
393 {
394     Q_OBJECT
395 public:
396     SyncJournalDb *const _journal;
397     bool _finishedEmited; // used to ensure that finished is only emitted once
398 
399 public:
OwncloudPropagator(AccountPtr account,const QString & localDir,const QString & remoteFolder,SyncJournalDb * progressDb)400     OwncloudPropagator(AccountPtr account, const QString &localDir,
401         const QString &remoteFolder, SyncJournalDb *progressDb)
402         : _localDir((localDir.endsWith(QLatin1Char('/'))) ? localDir : localDir + QLatin1Char('/'))
403         , _remoteFolder((remoteFolder.endsWith(QLatin1Char('/'))) ? remoteFolder : remoteFolder + QLatin1Char('/'))
404         , _journal(progressDb)
405         , _finishedEmited(false)
406         , _bandwidthManager(this)
407         , _anotherSyncNeeded(false)
408         , _chunkSize(10 * 1000 * 1000) // 10 MB, overridden in setSyncOptions
409         , _account(account)
410     {
411         qRegisterMetaType<PropagatorJob::AbortType>("PropagatorJob::AbortType");
412     }
413 
414     ~OwncloudPropagator() override;
415 
416     void start(SyncFileItemVector &&_syncedItems);
417 
418     const SyncOptions &syncOptions() const;
419     void setSyncOptions(const SyncOptions &syncOptions);
420 
421     int _downloadLimit = 0;
422     int _uploadLimit = 0;
423     BandwidthManager _bandwidthManager;
424 
425     bool _abortRequested = false;
426 
427     /** The list of currently active jobs.
428         This list contains the jobs that are currently using ressources and is used purely to
429         know how many jobs there is currently running for the scheduler.
430         Jobs add themself to the list when they do an assynchronous operation.
431         Jobs can be several time on the list (example, when several chunks are uploaded in parallel)
432      */
433     QList<PropagateItemJob *> _activeJobList;
434 
435     /** We detected that another sync is required after this one */
436     bool _anotherSyncNeeded;
437 
438     /** Per-folder quota guesses.
439      *
440      * This starts out empty. When an upload in a folder fails due to insufficent
441      * remote quota, the quota guess is updated to be attempted_size-1 at maximum.
442      *
443      * Note that it will usually just an upper limit for the actual quota - but
444      * since the quota on the server might change at any time it can sometimes be
445      * wrong in the other direction as well.
446      *
447      * This allows skipping of uploads that have a very high likelihood of failure.
448      */
449     QHash<QString, qint64> _folderQuota;
450 
451     /* the maximum number of jobs using bandwidth (uploads or downloads, in parallel) */
452     int maximumActiveTransferJob();
453 
454     /** The size to use for upload chunks.
455      *
456      * Will be dynamically adjusted after each chunk upload finishes
457      * if Capabilities::desiredChunkUploadDuration has a target
458      * chunk-upload duration set.
459      */
460     qint64 _chunkSize;
461     qint64 smallFileSize();
462 
463     /* The maximum number of active jobs in parallel  */
464     int hardMaximumActiveJob();
465 
466     /** Check whether a download would clash with an existing file
467      * in filesystems that are only case-preserving.
468      */
469     bool localFileNameClash(const QString &relfile);
470 
471     /** Check whether a file is properly accessible for upload.
472      *
473      * It is possible to create files with filenames that differ
474      * only by case in NTFS, but most operations such as stat and
475      * open only target one of these by default.
476      *
477      * When that happens, we want to avoid uploading incorrect data
478      * and give up on the file.
479      */
480     bool hasCaseClashAccessibilityProblem(const QString &relfile);
481 
482     Q_REQUIRED_RESULT QString fullLocalPath(const QString &tmp_file_name) const;
483     QString localPath() const;
484 
485     /**
486      * Returns the full remote path including the folder root of a
487      * folder sync path.
488      */
489     Q_REQUIRED_RESULT QString fullRemotePath(const QString &tmp_file_name) const;
490     QString remotePath() const;
491 
492     /** Creates the job for an item.
493      */
494     PropagateItemJob *createJob(const SyncFileItemPtr &item);
495 
496     void scheduleNextJob();
497     void reportProgress(const SyncFileItem &, qint64 bytes);
498     void reportFileTotal(const SyncFileItem &item, qint64 newSize);
499 
abort()500     void abort()
501     {
502         if (_abortRequested)
503             return;
504         if (_rootJob) {
505             // Connect to abortFinished  which signals that abort has been asynchronously finished
506             connect(_rootJob.data(), &PropagateDirectory::abortFinished, this, &OwncloudPropagator::emitFinished);
507 
508             // Use Queued Connection because we're possibly already in an item's finished stack
509             QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection,
510                                       Q_ARG(PropagatorJob::AbortType, PropagatorJob::AbortType::Asynchronous));
511 
512             // Give asynchronous abort 5000 msec to finish on its own
513             QTimer::singleShot(5000, this, SLOT(abortTimeout()));
514         } else {
515             // No root job, call emitFinished
516             emitFinished(SyncFileItem::NormalError);
517         }
518     }
519 
520     AccountPtr account() const;
521 
522     enum DiskSpaceResult {
523         DiskSpaceOk,
524         DiskSpaceFailure,
525         DiskSpaceCritical
526     };
527 
528     /** Checks whether there's enough disk space available to complete
529      *  all jobs that are currently running.
530      */
531     DiskSpaceResult diskSpaceCheck() const;
532 
533     /** Handles a conflict by renaming the file 'item'.
534      *
535      * Sets up conflict records.
536      *
537      * It also creates a new upload job in composite if the item that's
538      * moved away is a file and conflict uploads are requested.
539      *
540      * Returns true on success, false and error on error.
541      */
542     bool createConflict(const SyncFileItemPtr &item,
543         PropagatorCompositeJob *composite, QString *error);
544 
545     // Map original path (as in the DB) to target final path
546     QMap<QString, QString> _renamedDirectories;
547     QString adjustRenamedPath(const QString &original) const;
548 
549     /** Update the database for an item.
550      *
551      * Typically after a sync operation succeeded. Updates the inode from
552      * the filesystem.
553      *
554      * Will also trigger a Vfs::convertToPlaceholder.
555      */
556     static bool updateMetadata(const SyncFileItem &item, const QString &localFolderPath, SyncJournalDb &journal, Vfs &vfs);
557     bool updateMetadata(const SyncFileItem &item); // convenience for the above
558 
559 private slots:
560 
abortTimeout()561     void abortTimeout()
562     {
563         // Abort synchronously and finish
564         _rootJob.data()->abort(PropagatorJob::AbortType::Synchronous);
565         emitFinished(SyncFileItem::NormalError);
566     }
567 
568     /** Emit the finished signal and make sure it is only emitted once */
emitFinished(SyncFileItem::Status status)569     void emitFinished(SyncFileItem::Status status)
570     {
571         if (!_finishedEmited)
572             emit finished(status == SyncFileItem::Success);
573         _finishedEmited = true;
574     }
575 
576     void scheduleNextJobImpl();
577 
578 signals:
579     void newItem(const SyncFileItemPtr &);
580     void itemCompleted(const SyncFileItemPtr &);
581     void progress(const SyncFileItem &, qint64 bytes);
582     void updateFileTotal(const SyncFileItem &, qint64 newSize);
583     void finished(bool success);
584 
585     /** Emitted when propagation has problems with a locked file. */
586     void seenLockedFile(const QString &fileName);
587 
588     /** Emitted when propagation touches a file.
589      *
590      * Used to track our own file modifications such that notifications
591      * from the file watcher about these can be ignored.
592      */
593     void touchedFile(const QString &fileName);
594 
595     void insufficientLocalStorage();
596     void insufficientRemoteStorage();
597 
598 private:
599     AccountPtr _account;
600     QScopedPointer<PropagateRootDirectory> _rootJob;
601     SyncOptions _syncOptions;
602     bool _jobScheduled = false;
603 
604     const QString _localDir; // absolute path to the local directory. ends with '/'
605     const QString _remoteFolder; // remote folder, ends with '/'
606 };
607 
608 
609 /**
610  * @brief Job that wait for all the poll jobs to be completed
611  * @ingroup libsync
612  */
613 class CleanupPollsJob : public QObject
614 {
615     Q_OBJECT
616     QVector<SyncJournalDb::PollInfo> _pollInfos;
617     AccountPtr _account;
618     SyncJournalDb *_journal;
619     QString _localPath;
620     QSharedPointer<Vfs> _vfs;
621 
622 public:
623     explicit CleanupPollsJob(const QVector<SyncJournalDb::PollInfo> &pollInfos, AccountPtr account,
624         SyncJournalDb *journal, const QString &localPath, const QSharedPointer<Vfs> &vfs, QObject *parent = nullptr)
QObject(parent)625         : QObject(parent)
626         , _pollInfos(pollInfos)
627         , _account(account)
628         , _journal(journal)
629         , _localPath(localPath)
630         , _vfs(vfs)
631     {
632     }
633 
634     ~CleanupPollsJob() override;
635 
636     /**
637      * Start the job.  After the job is completed, it will emit either finished or aborted, and it
638      * will destroy itself.
639      */
640     void start();
641 signals:
642     void finished();
643     void aborted(const QString &error);
644 private slots:
645     void slotPollFinished();
646 };
647 }
648 
649 #endif
650