1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | @link     https://www.swoole.com/                                    |
14   | @contact  team@swoole.com                                            |
15   | @license  https://github.com/swoole/swoole-src/blob/master/LICENSE   |
16   | @author   Tianfeng Han  <mikan.tenny@gmail.com>                      |
17   +----------------------------------------------------------------------+
18 */
19 
20 #include "test_core.h"
21 #include "swoole_server.h"
22 
23 using namespace std;
24 using namespace swoole::network;
25 
TEST(stream,send)26 TEST(stream, send) {
27     swServer serv(swoole::Server::MODE_BASE);
28     serv.worker_num = 1;
29     int ori_log_level = sw_logger()->get_level();
30     sw_logger()->set_level(SW_LOG_ERROR);
31 
32     swListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, TEST_PORT);
33     if (!port) {
34         swoole_warning("listen failed, [error=%d]", swoole_get_last_error());
35         exit(2);
36     }
37 
38     port->open_length_check = true;
39     Stream::set_protocol(&port->protocol);
40 
41     mutex lock;
42     lock.lock();
43 
44     char buf[65536];
45     ASSERT_EQ(swoole_random_bytes(buf, sizeof(buf)), sizeof(buf));
46 
47     ASSERT_EQ(serv.create(), SW_OK);
48 
49     std::thread t1([&]() {
50         swoole_signal_block_all();
51 
52         lock.lock();
53 
54         swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);
55 
56         // bad request
57         auto stream0 = Stream::create(TEST_TMP_FILE, 0, SW_SOCK_UNIX_STREAM);
58         ASSERT_EQ(stream0, nullptr);
59 
60         // bad request
61         auto stream1 = Stream::create(TEST_HOST, 39999, SW_SOCK_TCP);
62         ASSERT_TRUE(stream1);
63         stream1->response = [](Stream *stream, const char *data, uint32_t length) {
64             EXPECT_EQ(data, nullptr);
65             EXPECT_EQ(stream->errCode, ECONNREFUSED);
66         };
67         ASSERT_EQ(stream1->send(buf, sizeof(buf)), SW_OK);
68 
69         // success requset
70         auto stream2 = Stream::create(TEST_HOST, TEST_PORT, SW_SOCK_TCP);
71         ASSERT_TRUE(stream2);
72         stream2->private_data = new string(buf, sizeof(buf));
73         stream2->response = [](Stream *stream, const char *data, uint32_t length) {
74             string *buf = (string *) stream->private_data;
75             string pkt = string("Server: ") + *buf;
76             EXPECT_EQ(string(data, length), pkt);
77             delete buf;
78         };
79         ASSERT_EQ(stream2->send(buf, sizeof(buf)), SW_OK);
80 
81         swoole_event_wait();
82 
83         kill(getpid(), SIGTERM);
84     });
85 
86     serv.onWorkerStart = [&lock](swServer *serv, int worker_id) { lock.unlock(); };
87 
88     serv.onReceive = [&buf](swServer *serv, swRecvData *req) -> int {
89         string req_body(req->data + 4, req->info.len - 4);
90 
91         EXPECT_EQ(string(buf, sizeof(buf)), req_body);
92 
93         string pkt = string("Server: ") + req_body;
94         int packed_len = htonl(pkt.length());
95 
96         EXPECT_TRUE(serv->send(req->info.fd, &packed_len, sizeof(packed_len)));
97         EXPECT_TRUE(serv->send(req->info.fd, pkt.c_str(), pkt.length()));
98 
99         // end stream
100         packed_len = htonl(0);
101         EXPECT_TRUE(serv->send(req->info.fd, &packed_len, sizeof(packed_len)));
102 
103         return SW_OK;
104     };
105 
106     serv.start();
107     t1.join();
108 
109     sw_logger()->set_level(ori_log_level);
110 }
111