1package raft 2 3import ( 4 "context" 5 "crypto/tls" 6 "errors" 7 "fmt" 8 "io" 9 "io/ioutil" 10 "os" 11 "path/filepath" 12 "strconv" 13 "sync" 14 "time" 15 16 "github.com/armon/go-metrics" 17 "github.com/golang/protobuf/proto" 18 log "github.com/hashicorp/go-hclog" 19 wrapping "github.com/hashicorp/go-kms-wrapping" 20 "github.com/hashicorp/go-raftchunking" 21 "github.com/hashicorp/go-uuid" 22 "github.com/hashicorp/raft" 23 autopilot "github.com/hashicorp/raft-autopilot" 24 raftboltdb "github.com/hashicorp/raft-boltdb/v2" 25 snapshot "github.com/hashicorp/raft-snapshot" 26 "github.com/hashicorp/vault/helper/metricsutil" 27 "github.com/hashicorp/vault/sdk/helper/consts" 28 "github.com/hashicorp/vault/sdk/helper/jsonutil" 29 "github.com/hashicorp/vault/sdk/helper/tlsutil" 30 "github.com/hashicorp/vault/sdk/logical" 31 "github.com/hashicorp/vault/sdk/physical" 32 "github.com/hashicorp/vault/vault/cluster" 33 "github.com/hashicorp/vault/vault/seal" 34 bolt "go.etcd.io/bbolt" 35) 36 37// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment. 38const EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID" 39 40// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment. 41const EnvVaultRaftPath = "VAULT_RAFT_PATH" 42 43// Verify RaftBackend satisfies the correct interfaces 44var ( 45 _ physical.Backend = (*RaftBackend)(nil) 46 _ physical.Transactional = (*RaftBackend)(nil) 47 _ physical.HABackend = (*RaftBackend)(nil) 48 _ physical.Lock = (*RaftLock)(nil) 49) 50 51var ( 52 // raftLogCacheSize is the maximum number of logs to cache in-memory. 53 // This is used to reduce disk I/O for the recently committed entries. 54 raftLogCacheSize = 512 55 56 raftState = "raft/" 57 peersFileName = "peers.json" 58 59 restoreOpDelayDuration = 5 * time.Second 60 61 defaultMaxEntrySize = uint64(2 * raftchunking.ChunkSize) 62) 63 64// RaftBackend implements the backend interfaces and uses the raft protocol to 65// persist writes to the FSM. 66type RaftBackend struct { 67 logger log.Logger 68 conf map[string]string 69 l sync.RWMutex 70 71 // fsm is the state store for vault's data 72 fsm *FSM 73 74 // raft is the instance of raft we will operate on. 75 raft *raft.Raft 76 77 // raftInitCh is used to block during HA lock acquisition if raft 78 // has not been initialized yet, which can occur if raft is being 79 // used for HA-only. 80 raftInitCh chan struct{} 81 82 // raftNotifyCh is used to receive updates about leadership changes 83 // regarding this node. 84 raftNotifyCh chan bool 85 86 // streamLayer is the network layer used to connect the nodes in the raft 87 // cluster. 88 streamLayer *raftLayer 89 90 // raftTransport is the transport layer that the raft library uses for RPC 91 // communication. 92 raftTransport raft.Transport 93 94 // snapStore is our snapshot mechanism. 95 snapStore raft.SnapshotStore 96 97 // logStore is used by the raft library to store the raft logs in durable 98 // storage. 99 logStore raft.LogStore 100 101 // stableStore is used by the raft library to store additional metadata in 102 // durable storage. 103 stableStore raft.StableStore 104 105 // bootstrapConfig is only set when this node needs to be bootstrapped upon 106 // startup. 107 bootstrapConfig *raft.Configuration 108 109 // dataDir is the location on the local filesystem that raft and FSM data 110 // will be stored. 111 dataDir string 112 113 // localID is the ID for this node. This can either be configured in the 114 // config file, via a file on disk, or is otherwise randomly generated. 115 localID string 116 117 // serverAddressProvider is used to map server IDs to addresses. 118 serverAddressProvider raft.ServerAddressProvider 119 120 // permitPool is used to limit the number of concurrent storage calls. 121 permitPool *physical.PermitPool 122 123 // maxEntrySize imposes a size limit (in bytes) on a raft entry (put or transaction). 124 // It is suggested to use a value of 2x the Raft chunking size for optimal 125 // performance. 126 maxEntrySize uint64 127 128 // autopilot is the instance of raft-autopilot library implementation of the 129 // autopilot features. This will be instantiated in both leader and followers. 130 // However, only active node will have a "running" autopilot. 131 autopilot *autopilot.Autopilot 132 133 // autopilotConfig represents the configuration required to instantiate autopilot. 134 autopilotConfig *AutopilotConfig 135 136 // followerStates represents the information about all the peers of the raft 137 // leader. This is used to track some state of the peers and as well as used 138 // to see if the peers are "alive" using the heartbeat received from them. 139 followerStates *FollowerStates 140 141 // followerHeartbeatTicker is used to compute dead servers using follower 142 // state heartbeats. 143 followerHeartbeatTicker *time.Ticker 144 145 // disableAutopilot if set will not put autopilot implementation to use. The 146 // fallback will be to interact with the raft instance directly. This can only 147 // be set during startup via the environment variable 148 // VAULT_RAFT_AUTOPILOT_DISABLE during startup and can't be updated once the 149 // node is up and running. 150 disableAutopilot bool 151 152 autopilotReconcileInterval time.Duration 153} 154 155// LeaderJoinInfo contains information required by a node to join itself as a 156// follower to an existing raft cluster 157type LeaderJoinInfo struct { 158 // AutoJoin defines any cloud auto-join metadata. If supplied, Vault will 159 // attempt to automatically discover peers in addition to what can be provided 160 // via 'leader_api_addr'. 161 AutoJoin string `json:"auto_join"` 162 163 // AutoJoinScheme defines the optional URI protocol scheme for addresses 164 // discovered via auto-join. 165 AutoJoinScheme string `json:"auto_join_scheme"` 166 167 // AutoJoinPort defines the optional port used for addressed discovered via 168 // auto-join. 169 AutoJoinPort uint `json:"auto_join_port"` 170 171 // LeaderAPIAddr is the address of the leader node to connect to 172 LeaderAPIAddr string `json:"leader_api_addr"` 173 174 // LeaderCACert is the CA cert of the leader node 175 LeaderCACert string `json:"leader_ca_cert"` 176 177 // LeaderClientCert is the client certificate for the follower node to 178 // establish client authentication during TLS 179 LeaderClientCert string `json:"leader_client_cert"` 180 181 // LeaderClientKey is the client key for the follower node to establish 182 // client authentication during TLS. 183 LeaderClientKey string `json:"leader_client_key"` 184 185 // LeaderCACertFile is the path on disk to the the CA cert file of the 186 // leader node. This should only be provided via Vault's configuration file. 187 LeaderCACertFile string `json:"leader_ca_cert_file"` 188 189 // LeaderClientCertFile is the path on disk to the client certificate file 190 // for the follower node to establish client authentication during TLS. This 191 // should only be provided via Vault's configuration file. 192 LeaderClientCertFile string `json:"leader_client_cert_file"` 193 194 // LeaderClientKeyFile is the path on disk to the client key file for the 195 // follower node to establish client authentication during TLS. This should 196 // only be provided via Vault's configuration file. 197 LeaderClientKeyFile string `json:"leader_client_key_file"` 198 199 // LeaderTLSServerName is the optional ServerName to expect in the leader's 200 // certificate, instead of the host/IP we're actually connecting to. 201 LeaderTLSServerName string `json:"leader_tls_servername"` 202 203 // Retry indicates if the join process should automatically be retried 204 Retry bool `json:"-"` 205 206 // TLSConfig for the API client to use when communicating with the leader node 207 TLSConfig *tls.Config `json:"-"` 208} 209 210// JoinConfig returns a list of information about possible leader nodes that 211// this node can join as a follower 212func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) { 213 config := b.conf["retry_join"] 214 if config == "" { 215 return nil, nil 216 } 217 218 var leaderInfos []*LeaderJoinInfo 219 err := jsonutil.DecodeJSON([]byte(config), &leaderInfos) 220 if err != nil { 221 return nil, fmt.Errorf("failed to decode retry_join config: %w", err) 222 } 223 224 if len(leaderInfos) == 0 { 225 return nil, errors.New("invalid retry_join config") 226 } 227 228 for i, info := range leaderInfos { 229 if len(info.AutoJoin) != 0 && len(info.LeaderAPIAddr) != 0 { 230 return nil, errors.New("cannot provide both a leader_api_addr and auto_join") 231 } 232 233 if info.AutoJoinScheme != "" && (info.AutoJoinScheme != "http" && info.AutoJoinScheme != "https") { 234 return nil, fmt.Errorf("invalid scheme '%s'; must either be http or https", info.AutoJoinScheme) 235 } 236 237 info.Retry = true 238 info.TLSConfig, err = parseTLSInfo(info) 239 if err != nil { 240 return nil, fmt.Errorf("failed to create tls config to communicate with leader node (retry_join index: %d): %w", i, err) 241 } 242 } 243 244 return leaderInfos, nil 245} 246 247// parseTLSInfo is a helper for parses the TLS information, preferring file 248// paths over raw certificate content. 249func parseTLSInfo(leaderInfo *LeaderJoinInfo) (*tls.Config, error) { 250 var tlsConfig *tls.Config 251 var err error 252 if len(leaderInfo.LeaderCACertFile) != 0 || len(leaderInfo.LeaderClientCertFile) != 0 || len(leaderInfo.LeaderClientKeyFile) != 0 { 253 tlsConfig, err = tlsutil.LoadClientTLSConfig(leaderInfo.LeaderCACertFile, leaderInfo.LeaderClientCertFile, leaderInfo.LeaderClientKeyFile) 254 if err != nil { 255 return nil, err 256 } 257 } else if len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0 { 258 tlsConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey)) 259 if err != nil { 260 return nil, err 261 } 262 } 263 if tlsConfig != nil { 264 tlsConfig.ServerName = leaderInfo.LeaderTLSServerName 265 } 266 267 return tlsConfig, nil 268} 269 270// EnsurePath is used to make sure a path exists 271func EnsurePath(path string, dir bool) error { 272 if !dir { 273 path = filepath.Dir(path) 274 } 275 return os.MkdirAll(path, 0o755) 276} 277 278// NewRaftBackend constructs a RaftBackend using the given directory 279func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { 280 path := os.Getenv(EnvVaultRaftPath) 281 if path == "" { 282 pathFromConfig, ok := conf["path"] 283 if !ok { 284 return nil, fmt.Errorf("'path' must be set") 285 } 286 path = pathFromConfig 287 } 288 289 var localID string 290 { 291 // Determine the local node ID from the environment. 292 if raftNodeID := os.Getenv(EnvVaultRaftNodeID); raftNodeID != "" { 293 localID = raftNodeID 294 } 295 296 // If not set in the environment check the configuration file. 297 if len(localID) == 0 { 298 localID = conf["node_id"] 299 } 300 301 // If not set in the config check the "node-id" file. 302 if len(localID) == 0 { 303 localIDRaw, err := ioutil.ReadFile(filepath.Join(path, "node-id")) 304 switch { 305 case err == nil: 306 if len(localIDRaw) > 0 { 307 localID = string(localIDRaw) 308 } 309 case os.IsNotExist(err): 310 default: 311 return nil, err 312 } 313 } 314 315 // If all of the above fails generate a UUID and persist it to the 316 // "node-id" file. 317 if len(localID) == 0 { 318 id, err := uuid.GenerateUUID() 319 if err != nil { 320 return nil, err 321 } 322 323 if err := ioutil.WriteFile(filepath.Join(path, "node-id"), []byte(id), 0o600); err != nil { 324 return nil, err 325 } 326 327 localID = id 328 } 329 } 330 331 // Create the FSM. 332 fsm, err := NewFSM(path, localID, logger.Named("fsm")) 333 if err != nil { 334 return nil, fmt.Errorf("failed to create fsm: %v", err) 335 } 336 337 if delayRaw, ok := conf["apply_delay"]; ok { 338 delay, err := time.ParseDuration(delayRaw) 339 if err != nil { 340 return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err) 341 } 342 fsm.applyCallback = func() { 343 time.Sleep(delay) 344 } 345 } 346 347 // Build an all in-memory setup for dev mode, otherwise prepare a full 348 // disk-based setup. 349 var log raft.LogStore 350 var stable raft.StableStore 351 var snap raft.SnapshotStore 352 353 var devMode bool 354 if devMode { 355 store := raft.NewInmemStore() 356 stable = store 357 log = store 358 snap = raft.NewInmemSnapshotStore() 359 } else { 360 // Create the base raft path. 361 path := filepath.Join(path, raftState) 362 if err := EnsurePath(path, true); err != nil { 363 return nil, err 364 } 365 366 // Create the backend raft store for logs and stable storage. 367 freelistType, noFreelistSync := freelistOptions() 368 raftOptions := raftboltdb.Options{ 369 Path: filepath.Join(path, "raft.db"), 370 BoltOptions: &bolt.Options{ 371 FreelistType: freelistType, 372 NoFreelistSync: noFreelistSync, 373 }, 374 } 375 store, err := raftboltdb.New(raftOptions) 376 if err != nil { 377 return nil, err 378 } 379 stable = store 380 381 // Wrap the store in a LogCache to improve performance. 382 cacheStore, err := raft.NewLogCache(raftLogCacheSize, store) 383 if err != nil { 384 return nil, err 385 } 386 log = cacheStore 387 388 // Create the snapshot store. 389 snapshots, err := NewBoltSnapshotStore(path, logger.Named("snapshot"), fsm) 390 if err != nil { 391 return nil, err 392 } 393 snap = snapshots 394 } 395 396 if delayRaw, ok := conf["snapshot_delay"]; ok { 397 delay, err := time.ParseDuration(delayRaw) 398 if err != nil { 399 return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err) 400 } 401 snap = newSnapshotStoreDelay(snap, delay, logger) 402 } 403 404 maxEntrySize := defaultMaxEntrySize 405 if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 { 406 i, err := strconv.Atoi(maxEntrySizeCfg) 407 if err != nil { 408 return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err) 409 } 410 411 maxEntrySize = uint64(i) 412 } 413 414 var reconcileInterval time.Duration 415 if interval := conf["autopilot_reconcile_interval"]; interval != "" { 416 interval, err := time.ParseDuration(interval) 417 if err != nil { 418 return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err) 419 } 420 reconcileInterval = interval 421 } 422 423 return &RaftBackend{ 424 logger: logger, 425 fsm: fsm, 426 raftInitCh: make(chan struct{}), 427 conf: conf, 428 logStore: log, 429 stableStore: stable, 430 snapStore: snap, 431 dataDir: path, 432 localID: localID, 433 permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), 434 maxEntrySize: maxEntrySize, 435 followerHeartbeatTicker: time.NewTicker(time.Second), 436 autopilotReconcileInterval: reconcileInterval, 437 }, nil 438} 439 440type snapshotStoreDelay struct { 441 logger log.Logger 442 wrapped raft.SnapshotStore 443 delay time.Duration 444} 445 446func (s snapshotStoreDelay) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) { 447 s.logger.Trace("delaying before creating snapshot", "delay", s.delay) 448 time.Sleep(s.delay) 449 return s.wrapped.Create(version, index, term, configuration, configurationIndex, trans) 450} 451 452func (s snapshotStoreDelay) List() ([]*raft.SnapshotMeta, error) { 453 return s.wrapped.List() 454} 455 456func (s snapshotStoreDelay) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { 457 return s.wrapped.Open(id) 458} 459 460var _ raft.SnapshotStore = &snapshotStoreDelay{} 461 462func newSnapshotStoreDelay(snap raft.SnapshotStore, delay time.Duration, logger log.Logger) *snapshotStoreDelay { 463 return &snapshotStoreDelay{ 464 logger: logger, 465 wrapped: snap, 466 delay: delay, 467 } 468} 469 470// Close is used to gracefully close all file resources. N.B. This method 471// should only be called if you are sure the RaftBackend will never be used 472// again. 473func (b *RaftBackend) Close() error { 474 b.l.Lock() 475 defer b.l.Unlock() 476 477 if err := b.fsm.db.Close(); err != nil { 478 return err 479 } 480 481 if err := b.stableStore.(*raftboltdb.BoltStore).Close(); err != nil { 482 return err 483 } 484 485 return nil 486} 487 488func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) { 489 b.l.RLock() 490 logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats() 491 fsmStats := b.fsm.db.Stats() 492 b.l.RUnlock() 493 b.collectMetricsWithStats(logstoreStats, sink, "logstore") 494 b.collectMetricsWithStats(fsmStats, sink, "fsm") 495} 496 497func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) { 498 txstats := stats.TxStats 499 labels := []metricsutil.Label{{"database", database}} 500 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "free_pages"}, float32(stats.FreePageN), labels) 501 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "pending_pages"}, float32(stats.PendingPageN), labels) 502 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "allocated_bytes"}, float32(stats.FreeAlloc), labels) 503 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "used_bytes"}, float32(stats.FreelistInuse), labels) 504 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "started_read_transactions"}, float32(stats.TxN), labels) 505 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "currently_open_read_transactions"}, float32(stats.OpenTxN), labels) 506 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "count"}, float32(txstats.PageCount), labels) 507 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "bytes_allocated"}, float32(txstats.PageAlloc), labels) 508 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "cursor", "count"}, float32(txstats.CursorCount), labels) 509 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "count"}, float32(txstats.NodeCount), labels) 510 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "dereferences"}, float32(txstats.NodeDeref), labels) 511 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "rebalance", "count"}, float32(txstats.Rebalance), labels) 512 sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "rebalance", "time"}, float32(txstats.RebalanceTime), labels) 513 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "split", "count"}, float32(txstats.Split), labels) 514 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "spill", "count"}, float32(txstats.Spill), labels) 515 sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "spill", "time"}, float32(txstats.SpillTime), labels) 516 sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "write", "count"}, float32(txstats.Write), labels) 517 sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "write", "time"}, float32(txstats.WriteTime), labels) 518} 519 520// RaftServer has information about a server in the Raft configuration 521type RaftServer struct { 522 // NodeID is the name of the server 523 NodeID string `json:"node_id"` 524 525 // Address is the IP:port of the server, used for Raft communications 526 Address string `json:"address"` 527 528 // Leader is true if this server is the current cluster leader 529 Leader bool `json:"leader"` 530 531 // Protocol version is the raft protocol version used by the server 532 ProtocolVersion string `json:"protocol_version"` 533 534 // Voter is true if this server has a vote in the cluster. This might 535 // be false if the server is staging and still coming online. 536 Voter bool `json:"voter"` 537} 538 539// RaftConfigurationResponse is returned when querying for the current Raft 540// configuration. 541type RaftConfigurationResponse struct { 542 // Servers has the list of servers in the Raft configuration. 543 Servers []*RaftServer `json:"servers"` 544 545 // Index has the Raft index of this configuration. 546 Index uint64 `json:"index"` 547} 548 549// Peer defines the ID and Address for a given member of the raft cluster. 550type Peer struct { 551 ID string `json:"id"` 552 Address string `json:"address"` 553 Suffrage int `json:"suffrage"` 554} 555 556// NodeID returns the identifier of the node 557func (b *RaftBackend) NodeID() string { 558 return b.localID 559} 560 561// Initialized tells if raft is running or not 562func (b *RaftBackend) Initialized() bool { 563 b.l.RLock() 564 init := b.raft != nil 565 b.l.RUnlock() 566 return init 567} 568 569// SetTLSKeyring is used to install a new keyring. If the active key has changed 570// it will also close any network connections or streams forcing a reconnect 571// with the new key. 572func (b *RaftBackend) SetTLSKeyring(keyring *TLSKeyring) error { 573 b.l.RLock() 574 err := b.streamLayer.setTLSKeyring(keyring) 575 b.l.RUnlock() 576 577 return err 578} 579 580// SetServerAddressProvider sets a the address provider for determining the raft 581// node addresses. This is currently only used in tests. 582func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvider) { 583 b.l.Lock() 584 b.serverAddressProvider = provider 585 b.l.Unlock() 586} 587 588// Bootstrap prepares the given peers to be part of the raft cluster 589func (b *RaftBackend) Bootstrap(peers []Peer) error { 590 b.l.Lock() 591 defer b.l.Unlock() 592 593 hasState, err := raft.HasExistingState(b.logStore, b.stableStore, b.snapStore) 594 if err != nil { 595 return err 596 } 597 598 if hasState { 599 return errors.New("error bootstrapping cluster: cluster already has state") 600 } 601 602 raftConfig := &raft.Configuration{ 603 Servers: make([]raft.Server, len(peers)), 604 } 605 606 for i, p := range peers { 607 raftConfig.Servers[i] = raft.Server{ 608 ID: raft.ServerID(p.ID), 609 Address: raft.ServerAddress(p.Address), 610 Suffrage: raft.ServerSuffrage(p.Suffrage), 611 } 612 } 613 614 // Store the config for later use 615 b.bootstrapConfig = raftConfig 616 return nil 617} 618 619// SetRestoreCallback sets the callback to be used when a restoreCallbackOp is 620// processed through the FSM. 621func (b *RaftBackend) SetRestoreCallback(restoreCb restoreCallback) { 622 b.fsm.l.Lock() 623 b.fsm.restoreCb = restoreCb 624 b.fsm.l.Unlock() 625} 626 627func (b *RaftBackend) applyConfigSettings(config *raft.Config) error { 628 config.Logger = b.logger 629 multiplierRaw, ok := b.conf["performance_multiplier"] 630 multiplier := 5 631 if ok { 632 var err error 633 multiplier, err = strconv.Atoi(multiplierRaw) 634 if err != nil { 635 return err 636 } 637 } 638 config.ElectionTimeout = config.ElectionTimeout * time.Duration(multiplier) 639 config.HeartbeatTimeout = config.HeartbeatTimeout * time.Duration(multiplier) 640 config.LeaderLeaseTimeout = config.LeaderLeaseTimeout * time.Duration(multiplier) 641 642 snapThresholdRaw, ok := b.conf["snapshot_threshold"] 643 if ok { 644 var err error 645 snapThreshold, err := strconv.Atoi(snapThresholdRaw) 646 if err != nil { 647 return err 648 } 649 config.SnapshotThreshold = uint64(snapThreshold) 650 } 651 652 trailingLogsRaw, ok := b.conf["trailing_logs"] 653 if ok { 654 var err error 655 trailingLogs, err := strconv.Atoi(trailingLogsRaw) 656 if err != nil { 657 return err 658 } 659 config.TrailingLogs = uint64(trailingLogs) 660 } 661 snapshotIntervalRaw, ok := b.conf["snapshot_interval"] 662 if ok { 663 var err error 664 snapshotInterval, err := time.ParseDuration(snapshotIntervalRaw) 665 if err != nil { 666 return err 667 } 668 config.SnapshotInterval = snapshotInterval 669 } 670 671 config.NoSnapshotRestoreOnStart = true 672 config.MaxAppendEntries = 64 673 674 // Setting BatchApplyCh allows the raft library to enqueue up to 675 // MaxAppendEntries into each raft apply rather than relying on the 676 // scheduler. 677 config.BatchApplyCh = true 678 679 return nil 680} 681 682// SetupOpts are used to pass options to the raft setup function. 683type SetupOpts struct { 684 // TLSKeyring is the keyring to use for the cluster traffic. 685 TLSKeyring *TLSKeyring 686 687 // ClusterListener is the cluster hook used to register the raft handler and 688 // client with core's cluster listeners. 689 ClusterListener cluster.ClusterHook 690 691 // StartAsLeader is used to specify this node should start as leader and 692 // bypass the leader election. This should be used with caution. 693 StartAsLeader bool 694 695 // RecoveryModeConfig is the configuration for the raft cluster in recovery 696 // mode. 697 RecoveryModeConfig *raft.Configuration 698} 699 700func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error { 701 recoveryModeConfig := &raft.Configuration{ 702 Servers: []raft.Server{ 703 { 704 ID: raft.ServerID(peer.ID), 705 Address: raft.ServerAddress(peer.Address), 706 }, 707 }, 708 } 709 710 return b.SetupCluster(context.Background(), SetupOpts{ 711 StartAsLeader: true, 712 RecoveryModeConfig: recoveryModeConfig, 713 }) 714} 715 716func (b *RaftBackend) HasState() (bool, error) { 717 b.l.RLock() 718 defer b.l.RUnlock() 719 720 return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore) 721} 722 723// SetupCluster starts the raft cluster and enables the networking needed for 724// the raft nodes to communicate. 725func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error { 726 b.logger.Trace("setting up raft cluster") 727 728 b.l.Lock() 729 defer b.l.Unlock() 730 731 // We are already unsealed 732 if b.raft != nil { 733 b.logger.Debug("raft already started, not setting up cluster") 734 return nil 735 } 736 737 if len(b.localID) == 0 { 738 return errors.New("no local node id configured") 739 } 740 741 // Setup the raft config 742 raftConfig := raft.DefaultConfig() 743 if err := b.applyConfigSettings(raftConfig); err != nil { 744 return err 745 } 746 747 listenerIsNil := func(cl cluster.ClusterHook) bool { 748 switch { 749 case opts.ClusterListener == nil: 750 return true 751 default: 752 // Concrete type checks 753 switch cl.(type) { 754 case *cluster.Listener: 755 return cl.(*cluster.Listener) == nil 756 } 757 } 758 return false 759 } 760 761 switch { 762 case opts.TLSKeyring == nil && listenerIsNil(opts.ClusterListener): 763 // If we don't have a provided network we use an in-memory one. 764 // This allows us to bootstrap a node without bringing up a cluster 765 // network. This will be true during bootstrap, tests and dev modes. 766 _, b.raftTransport = raft.NewInmemTransportWithTimeout(raft.ServerAddress(b.localID), time.Second) 767 case opts.TLSKeyring == nil: 768 return errors.New("no keyring provided") 769 case listenerIsNil(opts.ClusterListener): 770 return errors.New("no cluster listener provided") 771 default: 772 // Set the local address and localID in the streaming layer and the raft config. 773 streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener) 774 if err != nil { 775 return err 776 } 777 transConfig := &raft.NetworkTransportConfig{ 778 Stream: streamLayer, 779 MaxPool: 3, 780 Timeout: 10 * time.Second, 781 ServerAddressProvider: b.serverAddressProvider, 782 Logger: b.logger.Named("raft-net"), 783 } 784 transport := raft.NewNetworkTransportWithConfig(transConfig) 785 786 b.streamLayer = streamLayer 787 b.raftTransport = transport 788 } 789 790 raftConfig.LocalID = raft.ServerID(b.localID) 791 792 // Set up a channel for reliable leader notifications. 793 raftNotifyCh := make(chan bool, 10) 794 raftConfig.NotifyCh = raftNotifyCh 795 796 // If we have a bootstrapConfig set we should bootstrap now. 797 if b.bootstrapConfig != nil { 798 bootstrapConfig := b.bootstrapConfig 799 // Unset the bootstrap config 800 b.bootstrapConfig = nil 801 802 // Bootstrap raft with our known cluster members. 803 if err := raft.BootstrapCluster(raftConfig, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *bootstrapConfig); err != nil { 804 return err 805 } 806 } 807 808 // Setup the Raft store. 809 b.fsm.SetNoopRestore(true) 810 811 raftPath := filepath.Join(b.dataDir, raftState) 812 peersFile := filepath.Join(raftPath, peersFileName) 813 _, err := os.Stat(peersFile) 814 if err == nil { 815 b.logger.Info("raft recovery initiated", "recovery_file", peersFileName) 816 817 recoveryConfig, err := raft.ReadConfigJSON(peersFile) 818 if err != nil { 819 return fmt.Errorf("raft recovery failed to parse peers.json: %w", err) 820 } 821 822 // Non-voting servers are only allowed in enterprise. If Suffrage is disabled, 823 // error out to indicate that it isn't allowed. 824 for idx := range recoveryConfig.Servers { 825 if !nonVotersAllowed && recoveryConfig.Servers[idx].Suffrage == raft.Nonvoter { 826 return fmt.Errorf("raft recovery failed to parse configuration for node %q: setting `non_voter` is only supported in enterprise", recoveryConfig.Servers[idx].ID) 827 } 828 } 829 830 b.logger.Info("raft recovery found new config", "config", recoveryConfig) 831 832 err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, recoveryConfig) 833 if err != nil { 834 return fmt.Errorf("raft recovery failed: %w", err) 835 } 836 837 err = os.Remove(peersFile) 838 if err != nil { 839 return fmt.Errorf("raft recovery failed to delete peers.json; please delete manually: %w", err) 840 } 841 b.logger.Info("raft recovery deleted peers.json") 842 } 843 844 if opts.RecoveryModeConfig != nil { 845 err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *opts.RecoveryModeConfig) 846 if err != nil { 847 return fmt.Errorf("recovering raft cluster failed: %w", err) 848 } 849 } 850 851 b.logger.Info("creating Raft", "config", fmt.Sprintf("%#v", raftConfig)) 852 raftObj, err := raft.NewRaft(raftConfig, b.fsm.chunker, b.logStore, b.stableStore, b.snapStore, b.raftTransport) 853 b.fsm.SetNoopRestore(false) 854 if err != nil { 855 return err 856 } 857 858 // If we are expecting to start as leader wait until we win the election. 859 // This should happen quickly since there is only one node in the cluster. 860 // StartAsLeader is only set during init, recovery mode, storage migration, 861 // and tests. 862 if opts.StartAsLeader { 863 for { 864 if raftObj.State() == raft.Leader { 865 break 866 } 867 868 select { 869 case <-ctx.Done(): 870 future := raftObj.Shutdown() 871 if future.Error() != nil { 872 return fmt.Errorf("shutdown while waiting for leadership: %w", future.Error()) 873 } 874 875 return errors.New("shutdown while waiting for leadership") 876 case <-time.After(10 * time.Millisecond): 877 } 878 } 879 } 880 881 b.raft = raftObj 882 b.raftNotifyCh = raftNotifyCh 883 884 if err := b.fsm.upgradeLocalNodeConfig(); err != nil { 885 b.logger.Error("failed to upgrade local node configuration") 886 return err 887 } 888 889 if b.streamLayer != nil { 890 // Add Handler to the cluster. 891 opts.ClusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer) 892 893 // Add Client to the cluster. 894 opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer) 895 } 896 897 // Close the init channel to signal setup has been completed 898 close(b.raftInitCh) 899 900 b.logger.Trace("finished setting up raft cluster") 901 return nil 902} 903 904// TeardownCluster shuts down the raft cluster 905func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error { 906 if clusterListener != nil { 907 clusterListener.StopHandler(consts.RaftStorageALPN) 908 clusterListener.RemoveClient(consts.RaftStorageALPN) 909 } 910 911 b.l.Lock() 912 913 // Perform shutdown only if the raft object is non-nil. The object could be nil 914 // if the node is unsealed but has not joined the peer set. 915 var future raft.Future 916 if b.raft != nil { 917 future = b.raft.Shutdown() 918 } 919 920 b.raft = nil 921 922 // If we're tearing down, then we need to recreate the raftInitCh 923 b.raftInitCh = make(chan struct{}) 924 b.l.Unlock() 925 926 if future != nil { 927 return future.Error() 928 } 929 930 return nil 931} 932 933// CommittedIndex returns the latest index committed to stable storage 934func (b *RaftBackend) CommittedIndex() uint64 { 935 b.l.RLock() 936 defer b.l.RUnlock() 937 938 if b.raft == nil { 939 return 0 940 } 941 942 return b.raft.LastIndex() 943} 944 945// AppliedIndex returns the latest index applied to the FSM 946func (b *RaftBackend) AppliedIndex() uint64 { 947 b.l.RLock() 948 defer b.l.RUnlock() 949 950 if b.fsm == nil { 951 return 0 952 } 953 954 // We use the latest index that the FSM has seen here, which may be behind 955 // raft.AppliedIndex() due to the async nature of the raft library. 956 indexState, _ := b.fsm.LatestState() 957 return indexState.Index 958} 959 960// Term returns the raft term of this node. 961func (b *RaftBackend) Term() uint64 { 962 b.l.RLock() 963 defer b.l.RUnlock() 964 965 if b.fsm == nil { 966 return 0 967 } 968 969 // We use the latest index that the FSM has seen here, which may be behind 970 // raft.AppliedIndex() due to the async nature of the raft library. 971 indexState, _ := b.fsm.LatestState() 972 return indexState.Term 973} 974 975// RemovePeer removes the given peer ID from the raft cluster. If the node is 976// ourselves we will give up leadership. 977func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error { 978 b.l.RLock() 979 defer b.l.RUnlock() 980 981 if b.disableAutopilot { 982 if b.raft == nil { 983 return errors.New("raft storage is not initialized") 984 } 985 b.logger.Trace("removing server from raft", "id", peerID) 986 future := b.raft.RemoveServer(raft.ServerID(peerID), 0, 0) 987 return future.Error() 988 } 989 990 if b.autopilot == nil { 991 return errors.New("raft storage autopilot is not initialized") 992 } 993 994 b.logger.Trace("removing server from raft via autopilot", "id", peerID) 995 return b.autopilot.RemoveServer(raft.ServerID(peerID)) 996} 997 998// GetConfigurationOffline is used to read the stale, last known raft 999// configuration to this node. It accesses the last state written into the 1000// FSM. When a server is online use GetConfiguration instead. 1001func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, error) { 1002 b.l.RLock() 1003 defer b.l.RUnlock() 1004 1005 if b.raft != nil { 1006 return nil, errors.New("raft storage is initialized, used GetConfiguration instead") 1007 } 1008 1009 if b.fsm == nil { 1010 return nil, nil 1011 } 1012 1013 state, configuration := b.fsm.LatestState() 1014 config := &RaftConfigurationResponse{ 1015 Index: state.Index, 1016 } 1017 1018 if configuration == nil || configuration.Servers == nil { 1019 return config, nil 1020 } 1021 1022 for _, server := range configuration.Servers { 1023 entry := &RaftServer{ 1024 NodeID: server.Id, 1025 Address: server.Address, 1026 // Since we are offline no node is the leader. 1027 Leader: false, 1028 Voter: raft.ServerSuffrage(server.Suffrage) == raft.Voter, 1029 } 1030 config.Servers = append(config.Servers, entry) 1031 } 1032 1033 return config, nil 1034} 1035 1036func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) { 1037 b.l.RLock() 1038 defer b.l.RUnlock() 1039 1040 if b.raft == nil { 1041 return nil, errors.New("raft storage is not initialized") 1042 } 1043 1044 future := b.raft.GetConfiguration() 1045 if err := future.Error(); err != nil { 1046 return nil, err 1047 } 1048 1049 config := &RaftConfigurationResponse{ 1050 Index: future.Index(), 1051 } 1052 1053 for _, server := range future.Configuration().Servers { 1054 entry := &RaftServer{ 1055 NodeID: string(server.ID), 1056 Address: string(server.Address), 1057 // Since we only service this request on the active node our node ID 1058 // denotes the raft leader. 1059 Leader: string(server.ID) == b.NodeID(), 1060 Voter: server.Suffrage == raft.Voter, 1061 ProtocolVersion: strconv.Itoa(raft.ProtocolVersionMax), 1062 } 1063 config.Servers = append(config.Servers, entry) 1064 } 1065 1066 return config, nil 1067} 1068 1069// AddPeer adds a new server to the raft cluster 1070func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error { 1071 b.l.RLock() 1072 defer b.l.RUnlock() 1073 1074 if b.disableAutopilot { 1075 if b.raft == nil { 1076 return errors.New("raft storage is not initialized") 1077 } 1078 b.logger.Trace("adding server to raft", "id", peerID) 1079 future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0) 1080 return future.Error() 1081 } 1082 1083 if b.autopilot == nil { 1084 return errors.New("raft storage autopilot is not initialized") 1085 } 1086 1087 b.logger.Trace("adding server to raft via autopilot", "id", peerID) 1088 return b.autopilot.AddServer(&autopilot.Server{ 1089 ID: raft.ServerID(peerID), 1090 Name: peerID, 1091 Address: raft.ServerAddress(clusterAddr), 1092 RaftVersion: raft.ProtocolVersionMax, 1093 NodeType: autopilot.NodeVoter, 1094 }) 1095} 1096 1097// Peers returns all the servers present in the raft cluster 1098func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) { 1099 b.l.RLock() 1100 defer b.l.RUnlock() 1101 1102 if b.raft == nil { 1103 return nil, errors.New("raft storage is not initialized") 1104 } 1105 1106 future := b.raft.GetConfiguration() 1107 if err := future.Error(); err != nil { 1108 return nil, err 1109 } 1110 1111 ret := make([]Peer, len(future.Configuration().Servers)) 1112 for i, s := range future.Configuration().Servers { 1113 ret[i] = Peer{ 1114 ID: string(s.ID), 1115 Address: string(s.Address), 1116 Suffrage: int(s.Suffrage), 1117 } 1118 } 1119 1120 return ret, nil 1121} 1122 1123// SnapshotHTTP is a wrapper for Snapshot that sends the snapshot as an HTTP 1124// response. 1125func (b *RaftBackend) SnapshotHTTP(out *logical.HTTPResponseWriter, access *seal.Access) error { 1126 out.Header().Add("Content-Disposition", "attachment") 1127 out.Header().Add("Content-Type", "application/gzip") 1128 1129 return b.Snapshot(out, access) 1130} 1131 1132// Snapshot takes a raft snapshot, packages it into a archive file and writes it 1133// to the provided writer. Seal access is used to encrypt the SHASUM file so we 1134// can validate the snapshot was taken using the same master keys or not. 1135func (b *RaftBackend) Snapshot(out io.Writer, access *seal.Access) error { 1136 b.l.RLock() 1137 defer b.l.RUnlock() 1138 1139 if b.raft == nil { 1140 return errors.New("raft storage is sealed") 1141 } 1142 1143 // If we have access to the seal create a sealer object 1144 var s snapshot.Sealer 1145 if access != nil { 1146 s = &sealer{ 1147 access: access, 1148 } 1149 } 1150 1151 return snapshot.Write(b.logger.Named("snapshot"), b.raft, s, out) 1152} 1153 1154// WriteSnapshotToTemp reads a snapshot archive off the provided reader, 1155// extracts the data and writes the snapshot to a temporary file. The seal 1156// access is used to decrypt the SHASUM file in the archive to ensure this 1157// snapshot has the same master key as the running instance. If the provided 1158// access is nil then it will skip that validation. 1159func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access) (*os.File, func(), raft.SnapshotMeta, error) { 1160 b.l.RLock() 1161 defer b.l.RUnlock() 1162 1163 var metadata raft.SnapshotMeta 1164 if b.raft == nil { 1165 return nil, nil, metadata, errors.New("raft storage is sealed") 1166 } 1167 1168 // If we have access to the seal create a sealer object 1169 var s snapshot.Sealer 1170 if access != nil { 1171 s = &sealer{ 1172 access: access, 1173 } 1174 } 1175 1176 snap, cleanup, err := snapshot.WriteToTempFileWithSealer(b.logger.Named("snapshot"), in, &metadata, s) 1177 return snap, cleanup, metadata, err 1178} 1179 1180// RestoreSnapshot applies the provided snapshot metadata and snapshot data to 1181// raft. 1182func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error { 1183 b.l.RLock() 1184 defer b.l.RUnlock() 1185 1186 if b.raft == nil { 1187 return errors.New("raft storage is not initialized") 1188 } 1189 1190 if err := b.raft.Restore(&metadata, snap, 0); err != nil { 1191 b.logger.Named("snapshot").Error("failed to restore snapshot", "error", err) 1192 return err 1193 } 1194 1195 // Apply a log that tells the follower nodes to run the restore callback 1196 // function. This is done after the restore call so we can be sure the 1197 // snapshot applied to a quorum of nodes. 1198 command := &LogData{ 1199 Operations: []*LogOperation{ 1200 { 1201 OpType: restoreCallbackOp, 1202 }, 1203 }, 1204 } 1205 1206 err := b.applyLog(ctx, command) 1207 1208 // Do a best-effort attempt to let the standbys apply the restoreCallbackOp 1209 // before we continue. 1210 time.Sleep(restoreOpDelayDuration) 1211 return err 1212} 1213 1214// Delete inserts an entry in the log to delete the given path 1215func (b *RaftBackend) Delete(ctx context.Context, path string) error { 1216 defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now()) 1217 command := &LogData{ 1218 Operations: []*LogOperation{ 1219 { 1220 OpType: deleteOp, 1221 Key: path, 1222 }, 1223 }, 1224 } 1225 b.permitPool.Acquire() 1226 defer b.permitPool.Release() 1227 1228 b.l.RLock() 1229 err := b.applyLog(ctx, command) 1230 b.l.RUnlock() 1231 return err 1232} 1233 1234// Get returns the value corresponding to the given path from the fsm 1235func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, error) { 1236 defer metrics.MeasureSince([]string{"raft-storage", "get"}, time.Now()) 1237 if b.fsm == nil { 1238 return nil, errors.New("raft: fsm not configured") 1239 } 1240 1241 b.permitPool.Acquire() 1242 defer b.permitPool.Release() 1243 1244 entry, err := b.fsm.Get(ctx, path) 1245 if entry != nil { 1246 valueLen := len(entry.Value) 1247 if uint64(valueLen) > b.maxEntrySize { 1248 b.logger.Warn("retrieved entry value is too large, has raft's max_entry_size been reduced?", 1249 "size", valueLen, "max_entry_size", b.maxEntrySize) 1250 } 1251 } 1252 1253 return entry, err 1254} 1255 1256// Put inserts an entry in the log for the put operation. It will return an 1257// error if the resulting entry encoding exceeds the configured max_entry_size 1258// or if the call to applyLog fails. 1259func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error { 1260 defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now()) 1261 command := &LogData{ 1262 Operations: []*LogOperation{ 1263 { 1264 OpType: putOp, 1265 Key: entry.Key, 1266 Value: entry.Value, 1267 }, 1268 }, 1269 } 1270 1271 b.permitPool.Acquire() 1272 defer b.permitPool.Release() 1273 1274 b.l.RLock() 1275 err := b.applyLog(ctx, command) 1276 b.l.RUnlock() 1277 return err 1278} 1279 1280// List enumerates all the items under the prefix from the fsm 1281func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) { 1282 defer metrics.MeasureSince([]string{"raft-storage", "list"}, time.Now()) 1283 if b.fsm == nil { 1284 return nil, errors.New("raft: fsm not configured") 1285 } 1286 1287 b.permitPool.Acquire() 1288 defer b.permitPool.Release() 1289 1290 return b.fsm.List(ctx, prefix) 1291} 1292 1293// Transaction applies all the given operations into a single log and 1294// applies it. 1295func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { 1296 defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now()) 1297 command := &LogData{ 1298 Operations: make([]*LogOperation, len(txns)), 1299 } 1300 for i, txn := range txns { 1301 op := &LogOperation{} 1302 switch txn.Operation { 1303 case physical.PutOperation: 1304 op.OpType = putOp 1305 op.Key = txn.Entry.Key 1306 op.Value = txn.Entry.Value 1307 case physical.DeleteOperation: 1308 op.OpType = deleteOp 1309 op.Key = txn.Entry.Key 1310 default: 1311 return fmt.Errorf("%q is not a supported transaction operation", txn.Operation) 1312 } 1313 1314 command.Operations[i] = op 1315 } 1316 1317 b.permitPool.Acquire() 1318 defer b.permitPool.Release() 1319 1320 b.l.RLock() 1321 err := b.applyLog(ctx, command) 1322 b.l.RUnlock() 1323 return err 1324} 1325 1326// applyLog will take a given log command and apply it to the raft log. applyLog 1327// doesn't return until the log has been applied to a quorum of servers and is 1328// persisted to the local FSM. Caller should hold the backend's read lock. 1329func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error { 1330 if b.raft == nil { 1331 return errors.New("raft storage is not initialized") 1332 } 1333 1334 commandBytes, err := proto.Marshal(command) 1335 if err != nil { 1336 return err 1337 } 1338 1339 cmdSize := len(commandBytes) 1340 if uint64(cmdSize) > b.maxEntrySize { 1341 return fmt.Errorf("%s; got %d bytes, max: %d bytes", physical.ErrValueTooLarge, cmdSize, b.maxEntrySize) 1342 } 1343 1344 defer metrics.AddSample([]string{"raft-storage", "entry_size"}, float32(cmdSize)) 1345 1346 var chunked bool 1347 var applyFuture raft.ApplyFuture 1348 switch { 1349 case len(commandBytes) <= raftchunking.ChunkSize: 1350 applyFuture = b.raft.Apply(commandBytes, 0) 1351 default: 1352 chunked = true 1353 applyFuture = raftchunking.ChunkingApply(commandBytes, nil, 0, b.raft.ApplyLog) 1354 } 1355 1356 if err := applyFuture.Error(); err != nil { 1357 return err 1358 } 1359 1360 resp := applyFuture.Response() 1361 1362 if chunked { 1363 // In this case we didn't apply all chunks successfully, possibly due 1364 // to a term change 1365 if resp == nil { 1366 // This returns the error in the interface because the raft library 1367 // returns errors from the FSM via the future, not via err from the 1368 // apply function. Downstream client code expects to see any error 1369 // from the FSM (as opposed to the apply itself) and decide whether 1370 // it can retry in the future's response. 1371 return errors.New("applying chunking failed, please retry") 1372 } 1373 1374 // We expect that this conversion should always work 1375 chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess) 1376 if !ok { 1377 return errors.New("unknown type of response back from chunking FSM") 1378 } 1379 1380 // Replace the reply with the inner wrapped version 1381 resp = chunkedSuccess.Response 1382 } 1383 1384 if resp, ok := resp.(*FSMApplyResponse); !ok || !resp.Success { 1385 return errors.New("could not apply data") 1386 } 1387 1388 return nil 1389} 1390 1391// HAEnabled is the implementation of the HABackend interface 1392func (b *RaftBackend) HAEnabled() bool { return true } 1393 1394// HAEnabled is the implementation of the HABackend interface 1395func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) { 1396 return &RaftLock{ 1397 key: key, 1398 value: []byte(value), 1399 b: b, 1400 }, nil 1401} 1402 1403// SetDesiredSuffrage sets a field in the fsm indicating the suffrage intent for 1404// this node. 1405func (b *RaftBackend) SetDesiredSuffrage(nonVoter bool) error { 1406 b.l.Lock() 1407 defer b.l.Unlock() 1408 1409 var desiredSuffrage string 1410 switch nonVoter { 1411 case true: 1412 desiredSuffrage = "non-voter" 1413 default: 1414 desiredSuffrage = "voter" 1415 } 1416 1417 err := b.fsm.recordSuffrage(desiredSuffrage) 1418 if err != nil { 1419 return err 1420 } 1421 1422 return nil 1423} 1424 1425func (b *RaftBackend) DesiredSuffrage() string { 1426 return b.fsm.DesiredSuffrage() 1427} 1428 1429// RaftLock implements the physical Lock interface and enables HA for this 1430// backend. The Lock uses the raftNotifyCh for receiving leadership edge 1431// triggers. Vault's active duty matches raft's leadership. 1432type RaftLock struct { 1433 key string 1434 value []byte 1435 1436 b *RaftBackend 1437} 1438 1439// monitorLeadership waits until we receive an update on the raftNotifyCh and 1440// closes the leaderLost channel. 1441func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-chan bool) <-chan struct{} { 1442 leaderLost := make(chan struct{}) 1443 go func() { 1444 for { 1445 select { 1446 case isLeader := <-leaderNotifyCh: 1447 // leaderNotifyCh may deliver a true value initially if this 1448 // server is already the leader prior to RaftLock.Lock call 1449 // (the true message was already queued). The next message is 1450 // always going to be false. The for loop should loop at most 1451 // twice. 1452 if !isLeader { 1453 close(leaderLost) 1454 return 1455 } 1456 case <-stopCh: 1457 return 1458 } 1459 } 1460 }() 1461 return leaderLost 1462} 1463 1464// Lock blocks until we become leader or are shutdown. It returns a channel that 1465// is closed when we detect a loss of leadership. 1466func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { 1467 // If not initialized, block until it is 1468 if !l.b.Initialized() { 1469 select { 1470 case <-l.b.raftInitCh: 1471 case <-stopCh: 1472 return nil, nil 1473 } 1474 } 1475 1476 l.b.l.RLock() 1477 1478 // Ensure that we still have a raft instance after grabbing the read lock 1479 if l.b.raft == nil { 1480 l.b.l.RUnlock() 1481 return nil, errors.New("attempted to grab a lock on a nil raft backend") 1482 } 1483 1484 // Cache the notifyCh locally 1485 leaderNotifyCh := l.b.raftNotifyCh 1486 1487 // Check to see if we are already leader. 1488 if l.b.raft.State() == raft.Leader { 1489 err := l.b.applyLog(context.Background(), &LogData{ 1490 Operations: []*LogOperation{ 1491 { 1492 OpType: putOp, 1493 Key: l.key, 1494 Value: l.value, 1495 }, 1496 }, 1497 }) 1498 l.b.l.RUnlock() 1499 if err != nil { 1500 return nil, err 1501 } 1502 1503 return l.monitorLeadership(stopCh, leaderNotifyCh), nil 1504 } 1505 l.b.l.RUnlock() 1506 1507 for { 1508 select { 1509 case isLeader := <-leaderNotifyCh: 1510 if isLeader { 1511 // We are leader, set the key 1512 l.b.l.RLock() 1513 err := l.b.applyLog(context.Background(), &LogData{ 1514 Operations: []*LogOperation{ 1515 { 1516 OpType: putOp, 1517 Key: l.key, 1518 Value: l.value, 1519 }, 1520 }, 1521 }) 1522 l.b.l.RUnlock() 1523 if err != nil { 1524 return nil, err 1525 } 1526 1527 return l.monitorLeadership(stopCh, leaderNotifyCh), nil 1528 } 1529 case <-stopCh: 1530 return nil, nil 1531 } 1532 } 1533} 1534 1535// Unlock gives up leadership. 1536func (l *RaftLock) Unlock() error { 1537 if l.b.raft == nil { 1538 return nil 1539 } 1540 1541 return l.b.raft.LeadershipTransfer().Error() 1542} 1543 1544// Value reads the value of the lock. This informs us who is currently leader. 1545func (l *RaftLock) Value() (bool, string, error) { 1546 e, err := l.b.Get(context.Background(), l.key) 1547 if err != nil { 1548 return false, "", err 1549 } 1550 if e == nil { 1551 return false, "", nil 1552 } 1553 1554 value := string(e.Value) 1555 // TODO: how to tell if held? 1556 return true, value, nil 1557} 1558 1559// sealer implements the snapshot.Sealer interface and is used in the snapshot 1560// process for encrypting/decrypting the SHASUM file in snapshot archives. 1561type sealer struct { 1562 access *seal.Access 1563} 1564 1565// Seal encrypts the data with using the seal access object. 1566func (s sealer) Seal(ctx context.Context, pt []byte) ([]byte, error) { 1567 if s.access == nil { 1568 return nil, errors.New("no seal access available") 1569 } 1570 eblob, err := s.access.Encrypt(ctx, pt, nil) 1571 if err != nil { 1572 return nil, err 1573 } 1574 1575 return proto.Marshal(eblob) 1576} 1577 1578// Open decrypts the data using the seal access object. 1579func (s sealer) Open(ctx context.Context, ct []byte) ([]byte, error) { 1580 if s.access == nil { 1581 return nil, errors.New("no seal access available") 1582 } 1583 1584 var eblob wrapping.EncryptedBlobInfo 1585 err := proto.Unmarshal(ct, &eblob) 1586 if err != nil { 1587 return nil, err 1588 } 1589 1590 return s.access.Decrypt(ctx, &eblob, nil) 1591} 1592 1593// freelistOptions returns the freelist type and nofreelistsync values to use 1594// when opening boltdb files, based on our preferred defaults, and the possible 1595// presence of overriding environment variables. 1596func freelistOptions() (bolt.FreelistType, bool) { 1597 freelistType := bolt.FreelistMapType 1598 noFreelistSync := true 1599 1600 if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" { 1601 freelistType = bolt.FreelistArrayType 1602 } 1603 1604 if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" { 1605 noFreelistSync = false 1606 } 1607 1608 return freelistType, noFreelistSync 1609} 1610