1 #include "hawktracer/client_utils/tcp_client_stream.hpp"
2
3 #ifdef _WIN32
4 #define NOMINMAX
5 #include <WinSock2.h>
6 #pragma comment(lib, "Ws2_32.lib")
7 #else
8 #include <arpa/inet.h>
9 #include <unistd.h>
10 #endif
11 #include <cstring>
12
13 namespace HawkTracer
14 {
15 namespace ClientUtils
16 {
// Size in bytes of the stack buffer that _run() hands to each recv() call.
#define BUFSIZE 1024
18
/**
 * Closes a socket descriptor with the platform's native call.
 *
 * @param sock_fd descriptor of the socket to close.
 * @return the value returned by closesocket() on Windows, or by close()
 *         on every other platform.
 */
static int close_socket(int sock_fd)
{
#ifndef _WIN32
    const int result = close(sock_fd);
#else
    const int result = closesocket(sock_fd);
#endif
    return result;
}
27
TCPClientStream(const std::string & ip_address,uint16_t port,bool wait_for_server)28 TCPClientStream::TCPClientStream(const std::string& ip_address, uint16_t port, bool wait_for_server) :
29 _sock_fd(-1),
30 _ip_address(ip_address),
31 _port(port),
32 _wait_for_server(wait_for_server)
33 {
34 }
35
~TCPClientStream()36 TCPClientStream::~TCPClientStream()
37 {
38 stop();
39 }
40
start()41 bool TCPClientStream::start()
42 {
43 if (is_connected())
44 {
45 stop();
46 }
47
48 #ifdef _WIN32
49 WSAData wsa_data;
50 if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0)
51 {
52 return false;
53 }
54 #endif
55
56 struct sockaddr_in serveraddr;
57
58 _sock_fd = socket(AF_INET, SOCK_STREAM, 0);
59 if (_sock_fd < 0)
60 {
61 return false;
62 }
63
64 memset((char*)&serveraddr, 0, sizeof(serveraddr));
65 serveraddr.sin_family = AF_INET;
66 serveraddr.sin_addr.s_addr = inet_addr(_ip_address.c_str());
67 serveraddr.sin_port = htons(_port);
68
69 if (_wait_for_server)
70 {
71 while (connect(_sock_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0)
72 {
73 std::this_thread::sleep_for(std::chrono::milliseconds(10));
74 }
75 }
76 else
77 {
78 if (connect(_sock_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0)
79 {
80 stop();
81 return false;
82 }
83 }
84
85 _thread = std::thread([this] { _run(); });
86
87 return true;
88 }
89
stop()90 void TCPClientStream::stop()
91 {
92 if (is_connected())
93 {
94 close_socket(_sock_fd);
95 _sock_fd = -1;
96 }
97
98 #ifdef _WIN32
99 WSACleanup();
100 #endif
101
102 if (_thread.joinable())
103 {
104 _thread.join();
105 }
106 }
107
_run()108 void TCPClientStream::_run()
109 {
110 char buf[BUFSIZE];
111
112 while (is_connected())
113 {
114 int size = recv(_sock_fd, buf, BUFSIZE, 0);
115
116 if (size == 0 || size == -1)
117 {
118 close_socket(_sock_fd);
119 _sock_fd = -1;
120 }
121 else if (size > 0)
122 {
123 {
124 std::lock_guard<std::mutex> l(_datas_mtx);
125 _datas.push(std::make_pair(0u, std::vector<char>(buf, buf + size)));
126 }
127 _datas_cv.notify_one();
128 }
129 }
130
131 _datas_cv.notify_one();
132 }
133
is_connected() const134 bool TCPClientStream::is_connected() const
135 {
136 return _sock_fd != -1;
137 }
138
_wait_for_data(std::unique_lock<std::mutex> & l)139 bool TCPClientStream::_wait_for_data(std::unique_lock<std::mutex>& l)
140 {
141 _datas_cv.wait(l, [this] { return !_datas.empty() || !is_connected(); });
142 return !_datas.empty();
143 }
144
read_byte()145 int TCPClientStream::read_byte()
146 {
147 std::unique_lock<std::mutex> l(_datas_mtx);
148
149 if (!_wait_for_data(l))
150 {
151 return -1;
152 }
153 else
154 {
155 auto& buffer = _datas.front();
156 int b = buffer.second[buffer.first++];
157 _pop_if_used();
158 return b;
159 }
160 }
161
read_data(char * buff,size_t size)162 bool TCPClientStream::read_data(char* buff, size_t size)
163 {
164 do {
165 std::unique_lock<std::mutex> l(_datas_mtx);
166 if (!_wait_for_data(l))
167 {
168 return false;
169 }
170 auto& buffer = _datas.front();
171 size_t bytes_count = std::min(size, buffer.second.size() - buffer.first);
172 memcpy(buff, &buffer.second[buffer.first], bytes_count);
173 buffer.first += bytes_count;
174 size -= bytes_count;
175 buff += bytes_count;
176 _pop_if_used();
177 } while (size != 0);
178
179 return true;
180 }
181
182 } // namespace ClientUtils
183 } // namespace HawkTracer
184