1package fsm 2 3import ( 4 "fmt" 5 "time" 6 7 "github.com/armon/go-metrics" 8 "github.com/armon/go-metrics/prometheus" 9 10 "github.com/hashicorp/consul/agent/consul/state" 11 "github.com/hashicorp/consul/agent/structs" 12 "github.com/hashicorp/consul/api" 13) 14 15var CommandsSummaries = []prometheus.SummaryDefinition{ 16 { 17 Name: []string{"fsm", "register"}, 18 Help: "Measures the time it takes to apply a catalog register operation to the FSM.", 19 }, 20 { 21 Name: []string{"fsm", "deregister"}, 22 Help: "Measures the time it takes to apply a catalog deregister operation to the FSM.", 23 }, 24 { 25 Name: []string{"fsm", "kvs"}, 26 Help: "Measures the time it takes to apply the given KV operation to the FSM.", 27 }, 28 { 29 Name: []string{"fsm", "session"}, 30 Help: "Measures the time it takes to apply the given session operation to the FSM.", 31 }, 32 { 33 Name: []string{"fsm", "acl"}, 34 Help: "Measures the time it takes to apply the given ACL operation to the FSM.", 35 }, 36 { 37 Name: []string{"fsm", "tombstone"}, 38 Help: "Measures the time it takes to apply the given tombstone operation to the FSM.", 39 }, 40 { 41 Name: []string{"fsm", "coordinate", "batch-update"}, 42 Help: "Measures the time it takes to apply the given batch coordinate update to the FSM.", 43 }, 44 { 45 Name: []string{"fsm", "prepared-query"}, 46 Help: "Measures the time it takes to apply the given prepared query update operation to the FSM.", 47 }, 48 { 49 Name: []string{"fsm", "txn"}, 50 Help: "Measures the time it takes to apply the given transaction update to the FSM.", 51 }, 52 { 53 Name: []string{"fsm", "autopilot"}, 54 Help: "Measures the time it takes to apply the given autopilot update to the FSM.", 55 }, 56 { 57 Name: []string{"consul", "fsm", "intention"}, 58 Help: "Deprecated - use fsm_intention instead", 59 }, 60 { 61 Name: []string{"fsm", "intention"}, 62 Help: "Measures the time it takes to apply an intention operation to the FSM.", 63 }, 64 { 65 Name: []string{"consul", "fsm", "ca"}, 66 Help: "Deprecated - use fsm_ca instead", 67 }, 68 { 69 Name: []string{"fsm", "ca"}, 70 Help: "Measures the time it takes to apply CA configuration operations to the FSM.", 71 }, 72 { 73 Name: []string{"fsm", "ca", "leaf"}, 74 Help: "Measures the time it takes to apply an operation while signing a leaf certificate.", 75 }, 76 { 77 Name: []string{"fsm", "acl", "token"}, 78 Help: "Measures the time it takes to apply an ACL token operation to the FSM.", 79 }, 80 { 81 Name: []string{"fsm", "acl", "policy"}, 82 Help: "Measures the time it takes to apply an ACL policy operation to the FSM.", 83 }, 84 { 85 Name: []string{"fsm", "acl", "bindingrule"}, 86 Help: "Measures the time it takes to apply an ACL binding rule operation to the FSM.", 87 }, 88 { 89 Name: []string{"fsm", "acl", "authmethod"}, 90 Help: "Measures the time it takes to apply an ACL authmethod operation to the FSM.", 91 }, 92 { 93 Name: []string{"fsm", "system_metadata"}, 94 Help: "Measures the time it takes to apply a system metadata operation to the FSM.", 95 }, 96 // TODO(kit): We generate the config-entry fsm summaries by reading off of the request. It is 97 // possible to statically declare these when we know all of the names, but I didn't get to it 98 // in this patch. Config-entries are known though and we should add these in the future. 99 // { 100 // Name: []string{"fsm", "config_entry", req.Entry.GetKind()}, 101 // Help: "", 102 // }, 103} 104 105func init() { 106 registerCommand(structs.RegisterRequestType, (*FSM).applyRegister) 107 registerCommand(structs.DeregisterRequestType, (*FSM).applyDeregister) 108 registerCommand(structs.KVSRequestType, (*FSM).applyKVSOperation) 109 registerCommand(structs.SessionRequestType, (*FSM).applySessionOperation) 110 // DEPRECATED (ACL-Legacy-Compat) - Only needed for v1 ACL compat 111 registerCommand(structs.ACLRequestType, (*FSM).applyACLOperation) 112 registerCommand(structs.TombstoneRequestType, (*FSM).applyTombstoneOperation) 113 registerCommand(structs.CoordinateBatchUpdateType, (*FSM).applyCoordinateBatchUpdate) 114 registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation) 115 registerCommand(structs.TxnRequestType, (*FSM).applyTxn) 116 registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate) 117 registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation) 118 registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation) 119 registerCommand(structs.ACLTokenSetRequestType, (*FSM).applyACLTokenSetOperation) 120 registerCommand(structs.ACLTokenDeleteRequestType, (*FSM).applyACLTokenDeleteOperation) 121 registerCommand(structs.ACLBootstrapRequestType, (*FSM).applyACLTokenBootstrap) 122 registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) 123 registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) 124 registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) 125 registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) 126 registerCommand(structs.ACLRoleSetRequestType, (*FSM).applyACLRoleSetOperation) 127 registerCommand(structs.ACLRoleDeleteRequestType, (*FSM).applyACLRoleDeleteOperation) 128 registerCommand(structs.ACLBindingRuleSetRequestType, (*FSM).applyACLBindingRuleSetOperation) 129 registerCommand(structs.ACLBindingRuleDeleteRequestType, (*FSM).applyACLBindingRuleDeleteOperation) 130 registerCommand(structs.ACLAuthMethodSetRequestType, (*FSM).applyACLAuthMethodSetOperation) 131 registerCommand(structs.ACLAuthMethodDeleteRequestType, (*FSM).applyACLAuthMethodDeleteOperation) 132 registerCommand(structs.FederationStateRequestType, (*FSM).applyFederationStateOperation) 133 registerCommand(structs.SystemMetadataRequestType, (*FSM).applySystemMetadataOperation) 134} 135 136func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { 137 defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now()) 138 var req structs.RegisterRequest 139 if err := structs.Decode(buf, &req); err != nil { 140 panic(fmt.Errorf("failed to decode request: %v", err)) 141 } 142 143 // Apply all updates in a single transaction 144 if err := c.state.EnsureRegistration(index, &req); err != nil { 145 c.logger.Warn("EnsureRegistration failed", "error", err) 146 return err 147 } 148 return nil 149} 150 151func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} { 152 defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now()) 153 var req structs.DeregisterRequest 154 if err := structs.Decode(buf, &req); err != nil { 155 panic(fmt.Errorf("failed to decode request: %v", err)) 156 } 157 158 // Either remove the service entry or the whole node. The precedence 159 // here is also baked into vetDeregisterWithACL() in acl.go, so if you 160 // make changes here, be sure to also adjust the code over there. 161 if req.ServiceID != "" { 162 if err := c.state.DeleteService(index, req.Node, req.ServiceID, &req.EnterpriseMeta); err != nil { 163 c.logger.Warn("DeleteNodeService failed", "error", err) 164 return err 165 } 166 } else if req.CheckID != "" { 167 if err := c.state.DeleteCheck(index, req.Node, req.CheckID, &req.EnterpriseMeta); err != nil { 168 c.logger.Warn("DeleteNodeCheck failed", "error", err) 169 return err 170 } 171 } else { 172 if err := c.state.DeleteNode(index, req.Node); err != nil { 173 c.logger.Warn("DeleteNode failed", "error", err) 174 return err 175 } 176 } 177 return nil 178} 179 180func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} { 181 var req structs.KVSRequest 182 if err := structs.Decode(buf, &req); err != nil { 183 panic(fmt.Errorf("failed to decode request: %v", err)) 184 } 185 defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(), 186 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 187 switch req.Op { 188 case api.KVSet: 189 return c.state.KVSSet(index, &req.DirEnt) 190 case api.KVDelete: 191 return c.state.KVSDelete(index, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta) 192 case api.KVDeleteCAS: 193 act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta) 194 if err != nil { 195 return err 196 } 197 return act 198 case api.KVDeleteTree: 199 return c.state.KVSDeleteTree(index, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta) 200 case api.KVCAS: 201 act, err := c.state.KVSSetCAS(index, &req.DirEnt) 202 if err != nil { 203 return err 204 } 205 return act 206 case api.KVLock: 207 act, err := c.state.KVSLock(index, &req.DirEnt) 208 if err != nil { 209 return err 210 } 211 return act 212 case api.KVUnlock: 213 act, err := c.state.KVSUnlock(index, &req.DirEnt) 214 if err != nil { 215 return err 216 } 217 return act 218 default: 219 err := fmt.Errorf("Invalid KVS operation '%s'", req.Op) 220 c.logger.Warn("Invalid KVS operation", "operation", req.Op) 221 return err 222 } 223} 224 225func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} { 226 var req structs.SessionRequest 227 if err := structs.Decode(buf, &req); err != nil { 228 panic(fmt.Errorf("failed to decode request: %v", err)) 229 } 230 defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(), 231 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 232 switch req.Op { 233 case structs.SessionCreate: 234 if err := c.state.SessionCreate(index, &req.Session); err != nil { 235 return err 236 } 237 return req.Session.ID 238 case structs.SessionDestroy: 239 return c.state.SessionDestroy(index, req.Session.ID, &req.Session.EnterpriseMeta) 240 default: 241 c.logger.Warn("Invalid Session operation", "operation", req.Op) 242 return fmt.Errorf("Invalid Session operation '%s'", req.Op) 243 } 244} 245 246// DEPRECATED (ACL-Legacy-Compat) - Only needed for legacy compat 247func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} { 248 // TODO (ACL-Legacy-Compat) - Should we warn here somehow about using deprecated features 249 // maybe emit a second metric? 250 var req structs.ACLRequest 251 if err := structs.Decode(buf, &req); err != nil { 252 panic(fmt.Errorf("failed to decode request: %v", err)) 253 } 254 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(), 255 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 256 switch req.Op { 257 case structs.ACLBootstrapInit: 258 enabled, _, err := c.state.CanBootstrapACLToken() 259 if err != nil { 260 return err 261 } 262 return enabled 263 case structs.ACLBootstrapNow: 264 // This is a bootstrap request from a non-upgraded node 265 if err := c.state.ACLBootstrap(index, 0, req.ACL.Convert(), true); err != nil { 266 return err 267 } 268 269 // No need to check expiration times as those did not exist in legacy tokens. 270 if _, token, err := c.state.ACLTokenGetBySecret(nil, req.ACL.ID, nil); err != nil { 271 return err 272 } else { 273 acl, err := token.Convert() 274 if err != nil { 275 return err 276 } 277 return acl 278 } 279 280 case structs.ACLForceSet, structs.ACLSet: 281 if err := c.state.ACLTokenSet(index, req.ACL.Convert(), true); err != nil { 282 return err 283 } 284 return req.ACL.ID 285 case structs.ACLDelete: 286 return c.state.ACLTokenDeleteBySecret(index, req.ACL.ID, nil) 287 default: 288 c.logger.Warn("Invalid ACL operation", "operation", req.Op) 289 return fmt.Errorf("Invalid ACL operation '%s'", req.Op) 290 } 291} 292 293func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { 294 var req structs.TombstoneRequest 295 if err := structs.Decode(buf, &req); err != nil { 296 panic(fmt.Errorf("failed to decode request: %v", err)) 297 } 298 defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(), 299 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 300 switch req.Op { 301 case structs.TombstoneReap: 302 return c.state.ReapTombstones(index, req.ReapIndex) 303 default: 304 c.logger.Warn("Invalid Tombstone operation", "operation", req.Op) 305 return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op) 306 } 307} 308 309// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies 310// them in a single underlying transaction. This interface isn't 1:1 with the outer 311// update interface that the coordinate endpoint exposes, so we made it single 312// purpose and avoided the opcode convention. 313func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { 314 var updates structs.Coordinates 315 if err := structs.Decode(buf, &updates); err != nil { 316 panic(fmt.Errorf("failed to decode batch updates: %v", err)) 317 } 318 defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now()) 319 if err := c.state.CoordinateBatchUpdate(index, updates); err != nil { 320 return err 321 } 322 return nil 323} 324 325// applyPreparedQueryOperation applies the given prepared query operation to the 326// state store. 327func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} { 328 var req structs.PreparedQueryRequest 329 if err := structs.Decode(buf, &req); err != nil { 330 panic(fmt.Errorf("failed to decode request: %v", err)) 331 } 332 333 defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(), 334 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 335 switch req.Op { 336 case structs.PreparedQueryCreate, structs.PreparedQueryUpdate: 337 return c.state.PreparedQuerySet(index, req.Query) 338 case structs.PreparedQueryDelete: 339 return c.state.PreparedQueryDelete(index, req.Query.ID) 340 default: 341 c.logger.Warn("Invalid PreparedQuery operation", "operation", req.Op) 342 return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op) 343 } 344} 345 346func (c *FSM) applyTxn(buf []byte, index uint64) interface{} { 347 var req structs.TxnRequest 348 if err := structs.Decode(buf, &req); err != nil { 349 panic(fmt.Errorf("failed to decode request: %v", err)) 350 } 351 defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now()) 352 results, errors := c.state.TxnRW(index, req.Ops) 353 return structs.TxnResponse{ 354 Results: results, 355 Errors: errors, 356 } 357} 358 359func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { 360 var req structs.AutopilotSetConfigRequest 361 if err := structs.Decode(buf, &req); err != nil { 362 panic(fmt.Errorf("failed to decode request: %v", err)) 363 } 364 defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now()) 365 366 if req.CAS { 367 act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config) 368 if err != nil { 369 return err 370 } 371 return act 372 } 373 return c.state.AutopilotSetConfig(index, &req.Config) 374} 375 376// applyIntentionOperation applies the given intention operation to the state store. 377func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} { 378 var req structs.IntentionRequest 379 if err := structs.Decode(buf, &req); err != nil { 380 panic(fmt.Errorf("failed to decode request: %v", err)) 381 } 382 383 // TODO(kit): We should deprecate this first metric that writes the metrics_prefix itself, 384 // the config we use to flag this out, telemetry.disable_compat_1.9 is on the agent - how do 385 // we access it here? 386 defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "intention"}, time.Now(), 387 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 388 389 defer metrics.MeasureSinceWithLabels([]string{"fsm", "intention"}, time.Now(), 390 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 391 392 if req.Mutation != nil { 393 return c.state.IntentionMutation(index, req.Op, req.Mutation) 394 } 395 396 switch req.Op { 397 case structs.IntentionOpCreate, structs.IntentionOpUpdate: 398 //nolint:staticcheck 399 return c.state.LegacyIntentionSet(index, req.Intention) 400 case structs.IntentionOpDelete: 401 //nolint:staticcheck 402 return c.state.LegacyIntentionDelete(index, req.Intention.ID) 403 case structs.IntentionOpDeleteAll: 404 return c.state.LegacyIntentionDeleteAll(index) 405 case structs.IntentionOpUpsert: 406 fallthrough // unsupported 407 default: 408 c.logger.Warn("Invalid Intention operation", "operation", req.Op) 409 return fmt.Errorf("Invalid Intention operation '%s'", req.Op) 410 } 411} 412 413// applyConnectCAOperation applies the given CA operation to the state store. 414func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} { 415 var req structs.CARequest 416 if err := structs.Decode(buf, &req); err != nil { 417 panic(fmt.Errorf("failed to decode request: %v", err)) 418 } 419 420 defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "ca"}, time.Now(), 421 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 422 defer metrics.MeasureSinceWithLabels([]string{"fsm", "ca"}, time.Now(), 423 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 424 switch req.Op { 425 case structs.CAOpSetConfig: 426 if req.Config.ModifyIndex != 0 { 427 act, err := c.state.CACheckAndSetConfig(index, req.Config.ModifyIndex, req.Config) 428 if err != nil { 429 return err 430 } 431 432 return act 433 } 434 435 return c.state.CASetConfig(index, req.Config) 436 case structs.CAOpSetRoots: 437 act, err := c.state.CARootSetCAS(index, req.Index, req.Roots) 438 if err != nil { 439 return err 440 } 441 442 return act 443 case structs.CAOpSetProviderState: 444 act, err := c.state.CASetProviderState(index, req.ProviderState) 445 if err != nil { 446 return err 447 } 448 449 return act 450 case structs.CAOpDeleteProviderState: 451 if err := c.state.CADeleteProviderState(index, req.ProviderState.ID); err != nil { 452 return err 453 } 454 455 return true 456 case structs.CAOpSetRootsAndConfig: 457 act, err := c.state.CARootSetCAS(index, req.Index, req.Roots) 458 if err != nil { 459 return err 460 } 461 if !act { 462 return act 463 } 464 465 act, err = c.state.CACheckAndSetConfig(index, req.Config.ModifyIndex, req.Config) 466 if err != nil { 467 return err 468 } 469 return act 470 case structs.CAOpIncrementProviderSerialNumber: 471 sn, err := c.state.CAIncrementProviderSerialNumber(index) 472 if err != nil { 473 return err 474 } 475 476 return sn 477 default: 478 c.logger.Warn("Invalid CA operation", "operation", req.Op) 479 return fmt.Errorf("Invalid CA operation '%s'", req.Op) 480 } 481} 482 483// applyConnectCALeafOperation applies an operation while signing a leaf certificate. 484func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{} { 485 var req structs.CALeafRequest 486 if err := structs.Decode(buf, &req); err != nil { 487 panic(fmt.Errorf("failed to decode request: %v", err)) 488 } 489 490 defer metrics.MeasureSinceWithLabels([]string{"fsm", "ca", "leaf"}, time.Now(), 491 []metrics.Label{{Name: "op", Value: string(req.Op)}}) 492 switch req.Op { 493 case structs.CALeafOpIncrementIndex: 494 // Use current index as the new value as well as the value to write at. 495 // TODO(banks) do we even use this op any more? 496 if err := c.state.CALeafSetIndex(index, index); err != nil { 497 return err 498 } 499 return index 500 default: 501 c.logger.Warn("Invalid CA Leaf operation", "operation", req.Op) 502 return fmt.Errorf("Invalid CA operation '%s'", req.Op) 503 } 504} 505 506func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} { 507 var req structs.ACLTokenBatchSetRequest 508 if err := structs.Decode(buf, &req); err != nil { 509 panic(fmt.Errorf("failed to decode request: %v", err)) 510 } 511 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(), 512 []metrics.Label{{Name: "op", Value: "upsert"}}) 513 514 opts := state.ACLTokenSetOptions{ 515 CAS: req.CAS, 516 AllowMissingPolicyAndRoleIDs: req.AllowMissingLinks, 517 ProhibitUnprivileged: req.ProhibitUnprivileged, 518 Legacy: false, 519 FromReplication: req.FromReplication, 520 } 521 return c.state.ACLTokenBatchSet(index, req.Tokens, opts) 522} 523 524func (c *FSM) applyACLTokenDeleteOperation(buf []byte, index uint64) interface{} { 525 var req structs.ACLTokenBatchDeleteRequest 526 if err := structs.Decode(buf, &req); err != nil { 527 panic(fmt.Errorf("failed to decode request: %v", err)) 528 } 529 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(), 530 []metrics.Label{{Name: "op", Value: "delete"}}) 531 532 return c.state.ACLTokenBatchDelete(index, req.TokenIDs) 533} 534 535func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} { 536 var req structs.ACLTokenBootstrapRequest 537 if err := structs.Decode(buf, &req); err != nil { 538 panic(fmt.Errorf("failed to decode request: %v", err)) 539 } 540 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(), 541 []metrics.Label{{Name: "op", Value: "bootstrap"}}) 542 return c.state.ACLBootstrap(index, req.ResetIndex, &req.Token, false) 543} 544 545func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} { 546 var req structs.ACLPolicyBatchSetRequest 547 if err := structs.Decode(buf, &req); err != nil { 548 panic(fmt.Errorf("failed to decode request: %v", err)) 549 } 550 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(), 551 []metrics.Label{{Name: "op", Value: "upsert"}}) 552 553 return c.state.ACLPolicyBatchSet(index, req.Policies) 554} 555 556func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{} { 557 var req structs.ACLPolicyBatchDeleteRequest 558 if err := structs.Decode(buf, &req); err != nil { 559 panic(fmt.Errorf("failed to decode request: %v", err)) 560 } 561 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(), 562 []metrics.Label{{Name: "op", Value: "delete"}}) 563 564 return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) 565} 566 567func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { 568 req := structs.ConfigEntryRequest{ 569 Entry: &structs.ProxyConfigEntry{}, 570 } 571 if err := structs.Decode(buf, &req); err != nil { 572 panic(fmt.Errorf("failed to decode request: %v", err)) 573 } 574 575 switch req.Op { 576 case structs.ConfigEntryUpsertCAS: 577 defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), 578 []metrics.Label{{Name: "op", Value: "upsert"}}) 579 updated, err := c.state.EnsureConfigEntryCAS(index, req.Entry.GetRaftIndex().ModifyIndex, req.Entry) 580 if err != nil { 581 return err 582 } 583 return updated 584 case structs.ConfigEntryUpsert: 585 defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), 586 []metrics.Label{{Name: "op", Value: "upsert"}}) 587 if err := c.state.EnsureConfigEntry(index, req.Entry); err != nil { 588 return err 589 } 590 return true 591 case structs.ConfigEntryDelete: 592 defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(), 593 []metrics.Label{{Name: "op", Value: "delete"}}) 594 return c.state.DeleteConfigEntry(index, req.Entry.GetKind(), req.Entry.GetName(), req.Entry.GetEnterpriseMeta()) 595 default: 596 return fmt.Errorf("invalid config entry operation type: %v", req.Op) 597 } 598} 599 600func (c *FSM) applyACLRoleSetOperation(buf []byte, index uint64) interface{} { 601 var req structs.ACLRoleBatchSetRequest 602 if err := structs.Decode(buf, &req); err != nil { 603 panic(fmt.Errorf("failed to decode request: %v", err)) 604 } 605 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "role"}, time.Now(), 606 []metrics.Label{{Name: "op", Value: "upsert"}}) 607 608 return c.state.ACLRoleBatchSet(index, req.Roles, req.AllowMissingLinks) 609} 610 611func (c *FSM) applyACLRoleDeleteOperation(buf []byte, index uint64) interface{} { 612 var req structs.ACLRoleBatchDeleteRequest 613 if err := structs.Decode(buf, &req); err != nil { 614 panic(fmt.Errorf("failed to decode request: %v", err)) 615 } 616 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "role"}, time.Now(), 617 []metrics.Label{{Name: "op", Value: "delete"}}) 618 619 return c.state.ACLRoleBatchDelete(index, req.RoleIDs) 620} 621 622func (c *FSM) applyACLBindingRuleSetOperation(buf []byte, index uint64) interface{} { 623 var req structs.ACLBindingRuleBatchSetRequest 624 if err := structs.Decode(buf, &req); err != nil { 625 panic(fmt.Errorf("failed to decode request: %v", err)) 626 } 627 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "bindingrule"}, time.Now(), 628 []metrics.Label{{Name: "op", Value: "upsert"}}) 629 630 return c.state.ACLBindingRuleBatchSet(index, req.BindingRules) 631} 632 633func (c *FSM) applyACLBindingRuleDeleteOperation(buf []byte, index uint64) interface{} { 634 var req structs.ACLBindingRuleBatchDeleteRequest 635 if err := structs.Decode(buf, &req); err != nil { 636 panic(fmt.Errorf("failed to decode request: %v", err)) 637 } 638 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "bindingrule"}, time.Now(), 639 []metrics.Label{{Name: "op", Value: "delete"}}) 640 641 return c.state.ACLBindingRuleBatchDelete(index, req.BindingRuleIDs) 642} 643 644func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface{} { 645 var req structs.ACLAuthMethodBatchSetRequest 646 if err := structs.Decode(buf, &req); err != nil { 647 panic(fmt.Errorf("failed to decode request: %v", err)) 648 } 649 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(), 650 []metrics.Label{{Name: "op", Value: "upsert"}}) 651 652 return c.state.ACLAuthMethodBatchSet(index, req.AuthMethods) 653} 654 655func (c *FSM) applyACLAuthMethodDeleteOperation(buf []byte, index uint64) interface{} { 656 var req structs.ACLAuthMethodBatchDeleteRequest 657 if err := structs.Decode(buf, &req); err != nil { 658 panic(fmt.Errorf("failed to decode request: %v", err)) 659 } 660 defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(), 661 []metrics.Label{{Name: "op", Value: "delete"}}) 662 663 return c.state.ACLAuthMethodBatchDelete(index, req.AuthMethodNames, &req.EnterpriseMeta) 664} 665 666func (c *FSM) applyFederationStateOperation(buf []byte, index uint64) interface{} { 667 var req structs.FederationStateRequest 668 if err := structs.Decode(buf, &req); err != nil { 669 panic(fmt.Errorf("failed to decode request: %v", err)) 670 } 671 672 switch req.Op { 673 case structs.FederationStateUpsert: 674 defer metrics.MeasureSinceWithLabels([]string{"fsm", "federation_state", req.State.Datacenter}, time.Now(), 675 []metrics.Label{{Name: "op", Value: "upsert"}}) 676 if err := c.state.FederationStateSet(index, req.State); err != nil { 677 return err 678 } 679 return true 680 case structs.FederationStateDelete: 681 defer metrics.MeasureSinceWithLabels([]string{"fsm", "federation_state", req.State.Datacenter}, time.Now(), 682 []metrics.Label{{Name: "op", Value: "delete"}}) 683 return c.state.FederationStateDelete(index, req.State.Datacenter) 684 default: 685 return fmt.Errorf("invalid federation state operation type: %v", req.Op) 686 } 687} 688 689func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{} { 690 var req structs.SystemMetadataRequest 691 if err := structs.Decode(buf, &req); err != nil { 692 panic(fmt.Errorf("failed to decode request: %v", err)) 693 } 694 695 switch req.Op { 696 case structs.SystemMetadataUpsert: 697 defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(), 698 []metrics.Label{{Name: "op", Value: "upsert"}}) 699 if err := c.state.SystemMetadataSet(index, req.Entry); err != nil { 700 return err 701 } 702 return true 703 case structs.SystemMetadataDelete: 704 defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(), 705 []metrics.Label{{Name: "op", Value: "delete"}}) 706 return c.state.SystemMetadataDelete(index, req.Entry) 707 default: 708 return fmt.Errorf("invalid system metadata operation type: %v", req.Op) 709 } 710} 711