1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "LocalDomain.h"
23
24 #include <U2Core/AppContext.h>
25 #include <U2Core/CMDLineRegistry.h>
26 #include <U2Core/CMDLineUtils.h>
27 #include <U2Core/Log.h>
28 #include <U2Core/U2SafePoints.h>
29
30 #include <U2Lang/ActorModel.h>
31 #include <U2Lang/BaseAttributes.h>
32 #include <U2Lang/IntegralBusType.h>
33 #include <U2Lang/LastReadyScheduler.h>
34 #include <U2Lang/Schema.h>
35 #include <U2Lang/WorkflowMonitor.h>
36 #include <U2Lang/WorkflowSettings.h>
37
38 namespace U2 {
39 namespace LocalWorkflow {
40
41 const QString LocalDomainFactory::ID("domain.local.bio");
42
43 /*****************************
44 * BaseWorker
45 *****************************/
BaseWorker(Actor * a,bool autoTransitBus)46 BaseWorker::BaseWorker(Actor *a, bool autoTransitBus)
47 : processDone(false), actor(a) {
48 foreach (Port *p, a->getPorts()) {
49 if (qobject_cast<IntegralBusPort *>(p)) {
50 IntegralBus *bus = new IntegralBus(p);
51 ports.insert(p->getId(), bus);
52 p->setPeer(bus);
53 }
54 }
55 if (autoTransitBus) {
56 foreach (Port *p, a->getInputPorts()) {
57 IntegralBus *bus = p->castPeer<IntegralBus>();
58 QList<Port *> outputPorts = a->getOutputPorts();
59 for (Port *op : qAsConst(outputPorts)) {
60 if (p->isInput() != op->isInput()) {
61 IntegralBus *ob = op->castPeer<IntegralBus>();
62 ob->addComplement(bus);
63 bus->addComplement(ob);
64 }
65 }
66 }
67 }
68 a->setPeer(this);
69 // failFast = WorkflowSettings::failFast();
70 }
71
~BaseWorker()72 BaseWorker::~BaseWorker() {
73 foreach (Port *p, actor->getPorts()) {
74 if (qobject_cast<IntegralBusPort *>(p)) {
75 p->setPeer(nullptr);
76 }
77 }
78 qDeleteAll(ports.values());
79 actor->setPeer(nullptr);
80 }
81
getOutputFiles()82 QStringList BaseWorker::getOutputFiles() {
83 QStringList res;
84 foreach (Attribute *attr, actor->getProto()->getAttributes()) {
85 if (attr->getId() == BaseAttributes::URL_OUT_ATTRIBUTE().getId()) {
86 QString str = actor->getParameter(BaseAttributes::URL_OUT_ATTRIBUTE().getId())->getAttributeValueWithoutScript<QString>();
87 QUrl url(str);
88 if (url.isValid()) {
89 res << url.toString();
90 }
91 }
92 }
93 return res;
94 }
95
addCommunication(const QString & id,CommunicationChannel * ch)96 bool BaseWorker::addCommunication(const QString &id, CommunicationChannel *ch) {
97 Q_UNUSED(id);
98 Q_UNUSED(ch);
99 assert(0);
100 return false;
101 }
102
getCommunication(const QString & name)103 CommunicationChannel *BaseWorker::getCommunication(const QString &name) {
104 return ports.value(name);
105 }
106
getActorId() const107 ActorId BaseWorker::getActorId() const {
108 return actor->getId();
109 }
110
getMessageAndSetupScriptValues(CommunicationChannel * channel)111 Message BaseWorker::getMessageAndSetupScriptValues(CommunicationChannel *channel) {
112 assert(channel != nullptr);
113 assert(channel->hasMessage());
114 bindScriptValues();
115 Message currentMessage = channel->get();
116 currentMessage.isEmpty();
117 messagesProcessedOnLastTick[channel].enqueue(currentMessage);
118
119 return currentMessage;
120 }
121
bindScriptValues()122 void BaseWorker::bindScriptValues() {
123 foreach (IntegralBus *bus, ports.values()) {
124 assert(bus != nullptr);
125 if (!bus->hasMessage()) { // means that it is bus for output port
126 continue;
127 }
128
129 foreach (Attribute *attribute, actor->getParameters().values()) {
130 assert(attribute != nullptr);
131 setScriptVariableFromBus(&attribute->getAttributeScript(), bus);
132
133 if (actor->getCondition()->hasVarWithId(attribute->getId())) {
134 actor->getCondition()->setVarValueWithId(attribute->getId(), attribute->getAttributePureValue());
135 }
136 }
137
138 QVariantMap busData = bus->lookMessage().getData().toMap();
139 foreach (const QString &slotId, busData.keys()) {
140 QString attrId = "in_" + slotId;
141 if (actor->getCondition()->hasVarWithId(attrId)) {
142 actor->getCondition()->setVarValueWithId(attrId, busData.value(slotId));
143 }
144 }
145 }
146 }
147
setScriptVariableFromBus(AttributeScript * script,IntegralBus * bus)148 void BaseWorker::setScriptVariableFromBus(AttributeScript *script, IntegralBus *bus) {
149 QVariantMap busData = bus->look().getData().toMap();
150 foreach (const QString &slotDesc, busData.keys()) {
151 ActorId actorId = IntegralBusType::parseSlotDesc(slotDesc);
152 QString attrId = IntegralBusType::parseAttributeIdFromSlotDesc(slotDesc);
153 QString portId = bus->getPortId();
154 IntegralBusPort *busPort = qobject_cast<IntegralBusPort *>(actor->getPort(portId));
155 assert(busPort != nullptr);
156
157 Actor *bindedAttrOwner = busPort->getLinkedActorById(actorId);
158 if (bindedAttrOwner == nullptr) {
159 continue;
160 }
161 // attrId.replace(".", "_");
162 // attrId.replace("-", "_");
163 if (!script->getScriptText().isEmpty()) {
164 // attrScript.setVarValueWithId(attrId, busData.value(slotDesc));
165 script->setScriptVar(attrId, busData.value(slotDesc));
166 }
167 }
168 }
169
setDone()170 void BaseWorker::setDone() {
171 processDone = true;
172 }
173
isDone() const174 bool BaseWorker::isDone() const {
175 return processDone;
176 }
177
isReady() const178 bool BaseWorker::isReady() const {
179 if (isDone()) {
180 return false;
181 }
182
183 QList<Port *> inPorts = actor->getInputPorts();
184 if (inPorts.isEmpty()) {
185 return true;
186 } else if (1 == inPorts.size()) {
187 IntegralBus *inChannel = ports.value(inPorts.first()->getId());
188 int hasMsg = inChannel->hasMessage();
189 bool ended = inChannel->isEnded();
190 if (hasMsg || ended) {
191 return true;
192 }
193 }
194
195 return false;
196 }
197
saveCurrentChannelsStateAndRestorePrevious()198 void BaseWorker::saveCurrentChannelsStateAndRestorePrevious() {
199 foreach (CommunicationChannel *channel, messagesProcessedOnLastTick.keys()) {
200 assert(ports.values().contains(dynamic_cast<IntegralBus *>(channel)));
201
202 QQueue<Message> currentMessagesBackup;
203 while (channel->hasMessage()) {
204 currentMessagesBackup.enqueue(channel->get());
205 }
206 addMessagesFromBackupToAppropriratePort(channel);
207
208 messagesProcessedOnLastTick[channel] = currentMessagesBackup;
209 }
210 }
211
monitor() const212 WorkflowMonitor *BaseWorker::monitor() const {
213 CHECK(nullptr != context, nullptr);
214 return context->getMonitor();
215 }
216
reportError(const QString & message)217 void BaseWorker::reportError(const QString &message) {
218 CHECK(nullptr != monitor(), );
219 monitor()->addError(message, getActorId());
220 }
221
restoreActualChannelsState()222 void BaseWorker::restoreActualChannelsState() {
223 foreach (CommunicationChannel *channel, messagesProcessedOnLastTick.keys()) {
224 assert(!channel->hasMessage());
225 addMessagesFromBackupToAppropriratePort(channel);
226 }
227 }
228
createLogListeners(int listenersNumber) const229 QList<ExternalToolListener *> BaseWorker::createLogListeners(int listenersNumber) const {
230 return context->getMonitor()->createWorkflowListeners(actor->getId(), actor->getLabel(), listenersNumber);
231 }
232
addMessagesFromBackupToAppropriratePort(CommunicationChannel * channel)233 void BaseWorker::addMessagesFromBackupToAppropriratePort(CommunicationChannel *channel) {
234 while (!messagesProcessedOnLastTick[channel].isEmpty()) {
235 channel->put(messagesProcessedOnLastTick[channel].dequeue(), true);
236 }
237 }
238
canTaskBeCanceled(Task *) const239 bool BaseWorker::canTaskBeCanceled(Task * /*workerTask*/) const {
240 return false;
241 }
242
tick(bool & canResultBeCanceled)243 Task *BaseWorker::tick(bool &canResultBeCanceled) {
244 Task *result = tick();
245 if (nullptr != result) {
246 canResultBeCanceled = canTaskBeCanceled(result);
247 }
248
249 return result;
250 }
251
252 /*****************************
253 * SimpleQueue
254 *****************************/
SimpleQueue()255 SimpleQueue::SimpleQueue()
256 : ended(false), takenMsgs(0) {
257 }
258
get()259 Message SimpleQueue::get() {
260 assert(hasMessage());
261 takenMsgs++;
262 return que.dequeue();
263 }
264
look() const265 Message SimpleQueue::look() const {
266 assert(hasMessage());
267 return que.head();
268 }
269
put(const Message & m,bool isMessageRestored)270 void SimpleQueue::put(const Message &m, bool isMessageRestored) {
271 que.enqueue(m);
272 if (isMessageRestored) {
273 --takenMsgs;
274 }
275 }
276
hasMessage() const277 int SimpleQueue::hasMessage() const {
278 return que.size();
279 }
280
takenMessages() const281 int SimpleQueue::takenMessages() const {
282 return takenMsgs;
283 }
284
hasRoom(const DataType *) const285 int SimpleQueue::hasRoom(const DataType *) const {
286 return 1000;
287 }
288
isEnded() const289 bool SimpleQueue::isEnded() const {
290 return ended && que.isEmpty();
291 }
292
setEnded()293 void SimpleQueue::setEnded() {
294 ended = true;
295 }
296
capacity() const297 int SimpleQueue::capacity() const {
298 return INT_MAX;
299 }
300
setCapacity(int)301 void SimpleQueue::setCapacity(int) {
302 }
303
getMessages(int startIndex,int endIndex) const304 QQueue<Message> SimpleQueue::getMessages(int startIndex, int endIndex) const {
305 if (-1 == endIndex) {
306 endIndex = hasMessage() - 1;
307 }
308 Q_ASSERT(0 <= startIndex && que.size() >= startIndex && 0 <= endIndex && que.size() >= endIndex);
309 QQueue<Message> result;
310 foreach (Message message, que.mid(startIndex, endIndex - startIndex + 1)) {
311 result.enqueue(message);
312 }
313 return result;
314 }
315
316 /*****************************
317 * LocalDomainFactory
318 *****************************/
LocalDomainFactory()319 LocalDomainFactory::LocalDomainFactory()
320 : DomainFactory(ID) {
321 }
322
323 // TODO: this function should be moved to WorkflowRunFromCMDLine.cpp
324 // It must be called only once and save its result to some registry
getSlotsForPrint()325 static QMap<QString, QMap<QString, QList<QString>>> getSlotsForPrint() {
326 QMap<QString, QMap<QString, QList<QString>>> forPrint;
327 CMDLineRegistry *cmdLineRegistry = AppContext::getCMDLineRegistry();
328
329 int printOpIdx = CMDLineRegistryUtils::getParameterIndex("print");
330 while (printOpIdx != -1) {
331 QString printSlot = cmdLineRegistry->getParameterValue("print", printOpIdx); // TODO: "print" == WorkflowDesignerPlugin::PRINT
332 printOpIdx++;
333 if (!printSlot.isEmpty()) {
334 QStringList tokens = printSlot.split(".");
335 if (3 == tokens.size()) {
336 QMap<QString, QList<QString>> ports = forPrint.value(tokens[0], QMap<QString, QList<QString>>());
337 QList<QString> slotS = ports.value(tokens[1], QList<QString>());
338 slotS.append(tokens[2]);
339 ports.insert(tokens[1], slotS);
340 forPrint.insert(tokens[0], ports);
341 }
342 } else {
343 printOpIdx = -1;
344 }
345 }
346 return forPrint;
347 }
348
addPrintSLots(IntegralBus * bus,Port * p)349 static void addPrintSLots(IntegralBus *bus, Port *p) {
350 QMap<QString, QMap<QString, QList<QString>>> forPrint = getSlotsForPrint();
351 QString actorId = p->owner()->getId();
352 if (forPrint.contains(actorId)) {
353 QMap<QString, QList<QString>> ports = forPrint.value(actorId);
354 if (ports.contains(p->getId())) {
355 QList<QString> slotS = ports.value(p->getId());
356 bus->setPrintSlots(p->isInput(), slotS);
357 }
358 }
359 }
360
setupBus(Port * p)361 static CommunicationSubject *setupBus(Port *p) {
362 QString id = p->getId();
363 BaseWorker *worker = p->owner()->castPeer<BaseWorker>();
364 assert(worker);
365 CommunicationSubject *subj = worker;
366 IntegralBus *bus = qobject_cast<IntegralBus *>(p->castPeer<QObject>());
367 if (bus) {
368 assert(subj->getCommunication(id) == dynamic_cast<CommunicationChannel *>(bus));
369 subj = bus;
370 } else if (subj) {
371 assert(0);
372 bus = new IntegralBus(p);
373 p->setPeer(bus);
374 subj->addCommunication(id, bus);
375 subj = bus;
376 foreach (Port *op, p->owner()->getPorts()) {
377 if (p->isInput() != op->isInput()) {
378 IntegralBus *ob = qobject_cast<IntegralBus *>(op->castPeer<QObject>());
379 if (ob) {
380 ob->addComplement(bus);
381 bus->addComplement(ob);
382 }
383 }
384 }
385 }
386 addPrintSLots(bus, p);
387 return subj;
388 }
389
createWorker(Actor * a)390 Worker *LocalDomainFactory::createWorker(Actor *a) {
391 Worker *w = nullptr;
392 DomainFactory *f = getById(a->getProto()->getId());
393 if (f) {
394 w = f->createWorker(a);
395 #ifdef _DEBUG
396 assert(w);
397 BaseWorker *bw = dynamic_cast<BaseWorker *>(w);
398 assert(qobject_cast<BaseWorker *>(bw));
399 assert(bw == a->getPeer());
400 #endif
401 }
402
403 return w;
404 }
405
createConnection(Link * l)406 CommunicationChannel *LocalDomainFactory::createConnection(Link *l) {
407 SimpleQueue *cc = nullptr;
408 QString srcId = l->source()->getId();
409 QString dstId = l->destination()->getId();
410 CommunicationSubject *src = setupBus(l->source());
411 CommunicationSubject *dst = setupBus(l->destination());
412 if (src && dst) {
413 cc = new SimpleQueue();
414 src->addCommunication(srcId, cc);
415 dst->addCommunication(dstId, cc);
416 }
417 l->setPeer(cc);
418 return cc;
419 }
420
createScheduler(Schema * sh)421 Scheduler *LocalDomainFactory::createScheduler(Schema *sh) {
422 Scheduler *sc = new LastReadyScheduler(sh);
423 return sc;
424 }
425
destroy(Scheduler * sh,Schema * schema)426 void LocalDomainFactory::destroy(Scheduler *sh, Schema *schema) {
427 foreach (Link *l, schema->getFlows()) {
428 delete l->castPeer<SimpleQueue>();
429 l->setPeer(nullptr);
430 }
431
432 foreach (Actor *a, schema->getProcesses()) {
433 delete a->castPeer<BaseWorker>();
434 }
435
436 delete sh;
437 }
438
439 } // namespace LocalWorkflow
440 } // namespace U2
441