1 /*
2 ctdb ltdb code
3
4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie sahlberg 2011
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24
25 #include <tdb.h>
26
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
30
31 #include "ctdb_private.h"
32
33 #include "common/common.h"
34 #include "common/logging.h"
35
36
37 /*
38 * Calculate tdb flags based on databse type
39 */
ctdb_db_tdb_flags(uint8_t db_flags,bool with_valgrind,bool with_mutex)40 int ctdb_db_tdb_flags(uint8_t db_flags, bool with_valgrind, bool with_mutex)
41 {
42 int tdb_flags = 0;
43
44 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
45 tdb_flags = TDB_DEFAULT;
46
47 } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
48 tdb_flags = TDB_NOSYNC |
49 TDB_CLEAR_IF_FIRST |
50 TDB_INCOMPATIBLE_HASH;
51
52 } else {
53 tdb_flags = TDB_NOSYNC |
54 TDB_CLEAR_IF_FIRST |
55 TDB_INCOMPATIBLE_HASH;
56
57 #ifdef TDB_MUTEX_LOCKING
58 if (with_mutex && tdb_runtime_check_for_robust_mutexes()) {
59 tdb_flags |= TDB_MUTEX_LOCKING;
60 }
61 #endif
62
63 }
64
65 tdb_flags |= TDB_DISALLOW_NESTING;
66 if (with_valgrind) {
67 tdb_flags |= TDB_NOMMAP;
68 }
69
70 return tdb_flags;
71 }
72
73 /*
74 find an attached ctdb_db handle given a name
75 */
ctdb_db_handle(struct ctdb_context * ctdb,const char * name)76 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
77 {
78 struct ctdb_db_context *tmp_db;
79 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
80 if (strcmp(name, tmp_db->db_name) == 0) {
81 return tmp_db;
82 }
83 }
84 return NULL;
85 }
86
ctdb_db_persistent(struct ctdb_db_context * ctdb_db)87 bool ctdb_db_persistent(struct ctdb_db_context *ctdb_db)
88 {
89 if (ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
90 return true;
91 }
92 return false;
93 }
94
ctdb_db_replicated(struct ctdb_db_context * ctdb_db)95 bool ctdb_db_replicated(struct ctdb_db_context *ctdb_db)
96 {
97 if (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
98 return true;
99 }
100 return false;
101 }
102
ctdb_db_volatile(struct ctdb_db_context * ctdb_db)103 bool ctdb_db_volatile(struct ctdb_db_context *ctdb_db)
104 {
105 if ((ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
106 (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
107 return false;
108 }
109 return true;
110 }
111
ctdb_db_readonly(struct ctdb_db_context * ctdb_db)112 bool ctdb_db_readonly(struct ctdb_db_context *ctdb_db)
113 {
114 if (ctdb_db->db_flags & CTDB_DB_FLAGS_READONLY) {
115 return true;
116 }
117 return false;
118 }
119
ctdb_db_set_readonly(struct ctdb_db_context * ctdb_db)120 void ctdb_db_set_readonly(struct ctdb_db_context *ctdb_db)
121 {
122 ctdb_db->db_flags |= CTDB_DB_FLAGS_READONLY;
123 }
124
ctdb_db_reset_readonly(struct ctdb_db_context * ctdb_db)125 void ctdb_db_reset_readonly(struct ctdb_db_context *ctdb_db)
126 {
127 ctdb_db->db_flags &= ~CTDB_DB_FLAGS_READONLY;
128 }
129
ctdb_db_sticky(struct ctdb_db_context * ctdb_db)130 bool ctdb_db_sticky(struct ctdb_db_context *ctdb_db)
131 {
132 if (ctdb_db->db_flags & CTDB_DB_FLAGS_STICKY) {
133 return true;
134 }
135 return false;
136 }
137
ctdb_db_set_sticky(struct ctdb_db_context * ctdb_db)138 void ctdb_db_set_sticky(struct ctdb_db_context *ctdb_db)
139 {
140 ctdb_db->db_flags |= CTDB_DB_FLAGS_STICKY;
141 }
142
143 /*
144 return the lmaster given a key
145 */
ctdb_lmaster(struct ctdb_context * ctdb,const TDB_DATA * key)146 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
147 {
148 uint32_t idx, lmaster;
149
150 idx = ctdb_hash(key) % ctdb->vnn_map->size;
151 lmaster = ctdb->vnn_map->map[idx];
152
153 return lmaster;
154 }
155
156
157 /*
158 construct an initial header for a record with no ltdb header yet
159 */
ltdb_initial_header(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header)160 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
161 TDB_DATA key,
162 struct ctdb_ltdb_header *header)
163 {
164 ZERO_STRUCTP(header);
165 /* initial dmaster is the lmaster */
166 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
167 header->flags = CTDB_REC_FLAG_AUTOMATIC;
168 }
169
170 struct ctdb_ltdb_fetch_state {
171 struct ctdb_ltdb_header *header;
172 TALLOC_CTX *mem_ctx;
173 TDB_DATA *data;
174 int ret;
175 bool found;
176 };
177
/*
  tdb_parse_record() callback used by ctdb_ltdb_fetch()

  private_data is a struct ctdb_ltdb_fetch_state.  A record shorter
  than an ltdb header is ignored (state->found stays false).
  Otherwise the header is copied out and, when a body destination
  was supplied, the remainder of the record is duplicated onto
  state->mem_ctx.  An allocation failure is reported via state->ret;
  the callback itself always returns 0.
*/
static int ctdb_ltdb_fetch_fn(TDB_DATA key, TDB_DATA data, void *private_data)
{
	struct ctdb_ltdb_fetch_state *state = private_data;
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	TDB_DATA *body = state->data;

	if (data.dsize < hdr_len) {
		/* too short to carry a header: treat as not found */
		return 0;
	}

	state->found = true;
	memcpy(state->header, data.dptr, hdr_len);

	if (body == NULL) {
		/* caller is only interested in the header */
		return 0;
	}

	body->dsize = data.dsize - hdr_len;
	body->dptr = talloc_memdup(state->mem_ctx,
				   data.dptr + hdr_len,
				   body->dsize);
	if (body->dptr == NULL) {
		state->ret = -1;
	}

	return 0;
}
204
205 /*
206 fetch a record from the ltdb, separating out the header information
207 and returning the body of the record. A valid (initial) header is
208 returned if the record is not present
209 */
ctdb_ltdb_fetch(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header,TALLOC_CTX * mem_ctx,TDB_DATA * data)210 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
211 TDB_DATA key, struct ctdb_ltdb_header *header,
212 TALLOC_CTX *mem_ctx, TDB_DATA *data)
213 {
214 struct ctdb_context *ctdb = ctdb_db->ctdb;
215 struct ctdb_ltdb_fetch_state state = {
216 .header = header,
217 .mem_ctx = mem_ctx,
218 .data = data,
219 .found = false,
220 };
221 int ret;
222
223 ret = tdb_parse_record(
224 ctdb_db->ltdb->tdb, key, ctdb_ltdb_fetch_fn, &state);
225
226 if (ret == -1) {
227 enum TDB_ERROR err = tdb_error(ctdb_db->ltdb->tdb);
228 if (err != TDB_ERR_NOEXIST) {
229 return -1;
230 }
231 }
232
233 if (state.ret != 0) {
234 DBG_DEBUG("ctdb_ltdb_fetch_fn failed\n");
235 return state.ret;
236 }
237
238 if (state.found) {
239 return 0;
240 }
241
242 if (data != NULL) {
243 *data = tdb_null;
244 }
245
246 if (ctdb->vnn_map == NULL) {
247 /* called from the client */
248 header->dmaster = (uint32_t)-1;
249 return -1;
250 }
251
252 ltdb_initial_header(ctdb_db, key, header);
253 if (ctdb_db_persistent(ctdb_db) ||
254 header->dmaster == ctdb_db->ctdb->pnn) {
255
256 ret = ctdb_ltdb_store(ctdb_db, key, header, tdb_null);
257 if (ret != 0) {
258 DBG_NOTICE("failed to store initial header\n");
259 }
260 }
261
262 return 0;
263 }
264
265 /*
266 write a record to a normal database
267 */
ctdb_ltdb_store(struct ctdb_db_context * ctdb_db,TDB_DATA key,struct ctdb_ltdb_header * header,TDB_DATA data)268 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
269 struct ctdb_ltdb_header *header, TDB_DATA data)
270 {
271 struct ctdb_context *ctdb = ctdb_db->ctdb;
272 TDB_DATA rec[2];
273 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
274 int ret;
275
276 if (ctdb_db->ctdb_ltdb_store_fn) {
277 return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
278 }
279
280 if (ctdb->flags & CTDB_FLAG_TORTURE) {
281 TDB_DATA old;
282 struct ctdb_ltdb_header *h2;
283
284 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
285 h2 = (struct ctdb_ltdb_header *)old.dptr;
286 if (old.dptr != NULL && old.dsize >= hsize &&
287 h2->rsn > header->rsn) {
288 DEBUG(DEBUG_ERR,
289 ("RSN regression! %"PRIu64" %"PRIu64"\n",
290 h2->rsn, header->rsn));
291 }
292 if (old.dptr != NULL) {
293 free(old.dptr);
294 }
295 }
296
297 rec[0].dsize = hsize;
298 rec[0].dptr = (uint8_t *)header;
299
300 rec[1].dsize = data.dsize;
301 rec[1].dptr = data.dptr;
302
303 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
304 if (ret != 0) {
305 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
306 }
307
308 return ret;
309 }
310
311 /*
312 lock a record in the ltdb, given a key
313 */
ctdb_ltdb_lock(struct ctdb_db_context * ctdb_db,TDB_DATA key)314 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
315 {
316 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
317 }
318
319 /*
320 unlock a record in the ltdb, given a key
321 */
ctdb_ltdb_unlock(struct ctdb_db_context * ctdb_db,TDB_DATA key)322 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
323 {
324 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
325 if (ret != 0) {
326 DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
327 }
328 return ret;
329 }
330
331
332 /*
333 delete a record from a normal database
334 */
ctdb_ltdb_delete(struct ctdb_db_context * ctdb_db,TDB_DATA key)335 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
336 {
337 if (! ctdb_db_volatile(ctdb_db)) {
338 DEBUG(DEBUG_WARNING,
339 ("Ignored deletion of empty record from "
340 "non-volatile database\n"));
341 return 0;
342 }
343 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
344 DEBUG(DEBUG_ERR,("Failed to delete empty record."));
345 return -1;
346 }
347 return 0;
348 }
349
/*
  mark a node in a tracking bitmap

  data holds a bitmap with one bit per pnn (bit pnn % 8 of byte
  pnn / 8).  If the bitmap is too short to hold the bit for pnn, a
  larger zero-filled buffer is allocated, the old contents are copied
  over and the old buffer is released with free().  data->dptr must
  therefore be NULL or point to memory obtained from malloc().

  Returns 0 on success.  Returns -1 if the bitmap could not be grown;
  in that case *data is left unmodified.
*/
int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn)
{
	unsigned int byte_pos = pnn / 8;
	unsigned char bit_mask = 1 << (pnn % 8);

	if (byte_pos + 1 > data->dsize) {
		uint8_t *buf;

		/*
		 * calloc() returns zeroed memory, so the result can be
		 * checked before anything writes to it.
		 */
		buf = calloc(1, byte_pos + 1);
		if (buf == NULL) {
			DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %u bytes for trackingdb\n", byte_pos + 1));
			return -1;
		}
		if (data->dptr != NULL) {
			memcpy(buf, data->dptr, data->dsize);
			free(data->dptr);
		}
		data->dptr = buf;
		data->dsize = byte_pos + 1;
	}

	data->dptr[byte_pos] |= bit_mask;
	return 0;
}
375
/*
  invoke a callback for every node marked in a tracking bitmap

  data is a bitmap in which bit b of byte i stands for pnn i * 8 + b.
  cb is called as cb(ctdb, pnn, private_data) for each set bit, in
  increasing pnn order.
*/
void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data)
{
	unsigned int byte_idx;
	unsigned int bit;

	for (byte_idx = 0; byte_idx < data.dsize; byte_idx++) {
		for (bit = 0; bit < 8; bit++) {
			/* byte is re-read for each bit, exactly once per test */
			if ((data.dptr[byte_idx] >> bit) & 1) {
				cb(ctdb, byte_idx * 8 + bit, private_data);
			}
		}
	}
}
392
/*
  this is the dummy null procedure that all databases support

  It ignores the call and always returns 0.
*/
int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;

	return 0;
}
400
401 /*
402 this is a plain fetch procedure that all databases support
403 */
ctdb_fetch_func(struct ctdb_call_info * call)404 int ctdb_fetch_func(struct ctdb_call_info *call)
405 {
406 call->reply_data = &call->record_data;
407 return 0;
408 }
409
410 /*
411 this is a plain fetch procedure that all databases support
412 this returns the full record including the ltdb header
413 */
ctdb_fetch_with_header_func(struct ctdb_call_info * call)414 int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
415 {
416 call->reply_data = talloc(call, TDB_DATA);
417 if (call->reply_data == NULL) {
418 return -1;
419 }
420 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
421 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
422 if (call->reply_data->dptr == NULL) {
423 return -1;
424 }
425 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
426 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
427
428 return 0;
429 }
430
431