xref: /openbsd/lib/libevent/evbuffer.c (revision 3d8817e4)
1 /*	$OpenBSD: evbuffer.c,v 1.12 2010/04/21 20:02:40 nicm Exp $	*/
2 
3 /*
4  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
#include <sys/types.h>

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#endif

#include "evutil.h"
#include "event.h"
54 
/* prototypes */

void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old,
    size_t now, void *arg);

/*
 * Add `ev' with event_add().
 *
 * A non-zero `timeout' is stored in the tv_sec field of a cleared timeval
 * and handed to event_add(); a zero `timeout' adds the event with a NULL
 * timeout pointer.  The return value is that of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval tv;

	if (timeout == 0)
		return (event_add(ev, NULL));

	evutil_timerclear(&tv);
	tv.tv_sec = timeout;

	return (event_add(ev, &tv));
}
72 
73 /*
74  * This callback is executed when the size of the input buffer changes.
75  * We use it to apply back pressure on the reading side.
76  */
77 
78 void
79 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
80     void *arg) {
81 	struct bufferevent *bufev = arg;
82 	/*
83 	 * If we are below the watermark then reschedule reading if it's
84 	 * still enabled.
85 	 */
86 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
87 		evbuffer_setcb(buf, NULL, NULL);
88 
89 		if (bufev->enabled & EV_READ)
90 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
91 	}
92 }
93 
94 static void
95 bufferevent_readcb(int fd, short event, void *arg)
96 {
97 	struct bufferevent *bufev = arg;
98 	int res = 0;
99 	short what = EVBUFFER_READ;
100 	size_t len;
101 	int howmuch = -1;
102 
103 	if (event == EV_TIMEOUT) {
104 		what |= EVBUFFER_TIMEOUT;
105 		goto error;
106 	}
107 
108 	/*
109 	 * If we have a high watermark configured then we don't want to
110 	 * read more data than would make us reach the watermark.
111 	 */
112 	if (bufev->wm_read.high != 0) {
113 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
114 		/* we might have lowered the watermark, stop reading */
115 		if (howmuch <= 0) {
116 			struct evbuffer *buf = bufev->input;
117 			event_del(&bufev->ev_read);
118 			evbuffer_setcb(buf,
119 			    bufferevent_read_pressure_cb, bufev);
120 			return;
121 		}
122 	}
123 
124 	res = evbuffer_read(bufev->input, fd, howmuch);
125 	if (res == -1) {
126 		if (errno == EAGAIN || errno == EINTR)
127 			goto reschedule;
128 		/* error case */
129 		what |= EVBUFFER_ERROR;
130 	} else if (res == 0) {
131 		/* eof case */
132 		what |= EVBUFFER_EOF;
133 	}
134 
135 	if (res <= 0)
136 		goto error;
137 
138 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
139 
140 	/* See if this callbacks meets the water marks */
141 	len = EVBUFFER_LENGTH(bufev->input);
142 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
143 		return;
144 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
145 		struct evbuffer *buf = bufev->input;
146 		event_del(&bufev->ev_read);
147 
148 		/* Now schedule a callback for us when the buffer changes */
149 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
150 	}
151 
152 	/* Invoke the user callback - must always be called last */
153 	if (bufev->readcb != NULL)
154 		(*bufev->readcb)(bufev, bufev->cbarg);
155 	return;
156 
157  reschedule:
158 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
159 	return;
160 
161  error:
162 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
163 }
164 
165 static void
166 bufferevent_writecb(int fd, short event, void *arg)
167 {
168 	struct bufferevent *bufev = arg;
169 	int res = 0;
170 	short what = EVBUFFER_WRITE;
171 
172 	if (event == EV_TIMEOUT) {
173 		what |= EVBUFFER_TIMEOUT;
174 		goto error;
175 	}
176 
177 	if (EVBUFFER_LENGTH(bufev->output)) {
178 	    res = evbuffer_write(bufev->output, fd);
179 	    if (res == -1) {
180 #ifndef WIN32
181 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
182  *set errno. thus this error checking is not portable*/
183 		    if (errno == EAGAIN ||
184 			errno == EINTR ||
185 			errno == EINPROGRESS)
186 			    goto reschedule;
187 		    /* error case */
188 		    what |= EVBUFFER_ERROR;
189 
190 #else
191 				goto reschedule;
192 #endif
193 
194 	    } else if (res == 0) {
195 		    /* eof case */
196 		    what |= EVBUFFER_EOF;
197 	    }
198 	    if (res <= 0)
199 		    goto error;
200 	}
201 
202 	if (EVBUFFER_LENGTH(bufev->output) != 0)
203 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
204 
205 	/*
206 	 * Invoke the user callback if our buffer is drained or below the
207 	 * low watermark.
208 	 */
209 	if (bufev->writecb != NULL &&
210 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
211 		(*bufev->writecb)(bufev, bufev->cbarg);
212 
213 	return;
214 
215  reschedule:
216 	if (EVBUFFER_LENGTH(bufev->output) != 0)
217 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
218 	return;
219 
220  error:
221 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
222 }
223 
224 /*
225  * Create a new buffered event object.
226  *
227  * The read callback is invoked whenever we read new data.
228  * The write callback is invoked whenever the output buffer is drained.
229  * The error callback is invoked on a write/read error or on EOF.
230  *
231  * Both read and write callbacks maybe NULL.  The error callback is not
232  * allowed to be NULL and have to be provided always.
233  */
234 
235 struct bufferevent *
236 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
237     everrorcb errorcb, void *cbarg)
238 {
239 	struct bufferevent *bufev;
240 
241 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
242 		return (NULL);
243 
244 	if ((bufev->input = evbuffer_new()) == NULL) {
245 		free(bufev);
246 		return (NULL);
247 	}
248 
249 	if ((bufev->output = evbuffer_new()) == NULL) {
250 		evbuffer_free(bufev->input);
251 		free(bufev);
252 		return (NULL);
253 	}
254 
255 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
256 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
257 
258 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
259 
260 	/*
261 	 * Set to EV_WRITE so that using bufferevent_write is going to
262 	 * trigger a callback.  Reading needs to be explicitly enabled
263 	 * because otherwise no data will be available.
264 	 */
265 	bufev->enabled = EV_WRITE;
266 
267 	return (bufev);
268 }
269 
270 void
271 bufferevent_setcb(struct bufferevent *bufev,
272     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
273 {
274 	bufev->readcb = readcb;
275 	bufev->writecb = writecb;
276 	bufev->errorcb = errorcb;
277 
278 	bufev->cbarg = cbarg;
279 }
280 
281 void
282 bufferevent_setfd(struct bufferevent *bufev, int fd)
283 {
284 	event_del(&bufev->ev_read);
285 	event_del(&bufev->ev_write);
286 
287 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
288 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
289 	if (bufev->ev_base != NULL) {
290 		event_base_set(bufev->ev_base, &bufev->ev_read);
291 		event_base_set(bufev->ev_base, &bufev->ev_write);
292 	}
293 
294 	/* might have to manually trigger event registration */
295 }
296 
297 int
298 bufferevent_priority_set(struct bufferevent *bufev, int priority)
299 {
300 	if (event_priority_set(&bufev->ev_read, priority) == -1)
301 		return (-1);
302 	if (event_priority_set(&bufev->ev_write, priority) == -1)
303 		return (-1);
304 
305 	return (0);
306 }
307 
308 /* Closing the file descriptor is the responsibility of the caller */
309 
310 void
311 bufferevent_free(struct bufferevent *bufev)
312 {
313 	event_del(&bufev->ev_read);
314 	event_del(&bufev->ev_write);
315 
316 	evbuffer_free(bufev->input);
317 	evbuffer_free(bufev->output);
318 
319 	free(bufev);
320 }
321 
322 /*
323  * Returns 0 on success;
324  *        -1 on failure.
325  */
326 
327 int
328 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
329 {
330 	int res;
331 
332 	res = evbuffer_add(bufev->output, data, size);
333 
334 	if (res == -1)
335 		return (res);
336 
337 	/* If everything is okay, we need to schedule a write */
338 	if (size > 0 && (bufev->enabled & EV_WRITE))
339 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
340 
341 	return (res);
342 }
343 
344 int
345 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
346 {
347 	int res;
348 
349 	res = bufferevent_write(bufev, buf->buffer, buf->off);
350 	if (res != -1)
351 		evbuffer_drain(buf, buf->off);
352 
353 	return (res);
354 }
355 
356 size_t
357 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
358 {
359 	struct evbuffer *buf = bufev->input;
360 
361 	if (buf->off < size)
362 		size = buf->off;
363 
364 	/* Copy the available data to the user buffer */
365 	memcpy(data, buf->buffer, size);
366 
367 	if (size)
368 		evbuffer_drain(buf, size);
369 
370 	return (size);
371 }
372 
373 int
374 bufferevent_enable(struct bufferevent *bufev, short event)
375 {
376 	if (event & EV_READ) {
377 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
378 			return (-1);
379 	}
380 	if (event & EV_WRITE) {
381 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
382 			return (-1);
383 	}
384 
385 	bufev->enabled |= event;
386 	return (0);
387 }
388 
389 int
390 bufferevent_disable(struct bufferevent *bufev, short event)
391 {
392 	if (event & EV_READ) {
393 		if (event_del(&bufev->ev_read) == -1)
394 			return (-1);
395 	}
396 	if (event & EV_WRITE) {
397 		if (event_del(&bufev->ev_write) == -1)
398 			return (-1);
399 	}
400 
401 	bufev->enabled &= ~event;
402 	return (0);
403 }
404 
405 /*
406  * Sets the read and write timeout for a buffered event.
407  */
408 
409 void
410 bufferevent_settimeout(struct bufferevent *bufev,
411     int timeout_read, int timeout_write) {
412 	bufev->timeout_read = timeout_read;
413 	bufev->timeout_write = timeout_write;
414 
415 	if (event_pending(&bufev->ev_read, EV_READ, NULL))
416 		bufferevent_add(&bufev->ev_read, timeout_read);
417 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
418 		bufferevent_add(&bufev->ev_write, timeout_write);
419 }
420 
421 /*
422  * Sets the water marks
423  */
424 
425 void
426 bufferevent_setwatermark(struct bufferevent *bufev, short events,
427     size_t lowmark, size_t highmark)
428 {
429 	if (events & EV_READ) {
430 		bufev->wm_read.low = lowmark;
431 		bufev->wm_read.high = highmark;
432 	}
433 
434 	if (events & EV_WRITE) {
435 		bufev->wm_write.low = lowmark;
436 		bufev->wm_write.high = highmark;
437 	}
438 
439 	/* If the watermarks changed then see if we should call read again */
440 	bufferevent_read_pressure_cb(bufev->input,
441 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
442 }
443 
444 int
445 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
446 {
447 	int res;
448 
449 	bufev->ev_base = base;
450 
451 	res = event_base_set(base, &bufev->ev_read);
452 	if (res == -1)
453 		return (res);
454 
455 	res = event_base_set(base, &bufev->ev_write);
456 	return (res);
457 }
458