1 #include "hawktracer/client_utils/tcp_client_stream.hpp"
2 
3 #ifdef _WIN32
4 #define NOMINMAX
5 #include <WinSock2.h>
6 #pragma comment(lib, "Ws2_32.lib")
7 #else
8 #include <arpa/inet.h>
9 #include <unistd.h>
10 #endif
11 #include <cstring>
12 
13 namespace HawkTracer
14 {
15 namespace ClientUtils
16 {
17 #define BUFSIZE 1024
18 
close_socket(int sock_fd)19 static int close_socket(int sock_fd)
20 {
21 #ifdef _WIN32
22     return closesocket(sock_fd);
23 #else
24     return close(sock_fd);
25 #endif
26 }
27 
TCPClientStream(const std::string & ip_address,uint16_t port,bool wait_for_server)28 TCPClientStream::TCPClientStream(const std::string& ip_address, uint16_t port, bool wait_for_server) :
29     _sock_fd(-1),
30     _ip_address(ip_address),
31     _port(port),
32     _wait_for_server(wait_for_server)
33 {
34 }
35 
~TCPClientStream()36 TCPClientStream::~TCPClientStream()
37 {
38     stop();
39 }
40 
start()41 bool TCPClientStream::start()
42 {
43     if (is_connected())
44     {
45         stop();
46     }
47 
48 #ifdef _WIN32
49     WSAData wsa_data;
50     if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0)
51     {
52         return false;
53     }
54 #endif
55 
56     struct sockaddr_in serveraddr;
57 
58     _sock_fd = socket(AF_INET, SOCK_STREAM, 0);
59     if (_sock_fd < 0)
60     {
61         return false;
62     }
63 
64     memset((char*)&serveraddr, 0, sizeof(serveraddr));
65     serveraddr.sin_family = AF_INET;
66     serveraddr.sin_addr.s_addr = inet_addr(_ip_address.c_str());
67     serveraddr.sin_port = htons(_port);
68 
69     if (_wait_for_server)
70     {
71         while (connect(_sock_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0)
72         {
73             std::this_thread::sleep_for(std::chrono::milliseconds(10));
74         }
75     }
76     else
77     {
78         if (connect(_sock_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0)
79         {
80             stop();
81             return false;
82         }
83     }
84 
85     _thread = std::thread([this] { _run(); });
86 
87     return true;
88 }
89 
stop()90 void TCPClientStream::stop()
91 {
92     if (is_connected())
93     {
94         close_socket(_sock_fd);
95         _sock_fd = -1;
96     }
97 
98 #ifdef _WIN32
99     WSACleanup();
100 #endif
101 
102     if (_thread.joinable())
103     {
104         _thread.join();
105     }
106 }
107 
_run()108 void TCPClientStream::_run()
109 {
110     char buf[BUFSIZE];
111 
112     while (is_connected())
113     {
114         int size = recv(_sock_fd, buf, BUFSIZE, 0);
115 
116         if (size == 0 || size == -1)
117         {
118             close_socket(_sock_fd);
119             _sock_fd = -1;
120         }
121         else if (size > 0)
122         {
123             {
124                 std::lock_guard<std::mutex> l(_datas_mtx);
125                 _datas.push(std::make_pair(0u, std::vector<char>(buf, buf + size)));
126             }
127             _datas_cv.notify_one();
128         }
129     }
130 
131     _datas_cv.notify_one();
132 }
133 
is_connected() const134 bool TCPClientStream::is_connected() const
135 {
136     return _sock_fd != -1;
137 }
138 
_wait_for_data(std::unique_lock<std::mutex> & l)139 bool TCPClientStream::_wait_for_data(std::unique_lock<std::mutex>& l)
140 {
141      _datas_cv.wait(l, [this] { return !_datas.empty() || !is_connected(); });
142      return !_datas.empty();
143 }
144 
read_byte()145 int TCPClientStream::read_byte()
146 {
147     std::unique_lock<std::mutex> l(_datas_mtx);
148 
149     if (!_wait_for_data(l))
150     {
151         return -1;
152     }
153     else
154     {
155         auto& buffer = _datas.front();
156         int b = buffer.second[buffer.first++];
157         _pop_if_used();
158         return b;
159     }
160 }
161 
read_data(char * buff,size_t size)162 bool TCPClientStream::read_data(char* buff, size_t size)
163 {
164     do {
165         std::unique_lock<std::mutex> l(_datas_mtx);
166         if (!_wait_for_data(l))
167         {
168             return false;
169         }
170         auto& buffer = _datas.front();
171         size_t bytes_count = std::min(size, buffer.second.size() - buffer.first);
172         memcpy(buff, &buffer.second[buffer.first], bytes_count);
173         buffer.first += bytes_count;
174         size -= bytes_count;
175         buff += bytes_count;
176         _pop_if_used();
177     } while (size != 0);
178 
179     return true;
180 }
181 
182 } // namespace ClientUtils
183 } // namespace HawkTracer
184