1 /***********************************************************************************************************************************
2 IO Filter Group
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #include <stdio.h>
7
8 #include "common/debug.h"
9 #include "common/io/filter/buffer.h"
10 #include "common/io/filter/filter.h"
11 #include "common/io/filter/group.h"
12 #include "common/io/io.h"
13 #include "common/log.h"
14 #include "common/memContext.h"
15 #include "common/type/list.h"
16
17 /***********************************************************************************************************************************
18 Filter and buffer structure
19
20 Contains the filter object and inout/output buffers.
21 ***********************************************************************************************************************************/
22 typedef struct IoFilterData
23 {
24 const Buffer **input; // Input buffer for filter
25 Buffer *inputLocal; // Non-null if a locally created buffer that can be cleared
26 IoFilter *filter; // Filter to apply
27 Buffer *output; // Output buffer for filter
28 } IoFilterData;
29
30 // Macros for logging
31 #define FUNCTION_LOG_IO_FILTER_DATA_TYPE \
32 IoFilterData *
33 #define FUNCTION_LOG_IO_FILTER_DATA_FORMAT(value, buffer, bufferSize) \
34 objToLog(value, "IoFilterData", buffer, bufferSize)
35
36 /***********************************************************************************************************************************
37 Object type
38 ***********************************************************************************************************************************/
39 struct IoFilterGroup
40 {
41 IoFilterGroupPub pub; // Publicly accessible variables
42 const Buffer *input; // Input buffer passed in for processing
43 KeyValue *filterResult; // Filter results (if any)
44
45 #ifdef DEBUG
46 bool flushing; // Is output being flushed?
47 #endif
48 };
49
50 /**********************************************************************************************************************************/
51 IoFilterGroup *
ioFilterGroupNew(void)52 ioFilterGroupNew(void)
53 {
54 FUNCTION_LOG_VOID(logLevelTrace);
55
56 IoFilterGroup *this = NULL;
57
58 MEM_CONTEXT_NEW_BEGIN("IoFilterGroup")
59 {
60 this = memNew(sizeof(IoFilterGroup));
61
62 *this = (IoFilterGroup)
63 {
64 .pub =
65 {
66 .memContext = memContextCurrent(),
67 .done = false,
68 .filterList = lstNewP(sizeof(IoFilterData)),
69 },
70 };
71 }
72 MEM_CONTEXT_NEW_END();
73
74 FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
75 }
76
77 /**********************************************************************************************************************************/
78 IoFilterGroup *
ioFilterGroupAdd(IoFilterGroup * this,IoFilter * filter)79 ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter)
80 {
81 FUNCTION_LOG_BEGIN(logLevelDebug);
82 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
83 FUNCTION_LOG_PARAM(IO_FILTER, filter);
84 FUNCTION_LOG_END();
85
86 ASSERT(this != NULL);
87 ASSERT(!this->pub.opened && !this->pub.closed);
88 ASSERT(filter != NULL);
89
90 // Move the filter to this object's mem context
91 ioFilterMove(filter, this->pub.memContext);
92
93 // Add the filter
94 IoFilterData filterData = {.filter = filter};
95 lstAdd(this->pub.filterList, &filterData);
96
97 FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
98 }
99
100 /**********************************************************************************************************************************/
101 IoFilterGroup *
ioFilterGroupInsert(IoFilterGroup * this,unsigned int listIdx,IoFilter * filter)102 ioFilterGroupInsert(IoFilterGroup *this, unsigned int listIdx, IoFilter *filter)
103 {
104 FUNCTION_LOG_BEGIN(logLevelDebug);
105 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
106 FUNCTION_LOG_PARAM(IO_FILTER, filter);
107 FUNCTION_LOG_END();
108
109 ASSERT(this != NULL);
110 ASSERT(!this->pub.opened && !this->pub.closed);
111 ASSERT(filter != NULL);
112
113 // Move the filter to this object's mem context
114 ioFilterMove(filter, this->pub.memContext);
115
116 // Add the filter
117 IoFilterData filterData = {.filter = filter};
118 lstInsert(this->pub.filterList, listIdx, &filterData);
119
120 FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
121 }
122
123 /***********************************************************************************************************************************
124 Get a filter
125 ***********************************************************************************************************************************/
126 static IoFilterData *
ioFilterGroupGet(const IoFilterGroup * this,unsigned int filterIdx)127 ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx)
128 {
129 FUNCTION_TEST_BEGIN();
130 FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
131 FUNCTION_TEST_PARAM(UINT, filterIdx);
132 FUNCTION_TEST_END();
133
134 ASSERT(this != NULL);
135
136 FUNCTION_TEST_RETURN((IoFilterData *)lstGet(this->pub.filterList, filterIdx));
137 }
138
139 /**********************************************************************************************************************************/
140 IoFilterGroup *
ioFilterGroupClear(IoFilterGroup * this)141 ioFilterGroupClear(IoFilterGroup *this)
142 {
143 FUNCTION_LOG_BEGIN(logLevelDebug);
144 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
145 FUNCTION_LOG_END();
146
147 ASSERT(this != NULL);
148 ASSERT(!this->pub.opened);
149
150 for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
151 ioFilterFree(ioFilterGroupGet(this, filterIdx)->filter);
152
153 lstClear(this->pub.filterList);
154
155 FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
156 }
157
158 /***********************************************************************************************************************************
159 Setup the filter group and allocate any required buffers
160 ***********************************************************************************************************************************/
161 void
ioFilterGroupOpen(IoFilterGroup * this)162 ioFilterGroupOpen(IoFilterGroup *this)
163 {
164 FUNCTION_LOG_BEGIN(logLevelTrace);
165 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
166 FUNCTION_LOG_END();
167
168 ASSERT(this != NULL);
169
170 MEM_CONTEXT_BEGIN(this->pub.memContext)
171 {
172 // If the last filter is not an output filter then add a filter to buffer/copy data. Input filters won't copy to an output
173 // buffer so we need some way to get the data to the output buffer.
174 if (ioFilterGroupSize(this) == 0 ||
175 !ioFilterOutput((ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->filter))
176 {
177 ioFilterGroupAdd(this, ioBufferNew());
178 }
179
180 // Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output.
181 Buffer **lastOutputBuffer = NULL;
182
183 for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
184 {
185 IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
186
187 // If there is no last output buffer yet, then use the input buffer that will be provided by the caller
188 if (lastOutputBuffer == NULL)
189 {
190 filterData->input = &this->input;
191 }
192 // Else assign the last output buffer to the input
193 else
194 {
195 // This cast is required because the compiler can't guarantee the const-ness of this object, i.e. it could be
196 // modified in other parts of the code. This is actually expected and the only reason we need this const is to
197 // match the const-ness of the input buffer provided by the caller.
198 filterData->input = (const Buffer **)lastOutputBuffer;
199 filterData->inputLocal = *lastOutputBuffer;
200 }
201
202 // If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter
203 // will be provided to the process function.
204 if (ioFilterOutput(filterData->filter) && filterIdx < ioFilterGroupSize(this) - 1)
205 {
206 filterData->output = bufNew(ioBufferSize());
207 lastOutputBuffer = &filterData->output;
208 }
209 }
210 }
211 MEM_CONTEXT_END();
212
213 // Filter group is open
214 #ifdef DEBUG
215 this->pub.opened = true;
216 #endif
217
218 FUNCTION_LOG_RETURN_VOID();
219 }
220
221 /**********************************************************************************************************************************/
222 void
ioFilterGroupProcess(IoFilterGroup * this,const Buffer * input,Buffer * output)223 ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
224 {
225 FUNCTION_LOG_BEGIN(logLevelTrace);
226 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
227 FUNCTION_LOG_PARAM(BUFFER, input);
228 FUNCTION_LOG_PARAM(BUFFER, output);
229 FUNCTION_LOG_END();
230
231 ASSERT(this != NULL);
232 ASSERT(this->pub.opened && !this->pub.closed);
233 ASSERT(input == NULL || !bufEmpty(input));
234 ASSERT(!this->flushing || input == NULL);
235 ASSERT(output != NULL);
236 ASSERT(bufRemains(output) > 0);
237
238 // Once input goes to NULL then flushing has started
239 #ifdef DEBUG
240 if (input == NULL)
241 this->flushing = true;
242 #endif
243
244 // Assign input and output buffers
245 this->input = input;
246 (ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->output = output;
247
248 //
249 do
250 {
251 // Start from the first filter by default
252 unsigned int filterIdx = 0;
253
254 // Search from the end of the list for a filter that needs the same input. This indicates that the filter was not able to
255 // empty the input buffer on the last call. Maybe it won't this time either -- we can but try.
256 if (ioFilterGroupInputSame(this))
257 {
258 this->pub.inputSame = false;
259 filterIdx = ioFilterGroupSize(this);
260
261 do
262 {
263 filterIdx--;
264
265 if (ioFilterInputSame((ioFilterGroupGet(this, filterIdx))->filter))
266 {
267 this->pub.inputSame = true;
268 break;
269 }
270 }
271 while (filterIdx != 0);
272
273 // If no filter is found that needs the same input that means we are done with the current input. So end the loop and
274 // get some more input.
275 if (!ioFilterGroupInputSame(this))
276 break;
277 }
278
279 // Process forward from the filter that has input to process. This may be a filter that needs the same input or it may be
280 // new input for the first filter.
281 for (; filterIdx < ioFilterGroupSize(this); filterIdx++)
282 {
283 IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
284
285 // Process the filter if it is not done
286 if (!ioFilterDone(filterData->filter))
287 {
288 // If the filter produces output
289 if (ioFilterOutput(filterData->filter))
290 {
291 ioFilterProcessInOut(filterData->filter, *filterData->input, filterData->output);
292
293 // If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with
294 // the same input once the output buffer is cleared
295 if (ioFilterInputSame(filterData->filter))
296 {
297 this->pub.inputSame = true;
298 }
299 // Else clear the buffer if it was locally allocated. If the input buffer was passed in then the caller is
300 // responsible for clearing it.
301 else if (filterData->inputLocal != NULL)
302 bufUsedZero(filterData->inputLocal);
303
304 // If the output buffer is not full and the filter is not done then more data is required
305 if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter))
306 break;
307 }
308 // Else the filter does not produce output
309 else
310 ioFilterProcessIn(filterData->filter, *filterData->input);
311 }
312
313 // If the filter is done and has no more output then null the output buffer. Downstream filters have a pointer to this
314 // buffer so their inputs will also change to null and they'll flush.
315 if (filterData->output != NULL && ioFilterDone(filterData->filter) && bufUsed(filterData->output) == 0)
316 filterData->output = NULL;
317 }
318 }
319 while (!bufFull(output) && ioFilterGroupInputSame(this));
320
321 // Scan the filter list to determine if inputSame is set or done is not set for any filter. We can't trust
322 // ioFilterGroupInputSame() when it is true without going through the loop above again. We need to scan to set this->pub.done
323 // anyway so set this->pub.inputSame in the same loop.
324 this->pub.done = true;
325 this->pub.inputSame = false;
326
327 for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
328 {
329 IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
330
331 // When inputSame then this->pub.done = false and we can exit the loop immediately
332 if (ioFilterInputSame(filterData->filter))
333 {
334 this->pub.done = false;
335 this->pub.inputSame = true;
336 break;
337 }
338
339 // Set this->pub.done = false if any filter is not done
340 if (!ioFilterDone(filterData->filter))
341 this->pub.done = false;
342 }
343
344 FUNCTION_LOG_RETURN_VOID();
345 }
346
347 /**********************************************************************************************************************************/
348 void
ioFilterGroupClose(IoFilterGroup * this)349 ioFilterGroupClose(IoFilterGroup *this)
350 {
351 FUNCTION_LOG_BEGIN(logLevelTrace);
352 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
353 FUNCTION_LOG_END();
354
355 ASSERT(this != NULL);
356 ASSERT(this->pub.opened && !this->pub.closed);
357
358 for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
359 {
360 IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
361 const Variant *filterResult = ioFilterResult(filterData->filter);
362
363 if (this->filterResult == NULL)
364 {
365 MEM_CONTEXT_BEGIN(this->pub.memContext)
366 {
367 this->filterResult = kvNew();
368 }
369 MEM_CONTEXT_END();
370 }
371
372 MEM_CONTEXT_TEMP_BEGIN()
373 {
374 kvAdd(this->filterResult, VARSTR(ioFilterType(filterData->filter)), filterResult);
375 }
376 MEM_CONTEXT_TEMP_END();
377 }
378
379 // Filter group is open
380 #ifdef DEBUG
381 this->pub.closed = true;
382 #endif
383
384 FUNCTION_LOG_RETURN_VOID();
385 }
386
387 /**********************************************************************************************************************************/
388 Variant *
ioFilterGroupParamAll(const IoFilterGroup * this)389 ioFilterGroupParamAll(const IoFilterGroup *this)
390 {
391 FUNCTION_LOG_BEGIN(logLevelDebug);
392 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
393 FUNCTION_LOG_END();
394
395 ASSERT(this != NULL);
396 ASSERT(!this->pub.opened);
397 ASSERT(this->pub.filterList != NULL);
398
399 VariantList *result = varLstNew();
400
401 for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
402 {
403 IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
404 const VariantList *paramList = ioFilterParamList(filter);
405
406 KeyValue *filterParam = kvNew();
407 kvPut(filterParam, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
408
409 varLstAdd(result, varNewKv(filterParam));
410 }
411
412 FUNCTION_LOG_RETURN(VARIANT, varNewVarLst(result));
413 }
414
415 /**********************************************************************************************************************************/
416 const Variant *
ioFilterGroupResult(const IoFilterGroup * this,const String * filterType)417 ioFilterGroupResult(const IoFilterGroup *this, const String *filterType)
418 {
419 FUNCTION_LOG_BEGIN(logLevelDebug);
420 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
421 FUNCTION_LOG_PARAM(STRING, filterType);
422 FUNCTION_LOG_END();
423
424 ASSERT(this->pub.opened);
425 ASSERT(filterType != NULL);
426
427 const Variant *result = NULL;
428
429 MEM_CONTEXT_TEMP_BEGIN()
430 {
431 result = kvGet(this->filterResult, VARSTR(filterType));
432 }
433 MEM_CONTEXT_TEMP_END();
434
435 FUNCTION_LOG_RETURN_CONST(VARIANT, result);
436 }
437
438 /**********************************************************************************************************************************/
439 const Variant *
ioFilterGroupResultAll(const IoFilterGroup * this)440 ioFilterGroupResultAll(const IoFilterGroup *this)
441 {
442 FUNCTION_LOG_BEGIN(logLevelDebug);
443 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
444 FUNCTION_LOG_END();
445
446 ASSERT(this != NULL);
447 ASSERT(this->pub.closed);
448
449 FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult));
450 }
451
452 /**********************************************************************************************************************************/
453 void
ioFilterGroupResultAllSet(IoFilterGroup * this,const Variant * filterResult)454 ioFilterGroupResultAllSet(IoFilterGroup *this, const Variant *filterResult)
455 {
456 FUNCTION_LOG_BEGIN(logLevelDebug);
457 FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
458 FUNCTION_LOG_END();
459
460 ASSERT(this != NULL);
461
462 if (filterResult != NULL)
463 {
464 MEM_CONTEXT_BEGIN(this->pub.memContext)
465 {
466 this->filterResult = kvDup(varKv(filterResult));
467 }
468 MEM_CONTEXT_END();
469 }
470
471 FUNCTION_LOG_RETURN_VOID();
472 }
473
474 /**********************************************************************************************************************************/
475 String *
ioFilterGroupToLog(const IoFilterGroup * this)476 ioFilterGroupToLog(const IoFilterGroup *this)
477 {
478 return strNewFmt(
479 "{inputSame: %s, done: %s"
480 #ifdef DEBUG
481 ", opened %s, flushing %s, closed %s"
482 #endif
483 "}",
484 cvtBoolToConstZ(this->pub.inputSame), cvtBoolToConstZ(this->pub.done)
485 #ifdef DEBUG
486 , cvtBoolToConstZ(this->pub.opened), cvtBoolToConstZ(this->flushing), cvtBoolToConstZ(this->pub.closed)
487 #endif
488 );
489 }
490