1 /* Copyright(C) 2004-2007 Brazil
2 
3   This library is free software; you can redistribute it and/or
4   modify it under the terms of the GNU Lesser General Public
5   License as published by the Free Software Foundation; either
6   version 2.1 of the License, or (at your option) any later version.
7 
8   This library is distributed in the hope that it will be useful,
9   but WITHOUT ANY WARRANTY; without even the implied warranty of
10   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11   Lesser General Public License for more details.
12 
13   You should have received a copy of the GNU Lesser General Public
14   License along with this library; if not, write to the Free Software
15   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16 */
17 
18 #include "senna_in.h"
19 
20 #include <string.h>
21 #ifdef HAVE_NETDB_H
22 #include <netdb.h>
23 #endif /* HAVE_NETDB_H */
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif /* HAVE_SYS_SOCKET_H */
27 #ifdef HAVE_NETINET_IN_H
28 #include <netinet/in.h>
29 #endif /* HAVE_NETINET_IN_H */
30 #ifdef HAVE_NETINET_TCP_H
31 #include <netinet/tcp.h>
32 #endif /* HAVE_NETINET_TCP_H */
33 #ifdef HAVE_SIGNAL_H
34 #include <signal.h>
35 #endif /* HAVE_SIGNAL_H */
36 
37 #include "com.h"
38 #include "set.h"
39 #include "ctx.h"
40 
/* Fall back to the AF_* spelling when the PF_* constant is not provided. */
#ifndef PF_INET
#define PF_INET AF_INET
#endif /* PF_INET */

/*
 * MSG_NOSIGNAL selection.
 *
 * With USE_MSG_NOSIGNAL the flag supplied by the system headers is used; on
 * the FreeBSD versions tested below a fallback value is supplied, but only if
 * the headers did not define the flag themselves.
 *
 * Without USE_MSG_NOSIGNAL the flag is forced to 0 (sen_com_init ignores
 * SIGPIPE instead).  Any definition coming from the system headers is
 * dropped first, because redefining a macro with a different replacement
 * list is a constraint violation.
 */
#ifdef USE_MSG_NOSIGNAL
#if __FreeBSD__ >= 2 && __FreeBSD_version >= 600020
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0x20000
#endif /* MSG_NOSIGNAL */
#endif /* __FreeBSD__ >= 2 && __FreeBSD_version >= 600020 */
#else /* USE_MSG_NOSIGNAL */
#undef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif /* USE_MSG_NOSIGNAL */
52 
#ifdef WIN32
/*
 * Log the most recent Winsock error (WSAGetLastError) at error level.
 *
 * msg is a caller-supplied prefix, typically the name of the call that
 * failed.  Error codes with a known description are logged as
 * "<msg>: <description>"; any other code is logged numerically.
 */
void
log_sockerr(const char *msg)
{
  static const struct {
    int code;
    const char *text;
  } known[] = {
    { WSANOTINITIALISED, "please call sen_com_init first" },
    { WSAEFAULT, "bad address" },
    { WSAEINVAL, "invalid argument" },
    { WSAEMFILE, "too many sockets" },
    { WSAEWOULDBLOCK, "operation would block" },
    { WSAENOTSOCK, "given fd is not socket fd" },
    { WSAEOPNOTSUPP, "operation is not supported" },
    { WSAEADDRINUSE, "address is already in use" },
    { WSAEADDRNOTAVAIL, "address is not available" },
    { WSAENETDOWN, "network is down" },
    { WSAENOBUFS, "no buffer" },
    { WSAEISCONN, "socket is already connected" },
    { WSAENOTCONN, "socket is not connected" },
    { WSAESHUTDOWN, "socket is already shutdowned" },
    { WSAETIMEDOUT, "connection time out" },
    { WSAECONNREFUSED, "connection refused" }
  };
  const int code = WSAGetLastError();
  size_t i;
  for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
    if (known[i].code == code) {
      SEN_LOG(sen_log_error, "%s: %s", msg, known[i].text);
      return;
    }
  }
  /* no textual description available for this code */
  SEN_LOG(sen_log_error, "%s: socket error (%d)", msg, code);
}
#define LOG_SOCKERR(m) log_sockerr((m))
#else /* WIN32 */
/* On other platforms the description comes from strerror(errno). */
#define LOG_SOCKERR(m) SEN_LOG(sen_log_error, "%s: %s", (m), strerror(errno))
#endif /* WIN32 */
86 
87 /******* sen_com ********/
88 
89 sen_rc
sen_com_init(void)90 sen_com_init(void)
91 {
92 #ifdef WIN32
93   WSADATA wd;
94   if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
95     GERR(sen_external_error, "WSAStartup failed");
96   }
97 #else /* WIN32 */
98 #ifndef USE_MSG_NOSIGNAL
99   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
100     GERR(sen_external_error, "signal");
101   }
102 #endif /* USE_MSG_NOSIGNAL */
103 #endif /* WIN32 */
104   return sen_gctx.rc;
105 }
106 
/*
 * Counterpart of sen_com_init.
 *
 * On WIN32 this calls WSACleanup(); on every other platform it does nothing.
 */
void
sen_com_fin(void)
{
#if defined(WIN32)
  WSACleanup();
#endif /* WIN32 */
}
114 
115 sen_rc
sen_com_event_init(sen_com_event * ev,int max_nevents,int data_size)116 sen_com_event_init(sen_com_event *ev, int max_nevents, int data_size)
117 {
118   sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
119   sen_rc rc = sen_memory_exhausted;
120   ev->max_nevents = max_nevents;
121   if ((ev->set = sen_set_open(sizeof(sen_sock), data_size, 0))) {
122 #ifndef USE_SELECT
123 #ifdef USE_EPOLL
124     if ((ev->events = SEN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
125       if ((ev->epfd = epoll_create(max_nevents)) != -1) {
126         rc = sen_success;
127         goto exit;
128       } else {
129         LOG_SOCKERR("epoll_create");
130         rc = sen_external_error;
131       }
132       SEN_FREE(ev->events);
133     }
134 #else /* USE_EPOLL */
135     if ((ev->events = SEN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
136       rc = sen_success;
137       goto exit;
138     }
139 #endif /* USE_EPOLL */
140     sen_set_close(ev->set);
141     ev->set = NULL;
142     ev->events = NULL;
143 #else /* USE_SELECT */
144     rc = sen_success;
145 #endif /* USE_SELECT */
146   }
147 exit :
148   return rc;
149 }
150 
151 sen_rc
sen_com_event_fin(sen_com_event * ev)152 sen_com_event_fin(sen_com_event *ev)
153 {
154   sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
155   if (ev->set) { sen_set_close(ev->set); }
156 #ifndef USE_SELECT
157   if (ev->events) { SEN_FREE(ev->events); }
158 #endif /* USE_SELECT */
159   return sen_success;
160 }
161 
162 sen_rc
sen_com_event_add(sen_com_event * ev,sen_sock fd,int events,sen_com ** com)163 sen_com_event_add(sen_com_event *ev, sen_sock fd, int events, sen_com **com)
164 {
165   sen_com *c;
166   /* todo : expand events */
167   if (!ev || ev->set->n_entries == ev->max_nevents) {
168     if (ev) { SEN_LOG(sen_log_error, "too many connections (%d)", ev->max_nevents); }
169     return sen_invalid_argument;
170   }
171 #ifdef USE_EPOLL
172   {
173     struct epoll_event e;
174     memset(&e, 0, sizeof(struct epoll_event));
175     e.data.fd = (fd);
176     e.events = (__uint32_t) events;
177     if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
178       LOG_SOCKERR("epoll_ctl");
179       return sen_external_error;
180     }
181   }
182 #endif /* USE_EPOLL*/
183   if (sen_set_get(ev->set, &fd, (void **)&c)) {
184     c->fd = fd;
185     c->events = events;
186     if (com) { *com = c; }
187     return sen_success;
188   }
189   return sen_internal_error;
190 }
191 
192 sen_rc
sen_com_event_mod(sen_com_event * ev,sen_sock fd,int events,sen_com ** com)193 sen_com_event_mod(sen_com_event *ev, sen_sock fd, int events, sen_com **com)
194 {
195   sen_com *c;
196   if (!ev) { return sen_invalid_argument; }
197   if (sen_set_at(ev->set, &fd, (void **)&c)) {
198     if (c->fd != fd) {
199       SEN_LOG(sen_log_error, "sen_com_event_mod fd unmatch %d != %d", c->fd, fd);
200       return sen_invalid_format;
201     }
202     if (com) { *com = c; }
203     if (c->events != events) {
204 #ifdef USE_EPOLL
205       struct epoll_event e;
206       memset(&e, 0, sizeof(struct epoll_event));
207       e.data.fd = (fd);
208       e.events = (__uint32_t) events;
209       if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
210         LOG_SOCKERR("epoll_ctl");
211         return sen_external_error;
212       }
213 #endif /* USE_EPOLL*/
214       c->events = events;
215     }
216     return sen_success;
217   }
218   return sen_internal_error;
219 }
220 
221 sen_rc
sen_com_event_del(sen_com_event * ev,sen_sock fd)222 sen_com_event_del(sen_com_event *ev, sen_sock fd)
223 {
224   if (!ev) { return sen_invalid_argument; }
225   {
226     sen_com *c;
227     sen_set_eh *eh = sen_set_at(ev->set, &fd, (void **)&c);
228     if (eh) {
229 #ifdef USE_EPOLL
230       struct epoll_event e;
231       memset(&e, 0, sizeof(struct epoll_event));
232       e.data.fd = fd;
233       e.events = c->events;
234       if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
235         LOG_SOCKERR("epoll_ctl");
236         return sen_external_error;
237       }
238 #endif /* USE_EPOLL*/
239       return sen_set_del(ev->set, eh);
240     } else {
241       SEN_LOG(sen_log_error, "%04x| fd(%d) not found in ev(%p)", getpid(), fd, ev);
242       return sen_internal_error;
243     }
244   }
245 }
246 
247 sen_rc
sen_com_event_poll(sen_com_event * ev,int timeout)248 sen_com_event_poll(sen_com_event *ev, int timeout)
249 {
250   int nevents;
251   sen_com *com;
252 #ifdef USE_SELECT
253   sen_sock *pfd;
254   int nfds = 0;
255   fd_set rfds;
256   fd_set wfds;
257   struct timeval tv;
258   if (timeout >= 0) {
259     tv.tv_sec = timeout / 1000;
260     tv.tv_usec = (timeout % 1000) * 1000;
261   }
262   FD_ZERO(&rfds);
263   FD_ZERO(&wfds);
264   SEN_SET_EACH(ev->set, eh, &pfd, &com, {
265     if ((com->events & SEN_COM_POLLIN)) { FD_SET(*pfd, &rfds); }
266     if ((com->events & SEN_COM_POLLOUT)) { FD_SET(*pfd, &wfds); }
267     if (*pfd > nfds) { nfds = *pfd; }
268   });
269   nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
270   if (nevents < 0) {
271 #ifdef WIN32
272     if (WSAGetLastError() == WSAEINTR) { return sen_success; }
273 #else /* WIN32 */
274     if (errno == EINTR) { return sen_success; }
275 #endif /* WIN32 */
276     LOG_SOCKERR("select");
277     return sen_external_error;
278   }
279   if (timeout < 0 && !nevents) { SEN_LOG(sen_log_notice, "select returns 0 events"); }
280   SEN_SET_EACH(ev->set, eh, &pfd, &com, {
281     if (FD_ISSET(*pfd, &rfds)) { com->ev_in(ev, com); }
282     if (FD_ISSET(*pfd, &wfds)) { com->ev_out(ev, com); }
283   });
284 #else /* USE_SELECT */
285 #ifdef USE_EPOLL
286   struct epoll_event *ep;
287   nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
288 #else /* USE_EPOLL */
289   int nfd = 0, *pfd;
290   struct pollfd *ep = ev->events;
291   SEN_SET_EACH(ev->set, eh, &pfd, &com, {
292     ep->fd = *pfd;
293     //    ep->events =(short) com->events;
294     ep->events = POLLIN;
295     ep->revents = 0;
296     ep++;
297     nfd++;
298   });
299   nevents = poll(ev->events, nfd, timeout);
300 #endif /* USE_EPOLL */
301   if (nevents < 0) {
302     if (errno == EINTR) { return sen_success; }
303     LOG_SOCKERR("poll");
304     return sen_external_error;
305   }
306   if (timeout < 0 && !nevents) { SEN_LOG(sen_log_notice, "poll returns 0 events"); }
307   for (ep = ev->events; nevents; ep++) {
308     int efd;
309 #ifdef USE_EPOLL
310     efd = ep->data.fd;
311     nevents--;
312     if (!sen_set_at(ev->set, &efd, (void *)&com)) {
313       struct epoll_event e;
314       SEN_LOG(sen_log_error, "fd(%d) not found in ev->set", efd);
315       memset(&e, 0, sizeof(struct epoll_event));
316       e.data.fd = efd;
317       e.events = ep->events;
318       if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { LOG_SOCKERR("epoll_ctl"); }
319       if (sen_sock_close(efd) == -1) { LOG_SOCKERR("close"); }
320       continue;
321     }
322     if ((ep->events & SEN_COM_POLLIN)) { com->ev_in(ev, com); }
323     if ((ep->events & SEN_COM_POLLOUT)) { com->ev_out(ev, com); }
324 #else /* USE_EPOLL */
325     efd = ep->fd;
326     if (!(ep->events & ep->revents)) { continue; }
327     nevents--;
328     if (!sen_set_at(ev->set, &efd, (void *)&com)) {
329       SEN_LOG(sen_log_error, "fd(%d) not found in ev->set", efd);
330       if (sen_sock_close(efd) == -1) { LOG_SOCKERR("close"); }
331       continue;
332     }
333     if ((ep->revents & SEN_COM_POLLIN)) { com->ev_in(ev, com); }
334     if ((ep->revents & SEN_COM_POLLOUT)) { com->ev_out(ev, com); }
335 #endif /* USE_EPOLL */
336   }
337 #endif /* USE_SELECT */
338   return sen_success;
339 }
340 
341 /******* sen_com_sqtp ********/
342 
343 sen_rc
sen_com_sqtp_send(sen_com_sqtp * cs,sen_com_sqtp_header * header,char * body)344 sen_com_sqtp_send(sen_com_sqtp *cs, sen_com_sqtp_header *header, char *body)
345 {
346   ssize_t ret, whole_size = sizeof(sen_com_sqtp_header) + header->size;
347   header->proto = SEN_COM_PROTO_SQTP;
348   if (cs->com.status == sen_com_closing) { header->flags |= SEN_CTX_QUIT; }
349   SEN_LOG(sen_log_info, "send (%d,%x,%d,%02x,%02x,%04x,%08x)", header->size, header->flags, header->proto, header->qtype, header->level, header->status, header->info);
350 
351   if (header->size) {
352 #ifdef WIN32
353     ssize_t reth;
354     if ((reth = send(cs->com.fd, header, sizeof(sen_com_sqtp_header), 0)) == -1) {
355       LOG_SOCKERR("send size");
356       cs->rc = sen_external_error;
357       goto exit;
358     }
359     if ((ret = send(cs->com.fd, body, header->size, 0)) == -1) {
360       LOG_SOCKERR("send body");
361       cs->rc = sen_external_error;
362       goto exit;
363     }
364     ret += reth;
365 #else /* WIN32 */
366     struct iovec msg_iov[2];
367     struct msghdr msg;
368     msg.msg_name = NULL;
369     msg.msg_namelen = 0;
370     msg.msg_iov = msg_iov;
371     msg.msg_iovlen = 2;
372     msg.msg_control = NULL;
373     msg.msg_controllen = 0;
374     msg.msg_flags = 0;
375     msg_iov[0].iov_base = header;
376     msg_iov[0].iov_len = sizeof(sen_com_sqtp_header);
377     msg_iov[1].iov_base = body;
378     msg_iov[1].iov_len = header->size;
379     while ((ret = sendmsg(cs->com.fd, &msg, MSG_NOSIGNAL)) == -1) {
380       LOG_SOCKERR("sendmsg");
381       if (errno == EAGAIN || errno == EINTR) { continue; }
382       cs->rc = sen_external_error;
383       goto exit;
384     }
385 #endif /* WIN32 */
386   } else {
387     while ((ret = send(cs->com.fd, header, whole_size, MSG_NOSIGNAL)) == -1) {
388 #ifdef WIN32
389       int e = WSAGetLastError();
390       LOG_SOCKERR("send");
391       if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
392 #else /* WIN32 */
393       LOG_SOCKERR("send");
394       if (errno == EAGAIN || errno == EINTR) { continue; }
395 #endif /* WIN32 */
396       cs->rc = sen_external_error;
397       goto exit;
398     }
399   }
400   if (ret != whole_size) {
401     SEN_LOG(sen_log_error, "sendmsg: %d < %d", ret, whole_size);
402     cs->rc = sen_external_error;
403     goto exit;
404   }
405   cs->rc = sen_success;
406 exit :
407   return cs->rc;
408 }
409 
410 sen_rc
sen_com_sqtp_recv(sen_com_sqtp * cs,sen_rbuf * buf,unsigned int * status,unsigned int * info)411 sen_com_sqtp_recv(sen_com_sqtp *cs, sen_rbuf *buf,
412                   unsigned int *status, unsigned int *info)
413 {
414   ssize_t ret;
415   sen_com_sqtp_header *header;
416   size_t rest = sizeof(sen_com_sqtp_header);
417   if (SEN_RBUF_WSIZE(buf) < rest) {
418     if ((cs->rc = sen_rbuf_reinit(buf, rest))) {
419       *status = sen_com_emem; *info = 1; goto exit;
420     }
421   } else {
422     SEN_RBUF_REWIND(buf);
423   }
424   do {
425     // todo : also support non blocking mode (use MSG_DONTWAIT)
426     if ((ret = recv(cs->com.fd, buf->curr, rest, MSG_WAITALL)) <= 0) {
427       if (ret < 0) {
428 #ifdef WIN32
429         int e = WSAGetLastError();
430         LOG_SOCKERR("recv size");
431         if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
432         *info = e;
433 #else /* WIN32 */
434         LOG_SOCKERR("recv size");
435         if (errno == EAGAIN || errno == EINTR) { continue; }
436         *info = errno;
437 #endif /* WIN32 */
438       }
439       cs->rc = sen_external_error;
440       *status = sen_com_erecv_head;
441       goto exit;
442     }
443     rest -= ret, buf->curr += ret;
444   } while (rest);
445   header = SEN_COM_SQTP_MSG_HEADER(buf);
446   SEN_LOG(sen_log_info, "recv (%d,%x,%d,%02x,%02x,%04x,%08x)", header->size, header->flags, header->proto, header->qtype, header->level, header->status, header->info);
447   *status = header->status;
448   *info = header->info;
449   {
450     uint16_t proto = header->proto;
451     size_t value_size = header->size;
452     size_t whole_size = sizeof(sen_com_sqtp_header) + value_size;
453     if (proto != SEN_COM_PROTO_SQTP) {
454       SEN_LOG(sen_log_error, "illegal header: %d", proto);
455       cs->rc = sen_invalid_format;
456       *status = sen_com_eproto;
457       *info = proto;
458       goto exit;
459     }
460     if (SEN_RBUF_WSIZE(buf) < whole_size) {
461       if ((cs->rc = sen_rbuf_resize(buf, whole_size))) {
462         *status = sen_com_emem; *info = 2;
463         goto exit;
464       }
465     }
466     for (rest = value_size; rest;) {
467       if ((ret = recv(cs->com.fd, buf->curr, rest, MSG_WAITALL)) <= 0) {
468         if (ret < 0) {
469 #ifdef WIN32
470           int e = WSAGetLastError();
471           LOG_SOCKERR("recv body");
472           if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
473           *info = e;
474 #else /* WIN32 */
475           LOG_SOCKERR("recv body");
476           if (errno == EAGAIN || errno == EINTR) { continue; }
477           *info = errno;
478 #endif /* WIN32 */
479         }
480         cs->rc = sen_external_error;
481         *status = sen_com_erecv_body;
482         goto exit;
483       }
484       rest -= ret, buf->curr += ret;
485     }
486     *buf->curr = '\0';
487   }
488   cs->rc = sen_success;
489 exit :
490   return cs->rc;
491 }
492 
493 sen_com_sqtp *
sen_com_sqtp_copen(sen_com_event * ev,const char * dest,int port)494 sen_com_sqtp_copen(sen_com_event *ev, const char *dest, int port)
495 {
496   sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
497   sen_sock fd;
498   sen_com_sqtp *cs = NULL;
499   struct hostent *he;
500   struct sockaddr_in addr;
501   if (!(he = gethostbyname(dest))) {
502     LOG_SOCKERR("gethostbyname");
503     goto exit;
504   }
505   addr.sin_family = AF_INET;
506   memcpy(&addr.sin_addr, he->h_addr, he->h_length);
507   addr.sin_port = htons(port);
508   if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
509     LOG_SOCKERR("socket");
510     goto exit;
511   }
512   {
513     int v = 1;
514     if (setsockopt(fd, 6, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
515       LOG_SOCKERR("setsockopt");
516     }
517   }
518   while (connect(fd, (struct sockaddr *)&addr, sizeof addr) == -1) {
519 #ifdef WIN32
520     if (WSAGetLastError() == WSAECONNREFUSED)
521 #else /* WIN32 */
522     if (errno == ECONNREFUSED)
523 #endif /* WIN32 */
524     {
525       SEN_LOG(sen_log_notice, "connect retrying..");
526       sleep(2);
527       continue;
528     }
529     LOG_SOCKERR("connect");
530     goto exit;
531   }
532   if (ev) {
533     if (sen_com_event_add(ev, fd, SEN_COM_POLLIN, (sen_com **)&cs)) { goto exit; }
534   } else {
535     if (!(cs = SEN_CALLOC(sizeof(sen_com_sqtp)))) { goto exit; }
536     cs->com.fd = fd;
537   }
538   sen_rbuf_init(&cs->msg, 0);
539 exit :
540   if (!cs) { sen_sock_close(fd); }
541   return cs;
542 }
543 
544 static void
sen_com_sqtp_receiver(sen_com_event * ev,sen_com * c)545 sen_com_sqtp_receiver(sen_com_event *ev, sen_com *c)
546 {
547   unsigned int status, info;
548   sen_com_sqtp *cs = (sen_com_sqtp *)c;
549   if (cs->com.status == sen_com_closing) {
550     sen_com_sqtp_close(ev, cs);
551     return;
552   }
553   if (cs->com.status != sen_com_idle) {
554     SEN_LOG(sen_log_info, "waiting to be idle.. (%d) %d", c->fd, ev->set->n_entries);
555     usleep(1000);
556     return;
557   }
558   sen_com_sqtp_recv(cs, &cs->msg, &status, &info);
559   cs->com.status = sen_com_doing;
560   cs->msg_in(ev, c);
561 }
562 
563 static void
sen_com_sqtp_acceptor(sen_com_event * ev,sen_com * c)564 sen_com_sqtp_acceptor(sen_com_event *ev, sen_com *c)
565 {
566   sen_com_sqtp *cs = (sen_com_sqtp *)c, *ncs;
567   sen_sock fd = accept(cs->com.fd, NULL, NULL);
568   if (fd == -1) {
569     LOG_SOCKERR("accept");
570     return;
571   }
572   if (sen_com_event_add(ev, fd, SEN_COM_POLLIN, (sen_com **)&ncs)) {
573     sen_sock_close(fd);
574     return;
575   }
576   ncs->com.ev_in = sen_com_sqtp_receiver;
577   sen_rbuf_init(&ncs->msg, 0);
578   ncs->msg_in = cs->msg_in;
579 }
580 
581 #define LISTEN_BACKLOG 0x1000
582 
583 sen_com_sqtp *
sen_com_sqtp_sopen(sen_com_event * ev,int port,sen_com_callback * func)584 sen_com_sqtp_sopen(sen_com_event *ev, int port, sen_com_callback *func)
585 {
586   sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
587   sen_sock lfd;
588   sen_com_sqtp *cs = NULL;
589   struct sockaddr_in addr;
590   memset(&addr, 0, sizeof(struct sockaddr_in));
591   addr.sin_family = AF_INET;
592   addr.sin_addr.s_addr = INADDR_ANY;
593   addr.sin_port = htons(port);
594   if ((lfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
595     LOG_SOCKERR("socket");
596     return NULL;
597   }
598   {
599     int v = 1;
600     if (setsockopt(lfd, 6, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
601       LOG_SOCKERR("setsockopt");
602       goto exit;
603     }
604   }
605   {
606     int retry = 0;
607     for (;;) {
608       if (bind(lfd, (struct sockaddr *) &addr, sizeof addr) < 0) {
609 #ifdef WIN32
610         if (WSAGetLastError() == WSAEADDRINUSE)
611 #else /* WIN32 */
612         if (errno == EADDRINUSE)
613 #endif /* WIN32 */
614         {
615           SEN_LOG(sen_log_notice, "bind retrying..(%d)", port);
616           if (++retry < 10) { sleep(2); continue; }
617         }
618         LOG_SOCKERR("bind");
619         goto exit;
620       }
621       break;
622     }
623   }
624   if (listen(lfd, LISTEN_BACKLOG) < 0) {
625     LOG_SOCKERR("listen");
626     goto exit;
627   }
628   if (ev) {
629     if (sen_com_event_add(ev, lfd, SEN_COM_POLLIN, (sen_com **)&cs)) { goto exit; }
630   } else {
631     if (!(cs = SEN_MALLOC(sizeof(sen_com_sqtp)))) { goto exit; }
632     cs->com.fd = lfd;
633   }
634 exit :
635   if (cs) {
636     cs->com.ev_in = sen_com_sqtp_acceptor;
637     cs->msg_in = func;
638   } else {
639     sen_sock_close(lfd);
640   }
641   return cs;
642 }
643 
644 sen_rc
sen_com_sqtp_close(sen_com_event * ev,sen_com_sqtp * cs)645 sen_com_sqtp_close(sen_com_event *ev, sen_com_sqtp *cs)
646 {
647   sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
648   sen_sock fd = cs->com.fd;
649   sen_rbuf_fin(&cs->msg);
650   if (ev) {
651     sen_com_event_del(ev, fd);
652   } else {
653     SEN_FREE(cs);
654   }
655   if (shutdown(fd, SHUT_RDWR) == -1) { /* LOG_SOCKERR("shutdown"); */ }
656   if (sen_sock_close(fd) == -1) {
657     LOG_SOCKERR("close");
658     return sen_external_error;
659   }
660   return sen_success;
661 }
662