1 /* $NetBSD: bufferevent_pair.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $ */ 2 /* 3 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <sys/types.h> 29 30 #ifdef WIN32 31 #include <winsock2.h> 32 #endif 33 34 #include "event2/event-config.h" 35 #include <sys/cdefs.h> 36 __RCSID("$NetBSD: bufferevent_pair.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $"); 37 38 #include "event2/util.h" 39 #include "event2/buffer.h" 40 #include "event2/bufferevent.h" 41 #include "event2/bufferevent_struct.h" 42 #include "event2/event.h" 43 #include "defer-internal.h" 44 #include "bufferevent-internal.h" 45 #include "mm-internal.h" 46 #include "util-internal.h" 47 48 struct bufferevent_pair { 49 struct bufferevent_private bev; 50 struct bufferevent_pair *partner; 51 }; 52 53 54 /* Given a bufferevent that's really a bev part of a bufferevent_pair, 55 * return that bufferevent_filtered. Returns NULL otherwise.*/ 56 static inline struct bufferevent_pair * 57 upcast(struct bufferevent *bev) 58 { 59 struct bufferevent_pair *bev_p; 60 if (bev->be_ops != &bufferevent_ops_pair) 61 return NULL; 62 bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 63 EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); 64 return bev_p; 65 } 66 67 #define downcast(bev_pair) (&(bev_pair)->bev.bev) 68 69 static inline void 70 incref_and_lock(struct bufferevent *b) 71 { 72 struct bufferevent_pair *bevp; 73 _bufferevent_incref_and_lock(b); 74 bevp = upcast(b); 75 if (bevp->partner) 76 _bufferevent_incref_and_lock(downcast(bevp->partner)); 77 } 78 79 static inline void 80 decref_and_unlock(struct bufferevent *b) 81 { 82 struct bufferevent_pair *bevp = upcast(b); 83 if (bevp->partner) 84 _bufferevent_decref_and_unlock(downcast(bevp->partner)); 85 _bufferevent_decref_and_unlock(b); 86 } 87 88 /* XXX Handle close */ 89 90 static void be_pair_outbuf_cb(struct evbuffer *, 91 const struct evbuffer_cb_info *, void *); 92 93 static struct bufferevent_pair * 94 bufferevent_pair_elt_new(struct event_base *base, 95 int options) 96 { 97 struct bufferevent_pair *bufev; 98 if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 99 return NULL; 100 if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair, 101 options)) { 102 mm_free(bufev); 103 return NULL; 104 } 105 if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 106 bufferevent_free(downcast(bufev)); 107 return NULL; 108 } 109 110 _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev); 111 112 return bufev; 113 } 114 115 int 116 bufferevent_pair_new(struct event_base *base, int options, 117 struct bufferevent *pair[2]) 118 { 119 struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 120 int tmp_options; 121 122 options |= BEV_OPT_DEFER_CALLBACKS; 123 tmp_options = options & ~BEV_OPT_THREADSAFE; 124 125 bufev1 = bufferevent_pair_elt_new(base, options); 126 if (!bufev1) 127 return -1; 128 bufev2 = bufferevent_pair_elt_new(base, tmp_options); 129 if (!bufev2) { 130 bufferevent_free(downcast(bufev1)); 131 return -1; 132 } 133 134 if (options & BEV_OPT_THREADSAFE) { 135 /*XXXX check return */ 136 bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock); 137 } 138 139 bufev1->partner = bufev2; 140 bufev2->partner = bufev1; 141 142 evbuffer_freeze(downcast(bufev1)->input, 0); 143 evbuffer_freeze(downcast(bufev1)->output, 1); 144 evbuffer_freeze(downcast(bufev2)->input, 0); 145 evbuffer_freeze(downcast(bufev2)->output, 1); 146 147 pair[0] = downcast(bufev1); 148 pair[1] = downcast(bufev2); 149 150 return 0; 151 } 152 153 static void 154 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 155 int ignore_wm) 156 { 157 size_t src_size, dst_size; 158 size_t n; 159 160 evbuffer_unfreeze(src->output, 1); 161 evbuffer_unfreeze(dst->input, 0); 162 163 if (dst->wm_read.high) { 164 dst_size = evbuffer_get_length(dst->input); 165 if (dst_size < dst->wm_read.high) { 166 n = dst->wm_read.high - dst_size; 167 evbuffer_remove_buffer(src->output, dst->input, n); 168 } else { 169 if (!ignore_wm) 170 goto done; 171 n = evbuffer_get_length(src->output); 172 evbuffer_add_buffer(dst->input, src->output); 173 } 174 } else { 175 n = evbuffer_get_length(src->output); 176 evbuffer_add_buffer(dst->input, src->output); 177 } 178 179 if (n) { 180 BEV_RESET_GENERIC_READ_TIMEOUT(dst); 181 182 if (evbuffer_get_length(dst->output)) 183 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 184 else 185 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 186 } 187 188 src_size = evbuffer_get_length(src->output); 189 dst_size = evbuffer_get_length(dst->input); 190 191 if (dst_size >= dst->wm_read.low) { 192 _bufferevent_run_readcb(dst); 193 } 194 if (src_size <= src->wm_write.low) { 195 _bufferevent_run_writecb(src); 196 } 197 done: 198 evbuffer_freeze(src->output, 1); 199 evbuffer_freeze(dst->input, 0); 200 } 201 202 static inline int 203 be_pair_wants_to_talk(struct bufferevent_pair *src, 204 struct bufferevent_pair *dst) 205 { 206 return (downcast(src)->enabled & EV_WRITE) && 207 (downcast(dst)->enabled & EV_READ) && 208 !dst->bev.read_suspended && 209 evbuffer_get_length(downcast(src)->output); 210 } 211 212 static void 213 be_pair_outbuf_cb(struct evbuffer *outbuf, 214 const struct evbuffer_cb_info *info, void *arg) 215 { 216 struct bufferevent_pair *bev_pair = arg; 217 struct bufferevent_pair *partner = bev_pair->partner; 218 219 incref_and_lock(downcast(bev_pair)); 220 221 if (info->n_added > info->n_deleted && partner) { 222 /* We got more data. If the other side's reading, then 223 hand it over. */ 224 if (be_pair_wants_to_talk(bev_pair, partner)) { 225 be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 226 } 227 } 228 229 decref_and_unlock(downcast(bev_pair)); 230 } 231 232 static int 233 be_pair_enable(struct bufferevent *bufev, short events) 234 { 235 struct bufferevent_pair *bev_p = upcast(bufev); 236 struct bufferevent_pair *partner = bev_p->partner; 237 238 incref_and_lock(bufev); 239 240 if (events & EV_READ) { 241 BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 242 } 243 if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 244 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 245 246 /* We're starting to read! Does the other side have anything to write?*/ 247 if ((events & EV_READ) && partner && 248 be_pair_wants_to_talk(partner, bev_p)) { 249 be_pair_transfer(downcast(partner), bufev, 0); 250 } 251 /* We're starting to write! Does the other side want to read? */ 252 if ((events & EV_WRITE) && partner && 253 be_pair_wants_to_talk(bev_p, partner)) { 254 be_pair_transfer(bufev, downcast(partner), 0); 255 } 256 decref_and_unlock(bufev); 257 return 0; 258 } 259 260 static int 261 be_pair_disable(struct bufferevent *bev, short events) 262 { 263 if (events & EV_READ) { 264 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 265 } 266 if (events & EV_WRITE) 267 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 268 return 0; 269 } 270 271 static void 272 be_pair_destruct(struct bufferevent *bev) 273 { 274 struct bufferevent_pair *bev_p = upcast(bev); 275 276 if (bev_p->partner) { 277 bev_p->partner->partner = NULL; 278 bev_p->partner = NULL; 279 } 280 281 _bufferevent_del_generic_timeout_cbs(bev); 282 } 283 284 static int 285 be_pair_flush(struct bufferevent *bev, short iotype, 286 enum bufferevent_flush_mode mode) 287 { 288 struct bufferevent_pair *bev_p = upcast(bev); 289 struct bufferevent *partner; 290 incref_and_lock(bev); 291 if (!bev_p->partner) 292 return -1; 293 294 partner = downcast(bev_p->partner); 295 296 if (mode == BEV_NORMAL) 297 return 0; 298 299 if ((iotype & EV_READ) != 0) 300 be_pair_transfer(partner, bev, 1); 301 302 if ((iotype & EV_WRITE) != 0) 303 be_pair_transfer(bev, partner, 1); 304 305 if (mode == BEV_FINISHED) { 306 _bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF); 307 } 308 decref_and_unlock(bev); 309 return 0; 310 } 311 312 struct bufferevent * 313 bufferevent_pair_get_partner(struct bufferevent *bev) 314 { 315 struct bufferevent_pair *bev_p; 316 struct bufferevent *partner; 317 bev_p = upcast(bev); 318 if (! bev_p) 319 return NULL; 320 321 incref_and_lock(bev); 322 partner = downcast(bev_p->partner); 323 decref_and_unlock(bev); 324 return partner; 325 } 326 327 const struct bufferevent_ops bufferevent_ops_pair = { 328 "pair_elt", 329 evutil_offsetof(struct bufferevent_pair, bev.bev), 330 be_pair_enable, 331 be_pair_disable, 332 be_pair_destruct, 333 _bufferevent_generic_adj_timeouts, 334 be_pair_flush, 335 NULL, /* ctrl */ 336 }; 337