package raft

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/golang/protobuf/proto"
	log "github.com/hashicorp/go-hclog"
	wrapping "github.com/hashicorp/go-kms-wrapping"
	"github.com/hashicorp/go-raftchunking"
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/raft"
	autopilot "github.com/hashicorp/raft-autopilot"
	raftboltdb "github.com/hashicorp/raft-boltdb/v2"
	snapshot "github.com/hashicorp/raft-snapshot"
	"github.com/hashicorp/vault/helper/metricsutil"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/sdk/helper/jsonutil"
	"github.com/hashicorp/vault/sdk/helper/tlsutil"
	"github.com/hashicorp/vault/sdk/logical"
	"github.com/hashicorp/vault/sdk/physical"
	"github.com/hashicorp/vault/vault/cluster"
	"github.com/hashicorp/vault/vault/seal"
	bolt "go.etcd.io/bbolt"
)

// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment.
const EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID"

// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment.
const EnvVaultRaftPath = "VAULT_RAFT_PATH"

// Verify RaftBackend satisfies the correct interfaces
var (
	_ physical.Backend       = (*RaftBackend)(nil)
	_ physical.Transactional = (*RaftBackend)(nil)
	_ physical.HABackend     = (*RaftBackend)(nil)
	_ physical.Lock          = (*RaftLock)(nil)
)

var (
	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	raftState     = "raft/"
	peersFileName = "peers.json"

	restoreOpDelayDuration = 5 * time.Second

	defaultMaxEntrySize = uint64(2 * raftchunking.ChunkSize)
)
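
// An illustrative peers.json recovery file, which SetupCluster parses with the
// raft library's ReadConfigJSON when it finds the file under the raft data
// directory. The node IDs and addresses below are hypothetical:
//
//	[
//		{ "id": "node1", "address": "10.0.0.1:8201", "non_voter": false },
//		{ "id": "node2", "address": "10.0.0.2:8201", "non_voter": false }
//	]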

// RaftBackend implements the backend interfaces and uses the raft protocol to
// persist writes to the FSM.
type RaftBackend struct {
	logger log.Logger
	conf   map[string]string
	l      sync.RWMutex

	// fsm is the state store for vault's data
	fsm *FSM

	// raft is the instance of raft we will operate on.
	raft *raft.Raft

	// raftInitCh is used to block during HA lock acquisition if raft
	// has not been initialized yet, which can occur if raft is being
	// used for HA-only.
	raftInitCh chan struct{}

	// raftNotifyCh is used to receive updates about leadership changes
	// regarding this node.
	raftNotifyCh chan bool

	// streamLayer is the network layer used to connect the nodes in the raft
	// cluster.
	streamLayer *raftLayer

	// raftTransport is the transport layer that the raft library uses for RPC
	// communication.
	raftTransport raft.Transport

	// snapStore is our snapshot mechanism.
	snapStore raft.SnapshotStore

	// logStore is used by the raft library to store the raft logs in durable
	// storage.
	logStore raft.LogStore

	// stableStore is used by the raft library to store additional metadata in
	// durable storage.
	stableStore raft.StableStore

	// bootstrapConfig is only set when this node needs to be bootstrapped upon
	// startup.
	bootstrapConfig *raft.Configuration

	// dataDir is the location on the local filesystem that raft and FSM data
	// will be stored.
	dataDir string

	// localID is the ID for this node. This can either be configured in the
	// config file, via a file on disk, or is otherwise randomly generated.
	localID string

	// serverAddressProvider is used to map server IDs to addresses.
	serverAddressProvider raft.ServerAddressProvider

	// permitPool is used to limit the number of concurrent storage calls.
	permitPool *physical.PermitPool

	// maxEntrySize imposes a size limit (in bytes) on a raft entry (put or transaction).
	// It is suggested to use a value of 2x the Raft chunking size for optimal
	// performance.
	maxEntrySize uint64

	// autopilot is the instance of the raft-autopilot library's implementation
	// of the autopilot features. It is instantiated on both the leader and the
	// followers, but only the active node will have a running autopilot.
	autopilot *autopilot.Autopilot

	// autopilotConfig represents the configuration required to instantiate autopilot.
	autopilotConfig *AutopilotConfig

	// followerStates represents the information about all the peers of the raft
	// leader. This is used to track some state of the peers, as well as to see
	// if the peers are "alive" using the heartbeats received from them.
	followerStates *FollowerStates

	// followerHeartbeatTicker is used to compute dead servers using follower
	// state heartbeats.
	followerHeartbeatTicker *time.Ticker

	// disableAutopilot, if set, keeps the autopilot implementation from being
	// used; the fallback is to interact with the raft instance directly. This
	// can only be set at startup via the VAULT_RAFT_AUTOPILOT_DISABLE
	// environment variable and can't be updated once the node is up and
	// running.
	disableAutopilot bool

	autopilotReconcileInterval time.Duration
}

// LeaderJoinInfo contains information required by a node to join itself as a
// follower to an existing raft cluster.
type LeaderJoinInfo struct {
	// AutoJoin defines any cloud auto-join metadata. If supplied, Vault will
	// attempt to automatically discover peers in addition to what can be provided
	// via 'leader_api_addr'.
	AutoJoin string `json:"auto_join"`

	// AutoJoinScheme defines the optional URI protocol scheme for addresses
	// discovered via auto-join.
	AutoJoinScheme string `json:"auto_join_scheme"`

	// AutoJoinPort defines the optional port used for addresses discovered via
	// auto-join.
	AutoJoinPort uint `json:"auto_join_port"`

	// LeaderAPIAddr is the address of the leader node to connect to.
	LeaderAPIAddr string `json:"leader_api_addr"`

	// LeaderCACert is the CA cert of the leader node.
	LeaderCACert string `json:"leader_ca_cert"`

	// LeaderClientCert is the client certificate for the follower node to
	// establish client authentication during TLS.
	LeaderClientCert string `json:"leader_client_cert"`

	// LeaderClientKey is the client key for the follower node to establish
	// client authentication during TLS.
	LeaderClientKey string `json:"leader_client_key"`

	// LeaderCACertFile is the path on disk to the CA cert file of the
	// leader node. This should only be provided via Vault's configuration file.
	LeaderCACertFile string `json:"leader_ca_cert_file"`

	// LeaderClientCertFile is the path on disk to the client certificate file
	// for the follower node to establish client authentication during TLS. This
	// should only be provided via Vault's configuration file.
	LeaderClientCertFile string `json:"leader_client_cert_file"`

	// LeaderClientKeyFile is the path on disk to the client key file for the
	// follower node to establish client authentication during TLS. This should
	// only be provided via Vault's configuration file.
	LeaderClientKeyFile string `json:"leader_client_key_file"`

	// LeaderTLSServerName is the optional ServerName to expect in the leader's
	// certificate, instead of the host/IP we're actually connecting to.
	LeaderTLSServerName string `json:"leader_tls_servername"`

	// Retry indicates if the join process should automatically be retried.
	Retry bool `json:"-"`

	// TLSConfig is the TLS configuration for the API client to use when
	// communicating with the leader node.
	TLSConfig *tls.Config `json:"-"`
}
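
// As a sketch, JoinConfig (below) expects the "retry_join" config value to be
// a JSON array of LeaderJoinInfo objects keyed by the struct tags above. The
// addresses, paths, and auto-join string here are illustrative only:
//
//	[
//		{
//			"leader_api_addr": "https://active.vault.example.com:8200",
//			"leader_ca_cert_file": "/etc/vault/ca.pem"
//		},
//		{ "auto_join": "provider=aws tag_key=vault tag_value=server" }
//	]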

// JoinConfig returns a list of information about possible leader nodes that
// this node can join as a follower
func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) {
	config := b.conf["retry_join"]
	if config == "" {
		return nil, nil
	}

	var leaderInfos []*LeaderJoinInfo
	err := jsonutil.DecodeJSON([]byte(config), &leaderInfos)
	if err != nil {
		return nil, fmt.Errorf("failed to decode retry_join config: %w", err)
	}

	if len(leaderInfos) == 0 {
		return nil, errors.New("invalid retry_join config")
	}

	for i, info := range leaderInfos {
		if len(info.AutoJoin) != 0 && len(info.LeaderAPIAddr) != 0 {
			return nil, errors.New("cannot provide both a leader_api_addr and auto_join")
		}

		if info.AutoJoinScheme != "" && (info.AutoJoinScheme != "http" && info.AutoJoinScheme != "https") {
			return nil, fmt.Errorf("invalid scheme '%s'; must either be http or https", info.AutoJoinScheme)
		}

		info.Retry = true
		info.TLSConfig, err = parseTLSInfo(info)
		if err != nil {
			return nil, fmt.Errorf("failed to create tls config to communicate with leader node (retry_join index: %d): %w", i, err)
		}
	}

	return leaderInfos, nil
}

// parseTLSInfo is a helper that parses the TLS information, preferring file
// paths over raw certificate content.
func parseTLSInfo(leaderInfo *LeaderJoinInfo) (*tls.Config, error) {
	var tlsConfig *tls.Config
	var err error
	if len(leaderInfo.LeaderCACertFile) != 0 || len(leaderInfo.LeaderClientCertFile) != 0 || len(leaderInfo.LeaderClientKeyFile) != 0 {
		tlsConfig, err = tlsutil.LoadClientTLSConfig(leaderInfo.LeaderCACertFile, leaderInfo.LeaderClientCertFile, leaderInfo.LeaderClientKeyFile)
		if err != nil {
			return nil, err
		}
	} else if len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0 {
		tlsConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
		if err != nil {
			return nil, err
		}
	}
	if tlsConfig != nil {
		tlsConfig.ServerName = leaderInfo.LeaderTLSServerName
	}

	return tlsConfig, nil
}

// EnsurePath is used to make sure a path exists
func EnsurePath(path string, dir bool) error {
	if !dir {
		path = filepath.Dir(path)
	}
	return os.MkdirAll(path, 0o755)
}

// NewRaftBackend constructs a RaftBackend using the given directory
func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
	path := os.Getenv(EnvVaultRaftPath)
	if path == "" {
		pathFromConfig, ok := conf["path"]
		if !ok {
			return nil, fmt.Errorf("'path' must be set")
		}
		path = pathFromConfig
	}

	var localID string
	{
		// Determine the local node ID from the environment.
		if raftNodeID := os.Getenv(EnvVaultRaftNodeID); raftNodeID != "" {
			localID = raftNodeID
		}

		// If not set in the environment check the configuration file.
		if len(localID) == 0 {
			localID = conf["node_id"]
		}

		// If not set in the config check the "node-id" file.
		if len(localID) == 0 {
			localIDRaw, err := ioutil.ReadFile(filepath.Join(path, "node-id"))
			switch {
			case err == nil:
				if len(localIDRaw) > 0 {
					localID = string(localIDRaw)
				}
			case os.IsNotExist(err):
			default:
				return nil, err
			}
		}

		// If all of the above fail, generate a UUID and persist it to the
		// "node-id" file.
		if len(localID) == 0 {
			id, err := uuid.GenerateUUID()
			if err != nil {
				return nil, err
			}

			if err := ioutil.WriteFile(filepath.Join(path, "node-id"), []byte(id), 0o600); err != nil {
				return nil, err
			}

			localID = id
		}
	}

	// Create the FSM.
	fsm, err := NewFSM(path, localID, logger.Named("fsm"))
	if err != nil {
		return nil, fmt.Errorf("failed to create fsm: %v", err)
	}

	if delayRaw, ok := conf["apply_delay"]; ok {
		delay, err := time.ParseDuration(delayRaw)
		if err != nil {
			return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err)
		}
		fsm.applyCallback = func() {
			time.Sleep(delay)
		}
	}

	// Build an all in-memory setup for dev mode, otherwise prepare a full
	// disk-based setup.
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore

	var devMode bool
	if devMode {
		store := raft.NewInmemStore()
		stable = store
		log = store
		snap = raft.NewInmemSnapshotStore()
	} else {
		// Create the base raft path.
		path := filepath.Join(path, raftState)
		if err := EnsurePath(path, true); err != nil {
			return nil, err
		}

		// Create the backend raft store for logs and stable storage.
		freelistType, noFreelistSync := freelistOptions()
		raftOptions := raftboltdb.Options{
			Path: filepath.Join(path, "raft.db"),
			BoltOptions: &bolt.Options{
				FreelistType:   freelistType,
				NoFreelistSync: noFreelistSync,
			},
		}
		store, err := raftboltdb.New(raftOptions)
		if err != nil {
			return nil, err
		}
		stable = store

		// Wrap the store in a LogCache to improve performance.
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			return nil, err
		}
		log = cacheStore

		// Create the snapshot store.
		snapshots, err := NewBoltSnapshotStore(path, logger.Named("snapshot"), fsm)
		if err != nil {
			return nil, err
		}
		snap = snapshots
	}

	if delayRaw, ok := conf["snapshot_delay"]; ok {
		delay, err := time.ParseDuration(delayRaw)
		if err != nil {
			return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err)
		}
		snap = newSnapshotStoreDelay(snap, delay, logger)
	}

	maxEntrySize := defaultMaxEntrySize
	if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 {
		i, err := strconv.Atoi(maxEntrySizeCfg)
		if err != nil {
			return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err)
		}

		maxEntrySize = uint64(i)
	}

	var reconcileInterval time.Duration
	if interval := conf["autopilot_reconcile_interval"]; interval != "" {
		interval, err := time.ParseDuration(interval)
		if err != nil {
			return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err)
		}
		reconcileInterval = interval
	}

	return &RaftBackend{
		logger:                     logger,
		fsm:                        fsm,
		raftInitCh:                 make(chan struct{}),
		conf:                       conf,
		logStore:                   log,
		stableStore:                stable,
		snapStore:                  snap,
		dataDir:                    path,
		localID:                    localID,
		permitPool:                 physical.NewPermitPool(physical.DefaultParallelOperations),
		maxEntrySize:               maxEntrySize,
		followerHeartbeatTicker:    time.NewTicker(time.Second),
		autopilotReconcileInterval: reconcileInterval,
	}, nil
}
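
// A minimal sketch of a conf map accepted by NewRaftBackend; only "path" (or
// the VAULT_RAFT_PATH environment variable) is required, and the values shown
// are illustrative, not recommendations:
//
//	conf := map[string]string{
//		"path":                         "/var/lib/vault/raft",
//		"node_id":                      "node1",
//		"max_entry_size":               "1048576",
//		"autopilot_reconcile_interval": "10s",
//	}
//	backend, err := NewRaftBackend(conf, logger) // logger is any hclog.Logger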

type snapshotStoreDelay struct {
	logger  log.Logger
	wrapped raft.SnapshotStore
	delay   time.Duration
}

func (s snapshotStoreDelay) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
	s.logger.Trace("delaying before creating snapshot", "delay", s.delay)
	time.Sleep(s.delay)
	return s.wrapped.Create(version, index, term, configuration, configurationIndex, trans)
}

func (s snapshotStoreDelay) List() ([]*raft.SnapshotMeta, error) {
	return s.wrapped.List()
}

func (s snapshotStoreDelay) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
	return s.wrapped.Open(id)
}

var _ raft.SnapshotStore = &snapshotStoreDelay{}

func newSnapshotStoreDelay(snap raft.SnapshotStore, delay time.Duration, logger log.Logger) *snapshotStoreDelay {
	return &snapshotStoreDelay{
		logger:  logger,
		wrapped: snap,
		delay:   delay,
	}
}

// Close is used to gracefully close all file resources. N.B. This method
// should only be called if you are sure the RaftBackend will never be used
// again.
func (b *RaftBackend) Close() error {
	b.l.Lock()
	defer b.l.Unlock()

	if err := b.fsm.db.Close(); err != nil {
		return err
	}

	if err := b.stableStore.(*raftboltdb.BoltStore).Close(); err != nil {
		return err
	}

	return nil
}

func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
	b.l.RLock()
	logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
	fsmStats := b.fsm.db.Stats()
	b.l.RUnlock()
	b.collectMetricsWithStats(logstoreStats, sink, "logstore")
	b.collectMetricsWithStats(fsmStats, sink, "fsm")
}

func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {
	txstats := stats.TxStats
	labels := []metricsutil.Label{{"database", database}}
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "free_pages"}, float32(stats.FreePageN), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "pending_pages"}, float32(stats.PendingPageN), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "allocated_bytes"}, float32(stats.FreeAlloc), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "used_bytes"}, float32(stats.FreelistInuse), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "started_read_transactions"}, float32(stats.TxN), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "currently_open_read_transactions"}, float32(stats.OpenTxN), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "count"}, float32(txstats.PageCount), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "bytes_allocated"}, float32(txstats.PageAlloc), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "cursor", "count"}, float32(txstats.CursorCount), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "count"}, float32(txstats.NodeCount), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "dereferences"}, float32(txstats.NodeDeref), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "rebalance", "count"}, float32(txstats.Rebalance), labels)
	sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "rebalance", "time"}, float32(txstats.RebalanceTime), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "split", "count"}, float32(txstats.Split), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "spill", "count"}, float32(txstats.Spill), labels)
	sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "spill", "time"}, float32(txstats.SpillTime), labels)
	sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "write", "count"}, float32(txstats.Write), labels)
	sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "write", "time"}, float32(txstats.WriteTime), labels)
}

// RaftServer has information about a server in the Raft configuration
type RaftServer struct {
	// NodeID is the name of the server
	NodeID string `json:"node_id"`

	// Address is the IP:port of the server, used for Raft communications
	Address string `json:"address"`

	// Leader is true if this server is the current cluster leader
	Leader bool `json:"leader"`

	// ProtocolVersion is the raft protocol version used by the server
	ProtocolVersion string `json:"protocol_version"`

	// Voter is true if this server has a vote in the cluster. This might
	// be false if the server is staging and still coming online.
	Voter bool `json:"voter"`
}

// RaftConfigurationResponse is returned when querying for the current Raft
// configuration.
type RaftConfigurationResponse struct {
	// Servers has the list of servers in the Raft configuration.
	Servers []*RaftServer `json:"servers"`

	// Index has the Raft index of this configuration.
	Index uint64 `json:"index"`
}

// Peer defines the ID and Address for a given member of the raft cluster.
type Peer struct {
	ID       string `json:"id"`
	Address  string `json:"address"`
	Suffrage int    `json:"suffrage"`
}

// NodeID returns the identifier of the node.
func (b *RaftBackend) NodeID() string {
	return b.localID
}

// Initialized reports whether the raft backend has been set up (i.e. whether
// raft is running).
func (b *RaftBackend) Initialized() bool {
	b.l.RLock()
	init := b.raft != nil
	b.l.RUnlock()
	return init
}

// SetTLSKeyring is used to install a new keyring. If the active key has changed
// it will also close any network connections or streams forcing a reconnect
// with the new key.
func (b *RaftBackend) SetTLSKeyring(keyring *TLSKeyring) error {
	b.l.RLock()
	err := b.streamLayer.setTLSKeyring(keyring)
	b.l.RUnlock()

	return err
}

// SetServerAddressProvider sets the address provider for determining the raft
// node addresses. This is currently only used in tests.
func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvider) {
	b.l.Lock()
	b.serverAddressProvider = provider
	b.l.Unlock()
}

// Bootstrap prepares the given peers to be part of the raft cluster
func (b *RaftBackend) Bootstrap(peers []Peer) error {
	b.l.Lock()
	defer b.l.Unlock()

	hasState, err := raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
	if err != nil {
		return err
	}

	if hasState {
		return errors.New("error bootstrapping cluster: cluster already has state")
	}

	raftConfig := &raft.Configuration{
		Servers: make([]raft.Server, len(peers)),
	}

	for i, p := range peers {
		raftConfig.Servers[i] = raft.Server{
			ID:       raft.ServerID(p.ID),
			Address:  raft.ServerAddress(p.Address),
			Suffrage: raft.ServerSuffrage(p.Suffrage),
		}
	}

	// Store the config for later use
	b.bootstrapConfig = raftConfig
	return nil
}
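
// An illustrative single-node bootstrap, assuming clusterAddr holds this
// node's cluster address; the stored config is consumed later by
// SetupCluster:
//
//	err := b.Bootstrap([]Peer{
//		{ID: b.NodeID(), Address: clusterAddr, Suffrage: int(raft.Voter)},
//	})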

// SetRestoreCallback sets the callback to be used when a restoreCallbackOp is
// processed through the FSM.
func (b *RaftBackend) SetRestoreCallback(restoreCb restoreCallback) {
	b.fsm.l.Lock()
	b.fsm.restoreCb = restoreCb
	b.fsm.l.Unlock()
}

func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
	config.Logger = b.logger
	multiplierRaw, ok := b.conf["performance_multiplier"]
	multiplier := 5
	if ok {
		var err error
		multiplier, err = strconv.Atoi(multiplierRaw)
		if err != nil {
			return err
		}
	}
	config.ElectionTimeout = config.ElectionTimeout * time.Duration(multiplier)
	config.HeartbeatTimeout = config.HeartbeatTimeout * time.Duration(multiplier)
	config.LeaderLeaseTimeout = config.LeaderLeaseTimeout * time.Duration(multiplier)

	snapThresholdRaw, ok := b.conf["snapshot_threshold"]
	if ok {
		var err error
		snapThreshold, err := strconv.Atoi(snapThresholdRaw)
		if err != nil {
			return err
		}
		config.SnapshotThreshold = uint64(snapThreshold)
	}

	trailingLogsRaw, ok := b.conf["trailing_logs"]
	if ok {
		var err error
		trailingLogs, err := strconv.Atoi(trailingLogsRaw)
		if err != nil {
			return err
		}
		config.TrailingLogs = uint64(trailingLogs)
	}
	snapshotIntervalRaw, ok := b.conf["snapshot_interval"]
	if ok {
		var err error
		snapshotInterval, err := time.ParseDuration(snapshotIntervalRaw)
		if err != nil {
			return err
		}
		config.SnapshotInterval = snapshotInterval
	}

	config.NoSnapshotRestoreOnStart = true
	config.MaxAppendEntries = 64

	// Setting BatchApplyCh allows the raft library to enqueue up to
	// MaxAppendEntries into each raft apply rather than relying on the
	// scheduler.
	config.BatchApplyCh = true

	return nil
}
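
// The tuning knobs consumed above come from the backend's conf map; a sketch
// with illustrative values (not recommendations):
//
//	conf := map[string]string{
//		"performance_multiplier": "1",
//		"snapshot_threshold":     "8192",
//		"trailing_logs":          "10240",
//		"snapshot_interval":      "120s",
//	}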

// SetupOpts are used to pass options to the raft setup function.
type SetupOpts struct {
	// TLSKeyring is the keyring to use for the cluster traffic.
	TLSKeyring *TLSKeyring

	// ClusterListener is the cluster hook used to register the raft handler and
	// client with core's cluster listeners.
	ClusterListener cluster.ClusterHook

	// StartAsLeader is used to specify this node should start as leader and
	// bypass the leader election. This should be used with caution.
	StartAsLeader bool

	// RecoveryModeConfig is the configuration for the raft cluster in recovery
	// mode.
	RecoveryModeConfig *raft.Configuration
}

func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error {
	recoveryModeConfig := &raft.Configuration{
		Servers: []raft.Server{
			{
				ID:      raft.ServerID(peer.ID),
				Address: raft.ServerAddress(peer.Address),
			},
		},
	}

	return b.SetupCluster(context.Background(), SetupOpts{
		StartAsLeader:      true,
		RecoveryModeConfig: recoveryModeConfig,
	})
}

func (b *RaftBackend) HasState() (bool, error) {
	b.l.RLock()
	defer b.l.RUnlock()

	return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
}

// SetupCluster starts the raft cluster and enables the networking needed for
// the raft nodes to communicate.
func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
	b.logger.Trace("setting up raft cluster")

	b.l.Lock()
	defer b.l.Unlock()

	// We are already unsealed
	if b.raft != nil {
		b.logger.Debug("raft already started, not setting up cluster")
		return nil
	}

	if len(b.localID) == 0 {
		return errors.New("no local node id configured")
	}

	// Setup the raft config
	raftConfig := raft.DefaultConfig()
	if err := b.applyConfigSettings(raftConfig); err != nil {
		return err
	}

	listenerIsNil := func(cl cluster.ClusterHook) bool {
		switch {
		case opts.ClusterListener == nil:
			return true
		default:
			// Concrete type checks
			switch cl.(type) {
			case *cluster.Listener:
				return cl.(*cluster.Listener) == nil
			}
		}
		return false
	}

	switch {
	case opts.TLSKeyring == nil && listenerIsNil(opts.ClusterListener):
		// If we don't have a provided network we use an in-memory one.
		// This allows us to bootstrap a node without bringing up a cluster
		// network. This will be true during bootstrap, tests and dev modes.
		_, b.raftTransport = raft.NewInmemTransportWithTimeout(raft.ServerAddress(b.localID), time.Second)
	case opts.TLSKeyring == nil:
		return errors.New("no keyring provided")
	case listenerIsNil(opts.ClusterListener):
		return errors.New("no cluster listener provided")
	default:
		// Set the local address and localID in the streaming layer and the raft config.
		streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener)
		if err != nil {
			return err
		}
		transConfig := &raft.NetworkTransportConfig{
			Stream:                streamLayer,
			MaxPool:               3,
			Timeout:               10 * time.Second,
			ServerAddressProvider: b.serverAddressProvider,
			Logger:                b.logger.Named("raft-net"),
		}
		transport := raft.NewNetworkTransportWithConfig(transConfig)

		b.streamLayer = streamLayer
		b.raftTransport = transport
	}

	raftConfig.LocalID = raft.ServerID(b.localID)

	// Set up a channel for reliable leader notifications.
	raftNotifyCh := make(chan bool, 10)
	raftConfig.NotifyCh = raftNotifyCh

	// If we have a bootstrapConfig set we should bootstrap now.
	if b.bootstrapConfig != nil {
		bootstrapConfig := b.bootstrapConfig
		// Unset the bootstrap config
		b.bootstrapConfig = nil

		// Bootstrap raft with our known cluster members.
		if err := raft.BootstrapCluster(raftConfig, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *bootstrapConfig); err != nil {
			return err
		}
	}

	// Setup the Raft store.
	b.fsm.SetNoopRestore(true)

	raftPath := filepath.Join(b.dataDir, raftState)
	peersFile := filepath.Join(raftPath, peersFileName)
	_, err := os.Stat(peersFile)
	if err == nil {
		b.logger.Info("raft recovery initiated", "recovery_file", peersFileName)

		recoveryConfig, err := raft.ReadConfigJSON(peersFile)
		if err != nil {
			return fmt.Errorf("raft recovery failed to parse peers.json: %w", err)
		}

		// Non-voting servers are only allowed in enterprise. If Suffrage is disabled,
		// error out to indicate that it isn't allowed.
		for idx := range recoveryConfig.Servers {
			if !nonVotersAllowed && recoveryConfig.Servers[idx].Suffrage == raft.Nonvoter {
				return fmt.Errorf("raft recovery failed to parse configuration for node %q: setting `non_voter` is only supported in enterprise", recoveryConfig.Servers[idx].ID)
			}
		}

		b.logger.Info("raft recovery found new config", "config", recoveryConfig)

		err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, recoveryConfig)
		if err != nil {
			return fmt.Errorf("raft recovery failed: %w", err)
		}

		err = os.Remove(peersFile)
		if err != nil {
			return fmt.Errorf("raft recovery failed to delete peers.json; please delete manually: %w", err)
		}
		b.logger.Info("raft recovery deleted peers.json")
	}

	if opts.RecoveryModeConfig != nil {
		err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *opts.RecoveryModeConfig)
		if err != nil {
			return fmt.Errorf("recovering raft cluster failed: %w", err)
		}
	}

	b.logger.Info("creating Raft", "config", fmt.Sprintf("%#v", raftConfig))
	raftObj, err := raft.NewRaft(raftConfig, b.fsm.chunker, b.logStore, b.stableStore, b.snapStore, b.raftTransport)
	b.fsm.SetNoopRestore(false)
	if err != nil {
		return err
	}

	// If we are expecting to start as leader wait until we win the election.
	// This should happen quickly since there is only one node in the cluster.
	// StartAsLeader is only set during init, recovery mode, storage migration,
	// and tests.
	if opts.StartAsLeader {
		for {
			if raftObj.State() == raft.Leader {
				break
			}

			select {
			case <-ctx.Done():
				future := raftObj.Shutdown()
				if future.Error() != nil {
					return fmt.Errorf("shutdown while waiting for leadership: %w", future.Error())
				}

				return errors.New("shutdown while waiting for leadership")
			case <-time.After(10 * time.Millisecond):
			}
		}
	}

	b.raft = raftObj
	b.raftNotifyCh = raftNotifyCh

	if err := b.fsm.upgradeLocalNodeConfig(); err != nil {
		b.logger.Error("failed to upgrade local node configuration")
		return err
	}

	if b.streamLayer != nil {
		// Add Handler to the cluster.
		opts.ClusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer)

		// Add Client to the cluster.
		opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
	}

	// Close the init channel to signal setup has been completed
	close(b.raftInitCh)

	b.logger.Trace("finished setting up raft cluster")
	return nil
}

// TeardownCluster shuts down the raft cluster
func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error {
	if clusterListener != nil {
		clusterListener.StopHandler(consts.RaftStorageALPN)
		clusterListener.RemoveClient(consts.RaftStorageALPN)
	}

	b.l.Lock()

	// Perform shutdown only if the raft object is non-nil. The object could be nil
	// if the node is unsealed but has not joined the peer set.
	var future raft.Future
	if b.raft != nil {
		future = b.raft.Shutdown()
	}

	b.raft = nil

	// If we're tearing down, then we need to recreate the raftInitCh
	b.raftInitCh = make(chan struct{})
	b.l.Unlock()

	if future != nil {
		return future.Error()
	}

	return nil
}

// CommittedIndex returns the latest index committed to stable storage
func (b *RaftBackend) CommittedIndex() uint64 {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft == nil {
		return 0
	}

	return b.raft.LastIndex()
}

// AppliedIndex returns the latest index applied to the FSM
func (b *RaftBackend) AppliedIndex() uint64 {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.fsm == nil {
		return 0
	}

	// We use the latest index that the FSM has seen here, which may be behind
	// raft.AppliedIndex() due to the async nature of the raft library.
	indexState, _ := b.fsm.LatestState()
	return indexState.Index
}

// Term returns the raft term of this node.
func (b *RaftBackend) Term() uint64 {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.fsm == nil {
		return 0
	}

	// We use the latest term that the FSM has seen here, which may be behind
	// what the raft library reports due to its async nature.
	indexState, _ := b.fsm.LatestState()
	return indexState.Term
}

// RemovePeer removes the given peer ID from the raft cluster. If the node is
// ourselves we will give up leadership.
func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.disableAutopilot {
		if b.raft == nil {
			return errors.New("raft storage is not initialized")
		}
		b.logger.Trace("removing server from raft", "id", peerID)
		future := b.raft.RemoveServer(raft.ServerID(peerID), 0, 0)
		return future.Error()
	}

	if b.autopilot == nil {
		return errors.New("raft storage autopilot is not initialized")
	}

	b.logger.Trace("removing server from raft via autopilot", "id", peerID)
	return b.autopilot.RemoveServer(raft.ServerID(peerID))
}

// GetConfigurationOffline is used to read the stale, last raft configuration
// known to this node. It accesses the last state written into the FSM. When a
// server is online, use GetConfiguration instead.
func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, error) {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft != nil {
		return nil, errors.New("raft storage is initialized, use GetConfiguration instead")
	}

	if b.fsm == nil {
		return nil, nil
	}

	state, configuration := b.fsm.LatestState()
	config := &RaftConfigurationResponse{
		Index: state.Index,
	}

	if configuration == nil || configuration.Servers == nil {
		return config, nil
	}

	for _, server := range configuration.Servers {
		entry := &RaftServer{
			NodeID:  server.Id,
			Address: server.Address,
			// Since we are offline no node is the leader.
			Leader: false,
			Voter:  raft.ServerSuffrage(server.Suffrage) == raft.Voter,
		}
		config.Servers = append(config.Servers, entry)
	}

	return config, nil
}

func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft == nil {
		return nil, errors.New("raft storage is not initialized")
	}

	future := b.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return nil, err
	}

	config := &RaftConfigurationResponse{
		Index: future.Index(),
	}

	for _, server := range future.Configuration().Servers {
		entry := &RaftServer{
			NodeID:  string(server.ID),
			Address: string(server.Address),
			// Since we only service this request on the active node our node ID
			// denotes the raft leader.
			Leader:          string(server.ID) == b.NodeID(),
			Voter:           server.Suffrage == raft.Voter,
			ProtocolVersion: strconv.Itoa(raft.ProtocolVersionMax),
		}
		config.Servers = append(config.Servers, entry)
	}

	return config, nil
}

// AddPeer adds a new server to the raft cluster
func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.disableAutopilot {
		if b.raft == nil {
			return errors.New("raft storage is not initialized")
		}
		b.logger.Trace("adding server to raft", "id", peerID)
		future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0)
		return future.Error()
	}

	if b.autopilot == nil {
		return errors.New("raft storage autopilot is not initialized")
	}

	b.logger.Trace("adding server to raft via autopilot", "id", peerID)
	return b.autopilot.AddServer(&autopilot.Server{
		ID:          raft.ServerID(peerID),
		Name:        peerID,
		Address:     raft.ServerAddress(clusterAddr),
		RaftVersion: raft.ProtocolVersionMax,
		NodeType:    autopilot.NodeVoter,
	})
}

// Peers returns all the servers present in the raft cluster
func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft == nil {
		return nil, errors.New("raft storage is not initialized")
	}

	future := b.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return nil, err
	}

	ret := make([]Peer, len(future.Configuration().Servers))
	for i, s := range future.Configuration().Servers {
		ret[i] = Peer{
			ID:       string(s.ID),
			Address:  string(s.Address),
			Suffrage: int(s.Suffrage),
		}
	}

	return ret, nil
}

// SnapshotHTTP is a wrapper for Snapshot that sends the snapshot as an HTTP
// response.
func (b *RaftBackend) SnapshotHTTP(out *logical.HTTPResponseWriter, access *seal.Access) error {
	out.Header().Add("Content-Disposition", "attachment")
	out.Header().Add("Content-Type", "application/gzip")

	return b.Snapshot(out, access)
}

// Snapshot takes a raft snapshot, packages it into an archive file, and writes
// it to the provided writer. Seal access is used to encrypt the SHASUM file so
// we can validate whether the snapshot was taken using the same master keys.
func (b *RaftBackend) Snapshot(out io.Writer, access *seal.Access) error {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft == nil {
		return errors.New("raft storage is sealed")
	}

	// If we have access to the seal create a sealer object
	var s snapshot.Sealer
	if access != nil {
		s = &sealer{
			access: access,
		}
	}

	return snapshot.Write(b.logger.Named("snapshot"), b.raft, s, out)
}
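
// A sketch of writing a snapshot archive to disk, assuming access is the
// seal.Access of the running instance (pass nil to skip encrypting the
// SHASUM file):
//
//	f, err := os.Create("/tmp/vault.snap") // path is illustrative
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	if err := b.Snapshot(f, access); err != nil {
//		return err
//	}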

// WriteSnapshotToTemp reads a snapshot archive off the provided reader,
// extracts the data and writes the snapshot to a temporary file. The seal
// access is used to decrypt the SHASUM file in the archive to ensure this
// snapshot has the same master key as the running instance. If the provided
// access is nil then it will skip that validation.
func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access) (*os.File, func(), raft.SnapshotMeta, error) {
	b.l.RLock()
	defer b.l.RUnlock()

	var metadata raft.SnapshotMeta
	if b.raft == nil {
		return nil, nil, metadata, errors.New("raft storage is sealed")
	}

	// If we have access to the seal create a sealer object
	var s snapshot.Sealer
	if access != nil {
		s = &sealer{
			access: access,
		}
	}

	snap, cleanup, err := snapshot.WriteToTempFileWithSealer(b.logger.Named("snapshot"), in, &metadata, s)
	return snap, cleanup, metadata, err
}

// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
// raft.
func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error {
	b.l.RLock()
	defer b.l.RUnlock()

	if b.raft == nil {
		return errors.New("raft storage is not initialized")
	}

	if err := b.raft.Restore(&metadata, snap, 0); err != nil {
		b.logger.Named("snapshot").Error("failed to restore snapshot", "error", err)
		return err
	}

	// Apply a log that tells the follower nodes to run the restore callback
	// function. This is done after the restore call so we can be sure the
	// snapshot applied to a quorum of nodes.
	command := &LogData{
		Operations: []*LogOperation{
			{
				OpType: restoreCallbackOp,
			},
		},
	}

	err := b.applyLog(ctx, command)

	// Do a best-effort attempt to let the standbys apply the restoreCallbackOp
	// before we continue.
	time.Sleep(restoreOpDelayDuration)
	return err
}

// Delete inserts an entry in the log to delete the given path
func (b *RaftBackend) Delete(ctx context.Context, path string) error {
	defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())
	command := &LogData{
		Operations: []*LogOperation{
			{
				OpType: deleteOp,
				Key:    path,
			},
		},
	}
	b.permitPool.Acquire()
	defer b.permitPool.Release()

	b.l.RLock()
	err := b.applyLog(ctx, command)
	b.l.RUnlock()
	return err
}

// Get returns the value corresponding to the given path from the fsm
func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, error) {
	defer metrics.MeasureSince([]string{"raft-storage", "get"}, time.Now())
	if b.fsm == nil {
		return nil, errors.New("raft: fsm not configured")
	}

	b.permitPool.Acquire()
	defer b.permitPool.Release()

	entry, err := b.fsm.Get(ctx, path)
	if entry != nil {
		valueLen := len(entry.Value)
		if uint64(valueLen) > b.maxEntrySize {
			b.logger.Warn("retrieved entry value is too large, has raft's max_entry_size been reduced?",
				"size", valueLen, "max_entry_size", b.maxEntrySize)
		}
	}

	return entry, err
}

// Put inserts an entry in the log for the put operation. It will return an
// error if the resulting entry encoding exceeds the configured max_entry_size
// or if the call to applyLog fails.
func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
	defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())
	command := &LogData{
		Operations: []*LogOperation{
			{
				OpType: putOp,
				Key:    entry.Key,
				Value:  entry.Value,
			},
		},
	}

	b.permitPool.Acquire()
	defer b.permitPool.Release()

	b.l.RLock()
	err := b.applyLog(ctx, command)
	b.l.RUnlock()
	return err
}

// List enumerates all the items under the prefix from the fsm
func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) {
	defer metrics.MeasureSince([]string{"raft-storage", "list"}, time.Now())
	if b.fsm == nil {
		return nil, errors.New("raft: fsm not configured")
	}

	b.permitPool.Acquire()
	defer b.permitPool.Release()

	return b.fsm.List(ctx, prefix)
}

// Transaction applies all the given operations into a single log and
// applies it.
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
	defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())
	command := &LogData{
		Operations: make([]*LogOperation, len(txns)),
	}
	for i, txn := range txns {
		op := &LogOperation{}
		switch txn.Operation {
		case physical.PutOperation:
			op.OpType = putOp
			op.Key = txn.Entry.Key
			op.Value = txn.Entry.Value
		case physical.DeleteOperation:
			op.OpType = deleteOp
			op.Key = txn.Entry.Key
		default:
			return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
		}

		command.Operations[i] = op
	}

	b.permitPool.Acquire()
	defer b.permitPool.Release()

	b.l.RLock()
	err := b.applyLog(ctx, command)
	b.l.RUnlock()
	return err
}

// applyLog will take a given log command and apply it to the raft log. applyLog
// doesn't return until the log has been applied to a quorum of servers and is
// persisted to the local FSM. Caller should hold the backend's read lock.
func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
	if b.raft == nil {
		return errors.New("raft storage is not initialized")
	}

	commandBytes, err := proto.Marshal(command)
	if err != nil {
		return err
	}

	cmdSize := len(commandBytes)
	if uint64(cmdSize) > b.maxEntrySize {
		return fmt.Errorf("%s; got %d bytes, max: %d bytes", physical.ErrValueTooLarge, cmdSize, b.maxEntrySize)
	}

	defer metrics.AddSample([]string{"raft-storage", "entry_size"}, float32(cmdSize))

	var chunked bool
	var applyFuture raft.ApplyFuture
	switch {
	case len(commandBytes) <= raftchunking.ChunkSize:
		applyFuture = b.raft.Apply(commandBytes, 0)
	default:
		chunked = true
		applyFuture = raftchunking.ChunkingApply(commandBytes, nil, 0, b.raft.ApplyLog)
	}

	if err := applyFuture.Error(); err != nil {
		return err
	}

	resp := applyFuture.Response()

	if chunked {
		// In this case we didn't apply all chunks successfully, possibly due
		// to a term change
		if resp == nil {
			// This returns the error in the interface because the raft library
			// returns errors from the FSM via the future, not via err from the
			// apply function. Downstream client code expects to see any error
			// from the FSM (as opposed to the apply itself) and decide whether
			// it can retry in the future's response.
			return errors.New("applying chunking failed, please retry")
		}

		// We expect that this conversion should always work
		chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess)
		if !ok {
			return errors.New("unknown type of response back from chunking FSM")
		}

		// Replace the reply with the inner wrapped version
		resp = chunkedSuccess.Response
	}

	if resp, ok := resp.(*FSMApplyResponse); !ok || !resp.Success {
		return errors.New("could not apply data")
	}

	return nil
}

// HAEnabled is the implementation of the HABackend interface
func (b *RaftBackend) HAEnabled() bool { return true }

// LockWith is the implementation of the HABackend interface
func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) {
	return &RaftLock{
		key:   key,
		value: []byte(value),
		b:     b,
	}, nil
}
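
// A sketch of the HA flow built on RaftLock, assuming stopCh signals
// shutdown; a closed leaderLost channel means leadership was lost and the
// caller should step down (key and value here are illustrative):
//
//	lock, _ := b.LockWith("core/lock", b.NodeID())
//	leaderLost, err := lock.Lock(stopCh)
//	if err != nil || leaderLost == nil {
//		return // error, or shutdown before leadership was acquired
//	}
//	<-leaderLost // blocks until this node loses leadership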

// SetDesiredSuffrage sets a field in the fsm indicating the suffrage intent for
// this node.
func (b *RaftBackend) SetDesiredSuffrage(nonVoter bool) error {
	b.l.Lock()
	defer b.l.Unlock()

	var desiredSuffrage string
	switch nonVoter {
	case true:
		desiredSuffrage = "non-voter"
	default:
		desiredSuffrage = "voter"
	}

	err := b.fsm.recordSuffrage(desiredSuffrage)
	if err != nil {
		return err
	}

	return nil
}

func (b *RaftBackend) DesiredSuffrage() string {
	return b.fsm.DesiredSuffrage()
}

// RaftLock implements the physical Lock interface and enables HA for this
// backend. The Lock uses the raftNotifyCh for receiving leadership edge
// triggers. Vault's active duty matches raft's leadership.
type RaftLock struct {
	key   string
	value []byte

	b *RaftBackend
}

// monitorLeadership waits until we receive an update on the raftNotifyCh and
// closes the leaderLost channel.
func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-chan bool) <-chan struct{} {
	leaderLost := make(chan struct{})
	go func() {
		for {
			select {
			case isLeader := <-leaderNotifyCh:
				// leaderNotifyCh may deliver a true value initially if this
				// server is already the leader prior to RaftLock.Lock call
				// (the true message was already queued). The next message is
				// always going to be false. The for loop should loop at most
				// twice.
				if !isLeader {
					close(leaderLost)
					return
				}
			case <-stopCh:
				return
			}
		}
	}()
	return leaderLost
}

// Lock blocks until we become leader or are shutdown. It returns a channel that
// is closed when we detect a loss of leadership.
func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
	// If not initialized, block until it is
	if !l.b.Initialized() {
		select {
		case <-l.b.raftInitCh:
		case <-stopCh:
			return nil, nil
		}
	}

	l.b.l.RLock()

	// Ensure that we still have a raft instance after grabbing the read lock
	if l.b.raft == nil {
		l.b.l.RUnlock()
		return nil, errors.New("attempted to grab a lock on a nil raft backend")
	}

	// Cache the notifyCh locally
	leaderNotifyCh := l.b.raftNotifyCh

	// Check to see if we are already leader.
	if l.b.raft.State() == raft.Leader {
		err := l.b.applyLog(context.Background(), &LogData{
			Operations: []*LogOperation{
				{
					OpType: putOp,
					Key:    l.key,
					Value:  l.value,
				},
			},
		})
		l.b.l.RUnlock()
		if err != nil {
			return nil, err
		}

		return l.monitorLeadership(stopCh, leaderNotifyCh), nil
	}
	l.b.l.RUnlock()

	for {
		select {
		case isLeader := <-leaderNotifyCh:
			if isLeader {
				// We are leader, set the key
				l.b.l.RLock()
				err := l.b.applyLog(context.Background(), &LogData{
					Operations: []*LogOperation{
						{
							OpType: putOp,
							Key:    l.key,
							Value:  l.value,
						},
					},
				})
				l.b.l.RUnlock()
				if err != nil {
					return nil, err
				}

				return l.monitorLeadership(stopCh, leaderNotifyCh), nil
			}
		case <-stopCh:
			return nil, nil
		}
	}
}

// Unlock gives up leadership.
func (l *RaftLock) Unlock() error {
	if l.b.raft == nil {
		return nil
	}

	return l.b.raft.LeadershipTransfer().Error()
}

// Value reads the value of the lock. This informs us who is currently leader.
func (l *RaftLock) Value() (bool, string, error) {
	e, err := l.b.Get(context.Background(), l.key)
	if err != nil {
		return false, "", err
	}
	if e == nil {
		return false, "", nil
	}

	value := string(e.Value)
	// TODO: how to tell if held?
	return true, value, nil
}

// sealer implements the snapshot.Sealer interface and is used in the snapshot
// process for encrypting/decrypting the SHASUM file in snapshot archives.
type sealer struct {
	access *seal.Access
}

// Seal encrypts the data using the seal access object.
func (s sealer) Seal(ctx context.Context, pt []byte) ([]byte, error) {
	if s.access == nil {
		return nil, errors.New("no seal access available")
	}
	eblob, err := s.access.Encrypt(ctx, pt, nil)
	if err != nil {
		return nil, err
	}

	return proto.Marshal(eblob)
}

// Open decrypts the data using the seal access object.
func (s sealer) Open(ctx context.Context, ct []byte) ([]byte, error) {
	if s.access == nil {
		return nil, errors.New("no seal access available")
	}

	var eblob wrapping.EncryptedBlobInfo
	err := proto.Unmarshal(ct, &eblob)
	if err != nil {
		return nil, err
	}

	return s.access.Decrypt(ctx, &eblob, nil)
}

// freelistOptions returns the freelist type and nofreelistsync values to use
// when opening boltdb files, based on our preferred defaults, and the possible
// presence of overriding environment variables.
func freelistOptions() (bolt.FreelistType, bool) {
	freelistType := bolt.FreelistMapType
	noFreelistSync := true

	if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
		freelistType = bolt.FreelistArrayType
	}

	if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
		noFreelistSync = false
	}

	return freelistType, noFreelistSync
}
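
// For example, to override the defaults at process startup (illustrative):
//
//	VAULT_RAFT_FREELIST_TYPE=array  # use bolt.FreelistArrayType instead of map
//	VAULT_RAFT_FREELIST_SYNC=1      # any non-empty value re-enables freelist sync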