package serf

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/armon/go-metrics"
)

/*
Serf supports using a "snapshot" file that contains various
transactional data that is used to help Serf recover quickly
and gracefully from a failure. We append member events, as well
as the latest clock values to the file during normal operation,
and periodically checkpoint and roll over the file. During a restore,
we can replay the various member events to recall a list of known
nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/

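// For illustration, a snapshot file written by this package might contain
// lines like the following (the node names and addresses are made up); each
// line type corresponds to an append performed elsewhere in this file:
//
//	alive: node-a 10.0.0.1:7946
//	alive: node-b 10.0.0.2:7946
//	not-alive: node-b
//	clock: 123
//	event-clock: 45
//	query-clock: 12
//	leave
//
// Lines beginning with "#" are treated as comments during replay, and
// "coordinate:" lines from older snapshots are ignored.
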
const (
	// flushInterval is how often we force a flush of the snapshot file
	flushInterval = 500 * time.Millisecond

	// clockUpdateInterval is how often we fetch the current lamport time
	// of the cluster and write it to the snapshot file
	clockUpdateInterval = 500 * time.Millisecond

	// tmpExt is the extension we use for the temporary file during compaction
	tmpExt = ".compact"

	// snapshotErrorRecoveryInterval is how often we attempt to recover from
	// errors writing to the snapshot file.
	snapshotErrorRecoveryInterval = 30 * time.Second

	// eventChSize is the size of the event buffers between Serf and the
	// consuming application. If this is exhausted we will block Serf and Memberlist.
	eventChSize = 2048

	// shutdownFlushTimeout is the time limit for writing pending events to
	// the snapshot during a shutdown
	shutdownFlushTimeout = 250 * time.Millisecond

	// snapshotBytesPerNode is the estimated number of bytes per node in the snapshot
	snapshotBytesPerNode = 128

	// snapshotCompactionThreshold is the threshold we apply to
	// the snapshot size estimate (nodes * bytes per node) before compacting.
	snapshotCompactionThreshold = 2
)

// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
	aliveNodes              map[string]string
	clock                   *LamportClock
	fh                      *os.File
	buffered                *bufio.Writer
	inCh                    <-chan Event
	streamCh                chan Event
	lastFlush               time.Time
	lastClock               LamportTime
	lastEventClock          LamportTime
	lastQueryClock          LamportTime
	leaveCh                 chan struct{}
	leaving                 bool
	logger                  *log.Logger
	minCompactSize          int64
	path                    string
	offset                  int64
	outCh                   chan<- Event
	rejoinAfterLeave        bool
	shutdownCh              <-chan struct{}
	waitCh                  chan struct{}
	lastAttemptedCompaction time.Time
}

// PreviousNode is used to represent the previously known alive nodes
type PreviousNode struct {
	Name string
	Addr string
}

func (p PreviousNode) String() string {
	return fmt.Sprintf("%s: %s", p.Name, p.Addr)
}

// NewSnapshotter creates a new Snapshotter that records events up to a
// max byte size before rotating the file. It can also be used to
// recover old state. The Snapshotter works by reading from the event channel
// it returns, passing events through to the output channel, and persisting
// relevant events to disk. Setting rejoinAfterLeave makes a leave not clear
// the state, and can be used if you intend to rejoin the same cluster after
// a leave.
func NewSnapshotter(path string,
	minCompactSize int,
	rejoinAfterLeave bool,
	logger *log.Logger,
	clock *LamportClock,
	outCh chan<- Event,
	shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
	inCh := make(chan Event, eventChSize)
	streamCh := make(chan Event, eventChSize)

	// Try to open the file
	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
	}

	// Determine the offset
	info, err := fh.Stat()
	if err != nil {
		fh.Close()
		return nil, nil, fmt.Errorf("failed to stat snapshot: %v", err)
	}
	offset := info.Size()

	// Create the snapshotter
	snap := &Snapshotter{
		aliveNodes:       make(map[string]string),
		clock:            clock,
		fh:               fh,
		buffered:         bufio.NewWriter(fh),
		inCh:             inCh,
		streamCh:         streamCh,
		lastClock:        0,
		lastEventClock:   0,
		lastQueryClock:   0,
		leaveCh:          make(chan struct{}),
		logger:           logger,
		minCompactSize:   int64(minCompactSize),
		path:             path,
		offset:           offset,
		outCh:            outCh,
		rejoinAfterLeave: rejoinAfterLeave,
		shutdownCh:       shutdownCh,
		waitCh:           make(chan struct{}),
	}

	// Recover the last known state
	if err := snap.replay(); err != nil {
		fh.Close()
		return nil, nil, err
	}

	// Start handling new commands
	go snap.teeStream()
	go snap.stream()
	return inCh, snap, nil
}
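
// exampleSnapshotterUsage is an illustrative sketch of how an application
// might wire up a Snapshotter by hand; normally Serf wires this up itself
// when a snapshot path is configured. The file name and compaction size
// below are arbitrary placeholder values, not recommendations.
func exampleSnapshotterUsage() error {
	clock := new(LamportClock)
	outCh := make(chan Event, eventChSize)
	shutdownCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// The returned channel is the sink that Serf's events should be sent to;
	// the Snapshotter tees them to outCh and persists the relevant ones.
	eventCh, snap, err := NewSnapshotter("serf.snapshot", 64*1024, false,
		logger, clock, outCh, shutdownCh)
	if err != nil {
		return err
	}
	_ = eventCh

	// Previously known peers recovered from the snapshot can be used to
	// attempt a re-join after a restart.
	for _, prev := range snap.AliveNodes() {
		logger.Printf("[INFO] serf: previously alive: %s", prev)
	}

	// Closing shutdownCh stops the snapshotter; Wait blocks until the final
	// flush has completed.
	close(shutdownCh)
	snap.Wait()
	return nil
}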

// LastClock returns the last known clock time
func (s *Snapshotter) LastClock() LamportTime {
	return s.lastClock
}

// LastEventClock returns the last known event clock time
func (s *Snapshotter) LastEventClock() LamportTime {
	return s.lastEventClock
}

// LastQueryClock returns the last known query clock time
func (s *Snapshotter) LastQueryClock() LamportTime {
	return s.lastQueryClock
}

// AliveNodes returns the last known alive nodes
func (s *Snapshotter) AliveNodes() []*PreviousNode {
	// Copy the previously known
	previous := make([]*PreviousNode, 0, len(s.aliveNodes))
	for name, addr := range s.aliveNodes {
		previous = append(previous, &PreviousNode{name, addr})
	}

	// Randomize the order, prevents hot shards
	for i := range previous {
		j := rand.Intn(i + 1)
		previous[i], previous[j] = previous[j], previous[i]
	}
	return previous
}

// Wait is used to wait until the snapshotter finishes shutting down
func (s *Snapshotter) Wait() {
	<-s.waitCh
}

// Leave is used to remove known nodes to prevent a restart from
// causing a join. Otherwise nodes will re-join after leaving!
func (s *Snapshotter) Leave() {
	select {
	case s.leaveCh <- struct{}{}:
	case <-s.shutdownCh:
	}
}

// teeStream is a long running routine that is used to copy events
// to the output channel and the internal event handler.
func (s *Snapshotter) teeStream() {
	flushEvent := func(e Event) {
		// Forward to the internal stream, do not block
		select {
		case s.streamCh <- e:
		default:
		}

		// Forward the event immediately, do not block
		if s.outCh != nil {
			select {
			case s.outCh <- e:
			default:
			}
		}
	}

OUTER:
	for {
		select {
		case e := <-s.inCh:
			flushEvent(e)
		case <-s.shutdownCh:
			break OUTER
		}
	}

	// Drain any remaining events before exiting
	for {
		select {
		case e := <-s.inCh:
			flushEvent(e)
		default:
			return
		}
	}
}

// stream is a long running routine that is used to handle events
func (s *Snapshotter) stream() {
	clockTicker := time.NewTicker(clockUpdateInterval)
	defer clockTicker.Stop()

	// flushEvent is used to handle writing out an event
	flushEvent := func(e Event) {
		// Stop recording events after a leave is issued
		if s.leaving {
			return
		}
		switch typed := e.(type) {
		case MemberEvent:
			s.processMemberEvent(typed)
		case UserEvent:
			s.processUserEvent(typed)
		case *Query:
			s.processQuery(typed)
		default:
			s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
		}
	}

	for {
		select {
		case <-s.leaveCh:
			s.leaving = true

			// If we plan to re-join, keep our state
			if !s.rejoinAfterLeave {
				s.aliveNodes = make(map[string]string)
			}
			s.tryAppend("leave\n")
			if err := s.buffered.Flush(); err != nil {
				s.logger.Printf("[ERR] serf: failed to flush leave to snapshot: %v", err)
			}
			if err := s.fh.Sync(); err != nil {
				s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
			}

		case e := <-s.streamCh:
			flushEvent(e)

		case <-clockTicker.C:
			s.updateClock()

		case <-s.shutdownCh:
			// Set up a timeout
			flushTimeout := time.After(shutdownFlushTimeout)

			// Snapshot the clock
			s.updateClock()

			// Clear out the buffers
		FLUSH:
			for {
				select {
				case e := <-s.streamCh:
					flushEvent(e)
				case <-flushTimeout:
					break FLUSH
				default:
					break FLUSH
				}
			}

			if err := s.buffered.Flush(); err != nil {
				s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
			}
			if err := s.fh.Sync(); err != nil {
				s.logger.Printf("[ERR] serf: failed to sync snapshot: %v", err)
			}
			s.fh.Close()
			close(s.waitCh)
			return
		}
	}
}

// processMemberEvent is used to handle a single member event
func (s *Snapshotter) processMemberEvent(e MemberEvent) {
	switch e.Type {
	case EventMemberJoin:
		for _, mem := range e.Members {
			addr := net.TCPAddr{IP: mem.Addr, Port: int(mem.Port)}
			s.aliveNodes[mem.Name] = addr.String()
			s.tryAppend(fmt.Sprintf("alive: %s %s\n", mem.Name, addr.String()))
		}

	case EventMemberLeave:
		fallthrough
	case EventMemberFailed:
		for _, mem := range e.Members {
			delete(s.aliveNodes, mem.Name)
			s.tryAppend(fmt.Sprintf("not-alive: %s\n", mem.Name))
		}
	}
	s.updateClock()
}

// updateClock is called periodically to check if we should update our
// clock value. This is done after member events but should also be done
// periodically due to race conditions with join and leave intents
func (s *Snapshotter) updateClock() {
	lastSeen := s.clock.Time() - 1
	if lastSeen > s.lastClock {
		s.lastClock = lastSeen
		s.tryAppend(fmt.Sprintf("clock: %d\n", s.lastClock))
	}
}

// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
	// Ignore old clocks
	if e.LTime <= s.lastEventClock {
		return
	}
	s.lastEventClock = e.LTime
	s.tryAppend(fmt.Sprintf("event-clock: %d\n", e.LTime))
}

// processQuery is used to handle a single query event
func (s *Snapshotter) processQuery(q *Query) {
	// Ignore old clocks
	if q.LTime <= s.lastQueryClock {
		return
	}
	s.lastQueryClock = q.LTime
	s.tryAppend(fmt.Sprintf("query-clock: %d\n", q.LTime))
}

// tryAppend invokes appendLine but does not return an error; failures are
// logged and a recovery compaction is attempted at most once per
// snapshotErrorRecoveryInterval
func (s *Snapshotter) tryAppend(l string) {
	if err := s.appendLine(l); err != nil {
		s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
		now := time.Now()
		if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
			s.lastAttemptedCompaction = now
			s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
			err = s.compact()
			if err != nil {
				s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
			} else {
				s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
			}
		}
	}
}

// appendLine is used to append a line to the existing log
func (s *Snapshotter) appendLine(l string) error {
	defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now())

	n, err := s.buffered.WriteString(l)
	if err != nil {
		return err
	}

	// Check if we should flush
	now := time.Now()
	if now.Sub(s.lastFlush) > flushInterval {
		s.lastFlush = now
		if err := s.buffered.Flush(); err != nil {
			return err
		}
	}

	// Check if a compaction is necessary
	s.offset += int64(n)
	if s.offset > s.snapshotMaxSize() {
		return s.compact()
	}
	return nil
}

// snapshotMaxSize computes the maximum size and is used to force periodic compaction.
func (s *Snapshotter) snapshotMaxSize() int64 {
	nodes := int64(len(s.aliveNodes))
	estSize := nodes * snapshotBytesPerNode
	threshold := estSize * snapshotCompactionThreshold

	// Apply a minimum threshold to avoid frequent compaction
	if threshold < s.minCompactSize {
		threshold = s.minCompactSize
	}
	return threshold
}
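
// As an illustrative calculation: with 1,000 alive nodes the size estimate is
// 1,000 * snapshotBytesPerNode (128) = 128,000 bytes, doubled by
// snapshotCompactionThreshold to a compaction trigger of roughly 256 KB,
// while a 100-node cluster would compute only about 25 KB and be governed by
// minCompactSize instead.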

// compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
	defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())

	// Open a new, temporary file for the compacted snapshot
	newPath := s.path + tmpExt
	fh, err := os.OpenFile(newPath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
	if err != nil {
		return fmt.Errorf("failed to open new snapshot: %v", err)
	}

	// Create a buffered writer
	buf := bufio.NewWriter(fh)

	// Write out the live nodes
	var offset int64
	for name, addr := range s.aliveNodes {
		line := fmt.Sprintf("alive: %s %s\n", name, addr)
		n, err := buf.WriteString(line)
		if err != nil {
			fh.Close()
			return err
		}
		offset += int64(n)
	}

	// Write out the clocks
	line := fmt.Sprintf("clock: %d\n", s.lastClock)
	n, err := buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	line = fmt.Sprintf("event-clock: %d\n", s.lastEventClock)
	n, err = buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	line = fmt.Sprintf("query-clock: %d\n", s.lastQueryClock)
	n, err = buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	// Flush the new snapshot
	err = buf.Flush()
	if err != nil {
		fh.Close()
		return fmt.Errorf("failed to flush new snapshot: %v", err)
	}

	err = fh.Sync()
	if err != nil {
		fh.Close()
		return fmt.Errorf("failed to fsync new snapshot: %v", err)
	}
	fh.Close()

	// We now need to swap the old snapshot file with the new snapshot.
	// Turns out, Windows won't let us rename the files if we have
	// open handles to them or if the destination already exists. This
	// means we are forced to close the existing handles, delete the
	// old file, move the new one in place, and then re-open the file
	// handles.

	// Flush the existing snapshot, ignoring errors since we will
	// delete it momentarily.
	s.buffered.Flush()
	s.buffered = nil

	// Close the file handle to the old snapshot
	s.fh.Close()
	s.fh = nil

	// Delete the old file
	if err := os.Remove(s.path); err != nil {
		return fmt.Errorf("failed to remove old snapshot: %v", err)
	}

	// Move the new file into place
	if err := os.Rename(newPath, s.path); err != nil {
		return fmt.Errorf("failed to install new snapshot: %v", err)
	}

	// Open the new snapshot
	fh, err = os.OpenFile(s.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
	if err != nil {
		return fmt.Errorf("failed to open snapshot: %v", err)
	}
	buf = bufio.NewWriter(fh)

	// Rotate our handles
	s.fh = fh
	s.buffered = buf
	s.offset = offset
	s.lastFlush = time.Now()
	return nil
}

// replay is used to reset our internal state by replaying the snapshot
// file. It is used at initialization time to read old state.
func (s *Snapshotter) replay() error {
	// Seek to the beginning
	if _, err := s.fh.Seek(0, os.SEEK_SET); err != nil {
		return err
	}

	// Read each line
	reader := bufio.NewReader(s.fh)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			break
		}

		// Skip the newline
		line = line[:len(line)-1]

		// Switch on the prefix
		if strings.HasPrefix(line, "alive: ") {
			info := strings.TrimPrefix(line, "alive: ")
			addrIdx := strings.LastIndex(info, " ")
			if addrIdx == -1 {
				s.logger.Printf("[WARN] serf: Failed to parse address: %v", line)
				continue
			}
			addr := info[addrIdx+1:]
			name := info[:addrIdx]
			s.aliveNodes[name] = addr

		} else if strings.HasPrefix(line, "not-alive: ") {
			name := strings.TrimPrefix(line, "not-alive: ")
			delete(s.aliveNodes, name)

		} else if strings.HasPrefix(line, "clock: ") {
			timeStr := strings.TrimPrefix(line, "clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert clock time: %v", err)
				continue
			}
			s.lastClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "event-clock: ") {
			timeStr := strings.TrimPrefix(line, "event-clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert event clock time: %v", err)
				continue
			}
			s.lastEventClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "query-clock: ") {
			timeStr := strings.TrimPrefix(line, "query-clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert query clock time: %v", err)
				continue
			}
			s.lastQueryClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "coordinate: ") {
			continue // Ignore any coordinate persistence from old snapshots; Serf should re-converge
		} else if line == "leave" {
			// Ignore a leave if we plan on re-joining
			if s.rejoinAfterLeave {
				s.logger.Printf("[INFO] serf: Ignoring previous leave in snapshot")
				continue
			}
			s.aliveNodes = make(map[string]string)
			s.lastClock = 0
			s.lastEventClock = 0
			s.lastQueryClock = 0

		} else if strings.HasPrefix(line, "#") {
			// Skip comment lines

		} else {
			s.logger.Printf("[WARN] serf: Unrecognized snapshot line: %v", line)
		}
	}

	// Seek to the end
	if _, err := s.fh.Seek(0, os.SEEK_END); err != nil {
		return err
	}
	return nil
}