1 #ifndef _tcp_trsp_h_
2 #define _tcp_trsp_h_
3 
4 #include "transport.h"
5 #include "sip_parser_async.h"
6 
7 #include <vector>
8 using std::vector;
9 
/**
 * Upper bound, in bytes, on the length of a single
 * message handled over TCP.
 *
 * The terminating '\0' is NOT counted in this value.
 */
#define MAX_TCP_MSGLEN 65535
15 
16 #include <sys/socket.h>
on_sock_read(int fd,short ev,void * arg)17 #include <event2/event.h>
18 
19 #include <map>
20 #include <deque>
21 #include <string>
22 using std::map;
23 using std::deque;
on_sock_write(int fd,short ev,void * arg)24 using std::string;
25 
26 class tcp_server_worker;
27 class tcp_server_socket;
28 
29 class tcp_trsp_socket: public trsp_socket
30 {
tcp_trsp_socket(tcp_server_socket * server_sock,tcp_server_worker * server_worker,int sd,const sockaddr_storage * sa,struct event_base * evbase)31   tcp_server_socket* server_sock;
32   tcp_server_worker* server_worker;
33 
34   bool             closed;
35   bool             connected;
36   sockaddr_storage peer_addr;
37   string           peer_ip;
38   unsigned short   peer_port;
39   bool             peer_addr_valid;
40 
41   parser_state     pst;
42   unsigned char    input_buf[MAX_TCP_MSGLEN];
43   int              input_len;
44 
45   struct event_base* evbase;
46   struct event*      read_ev;
47   struct event*      write_ev;
48 
49   struct msg_buf {
50     sockaddr_storage addr;
51     char*            msg;
52     int              msg_len;
53     char*            cursor;
54 
55     msg_buf(const sockaddr_storage* sa, const char* msg,
56 	    const int msg_len);
57     ~msg_buf();
58 
59     int bytes_left() { return msg_len - (cursor - msg); }
60   };
create_connected(tcp_server_socket * server_sock,tcp_server_worker * server_worker,int sd,const sockaddr_storage * sa,struct event_base * evbase)61 
62   deque<msg_buf*> send_q;
63 
64   AmMutex sock_mut;
65 
66   unsigned char*   get_input() { return input_buf + input_len; }
67   int              get_input_free_space() {
68     if(input_len > MAX_TCP_MSGLEN) return 0;
69     return MAX_TCP_MSGLEN - input_len;
70   }
71 
72   void reset_input() {
73     input_len = 0;
74   }
75 
76   int parse_input();
77 
78   /** fake implementation: we will never bind a connection socket */
79   int bind(const string& address, unsigned short port) {
new_connection(tcp_server_socket * server_sock,tcp_server_worker * server_worker,const sockaddr_storage * sa,struct event_base * evbase)80     return 0;
81   }
82 
83   /**
84    * Instantiates read_ev & write_ev
85    * Warning: call only ONCE!!!
86    */
87   void create_events();
88 
~tcp_trsp_socket()89   /*
90    * Connects the socket to the destination
91    * given in constructor.
92    */
93   int connect();
94 
95   /**
create_events()96    * Checks whether or not the socket is already connected.
97    * If not, a new connection will be tried.
98    */
99   int check_connection();
100 
101   /**
102    * Closes the connection/socket
103    *
104    * Warning: never do anything with the object
105    *          after close as it could have been
106    *          destroyed.
add_read_event_ul()107    */
108   void close();
109 
110   /**
111    * Generates a transport error for each request
112    * left in send queue.
113    */
add_read_event()114   void generate_transport_errors();
115 
116   /**
117    * Adds persistent read-event to event base.
118    */
add_write_event_ul(struct timeval * timeout)119   void add_read_event();
120 
121   /**
122    * Adds one-shot write-event to event base.
123    */
124   void add_write_event(struct timeval* timeout=NULL);
125 
add_write_event(struct timeval * timeout)126   /**
127    * Same as add_read_event() but unlock before
128    * calling event_add().
129    */
130   void add_read_event_ul();
copy_peer_addr(sockaddr_storage * sa)131 
132   /**
133    * Same as add_write_event() but unlock before
134    * calling event_add().
135    */
136   void add_write_event_ul(struct timeval* timeout=NULL);
137 
138   int  on_connect(short ev);
139   void on_write(short ev);
140   void on_read(short ev);
141 
142   static void on_sock_read(int fd, short ev, void* arg);
143   static void on_sock_write(int fd, short ev, void* arg);
144 
~msg_buf()145   tcp_trsp_socket(tcp_server_socket* server_sock,
146 		  tcp_server_worker* server_worker,
147 		  int sd, const sockaddr_storage* sa,
148 		  struct event_base* evbase);
149 
on_connect(short ev)150 public:
151   static void create_connected(tcp_server_socket* server_sock,
152 			       tcp_server_worker* server_worker,
153 			       int sd, const sockaddr_storage* sa,
154 			       struct event_base* evbase);
155 
156   static tcp_trsp_socket* new_connection(tcp_server_socket* server_sock,
157 					 tcp_server_worker* server_worker,
158 					 const sockaddr_storage* sa,
159 					 struct event_base* evbase);
160   ~tcp_trsp_socket();
161 
162   const char* get_transport() const { return "tcp"; }
163   bool        is_reliable() const   { return true; }
164 
165   void copy_peer_addr(sockaddr_storage* sa);
166 
167   const string& get_peer_ip() {
168     return peer_ip;
169   }
170 
171   unsigned short get_peer_port() {
172     return peer_port;
173   }
174 
175   bool is_connected() {
176     return connected;
177   }
178 
179   /**
180    * Sends a message (push it to send-queue).
connect()181    * @return -1 if error(s) occured.
182    */
183   int send(const sockaddr_storage* sa, const char* msg,
184 	   const int msg_len, unsigned int flags);
185 };
186 
187 class tcp_server_worker
188   : public AmThread
189 {
190   struct event_base* evbase;
191   tcp_server_socket* server_sock;
192 
193   AmMutex                      connections_mut;
194   map<string,tcp_trsp_socket*> connections;
195 
196 protected:
197   void run();
198   void on_stop();
199 
200 public:
201   tcp_server_worker(tcp_server_socket* server_sock);
202   ~tcp_server_worker();
203 
204   int send(const sockaddr_storage* sa, const char* msg,
205 	   const int msg_len, unsigned int flags);
206 
207   void add_connection(tcp_trsp_socket* client_sock);
208   void remove_connection(tcp_trsp_socket* client_sock);
check_connection()209 };
210 
211 class tcp_server_socket: public trsp_socket
212 {
213   struct event_base* evbase;
214   struct event*      ev_accept;
215 
216   vector<tcp_server_worker*> workers;
217 
218   /**
219    * Timeout while connecting to a remote peer.
220    */
221   struct timeval connect_timeout;
222 
223   /**
224    * Idle Timeout before closing a connection.
225    */
226   struct timeval idle_timeout;
227 
228   /* callback on new connection */
229   void on_accept(int sd, short ev);
230 
231   /* libevent callback on new connection */
232   static void on_accept(int sd, short ev, void* arg);
233 
234   static uint32_t hash_addr(const sockaddr_storage* addr);
235 
236 public:
237   tcp_server_socket(unsigned short if_num);
238   ~tcp_server_socket() {}
239 
240   void add_threads(unsigned int n);
241   void start_threads();
242   void stop_threads();
243 
send(const sockaddr_storage * sa,const char * msg,const int msg_len,unsigned int flags)244   const char* get_transport() const { return "tcp"; }
245   bool        is_reliable() const   { return true; }
246 
247   /* activates libevent on_accept callback */
248   void add_event(struct event_base *evbase);
249 
250   int bind(const string& address, unsigned short port);
251   int send(const sockaddr_storage* sa, const char* msg,
252 	   const int msg_len, unsigned int flags);
253 
254   /**
255    * Set timeout in milliseconds for the connection
256    * establishement handshake.
257    */
258   void set_connect_timeout(unsigned int ms);
259 
260   /**
261    * Set idle timeout in milliseconds for news connections.
close()262    * If during this period of time no packet is received,
263    * the connection will be closed.
264    */
265   void set_idle_timeout(unsigned int ms);
266 
267   struct timeval* get_connect_timeout();
268   struct timeval* get_idle_timeout();
269 };
270 
271 class tcp_trsp: public transport
272 {
273   struct event_base *evbase;
274 
275 protected:
276   /** @see AmThread */
277   void run();
278   /** @see AmThread */
279   void on_stop();
280 
281 public:
generate_transport_errors()282   /** @see transport */
283   tcp_trsp(tcp_server_socket* sock);
284   ~tcp_trsp();
285 };
286 
287 #endif
288