1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 #include "dnscore/dnscore-config.h"
36 #include <unistd.h>
37 #include "dnscore/pipe_stream.h"
38 
39 #include "dnscore/logger.h"
40 
41 /** @defgroup
42  *  @ingroup
43  *  @brief
44  *
45  *
46  *
47  * @{
48  *
49  *----------------------------------------------------------------------------*/
50 
51 #define MODULE_MSG_HANDLE		g_system_logger
52 
53 #define DEBUG_PIPE_OUTPUT_STREAM 0
54 
55 #define OUTPUT_OPENED 1
56 #define INPUT_OPENED 2
57 
58 typedef struct pipe_stream_data pipe_stream_data;
59 
60 #define PIPESDTA_TAG 0x4154445345504950
61 #define PIPESBFR_TAG 0x5246425345504950
62 
63 struct pipe_stream_data
64 {
65     u8* buffer;
66     u32 buffer_size;
67     u32 write_offset;
68     u32 write_avail;
69     u32 read_offset;
70     u32 read_avail;
71     u8  flags;
72 };
73 
74 /*------------------------------------------------------------------------------
75  * GLOBAL VARIABLES */
76 
77 /*------------------------------------------------------------------------------
78  * STATIC PROTOTYPES */
79 
80 /*------------------------------------------------------------------------------
81  * FUNCTIONS */
82 
83 
84 static ya_result
pipe_stream_output_write(output_stream * stream,const u8 * buffer,u32 len)85 pipe_stream_output_write(output_stream* stream, const u8* buffer, u32 len)
86 {
87     if(len == 0)
88     {
89         return 0;
90     }
91 
92     pipe_stream_data *data = (pipe_stream_data*)stream->data;
93 
94     u32 remaining = len;
95 
96     while((remaining > 0) && (data->write_avail > 0))
97     {
98         u32 chunk_len = MIN(data->buffer_size - data->write_offset, data->write_avail);
99         chunk_len = MIN(remaining, chunk_len);
100 
101 #if DEBUG_PIPE_OUTPUT_STREAM
102         log_debug("pipe: w: %d bytes", len);
103         log_memdump_ex(g_system_logger, LOG_DEBUG, buffer, chunk_len, 16, OSPRINT_DUMP_ALL);
104 #endif
105 
106         MEMCOPY(&data->buffer[data->write_offset], buffer, chunk_len);
107         buffer += chunk_len;
108         data->write_offset += chunk_len;
109         data->write_avail -= chunk_len;
110         data->read_avail += chunk_len;
111         if(data->write_offset == data->buffer_size)
112         {
113             data->write_offset = 0;
114         }
115         remaining -= chunk_len;
116 
117         //usleep(1000);
118     }
119 
120     len -= remaining;
121 
122     if((len == 0) && ( (data->flags & (INPUT_OPENED|OUTPUT_OPENED))  != (INPUT_OPENED|OUTPUT_OPENED)))
123     {
124         return UNEXPECTED_EOF; // if one of the sides is closed ...
125     }
126 
127     return len;
128 }
129 
130 static ya_result
pipe_stream_output_flush(output_stream * stream)131 pipe_stream_output_flush(output_stream* stream)
132 {
133     (void)stream;
134     return SUCCESS;
135 }
136 
137 static void
pipe_stream_output_close(output_stream * stream)138 pipe_stream_output_close(output_stream* stream)
139 {
140     pipe_stream_data* data = (pipe_stream_data*)stream->data;
141 
142     data->flags &= ~OUTPUT_OPENED;
143 
144     if((data->flags & INPUT_OPENED) == 0)
145     {
146         free(data->buffer);
147         free(data);
148     }
149 
150     output_stream_set_void(stream);
151 }
152 
153 static const output_stream_vtbl pipe_stream_output_vtbl = {
154     pipe_stream_output_write,
155     pipe_stream_output_flush,
156     pipe_stream_output_close,
157     "pipe_stream_output",
158 };
159 
160 
161 static ya_result
pipe_stream_input_read(input_stream * stream,void * buffer_,u32 len)162 pipe_stream_input_read(input_stream* stream, void *buffer_, u32 len)
163 {
164     if(len == 0)
165     {
166         return 0;
167     }
168 
169     u8 *buffer = (u8*)buffer_;
170 
171 #if DEBUG
172     memset(buffer, 0xff, len);
173 #endif
174 
175     pipe_stream_data* data = (pipe_stream_data*)stream->data;
176 
177     u32 remaining = len;
178 
179     while((remaining > 0) && (data->read_avail > 0))
180     {
181         u32 chunk_len = MIN(data->buffer_size - data->read_offset, data->read_avail);
182         chunk_len = MIN(remaining, chunk_len);
183         MEMCOPY(buffer, &data->buffer[data->read_offset], chunk_len);
184         buffer += chunk_len;
185         data->read_offset += chunk_len;
186         data->read_avail -= chunk_len;
187         data->write_avail += chunk_len;
188 
189         if(data->read_offset == data->buffer_size)
190         {
191             data->read_offset = 0;
192         }
193         remaining -= chunk_len;
194     }
195 
196     return len - remaining;
197 }
198 
199 static ya_result
pipe_stream_input_skip(input_stream * stream,u32 len)200 pipe_stream_input_skip(input_stream* stream, u32 len)
201 {
202     if(len == 0)
203     {
204         return 0;
205     }
206 
207     pipe_stream_data* data = (pipe_stream_data*)stream->data;
208 
209     u32 remaining = len;
210 
211     for(;;)
212     {
213         while((remaining > 0) && (data->read_avail > 0))
214         {
215             u32 chunk_len = MIN(data->buffer_size - data->read_offset, data->read_avail);
216             chunk_len = MIN(remaining, chunk_len);
217 
218             data->read_offset += chunk_len;
219             data->read_avail -= chunk_len;
220             data->write_avail += chunk_len;
221 
222             if(data->read_offset == data->buffer_size)
223             {
224                 data->read_offset = 0;
225             }
226             remaining -= chunk_len;
227         }
228 
229         if((len != remaining) || ((data->flags & OUTPUT_OPENED) == 0))
230         {
231             break;
232         }
233 
234         usleep(1000);
235     }
236 
237     return len - remaining;
238 }
239 
240 static void
pipe_stream_input_close(input_stream * stream)241 pipe_stream_input_close(input_stream* stream)
242 {
243     pipe_stream_data* data = (pipe_stream_data*)stream->data;
244 
245     data->flags &= ~INPUT_OPENED;
246 
247     if((data->flags & OUTPUT_OPENED) == 0)
248     {
249         free(data->buffer);
250         free(data);
251     }
252 
253     input_stream_set_void(stream);
254 }
255 
256 static const input_stream_vtbl pipe_stream_input_vtbl =
257 {
258     pipe_stream_input_read,
259     pipe_stream_input_skip,
260     pipe_stream_input_close,
261     "pipe_stream_input_stream",
262 };
263 
264 /**
265  * Creates both output and input stream
266  * Writing in the output stream makes it available for the input stream
267  * This is not currently threadable.
268  *
269  * @param output
270  * @param input
271  */
272 
273 void
pipe_stream_init(output_stream * output,input_stream * input,u32 buffer_size)274 pipe_stream_init(output_stream *output, input_stream *input, u32 buffer_size)
275 {
276     pipe_stream_data *data;
277     MALLOC_OBJECT_OR_DIE(data, pipe_stream_data, PIPESDTA_TAG);
278     ZEROMEMORY(data, sizeof(pipe_stream_data));
279     MALLOC_OR_DIE(u8*, data->buffer, buffer_size, PIPESBFR_TAG);
280 
281 #if DEBUG
282     memset(data->buffer, 0xff, buffer_size);
283 #endif
284 
285     data->buffer_size = buffer_size;
286     data->write_avail = buffer_size;
287     data->flags       = OUTPUT_OPENED|INPUT_OPENED;
288     output->data = data;
289     output->vtbl = &pipe_stream_output_vtbl;
290     input->data = data;
291     input->vtbl = &pipe_stream_input_vtbl;
292 }
293 
294 /**
295  *
296  * Number of available bytes in the input stream
297  *
298  * @param input
299  * @return
300  */
301 
302 ya_result
pipe_stream_read_available(input_stream * input)303 pipe_stream_read_available(input_stream *input)
304 {
305     pipe_stream_data *data = (pipe_stream_data*)input->data;
306     return data->read_avail;
307 }
308 
309 /**
310  *
311  * Room for bytes in the output stream
312  *
313  * @param input
314  * @return
315  */
316 
317 ya_result
pipe_stream_write_available(output_stream * input)318 pipe_stream_write_available(output_stream *input)
319 {
320     pipe_stream_data *data = (pipe_stream_data*)input->data;
321     return data->write_avail;
322 }
323 
324 /** @} */
325