package serf

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/armon/go-metrics"
)

/*
Serf supports using a "snapshot" file that contains various
transactional data that is used to help Serf recover quickly
and gracefully from a failure. We append member events, as well
as the latest clock values to the file during normal operation,
and periodically checkpoint and roll over the file. During a restore,
we can replay the various member events to recall a list of known
nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/

const (
	// flushInterval is how often we force a flush of the snapshot file.
	// The check happens when a line is appended, so an idle snapshotter
	// does not flush on its own.
	flushInterval = 500 * time.Millisecond

	// clockUpdateInterval is how often we fetch the current lamport time of
	// the cluster and, if it advanced, write it to the snapshot file.
	clockUpdateInterval = 500 * time.Millisecond

	// tmpExt is the extension appended to the snapshot path to name the
	// temporary file used during compaction.
	tmpExt = ".compact"

	// snapshotErrorRecoveryInterval is how often we attempt to recover from
	// errors writing to the snapshot file (by compacting it).
	snapshotErrorRecoveryInterval = 30 * time.Second

	// eventChSize is the size of the event buffers between Serf and the
	// consuming application. If this is exhausted we will block Serf and Memberlist.
	eventChSize = 2048

	// shutdownFlushTimeout is the time limit to write pending events to the
	// snapshot during a shutdown.
	shutdownFlushTimeout = 250 * time.Millisecond

	// snapshotBytesPerNode is an estimated bytes per node to snapshot.
	snapshotBytesPerNode = 128

	// snapshotCompactionThreshold is the threshold we apply to
	// the snapshot size estimate (nodes * bytes per node) before compacting.
	snapshotCompactionThreshold = 2
)

// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
59type Snapshotter struct { 60 aliveNodes map[string]string 61 clock *LamportClock 62 fh *os.File 63 buffered *bufio.Writer 64 inCh <-chan Event 65 streamCh chan Event 66 lastFlush time.Time 67 lastClock LamportTime 68 lastEventClock LamportTime 69 lastQueryClock LamportTime 70 leaveCh chan struct{} 71 leaving bool 72 logger *log.Logger 73 minCompactSize int64 74 path string 75 offset int64 76 outCh chan<- Event 77 rejoinAfterLeave bool 78 shutdownCh <-chan struct{} 79 waitCh chan struct{} 80 lastAttemptedCompaction time.Time 81} 82 83// PreviousNode is used to represent the previously known alive nodes 84type PreviousNode struct { 85 Name string 86 Addr string 87} 88 89func (p PreviousNode) String() string { 90 return fmt.Sprintf("%s: %s", p.Name, p.Addr) 91} 92 93// NewSnapshotter creates a new Snapshotter that records events up to a 94// max byte size before rotating the file. It can also be used to 95// recover old state. Snapshotter works by reading an event channel it returns, 96// passing through to an output channel, and persisting relevant events to disk. 97// Setting rejoinAfterLeave makes leave not clear the state, and can be used 98// if you intend to rejoin the same cluster after a leave. 
99func NewSnapshotter(path string, 100 minCompactSize int, 101 rejoinAfterLeave bool, 102 logger *log.Logger, 103 clock *LamportClock, 104 outCh chan<- Event, 105 shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) { 106 inCh := make(chan Event, eventChSize) 107 streamCh := make(chan Event, eventChSize) 108 109 // Try to open the file 110 fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644) 111 if err != nil { 112 return nil, nil, fmt.Errorf("failed to open snapshot: %v", err) 113 } 114 115 // Determine the offset 116 info, err := fh.Stat() 117 if err != nil { 118 fh.Close() 119 return nil, nil, fmt.Errorf("failed to stat snapshot: %v", err) 120 } 121 offset := info.Size() 122 123 // Create the snapshotter 124 snap := &Snapshotter{ 125 aliveNodes: make(map[string]string), 126 clock: clock, 127 fh: fh, 128 buffered: bufio.NewWriter(fh), 129 inCh: inCh, 130 streamCh: streamCh, 131 lastClock: 0, 132 lastEventClock: 0, 133 lastQueryClock: 0, 134 leaveCh: make(chan struct{}), 135 logger: logger, 136 minCompactSize: int64(minCompactSize), 137 path: path, 138 offset: offset, 139 outCh: outCh, 140 rejoinAfterLeave: rejoinAfterLeave, 141 shutdownCh: shutdownCh, 142 waitCh: make(chan struct{}), 143 } 144 145 // Recover the last known state 146 if err := snap.replay(); err != nil { 147 fh.Close() 148 return nil, nil, err 149 } 150 151 // Start handling new commands 152 go snap.teeStream() 153 go snap.stream() 154 return inCh, snap, nil 155} 156 157// LastClock returns the last known clock time 158func (s *Snapshotter) LastClock() LamportTime { 159 return s.lastClock 160} 161 162// LastEventClock returns the last known event clock time 163func (s *Snapshotter) LastEventClock() LamportTime { 164 return s.lastEventClock 165} 166 167// LastQueryClock returns the last known query clock time 168func (s *Snapshotter) LastQueryClock() LamportTime { 169 return s.lastQueryClock 170} 171 172// AliveNodes returns the last known alive nodes 173func (s 
*Snapshotter) AliveNodes() []*PreviousNode { 174 // Copy the previously known 175 previous := make([]*PreviousNode, 0, len(s.aliveNodes)) 176 for name, addr := range s.aliveNodes { 177 previous = append(previous, &PreviousNode{name, addr}) 178 } 179 180 // Randomize the order, prevents hot shards 181 for i := range previous { 182 j := rand.Intn(i + 1) 183 previous[i], previous[j] = previous[j], previous[i] 184 } 185 return previous 186} 187 188// Wait is used to wait until the snapshotter finishes shut down 189func (s *Snapshotter) Wait() { 190 <-s.waitCh 191} 192 193// Leave is used to remove known nodes to prevent a restart from 194// causing a join. Otherwise nodes will re-join after leaving! 195func (s *Snapshotter) Leave() { 196 select { 197 case s.leaveCh <- struct{}{}: 198 case <-s.shutdownCh: 199 } 200} 201 202// teeStream is a long running routine that is used to copy events 203// to the output channel and the internal event handler. 204func (s *Snapshotter) teeStream() { 205 flushEvent := func(e Event) { 206 // Forward to the internal stream, do not block 207 select { 208 case s.streamCh <- e: 209 default: 210 } 211 212 // Forward the event immediately, do not block 213 if s.outCh != nil { 214 select { 215 case s.outCh <- e: 216 default: 217 } 218 } 219 } 220 221OUTER: 222 for { 223 select { 224 case e := <-s.inCh: 225 flushEvent(e) 226 case <-s.shutdownCh: 227 break OUTER 228 } 229 } 230 231 // Drain any remaining events before exiting 232 for { 233 select { 234 case e := <-s.inCh: 235 flushEvent(e) 236 default: 237 return 238 } 239 } 240} 241 242// stream is a long running routine that is used to handle events 243func (s *Snapshotter) stream() { 244 clockTicker := time.NewTicker(clockUpdateInterval) 245 defer clockTicker.Stop() 246 247 // flushEvent is used to handle writing out an event 248 flushEvent := func(e Event) { 249 // Stop recording events after a leave is issued 250 if s.leaving { 251 return 252 } 253 switch typed := e.(type) { 254 case 
MemberEvent: 255 s.processMemberEvent(typed) 256 case UserEvent: 257 s.processUserEvent(typed) 258 case *Query: 259 s.processQuery(typed) 260 default: 261 s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e) 262 } 263 } 264 265 for { 266 select { 267 case <-s.leaveCh: 268 s.leaving = true 269 270 // If we plan to re-join, keep our state 271 if !s.rejoinAfterLeave { 272 s.aliveNodes = make(map[string]string) 273 } 274 s.tryAppend("leave\n") 275 if err := s.buffered.Flush(); err != nil { 276 s.logger.Printf("[ERR] serf: failed to flush leave to snapshot: %v", err) 277 } 278 if err := s.fh.Sync(); err != nil { 279 s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err) 280 } 281 282 case e := <-s.streamCh: 283 flushEvent(e) 284 285 case <-clockTicker.C: 286 s.updateClock() 287 288 case <-s.shutdownCh: 289 // Setup a timeout 290 flushTimeout := time.After(shutdownFlushTimeout) 291 292 // Snapshot the clock 293 s.updateClock() 294 295 // Clear out the buffers 296 FLUSH: 297 for { 298 select { 299 case e := <-s.streamCh: 300 flushEvent(e) 301 case <-flushTimeout: 302 break FLUSH 303 default: 304 break FLUSH 305 } 306 } 307 308 if err := s.buffered.Flush(); err != nil { 309 s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err) 310 } 311 if err := s.fh.Sync(); err != nil { 312 s.logger.Printf("[ERR] serf: failed to sync snapshot: %v", err) 313 } 314 s.fh.Close() 315 close(s.waitCh) 316 return 317 } 318 } 319} 320 321// processMemberEvent is used to handle a single member event 322func (s *Snapshotter) processMemberEvent(e MemberEvent) { 323 switch e.Type { 324 case EventMemberJoin: 325 for _, mem := range e.Members { 326 addr := net.TCPAddr{IP: mem.Addr, Port: int(mem.Port)} 327 s.aliveNodes[mem.Name] = addr.String() 328 s.tryAppend(fmt.Sprintf("alive: %s %s\n", mem.Name, addr.String())) 329 } 330 331 case EventMemberLeave: 332 fallthrough 333 case EventMemberFailed: 334 for _, mem := range e.Members { 335 delete(s.aliveNodes, 
mem.Name) 336 s.tryAppend(fmt.Sprintf("not-alive: %s\n", mem.Name)) 337 } 338 } 339 s.updateClock() 340} 341 342// updateClock is called periodically to check if we should udpate our 343// clock value. This is done after member events but should also be done 344// periodically due to race conditions with join and leave intents 345func (s *Snapshotter) updateClock() { 346 lastSeen := s.clock.Time() - 1 347 if lastSeen > s.lastClock { 348 s.lastClock = lastSeen 349 s.tryAppend(fmt.Sprintf("clock: %d\n", s.lastClock)) 350 } 351} 352 353// processUserEvent is used to handle a single user event 354func (s *Snapshotter) processUserEvent(e UserEvent) { 355 // Ignore old clocks 356 if e.LTime <= s.lastEventClock { 357 return 358 } 359 s.lastEventClock = e.LTime 360 s.tryAppend(fmt.Sprintf("event-clock: %d\n", e.LTime)) 361} 362 363// processQuery is used to handle a single query event 364func (s *Snapshotter) processQuery(q *Query) { 365 // Ignore old clocks 366 if q.LTime <= s.lastQueryClock { 367 return 368 } 369 s.lastQueryClock = q.LTime 370 s.tryAppend(fmt.Sprintf("query-clock: %d\n", q.LTime)) 371} 372 373// tryAppend will invoke append line but will not return an error 374func (s *Snapshotter) tryAppend(l string) { 375 if err := s.appendLine(l); err != nil { 376 s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err) 377 now := time.Now() 378 if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval { 379 s.lastAttemptedCompaction = now 380 s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...") 381 err = s.compact() 382 if err != nil { 383 s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err) 384 } else { 385 s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state") 386 } 387 } 388 } 389} 390 391// appendLine is used to append a line to the existing log 392func (s *Snapshotter) appendLine(l string) error { 393 defer 
metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now()) 394 395 n, err := s.buffered.WriteString(l) 396 if err != nil { 397 return err 398 } 399 400 // Check if we should flush 401 now := time.Now() 402 if now.Sub(s.lastFlush) > flushInterval { 403 s.lastFlush = now 404 if err := s.buffered.Flush(); err != nil { 405 return err 406 } 407 } 408 409 // Check if a compaction is necessary 410 s.offset += int64(n) 411 if s.offset > s.snapshotMaxSize() { 412 return s.compact() 413 } 414 return nil 415} 416 417// snapshotMaxSize computes the maximum size and is used to force periodic compaction. 418func (s *Snapshotter) snapshotMaxSize() int64 { 419 nodes := int64(len(s.aliveNodes)) 420 estSize := nodes * snapshotBytesPerNode 421 threshold := estSize * snapshotCompactionThreshold 422 423 // Apply a minimum threshold to avoid frequent compaction 424 if threshold < s.minCompactSize { 425 threshold = s.minCompactSize 426 } 427 return threshold 428} 429 430// Compact is used to compact the snapshot once it is too large 431func (s *Snapshotter) compact() error { 432 defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now()) 433 434 // Try to open the file to new fiel 435 newPath := s.path + tmpExt 436 fh, err := os.OpenFile(newPath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755) 437 if err != nil { 438 return fmt.Errorf("failed to open new snapshot: %v", err) 439 } 440 441 // Create a buffered writer 442 buf := bufio.NewWriter(fh) 443 444 // Write out the live nodes 445 var offset int64 446 for name, addr := range s.aliveNodes { 447 line := fmt.Sprintf("alive: %s %s\n", name, addr) 448 n, err := buf.WriteString(line) 449 if err != nil { 450 fh.Close() 451 return err 452 } 453 offset += int64(n) 454 } 455 456 // Write out the clocks 457 line := fmt.Sprintf("clock: %d\n", s.lastClock) 458 n, err := buf.WriteString(line) 459 if err != nil { 460 fh.Close() 461 return err 462 } 463 offset += int64(n) 464 465 line = fmt.Sprintf("event-clock: %d\n", 
s.lastEventClock) 466 n, err = buf.WriteString(line) 467 if err != nil { 468 fh.Close() 469 return err 470 } 471 offset += int64(n) 472 473 line = fmt.Sprintf("query-clock: %d\n", s.lastQueryClock) 474 n, err = buf.WriteString(line) 475 if err != nil { 476 fh.Close() 477 return err 478 } 479 offset += int64(n) 480 481 // Flush the new snapshot 482 err = buf.Flush() 483 484 if err != nil { 485 return fmt.Errorf("failed to flush new snapshot: %v", err) 486 } 487 488 err = fh.Sync() 489 490 if err != nil { 491 fh.Close() 492 return fmt.Errorf("failed to fsync new snapshot: %v", err) 493 } 494 495 fh.Close() 496 497 // We now need to swap the old snapshot file with the new snapshot. 498 // Turns out, Windows won't let us rename the files if we have 499 // open handles to them or if the destination already exists. This 500 // means we are forced to close the existing handles, delete the 501 // old file, move the new one in place, and then re-open the file 502 // handles. 503 504 // Flush the existing snapshot, ignoring errors since we will 505 // delete it momentarily. 506 s.buffered.Flush() 507 s.buffered = nil 508 509 // Close the file handle to the old snapshot 510 s.fh.Close() 511 s.fh = nil 512 513 // Delete the old file 514 if err := os.Remove(s.path); err != nil { 515 return fmt.Errorf("failed to remove old snapshot: %v", err) 516 } 517 518 // Move the new file into place 519 if err := os.Rename(newPath, s.path); err != nil { 520 return fmt.Errorf("failed to install new snapshot: %v", err) 521 } 522 523 // Open the new snapshot 524 fh, err = os.OpenFile(s.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755) 525 if err != nil { 526 return fmt.Errorf("failed to open snapshot: %v", err) 527 } 528 buf = bufio.NewWriter(fh) 529 530 // Rotate our handles 531 s.fh = fh 532 s.buffered = buf 533 s.offset = offset 534 s.lastFlush = time.Now() 535 return nil 536} 537 538// replay is used to seek to reset our internal state by replaying 539// the snapshot file. 
It is used at initialization time to read old 540// state 541func (s *Snapshotter) replay() error { 542 // Seek to the beginning 543 if _, err := s.fh.Seek(0, os.SEEK_SET); err != nil { 544 return err 545 } 546 547 // Read each line 548 reader := bufio.NewReader(s.fh) 549 for { 550 line, err := reader.ReadString('\n') 551 if err != nil { 552 break 553 } 554 555 // Skip the newline 556 line = line[:len(line)-1] 557 558 // Switch on the prefix 559 if strings.HasPrefix(line, "alive: ") { 560 info := strings.TrimPrefix(line, "alive: ") 561 addrIdx := strings.LastIndex(info, " ") 562 if addrIdx == -1 { 563 s.logger.Printf("[WARN] serf: Failed to parse address: %v", line) 564 continue 565 } 566 addr := info[addrIdx+1:] 567 name := info[:addrIdx] 568 s.aliveNodes[name] = addr 569 570 } else if strings.HasPrefix(line, "not-alive: ") { 571 name := strings.TrimPrefix(line, "not-alive: ") 572 delete(s.aliveNodes, name) 573 574 } else if strings.HasPrefix(line, "clock: ") { 575 timeStr := strings.TrimPrefix(line, "clock: ") 576 timeInt, err := strconv.ParseUint(timeStr, 10, 64) 577 if err != nil { 578 s.logger.Printf("[WARN] serf: Failed to convert clock time: %v", err) 579 continue 580 } 581 s.lastClock = LamportTime(timeInt) 582 583 } else if strings.HasPrefix(line, "event-clock: ") { 584 timeStr := strings.TrimPrefix(line, "event-clock: ") 585 timeInt, err := strconv.ParseUint(timeStr, 10, 64) 586 if err != nil { 587 s.logger.Printf("[WARN] serf: Failed to convert event clock time: %v", err) 588 continue 589 } 590 s.lastEventClock = LamportTime(timeInt) 591 592 } else if strings.HasPrefix(line, "query-clock: ") { 593 timeStr := strings.TrimPrefix(line, "query-clock: ") 594 timeInt, err := strconv.ParseUint(timeStr, 10, 64) 595 if err != nil { 596 s.logger.Printf("[WARN] serf: Failed to convert query clock time: %v", err) 597 continue 598 } 599 s.lastQueryClock = LamportTime(timeInt) 600 601 } else if strings.HasPrefix(line, "coordinate: ") { 602 continue // Ignores any 
coordinate persistence from old snapshots, serf should re-converge 603 } else if line == "leave" { 604 // Ignore a leave if we plan on re-joining 605 if s.rejoinAfterLeave { 606 s.logger.Printf("[INFO] serf: Ignoring previous leave in snapshot") 607 continue 608 } 609 s.aliveNodes = make(map[string]string) 610 s.lastClock = 0 611 s.lastEventClock = 0 612 s.lastQueryClock = 0 613 614 } else if strings.HasPrefix(line, "#") { 615 // Skip comment lines 616 617 } else { 618 s.logger.Printf("[WARN] serf: Unrecognized snapshot line: %v", line) 619 } 620 } 621 622 // Seek to the end 623 if _, err := s.fh.Seek(0, os.SEEK_END); err != nil { 624 return err 625 } 626 return nil 627} 628