/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * db.cc * * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" #include #include #ifdef TDRPC #include #else #include #endif #include "nisdb_mt.h" #include "db_headers.h" #include "db.h" extern db_result *empty_result(db_status); extern int add_to_standby_list(db*); extern int remove_from_standby_list(db*); /* for db_next_desc */ #define LINEAR 1 #define CHAINED 2 struct db_next_info { int next_type; /* linear or chained */ void* next_value; /* linear: entryp; */ /* chained: db_next_index_desc* */ }; /* Constructor: Create a database using the given name, 'dbname.' The database is stored in a file named 'dbname'. The log file is stored in a file named 'dbname'.log. A temporary file 'dbname'.tmp is also used. 
*/ db::db(char* dbname) { int len = strlen(dbname); dbfilename = new char[len+1]; if (dbfilename == NULL) FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT); logfilename = new char[len+5]; if (logfilename == NULL) { delete dbfilename; FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT); } tmpfilename = new char[len+5]; if (tmpfilename == NULL) { delete dbfilename; delete logfilename; FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT); } sprintf(dbfilename, "%s", dbname); sprintf(logfilename, "%s.log", dbname); sprintf(tmpfilename, "%s.tmp", dbname); logfile = NULL; logfile_opened = FALSE; changed = FALSE; INITRW(db); READLOCKOK(db); internal_db.setDbPtr(this); (void) internal_db.configure(dbname); } /* destructor: note that associated files should be removed separated */ db::~db() { (void)acqexcl(); internal_db.reset(); /* clear any associated data structures */ delete dbfilename; delete logfilename; delete tmpfilename; close_log(); delete logfile; (void)destroylock(); } static void assign_next_desc(db_next_desc* desc, entryp value) { db_next_info * store = new db_next_info; if (store == NULL) { desc->db_next_desc_val = NULL; desc->db_next_desc_len = 0; FATAL("db::assign_next_desc: cannot allocate space", DB_MEMORY_LIMIT); } store->next_type = LINEAR; store->next_value = (void*)value; desc->db_next_desc_val = (char*) store; desc->db_next_desc_len = sizeof (db_next_info); } static void assign_next_desc(db_next_desc* desc, db_next_index_desc * value) { db_next_info * store = new db_next_info; if (store == NULL) { desc->db_next_desc_val = NULL; desc->db_next_desc_len = 0; FATAL("db::assign_next_desc: cannot allocate space (2)", DB_MEMORY_LIMIT); } store->next_type = CHAINED; store->next_value = (void*)value; desc->db_next_desc_val = (char*) store; desc->db_next_desc_len = sizeof (db_next_info); } static entryp extract_next_desc(db_next_desc* desc, int *next_type, db_next_index_desc** place2) { entryp place; if (desc == NULL || desc->db_next_desc_len != 
sizeof (db_next_info)) { *next_type = 0; return (0); } *next_type = ((db_next_info*) desc->db_next_desc_val)->next_type; switch (*next_type) { case LINEAR: place = (entryp) ((db_next_info*) desc->db_next_desc_val)->next_value; return (place); case CHAINED: *place2 = (db_next_index_desc*) ((db_next_info*) desc->db_next_desc_val) ->next_value; return (0); default: *next_type = 0; // invalid type return (0); } } /* Execute the specified action using the rest of the arguments as input. Return a structure db_result containing the result. */ db_result * db::exec_action(db_action action, db_query *query, entry_object *content, db_next_desc* previous) { entryp where, prev; db_result *res = new db_result; long num_answers; entry_object_p * ans; entry_object * single; db_next_index_desc *index_desc; int next_type; db_next_index_desc *prev_desc; if (res == NULL) FATAL3("db::exec_action: cannot allocate space for result", DB_MEMORY_LIMIT, NULL); res->objects.objects_len = 0; /* default */ res->objects.objects_val = NULL; /* default */ switch (action) { case DB_LOOKUP: res->status = internal_db.lookup(query, &num_answers, &ans); res->objects.objects_len = (int) num_answers; res->objects.objects_val = ans; break; case DB_ADD: res->status = internal_db.add(query, content); break; case DB_REMOVE: res->status = internal_db.remove(query); break; case DB_FIRST: if (query == NULL) { res->status = internal_db.first(&where, &single); if (res->status == DB_SUCCESS) assign_next_desc(&(res->nextinfo), where); } else { res->status = internal_db.first(query, &index_desc, &single); if (res->status == DB_SUCCESS) assign_next_desc(&(res->nextinfo), index_desc); } if (res->status == DB_SUCCESS) { res->objects.objects_val = new entry_object_p; if (res->objects.objects_val == NULL) { res->objects.objects_len = 0; delete res; FATAL3( "db::exec_action: cannot allocate space for DB_FIRST result", DB_MEMORY_LIMIT, NULL); } res->objects.objects_len = 1; res->objects.objects_val[0] = single; } break; 
case DB_NEXT: prev = extract_next_desc(previous, &next_type, &prev_desc); switch (next_type) { case LINEAR: if (prev != 0) { res->status = internal_db.next(prev, &where, &single); if (res->status == DB_SUCCESS) assign_next_desc(&(res->nextinfo), where); } else // invalid previous indicator res->status = DB_NOTFOUND; break; case CHAINED: if (prev_desc != NULL) { res->status = internal_db.next(prev_desc, &index_desc, &single); if (res->status == DB_SUCCESS) assign_next_desc(&(res->nextinfo), index_desc); } else // invalid previous indicator res->status = DB_NOTFOUND; break; default: WARNING("db::exec_action: invalid previous indicator"); res->status = DB_BADQUERY; } if (previous && previous->db_next_desc_val) { delete previous->db_next_desc_val; previous->db_next_desc_len = 0; previous->db_next_desc_val = NULL; } if (res->status == DB_SUCCESS) { res->objects.objects_len = 1; res->objects.objects_val = new entry_object_p; if (res->objects.objects_val == NULL) { res->objects.objects_len = 0; delete res; FATAL3( "db::exec_action: cannot allocate space for DB_NEXT result", DB_MEMORY_LIMIT, NULL); } res->objects.objects_val[0] = single; } break; case DB_RESET_NEXT: prev = extract_next_desc(previous, &next_type, &prev_desc); switch (next_type) { case LINEAR: res->status = DB_SUCCESS; if (previous->db_next_desc_val) { delete previous->db_next_desc_val; previous->db_next_desc_len = 0; previous->db_next_desc_val = NULL; } break; // do nothing case CHAINED: res->status = internal_db.reset_next(prev_desc); if (previous->db_next_desc_val) { delete previous->db_next_desc_val; previous->db_next_desc_len = 0; previous->db_next_desc_val = NULL; } break; default: WARNING("db::exec_action: invalid previous indicator"); res->status = DB_BADQUERY; } break; case DB_ALL: res->status = internal_db.all(&num_answers, &ans); res->objects.objects_len = (int) num_answers; res->objects.objects_val = ans; break; default: WARNING("unknown request"); res->status = DB_BADQUERY; return (res); } 
return (res); } /* * Log the given action and execute it. * The minor version of the database is updated after the action has * been executed and the database is flagged as being changed. * Return the structure db_result, or NULL if the logging failed or the * action is unknown. */ db_result * db::log_action(db_action action, db_query *query, entry_object *content) { vers *v = internal_db.get_version()->nextminor(); db_result * res; db_log_entry le(action, v, query, content); bool_t copylog = FALSE; WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::log_action"); /* * If this is a synchronous operation on the master we should * not copy the log for each operation. Doing so causes * massive disk IO that hampers the performance of these operations. * Where as on the replica these operations are not synchronous * (batched) and don't affect the performance as much. */ if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC)) copylog = TRUE; if (open_log(copylog) < 0) { delete v; WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action DB_STORAGE_LIMIT"); return (empty_result(DB_STORAGE_LIMIT)); } if (logfile->append(&le) < 0) { close_log(); WARNING_M("db::log_action: could not add log entry: "); delete v; WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action DB_STORAGE_LIMIT"); return (empty_result(DB_STORAGE_LIMIT)); } switch (action) { case DB_ADD_NOSYNC: action = DB_ADD; break; case DB_REMOVE_NOSYNC: action = DB_REMOVE; break; default: if (logfile->sync_log() < 0) { close_log(); WARNING_M("db::log_action: could not add log entry: "); delete v; WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action DB_STORAGE_LIMIT"); return (empty_result(DB_STORAGE_LIMIT)); } break; } res = exec_action(action, query, content, NULL); internal_db.change_version(v); delete v; changed = TRUE; WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action"); return (res); } /* * Execute 'action' using the rest of the arguments as input. 
* Return the result of the operation in a db_result structure; * Return NULL if the request is unknown. * If the action involves updates (ADD and REMOVE), it is logged first. */ db_result * db::execute(db_action action, db_query *query, entry_object *content, db_next_desc* previous) { db_result *res; switch (action) { case DB_LOOKUP: case DB_FIRST: case DB_NEXT: case DB_ALL: case DB_RESET_NEXT: READLOCK(this, empty_result(DB_LOCK_ERROR), "r db::execute"); res = exec_action(action, query, content, previous); READUNLOCK(this, empty_result(DB_LOCK_ERROR), "ru db::execute"); return (res); case DB_ADD_NOLOG: WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::execute"); changed = TRUE; res = exec_action(DB_ADD, query, content, previous); WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::execute"); return (res); case DB_ADD: case DB_REMOVE: case DB_ADD_NOSYNC: case DB_REMOVE_NOSYNC: /* log_action() will do the locking */ return (log_action(action, query, content)); default: WARNING("db::execute: unknown request"); return (empty_result(DB_INTERNAL_ERROR)); } } /* close existing logfile and delete its structure */ int db::reset_log() { WRITELOCK(this, -1, "w db::reset_log"); /* try to close old log file */ /* doesnot matter since we do synchronous writes only */ if (logfile != NULL) { if (logfile_opened == TRUE) { logfile->sync_log(); if (logfile->close() < 0) { WARNING_M("db::reset_log: could not close log file: "); } remove_from_standby_list(this); } delete logfile; logfile = NULL; } logfile_opened = FALSE; WRITEUNLOCK(this, -1, "wu db::reset_log"); return (0); } /* close existing logfile, but leave its structure if exists */ int db::close_log(int bypass_standby) { WRITELOCK(this, -1, "w db::close_log"); if (logfile != NULL && logfile_opened == TRUE) { logfile->sync_log(); logfile->close(); if (!bypass_standby) remove_from_standby_list(this); } logfile_opened = FALSE; WRITEUNLOCK(this, -1, "wu db::close_log"); return (0); } /* open logfile, creating its structure if 
it does not exist */ int db::open_log(bool_t copylog) { WRITELOCK(this, -1, "w db::open_log"); if (logfile == NULL) { if ((logfile = new db_log(logfilename, PICKLE_APPEND)) == NULL) FATAL3("db::reset_log: cannot allocate space", DB_MEMORY_LIMIT, -1); } if (logfile_opened == TRUE) { WRITEUNLOCK(this, -1, "wu db::open_log"); return (0); } logfile->copylog = copylog; if ((logfile->open()) == NULL){ WARNING_M("db::open_log: could not open log file: "); delete logfile; logfile = NULL; WRITEUNLOCK(this, -1, "wu db::open_log"); return (-1); } add_to_standby_list(this); logfile_opened = TRUE; WRITEUNLOCK(this, -1, "wu db::open_log"); return (0); } /* * Execute log entry 'j' on the database identified by 'dbchar' if the * version of j is later than that of the database. If 'j' is executed, * 'count' is incremented and the database's verison is updated to that of 'j'. * Returns TRUE always for valid log entries; FALSE otherwise. */ static bool_t apply_log_entry(db_log_entry * j, char * dbchar, int *count) { db_mindex * db = (db_mindex *) dbchar; bool_t status = TRUE; WRITELOCK(db, FALSE, "db::apply_log_entry"); if (db->get_version()->earlier_than(j->get_version())) { ++ *count; #ifdef DEBUG j->print(); #endif /* DEBUG */ switch (j->get_action()) { case DB_ADD: case DB_ADD_NOSYNC: db->add(j->get_query(), j->get_object()); break; case DB_REMOVE: case DB_REMOVE_NOSYNC: db->remove(j->get_query()); break; default: WARNING("db::apply_log_entry: unknown action_type"); WRITEUNLOCK(db, FALSE, "db::apply_log_entry"); return (FALSE); } db->change_version(j->get_version()); } WRITEUNLOCK(db, FALSE, "db::apply_log_entry"); return (TRUE); /* always want to TRUE if action valid ? */ } /* * Execute log entry 'j' on this db. 'j' is executed if its version is * later than that of the database; if executed, the database's version * will be changed to that of 'j', regardless of the status of the operation. * Returns TRUE if 'j' was executed; FALSE if it was not. 
* Log entry is added to this database's log if log_entry is applied. */ bool_t db::execute_log_entry(db_log_entry *j) { int count = 0; apply_log_entry (j, (char *) &internal_db, &count); bool_t copylog = FALSE; db_action action; /* * If this is a synchronous operation on the master we should * not copy the log for each operation. Doing so causes * massive disk IO that hampers the performance of these operations. * Where as on the replica these operations are not synchronous * (batched) and don't affect the performance as much. */ action = j->get_action(); if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC)) copylog = TRUE; /* * should really record the log entry first, but can''t do that without * knowing whether the log entry is applicable. */ WRITELOCK(this, FALSE, "w db::execute_log_entry"); if (count == 1) { if (open_log(copylog) < 0) { WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry"); return (FALSE); } if (logfile->append(j) < 0) { close_log(); WARNING_M( "db::execute_log_entry: could not add log entry: "); WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry"); return (FALSE); } // close_log(); /* do this asynchronously */ } WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry"); return (count == 1); } /* Incorporate updates in log to database already loaded. Does not affect "logfile" */ int db::incorporate_log(char* filename) { db_log f(filename, PICKLE_READ); int ret; WRITELOCK(this, -1, "w db::incorporate_log"); WRITELOCK2((&internal_db), -1, "w internal_db db::incorporate_log", this); internal_db.setNoWriteThrough(); ret = f.execute_on_log(&(apply_log_entry), (char *) &internal_db); internal_db.clearNoWriteThrough(); WRITEUNLOCK2(this, (&internal_db), ret, ret, "wu db::incorporate_log", "wu mindex db::incorporate_log"); return (ret); } /* Load database and incorporate any logged updates into the loaded copy. Return TRUE if load succeeds; FALSE otherwise. 
*/
bool_t
db::load()
{
	int	logged;		/* result of incorporate_log() */
	int	status;		/* result of internal_db.load() */

	WRITELOCK(this, FALSE, "w db::load");
	if (changed == TRUE)
		syslog(LOG_ERR,
	"WARNING: the current db '%s' has been changed but not checkpointed",
			dbfilename);

	unlink(tmpfilename);	/* get rid of partial checkpoints */

	status = internal_db.load(dbfilename);
	if (status != 0) {
		/* A positive status means there was just nothing to load. */
		if (status < 0)
			syslog(LOG_ERR, "Load of db '%s' failed", dbfilename);
		WRITEUNLOCK(this, FALSE, "wu db::load");
		return (FALSE);
	}

	changed = FALSE;
	reset_log();

	WRITELOCK2((&internal_db), FALSE, "w internal_db db::load", this);
	internal_db.setInitialLoad();
	logged = incorporate_log(logfilename);
	if (logged < 0)
		syslog(LOG_ERR,
			"incorporation of db logfile '%s' load failed",
			logfilename);
	/* Replayed log entries leave the in-memory copy ahead of the file. */
	changed = (logged > 0);
	internal_db.clearInitialLoad();
	WRITEUNLOCK2(this, (&internal_db),
		(changed ? TRUE : FALSE), (changed ? TRUE : FALSE),
		"wu db::load", "wu internal_db db::load");
	return (TRUE);
}

/*
 * Initialize the database using table scheme 's'.
 * Because the 'scheme' must be 'remembered' between restarts,
 * after the initialization, the empty database is checkpointed to record
 * the scheme.  Any partial checkpoint and previous log file are removed.
 * Returns TRUE if initialization succeeds; FALSE otherwise.
 */
bool_t
db::init(db_scheme * s)
{
	bool_t ok = FALSE;

	WRITELOCK(this, FALSE, "w db::init");
	internal_db.init(s);
	if (internal_db.good()) {
		unlink(tmpfilename);	/* delete partial checkpoints */
		unlink(logfilename);	/* delete previous logfile */
		reset_log();
		changed = TRUE;		/* force dump to get scheme stored. */
		ok = checkpoint();
	}
	WRITEUNLOCK(this, FALSE, "wu db::init");
	return (ok);
}

/*
 * Write out in-memory copy of database to file.
 *	1. Update major version.
 *	2. Dump contents to temporary file.
 *	3. Rename temporary file to real database file.
 *	4. Remove log file.
 * A checkpoint is done only if it has changed since the previous checkpoint.
 * Returns TRUE if checkpoint was successful; FALSE otherwise.
*/ bool_t db::checkpoint() { WRITELOCK(this, FALSE, "w db::checkpoint"); if (changed == FALSE) { WRITEUNLOCK(this, FALSE, "wu db::checkpoint"); return (TRUE); } vers *oldversion = new vers(internal_db.get_version()); /* copy */ vers *nextversion = oldversion->nextmajor(); /* get next version */ internal_db.change_version(nextversion); /* change version */ if (internal_db.dump(tmpfilename) < 0) { /* dump to tempfile */ WARNING_M("db::checkpoint: could not dump database: "); internal_db.change_version(oldversion); /* rollback */ delete nextversion; delete oldversion; WRITEUNLOCK(this, FALSE, "wu db::checkpoint"); return (FALSE); } if (rename(tmpfilename, dbfilename) < 0){ /* rename permanently */ WARNING_M( "db::checkpoint: could not rename temp file to db file: "); internal_db.change_version(oldversion); /* rollback */ delete nextversion; delete oldversion; WRITEUNLOCK(this, FALSE, "wu db::checkpoint"); return (FALSE); } reset_log(); /* should check for what? */ unlink(logfilename); /* should do atomic rename and log delete */ delete nextversion; delete oldversion; changed = FALSE; WRITEUNLOCK(this, FALSE, "wu db::checkpoint"); return (TRUE); } /* For generating log_list */ struct traverse_info { vers *version; // version to check for db_log_entry * head; // head of list of log entries found db_log_entry * tail; // tail of list of log entries found }; /* * For the given entry determine, if it is later than the version supplied, * 1. increment 'count'. * 2. add the entry to the list of log entries found. * * Since traversal happens on an automatic (struct traverse_info) in * db::get_log_entries_since(), no locking is necessary. 
*/ static bool_t entry_since(db_log_entry * j, char * tichar, int *count) { traverse_info *ti = (traverse_info*) tichar; if (ti->version->earlier_than(j->get_version())) { ++ *count; // j->print(); // debug if (ti->head == NULL) ti->head = j; else { ti->tail->setnextptr(j); // make last entry point to j } ti->tail = j; // make j new last entry } return (TRUE); } /* Return structure db_log_list containing entries that are later than the version 'v' given. */ db_log_list* db::get_log_entries_since(vers * v) { int count; struct traverse_info ti; db_log f(logfilename, PICKLE_READ); ti.version = v; ti.head = ti.tail = NULL; count = f.execute_on_log(&(entry_since), (char *) &ti, FALSE); db_log_list * answer = new db_log_list; if (answer == NULL) FATAL3("db::get_log_entries_since: cannot allocate space", DB_MEMORY_LIMIT, NULL); answer->list.list_len = count; if (count > 0) { db_log_entry_p *entries; db_log_entry_p currentry, nextentry; int i; entries = answer->list.list_val = new db_log_entry_p[count]; if (entries == NULL) { delete answer; FATAL3( "db::get_log_entries_since: cannot allocate space for entries", DB_MEMORY_LIMIT, NULL); } currentry = ti.head; for (i = 0, currentry = ti.head; i < count && currentry != NULL; i++) { entries[i] = currentry; nextentry = currentry->getnextptr(); currentry->setnextptr(NULL); currentry = nextentry; } } else answer->list.list_val = NULL; return (answer); } /* Delete all files associated with database. 
*/
int
db::remove_files()
{
	WRITELOCK(this, -1, "w db::remove_files");
	unlink(tmpfilename);	/* delete partial checkpoints */
	reset_log();
	unlink(logfilename);	/* delete logfile */
	unlink(dbfilename);	/* delete database file */
	WRITEUNLOCK(this, -1, "wu db::remove_files");
	return (0);
}

/*
 * Flush the open log file.
 * Returns DB_BADTABLE if there is no log structure, DB_SUCCESS if the log
 * is not open or was synced, DB_SYNC_FAILED if db_log::sync_log() failed,
 * and DB_LOCK_ERROR on a lock failure.
 */
db_status
db::sync_log()
{
	db_status ret;

	WRITELOCK(this, DB_LOCK_ERROR, "w db::sync_log");
	if (logfile == 0) {
		ret = DB_BADTABLE;
	} else if (logfile_opened == FALSE || logfile->sync_log() >= 0) {
		/*
		 * db_log::sync_log() reports failure with a negative value
		 * (db::log_action() tests it with '< 0'); a plain truth test
		 * would treat that failure as success.
		 */
		ret = DB_SUCCESS;
	} else {
		ret = DB_SYNC_FAILED;
	}
	WRITEUNLOCK(this, DB_LOCK_ERROR, "wu db::sync_log");
	return (ret);
}

/* Pass configuration information to the db_mindex */
bool_t
db::configure(char *objName)
{
	return (internal_db.configure(objName));
}

/* Return a pointer to the embedded db_mindex (not a copy). */
db_mindex *
db::mindex(void)
{
	return (&internal_db);
}