1 /*
2    ldb database library
3 
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8 
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12 
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17 
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22 
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26 
27 /*
28  *  Name: ldb_kv
29  *
30  *  Component: ldb key value backend
31  *
32  *  Description: core functions for ldb key value backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51 
52 #include "ldb_kv.h"
53 #include "ldb_private.h"
54 
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ldb_kv_req_spy {
59 	struct ldb_kv_context *ctx;
60 };
61 
62 /*
63  * Determine if this key could hold a record.  We allow the new GUID
64  * index, the old DN index and a possible future ID=
65  */
ldb_kv_key_is_normal_record(struct ldb_val key)66 bool ldb_kv_key_is_normal_record(struct ldb_val key)
67 {
68 	if (key.length < 4) {
69 		return false;
70 	}
71 
72 	/*
73 	 * @ records are not normal records, we don't want to index
74 	 * them nor search on them
75 	 */
76 	if (key.length > 4 &&
77 	    memcmp(key.data, "DN=@", 4) == 0) {
78 		return false;
79 	}
80 
81 	/* All other DN= records are however */
82 	if (memcmp(key.data, "DN=", 3) == 0) {
83 		return true;
84 	}
85 
86 	if (memcmp(key.data, "ID=", 3) == 0) {
87 		return true;
88 	}
89 
90 	if (key.length < sizeof(LDB_KV_GUID_KEY_PREFIX)) {
91 		return false;
92 	}
93 
94 	if (memcmp(key.data, LDB_KV_GUID_KEY_PREFIX,
95 		   sizeof(LDB_KV_GUID_KEY_PREFIX) - 1) == 0) {
96 		return true;
97 	}
98 
99 	return false;
100 }
101 
102 /*
103   form a ldb_val for a record key
104   caller frees
105 
106   note that the key for a record can depend on whether the
107   dn refers to a case sensitive index record or not
108 */
ldb_kv_key_dn(struct ldb_module * module,TALLOC_CTX * mem_ctx,struct ldb_dn * dn)109 struct ldb_val ldb_kv_key_dn(struct ldb_module *module,
110 			     TALLOC_CTX *mem_ctx,
111 			     struct ldb_dn *dn)
112 {
113 	struct ldb_val key;
114 	char *key_str = NULL;
115 	const char *dn_folded = NULL;
116 
117 	/*
118 	  most DNs are case insensitive. The exception is index DNs for
119 	  case sensitive attributes
120 
121 	  there are 3 cases dealt with in this code:
122 
123 	  1) if the dn doesn't start with @ then uppercase the attribute
124              names and the attributes values of case insensitive attributes
125 	  2) if the dn starts with @ then leave it alone -
126 	     the indexing code handles the rest
127 	*/
128 
129 	dn_folded = ldb_dn_get_casefold(dn);
130 	if (!dn_folded) {
131 		goto failed;
132 	}
133 
134 	key_str = talloc_strdup(mem_ctx, "DN=");
135 	if (!key_str) {
136 		goto failed;
137 	}
138 
139 	key_str = talloc_strdup_append_buffer(key_str, dn_folded);
140 	if (!key_str) {
141 		goto failed;
142 	}
143 
144 	key.data = (uint8_t *)key_str;
145 	key.length = strlen(key_str) + 1;
146 
147 	return key;
148 
149 failed:
150 	errno = ENOMEM;
151 	key.data = NULL;
152 	key.length = 0;
153 	return key;
154 }
155 
156 /* The caller is to provide a correctly sized key */
ldb_kv_guid_to_key(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_val * GUID_val,struct ldb_val * key)157 int ldb_kv_guid_to_key(struct ldb_module *module,
158 		       struct ldb_kv_private *ldb_kv,
159 		       const struct ldb_val *GUID_val,
160 		       struct ldb_val *key)
161 {
162 	const char *GUID_prefix = LDB_KV_GUID_KEY_PREFIX;
163 	const int GUID_prefix_len = sizeof(LDB_KV_GUID_KEY_PREFIX) - 1;
164 
165 	if (key->length != (GUID_val->length+GUID_prefix_len)) {
166 		return LDB_ERR_OPERATIONS_ERROR;
167 	}
168 
169 	memcpy(key->data, GUID_prefix, GUID_prefix_len);
170 	memcpy(&key->data[GUID_prefix_len],
171 	       GUID_val->data, GUID_val->length);
172 	return LDB_SUCCESS;
173 }
174 
175 /*
176  * The caller is to provide a correctly sized key, used only in
177  * the GUID index mode
178  */
ldb_kv_idx_to_key(struct ldb_module * module,struct ldb_kv_private * ldb_kv,TALLOC_CTX * mem_ctx,const struct ldb_val * idx_val,struct ldb_val * key)179 int ldb_kv_idx_to_key(struct ldb_module *module,
180 		      struct ldb_kv_private *ldb_kv,
181 		      TALLOC_CTX *mem_ctx,
182 		      const struct ldb_val *idx_val,
183 		      struct ldb_val *key)
184 {
185 	struct ldb_context *ldb = ldb_module_get_ctx(module);
186 	struct ldb_dn *dn;
187 
188 	if (ldb_kv->cache->GUID_index_attribute != NULL) {
189 		return ldb_kv_guid_to_key(module, ldb_kv, idx_val, key);
190 	}
191 
192 	dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
193 	if (dn == NULL) {
194 		/*
195 		 * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
196 		 * to the caller, as this in an invalid index value
197 		 */
198 		return LDB_ERR_OPERATIONS_ERROR;
199 	}
200 	/* form the key */
201 	*key = ldb_kv_key_dn(module, mem_ctx, dn);
202 	TALLOC_FREE(dn);
203 	if (!key->data) {
204 		return ldb_module_oom(module);
205 	}
206 	return LDB_SUCCESS;
207 }
208 
209 /*
210   form a TDB_DATA for a record key
211   caller frees mem_ctx, which may or may not have the key
212   as a child.
213 
214   note that the key for a record can depend on whether a
215   GUID index is in use, or the DN is used as the key
216 */
ldb_kv_key_msg(struct ldb_module * module,TALLOC_CTX * mem_ctx,const struct ldb_message * msg)217 struct ldb_val ldb_kv_key_msg(struct ldb_module *module,
218 			TALLOC_CTX *mem_ctx,
219 			const struct ldb_message *msg)
220 {
221 	void *data = ldb_module_get_private(module);
222 	struct ldb_kv_private *ldb_kv =
223 	    talloc_get_type(data, struct ldb_kv_private);
224 	struct ldb_val key;
225 	const struct ldb_val *guid_val;
226 	int ret;
227 
228 	if (ldb_kv->cache->GUID_index_attribute == NULL) {
229 		return ldb_kv_key_dn(module, mem_ctx, msg->dn);
230 	}
231 
232 	if (ldb_dn_is_special(msg->dn)) {
233 		return ldb_kv_key_dn(module, mem_ctx, msg->dn);
234 	}
235 
236 	guid_val =
237 	    ldb_msg_find_ldb_val(msg, ldb_kv->cache->GUID_index_attribute);
238 	if (guid_val == NULL) {
239 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
240 				       "Did not find GUID attribute %s "
241 				       "in %s, required for TDB record "
242 				       "key in " LDB_KV_IDXGUID " mode.",
243 				       ldb_kv->cache->GUID_index_attribute,
244 				       ldb_dn_get_linearized(msg->dn));
245 		errno = EINVAL;
246 		key.data = NULL;
247 		key.length = 0;
248 		return key;
249 	}
250 
251 	/* In this case, allocate with talloc */
252 	key.data = talloc_size(mem_ctx, LDB_KV_GUID_KEY_SIZE);
253 	if (key.data == NULL) {
254 		errno = ENOMEM;
255 		key.data = NULL;
256 		key.length = 0;
257 		return key;
258 	}
259 	key.length = talloc_get_size(key.data);
260 
261 	ret = ldb_kv_guid_to_key(module, ldb_kv, guid_val, &key);
262 
263 	if (ret != LDB_SUCCESS) {
264 		errno = EINVAL;
265 		key.data = NULL;
266 		key.length = 0;
267 		return key;
268 	}
269 	return key;
270 }
271 
272 /*
273   check special dn's have valid attributes
274   currently only @ATTRIBUTES is checked
275 */
ldb_kv_check_special_dn(struct ldb_module * module,const struct ldb_message * msg)276 static int ldb_kv_check_special_dn(struct ldb_module *module,
277 				   const struct ldb_message *msg)
278 {
279 	struct ldb_context *ldb = ldb_module_get_ctx(module);
280 	unsigned int i, j;
281 
282 	if (! ldb_dn_is_special(msg->dn) ||
283 	    ! ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
284 		return LDB_SUCCESS;
285 	}
286 
287 	/* we have @ATTRIBUTES, let's check attributes are fine */
288 	/* should we check that we deny multivalued attributes ? */
289 	for (i = 0; i < msg->num_elements; i++) {
290 		if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
291 
292 		for (j = 0; j < msg->elements[i].num_values; j++) {
293 			if (ldb_kv_check_at_attributes_values(
294 				&msg->elements[i].values[j]) != 0) {
295 				ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
296 				return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
297 			}
298 		}
299 	}
300 
301 	return LDB_SUCCESS;
302 }
303 
304 
305 /*
306   we've made a modification to a dn - possibly reindex and
307   update sequence number
308 */
ldb_kv_modified(struct ldb_module * module,struct ldb_dn * dn)309 static int ldb_kv_modified(struct ldb_module *module, struct ldb_dn *dn)
310 {
311 	int ret = LDB_SUCCESS;
312 	struct ldb_kv_private *ldb_kv = talloc_get_type(
313 	    ldb_module_get_private(module), struct ldb_kv_private);
314 
315 	/* only allow modifies inside a transaction, otherwise the
316 	 * ldb is unsafe */
317 	if (ldb_kv->kv_ops->transaction_active(ldb_kv) == false) {
318 		ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
319 		return LDB_ERR_OPERATIONS_ERROR;
320 	}
321 
322 	if (ldb_dn_is_special(dn) &&
323 	    (ldb_dn_check_special(dn, LDB_KV_INDEXLIST) ||
324 	     ldb_dn_check_special(dn, LDB_KV_ATTRIBUTES)) )
325 	{
326 		if (ldb_kv->warn_reindex) {
327 			ldb_debug(ldb_module_get_ctx(module),
328 				  LDB_DEBUG_ERROR,
329 				  "Reindexing %s due to modification on %s",
330 				  ldb_kv->kv_ops->name(ldb_kv),
331 				  ldb_dn_get_linearized(dn));
332 		}
333 		ret = ldb_kv_reindex(module);
334 	}
335 
336 	/* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
337 	if (ret == LDB_SUCCESS &&
338 	    !(ldb_dn_is_special(dn) &&
339 	      ldb_dn_check_special(dn, LDB_KV_BASEINFO)) ) {
340 		ret = ldb_kv_increase_sequence_number(module);
341 	}
342 
343 	/* If the modify was to @OPTIONS, reload the cache */
344 	if (ret == LDB_SUCCESS &&
345 	    ldb_dn_is_special(dn) &&
346 	    (ldb_dn_check_special(dn, LDB_KV_OPTIONS)) ) {
347 		ret = ldb_kv_cache_reload(module);
348 	}
349 
350 	if (ret != LDB_SUCCESS) {
351 		ldb_kv->reindex_failed = true;
352 	}
353 
354 	return ret;
355 }
356 /*
357   store a record into the db
358 */
ldb_kv_store(struct ldb_module * module,const struct ldb_message * msg,int flgs)359 int ldb_kv_store(struct ldb_module *module,
360 		 const struct ldb_message *msg,
361 		 int flgs)
362 {
363 	void *data = ldb_module_get_private(module);
364 	struct ldb_kv_private *ldb_kv =
365 	    talloc_get_type(data, struct ldb_kv_private);
366 	struct ldb_val key;
367 	struct ldb_val ldb_data;
368 	int ret = LDB_SUCCESS;
369 	TALLOC_CTX *key_ctx = talloc_new(module);
370 
371 	if (key_ctx == NULL) {
372 		return ldb_module_oom(module);
373 	}
374 
375 	if (ldb_kv->read_only) {
376 		talloc_free(key_ctx);
377 		return LDB_ERR_UNWILLING_TO_PERFORM;
378 	}
379 
380 	key = ldb_kv_key_msg(module, key_ctx, msg);
381 	if (key.data == NULL) {
382 		TALLOC_FREE(key_ctx);
383 		return LDB_ERR_OTHER;
384 	}
385 
386 	ret = ldb_pack_data(ldb_module_get_ctx(module),
387 			    msg, &ldb_data);
388 	if (ret == -1) {
389 		TALLOC_FREE(key_ctx);
390 		return LDB_ERR_OTHER;
391 	}
392 
393 	ret = ldb_kv->kv_ops->store(ldb_kv, key, ldb_data, flgs);
394 	if (ret != 0) {
395 		bool is_special = ldb_dn_is_special(msg->dn);
396 		ret = ldb_kv->kv_ops->error(ldb_kv);
397 
398 		/*
399 		 * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
400 		 * the GUID, so re-map
401 		 */
402 		if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS && !is_special &&
403 		    ldb_kv->cache->GUID_index_attribute != NULL) {
404 			ret = LDB_ERR_CONSTRAINT_VIOLATION;
405 		}
406 		goto done;
407 	}
408 
409 done:
410 	TALLOC_FREE(key_ctx);
411 	talloc_free(ldb_data.data);
412 
413 	return ret;
414 }
415 
416 
417 /*
418   check if a attribute is a single valued, for a given element
419  */
ldb_kv_single_valued(const struct ldb_schema_attribute * a,struct ldb_message_element * el)420 static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
421 				 struct ldb_message_element *el)
422 {
423 	if (!a) return false;
424 	if (el != NULL) {
425 		if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
426 			/* override from a ldb module, for example
427 			   used for the description field, which is
428 			   marked multi-valued in the schema but which
429 			   should not actually accept multiple
430 			   values */
431 			return true;
432 		}
433 		if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
434 			/* override from a ldb module, for example used for
435 			   deleted linked attribute entries */
436 			return false;
437 		}
438 	}
439 	if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
440 		return true;
441 	}
442 	return false;
443 }
444 
ldb_kv_add_internal(struct ldb_module * module,struct ldb_kv_private * ldb_kv,const struct ldb_message * msg,bool check_single_value)445 static int ldb_kv_add_internal(struct ldb_module *module,
446 			       struct ldb_kv_private *ldb_kv,
447 			       const struct ldb_message *msg,
448 			       bool check_single_value)
449 {
450 	struct ldb_context *ldb = ldb_module_get_ctx(module);
451 	int ret = LDB_SUCCESS;
452 	unsigned int i;
453 	bool valid_dn = false;
454 
455 	/* Check the new DN is reasonable */
456 	valid_dn = ldb_dn_validate(msg->dn);
457 	if (valid_dn == false) {
458 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
459 				       "Invalid DN in ADD: %s",
460 				       ldb_dn_get_linearized(msg->dn));
461 		return LDB_ERR_INVALID_DN_SYNTAX;
462 	}
463 
464 	for (i=0;i<msg->num_elements;i++) {
465 		struct ldb_message_element *el = &msg->elements[i];
466 		const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
467 
468 		if (el->num_values == 0) {
469 			ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
470 					       el->name, ldb_dn_get_linearized(msg->dn));
471 			return LDB_ERR_CONSTRAINT_VIOLATION;
472 		}
473 		if (check_single_value && el->num_values > 1 &&
474 		    ldb_kv_single_valued(a, el)) {
475 			ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
476 					       el->name, ldb_dn_get_linearized(msg->dn));
477 			return LDB_ERR_CONSTRAINT_VIOLATION;
478 		}
479 
480 		/* Do not check "@ATTRIBUTES" for duplicated values */
481 		if (ldb_dn_is_special(msg->dn) &&
482 		    ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
483 			continue;
484 		}
485 
486 		if (check_single_value &&
487 		    !(el->flags &
488 		      LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
489 			struct ldb_val *duplicate = NULL;
490 
491 			ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
492 							 el, &duplicate, 0);
493 			if (ret != LDB_SUCCESS) {
494 				return ret;
495 			}
496 			if (duplicate != NULL) {
497 				ldb_asprintf_errstring(
498 					ldb,
499 					"attribute '%s': value '%.*s' on '%s' "
500 					"provided more than once in ADD object",
501 					el->name,
502 					(int)duplicate->length,
503 					duplicate->data,
504 					ldb_dn_get_linearized(msg->dn));
505 				return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
506 			}
507 		}
508 	}
509 
510 	ret = ldb_kv_store(module, msg, TDB_INSERT);
511 	if (ret != LDB_SUCCESS) {
512 		/*
513 		 * Try really hard to get the right error code for
514 		 * a re-add situation, as this can matter!
515 		 */
516 		if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
517 			int ret2;
518 			struct ldb_dn *dn2 = NULL;
519 			TALLOC_CTX *mem_ctx = talloc_new(module);
520 			if (mem_ctx == NULL) {
521 				return ldb_module_operr(module);
522 			}
523 			ret2 =
524 			    ldb_kv_search_base(module, mem_ctx, msg->dn, &dn2);
525 			TALLOC_FREE(mem_ctx);
526 			if (ret2 == LDB_SUCCESS) {
527 				ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
528 			}
529 		}
530 		if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
531 			ldb_asprintf_errstring(ldb,
532 					       "Entry %s already exists",
533 					       ldb_dn_get_linearized(msg->dn));
534 		}
535 		return ret;
536 	}
537 
538 	ret = ldb_kv_index_add_new(module, ldb_kv, msg);
539 	if (ret != LDB_SUCCESS) {
540 		/*
541 		 * If we failed to index, delete the message again.
542 		 *
543 		 * This is particularly important for the GUID index
544 		 * case, which will only fail for a duplicate DN
545 		 * in the index add.
546 		 *
547 		 * Note that the caller may not cancel the transation
548 		 * and this means the above add might really show up!
549 		 */
550 		ldb_kv_delete_noindex(module, msg);
551 		return ret;
552 	}
553 
554 	ret = ldb_kv_modified(module, msg->dn);
555 
556 	return ret;
557 }
558 
559 /*
560   add a record to the database
561 */
ldb_kv_add(struct ldb_kv_context * ctx)562 static int ldb_kv_add(struct ldb_kv_context *ctx)
563 {
564 	struct ldb_module *module = ctx->module;
565 	struct ldb_request *req = ctx->req;
566 	void *data = ldb_module_get_private(module);
567 	struct ldb_kv_private *ldb_kv =
568 	    talloc_get_type(data, struct ldb_kv_private);
569 	int ret = LDB_SUCCESS;
570 
571 	if (ldb_kv->max_key_length != 0 &&
572 	    ldb_kv->cache->GUID_index_attribute == NULL &&
573 	    !ldb_dn_is_special(req->op.add.message->dn)) {
574 		ldb_set_errstring(ldb_module_get_ctx(module),
575 				  "Must operate ldb_mdb in GUID "
576 				  "index mode, but " LDB_KV_IDXGUID " not set.");
577 		return LDB_ERR_UNWILLING_TO_PERFORM;
578 	}
579 
580 	ret = ldb_kv_check_special_dn(module, req->op.add.message);
581 	if (ret != LDB_SUCCESS) {
582 		return ret;
583 	}
584 
585 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
586 
587 	if (ldb_kv_cache_load(module) != 0) {
588 		return LDB_ERR_OPERATIONS_ERROR;
589 	}
590 
591 	ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
592 
593 	return ret;
594 }
595 
596 /*
597   delete a record from the database, not updating indexes (used for deleting
598   index records)
599 */
ldb_kv_delete_noindex(struct ldb_module * module,const struct ldb_message * msg)600 int ldb_kv_delete_noindex(struct ldb_module *module,
601 			  const struct ldb_message *msg)
602 {
603 	void *data = ldb_module_get_private(module);
604 	struct ldb_kv_private *ldb_kv =
605 	    talloc_get_type(data, struct ldb_kv_private);
606 	struct ldb_val key;
607 	int ret;
608 	TALLOC_CTX *tdb_key_ctx = talloc_new(module);
609 
610 	if (tdb_key_ctx == NULL) {
611 		return ldb_module_oom(module);
612 	}
613 
614 	if (ldb_kv->read_only) {
615 		talloc_free(tdb_key_ctx);
616 		return LDB_ERR_UNWILLING_TO_PERFORM;
617 	}
618 
619 	key = ldb_kv_key_msg(module, tdb_key_ctx, msg);
620 	if (!key.data) {
621 		TALLOC_FREE(tdb_key_ctx);
622 		return LDB_ERR_OTHER;
623 	}
624 
625 	ret = ldb_kv->kv_ops->delete (ldb_kv, key);
626 	TALLOC_FREE(tdb_key_ctx);
627 
628 	if (ret != 0) {
629 		ret = ldb_kv->kv_ops->error(ldb_kv);
630 	}
631 
632 	return ret;
633 }
634 
ldb_kv_delete_internal(struct ldb_module * module,struct ldb_dn * dn)635 static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
636 {
637 	struct ldb_message *msg;
638 	int ret = LDB_SUCCESS;
639 
640 	msg = ldb_msg_new(module);
641 	if (msg == NULL) {
642 		return LDB_ERR_OPERATIONS_ERROR;
643 	}
644 
645 	/* in case any attribute of the message was indexed, we need
646 	   to fetch the old record */
647 	ret = ldb_kv_search_dn1(
648 	    module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
649 	if (ret != LDB_SUCCESS) {
650 		/* not finding the old record is an error */
651 		goto done;
652 	}
653 
654 	ret = ldb_kv_delete_noindex(module, msg);
655 	if (ret != LDB_SUCCESS) {
656 		goto done;
657 	}
658 
659 	/* remove any indexed attributes */
660 	ret = ldb_kv_index_delete(module, msg);
661 	if (ret != LDB_SUCCESS) {
662 		goto done;
663 	}
664 
665 	ret = ldb_kv_modified(module, dn);
666 	if (ret != LDB_SUCCESS) {
667 		goto done;
668 	}
669 
670 done:
671 	talloc_free(msg);
672 	return ret;
673 }
674 
675 /*
676   delete a record from the database
677 */
ldb_kv_delete(struct ldb_kv_context * ctx)678 static int ldb_kv_delete(struct ldb_kv_context *ctx)
679 {
680 	struct ldb_module *module = ctx->module;
681 	struct ldb_request *req = ctx->req;
682 	int ret = LDB_SUCCESS;
683 
684 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
685 
686 	if (ldb_kv_cache_load(module) != 0) {
687 		return LDB_ERR_OPERATIONS_ERROR;
688 	}
689 
690 	ret = ldb_kv_delete_internal(module, req->op.del.dn);
691 
692 	return ret;
693 }
694 
695 /*
696   find an element by attribute name. At the moment this does a linear search,
697   it should be re-coded to use a binary search once all places that modify
698   records guarantee sorted order
699 
700   return the index of the first matching element if found, otherwise -1
701 */
ldb_kv_find_element(const struct ldb_message * msg,const char * name)702 static int ldb_kv_find_element(const struct ldb_message *msg, const char *name)
703 {
704 	unsigned int i;
705 	for (i=0;i<msg->num_elements;i++) {
706 		if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
707 			return i;
708 		}
709 	}
710 	return -1;
711 }
712 
713 
714 /*
715   add an element to an existing record. Assumes a elements array that we
716   can call re-alloc on, and assumed that we can re-use the data pointers from
717   the passed in additional values. Use with care!
718 
719   returns 0 on success, -1 on failure (and sets errno)
720 */
ldb_kv_msg_add_element(struct ldb_message * msg,struct ldb_message_element * el)721 static int ldb_kv_msg_add_element(struct ldb_message *msg,
722 				  struct ldb_message_element *el)
723 {
724 	struct ldb_message_element *e2;
725 	unsigned int i;
726 
727 	if (el->num_values == 0) {
728 		/* nothing to do here - we don't add empty elements */
729 		return 0;
730 	}
731 
732 	e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
733 			      msg->num_elements+1);
734 	if (!e2) {
735 		errno = ENOMEM;
736 		return -1;
737 	}
738 
739 	msg->elements = e2;
740 
741 	e2 = &msg->elements[msg->num_elements];
742 
743 	e2->name = el->name;
744 	e2->flags = el->flags;
745 	e2->values = talloc_array(msg->elements,
746 				  struct ldb_val, el->num_values);
747 	if (!e2->values) {
748 		errno = ENOMEM;
749 		return -1;
750 	}
751 	for (i=0;i<el->num_values;i++) {
752 		e2->values[i] = el->values[i];
753 	}
754 	e2->num_values = el->num_values;
755 
756 	++msg->num_elements;
757 
758 	return 0;
759 }
760 
761 /*
762   delete all elements having a specified attribute name
763 */
ldb_kv_msg_delete_attribute(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_message * msg,const char * name)764 static int ldb_kv_msg_delete_attribute(struct ldb_module *module,
765 				       struct ldb_kv_private *ldb_kv,
766 				       struct ldb_message *msg,
767 				       const char *name)
768 {
769 	unsigned int i;
770 	int ret;
771 	struct ldb_message_element *el;
772 	bool is_special = ldb_dn_is_special(msg->dn);
773 
774 	if (!is_special && ldb_kv->cache->GUID_index_attribute != NULL &&
775 	    ldb_attr_cmp(name, ldb_kv->cache->GUID_index_attribute) == 0) {
776 		struct ldb_context *ldb = ldb_module_get_ctx(module);
777 		ldb_asprintf_errstring(ldb,
778 				       "Must not modify GUID "
779 				       "attribute %s (used as DB index)",
780 				       ldb_kv->cache->GUID_index_attribute);
781 		return LDB_ERR_CONSTRAINT_VIOLATION;
782 	}
783 
784 	el = ldb_msg_find_element(msg, name);
785 	if (el == NULL) {
786 		return LDB_ERR_NO_SUCH_ATTRIBUTE;
787 	}
788 	i = el - msg->elements;
789 
790 	ret = ldb_kv_index_del_element(module, ldb_kv, msg, el);
791 	if (ret != LDB_SUCCESS) {
792 		return ret;
793 	}
794 
795 	talloc_free(el->values);
796 	if (msg->num_elements > (i+1)) {
797 		memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
798 	}
799 	msg->num_elements--;
800 	msg->elements = talloc_realloc(msg, msg->elements,
801 				       struct ldb_message_element,
802 				       msg->num_elements);
803 	return LDB_SUCCESS;
804 }
805 
806 /*
807   delete all elements matching an attribute name/value
808 
809   return LDB Error on failure
810 */
ldb_kv_msg_delete_element(struct ldb_module * module,struct ldb_kv_private * ldb_kv,struct ldb_message * msg,const char * name,const struct ldb_val * val)811 static int ldb_kv_msg_delete_element(struct ldb_module *module,
812 				     struct ldb_kv_private *ldb_kv,
813 				     struct ldb_message *msg,
814 				     const char *name,
815 				     const struct ldb_val *val)
816 {
817 	struct ldb_context *ldb = ldb_module_get_ctx(module);
818 	unsigned int i;
819 	int found, ret;
820 	struct ldb_message_element *el;
821 	const struct ldb_schema_attribute *a;
822 
823 	found = ldb_kv_find_element(msg, name);
824 	if (found == -1) {
825 		return LDB_ERR_NO_SUCH_ATTRIBUTE;
826 	}
827 
828 	i = (unsigned int) found;
829 	el = &(msg->elements[i]);
830 
831 	a = ldb_schema_attribute_by_name(ldb, el->name);
832 
833 	for (i=0;i<el->num_values;i++) {
834 		bool matched;
835 		if (a->syntax->operator_fn) {
836 			ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
837 						     &el->values[i], val, &matched);
838 			if (ret != LDB_SUCCESS) return ret;
839 		} else {
840 			matched = (a->syntax->comparison_fn(ldb, ldb,
841 							    &el->values[i], val) == 0);
842 		}
843 		if (matched) {
844 			if (el->num_values == 1) {
845 				return ldb_kv_msg_delete_attribute(
846 				    module, ldb_kv, msg, name);
847 			}
848 
849 			ret =
850 			    ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
851 			if (ret != LDB_SUCCESS) {
852 				return ret;
853 			}
854 
855 			if (i<el->num_values-1) {
856 				memmove(&el->values[i], &el->values[i+1],
857 					sizeof(el->values[i])*
858 						(el->num_values-(i+1)));
859 			}
860 			el->num_values--;
861 
862 			/* per definition we find in a canonicalised message an
863 			   attribute value only once. So we are finished here */
864 			return LDB_SUCCESS;
865 		}
866 	}
867 
868 	/* Not found */
869 	return LDB_ERR_NO_SUCH_ATTRIBUTE;
870 }
871 
872 /*
873   modify a record - internal interface
874 
875   yuck - this is O(n^2). Luckily n is usually small so we probably
876   get away with it, but if we ever have really large attribute lists
877   then we'll need to look at this again
878 
879   'req' is optional, and is used to specify controls if supplied
880 */
ldb_kv_modify_internal(struct ldb_module * module,const struct ldb_message * msg,struct ldb_request * req)881 int ldb_kv_modify_internal(struct ldb_module *module,
882 			   const struct ldb_message *msg,
883 			   struct ldb_request *req)
884 {
885 	struct ldb_context *ldb = ldb_module_get_ctx(module);
886 	void *data = ldb_module_get_private(module);
887 	struct ldb_kv_private *ldb_kv =
888 	    talloc_get_type(data, struct ldb_kv_private);
889 	struct ldb_message *msg2;
890 	unsigned int i, j;
891 	int ret = LDB_SUCCESS, idx;
892 	struct ldb_control *control_permissive = NULL;
893 	TALLOC_CTX *mem_ctx = talloc_new(req);
894 
895 	if (mem_ctx == NULL) {
896 		return ldb_module_oom(module);
897 	}
898 
899 	if (req) {
900 		control_permissive = ldb_request_get_control(req,
901 					LDB_CONTROL_PERMISSIVE_MODIFY_OID);
902 	}
903 
904 	msg2 = ldb_msg_new(mem_ctx);
905 	if (msg2 == NULL) {
906 		ret = LDB_ERR_OTHER;
907 		goto done;
908 	}
909 
910 	ret = ldb_kv_search_dn1(
911 	    module, msg->dn, msg2, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
912 	if (ret != LDB_SUCCESS) {
913 		goto done;
914 	}
915 
916 	for (i=0; i<msg->num_elements; i++) {
917 		struct ldb_message_element *el = &msg->elements[i], *el2;
918 		struct ldb_val *vals;
919 		const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
920 		const char *dn;
921 		uint32_t options = 0;
922 		if (control_permissive != NULL) {
923 			options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
924 		}
925 
926 		switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
927 		case LDB_FLAG_MOD_ADD:
928 
929 			if (el->num_values == 0) {
930 				ldb_asprintf_errstring(ldb,
931 						       "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
932 						       el->name, ldb_dn_get_linearized(msg2->dn));
933 				ret = LDB_ERR_CONSTRAINT_VIOLATION;
934 				goto done;
935 			}
936 
937 			/* make a copy of the array so that a permissive
938 			 * control can remove duplicates without changing the
939 			 * original values, but do not copy data as we do not
940 			 * need to keep it around once the operation is
941 			 * finished */
942 			if (control_permissive) {
943 				el = talloc(msg2, struct ldb_message_element);
944 				if (!el) {
945 					ret = LDB_ERR_OTHER;
946 					goto done;
947 				}
948 				*el = msg->elements[i];
949 				el->values = talloc_array(el, struct ldb_val, el->num_values);
950 				if (el->values == NULL) {
951 					ret = LDB_ERR_OTHER;
952 					goto done;
953 				}
954 				for (j = 0; j < el->num_values; j++) {
955 					el->values[j] = msg->elements[i].values[j];
956 				}
957 			}
958 
959 			if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
960 				ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
961 						       el->name, ldb_dn_get_linearized(msg2->dn));
962 				ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
963 				goto done;
964 			}
965 
966 			/* Checks if element already exists */
967 			idx = ldb_kv_find_element(msg2, el->name);
968 			if (idx == -1) {
969 				if (ldb_kv_msg_add_element(msg2, el) != 0) {
970 					ret = LDB_ERR_OTHER;
971 					goto done;
972 				}
973 				ret = ldb_kv_index_add_element(
974 				    module, ldb_kv, msg2, el);
975 				if (ret != LDB_SUCCESS) {
976 					goto done;
977 				}
978 			} else {
979 				j = (unsigned int) idx;
980 				el2 = &(msg2->elements[j]);
981 
982 				/* We cannot add another value on a existing one
983 				   if the attribute is single-valued */
984 				if (ldb_kv_single_valued(a, el)) {
985 					ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
986 						               el->name, ldb_dn_get_linearized(msg2->dn));
987 					ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
988 					goto done;
989 				}
990 
991 				/* Check that values don't exist yet on multi-
992 				   valued attributes or aren't provided twice */
993 				if (!(el->flags &
994 				      LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
995 					struct ldb_val *duplicate = NULL;
996 					ret = ldb_msg_find_common_values(ldb,
997 									 msg2,
998 									 el,
999 									 el2,
1000 									 options);
1001 
1002 					if (ret ==
1003 					    LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1004 						ldb_asprintf_errstring(ldb,
1005 							"attribute '%s': value "
1006 							"#%u on '%s' already "
1007 							"exists", el->name, j,
1008 							ldb_dn_get_linearized(msg2->dn));
1009 						goto done;
1010 					} else if (ret != LDB_SUCCESS) {
1011 						goto done;
1012 					}
1013 
1014 					ret = ldb_msg_find_duplicate_val(
1015 						ldb, msg2, el, &duplicate, 0);
1016 					if (ret != LDB_SUCCESS) {
1017 						goto done;
1018 					}
1019 					if (duplicate != NULL) {
1020 						ldb_asprintf_errstring(
1021 							ldb,
1022 							"attribute '%s': value "
1023 							"'%.*s' on '%s' "
1024 							"provided more than "
1025 							"once in ADD",
1026 							el->name,
1027 							(int)duplicate->length,
1028 							duplicate->data,
1029 							ldb_dn_get_linearized(msg->dn));
1030 						ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1031 						goto done;
1032 					}
1033 				}
1034 
1035 				/* Now combine existing and new values to a new
1036 				   attribute record */
1037 				vals = talloc_realloc(msg2->elements,
1038 						      el2->values, struct ldb_val,
1039 						      el2->num_values + el->num_values);
1040 				if (vals == NULL) {
1041 					ldb_oom(ldb);
1042 					ret = LDB_ERR_OTHER;
1043 					goto done;
1044 				}
1045 
1046 				for (j=0; j<el->num_values; j++) {
1047 					vals[el2->num_values + j] =
1048 						ldb_val_dup(vals, &el->values[j]);
1049 				}
1050 
1051 				el2->values = vals;
1052 				el2->num_values += el->num_values;
1053 
1054 				ret = ldb_kv_index_add_element(
1055 				    module, ldb_kv, msg2, el);
1056 				if (ret != LDB_SUCCESS) {
1057 					goto done;
1058 				}
1059 			}
1060 
1061 			break;
1062 
1063 		case LDB_FLAG_MOD_REPLACE:
1064 
1065 			if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1066 				ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1067 						       el->name, ldb_dn_get_linearized(msg2->dn));
1068 				ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1069 				goto done;
1070 			}
1071 
1072 			/*
1073 			 * We don't need to check this if we have been
1074 			 * pre-screened by the repl_meta_data module
1075 			 * in Samba, or someone else who can claim to
1076 			 * know what they are doing.
1077 			 */
1078 			if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1079 				struct ldb_val *duplicate = NULL;
1080 
1081 				ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1082 								 &duplicate, 0);
1083 				if (ret != LDB_SUCCESS) {
1084 					goto done;
1085 				}
1086 				if (duplicate != NULL) {
1087 					ldb_asprintf_errstring(
1088 						ldb,
1089 						"attribute '%s': value '%.*s' "
1090 						"on '%s' provided more than "
1091 						"once in REPLACE",
1092 						el->name,
1093 						(int)duplicate->length,
1094 						duplicate->data,
1095 						ldb_dn_get_linearized(msg2->dn));
1096 					ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1097 					goto done;
1098 				}
1099 			}
1100 
1101 			/* Checks if element already exists */
1102 			idx = ldb_kv_find_element(msg2, el->name);
1103 			if (idx != -1) {
1104 				j = (unsigned int) idx;
1105 				el2 = &(msg2->elements[j]);
1106 
1107 				/* we consider two elements to be
1108 				 * equal only if the order
1109 				 * matches. This allows dbcheck to
1110 				 * fix the ordering on attributes
1111 				 * where order matters, such as
1112 				 * objectClass
1113 				 */
1114 				if (ldb_msg_element_equal_ordered(el, el2)) {
1115 					continue;
1116 				}
1117 
1118 				/* Delete the attribute if it exists in the DB */
1119 				if (ldb_kv_msg_delete_attribute(
1120 					module, ldb_kv, msg2, el->name) != 0) {
1121 					ret = LDB_ERR_OTHER;
1122 					goto done;
1123 				}
1124 			}
1125 
1126 			/* Recreate it with the new values */
1127 			if (ldb_kv_msg_add_element(msg2, el) != 0) {
1128 				ret = LDB_ERR_OTHER;
1129 				goto done;
1130 			}
1131 
1132 			ret =
1133 			    ldb_kv_index_add_element(module, ldb_kv, msg2, el);
1134 			if (ret != LDB_SUCCESS) {
1135 				goto done;
1136 			}
1137 
1138 			break;
1139 
1140 		case LDB_FLAG_MOD_DELETE:
1141 			dn = ldb_dn_get_linearized(msg2->dn);
1142 			if (dn == NULL) {
1143 				ret = LDB_ERR_OTHER;
1144 				goto done;
1145 			}
1146 
1147 			if (msg->elements[i].num_values == 0) {
1148 				/* Delete the whole attribute */
1149 				ret = ldb_kv_msg_delete_attribute(
1150 				    module,
1151 				    ldb_kv,
1152 				    msg2,
1153 				    msg->elements[i].name);
1154 				if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1155 				    control_permissive) {
1156 					ret = LDB_SUCCESS;
1157 				} else {
1158 					ldb_asprintf_errstring(ldb,
1159 							       "attribute '%s': no such attribute for delete on '%s'",
1160 							       msg->elements[i].name, dn);
1161 				}
1162 				if (ret != LDB_SUCCESS) {
1163 					goto done;
1164 				}
1165 			} else {
1166 				/* Delete specified values from an attribute */
1167 				for (j=0; j < msg->elements[i].num_values; j++) {
1168 					ret = ldb_kv_msg_delete_element(
1169 					    module,
1170 					    ldb_kv,
1171 					    msg2,
1172 					    msg->elements[i].name,
1173 					    &msg->elements[i].values[j]);
1174 					if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1175 					    control_permissive) {
1176 						ret = LDB_SUCCESS;
1177 					} else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1178 						ldb_asprintf_errstring(ldb,
1179 								       "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1180 								       msg->elements[i].name, dn);
1181 					}
1182 					if (ret != LDB_SUCCESS) {
1183 						goto done;
1184 					}
1185 				}
1186 			}
1187 			break;
1188 		default:
1189 			ldb_asprintf_errstring(ldb,
1190 					       "attribute '%s': invalid modify flags on '%s': 0x%x",
1191 					       msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1192 					       msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1193 			ret = LDB_ERR_PROTOCOL_ERROR;
1194 			goto done;
1195 		}
1196 	}
1197 
1198 	ret = ldb_kv_store(module, msg2, TDB_MODIFY);
1199 	if (ret != LDB_SUCCESS) {
1200 		goto done;
1201 	}
1202 
1203 	ret = ldb_kv_modified(module, msg2->dn);
1204 	if (ret != LDB_SUCCESS) {
1205 		goto done;
1206 	}
1207 
1208 done:
1209 	TALLOC_FREE(mem_ctx);
1210 	return ret;
1211 }
1212 
1213 /*
1214   modify a record
1215 */
ldb_kv_modify(struct ldb_kv_context * ctx)1216 static int ldb_kv_modify(struct ldb_kv_context *ctx)
1217 {
1218 	struct ldb_module *module = ctx->module;
1219 	struct ldb_request *req = ctx->req;
1220 	int ret = LDB_SUCCESS;
1221 
1222 	ret = ldb_kv_check_special_dn(module, req->op.mod.message);
1223 	if (ret != LDB_SUCCESS) {
1224 		return ret;
1225 	}
1226 
1227 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
1228 
1229 	if (ldb_kv_cache_load(module) != 0) {
1230 		return LDB_ERR_OPERATIONS_ERROR;
1231 	}
1232 
1233 	ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
1234 
1235 	return ret;
1236 }
1237 
1238 /*
1239   rename a record
1240 */
ldb_kv_rename(struct ldb_kv_context * ctx)1241 static int ldb_kv_rename(struct ldb_kv_context *ctx)
1242 {
1243 	struct ldb_module *module = ctx->module;
1244 	void *data = ldb_module_get_private(module);
1245 	struct ldb_kv_private *ldb_kv =
1246 	    talloc_get_type(data, struct ldb_kv_private);
1247 	struct ldb_request *req = ctx->req;
1248 	struct ldb_message *msg;
1249 	int ret = LDB_SUCCESS;
1250 	struct ldb_val  key, key_old;
1251 	struct ldb_dn *db_dn;
1252 	bool valid_dn = false;
1253 
1254 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
1255 
1256 	if (ldb_kv_cache_load(ctx->module) != 0) {
1257 		return LDB_ERR_OPERATIONS_ERROR;
1258 	}
1259 
1260 	msg = ldb_msg_new(ctx);
1261 	if (msg == NULL) {
1262 		return LDB_ERR_OPERATIONS_ERROR;
1263 	}
1264 
1265 	/* Check the new DN is reasonable */
1266 	valid_dn = ldb_dn_validate(req->op.rename.newdn);
1267 	if (valid_dn == false) {
1268 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
1269 				       "Invalid New DN: %s",
1270 				       ldb_dn_get_linearized(req->op.rename.newdn));
1271 		return LDB_ERR_INVALID_DN_SYNTAX;
1272 	}
1273 
1274 	/* we need to fetch the old record to re-add under the new name */
1275 	ret = ldb_kv_search_dn1(module,
1276 				req->op.rename.olddn,
1277 				msg,
1278 				LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1279 	if (ret == LDB_ERR_INVALID_DN_SYNTAX) {
1280 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
1281 				       "Invalid Old DN: %s",
1282 				       ldb_dn_get_linearized(req->op.rename.newdn));
1283 		return ret;
1284 	} else if (ret != LDB_SUCCESS) {
1285 		/* not finding the old record is an error */
1286 		return ret;
1287 	}
1288 
1289 	/* We need to, before changing the DB, check if the new DN
1290 	 * exists, so we can return this error to the caller with an
1291 	 * unmodified DB
1292 	 *
1293 	 * Even in GUID index mode we use ltdb_key_dn() as we are
1294 	 * trying to figure out if this is just a case rename
1295 	 */
1296 	key = ldb_kv_key_dn(module, msg, req->op.rename.newdn);
1297 	if (!key.data) {
1298 		talloc_free(msg);
1299 		return LDB_ERR_OPERATIONS_ERROR;
1300 	}
1301 
1302 	key_old = ldb_kv_key_dn(module, msg, req->op.rename.olddn);
1303 	if (!key_old.data) {
1304 		talloc_free(msg);
1305 		talloc_free(key.data);
1306 		return LDB_ERR_OPERATIONS_ERROR;
1307 	}
1308 
1309 	/*
1310 	 * Only declare a conflict if the new DN already exists,
1311 	 * and it isn't a case change on the old DN
1312 	 */
1313 	if (key_old.length != key.length
1314 	    || memcmp(key.data, key_old.data, key.length) != 0) {
1315 		ret = ldb_kv_search_base(
1316 		    module, msg, req->op.rename.newdn, &db_dn);
1317 		if (ret == LDB_SUCCESS) {
1318 			ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1319 		} else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1320 			ret = LDB_SUCCESS;
1321 		}
1322 	}
1323 
1324 	/* finding the new record already in the DB is an error */
1325 
1326 	if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1327 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
1328 				       "Entry %s already exists",
1329 				       ldb_dn_get_linearized(req->op.rename.newdn));
1330 	}
1331 	if (ret != LDB_SUCCESS) {
1332 		talloc_free(key_old.data);
1333 		talloc_free(key.data);
1334 		talloc_free(msg);
1335 		return ret;
1336 	}
1337 
1338 	talloc_free(key_old.data);
1339 	talloc_free(key.data);
1340 
1341 	/* Always delete first then add, to avoid conflicts with
1342 	 * unique indexes. We rely on the transaction to make this
1343 	 * atomic
1344 	 */
1345 	ret = ldb_kv_delete_internal(module, msg->dn);
1346 	if (ret != LDB_SUCCESS) {
1347 		talloc_free(msg);
1348 		return ret;
1349 	}
1350 
1351 	msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1352 	if (msg->dn == NULL) {
1353 		talloc_free(msg);
1354 		return LDB_ERR_OPERATIONS_ERROR;
1355 	}
1356 
1357 	/* We don't check single value as we can have more than 1 with
1358 	 * deleted attributes. We could go through all elements but that's
1359 	 * maybe not the most efficient way
1360 	 */
1361 	ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
1362 
1363 	talloc_free(msg);
1364 
1365 	return ret;
1366 }
1367 
ldb_kv_start_trans(struct ldb_module * module)1368 static int ldb_kv_start_trans(struct ldb_module *module)
1369 {
1370 	void *data = ldb_module_get_private(module);
1371 	struct ldb_kv_private *ldb_kv =
1372 	    talloc_get_type(data, struct ldb_kv_private);
1373 
1374 	pid_t pid = getpid();
1375 
1376 	if (ldb_kv->pid != pid) {
1377 		ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
1378 				       __location__
1379 				       ": Reusing ldb opend by pid %d in "
1380 				       "process %d\n",
1381 				       ldb_kv->pid,
1382 				       pid);
1383 		return LDB_ERR_PROTOCOL_ERROR;
1384 	}
1385 
1386 	/* Do not take out the transaction lock on a read-only DB */
1387 	if (ldb_kv->read_only) {
1388 		return LDB_ERR_UNWILLING_TO_PERFORM;
1389 	}
1390 
1391 	if (ldb_kv->kv_ops->begin_write(ldb_kv) != 0) {
1392 		return ldb_kv->kv_ops->error(ldb_kv);
1393 	}
1394 
1395 	ldb_kv_index_transaction_start(module);
1396 
1397 	ldb_kv->reindex_failed = false;
1398 
1399 	return LDB_SUCCESS;
1400 }
1401 
1402 /*
1403  * Forward declaration to allow prepare_commit to in fact abort the
1404  * transaction
1405  */
1406 static int ldb_kv_del_trans(struct ldb_module *module);
1407 
ldb_kv_prepare_commit(struct ldb_module * module)1408 static int ldb_kv_prepare_commit(struct ldb_module *module)
1409 {
1410 	int ret;
1411 	void *data = ldb_module_get_private(module);
1412 	struct ldb_kv_private *ldb_kv =
1413 	    talloc_get_type(data, struct ldb_kv_private);
1414 	pid_t pid = getpid();
1415 
1416 	if (ldb_kv->pid != pid) {
1417 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
1418 				       __location__
1419 				       ": Reusing ldb opend by pid %d in "
1420 				       "process %d\n",
1421 				       ldb_kv->pid,
1422 				       pid);
1423 		return LDB_ERR_PROTOCOL_ERROR;
1424 	}
1425 
1426 	if (!ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1427 		ldb_set_errstring(ldb_module_get_ctx(module),
1428 				  "ltdb_prepare_commit() called "
1429 				  "without transaction active");
1430 		return LDB_ERR_OPERATIONS_ERROR;
1431 	}
1432 
1433 	/*
1434 	 * Check if the last re-index failed.
1435 	 *
1436 	 * This can happen if for example a duplicate value was marked
1437 	 * unique.  We must not write a partial re-index into the DB.
1438 	 */
1439 	if (ldb_kv->reindex_failed) {
1440 		/*
1441 		 * We must instead abort the transaction so we get the
1442 		 * old values and old index back
1443 		 */
1444 		ldb_kv_del_trans(module);
1445 		ldb_set_errstring(ldb_module_get_ctx(module),
1446 				  "Failure during re-index, so "
1447 				  "transaction must be aborted.");
1448 		return LDB_ERR_OPERATIONS_ERROR;
1449 	}
1450 
1451 	ret = ldb_kv_index_transaction_commit(module);
1452 	if (ret != LDB_SUCCESS) {
1453 		ldb_kv->kv_ops->abort_write(ldb_kv);
1454 		return ret;
1455 	}
1456 
1457 	if (ldb_kv->kv_ops->prepare_write(ldb_kv) != 0) {
1458 		ret = ldb_kv->kv_ops->error(ldb_kv);
1459 		ldb_debug_set(ldb_module_get_ctx(module),
1460 			      LDB_DEBUG_FATAL,
1461 			      "Failure during "
1462 			      "prepare_write): %s -> %s",
1463 			      ldb_kv->kv_ops->errorstr(ldb_kv),
1464 			      ldb_strerror(ret));
1465 		return ret;
1466 	}
1467 
1468 	ldb_kv->prepared_commit = true;
1469 
1470 	return LDB_SUCCESS;
1471 }
1472 
ldb_kv_end_trans(struct ldb_module * module)1473 static int ldb_kv_end_trans(struct ldb_module *module)
1474 {
1475 	int ret;
1476 	void *data = ldb_module_get_private(module);
1477 	struct ldb_kv_private *ldb_kv =
1478 	    talloc_get_type(data, struct ldb_kv_private);
1479 
1480 	if (!ldb_kv->prepared_commit) {
1481 		ret = ldb_kv_prepare_commit(module);
1482 		if (ret != LDB_SUCCESS) {
1483 			return ret;
1484 		}
1485 	}
1486 
1487 	ldb_kv->prepared_commit = false;
1488 
1489 	if (ldb_kv->kv_ops->finish_write(ldb_kv) != 0) {
1490 		ret = ldb_kv->kv_ops->error(ldb_kv);
1491 		ldb_asprintf_errstring(
1492 		    ldb_module_get_ctx(module),
1493 		    "Failure during tdb_transaction_commit(): %s -> %s",
1494 		    ldb_kv->kv_ops->errorstr(ldb_kv),
1495 		    ldb_strerror(ret));
1496 		return ret;
1497 	}
1498 
1499 	return LDB_SUCCESS;
1500 }
1501 
ldb_kv_del_trans(struct ldb_module * module)1502 static int ldb_kv_del_trans(struct ldb_module *module)
1503 {
1504 	void *data = ldb_module_get_private(module);
1505 	struct ldb_kv_private *ldb_kv =
1506 	    talloc_get_type(data, struct ldb_kv_private);
1507 
1508 	if (ldb_kv_index_transaction_cancel(module) != 0) {
1509 		ldb_kv->kv_ops->abort_write(ldb_kv);
1510 		return ldb_kv->kv_ops->error(ldb_kv);
1511 	}
1512 
1513 	ldb_kv->kv_ops->abort_write(ldb_kv);
1514 	return LDB_SUCCESS;
1515 }
1516 
1517 /*
1518   return sequenceNumber from @BASEINFO
1519 */
ldb_kv_sequence_number(struct ldb_kv_context * ctx,struct ldb_extended ** ext)1520 static int ldb_kv_sequence_number(struct ldb_kv_context *ctx,
1521 				  struct ldb_extended **ext)
1522 {
1523 	struct ldb_context *ldb;
1524 	struct ldb_module *module = ctx->module;
1525 	struct ldb_request *req = ctx->req;
1526 	void *data = ldb_module_get_private(module);
1527 	struct ldb_kv_private *ldb_kv =
1528 	    talloc_get_type(data, struct ldb_kv_private);
1529 	TALLOC_CTX *tmp_ctx = NULL;
1530 	struct ldb_seqnum_request *seq;
1531 	struct ldb_seqnum_result *res;
1532 	struct ldb_message *msg = NULL;
1533 	struct ldb_dn *dn;
1534 	const char *date;
1535 	int ret = LDB_SUCCESS;
1536 
1537 	ldb = ldb_module_get_ctx(module);
1538 
1539 	seq = talloc_get_type(req->op.extended.data,
1540 				struct ldb_seqnum_request);
1541 	if (seq == NULL) {
1542 		return LDB_ERR_OPERATIONS_ERROR;
1543 	}
1544 
1545 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
1546 
1547 	if (ldb_kv->kv_ops->lock_read(module) != 0) {
1548 		return LDB_ERR_OPERATIONS_ERROR;
1549 	}
1550 
1551 	res = talloc_zero(req, struct ldb_seqnum_result);
1552 	if (res == NULL) {
1553 		ret = LDB_ERR_OPERATIONS_ERROR;
1554 		goto done;
1555 	}
1556 
1557 	tmp_ctx = talloc_new(req);
1558 	if (tmp_ctx == NULL) {
1559 		ret = LDB_ERR_OPERATIONS_ERROR;
1560 		goto done;
1561 	}
1562 
1563 	dn = ldb_dn_new(tmp_ctx, ldb, LDB_KV_BASEINFO);
1564 	if (dn == NULL) {
1565 		ret = LDB_ERR_OPERATIONS_ERROR;
1566 		goto done;
1567 	}
1568 
1569 	msg = ldb_msg_new(tmp_ctx);
1570 	if (msg == NULL) {
1571 		ret = LDB_ERR_OPERATIONS_ERROR;
1572 		goto done;
1573 	}
1574 
1575 	ret = ldb_kv_search_dn1(module, dn, msg, 0);
1576 	if (ret != LDB_SUCCESS) {
1577 		goto done;
1578 	}
1579 
1580 	switch (seq->type) {
1581 	case LDB_SEQ_HIGHEST_SEQ:
1582 		res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1583 		break;
1584 	case LDB_SEQ_NEXT:
1585 		res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1586 		res->seq_num++;
1587 		break;
1588 	case LDB_SEQ_HIGHEST_TIMESTAMP:
1589 		date = ldb_msg_find_attr_as_string(msg, LDB_KV_MOD_TIMESTAMP, NULL);
1590 		if (date) {
1591 			res->seq_num = ldb_string_to_time(date);
1592 		} else {
1593 			res->seq_num = 0;
1594 			/* zero is as good as anything when we don't know */
1595 		}
1596 		break;
1597 	}
1598 
1599 	*ext = talloc_zero(req, struct ldb_extended);
1600 	if (*ext == NULL) {
1601 		ret = LDB_ERR_OPERATIONS_ERROR;
1602 		goto done;
1603 	}
1604 	(*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1605 	(*ext)->data = talloc_steal(*ext, res);
1606 
1607 done:
1608 	talloc_free(tmp_ctx);
1609 
1610 	ldb_kv->kv_ops->unlock_read(module);
1611 	return ret;
1612 }
1613 
ldb_kv_request_done(struct ldb_kv_context * ctx,int error)1614 static void ldb_kv_request_done(struct ldb_kv_context *ctx, int error)
1615 {
1616 	struct ldb_context *ldb;
1617 	struct ldb_request *req;
1618 	struct ldb_reply *ares;
1619 
1620 	ldb = ldb_module_get_ctx(ctx->module);
1621 	req = ctx->req;
1622 
1623 	/* if we already returned an error just return */
1624 	if (ldb_request_get_status(req) != LDB_SUCCESS) {
1625 		return;
1626 	}
1627 
1628 	ares = talloc_zero(req, struct ldb_reply);
1629 	if (!ares) {
1630 		ldb_oom(ldb);
1631 		req->callback(req, NULL);
1632 		return;
1633 	}
1634 	ares->type = LDB_REPLY_DONE;
1635 	ares->error = error;
1636 
1637 	req->callback(req, ares);
1638 }
1639 
ldb_kv_timeout(struct tevent_context * ev,struct tevent_timer * te,struct timeval t,void * private_data)1640 static void ldb_kv_timeout(struct tevent_context *ev,
1641 			   struct tevent_timer *te,
1642 			   struct timeval t,
1643 			   void *private_data)
1644 {
1645 	struct ldb_kv_context *ctx;
1646 	ctx = talloc_get_type(private_data, struct ldb_kv_context);
1647 
1648 	if (!ctx->request_terminated) {
1649 		/* request is done now */
1650 		ldb_kv_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1651 	}
1652 
1653 	if (ctx->spy) {
1654 		/* neutralize the spy */
1655 		ctx->spy->ctx = NULL;
1656 		ctx->spy = NULL;
1657 	}
1658 	talloc_free(ctx);
1659 }
1660 
ldb_kv_request_extended_done(struct ldb_kv_context * ctx,struct ldb_extended * ext,int error)1661 static void ldb_kv_request_extended_done(struct ldb_kv_context *ctx,
1662 					 struct ldb_extended *ext,
1663 					 int error)
1664 {
1665 	struct ldb_context *ldb;
1666 	struct ldb_request *req;
1667 	struct ldb_reply *ares;
1668 
1669 	ldb = ldb_module_get_ctx(ctx->module);
1670 	req = ctx->req;
1671 
1672 	/* if we already returned an error just return */
1673 	if (ldb_request_get_status(req) != LDB_SUCCESS) {
1674 		return;
1675 	}
1676 
1677 	ares = talloc_zero(req, struct ldb_reply);
1678 	if (!ares) {
1679 		ldb_oom(ldb);
1680 		req->callback(req, NULL);
1681 		return;
1682 	}
1683 	ares->type = LDB_REPLY_DONE;
1684 	ares->response = ext;
1685 	ares->error = error;
1686 
1687 	req->callback(req, ares);
1688 }
1689 
ldb_kv_handle_extended(struct ldb_kv_context * ctx)1690 static void ldb_kv_handle_extended(struct ldb_kv_context *ctx)
1691 {
1692 	struct ldb_extended *ext = NULL;
1693 	int ret;
1694 
1695 	if (strcmp(ctx->req->op.extended.oid,
1696 		   LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1697 		/* get sequence number */
1698 		ret = ldb_kv_sequence_number(ctx, &ext);
1699 	} else {
1700 		/* not recognized */
1701 		ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1702 	}
1703 
1704 	ldb_kv_request_extended_done(ctx, ext, ret);
1705 }
1706 
ldb_kv_callback(struct tevent_context * ev,struct tevent_timer * te,struct timeval t,void * private_data)1707 static void ldb_kv_callback(struct tevent_context *ev,
1708 			    struct tevent_timer *te,
1709 			    struct timeval t,
1710 			    void *private_data)
1711 {
1712 	struct ldb_kv_context *ctx;
1713 	int ret;
1714 
1715 	ctx = talloc_get_type(private_data, struct ldb_kv_context);
1716 
1717 	if (ctx->request_terminated) {
1718 		goto done;
1719 	}
1720 
1721 	switch (ctx->req->operation) {
1722 	case LDB_SEARCH:
1723 		ret = ldb_kv_search(ctx);
1724 		break;
1725 	case LDB_ADD:
1726 		ret = ldb_kv_add(ctx);
1727 		break;
1728 	case LDB_MODIFY:
1729 		ret = ldb_kv_modify(ctx);
1730 		break;
1731 	case LDB_DELETE:
1732 		ret = ldb_kv_delete(ctx);
1733 		break;
1734 	case LDB_RENAME:
1735 		ret = ldb_kv_rename(ctx);
1736 		break;
1737 	case LDB_EXTENDED:
1738 		ldb_kv_handle_extended(ctx);
1739 		goto done;
1740 	default:
1741 		/* no other op supported */
1742 		ret = LDB_ERR_PROTOCOL_ERROR;
1743 	}
1744 
1745 	if (!ctx->request_terminated) {
1746 		/* request is done now */
1747 		ldb_kv_request_done(ctx, ret);
1748 	}
1749 
1750 done:
1751 	if (ctx->spy) {
1752 		/* neutralize the spy */
1753 		ctx->spy->ctx = NULL;
1754 		ctx->spy = NULL;
1755 	}
1756 	talloc_free(ctx);
1757 }
1758 
ldb_kv_request_destructor(void * ptr)1759 static int ldb_kv_request_destructor(void *ptr)
1760 {
1761 	struct ldb_kv_req_spy *spy =
1762 	    talloc_get_type(ptr, struct ldb_kv_req_spy);
1763 
1764 	if (spy->ctx != NULL) {
1765 		spy->ctx->spy = NULL;
1766 		spy->ctx->request_terminated = true;
1767 		spy->ctx = NULL;
1768 	}
1769 
1770 	return 0;
1771 }
1772 
ldb_kv_handle_request(struct ldb_module * module,struct ldb_request * req)1773 static int ldb_kv_handle_request(struct ldb_module *module,
1774 				 struct ldb_request *req)
1775 {
1776 	struct ldb_control *control_permissive;
1777 	struct ldb_context *ldb;
1778 	struct tevent_context *ev;
1779 	struct ldb_kv_context *ac;
1780 	struct tevent_timer *te;
1781 	struct timeval tv;
1782 	unsigned int i;
1783 
1784 	ldb = ldb_module_get_ctx(module);
1785 
1786 	control_permissive = ldb_request_get_control(req,
1787 					LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1788 
1789 	for (i = 0; req->controls && req->controls[i]; i++) {
1790 		if (req->controls[i]->critical &&
1791 		    req->controls[i] != control_permissive) {
1792 			ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1793 					       req->controls[i]->oid);
1794 			return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1795 		}
1796 	}
1797 
1798 	if (req->starttime == 0 || req->timeout == 0) {
1799 		ldb_set_errstring(ldb, "Invalid timeout settings");
1800 		return LDB_ERR_TIME_LIMIT_EXCEEDED;
1801 	}
1802 
1803 	ev = ldb_handle_get_event_context(req->handle);
1804 
1805 	ac = talloc_zero(ldb, struct ldb_kv_context);
1806 	if (ac == NULL) {
1807 		ldb_oom(ldb);
1808 		return LDB_ERR_OPERATIONS_ERROR;
1809 	}
1810 
1811 	ac->module = module;
1812 	ac->req = req;
1813 
1814 	tv.tv_sec = 0;
1815 	tv.tv_usec = 0;
1816 	te = tevent_add_timer(ev, ac, tv, ldb_kv_callback, ac);
1817 	if (NULL == te) {
1818 		talloc_free(ac);
1819 		return LDB_ERR_OPERATIONS_ERROR;
1820 	}
1821 
1822 	if (req->timeout > 0) {
1823 		tv.tv_sec = req->starttime + req->timeout;
1824 		tv.tv_usec = 0;
1825 		ac->timeout_event =
1826 		    tevent_add_timer(ev, ac, tv, ldb_kv_timeout, ac);
1827 		if (NULL == ac->timeout_event) {
1828 			talloc_free(ac);
1829 			return LDB_ERR_OPERATIONS_ERROR;
1830 		}
1831 	}
1832 
1833 	/* set a spy so that we do not try to use the request context
1834 	 * if it is freed before ltdb_callback fires */
1835 	ac->spy = talloc(req, struct ldb_kv_req_spy);
1836 	if (NULL == ac->spy) {
1837 		talloc_free(ac);
1838 		return LDB_ERR_OPERATIONS_ERROR;
1839 	}
1840 	ac->spy->ctx = ac;
1841 
1842 	talloc_set_destructor((TALLOC_CTX *)ac->spy, ldb_kv_request_destructor);
1843 
1844 	return LDB_SUCCESS;
1845 }
1846 
ldb_kv_init_rootdse(struct ldb_module * module)1847 static int ldb_kv_init_rootdse(struct ldb_module *module)
1848 {
1849 	/* ignore errors on this - we expect it for non-sam databases */
1850 	ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1851 
1852 	/* there can be no module beyond the backend, just return */
1853 	return LDB_SUCCESS;
1854 }
1855 
ldb_kv_lock_read(struct ldb_module * module)1856 static int ldb_kv_lock_read(struct ldb_module *module)
1857 {
1858 	void *data = ldb_module_get_private(module);
1859 	struct ldb_kv_private *ldb_kv =
1860 	    talloc_get_type(data, struct ldb_kv_private);
1861 	return ldb_kv->kv_ops->lock_read(module);
1862 }
1863 
ldb_kv_unlock_read(struct ldb_module * module)1864 static int ldb_kv_unlock_read(struct ldb_module *module)
1865 {
1866 	void *data = ldb_module_get_private(module);
1867 	struct ldb_kv_private *ldb_kv =
1868 	    talloc_get_type(data, struct ldb_kv_private);
1869 	return ldb_kv->kv_ops->unlock_read(module);
1870 }
1871 
1872 static const struct ldb_module_ops ldb_kv_ops = {
1873     .name = "tdb",
1874     .init_context = ldb_kv_init_rootdse,
1875     .search = ldb_kv_handle_request,
1876     .add = ldb_kv_handle_request,
1877     .modify = ldb_kv_handle_request,
1878     .del = ldb_kv_handle_request,
1879     .rename = ldb_kv_handle_request,
1880     .extended = ldb_kv_handle_request,
1881     .start_transaction = ldb_kv_start_trans,
1882     .end_transaction = ldb_kv_end_trans,
1883     .prepare_commit = ldb_kv_prepare_commit,
1884     .del_transaction = ldb_kv_del_trans,
1885     .read_lock = ldb_kv_lock_read,
1886     .read_unlock = ldb_kv_unlock_read,
1887 };
1888 
ldb_kv_init_store(struct ldb_kv_private * ldb_kv,const char * name,struct ldb_context * ldb,const char * options[],struct ldb_module ** _module)1889 int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
1890 		      const char *name,
1891 		      struct ldb_context *ldb,
1892 		      const char *options[],
1893 		      struct ldb_module **_module)
1894 {
1895 	if (getenv("LDB_WARN_UNINDEXED")) {
1896 		ldb_kv->warn_unindexed = true;
1897 	}
1898 
1899 	if (getenv("LDB_WARN_REINDEX")) {
1900 		ldb_kv->warn_reindex = true;
1901 	}
1902 
1903 	ldb_kv->sequence_number = 0;
1904 
1905 	ldb_kv->pack_format_version = LDB_PACKING_FORMAT;
1906 
1907 	ldb_kv->pid = getpid();
1908 
1909 	ldb_kv->module = ldb_module_new(ldb, ldb, name, &ldb_kv_ops);
1910 	if (!ldb_kv->module) {
1911 		ldb_oom(ldb);
1912 		talloc_free(ldb_kv);
1913 		return LDB_ERR_OPERATIONS_ERROR;
1914 	}
1915 	ldb_module_set_private(ldb_kv->module, ldb_kv);
1916 	talloc_steal(ldb_kv->module, ldb_kv);
1917 
1918 	if (ldb_kv_cache_load(ldb_kv->module) != 0) {
1919 		ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
1920 				       "records for backend '%s'", name);
1921 		talloc_free(ldb_kv->module);
1922 		return LDB_ERR_OPERATIONS_ERROR;
1923 	}
1924 
1925 	*_module = ldb_kv->module;
1926 	/*
1927 	 * Set or override the maximum key length
1928 	 *
1929 	 * The ldb_mdb code will have set this to 511, but our tests
1930 	 * set this even smaller (to make the tests more practical).
1931 	 *
1932 	 * This must only be used for the selftest as the length
1933 	 * becomes encoded in the index keys.
1934 	 */
1935 	{
1936 		const char *len_str =
1937 			ldb_options_find(ldb, options,
1938 					 "max_key_len_for_self_test");
1939 		if (len_str != NULL) {
1940 			unsigned len = strtoul(len_str, NULL, 0);
1941 			ldb_kv->max_key_length = len;
1942 		}
1943 	}
1944 
1945 	/*
1946 	 * Override full DB scans
1947 	 *
1948 	 * A full DB scan is expensive on a large database.  This
1949 	 * option is for testing to show that the full DB scan is not
1950 	 * triggered.
1951 	 */
1952 	{
1953 		const char *len_str =
1954 			ldb_options_find(ldb, options,
1955 					 "disable_full_db_scan_for_self_test");
1956 		if (len_str != NULL) {
1957 			ldb_kv->disable_full_db_scan = true;
1958 		}
1959 	}
1960 
1961 	return LDB_SUCCESS;
1962 }
1963