1 /*
2 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu>
3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27 #include "util-internal.h"
28
29 /* The old tests here need assertions to work. */
30 #undef NDEBUG
31
32 /**
33 * - clang supports __has_feature
34 * - gcc supports __SANITIZE_ADDRESS__
35 *
36 * Let's set __SANITIZE_ADDRESS__ if __has_feature(address_sanitizer)
37 */
38 #ifndef __has_feature
39 #define __has_feature(x) 0
40 #endif
41 #if !defined(__SANITIZE_ADDRESS__) && __has_feature(address_sanitizer)
42 #define __SANITIZE_ADDRESS__
43 #endif
44
45 #ifdef _WIN32
46 #include <winsock2.h>
47 #include <windows.h>
48 #endif
49
50 #include "event2/event-config.h"
51
52 #include <sys/types.h>
53 #include <sys/stat.h>
54 #ifdef EVENT__HAVE_SYS_TIME_H
55 #include <sys/time.h>
56 #endif
57 #include <sys/queue.h>
58 #ifndef _WIN32
59 #include <sys/socket.h>
60 #include <sys/wait.h>
61 #include <signal.h>
62 #include <unistd.h>
63 #include <netdb.h>
64 #include <netinet/in.h>
65 #endif
66 #include <fcntl.h>
67 #include <signal.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <string.h>
71 #include <errno.h>
72 #include <assert.h>
73
74 #ifdef EVENT__HAVE_ARPA_INET_H
75 #include <arpa/inet.h>
76 #endif
77
78 #include "event2/event-config.h"
79 #include "event2/event.h"
80 #include "event2/event_struct.h"
81 #include "event2/event_compat.h"
82 #include "event2/tag.h"
83 #include "event2/buffer.h"
84 #include "event2/bufferevent.h"
85 #include "event2/bufferevent_compat.h"
86 #include "event2/bufferevent_struct.h"
87 #include "event2/listener.h"
88 #include "event2/util.h"
89
90 #include "bufferevent-internal.h"
91 #include "evthread-internal.h"
92 #include "util-internal.h"
93 #ifdef _WIN32
94 #include "iocp-internal.h"
95 #endif
96
97 #include "regress.h"
98 #include "regress_testutils.h"
99
100 /*
101 * simple bufferevent test
102 */
103
104 static void
readcb(struct bufferevent * bev,void * arg)105 readcb(struct bufferevent *bev, void *arg)
106 {
107 if (evbuffer_get_length(bev->input) == 8333) {
108 struct evbuffer *evbuf = evbuffer_new();
109 assert(evbuf != NULL);
110
111 /* gratuitous test of bufferevent_read_buffer */
112 bufferevent_read_buffer(bev, evbuf);
113
114 bufferevent_disable(bev, EV_READ);
115
116 if (evbuffer_get_length(evbuf) == 8333) {
117 test_ok++;
118 }
119
120 evbuffer_free(evbuf);
121 }
122 }
123
124 static void
writecb(struct bufferevent * bev,void * arg)125 writecb(struct bufferevent *bev, void *arg)
126 {
127 if (evbuffer_get_length(bev->output) == 0) {
128 test_ok++;
129 }
130 }
131
132 static void
errorcb(struct bufferevent * bev,short what,void * arg)133 errorcb(struct bufferevent *bev, short what, void *arg)
134 {
135 test_ok = -2;
136 }
137
138 static void
test_bufferevent_impl(int use_pair,int flush)139 test_bufferevent_impl(int use_pair, int flush)
140 {
141 struct bufferevent *bev1 = NULL, *bev2 = NULL;
142 char buffer[8333];
143 int i;
144 int expected = 2;
145
146 if (use_pair) {
147 struct bufferevent *pair[2];
148 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
149 bev1 = pair[0];
150 bev2 = pair[1];
151 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1);
152 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
153 tt_fd_op(bufferevent_getfd(bev1), ==, EVUTIL_INVALID_SOCKET);
154 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
155 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2);
156 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1);
157 } else {
158 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
159 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
160 tt_fd_op(bufferevent_getfd(bev1), ==, pair[0]);
161 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
162 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
163 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
164 }
165
166 {
167 /* Test getcb. */
168 bufferevent_data_cb r, w;
169 bufferevent_event_cb e;
170 void *a;
171 bufferevent_getcb(bev1, &r, &w, &e, &a);
172 tt_ptr_op(r, ==, readcb);
173 tt_ptr_op(w, ==, writecb);
174 tt_ptr_op(e, ==, errorcb);
175 tt_ptr_op(a, ==, use_pair ? bev1 : NULL);
176 }
177
178 bufferevent_disable(bev1, EV_READ);
179 bufferevent_enable(bev2, EV_READ);
180
181 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE);
182 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ);
183
184 for (i = 0; i < (int)sizeof(buffer); i++)
185 buffer[i] = i;
186
187 bufferevent_write(bev1, buffer, sizeof(buffer));
188 if (flush >= 0) {
189 tt_int_op(bufferevent_flush(bev1, EV_WRITE, flush), >=, 0);
190 }
191
192 event_dispatch();
193
194 bufferevent_free(bev2);
195 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
196 bufferevent_free(bev1);
197
198 /** Only pair call errorcb for BEV_FINISHED */
199 if (use_pair && flush == BEV_FINISHED) {
200 expected = -1;
201 }
202 if (test_ok != expected)
203 test_ok = 0;
204 end:
205 ;
206 }
207
test_bufferevent(void)208 static void test_bufferevent(void) { test_bufferevent_impl(0, -1); }
test_bufferevent_pair(void)209 static void test_bufferevent_pair(void) { test_bufferevent_impl(1, -1); }
210
test_bufferevent_flush_normal(void)211 static void test_bufferevent_flush_normal(void) { test_bufferevent_impl(0, BEV_NORMAL); }
test_bufferevent_flush_flush(void)212 static void test_bufferevent_flush_flush(void) { test_bufferevent_impl(0, BEV_FLUSH); }
test_bufferevent_flush_finished(void)213 static void test_bufferevent_flush_finished(void) { test_bufferevent_impl(0, BEV_FINISHED); }
214
test_bufferevent_pair_flush_normal(void)215 static void test_bufferevent_pair_flush_normal(void) { test_bufferevent_impl(1, BEV_NORMAL); }
test_bufferevent_pair_flush_flush(void)216 static void test_bufferevent_pair_flush_flush(void) { test_bufferevent_impl(1, BEV_FLUSH); }
test_bufferevent_pair_flush_finished(void)217 static void test_bufferevent_pair_flush_finished(void) { test_bufferevent_impl(1, BEV_FINISHED); }
218
219 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__)
220 /**
221 * Trace lock/unlock/alloc/free for locks.
222 * (More heavier then evthread_debug*)
223 */
224 typedef struct
225 {
226 void *lock;
227 enum {
228 ALLOC, FREE,
229 } status;
230 size_t locked /** allow recursive locking */;
231 } lock_wrapper;
232 struct lock_unlock_base
233 {
234 /* Original callbacks */
235 struct evthread_lock_callbacks cbs;
236 /* Map of locks */
237 lock_wrapper *locks;
238 size_t nr_locks;
239 } lu_base = {
240 .locks = NULL,
241 };
242
lu_find(void * lock_)243 static lock_wrapper *lu_find(void *lock_)
244 {
245 size_t i;
246 for (i = 0; i < lu_base.nr_locks; ++i) {
247 lock_wrapper *lock = &lu_base.locks[i];
248 if (lock->lock == lock_)
249 return lock;
250 }
251 return NULL;
252 }
253
trace_lock_alloc(unsigned locktype)254 static void *trace_lock_alloc(unsigned locktype)
255 {
256 void *lock;
257 ++lu_base.nr_locks;
258 lu_base.locks = realloc(lu_base.locks,
259 sizeof(lock_wrapper) * lu_base.nr_locks);
260 lock = lu_base.cbs.alloc(locktype);
261 lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 };
262 return lock;
263 }
trace_lock_free(void * lock_,unsigned locktype)264 static void trace_lock_free(void *lock_, unsigned locktype)
265 {
266 lock_wrapper *lock = lu_find(lock_);
267 if (!lock || lock->status == FREE || lock->locked) {
268 TT_FAIL(("lock: free error"));
269 } else {
270 lock->status = FREE;
271 lu_base.cbs.free(lock_, locktype);
272 }
273 }
trace_lock_lock(unsigned mode,void * lock_)274 static int trace_lock_lock(unsigned mode, void *lock_)
275 {
276 lock_wrapper *lock = lu_find(lock_);
277 if (!lock || lock->status == FREE) {
278 TT_FAIL(("lock: lock error"));
279 return -1;
280 } else {
281 ++lock->locked;
282 return lu_base.cbs.lock(mode, lock_);
283 }
284 }
trace_lock_unlock(unsigned mode,void * lock_)285 static int trace_lock_unlock(unsigned mode, void *lock_)
286 {
287 lock_wrapper *lock = lu_find(lock_);
288 if (!lock || lock->status == FREE || !lock->locked) {
289 TT_FAIL(("lock: unlock error"));
290 return -1;
291 } else {
292 --lock->locked;
293 return lu_base.cbs.unlock(mode, lock_);
294 }
295 }
lock_unlock_free_thread_cbs(void)296 static void lock_unlock_free_thread_cbs(void)
297 {
298 event_base_free(NULL);
299
300 if (libevent_tests_running_in_debug_mode)
301 libevent_global_shutdown();
302
303 /** drop immutable flag */
304 evthread_set_lock_callbacks(NULL);
305 /** avoid calling of event_global_setup_locks_() for new cbs */
306 libevent_global_shutdown();
307 /** drop immutable flag for non-debug ops (since called after shutdown) */
308 evthread_set_lock_callbacks(NULL);
309 }
310
use_lock_unlock_profiler(void)311 static int use_lock_unlock_profiler(void)
312 {
313 struct evthread_lock_callbacks cbs = {
314 EVTHREAD_LOCK_API_VERSION,
315 EVTHREAD_LOCKTYPE_RECURSIVE,
316 trace_lock_alloc,
317 trace_lock_free,
318 trace_lock_lock,
319 trace_lock_unlock,
320 };
321 memcpy(&lu_base.cbs, evthread_get_lock_callbacks(),
322 sizeof(lu_base.cbs));
323 {
324 lock_unlock_free_thread_cbs();
325
326 evthread_set_lock_callbacks(&cbs);
327 /** re-create debug locks correctly */
328 evthread_enable_lock_debugging();
329
330 event_init();
331 }
332 return 0;
333 }
free_lock_unlock_profiler(struct basic_test_data * data)334 static void free_lock_unlock_profiler(struct basic_test_data *data)
335 {
336 /** fix "held_by" for kqueue */
337 evthread_set_lock_callbacks(NULL);
338
339 lock_unlock_free_thread_cbs();
340 free(lu_base.locks);
341 data->base = NULL;
342 }
343
test_bufferevent_pair_release_lock(void * arg)344 static void test_bufferevent_pair_release_lock(void *arg)
345 {
346 struct basic_test_data *data = arg;
347 use_lock_unlock_profiler();
348 {
349 struct bufferevent *pair[2];
350 if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) {
351 bufferevent_free(pair[0]);
352 bufferevent_free(pair[1]);
353 } else
354 tt_abort_perror("bufferevent_pair_new");
355 }
356 free_lock_unlock_profiler(data);
357 end:
358 ;
359 }
360 #endif
361
362 /*
363 * test watermarks and bufferevent
364 */
365
366 static void
wm_readcb(struct bufferevent * bev,void * arg)367 wm_readcb(struct bufferevent *bev, void *arg)
368 {
369 struct evbuffer *evbuf = evbuffer_new();
370 int len = (int)evbuffer_get_length(bev->input);
371 static int nread;
372
373 assert(len >= 10 && len <= 20);
374
375 assert(evbuf != NULL);
376
377 /* gratuitous test of bufferevent_read_buffer */
378 bufferevent_read_buffer(bev, evbuf);
379
380 nread += len;
381 if (nread == 65000) {
382 bufferevent_disable(bev, EV_READ);
383 test_ok++;
384 }
385
386 evbuffer_free(evbuf);
387 }
388
389 static void
wm_writecb(struct bufferevent * bev,void * arg)390 wm_writecb(struct bufferevent *bev, void *arg)
391 {
392 assert(evbuffer_get_length(bev->output) <= 100);
393 if (evbuffer_get_length(bev->output) == 0) {
394 evbuffer_drain(bev->output, evbuffer_get_length(bev->output));
395 test_ok++;
396 }
397 }
398
399 static void
wm_errorcb(struct bufferevent * bev,short what,void * arg)400 wm_errorcb(struct bufferevent *bev, short what, void *arg)
401 {
402 test_ok = -2;
403 }
404
405 static void
test_bufferevent_watermarks_impl(int use_pair)406 test_bufferevent_watermarks_impl(int use_pair)
407 {
408 struct bufferevent *bev1 = NULL, *bev2 = NULL;
409 char buffer[65000];
410 size_t low, high;
411 int i;
412 test_ok = 0;
413
414 if (use_pair) {
415 struct bufferevent *pair[2];
416 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
417 bev1 = pair[0];
418 bev2 = pair[1];
419 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
420 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
421 } else {
422 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
423 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
424 }
425 tt_assert(bev1);
426 tt_assert(bev2);
427 bufferevent_disable(bev1, EV_READ);
428 bufferevent_enable(bev2, EV_READ);
429
430 /* By default, low watermarks are set to 0 */
431 bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
432 tt_int_op(low, ==, 0);
433 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
434 tt_int_op(low, ==, 0);
435
436 for (i = 0; i < (int)sizeof(buffer); i++)
437 buffer[i] = (char)i;
438
439 /* limit the reading on the receiving bufferevent */
440 bufferevent_setwatermark(bev2, EV_READ, 10, 20);
441
442 bufferevent_getwatermark(bev2, EV_READ, &low, &high);
443 tt_int_op(low, ==, 10);
444 tt_int_op(high, ==, 20);
445
446 /* Tell the sending bufferevent not to notify us till it's down to
447 100 bytes. */
448 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
449
450 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
451 tt_int_op(low, ==, 100);
452 tt_int_op(high, ==, 2000);
453
454 {
455 int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high);
456 tt_int_op(r, !=, 0);
457 }
458
459 bufferevent_write(bev1, buffer, sizeof(buffer));
460
461 event_dispatch();
462
463 tt_int_op(test_ok, ==, 2);
464
465 /* The write callback drained all the data from outbuf, so we
466 * should have removed the write event... */
467 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
468
469 end:
470 if (bev1)
471 bufferevent_free(bev1);
472 if (bev2)
473 bufferevent_free(bev2);
474 }
475
476 static void
test_bufferevent_watermarks(void)477 test_bufferevent_watermarks(void)
478 {
479 test_bufferevent_watermarks_impl(0);
480 }
481
482 static void
test_bufferevent_pair_watermarks(void)483 test_bufferevent_pair_watermarks(void)
484 {
485 test_bufferevent_watermarks_impl(1);
486 }
487
488 /*
489 * Test bufferevent filters
490 */
491
492 /* strip an 'x' from each byte */
493
494 static enum bufferevent_filter_result
bufferevent_input_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)495 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
496 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
497 {
498 const unsigned char *buffer;
499 unsigned i;
500
501 buffer = evbuffer_pullup(src, evbuffer_get_length(src));
502 for (i = 0; i < evbuffer_get_length(src); i += 2) {
503 if (buffer[i] == '-')
504 continue;
505
506 assert(buffer[i] == 'x');
507 evbuffer_add(dst, buffer + i + 1, 1);
508 }
509
510 evbuffer_drain(src, i);
511 return (BEV_OK);
512 }
513
514 /* add an 'x' before each byte */
515
516 static enum bufferevent_filter_result
bufferevent_output_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)517 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
518 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
519 {
520 const unsigned char *buffer;
521 unsigned i;
522 struct bufferevent **bevp = ctx;
523
524 ++test_ok;
525
526 if (test_ok == 1) {
527 buffer = evbuffer_pullup(src, evbuffer_get_length(src));
528 for (i = 0; i < evbuffer_get_length(src); ++i) {
529 evbuffer_add(dst, "x", 1);
530 evbuffer_add(dst, buffer + i, 1);
531 }
532 evbuffer_drain(src, evbuffer_get_length(src));
533 } else {
534 return BEV_ERROR;
535 }
536
537 if (bevp && test_ok == 1) {
538 int prev = ++test_ok;
539 bufferevent_write(*bevp, "-", 1);
540 /* check that during this bufferevent_write()
541 * bufferevent_output_filter() will not be called again */
542 assert(test_ok == prev);
543 --test_ok;
544 }
545
546 return (BEV_OK);
547 }
548
549 static void
test_bufferevent_filters_impl(int use_pair,int disable)550 test_bufferevent_filters_impl(int use_pair, int disable)
551 {
552 struct bufferevent *bev1 = NULL, *bev2 = NULL;
553 struct bufferevent *bev1_base = NULL, *bev2_base = NULL;
554 char buffer[8333];
555 int i;
556
557 test_ok = 0;
558
559 if (use_pair) {
560 struct bufferevent *pair[2];
561 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
562 bev1 = pair[0];
563 bev2 = pair[1];
564 } else {
565 bev1 = bufferevent_socket_new(NULL, pair[0], 0);
566 bev2 = bufferevent_socket_new(NULL, pair[1], 0);
567 }
568 bev1_base = bev1;
569 bev2_base = bev2;
570
571 for (i = 0; i < (int)sizeof(buffer); i++)
572 buffer[i] = i;
573
574 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
575 BEV_OPT_CLOSE_ON_FREE, NULL,
576 disable ? &bev1 : NULL);
577
578 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
579 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
580 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
581 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
582
583 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base);
584 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base);
585 tt_fd_op(bufferevent_getfd(bev1), ==, bufferevent_getfd(bev1_base));
586 tt_fd_op(bufferevent_getfd(bev2), ==, bufferevent_getfd(bev2_base));
587
588 bufferevent_disable(bev1, EV_READ);
589 bufferevent_enable(bev2, EV_READ);
590 /* insert some filters */
591 bufferevent_write(bev1, buffer, sizeof(buffer));
592
593 event_dispatch();
594
595 if (test_ok != 3 + !!disable)
596 test_ok = 0;
597
598 end:
599 if (bev1)
600 bufferevent_free(bev1);
601 if (bev2)
602 bufferevent_free(bev2);
603
604 }
605
test_bufferevent_filters(void)606 static void test_bufferevent_filters(void)
607 { test_bufferevent_filters_impl(0, 0); }
test_bufferevent_pair_filters(void)608 static void test_bufferevent_pair_filters(void)
609 { test_bufferevent_filters_impl(1, 0); }
test_bufferevent_filters_disable(void)610 static void test_bufferevent_filters_disable(void)
611 { test_bufferevent_filters_impl(0, 1); }
test_bufferevent_pair_filters_disable(void)612 static void test_bufferevent_pair_filters_disable(void)
613 { test_bufferevent_filters_impl(1, 1); }
614
615
616 static void
sender_writecb(struct bufferevent * bev,void * ctx)617 sender_writecb(struct bufferevent *bev, void *ctx)
618 {
619 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
620 bufferevent_disable(bev,EV_READ|EV_WRITE);
621 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev)));
622 bufferevent_free(bev);
623 }
624 }
625
626 static void
sender_errorcb(struct bufferevent * bev,short what,void * ctx)627 sender_errorcb(struct bufferevent *bev, short what, void *ctx)
628 {
629 TT_FAIL(("Got sender error %d",(int)what));
630 }
631
632 static int bufferevent_connect_test_flags = 0;
633 static int bufferevent_trigger_test_flags = 0;
634 static int n_strings_read = 0;
635 static int n_reads_invoked = 0;
636 static int n_events_invoked = 0;
637
638 #define TEST_STR "Now is the time for all good events to signal for " \
639 "the good of their protocol"
640 static void
listen_cb(struct evconnlistener * listener,evutil_socket_t fd,struct sockaddr * sa,int socklen,void * arg)641 listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
642 struct sockaddr *sa, int socklen, void *arg)
643 {
644 struct event_base *base = arg;
645 struct bufferevent *bev;
646 const char s[] = TEST_STR;
647 TT_BLATHER(("Got a request on socket %d", (int)fd ));
648 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
649 tt_assert(bev);
650 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
651 bufferevent_write(bev, s, sizeof(s));
652 end:
653 ;
654 }
655
656 static evutil_socket_t
fake_listener_create(struct sockaddr_in * localhost)657 fake_listener_create(struct sockaddr_in *localhost)
658 {
659 struct sockaddr *sa = (struct sockaddr *)localhost;
660 evutil_socket_t fd = -1;
661 ev_socklen_t slen = sizeof(*localhost);
662
663 memset(localhost, 0, sizeof(*localhost));
664 localhost->sin_port = 0; /* have the kernel pick a port */
665 localhost->sin_addr.s_addr = htonl(0x7f000001L);
666 localhost->sin_family = AF_INET;
667
668 /* bind, but don't listen or accept. should trigger
669 "Connection refused" reliably on most platforms. */
670 fd = socket(localhost->sin_family, SOCK_STREAM, 0);
671 tt_assert(fd >= 0);
672 tt_assert(bind(fd, sa, slen) == 0);
673 tt_assert(getsockname(fd, sa, &slen) == 0);
674
675 return fd;
676
677 end:
678 return -1;
679 }
680
681 static void
reader_eventcb(struct bufferevent * bev,short what,void * ctx)682 reader_eventcb(struct bufferevent *bev, short what, void *ctx)
683 {
684 struct event_base *base = ctx;
685 if (what & BEV_EVENT_ERROR) {
686 perror("foobar");
687 TT_FAIL(("got connector error %d", (int)what));
688 return;
689 }
690 if (what & BEV_EVENT_CONNECTED) {
691 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev)));
692 bufferevent_enable(bev, EV_READ);
693 }
694 if (what & BEV_EVENT_EOF) {
695 char buf[512];
696 size_t n;
697 n = bufferevent_read(bev, buf, sizeof(buf)-1);
698 tt_int_op(n, >=, 0);
699 buf[n] = '\0';
700 tt_str_op(buf, ==, TEST_STR);
701 if (++n_strings_read == 2)
702 event_base_loopexit(base, NULL);
703 TT_BLATHER(("EOF on %d: %d strings read.",
704 (int)bufferevent_getfd(bev), n_strings_read));
705 }
706 end:
707 ;
708 }
709
710 static void
reader_eventcb_simple(struct bufferevent * bev,short what,void * ctx)711 reader_eventcb_simple(struct bufferevent *bev, short what, void *ctx)
712 {
713 TT_BLATHER(("Read eventcb simple invoked on %d.",
714 (int)bufferevent_getfd(bev)));
715 n_events_invoked++;
716 }
717
718 static void
reader_readcb(struct bufferevent * bev,void * ctx)719 reader_readcb(struct bufferevent *bev, void *ctx)
720 {
721 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
722 n_reads_invoked++;
723 }
724
725 static void
test_bufferevent_connect(void * arg)726 test_bufferevent_connect(void *arg)
727 {
728 struct basic_test_data *data = arg;
729 struct evconnlistener *lev=NULL;
730 struct bufferevent *bev1=NULL, *bev2=NULL;
731 struct sockaddr_in localhost;
732 struct sockaddr_storage ss;
733 struct sockaddr *sa;
734 ev_socklen_t slen;
735
736 int be_flags=BEV_OPT_CLOSE_ON_FREE;
737
738 if (strstr((char*)data->setup_data, "defer")) {
739 be_flags |= BEV_OPT_DEFER_CALLBACKS;
740 }
741 if (strstr((char*)data->setup_data, "unlocked")) {
742 be_flags |= BEV_OPT_UNLOCK_CALLBACKS;
743 }
744 if (strstr((char*)data->setup_data, "lock")) {
745 be_flags |= BEV_OPT_THREADSAFE;
746 }
747 bufferevent_connect_test_flags = be_flags;
748 #ifdef _WIN32
749 if (!strcmp((char*)data->setup_data, "unset_connectex")) {
750 struct win32_extension_fns *ext =
751 (struct win32_extension_fns *)
752 event_get_win32_extension_fns_();
753 ext->ConnectEx = NULL;
754 }
755 #endif
756
757 memset(&localhost, 0, sizeof(localhost));
758
759 localhost.sin_port = 0; /* pick-a-port */
760 localhost.sin_addr.s_addr = htonl(0x7f000001L);
761 localhost.sin_family = AF_INET;
762 sa = (struct sockaddr *)&localhost;
763 lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
764 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
765 16, sa, sizeof(localhost));
766 tt_assert(lev);
767
768 sa = (struct sockaddr *)&ss;
769 slen = sizeof(ss);
770 if (regress_get_listener_addr(lev, sa, &slen) < 0) {
771 tt_abort_perror("getsockname");
772 }
773
774 tt_assert(!evconnlistener_enable(lev));
775 bev1 = bufferevent_socket_new(data->base, -1, be_flags);
776 bev2 = bufferevent_socket_new(data->base, -1, be_flags);
777 tt_assert(bev1);
778 tt_assert(bev2);
779 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base);
780 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base);
781
782 bufferevent_enable(bev1, EV_READ);
783 bufferevent_enable(bev2, EV_READ);
784
785 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
786 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
787
788 event_base_dispatch(data->base);
789
790 tt_int_op(n_strings_read, ==, 2);
791 tt_int_op(n_reads_invoked, >=, 2);
792 end:
793 if (lev)
794 evconnlistener_free(lev);
795
796 if (bev1)
797 bufferevent_free(bev1);
798
799 if (bev2)
800 bufferevent_free(bev2);
801 }
802
803 static void
close_socket_cb(evutil_socket_t fd,short what,void * arg)804 close_socket_cb(evutil_socket_t fd, short what, void *arg)
805 {
806 evutil_socket_t *fdp = arg;
807 if (*fdp >= 0) {
808 evutil_closesocket(*fdp);
809 *fdp = -1;
810 }
811 }
812
813 static void
test_bufferevent_connect_fail_eventcb(void * arg)814 test_bufferevent_connect_fail_eventcb(void *arg)
815 {
816 struct basic_test_data *data = arg;
817 int flags = BEV_OPT_CLOSE_ON_FREE | (long)data->setup_data;
818 struct event close_listener_event;
819 struct bufferevent *bev = NULL;
820 struct evconnlistener *lev = NULL;
821 struct sockaddr_in localhost;
822 struct timeval close_timeout = { 0, 300000 };
823 ev_socklen_t slen = sizeof(localhost);
824 evutil_socket_t fake_listener = -1;
825 int r;
826
827 fake_listener = fake_listener_create(&localhost);
828
829 tt_int_op(n_events_invoked, ==, 0);
830
831 bev = bufferevent_socket_new(data->base, -1, flags);
832 tt_assert(bev);
833 bufferevent_setcb(bev, reader_readcb, reader_readcb,
834 reader_eventcb_simple, data->base);
835 bufferevent_enable(bev, EV_READ|EV_WRITE);
836 tt_int_op(n_events_invoked, ==, 0);
837 tt_int_op(n_reads_invoked, ==, 0);
838
839 /** @see also test_bufferevent_connect_fail() */
840 r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen);
841 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
842 * detects the error immediately, which is not really wrong of it. */
843 tt_want(r == 0 || r == -1);
844
845 tt_int_op(n_events_invoked, ==, 0);
846 tt_int_op(n_reads_invoked, ==, 0);
847
848 /* Close the listener socket after a delay. This should trigger
849 "connection refused" on some other platforms, including OSX. */
850 evtimer_assign(&close_listener_event, data->base, close_socket_cb,
851 &fake_listener);
852 event_add(&close_listener_event, &close_timeout);
853
854 event_base_dispatch(data->base);
855 tt_int_op(n_events_invoked, ==, 1);
856 tt_int_op(n_reads_invoked, ==, 0);
857
858 end:
859 if (lev)
860 evconnlistener_free(lev);
861 if (bev)
862 bufferevent_free(bev);
863 if (fake_listener >= 0)
864 evutil_closesocket(fake_listener);
865 }
866
867 static void
want_fail_eventcb(struct bufferevent * bev,short what,void * ctx)868 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx)
869 {
870 struct event_base *base = ctx;
871 const char *err;
872 evutil_socket_t s;
873
874 if (what & BEV_EVENT_ERROR) {
875 s = bufferevent_getfd(bev);
876 err = evutil_socket_error_to_string(evutil_socket_geterror(s));
877 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s",
878 EV_SOCK_ARG(s), err));
879 test_ok = 1;
880 } else {
881 TT_FAIL(("didn't fail? what %hd", what));
882 }
883
884 event_base_loopexit(base, NULL);
885 }
886
887 static void
test_bufferevent_connect_fail(void * arg)888 test_bufferevent_connect_fail(void *arg)
889 {
890 struct basic_test_data *data = (struct basic_test_data *)arg;
891 struct bufferevent *bev=NULL;
892 struct event close_listener_event;
893 int close_listener_event_added = 0;
894 struct timeval close_timeout = { 0, 300000 };
895 struct sockaddr_in localhost;
896 ev_socklen_t slen = sizeof(localhost);
897 evutil_socket_t fake_listener = -1;
898 int r;
899
900 test_ok = 0;
901
902 fake_listener = fake_listener_create(&localhost);
903 bev = bufferevent_socket_new(data->base, -1,
904 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
905 tt_assert(bev);
906 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base);
907
908 r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen);
909 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
910 * detects the error immediately, which is not really wrong of it. */
911 tt_want(r == 0 || r == -1);
912
913 /* Close the listener socket after a delay. This should trigger
914 "connection refused" on some other platforms, including OSX. */
915 evtimer_assign(&close_listener_event, data->base, close_socket_cb,
916 &fake_listener);
917 event_add(&close_listener_event, &close_timeout);
918 close_listener_event_added = 1;
919
920 event_base_dispatch(data->base);
921
922 tt_int_op(test_ok, ==, 1);
923
924 end:
925 if (fake_listener >= 0)
926 evutil_closesocket(fake_listener);
927
928 if (bev)
929 bufferevent_free(bev);
930
931 if (close_listener_event_added)
932 event_del(&close_listener_event);
933 }
934
935 struct timeout_cb_result {
936 struct timeval read_timeout_at;
937 struct timeval write_timeout_at;
938 struct timeval last_wrote_at;
939 struct timeval last_read_at;
940 int n_read_timeouts;
941 int n_write_timeouts;
942 int total_calls;
943 };
944
945 static void
bev_timeout_read_cb(struct bufferevent * bev,void * arg)946 bev_timeout_read_cb(struct bufferevent *bev, void *arg)
947 {
948 struct timeout_cb_result *res = arg;
949 evutil_gettimeofday(&res->last_read_at, NULL);
950 }
951 static void
bev_timeout_write_cb(struct bufferevent * bev,void * arg)952 bev_timeout_write_cb(struct bufferevent *bev, void *arg)
953 {
954 struct timeout_cb_result *res = arg;
955 evutil_gettimeofday(&res->last_wrote_at, NULL);
956 }
957 static void
bev_timeout_event_cb(struct bufferevent * bev,short what,void * arg)958 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
959 {
960 struct timeout_cb_result *res = arg;
961 ++res->total_calls;
962
963 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
964 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
965 evutil_gettimeofday(&res->read_timeout_at, NULL);
966 ++res->n_read_timeouts;
967 }
968 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
969 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
970 evutil_gettimeofday(&res->write_timeout_at, NULL);
971 ++res->n_write_timeouts;
972 }
973 }
974
975 static void
test_bufferevent_timeouts(void * arg)976 test_bufferevent_timeouts(void *arg)
977 {
978 /* "arg" is a string containing "pair" and/or "filter". */
979 struct bufferevent *bev1 = NULL, *bev2 = NULL;
980 struct basic_test_data *data = arg;
981 int use_pair = 0, use_filter = 0;
982 struct timeval tv_w, tv_r, started_at;
983 struct timeout_cb_result res1, res2;
984
985 memset(&res1, 0, sizeof(res1));
986 memset(&res2, 0, sizeof(res2));
987
988 if (strstr((char*)data->setup_data, "pair"))
989 use_pair = 1;
990 if (strstr((char*)data->setup_data, "filter"))
991 use_filter = 1;
992
993 if (use_pair) {
994 struct bufferevent *p[2];
995 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
996 bev1 = p[0];
997 bev2 = p[1];
998 } else {
999 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
1000 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
1001 }
1002 tt_assert(bev1);
1003 tt_assert(bev2);
1004
1005 if (use_filter) {
1006 struct bufferevent *bevf1, *bevf2;
1007 bevf1 = bufferevent_filter_new(bev1, NULL, NULL,
1008 BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
1009 bevf2 = bufferevent_filter_new(bev2, NULL, NULL,
1010 BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
1011 tt_assert(bevf1);
1012 tt_assert(bevf2);
1013 bev1 = bevf1;
1014 bev2 = bevf2;
1015 }
1016
1017 /* Do this nice and early. */
1018 bufferevent_disable(bev2, EV_READ);
1019
1020 /* bev1 will try to write and read. Both will time out. */
1021 evutil_gettimeofday(&started_at, NULL);
1022 tv_w.tv_sec = tv_r.tv_sec = 0;
1023 tv_w.tv_usec = 100*1000;
1024 tv_r.tv_usec = 150*1000;
1025 bufferevent_setcb(bev1, bev_timeout_read_cb, bev_timeout_write_cb,
1026 bev_timeout_event_cb, &res1);
1027 bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
1028 bufferevent_write(bev1, "ABCDEFG", 7);
1029 bufferevent_enable(bev1, EV_READ|EV_WRITE);
1030
1031 /* bev2 has nothing to say, and isn't listening. */
1032 bufferevent_setcb(bev2, bev_timeout_read_cb, bev_timeout_write_cb,
1033 bev_timeout_event_cb, &res2);
1034 tv_w.tv_sec = tv_r.tv_sec = 0;
1035 tv_w.tv_usec = 200*1000;
1036 tv_r.tv_usec = 100*1000;
1037 bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
1038 bufferevent_enable(bev2, EV_WRITE);
1039
1040 tv_r.tv_sec = 0;
1041 tv_r.tv_usec = 350000;
1042
1043 event_base_loopexit(data->base, &tv_r);
1044 event_base_dispatch(data->base);
1045
1046 /* XXXX Test that actually reading or writing a little resets the
1047 * timeouts. */
1048
1049 tt_want(res1.total_calls == 2);
1050 tt_want(res1.n_read_timeouts == 1);
1051 tt_want(res1.n_write_timeouts == 1);
1052 tt_want(res2.total_calls == !(use_pair && !use_filter));
1053 tt_want(res2.n_write_timeouts == !(use_pair && !use_filter));
1054 tt_want(!res2.n_read_timeouts);
1055
1056 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150);
1057 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100);
1058
1059 #define tt_assert_timeval_empty(tv) do { \
1060 tt_int_op((tv).tv_sec, ==, 0); \
1061 tt_int_op((tv).tv_usec, ==, 0); \
1062 } while(0)
1063 tt_assert_timeval_empty(res1.last_read_at);
1064 tt_assert_timeval_empty(res2.last_read_at);
1065 tt_assert_timeval_empty(res2.last_wrote_at);
1066 tt_assert_timeval_empty(res2.last_wrote_at);
1067 #undef tt_assert_timeval_empty
1068
1069 end:
1070 if (bev1)
1071 bufferevent_free(bev1);
1072 if (bev2)
1073 bufferevent_free(bev2);
1074 }
1075
1076 static void
trigger_failure_cb(evutil_socket_t fd,short what,void * ctx)1077 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
1078 {
1079 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
1080 }
1081
1082 static void
trigger_eventcb(struct bufferevent * bev,short what,void * ctx)1083 trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
1084 {
1085 struct event_base *base = ctx;
1086 if (what == ~0) {
1087 TT_BLATHER(("Event successfully triggered."));
1088 event_base_loopexit(base, NULL);
1089 return;
1090 }
1091 reader_eventcb(bev, what, ctx);
1092 }
1093
1094 static void
trigger_readcb_triggered(struct bufferevent * bev,void * ctx)1095 trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
1096 {
1097 TT_BLATHER(("Read successfully triggered."));
1098 n_reads_invoked++;
1099 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
1100 }
1101
1102 static void
trigger_readcb(struct bufferevent * bev,void * ctx)1103 trigger_readcb(struct bufferevent *bev, void *ctx)
1104 {
1105 struct timeval timeout = { 30, 0 };
1106 struct event_base *base = ctx;
1107 size_t low, high, len;
1108 int expected_reads;
1109
1110 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
1111 expected_reads = ++n_reads_invoked;
1112
1113 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
1114
1115 bufferevent_getwatermark(bev, EV_READ, &low, &high);
1116 len = evbuffer_get_length(bufferevent_get_input(bev));
1117
1118 bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
1119 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
1120 /* no callback expected */
1121 tt_int_op(n_reads_invoked, ==, expected_reads);
1122
1123 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
1124 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
1125 /* will be deferred */
1126 } else {
1127 expected_reads++;
1128 }
1129
1130 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
1131
1132 bufferevent_trigger(bev, EV_READ,
1133 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
1134 tt_int_op(n_reads_invoked, ==, expected_reads);
1135
1136 bufferevent_setwatermark(bev, EV_READ, low, high);
1137 end:
1138 ;
1139 }
1140
1141 static void
test_bufferevent_trigger(void * arg)1142 test_bufferevent_trigger(void *arg)
1143 {
1144 struct basic_test_data *data = arg;
1145 struct evconnlistener *lev=NULL;
1146 struct bufferevent *bev=NULL;
1147 struct sockaddr_in localhost;
1148 struct sockaddr_storage ss;
1149 struct sockaddr *sa;
1150 ev_socklen_t slen;
1151
1152 int be_flags=BEV_OPT_CLOSE_ON_FREE;
1153 int trig_flags=0;
1154
1155 if (strstr((char*)data->setup_data, "defer")) {
1156 be_flags |= BEV_OPT_DEFER_CALLBACKS;
1157 }
1158 bufferevent_connect_test_flags = be_flags;
1159
1160 if (strstr((char*)data->setup_data, "postpone")) {
1161 trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
1162 }
1163 bufferevent_trigger_test_flags = trig_flags;
1164
1165 memset(&localhost, 0, sizeof(localhost));
1166
1167 localhost.sin_port = 0; /* pick-a-port */
1168 localhost.sin_addr.s_addr = htonl(0x7f000001L);
1169 localhost.sin_family = AF_INET;
1170 sa = (struct sockaddr *)&localhost;
1171 lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
1172 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
1173 16, sa, sizeof(localhost));
1174 tt_assert(lev);
1175
1176 sa = (struct sockaddr *)&ss;
1177 slen = sizeof(ss);
1178 if (regress_get_listener_addr(lev, sa, &slen) < 0) {
1179 tt_abort_perror("getsockname");
1180 }
1181
1182 tt_assert(!evconnlistener_enable(lev));
1183 bev = bufferevent_socket_new(data->base, -1, be_flags);
1184 tt_assert(bev);
1185 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
1186
1187 bufferevent_enable(bev, EV_READ);
1188
1189 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
1190
1191 event_base_dispatch(data->base);
1192
1193 tt_int_op(n_reads_invoked, ==, 2);
1194 end:
1195 if (lev)
1196 evconnlistener_free(lev);
1197
1198 if (bev)
1199 bufferevent_free(bev);
1200 }
1201
1202 static void
test_bufferevent_socket_filter_inactive(void * arg)1203 test_bufferevent_socket_filter_inactive(void *arg)
1204 {
1205 struct basic_test_data *data = arg;
1206 struct bufferevent *bev = NULL, *bevf = NULL;
1207
1208 bev = bufferevent_socket_new(data->base, -1, 0);
1209 tt_assert(bev);
1210 bevf = bufferevent_filter_new(bev, NULL, NULL, 0, NULL, NULL);
1211 tt_assert(bevf);
1212
1213 end:
1214 if (bevf)
1215 bufferevent_free(bevf);
1216 if (bev)
1217 bufferevent_free(bev);
1218 }
1219
1220 static void
pair_flush_eventcb(struct bufferevent * bev,short what,void * ctx)1221 pair_flush_eventcb(struct bufferevent *bev, short what, void *ctx)
1222 {
1223 int *callback_what = ctx;
1224 *callback_what = what;
1225 }
1226
1227 static void
test_bufferevent_pair_flush(void * arg)1228 test_bufferevent_pair_flush(void *arg)
1229 {
1230 struct basic_test_data *data = arg;
1231 struct bufferevent *pair[2];
1232 struct bufferevent *bev1 = NULL;
1233 struct bufferevent *bev2 = NULL;
1234 int callback_what = 0;
1235
1236 tt_assert(0 == bufferevent_pair_new(data->base, 0, pair));
1237 bev1 = pair[0];
1238 bev2 = pair[1];
1239 tt_assert(0 == bufferevent_enable(bev1, EV_WRITE));
1240 tt_assert(0 == bufferevent_enable(bev2, EV_READ));
1241
1242 bufferevent_setcb(bev2, NULL, NULL, pair_flush_eventcb, &callback_what);
1243
1244 bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED);
1245
1246 event_base_loop(data->base, EVLOOP_ONCE);
1247
1248 tt_assert(callback_what == (BEV_EVENT_READING | BEV_EVENT_EOF));
1249
1250 end:
1251 if (bev1)
1252 bufferevent_free(bev1);
1253 if (bev2)
1254 bufferevent_free(bev2);
1255 }
1256
1257 struct bufferevent_filter_data_stuck {
1258 size_t header_size;
1259 size_t total_read;
1260 };
1261
1262 static void
bufferevent_filter_data_stuck_readcb(struct bufferevent * bev,void * arg)1263 bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg)
1264 {
1265 struct bufferevent_filter_data_stuck *filter_data = arg;
1266 struct evbuffer *input = bufferevent_get_input(bev);
1267 size_t read_size = evbuffer_get_length(input);
1268 evbuffer_drain(input, read_size);
1269 filter_data->total_read += read_size;
1270 }
1271
1272 /**
1273 * This filter prepends header once before forwarding data.
1274 */
1275 static enum bufferevent_filter_result
bufferevent_filter_data_stuck_inputcb(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t dst_limit,enum bufferevent_flush_mode mode,void * ctx)1276 bufferevent_filter_data_stuck_inputcb(
1277 struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
1278 enum bufferevent_flush_mode mode, void *ctx)
1279 {
1280 struct bufferevent_filter_data_stuck *filter_data = ctx;
1281 static int header_inserted = 0;
1282 size_t payload_size;
1283 size_t header_size = 0;
1284
1285 if (!header_inserted) {
1286 char *header = calloc(filter_data->header_size, 1);
1287 evbuffer_add(dst, header, filter_data->header_size);
1288 free(header);
1289 header_size = filter_data->header_size;
1290 header_inserted = 1;
1291 }
1292
1293 payload_size = evbuffer_get_length(src);
1294 if (payload_size > dst_limit - header_size) {
1295 payload_size = dst_limit - header_size;
1296 }
1297
1298 tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size));
1299
1300 end:
1301 return BEV_OK;
1302 }
1303
1304 static void
test_bufferevent_filter_data_stuck(void * arg)1305 test_bufferevent_filter_data_stuck(void *arg)
1306 {
1307 const size_t read_high_wm = 4096;
1308 struct bufferevent_filter_data_stuck filter_data;
1309 struct basic_test_data *data = arg;
1310 struct bufferevent *pair[2];
1311 struct bufferevent *filter = NULL;
1312
1313 int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS;
1314
1315 char payload[4096];
1316 int payload_size = sizeof(payload);
1317
1318 memset(&filter_data, 0, sizeof(filter_data));
1319 filter_data.header_size = 20;
1320
1321 tt_assert(bufferevent_pair_new(data->base, options, pair) == 0);
1322
1323 bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm);
1324 bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm);
1325
1326 tt_assert(
1327 filter =
1328 bufferevent_filter_new(pair[1],
1329 bufferevent_filter_data_stuck_inputcb,
1330 NULL,
1331 options,
1332 NULL,
1333 &filter_data));
1334
1335 bufferevent_setcb(filter,
1336 bufferevent_filter_data_stuck_readcb,
1337 NULL,
1338 NULL,
1339 &filter_data);
1340
1341 tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0);
1342
1343 bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm);
1344
1345 tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0);
1346
1347 event_base_dispatch(data->base);
1348
1349 tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size);
1350 end:
1351 if (pair[0])
1352 bufferevent_free(pair[0]);
1353 if (filter)
1354 bufferevent_free(filter);
1355 }
1356
1357 struct testcase_t bufferevent_testcases[] = {
1358
1359 LEGACY(bufferevent, TT_ISOLATED),
1360 LEGACY(bufferevent_pair, TT_ISOLATED),
1361 LEGACY(bufferevent_flush_normal, TT_ISOLATED),
1362 LEGACY(bufferevent_flush_flush, TT_ISOLATED),
1363 LEGACY(bufferevent_flush_finished, TT_ISOLATED),
1364 LEGACY(bufferevent_pair_flush_normal, TT_ISOLATED),
1365 LEGACY(bufferevent_pair_flush_flush, TT_ISOLATED),
1366 LEGACY(bufferevent_pair_flush_finished, TT_ISOLATED),
1367 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__)
1368 { "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock,
1369 TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY|TT_NO_LOGS,
1370 &basic_setup, NULL },
1371 #endif
1372 LEGACY(bufferevent_watermarks, TT_ISOLATED),
1373 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
1374 LEGACY(bufferevent_filters, TT_ISOLATED),
1375 LEGACY(bufferevent_pair_filters, TT_ISOLATED),
1376 LEGACY(bufferevent_filters_disable, TT_ISOLATED),
1377 LEGACY(bufferevent_pair_filters_disable, TT_ISOLATED),
1378 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
1379 &basic_setup, (void*)"" },
1380 { "bufferevent_connect_defer", test_bufferevent_connect,
1381 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
1382 { "bufferevent_connect_lock", test_bufferevent_connect,
1383 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
1384 { "bufferevent_connect_lock_defer", test_bufferevent_connect,
1385 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
1386 (void*)"defer lock" },
1387 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect,
1388 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
1389 (void*)"lock defer unlocked" },
1390 { "bufferevent_connect_fail", test_bufferevent_connect_fail,
1391 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
1392 { "bufferevent_timeout", test_bufferevent_timeouts,
1393 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"" },
1394 { "bufferevent_timeout_pair", test_bufferevent_timeouts,
1395 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
1396 { "bufferevent_timeout_filter", test_bufferevent_timeouts,
1397 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
1398 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
1399 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
1400 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
1401 &basic_setup, (void*)"" },
1402 { "bufferevent_trigger_defer", test_bufferevent_trigger,
1403 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
1404 { "bufferevent_trigger_postpone", test_bufferevent_trigger,
1405 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
1406 (void*)"postpone" },
1407 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
1408 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
1409 (void*)"defer postpone" },
1410 #ifdef EVENT__HAVE_LIBZ
1411 LEGACY(bufferevent_zlib, TT_ISOLATED),
1412 #else
1413 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL },
1414 #endif
1415
1416 { "bufferevent_connect_fail_eventcb_defer",
1417 test_bufferevent_connect_fail_eventcb,
1418 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS },
1419 { "bufferevent_connect_fail_eventcb",
1420 test_bufferevent_connect_fail_eventcb,
1421 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
1422
1423 { "bufferevent_socket_filter_inactive",
1424 test_bufferevent_socket_filter_inactive,
1425 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
1426 { "bufferevent_pair_flush",
1427 test_bufferevent_pair_flush,
1428 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
1429 { "bufferevent_filter_data_stuck",
1430 test_bufferevent_filter_data_stuck,
1431 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
1432
1433 END_OF_TESTCASES,
1434 };
1435
1436 #define TT_IOCP (TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP)
1437 #define TT_IOCP_LEGACY (TT_ISOLATED|TT_ENABLE_IOCP)
1438 struct testcase_t bufferevent_iocp_testcases[] = {
1439 LEGACY(bufferevent, TT_IOCP_LEGACY),
1440 LEGACY(bufferevent_flush_normal, TT_ISOLATED),
1441 LEGACY(bufferevent_flush_flush, TT_ISOLATED),
1442 LEGACY(bufferevent_flush_finished, TT_ISOLATED),
1443 LEGACY(bufferevent_watermarks, TT_IOCP_LEGACY),
1444 LEGACY(bufferevent_filters, TT_IOCP_LEGACY),
1445 LEGACY(bufferevent_filters_disable, TT_IOCP_LEGACY),
1446
1447 { "bufferevent_connect", test_bufferevent_connect,
1448 TT_IOCP, &basic_setup, (void*)"" },
1449 { "bufferevent_connect_defer", test_bufferevent_connect,
1450 TT_IOCP, &basic_setup, (void*)"defer" },
1451 { "bufferevent_connect_lock", test_bufferevent_connect,
1452 TT_IOCP, &basic_setup, (void*)"lock" },
1453 { "bufferevent_connect_lock_defer", test_bufferevent_connect,
1454 TT_IOCP, &basic_setup, (void*)"defer lock" },
1455 { "bufferevent_connect_fail", test_bufferevent_connect_fail,
1456 TT_IOCP, &basic_setup, NULL },
1457 { "bufferevent_connect_nonblocking", test_bufferevent_connect,
1458 TT_IOCP, &basic_setup, (void*)"unset_connectex" },
1459
1460 { "bufferevent_connect_fail_eventcb_defer",
1461 test_bufferevent_connect_fail_eventcb,
1462 TT_IOCP, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS },
1463 { "bufferevent_connect_fail_eventcb",
1464 test_bufferevent_connect_fail_eventcb, TT_IOCP, &basic_setup, NULL },
1465
1466 END_OF_TESTCASES,
1467 };
1468