1 #include "partprocessor.h"
2 #include <QFile>
3 #include <QDebug>
4 #include <QTimer>
5 #include <QModelIndex>
6 #include "easylogging++.h"
7 
// Text inserted between the destination file name and the part number when
// openFiles() builds the name of a split output file.
8 const char* PartProcessor::kPartSuffix = "_split-part_";
9 
PartProcessor(const QString & source,const QString & destination,qint32 startOffset,qint32 maxPerPart,qint32 originalFileSize_,int part,const QModelIndex & index,kProcessType processType,QObject * parent)10 PartProcessor::PartProcessor(const QString& source, const QString& destination, qint32 startOffset, qint32 maxPerPart, qint32 originalFileSize_, int part, const QModelIndex& index, kProcessType processType, QObject *parent) :
11     QThread(parent),
12     processType(processType),
13     sourceFilename_(source),
14     destinationFilename_(destination),
15     seekLocation_(startOffset),
16     maxSizePerPart_(maxPerPart),
17     originalFileSize_(originalFileSize_),
18     partIndex_(part),
19     sourceFile_(new QFile(source)),
20     modelIndex_(const_cast<QModelIndex&>(index)) {
21 
22     this->openFiles();
23     this->cancelled_ = false;
24     this->resume();
25 }
26 
PartProcessor(const QList<QString> & parts,const QString & destination,const QModelIndex & index,kProcessType processType,QObject * parent)27 PartProcessor::PartProcessor(const QList<QString>& parts, const QString& destination, const QModelIndex& index, kProcessType processType, QObject *parent) :
28     parts_(parts),
29     processType(processType),
30     destinationFilename_(destination),
31     modelIndex_(const_cast<QModelIndex&>(index)),
32     QThread(parent) {
33     sourceFile_ = NULL;
34     this->cancelled_ = false;
35     this->resume();
36     this->openFiles();
37 }
38 
~PartProcessor(void)39 PartProcessor::~PartProcessor(void) {
40     closeFiles();
41     qDeleteAll(errors_);
42 }
43 
closeFiles(void)44 void PartProcessor::closeFiles(void) {
45     if (sourceFile_ && sourceFile_->isOpen()) {
46         sourceFile_->close();
47     }
48     if (destinationFile_.isOpen()) {
49         destinationFile_.close();
50     }
51     if (sourceFile_ != NULL) {
52         delete sourceFile_;
53         sourceFile_ = NULL;
54     }
55 }
56 
openFiles(void)57 void PartProcessor::openFiles(void) {
58     filesOpened = false;
59     if(processType == kSplit && !sourceFile_->open(QIODevice::ReadOnly)) {
60         errors_.push_back(new Error("Error opening source file", kOpenFileError));
61         LOG(ERROR) << "Error opening source file";
62         return;
63     }
64     destinationFile_.setFileName(this->destinationFilename_ + (processType == kSplit ?
65                                                                    (QString(PartProcessor::kPartSuffix) + QString::number(partIndex())) : ""));
66     if(!destinationFile_.open(QIODevice::WriteOnly)) {
67         errors_.push_back(new Error("Error opening source file", kOpenFileError));
68         LOG(ERROR) << "Error opening destination file";
69         return;
70     }
71     filesOpened = true;
72 }
73 
split(void)74 int PartProcessor::split(void) {
75     mutex_.lock();
76     LOG(DEBUG) << "Splitting " << this->sourceFilename_.toStdString() << " to " << this->destinationFilename_.toStdString();
77     int nextBuffLength = PartProcessor::kBufferSize;
78     char *data = new char[nextBuffLength];
79     qint32 dataBytes;
80     progress_ = 0;
81     this->sourceFile_->seek(this->seekLocation_);
82     while (!this->sourceFile_->atEnd()) {
83         if (cancelled_) {
84             break;
85         }
86         while (paused()) {
87             msleep(500);
88         }
89         if (data == NULL) {
90             break;
91         }
92 
93         nextBuffLength = (progress() + PartProcessor::kBufferSize > this->maxSizePerPart_) ?
94                              this->maxSizePerPart_ - progress() :
95                              PartProcessor::kBufferSize;
96 
97         dataBytes = this->sourceFile_->read(data, nextBuffLength);
98         this->destinationFile_.write(data, dataBytes);
99         this->destinationFile_.flush();
100         progress_ += nextBuffLength;
101         VLOG(2) << "Progress (split) = " << progress_ << " bytes";
102         if (progress() % (PartProcessor::kBufferSize * PartProcessor::kUpdateFrequencyBytes) == 0) {
103             emit updated(this);
104         }
105 
106         if (progress() == this->maxSizePerPart_) {
107             emit updated(this);
108             emit finished(this);
109             break;
110         }
111     }
112     delete[] data;
113     data = NULL;
114     closeFiles();
115     int status = (progress() != this->maxSizePerPart_);
116     mutex_.unlock();
117     return status;
118 }
119 
merge(void)120 int PartProcessor::merge(void) {
121     int status = -1;
122     this->progress_ = 0;
123     this->partIndex_ = 0;
124     int progBytes = 0;
125     mutex_.lock();
126     for (int i = 0; i < this->parts_.size(); i++) {
127         this->partIndex_ = i;
128         this->progress_ = i;
129         LOG(INFO) << "Merging data from: " << this->parts_.at(i).toStdString();
130         //TODO: check for source file availability
131         this->sourceFile_ = new QFile(this->parts_.at(i));
132         if (!this->sourceFile_->open(QFile::ReadOnly)) {
133             LOG(ERROR) << "Error opening files!";
134             return status;
135         }
136         char* data = new char[PartProcessor::kBufferSize];
137         qint32 dataBytes;
138         while (!this->sourceFile_->atEnd()) {
139             if (cancelled_) {
140                 break;
141             }
142             while (paused()) {
143                 msleep(500);
144             }
145             //TODO: check for destination file writable permissions beforehand
146             dataBytes = this->sourceFile_->read(data, PartProcessor::kBufferSize);
147             progBytes += static_cast<int>(dataBytes);
148             VLOG(2) << "Progress (merge) = " << progBytes << " bytes";
149             this->destinationFile_.write(data, dataBytes);
150             this->destinationFile_.flush();
151         }
152         delete[] data;
153         data = NULL;
154         this->sourceFile_->close();
155         delete this->sourceFile_;
156         this->sourceFile_ = NULL;
157         emit updated(this);
158     }
159     mutex_.unlock();
160     status = this->progress_ == this->parts_.size();
161     closeFiles();
162     return status;
163 }
164 
paused(void) const165 bool PartProcessor::paused(void) const {
166     return paused_;
167 }
168 
modelIndex(void) const169 QModelIndex& PartProcessor::modelIndex(void) const {
170     return modelIndex_;
171 }
172 
progress(void) const173 qint32 PartProcessor::progress(void) const {
174     return progress_;
175 }
176 
originalFileSize(void) const177 qint32 PartProcessor::originalFileSize(void) const {
178     return originalFileSize_;
179 }
180 
partIndex(void) const181 int PartProcessor::partIndex(void) const {
182     return partIndex_;
183 }
184 
errors(void) const185 QList<Error*> PartProcessor::errors(void) const {
186     return errors_;
187 }
188 
pause(void)189 void PartProcessor::pause(void) {
190     paused_ = true;
191 }
192 
cancel(void)193 void PartProcessor::cancel(void) {
194     cancelled_ = true;
195 }
196 
resume()197 void PartProcessor::resume() {
198     paused_ = false;
199 }
200 
run(void)201 void PartProcessor::run(void) {
202     if (!filesOpened) {
203         LOG(ERROR) << "Files were not successfully opened. Cannot start " << (processType == kSplit ? "splitting" : "merging");
204                 return;
205     }
206     emit started(this);
207     this->resume();
208     int status = -1;
209     if (processType == kSplit) {
210         if (this->sourceFilename_ == "") {
211             LOG(ERROR) << "Source file not specified";
212             return;
213         }
214         status = split();
215     } else if (processType == kMerge) {
216         if (this->parts_.size() == 0) {
217             LOG(ERROR) << "Parts not specified";
218             return;
219         }
220         status = merge();
221     }
222     if (status == -1) {
223         LOG(ERROR) << "Error occurred while " << (processType == kSplit ? "splitting" : "merging");
224     }
225 }
226