1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "BaseDocWriter.h"
23
24 #include <U2Core/AppContext.h>
25 #include <U2Core/DeleteObjectsTask.h>
26 #include <U2Core/DocumentModel.h>
27 #include <U2Core/DocumentUtils.h>
28 #include <U2Core/FailTask.h>
29 #include <U2Core/GHints.h>
30 #include <U2Core/GUrlUtils.h>
31 #include <U2Core/IOAdapter.h>
32 #include <U2Core/IOAdapterUtils.h>
33 #include <U2Core/ImportObjectToDatabaseTask.h>
34 #include <U2Core/Log.h>
35 #include <U2Core/MultiTask.h>
36 #include <U2Core/ProjectModel.h>
37 #include <U2Core/TaskSignalMapper.h>
38 #include <U2Core/U2DbiRegistry.h>
39 #include <U2Core/U2OpStatusUtils.h>
40 #include <U2Core/U2SafePoints.h>
41
42 #include <U2Gui/DialogUtils.h>
43
44 #include <U2Lang/BaseAttributes.h>
45 #include <U2Lang/BaseSlots.h>
46 #include <U2Lang/CoreLibConstants.h>
47 #include <U2Lang/SharedDbUrlUtils.h>
48 #include <U2Lang/WorkflowEnv.h>
49 #include <U2Lang/WorkflowMonitor.h>
50 #include <U2Lang/WorkflowUtils.h>
51
52 #include "CoreLib.h"
53
54 namespace U2 {
55 namespace LocalWorkflow {
56
57 /**********************************
58 * BaseDocWriter
59 **********************************/
BaseDocWriter(Actor * a,const DocumentFormatId & fid)60 BaseDocWriter::BaseDocWriter(Actor *a, const DocumentFormatId &fid)
61 : BaseWorker(a), format(nullptr), dataStorage(LocalFs), ch(nullptr), append(true), fileMode(SaveDoc_Roll), objectsReceived(false) {
62 format = AppContext::getDocumentFormatRegistry()->getFormatById(fid);
63 }
64
BaseDocWriter(Actor * a)65 BaseDocWriter::BaseDocWriter(Actor *a)
66 : BaseWorker(a), format(nullptr), dataStorage(LocalFs), ch(nullptr), append(true), fileMode(SaveDoc_Roll), objectsReceived(false) {
67 }
68
cleanup()69 void BaseDocWriter::cleanup() {
70 foreach (IOAdapter *io, adapters.values()) {
71 if (io->isOpen()) {
72 io->close();
73 }
74 }
75 }
76
init()77 void BaseDocWriter::init() {
78 SAFE_POINT(ports.size() == 1, "Unexpected port count", );
79 ch = ports.values().first();
80 }
81
takeParameters(U2OpStatus & os)82 void BaseDocWriter::takeParameters(U2OpStatus &os) {
83 Attribute *dataStorageAttr = actor->getParameter(BaseAttributes::DATA_STORAGE_ATTRIBUTE().getId());
84
85 const QString storage = (nullptr == dataStorageAttr) ? BaseAttributes::LOCAL_FS_DATA_STORAGE() : dataStorageAttr->getAttributeValue<QString>(context);
86 if (BaseAttributes::LOCAL_FS_DATA_STORAGE() == storage) {
87 dataStorage = LocalFs;
88
89 Attribute *formatAttr = actor->getParameter(BaseAttributes::DOCUMENT_FORMAT_ATTRIBUTE().getId());
90 if (nullptr != formatAttr) { // user sets format
91 QString formatId = formatAttr->getAttributeValue<QString>(context);
92 format = AppContext::getDocumentFormatRegistry()->getFormatById(formatId);
93 }
94 if (nullptr == format) {
95 os.setError(tr("Document format not set"));
96 return;
97 }
98
99 fileMode = getValue<uint>(BaseAttributes::FILE_MODE_ATTRIBUTE().getId());
100 Attribute *a = actor->getParameter(BaseAttributes::ACCUMULATE_OBJS_ATTRIBUTE().getId());
101 if (a != nullptr) {
102 append = a->getAttributeValue<bool>(context);
103 } else {
104 append = true;
105 }
106 } else if (BaseAttributes::SHARED_DB_DATA_STORAGE() == storage) {
107 dataStorage = SharedDb;
108
109 const QString fullDbUrl = getValue<QString>(BaseAttributes::DATABASE_ATTRIBUTE().getId());
110 dstDbiRef = SharedDbUrlUtils::getDbRefFromEntityUrl(fullDbUrl);
111 CHECK_EXT(dstDbiRef.isValid(), os.setError(tr("Invalid database reference")), );
112
113 dstPathInDb = getValue<QString>(BaseAttributes::DB_PATH().getId());
114 CHECK_EXT(!dstPathInDb.isEmpty(), os.setError(tr("Empty destination path supplied")), );
115 } else {
116 os.setError(tr("Unexpected data storage attribute value"));
117 }
118 }
119
120 namespace {
toFileName(const QString & base,const QString & suffix,const QString & ext)121 QString toFileName(const QString &base, const QString &suffix, const QString &ext) {
122 QString result = base + suffix;
123 if (!ext.isEmpty()) {
124 result += "." + ext;
125 }
126 return result;
127 }
128 } // namespace
129
getDefaultFileName() const130 QString BaseDocWriter::getDefaultFileName() const {
131 return actor->getId() + "_output";
132 }
133
ifGroupByDatasets() const134 bool BaseDocWriter::ifGroupByDatasets() const {
135 Attribute *a = actor->getParameter(BaseAttributes::ACCUMULATE_OBJS_ATTRIBUTE().getId());
136 if (nullptr == a) {
137 return false;
138 }
139 return a->getAttributeValue<bool>(context);
140 }
141
getSuffix() const142 QString BaseDocWriter::getSuffix() const {
143 Attribute *a = actor->getParameter(BaseAttributes::URL_SUFFIX().getId());
144 if (nullptr == a) {
145 return "";
146 }
147 return a->getAttributeValue<QString>(context);
148 }
149
getExtension() const150 QString BaseDocWriter::getExtension() const {
151 CHECK(nullptr != format, "");
152 QStringList exts = format->getSupportedDocumentFileExtensions();
153 CHECK(!exts.isEmpty(), "");
154 return exts.first();
155 }
156
/**
 * Chooses the output base name for a message.
 * - grouping by datasets: the dataset name (or @defaultName if it is empty);
 * - otherwise: the base name of the source file URL, then the database id, then @defaultName.
 */
QString BaseDocWriter::getBaseName(const MessageMetadata &metadata, bool groupByDatasets, const QString &defaultName) {
    if (groupByDatasets) {
        const QString datasetName = metadata.getDatasetName();
        return datasetName.isEmpty() ? defaultName : datasetName;
    }

    const QString fileUrl = metadata.getFileUrl();
    if (!fileUrl.isEmpty()) {
        return QFileInfo(fileUrl).baseName();
    }

    const QString databaseId = metadata.getDatabaseId();
    if (!databaseId.isEmpty()) {
        return databaseId;
    }
    return defaultName;
}
171
generateUrl(int metadataId) const172 QString BaseDocWriter::generateUrl(int metadataId) const {
173 MessageMetadata metadata = context->getMetadataStorage().get(metadataId);
174 return generateUrl(metadata, ifGroupByDatasets(), getSuffix(), getExtension(), getDefaultFileName());
175 }
176
/** Builds "<base name><suffix>.<ext>" where the base name is chosen by getBaseName(). */
QString BaseDocWriter::generateUrl(const MessageMetadata &metadata, bool groupByDatasets, const QString &suffix, const QString &ext, const QString &defaultName) {
    return toFileName(getBaseName(metadata, groupByDatasets, defaultName), suffix, ext);
}
181
takeUrlList(const QVariantMap & data,int metadataId,U2OpStatus & os)182 QStringList BaseDocWriter::takeUrlList(const QVariantMap &data, int metadataId, U2OpStatus &os) {
183 QString url = getValue<QString>(BaseAttributes::URL_OUT_ATTRIBUTE().getId());
184 if (url.isEmpty()) {
185 url = data.value(BaseSlots::URL_SLOT().getId()).toString();
186 }
187 if (url.isEmpty()) {
188 url = generateUrl(metadataId);
189 }
190 if (url.isEmpty()) {
191 QString err = tr("Unspecified URL to write %1").arg(format->getFormatName());
192 os.setError(err);
193 return QStringList();
194 }
195
196 QStringList result;
197 result << context->absolutePath(url);
198 return result;
199 }
200
isSupportedSeveralMessages() const201 bool BaseDocWriter::isSupportedSeveralMessages() const {
202 // if the format can contain only one object then adapters must be created for each message
203 if (format->checkFlags(DocumentFormatFlag_SingleObjectFormat)) {
204 return false;
205 }
206 if (format->checkFlags(DocumentFormatFlag_OnlyOneObject)) {
207 return false;
208 }
209 return true;
210 }
211
ifCreateAdapter(const QString & url) const212 bool BaseDocWriter::ifCreateAdapter(const QString &url) const {
213 if (!isSupportedSeveralMessages()) {
214 return true;
215 }
216
217 // if not accumulate object in one file
218 if (!append) {
219 return true;
220 }
221
222 return (!adapters.contains(url));
223 }
224
openAdapter(IOAdapter * io,const QString & aUrl,const SaveDocFlags & flags,U2OpStatus & os)225 void BaseDocWriter::openAdapter(IOAdapter *io, const QString &aUrl, const SaveDocFlags &flags, U2OpStatus &os) {
226 { // prepare dir
227 QFileInfo info(aUrl);
228 if (!info.dir().exists()) {
229 bool created = info.dir().mkpath(info.dir().absolutePath());
230 if (!created) {
231 os.setError(tr("Can not create folder: %1").arg(info.dir().absolutePath()));
232 }
233 }
234 }
235
236 // Generate a target URL from the source URL.
237 QString url = aUrl;
238 int suffix = 0;
239 do {
240 if (suffix == 0 && counters.contains(aUrl)) {
241 suffix = counters[aUrl];
242 }
243 if (suffix > 0) {
244 url = GUrlUtils::insertSuffix(aUrl, "_" + QString::number(suffix));
245 }
246 suffix++;
247 } while (monitor()->containsOutputFile(url));
248
249 if (flags.testFlag(SaveDoc_Roll)) {
250 TaskStateInfo ti;
251 if (!GUrlUtils::renameFileWithNameRoll(url, ti, usedUrls)) {
252 os.setError(ti.getError());
253 return;
254 }
255 }
256 IOAdapterMode mode = flags.testFlag(SaveDoc_Append) ? IOAdapterMode_Append : IOAdapterMode_Write;
257 bool opened = io->open(url, mode);
258 if (!opened) {
259 os.setError(tr("Can not open a file for writing: %1").arg(url));
260 }
261
262 counters[aUrl] = suffix;
263 }
264
getAdapter(const QString & url,U2OpStatus & os)265 IOAdapter *BaseDocWriter::getAdapter(const QString &url, U2OpStatus &os) {
266 if (!ifCreateAdapter(url)) {
267 return adapters[url];
268 }
269
270 IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(IOAdapterUtils::url2io(url));
271 QScopedPointer<IOAdapter> io(iof->createIOAdapter());
272 openAdapter(io.data(), url, SaveDocFlags(fileMode), os);
273 CHECK_OP(os, nullptr);
274
275 QString resultUrl = io->getURL().getURLString();
276 if (!adapters.contains(url)) {
277 adapters[url] = io.data();
278 }
279 if (!adapters.contains(resultUrl)) {
280 adapters[resultUrl] = io.data();
281 }
282 usedUrls << resultUrl;
283 monitor()->addOutputFile(resultUrl, getActorId());
284
285 return io.take();
286 }
287
getDocument(IOAdapter * io,U2OpStatus & os)288 Document *BaseDocWriter::getDocument(IOAdapter *io, U2OpStatus &os) {
289 if (docs.contains(io)) {
290 return docs[io];
291 }
292
293 QVariantMap hints;
294 U2DbiRef dbiRef = context->getDataStorage()->getDbiRef();
295 hints.insert(DocumentFormat::DBI_REF_HINT, qVariantFromValue(dbiRef));
296 Document *doc = format->createNewLoadedDocument(io->getFactory(), io->getURL(), os, hints);
297 CHECK_OP(os, nullptr);
298
299 doc->setDocumentOwnsDbiResources(false);
300 docs[io] = doc;
301 return doc;
302 }
303
isStreamingSupport() const304 bool BaseDocWriter::isStreamingSupport() const {
305 return format->isStreamingSupport();
306 }
307
storeData(const QStringList & urls,const QVariantMap & data,U2OpStatus & os)308 void BaseDocWriter::storeData(const QStringList &urls, const QVariantMap &data, U2OpStatus &os) {
309 foreach (const QString &anUrl, urls) {
310 IOAdapter *io = getAdapter(anUrl, os);
311 CHECK_OP(os, );
312 if (isStreamingSupport()) {
313 // TODO: make it in separate thread!
314 storeEntry(io, data, ch->takenMessages());
315 } else {
316 Document *doc = getDocument(io, os);
317 CHECK_OP(os, );
318 data2doc(doc, data);
319 }
320 }
321 }
322
// Reports the error held by `status` (if any) and skips to the next iteration of the
// enclosing loop. Intentionally NOT wrapped in do { ... } while (0): a `continue` inside
// such a wrapper would apply to the wrapper itself instead of the caller's loop.
#define CHECK_OS(status) \
    if ((status).hasError()) { \
        reportError((status).getError()); \
        continue; \
    }
328
tick()329 Task *BaseDocWriter::tick() {
330 U2OpStatusImpl os;
331 while (ch->hasMessage()) {
332 const Message inputMessage = getMessageAndSetupScriptValues(ch);
333 takeParameters(os);
334 CHECK_OS(os);
335
336 const QVariantMap data = inputMessage.getData().toMap();
337 if (!hasDataToWrite(data)) {
338 reportError(tr("No data to write"));
339 continue;
340 }
341
342 if (dataStorage == LocalFs) {
343 const QStringList urls = takeUrlList(data, inputMessage.getMetadataId(), os);
344 CHECK_OS(os);
345 storeData(urls, data, os);
346 CHECK_OS(os);
347
348 if (!append) {
349 break;
350 }
351 } else if (dataStorage == SharedDb) {
352 Task *result = createWriteToSharedDbTask(data);
353 if (result == nullptr) {
354 continue;
355 } else {
356 return result;
357 }
358 } else {
359 reportError(tr("Unexpected data storage attribute value"));
360 }
361 }
362
363 bool done = ch->isEnded() && !ch->hasMessage();
364 if (append && !done) {
365 return nullptr;
366 }
367 if (done) {
368 setDone();
369 }
370 if (dataStorage == SharedDb && !objectsReceived) {
371 reportNoDataReceivedWarning();
372 }
373 return dataStorage == LocalFs ? processDocs() : nullptr;
374 }
375
reportNoDataReceivedWarning()376 void BaseDocWriter::reportNoDataReceivedWarning() {
377 monitor()->addError(tr("Nothing to write"), getActorId(), WorkflowNotification::U2_WARNING);
378 }
379
getObjectsToWriteBaseImpl(const QVariantMap & data) const380 QSet<GObject *> BaseDocWriter::getObjectsToWriteBaseImpl(const QVariantMap &data) const {
381 QSet<GObject *> result = getObjectsToWrite(data);
382 result.remove(nullptr); // eliminate invalid objects
383 return result;
384 }
385
createWriteToSharedDbTask(const QVariantMap & data)386 Task *BaseDocWriter::createWriteToSharedDbTask(const QVariantMap &data) {
387 QList<Task *> tasks;
388 foreach (GObject *obj, getObjectsToWriteBaseImpl(data)) {
389 if (nullptr == obj) {
390 reportError(tr("Unable to fetch data from a message"));
391 continue;
392 }
393 Task *importTask = new ImportObjectToDatabaseTask(obj, dstDbiRef, dstPathInDb);
394 connect(new TaskSignalMapper(importTask), SIGNAL(si_taskFinished(Task *)), SLOT(sl_objectImported(Task *)));
395 tasks.append(importTask);
396 }
397 if (tasks.isEmpty()) {
398 return nullptr;
399 } else {
400 objectsReceived = true;
401 }
402 Task *resultTask = tasks.size() == 1 ? tasks.first() : new MultiTask(tr("Save objects to a shared database"), tasks);
403 return resultTask;
404 }
405
sl_objectImported(Task * importTask)406 void BaseDocWriter::sl_objectImported(Task *importTask) {
407 ImportObjectToDatabaseTask *realTask = qobject_cast<ImportObjectToDatabaseTask *>(importTask);
408 SAFE_POINT(nullptr != realTask, "Invalid task detected", );
409 delete realTask->getSourceObject();
410 }
411
processDocs()412 Task *BaseDocWriter::processDocs() {
413 if (adapters.isEmpty()) {
414 reportNoDataReceivedWarning();
415 }
416 if (docs.isEmpty()) {
417 return nullptr;
418 }
419 QList<Task *> tlist;
420 foreach (IOAdapter *io, docs.keys()) {
421 Document *doc = docs[io];
422 ioLog.details(tr("Writing to %1 [%2]").arg(io->getURL().getURLString()).arg(format->getFormatName()));
423 io->close();
424 GHints *hints = doc->getGHints();
425 hints->set(DocumentRemovalMode_Synchronous, QString());
426 tlist << getWriteDocTask(doc, getDocFlags());
427 }
428 docs.clear();
429
430 return tlist.size() == 1 ? tlist.first() : new MultiTask(tr("Save documents"), tlist);
431 }
432
getWriteDocTask(Document * doc,const SaveDocFlags & flags)433 Task *BaseDocWriter::getWriteDocTask(Document *doc, const SaveDocFlags &flags) {
434 return new SaveDocumentTask(doc, flags, DocumentUtils::getNewDocFileNameExcludesHint());
435 }
436
getDocFlags() const437 SaveDocFlags BaseDocWriter::getDocFlags() const {
438 SaveDocFlags flags(fileMode);
439 flags |= SaveDoc_DestroyAfter;
440 if (flags.testFlag(SaveDoc_Roll)) {
441 flags ^= SaveDoc_Roll;
442 }
443 return flags;
444 }
445
/**
 * Returns @name if no object in @doc has that name, otherwise the first free
 * "<name>_N" variant (N = 1, 2, ...).
 */
QString BaseDocWriter::getUniqueObjectName(const Document *doc, const QString &name) {
    QString candidate = name;
    for (int index = 1; doc->findGObjectByName(candidate) != nullptr; ++index) {
        candidate = name + QString("_%1").arg(index);
    }
    return candidate;
}
458
459 } // namespace LocalWorkflow
460 } // namespace U2
461