1 /**
2  * UGENE - Integrated Bioinformatics Tools.
3  * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4  * http://ugene.net
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19  * MA 02110-1301, USA.
20  */
21 
22 #include "GroupWorker.h"
23 
24 #include <U2Core/AnnotationData.h>
25 #include <U2Core/MultipleSequenceAlignment.h>
26 #include <U2Core/QVariantUtils.h>
27 #include <U2Core/U2OpStatusUtils.h>
28 #include <U2Core/U2SafePoints.h>
29 #include <U2Core/U2SequenceUtils.h>
30 
31 #include <U2Designer/DelegateEditors.h>
32 #include <U2Designer/GrouperEditor.h>
33 
34 #include <U2Lang/ActorPrototypeRegistry.h>
35 #include <U2Lang/BaseActorCategories.h>
36 #include <U2Lang/BaseTypes.h>
37 #include <U2Lang/CoreLibConstants.h>
38 #include <U2Lang/GrouperSlotAttribute.h>
39 #include <U2Lang/WorkflowEnv.h>
40 
41 namespace U2 {
42 namespace LocalWorkflow {
43 
44 static const QString INPUT_PORT("input-data");
45 static const QString OUTPUT_PORT("output-data");
46 static const QString GROUP_SIZE_SLOT_ID("group-size");
47 
48 static const QString OPER_ATTR_ID("group-op");
49 
50 /************************************************************************/
51 /* GroupWorker */
52 /************************************************************************/
GroupWorker(Actor * p)53 GroupWorker::GroupWorker(Actor *p)
54     : BaseWorker(p, false), inChannel(nullptr), outChannel(nullptr), produceOneGroup(false) {
55 }
56 
init()57 void GroupWorker::init() {
58     inChannel = ports.value(INPUT_PORT);
59     outChannel = ports.value(OUTPUT_PORT);
60     mtype = outChannel->getBusType();
61 
62     GrouperOutSlotAttribute *slotsAttr = dynamic_cast<GrouperOutSlotAttribute *>(actor->getParameter(CoreLibConstants::GROUPER_OUT_SLOTS_ATTR));
63     outSlots = slotsAttr->getOutSlots();
64     groupSlot = actor->getParameter(CoreLibConstants::GROUPER_SLOT_ATTR)->getAttributePureValue().toString();
65     produceOneGroup = groupSlot.isEmpty();
66     if (!produceOneGroup) {
67         inType = context->getOutSlotType(groupSlot);
68         groupSlot = GrouperOutSlot::readable2busMap(groupSlot);
69     }
70     groupOp = actor->getParameter(OPER_ATTR_ID)->getAttributePureValue().toString();
71 
72     if (groupOp != GroupOperations::BY_VALUE().getId() && groupOp != GroupOperations::BY_NAME().getId() && groupOp != GroupOperations::BY_ID().getId()) {
73         groupOp = GroupOperations::BY_ID().getId();
74     }
75 }
76 
tick()77 Task *GroupWorker::tick() {
78     while (inChannel->hasMessage()) {
79         Message inMessage = inChannel->look();
80         inChannel->get();
81         QVariantMap mData = inMessage.getData().toMap();
82 
83         int foundId = -1;
84         PerformersMap perfs;
85         if (produceOneGroup) {
86             foundId = 0;
87             if (1 == groupedData.size()) {
88                 perfs = groupedData[0];
89             } else {
90                 groupSize[foundId] = 0;
91             }
92             GrouperActionUtils::applyActions(context, outSlots, mData, perfs);
93         } else {
94             if (!mData.keys().contains(groupSlot)) {
95                 continue;
96             }
97 
98             QVariant gsData = mData.value(groupSlot);
99             // search group slot data at unique data
100             foreach (int id, uniqueData.keys()) {
101                 const QVariant &d = uniqueData[id];
102                 bool equal = GrouperActionUtils::equalData(groupOp, d, gsData, inType, context);
103                 if (equal) {
104                     foundId = id;
105                     perfs = groupedData[id];
106                     break;
107                 }
108             }
109 
110             // apply actions for out slots
111             GrouperActionUtils::applyActions(context, outSlots, mData, perfs);
112 
113             // add new unique data and action performers
114             if (foundId < 0) {
115                 foundId = uniqueData.size();
116                 uniqueData[foundId] = gsData;
117                 groupSize[foundId] = 0;
118             }
119         }
120         groupedData[foundId] = perfs;
121         groupSize[foundId] = groupSize[foundId] + 1;
122     }
123     if (inChannel->isEnded()) {
124         foreach (int id, groupedData.keys()) {
125             QMap<QString, ActionPerformer *> perfs = groupedData[id];
126 
127             QVariantMap data;
128             data[GROUP_SIZE_SLOT_ID] = QByteArray::number(groupSize[id]);
129             // create output data set from action performers
130             foreach (const QString &slotId, perfs.keys()) {
131                 ActionPerformer *perf = perfs[slotId];
132                 U2OpStatusImpl os;
133                 QVariant slotData = perf->finishAction(os);
134                 if (os.isCoR()) {
135                     continue;
136                 }
137                 data[slotId] = slotData;
138             }
139 
140             MessageMetadata metadata(QString("Group %1").arg(id + 1));
141             context->getMetadataStorage().put(metadata);
142             if (!produceOneGroup) {
143                 QVariantMap context;
144                 context[groupSlot] = uniqueData[id];
145                 outChannel->setContext(context, metadata.getId());
146             }
147             outChannel->put(Message(mtype, data, metadata.getId()));
148         }
149         setDone();
150         cleanup();
151         outChannel->setEnded();
152     }
153 
154     return nullptr;
155 }
156 
cleanup()157 void GroupWorker::cleanup() {
158     foreach (const PerformersMap &perfs, groupedData.values()) {
159         foreach (ActionPerformer *p, perfs.values()) {
160             delete p;
161         }
162     }
163     groupedData.clear();
164     uniqueData.clear();
165 }
166 
167 /************************************************************************/
168 /* GroupWorkerFactory */
169 /************************************************************************/
170 const QString GroupWorkerFactory::ACTOR_ID = CoreLibConstants::GROUPER_ID;
171 
init()172 void GroupWorkerFactory::init() {
173     QList<PortDescriptor *> portDescs;
174     {
175         QMap<Descriptor, DataTypePtr> emptyTypeMap;
176         DataTypePtr emptyTypeSet(new MapDataType(Descriptor(DataType::EMPTY_TYPESET_ID), emptyTypeMap));
177 
178         Descriptor inputDesc1(INPUT_PORT, GroupWorker::tr("Input data flow"), GroupWorker::tr("Input data flow"));
179         portDescs << new PortDescriptor(inputDesc1, emptyTypeSet, true);
180 
181         Descriptor groupSizeDesc(GROUP_SIZE_SLOT_ID, GroupWorker::tr("Group size"), GroupWorker::tr("Size of the created group."));
182         QMap<Descriptor, DataTypePtr> outTypeMap;
183         outTypeMap[groupSizeDesc] = BaseTypes::STRING_TYPE();
184         DataTypePtr outTypeSet(new MapDataType("Grouped data", outTypeMap));
185 
186         Descriptor outputDesc(OUTPUT_PORT, GroupWorker::tr("Grouped output data flow"), GroupWorker::tr("Grouped output data flow"));
187         portDescs << new PortDescriptor(outputDesc, outTypeSet, false, true);
188     }
189 
190     QList<Attribute *> attrs;
191     {
192         Descriptor slotsDesc(CoreLibConstants::GROUPER_OUT_SLOTS_ATTR, GroupWorker::tr("Out slots"), GroupWorker::tr("Out slots"));
193         Attribute *slotsAttr = new GrouperOutSlotAttribute(slotsDesc, BaseTypes::STRING_TYPE(), false);
194 
195         Descriptor groupDesc(CoreLibConstants::GROUPER_SLOT_ATTR, GroupWorker::tr("Group slot"), GroupWorker::tr("Group slot"));
196         Attribute *groupAttr = new GroupSlotAttribute(groupDesc, BaseTypes::STRING_TYPE(), false);
197 
198         Descriptor opDesc(OPER_ATTR_ID, GroupWorker::tr("Group operation"), GroupWorker::tr("Group operation"));
199         Attribute *opAttr = new Attribute(opDesc, BaseTypes::STRING_TYPE(), true);
200 
201         attrs << slotsAttr;
202         attrs << groupAttr;
203         attrs << opAttr;
204     }
205 
206     Descriptor protoDesc(GroupWorkerFactory::ACTOR_ID,
207                          GroupWorker::tr("Grouper"),
208                          GroupWorker::tr("Groups data supplied to the specified slot by the specified property (for example, by value). Additionally, it is possible to merge data from another slots associated with the specified one."));
209 
210     ActorPrototype *proto = new IntegralBusActorPrototype(protoDesc, portDescs, attrs);
211 
212     proto->setEditor(new GrouperEditor());
213     proto->setPrompter(new GroupPrompter());
214     proto->setAllowsEmptyPorts(true);
215 
216     WorkflowEnv::getProtoRegistry()->registerProto(BaseActorCategories::CATEGORY_DATAFLOW(), proto);
217     WorkflowEnv::getDomainRegistry()->getById(LocalDomainFactory::ID)->registerEntry(new GroupWorkerFactory());
218 }
219 
createWorker(Actor * a)220 Worker *GroupWorkerFactory::createWorker(Actor *a) {
221     return new GroupWorker(a);
222 }
223 
224 /************************************************************************/
225 /* GroupPrompter */
226 /************************************************************************/
composeRichDoc()227 QString GroupPrompter::composeRichDoc() {
228     QString inputName;
229     bool produceOneGroup = true;
230 
231     Port *input = target->getInputPorts().first();
232     if (input->getLinks().size() > 0) {
233         Port *src = input->getLinks().keys().first();
234         IntegralBusPort *bus = dynamic_cast<IntegralBusPort *>(src);
235         assert(nullptr != bus);
236         DataTypePtr type = bus->getType();
237         QMap<Descriptor, DataTypePtr> busMap = type->getDatatypesMap();
238 
239         Attribute *groupSlotAttr = target->getParameter(CoreLibConstants::GROUPER_SLOT_ATTR);
240         QString groupSlot = groupSlotAttr->getAttributeValueWithoutScript<QString>();
241         groupSlot = GrouperOutSlot::readable2busMap(groupSlot);
242 
243         foreach (const Descriptor &d, busMap.keys()) {
244             if (d.getId() == groupSlot) {
245                 inputName = d.getDisplayName();
246                 produceOneGroup = false;
247                 break;
248             }
249         }
250     }
251 
252     if (produceOneGroup) {
253         return tr("Groups all incoming messages into one message.");
254     } else {
255         QString result = tr("Groups all incoming messages <u>%1</u> of <u>%2</u> slot data.");
256         Attribute *groupOpAttr = target->getParameter(CoreLibConstants::GROUPER_OPER_ATTR);
257         QString opId = groupOpAttr->getAttributeValueWithoutScript<QString>();
258 
259         QString op;
260         if ("by-id" == opId) {
261             op = tr("by id");
262         } else if ("by-name" == opId) {
263             op = tr("by name");
264         } else if ("by-value" == opId) {
265             op = tr("by value");
266         }
267         return result.arg(op).arg(inputName);
268     }
269 }
270 
271 }  // namespace LocalWorkflow
272 }  // namespace U2
273