1 /* $OpenBSD: evbuffer.c,v 1.12 2010/04/21 20:02:40 nicm Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include <sys/types.h> 31 32 #ifdef HAVE_CONFIG_H 33 #include "config.h" 34 #endif 35 36 #ifdef HAVE_SYS_TIME_H 37 #include <sys/time.h> 38 #endif 39 40 #include <errno.h> 41 #include <stdio.h> 42 #include <stdlib.h> 43 #include <string.h> 44 #ifdef HAVE_STDARG_H 45 #include <stdarg.h> 46 #endif 47 48 #ifdef WIN32 49 #include <winsock2.h> 50 #endif 51 52 #include "evutil.h" 53 #include "event.h" 54 55 /* prototypes */ 56 57 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 58 59 static int 60 bufferevent_add(struct event *ev, int timeout) 61 { 62 struct timeval tv, *ptv = NULL; 63 64 if (timeout) { 65 evutil_timerclear(&tv); 66 tv.tv_sec = timeout; 67 ptv = &tv; 68 } 69 70 return (event_add(ev, ptv)); 71 } 72 73 /* 74 * This callback is executed when the size of the input buffer changes. 75 * We use it to apply back pressure on the reading side. 76 */ 77 78 void 79 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 80 void *arg) { 81 struct bufferevent *bufev = arg; 82 /* 83 * If we are below the watermark then reschedule reading if it's 84 * still enabled. 85 */ 86 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 87 evbuffer_setcb(buf, NULL, NULL); 88 89 if (bufev->enabled & EV_READ) 90 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 91 } 92 } 93 94 static void 95 bufferevent_readcb(int fd, short event, void *arg) 96 { 97 struct bufferevent *bufev = arg; 98 int res = 0; 99 short what = EVBUFFER_READ; 100 size_t len; 101 int howmuch = -1; 102 103 if (event == EV_TIMEOUT) { 104 what |= EVBUFFER_TIMEOUT; 105 goto error; 106 } 107 108 /* 109 * If we have a high watermark configured then we don't want to 110 * read more data than would make us reach the watermark. 111 */ 112 if (bufev->wm_read.high != 0) { 113 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); 114 /* we might have lowered the watermark, stop reading */ 115 if (howmuch <= 0) { 116 struct evbuffer *buf = bufev->input; 117 event_del(&bufev->ev_read); 118 evbuffer_setcb(buf, 119 bufferevent_read_pressure_cb, bufev); 120 return; 121 } 122 } 123 124 res = evbuffer_read(bufev->input, fd, howmuch); 125 if (res == -1) { 126 if (errno == EAGAIN || errno == EINTR) 127 goto reschedule; 128 /* error case */ 129 what |= EVBUFFER_ERROR; 130 } else if (res == 0) { 131 /* eof case */ 132 what |= EVBUFFER_EOF; 133 } 134 135 if (res <= 0) 136 goto error; 137 138 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 139 140 /* See if this callbacks meets the water marks */ 141 len = EVBUFFER_LENGTH(bufev->input); 142 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 143 return; 144 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { 145 struct evbuffer *buf = bufev->input; 146 event_del(&bufev->ev_read); 147 148 /* Now schedule a callback for us when the buffer changes */ 149 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 150 } 151 152 /* Invoke the user callback - must always be called last */ 153 if (bufev->readcb != NULL) 154 (*bufev->readcb)(bufev, bufev->cbarg); 155 return; 156 157 reschedule: 158 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 159 return; 160 161 error: 162 (*bufev->errorcb)(bufev, what, bufev->cbarg); 163 } 164 165 static void 166 bufferevent_writecb(int fd, short event, void *arg) 167 { 168 struct bufferevent *bufev = arg; 169 int res = 0; 170 short what = EVBUFFER_WRITE; 171 172 if (event == EV_TIMEOUT) { 173 what |= EVBUFFER_TIMEOUT; 174 goto error; 175 } 176 177 if (EVBUFFER_LENGTH(bufev->output)) { 178 res = evbuffer_write(bufev->output, fd); 179 if (res == -1) { 180 #ifndef WIN32 181 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 182 *set errno. thus this error checking is not portable*/ 183 if (errno == EAGAIN || 184 errno == EINTR || 185 errno == EINPROGRESS) 186 goto reschedule; 187 /* error case */ 188 what |= EVBUFFER_ERROR; 189 190 #else 191 goto reschedule; 192 #endif 193 194 } else if (res == 0) { 195 /* eof case */ 196 what |= EVBUFFER_EOF; 197 } 198 if (res <= 0) 199 goto error; 200 } 201 202 if (EVBUFFER_LENGTH(bufev->output) != 0) 203 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 204 205 /* 206 * Invoke the user callback if our buffer is drained or below the 207 * low watermark. 208 */ 209 if (bufev->writecb != NULL && 210 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 211 (*bufev->writecb)(bufev, bufev->cbarg); 212 213 return; 214 215 reschedule: 216 if (EVBUFFER_LENGTH(bufev->output) != 0) 217 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 218 return; 219 220 error: 221 (*bufev->errorcb)(bufev, what, bufev->cbarg); 222 } 223 224 /* 225 * Create a new buffered event object. 226 * 227 * The read callback is invoked whenever we read new data. 228 * The write callback is invoked whenever the output buffer is drained. 229 * The error callback is invoked on a write/read error or on EOF. 230 * 231 * Both read and write callbacks maybe NULL. The error callback is not 232 * allowed to be NULL and have to be provided always. 233 */ 234 235 struct bufferevent * 236 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 237 everrorcb errorcb, void *cbarg) 238 { 239 struct bufferevent *bufev; 240 241 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 242 return (NULL); 243 244 if ((bufev->input = evbuffer_new()) == NULL) { 245 free(bufev); 246 return (NULL); 247 } 248 249 if ((bufev->output = evbuffer_new()) == NULL) { 250 evbuffer_free(bufev->input); 251 free(bufev); 252 return (NULL); 253 } 254 255 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 256 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 257 258 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); 259 260 /* 261 * Set to EV_WRITE so that using bufferevent_write is going to 262 * trigger a callback. Reading needs to be explicitly enabled 263 * because otherwise no data will be available. 264 */ 265 bufev->enabled = EV_WRITE; 266 267 return (bufev); 268 } 269 270 void 271 bufferevent_setcb(struct bufferevent *bufev, 272 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) 273 { 274 bufev->readcb = readcb; 275 bufev->writecb = writecb; 276 bufev->errorcb = errorcb; 277 278 bufev->cbarg = cbarg; 279 } 280 281 void 282 bufferevent_setfd(struct bufferevent *bufev, int fd) 283 { 284 event_del(&bufev->ev_read); 285 event_del(&bufev->ev_write); 286 287 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 288 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 289 if (bufev->ev_base != NULL) { 290 event_base_set(bufev->ev_base, &bufev->ev_read); 291 event_base_set(bufev->ev_base, &bufev->ev_write); 292 } 293 294 /* might have to manually trigger event registration */ 295 } 296 297 int 298 bufferevent_priority_set(struct bufferevent *bufev, int priority) 299 { 300 if (event_priority_set(&bufev->ev_read, priority) == -1) 301 return (-1); 302 if (event_priority_set(&bufev->ev_write, priority) == -1) 303 return (-1); 304 305 return (0); 306 } 307 308 /* Closing the file descriptor is the responsibility of the caller */ 309 310 void 311 bufferevent_free(struct bufferevent *bufev) 312 { 313 event_del(&bufev->ev_read); 314 event_del(&bufev->ev_write); 315 316 evbuffer_free(bufev->input); 317 evbuffer_free(bufev->output); 318 319 free(bufev); 320 } 321 322 /* 323 * Returns 0 on success; 324 * -1 on failure. 325 */ 326 327 int 328 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 329 { 330 int res; 331 332 res = evbuffer_add(bufev->output, data, size); 333 334 if (res == -1) 335 return (res); 336 337 /* If everything is okay, we need to schedule a write */ 338 if (size > 0 && (bufev->enabled & EV_WRITE)) 339 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 340 341 return (res); 342 } 343 344 int 345 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 346 { 347 int res; 348 349 res = bufferevent_write(bufev, buf->buffer, buf->off); 350 if (res != -1) 351 evbuffer_drain(buf, buf->off); 352 353 return (res); 354 } 355 356 size_t 357 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 358 { 359 struct evbuffer *buf = bufev->input; 360 361 if (buf->off < size) 362 size = buf->off; 363 364 /* Copy the available data to the user buffer */ 365 memcpy(data, buf->buffer, size); 366 367 if (size) 368 evbuffer_drain(buf, size); 369 370 return (size); 371 } 372 373 int 374 bufferevent_enable(struct bufferevent *bufev, short event) 375 { 376 if (event & EV_READ) { 377 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 378 return (-1); 379 } 380 if (event & EV_WRITE) { 381 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 382 return (-1); 383 } 384 385 bufev->enabled |= event; 386 return (0); 387 } 388 389 int 390 bufferevent_disable(struct bufferevent *bufev, short event) 391 { 392 if (event & EV_READ) { 393 if (event_del(&bufev->ev_read) == -1) 394 return (-1); 395 } 396 if (event & EV_WRITE) { 397 if (event_del(&bufev->ev_write) == -1) 398 return (-1); 399 } 400 401 bufev->enabled &= ~event; 402 return (0); 403 } 404 405 /* 406 * Sets the read and write timeout for a buffered event. 407 */ 408 409 void 410 bufferevent_settimeout(struct bufferevent *bufev, 411 int timeout_read, int timeout_write) { 412 bufev->timeout_read = timeout_read; 413 bufev->timeout_write = timeout_write; 414 415 if (event_pending(&bufev->ev_read, EV_READ, NULL)) 416 bufferevent_add(&bufev->ev_read, timeout_read); 417 if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) 418 bufferevent_add(&bufev->ev_write, timeout_write); 419 } 420 421 /* 422 * Sets the water marks 423 */ 424 425 void 426 bufferevent_setwatermark(struct bufferevent *bufev, short events, 427 size_t lowmark, size_t highmark) 428 { 429 if (events & EV_READ) { 430 bufev->wm_read.low = lowmark; 431 bufev->wm_read.high = highmark; 432 } 433 434 if (events & EV_WRITE) { 435 bufev->wm_write.low = lowmark; 436 bufev->wm_write.high = highmark; 437 } 438 439 /* If the watermarks changed then see if we should call read again */ 440 bufferevent_read_pressure_cb(bufev->input, 441 0, EVBUFFER_LENGTH(bufev->input), bufev); 442 } 443 444 int 445 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 446 { 447 int res; 448 449 bufev->ev_base = base; 450 451 res = event_base_set(base, &bufev->ev_read); 452 if (res == -1) 453 return (res); 454 455 res = event_base_set(base, &bufev->ev_write); 456 return (res); 457 } 458