1package litestream_test 2 3import ( 4 "context" 5 "database/sql" 6 "io/ioutil" 7 "os" 8 "path/filepath" 9 "strings" 10 "testing" 11 "time" 12 13 "github.com/benbjohnson/litestream" 14) 15 16func TestDB_Path(t *testing.T) { 17 db := litestream.NewDB("/tmp/db") 18 if got, want := db.Path(), `/tmp/db`; got != want { 19 t.Fatalf("Path()=%v, want %v", got, want) 20 } 21} 22 23func TestDB_WALPath(t *testing.T) { 24 db := litestream.NewDB("/tmp/db") 25 if got, want := db.WALPath(), `/tmp/db-wal`; got != want { 26 t.Fatalf("WALPath()=%v, want %v", got, want) 27 } 28} 29 30func TestDB_MetaPath(t *testing.T) { 31 t.Run("Absolute", func(t *testing.T) { 32 db := litestream.NewDB("/tmp/db") 33 if got, want := db.MetaPath(), `/tmp/.db-litestream`; got != want { 34 t.Fatalf("MetaPath()=%v, want %v", got, want) 35 } 36 }) 37 t.Run("Relative", func(t *testing.T) { 38 db := litestream.NewDB("db") 39 if got, want := db.MetaPath(), `.db-litestream`; got != want { 40 t.Fatalf("MetaPath()=%v, want %v", got, want) 41 } 42 }) 43} 44 45func TestDB_GenerationNamePath(t *testing.T) { 46 db := litestream.NewDB("/tmp/db") 47 if got, want := db.GenerationNamePath(), `/tmp/.db-litestream/generation`; got != want { 48 t.Fatalf("GenerationNamePath()=%v, want %v", got, want) 49 } 50} 51 52func TestDB_GenerationPath(t *testing.T) { 53 db := litestream.NewDB("/tmp/db") 54 if got, want := db.GenerationPath("xxxx"), `/tmp/.db-litestream/generations/xxxx`; got != want { 55 t.Fatalf("GenerationPath()=%v, want %v", got, want) 56 } 57} 58 59func TestDB_ShadowWALDir(t *testing.T) { 60 db := litestream.NewDB("/tmp/db") 61 if got, want := db.ShadowWALDir("xxxx"), `/tmp/.db-litestream/generations/xxxx/wal`; got != want { 62 t.Fatalf("ShadowWALDir()=%v, want %v", got, want) 63 } 64} 65 66func TestDB_ShadowWALPath(t *testing.T) { 67 db := litestream.NewDB("/tmp/db") 68 if got, want := db.ShadowWALPath("xxxx", 1000), `/tmp/.db-litestream/generations/xxxx/wal/000003e8.wal`; got != want { 69 t.Fatalf("ShadowWALPath()=%v, want %v", got, want) 70 } 71} 72 73// Ensure we can check the last modified time of the real database and its WAL. 74func TestDB_UpdatedAt(t *testing.T) { 75 t.Run("ErrNotExist", func(t *testing.T) { 76 db := MustOpenDB(t) 77 defer MustCloseDB(t, db) 78 if _, err := db.UpdatedAt(); !os.IsNotExist(err) { 79 t.Fatalf("unexpected error: %#v", err) 80 } 81 }) 82 83 t.Run("DB", func(t *testing.T) { 84 db, sqldb := MustOpenDBs(t) 85 defer MustCloseDBs(t, db, sqldb) 86 87 if t0, err := db.UpdatedAt(); err != nil { 88 t.Fatal(err) 89 } else if time.Since(t0) > 10*time.Second { 90 t.Fatalf("unexpected updated at time: %s", t0) 91 } 92 }) 93 94 t.Run("WAL", func(t *testing.T) { 95 db, sqldb := MustOpenDBs(t) 96 defer MustCloseDBs(t, db, sqldb) 97 98 t0, err := db.UpdatedAt() 99 if err != nil { 100 t.Fatal(err) 101 } 102 103 sleepTime := 100 * time.Millisecond 104 if os.Getenv("CI") != "" { 105 sleepTime = 1 * time.Second 106 } 107 time.Sleep(sleepTime) 108 109 if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil { 110 t.Fatal(err) 111 } 112 113 if t1, err := db.UpdatedAt(); err != nil { 114 t.Fatal(err) 115 } else if !t1.After(t0) { 116 t.Fatalf("expected newer updated at time: %s > %s", t1, t0) 117 } 118 }) 119} 120 121// Ensure we can compute a checksum on the real database. 122func TestDB_CRC64(t *testing.T) { 123 t.Run("ErrNotExist", func(t *testing.T) { 124 db := MustOpenDB(t) 125 defer MustCloseDB(t, db) 126 if _, _, err := db.CRC64(context.Background()); !os.IsNotExist(err) { 127 t.Fatalf("unexpected error: %#v", err) 128 } 129 }) 130 131 t.Run("DB", func(t *testing.T) { 132 db, sqldb := MustOpenDBs(t) 133 defer MustCloseDBs(t, db, sqldb) 134 135 if err := db.Sync(context.Background()); err != nil { 136 t.Fatal(err) 137 } 138 139 chksum0, _, err := db.CRC64(context.Background()) 140 if err != nil { 141 t.Fatal(err) 142 } 143 144 // Issue change that is applied to the WAL. Checksum should not change. 145 if _, err := sqldb.Exec(`CREATE TABLE t (id INT);`); err != nil { 146 t.Fatal(err) 147 } else if chksum1, _, err := db.CRC64(context.Background()); err != nil { 148 t.Fatal(err) 149 } else if chksum0 == chksum1 { 150 t.Fatal("expected different checksum event after WAL change") 151 } 152 153 // Checkpoint change into database. Checksum should change. 154 if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil { 155 t.Fatal(err) 156 } 157 158 if chksum2, _, err := db.CRC64(context.Background()); err != nil { 159 t.Fatal(err) 160 } else if chksum0 == chksum2 { 161 t.Fatal("expected different checksums after checkpoint") 162 } 163 }) 164} 165 166// Ensure we can sync the real WAL to the shadow WAL. 167func TestDB_Sync(t *testing.T) { 168 // Ensure sync is skipped if no database exists. 169 t.Run("NoDB", func(t *testing.T) { 170 db := MustOpenDB(t) 171 defer MustCloseDB(t, db) 172 if err := db.Sync(context.Background()); err != nil { 173 t.Fatal(err) 174 } 175 }) 176 177 // Ensure sync can successfully run on the initial sync. 178 t.Run("Initial", func(t *testing.T) { 179 db, sqldb := MustOpenDBs(t) 180 defer MustCloseDBs(t, db, sqldb) 181 182 if err := db.Sync(context.Background()); err != nil { 183 t.Fatal(err) 184 } 185 186 // Verify page size if now available. 187 if db.PageSize() == 0 { 188 t.Fatal("expected page size after initial sync") 189 } 190 191 // Obtain real WAL size. 192 fi, err := os.Stat(db.WALPath()) 193 if err != nil { 194 t.Fatal(err) 195 } 196 197 // Ensure position now available. 198 if pos, err := db.Pos(); err != nil { 199 t.Fatal(err) 200 } else if pos.Generation == "" { 201 t.Fatal("expected generation") 202 } else if got, want := pos.Index, 0; got != want { 203 t.Fatalf("pos.Index=%v, want %v", got, want) 204 } else if got, want := pos.Offset, fi.Size(); got != want { 205 t.Fatalf("pos.Offset=%v, want %v", got, want) 206 } 207 }) 208 209 // Ensure DB can keep in sync across multiple Sync() invocations. 210 t.Run("MultiSync", func(t *testing.T) { 211 db, sqldb := MustOpenDBs(t) 212 defer MustCloseDBs(t, db, sqldb) 213 214 // Execute a query to force a write to the WAL. 215 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 216 t.Fatal(err) 217 } 218 219 // Perform initial sync & grab initial position. 220 if err := db.Sync(context.Background()); err != nil { 221 t.Fatal(err) 222 } 223 224 pos0, err := db.Pos() 225 if err != nil { 226 t.Fatal(err) 227 } 228 229 // Insert into table. 230 if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { 231 t.Fatal(err) 232 } 233 234 // Sync to ensure position moves forward one page. 235 if err := db.Sync(context.Background()); err != nil { 236 t.Fatal(err) 237 } else if pos1, err := db.Pos(); err != nil { 238 t.Fatal(err) 239 } else if pos0.Generation != pos1.Generation { 240 t.Fatal("expected the same generation") 241 } else if got, want := pos1.Index, pos0.Index; got != want { 242 t.Fatalf("Index=%v, want %v", got, want) 243 } else if got, want := pos1.Offset, pos0.Offset+4096+litestream.WALFrameHeaderSize; got != want { 244 t.Fatalf("Offset=%v, want %v", got, want) 245 } 246 }) 247 248 // Ensure a WAL file is created if one does not already exist. 249 t.Run("NoWAL", func(t *testing.T) { 250 db, sqldb := MustOpenDBs(t) 251 defer MustCloseDBs(t, db, sqldb) 252 253 // Issue initial sync and truncate WAL. 254 if err := db.Sync(context.Background()); err != nil { 255 t.Fatal(err) 256 } 257 258 // Obtain initial position. 259 pos0, err := db.Pos() 260 if err != nil { 261 t.Fatal(err) 262 } 263 264 // Checkpoint & fully close which should close WAL file. 265 if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil { 266 t.Fatal(err) 267 } else if err := db.Close(); err != nil { 268 t.Fatal(err) 269 } else if err := sqldb.Close(); err != nil { 270 t.Fatal(err) 271 } 272 273 // Verify WAL does not exist. 274 if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) { 275 t.Fatal(err) 276 } 277 278 // Reopen the managed database. 279 db = MustOpenDBAt(t, db.Path()) 280 defer MustCloseDB(t, db) 281 282 // Re-sync and ensure new generation has been created. 283 if err := db.Sync(context.Background()); err != nil { 284 t.Fatal(err) 285 } 286 287 // Obtain initial position. 288 if pos1, err := db.Pos(); err != nil { 289 t.Fatal(err) 290 } else if pos0.Generation == pos1.Generation { 291 t.Fatal("expected new generation after truncation") 292 } 293 }) 294 295 // Ensure DB can start new generation if it detects it cannot verify last position. 296 t.Run("OverwritePrevPosition", func(t *testing.T) { 297 db, sqldb := MustOpenDBs(t) 298 defer MustCloseDBs(t, db, sqldb) 299 300 // Execute a query to force a write to the WAL. 301 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 302 t.Fatal(err) 303 } 304 305 // Issue initial sync and truncate WAL. 306 if err := db.Sync(context.Background()); err != nil { 307 t.Fatal(err) 308 } 309 310 // Obtain initial position. 311 pos0, err := db.Pos() 312 if err != nil { 313 t.Fatal(err) 314 } 315 316 // Fully close which should close WAL file. 317 if err := db.Close(); err != nil { 318 t.Fatal(err) 319 } else if err := sqldb.Close(); err != nil { 320 t.Fatal(err) 321 } 322 323 // Verify WAL does not exist. 324 if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) { 325 t.Fatal(err) 326 } 327 328 // Insert into table multiple times to move past old offset 329 sqldb = MustOpenSQLDB(t, db.Path()) 330 defer MustCloseSQLDB(t, sqldb) 331 for i := 0; i < 100; i++ { 332 if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { 333 t.Fatal(err) 334 } 335 } 336 337 // Reopen the managed database. 338 db = MustOpenDBAt(t, db.Path()) 339 defer MustCloseDB(t, db) 340 341 // Re-sync and ensure new generation has been created. 342 if err := db.Sync(context.Background()); err != nil { 343 t.Fatal(err) 344 } 345 346 // Obtain initial position. 347 if pos1, err := db.Pos(); err != nil { 348 t.Fatal(err) 349 } else if pos0.Generation == pos1.Generation { 350 t.Fatal("expected new generation after truncation") 351 } 352 }) 353 354 // Ensure DB can handle a mismatched header-only and start new generation. 355 t.Run("WALHeaderMismatch", func(t *testing.T) { 356 db, sqldb := MustOpenDBs(t) 357 defer MustCloseDBs(t, db, sqldb) 358 359 // Execute a query to force a write to the WAL and then sync. 360 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 361 t.Fatal(err) 362 } else if err := db.Sync(context.Background()); err != nil { 363 t.Fatal(err) 364 } 365 366 // Grab initial position & close. 367 pos0, err := db.Pos() 368 if err != nil { 369 t.Fatal(err) 370 } else if err := db.Close(); err != nil { 371 t.Fatal(err) 372 } 373 374 // Read existing file, update header checksum, and write back only header 375 // to simulate a header with a mismatched checksum. 376 shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index) 377 if buf, err := ioutil.ReadFile(shadowWALPath); err != nil { 378 t.Fatal(err) 379 } else if err := ioutil.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil { 380 t.Fatal(err) 381 } 382 383 // Reopen managed database & ensure sync will still work. 384 db = MustOpenDBAt(t, db.Path()) 385 defer MustCloseDB(t, db) 386 if err := db.Sync(context.Background()); err != nil { 387 t.Fatal(err) 388 } 389 390 // Verify a new generation was started. 391 if pos1, err := db.Pos(); err != nil { 392 t.Fatal(err) 393 } else if pos0.Generation == pos1.Generation { 394 t.Fatal("expected new generation") 395 } 396 }) 397 398 // Ensure DB can handle partial shadow WAL header write. 399 t.Run("PartialShadowWALHeader", func(t *testing.T) { 400 db, sqldb := MustOpenDBs(t) 401 defer MustCloseDBs(t, db, sqldb) 402 403 // Execute a query to force a write to the WAL and then sync. 404 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 405 t.Fatal(err) 406 } else if err := db.Sync(context.Background()); err != nil { 407 t.Fatal(err) 408 } 409 410 pos0, err := db.Pos() 411 if err != nil { 412 t.Fatal(err) 413 } 414 415 // Close & truncate shadow WAL to simulate a partial header write. 416 if err := db.Close(); err != nil { 417 t.Fatal(err) 418 } else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), litestream.WALHeaderSize-1); err != nil { 419 t.Fatal(err) 420 } 421 422 // Reopen managed database & ensure sync will still work. 423 db = MustOpenDBAt(t, db.Path()) 424 defer MustCloseDB(t, db) 425 if err := db.Sync(context.Background()); err != nil { 426 t.Fatal(err) 427 } 428 429 // Verify a new generation was started. 430 if pos1, err := db.Pos(); err != nil { 431 t.Fatal(err) 432 } else if pos0.Generation == pos1.Generation { 433 t.Fatal("expected new generation") 434 } 435 }) 436 437 // Ensure DB can handle partial shadow WAL writes. 438 t.Run("PartialShadowWALFrame", func(t *testing.T) { 439 db, sqldb := MustOpenDBs(t) 440 defer MustCloseDBs(t, db, sqldb) 441 442 // Execute a query to force a write to the WAL and then sync. 443 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 444 t.Fatal(err) 445 } else if err := db.Sync(context.Background()); err != nil { 446 t.Fatal(err) 447 } 448 449 pos0, err := db.Pos() 450 if err != nil { 451 t.Fatal(err) 452 } 453 454 // Obtain current shadow WAL size. 455 fi, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)) 456 if err != nil { 457 t.Fatal(err) 458 } 459 460 // Close & truncate shadow WAL to simulate a partial frame write. 461 if err := db.Close(); err != nil { 462 t.Fatal(err) 463 } else if err := os.Truncate(db.ShadowWALPath(pos0.Generation, pos0.Index), fi.Size()-1); err != nil { 464 t.Fatal(err) 465 } 466 467 // Reopen managed database & ensure sync will still work. 468 db = MustOpenDBAt(t, db.Path()) 469 defer MustCloseDB(t, db) 470 if err := db.Sync(context.Background()); err != nil { 471 t.Fatal(err) 472 } 473 474 // Verify same generation is kept. 475 if pos1, err := db.Pos(); err != nil { 476 t.Fatal(err) 477 } else if got, want := pos1, pos0; got != want { 478 t.Fatalf("Pos()=%s want %s", got, want) 479 } 480 481 // Ensure shadow WAL has recovered. 482 if fi0, err := os.Stat(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil { 483 t.Fatal(err) 484 } else if got, want := fi0.Size(), fi.Size(); got != want { 485 t.Fatalf("Size()=%v, want %v", got, want) 486 } 487 }) 488 489 // Ensure DB can handle a generation directory with a missing shadow WAL. 490 t.Run("NoShadowWAL", func(t *testing.T) { 491 db, sqldb := MustOpenDBs(t) 492 defer MustCloseDBs(t, db, sqldb) 493 494 // Execute a query to force a write to the WAL and then sync. 495 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 496 t.Fatal(err) 497 } else if err := db.Sync(context.Background()); err != nil { 498 t.Fatal(err) 499 } 500 501 pos0, err := db.Pos() 502 if err != nil { 503 t.Fatal(err) 504 } 505 506 // Close & delete shadow WAL to simulate dir created but not WAL. 507 if err := db.Close(); err != nil { 508 t.Fatal(err) 509 } else if err := os.Remove(db.ShadowWALPath(pos0.Generation, pos0.Index)); err != nil { 510 t.Fatal(err) 511 } 512 513 // Reopen managed database & ensure sync will still work. 514 db = MustOpenDBAt(t, db.Path()) 515 defer MustCloseDB(t, db) 516 if err := db.Sync(context.Background()); err != nil { 517 t.Fatal(err) 518 } 519 520 // Verify new generation created but index/offset the same. 521 if pos1, err := db.Pos(); err != nil { 522 t.Fatal(err) 523 } else if pos0.Generation == pos1.Generation { 524 t.Fatal("expected new generation") 525 } else if got, want := pos1.Index, pos0.Index; got != want { 526 t.Fatalf("Index=%v want %v", got, want) 527 } else if got, want := pos1.Offset, pos0.Offset; got != want { 528 t.Fatalf("Offset=%v want %v", got, want) 529 } 530 }) 531 532 // Ensure DB checkpoints after minimum number of pages. 533 t.Run("MinCheckpointPageN", func(t *testing.T) { 534 db, sqldb := MustOpenDBs(t) 535 defer MustCloseDBs(t, db, sqldb) 536 537 // Execute a query to force a write to the WAL and then sync. 538 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 539 t.Fatal(err) 540 } else if err := db.Sync(context.Background()); err != nil { 541 t.Fatal(err) 542 } 543 544 // Write at least minimum number of pages to trigger rollover. 545 for i := 0; i < db.MinCheckpointPageN; i++ { 546 if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { 547 t.Fatal(err) 548 } 549 } 550 551 // Sync to shadow WAL. 552 if err := db.Sync(context.Background()); err != nil { 553 t.Fatal(err) 554 } 555 556 // Ensure position is now on the second index. 557 if pos, err := db.Pos(); err != nil { 558 t.Fatal(err) 559 } else if got, want := pos.Index, 1; got != want { 560 t.Fatalf("Index=%v, want %v", got, want) 561 } 562 }) 563 564 // Ensure DB checkpoints after interval. 565 t.Run("CheckpointInterval", func(t *testing.T) { 566 db, sqldb := MustOpenDBs(t) 567 defer MustCloseDBs(t, db, sqldb) 568 569 // Execute a query to force a write to the WAL and then sync. 570 if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil { 571 t.Fatal(err) 572 } else if err := db.Sync(context.Background()); err != nil { 573 t.Fatal(err) 574 } 575 576 // Reduce checkpoint interval to ensure a rollover is triggered. 577 db.CheckpointInterval = 1 * time.Nanosecond 578 579 // Write to WAL & sync. 580 if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz');`); err != nil { 581 t.Fatal(err) 582 } else if err := db.Sync(context.Background()); err != nil { 583 t.Fatal(err) 584 } 585 586 // Ensure position is now on the second index. 587 if pos, err := db.Pos(); err != nil { 588 t.Fatal(err) 589 } else if got, want := pos.Index, 1; got != want { 590 t.Fatalf("Index=%v, want %v", got, want) 591 } 592 }) 593} 594 595// MustOpenDBs returns a new instance of a DB & associated SQL DB. 596func MustOpenDBs(tb testing.TB) (*litestream.DB, *sql.DB) { 597 tb.Helper() 598 db := MustOpenDB(tb) 599 return db, MustOpenSQLDB(tb, db.Path()) 600} 601 602// MustCloseDBs closes db & sqldb and removes the parent directory. 603func MustCloseDBs(tb testing.TB, db *litestream.DB, sqldb *sql.DB) { 604 tb.Helper() 605 MustCloseDB(tb, db) 606 MustCloseSQLDB(tb, sqldb) 607} 608 609// MustOpenDB returns a new instance of a DB. 610func MustOpenDB(tb testing.TB) *litestream.DB { 611 dir := tb.TempDir() 612 return MustOpenDBAt(tb, filepath.Join(dir, "db")) 613} 614 615// MustOpenDBAt returns a new instance of a DB for a given path. 616func MustOpenDBAt(tb testing.TB, path string) *litestream.DB { 617 tb.Helper() 618 db := litestream.NewDB(path) 619 db.MonitorInterval = 0 // disable background goroutine 620 if err := db.Open(); err != nil { 621 tb.Fatal(err) 622 } 623 return db 624} 625 626// MustCloseDB closes db and removes its parent directory. 627func MustCloseDB(tb testing.TB, db *litestream.DB) { 628 tb.Helper() 629 if err := db.Close(); err != nil && !strings.Contains(err.Error(), `database is closed`) { 630 tb.Fatal(err) 631 } else if err := os.RemoveAll(filepath.Dir(db.Path())); err != nil { 632 tb.Fatal(err) 633 } 634} 635 636// MustOpenSQLDB returns a database/sql DB. 637func MustOpenSQLDB(tb testing.TB, path string) *sql.DB { 638 tb.Helper() 639 d, err := sql.Open("sqlite3", path) 640 if err != nil { 641 tb.Fatal(err) 642 } else if _, err := d.Exec(`PRAGMA journal_mode = wal;`); err != nil { 643 tb.Fatal(err) 644 } 645 return d 646} 647 648// MustCloseSQLDB closes a database/sql DB. 649func MustCloseSQLDB(tb testing.TB, d *sql.DB) { 650 tb.Helper() 651 if err := d.Close(); err != nil { 652 tb.Fatal(err) 653 } 654} 655