1 /* GNU Mailutils -- a suite of utilities for electronic mail
2    Copyright (C) 2009-2021 Free Software Foundation, Inc.
3 
4    GNU Mailutils is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; either version 3, or (at your option)
7    any later version.
8 
9    GNU Mailutils is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with GNU Mailutils.  If not, see <http://www.gnu.org/licenses/>. */
16 
17 #ifdef HAVE_CONFIG_H
18 # include <config.h>
19 #endif
20 #include <string.h>
21 #include <stdlib.h>
22 #include <limits.h>
23 #ifndef SIZE_MAX
24 # define SIZE_MAX (~((size_t)0))
25 #endif
26 
27 #include <mailutils/types.h>
28 #include <mailutils/alloc.h>
29 #include <mailutils/error.h>
30 #include <mailutils/errno.h>
31 #include <mailutils/nls.h>
32 #include <mailutils/stream.h>
33 #include <mailutils/cstr.h>
34 #include <mailutils/sys/stream.h>
35 
/* Buffer size used by mu_stream_set_buffer when it is called with a
   SIZE of 0.  Starts out as MU_STREAM_DEFBUFSIZ. */
size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
37 
38 static inline void
_stream_event(mu_stream_t str,int code,unsigned long n,void * p)39 _stream_event (mu_stream_t str, int code, unsigned long n, void *p)
40 {
41   if (str->event_cb && (str->event_mask & _MU_STR_EVMASK (code)))
42     str->event_cb (str, code, n, p);
43 }
44 
45 static inline void
_bootstrap_event(mu_stream_t str)46 _bootstrap_event (mu_stream_t str)
47 {
48   if (str->event_cb &&
49       (str->event_mask & _MU_STR_EVMASK (_MU_STR_EVENT_BOOTSTRAP)))
50     {
51       str->event_cb (str, _MU_STR_EVENT_BOOTSTRAP, 0, NULL);
52       str->event_mask &= ~_MU_STR_EVMASK (_MU_STR_EVENT_BOOTSTRAP);
53     }
54 }
55 
56 static inline void
_stream_stat_incr(mu_stream_t s,int k,size_t n)57 _stream_stat_incr (mu_stream_t s, int k, size_t n)
58 {
59   if (s->statmask & MU_STREAM_STAT_MASK (k))
60     s->statbuf[k] += n;
61 }
62 
63 static inline int
_stream_read(mu_stream_t str,char * buf,size_t size,size_t * pbytes)64 _stream_read (mu_stream_t str, char *buf, size_t size, size_t *pbytes)
65 {
66   int rc;
67   _stream_stat_incr (str, MU_STREAM_STAT_READS, 1);
68   rc = str->read (str, buf, size, pbytes);
69   if (rc == 0)
70     {
71       if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_INLN))
72 	str->statbuf[MU_STREAM_STAT_INLN] += mu_mem_c_count (buf, '\n', *pbytes);
73       if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_IN8BIT))
74 	str->statbuf[MU_STREAM_STAT_IN8BIT] += mu_mem_8bit_count (buf, *pbytes);
75     }
76   return rc;
77 }
78 
79 static inline int
_stream_write(mu_stream_t str,char const * buf,size_t size,size_t * pbytes)80 _stream_write (mu_stream_t str, char const *buf, size_t size, size_t *pbytes)
81 {
82   int rc;
83   _stream_stat_incr (str, MU_STREAM_STAT_WRITES, 1);
84   rc = str->write (str, buf, size, pbytes);
85   if (rc == 0)
86     {
87       if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUTLN))
88 	str->statbuf[MU_STREAM_STAT_OUTLN] += mu_mem_c_count (buf, '\n', *pbytes);
89       if (str->statmask & MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUT8BIT))
90 	str->statbuf[MU_STREAM_STAT_OUT8BIT] += mu_mem_8bit_count (buf, *pbytes);
91     }
92   return rc;
93 }
94 
95 static inline int
_stream_seek(mu_stream_t str,mu_off_t pos,mu_off_t * poff)96 _stream_seek (mu_stream_t str, mu_off_t pos, mu_off_t *poff)
97 {
98   _stream_stat_incr (str, MU_STREAM_STAT_SEEKS, 1);
99   return str->seek (str, pos, poff);
100 }
101 
/* Forward declarations.  The unbuffered I/O primitives are defined
   further down in this file, but the buffer fill and flush helpers
   below already need them. */
static int _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
				    int full_read, size_t *pnread);
static int _stream_write_unbuffered (mu_stream_t stream,
				     const void *buf, size_t size,
				     int full_write, size_t *pnwritten);
107 
108 static inline void
_stream_setflag(struct _mu_stream * stream,int flag)109 _stream_setflag (struct _mu_stream *stream, int flag)
110 {
111   _stream_event (stream, _MU_STR_EVENT_SETFLAG, flag, NULL);
112   stream->flags |= flag;
113 }
114 
115 static inline void
_stream_clrflag(struct _mu_stream * stream,int flag)116 _stream_clrflag (struct _mu_stream *stream, int flag)
117 {
118   _stream_event (stream, _MU_STR_EVENT_CLRFLAG, flag, NULL);
119   stream->flags &= ~flag;
120 }
121 
122 int
mu_stream_seterr(struct _mu_stream * stream,int code,int perm)123 mu_stream_seterr (struct _mu_stream *stream, int code, int perm)
124 {
125   stream->last_err = code;
126   switch (code)
127     {
128     case 0:
129     case EAGAIN:
130     case EINTR:
131     case ENOSYS:
132     case EINPROGRESS:
133       break;
134 
135     default:
136       if (perm)
137 	_stream_setflag (stream, _MU_STR_ERR);
138     }
139   return code;
140 }
141 
142 void
_mu_stream_cleareof(mu_stream_t str)143 _mu_stream_cleareof (mu_stream_t str)
144 {
145   _stream_clrflag (str, _MU_STR_EOF);
146 }
147 
148 void
_mu_stream_seteof(mu_stream_t str)149 _mu_stream_seteof (mu_stream_t str)
150 {
151   _stream_setflag (str, _MU_STR_EOF);
152 }
153 
/* Address of the byte at the current position in the buffer of stream S. */
#define _stream_curp(s) (&(s)->buffer[(s)->pos])
155 
156 static int
_stream_fill_buffer(struct _mu_stream * stream)157 _stream_fill_buffer (struct _mu_stream *stream)
158 {
159   size_t n;
160   size_t rdn;
161   int rc = 0;
162   char c;
163 
164   switch (stream->buftype)
165     {
166     case mu_buffer_none:
167       return 0;
168 
169     case mu_buffer_full:
170       rc = _stream_read_unbuffered (stream,
171 				    stream->buffer, stream->bufsize,
172 				    0,
173 				    &stream->level);
174       break;
175 
176     case mu_buffer_line:
177       for (n = 0;
178 	   n < stream->bufsize
179 	     && (rc = _stream_read_unbuffered (stream,
180 					       &c, 1,
181 					       0, &rdn)) == 0;)
182 	{
183 	  if (rdn == 0)
184 	    {
185 	      _stream_setflag (stream, _MU_STR_EOF);
186 	      break;
187 	    }
188 	  stream->buffer[n++] = c;
189 	  if (c == '\n')
190 	    break;
191 	}
192       stream->level = n;
193       break;
194     }
195   if (rc == 0)
196     {
197       stream->pos = 0;
198       _stream_event (stream, _MU_STR_EVENT_FILLBUF,
199 		     stream->level, _stream_curp (stream));
200     }
201   return rc;
202 }
203 
/* Flush modes for _stream_flush_buffer.  That function tests its mode
   argument for being non-zero, so FLUSH_WRITE must remain 0. */
enum
  {
    /* Flush only modified data.  Keep buffer level and position intact. */
    FLUSH_WRITE = 0,
    /* Flush modified data and reset buffer level and position. */
    FLUSH_RDWR = 1
  };
210 
211 static int
_stream_flush_buffer(struct _mu_stream * stream,int what)212 _stream_flush_buffer (struct _mu_stream *stream, int what)
213 {
214   int rc;
215 
216   if (stream->flags & _MU_STR_DIRTY)
217     {
218       if ((stream->flags & MU_STREAM_SEEK) && stream->seek)
219 	{
220 	  mu_off_t off;
221 	  rc = _stream_seek (stream, stream->offset, &off);
222 	  if (rc)
223 	    return rc;
224 	}
225 
226       if ((rc = _stream_write_unbuffered (stream, stream->buffer,
227 					  stream->level,
228 					  1, NULL)))
229 	return rc;
230       _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
231 		     stream->level, stream->buffer);
232       _stream_clrflag (stream, _MU_STR_DIRTY);
233 
234       if (stream->pos < stream->level)
235 	memmove (stream->buffer, stream->buffer + stream->pos,
236 		 stream->level - stream->pos);
237       stream->offset += stream->pos;
238       stream->level -= stream->pos;
239       stream->pos = 0;
240     }
241 
242   if (what)
243     {
244       stream->offset += stream->level;
245       stream->pos = stream->level = 0;
246     }
247 
248   return 0;
249 }
250 
251 
252 mu_stream_t
_mu_stream_create(size_t size,int flags)253 _mu_stream_create (size_t size, int flags)
254 {
255   struct _mu_stream *str;
256   if (size < sizeof (str))
257     abort ();
258   str = mu_zalloc (size);
259   str->flags = flags & ~(_MU_STR_INTERN_MASK & ~_MU_STR_OPEN);
260   mu_stream_ref (str);
261   return str;
262 }
263 
264 void
mu_stream_destroy(mu_stream_t * pstream)265 mu_stream_destroy (mu_stream_t *pstream)
266 {
267   if (pstream)
268     {
269       mu_stream_t str = *pstream;
270       if (str && (str->ref_count == 0 || --str->ref_count == 0))
271 	{
272 	  mu_stream_close (str);
273 	  if (str->buftype != mu_buffer_none)
274 	    {
275 	      free (str->buffer);
276 	      str->buffer = NULL;
277 	      str->buftype = mu_buffer_none;
278 	    }
279 	  if (str->done)
280 	    str->done (str);
281 	  if (str->destroy)
282 	    str->destroy (str);
283 	  else
284 	    free (str);
285 	  *pstream = NULL;
286 	}
287     }
288 }
289 
290 void
mu_stream_get_flags(mu_stream_t str,int * pflags)291 mu_stream_get_flags (mu_stream_t str, int *pflags)
292 {
293   *pflags = str->flags & ~_MU_STR_INTERN_MASK;
294 }
295 
296 void
mu_stream_ref(mu_stream_t stream)297 mu_stream_ref (mu_stream_t stream)
298 {
299   stream->ref_count++;
300 }
301 
302 void
mu_stream_unref(mu_stream_t stream)303 mu_stream_unref (mu_stream_t stream)
304 {
305   mu_stream_destroy (&stream);
306 }
307 
308 static void
_stream_init(mu_stream_t stream)309 _stream_init (mu_stream_t stream)
310 {
311   if (stream->statmask)
312     memset (stream->statbuf, 0,
313 	    _MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
314   stream->flags &= ~_MU_STR_INTERN_MASK;
315   _stream_setflag (stream, _MU_STR_OPEN);
316   stream->offset = 0;
317   stream->level = stream->pos = 0;
318   stream->last_err = 0;
319 }
320 
321 int
mu_stream_open(mu_stream_t stream)322 mu_stream_open (mu_stream_t stream)
323 {
324   int rc;
325 
326   if (stream->flags & _MU_STR_OPEN)
327     return MU_ERR_OPEN;
328   _bootstrap_event (stream);
329   if (stream->open)
330     {
331       if ((rc = stream->open (stream)))
332 	return mu_stream_seterr (stream, rc, 1);
333     }
334   _stream_init (stream);
335   if ((stream->flags & (MU_STREAM_APPEND|MU_STREAM_SEEK)) ==
336       (MU_STREAM_APPEND|MU_STREAM_SEEK) &&
337       (rc = mu_stream_seek (stream, 0, MU_SEEK_END, NULL)))
338     return mu_stream_seterr (stream, rc, 1);
339   return 0;
340 }
341 
342 const char *
mu_stream_strerror(mu_stream_t stream,int rc)343 mu_stream_strerror (mu_stream_t stream, int rc)
344 {
345   const char *str;
346 
347   if (!stream)
348     return mu_strerror (rc);
349   if (stream->error_string)
350     str = stream->error_string (stream, rc);
351   else
352     str = mu_strerror (rc);
353   return str;
354 }
355 
356 int
mu_stream_err(mu_stream_t stream)357 mu_stream_err (mu_stream_t stream)
358 {
359   return stream->flags & _MU_STR_ERR;
360 }
361 
362 int
mu_stream_last_error(mu_stream_t stream)363 mu_stream_last_error (mu_stream_t stream)
364 {
365   return stream->last_err;
366 }
367 
368 void
mu_stream_clearerr(mu_stream_t stream)369 mu_stream_clearerr (mu_stream_t stream)
370 {
371   stream->last_err = 0;
372   _stream_clrflag (stream, _MU_STR_ERR);
373 }
374 
375 int
mu_stream_eof(mu_stream_t stream)376 mu_stream_eof (mu_stream_t stream)
377 {
378   return (stream->flags & _MU_STR_EOF) && (stream->pos == stream->level);
379 }
380 
381 int
mu_stream_is_open(mu_stream_t stream)382 mu_stream_is_open (mu_stream_t stream)
383 {
384   return stream->flags & _MU_STR_OPEN;
385 }
386 
387 int
mu_stream_seek(mu_stream_t stream,mu_off_t offset,int whence,mu_off_t * pres)388 mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
389 		mu_off_t *pres)
390 {
391   int rc;
392   mu_off_t size;
393 
394   _bootstrap_event (stream);
395   if (!(stream->flags & _MU_STR_OPEN))
396     {
397       if (stream->open)
398 	return MU_ERR_NOT_OPEN;
399       _stream_init (stream);
400     }
401 
402   if (!stream->seek)
403     return mu_stream_seterr (stream, ENOSYS, 0);
404 
405   if (!(stream->flags & MU_STREAM_SEEK))
406     return mu_stream_seterr (stream, EACCES, 1);
407 
408   switch (whence)
409     {
410     case MU_SEEK_SET:
411       break;
412 
413     case MU_SEEK_CUR:
414       if (offset == 0)
415 	{
416 	  *pres = stream->offset + stream->pos;
417 	  return 0;
418 	}
419       offset += stream->offset + stream->pos;
420       break;
421 
422     case MU_SEEK_END:
423       rc = mu_stream_size (stream, &size);
424       if (rc)
425 	return rc;
426       offset += size;
427       break;
428 
429     default:
430       return mu_stream_seterr (stream, EINVAL, 1);
431     }
432 
433   if (!(stream->buftype == mu_buffer_none ?
434 	(offset == stream->offset)
435 	: (stream->offset <= offset &&
436 	   offset < stream->offset + stream->level)))
437     {
438       if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
439 	return rc;
440       if (stream->offset != offset)
441 	{
442 	  rc = _stream_seek (stream, offset, &stream->offset);
443 	  if (rc == ESPIPE)
444 	    return rc;
445 	  if (rc)
446 	    return mu_stream_seterr (stream, rc, 1);
447 	}
448       _mu_stream_cleareof (stream);
449     }
450   else if (stream->buftype != mu_buffer_none)
451     stream->pos = offset - stream->offset;
452 
453   _mu_stream_cleareof (stream);
454 
455   if (pres)
456     *pres = stream->offset + stream->pos;
457   return 0;
458 }
459 
460 /* Skip COUNT bytes from the current position in stream by reading from
461    it.  Return new offset in PRES.
462 
463    Return 0 on success, EACCES if STREAM was not opened for reading.
464    Another non-zero exit codes are propagated from the underlying
465    input operations.
466 
467    This function is designed to help implement seek method in otherwise
468    unseekable streams (such as filters).  Do not use it unless you absolutely
469    have to.  Using it on an unbuffered stream is a terrible waste of CPU. */
470 static int
_stream_skip_input_bytes(mu_stream_t stream,mu_off_t count,mu_off_t * pres)471 _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
472 {
473   mu_off_t pos;
474   int rc = 0;
475 
476   if (!(stream->flags & MU_STREAM_READ))
477     return mu_stream_seterr (stream, EACCES, 1);
478 
479   if (count)
480     {
481       if (stream->buftype == mu_buffer_none)
482 	{
483 	  for (pos = 0; pos < count; pos++)
484 	    {
485 	      char c;
486 	      size_t nrd;
487 	      rc = mu_stream_read (stream, &c, 1, &nrd);
488 	      if (nrd == 0)
489 		rc = ESPIPE;
490 	      if (rc)
491 		break;
492 	    }
493 	}
494       else
495 	{
496 	  for (pos = 0;;)
497 	    {
498 	      if (pos || stream->level == 0)
499 		{
500 		  if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
501 		    return rc;
502 		  rc = _stream_fill_buffer (stream);
503 		  if (rc)
504 		    break;
505 		  if (stream->level == 0)
506 		    {
507 		      rc = ESPIPE;
508 		      break;
509 		    }
510 		}
511 	      if (pos <= count && count < pos + stream->level)
512 		{
513 		  stream->pos = count - pos;
514 		  rc = 0;
515 		  break;
516 		}
517 	      pos += stream->level;
518 	    }
519 	}
520     }
521 
522   if (pres)
523     *pres = stream->offset + stream->pos;
524   return rc;
525 }
526 
527 /* A wrapper for the above function.  It is normally called from a
528    seek method implementation, so it makes sure the MU_STREAM_SEEK
529    is cleared while in _stream_skip_input_bytes, to avoid infitite
530    recursion that may be triggered by _stream_flush_buffer invoking
531    stream->seek. */
532 int
mu_stream_skip_input_bytes(mu_stream_t stream,mu_off_t count,mu_off_t * pres)533 mu_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
534 {
535   int rc;
536   int seek_flag = stream->flags & MU_STREAM_SEEK;
537   stream->flags &= ~MU_STREAM_SEEK;
538   rc = _stream_skip_input_bytes (stream, count, pres);
539   stream->flags |= seek_flag;
540   return rc;
541 }
542 
543 int
mu_stream_set_buffer(mu_stream_t stream,enum mu_buffer_type type,size_t size)544 mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
545 		      size_t size)
546 {
547   _bootstrap_event (stream);
548 
549   if (size == 0)
550     size = mu_stream_default_buffer_size;
551 
552   if (stream->setbuf_hook)
553     {
554       int rc = stream->setbuf_hook (stream, type, size);
555       if (rc)
556 	return rc;
557     }
558 
559   if (stream->buffer)
560     {
561       mu_stream_flush (stream);
562       free (stream->buffer);
563     }
564 
565   stream->buftype = type;
566   if (type == mu_buffer_none)
567     {
568       stream->buffer = NULL;
569       return 0;
570     }
571 
572   stream->buffer = malloc (size);
573   if (stream->buffer == NULL)
574     {
575       stream->buftype = mu_buffer_none;
576       return mu_stream_seterr (stream, ENOMEM, 1);
577     }
578   stream->bufsize = size;
579   stream->pos = 0;
580   stream->level = 0;
581 
582   return 0;
583 }
584 
585 int
mu_stream_get_buffer(mu_stream_t stream,struct mu_buffer_query * qry)586 mu_stream_get_buffer (mu_stream_t stream, struct mu_buffer_query *qry)
587 {
588   qry->buftype = stream->buftype;
589   qry->bufsize = stream->bufsize;
590   return 0;
591 }
592 
593 static int
_stream_read_unbuffered(mu_stream_t stream,void * buf,size_t size,int full_read,size_t * pnread)594 _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
595 			 int full_read, size_t *pnread)
596 {
597   int rc;
598   size_t nread;
599 
600   if (!stream->read)
601     return mu_stream_seterr (stream, ENOSYS, 0);
602 
603   if (!(stream->flags & MU_STREAM_READ))
604     return mu_stream_seterr (stream, EACCES, 1);
605 
606   if (stream->flags & _MU_STR_ERR)
607     return stream->last_err;
608 
609   if (mu_stream_eof (stream) || size == 0)
610     {
611       if (pnread)
612 	*pnread = 0;
613       return 0;
614     }
615 
616   if (full_read)
617     {
618       size_t rdbytes;
619 
620       nread = 0;
621       while (size > 0
622 	     && (rc = _stream_read (stream, buf, size, &rdbytes)) == 0)
623 	{
624 	  if (rdbytes == 0)
625 	    {
626 	      _stream_setflag (stream, _MU_STR_EOF);
627 	      break;
628 	    }
629 	  buf += rdbytes;
630 	  nread += rdbytes;
631 	  size -= rdbytes;
632 	  _stream_stat_incr (stream, MU_STREAM_STAT_IN, rdbytes);
633 	}
634       if (size && rc)
635 	rc = mu_stream_seterr (stream, rc, 0);
636     }
637   else
638     {
639       rc = _stream_read (stream, buf, size, &nread);
640       if (rc == 0)
641 	{
642 	  if (nread == 0)
643 	    _stream_setflag (stream, _MU_STR_EOF);
644 	  _stream_stat_incr (stream, MU_STREAM_STAT_IN, nread);
645 	}
646       mu_stream_seterr (stream, rc, rc != 0);
647     }
648   if (rc == 0 && pnread)
649     *pnread = nread;
650 
651   return rc;
652 }
653 
654 static int
_stream_write_unbuffered(mu_stream_t stream,const void * buf,size_t size,int full_write,size_t * pnwritten)655 _stream_write_unbuffered (mu_stream_t stream,
656 			  const void *buf, size_t size,
657 			  int full_write, size_t *pnwritten)
658 {
659   int rc;
660   size_t nwritten;
661 
662   if (!stream->write)
663     return mu_stream_seterr (stream, ENOSYS, 0);
664 
665   if (!(stream->flags & (MU_STREAM_WRITE|MU_STREAM_APPEND)))
666     return mu_stream_seterr (stream, EACCES, 1);
667 
668   if (stream->flags & _MU_STR_ERR)
669     return stream->last_err;
670 
671   if (size == 0)
672     {
673       if (pnwritten)
674 	*pnwritten = 0;
675       return 0;
676     }
677 
678   if (full_write)
679     {
680       size_t wrbytes;
681       const char *bufp = buf;
682 
683       nwritten = 0;
684       while (size > 0
685 	     && (rc = _stream_write (stream, bufp, size, &wrbytes))
686 	     == 0)
687 	{
688 	  if (wrbytes == 0)
689 	    {
690 	      rc = EIO;
691 	      break;
692 	    }
693 	  bufp += wrbytes;
694 	  nwritten += wrbytes;
695 	  size -= wrbytes;
696 	  _stream_stat_incr (stream, MU_STREAM_STAT_OUT, wrbytes);
697 	}
698     }
699   else
700     {
701       rc = _stream_write (stream, buf, size, &nwritten);
702       if (rc == 0)
703 	_stream_stat_incr (stream, MU_STREAM_STAT_OUT, nwritten);
704     }
705   _stream_setflag (stream, _MU_STR_WRT);
706   if (pnwritten)
707     *pnwritten = nwritten;
708   mu_stream_seterr (stream, rc, rc != 0);
709   return rc;
710 }
711 
712 int
mu_stream_read(mu_stream_t stream,void * buf,size_t size,size_t * pread)713 mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
714 {
715   _bootstrap_event (stream);
716 
717   if (!(stream->flags & _MU_STR_OPEN))
718     {
719       if (stream->open)
720 	return MU_ERR_NOT_OPEN;
721       _stream_init (stream);
722     }
723 
724   if (stream->buftype == mu_buffer_none)
725     {
726       size_t nread = 0;
727       int rc = _stream_read_unbuffered (stream, buf, size, !pread, &nread);
728       stream->offset += nread;
729       if (pread)
730 	*pread = nread;
731       return rc;
732     }
733   else
734     {
735       char *bufp = buf;
736       size_t nbytes = 0;
737       int rc;
738 
739       while (size)
740 	{
741 	  size_t n;
742 
743 	  if (stream->pos == stream->level)
744 	    {
745 	      if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
746 		{
747 		  if (nbytes)
748 		    break;
749 		  return rc;
750 		}
751 	      if ((rc = _stream_fill_buffer (stream)))
752 		{
753 		  if (nbytes)
754 		    break;
755 		  return rc;
756 		}
757 	      if (stream->level == 0)
758 		break;
759 	    }
760 
761 	  n = size;
762 	  if (n > stream->level - stream->pos)
763 	    n = stream->level - stream->pos;
764 	  memcpy (bufp, _stream_curp (stream), n);
765 	  stream->pos += n;
766 	  nbytes += n;
767 	  bufp += n;
768 	  size -= n;
769 	  if (stream->buftype == mu_buffer_line && bufp[-1] == '\n')
770 	    break;
771 	}
772 
773       if (pread)
774 	*pread = nbytes;
775     }
776   return 0;
777 }
778 
779 int
_stream_scandelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pnread)780 _stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim,
781 		   size_t *pnread)
782 {
783   int rc = 0;
784   size_t nread = 0;
785 
786   size--;
787   if (size == 0)
788     return MU_ERR_BUFSPACE;
789   while (size)
790     {
791       char *p, *q;
792       size_t len;
793 
794       if (stream->pos == stream->level)
795 	{
796 	  if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
797 	    break;
798 	  if ((rc = _stream_fill_buffer (stream)) || stream->level == 0)
799 	    break;
800 	}
801 
802       q = _stream_curp (stream);
803       len = stream->level - stream->pos;
804       p = memchr (q, delim, len);
805       if (p)
806 	len = p - q + 1;
807       if (len > size)
808 	len = size;
809       memcpy (buf, _stream_curp (stream), len);
810       stream->pos += len;
811       buf += len;
812       size -= len;
813       nread += len;
814       if (p) /* Delimiter found */
815 	break;
816     }
817   *buf = 0;
818   *pnread = nread;
819   return rc;
820 }
821 
822 static int
_stream_readdelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pread)823 _stream_readdelim (mu_stream_t stream, char *buf, size_t size,
824 		   int delim, size_t *pread)
825 {
826   int rc;
827   char c;
828   size_t n = 0, rdn;
829 
830   size--;
831   if (size == 0)
832     return MU_ERR_BUFSPACE;
833   for (n = 0;
834        n < size && (rc = mu_stream_read (stream, &c, 1, &rdn)) == 0 && rdn;)
835     {
836       *buf++ = c;
837       n++;
838       if (c == delim)
839 	break;
840     }
841   *buf = 0;
842   if (pread)
843     *pread = n;
844   return rc;
845 }
846 
847 int
mu_stream_readdelim(mu_stream_t stream,char * buf,size_t size,int delim,size_t * pread)848 mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
849 		     int delim, size_t *pread)
850 {
851   int rc;
852 
853   _bootstrap_event (stream);
854 
855   if (size == 0)
856     return EINVAL;
857 
858   if (!(stream->flags & _MU_STR_OPEN))
859     {
860       if (stream->open)
861 	return MU_ERR_NOT_OPEN;
862       _stream_init (stream);
863     }
864 
865   if (stream->buftype == mu_buffer_none)
866     {
867       rc = _stream_readdelim (stream, buf, size, delim, pread);
868     }
869   else
870     {
871       if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
872 	return rc;
873       rc = _stream_scandelim (stream, buf, size, delim, pread);
874     }
875   return rc;
876 }
877 
878 int
mu_stream_readline(mu_stream_t stream,char * buf,size_t size,size_t * pread)879 mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
880 {
881   return mu_stream_readdelim (stream, buf, size, '\n', pread);
882 }
883 
884 int
mu_stream_getdelim(mu_stream_t stream,char ** pbuf,size_t * psize,int delim,size_t * pread)885 mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
886 		    int delim, size_t *pread)
887 {
888   int rc;
889   char *lineptr = *pbuf;
890   size_t n = *psize;
891   size_t cur_len = 0;
892 
893   _bootstrap_event (stream);
894 
895   if (!(stream->flags & _MU_STR_OPEN))
896     {
897       if (stream->open)
898 	return MU_ERR_NOT_OPEN;
899       _stream_init (stream);
900     }
901 
902   if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
903     return rc;
904 
905   if (lineptr == NULL || n == 0)
906     {
907       char *new_lineptr;
908       n = 120;
909       new_lineptr = realloc (lineptr, n);
910       if (new_lineptr == NULL)
911 	return ENOMEM;
912       lineptr = new_lineptr;
913     }
914 
915   for (;;)
916     {
917       size_t rdn;
918 
919       /* Make enough space for len+1 (for final NUL) bytes.  */
920       if (cur_len + 1 >= n)
921 	{
922 	  size_t needed_max =
923 	    SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
924 	  size_t needed = 2 * n + 1;   /* Be generous. */
925 	  char *new_lineptr;
926 
927 	  if (needed_max < needed)
928 	    needed = needed_max;
929 	  if (cur_len + 1 >= needed)
930 	    {
931 	      rc = EOVERFLOW;
932 	      break;
933 	    }
934 
935 	  new_lineptr = realloc (lineptr, needed);
936 	  if (new_lineptr == NULL)
937 	    {
938 	      rc = ENOMEM;
939 	      break;
940 	    }
941 
942 	  lineptr = new_lineptr;
943 	  n = needed;
944 	}
945 
946       if (stream->buftype == mu_buffer_none)
947 	rc = _stream_readdelim (stream, lineptr + cur_len, n - cur_len, delim,
948 				&rdn);
949       else
950 	rc = _stream_scandelim (stream, lineptr + cur_len, n - cur_len, delim,
951 				&rdn);
952 
953       if (rc || rdn == 0)
954 	break;
955       cur_len += rdn;
956 
957       if (lineptr[cur_len - 1] == delim)
958 	break;
959     }
960   lineptr[cur_len] = '\0';
961 
962   *pbuf = lineptr;
963   *psize = n;
964 
965   if (pread)
966     *pread = cur_len;
967   return rc;
968 }
969 
970 int
mu_stream_getline(mu_stream_t stream,char ** pbuf,size_t * psize,size_t * pread)971 mu_stream_getline (mu_stream_t stream, char **pbuf, size_t *psize,
972 		   size_t *pread)
973 {
974   return mu_stream_getdelim (stream, pbuf, psize, '\n', pread);
975 }
976 
977 /* Return 1 if no more data can be written to the current buffer. */
978 static inline int
_stream_buffer_full_p(struct _mu_stream * stream)979 _stream_buffer_full_p (struct _mu_stream *stream)
980 {
981   /* This function should be called only for buffered streams */
982   if (stream->buftype == mu_buffer_none)
983     return 0;
984 
985   if (stream->bufsize == stream->pos)
986     /* No space left in buffer */
987     return 1;
988 
989   /* For line buffering, the buffer is flushed immediately after
990      receiving a newline character. */
991   if (stream->buftype == mu_buffer_line &&
992       stream->pos > 0 &&
993       memchr (stream->buffer, '\n', stream->pos) != NULL)
994     return 1;
995 
996   return 0;
997 }
998 
999 int
mu_stream_write(mu_stream_t stream,const void * buf,size_t size,size_t * pnwritten)1000 mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
1001 		 size_t *pnwritten)
1002 {
1003   int rc = 0;
1004 
1005   _bootstrap_event (stream);
1006 
1007   if (!(stream->flags & _MU_STR_OPEN))
1008     {
1009       if (stream->open)
1010 	return MU_ERR_NOT_OPEN;
1011       _stream_init (stream);
1012     }
1013 
1014   if (stream->buftype == mu_buffer_none)
1015     {
1016       size_t nwritten;
1017       rc = _stream_write_unbuffered (stream, buf, size,
1018 				     !pnwritten, &nwritten);
1019       stream->offset += nwritten;
1020       if (pnwritten)
1021 	*pnwritten = nwritten;
1022     }
1023   else
1024     {
1025       size_t nbytes = 0;
1026       const char *bufp = buf;
1027 
1028       while (1)
1029 	{
1030 	  size_t n;
1031 
1032 	  if (_stream_buffer_full_p (stream))
1033 	    {
1034 	      if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1035 		break;
1036 	    }
1037 
1038 	  if (size == 0)
1039 	    break;
1040 
1041 	  n = stream->bufsize - stream->pos;
1042 	  if (n > size)
1043 	    n = size;
1044 	  memcpy (_stream_curp (stream), bufp, n);
1045 	  stream->pos += n;
1046 	  if (stream->pos > stream->level)
1047 	    stream->level = stream->pos;
1048 
1049 	  nbytes += n;
1050 	  bufp += n;
1051 	  size -= n;
1052 	  _stream_setflag (stream, _MU_STR_DIRTY);
1053 	}
1054       if (pnwritten)
1055 	*pnwritten = nbytes;
1056       else if (nbytes < size)
1057 	{
1058 	  rc = EIO;
1059 	  mu_stream_seterr (stream, rc, 1);
1060 	}
1061     }
1062   return rc;
1063 }
1064 
1065 int
mu_stream_writeline(mu_stream_t stream,const char * buf,size_t size)1066 mu_stream_writeline (mu_stream_t stream, const char *buf, size_t size)
1067 {
1068   int rc;
1069   if ((rc = mu_stream_write (stream, buf, size, NULL)) == 0)
1070     rc = mu_stream_write (stream, "\r\n", 2, NULL);
1071   return rc;
1072 }
1073 
1074 int
mu_stream_flush(mu_stream_t stream)1075 mu_stream_flush (mu_stream_t stream)
1076 {
1077   int rc;
1078 
1079   if (!stream)
1080     return EINVAL;
1081   _bootstrap_event (stream);
1082   if (!(stream->flags & _MU_STR_OPEN))
1083     {
1084       if (stream->open)
1085 	return MU_ERR_NOT_OPEN;
1086       _stream_init (stream);
1087     }
1088   rc = _stream_flush_buffer (stream, FLUSH_RDWR);
1089   if (rc)
1090     return rc;
1091   if ((stream->flags & _MU_STR_WRT) && stream->flush)
1092     return stream->flush (stream);
1093   _stream_clrflag (stream, _MU_STR_WRT);
1094   return 0;
1095 }
1096 
1097 int
mu_stream_close(mu_stream_t stream)1098 mu_stream_close (mu_stream_t stream)
1099 {
1100   int rc = 0;
1101 
1102   if (!stream)
1103     return EINVAL;
1104   if (!(stream->flags & _MU_STR_OPEN))
1105     return MU_ERR_NOT_OPEN;
1106 
1107   rc = mu_stream_flush (stream);
1108   if (rc == 0)
1109     {
1110       /* Do close the stream only if it is not used by anyone else */
1111       if (stream->ref_count > 1)
1112 	return 0;
1113       _stream_event (stream, _MU_STR_EVENT_CLOSE, 0, NULL);
1114       if (stream->close)
1115 	rc = stream->close (stream);
1116       if (rc == 0)
1117 	_stream_clrflag (stream, _MU_STR_OPEN);
1118     }
1119   return rc;
1120 }
1121 
1122 int
mu_stream_size(mu_stream_t stream,mu_off_t * psize)1123 mu_stream_size (mu_stream_t stream, mu_off_t *psize)
1124 {
1125   int rc;
1126   mu_off_t size;
1127 
1128   _bootstrap_event (stream);
1129   if (!(stream->flags & _MU_STR_OPEN))
1130     {
1131       if (stream->open)
1132 	return MU_ERR_NOT_OPEN;
1133       _stream_init (stream);
1134     }
1135   if (!stream->size)
1136     return mu_stream_seterr (stream, ENOSYS, 0);
1137   rc = stream->size (stream, &size);
1138   if (rc == 0)
1139     {
1140       if (stream->buftype != mu_buffer_none)
1141 	{
1142 	  size_t n = stream->offset + stream->level;
1143 	  if (n > size)
1144 	    size = n;
1145 	}
1146       *psize = size;
1147     }
1148   return mu_stream_seterr (stream, rc, rc != 0);
1149 }
1150 
1151 int
mu_stream_ioctl(mu_stream_t stream,int family,int opcode,void * ptr)1152 mu_stream_ioctl (mu_stream_t stream, int family, int opcode, void *ptr)
1153 {
1154   int rc;
1155   _bootstrap_event (stream);
1156   if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
1157     return rc;
1158   if (stream->ctl == NULL)
1159     return ENOSYS;
1160   return stream->ctl (stream, family, opcode, ptr);
1161 }
1162 
1163 int
mu_stream_wait(mu_stream_t stream,int * pflags,struct timeval * tvp)1164 mu_stream_wait (mu_stream_t stream, int *pflags, struct timeval *tvp)
1165 {
1166   int flg = 0;
1167 
1168   if (stream == NULL)
1169     return EINVAL;
1170   _bootstrap_event (stream);
1171 #if 0
1172   /* NOTE: Sometimes mu_stream_wait is called after a failed mu_stream_open.
1173      In particular, this is needed for a TCP stream opened with a
1174      MU_STREAM_NONBLOCK flag (see examples/http.c).  Until a better
1175      solution is found, this check is commented out. */
1176   if (!(stream->flags & _MU_STR_OPEN))
1177     {
1178       if (stream->open)
1179 	return MU_ERR_NOT_OPEN;
1180       _stream_init (stream);
1181     }
1182 #endif
1183   /* Take into account if we have any buffering.  */
1184   /* FIXME: How about MU_STREAM_READY_WR? */
1185   if ((*pflags) & MU_STREAM_READY_RD
1186       && stream->buftype != mu_buffer_none
1187       && stream->pos < stream->level)
1188     {
1189       flg = MU_STREAM_READY_RD;
1190       *pflags &= ~MU_STREAM_READY_RD;
1191     }
1192 
1193   if (stream->wait)
1194     {
1195       int rc;
1196 
1197       if (flg && *pflags == 0)
1198 	/* Don't call wait method if our modifications (see above) resulted
1199 	   in empty *pflags. */
1200 	rc = 0;
1201       else
1202         rc = stream->wait (stream, pflags, tvp);
1203       if (rc == 0)
1204 	*pflags |= flg;
1205       return rc;
1206     }
1207 
1208   return ENOSYS;
1209 }
1210 
1211 int
mu_stream_truncate(mu_stream_t stream,mu_off_t size)1212 mu_stream_truncate (mu_stream_t stream, mu_off_t size)
1213 {
1214   _bootstrap_event (stream);
1215 
1216   if (!(stream->flags & _MU_STR_OPEN))
1217     {
1218       if (stream->open)
1219 	return MU_ERR_NOT_OPEN;
1220       _stream_init (stream);
1221     }
1222 
1223   if (stream->truncate)
1224     {
1225       int rc;
1226 
1227       if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1228 	return rc;
1229       if (stream->offset > size)
1230 	{
1231 	  stream->offset = size;
1232 	  stream->level = 0;
1233 	  stream->pos = 0;
1234 	}
1235       else if (stream->offset + stream->pos > size)
1236 	{
1237 	  stream->pos = size - stream->offset;
1238 	  stream->level = stream->pos;
1239 	}
1240       else if (stream->offset + stream->level > size)
1241 	{
1242 	  stream->level = size - stream->offset;
1243 	  if (stream->pos > stream->level)
1244 	    stream->pos = stream->level;
1245 	}
1246       return stream->truncate (stream, size);
1247     }
1248   return ENOSYS;
1249 }
1250 
1251 int
mu_stream_shutdown(mu_stream_t stream,int how)1252 mu_stream_shutdown (mu_stream_t stream, int how)
1253 {
1254   int rc;
1255 
1256   _bootstrap_event (stream);
1257 
1258   if (!(stream->flags & _MU_STR_OPEN))
1259     {
1260       if (stream->open)
1261 	return MU_ERR_NOT_OPEN;
1262       _stream_init (stream);
1263     }
1264 
1265   rc = mu_stream_flush (stream);
1266   if (rc)
1267     return rc;
1268   if (stream->shutdown)
1269     return stream->shutdown (stream, how);
1270   return 0;
1271 }
1272 
1273 int
mu_stream_set_flags(mu_stream_t stream,int fl)1274 mu_stream_set_flags (mu_stream_t stream, int fl)
1275 {
1276   if (stream == NULL)
1277     return EINVAL;
1278   stream->flags |= (fl & ~_MU_STR_INTERN_MASK);
1279   return 0;
1280 }
1281 
1282 int
mu_stream_clr_flags(mu_stream_t stream,int fl)1283 mu_stream_clr_flags (mu_stream_t stream, int fl)
1284 {
1285   if (stream == NULL)
1286     return EINVAL;
1287   stream->flags &= ~(fl & ~_MU_STR_INTERN_MASK);
1288   return 0;
1289 }
1290 
1291 int
mu_stream_set_stat(mu_stream_t stream,int statmask,mu_stream_stat_buffer statbuf)1292 mu_stream_set_stat (mu_stream_t stream, int statmask,
1293 		    mu_stream_stat_buffer statbuf)
1294 {
1295   if (stream == NULL)
1296     return EINVAL;
1297   if (!statbuf)
1298     statmask = 0;
1299   stream->statmask = statmask;
1300   stream->statbuf = statbuf;
1301   if (stream->statbuf)
1302     memset (stream->statbuf, 0,
1303 	    _MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
1304   return 0;
1305 }
1306 
1307 int
mu_stream_get_stat(mu_stream_t stream,int * pstatmask,mu_off_t ** pstatbuf)1308 mu_stream_get_stat (mu_stream_t stream, int *pstatmask,
1309 		    mu_off_t **pstatbuf)
1310 {
1311   if (stream == NULL)
1312     return EINVAL;
1313   *pstatmask = stream->statmask;
1314   *pstatbuf = stream->statbuf;
1315   return 0;
1316 }
1317 
1318