1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3 *
4 * Copyright 1998-2021 The OpenLDAP Foundation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted only as authorized by the OpenLDAP
9 * Public License.
10 *
11 * A copy of this license is available in the file LICENSE in the
12 * top-level directory of the distribution or, alternatively, at
13 * <http://www.OpenLDAP.org/license.html>.
14 */
15 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
16 * All rights reserved.
17 *
18 * Redistribution and use in source and binary forms are permitted
19 * provided that this notice is preserved and that due credit is given
20 * to the University of Michigan at Ann Arbor. The name of the University
21 * may not be used to endorse or promote products derived from this
22 * software without specific prior written permission. This software
23 * is provided ``as is'' without express or implied warranty.
24 */
25
26 #include "portable.h"
27
28 #include <stdio.h>
29 #ifdef HAVE_LIMITS_H
30 #include <limits.h>
31 #endif
32
33 #include <ac/socket.h>
34 #include <ac/errno.h>
35 #include <ac/string.h>
36 #include <ac/time.h>
37 #include <ac/unistd.h>
38
39 #include "lload.h"
40
41 #include "lutil.h"
42 #include "lutil_ldap.h"
43
44 static unsigned long conn_nextid = 0;
45
46 static void
lload_connection_assign_nextid(LloadConnection * conn)47 lload_connection_assign_nextid( LloadConnection *conn )
48 {
49 conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
50 }
51
52 /*
53 * We start off with the connection muted and c_currentber holding the pdu we
54 * received.
55 *
56 * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
57 * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to
58 * maintain fairness and not hog the worker thread forever.
59 *
60 * If we've run out of pdus immediately available from the stream or hit the
61 * budget, we unmute the connection.
62 *
63 * c->c_pdu_cb might return an 'error' and not free the connection. That can
64 * happen when changing the state or when client is blocked on writing and
65 * already has a pdu pending on the same operation, it's their job to make sure
66 * we're woken up again.
67 */
68 void *
handle_pdus(void * ctx,void * arg)69 handle_pdus( void *ctx, void *arg )
70 {
71 LloadConnection *c = arg;
72 int pdus_handled = 0;
73 epoch_t epoch;
74
75 /* A reference was passed on to us */
76 assert( IS_ALIVE( c, c_refcnt ) );
77
78 epoch = epoch_join();
79 for ( ;; ) {
80 BerElement *ber;
81 ber_tag_t tag;
82 ber_len_t len;
83
84 if ( c->c_pdu_cb( c ) ) {
85 /* Error/reset, get rid ouf our reference and bail */
86 goto done;
87 }
88
89 if ( !IS_ALIVE( c, c_live ) ) {
90 break;
91 }
92
93 if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
94 /* Do not read now, re-enable read event instead */
95 break;
96 }
97
98 ber = c->c_currentber;
99 if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
100 Debug( LDAP_DEBUG_ANY, "handle_pdus: "
101 "connid=%lu, ber_alloc failed\n",
102 c->c_connid );
103 CONNECTION_LOCK_DESTROY(c);
104 goto done;
105 }
106 c->c_currentber = ber;
107
108 checked_lock( &c->c_io_mutex );
109 if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
110 (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
111 goto pause;
112 }
113 tag = ber_get_next( c->c_sb, &len, ber );
114 checked_unlock( &c->c_io_mutex );
115 if ( tag != LDAP_TAG_MESSAGE ) {
116 int err = sock_errno();
117
118 if ( err != EWOULDBLOCK && err != EAGAIN ) {
119 if ( err || tag == LBER_ERROR ) {
120 char ebuf[128];
121 Debug( LDAP_DEBUG_ANY, "handle_pdus: "
122 "ber_get_next on fd=%d failed errno=%d (%s)\n",
123 c->c_fd, err,
124 sock_errstr( err, ebuf, sizeof(ebuf) ) );
125 } else {
126 Debug( LDAP_DEBUG_STATS, "handle_pdus: "
127 "ber_get_next on fd=%d connid=%lu received "
128 "a strange PDU tag=%lx\n",
129 c->c_fd, c->c_connid, tag );
130 }
131
132 c->c_currentber = NULL;
133 ber_free( ber, 1 );
134 CONNECTION_LOCK_DESTROY(c);
135 goto done;
136 }
137 break;
138 }
139
140 assert( IS_ALIVE( c, c_refcnt ) );
141 epoch_leave( epoch );
142 epoch = epoch_join();
143 assert( IS_ALIVE( c, c_refcnt ) );
144 }
145
146 checked_lock( &c->c_io_mutex );
147 if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
148 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
149 event_add( c->c_read_event, c->c_read_timeout );
150 Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
151 "re-enabled read event on connid=%lu\n",
152 c->c_connid );
153 }
154 pause:
155 c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
156 checked_unlock( &c->c_io_mutex );
157
158 done:
159 RELEASE_REF( c, c_refcnt, c->c_destroy );
160 epoch_leave( epoch );
161 return NULL;
162 }
163
164 /*
165 * Initial read on the connection, if we get an LDAP PDU, submit the
166 * processing of this and successive ones to the work queue.
167 *
168 * If we can't submit it to the queue (overload), process this one and return
169 * to the event loop immediately after.
170 */
171 void
connection_read_cb(evutil_socket_t s,short what,void * arg)172 connection_read_cb( evutil_socket_t s, short what, void *arg )
173 {
174 LloadConnection *c = arg;
175 BerElement *ber;
176 ber_tag_t tag;
177 ber_len_t len;
178 epoch_t epoch;
179 int pause;
180
181 if ( !IS_ALIVE( c, c_live ) ) {
182 event_del( c->c_read_event );
183 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
184 "suspended read event on a dead connid=%lu\n",
185 c->c_connid );
186 return;
187 }
188
189 if ( what & EV_TIMEOUT ) {
190 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
191 "connid=%lu, timeout reached, destroying\n",
192 c->c_connid );
193 /* Make sure the connection stays around for us to unlock it */
194 epoch = epoch_join();
195 CONNECTION_LOCK_DESTROY(c);
196 epoch_leave( epoch );
197 return;
198 }
199
200 if ( !acquire_ref( &c->c_refcnt ) ) {
201 return;
202 }
203 epoch = epoch_join();
204
205 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
206 "connection connid=%lu ready to read\n",
207 c->c_connid );
208
209 ber = c->c_currentber;
210 if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
211 Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
212 "connid=%lu, ber_alloc failed\n",
213 c->c_connid );
214 goto out;
215 }
216 c->c_currentber = ber;
217
218 checked_lock( &c->c_io_mutex );
219 assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
220 tag = ber_get_next( c->c_sb, &len, ber );
221 pause = c->c_io_state & LLOAD_C_READ_PAUSE;
222 checked_unlock( &c->c_io_mutex );
223
224 if ( tag != LDAP_TAG_MESSAGE ) {
225 int err = sock_errno();
226
227 if ( err != EWOULDBLOCK && err != EAGAIN ) {
228 if ( err || tag == LBER_ERROR ) {
229 char ebuf[128];
230 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
231 "ber_get_next on fd=%d failed errno=%d (%s)\n",
232 c->c_fd, err,
233 sock_errstr( err, ebuf, sizeof(ebuf) ) );
234 } else {
235 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
236 "ber_get_next on fd=%d connid=%lu received "
237 "a strange PDU tag=%lx\n",
238 c->c_fd, c->c_connid, tag );
239 }
240
241 c->c_currentber = NULL;
242 ber_free( ber, 1 );
243
244 event_del( c->c_read_event );
245 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
246 "suspended read event on dying connid=%lu\n",
247 c->c_connid );
248 CONNECTION_LOCK_DESTROY(c);
249 goto out;
250 }
251 if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
252 event_add( c->c_read_event, c->c_read_timeout );
253 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
254 "re-enabled read event on connid=%lu\n",
255 c->c_connid );
256 }
257 goto out;
258 }
259
260 checked_lock( &c->c_io_mutex );
261 c->c_io_state |= LLOAD_C_READ_HANDOVER;
262 checked_unlock( &c->c_io_mutex );
263 event_del( c->c_read_event );
264
265 if ( !lload_conn_max_pdus_per_cycle ||
266 ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
267 /* If we're overloaded or configured as such, process one and resume in
268 * the next cycle. */
269 int rc = c->c_pdu_cb( c );
270
271 checked_lock( &c->c_io_mutex );
272 c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
273 if ( rc == LDAP_SUCCESS &&
274 ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
275 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
276 event_add( c->c_read_event, c->c_read_timeout );
277 }
278 checked_unlock( &c->c_io_mutex );
279 goto out;
280 }
281
282 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
283 "suspended read event on connid=%lu\n",
284 c->c_connid );
285
286 /*
287 * We have scheduled a call to handle_pdus to take care of handling this
288 * and further requests, its reference is now owned by that task.
289 */
290 epoch_leave( epoch );
291 return;
292
293 out:
294 RELEASE_REF( c, c_refcnt, c->c_destroy );
295 epoch_leave( epoch );
296 }
297
298 void
connection_write_cb(evutil_socket_t s,short what,void * arg)299 connection_write_cb( evutil_socket_t s, short what, void *arg )
300 {
301 LloadConnection *c = arg;
302 epoch_t epoch;
303
304 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
305 "considering writing to%s connid=%lu what=%hd\n",
306 c->c_live ? " live" : " dead", c->c_connid, what );
307 if ( !IS_ALIVE( c, c_live ) ) {
308 return;
309 }
310
311 if ( what & EV_TIMEOUT ) {
312 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
313 "connid=%lu, timeout reached, destroying\n",
314 c->c_connid );
315 /* Make sure the connection stays around for us to unlock it */
316 epoch = epoch_join();
317 CONNECTION_LOCK_DESTROY(c);
318 epoch_leave( epoch );
319 return;
320 }
321
322 /* Before we acquire any locks */
323 event_del( c->c_write_event );
324
325 if ( !acquire_ref( &c->c_refcnt ) ) {
326 return;
327 }
328
329 /* If what == 0, we have a caller as opposed to being a callback */
330 if ( what ) {
331 epoch = epoch_join();
332 }
333
334 checked_lock( &c->c_io_mutex );
335 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
336 "have something to write to connection connid=%lu\n",
337 c->c_connid );
338
339 /* We might have been beaten to flushing the data by another thread */
340 if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
341 int err = sock_errno();
342
343 if ( err != EWOULDBLOCK && err != EAGAIN ) {
344 char ebuf[128];
345 checked_unlock( &c->c_io_mutex );
346 Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
347 "ber_flush on fd=%d failed errno=%d (%s)\n",
348 c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
349 CONNECTION_LOCK_DESTROY(c);
350 goto done;
351 }
352
353 if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
354 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
355 "connection connid=%lu blocked on writing, marking "
356 "paused\n",
357 c->c_connid );
358 }
359 c->c_io_state |= LLOAD_C_READ_PAUSE;
360
361 /* TODO: Do not reset write timeout unless we wrote something */
362 event_add( c->c_write_event, lload_write_timeout );
363 } else {
364 c->c_pendingber = NULL;
365 if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
366 c->c_io_state ^= LLOAD_C_READ_PAUSE;
367 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
368 "Unpausing connection connid=%lu\n",
369 c->c_connid );
370 if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
371 event_add( c->c_read_event, c->c_read_timeout );
372 }
373 }
374 }
375 checked_unlock( &c->c_io_mutex );
376
377 done:
378 RELEASE_REF( c, c_refcnt, c->c_destroy );
379 if ( what ) {
380 epoch_leave( epoch );
381 }
382 }
383
384 void
connection_destroy(LloadConnection * c)385 connection_destroy( LloadConnection *c )
386 {
387 assert( c );
388 Debug( LDAP_DEBUG_CONNS, "connection_destroy: "
389 "destroying connection connid=%lu\n",
390 c->c_connid );
391
392 CONNECTION_ASSERT_LOCKED(c);
393 assert( c->c_live == 0 );
394 assert( c->c_refcnt == 0 );
395 assert( c->c_state == LLOAD_C_INVALID );
396
397 ber_sockbuf_free( c->c_sb );
398
399 if ( c->c_currentber ) {
400 ber_free( c->c_currentber, 1 );
401 c->c_currentber = NULL;
402 }
403 if ( c->c_pendingber ) {
404 ber_free( c->c_pendingber, 1 );
405 c->c_pendingber = NULL;
406 }
407
408 if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
409 ber_memfree( c->c_sasl_bind_mech.bv_val );
410 BER_BVZERO( &c->c_sasl_bind_mech );
411 }
412 #ifdef HAVE_CYRUS_SASL
413 if ( c->c_sasl_defaults ) {
414 lutil_sasl_freedefs( c->c_sasl_defaults );
415 c->c_sasl_defaults = NULL;
416 }
417 if ( c->c_sasl_authctx ) {
418 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */
419 if ( c->c_sasl_cbinding ) {
420 ch_free( c->c_sasl_cbinding );
421 }
422 #endif
423 sasl_dispose( &c->c_sasl_authctx );
424 }
425 #endif /* HAVE_CYRUS_SASL */
426
427 CONNECTION_UNLOCK(c);
428
429 ldap_pvt_thread_mutex_destroy( &c->c_io_mutex );
430 ldap_pvt_thread_mutex_destroy( &c->c_mutex );
431
432 ch_free( c );
433
434 listeners_reactivate();
435 }
436
437 /*
438 * Called holding mutex, will walk cq calling cb on all connections whose
439 * c_connid <= cq_last->c_connid that still exist at the time we get to them.
440 */
441 void
connections_walk_last(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,LloadConnection * cq_last,CONNCB cb,void * arg)442 connections_walk_last(
443 ldap_pvt_thread_mutex_t *cq_mutex,
444 lload_c_head *cq,
445 LloadConnection *cq_last,
446 CONNCB cb,
447 void *arg )
448 {
449 LloadConnection *c = cq_last;
450 uintptr_t last_connid;
451
452 if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
453 return;
454 }
455 assert_locked( cq_mutex );
456
457 last_connid = c->c_connid;
458 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
459
460 while ( !acquire_ref( &c->c_refcnt ) ) {
461 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
462 if ( c->c_connid >= last_connid ) {
463 assert_locked( cq_mutex );
464 return;
465 }
466 }
467
468 /*
469 * Notes:
470 * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
471 * order
472 * - the connection with the highest c_connid is passed in cq_last
473 * - we can only use cq when we hold cq_mutex
474 * - connections might be added to or removed from cq while we're busy
475 * processing connections
476 * - we need a way to detect we've finished looping around cq for some
477 * definition of looping around
478 */
479 do {
480 int rc;
481
482 checked_unlock( cq_mutex );
483
484 rc = cb( c, arg );
485 RELEASE_REF( c, c_refcnt, c->c_destroy );
486
487 checked_lock( cq_mutex );
488 if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
489 break;
490 }
491
492 do {
493 LloadConnection *old = c;
494 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
495 if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
496 assert_locked( cq_mutex );
497 return;
498 }
499 } while ( !acquire_ref( &c->c_refcnt ) );
500 } while ( c->c_connid <= last_connid );
501 assert_locked( cq_mutex );
502 }
503
504 void
connections_walk(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,CONNCB cb,void * arg)505 connections_walk(
506 ldap_pvt_thread_mutex_t *cq_mutex,
507 lload_c_head *cq,
508 CONNCB cb,
509 void *arg )
510 {
511 LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
512 return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
513 }
514
515 int
lload_connection_close(LloadConnection * c,void * arg)516 lload_connection_close( LloadConnection *c, void *arg )
517 {
518 int gentle = *(int *)arg;
519 LloadOperation *op;
520
521 Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
522 "marking connection connid=%lu closing\n",
523 c->c_connid );
524
525 /* We were approached from the connection list */
526 assert( IS_ALIVE( c, c_refcnt ) );
527
528 CONNECTION_LOCK(c);
529 if ( !gentle || !c->c_ops ) {
530 CONNECTION_DESTROY(c);
531 return LDAP_SUCCESS;
532 }
533
534 /* The first thing we do is make sure we don't get new Operations in */
535 c->c_state = LLOAD_C_CLOSING;
536
537 do {
538 TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT );
539 op = node->avl_data;
540
541 /* Close operations that would need client action to resolve,
542 * only SASL binds in progress do that right now */
543 if ( op->o_client_msgid || op->o_upstream_msgid ) {
544 break;
545 }
546
547 CONNECTION_UNLOCK(c);
548 operation_unlink( op );
549 CONNECTION_LOCK(c);
550 } while ( c->c_ops );
551
552 CONNECTION_UNLOCK(c);
553 return LDAP_SUCCESS;
554 }
555
556 LloadConnection *
lload_connection_init(ber_socket_t s,const char * peername,int flags)557 lload_connection_init( ber_socket_t s, const char *peername, int flags )
558 {
559 LloadConnection *c;
560
561 assert( peername != NULL );
562
563 if ( s == AC_SOCKET_INVALID ) {
564 Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
565 "init of socket fd=%ld invalid\n",
566 (long)s );
567 return NULL;
568 }
569
570 assert( s >= 0 );
571
572 c = ch_calloc( 1, sizeof(LloadConnection) );
573
574 c->c_fd = s;
575 c->c_sb = ber_sockbuf_alloc();
576 ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
577
578 #ifdef LDAP_PF_LOCAL
579 if ( flags & CONN_IS_IPC ) {
580 #ifdef LDAP_DEBUG
581 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
582 LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" );
583 #endif
584 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
585 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
586 } else
587 #endif /* LDAP_PF_LOCAL */
588 {
589 #ifdef LDAP_DEBUG
590 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
591 LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" );
592 #endif
593 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
594 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
595 }
596
597 #ifdef LDAP_DEBUG
598 ber_sockbuf_add_io(
599 c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" );
600 #endif
601
602 c->c_next_msgid = 1;
603 c->c_refcnt = c->c_live = 1;
604 c->c_destroy = connection_destroy;
605
606 LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
607
608 ldap_pvt_thread_mutex_init( &c->c_mutex );
609 ldap_pvt_thread_mutex_init( &c->c_io_mutex );
610
611 lload_connection_assign_nextid( c );
612
613 Debug( LDAP_DEBUG_CONNS, "lload_connection_init: "
614 "connection connid=%lu allocated for socket fd=%d peername=%s\n",
615 c->c_connid, s, peername );
616
617 c->c_state = LLOAD_C_ACTIVE;
618
619 return c;
620 }
621