1 /*
2    Partitions ldb module
3 
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6 
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11 
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16 
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31 
32 #include "dsdb/samdb/ldb_modules/partition.h"
33 
34 struct part_request {
35 	struct ldb_module *module;
36 	struct ldb_request *req;
37 };
38 
39 struct partition_context {
40 	struct ldb_module *module;
41 	struct ldb_request *req;
42 
43 	struct part_request *part_req;
44 	unsigned int num_requests;
45 	unsigned int finished_requests;
46 
47 	const char **referrals;
48 };
49 
partition_init_ctx(struct ldb_module * module,struct ldb_request * req)50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52 	struct partition_context *ac;
53 
54 	ac = talloc_zero(req, struct partition_context);
55 	if (ac == NULL) {
56 		ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57 		return NULL;
58 	}
59 
60 	ac->module = module;
61 	ac->req = req;
62 
63 	return ac;
64 }
65 
66 /*
67  * helper functions to call the next module in chain
68  */
partition_request(struct ldb_module * module,struct ldb_request * request)69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71 	if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 		const struct dsdb_control_current_partition *partition = NULL;
73 		struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 		if (partition_ctrl) {
75 			partition = talloc_get_type(partition_ctrl->data,
76 						    struct dsdb_control_current_partition);
77 		}
78 
79 		if (partition != NULL) {
80 			ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 				  ldb_dn_get_linearized(partition->dn));
82 		} else {
83 			ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84 		}
85 	}
86 
87 	return ldb_next_request(module, request);
88 }
89 
find_partition(struct partition_private_data * data,struct ldb_dn * dn,struct ldb_request * req)90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 					     struct ldb_dn *dn,
92 					     struct ldb_request *req)
93 {
94 	unsigned int i;
95 	struct ldb_control *partition_ctrl;
96 
97 	/* see if the request has the partition DN specified in a
98 	 * control. The repl_meta_data module can specify this to
99 	 * ensure that replication happens to the right partition
100 	 */
101 	partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 	if (partition_ctrl) {
103 		const struct dsdb_control_current_partition *partition;
104 		partition = talloc_get_type(partition_ctrl->data,
105 					    struct dsdb_control_current_partition);
106 		if (partition != NULL) {
107 			dn = partition->dn;
108 		}
109 	}
110 
111 	if (dn == NULL) {
112 		return NULL;
113 	}
114 
115 	/* Look at base DN */
116 	/* Figure out which partition it is under */
117 	/* Skip the lot if 'data' isn't here yet (initialisation) */
118 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 		if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 			return data->partitions[i];
121 		}
122 	}
123 
124 	return NULL;
125 }
126 
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
partition_req_callback(struct ldb_request * req,struct ldb_reply * ares)130 static int partition_req_callback(struct ldb_request *req,
131 				  struct ldb_reply *ares)
132 {
133 	struct partition_context *ac;
134 	struct ldb_module *module;
135 	struct ldb_request *nreq;
136 	int ret;
137 	struct ldb_control *partition_ctrl;
138 
139 	ac = talloc_get_type(req->context, struct partition_context);
140 
141 	if (!ares) {
142 		return ldb_module_done(ac->req, NULL, NULL,
143 					LDB_ERR_OPERATIONS_ERROR);
144 	}
145 
146 	partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 	if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 		/* If we didn't fan this request out to mulitple partitions,
149 		 * or this is an individual search result, we can
150 		 * deterministically tell the caller what partition this was
151 		 * written to (repl_meta_data likes to know) */
152 		ret = ldb_reply_add_control(ares,
153 					    DSDB_CONTROL_CURRENT_PARTITION_OID,
154 					    false, partition_ctrl->data);
155 		if (ret != LDB_SUCCESS) {
156 			return ldb_module_done(ac->req, NULL, NULL,
157 					       ret);
158 		}
159 	}
160 
161 	if (ares->error != LDB_SUCCESS) {
162 		return ldb_module_done(ac->req, ares->controls,
163 					ares->response, ares->error);
164 	}
165 
166 	switch (ares->type) {
167 	case LDB_REPLY_REFERRAL:
168 		return ldb_module_send_referral(ac->req, ares->referral);
169 
170 	case LDB_REPLY_ENTRY:
171 		if (ac->req->operation != LDB_SEARCH) {
172 			ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 				"partition_req_callback:"
174 				" Unsupported reply type for this request");
175 			return ldb_module_done(ac->req, NULL, NULL,
176 						LDB_ERR_OPERATIONS_ERROR);
177 		}
178 
179 		return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180 
181 	case LDB_REPLY_DONE:
182 		if (ac->req->operation == LDB_EXTENDED) {
183 			/* FIXME: check for ares->response, replmd does not fill it ! */
184 			if (ares->response) {
185 				if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 					ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 							  "partition_req_callback:"
188 							  " Unknown extended reply, "
189 							  "only supports START_TLS");
190 					talloc_free(ares);
191 					return ldb_module_done(ac->req, NULL, NULL,
192 								LDB_ERR_OPERATIONS_ERROR);
193 				}
194 			}
195 		}
196 
197 		ac->finished_requests++;
198 		if (ac->finished_requests == ac->num_requests) {
199 			/* Send back referrals if they do exist (search ops) */
200 			if (ac->referrals != NULL) {
201 				const char **ref;
202 				for (ref = ac->referrals; *ref != NULL; ++ref) {
203 					ret = ldb_module_send_referral(ac->req,
204 								       talloc_strdup(ac->req, *ref));
205 					if (ret != LDB_SUCCESS) {
206 						return ldb_module_done(ac->req, NULL, NULL,
207 								       ret);
208 					}
209 				}
210 			}
211 
212 			/* this was the last one, call callback */
213 			return ldb_module_done(ac->req, ares->controls,
214 					       ares->response,
215 					       ares->error);
216 		}
217 
218 		/* not the last, now call the next one */
219 		module = ac->part_req[ac->finished_requests].module;
220 		nreq = ac->part_req[ac->finished_requests].req;
221 
222 		ret = partition_request(module, nreq);
223 		if (ret != LDB_SUCCESS) {
224 			talloc_free(ares);
225 			return ldb_module_done(ac->req, NULL, NULL, ret);
226 		}
227 
228 		break;
229 	}
230 
231 	talloc_free(ares);
232 	return LDB_SUCCESS;
233 }
234 
partition_prep_request(struct partition_context * ac,struct dsdb_partition * partition)235 static int partition_prep_request(struct partition_context *ac,
236 				  struct dsdb_partition *partition)
237 {
238 	int ret;
239 	struct ldb_request *req;
240 	struct ldb_control *partition_ctrl = NULL;
241 
242 	ac->part_req = talloc_realloc(ac, ac->part_req,
243 					struct part_request,
244 					ac->num_requests + 1);
245 	if (ac->part_req == NULL) {
246 		return ldb_oom(ldb_module_get_ctx(ac->module));
247 	}
248 
249 	switch (ac->req->operation) {
250 	case LDB_SEARCH:
251 		ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252 					ac->part_req,
253 					ac->req->op.search.base,
254 					ac->req->op.search.scope,
255 					ac->req->op.search.tree,
256 					ac->req->op.search.attrs,
257 					ac->req->controls,
258 					ac, partition_req_callback,
259 					ac->req);
260 		LDB_REQ_SET_LOCATION(req);
261 		break;
262 	case LDB_ADD:
263 		ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264 					ac->req->op.add.message,
265 					ac->req->controls,
266 					ac, partition_req_callback,
267 					ac->req);
268 		LDB_REQ_SET_LOCATION(req);
269 		break;
270 	case LDB_MODIFY:
271 		ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272 					ac->req->op.mod.message,
273 					ac->req->controls,
274 					ac, partition_req_callback,
275 					ac->req);
276 		LDB_REQ_SET_LOCATION(req);
277 		break;
278 	case LDB_DELETE:
279 		ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280 					ac->req->op.del.dn,
281 					ac->req->controls,
282 					ac, partition_req_callback,
283 					ac->req);
284 		LDB_REQ_SET_LOCATION(req);
285 		break;
286 	case LDB_RENAME:
287 		ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288 					ac->req->op.rename.olddn,
289 					ac->req->op.rename.newdn,
290 					ac->req->controls,
291 					ac, partition_req_callback,
292 					ac->req);
293 		LDB_REQ_SET_LOCATION(req);
294 		break;
295 	case LDB_EXTENDED:
296 		ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297 					ac->part_req,
298 					ac->req->op.extended.oid,
299 					ac->req->op.extended.data,
300 					ac->req->controls,
301 					ac, partition_req_callback,
302 					ac->req);
303 		LDB_REQ_SET_LOCATION(req);
304 		break;
305 	default:
306 		ldb_set_errstring(ldb_module_get_ctx(ac->module),
307 				  "Unsupported request type!");
308 		ret = LDB_ERR_UNWILLING_TO_PERFORM;
309 	}
310 
311 	if (ret != LDB_SUCCESS) {
312 		return ret;
313 	}
314 
315 	ac->part_req[ac->num_requests].req = req;
316 
317 	if (ac->req->controls) {
318 		/* Duplicate everything beside the current partition control */
319 		partition_ctrl = ldb_request_get_control(ac->req,
320 							 DSDB_CONTROL_CURRENT_PARTITION_OID);
321 		if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322 			return ldb_module_oom(ac->module);
323 		}
324 	}
325 
326 	if (partition) {
327 		void *part_data = partition->ctrl;
328 
329 		ac->part_req[ac->num_requests].module = partition->module;
330 
331 		if (partition_ctrl != NULL) {
332 			if (partition_ctrl->data != NULL) {
333 				part_data = partition_ctrl->data;
334 			}
335 
336 			/*
337 			 * If the provided current partition control is without
338 			 * data then use the calculated one.
339 			 */
340 			ret = ldb_request_add_control(req,
341 						      DSDB_CONTROL_CURRENT_PARTITION_OID,
342 						      false, part_data);
343 			if (ret != LDB_SUCCESS) {
344 				return ret;
345 			}
346 		}
347 
348 		if (req->operation == LDB_SEARCH) {
349 			/* If the search is for 'more' than this partition,
350 			 * then change the basedn, so a remote LDAP server
351 			 * doesn't object */
352 			if (ldb_dn_compare_base(partition->ctrl->dn,
353 						req->op.search.base) != 0) {
354 				req->op.search.base = partition->ctrl->dn;
355 			}
356 		}
357 
358 	} else {
359 		/* make sure you put the module here, or
360 		 * or ldb_next_request() will skip a module */
361 		ac->part_req[ac->num_requests].module = ac->module;
362 	}
363 
364 	ac->num_requests++;
365 
366 	return LDB_SUCCESS;
367 }
368 
partition_call_first(struct partition_context * ac)369 static int partition_call_first(struct partition_context *ac)
370 {
371 	return partition_request(ac->part_req[0].module, ac->part_req[0].req);
372 }
373 
374 /**
375  * Send a request down to all the partitions (but not the sam.ldb file)
376  */
partition_send_all(struct ldb_module * module,struct partition_context * ac,struct ldb_request * req)377 static int partition_send_all(struct ldb_module *module,
378 			      struct partition_context *ac,
379 			      struct ldb_request *req)
380 {
381 	unsigned int i;
382 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383 							      struct partition_private_data);
384 	int ret;
385 
386 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
387 		ret = partition_prep_request(ac, data->partitions[i]);
388 		if (ret != LDB_SUCCESS) {
389 			return ret;
390 		}
391 	}
392 
393 	/* fire the first one */
394 	return partition_call_first(ac);
395 }
396 
397 struct partition_copy_context {
398 	struct ldb_module *module;
399 	struct partition_context *partition_context;
400 	struct ldb_request *request;
401 	struct ldb_dn *dn;
402 };
403 
404 /*
405  * A special DN has been updated in the primary partition. Now propagate those
406  * changes to the remaining partitions.
407  *
408  * Note: that the operations are asynchronous and this function is called
409  *       from partition_copy_all_callback_handler in response to an async
410  *       callback.
411  */
partition_copy_all_callback_action(struct ldb_module * module,struct partition_context * ac,struct ldb_request * req,struct ldb_dn * dn)412 static int partition_copy_all_callback_action(
413 	struct ldb_module *module,
414 	struct partition_context *ac,
415 	struct ldb_request *req,
416 	struct ldb_dn *dn)
417 
418 {
419 
420 	unsigned int i;
421 	struct partition_private_data *data =
422 		talloc_get_type(
423 			ldb_module_get_private(module),
424 			struct partition_private_data);
425 	int search_ret;
426 	struct ldb_result *res;
427 	/* now fetch the resulting object, and then copy it to all the
428 	 * other partitions. We need this approach to cope with the
429 	 * partitions getting out of sync. If for example the
430 	 * @ATTRIBUTES object exists on one partition but not the
431 	 * others then just doing each of the partitions in turn will
432 	 * lead to an error
433 	 */
434 	search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435 	if (search_ret != LDB_SUCCESS) {
436 		return search_ret;
437 	}
438 
439 	/* now delete the object in the other partitions, if requried
440 	*/
441 	if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
442 		for (i=0; data->partitions && data->partitions[i]; i++) {
443 			int pret;
444 			pret = dsdb_module_del(data->partitions[i]->module,
445 					       dn,
446 					       DSDB_FLAG_NEXT_MODULE,
447 					       req);
448 			if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
449 				/* we should only get success or no
450 				   such object from the other partitions */
451 				return pret;
452 			}
453 		}
454 
455 		return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
456 	}
457 
458 	/* now add/modify in the other partitions */
459 	for (i=0; data->partitions && data->partitions[i]; i++) {
460 		struct ldb_message *modify_msg = NULL;
461 		int pret;
462 		unsigned int el_idx;
463 
464 		pret = dsdb_module_add(data->partitions[i]->module,
465 				       res->msgs[0],
466 				       DSDB_FLAG_NEXT_MODULE,
467 				       req);
468 		if (pret == LDB_SUCCESS) {
469 			continue;
470 		}
471 
472 		if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
473 			return pret;
474 		}
475 
476 		modify_msg = ldb_msg_copy(req, res->msgs[0]);
477 		if (modify_msg == NULL) {
478 			return ldb_module_oom(module);
479 		}
480 
481 		/*
482 		 * mark all the message elements as
483 		 * LDB_FLAG_MOD_REPLACE
484 		 */
485 		for (el_idx=0;
486 		     el_idx < modify_msg->num_elements;
487 		     el_idx++) {
488 			modify_msg->elements[el_idx].flags
489 				= LDB_FLAG_MOD_REPLACE;
490 		}
491 
492 		if (req->operation == LDB_MODIFY) {
493 			const struct ldb_message *req_msg = req->op.mod.message;
494 			/*
495 			 * mark elements to be removed, if there were
496 			 * deleted entirely above we need to delete
497 			 * them here too
498 			 */
499 			for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
500 				if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
501 				    || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
502 					req_msg->elements[el_idx].num_values == 0)) {
503 					if (ldb_msg_find_element(modify_msg,
504 								 req_msg->elements[el_idx].name) != NULL) {
505 						continue;
506 					}
507 					pret = ldb_msg_add_empty(
508 						modify_msg,
509 						req_msg->elements[el_idx].name,
510 						LDB_FLAG_MOD_REPLACE,
511 						NULL);
512 					if (pret != LDB_SUCCESS) {
513 						return pret;
514 					}
515 				}
516 			}
517 		}
518 
519 		pret = dsdb_module_modify(data->partitions[i]->module,
520 					  modify_msg,
521 					  DSDB_FLAG_NEXT_MODULE,
522 					  req);
523 
524 		if (pret != LDB_SUCCESS) {
525 			return pret;
526 		}
527 	}
528 
529 	return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
530 }
531 
532 
533 /*
534  * @brief call back function for the ldb operations on special DN's.
535  *
536  * As the LDB operations are async, and we wish to use the result
537  * the operations, a callback needs to be registered to process the results
538  * of the LDB operations.
539  *
540  * @param req the ldb request
541  * @param res the result of the operation
542  *
543  * @return the LDB_STATUS
544  */
partition_copy_all_callback_handler(struct ldb_request * req,struct ldb_reply * ares)545 static int partition_copy_all_callback_handler(
546 	struct ldb_request *req,
547 	struct ldb_reply *ares)
548 {
549 	struct partition_copy_context *ac = NULL;
550 
551 	ac = talloc_get_type(
552 		req->context,
553 		struct partition_copy_context);
554 
555 	if (!ares) {
556 		return ldb_module_done(
557 			ac->request,
558 			NULL,
559 			NULL,
560 			LDB_ERR_OPERATIONS_ERROR);
561 	}
562 
563 	/* pass on to the callback */
564 	switch (ares->type) {
565 	case LDB_REPLY_ENTRY:
566 		return ldb_module_send_entry(
567 			ac->request,
568 			ares->message,
569 			ares->controls);
570 
571 	case LDB_REPLY_REFERRAL:
572 		return ldb_module_send_referral(
573 			ac->request,
574 			ares->referral);
575 
576 	case LDB_REPLY_DONE: {
577 		int error = ares->error;
578 		if (error == LDB_SUCCESS) {
579 			error = partition_copy_all_callback_action(
580 				ac->module,
581 				ac->partition_context,
582 				ac->request,
583 				ac->dn);
584 		}
585 		return ldb_module_done(
586 			ac->request,
587 			ares->controls,
588 			ares->response,
589 			error);
590 	}
591 
592 	default:
593 		/* Can't happen */
594 		return LDB_ERR_OPERATIONS_ERROR;
595 	}
596 }
597 
598 /**
599  * send an operation to the top partition, then copy the resulting
600  * object to all other partitions.
601  */
partition_copy_all(struct ldb_module * module,struct partition_context * partition_context,struct ldb_request * req,struct ldb_dn * dn)602 static int partition_copy_all(
603 	struct ldb_module *module,
604 	struct partition_context *partition_context,
605 	struct ldb_request *req,
606 	struct ldb_dn *dn)
607 {
608 	struct ldb_request *new_req = NULL;
609 	struct ldb_context *ldb = NULL;
610 	struct partition_copy_context *context = NULL;
611 
612 	int ret;
613 
614 	ldb = ldb_module_get_ctx(module);
615 
616 	context = talloc_zero(req, struct partition_copy_context);
617 	if (context == NULL) {
618 		return ldb_oom(ldb);
619 	}
620 	context->module = module;
621 	context->request = req;
622 	context->dn = dn;
623 	context->partition_context = partition_context;
624 
625 	switch (req->operation) {
626 	case LDB_ADD:
627 		ret = ldb_build_add_req(
628 			&new_req,
629 			ldb,
630 			req,
631 			req->op.add.message,
632 			req->controls,
633 			context,
634 			partition_copy_all_callback_handler,
635 			req);
636 		break;
637 	case LDB_MODIFY:
638 		ret = ldb_build_mod_req(
639 			&new_req,
640 			ldb,
641 			req,
642 			req->op.mod.message,
643 			req->controls,
644 			context,
645 			partition_copy_all_callback_handler,
646 			req);
647 		break;
648 	case LDB_DELETE:
649 		ret = ldb_build_del_req(
650 			&new_req,
651 			ldb,
652 			req,
653 			req->op.del.dn,
654 			req->controls,
655 			context,
656 			partition_copy_all_callback_handler,
657 			req);
658 		break;
659 	case LDB_RENAME:
660 		ret = ldb_build_rename_req(
661 			&new_req,
662 			ldb,
663 			req,
664 			req->op.rename.olddn,
665 			req->op.rename.newdn,
666 			req->controls,
667 			context,
668 			partition_copy_all_callback_handler,
669 			req);
670 		break;
671 	default:
672 		/*
673 		 * Shouldn't happen.
674 		 */
675 		ldb_debug(
676 			ldb,
677 			LDB_DEBUG_ERROR,
678 			"Unexpected operation type (%d)\n", req->operation);
679 		ret = LDB_ERR_OPERATIONS_ERROR;
680 		break;
681 	}
682 	if (ret != LDB_SUCCESS) {
683 		return ret;
684 	}
685 	return ldb_next_request(module, new_req);
686 }
687 /**
688  * Figure out which backend a request needs to be aimed at.  Some
689  * requests must be replicated to all backends
690  */
partition_replicate(struct ldb_module * module,struct ldb_request * req,struct ldb_dn * dn)691 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
692 {
693 	struct partition_context *ac;
694 	unsigned int i;
695 	int ret;
696 	struct dsdb_partition *partition;
697 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
698 							      struct partition_private_data);
699 
700 	/* if we aren't initialised yet go further */
701 	if (!data || !data->partitions) {
702 		return ldb_next_request(module, req);
703 	}
704 
705 	if (ldb_dn_is_special(dn)) {
706 		/* Is this a special DN, we need to replicate to every backend? */
707 		for (i=0; data->replicate && data->replicate[i]; i++) {
708 			if (ldb_dn_compare(data->replicate[i],
709 					   dn) == 0) {
710 
711 				ac = partition_init_ctx(module, req);
712 				if (!ac) {
713 					return ldb_operr(ldb_module_get_ctx(module));
714 				}
715 
716 				return partition_copy_all(module, ac, req, dn);
717 			}
718 		}
719 	}
720 
721 	/* Otherwise, we need to find the partition to fire it to */
722 
723 	/* Find partition */
724 	partition = find_partition(data, dn, req);
725 	if (!partition) {
726 		/*
727 		 * if we haven't found a matching partition
728 		 * pass the request to the main ldb
729 		 *
730 		 * TODO: we should maybe return an error here
731 		 *       if it's not a special dn
732 		 */
733 
734 		return ldb_next_request(module, req);
735 	}
736 
737 	ac = partition_init_ctx(module, req);
738 	if (!ac) {
739 		return ldb_operr(ldb_module_get_ctx(module));
740 	}
741 
742 	/* we need to add a control but we never touch the original request */
743 	ret = partition_prep_request(ac, partition);
744 	if (ret != LDB_SUCCESS) {
745 		return ret;
746 	}
747 
748 	/* fire the first one */
749 	return partition_call_first(ac);
750 }
751 
752 /* search */
partition_search(struct ldb_module * module,struct ldb_request * req)753 static int partition_search(struct ldb_module *module, struct ldb_request *req)
754 {
755 	struct ldb_control **saved_controls;
756 	/* Find backend */
757 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
758 							      struct partition_private_data);
759 	struct partition_context *ac;
760 	struct ldb_context *ldb;
761 	struct loadparm_context *lp_ctx;
762 
763 	struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
764 	struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
765 	struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
766 
767 	struct ldb_search_options_control *search_options = NULL;
768 	struct dsdb_partition *p;
769 	unsigned int i, j;
770 	int ret;
771 	bool domain_scope = false, phantom_root = false;
772 
773 	p = find_partition(data, NULL, req);
774 	if (p != NULL) {
775 		/* the caller specified what partition they want the
776 		 * search - just pass it on
777 		 */
778 		return ldb_next_request(p->module, req);
779 	}
780 
781 	/* Get back the search options from the search control, and mark it as
782 	 * non-critical (to make backends and also dcpromo happy).
783 	 */
784 	if (search_control) {
785 		search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
786 		search_control->critical = 0;
787 
788 	}
789 
790 	/* Remove the "domain_scope" control, so we don't confuse a backend
791 	 * server */
792 	if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
793 		return ldb_oom(ldb_module_get_ctx(module));
794 	}
795 
796 	/* if we aren't initialised yet go further */
797 	if (!data || !data->partitions) {
798 		return ldb_next_request(module, req);
799 	}
800 
801 	/* Special DNs without specified partition should go further */
802 	if (ldb_dn_is_special(req->op.search.base)) {
803 		return ldb_next_request(module, req);
804 	}
805 
806 	/* Locate the options */
807 	domain_scope = (search_options
808 		&& (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
809 		|| domain_scope_control;
810 	phantom_root = search_options
811 		&& (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
812 
813 	/* Remove handled options from the search control flag */
814 	if (search_options) {
815 		search_options->search_options = search_options->search_options
816 			& ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
817 			& ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
818 	}
819 
820 	ac = partition_init_ctx(module, req);
821 	if (!ac) {
822 		return ldb_operr(ldb_module_get_ctx(module));
823 	}
824 
825 	ldb = ldb_module_get_ctx(ac->module);
826 	lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
827 						struct loadparm_context);
828 
829 	/* Search from the base DN */
830 	if (ldb_dn_is_null(req->op.search.base)) {
831 		if (!phantom_root) {
832 			return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
833 		}
834 		return partition_send_all(module, ac, req);
835 	}
836 
837 	for (i=0; data->partitions[i]; i++) {
838 		bool match = false, stop = false;
839 
840 		if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
841 			if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
842 						req->op.search.base) == 0) {
843 				/* base DN is in a partial replica
844 				   with the NO_GLOBAL_CATALOG
845 				   control. This partition is invisible */
846 				/* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
847 				continue;
848 			}
849 		}
850 
851 		if (phantom_root) {
852 			/* Phantom root: Find all partitions under the
853 			 * search base. We match if:
854 			 *
855 			 * 1) the DN we are looking for exactly matches a
856 			 *    certain partition and always stop
857 			 * 2) the DN we are looking for is a parent of certain
858 			 *    partitions and it isn't a scope base search
859 			 * 3) the DN we are looking for is a child of a certain
860 			 *    partition and always stop
861 			 *    - we don't need to go any further up in the
862 			 *    hierarchy!
863 			 */
864 			if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
865 					   req->op.search.base) == 0) {
866 				match = true;
867 				stop = true;
868 			}
869 			if (!match &&
870 			    (ldb_dn_compare_base(req->op.search.base,
871 						 data->partitions[i]->ctrl->dn) == 0 &&
872 			     req->op.search.scope != LDB_SCOPE_BASE)) {
873 				match = true;
874 			}
875 			if (!match &&
876 			    ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
877 						req->op.search.base) == 0) {
878 				match = true;
879 				stop = true; /* note that this relies on partition ordering */
880 			}
881 		} else {
882 			/* Domain scope: Find all partitions under the search
883 			 * base.
884 			 *
885 			 * We generate referral candidates if we haven't
886 			 * specified the domain scope control, haven't a base
887 			 * search* scope and the DN we are looking for is a real
888 			 * predecessor of certain partitions. When a new
889 			 * referral candidate is nearer to the DN than an
890 			 * existing one delete the latter (we want to have only
891 			 * the closest ones). When we checked this for all
892 			 * candidates we have the final referrals.
893 			 *
894 			 * We match if the DN we are looking for is a child of
895 			 * a certain partition or the partition
896 			 * DN itself - we don't need to go any further
897 			 * up in the hierarchy!
898 			 */
899 			if ((!domain_scope) &&
900 			    (req->op.search.scope != LDB_SCOPE_BASE) &&
901 			    (ldb_dn_compare_base(req->op.search.base,
902 						 data->partitions[i]->ctrl->dn) == 0) &&
903 			    (ldb_dn_compare(req->op.search.base,
904 					    data->partitions[i]->ctrl->dn) != 0)) {
905 				const char *scheme = ldb_get_opaque(
906 				    ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
907 				char *ref = talloc_asprintf(
908 					ac,
909 					"%s://%s/%s%s",
910 					scheme == NULL ? "ldap" : scheme,
911 					lpcfg_dnsdomain(lp_ctx),
912 					ldb_dn_get_linearized(
913 					    data->partitions[i]->ctrl->dn),
914 					req->op.search.scope ==
915 					    LDB_SCOPE_ONELEVEL ? "??base" : "");
916 
917 				if (ref == NULL) {
918 					return ldb_oom(ldb);
919 				}
920 
921 				/* Initialise the referrals list */
922 				if (ac->referrals == NULL) {
923 					char **l = str_list_make_empty(ac);
924 					ac->referrals = discard_const_p(const char *, l);
925 					if (ac->referrals == NULL) {
926 						return ldb_oom(ldb);
927 					}
928 				}
929 
930 				/* Check if the new referral candidate is
931 				 * closer to the base DN than already
932 				 * saved ones and delete the latters */
933 				j = 0;
934 				while (ac->referrals[j] != NULL) {
935 					if (strstr(ac->referrals[j],
936 						   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
937 						str_list_remove(ac->referrals,
938 								ac->referrals[j]);
939 					} else {
940 						++j;
941 					}
942 				}
943 
944 				/* Add our new candidate */
945 				ac->referrals = str_list_add(ac->referrals, ref);
946 
947 				talloc_free(ref);
948 
949 				if (ac->referrals == NULL) {
950 					return ldb_oom(ldb);
951 				}
952 			}
953 			if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
954 				match = true;
955 				stop = true; /* note that this relies on partition ordering */
956 			}
957 		}
958 
959 		if (match) {
960 			ret = partition_prep_request(ac, data->partitions[i]);
961 			if (ret != LDB_SUCCESS) {
962 				return ret;
963 			}
964 		}
965 
966 		if (stop) break;
967 	}
968 
969 	/* Perhaps we didn't match any partitions. Try the main partition */
970 	if (ac->num_requests == 0) {
971 		talloc_free(ac);
972 		return ldb_next_request(module, req);
973 	}
974 
975 	/* fire the first one */
976 	return partition_call_first(ac);
977 }
978 
979 /* add */
partition_add(struct ldb_module * module,struct ldb_request * req)980 static int partition_add(struct ldb_module *module, struct ldb_request *req)
981 {
982 	return partition_replicate(module, req, req->op.add.message->dn);
983 }
984 
985 /* modify */
partition_modify(struct ldb_module * module,struct ldb_request * req)986 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
987 {
988 	return partition_replicate(module, req, req->op.mod.message->dn);
989 }
990 
991 /* delete */
partition_delete(struct ldb_module * module,struct ldb_request * req)992 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
993 {
994 	return partition_replicate(module, req, req->op.del.dn);
995 }
996 
997 /* rename */
partition_rename(struct ldb_module * module,struct ldb_request * req)998 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
999 {
1000 	/* Find backend */
1001 	struct dsdb_partition *backend, *backend2;
1002 
1003 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1004 							      struct partition_private_data);
1005 
1006 	/* Skip the lot if 'data' isn't here yet (initialisation) */
1007 	if (!data) {
1008 		return ldb_operr(ldb_module_get_ctx(module));
1009 	}
1010 
1011 	backend = find_partition(data, req->op.rename.olddn, req);
1012 	backend2 = find_partition(data, req->op.rename.newdn, req);
1013 
1014 	if ((backend && !backend2) || (!backend && backend2)) {
1015 		return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1016 	}
1017 
1018 	if (backend != backend2) {
1019 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
1020 				       "Cannot rename from %s in %s to %s in %s: %s",
1021 				       ldb_dn_get_linearized(req->op.rename.olddn),
1022 				       ldb_dn_get_linearized(backend->ctrl->dn),
1023 				       ldb_dn_get_linearized(req->op.rename.newdn),
1024 				       ldb_dn_get_linearized(backend2->ctrl->dn),
1025 				       ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1026 		return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1027 	}
1028 
1029 	return partition_replicate(module, req, req->op.rename.olddn);
1030 }
1031 
1032 /* start a transaction */
partition_start_trans(struct ldb_module * module)1033 int partition_start_trans(struct ldb_module *module)
1034 {
1035 	int i = 0;
1036 	int ret = 0;
1037 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1038 							      struct partition_private_data);
1039 	/* Look at base DN */
1040 	/* Figure out which partition it is under */
1041 	/* Skip the lot if 'data' isn't here yet (initialization) */
1042 	if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1043 		ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1044 	}
1045 
1046 	/*
1047 	 * We start a transaction on metadata.tdb first and end it last in
1048 	 * end_trans. This makes locking semantics follow TDB rather than MDB,
1049 	 * and effectively locks all partitions at once.
1050 	 * Detail:
1051 	 * Samba AD is special in that the partitions module (this file)
1052 	 * combines multiple independently locked databases into one overall
1053 	 * transaction. Changes across multiple partition DBs in a single
1054 	 * transaction must ALL be either visible or invisible.
1055 	 * The way this is achieved is by taking out a write lock on
1056 	 * metadata.tdb at the start of prepare_commit, while unlocking it at
1057 	 * the end of end_trans. This is matched by read_lock, ensuring it
1058 	 * can't progress until that write lock is released.
1059 	 *
1060 	 * metadata.tdb needs to be a TDB file because MDB uses independent
1061 	 * locks, which means a read lock and a write lock can be held at the
1062 	 * same time, whereas in TDB, the two locks block each other. The TDB
1063 	 * behaviour is required to implement the functionality described
1064 	 * above.
1065 	 *
1066 	 * An important additional detail here is that if prepare_commit is
1067 	 * called on a TDB without any changes being made, no write lock is
1068 	 * taken. We address this by storing a sequence number in metadata.tdb
1069 	 * which is updated every time a replicated attribute is modified.
1070 	 * The possibility of a few unreplicated attributes being out of date
1071 	 * turns out not to be a problem.
1072 	 * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1073 	 * the same end as locking metadata.tdb, unless we made a modification
1074 	 * to the @ records found there before every prepare_commit.
1075 	 */
1076 	ret = partition_metadata_start_trans(module);
1077 	if (ret != LDB_SUCCESS) {
1078 		return ret;
1079 	}
1080 
1081 	ret = ldb_next_start_trans(module);
1082 	if (ret != LDB_SUCCESS) {
1083 		partition_metadata_del_trans(module);
1084 		return ret;
1085 	}
1086 
1087 	ret = partition_reload_if_required(module, data, NULL);
1088 	if (ret != LDB_SUCCESS) {
1089 		ldb_next_del_trans(module);
1090 		partition_metadata_del_trans(module);
1091 		return ret;
1092 	}
1093 
1094 	/*
1095 	 * The following per partition locks are required mostly because TDB
1096 	 * and MDB require locks before read and write ops are permitted.
1097 	 */
1098 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
1099 		if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1100 			ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1101 				  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1102 		}
1103 		ret = ldb_next_start_trans(data->partitions[i]->module);
1104 		if (ret != LDB_SUCCESS) {
1105 			/* Back it out, if it fails on one */
1106 			for (i--; i >= 0; i--) {
1107 				ldb_next_del_trans(data->partitions[i]->module);
1108 			}
1109 			ldb_next_del_trans(module);
1110 			partition_metadata_del_trans(module);
1111 			return ret;
1112 		}
1113 	}
1114 
1115 	data->in_transaction++;
1116 
1117 	return LDB_SUCCESS;
1118 }
1119 
1120 /* prepare for a commit */
partition_prepare_commit(struct ldb_module * module)1121 int partition_prepare_commit(struct ldb_module *module)
1122 {
1123 	unsigned int i;
1124 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1125 							      struct partition_private_data);
1126 	int ret;
1127 
1128 	/*
1129 	 * Order of prepare_commit calls must match that in
1130 	 * partition_start_trans. See comment in that function for detail.
1131 	 */
1132 	ret = partition_metadata_prepare_commit(module);
1133 	if (ret != LDB_SUCCESS) {
1134 		return ret;
1135 	}
1136 
1137 	ret = ldb_next_prepare_commit(module);
1138 	if (ret != LDB_SUCCESS) {
1139 		return ret;
1140 	}
1141 
1142 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
1143 		if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1144 			ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1145 				  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1146 		}
1147 		ret = ldb_next_prepare_commit(data->partitions[i]->module);
1148 		if (ret != LDB_SUCCESS) {
1149 			ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1150 					       ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1151 					       ldb_errstring(ldb_module_get_ctx(module)));
1152 			return ret;
1153 		}
1154 	}
1155 
1156 	if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1157 		ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1158 	}
1159 
1160 	return LDB_SUCCESS;
1161 }
1162 
1163 
1164 /* end a transaction */
partition_end_trans(struct ldb_module * module)1165 int partition_end_trans(struct ldb_module *module)
1166 {
1167 	int ret, ret2;
1168 	int i;
1169 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1170 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1171 							      struct partition_private_data);
1172 	bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1173 
1174 	ret = LDB_SUCCESS;
1175 
1176 	if (data->in_transaction == 0) {
1177 		DEBUG(0,("partition end transaction mismatch\n"));
1178 		ret = LDB_ERR_OPERATIONS_ERROR;
1179 	} else {
1180 		data->in_transaction--;
1181 	}
1182 
1183 	/*
1184 	 * Order of end_trans calls must be the reverse of that in
1185 	 * partition_start_trans. See comment in that function for detail.
1186 	 */
1187 	if (data && data->partitions) {
1188 		/* Just counting the partitions */
1189 		for (i=0; data->partitions[i]; i++) {}
1190 
1191 		/* now walk them backwards */
1192 		for (i--; i>=0; i--) {
1193 			struct dsdb_partition *p = data->partitions[i];
1194 			if (trace) {
1195 				ldb_debug(ldb,
1196 					  LDB_DEBUG_TRACE,
1197 					  "partition_end_trans() -> %s",
1198 					  ldb_dn_get_linearized(p->ctrl->dn));
1199 			}
1200 			ret2 = ldb_next_end_trans(p->module);
1201 			if (ret2 != LDB_SUCCESS) {
1202 				ldb_asprintf_errstring(ldb,
1203 					"end_trans error on %s: %s",
1204 					ldb_dn_get_linearized(p->ctrl->dn),
1205 					ldb_errstring(ldb));
1206 				ret = ret2;
1207 			}
1208 		}
1209 	}
1210 
1211 	if (trace) {
1212 		ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1213 	}
1214 	ret2 = ldb_next_end_trans(module);
1215 	if (ret2 != LDB_SUCCESS) {
1216 		ret = ret2;
1217 	}
1218 
1219 	ret2 = partition_metadata_end_trans(module);
1220 	if (ret2 != LDB_SUCCESS) {
1221 		ret = ret2;
1222 	}
1223 
1224 	return ret;
1225 }
1226 
1227 /* delete a transaction */
partition_del_trans(struct ldb_module * module)1228 int partition_del_trans(struct ldb_module *module)
1229 {
1230 	int ret, final_ret = LDB_SUCCESS;
1231 	int i;
1232 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1233 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1234 							      struct partition_private_data);
1235 	bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1236 
1237 	if (data == NULL) {
1238 		DEBUG(0,("partion delete transaction with no private data\n"));
1239 		return ldb_operr(ldb);
1240 	}
1241 
1242 	/*
1243 	 * Order of del_trans calls must be the reverse of that in
1244 	 * partition_start_trans. See comment in that function for detail.
1245 	 */
1246 	if (data->partitions) {
1247 		/* Just counting the partitions */
1248 		for (i=0; data->partitions[i]; i++) {}
1249 
1250 		/* now walk them backwards */
1251 		for (i--; i>=0; i--) {
1252 			struct dsdb_partition *p = data->partitions[i];
1253 			if (trace) {
1254 				ldb_debug(ldb,
1255 					  LDB_DEBUG_TRACE,
1256 					  "partition_del_trans() -> %s",
1257 					  ldb_dn_get_linearized(p->ctrl->dn));
1258 			}
1259 			ret = ldb_next_del_trans(p->module);
1260 			if (ret != LDB_SUCCESS) {
1261 				ldb_asprintf_errstring(ldb,
1262 					"del_trans error on %s: %s",
1263 					ldb_dn_get_linearized(p->ctrl->dn),
1264 					ldb_errstring(ldb));
1265 				final_ret = ret;
1266 			}
1267 		}
1268 	}
1269 
1270 	if (trace) {
1271 		ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1272 	}
1273 	ret = ldb_next_del_trans(module);
1274 	if (ret != LDB_SUCCESS) {
1275 		final_ret = ret;
1276 	}
1277 
1278 	ret = partition_metadata_del_trans(module);
1279 	if (ret != LDB_SUCCESS) {
1280 		final_ret = ret;
1281 	}
1282 
1283 	if (data->in_transaction == 0) {
1284 		DEBUG(0,("partition del transaction mismatch\n"));
1285 		return ldb_operr(ldb_module_get_ctx(module));
1286 	}
1287 	data->in_transaction--;
1288 
1289 	return final_ret;
1290 }
1291 
partition_primary_sequence_number(struct ldb_module * module,TALLOC_CTX * mem_ctx,uint64_t * seq_number,struct ldb_request * parent)1292 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1293 				      uint64_t *seq_number,
1294 				      struct ldb_request *parent)
1295 {
1296 	int ret;
1297 	struct ldb_result *res;
1298 	struct ldb_seqnum_request *tseq;
1299 	struct ldb_seqnum_result *seqr;
1300 
1301 	tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1302 	if (tseq == NULL) {
1303 		return ldb_oom(ldb_module_get_ctx(module));
1304 	}
1305 	tseq->type = LDB_SEQ_HIGHEST_SEQ;
1306 
1307 	ret = dsdb_module_extended(module, tseq, &res,
1308 				   LDB_EXTENDED_SEQUENCE_NUMBER,
1309 				   tseq,
1310 				   DSDB_FLAG_NEXT_MODULE,
1311 				   parent);
1312 	if (ret != LDB_SUCCESS) {
1313 		talloc_free(tseq);
1314 		return ret;
1315 	}
1316 
1317 	seqr = talloc_get_type_abort(res->extended->data,
1318 				     struct ldb_seqnum_result);
1319 	if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1320 		talloc_free(res);
1321 		return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1322 			"Primary backend in partition module returned a timestamp based seq");
1323 	}
1324 
1325 	*seq_number = seqr->seq_num;
1326 	talloc_free(tseq);
1327 	return LDB_SUCCESS;
1328 }
1329 
1330 
1331 /*
1332  * Older version of sequence number as sum of sequence numbers for each partition
1333  */
partition_sequence_number_from_partitions(struct ldb_module * module,uint64_t * seqr)1334 int partition_sequence_number_from_partitions(struct ldb_module *module,
1335 					      uint64_t *seqr)
1336 {
1337 	int ret;
1338 	unsigned int i;
1339 	uint64_t seq_number = 0;
1340 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1341 							      struct partition_private_data);
1342 
1343 	ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1344 	if (ret != LDB_SUCCESS) {
1345 		return ret;
1346 	}
1347 
1348 	/* Skip the lot if 'data' isn't here yet (initialisation) */
1349 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
1350 		struct ldb_seqnum_request *tseq;
1351 		struct ldb_seqnum_result *tseqr;
1352 		struct ldb_request *treq;
1353 		struct ldb_result *res = talloc_zero(data, struct ldb_result);
1354 		if (res == NULL) {
1355 			return ldb_oom(ldb_module_get_ctx(module));
1356 		}
1357 		tseq = talloc_zero(res, struct ldb_seqnum_request);
1358 		if (tseq == NULL) {
1359 			talloc_free(res);
1360 			return ldb_oom(ldb_module_get_ctx(module));
1361 		}
1362 		tseq->type = LDB_SEQ_HIGHEST_SEQ;
1363 
1364 		ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1365 					     LDB_EXTENDED_SEQUENCE_NUMBER,
1366 					     tseq,
1367 					     NULL,
1368 					     res,
1369 					     ldb_extended_default_callback,
1370 					     NULL);
1371 		LDB_REQ_SET_LOCATION(treq);
1372 		if (ret != LDB_SUCCESS) {
1373 			talloc_free(res);
1374 			return ret;
1375 		}
1376 
1377 		ret = partition_request(data->partitions[i]->module, treq);
1378 		if (ret != LDB_SUCCESS) {
1379 			talloc_free(res);
1380 			return ret;
1381 		}
1382 		ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1383 		if (ret != LDB_SUCCESS) {
1384 			talloc_free(res);
1385 			return ret;
1386 		}
1387 		tseqr = talloc_get_type(res->extended->data,
1388 					struct ldb_seqnum_result);
1389 		seq_number += tseqr->seq_num;
1390 		talloc_free(res);
1391 	}
1392 
1393 	*seqr = seq_number;
1394 	return LDB_SUCCESS;
1395 }
1396 
1397 
1398 /*
1399  * Newer version of sequence number using metadata tdb
1400  */
partition_sequence_number(struct ldb_module * module,struct ldb_request * req)1401 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1402 {
1403 	struct ldb_extended *ext;
1404 	struct ldb_seqnum_request *seq;
1405 	struct ldb_seqnum_result *seqr;
1406 	uint64_t seq_number;
1407 	int ret;
1408 
1409 	seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1410 	switch (seq->type) {
1411 	case LDB_SEQ_NEXT:
1412 		ret = partition_metadata_sequence_number_increment(module, &seq_number);
1413 		if (ret != LDB_SUCCESS) {
1414 			return ret;
1415 		}
1416 		break;
1417 
1418 	case LDB_SEQ_HIGHEST_SEQ:
1419 		ret = partition_metadata_sequence_number(module, &seq_number);
1420 		if (ret != LDB_SUCCESS) {
1421 			return ret;
1422 		}
1423 		break;
1424 
1425 	case LDB_SEQ_HIGHEST_TIMESTAMP:
1426 		return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1427 					"LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1428 	}
1429 
1430 	ext = talloc_zero(req, struct ldb_extended);
1431 	if (!ext) {
1432 		return ldb_module_oom(module);
1433 	}
1434 	seqr = talloc_zero(ext, struct ldb_seqnum_result);
1435 	if (seqr == NULL) {
1436 		talloc_free(ext);
1437 		return ldb_module_oom(module);
1438 	}
1439 	ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1440 	ext->data = seqr;
1441 
1442 	seqr->seq_num = seq_number;
1443 	seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1444 
1445 	/* send request done */
1446 	return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1447 }
1448 
1449 /* lock all the backends */
partition_read_lock(struct ldb_module * module)1450 int partition_read_lock(struct ldb_module *module)
1451 {
1452 	int i = 0;
1453 	int ret = 0;
1454 	int ret2 = 0;
1455 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1456 	struct partition_private_data *data = \
1457 		talloc_get_type(ldb_module_get_private(module),
1458 				struct partition_private_data);
1459 
1460 	if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1461 		ldb_debug(ldb, LDB_DEBUG_TRACE,
1462 			  "partition_read_lock() -> (metadata partition)");
1463 	}
1464 
1465 	/*
1466 	 * It is important to only do this for LOCK because:
1467 	 * - we don't want to unlock what we did not lock
1468 	 *
1469 	 * - we don't want to make a new lock on the sam.ldb
1470 	 *   (triggered inside this routine due to the seq num check)
1471 	 *   during an unlock phase as that will violate the lock
1472 	 *   ordering
1473 	 */
1474 
1475 	if (data == NULL) {
1476 		TALLOC_CTX *mem_ctx = talloc_new(module);
1477 
1478 		data = talloc_zero(mem_ctx, struct partition_private_data);
1479 		if (data == NULL) {
1480 			talloc_free(mem_ctx);
1481 			return ldb_operr(ldb);
1482 		}
1483 
1484 		/*
1485 		 * When used from Samba4, this message is set by the
1486 		 * samba4 module, as a fixed value not read from the
1487 		 * DB.  This avoids listing modules in the DB
1488 		 */
1489 		data->forced_module_msg = talloc_get_type(
1490 			ldb_get_opaque(ldb,
1491 				       DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1492 			struct ldb_message);
1493 
1494 		ldb_module_set_private(module, talloc_steal(module,
1495 							    data));
1496 		talloc_free(mem_ctx);
1497 	}
1498 
1499 	/*
1500 	 * This will lock sam.ldb and will also call event loops,
1501 	 * so we do it before we get the whole db lock.
1502 	 */
1503 	ret = partition_reload_if_required(module, data, NULL);
1504 	if (ret != LDB_SUCCESS) {
1505 		return ret;
1506 	}
1507 
1508 	/*
1509 	 * Order of read_lock calls must match that in partition_start_trans.
1510 	 * See comment in that function for detail.
1511 	 */
1512 	ret = partition_metadata_read_lock(module);
1513 	if (ret != LDB_SUCCESS) {
1514 		goto failed;
1515 	}
1516 
1517 	/*
1518 	 * The top level DB (sam.ldb) lock is not enough to block another
1519 	 * process in prepare_commit(), because if nothing was changed in the
1520 	 * specific backend, then prepare_commit() is a no-op. Therefore the
1521 	 * metadata.tdb lock is taken out above, as it is the best we can do
1522 	 * right now.
1523 	 */
1524 	ret = ldb_next_read_lock(module);
1525 	if (ret != LDB_SUCCESS) {
1526 		ldb_debug_set(ldb,
1527 			      LDB_DEBUG_FATAL,
1528 			      "Failed to lock db: %s / %s for metadata partition",
1529 			      ldb_errstring(ldb),
1530 			      ldb_strerror(ret));
1531 
1532 		return ret;
1533 	}
1534 
1535 	/*
1536 	 * The following per partition locks are required mostly because TDB
1537 	 * and MDB require locks before reads are permitted.
1538 	 */
1539 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
1540 		if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1541 			ldb_debug(ldb, LDB_DEBUG_TRACE,
1542 				  "partition_read_lock() -> %s",
1543 				  ldb_dn_get_linearized(
1544 					  data->partitions[i]->ctrl->dn));
1545 		}
1546 		ret = ldb_next_read_lock(data->partitions[i]->module);
1547 		if (ret == LDB_SUCCESS) {
1548 			continue;
1549 		}
1550 
1551 		ldb_debug_set(ldb,
1552 			      LDB_DEBUG_FATAL,
1553 			      "Failed to lock db: %s / %s for %s",
1554 			      ldb_errstring(ldb),
1555 			      ldb_strerror(ret),
1556 			      ldb_dn_get_linearized(
1557 				      data->partitions[i]->ctrl->dn));
1558 
1559 		goto failed;
1560 	}
1561 
1562 	return LDB_SUCCESS;
1563 
1564 failed:
1565 	/* Back it out, if it fails on one */
1566 	for (i--; i >= 0; i--) {
1567 		ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1568 		if (ret2 != LDB_SUCCESS) {
1569 			ldb_debug(ldb,
1570 				  LDB_DEBUG_FATAL,
1571 				  "Failed to unlock db: %s / %s",
1572 				  ldb_errstring(ldb),
1573 				  ldb_strerror(ret2));
1574 		}
1575 	}
1576 	ret2 = ldb_next_read_unlock(module);
1577 	if (ret2 != LDB_SUCCESS) {
1578 		ldb_debug(ldb,
1579 			  LDB_DEBUG_FATAL,
1580 			  "Failed to unlock db: %s / %s",
1581 			  ldb_errstring(ldb),
1582 			  ldb_strerror(ret2));
1583 	}
1584 	return ret;
1585 }
1586 
1587 /* unlock all the backends */
partition_read_unlock(struct ldb_module * module)1588 int partition_read_unlock(struct ldb_module *module)
1589 {
1590 	int i;
1591 	int ret = LDB_SUCCESS;
1592 	int ret2;
1593 	struct ldb_context *ldb = ldb_module_get_ctx(module);
1594 	struct partition_private_data *data = \
1595 		talloc_get_type(ldb_module_get_private(module),
1596 				struct partition_private_data);
1597 	bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1598 
1599 	/*
1600 	 * Order of read_unlock calls must be the reverse of that in
1601 	 * partition_start_trans. See comment in that function for detail.
1602 	 */
1603 	if (data && data->partitions) {
1604 		/* Just counting the partitions */
1605 		for (i=0; data->partitions[i]; i++) {}
1606 
1607 		/* now walk them backwards */
1608 		for (i--; i>=0; i--) {
1609 			struct dsdb_partition *p = data->partitions[i];
1610 			if (trace) {
1611 				ldb_debug(ldb, LDB_DEBUG_TRACE,
1612 					  "partition_read_unlock() -> %s",
1613 					  ldb_dn_get_linearized(p->ctrl->dn));
1614 			}
1615 			ret2 = ldb_next_read_unlock(p->module);
1616 			if (ret2 != LDB_SUCCESS) {
1617 				ldb_debug_set(ldb,
1618 					   LDB_DEBUG_FATAL,
1619 					   "Failed to lock db: %s / %s for %s",
1620 					   ldb_errstring(ldb),
1621 					   ldb_strerror(ret),
1622 					   ldb_dn_get_linearized(p->ctrl->dn));
1623 
1624 				/*
1625 				 * Don't overwrite the original failure code
1626 				 * if there was one
1627 				 */
1628 				if (ret == LDB_SUCCESS) {
1629 					ret = ret2;
1630 				}
1631 			}
1632 		}
1633 	}
1634 
1635 	if (trace) {
1636 		ldb_debug(ldb, LDB_DEBUG_TRACE,
1637 			  "partition_read_unlock() -> (metadata partition)");
1638 	}
1639 
1640 	ret2 = ldb_next_read_unlock(module);
1641 	if (ret2 != LDB_SUCCESS) {
1642 		ldb_debug_set(ldb,
1643 			      LDB_DEBUG_FATAL,
1644 			      "Failed to unlock db: %s / %s for metadata partition",
1645 			      ldb_errstring(ldb),
1646 			      ldb_strerror(ret2));
1647 
1648 		/*
1649 		 * Don't overwrite the original failure code
1650 		 * if there was one
1651 		 */
1652 		if (ret == LDB_SUCCESS) {
1653 			ret = ret2;
1654 		}
1655 	}
1656 
1657 	ret = partition_metadata_read_unlock(module);
1658 
1659 	/*
1660 	 * Don't overwrite the original failure code
1661 	 * if there was one
1662 	 */
1663 	if (ret == LDB_SUCCESS) {
1664 		ret = ret2;
1665 	}
1666 
1667 	return ret;
1668 }
1669 
1670 /* extended */
partition_extended(struct ldb_module * module,struct ldb_request * req)1671 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1672 {
1673 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1674 							      struct partition_private_data);
1675 	struct partition_context *ac;
1676 	int ret;
1677 
1678 	/* if we aren't initialised yet go further */
1679 	if (!data) {
1680 		return ldb_next_request(module, req);
1681 	}
1682 
1683 	if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1684 		/* Update the metadata.tdb to increment the schema version if needed*/
1685 		DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1686 		ret = partition_metadata_inc_schema_sequence(module);
1687 		return ldb_module_done(req, NULL, NULL, ret);
1688 	}
1689 
1690 	if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1691 		return partition_sequence_number(module, req);
1692 	}
1693 
1694 	if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1695 		return partition_create(module, req);
1696 	}
1697 
1698 	/*
1699 	 * as the extended operation has no dn
1700 	 * we need to send it to all partitions
1701 	 */
1702 
1703 	ac = partition_init_ctx(module, req);
1704 	if (!ac) {
1705 		return ldb_operr(ldb_module_get_ctx(module));
1706 	}
1707 
1708 	return partition_send_all(module, ac, req);
1709 }
1710 
1711 static const struct ldb_module_ops ldb_partition_module_ops = {
1712 	.name		   = "partition",
1713 	.init_context	   = partition_init,
1714 	.search            = partition_search,
1715 	.add               = partition_add,
1716 	.modify            = partition_modify,
1717 	.del               = partition_delete,
1718 	.rename            = partition_rename,
1719 	.extended          = partition_extended,
1720 	.start_transaction = partition_start_trans,
1721 	.prepare_commit    = partition_prepare_commit,
1722 	.end_transaction   = partition_end_trans,
1723 	.del_transaction   = partition_del_trans,
1724 	.read_lock         = partition_read_lock,
1725 	.read_unlock       = partition_read_unlock
1726 };
1727 
ldb_partition_module_init(const char * version)1728 int ldb_partition_module_init(const char *version)
1729 {
1730 	LDB_MODULE_CHECK_VERSION(version);
1731 	return ldb_register_module(&ldb_partition_module_ops);
1732 }
1733