1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 #include "dnscore/dnscore-config.h"
36 #include <unistd.h>
37 #include "dnscore/pipe_stream.h"
38
39 #include "dnscore/logger.h"
40
41 /** @defgroup
42 * @ingroup
43 * @brief
44 *
45 *
46 *
47 * @{
48 *
49 *----------------------------------------------------------------------------*/
50
51 #define MODULE_MSG_HANDLE g_system_logger
52
53 #define DEBUG_PIPE_OUTPUT_STREAM 0
54
55 #define OUTPUT_OPENED 1
56 #define INPUT_OPENED 2
57
58 typedef struct pipe_stream_data pipe_stream_data;
59
60 #define PIPESDTA_TAG 0x4154445345504950
61 #define PIPESBFR_TAG 0x5246425345504950
62
63 struct pipe_stream_data
64 {
65 u8* buffer;
66 u32 buffer_size;
67 u32 write_offset;
68 u32 write_avail;
69 u32 read_offset;
70 u32 read_avail;
71 u8 flags;
72 };
73
74 /*------------------------------------------------------------------------------
75 * GLOBAL VARIABLES */
76
77 /*------------------------------------------------------------------------------
78 * STATIC PROTOTYPES */
79
80 /*------------------------------------------------------------------------------
81 * FUNCTIONS */
82
83
84 static ya_result
pipe_stream_output_write(output_stream * stream,const u8 * buffer,u32 len)85 pipe_stream_output_write(output_stream* stream, const u8* buffer, u32 len)
86 {
87 if(len == 0)
88 {
89 return 0;
90 }
91
92 pipe_stream_data *data = (pipe_stream_data*)stream->data;
93
94 u32 remaining = len;
95
96 while((remaining > 0) && (data->write_avail > 0))
97 {
98 u32 chunk_len = MIN(data->buffer_size - data->write_offset, data->write_avail);
99 chunk_len = MIN(remaining, chunk_len);
100
101 #if DEBUG_PIPE_OUTPUT_STREAM
102 log_debug("pipe: w: %d bytes", len);
103 log_memdump_ex(g_system_logger, LOG_DEBUG, buffer, chunk_len, 16, OSPRINT_DUMP_ALL);
104 #endif
105
106 MEMCOPY(&data->buffer[data->write_offset], buffer, chunk_len);
107 buffer += chunk_len;
108 data->write_offset += chunk_len;
109 data->write_avail -= chunk_len;
110 data->read_avail += chunk_len;
111 if(data->write_offset == data->buffer_size)
112 {
113 data->write_offset = 0;
114 }
115 remaining -= chunk_len;
116
117 //usleep(1000);
118 }
119
120 len -= remaining;
121
122 if((len == 0) && ( (data->flags & (INPUT_OPENED|OUTPUT_OPENED)) != (INPUT_OPENED|OUTPUT_OPENED)))
123 {
124 return UNEXPECTED_EOF; // if one of the sides is closed ...
125 }
126
127 return len;
128 }
129
130 static ya_result
pipe_stream_output_flush(output_stream * stream)131 pipe_stream_output_flush(output_stream* stream)
132 {
133 (void)stream;
134 return SUCCESS;
135 }
136
137 static void
pipe_stream_output_close(output_stream * stream)138 pipe_stream_output_close(output_stream* stream)
139 {
140 pipe_stream_data* data = (pipe_stream_data*)stream->data;
141
142 data->flags &= ~OUTPUT_OPENED;
143
144 if((data->flags & INPUT_OPENED) == 0)
145 {
146 free(data->buffer);
147 free(data);
148 }
149
150 output_stream_set_void(stream);
151 }
152
153 static const output_stream_vtbl pipe_stream_output_vtbl = {
154 pipe_stream_output_write,
155 pipe_stream_output_flush,
156 pipe_stream_output_close,
157 "pipe_stream_output",
158 };
159
160
161 static ya_result
pipe_stream_input_read(input_stream * stream,void * buffer_,u32 len)162 pipe_stream_input_read(input_stream* stream, void *buffer_, u32 len)
163 {
164 if(len == 0)
165 {
166 return 0;
167 }
168
169 u8 *buffer = (u8*)buffer_;
170
171 #if DEBUG
172 memset(buffer, 0xff, len);
173 #endif
174
175 pipe_stream_data* data = (pipe_stream_data*)stream->data;
176
177 u32 remaining = len;
178
179 while((remaining > 0) && (data->read_avail > 0))
180 {
181 u32 chunk_len = MIN(data->buffer_size - data->read_offset, data->read_avail);
182 chunk_len = MIN(remaining, chunk_len);
183 MEMCOPY(buffer, &data->buffer[data->read_offset], chunk_len);
184 buffer += chunk_len;
185 data->read_offset += chunk_len;
186 data->read_avail -= chunk_len;
187 data->write_avail += chunk_len;
188
189 if(data->read_offset == data->buffer_size)
190 {
191 data->read_offset = 0;
192 }
193 remaining -= chunk_len;
194 }
195
196 return len - remaining;
197 }
198
199 static ya_result
pipe_stream_input_skip(input_stream * stream,u32 len)200 pipe_stream_input_skip(input_stream* stream, u32 len)
201 {
202 if(len == 0)
203 {
204 return 0;
205 }
206
207 pipe_stream_data* data = (pipe_stream_data*)stream->data;
208
209 u32 remaining = len;
210
211 for(;;)
212 {
213 while((remaining > 0) && (data->read_avail > 0))
214 {
215 u32 chunk_len = MIN(data->buffer_size - data->read_offset, data->read_avail);
216 chunk_len = MIN(remaining, chunk_len);
217
218 data->read_offset += chunk_len;
219 data->read_avail -= chunk_len;
220 data->write_avail += chunk_len;
221
222 if(data->read_offset == data->buffer_size)
223 {
224 data->read_offset = 0;
225 }
226 remaining -= chunk_len;
227 }
228
229 if((len != remaining) || ((data->flags & OUTPUT_OPENED) == 0))
230 {
231 break;
232 }
233
234 usleep(1000);
235 }
236
237 return len - remaining;
238 }
239
240 static void
pipe_stream_input_close(input_stream * stream)241 pipe_stream_input_close(input_stream* stream)
242 {
243 pipe_stream_data* data = (pipe_stream_data*)stream->data;
244
245 data->flags &= ~INPUT_OPENED;
246
247 if((data->flags & OUTPUT_OPENED) == 0)
248 {
249 free(data->buffer);
250 free(data);
251 }
252
253 input_stream_set_void(stream);
254 }
255
256 static const input_stream_vtbl pipe_stream_input_vtbl =
257 {
258 pipe_stream_input_read,
259 pipe_stream_input_skip,
260 pipe_stream_input_close,
261 "pipe_stream_input_stream",
262 };
263
264 /**
265 * Creates both output and input stream
266 * Writing in the output stream makes it available for the input stream
267 * This is not currently threadable.
268 *
269 * @param output
270 * @param input
271 */
272
273 void
pipe_stream_init(output_stream * output,input_stream * input,u32 buffer_size)274 pipe_stream_init(output_stream *output, input_stream *input, u32 buffer_size)
275 {
276 pipe_stream_data *data;
277 MALLOC_OBJECT_OR_DIE(data, pipe_stream_data, PIPESDTA_TAG);
278 ZEROMEMORY(data, sizeof(pipe_stream_data));
279 MALLOC_OR_DIE(u8*, data->buffer, buffer_size, PIPESBFR_TAG);
280
281 #if DEBUG
282 memset(data->buffer, 0xff, buffer_size);
283 #endif
284
285 data->buffer_size = buffer_size;
286 data->write_avail = buffer_size;
287 data->flags = OUTPUT_OPENED|INPUT_OPENED;
288 output->data = data;
289 output->vtbl = &pipe_stream_output_vtbl;
290 input->data = data;
291 input->vtbl = &pipe_stream_input_vtbl;
292 }
293
294 /**
295 *
296 * Number of available bytes in the input stream
297 *
298 * @param input
299 * @return
300 */
301
302 ya_result
pipe_stream_read_available(input_stream * input)303 pipe_stream_read_available(input_stream *input)
304 {
305 pipe_stream_data *data = (pipe_stream_data*)input->data;
306 return data->read_avail;
307 }
308
309 /**
310 *
311 * Room for bytes in the output stream
312 *
313 * @param input
314 * @return
315 */
316
317 ya_result
pipe_stream_write_available(output_stream * input)318 pipe_stream_write_available(output_stream *input)
319 {
320 pipe_stream_data *data = (pipe_stream_data*)input->data;
321 return data->write_avail;
322 }
323
324 /** @} */
325