1 #ifndef _tcp_trsp_h_
2 #define _tcp_trsp_h_
3
4 #include "transport.h"
5 #include "sip_parser_async.h"
6
7 #include <vector>
8 using std::vector;
9
/**
 * Maximum message length for TCP,
 * not including terminating '\0'.
 *
 * This value is used as the size of tcp_trsp_socket::input_buf.
 * It may be overridden at build time (e.g. -DMAX_TCP_MSGLEN=<bytes>);
 * if nothing is defined beforehand, the default of 65535 applies.
 */
#ifndef MAX_TCP_MSGLEN
#define MAX_TCP_MSGLEN 65535
#endif
15
16 #include <sys/socket.h>
on_sock_read(int fd,short ev,void * arg)17 #include <event2/event.h>
18
19 #include <map>
20 #include <deque>
21 #include <string>
22 using std::map;
23 using std::deque;
on_sock_write(int fd,short ev,void * arg)24 using std::string;
25
26 class tcp_server_worker;
27 class tcp_server_socket;
28
29 class tcp_trsp_socket: public trsp_socket
30 {
tcp_trsp_socket(tcp_server_socket * server_sock,tcp_server_worker * server_worker,int sd,const sockaddr_storage * sa,struct event_base * evbase)31 tcp_server_socket* server_sock;
32 tcp_server_worker* server_worker;
33
34 bool closed;
35 bool connected;
36 sockaddr_storage peer_addr;
37 string peer_ip;
38 unsigned short peer_port;
39 bool peer_addr_valid;
40
41 parser_state pst;
42 unsigned char input_buf[MAX_TCP_MSGLEN];
43 int input_len;
44
45 struct event_base* evbase;
46 struct event* read_ev;
47 struct event* write_ev;
48
49 struct msg_buf {
50 sockaddr_storage addr;
51 char* msg;
52 int msg_len;
53 char* cursor;
54
55 msg_buf(const sockaddr_storage* sa, const char* msg,
56 const int msg_len);
57 ~msg_buf();
58
59 int bytes_left() { return msg_len - (cursor - msg); }
60 };
create_connected(tcp_server_socket * server_sock,tcp_server_worker * server_worker,int sd,const sockaddr_storage * sa,struct event_base * evbase)61
62 deque<msg_buf*> send_q;
63
64 AmMutex sock_mut;
65
66 unsigned char* get_input() { return input_buf + input_len; }
67 int get_input_free_space() {
68 if(input_len > MAX_TCP_MSGLEN) return 0;
69 return MAX_TCP_MSGLEN - input_len;
70 }
71
72 void reset_input() {
73 input_len = 0;
74 }
75
76 int parse_input();
77
78 /** fake implementation: we will never bind a connection socket */
79 int bind(const string& address, unsigned short port) {
new_connection(tcp_server_socket * server_sock,tcp_server_worker * server_worker,const sockaddr_storage * sa,struct event_base * evbase)80 return 0;
81 }
82
83 /**
84 * Instantiates read_ev & write_ev
85 * Warning: call only ONCE!!!
86 */
87 void create_events();
88
~tcp_trsp_socket()89 /*
90 * Connects the socket to the destination
91 * given in constructor.
92 */
93 int connect();
94
95 /**
create_events()96 * Checks whether or not the socket is already connected.
97 * If not, a new connection will be tried.
98 */
99 int check_connection();
100
101 /**
102 * Closes the connection/socket
103 *
104 * Warning: never do anything with the object
105 * after close as it could have been
106 * destroyed.
add_read_event_ul()107 */
108 void close();
109
110 /**
111 * Generates a transport error for each request
112 * left in send queue.
113 */
add_read_event()114 void generate_transport_errors();
115
116 /**
117 * Adds persistent read-event to event base.
118 */
add_write_event_ul(struct timeval * timeout)119 void add_read_event();
120
121 /**
122 * Adds one-shot write-event to event base.
123 */
124 void add_write_event(struct timeval* timeout=NULL);
125
add_write_event(struct timeval * timeout)126 /**
127 * Same as add_read_event() but unlock before
128 * calling event_add().
129 */
130 void add_read_event_ul();
copy_peer_addr(sockaddr_storage * sa)131
132 /**
133 * Same as add_write_event() but unlock before
134 * calling event_add().
135 */
136 void add_write_event_ul(struct timeval* timeout=NULL);
137
138 int on_connect(short ev);
139 void on_write(short ev);
140 void on_read(short ev);
141
142 static void on_sock_read(int fd, short ev, void* arg);
143 static void on_sock_write(int fd, short ev, void* arg);
144
~msg_buf()145 tcp_trsp_socket(tcp_server_socket* server_sock,
146 tcp_server_worker* server_worker,
147 int sd, const sockaddr_storage* sa,
148 struct event_base* evbase);
149
on_connect(short ev)150 public:
151 static void create_connected(tcp_server_socket* server_sock,
152 tcp_server_worker* server_worker,
153 int sd, const sockaddr_storage* sa,
154 struct event_base* evbase);
155
156 static tcp_trsp_socket* new_connection(tcp_server_socket* server_sock,
157 tcp_server_worker* server_worker,
158 const sockaddr_storage* sa,
159 struct event_base* evbase);
160 ~tcp_trsp_socket();
161
162 const char* get_transport() const { return "tcp"; }
163 bool is_reliable() const { return true; }
164
165 void copy_peer_addr(sockaddr_storage* sa);
166
167 const string& get_peer_ip() {
168 return peer_ip;
169 }
170
171 unsigned short get_peer_port() {
172 return peer_port;
173 }
174
175 bool is_connected() {
176 return connected;
177 }
178
179 /**
180 * Sends a message (push it to send-queue).
connect()181 * @return -1 if error(s) occured.
182 */
183 int send(const sockaddr_storage* sa, const char* msg,
184 const int msg_len, unsigned int flags);
185 };
186
187 class tcp_server_worker
188 : public AmThread
189 {
190 struct event_base* evbase;
191 tcp_server_socket* server_sock;
192
193 AmMutex connections_mut;
194 map<string,tcp_trsp_socket*> connections;
195
196 protected:
197 void run();
198 void on_stop();
199
200 public:
201 tcp_server_worker(tcp_server_socket* server_sock);
202 ~tcp_server_worker();
203
204 int send(const sockaddr_storage* sa, const char* msg,
205 const int msg_len, unsigned int flags);
206
207 void add_connection(tcp_trsp_socket* client_sock);
208 void remove_connection(tcp_trsp_socket* client_sock);
check_connection()209 };
210
211 class tcp_server_socket: public trsp_socket
212 {
213 struct event_base* evbase;
214 struct event* ev_accept;
215
216 vector<tcp_server_worker*> workers;
217
218 /**
219 * Timeout while connecting to a remote peer.
220 */
221 struct timeval connect_timeout;
222
223 /**
224 * Idle Timeout before closing a connection.
225 */
226 struct timeval idle_timeout;
227
228 /* callback on new connection */
229 void on_accept(int sd, short ev);
230
231 /* libevent callback on new connection */
232 static void on_accept(int sd, short ev, void* arg);
233
234 static uint32_t hash_addr(const sockaddr_storage* addr);
235
236 public:
237 tcp_server_socket(unsigned short if_num);
238 ~tcp_server_socket() {}
239
240 void add_threads(unsigned int n);
241 void start_threads();
242 void stop_threads();
243
send(const sockaddr_storage * sa,const char * msg,const int msg_len,unsigned int flags)244 const char* get_transport() const { return "tcp"; }
245 bool is_reliable() const { return true; }
246
247 /* activates libevent on_accept callback */
248 void add_event(struct event_base *evbase);
249
250 int bind(const string& address, unsigned short port);
251 int send(const sockaddr_storage* sa, const char* msg,
252 const int msg_len, unsigned int flags);
253
254 /**
255 * Set timeout in milliseconds for the connection
256 * establishement handshake.
257 */
258 void set_connect_timeout(unsigned int ms);
259
260 /**
261 * Set idle timeout in milliseconds for news connections.
close()262 * If during this period of time no packet is received,
263 * the connection will be closed.
264 */
265 void set_idle_timeout(unsigned int ms);
266
267 struct timeval* get_connect_timeout();
268 struct timeval* get_idle_timeout();
269 };
270
271 class tcp_trsp: public transport
272 {
273 struct event_base *evbase;
274
275 protected:
276 /** @see AmThread */
277 void run();
278 /** @see AmThread */
279 void on_stop();
280
281 public:
generate_transport_errors()282 /** @see transport */
283 tcp_trsp(tcp_server_socket* sock);
284 ~tcp_trsp();
285 };
286
287 #endif
288