1 /*
2    ctdb ltdb code
3 
4    Copyright (C) Andrew Tridgell  2006
5    Copyright (C) Ronnie sahlberg  2011
6 
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11 
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16 
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 
25 #include <tdb.h>
26 
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30 
31 #include "ctdb_private.h"
32 
33 #include "common/common.h"
34 #include "common/logging.h"
35 
36 
37 /*
38  * Calculate tdb flags based on databse type
39  */
ctdb_db_tdb_flags(uint8_t db_flags,bool with_valgrind,bool with_mutex)40 int ctdb_db_tdb_flags(uint8_t db_flags, bool with_valgrind, bool with_mutex)
41 {
42 	int tdb_flags = 0;
43 
44 	if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
45 		tdb_flags = TDB_DEFAULT;
46 
47 	} else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
48 		tdb_flags = TDB_NOSYNC |
49 			    TDB_CLEAR_IF_FIRST |
50 			    TDB_INCOMPATIBLE_HASH;
51 
52 	} else {
53 		tdb_flags = TDB_NOSYNC |
54 			    TDB_CLEAR_IF_FIRST |
55 			    TDB_INCOMPATIBLE_HASH;
56 
57 #ifdef TDB_MUTEX_LOCKING
58 		if (with_mutex && tdb_runtime_check_for_robust_mutexes()) {
59 			tdb_flags |= TDB_MUTEX_LOCKING;
60 		}
61 #endif
62 
63 	}
64 
65 	tdb_flags |= TDB_DISALLOW_NESTING;
66 	if (with_valgrind) {
67 		tdb_flags |= TDB_NOMMAP;
68 	}
69 
70 	return tdb_flags;
71 }
72 
73 /*
74   find an attached ctdb_db handle given a name
75  */
ctdb_db_handle(struct ctdb_context * ctdb,const char * name)76 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
77 {
78 	struct ctdb_db_context *tmp_db;
79 	for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
80 		if (strcmp(name, tmp_db->db_name) == 0) {
81 			return tmp_db;
82 		}
83 	}
84 	return NULL;
85 }
86 
ctdb_db_persistent(struct ctdb_db_context * ctdb_db)87 bool ctdb_db_persistent(struct ctdb_db_context *ctdb_db)
88 {
89 	if (ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
90 		return true;
91 	}
92 	return false;
93 }
94 
ctdb_db_replicated(struct ctdb_db_context * ctdb_db)95 bool ctdb_db_replicated(struct ctdb_db_context *ctdb_db)
96 {
97 	if (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
98 		return true;
99 	}
100 	return false;
101 }
102 
ctdb_db_volatile(struct ctdb_db_context * ctdb_db)103 bool ctdb_db_volatile(struct ctdb_db_context *ctdb_db)
104 {
105 	if ((ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
106 	    (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
107 		return false;
108 	}
109 	return true;
110 }
111 
ctdb_db_readonly(struct ctdb_db_context * ctdb_db)112 bool ctdb_db_readonly(struct ctdb_db_context *ctdb_db)
113 {
114 	if (ctdb_db->db_flags & CTDB_DB_FLAGS_READONLY) {
115 		return true;
116 	}
117 	return false;
118 }
119 
ctdb_db_set_readonly(struct ctdb_db_context * ctdb_db)120 void ctdb_db_set_readonly(struct ctdb_db_context *ctdb_db)
121 {
122 	ctdb_db->db_flags |= CTDB_DB_FLAGS_READONLY;
123 }
124 
ctdb_db_reset_readonly(struct ctdb_db_context * ctdb_db)125 void ctdb_db_reset_readonly(struct ctdb_db_context *ctdb_db)
126 {
127 	ctdb_db->db_flags &= ~CTDB_DB_FLAGS_READONLY;
128 }
129 
ctdb_db_sticky(struct ctdb_db_context * ctdb_db)130 bool ctdb_db_sticky(struct ctdb_db_context *ctdb_db)
131 {
132 	if (ctdb_db->db_flags & CTDB_DB_FLAGS_STICKY) {
133 		return true;
134 	}
135 	return false;
136 }
137 
ctdb_db_set_sticky(struct ctdb_db_context * ctdb_db)138 void ctdb_db_set_sticky(struct ctdb_db_context *ctdb_db)
139 {
140 	ctdb_db->db_flags |= CTDB_DB_FLAGS_STICKY;
141 }
142 
143 /*
144   return the lmaster given a key
145 */
ctdb_lmaster(struct ctdb_context * ctdb,const TDB_DATA * key)146 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
147 {
148 	uint32_t idx, lmaster;
149 
150 	idx = ctdb_hash(key) % ctdb->vnn_map->size;
151 	lmaster = ctdb->vnn_map->map[idx];
152 
153 	return lmaster;
154 }
155 
156 
157 /*
158   construct an initial header for a record with no ltdb header yet
159 */
ltdb_initial_header(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header)160 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
161 				TDB_DATA key,
162 				struct ctdb_ltdb_header *header)
163 {
164 	ZERO_STRUCTP(header);
165 	/* initial dmaster is the lmaster */
166 	header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
167 	header->flags = CTDB_REC_FLAG_AUTOMATIC;
168 }
169 
170 struct ctdb_ltdb_fetch_state {
171 	struct ctdb_ltdb_header *header;
172 	TALLOC_CTX *mem_ctx;
173 	TDB_DATA *data;
174 	int ret;
175 	bool found;
176 };
177 
/*
  tdb_parse_record() callback used by ctdb_ltdb_fetch()

  private_data is a struct ctdb_ltdb_fetch_state.  A record shorter than
  an ltdb header is ignored (state->found stays false).  Otherwise the
  header is copied into state->header and, if state->data is not NULL,
  the bytes following the header are duplicated onto state->mem_ctx.  A
  failed duplication is reported through state->ret; the callback itself
  always returns 0.
*/
static int ctdb_ltdb_fetch_fn(TDB_DATA key, TDB_DATA data, void *private_data)
{
	struct ctdb_ltdb_fetch_state *fetch = private_data;
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	TDB_DATA *body = fetch->data;

	if (data.dsize < hdr_len) {
		/* too short to contain a header */
		return 0;
	}

	memcpy(fetch->header, data.dptr, hdr_len);
	fetch->found = true;

	if (body == NULL) {
		/* caller only asked for the header */
		return 0;
	}

	body->dsize = data.dsize - hdr_len;
	body->dptr = talloc_memdup(fetch->mem_ctx,
				   data.dptr + hdr_len,
				   body->dsize);
	if (body->dptr == NULL) {
		fetch->ret = -1;
	}

	return 0;
}
204 
205 /*
206   fetch a record from the ltdb, separating out the header information
207   and returning the body of the record. A valid (initial) header is
208   returned if the record is not present
209 */
ctdb_ltdb_fetch(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header,TALLOC_CTX * mem_ctx,TDB_DATA * data)210 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
211 		    TDB_DATA key, struct ctdb_ltdb_header *header,
212 		    TALLOC_CTX *mem_ctx, TDB_DATA *data)
213 {
214 	struct ctdb_context *ctdb = ctdb_db->ctdb;
215 	struct ctdb_ltdb_fetch_state state = {
216 		.header = header,
217 		.mem_ctx = mem_ctx,
218 		.data = data,
219 		.found = false,
220 	};
221 	int ret;
222 
223 	ret = tdb_parse_record(
224 		ctdb_db->ltdb->tdb, key, ctdb_ltdb_fetch_fn, &state);
225 
226 	if (ret == -1) {
227 		enum TDB_ERROR err = tdb_error(ctdb_db->ltdb->tdb);
228 		if (err != TDB_ERR_NOEXIST) {
229 			return -1;
230 		}
231 	}
232 
233 	if (state.ret != 0) {
234 		DBG_DEBUG("ctdb_ltdb_fetch_fn failed\n");
235 		return state.ret;
236 	}
237 
238 	if (state.found) {
239 		return 0;
240 	}
241 
242 	if (data != NULL) {
243 		*data = tdb_null;
244 	}
245 
246 	if (ctdb->vnn_map == NULL) {
247 		/* called from the client */
248 		header->dmaster = (uint32_t)-1;
249 		return -1;
250 	}
251 
252 	ltdb_initial_header(ctdb_db, key, header);
253 	if (ctdb_db_persistent(ctdb_db) ||
254 	    header->dmaster == ctdb_db->ctdb->pnn) {
255 
256 		ret = ctdb_ltdb_store(ctdb_db, key, header, tdb_null);
257 		if (ret != 0) {
258 			DBG_NOTICE("failed to store initial header\n");
259 		}
260 	}
261 
262 	return 0;
263 }
264 
265 /*
266   write a record to a normal database
267 */
ctdb_ltdb_store(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header,TDB_DATA data)268 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
269 		    struct ctdb_ltdb_header *header, TDB_DATA data)
270 {
271 	struct ctdb_context *ctdb = ctdb_db->ctdb;
272 	TDB_DATA rec[2];
273 	uint32_t hsize = sizeof(struct ctdb_ltdb_header);
274 	int ret;
275 
276 	if (ctdb_db->ctdb_ltdb_store_fn) {
277 		return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
278 	}
279 
280 	if (ctdb->flags & CTDB_FLAG_TORTURE) {
281 		TDB_DATA old;
282 		struct ctdb_ltdb_header *h2;
283 
284 		old = tdb_fetch(ctdb_db->ltdb->tdb, key);
285 		h2 = (struct ctdb_ltdb_header *)old.dptr;
286 		if (old.dptr != NULL && old.dsize >= hsize &&
287 		    h2->rsn > header->rsn) {
288 			DEBUG(DEBUG_ERR,
289 			      ("RSN regression! %"PRIu64" %"PRIu64"\n",
290 			       h2->rsn, header->rsn));
291 		}
292 		if (old.dptr != NULL) {
293 			free(old.dptr);
294 		}
295 	}
296 
297 	rec[0].dsize = hsize;
298 	rec[0].dptr = (uint8_t *)header;
299 
300 	rec[1].dsize = data.dsize;
301 	rec[1].dptr = data.dptr;
302 
303 	ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
304 	if (ret != 0) {
305 		DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
306 	}
307 
308 	return ret;
309 }
310 
311 /*
312   lock a record in the ltdb, given a key
313  */
ctdb_ltdb_lock(struct ctdb_db_context * ctdb_db,TDB_DATA key)314 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
315 {
316 	return tdb_chainlock(ctdb_db->ltdb->tdb, key);
317 }
318 
319 /*
320   unlock a record in the ltdb, given a key
321  */
ctdb_ltdb_unlock(struct ctdb_db_context * ctdb_db,TDB_DATA key)322 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
323 {
324 	int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
325 	if (ret != 0) {
326  		DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
327 	}
328 	return ret;
329 }
330 
331 
332 /*
333   delete a record from a normal database
334 */
ctdb_ltdb_delete(struct ctdb_db_context * ctdb_db,TDB_DATA key)335 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
336 {
337 	if (! ctdb_db_volatile(ctdb_db)) {
338 		DEBUG(DEBUG_WARNING,
339 		      ("Ignored deletion of empty record from "
340 		       "non-volatile database\n"));
341 		return 0;
342 	}
343 	if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
344 		DEBUG(DEBUG_ERR,("Failed to delete empty record."));
345 		return -1;
346 	}
347 	return 0;
348 }
349 
/*
  mark a pnn in a tracking bitmap

  data is used as a bitmap with one bit per pnn: bit (pnn % 8) of byte
  (pnn / 8).  If the bitmap is too short to hold the bit, a larger
  zero-filled buffer is allocated, the old contents are copied over and
  the old buffer is released with free() - so data->dptr must be NULL or
  heap memory obtained from malloc()/calloc().

  Returns 0 on success, -1 if the bitmap could not be grown; in that
  case data is left unmodified.
*/
int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn)
{
	unsigned int byte_pos = pnn / 8;
	unsigned char bit_mask = 1 << (pnn % 8);

	if (byte_pos + 1 > data->dsize) {
		uint8_t *buf;

		/*
		 * calloc() hands back zeroed memory, and the result is
		 * checked before the buffer is touched.
		 */
		buf = calloc(byte_pos + 1, 1);
		if (buf == NULL) {
			DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %u bytes for trackingdb\n", byte_pos + 1));
			return -1;
		}
		if (data->dptr != NULL) {
			memcpy(buf, data->dptr, data->dsize);
			free(data->dptr);
		}
		data->dptr  = buf;
		data->dsize = byte_pos + 1;
	}

	data->dptr[byte_pos] |= bit_mask;
	return 0;
}
375 
/*
  walk a tracking bitmap and invoke cb for every bit that is set

  The bitmap is scanned byte by byte, lowest bit first; for a set bit j
  in byte i the callback is called as cb(ctdb, i * 8 + j, private_data).
*/
void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data)
{
	unsigned int byte_idx;
	unsigned int bit_idx;

	for (byte_idx = 0; byte_idx < data.dsize; byte_idx++) {
		for (bit_idx = 0; bit_idx < 8; bit_idx++) {
			int mask = 1 << bit_idx;

			if (!(data.dptr[byte_idx] & mask)) {
				continue;
			}
			cb(ctdb, byte_idx * 8 + bit_idx, private_data);
		}
	}
}
392 
/*
  this is the dummy null procedure that all databases support

  The call is not inspected or modified; the result is always 0.
*/
int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;

	return 0;
}
400 
401 /*
402   this is a plain fetch procedure that all databases support
403 */
ctdb_fetch_func(struct ctdb_call_info * call)404 int ctdb_fetch_func(struct ctdb_call_info *call)
405 {
406 	call->reply_data = &call->record_data;
407 	return 0;
408 }
409 
410 /*
411   this is a plain fetch procedure that all databases support
412   this returns the full record including the ltdb header
413 */
ctdb_fetch_with_header_func(struct ctdb_call_info * call)414 int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
415 {
416 	call->reply_data = talloc(call, TDB_DATA);
417 	if (call->reply_data == NULL) {
418 		return -1;
419 	}
420 	call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
421 	call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
422 	if (call->reply_data->dptr == NULL) {
423 		return -1;
424 	}
425 	memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
426 	memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
427 
428 	return 0;
429 }
430 
431