#include "partprocessor.h"
#include <QByteArray>
#include <QDebug>
#include <QFile>
#include <QModelIndex>
#include <QTimer>
#include "easylogging++.h"
7
8 const char* PartProcessor::kPartSuffix = "_split-part_";
9
PartProcessor(const QString & source,const QString & destination,qint32 startOffset,qint32 maxPerPart,qint32 originalFileSize_,int part,const QModelIndex & index,kProcessType processType,QObject * parent)10 PartProcessor::PartProcessor(const QString& source, const QString& destination, qint32 startOffset, qint32 maxPerPart, qint32 originalFileSize_, int part, const QModelIndex& index, kProcessType processType, QObject *parent) :
11 QThread(parent),
12 processType(processType),
13 sourceFilename_(source),
14 destinationFilename_(destination),
15 seekLocation_(startOffset),
16 maxSizePerPart_(maxPerPart),
17 originalFileSize_(originalFileSize_),
18 partIndex_(part),
19 sourceFile_(new QFile(source)),
20 modelIndex_(const_cast<QModelIndex&>(index)) {
21
22 this->openFiles();
23 this->cancelled_ = false;
24 this->resume();
25 }
26
PartProcessor(const QList<QString> & parts,const QString & destination,const QModelIndex & index,kProcessType processType,QObject * parent)27 PartProcessor::PartProcessor(const QList<QString>& parts, const QString& destination, const QModelIndex& index, kProcessType processType, QObject *parent) :
28 parts_(parts),
29 processType(processType),
30 destinationFilename_(destination),
31 modelIndex_(const_cast<QModelIndex&>(index)),
32 QThread(parent) {
33 sourceFile_ = NULL;
34 this->cancelled_ = false;
35 this->resume();
36 this->openFiles();
37 }
38
~PartProcessor(void)39 PartProcessor::~PartProcessor(void) {
40 closeFiles();
41 qDeleteAll(errors_);
42 }
43
closeFiles(void)44 void PartProcessor::closeFiles(void) {
45 if (sourceFile_ && sourceFile_->isOpen()) {
46 sourceFile_->close();
47 }
48 if (destinationFile_.isOpen()) {
49 destinationFile_.close();
50 }
51 if (sourceFile_ != NULL) {
52 delete sourceFile_;
53 sourceFile_ = NULL;
54 }
55 }
56
openFiles(void)57 void PartProcessor::openFiles(void) {
58 filesOpened = false;
59 if(processType == kSplit && !sourceFile_->open(QIODevice::ReadOnly)) {
60 errors_.push_back(new Error("Error opening source file", kOpenFileError));
61 LOG(ERROR) << "Error opening source file";
62 return;
63 }
64 destinationFile_.setFileName(this->destinationFilename_ + (processType == kSplit ?
65 (QString(PartProcessor::kPartSuffix) + QString::number(partIndex())) : ""));
66 if(!destinationFile_.open(QIODevice::WriteOnly)) {
67 errors_.push_back(new Error("Error opening source file", kOpenFileError));
68 LOG(ERROR) << "Error opening destination file";
69 return;
70 }
71 filesOpened = true;
72 }
73
split(void)74 int PartProcessor::split(void) {
75 mutex_.lock();
76 LOG(DEBUG) << "Splitting " << this->sourceFilename_.toStdString() << " to " << this->destinationFilename_.toStdString();
77 int nextBuffLength = PartProcessor::kBufferSize;
78 char *data = new char[nextBuffLength];
79 qint32 dataBytes;
80 progress_ = 0;
81 this->sourceFile_->seek(this->seekLocation_);
82 while (!this->sourceFile_->atEnd()) {
83 if (cancelled_) {
84 break;
85 }
86 while (paused()) {
87 msleep(500);
88 }
89 if (data == NULL) {
90 break;
91 }
92
93 nextBuffLength = (progress() + PartProcessor::kBufferSize > this->maxSizePerPart_) ?
94 this->maxSizePerPart_ - progress() :
95 PartProcessor::kBufferSize;
96
97 dataBytes = this->sourceFile_->read(data, nextBuffLength);
98 this->destinationFile_.write(data, dataBytes);
99 this->destinationFile_.flush();
100 progress_ += nextBuffLength;
101 VLOG(2) << "Progress (split) = " << progress_ << " bytes";
102 if (progress() % (PartProcessor::kBufferSize * PartProcessor::kUpdateFrequencyBytes) == 0) {
103 emit updated(this);
104 }
105
106 if (progress() == this->maxSizePerPart_) {
107 emit updated(this);
108 emit finished(this);
109 break;
110 }
111 }
112 delete[] data;
113 data = NULL;
114 closeFiles();
115 int status = (progress() != this->maxSizePerPart_);
116 mutex_.unlock();
117 return status;
118 }
119
merge(void)120 int PartProcessor::merge(void) {
121 int status = -1;
122 this->progress_ = 0;
123 this->partIndex_ = 0;
124 int progBytes = 0;
125 mutex_.lock();
126 for (int i = 0; i < this->parts_.size(); i++) {
127 this->partIndex_ = i;
128 this->progress_ = i;
129 LOG(INFO) << "Merging data from: " << this->parts_.at(i).toStdString();
130 //TODO: check for source file availability
131 this->sourceFile_ = new QFile(this->parts_.at(i));
132 if (!this->sourceFile_->open(QFile::ReadOnly)) {
133 LOG(ERROR) << "Error opening files!";
134 return status;
135 }
136 char* data = new char[PartProcessor::kBufferSize];
137 qint32 dataBytes;
138 while (!this->sourceFile_->atEnd()) {
139 if (cancelled_) {
140 break;
141 }
142 while (paused()) {
143 msleep(500);
144 }
145 //TODO: check for destination file writable permissions beforehand
146 dataBytes = this->sourceFile_->read(data, PartProcessor::kBufferSize);
147 progBytes += static_cast<int>(dataBytes);
148 VLOG(2) << "Progress (merge) = " << progBytes << " bytes";
149 this->destinationFile_.write(data, dataBytes);
150 this->destinationFile_.flush();
151 }
152 delete[] data;
153 data = NULL;
154 this->sourceFile_->close();
155 delete this->sourceFile_;
156 this->sourceFile_ = NULL;
157 emit updated(this);
158 }
159 mutex_.unlock();
160 status = this->progress_ == this->parts_.size();
161 closeFiles();
162 return status;
163 }
164
paused(void) const165 bool PartProcessor::paused(void) const {
166 return paused_;
167 }
168
modelIndex(void) const169 QModelIndex& PartProcessor::modelIndex(void) const {
170 return modelIndex_;
171 }
172
progress(void) const173 qint32 PartProcessor::progress(void) const {
174 return progress_;
175 }
176
originalFileSize(void) const177 qint32 PartProcessor::originalFileSize(void) const {
178 return originalFileSize_;
179 }
180
partIndex(void) const181 int PartProcessor::partIndex(void) const {
182 return partIndex_;
183 }
184
errors(void) const185 QList<Error*> PartProcessor::errors(void) const {
186 return errors_;
187 }
188
pause(void)189 void PartProcessor::pause(void) {
190 paused_ = true;
191 }
192
cancel(void)193 void PartProcessor::cancel(void) {
194 cancelled_ = true;
195 }
196
resume()197 void PartProcessor::resume() {
198 paused_ = false;
199 }
200
run(void)201 void PartProcessor::run(void) {
202 if (!filesOpened) {
203 LOG(ERROR) << "Files were not successfully opened. Cannot start " << (processType == kSplit ? "splitting" : "merging");
204 return;
205 }
206 emit started(this);
207 this->resume();
208 int status = -1;
209 if (processType == kSplit) {
210 if (this->sourceFilename_ == "") {
211 LOG(ERROR) << "Source file not specified";
212 return;
213 }
214 status = split();
215 } else if (processType == kMerge) {
216 if (this->parts_.size() == 0) {
217 LOG(ERROR) << "Parts not specified";
218 return;
219 }
220 status = merge();
221 }
222 if (status == -1) {
223 LOG(ERROR) << "Error occurred while " << (processType == kSplit ? "splitting" : "merging");
224 }
225 }
226