1 /* ndbio.cpp - get/set/del data for NDB */
2 /* OpenLDAP: pkg/ldap/servers/slapd/back-ndb/ndbio.cpp,v 1.3.2.3 2010/04/13 20:23:35 kurt Exp */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2008-2010 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16 /* ACKNOWLEDGEMENTS:
17  * This work was initially developed by Howard Chu for inclusion
18  * in OpenLDAP Software. This work was sponsored by MySQL.
19  */
20 
21 #include "portable.h"
22 
23 #include <stdio.h>
24 #include <ac/string.h>
25 #include <ac/errno.h>
26 #include <lutil.h>
27 
28 #include "back-ndb.h"
29 
30 /* For reference only */
31 typedef struct MedVar {
32 	Int16 len;	/* length is always little-endian */
33 	char buf[1024];
34 } MedVar;
35 
36 extern "C" {
37 	static int ndb_name_cmp( const void *v1, const void *v2 );
38 	static int ndb_oc_dup_err( void *v1, void *v2 );
39 };
40 
41 static int
42 ndb_name_cmp( const void *v1, const void *v2 )
43 {
44 	NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45 	return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
46 }
47 
48 static int
49 ndb_oc_dup_err( void *v1, void *v2 )
50 {
51 	NdbOcInfo *oc = (NdbOcInfo *)v2;
52 
53 	oc->no_oc = (ObjectClass *)v1;
54 	return -1;
55 }
56 
57 /* Find an existing NdbAttrInfo */
58 extern "C" NdbAttrInfo *
59 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
60 {
61 	NdbAttrInfo atmp;
62 	atmp.na_name = at->sat_cname;
63 
64 	return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
65 }
66 
67 /* Find or create an NdbAttrInfo */
68 extern "C" NdbAttrInfo *
69 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
70 {
71 	NdbAttrInfo atmp, *ai;
72 	atmp.na_name = *aname;
73 
74 	ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
75 	if ( !ai ) {
76 		const char *text;
77 		AttributeDescription *ad = NULL;
78 
79 		if ( slap_bv2ad( aname, &ad, &text ))
80 			return NULL;
81 
82 		ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
83 		ai->na_desc = ad;
84 		ai->na_attr = ai->na_desc->ad_type;
85 		ai->na_name = ai->na_attr->sat_cname;
86 		ai->na_oi = NULL;
87 		ai->na_flag = 0;
88 		ai->na_ixcol = 0;
89 		ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90 		/* Reasonable default */
91 		if ( !ai->na_len ) {
92 			if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
93 				ai->na_len = 1024;
94 			else
95 				ai->na_len = 128;
96 		}
97 		/* Arbitrary limit */
98 		if ( ai->na_len > 1024 )
99 			ai->na_len = 1024;
100 		avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
101 	}
102 	return ai;
103 }
104 
105 static int
106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
107 	int create )
108 {
109 	NdbAttrInfo *ai;
110 	int i;
111 
112 	for ( i=0; attrs[i]; i++ ) {
113 		if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
114 			continue;
115 		/* skip attrs that are in a superior */
116 		if ( oci->no_oc && oci->no_oc->soc_sups ) {
117 			int j, k, found=0;
118 			ObjectClass *oc;
119 			for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120 				oc = oci->no_oc->soc_sups[j];
121 				if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
122 					continue;
123 				if ( oc->soc_required ) {
124 					for ( k=0; oc->soc_required[k]; k++ ) {
125 						if ( attrs[i] == oc->soc_required[k] ) {
126 							found = 1;
127 							break;
128 						}
129 					}
130 					if ( found ) break;
131 				}
132 				if ( oc->soc_allowed ) {
133 					for ( k=0; oc->soc_allowed[k]; k++ ) {
134 						if ( attrs[i] == oc->soc_allowed[k] ) {
135 							found = 1;
136 							break;
137 						}
138 					}
139 					if ( found ) break;
140 				}
141 			}
142 			if ( found )
143 				continue;
144 		}
145 
146 		ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
147 		if ( !ai ) {
148 			/* can never happen */
149 			return LDAP_OTHER;
150 		}
151 
152 		/* An attrset may have already been connected */
153 		if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
154 			continue;
155 
156 		/* An indexed attr is defined before its OC is */
157 		if ( !ai->na_oi ) {
158 			ai->na_oi = oci;
159 			ai->na_column = (*col)++;
160 		}
161 
162 		oci->no_attrs[oci->no_nattrs++] = ai;
163 
164 		/* An attrset attr may already be defined */
165 		if ( ai->na_oi != oci ) {
166 			int j;
167 			for ( j=0; j<oci->no_nsets; j++ )
168 				if ( oci->no_sets[j] == ai->na_oi ) break;
169 			if ( j >= oci->no_nsets ) {
170 				/* FIXME: data loss if more sets are in use */
171 				if ( oci->no_nsets < NDB_MAX_OCSETS ) {
172 					oci->no_sets[oci->no_nsets++] = ai->na_oi;
173 				}
174 			}
175 			continue;
176 		}
177 
178 		if ( create ) {
179 			if ( ai->na_flag & NDB_INFO_ATBLOB ) {
180 				*ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
181 			} else {
182 				*ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
183 					ai->na_len );
184 			}
185 		}
186 	}
187 	return 0;
188 }
189 
190 static int
191 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
192 {
193 	char buf[4096], *ptr;
194 	int i, rc = 0, col;
195 
196 	if ( create ) {
197 		ptr = buf + sprintf( buf,
198 			"CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
199 			oci->no_table.bv_val );
200 	}
201 
202 	col = 0;
203 	if ( oci->no_oc->soc_required ) {
204 		for ( i=0; oci->no_oc->soc_required[i]; i++ );
205 		col += i;
206 	}
207 	if ( oci->no_oc->soc_allowed ) {
208 		for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
209 		col += i;
210 	}
211 	/* assume all are present */
212 	oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
213 
214 	col = 2;
215 	ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
216 	if ( oci->no_oc->soc_required ) {
217 		rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
218 	}
219 	if ( !rc && oci->no_oc->soc_allowed ) {
220 		rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
221 	}
222 	ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
223 
224 	/* shrink down to just the needed size */
225 	oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
226 		oci->no_nattrs * sizeof(struct ndb_attrinfo *));
227 
228 	if ( create ) {
229 		ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
230 		rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
231 		if ( rc ) {
232 			Debug( LDAP_DEBUG_ANY,
233 				"ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
234 				oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
235 		}
236 	}
237 	return rc;
238 }
239 
240 /* Read table definitions from the DB and populate ObjectClassInfo */
241 extern "C" int
242 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
243 {
244 	const NdbDictionary::Table *myTable;
245 	const NdbDictionary::Column *myCol;
246 	NdbOcInfo *oci, octmp;
247 	NdbAttrInfo *ai;
248 	ObjectClass *oc;
249 	NdbDictionary::Dictionary::List myList;
250 	struct berval bv;
251 	int i, j, rc, col;
252 
253 	rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
254 	/* Populate our objectClass structures */
255 	for ( i=0; i<myList.count; i++ ) {
256 		/* Ignore other DBs */
257 		if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
258 			continue;
259 		/* Ignore internal tables */
260 		if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
261 			continue;
262 		ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
263 		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
264 		if ( oci )
265 			continue;
266 
267 		oc = oc_bvfind( &octmp.no_name );
268 		if ( !oc ) {
269 			/* undefined - shouldn't happen */
270 			continue;
271 		}
272 		myTable = myDict->getTable( myList.elements[i].name );
273 		oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
274 		oci->no_table.bv_val = (char *)(oci+1);
275 		strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
276 		oci->no_table.bv_len = oc->soc_cname.bv_len;
277 		oci->no_name = oci->no_table;
278 		oci->no_oc = oc;
279 		oci->no_flag = 0;
280 		oci->no_nsets = 0;
281 		oci->no_nattrs = 0;
282 		col = 0;
283 		/* Make space for all attrs, even tho sups will be dropped */
284 		if ( oci->no_oc->soc_required ) {
285 			for ( j=0; oci->no_oc->soc_required[j]; j++ );
286 			col = j;
287 		}
288 		if ( oci->no_oc->soc_allowed ) {
289 			for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
290 			col += j;
291 		}
292 		oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
293 		avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
294 
295 		col = myTable->getNoOfColumns();
296 		/* Skip 0 and 1, eid and vid */
297 		for ( j = 2; j<col; j++ ) {
298 			myCol = myTable->getColumn( j );
299 			ber_str2bv( myCol->getName(), 0, 0, &bv );
300 			ai = ndb_ai_get( ni, &bv );
301 			/* shouldn't happen */
302 			if ( !ai )
303 				continue;
304 			ai->na_oi = oci;
305 			ai->na_column = j;
306 			ai->na_len = myCol->getLength();
307 			if ( myCol->getType() == NdbDictionary::Column::Blob )
308 				ai->na_flag |= NDB_INFO_ATBLOB;
309 		}
310 	}
311 	/* Link to any attrsets */
312 	for ( i=0; i<myList.count; i++ ) {
313 		/* Ignore other DBs */
314 		if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
315 			continue;
316 		/* Ignore internal tables */
317 		if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
318 			continue;
319 		ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
320 		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
321 		/* shouldn't happen */
322 		if ( !oci )
323 			continue;
324 		col = 2;
325 		if ( oci->no_oc->soc_required ) {
326 			rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
327 		}
328 		if ( oci->no_oc->soc_allowed ) {
329 			rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
330 		}
331 		/* shrink down to just the needed size */
332 		oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
333 			oci->no_nattrs * sizeof(struct ndb_attrinfo *));
334 	}
335 	return 0;
336 }
337 
338 static int
339 ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
340 	struct berval *oname, int implied, NdbOcs *out )
341 {
342 	const NdbDictionary::Table *myTable;
343 	NdbOcInfo *oci, octmp;
344 	ObjectClass *oc;
345 	int i, rc;
346 
347 	/* shortcut top */
348 	if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
349 		octmp.no_name = *oname;
350 		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
351 		if ( oci ) {
352 			oc = oci->no_oc;
353 		} else {
354 			oc = oc_bvfind( oname );
355 			if ( !oc ) {
356 				/* undefined - shouldn't happen */
357 				return LDAP_INVALID_SYNTAX;
358 			}
359 		}
360 		if ( oc->soc_sups ) {
361 			int i;
362 
363 			for ( i=0; oc->soc_sups[i]; i++ ) {
364 				rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
365 				if ( rc ) return rc;
366 			}
367 		}
368 	} else {
369 		oc = slap_schema.si_oc_top;
370 	}
371 	/* Only insert once */
372 	for ( i=0; i<out->no_ntext; i++ )
373 		if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
374 			break;
375 	if ( i == out->no_ntext ) {
376 		for ( i=0; i<out->no_nitext; i++ )
377 			if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
378 				break;
379 		if ( i == out->no_nitext ) {
380 			if ( implied )
381 				out->no_itext[out->no_nitext++] = oc->soc_cname;
382 			else
383 				out->no_text[out->no_ntext++] = oc->soc_cname;
384 		}
385 	}
386 
387 	/* ignore top, etc... */
388 	if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
389 		return 0;
390 
391 	if ( !oci ) {
392 		ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
393 		oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
394 		oci->no_table.bv_val = (char *)(oci+1);
395 		strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
396 		oci->no_table.bv_len = oc->soc_cname.bv_len;
397 		oci->no_name = oci->no_table;
398 		oci->no_oc = oc;
399 		oci->no_flag = 0;
400 		oci->no_nsets = 0;
401 		oci->no_nattrs = 0;
402 		ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
403 		if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
404 			octmp.no_oc = oci->no_oc;
405 			ch_free( oci );
406 			oci = (NdbOcInfo *)octmp.no_oc;
407 		}
408 		/* see if the oc table already exists in the DB */
409 		myTable = myDict->getTable( oci->no_table.bv_val );
410 		rc = ndb_oc_create( ni, oci, myTable == NULL );
411 		ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
412 		ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
413 		if ( rc ) return rc;
414 	}
415 	/* Only insert once */
416 	for ( i=0; i<out->no_ninfo; i++ )
417 		if ( out->no_info[i] == oci )
418 			break;
419 	if ( i == out->no_ninfo )
420 		out->no_info[out->no_ninfo++] = oci;
421 	return 0;
422 }
423 
424 extern "C" int
425 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
426 {
427 	NdbOcInfo *oci, octmp;
428 	int i, rc;
429 
430 	octmp.no_name = *sname;
431 	oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
432 	if ( oci )
433 		return LDAP_ALREADY_EXISTS;
434 
435 	for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
436 		if ( !at_bvfind( &attrs[i] ))
437 			return LDAP_NO_SUCH_ATTRIBUTE;
438 	}
439 	i++;
440 
441 	oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
442 		i*sizeof(AttributeType *) + sname->bv_len+1 );
443 	oci->no_oc = (ObjectClass *)(oci+1);
444 	oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
445 	oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
446 
447 	for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
448 		oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
449 
450 	strcpy( oci->no_table.bv_val, sname->bv_val );
451 	oci->no_table.bv_len = sname->bv_len;
452 	oci->no_name = oci->no_table;
453 	oci->no_oc->soc_cname = oci->no_name;
454 	oci->no_flag = NDB_INFO_ATSET;
455 
456 	if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
457 		oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
458 
459 	rc = ndb_oc_create( ni, oci, 0 );
460 	if ( !rc )
461 		rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
462 	if ( rc ) {
463 		ch_free( oci );
464 	} else {
465 		*ret = oci;
466 	}
467 	return rc;
468 }
469 
470 extern "C" int
471 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
472 {
473 	char buf[4096], *ptr;
474 	NdbAttrInfo *ai;
475 	int i;
476 
477 	ptr = buf + sprintf( buf,
478 		"CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
479 		oci->no_table.bv_val );
480 
481 	for ( i=0; i<oci->no_nattrs; i++ ) {
482 		if ( oci->no_attrs[i]->na_oi != oci )
483 			continue;
484 		ai = oci->no_attrs[i];
485 		ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
486 			ai->na_len );
487 		if ( ai->na_flag & NDB_INFO_INDEX ) {
488 			ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
489 		}
490 	}
491 	ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
492 	i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
493 	if ( i ) {
494 		Debug( LDAP_DEBUG_ANY,
495 			"ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
496 			oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
497 	}
498 	return i;
499 }
500 
501 static int
502 ndb_oc_check( BackendDB *be, Ndb *ndb,
503 	struct berval *ocsin, NdbOcs *out )
504 {
505 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
506 	const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
507 
508 	int i, rc = 0;
509 
510 	out->no_ninfo = 0;
511 	out->no_ntext = 0;
512 	out->no_nitext = 0;
513 
514 	/* Find all objectclasses and their superiors. List
515 	 * the superiors first.
516 	 */
517 
518 	ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
519 	for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
520 		rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
521 		if ( rc ) break;
522 	}
523 	ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
524 	return rc;
525 }
526 
527 #define	V_INS	1
528 #define	V_DEL	2
529 #define	V_REP	3
530 
531 static int ndb_flush_blobs;
532 
533 /* set all the unique attrs of this objectclass into the table
534  */
535 extern "C" int
536 ndb_oc_attrs(
537 	NdbTransaction *txn,
538 	const NdbDictionary::Table *myTable,
539 	Entry *e,
540 	NdbOcInfo *no,
541 	NdbAttrInfo **attrs,
542 	int nattrs,
543 	Attribute *old
544 )
545 {
546 	char buf[65538], *ptr;
547 	Attribute **an, **ao, *a;
548 	NdbOperation *myop;
549 	int i, j, max = 0;
550 	int changed, rc;
551 	Uint64 eid = e->e_id;
552 
553 	if ( !nattrs )
554 		return 0;
555 
556 	an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
557 	ao = an + nattrs;
558 
559 	/* Turn lists of attrs into arrays for easier access */
560 	for ( i=0; i<nattrs; i++ ) {
561 		if ( attrs[i]->na_oi != no ) {
562 			an[i] = NULL;
563 			ao[i] = NULL;
564 			continue;
565 		}
566 		for ( a=e->e_attrs; a; a=a->a_next ) {
567 			if ( a->a_desc == slap_schema.si_ad_objectClass )
568 				continue;
569 			if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
570 				/* Don't process same attr twice */
571 				if ( a->a_flags & SLAP_ATTR_IXADD )
572 					a = NULL;
573 				else
574 					a->a_flags |= SLAP_ATTR_IXADD;
575 				break;
576 			}
577 		}
578 		an[i] = a;
579 		if ( a && a->a_numvals > max )
580 			max = a->a_numvals;
581 		for ( a=old; a; a=a->a_next ) {
582 			if ( a->a_desc == slap_schema.si_ad_objectClass )
583 				continue;
584 			if ( a->a_desc->ad_type == attrs[i]->na_attr )
585 				break;
586 		}
587 		ao[i] = a;
588 		if ( a && a->a_numvals > max )
589 			max = a->a_numvals;
590 	}
591 
592 	for ( i=0; i<max; i++ ) {
593 		myop = NULL;
594 		for ( j=0; j<nattrs; j++ ) {
595 			if ( !an[j] && !ao[j] )
596 				continue;
597 			changed = 0;
598 			if ( an[j] && an[j]->a_numvals > i ) {
599 				/* both old and new are present, compare for changes */
600 				if ( ao[j] && ao[j]->a_numvals > i ) {
601 					if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
602 						changed = V_REP;
603 				} else {
604 					changed = V_INS;
605 				}
606 			} else {
607 				if ( ao[j] && ao[j]->a_numvals > i )
608 					changed = V_DEL;
609 			}
610 			if ( changed ) {
611 				if ( !myop ) {
612 					rc = LDAP_OTHER;
613 					myop = txn->getNdbOperation( myTable );
614 					if ( !myop ) {
615 						goto done;
616 					}
617 					if ( old ) {
618 						if ( myop->writeTuple()) {
619 							goto done;
620 						}
621 					} else {
622 						if ( myop->insertTuple()) {
623 							goto done;
624 						}
625 					}
626 					if ( myop->equal( EID_COLUMN, eid )) {
627 						goto done;
628 					}
629 					if ( myop->equal( VID_COLUMN, i )) {
630 						goto done;
631 					}
632 				}
633 				if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
634 					NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
635 					rc = LDAP_OTHER;
636 					if ( !myBlob ) {
637 						Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
638 							myop->getNdbError().message, myop->getNdbError().code, 0 );
639 						goto done;
640 					}
641 					if ( slapMode & SLAP_TOOL_MODE )
642 						ndb_flush_blobs = 1;
643 					if ( changed & V_INS ) {
644 						if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
645 							Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
646 								myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
647 							goto done;
648 						}
649 					} else {
650 						if ( myBlob->setValue( NULL, 0 )) {
651 							Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
652 								myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
653 							goto done;
654 						}
655 					}
656 				} else {
657 					if ( changed & V_INS ) {
658 						if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
659 							Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
660 								attrs[j]->na_name.bv_val, 0, 0 );
661 							rc = LDAP_CONSTRAINT_VIOLATION;
662 							goto done;
663 						}
664 						ptr = buf;
665 						*ptr++ = an[j]->a_vals[i].bv_len & 0xff;
666 						if ( attrs[j]->na_len > 255 ) {
667 							/* MedVar */
668 							*ptr++ = an[j]->a_vals[i].bv_len >> 8;
669 						}
670 						memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
671 						ptr = buf;
672 					} else {
673 						ptr = NULL;
674 					}
675 					if ( myop->setValue( attrs[j]->na_column, ptr )) {
676 						rc = LDAP_OTHER;
677 						goto done;
678 					}
679 				}
680 			}
681 		}
682 	}
683 	rc = LDAP_SUCCESS;
684 done:
685 	ch_free( an );
686 	if ( rc ) {
687 		Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
688 			myop->getNdbError().message, myop->getNdbError().code, 0 );
689 	}
690 	return rc;
691 }
692 
693 static int
694 ndb_oc_put(
695 	const NdbDictionary::Dictionary *myDict,
696 	NdbTransaction *txn, NdbOcInfo *no, Entry *e )
697 {
698 	const NdbDictionary::Table *myTable;
699 	int i, rc;
700 
701 	for ( i=0; i<no->no_nsets; i++ ) {
702 		rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
703 		if ( rc )
704 			return rc;
705 	}
706 
707 	myTable = myDict->getTable( no->no_table.bv_val );
708 	if ( !myTable )
709 		return LDAP_OTHER;
710 
711 	return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
712 }
713 
714 /* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
715 extern "C" int
716 ndb_entry_put_data(
717 	BackendDB *be,
718 	NdbArgs *NA
719 )
720 {
721 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
722 	Attribute *aoc;
723 	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
724 	NdbOcs myOcs;
725 	int i, rc;
726 
727 	/* Get the entry's objectClass attribute */
728 	aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
729 	if ( !aoc )
730 		return LDAP_OTHER;
731 
732 	ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
733 	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
734 
735 	/* Walk thru objectclasses, find all the attributes belonging to a class */
736 	for ( i=0; i<myOcs.no_ninfo; i++ ) {
737 		rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
738 		if ( rc ) return rc;
739 	}
740 
741 	/* slapadd tries to batch multiple entries per txn, but entry data is
742 	 * transient and blob data is required to remain valid for the whole txn.
743 	 * So we need to flush blobs before their source data disappears.
744 	 */
745 	if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
746 		NA->txn->execute( NdbTransaction::NoCommit );
747 
748 	return 0;
749 }
750 
751 static void
752 ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
753 {
754 	int i;
755 	NdbOcInfo  **ol2;
756 
757 	for ( i=0; i<no->no_nsets; i++ ) {
758 		ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
759 	}
760 
761 	/* Don't insert twice */
762 	ol2 = *oclist;
763 	for ( i=0; i<*j; i++ )
764 		if ( ol2[i] == no )
765 			return;
766 
767 	if ( *j >= *nocs ) {
768 		*nocs *= 2;
769 		ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
770 		*oclist = ol2;
771 	}
772 	ol2 = *oclist;
773 	ol2[(*j)++] = no;
774 }
775 
776 /* Retrieve attribute data for given entry. The entry's DN and eid should
777  * already be populated.
778  */
779 extern "C" int
780 ndb_entry_get_data(
781 	Operation *op,
782 	NdbArgs *NA,
783 	int update
784 )
785 {
786 	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
787 	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
788 	const NdbDictionary::Table *myTable;
789 	NdbIndexScanOperation **myop = NULL;
790 	Uint64 eid;
791 
792 	Attribute *a;
793 	NdbOcs myOcs;
794 	NdbOcInfo *oci, **oclist = NULL;
795 	char abuf[65536], *ptr, **attrs = NULL;
796 	struct berval bv[2];
797 	int *ocx = NULL;
798 
799 	/* FIXME: abuf should be dynamically allocated */
800 
801 	int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
802 
803 	eid = NA->e->e_id;
804 
805 	ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
806 	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
807 	nocs = myOcs.no_ninfo;
808 
809 	oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
810 
811 	for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
812 		ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
813 	}
814 
815 	nocs = j;
816 	nattrs = 0;
817 	for ( i=0; i<nocs; i++ )
818 		nattrs += oclist[i]->no_nattrs;
819 
820 	ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
821 
822 	attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
823 
824 	myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
825 
826 	k = 0;
827 	ptr = abuf;
828 	for ( i=0; i<nocs; i++ ) {
829 		oci = oclist[i];
830 
831 		myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
832 		if ( !myop[i] )
833 			goto leave;
834 		if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
835 			goto leave;
836 		if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
837 			goto leave;
838 
839 		for ( j=0; j<oci->no_nattrs; j++ ) {
840 			if ( oci->no_attrs[j]->na_oi != oci )
841 				continue;
842 			if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
843 				NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
844 				attrs[k++] = (char *)bi;
845 			} else {
846 				attrs[k] = ptr;
847 				*ptr++ = 0;
848 				if ( oci->no_attrs[j]->na_len > 255 )
849 					*ptr++ = 0;
850 				ptr += oci->no_attrs[j]->na_len + 1;
851 				myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
852 			}
853 		}
854 		ocx[i] = k;
855 	}
856 	/* Must use IgnoreError, because an entry with multiple objectClasses may not
857 	 * actually have attributes defined in each class / table.
858 	 */
859 	if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
860 		goto leave;
861 
862 	/* count results */
863 	for ( i=0; i<nocs; i++ ) {
864 		if (( j = myop[i]->nextResult(true) )) {
865 			if ( j < 0 ) {
866 				Debug( LDAP_DEBUG_TRACE,
867 					"ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
868 					i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
869 			}
870 			myop[i] = NULL;
871 		}
872 	}
873 
874 	nattrs = 0;
875 	k = 0;
876 	for ( i=0; i<nocs; i++ ) {
877 		oci = oclist[i];
878 		for ( j=0; j<oci->no_nattrs; j++ ) {
879 			unsigned char *buf;
880 			int len;
881 			if ( oci->no_attrs[j]->na_oi != oci )
882 				continue;
883 			if ( !myop[i] ) {
884 				attrs[k] = NULL;
885 			} else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
886 				void *vi = attrs[k];
887 				NdbBlob *bi = (NdbBlob *)vi;
888 				int isNull;
889 				bi->getNull( isNull );
890 				if ( !isNull ) {
891 					nattrs++;
892 				} else {
893 					attrs[k] = NULL;
894 				}
895 			} else {
896 				buf = (unsigned char *)attrs[k];
897 				len = buf[0];
898 				if ( oci->no_attrs[j]->na_len > 255 ) {
899 					/* MedVar */
900 					len |= (buf[1] << 8);
901 				}
902 				if ( len ) {
903 					nattrs++;
904 				} else {
905 					attrs[k] = NULL;
906 				}
907 			}
908 			k++;
909 		}
910 	}
911 
912 	a = attrs_alloc( nattrs+1 );
913 	NA->e->e_attrs = a;
914 
915 	a->a_desc = slap_schema.si_ad_objectClass;
916 	a->a_vals = NULL;
917 	ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
918 	a->a_nvals = a->a_vals;
919 	a->a_numvals = myOcs.no_ntext;
920 
921 	BER_BVZERO( &bv[1] );
922 
923 	do {
924 		a = NA->e->e_attrs->a_next;
925 		k = 0;
926 		for ( i=0; i<nocs; k=ocx[i], i++ ) {
927 			oci = oclist[i];
928 			for ( j=0; j<oci->no_nattrs; j++ ) {
929 				unsigned char *buf;
930 				struct berval nbv;
931 				if ( oci->no_attrs[j]->na_oi != oci )
932 					continue;
933 				buf = (unsigned char *)attrs[k++];
934 				if ( !buf )
935 					continue;
936 				if ( !myop[i] ) {
937 					a=a->a_next;
938 					continue;
939 				}
940 				if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
941 					void *vi = (void *)buf;
942 					NdbBlob *bi = (NdbBlob *)vi;
943 					Uint64 len;
944 					Uint32 len2;
945 					int isNull;
946 					bi->getNull( isNull );
947 					if ( isNull ) {
948 						a = a->a_next;
949 						continue;
950 					}
951 					bi->getLength( len );
952 					bv[0].bv_len = len;
953 					bv[0].bv_val = (char *)ch_malloc( len+1 );
954 					len2 = len;
955 					if ( bi->readData( bv[0].bv_val, len2 )) {
956 						Debug( LDAP_DEBUG_TRACE,
957 							"ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
958 							bi->getNdbError().message, bi->getNdbError().code, len2 );
959 					}
960 					bv[0].bv_val[len] = '\0';
961 					ber_bvarray_add_x( &a->a_vals, bv, NULL );
962 				} else {
963 					bv[0].bv_len = buf[0];
964 					if ( oci->no_attrs[j]->na_len > 255 ) {
965 						/* MedVar */
966 						bv[0].bv_len |= (buf[1] << 8);
967 						bv[0].bv_val = (char *)buf+2;
968 						buf[1] = 0;
969 					} else {
970 						bv[0].bv_val = (char *)buf+1;
971 					}
972 					buf[0] = 0;
973 					if ( bv[0].bv_len == 0 ) {
974 						a = a->a_next;
975 						continue;
976 					}
977 					bv[0].bv_val[bv[0].bv_len] = '\0';
978 					value_add_one( &a->a_vals, bv );
979 				}
980 				a->a_desc = oci->no_attrs[j]->na_desc;
981 				attr_normalize_one( a->a_desc, bv, &nbv, NULL );
982 				a->a_numvals++;
983 				if ( !BER_BVISNULL( &nbv )) {
984 					ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
985 				} else if ( !a->a_nvals ) {
986 					a->a_nvals = a->a_vals;
987 				}
988 				a = a->a_next;
989 			}
990 		}
991 		k = 0;
992 		for ( i=0; i<nocs; i++ ) {
993 			if ( !myop[i] )
994 				continue;
995 			if ((j = myop[i]->nextResult(true))) {
996 				if ( j < 0 ) {
997 					Debug( LDAP_DEBUG_TRACE,
998 						"ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
999 						i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
1000 				}
1001 				myop[i] = NULL;
1002 			} else {
1003 				k = 1;
1004 			}
1005 		}
1006 	} while ( k );
1007 
1008 	rc = 0;
1009 leave:
1010 	if ( myop ) {
1011 		op->o_tmpfree( myop, op->o_tmpmemctx );
1012 	}
1013 	if ( attrs ) {
1014 		op->o_tmpfree( attrs, op->o_tmpmemctx );
1015 	}
1016 	if ( ocx ) {
1017 		op->o_tmpfree( ocx, op->o_tmpmemctx );
1018 	}
1019 	if ( oclist ) {
1020 		op->o_tmpfree( oclist, op->o_tmpmemctx );
1021 	}
1022 
1023 	return rc;
1024 }
1025 
1026 static int
1027 ndb_oc_del(
1028 	NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
1029 {
1030 	NdbIndexScanOperation *myop;
1031 	int i, rc;
1032 
1033 	for ( i=0; i<no->no_nsets; i++ ) {
1034 		rc = ndb_oc_del( txn, eid, no->no_sets[i] );
1035 		if ( rc ) return rc;
1036 	}
1037 
1038 	myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
1039 	if ( !myop )
1040 		return LDAP_OTHER;
1041 	if ( myop->readTuples( NdbOperation::LM_Exclusive ))
1042 		return LDAP_OTHER;
1043 	if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
1044 		return LDAP_OTHER;
1045 
1046 	txn->execute(NoCommit);
1047 	while ( myop->nextResult(true) == 0) {
1048 		do {
1049 			myop->deleteCurrentTuple();
1050 		} while (myop->nextResult(false) == 0);
1051 		txn->execute(NoCommit);
1052 	}
1053 
1054 	return 0;
1055 }
1056 
1057 extern "C" int
1058 ndb_entry_del_data(
1059 	BackendDB *be,
1060 	NdbArgs *NA
1061 )
1062 {
1063 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064 	Uint64 eid = NA->e->e_id;
1065 	int i;
1066 	NdbOcs myOcs;
1067 
1068 	ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
1069 	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
1070 
1071 	for ( i=0; i<myOcs.no_ninfo; i++ ) {
1072 		if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
1073 			return LDAP_OTHER;
1074 	}
1075 
1076 	return 0;
1077 }
1078 
1079 extern "C" int
1080 ndb_dn2rdns(
1081 	struct berval *dn,
1082 	NdbRdns *rdns
1083 )
1084 {
1085 	char *beg, *end;
1086 	int i, len;
1087 
1088 	/* Walk thru RDNs */
1089 	end = dn->bv_val + dn->bv_len;
1090 	for ( i=0; i<NDB_MAX_RDNS; i++ ) {
1091 		for ( beg = end-1; beg > dn->bv_val; beg-- ) {
1092 			if (*beg == ',') {
1093 				beg++;
1094 				break;
1095 			}
1096 		}
1097 		if ( beg >= dn->bv_val ) {
1098 			len = end - beg;
1099 			/* RDN is too long */
1100 			if ( len > NDB_RDN_LEN )
1101 				return LDAP_CONSTRAINT_VIOLATION;
1102 			memcpy( rdns->nr_buf[i]+1, beg, len );
1103 		} else {
1104 			break;
1105 		}
1106 		rdns->nr_buf[i][0] = len;
1107 		end = beg - 1;
1108 	}
1109 	/* Too many RDNs in DN */
1110 	if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
1111 			return LDAP_CONSTRAINT_VIOLATION;
1112 	}
1113 	rdns->nr_num = i;
1114 	return 0;
1115 }
1116 
1117 static int
1118 ndb_rdns2keys(
1119 	NdbOperation *myop,
1120 	NdbRdns *rdns
1121 )
1122 {
1123 	int i;
1124 	char dummy[2] = {0,0};
1125 
1126 	/* Walk thru RDNs */
1127 	for ( i=0; i<rdns->nr_num; i++ ) {
1128 		if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
1129 			return LDAP_OTHER;
1130 	}
1131 	for ( ; i<NDB_MAX_RDNS; i++ ) {
1132 		if ( myop->equal( i+RDN_COLUMN, dummy ))
1133 			return LDAP_OTHER;
1134 	}
1135 	return 0;
1136 }
1137 
1138 /* Store the DN2ID_TABLE fields */
1139 extern "C" int
1140 ndb_entry_put_info(
1141 	BackendDB *be,
1142 	NdbArgs *NA,
1143 	int update
1144 )
1145 {
1146 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1147 	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1148 	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1149 	NdbOperation *myop;
1150 	NdbAttrInfo *ai;
1151 	Attribute *aoc, *a;
1152 
1153 	/* Get the entry's objectClass attribute; it's ok to be
1154 	 * absent on a fresh insert
1155 	 */
1156 	aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
1157 	if ( update && !aoc )
1158 		return LDAP_OBJECT_CLASS_VIOLATION;
1159 
1160 	myop = NA->txn->getNdbOperation( myTable );
1161 	if ( !myop )
1162 		return LDAP_OTHER;
1163 	if ( update ) {
1164 		if ( myop->updateTuple())
1165 			return LDAP_OTHER;
1166 	} else {
1167 		if ( myop->insertTuple())
1168 			return LDAP_OTHER;
1169 	}
1170 
1171 	if ( ndb_rdns2keys( myop, NA->rdns ))
1172 		return LDAP_OTHER;
1173 
1174 	/* Set entry ID */
1175 	{
1176 		Uint64 eid = NA->e->e_id;
1177 		if ( myop->setValue( EID_COLUMN, eid ))
1178 			return LDAP_OTHER;
1179 	}
1180 
1181 	/* Set list of objectClasses */
1182 	/* List is <sp> <class> <sp> <class> <sp> ... so that
1183 	 * searches for " class " will yield accurate results
1184 	 */
1185 	if ( aoc ) {
1186 		char *ptr, buf[sizeof(MedVar)];
1187 		NdbOcs myOcs;
1188 		int i;
1189 
1190 		ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
1191 		ptr = buf+2;
1192 		*ptr++ = ' ';
1193 		for ( i=0; i<myOcs.no_ntext; i++ ) {
1194 			/* data loss... */
1195 			if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
1196 				break;
1197 			ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
1198 			*ptr++ = ' ';
1199 		}
1200 
1201 		/* implicit classes */
1202 		if ( myOcs.no_nitext ) {
1203 			*ptr++ = '@';
1204 			*ptr++ = ' ';
1205 			for ( i=0; i<myOcs.no_nitext; i++ ) {
1206 				/* data loss... */
1207 				if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
1208 					break;
1209 				ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
1210 				*ptr++ = ' ';
1211 			}
1212 		}
1213 
1214 		i = ptr - buf - 2;
1215 		buf[0] = i & 0xff;
1216 		buf[1] = i >> 8;
1217 		if ( myop->setValue( OCS_COLUMN, buf ))
1218 			return LDAP_OTHER;
1219 	}
1220 
1221 	/* Set any indexed attrs */
1222 	for ( a = NA->e->e_attrs; a; a=a->a_next ) {
1223 		ai = ndb_ai_find( ni, a->a_desc->ad_type );
1224 		if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
1225 			char *ptr, buf[sizeof(MedVar)];
1226 			int len;
1227 
1228 			ptr = buf+1;
1229 			len = a->a_vals[0].bv_len;
1230 			/* FIXME: data loss */
1231 			if ( len > ai->na_len )
1232 				len = ai->na_len;
1233 			buf[0] = len & 0xff;
1234 			if ( ai->na_len > 255 ) {
1235 				*ptr++ = len >> 8;
1236 			}
1237 			memcpy( ptr, a->a_vals[0].bv_val, len );
1238 			if ( myop->setValue( ai->na_ixcol, buf ))
1239 				return LDAP_OTHER;
1240 		}
1241 	}
1242 
1243 	return 0;
1244 }
1245 
1246 extern "C" struct berval *
1247 ndb_str2bvarray(
1248 	char *str,
1249 	int len,
1250 	char delim,
1251 	void *ctx
1252 )
1253 {
1254 	struct berval *list, tmp;
1255 	char *beg;
1256 	int i, num;
1257 
1258 	while ( *str == delim ) {
1259 		str++;
1260 		len--;
1261 	}
1262 
1263 	while ( str[len-1] == delim ) {
1264 		str[--len] = '\0';
1265 	}
1266 
1267 	for ( i = 1, beg = str;; i++ ) {
1268 		beg = strchr( beg, delim );
1269 		if ( !beg )
1270 			break;
1271 		if ( beg >= str + len )
1272 			break;
1273 		beg++;
1274 	}
1275 
1276 	num = i;
1277 	list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
1278 
1279 	for ( i = 0, beg = str; i<num; i++ ) {
1280 		tmp.bv_val = beg;
1281 		beg = strchr( beg, delim );
1282 		if ( beg >= str + len )
1283 			beg = NULL;
1284 		if ( beg ) {
1285 			tmp.bv_len = beg - tmp.bv_val;
1286 		} else {
1287 			tmp.bv_len = len - (tmp.bv_val - str);
1288 		}
1289 		ber_dupbv_x( &list[i], &tmp, ctx );
1290 		beg++;
1291 	}
1292 
1293 	BER_BVZERO( &list[i] );
1294 	return list;
1295 }
1296 
1297 extern "C" struct berval *
1298 ndb_ref2oclist(
1299 	const char *ref,
1300 	void *ctx
1301 )
1302 {
1303 	char *implied;
1304 
1305 	/* MedVar */
1306 	int len = ref[0] | (ref[1] << 8);
1307 
1308 	/* don't return the implied classes */
1309 	implied = (char *)memchr( ref+2, '@', len );
1310 	if ( implied ) {
1311 		len = implied - ref - 2;
1312 		*implied = '\0';
1313 	}
1314 
1315 	return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
1316 }
1317 
1318 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1319  * the existence of a DN.
1320  */
1321 extern "C" int
1322 ndb_entry_get_info(
1323 	Operation *op,
1324 	NdbArgs *NA,
1325 	int update,
1326 	struct berval *matched
1327 )
1328 {
1329 	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1330 	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1331 	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1332 	NdbOperation *myop[NDB_MAX_RDNS];
1333 	NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
1334 	char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
1335 	char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
1336 
1337 	if ( matched ) {
1338 		BER_BVZERO( matched );
1339 	}
1340 	if ( !myTable ) {
1341 		return LDAP_OTHER;
1342 	}
1343 
1344 	myop[0] = NA->txn->getNdbOperation( myTable );
1345 	if ( !myop[0] ) {
1346 		return LDAP_OTHER;
1347 	}
1348 
1349 	if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1350 		return LDAP_OTHER;
1351 	}
1352 
1353 	if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1354 		return LDAP_NO_SUCH_OBJECT;
1355 	}
1356 
1357 	if ( ndb_rdns2keys( myop[0], NA->rdns )) {
1358 		return LDAP_OTHER;
1359 	}
1360 
1361 	eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
1362 	if ( !eid[0] ) {
1363 		return LDAP_OTHER;
1364 	}
1365 
1366 	ocbuf[0][0] = 0;
1367 	ocbuf[0][1] = 0;
1368 	if ( !NA->ocs ) {
1369 		oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
1370 		if ( !oc[0] ) {
1371 			return LDAP_OTHER;
1372 		}
1373 	}
1374 
1375 	if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1376 		return LDAP_OTHER;
1377 	}
1378 
1379 	switch( myop[0]->getNdbError().code ) {
1380 	case 0:
1381 		if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
1382 			/* If we didn't care about OCs, or we got them */
1383 			if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
1384 				/* If wanted, return them */
1385 				if ( !NA->ocs )
1386 					NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
1387 				break;
1388 			}
1389 		}
1390 		/* FALLTHRU */
1391 	case NDB_NO_SUCH_OBJECT:	/* no such tuple: look for closest parent */
1392 		if ( matched ) {
1393 			int i, j, k;
1394 			char dummy[2] = {0,0};
1395 
1396 			/* get to last RDN, then back up 1 */
1397 			k = NA->rdns->nr_num - 1;
1398 
1399 			for ( i=0; i<k; i++ ) {
1400 				myop[i] = NA->txn->getNdbOperation( myTable );
1401 				if ( !myop[i] )
1402 					return LDAP_OTHER;
1403 				if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
1404 					return LDAP_OTHER;
1405 				for ( j=0; j<=i; j++ ) {
1406 					if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1407 						return LDAP_OTHER;
1408 				}
1409 				for ( ;j<NDB_MAX_RDNS; j++ ) {
1410 					if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
1411 						return LDAP_OTHER;
1412 				}
1413 				eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
1414 				if ( !eid[i] ) {
1415 					return LDAP_OTHER;
1416 				}
1417 				ocbuf[i][0] = 0;
1418 				ocbuf[i][1] = 0;
1419 				if ( !NA->ocs ) {
1420 					oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
1421 					if ( !oc[i] ) {
1422 						return LDAP_OTHER;
1423 					}
1424 				}
1425 			}
1426 			if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1427 				return LDAP_OTHER;
1428 			}
1429 			for ( --i; i>=0; i-- ) {
1430 				if ( myop[i]->getNdbError().code == 0 ) {
1431 					for ( j=0; j<=i; j++ )
1432 						matched->bv_len += NA->rdns->nr_buf[j][0];
1433 					NA->erdns = NA->rdns->nr_num;
1434 					NA->rdns->nr_num = j;
1435 					matched->bv_len += i;
1436 					matched->bv_val = NA->e->e_name.bv_val +
1437 						NA->e->e_name.bv_len - matched->bv_len;
1438 					if ( !eid[i]->isNULL() )
1439 						NA->e->e_id = eid[i]->u_64_value();
1440 					if ( !NA->ocs )
1441 						NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
1442 					break;
1443 				}
1444 			}
1445 		}
1446 		return LDAP_NO_SUCH_OBJECT;
1447 	default:
1448 		return LDAP_OTHER;
1449 	}
1450 
1451 	return 0;
1452 }
1453 
1454 extern "C" int
1455 ndb_entry_del_info(
1456 	BackendDB *be,
1457 	NdbArgs *NA
1458 )
1459 {
1460 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1461 	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1462 	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1463 	NdbOperation *myop;
1464 
1465 	myop = NA->txn->getNdbOperation( myTable );
1466 	if ( !myop )
1467 		return LDAP_OTHER;
1468 	if ( myop->deleteTuple())
1469 		return LDAP_OTHER;
1470 
1471 	if ( ndb_rdns2keys( myop, NA->rdns ))
1472 		return LDAP_OTHER;
1473 
1474 	return 0;
1475 }
1476 
1477 extern "C" int
1478 ndb_next_id(
1479 	BackendDB *be,
1480 	Ndb *ndb,
1481 	ID *id
1482 )
1483 {
1484 	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1485 	const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1486 	const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1487 	Uint64 nid = 0;
1488 	int rc;
1489 
1490 	if ( !myTable ) {
1491 		Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1492 			0, 0, 0 );
1493 		return LDAP_OTHER;
1494 	}
1495 
1496 	rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1497 	if ( !rc )
1498 		*id = nid;
1499 	return rc;
1500 }
1501 
1502 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1503 static void
1504 ndb_thread_hfree( void *key, void *data )
1505 {
1506 	Ndb *ndb = (Ndb *)data;
1507 	delete ndb;
1508 }
1509 
1510 extern "C" int
1511 ndb_thread_handle(
1512 	Operation *op,
1513 	Ndb **ndb )
1514 {
1515 	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1516 	void *data;
1517 
1518 	if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1519 		Ndb *myNdb;
1520 		int rc;
1521 		ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1522 		myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1523 		if ( ni->ni_nextconn >= ni->ni_nconns )
1524 			ni->ni_nextconn = 0;
1525 		ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1526 		if ( !myNdb ) {
1527 			return LDAP_OTHER;
1528 		}
1529 		rc = myNdb->init(1024);
1530 		if ( rc ) {
1531 			delete myNdb;
1532 			Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1533 				rc, 0, 0 );
1534 			return rc;
1535 		}
1536 		data = (void *)myNdb;
1537 		if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1538 			data, ndb_thread_hfree, NULL, NULL ))) {
1539 			delete myNdb;
1540 			Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1541 				rc, 0, 0 );
1542 			return rc;
1543 		}
1544 	}
1545 	*ndb = (Ndb *)data;
1546 	return 0;
1547 }
1548 
1549 extern "C" int
1550 ndb_entry_get(
1551 	Operation *op,
1552 	struct berval *ndn,
1553 	ObjectClass *oc,
1554 	AttributeDescription *ad,
1555 	int rw,
1556 	Entry **ent )
1557 {
1558 	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1559 	NdbArgs NA;
1560 	Entry e = {0};
1561 	int rc;
1562 
1563 	/* Get our NDB handle */
1564 	rc = ndb_thread_handle( op, &NA.ndb );
1565 
1566 	NA.txn = NA.ndb->startTransaction();
1567 	if( !NA.txn ) {
1568 		Debug( LDAP_DEBUG_TRACE,
1569 			LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1570 			NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1571 		return 1;
1572 	}
1573 
1574 	e.e_name = *ndn;
1575 	NA.e = &e;
1576 	/* get entry */
1577 	{
1578 		NdbRdns rdns;
1579 		rdns.nr_num = 0;
1580 		NA.ocs = NULL;
1581 		NA.rdns = &rdns;
1582 		rc = ndb_entry_get_info( op, &NA, rw, NULL );
1583 	}
1584 	if ( rc == 0 ) {
1585 		e.e_name = *ndn;
1586 		e.e_nname = *ndn;
1587 		rc = ndb_entry_get_data( op, &NA, 0 );
1588 		ber_bvarray_free( NA.ocs );
1589 		if ( rc == 0 ) {
1590 			if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1591 				attrs_free( e.e_attrs );
1592 				rc = 1;
1593 			}
1594 		}
1595 	}
1596 	if ( rc == 0 ) {
1597 		*ent = entry_alloc();
1598 		**ent = e;
1599 		ber_dupbv( &(*ent)->e_name, ndn );
1600 		ber_dupbv( &(*ent)->e_nname, ndn );
1601 	} else {
1602 		rc = 1;
1603 	}
1604 	NA.txn->close();
1605 	return rc;
1606 }
1607 
1608 /* Congestion avoidance code
1609  * for Deadlock Rollback
1610  */
1611 
1612 extern "C" void
1613 ndb_trans_backoff( int num_retries )
1614 {
1615 	int i;
1616 	int delay = 0;
1617 	int pow_retries = 1;
1618 	unsigned long key = 0;
1619 	unsigned long max_key = -1;
1620 	struct timeval timeout;
1621 
1622 	lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1623 
1624 	for ( i = 0; i < num_retries; i++ ) {
1625 		if ( i >= 5 ) break;
1626 		pow_retries *= 4;
1627 	}
1628 
1629 	delay = 16384 * (key * (double) pow_retries / (double) max_key);
1630 	delay = delay ? delay : 1;
1631 
1632 	Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1633 
1634 	timeout.tv_sec = delay / 1000000;
1635 	timeout.tv_usec = delay % 1000000;
1636 	select( 0, NULL, NULL, NULL, &timeout );
1637 }
1638 
1639 extern "C" void
1640 ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
1641 {
1642 	struct berval dn, ndn;
1643 	int i, dif;
1644 	dif = NA->erdns - NA->rdns->nr_num;
1645 
1646 	/* Set full DN of matched into entry */
1647 	for ( i=0; i<dif; i++ ) {
1648 		dnParent( &NA->e->e_name, &dn );
1649 		dnParent( &NA->e->e_nname, &ndn );
1650 		NA->e->e_name = dn;
1651 		NA->e->e_nname = ndn;
1652 	}
1653 
1654 	/* return referral only if "disclose" is granted on the object */
1655 	if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
1656 		NULL, ACL_DISCLOSE, NULL )) {
1657 		Attribute a;
1658 		for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
1659 		a.a_numvals = i;
1660 		a.a_desc = slap_schema.si_ad_objectClass;
1661 		a.a_vals = NA->ocs;
1662 		a.a_nvals = NA->ocs;
1663 		a.a_next = NULL;
1664 		NA->e->e_attrs = &a;
1665 		if ( is_entry_referral( NA->e )) {
1666 			NA->e->e_attrs = NULL;
1667 			ndb_entry_get_data( op, NA, 0 );
1668 			rs->sr_ref = get_entry_referrals( op, NA->e );
1669 			if ( rs->sr_ref ) {
1670 				rs->sr_err = LDAP_REFERRAL;
1671 				rs->sr_flags |= REP_REF_MUSTBEFREED;
1672 			}
1673 			attrs_free( NA->e->e_attrs );
1674 		}
1675 		NA->e->e_attrs = NULL;
1676 	}
1677 }
1678