1 /***********************************************************************************************************************************
2 IO Read Interface
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <string.h>
7 
8 #include "common/debug.h"
9 #include "common/io/io.h"
10 #include "common/io/read.h"
11 #include "common/log.h"
12 #include "common/memContext.h"
13 
14 /***********************************************************************************************************************************
15 Object type
16 ***********************************************************************************************************************************/
17 struct IoRead
18 {
19     IoReadPub pub;                                                  // Publicly accessible variables
20     Buffer *input;                                                  // Input buffer
21     Buffer *output;                                                 // Internal output buffer (extra output from buffered reads)
22     size_t outputPos;                                               // Current position in the internal output buffer
23 };
24 
25 /**********************************************************************************************************************************/
26 IoRead *
ioReadNew(void * driver,IoReadInterface interface)27 ioReadNew(void *driver, IoReadInterface interface)
28 {
29     FUNCTION_LOG_BEGIN(logLevelTrace);
30         FUNCTION_LOG_PARAM_P(VOID, driver);
31         FUNCTION_LOG_PARAM(IO_READ_INTERFACE, interface);
32     FUNCTION_LOG_END();
33 
34     ASSERT(driver != NULL);
35     ASSERT(interface.read != NULL);
36 
37     IoRead *this = NULL;
38 
39     MEM_CONTEXT_NEW_BEGIN("IoRead")
40     {
41         this = memNew(sizeof(IoRead));
42 
43         *this = (IoRead)
44         {
45             .pub =
46             {
47                 .memContext = memContextCurrent(),
48                 .driver = driver,
49                 .interface = interface,
50                 .filterGroup = ioFilterGroupNew(),
51             },
52             .input = bufNew(ioBufferSize()),
53         };
54     }
55     MEM_CONTEXT_NEW_END();
56 
57     FUNCTION_LOG_RETURN(IO_READ, this);
58 }
59 
60 /**********************************************************************************************************************************/
61 bool
ioReadOpen(IoRead * this)62 ioReadOpen(IoRead *this)
63 {
64     FUNCTION_LOG_BEGIN(logLevelTrace);
65         FUNCTION_LOG_PARAM(IO_READ, this);
66     FUNCTION_LOG_END();
67 
68     ASSERT(this != NULL);
69     ASSERT(!this->pub.opened && !this->pub.closed);
70     ASSERT(ioFilterGroupSize(this->pub.filterGroup) == 0 || !ioReadBlock(this));
71 
72     // Open if the driver has an open function
73     bool result = this->pub.interface.open != NULL ? this->pub.interface.open(this->pub.driver) : true;
74 
75     // Only open the filter group if the read was opened
76     if (result)
77         ioFilterGroupOpen(this->pub.filterGroup);
78 
79 #ifdef DEBUG
80     this->pub.opened = result;
81 #endif
82 
83     FUNCTION_LOG_RETURN(BOOL, result);
84 }
85 
86 /***********************************************************************************************************************************
87 Is the driver at EOF?
88 
89 This is different from the overall eof because filters may still be holding buffered data.
90 ***********************************************************************************************************************************/
91 static bool
ioReadEofDriver(const IoRead * this)92 ioReadEofDriver(const IoRead *this)
93 {
94     FUNCTION_LOG_BEGIN(logLevelTrace);
95         FUNCTION_LOG_PARAM(IO_READ, this);
96     FUNCTION_LOG_END();
97 
98     ASSERT(this != NULL);
99     ASSERT(this->pub.opened && !this->pub.closed);
100 
101     FUNCTION_LOG_RETURN(BOOL, this->pub.interface.eof != NULL ? this->pub.interface.eof(this->pub.driver) : false);
102 }
103 
104 /**********************************************************************************************************************************/
105 static void
ioReadInternal(IoRead * this,Buffer * buffer,bool block)106 ioReadInternal(IoRead *this, Buffer *buffer, bool block)
107 {
108     FUNCTION_LOG_BEGIN(logLevelTrace);
109         FUNCTION_LOG_PARAM(IO_READ, this);
110         FUNCTION_LOG_PARAM(BUFFER, buffer);
111         FUNCTION_LOG_PARAM(BOOL, block);
112     FUNCTION_LOG_END();
113 
114     ASSERT(this != NULL);
115     ASSERT(buffer != NULL);
116     ASSERT(this->pub.opened && !this->pub.closed);
117 
118     // Loop until EOF or the output buffer is full
119     size_t bufferUsedBegin = bufUsed(buffer);
120 
121     while (!ioReadEof(this) && bufRemains(buffer) > 0)
122     {
123         // Process input buffer again to get more output
124         if (ioFilterGroupInputSame(this->pub.filterGroup))
125         {
126             ioFilterGroupProcess(this->pub.filterGroup, this->input, buffer);
127         }
128         // Else new input can be accepted
129         else
130         {
131             // Read if not EOF
132             if (this->input != NULL)
133             {
134                 if (!ioReadEofDriver(this))
135                 {
136                     bufUsedZero(this->input);
137 
138                     // If blocking then limit the amount of data requested
139                     if (ioReadBlock(this) && bufRemains(this->input) > bufRemains(buffer))
140                         bufLimitSet(this->input, bufRemains(buffer));
141 
142                     this->pub.interface.read(this->pub.driver, this->input, block);
143                     bufLimitClear(this->input);
144                 }
145                 // Set input to NULL and flush (no need to actually free the buffer here as it will be freed with the mem context)
146                 else
147                     this->input = NULL;
148             }
149 
150             // Process the input buffer (or flush if NULL)
151             if (this->input == NULL || !bufEmpty(this->input))
152                 ioFilterGroupProcess(this->pub.filterGroup, this->input, buffer);
153 
154             // Stop if not blocking -- we don't need to fill the buffer as long as we got some data
155             if (!block && bufUsed(buffer) > bufferUsedBegin)
156                 break;
157         }
158 
159         // Eof when no more input and the filter group is done
160         this->pub.eofAll = ioReadEofDriver(this) && ioFilterGroupDone(this->pub.filterGroup);
161     }
162 
163     FUNCTION_LOG_RETURN_VOID();
164 }
165 
166 /***********************************************************************************************************************************
167 Read data and use buffered line read output when present
168 ***********************************************************************************************************************************/
169 size_t
ioRead(IoRead * this,Buffer * buffer)170 ioRead(IoRead *this, Buffer *buffer)
171 {
172     FUNCTION_LOG_BEGIN(logLevelTrace);
173         FUNCTION_LOG_PARAM(IO_READ, this);
174         FUNCTION_LOG_PARAM(BUFFER, buffer);
175         FUNCTION_LOG_PARAM(BUFFER, this->output);
176     FUNCTION_LOG_END();
177 
178     ASSERT(this != NULL);
179     ASSERT(buffer != NULL);
180     ASSERT(this->pub.opened && !this->pub.closed);
181 
182     // Store size of remaining portion of buffer to calculate total read at the end
183     size_t outputRemains = bufRemains(buffer);
184 
185     // Copy any data in the internal output buffer
186     if (this->output != NULL && bufUsed(this->output) - this->outputPos > 0 && bufRemains(buffer) > 0)
187     {
188         // Internal output buffer remains taking into account the position
189         size_t outputInternalRemains = bufUsed(this->output) - this->outputPos;
190 
191         // Determine how much data should be copied
192         size_t size = outputInternalRemains > bufRemains(buffer) ? bufRemains(buffer) : outputInternalRemains;
193 
194         // Copy data to the output buffer
195         bufCatSub(buffer, this->output, this->outputPos, size);
196         this->outputPos += size;
197     }
198 
199     // Read data
200     ioReadInternal(this, buffer, true);
201 
202     FUNCTION_LOG_RETURN(SIZE, outputRemains - bufRemains(buffer));
203 }
204 
205 /**********************************************************************************************************************************/
206 size_t
ioReadSmall(IoRead * this,Buffer * buffer)207 ioReadSmall(IoRead *this, Buffer *buffer)
208 {
209     FUNCTION_TEST_BEGIN();
210         FUNCTION_TEST_PARAM(IO_READ, this);
211         FUNCTION_TEST_PARAM(BUFFER, buffer);
212     FUNCTION_TEST_END();
213 
214     ASSERT(this != NULL);
215     ASSERT(buffer != NULL);
216     ASSERT(this->pub.opened && !this->pub.closed);
217 
218     // Allocate the internal output buffer if it has not already been allocated
219     if (this->output == NULL)
220     {
221         MEM_CONTEXT_BEGIN(this->pub.memContext)
222         {
223             this->output = bufNew(ioBufferSize());
224         }
225         MEM_CONTEXT_END();
226     }
227 
228     // Store size of remaining portion of buffer to calculate total read at the end
229     size_t outputRemains = bufRemains(buffer);
230 
231     do
232     {
233         // Internal output buffer remains taking into account the position
234         size_t outputInternalRemains = bufUsed(this->output) - this->outputPos;
235 
236         // Use any data in the internal output buffer
237         if (outputInternalRemains > 0)
238         {
239             // Determine how much data should be copied
240             size_t size = outputInternalRemains > bufRemains(buffer) ? bufRemains(buffer) : outputInternalRemains;
241 
242             // Copy data to the output buffer
243             bufCatSub(buffer, this->output, this->outputPos, size);
244             this->outputPos += size;
245         }
246 
247         // If more data is required
248         if (!bufFull(buffer))
249         {
250             // If the data required is the same size as the internal output buffer then just read into the external buffer
251             if (bufRemains(buffer) >= bufSize(this->output))
252             {
253                 ioReadInternal(this, buffer, true);
254             }
255             // Else read as much data as is available. If it is not enough we will try again later.
256             else
257             {
258                 // Clear the internal output buffer since all data was copied already
259                 bufUsedZero(this->output);
260                 this->outputPos = 0;
261 
262                 ioReadInternal(this, this->output, false);
263             }
264         }
265     }
266     while (!bufFull(buffer) && !ioReadEof(this));
267 
268     FUNCTION_TEST_RETURN(outputRemains - bufRemains(buffer));
269 }
270 
271 /***********************************************************************************************************************************
272 The entire string to search for must fit within a single buffer.
273 ***********************************************************************************************************************************/
274 String *
ioReadLineParam(IoRead * this,bool allowEof)275 ioReadLineParam(IoRead *this, bool allowEof)
276 {
277     FUNCTION_LOG_BEGIN(logLevelTrace);
278         FUNCTION_LOG_PARAM(IO_READ, this);
279         FUNCTION_LOG_PARAM(BOOL, allowEof);
280     FUNCTION_LOG_END();
281 
282     ASSERT(this != NULL);
283     ASSERT(this->pub.opened && !this->pub.closed);
284 
285     // Allocate the output buffer if it has not already been allocated.  This buffer is not allocated at object creation because it
286     // is not always used.
287     if (this->output == NULL)
288     {
289         MEM_CONTEXT_BEGIN(this->pub.memContext)
290         {
291             this->output = bufNew(ioBufferSize());
292         }
293         MEM_CONTEXT_END();
294     }
295 
296     // Search for a linefeed
297     String *result = NULL;
298 
299     do
300     {
301         // Internal output buffer remains taking into account the position
302         size_t outputInternalRemains = bufUsed(this->output) - this->outputPos;
303 
304         if (outputInternalRemains > 0)
305         {
306             // Internal output buffer pointer taking into account the position
307             char *outputPtr = (char *)bufPtr(this->output) + this->outputPos;
308 
309             // Search for a linefeed in the buffer
310             char *linefeed = memchr(outputPtr, '\n', outputInternalRemains);
311 
312             // A linefeed was found so get the string
313             if (linefeed != NULL)
314             {
315                 // Get the string size
316                 size_t size = (size_t)(linefeed - outputPtr);
317 
318                 // Create the string
319                 result = strNewN(outputPtr, size);
320                 this->outputPos += size + 1;
321             }
322         }
323 
324         // Read data if no linefeed was found in the existing buffer
325         if (result == NULL)
326         {
327             // If there is remaining data left in the internal output buffer then trim off the used data
328             if (outputInternalRemains > 0)
329             {
330                 memmove(
331                     bufPtr(this->output), bufPtr(this->output) + (bufUsed(this->output) - outputInternalRemains),
332                     outputInternalRemains);
333             }
334 
335             // Set used bytes and reset position
336             bufUsedSet(this->output, outputInternalRemains);
337             this->outputPos = 0;
338 
339             // If the buffer is full then the linefeed (if it exists) is outside the buffer
340             if (bufFull(this->output))
341                 THROW_FMT(FileReadError, "unable to find line in %zu byte buffer", bufUsed(this->output));
342 
343             if (ioReadEof(this))
344             {
345                 if (allowEof)
346                     result = strNewN((char *)bufPtr(this->output), bufUsed(this->output));
347                 else
348                     THROW(FileReadError, "unexpected eof while reading line");
349             }
350             else
351                 ioReadInternal(this, this->output, false);
352         }
353     }
354     while (result == NULL);
355 
356     FUNCTION_LOG_RETURN(STRING, result);
357 }
358 
359 /**********************************************************************************************************************************/
360 bool
ioReadReady(IoRead * this,IoReadReadyParam param)361 ioReadReady(IoRead *this, IoReadReadyParam param)
362 {
363     FUNCTION_LOG_BEGIN(logLevelTrace);
364         FUNCTION_LOG_PARAM(IO_READ, this);
365         FUNCTION_LOG_PARAM(BOOL, param.error);
366     FUNCTION_LOG_END();
367 
368     ASSERT(this != NULL);
369 
370     bool result = true;
371 
372     if (this->pub.interface.ready != NULL)
373         result = this->pub.interface.ready(this->pub.driver, param.error);
374 
375     FUNCTION_LOG_RETURN(BOOL, result);
376 }
377 
378 /**********************************************************************************************************************************/
379 void
ioReadClose(IoRead * this)380 ioReadClose(IoRead *this)
381 {
382     FUNCTION_LOG_BEGIN(logLevelTrace);
383         FUNCTION_LOG_PARAM(IO_READ, this);
384     FUNCTION_LOG_END();
385 
386     ASSERT(this != NULL);
387     ASSERT(this->pub.opened && !this->pub.closed);
388 
389     // Close the filter group and gather results
390     ioFilterGroupClose(this->pub.filterGroup);
391 
392     // Close the driver if there is a close function
393     if (this->pub.interface.close != NULL)
394         this->pub.interface.close(this->pub.driver);
395 
396 #ifdef DEBUG
397     this->pub.closed = true;
398 #endif
399 
400     FUNCTION_LOG_RETURN_VOID();
401 }
402 
403 /**********************************************************************************************************************************/
404 int
ioReadFd(const IoRead * this)405 ioReadFd(const IoRead *this)
406 {
407     FUNCTION_LOG_BEGIN(logLevelTrace);
408         FUNCTION_LOG_PARAM(IO_READ, this);
409     FUNCTION_LOG_END();
410 
411     ASSERT(this != NULL);
412 
413     FUNCTION_LOG_RETURN(INT, this->pub.interface.fd == NULL ? -1 : this->pub.interface.fd(this->pub.driver));
414 }
415