xref: /openbsd/lib/libevent/evbuffer.c (revision 824b820d)
1 /*	$OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $	*/
2 
3 /*
4  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
#include <sys/types.h>
#include <sys/time.h>

#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
38 
39 #include "event.h"
40 
41 /* prototypes */
42 
43 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
44 
/*
 * Schedule `ev', optionally with a timeout expressed in whole seconds.
 *
 * A timeout of zero means "no timeout": event_add() is then given a NULL
 * timeval pointer.  The return value is whatever event_add() returns.
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	/* No timeout requested: add the event without a deadline. */
	if (timeout == 0)
		return (event_add(ev, NULL));

	timerclear(&expiry);
	expiry.tv_sec = timeout;

	return (event_add(ev, &expiry));
}
58 
59 /*
60  * This callback is executed when the size of the input buffer changes.
61  * We use it to apply back pressure on the reading side.
62  */
63 
64 void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old,size_t now,void * arg)65 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
66     void *arg) {
67 	struct bufferevent *bufev = arg;
68 	/*
69 	 * If we are below the watermark then reschedule reading if it's
70 	 * still enabled.
71 	 */
72 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
73 		evbuffer_setcb(buf, NULL, NULL);
74 
75 		if (bufev->enabled & EV_READ)
76 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
77 	}
78 }
79 
80 static void
bufferevent_readcb(int fd,short event,void * arg)81 bufferevent_readcb(int fd, short event, void *arg)
82 {
83 	struct bufferevent *bufev = arg;
84 	int res = 0;
85 	short what = EVBUFFER_READ;
86 	size_t len;
87 	int howmuch = -1;
88 
89 	if (event == EV_TIMEOUT) {
90 		what |= EVBUFFER_TIMEOUT;
91 		goto error;
92 	}
93 
94 	/*
95 	 * If we have a high watermark configured then we don't want to
96 	 * read more data than would make us reach the watermark.
97 	 */
98 	if (bufev->wm_read.high != 0) {
99 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
100 		/* we might have lowered the watermark, stop reading */
101 		if (howmuch <= 0) {
102 			struct evbuffer *buf = bufev->input;
103 			event_del(&bufev->ev_read);
104 			evbuffer_setcb(buf,
105 			    bufferevent_read_pressure_cb, bufev);
106 			return;
107 		}
108 	}
109 
110 	res = evbuffer_read(bufev->input, fd, howmuch);
111 	if (res == -1) {
112 		if (errno == EAGAIN || errno == EINTR)
113 			goto reschedule;
114 		/* error case */
115 		what |= EVBUFFER_ERROR;
116 	} else if (res == 0) {
117 		/* eof case */
118 		what |= EVBUFFER_EOF;
119 	}
120 
121 	if (res <= 0)
122 		goto error;
123 
124 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
125 
126 	/* See if this callbacks meets the water marks */
127 	len = EVBUFFER_LENGTH(bufev->input);
128 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
129 		return;
130 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
131 		struct evbuffer *buf = bufev->input;
132 		event_del(&bufev->ev_read);
133 
134 		/* Now schedule a callback for us when the buffer changes */
135 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
136 	}
137 
138 	/* Invoke the user callback - must always be called last */
139 	if (bufev->readcb != NULL)
140 		(*bufev->readcb)(bufev, bufev->cbarg);
141 	return;
142 
143  reschedule:
144 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145 	return;
146 
147  error:
148 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
149 }
150 
151 static void
bufferevent_writecb(int fd,short event,void * arg)152 bufferevent_writecb(int fd, short event, void *arg)
153 {
154 	struct bufferevent *bufev = arg;
155 	int res = 0;
156 	short what = EVBUFFER_WRITE;
157 
158 	if (event == EV_TIMEOUT) {
159 		what |= EVBUFFER_TIMEOUT;
160 		goto error;
161 	}
162 
163 	if (EVBUFFER_LENGTH(bufev->output)) {
164 	    res = evbuffer_write(bufev->output, fd);
165 	    if (res == -1) {
166 		    if (errno == EAGAIN ||
167 			errno == EINTR ||
168 			errno == EINPROGRESS)
169 			    goto reschedule;
170 		    /* error case */
171 		    what |= EVBUFFER_ERROR;
172 	    } else if (res == 0) {
173 		    /* eof case */
174 		    what |= EVBUFFER_EOF;
175 	    }
176 	    if (res <= 0)
177 		    goto error;
178 	}
179 
180 	if (EVBUFFER_LENGTH(bufev->output) != 0)
181 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
182 
183 	/*
184 	 * Invoke the user callback if our buffer is drained or below the
185 	 * low watermark.
186 	 */
187 	if (bufev->writecb != NULL &&
188 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
189 		(*bufev->writecb)(bufev, bufev->cbarg);
190 
191 	return;
192 
193  reschedule:
194 	if (EVBUFFER_LENGTH(bufev->output) != 0)
195 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
196 	return;
197 
198  error:
199 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
200 }
201 
202 /*
203  * Create a new buffered event object.
204  *
205  * The read callback is invoked whenever we read new data.
206  * The write callback is invoked whenever the output buffer is drained.
207  * The error callback is invoked on a write/read error or on EOF.
208  *
209  * Both read and write callbacks maybe NULL.  The error callback is not
210  * allowed to be NULL and have to be provided always.
211  */
212 
213 struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)214 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
215     everrorcb errorcb, void *cbarg)
216 {
217 	struct bufferevent *bufev;
218 
219 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
220 		return (NULL);
221 
222 	if ((bufev->input = evbuffer_new()) == NULL) {
223 		free(bufev);
224 		return (NULL);
225 	}
226 
227 	if ((bufev->output = evbuffer_new()) == NULL) {
228 		evbuffer_free(bufev->input);
229 		free(bufev);
230 		return (NULL);
231 	}
232 
233 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
234 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
235 
236 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
237 
238 	/*
239 	 * Set to EV_WRITE so that using bufferevent_write is going to
240 	 * trigger a callback.  Reading needs to be explicitly enabled
241 	 * because otherwise no data will be available.
242 	 */
243 	bufev->enabled = EV_WRITE;
244 
245 	return (bufev);
246 }
247 
248 void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)249 bufferevent_setcb(struct bufferevent *bufev,
250     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
251 {
252 	bufev->readcb = readcb;
253 	bufev->writecb = writecb;
254 	bufev->errorcb = errorcb;
255 
256 	bufev->cbarg = cbarg;
257 }
258 
259 void
bufferevent_setfd(struct bufferevent * bufev,int fd)260 bufferevent_setfd(struct bufferevent *bufev, int fd)
261 {
262 	event_del(&bufev->ev_read);
263 	event_del(&bufev->ev_write);
264 
265 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
266 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
267 	if (bufev->ev_base != NULL) {
268 		event_base_set(bufev->ev_base, &bufev->ev_read);
269 		event_base_set(bufev->ev_base, &bufev->ev_write);
270 	}
271 
272 	/* might have to manually trigger event registration */
273 }
274 
275 int
bufferevent_priority_set(struct bufferevent * bufev,int priority)276 bufferevent_priority_set(struct bufferevent *bufev, int priority)
277 {
278 	if (event_priority_set(&bufev->ev_read, priority) == -1)
279 		return (-1);
280 	if (event_priority_set(&bufev->ev_write, priority) == -1)
281 		return (-1);
282 
283 	return (0);
284 }
285 
286 /* Closing the file descriptor is the responsibility of the caller */
287 
288 void
bufferevent_free(struct bufferevent * bufev)289 bufferevent_free(struct bufferevent *bufev)
290 {
291 	event_del(&bufev->ev_read);
292 	event_del(&bufev->ev_write);
293 
294 	evbuffer_free(bufev->input);
295 	evbuffer_free(bufev->output);
296 
297 	free(bufev);
298 }
299 
300 /*
301  * Returns 0 on success;
302  *        -1 on failure.
303  */
304 
305 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)306 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
307 {
308 	int res;
309 
310 	res = evbuffer_add(bufev->output, data, size);
311 
312 	if (res == -1)
313 		return (res);
314 
315 	/* If everything is okay, we need to schedule a write */
316 	if (size > 0 && (bufev->enabled & EV_WRITE))
317 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
318 
319 	return (res);
320 }
321 
322 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)323 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
324 {
325 	int res;
326 
327 	res = bufferevent_write(bufev, buf->buffer, buf->off);
328 	if (res != -1)
329 		evbuffer_drain(buf, buf->off);
330 
331 	return (res);
332 }
333 
334 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)335 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
336 {
337 	struct evbuffer *buf = bufev->input;
338 
339 	if (buf->off < size)
340 		size = buf->off;
341 
342 	/* Copy the available data to the user buffer */
343 	memcpy(data, buf->buffer, size);
344 
345 	if (size)
346 		evbuffer_drain(buf, size);
347 
348 	return (size);
349 }
350 
351 int
bufferevent_enable(struct bufferevent * bufev,short event)352 bufferevent_enable(struct bufferevent *bufev, short event)
353 {
354 	if (event & EV_READ) {
355 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
356 			return (-1);
357 	}
358 	if (event & EV_WRITE) {
359 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
360 			return (-1);
361 	}
362 
363 	bufev->enabled |= event;
364 	return (0);
365 }
366 
367 int
bufferevent_disable(struct bufferevent * bufev,short event)368 bufferevent_disable(struct bufferevent *bufev, short event)
369 {
370 	if (event & EV_READ) {
371 		if (event_del(&bufev->ev_read) == -1)
372 			return (-1);
373 	}
374 	if (event & EV_WRITE) {
375 		if (event_del(&bufev->ev_write) == -1)
376 			return (-1);
377 	}
378 
379 	bufev->enabled &= ~event;
380 	return (0);
381 }
382 
383 /*
384  * Sets the read and write timeout for a buffered event.
385  */
386 
387 void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)388 bufferevent_settimeout(struct bufferevent *bufev,
389     int timeout_read, int timeout_write) {
390 	bufev->timeout_read = timeout_read;
391 	bufev->timeout_write = timeout_write;
392 
393 	if (event_pending(&bufev->ev_read, EV_READ, NULL))
394 		bufferevent_add(&bufev->ev_read, timeout_read);
395 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
396 		bufferevent_add(&bufev->ev_write, timeout_write);
397 }
398 
399 /*
400  * Sets the water marks
401  */
402 
403 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)404 bufferevent_setwatermark(struct bufferevent *bufev, short events,
405     size_t lowmark, size_t highmark)
406 {
407 	if (events & EV_READ) {
408 		bufev->wm_read.low = lowmark;
409 		bufev->wm_read.high = highmark;
410 	}
411 
412 	if (events & EV_WRITE) {
413 		bufev->wm_write.low = lowmark;
414 		bufev->wm_write.high = highmark;
415 	}
416 
417 	/* If the watermarks changed then see if we should call read again */
418 	bufferevent_read_pressure_cb(bufev->input,
419 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
420 }
421 
422 int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)423 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
424 {
425 	int res;
426 
427 	bufev->ev_base = base;
428 
429 	res = event_base_set(base, &bufev->ev_read);
430 	if (res == -1)
431 		return (res);
432 
433 	res = event_base_set(base, &bufev->ev_write);
434 	return (res);
435 }
436