1 /* $OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include <sys/types.h> 31 #include <sys/time.h> 32 33 #include <errno.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <stdarg.h> 38 39 #include "event.h" 40 41 /* prototypes */ 42 43 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 44 45 static int 46 bufferevent_add(struct event *ev, int timeout) 47 { 48 struct timeval tv, *ptv = NULL; 49 50 if (timeout) { 51 timerclear(&tv); 52 tv.tv_sec = timeout; 53 ptv = &tv; 54 } 55 56 return (event_add(ev, ptv)); 57 } 58 59 /* 60 * This callback is executed when the size of the input buffer changes. 61 * We use it to apply back pressure on the reading side. 62 */ 63 64 void 65 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 66 void *arg) { 67 struct bufferevent *bufev = arg; 68 /* 69 * If we are below the watermark then reschedule reading if it's 70 * still enabled. 71 */ 72 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 73 evbuffer_setcb(buf, NULL, NULL); 74 75 if (bufev->enabled & EV_READ) 76 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 77 } 78 } 79 80 static void 81 bufferevent_readcb(int fd, short event, void *arg) 82 { 83 struct bufferevent *bufev = arg; 84 int res = 0; 85 short what = EVBUFFER_READ; 86 size_t len; 87 int howmuch = -1; 88 89 if (event == EV_TIMEOUT) { 90 what |= EVBUFFER_TIMEOUT; 91 goto error; 92 } 93 94 /* 95 * If we have a high watermark configured then we don't want to 96 * read more data than would make us reach the watermark. 97 */ 98 if (bufev->wm_read.high != 0) { 99 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); 100 /* we might have lowered the watermark, stop reading */ 101 if (howmuch <= 0) { 102 struct evbuffer *buf = bufev->input; 103 event_del(&bufev->ev_read); 104 evbuffer_setcb(buf, 105 bufferevent_read_pressure_cb, bufev); 106 return; 107 } 108 } 109 110 res = evbuffer_read(bufev->input, fd, howmuch); 111 if (res == -1) { 112 if (errno == EAGAIN || errno == EINTR) 113 goto reschedule; 114 /* error case */ 115 what |= EVBUFFER_ERROR; 116 } else if (res == 0) { 117 /* eof case */ 118 what |= EVBUFFER_EOF; 119 } 120 121 if (res <= 0) 122 goto error; 123 124 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 125 126 /* See if this callbacks meets the water marks */ 127 len = EVBUFFER_LENGTH(bufev->input); 128 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 129 return; 130 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { 131 struct evbuffer *buf = bufev->input; 132 event_del(&bufev->ev_read); 133 134 /* Now schedule a callback for us when the buffer changes */ 135 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 136 } 137 138 /* Invoke the user callback - must always be called last */ 139 if (bufev->readcb != NULL) 140 (*bufev->readcb)(bufev, bufev->cbarg); 141 return; 142 143 reschedule: 144 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 145 return; 146 147 error: 148 (*bufev->errorcb)(bufev, what, bufev->cbarg); 149 } 150 151 static void 152 bufferevent_writecb(int fd, short event, void *arg) 153 { 154 struct bufferevent *bufev = arg; 155 int res = 0; 156 short what = EVBUFFER_WRITE; 157 158 if (event == EV_TIMEOUT) { 159 what |= EVBUFFER_TIMEOUT; 160 goto error; 161 } 162 163 if (EVBUFFER_LENGTH(bufev->output)) { 164 res = evbuffer_write(bufev->output, fd); 165 if (res == -1) { 166 if (errno == EAGAIN || 167 errno == EINTR || 168 errno == EINPROGRESS) 169 goto reschedule; 170 /* error case */ 171 what |= EVBUFFER_ERROR; 172 } else if (res == 0) { 173 /* eof case */ 174 what |= EVBUFFER_EOF; 175 } 176 if (res <= 0) 177 goto error; 178 } 179 180 if (EVBUFFER_LENGTH(bufev->output) != 0) 181 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 182 183 /* 184 * Invoke the user callback if our buffer is drained or below the 185 * low watermark. 186 */ 187 if (bufev->writecb != NULL && 188 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 189 (*bufev->writecb)(bufev, bufev->cbarg); 190 191 return; 192 193 reschedule: 194 if (EVBUFFER_LENGTH(bufev->output) != 0) 195 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 196 return; 197 198 error: 199 (*bufev->errorcb)(bufev, what, bufev->cbarg); 200 } 201 202 /* 203 * Create a new buffered event object. 204 * 205 * The read callback is invoked whenever we read new data. 206 * The write callback is invoked whenever the output buffer is drained. 207 * The error callback is invoked on a write/read error or on EOF. 208 * 209 * Both read and write callbacks maybe NULL. The error callback is not 210 * allowed to be NULL and have to be provided always. 211 */ 212 213 struct bufferevent * 214 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 215 everrorcb errorcb, void *cbarg) 216 { 217 struct bufferevent *bufev; 218 219 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 220 return (NULL); 221 222 if ((bufev->input = evbuffer_new()) == NULL) { 223 free(bufev); 224 return (NULL); 225 } 226 227 if ((bufev->output = evbuffer_new()) == NULL) { 228 evbuffer_free(bufev->input); 229 free(bufev); 230 return (NULL); 231 } 232 233 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 234 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 235 236 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); 237 238 /* 239 * Set to EV_WRITE so that using bufferevent_write is going to 240 * trigger a callback. Reading needs to be explicitly enabled 241 * because otherwise no data will be available. 242 */ 243 bufev->enabled = EV_WRITE; 244 245 return (bufev); 246 } 247 248 void 249 bufferevent_setcb(struct bufferevent *bufev, 250 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) 251 { 252 bufev->readcb = readcb; 253 bufev->writecb = writecb; 254 bufev->errorcb = errorcb; 255 256 bufev->cbarg = cbarg; 257 } 258 259 void 260 bufferevent_setfd(struct bufferevent *bufev, int fd) 261 { 262 event_del(&bufev->ev_read); 263 event_del(&bufev->ev_write); 264 265 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 266 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 267 if (bufev->ev_base != NULL) { 268 event_base_set(bufev->ev_base, &bufev->ev_read); 269 event_base_set(bufev->ev_base, &bufev->ev_write); 270 } 271 272 /* might have to manually trigger event registration */ 273 } 274 275 int 276 bufferevent_priority_set(struct bufferevent *bufev, int priority) 277 { 278 if (event_priority_set(&bufev->ev_read, priority) == -1) 279 return (-1); 280 if (event_priority_set(&bufev->ev_write, priority) == -1) 281 return (-1); 282 283 return (0); 284 } 285 286 /* Closing the file descriptor is the responsibility of the caller */ 287 288 void 289 bufferevent_free(struct bufferevent *bufev) 290 { 291 event_del(&bufev->ev_read); 292 event_del(&bufev->ev_write); 293 294 evbuffer_free(bufev->input); 295 evbuffer_free(bufev->output); 296 297 free(bufev); 298 } 299 300 /* 301 * Returns 0 on success; 302 * -1 on failure. 303 */ 304 305 int 306 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 307 { 308 int res; 309 310 res = evbuffer_add(bufev->output, data, size); 311 312 if (res == -1) 313 return (res); 314 315 /* If everything is okay, we need to schedule a write */ 316 if (size > 0 && (bufev->enabled & EV_WRITE)) 317 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 318 319 return (res); 320 } 321 322 int 323 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 324 { 325 int res; 326 327 res = bufferevent_write(bufev, buf->buffer, buf->off); 328 if (res != -1) 329 evbuffer_drain(buf, buf->off); 330 331 return (res); 332 } 333 334 size_t 335 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 336 { 337 struct evbuffer *buf = bufev->input; 338 339 if (buf->off < size) 340 size = buf->off; 341 342 /* Copy the available data to the user buffer */ 343 memcpy(data, buf->buffer, size); 344 345 if (size) 346 evbuffer_drain(buf, size); 347 348 return (size); 349 } 350 351 int 352 bufferevent_enable(struct bufferevent *bufev, short event) 353 { 354 if (event & EV_READ) { 355 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 356 return (-1); 357 } 358 if (event & EV_WRITE) { 359 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 360 return (-1); 361 } 362 363 bufev->enabled |= event; 364 return (0); 365 } 366 367 int 368 bufferevent_disable(struct bufferevent *bufev, short event) 369 { 370 if (event & EV_READ) { 371 if (event_del(&bufev->ev_read) == -1) 372 return (-1); 373 } 374 if (event & EV_WRITE) { 375 if (event_del(&bufev->ev_write) == -1) 376 return (-1); 377 } 378 379 bufev->enabled &= ~event; 380 return (0); 381 } 382 383 /* 384 * Sets the read and write timeout for a buffered event. 385 */ 386 387 void 388 bufferevent_settimeout(struct bufferevent *bufev, 389 int timeout_read, int timeout_write) { 390 bufev->timeout_read = timeout_read; 391 bufev->timeout_write = timeout_write; 392 393 if (event_pending(&bufev->ev_read, EV_READ, NULL)) 394 bufferevent_add(&bufev->ev_read, timeout_read); 395 if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) 396 bufferevent_add(&bufev->ev_write, timeout_write); 397 } 398 399 /* 400 * Sets the water marks 401 */ 402 403 void 404 bufferevent_setwatermark(struct bufferevent *bufev, short events, 405 size_t lowmark, size_t highmark) 406 { 407 if (events & EV_READ) { 408 bufev->wm_read.low = lowmark; 409 bufev->wm_read.high = highmark; 410 } 411 412 if (events & EV_WRITE) { 413 bufev->wm_write.low = lowmark; 414 bufev->wm_write.high = highmark; 415 } 416 417 /* If the watermarks changed then see if we should call read again */ 418 bufferevent_read_pressure_cb(bufev->input, 419 0, EVBUFFER_LENGTH(bufev->input), bufev); 420 } 421 422 int 423 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 424 { 425 int res; 426 427 bufev->ev_base = base; 428 429 res = event_base_set(base, &bufev->ev_read); 430 if (res == -1) 431 return (res); 432 433 res = event_base_set(base, &bufev->ev_write); 434 return (res); 435 } 436