1 /* message_queue.c - routines to maintain the per-connection lists
2 * of pending operations */
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5 *
6 * Copyright 2016-2021 The OpenLDAP Foundation.
7 * Portions Copyright 2016 Symas Corporation.
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted only as authorized by the OpenLDAP
12 * Public License.
13 *
14 * A copy of this license is available in the file LICENSE in the
15 * top-level directory of the distribution or, alternatively, at
16 * <http://www.OpenLDAP.org/license.html>.
17 */
18
19 /* ACKNOWLEDGEMENTS:
20 * This work was developed by Symas Corporation
21 * based on back-meta module for inclusion in OpenLDAP Software.
22 * This work was sponsored by Ericsson. */
23
24 #include "portable.h"
25
26 #include <stdio.h>
27
28 #include <ac/socket.h>
29 #include <ac/string.h>
30 #include <ac/time.h>
31
32 #include "lutil.h"
33 #include "slap.h"
34 #include "../back-ldap/back-ldap.h"
35 #include "back-asyncmeta.h"
36 #include "../../../libraries/liblber/lber-int.h"
37 #include "lutil.h"
38
39
/*
 * Node of a singly linked list.
 *
 * Nothing in the visible part of this file references this type.
 */
struct listptr {
	void *reserved;		/* opaque slot; not read or written in the visible code */
	struct listptr *next;	/* link to the following node */
};
typedef struct listptr listptr;
44
/*
 * Anchor for a list of listptr nodes: a pointer to a node plus an
 * integer counter.
 *
 * Nothing in the visible part of this file references this type.
 */
struct listhead {
	struct listptr *list;	/* node pointer (presumably the first node) */
	int cnt;		/* counter (presumably the number of nodes -- TODO confirm) */
};
typedef struct listhead listhead;
49
/*
 * Fallback value for LH_MAX; a definition made earlier (for example on
 * the compiler command line) takes precedence.  The macro is not used
 * in the visible part of this file.
 */
#if !defined(LH_MAX)
#define LH_MAX 16
#endif
53
/*
 * Dispose of a memory context.
 *
 * threadctx - thread context passed to slap_sl_mem_setctx() with a
 *             NULL memory context
 * memctx    - memory context passed to slap_sl_mem_destroy()
 */
static void
asyncmeta_memctx_put( void *threadctx, void *memctx )
{
	/* first clear the thread context's memory context ... */
	slap_sl_mem_setctx( threadctx, NULL );

	/*
	 * ... then destroy the context itself.
	 * NOTE(review): the non-NULL key (void *)1 presumably requests a
	 * real destroy rather than a reset -- confirm in sl_malloc.c.
	 */
	slap_sl_mem_destroy( (void *)1, memctx );
}
59
/*
 * Create the per-operation context (bm_context_t) for an operation.
 *
 * op       - the operation; every allocation here goes through
 *            op->o_tmpcalloc / op->o_tmpalloc with op->o_tmpmemctx
 * rs       - not used by this function
 * new_bc   - out: receives the newly allocated context
 * ntargets - length of the per-target arrays (candidates, msgids,
 *            nretries); mi->mi_targets[0 .. ntargets-1] are read
 * mi       - backend info supplying each target's mt_nretries
 *
 * Always returns LDAP_SUCCESS; allocation results are used unchecked.
 *
 * Side effect on op: for compare requests op->orc_ava, and for modrdn
 * requests op->orr_newSup / op->orr_nnewSup (when set), are replaced by
 * shallow copies allocated from op->o_tmpmemctx.
 */
int
asyncmeta_new_bm_context( Operation *op,
	SlapReply *rs,
	bm_context_t **new_bc,
	int ntargets,
	a_metainfo_t *mi )
{
	bm_context_t *bc;
	int t;

	bc = op->o_tmpcalloc( 1, sizeof( bm_context_t ), op->o_tmpmemctx );
	*new_bc = bc;

	bc->op = op;
	/* snapshot of the operation, taken before the pointers below are swapped */
	bc->copy_op = *op;
	bc->candidates = op->o_tmpcalloc( ntargets, sizeof( SlapReply ), op->o_tmpmemctx );
	bc->msgids = op->o_tmpcalloc( ntargets, sizeof( int ), op->o_tmpmemctx );
	bc->nretries = op->o_tmpcalloc( ntargets, sizeof( int ), op->o_tmpmemctx );
	bc->c_peer_name = op->o_conn->c_peer_name;
	bc->is_root = be_isroot( op );

	if ( op->o_tag == LDAP_REQ_COMPARE ) {
		/* give op its own (shallow) copy of the assertion */
		AttributeAssertion *ava_copy =
			op->o_tmpcalloc( 1, sizeof( AttributeAssertion ), op->o_tmpmemctx );

		*ava_copy = *op->orc_ava;
		op->orc_ava = ava_copy;

	} else if ( op->o_tag == LDAP_REQ_MODRDN ) {
		/* give op its own berval structs for the new superior, if any */
		if ( op->orr_newSup != NULL ) {
			struct berval *sup_copy =
				op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );

			*sup_copy = *op->orr_newSup;
			op->orr_newSup = sup_copy;
		}

		if ( op->orr_nnewSup != NULL ) {
			struct berval *nsup_copy =
				op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );

			*nsup_copy = *op->orr_nnewSup;
			op->orr_nnewSup = nsup_copy;
		}
	}

	/* per-target defaults: no message id yet, retry budget from the target */
	for ( t = 0; t < ntargets; t++ ) {
		bc->msgids[t] = META_MSGID_UNDEFINED;
		bc->nretries[t] = mi->mi_targets[t]->mt_nretries;
	}

	return LDAP_SUCCESS;
}
109
/*
 * Release the request-specific data attached to an operation, then
 * finish and free the operation itself.
 *
 * op - operation to dispose of; must not be NULL (asserted).  It is
 *      passed to slap_op_free() at the end, so the caller must not
 *      use it afterwards.
 *
 * Per request type:
 *   add    - frees ora_modlist (slap_mods_free with 0) and ora_e
 *   modify - frees orm_modlist (slap_mods_free with 1)
 *   modrdn - frees orr_modlist (slap_mods_free with 1)
 *   search, compare, delete - nothing request-specific
 *   anything else - only a trace message is logged
 *
 * NOTE(review): the second slap_mods_free() argument is presumably the
 * "free values" flag -- confirm why add passes 0.
 */
void
asyncmeta_free_op( Operation *op )
{
	assert( op != NULL );

	switch ( op->o_tag ) {
	case LDAP_REQ_ADD:
		if ( op->ora_modlist != NULL ) {
			slap_mods_free( op->ora_modlist, 0 );
		}
		if ( op->ora_e != NULL ) {
			entry_free( op->ora_e );
		}
		break;

	case LDAP_REQ_MODIFY:
		if ( op->orm_modlist != NULL ) {
			slap_mods_free( op->orm_modlist, 1 );
		}
		break;

	case LDAP_REQ_MODRDN:
		if ( op->orr_modlist != NULL ) {
			slap_mods_free( op->orr_modlist, 1 );
		}
		break;

	case LDAP_REQ_SEARCH:
	case LDAP_REQ_COMPARE:
	case LDAP_REQ_DELETE:
		/* nothing request-specific to release */
		break;

	default:
		/* terminate the message so the next log entry starts on its own line */
		Debug( LDAP_DEBUG_TRACE, "==> asyncmeta_free_op : other message type\n" );
		break;
	}

	connection_op_finish( op );
	slap_op_free( op, op->o_threadctx );
}
147
148
149
150
/*
 * Tear down a bm_context_t together with the operation it belongs to.
 *
 * Steps, in this order:
 *   1. If bc is attached to a connection that has backend info, free
 *      every non-NULL candidates[i].sr_text with ch_free() and reset it
 *      to NULL.
 *   2. If the operation's connection has c_conn_idx == -1, stop here.
 *      NOTE(review): presumably such connections are not owned by this
 *      code -- confirm.
 *   3. Yield until op->o_bd no longer equals the value recorded in
 *      bc->copy_op.o_bd.  Nothing in this function changes o_bd, so
 *      the change has to come from elsewhere.
 *   4. Free the operation and dispose of its memory context.
 *
 * bc was allocated from op->o_tmpmemctx by asyncmeta_new_bm_context(),
 * so it presumably must not be touched once this function returns.
 */
void
asyncmeta_clear_bm_context( bm_context_t *bc )
{
	Operation *op = bc->op;
	void *saved_thrctx, *saved_memctx;
	int t;

	if ( bc->bc_mc != NULL && bc->bc_mc->mc_info != NULL ) {
		for ( t = 0; t < bc->bc_mc->mc_info->mi_ntargets; t++ ) {
			if ( bc->candidates[t].sr_text == NULL ) {
				continue;
			}
			ch_free( (char *)bc->candidates[t].sr_text );
			bc->candidates[t].sr_text = NULL;
		}
	}

	if ( op->o_conn->c_conn_idx == -1 ) {
		return;
	}

	/* remember both contexts now: asyncmeta_free_op() frees op */
	saved_memctx = op->o_tmpmemctx;
	saved_thrctx = op->o_threadctx;

	while ( op->o_bd == bc->copy_op.o_bd ) {
		ldap_pvt_thread_yield();
	}

	asyncmeta_free_op( op );
	asyncmeta_memctx_put( saved_thrctx, saved_memctx );
}
176
/*
 * Append a context to the connection's queue of pending operations.
 *
 * mc - connection whose mc_om_list receives the context
 * bc - context to enqueue; bc->bc_mc must still be NULL (asserted)
 *
 * Returns LDAP_BUSY, leaving bc untouched, when mc->pending_ops has
 * reached the limit; otherwise links bc to mc, appends it to the
 * queue, increments mc->pending_ops and returns LDAP_SUCCESS.
 *
 * The limit is mi_max_pending_ops, or META_BACK_CFG_MAX_PENDING_OPS
 * when that setting is 0.
 *
 * No locking is done here.
 * NOTE(review): callers presumably hold whatever lock protects
 * mc_om_list -- confirm.
 */
int
asyncmeta_add_message_queue( a_metaconn_t *mc, bm_context_t *bc )
{
	a_metainfo_t *mi = mc->mc_info;
	int max_pending_ops = mi->mi_max_pending_ops;

	if ( max_pending_ops == 0 ) {
		max_pending_ops = META_BACK_CFG_MAX_PENDING_OPS;
	}

	/* %p requires a void * argument, hence the cast */
	Debug( LDAP_DEBUG_TRACE, "add_message_queue: mc %p, pending_ops %d, max_pending %d\n",
		(void *)mc, mc->pending_ops, max_pending_ops );

	assert( bc->bc_mc == NULL );
	if ( mc->pending_ops >= max_pending_ops ) {
		return LDAP_BUSY;
	}
	bc->bc_mc = mc;

	/* clear the memory context set on the operation's thread context */
	slap_sl_mem_setctx( bc->op->o_threadctx, NULL );
	LDAP_STAILQ_INSERT_TAIL( &mc->mc_om_list, bc, bc_next );
	mc->pending_ops++;

	return LDAP_SUCCESS;
}
196
197
198 void
asyncmeta_drop_bc(a_metaconn_t * mc,bm_context_t * bc)199 asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
200 {
201 bm_context_t *om;
202 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
203 if (om == bc) {
204 LDAP_STAILQ_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
205 mc->pending_ops--;
206 break;
207 }
208 }
209 assert(om == bc);
210 assert(bc->bc_mc == mc);
211 }
212
213
214 bm_context_t *
asyncmeta_find_message(ber_int_t msgid,a_metaconn_t * mc,int candidate)215 asyncmeta_find_message(ber_int_t msgid, a_metaconn_t *mc, int candidate)
216 {
217 bm_context_t *om;
218 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
219 if (om->candidates[candidate].sr_msgid == msgid && !om->bc_invalid) {
220 break;
221 }
222 }
223 return om;
224 }
225
226 bm_context_t *
asyncmeta_bc_in_queue(a_metaconn_t * mc,bm_context_t * bc)227 asyncmeta_bc_in_queue(a_metaconn_t *mc, bm_context_t *bc)
228 {
229 bm_context_t *om;
230 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
231 if (om == bc) {
232 return bc;
233 }
234 }
235 return NULL;
236 }
237