1 #include "iwkv_internal.h"
2 #include <sys/types.h>
3 #include <fcntl.h>
4 #include <time.h>
5 
6 #ifdef _WIN32
7 #include "win32/mman/mman.h"
8 #else
9 
10 #ifndef O_CLOEXEC
11 #define O_CLOEXEC 0
12 #endif
13 
14 #include <sys/mman.h>
15 
16 #endif
17 
18 extern atomic_uint_fast64_t g_trigger;
19 
/* Online backup stages stored in IWAL::bkp_stage. A zero stage means that no backup is in progress. */
#define BKP_STARTED     0x1       /**< Backup started */
#define BKP_WAL_CLEANUP 0x2       /**< Do checkpoint and truncate WAL file */
#define BKP_MAIN_COPY   0x3       /**< Copy main database file */
#define BKP_WAL_COPY1   0x4       /**< Copy most of WAL file content */
#define BKP_WAL_COPY2   0x5       /**< Copy rest of WAL file in exclusive locked mode */
25 
/**
 * Write-ahead log (WAL) state attached to an IWKV instance.
 *
 * The structure is cast to and from `IWDLSNR*` by the listener callbacks,
 * so `lsnr` has to stay the first member.
 */
typedef struct IWAL {
  IWDLSNR     lsnr;                 /**< Data events listener callbacks. Must be the first field. */
  atomic_bool applying;             /**< WAL is being applied; listener callbacks do not log while set */
  atomic_bool open;                 /**< Is WAL in use */
  atomic_bool force_cp;             /**< Next checkpoint scheduled */
  atomic_bool synched;              /**< WAL is synched or WBFIXPOINT is the last write operation */
  bool force_sp;                    /**< Next savepoint scheduled */
  bool check_cp_crc;                /**< Check CRC32 sum of data blocks during checkpoint. Default: false  */
  iwkv_openflags oflags;            /**< File open flags */
  atomic_int     bkp_stage;         /**< Online backup stage, zero if no backup is running */
  size_t   wal_buffer_sz;           /**< WAL file intermediate buffer size. */
  size_t   checkpoint_buffer_sz;    /**< Checkpoint buffer size in bytes. */
  uint32_t bufpos;                  /**< Current position in buffer */
  uint32_t bufsz;                   /**< Size of buffer */
  HANDLE   fh;                      /**< File handle */
  uint8_t *buf;                     /**< File buffer. `sizeof(WBSEP)` bytes before it are reserved for the segment header */
  char    *path;                    /**< WAL file path */
  pthread_mutex_t *mtxp;            /**< Global WAL mutex, points to `mtx` once it is initialized */
  pthread_cond_t  *cpt_condp;       /**< Checkpoint thread cond variable, points to `cpt_cond` once it is initialized */
  pthread_t       *cptp;            /**< Checkpoint thread, points to `cpt` once the thread is started */
  iwrc (*wal_lock_interceptor)(bool, void*);
  /**< Optional function called
       - before acquiring
       - after releasing
       the exclusive database lock by the WAL checkpoint thread.
       In the `before lock` case the first argument is set to true */
  void    *wal_lock_interceptor_opaque;  /**< Opaque data for `wal_lock_interceptor` */
  uint32_t savepoint_timeout_sec;        /**< Savepoint timeout seconds */
  uint32_t checkpoint_timeout_sec;       /**< Checkpoint timeout seconds */
  atomic_size_t mbytes;                  /**< Estimated size of modified private mmapped memory bytes */
  off_t    rollforward_offset;           /**< Rollforward offset during online backup */
  uint64_t checkpoint_ts;                /**< Last checkpoint timestamp milliseconds */
  pthread_mutex_t mtx;                   /**< Storage of global WAL mutex */
  pthread_cond_t  cpt_cond;              /**< Storage of checkpoint thread cond variable */
  pthread_t       cpt;                   /**< Storage of checkpoint thread handle */
  IWKV iwkv;                             /**< Owner database instance */
} IWAL;
63 
64 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint);
65 
_lock(IWAL * wal)66 IW_INLINE iwrc _lock(IWAL *wal) {
67   int rci = pthread_mutex_lock(wal->mtxp);
68   return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
69 }
70 
_unlock(IWAL * wal)71 IW_INLINE iwrc _unlock(IWAL *wal) {
72   int rci = pthread_mutex_unlock(wal->mtxp);
73   return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
74 }
75 
_excl_lock(IWAL * wal)76 static iwrc _excl_lock(IWAL *wal) {
77   iwrc rc = 0;
78   if (wal->wal_lock_interceptor) {
79     rc = wal->wal_lock_interceptor(true, wal->wal_lock_interceptor_opaque);
80     RCRET(rc);
81   }
82   rc = iwkv_exclusive_lock(wal->iwkv);
83   if (rc) {
84     if (wal->wal_lock_interceptor) {
85       IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
86     }
87     return rc;
88   }
89   rc = _lock(wal);
90   if (rc) {
91     IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
92     if (wal->wal_lock_interceptor) {
93       IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
94     }
95   }
96   return rc;
97 }
98 
_excl_unlock(IWAL * wal)99 static iwrc _excl_unlock(IWAL *wal) {
100   iwrc rc = _unlock(wal);
101   IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
102   if (wal->wal_lock_interceptor) {
103     IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
104   }
105   return rc;
106 }
107 
_init_locks(IWAL * wal)108 static iwrc _init_locks(IWAL *wal) {
109   int rci = pthread_mutex_init(&wal->mtx, 0);
110   if (rci) {
111     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
112   }
113   wal->mtxp = &wal->mtx;
114   return 0;
115 }
116 
/**
 * Releases all resources owned by the WAL: file handle, cond variable,
 * mutex, path, buffer and the structure itself. Accepts a NULL pointer.
 */
static void _destroy(IWAL *wal) {
  if (!wal) {
    return;
  }
  wal->open = false;
  if (!INVALIDHANDLE(wal->fh)) {
    iwp_unlock(wal->fh);
    iwp_closefh(wal->fh);
  }
  if (wal->cpt_condp) {
    pthread_cond_destroy(wal->cpt_condp);
    wal->cpt_condp = 0;
  }
  if (wal->mtxp) {
    pthread_mutex_destroy(wal->mtxp);
    wal->mtxp = 0;
  }
  free(wal->path);
  if (wal->buf) {
    // `buf` points past the space reserved for the segment header
    free(wal->buf - sizeof(WBSEP));
  }
  free(wal);
}
140 
_flush_wl(IWAL * wal,bool sync)141 static iwrc _flush_wl(IWAL *wal, bool sync) {
142   iwrc rc = 0;
143   if (wal->bufpos) {
144     uint32_t crc = wal->check_cp_crc ? iwu_crc32(wal->buf, wal->bufpos, 0) : 0;
145     WBSEP sep = {
146       .id  = WOP_SEP,
147       .crc = crc,
148       .len = wal->bufpos
149     };
150     size_t wz = wal->bufpos + sizeof(WBSEP);
151     uint8_t *wp = wal->buf - sizeof(WBSEP);
152     memcpy(wp, &sep, sizeof(WBSEP));
153     rc = iwp_write(wal->fh, wp, wz);
154     RCRET(rc);
155     wal->bufpos = 0;
156   }
157   if (sync) {
158     rc = iwp_fsync(wal->fh);
159   }
160   return rc;
161 }
162 
_truncate_wl(IWAL * wal)163 IW_INLINE iwrc _truncate_wl(IWAL *wal) {
164   iwrc rc = iwp_ftruncate(wal->fh, 0);
165   RCRET(rc);
166   wal->rollforward_offset = 0;
167   rc = iwp_lseek(wal->fh, 0, IWP_SEEK_SET, 0);
168   RCRET(rc);
169   rc = iwp_fsync(wal->fh);
170   return rc;
171 }
172 
_write_wl(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)173 static iwrc _write_wl(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
174   iwrc rc = 0;
175   const off_t bufsz = wal->bufsz;
176   wal->synched = false;
177   if (bufsz - wal->bufpos < oplen) {
178     rc = _flush_wl(wal, false);
179     RCRET(rc);
180   }
181   assert(bufsz - wal->bufpos >= oplen);
182   memcpy(wal->buf + wal->bufpos, op, (size_t) oplen);
183   wal->bufpos += oplen;
184   if (bufsz - wal->bufpos < len) {
185     rc = _flush_wl(wal, false);
186     RCRET(rc);
187     rc = iwp_write(wal->fh, data, (size_t) len);
188     RCRET(rc);
189   } else {
190     assert(bufsz - wal->bufpos >= len);
191     memcpy(wal->buf + wal->bufpos, data, (size_t) len);
192     wal->bufpos += len;
193   }
194   return rc;
195 }
196 
_write_op(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)197 IW_INLINE iwrc _write_op(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
198   iwrc rc = _lock(wal);
199   RCRET(rc);
200   rc = _write_wl(wal, op, oplen, data, len);
201   IWRC(_unlock(wal), rc);
202   return rc;
203 }
204 
iwal_sync(IWKV iwkv)205 iwrc iwal_sync(IWKV iwkv) {
206   IWAL *wal = (IWAL*) iwkv->dlsnr;
207   iwrc rc = _lock(wal);
208   RCRET(rc);
209   rc = _flush_wl(wal, true);
210   IWRC(_unlock(wal), rc);
211   return rc;
212 }
213 
_onopen(struct IWDLSNR * self,const char * path,int mode)214 static iwrc _onopen(struct IWDLSNR *self, const char *path, int mode) {
215   return 0;
216 }
217 
_onclosing(struct IWDLSNR * self)218 static iwrc _onclosing(struct IWDLSNR *self) {
219   IWAL *wal = (IWAL*) self;
220 #ifdef IW_TESTS
221   uint64_t tv = g_trigger;
222   if (tv & IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE) {
223     _destroy(wal);
224     return 0;
225   }
226 #endif
227   iwrc rc = _checkpoint_exl(wal, 0, false);
228   _destroy(wal);
229   return rc;
230 }
231 
_onset(struct IWDLSNR * self,off_t off,uint8_t val,off_t len,int flags)232 static iwrc _onset(struct IWDLSNR *self, off_t off, uint8_t val, off_t len, int flags) {
233   IWAL *wal = (IWAL*) self;
234   if (wal->applying) {
235     return 0;
236   }
237   WBSET wb = {
238     .id  = WOP_SET,
239     .val = val,
240     .off = off,
241     .len = len
242   };
243   wal->mbytes += len;
244   return _write_op((IWAL*) self, &wb, sizeof(wb), 0, 0);
245 }
246 
_oncopy(struct IWDLSNR * self,off_t off,off_t len,off_t noff,int flags)247 static iwrc _oncopy(struct IWDLSNR *self, off_t off, off_t len, off_t noff, int flags) {
248   IWAL *wal = (IWAL*) self;
249   if (wal->applying) {
250     return 0;
251   }
252   WBCOPY wb = {
253     .id   = WOP_COPY,
254     .off  = off,
255     .len  = len,
256     .noff = noff
257   };
258   wal->mbytes += len;
259   return _write_op(wal, &wb, sizeof(wb), 0, 0);
260 }
261 
_onwrite(struct IWDLSNR * self,off_t off,const void * buf,off_t len,int flags)262 static iwrc _onwrite(struct IWDLSNR *self, off_t off, const void *buf, off_t len, int flags) {
263   assert(len <= (size_t) (-1));
264   IWAL *wal = (IWAL*) self;
265   if (wal->applying) {
266     return 0;
267   }
268   WBWRITE wb = {
269     .id  = WOP_WRITE,
270     .crc = wal->check_cp_crc ? iwu_crc32(buf,len,  0) : 0,
271     .len = len,
272     .off = off
273   };
274   wal->mbytes += len;
275   return _write_op(wal, &wb, sizeof(wb), buf, len);
276 }
277 
_onresize(struct IWDLSNR * self,off_t osize,off_t nsize,int flags,bool * handled)278 static iwrc _onresize(struct IWDLSNR *self, off_t osize, off_t nsize, int flags, bool *handled) {
279   IWAL *wal = (IWAL*) self;
280   if (wal->applying) {
281     *handled = false;
282     return 0;
283   }
284   *handled = true;
285   WBRESIZE wb = {
286     .id    = WOP_RESIZE,
287     .osize = osize,
288     .nsize = nsize
289   };
290   iwrc rc = _lock(wal);
291   RCRET(rc);
292   rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
293   RCGO(rc, finish);
294   rc = _checkpoint_exl(wal, 0, true);
295 finish:
296   IWRC(_unlock(wal), rc);
297   return rc;
298 }
299 
_onsynced(struct IWDLSNR * self,int flags)300 static iwrc _onsynced(struct IWDLSNR *self, int flags) {
301   IWAL *wal = (IWAL*) self;
302   if (wal->applying) {
303     return 0;
304   }
305   iwrc rc = _lock(wal);
306   RCRET(rc);
307   rc = _flush_wl(wal, true);
308   IWRC(_unlock(wal), rc);
309   return rc;
310 }
311 
/**
 * Scans WAL content and finds offsets of the last complete fixpoint
 * and the last complete reset records.
 *
 * Scanning stops silently at the first record which is unknown or does not
 * fit into the remaining bytes, so a truncated tail of the WAL is ignored.
 *
 * @param wal  Not used.
 * @param wmm  Start of the WAL content.
 * @param fsz  Number of valid bytes at `wmm`.
 * @param fpos [out] Offset of the last WBFIXPOINT record, zero if none found.
 * @param rpos [out] Offset of the last WBRESET record, zero if none found.
 */
static void _last_fix_and_reset_points(IWAL *wal, uint8_t *wmm, off_t fsz, off_t *fpos, off_t *rpos) {
  uint8_t *rp = wmm;
  *fpos = 0;
  *rpos = 0;

  for (uint32_t i = 0; rp - wmm < fsz; ++i) {
    uint8_t opid;
    off_t avail = fsz - (rp - wmm);
    memcpy(&opid, rp, 1);
    if ((i == 0) && (opid != WOP_SEP)) {
      // WAL must start with a segment separator
      return;
    }
    switch (opid) {
      case WOP_SEP: {
        WBSEP wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // The segment body follows the header, so the header size
        // is excluded from the number of bytes available for it.
        if (wb.len > avail - (off_t) sizeof(wb)) {
          return;
        }
        break;
      }
      case WOP_SET: {
        if (avail < sizeof(WBSET)) {
          return;
        }
        rp += sizeof(WBSET);
        break;
      }
      case WOP_COPY: {
        if (avail < sizeof(WBCOPY)) {
          return;
        }
        rp += sizeof(WBCOPY);
        break;
      }
      case WOP_WRITE: {
        WBWRITE wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // Payload follows the header
        if (avail - (off_t) sizeof(wb) < wb.len) {
          return;
        }
        rp += wb.len;
        break;
      }
      case WOP_RESIZE: {
        if (avail < sizeof(WBRESIZE)) {
          return;
        }
        rp += sizeof(WBRESIZE);
        break;
      }
      case WOP_FIXPOINT: {
        // A partially written fixpoint is not a valid recovery point
        if (avail < sizeof(WBFIXPOINT)) {
          return;
        }
        *fpos = (rp - wmm);
        rp += sizeof(WBFIXPOINT);
        break;
      }
      case WOP_RESET: {
        if (avail < sizeof(WBRESET)) {
          return;
        }
        *rpos = (rp - wmm);
        rp += sizeof(WBRESET);
        break;
      }
      default: {
        return;
        break;
      }
    }
  }
}
388 
_rollforward_exl(IWAL * wal,IWFS_EXT * extf,int recover_mode)389 static iwrc _rollforward_exl(IWAL *wal, IWFS_EXT *extf, int recover_mode) {
390   assert(wal->bufpos == 0);
391   off_t fsz = 0;
392   iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
393   RCRET(rc);
394   if (!fsz) { // empty wal log
395     return 0;
396   }
397   size_t sp;
398   uint8_t *mm;
399   const bool ccrc = wal->check_cp_crc;
400   off_t fpos = 0; // checkpoint
401 #ifndef _WIN32
402   off_t pfsz = IW_ROUNDUP(fsz, iwp_page_size());
403   uint8_t *wmm = mmap(0, (size_t) pfsz, PROT_READ, MAP_PRIVATE, wal->fh, 0);
404   #if defined(MADV_SEQUENTIAL) || defined(MADV_DONTFORK)
405   int adv = 0;
406   #ifdef MADV_SEQUENTIAL
407   adv |= MADV_SEQUENTIAL;
408   #endif
409   #ifdef MADV_DONTFORK
410   adv |= MADV_DONTFORK;
411   #endif
412   madvise(wmm, (size_t) fsz, adv);
413   #endif
414 #else
415   off_t pfsz = fsz;
416   uint8_t *wmm = mmap(0, 0, PROT_READ, MAP_PRIVATE, wal->fh, 0);
417 #endif
418   if (wmm == MAP_FAILED) {
419     return iwrc_set_errno(IW_ERROR_ERRNO, errno);
420   }
421   // Temporary turn off extf locking
422   wal->applying = true;
423 
424   // Remap fsm in MAP_SHARED mode
425   extf->remove_mmap_unsafe(extf, 0);
426   rc = extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_SHARED);
427   if (rc) {
428     munmap(wmm, (size_t) pfsz);
429     wal->iwkv->fatalrc = rc;
430     wal->applying = false;
431     return rc;
432   }
433 
434 #define _WAL_CORRUPTED(msg_) do { \
435     rc = IWKV_ERROR_CORRUPTED_WAL_FILE; \
436     iwlog_ecode_error2(rc, msg_); \
437     goto finish; \
438 } while (0);
439 
440   if (recover_mode) {
441     off_t rpos; // reset point
442     _last_fix_and_reset_points(wal, wmm, fsz, &fpos, &rpos);
443     if (!fpos) {
444       goto finish;
445     }
446     if ((rpos > 0) && (recover_mode == 1)) {
447       // Recover from last known reset point
448       if (fpos < rpos) {
449         goto finish;
450       }
451       // WBSEP__WBRESET
452       //        \_rpos
453       rpos -= sizeof(WBSEP);
454       // WBSEP__WBRESET
455       // \_rpos
456       wmm += rpos;
457       fsz -= rpos;
458     }
459   } else if (wal->rollforward_offset > 0) {
460     if (wal->rollforward_offset >= fsz) {
461       _WAL_CORRUPTED("Invalid rollforward offset");
462     }
463     wmm += wal->rollforward_offset;
464     fsz -= wal->rollforward_offset;
465   }
466 
467   uint8_t *rp = wmm;
468   for (uint32_t i = 0; rp - wmm < fsz; ++i) {
469     uint8_t opid;
470     off_t avail = fsz - (rp - wmm);
471     memcpy(&opid, rp, 1);
472     if ((i == 0) && (opid != WOP_SEP)) {
473       rc = IWKV_ERROR_CORRUPTED_WAL_FILE;
474       goto finish;
475     }
476     switch (opid) {
477       case WOP_SEP: {
478         WBSEP wb;
479         if (avail < sizeof(wb)) {
480           _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
481         }
482         memcpy(&wb, rp, sizeof(wb));
483         rp += sizeof(wb);
484         if (wb.len > avail) {
485           _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
486         }
487         if (ccrc && wb.crc) {
488           uint32_t crc = iwu_crc32(rp, wb.len, 0);
489           if (crc != wb.crc) {
490             _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBSEP)");
491           }
492         }
493         break;
494       }
495       case WOP_SET: {
496         WBSET wb;
497         if (avail < sizeof(wb)) {
498           _WAL_CORRUPTED("Premature end of WAL (WBSET)");
499         }
500         memcpy(&wb, rp, sizeof(wb));
501         rp += sizeof(wb);
502         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
503         RCGO(rc, finish);
504         memset(mm + wb.off, wb.val, (size_t) wb.len);
505         break;
506       }
507       case WOP_COPY: {
508         WBCOPY wb;
509         if (avail < sizeof(wb)) {
510           _WAL_CORRUPTED("Premature end of WAL (WBCOPY)");
511         }
512         memcpy(&wb, rp, sizeof(wb));
513         rp += sizeof(wb);
514         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
515         RCGO(rc, finish);
516         memmove(mm + wb.noff, mm + wb.off, (size_t) wb.len);
517         break;
518       }
519       case WOP_WRITE: {
520         WBWRITE wb;
521         if (avail < sizeof(wb)) {
522           _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
523         }
524         memcpy(&wb, rp, sizeof(wb));
525         rp += sizeof(wb);
526         if (avail < wb.len) {
527           _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
528         }
529         if (ccrc && wb.crc) {
530           uint32_t crc = iwu_crc32(rp, wb.len, 0);
531           if (crc != wb.crc) {
532             _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBWRITE)");
533           }
534         }
535         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
536         RCGO(rc, finish);
537         memmove(mm + wb.off, rp, wb.len);
538         rp += wb.len;
539         break;
540       }
541       case WOP_RESIZE: {
542         WBRESIZE wb;
543         if (avail < sizeof(wb)) {
544           _WAL_CORRUPTED("Premature end of WAL (WBRESIZE)");
545         }
546         memcpy(&wb, rp, sizeof(wb));
547         rp += sizeof(wb);
548         rc = extf->truncate_unsafe(extf, wb.nsize);
549         RCGO(rc, finish);
550         break;
551       }
552       case WOP_FIXPOINT:
553         if (fpos == rp - wmm) { // last fixpoint to
554           WBFIXPOINT wb;
555           memcpy(&wb, rp, sizeof(wb));
556           iwlog_warn("Database recovered at point of time: %"
557                      PRIu64
558                      " ms since epoch\n", wb.ts);
559           goto finish;
560         }
561         rp += sizeof(WBFIXPOINT);
562         break;
563       case WOP_RESET: {
564         rp += sizeof(WBRESET);
565         break;
566       }
567       default: {
568         _WAL_CORRUPTED("Invalid WAL command");
569         break;
570       }
571     }
572   }
573 #undef _WAL_CORRUPTED
574 
575 finish:
576   if (!rc) {
577     rc = extf->sync_mmap_unsafe(extf, 0, IWFS_SYNCDEFAULT);
578   }
579   munmap(wmm, (size_t) pfsz);
580   IWRC(extf->remove_mmap_unsafe(extf, 0), rc);
581   IWRC(extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_PRIVATE), rc);
582   if (!rc) {
583     int stage = wal->bkp_stage;
584     if ((stage == 0) || (stage == BKP_WAL_CLEANUP)) {
585       rc = _truncate_wl(wal);
586     } else {
587       // Don't truncate WAL during online backup.
588       // Just append the WBRESET mark
589       WBRESET wb = {
590         .id = WOP_RESET
591       };
592       IWRC(_flush_wl(wal, false), rc);
593       // Write: WBSEP + WBRESET
594       IWRC(_write_wl(wal, &wb, sizeof(wb), 0, 0), rc);
595       IWRC(_flush_wl(wal, true), rc);
596       IWRC(iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz), rc);
597       if (!rc) {
598         // rollforward_offset points here --> WBSEP __ WBRESET __ EOF
599         wal->rollforward_offset = fsz - (sizeof(WBSEP) + sizeof(WBRESET));
600       }
601     }
602   }
603   if (rc && !wal->iwkv->fatalrc) {
604     wal->iwkv->fatalrc = rc;
605   }
606   wal->synched = true;
607   wal->applying = false;
608   return rc;
609 }
610 
_recover_wl(IWKV iwkv,IWAL * wal,IWFS_FSM_OPTS * fsmopts,bool recover_backup)611 static iwrc _recover_wl(IWKV iwkv, IWAL *wal, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
612   off_t fsz = 0;
613   iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
614   RCRET(rc);
615   if (!fsz) { // empty wal log
616     return 0;
617   }
618   IWFS_EXT extf;
619   IWFS_EXT_OPTS extopts;
620   memcpy(&extopts, &fsmopts->exfile, sizeof(extopts));
621   extopts.use_locks = false;
622   extopts.file.omode = IWFS_OCREATE | IWFS_OWRITE;
623   extopts.file.dlsnr = 0;
624   rc = iwfs_exfile_open(&extf, &extopts);
625   RCRET(rc);
626   rc = _rollforward_exl(wal, &extf, recover_backup ? 2 : 1);
627   IWRC(extf.close(&extf), rc);
628   return rc;
629 }
630 
_need_checkpoint(IWAL * wal)631 IW_INLINE bool _need_checkpoint(IWAL *wal) {
632   uint64_t mbytes = wal->mbytes;
633   bool force = wal->force_cp;
634   return (force || mbytes >= wal->checkpoint_buffer_sz);
635 }
636 
_checkpoint_exl(IWAL * wal,uint64_t * tsp,bool no_fixpoint)637 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint) {
638   if (tsp) {
639     *tsp = 0;
640   }
641   int stage = wal->bkp_stage;
642   if (stage == BKP_MAIN_COPY) {
643     // No checkpoints during main file copying
644     return 0;
645   }
646   iwrc rc = 0;
647   IWFS_EXT *extf;
648   IWKV iwkv = wal->iwkv;
649   if (!no_fixpoint) {
650     wal->force_cp = false;
651     wal->force_sp = false;
652     WBFIXPOINT wb = {
653       .id = WOP_FIXPOINT
654     };
655     rc = iwp_current_time_ms(&wb.ts, false);
656     RCGO(rc, finish);
657     rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
658     RCGO(rc, finish);
659   }
660   rc = _flush_wl(wal, true);
661   RCGO(rc, finish);
662   rc = iwkv->fsm.extfile(&iwkv->fsm, &extf);
663   RCGO(rc, finish);
664 
665   rc = _rollforward_exl(wal, extf, 0);
666   wal->mbytes = 0;
667   wal->synched = true;
668   iwp_current_time_ms(&wal->checkpoint_ts, true);
669   if (tsp) {
670     *tsp = wal->checkpoint_ts;
671   }
672 
673 finish:
674   if (rc) {
675     if (iwkv->fatalrc) {
676       iwlog_ecode_error3(rc);
677     } else {
678       iwkv->fatalrc = rc;
679     }
680   }
681   return rc;
682 }
683 
684 #ifdef IW_TESTS
685 
iwal_test_checkpoint(IWKV iwkv)686 iwrc iwal_test_checkpoint(IWKV iwkv) {
687   if (!iwkv->dlsnr) {
688     return IWKV_ERROR_WAL_MODE_REQUIRED;
689   }
690   IWAL *wal = (IWAL*) iwkv->dlsnr;
691   iwrc rc = _excl_lock(wal);
692   RCRET(rc);
693   rc = _checkpoint_exl(wal, 0, false);
694   IWRC(_excl_unlock(wal), rc);
695   return rc;
696 }
697 
698 #endif
699 
700 //--------------------------------------- Public API
701 
iwal_poke_checkpoint(IWKV iwkv,bool force)702 WUR iwrc iwal_poke_checkpoint(IWKV iwkv, bool force) {
703   IWAL *wal = (IWAL*) iwkv->dlsnr;
704   if (!wal || !(force || _need_checkpoint(wal))) {
705     return 0;
706   }
707   iwrc rc = _lock(wal);
708   RCRET(rc);
709   bool cforce = wal->force_cp;
710   if (cforce) { // Forced already
711     _unlock(wal);
712     return 0;
713   } else if (force) {
714     wal->force_cp = true;
715   } else if (!_need_checkpoint(wal)) {
716     _unlock(wal);
717     return 0;
718   }
719   int rci = pthread_cond_broadcast(wal->cpt_condp);
720   if (rci) {
721     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
722   }
723   _unlock(wal);
724   return rc;
725 }
726 
iwal_poke_savepoint(IWKV iwkv)727 iwrc iwal_poke_savepoint(IWKV iwkv) {
728   IWAL *wal = (IWAL*) iwkv->dlsnr;
729   if (!wal) {
730     return 0;
731   }
732   iwrc rc = _lock(wal);
733   RCRET(rc);
734   bool fsp = wal->force_sp;
735   if (!fsp) {
736     wal->force_sp = true;
737     int rci = pthread_cond_broadcast(wal->cpt_condp);
738     if (rci) {
739       rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
740     }
741   }
742   _unlock(wal);
743   return rc;
744 }
745 
_savepoint_exl(IWAL * wal,uint64_t * tsp,bool sync)746 iwrc _savepoint_exl(IWAL *wal, uint64_t *tsp, bool sync) {
747   if (tsp) {
748     *tsp = 0;
749   }
750   wal->force_sp = false;
751   WBFIXPOINT wbfp = {
752     .id = WOP_FIXPOINT
753   };
754   iwrc rc = iwp_current_time_ms(&wbfp.ts, false);
755   RCRET(rc);
756   rc = _write_wl(wal, &wbfp, sizeof(wbfp), 0, 0);
757   RCRET(rc);
758   rc = _flush_wl(wal, sync);
759   RCRET(rc);
760   if (sync) {
761     wal->synched = true;
762   }
763   if (tsp) {
764     *tsp = wbfp.ts;
765   }
766   return 0;
767 }
768 
/** Returns the `synched` flag of the WAL, or false if WAL is not enabled. */
bool iwal_synched(IWKV iwkv) {
  IWAL *wal = (IWAL*) iwkv->dlsnr;
  if (wal) {
    return wal->synched;
  }
  return false;
}
776 
iwal_savepoint_exl(IWKV iwkv,bool sync)777 iwrc iwal_savepoint_exl(IWKV iwkv, bool sync) {
778   IWAL *wal = (IWAL*) iwkv->dlsnr;
779   if (!wal) {
780     return 0;
781   }
782   return _savepoint_exl(wal, 0, sync);
783 }
784 
/**
 * Stops the checkpoint thread.
 *
 * Waits until a running online backup is finished, marks the WAL as closed,
 * wakes up the checkpoint thread and joins it. Does nothing if WAL is not
 * enabled. The thread is joined only once, so repeated calls are safe.
 */
void iwal_shutdown(IWKV iwkv) {
  IWAL *wal = (IWAL*) iwkv->dlsnr;
  if (!wal) {
    return;
  }
  while (wal->bkp_stage) { // todo: review
    iwp_sleep(50);
  }
  wal->open = false;
  if (wal->mtxp && wal->cpt_condp) {
    pthread_mutex_lock(wal->mtxp);
    pthread_cond_broadcast(wal->cpt_condp);
    pthread_mutex_unlock(wal->mtxp);
  }
  if (wal->cptp) {
    pthread_join(wal->cpt, 0);
    wal->cpt = 0;
    // Joining the same thread again is undefined, so forget about it
    wal->cptp = 0;
  }
}
804 
/**
 * Checkpoint thread body.
 *
 * Runs while `wal->open` is set. On every iteration it decides, under the
 * WAL mutex, whether a checkpoint (`cp`) or a savepoint (`sp`) is required:
 * either right away (`_need_checkpoint()` / `force_sp`) or after waiting on
 * the cond variable with a deadline one second ahead. The chosen operation
 * is then executed under the exclusive lock.
 *
 * Errors of a checkpoint/savepoint are logged and do not stop the thread.
 * Errors of locking, clock or cond wait terminate the thread and are stored
 * in `iwkv->fatalrc` unless it is already set.
 *
 * @param op Pointer to the `IWAL` instance.
 * @return Always zero.
 */
static void* _cpt_worker_fn(void *op) {
  int rci;
  iwrc rc = 0;
  IWAL *wal = op;
  IWKV iwkv = wal->iwkv;
  uint64_t savepoint_ts = 0; // Timestamp reported by the last checkpoint/savepoint

  while (wal->open) {
    struct timespec tp;
    uint64_t tick_ts;
    bool sp = false, cp = false;
    rc = _lock(wal);
    RCBREAK(rc);

    // Work requested already: skip waiting
    if (_need_checkpoint(wal)) {
      cp = true;
      _unlock(wal);
      goto cprun;
    } else if (wal->force_sp) {
      sp = true;
      _unlock(wal);
      goto cprun;
    }

#if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
    rc = iwp_clock_get_time(CLOCK_MONOTONIC, &tp);
#else
    rc = iwp_clock_get_time(CLOCK_REALTIME, &tp);
#endif
    if (rc) {
      _unlock(wal);
      break;
    }
    tp.tv_sec += 1; // one sec tick
    // Wait deadline in milliseconds
    tick_ts = tp.tv_sec * 1000 + (uint64_t) round(tp.tv_nsec / 1.0e6);
    rci = pthread_cond_timedwait(wal->cpt_condp, wal->mtxp, &tp);
    if (rci && (rci != ETIMEDOUT)) {
      rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
      _unlock(wal);
      break;
    }
    if (!wal->open || iwkv->fatalrc) {
      _unlock(wal);
      break;
    }
    bool synched = wal->synched;
    size_t mbytes = wal->mbytes;
    // Checkpoint if required or if there are modified bytes and the checkpoint timeout expired
    cp = _need_checkpoint(wal) || ((mbytes && (tick_ts - wal->checkpoint_ts) >= 1000LL * wal->checkpoint_timeout_sec));
    if (!cp) {
      // Savepoint only if WAL is not synched and it was forced or the savepoint timeout expired
      sp = !synched && (wal->force_sp || ((tick_ts - savepoint_ts) >= 1000LL * wal->savepoint_timeout_sec));
    }
    _unlock(wal);

cprun:
    if (cp || sp) {
      rc = _excl_lock(wal);
      RCBREAK(rc);
      if (iwkv->open) {
        if (cp) {
          rc = _checkpoint_exl(wal, &savepoint_ts, false);
        } else {
          rc = _savepoint_exl(wal, &savepoint_ts, true);
        }
      }
      _excl_unlock(wal);
      if (rc) {
        // Not fatal for the worker: log and keep running
        iwlog_ecode_error2(rc, "WAL worker savepoint/checkpoint error\n");
        rc = 0;
      }
    }
  }
  if (rc) {
    iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
    iwlog_ecode_error2(rc, "WAL worker exited with error\n");
  }
  return 0;
}
882 
iwal_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)883 iwrc iwal_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
884   iwrc rc;
885   size_t sp;
886   uint32_t lv;
887   uint64_t llv;
888   char buf[16384];
889   off_t off = 0, fsize = 0;
890   *ts = 0;
891 
892   if (!target_file) {
893     return IW_ERROR_INVALID_ARGS;
894   }
895   IWAL *wal = (IWAL*) iwkv->dlsnr;
896   if (!wal) {
897     return IWKV_ERROR_WAL_MODE_REQUIRED;
898   }
899   rc = _lock(wal);
900   RCRET(rc);
901   if (wal->bkp_stage) {
902     rc = IWKV_ERROR_BACKUP_IN_PROGRESS;
903   } else {
904     wal->bkp_stage = BKP_STARTED;
905   }
906   _unlock(wal);
907 
908 #ifndef _WIN32
909   HANDLE fh = open(target_file, O_CREAT | O_WRONLY | O_TRUNC, 00600);
910   if (INVALIDHANDLE(fh)) {
911     rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
912     goto finish;
913   }
914 #else
915   HANDLE fh = CreateFile(target_file, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE,
916                          NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
917   if (INVALIDHANDLE(fh)) {
918     rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
919     goto finish;
920   }
921 #endif
922 
923   // Flush all pending WAL changes
924   rc = _excl_lock(wal);
925   RCGO(rc, finish);
926   wal->bkp_stage = BKP_WAL_CLEANUP;
927   rc = _checkpoint_exl(wal, 0, false);
928   wal->bkp_stage = BKP_MAIN_COPY;
929   _excl_unlock(wal);
930   RCGO(rc, finish);
931 
932   // Copy main database file
933   IWFS_FSM_STATE fstate = { 0 };
934   rc = iwkv->fsm.state(&iwkv->fsm, &fstate);
935   RCGO(rc, finish);
936   do {
937     rc = iwp_pread(fstate.exfile.file.fh, off, buf, sizeof(buf), &sp);
938     RCGO(rc, finish);
939     if (sp > 0) {
940       rc = iwp_write(fh, buf, sp);
941       RCGO(rc, finish);
942       off += sp;
943     }
944   } while (sp > 0);
945 
946   // Copy most of WAL file content
947   rc = _lock(wal);
948   RCGO(rc, finish);
949   wal->bkp_stage = BKP_WAL_COPY1;
950   rc = _flush_wl(wal, false);
951   _unlock(wal);
952   RCGO(rc, finish);
953 
954   fsize = off;
955   off = 0;
956   do {
957     rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
958     RCGO(rc, finish);
959     if (sp > 0) {
960       rc = iwp_write(fh, buf, sp);
961       RCGO(rc, finish);
962       off += sp;
963     }
964   } while (sp > 0);
965 
966 
967   // Copy rest of WAL file in exclusive locked mode
968   rc = _excl_lock(wal);
969   RCGO(rc, finish);
970   wal->bkp_stage = BKP_WAL_COPY2;
971   rc = _savepoint_exl(wal, ts, true);
972   RCGO(rc, unlock);
973   do {
974     rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
975     RCGO(rc, unlock);
976     if (sp > 0) {
977       rc = iwp_write(fh, buf, sp);
978       RCGO(rc, unlock);
979       off += sp;
980     }
981   } while (sp > 0);
982 
983   llv = IW_HTOILL(fsize);
984   rc = iwp_write(fh, &llv, sizeof(llv));
985   RCGO(rc, unlock);
986 
987   lv = IW_HTOIL(IWKV_BACKUP_MAGIC);
988   rc = iwp_write(fh, &lv, sizeof(lv));
989   RCGO(rc, unlock);
990 
991 unlock:
992   wal->bkp_stage = 0;
993   IWRC(_excl_unlock(wal), rc);
994 
995 finish:
996   if (rc) {
997     _lock(wal);
998     wal->bkp_stage = 0;
999     _unlock(wal);
1000   } else {
1001     rc = iwal_poke_checkpoint(iwkv, true);
1002   }
1003   if (!INVALIDHANDLE(fh)) {
1004     IWRC(iwp_fdatasync(fh), rc);
1005     IWRC(iwp_closefh(fh), rc);
1006   }
1007   return rc;
1008 }
1009 
_init_cpt(IWAL * wal)1010 iwrc _init_cpt(IWAL *wal) {
1011   if (  (wal->savepoint_timeout_sec == UINT32_MAX)
1012      && (wal->checkpoint_timeout_sec == UINT32_MAX)) {
1013     // do not start checkpoint thread
1014     return 0;
1015   }
1016   pthread_attr_t pattr;
1017   pthread_condattr_t cattr;
1018   int rci = pthread_condattr_init(&cattr);
1019   if (rci) {
1020     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1021   }
1022 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
1023   rci = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
1024   if (rci) {
1025     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1026   }
1027 #endif
1028   rci = pthread_cond_init(&wal->cpt_cond, &cattr);
1029   if (rci) {
1030     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1031   }
1032   wal->cpt_condp = &wal->cpt_cond;
1033   rci = pthread_attr_init(&pattr);
1034   if (rci) {
1035     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1036   }
1037   pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
1038   rci = pthread_create(&wal->cpt, &pattr, _cpt_worker_fn, wal);
1039   if (rci) {
1040     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1041   }
1042   wal->cptp = &wal->cpt;
1043   return 0;
1044 }
1045 
iwal_create(IWKV iwkv,const IWKV_OPTS * opts,IWFS_FSM_OPTS * fsmopts,bool recover_backup)1046 iwrc iwal_create(IWKV iwkv, const IWKV_OPTS *opts, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
1047   assert(!iwkv->dlsnr && opts && fsmopts);
1048   if (!opts) {
1049     return IW_ERROR_INVALID_ARGS;
1050   }
1051   if ((opts->oflags & IWKV_RDONLY) || !opts->wal.enabled) {
1052     return 0;
1053   }
1054   iwrc rc = 0;
1055   IWAL *wal = calloc(1, sizeof(*wal));
1056   if (!wal) {
1057     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1058   }
1059 
1060   wal->wal_lock_interceptor = opts->wal.wal_lock_interceptor;
1061   wal->wal_lock_interceptor_opaque = opts->wal.wal_lock_interceptor_opaque;
1062 
1063   size_t sz = strlen(opts->path);
1064   char *wpath = malloc(sz + 4 /*-wal*/ + 1 /*\0*/);
1065   if (!wpath) {
1066     free(wal);
1067     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1068   }
1069   memcpy(wpath, opts->path, sz);
1070   memcpy(wpath + sz, "-wal", 4);
1071   wpath[sz + 4] = '\0';
1072 
1073   wal->fh = INVALID_HANDLE_VALUE;
1074   wal->path = wpath;
1075   wal->oflags = opts->oflags;
1076   wal->iwkv = iwkv;
1077   iwp_current_time_ms(&wal->checkpoint_ts, true);
1078 
1079   rc = _init_locks(wal);
1080   RCGO(rc, finish);
1081 
1082   IWDLSNR *dlsnr = &wal->lsnr;
1083   dlsnr->onopen = _onopen;
1084   dlsnr->onclosing = _onclosing;
1085   dlsnr->onset = _onset;
1086   dlsnr->oncopy = _oncopy;
1087   dlsnr->onwrite = _onwrite;
1088   dlsnr->onresize = _onresize;
1089   dlsnr->onsynced = _onsynced;
1090   iwkv->dlsnr = (IWDLSNR*) wal;
1091 
1092   wal->wal_buffer_sz
1093     = opts->wal.wal_buffer_sz > 0
1094       ? opts->wal.wal_buffer_sz :
1095 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1096       2 * 1024 * 1024; // 2M
1097 #else
1098       8 * 1024 * 1024; // 8M
1099 #endif
1100   if (wal->wal_buffer_sz < 4096) {
1101     wal->wal_buffer_sz = 4096;
1102   }
1103 
1104   wal->checkpoint_buffer_sz
1105     = opts->wal.checkpoint_buffer_sz > 0
1106       ? opts->wal.checkpoint_buffer_sz :
1107 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1108       64ULL * 1024 * 1024; // 64M
1109 #else
1110       1024ULL * 1024 * 1024; // 1G
1111 #endif
1112   if (wal->checkpoint_buffer_sz < 1024 * 1024) { // 1M minimal
1113     wal->checkpoint_buffer_sz = 1024 * 1024;
1114   }
1115 
1116   wal->savepoint_timeout_sec
1117     = opts->wal.savepoint_timeout_sec > 0
1118       ? opts->wal.savepoint_timeout_sec : 10; // 10 sec
1119 
1120   wal->checkpoint_timeout_sec
1121     = opts->wal.checkpoint_timeout_sec > 0 ?
1122 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1123       opts->wal.checkpoint_timeout_sec : 60; // 1 min
1124 #else
1125       opts->wal.checkpoint_timeout_sec : 300; // 5 min
1126 #endif
1127 
1128   if (wal->checkpoint_timeout_sec < 10) { // 10 sec minimal
1129     wal->checkpoint_timeout_sec = 10;
1130   }
1131   if (wal->savepoint_timeout_sec >= wal->checkpoint_timeout_sec) {
1132     wal->savepoint_timeout_sec = wal->checkpoint_timeout_sec / 2;
1133   }
1134 
1135   wal->check_cp_crc = opts->wal.check_crc_on_checkpoint;
1136 
1137   wal->buf = malloc(wal->wal_buffer_sz);
1138   if (!wal->buf) {
1139     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1140     goto finish;
1141   }
1142   wal->buf += sizeof(WBSEP);
1143   wal->bufsz = wal->wal_buffer_sz - sizeof(WBSEP);
1144 
1145   // Now open WAL file
1146 
1147 #ifndef _WIN32
1148   HANDLE fh = open(wal->path, O_CREAT | O_RDWR | O_CLOEXEC, IWFS_DEFAULT_FILEMODE);
1149   if (INVALIDHANDLE(fh)) {
1150     rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
1151     goto finish;
1152   }
1153 #else
1154   HANDLE fh = CreateFile(wal->path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ,
1155                          NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
1156   if (INVALIDHANDLE(fh)) {
1157     rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
1158     goto finish;
1159   }
1160 #endif
1161 
1162   wal->fh = fh;
1163   rc = iwp_flock(wal->fh, IWP_WLOCK);
1164   RCGO(rc, finish);
1165 
1166   // Now force all fsm data to be privately mmaped.
1167   // We will apply wal log to main database file
1168   // then re-read our private mmaps
1169   fsmopts->mmap_opts = IWFS_MMAP_PRIVATE;
1170   fsmopts->exfile.file.dlsnr = iwkv->dlsnr;
1171 
1172   if (wal->oflags & IWKV_TRUNC) {
1173     rc = _truncate_wl(wal);
1174     RCGO(rc, finish);
1175   } else {
1176     rc = _recover_wl(iwkv, wal, fsmopts, recover_backup);
1177     RCGO(rc, finish);
1178   }
1179 
1180   wal->open = true;
1181   // Start checkpoint thread
1182   rc = _init_cpt(wal);
1183 
1184 finish:
1185   if (rc) {
1186     iwkv->dlsnr = 0;
1187     iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
1188     iwal_shutdown(iwkv);
1189     _destroy(wal);
1190   }
1191   return rc;
1192 }
1193