1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3 *
4 * Copyright 1998-2021 The OpenLDAP Foundation.
5 * Portions Copyright 2007 by Howard Chu, Symas Corporation.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted only as authorized by the OpenLDAP
10 * Public License.
11 *
12 * A copy of this license is available in the file LICENSE in the
13 * top-level directory of the distribution or, alternatively, at
14 * <http://www.OpenLDAP.org/license.html>.
15 */
16 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
17 * All rights reserved.
18 *
19 * Redistribution and use in source and binary forms are permitted
20 * provided that this notice is preserved and that due credit is given
21 * to the University of Michigan at Ann Arbor. The name of the University
22 * may not be used to endorse or promote products derived from this
23 * software without specific prior written permission. This software
24 * is provided ``as is'' without express or implied warranty.
25 */
26
27 #include "portable.h"
28
29 #include <stdio.h>
30
31 #include <ac/ctype.h>
32 #include <ac/errno.h>
33 #include <ac/socket.h>
34 #include <ac/string.h>
35 #include <ac/time.h>
36 #include <ac/unistd.h>
37
38 #include <event2/event.h>
39 #include <event2/dns.h>
40 #include <event2/listener.h>
41
42 #include "lload.h"
43 #include "ldap_pvt_thread.h"
44 #include "lutil.h"
45
46 #include "ldap_rq.h"
47
48 #ifdef HAVE_SYSTEMD_SD_DAEMON_H
49 #include <systemd/sd-daemon.h>
50 #endif
51
52 #ifdef LDAP_PF_LOCAL
53 #include <sys/stat.h>
54 /* this should go in <ldap.h> as soon as it is accepted */
55 #define LDAPI_MOD_URLEXT "x-mod"
56 #endif /* LDAP_PF_LOCAL */
57
58 #ifndef BALANCER_MODULE
59 #ifdef LDAP_PF_INET6
60 int slap_inet4or6 = AF_UNSPEC;
61 #else /* ! INETv6 */
62 int slap_inet4or6 = AF_INET;
63 #endif /* ! INETv6 */
64
65 /* globals */
66 time_t starttime;
67 struct runqueue_s slapd_rq;
68
69 #ifdef LDAP_TCP_BUFFER
70 int slapd_tcp_rmem;
71 int slapd_tcp_wmem;
72 #endif /* LDAP_TCP_BUFFER */
73
74 volatile sig_atomic_t slapd_shutdown = 0;
75 volatile sig_atomic_t slapd_gentle_shutdown = 0;
76 volatile sig_atomic_t slapd_abrupt_shutdown = 0;
77 #endif /* !BALANCER_MODULE */
78
79 static int emfile;
80
81 ldap_pvt_thread_mutex_t lload_wait_mutex;
82 ldap_pvt_thread_cond_t lload_wait_cond;
83 ldap_pvt_thread_cond_t lload_pause_cond;
84
85 #ifndef SLAPD_MAX_DAEMON_THREADS
86 #define SLAPD_MAX_DAEMON_THREADS 16
87 #endif
88 int lload_daemon_threads = 1;
89 int lload_daemon_mask;
90
91 struct event_base *listener_base = NULL;
92 LloadListener **lload_listeners = NULL;
93 static ldap_pvt_thread_t listener_tid, *daemon_tid;
94
95 struct event_base *daemon_base = NULL;
96 struct evdns_base *dnsbase;
97
98 struct event *lload_timeout_event;
99
100 /*
101 * global lload statistics. Not mutex protected to preserve performance -
102 * increment is atomic, at most we risk a bit of inconsistency
103 */
104 lload_global_stats_t lload_stats = {};
105
106 #ifndef SLAPD_LISTEN_BACKLOG
107 #define SLAPD_LISTEN_BACKLOG 1024
108 #endif /* ! SLAPD_LISTEN_BACKLOG */
109
110 #define DAEMON_ID(fd) ( fd & lload_daemon_mask )
111
112 #ifdef HAVE_WINSOCK
113 ldap_pvt_thread_mutex_t slapd_ws_mutex;
114 SOCKET *slapd_ws_sockets;
115 #define SD_READ 1
116 #define SD_WRITE 2
117 #define SD_ACTIVE 4
118 #define SD_LISTENER 8
119 #endif
120
121 #ifdef HAVE_TCPD
122 static ldap_pvt_thread_mutex_t sd_tcpd_mutex;
123 #endif /* TCP Wrappers */
124
125 typedef struct listener_item {
126 struct evconnlistener *listener;
127 ber_socket_t fd;
128 } listener_item;
129
130 typedef struct lload_daemon_st {
131 ldap_pvt_thread_mutex_t sd_mutex;
132
133 struct event_base *base;
134 struct event *wakeup_event;
135 } lload_daemon_st;
136
137 static lload_daemon_st lload_daemon[SLAPD_MAX_DAEMON_THREADS];
138
139 static void daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg );
140
141 static void
lloadd_close(ber_socket_t s)142 lloadd_close( ber_socket_t s )
143 {
144 Debug( LDAP_DEBUG_CONNS, "lloadd_close: "
145 "closing fd=%ld\n",
146 (long)s );
147 tcp_close( s );
148 }
149
150 static void
lload_free_listener_addresses(struct sockaddr ** sal)151 lload_free_listener_addresses( struct sockaddr **sal )
152 {
153 struct sockaddr **sap;
154 if ( sal == NULL ) return;
155 for ( sap = sal; *sap != NULL; sap++ )
156 ch_free(*sap);
157 ch_free( sal );
158 }
159
160 #if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
161 static int
get_url_perms(char ** exts,mode_t * perms,int * crit)162 get_url_perms( char **exts, mode_t *perms, int *crit )
163 {
164 int i;
165
166 assert( exts != NULL );
167 assert( perms != NULL );
168 assert( crit != NULL );
169
170 *crit = 0;
171 for ( i = 0; exts[i]; i++ ) {
172 char *type = exts[i];
173 int c = 0;
174
175 if ( type[0] == '!' ) {
176 c = 1;
177 type++;
178 }
179
180 if ( strncasecmp( type, LDAPI_MOD_URLEXT "=",
181 sizeof(LDAPI_MOD_URLEXT "=") - 1 ) == 0 ) {
182 char *value = type + ( sizeof(LDAPI_MOD_URLEXT "=") - 1 );
183 mode_t p = 0;
184 int j;
185
186 switch ( strlen( value ) ) {
187 case 4:
188 /* skip leading '0' */
189 if ( value[0] != '0' ) return LDAP_OTHER;
190 value++;
191
192 case 3:
193 for ( j = 0; j < 3; j++ ) {
194 int v;
195
196 v = value[j] - '0';
197
198 if ( v < 0 || v > 7 ) return LDAP_OTHER;
199
200 p |= v << 3 * ( 2 - j );
201 }
202 break;
203
204 case 10:
205 for ( j = 1; j < 10; j++ ) {
206 static mode_t m[] = { 0, S_IRUSR, S_IWUSR, S_IXUSR,
207 S_IRGRP, S_IWGRP, S_IXGRP, S_IROTH, S_IWOTH,
208 S_IXOTH };
209 static const char c[] = "-rwxrwxrwx";
210
211 if ( value[j] == c[j] ) {
212 p |= m[j];
213
214 } else if ( value[j] != '-' ) {
215 return LDAP_OTHER;
216 }
217 }
218 break;
219
220 default:
221 return LDAP_OTHER;
222 }
223
224 *crit = c;
225 *perms = p;
226
227 return LDAP_SUCCESS;
228 }
229 }
230
231 return LDAP_OTHER;
232 }
233 #endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
234
235 /* port = 0 indicates AF_LOCAL */
236 static int
lload_get_listener_addresses(const char * host,unsigned short port,struct sockaddr *** sal)237 lload_get_listener_addresses(
238 const char *host,
239 unsigned short port,
240 struct sockaddr ***sal )
241 {
242 struct sockaddr **sap;
243
244 #ifdef LDAP_PF_LOCAL
245 if ( port == 0 ) {
246 sap = *sal = ch_malloc( 2 * sizeof(void *) );
247
248 *sap = ch_calloc( 1, sizeof(struct sockaddr_un) );
249 sap[1] = NULL;
250
251 if ( strlen( host ) >
252 ( sizeof( ((struct sockaddr_un *)*sap)->sun_path ) - 1 ) ) {
253 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
254 "domain socket path (%s) too long in URL\n",
255 host );
256 goto errexit;
257 }
258
259 (*sap)->sa_family = AF_LOCAL;
260 strcpy( ((struct sockaddr_un *)*sap)->sun_path, host );
261 } else
262 #endif /* LDAP_PF_LOCAL */
263 {
264 #ifdef HAVE_GETADDRINFO
265 struct addrinfo hints, *res, *sai;
266 int n, err;
267 char serv[7];
268
269 memset( &hints, '\0', sizeof(hints) );
270 hints.ai_flags = AI_PASSIVE;
271 hints.ai_socktype = SOCK_STREAM;
272 hints.ai_family = slap_inet4or6;
273 snprintf( serv, sizeof(serv), "%d", port );
274
275 if ( (err = getaddrinfo( host, serv, &hints, &res )) ) {
276 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
277 "getaddrinfo() failed: %s\n",
278 AC_GAI_STRERROR(err) );
279 return -1;
280 }
281
282 sai = res;
283 for ( n = 2; ( sai = sai->ai_next ) != NULL; n++ ) {
284 /* EMPTY */;
285 }
286 sap = *sal = ch_calloc( n, sizeof(void *) );
287
288 *sap = NULL;
289
290 for ( sai = res; sai; sai = sai->ai_next ) {
291 if ( sai->ai_addr == NULL ) {
292 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
293 "getaddrinfo ai_addr is NULL?\n" );
294 freeaddrinfo( res );
295 goto errexit;
296 }
297
298 switch ( sai->ai_family ) {
299 #ifdef LDAP_PF_INET6
300 case AF_INET6:
301 *sap = ch_malloc( sizeof(struct sockaddr_in6) );
302 *(struct sockaddr_in6 *)*sap =
303 *((struct sockaddr_in6 *)sai->ai_addr);
304 break;
305 #endif /* LDAP_PF_INET6 */
306 case AF_INET:
307 *sap = ch_malloc( sizeof(struct sockaddr_in) );
308 *(struct sockaddr_in *)*sap =
309 *((struct sockaddr_in *)sai->ai_addr);
310 break;
311 default:
312 *sap = NULL;
313 break;
314 }
315
316 if ( *sap != NULL ) {
317 (*sap)->sa_family = sai->ai_family;
318 sap++;
319 *sap = NULL;
320 }
321 }
322
323 freeaddrinfo( res );
324
325 #else /* ! HAVE_GETADDRINFO */
326 int i, n = 1;
327 struct in_addr in;
328 struct hostent *he = NULL;
329
330 if ( host == NULL ) {
331 in.s_addr = htonl( INADDR_ANY );
332
333 } else if ( !inet_aton( host, &in ) ) {
334 he = gethostbyname( host );
335 if ( he == NULL ) {
336 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
337 "invalid host %s\n",
338 host );
339 return -1;
340 }
341 for ( n = 0; he->h_addr_list[n]; n++ ) /* empty */;
342 }
343
344 sap = *sal = ch_malloc( ( n + 1 ) * sizeof(void *) );
345
346 for ( i = 0; i < n; i++ ) {
347 sap[i] = ch_calloc( 1, sizeof(struct sockaddr_in) );
348 sap[i]->sa_family = AF_INET;
349 ((struct sockaddr_in *)sap[i])->sin_port = htons( port );
350 AC_MEMCPY( &((struct sockaddr_in *)sap[i])->sin_addr,
351 he ? (struct in_addr *)he->h_addr_list[i] : &in,
352 sizeof(struct in_addr) );
353 }
354 sap[i] = NULL;
355 #endif /* ! HAVE_GETADDRINFO */
356 }
357
358 return 0;
359
360 errexit:
361 lload_free_listener_addresses(*sal);
362 return -1;
363 }
364
365 static int
lload_open_listener(const char * url,LDAPURLDesc * lud,int * listeners,int * cur)366 lload_open_listener(
367 const char *url,
368 LDAPURLDesc *lud,
369 int *listeners,
370 int *cur )
371 {
372 int num, tmp, rc;
373 LloadListener l;
374 LloadListener *li;
375 unsigned short port;
376 int err, addrlen = 0;
377 struct sockaddr **sal = NULL, **psal;
378 int socktype = SOCK_STREAM; /* default to COTS */
379 ber_socket_t s;
380 char ebuf[128];
381
382 #if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
383 /*
384 * use safe defaults
385 */
386 int crit = 1;
387 #endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
388
389 assert( url );
390 assert( lud );
391
392 l.sl_url.bv_val = NULL;
393 l.sl_mute = 0;
394 l.sl_busy = 0;
395
396 #ifndef HAVE_TLS
397 if ( ldap_pvt_url_scheme2tls( lud->lud_scheme ) ) {
398 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
399 "TLS not supported (%s)\n",
400 url );
401 ldap_free_urldesc( lud );
402 return -1;
403 }
404
405 if ( !lud->lud_port ) lud->lud_port = LDAP_PORT;
406
407 #else /* HAVE_TLS */
408 l.sl_is_tls = ldap_pvt_url_scheme2tls( lud->lud_scheme );
409 #endif /* HAVE_TLS */
410
411 l.sl_is_proxied = ldap_pvt_url_scheme2proxied( lud->lud_scheme );
412
413 #ifdef LDAP_TCP_BUFFER
414 l.sl_tcp_rmem = 0;
415 l.sl_tcp_wmem = 0;
416 #endif /* LDAP_TCP_BUFFER */
417
418 port = (unsigned short)lud->lud_port;
419
420 tmp = ldap_pvt_url_scheme2proto( lud->lud_scheme );
421 if ( tmp == LDAP_PROTO_IPC ) {
422 #ifdef LDAP_PF_LOCAL
423 if ( lud->lud_host == NULL || lud->lud_host[0] == '\0' ) {
424 err = lload_get_listener_addresses( LDAPI_SOCK, 0, &sal );
425 } else {
426 err = lload_get_listener_addresses( lud->lud_host, 0, &sal );
427 }
428 #else /* ! LDAP_PF_LOCAL */
429
430 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
431 "URL scheme not supported: %s\n",
432 url );
433 ldap_free_urldesc( lud );
434 return -1;
435 #endif /* ! LDAP_PF_LOCAL */
436 } else {
437 if ( lud->lud_host == NULL || lud->lud_host[0] == '\0' ||
438 strcmp( lud->lud_host, "*" ) == 0 ) {
439 err = lload_get_listener_addresses( NULL, port, &sal );
440 } else {
441 err = lload_get_listener_addresses( lud->lud_host, port, &sal );
442 }
443 }
444
445 #if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
446 if ( lud->lud_exts ) {
447 err = get_url_perms( lud->lud_exts, &l.sl_perms, &crit );
448 } else {
449 l.sl_perms = S_IRWXU | S_IRWXO;
450 }
451 #endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
452
453 ldap_free_urldesc( lud );
454 if ( err ) {
455 lload_free_listener_addresses( sal );
456 return -1;
457 }
458
459 /* If we got more than one address returned, we need to make space
460 * for it in the lload_listeners array.
461 */
462 for ( num = 0; sal[num]; num++ ) /* empty */;
463 if ( num > 1 ) {
464 *listeners += num - 1;
465 lload_listeners = ch_realloc( lload_listeners,
466 ( *listeners + 1 ) * sizeof(LloadListener *) );
467 }
468
469 psal = sal;
470 while ( *sal != NULL ) {
471 char *af;
472 switch ( (*sal)->sa_family ) {
473 case AF_INET:
474 af = "IPv4";
475 break;
476 #ifdef LDAP_PF_INET6
477 case AF_INET6:
478 af = "IPv6";
479 break;
480 #endif /* LDAP_PF_INET6 */
481 #ifdef LDAP_PF_LOCAL
482 case AF_LOCAL:
483 af = "Local";
484 break;
485 #endif /* LDAP_PF_LOCAL */
486 default:
487 sal++;
488 continue;
489 }
490
491 s = socket( (*sal)->sa_family, socktype, 0 );
492 if ( s == AC_SOCKET_INVALID ) {
493 int err = sock_errno();
494 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
495 "%s socket() failed errno=%d (%s)\n",
496 af, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
497 sal++;
498 continue;
499 }
500 ber_pvt_socket_set_nonblock( s, 1 );
501 l.sl_sd = s;
502
503 #ifdef LDAP_PF_LOCAL
504 if ( (*sal)->sa_family == AF_LOCAL ) {
505 unlink( ((struct sockaddr_un *)*sal)->sun_path );
506 } else
507 #endif /* LDAP_PF_LOCAL */
508 {
509 #ifdef SO_REUSEADDR
510 /* enable address reuse */
511 tmp = 1;
512 rc = setsockopt(
513 s, SOL_SOCKET, SO_REUSEADDR, (char *)&tmp, sizeof(tmp) );
514 if ( rc == AC_SOCKET_ERROR ) {
515 int err = sock_errno();
516 Debug( LDAP_DEBUG_ANY, "lload_open_listener(%ld): "
517 "setsockopt(SO_REUSEADDR) failed errno=%d (%s)\n",
518 (long)l.sl_sd, err,
519 sock_errstr( err, ebuf, sizeof(ebuf) ) );
520 }
521 #endif /* SO_REUSEADDR */
522 }
523
524 switch ( (*sal)->sa_family ) {
525 case AF_INET:
526 addrlen = sizeof(struct sockaddr_in);
527 break;
528 #ifdef LDAP_PF_INET6
529 case AF_INET6:
530 #ifdef IPV6_V6ONLY
531 /* Try to use IPv6 sockets for IPv6 only */
532 tmp = 1;
533 rc = setsockopt( s, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&tmp,
534 sizeof(tmp) );
535 if ( rc == AC_SOCKET_ERROR ) {
536 int err = sock_errno();
537 Debug( LDAP_DEBUG_ANY, "lload_open_listener(%ld): "
538 "setsockopt(IPV6_V6ONLY) failed errno=%d (%s)\n",
539 (long)l.sl_sd, err,
540 sock_errstr( err, ebuf, sizeof(ebuf) ) );
541 }
542 #endif /* IPV6_V6ONLY */
543 addrlen = sizeof(struct sockaddr_in6);
544 break;
545 #endif /* LDAP_PF_INET6 */
546
547 #ifdef LDAP_PF_LOCAL
548 case AF_LOCAL:
549 #ifdef LOCAL_CREDS
550 {
551 int one = 1;
552 setsockopt( s, 0, LOCAL_CREDS, &one, sizeof(one) );
553 }
554 #endif /* LOCAL_CREDS */
555
556 addrlen = sizeof(struct sockaddr_un);
557 break;
558 #endif /* LDAP_PF_LOCAL */
559 }
560
561 #ifdef LDAP_PF_LOCAL
562 /* create socket with all permissions set for those systems
563 * that honor permissions on sockets (e.g. Linux); typically,
564 * only write is required. To exploit filesystem permissions,
565 * place the socket in a directory and use directory's
566 * permissions. Need write perms to the directory to
567 * create/unlink the socket; likely need exec perms to access
568 * the socket (ITS#4709) */
569 {
570 mode_t old_umask = 0;
571
572 if ( (*sal)->sa_family == AF_LOCAL ) {
573 old_umask = umask( 0 );
574 }
575 #endif /* LDAP_PF_LOCAL */
576 rc = bind( s, *sal, addrlen );
577 #ifdef LDAP_PF_LOCAL
578 if ( old_umask != 0 ) {
579 umask( old_umask );
580 }
581 }
582 #endif /* LDAP_PF_LOCAL */
583 if ( rc ) {
584 err = sock_errno();
585 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
586 "bind(%ld) failed errno=%d (%s)\n",
587 (long)l.sl_sd, err,
588 sock_errstr( err, ebuf, sizeof(ebuf) ) );
589 tcp_close( s );
590 sal++;
591 continue;
592 }
593
594 switch ( (*sal)->sa_family ) {
595 #ifdef LDAP_PF_LOCAL
596 case AF_LOCAL: {
597 char *path = ((struct sockaddr_un *)*sal)->sun_path;
598 l.sl_name.bv_len = strlen( path ) + STRLENOF("PATH=");
599 l.sl_name.bv_val = ch_malloc( l.sl_name.bv_len + 1 );
600 snprintf( l.sl_name.bv_val, l.sl_name.bv_len + 1, "PATH=%s",
601 path );
602 } break;
603 #endif /* LDAP_PF_LOCAL */
604
605 case AF_INET: {
606 char addr[INET_ADDRSTRLEN];
607 const char *s;
608 #if defined(HAVE_GETADDRINFO) && defined(HAVE_INET_NTOP)
609 s = inet_ntop( AF_INET,
610 &((struct sockaddr_in *)*sal)->sin_addr, addr,
611 sizeof(addr) );
612 #else /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */
613 s = inet_ntoa( ((struct sockaddr_in *)*sal)->sin_addr );
614 #endif /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */
615 if ( !s ) s = SLAP_STRING_UNKNOWN;
616 port = ntohs( ((struct sockaddr_in *)*sal)->sin_port );
617 l.sl_name.bv_val =
618 ch_malloc( sizeof("IP=255.255.255.255:65535") );
619 snprintf( l.sl_name.bv_val,
620 sizeof("IP=255.255.255.255:65535"), "IP=%s:%d", s,
621 port );
622 l.sl_name.bv_len = strlen( l.sl_name.bv_val );
623 } break;
624
625 #ifdef LDAP_PF_INET6
626 case AF_INET6: {
627 char addr[INET6_ADDRSTRLEN];
628 const char *s;
629 s = inet_ntop( AF_INET6,
630 &((struct sockaddr_in6 *)*sal)->sin6_addr, addr,
631 sizeof(addr) );
632 if ( !s ) s = SLAP_STRING_UNKNOWN;
633 port = ntohs( ((struct sockaddr_in6 *)*sal)->sin6_port );
634 l.sl_name.bv_len = strlen( s ) + sizeof("IP=[]:65535");
635 l.sl_name.bv_val = ch_malloc( l.sl_name.bv_len );
636 snprintf( l.sl_name.bv_val, l.sl_name.bv_len, "IP=[%s]:%d", s,
637 port );
638 l.sl_name.bv_len = strlen( l.sl_name.bv_val );
639 } break;
640 #endif /* LDAP_PF_INET6 */
641
642 default:
643 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
644 "unsupported address family (%d)\n",
645 (int)(*sal)->sa_family );
646 break;
647 }
648
649 AC_MEMCPY( &l.sl_sa, *sal, addrlen );
650 ber_str2bv( url, 0, 1, &l.sl_url );
651 li = ch_malloc( sizeof(LloadListener) );
652 *li = l;
653 lload_listeners[*cur] = li;
654 (*cur)++;
655 sal++;
656 }
657
658 lload_free_listener_addresses( psal );
659
660 if ( l.sl_url.bv_val == NULL ) {
661 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
662 "failed on %s\n",
663 url );
664 return -1;
665 }
666
667 Debug( LDAP_DEBUG_TRACE, "lload_open_listener: "
668 "listener initialized %s\n",
669 l.sl_url.bv_val );
670
671 return 0;
672 }
673
674 int
lload_open_new_listener(const char * url,LDAPURLDesc * lud)675 lload_open_new_listener( const char *url, LDAPURLDesc *lud )
676 {
677 int rc, i, j = 0;
678
679 for ( i = 0; lload_listeners && lload_listeners[i] != NULL;
680 i++ ) /* count */
681 ;
682 j = i;
683
684 i++;
685 lload_listeners = ch_realloc(
686 lload_listeners, ( i + 1 ) * sizeof(LloadListener *) );
687
688 rc = lload_open_listener( url, lud, &i, &j );
689 lload_listeners[j] = NULL;
690 return rc;
691 }
692
693 int lloadd_inited = 0;
694
695 int
lloadd_listeners_init(const char * urls)696 lloadd_listeners_init( const char *urls )
697 {
698 int i, j, n;
699 char **u;
700 LDAPURLDesc *lud;
701
702 Debug( LDAP_DEBUG_ARGS, "lloadd_listeners_init: %s\n",
703 urls ? urls : "<null>" );
704
705 #ifdef HAVE_TCPD
706 ldap_pvt_thread_mutex_init( &sd_tcpd_mutex );
707 #endif /* TCP Wrappers */
708
709 if ( urls == NULL ) urls = "ldap:///";
710
711 u = ldap_str2charray( urls, " " );
712
713 if ( u == NULL || u[0] == NULL ) {
714 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
715 "no urls (%s) provided\n",
716 urls );
717 if ( u ) ldap_charray_free( u );
718 return -1;
719 }
720
721 for ( i = 0; u[i] != NULL; i++ ) {
722 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
723 "listen on %s\n",
724 u[i] );
725 }
726
727 if ( i == 0 ) {
728 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
729 "no listeners to open (%s)\n",
730 urls );
731 ldap_charray_free( u );
732 return -1;
733 }
734
735 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
736 "%d listeners to open...\n",
737 i );
738 lload_listeners = ch_malloc( ( i + 1 ) * sizeof(LloadListener *) );
739
740 for ( n = 0, j = 0; u[n]; n++ ) {
741 if ( ldap_url_parse_ext( u[n], &lud, LDAP_PVT_URL_PARSE_DEF_PORT ) ) {
742 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
743 "could not parse url %s\n",
744 u[n] );
745 ldap_charray_free( u );
746 return -1;
747 }
748
749 if ( lload_open_listener( u[n], lud, &i, &j ) ) {
750 ldap_charray_free( u );
751 return -1;
752 }
753 }
754 lload_listeners[j] = NULL;
755
756 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
757 "%d listeners opened\n",
758 i );
759
760 ldap_charray_free( u );
761
762 return !i;
763 }
764
765 int
lloadd_daemon_destroy(void)766 lloadd_daemon_destroy( void )
767 {
768 epoch_shutdown();
769 if ( lloadd_inited ) {
770 int i;
771
772 for ( i = 0; i < lload_daemon_threads; i++ ) {
773 ldap_pvt_thread_mutex_destroy( &lload_daemon[i].sd_mutex );
774 if ( lload_daemon[i].wakeup_event ) {
775 event_free( lload_daemon[i].wakeup_event );
776 }
777 if ( lload_daemon[i].base ) {
778 event_base_free( lload_daemon[i].base );
779 }
780 }
781
782 event_base_free( daemon_base );
783 daemon_base = NULL;
784
785 lloadd_inited = 0;
786 #ifdef HAVE_TCPD
787 ldap_pvt_thread_mutex_destroy( &sd_tcpd_mutex );
788 #endif /* TCP Wrappers */
789 }
790
791 return 0;
792 }
793
794 static void
destroy_listeners(void)795 destroy_listeners( void )
796 {
797 LloadListener *lr, **ll = lload_listeners;
798
799 if ( ll == NULL ) return;
800
801 ldap_pvt_thread_join( listener_tid, (void *)NULL );
802
803 while ( (lr = *ll++) != NULL ) {
804 if ( lr->sl_url.bv_val ) {
805 ber_memfree( lr->sl_url.bv_val );
806 }
807
808 if ( lr->sl_name.bv_val ) {
809 ber_memfree( lr->sl_name.bv_val );
810 }
811
812 #ifdef LDAP_PF_LOCAL
813 if ( lr->sl_sa.sa_addr.sa_family == AF_LOCAL ) {
814 unlink( lr->sl_sa.sa_un_addr.sun_path );
815 }
816 #endif /* LDAP_PF_LOCAL */
817
818 evconnlistener_free( lr->listener );
819
820 free( lr );
821 }
822
823 free( lload_listeners );
824 lload_listeners = NULL;
825
826 if ( listener_base ) {
827 event_base_free( listener_base );
828 }
829 }
830
831 static void
lload_listener(struct evconnlistener * listener,ber_socket_t s,struct sockaddr * a,int len,void * arg)832 lload_listener(
833 struct evconnlistener *listener,
834 ber_socket_t s,
835 struct sockaddr *a,
836 int len,
837 void *arg )
838 {
839 LloadListener *sl = arg;
840 LloadConnection *c;
841 Sockaddr *from = (Sockaddr *)a;
842 char peername[LDAP_IPADDRLEN];
843 struct berval peerbv = BER_BVC(peername);
844 int cflag;
845 int tid;
846 char ebuf[128];
847
848 Debug( LDAP_DEBUG_TRACE, ">>> lload_listener(%s)\n", sl->sl_url.bv_val );
849
850 peername[0] = '\0';
851
852 /* Resume the listener FD to allow concurrent-processing of
853 * additional incoming connections.
854 */
855 sl->sl_busy = 0;
856
857 tid = DAEMON_ID(s);
858
859 Debug( LDAP_DEBUG_CONNS, "lload_listener: "
860 "listen=%ld, new connection fd=%ld\n",
861 (long)sl->sl_sd, (long)s );
862
863 #if defined(SO_KEEPALIVE) || defined(TCP_NODELAY)
864 #ifdef LDAP_PF_LOCAL
865 /* for IPv4 and IPv6 sockets only */
866 if ( from->sa_addr.sa_family != AF_LOCAL )
867 #endif /* LDAP_PF_LOCAL */
868 {
869 int rc;
870 int tmp;
871 #ifdef SO_KEEPALIVE
872 /* enable keep alives */
873 tmp = 1;
874 rc = setsockopt(
875 s, SOL_SOCKET, SO_KEEPALIVE, (char *)&tmp, sizeof(tmp) );
876 if ( rc == AC_SOCKET_ERROR ) {
877 int err = sock_errno();
878 Debug( LDAP_DEBUG_ANY, "lload_listener(%ld): "
879 "setsockopt(SO_KEEPALIVE) failed errno=%d (%s)\n",
880 (long)s, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
881 }
882 #endif /* SO_KEEPALIVE */
883 #ifdef TCP_NODELAY
884 /* enable no delay */
885 tmp = 1;
886 rc = setsockopt(
887 s, IPPROTO_TCP, TCP_NODELAY, (char *)&tmp, sizeof(tmp) );
888 if ( rc == AC_SOCKET_ERROR ) {
889 int err = sock_errno();
890 Debug( LDAP_DEBUG_ANY, "lload_listener(%ld): "
891 "setsockopt(TCP_NODELAY) failed errno=%d (%s)\n",
892 (long)s, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
893 }
894 #endif /* TCP_NODELAY */
895 }
896 #endif /* SO_KEEPALIVE || TCP_NODELAY */
897
898 if ( sl->sl_is_proxied ) {
899 if ( !proxyp( s, from ) ) {
900 Debug( LDAP_DEBUG_ANY, "lload_listener: "
901 "proxyp(%ld) failed\n",
902 (long)s );
903 lloadd_close( s );
904 return;
905 }
906 }
907
908 cflag = 0;
909 switch ( from->sa_addr.sa_family ) {
910 #ifdef LDAP_PF_LOCAL
911 case AF_LOCAL:
912 cflag |= CONN_IS_IPC;
913
914 /* FIXME: apparently accept doesn't fill the sun_path member */
915 sprintf( peername, "PATH=%s", sl->sl_sa.sa_un_addr.sun_path );
916 break;
917 #endif /* LDAP_PF_LOCAL */
918
919 #ifdef LDAP_PF_INET6
920 case AF_INET6:
921 #endif /* LDAP_PF_INET6 */
922 case AF_INET:
923 ldap_pvt_sockaddrstr( from, &peerbv );
924 break;
925
926 default:
927 lloadd_close( s );
928 return;
929 }
930
931 #ifdef HAVE_TLS
932 if ( sl->sl_is_tls ) cflag |= CONN_IS_TLS;
933 #endif
934 c = client_init( s, peername, lload_daemon[tid].base, cflag );
935
936 if ( !c ) {
937 Debug( LDAP_DEBUG_ANY, "lload_listener: "
938 "client_init(%ld, %s, %s) failed\n",
939 (long)s, peername, sl->sl_name.bv_val );
940 lloadd_close( s );
941 }
942
943 return;
944 }
945
946 static void *
lload_listener_thread(void * ctx)947 lload_listener_thread( void *ctx )
948 {
949 int rc = event_base_dispatch( listener_base );
950 Debug( LDAP_DEBUG_ANY, "lload_listener_thread: "
951 "event loop finished: rc=%d\n",
952 rc );
953
954 return (void *)NULL;
955 }
956
957 static void
listener_error_cb(struct evconnlistener * lev,void * arg)958 listener_error_cb( struct evconnlistener *lev, void *arg )
959 {
960 LloadListener *l = arg;
961 int err = EVUTIL_SOCKET_ERROR();
962
963 assert( l->listener == lev );
964 if (
965 #ifdef EMFILE
966 err == EMFILE ||
967 #endif /* EMFILE */
968 #ifdef ENFILE
969 err == ENFILE ||
970 #endif /* ENFILE */
971 0 ) {
972 ldap_pvt_thread_mutex_lock( &lload_daemon[0].sd_mutex );
973 emfile++;
974 /* Stop listening until an existing session closes */
975 l->sl_mute = 1;
976 evconnlistener_disable( lev );
977 ldap_pvt_thread_mutex_unlock( &lload_daemon[0].sd_mutex );
978 Debug( LDAP_DEBUG_ANY, "listener_error_cb: "
979 "too many open files, cannot accept new connections on "
980 "url=%s\n",
981 l->sl_url.bv_val );
982 } else {
983 char ebuf[128];
984 Debug( LDAP_DEBUG_ANY, "listener_error_cb: "
985 "received an error on a listener, shutting down: '%s'\n",
986 sock_errstr( err, ebuf, sizeof(ebuf) ) );
987 event_base_loopexit( l->base, NULL );
988 }
989 }
990
991 void
listeners_reactivate(void)992 listeners_reactivate( void )
993 {
994 int i;
995
996 ldap_pvt_thread_mutex_lock( &lload_daemon[0].sd_mutex );
997 for ( i = 0; emfile && lload_listeners[i] != NULL; i++ ) {
998 LloadListener *lr = lload_listeners[i];
999
1000 if ( lr->sl_sd == AC_SOCKET_INVALID ) continue;
1001 if ( lr->sl_mute ) {
1002 emfile--;
1003 evconnlistener_enable( lr->listener );
1004 lr->sl_mute = 0;
1005 Debug( LDAP_DEBUG_CONNS, "listeners_reactivate: "
1006 "reactivated listener url=%s\n",
1007 lr->sl_url.bv_val );
1008 }
1009 }
1010 if ( emfile && lload_listeners[i] == NULL ) {
1011 /* Walked the entire list without enabling anything; emfile
1012 * counter is stale. Reset it. */
1013 emfile = 0;
1014 }
1015 ldap_pvt_thread_mutex_unlock( &lload_daemon[0].sd_mutex );
1016 }
1017
1018 static int
lload_listener_activate(void)1019 lload_listener_activate( void )
1020 {
1021 struct evconnlistener *listener;
1022 int l, rc;
1023 char ebuf[128];
1024
1025 listener_base = event_base_new();
1026 if ( !listener_base ) return -1;
1027
1028 for ( l = 0; lload_listeners[l] != NULL; l++ ) {
1029 if ( lload_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue;
1030
1031 /* FIXME: TCP-only! */
1032 #ifdef LDAP_TCP_BUFFER
1033 if ( 1 ) {
1034 int origsize, size, realsize, rc;
1035 socklen_t optlen;
1036
1037 size = 0;
1038 if ( lload_listeners[l]->sl_tcp_rmem > 0 ) {
1039 size = lload_listeners[l]->sl_tcp_rmem;
1040 } else if ( slapd_tcp_rmem > 0 ) {
1041 size = slapd_tcp_rmem;
1042 }
1043
1044 if ( size > 0 ) {
1045 optlen = sizeof(origsize);
1046 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1047 SO_RCVBUF, (void *)&origsize, &optlen );
1048
1049 if ( rc ) {
1050 int err = sock_errno();
1051 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1052 "getsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1053 err, AC_STRERROR_R( err, ebuf, sizeof(ebuf) ) );
1054 }
1055
1056 optlen = sizeof(size);
1057 rc = setsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1058 SO_RCVBUF, (const void *)&size, optlen );
1059
1060 if ( rc ) {
1061 int err = sock_errno();
1062 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1063 "setsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1064 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1065 }
1066
1067 optlen = sizeof(realsize);
1068 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1069 SO_RCVBUF, (void *)&realsize, &optlen );
1070
1071 if ( rc ) {
1072 int err = sock_errno();
1073 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1074 "getsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1075 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1076 }
1077
1078 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1079 "url=%s (#%d) RCVBUF original size=%d requested "
1080 "size=%d real size=%d\n",
1081 lload_listeners[l]->sl_url.bv_val, l, origsize, size,
1082 realsize );
1083 }
1084
1085 size = 0;
1086 if ( lload_listeners[l]->sl_tcp_wmem > 0 ) {
1087 size = lload_listeners[l]->sl_tcp_wmem;
1088 } else if ( slapd_tcp_wmem > 0 ) {
1089 size = slapd_tcp_wmem;
1090 }
1091
1092 if ( size > 0 ) {
1093 optlen = sizeof(origsize);
1094 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1095 SO_SNDBUF, (void *)&origsize, &optlen );
1096
1097 if ( rc ) {
1098 int err = sock_errno();
1099 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1100 "getsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1101 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1102 }
1103
1104 optlen = sizeof(size);
1105 rc = setsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1106 SO_SNDBUF, (const void *)&size, optlen );
1107
1108 if ( rc ) {
1109 int err = sock_errno();
1110 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1111 "setsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1112 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1113 }
1114
1115 optlen = sizeof(realsize);
1116 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1117 SO_SNDBUF, (void *)&realsize, &optlen );
1118
1119 if ( rc ) {
1120 int err = sock_errno();
1121 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1122 "getsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1123 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1124 }
1125
1126 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1127 "url=%s (#%d) SNDBUF original size=%d requested "
1128 "size=%d real size=%d\n",
1129 lload_listeners[l]->sl_url.bv_val, l, origsize, size,
1130 realsize );
1131 }
1132 }
1133 #endif /* LDAP_TCP_BUFFER */
1134
1135 lload_listeners[l]->sl_busy = 1;
1136 listener = evconnlistener_new( listener_base, lload_listener,
1137 lload_listeners[l],
1138 LEV_OPT_THREADSAFE|LEV_OPT_DEFERRED_ACCEPT,
1139 SLAPD_LISTEN_BACKLOG, lload_listeners[l]->sl_sd );
1140 if ( !listener ) {
1141 int err = sock_errno();
1142
1143 #ifdef LDAP_PF_INET6
1144 /* If error is EADDRINUSE, we are trying to listen to INADDR_ANY and
1145 * we are already listening to in6addr_any, then we want to ignore
1146 * this and continue.
1147 */
1148 if ( err == EADDRINUSE ) {
1149 int i;
1150 struct sockaddr_in sa = lload_listeners[l]->sl_sa.sa_in_addr;
1151 struct sockaddr_in6 sa6;
1152
1153 if ( sa.sin_family == AF_INET &&
1154 sa.sin_addr.s_addr == htonl( INADDR_ANY ) ) {
1155 for ( i = 0; i < l; i++ ) {
1156 sa6 = lload_listeners[i]->sl_sa.sa_in6_addr;
1157 if ( sa6.sin6_family == AF_INET6 &&
1158 !memcmp( &sa6.sin6_addr, &in6addr_any,
1159 sizeof(struct in6_addr) ) ) {
1160 break;
1161 }
1162 }
1163
1164 if ( i < l ) {
1165 /* We are already listening to in6addr_any */
1166 Debug( LDAP_DEBUG_CONNS, "lload_listener_activate: "
1167 "Attempt to listen to 0.0.0.0 failed, "
1168 "already listening on ::, assuming IPv4 "
1169 "included\n" );
1170 lloadd_close( lload_listeners[l]->sl_sd );
1171 lload_listeners[l]->sl_sd = AC_SOCKET_INVALID;
1172 continue;
1173 }
1174 }
1175 }
1176 #endif /* LDAP_PF_INET6 */
1177 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1178 "listen(%s, 5) failed errno=%d (%s)\n",
1179 lload_listeners[l]->sl_url.bv_val, err,
1180 sock_errstr( err, ebuf, sizeof(ebuf) ) );
1181 return -1;
1182 }
1183
1184 lload_listeners[l]->base = listener_base;
1185 lload_listeners[l]->listener = listener;
1186 evconnlistener_set_error_cb( listener, listener_error_cb );
1187 }
1188
1189 rc = ldap_pvt_thread_create(
1190 &listener_tid, 0, lload_listener_thread, lload_listeners[l] );
1191
1192 if ( rc != 0 ) {
1193 Debug( LDAP_DEBUG_ANY, "lload_listener_activate(%d): "
1194 "submit failed (%d)\n",
1195 lload_listeners[l]->sl_sd, rc );
1196 }
1197 return rc;
1198 }
1199
1200 static void *
lloadd_io_task(void * ptr)1201 lloadd_io_task( void *ptr )
1202 {
1203 int rc;
1204 int tid = (ldap_pvt_thread_t *)ptr - daemon_tid;
1205 struct event_base *base = lload_daemon[tid].base;
1206 struct event *event;
1207
1208 event = event_new( base, -1, EV_WRITE, daemon_wakeup_cb, ptr );
1209 if ( !event ) {
1210 Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
1211 "failed to set up the wakeup event\n" );
1212 return (void *)-1;
1213 }
1214 event_add( event, NULL );
1215 lload_daemon[tid].wakeup_event = event;
1216
1217 /* run */
1218 rc = event_base_dispatch( base );
1219 Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
1220 "Daemon %d, event loop finished: rc=%d\n",
1221 tid, rc );
1222
1223 if ( !slapd_gentle_shutdown ) {
1224 slapd_abrupt_shutdown = 1;
1225 }
1226
1227 return NULL;
1228 }
1229
1230 int
lloadd_daemon(struct event_base * daemon_base)1231 lloadd_daemon( struct event_base *daemon_base )
1232 {
1233 int i, rc;
1234 LloadBackend *b;
1235 struct event_base *base;
1236 struct event *event;
1237
1238 assert( daemon_base != NULL );
1239
1240 dnsbase = evdns_base_new( daemon_base, EVDNS_BASE_INITIALIZE_NAMESERVERS );
1241 if ( !dnsbase ) {
1242 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1243 "failed to set up for async name resolution\n" );
1244 return -1;
1245 }
1246
1247 if ( lload_daemon_threads > SLAPD_MAX_DAEMON_THREADS )
1248 lload_daemon_threads = SLAPD_MAX_DAEMON_THREADS;
1249
1250 daemon_tid =
1251 ch_malloc( lload_daemon_threads * sizeof(ldap_pvt_thread_t) );
1252
1253 for ( i = 0; i < lload_daemon_threads; i++ ) {
1254 base = event_base_new();
1255 if ( !base ) {
1256 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1257 "failed to acquire event base for an I/O thread\n" );
1258 return -1;
1259 }
1260 lload_daemon[i].base = base;
1261
1262 ldap_pvt_thread_mutex_init( &lload_daemon[i].sd_mutex );
1263 /* threads that handle client and upstream sockets */
1264 rc = ldap_pvt_thread_create(
1265 &daemon_tid[i], 0, lloadd_io_task, &daemon_tid[i] );
1266
1267 if ( rc != 0 ) {
1268 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1269 "listener ldap_pvt_thread_create failed (%d)\n",
1270 rc );
1271 return rc;
1272 }
1273 }
1274
1275 if ( (rc = lload_listener_activate()) != 0 ) {
1276 return rc;
1277 }
1278
1279 if ( !LDAP_CIRCLEQ_EMPTY( &backend ) ) {
1280 current_backend = LDAP_CIRCLEQ_FIRST( &backend );
1281 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1282 event = evtimer_new( daemon_base, backend_connect, b );
1283 if ( !event ) {
1284 Debug( LDAP_DEBUG_ANY, "lloadd: "
1285 "failed to allocate retry event\n" );
1286 return -1;
1287 }
1288
1289 checked_lock( &b->b_mutex );
1290 b->b_retry_event = event;
1291 backend_retry( b );
1292 checked_unlock( &b->b_mutex );
1293 }
1294 }
1295
1296 event = evtimer_new( daemon_base, operations_timeout, event_self_cbarg() );
1297 if ( !event ) {
1298 Debug( LDAP_DEBUG_ANY, "lloadd: "
1299 "failed to allocate timeout event\n" );
1300 return -1;
1301 }
1302 lload_timeout_event = event;
1303
1304 /* TODO: should we just add it with any timeout and re-add when the timeout
1305 * changes? */
1306 if ( lload_timeout_api ) {
1307 event_add( event, lload_timeout_api );
1308 }
1309
1310 checked_lock( &lload_wait_mutex );
1311 lloadd_inited = 1;
1312 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1313 checked_unlock( &lload_wait_mutex );
1314 #if !defined(BALANCER_MODULE) && defined(HAVE_SYSTEMD)
1315 rc = sd_notify( 1, "READY=1" );
1316 if ( rc < 0 ) {
1317 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1318 "systemd sd_notify failed (%d)\n", rc );
1319 }
1320 #endif /* !BALANCER_MODULE && HAVE_SYSTEMD */
1321
1322 rc = event_base_dispatch( daemon_base );
1323 Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
1324 "Main event loop finished: rc=%d\n",
1325 rc );
1326
1327 /* shutdown */
1328 event_base_loopexit( listener_base, 0 );
1329
1330 /* wait for the listener threads to complete */
1331 destroy_listeners();
1332
1333 /* Mark upstream connections closing and prevent from opening new ones */
1334 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1335 epoch_t epoch = epoch_join();
1336
1337 checked_lock( &b->b_mutex );
1338 b->b_numconns = b->b_numbindconns = 0;
1339 backend_reset( b, 1 );
1340 checked_unlock( &b->b_mutex );
1341
1342 epoch_leave( epoch );
1343 }
1344
1345 /* Do the same for clients */
1346 clients_destroy( 1 );
1347
1348 for ( i = 0; i < lload_daemon_threads; i++ ) {
1349 /*
1350 * https://github.com/libevent/libevent/issues/623
1351 * deleting the event doesn't notify the base, just activate it and
1352 * let it delete itself
1353 */
1354 event_active( lload_daemon[i].wakeup_event, EV_READ, 0 );
1355 }
1356
1357 for ( i = 0; i < lload_daemon_threads; i++ ) {
1358 ldap_pvt_thread_join( daemon_tid[i], (void *)NULL );
1359 }
1360
1361 #ifndef BALANCER_MODULE
1362 if ( LogTest( LDAP_DEBUG_ANY ) ) {
1363 int t = ldap_pvt_thread_pool_backload( &connection_pool );
1364 Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
1365 "waiting for %d operations/tasks to finish\n",
1366 t );
1367 }
1368 ldap_pvt_thread_pool_close( &connection_pool, 1 );
1369 #endif
1370
1371 lload_backends_destroy();
1372 clients_destroy( 0 );
1373 lload_bindconf_free( &bindconf );
1374 evdns_base_free( dnsbase, 0 );
1375
1376 ch_free( daemon_tid );
1377 daemon_tid = NULL;
1378
1379 lloadd_daemon_destroy();
1380
1381 /* If we're a slapd module, let the thread that initiated the shut down
1382 * know we've finished */
1383 checked_lock( &lload_wait_mutex );
1384 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1385 checked_unlock( &lload_wait_mutex );
1386
1387 return 0;
1388 }
1389
1390 static void
daemon_wakeup_cb(evutil_socket_t sig,short what,void * arg)1391 daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg )
1392 {
1393 int tid = (ldap_pvt_thread_t *)arg - daemon_tid;
1394
1395 Debug( LDAP_DEBUG_TRACE, "daemon_wakeup_cb: "
1396 "Daemon thread %d woken up\n",
1397 tid );
1398 event_del( lload_daemon[tid].wakeup_event );
1399 }
1400
1401 LloadChange lload_change = { .type = LLOAD_CHANGE_UNDEFINED };
1402
1403 #ifdef BALANCER_MODULE
1404 int
backend_conn_cb(ldap_pvt_thread_start_t * start,void * startarg,void * arg)1405 backend_conn_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
1406 {
1407 LloadConnection *c = startarg;
1408 LloadBackend *b = arg;
1409
1410 if ( b == NULL || c->c_backend == b ) {
1411 CONNECTION_LOCK_DESTROY(c);
1412 return 1;
1413 }
1414 return 0;
1415 }
1416
1417 #ifdef HAVE_TLS
1418 int
client_tls_cb(ldap_pvt_thread_start_t * start,void * startarg,void * arg)1419 client_tls_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
1420 {
1421 LloadConnection *c = startarg;
1422
1423 if ( c->c_destroy == client_destroy &&
1424 c->c_is_tls == LLOAD_TLS_ESTABLISHED ) {
1425 CONNECTION_LOCK_DESTROY(c);
1426 return 1;
1427 }
1428 return 0;
1429 }
1430 #endif /* HAVE_TLS */
1431
1432 void
lload_handle_backend_invalidation(LloadChange * change)1433 lload_handle_backend_invalidation( LloadChange *change )
1434 {
1435 LloadBackend *b = change->target;
1436
1437 assert( change->object == LLOAD_BACKEND );
1438
1439 if ( change->type == LLOAD_CHANGE_ADD ) {
1440 BackendInfo *mi = backend_info( "monitor" );
1441
1442 if ( mi ) {
1443 monitor_extra_t *mbe = mi->bi_extra;
1444 if ( mbe->is_configured() ) {
1445 lload_monitor_backend_init( mi, b );
1446 }
1447 }
1448
1449 if ( !current_backend ) {
1450 current_backend = b;
1451 }
1452 checked_lock( &b->b_mutex );
1453 backend_retry( b );
1454 checked_unlock( &b->b_mutex );
1455 return;
1456 } else if ( change->type == LLOAD_CHANGE_DEL ) {
1457 ldap_pvt_thread_pool_walk(
1458 &connection_pool, handle_pdus, backend_conn_cb, b );
1459 ldap_pvt_thread_pool_walk(
1460 &connection_pool, upstream_bind, backend_conn_cb, b );
1461 lload_backend_destroy( b );
1462 return;
1463 }
1464 assert( change->type == LLOAD_CHANGE_MODIFY );
1465
1466 /*
1467 * A change that can't be handled gracefully, terminate all connections and
1468 * start over.
1469 */
1470 if ( change->flags.backend & LLOAD_BACKEND_MOD_OTHER ) {
1471 ldap_pvt_thread_pool_walk(
1472 &connection_pool, handle_pdus, backend_conn_cb, b );
1473 ldap_pvt_thread_pool_walk(
1474 &connection_pool, upstream_bind, backend_conn_cb, b );
1475 checked_lock( &b->b_mutex );
1476 backend_reset( b, 0 );
1477 backend_retry( b );
1478 checked_unlock( &b->b_mutex );
1479 return;
1480 }
1481
1482 /*
1483 * Handle changes to number of connections:
1484 * - a change might get the connection limit above the pool size:
1485 * - consider closing (in order of priority?):
1486 * - connections awaiting connect() completion
1487 * - connections currently preparing
1488 * - bind connections over limit (which is 0 if 'feature vc' is on
1489 * - regular connections over limit
1490 * - below pool size
1491 * - call backend_retry if there are no opening connections
1492 * - one pool size above and one below the configured size
1493 * - still close the ones above limit, it should sort itself out
1494 * the only issue is if a closing connection isn't guaranteed to do
1495 * that at some point
1496 */
1497 if ( change->flags.backend & LLOAD_BACKEND_MOD_CONNS ) {
1498 int bind_requested = 0, need_close = 0, need_open = 0;
1499 LloadConnection *c;
1500
1501 bind_requested =
1502 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
1503 (lload_features & LLOAD_FEATURE_VC) ? 0 :
1504 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
1505 b->b_numbindconns;
1506
1507 if ( b->b_bindavail > bind_requested ) {
1508 need_close += b->b_bindavail - bind_requested;
1509 } else if ( b->b_bindavail < bind_requested ) {
1510 need_open = 1;
1511 }
1512
1513 if ( b->b_active > b->b_numconns ) {
1514 need_close += b->b_active - b->b_numconns;
1515 } else if ( b->b_active < b->b_numconns ) {
1516 need_open = 1;
1517 }
1518
1519 if ( !need_open ) {
1520 need_close += b->b_opening;
1521
1522 while ( !LDAP_LIST_EMPTY( &b->b_connecting ) ) {
1523 LloadPendingConnection *p = LDAP_LIST_FIRST( &b->b_connecting );
1524
1525 LDAP_LIST_REMOVE( p, next );
1526 event_free( p->event );
1527 evutil_closesocket( p->fd );
1528 ch_free( p );
1529 b->b_opening--;
1530 need_close--;
1531 }
1532 }
1533
1534 if ( need_close || !need_open ) {
1535 /* It might be too late to repurpose a preparing connection, just
1536 * close them all */
1537 while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
1538 c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
1539
1540 event_del( c->c_read_event );
1541 CONNECTION_LOCK_DESTROY(c);
1542 assert( c == NULL );
1543 b->b_opening--;
1544 need_close--;
1545 }
1546 if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
1547 event_del( b->b_retry_event );
1548 b->b_opening--;
1549 }
1550 assert( b->b_opening == 0 );
1551 }
1552
1553 if ( b->b_bindavail > bind_requested ) {
1554 int diff = b->b_bindavail - bind_requested;
1555
1556 assert( need_close >= diff );
1557
1558 LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
1559 int gentle = 1;
1560
1561 lload_connection_close( c, &gentle );
1562 need_close--;
1563 diff--;
1564 if ( !diff ) {
1565 break;
1566 }
1567 }
1568 assert( diff == 0 );
1569 }
1570
1571 if ( b->b_active > b->b_numconns ) {
1572 int diff = b->b_active - b->b_numconns;
1573
1574 assert( need_close >= diff );
1575
1576 LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
1577 int gentle = 1;
1578
1579 lload_connection_close( c, &gentle );
1580 need_close--;
1581 diff--;
1582 if ( !diff ) {
1583 break;
1584 }
1585 }
1586 assert( diff == 0 );
1587 }
1588 assert( need_close == 0 );
1589
1590 if ( need_open ) {
1591 checked_lock( &b->b_mutex );
1592 backend_retry( b );
1593 checked_unlock( &b->b_mutex );
1594 }
1595 }
1596 }
1597
1598 void
lload_handle_global_invalidation(LloadChange * change)1599 lload_handle_global_invalidation( LloadChange *change )
1600 {
1601 assert( change->type == LLOAD_CHANGE_MODIFY );
1602 assert( change->object == LLOAD_DAEMON );
1603
1604 if ( change->flags.daemon & LLOAD_DAEMON_MOD_THREADS ) {
1605 /* walk the task queue to remove any tasks belonging to us. */
1606 /* TODO: initiate a full module restart, everything will fall into
1607 * place at that point */
1608 ldap_pvt_thread_pool_walk(
1609 &connection_pool, handle_pdus, backend_conn_cb, NULL );
1610 ldap_pvt_thread_pool_walk(
1611 &connection_pool, upstream_bind, backend_conn_cb, NULL );
1612 assert(0);
1613 return;
1614 }
1615
1616 if ( change->flags.daemon & LLOAD_DAEMON_MOD_FEATURES ) {
1617 lload_features_t feature_diff =
1618 lload_features ^ ( ~(uintptr_t)change->target );
1619 /* Feature change handling:
1620 * - VC (TODO):
1621 * - on: terminate all bind connections
1622 * - off: cancel all bind operations in progress, reopen bind connections
1623 * - ProxyAuthz:
1624 * - on: nothing needed
1625 * - off: clear c_auth/privileged on each client
1626 * - read pause (WIP):
1627 * - nothing needed?
1628 */
1629
1630 assert( change->target );
1631 if ( feature_diff & LLOAD_FEATURE_VC ) {
1632 assert(0);
1633 feature_diff &= ~LLOAD_FEATURE_VC;
1634 }
1635 if ( feature_diff & LLOAD_FEATURE_PAUSE ) {
1636 feature_diff &= ~LLOAD_FEATURE_PAUSE;
1637 }
1638 if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) {
1639 if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
1640 LloadConnection *c;
1641 /* We switched proxyauthz off */
1642 LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
1643 if ( !BER_BVISNULL( &c->c_auth ) ) {
1644 ber_memfree( c->c_auth.bv_val );
1645 BER_BVZERO( &c->c_auth );
1646 }
1647 if ( c->c_type == LLOAD_C_PRIVILEGED ) {
1648 c->c_type = LLOAD_C_OPEN;
1649 }
1650 }
1651 }
1652 feature_diff &= ~LLOAD_FEATURE_PROXYAUTHZ;
1653 }
1654 assert( !feature_diff );
1655 }
1656
1657 #ifdef HAVE_TLS
1658 if ( change->flags.daemon & LLOAD_DAEMON_MOD_TLS ) {
1659 /* terminate all clients with TLS set up */
1660 ldap_pvt_thread_pool_walk(
1661 &connection_pool, handle_pdus, client_tls_cb, NULL );
1662 if ( !LDAP_CIRCLEQ_EMPTY( &clients ) ) {
1663 LloadConnection *c = LDAP_CIRCLEQ_FIRST( &clients );
1664 unsigned long first_connid = c->c_connid;
1665
1666 while ( c ) {
1667 LloadConnection *next =
1668 LDAP_CIRCLEQ_LOOP_NEXT( &clients, c, c_next );
1669 if ( c->c_is_tls ) {
1670 CONNECTION_LOCK_DESTROY(c);
1671 assert( c == NULL );
1672 }
1673 c = next;
1674 if ( c->c_connid <= first_connid ) {
1675 c = NULL;
1676 }
1677 }
1678 }
1679 }
1680 #endif /* HAVE_TLS */
1681
1682 if ( change->flags.daemon & LLOAD_DAEMON_MOD_BINDCONF ) {
1683 LloadBackend *b;
1684 LloadConnection *c;
1685
1686 /*
1687 * Only timeout changes can be handled gracefully, terminate all
1688 * connections and start over.
1689 */
1690 ldap_pvt_thread_pool_walk(
1691 &connection_pool, handle_pdus, backend_conn_cb, NULL );
1692 ldap_pvt_thread_pool_walk(
1693 &connection_pool, upstream_bind, backend_conn_cb, NULL );
1694
1695 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1696 checked_lock( &b->b_mutex );
1697 backend_reset( b, 0 );
1698 backend_retry( b );
1699 checked_unlock( &b->b_mutex );
1700 }
1701
1702 /* Reconsider the PRIVILEGED flag on all clients */
1703 LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
1704 int privileged = ber_bvstrcasecmp( &c->c_auth, &lloadd_identity );
1705
1706 /* We have just terminated all pending operations (even pins), there
1707 * should be no connections still binding/closing */
1708 assert( c->c_state == LLOAD_C_READY );
1709
1710 c->c_type = privileged ? LLOAD_C_PRIVILEGED : LLOAD_C_OPEN;
1711 }
1712 }
1713 }
1714
1715 int
lload_handle_invalidation(LloadChange * change)1716 lload_handle_invalidation( LloadChange *change )
1717 {
1718 if ( (change->type == LLOAD_CHANGE_MODIFY) &&
1719 change->flags.generic == 0 ) {
1720 Debug( LDAP_DEBUG_ANY, "lload_handle_invalidation: "
1721 "a modify where apparently nothing changed\n" );
1722 }
1723
1724 switch ( change->object ) {
1725 case LLOAD_BACKEND:
1726 lload_handle_backend_invalidation( change );
1727 break;
1728 case LLOAD_DAEMON:
1729 lload_handle_global_invalidation( change );
1730 break;
1731 default:
1732 Debug( LDAP_DEBUG_ANY, "lload_handle_invalidation: "
1733 "unrecognised change\n" );
1734 assert(0);
1735 }
1736
1737 return LDAP_SUCCESS;
1738 }
1739
1740 static void
lload_pause_event_cb(evutil_socket_t s,short what,void * arg)1741 lload_pause_event_cb( evutil_socket_t s, short what, void *arg )
1742 {
1743 /*
1744 * We are pausing, signal the pausing thread we've finished and
1745 * wait until the thread pool resumes operation.
1746 *
1747 * Do this in lockstep with the pausing thread.
1748 */
1749 checked_lock( &lload_wait_mutex );
1750 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1751
1752 /* Now wait until we unpause, then we can resume operation */
1753 ldap_pvt_thread_cond_wait( &lload_pause_cond, &lload_wait_mutex );
1754 checked_unlock( &lload_wait_mutex );
1755 }
1756
1757 /*
1758 * Signal the event base to terminate processing as soon as it can and wait for
1759 * lload_pause_event_cb to notify us this has happened.
1760 */
1761 static int
lload_pause_base(struct event_base * base)1762 lload_pause_base( struct event_base *base )
1763 {
1764 int rc;
1765
1766 checked_lock( &lload_wait_mutex );
1767 event_base_once( base, -1, EV_TIMEOUT, lload_pause_event_cb, base, NULL );
1768 rc = ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
1769 checked_unlock( &lload_wait_mutex );
1770
1771 return rc;
1772 }
1773
1774 void
lload_pause_server(void)1775 lload_pause_server( void )
1776 {
1777 LloadChange ch = { .type = LLOAD_CHANGE_UNDEFINED };
1778 int i;
1779
1780 lload_pause_base( listener_base );
1781 lload_pause_base( daemon_base );
1782
1783 for ( i = 0; i < lload_daemon_threads; i++ ) {
1784 lload_pause_base( lload_daemon[i].base );
1785 }
1786
1787 lload_change = ch;
1788 }
1789
1790 void
lload_unpause_server(void)1791 lload_unpause_server( void )
1792 {
1793 if ( lload_change.type != LLOAD_CHANGE_UNDEFINED ) {
1794 lload_handle_invalidation( &lload_change );
1795 }
1796
1797 /*
1798 * Make sure lloadd is completely ready to unpause by now:
1799 *
1800 * After the broadcast, we handle I/O and begin filling the thread pool, in
1801 * high load conditions, we might hit the pool limits and start processing
1802 * operations in the I/O threads (one PDU per socket at a time for fairness
1803 * sake) even before a pause has finished from slapd's point of view!
1804 *
1805 * When (max_pdus_per_cycle == 0) we don't use the pool for these at all and
1806 * most lload processing starts immediately making this even more prominent.
1807 */
1808 ldap_pvt_thread_cond_broadcast( &lload_pause_cond );
1809 }
1810 #endif /* BALANCER_MODULE */
1811
1812 void
lload_sig_shutdown(evutil_socket_t sig,short what,void * arg)1813 lload_sig_shutdown( evutil_socket_t sig, short what, void *arg )
1814 {
1815 struct event_base *daemon_base = arg;
1816 int save_errno = errno;
1817 int i;
1818
1819 /*
1820 * If the NT Service Manager is controlling the server, we don't
1821 * want SIGBREAK to kill the server. For some strange reason,
1822 * SIGBREAK is generated when a user logs out.
1823 */
1824
1825 #if defined(HAVE_NT_SERVICE_MANAGER) && defined(SIGBREAK)
1826 if ( is_NT_Service && sig == SIGBREAK ) {
1827 /* empty */;
1828 } else
1829 #endif /* HAVE_NT_SERVICE_MANAGER && SIGBREAK */
1830 #ifdef SIGHUP
1831 if ( sig == SIGHUP && global_gentlehup && slapd_gentle_shutdown == 0 ) {
1832 slapd_gentle_shutdown = 1;
1833 } else
1834 #endif /* SIGHUP */
1835 {
1836 slapd_shutdown = 1;
1837 }
1838
1839 for ( i = 0; i < lload_daemon_threads; i++ ) {
1840 event_base_loopexit( lload_daemon[i].base, NULL );
1841 }
1842 event_base_loopexit( daemon_base, NULL );
1843
1844 errno = save_errno;
1845 }
1846
1847 struct event_base *
lload_get_base(ber_socket_t s)1848 lload_get_base( ber_socket_t s )
1849 {
1850 int tid = DAEMON_ID(s);
1851 return lload_daemon[tid].base;
1852 }
1853
1854 LloadListener **
lloadd_get_listeners(void)1855 lloadd_get_listeners( void )
1856 {
1857 /* Could return array with no listeners if !listening, but current
1858 * callers mostly look at the URLs. E.g. syncrepl uses this to
1859 * identify the server, which means it wants the startup arguments.
1860 */
1861 return lload_listeners;
1862 }
1863
1864 /* Reject all incoming requests */
1865 void
lload_suspend_listeners(void)1866 lload_suspend_listeners( void )
1867 {
1868 int i;
1869 for ( i = 0; lload_listeners[i]; i++ ) {
1870 lload_listeners[i]->sl_mute = 1;
1871 evconnlistener_disable( lload_listeners[i]->listener );
1872 listen( lload_listeners[i]->sl_sd, 0 );
1873 }
1874 }
1875
1876 /* Resume after a suspend */
1877 void
lload_resume_listeners(void)1878 lload_resume_listeners( void )
1879 {
1880 int i;
1881 for ( i = 0; lload_listeners[i]; i++ ) {
1882 lload_listeners[i]->sl_mute = 0;
1883 listen( lload_listeners[i]->sl_sd, SLAPD_LISTEN_BACKLOG );
1884 evconnlistener_enable( lload_listeners[i]->listener );
1885 }
1886 }
1887