1 /* $OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $ */
2
3 /*
4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
#include <sys/types.h>
#include <sys/time.h>

#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "event.h"
40
41 /* prototypes */
42
/* Input-buffer size-change hook; defined below. */
void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old,
    size_t now, void *arg);
44
/*
 * Register `ev` through event_add().
 *
 * A non-zero `timeout` is interpreted as a number of seconds and handed
 * to event_add() as a timeval; a zero `timeout` passes a NULL timeval.
 * Returns whatever event_add() returns.
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	if (timeout == 0)
		return (event_add(ev, NULL));

	timerclear(&expiry);
	expiry.tv_sec = timeout;

	return (event_add(ev, &expiry));
}
58
59 /*
60 * This callback is executed when the size of the input buffer changes.
61 * We use it to apply back pressure on the reading side.
62 */
63
64 void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old,size_t now,void * arg)65 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
66 void *arg) {
67 struct bufferevent *bufev = arg;
68 /*
69 * If we are below the watermark then reschedule reading if it's
70 * still enabled.
71 */
72 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
73 evbuffer_setcb(buf, NULL, NULL);
74
75 if (bufev->enabled & EV_READ)
76 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
77 }
78 }
79
80 static void
bufferevent_readcb(int fd,short event,void * arg)81 bufferevent_readcb(int fd, short event, void *arg)
82 {
83 struct bufferevent *bufev = arg;
84 int res = 0;
85 short what = EVBUFFER_READ;
86 size_t len;
87 int howmuch = -1;
88
89 if (event == EV_TIMEOUT) {
90 what |= EVBUFFER_TIMEOUT;
91 goto error;
92 }
93
94 /*
95 * If we have a high watermark configured then we don't want to
96 * read more data than would make us reach the watermark.
97 */
98 if (bufev->wm_read.high != 0) {
99 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
100 /* we might have lowered the watermark, stop reading */
101 if (howmuch <= 0) {
102 struct evbuffer *buf = bufev->input;
103 event_del(&bufev->ev_read);
104 evbuffer_setcb(buf,
105 bufferevent_read_pressure_cb, bufev);
106 return;
107 }
108 }
109
110 res = evbuffer_read(bufev->input, fd, howmuch);
111 if (res == -1) {
112 if (errno == EAGAIN || errno == EINTR)
113 goto reschedule;
114 /* error case */
115 what |= EVBUFFER_ERROR;
116 } else if (res == 0) {
117 /* eof case */
118 what |= EVBUFFER_EOF;
119 }
120
121 if (res <= 0)
122 goto error;
123
124 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
125
126 /* See if this callbacks meets the water marks */
127 len = EVBUFFER_LENGTH(bufev->input);
128 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
129 return;
130 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
131 struct evbuffer *buf = bufev->input;
132 event_del(&bufev->ev_read);
133
134 /* Now schedule a callback for us when the buffer changes */
135 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
136 }
137
138 /* Invoke the user callback - must always be called last */
139 if (bufev->readcb != NULL)
140 (*bufev->readcb)(bufev, bufev->cbarg);
141 return;
142
143 reschedule:
144 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145 return;
146
147 error:
148 (*bufev->errorcb)(bufev, what, bufev->cbarg);
149 }
150
151 static void
bufferevent_writecb(int fd,short event,void * arg)152 bufferevent_writecb(int fd, short event, void *arg)
153 {
154 struct bufferevent *bufev = arg;
155 int res = 0;
156 short what = EVBUFFER_WRITE;
157
158 if (event == EV_TIMEOUT) {
159 what |= EVBUFFER_TIMEOUT;
160 goto error;
161 }
162
163 if (EVBUFFER_LENGTH(bufev->output)) {
164 res = evbuffer_write(bufev->output, fd);
165 if (res == -1) {
166 if (errno == EAGAIN ||
167 errno == EINTR ||
168 errno == EINPROGRESS)
169 goto reschedule;
170 /* error case */
171 what |= EVBUFFER_ERROR;
172 } else if (res == 0) {
173 /* eof case */
174 what |= EVBUFFER_EOF;
175 }
176 if (res <= 0)
177 goto error;
178 }
179
180 if (EVBUFFER_LENGTH(bufev->output) != 0)
181 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
182
183 /*
184 * Invoke the user callback if our buffer is drained or below the
185 * low watermark.
186 */
187 if (bufev->writecb != NULL &&
188 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
189 (*bufev->writecb)(bufev, bufev->cbarg);
190
191 return;
192
193 reschedule:
194 if (EVBUFFER_LENGTH(bufev->output) != 0)
195 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
196 return;
197
198 error:
199 (*bufev->errorcb)(bufev, what, bufev->cbarg);
200 }
201
202 /*
203 * Create a new buffered event object.
204 *
205 * The read callback is invoked whenever we read new data.
206 * The write callback is invoked whenever the output buffer is drained.
207 * The error callback is invoked on a write/read error or on EOF.
208 *
209 * Both read and write callbacks maybe NULL. The error callback is not
210 * allowed to be NULL and have to be provided always.
211 */
212
213 struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)214 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
215 everrorcb errorcb, void *cbarg)
216 {
217 struct bufferevent *bufev;
218
219 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
220 return (NULL);
221
222 if ((bufev->input = evbuffer_new()) == NULL) {
223 free(bufev);
224 return (NULL);
225 }
226
227 if ((bufev->output = evbuffer_new()) == NULL) {
228 evbuffer_free(bufev->input);
229 free(bufev);
230 return (NULL);
231 }
232
233 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
234 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
235
236 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
237
238 /*
239 * Set to EV_WRITE so that using bufferevent_write is going to
240 * trigger a callback. Reading needs to be explicitly enabled
241 * because otherwise no data will be available.
242 */
243 bufev->enabled = EV_WRITE;
244
245 return (bufev);
246 }
247
248 void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)249 bufferevent_setcb(struct bufferevent *bufev,
250 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
251 {
252 bufev->readcb = readcb;
253 bufev->writecb = writecb;
254 bufev->errorcb = errorcb;
255
256 bufev->cbarg = cbarg;
257 }
258
259 void
bufferevent_setfd(struct bufferevent * bufev,int fd)260 bufferevent_setfd(struct bufferevent *bufev, int fd)
261 {
262 event_del(&bufev->ev_read);
263 event_del(&bufev->ev_write);
264
265 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
266 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
267 if (bufev->ev_base != NULL) {
268 event_base_set(bufev->ev_base, &bufev->ev_read);
269 event_base_set(bufev->ev_base, &bufev->ev_write);
270 }
271
272 /* might have to manually trigger event registration */
273 }
274
275 int
bufferevent_priority_set(struct bufferevent * bufev,int priority)276 bufferevent_priority_set(struct bufferevent *bufev, int priority)
277 {
278 if (event_priority_set(&bufev->ev_read, priority) == -1)
279 return (-1);
280 if (event_priority_set(&bufev->ev_write, priority) == -1)
281 return (-1);
282
283 return (0);
284 }
285
286 /* Closing the file descriptor is the responsibility of the caller */
287
288 void
bufferevent_free(struct bufferevent * bufev)289 bufferevent_free(struct bufferevent *bufev)
290 {
291 event_del(&bufev->ev_read);
292 event_del(&bufev->ev_write);
293
294 evbuffer_free(bufev->input);
295 evbuffer_free(bufev->output);
296
297 free(bufev);
298 }
299
300 /*
301 * Returns 0 on success;
302 * -1 on failure.
303 */
304
305 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)306 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
307 {
308 int res;
309
310 res = evbuffer_add(bufev->output, data, size);
311
312 if (res == -1)
313 return (res);
314
315 /* If everything is okay, we need to schedule a write */
316 if (size > 0 && (bufev->enabled & EV_WRITE))
317 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
318
319 return (res);
320 }
321
322 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)323 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
324 {
325 int res;
326
327 res = bufferevent_write(bufev, buf->buffer, buf->off);
328 if (res != -1)
329 evbuffer_drain(buf, buf->off);
330
331 return (res);
332 }
333
334 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)335 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
336 {
337 struct evbuffer *buf = bufev->input;
338
339 if (buf->off < size)
340 size = buf->off;
341
342 /* Copy the available data to the user buffer */
343 memcpy(data, buf->buffer, size);
344
345 if (size)
346 evbuffer_drain(buf, size);
347
348 return (size);
349 }
350
351 int
bufferevent_enable(struct bufferevent * bufev,short event)352 bufferevent_enable(struct bufferevent *bufev, short event)
353 {
354 if (event & EV_READ) {
355 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
356 return (-1);
357 }
358 if (event & EV_WRITE) {
359 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
360 return (-1);
361 }
362
363 bufev->enabled |= event;
364 return (0);
365 }
366
367 int
bufferevent_disable(struct bufferevent * bufev,short event)368 bufferevent_disable(struct bufferevent *bufev, short event)
369 {
370 if (event & EV_READ) {
371 if (event_del(&bufev->ev_read) == -1)
372 return (-1);
373 }
374 if (event & EV_WRITE) {
375 if (event_del(&bufev->ev_write) == -1)
376 return (-1);
377 }
378
379 bufev->enabled &= ~event;
380 return (0);
381 }
382
383 /*
384 * Sets the read and write timeout for a buffered event.
385 */
386
387 void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)388 bufferevent_settimeout(struct bufferevent *bufev,
389 int timeout_read, int timeout_write) {
390 bufev->timeout_read = timeout_read;
391 bufev->timeout_write = timeout_write;
392
393 if (event_pending(&bufev->ev_read, EV_READ, NULL))
394 bufferevent_add(&bufev->ev_read, timeout_read);
395 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
396 bufferevent_add(&bufev->ev_write, timeout_write);
397 }
398
399 /*
400 * Sets the water marks
401 */
402
403 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)404 bufferevent_setwatermark(struct bufferevent *bufev, short events,
405 size_t lowmark, size_t highmark)
406 {
407 if (events & EV_READ) {
408 bufev->wm_read.low = lowmark;
409 bufev->wm_read.high = highmark;
410 }
411
412 if (events & EV_WRITE) {
413 bufev->wm_write.low = lowmark;
414 bufev->wm_write.high = highmark;
415 }
416
417 /* If the watermarks changed then see if we should call read again */
418 bufferevent_read_pressure_cb(bufev->input,
419 0, EVBUFFER_LENGTH(bufev->input), bufev);
420 }
421
422 int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)423 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
424 {
425 int res;
426
427 bufev->ev_base = base;
428
429 res = event_base_set(base, &bufev->ev_read);
430 if (res == -1)
431 return (res);
432
433 res = event_base_set(base, &bufev->ev_write);
434 return (res);
435 }
436