1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "LocalDomain.h"
23 
24 #include <U2Core/AppContext.h>
25 #include <U2Core/CMDLineRegistry.h>
26 #include <U2Core/CMDLineUtils.h>
27 #include <U2Core/Log.h>
28 #include <U2Core/U2SafePoints.h>
29 
30 #include <U2Lang/ActorModel.h>
31 #include <U2Lang/BaseAttributes.h>
32 #include <U2Lang/IntegralBusType.h>
33 #include <U2Lang/LastReadyScheduler.h>
34 #include <U2Lang/Schema.h>
35 #include <U2Lang/WorkflowMonitor.h>
36 #include <U2Lang/WorkflowSettings.h>
37 
38 namespace U2 {
39 namespace LocalWorkflow {
40 
41 const QString LocalDomainFactory::ID("domain.local.bio");
42 
43 /*****************************
44  * BaseWorker
45  *****************************/
BaseWorker(Actor * a,bool autoTransitBus)46 BaseWorker::BaseWorker(Actor *a, bool autoTransitBus)
47     : processDone(false), actor(a) {
48     foreach (Port *p, a->getPorts()) {
49         if (qobject_cast<IntegralBusPort *>(p)) {
50             IntegralBus *bus = new IntegralBus(p);
51             ports.insert(p->getId(), bus);
52             p->setPeer(bus);
53         }
54     }
55     if (autoTransitBus) {
56         foreach (Port *p, a->getInputPorts()) {
57             IntegralBus *bus = p->castPeer<IntegralBus>();
58             QList<Port *> outputPorts = a->getOutputPorts();
59             for (Port *op : qAsConst(outputPorts)) {
60                 if (p->isInput() != op->isInput()) {
61                     IntegralBus *ob = op->castPeer<IntegralBus>();
62                     ob->addComplement(bus);
63                     bus->addComplement(ob);
64                 }
65             }
66         }
67     }
68     a->setPeer(this);
69     // failFast = WorkflowSettings::failFast();
70 }
71 
~BaseWorker()72 BaseWorker::~BaseWorker() {
73     foreach (Port *p, actor->getPorts()) {
74         if (qobject_cast<IntegralBusPort *>(p)) {
75             p->setPeer(nullptr);
76         }
77     }
78     qDeleteAll(ports.values());
79     actor->setPeer(nullptr);
80 }
81 
getOutputFiles()82 QStringList BaseWorker::getOutputFiles() {
83     QStringList res;
84     foreach (Attribute *attr, actor->getProto()->getAttributes()) {
85         if (attr->getId() == BaseAttributes::URL_OUT_ATTRIBUTE().getId()) {
86             QString str = actor->getParameter(BaseAttributes::URL_OUT_ATTRIBUTE().getId())->getAttributeValueWithoutScript<QString>();
87             QUrl url(str);
88             if (url.isValid()) {
89                 res << url.toString();
90             }
91         }
92     }
93     return res;
94 }
95 
addCommunication(const QString & id,CommunicationChannel * ch)96 bool BaseWorker::addCommunication(const QString &id, CommunicationChannel *ch) {
97     Q_UNUSED(id);
98     Q_UNUSED(ch);
99     assert(0);
100     return false;
101 }
102 
getCommunication(const QString & name)103 CommunicationChannel *BaseWorker::getCommunication(const QString &name) {
104     return ports.value(name);
105 }
106 
getActorId() const107 ActorId BaseWorker::getActorId() const {
108     return actor->getId();
109 }
110 
getMessageAndSetupScriptValues(CommunicationChannel * channel)111 Message BaseWorker::getMessageAndSetupScriptValues(CommunicationChannel *channel) {
112     assert(channel != nullptr);
113     assert(channel->hasMessage());
114     bindScriptValues();
115     Message currentMessage = channel->get();
116     currentMessage.isEmpty();
117     messagesProcessedOnLastTick[channel].enqueue(currentMessage);
118 
119     return currentMessage;
120 }
121 
bindScriptValues()122 void BaseWorker::bindScriptValues() {
123     foreach (IntegralBus *bus, ports.values()) {
124         assert(bus != nullptr);
125         if (!bus->hasMessage()) {  // means that it is bus for output port
126             continue;
127         }
128 
129         foreach (Attribute *attribute, actor->getParameters().values()) {
130             assert(attribute != nullptr);
131             setScriptVariableFromBus(&attribute->getAttributeScript(), bus);
132 
133             if (actor->getCondition()->hasVarWithId(attribute->getId())) {
134                 actor->getCondition()->setVarValueWithId(attribute->getId(), attribute->getAttributePureValue());
135             }
136         }
137 
138         QVariantMap busData = bus->lookMessage().getData().toMap();
139         foreach (const QString &slotId, busData.keys()) {
140             QString attrId = "in_" + slotId;
141             if (actor->getCondition()->hasVarWithId(attrId)) {
142                 actor->getCondition()->setVarValueWithId(attrId, busData.value(slotId));
143             }
144         }
145     }
146 }
147 
setScriptVariableFromBus(AttributeScript * script,IntegralBus * bus)148 void BaseWorker::setScriptVariableFromBus(AttributeScript *script, IntegralBus *bus) {
149     QVariantMap busData = bus->look().getData().toMap();
150     foreach (const QString &slotDesc, busData.keys()) {
151         ActorId actorId = IntegralBusType::parseSlotDesc(slotDesc);
152         QString attrId = IntegralBusType::parseAttributeIdFromSlotDesc(slotDesc);
153         QString portId = bus->getPortId();
154         IntegralBusPort *busPort = qobject_cast<IntegralBusPort *>(actor->getPort(portId));
155         assert(busPort != nullptr);
156 
157         Actor *bindedAttrOwner = busPort->getLinkedActorById(actorId);
158         if (bindedAttrOwner == nullptr) {
159             continue;
160         }
161         // attrId.replace(".", "_");
162         // attrId.replace("-", "_");
163         if (!script->getScriptText().isEmpty()) {
164             // attrScript.setVarValueWithId(attrId, busData.value(slotDesc));
165             script->setScriptVar(attrId, busData.value(slotDesc));
166         }
167     }
168 }
169 
setDone()170 void BaseWorker::setDone() {
171     processDone = true;
172 }
173 
isDone() const174 bool BaseWorker::isDone() const {
175     return processDone;
176 }
177 
isReady() const178 bool BaseWorker::isReady() const {
179     if (isDone()) {
180         return false;
181     }
182 
183     QList<Port *> inPorts = actor->getInputPorts();
184     if (inPorts.isEmpty()) {
185         return true;
186     } else if (1 == inPorts.size()) {
187         IntegralBus *inChannel = ports.value(inPorts.first()->getId());
188         int hasMsg = inChannel->hasMessage();
189         bool ended = inChannel->isEnded();
190         if (hasMsg || ended) {
191             return true;
192         }
193     }
194 
195     return false;
196 }
197 
saveCurrentChannelsStateAndRestorePrevious()198 void BaseWorker::saveCurrentChannelsStateAndRestorePrevious() {
199     foreach (CommunicationChannel *channel, messagesProcessedOnLastTick.keys()) {
200         assert(ports.values().contains(dynamic_cast<IntegralBus *>(channel)));
201 
202         QQueue<Message> currentMessagesBackup;
203         while (channel->hasMessage()) {
204             currentMessagesBackup.enqueue(channel->get());
205         }
206         addMessagesFromBackupToAppropriratePort(channel);
207 
208         messagesProcessedOnLastTick[channel] = currentMessagesBackup;
209     }
210 }
211 
monitor() const212 WorkflowMonitor *BaseWorker::monitor() const {
213     CHECK(nullptr != context, nullptr);
214     return context->getMonitor();
215 }
216 
reportError(const QString & message)217 void BaseWorker::reportError(const QString &message) {
218     CHECK(nullptr != monitor(), );
219     monitor()->addError(message, getActorId());
220 }
221 
restoreActualChannelsState()222 void BaseWorker::restoreActualChannelsState() {
223     foreach (CommunicationChannel *channel, messagesProcessedOnLastTick.keys()) {
224         assert(!channel->hasMessage());
225         addMessagesFromBackupToAppropriratePort(channel);
226     }
227 }
228 
createLogListeners(int listenersNumber) const229 QList<ExternalToolListener *> BaseWorker::createLogListeners(int listenersNumber) const {
230     return context->getMonitor()->createWorkflowListeners(actor->getId(), actor->getLabel(), listenersNumber);
231 }
232 
addMessagesFromBackupToAppropriratePort(CommunicationChannel * channel)233 void BaseWorker::addMessagesFromBackupToAppropriratePort(CommunicationChannel *channel) {
234     while (!messagesProcessedOnLastTick[channel].isEmpty()) {
235         channel->put(messagesProcessedOnLastTick[channel].dequeue(), true);
236     }
237 }
238 
canTaskBeCanceled(Task *) const239 bool BaseWorker::canTaskBeCanceled(Task * /*workerTask*/) const {
240     return false;
241 }
242 
tick(bool & canResultBeCanceled)243 Task *BaseWorker::tick(bool &canResultBeCanceled) {
244     Task *result = tick();
245     if (nullptr != result) {
246         canResultBeCanceled = canTaskBeCanceled(result);
247     }
248 
249     return result;
250 }
251 
252 /*****************************
253  * SimpleQueue
254  *****************************/
SimpleQueue()255 SimpleQueue::SimpleQueue()
256     : ended(false), takenMsgs(0) {
257 }
258 
get()259 Message SimpleQueue::get() {
260     assert(hasMessage());
261     takenMsgs++;
262     return que.dequeue();
263 }
264 
look() const265 Message SimpleQueue::look() const {
266     assert(hasMessage());
267     return que.head();
268 }
269 
put(const Message & m,bool isMessageRestored)270 void SimpleQueue::put(const Message &m, bool isMessageRestored) {
271     que.enqueue(m);
272     if (isMessageRestored) {
273         --takenMsgs;
274     }
275 }
276 
hasMessage() const277 int SimpleQueue::hasMessage() const {
278     return que.size();
279 }
280 
takenMessages() const281 int SimpleQueue::takenMessages() const {
282     return takenMsgs;
283 }
284 
hasRoom(const DataType *) const285 int SimpleQueue::hasRoom(const DataType *) const {
286     return 1000;
287 }
288 
isEnded() const289 bool SimpleQueue::isEnded() const {
290     return ended && que.isEmpty();
291 }
292 
setEnded()293 void SimpleQueue::setEnded() {
294     ended = true;
295 }
296 
capacity() const297 int SimpleQueue::capacity() const {
298     return INT_MAX;
299 }
300 
setCapacity(int)301 void SimpleQueue::setCapacity(int) {
302 }
303 
getMessages(int startIndex,int endIndex) const304 QQueue<Message> SimpleQueue::getMessages(int startIndex, int endIndex) const {
305     if (-1 == endIndex) {
306         endIndex = hasMessage() - 1;
307     }
308     Q_ASSERT(0 <= startIndex && que.size() >= startIndex && 0 <= endIndex && que.size() >= endIndex);
309     QQueue<Message> result;
310     foreach (Message message, que.mid(startIndex, endIndex - startIndex + 1)) {
311         result.enqueue(message);
312     }
313     return result;
314 }
315 
316 /*****************************
317  * LocalDomainFactory
318  *****************************/
LocalDomainFactory()319 LocalDomainFactory::LocalDomainFactory()
320     : DomainFactory(ID) {
321 }
322 
323 // TODO: this function should be moved to WorkflowRunFromCMDLine.cpp
324 // It must be called only once and save its result to some registry
getSlotsForPrint()325 static QMap<QString, QMap<QString, QList<QString>>> getSlotsForPrint() {
326     QMap<QString, QMap<QString, QList<QString>>> forPrint;
327     CMDLineRegistry *cmdLineRegistry = AppContext::getCMDLineRegistry();
328 
329     int printOpIdx = CMDLineRegistryUtils::getParameterIndex("print");
330     while (printOpIdx != -1) {
331         QString printSlot = cmdLineRegistry->getParameterValue("print", printOpIdx);  // TODO: "print" == WorkflowDesignerPlugin::PRINT
332         printOpIdx++;
333         if (!printSlot.isEmpty()) {
334             QStringList tokens = printSlot.split(".");
335             if (3 == tokens.size()) {
336                 QMap<QString, QList<QString>> ports = forPrint.value(tokens[0], QMap<QString, QList<QString>>());
337                 QList<QString> slotS = ports.value(tokens[1], QList<QString>());
338                 slotS.append(tokens[2]);
339                 ports.insert(tokens[1], slotS);
340                 forPrint.insert(tokens[0], ports);
341             }
342         } else {
343             printOpIdx = -1;
344         }
345     }
346     return forPrint;
347 }
348 
addPrintSLots(IntegralBus * bus,Port * p)349 static void addPrintSLots(IntegralBus *bus, Port *p) {
350     QMap<QString, QMap<QString, QList<QString>>> forPrint = getSlotsForPrint();
351     QString actorId = p->owner()->getId();
352     if (forPrint.contains(actorId)) {
353         QMap<QString, QList<QString>> ports = forPrint.value(actorId);
354         if (ports.contains(p->getId())) {
355             QList<QString> slotS = ports.value(p->getId());
356             bus->setPrintSlots(p->isInput(), slotS);
357         }
358     }
359 }
360 
setupBus(Port * p)361 static CommunicationSubject *setupBus(Port *p) {
362     QString id = p->getId();
363     BaseWorker *worker = p->owner()->castPeer<BaseWorker>();
364     assert(worker);
365     CommunicationSubject *subj = worker;
366     IntegralBus *bus = qobject_cast<IntegralBus *>(p->castPeer<QObject>());
367     if (bus) {
368         assert(subj->getCommunication(id) == dynamic_cast<CommunicationChannel *>(bus));
369         subj = bus;
370     } else if (subj) {
371         assert(0);
372         bus = new IntegralBus(p);
373         p->setPeer(bus);
374         subj->addCommunication(id, bus);
375         subj = bus;
376         foreach (Port *op, p->owner()->getPorts()) {
377             if (p->isInput() != op->isInput()) {
378                 IntegralBus *ob = qobject_cast<IntegralBus *>(op->castPeer<QObject>());
379                 if (ob) {
380                     ob->addComplement(bus);
381                     bus->addComplement(ob);
382                 }
383             }
384         }
385     }
386     addPrintSLots(bus, p);
387     return subj;
388 }
389 
createWorker(Actor * a)390 Worker *LocalDomainFactory::createWorker(Actor *a) {
391     Worker *w = nullptr;
392     DomainFactory *f = getById(a->getProto()->getId());
393     if (f) {
394         w = f->createWorker(a);
395 #ifdef _DEBUG
396         assert(w);
397         BaseWorker *bw = dynamic_cast<BaseWorker *>(w);
398         assert(qobject_cast<BaseWorker *>(bw));
399         assert(bw == a->getPeer());
400 #endif
401     }
402 
403     return w;
404 }
405 
createConnection(Link * l)406 CommunicationChannel *LocalDomainFactory::createConnection(Link *l) {
407     SimpleQueue *cc = nullptr;
408     QString srcId = l->source()->getId();
409     QString dstId = l->destination()->getId();
410     CommunicationSubject *src = setupBus(l->source());
411     CommunicationSubject *dst = setupBus(l->destination());
412     if (src && dst) {
413         cc = new SimpleQueue();
414         src->addCommunication(srcId, cc);
415         dst->addCommunication(dstId, cc);
416     }
417     l->setPeer(cc);
418     return cc;
419 }
420 
createScheduler(Schema * sh)421 Scheduler *LocalDomainFactory::createScheduler(Schema *sh) {
422     Scheduler *sc = new LastReadyScheduler(sh);
423     return sc;
424 }
425 
destroy(Scheduler * sh,Schema * schema)426 void LocalDomainFactory::destroy(Scheduler *sh, Schema *schema) {
427     foreach (Link *l, schema->getFlows()) {
428         delete l->castPeer<SimpleQueue>();
429         l->setPeer(nullptr);
430     }
431 
432     foreach (Actor *a, schema->getProcesses()) {
433         delete a->castPeer<BaseWorker>();
434     }
435 
436     delete sh;
437 }
438 
439 }  // namespace LocalWorkflow
440 }  // namespace U2
441