1 /* message_queue.c - routines to maintain the per-connection lists
2  * of pending operations */
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5  *
6  * Copyright 2016-2021 The OpenLDAP Foundation.
7  * Portions Copyright 2016 Symas Corporation.
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted only as authorized by the OpenLDAP
12  * Public License.
13  *
14  * A copy of this license is available in the file LICENSE in the
15  * top-level directory of the distribution or, alternatively, at
16  * <http://www.OpenLDAP.org/license.html>.
17  */
18 
19 /* ACKNOWLEDGEMENTS:
20  * This work was developed by Symas Corporation
21  * based on back-meta module for inclusion in OpenLDAP Software.
22  * This work was sponsored by Ericsson. */
23 
24 #include "portable.h"
25 
26 #include <stdio.h>
27 
28 #include <ac/socket.h>
29 #include <ac/string.h>
30 #include <ac/time.h>
31 
32 #include "lutil.h"
33 #include "slap.h"
34 #include "../back-ldap/back-ldap.h"
35 #include "back-asyncmeta.h"
36 #include "../../../libraries/liblber/lber-int.h"
37 #include "lutil.h"
38 
39 
/*
 * Node of a singly linked list: one opaque pointer slot followed by the
 * link to the next node.
 * NOTE(review): no function in the visible part of this file refers to
 * this type - check whether it is still needed.
 */
typedef struct listptr listptr;
struct listptr {
	void *reserved;
	listptr *next;
};
44 
/*
 * Head of a listptr chain: pointer to the first node plus an integer
 * counter.
 * NOTE(review): cnt presumably tracks the number of nodes on the list;
 * nothing in the visible part of this file reads or writes it - confirm.
 */
typedef struct listhead listhead;
struct listhead {
	struct listptr *list;
	int cnt;
};
49 
/*
 * Default for LH_MAX; a value supplied earlier (for example on the
 * compiler command line) takes precedence.
 * NOTE(review): not used by any function in the visible part of this
 * file - presumably a size limit for the listhead lists; confirm.
 */
#if !defined(LH_MAX)
#define LH_MAX	16
#endif
53 
/*
 * Detach and dispose of a slab memory context.
 *
 * threadctx: thread context whose current memory context is reset to NULL
 * memctx:    memory context handed to slap_sl_mem_destroy()
 *
 * NOTE(review): (void *)1 looks like a plain non-NULL flag value rather
 * than a real pointer - confirm against slap_sl_mem_destroy().
 */
static void
asyncmeta_memctx_put( void *threadctx, void *memctx )
{
	/* Clear the thread's context first, then dispose of the context. */
	slap_sl_mem_setctx( threadctx, NULL );

	slap_sl_mem_destroy( (void *)1, memctx );
}
59 
/*
 * Allocate and initialise a bm_context_t for an operation.
 *
 * op:       operation the context is built for; all memory is taken from
 *           the operation's temporary allocator (op->o_tmpmemctx)
 * rs:       not referenced by this function
 * new_bc:   receives the newly allocated context
 * ntargets: number of elements in the per-target arrays
 * mi:       backend info; mi->mi_targets[0..ntargets-1] supply the
 *           initial retry counts
 *
 * For compare and modrdn requests the assertion / new-superior structures
 * referenced by op are replaced with shallow copies taken from the same
 * allocator.  The by-value snapshot stored in copy_op is taken before
 * those pointers are replaced, so it still refers to the originals.
 * NOTE(review): the copies are presumably made so that the request data
 * stays valid after the caller's own storage goes away - confirm.
 *
 * Allocation results are used without being checked.
 *
 * Returns LDAP_SUCCESS in all cases.
 */
int asyncmeta_new_bm_context(Operation *op,
			     SlapReply *rs,
			     bm_context_t **new_bc,
			     int ntargets,
			     a_metainfo_t	*mi)
{
	bm_context_t *ctx;
	int t;

	ctx = op->o_tmpcalloc( 1, sizeof( *ctx ), op->o_tmpmemctx );
	*new_bc = ctx;

	ctx->op = op;
	ctx->copy_op = *op;
	ctx->candidates = op->o_tmpcalloc( ntargets, sizeof( SlapReply ), op->o_tmpmemctx );
	ctx->msgids = op->o_tmpcalloc( ntargets, sizeof( int ), op->o_tmpmemctx );
	ctx->nretries = op->o_tmpcalloc( ntargets, sizeof( int ), op->o_tmpmemctx );
	ctx->c_peer_name = op->o_conn->c_peer_name;
	ctx->is_root = be_isroot( op );

	if ( op->o_tag == LDAP_REQ_COMPARE ) {
		/* shallow copy of the attribute assertion */
		AttributeAssertion *ava_copy =
			op->o_tmpcalloc( 1, sizeof( AttributeAssertion ), op->o_tmpmemctx );

		*ava_copy = *op->orc_ava;
		op->orc_ava = ava_copy;
	} else if ( op->o_tag == LDAP_REQ_MODRDN ) {
		/* shallow copies of the (normalized) new superior, when present */
		if ( op->orr_newSup != NULL ) {
			struct berval *sup_copy =
				op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );

			*sup_copy = *op->orr_newSup;
			op->orr_newSup = sup_copy;
		}

		if ( op->orr_nnewSup != NULL ) {
			struct berval *nsup_copy =
				op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );

			*nsup_copy = *op->orr_nnewSup;
			op->orr_nnewSup = nsup_copy;
		}
	}

	/* per-target state: no message id yet, retry budget from the target */
	for ( t = 0; t < ntargets; t++ ) {
		ctx->msgids[t] = META_MSGID_UNDEFINED;
		ctx->nretries[t] = mi->mi_targets[t]->mt_nretries;
	}

	return LDAP_SUCCESS;
}
109 
/*
 * Release the request-specific data attached to an operation, then the
 * operation itself.
 *
 * op: operation to release; must not be NULL (asserted).  It must not be
 *     used by the caller afterwards, since it is passed to slap_op_free().
 *
 * Add requests free their modification list and entry; modify and modrdn
 * requests free their modification list.  Search, compare and delete
 * requests have nothing request-specific released here.  Any other tag
 * only produces a trace message.
 * NOTE(review): slap_mods_free() is called with 0 for add and 1 for
 * modify/modrdn - presumably this controls whether the values are freed
 * as well; confirm against slap_mods_free().
 */
void asyncmeta_free_op(Operation *op)
{
	assert (op != NULL);
	switch (op->o_tag) {
	case LDAP_REQ_ADD:
		if ( op->ora_modlist != NULL ) {
			slap_mods_free(op->ora_modlist, 0 );
		}

		if ( op->ora_e != NULL ) {
			entry_free( op->ora_e );
		}
		break;
	case LDAP_REQ_MODIFY:
		if ( op->orm_modlist != NULL ) {
			slap_mods_free(op->orm_modlist, 1 );
		}
		break;
	case LDAP_REQ_MODRDN:
		if ( op->orr_modlist != NULL ) {
			slap_mods_free(op->orr_modlist, 1 );
		}
		break;
	case LDAP_REQ_SEARCH:
	case LDAP_REQ_COMPARE:
	case LDAP_REQ_DELETE:
		/* nothing request-specific is released for these */
		break;
	default:
		/* newline-terminated like the other trace messages in this file,
		 * so the output does not run into the next log line */
		Debug( LDAP_DEBUG_TRACE, "==> asyncmeta_free_op : other message type\n" );
		break;
	}

	/* finish the operation on its connection before freeing it */
	connection_op_finish( op );
	slap_op_free( op, op->o_threadctx );
}
147 
148 
149 
150 
/*
 * Tear down a bm_context_t together with the operation it belongs to.
 *
 * bc: context to clear
 *
 * Steps:
 *  1. If the context is attached to a connection that has backend info,
 *     free every non-NULL candidates[i].sr_text and reset it to NULL.
 *  2. If the operation's connection has c_conn_idx == -1, stop here.
 *     NOTE(review): -1 presumably marks a connection that must not be
 *     torn down through this path - confirm.
 *  3. Otherwise remember the operation's memory and thread contexts (op
 *     is released by asyncmeta_free_op() and cannot be read afterwards),
 *     yield the CPU for as long as op->o_bd still equals the o_bd held in
 *     bc->copy_op, then free the operation and dispose of the memory
 *     context.  Something outside this function has to change op->o_bd
 *     for the wait to end.
 */
void asyncmeta_clear_bm_context(bm_context_t *bc)
{
	Operation *op = bc->op;
	void *saved_thrctx, *saved_memctx;
	int t;

	if ( bc->bc_mc != NULL && bc->bc_mc->mc_info != NULL ) {
		for ( t = 0; t < bc->bc_mc->mc_info->mi_ntargets; t++ ) {
			if ( bc->candidates[ t ].sr_text == NULL ) {
				continue;
			}
			ch_free( (char *)bc->candidates[ t ].sr_text );
			bc->candidates[ t ].sr_text = NULL;
		}
	}

	if ( op->o_conn->c_conn_idx != -1 ) {
		/* capture both contexts before op is released */
		saved_memctx = op->o_tmpmemctx;
		saved_thrctx = op->o_threadctx;

		/* both sides are re-read on every iteration */
		while ( op->o_bd == bc->copy_op.o_bd ) {
			ldap_pvt_thread_yield();
		}

		asyncmeta_free_op( op );
		asyncmeta_memctx_put( saved_thrctx, saved_memctx );
	}
}
176 
/*
 * Append a context to the tail of a connection's pending-operation queue.
 *
 * mc: connection whose queue (mc_om_list) receives the context
 * bc: context to enqueue; must not be attached to a connection yet
 *     (asserted)
 *
 * The queue limit is mi_max_pending_ops from the connection's backend
 * info, or META_BACK_CFG_MAX_PENDING_OPS when that setting is 0.
 *
 * Returns LDAP_BUSY, leaving bc untouched, when the connection already
 * has at least that many pending operations.  Otherwise attaches bc to
 * mc, resets the memory context of the operation's thread context to
 * NULL, enqueues bc, increments the pending counter and returns
 * LDAP_SUCCESS.
 *
 * No locking is done here.
 * NOTE(review): callers presumably serialize access to mc - confirm.
 */
int asyncmeta_add_message_queue(a_metaconn_t *mc, bm_context_t *bc)
{
	a_metainfo_t *mi = mc->mc_info;
	int limit;

	if ( mi->mi_max_pending_ops == 0 ) {
		limit = META_BACK_CFG_MAX_PENDING_OPS;
	} else {
		limit = mi->mi_max_pending_ops;
	}

	Debug( LDAP_DEBUG_TRACE, "add_message_queue: mc %p, pending_ops %d, max_pending %d\n",
		mc, mc->pending_ops, limit );

	assert( bc->bc_mc == NULL );

	if ( mc->pending_ops < limit ) {
		bc->bc_mc = mc;

		slap_sl_mem_setctx( bc->op->o_threadctx, NULL );
		LDAP_STAILQ_INSERT_TAIL( &mc->mc_om_list, bc, bc_next );
		mc->pending_ops++;
		return LDAP_SUCCESS;
	}

	return LDAP_BUSY;
}
196 
197 
198 void
asyncmeta_drop_bc(a_metaconn_t * mc,bm_context_t * bc)199 asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
200 {
201 	bm_context_t *om;
202 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
203 		if (om == bc) {
204 			LDAP_STAILQ_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
205 			mc->pending_ops--;
206 			break;
207 		}
208 	}
209 	assert(om == bc);
210 	assert(bc->bc_mc == mc);
211 }
212 
213 
214 bm_context_t *
asyncmeta_find_message(ber_int_t msgid,a_metaconn_t * mc,int candidate)215 asyncmeta_find_message(ber_int_t msgid, a_metaconn_t *mc, int candidate)
216 {
217 	bm_context_t *om;
218 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
219 		if (om->candidates[candidate].sr_msgid == msgid && !om->bc_invalid) {
220 			break;
221 		}
222 	}
223 	return om;
224 }
225 
226 bm_context_t *
asyncmeta_bc_in_queue(a_metaconn_t * mc,bm_context_t * bc)227 asyncmeta_bc_in_queue(a_metaconn_t *mc, bm_context_t *bc)
228 {
229 	bm_context_t *om;
230 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
231 		if (om == bc) {
232 			return bc;
233 		}
234 	}
235 	return NULL;
236 }
237