1 /*
2    ldb database library
3 
4    Copyright (C) Andrew Tridgell  2004-2009
5 
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9 
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14 
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19 
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23 
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33 
34 /*
35 
36 LDB Index design and choice of key:
37 =======================================
38 
39 LDB has index records held as LDB objects with a special record like:
40 
41 dn: @INDEX:attr:value
42 
43 value may be base64 encoded, if it is deemed not printable:
44 
45 dn: @INDEX:attr::base64-value
46 
47 In each record, there is two possible formats:
48 
49 The original format is:
50 -----------------------
51 
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55 
56 In this format, @IDX is multi-valued, one entry for each match
57 
58 The corrosponding entry is stored in a TDB record with key:
59 
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61 
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64 
65 The original mixed-case DN is stored in the entry iself.
66 
67 
68 The new 'GUID index' format is:
69 -------------------------------
70 
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74 
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78 
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81 
82 The corrosponding entry is stored in a TDB record with key:
83 
84 GUID=<binary GUID>
85 
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90 
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94 
95 Exception for special @ DNs:
96 
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100 
101 
102 Control points for choice of index mode
103 ---------------------------------------
104 
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107 
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111 
112 By default, the original DN format is used.
113 
114 
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117 
118 @IDXATTR controls if an attribute is indexed
119 
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123 
124 
125 C Override functions
126 --------------------
127 
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131 
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134 
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140 
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144 
145 */
146 
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151 #include "lib/util/attr.h"
152 
153 struct dn_list {
154 	unsigned int count;
155 	struct ldb_val *dn;
156 	/*
157 	 * Do not optimise the intersection of this list,
158 	 * we must never return an entry not in this
159 	 * list.  This allows the index for
160 	 * SCOPE_ONELEVEL to be trusted.
161 	 */
162 	bool strict;
163 };
164 
165 struct ldb_kv_idxptr {
166 	/*
167 	 * In memory tdb to cache the index updates performed during a
168 	 * transaction.  This improves the performance of operations like
169 	 * re-index and join
170 	 */
171 	struct tdb_context *itdb;
172 	int error;
173 };
174 
175 enum key_truncation {
176 	KEY_NOT_TRUNCATED,
177 	KEY_TRUNCATED,
178 };
179 
180 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
181 				      const struct ldb_message *msg,
182 				      int add);
183 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
184 				   struct ldb_kv_private *ldb_kv,
185 				   struct ldb_dn *base_dn,
186 				   struct dn_list *dn_list,
187 				   enum key_truncation *truncation);
188 
189 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
190 				struct dn_list *list);
191 
192 /* we put a @IDXVERSION attribute on index entries. This
193    allows us to tell if it was written by an older version
194 */
195 #define LDB_KV_INDEXING_VERSION 2
196 
197 #define LDB_KV_GUID_INDEXING_VERSION 3
198 
ldb_kv_max_key_length(struct ldb_kv_private * ldb_kv)199 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
200 {
201 	if (ldb_kv->max_key_length == 0) {
202 		return UINT_MAX;
203 	}
204 	return ldb_kv->max_key_length;
205 }
206 
207 /* enable the idxptr mode when transactions start */
ldb_kv_index_transaction_start(struct ldb_module * module,size_t cache_size)208 int ldb_kv_index_transaction_start(
209 	struct ldb_module *module,
210 	size_t cache_size)
211 {
212 	struct ldb_kv_private *ldb_kv = talloc_get_type(
213 	    ldb_module_get_private(module), struct ldb_kv_private);
214 	ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
215 	if (ldb_kv->idxptr == NULL) {
216 		return ldb_oom(ldb_module_get_ctx(module));
217 	}
218 
219 	ldb_kv->idxptr->itdb = tdb_open(
220 		NULL,
221 		cache_size,
222 		TDB_INTERNAL,
223 		O_RDWR,
224 		0);
225 	if (ldb_kv->idxptr->itdb == NULL) {
226 		return LDB_ERR_OPERATIONS_ERROR;
227 	}
228 
229 	return LDB_SUCCESS;
230 }
231 
232 /*
233   see if two ldb_val structures contain exactly the same data
234   return -1 or 1 for a mismatch, 0 for match
235 */
ldb_val_equal_exact_for_qsort(const struct ldb_val * v1,const struct ldb_val * v2)236 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
237 					 const struct ldb_val *v2)
238 {
239 	if (v1->length > v2->length) {
240 		return -1;
241 	}
242 	if (v1->length < v2->length) {
243 		return 1;
244 	}
245 	return memcmp(v1->data, v2->data, v1->length);
246 }
247 
248 /*
249   see if two ldb_val structures contain exactly the same data
250   return -1 or 1 for a mismatch, 0 for match
251 */
ldb_val_equal_exact_ordered(const struct ldb_val v1,const struct ldb_val * v2)252 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
253 				       const struct ldb_val *v2)
254 {
255 	if (v1.length > v2->length) {
256 		return -1;
257 	}
258 	if (v1.length < v2->length) {
259 		return 1;
260 	}
261 	return memcmp(v1.data, v2->data, v1.length);
262 }
263 
264 
265 /*
266   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
267   binary-safe comparison for the 'dn' returns -1 if not found
268 
269   This is therefore safe when the value is a GUID in the future
270  */
ldb_kv_dn_list_find_val(struct ldb_kv_private * ldb_kv,const struct dn_list * list,const struct ldb_val * v)271 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
272 				   const struct dn_list *list,
273 				   const struct ldb_val *v)
274 {
275 	unsigned int i;
276 	struct ldb_val *exact = NULL, *next = NULL;
277 
278 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
279 		for (i=0; i<list->count; i++) {
280 			if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
281 				return i;
282 			}
283 		}
284 		return -1;
285 	}
286 
287 	BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
288 				*v, ldb_val_equal_exact_ordered,
289 				exact, next);
290 	if (exact == NULL) {
291 		return -1;
292 	}
293 	/* Not required, but keeps the compiler quiet */
294 	if (next != NULL) {
295 		return -1;
296 	}
297 
298 	i = exact - list->dn;
299 	return i;
300 }
301 
302 /*
303   find a entry in a dn_list. Uses a case sensitive comparison with the dn
304   returns -1 if not found
305  */
ldb_kv_dn_list_find_msg(struct ldb_kv_private * ldb_kv,struct dn_list * list,const struct ldb_message * msg)306 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
307 				   struct dn_list *list,
308 				   const struct ldb_message *msg)
309 {
310 	struct ldb_val v;
311 	const struct ldb_val *key_val;
312 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
313 		const char *dn_str = ldb_dn_get_linearized(msg->dn);
314 		v.data = discard_const_p(unsigned char, dn_str);
315 		v.length = strlen(dn_str);
316 	} else {
317 		key_val = ldb_msg_find_ldb_val(
318 		    msg, ldb_kv->cache->GUID_index_attribute);
319 		if (key_val == NULL) {
320 			return -1;
321 		}
322 		v = *key_val;
323 	}
324 	return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
325 }
326 
327 /*
328   this is effectively a cast function, but with lots of paranoia
329   checks and also copes with CPUs that are fussy about pointer
330   alignment
331  */
ldb_kv_index_idxptr(struct ldb_module * module,TDB_DATA rec)332 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
333 					   TDB_DATA rec)
334 {
335 	struct dn_list *list;
336 	if (rec.dsize != sizeof(void *)) {
337 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
338 				       "Bad data size for idxptr %u", (unsigned)rec.dsize);
339 		return NULL;
340 	}
341 	/* note that we can't just use a cast here, as rec.dptr may
342 	   not be aligned sufficiently for a pointer. A cast would cause
343 	   platforms like some ARM CPUs to crash */
344 	memcpy(&list, rec.dptr, sizeof(void *));
345 	list = talloc_get_type(list, struct dn_list);
346 	if (list == NULL) {
347 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
348 				       "Bad type '%s' for idxptr",
349 				       talloc_get_name(list));
350 		return NULL;
351 	}
352 	return list;
353 }
354 
355 enum dn_list_will_be_read_only {
356 	DN_LIST_MUTABLE = 0,
357 	DN_LIST_WILL_BE_READ_ONLY = 1,
358 };
359 
360 /*
361   return the @IDX list in an index entry for a dn as a
362   struct dn_list
363  */
ldb_kv_dn_list_load(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_dn * dn,struct dn_list * list,enum dn_list_will_be_read_only read_only)364 static int ldb_kv_dn_list_load(struct ldb_module *module,
365 			       struct ldb_kv_private *ldb_kv,
366 			       struct ldb_dn *dn,
367 			       struct dn_list *list,
368 			       enum dn_list_will_be_read_only read_only)
369 {
370 	struct ldb_message *msg;
371 	int ret, version;
372 	struct ldb_message_element *el;
373 	TDB_DATA rec = {0};
374 	struct dn_list *list2;
375 	bool from_primary_cache = false;
376 	TDB_DATA key = {0};
377 
378 	list->dn = NULL;
379 	list->count = 0;
380 	list->strict = false;
381 
382 	/*
383 	 * See if we have an in memory index cache
384 	 */
385 	if (ldb_kv->idxptr == NULL) {
386 		goto normal_index;
387 	}
388 
389 	key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
390 	key.dsize = strlen((char *)key.dptr);
391 
392 	/*
393 	 * Have we cached this index record?
394 	 * If we have a nested transaction cache try that first.
395 	 * then try the transaction cache.
396 	 * if the record is not cached it will need to be read from disk.
397 	 */
398 	if (ldb_kv->nested_idx_ptr != NULL) {
399 		rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
400 	}
401 	if (rec.dptr == NULL) {
402 		from_primary_cache = true;
403 		rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
404 	}
405 	if (rec.dptr == NULL) {
406 		goto normal_index;
407 	}
408 
409 	/* we've found an in-memory index entry */
410 	list2 = ldb_kv_index_idxptr(module, rec);
411 	if (list2 == NULL) {
412 		free(rec.dptr);
413 		return LDB_ERR_OPERATIONS_ERROR;
414 	}
415 	free(rec.dptr);
416 
417 	/*
418 	 * If this is a read only transaction the indexes will not be
419 	 * changed so we don't need a copy in the event of a rollback
420 	 *
421 	 * In this case make an early return
422 	 */
423 	if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
424 		*list = *list2;
425 		return LDB_SUCCESS;
426 	}
427 
428 	/*
429 	 * record was read from the sub transaction cache, so we have
430 	 * already copied the primary cache record
431 	 */
432 	if (!from_primary_cache) {
433 		*list = *list2;
434 		return LDB_SUCCESS;
435 	}
436 
437 	/*
438 	 * No index sub transaction active, so no need to cache a copy
439 	 */
440 	if (ldb_kv->nested_idx_ptr == NULL) {
441 		*list = *list2;
442 		return LDB_SUCCESS;
443 	}
444 
445 	/*
446 	 * There is an active index sub transaction, and the record was
447 	 * found in the primary index transaction cache.  A copy of the
448 	 * record needs be taken to prevent the original entry being
449 	 * altered, until the index sub transaction is committed.
450 	 */
451 
452 	{
453 		struct ldb_val *dns = NULL;
454 		size_t x = 0;
455 
456 		dns = talloc_array(
457 			list,
458 			struct ldb_val,
459 			list2->count);
460 		if (dns == NULL) {
461 			return LDB_ERR_OPERATIONS_ERROR;
462 		}
463 		for (x = 0; x < list2->count; x++) {
464 			dns[x].length = list2->dn[x].length;
465 			dns[x].data = talloc_memdup(
466 				dns,
467 				list2->dn[x].data,
468 				list2->dn[x].length);
469 			if (dns[x].data == NULL) {
470 				TALLOC_FREE(dns);
471 				return LDB_ERR_OPERATIONS_ERROR;
472 			}
473 		}
474 		list->dn = dns;
475 		list->count = list2->count;
476 	}
477 	return LDB_SUCCESS;
478 
479 	/*
480 	 * Index record not found in the caches, read it from the
481 	 * database.
482 	 */
483 normal_index:
484 	msg = ldb_msg_new(list);
485 	if (msg == NULL) {
486 		return LDB_ERR_OPERATIONS_ERROR;
487 	}
488 
489 	ret = ldb_kv_search_dn1(module,
490 				dn,
491 				msg,
492 				LDB_UNPACK_DATA_FLAG_NO_DN |
493 				/*
494 				 * The entry point ldb_kv_search_indexed is
495 				 * only called from the read-locked
496 				 * ldb_kv_search.
497 				 */
498 				LDB_UNPACK_DATA_FLAG_READ_LOCKED);
499 	if (ret != LDB_SUCCESS) {
500 		talloc_free(msg);
501 		return ret;
502 	}
503 
504 	el = ldb_msg_find_element(msg, LDB_KV_IDX);
505 	if (!el) {
506 		talloc_free(msg);
507 		return LDB_SUCCESS;
508 	}
509 
510 	version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
511 
512 	/*
513 	 * we avoid copying the strings by stealing the list.  We have
514 	 * to steal msg onto el->values (which looks odd) because
515 	 * the memory is allocated on msg, not on each value.
516 	 */
517 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
518 		/* check indexing version number */
519 		if (version != LDB_KV_INDEXING_VERSION) {
520 			ldb_debug_set(ldb_module_get_ctx(module),
521 				      LDB_DEBUG_ERROR,
522 				      "Wrong DN index version %d "
523 				      "expected %d for %s",
524 				      version, LDB_KV_INDEXING_VERSION,
525 				      ldb_dn_get_linearized(dn));
526 			talloc_free(msg);
527 			return LDB_ERR_OPERATIONS_ERROR;
528 		}
529 
530 		talloc_steal(el->values, msg);
531 		list->dn = talloc_steal(list, el->values);
532 		list->count = el->num_values;
533 	} else {
534 		unsigned int i;
535 		if (version != LDB_KV_GUID_INDEXING_VERSION) {
536 			/* This is quite likely during the DB startup
537 			   on first upgrade to using a GUID index */
538 			ldb_debug_set(ldb_module_get_ctx(module),
539 				      LDB_DEBUG_ERROR,
540 				      "Wrong GUID index version %d "
541 				      "expected %d for %s",
542 				      version, LDB_KV_GUID_INDEXING_VERSION,
543 				      ldb_dn_get_linearized(dn));
544 			talloc_free(msg);
545 			return LDB_ERR_OPERATIONS_ERROR;
546 		}
547 
548 		if (el->num_values == 0) {
549 			talloc_free(msg);
550 			return LDB_ERR_OPERATIONS_ERROR;
551 		}
552 
553 		if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
554 			talloc_free(msg);
555 			return LDB_ERR_OPERATIONS_ERROR;
556 		}
557 
558 		list->count = el->values[0].length / LDB_KV_GUID_SIZE;
559 		list->dn = talloc_array(list, struct ldb_val, list->count);
560 		if (list->dn == NULL) {
561 			talloc_free(msg);
562 			return LDB_ERR_OPERATIONS_ERROR;
563 		}
564 
565 		/*
566 		 * The actual data is on msg.
567 		 */
568 		talloc_steal(list->dn, msg);
569 		for (i = 0; i < list->count; i++) {
570 			list->dn[i].data
571 				= &el->values[0].data[i * LDB_KV_GUID_SIZE];
572 			list->dn[i].length = LDB_KV_GUID_SIZE;
573 		}
574 	}
575 
576 	/* We don't need msg->elements any more */
577 	talloc_free(msg->elements);
578 	return LDB_SUCCESS;
579 }
580 
ldb_kv_key_dn_from_idx(struct ldb_module * module,struct ldb_kv_private * ldb_kv,TALLOC_CTX * mem_ctx,struct ldb_dn * dn,struct ldb_val * ldb_key)581 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
582 			   struct ldb_kv_private *ldb_kv,
583 			   TALLOC_CTX *mem_ctx,
584 			   struct ldb_dn *dn,
585 			   struct ldb_val *ldb_key)
586 {
587 	struct ldb_context *ldb = ldb_module_get_ctx(module);
588 	int ret;
589 	int index = 0;
590 	enum key_truncation truncation = KEY_NOT_TRUNCATED;
591 	struct dn_list *list = talloc(mem_ctx, struct dn_list);
592 	if (list == NULL) {
593 		ldb_oom(ldb);
594 		return LDB_ERR_OPERATIONS_ERROR;
595 	}
596 
597 	ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
598 	if (ret != LDB_SUCCESS) {
599 		TALLOC_FREE(list);
600 		return ret;
601 	}
602 
603 	if (list->count == 0) {
604 		TALLOC_FREE(list);
605 		return LDB_ERR_NO_SUCH_OBJECT;
606 	}
607 
608 	if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
609 		const char *dn_str = ldb_dn_get_linearized(dn);
610 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
611 				       __location__
612 				       ": Failed to read DN index "
613 				       "against %s for %s: too many "
614 				       "values (%u > 1)",
615 				       ldb_kv->cache->GUID_index_attribute,
616 				       dn_str,
617 				       list->count);
618 		TALLOC_FREE(list);
619 		return LDB_ERR_CONSTRAINT_VIOLATION;
620 	}
621 
622 	if (list->count > 0 && truncation == KEY_TRUNCATED)  {
623 		/*
624 		 * DN key has been truncated, need to inspect the actual
625 		 * records to locate the actual DN
626 		 */
627 		unsigned int i;
628 		index = -1;
629 		for (i=0; i < list->count; i++) {
630 			uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
631 			struct ldb_val key = {
632 				.data = guid_key,
633 				.length = sizeof(guid_key)
634 			};
635 			const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
636 			struct ldb_message *rec = ldb_msg_new(ldb);
637 			if (rec == NULL) {
638 				TALLOC_FREE(list);
639 				return LDB_ERR_OPERATIONS_ERROR;
640 			}
641 
642 			ret = ldb_kv_idx_to_key(
643 			    module, ldb_kv, ldb, &list->dn[i], &key);
644 			if (ret != LDB_SUCCESS) {
645 				TALLOC_FREE(list);
646 				TALLOC_FREE(rec);
647 				return ret;
648 			}
649 
650 			ret =
651 			    ldb_kv_search_key(module, ldb_kv, key, rec, flags);
652 			if (key.data != guid_key) {
653 				TALLOC_FREE(key.data);
654 			}
655 			if (ret == LDB_ERR_NO_SUCH_OBJECT) {
656 				/*
657 				 * the record has disappeared?
658 				 * yes, this can happen
659 				 */
660 				TALLOC_FREE(rec);
661 				continue;
662 			}
663 
664 			if (ret != LDB_SUCCESS) {
665 				/* an internal error */
666 				TALLOC_FREE(rec);
667 				TALLOC_FREE(list);
668 				return LDB_ERR_OPERATIONS_ERROR;
669 			}
670 
671 			/*
672 			 * We found the actual DN that we wanted from in the
673 			 * multiple values that matched the index
674 			 * (due to truncation), so return that.
675 			 *
676 			 */
677 			if (ldb_dn_compare(dn, rec->dn) == 0) {
678 				index = i;
679 				TALLOC_FREE(rec);
680 				break;
681 			}
682 		}
683 
684 		/*
685 		 * We matched the index but the actual DN we wanted
686 		 * was not here.
687 		 */
688 		if (index == -1) {
689 			TALLOC_FREE(list);
690 			return LDB_ERR_NO_SUCH_OBJECT;
691 		}
692 	}
693 
694 	/* The ldb_key memory is allocated by the caller */
695 	ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
696 	TALLOC_FREE(list);
697 
698 	if (ret != LDB_SUCCESS) {
699 		return LDB_ERR_OPERATIONS_ERROR;
700 	}
701 
702 	return LDB_SUCCESS;
703 }
704 
705 
706 
707 /*
708   save a dn_list into a full @IDX style record
709  */
ldb_kv_dn_list_store_full(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_dn * dn,struct dn_list * list)710 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
711 				     struct ldb_kv_private *ldb_kv,
712 				     struct ldb_dn *dn,
713 				     struct dn_list *list)
714 {
715 	struct ldb_message *msg;
716 	int ret;
717 
718 	msg = ldb_msg_new(module);
719 	if (!msg) {
720 		return ldb_module_oom(module);
721 	}
722 
723 	msg->dn = dn;
724 
725 	if (list->count == 0) {
726 		ret = ldb_kv_delete_noindex(module, msg);
727 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
728 			ret = LDB_SUCCESS;
729 		}
730 		TALLOC_FREE(msg);
731 		return ret;
732 	}
733 
734 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
735 		ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
736 				      LDB_KV_INDEXING_VERSION);
737 		if (ret != LDB_SUCCESS) {
738 			TALLOC_FREE(msg);
739 			return ldb_module_oom(module);
740 		}
741 	} else {
742 		ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
743 				      LDB_KV_GUID_INDEXING_VERSION);
744 		if (ret != LDB_SUCCESS) {
745 			TALLOC_FREE(msg);
746 			return ldb_module_oom(module);
747 		}
748 	}
749 
750 	if (list->count > 0) {
751 		struct ldb_message_element *el;
752 
753 		ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
754 		if (ret != LDB_SUCCESS) {
755 			TALLOC_FREE(msg);
756 			return ldb_module_oom(module);
757 		}
758 
759 		if (ldb_kv->cache->GUID_index_attribute == NULL) {
760 			el->values = list->dn;
761 			el->num_values = list->count;
762 		} else {
763 			struct ldb_val v;
764 			unsigned int i;
765 			el->values = talloc_array(msg,
766 						  struct ldb_val, 1);
767 			if (el->values == NULL) {
768 				TALLOC_FREE(msg);
769 				return ldb_module_oom(module);
770 			}
771 
772 			v.data = talloc_array_size(el->values,
773 						   list->count,
774 						   LDB_KV_GUID_SIZE);
775 			if (v.data == NULL) {
776 				TALLOC_FREE(msg);
777 				return ldb_module_oom(module);
778 			}
779 
780 			v.length = talloc_get_size(v.data);
781 
782 			for (i = 0; i < list->count; i++) {
783 				if (list->dn[i].length !=
784 				    LDB_KV_GUID_SIZE) {
785 					TALLOC_FREE(msg);
786 					return ldb_module_operr(module);
787 				}
788 				memcpy(&v.data[LDB_KV_GUID_SIZE*i],
789 				       list->dn[i].data,
790 				       LDB_KV_GUID_SIZE);
791 			}
792 			el->values[0] = v;
793 			el->num_values = 1;
794 		}
795 	}
796 
797 	ret = ldb_kv_store(module, msg, TDB_REPLACE);
798 	TALLOC_FREE(msg);
799 	return ret;
800 }
801 
802 /*
803   save a dn_list into the database, in either @IDX or internal format
804  */
ldb_kv_dn_list_store(struct ldb_module * module,struct ldb_dn * dn,struct dn_list * list)805 static int ldb_kv_dn_list_store(struct ldb_module *module,
806 				struct ldb_dn *dn,
807 				struct dn_list *list)
808 {
809 	struct ldb_kv_private *ldb_kv = talloc_get_type(
810 	    ldb_module_get_private(module), struct ldb_kv_private);
811 	TDB_DATA rec = {0};
812 	TDB_DATA key = {0};
813 
814 	int ret = LDB_SUCCESS;
815 	struct dn_list *list2 = NULL;
816 	struct ldb_kv_idxptr *idxptr = NULL;
817 
818 	key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
819 	if (key.dptr == NULL) {
820 		return LDB_ERR_OPERATIONS_ERROR;
821 	}
822 	key.dsize = strlen((char *)key.dptr);
823 
824 	/*
825 	 * If there is an index sub transaction active, update the
826 	 * sub transaction index cache.  Otherwise update the
827 	 * primary index cache
828 	 */
829 	if (ldb_kv->nested_idx_ptr != NULL) {
830 		idxptr = ldb_kv->nested_idx_ptr;
831 	} else {
832 		idxptr = ldb_kv->idxptr;
833 	}
834 	/*
835 	 * Get the cache entry for the index
836 	 *
837 	 * As the value in the cache is a pointer to a dn_list we update
838 	 * the dn_list directly.
839 	 *
840 	 */
841 	rec = tdb_fetch(idxptr->itdb, key);
842 	if (rec.dptr != NULL) {
843 		list2 = ldb_kv_index_idxptr(module, rec);
844 		if (list2 == NULL) {
845 			free(rec.dptr);
846 			return LDB_ERR_OPERATIONS_ERROR;
847 		}
848 		free(rec.dptr);
849 		/* Now put the updated pointer back in the cache */
850 		if (list->dn == NULL) {
851 			list2->dn = NULL;
852 			list2->count = 0;
853 		} else {
854 			list2->dn = talloc_steal(list2, list->dn);
855 			list2->count = list->count;
856 		}
857 		return LDB_SUCCESS;
858 	}
859 
860 	list2 = talloc(idxptr, struct dn_list);
861 	if (list2 == NULL) {
862 		return LDB_ERR_OPERATIONS_ERROR;
863 	}
864 	list2->dn = talloc_steal(list2, list->dn);
865 	list2->count = list->count;
866 
867 	rec.dptr = (uint8_t *)&list2;
868 	rec.dsize = sizeof(void *);
869 
870 
871 	/*
872 	 * This is not a store into the main DB, but into an in-memory
873 	 * TDB, so we don't need a guard on ltdb->read_only
874 	 *
875 	 * Also as we directly update the in memory dn_list for existing
876 	 * cache entries we must be adding a new entry to the cache.
877 	 */
878 	ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
879 	if (ret != 0) {
880 		return ltdb_err_map( tdb_error(idxptr->itdb));
881 	}
882 	return LDB_SUCCESS;
883 }
884 
885 /*
886   traverse function for storing the in-memory index entries on disk
887  */
ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context * tdb,TDB_DATA key,TDB_DATA data,void * state)888 static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
889 				       TDB_DATA key,
890 				       TDB_DATA data,
891 				       void *state)
892 {
893 	struct ldb_module *module = state;
894 	struct ldb_kv_private *ldb_kv = talloc_get_type(
895 	    ldb_module_get_private(module), struct ldb_kv_private);
896 	struct ldb_dn *dn;
897 	struct ldb_context *ldb = ldb_module_get_ctx(module);
898 	struct ldb_val v;
899 	struct dn_list *list;
900 
901 	list = ldb_kv_index_idxptr(module, data);
902 	if (list == NULL) {
903 		ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
904 		return -1;
905 	}
906 
907 	v.data = key.dptr;
908 	v.length = strnlen((char *)key.dptr, key.dsize);
909 
910 	dn = ldb_dn_from_ldb_val(module, ldb, &v);
911 	if (dn == NULL) {
912 		ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
913 		ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
914 		return -1;
915 	}
916 
917 	ldb_kv->idxptr->error =
918 	    ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
919 	talloc_free(dn);
920 	if (ldb_kv->idxptr->error != 0) {
921 		return -1;
922 	}
923 	return 0;
924 }
925 
926 /* cleanup the idxptr mode when transaction commits */
ldb_kv_index_transaction_commit(struct ldb_module * module)927 int ldb_kv_index_transaction_commit(struct ldb_module *module)
928 {
929 	struct ldb_kv_private *ldb_kv = talloc_get_type(
930 	    ldb_module_get_private(module), struct ldb_kv_private);
931 	int ret;
932 
933 	struct ldb_context *ldb = ldb_module_get_ctx(module);
934 
935 	ldb_reset_err_string(ldb);
936 
937 	if (ldb_kv->idxptr->itdb) {
938 		tdb_traverse(
939 		    ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
940 		tdb_close(ldb_kv->idxptr->itdb);
941 	}
942 
943 	ret = ldb_kv->idxptr->error;
944 	if (ret != LDB_SUCCESS) {
945 		if (!ldb_errstring(ldb)) {
946 			ldb_set_errstring(ldb, ldb_strerror(ret));
947 		}
948 		ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
949 	}
950 
951 	talloc_free(ldb_kv->idxptr);
952 	ldb_kv->idxptr = NULL;
953 	return ret;
954 }
955 
956 /* cleanup the idxptr mode when transaction cancels */
ldb_kv_index_transaction_cancel(struct ldb_module * module)957 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
958 {
959 	struct ldb_kv_private *ldb_kv = talloc_get_type(
960 	    ldb_module_get_private(module), struct ldb_kv_private);
961 	if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
962 		tdb_close(ldb_kv->idxptr->itdb);
963 	}
964 	TALLOC_FREE(ldb_kv->idxptr);
965 	if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
966 		tdb_close(ldb_kv->nested_idx_ptr->itdb);
967 	}
968 	TALLOC_FREE(ldb_kv->nested_idx_ptr);
969 	return LDB_SUCCESS;
970 }
971 
972 
973 /*
974   return the dn key to be used for an index
975   the caller is responsible for freeing
976 */
ldb_kv_index_key(struct ldb_context * ldb,struct ldb_kv_private * ldb_kv,const char * attr,const struct ldb_val * value,const struct ldb_schema_attribute ** ap,enum key_truncation * truncation)977 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
978 				       struct ldb_kv_private *ldb_kv,
979 				       const char *attr,
980 				       const struct ldb_val *value,
981 				       const struct ldb_schema_attribute **ap,
982 				       enum key_truncation *truncation)
983 {
984 	struct ldb_dn *ret;
985 	struct ldb_val v;
986 	const struct ldb_schema_attribute *a = NULL;
987 	char *attr_folded = NULL;
988 	const char *attr_for_dn = NULL;
989 	int r;
990 	bool should_b64_encode;
991 
992 	unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
993 	size_t key_len = 0;
994 	size_t attr_len = 0;
995 	const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
996 	unsigned frmt_len = 0;
997 	const size_t additional_key_length = 4;
998 	unsigned int num_separators = 3; /* Estimate for overflow check */
999 	const size_t min_data = 1;
1000 	const size_t min_key_length = additional_key_length
1001 		+ indx_len + num_separators + min_data;
1002 	struct ldb_val empty;
1003 
1004 	/*
1005 	 * Accept a NULL value as a request for a key with no value.  This is
1006 	 * different from passing an empty value, which might be given
1007 	 * significance by some canonicalise functions.
1008 	 */
1009 	bool empty_val = value == NULL;
1010 	if (empty_val) {
1011 		empty.length = 0;
1012 		empty.data = discard_const_p(unsigned char, "");
1013 		value = &empty;
1014 	}
1015 
1016 	if (attr[0] == '@') {
1017 		attr_for_dn = attr;
1018 		v = *value;
1019 		if (ap != NULL) {
1020 			*ap = NULL;
1021 		}
1022 	} else {
1023 		attr_folded = ldb_attr_casefold(ldb, attr);
1024 		if (!attr_folded) {
1025 			return NULL;
1026 		}
1027 
1028 		attr_for_dn = attr_folded;
1029 
1030 		a = ldb_schema_attribute_by_name(ldb, attr);
1031 		if (ap) {
1032 			*ap = a;
1033 		}
1034 
1035 		if (empty_val) {
1036 			v = *value;
1037 		} else {
1038 			ldb_attr_handler_t fn;
1039 			if (a->syntax->index_format_fn &&
1040 			    ldb_kv->cache->GUID_index_attribute != NULL) {
1041 				fn = a->syntax->index_format_fn;
1042 			} else {
1043 				fn = a->syntax->canonicalise_fn;
1044 			}
1045 			r = fn(ldb, ldb, value, &v);
1046 			if (r != LDB_SUCCESS) {
1047 				const char *errstr = ldb_errstring(ldb);
1048 				/* canonicalisation can be refused. For
1049 				   example, a attribute that takes wildcards
1050 				   will refuse to canonicalise if the value
1051 				   contains a wildcard */
1052 				ldb_asprintf_errstring(ldb,
1053 						       "Failed to create "
1054 						       "index key for "
1055 						       "attribute '%s':%s%s%s",
1056 						       attr, ldb_strerror(r),
1057 						       (errstr?":":""),
1058 						       (errstr?errstr:""));
1059 				talloc_free(attr_folded);
1060 				return NULL;
1061 			}
1062 		}
1063 	}
1064 	attr_len = strlen(attr_for_dn);
1065 
1066 	/*
1067 	 * Check if there is any hope this will fit into the DB.
1068 	 * Overflow here is not actually critical the code below
1069 	 * checks again to make the printf and the DB does another
1070 	 * check for too long keys
1071 	 */
1072 	if (max_key_length - attr_len < min_key_length) {
1073 		ldb_asprintf_errstring(
1074 			ldb,
1075 			__location__ ": max_key_length "
1076 			"is too small (%u) < (%u)",
1077 			max_key_length,
1078 			(unsigned)(min_key_length + attr_len));
1079 		talloc_free(attr_folded);
1080 		return NULL;
1081 	}
1082 
1083 	/*
1084 	 * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
1085 	 * "DN=" and a trailing string terminator
1086 	 */
1087 	max_key_length -= additional_key_length;
1088 
1089 	/*
1090 	 * We do not base 64 encode a DN in a key, it has already been
1091 	 * casefold and lineraized, that is good enough.  That already
1092 	 * avoids embedded NUL etc.
1093 	 */
1094 	if (ldb_kv->cache->GUID_index_attribute != NULL) {
1095 		if (strcmp(attr, LDB_KV_IDXDN) == 0) {
1096 			should_b64_encode = false;
1097 		} else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
1098 			/*
1099 			 * We can only change the behaviour for IDXONE
1100 			 * when the GUID index is enabled
1101 			 */
1102 			should_b64_encode = false;
1103 		} else {
1104 			should_b64_encode
1105 				= ldb_should_b64_encode(ldb, &v);
1106 		}
1107 	} else {
1108 		should_b64_encode = ldb_should_b64_encode(ldb, &v);
1109 	}
1110 
1111 	if (should_b64_encode) {
1112 		size_t vstr_len = 0;
1113 		char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
1114 		if (!vstr) {
1115 			talloc_free(attr_folded);
1116 			return NULL;
1117 		}
1118 		vstr_len = strlen(vstr);
1119 		/*
1120 		 * Overflow here is not critical as we only use this
1121 		 * to choose the printf truncation
1122 		 */
1123 		key_len = num_separators + indx_len + attr_len + vstr_len;
1124 		if (key_len > max_key_length) {
1125 			size_t excess = key_len - max_key_length;
1126 			frmt_len = vstr_len - excess;
1127 			*truncation = KEY_TRUNCATED;
1128 			/*
1129 			* Truncated keys are placed in a separate key space
1130 			* from the non truncated keys
1131 			* Note: the double hash "##" is not a typo and
1132 			* indicates that the following value is base64 encoded
1133 			*/
1134 			ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1135 					     LDB_KV_INDEX, attr_for_dn,
1136 					     frmt_len, vstr);
1137 		} else {
1138 			frmt_len = vstr_len;
1139 			*truncation = KEY_NOT_TRUNCATED;
1140 			/*
1141 			 * Note: the double colon "::" is not a typo and
1142 			 * indicates that the following value is base64 encoded
1143 			 */
1144 			ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1145 					     LDB_KV_INDEX, attr_for_dn,
1146 					     frmt_len, vstr);
1147 		}
1148 		talloc_free(vstr);
1149 	} else {
1150 		/* Only need two seperators */
1151 		num_separators = 2;
1152 
1153 		/*
1154 		 * Overflow here is not critical as we only use this
1155 		 * to choose the printf truncation
1156 		 */
1157 		key_len = num_separators + indx_len + attr_len + (int)v.length;
1158 		if (key_len > max_key_length) {
1159 			size_t excess = key_len - max_key_length;
1160 			frmt_len = v.length - excess;
1161 			*truncation = KEY_TRUNCATED;
1162 			/*
1163 			 * Truncated keys are placed in a separate key space
1164 			 * from the non truncated keys
1165 			 */
1166 			ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1167 					     LDB_KV_INDEX, attr_for_dn,
1168 					     frmt_len, (char *)v.data);
1169 		} else {
1170 			frmt_len = v.length;
1171 			*truncation = KEY_NOT_TRUNCATED;
1172 			ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1173 					     LDB_KV_INDEX, attr_for_dn,
1174 					     frmt_len, (char *)v.data);
1175 		}
1176 	}
1177 
1178 	if (value != NULL && v.data != value->data && !empty_val) {
1179 		talloc_free(v.data);
1180 	}
1181 	talloc_free(attr_folded);
1182 
1183 	return ret;
1184 }
1185 
1186 /*
1187   see if a attribute value is in the list of indexed attributes
1188 */
ldb_kv_is_indexed(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const char * attr)1189 static bool ldb_kv_is_indexed(struct ldb_module *module,
1190 			      struct ldb_kv_private *ldb_kv,
1191 			      const char *attr)
1192 {
1193 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1194 	unsigned int i;
1195 	struct ldb_message_element *el;
1196 
1197 	if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1198 	    (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1199 		/* Implicity covered, this is the index key */
1200 		return false;
1201 	}
1202 	if (ldb->schema.index_handler_override) {
1203 		const struct ldb_schema_attribute *a
1204 			= ldb_schema_attribute_by_name(ldb, attr);
1205 
1206 		if (a == NULL) {
1207 			return false;
1208 		}
1209 
1210 		if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1211 			return true;
1212 		} else {
1213 			return false;
1214 		}
1215 	}
1216 
1217 	if (!ldb_kv->cache->attribute_indexes) {
1218 		return false;
1219 	}
1220 
1221 	el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1222 	if (el == NULL) {
1223 		return false;
1224 	}
1225 
1226 	/* TODO: this is too expensive! At least use a binary search */
1227 	for (i=0; i<el->num_values; i++) {
1228 		if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1229 			return true;
1230 		}
1231 	}
1232 	return false;
1233 }
1234 
1235 /*
1236   in the following logic functions, the return value is treated as
1237   follows:
1238 
1239      LDB_SUCCESS: we found some matching index values
1240 
1241      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1242 
1243      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1244                                we'll need a full search
1245  */
1246 
1247 /*
1248   return a list of dn's that might match a simple indexed search (an
1249   equality search only)
1250  */
ldb_kv_index_dn_simple(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)1251 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1252 				  struct ldb_kv_private *ldb_kv,
1253 				  const struct ldb_parse_tree *tree,
1254 				  struct dn_list *list)
1255 {
1256 	struct ldb_context *ldb;
1257 	struct ldb_dn *dn;
1258 	int ret;
1259 	enum key_truncation truncation = KEY_NOT_TRUNCATED;
1260 
1261 	ldb = ldb_module_get_ctx(module);
1262 
1263 	list->count = 0;
1264 	list->dn = NULL;
1265 
1266 	/* if the attribute isn't in the list of indexed attributes then
1267 	   this node needs a full search */
1268 	if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1269 		return LDB_ERR_OPERATIONS_ERROR;
1270 	}
1271 
1272 	/* the attribute is indexed. Pull the list of DNs that match the
1273 	   search criterion */
1274 	dn = ldb_kv_index_key(ldb,
1275 			      ldb_kv,
1276 			      tree->u.equality.attr,
1277 			      &tree->u.equality.value,
1278 			      NULL,
1279 			      &truncation);
1280 	/*
1281 	 * We ignore truncation here and allow multi-valued matches
1282 	 * as ltdb_search_indexed will filter out the wrong one in
1283 	 * ltdb_index_filter() which calls ldb_match_message().
1284 	 */
1285 	if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1286 
1287 	ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
1288 				  DN_LIST_WILL_BE_READ_ONLY);
1289 	talloc_free(dn);
1290 	return ret;
1291 }
1292 
1293 static bool list_union(struct ldb_context *ldb,
1294 		       struct ldb_kv_private *ldb_kv,
1295 		       struct dn_list *list,
1296 		       struct dn_list *list2);
1297 
1298 /*
1299   return a list of dn's that might match a leaf indexed search
1300  */
ldb_kv_index_dn_leaf(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)1301 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1302 				struct ldb_kv_private *ldb_kv,
1303 				const struct ldb_parse_tree *tree,
1304 				struct dn_list *list)
1305 {
1306 	if (ldb_kv->disallow_dn_filter &&
1307 	    (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1308 		/* in AD mode we do not support "(dn=...)" search filters */
1309 		list->dn = NULL;
1310 		list->count = 0;
1311 		return LDB_SUCCESS;
1312 	}
1313 	if (tree->u.equality.attr[0] == '@') {
1314 		/* Do not allow a indexed search against an @ */
1315 		list->dn = NULL;
1316 		list->count = 0;
1317 		return LDB_SUCCESS;
1318 	}
1319 	if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1320 		enum key_truncation truncation = KEY_NOT_TRUNCATED;
1321 		bool valid_dn = false;
1322 		struct ldb_dn *dn
1323 			= ldb_dn_from_ldb_val(list,
1324 					      ldb_module_get_ctx(module),
1325 					      &tree->u.equality.value);
1326 		if (dn == NULL) {
1327 			/* If we can't parse it, no match */
1328 			list->dn = NULL;
1329 			list->count = 0;
1330 			return LDB_SUCCESS;
1331 		}
1332 
1333 		valid_dn = ldb_dn_validate(dn);
1334 		if (valid_dn == false) {
1335 			/* If we can't parse it, no match */
1336 			list->dn = NULL;
1337 			list->count = 0;
1338 			return LDB_SUCCESS;
1339 		}
1340 
1341 		/*
1342 		 * Re-use the same code we use for a SCOPE_BASE
1343 		 * search
1344 		 *
1345 		 * We can't call TALLOC_FREE(dn) as this must belong
1346 		 * to list for the memory to remain valid.
1347 		 */
1348 		return ldb_kv_index_dn_base_dn(
1349 		    module, ldb_kv, dn, list, &truncation);
1350 		/*
1351 		 * We ignore truncation here and allow multi-valued matches
1352 		 * as ltdb_search_indexed will filter out the wrong one in
1353 		 * ltdb_index_filter() which calls ldb_match_message().
1354 		 */
1355 
1356 	} else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1357 		   (ldb_attr_cmp(tree->u.equality.attr,
1358 				 ldb_kv->cache->GUID_index_attribute) == 0)) {
1359 		int ret;
1360 		struct ldb_context *ldb = ldb_module_get_ctx(module);
1361 		list->dn = talloc_array(list, struct ldb_val, 1);
1362 		if (list->dn == NULL) {
1363 			ldb_module_oom(module);
1364 			return LDB_ERR_OPERATIONS_ERROR;
1365 		}
1366 		/*
1367 		 * We need to go via the canonicalise_fn() to
1368 		 * ensure we get the index in binary, rather
1369 		 * than a string
1370 		 */
1371 		ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1372 		    ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1373 		if (ret != LDB_SUCCESS) {
1374 			return LDB_ERR_OPERATIONS_ERROR;
1375 		}
1376 		list->count = 1;
1377 		return LDB_SUCCESS;
1378 	}
1379 
1380 	return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1381 }
1382 
1383 
1384 /*
1385   list intersection
1386   list = list & list2
1387 */
list_intersect(struct ldb_kv_private * ldb_kv,struct dn_list * list,const struct dn_list * list2)1388 static bool list_intersect(struct ldb_kv_private *ldb_kv,
1389 			   struct dn_list *list,
1390 			   const struct dn_list *list2)
1391 {
1392 	const struct dn_list *short_list, *long_list;
1393 	struct dn_list *list3;
1394 	unsigned int i;
1395 
1396 	if (list->count == 0) {
1397 		/* 0 & X == 0 */
1398 		return true;
1399 	}
1400 	if (list2->count == 0) {
1401 		/* X & 0 == 0 */
1402 		list->count = 0;
1403 		list->dn = NULL;
1404 		return true;
1405 	}
1406 
1407 	/*
1408 	 * In both of the below we check for strict and in that
1409 	 * case do not optimise the intersection of this list,
1410 	 * we must never return an entry not in this
1411 	 * list.  This allows the index for
1412 	 * SCOPE_ONELEVEL to be trusted.
1413 	 */
1414 
1415 	/* the indexing code is allowed to return a longer list than
1416 	   what really matches, as all results are filtered by the
1417 	   full expression at the end - this shortcut avoids a lot of
1418 	   work in some cases */
1419 	if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1420 		return true;
1421 	}
1422 	if (list2->count < 2 && list->count > 10 && list->strict == false) {
1423 		list->count = list2->count;
1424 		list->dn = list2->dn;
1425 		/* note that list2 may not be the parent of list2->dn,
1426 		   as list2->dn may be owned by ltdb->idxptr. In that
1427 		   case we expect this reparent call to fail, which is
1428 		   OK */
1429 		talloc_reparent(list2, list, list2->dn);
1430 		return true;
1431 	}
1432 
1433 	if (list->count > list2->count) {
1434 		short_list = list2;
1435 		long_list = list;
1436 	} else {
1437 		short_list = list;
1438 		long_list = list2;
1439 	}
1440 
1441 	list3 = talloc_zero(list, struct dn_list);
1442 	if (list3 == NULL) {
1443 		return false;
1444 	}
1445 
1446 	list3->dn = talloc_array(list3, struct ldb_val,
1447 				 MIN(list->count, list2->count));
1448 	if (!list3->dn) {
1449 		talloc_free(list3);
1450 		return false;
1451 	}
1452 	list3->count = 0;
1453 
1454 	for (i=0;i<short_list->count;i++) {
1455 		/* For the GUID index case, this is a binary search */
1456 		if (ldb_kv_dn_list_find_val(
1457 			ldb_kv, long_list, &short_list->dn[i]) != -1) {
1458 			list3->dn[list3->count] = short_list->dn[i];
1459 			list3->count++;
1460 		}
1461 	}
1462 
1463 	list->strict |= list2->strict;
1464 	list->dn = talloc_steal(list, list3->dn);
1465 	list->count = list3->count;
1466 	talloc_free(list3);
1467 
1468 	return true;
1469 }
1470 
1471 
1472 /*
1473   list union
1474   list = list | list2
1475 */
list_union(struct ldb_context * ldb,struct ldb_kv_private * ldb_kv,struct dn_list * list,struct dn_list * list2)1476 static bool list_union(struct ldb_context *ldb,
1477 		       struct ldb_kv_private *ldb_kv,
1478 		       struct dn_list *list,
1479 		       struct dn_list *list2)
1480 {
1481 	struct ldb_val *dn3;
1482 	unsigned int i = 0, j = 0, k = 0;
1483 
1484 	if (list2->count == 0) {
1485 		/* X | 0 == X */
1486 		return true;
1487 	}
1488 
1489 	if (list->count == 0) {
1490 		/* 0 | X == X */
1491 		list->count = list2->count;
1492 		list->dn = list2->dn;
1493 		/* note that list2 may not be the parent of list2->dn,
1494 		   as list2->dn may be owned by ltdb->idxptr. In that
1495 		   case we expect this reparent call to fail, which is
1496 		   OK */
1497 		talloc_reparent(list2, list, list2->dn);
1498 		return true;
1499 	}
1500 
1501 	/*
1502 	 * Sort the lists (if not in GUID DN mode) so we can do
1503 	 * the de-duplication during the merge
1504 	 *
1505 	 * NOTE: This can sort the in-memory index values, as list or
1506 	 * list2 might not be a copy!
1507 	 */
1508 	ldb_kv_dn_list_sort(ldb_kv, list);
1509 	ldb_kv_dn_list_sort(ldb_kv, list2);
1510 
1511 	dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1512 	if (!dn3) {
1513 		ldb_oom(ldb);
1514 		return false;
1515 	}
1516 
1517 	while (i < list->count || j < list2->count) {
1518 		int cmp;
1519 		if (i >= list->count) {
1520 			cmp = 1;
1521 		} else if (j >= list2->count) {
1522 			cmp = -1;
1523 		} else {
1524 			cmp = ldb_val_equal_exact_ordered(list->dn[i],
1525 							  &list2->dn[j]);
1526 		}
1527 
1528 		if (cmp < 0) {
1529 			/* Take list */
1530 			dn3[k] = list->dn[i];
1531 			i++;
1532 			k++;
1533 		} else if (cmp > 0) {
1534 			/* Take list2 */
1535 			dn3[k] = list2->dn[j];
1536 			j++;
1537 			k++;
1538 		} else {
1539 			/* Equal, take list */
1540 			dn3[k] = list->dn[i];
1541 			i++;
1542 			j++;
1543 			k++;
1544 		}
1545 	}
1546 
1547 	list->dn = dn3;
1548 	list->count = k;
1549 
1550 	return true;
1551 }
1552 
1553 static int ldb_kv_index_dn(struct ldb_module *module,
1554 			   struct ldb_kv_private *ldb_kv,
1555 			   const struct ldb_parse_tree *tree,
1556 			   struct dn_list *list);
1557 
1558 /*
1559   process an OR list (a union)
1560  */
ldb_kv_index_dn_or(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)1561 static int ldb_kv_index_dn_or(struct ldb_module *module,
1562 			      struct ldb_kv_private *ldb_kv,
1563 			      const struct ldb_parse_tree *tree,
1564 			      struct dn_list *list)
1565 {
1566 	struct ldb_context *ldb;
1567 	unsigned int i;
1568 
1569 	ldb = ldb_module_get_ctx(module);
1570 
1571 	list->dn = NULL;
1572 	list->count = 0;
1573 
1574 	for (i=0; i<tree->u.list.num_elements; i++) {
1575 		struct dn_list *list2;
1576 		int ret;
1577 
1578 		list2 = talloc_zero(list, struct dn_list);
1579 		if (list2 == NULL) {
1580 			return LDB_ERR_OPERATIONS_ERROR;
1581 		}
1582 
1583 		ret = ldb_kv_index_dn(
1584 		    module, ldb_kv, tree->u.list.elements[i], list2);
1585 
1586 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1587 			/* X || 0 == X */
1588 			talloc_free(list2);
1589 			continue;
1590 		}
1591 
1592 		if (ret != LDB_SUCCESS) {
1593 			/* X || * == * */
1594 			talloc_free(list2);
1595 			return ret;
1596 		}
1597 
1598 		if (!list_union(ldb, ldb_kv, list, list2)) {
1599 			talloc_free(list2);
1600 			return LDB_ERR_OPERATIONS_ERROR;
1601 		}
1602 	}
1603 
1604 	if (list->count == 0) {
1605 		return LDB_ERR_NO_SUCH_OBJECT;
1606 	}
1607 
1608 	return LDB_SUCCESS;
1609 }
1610 
1611 
1612 /*
1613   NOT an index results
1614  */
ldb_kv_index_dn_not(_UNUSED_ struct ldb_module * module,_UNUSED_ struct ldb_kv_private * ldb_kv,_UNUSED_ const struct ldb_parse_tree * tree,_UNUSED_ struct dn_list * list)1615 static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
1616 			       _UNUSED_ struct ldb_kv_private *ldb_kv,
1617 			       _UNUSED_ const struct ldb_parse_tree *tree,
1618 			       _UNUSED_ struct dn_list *list)
1619 {
1620 	/* the only way to do an indexed not would be if we could
1621 	   negate the not via another not or if we knew the total
1622 	   number of database elements so we could know that the
1623 	   existing expression covered the whole database.
1624 
1625 	   instead, we just give up, and rely on a full index scan
1626 	   (unless an outer & manages to reduce the list)
1627 	*/
1628 	return LDB_ERR_OPERATIONS_ERROR;
1629 }
1630 
1631 /*
1632  * These things are unique, so avoid a full scan if this is a search
1633  * by GUID, DN or a unique attribute
1634  */
ldb_kv_index_unique(struct ldb_context * ldb,struct ldb_kv_private * ldb_kv,const char * attr)1635 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1636 				struct ldb_kv_private *ldb_kv,
1637 				const char *attr)
1638 {
1639 	const struct ldb_schema_attribute *a;
1640 	if (ldb_kv->cache->GUID_index_attribute != NULL) {
1641 		if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1642 		    0) {
1643 			return true;
1644 		}
1645 	}
1646 	if (ldb_attr_dn(attr) == 0) {
1647 		return true;
1648 	}
1649 
1650 	a = ldb_schema_attribute_by_name(ldb, attr);
1651 	if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1652 		return true;
1653 	}
1654 	return false;
1655 }
1656 
1657 /*
1658   process an AND expression (intersection)
1659  */
ldb_kv_index_dn_and(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)1660 static int ldb_kv_index_dn_and(struct ldb_module *module,
1661 			       struct ldb_kv_private *ldb_kv,
1662 			       const struct ldb_parse_tree *tree,
1663 			       struct dn_list *list)
1664 {
1665 	struct ldb_context *ldb;
1666 	unsigned int i;
1667 	bool found;
1668 
1669 	ldb = ldb_module_get_ctx(module);
1670 
1671 	list->dn = NULL;
1672 	list->count = 0;
1673 
1674 	/* in the first pass we only look for unique simple
1675 	   equality tests, in the hope of avoiding having to look
1676 	   at any others */
1677 	for (i=0; i<tree->u.list.num_elements; i++) {
1678 		const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1679 		int ret;
1680 
1681 		if (subtree->operation != LDB_OP_EQUALITY ||
1682 		    !ldb_kv_index_unique(
1683 			ldb, ldb_kv, subtree->u.equality.attr)) {
1684 			continue;
1685 		}
1686 
1687 		ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1688 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1689 			/* 0 && X == 0 */
1690 			return LDB_ERR_NO_SUCH_OBJECT;
1691 		}
1692 		if (ret == LDB_SUCCESS) {
1693 			/* a unique index match means we can
1694 			 * stop. Note that we don't care if we return
1695 			 * a few too many objects, due to later
1696 			 * filtering */
1697 			return LDB_SUCCESS;
1698 		}
1699 	}
1700 
1701 	/* now do a full intersection */
1702 	found = false;
1703 
1704 	for (i=0; i<tree->u.list.num_elements; i++) {
1705 		const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1706 		struct dn_list *list2;
1707 		int ret;
1708 
1709 		list2 = talloc_zero(list, struct dn_list);
1710 		if (list2 == NULL) {
1711 			return ldb_module_oom(module);
1712 		}
1713 
1714 		ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1715 
1716 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1717 			/* X && 0 == 0 */
1718 			list->dn = NULL;
1719 			list->count = 0;
1720 			talloc_free(list2);
1721 			return LDB_ERR_NO_SUCH_OBJECT;
1722 		}
1723 
1724 		if (ret != LDB_SUCCESS) {
1725 			/* this didn't adding anything */
1726 			talloc_free(list2);
1727 			continue;
1728 		}
1729 
1730 		if (!found) {
1731 			talloc_reparent(list2, list, list->dn);
1732 			list->dn = list2->dn;
1733 			list->count = list2->count;
1734 			found = true;
1735 		} else if (!list_intersect(ldb_kv, list, list2)) {
1736 			talloc_free(list2);
1737 			return LDB_ERR_OPERATIONS_ERROR;
1738 		}
1739 
1740 		if (list->count == 0) {
1741 			list->dn = NULL;
1742 			return LDB_ERR_NO_SUCH_OBJECT;
1743 		}
1744 
1745 		if (list->count < 2) {
1746 			/* it isn't worth loading the next part of the tree */
1747 			return LDB_SUCCESS;
1748 		}
1749 	}
1750 
1751 	if (!found) {
1752 		/* none of the attributes were indexed */
1753 		return LDB_ERR_OPERATIONS_ERROR;
1754 	}
1755 
1756 	return LDB_SUCCESS;
1757 }
1758 
1759 struct ldb_kv_ordered_index_context {
1760 	struct ldb_module *module;
1761 	int error;
1762 	struct dn_list *dn_list;
1763 };
1764 
traverse_range_index(_UNUSED_ struct ldb_kv_private * ldb_kv,_UNUSED_ struct ldb_val key,struct ldb_val data,void * state)1765 static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
1766 				_UNUSED_ struct ldb_val key,
1767 				struct ldb_val data,
1768 				void *state)
1769 {
1770 
1771 	struct ldb_context *ldb;
1772 	struct ldb_kv_ordered_index_context *ctx =
1773 	    (struct ldb_kv_ordered_index_context *)state;
1774 	struct ldb_module *module = ctx->module;
1775 	struct ldb_message_element *el = NULL;
1776 	struct ldb_message *msg = NULL;
1777 	int version;
1778 	size_t dn_array_size, additional_length;
1779 	unsigned int i;
1780 
1781 	ldb = ldb_module_get_ctx(module);
1782 
1783 	msg = ldb_msg_new(module);
1784 
1785 	ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1786 					   LDB_UNPACK_DATA_FLAG_NO_DN);
1787 
1788 	if (ctx->error != LDB_SUCCESS) {
1789 		talloc_free(msg);
1790 		return ctx->error;
1791 	}
1792 
1793 	el = ldb_msg_find_element(msg, LDB_KV_IDX);
1794 	if (!el) {
1795 		talloc_free(msg);
1796 		return LDB_SUCCESS;
1797 	}
1798 
1799 	version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1800 
1801 	/*
1802 	 * we avoid copying the strings by stealing the list.  We have
1803 	 * to steal msg onto el->values (which looks odd) because
1804 	 * the memory is allocated on msg, not on each value.
1805 	 */
1806 	if (version != LDB_KV_GUID_INDEXING_VERSION) {
1807 		/* This is quite likely during the DB startup
1808 		   on first upgrade to using a GUID index */
1809 		ldb_debug_set(ldb_module_get_ctx(module),
1810 			      LDB_DEBUG_ERROR, __location__
1811 			      ": Wrong GUID index version %d expected %d",
1812 			      version, LDB_KV_GUID_INDEXING_VERSION);
1813 		talloc_free(msg);
1814 		ctx->error = LDB_ERR_OPERATIONS_ERROR;
1815 		return ctx->error;
1816 	}
1817 
1818 	if (el->num_values == 0) {
1819 		talloc_free(msg);
1820 		ctx->error = LDB_ERR_OPERATIONS_ERROR;
1821 		return ctx->error;
1822 	}
1823 
1824 	if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1825 	    || el->values[0].length == 0) {
1826 		talloc_free(msg);
1827 		ctx->error = LDB_ERR_OPERATIONS_ERROR;
1828 		return ctx->error;
1829 	}
1830 
1831 	dn_array_size = talloc_array_length(ctx->dn_list->dn);
1832 
1833 	additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1834 
1835 	if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1836 		talloc_free(msg);
1837 		ctx->error = LDB_ERR_OPERATIONS_ERROR;
1838 		return ctx->error;
1839 	}
1840 
1841 	if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1842 		size_t new_array_length;
1843 
1844 		if (dn_array_size * 2 < dn_array_size) {
1845 			talloc_free(msg);
1846 			ctx->error = LDB_ERR_OPERATIONS_ERROR;
1847 			return ctx->error;
1848 		}
1849 
1850 		new_array_length = MAX(ctx->dn_list->count + additional_length,
1851 				       dn_array_size * 2);
1852 
1853 		ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1854 						  ctx->dn_list->dn,
1855 						  struct ldb_val,
1856 						  new_array_length);
1857 	}
1858 
1859 	if (ctx->dn_list->dn == NULL) {
1860 		talloc_free(msg);
1861 		ctx->error = LDB_ERR_OPERATIONS_ERROR;
1862 		return ctx->error;
1863 	}
1864 
1865 	/*
1866 	 * The actual data is on msg.
1867 	 */
1868 	talloc_steal(ctx->dn_list->dn, msg);
1869 	for (i = 0; i < additional_length; i++) {
1870 		ctx->dn_list->dn[i + ctx->dn_list->count].data
1871 			= &el->values[0].data[i * LDB_KV_GUID_SIZE];
1872 		ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1873 
1874 	}
1875 
1876 	ctx->dn_list->count += additional_length;
1877 
1878 	talloc_free(msg->elements);
1879 
1880 	return LDB_SUCCESS;
1881 }
1882 
1883 /*
1884  * >= and <= indexing implemented using lexicographically sorted keys
1885  *
1886  * We only run this in GUID indexing mode and when there is no write
1887  * transaction (only implicit read locks are being held). Otherwise, we would
1888  * have to deal with the in-memory index cache.
1889  *
1890  * We rely on the implementation of index_format_fn on a schema syntax which
1891  * will can help us to construct keys which can be ordered correctly, and we
1892  * terminate using schema agnostic start and end keys.
1893  *
1894  * index_format_fn must output values which can be memcmp-able to produce the
1895  * correct ordering as defined by the schema syntax class.
1896  */
ldb_kv_index_dn_ordered(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list,bool ascending)1897 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1898 				   struct ldb_kv_private *ldb_kv,
1899 				   const struct ldb_parse_tree *tree,
1900 				   struct dn_list *list, bool ascending)
1901 {
1902 	enum key_truncation truncation = KEY_NOT_TRUNCATED;
1903 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1904 
1905 	struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1906 	struct ldb_val start_key, end_key;
1907 	struct ldb_dn *key_dn = NULL;
1908 	const struct ldb_schema_attribute *a = NULL;
1909 
1910 	struct ldb_kv_ordered_index_context ctx;
1911 	int ret;
1912 
1913 	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1914 
1915 	if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1916 		return LDB_ERR_OPERATIONS_ERROR;
1917 	}
1918 
1919 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
1920 		return LDB_ERR_OPERATIONS_ERROR;
1921 	}
1922 
1923 	/* bail out if we're in a transaction, full search instead. */
1924 	if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1925 		return LDB_ERR_OPERATIONS_ERROR;
1926 	}
1927 
1928 	if (ldb_kv->disallow_dn_filter &&
1929 	    (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1930 		/* in AD mode we do not support "(dn=...)" search filters */
1931 		list->dn = NULL;
1932 		list->count = 0;
1933 		return LDB_SUCCESS;
1934 	}
1935 	if (tree->u.comparison.attr[0] == '@') {
1936 		/* Do not allow a indexed search against an @ */
1937 		list->dn = NULL;
1938 		list->count = 0;
1939 		return LDB_SUCCESS;
1940 	}
1941 
1942 	a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1943 
1944 	/*
1945 	 * If there's no index format function defined for this attr, then
1946 	 * the lexicographic order in the database doesn't correspond to the
1947 	 * attr's ordering, so we can't use the iterate_range op.
1948 	 */
1949 	if (a->syntax->index_format_fn == NULL) {
1950 		return LDB_ERR_OPERATIONS_ERROR;
1951 	}
1952 
1953 	key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1954 				  &tree->u.comparison.value,
1955 				  NULL, &truncation);
1956 	if (!key_dn) {
1957 		return LDB_ERR_OPERATIONS_ERROR;
1958 	} else if (truncation == KEY_TRUNCATED) {
1959 		ldb_debug(ldb, LDB_DEBUG_WARNING,
1960 			  __location__
1961 			  ": ordered index violation: key dn truncated: %s\n",
1962 			  ldb_dn_get_linearized(key_dn));
1963 		return LDB_ERR_OPERATIONS_ERROR;
1964 	}
1965 	ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1966 	talloc_free(key_dn);
1967 	if (ldb_key.data == NULL) {
1968 		return LDB_ERR_OPERATIONS_ERROR;
1969 	}
1970 
1971 	key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1972 				  NULL, NULL, &truncation);
1973 	if (!key_dn) {
1974 		return LDB_ERR_OPERATIONS_ERROR;
1975 	} else if (truncation == KEY_TRUNCATED) {
1976 		ldb_debug(ldb, LDB_DEBUG_WARNING,
1977 			  __location__
1978 			  ": ordered index violation: key dn truncated: %s\n",
1979 			  ldb_dn_get_linearized(key_dn));
1980 		return LDB_ERR_OPERATIONS_ERROR;
1981 	}
1982 
1983 	ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1984 	talloc_free(key_dn);
1985 	if (ldb_key2.data == NULL) {
1986 		return LDB_ERR_OPERATIONS_ERROR;
1987 	}
1988 
1989 	/*
1990 	 * In order to avoid defining a start and end key for the search, we
1991 	 * notice that each index key is of the form:
1992 	 *
1993 	 *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1994 	 *
1995 	 * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1996 	 * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1997 	 * particular attribute.
1998 	 *
1999 	 * Our LMDB backend uses the default memcmp for key comparison.
2000 	 */
2001 
2002 	/* Eliminate NUL byte at the end of the empty key */
2003 	ldb_key2.length--;
2004 
2005 	if (ascending) {
2006 		/* : becomes ; for pseudo end-key */
2007 		ldb_key2.data[ldb_key2.length-1]++;
2008 		start_key = ldb_key;
2009 		end_key = ldb_key2;
2010 	} else {
2011 		start_key = ldb_key2;
2012 		end_key = ldb_key;
2013 	}
2014 
2015 	ctx.module = module;
2016 	ctx.error = 0;
2017 	ctx.dn_list = list;
2018 	ctx.dn_list->count = 0;
2019 	ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
2020 
2021 	ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
2022 					    traverse_range_index, &ctx);
2023 
2024 	if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
2025 		return LDB_ERR_OPERATIONS_ERROR;
2026 	}
2027 
2028 	TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
2029 		       ldb_val_equal_exact_for_qsort);
2030 
2031 	talloc_free(tmp_ctx);
2032 
2033 	return LDB_SUCCESS;
2034 }
2035 
ldb_kv_index_dn_greater(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)2036 static int ldb_kv_index_dn_greater(struct ldb_module *module,
2037 				   struct ldb_kv_private *ldb_kv,
2038 				   const struct ldb_parse_tree *tree,
2039 				   struct dn_list *list)
2040 {
2041 	return ldb_kv_index_dn_ordered(module,
2042 				       ldb_kv,
2043 				       tree,
2044 				       list, true);
2045 }
2046 
ldb_kv_index_dn_less(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)2047 static int ldb_kv_index_dn_less(struct ldb_module *module,
2048 				   struct ldb_kv_private *ldb_kv,
2049 				   const struct ldb_parse_tree *tree,
2050 				   struct dn_list *list)
2051 {
2052 	return ldb_kv_index_dn_ordered(module,
2053 				       ldb_kv,
2054 				       tree,
2055 				       list, false);
2056 }
2057 
2058 /*
2059   return a list of matching objects using a one-level index
2060  */
ldb_kv_index_dn_attr(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const char * attr,struct ldb_dn * dn,struct dn_list * list,enum key_truncation * truncation)2061 static int ldb_kv_index_dn_attr(struct ldb_module *module,
2062 				struct ldb_kv_private *ldb_kv,
2063 				const char *attr,
2064 				struct ldb_dn *dn,
2065 				struct dn_list *list,
2066 				enum key_truncation *truncation)
2067 {
2068 	struct ldb_context *ldb;
2069 	struct ldb_dn *key;
2070 	struct ldb_val val;
2071 	int ret;
2072 
2073 	ldb = ldb_module_get_ctx(module);
2074 
2075 	/* work out the index key from the parent DN */
2076 	val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2077 	if (val.data == NULL) {
2078 		const char *dn_str = ldb_dn_get_linearized(dn);
2079 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
2080 				       __location__
2081 				       ": Failed to get casefold DN "
2082 				       "from: %s",
2083 				       dn_str);
2084 		return LDB_ERR_OPERATIONS_ERROR;
2085 	}
2086 	val.length = strlen((char *)val.data);
2087 	key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
2088 	if (!key) {
2089 		ldb_oom(ldb);
2090 		return LDB_ERR_OPERATIONS_ERROR;
2091 	}
2092 
2093 	ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
2094 				  DN_LIST_WILL_BE_READ_ONLY);
2095 	talloc_free(key);
2096 	if (ret != LDB_SUCCESS) {
2097 		return ret;
2098 	}
2099 
2100 	if (list->count == 0) {
2101 		return LDB_ERR_NO_SUCH_OBJECT;
2102 	}
2103 
2104 	return LDB_SUCCESS;
2105 }
2106 
2107 /*
2108   return a list of matching objects using a one-level index
2109  */
ldb_kv_index_dn_one(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_dn * parent_dn,struct dn_list * list,enum key_truncation * truncation)2110 static int ldb_kv_index_dn_one(struct ldb_module *module,
2111 			       struct ldb_kv_private *ldb_kv,
2112 			       struct ldb_dn *parent_dn,
2113 			       struct dn_list *list,
2114 			       enum key_truncation *truncation)
2115 {
2116 	int ret = ldb_kv_index_dn_attr(
2117 	    module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2118 	if (ret == LDB_SUCCESS) {
2119 		/*
2120 		 * Ensure we do not shortcut on intersection for this
2121 		 * list.  We must never be lazy and return an entry
2122 		 * not in this list.  This allows the index for
2123 		 * SCOPE_ONELEVEL to be trusted.
2124 		 */
2125 
2126 		list->strict = true;
2127 	}
2128 	return ret;
2129 }
2130 
2131 /*
2132   return a list of matching objects using the DN index
2133  */
ldb_kv_index_dn_base_dn(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_dn * base_dn,struct dn_list * dn_list,enum key_truncation * truncation)2134 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2135 				   struct ldb_kv_private *ldb_kv,
2136 				   struct ldb_dn *base_dn,
2137 				   struct dn_list *dn_list,
2138 				   enum key_truncation *truncation)
2139 {
2140 	const struct ldb_val *guid_val = NULL;
2141 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
2142 		dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2143 		if (dn_list->dn == NULL) {
2144 			return ldb_module_oom(module);
2145 		}
2146 		dn_list->dn[0].data = discard_const_p(unsigned char,
2147 						      ldb_dn_get_linearized(base_dn));
2148 		if (dn_list->dn[0].data == NULL) {
2149 			talloc_free(dn_list->dn);
2150 			return ldb_module_oom(module);
2151 		}
2152 		dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2153 		dn_list->count = 1;
2154 
2155 		return LDB_SUCCESS;
2156 	}
2157 
2158 	if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2159 		guid_val = ldb_dn_get_extended_component(
2160 		    base_dn, ldb_kv->cache->GUID_index_dn_component);
2161 	}
2162 
2163 	if (guid_val != NULL) {
2164 		dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2165 		if (dn_list->dn == NULL) {
2166 			return ldb_module_oom(module);
2167 		}
2168 		dn_list->dn[0].data = guid_val->data;
2169 		dn_list->dn[0].length = guid_val->length;
2170 		dn_list->count = 1;
2171 
2172 		return LDB_SUCCESS;
2173 	}
2174 
2175 	return ldb_kv_index_dn_attr(
2176 	    module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2177 }
2178 
2179 /*
2180   return a list of dn's that might match a indexed search or
2181   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2182  */
ldb_kv_index_dn(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_parse_tree * tree,struct dn_list * list)2183 static int ldb_kv_index_dn(struct ldb_module *module,
2184 			   struct ldb_kv_private *ldb_kv,
2185 			   const struct ldb_parse_tree *tree,
2186 			   struct dn_list *list)
2187 {
2188 	int ret = LDB_ERR_OPERATIONS_ERROR;
2189 
2190 	switch (tree->operation) {
2191 	case LDB_OP_AND:
2192 		ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2193 		break;
2194 
2195 	case LDB_OP_OR:
2196 		ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2197 		break;
2198 
2199 	case LDB_OP_NOT:
2200 		ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2201 		break;
2202 
2203 	case LDB_OP_EQUALITY:
2204 		ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2205 		break;
2206 
2207 	case LDB_OP_GREATER:
2208 		ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2209 		break;
2210 
2211 	case LDB_OP_LESS:
2212 		ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2213 		break;
2214 
2215 	case LDB_OP_SUBSTRING:
2216 	case LDB_OP_PRESENT:
2217 	case LDB_OP_APPROX:
2218 	case LDB_OP_EXTENDED:
2219 		/* we can't index with fancy bitops yet */
2220 		ret = LDB_ERR_OPERATIONS_ERROR;
2221 		break;
2222 	}
2223 
2224 	return ret;
2225 }
2226 
2227 /*
2228   filter a candidate dn_list from an indexed search into a set of results
2229   extracting just the given attributes
2230 */
ldb_kv_index_filter(struct ldb_kv_private * ldb_kv,const struct dn_list * dn_list,struct ldb_kv_context * ac,uint32_t * match_count,enum key_truncation scope_one_truncation)2231 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2232 			       const struct dn_list *dn_list,
2233 			       struct ldb_kv_context *ac,
2234 			       uint32_t *match_count,
2235 			       enum key_truncation scope_one_truncation)
2236 {
2237 	struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2238 	struct ldb_message *msg;
2239 	struct ldb_message *filtered_msg;
2240 	unsigned int i;
2241 	unsigned int num_keys = 0;
2242 	uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2243 	struct ldb_val *keys = NULL;
2244 
2245 	/*
2246 	 * We have to allocate the key list (rather than just walk the
2247 	 * caller supplied list) as the callback could change the list
2248 	 * (by modifying an indexed attribute hosted in the in-memory
2249 	 * index cache!)
2250 	 */
2251 	keys = talloc_array(ac, struct ldb_val, dn_list->count);
2252 	if (keys == NULL) {
2253 		return ldb_module_oom(ac->module);
2254 	}
2255 
2256 	if (ldb_kv->cache->GUID_index_attribute != NULL) {
2257 		/*
2258 		 * We speculate that the keys will be GUID based and so
2259 		 * pre-fill in enough space for a GUID (avoiding a pile of
2260 		 * small allocations)
2261 		 */
2262 		struct guid_tdb_key {
2263 			uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2264 		} *key_values = NULL;
2265 
2266 		key_values = talloc_array(keys,
2267 					  struct guid_tdb_key,
2268 					  dn_list->count);
2269 
2270 		if (key_values == NULL) {
2271 			talloc_free(keys);
2272 			return ldb_module_oom(ac->module);
2273 		}
2274 		for (i = 0; i < dn_list->count; i++) {
2275 			keys[i].data = key_values[i].guid_key;
2276 			keys[i].length = sizeof(key_values[i].guid_key);
2277 		}
2278 	} else {
2279 		for (i = 0; i < dn_list->count; i++) {
2280 			keys[i].data = NULL;
2281 			keys[i].length = 0;
2282 		}
2283 	}
2284 
2285 	for (i = 0; i < dn_list->count; i++) {
2286 		int ret;
2287 
2288 		ret = ldb_kv_idx_to_key(
2289 		    ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2290 		if (ret != LDB_SUCCESS) {
2291 			talloc_free(keys);
2292 			return ret;
2293 		}
2294 
2295 		if (ldb_kv->cache->GUID_index_attribute != NULL) {
2296 			/*
2297 			 * If we are in GUID index mode, then the dn_list is
2298 			 * sorted.  If we got a duplicate, forget about it, as
2299 			 * otherwise we would send the same entry back more
2300 			 * than once.
2301 			 *
2302 			 * This is needed in the truncated DN case, or if a
2303 			 * duplicate was forced in via
2304 			 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2305 			 */
2306 
2307 			if (memcmp(previous_guid_key,
2308 				   keys[num_keys].data,
2309 				   sizeof(previous_guid_key)) == 0) {
2310 				continue;
2311 			}
2312 
2313 			memcpy(previous_guid_key,
2314 			       keys[num_keys].data,
2315 			       sizeof(previous_guid_key));
2316 		}
2317 		num_keys++;
2318 	}
2319 
2320 
2321 	/*
2322 	 * Now that the list is a safe copy, send the callbacks
2323 	 */
2324 	for (i = 0; i < num_keys; i++) {
2325 		int ret;
2326 		bool matched;
2327 		msg = ldb_msg_new(ac);
2328 		if (!msg) {
2329 			talloc_free(keys);
2330 			return LDB_ERR_OPERATIONS_ERROR;
2331 		}
2332 
2333 		ret =
2334 		    ldb_kv_search_key(ac->module,
2335 				      ldb_kv,
2336 				      keys[i],
2337 				      msg,
2338 				      LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2339 				      /*
2340 				       * The entry point ldb_kv_search_indexed is
2341 				       * only called from the read-locked
2342 				       * ldb_kv_search.
2343 				       */
2344 				      LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2345 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2346 			/*
2347 			 * the record has disappeared? yes, this can
2348 			 * happen if the entry is deleted by something
2349 			 * operating in the callback (not another
2350 			 * process, as we have a read lock)
2351 			 */
2352 			talloc_free(msg);
2353 			continue;
2354 		}
2355 
2356 		if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2357 			/* an internal error */
2358 			talloc_free(keys);
2359 			talloc_free(msg);
2360 			return LDB_ERR_OPERATIONS_ERROR;
2361 		}
2362 
2363 		/*
2364 		 * We trust the index for LDB_SCOPE_ONELEVEL
2365 		 * unless the index key has been truncated.
2366 		 *
2367 		 * LDB_SCOPE_BASE is not passed in by our only caller.
2368 		 */
2369 		if (ac->scope == LDB_SCOPE_ONELEVEL &&
2370 		    ldb_kv->cache->one_level_indexes &&
2371 		    scope_one_truncation == KEY_NOT_TRUNCATED) {
2372 			ret = ldb_match_message(ldb, msg, ac->tree,
2373 						ac->scope, &matched);
2374 		} else {
2375 			ret = ldb_match_msg_error(ldb, msg,
2376 						  ac->tree, ac->base,
2377 						  ac->scope, &matched);
2378 		}
2379 
2380 		if (ret != LDB_SUCCESS) {
2381 			talloc_free(keys);
2382 			talloc_free(msg);
2383 			return ret;
2384 		}
2385 		if (!matched) {
2386 			talloc_free(msg);
2387 			continue;
2388 		}
2389 
2390 		filtered_msg = ldb_msg_new(ac);
2391 		if (filtered_msg == NULL) {
2392 			TALLOC_FREE(keys);
2393 			TALLOC_FREE(msg);
2394 			return LDB_ERR_OPERATIONS_ERROR;
2395 		}
2396 
2397 		filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2398 
2399 		/* filter the attributes that the user wants */
2400 		ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2401 
2402 		talloc_free(msg);
2403 
2404 		if (ret == -1) {
2405 			TALLOC_FREE(filtered_msg);
2406 			talloc_free(keys);
2407 			return LDB_ERR_OPERATIONS_ERROR;
2408 		}
2409 
2410 		ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2411 		if (ret != LDB_SUCCESS) {
2412 			/* Regardless of success or failure, the msg
2413 			 * is the callbacks responsiblity, and should
2414 			 * not be talloc_free()'ed */
2415 			ac->request_terminated = true;
2416 			talloc_free(keys);
2417 			return ret;
2418 		}
2419 
2420 		(*match_count)++;
2421 	}
2422 
2423 	TALLOC_FREE(keys);
2424 	return LDB_SUCCESS;
2425 }
2426 
2427 /*
2428   sort a DN list
2429  */
ldb_kv_dn_list_sort(struct ldb_kv_private * ltdb,struct dn_list * list)2430 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2431 				struct dn_list *list)
2432 {
2433 	if (list->count < 2) {
2434 		return;
2435 	}
2436 
2437 	/* We know the list is sorted when using the GUID index */
2438 	if (ltdb->cache->GUID_index_attribute != NULL) {
2439 		return;
2440 	}
2441 
2442 	TYPESAFE_QSORT(list->dn, list->count,
2443 		       ldb_val_equal_exact_for_qsort);
2444 }
2445 
2446 /*
2447   search the database with a LDAP-like expression using indexes
2448   returns -1 if an indexed search is not possible, in which
2449   case the caller should call ltdb_search_full()
2450 */
ldb_kv_search_indexed(struct ldb_kv_context * ac,uint32_t * match_count)2451 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2452 {
2453 	struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2454 	struct ldb_kv_private *ldb_kv = talloc_get_type(
2455 	    ldb_module_get_private(ac->module), struct ldb_kv_private);
2456 	struct dn_list *dn_list;
2457 	int ret;
2458 	enum ldb_scope index_scope;
2459 	enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2460 
2461 	/* see if indexing is enabled */
2462 	if (!ldb_kv->cache->attribute_indexes &&
2463 	    !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2464 		/* fallback to a full search */
2465 		return LDB_ERR_OPERATIONS_ERROR;
2466 	}
2467 
2468 	dn_list = talloc_zero(ac, struct dn_list);
2469 	if (dn_list == NULL) {
2470 		return ldb_module_oom(ac->module);
2471 	}
2472 
2473 	/*
2474 	 * For the purposes of selecting the switch arm below, if we
2475 	 * don't have a one-level index then treat it like a subtree
2476 	 * search
2477 	 */
2478 	if (ac->scope == LDB_SCOPE_ONELEVEL &&
2479 	    !ldb_kv->cache->one_level_indexes) {
2480 		index_scope = LDB_SCOPE_SUBTREE;
2481 	} else {
2482 		index_scope = ac->scope;
2483 	}
2484 
2485 	switch (index_scope) {
2486 	case LDB_SCOPE_BASE:
2487 		/*
2488 		 * The only caller will have filtered the operation out
2489 		 * so we should never get here
2490 		 */
2491 		return ldb_operr(ldb);
2492 
2493 	case LDB_SCOPE_ONELEVEL:
2494 
2495 		/*
2496 		 * First, load all the one-level child objects (regardless of
2497 		 * whether they match the search filter or not). The database
2498 		 * maintains a one-level index, so retrieving this is quick.
2499 		 */
2500 		ret = ldb_kv_index_dn_one(ac->module,
2501 					  ldb_kv,
2502 					  ac->base,
2503 					  dn_list,
2504 					  &scope_one_truncation);
2505 		if (ret != LDB_SUCCESS) {
2506 			talloc_free(dn_list);
2507 			return ret;
2508 		}
2509 
2510 		/*
2511 		 * If we have too many children, running ldb_kv_index_filter()
2512 		 * over all the child objects can be quite expensive. So next
2513 		 * we do a separate indexed query using the search filter.
2514 		 *
2515 		 * This should be quick, but it may return objects that are not
2516 		 * the direct one-level child objects we're interested in.
2517 		 *
2518 		 * We only do this in the GUID index mode, which is
2519 		 * O(n*log(m)) otherwise the intersection below will
2520 		 * be too costly at O(n*m).
2521 		 *
2522 		 * We don't set a heuristic for 'too many' but instead
2523 		 * do it always and rely on the index lookup being
2524 		 * fast enough in the small case.
2525 		 */
2526 		if (ldb_kv->cache->GUID_index_attribute != NULL) {
2527 			struct dn_list *indexed_search_result
2528 				= talloc_zero(ac, struct dn_list);
2529 			if (indexed_search_result == NULL) {
2530 				talloc_free(dn_list);
2531 				return ldb_module_oom(ac->module);
2532 			}
2533 
2534 			if (!ldb_kv->cache->attribute_indexes) {
2535 				talloc_free(indexed_search_result);
2536 				talloc_free(dn_list);
2537 				return LDB_ERR_OPERATIONS_ERROR;
2538 			}
2539 
2540 			/*
2541 			 * Try to do an indexed database search
2542 			 */
2543 			ret = ldb_kv_index_dn(
2544 			    ac->module, ldb_kv, ac->tree,
2545 			    indexed_search_result);
2546 
2547 			/*
2548 			 * We can stop if we're sure the object doesn't exist
2549 			 */
2550 			if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2551 				talloc_free(indexed_search_result);
2552 				talloc_free(dn_list);
2553 				return LDB_ERR_NO_SUCH_OBJECT;
2554 			}
2555 
2556 			/*
2557 			 * Once we have a successful search result, we
2558 			 * intersect it with the one-level children (dn_list).
2559 			 * This should give us exactly the result we're after
2560 			 * (we still need to run ldb_kv_index_filter() to
2561 			 * handle potential index truncation cases).
2562 			 *
2563 			 * The indexed search may fail because we don't support
2564 			 * indexing on that type of search operation, e.g.
2565 			 * matching against '*'. In which case we fall through
2566 			 * and run ldb_kv_index_filter() over all the one-level
2567 			 * children (which is still better than bailing out here
2568 			 * and falling back to a full DB scan).
2569 			 */
2570 			if (ret == LDB_SUCCESS) {
2571 				if (!list_intersect(ldb_kv,
2572 						    dn_list,
2573 						    indexed_search_result)) {
2574 					talloc_free(indexed_search_result);
2575 					talloc_free(dn_list);
2576 					return LDB_ERR_OPERATIONS_ERROR;
2577 				}
2578 			}
2579 		}
2580 		break;
2581 
2582 	case LDB_SCOPE_SUBTREE:
2583 	case LDB_SCOPE_DEFAULT:
2584 		if (!ldb_kv->cache->attribute_indexes) {
2585 			talloc_free(dn_list);
2586 			return LDB_ERR_OPERATIONS_ERROR;
2587 		}
2588 		/*
2589 		 * Here we load the index for the tree.  We have no
2590 		 * index for the subtree.
2591 		 */
2592 		ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2593 		if (ret != LDB_SUCCESS) {
2594 			talloc_free(dn_list);
2595 			return ret;
2596 		}
2597 		break;
2598 	}
2599 
2600 	/*
2601 	 * It is critical that this function do the re-filter even
2602 	 * on things found by the index as the index can over-match
2603 	 * in cases of truncation (as well as when it decides it is
2604 	 * not worth further filtering)
2605 	 *
2606 	 * If this changes, then the index code above would need to
2607 	 * pass up a flag to say if any index was truncated during
2608 	 * processing as the truncation here refers only to the
2609 	 * SCOPE_ONELEVEL index.
2610 	 */
2611 	ret = ldb_kv_index_filter(
2612 	    ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2613 	talloc_free(dn_list);
2614 	return ret;
2615 }
2616 
2617 /**
2618  * @brief Add a DN in the index list of a given attribute name/value pair
2619  *
2620  * This function will add the DN in the index list for the index for
2621  * the given attribute name and value.
2622  *
2623  * @param[in]  module       A ldb_module structure
2624  *
2625  * @param[in]  dn           The string representation of the DN as it
2626  *                          will be stored in the index entry
2627  *
2628  * @param[in]  el           A ldb_message_element array, one of the entry
2629  *                          referred by the v_idx is the attribute name and
2630  *                          value pair which will be used to construct the
2631  *                          index name
2632  *
2633  * @param[in]  v_idx        The index of element in the el array to use
2634  *
2635  * @return                  An ldb error code
2636  */
ldb_kv_index_add1(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_message_element * el,int v_idx)2637 static int ldb_kv_index_add1(struct ldb_module *module,
2638 			     struct ldb_kv_private *ldb_kv,
2639 			     const struct ldb_message *msg,
2640 			     struct ldb_message_element *el,
2641 			     int v_idx)
2642 {
2643 	struct ldb_context *ldb;
2644 	struct ldb_dn *dn_key;
2645 	int ret;
2646 	const struct ldb_schema_attribute *a;
2647 	struct dn_list *list;
2648 	unsigned alloc_len;
2649 	enum key_truncation truncation = KEY_TRUNCATED;
2650 
2651 
2652 	ldb = ldb_module_get_ctx(module);
2653 
2654 	list = talloc_zero(module, struct dn_list);
2655 	if (list == NULL) {
2656 		return LDB_ERR_OPERATIONS_ERROR;
2657 	}
2658 
2659 	dn_key = ldb_kv_index_key(
2660 	    ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2661 	if (!dn_key) {
2662 		talloc_free(list);
2663 		return LDB_ERR_OPERATIONS_ERROR;
2664 	}
2665 	/*
2666 	 * Samba only maintains unique indexes on the objectSID and objectGUID
2667 	 * so if a unique index key exceeds the maximum length there is a
2668 	 * problem.
2669 	 */
2670 	if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2671 		(a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2672 		(el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2673 
2674 		ldb_asprintf_errstring(
2675 		    ldb,
2676 		    __location__ ": unique index key on %s in %s, "
2677 				 "exceeds maximum key length of %u (encoded).",
2678 		    el->name,
2679 		    ldb_dn_get_linearized(msg->dn),
2680 		    ldb_kv->max_key_length);
2681 		talloc_free(list);
2682 		return LDB_ERR_CONSTRAINT_VIOLATION;
2683 	}
2684 	talloc_steal(list, dn_key);
2685 
2686 	ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
2687 				  DN_LIST_MUTABLE);
2688 	if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2689 		talloc_free(list);
2690 		return ret;
2691 	}
2692 
2693 	/*
2694 	 * Check for duplicates in the @IDXDN DN -> GUID record
2695 	 *
2696 	 * This is very normal, it just means a duplicate DN creation
2697 	 * was attempted, so don't set the error string or print scary
2698 	 * messages.
2699 	 */
2700 	if (list->count > 0 &&
2701 	    ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2702 	    truncation == KEY_NOT_TRUNCATED) {
2703 
2704 		talloc_free(list);
2705 		return LDB_ERR_CONSTRAINT_VIOLATION;
2706 
2707 	} else if (list->count > 0
2708 		   && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2709 
2710 		/*
2711 		 * At least one existing entry in the DN->GUID index, which
2712 		 * arises when the DN indexes have been truncated
2713 		 *
2714 		 * So need to pull the DN's to check if it's really a duplicate
2715 		 */
2716 		unsigned int i;
2717 		for (i=0; i < list->count; i++) {
2718 			uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2719 			struct ldb_val key = {
2720 				.data = guid_key,
2721 				.length = sizeof(guid_key)
2722 			};
2723 			const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2724 			struct ldb_message *rec = ldb_msg_new(ldb);
2725 			if (rec == NULL) {
2726 				return LDB_ERR_OPERATIONS_ERROR;
2727 			}
2728 
2729 			ret = ldb_kv_idx_to_key(
2730 			    module, ldb_kv, ldb, &list->dn[i], &key);
2731 			if (ret != LDB_SUCCESS) {
2732 				TALLOC_FREE(list);
2733 				TALLOC_FREE(rec);
2734 				return ret;
2735 			}
2736 
2737 			ret =
2738 			    ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2739 			if (key.data != guid_key) {
2740 				TALLOC_FREE(key.data);
2741 			}
2742 			if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2743 				/*
2744 				 * the record has disappeared?
2745 				 * yes, this can happen
2746 				 */
2747 				talloc_free(rec);
2748 				continue;
2749 			}
2750 
2751 			if (ret != LDB_SUCCESS) {
2752 				/* an internal error */
2753 				TALLOC_FREE(rec);
2754 				TALLOC_FREE(list);
2755 				return LDB_ERR_OPERATIONS_ERROR;
2756 			}
2757 			/*
2758 			 * The DN we are trying to add to the DB and index
2759 			 * is already here, so we must deny the addition
2760 			 */
2761 			if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2762 				TALLOC_FREE(rec);
2763 				TALLOC_FREE(list);
2764 				return LDB_ERR_CONSTRAINT_VIOLATION;
2765 			}
2766 		}
2767 	}
2768 
2769 	/*
2770 	 * Check for duplicates in unique indexes
2771 	 *
2772 	 * We don't need to do a loop test like the @IDXDN case
2773 	 * above as we have a ban on long unique index values
2774 	 * at the start of this function.
2775 	 */
2776 	if (list->count > 0 &&
2777 	    ((a != NULL
2778 	      && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2779 		  (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2780 		/*
2781 		 * We do not want to print info about a possibly
2782 		 * confidential DN that the conflict was with in the
2783 		 * user-visible error string
2784 		 */
2785 
2786 		if (ldb_kv->cache->GUID_index_attribute == NULL) {
2787 			ldb_debug(ldb, LDB_DEBUG_WARNING,
2788 				  __location__
2789 				  ": unique index violation on %s in %s, "
2790 				  "conflicts with %*.*s in %s",
2791 				  el->name, ldb_dn_get_linearized(msg->dn),
2792 				  (int)list->dn[0].length,
2793 				  (int)list->dn[0].length,
2794 				  list->dn[0].data,
2795 				  ldb_dn_get_linearized(dn_key));
2796 		} else {
2797 			/* This can't fail, gives a default at worst */
2798 			const struct ldb_schema_attribute *attr =
2799 			    ldb_schema_attribute_by_name(
2800 				ldb, ldb_kv->cache->GUID_index_attribute);
2801 			struct ldb_val v;
2802 			ret = attr->syntax->ldif_write_fn(ldb, list,
2803 							  &list->dn[0], &v);
2804 			if (ret == LDB_SUCCESS) {
2805 				ldb_debug(ldb,
2806 					  LDB_DEBUG_WARNING,
2807 					  __location__
2808 					  ": unique index violation on %s in "
2809 					  "%s, conflicts with %s %*.*s in %s",
2810 					  el->name,
2811 					  ldb_dn_get_linearized(msg->dn),
2812 					  ldb_kv->cache->GUID_index_attribute,
2813 					  (int)v.length,
2814 					  (int)v.length,
2815 					  v.data,
2816 					  ldb_dn_get_linearized(dn_key));
2817 			}
2818 		}
2819 		ldb_asprintf_errstring(ldb,
2820 				       __location__ ": unique index violation "
2821 				       "on %s in %s",
2822 				       el->name,
2823 				       ldb_dn_get_linearized(msg->dn));
2824 		talloc_free(list);
2825 		return LDB_ERR_CONSTRAINT_VIOLATION;
2826 	}
2827 
2828 	/* overallocate the list a bit, to reduce the number of
2829 	 * realloc trigered copies */
2830 	alloc_len = ((list->count+1)+7) & ~7;
2831 	list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2832 	if (list->dn == NULL) {
2833 		talloc_free(list);
2834 		return LDB_ERR_OPERATIONS_ERROR;
2835 	}
2836 
2837 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
2838 		const char *dn_str = ldb_dn_get_linearized(msg->dn);
2839 		list->dn[list->count].data
2840 			= (uint8_t *)talloc_strdup(list->dn, dn_str);
2841 		if (list->dn[list->count].data == NULL) {
2842 			talloc_free(list);
2843 			return LDB_ERR_OPERATIONS_ERROR;
2844 		}
2845 		list->dn[list->count].length = strlen(dn_str);
2846 	} else {
2847 		const struct ldb_val *key_val;
2848 		struct ldb_val *exact = NULL, *next = NULL;
2849 		key_val = ldb_msg_find_ldb_val(
2850 		    msg, ldb_kv->cache->GUID_index_attribute);
2851 		if (key_val == NULL) {
2852 			talloc_free(list);
2853 			return ldb_module_operr(module);
2854 		}
2855 
2856 		if (key_val->length != LDB_KV_GUID_SIZE) {
2857 			talloc_free(list);
2858 			return ldb_module_operr(module);
2859 		}
2860 
2861 		BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2862 					*key_val, ldb_val_equal_exact_ordered,
2863 					exact, next);
2864 
2865 		/*
2866 		 * Give a warning rather than fail, this could be a
2867 		 * duplicate value in the record allowed by a caller
2868 		 * forcing in the value with
2869 		 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2870 		 */
2871 		if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2872 			/* This can't fail, gives a default at worst */
2873 			const struct ldb_schema_attribute *attr =
2874 			    ldb_schema_attribute_by_name(
2875 				ldb, ldb_kv->cache->GUID_index_attribute);
2876 			struct ldb_val v;
2877 			ret = attr->syntax->ldif_write_fn(ldb, list,
2878 							  exact, &v);
2879 			if (ret == LDB_SUCCESS) {
2880 				ldb_debug(ldb,
2881 					  LDB_DEBUG_WARNING,
2882 					  __location__
2883 					  ": duplicate attribute value in %s "
2884 					  "for index on %s, "
2885 					  "duplicate of %s %*.*s in %s",
2886 					  ldb_dn_get_linearized(msg->dn),
2887 					  el->name,
2888 					  ldb_kv->cache->GUID_index_attribute,
2889 					  (int)v.length,
2890 					  (int)v.length,
2891 					  v.data,
2892 					  ldb_dn_get_linearized(dn_key));
2893 			}
2894 		}
2895 
2896 		if (next == NULL) {
2897 			next = &list->dn[list->count];
2898 		} else {
2899 			memmove(&next[1], next,
2900 				sizeof(*next) * (list->count - (next - list->dn)));
2901 		}
2902 		*next = ldb_val_dup(list->dn, key_val);
2903 		if (next->data == NULL) {
2904 			talloc_free(list);
2905 			return ldb_module_operr(module);
2906 		}
2907 	}
2908 	list->count++;
2909 
2910 	ret = ldb_kv_dn_list_store(module, dn_key, list);
2911 
2912 	talloc_free(list);
2913 
2914 	return ret;
2915 }
2916 
2917 /*
2918   add index entries for one elements in a message
2919  */
ldb_kv_index_add_el(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_message_element * el)2920 static int ldb_kv_index_add_el(struct ldb_module *module,
2921 			       struct ldb_kv_private *ldb_kv,
2922 			       const struct ldb_message *msg,
2923 			       struct ldb_message_element *el)
2924 {
2925 	unsigned int i;
2926 	for (i = 0; i < el->num_values; i++) {
2927 		int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2928 		if (ret != LDB_SUCCESS) {
2929 			return ret;
2930 		}
2931 	}
2932 
2933 	return LDB_SUCCESS;
2934 }
2935 
2936 /*
2937   add index entries for all elements in a message
2938  */
ldb_kv_index_add_all(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg)2939 static int ldb_kv_index_add_all(struct ldb_module *module,
2940 				struct ldb_kv_private *ldb_kv,
2941 				const struct ldb_message *msg)
2942 {
2943 	struct ldb_message_element *elements = msg->elements;
2944 	unsigned int i;
2945 	const char *dn_str;
2946 	int ret;
2947 
2948 	if (ldb_dn_is_special(msg->dn)) {
2949 		return LDB_SUCCESS;
2950 	}
2951 
2952 	dn_str = ldb_dn_get_linearized(msg->dn);
2953 	if (dn_str == NULL) {
2954 		return LDB_ERR_OPERATIONS_ERROR;
2955 	}
2956 
2957 	ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2958 	if (ret != LDB_SUCCESS) {
2959 		return ret;
2960 	}
2961 
2962 	if (!ldb_kv->cache->attribute_indexes) {
2963 		/* no indexed fields */
2964 		return LDB_SUCCESS;
2965 	}
2966 
2967 	for (i = 0; i < msg->num_elements; i++) {
2968 		if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2969 			continue;
2970 		}
2971 		ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2972 		if (ret != LDB_SUCCESS) {
2973 			struct ldb_context *ldb = ldb_module_get_ctx(module);
2974 			ldb_asprintf_errstring(ldb,
2975 					       __location__ ": Failed to re-index %s in %s - %s",
2976 					       elements[i].name, dn_str,
2977 					       ldb_errstring(ldb));
2978 			return ret;
2979 		}
2980 	}
2981 
2982 	return LDB_SUCCESS;
2983 }
2984 
2985 
2986 /*
2987   insert a DN index for a message
2988 */
ldb_kv_modify_index_dn(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_dn * dn,const char * index,int add)2989 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2990 				  struct ldb_kv_private *ldb_kv,
2991 				  const struct ldb_message *msg,
2992 				  struct ldb_dn *dn,
2993 				  const char *index,
2994 				  int add)
2995 {
2996 	struct ldb_message_element el;
2997 	struct ldb_val val;
2998 	int ret;
2999 
3000 	val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
3001 	if (val.data == NULL) {
3002 		const char *dn_str = ldb_dn_get_linearized(dn);
3003 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
3004 				       __location__ ": Failed to modify %s "
3005 						    "against %s in %s: failed "
3006 						    "to get casefold DN",
3007 				       index,
3008 				       ldb_kv->cache->GUID_index_attribute,
3009 				       dn_str);
3010 		return LDB_ERR_OPERATIONS_ERROR;
3011 	}
3012 
3013 	val.length = strlen((char *)val.data);
3014 	el.name = index;
3015 	el.values = &val;
3016 	el.num_values = 1;
3017 
3018 	if (add) {
3019 		ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
3020 	} else { /* delete */
3021 		ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
3022 	}
3023 
3024 	if (ret != LDB_SUCCESS) {
3025 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3026 		const char *dn_str = ldb_dn_get_linearized(dn);
3027 		ldb_asprintf_errstring(ldb,
3028 				       __location__ ": Failed to modify %s "
3029 						    "against %s in %s - %s",
3030 				       index,
3031 				       ldb_kv->cache->GUID_index_attribute,
3032 				       dn_str,
3033 				       ldb_errstring(ldb));
3034 		return ret;
3035 	}
3036 	return ret;
3037 }
3038 
3039 /*
3040   insert a one level index for a message
3041 */
ldb_kv_index_onelevel(struct ldb_module * module,const struct ldb_message * msg,int add)3042 static int ldb_kv_index_onelevel(struct ldb_module *module,
3043 				 const struct ldb_message *msg,
3044 				 int add)
3045 {
3046 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3047 	    ldb_module_get_private(module), struct ldb_kv_private);
3048 	struct ldb_dn *pdn;
3049 	int ret;
3050 
3051 	/* We index for ONE Level only if requested */
3052 	if (!ldb_kv->cache->one_level_indexes) {
3053 		return LDB_SUCCESS;
3054 	}
3055 
3056 	pdn = ldb_dn_get_parent(module, msg->dn);
3057 	if (pdn == NULL) {
3058 		return LDB_ERR_OPERATIONS_ERROR;
3059 	}
3060 	ret =
3061 	    ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
3062 
3063 	talloc_free(pdn);
3064 
3065 	return ret;
3066 }
3067 
3068 /*
3069   insert a one level index for a message
3070 */
ldb_kv_write_index_dn_guid(struct ldb_module * module,const struct ldb_message * msg,int add)3071 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
3072 				      const struct ldb_message *msg,
3073 				      int add)
3074 {
3075 	int ret;
3076 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3077 	    ldb_module_get_private(module), struct ldb_kv_private);
3078 
3079 	/* We index for DN only if using a GUID index */
3080 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
3081 		return LDB_SUCCESS;
3082 	}
3083 
3084 	ret = ldb_kv_modify_index_dn(
3085 	    module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
3086 
3087 	if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
3088 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
3089 				       "Entry %s already exists",
3090 				       ldb_dn_get_linearized(msg->dn));
3091 		ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
3092 	}
3093 	return ret;
3094 }
3095 
3096 /*
3097   add the index entries for a new element in a record
3098   The caller guarantees that these element values are not yet indexed
3099 */
ldb_kv_index_add_element(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_message_element * el)3100 int ldb_kv_index_add_element(struct ldb_module *module,
3101 			     struct ldb_kv_private *ldb_kv,
3102 			     const struct ldb_message *msg,
3103 			     struct ldb_message_element *el)
3104 {
3105 	if (ldb_dn_is_special(msg->dn)) {
3106 		return LDB_SUCCESS;
3107 	}
3108 	if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3109 		return LDB_SUCCESS;
3110 	}
3111 	return ldb_kv_index_add_el(module, ldb_kv, msg, el);
3112 }
3113 
3114 /*
3115   add the index entries for a new record
3116 */
ldb_kv_index_add_new(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg)3117 int ldb_kv_index_add_new(struct ldb_module *module,
3118 			 struct ldb_kv_private *ldb_kv,
3119 			 const struct ldb_message *msg)
3120 {
3121 	int ret;
3122 
3123 	if (ldb_dn_is_special(msg->dn)) {
3124 		return LDB_SUCCESS;
3125 	}
3126 
3127 	ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3128 	if (ret != LDB_SUCCESS) {
3129 		/*
3130 		 * Because we can't trust the caller to be doing
3131 		 * transactions properly, clean up any index for this
3132 		 * entry rather than relying on a transaction
3133 		 * cleanup
3134 		 */
3135 
3136 		ldb_kv_index_delete(module, msg);
3137 		return ret;
3138 	}
3139 
3140 	ret = ldb_kv_index_onelevel(module, msg, 1);
3141 	if (ret != LDB_SUCCESS) {
3142 		/*
3143 		 * Because we can't trust the caller to be doing
3144 		 * transactions properly, clean up any index for this
3145 		 * entry rather than relying on a transaction
3146 		 * cleanup
3147 		 */
3148 		ldb_kv_index_delete(module, msg);
3149 		return ret;
3150 	}
3151 	return ret;
3152 }
3153 
3154 
3155 /*
3156   delete an index entry for one message element
3157 */
ldb_kv_index_del_value(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_message_element * el,unsigned int v_idx)3158 int ldb_kv_index_del_value(struct ldb_module *module,
3159 			   struct ldb_kv_private *ldb_kv,
3160 			   const struct ldb_message *msg,
3161 			   struct ldb_message_element *el,
3162 			   unsigned int v_idx)
3163 {
3164 	struct ldb_context *ldb;
3165 	struct ldb_dn *dn_key;
3166 	const char *dn_str;
3167 	int ret, i;
3168 	unsigned int j;
3169 	struct dn_list *list;
3170 	struct ldb_dn *dn = msg->dn;
3171 	enum key_truncation truncation = KEY_NOT_TRUNCATED;
3172 
3173 	ldb = ldb_module_get_ctx(module);
3174 
3175 	dn_str = ldb_dn_get_linearized(dn);
3176 	if (dn_str == NULL) {
3177 		return LDB_ERR_OPERATIONS_ERROR;
3178 	}
3179 
3180 	if (dn_str[0] == '@') {
3181 		return LDB_SUCCESS;
3182 	}
3183 
3184 	dn_key = ldb_kv_index_key(
3185 	    ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3186 	/*
3187 	 * We ignore key truncation in ltdb_index_add1() so
3188 	 * match that by ignoring it here as well
3189 	 *
3190 	 * Multiple values are legitimate and accepted
3191 	 */
3192 	if (!dn_key) {
3193 		return LDB_ERR_OPERATIONS_ERROR;
3194 	}
3195 
3196 	list = talloc_zero(dn_key, struct dn_list);
3197 	if (list == NULL) {
3198 		talloc_free(dn_key);
3199 		return LDB_ERR_OPERATIONS_ERROR;
3200 	}
3201 
3202 	ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
3203 				  DN_LIST_MUTABLE);
3204 	if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3205 		/* it wasn't indexed. Did we have an earlier error? If we did then
3206 		   its gone now */
3207 		talloc_free(dn_key);
3208 		return LDB_SUCCESS;
3209 	}
3210 
3211 	if (ret != LDB_SUCCESS) {
3212 		talloc_free(dn_key);
3213 		return ret;
3214 	}
3215 
3216 	/*
3217 	 * Find one of the values matching this message to remove
3218 	 */
3219 	i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3220 	if (i == -1) {
3221 		/* nothing to delete */
3222 		talloc_free(dn_key);
3223 		return LDB_SUCCESS;
3224 	}
3225 
3226 	j = (unsigned int) i;
3227 	if (j != list->count - 1) {
3228 		memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3229 	}
3230 	list->count--;
3231 	if (list->count == 0) {
3232 		talloc_free(list->dn);
3233 		list->dn = NULL;
3234 	} else {
3235 		list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3236 	}
3237 
3238 	ret = ldb_kv_dn_list_store(module, dn_key, list);
3239 
3240 	talloc_free(dn_key);
3241 
3242 	return ret;
3243 }
3244 
3245 /*
3246   delete the index entries for a element
3247   return -1 on failure
3248 */
ldb_kv_index_del_element(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,struct ldb_message_element * el)3249 int ldb_kv_index_del_element(struct ldb_module *module,
3250 			     struct ldb_kv_private *ldb_kv,
3251 			     const struct ldb_message *msg,
3252 			     struct ldb_message_element *el)
3253 {
3254 	const char *dn_str;
3255 	int ret;
3256 	unsigned int i;
3257 
3258 	if (!ldb_kv->cache->attribute_indexes) {
3259 		/* no indexed fields */
3260 		return LDB_SUCCESS;
3261 	}
3262 
3263 	dn_str = ldb_dn_get_linearized(msg->dn);
3264 	if (dn_str == NULL) {
3265 		return LDB_ERR_OPERATIONS_ERROR;
3266 	}
3267 
3268 	if (dn_str[0] == '@') {
3269 		return LDB_SUCCESS;
3270 	}
3271 
3272 	if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3273 		return LDB_SUCCESS;
3274 	}
3275 	for (i = 0; i < el->num_values; i++) {
3276 		ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3277 		if (ret != LDB_SUCCESS) {
3278 			return ret;
3279 		}
3280 	}
3281 
3282 	return LDB_SUCCESS;
3283 }
3284 
3285 /*
3286   delete the index entries for a record
3287   return -1 on failure
3288 */
ldb_kv_index_delete(struct ldb_module * module,const struct ldb_message * msg)3289 int ldb_kv_index_delete(struct ldb_module *module,
3290 			const struct ldb_message *msg)
3291 {
3292 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3293 	    ldb_module_get_private(module), struct ldb_kv_private);
3294 	int ret;
3295 	unsigned int i;
3296 
3297 	if (ldb_dn_is_special(msg->dn)) {
3298 		return LDB_SUCCESS;
3299 	}
3300 
3301 	ret = ldb_kv_index_onelevel(module, msg, 0);
3302 	if (ret != LDB_SUCCESS) {
3303 		return ret;
3304 	}
3305 
3306 	ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3307 	if (ret != LDB_SUCCESS) {
3308 		return ret;
3309 	}
3310 
3311 	if (!ldb_kv->cache->attribute_indexes) {
3312 		/* no indexed fields */
3313 		return LDB_SUCCESS;
3314 	}
3315 
3316 	for (i = 0; i < msg->num_elements; i++) {
3317 		ret = ldb_kv_index_del_element(
3318 		    module, ldb_kv, msg, &msg->elements[i]);
3319 		if (ret != LDB_SUCCESS) {
3320 			return ret;
3321 		}
3322 	}
3323 
3324 	return LDB_SUCCESS;
3325 }
3326 
3327 
3328 /*
3329   traversal function that deletes all @INDEX records in the in-memory
3330   TDB.
3331 
3332   This does not touch the actual DB, that is done at transaction
3333   commit, which in turn greatly reduces DB churn as we will likely
3334   be able to do a direct update into the old record.
3335 */
delete_index(struct ldb_kv_private * ldb_kv,struct ldb_val key,_UNUSED_ struct ldb_val data,void * state)3336 static int delete_index(struct ldb_kv_private *ldb_kv,
3337 			struct ldb_val key,
3338 			_UNUSED_ struct ldb_val data,
3339 			void *state)
3340 {
3341 	struct ldb_module *module = state;
3342 	const char *dnstr = "DN=" LDB_KV_INDEX ":";
3343 	struct dn_list list;
3344 	struct ldb_dn *dn;
3345 	struct ldb_val v;
3346 	int ret;
3347 
3348 	if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3349 		return 0;
3350 	}
3351 	/* we need to put a empty list in the internal tdb for this
3352 	 * index entry */
3353 	list.dn = NULL;
3354 	list.count = 0;
3355 
3356 	/* the offset of 3 is to remove the DN= prefix. */
3357 	v.data = key.data + 3;
3358 	v.length = strnlen((char *)key.data, key.length) - 3;
3359 
3360 	dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3361 
3362 	/*
3363 	 * This does not actually touch the DB quite yet, just
3364          * the in-memory index cache
3365 	 */
3366 	ret = ldb_kv_dn_list_store(module, dn, &list);
3367 	if (ret != LDB_SUCCESS) {
3368 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
3369 				       "Unable to store null index for %s\n",
3370 						ldb_dn_get_linearized(dn));
3371 		talloc_free(dn);
3372 		return -1;
3373 	}
3374 	talloc_free(dn);
3375 	return 0;
3376 }
3377 
3378 /*
3379   traversal function that adds @INDEX records during a re index TODO wrong comment
3380 */
re_key(struct ldb_kv_private * ldb_kv,struct ldb_val key,struct ldb_val val,void * state)3381 static int re_key(struct ldb_kv_private *ldb_kv,
3382 		  struct ldb_val key,
3383 		  struct ldb_val val,
3384 		  void *state)
3385 {
3386 	struct ldb_context *ldb;
3387 	struct ldb_kv_reindex_context *ctx =
3388 	    (struct ldb_kv_reindex_context *)state;
3389 	struct ldb_module *module = ldb_kv->module;
3390 	struct ldb_message *msg;
3391 	int ret;
3392 	struct ldb_val key2;
3393 	bool is_record;
3394 
3395 	ldb = ldb_module_get_ctx(module);
3396 
3397 	is_record = ldb_kv_key_is_normal_record(key);
3398 	if (is_record == false) {
3399 		return 0;
3400 	}
3401 
3402 	msg = ldb_msg_new(module);
3403 	if (msg == NULL) {
3404 		return -1;
3405 	}
3406 
3407 	ret = ldb_unpack_data(ldb, &val, msg);
3408 	if (ret != 0) {
3409 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3410 						ldb_dn_get_linearized(msg->dn));
3411 		ctx->error = ret;
3412 		talloc_free(msg);
3413 		return -1;
3414 	}
3415 
3416 	if (msg->dn == NULL) {
3417 		ldb_debug(ldb, LDB_DEBUG_ERROR,
3418 			  "Refusing to re-index as GUID "
3419 			  "key %*.*s with no DN\n",
3420 			  (int)key.length, (int)key.length,
3421 			  (char *)key.data);
3422 		talloc_free(msg);
3423 		return -1;
3424 	}
3425 
3426 	/* check if the DN key has changed, perhaps due to the case
3427 	   insensitivity of an element changing, or a change from DN
3428 	   to GUID keys */
3429 	key2 = ldb_kv_key_msg(module, msg, msg);
3430 	if (key2.data == NULL) {
3431 		/* probably a corrupt record ... darn */
3432 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3433 						ldb_dn_get_linearized(msg->dn));
3434 		talloc_free(msg);
3435 		return 0;
3436 	}
3437 	if (key.length != key2.length ||
3438 	    (memcmp(key.data, key2.data, key.length) != 0)) {
3439 		ldb_kv->kv_ops->update_in_iterate(
3440 		    ldb_kv, key, key2, val, ctx);
3441 	}
3442 	talloc_free(key2.data);
3443 
3444 	talloc_free(msg);
3445 
3446 	ctx->count++;
3447 	if (ctx->count % 10000 == 0) {
3448 		ldb_debug(ldb, LDB_DEBUG_WARNING,
3449 			  "Reindexing: re-keyed %u records so far",
3450 			  ctx->count);
3451 	}
3452 
3453 	return 0;
3454 }
3455 
3456 /*
3457   traversal function that adds @INDEX records during a re index
3458 */
re_index(struct ldb_kv_private * ldb_kv,struct ldb_val key,struct ldb_val val,void * state)3459 static int re_index(struct ldb_kv_private *ldb_kv,
3460 		    struct ldb_val key,
3461 		    struct ldb_val val,
3462 		    void *state)
3463 {
3464 	struct ldb_context *ldb;
3465 	struct ldb_kv_reindex_context *ctx =
3466 	    (struct ldb_kv_reindex_context *)state;
3467 	struct ldb_module *module = ldb_kv->module;
3468 	struct ldb_message *msg;
3469 	int ret;
3470 	bool is_record;
3471 
3472 	ldb = ldb_module_get_ctx(module);
3473 
3474 	is_record = ldb_kv_key_is_normal_record(key);
3475 	if (is_record == false) {
3476 		return 0;
3477 	}
3478 
3479 	msg = ldb_msg_new(module);
3480 	if (msg == NULL) {
3481 		return -1;
3482 	}
3483 
3484 	ret = ldb_unpack_data(ldb, &val, msg);
3485 	if (ret != 0) {
3486 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3487 						ldb_dn_get_linearized(msg->dn));
3488 		ctx->error = ret;
3489 		talloc_free(msg);
3490 		return -1;
3491 	}
3492 
3493 	if (msg->dn == NULL) {
3494 		ldb_debug(ldb, LDB_DEBUG_ERROR,
3495 			  "Refusing to re-index as GUID "
3496 			  "key %*.*s with no DN\n",
3497 			  (int)key.length, (int)key.length,
3498 			  (char *)key.data);
3499 		talloc_free(msg);
3500 		return -1;
3501 	}
3502 
3503 	ret = ldb_kv_index_onelevel(module, msg, 1);
3504 	if (ret != LDB_SUCCESS) {
3505 		ldb_debug(ldb, LDB_DEBUG_ERROR,
3506 			  "Adding special ONE LEVEL index failed (%s)!",
3507 						ldb_dn_get_linearized(msg->dn));
3508 		talloc_free(msg);
3509 		return -1;
3510 	}
3511 
3512 	ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3513 
3514 	if (ret != LDB_SUCCESS) {
3515 		ctx->error = ret;
3516 		talloc_free(msg);
3517 		return -1;
3518 	}
3519 
3520 	talloc_free(msg);
3521 
3522 	ctx->count++;
3523 	if (ctx->count % 10000 == 0) {
3524 		ldb_debug(ldb, LDB_DEBUG_WARNING,
3525 			  "Reindexing: re-indexed %u records so far",
3526 			  ctx->count);
3527 	}
3528 
3529 	return 0;
3530 }
3531 
3532 /*
3533  * Convert the 4-byte pack format version to a number that's slightly
3534  * more intelligible to a user e.g. version 0, 1, 2, etc.
3535  */
displayable_pack_version(uint32_t version)3536 static uint32_t displayable_pack_version(uint32_t version) {
3537 	if (version < LDB_PACKING_FORMAT_NODN) {
3538 		return version; /* unknown - can't convert */
3539 	}
3540 
3541 	return (version - LDB_PACKING_FORMAT_NODN);
3542 }
3543 
re_pack(struct ldb_kv_private * ldb_kv,_UNUSED_ struct ldb_val key,struct ldb_val val,void * state)3544 static int re_pack(struct ldb_kv_private *ldb_kv,
3545 		   _UNUSED_ struct ldb_val key,
3546 		   struct ldb_val val,
3547 		   void *state)
3548 {
3549 	struct ldb_context *ldb;
3550 	struct ldb_message *msg;
3551 	struct ldb_module *module = ldb_kv->module;
3552 	struct ldb_kv_repack_context *ctx =
3553 	    (struct ldb_kv_repack_context *)state;
3554 	int ret;
3555 
3556 	ldb = ldb_module_get_ctx(module);
3557 
3558 	msg = ldb_msg_new(module);
3559 	if (msg == NULL) {
3560 		return -1;
3561 	}
3562 
3563 	ret = ldb_unpack_data(ldb, &val, msg);
3564 	if (ret != 0) {
3565 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3566 			  ldb_dn_get_linearized(msg->dn));
3567 		ctx->error = ret;
3568 		talloc_free(msg);
3569 		return -1;
3570 	}
3571 
3572 	ret = ldb_kv_store(module, msg, TDB_MODIFY);
3573 	if (ret != LDB_SUCCESS) {
3574 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3575 			  ldb_dn_get_linearized(msg->dn));
3576 		ctx->error = ret;
3577 		talloc_free(msg);
3578 		return -1;
3579 	}
3580 
3581 	/*
3582 	 * Warn the user that we're repacking the first time we see a normal
3583 	 * record. This means we never warn if we're repacking a database with
3584 	 * only @ records. This is because during database initialisation,
3585 	 * we might repack as initial settings are written out, and we don't
3586 	 * want to spam the log.
3587 	 */
3588 	if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3589 		ldb_debug(ldb, LDB_DEBUG_ALWAYS_LOG,
3590 			  "Repacking database from v%u to v%u format "
3591 			  "(first record %s)",
3592 			  displayable_pack_version(ctx->old_version),
3593 			  displayable_pack_version(ldb_kv->pack_format_version),
3594 			  ldb_dn_get_linearized(msg->dn));
3595 		ctx->normal_record_seen = true;
3596 	}
3597 
3598 	ctx->count++;
3599 	if (ctx->count % 10000 == 0) {
3600 		ldb_debug(ldb, LDB_DEBUG_WARNING,
3601 			  "Repack: re-packed %u records so far",
3602 			  ctx->count);
3603 	}
3604 
3605 	talloc_free(msg);
3606 	return 0;
3607 }
3608 
ldb_kv_repack(struct ldb_module * module)3609 int ldb_kv_repack(struct ldb_module *module)
3610 {
3611 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3612 	    ldb_module_get_private(module), struct ldb_kv_private);
3613 	struct ldb_context *ldb = ldb_module_get_ctx(module);
3614 	struct ldb_kv_repack_context ctx;
3615 	int ret;
3616 
3617 	ctx.old_version = ldb_kv->pack_format_version;
3618 	ctx.count = 0;
3619 	ctx.error = LDB_SUCCESS;
3620 	ctx.normal_record_seen = false;
3621 
3622 	ldb_kv->pack_format_version = ldb_kv->target_pack_format_version;
3623 
3624 	/* Iterate all database records and repack them in the new format */
3625 	ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3626 	if (ret < 0) {
3627 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3628 			  ldb_errstring(ldb));
3629 		return LDB_ERR_OPERATIONS_ERROR;
3630 	}
3631 
3632 	if (ctx.error != LDB_SUCCESS) {
3633 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3634 			  ldb_errstring(ldb));
3635 		return ctx.error;
3636 	}
3637 
3638 	return LDB_SUCCESS;
3639 }
3640 
3641 /*
3642   force a complete reindex of the database
3643 */
ldb_kv_reindex(struct ldb_module * module)3644 int ldb_kv_reindex(struct ldb_module *module)
3645 {
3646 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3647 	    ldb_module_get_private(module), struct ldb_kv_private);
3648 	int ret;
3649 	struct ldb_kv_reindex_context ctx;
3650 	size_t index_cache_size = 0;
3651 
3652 	/*
3653 	 * Only triggered after a modification, but make clear we do
3654 	 * not re-index a read-only DB
3655 	 */
3656 	if (ldb_kv->read_only) {
3657 		return LDB_ERR_UNWILLING_TO_PERFORM;
3658 	}
3659 
3660 	if (ldb_kv_cache_reload(module) != 0) {
3661 		return LDB_ERR_OPERATIONS_ERROR;
3662 	}
3663 
3664 	/*
3665 	 * Ensure we read (and so remove) the entries from the real
3666 	 * DB, no values stored so far are any use as we want to do a
3667 	 * re-index
3668 	 */
3669 	ldb_kv_index_transaction_cancel(module);
3670 	if (ldb_kv->nested_idx_ptr != NULL) {
3671 		ldb_kv_index_sub_transaction_cancel(ldb_kv);
3672 	}
3673 
3674 	/*
3675 	 * Calculate the size of the index cache needed for
3676 	 * the re-index. If specified always use the
3677 	 * ldb_kv->index_transaction_cache_size otherwise use the maximum
3678 	 * of the size estimate or the DEFAULT_INDEX_CACHE_SIZE
3679 	 */
3680 	if (ldb_kv->index_transaction_cache_size > 0) {
3681 		index_cache_size = ldb_kv->index_transaction_cache_size;
3682 	} else {
3683 		index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3684 		if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3685 			index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3686 		}
3687 	}
3688 
3689 	/*
3690 	 * Note that we don't start an index sub transaction for re-indexing
3691 	 */
3692 	ret = ldb_kv_index_transaction_start(module, index_cache_size);
3693 	if (ret != LDB_SUCCESS) {
3694 		return ret;
3695 	}
3696 
3697 	/* first traverse the database deleting any @INDEX records by
3698 	 * putting NULL entries in the in-memory tdb
3699 	 */
3700 	ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3701 	if (ret < 0) {
3702 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3703 		ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3704 				       ldb_errstring(ldb));
3705 		return LDB_ERR_OPERATIONS_ERROR;
3706 	}
3707 
3708 	ctx.error = 0;
3709 	ctx.count = 0;
3710 
3711 	ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3712 	if (ret < 0) {
3713 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3714 		ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3715 				       ldb_errstring(ldb));
3716 		return LDB_ERR_OPERATIONS_ERROR;
3717 	}
3718 
3719 	if (ctx.error != LDB_SUCCESS) {
3720 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3721 		ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3722 		return ctx.error;
3723 	}
3724 
3725 	ctx.error = 0;
3726 	ctx.count = 0;
3727 
3728 	/* now traverse adding any indexes for normal LDB records */
3729 	ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3730 	if (ret < 0) {
3731 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3732 		ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3733 				       ldb_errstring(ldb));
3734 		return LDB_ERR_OPERATIONS_ERROR;
3735 	}
3736 
3737 	if (ctx.error != LDB_SUCCESS) {
3738 		struct ldb_context *ldb = ldb_module_get_ctx(module);
3739 		ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3740 		return ctx.error;
3741 	}
3742 
3743 	if (ctx.count > 10000) {
3744 		ldb_debug(ldb_module_get_ctx(module),
3745 			  LDB_DEBUG_WARNING,
3746 			  "Reindexing: re_index successful on %s, "
3747 			  "final index write-out will be in transaction commit",
3748 			  ldb_kv->kv_ops->name(ldb_kv));
3749 	}
3750 	return LDB_SUCCESS;
3751 }
3752 
3753 /*
3754  * Copy the contents of the nested transaction index cache record to the
3755  * transaction index cache.
3756  *
3757  * During this 'commit' of the subtransaction to the main transaction
3758  * (cache), care must be taken to free any existing index at the top
3759  * level because otherwise we would leak memory.
3760  */
ldb_kv_sub_transaction_traverse(struct tdb_context * tdb,TDB_DATA key,TDB_DATA data,void * state)3761 static int ldb_kv_sub_transaction_traverse(
3762 	struct tdb_context *tdb,
3763 	TDB_DATA key,
3764 	TDB_DATA data,
3765 	void *state)
3766 {
3767 	struct ldb_module *module = state;
3768 	struct ldb_kv_private *ldb_kv = talloc_get_type(
3769 	    ldb_module_get_private(module), struct ldb_kv_private);
3770 	TDB_DATA rec = {0};
3771 	struct dn_list *index_in_subtransaction = NULL;
3772 	struct dn_list *index_in_top_level = NULL;
3773 	int ret = 0;
3774 
3775 	/*
3776 	 * This unwraps the pointer in the DB into a pointer in
3777 	 * memory, we are abusing TDB as a hash map, not a linearised
3778 	 * database store
3779 	 */
3780 	index_in_subtransaction = ldb_kv_index_idxptr(module, data);
3781 	if (index_in_subtransaction == NULL) {
3782 		ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3783 		return -1;
3784 	}
3785 
3786 	/*
3787 	 * Do we already have an entry in the primary transaction cache
3788 	 * If so free it's dn_list and replace it with the dn_list from
3789 	 * the secondary cache
3790 	 *
3791 	 * The TDB and so the fetched rec contains NO DATA, just a
3792 	 * pointer to data held in memory.
3793 	 */
3794 	rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
3795 	if (rec.dptr != NULL) {
3796 		index_in_top_level = ldb_kv_index_idxptr(module, rec);
3797 		free(rec.dptr);
3798 		if (index_in_top_level == NULL) {
3799 			abort();
3800 		}
3801 		/*
3802 		 * We had this key at the top level.  However we made a copy
3803 		 * at the sub-transaction level so that we could possibly
3804 		 * roll back.  We have to free the top level index memory
3805 		 * otherwise we would leak
3806 		 */
3807 		if (index_in_top_level->count > 0) {
3808 			TALLOC_FREE(index_in_top_level->dn);
3809 		}
3810 		index_in_top_level->dn
3811 			= talloc_steal(index_in_top_level,
3812 				       index_in_subtransaction->dn);
3813 		index_in_top_level->count = index_in_subtransaction->count;
3814 		return 0;
3815 	}
3816 
3817 	index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
3818 	if (index_in_top_level == NULL) {
3819 		ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3820 		return -1;
3821 	}
3822 	index_in_top_level->dn
3823 		= talloc_steal(index_in_top_level,
3824 			       index_in_subtransaction->dn);
3825 	index_in_top_level->count = index_in_subtransaction->count;
3826 
3827 	rec.dptr = (uint8_t *)&index_in_top_level;
3828 	rec.dsize = sizeof(void *);
3829 
3830 
3831 	/*
3832 	 * This is not a store into the main DB, but into an in-memory
3833 	 * TDB, so we don't need a guard on ltdb->read_only
3834 	 */
3835 	ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
3836 	if (ret != 0) {
3837 		ldb_kv->idxptr->error = ltdb_err_map(
3838 		    tdb_error(ldb_kv->idxptr->itdb));
3839 		return -1;
3840 	}
3841 	return 0;
3842 }
3843 
3844 /*
3845  * Initialise the index cache for a sub transaction.
3846  */
ldb_kv_index_sub_transaction_start(struct ldb_kv_private * ldb_kv)3847 int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
3848 {
3849 	ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
3850 	if (ldb_kv->nested_idx_ptr == NULL) {
3851 		return LDB_ERR_OPERATIONS_ERROR;
3852 	}
3853 
3854 	/*
3855 	 * We use a tiny hash size for the sub-database (11).
3856 	 *
3857 	 * The sub-transaction is only for one record at a time, we
3858 	 * would use a linked list but that would make the code even
3859 	 * more complex when manipulating the index, as it would have
3860 	 * to know if we were in a nested transaction (normal
3861 	 * operations) or the top one (a reindex).
3862 	 */
3863 	ldb_kv->nested_idx_ptr->itdb =
3864 		tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
3865 	if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3866 		return LDB_ERR_OPERATIONS_ERROR;
3867 	}
3868 	return LDB_SUCCESS;
3869 }
3870 
3871 /*
3872  * Clear the contents of the nested transaction index cache when the nested
3873  * transaction is cancelled.
3874  */
ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private * ldb_kv)3875 int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
3876 {
3877 	if (ldb_kv->nested_idx_ptr != NULL) {
3878 		tdb_close(ldb_kv->nested_idx_ptr->itdb);
3879 		TALLOC_FREE(ldb_kv->nested_idx_ptr);
3880 	}
3881 	return LDB_SUCCESS;
3882 }
3883 
3884 /*
3885  * Commit a nested transaction,
3886  * Copy the contents of the nested transaction index cache to the
3887  * transaction index cache.
3888  */
ldb_kv_index_sub_transaction_commit(struct ldb_kv_private * ldb_kv)3889 int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
3890 {
3891 	int ret = 0;
3892 
3893 	if (ldb_kv->nested_idx_ptr == NULL) {
3894 		return LDB_SUCCESS;
3895 	}
3896 	if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3897 		return LDB_SUCCESS;
3898 	}
3899 	tdb_traverse(
3900 	    ldb_kv->nested_idx_ptr->itdb,
3901 	    ldb_kv_sub_transaction_traverse,
3902 	    ldb_kv->module);
3903 	tdb_close(ldb_kv->nested_idx_ptr->itdb);
3904 	ldb_kv->nested_idx_ptr->itdb = NULL;
3905 
3906 	ret = ldb_kv->nested_idx_ptr->error;
3907 	if (ret != LDB_SUCCESS) {
3908 		struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
3909 		if (!ldb_errstring(ldb)) {
3910 			ldb_set_errstring(ldb, ldb_strerror(ret));
3911 		}
3912 		ldb_asprintf_errstring(
3913 			ldb,
3914 			__location__": Failed to update index records in "
3915 			"sub transaction commit: %s",
3916 			ldb_errstring(ldb));
3917 	}
3918 	TALLOC_FREE(ldb_kv->nested_idx_ptr);
3919 	return ret;
3920 }
3921