1 /***********************************************************************************************************************************
2 IO Filter Group
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <stdio.h>
7 
8 #include "common/debug.h"
9 #include "common/io/filter/buffer.h"
10 #include "common/io/filter/filter.h"
11 #include "common/io/filter/group.h"
12 #include "common/io/io.h"
13 #include "common/log.h"
14 #include "common/memContext.h"
15 #include "common/type/list.h"
16 
17 /***********************************************************************************************************************************
18 Filter and buffer structure
19 
20 Contains the filter object and inout/output buffers.
21 ***********************************************************************************************************************************/
22 typedef struct IoFilterData
23 {
24     const Buffer **input;                                           // Input buffer for filter
25     Buffer *inputLocal;                                             // Non-null if a locally created buffer that can be cleared
26     IoFilter *filter;                                               // Filter to apply
27     Buffer *output;                                                 // Output buffer for filter
28 } IoFilterData;
29 
30 // Macros for logging
31 #define FUNCTION_LOG_IO_FILTER_DATA_TYPE                                                                                           \
32     IoFilterData *
33 #define FUNCTION_LOG_IO_FILTER_DATA_FORMAT(value, buffer, bufferSize)                                                              \
34     objToLog(value, "IoFilterData", buffer, bufferSize)
35 
36 /***********************************************************************************************************************************
37 Object type
38 ***********************************************************************************************************************************/
39 struct IoFilterGroup
40 {
41     IoFilterGroupPub pub;                                           // Publicly accessible variables
42     const Buffer *input;                                            // Input buffer passed in for processing
43     KeyValue *filterResult;                                         // Filter results (if any)
44 
45 #ifdef DEBUG
46     bool flushing;                                                  // Is output being flushed?
47 #endif
48 };
49 
50 /**********************************************************************************************************************************/
51 IoFilterGroup *
ioFilterGroupNew(void)52 ioFilterGroupNew(void)
53 {
54     FUNCTION_LOG_VOID(logLevelTrace);
55 
56     IoFilterGroup *this = NULL;
57 
58     MEM_CONTEXT_NEW_BEGIN("IoFilterGroup")
59     {
60         this = memNew(sizeof(IoFilterGroup));
61 
62         *this = (IoFilterGroup)
63         {
64             .pub =
65             {
66                 .memContext = memContextCurrent(),
67                 .done = false,
68                 .filterList = lstNewP(sizeof(IoFilterData)),
69             },
70         };
71     }
72     MEM_CONTEXT_NEW_END();
73 
74     FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
75 }
76 
77 /**********************************************************************************************************************************/
78 IoFilterGroup *
ioFilterGroupAdd(IoFilterGroup * this,IoFilter * filter)79 ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter)
80 {
81     FUNCTION_LOG_BEGIN(logLevelDebug);
82         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
83         FUNCTION_LOG_PARAM(IO_FILTER, filter);
84     FUNCTION_LOG_END();
85 
86     ASSERT(this != NULL);
87     ASSERT(!this->pub.opened && !this->pub.closed);
88     ASSERT(filter != NULL);
89 
90     // Move the filter to this object's mem context
91     ioFilterMove(filter, this->pub.memContext);
92 
93     // Add the filter
94     IoFilterData filterData = {.filter = filter};
95     lstAdd(this->pub.filterList, &filterData);
96 
97     FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
98 }
99 
100 /**********************************************************************************************************************************/
101 IoFilterGroup *
ioFilterGroupInsert(IoFilterGroup * this,unsigned int listIdx,IoFilter * filter)102 ioFilterGroupInsert(IoFilterGroup *this, unsigned int listIdx, IoFilter *filter)
103 {
104     FUNCTION_LOG_BEGIN(logLevelDebug);
105         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
106         FUNCTION_LOG_PARAM(IO_FILTER, filter);
107     FUNCTION_LOG_END();
108 
109     ASSERT(this != NULL);
110     ASSERT(!this->pub.opened && !this->pub.closed);
111     ASSERT(filter != NULL);
112 
113     // Move the filter to this object's mem context
114     ioFilterMove(filter, this->pub.memContext);
115 
116     // Add the filter
117     IoFilterData filterData = {.filter = filter};
118     lstInsert(this->pub.filterList, listIdx, &filterData);
119 
120     FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
121 }
122 
123 /***********************************************************************************************************************************
124 Get a filter
125 ***********************************************************************************************************************************/
126 static IoFilterData *
ioFilterGroupGet(const IoFilterGroup * this,unsigned int filterIdx)127 ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx)
128 {
129     FUNCTION_TEST_BEGIN();
130         FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
131         FUNCTION_TEST_PARAM(UINT, filterIdx);
132     FUNCTION_TEST_END();
133 
134     ASSERT(this != NULL);
135 
136     FUNCTION_TEST_RETURN((IoFilterData *)lstGet(this->pub.filterList, filterIdx));
137 }
138 
139 /**********************************************************************************************************************************/
140 IoFilterGroup *
ioFilterGroupClear(IoFilterGroup * this)141 ioFilterGroupClear(IoFilterGroup *this)
142 {
143     FUNCTION_LOG_BEGIN(logLevelDebug);
144         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
145     FUNCTION_LOG_END();
146 
147     ASSERT(this != NULL);
148     ASSERT(!this->pub.opened);
149 
150     for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
151         ioFilterFree(ioFilterGroupGet(this, filterIdx)->filter);
152 
153     lstClear(this->pub.filterList);
154 
155     FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
156 }
157 
158 /***********************************************************************************************************************************
159 Setup the filter group and allocate any required buffers
160 ***********************************************************************************************************************************/
161 void
ioFilterGroupOpen(IoFilterGroup * this)162 ioFilterGroupOpen(IoFilterGroup *this)
163 {
164     FUNCTION_LOG_BEGIN(logLevelTrace);
165         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
166     FUNCTION_LOG_END();
167 
168     ASSERT(this != NULL);
169 
170     MEM_CONTEXT_BEGIN(this->pub.memContext)
171     {
172         // If the last filter is not an output filter then add a filter to buffer/copy data.  Input filters won't copy to an output
173         // buffer so we need some way to get the data to the output buffer.
174         if (ioFilterGroupSize(this) == 0 ||
175             !ioFilterOutput((ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->filter))
176         {
177             ioFilterGroupAdd(this, ioBufferNew());
178         }
179 
180         // Create filter input/output buffers.  Input filters do not get an output buffer since they don't produce output.
181         Buffer **lastOutputBuffer = NULL;
182 
183         for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
184         {
185             IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
186 
187             // If there is no last output buffer yet, then use the input buffer that will be provided by the caller
188             if (lastOutputBuffer == NULL)
189             {
190                 filterData->input = &this->input;
191             }
192             // Else assign the last output buffer to the input
193             else
194             {
195                 // This cast is required because the compiler can't guarantee the const-ness of this object, i.e. it could be
196                 // modified in other parts of the code.  This is actually expected and the only reason we need this const is to
197                 // match the const-ness of the input buffer provided by the caller.
198                 filterData->input = (const Buffer **)lastOutputBuffer;
199                 filterData->inputLocal = *lastOutputBuffer;
200             }
201 
202             // If this is not the last output filter then create a new output buffer for it.  The output buffer for the last filter
203             // will be provided to the process function.
204             if (ioFilterOutput(filterData->filter) && filterIdx < ioFilterGroupSize(this) - 1)
205             {
206                 filterData->output = bufNew(ioBufferSize());
207                 lastOutputBuffer = &filterData->output;
208             }
209         }
210     }
211     MEM_CONTEXT_END();
212 
213     // Filter group is open
214 #ifdef DEBUG
215     this->pub.opened = true;
216 #endif
217 
218     FUNCTION_LOG_RETURN_VOID();
219 }
220 
221 /**********************************************************************************************************************************/
222 void
ioFilterGroupProcess(IoFilterGroup * this,const Buffer * input,Buffer * output)223 ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
224 {
225     FUNCTION_LOG_BEGIN(logLevelTrace);
226         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
227         FUNCTION_LOG_PARAM(BUFFER, input);
228         FUNCTION_LOG_PARAM(BUFFER, output);
229     FUNCTION_LOG_END();
230 
231     ASSERT(this != NULL);
232     ASSERT(this->pub.opened && !this->pub.closed);
233     ASSERT(input == NULL || !bufEmpty(input));
234     ASSERT(!this->flushing || input == NULL);
235     ASSERT(output != NULL);
236     ASSERT(bufRemains(output) > 0);
237 
238     // Once input goes to NULL then flushing has started
239 #ifdef DEBUG
240     if (input == NULL)
241         this->flushing = true;
242 #endif
243 
244     // Assign input and output buffers
245     this->input = input;
246     (ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->output = output;
247 
248     //
249     do
250     {
251         // Start from the first filter by default
252         unsigned int filterIdx = 0;
253 
254         // Search from the end of the list for a filter that needs the same input.  This indicates that the filter was not able to
255         // empty the input buffer on the last call.  Maybe it won't this time either -- we can but try.
256         if (ioFilterGroupInputSame(this))
257         {
258             this->pub.inputSame = false;
259             filterIdx = ioFilterGroupSize(this);
260 
261             do
262             {
263                 filterIdx--;
264 
265                 if (ioFilterInputSame((ioFilterGroupGet(this, filterIdx))->filter))
266                 {
267                     this->pub.inputSame = true;
268                     break;
269                 }
270             }
271             while (filterIdx != 0);
272 
273             // If no filter is found that needs the same input that means we are done with the current input.  So end the loop and
274             // get some more input.
275             if (!ioFilterGroupInputSame(this))
276                 break;
277         }
278 
279         // Process forward from the filter that has input to process.  This may be a filter that needs the same input or it may be
280         // new input for the first filter.
281         for (; filterIdx < ioFilterGroupSize(this); filterIdx++)
282         {
283             IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
284 
285             // Process the filter if it is not done
286             if (!ioFilterDone(filterData->filter))
287             {
288                 // If the filter produces output
289                 if (ioFilterOutput(filterData->filter))
290                 {
291                     ioFilterProcessInOut(filterData->filter, *filterData->input, filterData->output);
292 
293                     // If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with
294                     // the same input once the output buffer is cleared
295                     if (ioFilterInputSame(filterData->filter))
296                     {
297                         this->pub.inputSame = true;
298                     }
299                     // Else clear the buffer if it was locally allocated.  If the input buffer was passed in then the caller is
300                     // responsible for clearing it.
301                     else if (filterData->inputLocal != NULL)
302                         bufUsedZero(filterData->inputLocal);
303 
304                     // If the output buffer is not full and the filter is not done then more data is required
305                     if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter))
306                         break;
307                 }
308                 // Else the filter does not produce output
309                 else
310                     ioFilterProcessIn(filterData->filter, *filterData->input);
311             }
312 
313             // If the filter is done and has no more output then null the output buffer.  Downstream filters have a pointer to this
314             // buffer so their inputs will also change to null and they'll flush.
315             if (filterData->output != NULL && ioFilterDone(filterData->filter) && bufUsed(filterData->output) == 0)
316                 filterData->output = NULL;
317         }
318     }
319     while (!bufFull(output) && ioFilterGroupInputSame(this));
320 
321     // Scan the filter list to determine if inputSame is set or done is not set for any filter. We can't trust
322     // ioFilterGroupInputSame() when it is true without going through the loop above again. We need to scan to set this->pub.done
323     // anyway so set this->pub.inputSame in the same loop.
324     this->pub.done = true;
325     this->pub.inputSame = false;
326 
327     for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
328     {
329         IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
330 
331         // When inputSame then this->pub.done = false and we can exit the loop immediately
332         if (ioFilterInputSame(filterData->filter))
333         {
334             this->pub.done = false;
335             this->pub.inputSame = true;
336             break;
337         }
338 
339         // Set this->pub.done = false if any filter is not done
340         if (!ioFilterDone(filterData->filter))
341             this->pub.done = false;
342     }
343 
344     FUNCTION_LOG_RETURN_VOID();
345 }
346 
347 /**********************************************************************************************************************************/
348 void
ioFilterGroupClose(IoFilterGroup * this)349 ioFilterGroupClose(IoFilterGroup *this)
350 {
351     FUNCTION_LOG_BEGIN(logLevelTrace);
352         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
353     FUNCTION_LOG_END();
354 
355     ASSERT(this != NULL);
356     ASSERT(this->pub.opened && !this->pub.closed);
357 
358     for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
359     {
360         IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
361         const Variant *filterResult = ioFilterResult(filterData->filter);
362 
363         if (this->filterResult == NULL)
364         {
365             MEM_CONTEXT_BEGIN(this->pub.memContext)
366             {
367                 this->filterResult = kvNew();
368             }
369             MEM_CONTEXT_END();
370         }
371 
372         MEM_CONTEXT_TEMP_BEGIN()
373         {
374             kvAdd(this->filterResult, VARSTR(ioFilterType(filterData->filter)), filterResult);
375         }
376         MEM_CONTEXT_TEMP_END();
377     }
378 
379     // Filter group is open
380 #ifdef DEBUG
381     this->pub.closed = true;
382 #endif
383 
384     FUNCTION_LOG_RETURN_VOID();
385 }
386 
387 /**********************************************************************************************************************************/
388 Variant *
ioFilterGroupParamAll(const IoFilterGroup * this)389 ioFilterGroupParamAll(const IoFilterGroup *this)
390 {
391     FUNCTION_LOG_BEGIN(logLevelDebug);
392         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
393     FUNCTION_LOG_END();
394 
395     ASSERT(this != NULL);
396     ASSERT(!this->pub.opened);
397     ASSERT(this->pub.filterList != NULL);
398 
399     VariantList *result = varLstNew();
400 
401     for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
402     {
403         IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
404         const VariantList *paramList = ioFilterParamList(filter);
405 
406         KeyValue *filterParam = kvNew();
407         kvPut(filterParam, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
408 
409         varLstAdd(result, varNewKv(filterParam));
410     }
411 
412     FUNCTION_LOG_RETURN(VARIANT, varNewVarLst(result));
413 }
414 
415 /**********************************************************************************************************************************/
416 const Variant *
ioFilterGroupResult(const IoFilterGroup * this,const String * filterType)417 ioFilterGroupResult(const IoFilterGroup *this, const String *filterType)
418 {
419     FUNCTION_LOG_BEGIN(logLevelDebug);
420         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
421         FUNCTION_LOG_PARAM(STRING, filterType);
422     FUNCTION_LOG_END();
423 
424     ASSERT(this->pub.opened);
425     ASSERT(filterType != NULL);
426 
427     const Variant *result = NULL;
428 
429     MEM_CONTEXT_TEMP_BEGIN()
430     {
431         result = kvGet(this->filterResult, VARSTR(filterType));
432     }
433     MEM_CONTEXT_TEMP_END();
434 
435     FUNCTION_LOG_RETURN_CONST(VARIANT, result);
436 }
437 
438 /**********************************************************************************************************************************/
439 const Variant *
ioFilterGroupResultAll(const IoFilterGroup * this)440 ioFilterGroupResultAll(const IoFilterGroup *this)
441 {
442     FUNCTION_LOG_BEGIN(logLevelDebug);
443         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
444     FUNCTION_LOG_END();
445 
446     ASSERT(this != NULL);
447     ASSERT(this->pub.closed);
448 
449     FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult));
450 }
451 
452 /**********************************************************************************************************************************/
453 void
ioFilterGroupResultAllSet(IoFilterGroup * this,const Variant * filterResult)454 ioFilterGroupResultAllSet(IoFilterGroup *this, const Variant *filterResult)
455 {
456     FUNCTION_LOG_BEGIN(logLevelDebug);
457         FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
458     FUNCTION_LOG_END();
459 
460     ASSERT(this != NULL);
461 
462     if (filterResult != NULL)
463     {
464         MEM_CONTEXT_BEGIN(this->pub.memContext)
465         {
466             this->filterResult = kvDup(varKv(filterResult));
467         }
468         MEM_CONTEXT_END();
469     }
470 
471     FUNCTION_LOG_RETURN_VOID();
472 }
473 
474 /**********************************************************************************************************************************/
475 String *
ioFilterGroupToLog(const IoFilterGroup * this)476 ioFilterGroupToLog(const IoFilterGroup *this)
477 {
478     return strNewFmt(
479         "{inputSame: %s, done: %s"
480 #ifdef DEBUG
481             ", opened %s, flushing %s, closed %s"
482 #endif
483             "}",
484         cvtBoolToConstZ(this->pub.inputSame), cvtBoolToConstZ(this->pub.done)
485 #ifdef DEBUG
486         , cvtBoolToConstZ(this->pub.opened), cvtBoolToConstZ(this->flushing), cvtBoolToConstZ(this->pub.closed)
487 #endif
488     );
489 }
490