1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2021 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in the file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
16  * All rights reserved.
17  *
18  * Redistribution and use in source and binary forms are permitted
19  * provided that this notice is preserved and that due credit is given
20  * to the University of Michigan at Ann Arbor. The name of the University
21  * may not be used to endorse or promote products derived from this
22  * software without specific prior written permission. This software
23  * is provided ``as is'' without express or implied warranty.
24  */
25 
26 #include "portable.h"
27 
28 #include <stdio.h>
29 #ifdef HAVE_LIMITS_H
30 #include <limits.h>
31 #endif
32 
33 #include <ac/socket.h>
34 #include <ac/errno.h>
35 #include <ac/string.h>
36 #include <ac/time.h>
37 #include <ac/unistd.h>
38 
39 #include "lload.h"
40 
41 #include "lutil.h"
42 #include "lutil_ldap.h"
43 
/* Connection id counter: lload_connection_assign_nextid() hands out the
 * current value and atomically advances it by one. */
static unsigned long conn_nextid = 0;
45 
46 static void
lload_connection_assign_nextid(LloadConnection * conn)47 lload_connection_assign_nextid( LloadConnection *conn )
48 {
49     conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
50 }
51 
52 /*
53  * We start off with the connection muted and c_currentber holding the pdu we
54  * received.
55  *
56  * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
57  * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to
58  * maintain fairness and not hog the worker thread forever.
59  *
60  * If we've run out of pdus immediately available from the stream or hit the
61  * budget, we unmute the connection.
62  *
63  * c->c_pdu_cb might return an 'error' and not free the connection. That can
64  * happen when changing the state or when client is blocked on writing and
65  * already has a pdu pending on the same operation, it's their job to make sure
66  * we're woken up again.
67  */
68 void *
handle_pdus(void * ctx,void * arg)69 handle_pdus( void *ctx, void *arg )
70 {
71     LloadConnection *c = arg;
72     int pdus_handled = 0;
73     epoch_t epoch;
74 
75     /* A reference was passed on to us */
76     assert( IS_ALIVE( c, c_refcnt ) );
77 
78     epoch = epoch_join();
79     for ( ;; ) {
80         BerElement *ber;
81         ber_tag_t tag;
82         ber_len_t len;
83 
84         if ( c->c_pdu_cb( c ) ) {
85             /* Error/reset, get rid ouf our reference and bail */
86             goto done;
87         }
88 
89         if ( !IS_ALIVE( c, c_live ) ) {
90             break;
91         }
92 
93         if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
94             /* Do not read now, re-enable read event instead */
95             break;
96         }
97 
98         ber = c->c_currentber;
99         if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
100             Debug( LDAP_DEBUG_ANY, "handle_pdus: "
101                     "connid=%lu, ber_alloc failed\n",
102                     c->c_connid );
103             CONNECTION_LOCK_DESTROY(c);
104             goto done;
105         }
106         c->c_currentber = ber;
107 
108         checked_lock( &c->c_io_mutex );
109         if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
110                 (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
111             goto pause;
112         }
113         tag = ber_get_next( c->c_sb, &len, ber );
114         checked_unlock( &c->c_io_mutex );
115         if ( tag != LDAP_TAG_MESSAGE ) {
116             int err = sock_errno();
117 
118             if ( err != EWOULDBLOCK && err != EAGAIN ) {
119                 if ( err || tag == LBER_ERROR ) {
120                     char ebuf[128];
121                     Debug( LDAP_DEBUG_ANY, "handle_pdus: "
122                             "ber_get_next on fd=%d failed errno=%d (%s)\n",
123                             c->c_fd, err,
124                             sock_errstr( err, ebuf, sizeof(ebuf) ) );
125                 } else {
126                     Debug( LDAP_DEBUG_STATS, "handle_pdus: "
127                             "ber_get_next on fd=%d connid=%lu received "
128                             "a strange PDU tag=%lx\n",
129                             c->c_fd, c->c_connid, tag );
130                 }
131 
132                 c->c_currentber = NULL;
133                 ber_free( ber, 1 );
134                 CONNECTION_LOCK_DESTROY(c);
135                 goto done;
136             }
137             break;
138         }
139 
140         assert( IS_ALIVE( c, c_refcnt ) );
141         epoch_leave( epoch );
142         epoch = epoch_join();
143         assert( IS_ALIVE( c, c_refcnt ) );
144     }
145 
146     checked_lock( &c->c_io_mutex );
147     if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
148             !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
149         event_add( c->c_read_event, c->c_read_timeout );
150         Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
151                 "re-enabled read event on connid=%lu\n",
152                 c->c_connid );
153     }
154 pause:
155     c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
156     checked_unlock( &c->c_io_mutex );
157 
158 done:
159     RELEASE_REF( c, c_refcnt, c->c_destroy );
160     epoch_leave( epoch );
161     return NULL;
162 }
163 
164 /*
165  * Initial read on the connection, if we get an LDAP PDU, submit the
166  * processing of this and successive ones to the work queue.
167  *
168  * If we can't submit it to the queue (overload), process this one and return
169  * to the event loop immediately after.
170  */
171 void
connection_read_cb(evutil_socket_t s,short what,void * arg)172 connection_read_cb( evutil_socket_t s, short what, void *arg )
173 {
174     LloadConnection *c = arg;
175     BerElement *ber;
176     ber_tag_t tag;
177     ber_len_t len;
178     epoch_t epoch;
179     int pause;
180 
181     if ( !IS_ALIVE( c, c_live ) ) {
182         event_del( c->c_read_event );
183         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
184                 "suspended read event on a dead connid=%lu\n",
185                 c->c_connid );
186         return;
187     }
188 
189     if ( what & EV_TIMEOUT ) {
190         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
191                 "connid=%lu, timeout reached, destroying\n",
192                 c->c_connid );
193         /* Make sure the connection stays around for us to unlock it */
194         epoch = epoch_join();
195         CONNECTION_LOCK_DESTROY(c);
196         epoch_leave( epoch );
197         return;
198     }
199 
200     if ( !acquire_ref( &c->c_refcnt ) ) {
201         return;
202     }
203     epoch = epoch_join();
204 
205     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
206             "connection connid=%lu ready to read\n",
207             c->c_connid );
208 
209     ber = c->c_currentber;
210     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
211         Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
212                 "connid=%lu, ber_alloc failed\n",
213                 c->c_connid );
214         goto out;
215     }
216     c->c_currentber = ber;
217 
218     checked_lock( &c->c_io_mutex );
219     assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
220     tag = ber_get_next( c->c_sb, &len, ber );
221     pause = c->c_io_state & LLOAD_C_READ_PAUSE;
222     checked_unlock( &c->c_io_mutex );
223 
224     if ( tag != LDAP_TAG_MESSAGE ) {
225         int err = sock_errno();
226 
227         if ( err != EWOULDBLOCK && err != EAGAIN ) {
228             if ( err || tag == LBER_ERROR ) {
229                 char ebuf[128];
230                 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
231                         "ber_get_next on fd=%d failed errno=%d (%s)\n",
232                         c->c_fd, err,
233                         sock_errstr( err, ebuf, sizeof(ebuf) ) );
234             } else {
235                 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
236                         "ber_get_next on fd=%d connid=%lu received "
237                         "a strange PDU tag=%lx\n",
238                         c->c_fd, c->c_connid, tag );
239             }
240 
241             c->c_currentber = NULL;
242             ber_free( ber, 1 );
243 
244             event_del( c->c_read_event );
245             Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
246                     "suspended read event on dying connid=%lu\n",
247                     c->c_connid );
248             CONNECTION_LOCK_DESTROY(c);
249             goto out;
250         }
251         if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
252             event_add( c->c_read_event, c->c_read_timeout );
253             Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
254                     "re-enabled read event on connid=%lu\n",
255                     c->c_connid );
256         }
257         goto out;
258     }
259 
260     checked_lock( &c->c_io_mutex );
261     c->c_io_state |= LLOAD_C_READ_HANDOVER;
262     checked_unlock( &c->c_io_mutex );
263     event_del( c->c_read_event );
264 
265     if ( !lload_conn_max_pdus_per_cycle ||
266             ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
267         /* If we're overloaded or configured as such, process one and resume in
268          * the next cycle. */
269         int rc = c->c_pdu_cb( c );
270 
271         checked_lock( &c->c_io_mutex );
272         c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
273         if ( rc == LDAP_SUCCESS &&
274                 ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
275                         !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
276             event_add( c->c_read_event, c->c_read_timeout );
277         }
278         checked_unlock( &c->c_io_mutex );
279         goto out;
280     }
281 
282     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
283             "suspended read event on connid=%lu\n",
284             c->c_connid );
285 
286     /*
287      * We have scheduled a call to handle_pdus to take care of handling this
288      * and further requests, its reference is now owned by that task.
289      */
290     epoch_leave( epoch );
291     return;
292 
293 out:
294     RELEASE_REF( c, c_refcnt, c->c_destroy );
295     epoch_leave( epoch );
296 }
297 
298 void
connection_write_cb(evutil_socket_t s,short what,void * arg)299 connection_write_cb( evutil_socket_t s, short what, void *arg )
300 {
301     LloadConnection *c = arg;
302     epoch_t epoch;
303 
304     Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
305             "considering writing to%s connid=%lu what=%hd\n",
306             c->c_live ? " live" : " dead", c->c_connid, what );
307     if ( !IS_ALIVE( c, c_live ) ) {
308         return;
309     }
310 
311     if ( what & EV_TIMEOUT ) {
312         Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
313                 "connid=%lu, timeout reached, destroying\n",
314                 c->c_connid );
315         /* Make sure the connection stays around for us to unlock it */
316         epoch = epoch_join();
317         CONNECTION_LOCK_DESTROY(c);
318         epoch_leave( epoch );
319         return;
320     }
321 
322     /* Before we acquire any locks */
323     event_del( c->c_write_event );
324 
325     if ( !acquire_ref( &c->c_refcnt ) ) {
326         return;
327     }
328 
329     /* If what == 0, we have a caller as opposed to being a callback */
330     if ( what ) {
331         epoch = epoch_join();
332     }
333 
334     checked_lock( &c->c_io_mutex );
335     Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
336             "have something to write to connection connid=%lu\n",
337             c->c_connid );
338 
339     /* We might have been beaten to flushing the data by another thread */
340     if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
341         int err = sock_errno();
342 
343         if ( err != EWOULDBLOCK && err != EAGAIN ) {
344             char ebuf[128];
345             checked_unlock( &c->c_io_mutex );
346             Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
347                     "ber_flush on fd=%d failed errno=%d (%s)\n",
348                     c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
349             CONNECTION_LOCK_DESTROY(c);
350             goto done;
351         }
352 
353         if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
354             Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
355                     "connection connid=%lu blocked on writing, marking "
356                     "paused\n",
357                     c->c_connid );
358         }
359         c->c_io_state |= LLOAD_C_READ_PAUSE;
360 
361         /* TODO: Do not reset write timeout unless we wrote something */
362         event_add( c->c_write_event, lload_write_timeout );
363     } else {
364         c->c_pendingber = NULL;
365         if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
366             c->c_io_state ^= LLOAD_C_READ_PAUSE;
367             Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
368                     "Unpausing connection connid=%lu\n",
369                     c->c_connid );
370             if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
371                 event_add( c->c_read_event, c->c_read_timeout );
372             }
373         }
374     }
375     checked_unlock( &c->c_io_mutex );
376 
377 done:
378     RELEASE_REF( c, c_refcnt, c->c_destroy );
379     if ( what ) {
380         epoch_leave( epoch );
381     }
382 }
383 
384 void
connection_destroy(LloadConnection * c)385 connection_destroy( LloadConnection *c )
386 {
387     assert( c );
388     Debug( LDAP_DEBUG_CONNS, "connection_destroy: "
389             "destroying connection connid=%lu\n",
390             c->c_connid );
391 
392     CONNECTION_ASSERT_LOCKED(c);
393     assert( c->c_live == 0 );
394     assert( c->c_refcnt == 0 );
395     assert( c->c_state == LLOAD_C_INVALID );
396 
397     ber_sockbuf_free( c->c_sb );
398 
399     if ( c->c_currentber ) {
400         ber_free( c->c_currentber, 1 );
401         c->c_currentber = NULL;
402     }
403     if ( c->c_pendingber ) {
404         ber_free( c->c_pendingber, 1 );
405         c->c_pendingber = NULL;
406     }
407 
408     if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
409         ber_memfree( c->c_sasl_bind_mech.bv_val );
410         BER_BVZERO( &c->c_sasl_bind_mech );
411     }
412 #ifdef HAVE_CYRUS_SASL
413     if ( c->c_sasl_defaults ) {
414         lutil_sasl_freedefs( c->c_sasl_defaults );
415         c->c_sasl_defaults = NULL;
416     }
417     if ( c->c_sasl_authctx ) {
418 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */
419         if ( c->c_sasl_cbinding ) {
420             ch_free( c->c_sasl_cbinding );
421         }
422 #endif
423         sasl_dispose( &c->c_sasl_authctx );
424     }
425 #endif /* HAVE_CYRUS_SASL */
426 
427     CONNECTION_UNLOCK(c);
428 
429     ldap_pvt_thread_mutex_destroy( &c->c_io_mutex );
430     ldap_pvt_thread_mutex_destroy( &c->c_mutex );
431 
432     ch_free( c );
433 
434     listeners_reactivate();
435 }
436 
/*
 * Called holding mutex, will walk cq calling cb on all connections whose
 * c_connid <= cq_last->c_connid that still exist at the time we get to them.
 *
 * cq_mutex is released around each call to cb( c, arg ) and re-acquired
 * afterwards, it is held again whenever we return. A reference on the
 * connection is held for the duration of the callback; connections we cannot
 * get a reference on are skipped. A non-zero return from cb ends the walk.
 */
void
connections_walk_last(
        ldap_pvt_thread_mutex_t *cq_mutex,
        lload_c_head *cq,
        LloadConnection *cq_last,
        CONNCB cb,
        void *arg )
{
    LloadConnection *c = cq_last;
    uintptr_t last_connid;

    /* Nothing to do on an empty queue, cq_last is not looked at then */
    if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
        return;
    }
    assert_locked( cq_mutex );

    /* Upper bound for the walk, start with the element following cq_last */
    last_connid = c->c_connid;
    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );

    /* Find the first connection we can get a reference on, giving up once we
     * reach one whose connid is at or past the upper bound */
    while ( !acquire_ref( &c->c_refcnt ) ) {
        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
        if ( c->c_connid >= last_connid ) {
            assert_locked( cq_mutex );
            return;
        }
    }

    /*
     * Notes:
     * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
     *   order
     * - the connection with the highest c_connid is passed in cq_last
     * - we can only use cq when we hold cq_mutex
     * - connections might be added to or removed from cq while we're busy
     *   processing connections
     * - we need a way to detect we've finished looping around cq for some
     *   definition of looping around
     */
    do {
        int rc;

        /* The callback runs without cq_mutex, our reference is what we hold
         * on c in the meantime */
        checked_unlock( cq_mutex );

        rc = cb( c, arg );
        RELEASE_REF( c, c_refcnt, c->c_destroy );

        checked_lock( cq_mutex );
        if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
            break;
        }

        /* Move on to the next connection we can reference; connids that stop
         * increasing mean we have wrapped around, connids above last_connid
         * are past the upper bound - we are finished in either case */
        do {
            LloadConnection *old = c;
            c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
            if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
                assert_locked( cq_mutex );
                return;
            }
        } while ( !acquire_ref( &c->c_refcnt ) );
    } while ( c->c_connid <= last_connid );
    assert_locked( cq_mutex );
}
503 
504 void
connections_walk(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,CONNCB cb,void * arg)505 connections_walk(
506         ldap_pvt_thread_mutex_t *cq_mutex,
507         lload_c_head *cq,
508         CONNCB cb,
509         void *arg )
510 {
511     LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
512     return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
513 }
514 
515 int
lload_connection_close(LloadConnection * c,void * arg)516 lload_connection_close( LloadConnection *c, void *arg )
517 {
518     int unlock, gentle = *(int *)arg;
519     LloadOperation *op;
520 
521     Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
522             "marking connection connid=%lu closing\n",
523             c->c_connid );
524 
525     /* We were approached from the connection list or cn=monitor */
526     assert( IS_ALIVE( c, c_refcnt ) );
527 
528     /* Need to acquire this first, even if we won't need it */
529     unlock = 1;
530     checked_lock( &c->c_io_mutex );
531     CONNECTION_LOCK(c);
532 
533     /* Only if it's a usable client */
534     if ( ( c->c_state == LLOAD_C_READY || c->c_state == LLOAD_C_BINDING ) &&
535             c->c_destroy == client_destroy ) {
536         if ( c->c_pendingber != NULL ||
537                 (c->c_pendingber = ber_alloc()) != NULL ) {
538             ber_printf( c->c_pendingber, "t{tit{esss}}", LDAP_TAG_MESSAGE,
539                     LDAP_TAG_MSGID, LDAP_RES_UNSOLICITED,
540                     LDAP_RES_EXTENDED, LDAP_UNAVAILABLE, "",
541                     "connection closing", LDAP_NOTICE_OF_DISCONNECTION );
542             unlock = 0;
543             checked_unlock( &c->c_io_mutex );
544             CONNECTION_UNLOCK(c);
545             connection_write_cb( -1, 0, c );
546             CONNECTION_LOCK(c);
547         }
548     }
549     if ( unlock )
550         checked_unlock( &c->c_io_mutex );
551 
552     if ( !gentle || !c->c_ops ) {
553         CONNECTION_DESTROY(c);
554         return LDAP_SUCCESS;
555     }
556 
557     /* The first thing we do is make sure we don't get new Operations in */
558     c->c_state = LLOAD_C_CLOSING;
559 
560     do {
561         TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT );
562         op = node->avl_data;
563 
564         /* Close operations that would need client action to resolve,
565          * only SASL binds in progress do that right now */
566         if ( op->o_client_msgid || op->o_upstream_msgid ) {
567             break;
568         }
569 
570         CONNECTION_UNLOCK(c);
571         operation_unlink( op );
572         CONNECTION_LOCK(c);
573     } while ( c->c_ops );
574 
575     CONNECTION_UNLOCK(c);
576     return LDAP_SUCCESS;
577 }
578 
579 LloadConnection *
lload_connection_init(ber_socket_t s,const char * peername,int flags)580 lload_connection_init( ber_socket_t s, const char *peername, int flags )
581 {
582     LloadConnection *c;
583 
584     assert( peername != NULL );
585 
586     if ( s == AC_SOCKET_INVALID ) {
587         Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
588                 "init of socket fd=%ld invalid\n",
589                 (long)s );
590         return NULL;
591     }
592 
593     assert( s >= 0 );
594 
595     c = ch_calloc( 1, sizeof(LloadConnection) );
596 
597     c->c_fd = s;
598     c->c_sb = ber_sockbuf_alloc();
599     ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
600 
601 #ifdef LDAP_PF_LOCAL
602     if ( flags & CONN_IS_IPC ) {
603 #ifdef LDAP_DEBUG
604         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
605                 LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" );
606 #endif
607         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
608                 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
609     } else
610 #endif /* LDAP_PF_LOCAL */
611     {
612 #ifdef LDAP_DEBUG
613         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
614                 LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" );
615 #endif
616         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
617                 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
618     }
619 
620 #ifdef LDAP_DEBUG
621     ber_sockbuf_add_io(
622             c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" );
623 #endif
624 
625     c->c_next_msgid = 1;
626     c->c_refcnt = c->c_live = 1;
627     c->c_destroy = connection_destroy;
628 
629     LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
630 
631     ldap_pvt_thread_mutex_init( &c->c_mutex );
632     ldap_pvt_thread_mutex_init( &c->c_io_mutex );
633 
634     lload_connection_assign_nextid( c );
635 
636     Debug( LDAP_DEBUG_CONNS, "lload_connection_init: "
637             "connection connid=%lu allocated for socket fd=%d peername=%s\n",
638             c->c_connid, s, peername );
639 
640     c->c_state = LLOAD_C_ACTIVE;
641 
642     return c;
643 }
644