1 /* ====================================================================
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 * ====================================================================
19 */
20
21 #include "apr.h"
22 #include "apr_pools.h"
23 #include <apr_poll.h>
24 #include <apr_version.h>
25 #include <stdlib.h>
26
27 #include "serf.h"
28 #include "serf_private.h" /* for serf__log and serf__bucket_stream_create */
29
30 #include "test_server.h"
31
32 #define BUFSIZE 8192
33
34 /* Cleanup callback for a server. */
cleanup_server(void * baton)35 static apr_status_t cleanup_server(void *baton)
36 {
37 serv_ctx_t *servctx = baton;
38 apr_status_t status;
39
40 if (servctx->serv_sock)
41 status = apr_socket_close(servctx->serv_sock);
42 else
43 status = APR_EGENERAL;
44
45 if (servctx->client_sock) {
46 apr_socket_close(servctx->client_sock);
47 }
48
49 return status;
50 }
51
52 /* Replay support functions */
next_message(serv_ctx_t * servctx)53 static void next_message(serv_ctx_t *servctx)
54 {
55 servctx->cur_message++;
56 }
57
/* Move on to the next action in the server's action list and rewind the
   send offset, so the new action's text is sent from its first byte. */
static void next_action(serv_ctx_t *servctx)
{
    servctx->action_buf_pos = 0;
    servctx->cur_action += 1;
}
63
64 static apr_status_t
socket_write(serv_ctx_t * serv_ctx,const char * data,apr_size_t * len)65 socket_write(serv_ctx_t *serv_ctx, const char *data,
66 apr_size_t *len)
67 {
68 return apr_socket_send(serv_ctx->client_sock, data, len);
69 }
70
71 static apr_status_t
socket_read(serv_ctx_t * serv_ctx,char * data,apr_size_t * len)72 socket_read(serv_ctx_t *serv_ctx, char *data,
73 apr_size_t *len)
74 {
75 return apr_socket_recv(serv_ctx->client_sock, data, len);
76 }
77
78 static apr_status_t
create_client_socket(apr_socket_t ** skt,serv_ctx_t * servctx,const char * url)79 create_client_socket(apr_socket_t **skt,
80 serv_ctx_t *servctx,
81 const char *url)
82 {
83 apr_sockaddr_t *address;
84 apr_uri_t uri;
85 apr_status_t status;
86
87 status = apr_uri_parse(servctx->pool, url, &uri);
88 if (status != APR_SUCCESS)
89 return status;
90
91 status = apr_sockaddr_info_get(&address,
92 uri.hostname,
93 APR_UNSPEC,
94 uri.port,
95 0,
96 servctx->pool);
97 if (status != APR_SUCCESS)
98 return status;
99
100 status = apr_socket_create(skt,
101 address->family,
102 SOCK_STREAM,
103 #if APR_MAJOR_VERSION > 0
104 APR_PROTO_TCP,
105 #endif
106 servctx->pool);
107 if (status != APR_SUCCESS)
108 return status;
109
110 /* Set the socket to be non-blocking */
111 status = apr_socket_timeout_set(*skt, 0);
112 if (status != APR_SUCCESS)
113 return status;
114
115 status = apr_socket_connect(*skt, address);
116 if (status != APR_SUCCESS && !APR_STATUS_IS_EINPROGRESS(status))
117 return status;
118
119 return APR_SUCCESS;
120 }
121
detect_eof(void * baton,serf_bucket_t * aggregate_bucket)122 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
123 {
124 return APR_EAGAIN;
125 }
126
127 /* Verify received requests and take the necessary actions
128 (return a response, kill the connection ...) */
replay(serv_ctx_t * servctx,apr_int16_t rtnevents,apr_pool_t * pool)129 static apr_status_t replay(serv_ctx_t *servctx,
130 apr_int16_t rtnevents,
131 apr_pool_t *pool)
132 {
133 apr_status_t status = APR_SUCCESS;
134 test_server_action_t *action;
135
136 if (rtnevents & APR_POLLIN) {
137 if (servctx->message_list == NULL) {
138 /* we're not expecting any requests to reach this server! */
139 serf__log(TEST_VERBOSE, __FILE__,
140 "Received request where none was expected.\n");
141
142 return SERF_ERROR_ISSUE_IN_TESTSUITE;
143 }
144
145 if (servctx->cur_action >= servctx->action_count) {
146 char buf[128];
147 apr_size_t len = sizeof(buf);
148
149 status = servctx->read(servctx, buf, &len);
150 if (! APR_STATUS_IS_EAGAIN(status)) {
151 /* we're out of actions! */
152 serf__log(TEST_VERBOSE, __FILE__,
153 "Received more requests than expected.\n");
154
155 return SERF_ERROR_ISSUE_IN_TESTSUITE;
156 }
157 return status;
158 }
159
160 action = &servctx->action_list[servctx->cur_action];
161
162 serf__log(TEST_VERBOSE, __FILE__,
163 "POLLIN while replaying action %d, kind: %d.\n",
164 servctx->cur_action, action->kind);
165
166 /* Read the remaining data from the client and kill the socket. */
167 if (action->kind == SERVER_IGNORE_AND_KILL_CONNECTION) {
168 char buf[128];
169 apr_size_t len = sizeof(buf);
170
171 status = servctx->read(servctx, buf, &len);
172
173 if (status == APR_EOF) {
174 serf__log(TEST_VERBOSE, __FILE__,
175 "Killing this connection.\n");
176 apr_socket_close(servctx->client_sock);
177 servctx->client_sock = NULL;
178 next_action(servctx);
179 return APR_SUCCESS;
180 }
181
182 return status;
183 }
184 else if (action->kind == SERVER_RECV ||
185 (action->kind == SERVER_RESPOND &&
186 servctx->outstanding_responses == 0)) {
187 apr_size_t msg_len, len;
188 char buf[128];
189 test_server_message_t *message;
190
191 message = &servctx->message_list[servctx->cur_message];
192 msg_len = strlen(message->text);
193
194 do
195 {
196 len = msg_len - servctx->message_buf_pos;
197 if (len > sizeof(buf))
198 len = sizeof(buf);
199
200 status = servctx->read(servctx, buf, &len);
201 if (SERF_BUCKET_READ_ERROR(status))
202 return status;
203
204 if (status == APR_EOF) {
205 serf__log(TEST_VERBOSE, __FILE__,
206 "Server: Client hung up the connection.\n");
207 break;
208 }
209 if (servctx->options & TEST_SERVER_DUMP)
210 fwrite(buf, len, 1, stdout);
211
212 if (strncmp(buf,
213 message->text + servctx->message_buf_pos,
214 len) != 0) {
215 /* ## TODO: Better diagnostics. */
216 printf("Expected: (\n");
217 fwrite(message->text + servctx->message_buf_pos, len, 1,
218 stdout);
219 printf(")\n");
220 printf("Actual: (\n");
221 fwrite(buf, len, 1, stdout);
222 printf(")\n");
223
224 return SERF_ERROR_ISSUE_IN_TESTSUITE;
225 }
226
227 servctx->message_buf_pos += len;
228
229 if (servctx->message_buf_pos >= msg_len) {
230 next_message(servctx);
231 servctx->message_buf_pos -= msg_len;
232 if (action->kind == SERVER_RESPOND)
233 servctx->outstanding_responses++;
234 if (action->kind == SERVER_RECV)
235 next_action(servctx);
236 break;
237 }
238 } while (!status);
239 }
240 else if (action->kind == PROXY_FORWARD) {
241 apr_size_t len;
242 char buf[BUFSIZE];
243 serf_bucket_t *tmp;
244
245 /* Read all incoming data from the client to forward it to the
246 server later. */
247 do
248 {
249 len = BUFSIZE;
250
251 status = servctx->read(servctx, buf, &len);
252 if (SERF_BUCKET_READ_ERROR(status))
253 return status;
254
255 serf__log(TEST_VERBOSE, __FILE__,
256 "proxy: reading %d bytes %.*s from client with "
257 "status %d.\n",
258 len, len, buf, status);
259
260 if (status == APR_EOF) {
261 serf__log(TEST_VERBOSE, __FILE__,
262 "Proxy: client hung up the connection. Reset the "
263 "connection to the server.\n");
264 /* We have to stop forwarding, if a new connection opens
265 the CONNECT request should not be forwarded to the
266 server. */
267 next_action(servctx);
268 }
269 if (!servctx->servstream)
270 servctx->servstream = serf__bucket_stream_create(
271 servctx->allocator,
272 detect_eof,servctx);
273 if (len) {
274 tmp = serf_bucket_simple_copy_create(buf, len,
275 servctx->allocator);
276 serf_bucket_aggregate_append(servctx->servstream, tmp);
277 }
278 } while (!status);
279 }
280 }
281 if (rtnevents & APR_POLLOUT) {
282 action = &servctx->action_list[servctx->cur_action];
283
284 serf__log(TEST_VERBOSE, __FILE__,
285 "POLLOUT when replaying action %d, kind: %d.\n", servctx->cur_action,
286 action->kind);
287
288 if (action->kind == SERVER_RESPOND && servctx->outstanding_responses) {
289 apr_size_t msg_len;
290 apr_size_t len;
291
292 msg_len = strlen(action->text);
293 len = msg_len - servctx->action_buf_pos;
294
295 status = servctx->send(servctx,
296 action->text + servctx->action_buf_pos,
297 &len);
298 if (status != APR_SUCCESS)
299 return status;
300
301 if (servctx->options & TEST_SERVER_DUMP)
302 fwrite(action->text + servctx->action_buf_pos, len, 1, stdout);
303
304 servctx->action_buf_pos += len;
305
306 if (servctx->action_buf_pos >= msg_len) {
307 next_action(servctx);
308 servctx->outstanding_responses--;
309 }
310 }
311 else if (action->kind == SERVER_KILL_CONNECTION ||
312 action->kind == SERVER_IGNORE_AND_KILL_CONNECTION) {
313 serf__log(TEST_VERBOSE, __FILE__,
314 "Killing this connection.\n");
315 apr_socket_close(servctx->client_sock);
316 servctx->client_sock = NULL;
317 next_action(servctx);
318 }
319 else if (action->kind == PROXY_FORWARD) {
320 apr_size_t len;
321 char *buf;
322
323 if (!servctx->proxy_client_sock) {
324 serf__log(TEST_VERBOSE, __FILE__, "Proxy: setting up connection "
325 "to server.\n");
326 status = create_client_socket(&servctx->proxy_client_sock,
327 servctx, action->text);
328 if (!servctx->clientstream)
329 servctx->clientstream = serf__bucket_stream_create(
330 servctx->allocator,
331 detect_eof,servctx);
332 }
333
334 /* Send all data received from the server to the client. */
335 do
336 {
337 apr_size_t readlen;
338
339 readlen = BUFSIZE;
340
341 status = serf_bucket_read(servctx->clientstream, readlen,
342 &buf, &readlen);
343 if (SERF_BUCKET_READ_ERROR(status))
344 return status;
345 if (!readlen)
346 break;
347
348 len = readlen;
349
350 serf__log(TEST_VERBOSE, __FILE__,
351 "proxy: sending %d bytes to client.\n", len);
352 status = servctx->send(servctx, buf, &len);
353 if (status != APR_SUCCESS) {
354 return status;
355 }
356
357 if (len != readlen) /* abort for now, return buf to aggregate
358 if not everything could be sent. */
359 return APR_EGENERAL;
360 } while (!status);
361 }
362 }
363 else if (rtnevents & APR_POLLIN) {
364 /* ignore */
365 }
366 else {
367 printf("Unknown rtnevents: %d\n", rtnevents);
368 abort();
369 }
370
371 return status;
372 }
373
374 /* Exchange data between proxy and server */
proxy_replay(serv_ctx_t * servctx,apr_int16_t rtnevents,apr_pool_t * pool)375 static apr_status_t proxy_replay(serv_ctx_t *servctx,
376 apr_int16_t rtnevents,
377 apr_pool_t *pool)
378 {
379 apr_status_t status;
380
381 if (rtnevents & APR_POLLIN) {
382 apr_size_t len;
383 char buf[BUFSIZE];
384 serf_bucket_t *tmp;
385
386 serf__log(TEST_VERBOSE, __FILE__, "proxy_replay: POLLIN\n");
387 /* Read all incoming data from the server to forward it to the
388 client later. */
389 do
390 {
391 len = BUFSIZE;
392
393 status = apr_socket_recv(servctx->proxy_client_sock, buf, &len);
394 if (SERF_BUCKET_READ_ERROR(status))
395 return status;
396
397 serf__log(TEST_VERBOSE, __FILE__,
398 "proxy: reading %d bytes %.*s from server.\n",
399 len, len, buf);
400 tmp = serf_bucket_simple_copy_create(buf, len,
401 servctx->allocator);
402 serf_bucket_aggregate_append(servctx->clientstream, tmp);
403 } while (!status);
404 }
405
406 if (rtnevents & APR_POLLOUT) {
407 apr_size_t len;
408 char *buf;
409
410 serf__log(TEST_VERBOSE, __FILE__, "proxy_replay: POLLOUT\n");
411 /* Send all data received from the client to the server. */
412 do
413 {
414 apr_size_t readlen;
415
416 readlen = BUFSIZE;
417
418 if (!servctx->servstream)
419 servctx->servstream = serf__bucket_stream_create(
420 servctx->allocator,
421 detect_eof,servctx);
422 status = serf_bucket_read(servctx->servstream, BUFSIZE,
423 &buf, &readlen);
424 if (SERF_BUCKET_READ_ERROR(status))
425 return status;
426 if (!readlen)
427 break;
428
429 len = readlen;
430
431 serf__log(TEST_VERBOSE, __FILE__,
432 "proxy: sending %d bytes %.*s to server.\n",
433 len, len, buf);
434 status = apr_socket_send(servctx->proxy_client_sock, buf, &len);
435 if (status != APR_SUCCESS) {
436 return status;
437 }
438
439 if (len != readlen) /* abort for now */
440 return APR_EGENERAL;
441 } while (!status);
442 }
443 else if (rtnevents & APR_POLLIN) {
444 /* ignore */
445 }
446 else {
447 printf("Unknown rtnevents: %d\n", rtnevents);
448 abort();
449 }
450
451 return status;
452 }
453
run_test_server(serv_ctx_t * servctx,apr_short_interval_time_t duration,apr_pool_t * pool)454 apr_status_t run_test_server(serv_ctx_t *servctx,
455 apr_short_interval_time_t duration,
456 apr_pool_t *pool)
457 {
458 apr_status_t status;
459 apr_pollset_t *pollset;
460 apr_int32_t num;
461 const apr_pollfd_t *desc;
462
463 /* create a new pollset */
464 #ifdef BROKEN_WSAPOLL
465 status = apr_pollset_create_ex(&pollset, 32, pool, 0,
466 APR_POLLSET_SELECT);
467 #else
468 status = apr_pollset_create(&pollset, 32, pool, 0);
469 #endif
470
471 if (status != APR_SUCCESS)
472 return status;
473
474 /* Don't accept new connection while processing client connection. At
475 least for present time.*/
476 if (servctx->client_sock) {
477 apr_pollfd_t pfd = { 0 };
478
479 pfd.desc_type = APR_POLL_SOCKET;
480 pfd.desc.s = servctx->client_sock;
481 pfd.reqevents = APR_POLLIN | APR_POLLOUT;
482
483 status = apr_pollset_add(pollset, &pfd);
484 if (status != APR_SUCCESS)
485 goto cleanup;
486
487 if (servctx->proxy_client_sock) {
488 apr_pollfd_t pfd = { 0 };
489
490 pfd.desc_type = APR_POLL_SOCKET;
491 pfd.desc.s = servctx->proxy_client_sock;
492 pfd.reqevents = APR_POLLIN | APR_POLLOUT;
493
494 status = apr_pollset_add(pollset, &pfd);
495 if (status != APR_SUCCESS)
496 goto cleanup;
497 }
498 }
499 else {
500 apr_pollfd_t pfd = { 0 };
501
502 pfd.desc_type = APR_POLL_SOCKET;
503 pfd.desc.s = servctx->serv_sock;
504 pfd.reqevents = APR_POLLIN;
505
506 status = apr_pollset_add(pollset, &pfd);
507 if (status != APR_SUCCESS)
508 goto cleanup;
509 }
510
511 status = apr_pollset_poll(pollset, APR_USEC_PER_SEC >> 1, &num, &desc);
512 if (status != APR_SUCCESS)
513 goto cleanup;
514
515 while (num--) {
516 if (desc->desc.s == servctx->serv_sock) {
517 status = apr_socket_accept(&servctx->client_sock, servctx->serv_sock,
518 servctx->pool);
519 if (status != APR_SUCCESS)
520 goto cleanup;
521
522 serf__log_skt(TEST_VERBOSE, __FILE__, servctx->client_sock,
523 "server/proxy accepted incoming connection.\n");
524
525
526 apr_socket_opt_set(servctx->client_sock, APR_SO_NONBLOCK, 1);
527 apr_socket_timeout_set(servctx->client_sock, 0);
528
529 status = APR_SUCCESS;
530 goto cleanup;
531 }
532
533 if (desc->desc.s == servctx->client_sock) {
534 if (servctx->handshake) {
535 status = servctx->handshake(servctx);
536 if (status)
537 goto cleanup;
538 }
539
540 /* Replay data to socket. */
541 status = replay(servctx, desc->rtnevents, pool);
542
543 if (APR_STATUS_IS_EOF(status)) {
544 apr_socket_close(servctx->client_sock);
545 servctx->client_sock = NULL;
546 if (servctx->reset)
547 servctx->reset(servctx);
548
549 /* If this is a proxy and the client closed the connection, also
550 close the connection to the server. */
551 if (servctx->proxy_client_sock) {
552 apr_socket_close(servctx->proxy_client_sock);
553 servctx->proxy_client_sock = NULL;
554 goto cleanup;
555 }
556 }
557 else if (APR_STATUS_IS_EAGAIN(status)) {
558 status = APR_SUCCESS;
559 }
560 else if (status != APR_SUCCESS) {
561 /* Real error. */
562 goto cleanup;
563 }
564 }
565 if (desc->desc.s == servctx->proxy_client_sock) {
566 /* Replay data to proxy socket. */
567 status = proxy_replay(servctx, desc->rtnevents, pool);
568 if (APR_STATUS_IS_EOF(status)) {
569 apr_socket_close(servctx->proxy_client_sock);
570 servctx->proxy_client_sock = NULL;
571 }
572 else if (APR_STATUS_IS_EAGAIN(status)) {
573 status = APR_SUCCESS;
574 }
575 else if (status != APR_SUCCESS) {
576 /* Real error. */
577 goto cleanup;
578 }
579 }
580
581 desc++;
582 }
583
584 cleanup:
585 apr_pollset_destroy(pollset);
586
587 return status;
588 }
589
590
591 /* Setup the context needed to start a TCP server on adress.
592 message_list is a list of expected requests.
593 action_list is the list of responses to be returned in order.
594 */
setup_test_server(serv_ctx_t ** servctx_p,apr_sockaddr_t * address,test_server_message_t * message_list,apr_size_t message_count,test_server_action_t * action_list,apr_size_t action_count,apr_int32_t options,apr_pool_t * pool)595 void setup_test_server(serv_ctx_t **servctx_p,
596 apr_sockaddr_t *address,
597 test_server_message_t *message_list,
598 apr_size_t message_count,
599 test_server_action_t *action_list,
600 apr_size_t action_count,
601 apr_int32_t options,
602 apr_pool_t *pool)
603 {
604 serv_ctx_t *servctx;
605
606 servctx = apr_pcalloc(pool, sizeof(*servctx));
607 apr_pool_cleanup_register(pool, servctx,
608 cleanup_server,
609 apr_pool_cleanup_null);
610 *servctx_p = servctx;
611
612 servctx->serv_addr = address;
613 servctx->options = options;
614 servctx->pool = pool;
615 servctx->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
616 servctx->message_list = message_list;
617 servctx->message_count = message_count;
618 servctx->action_list = action_list;
619 servctx->action_count = action_count;
620
621 /* Start replay from first action. */
622 servctx->cur_action = 0;
623 servctx->action_buf_pos = 0;
624 servctx->outstanding_responses = 0;
625
626 servctx->read = socket_read;
627 servctx->send = socket_write;
628
629 *servctx_p = servctx;
630 }
631
start_test_server(serv_ctx_t * servctx)632 apr_status_t start_test_server(serv_ctx_t *servctx)
633 {
634 apr_status_t status;
635 apr_socket_t *serv_sock;
636
637 /* create server socket */
638 #if APR_VERSION_AT_LEAST(1, 0, 0)
639 status = apr_socket_create(&serv_sock, servctx->serv_addr->family,
640 SOCK_STREAM, 0,
641 servctx->pool);
642 #else
643 status = apr_socket_create(&serv_sock, servctx->serv_addr->family,
644 SOCK_STREAM,
645 servctx->pool);
646 #endif
647
648 if (status != APR_SUCCESS)
649 return status;
650
651 apr_socket_opt_set(serv_sock, APR_SO_NONBLOCK, 1);
652 apr_socket_timeout_set(serv_sock, 0);
653 apr_socket_opt_set(serv_sock, APR_SO_REUSEADDR, 1);
654
655 status = apr_socket_bind(serv_sock, servctx->serv_addr);
656 if (status != APR_SUCCESS)
657 return status;
658
659 /* listen for clients */
660 status = apr_socket_listen(serv_sock, SOMAXCONN);
661 if (status != APR_SUCCESS)
662 return status;
663
664 servctx->serv_sock = serv_sock;
665 servctx->client_sock = NULL;
666
667 return APR_SUCCESS;
668 }
669