1 /* Copyright(C) 2004-2007 Brazil
2
3 This library is free software; you can redistribute it and/or
4 modify it under the terms of the GNU Lesser General Public
5 License as published by the Free Software Foundation; either
6 version 2.1 of the License, or (at your option) any later version.
7
8 This library is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 Lesser General Public License for more details.
12
13 You should have received a copy of the GNU Lesser General Public
14 License along with this library; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 */
17
18 #include "senna_in.h"
19
20 #include <string.h>
21 #ifdef HAVE_NETDB_H
22 #include <netdb.h>
23 #endif /* HAVE_NETDB_H */
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif /* HAVE_SYS_SOCKET_H */
27 #ifdef HAVE_NETINET_IN_H
28 #include <netinet/in.h>
29 #endif /* HAVE_NETINET_IN_H */
30 #ifdef HAVE_NETINET_TCP_H
31 #include <netinet/tcp.h>
32 #endif /* HAVE_NETINET_TCP_H */
33 #ifdef HAVE_SIGNAL_H
34 #include <signal.h>
35 #endif /* HAVE_SIGNAL_H */
36
37 #include "com.h"
38 #include "set.h"
39 #include "ctx.h"
40
/* Fall back to the AF_* spelling where the PF_* one is not provided. */
#if !defined(PF_INET)
#define PF_INET AF_INET
#endif /* !PF_INET */

/*
 * MSG_NOSIGNAL selection:
 *  - without USE_MSG_NOSIGNAL the flag is defined as 0, i.e. a no-op
 *    (sen_com_init ignores SIGPIPE in that configuration instead);
 *  - with USE_MSG_NOSIGNAL, FreeBSD with __FreeBSD_version >= 600020 gets
 *    the value spelled out here; every other platform is expected to take
 *    it from its system headers.
 */
#if !defined(USE_MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#elif __FreeBSD__ >= 2 && __FreeBSD_version >= 600020
#define MSG_NOSIGNAL 0x20000
#endif /* USE_MSG_NOSIGNAL */
52
#ifdef WIN32
/*
 * Log the most recent Winsock error (WSAGetLastError) at error level.
 *
 * msg is a caller-supplied prefix, normally the name of the call that
 * failed.  Codes listed in the table are logged with their description;
 * any other code is logged numerically.
 */
void
log_sockerr(const char *msg)
{
  static const struct {
    int code;
    const char *text;
  } known[] = {
    { WSANOTINITIALISED, "please call sen_com_init first" },
    { WSAEFAULT,         "bad address" },
    { WSAEINVAL,         "invalid argument" },
    { WSAEMFILE,         "too many sockets" },
    { WSAEWOULDBLOCK,    "operation would block" },
    { WSAENOTSOCK,       "given fd is not socket fd" },
    { WSAEOPNOTSUPP,     "operation is not supported" },
    { WSAEADDRINUSE,     "address is already in use" },
    { WSAEADDRNOTAVAIL,  "address is not available" },
    { WSAENETDOWN,       "network is down" },
    { WSAENOBUFS,        "no buffer" },
    { WSAEISCONN,        "socket is already connected" },
    { WSAENOTCONN,       "socket is not connected" },
    { WSAESHUTDOWN,      "socket is already shutdowned" },
    { WSAETIMEDOUT,      "connection time out" },
    { WSAECONNREFUSED,   "connection refused" }
  };
  const int err = WSAGetLastError();
  size_t i;
  for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
    if (known[i].code == err) {
      SEN_LOG(sen_log_error, "%s: %s", msg, known[i].text);
      return;
    }
  }
  SEN_LOG(sen_log_error, "%s: socket error (%d)", msg, err);
}
/* Windows: describe the Winsock error code. */
#define LOG_SOCKERR(m) log_sockerr((m))
#else /* WIN32 */
/* Elsewhere: describe errno via strerror. */
#define LOG_SOCKERR(m) SEN_LOG(sen_log_error, "%s: %s", (m), strerror(errno))
#endif /* WIN32 */
86
87 /******* sen_com ********/
88
89 sen_rc
sen_com_init(void)90 sen_com_init(void)
91 {
92 #ifdef WIN32
93 WSADATA wd;
94 if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
95 GERR(sen_external_error, "WSAStartup failed");
96 }
97 #else /* WIN32 */
98 #ifndef USE_MSG_NOSIGNAL
99 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
100 GERR(sen_external_error, "signal");
101 }
102 #endif /* USE_MSG_NOSIGNAL */
103 #endif /* WIN32 */
104 return sen_gctx.rc;
105 }
106
/*
 * Counterpart of sen_com_init: releases Winsock on Windows and does
 * nothing on other platforms.
 */
void
sen_com_fin(void)
{
#if defined(WIN32)
  WSACleanup();
#endif /* WIN32 */
}
114
115 sen_rc
sen_com_event_init(sen_com_event * ev,int max_nevents,int data_size)116 sen_com_event_init(sen_com_event *ev, int max_nevents, int data_size)
117 {
118 sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
119 sen_rc rc = sen_memory_exhausted;
120 ev->max_nevents = max_nevents;
121 if ((ev->set = sen_set_open(sizeof(sen_sock), data_size, 0))) {
122 #ifndef USE_SELECT
123 #ifdef USE_EPOLL
124 if ((ev->events = SEN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
125 if ((ev->epfd = epoll_create(max_nevents)) != -1) {
126 rc = sen_success;
127 goto exit;
128 } else {
129 LOG_SOCKERR("epoll_create");
130 rc = sen_external_error;
131 }
132 SEN_FREE(ev->events);
133 }
134 #else /* USE_EPOLL */
135 if ((ev->events = SEN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
136 rc = sen_success;
137 goto exit;
138 }
139 #endif /* USE_EPOLL */
140 sen_set_close(ev->set);
141 ev->set = NULL;
142 ev->events = NULL;
143 #else /* USE_SELECT */
144 rc = sen_success;
145 #endif /* USE_SELECT */
146 }
147 exit :
148 return rc;
149 }
150
151 sen_rc
sen_com_event_fin(sen_com_event * ev)152 sen_com_event_fin(sen_com_event *ev)
153 {
154 sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
155 if (ev->set) { sen_set_close(ev->set); }
156 #ifndef USE_SELECT
157 if (ev->events) { SEN_FREE(ev->events); }
158 #endif /* USE_SELECT */
159 return sen_success;
160 }
161
162 sen_rc
sen_com_event_add(sen_com_event * ev,sen_sock fd,int events,sen_com ** com)163 sen_com_event_add(sen_com_event *ev, sen_sock fd, int events, sen_com **com)
164 {
165 sen_com *c;
166 /* todo : expand events */
167 if (!ev || ev->set->n_entries == ev->max_nevents) {
168 if (ev) { SEN_LOG(sen_log_error, "too many connections (%d)", ev->max_nevents); }
169 return sen_invalid_argument;
170 }
171 #ifdef USE_EPOLL
172 {
173 struct epoll_event e;
174 memset(&e, 0, sizeof(struct epoll_event));
175 e.data.fd = (fd);
176 e.events = (__uint32_t) events;
177 if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
178 LOG_SOCKERR("epoll_ctl");
179 return sen_external_error;
180 }
181 }
182 #endif /* USE_EPOLL*/
183 if (sen_set_get(ev->set, &fd, (void **)&c)) {
184 c->fd = fd;
185 c->events = events;
186 if (com) { *com = c; }
187 return sen_success;
188 }
189 return sen_internal_error;
190 }
191
192 sen_rc
sen_com_event_mod(sen_com_event * ev,sen_sock fd,int events,sen_com ** com)193 sen_com_event_mod(sen_com_event *ev, sen_sock fd, int events, sen_com **com)
194 {
195 sen_com *c;
196 if (!ev) { return sen_invalid_argument; }
197 if (sen_set_at(ev->set, &fd, (void **)&c)) {
198 if (c->fd != fd) {
199 SEN_LOG(sen_log_error, "sen_com_event_mod fd unmatch %d != %d", c->fd, fd);
200 return sen_invalid_format;
201 }
202 if (com) { *com = c; }
203 if (c->events != events) {
204 #ifdef USE_EPOLL
205 struct epoll_event e;
206 memset(&e, 0, sizeof(struct epoll_event));
207 e.data.fd = (fd);
208 e.events = (__uint32_t) events;
209 if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
210 LOG_SOCKERR("epoll_ctl");
211 return sen_external_error;
212 }
213 #endif /* USE_EPOLL*/
214 c->events = events;
215 }
216 return sen_success;
217 }
218 return sen_internal_error;
219 }
220
221 sen_rc
sen_com_event_del(sen_com_event * ev,sen_sock fd)222 sen_com_event_del(sen_com_event *ev, sen_sock fd)
223 {
224 if (!ev) { return sen_invalid_argument; }
225 {
226 sen_com *c;
227 sen_set_eh *eh = sen_set_at(ev->set, &fd, (void **)&c);
228 if (eh) {
229 #ifdef USE_EPOLL
230 struct epoll_event e;
231 memset(&e, 0, sizeof(struct epoll_event));
232 e.data.fd = fd;
233 e.events = c->events;
234 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
235 LOG_SOCKERR("epoll_ctl");
236 return sen_external_error;
237 }
238 #endif /* USE_EPOLL*/
239 return sen_set_del(ev->set, eh);
240 } else {
241 SEN_LOG(sen_log_error, "%04x| fd(%d) not found in ev(%p)", getpid(), fd, ev);
242 return sen_internal_error;
243 }
244 }
245 }
246
247 sen_rc
sen_com_event_poll(sen_com_event * ev,int timeout)248 sen_com_event_poll(sen_com_event *ev, int timeout)
249 {
250 int nevents;
251 sen_com *com;
252 #ifdef USE_SELECT
253 sen_sock *pfd;
254 int nfds = 0;
255 fd_set rfds;
256 fd_set wfds;
257 struct timeval tv;
258 if (timeout >= 0) {
259 tv.tv_sec = timeout / 1000;
260 tv.tv_usec = (timeout % 1000) * 1000;
261 }
262 FD_ZERO(&rfds);
263 FD_ZERO(&wfds);
264 SEN_SET_EACH(ev->set, eh, &pfd, &com, {
265 if ((com->events & SEN_COM_POLLIN)) { FD_SET(*pfd, &rfds); }
266 if ((com->events & SEN_COM_POLLOUT)) { FD_SET(*pfd, &wfds); }
267 if (*pfd > nfds) { nfds = *pfd; }
268 });
269 nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
270 if (nevents < 0) {
271 #ifdef WIN32
272 if (WSAGetLastError() == WSAEINTR) { return sen_success; }
273 #else /* WIN32 */
274 if (errno == EINTR) { return sen_success; }
275 #endif /* WIN32 */
276 LOG_SOCKERR("select");
277 return sen_external_error;
278 }
279 if (timeout < 0 && !nevents) { SEN_LOG(sen_log_notice, "select returns 0 events"); }
280 SEN_SET_EACH(ev->set, eh, &pfd, &com, {
281 if (FD_ISSET(*pfd, &rfds)) { com->ev_in(ev, com); }
282 if (FD_ISSET(*pfd, &wfds)) { com->ev_out(ev, com); }
283 });
284 #else /* USE_SELECT */
285 #ifdef USE_EPOLL
286 struct epoll_event *ep;
287 nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
288 #else /* USE_EPOLL */
289 int nfd = 0, *pfd;
290 struct pollfd *ep = ev->events;
291 SEN_SET_EACH(ev->set, eh, &pfd, &com, {
292 ep->fd = *pfd;
293 // ep->events =(short) com->events;
294 ep->events = POLLIN;
295 ep->revents = 0;
296 ep++;
297 nfd++;
298 });
299 nevents = poll(ev->events, nfd, timeout);
300 #endif /* USE_EPOLL */
301 if (nevents < 0) {
302 if (errno == EINTR) { return sen_success; }
303 LOG_SOCKERR("poll");
304 return sen_external_error;
305 }
306 if (timeout < 0 && !nevents) { SEN_LOG(sen_log_notice, "poll returns 0 events"); }
307 for (ep = ev->events; nevents; ep++) {
308 int efd;
309 #ifdef USE_EPOLL
310 efd = ep->data.fd;
311 nevents--;
312 if (!sen_set_at(ev->set, &efd, (void *)&com)) {
313 struct epoll_event e;
314 SEN_LOG(sen_log_error, "fd(%d) not found in ev->set", efd);
315 memset(&e, 0, sizeof(struct epoll_event));
316 e.data.fd = efd;
317 e.events = ep->events;
318 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { LOG_SOCKERR("epoll_ctl"); }
319 if (sen_sock_close(efd) == -1) { LOG_SOCKERR("close"); }
320 continue;
321 }
322 if ((ep->events & SEN_COM_POLLIN)) { com->ev_in(ev, com); }
323 if ((ep->events & SEN_COM_POLLOUT)) { com->ev_out(ev, com); }
324 #else /* USE_EPOLL */
325 efd = ep->fd;
326 if (!(ep->events & ep->revents)) { continue; }
327 nevents--;
328 if (!sen_set_at(ev->set, &efd, (void *)&com)) {
329 SEN_LOG(sen_log_error, "fd(%d) not found in ev->set", efd);
330 if (sen_sock_close(efd) == -1) { LOG_SOCKERR("close"); }
331 continue;
332 }
333 if ((ep->revents & SEN_COM_POLLIN)) { com->ev_in(ev, com); }
334 if ((ep->revents & SEN_COM_POLLOUT)) { com->ev_out(ev, com); }
335 #endif /* USE_EPOLL */
336 }
337 #endif /* USE_SELECT */
338 return sen_success;
339 }
340
341 /******* sen_com_sqtp ********/
342
343 sen_rc
sen_com_sqtp_send(sen_com_sqtp * cs,sen_com_sqtp_header * header,char * body)344 sen_com_sqtp_send(sen_com_sqtp *cs, sen_com_sqtp_header *header, char *body)
345 {
346 ssize_t ret, whole_size = sizeof(sen_com_sqtp_header) + header->size;
347 header->proto = SEN_COM_PROTO_SQTP;
348 if (cs->com.status == sen_com_closing) { header->flags |= SEN_CTX_QUIT; }
349 SEN_LOG(sen_log_info, "send (%d,%x,%d,%02x,%02x,%04x,%08x)", header->size, header->flags, header->proto, header->qtype, header->level, header->status, header->info);
350
351 if (header->size) {
352 #ifdef WIN32
353 ssize_t reth;
354 if ((reth = send(cs->com.fd, header, sizeof(sen_com_sqtp_header), 0)) == -1) {
355 LOG_SOCKERR("send size");
356 cs->rc = sen_external_error;
357 goto exit;
358 }
359 if ((ret = send(cs->com.fd, body, header->size, 0)) == -1) {
360 LOG_SOCKERR("send body");
361 cs->rc = sen_external_error;
362 goto exit;
363 }
364 ret += reth;
365 #else /* WIN32 */
366 struct iovec msg_iov[2];
367 struct msghdr msg;
368 msg.msg_name = NULL;
369 msg.msg_namelen = 0;
370 msg.msg_iov = msg_iov;
371 msg.msg_iovlen = 2;
372 msg.msg_control = NULL;
373 msg.msg_controllen = 0;
374 msg.msg_flags = 0;
375 msg_iov[0].iov_base = header;
376 msg_iov[0].iov_len = sizeof(sen_com_sqtp_header);
377 msg_iov[1].iov_base = body;
378 msg_iov[1].iov_len = header->size;
379 while ((ret = sendmsg(cs->com.fd, &msg, MSG_NOSIGNAL)) == -1) {
380 LOG_SOCKERR("sendmsg");
381 if (errno == EAGAIN || errno == EINTR) { continue; }
382 cs->rc = sen_external_error;
383 goto exit;
384 }
385 #endif /* WIN32 */
386 } else {
387 while ((ret = send(cs->com.fd, header, whole_size, MSG_NOSIGNAL)) == -1) {
388 #ifdef WIN32
389 int e = WSAGetLastError();
390 LOG_SOCKERR("send");
391 if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
392 #else /* WIN32 */
393 LOG_SOCKERR("send");
394 if (errno == EAGAIN || errno == EINTR) { continue; }
395 #endif /* WIN32 */
396 cs->rc = sen_external_error;
397 goto exit;
398 }
399 }
400 if (ret != whole_size) {
401 SEN_LOG(sen_log_error, "sendmsg: %d < %d", ret, whole_size);
402 cs->rc = sen_external_error;
403 goto exit;
404 }
405 cs->rc = sen_success;
406 exit :
407 return cs->rc;
408 }
409
410 sen_rc
sen_com_sqtp_recv(sen_com_sqtp * cs,sen_rbuf * buf,unsigned int * status,unsigned int * info)411 sen_com_sqtp_recv(sen_com_sqtp *cs, sen_rbuf *buf,
412 unsigned int *status, unsigned int *info)
413 {
414 ssize_t ret;
415 sen_com_sqtp_header *header;
416 size_t rest = sizeof(sen_com_sqtp_header);
417 if (SEN_RBUF_WSIZE(buf) < rest) {
418 if ((cs->rc = sen_rbuf_reinit(buf, rest))) {
419 *status = sen_com_emem; *info = 1; goto exit;
420 }
421 } else {
422 SEN_RBUF_REWIND(buf);
423 }
424 do {
425 // todo : also support non blocking mode (use MSG_DONTWAIT)
426 if ((ret = recv(cs->com.fd, buf->curr, rest, MSG_WAITALL)) <= 0) {
427 if (ret < 0) {
428 #ifdef WIN32
429 int e = WSAGetLastError();
430 LOG_SOCKERR("recv size");
431 if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
432 *info = e;
433 #else /* WIN32 */
434 LOG_SOCKERR("recv size");
435 if (errno == EAGAIN || errno == EINTR) { continue; }
436 *info = errno;
437 #endif /* WIN32 */
438 }
439 cs->rc = sen_external_error;
440 *status = sen_com_erecv_head;
441 goto exit;
442 }
443 rest -= ret, buf->curr += ret;
444 } while (rest);
445 header = SEN_COM_SQTP_MSG_HEADER(buf);
446 SEN_LOG(sen_log_info, "recv (%d,%x,%d,%02x,%02x,%04x,%08x)", header->size, header->flags, header->proto, header->qtype, header->level, header->status, header->info);
447 *status = header->status;
448 *info = header->info;
449 {
450 uint16_t proto = header->proto;
451 size_t value_size = header->size;
452 size_t whole_size = sizeof(sen_com_sqtp_header) + value_size;
453 if (proto != SEN_COM_PROTO_SQTP) {
454 SEN_LOG(sen_log_error, "illegal header: %d", proto);
455 cs->rc = sen_invalid_format;
456 *status = sen_com_eproto;
457 *info = proto;
458 goto exit;
459 }
460 if (SEN_RBUF_WSIZE(buf) < whole_size) {
461 if ((cs->rc = sen_rbuf_resize(buf, whole_size))) {
462 *status = sen_com_emem; *info = 2;
463 goto exit;
464 }
465 }
466 for (rest = value_size; rest;) {
467 if ((ret = recv(cs->com.fd, buf->curr, rest, MSG_WAITALL)) <= 0) {
468 if (ret < 0) {
469 #ifdef WIN32
470 int e = WSAGetLastError();
471 LOG_SOCKERR("recv body");
472 if (e == WSAEWOULDBLOCK || e == WSAEINTR) { continue; }
473 *info = e;
474 #else /* WIN32 */
475 LOG_SOCKERR("recv body");
476 if (errno == EAGAIN || errno == EINTR) { continue; }
477 *info = errno;
478 #endif /* WIN32 */
479 }
480 cs->rc = sen_external_error;
481 *status = sen_com_erecv_body;
482 goto exit;
483 }
484 rest -= ret, buf->curr += ret;
485 }
486 *buf->curr = '\0';
487 }
488 cs->rc = sen_success;
489 exit :
490 return cs->rc;
491 }
492
493 sen_com_sqtp *
sen_com_sqtp_copen(sen_com_event * ev,const char * dest,int port)494 sen_com_sqtp_copen(sen_com_event *ev, const char *dest, int port)
495 {
496 sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
497 sen_sock fd;
498 sen_com_sqtp *cs = NULL;
499 struct hostent *he;
500 struct sockaddr_in addr;
501 if (!(he = gethostbyname(dest))) {
502 LOG_SOCKERR("gethostbyname");
503 goto exit;
504 }
505 addr.sin_family = AF_INET;
506 memcpy(&addr.sin_addr, he->h_addr, he->h_length);
507 addr.sin_port = htons(port);
508 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
509 LOG_SOCKERR("socket");
510 goto exit;
511 }
512 {
513 int v = 1;
514 if (setsockopt(fd, 6, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
515 LOG_SOCKERR("setsockopt");
516 }
517 }
518 while (connect(fd, (struct sockaddr *)&addr, sizeof addr) == -1) {
519 #ifdef WIN32
520 if (WSAGetLastError() == WSAECONNREFUSED)
521 #else /* WIN32 */
522 if (errno == ECONNREFUSED)
523 #endif /* WIN32 */
524 {
525 SEN_LOG(sen_log_notice, "connect retrying..");
526 sleep(2);
527 continue;
528 }
529 LOG_SOCKERR("connect");
530 goto exit;
531 }
532 if (ev) {
533 if (sen_com_event_add(ev, fd, SEN_COM_POLLIN, (sen_com **)&cs)) { goto exit; }
534 } else {
535 if (!(cs = SEN_CALLOC(sizeof(sen_com_sqtp)))) { goto exit; }
536 cs->com.fd = fd;
537 }
538 sen_rbuf_init(&cs->msg, 0);
539 exit :
540 if (!cs) { sen_sock_close(fd); }
541 return cs;
542 }
543
544 static void
sen_com_sqtp_receiver(sen_com_event * ev,sen_com * c)545 sen_com_sqtp_receiver(sen_com_event *ev, sen_com *c)
546 {
547 unsigned int status, info;
548 sen_com_sqtp *cs = (sen_com_sqtp *)c;
549 if (cs->com.status == sen_com_closing) {
550 sen_com_sqtp_close(ev, cs);
551 return;
552 }
553 if (cs->com.status != sen_com_idle) {
554 SEN_LOG(sen_log_info, "waiting to be idle.. (%d) %d", c->fd, ev->set->n_entries);
555 usleep(1000);
556 return;
557 }
558 sen_com_sqtp_recv(cs, &cs->msg, &status, &info);
559 cs->com.status = sen_com_doing;
560 cs->msg_in(ev, c);
561 }
562
563 static void
sen_com_sqtp_acceptor(sen_com_event * ev,sen_com * c)564 sen_com_sqtp_acceptor(sen_com_event *ev, sen_com *c)
565 {
566 sen_com_sqtp *cs = (sen_com_sqtp *)c, *ncs;
567 sen_sock fd = accept(cs->com.fd, NULL, NULL);
568 if (fd == -1) {
569 LOG_SOCKERR("accept");
570 return;
571 }
572 if (sen_com_event_add(ev, fd, SEN_COM_POLLIN, (sen_com **)&ncs)) {
573 sen_sock_close(fd);
574 return;
575 }
576 ncs->com.ev_in = sen_com_sqtp_receiver;
577 sen_rbuf_init(&ncs->msg, 0);
578 ncs->msg_in = cs->msg_in;
579 }
580
581 #define LISTEN_BACKLOG 0x1000
582
583 sen_com_sqtp *
sen_com_sqtp_sopen(sen_com_event * ev,int port,sen_com_callback * func)584 sen_com_sqtp_sopen(sen_com_event *ev, int port, sen_com_callback *func)
585 {
586 sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
587 sen_sock lfd;
588 sen_com_sqtp *cs = NULL;
589 struct sockaddr_in addr;
590 memset(&addr, 0, sizeof(struct sockaddr_in));
591 addr.sin_family = AF_INET;
592 addr.sin_addr.s_addr = INADDR_ANY;
593 addr.sin_port = htons(port);
594 if ((lfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
595 LOG_SOCKERR("socket");
596 return NULL;
597 }
598 {
599 int v = 1;
600 if (setsockopt(lfd, 6, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
601 LOG_SOCKERR("setsockopt");
602 goto exit;
603 }
604 }
605 {
606 int retry = 0;
607 for (;;) {
608 if (bind(lfd, (struct sockaddr *) &addr, sizeof addr) < 0) {
609 #ifdef WIN32
610 if (WSAGetLastError() == WSAEADDRINUSE)
611 #else /* WIN32 */
612 if (errno == EADDRINUSE)
613 #endif /* WIN32 */
614 {
615 SEN_LOG(sen_log_notice, "bind retrying..(%d)", port);
616 if (++retry < 10) { sleep(2); continue; }
617 }
618 LOG_SOCKERR("bind");
619 goto exit;
620 }
621 break;
622 }
623 }
624 if (listen(lfd, LISTEN_BACKLOG) < 0) {
625 LOG_SOCKERR("listen");
626 goto exit;
627 }
628 if (ev) {
629 if (sen_com_event_add(ev, lfd, SEN_COM_POLLIN, (sen_com **)&cs)) { goto exit; }
630 } else {
631 if (!(cs = SEN_MALLOC(sizeof(sen_com_sqtp)))) { goto exit; }
632 cs->com.fd = lfd;
633 }
634 exit :
635 if (cs) {
636 cs->com.ev_in = sen_com_sqtp_acceptor;
637 cs->msg_in = func;
638 } else {
639 sen_sock_close(lfd);
640 }
641 return cs;
642 }
643
644 sen_rc
sen_com_sqtp_close(sen_com_event * ev,sen_com_sqtp * cs)645 sen_com_sqtp_close(sen_com_event *ev, sen_com_sqtp *cs)
646 {
647 sen_ctx *ctx = &sen_gctx; /* todo : replace it with the local ctx */
648 sen_sock fd = cs->com.fd;
649 sen_rbuf_fin(&cs->msg);
650 if (ev) {
651 sen_com_event_del(ev, fd);
652 } else {
653 SEN_FREE(cs);
654 }
655 if (shutdown(fd, SHUT_RDWR) == -1) { /* LOG_SOCKERR("shutdown"); */ }
656 if (sen_sock_close(fd) == -1) {
657 LOG_SOCKERR("close");
658 return sen_external_error;
659 }
660 return sen_success;
661 }
662