1package litestream_test
2
3import (
4	"context"
5	"database/sql"
6	"io/ioutil"
7	"os"
8	"path/filepath"
9	"strings"
10	"testing"
11	"time"
12
13	"github.com/benbjohnson/litestream"
14)
15
16func TestDB_Path(t *testing.T) {
17	db := litestream.NewDB("/tmp/db")
18	if got, want := db.Path(), `/tmp/db`; got != want {
19		t.Fatalf("Path()=%v, want %v", got, want)
20	}
21}
22
23func TestDB_WALPath(t *testing.T) {
24	db := litestream.NewDB("/tmp/db")
25	if got, want := db.WALPath(), `/tmp/db-wal`; got != want {
26		t.Fatalf("WALPath()=%v, want %v", got, want)
27	}
28}
29
30func TestDB_MetaPath(t *testing.T) {
31	t.Run("Absolute", func(t *testing.T) {
32		db := litestream.NewDB("/tmp/db")
33		if got, want := db.MetaPath(), `/tmp/.db-litestream`; got != want {
34			t.Fatalf("MetaPath()=%v, want %v", got, want)
35		}
36	})
37	t.Run("Relative", func(t *testing.T) {
38		db := litestream.NewDB("db")
39		if got, want := db.MetaPath(), `.db-litestream`; got != want {
40			t.Fatalf("MetaPath()=%v, want %v", got, want)
41		}
42	})
43}
44
45func TestDB_GenerationNamePath(t *testing.T) {
46	db := litestream.NewDB("/tmp/db")
47	if got, want := db.GenerationNamePath(), `/tmp/.db-litestream/generation`; got != want {
48		t.Fatalf("GenerationNamePath()=%v, want %v", got, want)
49	}
50}
51
52func TestDB_GenerationPath(t *testing.T) {
53	db := litestream.NewDB("/tmp/db")
54	if got, want := db.GenerationPath("xxxx"), `/tmp/.db-litestream/generations/xxxx`; got != want {
55		t.Fatalf("GenerationPath()=%v, want %v", got, want)
56	}
57}
58
59func TestDB_ShadowWALDir(t *testing.T) {
60	db := litestream.NewDB("/tmp/db")
61	if got, want := db.ShadowWALDir("xxxx"), `/tmp/.db-litestream/generations/xxxx/wal`; got != want {
62		t.Fatalf("ShadowWALDir()=%v, want %v", got, want)
63	}
64}
65
66func TestDB_ShadowWALPath(t *testing.T) {
67	db := litestream.NewDB("/tmp/db")
68	if got, want := db.ShadowWALPath("xxxx", 1000), `/tmp/.db-litestream/generations/xxxx/wal/000003e8.wal`; got != want {
69		t.Fatalf("ShadowWALPath()=%v, want %v", got, want)
70	}
71}
72
73// Ensure we can check the last modified time of the real database and its WAL.
74func TestDB_UpdatedAt(t *testing.T) {
75	t.Run("ErrNotExist", func(t *testing.T) {
76		db := MustOpenDB(t)
77		defer MustCloseDB(t, db)
78		if _, err := db.UpdatedAt(); !os.IsNotExist(err) {
79			t.Fatalf("unexpected error: %#v", err)
80		}
81	})
82
83	t.Run("DB", func(t *testing.T) {
84		db, sqldb := MustOpenDBs(t)
85		defer MustCloseDBs(t, db, sqldb)
86
87		if t0, err := db.UpdatedAt(); err != nil {
88			t.Fatal(err)
89		} else if time.Since(t0) > 10*time.Second {
90			t.Fatalf("unexpected updated at time: %s", t0)
91		}
92	})
93
94	t.Run("WAL", func(t *testing.T) {
95		db, sqldb := MustOpenDBs(t)
96		defer MustCloseDBs(t, db, sqldb)
97
98		t0, err := db.UpdatedAt()
99		if err != nil {
100			t.Fatal(err)
101		}
102
103		sleepTime := 100 * time.Millisecond
104		if os.Getenv("CI") != "" {
105			sleepTime = 1 * time.Second
106		}
107		time.Sleep(sleepTime)
108
109		if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
110			t.Fatal(err)
111		}
112
113		if t1, err := db.UpdatedAt(); err != nil {
114			t.Fatal(err)
115		} else if !t1.After(t0) {
116			t.Fatalf("expected newer updated at time: %s > %s", t1, t0)
117		}
118	})
119}
120
121// Ensure we can compute a checksum on the real database.
122func TestDB_CRC64(t *testing.T) {
123	t.Run("ErrNotExist", func(t *testing.T) {
124		db := MustOpenDB(t)
125		defer MustCloseDB(t, db)
126		if _, _, err := db.CRC64(context.Background()); !os.IsNotExist(err) {
127			t.Fatalf("unexpected error: %#v", err)
128		}
129	})
130
131	t.Run("DB", func(t *testing.T) {
132		db, sqldb := MustOpenDBs(t)
133		defer MustCloseDBs(t, db, sqldb)
134
135		if err := db.Sync(context.Background()); err != nil {
136			t.Fatal(err)
137		}
138
139		chksum0, _, err := db.CRC64(context.Background())
140		if err != nil {
141			t.Fatal(err)
142		}
143
144		// Issue change that is applied to the WAL. Checksum should not change.
145		if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil {
146			t.Fatal(err)
147		} else if chksum1, _, err := db.CRC64(context.Background()); err != nil {
148			t.Fatal(err)
149		} else if chksum0 == chksum1 {
150			t.Fatal("expected different checksum event after WAL change")
151		}
152
153		// Checkpoint change into database. Checksum should change.
154		if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
155			t.Fatal(err)
156		}
157
158		if chksum2, _, err := db.CRC64(context.Background()); err != nil {
159			t.Fatal(err)
160		} else if chksum0 == chksum2 {
161			t.Fatal("expected different checksums after checkpoint")
162		}
163	})
164}
165
166// Ensure we can sync the real WAL to the shadow WAL.
167func TestDB_Sync(t *testing.T) {
168	// Ensure sync is skipped if no database exists.
169	t.Run("NoDB", func(t *testing.T) {
170		db := MustOpenDB(t)
171		defer MustCloseDB(t, db)
172		if err := db.Sync(context.Background()); err != nil {
173			t.Fatal(err)
174		}
175	})
176
177	// Ensure sync can successfully run on the initial sync.
178	t.Run("Initial", func(t *testing.T) {
179		db, sqldb := MustOpenDBs(t)
180		defer MustCloseDBs(t, db, sqldb)
181
182		if err := db.Sync(context.Background()); err != nil {
183			t.Fatal(err)
184		}
185
186		// Verify page size if now available.
187		if db.PageSize() == 0 {
188			t.Fatal("expected page size after initial sync")
189		}
190
191		// Obtain real WAL size.
192		fi, err := os.Stat(db.WALPath())
193		if err != nil {
194			t.Fatal(err)
195		}
196
197		// Ensure position now available.
198		if pos, err := db.Pos(); err != nil {
199			t.Fatal(err)
200		} else if pos.Generation == "" {
201			t.Fatal("expected generation")
202		} else if got, want := pos.Index, 0; got != want {
203			t.Fatalf("pos.Index=%v, want %v", got, want)
204		} else if got, want := pos.Offset, fi.Size(); got != want {
205			t.Fatalf("pos.Offset=%v, want %v", got, want)
206		}
207	})
208
209	// Ensure DB can keep in sync across multiple Sync() invocations.
210	t.Run("MultiSync", func(t *testing.T) {
211		db, sqldb := MustOpenDBs(t)
212		defer MustCloseDBs(t, db, sqldb)
213
214		// Execute a query to force a write to the WAL.
215		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
216			t.Fatal(err)
217		}
218
219		// Perform initial sync & grab initial position.
220		if err := db.Sync(context.Background()); err != nil {
221			t.Fatal(err)
222		}
223
224		pos0, err := db.Pos()
225		if err != nil {
226			t.Fatal(err)
227		}
228
229		// Insert into table.
230		if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
231			t.Fatal(err)
232		}
233
234		// Sync to ensure position moves forward one page.
235		if err := db.Sync(context.Background()); err != nil {
236			t.Fatal(err)
237		} else if pos1, err := db.Pos(); err != nil {
238			t.Fatal(err)
239		} else if pos0.Generation != pos1.Generation {
240			t.Fatal("expected the same generation")
241		} else if got, want := pos1.Index, pos0.Index; got != want {
242			t.Fatalf("Index=%v, want %v", got, want)
243		} else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want {
244			t.Fatalf("Offset=%v, want %v", got, want)
245		}
246	})
247
248	// Ensure a WAL file is created if one does not already exist.
249	t.Run("NoWAL", func(t *testing.T) {
250		db, sqldb := MustOpenDBs(t)
251		defer MustCloseDBs(t, db, sqldb)
252
253		// Issue initial sync and truncate WAL.
254		if err := db.Sync(context.Background()); err != nil {
255			t.Fatal(err)
256		}
257
258		// Obtain initial position.
259		pos0, err := db.Pos()
260		if err != nil {
261			t.Fatal(err)
262		}
263
264		// Checkpoint & fully close which should close WAL file.
265		if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
266			t.Fatal(err)
267		} else if err := db.Close(); err != nil {
268			t.Fatal(err)
269		} else if err := sqldb.Close(); err != nil {
270			t.Fatal(err)
271		}
272
273		// Verify WAL does not exist.
274		if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
275			t.Fatal(err)
276		}
277
278		// Reopen the managed database.
279		db = MustOpenDBAt(t, db.Path())
280		defer MustCloseDB(t, db)
281
282		// Re-sync and ensure new generation has been created.
283		if err := db.Sync(context.Background()); err != nil {
284			t.Fatal(err)
285		}
286
287		// Obtain initial position.
288		if pos1, err := db.Pos(); err != nil {
289			t.Fatal(err)
290		} else if pos0.Generation == pos1.Generation {
291			t.Fatal("expected new generation after truncation")
292		}
293	})
294
295	// Ensure DB can start new generation if it detects it cannot verify last position.
296	t.Run("OverwritePrevPosition", func(t *testing.T) {
297		db, sqldb := MustOpenDBs(t)
298		defer MustCloseDBs(t, db, sqldb)
299
300		// Execute a query to force a write to the WAL.
301		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
302			t.Fatal(err)
303		}
304
305		// Issue initial sync and truncate WAL.
306		if err := db.Sync(context.Background()); err != nil {
307			t.Fatal(err)
308		}
309
310		// Obtain initial position.
311		pos0, err := db.Pos()
312		if err != nil {
313			t.Fatal(err)
314		}
315
316		// Fully close which should close WAL file.
317		if err := db.Close(); err != nil {
318			t.Fatal(err)
319		} else if err := sqldb.Close(); err != nil {
320			t.Fatal(err)
321		}
322
323		// Verify WAL does not exist.
324		if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) {
325			t.Fatal(err)
326		}
327
328		// Insert into table multiple times to move past old offset
329		sqldb = MustOpenSQLDB(t, db.Path())
330		defer MustCloseSQLDB(t, sqldb)
331		for i := 0; i < 100; i++ {
332			if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
333				t.Fatal(err)
334			}
335		}
336
337		// Reopen the managed database.
338		db = MustOpenDBAt(t, db.Path())
339		defer MustCloseDB(t, db)
340
341		// Re-sync and ensure new generation has been created.
342		if err := db.Sync(context.Background()); err != nil {
343			t.Fatal(err)
344		}
345
346		// Obtain initial position.
347		if pos1, err := db.Pos(); err != nil {
348			t.Fatal(err)
349		} else if pos0.Generation == pos1.Generation {
350			t.Fatal("expected new generation after truncation")
351		}
352	})
353
354	// Ensure DB can handle a mismatched header-only and start new generation.
355	t.Run("WALHeaderMismatch", func(t *testing.T) {
356		db, sqldb := MustOpenDBs(t)
357		defer MustCloseDBs(t, db, sqldb)
358
359		// Execute a query to force a write to the WAL and then sync.
360		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
361			t.Fatal(err)
362		} else if err := db.Sync(context.Background()); err != nil {
363			t.Fatal(err)
364		}
365
366		// Grab initial position & close.
367		pos0, err := db.Pos()
368		if err != nil {
369			t.Fatal(err)
370		} else if err := db.Close(); err != nil {
371			t.Fatal(err)
372		}
373
374		// Read existing file, update header checksum, and write back only header
375		// to simulate a header with a mismatched checksum.
376		shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
377		if buf, err := ioutil.ReadFile(shadowWALPath); err != nil {
378			t.Fatal(err)
379		} else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
380			t.Fatal(err)
381		}
382
383		// Reopen managed database & ensure sync will still work.
384		db = MustOpenDBAt(t, db.Path())
385		defer MustCloseDB(t, db)
386		if err := db.Sync(context.Background()); err != nil {
387			t.Fatal(err)
388		}
389
390		// Verify a new generation was started.
391		if pos1, err := db.Pos(); err != nil {
392			t.Fatal(err)
393		} else if pos0.Generation == pos1.Generation {
394			t.Fatal("expected new generation")
395		}
396	})
397
398	// Ensure DB can handle partial shadow WAL header write.
399	t.Run("PartialShadowWALHeader", func(t *testing.T) {
400		db, sqldb := MustOpenDBs(t)
401		defer MustCloseDBs(t, db, sqldb)
402
403		// Execute a query to force a write to the WAL and then sync.
404		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
405			t.Fatal(err)
406		} else if err := db.Sync(context.Background()); err != nil {
407			t.Fatal(err)
408		}
409
410		pos0, err := db.Pos()
411		if err != nil {
412			t.Fatal(err)
413		}
414
415		// Close & truncate shadow WAL to simulate a partial header write.
416		if err := db.Close(); err != nil {
417			t.Fatal(err)
418		} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil {
419			t.Fatal(err)
420		}
421
422		// Reopen managed database & ensure sync will still work.
423		db = MustOpenDBAt(t, db.Path())
424		defer MustCloseDB(t, db)
425		if err := db.Sync(context.Background()); err != nil {
426			t.Fatal(err)
427		}
428
429		// Verify a new generation was started.
430		if pos1, err := db.Pos(); err != nil {
431			t.Fatal(err)
432		} else if pos0.Generation == pos1.Generation {
433			t.Fatal("expected new generation")
434		}
435	})
436
437	// Ensure DB can handle partial shadow WAL writes.
438	t.Run("PartialShadowWALFrame", func(t *testing.T) {
439		db, sqldb := MustOpenDBs(t)
440		defer MustCloseDBs(t, db, sqldb)
441
442		// Execute a query to force a write to the WAL and then sync.
443		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
444			t.Fatal(err)
445		} else if err := db.Sync(context.Background()); err != nil {
446			t.Fatal(err)
447		}
448
449		pos0, err := db.Pos()
450		if err != nil {
451			t.Fatal(err)
452		}
453
454		// Obtain current shadow WAL size.
455		fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index))
456		if err != nil {
457			t.Fatal(err)
458		}
459
460		// Close & truncate shadow WAL to simulate a partial frame write.
461		if err := db.Close(); err != nil {
462			t.Fatal(err)
463		} else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil {
464			t.Fatal(err)
465		}
466
467		// Reopen managed database & ensure sync will still work.
468		db = MustOpenDBAt(t, db.Path())
469		defer MustCloseDB(t, db)
470		if err := db.Sync(context.Background()); err != nil {
471			t.Fatal(err)
472		}
473
474		// Verify same generation is kept.
475		if pos1, err := db.Pos(); err != nil {
476			t.Fatal(err)
477		} else if got, want := pos1, pos0; got != want {
478			t.Fatalf("Pos()=%s want %s", got, want)
479		}
480
481		// Ensure shadow WAL has recovered.
482		if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
483			t.Fatal(err)
484		} else if got, want := fi0.Size(), fi.Size(); got != want {
485			t.Fatalf("Size()=%v, want %v", got, want)
486		}
487	})
488
489	// Ensure DB can handle a generation directory with a missing shadow WAL.
490	t.Run("NoShadowWAL", func(t *testing.T) {
491		db, sqldb := MustOpenDBs(t)
492		defer MustCloseDBs(t, db, sqldb)
493
494		// Execute a query to force a write to the WAL and then sync.
495		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
496			t.Fatal(err)
497		} else if err := db.Sync(context.Background()); err != nil {
498			t.Fatal(err)
499		}
500
501		pos0, err := db.Pos()
502		if err != nil {
503			t.Fatal(err)
504		}
505
506		// Close & delete shadow WAL to simulate dir created but not WAL.
507		if err := db.Close(); err != nil {
508			t.Fatal(err)
509		} else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil {
510			t.Fatal(err)
511		}
512
513		// Reopen managed database & ensure sync will still work.
514		db = MustOpenDBAt(t, db.Path())
515		defer MustCloseDB(t, db)
516		if err := db.Sync(context.Background()); err != nil {
517			t.Fatal(err)
518		}
519
520		// Verify new generation created but index/offset the same.
521		if pos1, err := db.Pos(); err != nil {
522			t.Fatal(err)
523		} else if pos0.Generation == pos1.Generation {
524			t.Fatal("expected new generation")
525		} else if got, want := pos1.Index, pos0.Index; got != want {
526			t.Fatalf("Index=%v want %v", got, want)
527		} else if got, want := pos1.Offset, pos0.Offset; got != want {
528			t.Fatalf("Offset=%v want %v", got, want)
529		}
530	})
531
532	// Ensure DB checkpoints after minimum number of pages.
533	t.Run("MinCheckpointPageN", func(t *testing.T) {
534		db, sqldb := MustOpenDBs(t)
535		defer MustCloseDBs(t, db, sqldb)
536
537		// Execute a query to force a write to the WAL and then sync.
538		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
539			t.Fatal(err)
540		} else if err := db.Sync(context.Background()); err != nil {
541			t.Fatal(err)
542		}
543
544		// Write at least minimum number of pages to trigger rollover.
545		for i := 0; i < db.MinCheckpointPageN; i++ {
546			if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
547				t.Fatal(err)
548			}
549		}
550
551		// Sync to shadow WAL.
552		if err := db.Sync(context.Background()); err != nil {
553			t.Fatal(err)
554		}
555
556		// Ensure position is now on the second index.
557		if pos, err := db.Pos(); err != nil {
558			t.Fatal(err)
559		} else if got, want := pos.Index, 1; got != want {
560			t.Fatalf("Index=%v, want %v", got, want)
561		}
562	})
563
564	// Ensure DB checkpoints after interval.
565	t.Run("CheckpointInterval", func(t *testing.T) {
566		db, sqldb := MustOpenDBs(t)
567		defer MustCloseDBs(t, db, sqldb)
568
569		// Execute a query to force a write to the WAL and then sync.
570		if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
571			t.Fatal(err)
572		} else if err := db.Sync(context.Background()); err != nil {
573			t.Fatal(err)
574		}
575
576		// Reduce checkpoint interval to ensure a rollover is triggered.
577		db.CheckpointInterval = 1 * time.Nanosecond
578
579		// Write to WAL & sync.
580		if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil {
581			t.Fatal(err)
582		} else if err := db.Sync(context.Background()); err != nil {
583			t.Fatal(err)
584		}
585
586		// Ensure position is now on the second index.
587		if pos, err := db.Pos(); err != nil {
588			t.Fatal(err)
589		} else if got, want := pos.Index, 1; got != want {
590			t.Fatalf("Index=%v, want %v", got, want)
591		}
592	})
593}
594
595// MustOpenDBs returns a new instance of a DB & associated SQL DB.
596func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) {
597	tb.Helper()
598	db := MustOpenDB(tb)
599	return db, MustOpenSQLDB(tb, db.Path())
600}
601
602// MustCloseDBs closes db & sqldb and removes the parent directory.
603func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) {
604	tb.Helper()
605	MustCloseDB(tb, db)
606	MustCloseSQLDB(tb, sqldb)
607}
608
609// MustOpenDB returns a new instance of a DB.
610func MustOpenDB(tb testing.TB) *litestream.DB {
611	dir := tb.TempDir()
612	return MustOpenDBAt(tb, filepath.Join(dir, "db"))
613}
614
615// MustOpenDBAt returns a new instance of a DB for a given path.
616func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
617	tb.Helper()
618	db := litestream.NewDB(path)
619	db.MonitorInterval = 0 // disable background goroutine
620	if err := db.Open(); err != nil {
621		tb.Fatal(err)
622	}
623	return db
624}
625
626// MustCloseDB closes db and removes its parent directory.
627func MustCloseDB(tb testing.TB, db *litestream.DB) {
628	tb.Helper()
629	if err := db.Close(); err != nil && !strings.Contains(err.Error(), `database is closed`) {
630		tb.Fatal(err)
631	} else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil {
632		tb.Fatal(err)
633	}
634}
635
636// MustOpenSQLDB returns a database/sql DB.
637func MustOpenSQLDB(tb testing.TB, path string) *sql.DB {
638	tb.Helper()
639	d, err := sql.Open("sqlite3", path)
640	if err != nil {
641		tb.Fatal(err)
642	} else if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil {
643		tb.Fatal(err)
644	}
645	return d
646}
647
648// MustCloseSQLDB closes a database/sql DB.
649func MustCloseSQLDB(tb testing.TB, d *sql.DB) {
650	tb.Helper()
651	if err := d.Close(); err != nil {
652		tb.Fatal(err)
653	}
654}
655