1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "BaseDocWriter.h"
23 
24 #include <U2Core/AppContext.h>
25 #include <U2Core/DeleteObjectsTask.h>
26 #include <U2Core/DocumentModel.h>
27 #include <U2Core/DocumentUtils.h>
28 #include <U2Core/FailTask.h>
29 #include <U2Core/GHints.h>
30 #include <U2Core/GUrlUtils.h>
31 #include <U2Core/IOAdapter.h>
32 #include <U2Core/IOAdapterUtils.h>
33 #include <U2Core/ImportObjectToDatabaseTask.h>
34 #include <U2Core/Log.h>
35 #include <U2Core/MultiTask.h>
36 #include <U2Core/ProjectModel.h>
37 #include <U2Core/TaskSignalMapper.h>
38 #include <U2Core/U2DbiRegistry.h>
39 #include <U2Core/U2OpStatusUtils.h>
40 #include <U2Core/U2SafePoints.h>
41 
42 #include <U2Gui/DialogUtils.h>
43 
44 #include <U2Lang/BaseAttributes.h>
45 #include <U2Lang/BaseSlots.h>
46 #include <U2Lang/CoreLibConstants.h>
47 #include <U2Lang/SharedDbUrlUtils.h>
48 #include <U2Lang/WorkflowEnv.h>
49 #include <U2Lang/WorkflowMonitor.h>
50 #include <U2Lang/WorkflowUtils.h>
51 
52 #include "CoreLib.h"
53 
54 namespace U2 {
55 namespace LocalWorkflow {
56 
57 /**********************************
58  * BaseDocWriter
59  **********************************/
BaseDocWriter(Actor * a,const DocumentFormatId & fid)60 BaseDocWriter::BaseDocWriter(Actor *a, const DocumentFormatId &fid)
61     : BaseWorker(a), format(nullptr), dataStorage(LocalFs), ch(nullptr), append(true), fileMode(SaveDoc_Roll), objectsReceived(false) {
62     format = AppContext::getDocumentFormatRegistry()->getFormatById(fid);
63 }
64 
BaseDocWriter(Actor * a)65 BaseDocWriter::BaseDocWriter(Actor *a)
66     : BaseWorker(a), format(nullptr), dataStorage(LocalFs), ch(nullptr), append(true), fileMode(SaveDoc_Roll), objectsReceived(false) {
67 }
68 
cleanup()69 void BaseDocWriter::cleanup() {
70     foreach (IOAdapter *io, adapters.values()) {
71         if (io->isOpen()) {
72             io->close();
73         }
74     }
75 }
76 
init()77 void BaseDocWriter::init() {
78     SAFE_POINT(ports.size() == 1, "Unexpected port count", );
79     ch = ports.values().first();
80 }
81 
takeParameters(U2OpStatus & os)82 void BaseDocWriter::takeParameters(U2OpStatus &os) {
83     Attribute *dataStorageAttr = actor->getParameter(BaseAttributes::DATA_STORAGE_ATTRIBUTE().getId());
84 
85     const QString storage = (nullptr == dataStorageAttr) ? BaseAttributes::LOCAL_FS_DATA_STORAGE() : dataStorageAttr->getAttributeValue<QString>(context);
86     if (BaseAttributes::LOCAL_FS_DATA_STORAGE() == storage) {
87         dataStorage = LocalFs;
88 
89         Attribute *formatAttr = actor->getParameter(BaseAttributes::DOCUMENT_FORMAT_ATTRIBUTE().getId());
90         if (nullptr != formatAttr) {  // user sets format
91             QString formatId = formatAttr->getAttributeValue<QString>(context);
92             format = AppContext::getDocumentFormatRegistry()->getFormatById(formatId);
93         }
94         if (nullptr == format) {
95             os.setError(tr("Document format not set"));
96             return;
97         }
98 
99         fileMode = getValue<uint>(BaseAttributes::FILE_MODE_ATTRIBUTE().getId());
100         Attribute *a = actor->getParameter(BaseAttributes::ACCUMULATE_OBJS_ATTRIBUTE().getId());
101         if (a != nullptr) {
102             append = a->getAttributeValue<bool>(context);
103         } else {
104             append = true;
105         }
106     } else if (BaseAttributes::SHARED_DB_DATA_STORAGE() == storage) {
107         dataStorage = SharedDb;
108 
109         const QString fullDbUrl = getValue<QString>(BaseAttributes::DATABASE_ATTRIBUTE().getId());
110         dstDbiRef = SharedDbUrlUtils::getDbRefFromEntityUrl(fullDbUrl);
111         CHECK_EXT(dstDbiRef.isValid(), os.setError(tr("Invalid database reference")), );
112 
113         dstPathInDb = getValue<QString>(BaseAttributes::DB_PATH().getId());
114         CHECK_EXT(!dstPathInDb.isEmpty(), os.setError(tr("Empty destination path supplied")), );
115     } else {
116         os.setError(tr("Unexpected data storage attribute value"));
117     }
118 }
119 
120 namespace {
/** Builds "<base><suffix>" and appends ".<ext>" only when @p ext is not empty. */
QString toFileName(const QString &base, const QString &suffix, const QString &ext) {
    if (ext.isEmpty()) {
        return base + suffix;
    }
    return base + suffix + "." + ext;
}
128 }  // namespace
129 
getDefaultFileName() const130 QString BaseDocWriter::getDefaultFileName() const {
131     return actor->getId() + "_output";
132 }
133 
ifGroupByDatasets() const134 bool BaseDocWriter::ifGroupByDatasets() const {
135     Attribute *a = actor->getParameter(BaseAttributes::ACCUMULATE_OBJS_ATTRIBUTE().getId());
136     if (nullptr == a) {
137         return false;
138     }
139     return a->getAttributeValue<bool>(context);
140 }
141 
getSuffix() const142 QString BaseDocWriter::getSuffix() const {
143     Attribute *a = actor->getParameter(BaseAttributes::URL_SUFFIX().getId());
144     if (nullptr == a) {
145         return "";
146     }
147     return a->getAttributeValue<QString>(context);
148 }
149 
getExtension() const150 QString BaseDocWriter::getExtension() const {
151     CHECK(nullptr != format, "");
152     QStringList exts = format->getSupportedDocumentFileExtensions();
153     CHECK(!exts.isEmpty(), "");
154     return exts.first();
155 }
156 
/**
 * Picks the base name of an output file.
 *
 * When grouping by datasets the dataset name is used. Otherwise the base name of the source
 * file URL is used, then the database id. @p defaultName is the fallback in every case where
 * the preferred value is empty.
 */
QString BaseDocWriter::getBaseName(const MessageMetadata &metadata, bool groupByDatasets, const QString &defaultName) {
    if (groupByDatasets) {
        const QString datasetName = metadata.getDatasetName();
        return datasetName.isEmpty() ? defaultName : datasetName;
    }

    const QString sourceFileUrl = metadata.getFileUrl();
    if (!sourceFileUrl.isEmpty()) {
        return QFileInfo(sourceFileUrl).baseName();
    }

    const QString databaseId = metadata.getDatabaseId();
    if (!databaseId.isEmpty()) {
        return databaseId;
    }

    return defaultName;
}
171 
generateUrl(int metadataId) const172 QString BaseDocWriter::generateUrl(int metadataId) const {
173     MessageMetadata metadata = context->getMetadataStorage().get(metadataId);
174     return generateUrl(metadata, ifGroupByDatasets(), getSuffix(), getExtension(), getDefaultFileName());
175 }
176 
/** Composes a file name from the base name chosen by getBaseName(), @p suffix and @p ext. */
QString BaseDocWriter::generateUrl(const MessageMetadata &metadata, bool groupByDatasets, const QString &suffix, const QString &ext, const QString &defaultName) {
    return toFileName(getBaseName(metadata, groupByDatasets, defaultName), suffix, ext);
}
181 
takeUrlList(const QVariantMap & data,int metadataId,U2OpStatus & os)182 QStringList BaseDocWriter::takeUrlList(const QVariantMap &data, int metadataId, U2OpStatus &os) {
183     QString url = getValue<QString>(BaseAttributes::URL_OUT_ATTRIBUTE().getId());
184     if (url.isEmpty()) {
185         url = data.value(BaseSlots::URL_SLOT().getId()).toString();
186     }
187     if (url.isEmpty()) {
188         url = generateUrl(metadataId);
189     }
190     if (url.isEmpty()) {
191         QString err = tr("Unspecified URL to write %1").arg(format->getFormatName());
192         os.setError(err);
193         return QStringList();
194     }
195 
196     QStringList result;
197     result << context->absolutePath(url);
198     return result;
199 }
200 
isSupportedSeveralMessages() const201 bool BaseDocWriter::isSupportedSeveralMessages() const {
202     // if the format can contain only one object then adapters must be created for each message
203     if (format->checkFlags(DocumentFormatFlag_SingleObjectFormat)) {
204         return false;
205     }
206     if (format->checkFlags(DocumentFormatFlag_OnlyOneObject)) {
207         return false;
208     }
209     return true;
210 }
211 
ifCreateAdapter(const QString & url) const212 bool BaseDocWriter::ifCreateAdapter(const QString &url) const {
213     if (!isSupportedSeveralMessages()) {
214         return true;
215     }
216 
217     // if not accumulate object in one file
218     if (!append) {
219         return true;
220     }
221 
222     return (!adapters.contains(url));
223 }
224 
openAdapter(IOAdapter * io,const QString & aUrl,const SaveDocFlags & flags,U2OpStatus & os)225 void BaseDocWriter::openAdapter(IOAdapter *io, const QString &aUrl, const SaveDocFlags &flags, U2OpStatus &os) {
226     {  // prepare dir
227         QFileInfo info(aUrl);
228         if (!info.dir().exists()) {
229             bool created = info.dir().mkpath(info.dir().absolutePath());
230             if (!created) {
231                 os.setError(tr("Can not create folder: %1").arg(info.dir().absolutePath()));
232             }
233         }
234     }
235 
236     // Generate a target URL from the source URL.
237     QString url = aUrl;
238     int suffix = 0;
239     do {
240         if (suffix == 0 && counters.contains(aUrl)) {
241             suffix = counters[aUrl];
242         }
243         if (suffix > 0) {
244             url = GUrlUtils::insertSuffix(aUrl, "_" + QString::number(suffix));
245         }
246         suffix++;
247     } while (monitor()->containsOutputFile(url));
248 
249     if (flags.testFlag(SaveDoc_Roll)) {
250         TaskStateInfo ti;
251         if (!GUrlUtils::renameFileWithNameRoll(url, ti, usedUrls)) {
252             os.setError(ti.getError());
253             return;
254         }
255     }
256     IOAdapterMode mode = flags.testFlag(SaveDoc_Append) ? IOAdapterMode_Append : IOAdapterMode_Write;
257     bool opened = io->open(url, mode);
258     if (!opened) {
259         os.setError(tr("Can not open a file for writing: %1").arg(url));
260     }
261 
262     counters[aUrl] = suffix;
263 }
264 
getAdapter(const QString & url,U2OpStatus & os)265 IOAdapter *BaseDocWriter::getAdapter(const QString &url, U2OpStatus &os) {
266     if (!ifCreateAdapter(url)) {
267         return adapters[url];
268     }
269 
270     IOAdapterFactory *iof = AppContext::getIOAdapterRegistry()->getIOAdapterFactoryById(IOAdapterUtils::url2io(url));
271     QScopedPointer<IOAdapter> io(iof->createIOAdapter());
272     openAdapter(io.data(), url, SaveDocFlags(fileMode), os);
273     CHECK_OP(os, nullptr);
274 
275     QString resultUrl = io->getURL().getURLString();
276     if (!adapters.contains(url)) {
277         adapters[url] = io.data();
278     }
279     if (!adapters.contains(resultUrl)) {
280         adapters[resultUrl] = io.data();
281     }
282     usedUrls << resultUrl;
283     monitor()->addOutputFile(resultUrl, getActorId());
284 
285     return io.take();
286 }
287 
getDocument(IOAdapter * io,U2OpStatus & os)288 Document *BaseDocWriter::getDocument(IOAdapter *io, U2OpStatus &os) {
289     if (docs.contains(io)) {
290         return docs[io];
291     }
292 
293     QVariantMap hints;
294     U2DbiRef dbiRef = context->getDataStorage()->getDbiRef();
295     hints.insert(DocumentFormat::DBI_REF_HINT, qVariantFromValue(dbiRef));
296     Document *doc = format->createNewLoadedDocument(io->getFactory(), io->getURL(), os, hints);
297     CHECK_OP(os, nullptr);
298 
299     doc->setDocumentOwnsDbiResources(false);
300     docs[io] = doc;
301     return doc;
302 }
303 
isStreamingSupport() const304 bool BaseDocWriter::isStreamingSupport() const {
305     return format->isStreamingSupport();
306 }
307 
storeData(const QStringList & urls,const QVariantMap & data,U2OpStatus & os)308 void BaseDocWriter::storeData(const QStringList &urls, const QVariantMap &data, U2OpStatus &os) {
309     foreach (const QString &anUrl, urls) {
310         IOAdapter *io = getAdapter(anUrl, os);
311         CHECK_OP(os, );
312         if (isStreamingSupport()) {
313             // TODO: make it in separate thread!
314             storeEntry(io, data, ch->takenMessages());
315         } else {
316             Document *doc = getDocument(io, os);
317             CHECK_OP(os, );
318             data2doc(doc, data);
319         }
320     }
321 }
322 
// Loop-only helper: when 'os' holds an error, the error text is passed to reportError() and
// the enclosing loop moves on to its next iteration via 'continue'.
// The macro expands to a bare 'if' block, so use it as a standalone statement inside a loop body.
// It does not clear the error stored in 'os'.
#define CHECK_OS(os) \
    if (os.hasError()) { \
        reportError(os.getError()); \
        continue; \
    }
328 
tick()329 Task *BaseDocWriter::tick() {
330     U2OpStatusImpl os;
331     while (ch->hasMessage()) {
332         const Message inputMessage = getMessageAndSetupScriptValues(ch);
333         takeParameters(os);
334         CHECK_OS(os);
335 
336         const QVariantMap data = inputMessage.getData().toMap();
337         if (!hasDataToWrite(data)) {
338             reportError(tr("No data to write"));
339             continue;
340         }
341 
342         if (dataStorage == LocalFs) {
343             const QStringList urls = takeUrlList(data, inputMessage.getMetadataId(), os);
344             CHECK_OS(os);
345             storeData(urls, data, os);
346             CHECK_OS(os);
347 
348             if (!append) {
349                 break;
350             }
351         } else if (dataStorage == SharedDb) {
352             Task *result = createWriteToSharedDbTask(data);
353             if (result == nullptr) {
354                 continue;
355             } else {
356                 return result;
357             }
358         } else {
359             reportError(tr("Unexpected data storage attribute value"));
360         }
361     }
362 
363     bool done = ch->isEnded() && !ch->hasMessage();
364     if (append && !done) {
365         return nullptr;
366     }
367     if (done) {
368         setDone();
369     }
370     if (dataStorage == SharedDb && !objectsReceived) {
371         reportNoDataReceivedWarning();
372     }
373     return dataStorage == LocalFs ? processDocs() : nullptr;
374 }
375 
reportNoDataReceivedWarning()376 void BaseDocWriter::reportNoDataReceivedWarning() {
377     monitor()->addError(tr("Nothing to write"), getActorId(), WorkflowNotification::U2_WARNING);
378 }
379 
getObjectsToWriteBaseImpl(const QVariantMap & data) const380 QSet<GObject *> BaseDocWriter::getObjectsToWriteBaseImpl(const QVariantMap &data) const {
381     QSet<GObject *> result = getObjectsToWrite(data);
382     result.remove(nullptr);  // eliminate invalid objects
383     return result;
384 }
385 
createWriteToSharedDbTask(const QVariantMap & data)386 Task *BaseDocWriter::createWriteToSharedDbTask(const QVariantMap &data) {
387     QList<Task *> tasks;
388     foreach (GObject *obj, getObjectsToWriteBaseImpl(data)) {
389         if (nullptr == obj) {
390             reportError(tr("Unable to fetch data from a message"));
391             continue;
392         }
393         Task *importTask = new ImportObjectToDatabaseTask(obj, dstDbiRef, dstPathInDb);
394         connect(new TaskSignalMapper(importTask), SIGNAL(si_taskFinished(Task *)), SLOT(sl_objectImported(Task *)));
395         tasks.append(importTask);
396     }
397     if (tasks.isEmpty()) {
398         return nullptr;
399     } else {
400         objectsReceived = true;
401     }
402     Task *resultTask = tasks.size() == 1 ? tasks.first() : new MultiTask(tr("Save objects to a shared database"), tasks);
403     return resultTask;
404 }
405 
sl_objectImported(Task * importTask)406 void BaseDocWriter::sl_objectImported(Task *importTask) {
407     ImportObjectToDatabaseTask *realTask = qobject_cast<ImportObjectToDatabaseTask *>(importTask);
408     SAFE_POINT(nullptr != realTask, "Invalid task detected", );
409     delete realTask->getSourceObject();
410 }
411 
processDocs()412 Task *BaseDocWriter::processDocs() {
413     if (adapters.isEmpty()) {
414         reportNoDataReceivedWarning();
415     }
416     if (docs.isEmpty()) {
417         return nullptr;
418     }
419     QList<Task *> tlist;
420     foreach (IOAdapter *io, docs.keys()) {
421         Document *doc = docs[io];
422         ioLog.details(tr("Writing to %1 [%2]").arg(io->getURL().getURLString()).arg(format->getFormatName()));
423         io->close();
424         GHints *hints = doc->getGHints();
425         hints->set(DocumentRemovalMode_Synchronous, QString());
426         tlist << getWriteDocTask(doc, getDocFlags());
427     }
428     docs.clear();
429 
430     return tlist.size() == 1 ? tlist.first() : new MultiTask(tr("Save documents"), tlist);
431 }
432 
getWriteDocTask(Document * doc,const SaveDocFlags & flags)433 Task *BaseDocWriter::getWriteDocTask(Document *doc, const SaveDocFlags &flags) {
434     return new SaveDocumentTask(doc, flags, DocumentUtils::getNewDocFileNameExcludesHint());
435 }
436 
getDocFlags() const437 SaveDocFlags BaseDocWriter::getDocFlags() const {
438     SaveDocFlags flags(fileMode);
439     flags |= SaveDoc_DestroyAfter;
440     if (flags.testFlag(SaveDoc_Roll)) {
441         flags ^= SaveDoc_Roll;
442     }
443     return flags;
444 }
445 
/**
 * Returns an object name that is not used in @p doc yet.
 * @p name is returned unchanged when it is free; otherwise "<name>_1", "<name>_2", ...
 * are tried in turn until a free one is found.
 */
QString BaseDocWriter::getUniqueObjectName(const Document *doc, const QString &name) {
    QString candidate = name;
    for (int index = 1; doc->findGObjectByName(candidate) != nullptr; ++index) {
        candidate = name + QString("_%1").arg(index);
    }
    return candidate;
}
458 
459 }  // namespace LocalWorkflow
460 }  // namespace U2
461