1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 2009-2021 Free Software Foundation, Inc.
3
4 GNU Mailutils is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 3, or (at your option)
7 any later version.
8
9 GNU Mailutils is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
16
17 #ifdef HAVE_CONFIG_H
18 # include <config.h>
19 #endif
20 #include <string.h>
21 #include <stdlib.h>
22 #include <limits.h>
23 #ifndef SIZE_MAX
24 # define SIZE_MAX (~((size_t)0))
25 #endif
26
27 #include <mailutils/types.h>
28 #include <mailutils/alloc.h>
29 #include <mailutils/error.h>
30 #include <mailutils/errno.h>
31 #include <mailutils/nls.h>
32 #include <mailutils/stream.h>
33 #include <mailutils/cstr.h>
34 #include <mailutils/sys/stream.h>
35
36 size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
37
38 static inline void
_stream_event(mu_stream_t str,int code,unsigned long n,void * p)39 _stream_event (mu_stream_t str, int code, unsigned long n, void *p)
40 {
41 if (str->event_cb && (str->event_mask & _MU_STR_EVMASK (code)))
42 str->event_cb (str, code, n, p);
43 }
44
45 static inline void
_bootstrap_event(mu_stream_t str)46 _bootstrap_event (mu_stream_t str)
47 {
48 if (str->event_cb &&
49 (str->event_mask & _MU_STR_EVMASK (_MU_STR_EVENT_BOOTSTRAP)))
50 {
51 str->event_cb (str, _MU_STR_EVENT_BOOTSTRAP, 0, NULL);
52 str->event_mask &= ~_MU_STR_EVMASK (_MU_STR_EVENT_BOOTSTRAP);
53 }
54 }
55
56 static inline void
_stream_stat_incr(mu_stream_t s,int k,size_t n)57 _stream_stat_incr (mu_stream_t s, int k, size_t n)
58 {
59 if (s->statmask & MU_STREAM_STAT_MASK (k))
60 s->statbuf[k] += n;
61 }
62
63 static inline int
_stream_read(mu_stream_t str,char * buf,size_t size,size_t * pbytes)64 _stream_read (mu_stream_t str, char *buf, size_t size, size_t *pbytes)
65 {
66 int rc;
67 _stream_stat_incr (str, MU_STREAM_STAT_READS, 1);
68 rc = str->read (str, buf, size, pbytes);
69 if (rc == 0)
70 {
71 if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_INLN))
72 str->statbuf[MU_STREAM_STAT_INLN] += mu_mem_c_count (buf, '\n', *pbytes);
73 if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_IN8BIT))
74 str->statbuf[MU_STREAM_STAT_IN8BIT] += mu_mem_8bit_count (buf, *pbytes);
75 }
76 return rc;
77 }
78
79 static inline int
_stream_write(mu_stream_t str,char const * buf,size_t size,size_t * pbytes)80 _stream_write (mu_stream_t str, char const *buf, size_t size, size_t *pbytes)
81 {
82 int rc;
83 _stream_stat_incr (str, MU_STREAM_STAT_WRITES, 1);
84 rc = str->write (str, buf, size, pbytes);
85 if (rc == 0)
86 {
87 if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUTLN))
88 str->statbuf[MU_STREAM_STAT_OUTLN] += mu_mem_c_count (buf, '\n', *pbytes);
89 if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUT8BIT))
90 str->statbuf[MU_STREAM_STAT_OUT8BIT] += mu_mem_8bit_count (buf, *pbytes);
91 }
92 return rc;
93 }
94
95 static inline int
_stream_seek(mu_stream_t str,mu_off_t pos,mu_off_t * poff)96 _stream_seek (mu_stream_t str, mu_off_t pos, mu_off_t *poff)
97 {
98 _stream_stat_incr (str, MU_STREAM_STAT_SEEKS, 1);
99 return str->seek (str, pos, poff);
100 }
101
102 static int _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
103 int full_read, size_t *pnread);
104 static int _stream_write_unbuffered (mu_stream_t stream,
105 const void *buf, size_t size,
106 int full_write, size_t *pnwritten);
107
108 static inline void
_stream_setflag(struct _mu_stream * stream,int flag)109 _stream_setflag (struct _mu_stream *stream, int flag)
110 {
111 _stream_event (stream, _MU_STR_EVENT_SETFLAG, flag, NULL);
112 stream->flags |= flag;
113 }
114
115 static inline void
_stream_clrflag(struct _mu_stream * stream,int flag)116 _stream_clrflag (struct _mu_stream *stream, int flag)
117 {
118 _stream_event (stream, _MU_STR_EVENT_CLRFLAG, flag, NULL);
119 stream->flags &= ~flag;
120 }
121
122 int
mu_stream_seterr(struct _mu_stream * stream,int code,int perm)123 mu_stream_seterr (struct _mu_stream *stream, int code, int perm)
124 {
125 stream->last_err = code;
126 switch (code)
127 {
128 case 0:
129 case EAGAIN:
130 case EINTR:
131 case ENOSYS:
132 case EINPROGRESS:
133 break;
134
135 default:
136 if (perm)
137 _stream_setflag (stream, _MU_STR_ERR);
138 }
139 return code;
140 }
141
142 void
_mu_stream_cleareof(mu_stream_t str)143 _mu_stream_cleareof (mu_stream_t str)
144 {
145 _stream_clrflag (str, _MU_STR_EOF);
146 }
147
148 void
_mu_stream_seteof(mu_stream_t str)149 _mu_stream_seteof (mu_stream_t str)
150 {
151 _stream_setflag (str, _MU_STR_EOF);
152 }
153
/* Address of the current position within the buffer of stream S. */
#define _stream_curp(s) (&(s)->buffer[(s)->pos])
155
156 static int
_stream_fill_buffer(struct _mu_stream * stream)157 _stream_fill_buffer (struct _mu_stream *stream)
158 {
159 size_t n;
160 size_t rdn;
161 int rc = 0;
162 char c;
163
164 switch (stream->buftype)
165 {
166 case mu_buffer_none:
167 return 0;
168
169 case mu_buffer_full:
170 rc = _stream_read_unbuffered (stream,
171 stream->buffer, stream->bufsize,
172 0,
173 &stream->level);
174 break;
175
176 case mu_buffer_line:
177 for (n = 0;
178 n < stream->bufsize
179 && (rc = _stream_read_unbuffered (stream,
180 &c, 1,
181 0, &rdn)) == 0;)
182 {
183 if (rdn == 0)
184 {
185 _stream_setflag (stream, _MU_STR_EOF);
186 break;
187 }
188 stream->buffer[n++] = c;
189 if (c == '\n')
190 break;
191 }
192 stream->level = n;
193 break;
194 }
195 if (rc == 0)
196 {
197 stream->pos = 0;
198 _stream_event (stream, _MU_STR_EVENT_FILLBUF,
199 stream->level, _stream_curp (stream));
200 }
201 return rc;
202 }
203
/* Values for the WHAT argument of _stream_flush_buffer. */
enum
  {
    /* Flush only modified data.  Keep buffer level and position
       intact. */
    FLUSH_WRITE = 0,
    /* Flush modified data and reset buffer level and position. */
    FLUSH_RDWR = 1
  };
210
211 static int
_stream_flush_buffer(struct _mu_stream * stream,int what)212 _stream_flush_buffer (struct _mu_stream *stream, int what)
213 {
214 int rc;
215
216 if (stream->flags & _MU_STR_DIRTY)
217 {
218 if ((stream->flags & MU_STREAM_SEEK) && stream->seek)
219 {
220 mu_off_t off;
221 rc = _stream_seek (stream, stream->offset, &off);
222 if (rc)
223 return rc;
224 }
225
226 if ((rc = _stream_write_unbuffered (stream, stream->buffer,
227 stream->level,
228 1, NULL)))
229 return rc;
230 _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
231 stream->level, stream->buffer);
232 _stream_clrflag (stream, _MU_STR_DIRTY);
233
234 if (stream->pos < stream->level)
235 memmove (stream->buffer, stream->buffer + stream->pos,
236 stream->level - stream->pos);
237 stream->offset += stream->pos;
238 stream->level -= stream->pos;
239 stream->pos = 0;
240 }
241
242 if (what)
243 {
244 stream->offset += stream->level;
245 stream->pos = stream->level = 0;
246 }
247
248 return 0;
249 }
250
251
252 mu_stream_t
_mu_stream_create(size_t size,int flags)253 _mu_stream_create (size_t size, int flags)
254 {
255 struct _mu_stream *str;
256 if (size < sizeof (str))
257 abort ();
258 str = mu_zalloc (size);
259 str->flags = flags & ~(_MU_STR_INTERN_MASK & ~_MU_STR_OPEN);
260 mu_stream_ref (str);
261 return str;
262 }
263
264 void
mu_stream_destroy(mu_stream_t * pstream)265 mu_stream_destroy (mu_stream_t *pstream)
266 {
267 if (pstream)
268 {
269 mu_stream_t str = *pstream;
270 if (str && (str->ref_count == 0 || --str->ref_count == 0))
271 {
272 mu_stream_close (str);
273 if (str->buftype != mu_buffer_none)
274 {
275 free (str->buffer);
276 str->buffer = NULL;
277 str->buftype = mu_buffer_none;
278 }
279 if (str->done)
280 str->done (str);
281 if (str->destroy)
282 str->destroy (str);
283 else
284 free (str);
285 *pstream = NULL;
286 }
287 }
288 }
289
290 void
mu_stream_get_flags(mu_stream_t str,int * pflags)291 mu_stream_get_flags (mu_stream_t str, int *pflags)
292 {
293 *pflags = str->flags & ~_MU_STR_INTERN_MASK;
294 }
295
296 void
mu_stream_ref(mu_stream_t stream)297 mu_stream_ref (mu_stream_t stream)
298 {
299 stream->ref_count++;
300 }
301
302 void
mu_stream_unref(mu_stream_t stream)303 mu_stream_unref (mu_stream_t stream)
304 {
305 mu_stream_destroy (&stream);
306 }
307
308 static void
_stream_init(mu_stream_t stream)309 _stream_init (mu_stream_t stream)
310 {
311 if (stream->statmask)
312 memset (stream->statbuf, 0,
313 _MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
314 stream->flags &= ~_MU_STR_INTERN_MASK;
315 _stream_setflag (stream, _MU_STR_OPEN);
316 stream->offset = 0;
317 stream->level = stream->pos = 0;
318 stream->last_err = 0;
319 }
320
321 int
mu_stream_open(mu_stream_t stream)322 mu_stream_open (mu_stream_t stream)
323 {
324 int rc;
325
326 if (stream->flags & _MU_STR_OPEN)
327 return MU_ERR_OPEN;
328 _bootstrap_event (stream);
329 if (stream->open)
330 {
331 if ((rc = stream->open (stream)))
332 return mu_stream_seterr (stream, rc, 1);
333 }
334 _stream_init (stream);
335 if ((stream->flags & (MU_STREAM_APPEND|MU_STREAM_SEEK)) ==
336 (MU_STREAM_APPEND|MU_STREAM_SEEK) &&
337 (rc = mu_stream_seek (stream, 0, MU_SEEK_END, NULL)))
338 return mu_stream_seterr (stream, rc, 1);
339 return 0;
340 }
341
342 const char *
mu_stream_strerror(mu_stream_t stream,int rc)343 mu_stream_strerror (mu_stream_t stream, int rc)
344 {
345 const char *str;
346
347 if (!stream)
348 return mu_strerror (rc);
349 if (stream->error_string)
350 str = stream->error_string (stream, rc);
351 else
352 str = mu_strerror (rc);
353 return str;
354 }
355
356 int
mu_stream_err(mu_stream_t stream)357 mu_stream_err (mu_stream_t stream)
358 {
359 return stream->flags & _MU_STR_ERR;
360 }
361
362 int
mu_stream_last_error(mu_stream_t stream)363 mu_stream_last_error (mu_stream_t stream)
364 {
365 return stream->last_err;
366 }
367
368 void
mu_stream_clearerr(mu_stream_t stream)369 mu_stream_clearerr (mu_stream_t stream)
370 {
371 stream->last_err = 0;
372 _stream_clrflag (stream, _MU_STR_ERR);
373 }
374
375 int
mu_stream_eof(mu_stream_t stream)376 mu_stream_eof (mu_stream_t stream)
377 {
378 return (stream->flags & _MU_STR_EOF) && (stream->pos == stream->level);
379 }
380
381 int
mu_stream_is_open(mu_stream_t stream)382 mu_stream_is_open (mu_stream_t stream)
383 {
384 return stream->flags & _MU_STR_OPEN;
385 }
386
387 int
mu_stream_seek(mu_stream_t stream,mu_off_t offset,int whence,mu_off_t * pres)388 mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
389 mu_off_t *pres)
390 {
391 int rc;
392 mu_off_t size;
393
394 _bootstrap_event (stream);
395 if (!(stream->flags & _MU_STR_OPEN))
396 {
397 if (stream->open)
398 return MU_ERR_NOT_OPEN;
399 _stream_init (stream);
400 }
401
402 if (!stream->seek)
403 return mu_stream_seterr (stream, ENOSYS, 0);
404
405 if (!(stream->flags & MU_STREAM_SEEK))
406 return mu_stream_seterr (stream, EACCES, 1);
407
408 switch (whence)
409 {
410 case MU_SEEK_SET:
411 break;
412
413 case MU_SEEK_CUR:
414 if (offset == 0)
415 {
416 *pres = stream->offset + stream->pos;
417 return 0;
418 }
419 offset += stream->offset + stream->pos;
420 break;
421
422 case MU_SEEK_END:
423 rc = mu_stream_size (stream, &size);
424 if (rc)
425 return rc;
426 offset += size;
427 break;
428
429 default:
430 return mu_stream_seterr (stream, EINVAL, 1);
431 }
432
433 if (!(stream->buftype == mu_buffer_none ?
434 (offset == stream->offset)
435 : (stream->offset <= offset &&
436 offset < stream->offset + stream->level)))
437 {
438 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
439 return rc;
440 if (stream->offset != offset)
441 {
442 rc = _stream_seek (stream, offset, &stream->offset);
443 if (rc == ESPIPE)
444 return rc;
445 if (rc)
446 return mu_stream_seterr (stream, rc, 1);
447 }
448 _mu_stream_cleareof (stream);
449 }
450 else if (stream->buftype != mu_buffer_none)
451 stream->pos = offset - stream->offset;
452
453 _mu_stream_cleareof (stream);
454
455 if (pres)
456 *pres = stream->offset + stream->pos;
457 return 0;
458 }
459
460 /* Skip COUNT bytes from the current position in stream by reading from
461 it. Return new offset in PRES.
462
463 Return 0 on success, EACCES if STREAM was not opened for reading.
464 Another non-zero exit codes are propagated from the underlying
465 input operations.
466
467 This function is designed to help implement seek method in otherwise
468 unseekable streams (such as filters). Do not use it unless you absolutely
469 have to. Using it on an unbuffered stream is a terrible waste of CPU. */
470 static int
_stream_skip_input_bytes(mu_stream_t stream,mu_off_t count,mu_off_t * pres)471 _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
472 {
473 mu_off_t pos;
474 int rc = 0;
475
476 if (!(stream->flags & MU_STREAM_READ))
477 return mu_stream_seterr (stream, EACCES, 1);
478
479 if (count)
480 {
481 if (stream->buftype == mu_buffer_none)
482 {
483 for (pos = 0; pos < count; pos++)
484 {
485 char c;
486 size_t nrd;
487 rc = mu_stream_read (stream, &c, 1, &nrd);
488 if (nrd == 0)
489 rc = ESPIPE;
490 if (rc)
491 break;
492 }
493 }
494 else
495 {
496 for (pos = 0;;)
497 {
498 if (pos || stream->level == 0)
499 {
500 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
501 return rc;
502 rc = _stream_fill_buffer (stream);
503 if (rc)
504 break;
505 if (stream->level == 0)
506 {
507 rc = ESPIPE;
508 break;
509 }
510 }
511 if (pos <= count && count < pos + stream->level)
512 {
513 stream->pos = count - pos;
514 rc = 0;
515 break;
516 }
517 pos += stream->level;
518 }
519 }
520 }
521
522 if (pres)
523 *pres = stream->offset + stream->pos;
524 return rc;
525 }
526
527 /* A wrapper for the above function. It is normally called from a
528 seek method implementation, so it makes sure the MU_STREAM_SEEK
529 is cleared while in _stream_skip_input_bytes, to avoid infitite
530 recursion that may be triggered by _stream_flush_buffer invoking
531 stream->seek. */
532 int
mu_stream_skip_input_bytes(mu_stream_t stream,mu_off_t count,mu_off_t * pres)533 mu_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
534 {
535 int rc;
536 int seek_flag = stream->flags & MU_STREAM_SEEK;
537 stream->flags &= ~MU_STREAM_SEEK;
538 rc = _stream_skip_input_bytes (stream, count, pres);
539 stream->flags |= seek_flag;
540 return rc;
541 }
542
543 int
mu_stream_set_buffer(mu_stream_t stream,enum mu_buffer_type type,size_t size)544 mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
545 size_t size)
546 {
547 _bootstrap_event (stream);
548
549 if (size == 0)
550 size = mu_stream_default_buffer_size;
551
552 if (stream->setbuf_hook)
553 {
554 int rc = stream->setbuf_hook (stream, type, size);
555 if (rc)
556 return rc;
557 }
558
559 if (stream->buffer)
560 {
561 mu_stream_flush (stream);
562 free (stream->buffer);
563 }
564
565 stream->buftype = type;
566 if (type == mu_buffer_none)
567 {
568 stream->buffer = NULL;
569 return 0;
570 }
571
572 stream->buffer = malloc (size);
573 if (stream->buffer == NULL)
574 {
575 stream->buftype = mu_buffer_none;
576 return mu_stream_seterr (stream, ENOMEM, 1);
577 }
578 stream->bufsize = size;
579 stream->pos = 0;
580 stream->level = 0;
581
582 return 0;
583 }
584
585 int
mu_stream_get_buffer(mu_stream_t stream,struct mu_buffer_query * qry)586 mu_stream_get_buffer (mu_stream_t stream, struct mu_buffer_query *qry)
587 {
588 qry->buftype = stream->buftype;
589 qry->bufsize = stream->bufsize;
590 return 0;
591 }
592
593 static int
_stream_read_unbuffered(mu_stream_t stream,void * buf,size_t size,int full_read,size_t * pnread)594 _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
595 int full_read, size_t *pnread)
596 {
597 int rc;
598 size_t nread;
599
600 if (!stream->read)
601 return mu_stream_seterr (stream, ENOSYS, 0);
602
603 if (!(stream->flags & MU_STREAM_READ))
604 return mu_stream_seterr (stream, EACCES, 1);
605
606 if (stream->flags & _MU_STR_ERR)
607 return stream->last_err;
608
609 if (mu_stream_eof (stream) || size == 0)
610 {
611 if (pnread)
612 *pnread = 0;
613 return 0;
614 }
615
616 if (full_read)
617 {
618 size_t rdbytes;
619
620 nread = 0;
621 while (size > 0
622 && (rc = _stream_read (stream, buf, size, &rdbytes)) == 0)
623 {
624 if (rdbytes == 0)
625 {
626 _stream_setflag (stream, _MU_STR_EOF);
627 break;
628 }
629 buf += rdbytes;
630 nread += rdbytes;
631 size -= rdbytes;
632 _stream_stat_incr (stream, MU_STREAM_STAT_IN, rdbytes);
633 }
634 if (size && rc)
635 rc = mu_stream_seterr (stream, rc, 0);
636 }
637 else
638 {
639 rc = _stream_read (stream, buf, size, &nread);
640 if (rc == 0)
641 {
642 if (nread == 0)
643 _stream_setflag (stream, _MU_STR_EOF);
644 _stream_stat_incr (stream, MU_STREAM_STAT_IN, nread);
645 }
646 mu_stream_seterr (stream, rc, rc != 0);
647 }
648 if (rc == 0 && pnread)
649 *pnread = nread;
650
651 return rc;
652 }
653
654 static int
_stream_write_unbuffered(mu_stream_t stream,const void * buf,size_t size,int full_write,size_t * pnwritten)655 _stream_write_unbuffered (mu_stream_t stream,
656 const void *buf, size_t size,
657 int full_write, size_t *pnwritten)
658 {
659 int rc;
660 size_t nwritten;
661
662 if (!stream->write)
663 return mu_stream_seterr (stream, ENOSYS, 0);
664
665 if (!(stream->flags & (MU_STREAM_WRITE|MU_STREAM_APPEND)))
666 return mu_stream_seterr (stream, EACCES, 1);
667
668 if (stream->flags & _MU_STR_ERR)
669 return stream->last_err;
670
671 if (size == 0)
672 {
673 if (pnwritten)
674 *pnwritten = 0;
675 return 0;
676 }
677
678 if (full_write)
679 {
680 size_t wrbytes;
681 const char *bufp = buf;
682
683 nwritten = 0;
684 while (size > 0
685 && (rc = _stream_write (stream, bufp, size, &wrbytes))
686 == 0)
687 {
688 if (wrbytes == 0)
689 {
690 rc = EIO;
691 break;
692 }
693 bufp += wrbytes;
694 nwritten += wrbytes;
695 size -= wrbytes;
696 _stream_stat_incr (stream, MU_STREAM_STAT_OUT, wrbytes);
697 }
698 }
699 else
700 {
701 rc = _stream_write (stream, buf, size, &nwritten);
702 if (rc == 0)
703 _stream_stat_incr (stream, MU_STREAM_STAT_OUT, nwritten);
704 }
705 _stream_setflag (stream, _MU_STR_WRT);
706 if (pnwritten)
707 *pnwritten = nwritten;
708 mu_stream_seterr (stream, rc, rc != 0);
709 return rc;
710 }
711
712 int
mu_stream_read(mu_stream_t stream,void * buf,size_t size,size_t * pread)713 mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
714 {
715 _bootstrap_event (stream);
716
717 if (!(stream->flags & _MU_STR_OPEN))
718 {
719 if (stream->open)
720 return MU_ERR_NOT_OPEN;
721 _stream_init (stream);
722 }
723
724 if (stream->buftype == mu_buffer_none)
725 {
726 size_t nread = 0;
727 int rc = _stream_read_unbuffered (stream, buf, size, !pread, &nread);
728 stream->offset += nread;
729 if (pread)
730 *pread = nread;
731 return rc;
732 }
733 else
734 {
735 char *bufp = buf;
736 size_t nbytes = 0;
737 int rc;
738
739 while (size)
740 {
741 size_t n;
742
743 if (stream->pos == stream->level)
744 {
745 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
746 {
747 if (nbytes)
748 break;
749 return rc;
750 }
751 if ((rc = _stream_fill_buffer (stream)))
752 {
753 if (nbytes)
754 break;
755 return rc;
756 }
757 if (stream->level == 0)
758 break;
759 }
760
761 n = size;
762 if (n > stream->level - stream->pos)
763 n = stream->level - stream->pos;
764 memcpy (bufp, _stream_curp (stream), n);
765 stream->pos += n;
766 nbytes += n;
767 bufp += n;
768 size -= n;
769 if (stream->buftype == mu_buffer_line && bufp[-1] == '\n')
770 break;
771 }
772
773 if (pread)
774 *pread = nbytes;
775 }
776 return 0;
777 }
778
779 int
_stream_scandelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pnread)780 _stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim,
781 size_t *pnread)
782 {
783 int rc = 0;
784 size_t nread = 0;
785
786 size--;
787 if (size == 0)
788 return MU_ERR_BUFSPACE;
789 while (size)
790 {
791 char *p, *q;
792 size_t len;
793
794 if (stream->pos == stream->level)
795 {
796 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
797 break;
798 if ((rc = _stream_fill_buffer (stream)) || stream->level == 0)
799 break;
800 }
801
802 q = _stream_curp (stream);
803 len = stream->level - stream->pos;
804 p = memchr (q, delim, len);
805 if (p)
806 len = p - q + 1;
807 if (len > size)
808 len = size;
809 memcpy (buf, _stream_curp (stream), len);
810 stream->pos += len;
811 buf += len;
812 size -= len;
813 nread += len;
814 if (p) /* Delimiter found */
815 break;
816 }
817 *buf = 0;
818 *pnread = nread;
819 return rc;
820 }
821
822 static int
_stream_readdelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pread)823 _stream_readdelim (mu_stream_t stream, char *buf, size_t size,
824 int delim, size_t *pread)
825 {
826 int rc;
827 char c;
828 size_t n = 0, rdn;
829
830 size--;
831 if (size == 0)
832 return MU_ERR_BUFSPACE;
833 for (n = 0;
834 n < size && (rc = mu_stream_read (stream, &c, 1, &rdn)) == 0 && rdn;)
835 {
836 *buf++ = c;
837 n++;
838 if (c == delim)
839 break;
840 }
841 *buf = 0;
842 if (pread)
843 *pread = n;
844 return rc;
845 }
846
847 int
mu_stream_readdelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pread)848 mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
849 int delim, size_t *pread)
850 {
851 int rc;
852
853 _bootstrap_event (stream);
854
855 if (size == 0)
856 return EINVAL;
857
858 if (!(stream->flags & _MU_STR_OPEN))
859 {
860 if (stream->open)
861 return MU_ERR_NOT_OPEN;
862 _stream_init (stream);
863 }
864
865 if (stream->buftype == mu_buffer_none)
866 {
867 rc = _stream_readdelim (stream, buf, size, delim, pread);
868 }
869 else
870 {
871 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
872 return rc;
873 rc = _stream_scandelim (stream, buf, size, delim, pread);
874 }
875 return rc;
876 }
877
878 int
mu_stream_readline(mu_stream_t stream,char * buf,size_t size,size_t * pread)879 mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
880 {
881 return mu_stream_readdelim (stream, buf, size, '\n', pread);
882 }
883
884 int
mu_stream_getdelim(mu_stream_t stream,char ** pbuf,size_t * psize,int delim,size_t * pread)885 mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
886 int delim, size_t *pread)
887 {
888 int rc;
889 char *lineptr = *pbuf;
890 size_t n = *psize;
891 size_t cur_len = 0;
892
893 _bootstrap_event (stream);
894
895 if (!(stream->flags & _MU_STR_OPEN))
896 {
897 if (stream->open)
898 return MU_ERR_NOT_OPEN;
899 _stream_init (stream);
900 }
901
902 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
903 return rc;
904
905 if (lineptr == NULL || n == 0)
906 {
907 char *new_lineptr;
908 n = 120;
909 new_lineptr = realloc (lineptr, n);
910 if (new_lineptr == NULL)
911 return ENOMEM;
912 lineptr = new_lineptr;
913 }
914
915 for (;;)
916 {
917 size_t rdn;
918
919 /* Make enough space for len+1 (for final NUL) bytes. */
920 if (cur_len + 1 >= n)
921 {
922 size_t needed_max =
923 SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
924 size_t needed = 2 * n + 1; /* Be generous. */
925 char *new_lineptr;
926
927 if (needed_max < needed)
928 needed = needed_max;
929 if (cur_len + 1 >= needed)
930 {
931 rc = EOVERFLOW;
932 break;
933 }
934
935 new_lineptr = realloc (lineptr, needed);
936 if (new_lineptr == NULL)
937 {
938 rc = ENOMEM;
939 break;
940 }
941
942 lineptr = new_lineptr;
943 n = needed;
944 }
945
946 if (stream->buftype == mu_buffer_none)
947 rc = _stream_readdelim (stream, lineptr + cur_len, n - cur_len, delim,
948 &rdn);
949 else
950 rc = _stream_scandelim (stream, lineptr + cur_len, n - cur_len, delim,
951 &rdn);
952
953 if (rc || rdn == 0)
954 break;
955 cur_len += rdn;
956
957 if (lineptr[cur_len - 1] == delim)
958 break;
959 }
960 lineptr[cur_len] = '\0';
961
962 *pbuf = lineptr;
963 *psize = n;
964
965 if (pread)
966 *pread = cur_len;
967 return rc;
968 }
969
970 int
mu_stream_getline(mu_stream_t stream,char ** pbuf,size_t * psize,size_t * pread)971 mu_stream_getline (mu_stream_t stream, char **pbuf, size_t *psize,
972 size_t *pread)
973 {
974 return mu_stream_getdelim (stream, pbuf, psize, '\n', pread);
975 }
976
977 /* Return 1 if no more data can be written to the current buffer. */
978 static inline int
_stream_buffer_full_p(struct _mu_stream * stream)979 _stream_buffer_full_p (struct _mu_stream *stream)
980 {
981 /* This function should be called only for buffered streams */
982 if (stream->buftype == mu_buffer_none)
983 return 0;
984
985 if (stream->bufsize == stream->pos)
986 /* No space left in buffer */
987 return 1;
988
989 /* For line buffering, the buffer is flushed immediately after
990 receiving a newline character. */
991 if (stream->buftype == mu_buffer_line &&
992 stream->pos > 0 &&
993 memchr (stream->buffer, '\n', stream->pos) != NULL)
994 return 1;
995
996 return 0;
997 }
998
999 int
mu_stream_write(mu_stream_t stream,const void * buf,size_t size,size_t * pnwritten)1000 mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
1001 size_t *pnwritten)
1002 {
1003 int rc = 0;
1004
1005 _bootstrap_event (stream);
1006
1007 if (!(stream->flags & _MU_STR_OPEN))
1008 {
1009 if (stream->open)
1010 return MU_ERR_NOT_OPEN;
1011 _stream_init (stream);
1012 }
1013
1014 if (stream->buftype == mu_buffer_none)
1015 {
1016 size_t nwritten;
1017 rc = _stream_write_unbuffered (stream, buf, size,
1018 !pnwritten, &nwritten);
1019 stream->offset += nwritten;
1020 if (pnwritten)
1021 *pnwritten = nwritten;
1022 }
1023 else
1024 {
1025 size_t nbytes = 0;
1026 const char *bufp = buf;
1027
1028 while (1)
1029 {
1030 size_t n;
1031
1032 if (_stream_buffer_full_p (stream))
1033 {
1034 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1035 break;
1036 }
1037
1038 if (size == 0)
1039 break;
1040
1041 n = stream->bufsize - stream->pos;
1042 if (n > size)
1043 n = size;
1044 memcpy (_stream_curp (stream), bufp, n);
1045 stream->pos += n;
1046 if (stream->pos > stream->level)
1047 stream->level = stream->pos;
1048
1049 nbytes += n;
1050 bufp += n;
1051 size -= n;
1052 _stream_setflag (stream, _MU_STR_DIRTY);
1053 }
1054 if (pnwritten)
1055 *pnwritten = nbytes;
1056 else if (nbytes < size)
1057 {
1058 rc = EIO;
1059 mu_stream_seterr (stream, rc, 1);
1060 }
1061 }
1062 return rc;
1063 }
1064
1065 int
mu_stream_writeline(mu_stream_t stream,const char * buf,size_t size)1066 mu_stream_writeline (mu_stream_t stream, const char *buf, size_t size)
1067 {
1068 int rc;
1069 if ((rc = mu_stream_write (stream, buf, size, NULL)) == 0)
1070 rc = mu_stream_write (stream, "\r\n", 2, NULL);
1071 return rc;
1072 }
1073
1074 int
mu_stream_flush(mu_stream_t stream)1075 mu_stream_flush (mu_stream_t stream)
1076 {
1077 int rc;
1078
1079 if (!stream)
1080 return EINVAL;
1081 _bootstrap_event (stream);
1082 if (!(stream->flags & _MU_STR_OPEN))
1083 {
1084 if (stream->open)
1085 return MU_ERR_NOT_OPEN;
1086 _stream_init (stream);
1087 }
1088 rc = _stream_flush_buffer (stream, FLUSH_RDWR);
1089 if (rc)
1090 return rc;
1091 if ((stream->flags & _MU_STR_WRT) && stream->flush)
1092 return stream->flush (stream);
1093 _stream_clrflag (stream, _MU_STR_WRT);
1094 return 0;
1095 }
1096
1097 int
mu_stream_close(mu_stream_t stream)1098 mu_stream_close (mu_stream_t stream)
1099 {
1100 int rc = 0;
1101
1102 if (!stream)
1103 return EINVAL;
1104 if (!(stream->flags & _MU_STR_OPEN))
1105 return MU_ERR_NOT_OPEN;
1106
1107 rc = mu_stream_flush (stream);
1108 if (rc == 0)
1109 {
1110 /* Do close the stream only if it is not used by anyone else */
1111 if (stream->ref_count > 1)
1112 return 0;
1113 _stream_event (stream, _MU_STR_EVENT_CLOSE, 0, NULL);
1114 if (stream->close)
1115 rc = stream->close (stream);
1116 if (rc == 0)
1117 _stream_clrflag (stream, _MU_STR_OPEN);
1118 }
1119 return rc;
1120 }
1121
1122 int
mu_stream_size(mu_stream_t stream,mu_off_t * psize)1123 mu_stream_size (mu_stream_t stream, mu_off_t *psize)
1124 {
1125 int rc;
1126 mu_off_t size;
1127
1128 _bootstrap_event (stream);
1129 if (!(stream->flags & _MU_STR_OPEN))
1130 {
1131 if (stream->open)
1132 return MU_ERR_NOT_OPEN;
1133 _stream_init (stream);
1134 }
1135 if (!stream->size)
1136 return mu_stream_seterr (stream, ENOSYS, 0);
1137 rc = stream->size (stream, &size);
1138 if (rc == 0)
1139 {
1140 if (stream->buftype != mu_buffer_none)
1141 {
1142 size_t n = stream->offset + stream->level;
1143 if (n > size)
1144 size = n;
1145 }
1146 *psize = size;
1147 }
1148 return mu_stream_seterr (stream, rc, rc != 0);
1149 }
1150
1151 int
mu_stream_ioctl(mu_stream_t stream,int family,int opcode,void * ptr)1152 mu_stream_ioctl (mu_stream_t stream, int family, int opcode, void *ptr)
1153 {
1154 int rc;
1155 _bootstrap_event (stream);
1156 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
1157 return rc;
1158 if (stream->ctl == NULL)
1159 return ENOSYS;
1160 return stream->ctl (stream, family, opcode, ptr);
1161 }
1162
1163 int
mu_stream_wait(mu_stream_t stream,int * pflags,struct timeval * tvp)1164 mu_stream_wait (mu_stream_t stream, int *pflags, struct timeval *tvp)
1165 {
1166 int flg = 0;
1167
1168 if (stream == NULL)
1169 return EINVAL;
1170 _bootstrap_event (stream);
1171 #if 0
1172 /* NOTE: Sometimes mu_stream_wait is called after a failed mu_stream_open.
1173 In particular, this is needed for a TCP stream opened with a
1174 MU_STREAM_NONBLOCK flag (see examples/http.c). Until a better
1175 solution is found, this check is commented out. */
1176 if (!(stream->flags & _MU_STR_OPEN))
1177 {
1178 if (stream->open)
1179 return MU_ERR_NOT_OPEN;
1180 _stream_init (stream);
1181 }
1182 #endif
1183 /* Take into account if we have any buffering. */
1184 /* FIXME: How about MU_STREAM_READY_WR? */
1185 if ((*pflags) & MU_STREAM_READY_RD
1186 && stream->buftype != mu_buffer_none
1187 && stream->pos < stream->level)
1188 {
1189 flg = MU_STREAM_READY_RD;
1190 *pflags &= ~MU_STREAM_READY_RD;
1191 }
1192
1193 if (stream->wait)
1194 {
1195 int rc;
1196
1197 if (flg && *pflags == 0)
1198 /* Don't call wait method if our modifications (see above) resulted
1199 in empty *pflags. */
1200 rc = 0;
1201 else
1202 rc = stream->wait (stream, pflags, tvp);
1203 if (rc == 0)
1204 *pflags |= flg;
1205 return rc;
1206 }
1207
1208 return ENOSYS;
1209 }
1210
1211 int
mu_stream_truncate(mu_stream_t stream,mu_off_t size)1212 mu_stream_truncate (mu_stream_t stream, mu_off_t size)
1213 {
1214 _bootstrap_event (stream);
1215
1216 if (!(stream->flags & _MU_STR_OPEN))
1217 {
1218 if (stream->open)
1219 return MU_ERR_NOT_OPEN;
1220 _stream_init (stream);
1221 }
1222
1223 if (stream->truncate)
1224 {
1225 int rc;
1226
1227 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1228 return rc;
1229 if (stream->offset > size)
1230 {
1231 stream->offset = size;
1232 stream->level = 0;
1233 stream->pos = 0;
1234 }
1235 else if (stream->offset + stream->pos > size)
1236 {
1237 stream->pos = size - stream->offset;
1238 stream->level = stream->pos;
1239 }
1240 else if (stream->offset + stream->level > size)
1241 {
1242 stream->level = size - stream->offset;
1243 if (stream->pos > stream->level)
1244 stream->pos = stream->level;
1245 }
1246 return stream->truncate (stream, size);
1247 }
1248 return ENOSYS;
1249 }
1250
1251 int
mu_stream_shutdown(mu_stream_t stream,int how)1252 mu_stream_shutdown (mu_stream_t stream, int how)
1253 {
1254 int rc;
1255
1256 _bootstrap_event (stream);
1257
1258 if (!(stream->flags & _MU_STR_OPEN))
1259 {
1260 if (stream->open)
1261 return MU_ERR_NOT_OPEN;
1262 _stream_init (stream);
1263 }
1264
1265 rc = mu_stream_flush (stream);
1266 if (rc)
1267 return rc;
1268 if (stream->shutdown)
1269 return stream->shutdown (stream, how);
1270 return 0;
1271 }
1272
1273 int
mu_stream_set_flags(mu_stream_t stream,int fl)1274 mu_stream_set_flags (mu_stream_t stream, int fl)
1275 {
1276 if (stream == NULL)
1277 return EINVAL;
1278 stream->flags |= (fl & ~_MU_STR_INTERN_MASK);
1279 return 0;
1280 }
1281
1282 int
mu_stream_clr_flags(mu_stream_t stream,int fl)1283 mu_stream_clr_flags (mu_stream_t stream, int fl)
1284 {
1285 if (stream == NULL)
1286 return EINVAL;
1287 stream->flags &= ~(fl & ~_MU_STR_INTERN_MASK);
1288 return 0;
1289 }
1290
1291 int
mu_stream_set_stat(mu_stream_t stream,int statmask,mu_stream_stat_buffer statbuf)1292 mu_stream_set_stat (mu_stream_t stream, int statmask,
1293 mu_stream_stat_buffer statbuf)
1294 {
1295 if (stream == NULL)
1296 return EINVAL;
1297 if (!statbuf)
1298 statmask = 0;
1299 stream->statmask = statmask;
1300 stream->statbuf = statbuf;
1301 if (stream->statbuf)
1302 memset (stream->statbuf, 0,
1303 _MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
1304 return 0;
1305 }
1306
1307 int
mu_stream_get_stat(mu_stream_t stream,int * pstatmask,mu_off_t ** pstatbuf)1308 mu_stream_get_stat (mu_stream_t stream, int *pstatmask,
1309 mu_off_t **pstatbuf)
1310 {
1311 if (stream == NULL)
1312 return EINVAL;
1313 *pstatmask = stream->statmask;
1314 *pstatbuf = stream->statbuf;
1315 return 0;
1316 }
1317
1318