1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | @link https://www.swoole.com/ |
14 | @contact team@swoole.com |
15 | @license https://github.com/swoole/swoole-src/blob/master/LICENSE |
16 | @author Tianfeng Han <mikan.tenny@gmail.com> |
17 +----------------------------------------------------------------------+
18 */
19
20 #include "test_core.h"
21 #include "swoole_server.h"
22
23 using namespace std;
24 using namespace swoole::network;
25
/**
 * End-to-end check of Stream against a length-checked TCP listener.
 *
 * The main thread runs a single-worker MODE_BASE server on TEST_HOST:TEST_PORT.
 * A client thread (t1) waits for onWorkerStart and then issues three requests:
 *   - stream0: Stream::create() for a SW_SOCK_UNIX_STREAM target, asserted to yield nullptr;
 *   - stream1: TCP port 39999, whose response callback expects a null payload and ECONNREFUSED
 *              (presumably nothing listens on that port);
 *   - stream2: the real listener, whose response must equal "Server: " + the payload sent.
 * Once the client side is done, t1 raises SIGTERM on the process, which is expected to make
 * serv.start() return so the test can join the thread and restore the log level.
 */
TEST(stream, send) {
    swServer serv(swoole::Server::MODE_BASE);
    serv.worker_num = 1;
    int ori_log_level = sw_logger()->get_level();
    sw_logger()->set_level(SW_LOG_ERROR);

    swListenPort *port = serv.add_port(SW_SOCK_TCP, TEST_HOST, TEST_PORT);
    if (!port) {
        // Without a listener nothing below can run; terminate the whole test process.
        swoole_warning("listen failed, [error=%d]", swoole_get_last_error());
        exit(2);
    }

    port->open_length_check = true;
    Stream::set_protocol(&port->protocol);

    // Random payload shared by the client (sender) and the server (verifier).
    char buf[65536];
    ASSERT_EQ(swoole_random_bytes(buf, sizeof(buf)), sizeof(buf));

    ASSERT_EQ(serv.create(), SW_OK);

    // Gate that keeps the client thread parked until the worker has started.
    // It is taken only after the ASSERT_* checks above, so an early return from
    // this function can never destroy a mutex that is still locked.
    mutex lock;
    lock.lock();

    std::thread t1([&]() {
        swoole_signal_block_all();

        // Wait for onWorkerStart to open the gate, then release it immediately:
        // a std::mutex must not be owned when its owning thread exits or when it is destroyed.
        lock.lock();
        lock.unlock();

        // The client scenario lives in its own lambda so that a failing ASSERT_* (which
        // returns from the enclosing function) still reaches the kill() below instead of
        // leaving serv.start() running with nobody left to stop it.
        auto run_client = [&]() {
            swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);

            // bad request: stream creation itself must fail
            auto stream0 = Stream::create(TEST_TMP_FILE, 0, SW_SOCK_UNIX_STREAM);
            ASSERT_EQ(stream0, nullptr);

            // bad request: creation succeeds, the failure is reported through the callback
            auto stream1 = Stream::create(TEST_HOST, 39999, SW_SOCK_TCP);
            ASSERT_TRUE(stream1);
            stream1->response = [](Stream *stream, const char *data, uint32_t length) {
                EXPECT_EQ(data, nullptr);
                EXPECT_EQ(stream->errCode, ECONNREFUSED);
            };
            ASSERT_EQ(stream1->send(buf, sizeof(buf)), SW_OK);

            // success request
            auto stream2 = Stream::create(TEST_HOST, TEST_PORT, SW_SOCK_TCP);
            ASSERT_TRUE(stream2);
            // Copy of the payload handed to the callback; the callback frees it.
            stream2->private_data = new string(buf, sizeof(buf));
            stream2->response = [](Stream *stream, const char *data, uint32_t length) {
                string *sent = (string *) stream->private_data;
                // The callback may be invoked with a null payload on error (see stream1),
                // so never build a std::string from it unchecked.
                EXPECT_TRUE(data != nullptr);
                if (data != nullptr) {
                    string expected = string("Server: ") + *sent;
                    EXPECT_EQ(string(data, length), expected);
                }
                delete sent;
            };
            ASSERT_EQ(stream2->send(buf, sizeof(buf)), SW_OK);

            swoole_event_wait();
        };
        run_client();

        // Signals are blocked in this thread, so the SIGTERM is handled elsewhere in the process.
        kill(getpid(), SIGTERM);
    });

    // NOTE(review): std::mutex must be unlocked by the thread that locked it; this relies on
    // onWorkerStart running on the thread that calls serv.start() — confirm for this server mode.
    serv.onWorkerStart = [&lock](swServer *serv, int worker_id) { lock.unlock(); };

    serv.onReceive = [&buf](swServer *serv, swRecvData *req) -> int {
        // Skip the first 4 bytes of the packet (presumably the length header) to get the payload.
        string req_body(req->data + 4, req->info.len - 4);

        EXPECT_EQ(string(buf, sizeof(buf)), req_body);

        string pkt = string("Server: ") + req_body;
        uint32_t packed_len = htonl(pkt.length());

        // Reply as a 4-byte network-order length followed by the body.
        EXPECT_TRUE(serv->send(req->info.fd, &packed_len, sizeof(packed_len)));
        EXPECT_TRUE(serv->send(req->info.fd, pkt.c_str(), pkt.length()));

        // end stream: a zero-length packet
        packed_len = htonl(0);
        EXPECT_TRUE(serv->send(req->info.fd, &packed_len, sizeof(packed_len)));

        return SW_OK;
    };

    serv.start();
    t1.join();

    sw_logger()->set_level(ori_log_level);
}
111