1 /* $NetBSD: syncprov.c,v 1.3 2021/08/14 16:15:02 christos Exp $ */
2
3 /* $OpenLDAP$ */
4 /* syncprov.c - syncrepl provider */
5 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
6 *
7 * Copyright 2004-2021 The OpenLDAP Foundation.
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted only as authorized by the OpenLDAP
12 * Public License.
13 *
14 * A copy of this license is available in the file LICENSE in the
15 * top-level directory of the distribution or, alternatively, at
16 * <http://www.OpenLDAP.org/license.html>.
17 */
18 /* ACKNOWLEDGEMENTS:
19 * This work was initially developed by Howard Chu for inclusion in
20 * OpenLDAP Software.
21 */
22
23 #include <sys/cdefs.h>
24 __RCSID("$NetBSD: syncprov.c,v 1.3 2021/08/14 16:15:02 christos Exp $");
25
26 #include "portable.h"
27
28 #ifdef SLAPD_OVER_SYNCPROV
29
30 #include <ac/string.h>
31 #include "lutil.h"
32 #include "slap.h"
33 #include "slap-config.h"
34 #include "ldap_rq.h"
35
/* Development builds turn on the extra CSN validation guarded by CHECK_CSN */
#if defined( LDAP_DEVEL )
#define CHECK_CSN	1
#endif
39
40 /* A modify request on a particular entry */
41 typedef struct modinst {
42 struct modinst *mi_next;
43 Operation *mi_op;
44 } modinst;
45
46 typedef struct modtarget {
47 struct modinst *mt_mods;
48 struct modinst *mt_tail;
49 struct berval mt_dn;
50 ldap_pvt_thread_mutex_t mt_mutex;
51 } modtarget;
52
53 /* All the info of a psearch result that's shared between
54 * multiple queues
55 */
56 typedef struct resinfo {
57 struct syncres *ri_list;
58 Entry *ri_e;
59 struct berval ri_dn;
60 struct berval ri_ndn;
61 struct berval ri_uuid;
62 struct berval ri_csn;
63 struct berval ri_cookie;
64 char ri_isref;
65 ldap_pvt_thread_mutex_t ri_mutex;
66 } resinfo;
67
68 /* A queued result of a persistent search */
69 typedef struct syncres {
70 struct syncres *s_next; /* list of results on this psearch queue */
71 struct syncres *s_rilist; /* list of psearches using this result */
72 resinfo *s_info;
73 char s_mode;
74 } syncres;
75
76 /* Record of a persistent search */
77 typedef struct syncops {
78 struct syncops *s_next;
79 struct syncprov_info_t *s_si;
80 struct berval s_base; /* ndn of search base */
81 ID s_eid; /* entryID of search base */
82 Operation *s_op; /* search op */
83 int s_rid;
84 int s_sid;
85 struct berval s_filterstr;
86 int s_flags; /* search status */
87 #define PS_IS_REFRESHING 0x01
88 #define PS_IS_DETACHED 0x02
89 #define PS_WROTE_BASE 0x04
90 #define PS_FIND_BASE 0x08
91 #define PS_FIX_FILTER 0x10
92 #define PS_TASK_QUEUED 0x20
93
94 int s_inuse; /* reference count */
95 struct syncres *s_res;
96 struct syncres *s_restail;
97 void *s_pool_cookie;
98 ldap_pvt_thread_mutex_t s_mutex;
99 } syncops;
100
101 /* A received sync control */
102 typedef struct sync_control {
103 struct sync_cookie sr_state;
104 int sr_rhint;
105 } sync_control;
106
#if 0 /* moved back to slap.h */
#define	o_sync	o_ctrlflag[slap_cids.sc_LDAPsync]
#endif
/* o_sync_mode uses data bits of o_sync */
#define	o_sync_mode	o_ctrlflag[slap_cids.sc_LDAPsync]

/* LDAP sync modes shifted left by SLAP_CONTROL_SHIFT for storage in o_sync_mode */
#define	SLAP_SYNC_NONE			(LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT)
#define	SLAP_SYNC_REFRESH		(LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT)
#define	SLAP_SYNC_PERSIST		(LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT)
#define	SLAP_SYNC_REFRESH_AND_PERSIST	(LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT)
117
118 /* Record of which searches matched at premodify step */
119 typedef struct syncmatches {
120 struct syncmatches *sm_next;
121 syncops *sm_op;
122 } syncmatches;
123
124 /* Session log data */
125 typedef struct slog_entry {
126 struct berval se_uuid;
127 struct berval se_csn;
128 int se_sid;
129 ber_tag_t se_tag;
130 } slog_entry;
131
132 typedef struct sessionlog {
133 BerVarray sl_mincsn;
134 int *sl_sids;
135 int sl_numcsns;
136 int sl_num;
137 int sl_size;
138 int sl_playing;
139 TAvlnode *sl_entries;
140 ldap_pvt_thread_rdwr_t sl_mutex;
141 } sessionlog;
142
143 /* Accesslog callback data */
144 typedef struct syncprov_accesslog_deletes {
145 Operation *op;
146 SlapReply *rs;
147 sync_control *srs;
148 BerVarray ctxcsn;
149 int numcsns, *sids;
150 Avlnode *uuids;
151 BerVarray uuid_list;
152 int ndel, list_len;
153 char *uuid_buf;
154 } syncprov_accesslog_deletes;
155
156 /* The main state for this overlay */
157 typedef struct syncprov_info_t {
158 syncops *si_ops;
159 struct berval si_contextdn;
160 struct berval si_logbase;
161 BerVarray si_ctxcsn; /* ldapsync context */
162 int *si_sids;
163 int si_numcsns;
164 int si_chkops; /* checkpointing info */
165 int si_chktime;
166 int si_numops; /* number of ops since last checkpoint */
167 int si_nopres; /* Skip present phase */
168 int si_usehint; /* use reload hint */
169 int si_active; /* True if there are active mods */
170 int si_dirty; /* True if the context is dirty, i.e changes
171 * have been made without updating the csn. */
172 time_t si_chklast; /* time of last checkpoint */
173 Avlnode *si_mods; /* entries being modified */
174 sessionlog *si_logs;
175 ldap_pvt_thread_rdwr_t si_csn_rwlock;
176 ldap_pvt_thread_mutex_t si_ops_mutex;
177 ldap_pvt_thread_mutex_t si_mods_mutex;
178 ldap_pvt_thread_mutex_t si_resp_mutex;
179 } syncprov_info_t;
180
181 typedef struct opcookie {
182 slap_overinst *son;
183 syncmatches *smatches;
184 modtarget *smt;
185 Entry *se;
186 struct berval sdn; /* DN of entry, for deletes */
187 struct berval sndn;
188 struct berval suuid; /* UUID of entry */
189 struct berval sctxcsn;
190 short osid; /* sid of op csn */
191 short rsid; /* sid of relay */
192 short sreference; /* Is the entry a reference? */
193 syncres ssres;
194 } opcookie;
195
196 typedef struct fbase_cookie {
197 struct berval *fdn; /* DN of a modified entry, for scope testing */
198 syncops *fss; /* persistent search we're testing against */
199 int fbase; /* if TRUE we found the search base and it's still valid */
200 int fscope; /* if TRUE then fdn is within the psearch scope */
201 } fbase_cookie;
202
203 static AttributeName csn_anlist[3];
204 static AttributeName uuid_anlist[2];
205
206 static AttributeDescription *ad_reqType, *ad_reqResult, *ad_reqDN,
207 *ad_reqEntryUUID, *ad_minCSN, *ad_reqNewDN;
208
209 /* Build a LDAPsync intermediate state control */
210 static int
syncprov_state_ctrl(Operation * op,SlapReply * rs,Entry * e,int entry_sync_state,LDAPControl ** ctrls,int num_ctrls,int send_cookie,struct berval * cookie)211 syncprov_state_ctrl(
212 Operation *op,
213 SlapReply *rs,
214 Entry *e,
215 int entry_sync_state,
216 LDAPControl **ctrls,
217 int num_ctrls,
218 int send_cookie,
219 struct berval *cookie )
220 {
221 Attribute* a;
222 int ret;
223
224 BerElementBuffer berbuf;
225 BerElement *ber = (BerElement *)&berbuf;
226 LDAPControl *cp;
227 struct berval bv;
228 struct berval entryuuid_bv = BER_BVNULL;
229
230 ber_init2( ber, 0, LBER_USE_DER );
231 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
232
233 for ( a = e->e_attrs; a != NULL; a = a->a_next ) {
234 AttributeDescription *desc = a->a_desc;
235 if ( desc == slap_schema.si_ad_entryUUID ) {
236 entryuuid_bv = a->a_nvals[0];
237 break;
238 }
239 }
240
241 /* FIXME: what if entryuuid is NULL or empty ? */
242
243 if ( send_cookie && cookie ) {
244 ber_printf( ber, "{eOON}",
245 entry_sync_state, &entryuuid_bv, cookie );
246 } else {
247 ber_printf( ber, "{eON}",
248 entry_sync_state, &entryuuid_bv );
249 }
250
251 ret = ber_flatten2( ber, &bv, 0 );
252 if ( ret == 0 ) {
253 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx );
254 cp->ldctl_oid = LDAP_CONTROL_SYNC_STATE;
255 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
256 cp->ldctl_value.bv_val = (char *)&cp[1];
257 cp->ldctl_value.bv_len = bv.bv_len;
258 AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len );
259 ctrls[num_ctrls] = cp;
260 }
261 ber_free_buf( ber );
262
263 if ( ret < 0 ) {
264 Debug( LDAP_DEBUG_TRACE,
265 "slap_build_sync_ctrl: ber_flatten2 failed (%d)\n",
266 ret );
267 send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
268 return LDAP_OTHER;
269 }
270
271 return LDAP_SUCCESS;
272 }
273
274 /* Build a LDAPsync final state control */
275 static int
syncprov_done_ctrl(Operation * op,SlapReply * rs,LDAPControl ** ctrls,int num_ctrls,int send_cookie,struct berval * cookie,int refreshDeletes)276 syncprov_done_ctrl(
277 Operation *op,
278 SlapReply *rs,
279 LDAPControl **ctrls,
280 int num_ctrls,
281 int send_cookie,
282 struct berval *cookie,
283 int refreshDeletes )
284 {
285 int ret;
286 BerElementBuffer berbuf;
287 BerElement *ber = (BerElement *)&berbuf;
288 LDAPControl *cp;
289 struct berval bv;
290
291 ber_init2( ber, NULL, LBER_USE_DER );
292 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
293
294 ber_printf( ber, "{" );
295 if ( send_cookie && cookie ) {
296 ber_printf( ber, "O", cookie );
297 }
298 if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) {
299 ber_printf( ber, "b", refreshDeletes );
300 }
301 ber_printf( ber, "N}" );
302
303 ret = ber_flatten2( ber, &bv, 0 );
304 if ( ret == 0 ) {
305 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx );
306 cp->ldctl_oid = LDAP_CONTROL_SYNC_DONE;
307 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
308 cp->ldctl_value.bv_val = (char *)&cp[1];
309 cp->ldctl_value.bv_len = bv.bv_len;
310 AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len );
311 ctrls[num_ctrls] = cp;
312 }
313
314 ber_free_buf( ber );
315
316 if ( ret < 0 ) {
317 Debug( LDAP_DEBUG_TRACE,
318 "syncprov_done_ctrl: ber_flatten2 failed (%d)\n",
319 ret );
320 send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
321 return LDAP_OTHER;
322 }
323
324 return LDAP_SUCCESS;
325 }
326
327 static int
syncprov_sendinfo(Operation * op,SlapReply * rs,int type,struct berval * cookie,int refreshDone,BerVarray syncUUIDs,int refreshDeletes)328 syncprov_sendinfo(
329 Operation *op,
330 SlapReply *rs,
331 int type,
332 struct berval *cookie,
333 int refreshDone,
334 BerVarray syncUUIDs,
335 int refreshDeletes )
336 {
337 BerElementBuffer berbuf;
338 BerElement *ber = (BerElement *)&berbuf;
339 struct berval rspdata;
340
341 int ret;
342
343 ber_init2( ber, NULL, LBER_USE_DER );
344 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
345
346 if ( type ) {
347 switch ( type ) {
348 case LDAP_TAG_SYNC_NEW_COOKIE:
349 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: "
350 "sending a new cookie=%s\n",
351 op->o_log_prefix, cookie->bv_val );
352 ber_printf( ber, "tO", type, cookie );
353 break;
354 case LDAP_TAG_SYNC_REFRESH_DELETE:
355 case LDAP_TAG_SYNC_REFRESH_PRESENT:
356 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: "
357 "%s cookie=%s\n",
358 op->o_log_prefix,
359 type == LDAP_TAG_SYNC_REFRESH_DELETE ? "refreshDelete" : "refreshPresent",
360 cookie ? cookie->bv_val : "" );
361 ber_printf( ber, "t{", type );
362 if ( cookie ) {
363 ber_printf( ber, "O", cookie );
364 }
365 if ( refreshDone == 0 ) {
366 ber_printf( ber, "b", refreshDone );
367 }
368 ber_printf( ber, "N}" );
369 break;
370 case LDAP_TAG_SYNC_ID_SET:
371 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: "
372 "%s syncIdSet cookie=%s\n",
373 op->o_log_prefix, refreshDeletes ? "delete" : "present",
374 cookie ? cookie->bv_val : "" );
375 ber_printf( ber, "t{", type );
376 if ( cookie ) {
377 ber_printf( ber, "O", cookie );
378 }
379 if ( refreshDeletes == 1 ) {
380 ber_printf( ber, "b", refreshDeletes );
381 }
382 ber_printf( ber, "[W]", syncUUIDs );
383 ber_printf( ber, "N}" );
384 break;
385 default:
386 Debug( LDAP_DEBUG_TRACE,
387 "%s syncprov_sendinfo: invalid syncinfo type (%d)\n",
388 op->o_log_prefix, type );
389 return LDAP_OTHER;
390 }
391 }
392
393 ret = ber_flatten2( ber, &rspdata, 0 );
394
395 if ( ret < 0 ) {
396 Debug( LDAP_DEBUG_TRACE,
397 "syncprov_sendinfo: ber_flatten2 failed (%d)\n",
398 ret );
399 send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
400 return LDAP_OTHER;
401 }
402
403 rs->sr_rspoid = LDAP_SYNC_INFO;
404 rs->sr_rspdata = &rspdata;
405 send_ldap_intermediate( op, rs );
406 rs->sr_rspdata = NULL;
407 ber_free_buf( ber );
408
409 return LDAP_SUCCESS;
410 }
411
412 /* Find a modtarget in an AVL tree */
413 static int
sp_avl_cmp(const void * c1,const void * c2)414 sp_avl_cmp( const void *c1, const void *c2 )
415 {
416 const modtarget *m1, *m2;
417 int rc;
418
419 m1 = c1; m2 = c2;
420 rc = m1->mt_dn.bv_len - m2->mt_dn.bv_len;
421
422 if ( rc ) return rc;
423 return ber_bvcmp( &m1->mt_dn, &m2->mt_dn );
424 }
425
/* Order two UUIDs, each passed as a pointer to a struct berval */
static int
sp_uuid_cmp( const void *l, const void *r )
{
	const struct berval *lhs = l;
	const struct berval *rhs = r;
	int order = ber_bvcmp( lhs, rhs );

	return order;
}
433
434 static int
syncprov_sessionlog_cmp(const void * l,const void * r)435 syncprov_sessionlog_cmp( const void *l, const void *r )
436 {
437 const slog_entry *left = l, *right = r;
438 int ret = ber_bvcmp( &left->se_csn, &right->se_csn );
439 if ( !ret )
440 ret = ber_bvcmp( &left->se_uuid, &right->se_uuid );
441 /* Only time we have two modifications with same CSN is when we detect a
442 * rename during replication.
443 * We invert the test here because LDAP_REQ_MODDN is
444 * numerically greater than LDAP_REQ_MODIFY but we
445 * want it to occur first.
446 */
447 if ( !ret )
448 ret = right->se_tag - left->se_tag;
449
450 return ret;
451 }
452
453 /* syncprov_findbase:
454 * finds the true DN of the base of a search (with alias dereferencing) and
455 * checks to make sure the base entry doesn't get replaced with a different
456 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a
457 * change is detected, any persistent search on this base must be terminated /
458 * reloaded.
459 * On the first call, we just save the DN and entryID. On subsequent calls
460 * we compare the DN and entryID with the saved values.
461 */
462 static int
findbase_cb(Operation * op,SlapReply * rs)463 findbase_cb( Operation *op, SlapReply *rs )
464 {
465 slap_callback *sc = op->o_callback;
466
467 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
468 fbase_cookie *fc = sc->sc_private;
469
470 /* If no entryID, we're looking for the first time.
471 * Just store whatever we got.
472 */
473 if ( fc->fss->s_eid == NOID ) {
474 fc->fbase = 2;
475 fc->fss->s_eid = rs->sr_entry->e_id;
476 ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname );
477
478 } else if ( rs->sr_entry->e_id == fc->fss->s_eid &&
479 dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) {
480
481 /* OK, the DN is the same and the entryID is the same. */
482 fc->fbase = 1;
483 }
484 }
485 if ( rs->sr_err != LDAP_SUCCESS ) {
486 Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err );
487 }
488 return LDAP_SUCCESS;
489 }
490
491 static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL };
492 static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
493
494 static int
syncprov_findbase(Operation * op,fbase_cookie * fc)495 syncprov_findbase( Operation *op, fbase_cookie *fc )
496 {
497 /* Use basic parameters from syncrepl search, but use
498 * current op's threadctx / tmpmemctx
499 */
500 ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex );
501 if ( fc->fss->s_flags & PS_FIND_BASE ) {
502 slap_callback cb = {0};
503 Operation fop;
504 SlapReply frs = { REP_RESULT };
505 int rc;
506
507 fc->fss->s_flags ^= PS_FIND_BASE;
508 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
509
510 fop = *fc->fss->s_op;
511
512 fop.o_bd = fop.o_bd->bd_self;
513 fop.o_hdr = op->o_hdr;
514 fop.o_time = op->o_time;
515 fop.o_tincr = op->o_tincr;
516 fop.o_extra = op->o_extra;
517
518 cb.sc_response = findbase_cb;
519 cb.sc_private = fc;
520
521 fop.o_sync_mode = 0; /* turn off sync mode */
522 fop.o_managedsait = SLAP_CONTROL_CRITICAL;
523 fop.o_callback = &cb;
524 fop.o_tag = LDAP_REQ_SEARCH;
525 fop.ors_scope = LDAP_SCOPE_BASE;
526 fop.ors_limit = NULL;
527 fop.ors_slimit = 1;
528 fop.ors_tlimit = SLAP_NO_LIMIT;
529 fop.ors_attrs = slap_anlist_no_attrs;
530 fop.ors_attrsonly = 1;
531 fop.ors_filter = &generic_filter;
532 fop.ors_filterstr = generic_filterstr;
533
534 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findbase: searching\n", op->o_log_prefix );
535 rc = fop.o_bd->be_search( &fop, &frs );
536 } else {
537 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
538 fc->fbase = 1;
539 }
540
541 /* After the first call, see if the fdn resides in the scope */
542 if ( fc->fbase == 1 ) {
543 switch ( fc->fss->s_op->ors_scope ) {
544 case LDAP_SCOPE_BASE:
545 fc->fscope = dn_match( fc->fdn, &fc->fss->s_base );
546 break;
547 case LDAP_SCOPE_ONELEVEL: {
548 struct berval pdn;
549 dnParent( fc->fdn, &pdn );
550 fc->fscope = dn_match( &pdn, &fc->fss->s_base );
551 break; }
552 case LDAP_SCOPE_SUBTREE:
553 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base );
554 break;
555 case LDAP_SCOPE_SUBORDINATE:
556 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) &&
557 !dn_match( fc->fdn, &fc->fss->s_base );
558 break;
559 }
560 }
561
562 if ( fc->fbase )
563 return LDAP_SUCCESS;
564
565 /* If entryID has changed, then the base of this search has
566 * changed. Invalidate the psearch.
567 */
568 return LDAP_NO_SUCH_OBJECT;
569 }
570
/* syncprov_findcsn:
 *   This function has three different purposes, but they all use a search
 * that filters on entryCSN so they're combined here.
 * 1: at startup time, after a contextCSN has been read from the database,
 * we search for all entries with CSN >= contextCSN in case the contextCSN
 * was not checkpointed at the previous shutdown.
 *
 * 2: when the current contextCSN is known and we have a sync cookie, we search
 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN.
 * If an entry is found, the cookie CSN is valid, otherwise it is stale.
 *
 * 3: during a refresh phase, we search for all entries with CSN <= the cookie
 * CSN, and generate Present records for them. We always collect this result
 * in SyncID sets, even if there's only one match.
 */
typedef enum find_csn_t {
	FIND_MAXCSN = 1,	/* purpose 1 above */
	FIND_CSN,		/* purpose 2 above (value 2) */
	FIND_PRESENT		/* purpose 3 above (value 3) */
} find_csn_t;
591
592 static int
findmax_cb(Operation * op,SlapReply * rs)593 findmax_cb( Operation *op, SlapReply *rs )
594 {
595 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
596 struct berval *maxcsn = op->o_callback->sc_private;
597 Attribute *a = attr_find( rs->sr_entry->e_attrs,
598 slap_schema.si_ad_entryCSN );
599
600 if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 &&
601 slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) {
602 maxcsn->bv_len = a->a_vals[0].bv_len;
603 strcpy( maxcsn->bv_val, a->a_vals[0].bv_val );
604 }
605 }
606 return LDAP_SUCCESS;
607 }
608
609 static int
findcsn_cb(Operation * op,SlapReply * rs)610 findcsn_cb( Operation *op, SlapReply *rs )
611 {
612 slap_callback *sc = op->o_callback;
613
614 /* We just want to know that at least one exists, so it's OK if
615 * we exceed the unchecked limit.
616 */
617 if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED ||
618 (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) {
619 sc->sc_private = (void *)1;
620 }
621 return LDAP_SUCCESS;
622 }
623
624 /* Build a list of entryUUIDs for sending in a SyncID set */
625
626 #define UUID_LEN 16
627
628 typedef struct fpres_cookie {
629 int num;
630 BerVarray uuids;
631 char *last;
632 } fpres_cookie;
633
634 static int
findpres_cb(Operation * op,SlapReply * rs)635 findpres_cb( Operation *op, SlapReply *rs )
636 {
637 slap_callback *sc = op->o_callback;
638 fpres_cookie *pc = sc->sc_private;
639 Attribute *a;
640 int ret = SLAP_CB_CONTINUE;
641
642 switch ( rs->sr_type ) {
643 case REP_SEARCH:
644 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
645 if ( a ) {
646 pc->uuids[pc->num].bv_val = pc->last;
647 AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
648 pc->uuids[pc->num].bv_len );
649 pc->num++;
650 pc->last = pc->uuids[pc->num].bv_val;
651 pc->uuids[pc->num].bv_val = NULL;
652 }
653 ret = LDAP_SUCCESS;
654 if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
655 break;
656 /* FALLTHRU */
657 case REP_RESULT:
658 ret = rs->sr_err;
659 if ( pc->num ) {
660 ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
661 0, pc->uuids, 0 );
662 pc->uuids[pc->num].bv_val = pc->last;
663 pc->num = 0;
664 pc->last = pc->uuids[0].bv_val;
665 }
666 break;
667 default:
668 break;
669 }
670 return ret;
671 }
672
673 static int
syncprov_findcsn(Operation * op,find_csn_t mode,struct berval * csn)674 syncprov_findcsn( Operation *op, find_csn_t mode, struct berval *csn )
675 {
676 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
677 syncprov_info_t *si = on->on_bi.bi_private;
678
679 slap_callback cb = {0};
680 Operation fop;
681 SlapReply frs = { REP_RESULT };
682 char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
683 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
684 struct berval maxcsn;
685 Filter cf;
686 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
687 fpres_cookie pcookie;
688 sync_control *srs = NULL;
689 struct slap_limits_set fc_limits;
690 int i, rc = LDAP_SUCCESS, findcsn_retry = 1;
691 int maxid;
692
693 if ( mode != FIND_MAXCSN ) {
694 srs = op->o_controls[slap_cids.sc_LDAPsync];
695 }
696
697 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: mode=%s csn=%s\n",
698 op->o_log_prefix,
699 mode == FIND_MAXCSN ?
700 "FIND_MAXCSN" :
701 mode == FIND_CSN ?
702 "FIND_CSN" :
703 "FIND_PRESENT",
704 csn ? csn->bv_val : "" );
705
706 fop = *op;
707 fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */
708 /* We want pure entries, not referrals */
709 fop.o_managedsait = SLAP_CONTROL_CRITICAL;
710
711 cf.f_ava = &eq;
712 cf.f_av_desc = slap_schema.si_ad_entryCSN;
713 BER_BVZERO( &cf.f_av_value );
714 cf.f_next = NULL;
715
716 fop.o_callback = &cb;
717 fop.ors_limit = NULL;
718 fop.ors_tlimit = SLAP_NO_LIMIT;
719 fop.ors_filter = &cf;
720 fop.ors_filterstr.bv_val = buf;
721
722 again:
723 switch( mode ) {
724 case FIND_MAXCSN:
725 cf.f_choice = LDAP_FILTER_GE;
726 /* If there are multiple CSNs, use the one with our serverID */
727 for ( i=0; i<si->si_numcsns; i++) {
728 if ( slap_serverID == si->si_sids[i] ) {
729 maxid = i;
730 break;
731 }
732 }
733 if ( i == si->si_numcsns ) {
734 /* No match: this is multimaster, and none of the content in the DB
735 * originated locally. Treat like no CSN.
736 */
737 return LDAP_NO_SUCH_OBJECT;
738 }
739 cf.f_av_value = si->si_ctxcsn[maxid];
740 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
741 "(entryCSN>=%s)", cf.f_av_value.bv_val );
742 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
743 return LDAP_OTHER;
744 }
745 fop.ors_attrsonly = 0;
746 fop.ors_attrs = csn_anlist;
747 fop.ors_slimit = SLAP_NO_LIMIT;
748 cb.sc_private = &maxcsn;
749 cb.sc_response = findmax_cb;
750 strcpy( cbuf, cf.f_av_value.bv_val );
751 maxcsn.bv_val = cbuf;
752 maxcsn.bv_len = cf.f_av_value.bv_len;
753 break;
754 case FIND_CSN:
755 if ( BER_BVISEMPTY( &cf.f_av_value )) {
756 cf.f_av_value = *csn;
757 }
758 fop.o_dn = op->o_bd->be_rootdn;
759 fop.o_ndn = op->o_bd->be_rootndn;
760 fop.o_req_dn = op->o_bd->be_suffix[0];
761 fop.o_req_ndn = op->o_bd->be_nsuffix[0];
762 /* Look for exact match the first time */
763 if ( findcsn_retry ) {
764 cf.f_choice = LDAP_FILTER_EQUALITY;
765 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
766 "(entryCSN=%s)", cf.f_av_value.bv_val );
767 /* On retry, look for <= */
768 } else {
769 cf.f_choice = LDAP_FILTER_LE;
770 fop.ors_limit = &fc_limits;
771 memset( &fc_limits, 0, sizeof( fc_limits ));
772 fc_limits.lms_s_unchecked = 1;
773 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
774 "(entryCSN<=%s)", cf.f_av_value.bv_val );
775 }
776 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
777 return LDAP_OTHER;
778 }
779 fop.ors_attrsonly = 1;
780 fop.ors_attrs = slap_anlist_no_attrs;
781 fop.ors_slimit = 1;
782 cb.sc_private = NULL;
783 cb.sc_response = findcsn_cb;
784 break;
785 case FIND_PRESENT:
786 fop.ors_filter = op->ors_filter;
787 fop.ors_filterstr = op->ors_filterstr;
788 fop.ors_attrsonly = 0;
789 fop.ors_attrs = uuid_anlist;
790 fop.ors_slimit = SLAP_NO_LIMIT;
791 cb.sc_private = &pcookie;
792 cb.sc_response = findpres_cb;
793 pcookie.num = 0;
794
795 /* preallocate storage for a full set */
796 pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) *
797 sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN,
798 op->o_tmpmemctx );
799 pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1);
800 pcookie.uuids[0].bv_val = pcookie.last;
801 pcookie.uuids[0].bv_len = UUID_LEN;
802 for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) {
803 pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN;
804 pcookie.uuids[i].bv_len = UUID_LEN;
805 }
806 break;
807 }
808
809 fop.o_bd->bd_info = (BackendInfo *)on->on_info;
810 fop.o_bd->be_search( &fop, &frs );
811 fop.o_bd->bd_info = (BackendInfo *)on;
812
813 switch( mode ) {
814 case FIND_MAXCSN:
815 if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) {
816 #ifdef CHECK_CSN
817 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
818 assert( !syn->ssyn_validate( syn, &maxcsn ));
819 #endif
820 ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn );
821 si->si_numops++; /* ensure a checkpoint */
822 }
823 break;
824 case FIND_CSN:
825 /* If matching CSN was not found, invalidate the context. */
826 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: csn%s=%s %sfound\n",
827 op->o_log_prefix,
828 cf.f_choice == LDAP_FILTER_EQUALITY ? "=" : "<",
829 cf.f_av_value.bv_val, cb.sc_private ? "" : "not " );
830 if ( !cb.sc_private ) {
831 /* If we didn't find an exact match, then try for <= */
832 if ( findcsn_retry ) {
833 findcsn_retry = 0;
834 rs_reinit( &frs, REP_RESULT );
835 goto again;
836 }
837 rc = LDAP_NO_SUCH_OBJECT;
838 }
839 break;
840 case FIND_PRESENT:
841 op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx );
842 break;
843 }
844
845 return rc;
846 }
847
/* Detach sr from its shared resinfo, and release the resinfo (mutex,
 * entry, cookie and the structure itself) if sr was its last user.
 * sr itself is not freed here.
 */
static void free_resinfo( syncres *sr )
{
	resinfo *ri = sr->s_info;
	syncres **link;
	int unused;

	ldap_pvt_thread_mutex_lock( &ri->ri_mutex );
	for ( link = &ri->ri_list; *link != NULL; link = &(*link)->s_rilist ) {
		if ( *link == sr ) {
			*link = sr->s_rilist;
			break;
		}
	}
	unused = ( ri->ri_list == NULL );
	ldap_pvt_thread_mutex_unlock( &ri->ri_mutex );

	if ( !unused )
		return;

	ldap_pvt_thread_mutex_destroy( &ri->ri_mutex );
	if ( ri->ri_e )
		entry_free( ri->ri_e );
	if ( !BER_BVISNULL( &ri->ri_cookie ))
		ch_free( ri->ri_cookie.bv_val );
	ch_free( ri );
}
871
872 #define FS_UNLINK 1
873 #define FS_LOCK 2
874
875 static int
syncprov_free_syncop(syncops * so,int flags)876 syncprov_free_syncop( syncops *so, int flags )
877 {
878 syncres *sr, *srnext;
879 GroupAssertion *ga, *gnext;
880
881 if ( flags & FS_LOCK )
882 ldap_pvt_thread_mutex_lock( &so->s_mutex );
883 /* already being freed, or still in use */
884 if ( !so->s_inuse || --so->s_inuse > 0 ) {
885 if ( flags & FS_LOCK )
886 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
887 return 0;
888 }
889 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
890 if (( flags & FS_UNLINK ) && so->s_si ) {
891 syncops **sop;
892 ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex );
893 for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) {
894 if ( *sop == so ) {
895 *sop = so->s_next;
896 break;
897 }
898 }
899 ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex );
900 }
901 if ( so->s_flags & PS_IS_DETACHED ) {
902 filter_free( so->s_op->ors_filter );
903 for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
904 gnext = ga->ga_next;
905 ch_free( ga );
906 }
907 ch_free( so->s_op );
908 }
909 ch_free( so->s_base.bv_val );
910 for ( sr=so->s_res; sr; sr=srnext ) {
911 srnext = sr->s_next;
912 free_resinfo( sr );
913 ch_free( sr );
914 }
915 ldap_pvt_thread_mutex_destroy( &so->s_mutex );
916 ch_free( so );
917 return 1;
918 }
919
920 /* Send a persistent search response */
921 static int
syncprov_sendresp(Operation * op,resinfo * ri,syncops * so,int mode)922 syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode )
923 {
924 SlapReply rs = { REP_SEARCH };
925 struct berval cookie, csns[2];
926 Entry e_uuid = {0};
927 Attribute a_uuid = {0};
928
929 if ( so->s_op->o_abandon )
930 return SLAPD_ABANDON;
931
932 rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx );
933 rs.sr_ctrls[1] = NULL;
934 rs.sr_flags = REP_CTRLS_MUSTBEFREED;
935 csns[0] = ri->ri_csn;
936 BER_BVZERO( &csns[1] );
937 slap_compose_sync_cookie( op, &cookie, csns, so->s_rid,
938 slap_serverID ? slap_serverID : -1, NULL );
939
940 #ifdef LDAP_DEBUG
941 if ( so->s_sid > 0 ) {
942 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: to=%03x, cookie=%s\n",
943 op->o_log_prefix, so->s_sid, cookie.bv_val );
944 } else {
945 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: cookie=%s\n",
946 op->o_log_prefix, cookie.bv_val );
947 }
948 #endif
949
950 e_uuid.e_attrs = &a_uuid;
951 a_uuid.a_desc = slap_schema.si_ad_entryUUID;
952 a_uuid.a_nvals = &ri->ri_uuid;
953 rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
954 mode, rs.sr_ctrls, 0, 1, &cookie );
955 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
956
957 rs.sr_entry = &e_uuid;
958 if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) {
959 e_uuid = *ri->ri_e;
960 e_uuid.e_private = NULL;
961 }
962
963 switch( mode ) {
964 case LDAP_SYNC_ADD:
965 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
966 rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
967 rs.sr_err = send_search_reference( op, &rs );
968 ber_bvarray_free( rs.sr_ref );
969 break;
970 }
971 /* fallthru */
972 case LDAP_SYNC_MODIFY:
973 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: sending %s, dn=%s\n",
974 op->o_log_prefix,
975 mode == LDAP_SYNC_ADD ? "LDAP_SYNC_ADD" : "LDAP_SYNC_MODIFY",
976 e_uuid.e_nname.bv_val );
977 rs.sr_attrs = op->ors_attrs;
978 rs.sr_err = send_search_entry( op, &rs );
979 break;
980 case LDAP_SYNC_DELETE:
981 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: "
982 "sending LDAP_SYNC_DELETE, dn=%s\n",
983 op->o_log_prefix, ri->ri_dn.bv_val );
984 e_uuid.e_attrs = NULL;
985 e_uuid.e_name = ri->ri_dn;
986 e_uuid.e_nname = ri->ri_ndn;
987 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
988 struct berval bv = BER_BVNULL;
989 rs.sr_ref = &bv;
990 rs.sr_err = send_search_reference( op, &rs );
991 } else {
992 rs.sr_err = send_search_entry( op, &rs );
993 }
994 break;
995 default:
996 assert(0);
997 }
998 return rs.sr_err;
999 }
1000
1001 static void
1002 syncprov_qstart( syncops *so );
1003
1004 /* Play back queued responses */
1005 static int
syncprov_qplay(Operation * op,syncops * so)1006 syncprov_qplay( Operation *op, syncops *so )
1007 {
1008 syncres *sr;
1009 int rc = 0;
1010
1011 do {
1012 ldap_pvt_thread_mutex_lock( &so->s_mutex );
1013 sr = so->s_res;
1014 /* Exit loop with mutex held */
1015 if ( !sr )
1016 break;
1017 so->s_res = sr->s_next;
1018 if ( !so->s_res )
1019 so->s_restail = NULL;
1020 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
1021
1022 if ( !so->s_op->o_abandon ) {
1023
1024 if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) {
1025 SlapReply rs = { REP_INTERMEDIATE };
1026
1027 rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
1028 &sr->s_info->ri_cookie, 0, NULL, 0 );
1029 } else {
1030 rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode );
1031 }
1032 }
1033
1034 free_resinfo( sr );
1035 ch_free( sr );
1036
1037 if ( so->s_op->o_abandon )
1038 continue;
1039
1040 /* Exit loop with mutex held */
1041 ldap_pvt_thread_mutex_lock( &so->s_mutex );
1042 break;
1043
1044 } while (1);
1045
1046 /* We now only send one change at a time, to prevent one
1047 * psearch from hogging all the CPU. Resubmit this task if
1048 * there are more responses queued and no errors occurred.
1049 */
1050
1051 if ( rc == 0 && so->s_res ) {
1052 syncprov_qstart( so );
1053 }
1054
1055 return rc;
1056 }
1057
1058 /* task for playing back queued responses */
1059 static void *
syncprov_qtask(void * ctx,void * arg)1060 syncprov_qtask( void *ctx, void *arg )
1061 {
1062 syncops *so = arg;
1063 OperationBuffer opbuf;
1064 Operation *op;
1065 BackendDB be;
1066 int rc;
1067
1068 op = &opbuf.ob_op;
1069 *op = *so->s_op;
1070 op->o_hdr = &opbuf.ob_hdr;
1071 op->o_controls = opbuf.ob_controls;
1072 memset( op->o_controls, 0, sizeof(opbuf.ob_controls) );
1073 op->o_sync = SLAP_CONTROL_IGNORED;
1074
1075 *op->o_hdr = *so->s_op->o_hdr;
1076
1077 op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1);
1078 op->o_tmpmfuncs = &slap_sl_mfuncs;
1079 op->o_threadctx = ctx;
1080
1081 /* syncprov_qplay expects a fake db */
1082 be = *so->s_op->o_bd;
1083 be.be_flags |= SLAP_DBFLAG_OVERLAY;
1084 op->o_bd = &be;
1085 LDAP_SLIST_FIRST(&op->o_extra) = NULL;
1086 op->o_callback = NULL;
1087
1088 rc = syncprov_qplay( op, so );
1089
1090 /* if an error occurred, or no responses left, task is no longer queued */
1091 if ( !rc && !so->s_res )
1092 rc = 1;
1093
1094 /* decrement use count... */
1095 if ( !syncprov_free_syncop( so, FS_UNLINK )) {
1096 if ( rc )
1097 /* if we didn't unlink, and task is no longer queued, clear flag */
1098 so->s_flags ^= PS_TASK_QUEUED;
1099 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
1100 }
1101
1102 return NULL;
1103 }
1104
1105 /* Start the task to play back queued psearch responses */
1106 static void
syncprov_qstart(syncops * so)1107 syncprov_qstart( syncops *so )
1108 {
1109 so->s_flags |= PS_TASK_QUEUED;
1110 so->s_inuse++;
1111 ldap_pvt_thread_pool_submit2( &connection_pool,
1112 syncprov_qtask, so, &so->s_pool_cookie );
1113 }
1114
1115 /* Queue a persistent search response */
1116 static int
syncprov_qresp(opcookie * opc,syncops * so,int mode)1117 syncprov_qresp( opcookie *opc, syncops *so, int mode )
1118 {
1119 syncres *sr;
1120 resinfo *ri;
1121 int srsize;
1122 struct berval csn = opc->sctxcsn;
1123
1124 sr = ch_malloc( sizeof( syncres ));
1125 sr->s_next = NULL;
1126 sr->s_mode = mode;
1127 if ( !opc->ssres.s_info ) {
1128
1129 srsize = sizeof( resinfo );
1130 if ( csn.bv_len )
1131 srsize += csn.bv_len + 1;
1132
1133 if ( opc->se ) {
1134 Attribute *a;
1135 ri = ch_malloc( srsize );
1136 ri->ri_dn = opc->se->e_name;
1137 ri->ri_ndn = opc->se->e_nname;
1138 a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID );
1139 if ( a )
1140 ri->ri_uuid = a->a_nvals[0];
1141 else
1142 ri->ri_uuid.bv_len = 0;
1143 if ( csn.bv_len ) {
1144 ri->ri_csn.bv_val = (char *)(ri + 1);
1145 ri->ri_csn.bv_len = csn.bv_len;
1146 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len );
1147 ri->ri_csn.bv_val[csn.bv_len] = '\0';
1148 } else {
1149 ri->ri_csn.bv_val = NULL;
1150 }
1151 } else {
1152 srsize += opc->suuid.bv_len +
1153 opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1;
1154 ri = ch_malloc( srsize );
1155 ri->ri_dn.bv_val = (char *)(ri + 1);
1156 ri->ri_dn.bv_len = opc->sdn.bv_len;
1157 ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val,
1158 opc->sdn.bv_val ) + 1;
1159 ri->ri_ndn.bv_len = opc->sndn.bv_len;
1160 ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val,
1161 opc->sndn.bv_val ) + 1;
1162 ri->ri_uuid.bv_len = opc->suuid.bv_len;
1163 AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1164 if ( csn.bv_len ) {
1165 ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len;
1166 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len );
1167 ri->ri_csn.bv_val[csn.bv_len] = '\0';
1168 } else {
1169 ri->ri_csn.bv_val = NULL;
1170 }
1171 }
1172 ri->ri_list = &opc->ssres;
1173 ri->ri_e = opc->se;
1174 ri->ri_csn.bv_len = csn.bv_len;
1175 ri->ri_isref = opc->sreference;
1176 BER_BVZERO( &ri->ri_cookie );
1177 ldap_pvt_thread_mutex_init( &ri->ri_mutex );
1178 opc->se = NULL;
1179 opc->ssres.s_info = ri;
1180 }
1181 ri = opc->ssres.s_info;
1182 sr->s_info = ri;
1183 ldap_pvt_thread_mutex_lock( &ri->ri_mutex );
1184 sr->s_rilist = ri->ri_list;
1185 ri->ri_list = sr;
1186 if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) {
1187 syncprov_info_t *si = opc->son->on_bi.bi_private;
1188
1189 slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn,
1190 so->s_rid, slap_serverID ? slap_serverID : -1, NULL );
1191 }
1192 Debug( LDAP_DEBUG_SYNC, "%s syncprov_qresp: "
1193 "set up a new syncres mode=%d csn=%s\n",
1194 so->s_op->o_log_prefix, mode, csn.bv_val ? csn.bv_val : "" );
1195 ldap_pvt_thread_mutex_unlock( &ri->ri_mutex );
1196
1197 ldap_pvt_thread_mutex_lock( &so->s_mutex );
1198 if ( !so->s_res ) {
1199 so->s_res = sr;
1200 } else {
1201 so->s_restail->s_next = sr;
1202 }
1203 so->s_restail = sr;
1204
1205 /* If the base of the psearch was modified, check it next time round */
1206 if ( so->s_flags & PS_WROTE_BASE ) {
1207 so->s_flags ^= PS_WROTE_BASE;
1208 so->s_flags |= PS_FIND_BASE;
1209 }
1210 if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
1211 syncprov_qstart( so );
1212 }
1213 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
1214 return LDAP_SUCCESS;
1215 }
1216
1217 static int
syncprov_drop_psearch(syncops * so,int lock)1218 syncprov_drop_psearch( syncops *so, int lock )
1219 {
1220 if ( so->s_flags & PS_IS_DETACHED ) {
1221 if ( lock )
1222 ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
1223 so->s_op->o_conn->c_n_ops_executing--;
1224 so->s_op->o_conn->c_n_ops_completed++;
1225 LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation,
1226 o_next );
1227 if ( lock )
1228 ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
1229 }
1230 return syncprov_free_syncop( so, FS_LOCK );
1231 }
1232
1233 static int
syncprov_ab_cleanup(Operation * op,SlapReply * rs)1234 syncprov_ab_cleanup( Operation *op, SlapReply *rs )
1235 {
1236 slap_callback *sc = op->o_callback;
1237 op->o_callback = sc->sc_next;
1238 syncprov_drop_psearch( sc->sc_private, 0 );
1239 op->o_tmpfree( sc, op->o_tmpmemctx );
1240 return 0;
1241 }
1242
1243 static int
syncprov_op_abandon(Operation * op,SlapReply * rs)1244 syncprov_op_abandon( Operation *op, SlapReply *rs )
1245 {
1246 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
1247 syncprov_info_t *si = on->on_bi.bi_private;
1248 syncops *so, **sop;
1249
1250 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1251 for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) {
1252 if ( so->s_op->o_connid == op->o_connid &&
1253 so->s_op->o_msgid == op->orn_msgid ) {
1254 so->s_op->o_abandon = 1;
1255 *sop = so->s_next;
1256 break;
1257 }
1258 }
1259 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1260 if ( so ) {
1261 /* Is this really a Cancel exop? */
1262 if ( op->o_tag != LDAP_REQ_ABANDON ) {
1263 so->s_op->o_cancel = SLAP_CANCEL_ACK;
1264 rs->sr_err = LDAP_CANCELLED;
1265 send_ldap_result( so->s_op, rs );
1266 if ( so->s_flags & PS_IS_DETACHED ) {
1267 slap_callback *cb;
1268 cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
1269 cb->sc_cleanup = syncprov_ab_cleanup;
1270 cb->sc_next = op->o_callback;
1271 cb->sc_private = so;
1272 op->o_callback = cb;
1273 return SLAP_CB_CONTINUE;
1274 }
1275 }
1276 syncprov_drop_psearch( so, 0 );
1277 }
1278 return SLAP_CB_CONTINUE;
1279 }
1280
1281 /* Find which persistent searches are affected by this operation */
1282 static void
syncprov_matchops(Operation * op,opcookie * opc,int saveit)1283 syncprov_matchops( Operation *op, opcookie *opc, int saveit )
1284 {
1285 slap_overinst *on = opc->son;
1286 syncprov_info_t *si = on->on_bi.bi_private;
1287
1288 fbase_cookie fc;
1289 syncops **pss;
1290 Entry *e = NULL;
1291 Attribute *a;
1292 int rc, gonext;
1293 struct berval newdn;
1294 int freefdn = 0;
1295 BackendDB *b0 = op->o_bd, db;
1296
1297 fc.fdn = &op->o_req_ndn;
1298 /* compute new DN */
1299 if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
1300 struct berval pdn;
1301 if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup;
1302 else dnParent( fc.fdn, &pdn );
1303 build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx );
1304 fc.fdn = &newdn;
1305 freefdn = 1;
1306 }
1307 if ( op->o_tag != LDAP_REQ_ADD ) {
1308 if ( !SLAP_ISOVERLAY( op->o_bd )) {
1309 db = *op->o_bd;
1310 op->o_bd = &db;
1311 }
1312 rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
1313 /* If we're sending responses now, make a copy and unlock the DB */
1314 if ( e && !saveit ) {
1315 if ( !opc->se )
1316 opc->se = entry_dup( e );
1317 overlay_entry_release_ov( op, e, 0, on );
1318 e = opc->se;
1319 }
1320 if ( rc ) {
1321 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
1322 "%s check, error finding entry dn=%s in database\n",
1323 op->o_log_prefix, saveit ? "initial" : "final", fc.fdn->bv_val );
1324 op->o_bd = b0;
1325 return;
1326 }
1327 } else {
1328 e = op->ora_e;
1329 if ( !saveit ) {
1330 if ( !opc->se )
1331 opc->se = entry_dup( e );
1332 e = opc->se;
1333 }
1334 }
1335
1336 if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
1337 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
1338 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
1339 opc->sreference = is_entry_referral( e );
1340 a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
1341 if ( a )
1342 ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
1343 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
1344 "%srecording uuid for dn=%s on opc=%p\n",
1345 op->o_log_prefix, a ? "" : "not ", opc->sdn.bv_val, opc );
1346 } else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
1347 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
1348 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
1349 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
1350 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
1351 }
1352
1353 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1354 for (pss = &si->si_ops; *pss; pss = gonext ? &(*pss)->s_next : pss)
1355 {
1356 Operation op2;
1357 Opheader oh;
1358 syncmatches *sm;
1359 int found = 0;
1360 syncops *snext, *ss = *pss;
1361
1362 gonext = 1;
1363 if ( ss->s_op->o_abandon )
1364 continue;
1365
1366 /* Don't send ops back to the originator */
1367 if ( opc->osid > 0 && opc->osid == ss->s_sid ) {
1368 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
1369 "skipping original sid %03x\n",
1370 ss->s_op->o_log_prefix, opc->osid );
1371 continue;
1372 }
1373
1374 /* Don't send ops back to the messenger */
1375 if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) {
1376 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
1377 "skipping relayed sid %03x\n",
1378 ss->s_op->o_log_prefix, opc->rsid );
1379 continue;
1380 }
1381
1382 /* validate base */
1383 fc.fss = ss;
1384 fc.fbase = 0;
1385 fc.fscope = 0;
1386
1387 /* If the base of the search is missing, signal a refresh */
1388 rc = syncprov_findbase( op, &fc );
1389 if ( rc != LDAP_SUCCESS ) {
1390 SlapReply rs = {REP_RESULT};
1391 send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
1392 "search base has changed" );
1393 snext = ss->s_next;
1394 if ( syncprov_drop_psearch( ss, 1 ) )
1395 *pss = snext;
1396 gonext = 0;
1397 continue;
1398 }
1399
1400 /* If we're sending results now, look for this op in old matches */
1401 if ( !saveit ) {
1402 syncmatches *old;
1403
1404 /* Did we modify the search base? */
1405 if ( dn_match( &op->o_req_ndn, &ss->s_base )) {
1406 ldap_pvt_thread_mutex_lock( &ss->s_mutex );
1407 ss->s_flags |= PS_WROTE_BASE;
1408 ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
1409 }
1410
1411 for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm;
1412 old=sm, sm=sm->sm_next ) {
1413 if ( sm->sm_op == ss ) {
1414 found = 1;
1415 old->sm_next = sm->sm_next;
1416 op->o_tmpfree( sm, op->o_tmpmemctx );
1417 break;
1418 }
1419 }
1420 }
1421
1422 if ( fc.fscope ) {
1423 ldap_pvt_thread_mutex_lock( &ss->s_mutex );
1424 op2 = *ss->s_op;
1425 oh = *op->o_hdr;
1426 oh.oh_conn = ss->s_op->o_conn;
1427 oh.oh_connid = ss->s_op->o_connid;
1428 op2.o_bd = op->o_bd->bd_self;
1429 op2.o_hdr = &oh;
1430 op2.o_extra = op->o_extra;
1431 op2.o_callback = NULL;
1432 if (ss->s_flags & PS_FIX_FILTER) {
1433 /* Skip the AND/GE clause that we stuck on in front. We
1434 would lose deletes/mods that happen during the refresh
1435 phase otherwise (ITS#6555) */
1436 op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
1437 }
1438 rc = test_filter( &op2, e, op2.ors_filter );
1439 ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
1440 }
1441
1442 Debug( LDAP_DEBUG_TRACE, "%s syncprov_matchops: "
1443 "sid %03x fscope %d rc %d\n",
1444 ss->s_op->o_log_prefix, ss->s_sid, fc.fscope, rc );
1445
1446 /* check if current o_req_dn is in scope and matches filter */
1447 if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) {
1448 if ( saveit ) {
1449 sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
1450 sm->sm_next = opc->smatches;
1451 sm->sm_op = ss;
1452 ldap_pvt_thread_mutex_lock( &ss->s_mutex );
1453 ++ss->s_inuse;
1454 ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
1455 opc->smatches = sm;
1456 } else {
1457 /* if found send UPDATE else send ADD */
1458 syncprov_qresp( opc, ss,
1459 found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
1460 }
1461 } else if ( !saveit && found ) {
1462 /* send DELETE */
1463 syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
1464 } else if ( !saveit ) {
1465 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
1466 }
1467 if ( !saveit && found ) {
1468 /* Decrement s_inuse, was incremented when called
1469 * with saveit == TRUE
1470 */
1471 snext = ss->s_next;
1472 if ( syncprov_free_syncop( ss, FS_LOCK ) ) {
1473 *pss = snext;
1474 gonext = 0;
1475 }
1476 }
1477 }
1478 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1479
1480 if ( op->o_tag != LDAP_REQ_ADD && e ) {
1481 if ( !SLAP_ISOVERLAY( op->o_bd )) {
1482 op->o_bd = &db;
1483 }
1484 if ( saveit )
1485 overlay_entry_release_ov( op, e, 0, on );
1486 op->o_bd = b0;
1487 }
1488 if ( !saveit ) {
1489 if ( opc->ssres.s_info )
1490 free_resinfo( &opc->ssres );
1491 else if ( opc->se )
1492 entry_free( opc->se );
1493 }
1494 if ( freefdn ) {
1495 op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
1496 }
1497 op->o_bd = b0;
1498 }
1499
1500 static int
syncprov_op_cleanup(Operation * op,SlapReply * rs)1501 syncprov_op_cleanup( Operation *op, SlapReply *rs )
1502 {
1503 slap_callback *cb = op->o_callback;
1504 opcookie *opc = cb->sc_private;
1505 slap_overinst *on = opc->son;
1506 syncprov_info_t *si = on->on_bi.bi_private;
1507 syncmatches *sm, *snext;
1508 modtarget *mt;
1509
1510 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1511 if ( si->si_active )
1512 si->si_active--;
1513 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1514
1515 for (sm = opc->smatches; sm; sm=snext) {
1516 snext = sm->sm_next;
1517 syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK );
1518 op->o_tmpfree( sm, op->o_tmpmemctx );
1519 }
1520
1521 /* Remove op from lock table */
1522 mt = opc->smt;
1523 if ( mt ) {
1524 modinst *mi = (modinst *)(opc+1), **m2;
1525 ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
1526 for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) {
1527 if ( *m2 == mi ) {
1528 *m2 = mi->mi_next;
1529 if ( mt->mt_tail == mi )
1530 mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2;
1531 break;
1532 }
1533 }
1534 /* If there are more, promote the next one */
1535 if ( mt->mt_mods ) {
1536 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1537 } else {
1538 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1539 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
1540 ldap_avl_delete( &si->si_mods, mt, sp_avl_cmp );
1541 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
1542 ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
1543 ch_free( mt->mt_dn.bv_val );
1544 ch_free( mt );
1545 }
1546 }
1547 if ( !BER_BVISNULL( &opc->suuid ))
1548 op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
1549 if ( !BER_BVISNULL( &opc->sndn ))
1550 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
1551 if ( !BER_BVISNULL( &opc->sdn ))
1552 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
1553 op->o_callback = cb->sc_next;
1554 op->o_tmpfree(cb, op->o_tmpmemctx);
1555
1556 return 0;
1557 }
1558
1559 static void
syncprov_checkpoint(Operation * op,slap_overinst * on)1560 syncprov_checkpoint( Operation *op, slap_overinst *on )
1561 {
1562 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
1563 Modifications mod;
1564 Operation opm;
1565 SlapReply rsm = {REP_RESULT};
1566 slap_callback cb = {0};
1567 BackendDB be;
1568 BackendInfo *bi;
1569
1570 #ifdef CHECK_CSN
1571 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
1572
1573 int i;
1574 for ( i=0; i<si->si_numcsns; i++ ) {
1575 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1576 }
1577 #endif
1578
1579 Debug( LDAP_DEBUG_SYNC, "%s syncprov_checkpoint: running checkpoint\n",
1580 op->o_log_prefix );
1581
1582 mod.sml_numvals = si->si_numcsns;
1583 mod.sml_values = si->si_ctxcsn;
1584 mod.sml_nvalues = NULL;
1585 mod.sml_desc = slap_schema.si_ad_contextCSN;
1586 mod.sml_op = LDAP_MOD_REPLACE;
1587 mod.sml_flags = SLAP_MOD_INTERNAL;
1588 mod.sml_next = NULL;
1589
1590 cb.sc_response = slap_null_cb;
1591 opm = *op;
1592 opm.o_tag = LDAP_REQ_MODIFY;
1593 opm.o_callback = &cb;
1594 opm.orm_modlist = &mod;
1595 opm.orm_no_opattrs = 1;
1596 if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) {
1597 be = *on->on_info->oi_origdb;
1598 opm.o_bd = &be;
1599 }
1600 opm.o_req_dn = si->si_contextdn;
1601 opm.o_req_ndn = si->si_contextdn;
1602 bi = opm.o_bd->bd_info;
1603 opm.o_bd->bd_info = on->on_info->oi_orig;
1604 opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
1605 opm.o_no_schema_check = 1;
1606 opm.o_dont_replicate = 1;
1607 opm.o_opid = -1;
1608 opm.o_bd->be_modify( &opm, &rsm );
1609
1610 if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT &&
1611 SLAP_SYNC_SUBENTRY( opm.o_bd )) {
1612 const char *text;
1613 char txtbuf[SLAP_TEXT_BUFLEN];
1614 size_t textlen = sizeof txtbuf;
1615 Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL );
1616 rs_reinit( &rsm, REP_RESULT );
1617 slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen);
1618 opm.ora_e = e;
1619 opm.o_bd->be_add( &opm, &rsm );
1620 if ( e == opm.ora_e )
1621 be_entry_release_w( &opm, opm.ora_e );
1622 }
1623 opm.o_bd->bd_info = bi;
1624
1625 if ( mod.sml_next != NULL ) {
1626 slap_mods_free( mod.sml_next, 1 );
1627 }
1628 #ifdef CHECK_CSN
1629 for ( i=0; i<si->si_numcsns; i++ ) {
1630 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1631 }
1632 #endif
1633 }
1634
1635 static void
syncprov_add_slog(Operation * op)1636 syncprov_add_slog( Operation *op )
1637 {
1638 opcookie *opc = op->o_callback->sc_private;
1639 slap_overinst *on = opc->son;
1640 syncprov_info_t *si = on->on_bi.bi_private;
1641 sessionlog *sl;
1642 slog_entry *se;
1643 char uuidstr[40];
1644 int rc;
1645
1646 sl = si->si_logs;
1647 {
1648 if ( BER_BVISEMPTY( &op->o_csn ) ) {
1649 /* During the syncrepl refresh phase we can receive operations
1650 * without a csn. We cannot reliably determine the consumers
1651 * state with respect to such operations, so we ignore them and
1652 * wipe out anything in the log if we see them.
1653 */
1654 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
1655 /* can only do this if no one else is reading the log at the moment */
1656 if ( !sl->sl_playing ) {
1657 ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
1658 sl->sl_num = 0;
1659 sl->sl_entries = NULL;
1660 }
1661 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
1662 return;
1663 }
1664
1665 /* Allocate a record. UUIDs are not NUL-terminated. */
1666 se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
1667 op->o_csn.bv_len + 1 );
1668 se->se_tag = op->o_tag;
1669
1670 se->se_uuid.bv_val = (char *)(&se[1]);
1671 AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1672 se->se_uuid.bv_len = opc->suuid.bv_len;
1673
1674 se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len;
1675 AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len );
1676 se->se_csn.bv_val[op->o_csn.bv_len] = '\0';
1677 se->se_csn.bv_len = op->o_csn.bv_len;
1678 se->se_sid = slap_parse_csn_sid( &se->se_csn );
1679
1680 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
1681 if ( LogTest( LDAP_DEBUG_SYNC ) ) {
1682 uuidstr[0] = 0;
1683 if ( !BER_BVISEMPTY( &opc->suuid ) ) {
1684 lutil_uuidstr_from_normalized( opc->suuid.bv_val, opc->suuid.bv_len,
1685 uuidstr, 40 );
1686 }
1687 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
1688 "adding csn=%s to sessionlog, uuid=%s\n",
1689 op->o_log_prefix, se->se_csn.bv_val, uuidstr );
1690 }
1691 if ( !sl->sl_entries ) {
1692 if ( !sl->sl_mincsn ) {
1693 sl->sl_numcsns = 1;
1694 sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
1695 sl->sl_sids = ch_malloc( sizeof( int ));
1696 sl->sl_sids[0] = se->se_sid;
1697 ber_dupbv( sl->sl_mincsn, &se->se_csn );
1698 BER_BVZERO( &sl->sl_mincsn[1] );
1699 }
1700 }
1701 rc = ldap_tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, ldap_avl_dup_error );
1702 if ( rc ) {
1703 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
1704 "duplicate sessionlog entry ignored: csn=%s, uuid=%s\n",
1705 op->o_log_prefix, se->se_csn.bv_val, uuidstr );
1706 ch_free( se );
1707 goto leave;
1708 }
1709 sl->sl_num++;
1710 if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) {
1711 TAvlnode *edge = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
1712 while ( sl->sl_num > sl->sl_size ) {
1713 int i;
1714 TAvlnode *next = ldap_tavl_next( edge, TAVL_DIR_RIGHT );
1715 se = edge->avl_data;
1716 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
1717 "expiring csn=%s from sessionlog (sessionlog size=%d)\n",
1718 op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
1719 for ( i=0; i<sl->sl_numcsns; i++ )
1720 if ( sl->sl_sids[i] >= se->se_sid )
1721 break;
1722 if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
1723 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
1724 "adding csn=%s to mincsn\n",
1725 op->o_log_prefix, se->se_csn.bv_val );
1726 slap_insert_csn_sids( (struct sync_cookie *)sl,
1727 i, se->se_sid, &se->se_csn );
1728 } else {
1729 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
1730 "updating mincsn for sid=%d csn=%s to %s\n",
1731 op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
1732 ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
1733 }
1734 ldap_tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp );
1735 ch_free( se );
1736 edge = next;
1737 sl->sl_num--;
1738 }
1739 }
1740 leave:
1741 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
1742 }
1743 }
1744
1745 /* Just set a flag if we found the matching entry */
1746 static int
playlog_cb(Operation * op,SlapReply * rs)1747 playlog_cb( Operation *op, SlapReply *rs )
1748 {
1749 if ( rs->sr_type == REP_SEARCH ) {
1750 op->o_callback->sc_private = (void *)1;
1751 }
1752 return rs->sr_err;
1753 }
1754
1755 /*
1756 * Check whether the last nmods UUIDs in the uuids list exist in the database
1757 * and (still) match the op filter, zero out the bv_len of any that still exist
1758 * and return the number of UUIDs we have confirmed are gone now.
1759 */
1760 static int
check_uuidlist_presence(Operation * op,struct berval * uuids,int len,int nmods)1761 check_uuidlist_presence(
1762 Operation *op,
1763 struct berval *uuids,
1764 int len,
1765 int nmods )
1766 {
1767 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
1768 Operation fop = *op;
1769 SlapReply frs = { REP_RESULT };
1770 Filter mf, af;
1771 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
1772 slap_callback cb = {0};
1773 int i, mods = nmods;
1774
1775 fop.o_sync_mode = 0;
1776 fop.o_callback = &cb;
1777 fop.ors_limit = NULL;
1778 fop.ors_tlimit = SLAP_NO_LIMIT;
1779 fop.ors_attrs = slap_anlist_all_attributes;
1780 fop.ors_attrsonly = 0;
1781 fop.o_managedsait = SLAP_CONTROL_CRITICAL;
1782
1783 af.f_choice = LDAP_FILTER_AND;
1784 af.f_next = NULL;
1785 af.f_and = &mf;
1786 mf.f_choice = LDAP_FILTER_EQUALITY;
1787 mf.f_ava = &eq;
1788 mf.f_av_desc = slap_schema.si_ad_entryUUID;
1789 mf.f_next = fop.ors_filter;
1790
1791 fop.ors_filter = ⁡
1792
1793 cb.sc_response = playlog_cb;
1794
1795 fop.o_bd->bd_info = (BackendInfo *)on->on_info;
1796 for ( i=0; i<nmods; i++ ) {
1797 mf.f_av_value = uuids[ len - 1 - i ];
1798 cb.sc_private = NULL;
1799 fop.ors_slimit = 1;
1800
1801 if ( BER_BVISEMPTY( &mf.f_av_value ) ) {
1802 mods--;
1803 continue;
1804 }
1805
1806 rs_reinit( &frs, REP_RESULT );
1807 fop.o_bd->be_search( &fop, &frs );
1808 if ( cb.sc_private ) {
1809 uuids[ len - 1 - i ].bv_len = 0;
1810 mods--;
1811 }
1812 }
1813 fop.o_bd->bd_info = (BackendInfo *)on;
1814
1815 return mods;
1816 }
1817
1818 /*
1819 * On each entry we get from the DB:
1820 * - if it's an ADD, skip
1821 * - check we've not handled it yet, skip if we have
1822 * - check if it's a DELETE or missing from the DB now
1823 * - send a new syncinfo entry
1824 * - remember we've handled it already
1825 *
1826 * If we exhaust the list, clear it, forgetting entries we've handled so far.
1827 */
1828 static int
syncprov_accesslog_uuid_cb(Operation * op,SlapReply * rs)1829 syncprov_accesslog_uuid_cb( Operation *op, SlapReply *rs )
1830 {
1831 slap_callback *sc = op->o_callback;
1832 syncprov_accesslog_deletes *uuid_progress = sc->sc_private;
1833 Attribute *a, *attrs;
1834 sync_control *srs = uuid_progress->srs;
1835 struct berval *bv, csn[2] = {}, uuid[2] = {},
1836 add = BER_BVC("add"),
1837 delete = BER_BVC("delete"),
1838 modrdn = BER_BVC("modrdn");
1839 int cmp, sid, i, is_delete = 0, rc;
1840
1841 if ( rs->sr_type != REP_SEARCH ) {
1842 return rs->sr_err;
1843 }
1844 attrs = rs->sr_entry->e_attrs;
1845
1846 a = attr_find( attrs, ad_reqType );
1847 if ( !a || a->a_numvals == 0 ) {
1848 rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
1849 return rs->sr_err;
1850 }
1851
1852 if ( bvmatch( &a->a_nvals[0], &add ) ) {
1853 return rs->sr_err;
1854 }
1855
1856 if ( bvmatch( &a->a_nvals[0], &delete ) ) {
1857 is_delete = 1;
1858 }
1859
1860 if ( bvmatch( &a->a_nvals[0], &modrdn ) ) {
1861 a = attr_find( attrs, ad_reqDN );
1862 if ( !a || a->a_numvals == 0 ) {
1863 rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
1864 return rs->sr_err;
1865 }
1866
1867 /* Was it present in the first place? If not, skip: */
1868 if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) {
1869 return rs->sr_err;
1870 }
1871
1872 a = attr_find( attrs, ad_reqNewDN );
1873 if ( !a || a->a_numvals == 0 ) {
1874 rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
1875 return rs->sr_err;
1876 }
1877
1878 /* Has it gone away? */
1879 if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) {
1880 is_delete = 1;
1881 }
1882 }
1883
1884 /*
1885 * Only pick entries that are both:
1886 */
1887 a = attr_find( attrs, slap_schema.si_ad_entryCSN );
1888 if ( !a || a->a_numvals == 0 ) {
1889 rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
1890 return rs->sr_err;
1891 }
1892 csn[0] = a->a_nvals[0];
1893
1894 sid = slap_parse_csn_sid( &csn[0] );
1895
1896 /*
1897 * newer than cookieCSN (srs->sr_state.ctxcsn)
1898 */
1899 cmp = 1;
1900 for ( i=0; i<srs->sr_state.numcsns; i++ ) {
1901 if ( sid == srs->sr_state.sids[i] ) {
1902 cmp = ber_bvcmp( &csn[0], &srs->sr_state.ctxcsn[i] );
1903 break;
1904 }
1905 }
1906 if ( cmp <= 0 ) {
1907 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
1908 "cmp %d, csn %s too old\n",
1909 op->o_log_prefix, cmp, csn[0].bv_val );
1910 return rs->sr_err;
1911 }
1912
1913 /*
1914 * not newer than snapshot ctxcsn (uuid_progress->ctxcsn)
1915 */
1916 cmp = 0;
1917 for ( i=0; i<uuid_progress->numcsns; i++ ) {
1918 if ( sid == uuid_progress->sids[i] ) {
1919 cmp = ber_bvcmp( &csn[0], &uuid_progress->ctxcsn[i] );
1920 break;
1921 }
1922 }
1923 if ( cmp > 0 ) {
1924 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
1925 "cmp %d, csn %s too new\n",
1926 op->o_log_prefix, cmp, csn[0].bv_val );
1927 return rs->sr_err;
1928 }
1929
1930 a = attr_find( attrs, ad_reqEntryUUID );
1931 if ( !a || a->a_numvals == 0 ) {
1932 rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
1933 return rs->sr_err;
1934 }
1935 uuid[0] = a->a_nvals[0];
1936
1937 bv = ldap_avl_find( uuid_progress->uuids, uuid, sp_uuid_cmp );
1938 if ( bv ) {
1939 /* Already checked or sent, no change */
1940 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
1941 "uuid %s already checked\n",
1942 op->o_log_prefix, a->a_vals[0].bv_val );
1943 return rs->sr_err;
1944 }
1945
1946 if ( !is_delete ) {
1947 is_delete = check_uuidlist_presence( uuid_progress->op, uuid, 1, 1 );
1948 }
1949 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
1950 "uuid %s is %s present\n",
1951 op->o_log_prefix, a->a_vals[0].bv_val,
1952 is_delete ? "no longer" : "still" );
1953
1954 i = uuid_progress->ndel++;
1955
1956 bv = &uuid_progress->uuid_list[i];
1957 bv->bv_val = &uuid_progress->uuid_buf[i*UUID_LEN];
1958 bv->bv_len = a->a_nvals[0].bv_len;
1959 AC_MEMCPY( bv->bv_val, a->a_nvals[0].bv_val, a->a_nvals[0].bv_len );
1960
1961 rc = ldap_avl_insert( &uuid_progress->uuids, bv, sp_uuid_cmp, ldap_avl_dup_error );
1962 assert( rc == LDAP_SUCCESS );
1963
1964 if ( is_delete ) {
1965 struct berval cookie;
1966
1967 slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn,
1968 srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn );
1969 syncprov_sendinfo( uuid_progress->op, uuid_progress->rs,
1970 LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 );
1971 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
1972 }
1973
1974 if ( uuid_progress->ndel >= uuid_progress->list_len ) {
1975 int ndel;
1976
1977 assert( uuid_progress->ndel == uuid_progress->list_len );
1978 ndel = ldap_avl_free( uuid_progress->uuids, NULL );
1979 assert( ndel == uuid_progress->ndel );
1980 uuid_progress->uuids = NULL;
1981 uuid_progress->ndel = 0;
1982 }
1983
1984 return rs->sr_err;
1985 }
1986
1987 static int
syncprov_play_sessionlog(Operation * op,SlapReply * rs,sync_control * srs,BerVarray ctxcsn,int numcsns,int * sids,struct berval * mincsn,int minsid)1988 syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs,
1989 BerVarray ctxcsn, int numcsns, int *sids,
1990 struct berval *mincsn, int minsid )
1991 {
1992 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
1993 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
1994 sessionlog *sl = si->si_logs;
1995 int i, j, ndel, num, nmods, mmods, do_play = 0, rc = -1;
1996 BerVarray uuids, csns;
1997 struct berval uuid[2] = {}, csn[2] = {};
1998 slog_entry *se;
1999 TAvlnode *entry;
2000 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
2001 struct berval delcsn[2];
2002
2003 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
2004 /* Are there any log entries, and is the consumer state
2005 * present in the session log?
2006 */
2007 if ( !sl->sl_num ) {
2008 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
2009 return rc;
2010 }
2011 assert( sl->sl_num > 0 );
2012
2013 for ( i=0; i<sl->sl_numcsns; i++ ) {
2014 /* SID not present == new enough */
2015 if ( minsid < sl->sl_sids[i] ) {
2016 do_play = 1;
2017 break;
2018 }
2019 /* SID present */
2020 if ( minsid == sl->sl_sids[i] ) {
2021 /* new enough? */
2022 if ( ber_bvcmp( mincsn, &sl->sl_mincsn[i] ) >= 0 )
2023 do_play = 1;
2024 break;
2025 }
2026 }
2027 /* SID not present == new enough */
2028 if ( i == sl->sl_numcsns )
2029 do_play = 1;
2030
2031 if ( !do_play ) {
2032 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
2033 return rc;
2034 }
2035
2036 num = sl->sl_num;
2037 i = 0;
2038 nmods = 0;
2039 sl->sl_playing++;
2040 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
2041
2042 uuids = op->o_tmpalloc( (num) * sizeof( struct berval ) +
2043 num * UUID_LEN, op->o_tmpmemctx );
2044 uuids[0].bv_val = (char *)(uuids + num);
2045 csns = op->o_tmpalloc( (num) * sizeof( struct berval ) +
2046 num * LDAP_PVT_CSNSTR_BUFSIZE, op->o_tmpmemctx );
2047 csns[0].bv_val = (char *)(csns + num);
2048
2049 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
2050 /* Make a copy of the relevant UUIDs. Put the Deletes up front
2051 * and everything else at the end. Do this first so we can
2052 * let the write side manage the sessionlog again.
2053 */
2054 assert( sl->sl_entries );
2055
2056 /* Find first relevant log entry. If greater than mincsn, backtrack one entry */
2057 {
2058 slog_entry te = {0};
2059 te.se_csn = *mincsn;
2060 entry = ldap_tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel );
2061 }
2062 if ( ndel > 0 && entry )
2063 entry = ldap_tavl_next( entry, TAVL_DIR_LEFT );
2064 /* if none, just start at beginning */
2065 if ( !entry )
2066 entry = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
2067
2068 do {
2069 char uuidstr[40] = {};
2070 slog_entry *se = entry->avl_data;
2071 int k;
2072
2073 /* Make sure writes can still make progress */
2074 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
2075 ndel = 1;
2076 for ( k=0; k<srs->sr_state.numcsns; k++ ) {
2077 if ( se->se_sid == srs->sr_state.sids[k] ) {
2078 ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] );
2079 break;
2080 }
2081 }
2082 if ( ndel <= 0 ) {
2083 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
2084 continue;
2085 }
2086 ndel = 0;
2087 for ( k=0; k<numcsns; k++ ) {
2088 if ( se->se_sid == sids[k] ) {
2089 ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] );
2090 break;
2091 }
2092 }
2093 if ( ndel > 0 ) {
2094 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
2095 "cmp %d, csn %s too new, we're finished\n",
2096 op->o_log_prefix, ndel, se->se_csn.bv_val );
2097 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
2098 break;
2099 }
2100 if ( se->se_tag == LDAP_REQ_DELETE ) {
2101 j = i;
2102 i++;
2103 } else {
2104 if ( se->se_tag == LDAP_REQ_ADD ) {
2105 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
2106 continue;
2107 }
2108 nmods++;
2109 j = num - nmods;
2110 }
2111 uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
2112 AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
2113 uuids[j].bv_len = UUID_LEN;
2114
2115 csns[j].bv_val = csns[0].bv_val + (j * LDAP_PVT_CSNSTR_BUFSIZE);
2116 AC_MEMCPY(csns[j].bv_val, se->se_csn.bv_val, se->se_csn.bv_len);
2117 csns[j].bv_len = se->se_csn.bv_len;
2118 /* We're printing it */
2119 csns[j].bv_val[csns[j].bv_len] = '\0';
2120
2121 if ( LogTest( LDAP_DEBUG_SYNC ) ) {
2122 lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len,
2123 uuidstr, 40 );
2124 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
2125 "picking a %s entry uuid=%s cookie=%s\n",
2126 op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified",
2127 uuidstr, csns[j].bv_val );
2128 }
2129 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
2130 } while ( (entry = ldap_tavl_next( entry, TAVL_DIR_RIGHT )) != NULL );
2131 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
2132 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
2133 sl->sl_playing--;
2134 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
2135
2136 ndel = i;
2137
2138 /* Zero out unused slots */
2139 for ( i=ndel; i < num - nmods; i++ )
2140 uuids[i].bv_len = 0;
2141
2142 /* Mods must be validated to see if they belong in this delete set.
2143 */
2144
2145 mmods = nmods;
2146 /* Strip any duplicates */
2147 for ( i=0; i<nmods; i++ ) {
2148 for ( j=0; j<ndel; j++ ) {
2149 if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
2150 uuids[num - 1 - i].bv_len = 0;
2151 mmods --;
2152 break;
2153 }
2154 }
2155 if ( uuids[num - 1 - i].bv_len == 0 ) continue;
2156 for ( j=0; j<i; j++ ) {
2157 if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
2158 uuids[num - 1 - i].bv_len = 0;
2159 mmods --;
2160 break;
2161 }
2162 }
2163 }
2164
2165 /* Check mods now */
2166 if ( mmods ) {
2167 check_uuidlist_presence( op, uuids, num, nmods );
2168 }
2169
2170 /* ITS#8768 Send entries sorted by CSN order */
2171 i = j = 0;
2172 while ( i < ndel || j < nmods ) {
2173 struct berval cookie;
2174 int index;
2175
2176 /* Skip over duplicate mods */
2177 if ( j < nmods && BER_BVISEMPTY( &uuids[ num - 1 - j ] ) ) {
2178 j++;
2179 continue;
2180 }
2181 index = num - 1 - j;
2182
2183 if ( i >= ndel ) {
2184 j++;
2185 } else if ( j >= nmods ) {
2186 index = i++;
2187 /* Take the oldest by CSN order */
2188 } else if ( ber_bvcmp( &csns[index], &csns[i] ) < 0 ) {
2189 j++;
2190 } else {
2191 index = i++;
2192 }
2193
2194 uuid[0] = uuids[index];
2195 csn[0] = csns[index];
2196
2197 slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn,
2198 srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn );
2199 if ( LogTest( LDAP_DEBUG_SYNC ) ) {
2200 char uuidstr[40];
2201 lutil_uuidstr_from_normalized( uuid[0].bv_val, uuid[0].bv_len,
2202 uuidstr, 40 );
2203 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
2204 "sending a new disappearing entry uuid=%s cookie=%s\n",
2205 op->o_log_prefix, uuidstr, cookie.bv_val );
2206 }
2207
2208 /* TODO: we might batch those that share the same CSN (think present
2209 * phase), but would have to limit how many we send out at once */
2210 syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 );
2211 }
2212 op->o_tmpfree( uuids, op->o_tmpmemctx );
2213 op->o_tmpfree( csns, op->o_tmpmemctx );
2214
2215 return LDAP_SUCCESS;
2216 }
2217
2218 static int
syncprov_play_accesslog(Operation * op,SlapReply * rs,sync_control * srs,BerVarray ctxcsn,int numcsns,int * sids,struct berval * mincsn,int minsid)2219 syncprov_play_accesslog( Operation *op, SlapReply *rs, sync_control *srs,
2220 BerVarray ctxcsn, int numcsns, int *sids,
2221 struct berval *mincsn, int minsid )
2222 {
2223 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
2224 syncprov_info_t *si = on->on_bi.bi_private;
2225 Operation fop;
2226 SlapReply frs = { REP_RESULT };
2227 slap_callback cb = {};
2228 Filter *f;
2229 syncprov_accesslog_deletes uuid_progress = {
2230 .op = op,
2231 .rs = rs,
2232 .srs = srs,
2233 .ctxcsn = ctxcsn,
2234 .numcsns = numcsns,
2235 .sids = sids,
2236 };
2237 struct berval oldestcsn = BER_BVNULL, newestcsn = ctxcsn[0],
2238 basedn, filterpattern = BER_BVC(
2239 "(&"
2240 "(entryCSN>=%s)"
2241 "(entryCSN<=%s)"
2242 "(reqResult=0)"
2243 "(|"
2244 "(reqDN:dnSubtreeMatch:=%s)"
2245 "(reqNewDN:dnSubtreeMatch:=%s)"
2246 ")"
2247 "(|"
2248 "(objectclass=auditWriteObject)"
2249 "(objectclass=auditExtended)"
2250 "))" );
2251 BackendDB *db;
2252 Entry *e;
2253 Attribute *a;
2254 int i, rc = -1;
2255
2256 assert( !BER_BVISNULL( &si->si_logbase ) );
2257
2258 for ( i=1; i < numcsns; i++ ) {
2259 if ( ber_bvcmp( &newestcsn, &ctxcsn[i] ) < 0 ) {
2260 newestcsn = ctxcsn[i];
2261 }
2262 }
2263
2264 db = select_backend( &si->si_logbase, 0 );
2265 if ( !db ) {
2266 Debug( LDAP_DEBUG_ANY, "%s syncprov_play_accesslog: "
2267 "No database configured to hold accesslog dn=%s\n",
2268 op->o_log_prefix, si->si_logbase.bv_val );
2269 return LDAP_NO_SUCH_OBJECT;
2270 }
2271
2272 fop = *op;
2273 fop.o_sync_mode = 0;
2274 fop.o_bd = db;
2275 rc = be_entry_get_rw( &fop, &si->si_logbase, NULL, ad_minCSN, 0, &e );
2276 if ( rc ) {
2277 return rc;
2278 }
2279
2280 a = attr_find( e->e_attrs, ad_minCSN );
2281 if ( !a ) {
2282 be_entry_release_rw( &fop, e, 0 );
2283 return LDAP_NO_SUCH_ATTRIBUTE;
2284 }
2285 for ( i=0; i < a->a_numvals; i++ ) {
2286 if ( BER_BVISEMPTY( &oldestcsn ) ||
2287 ber_bvcmp( &oldestcsn, &a->a_nvals[i] ) > 0 ) {
2288 oldestcsn = a->a_nvals[i];
2289 }
2290 }
2291
2292 filter_escape_value_x( &op->o_req_ndn, &basedn, fop.o_tmpmemctx );
2293 /* filter_escape_value_x sets output to BVNULL if input value is empty,
2294 * supply our own copy */
2295 if ( BER_BVISEMPTY( &basedn ) ) {
2296 basedn.bv_val = "";
2297 }
2298 fop.o_req_ndn = fop.o_req_dn = si->si_logbase;
2299 fop.ors_filterstr.bv_val = fop.o_tmpalloc(
2300 filterpattern.bv_len +
2301 oldestcsn.bv_len + newestcsn.bv_len + 2 * basedn.bv_len,
2302 fop.o_tmpmemctx );
2303 fop.ors_filterstr.bv_len = sprintf( fop.ors_filterstr.bv_val,
2304 filterpattern.bv_val,
2305 oldestcsn.bv_val, newestcsn.bv_val, basedn.bv_val, basedn.bv_val );
2306 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_accesslog: "
2307 "prepared filter '%s', base='%s'\n",
2308 op->o_log_prefix, fop.ors_filterstr.bv_val, si->si_logbase.bv_val );
2309 f = str2filter_x( &fop, fop.ors_filterstr.bv_val );
2310 assert( f != NULL );
2311 fop.ors_filter = f;
2312
2313 if ( !BER_BVISEMPTY( &basedn ) ) {
2314 fop.o_tmpfree( basedn.bv_val, fop.o_tmpmemctx );
2315 }
2316 be_entry_release_rw( &fop, e, 0 );
2317
2318 /*
2319 * Allocate memory for list_len uuids for use by the callback, populate
2320 * with entries that we have sent or checked still match the filter.
2321 * A disappearing entry gets its uuid sent as a delete.
2322 *
2323 * in the callback, we need:
2324 * - original op and rs so we can send the message
2325 * - sync_control
2326 * - the uuid buffer and list and their length
2327 * - number of uuids we already have in the list
2328 * - the lookup structure so we don't have to check/send a uuid twice
2329 * (AVL?)
2330 */
2331 uuid_progress.list_len = SLAP_SYNCUUID_SET_SIZE;
2332 uuid_progress.uuid_list = fop.o_tmpalloc( (uuid_progress.list_len) * sizeof(struct berval), fop.o_tmpmemctx );
2333 uuid_progress.uuid_buf = fop.o_tmpalloc( (uuid_progress.list_len) * UUID_LEN, fop.o_tmpmemctx );
2334
2335 cb.sc_private = &uuid_progress;
2336 cb.sc_response = syncprov_accesslog_uuid_cb;
2337
2338 fop.o_callback = &cb;
2339
2340 rc = fop.o_bd->be_search( &fop, &frs );
2341
2342 fop.o_tmpfree( uuid_progress.uuid_buf, fop.o_tmpmemctx );
2343 fop.o_tmpfree( uuid_progress.uuid_list, fop.o_tmpmemctx );
2344 fop.o_tmpfree( fop.ors_filterstr.bv_val, fop.o_tmpmemctx );
2345 filter_free_x( &fop, f, 1 );
2346
2347 return rc;
2348 }
2349
2350 static int
syncprov_new_ctxcsn(opcookie * opc,syncprov_info_t * si,int csn_changed,int numvals,BerVarray vals)2351 syncprov_new_ctxcsn( opcookie *opc, syncprov_info_t *si, int csn_changed, int numvals, BerVarray vals )
2352 {
2353 unsigned i;
2354 int j, sid;
2355
2356 for ( i=0; i<numvals; i++ ) {
2357 sid = slap_parse_csn_sid( &vals[i] );
2358 for ( j=0; j<si->si_numcsns; j++ ) {
2359 if ( sid < si->si_sids[j] )
2360 break;
2361 if ( sid == si->si_sids[j] ) {
2362 if ( ber_bvcmp( &vals[i], &si->si_ctxcsn[j] ) > 0 ) {
2363 ber_bvreplace( &si->si_ctxcsn[j], &vals[i] );
2364 csn_changed = 1;
2365 }
2366 break;
2367 }
2368 }
2369
2370 if ( j == si->si_numcsns || sid != si->si_sids[j] ) {
2371 slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn,
2372 j, sid, &vals[i] );
2373 csn_changed = 1;
2374 }
2375 }
2376 if ( csn_changed )
2377 si->si_dirty = 0;
2378 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
2379
2380 if ( csn_changed ) {
2381 syncops *ss;
2382 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2383 for ( ss = si->si_ops; ss; ss = ss->s_next ) {
2384 if ( ss->s_op->o_abandon )
2385 continue;
2386 /* Send the updated csn to all syncrepl consumers,
2387 * including the server from which it originated.
2388 * The syncrepl consumer and syncprov provider on
2389 * the originating server may be configured to store
2390 * their csn values in different entries.
2391 */
2392 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
2393 }
2394 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2395 }
2396 return csn_changed;
2397 }
2398
2399 static int
syncprov_op_response(Operation * op,SlapReply * rs)2400 syncprov_op_response( Operation *op, SlapReply *rs )
2401 {
2402 opcookie *opc = op->o_callback->sc_private;
2403 slap_overinst *on = opc->son;
2404 syncprov_info_t *si = on->on_bi.bi_private;
2405 syncmatches *sm;
2406
2407 if ( rs->sr_err == LDAP_SUCCESS )
2408 {
2409 struct berval maxcsn;
2410 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
2411 int do_check = 0, have_psearches, foundit, csn_changed = 0;
2412
2413 ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );
2414
2415 /* Update our context CSN */
2416 cbuf[0] = '\0';
2417 maxcsn.bv_val = cbuf;
2418 maxcsn.bv_len = sizeof(cbuf);
2419 ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );
2420
2421 slap_get_commit_csn( op, &maxcsn, &foundit );
2422 if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {
2423 /* syncrepl queues the CSN values in the db where
2424 * it is configured , not where the changes are made.
2425 * So look for a value in the glue db if we didn't
2426 * find any in this db.
2427 */
2428 BackendDB *be = op->o_bd;
2429 op->o_bd = select_backend( &be->be_nsuffix[0], 1);
2430 maxcsn.bv_val = cbuf;
2431 maxcsn.bv_len = sizeof(cbuf);
2432 slap_get_commit_csn( op, &maxcsn, &foundit );
2433 op->o_bd = be;
2434 }
2435 if ( !BER_BVISEMPTY( &maxcsn ) ) {
2436 int i, sid;
2437 #ifdef CHECK_CSN
2438 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
2439 assert( !syn->ssyn_validate( syn, &maxcsn ));
2440 #endif
2441 sid = slap_parse_csn_sid( &maxcsn );
2442 for ( i=0; i<si->si_numcsns; i++ ) {
2443 if ( sid < si->si_sids[i] )
2444 break;
2445 if ( sid == si->si_sids[i] ) {
2446 if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
2447 ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
2448 csn_changed = 1;
2449 }
2450 break;
2451 }
2452 }
2453 /* It's a new SID for us */
2454 if ( i == si->si_numcsns || sid != si->si_sids[i] ) {
2455 slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn),
2456 i, sid, &maxcsn );
2457 csn_changed = 1;
2458 }
2459 }
2460
2461 /* Don't do any processing for consumer contextCSN updates */
2462 if ( SLAPD_SYNC_IS_SYNCCONN( op->o_connid ) &&
2463 op->o_tag == LDAP_REQ_MODIFY &&
2464 op->orm_modlist &&
2465 op->orm_modlist->sml_op == LDAP_MOD_REPLACE &&
2466 op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) {
2467 /* Catch contextCSN updates from syncrepl. We have to look at
2468 * all the attribute values, as there may be more than one csn
2469 * that changed, and only one can be passed in the csn queue.
2470 */
2471 csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed,
2472 op->orm_modlist->sml_numvals, op->orm_modlist->sml_values );
2473 if ( csn_changed )
2474 si->si_numops++;
2475 goto leave;
2476 }
2477 if ( op->o_dont_replicate ) {
2478 if ( csn_changed )
2479 si->si_numops++;
2480 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
2481 goto leave;
2482 }
2483
2484 /* If we're adding the context entry, parse all of its contextCSNs */
2485 if ( op->o_tag == LDAP_REQ_ADD &&
2486 dn_match( &op->o_req_ndn, &si->si_contextdn )) {
2487 Attribute *a = attr_find( op->ora_e->e_attrs, slap_schema.si_ad_contextCSN );
2488 if ( a ) {
2489 csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, a->a_numvals, a->a_vals );
2490 if ( csn_changed )
2491 si->si_numops++;
2492 goto added;
2493 }
2494 }
2495
2496 if ( csn_changed )
2497 si->si_numops++;
2498 if ( si->si_chkops || si->si_chktime ) {
2499 /* Never checkpoint adding the context entry,
2500 * it will deadlock
2501 */
2502 if ( op->o_tag != LDAP_REQ_ADD ||
2503 !dn_match( &op->o_req_ndn, &si->si_contextdn )) {
2504 if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
2505 do_check = 1;
2506 si->si_numops = 0;
2507 }
2508 if ( si->si_chktime &&
2509 (op->o_time - si->si_chklast >= si->si_chktime )) {
2510 if ( si->si_chklast ) {
2511 do_check = 1;
2512 si->si_chklast = op->o_time;
2513 } else {
2514 si->si_chklast = 1;
2515 }
2516 }
2517 }
2518 }
2519 si->si_dirty = !csn_changed;
2520 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
2521
2522 added:
2523 if ( do_check ) {
2524 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2525 syncprov_checkpoint( op, on );
2526 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2527 }
2528
2529 /* only update consumer ctx if this is a newer csn */
2530 if ( csn_changed ) {
2531 opc->sctxcsn = maxcsn;
2532 }
2533
2534 /* Handle any persistent searches */
2535 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2536 have_psearches = ( si->si_ops != NULL );
2537 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2538 if ( have_psearches ) {
2539 switch(op->o_tag) {
2540 case LDAP_REQ_ADD:
2541 case LDAP_REQ_MODIFY:
2542 case LDAP_REQ_MODRDN:
2543 case LDAP_REQ_EXTENDED:
2544 syncprov_matchops( op, opc, 0 );
2545 break;
2546 case LDAP_REQ_DELETE:
2547 /* for each match in opc->smatches:
2548 * send DELETE msg
2549 */
2550 for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
2551 if ( sm->sm_op->s_op->o_abandon )
2552 continue;
2553 syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
2554 }
2555 if ( opc->ssres.s_info )
2556 free_resinfo( &opc->ssres );
2557 break;
2558 }
2559 }
2560
2561 /* Add any log records */
2562 if ( si->si_logs ) {
2563 syncprov_add_slog( op );
2564 }
2565 leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex );
2566 }
2567 return SLAP_CB_CONTINUE;
2568 }
2569
2570 /* We don't use a subentry to store the context CSN any more.
2571 * We expose the current context CSN as an operational attribute
2572 * of the suffix entry.
2573 */
2574 static int
syncprov_op_compare(Operation * op,SlapReply * rs)2575 syncprov_op_compare( Operation *op, SlapReply *rs )
2576 {
2577 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
2578 syncprov_info_t *si = on->on_bi.bi_private;
2579 int rc = SLAP_CB_CONTINUE;
2580
2581 if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) &&
2582 op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN )
2583 {
2584 Entry e = {0};
2585 Attribute a = {0};
2586
2587 e.e_name = si->si_contextdn;
2588 e.e_nname = si->si_contextdn;
2589 e.e_attrs = &a;
2590
2591 a.a_desc = slap_schema.si_ad_contextCSN;
2592
2593 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2594
2595 a.a_vals = si->si_ctxcsn;
2596 a.a_nvals = a.a_vals;
2597 a.a_numvals = si->si_numcsns;
2598
2599 rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
2600 &op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
2601 if ( ! rs->sr_err ) {
2602 rs->sr_err = LDAP_INSUFFICIENT_ACCESS;
2603 goto return_results;
2604 }
2605
2606 if ( get_assert( op ) &&
2607 ( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
2608 {
2609 rs->sr_err = LDAP_ASSERTION_FAILED;
2610 goto return_results;
2611 }
2612
2613
2614 rs->sr_err = LDAP_COMPARE_FALSE;
2615
2616 if ( attr_valfind( &a,
2617 SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
2618 SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
2619 &op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 )
2620 {
2621 rs->sr_err = LDAP_COMPARE_TRUE;
2622 }
2623
2624 return_results:;
2625
2626 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2627
2628 send_ldap_result( op, rs );
2629
2630 if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) {
2631 rs->sr_err = LDAP_SUCCESS;
2632 }
2633 rc = rs->sr_err;
2634 }
2635
2636 return rc;
2637 }
2638
2639 static int
syncprov_op_mod(Operation * op,SlapReply * rs)2640 syncprov_op_mod( Operation *op, SlapReply *rs )
2641 {
2642 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
2643 syncprov_info_t *si = on->on_bi.bi_private;
2644 slap_callback *cb;
2645 opcookie *opc;
2646 int have_psearches, cbsize;
2647
2648 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2649 have_psearches = ( si->si_ops != NULL );
2650 si->si_active++;
2651 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2652
2653 cbsize = sizeof(slap_callback) + sizeof(opcookie) +
2654 (have_psearches ? sizeof(modinst) : 0 );
2655
2656 cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx);
2657 opc = (opcookie *)(cb+1);
2658 opc->son = on;
2659 cb->sc_response = syncprov_op_response;
2660 cb->sc_cleanup = syncprov_op_cleanup;
2661 cb->sc_private = opc;
2662 cb->sc_next = op->o_callback;
2663 op->o_callback = cb;
2664
2665 opc->osid = -1;
2666 opc->rsid = -1;
2667 if ( op->o_csn.bv_val ) {
2668 opc->osid = slap_parse_csn_sid( &op->o_csn );
2669 }
2670 if ( op->o_controls ) {
2671 struct sync_cookie *scook =
2672 op->o_controls[slap_cids.sc_LDAPsync];
2673 if ( scook )
2674 opc->rsid = scook->sid;
2675 }
2676
2677 if ( op->o_dont_replicate )
2678 return SLAP_CB_CONTINUE;
2679
2680 /* If there are active persistent searches, lock this operation.
2681 * See seqmod.c for the locking logic on its own.
2682 */
2683 if ( have_psearches ) {
2684 modtarget *mt, mtdummy;
2685 modinst *mi;
2686
2687 mi = (modinst *)(opc+1);
2688 mi->mi_op = op;
2689
2690 /* See if we're already modifying this entry... */
2691 mtdummy.mt_dn = op->o_req_ndn;
2692 retry:
2693 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
2694 mt = ldap_avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
2695 if ( mt ) {
2696 ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2697 if ( mt->mt_mods == NULL ) {
2698 /* Cannot reuse this mt, as another thread is about
2699 * to release it in syncprov_op_cleanup. Wait for them
2700 * to finish; our own insert is required to succeed.
2701 */
2702 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2703 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2704 ldap_pvt_thread_yield();
2705 goto retry;
2706 }
2707 }
2708 if ( mt ) {
2709 mt->mt_tail->mi_next = mi;
2710 mt->mt_tail = mi;
2711 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2712 /* wait for this op to get to head of list */
2713 while ( mt->mt_mods != mi ) {
2714 modinst *m2;
2715 /* don't wait on other mods from the same thread */
2716 for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) {
2717 if ( m2->mi_op->o_threadctx == op->o_threadctx ) {
2718 break;
2719 }
2720 }
2721 if ( m2 )
2722 break;
2723
2724 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2725 /* FIXME: if dynamic config can delete overlays or
2726 * databases we'll have to check for cleanup here.
2727 * Currently it's not an issue because there are
2728 * no dynamic config deletes...
2729 */
2730 if ( slapd_shutdown )
2731 return SLAPD_ABANDON;
2732
2733 if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
2734 ldap_pvt_thread_yield();
2735 ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2736
2737 /* clean up if the caller is giving up */
2738 if ( op->o_abandon ) {
2739 modinst **m2;
2740 slap_callback **sc;
2741 for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) {
2742 if ( *m2 == mi ) {
2743 *m2 = mi->mi_next;
2744 if ( mt->mt_tail == mi )
2745 mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2;
2746 break;
2747 }
2748 }
2749 for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) {
2750 if ( *sc == cb ) {
2751 *sc = cb->sc_next;
2752 break;
2753 }
2754 }
2755 op->o_tmpfree( cb, op->o_tmpmemctx );
2756 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2757 return SLAPD_ABANDON;
2758 }
2759 }
2760 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2761 } else {
2762 /* Record that we're modifying this entry now */
2763 mt = ch_malloc( sizeof(modtarget) );
2764 mt->mt_mods = mi;
2765 mt->mt_tail = mi;
2766 ber_dupbv( &mt->mt_dn, &mi->mi_op->o_req_ndn );
2767 ldap_pvt_thread_mutex_init( &mt->mt_mutex );
2768 ldap_avl_insert( &si->si_mods, mt, sp_avl_cmp, ldap_avl_dup_error );
2769 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2770 }
2771 opc->smt = mt;
2772 }
2773
2774 if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
2775 syncprov_matchops( op, opc, 1 );
2776
2777 return SLAP_CB_CONTINUE;
2778 }
2779
2780 static int
syncprov_op_extended(Operation * op,SlapReply * rs)2781 syncprov_op_extended( Operation *op, SlapReply *rs )
2782 {
2783 if ( exop_is_write( op ))
2784 return syncprov_op_mod( op, rs );
2785
2786 return SLAP_CB_CONTINUE;
2787 }
2788
2789 typedef struct searchstate {
2790 slap_overinst *ss_on;
2791 syncops *ss_so;
2792 BerVarray ss_ctxcsn;
2793 int *ss_sids;
2794 int ss_numcsns;
2795 #define SS_PRESENT 0x01
2796 #define SS_CHANGED 0x02
2797 int ss_flags;
2798 } searchstate;
2799
2800 typedef struct SyncOperationBuffer {
2801 Operation sob_op;
2802 Opheader sob_hdr;
2803 OpExtra sob_oe;
2804 AttributeName sob_extra; /* not always present */
2805 /* Further data allocated here */
2806 } SyncOperationBuffer;
2807
2808 static void
syncprov_detach_op(Operation * op,syncops * so,slap_overinst * on)2809 syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
2810 {
2811 SyncOperationBuffer *sopbuf2;
2812 Operation *op2;
2813 int i, alen = 0;
2814 size_t size;
2815 char *ptr;
2816 GroupAssertion *g1, *g2;
2817
2818 /* count the search attrs */
2819 for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2820 alen += op->ors_attrs[i].an_name.bv_len + 1;
2821 }
2822 /* Make a new copy of the operation */
2823 size = offsetof( SyncOperationBuffer, sob_extra ) +
2824 (i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) +
2825 op->o_req_dn.bv_len + 1 +
2826 op->o_req_ndn.bv_len + 1 +
2827 op->o_ndn.bv_len + 1 +
2828 so->s_filterstr.bv_len + 1;
2829 sopbuf2 = ch_calloc( 1, size );
2830 op2 = &sopbuf2->sob_op;
2831 op2->o_hdr = &sopbuf2->sob_hdr;
2832 LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe;
2833
2834 /* Copy the fields we care about explicitly, leave the rest alone */
2835 *op2->o_hdr = *op->o_hdr;
2836 op2->o_tag = op->o_tag;
2837 op2->o_time = op->o_time;
2838 op2->o_bd = on->on_info->oi_origdb;
2839 op2->o_request = op->o_request;
2840 op2->o_managedsait = op->o_managedsait;
2841 LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on;
2842 LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL;
2843
2844 ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra );
2845 if ( i ) {
2846 op2->ors_attrs = (AttributeName *) ptr;
2847 ptr = (char *) &op2->ors_attrs[i+1];
2848 for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2849 op2->ors_attrs[i] = op->ors_attrs[i];
2850 op2->ors_attrs[i].an_name.bv_val = ptr;
2851 ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1;
2852 }
2853 BER_BVZERO( &op2->ors_attrs[i].an_name );
2854 }
2855
2856 op2->o_authz = op->o_authz;
2857 op2->o_ndn.bv_val = ptr;
2858 ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1;
2859 op2->o_dn = op2->o_ndn;
2860 op2->o_req_dn.bv_len = op->o_req_dn.bv_len;
2861 op2->o_req_dn.bv_val = ptr;
2862 ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1;
2863 op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len;
2864 op2->o_req_ndn.bv_val = ptr;
2865 ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1;
2866 op2->ors_filterstr.bv_val = ptr;
2867 strcpy( ptr, so->s_filterstr.bv_val );
2868 op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;
2869
2870 /* Skip the AND/GE clause that we stuck on in front */
2871 if ( so->s_flags & PS_FIX_FILTER ) {
2872 op2->ors_filter = op->ors_filter->f_and->f_next;
2873 so->s_flags ^= PS_FIX_FILTER;
2874 } else {
2875 op2->ors_filter = op->ors_filter;
2876 }
2877 op2->ors_filter = filter_dup( op2->ors_filter, NULL );
2878 so->s_op = op2;
2879
2880 /* Copy any cached group ACLs individually */
2881 op2->o_groups = NULL;
2882 for ( g1=op->o_groups; g1; g1=g1->ga_next ) {
2883 g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len );
2884 *g2 = *g1;
2885 strcpy( g2->ga_ndn, g1->ga_ndn );
2886 g2->ga_next = op2->o_groups;
2887 op2->o_groups = g2;
2888 }
2889 /* Don't allow any further group caching */
2890 op2->o_do_not_cache = 1;
2891
2892 /* Add op2 to conn so abandon will find us */
2893 op->o_conn->c_n_ops_executing++;
2894 op->o_conn->c_n_ops_completed--;
2895 LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
2896 so->s_flags |= PS_IS_DETACHED;
2897
2898 /* Prevent anyone else from trying to send a result for this op */
2899 op->o_abandon = 1;
2900 }
2901
2902 static int
syncprov_search_response(Operation * op,SlapReply * rs)2903 syncprov_search_response( Operation *op, SlapReply *rs )
2904 {
2905 searchstate *ss = op->o_callback->sc_private;
2906 slap_overinst *on = ss->ss_on;
2907 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
2908 sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];
2909
2910 if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
2911 Attribute *a;
2912 /* If we got a referral without a referral object, there's
2913 * something missing that we cannot replicate. Just ignore it.
2914 * The consumer will abort because we didn't send the expected
2915 * control.
2916 */
2917 if ( !rs->sr_entry ) {
2918 assert( rs->sr_entry != NULL );
2919 Debug( LDAP_DEBUG_ANY, "%s syncprov_search_response: "
2920 "bogus referral in context\n", op->o_log_prefix );
2921 return SLAP_CB_CONTINUE;
2922 }
2923 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
2924 if ( a == NULL && rs->sr_operational_attrs != NULL ) {
2925 a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
2926 }
2927 if ( a ) {
2928 int i, sid;
2929 sid = slap_parse_csn_sid( &a->a_nvals[0] );
2930
2931 /* If not a persistent search */
2932 if ( !ss->ss_so ) {
2933 /* Make sure entry is less than the snapshot'd contextCSN */
2934 for ( i=0; i<ss->ss_numcsns; i++ ) {
2935 if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0],
2936 &ss->ss_ctxcsn[i] ) > 0 ) {
2937 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
2938 "Entry %s CSN %s greater than snapshot %s\n",
2939 op->o_log_prefix,
2940 rs->sr_entry->e_name.bv_val,
2941 a->a_nvals[0].bv_val,
2942 ss->ss_ctxcsn[i].bv_val );
2943 return LDAP_SUCCESS;
2944 }
2945 }
2946 }
2947
2948 /* Don't send old entries twice */
2949 if ( srs->sr_state.ctxcsn ) {
2950 for ( i=0; i<srs->sr_state.numcsns; i++ ) {
2951 if ( sid == srs->sr_state.sids[i] &&
2952 ber_bvcmp( &a->a_nvals[0],
2953 &srs->sr_state.ctxcsn[i] )<= 0 ) {
2954 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
2955 "Entry %s CSN %s older or equal to ctx %s\n",
2956 op->o_log_prefix,
2957 rs->sr_entry->e_name.bv_val,
2958 a->a_nvals[0].bv_val,
2959 srs->sr_state.ctxcsn[i].bv_val );
2960 return LDAP_SUCCESS;
2961 }
2962 }
2963 }
2964 }
2965 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2966 op->o_tmpmemctx );
2967 rs->sr_ctrls[1] = NULL;
2968 rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
2969 /* If we're in delta-sync mode, always send a cookie */
2970 if ( si->si_nopres && si->si_usehint && a ) {
2971 struct berval cookie;
2972 slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid,
2973 slap_serverID ? slap_serverID : -1, NULL );
2974 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2975 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie );
2976 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2977 } else {
2978 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2979 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
2980 }
2981 } else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
2982 struct berval cookie = BER_BVNULL;
2983
2984 if ( ( ss->ss_flags & SS_CHANGED ) &&
2985 ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) {
2986 slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn,
2987 srs->sr_state.rid,
2988 slap_serverID ? slap_serverID : -1, NULL );
2989
2990 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: cookie=%s\n",
2991 op->o_log_prefix, cookie.bv_val );
2992 }
2993
2994 /* Is this a regular refresh?
2995 * Note: refresh never gets here if there were no changes
2996 */
2997 if ( !ss->ss_so ) {
2998 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2999 op->o_tmpmemctx );
3000 rs->sr_ctrls[1] = NULL;
3001 rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
3002 rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
3003 0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ? LDAP_SYNC_REFRESH_PRESENTS :
3004 LDAP_SYNC_REFRESH_DELETES );
3005 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
3006 } else {
3007 /* It's RefreshAndPersist, transition to Persist phase */
3008 syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ?
3009 LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
3010 ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL,
3011 1, NULL, 0 );
3012 if ( !BER_BVISNULL( &cookie ))
3013 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
3014
3015 /* Detach this Op from frontend control */
3016 ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
3017
3018 /* But not if this connection was closed along the way */
3019 if ( op->o_abandon ) {
3020 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
3021 /* syncprov_ab_cleanup will free this syncop */
3022 return SLAPD_ABANDON;
3023
3024 } else {
3025 ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
3026 /* Turn off the refreshing flag */
3027 ss->ss_so->s_flags ^= PS_IS_REFRESHING;
3028
3029 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
3030 "detaching op\n", op->o_log_prefix );
3031 syncprov_detach_op( op, ss->ss_so, on );
3032
3033 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
3034
3035 /* If there are queued responses, fire them off */
3036 if ( ss->ss_so->s_res )
3037 syncprov_qstart( ss->ss_so );
3038 ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
3039 }
3040
3041 return LDAP_SUCCESS;
3042 }
3043 }
3044
3045 return SLAP_CB_CONTINUE;
3046 }
3047
3048 static int
syncprov_op_search(Operation * op,SlapReply * rs)3049 syncprov_op_search( Operation *op, SlapReply *rs )
3050 {
3051 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
3052 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3053 slap_callback *cb;
3054 int gotstate = 0, changed = 0, do_present = 0;
3055 syncops *sop = NULL;
3056 searchstate *ss;
3057 sync_control *srs;
3058 BerVarray ctxcsn;
3059 int i, *sids, numcsns;
3060 struct berval mincsn, maxcsn;
3061 int minsid, maxsid;
3062 int dirty = 0;
3063
3064 if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
3065
3066 if ( op->ors_deref & LDAP_DEREF_SEARCHING ) {
3067 send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" );
3068 return rs->sr_err;
3069 }
3070
3071 srs = op->o_controls[slap_cids.sc_LDAPsync];
3072 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3073 "got a %ssearch with a cookie=%s\n",
3074 op->o_log_prefix,
3075 op->o_sync_mode & SLAP_SYNC_PERSIST ? "persistent ": "",
3076 srs->sr_state.octet_str.bv_val );
3077
3078 /* If this is a persistent search, set it up right away */
3079 if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) {
3080 syncops so = {0};
3081 fbase_cookie fc;
3082 opcookie opc;
3083 slap_callback sc = {0};
3084
3085 fc.fss = &so;
3086 fc.fbase = 0;
3087 so.s_eid = NOID;
3088 so.s_op = op;
3089 so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE;
3090 /* syncprov_findbase expects to be called as a callback... */
3091 sc.sc_private = &opc;
3092 opc.son = on;
3093 ldap_pvt_thread_mutex_init( &so.s_mutex );
3094 cb = op->o_callback;
3095 op->o_callback = ≻
3096 rs->sr_err = syncprov_findbase( op, &fc );
3097 op->o_callback = cb;
3098 ldap_pvt_thread_mutex_destroy( &so.s_mutex );
3099
3100 if ( rs->sr_err != LDAP_SUCCESS ) {
3101 send_ldap_result( op, rs );
3102 return rs->sr_err;
3103 }
3104 sop = ch_malloc( sizeof( syncops ));
3105 *sop = so;
3106 sop->s_rid = srs->sr_state.rid;
3107 sop->s_sid = srs->sr_state.sid;
3108 /* set refcount=2 to prevent being freed out from under us
3109 * by abandons that occur while we're running here
3110 */
3111 sop->s_inuse = 2;
3112
3113 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
3114 while ( si->si_active ) {
3115 /* Wait for active mods to finish before proceeding, as they
3116 * may already have inspected the si_ops list looking for
3117 * consumers to replicate the change to. Using the log
3118 * doesn't help, as we may finish playing it before the
3119 * active mods gets added to it.
3120 */
3121 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
3122 if ( slapd_shutdown ) {
3123 ch_free( sop );
3124 return SLAPD_ABANDON;
3125 }
3126 if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
3127 ldap_pvt_thread_yield();
3128 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
3129 }
3130 if ( op->o_abandon ) {
3131 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
3132 ch_free( sop );
3133 return SLAPD_ABANDON;
3134 }
3135 ldap_pvt_thread_mutex_init( &sop->s_mutex );
3136 sop->s_next = si->si_ops;
3137 sop->s_si = si;
3138 si->si_ops = sop;
3139 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
3140 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3141 "registered persistent search\n", op->o_log_prefix );
3142 }
3143
3144 /* snapshot the ctxcsn
3145 * Note: this must not be done before the psearch setup. (ITS#8365)
3146 */
3147 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
3148 numcsns = si->si_numcsns;
3149 if ( numcsns ) {
3150 ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx );
3151 sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx );
3152 for ( i=0; i<numcsns; i++ )
3153 sids[i] = si->si_sids[i];
3154 } else {
3155 ctxcsn = NULL;
3156 sids = NULL;
3157 }
3158 dirty = si->si_dirty;
3159 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
3160
3161 /* If we have a cookie, handle the PRESENT lookups */
3162 if ( srs->sr_state.ctxcsn ) {
3163 sessionlog *sl;
3164 int i, j;
3165
3166 /* If we don't have any CSN of our own yet, bail out.
3167 */
3168 if ( !numcsns ) {
3169 rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
3170 rs->sr_text = "consumer has state info but provider doesn't!";
3171 goto bailout;
3172 }
3173
3174 if ( !si->si_nopres )
3175 do_present = SS_PRESENT;
3176
3177 /* If there are SIDs we don't recognize in the cookie, drop them */
3178 for (i=0; i<srs->sr_state.numcsns; ) {
3179 for (j=i; j<numcsns; j++) {
3180 if ( srs->sr_state.sids[i] <= sids[j] ) {
3181 break;
3182 }
3183 }
3184 /* not found */
3185 if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) {
3186 char *tmp = srs->sr_state.ctxcsn[i].bv_val;
3187 srs->sr_state.numcsns--;
3188 for ( j=i; j<srs->sr_state.numcsns; j++ ) {
3189 srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1];
3190 srs->sr_state.sids[j] = srs->sr_state.sids[j+1];
3191 }
3192 srs->sr_state.ctxcsn[j].bv_val = tmp;
3193 srs->sr_state.ctxcsn[j].bv_len = 0;
3194 continue;
3195 }
3196 i++;
3197 }
3198
3199 if (srs->sr_state.numcsns != numcsns) {
3200 /* consumer doesn't have the right number of CSNs */
3201 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3202 "consumer cookie is missing a csn we track\n",
3203 op->o_log_prefix );
3204 changed = SS_CHANGED;
3205 if ( srs->sr_state.ctxcsn ) {
3206 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
3207 srs->sr_state.ctxcsn = NULL;
3208 }
3209 if ( srs->sr_state.sids ) {
3210 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx );
3211 srs->sr_state.sids = NULL;
3212 }
3213 srs->sr_state.numcsns = 0;
3214 goto shortcut;
3215 }
3216
3217 /* Find the smallest CSN which differs from contextCSN */
3218 mincsn.bv_len = 0;
3219 maxcsn.bv_len = 0;
3220 for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) {
3221 int newer;
3222 while ( srs->sr_state.sids[i] != sids[j] ) j++;
3223 if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
3224 &srs->sr_state.ctxcsn[i] ) < 0 ) {
3225 maxcsn = srs->sr_state.ctxcsn[i];
3226 maxsid = sids[j];
3227 }
3228 newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
3229 /* If our state is newer, tell consumer about changes */
3230 if ( newer < 0) {
3231 changed = SS_CHANGED;
3232 if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
3233 &srs->sr_state.ctxcsn[i] ) > 0 ) {
3234 mincsn = srs->sr_state.ctxcsn[i];
3235 minsid = sids[j];
3236 }
3237 } else if ( newer > 0 && sids[j] == slap_serverID ) {
3238 /* our state is older, complain to consumer */
3239 rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
3240 rs->sr_text = "consumer state is newer than provider!";
3241 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3242 "consumer %d state %s is newer than provider %d state %s\n",
3243 op->o_log_prefix, sids[i], srs->sr_state.ctxcsn[i].bv_val,
3244 sids[j], /* == slap_serverID */
3245 ctxcsn[j].bv_val);
3246 bailout:
3247 if ( sop ) {
3248 syncops **sp = &si->si_ops;
3249
3250 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
3251 while ( *sp != sop )
3252 sp = &(*sp)->s_next;
3253 *sp = sop->s_next;
3254 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
3255 ch_free( sop );
3256 }
3257 rs->sr_ctrls = NULL;
3258 send_ldap_result( op, rs );
3259 return rs->sr_err;
3260 }
3261 }
3262 if ( BER_BVISEMPTY( &mincsn )) {
3263 mincsn = maxcsn;
3264 minsid = maxsid;
3265 }
3266
3267 /* If nothing has changed, shortcut it */
3268 if ( !changed && !dirty ) {
3269 do_present = 0;
3270 no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
3271 LDAPControl *ctrls[2];
3272
3273 ctrls[0] = NULL;
3274 ctrls[1] = NULL;
3275 syncprov_done_ctrl( op, rs, ctrls, 0, 0,
3276 NULL, LDAP_SYNC_REFRESH_DELETES );
3277 rs->sr_ctrls = ctrls;
3278 rs->sr_err = LDAP_SUCCESS;
3279 send_ldap_result( op, rs );
3280 rs->sr_ctrls = NULL;
3281 return rs->sr_err;
3282 }
3283 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3284 "no change, skipping log replay\n",
3285 op->o_log_prefix );
3286 goto shortcut;
3287 }
3288
3289 if ( !BER_BVISNULL( &si->si_logbase ) ) {
3290 do_present = 0;
3291 if ( syncprov_play_accesslog( op, rs, srs, ctxcsn,
3292 numcsns, sids, &mincsn, minsid ) ) {
3293 do_present = SS_PRESENT;
3294 }
3295 } else if ( si->si_logs ) {
3296 do_present = 0;
3297 if ( syncprov_play_sessionlog( op, rs, srs, ctxcsn,
3298 numcsns, sids, &mincsn, minsid ) ) {
3299 do_present = SS_PRESENT;
3300 }
3301 }
3302 /*
3303 * If sessionlog wasn't useful, see if we can find at least one entry
3304 * that hasn't changed based on the cookie.
3305 *
3306 * TODO: Using mincsn only (rather than the whole cookie) will
3307 * under-approximate the set of entries that haven't changed, but we
3308 * can't look up CSNs by serverid with the current indexing support.
3309 *
3310 * As a result, dormant serverids in the cluster become mincsns and
3311 * more likely to make syncprov_findcsn(,FIND_CSN,) fail -> triggering
3312 * an expensive refresh...
3313 */
3314 if ( !do_present ) {
3315 gotstate = 1;
3316 } else if ( syncprov_findcsn( op, FIND_CSN, &mincsn ) != LDAP_SUCCESS ) {
3317 /* No, so a reload is required */
3318 /* the 2.2 consumer doesn't send this hint */
3319 if ( si->si_usehint && srs->sr_rhint == 0 ) {
3320 if ( ctxcsn )
3321 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
3322 if ( sids )
3323 op->o_tmpfree( sids, op->o_tmpmemctx );
3324 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED;
3325 rs->sr_text = "sync cookie is stale";
3326 goto bailout;
3327 }
3328 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3329 "failed to find entry with csn=%s, ignoring cookie\n",
3330 op->o_log_prefix, mincsn.bv_val );
3331 if ( srs->sr_state.ctxcsn ) {
3332 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
3333 srs->sr_state.ctxcsn = NULL;
3334 }
3335 if ( srs->sr_state.sids ) {
3336 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx );
3337 srs->sr_state.sids = NULL;
3338 }
3339 srs->sr_state.numcsns = 0;
3340 } else {
3341 gotstate = 1;
3342 /* If changed and doing Present lookup, send Present UUIDs */
3343 if ( syncprov_findcsn( op, FIND_PRESENT, 0 ) != LDAP_SUCCESS ) {
3344 if ( ctxcsn )
3345 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
3346 if ( sids )
3347 op->o_tmpfree( sids, op->o_tmpmemctx );
3348 goto bailout;
3349 }
3350 }
3351 } else {
3352 /* The consumer knows nothing, we know nothing. OK. */
3353 if (!numcsns)
3354 goto no_change;
3355 /* No consumer state, assume something has changed */
3356 changed = SS_CHANGED;
3357 }
3358
3359 shortcut:
3360 /* Append CSN range to search filter, save original filter
3361 * for persistent search evaluation
3362 */
3363 if ( sop ) {
3364 ldap_pvt_thread_mutex_lock( &sop->s_mutex );
3365 sop->s_filterstr = op->ors_filterstr;
3366 /* correct the refcount that was set to 2 before */
3367 sop->s_inuse--;
3368 }
3369
3370 /* If something changed, find the changes */
3371 if ( gotstate && ( changed || dirty ) ) {
3372 Filter *fand, *fava;
3373
3374 fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
3375 fand->f_choice = LDAP_FILTER_AND;
3376 fand->f_next = NULL;
3377 fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
3378 fand->f_and = fava;
3379 fava->f_choice = LDAP_FILTER_GE;
3380 fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
3381 fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
3382 #ifdef LDAP_COMP_MATCH
3383 fava->f_ava->aa_cf = NULL;
3384 #endif
3385 ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
3386 fava->f_next = op->ors_filter;
3387 op->ors_filter = fand;
3388 filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
3389 if ( sop ) {
3390 sop->s_flags |= PS_FIX_FILTER;
3391 }
3392 }
3393 if ( sop ) {
3394 ldap_pvt_thread_mutex_unlock( &sop->s_mutex );
3395 }
3396
3397 /* Let our callback add needed info to returned entries */
3398 cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx);
3399 ss = (searchstate *)(cb+1);
3400 ss->ss_on = on;
3401 ss->ss_so = sop;
3402 ss->ss_flags = do_present | changed;
3403 ss->ss_ctxcsn = ctxcsn;
3404 ss->ss_numcsns = numcsns;
3405 ss->ss_sids = sids;
3406 cb->sc_response = syncprov_search_response;
3407 cb->sc_private = ss;
3408 cb->sc_next = op->o_callback;
3409 op->o_callback = cb;
3410
3411 /* If this is a persistent search and no changes were reported during
3412 * the refresh phase, just invoke the response callback to transition
3413 * us into persist phase
3414 */
3415 if ( !changed && !dirty ) {
3416 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
3417 "nothing changed, finishing up initial search early\n",
3418 op->o_log_prefix );
3419 rs->sr_err = LDAP_SUCCESS;
3420 rs->sr_nentries = 0;
3421 send_ldap_result( op, rs );
3422 return rs->sr_err;
3423 }
3424 return SLAP_CB_CONTINUE;
3425 }
3426
3427 static int
syncprov_operational(Operation * op,SlapReply * rs)3428 syncprov_operational(
3429 Operation *op,
3430 SlapReply *rs )
3431 {
3432 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
3433 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3434
3435 /* This prevents generating unnecessarily; frontend will strip
3436 * any statically stored copy.
3437 */
3438 if ( op->o_sync != SLAP_CONTROL_NONE )
3439 return SLAP_CB_CONTINUE;
3440
3441 if ( rs->sr_entry &&
3442 dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) {
3443
3444 if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
3445 ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
3446 Attribute *a, **ap = NULL;
3447
3448 for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
3449 if ( a->a_desc == slap_schema.si_ad_contextCSN )
3450 break;
3451 }
3452
3453 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
3454 if ( si->si_ctxcsn ) {
3455 if ( !a ) {
3456 for ( ap = &rs->sr_operational_attrs; *ap;
3457 ap=&(*ap)->a_next );
3458
3459 a = attr_alloc( slap_schema.si_ad_contextCSN );
3460 *ap = a;
3461 }
3462
3463 if ( !ap ) {
3464 if ( rs_entry2modifiable( op, rs, on )) {
3465 a = attr_find( rs->sr_entry->e_attrs,
3466 slap_schema.si_ad_contextCSN );
3467 }
3468 if ( a->a_nvals != a->a_vals ) {
3469 ber_bvarray_free( a->a_nvals );
3470 }
3471 a->a_nvals = NULL;
3472 ber_bvarray_free( a->a_vals );
3473 a->a_vals = NULL;
3474 a->a_numvals = 0;
3475 }
3476 attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns );
3477 }
3478 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
3479 }
3480 }
3481 return SLAP_CB_CONTINUE;
3482 }
3483
3484 static int
syncprov_setup_accesslog(void)3485 syncprov_setup_accesslog(void)
3486 {
3487 const char *text;
3488 int rc = -1;
3489
3490 if ( !ad_reqType ) {
3491 if ( slap_str2ad( "reqType", &ad_reqType, &text ) ) {
3492 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3493 "couldn't get definition for attribute reqType, "
3494 "is accessslog configured?\n" );
3495 return rc;
3496 }
3497 }
3498
3499 if ( !ad_reqResult ) {
3500 if ( slap_str2ad( "reqResult", &ad_reqResult, &text ) ) {
3501 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3502 "couldn't get definition for attribute reqResult, "
3503 "is accessslog configured?\n" );
3504 return rc;
3505 }
3506 }
3507
3508 if ( !ad_reqDN ) {
3509 if ( slap_str2ad( "reqDN", &ad_reqDN, &text ) ) {
3510 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3511 "couldn't get definition for attribute reqDN, "
3512 "is accessslog configured?\n" );
3513 return rc;
3514 }
3515 }
3516
3517 if ( !ad_reqEntryUUID ) {
3518 if ( slap_str2ad( "reqEntryUUID", &ad_reqEntryUUID, &text ) ) {
3519 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3520 "couldn't get definition for attribute reqEntryUUID, "
3521 "is accessslog configured?\n" );
3522 return rc;
3523 }
3524 }
3525
3526 if ( !ad_reqNewDN ) {
3527 if ( slap_str2ad( "reqNewDN", &ad_reqNewDN, &text ) ) {
3528 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3529 "couldn't get definition for attribute reqNewDN, "
3530 "is accessslog configured?\n" );
3531 return rc;
3532 }
3533 }
3534
3535 if ( !ad_minCSN ) {
3536 if ( slap_str2ad( "minCSN", &ad_minCSN, &text ) ) {
3537 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: "
3538 "couldn't get definition for attribute minCSN, "
3539 "is accessslog configured?\n" );
3540 return rc;
3541 }
3542 }
3543
3544 return LDAP_SUCCESS;
3545 }
3546
3547 enum {
3548 SP_CHKPT = 1,
3549 SP_SESSL,
3550 SP_NOPRES,
3551 SP_USEHINT,
3552 SP_LOGDB
3553 };
3554
3555 static ConfigDriver sp_cf_gen;
3556
3557 static ConfigTable spcfg[] = {
3558 { "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT,
3559 sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' "
3560 "DESC 'ContextCSN checkpoint interval in ops and minutes' "
3561 "EQUALITY caseIgnoreMatch "
3562 "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL },
3563 { "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL,
3564 sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' "
3565 "DESC 'Session log size in ops' "
3566 "EQUALITY integerMatch "
3567 "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
3568 { "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES,
3569 sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' "
3570 "DESC 'Omit Present phase processing' "
3571 "EQUALITY booleanMatch "
3572 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
3573 { "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT,
3574 sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' "
3575 "DESC 'Observe Reload Hint in Request control' "
3576 "EQUALITY booleanMatch "
3577 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
3578 { "syncprov-sessionlog-source", NULL, 2, 2, 0, ARG_DN|ARG_QUOTE|ARG_MAGIC|SP_LOGDB,
3579 sp_cf_gen, "( OLcfgOvAt:1.5 NAME 'olcSpSessionlogSource' "
3580 "DESC 'On startup, try loading sessionlog from this subtree' "
3581 "SYNTAX OMsDN SINGLE-VALUE )", NULL, NULL },
3582 { NULL, NULL, 0, 0, 0, ARG_IGNORED }
3583 };
3584
3585 static ConfigOCs spocs[] = {
3586 { "( OLcfgOvOc:1.1 "
3587 "NAME 'olcSyncProvConfig' "
3588 "DESC 'SyncRepl Provider configuration' "
3589 "SUP olcOverlayConfig "
3590 "MAY ( olcSpCheckpoint "
3591 "$ olcSpSessionlog "
3592 "$ olcSpNoPresent "
3593 "$ olcSpReloadHint "
3594 "$ olcSpSessionlogSource "
3595 ") )",
3596 Cft_Overlay, spcfg },
3597 { NULL, 0, NULL }
3598 };
3599
3600 static int
sp_cf_gen(ConfigArgs * c)3601 sp_cf_gen(ConfigArgs *c)
3602 {
3603 slap_overinst *on = (slap_overinst *)c->bi;
3604 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3605 int rc = 0;
3606
3607 if ( c->op == SLAP_CONFIG_EMIT ) {
3608 switch ( c->type ) {
3609 case SP_CHKPT:
3610 if ( si->si_chkops || si->si_chktime ) {
3611 struct berval bv;
3612 /* we assume si_chktime is a multiple of 60
3613 * because the parsed value was originally
3614 * multiplied by 60 */
3615 bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ),
3616 "%d %d", si->si_chkops, si->si_chktime/60 );
3617 if ( bv.bv_len >= sizeof( c->cr_msg ) ) {
3618 rc = 1;
3619 } else {
3620 bv.bv_val = c->cr_msg;
3621 value_add_one( &c->rvalue_vals, &bv );
3622 }
3623 } else {
3624 rc = 1;
3625 }
3626 break;
3627 case SP_SESSL:
3628 if ( si->si_logs ) {
3629 c->value_int = si->si_logs->sl_size;
3630 } else {
3631 rc = 1;
3632 }
3633 break;
3634 case SP_NOPRES:
3635 if ( si->si_nopres ) {
3636 c->value_int = 1;
3637 } else {
3638 rc = 1;
3639 }
3640 break;
3641 case SP_USEHINT:
3642 if ( si->si_usehint ) {
3643 c->value_int = 1;
3644 } else {
3645 rc = 1;
3646 }
3647 break;
3648 case SP_LOGDB:
3649 if ( BER_BVISEMPTY( &si->si_logbase ) ) {
3650 rc = 1;
3651 } else {
3652 value_add_one( &c->rvalue_vals, &si->si_logbase );
3653 value_add_one( &c->rvalue_nvals, &si->si_logbase );
3654 }
3655 break;
3656 }
3657 return rc;
3658 } else if ( c->op == LDAP_MOD_DELETE ) {
3659 switch ( c->type ) {
3660 case SP_CHKPT:
3661 si->si_chkops = 0;
3662 si->si_chktime = 0;
3663 break;
3664 case SP_SESSL:
3665 if ( si->si_logs )
3666 si->si_logs->sl_size = 0;
3667 break;
3668 case SP_NOPRES:
3669 si->si_nopres = 0;
3670 break;
3671 case SP_USEHINT:
3672 si->si_usehint = 0;
3673 break;
3674 case SP_LOGDB:
3675 if ( !BER_BVISNULL( &si->si_logbase ) ) {
3676 ch_free( si->si_logbase.bv_val );
3677 BER_BVZERO( &si->si_logbase );
3678 }
3679 break;
3680 }
3681 return rc;
3682 }
3683 switch ( c->type ) {
3684 case SP_CHKPT:
3685 if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) {
3686 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"",
3687 c->argv[0], c->argv[1] );
3688 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3689 "%s: %s\n", c->log, c->cr_msg );
3690 return ARG_BAD_CONF;
3691 }
3692 if ( si->si_chkops <= 0 ) {
3693 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"",
3694 c->argv[0], si->si_chkops );
3695 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3696 "%s: %s\n", c->log, c->cr_msg );
3697 return ARG_BAD_CONF;
3698 }
3699 if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) {
3700 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"",
3701 c->argv[0], c->argv[1] );
3702 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3703 "%s: %s\n", c->log, c->cr_msg );
3704 return ARG_BAD_CONF;
3705 }
3706 if ( si->si_chktime <= 0 ) {
3707 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"",
3708 c->argv[0], si->si_chkops );
3709 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3710 "%s: %s\n", c->log, c->cr_msg );
3711 return ARG_BAD_CONF;
3712 }
3713 si->si_chktime *= 60;
3714 break;
3715 case SP_SESSL: {
3716 sessionlog *sl;
3717 int size = c->value_int;
3718
3719 if ( size < 0 ) {
3720 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative",
3721 c->argv[0], size );
3722 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3723 "%s: %s\n", c->log, c->cr_msg );
3724 return ARG_BAD_CONF;
3725 }
3726 if ( size && !BER_BVISNULL( &si->si_logbase ) ) {
3727 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring "
3728 "internal sessionlog, accesslog source has already been "
3729 "configured, this results in wasteful operation\n" );
3730 }
3731 sl = si->si_logs;
3732 if ( !sl ) {
3733 if ( !size ) break;
3734 sl = ch_calloc( 1, sizeof( sessionlog ));
3735 ldap_pvt_thread_rdwr_init( &sl->sl_mutex );
3736 si->si_logs = sl;
3737 }
3738 sl->sl_size = size;
3739 }
3740 break;
3741 case SP_NOPRES:
3742 si->si_nopres = c->value_int;
3743 break;
3744 case SP_USEHINT:
3745 si->si_usehint = c->value_int;
3746 break;
3747 case SP_LOGDB:
3748 if ( si->si_logs ) {
3749 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring "
3750 "accesslog source, internal sessionlog has already been "
3751 "configured, this results in wasteful operation\n" );
3752 }
3753 if ( CONFIG_ONLINE_ADD( c ) ) {
3754 if ( !select_backend( &c->value_ndn, 0 ) ) {
3755 snprintf( c->cr_msg, sizeof( c->cr_msg ),
3756 "<%s> no matching backend found for suffix",
3757 c->argv[0] );
3758 Debug( LDAP_DEBUG_ANY, "%s: %s \"%s\"\n",
3759 c->log, c->cr_msg, c->value_dn.bv_val );
3760 rc = 1;
3761 break;
3762 }
3763 ch_free( c->value_ndn.bv_val );
3764 }
3765 si->si_logbase = c->value_ndn;
3766 rc = syncprov_setup_accesslog();
3767 ch_free( c->value_dn.bv_val );
3768 break;
3769 }
3770 return rc;
3771 }
3772
3773 /* ITS#3456 we cannot run this search on the main thread, must use a
3774 * child thread in order to insure we have a big enough stack.
3775 */
3776 static void *
syncprov_db_otask(void * ptr)3777 syncprov_db_otask(
3778 void *ptr
3779 )
3780 {
3781 syncprov_findcsn( ptr, FIND_MAXCSN, 0 );
3782 return NULL;
3783 }
3784
3785 static int
syncprov_db_ocallback(Operation * op,SlapReply * rs)3786 syncprov_db_ocallback(
3787 Operation *op,
3788 SlapReply *rs
3789 )
3790 {
3791 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
3792 if ( rs->sr_entry->e_name.bv_len )
3793 op->o_callback->sc_private = (void *)1;
3794 }
3795 return LDAP_SUCCESS;
3796 }
3797
3798 /* ITS#9015 see if the DB is really empty */
3799 static void *
syncprov_db_otask2(void * ptr)3800 syncprov_db_otask2(
3801 void *ptr
3802 )
3803 {
3804 Operation *op = ptr;
3805 SlapReply rs = {REP_RESULT};
3806 slap_callback cb = {0};
3807 int rc;
3808
3809 cb.sc_response = syncprov_db_ocallback;
3810
3811 op->o_managedsait = SLAP_CONTROL_CRITICAL;
3812 op->o_callback = &cb;
3813 op->o_tag = LDAP_REQ_SEARCH;
3814 op->ors_scope = LDAP_SCOPE_SUBTREE;
3815 op->ors_limit = NULL;
3816 op->ors_slimit = 1;
3817 op->ors_tlimit = SLAP_NO_LIMIT;
3818 op->ors_attrs = slap_anlist_no_attrs;
3819 op->ors_attrsonly = 1;
3820 op->ors_deref = LDAP_DEREF_NEVER;
3821 op->ors_filter = &generic_filter;
3822 op->ors_filterstr = generic_filterstr;
3823 rc = op->o_bd->be_search( op, &rs );
3824 if ( rc == LDAP_SIZELIMIT_EXCEEDED || cb.sc_private )
3825 op->ors_slimit = 2;
3826 return NULL;
3827 }
3828
3829 /* Read any existing contextCSN from the underlying db.
3830 * Then search for any entries newer than that. If no value exists,
3831 * just generate it. Cache whatever result.
3832 */
3833 static int
syncprov_db_open(BackendDB * be,ConfigReply * cr)3834 syncprov_db_open(
3835 BackendDB *be,
3836 ConfigReply *cr
3837 )
3838 {
3839 slap_overinst *on = (slap_overinst *) be->bd_info;
3840 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3841
3842 Connection conn = { 0 };
3843 OperationBuffer opbuf;
3844 Operation *op;
3845 Entry *e = NULL;
3846 Attribute *a;
3847 int rc;
3848 void *thrctx = NULL;
3849
3850 if ( !SLAP_LASTMOD( be )) {
3851 Debug( LDAP_DEBUG_ANY,
3852 "syncprov_db_open: invalid config, lastmod must be enabled\n" );
3853 return -1;
3854 }
3855
3856 if ( slapMode & SLAP_TOOL_MODE ) {
3857 return 0;
3858 }
3859
3860 rc = overlay_register_control( be, LDAP_CONTROL_SYNC );
3861 if ( rc ) {
3862 return rc;
3863 }
3864
3865 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: "
3866 "starting syncprov for suffix %s\n",
3867 be->be_suffix[0].bv_val );
3868
3869 thrctx = ldap_pvt_thread_pool_context();
3870 connection_fake_init2( &conn, &opbuf, thrctx, 0 );
3871 op = &opbuf.ob_op;
3872 op->o_bd = be;
3873 op->o_dn = be->be_rootdn;
3874 op->o_ndn = be->be_rootndn;
3875
3876 if ( SLAP_SYNC_SUBENTRY( be )) {
3877 build_new_dn( &si->si_contextdn, be->be_nsuffix,
3878 (struct berval *)&slap_ldapsync_cn_bv, NULL );
3879 } else {
3880 si->si_contextdn = be->be_nsuffix[0];
3881 }
3882 rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL,
3883 slap_schema.si_ad_contextCSN, 0, &e, on );
3884
3885 if ( e ) {
3886 ldap_pvt_thread_t tid;
3887
3888 a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
3889 if ( a ) {
3890 ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL );
3891 si->si_numcsns = a->a_numvals;
3892 si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL );
3893 slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL );
3894 }
3895 overlay_entry_release_ov( op, e, 0, on );
3896 if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) {
3897 op->o_tag = LDAP_REQ_SEARCH;
3898 op->o_req_dn = be->be_suffix[0];
3899 op->o_req_ndn = be->be_nsuffix[0];
3900 op->ors_scope = LDAP_SCOPE_SUBTREE;
3901 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
3902 ldap_pvt_thread_join( tid, NULL );
3903 }
3904 }
3905
3906 /* Didn't find a contextCSN, should we generate one? */
3907 if ( !si->si_ctxcsn ) {
3908 char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ];
3909 struct berval csn;
3910
3911 if ( SLAP_SINGLE_SHADOW( op->o_bd ) ) {
3912 /* Not in charge of this serverID, don't generate anything. */
3913 goto out;
3914 }
3915 if ( !SLAP_SYNC_SUBENTRY( be ) && rc != LDAP_SUCCESS
3916 && rc != LDAP_NO_SUCH_ATTRIBUTE ) {
3917 /* If the DB is genuinely empty, don't generate one either. */
3918 goto out;
3919 }
3920 if ( !si->si_contextdn.bv_len ) {
3921 ldap_pvt_thread_t tid;
3922 /* a glue entry here with no contextCSN might mean an empty DB.
3923 * we need to search for children, to be sure.
3924 */
3925 op->o_req_dn = be->be_suffix[0];
3926 op->o_req_ndn = be->be_nsuffix[0];
3927 op->o_bd->bd_info = (BackendInfo *)on->on_info;
3928 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask2, op );
3929 ldap_pvt_thread_join( tid, NULL );
3930 if ( op->ors_slimit == 1 )
3931 goto out;
3932 }
3933
3934 csn.bv_val = csnbuf;
3935 csn.bv_len = sizeof( csnbuf );
3936 slap_get_csn( op, &csn, 0 );
3937 value_add_one( &si->si_ctxcsn, &csn );
3938 si->si_numcsns = 1;
3939 si->si_sids = ch_malloc( sizeof(int) );
3940 si->si_sids[0] = slap_serverID;
3941 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: "
3942 "generated a new ctxcsn=%s for suffix %s\n",
3943 csn.bv_val, be->be_suffix[0].bv_val );
3944
3945 /* make sure we do a checkpoint on close */
3946 si->si_numops++;
3947 }
3948
3949 /* Initialize the sessionlog mincsn */
3950 if ( si->si_logs && si->si_numcsns ) {
3951 sessionlog *sl = si->si_logs;
3952 int i;
3953 ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL );
3954 sl->sl_numcsns = si->si_numcsns;
3955 sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) );
3956 for ( i=0; i < si->si_numcsns; i++ )
3957 sl->sl_sids[i] = si->si_sids[i];
3958 }
3959
3960 if ( !BER_BVISNULL( &si->si_logbase ) ) {
3961 BackendDB *db = select_backend( &si->si_logbase, 0 );
3962 if ( !db ) {
3963 Debug( LDAP_DEBUG_ANY, "syncprov_db_open: "
3964 "configured accesslog database dn='%s' not present\n",
3965 si->si_logbase.bv_val );
3966 return -1;
3967 }
3968 }
3969
3970 out:
3971 op->o_bd->bd_info = (BackendInfo *)on;
3972 return 0;
3973 }
3974
3975 /* Write the current contextCSN into the underlying db.
3976 */
3977 static int
syncprov_db_close(BackendDB * be,ConfigReply * cr)3978 syncprov_db_close(
3979 BackendDB *be,
3980 ConfigReply *cr
3981 )
3982 {
3983 slap_overinst *on = (slap_overinst *) be->bd_info;
3984 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3985 #ifdef SLAP_CONFIG_DELETE
3986 syncops *so, *sonext;
3987 #endif /* SLAP_CONFIG_DELETE */
3988
3989 if ( slapMode & SLAP_TOOL_MODE ) {
3990 return 0;
3991 }
3992 if ( si->si_numops ) {
3993 Connection conn = {0};
3994 OperationBuffer opbuf;
3995 Operation *op;
3996 void *thrctx;
3997
3998 thrctx = ldap_pvt_thread_pool_context();
3999 connection_fake_init2( &conn, &opbuf, thrctx, 0 );
4000 op = &opbuf.ob_op;
4001 op->o_bd = be;
4002 op->o_dn = be->be_rootdn;
4003 op->o_ndn = be->be_rootndn;
4004 syncprov_checkpoint( op, on );
4005 }
4006
4007 #ifdef SLAP_CONFIG_DELETE
4008 if ( !slapd_shutdown ) {
4009 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
4010 for ( so=si->si_ops, sonext=so; so; so=sonext ) {
4011 SlapReply rs = {REP_RESULT};
4012 rs.sr_err = LDAP_UNAVAILABLE;
4013 ldap_pvt_thread_mutex_lock( &so->s_mutex );
4014 send_ldap_result( so->s_op, &rs );
4015 sonext=so->s_next;
4016 if ( so->s_flags & PS_TASK_QUEUED )
4017 ldap_pvt_thread_pool_retract( so->s_pool_cookie );
4018 ldap_pvt_thread_mutex_unlock( &so->s_mutex );
4019 if ( !syncprov_drop_psearch( so, 0 ))
4020 so->s_si = NULL;
4021 }
4022 si->si_ops=NULL;
4023 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
4024 }
4025 overlay_unregister_control( be, LDAP_CONTROL_SYNC );
4026 #endif /* SLAP_CONFIG_DELETE */
4027
4028 return 0;
4029 }
4030
4031 static int
syncprov_db_init(BackendDB * be,ConfigReply * cr)4032 syncprov_db_init(
4033 BackendDB *be,
4034 ConfigReply *cr
4035 )
4036 {
4037 slap_overinst *on = (slap_overinst *)be->bd_info;
4038 syncprov_info_t *si;
4039
4040 if ( SLAP_ISGLOBALOVERLAY( be ) ) {
4041 Debug( LDAP_DEBUG_ANY,
4042 "syncprov must be instantiated within a database.\n" );
4043 return 1;
4044 }
4045
4046 si = ch_calloc(1, sizeof(syncprov_info_t));
4047 on->on_bi.bi_private = si;
4048 ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock );
4049 ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
4050 ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
4051 ldap_pvt_thread_mutex_init( &si->si_resp_mutex );
4052
4053 csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
4054 csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname;
4055 csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID;
4056 csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname;
4057
4058 uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID;
4059 uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname;
4060
4061 return 0;
4062 }
4063
4064 static int
syncprov_db_destroy(BackendDB * be,ConfigReply * cr)4065 syncprov_db_destroy(
4066 BackendDB *be,
4067 ConfigReply *cr
4068 )
4069 {
4070 slap_overinst *on = (slap_overinst *)be->bd_info;
4071 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
4072
4073 if ( si ) {
4074 if ( si->si_logs ) {
4075 sessionlog *sl = si->si_logs;
4076
4077 ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
4078 if ( sl->sl_mincsn )
4079 ber_bvarray_free( sl->sl_mincsn );
4080 if ( sl->sl_sids )
4081 ch_free( sl->sl_sids );
4082
4083 ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex);
4084 ch_free( si->si_logs );
4085 }
4086 if ( si->si_ctxcsn )
4087 ber_bvarray_free( si->si_ctxcsn );
4088 if ( si->si_sids )
4089 ch_free( si->si_sids );
4090 ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex );
4091 ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
4092 ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
4093 ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );
4094 ch_free( si );
4095 }
4096
4097 return 0;
4098 }
4099
/*
 * Parse callback for the LDAP Sync request control.
 *
 * Validates and decodes the control value, then attaches a freshly
 * allocated sync_control (from the operation's temporary memory
 * context) to op->o_controls[] and records the control's criticality
 * and requested mode on the operation.
 *
 * Returns LDAP_SUCCESS when the control was accepted.  On any problem
 * rs->sr_text is set to a diagnostic string and LDAP_PROTOCOL_ERROR is
 * returned; nothing has been allocated or stored on the operation in
 * that case.
 */
static int syncprov_parseCtrl (
	Operation *op,
	SlapReply *rs,
	LDAPControl *ctrl )
{
	BerElementBuffer berbuf;
	BerElement *ber = (BerElement *)&berbuf;
	struct berval cookie = BER_BVNULL;
	sync_control *sr;
	const char *errtext;
	ber_tag_t tag;
	ber_len_t len;
	ber_int_t mode;
	int rhint = 0;

	/* Preconditions that do not depend on the BER contents */
	if ( op->o_sync != SLAP_CONTROL_NONE ) {
		errtext = "Sync control specified multiple times";
		goto protocol_error;
	}
	if ( op->o_pagedresults != SLAP_CONTROL_NONE ) {
		errtext = "Sync control specified with pagedResults control";
		goto protocol_error;
	}
	if ( BER_BVISNULL( &ctrl->ldctl_value ) ) {
		errtext = "Sync control value is absent";
		goto protocol_error;
	}
	if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) {
		errtext = "Sync control value is empty";
		goto protocol_error;
	}

	/* The control value is decoded as
	 *      syncRequestValue ::= SEQUENCE {
	 *          mode   ENUMERATED {
	 *              -- 0 unused
	 *              refreshOnly (1),
	 *              -- 2 reserved
	 *              refreshAndPersist (3)
	 *          },
	 *          cookie  syncCookie OPTIONAL
	 *      }
	 * optionally followed, inside the sequence, by a boolean element
	 * tagged LDAP_TAG_RELOAD_HINT.
	 */
	ber_init2( ber, &ctrl->ldctl_value, 0 );

	if ( ber_scanf( ber, "{i" /*}*/, &mode ) == LBER_ERROR ) {
		errtext = "Sync control : mode decoding error";
		goto protocol_error;
	}

	/* Translate the wire value into slapd's internal mode flag */
	if ( mode == LDAP_SYNC_REFRESH_ONLY ) {
		mode = SLAP_SYNC_REFRESH;
	} else if ( mode == LDAP_SYNC_REFRESH_AND_PERSIST ) {
		mode = SLAP_SYNC_REFRESH_AND_PERSIST;
	} else {
		errtext = "Sync control : unknown update mode";
		goto protocol_error;
	}

	/* Optional elements: cookie first, then the reload hint */
	tag = ber_peek_tag( ber, &len );
	if ( tag == LDAP_TAG_SYNC_COOKIE ) {
		if ( ber_scanf( ber, /*{*/ "m", &cookie ) == LBER_ERROR ) {
			errtext = "Sync control : cookie decoding error";
			goto protocol_error;
		}
		tag = ber_peek_tag( ber, &len );
	}
	if ( tag == LDAP_TAG_RELOAD_HINT &&
		ber_scanf( ber, /*{*/ "b", &rhint ) == LBER_ERROR ) {
		errtext = "Sync control : rhint decoding error";
		goto protocol_error;
	}
	if ( ber_scanf( ber, /*{*/ "}" ) == LBER_ERROR ) {
		errtext = "Sync control : decoding error";
		goto protocol_error;
	}

	/* Decoding succeeded: build the per-operation control state */
	sr = op->o_tmpcalloc( 1, sizeof( *sr ), op->o_tmpmemctx );
	sr->sr_rhint = rhint;

	if ( !BER_BVISNULL( &cookie ) ) {
		int unusable;

		ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );

		/* A cookie that fails to parse, or that parses with rid -1,
		 * is treated as if no cookie had been sent: drop any CSNs
		 * the parser collected. */
		unusable = slap_parse_sync_cookie( &sr->sr_state,
			op->o_tmpmemctx ) != 0;
		if ( !unusable ) {
			unusable = ( sr->sr_state.rid == -1 );
		}
		if ( unusable ) {
			if ( sr->sr_state.ctxcsn ) {
				ber_bvarray_free_x( sr->sr_state.ctxcsn,
					op->o_tmpmemctx );
				sr->sr_state.ctxcsn = NULL;
			}
			sr->sr_state.numcsns = 0;
		}
	}

	op->o_controls[slap_cids.sc_LDAPsync] = sr;

	if ( ctrl->ldctl_iscritical ) {
		op->o_sync = SLAP_CONTROL_CRITICAL;
	} else {
		op->o_sync = SLAP_CONTROL_NONCRITICAL;
	}

	/* o_sync_mode shares o_sync, so the mode bits are OR-ed in only
	 * after the criticality has been stored above */
	op->o_sync_mode |= mode;

	return LDAP_SUCCESS;

protocol_error:
	rs->sr_text = errtext;
	return LDAP_PROTOCOL_ERROR;
}
4209
4210 /* This overlay is set up for dynamic loading via moduleload. For static
4211 * configuration, you'll need to arrange for the slap_overinst to be
4212 * initialized and registered by some other function inside slapd.
4213 */
4214
4215 static slap_overinst syncprov;
4216
4217 int
syncprov_initialize()4218 syncprov_initialize()
4219 {
4220 int rc;
4221
4222 rc = register_supported_control( LDAP_CONTROL_SYNC,
4223 SLAP_CTRL_SEARCH, NULL,
4224 syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
4225 if ( rc != LDAP_SUCCESS ) {
4226 Debug( LDAP_DEBUG_ANY,
4227 "syncprov_init: Failed to register control %d\n", rc );
4228 return rc;
4229 }
4230
4231 syncprov.on_bi.bi_type = "syncprov";
4232 syncprov.on_bi.bi_flags = SLAPO_BFLAG_SINGLE;
4233 syncprov.on_bi.bi_db_init = syncprov_db_init;
4234 syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
4235 syncprov.on_bi.bi_db_open = syncprov_db_open;
4236 syncprov.on_bi.bi_db_close = syncprov_db_close;
4237
4238 syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
4239 syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;
4240
4241 syncprov.on_bi.bi_op_add = syncprov_op_mod;
4242 syncprov.on_bi.bi_op_compare = syncprov_op_compare;
4243 syncprov.on_bi.bi_op_delete = syncprov_op_mod;
4244 syncprov.on_bi.bi_op_modify = syncprov_op_mod;
4245 syncprov.on_bi.bi_op_modrdn = syncprov_op_mod;
4246 syncprov.on_bi.bi_op_search = syncprov_op_search;
4247 syncprov.on_bi.bi_extended = syncprov_op_extended;
4248 syncprov.on_bi.bi_operational = syncprov_operational;
4249
4250 syncprov.on_bi.bi_cf_ocs = spocs;
4251
4252 generic_filter.f_desc = slap_schema.si_ad_objectClass;
4253
4254 rc = config_register_schema( spcfg, spocs );
4255 if ( rc ) return rc;
4256
4257 return overlay_register( &syncprov );
4258 }
4259
4260 #if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC
/*
 * Module entry point, compiled only when the overlay is built as a
 * dynamic module.  It forwards to syncprov_initialize() and returns
 * that function's result; argc and argv are not used.
 */
int
init_module( int argc, char *argv[] )
{
	int status = syncprov_initialize();

	return status;
}
4266 #endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */
4267
4268 #endif /* defined(SLAPD_OVER_SYNCPROV) */
4269