1 /* ====================================================================
2  *    Licensed to the Apache Software Foundation (ASF) under one
3  *    or more contributor license agreements.  See the NOTICE file
4  *    distributed with this work for additional information
5  *    regarding copyright ownership.  The ASF licenses this file
6  *    to you under the Apache License, Version 2.0 (the
7  *    "License"); you may not use this file except in compliance
8  *    with the License.  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *    Unless required by applicable law or agreed to in writing,
13  *    software distributed under the License is distributed on an
14  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  *    KIND, either express or implied.  See the License for the
16  *    specific language governing permissions and limitations
17  *    under the License.
18  * ====================================================================
19  */
20 
21 #include "apr.h"
22 #include "apr_pools.h"
23 #include <apr_poll.h>
24 #include <apr_version.h>
25 #include <stdlib.h>
26 
27 #include "serf.h"
28 #include "serf_private.h" /* for serf__log and serf__bucket_stream_create */
29 
30 #include "test_server.h"
31 
32 #define BUFSIZE 8192
33 
34 /* Cleanup callback for a server. */
cleanup_server(void * baton)35 static apr_status_t cleanup_server(void *baton)
36 {
37     serv_ctx_t *servctx = baton;
38     apr_status_t status;
39 
40     if (servctx->serv_sock)
41       status = apr_socket_close(servctx->serv_sock);
42     else
43       status = APR_EGENERAL;
44 
45     if (servctx->client_sock) {
46         apr_socket_close(servctx->client_sock);
47     }
48 
49     return status;
50 }
51 
52 /* Replay support functions */
/* Advance the replay state to the next entry of the expected-message list. */
static void next_message(serv_ctx_t *servctx)
{
    servctx->cur_message += 1;
}
57 
/* Advance the replay state to the next action and rewind the position
   inside the action's text, so the new action is sent from its start. */
static void next_action(serv_ctx_t *servctx)
{
    servctx->action_buf_pos = 0;
    servctx->cur_action += 1;
}
63 
64 static apr_status_t
socket_write(serv_ctx_t * serv_ctx,const char * data,apr_size_t * len)65 socket_write(serv_ctx_t *serv_ctx, const char *data,
66              apr_size_t *len)
67 {
68     return apr_socket_send(serv_ctx->client_sock, data, len);
69 }
70 
71 static apr_status_t
socket_read(serv_ctx_t * serv_ctx,char * data,apr_size_t * len)72 socket_read(serv_ctx_t *serv_ctx, char *data,
73             apr_size_t *len)
74 {
75     return apr_socket_recv(serv_ctx->client_sock, data, len);
76 }
77 
78 static apr_status_t
create_client_socket(apr_socket_t ** skt,serv_ctx_t * servctx,const char * url)79 create_client_socket(apr_socket_t **skt,
80                      serv_ctx_t *servctx,
81                      const char *url)
82 {
83     apr_sockaddr_t *address;
84     apr_uri_t uri;
85     apr_status_t status;
86 
87     status = apr_uri_parse(servctx->pool, url, &uri);
88     if (status != APR_SUCCESS)
89         return status;
90 
91     status = apr_sockaddr_info_get(&address,
92                                    uri.hostname,
93                                    APR_UNSPEC,
94                                    uri.port,
95                                    0,
96                                    servctx->pool);
97     if (status != APR_SUCCESS)
98         return status;
99 
100     status = apr_socket_create(skt,
101                                address->family,
102                                SOCK_STREAM,
103 #if APR_MAJOR_VERSION > 0
104                                APR_PROTO_TCP,
105 #endif
106                                servctx->pool);
107     if (status != APR_SUCCESS)
108         return status;
109 
110     /* Set the socket to be non-blocking */
111     status = apr_socket_timeout_set(*skt, 0);
112     if (status != APR_SUCCESS)
113         return status;
114 
115     status = apr_socket_connect(*skt, address);
116     if (status != APR_SUCCESS && !APR_STATUS_IS_EINPROGRESS(status))
117         return status;
118 
119     return APR_SUCCESS;
120 }
121 
detect_eof(void * baton,serf_bucket_t * aggregate_bucket)122 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
123 {
124     return APR_EAGAIN;
125 }
126 
127 /* Verify received requests and take the necessary actions
128    (return a response, kill the connection ...) */
replay(serv_ctx_t * servctx,apr_int16_t rtnevents,apr_pool_t * pool)129 static apr_status_t replay(serv_ctx_t *servctx,
130                            apr_int16_t rtnevents,
131                            apr_pool_t *pool)
132 {
133     apr_status_t status = APR_SUCCESS;
134     test_server_action_t *action;
135 
136     if (rtnevents & APR_POLLIN) {
137         if (servctx->message_list == NULL) {
138             /* we're not expecting any requests to reach this server! */
139             serf__log(TEST_VERBOSE, __FILE__,
140                       "Received request where none was expected.\n");
141 
142             return SERF_ERROR_ISSUE_IN_TESTSUITE;
143         }
144 
145         if (servctx->cur_action >= servctx->action_count) {
146             char buf[128];
147             apr_size_t len = sizeof(buf);
148 
149             status = servctx->read(servctx, buf, &len);
150             if (! APR_STATUS_IS_EAGAIN(status)) {
151                 /* we're out of actions! */
152                 serf__log(TEST_VERBOSE, __FILE__,
153                           "Received more requests than expected.\n");
154 
155                 return SERF_ERROR_ISSUE_IN_TESTSUITE;
156             }
157             return status;
158         }
159 
160         action = &servctx->action_list[servctx->cur_action];
161 
162         serf__log(TEST_VERBOSE, __FILE__,
163                   "POLLIN while replaying action %d, kind: %d.\n",
164                   servctx->cur_action, action->kind);
165 
166         /* Read the remaining data from the client and kill the socket. */
167         if (action->kind == SERVER_IGNORE_AND_KILL_CONNECTION) {
168             char buf[128];
169             apr_size_t len = sizeof(buf);
170 
171             status = servctx->read(servctx, buf, &len);
172 
173             if (status == APR_EOF) {
174                 serf__log(TEST_VERBOSE, __FILE__,
175                           "Killing this connection.\n");
176                 apr_socket_close(servctx->client_sock);
177                 servctx->client_sock = NULL;
178                 next_action(servctx);
179                 return APR_SUCCESS;
180             }
181 
182             return status;
183         }
184         else if (action->kind == SERVER_RECV ||
185                  (action->kind == SERVER_RESPOND &&
186                   servctx->outstanding_responses == 0)) {
187             apr_size_t msg_len, len;
188             char buf[128];
189             test_server_message_t *message;
190 
191             message = &servctx->message_list[servctx->cur_message];
192             msg_len = strlen(message->text);
193 
194             do
195             {
196                 len = msg_len - servctx->message_buf_pos;
197                 if (len > sizeof(buf))
198                     len = sizeof(buf);
199 
200                 status = servctx->read(servctx, buf, &len);
201                 if (SERF_BUCKET_READ_ERROR(status))
202                     return status;
203 
204                 if (status == APR_EOF) {
205                     serf__log(TEST_VERBOSE, __FILE__,
206                               "Server: Client hung up the connection.\n");
207                     break;
208                 }
209                 if (servctx->options & TEST_SERVER_DUMP)
210                     fwrite(buf, len, 1, stdout);
211 
212                 if (strncmp(buf,
213                             message->text + servctx->message_buf_pos,
214                             len) != 0) {
215                     /* ## TODO: Better diagnostics. */
216                     printf("Expected: (\n");
217                     fwrite(message->text + servctx->message_buf_pos, len, 1,
218                            stdout);
219                     printf(")\n");
220                     printf("Actual: (\n");
221                     fwrite(buf, len, 1, stdout);
222                     printf(")\n");
223 
224                     return SERF_ERROR_ISSUE_IN_TESTSUITE;
225                 }
226 
227                 servctx->message_buf_pos += len;
228 
229                 if (servctx->message_buf_pos >= msg_len) {
230                     next_message(servctx);
231                     servctx->message_buf_pos -= msg_len;
232                     if (action->kind == SERVER_RESPOND)
233                         servctx->outstanding_responses++;
234                     if (action->kind == SERVER_RECV)
235                         next_action(servctx);
236                     break;
237                 }
238             } while (!status);
239         }
240         else if (action->kind == PROXY_FORWARD) {
241             apr_size_t len;
242             char buf[BUFSIZE];
243             serf_bucket_t *tmp;
244 
245             /* Read all incoming data from the client to forward it to the
246                server later. */
247             do
248             {
249                 len = BUFSIZE;
250 
251                 status = servctx->read(servctx, buf, &len);
252                 if (SERF_BUCKET_READ_ERROR(status))
253                     return status;
254 
255                 serf__log(TEST_VERBOSE, __FILE__,
256                           "proxy: reading %d bytes %.*s from client with "
257                           "status %d.\n",
258                           len, len, buf, status);
259 
260                 if (status == APR_EOF) {
261                     serf__log(TEST_VERBOSE, __FILE__,
262                               "Proxy: client hung up the connection. Reset the "
263                               "connection to the server.\n");
264                     /* We have to stop forwarding, if a new connection opens
265                        the CONNECT request should not be forwarded to the
266                        server. */
267                     next_action(servctx);
268                 }
269                 if (!servctx->servstream)
270                     servctx->servstream = serf__bucket_stream_create(
271                                               servctx->allocator,
272                                               detect_eof,servctx);
273                 if (len) {
274                     tmp = serf_bucket_simple_copy_create(buf, len,
275                                                          servctx->allocator);
276                     serf_bucket_aggregate_append(servctx->servstream, tmp);
277                 }
278             } while (!status);
279         }
280     }
281     if (rtnevents & APR_POLLOUT) {
282         action = &servctx->action_list[servctx->cur_action];
283 
284         serf__log(TEST_VERBOSE, __FILE__,
285                   "POLLOUT when replaying action %d, kind: %d.\n", servctx->cur_action,
286                   action->kind);
287 
288         if (action->kind == SERVER_RESPOND && servctx->outstanding_responses) {
289             apr_size_t msg_len;
290             apr_size_t len;
291 
292             msg_len = strlen(action->text);
293             len = msg_len - servctx->action_buf_pos;
294 
295             status = servctx->send(servctx,
296                                    action->text + servctx->action_buf_pos,
297                                    &len);
298             if (status != APR_SUCCESS)
299                 return status;
300 
301             if (servctx->options & TEST_SERVER_DUMP)
302                 fwrite(action->text + servctx->action_buf_pos, len, 1, stdout);
303 
304             servctx->action_buf_pos += len;
305 
306             if (servctx->action_buf_pos >= msg_len) {
307                 next_action(servctx);
308                 servctx->outstanding_responses--;
309             }
310         }
311         else if (action->kind == SERVER_KILL_CONNECTION ||
312                  action->kind == SERVER_IGNORE_AND_KILL_CONNECTION) {
313             serf__log(TEST_VERBOSE, __FILE__,
314                       "Killing this connection.\n");
315             apr_socket_close(servctx->client_sock);
316             servctx->client_sock = NULL;
317             next_action(servctx);
318         }
319         else if (action->kind == PROXY_FORWARD) {
320             apr_size_t len;
321             char *buf;
322 
323             if (!servctx->proxy_client_sock) {
324                 serf__log(TEST_VERBOSE, __FILE__, "Proxy: setting up connection "
325                           "to server.\n");
326                 status = create_client_socket(&servctx->proxy_client_sock,
327                                               servctx, action->text);
328                 if (!servctx->clientstream)
329                     servctx->clientstream = serf__bucket_stream_create(
330                                                 servctx->allocator,
331                                                 detect_eof,servctx);
332             }
333 
334             /* Send all data received from the server to the client. */
335             do
336             {
337                 apr_size_t readlen;
338 
339                 readlen = BUFSIZE;
340 
341                 status = serf_bucket_read(servctx->clientstream, readlen,
342                                           &buf, &readlen);
343                 if (SERF_BUCKET_READ_ERROR(status))
344                     return status;
345                 if (!readlen)
346                     break;
347 
348                 len = readlen;
349 
350                 serf__log(TEST_VERBOSE, __FILE__,
351                           "proxy: sending %d bytes to client.\n", len);
352                 status = servctx->send(servctx, buf, &len);
353                 if (status != APR_SUCCESS) {
354                     return status;
355                 }
356 
357                 if (len != readlen) /* abort for now, return buf to aggregate
358                                        if not everything could be sent. */
359                     return APR_EGENERAL;
360             } while (!status);
361         }
362     }
363     else if (rtnevents & APR_POLLIN) {
364         /* ignore */
365     }
366     else {
367         printf("Unknown rtnevents: %d\n", rtnevents);
368         abort();
369     }
370 
371     return status;
372 }
373 
374 /* Exchange data between proxy and server */
proxy_replay(serv_ctx_t * servctx,apr_int16_t rtnevents,apr_pool_t * pool)375 static apr_status_t proxy_replay(serv_ctx_t *servctx,
376                                  apr_int16_t rtnevents,
377                                  apr_pool_t *pool)
378 {
379     apr_status_t status;
380 
381     if (rtnevents & APR_POLLIN) {
382         apr_size_t len;
383         char buf[BUFSIZE];
384         serf_bucket_t *tmp;
385 
386         serf__log(TEST_VERBOSE, __FILE__, "proxy_replay: POLLIN\n");
387         /* Read all incoming data from the server to forward it to the
388            client later. */
389         do
390         {
391             len = BUFSIZE;
392 
393             status = apr_socket_recv(servctx->proxy_client_sock, buf, &len);
394             if (SERF_BUCKET_READ_ERROR(status))
395                 return status;
396 
397             serf__log(TEST_VERBOSE, __FILE__,
398                       "proxy: reading %d bytes %.*s from server.\n",
399                       len, len, buf);
400             tmp = serf_bucket_simple_copy_create(buf, len,
401                                                  servctx->allocator);
402             serf_bucket_aggregate_append(servctx->clientstream, tmp);
403         } while (!status);
404     }
405 
406     if (rtnevents & APR_POLLOUT) {
407         apr_size_t len;
408         char *buf;
409 
410         serf__log(TEST_VERBOSE, __FILE__, "proxy_replay: POLLOUT\n");
411         /* Send all data received from the client to the server. */
412         do
413         {
414             apr_size_t readlen;
415 
416             readlen = BUFSIZE;
417 
418             if (!servctx->servstream)
419                 servctx->servstream = serf__bucket_stream_create(
420                                           servctx->allocator,
421                                           detect_eof,servctx);
422             status = serf_bucket_read(servctx->servstream, BUFSIZE,
423                                       &buf, &readlen);
424             if (SERF_BUCKET_READ_ERROR(status))
425                 return status;
426             if (!readlen)
427                 break;
428 
429             len = readlen;
430 
431             serf__log(TEST_VERBOSE, __FILE__,
432                       "proxy: sending %d bytes %.*s to server.\n",
433                       len, len, buf);
434             status = apr_socket_send(servctx->proxy_client_sock, buf, &len);
435             if (status != APR_SUCCESS) {
436                 return status;
437             }
438 
439             if (len != readlen) /* abort for now */
440                 return APR_EGENERAL;
441         } while (!status);
442     }
443     else if (rtnevents & APR_POLLIN) {
444         /* ignore */
445     }
446     else {
447         printf("Unknown rtnevents: %d\n", rtnevents);
448         abort();
449     }
450 
451     return status;
452 }
453 
run_test_server(serv_ctx_t * servctx,apr_short_interval_time_t duration,apr_pool_t * pool)454 apr_status_t run_test_server(serv_ctx_t *servctx,
455                              apr_short_interval_time_t duration,
456                              apr_pool_t *pool)
457 {
458     apr_status_t status;
459     apr_pollset_t *pollset;
460     apr_int32_t num;
461     const apr_pollfd_t *desc;
462 
463     /* create a new pollset */
464 #ifdef BROKEN_WSAPOLL
465     status = apr_pollset_create_ex(&pollset, 32, pool, 0,
466                                  APR_POLLSET_SELECT);
467 #else
468     status = apr_pollset_create(&pollset, 32, pool, 0);
469 #endif
470 
471     if (status != APR_SUCCESS)
472         return status;
473 
474     /* Don't accept new connection while processing client connection. At
475        least for present time.*/
476     if (servctx->client_sock) {
477         apr_pollfd_t pfd = { 0 };
478 
479         pfd.desc_type = APR_POLL_SOCKET;
480         pfd.desc.s = servctx->client_sock;
481         pfd.reqevents = APR_POLLIN | APR_POLLOUT;
482 
483         status = apr_pollset_add(pollset, &pfd);
484         if (status != APR_SUCCESS)
485             goto cleanup;
486 
487         if (servctx->proxy_client_sock) {
488             apr_pollfd_t pfd = { 0 };
489 
490             pfd.desc_type = APR_POLL_SOCKET;
491             pfd.desc.s = servctx->proxy_client_sock;
492             pfd.reqevents = APR_POLLIN | APR_POLLOUT;
493 
494             status = apr_pollset_add(pollset, &pfd);
495             if (status != APR_SUCCESS)
496                 goto cleanup;
497         }
498     }
499     else {
500         apr_pollfd_t pfd = { 0 };
501 
502         pfd.desc_type = APR_POLL_SOCKET;
503         pfd.desc.s = servctx->serv_sock;
504         pfd.reqevents = APR_POLLIN;
505 
506         status = apr_pollset_add(pollset, &pfd);
507         if (status != APR_SUCCESS)
508             goto cleanup;
509     }
510 
511     status = apr_pollset_poll(pollset, APR_USEC_PER_SEC >> 1, &num, &desc);
512     if (status != APR_SUCCESS)
513         goto cleanup;
514 
515     while (num--) {
516         if (desc->desc.s == servctx->serv_sock) {
517             status = apr_socket_accept(&servctx->client_sock, servctx->serv_sock,
518                                        servctx->pool);
519             if (status != APR_SUCCESS)
520                 goto cleanup;
521 
522             serf__log_skt(TEST_VERBOSE, __FILE__, servctx->client_sock,
523                           "server/proxy accepted incoming connection.\n");
524 
525 
526             apr_socket_opt_set(servctx->client_sock, APR_SO_NONBLOCK, 1);
527             apr_socket_timeout_set(servctx->client_sock, 0);
528 
529             status = APR_SUCCESS;
530             goto cleanup;
531         }
532 
533         if (desc->desc.s == servctx->client_sock) {
534             if (servctx->handshake) {
535                 status = servctx->handshake(servctx);
536                 if (status)
537                     goto cleanup;
538             }
539 
540             /* Replay data to socket. */
541             status = replay(servctx, desc->rtnevents, pool);
542 
543             if (APR_STATUS_IS_EOF(status)) {
544                 apr_socket_close(servctx->client_sock);
545                 servctx->client_sock = NULL;
546                 if (servctx->reset)
547                     servctx->reset(servctx);
548 
549                 /* If this is a proxy and the client closed the connection, also
550                    close the connection to the server. */
551                 if (servctx->proxy_client_sock) {
552                     apr_socket_close(servctx->proxy_client_sock);
553                     servctx->proxy_client_sock = NULL;
554                     goto cleanup;
555                 }
556             }
557             else if (APR_STATUS_IS_EAGAIN(status)) {
558                 status = APR_SUCCESS;
559             }
560             else if (status != APR_SUCCESS) {
561                 /* Real error. */
562                 goto cleanup;
563             }
564         }
565         if (desc->desc.s == servctx->proxy_client_sock) {
566             /* Replay data to proxy socket. */
567             status = proxy_replay(servctx, desc->rtnevents, pool);
568             if (APR_STATUS_IS_EOF(status)) {
569                 apr_socket_close(servctx->proxy_client_sock);
570                 servctx->proxy_client_sock = NULL;
571             }
572             else if (APR_STATUS_IS_EAGAIN(status)) {
573                 status = APR_SUCCESS;
574             }
575             else if (status != APR_SUCCESS) {
576                 /* Real error. */
577                 goto cleanup;
578             }
579         }
580 
581         desc++;
582     }
583 
584 cleanup:
585     apr_pollset_destroy(pollset);
586 
587     return status;
588 }
589 
590 
591 /* Setup the context needed to start a TCP server on adress.
592    message_list is a list of expected requests.
593    action_list is the list of responses to be returned in order.
594  */
setup_test_server(serv_ctx_t ** servctx_p,apr_sockaddr_t * address,test_server_message_t * message_list,apr_size_t message_count,test_server_action_t * action_list,apr_size_t action_count,apr_int32_t options,apr_pool_t * pool)595 void setup_test_server(serv_ctx_t **servctx_p,
596                        apr_sockaddr_t *address,
597                        test_server_message_t *message_list,
598                        apr_size_t message_count,
599                        test_server_action_t *action_list,
600                        apr_size_t action_count,
601                        apr_int32_t options,
602                        apr_pool_t *pool)
603 {
604     serv_ctx_t *servctx;
605 
606     servctx = apr_pcalloc(pool, sizeof(*servctx));
607     apr_pool_cleanup_register(pool, servctx,
608                               cleanup_server,
609                               apr_pool_cleanup_null);
610     *servctx_p = servctx;
611 
612     servctx->serv_addr = address;
613     servctx->options = options;
614     servctx->pool = pool;
615     servctx->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
616     servctx->message_list = message_list;
617     servctx->message_count = message_count;
618     servctx->action_list = action_list;
619     servctx->action_count = action_count;
620 
621     /* Start replay from first action. */
622     servctx->cur_action = 0;
623     servctx->action_buf_pos = 0;
624     servctx->outstanding_responses = 0;
625 
626     servctx->read = socket_read;
627     servctx->send = socket_write;
628 
629     *servctx_p = servctx;
630 }
631 
start_test_server(serv_ctx_t * servctx)632 apr_status_t start_test_server(serv_ctx_t *servctx)
633 {
634     apr_status_t status;
635     apr_socket_t *serv_sock;
636 
637     /* create server socket */
638 #if APR_VERSION_AT_LEAST(1, 0, 0)
639     status = apr_socket_create(&serv_sock, servctx->serv_addr->family,
640                                SOCK_STREAM, 0,
641                                servctx->pool);
642 #else
643     status = apr_socket_create(&serv_sock, servctx->serv_addr->family,
644                                SOCK_STREAM,
645                                servctx->pool);
646 #endif
647 
648     if (status != APR_SUCCESS)
649         return status;
650 
651     apr_socket_opt_set(serv_sock, APR_SO_NONBLOCK, 1);
652     apr_socket_timeout_set(serv_sock, 0);
653     apr_socket_opt_set(serv_sock, APR_SO_REUSEADDR, 1);
654 
655     status = apr_socket_bind(serv_sock, servctx->serv_addr);
656     if (status != APR_SUCCESS)
657         return status;
658 
659     /* listen for clients */
660     status = apr_socket_listen(serv_sock, SOMAXCONN);
661     if (status != APR_SUCCESS)
662         return status;
663 
664     servctx->serv_sock = serv_sock;
665     servctx->client_sock = NULL;
666 
667     return APR_SUCCESS;
668 }
669