1 /**
2 * UGENE - Integrated Bioinformatics Tools.
3 * Copyright (C) 2008-2021 UniPro <ugene@unipro.ru>
4 * http://ugene.net
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 * MA 02110-1301, USA.
20 */
21
22 #include "GroupWorker.h"
23
24 #include <U2Core/AnnotationData.h>
25 #include <U2Core/MultipleSequenceAlignment.h>
26 #include <U2Core/QVariantUtils.h>
27 #include <U2Core/U2OpStatusUtils.h>
28 #include <U2Core/U2SafePoints.h>
29 #include <U2Core/U2SequenceUtils.h>
30
31 #include <U2Designer/DelegateEditors.h>
32 #include <U2Designer/GrouperEditor.h>
33
34 #include <U2Lang/ActorPrototypeRegistry.h>
35 #include <U2Lang/BaseActorCategories.h>
36 #include <U2Lang/BaseTypes.h>
37 #include <U2Lang/CoreLibConstants.h>
38 #include <U2Lang/GrouperSlotAttribute.h>
39 #include <U2Lang/WorkflowEnv.h>
40
41 namespace U2 {
42 namespace LocalWorkflow {
43
44 static const QString INPUT_PORT("input-data");
45 static const QString OUTPUT_PORT("output-data");
46 static const QString GROUP_SIZE_SLOT_ID("group-size");
47
48 static const QString OPER_ATTR_ID("group-op");
49
50 /************************************************************************/
51 /* GroupWorker */
52 /************************************************************************/
GroupWorker(Actor * p)53 GroupWorker::GroupWorker(Actor *p)
54 : BaseWorker(p, false), inChannel(nullptr), outChannel(nullptr), produceOneGroup(false) {
55 }
56
init()57 void GroupWorker::init() {
58 inChannel = ports.value(INPUT_PORT);
59 outChannel = ports.value(OUTPUT_PORT);
60 mtype = outChannel->getBusType();
61
62 GrouperOutSlotAttribute *slotsAttr = dynamic_cast<GrouperOutSlotAttribute *>(actor->getParameter(CoreLibConstants::GROUPER_OUT_SLOTS_ATTR));
63 outSlots = slotsAttr->getOutSlots();
64 groupSlot = actor->getParameter(CoreLibConstants::GROUPER_SLOT_ATTR)->getAttributePureValue().toString();
65 produceOneGroup = groupSlot.isEmpty();
66 if (!produceOneGroup) {
67 inType = context->getOutSlotType(groupSlot);
68 groupSlot = GrouperOutSlot::readable2busMap(groupSlot);
69 }
70 groupOp = actor->getParameter(OPER_ATTR_ID)->getAttributePureValue().toString();
71
72 if (groupOp != GroupOperations::BY_VALUE().getId() && groupOp != GroupOperations::BY_NAME().getId() && groupOp != GroupOperations::BY_ID().getId()) {
73 groupOp = GroupOperations::BY_ID().getId();
74 }
75 }
76
tick()77 Task *GroupWorker::tick() {
78 while (inChannel->hasMessage()) {
79 Message inMessage = inChannel->look();
80 inChannel->get();
81 QVariantMap mData = inMessage.getData().toMap();
82
83 int foundId = -1;
84 PerformersMap perfs;
85 if (produceOneGroup) {
86 foundId = 0;
87 if (1 == groupedData.size()) {
88 perfs = groupedData[0];
89 } else {
90 groupSize[foundId] = 0;
91 }
92 GrouperActionUtils::applyActions(context, outSlots, mData, perfs);
93 } else {
94 if (!mData.keys().contains(groupSlot)) {
95 continue;
96 }
97
98 QVariant gsData = mData.value(groupSlot);
99 // search group slot data at unique data
100 foreach (int id, uniqueData.keys()) {
101 const QVariant &d = uniqueData[id];
102 bool equal = GrouperActionUtils::equalData(groupOp, d, gsData, inType, context);
103 if (equal) {
104 foundId = id;
105 perfs = groupedData[id];
106 break;
107 }
108 }
109
110 // apply actions for out slots
111 GrouperActionUtils::applyActions(context, outSlots, mData, perfs);
112
113 // add new unique data and action performers
114 if (foundId < 0) {
115 foundId = uniqueData.size();
116 uniqueData[foundId] = gsData;
117 groupSize[foundId] = 0;
118 }
119 }
120 groupedData[foundId] = perfs;
121 groupSize[foundId] = groupSize[foundId] + 1;
122 }
123 if (inChannel->isEnded()) {
124 foreach (int id, groupedData.keys()) {
125 QMap<QString, ActionPerformer *> perfs = groupedData[id];
126
127 QVariantMap data;
128 data[GROUP_SIZE_SLOT_ID] = QByteArray::number(groupSize[id]);
129 // create output data set from action performers
130 foreach (const QString &slotId, perfs.keys()) {
131 ActionPerformer *perf = perfs[slotId];
132 U2OpStatusImpl os;
133 QVariant slotData = perf->finishAction(os);
134 if (os.isCoR()) {
135 continue;
136 }
137 data[slotId] = slotData;
138 }
139
140 MessageMetadata metadata(QString("Group %1").arg(id + 1));
141 context->getMetadataStorage().put(metadata);
142 if (!produceOneGroup) {
143 QVariantMap context;
144 context[groupSlot] = uniqueData[id];
145 outChannel->setContext(context, metadata.getId());
146 }
147 outChannel->put(Message(mtype, data, metadata.getId()));
148 }
149 setDone();
150 cleanup();
151 outChannel->setEnded();
152 }
153
154 return nullptr;
155 }
156
cleanup()157 void GroupWorker::cleanup() {
158 foreach (const PerformersMap &perfs, groupedData.values()) {
159 foreach (ActionPerformer *p, perfs.values()) {
160 delete p;
161 }
162 }
163 groupedData.clear();
164 uniqueData.clear();
165 }
166
167 /************************************************************************/
168 /* GroupWorkerFactory */
169 /************************************************************************/
170 const QString GroupWorkerFactory::ACTOR_ID = CoreLibConstants::GROUPER_ID;
171
init()172 void GroupWorkerFactory::init() {
173 QList<PortDescriptor *> portDescs;
174 {
175 QMap<Descriptor, DataTypePtr> emptyTypeMap;
176 DataTypePtr emptyTypeSet(new MapDataType(Descriptor(DataType::EMPTY_TYPESET_ID), emptyTypeMap));
177
178 Descriptor inputDesc1(INPUT_PORT, GroupWorker::tr("Input data flow"), GroupWorker::tr("Input data flow"));
179 portDescs << new PortDescriptor(inputDesc1, emptyTypeSet, true);
180
181 Descriptor groupSizeDesc(GROUP_SIZE_SLOT_ID, GroupWorker::tr("Group size"), GroupWorker::tr("Size of the created group."));
182 QMap<Descriptor, DataTypePtr> outTypeMap;
183 outTypeMap[groupSizeDesc] = BaseTypes::STRING_TYPE();
184 DataTypePtr outTypeSet(new MapDataType("Grouped data", outTypeMap));
185
186 Descriptor outputDesc(OUTPUT_PORT, GroupWorker::tr("Grouped output data flow"), GroupWorker::tr("Grouped output data flow"));
187 portDescs << new PortDescriptor(outputDesc, outTypeSet, false, true);
188 }
189
190 QList<Attribute *> attrs;
191 {
192 Descriptor slotsDesc(CoreLibConstants::GROUPER_OUT_SLOTS_ATTR, GroupWorker::tr("Out slots"), GroupWorker::tr("Out slots"));
193 Attribute *slotsAttr = new GrouperOutSlotAttribute(slotsDesc, BaseTypes::STRING_TYPE(), false);
194
195 Descriptor groupDesc(CoreLibConstants::GROUPER_SLOT_ATTR, GroupWorker::tr("Group slot"), GroupWorker::tr("Group slot"));
196 Attribute *groupAttr = new GroupSlotAttribute(groupDesc, BaseTypes::STRING_TYPE(), false);
197
198 Descriptor opDesc(OPER_ATTR_ID, GroupWorker::tr("Group operation"), GroupWorker::tr("Group operation"));
199 Attribute *opAttr = new Attribute(opDesc, BaseTypes::STRING_TYPE(), true);
200
201 attrs << slotsAttr;
202 attrs << groupAttr;
203 attrs << opAttr;
204 }
205
206 Descriptor protoDesc(GroupWorkerFactory::ACTOR_ID,
207 GroupWorker::tr("Grouper"),
208 GroupWorker::tr("Groups data supplied to the specified slot by the specified property (for example, by value). Additionally, it is possible to merge data from another slots associated with the specified one."));
209
210 ActorPrototype *proto = new IntegralBusActorPrototype(protoDesc, portDescs, attrs);
211
212 proto->setEditor(new GrouperEditor());
213 proto->setPrompter(new GroupPrompter());
214 proto->setAllowsEmptyPorts(true);
215
216 WorkflowEnv::getProtoRegistry()->registerProto(BaseActorCategories::CATEGORY_DATAFLOW(), proto);
217 WorkflowEnv::getDomainRegistry()->getById(LocalDomainFactory::ID)->registerEntry(new GroupWorkerFactory());
218 }
219
createWorker(Actor * a)220 Worker *GroupWorkerFactory::createWorker(Actor *a) {
221 return new GroupWorker(a);
222 }
223
224 /************************************************************************/
225 /* GroupPrompter */
226 /************************************************************************/
composeRichDoc()227 QString GroupPrompter::composeRichDoc() {
228 QString inputName;
229 bool produceOneGroup = true;
230
231 Port *input = target->getInputPorts().first();
232 if (input->getLinks().size() > 0) {
233 Port *src = input->getLinks().keys().first();
234 IntegralBusPort *bus = dynamic_cast<IntegralBusPort *>(src);
235 assert(nullptr != bus);
236 DataTypePtr type = bus->getType();
237 QMap<Descriptor, DataTypePtr> busMap = type->getDatatypesMap();
238
239 Attribute *groupSlotAttr = target->getParameter(CoreLibConstants::GROUPER_SLOT_ATTR);
240 QString groupSlot = groupSlotAttr->getAttributeValueWithoutScript<QString>();
241 groupSlot = GrouperOutSlot::readable2busMap(groupSlot);
242
243 foreach (const Descriptor &d, busMap.keys()) {
244 if (d.getId() == groupSlot) {
245 inputName = d.getDisplayName();
246 produceOneGroup = false;
247 break;
248 }
249 }
250 }
251
252 if (produceOneGroup) {
253 return tr("Groups all incoming messages into one message.");
254 } else {
255 QString result = tr("Groups all incoming messages <u>%1</u> of <u>%2</u> slot data.");
256 Attribute *groupOpAttr = target->getParameter(CoreLibConstants::GROUPER_OPER_ATTR);
257 QString opId = groupOpAttr->getAttributeValueWithoutScript<QString>();
258
259 QString op;
260 if ("by-id" == opId) {
261 op = tr("by id");
262 } else if ("by-name" == opId) {
263 op = tr("by name");
264 } else if ("by-value" == opId) {
265 op = tr("by value");
266 }
267 return result.arg(op).arg(inputName);
268 }
269 }
270
271 } // namespace LocalWorkflow
272 } // namespace U2
273