1 //-< FILE.CPP >------------------------------------------------------*--------* 2 // FastDB Version 1.0 (c) 1999 GARRET * ? * 3 // (Main Memory Database Management System) * /\| * 4 // * / \ * 5 // Created: 20-Nov-98 K.A. Knizhnik * / [] \ * 6 // Last update: 10-Dec-98 K.A. Knizhnik * GARRET * 7 //-------------------------------------------------------------------*--------* 8 // System independent intrface to mapped on memory file 9 //-------------------------------------------------------------------*--------* 10 11 #ifndef __FILE_H__ 12 #define __FILE_H__ 13 14 #include "sync.h" 15 16 BEGIN_FASTDB_NAMESPACE 17 18 #if HAVE_ZLIB 19 #include "zlib.h" 20 #endif 21 22 #if defined(REPLICATION_SUPPORT) 23 const int dbModMapBlockBits = 12; // 10; 24 const int dbModMapBlockSize = 1 << dbModMapBlockBits; 25 #elif defined(NO_MMAP) 26 const int dbModMapBlockBits = 12; 27 const int dbModMapBlockSize = 1 << dbModMapBlockBits; 28 #endif 29 30 #ifdef REPLICATION_SUPPORT 31 32 class dbFile; 33 class dbReplicatedDatabase; 34 class socket_t; 35 36 struct ReplicationRequest { 37 enum { 38 RR_CONNECT, 39 RR_RECOVERY, 40 RR_GET_STATUS, 41 RR_STATUS, 42 RR_UPDATE_PAGE, 43 RR_RECOVER_PAGE, 44 RR_NEW_ACTIVE_NODE, 45 RR_CHANGE_ACTIVE_NODE, 46 RR_CLOSE, 47 RR_READY, 48 RR_COMMITTED 49 }; 50 byte op; 51 byte nodeId; 52 byte status; 53 int size; 54 struct { 55 int updateCount; 56 size_t offs; 57 } page; 58 }; 59 60 struct RecoveryRequest { 61 dbFile* file; 62 int nodeId; 63 int nPages; 64 int* updateCounters; 65 }; 66 #endif 67 68 #ifdef FUZZY_CHECKPOINT 69 class dbFileWriter; 70 #endif 71 72 class FASTDB_DLL_ENTRY dbFile { 73 protected: 74 #ifdef _WIN32 75 HANDLE fh; 76 HANDLE mh; 77 #else 78 #ifdef USE_SYSV_SHARED_MEMORY 79 dbSharedMemory shmem; 80 #endif 81 int fd; 82 #endif 83 char_t* sharedName; 84 char* mmapAddr; 85 size_t mmapSize; 86 int flags; 87 88 public: 89 enum { 90 ok = 0 91 }; 92 enum OpenFlags { 93 read_only = 0x001, // open file in read-only mode 94 truncate = 0x002, // truncate file when opened 95 sequential = 0x004, // optimize for sequenial access 96 no_buffering = 0x008, // do not use OS file cache 97 no_sync = 0x010, // do not flush data to the disk 98 shared = 0x020, // shared access to the file 99 write_through = 0x040, // write through 100 delete_on_close = 0x080, // delete file on close 101 ram_file = 0x100, // diskless mode 102 force_read = 0x200 // force load of mapped data from the file 103 }; 104 // 105 // Create backup file 106 // 107 virtual int create(char_t const* name, int flags = no_buffering); 108 // 109 // Open database file and create file mapping object 110 // 111 virtual int open(char_t const* fileName, char_t const* sharedName, 112 int flags, size_t initSize, bool replicationSupport); 113 getAddr()114 virtual void* getAddr() const { return mmapAddr; } getSize()115 virtual size_t getSize() const { return mmapSize; } 116 virtual int setSize(size_t size, char_t const* sharedName, bool initialize = true); 117 virtual int flush(bool physical = false); 118 virtual int close(); 119 virtual int erase(); 120 virtual int write(void const* ptr, size_t& writtenBytes, size_t size); 121 virtual int read(void* ptr, size_t& readBytes, size_t size); 122 virtual bool write(void const* ptr, size_t size); 123 virtual int seek(size_t pos, int whence); 124 125 static char* errorText(int code, char* buf, size_t bufSize); 126 127 int forceRead(char* data, size_t size); 128 129 #if defined(NO_MMAP) || defined(REPLICATION_SUPPORT) 130 131 #ifdef PROTECT_DATABASE 132 void protect(size_t pos, size_t size); 133 void unprotect(size_t pos, size_t size); 134 #endif 135 markAsDirty(size_t pos,size_t size)136 void markAsDirty(size_t pos, size_t size) { 137 if (!(flags & ram_file)) { 138 size_t page = pos >> dbModMapBlockBits; 139 size_t last = (pos + size + dbModMapBlockSize - 1) >> dbModMapBlockBits; 140 assert(int(last >> 5) <= pageMapSize); 141 while (page < last) { 142 pageMap[page >> 5] |= 1 << (page & 31); 143 page += 1; 144 } 145 } 146 } 147 148 private: 149 int* pageMap; 150 int pageMapSize; 151 int pageSize; 152 153 #ifdef FUZZY_CHECKPOINT 154 dbFileWriter* writer; 155 public: 156 void setCheckpointBufferSize(size_t nPages); 157 #endif 158 159 public: 160 int updateCounter; 161 162 #ifdef REPLICATION_SUPPORT 163 int* currUpdateCount; 164 int* diskUpdateCount; 165 byte* rootPage; 166 bool doSync; 167 bool closing; 168 169 dbReplicatedDatabase* db; 170 171 int getUpdateCountTableSize(); 172 int getMaxPages(); 173 174 dbMutex replCS; 175 dbMutex syncCS; 176 177 dbThread syncThread; 178 dbLocalEvent syncEvent; 179 dbLocalEvent recoveredEvent; 180 int nRecovered; 181 182 static int dbSyncTimeout; // milliseconds 183 184 #ifdef _WIN32 185 HANDLE cfh; 186 HANDLE cmh; 187 #else 188 int cfd; 189 #endif 190 191 static void thread_proc startSyncToDisk(void* arg); 192 static void thread_proc startRecovery(void* arg); 193 194 195 void doRecovery(int nodeId, int* updateCounters, int nPages); 196 197 int sendChanges(int nodeId, int* updateCounters, int nPages); 198 void completeRecovery(int nodeId); 199 200 void syncToDisk(); 201 void startSync(); 202 void stopSync(); 203 204 public: configure(dbReplicatedDatabase * db)205 void configure(dbReplicatedDatabase* db) { 206 this->db = db; 207 } 208 209 bool updatePages(socket_t* s, size_t pos, int updateCount, int size); 210 bool concurrentUpdatePages(socket_t* s, size_t pos, int updateCount, int size); 211 void recovery(int nodeId, int* updateCounters, int nPages); 212 #endif // REPLICATION_SUPPORT 213 214 215 #else markAsDirty(size_t,size_t)216 void markAsDirty(size_t, size_t) {} 217 #endif 218 219 bool write(size_t pos, void const* ptr, size_t size); 220 221 dbFile(); 222 virtual ~dbFile(); 223 }; 224 225 226 #if HAVE_ZLIB 227 class dbCompressedFile : public dbFile { 228 protected: 229 gzFile gzh; 230 public: 231 virtual int create(char_t const* name, int flags = truncate|no_buffering); 232 open(char_t const * fileName,char_t const * sharedName,int flags,size_t initSize,bool replicationSupport)233 virtual int open(char_t const* fileName, char_t const* sharedName, 234 int flags, size_t initSize, bool replicationSupport) { 235 assert(false); // not implemented 236 return 0; 237 } getAddr()238 virtual void* getAddr() const {SVM_ASSERTMSG(0, "Not Implemented"); return NULL;} getSize()239 virtual size_t getSize() const {SVM_ASSERTMSG(0, "Not Implemented"); return 0;} 240 virtual int setSize(size_t size, char_t const* sharedName, bool initialize = true) {SVM_ASSERTMSG(0, "Not Implemented"); return 0;} 241 virtual int flush(bool physical = false) {SVM_ASSERTMSG(0, "Not Implemented"); return 0;} erase()242 virtual int erase() {SVM_ASSERTMSG(0, "Not Implemented"); return 0;} 243 244 virtual int close(); 245 virtual int write(void const* ptr, size_t& writtenBytes, size_t size); 246 virtual int read(void* ptr, size_t& readBytes, size_t size); 247 }; 248 #endif 249 END_FASTDB_NAMESPACE 250 251 #endif 252