1 /** 2 * UGENE - Integrated Bioinformatics Tools. 3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru> 4 * http://ugene.net 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with this program; if not, write to the Free Software 18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 19 * MA 02110-1301, USA. 20 */ 21 22 #ifndef _U2_WORKFLOW_INTEGRAL_BUS_H_ 23 #define _U2_WORKFLOW_INTEGRAL_BUS_H_ 24 25 #include <QMutex> 26 27 #include <U2Lang/IntegralBusModel.h> 28 #include <U2Lang/WorkflowContext.h> 29 #include <U2Lang/WorkflowTransport.h> 30 31 namespace U2 { 32 33 namespace Workflow { 34 35 /** 36 * Keeps the type of the bus. 37 * Helps to convert messages for actors. 38 */ 39 class U2LANG_EXPORT BusMap { 40 public: 41 BusMap(const StrStrMap &busMap, const QMap<QString, QStringList> &listMap, const SlotPathMap &paths); 42 BusMap(const StrStrMap &busMap, bool breaksDataflow, const QString &actorId); 43 44 Message takeMessageMap(CommunicationChannel *ch, QVariantMap &context); 45 Message lookMessageMap(CommunicationChannel *ch); 46 QVariantMap composeMessageMap(const Message &m, const QVariantMap &context); 47 48 static void parseSource(const QString &src, QString &srcId, QStringList &path); 49 static QString getNewSourceId(const QString &srcId, const QString &actorId); 50 51 private: 52 bool input; 53 54 StrStrMap busMap; 55 QMap<QString, QStringList> listMap; 56 SlotPathMap paths; 57 58 bool breaksDataflow; 59 QString actorId; 60 61 private: 62 QVariantMap getMessageData(const Message &m) const; 63 }; 64 65 /** 66 * represents communication channel for support passing data between actors 67 * connected in transitive closure of schema graph 68 * 69 * is a container of communications with other actors 70 */ 71 class U2LANG_EXPORT IntegralBus : public QObject, public CommunicationSubject, public CommunicationChannel { 72 Q_OBJECT 73 public: 74 IntegralBus(Port *peer); 75 ~IntegralBus(); 76 77 // reimplemented from CommunicationSubject 78 virtual bool addCommunication(const QString &id, CommunicationChannel *ch); 79 virtual CommunicationChannel *getCommunication(const QString &id); 80 81 void putWithoutContext(const Message &m); 82 83 // reimplemented from CommunicationChannel 84 virtual Message get(); 85 virtual Message look() const; 86 virtual void put(const Message &m, bool isMessageRestored = false); 87 // put incoming context to the output channels 88 virtual void transit(); 89 virtual int hasMessage() const; 90 virtual int takenMessages() const; 91 virtual int hasRoom(const DataType *t = nullptr) const; 92 virtual bool isEnded() const; 93 virtual void setEnded(); capacity()94 virtual int capacity() const { 95 return 1; 96 } setCapacity(int)97 virtual void setCapacity(int) { 98 } 99 virtual Message lookMessage() const; 100 virtual QQueue<Message> getMessages(int startIndex, int endIndex) const; 101 getContext()102 QVariantMap getContext() const { 103 return context; 104 } getLastMessageContext()105 QVariantMap getLastMessageContext() const { 106 return lastMessageContext; 107 } 108 void setContext(const QVariantMap &m, int metadataId); 109 int getContextMetadataId() const; 110 addComplement(IntegralBus * b)111 virtual void addComplement(IntegralBus *b) { 112 assert(!complement); 113 complement = b; 114 } 115 getPortId()116 QString getPortId() const { 117 return portId; 118 } getBusType()119 DataTypePtr getBusType() const { 120 return busType; 121 } 122 123 void setPrintSlots(bool in, const QList<QString> &printSlots); 124 125 void setWorkflowContext(WorkflowContext *context); 126 127 protected: 128 virtual Message composeMessage(const Message &); 129 130 protected: 131 // type of port integral bus is binded to 132 DataTypePtr busType; 133 // communications with other ports 134 QMap<QString, CommunicationChannel *> outerChannels; 135 // busmap of port integral bus is binded to 136 BusMap *busMap; 137 // context of an output message. See put() for details 138 QVariantMap context; 139 QVariantMap lastMessageContext; 140 int contextMetadataId; 141 // 142 IntegralBus *complement; 143 // integral bus is binded to port with this id 144 QString portId; 145 // 146 int takenMsgs; 147 148 // a content of these slots is printed to the standart output 149 QList<QString> printSlots; 150 ActorId actorId; 151 WorkflowContext *workflowContext; 152 153 QMutex *contextMutex; 154 155 }; // IntegralBus 156 157 } // namespace Workflow 158 159 } // namespace U2 160 161 #endif // _U2_WORKFLOW_INTEGRAL_BUS_H_ 162