1 //-< FILE.CPP >------------------------------------------------------*--------*
2 // FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Main Memory Database Management System)                          *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
6 //                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // System independent interface to a memory-mapped file
9 //-------------------------------------------------------------------*--------*
10 
11 #ifndef __FILE_H__
12 #define __FILE_H__
13 
14 #include "sync.h"
15 
16 BEGIN_FASTDB_NAMESPACE
17 
18 #if HAVE_ZLIB
19 #include "zlib.h"
20 #endif
21 
//
// Block granularity used when byte offsets are converted to block indices
// (see dbFile::markAsDirty): one block is (1 << dbModMapBlockBits) bytes.
// The constants exist only in builds with REPLICATION_SUPPORT or NO_MMAP;
// both configurations use the same values.
//
#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
const int dbModMapBlockBits = 12; // an alternative value of 10 was left commented out
const int dbModMapBlockSize = 1 << dbModMapBlockBits;
#endif
29 
30 #ifdef REPLICATION_SUPPORT
31 
// Forward declarations: the replication structures and methods below only
// use these types through pointers.
class socket_t;
class dbReplicatedDatabase;
class dbFile;
35 
36 struct ReplicationRequest {
37     enum {
38         RR_CONNECT,
39         RR_RECOVERY,
40         RR_GET_STATUS,
41         RR_STATUS,
42         RR_UPDATE_PAGE,
43         RR_RECOVER_PAGE,
44         RR_NEW_ACTIVE_NODE,
45         RR_CHANGE_ACTIVE_NODE,
46         RR_CLOSE,
47         RR_READY,
48         RR_COMMITTED
49     };
50     byte op;
51     byte nodeId;
52     byte status;
53     int  size;
54     struct {
55         int updateCount;
56         size_t offs;
57     } page;
58 };
59 
60 struct RecoveryRequest {
61     dbFile*   file;
62     int       nodeId;
63     int       nPages;
64     int*      updateCounters;
65 };
66 #endif
67 
68 #ifdef FUZZY_CHECKPOINT
class dbFileWriter; // forward declaration: dbFile only stores a pointer to it
70 #endif
71 
72 class FASTDB_DLL_ENTRY dbFile {
73   protected:
74 #ifdef _WIN32
75     HANDLE fh;
76     HANDLE mh;
77 #else
78 #ifdef USE_SYSV_SHARED_MEMORY
79     dbSharedMemory shmem;
80 #endif
81     int    fd;
82 #endif
83     char_t*  sharedName;
84     char*  mmapAddr;
85     size_t mmapSize;
86     int    flags;
87 
88   public:
89     enum {
90         ok = 0
91     };
92     enum OpenFlags {
93         read_only       = 0x001, // open file in read-only mode
94         truncate        = 0x002, // truncate file when opened
95         sequential      = 0x004, // optimize for sequenial access
96         no_buffering    = 0x008, // do not use OS file cache
97         no_sync         = 0x010, // do not flush data to the disk
98         shared          = 0x020, // shared access to the file
99         write_through   = 0x040, // write through
100         delete_on_close = 0x080, // delete file on close
101         ram_file        = 0x100, // diskless mode
102         force_read      = 0x200  // force load of mapped data from the file
103     };
104     //
105     // Create backup file
106     //
107     virtual int   create(char_t const* name, int flags = no_buffering);
108     //
109     // Open database file and create file mapping object
110     //
111     virtual int    open(char_t const* fileName, char_t const* sharedName,
112                         int flags, size_t initSize, bool replicationSupport);
113 
getAddr()114     virtual void*  getAddr() const { return mmapAddr; }
getSize()115     virtual size_t getSize() const { return mmapSize; }
116     virtual int    setSize(size_t size, char_t const* sharedName, bool initialize = true);
117     virtual int    flush(bool physical = false);
118     virtual int    close();
119     virtual int    erase();
120     virtual int    write(void const* ptr, size_t& writtenBytes, size_t size);
121     virtual int    read(void* ptr, size_t& readBytes, size_t size);
122     virtual bool   write(void const* ptr, size_t size);
123     virtual int    seek(size_t pos, int whence);
124 
125     static char* errorText(int code, char* buf, size_t bufSize);
126 
127     int forceRead(char* data, size_t size);
128 
129 #if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
130 
131 #ifdef PROTECT_DATABASE
132     void protect(size_t pos, size_t size);
133     void unprotect(size_t pos, size_t size);
134 #endif
135 
markAsDirty(size_t pos,size_t size)136     void markAsDirty(size_t pos, size_t size) {
137         if (!(flags & ram_file)) {
138             size_t page = pos >> dbModMapBlockBits;
139             size_t last = (pos + size + dbModMapBlockSize - 1) >> dbModMapBlockBits;
140             assert(int(last >> 5) <= pageMapSize);
141             while (page < last) {
142                 pageMap[page >> 5] |= 1 << (page & 31);
143                 page += 1;
144             }
145         }
146     }
147 
148   private:
149     int* pageMap;
150     int  pageMapSize;
151     int  pageSize;
152 
153 #ifdef FUZZY_CHECKPOINT
154     dbFileWriter* writer;
155   public:
156     void setCheckpointBufferSize(size_t nPages);
157 #endif
158 
159   public:
160     int  updateCounter;
161 
162 #ifdef REPLICATION_SUPPORT
163     int*      currUpdateCount;
164     int*      diskUpdateCount;
165     byte*     rootPage;
166     bool      doSync;
167     bool      closing;
168 
169     dbReplicatedDatabase* db;
170 
171     int       getUpdateCountTableSize();
172     int       getMaxPages();
173 
174     dbMutex   replCS;
175     dbMutex   syncCS;
176 
177     dbThread     syncThread;
178     dbLocalEvent syncEvent;
179     dbLocalEvent recoveredEvent;
180     int          nRecovered;
181 
182     static int   dbSyncTimeout; // milliseconds
183 
184 #ifdef _WIN32
185     HANDLE    cfh;
186     HANDLE    cmh;
187 #else
188     int       cfd;
189 #endif
190 
191     static void thread_proc startSyncToDisk(void* arg);
192     static void thread_proc startRecovery(void* arg);
193 
194 
195     void doRecovery(int nodeId, int* updateCounters, int nPages);
196 
197     int sendChanges(int nodeId, int* updateCounters, int nPages);
198     void completeRecovery(int nodeId);
199 
200     void syncToDisk();
201     void startSync();
202     void stopSync();
203 
204   public:
configure(dbReplicatedDatabase * db)205     void configure(dbReplicatedDatabase* db) {
206         this->db = db;
207     }
208 
209     bool updatePages(socket_t* s, size_t pos, int updateCount, int size);
210     bool concurrentUpdatePages(socket_t* s, size_t pos, int updateCount, int size);
211     void recovery(int nodeId, int* updateCounters, int nPages);
212 #endif // REPLICATION_SUPPORT
213 
214 
215 #else
markAsDirty(size_t,size_t)216     void markAsDirty(size_t, size_t) {}
217 #endif
218 
219     bool write(size_t pos, void const* ptr, size_t size);
220 
221     dbFile();
222     virtual ~dbFile();
223 };
224 
225 
226 #if HAVE_ZLIB
227 class dbCompressedFile : public dbFile {
228   protected:
229     gzFile gzh;
230   public:
231     virtual int    create(char_t const* name, int flags = truncate|no_buffering);
232 
open(char_t const * fileName,char_t const * sharedName,int flags,size_t initSize,bool replicationSupport)233     virtual int    open(char_t const* fileName, char_t const* sharedName,
234                         int flags, size_t initSize, bool replicationSupport) {
235         assert(false); // not implemented
236         return 0;
237     }
getAddr()238     virtual void*  getAddr() const {SVM_ASSERTMSG(0, "Not Implemented"); return NULL;}
getSize()239     virtual size_t getSize() const {SVM_ASSERTMSG(0, "Not Implemented"); return 0;}
240     virtual int    setSize(size_t size, char_t const* sharedName, bool initialize = true)  {SVM_ASSERTMSG(0, "Not Implemented"); return 0;}
241     virtual int    flush(bool physical = false)  {SVM_ASSERTMSG(0, "Not Implemented"); return 0;}
erase()242     virtual int    erase() {SVM_ASSERTMSG(0, "Not Implemented"); return 0;}
243 
244     virtual int    close();
245     virtual int    write(void const* ptr, size_t& writtenBytes, size_t size);
246     virtual int    read(void* ptr, size_t& readBytes, size_t size);
247 };
248 #endif
249 END_FASTDB_NAMESPACE
250 
251 #endif
252